Skip to main content

NoSQL Basics (MongoDB)

Beyond Relational: Document Databases for Modern Data! 🚀

NoSQL databases offer flexibility and scalability that traditional SQL databases sometimes can't match. MongoDB, the leading document database, stores data in flexible JSON-like documents, making it perfect for unstructured data, real-time analytics, and modern applications. Learn when and how to leverage NoSQL in your data science projects!

SQL vs NoSQL: Understanding the Differences

graph TD A[Database Types] --> B[SQL/Relational] A --> C[NoSQL] B --> D[PostgreSQL] B --> E[MySQL] B --> F[SQL Server] C --> G[Document: MongoDB] C --> H[Key-Value: Redis] C --> I[Column: Cassandra] C --> J[Graph: Neo4j] G --> K[JSON Documents] H --> L[Simple Pairs] I --> M[Wide Columns] J --> N[Nodes & Edges]

When to Use NoSQL

# Comparison: SQL vs NoSQL Use Cases

sql_use_cases = {
    "ACID Compliance": "Financial transactions",
    "Complex Joins": "Multi-table relationships",
    "Structured Data": "Fixed schema requirements",
    "Strong Consistency": "Inventory management",
    "SQL Expertise": "Business intelligence tools"
}

nosql_use_cases = {
    "Flexible Schema": "Varying data structures",
    "Horizontal Scaling": "Distributed systems",
    "Big Data": "High volume/velocity",
    "Real-time": "IoT sensor data",
    "Document Storage": "Content management",
    "Nested Data": "JSON/XML documents",
    "Geographic Distribution": "Global applications"
}

# Decision matrix
def choose_database(requirements):
    """Simple decision helper for SQL vs NoSQL"""
    
    sql_score = 0
    nosql_score = 0
    
    if requirements.get('needs_transactions'):
        sql_score += 2
    if requirements.get('fixed_schema'):
        sql_score += 2
    if requirements.get('complex_queries'):
        sql_score += 2
    
    if requirements.get('flexible_schema'):
        nosql_score += 2
    if requirements.get('horizontal_scaling'):
        nosql_score += 2
    if requirements.get('nested_documents'):
        nosql_score += 2
    
    return "SQL" if sql_score > nosql_score else "NoSQL"

MongoDB Installation and Setup

# Install MongoDB and PyMongo
"""
# Install MongoDB (varies by OS)
# macOS: brew install mongodb-community
# Ubuntu: sudo apt-get install mongodb
# Windows: Download installer from mongodb.com

# Install Python driver
pip install pymongo
pip install pandas
pip install python-dotenv  # for environment variables
"""

# Basic connection
from pymongo import MongoClient
import pandas as pd
import json
from datetime import datetime, timedelta
import numpy as np

# Connect to MongoDB
# Local connection
client = MongoClient('mongodb://localhost:27017/')

# Connection with authentication
# client = MongoClient('mongodb://username:password@localhost:27017/')

# MongoDB Atlas (cloud) connection
# client = MongoClient('mongodb+srv://username:password@cluster.mongodb.net/')

# Select/create database
db = client['data_science_db']

# Select/create collection (like a table in SQL)
collection = db['users']

# Check connection
print("Databases:", client.list_database_names())
print("Collections in db:", db.list_collection_names())

# Server info
server_info = client.server_info()
print(f"MongoDB version: {server_info['version']}")

CRUD Operations in MongoDB

Create (Insert) Operations

# Insert single document
user = {
    "name": "Alice Johnson",
    "email": "alice@example.com",
    "age": 30,
    "roles": ["user", "admin"],
    "address": {
        "street": "123 Main St",
        "city": "New York",
        "country": "USA",
        "coordinates": {"lat": 40.7128, "lng": -74.0060}
    },
    "interests": ["data science", "machine learning", "python"],
    "created_at": datetime.now(),
    "metadata": {
        "source": "web",
        "campaign": "summer_2024"
    }
}

result = collection.insert_one(user)
print(f"Inserted document ID: {result.inserted_id}")

# Insert multiple documents
users = [
    {
        "name": "Bob Smith",
        "email": "bob@example.com",
        "age": 25,
        "roles": ["user"],
        "interests": ["python", "mongodb"],
        "created_at": datetime.now()
    },
    {
        "name": "Carol White",
        "email": "carol@example.com",
        "age": 35,
        "roles": ["user", "moderator"],
        "interests": ["analytics", "visualization"],
        "purchase_history": [
            {"product": "Course A", "price": 99.99, "date": datetime.now()},
            {"product": "Course B", "price": 149.99, "date": datetime.now()}
        ],
        "created_at": datetime.now()
    }
]

