Skip to main content

Rate Limiting and Ethics

Be a Responsible Data Citizen! 🤝

With great scraping power comes great responsibility. Rate limiting isn't just about avoiding IP bans—it's about respecting the services we use, following legal guidelines, and being a good citizen of the web. Learn to build scrapers that are not only effective but also ethical, sustainable, and legally compliant.

Understanding Rate Limiting

graph TD
    A[Rate Limiting] --> B[Client-Side]
    A --> C[Server-Side]
    B --> D[Request Throttling]
    B --> E[Backoff Strategies]
    B --> F[Queue Management]
    C --> G[429 Status]
    C --> H[Retry-After Header]
    C --> I[X-RateLimit Headers]
    E --> J[Fixed Delay]
    E --> K[Exponential Backoff]
    E --> L[Jitter]
    I --> M[Limit]
    I --> N[Remaining]
    I --> O[Reset]

Types of Rate Limits

Common Rate Limiting Patterns

import time
import requests
from datetime import datetime, timedelta
from collections import deque
import threading
import random

# 1. Fixed Window Rate Limiting
class FixedWindowRateLimiter:
    """Rate limiter that counts requests inside consecutive fixed-length windows.

    At most ``max_requests`` calls are admitted per ``window_seconds``; once a
    window has elapsed the counter starts again from zero.
    """

    def __init__(self, max_requests=60, window_seconds=60):
        """Start the first window now with an empty request counter."""
        self.lock = threading.Lock()
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.request_count = 0
        self.window_start = time.time()

    def allow_request(self):
        """Return True (and count the request) if the current window has room."""
        with self.lock:
            now = time.time()

            # Roll over to a fresh window once the current one has run out.
            window_expired = now - self.window_start >= self.window_seconds
            if window_expired:
                self.window_start = now
                self.request_count = 0

            if self.request_count >= self.max_requests:
                return False

            self.request_count += 1
            return True

    def wait_if_needed(self):
        """Block until a request slot is granted, polling at most once a second."""
        while True:
            if self.allow_request():
                return

            # Time left before the current window rolls over.
            remaining = self.window_seconds - (time.time() - self.window_start)
            if remaining <= 0:
                continue

            print(f"Rate limit reached. Waiting {remaining:.2f} seconds...")
            time.sleep(min(remaining, 1))

# 2. Sliding Window Rate Limiting
class SlidingWindowRateLimiter:
    """Rate limiter that admits at most ``max_requests`` in any rolling window.

    Timestamps of admitted requests are kept in a deque; entries older than
    ``window_seconds`` are discarded before each admission decision.
    """

    def __init__(self, max_requests=60, window_seconds=60):
        """Create a limiter with an empty request history."""
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()  # timestamps of admitted requests, oldest first
        self.lock = threading.Lock()

    def allow_request(self):
        """Return True and record the request if the rolling window has room."""
        with self.lock:
            current_time = time.time()
            cutoff = current_time - self.window_seconds

            # Drop timestamps that have slid out of the window.
            while self.requests and self.requests[0] < cutoff:
                self.requests.popleft()

            if len(self.requests) < self.max_requests:
                self.requests.append(current_time)
                return True

            return False

    def wait_if_needed(self):
        """Block until a request slot is granted.

        The lock is held only long enough to read the oldest timestamp; the
        sleep happens with the lock released so other threads can keep calling
        ``allow_request`` while this one waits.
        """
        while not self.allow_request():
            with self.lock:
                oldest = self.requests[0] if self.requests else None

            if oldest is None:
                # No history but still refused (e.g. max_requests <= 0): poll.
                time.sleep(0.1)
                continue

            # A slot frees up when the oldest recorded request leaves the window.
            time_to_wait = (oldest + self.window_seconds) - time.time()
            if time_to_wait > 0:
                print(f"Rate limit reached. Waiting {time_to_wait:.2f} seconds...")
                time.sleep(time_to_wait)

# 3. Token Bucket Algorithm
class TokenBucketRateLimiter:
    """Token bucket rate limiter.

    The bucket starts full with ``bucket_size`` tokens and is refilled
    continuously at ``tokens_per_second``; each request spends tokens.
    """

    def __init__(self, tokens_per_second=1, bucket_size=10):
        """Create a full bucket with the given refill rate and capacity."""
        self.tokens_per_second = tokens_per_second
        self.bucket_size = bucket_size
        self.tokens = bucket_size
        self.last_update = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        """Add the tokens earned since the last update (caller holds the lock)."""
        current_time = time.time()
        elapsed = current_time - self.last_update

        # Never exceed the bucket capacity.
        new_tokens = elapsed * self.tokens_per_second
        self.tokens = min(self.tokens + new_tokens, self.bucket_size)
        self.last_update = current_time

    def allow_request(self, tokens=1):
        """Spend ``tokens`` and return True if that many are available."""
        with self.lock:
            self._refill()

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True

            return False

    def wait_if_needed(self, tokens=1):
        """Block until ``tokens`` tokens have been spent from the bucket.

        Sleeps for the time the refill rate needs to cover the shortfall
        instead of polling on a fixed interval.

        Raises:
            ValueError: if ``tokens`` exceeds ``bucket_size``; the bucket can
                never hold that many, so waiting would never finish.
        """
        if tokens > self.bucket_size:
            raise ValueError(
                f"Requested {tokens} tokens but bucket size is {self.bucket_size}"
            )

        while not self.allow_request(tokens):
            with self.lock:
                self._refill()
                shortfall = tokens - self.tokens

            if shortfall <= 0:
                # Enough tokens arrived in the meantime; try to take them.
                continue

            time_to_wait = shortfall / self.tokens_per_second
            print(f"Waiting {time_to_wait:.2f} seconds for tokens...")
            time.sleep(time_to_wait)

# 4. Leaky Bucket Algorithm
class LeakyBucketRateLimiter:
    """Leaky bucket limiter drained by a background daemon thread.

    Admitted requests are queued (up to ``bucket_size``); the worker thread
    removes one queued entry every ``1 / leak_rate`` seconds.
    """

    def __init__(self, leak_rate=1, bucket_size=10):
        """Set up the bucket and immediately start the draining thread."""
        self.leak_rate = leak_rate  # Requests per second
        self.bucket_size = bucket_size
        self.queue = deque(maxlen=bucket_size)
        self.last_leak = time.time()
        self.lock = threading.Lock()

        # The flag must be set before the worker starts reading it.
        self.running = True
        worker = threading.Thread(target=self._leak_loop)
        worker.daemon = True
        self.leak_thread = worker
        self.leak_thread.start()

    def _leak_loop(self):
        """Worker body: drop the oldest queued entry once per leak interval."""
        while self.running:
            time.sleep(1 / self.leak_rate)
            with self.lock:
                if not self.queue:
                    continue
                self.queue.popleft()

    def allow_request(self):
        """Queue the request and return True unless the bucket is already full."""
        with self.lock:
            has_room = len(self.queue) < self.bucket_size
            if has_room:
                self.queue.append(time.time())
            return has_room

    def stop(self):
        """Signal the worker to finish and wait for it to exit."""
        self.running = False
        self.leak_thread.join()

