NoSQL Basics (MongoDB)
Beyond Relational: Document Databases for Modern Data! 🚀
NoSQL databases offer flexibility and scalability that traditional SQL databases sometimes can't match. MongoDB, the leading document database, stores data in flexible JSON-like documents, making it perfect for unstructured data, real-time analytics, and modern applications. Learn when and how to leverage NoSQL in your data science projects!
SQL vs NoSQL: Understanding the Differences
%% Taxonomy of database families, one representative engine per leaf.
graph TD
%% Top-level split: relational vs. non-relational
A[Database Types] --> B[SQL/Relational]
A --> C[NoSQL]
%% Relational engines
B --> D[PostgreSQL]
B --> E[MySQL]
B --> F[SQL Server]
%% NoSQL families, each with an example product
C --> G[Document: MongoDB]
C --> H[Key-Value: Redis]
C --> I[Column: Cassandra]
C --> J[Graph: Neo4j]
%% Data shape stored by each NoSQL family
G --> K[JSON Documents]
H --> L[Simple Pairs]
I --> M[Wide Columns]
J --> N[Nodes & Edges]
When to Use NoSQL
# Comparison: SQL vs NoSQL use cases.
# Each mapping pairs a database strength (key) with a workload where that
# strength matters (value).
sql_use_cases = dict([
    ("ACID Compliance", "Financial transactions"),
    ("Complex Joins", "Multi-table relationships"),
    ("Structured Data", "Fixed schema requirements"),
    ("Strong Consistency", "Inventory management"),
    ("SQL Expertise", "Business intelligence tools"),
])

nosql_use_cases = dict([
    ("Flexible Schema", "Varying data structures"),
    ("Horizontal Scaling", "Distributed systems"),
    ("Big Data", "High volume/velocity"),
    ("Real-time", "IoT sensor data"),
    ("Document Storage", "Content management"),
    ("Nested Data", "JSON/XML documents"),
    ("Geographic Distribution", "Global applications"),
])
# Decision matrix
def choose_database(requirements):
    """Simple decision helper for SQL vs NoSQL.

    Every truthy flag found in ``requirements`` adds two points to one side:

    * SQL:   ``needs_transactions``, ``fixed_schema``, ``complex_queries``
    * NoSQL: ``flexible_schema``, ``horizontal_scaling``, ``nested_documents``

    Args:
        requirements: Mapping of flag name to a truthy/falsy value; it is
            only read through ``.get()``, so missing flags count as unset.

    Returns:
        ``"SQL"`` when the SQL score is strictly greater, otherwise
        ``"NoSQL"``. A tie, including an empty mapping, yields ``"NoSQL"``.
    """
    sql_flags = ('needs_transactions', 'fixed_schema', 'complex_queries')
    nosql_flags = ('flexible_schema', 'horizontal_scaling', 'nested_documents')

    sql_score = sum(2 for flag in sql_flags if requirements.get(flag))
    nosql_score = sum(2 for flag in nosql_flags if requirements.get(flag))

    return "SQL" if sql_score > nosql_score else "NoSQL"
MongoDB Installation and Setup
# Install MongoDB and PyMongo
"""
# Install MongoDB (varies by OS)
# macOS: brew tap mongodb/brew && brew install mongodb-community
# Ubuntu: sudo apt-get install -y mongodb-org (after adding MongoDB's official apt repository)
# Windows: Download installer from mongodb.com
# Install Python driver
pip install pymongo
pip install pandas
pip install python-dotenv # for environment variables
"""
# Basic connection
import json
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from bson import ObjectId
from pymongo import MongoClient
# Connect to MongoDB.
# Default: an unauthenticated server on this machine.
client = MongoClient('mongodb://localhost:27017/')
# With authentication:
# client = MongoClient('mongodb://username:password@localhost:27017/')
# MongoDB Atlas (cloud):
# client = MongoClient('mongodb+srv://username:password@cluster.mongodb.net/')

# Select/create database
db = client['data_science_db']
# Select/create collection (the counterpart of a SQL table)
collection = db['users']

# Check connection: both calls need a round trip to the server
print("Databases:", client.list_database_names())
print("Collections in db:", db.list_collection_names())

# Server info
server_info = client.server_info()
print(f"MongoDB version: {server_info['version']}")
CRUD Operations in MongoDB
Create (Insert) Operations
# Insert single document.
# The document mixes scalars, arrays and nested sub-documents.
user = {
    "name": "Alice Johnson",
    "email": "alice@example.com",
    "age": 30,
    "roles": ["user", "admin"],
    "address": {
        "street": "123 Main St",
        "city": "New York",
        "country": "USA",
        "coordinates": {"lat": 40.7128, "lng": -74.0060},
    },
    "interests": ["data science", "machine learning", "python"],
    "created_at": datetime.now(),
    "metadata": {"source": "web", "campaign": "summer_2024"},
}

result = collection.insert_one(user)
print(f"Inserted document ID: {result.inserted_id}")
# Insert multiple documents.
# The two documents deliberately differ in shape: only the second one
# carries a "purchase_history" array.
users = [
    {
        "name": "Bob Smith",
        "email": "bob@example.com",
        "age": 25,
        "roles": ["user"],
        "interests": ["python", "mongodb"],
        "created_at": datetime.now(),
    },
    {
        "name": "Carol White",
        "email": "carol@example.com",
        "age": 35,
        "roles": ["user", "moderator"],
        "interests": ["analytics", "visualization"],
        "purchase_history": [
            {"product": "Course A", "price": 99.99, "date": datetime.now()},
            {"product": "Course B", "price": 149.99, "date": datetime.now()},
        ],
        "created_at": datetime.now(),
    },
]