result = collection.insert_many(users)
print(f"Inserted {len(result.inserted_ids)} documents")

# Bulk insert with error handling
from pymongo.errors import BulkWriteError

def bulk_insert_with_validation(documents):
    """Bulk insert with error handling"""
    try:
        result = collection.insert_many(documents, ordered=False)
        print(f"Successfully inserted {len(result.inserted_ids)} documents")
        return result.inserted_ids
    except BulkWriteError as e:
        print(f"Bulk write error: {e.details}")
        # Process successful insertions
        successful = len(documents) - len(e.details['writeErrors'])
        print(f"Successfully inserted {successful} documents")
        return None

Read (Query) Operations

# Find one document
user = collection.find_one({"name": "Alice Johnson"})
print(json.dumps(user, default=str, indent=2))

# Find multiple documents
users = collection.find({"age": {"$gte": 25}})
for user in users:
    print(f"{user['name']}: {user['age']} years old")

# Query with multiple conditions
query = {
    "$and": [
        {"age": {"$gte": 25}},
        {"age": {"$lte": 35}},
        {"interests": {"$in": ["python", "data science"]}}
    ]
}
results = collection.find(query)

# Convert to DataFrame
df = pd.DataFrame(list(collection.find()))
print(df.head())

# Projection (select specific fields)
projection = {"name": 1, "email": 1, "age": 1, "_id": 0}
users = collection.find({}, projection)

# Complex queries
# Nested field query
users_in_ny = collection.find({"address.city": "New York"})

# Array queries
python_users = collection.find({"interests": "python"})

# Regex search
pattern = {"name": {"$regex": "^A", "$options": "i"}}  # Names starting with 'A'
users = collection.find(pattern)

# Comparison operators
comparison_queries = {
    "Equal": {"age": 30},
    "Not Equal": {"age": {"$ne": 30}},
    "Greater Than": {"age": {"$gt": 30}},
    "Greater or Equal": {"age": {"$gte": 30}},
    "Less Than": {"age": {"$lt": 30}},
    "Less or Equal": {"age": {"$lte": 30}},
    "In List": {"age": {"$in": [25, 30, 35]}},
    "Not In List": {"age": {"$nin": [25, 30, 35]}},
}

# Logical operators
logical_queries = {
    "AND": {"$and": [{"age": {"$gte": 25}}, {"age": {"$lte": 35}}]},
    "OR": {"$or": [{"age": {"$lt": 25}}, {"age": {"$gt": 35}}]},
    "NOT": {"age": {"$not": {"$eq": 30}}},
    "NOR": {"$nor": [{"age": 25}, {"age": 35}]}
}

# Element operators
element_queries = {
    "Exists": {"address": {"$exists": True}},
    "Type": {"age": {"$type": "int"}}
}

# Array operators
array_queries = {
    "All": {"interests": {"$all": ["python", "mongodb"]}},
    "Element Match": {"purchase_history": {"$elemMatch": {"price": {"$gt": 100}}}},
    "Size": {"interests": {"$size": 3}}
}

Update Operations

# Update one document
result = collection.update_one(
    {"name": "Alice Johnson"},
    {"$set": {"age": 31, "updated_at": datetime.now()}}
)
print(f"Modified {result.modified_count} document(s)")

# Update many documents
result = collection.update_many(
    {"age": {"$lt": 30}},
    {"$inc": {"age": 1}}  # Increment age by 1
)
print(f"Modified {result.modified_count} document(s)")

# Update operators
update_operators = {
    # Field operators
    "$set": {"field": "value"},  # Set field value
    "$unset": {"field": ""},  # Remove field
    "$inc": {"count": 1},  # Increment
    "$mul": {"price": 1.1},  # Multiply
    "$rename": {"oldName": "newName"},  # Rename field
    "$min": {"score": 10},  # Update if less than current
    "$max": {"score": 100},  # Update if greater than current
    "$currentDate": {"lastModified": True},  # Set to current date
    
    # Array operators
    "$push": {"tags": "new_tag"},  # Add to array
    "$pull": {"tags": "old_tag"},  # Remove from array
    "$addToSet": {"tags": "unique_tag"},  # Add if not exists
    "$pop": {"tags": 1},  # Remove last element (1) or first (-1)
    "$pullAll": {"tags": ["tag1", "tag2"]},  # Remove multiple
}

