Database Connections from Python
Bridge Python and Databases Seamlessly! 🔌
Databases are the backbone of data science. Whether you're working with traditional SQL databases like PostgreSQL and MySQL, or modern NoSQL solutions like MongoDB, Python provides powerful tools to connect, query, and manipulate your data. Master these connections to unlock the full potential of your data infrastructure!
Why Database Connections Matter
Direct database access from Python enables:
- 🚀 Performance: Query large datasets without loading everything into memory
- 🔄 Real-time Data: Access live, up-to-date information
- 🔐 Security: Leverage database authentication and permissions
- 📊 Scalability: Work with datasets larger than RAM
- 🤝 Integration: Connect to existing data infrastructure
- ⚡ Efficiency: Use database engines for complex operations
Database Connection Libraries
graph TD
A[Python Database Libraries] --> B[Low-Level Drivers]
A --> C[ORMs]
A --> D[Data Analysis]
B --> E[psycopg2 - PostgreSQL]
B --> F[PyMySQL - MySQL]
B --> G[sqlite3 - SQLite]
B --> H[pymongo - MongoDB]
C --> I[SQLAlchemy]
C --> J[Django ORM]
C --> K[Peewee]
D --> L[pandas]
D --> M[dask]
D --> N[polars]
SQLite - Getting Started
SQLite is perfect for learning - it's built into Python and requires no server setup:
import sqlite3
import pandas as pd
# Connect to the SQLite database (the file is created if it doesn't exist)
conn = sqlite3.connect('example.db')  # Use ':memory:' for an in-memory DB
try:
    cursor = conn.cursor()

    # Create a table
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS employees (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL,
            department TEXT,
            salary REAL,
            hire_date DATE
        )
    ''')

    # Insert data; the "?" placeholders keep the values out of the SQL text
    cursor.execute('''
        INSERT INTO employees (name, department, salary, hire_date)
        VALUES (?, ?, ?, ?)
    ''', ('John Doe', 'Engineering', 75000, '2020-01-15'))

    # Insert multiple rows
    employees = [
        ('Jane Smith', 'Marketing', 65000, '2019-03-22'),
        ('Bob Johnson', 'Sales', 70000, '2021-06-10'),
        ('Alice Brown', 'Engineering', 80000, '2018-11-05'),
    ]
    cursor.executemany('''
        INSERT INTO employees (name, department, salary, hire_date)
        VALUES (?, ?, ?, ?)
    ''', employees)

    # Commit changes
    conn.commit()

    # Query data
    cursor.execute('SELECT * FROM employees WHERE salary > ?', (70000,))
    results = cursor.fetchall()
    for row in results:
        print(row)

    # Using pandas to read SQL
    df = pd.read_sql_query('SELECT * FROM employees', conn)
    print(df)
finally:
    # Close the connection even if one of the statements above failed
    conn.close()
PostgreSQL with psycopg2
Installation and Basic Connection
# Install: pip install psycopg2-binary
import psycopg2
from psycopg2 import sql
import pandas as pd
# Connection parameters
conn_params = {
    'host': 'localhost',
    'database': 'mydb',
    'user': 'username',
    'password': 'password',
    'port': 5432,
}

# Establish connection
try:
    conn = psycopg2.connect(**conn_params)
except psycopg2.Error as e:
    # Without a connection there is nothing else to run, so stop here
    print(f"Error connecting to PostgreSQL: {e}")
else:
    print("Connected to PostgreSQL!")
    try:
        # The cursor context manager closes the cursor when the block exits
        with conn.cursor() as cursor:
            # Get database version
            cursor.execute("SELECT version();")
            version = cursor.fetchone()
            print(f"PostgreSQL version: {version[0]}")

            # Create table with PostgreSQL-specific features
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id SERIAL PRIMARY KEY,
                    name VARCHAR(100) NOT NULL,
                    price DECIMAL(10, 2),
                    category VARCHAR(50),
                    in_stock BOOLEAN DEFAULT true,
                    tags TEXT[],
                    metadata JSONB,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Insert data with RETURNING clause
            cursor.execute('''
                INSERT INTO products (name, price, category, tags, metadata)
                VALUES (%s, %s, %s, %s, %s)
                RETURNING id
            ''', (
                'Laptop',
                999.99,
                'Electronics',
                ['portable', 'computer', 'business'],
                '{"brand": "TechCorp", "warranty": "2 years"}',
            ))
            new_id = cursor.fetchone()[0]
            print(f"Inserted product with ID: {new_id}")

        # Commit once every statement has succeeded
        conn.commit()
    except psycopg2.Error:
        # Undo the partial transaction, then let the error propagate
        conn.rollback()
        raise
    finally:
        conn.close()
Context Manager for Safe Connections
import psycopg2
from contextlib import contextmanager
@contextmanager
def get_db_connection(**kwargs):
    """Yield a psycopg2 connection and always close it on exit.

    Keyword arguments are passed straight to ``psycopg2.connect``. The
    caller is responsible for committing. If the ``with`` body raises, the
    open transaction is rolled back and the exception is re-raised
    unchanged.
    """
    # Connect outside the try block: if this fails there is nothing to clean up
    conn = psycopg2.connect(**kwargs)
    try:
        yield conn
    except Exception:
        # Roll back on any failure, not only psycopg2 errors, so a bug in the
        # caller's code cannot leave a half-finished transaction behind
        if not conn.closed:
            conn.rollback()
        raise
    finally:
        conn.close()
# Usage
with get_db_connection(**conn_params) as conn:
    # The cursor context manager closes the cursor once the rows are fetched
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM products WHERE price < %s", (500,))
        results = cursor.fetchall()
    conn.commit()
MySQL with PyMySQL
# Install: pip install pymysql
import pymysql
import pandas as pd
from datetime import datetime
# Connect to MySQL
connection = pymysql.connect(
    host='localhost',
    user='root',
    password='password',
    database='testdb',
    charset='utf8mb4',
    cursorclass=pymysql.cursors.DictCursor,  # Return results as dictionaries
)

try:
    with connection.cursor() as cursor:
        # Create table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS orders (
                order_id INT AUTO_INCREMENT PRIMARY KEY,
                customer_name VARCHAR(100),
                product VARCHAR(100),
                quantity INT,
                order_date DATETIME,
                status ENUM('pending', 'processing', 'shipped', 'delivered')
            )
        ''')

        # Insert with a parameterized query (values are passed separately
        # from the SQL text)
        insert_sql = '''
            INSERT INTO orders (customer_name, product, quantity, order_date, status)
            VALUES (%s, %s, %s, %s, %s)
        '''
        orders = [
            ('Alice Johnson', 'Widget A', 5, datetime.now(), 'pending'),
            ('Bob Smith', 'Gadget B', 2, datetime.now(), 'processing'),
            ('Charlie Brown', 'Tool C', 1, datetime.now(), 'shipped'),
        ]
        cursor.executemany(insert_sql, orders)

    # Commit the inserts before running the read-only query below, so a
    # failing SELECT (e.g. a missing customers table) cannot discard them
    connection.commit()

    with connection.cursor() as cursor:
        # Query with JOIN (assuming we have a customers table)
        cursor.execute('''
            SELECT o.*, c.email
            FROM orders o
            LEFT JOIN customers c ON o.customer_name = c.name
            WHERE o.status != 'delivered'
        ''')

        # Fetch results as dictionaries
        results = cursor.fetchall()
        for row in results:
            print(f"Order {row['order_id']}: {row['product']} for {row['customer_name']}")