Handling Server Rate Limits

Response Header Parsing

import time
import requests
from datetime import datetime, timedelta

class ServerRateLimitHandler:
    """Interpret server-side rate limiting signals (429 and rate limit headers)."""

    def __init__(self):
        """Create the handler with empty bookkeeping dictionaries."""
        self.rate_limits = {}
        self.retry_after = {}

    def parse_rate_limit_headers(self, response):
        """Extract rate limit information from ``response.headers``.

        Returns:
            dict with keys ``limit``, ``remaining`` (ints), ``reset`` (naive
            local ``datetime`` built from an epoch timestamp) and
            ``retry_after`` (non-negative seconds). A key is ``None`` when the
            matching header is absent; ``retry_after`` is also ``None`` when
            the header cannot be parsed.
        """
        # Local import: the file-level import only brings in datetime/timedelta.
        from datetime import timezone

        headers = response.headers

        rate_limit_info = {
            'limit': None,
            'remaining': None,
            'reset': None,
            'retry_after': None
        }

        # X-RateLimit-* (GitHub, Twitter style) first, then the X-Rate-Limit-*
        # spelling, which overrides the former when both are present.
        for prefix in ('X-RateLimit-', 'X-Rate-Limit-'):
            if prefix + 'Limit' in headers:
                rate_limit_info['limit'] = int(headers[prefix + 'Limit'])

            if prefix + 'Remaining' in headers:
                rate_limit_info['remaining'] = int(headers[prefix + 'Remaining'])

            if prefix + 'Reset' in headers:
                reset_timestamp = int(headers[prefix + 'Reset'])
                rate_limit_info['reset'] = datetime.fromtimestamp(reset_timestamp)

        # Retry-After header (429 responses): delta-seconds or an HTTP date.
        if 'Retry-After' in headers:
            retry_after = headers['Retry-After'].strip()
            if retry_after.isdigit():
                rate_limit_info['retry_after'] = int(retry_after)
            else:
                try:
                    retry_time = datetime.strptime(
                        retry_after, '%a, %d %b %Y %H:%M:%S GMT'
                    )
                except ValueError:
                    # Malformed value: treat it as if no hint had been sent.
                    retry_time = None

                if retry_time is not None:
                    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
                    seconds = (retry_time - now_utc).total_seconds()
                    # A date in the past means "retry now", not a negative wait.
                    rate_limit_info['retry_after'] = max(seconds, 0.0)

        return rate_limit_info

    def should_wait(self, rate_limit_info):
        """Return ``(should_wait, seconds)`` for parsed rate limit info.

        ``seconds`` is never negative, so it can be passed to ``time.sleep``.
        """
        retry_after = rate_limit_info['retry_after']
        if retry_after is not None:
            return True, max(retry_after, 0)

        remaining = rate_limit_info['remaining']
        if remaining is not None and remaining <= 0:
            if rate_limit_info['reset']:
                wait_time = (rate_limit_info['reset'] - datetime.now()).total_seconds()
                return True, max(wait_time, 0)

        return False, 0

    def handle_response(self, response, url=None):
        """React to a response; return True when the caller should retry.

        On 429 the method sleeps for the server-indicated time and returns
        True. A 429 that carries no usable hint returns False. On 200 it
        reports the remaining quota and pauses one second when fewer than 10
        requests remain.
        """
        if response.status_code == 429:
            print(f"Rate limited (429) for {url or response.url}")

            rate_limit_info = self.parse_rate_limit_headers(response)
            should_wait, wait_time = self.should_wait(rate_limit_info)

            if should_wait:
                print(f"Waiting {wait_time:.2f} seconds before retry...")
                time.sleep(wait_time)
                return True  # Should retry

        # Check if we're approaching the rate limit
        elif response.status_code == 200:
            rate_limit_info = self.parse_rate_limit_headers(response)

            if rate_limit_info['remaining'] is not None:
                print(f"Rate limit: {rate_limit_info['remaining']}/{rate_limit_info['limit']} remaining")

                # Proactive slowdown when approaching limit
                if rate_limit_info['remaining'] < 10:
                    print("Approaching rate limit, slowing down...")
                    time.sleep(1)

        return False  # No retry needed

# Example: Adaptive rate limiting based on server response
class AdaptiveRateLimiter:
    """Request delay that shrinks on success and grows on rate limits/errors.

    The delay always stays within ``[min_delay, max_delay]`` (0.1 s to 60 s).
    """

    def __init__(self, initial_delay=1.0):
        """Start with ``initial_delay`` seconds between requests."""
        self.min_delay = 0.1
        self.max_delay = 60.0
        self.delay = initial_delay
        self.success_count = 0
        self.rate_limit_count = 0

    def wait(self):
        """Sleep for the current delay; a non-positive delay is skipped."""
        if self.delay <= 0:
            return
        time.sleep(self.delay)

    def on_success(self):
        """Record a success; every 10th consecutive one cuts the delay by 10%."""
        self.success_count += 1
        if self.success_count < 10:
            return

        faster = self.delay * 0.9
        self.delay = max(faster, self.min_delay)
        self.success_count = 0
        print(f"Speeding up: delay = {self.delay:.2f}s")

    def on_rate_limit(self):
        """Record a rate limit hit and double the delay (capped at max_delay)."""
        self.rate_limit_count += 1
        self.success_count = 0

        doubled = self.delay * 2
        self.delay = min(doubled, self.max_delay)
        print(f"Slowing down: delay = {self.delay:.2f}s")

    def on_error(self):
        """Record an error and grow the delay by 50% (capped at max_delay)."""
        self.success_count = 0

        slower = self.delay * 1.5
        self.delay = min(slower, self.max_delay)
        print(f"Error encountered: delay = {self.delay:.2f}s")

Backoff Strategies

Implementation of Various Backoff Algorithms

import time
import random
from functools import wraps