# Upsert (update or insert)
result = collection.update_one(
    {"email": "new@example.com"},
    {"$set": {"name": "New User", "created_at": datetime.now()}},
    upsert=True
)

# Replace entire document
result = collection.replace_one(
    {"name": "Bob Smith"},
    {
        "name": "Robert Smith",
        "email": "robert@example.com",
        "age": 26,
        "roles": ["premium_user"],
        "updated_at": datetime.now()
    }
)

# Bulk updates
from pymongo import UpdateOne

bulk_updates = [
    UpdateOne({"name": "Alice Johnson"}, {"$set": {"status": "active"}}),
    UpdateOne({"name": "Bob Smith"}, {"$set": {"status": "inactive"}}),
    UpdateOne({"name": "Carol White"}, {"$inc": {"login_count": 1}})
]

result = collection.bulk_write(bulk_updates)
print(f"Bulk update: {result.modified_count} modified")

Delete Operations

# Delete one document
result = collection.delete_one({"name": "Test User"})
print(f"Deleted {result.deleted_count} document(s)")

# Delete many documents
result = collection.delete_many({"age": {"$lt": 18}})
print(f"Deleted {result.deleted_count} document(s)")

# Delete with conditions
result = collection.delete_many({
    "$and": [
        {"created_at": {"$lt": datetime.now() - timedelta(days=365)}},
        {"status": "inactive"}
    ]
})

# Delete all documents (careful!)
# result = collection.delete_many({})

# Drop collection entirely
# collection.drop()

Aggregation Pipeline

Basic Aggregation

# MongoDB Aggregation Pipeline - powerful data processing

# Sample e-commerce data
orders = db['orders']

# Insert sample data
sample_orders = [
    {
        "order_id": i,
        "customer_id": np.random.randint(1, 100),
        "products": [
            {
                "product_id": np.random.randint(1, 50),
                "name": f"Product_{np.random.randint(1, 50)}",
                "category": np.random.choice(['Electronics', 'Clothing', 'Books']),
                "price": round(np.random.uniform(10, 500), 2),
                "quantity": np.random.randint(1, 5)
            }
            for _ in range(np.random.randint(1, 4))
        ],
        "order_date": datetime.now() - timedelta(days=np.random.randint(0, 365)),
        "status": np.random.choice(['pending', 'shipped', 'delivered']),
        "region": np.random.choice(['North', 'South', 'East', 'West'])
    }
    for i in range(1000)
]

orders.insert_many(sample_orders)

# Basic aggregation pipeline
pipeline = [
    # Stage 1: Match (filter)
    {"$match": {"status": "delivered"}},
    
    # Stage 2: Unwind array
    {"$unwind": "$products"},
    
    # Stage 3: Group and aggregate
    {"$group": {
        "_id": "$products.category",
        "total_revenue": {"$sum": {"$multiply": ["$products.price", "$products.quantity"]}},
        "order_count": {"$sum": 1},
        "avg_price": {"$avg": "$products.price"}
    }},
    
    # Stage 4: Sort
    {"$sort": {"total_revenue": -1}},
    
    # Stage 5: Limit
    {"$limit": 10}
]

results = list(orders.aggregate(pipeline))
df = pd.DataFrame(results)
print(df)

Advanced Aggregation

# Complex aggregation example: Sales analytics