result = collection.insert_many(users)
print(f"Inserted {len(result.inserted_ids)} documents")
# Bulk insert with error handling
from pymongo.errors import BulkWriteError


def bulk_insert_with_validation(documents):
    """Bulk insert with error handling.

    Inserts ``documents`` into the module-level ``collection`` with
    ``ordered=False``.

    Args:
        documents: Sized sequence of documents (``len()`` is taken on it
            when reporting a partial failure).

    Returns:
        The inserted ids when every document was written, or ``None`` when
        a ``BulkWriteError`` was raised. In the failure case the error
        details and the number of successful writes are printed.
    """
    try:
        result = collection.insert_many(documents, ordered=False)
    except BulkWriteError as e:
        print(f"Bulk write error: {e.details}")
        # Process successful insertions: everything that is not listed
        # under 'writeErrors' is counted as written.
        successful = len(documents) - len(e.details['writeErrors'])
        print(f"Successfully inserted {successful} documents")
        return None

    print(f"Successfully inserted {len(result.inserted_ids)} documents")
    return result.inserted_ids
Read (Query) Operations
# Find one document
user = collection.find_one({"name": "Alice Johnson"})
# default=str stringifies values json cannot serialise natively
print(json.dumps(user, default=str, indent=2))

# Find multiple documents
users = collection.find({"age": {"$gte": 25}})
for user in users:
    print(f"{user['name']}: {user['age']} years old")

# Query with multiple conditions
query = {
    "$and": [
        {"age": {"$gte": 25}},
        {"age": {"$lte": 35}},
        {"interests": {"$in": ["python", "data science"]}},
    ]
}
results = collection.find(query)

# Convert to DataFrame (loads every document of the collection)
df = pd.DataFrame(list(collection.find()))
print(df.head())
# Projection (select specific fields): 1 includes a field, 0 excludes it
projection = {"name": 1, "email": 1, "age": 1, "_id": 0}
users = collection.find({}, projection)

# Complex queries
# Nested field query, using dot notation to reach into a sub-document
users_in_ny = collection.find({"address.city": "New York"})

# Array queries
python_users = collection.find({"interests": "python"})

# Regex search
pattern = {"name": {"$regex": "^A", "$options": "i"}}  # Names starting with 'A'
users = collection.find(pattern)
# Reference tables of query filters. Each value is a filter document that
# can be passed as-is to find().

# Comparison operators
comparison_queries = {
    "Equal": {"age": 30},
    "Not Equal": {"age": {"$ne": 30}},
    "Greater Than": {"age": {"$gt": 30}},
    "Greater or Equal": {"age": {"$gte": 30}},
    "Less Than": {"age": {"$lt": 30}},
    "Less or Equal": {"age": {"$lte": 30}},
    "In List": {"age": {"$in": [25, 30, 35]}},
    "Not In List": {"age": {"$nin": [25, 30, 35]}},
}

# Logical operators
logical_queries = {
    "AND": {"$and": [{"age": {"$gte": 25}}, {"age": {"$lte": 35}}]},
    "OR": {"$or": [{"age": {"$lt": 25}}, {"age": {"$gt": 35}}]},
    "NOT": {"age": {"$not": {"$eq": 30}}},
    "NOR": {"$nor": [{"age": 25}, {"age": 35}]},
}

# Element operators
element_queries = {
    "Exists": {"address": {"$exists": True}},
    "Type": {"age": {"$type": "int"}},
}

# Array operators
array_queries = {
    "All": {"interests": {"$all": ["python", "mongodb"]}},
    "Element Match": {
        "purchase_history": {"$elemMatch": {"price": {"$gt": 100}}}
    },
    "Size": {"interests": {"$size": 3}},
}
Update Operations
# Update one document: only the first match is touched
result = collection.update_one(
    {"name": "Alice Johnson"},
    {"$set": {"age": 31, "updated_at": datetime.now()}},
)
print(f"Modified {result.modified_count} document(s)")

# Update many documents: every match is touched
result = collection.update_many(
    {"age": {"$lt": 30}},
    {"$inc": {"age": 1}},  # Increment age by 1
)
print(f"Modified {result.modified_count} document(s)")
# Update operators: a cheat sheet mapping each operator to a sample argument
update_operators = {
    # Field operators
    "$set": {"field": "value"},                # Set field value
    "$unset": {"field": ""},                   # Remove field
    "$inc": {"count": 1},                      # Increment
    "$mul": {"price": 1.1},                    # Multiply
    "$rename": {"oldName": "newName"},         # Rename field
    "$min": {"score": 10},                     # Update if less than current
    "$max": {"score": 100},                    # Update if greater than current
    "$currentDate": {"lastModified": True},    # Set to current date

    # Array operators
    "$push": {"tags": "new_tag"},              # Add to array
    "$pull": {"tags": "old_tag"},              # Remove from array
    "$addToSet": {"tags": "unique_tag"},       # Add if not exists
    "$pop": {"tags": 1},                       # Remove last element (1) or first (-1)
    "$pullAll": {"tags": ["tag1", "tag2"]},    # Remove multiple
}
# Upsert (update or insert)
# "created_at" goes under $setOnInsert so it is written only when the upsert
# creates the document; under $set it would be overwritten on every update
# of an already-existing user.
result = collection.update_one(
    {"email": "new@example.com"},
    {
        "$set": {"name": "New User"},
        "$setOnInsert": {"created_at": datetime.now()},
    },
    upsert=True
)