# 1. Fixed Backoff
def fixed_backoff(delay=1.0, max_retries=3):
    """Decorator: retry the wrapped call, pausing ``delay`` seconds each time.

    The function is attempted up to ``max_retries`` times; the exception from
    the last attempt propagates to the caller.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            final_attempt = max_retries - 1
            attempt = 0
            while attempt < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as err:
                    # Out of attempts: let the last failure surface.
                    if attempt >= final_attempt:
                        raise
                    print(f"Attempt {attempt + 1} failed: {err}")
                    print(f"Waiting {delay} seconds before retry...")
                    time.sleep(delay)
                attempt += 1
            return None
        return wrapper
    return decorator

# 2. Linear Backoff
def linear_backoff(initial_delay=1.0, increment=1.0, max_retries=5):
    """Decorator: retry with a pause that grows by ``increment`` per failure.

    The first pause is ``initial_delay`` seconds. After ``max_retries``
    attempts the last exception propagates.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            pause = initial_delay
            last_index = max_retries - 1

            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as err:
                    if attempt == last_index:
                        raise
                    print(f"Attempt {attempt + 1} failed: {err}")
                    print(f"Waiting {pause} seconds before retry...")
                    time.sleep(pause)
                    pause = pause + increment
            return None
        return wrapper
    return decorator

# 3. Exponential Backoff
def exponential_backoff(initial_delay=1.0, multiplier=2, max_delay=60, max_retries=5):
    """Decorator: retry with a pause multiplied by ``multiplier`` per failure.

    Each pause is capped at ``max_delay`` seconds. After ``max_retries``
    attempts the last exception propagates.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            uncapped = initial_delay
            attempt = 0

            while attempt < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as err:
                    if not attempt < max_retries - 1:
                        raise
                    print(f"Attempt {attempt + 1} failed: {err}")
                    # The cap applies to the sleep only; growth continues uncapped.
                    pause = min(uncapped, max_delay)
                    print(f"Waiting {pause} seconds before retry...")
                    time.sleep(pause)
                    uncapped *= multiplier
                attempt += 1
            return None
        return wrapper
    return decorator

# 4. Exponential Backoff with Jitter
def exponential_backoff_with_jitter(initial_delay=1.0, max_delay=60, max_retries=5):
    """Decorator: exponential backoff plus up to 10% random jitter.

    The jitter spreads out retries from many clients so they do not all hit
    the server at the same instant (thundering herd).
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_index = max_retries - 1

            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as err:
                    if attempt >= last_index:
                        raise

                    # Doubling delay, capped, with a random extra of 0-10%.
                    capped = min(initial_delay * (2 ** attempt), max_delay)
                    pause = capped + random.uniform(0, capped * 0.1)  # 10% jitter

                    print(f"Attempt {attempt + 1} failed: {err}")
                    print(f"Waiting {pause:.2f} seconds before retry...")
                    time.sleep(pause)
            return None
        return wrapper
    return decorator

# 5. Fibonacci Backoff
def fibonacci_backoff(max_delay=60, max_retries=8):
    """Decorator: retry with pauses following the Fibonacci sequence.

    Pauses run 1, 1, 2, 3, 5, ... seconds, each capped at ``max_delay``.
    After ``max_retries`` attempts the last exception propagates.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            older, newer = 0, 1
            attempt = 0

            while attempt < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as err:
                    if attempt >= max_retries - 1:
                        raise

                    pause = min(newer, max_delay)
                    print(f"Attempt {attempt + 1} failed: {err}")
                    print(f"Waiting {pause} seconds before retry...")
                    time.sleep(pause)

                    # Advance one step along the sequence.
                    older, newer = newer, older + newer
                attempt += 1
            return None
        return wrapper
    return decorator

# Advanced retry strategy with conditions
class RetryStrategy:
    """Configurable retry policy.

    Args:
        max_retries: total number of attempts (values below 1 are treated as
            a single attempt).
        backoff_type: ``'fixed'``, ``'linear'``, ``'exponential'`` or
            ``'fibonacci'``; any other value behaves like ``'fixed'``.
        initial_delay: base delay in seconds.
        max_delay: upper bound for the growing backoff types.
        retry_on: exception types that trigger a retry.
        dont_retry_on: exception types that never trigger a retry; these take
            precedence over ``retry_on``.
        on_retry: optional callback ``on_retry(attempt, delay, exception)``
            invoked before each wait.
    """

    def __init__(self,
                 max_retries=3,
                 backoff_type='exponential',
                 initial_delay=1.0,
                 max_delay=60,
                 retry_on=(Exception,),
                 dont_retry_on=(),
                 on_retry=None):
        self.max_retries = max_retries
        self.backoff_type = backoff_type
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.retry_on = retry_on
        self.dont_retry_on = dont_retry_on
        self.on_retry = on_retry

    def calculate_delay(self, attempt):
        """Return the delay in seconds to wait after zero-based ``attempt`` fails."""
        if self.backoff_type == 'fixed':
            return self.initial_delay
        elif self.backoff_type == 'linear':
            # Capped like the other growing strategies so max_delay is honored.
            return min(self.initial_delay * (attempt + 1), self.max_delay)
        elif self.backoff_type == 'exponential':
            return min(self.initial_delay * (2 ** attempt), self.max_delay)
        elif self.backoff_type == 'fibonacci':
            a, b = 0, self.initial_delay
            for _ in range(attempt):
                a, b = b, a + b
            return min(b, self.max_delay)
        else:
            return self.initial_delay

    def should_retry(self, exception):
        """Return True if ``exception`` is retryable under this policy."""
        if isinstance(exception, self.dont_retry_on):
            return False
        return isinstance(exception, self.retry_on)

    def execute(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)``, retrying according to the policy.

        Returns:
            Whatever ``func`` returns on the first successful attempt.

        Raises:
            The exception from ``func`` when it is not retryable or when the
            attempts are exhausted.
        """
        # Always make at least one attempt, so a max_retries of 0 (or less)
        # cannot fall out of the loop with nothing to return or raise.
        attempts = max(self.max_retries, 1)

        for attempt in range(attempts):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if not self.should_retry(e) or attempt >= attempts - 1:
                    raise

                delay = self.calculate_delay(attempt)

                if self.on_retry:
                    self.on_retry(attempt, delay, e)

                print(f"Attempt {attempt + 1}/{self.max_retries} failed: {e}")
                print(f"Retrying in {delay:.2f} seconds...")
                time.sleep(delay)

# Usage example
@exponential_backoff_with_jitter(max_retries=5)
def fetch_data(url):
    """GET ``url`` and return its decoded JSON body.

    HTTP error statuses raise via ``raise_for_status``; the decorator retries
    the whole call up to 5 times with jittered exponential backoff.
    """
    reply = requests.get(url, timeout=10)
    reply.raise_for_status()
    payload = reply.json()
    return payload