advanced_pipeline = [
    # Stage 1: Match recent orders
    {"$match": {
        "order_date": {"$gte": datetime.now() - timedelta(days=90)}
    }},
    
    # Stage 2: Add computed fields
    {"$addFields": {
        "order_total": {
            "$reduce": {
                "input": "$products",
                "initialValue": 0,
                "in": {
                    "$add": ["$$value", {"$multiply": ["$$this.price", "$$this.quantity"]}]
                }
            }
        },
        "item_count": {"$size": "$products"},
        "order_month": {"$dateToString": {"format": "%Y-%m", "date": "$order_date"}}
    }},
    
    # Stage 3: Lookup (join) customer data
    {"$lookup": {
        "from": "customers",
        "localField": "customer_id",
        "foreignField": "customer_id",
        "as": "customer_info"
    }},
    
    # Stage 4: Unwind customer info
    {"$unwind": {"path": "$customer_info", "preserveNullAndEmptyArrays": True}},
    
    # Stage 5: Group by month and region
    {"$group": {
        "_id": {
            "month": "$order_month",
            "region": "$region"
        },
        "total_revenue": {"$sum": "$order_total"},
        "order_count": {"$sum": 1},
        "avg_order_value": {"$avg": "$order_total"},
        "unique_customers": {"$addToSet": "$customer_id"},
        "max_order": {"$max": "$order_total"},
        "min_order": {"$min": "$order_total"}
    }},
    
    # Stage 6: Add customer count
    {"$addFields": {
        "customer_count": {"$size": "$unique_customers"}
    }},
    
    # Stage 7: Remove customer array (no longer needed)
    {"$project": {
        "unique_customers": 0
    }},
    
    # Stage 8: Sort by revenue
    {"$sort": {"total_revenue": -1}},
    
    # Stage 9: Facet for multiple outputs
    {"$facet": {
        "by_region": [
            {"$group": {
                "_id": "$_id.region",
                "total": {"$sum": "$total_revenue"}
            }},
            {"$sort": {"total": -1}}
        ],
        "by_month": [
            {"$group": {
                "_id": "$_id.month",
                "total": {"$sum": "$total_revenue"}
            }},
            {"$sort": {"_id": 1}}
        ],
        "top_combinations": [
            {"$limit": 5}
        ]
    }}
]

results = list(orders.aggregate(advanced_pipeline))
print(json.dumps(results[0], default=str, indent=2))

Aggregation Operators

# Common aggregation operators

# Arithmetic operators
arithmetic_ops = {
    "$add": ["$price", "$tax"],
    "$subtract": ["$total", "$discount"],
    "$multiply": ["$quantity", "$unit_price"],
    "$divide": ["$total", "$quantity"],
    "$mod": ["$value", 10],
    "$pow": ["$base", 2],
    "$sqrt": "$value",
    "$abs": "$difference"
}

# String operators
string_ops = {
    "$concat": ["$first_name", " ", "$last_name"],
    "$substr": ["$description", 0, 100],
    "$toLower": "$email",
    "$toUpper": "$code",
    "$split": ["$tags", ","],
    "$trim": {"input": "$text", "chars": " "}
}

# Date operators
date_ops = {
    "$dayOfMonth": "$date",
    "$month": "$date",
    "$year": "$date",
    "$dayOfWeek": "$date",
    "$dateToString": {"format": "%Y-%m-%d", "date": "$date"},
    "$dateDiff": {
        "startDate": "$start_date",
        "endDate": "$end_date",
        "unit": "day"
    }
}

# Array operators
array_ops = {
    "$size": "$items",
    "$slice": ["$array", 5],
    "$arrayElemAt": ["$array", 0],
    "$filter": {
        "input": "$items",
        "as": "item",
        "cond": {"$gte": ["$$item.price", 100]}
    },
    "$map": {
        "input": "$items",
        "as": "item",
        "in": {"$multiply": ["$$item.price", 1.1]}
    }
}

# Conditional operators
conditional_ops = {
    "$cond": {
        "if": {"$gte": ["$score", 90]},
        "then": "A",
        "else": "B"
    },
    "$switch": {
        "branches": [
            {"case": {"$gte": ["$score", 90]}, "then": "A"},
            {"case": {"$gte": ["$score", 80]}, "then": "B"},
            {"case": {"$gte": ["$score", 70]}, "then": "C"}
        ],
        "default": "F"
    },
    "$ifNull": ["$optional_field", "default_value"]
}

Indexing in MongoDB

# Create indexes for better performance

# Single field index
collection.create_index("email", unique=True)
collection.create_index([("age", 1)])  # 1 for ascending, -1 for descending

# Compound index
collection.create_index([
    ("category", 1),
    ("price", -1)
])

# Text index for search
collection.create_index([("description", "text"), ("title", "text")])

# 2D geospatial index
collection.create_index([("location", "2dsphere")])

# Partial index (conditional)
collection.create_index(
    "age",
    partialFilterExpression={"age": {"$gte": 18}}
)

# TTL index (auto-delete documents)
collection.create_index(
    "expireAt",
    expireAfterSeconds=3600  # Delete after 1 hour
)