# Replace entire document: every field except _id is swapped for the new body
result = collection.replace_one(
    {"name": "Bob Smith"},
    {
        "name": "Robert Smith",
        "email": "robert@example.com",
        "age": 26,
        "roles": ["premium_user"],
        "updated_at": datetime.now()
    }
)
# Bulk updates: several write operations sent in a single bulk_write call
from pymongo import UpdateOne

bulk_updates = [
    UpdateOne(
        {"name": "Alice Johnson"},
        {"$set": {"status": "active"}},
    ),
    UpdateOne(
        {"name": "Bob Smith"},
        {"$set": {"status": "inactive"}},
    ),
    UpdateOne(
        {"name": "Carol White"},
        {"$inc": {"login_count": 1}},
    ),
]

result = collection.bulk_write(bulk_updates)
print(f"Bulk update: {result.modified_count} modified")
Delete Operations
# Delete one document (the first match only)
result = collection.delete_one({"name": "Test User"})
print(f"Deleted {result.deleted_count} document(s)")

# Delete many documents (every match)
result = collection.delete_many({"age": {"$lt": 18}})
print(f"Deleted {result.deleted_count} document(s)")

# Delete with conditions: inactive users created more than 365 days ago
result = collection.delete_many(
    {
        "$and": [
            {"created_at": {"$lt": datetime.now() - timedelta(days=365)}},
            {"status": "inactive"},
        ]
    }
)

# Delete all documents (careful!)
# result = collection.delete_many({})

# Drop collection entirely
# collection.drop()
Aggregation Pipeline
Basic Aggregation
# MongoDB Aggregation Pipeline - powerful data processing
# Sample e-commerce data
orders = db['orders']

# Insert sample data.
# NumPy scalars are converted to built-in int/float/str before use:
# timedelta() rejects numpy integers, and the BSON encoder cannot serialise
# numpy integer types, so the raw np.random results would fail at insert time.
sample_orders = [
    {
        "order_id": i,
        "customer_id": int(np.random.randint(1, 100)),
        "products": [
            {
                "product_id": int(np.random.randint(1, 50)),
                "name": f"Product_{np.random.randint(1, 50)}",
                "category": str(np.random.choice(['Electronics', 'Clothing', 'Books'])),
                "price": round(float(np.random.uniform(10, 500)), 2),
                "quantity": int(np.random.randint(1, 5))
            }
            # 1 to 3 line items per order
            for _ in range(int(np.random.randint(1, 4)))
        ],
        "order_date": datetime.now() - timedelta(days=int(np.random.randint(0, 365))),
        "status": str(np.random.choice(['pending', 'shipped', 'delivered'])),
        "region": str(np.random.choice(['North', 'South', 'East', 'West']))
    }
    for i in range(1000)
]
orders.insert_many(sample_orders)
# Basic aggregation pipeline: revenue per product category for delivered orders
pipeline = [
    # Stage 1: Match (filter)
    {"$match": {"status": "delivered"}},

    # Stage 2: Unwind array, producing one document per product line
    {"$unwind": "$products"},

    # Stage 3: Group and aggregate
    {
        "$group": {
            "_id": "$products.category",
            "total_revenue": {
                "$sum": {"$multiply": ["$products.price", "$products.quantity"]}
            },
            "order_count": {"$sum": 1},
            "avg_price": {"$avg": "$products.price"},
        }
    },

    # Stage 4: Sort, highest revenue first
    {"$sort": {"total_revenue": -1}},

    # Stage 5: Limit
    {"$limit": 10},
]

results = list(orders.aggregate(pipeline))
df = pd.DataFrame(results)
print(df)
Advanced Aggregation
# Complex aggregation example: Sales analytics
advanced_pipeline = [
    # Stage 1: Match recent orders (last 90 days)
    {
        "$match": {
            "order_date": {"$gte": datetime.now() - timedelta(days=90)}
        }
    },

    # Stage 2: Add computed fields
    {
        "$addFields": {
            # Fold the products array into sum(price * quantity)
            "order_total": {
                "$reduce": {
                    "input": "$products",
                    "initialValue": 0,
                    "in": {
                        "$add": [
                            "$$value",
                            {"$multiply": ["$$this.price", "$$this.quantity"]},
                        ]
                    },
                }
            },
            "item_count": {"$size": "$products"},
            "order_month": {
                "$dateToString": {"format": "%Y-%m", "date": "$order_date"}
            },
        }
    },

    # Stage 3: Lookup (join) customer data
    {
        "$lookup": {
            "from": "customers",
            "localField": "customer_id",
            "foreignField": "customer_id",
            "as": "customer_info",
        }
    },

    # Stage 4: Unwind customer info, keeping orders that matched no customer
    {
        "$unwind": {
            "path": "$customer_info",
            "preserveNullAndEmptyArrays": True,
        }
    },

    # Stage 5: Group by month and region
    {
        "$group": {
            "_id": {"month": "$order_month", "region": "$region"},
            "total_revenue": {"$sum": "$order_total"},
            "order_count": {"$sum": 1},
            "avg_order_value": {"$avg": "$order_total"},
            "unique_customers": {"$addToSet": "$customer_id"},
            "max_order": {"$max": "$order_total"},
            "min_order": {"$min": "$order_total"},
        }
    },

    # Stage 6: Add customer count
    {"$addFields": {"customer_count": {"$size": "$unique_customers"}}},

    # Stage 7: Remove customer array (no longer needed)
    {"$project": {"unique_customers": 0}},

    # Stage 8: Sort by revenue
    {"$sort": {"total_revenue": -1}},

    # Stage 9: Facet for multiple outputs computed from the same input
    {
        "$facet": {
            "by_region": [
                {
                    "$group": {
                        "_id": "$_id.region",
                        "total": {"$sum": "$total_revenue"},
                    }
                },
                {"$sort": {"total": -1}},
            ],
            "by_month": [
                {
                    "$group": {
                        "_id": "$_id.month",
                        "total": {"$sum": "$total_revenue"},
                    }
                },
                {"$sort": {"_id": 1}},
            ],
            "top_combinations": [
                {"$limit": 5},
            ],
        }
    },
]