Ethical Web Scraping

Robots.txt and Ethical Guidelines

from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse, urljoin
import requests
import time

class EthicalScraper:
    """Web scraper that checks robots.txt and paces its requests.

    Args:
        user_agent: value sent as the ``User-Agent`` header and matched
            against robots.txt rules.
        delay: seconds to wait before each request when robots.txt gives no
            crawl delay.
    """

    def __init__(self, user_agent="EthicalBot/1.0", delay=1.0):
        self.user_agent = user_agent
        self.delay = delay
        # Parsed robots.txt per "scheme://host"; None marks an unreadable file.
        self.robots_cache = {}
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': self.user_agent})

        # Ethical scraping guidelines
        self.guidelines = """
        ETHICAL SCRAPING GUIDELINES:
        1. Always check and respect robots.txt
        2. Identify yourself with a descriptive User-Agent
        3. Respect rate limits and add delays between requests
        4. Don't scrape personal data without consent
        5. Check and follow the website's Terms of Service
        6. Cache responses to avoid repeated requests
        7. Scrape during off-peak hours when possible
        8. Stop immediately if asked by the website owner
        9. Don't circumvent authentication or paywalls
        10. Give credit and respect copyright
        """

    def check_robots_txt(self, url):
        """Check ``url`` against the site's robots.txt.

        Returns:
            ``(allowed, crawl_delay)``. When robots.txt could not be read the
            result is ``(True, None)``: access is allowed by default.
        """
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

        # Cache robots.txt per domain
        if base_url not in self.robots_cache:
            rp = RobotFileParser()
            rp.set_url(f"{base_url}/robots.txt")

            try:
                rp.read()
            except Exception:
                # Best effort: network or decoding failures mean "no rules".
                # Exception (not a bare except) lets KeyboardInterrupt and
                # SystemExit propagate instead of being swallowed here.
                print(f"Could not read robots.txt for {base_url}")
                self.robots_cache[base_url] = None
            else:
                self.robots_cache[base_url] = rp

        rp = self.robots_cache[base_url]

        if rp is None:
            return True, None

        can_fetch = rp.can_fetch(self.user_agent, url)
        crawl_delay = rp.crawl_delay(self.user_agent)

        return can_fetch, crawl_delay

    def scrape(self, url, force=False):
        """Fetch ``url`` after the robots.txt check and a polite delay.

        Args:
            url: page to fetch.
            force: fetch even when robots.txt disallows it (a warning is
                printed).

        Returns:
            The response object, or ``None`` when the URL is disallowed or the
            request fails.
        """
        allowed, crawl_delay = self.check_robots_txt(url)

        if not allowed and not force:
            print(f"Scraping {url} is not allowed by robots.txt")
            return None

        if not allowed and force:
            print(f"WARNING: Scraping {url} despite robots.txt (forced)")

        # Prefer the site's own crawl delay over our default.
        if crawl_delay:
            print(f"Using crawl delay of {crawl_delay} seconds from robots.txt")
            time.sleep(crawl_delay)
        else:
            time.sleep(self.delay)

        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()

            # Check for signs that we're not welcome
            if self._check_blocking_signs(response):
                print(f"Warning: Possible blocking detected for {url}")

            return response

        except requests.RequestException as e:
            print(f"Error scraping {url}: {e}")
            return None

    def _check_blocking_signs(self, response):
        """Return True if the response body contains a blocking-related phrase."""
        signs = [
            'captcha',
            'blocked',
            'denied',
            'forbidden',
            'rate limit',
            'too many requests'
        ]

        content = response.text.lower()
        return any(sign in content for sign in signs)

    def print_guidelines(self):
        """Print ethical scraping guidelines"""
        print(self.guidelines)

# Responsible API usage
class ResponsibleAPIClient:
    """API client that throttles itself and keeps usage statistics."""

    def __init__(self, api_key, rate_limit=60, rate_window=60):
        """Build a session carrying the API key and a sliding-window limiter."""
        self.api_key = api_key
        self.rate_limiter = SlidingWindowRateLimiter(rate_limit, rate_window)

        session = requests.Session()
        session.headers.update({
            'User-Agent': 'ResponsibleAPIClient/1.0',
            'X-API-Key': api_key
        })
        self.session = session

        # Counters updated by request()
        self.usage_stats = {
            'requests': 0,
            'errors': 0,
            'rate_limits': 0,
            'data_retrieved': 0
        }

    def request(self, url, **kwargs):
        """GET ``url`` after waiting for the rate limiter.

        Extra keyword arguments go to ``session.get``. A 429 response is
        counted and waited out before ``raise_for_status`` raises for it.
        Request exceptions are counted as errors and re-raised.
        """
        self.rate_limiter.wait_if_needed()
        stats = self.usage_stats

        try:
            response = self.session.get(url, **kwargs)
            stats['requests'] += 1

            # Track how many bytes were downloaded.
            body = response.content
            if body:
                stats['data_retrieved'] += len(body)

            if response.status_code == 429:
                stats['rate_limits'] += 1
                self._handle_rate_limit(response)

            response.raise_for_status()
            return response

        except requests.RequestException:
            stats['errors'] += 1
            raise

    def _handle_rate_limit(self, response):
        """Sleep for the numeric ``Retry-After`` value, or 60 s without one."""
        header_value = response.headers.get('Retry-After', 60)

        # Only a digit string is honored; anything else means the default.
        wait_time = 60
        if isinstance(header_value, str) and header_value.isdigit():
            wait_time = int(header_value)

        print(f"Rate limited. Waiting {wait_time} seconds...")
        time.sleep(wait_time)

    def get_usage_report(self):
        """Return a summary dict built from the usage counters."""
        stats = self.usage_stats
        total = stats['requests']
        # max(..., 1) avoids dividing by zero before any request was made.
        error_rate = stats['errors'] / max(total, 1)
        megabytes = stats['data_retrieved'] / (1024 * 1024)
        return {
            'total_requests': total,
            'error_rate': error_rate,
            'rate_limit_hits': stats['rate_limits'],
            'data_retrieved_mb': megabytes
        }

Legal and Compliance

Terms of Service Checker

import re
from bs4 import BeautifulSoup