# List all indexes
indexes = collection.list_indexes()
for index in indexes:
    print(index)

# Index usage statistics
stats = db.command("collStats", "users", indexDetails=True)
print(json.dumps(stats['indexSizes'], indent=2))

# Drop index
collection.drop_index("age_1")

# Explain query execution
explanation = collection.find({"age": {"$gte": 25}}).explain()
print(json.dumps(explanation['executionStats'], default=str, indent=2))

Data Modeling in MongoDB

Embedding vs Referencing

# Embedding Pattern (Denormalized)
embedded_blog_post = {
    "_id": ObjectId(),
    "title": "Introduction to MongoDB",
    "author": {
        "name": "John Doe",
        "email": "john@example.com",
        "bio": "Database expert"
    },
    "comments": [
        {
            "user": "Alice",
            "text": "Great post!",
            "date": datetime.now()
        },
        {
            "user": "Bob",
            "text": "Very helpful",
            "date": datetime.now()
        }
    ],
    "tags": ["mongodb", "nosql", "database"],
    "created_at": datetime.now()
}

# Pros: Single query gets all data, atomic updates
# Cons: Document size limit (16MB), data duplication

# Reference Pattern (Normalized)
# Authors collection
author = {
    "_id": ObjectId(),
    "name": "John Doe",
    "email": "john@example.com",
    "bio": "Database expert"
}

# Posts collection
post = {
    "_id": ObjectId(),
    "title": "Introduction to MongoDB",
    "author_id": author["_id"],  # Reference to author
    "created_at": datetime.now()
}

# Comments collection
comment = {
    "_id": ObjectId(),
    "post_id": post["_id"],  # Reference to post
    "user": "Alice",
    "text": "Great post!",
    "date": datetime.now()
}

# Pros: No duplication, smaller documents
# Cons: Multiple queries needed, no transactions (before MongoDB 4.0)

# Hybrid Pattern
hybrid_post = {
    "_id": ObjectId(),
    "title": "Introduction to MongoDB",
    "author_id": ObjectId(),  # Reference
    "author_name": "John Doe",  # Denormalized for display
    "comment_count": 2,  # Cached count
    "recent_comments": [  # Embed recent only
        {"user": "Alice", "text": "Great post!"}
    ],
    "tags": ["mongodb", "nosql"],
    "created_at": datetime.now()
}

Schema Design Patterns

# Common MongoDB design patterns

# 1. Polymorphic Pattern - Different types in same collection
products = [
    {
        "type": "book",
        "title": "MongoDB Guide",
        "author": "John Doe",
        "isbn": "978-1234567890",
        "pages": 350
    },
    {
        "type": "movie",
        "title": "Data Science Documentary",
        "director": "Jane Smith",
        "duration": 120,
        "format": "BluRay"
    }
]

# 2. Attribute Pattern - Flexible attributes
product_with_attrs = {
    "name": "Laptop",
    "attributes": [
        {"key": "color", "value": "silver"},
        {"key": "ram", "value": "16GB", "type": "spec"},
        {"key": "warranty", "value": "2 years", "type": "service"}
    ]
}

# 3. Bucket Pattern - Time series data
sensor_bucket = {
    "sensor_id": "sensor_001",
    "start_time": datetime(2024, 1, 1, 0, 0),
    "end_time": datetime(2024, 1, 1, 1, 0),
    "measurement_count": 3600,
    "measurements": [
        {"timestamp": datetime(2024, 1, 1, 0, 0, 0), "temp": 22.5, "humidity": 45},
        {"timestamp": datetime(2024, 1, 1, 0, 0, 1), "temp": 22.6, "humidity": 45},
        # ... up to 3600 measurements
    ],
    "summary": {
        "avg_temp": 22.5,
        "max_temp": 23.1,
        "min_temp": 22.0
    }
}

# 4. Computed Pattern - Pre-calculated values
order_with_computed = {
    "order_id": 1,
    "items": [
        {"product": "A", "quantity": 2, "price": 10},
        {"product": "B", "quantity": 1, "price": 20}
    ],
    "subtotal": 40,  # Pre-computed
    "tax": 4,  # Pre-computed
    "total": 44,  # Pre-computed
    "item_count": 3  # Pre-computed
}