except pymysql.MySQLError:
    # Undo any uncommitted work, then let the error propagate
    connection.rollback()
    raise
finally:
    connection.close()
SQLAlchemy - The Power of ORMs
Core SQLAlchemy
# Install: pip install sqlalchemy
from sqlalchemy import create_engine, text, MetaData, Table, Column, Integer, String, Float, DateTime
from sqlalchemy.sql import select, insert, update, delete
import pandas as pd
from datetime import datetime
# Create engine (connection pool)
# SQLite
engine = create_engine('sqlite:///example.db', echo=True)
# PostgreSQL
# engine = create_engine('postgresql://user:password@localhost/dbname')
# MySQL
# engine = create_engine('mysql+pymysql://user:password@localhost/dbname')
# SQL Server
# engine = create_engine('mssql+pyodbc://user:password@server/database?driver=ODBC+Driver+17+for+SQL+Server')

# Execute raw SQL
with engine.connect() as conn:
    result = conn.execute(
        text("SELECT * FROM employees WHERE salary > :sal"),
        {"sal": 70000},
    )
    for row in result:
        print(row)

# Using pandas with SQLAlchemy
df = pd.read_sql_query("SELECT * FROM employees", engine)
print(df)

# Write DataFrame to database
new_df = pd.DataFrame({
    'name': ['New Employee'],
    'department': ['IT'],
    'salary': [85000],
    'hire_date': [datetime.now()],
})
new_df.to_sql('employees', engine, if_exists='append', index=False)