class TermsOfServiceChecker:
    """Heuristic scan of a site's Terms of Service for scraping restrictions."""

    def __init__(self):
        """Define the phrases that signal restrictions or an available API."""
        self.restriction_keywords = [
            'no scraping',
            'no crawling',
            'no automated',
            'no robots',
            'no data mining',
            'no harvesting',
            'prohibited',
            'not permitted',
            'written consent',
            'prior permission'
        ]

        self.api_keywords = [
            'api available',
            'developer api',
            'data api',
            'rest api',
            'graphql'
        ]

    def find_terms_url(self, homepage_url):
        """Return the absolute URL of the first ToS-looking link, or None.

        A link matches when its href or its text contains one of the known
        patterns. Any error is printed and yields None.
        """
        # Common patterns for ToS links
        hints = [
            'terms', 'tos', 'legal', 'terms-of-service',
            'terms-of-use', 'terms-and-conditions'
        ]

        try:
            page = requests.get(homepage_url, timeout=10)
            document = BeautifulSoup(page.content, 'html.parser')

            for anchor in document.find_all('a'):
                target = anchor.get('href', '').lower()
                label = anchor.text.lower()

                if any(hint in target or hint in label for hint in hints):
                    return urljoin(homepage_url, anchor.get('href'))

        except Exception as err:
            print(f"Error finding ToS: {err}")

        return None

    def check_terms(self, terms_url):
        """Download ``terms_url`` and look for restriction and API phrases.

        Returns:
            dict with ``scraping_prohibited``, ``api_available`` and the
            matching sentences under ``restrictions`` / ``api_mentions``;
            None when the page cannot be fetched or parsed.
        """
        try:
            page = requests.get(terms_url, timeout=10)
            document = BeautifulSoup(page.content, 'html.parser')
            text = document.get_text().lower()

            results = {
                'scraping_prohibited': False,
                'api_available': False,
                'restrictions': [],
                'api_mentions': []
            }

            # (keywords, flag to raise, list that collects the context)
            scans = (
                (self.restriction_keywords, 'scraping_prohibited', 'restrictions'),
                (self.api_keywords, 'api_available', 'api_mentions'),
            )

            for keywords, flag, bucket in scans:
                for keyword in keywords:
                    if keyword not in text:
                        continue
                    results[flag] = True

                    # Keep every sentence that mentions the keyword as context.
                    results[bucket].extend(
                        sentence.strip()
                        for sentence in text.split('.')
                        if keyword in sentence
                    )

            return results

        except Exception as err:
            print(f"Error checking ToS: {err}")
            return None

    def generate_compliance_report(self, website_url):
        """Combine the robots.txt and ToS checks into one report dict."""
        recommendations = []
        report = {
            'website': website_url,
            'timestamp': datetime.now().isoformat(),
            'robots_txt': None,
            'terms_of_service': None,
            'recommendations': recommendations
        }

        # robots.txt verdict for the site URL itself
        allowed, crawl_delay = EthicalScraper().check_robots_txt(website_url)
        report['robots_txt'] = {
            'allowed': allowed,
            'crawl_delay': crawl_delay
        }

        if not allowed:
            recommendations.append("Robots.txt prohibits scraping")

        # Terms of Service, when a link to them can be located
        terms_url = self.find_terms_url(website_url)

        if terms_url:
            findings = self.check_terms(terms_url)
            report['terms_of_service'] = findings

            if findings:
                if findings['scraping_prohibited']:
                    recommendations.append("Terms of Service may prohibit scraping")

                if findings['api_available']:
                    recommendations.append("API is available - consider using it instead")

        # General recommendations
        recommendations.extend([
            "Always respect rate limits",
            "Use descriptive User-Agent",
            "Cache responses when possible",
            "Consider contacting website owner for permission",
            "Document your data source and respect copyright"
        ])

        return report

# GDPR Compliance
class GDPRCompliantScraper:
    """Scraper that detects and redacts common personal data patterns.

    Detection and redaction share the same regular expressions, so anything
    reported as personal data is also covered by ``anonymize_data``.
    """

    # The TLD class is [A-Za-z]; a '|' inside a character class is a literal.
    _EMAIL_PATTERN = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    _PHONE_PATTERN = r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'  # Phone (US)
    _ID_PATTERN = r'\b\d{9,}\b'  # SSN-like numbers
    _CARD_PATTERN = r'\b(?:\d{4}[-\s]?){3}\d{4}\b'  # Credit card

    def __init__(self):
        """Set up the detection patterns and an HTTP session."""
        self.personal_data_patterns = [
            self._EMAIL_PATTERN,
            self._PHONE_PATTERN,
            self._ID_PATTERN,
            self._CARD_PATTERN,
        ]

        self.session = requests.Session()

    def contains_personal_data(self, text):
        """Return True if any personal-data pattern occurs in ``text``."""
        return any(
            re.search(pattern, text) for pattern in self.personal_data_patterns
        )

    def anonymize_data(self, text):
        """Return ``text`` with personal data replaced by redaction markers."""
        # Cards are redacted before long numbers so an unseparated 16-digit
        # card number is labelled as a card rather than a generic ID.
        replacements = (
            (self._EMAIL_PATTERN, '[EMAIL_REDACTED]'),
            (self._PHONE_PATTERN, '[PHONE_REDACTED]'),
            (self._CARD_PATTERN, '[CC_REDACTED]'),
            (self._ID_PATTERN, '[ID_REDACTED]'),
        )

        for pattern, marker in replacements:
            text = re.sub(pattern, marker, text)

        return text

    def scrape_with_privacy(self, url, anonymize=True):
        """Fetch ``url`` and return its text, redacted when requested.

        Returns:
            The page text (anonymized if personal data was found and
            ``anonymize`` is true), or None for a non-200 response.
        """
        # A timeout keeps an unresponsive server from hanging the scraper.
        response = self.session.get(url, timeout=30)

        if response.status_code == 200:
            content = response.text

            # Check for personal data
            if self.contains_personal_data(content):
                print("Warning: Personal data detected in content")

                if anonymize:
                    content = self.anonymize_data(content)
                    print("Personal data has been anonymized")
                else:
                    print("Consider anonymizing or not storing this data")

            return content

        return None

Distributed and Polite Scraping

Polite Scraping Architecture

import queue
import threading
from datetime import datetime, timedelta
import hashlib