# 5. Subset Pattern - Frequently accessed subset
user_with_subset = {
    "_id": ObjectId(),
    "name": "John Doe",
    "email": "john@example.com",
    "recent_posts": [  # Last 10 posts
        {"title": "Post 1", "date": datetime.now()},
        {"title": "Post 2", "date": datetime.now()}
    ],
    "total_posts": 150  # Full count
}

# 6. Tree Pattern - Hierarchical data
category_tree = {
    "_id": "electronics",
    "name": "Electronics",
    "path": ",electronics,",
    "parent": None,
    "ancestors": []
}

subcategory = {
    "_id": "laptops",
    "name": "Laptops",
    "path": ",electronics,laptops,",
    "parent": "electronics",
    "ancestors": ["electronics"]
}

MongoDB with Pandas

# Working with MongoDB and Pandas

# Load data from MongoDB to DataFrame
def mongo_to_dataframe(collection, query={}, projection=None):
    """Convert MongoDB collection to pandas DataFrame"""
    cursor = collection.find(query, projection)
    df = pd.DataFrame(list(cursor))
    
    # Handle ObjectId
    if '_id' in df.columns:
        df['_id'] = df['_id'].astype(str)
    
    return df

# Example usage
df = mongo_to_dataframe(
    collection,
    query={"age": {"$gte": 25}},
    projection={"name": 1, "age": 1, "email": 1}
)
print(df.head())

# Save DataFrame to MongoDB
def dataframe_to_mongo(df, collection, if_exists='append'):
    """Save pandas DataFrame to MongoDB"""
    records = df.to_dict('records')
    
    if if_exists == 'replace':
        collection.delete_many({})
    
    if records:
        collection.insert_many(records)
        print(f"Inserted {len(records)} records")

# Complex analysis with aggregation and pandas
def analyze_sales_data():
    """Combine MongoDB aggregation with pandas analysis"""
    
    # MongoDB aggregation
    pipeline = [
        {"$match": {"status": "delivered"}},
        {"$group": {
            "_id": {
                "date": {"$dateToString": {"format": "%Y-%m-%d", "date": "$order_date"}},
                "category": "$category"
            },
            "revenue": {"$sum": "$total"},
            "orders": {"$sum": 1}
        }}
    ]
    
    results = list(orders.aggregate(pipeline))
    
    # Convert to DataFrame
    df = pd.DataFrame(results)
    df['date'] = pd.to_datetime(df['_id'].apply(lambda x: x['date']))
    df['category'] = df['_id'].apply(lambda x: x['category'])
    df = df.drop('_id', axis=1)
    
    # Pandas analysis
    pivot = df.pivot_table(
        values='revenue',
        index='date',
        columns='category',
        aggfunc='sum',
        fill_value=0
    )
    
    # Calculate moving averages
    for col in pivot.columns:
        pivot[f'{col}_ma7'] = pivot[col].rolling(window=7).mean()
    
    return pivot

# Time series analysis
def time_series_from_mongo():
    """Extract and analyze time series data"""
    
    # Get data
    df = mongo_to_dataframe(
        orders,
        query={"order_date": {"$gte": datetime.now() - timedelta(days=90)}}
    )
    
    # Process dates
    df['order_date'] = pd.to_datetime(df['order_date'])
    df = df.set_index('order_date')
    
    # Resample to daily
    daily_sales = df.resample('D')['total'].agg(['sum', 'count', 'mean'])
    
    # Add time-based features
    daily_sales['day_of_week'] = daily_sales.index.dayofweek
    daily_sales['month'] = daily_sales.index.month
    daily_sales['is_weekend'] = daily_sales['day_of_week'].isin([5, 6])
    
    return daily_sales

Performance Optimization

# MongoDB performance optimization techniques

# 1. Batch operations
def batch_operations():
    """Use bulk operations for better performance"""
    from pymongo import InsertOne, UpdateOne, DeleteOne
    
    operations = []
    
    # Prepare bulk operations
    for i in range(10000):
        operations.append(
            InsertOne({
                "user_id": i,
                "score": np.random.randint(0, 100)
            })
        )
    
    # Execute in bulk
    result = collection.bulk_write(operations)
    print(f"Inserted: {result.inserted_count}")

# 2. Connection pooling
from pymongo import MongoClient

# Configure connection pool
client = MongoClient(
    'mongodb://localhost:27017/',
    maxPoolSize=50,
    minPoolSize=10,
    maxIdleTimeMS=10000,
    waitQueueTimeoutMS=10000
)