# Table reflection - work with existing tables
metadata = MetaData()
metadata.reflect(bind=engine)
employees_table = metadata.tables['employees']

# Query using SQLAlchemy core
with engine.connect() as conn:
    # Select
    query = select(employees_table).where(
        employees_table.c.department == 'Engineering'
    )
    result = conn.execute(query)
    # Read the rows while the connection is still open
    for row in result:
        print(row)

    # Insert
    insert_stmt = insert(employees_table).values(
        name='Sarah Wilson',
        department='HR',
        salary=60000,
        hire_date=datetime.now(),
    )
    conn.execute(insert_stmt)
    # The insert is only persisted once the transaction is committed
    conn.commit()
SQLAlchemy ORM
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime
# Create base class
Base = declarative_base()
# Define models
class Department(Base):
    """ORM model mapped to the ``departments`` table."""

    __tablename__ = 'departments'

    id = Column(Integer, primary_key=True)
    name = Column(String(50), unique=True, nullable=False)
    budget = Column(Float)

    # Relationship: the Employee objects that point at this department
    employees = relationship("Employee", back_populates="department")

    def __repr__(self):
        """Return a short debug representation showing the department name."""
        return f"<Department(name='{self.name}')>"
class Employee(Base):
    """ORM model mapped to the ``employees_orm`` table."""

    __tablename__ = 'employees_orm'

    id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(100), unique=True)
    salary = Column(Float)
    # Pass the callable (not its result) so the timestamp is taken per insert
    hire_date = Column(DateTime, default=datetime.now)
    department_id = Column(Integer, ForeignKey('departments.id'))

    # Relationship: the Department this employee belongs to
    department = relationship("Department", back_populates="employees")

    def __repr__(self):
        """Return a short debug representation showing name and email."""
        return f"<Employee(name='{self.name}', email='{self.email}')>"
from sqlalchemy import func

# Create engine and tables
engine = create_engine('sqlite:///orm_example.db', echo=True)
Base.metadata.create_all(engine)

# Create session factory
Session = sessionmaker(bind=engine)