class PoliteScrapeQueue:
    """Per-domain priority queues with a minimum delay between hits to a domain.

    URLs are grouped by the ``netloc`` of the parsed URL. Each domain has its
    own ``queue.PriorityQueue`` (lower ``priority`` value is served first),
    its own delay, and the timestamp of its last access.
    """

    def __init__(self, default_delay=1.0, max_workers=5):
        """Create an empty queue.

        Args:
            default_delay: Seconds between two requests to the same domain
                when ``add_url`` does not supply a delay.
            max_workers: Stored on the instance; not used by this class.
        """
        self.default_delay = default_delay
        self.max_workers = max_workers
        self.domain_queues = {}
        self.domain_last_access = {}
        self.domain_delays = {}
        self.lock = threading.Lock()
        self.workers = []
        self.running = True

    def add_url(self, url, priority=5, delay=None):
        """Queue ``url`` under its domain.

        Args:
            url: URL to schedule.
            priority: Ordering key inside the domain queue; smaller first.
            delay: Per-domain delay in seconds. Only honoured the first time
                a domain is seen; a falsy value (``None`` or ``0``) falls
                back to ``default_delay``.
        """
        domain = urlparse(url).netloc

        with self.lock:
            if domain not in self.domain_queues:
                self.domain_queues[domain] = queue.PriorityQueue()
                self.domain_delays[domain] = delay or self.default_delay
                self.domain_last_access[domain] = 0

            self.domain_queues[domain].put((priority, url))

    def get_next_url(self):
        """Return the next URL to fetch, waiting out its domain delay if needed.

        The first domain (in insertion order) whose delay has already elapsed
        is chosen; if none is ready, the domain with the shortest remaining
        wait is chosen and the call sleeps for that long.

        Returns:
            The URL string, or None when every domain queue is empty.
        """
        with self.lock:
            now = time.time()
            chosen = None
            chosen_wait = float('inf')

            for domain, pending in self.domain_queues.items():
                if pending.empty():
                    continue

                elapsed = now - self.domain_last_access[domain]
                remaining = self.domain_delays[domain] - elapsed

                if remaining <= 0:
                    # Ready right now: no need to look any further.
                    chosen, chosen_wait = domain, 0.0
                    break
                if remaining < chosen_wait:
                    chosen, chosen_wait = domain, remaining

            # Compare against None, not truthiness: URLs without a netloc
            # are queued under the empty-string domain and must still be
            # served.
            if chosen is None:
                return None

            _, url = self.domain_queues[chosen].get_nowait()
            # Reserve the slot by recording the *scheduled* access time, so
            # other callers see this domain as busy while we sleep below.
            self.domain_last_access[chosen] = now + chosen_wait

        # Sleep outside the lock so add_url() and other consumers are not
        # blocked for the duration of the politeness delay.
        if chosen_wait > 0:
            time.sleep(chosen_wait)

        return url

class DistributedScraper:
    """Thread-pool scraper with an on-disk response cache.

    URLs are pushed onto a shared task queue, fetched by worker threads, and
    the outcomes are collected from a result queue. Response bodies are
    cached as files under ``cache_dir``, keyed by the MD5 of the URL.
    """

    def __init__(self, num_workers=5, cache_dir='cache', request_delay=1.0):
        """Set up queues and make sure the cache directory exists.

        Args:
            num_workers: Number of worker threads started by ``scrape_urls``.
            cache_dir: Directory holding the ``<md5>.cache`` files.
            request_delay: Seconds each worker sleeps after a network fetch.
        """
        # Imported locally: ``os`` is not among this snippet's module imports.
        import os

        self.num_workers = num_workers
        self.cache_dir = cache_dir
        self.request_delay = request_delay
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.seen_urls = set()
        self.lock = threading.Lock()

        os.makedirs(cache_dir, exist_ok=True)

    def _get_cache_path(self, url):
        """Return the cache file path for ``url``."""
        # MD5 is used only to derive a file name, not for security.
        url_hash = hashlib.md5(url.encode()).hexdigest()
        return f"{self.cache_dir}/{url_hash}.cache"

    def _is_cached(self, url, max_age_hours=24):
        """Return True if a cache file for ``url`` exists and is fresh.

        A file counts as fresh when its modification time is less than
        ``max_age_hours`` old.
        """
        # Local import: previously ``os`` was visible only inside __init__,
        # so this method raised NameError.
        import os

        try:
            modified = datetime.fromtimestamp(
                os.path.getmtime(self._get_cache_path(url))
            )
        except OSError:
            # Missing (or unreadable) file means "not cached".
            return False

        return datetime.now() - modified < timedelta(hours=max_age_hours)

    def _get_from_cache(self, url):
        """Return the cached text for ``url``, or None if it cannot be read."""
        try:
            with open(self._get_cache_path(url), 'r', encoding='utf-8') as f:
                return f.read()
        except (OSError, UnicodeDecodeError):
            return None

    def _save_to_cache(self, url, content):
        """Write ``content`` to the cache file for ``url`` (best effort)."""
        try:
            with open(self._get_cache_path(url), 'w', encoding='utf-8') as f:
                f.write(content)
        except (OSError, UnicodeEncodeError) as e:
            # Caching is an optimisation; report the failure and carry on.
            print(f"Cache write failed for {url}: {e}")

    def worker(self, worker_id):
        """Worker loop: take tasks until a ``None`` sentinel arrives.

        Each task is a dict with a ``'url'`` key. A fresh, non-empty cache
        entry is returned as-is; otherwise the URL is fetched, cached, and
        the worker sleeps ``request_delay`` seconds. Every outcome is put on
        ``result_queue`` as a dict with ``'url'`` plus either ``'content'``
        and ``'from_cache'`` or ``'error'``.
        """
        # Created on the first cache miss, so cache-only runs never open
        # an HTTP session.
        session = None

        while True:
            try:
                task = self.task_queue.get(timeout=5)
            except queue.Empty:
                continue

            try:
                if task is None:
                    break

                url = task['url']

                # Check cache first
                if self._is_cached(url):
                    content = self._get_from_cache(url)
                    if content:
                        print(f"Worker {worker_id}: Using cached {url}")
                        self.result_queue.put({
                            'url': url,
                            'content': content,
                            'from_cache': True
                        })
                        continue

                if session is None:
                    session = requests.Session()
                    session.headers.update({
                        'User-Agent': f'DistributedScraper/1.0 (Worker-{worker_id})'
                    })

                print(f"Worker {worker_id}: Scraping {url}")

                try:
                    response = session.get(url, timeout=30)
                    response.raise_for_status()
                    content = response.text

                    self._save_to_cache(url, content)
                    self.result_queue.put({
                        'url': url,
                        'content': content,
                        'from_cache': False
                    })
                except Exception as e:
                    # Any fetch failure is reported as a result, not raised.
                    print(f"Worker {worker_id}: Error scraping {url}: {e}")
                    self.result_queue.put({
                        'url': url,
                        'error': str(e)
                    })

                # Be polite
                time.sleep(self.request_delay)

            except Exception as e:
                print(f"Worker {worker_id}: Unexpected error: {e}")
            finally:
                # Every successful get() must be matched by task_done(),
                # otherwise task_queue.join() in scrape_urls never returns.
                self.task_queue.task_done()

    def scrape_urls(self, urls):
        """Scrape ``urls`` with ``num_workers`` threads and return the results.

        URLs already passed to an earlier call (tracked in ``seen_urls``) are
        skipped and produce no result.

        Returns:
            A list of result dicts as produced by ``worker``; order is not
            guaranteed.
        """
        for url in urls:
            with self.lock:
                if url not in self.seen_urls:
                    self.seen_urls.add(url)
                    self.task_queue.put({'url': url})

        workers = [
            threading.Thread(target=self.worker, args=(i,))
            for i in range(self.num_workers)
        ]
        for t in workers:
            t.start()

        # Wait until every queued task has been marked done.
        self.task_queue.join()

        # One sentinel per worker tells it to exit its loop.
        for _ in workers:
            self.task_queue.put(None)

        for t in workers:
            t.join()

        results = []
        while True:
            try:
                results.append(self.result_queue.get_nowait())
            except queue.Empty:
                break

        return results