results = list(orders.aggregate(advanced_pipeline))
print(json.dumps(results[0], default=str, indent=2))
Aggregation Operators
# Common aggregation operators.
# Each table maps an operator name to a sample argument for it.

# Arithmetic operators
arithmetic_ops = {
    "$add": ["$price", "$tax"],
    "$subtract": ["$total", "$discount"],
    "$multiply": ["$quantity", "$unit_price"],
    "$divide": ["$total", "$quantity"],
    "$mod": ["$value", 10],
    "$pow": ["$base", 2],
    "$sqrt": "$value",
    "$abs": "$difference",
}

# String operators
string_ops = {
    "$concat": ["$first_name", " ", "$last_name"],
    "$substr": ["$description", 0, 100],
    "$toLower": "$email",
    "$toUpper": "$code",
    "$split": ["$tags", ","],
    "$trim": {"input": "$text", "chars": " "},
}

# Date operators
date_ops = {
    "$dayOfMonth": "$date",
    "$month": "$date",
    "$year": "$date",
    "$dayOfWeek": "$date",
    "$dateToString": {"format": "%Y-%m-%d", "date": "$date"},
    "$dateDiff": {
        "startDate": "$start_date",
        "endDate": "$end_date",
        "unit": "day",
    },
}

# Array operators
array_ops = {
    "$size": "$items",
    "$slice": ["$array", 5],
    "$arrayElemAt": ["$array", 0],
    "$filter": {
        "input": "$items",
        "as": "item",
        "cond": {"$gte": ["$$item.price", 100]},
    },
    "$map": {
        "input": "$items",
        "as": "item",
        "in": {"$multiply": ["$$item.price", 1.1]},
    },
}

# Conditional operators
conditional_ops = {
    "$cond": {
        "if": {"$gte": ["$score", 90]},
        "then": "A",
        "else": "B",
    },
    "$switch": {
        "branches": [
            {"case": {"$gte": ["$score", 90]}, "then": "A"},
            {"case": {"$gte": ["$score", 80]}, "then": "B"},
            {"case": {"$gte": ["$score", 70]}, "then": "C"},
        ],
        "default": "F",
    },
    "$ifNull": ["$optional_field", "default_value"],
}
Indexing in MongoDB
# Create indexes for better performance
# Single field index
collection.create_index("email", unique=True)
collection.create_index([("age", 1)])  # 1 for ascending, -1 for descending

# Compound index
collection.create_index([
    ("category", 1),
    ("price", -1)
])

# Text index for search
collection.create_index([("description", "text"), ("title", "text")])

# 2dsphere geospatial index
collection.create_index([("location", "2dsphere")])

# Partial index (conditional)
# An explicit name is required here: the default name for this key is
# "age_1", which the plain age index created above already uses with
# different options, so the server would reject the request.
# NOTE(review): a second index on the same key pattern that differs only in
# its partial filter is understood to need MongoDB 5.0+ - confirm for the
# target server, or drop "age_1" first on older versions.
collection.create_index(
    "age",
    name="age_adults_partial",
    partialFilterExpression={"age": {"$gte": 18}}
)

# TTL index (auto-delete documents)
collection.create_index(
    "expireAt",
    expireAfterSeconds=3600  # Delete 1 hour after the date stored in expireAt
)

# List all indexes
indexes = collection.list_indexes()
for index in indexes:
    print(index)

# Index size statistics
stats = db.command("collStats", "users", indexDetails=True)
print(json.dumps(stats['indexSizes'], indent=2))

# Drop index (by name)
collection.drop_index("age_1")

# Explain query execution
explanation = collection.find({"age": {"$gte": 25}}).explain()
print(json.dumps(explanation['executionStats'], default=str, indent=2))
Data Modeling in MongoDB
Embedding vs Referencing
# Embedding Pattern (Denormalized)
# Author and comments live inside the post document itself.
# ObjectId is provided by the bson package that ships with PyMongo.
embedded_blog_post = {
    "_id": ObjectId(),
    "title": "Introduction to MongoDB",
    "author": {
        "name": "John Doe",
        "email": "john@example.com",
        "bio": "Database expert"
    },
    "comments": [
        {
            "user": "Alice",
            "text": "Great post!",
            "date": datetime.now()
        },
        {
            "user": "Bob",
            "text": "Very helpful",
            "date": datetime.now()
        }
    ],
    "tags": ["mongodb", "nosql", "database"],
    "created_at": datetime.now()
}
# Pros: Single query gets all data, atomic updates
# Cons: Document size limit (16MB), data duplication
# Reference Pattern (Normalized)
# Each entity is its own document; relationships are stored as ids.
# ObjectId is provided by the bson package that ships with PyMongo.