# The context manager closes the session even if a statement below fails
with Session() as session:
    # Create and add objects
    eng_dept = Department(name='Engineering', budget=1000000)
    mkt_dept = Department(name='Marketing', budget=500000)
    session.add_all([eng_dept, mkt_dept])
    session.commit()

    # Add employees
    john = Employee(
        name='John Doe',
        email='john@example.com',
        salary=75000,
        department=eng_dept,
    )
    jane = Employee(
        name='Jane Smith',
        email='jane@example.com',
        salary=65000,
        department=mkt_dept,
    )
    session.add_all([john, jane])
    session.commit()

    # Query using ORM
    # Get all employees
    all_employees = session.query(Employee).all()
    for emp in all_employees:
        print(f"{emp.name} works in {emp.department.name}")

    # Filter
    high_earners = session.query(Employee).filter(Employee.salary > 70000).all()

    # Join
    results = (
        session.query(Employee, Department)
        .join(Department)
        .filter(Department.budget > 600000)
        .all()
    )

    # Aggregation
    dept_stats = (
        session.query(
            Department.name,
            func.count(Employee.id).label('employee_count'),
            func.avg(Employee.salary).label('avg_salary'),
        )
        .join(Employee)
        .group_by(Department.name)
        .all()
    )
    for stat in dept_stats:
        print(f"{stat.name}: {stat.employee_count} employees, avg salary: ${stat.avg_salary:,.2f}")

    # Update
    john.salary = 80000
    session.commit()

    # Delete
    session.delete(jane)
    session.commit()
Connection Pooling
Efficient connection management for production applications:
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool, QueuePool
# Connection pool configuration
_queue_pool_options = {
    'poolclass': QueuePool,
    'pool_size': 10,        # Number of connections to maintain
    'max_overflow': 20,     # Maximum overflow connections
    'pool_pre_ping': True,  # Test connections before using
    'pool_recycle': 3600,   # Recycle connections after 1 hour
    'echo_pool': True,      # Log pool checkouts/checkins
}
engine = create_engine('postgresql://user:pass@localhost/db', **_queue_pool_options)

# For applications that need fresh connections
engine = create_engine(
    'postgresql://user:pass@localhost/db',
    poolclass=NullPool,  # No connection pooling
)

# Custom pool configuration
from sqlalchemy.pool import StaticPool

# For SQLite with threading
_sqlite_thread_options = {
    'connect_args': {'check_same_thread': False},
    'poolclass': StaticPool,
}
engine = create_engine('sqlite:///example.db', **_sqlite_thread_options)
MongoDB with PyMongo
Working with NoSQL databases:
# Install: pip install pymongo
from pymongo import MongoClient
import pandas as pd
from datetime import datetime
# Connect to MongoDB
client = MongoClient('mongodb://localhost:27017/')
# Or with authentication
# client = MongoClient('mongodb://username:password@localhost:27017/')

# Select database and collection
db = client['mydatabase']
collection = db['users']

# Insert documents
user = {
    'name': 'Alice Johnson',
    'email': 'alice@example.com',
    'age': 30,
    'interests': ['python', 'data science', 'machine learning'],
    'address': {
        'street': '123 Main St',
        'city': 'New York',
        'country': 'USA',
    },
    'created_at': datetime.now(),
}

# Insert one
result = collection.insert_one(user)
print(f"Inserted ID: {result.inserted_id}")

# Insert many
users = [
    {'name': 'Bob Smith', 'age': 25, 'email': 'bob@example.com'},
    {'name': 'Charlie Brown', 'age': 35, 'email': 'charlie@example.com'},
]
collection.insert_many(users)

# Query documents
# Find one
user = collection.find_one({'name': 'Alice Johnson'})
print(user)

# Find many with filter
young_users = collection.find({'age': {'$lt': 30}})
for young_user in young_users:
    print(young_user['name'])

# Complex queries
results = collection.find({
    '$and': [
        {'age': {'$gte': 25}},
        {'interests': {'$in': ['python', 'javascript']}},
    ]
})

# Aggregation pipeline
pipeline = [
    {'$match': {'age': {'$gte': 25}}},
    # "city" is stored inside the embedded "address" document, so it is
    # referenced with dot notation; a bare "$city" matches no field here
    {'$group': {'_id': '$address.city', 'count': {'$sum': 1}}},
    {'$sort': {'count': -1}},
]
agg_results = collection.aggregate(pipeline)
for city_count in agg_results:
    print(city_count)

# Update documents
collection.update_one(
    {'name': 'Alice Johnson'},
    {'$set': {'age': 31, 'updated_at': datetime.now()}},
)