Monitoring and Alerts

import logging
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText

class ScrapingMonitor:
    """Collect scraping statistics, log each request, and record alerts.

    Counters live in ``self.stats``; alerts raised through ``send_alert`` are
    appended to ``self.alerts``.
    """

    def __init__(self, alert_email=None, rate_limit_alert_threshold=10):
        """Initialise counters and configure logging.

        Args:
            alert_email: Recipient address used when building alert e-mails;
                no e-mail message is built when it is falsy.
            rate_limit_alert_threshold: Number of rate-limited requests that
                triggers a "High Rate Limit Hits" alert.
        """
        self.alert_email = alert_email
        self.rate_limit_alert_threshold = rate_limit_alert_threshold
        self.stats = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'rate_limits_hit': 0,
            'robots_txt_blocks': 0,
            'total_data_mb': 0,
            'start_time': datetime.now()
        }
        self.alerts = []
        # Hits since the last alert. Kept apart from stats['rate_limits_hit']
        # so that raising an alert does not wipe the reported total.
        self._rate_limit_hits_since_alert = 0

        # Set up logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('scraping.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def log_request(self, url, success=True, data_size=0, rate_limited=False, robots_blocked=False):
        """Record the outcome of one request.

        Args:
            url: The requested URL (used in the log line only).
            success: Counts toward successful or failed requests.
            data_size: Payload size in bytes; accumulated in megabytes.
            rate_limited: Counts a rate-limit hit and runs the alert check.
            robots_blocked: Counts a robots.txt block.
        """
        self.stats['total_requests'] += 1

        if success:
            self.stats['successful_requests'] += 1
        else:
            self.stats['failed_requests'] += 1

        if rate_limited:
            self.stats['rate_limits_hit'] += 1
            self._rate_limit_hits_since_alert += 1
            self.check_rate_limit_alert()

        if robots_blocked:
            self.stats['robots_txt_blocks'] += 1

        self.stats['total_data_mb'] += data_size / (1024 * 1024)

        status = 'SUCCESS' if success else 'FAILED'
        self.logger.info("%s: %s - Size: %s bytes", status, url, data_size)

    def check_rate_limit_alert(self):
        """Raise an alert once enough rate-limit hits have accumulated.

        Only the since-last-alert counter is reset afterwards; the cumulative
        ``stats['rate_limits_hit']`` keeps counting so reports stay accurate.
        """
        hits = self._rate_limit_hits_since_alert
        if hits >= self.rate_limit_alert_threshold:
            self.send_alert(
                "High Rate Limit Hits",
                f"Hit rate limits {hits} times"
            )
            self._rate_limit_hits_since_alert = 0

    def send_alert(self, subject, message):
        """Record an alert, log it as a warning, and build the alert e-mail.

        The e-mail message is only constructed here; actual SMTP delivery is
        left as the commented-out placeholder below.
        """
        alert = {
            'timestamp': datetime.now(),
            'subject': subject,
            'message': message
        }
        self.alerts.append(alert)

        self.logger.warning("ALERT: %s - %s", subject, message)

        if self.alert_email:
            try:
                msg = MIMEText(message)
                msg['Subject'] = f"Scraping Alert: {subject}"
                msg['From'] = 'scraper@example.com'
                msg['To'] = self.alert_email

                # Configure SMTP server
                # smtp = smtplib.SMTP('localhost')
                # smtp.send_message(msg)
                # smtp.quit()
            except Exception as e:
                self.logger.error("Failed to send email alert: %s", e)

    def get_report(self):
        """Return a dict summarising the statistics collected so far.

        ``success_rate`` is a fraction in [0, 1]. Both ratios guard against
        division by zero: the request count is floored at 1, and the runtime
        used for ``requests_per_minute`` is floored at one minute.
        """
        runtime = datetime.now() - self.stats['start_time']
        total = self.stats['total_requests']
        minutes = runtime.total_seconds() / 60

        return {
            'runtime': str(runtime),
            'total_requests': total,
            'success_rate': self.stats['successful_requests'] / max(total, 1),
            'failed_requests': self.stats['failed_requests'],
            'rate_limits_hit': self.stats['rate_limits_hit'],
            'robots_blocks': self.stats['robots_txt_blocks'],
            'data_scraped_mb': self.stats['total_data_mb'],
            'requests_per_minute': total / max(minutes, 1),
            'alerts': self.alerts
        }

    def print_summary(self):
        """Print the report from ``get_report`` plus the five latest alerts."""
        report = self.get_report()

        print("\n" + "="*50)
        print("SCRAPING SUMMARY")
        print("="*50)
        print(f"Runtime: {report['runtime']}")
        print(f"Total Requests: {report['total_requests']}")
        print(f"Success Rate: {report['success_rate']:.2%}")
        print(f"Failed Requests: {report['failed_requests']}")
        print(f"Rate Limits Hit: {report['rate_limits_hit']}")
        print(f"Robots.txt Blocks: {report['robots_blocks']}")
        print(f"Data Scraped: {report['data_scraped_mb']:.2f} MB")
        print(f"Requests/Minute: {report['requests_per_minute']:.2f}")

        if report['alerts']:
            print(f"\nAlerts: {len(report['alerts'])}")
            for alert in report['alerts'][-5:]:  # Show last 5 alerts
                print(f"  - {alert['timestamp']}: {alert['subject']}")

        print("="*50)

Best Practices Summary

# Complete ethical scraping template