# Authors collection
author = {
    "_id": ObjectId(),
    "name": "John Doe",
    "email": "john@example.com",
    "bio": "Database expert"
}

# Posts collection
post = {
    "_id": ObjectId(),
    "title": "Introduction to MongoDB",
    "author_id": author["_id"],  # Reference to author
    "created_at": datetime.now()
}

# Comments collection
comment = {
    "_id": ObjectId(),
    "post_id": post["_id"],  # Reference to post
    "user": "Alice",
    "text": "Great post!",
    "date": datetime.now()
}
# Pros: No duplication, smaller documents
# Cons: Multiple queries needed, no transactions (before MongoDB 4.0)
# Hybrid Pattern
# Reference the full author record, but copy the few fields needed to
# render the post without a second query.
# ObjectId is provided by the bson package that ships with PyMongo.
hybrid_post = {
    "_id": ObjectId(),
    "title": "Introduction to MongoDB",
    "author_id": ObjectId(),  # Reference
    "author_name": "John Doe",  # Denormalized for display
    "comment_count": 2,  # Cached count
    "recent_comments": [  # Embed recent only
        {"user": "Alice", "text": "Great post!"}
    ],
    "tags": ["mongodb", "nosql"],
    "created_at": datetime.now()
}
Schema Design Patterns
# Common MongoDB design patterns

# 1. Polymorphic Pattern - Different types in same collection.
# The "type" field tells the two document shapes apart.
products = [
    {
        "type": "book",
        "title": "MongoDB Guide",
        "author": "John Doe",
        "isbn": "978-1234567890",
        "pages": 350,
    },
    {
        "type": "movie",
        "title": "Data Science Documentary",
        "director": "Jane Smith",
        "duration": 120,
        "format": "BluRay",
    },
]

# 2. Attribute Pattern - Flexible attributes stored as key/value entries
product_with_attrs = {
    "name": "Laptop",
    "attributes": [
        {"key": "color", "value": "silver"},
        {"key": "ram", "value": "16GB", "type": "spec"},
        {"key": "warranty", "value": "2 years", "type": "service"},
    ],
}
# 3. Bucket Pattern - Time series data.
# One document holds an hour of readings plus a precomputed summary.
sensor_bucket = {
    "sensor_id": "sensor_001",
    "start_time": datetime(2024, 1, 1, 0, 0),
    "end_time": datetime(2024, 1, 1, 1, 0),
    "measurement_count": 3600,
    "measurements": [
        {
            "timestamp": datetime(2024, 1, 1, 0, 0, 0),
            "temp": 22.5,
            "humidity": 45,
        },
        {
            "timestamp": datetime(2024, 1, 1, 0, 0, 1),
            "temp": 22.6,
            "humidity": 45,
        },
        # ... up to 3600 measurements
    ],
    "summary": {"avg_temp": 22.5, "max_temp": 23.1, "min_temp": 22.0},
}
# 4. Computed Pattern - Pre-calculated values.
# Totals are stored alongside the items so reads need no arithmetic.
order_with_computed = {
    "order_id": 1,
    "items": [
        {"product": "A", "quantity": 2, "price": 10},
        {"product": "B", "quantity": 1, "price": 20},
    ],
    "subtotal": 40,    # Pre-computed
    "tax": 4,          # Pre-computed
    "total": 44,       # Pre-computed
    "item_count": 3,   # Pre-computed
}
# 5. Subset Pattern - Frequently accessed subset
# Only the most recent posts are embedded; the full count is kept separately.
# ObjectId is provided by the bson package that ships with PyMongo.
user_with_subset = {
    "_id": ObjectId(),
    "name": "John Doe",
    "email": "john@example.com",
    "recent_posts": [  # Last 10 posts
        {"title": "Post 1", "date": datetime.now()},
        {"title": "Post 2", "date": datetime.now()}
    ],
    "total_posts": 150  # Full count
}
# 6. Tree Pattern - Hierarchical data.
# Each node records its parent, its list of ancestors, and a
# comma-delimited path string.
category_tree = {
    "_id": "electronics",
    "name": "Electronics",
    "path": ",electronics,",
    "parent": None,
    "ancestors": [],
}

subcategory = {
    "_id": "laptops",
    "name": "Laptops",
    "path": ",electronics,laptops,",
    "parent": "electronics",
    "ancestors": ["electronics"],
}
MongoDB with Pandas
# Working with MongoDB and Pandas
# Load data from MongoDB to DataFrame
def mongo_to_dataframe(collection, query=None, projection=None):
    """Convert MongoDB collection to pandas DataFrame.

    Args:
        collection: Object exposing ``find(query, projection)``.
        query: Filter document; ``None`` (the default) selects everything.
        projection: Optional projection passed through to ``find``.

    Returns:
        A DataFrame with one row per returned document. When an ``_id``
        column is present its values are converted to strings.
    """
    # A fresh dict per call avoids sharing one mutable default across calls.
    if query is None:
        query = {}

    cursor = collection.find(query, projection)
    df = pd.DataFrame(list(cursor))

    # Handle ObjectId
    if '_id' in df.columns:
        df['_id'] = df['_id'].astype(str)

    return df