# 3. Query optimization
def optimize_queries():
    """Query optimization strategies"""
    
    # Use projection to limit fields
    results = collection.find(
        {"age": {"$gte": 25}},
        {"name": 1, "email": 1, "_id": 0}
    )
    
    # Use limit for large results
    results = collection.find().limit(100)
    
    # Use skip for pagination (careful with large offsets)
    page = 2
    page_size = 20
    results = collection.find().skip((page - 1) * page_size).limit(page_size)
    
    # Use cursor batch size
    results = collection.find().batch_size(100)
    
    # Use hint to force index
    results = collection.find({"age": 25}).hint("age_1")

# 4. Aggregation optimization
def optimize_aggregation():
    """Aggregation pipeline optimization"""
    
    # Place $match early in pipeline
    pipeline = [
        {"$match": {"status": "active"}},  # Filter first
        {"$project": {"name": 1, "age": 1}},  # Then project
        {"$group": {"_id": "$age", "count": {"$sum": 1}}}
    ]
    
    # Use indexes for $match and $sort
    collection.create_index([("status", 1), ("age", 1)])
    
    # Use allowDiskUse for large datasets
    results = collection.aggregate(pipeline, allowDiskUse=True)

# 5. Monitoring performance
def monitor_performance():
    """Monitor MongoDB performance"""
    
    # Database statistics
    stats = db.command("dbStats")
    print(f"Database size: {stats['dataSize']} bytes")
    print(f"Index size: {stats['indexSize']} bytes")
    
    # Collection statistics
    coll_stats = db.command("collStats", "users")
    print(f"Document count: {coll_stats['count']}")
    print(f"Average document size: {coll_stats['avgObjSize']} bytes")
    
    # Current operations
    current_ops = db.current_op()
    for op in current_ops['inprog']:
        if op.get('secs_running', 0) > 5:
            print(f"Long running operation: {op}")
    
    # Profile slow queries
    db.set_profiling_level(1, slow_ms=100)
    
    # Get profiling data
    profile_data = db.system.profile.find().limit(10)
    for profile in profile_data:
        print(f"Slow query: {profile['command']} - {profile['millis']}ms")

MongoDB vs SQL: Migration Guide

# SQL to MongoDB query translation

# SQL: SELECT * FROM users WHERE age > 25
mongo_query = collection.find({"age": {"$gt": 25}})

# SQL: SELECT name, email FROM users WHERE age > 25
mongo_query = collection.find(
    {"age": {"$gt": 25}},
    {"name": 1, "email": 1, "_id": 0}
)

# SQL: SELECT * FROM users WHERE age BETWEEN 25 AND 35
mongo_query = collection.find({
    "age": {"$gte": 25, "$lte": 35}
})

# SQL: SELECT * FROM users WHERE name LIKE 'A%'
mongo_query = collection.find({
    "name": {"$regex": "^A", "$options": "i"}
})

# SQL: SELECT COUNT(*) FROM users GROUP BY status
mongo_query = collection.aggregate([
    {"$group": {
        "_id": "$status",
        "count": {"$sum": 1}
    }}
])

# SQL: SELECT * FROM users ORDER BY age DESC LIMIT 10
mongo_query = collection.find().sort("age", -1).limit(10)

# SQL: SELECT * FROM orders o JOIN customers c ON o.customer_id = c.id
mongo_query = orders.aggregate([
    {"$lookup": {
        "from": "customers",
        "localField": "customer_id",
        "foreignField": "_id",
        "as": "customer"
    }},
    {"$unwind": "$customer"}
])

# SQL: UPDATE users SET status = 'active' WHERE age > 25
mongo_update = collection.update_many(
    {"age": {"$gt": 25}},
    {"$set": {"status": "active"}}
)

# SQL: DELETE FROM users WHERE status = 'inactive'
mongo_delete = collection.delete_many({"status": "inactive"})

# SQL to MongoDB concept mapping
sql_to_mongo_concepts = {
    "Table": "Collection",
    "Row": "Document",
    "Column": "Field",
    "Primary Key": "_id",
    "Index": "Index",
    "Join": "$lookup",
    "GROUP BY": "$group",
    "WHERE": "$match",
    "HAVING": "$match (after $group)",
    "ORDER BY": "$sort",
    "LIMIT": "$limit",
    "OFFSET": "$skip"
}