class BestPracticesScraper:
    """Template for ethical and responsible web scraping.

    Combines an identifying User-Agent, a token-bucket rate limiter, a retry
    strategy, request monitoring, an in-memory response cache, and a
    robots.txt check that runs before every URL.
    """

    def __init__(self, name="EthicalScraper", version="1.0"):
        """Build the session and helper objects.

        Args:
            name: Scraper name placed in the User-Agent header.
            version: Scraper version placed in the User-Agent header.
        """
        self.user_agent = f"{name}/{version} (Contact: your-email@example.com)"
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': self.user_agent})

        # Rate limiting
        self.rate_limiter = TokenBucketRateLimiter(tokens_per_second=1)

        # Retry strategy
        self.retry_strategy = RetryStrategy(
            max_retries=3,
            backoff_type='exponential',
            initial_delay=1.0
        )

        # Monitoring
        self.monitor = ScrapingMonitor()

        # Response cache, keyed by URL
        self.cache = {}

        # Parsed robots.txt per site root, so each site is fetched once
        self._robots_parsers = {}

        # Best practices checklist
        self.checklist = {
            'robots_txt_checked': False,
            'terms_of_service_reviewed': False,
            'rate_limiting_enabled': True,
            'user_agent_set': True,
            'caching_enabled': True,
            'monitoring_enabled': True,
            'gdpr_compliant': False,
            'error_handling': True
        }

    def check_robots_txt(self, url):
        """Check ``url`` against its site's robots.txt.

        The file is fetched through ``self.session`` (so it carries this
        scraper's User-Agent) and parsed with ``urllib.robotparser``.
        Status handling follows ``RobotFileParser.read``: 401/403 means
        everything is disallowed, any other 4xx means no restrictions.
        Network errors and 5xx answers are treated as "not allowed",
        because permission could not be confirmed.

        Returns:
            ``(allowed, crawl_delay)`` where ``crawl_delay`` is the
            Crawl-delay value for this user agent or None.
        """
        from urllib.parse import urlparse
        from urllib.robotparser import RobotFileParser

        parts = urlparse(url)
        root = f"{parts.scheme}://{parts.netloc}"

        parser = self._robots_parsers.get(root)
        if parser is None:
            try:
                response = self.session.get(f"{root}/robots.txt", timeout=30)
            except requests.RequestException as e:
                print(f"Could not fetch robots.txt for {root}: {e}")
                return False, None

            parser = RobotFileParser()
            status = response.status_code
            if status in (401, 403):
                parser.parse(["User-agent: *", "Disallow: /"])
            elif 400 <= status < 500:
                parser.parse([])  # no rules: everything is allowed
            elif status >= 500:
                return False, None
            else:
                parser.parse(response.text.splitlines())

            self._robots_parsers[root] = parser

        return (
            parser.can_fetch(self.user_agent, url),
            parser.crawl_delay(self.user_agent),
        )

    def pre_scrape_checklist(self, url):
        """Print the pre-scrape checklist for ``url``.

        Returns:
            True if robots.txt allows fetching ``url``, otherwise False.
        """
        print("Pre-Scraping Checklist:")
        print("-" * 30)

        # Check robots.txt
        allowed, delay = self.check_robots_txt(url)
        self.checklist['robots_txt_checked'] = True
        print(f"✓ Robots.txt checked: {'Allowed' if allowed else 'Blocked'}")
        if delay is not None:
            print(f"✓ Crawl-delay requested: {delay}s")

        # Check rate limiting
        print(f"✓ Rate limiting: {self.rate_limiter.tokens_per_second} req/sec")

        # User agent
        print(f"✓ User-Agent: {self.user_agent}")

        # Recommendations
        if not allowed:
            print("\n⚠️ WARNING: Robots.txt prohibits scraping this URL")
            print("Consider:")
            print("  - Looking for an official API")
            print("  - Contacting the website owner")
            print("  - Finding alternative data sources")

        return allowed

    def scrape_responsibly(self, urls):
        """Scrape ``urls`` one by one, applying every safeguard.

        For each URL: run the checklist (skip if disallowed), serve from the
        in-memory cache if present, otherwise wait for the rate limiter and
        fetch through the retry strategy. Failures are printed and logged,
        not raised. A summary is printed at the end.

        Returns:
            The list of response objects for the URLs that succeeded or
            were cached.
        """
        results = []

        for url in urls:
            # Pre-scrape checks
            if not self.pre_scrape_checklist(url):
                continue

            # Check cache
            if url in self.cache:
                print(f"Using cached result for {url}")
                results.append(self.cache[url])
                continue

            # Rate limiting
            self.rate_limiter.wait_if_needed()

            # Scrape with retry
            try:
                response = self.retry_strategy.execute(
                    self._make_request, url
                )

                # Cache result
                self.cache[url] = response
                results.append(response)

                # Monitor
                self.monitor.log_request(
                    url,
                    success=True,
                    data_size=len(response.content)
                )

            except Exception as e:
                print(f"Failed to scrape {url}: {e}")
                self.monitor.log_request(url, success=False)

        # Print summary
        self.monitor.print_summary()

        return results

    def _make_request(self, url):
        """GET ``url`` with a 30 s timeout; raise on HTTP error statuses."""
        response = self.session.get(url, timeout=30)
        response.raise_for_status()
        return response

# Usage
# NOTE: these statements run as soon as the snippet is executed; the call to
# scrape_responsibly() below performs the whole scraping run immediately.
scraper = BestPracticesScraper()
# Placeholder URLs on example.com; replace with the pages to scrape.
urls = ['https://example.com/page1', 'https://example.com/page2']
# results is the list returned by scrape_responsibly(); URLs that were
# blocked by the checklist or that failed are left out of it.
results = scraper.scrape_responsibly(urls)

Practice Exercises

Exercise 1: Build a Rate-Limited API Client

Create a comprehensive API client that:

  1. Implements adaptive rate limiting
  2. Handles all rate limit response headers
  3. Uses exponential backoff with jitter
  4. Monitors and reports API usage
  5. Caches responses appropriately

Exercise 2: Ethical Scraper Framework

Build a framework that:

  1. Checks robots.txt and ToS automatically
  2. Implements per-domain rate limiting
  3. Detects and handles blocking
  4. Anonymizes personal data
  5. Generates compliance reports

Exercise 3: Distributed Scraping System

Create a distributed scraper that:

  1. Coordinates multiple workers
  2. Implements domain-based queuing
  3. Handles failures gracefully
  4. Provides real-time monitoring
  5. Respects rate limits across workers

Key Takeaways

Further Resources