# Example usage: users aged 25 or more, restricted to three fields
df = mongo_to_dataframe(
    collection,
    projection={"name": 1, "age": 1, "email": 1},
    query={"age": {"$gte": 25}},
)
print(df.head())
# Save DataFrame to MongoDB
def dataframe_to_mongo(df, collection, if_exists='append'):
    """Save pandas DataFrame to MongoDB.

    Args:
        df: DataFrame whose rows become documents.
        collection: Object exposing ``delete_many`` and ``insert_many``.
        if_exists: ``'append'`` adds the rows to the existing documents;
            ``'replace'`` first deletes every document in the collection.

    Raises:
        ValueError: If ``if_exists`` is neither ``'append'`` nor
            ``'replace'``. The check runs before any write, so a typo
            can no longer silently fall through to an append.
    """
    if if_exists not in ('append', 'replace'):
        raise ValueError(
            f"if_exists must be 'append' or 'replace', got {if_exists!r}"
        )

    records = df.to_dict('records')

    if if_exists == 'replace':
        collection.delete_many({})

    # Nothing is sent for an empty frame
    if records:
        collection.insert_many(records)
        print(f"Inserted {len(records)} records")
# Complex analysis with aggregation and pandas
def analyze_sales_data():
    """Combine MongoDB aggregation with pandas analysis.

    Aggregates delivered orders from the module-level ``orders`` collection
    into revenue per day and product category, then pivots the result.

    Returns:
        A DataFrame indexed by date with one revenue column per category
        plus a ``<category>_ma7`` column holding a 7-row rolling mean.
        An empty DataFrame is returned when no order matches.
    """
    # MongoDB aggregation.
    # Category, price and quantity live on the entries of the "products"
    # array, so the array is unwound and revenue is price * quantity.
    pipeline = [
        {"$match": {"status": "delivered"}},
        {"$unwind": "$products"},
        {"$group": {
            "_id": {
                "date": {"$dateToString": {"format": "%Y-%m-%d", "date": "$order_date"}},
                "category": "$products.category"
            },
            "revenue": {"$sum": {"$multiply": ["$products.price", "$products.quantity"]}},
            # Distinct order ids, so an order with several products of the
            # same category is counted once.
            "order_ids": {"$addToSet": "$order_id"}
        }},
        {"$project": {
            "revenue": 1,
            "orders": {"$size": "$order_ids"}
        }}
    ]
    results = list(orders.aggregate(pipeline))

    # Without rows there is no "_id" column to unpack below.
    if not results:
        return pd.DataFrame()

    # Convert to DataFrame
    df = pd.DataFrame(results)
    df['date'] = pd.to_datetime(df['_id'].apply(lambda x: x['date']))
    df['category'] = df['_id'].apply(lambda x: x['category'])
    df = df.drop('_id', axis=1)

    # Pandas analysis
    pivot = df.pivot_table(
        values='revenue',
        index='date',
        columns='category',
        aggfunc='sum',
        fill_value=0
    )

    # Calculate moving averages. The column list is snapshotted so the
    # loop does not depend on columns added inside it.
    for col in list(pivot.columns):
        pivot[f'{col}_ma7'] = pivot[col].rolling(window=7).mean()

    return pivot
# Time series analysis
def time_series_from_mongo():
    """Extract and analyze time series data.

    Loads the last 90 days of documents from the module-level ``orders``
    collection and resamples them to daily totals.

    Returns:
        A DataFrame indexed by day with columns ``sum``, ``count``,
        ``mean``, ``day_of_week``, ``month`` and ``is_weekend``. The frame
        has those columns and no rows when nothing matches.
    """
    # Get data
    df = mongo_to_dataframe(
        orders,
        query={"order_date": {"$gte": datetime.now() - timedelta(days=90)}}
    )

    if df.empty:
        return pd.DataFrame(
            columns=['sum', 'count', 'mean', 'day_of_week', 'month', 'is_weekend']
        )

    # Derive the order total from the embedded products when the documents
    # do not already carry a "total" field.
    if 'total' not in df.columns:
        df['total'] = df['products'].apply(
            lambda items: sum(item['price'] * item['quantity'] for item in items)
        )

    # Process dates
    df['order_date'] = pd.to_datetime(df['order_date'])
    df = df.set_index('order_date')

    # Resample to daily
    daily_sales = df.resample('D')['total'].agg(['sum', 'count', 'mean'])

    # Add time-based features
    daily_sales['day_of_week'] = daily_sales.index.dayofweek
    daily_sales['month'] = daily_sales.index.month
    daily_sales['is_weekend'] = daily_sales['day_of_week'].isin([5, 6])

    return daily_sales
Performance Optimization
# MongoDB performance optimization techniques
# 1. Batch operations
def batch_operations():
    """Use bulk operations for better performance.

    Builds 10000 insert operations and sends them to the module-level
    ``collection`` in one ``bulk_write`` call, then prints the number of
    inserted documents.
    """
    from pymongo import InsertOne

    # Prepare bulk operations.
    # int() turns the numpy integer into a built-in int, which the BSON
    # encoder can serialise.
    operations = [
        InsertOne({
            "user_id": i,
            "score": int(np.random.randint(0, 100))
        })
        for i in range(10000)
    ]

    # Execute in bulk
    result = collection.bulk_write(operations)
    print(f"Inserted: {result.inserted_count}")
# 2. Connection pooling
from pymongo import MongoClient