Real-world Use Cases

# 1. IoT Sensor Data Storage
def store_sensor_data():
    """Time-series data with bucketing"""
    
    sensor_collection = db['sensor_data']
    
    # Bucket pattern for efficient storage
    bucket_size = 3600  # 1 hour buckets
    
    def add_reading(sensor_id, timestamp, data):
        bucket_start = timestamp.replace(minute=0, second=0, microsecond=0)
        
        sensor_collection.update_one(
            {
                "sensor_id": sensor_id,
                "bucket_start": bucket_start
            },
            {
                "$push": {
                    "readings": {
                        "timestamp": timestamp,
                        **data
                    }
                },
                "$inc": {"count": 1},
                "$set": {
                    "bucket_end": bucket_start + timedelta(hours=1)
                }
            },
            upsert=True
        )
    
    # Add sensor reading
    add_reading(
        "sensor_001",
        datetime.now(),
        {"temperature": 22.5, "humidity": 45}
    )

# 2. Content Management System
def cms_example():
    """Flexible schema for content"""
    
    content_collection = db['content']
    
    # Article with dynamic fields
    article = {
        "type": "article",
        "title": "MongoDB for Data Science",
        "author": "John Doe",
        "content": "...",
        "tags": ["mongodb", "nosql", "data science"],
        "metadata": {
            "views": 0,
            "likes": 0,
            "reading_time": "5 min"
        },
        "comments": [],
        "created_at": datetime.now(),
        "published": True
    }
    
    # Video content (different structure)
    video = {
        "type": "video",
        "title": "MongoDB Tutorial",
        "url": "https://...",
        "duration": 1200,
        "thumbnail": "https://...",
        "transcript": "...",
        "created_at": datetime.now()
    }
    
    content_collection.insert_many([article, video])

# 3. Real-time Analytics
def real_time_analytics():
    """Stream processing and analytics"""
    
    events_collection = db['events']
    
    # Watch for changes (Change Streams)
    with events_collection.watch() as stream:
        for change in stream:
            if change['operationType'] == 'insert':
                doc = change['fullDocument']
                
                # Real-time processing
                if doc['event_type'] == 'page_view':
                    # Update analytics
                    db.analytics.update_one(
                        {"page": doc['page']},
                        {
                            "$inc": {"views": 1},
                            "$set": {"last_viewed": datetime.now()}
                        },
                        upsert=True
                    )

# 4. Recommendation Engine
def recommendation_engine():
    """User behavior and recommendations"""
    
    # Store user interactions
    interactions = db['interactions']
    
    interaction = {
        "user_id": "user123",
        "item_id": "product456",
        "action": "view",
        "timestamp": datetime.now(),
        "context": {
            "source": "search",
            "device": "mobile",
            "session_id": "sess789"
        }
    }
    
    interactions.insert_one(interaction)
    
    # Calculate recommendations
    pipeline = [
        # Get user's interactions
        {"$match": {"user_id": "user123"}},
        
        # Find similar users
        {"$lookup": {
            "from": "interactions",
            "let": {"items": "$item_id"},
            "pipeline": [
                {"$match": {"user_id": {"$ne": "user123"}}},
                {"$group": {
                    "_id": "$user_id",
                    "common_items": {"$sum": {"$cond": [{"$in": ["$item_id", "$$items"]}, 1, 0]}}
                }},
                {"$sort": {"common_items": -1}},
                {"$limit": 10}
            ],
            "as": "similar_users"
        }}
    ]
    
    recommendations = list(interactions.aggregate(pipeline))

Practice Exercises

Exercise 1: Build a Blog Platform

Create a MongoDB-based blog with:

  1. User profiles with nested preferences
  2. Posts with embedded comments (first 10)
  3. Tags and categories with counters
  4. Full-text search on posts
  5. Analytics tracking (views, likes)

Exercise 2: Time Series Analysis

Implement a sensor data system:

  1. Use bucket pattern for hourly data
  2. Calculate rolling averages
  3. Detect anomalies
  4. Create aggregation for dashboards
  5. Implement data archival

Exercise 3: E-commerce Migration

Migrate SQL e-commerce to MongoDB:

  1. Design schema for products, orders, customers
  2. Handle relationships (embed vs reference)
  3. Implement shopping cart
  4. Create analytics pipeline
  5. Compare performance with SQL version

Key Takeaways

Further Resources