# Update many
collection.update_many(
    {'age': {'$lt': 30}},
    {'$inc': {'age': 1}},  # Increment age by 1
)

# Delete documents
collection.delete_one({'name': 'Bob Smith'})
collection.delete_many({'age': {'$gt': 40}})

# Convert to DataFrame
cursor = collection.find()
df = pd.DataFrame(list(cursor))
print(df)

# Create indexes for better performance
collection.create_index('email', unique=True)
collection.create_index([('age', 1), ('name', -1)])  # Compound index

# Close connection
client.close()
Async Database Operations
Async PostgreSQL with asyncpg
# Install: pip install asyncpg  (asyncio ships with Python's standard library and must not be pip-installed)
import asyncio
import asyncpg
import pandas as pd
async def run_queries():
    """Run a sample read and a transactional update through an asyncpg pool.

    Fetches and prints employees earning more than 70000, then raises
    Engineering salaries by 10% and writes an audit-log row inside a single
    transaction. The pool is closed on exit, including when a query fails.
    """
    from datetime import datetime

    # "async with" closes the pool even if one of the queries below raises
    async with asyncpg.create_pool(
        host='localhost',
        database='testdb',
        user='user',
        password='password',
        min_size=10,
        max_size=20,
    ) as pool:
        async with pool.acquire() as connection:
            # Execute query
            rows = await connection.fetch('''
                SELECT * FROM employees WHERE salary > $1
            ''', 70000)
            for row in rows:
                print(dict(row))

            # Transaction: both statements are committed together, or both
            # are rolled back if either one raises
            async with connection.transaction():
                await connection.execute('''
                    UPDATE employees SET salary = salary * 1.1
                    WHERE department = $1
                ''', 'Engineering')
                await connection.execute('''
                    INSERT INTO audit_log (action, timestamp)
                    VALUES ($1, $2)
                ''', 'Salary increase', datetime.now())


# Run async function
asyncio.run(run_queries())
# Async with SQLAlchemy
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
async def async_sqlalchemy():
    """Query employees earning more than 70000 through an async engine.

    Returns:
        The list of matching ``Employee`` objects.

    The engine is disposed on exit, including when the query fails.
    ``Employee`` is the ORM model defined earlier in this document.
    """
    from sqlalchemy import select

    # Create async engine
    engine = create_async_engine(
        'postgresql+asyncpg://user:password@localhost/db',
        echo=True,
    )
    # Create async session factory
    async_session = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
    try:
        async with async_session() as session:
            result = await session.execute(
                select(Employee).where(Employee.salary > 70000)
            )
            employees = result.scalars().all()
    finally:
        # Release the engine's pooled connections even if the query raised
        await engine.dispose()
    return employees


asyncio.run(async_sqlalchemy())
Best Practices
Security
import os
from dotenv import load_dotenv
import psycopg2
# Load environment variables
load_dotenv()

# Never hardcode credentials: every connection setting is looked up in the
# environment, keyed by the connection option it feeds
DB_CONFIG = {
    option: os.environ.get(env_name)
    for option, env_name in (
        ('host', 'DB_HOST'),
        ('database', 'DB_NAME'),
        ('user', 'DB_USER'),
        ('password', 'DB_PASSWORD'),
    )
}
# Use parameterized queries to prevent SQL injection
def safe_query(conn, user_input):
    """Return all ``users`` rows whose name equals ``user_input``.

    Args:
        conn: An open DB-API connection whose driver uses ``%s`` placeholders.
        user_input: Untrusted value to match against the ``name`` column.

    Returns:
        The rows returned by ``cursor.fetchall()``.
    """
    # NEVER do this - vulnerable to SQL injection
    # bad_query = f"SELECT * FROM users WHERE name = '{user_input}'"

    # Do this instead - parameterized query: the value is passed separately
    # from the SQL text, so it is never interpreted as SQL
    query = "SELECT * FROM users WHERE name = %s"
    cursor = conn.cursor()
    try:
        cursor.execute(query, (user_input,))
        return cursor.fetchall()
    finally:
        # Release the cursor whether or not the query succeeded
        cursor.close()