# Configure connection pool. Note that this rebinds the module-level
# "client" name to a new MongoClient instance.
client = MongoClient(
    'mongodb://localhost:27017/',
    minPoolSize=10,
    maxPoolSize=50,
    maxIdleTimeMS=10000,
    waitQueueTimeoutMS=10000,
)
# 3. Query optimization
def optimize_queries():
    """Query optimization strategies.

    Builds one cursor per technique against the module-level
    ``collection``. Each cursor is assigned to ``results`` and replaced by
    the next one; none is iterated or returned, so the function serves as
    a catalogue of call shapes.
    """
    # Use projection to limit fields
    results = collection.find(
        {"age": {"$gte": 25}},
        {"name": 1, "email": 1, "_id": 0}
    )

    # Use limit for large results
    results = collection.find().limit(100)

    # Use skip for pagination (careful with large offsets)
    page = 2
    page_size = 20
    results = collection.find().skip((page - 1) * page_size).limit(page_size)

    # Use cursor batch size
    results = collection.find().batch_size(100)

    # Use hint to force index
    results = collection.find({"age": 25}).hint("age_1")
# 4. Aggregation optimization
def optimize_aggregation():
    """Aggregation pipeline optimization.

    Creates a compound index on ``status`` and ``age`` on the module-level
    ``collection`` and runs a filter-first pipeline with ``allowDiskUse``.
    The resulting cursor is not consumed or returned.
    """
    # Place $match early in pipeline
    pipeline = [
        {"$match": {"status": "active"}},  # Filter first
        {"$project": {"name": 1, "age": 1}},  # Then project
        {"$group": {"_id": "$age", "count": {"$sum": 1}}}
    ]

    # Use indexes for $match and $sort
    collection.create_index([("status", 1), ("age", 1)])

    # Use allowDiskUse for large datasets
    results = collection.aggregate(pipeline, allowDiskUse=True)
# 5. Monitoring performance
def monitor_performance():
    """Monitor MongoDB performance.

    Prints database and collection statistics, reports operations that
    have been running for more than 5 seconds, enables profiling of
    operations slower than 100 ms and prints up to 10 profiler entries.
    All commands go through the module-level ``db``.
    """
    # Database statistics
    stats = db.command("dbStats")
    print(f"Database size: {stats['dataSize']} bytes")
    print(f"Index size: {stats['indexSize']} bytes")

    # Collection statistics
    coll_stats = db.command("collStats", "users")
    print(f"Document count: {coll_stats['count']}")
    print(f"Average document size: {coll_stats['avgObjSize']} bytes")

    # Current operations.
    # Database.current_op() was removed in PyMongo 4; the $currentOp
    # aggregation stage on the admin database is its replacement.
    current_ops = db.client.admin.aggregate([{"$currentOp": {}}])
    for op in current_ops:
        if op.get('secs_running', 0) > 5:
            print(f"Long running operation: {op}")

    # Profile slow queries.
    # Database.set_profiling_level() was removed in PyMongo 4; the
    # "profile" command is issued directly instead.
    db.command("profile", 1, slowms=100)

    # Get profiling data
    profile_data = db.system.profile.find().limit(10)
    for profile in profile_data:
        print(f"Slow query: {profile['command']} - {profile['millis']}ms")
MongoDB vs SQL: Migration Guide
# SQL to MongoDB query translation.
# Each SQL statement in a comment is followed by its PyMongo counterpart.

# SQL: SELECT * FROM users WHERE age > 25
mongo_query = collection.find({"age": {"$gt": 25}})

# SQL: SELECT name, email FROM users WHERE age > 25
mongo_query = collection.find(
    {"age": {"$gt": 25}},
    {"name": 1, "email": 1, "_id": 0},
)

# SQL: SELECT * FROM users WHERE age BETWEEN 25 AND 35
mongo_query = collection.find({"age": {"$gte": 25, "$lte": 35}})

# SQL: SELECT * FROM users WHERE name LIKE 'A%'
mongo_query = collection.find({"name": {"$regex": "^A", "$options": "i"}})

# SQL: SELECT COUNT(*) FROM users GROUP BY status
mongo_query = collection.aggregate(
    [
        {"$group": {"_id": "$status", "count": {"$sum": 1}}},
    ]
)

# SQL: SELECT * FROM users ORDER BY age DESC LIMIT 10
mongo_query = collection.find().sort("age", -1).limit(10)

# SQL: SELECT * FROM orders o JOIN customers c ON o.customer_id = c.id
mongo_query = orders.aggregate(
    [
        {
            "$lookup": {
                "from": "customers",
                "localField": "customer_id",
                "foreignField": "_id",
                "as": "customer",
            }
        },
        {"$unwind": "$customer"},
    ]
)

# SQL: UPDATE users SET status = 'active' WHERE age > 25
mongo_update = collection.update_many(
    {"age": {"$gt": 25}},
    {"$set": {"status": "active"}},
)

# SQL: DELETE FROM users WHERE status = 'inactive'
mongo_delete = collection.delete_many({"status": "inactive"})
# SQL to MongoDB concept mapping (SQL term -> MongoDB counterpart)
sql_to_mongo_concepts = dict([
    ("Table", "Collection"),
    ("Row", "Document"),
    ("Column", "Field"),
    ("Primary Key", "_id"),
    ("Index", "Index"),
    ("Join", "$lookup"),
    ("GROUP BY", "$group"),
    ("WHERE", "$match"),
    ("HAVING", "$match (after $group)"),
    ("ORDER BY", "$sort"),
    ("LIMIT", "$limit"),
    ("OFFSET", "$skip"),
])
Real-world Use Cases
# 1. IoT Sensor Data Storage
def store_sensor_data():
    """Time-series data with bucketing.

    Stores one demo reading for ``sensor_001`` in the ``sensor_data``
    collection of the module-level ``db``, grouped into hourly bucket
    documents.
    """
    sensor_collection = db['sensor_data']

    # Bucket pattern for efficient storage: one document per sensor per hour
    bucket_span = timedelta(hours=1)

    def add_reading(sensor_id, timestamp, data):
        """Append one reading to the hourly bucket that contains ``timestamp``."""
        # Truncate to the top of the hour to obtain the bucket key
        bucket_start = timestamp.replace(minute=0, second=0, microsecond=0)

        sensor_collection.update_one(
            {
                "sensor_id": sensor_id,
                "bucket_start": bucket_start
            },
            {
                "$push": {
                    "readings": {
                        "timestamp": timestamp,
                        **data
                    }
                },
                "$inc": {"count": 1},
                "$set": {
                    "bucket_end": bucket_start + bucket_span
                }
            },
            # The first reading of an hour creates the bucket document
            upsert=True
        )

    # Add sensor reading
    add_reading(
        "sensor_001",
        datetime.now(),
        {"temperature": 22.5, "humidity": 45}
    )
# 2. Content Management System
def cms_example():
    """Flexible schema for content.

    Inserts an article and a video, two documents with different field
    sets, into the ``content`` collection of the module-level ``db``.
    """
    content_collection = db['content']

    # Article with dynamic fields
    article = {
        "type": "article",
        "title": "MongoDB for Data Science",
        "author": "John Doe",
        "content": "...",
        "tags": ["mongodb", "nosql", "data science"],
        "metadata": {
            "views": 0,
            "likes": 0,
            "reading_time": "5 min"
        },
        "comments": [],
        "created_at": datetime.now(),
        "published": True
    }

    # Video content (different structure)
    video = {
        "type": "video",
        "title": "MongoDB Tutorial",
        "url": "https://...",
        "duration": 1200,
        "thumbnail": "https://...",
        "transcript": "...",
        "created_at": datetime.now()
    }

    content_collection.insert_many([article, video])
# 3. Real-time Analytics
def real_time_analytics():
    """Stream processing and analytics.

    Watches the ``events`` collection of the module-level ``db`` through a
    change stream and, for every inserted ``page_view`` event, increments
    the view counter of that page in ``db.analytics``. The loop runs for
    as long as the stream yields changes.
    """
    events_collection = db['events']

    # Watch for changes (Change Streams)
    with events_collection.watch() as stream:
        for change in stream:
            if change['operationType'] != 'insert':
                continue

            doc = change['fullDocument']

            # Real-time processing.
            # .get() lets events without an "event_type" field pass
            # through instead of raising KeyError and ending the stream.
            if doc.get('event_type') == 'page_view':
                # Update analytics
                db.analytics.update_one(
                    {"page": doc['page']},
                    {
                        "$inc": {"views": 1},
                        "$set": {"last_viewed": datetime.now()}
                    },
                    upsert=True
                )
# 4. Recommendation Engine
def recommendation_engine():
    """User behavior and recommendations.

    Records one sample interaction for ``user123`` in the ``interactions``
    collection of the module-level ``db``, then looks up the ten other
    users who share the most interacted items with that user.

    Returns:
        The aggregation result as a list. It holds at most one document,
        carrying the user's item list and a ``similar_users`` array.
    """
    # Store user interactions
    interactions = db['interactions']

    interaction = {
        "user_id": "user123",
        "item_id": "product456",
        "action": "view",
        "timestamp": datetime.now(),
        "context": {
            "source": "search",
            "device": "mobile",
            "session_id": "sess789"
        }
    }
    interactions.insert_one(interaction)

    # Calculate recommendations
    pipeline = [
        # Get user's interactions
        {"$match": {"user_id": "user123"}},

        # Collapse them into one array of item ids. $in in the sub-pipeline
        # needs an array as its second argument; passing the scalar item_id
        # of a single interaction makes the server reject the pipeline.
        {"$group": {
            "_id": "$user_id",
            "items": {"$addToSet": "$item_id"}
        }},

        # Find similar users
        {"$lookup": {
            "from": "interactions",
            "let": {"items": "$items"},
            "pipeline": [
                {"$match": {"user_id": {"$ne": "user123"}}},
                {"$group": {
                    "_id": "$user_id",
                    "common_items": {"$sum": {"$cond": [{"$in": ["$item_id", "$$items"]}, 1, 0]}}
                }},
                {"$sort": {"common_items": -1}},
                {"$limit": 10}
            ],
            "as": "similar_users"
        }}
    ]

    recommendations = list(interactions.aggregate(pipeline))
    return recommendations
Practice Exercises
Exercise 1: Build a Blog Platform
Create a MongoDB-based blog with:
- User profiles with nested preferences
- Posts with embedded comments (first 10)
- Tags and categories with counters
- Full-text search on posts
- Analytics tracking (views, likes)
Exercise 2: Time Series Analysis
Implement a sensor data system:
- Use bucket pattern for hourly data
- Calculate rolling averages
- Detect anomalies
- Create aggregation for dashboards
- Implement data archival
Exercise 3: E-commerce Migration
Migrate SQL e-commerce to MongoDB:
- Design schema for products, orders, customers
- Handle relationships (embed vs reference)
- Implement shopping cart
- Create analytics pipeline
- Compare performance with SQL version
Key Takeaways
- 📄 MongoDB stores data as flexible JSON-like documents
- 🔄 No fixed schema allows for agile development
- 🚀 Horizontal scaling for big data applications
- 🔍 Powerful aggregation pipeline for complex analytics
- 🎯 Choose between embedding and referencing wisely
- ⚡ Proper indexing is crucial for performance
- 🔗 Integrates well with pandas for data science