# Use connection encryption
ssl_conn = psycopg2.connect(
    **DB_CONFIG,
    # 'verify-full' encrypts the connection and also checks that the server
    # certificate chains to sslrootcert and matches the host name;
    # 'require' alone does not check the host name
    sslmode='verify-full',
    sslcert='client-cert.pem',
    sslkey='client-key.pem',
    sslrootcert='ca-cert.pem',
)
Error Handling
import logging
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError, IntegrityError, OperationalError
from retry import retry
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DatabaseManager:
    """Wrap a SQLAlchemy engine with a retrying execute and a health check."""

    def __init__(self, connection_string):
        """Create the SQLAlchemy engine for ``connection_string``."""
        self.engine = create_engine(connection_string)

    @staticmethod
    def _as_statement(query):
        """Wrap a plain SQL string in ``text()``; return other statements as-is."""
        if isinstance(query, str):
            from sqlalchemy import text
            return text(query)
        return query

    # Retry only OperationalError. Without the exception argument the
    # decorator would retry every exception, including IntegrityError.
    @retry(OperationalError, tries=3, delay=2, backoff=2)
    def execute_with_retry(self, query, params=None):
        """Execute ``query`` and commit, retrying on operational errors.

        Args:
            query: A SQL string or a SQLAlchemy statement.
            params: Optional mapping of bind parameters.

        Returns:
            The result object returned by ``Connection.execute``.
            NOTE(review): it is returned after the connection is released,
            so confirm that rows can still be read from it for SELECTs.

        Raises:
            OperationalError: After the retries are exhausted.
            IntegrityError, SQLAlchemyError: Logged and re-raised, not retried.
        """
        try:
            with self.engine.connect() as conn:
                result = conn.execute(self._as_statement(query), params or {})
                conn.commit()
                return result
        except OperationalError as e:
            logger.error(f"Database operational error: {e}")
            raise
        except IntegrityError as e:
            logger.error(f"Data integrity error: {e}")
            # Don't retry integrity errors
            raise
        except SQLAlchemyError as e:
            logger.error(f"Database error: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise

    def health_check(self):
        """Return True if ``SELECT 1`` succeeds, otherwise log and return False."""
        try:
            with self.engine.connect() as conn:
                # Plain strings must be wrapped in text() before execution
                conn.execute(self._as_statement("SELECT 1"))
            logger.info("Database connection successful")
            return True
        except Exception as e:
            logger.error(f"Database health check failed: {e}")
            return False
Performance Optimization
# Batch operations
def batch_insert(engine, df, table_name, chunksize=1000):
    """Append ``df`` to ``table_name`` in batches inside one transaction.

    Args:
        engine: Object whose ``begin()`` yields a connection usable by
            ``DataFrame.to_sql``.
        df: DataFrame to insert.
        table_name: Target table; rows are appended to it.
        chunksize: Number of rows written per batch.
    """
    # engine.begin() wraps the whole load in a single transaction
    with engine.begin() as conn:
        df.to_sql(
            table_name,
            con=conn,
            if_exists='append',
            index=False,
            chunksize=chunksize,
            method='multi',  # Use multi-row insert
        )
# Use COPY for PostgreSQL (fastest method)
def bulk_copy_postgres(conn, df, table_name):
    """Bulk-load ``df`` into ``table_name`` with PostgreSQL COPY, then commit.

    Args:
        conn: Open psycopg2 connection.
        df: DataFrame whose column names match the target table's columns.
        table_name: Target table name.

    NOTE(review): values containing tabs, newlines or backslashes are not
    escaped for COPY's text format; confirm the data is free of them.
    """
    from io import StringIO

    buffer = StringIO()
    # Write missing values as \N, the marker COPY reads as NULL. Without
    # na_rep they are written as empty strings.
    df.to_csv(buffer, sep='\t', header=False, index=False, na_rep='\\N')
    buffer.seek(0)

    cursor = conn.cursor()
    try:
        cursor.copy_from(
            buffer,
            table_name,
            sep='\t',
            null='\\N',
            columns=df.columns.tolist(),
        )
        conn.commit()
    finally:
        # Release the cursor whether or not the copy succeeded
        cursor.close()
# Query optimization
def optimized_query(engine):
    """Demonstrate query optimization techniques.

    Only the EXPLAIN ANALYZE statement is executed and its plan rows printed;
    the LIMIT query and the CREATE INDEX statement are shown as examples.

    Args:
        engine: SQLAlchemy engine to run the EXPLAIN statement on.
    """
    from sqlalchemy import text

    # Use LIMIT for testing (example only, not executed here)
    query = """
        SELECT * FROM large_table
        LIMIT 1000
    """

    # Use indexes (example only, not executed here)
    create_index = """
        CREATE INDEX IF NOT EXISTS idx_salary
        ON employees(salary)
    """

    # Use EXPLAIN to analyze query plan
    explain_query = """
        EXPLAIN ANALYZE
        SELECT e.name, d.name
        FROM employees e
        JOIN departments d ON e.dept_id = d.id
        WHERE e.salary > 70000
    """

    with engine.connect() as conn:
        # Plain strings must be wrapped in text() before execution
        result = conn.execute(text(explain_query))
        for row in result:
            print(row)
Monitoring and Logging
import time
from contextlib import contextmanager
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@contextmanager
def log_query_performance(query_name):
    """Log how long the body of the ``with`` block took to run.

    Args:
        query_name: Label included in the log message.

    The duration is logged at INFO level even when the body raises; the
    exception then propagates unchanged.
    """
    # perf_counter() is a monotonic clock, so the measured duration is not
    # affected by system clock adjustments the way time.time() is
    start_time = time.perf_counter()
    try:
        yield
    finally:
        execution_time = time.perf_counter() - start_time
        logger.info(f"Query '{query_name}' took {execution_time:.3f} seconds")
# Usage: the time taken by the read is logged when the block exits
with log_query_performance("fetch_employees"):
    df = pd.read_sql("SELECT * FROM employees", engine)
# Connection pool monitoring
from sqlalchemy import event
@event.listens_for(engine, "connect")
def receive_connect(dbapi_conn, connection_record):
    """Log at INFO level when the "connect" pool event fires."""
    logger.info("Database connection established")


@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_conn, connection_record, connection_proxy):
    """Log when the "checkout" pool event fires.

    Logged at DEBUG level, so it only appears when the logger's level is
    DEBUG or lower.
    """
    logger.debug("Connection checked out from pool")


@event.listens_for(engine, "checkin")
def receive_checkin(dbapi_conn, connection_record):
    """Log when the "checkin" pool event fires.

    Logged at DEBUG level, so it only appears when the logger's level is
    DEBUG or lower.
    """
    logger.debug("Connection returned to pool")
Practice Exercises
Exercise 1: Multi-Database ETL Pipeline
Create an ETL pipeline that:
- Extracts data from SQLite
- Transforms it using pandas
- Loads into PostgreSQL
- Logs all operations
- Handles errors gracefully
Exercise 2: Database Migration Tool
Build a tool that:
- Connects to source database
- Reads schema information
- Creates matching tables in target database
- Migrates data with progress tracking
- Validates data integrity
Exercise 3: Real-time Data Sync
Implement a system that:
- Monitors changes in source database
- Syncs to multiple target databases
- Handles connection failures
- Provides sync status dashboard
Key Takeaways
- 🔌 Multiple connection options from low-level drivers to ORMs
- 🛡️ Always use parameterized queries for security
- ⚡ Connection pooling is essential for production
- 🔄 Async operations for high-performance applications
- 📊 Pandas integration makes data analysis seamless
- 🎯 Choose the right tool: raw SQL, ORM, or NoSQL
- 🔍 Monitor and log database operations