Rate Limiting and Ethics
Be a Responsible Data Citizen! 🤖
With great scraping power comes great responsibility. Rate limiting isn't just about avoiding IP bans—it's about respecting the services we use, following legal guidelines, and being a good citizen of the web. Learn to build scrapers that are not only effective but also ethical, sustainable, and legally compliant.
Understanding Rate Limiting
graph TD
%% Top level: rate limiting is split into what the client does and what the server signals
A[Rate Limiting] --> B[Client-Side]
A --> C[Server-Side]
%% Client-side techniques
B --> D[Request Throttling]
B --> E[Backoff Strategies]
B --> F[Queue Management]
%% Server-side signals the client has to react to
C --> G[429 Status]
C --> H[Retry-After Header]
C --> I[X-RateLimit Headers]
%% Backoff strategy variants
E --> J[Fixed Delay]
E --> K[Exponential Backoff]
E --> L[Jitter]
%% Fields carried by the X-RateLimit header family
I --> M[Limit]
I --> N[Remaining]
I --> O[Reset]
Types of Rate Limits
Common Rate Limiting Patterns
import time
import requests
from datetime import datetime, timedelta
from collections import deque
import threading
import random
# 1. Fixed Window Rate Limiting
class FixedWindowRateLimiter:
    """Allow at most ``max_requests`` calls per window of ``window_seconds``.

    The first window opens when the limiter is created. A new window is
    opened by the first ``allow_request`` call made after the current one
    has run for ``window_seconds``. Counter updates happen under ``self.lock``.
    """

    def __init__(self, max_requests=60, window_seconds=60):
        """Store the limits and open the first window at the current time."""
        self.lock = threading.Lock()
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.request_count = 0
        self.window_start = time.time()

    def allow_request(self):
        """Return True and count the request if the current window has room."""
        with self.lock:
            now = time.time()
            window_expired = now - self.window_start >= self.window_seconds
            if window_expired:
                # Roll over to a fresh window with an empty counter
                self.window_start, self.request_count = now, 0
            if self.request_count >= self.max_requests:
                return False
            self.request_count += 1
            return True

    def wait_if_needed(self):
        """Block until ``allow_request`` succeeds, polling at most once a second."""
        while not self.allow_request():
            elapsed = time.time() - self.window_start
            time_to_wait = self.window_seconds - elapsed
            if time_to_wait <= 0:
                # Window already over; retry immediately
                continue
            print(f"Rate limit reached. Waiting {time_to_wait:.2f} seconds...")
            time.sleep(min(time_to_wait, 1))
# 2. Sliding Window Rate Limiting
class SlidingWindowRateLimiter:
    """Allow at most ``max_requests`` calls in any trailing ``window_seconds``.

    Timestamps of accepted requests are kept in ``self.requests`` (oldest
    first); entries that have aged out of the window are dropped on every
    ``allow_request`` call. All access to the deque happens under ``self.lock``.
    """

    def __init__(self, max_requests=60, window_seconds=60):
        """Store the limits and start with an empty request history."""
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()
        self.lock = threading.Lock()

    def allow_request(self):
        """Return True and record the request if the window has room."""
        with self.lock:
            current_time = time.time()
            cutoff = current_time - self.window_seconds
            # A request exactly ``window_seconds`` old has expired; using <=
            # avoids a busy spin when the oldest entry sits on the boundary.
            while self.requests and self.requests[0] <= cutoff:
                self.requests.popleft()
            if len(self.requests) < self.max_requests:
                self.requests.append(current_time)
                return True
            return False

    def wait_if_needed(self):
        """Block until a request slot is free, then claim it.

        The lock is held only long enough to read the oldest timestamp; the
        sleep happens with the lock released so other threads can keep
        calling ``allow_request`` while this one waits.
        """
        announced = False
        while not self.allow_request():
            with self.lock:
                oldest = self.requests[0] if self.requests else None
            if oldest is None:
                # Nothing recorded yet but still refused (e.g. max_requests
                # is 0): poll slowly instead of spinning.
                time.sleep(0.1)
                continue
            time_to_wait = (oldest + self.window_seconds) - time.time()
            if time_to_wait > 0:
                if not announced:
                    # Report once per call rather than on every poll
                    print(f"Rate limit reached. Waiting {time_to_wait:.2f} seconds...")
                    announced = True
                time.sleep(min(time_to_wait, 0.1))
# 3. Token Bucket Algorithm
class TokenBucketRateLimiter:
    """Token bucket: refills at ``tokens_per_second`` up to ``bucket_size``.

    The bucket starts full. Each request spends tokens; a request is refused
    when fewer tokens are available than it asks for. Token state is updated
    under ``self.lock``.
    """

    def __init__(self, tokens_per_second=1, bucket_size=10):
        """Store the refill rate and capacity and start with a full bucket."""
        self.tokens_per_second = tokens_per_second
        self.bucket_size = bucket_size
        self.tokens = bucket_size
        self.last_update = time.time()
        self.lock = threading.Lock()

    def _refill(self):
        """Add the tokens earned since the last update (caller holds the lock)."""
        current_time = time.time()
        elapsed = current_time - self.last_update
        new_tokens = elapsed * self.tokens_per_second
        # Never exceed the bucket capacity
        self.tokens = min(self.tokens + new_tokens, self.bucket_size)
        self.last_update = current_time

    def allow_request(self, tokens=1):
        """Spend ``tokens`` and return True if that many are available."""
        with self.lock:
            self._refill()
            if self.tokens >= tokens:
                self.tokens -= tokens
                return True
            return False

    def wait_if_needed(self, tokens=1):
        """Block until ``tokens`` tokens could be spent, then spend them.

        Raises:
            ValueError: if the request can never be satisfied, i.e. ``tokens``
                exceeds ``bucket_size`` or the refill rate is not positive
                while the bucket is short.
        """
        if tokens > self.bucket_size:
            # The bucket can never hold this many tokens; waiting would hang
            raise ValueError(
                f"Requested {tokens} tokens but bucket size is {self.bucket_size}"
            )
        while not self.allow_request(tokens):
            with self.lock:
                self._refill()
                tokens_needed = tokens - self.tokens
            if tokens_needed <= 0:
                # Refilled between the two checks; try to claim right away
                continue
            if self.tokens_per_second <= 0:
                raise ValueError("tokens_per_second must be positive to wait for tokens")
            time_to_wait = tokens_needed / self.tokens_per_second
            print(f"Waiting {time_to_wait:.2f} seconds for tokens...")
            # Sleep for the computed deficit, outside the lock
            time.sleep(time_to_wait)
# 4. Leaky Bucket Algorithm
class LeakyBucketRateLimiter:
    """Leaky bucket: queued requests drain at ``leak_rate`` per second.

    ``allow_request`` adds an entry while the bucket holds fewer than
    ``bucket_size`` entries; a daemon thread removes one entry every
    ``1 / leak_rate`` seconds. Call ``stop()`` to end the thread.
    """

    def __init__(self, leak_rate=1, bucket_size=10):
        """Store the limits and start the background leak thread.

        Raises:
            ValueError: if ``leak_rate`` is not positive (the drain interval
                ``1 / leak_rate`` would be undefined).
        """
        if leak_rate <= 0:
            raise ValueError("leak_rate must be positive")
        self.leak_rate = leak_rate  # Requests per second
        self.bucket_size = bucket_size
        self.queue = deque(maxlen=bucket_size)
        self.last_leak = time.time()
        self.lock = threading.Lock()
        # Lets stop() interrupt the leak thread's wait
        self._stop_event = threading.Event()
        # Start leak thread
        self.running = True
        self.leak_thread = threading.Thread(target=self._leak_loop)
        self.leak_thread.daemon = True
        self.leak_thread.start()

    def _leak_loop(self):
        """Background loop: drop the oldest queued entry once per interval."""
        while self.running:
            # wait() returns True as soon as stop() sets the event
            if self._stop_event.wait(1 / self.leak_rate):
                break
            with self.lock:
                if self.queue:
                    self.queue.popleft()
                    self.last_leak = time.time()

    def allow_request(self):
        """Add a request to the bucket; return False if the bucket is full."""
        with self.lock:
            if len(self.queue) < self.bucket_size:
                self.queue.append(time.time())
                return True
            return False

    def stop(self):
        """Stop the leak thread and wait for it to exit."""
        self.running = False
        self._stop_event.set()
        self.leak_thread.join()
Handling Server Rate Limits
Response Header Parsing
import time
import requests
from datetime import datetime, timedelta
class ServerRateLimitHandler:
    """Interpret server-side rate limit signals (429 status and headers)."""

    def __init__(self):
        """Create empty per-instance bookkeeping dictionaries."""
        self.rate_limits = {}
        self.retry_after = {}

    @staticmethod
    def _header_int(headers, name):
        """Return header ``name`` as an int, or None if absent or not an integer."""
        if name not in headers:
            return None
        try:
            return int(str(headers[name]).strip())
        except ValueError:
            return None

    @staticmethod
    def _parse_retry_after(raw):
        """Convert a Retry-After value to non-negative seconds, or None.

        Accepts either a number of seconds or an HTTP-date. A date in the
        past yields 0 rather than a negative duration.
        """
        from datetime import timezone
        from email.utils import parsedate_to_datetime

        raw = str(raw).strip()
        if raw.isdigit():
            # Seconds to wait
            return int(raw)
        try:
            # HTTP date; parsed independently of the process locale
            retry_time = parsedate_to_datetime(raw)
        except (TypeError, ValueError):
            return None
        if retry_time.tzinfo is None:
            # HTTP dates are always GMT
            retry_time = retry_time.replace(tzinfo=timezone.utc)
        seconds = (retry_time - datetime.now(timezone.utc)).total_seconds()
        return max(seconds, 0)

    def parse_rate_limit_headers(self, response):
        """Extract rate limit information from ``response.headers``.

        Returns a dict with keys ``limit`` and ``remaining`` (int or None),
        ``reset`` (local naive ``datetime`` or None) and ``retry_after``
        (seconds or None). Missing or malformed headers leave the
        corresponding entry as None.
        """
        headers = response.headers
        rate_limit_info = {
            'limit': None,
            'remaining': None,
            'reset': None,
            'retry_after': None
        }
        # X-RateLimit-* (GitHub, Twitter style) first, then the alternative
        # X-Rate-Limit-* spelling, which wins when both are present.
        for prefix in ('X-RateLimit-', 'X-Rate-Limit-'):
            limit = self._header_int(headers, prefix + 'Limit')
            if limit is not None:
                rate_limit_info['limit'] = limit
            remaining = self._header_int(headers, prefix + 'Remaining')
            if remaining is not None:
                rate_limit_info['remaining'] = remaining
            reset_timestamp = self._header_int(headers, prefix + 'Reset')
            if reset_timestamp is not None:
                try:
                    rate_limit_info['reset'] = datetime.fromtimestamp(reset_timestamp)
                except (OverflowError, OSError, ValueError):
                    # Timestamp outside the platform's supported range
                    pass
        # Retry-After header (429 responses)
        if 'Retry-After' in headers:
            rate_limit_info['retry_after'] = self._parse_retry_after(headers['Retry-After'])
        return rate_limit_info

    def should_wait(self, rate_limit_info):
        """Return ``(should_wait, seconds)`` for parsed rate limit info.

        An explicit ``retry_after`` takes precedence; otherwise the wait
        lasts until ``reset`` when no requests remain. ``seconds`` is never
        negative.
        """
        retry_after = rate_limit_info['retry_after']
        if retry_after is not None:
            return True, max(retry_after, 0)
        remaining = rate_limit_info['remaining']
        if remaining is not None and remaining <= 0:
            if rate_limit_info['reset']:
                wait_time = (rate_limit_info['reset'] - datetime.now()).total_seconds()
                return True, max(wait_time, 0)
        return False, 0

    def handle_response(self, response, url=None):
        """React to a response; return True if the request should be retried.

        On 429 this sleeps for the server-requested time (if any) and returns
        True. On 200 it reports the remaining quota and pauses one second
        when fewer than 10 requests remain. Returns False otherwise.
        """
        # Check for rate limit status
        if response.status_code == 429:
            print(f"Rate limited (429) for {url or response.url}")
            rate_limit_info = self.parse_rate_limit_headers(response)
            should_wait, wait_time = self.should_wait(rate_limit_info)
            if should_wait:
                print(f"Waiting {wait_time:.2f} seconds before retry...")
                time.sleep(wait_time)
            return True  # Should retry
        # Check if we're approaching rate limit
        elif response.status_code == 200:
            rate_limit_info = self.parse_rate_limit_headers(response)
            if rate_limit_info['remaining'] is not None:
                print(f"Rate limit: {rate_limit_info['remaining']}/{rate_limit_info['limit']} remaining")
                # Proactive slowdown when approaching limit
                if rate_limit_info['remaining'] < 10:
                    print("Approaching rate limit, slowing down...")
                    time.sleep(1)
        return False  # No retry needed
# Example: Adaptive rate limiting based on server response
class AdaptiveRateLimiter:
    """Keep a per-request delay that shrinks on success and grows on trouble.

    The delay is multiplied by 0.9 after every 10th consecutive success, by
    2 after a rate limit and by 1.5 after an error, and is kept between
    ``min_delay`` and ``max_delay`` by those adjustments.
    """

    def __init__(self, initial_delay=1.0):
        """Start at ``initial_delay`` seconds with zeroed counters."""
        self.delay = initial_delay
        self.min_delay, self.max_delay = 0.1, 60.0
        self.success_count = 0
        self.rate_limit_count = 0

    def wait(self):
        """Sleep for the current delay (no-op when the delay is not positive)."""
        if self.delay <= 0:
            return
        time.sleep(self.delay)

    def on_success(self):
        """Count a success; every 10th one in a row shortens the delay by 10%."""
        self.success_count += 1
        if self.success_count < 10:
            return
        faster = self.delay * 0.9
        self.delay = max(faster, self.min_delay)
        self.success_count = 0
        print(f"Speeding up: delay = {self.delay:.2f}s")

    def on_rate_limit(self):
        """Record a rate limit hit and double the delay (capped at max_delay)."""
        self.rate_limit_count += 1
        self.success_count = 0
        doubled = self.delay * 2
        self.delay = min(doubled, self.max_delay)
        print(f"Slowing down: delay = {self.delay:.2f}s")

    def on_error(self):
        """Reset the success streak and raise the delay by 50% (capped)."""
        self.success_count = 0
        slower = self.delay * 1.5
        self.delay = min(slower, self.max_delay)
        print(f"Error encountered: delay = {self.delay:.2f}s")
Backoff Strategies
Implementation of Various Backoff Algorithms
import time
import random
from functools import wraps
# 1. Fixed Backoff
def fixed_backoff(delay=1.0, max_retries=3):
    """Decorator factory: retry on any ``Exception``, sleeping ``delay`` each time.

    The wrapped function is called up to ``max_retries`` times; the exception
    from the final attempt is re-raised. With ``max_retries`` of 0 the
    function is never called and the wrapper returns None.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            final_attempt = max_retries - 1
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt >= final_attempt:
                        # Out of attempts: let the caller see the failure
                        raise
                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Waiting {delay} seconds before retry...")
                    time.sleep(delay)
            return None
        return wrapper
    return decorator
# 2. Linear Backoff
def linear_backoff(initial_delay=1.0, increment=1.0, max_retries=5):
    """Decorator factory: retry on any ``Exception`` with a linearly growing delay.

    The first wait is ``initial_delay`` seconds and each later wait is
    ``increment`` seconds longer. The exception from the final attempt is
    re-raised; with ``max_retries`` of 0 the wrapper returns None.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            final_attempt = max_retries - 1
            delay = initial_delay
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt >= final_attempt:
                        raise
                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Waiting {delay} seconds before retry...")
                    time.sleep(delay)
                    # Next wait is one increment longer
                    delay += increment
            return None
        return wrapper
    return decorator
# 3. Exponential Backoff
def exponential_backoff(initial_delay=1.0, multiplier=2, max_delay=60, max_retries=5):
    """Decorator factory: retry on any ``Exception`` with a multiplying delay.

    Waits start at ``initial_delay`` seconds and are multiplied by
    ``multiplier`` after each failure; each actual sleep is capped at
    ``max_delay``. The exception from the final attempt is re-raised; with
    ``max_retries`` of 0 the wrapper returns None.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            final_attempt = max_retries - 1
            delay = initial_delay
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt >= final_attempt:
                        raise
                    print(f"Attempt {attempt + 1} failed: {e}")
                    # Cap the sleep, but keep growing the uncapped delay
                    actual_delay = min(delay, max_delay)
                    print(f"Waiting {actual_delay} seconds before retry...")
                    time.sleep(actual_delay)
                    delay *= multiplier
            return None
        return wrapper
    return decorator
# 4. Exponential Backoff with Jitter
def exponential_backoff_with_jitter(initial_delay=1.0, max_delay=60, max_retries=5):
    """Decorator factory: exponential backoff plus up to 10% random jitter.

    The base wait for attempt ``n`` (0-based) is ``initial_delay * 2**n``
    capped at ``max_delay``; a uniform random extra of up to 10% of that base
    is added so that many clients do not retry in lockstep. The exception
    from the final attempt is re-raised; with ``max_retries`` of 0 the
    wrapper returns None.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            final_attempt = max_retries - 1
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt >= final_attempt:
                        raise
                    # Calculate delay with jitter
                    base_delay = min(initial_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, base_delay * 0.1)  # 10% jitter
                    actual_delay = base_delay + jitter
                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Waiting {actual_delay:.2f} seconds before retry...")
                    time.sleep(actual_delay)
            return None
        return wrapper
    return decorator
# 5. Fibonacci Backoff
def fibonacci_backoff(max_delay=60, max_retries=8):
    """Decorator factory: retry on any ``Exception`` with Fibonacci-spaced waits.

    Successive waits follow 1, 1, 2, 3, 5, ... seconds, each capped at
    ``max_delay``. The exception from the final attempt is re-raised; with
    ``max_retries`` of 0 the wrapper returns None.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            final_attempt = max_retries - 1
            fib_prev, fib_curr = 0, 1
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt >= final_attempt:
                        raise
                    delay = min(fib_curr, max_delay)
                    print(f"Attempt {attempt + 1} failed: {e}")
                    print(f"Waiting {delay} seconds before retry...")
                    time.sleep(delay)
                    # Advance to the next Fibonacci number
                    fib_prev, fib_curr = fib_curr, fib_prev + fib_curr
            return None
        return wrapper
    return decorator
# Advanced retry strategy with conditions
class RetryStrategy:
    """Configurable retry policy: which exceptions to retry and how long to wait.

    ``backoff_type`` selects the delay schedule ('fixed', 'linear',
    'exponential' or 'fibonacci'; anything else behaves like 'fixed').
    ``dont_retry_on`` takes precedence over ``retry_on``. ``on_retry``, if
    given, is called as ``on_retry(attempt, delay, exception)`` before each
    wait.
    """

    def __init__(self,
                 max_retries=3,
                 backoff_type='exponential',
                 initial_delay=1.0,
                 max_delay=60,
                 retry_on=(Exception,),
                 dont_retry_on=(),
                 on_retry=None):
        """Store the policy.

        Raises:
            ValueError: if ``max_retries`` is less than 1, since the function
                would then never be called.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be at least 1")
        self.max_retries = max_retries
        self.backoff_type = backoff_type
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.retry_on = retry_on
        self.dont_retry_on = dont_retry_on
        self.on_retry = on_retry

    def calculate_delay(self, attempt):
        """Return the wait in seconds after the 0-based ``attempt`` fails.

        Only the exponential and Fibonacci schedules are capped at
        ``max_delay``.
        """
        if self.backoff_type == 'fixed':
            return self.initial_delay
        elif self.backoff_type == 'linear':
            return self.initial_delay * (attempt + 1)
        elif self.backoff_type == 'exponential':
            return min(self.initial_delay * (2 ** attempt), self.max_delay)
        elif self.backoff_type == 'fibonacci':
            # Fibonacci sequence scaled by initial_delay: 1x, 1x, 2x, 3x, ...
            a, b = 0, self.initial_delay
            for _ in range(attempt):
                a, b = b, a + b
            return min(b, self.max_delay)
        else:
            return self.initial_delay

    def should_retry(self, exception):
        """Return True if ``exception`` is retryable under this policy."""
        if isinstance(exception, self.dont_retry_on):
            return False
        return isinstance(exception, self.retry_on)

    def execute(self, func, *args, **kwargs):
        """Call ``func(*args, **kwargs)``, retrying according to the policy.

        Returns whatever ``func`` returns. Re-raises the original exception
        when it is not retryable or the attempts are exhausted.

        Raises:
            ValueError: if ``max_retries`` was changed to less than 1 after
                construction.
        """
        if self.max_retries < 1:
            # Previously this fell through to ``raise None`` (a TypeError)
            raise ValueError("max_retries must be at least 1")
        last_attempt = self.max_retries - 1
        for attempt in range(self.max_retries):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                if not self.should_retry(e) or attempt >= last_attempt:
                    raise
                delay = self.calculate_delay(attempt)
                if self.on_retry:
                    self.on_retry(attempt, delay, e)
                print(f"Attempt {attempt + 1}/{self.max_retries} failed: {e}")
                print(f"Retrying in {delay:.2f} seconds...")
                time.sleep(delay)
# Usage example
@exponential_backoff_with_jitter(max_retries=5)
def fetch_data(url):
    """GET ``url`` (10 second timeout) and return the decoded JSON body.

    HTTP error statuses are raised by ``raise_for_status``; the decorator
    retries the call on any exception, up to 5 attempts in total.
    """
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    payload = resp.json()
    return payload
Ethical Web Scraping
Robots.txt and Ethical Guidelines
from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse, urljoin
import requests
import time
class EthicalScraper:
    """Web scraper that checks robots.txt and paces its requests.

    A single ``requests.Session`` carrying the configured User-Agent is
    reused for page requests; parsed robots.txt files are cached per
    scheme-and-host in ``self.robots_cache``.
    """

    # Substrings (lower case) that suggest the site is blocking or throttling us
    _BLOCKING_SIGNS = (
        'captcha',
        'blocked',
        'denied',
        'forbidden',
        'rate limit',
        'too many requests'
    )

    def __init__(self, user_agent="EthicalBot/1.0", delay=1.0):
        """Create the session and set the default delay between requests."""
        self.user_agent = user_agent
        self.delay = delay
        self.robots_cache = {}
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': self.user_agent})
        # Ethical scraping guidelines
        self.guidelines = """
ETHICAL SCRAPING GUIDELINES:
1. Always check and respect robots.txt
2. Identify yourself with a descriptive User-Agent
3. Respect rate limits and add delays between requests
4. Don't scrape personal data without consent
5. Check and follow the website's Terms of Service
6. Cache responses to avoid repeated requests
7. Scrape during off-peak hours when possible
8. Stop immediately if asked by the website owner
9. Don't circumvent authentication or paywalls
10. Give credit and respect copyright
"""

    def check_robots_txt(self, url):
        """Return ``(allowed, crawl_delay)`` for ``url`` according to robots.txt.

        If robots.txt cannot be read, the URL is treated as allowed and
        ``crawl_delay`` is None. ``crawl_delay`` is also None when the file
        does not specify one for this User-Agent.
        """
        parsed_url = urlparse(url)
        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
        # Cache robots.txt per domain
        if base_url not in self.robots_cache:
            rp = RobotFileParser()
            rp.set_url(f"{base_url}/robots.txt")
            try:
                rp.read()
                self.robots_cache[base_url] = rp
            except Exception:
                # Best effort: allow by default when robots.txt is unreadable.
                # ``Exception`` (not a bare except) so Ctrl-C and SystemExit
                # still propagate.
                print(f"Could not read robots.txt for {base_url}")
                self.robots_cache[base_url] = None
        rp = self.robots_cache[base_url]
        if rp is None:
            return True, None
        # Check if URL is allowed
        can_fetch = rp.can_fetch(self.user_agent, url)
        # Get crawl delay if specified
        crawl_delay = rp.crawl_delay(self.user_agent)
        return can_fetch, crawl_delay

    def scrape(self, url, force=False):
        """Fetch ``url`` politely; return the response, or None if skipped or failed.

        The request is skipped when robots.txt disallows the URL unless
        ``force`` is True. Before the request this sleeps for the robots.txt
        crawl delay if one is given, otherwise for ``self.delay``.
        """
        # Check robots.txt
        allowed, crawl_delay = self.check_robots_txt(url)
        if not allowed and not force:
            print(f"Scraping {url} is not allowed by robots.txt")
            return None
        if not allowed and force:
            print(f"WARNING: Scraping {url} despite robots.txt (forced)")
        # Use crawl delay if specified
        if crawl_delay:
            print(f"Using crawl delay of {crawl_delay} seconds from robots.txt")
            time.sleep(crawl_delay)
        else:
            time.sleep(self.delay)
        # Make request
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            # Check for signs that we're not welcome
            if self._check_blocking_signs(response):
                print(f"Warning: Possible blocking detected for {url}")
            return response
        except requests.RequestException as e:
            print(f"Error scraping {url}: {e}")
            return None

    def _check_blocking_signs(self, response):
        """Return True if the response body contains a known blocking phrase."""
        content = response.text.lower()
        return any(sign in content for sign in self._BLOCKING_SIGNS)

    def print_guidelines(self):
        """Print ethical scraping guidelines"""
        print(self.guidelines)
# Responsible API usage
class ResponsibleAPIClient:
    """API client with client-side rate limiting and usage tracking.

    Requests go through a ``SlidingWindowRateLimiter`` and a shared
    ``requests.Session`` that sends the API key in the ``X-API-Key`` header.
    """

    def __init__(self, api_key, rate_limit=60, rate_window=60, timeout=30):
        """Configure the client.

        Args:
            api_key: value sent in the ``X-API-Key`` header.
            rate_limit: maximum requests per ``rate_window`` seconds.
            rate_window: length of the rate limiting window in seconds.
            timeout: default per-request timeout in seconds, used when the
                caller does not pass ``timeout`` to ``request``.
        """
        self.api_key = api_key
        self.timeout = timeout
        self.rate_limiter = SlidingWindowRateLimiter(rate_limit, rate_window)
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'ResponsibleAPIClient/1.0',
            'X-API-Key': api_key
        })
        # Track API usage
        self.usage_stats = {
            'requests': 0,
            'errors': 0,
            'rate_limits': 0,
            'data_retrieved': 0
        }

    def request(self, url, **kwargs):
        """GET ``url`` after waiting for the client-side rate limiter.

        Extra keyword arguments are passed to ``session.get``. Returns the
        response on success. On a 429 the client sleeps for the server's
        Retry-After period before the error is raised, so that an immediate
        retry by the caller is already appropriately delayed.

        Raises:
            requests.RequestException: on connection errors or HTTP error
                statuses (also counted in ``usage_stats['errors']``).
        """
        # Without a timeout a stalled server would block this call forever
        kwargs.setdefault('timeout', self.timeout)
        # Check rate limit
        self.rate_limiter.wait_if_needed()
        # Make request
        try:
            response = self.session.get(url, **kwargs)
            self.usage_stats['requests'] += 1
            # Track data usage
            if response.content:
                self.usage_stats['data_retrieved'] += len(response.content)
            # Handle rate limiting
            if response.status_code == 429:
                self.usage_stats['rate_limits'] += 1
                self._handle_rate_limit(response)
            response.raise_for_status()
            return response
        except requests.RequestException:
            self.usage_stats['errors'] += 1
            raise

    def _handle_rate_limit(self, response):
        """Sleep for the Retry-After seconds of a 429 response (60 if unusable)."""
        retry_after = response.headers.get('Retry-After', 60)
        if isinstance(retry_after, str) and retry_after.isdigit():
            wait_time = int(retry_after)
        else:
            # Missing header or an HTTP-date value: fall back to one minute
            wait_time = 60
        print(f"Rate limited. Waiting {wait_time} seconds...")
        time.sleep(wait_time)

    def get_usage_report(self):
        """Return totals, error rate and megabytes retrieved as a dict."""
        return {
            'total_requests': self.usage_stats['requests'],
            'error_rate': self.usage_stats['errors'] / max(self.usage_stats['requests'], 1),
            'rate_limit_hits': self.usage_stats['rate_limits'],
            'data_retrieved_mb': self.usage_stats['data_retrieved'] / (1024 * 1024)
        }
Legal and Compliance
Terms of Service Checker
import re
from bs4 import BeautifulSoup
class TermsOfServiceChecker:
    """Scan a site's Terms of Service for scraping restrictions and API hints.

    Matching is plain lower-case substring search, so results are hints for a
    human reviewer rather than a legal determination.
    """

    def __init__(self):
        """Set up the keyword lists used when scanning the ToS text."""
        self.restriction_keywords = [
            'no scraping',
            'no crawling',
            'no automated',
            'no robots',
            'no data mining',
            'no harvesting',
            'prohibited',
            'not permitted',
            'written consent',
            'prior permission'
        ]
        self.api_keywords = [
            'api available',
            'developer api',
            'data api',
            'rest api',
            'graphql'
        ]

    @staticmethod
    def _collect_mentions(text, sentences, keywords):
        """Return ``(found, contexts)`` for ``keywords`` in ``text``.

        ``found`` is True if any keyword occurs in ``text``; ``contexts``
        lists, keyword by keyword, every stripped sentence containing it.
        """
        found = False
        contexts = []
        for keyword in keywords:
            if keyword not in text:
                continue
            found = True
            contexts.extend(s.strip() for s in sentences if keyword in s)
        return found, contexts

    def find_terms_url(self, homepage_url):
        """Return the absolute URL of the first ToS-looking link, or None.

        Any error while fetching or parsing the homepage is printed and
        results in None.
        """
        # Common patterns for ToS links
        patterns = [
            'terms', 'tos', 'legal', 'terms-of-service',
            'terms-of-use', 'terms-and-conditions'
        ]
        try:
            response = requests.get(homepage_url, timeout=10)
            soup = BeautifulSoup(response.content, 'html.parser')
            for link in soup.find_all('a'):
                href = link.get('href', '').lower()
                text = link.text.lower()
                if any(pattern in href or pattern in text for pattern in patterns):
                    return urljoin(homepage_url, link.get('href'))
        except Exception as e:
            print(f"Error finding ToS: {e}")
        return None

    def check_terms(self, terms_url):
        """Fetch ``terms_url`` and report restriction and API keyword matches.

        Returns a dict with ``scraping_prohibited``, ``api_available``,
        ``restrictions`` and ``api_mentions``, or None if fetching or parsing
        fails (the error is printed).
        """
        try:
            response = requests.get(terms_url, timeout=10)
            soup = BeautifulSoup(response.content, 'html.parser')
            text = soup.get_text().lower()
            # Sentences give the context reported alongside each match
            sentences = text.split('.')
            prohibited, restrictions = self._collect_mentions(
                text, sentences, self.restriction_keywords
            )
            has_api, api_mentions = self._collect_mentions(
                text, sentences, self.api_keywords
            )
            return {
                'scraping_prohibited': prohibited,
                'api_available': has_api,
                'restrictions': restrictions,
                'api_mentions': api_mentions
            }
        except Exception as e:
            print(f"Error checking ToS: {e}")
            return None

    def generate_compliance_report(self, website_url):
        """Build a report combining the robots.txt check and the ToS scan.

        The report always ends with a fixed list of general recommendations.
        """
        report = {
            'website': website_url,
            'timestamp': datetime.now().isoformat(),
            'robots_txt': None,
            'terms_of_service': None,
            'recommendations': []
        }
        recommendations = report['recommendations']
        # Check robots.txt
        allowed, crawl_delay = EthicalScraper().check_robots_txt(website_url)
        report['robots_txt'] = {
            'allowed': allowed,
            'crawl_delay': crawl_delay
        }
        if not allowed:
            recommendations.append("Robots.txt prohibits scraping")
        # Check Terms of Service
        terms_url = self.find_terms_url(website_url)
        if terms_url:
            terms_check = self.check_terms(terms_url)
            report['terms_of_service'] = terms_check
            if terms_check:
                if terms_check['scraping_prohibited']:
                    recommendations.append("Terms of Service may prohibit scraping")
                if terms_check['api_available']:
                    recommendations.append("API is available - consider using it instead")
        # General recommendations
        recommendations += [
            "Always respect rate limits",
            "Use descriptive User-Agent",
            "Cache responses when possible",
            "Consider contacting website owner for permission",
            "Document your data source and respect copyright"
        ]
        return report
# GDPR Compliance
class GDPRCompliantScraper:
    """Scraper that detects and redacts common personal-data patterns.

    Detection and redaction use the same regular expressions, so text
    returned by ``anonymize_data`` no longer matches the default patterns
    checked by ``contains_personal_data``. The patterns are heuristics (US
    phone formats, long digit runs) and do not amount to a full GDPR review.
    """

    _EMAIL_PATTERN = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    _PHONE_PATTERN = r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'  # Phone (US)
    _CARD_PATTERN = r'\b(?:\d{4}[-\s]?){3}\d{4}\b'  # Credit card
    _ID_PATTERN = r'\b\d{9,}\b'  # SSN-like numbers
    # Applied in order: cards go before generic digit runs so that a
    # 16-digit card number is labelled as a card, not as an ID.
    _REDACTIONS = (
        (_EMAIL_PATTERN, '[EMAIL_REDACTED]'),
        (_PHONE_PATTERN, '[PHONE_REDACTED]'),
        (_CARD_PATTERN, '[CC_REDACTED]'),
        (_ID_PATTERN, '[ID_REDACTED]'),
    )

    def __init__(self):
        """Set up the detection patterns and the HTTP session."""
        self.personal_data_patterns = [
            self._EMAIL_PATTERN,
            self._PHONE_PATTERN,
            self._ID_PATTERN,
            self._CARD_PATTERN,
        ]
        self.session = requests.Session()

    def contains_personal_data(self, text):
        """Return True if any pattern in ``personal_data_patterns`` matches ``text``."""
        return any(re.search(pattern, text) for pattern in self.personal_data_patterns)

    def anonymize_data(self, text):
        """Return ``text`` with emails, phones, card numbers and long IDs redacted."""
        for pattern, replacement in self._REDACTIONS:
            text = re.sub(pattern, replacement, text)
        return text

    def scrape_with_privacy(self, url, anonymize=True):
        """Fetch ``url`` and return its text, redacted when ``anonymize`` is True.

        Returns None for any status other than 200. When personal data is
        detected a warning is printed; with ``anonymize`` False the content
        is returned unchanged.
        """
        # Bounded wait so a stalled server cannot hang the scraper
        response = self.session.get(url, timeout=30)
        if response.status_code != 200:
            return None
        content = response.text
        # Check for personal data
        if self.contains_personal_data(content):
            print("Warning: Personal data detected in content")
            if anonymize:
                content = self.anonymize_data(content)
                print("Personal data has been anonymized")
            else:
                print("Consider anonymizing or not storing this data")
        return content
Distributed and Polite Scraping
Polite Scraping Architecture
import hashlib
import os
import queue
import threading
from datetime import datetime, timedelta
class PoliteScrapeQueue:
    """URL queue that enforces a minimum delay between hits to the same domain.

    Each domain (URL netloc) has its own priority queue, delay and
    last-access time. Bookkeeping is guarded by ``self.lock``; waiting for a
    domain's delay happens with the lock released.
    """

    def __init__(self, default_delay=1.0, max_workers=5):
        """Store the default per-domain delay and create empty queues."""
        self.default_delay = default_delay
        self.max_workers = max_workers
        self.domain_queues = {}
        self.domain_last_access = {}
        self.domain_delays = {}
        self.lock = threading.Lock()
        self.workers = []
        self.running = True

    def add_url(self, url, priority=5, delay=None):
        """Queue ``url`` under its domain; lower ``priority`` values come out first.

        ``delay`` sets the domain's delay only when the domain is first seen;
        None means ``default_delay``. An explicit 0 is honoured.
        """
        domain = urlparse(url).netloc
        with self.lock:
            if domain not in self.domain_queues:
                self.domain_queues[domain] = queue.PriorityQueue()
                self.domain_delays[domain] = self.default_delay if delay is None else delay
                self.domain_last_access[domain] = 0
            # Add to queue with priority
            self.domain_queues[domain].put((priority, url))

    def get_next_url(self):
        """Return the next URL, sleeping first if its domain is not yet due.

        Picks the first domain that can be accessed now, otherwise the one
        that becomes available soonest. Returns None when every queue is
        empty.
        """
        with self.lock:
            current_time = time.time()
            best_domain = None
            min_wait = float('inf')
            # Find domain that can be accessed soonest
            for domain, q in self.domain_queues.items():
                if q.empty():
                    continue
                time_since_access = current_time - self.domain_last_access[domain]
                wait_time = self.domain_delays[domain] - time_since_access
                if wait_time <= 0:
                    # Can access now
                    best_domain, min_wait = domain, 0
                    break
                if wait_time < min_wait:
                    best_domain, min_wait = domain, wait_time
            # ``is None`` rather than truthiness: '' is a valid domain key
            if best_domain is None:
                return None
            _priority, url = self.domain_queues[best_domain].get_nowait()
            # Reserve the slot now so concurrent callers schedule after it
            self.domain_last_access[best_domain] = current_time + min_wait
        if min_wait > 0:
            # Wait outside the lock so other threads are not blocked
            time.sleep(min_wait)
        return url
class DistributedScraper:
    """Scrape URLs with a pool of worker threads and an on-disk cache.

    Page text is cached under ``cache_dir`` in files named after the MD5 hex
    digest of the URL. URLs already passed to ``scrape_urls`` on this
    instance are remembered in ``seen_urls`` and not queued again.
    """

    def __init__(self, num_workers=5, cache_dir='cache'):
        """Create the queues and make sure the cache directory exists."""
        self.num_workers = num_workers
        self.cache_dir = cache_dir
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.seen_urls = set()
        self.lock = threading.Lock()
        # Create cache directory
        os.makedirs(cache_dir, exist_ok=True)

    def _get_cache_path(self, url):
        """Return the cache file path for ``url``."""
        # MD5 only derives a file name here; it is not a security measure
        url_hash = hashlib.md5(url.encode()).hexdigest()
        return f"{self.cache_dir}/{url_hash}.cache"

    def _is_cached(self, url, max_age_hours=24):
        """Return True if a cache file for ``url`` exists and is fresh enough."""
        cache_path = self._get_cache_path(url)
        try:
            modified = os.path.getmtime(cache_path)
        except OSError:
            # Missing (or unreadable) cache entry
            return False
        age = datetime.now() - datetime.fromtimestamp(modified)
        return age < timedelta(hours=max_age_hours)

    def _get_from_cache(self, url):
        """Return the cached text for ``url``, or None if it cannot be read."""
        cache_path = self._get_cache_path(url)
        try:
            with open(cache_path, 'r', encoding='utf-8') as f:
                return f.read()
        except (OSError, UnicodeError):
            return None

    def _save_to_cache(self, url, content):
        """Write ``content`` to the cache; failures are reported, not raised."""
        cache_path = self._get_cache_path(url)
        try:
            with open(cache_path, 'w', encoding='utf-8') as f:
                f.write(content)
        except (OSError, UnicodeError) as e:
            # Caching is best effort; scraping continues without it
            print(f"Could not cache {url}: {e}")

    def worker(self, worker_id):
        """Worker loop: take tasks until a ``None`` sentinel arrives.

        Every task taken from the queue is marked done, including failures
        and cache hits, so that ``task_queue.join()`` can return. One
        result dict per task is put on ``result_queue``.
        """
        session = None  # Created on first real request
        while True:
            try:
                # Get task from queue
                task = self.task_queue.get(timeout=5)
            except queue.Empty:
                continue
            try:
                if task is None:
                    break
                url = task['url']
                # Check cache first
                if self._is_cached(url):
                    content = self._get_from_cache(url)
                    if content:
                        print(f"Worker {worker_id}: Using cached {url}")
                        self.result_queue.put({
                            'url': url,
                            'content': content,
                            'from_cache': True
                        })
                        continue
                if session is None:
                    session = requests.Session()
                    session.headers.update({
                        'User-Agent': f'DistributedScraper/1.0 (Worker-{worker_id})'
                    })
                # Scrape URL
                print(f"Worker {worker_id}: Scraping {url}")
                try:
                    response = session.get(url, timeout=30)
                    response.raise_for_status()
                    content = response.text
                    # Save to cache
                    self._save_to_cache(url, content)
                    # Add to results
                    self.result_queue.put({
                        'url': url,
                        'content': content,
                        'from_cache': False
                    })
                except Exception as e:
                    print(f"Worker {worker_id}: Error scraping {url}: {e}")
                    self.result_queue.put({
                        'url': url,
                        'error': str(e)
                    })
                # Be polite
                time.sleep(1)
            except Exception as e:
                print(f"Worker {worker_id}: Unexpected error: {e}")
            finally:
                # Without this, task_queue.join() in scrape_urls never returns
                self.task_queue.task_done()
        if session is not None:
            session.close()

    def scrape_urls(self, urls):
        """Scrape ``urls`` with ``num_workers`` threads and return the result dicts.

        Each result has ``url`` plus either ``content`` and ``from_cache`` or
        ``error``. Result order is not guaranteed.
        """
        # Add URLs to task queue
        for url in urls:
            with self.lock:
                if url not in self.seen_urls:
                    self.seen_urls.add(url)
                    self.task_queue.put({'url': url})
        # Start workers
        workers = []
        for i in range(self.num_workers):
            t = threading.Thread(target=self.worker, args=(i,))
            t.start()
            workers.append(t)
        # Wait for completion
        self.task_queue.join()
        # Stop workers
        for _ in range(self.num_workers):
            self.task_queue.put(None)
        for t in workers:
            t.join()
        # Collect results
        results = []
        while not self.result_queue.empty():
            results.append(self.result_queue.get())
        return results
Monitoring and Alerts
import logging
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
class ScrapingMonitor:
    """Collect scraping statistics, log each request and record alerts.

    All counters in ``self.stats`` are cumulative for the life of the
    monitor. Alerts are stored in ``self.alerts`` and logged as warnings.
    """

    def __init__(self, alert_email=None):
        """Initialise counters and configure logging to scraping.log and the console."""
        self.alert_email = alert_email
        self.stats = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'rate_limits_hit': 0,
            'robots_txt_blocks': 0,
            'total_data_mb': 0,
            'start_time': datetime.now()
        }
        self.alerts = []
        # Set up logging
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('scraping.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def log_request(self, url, success=True, data_size=0, rate_limited=False, robots_blocked=False):
        """Record one request in the statistics and write a log line.

        ``data_size`` is in bytes. A rate-limited request also triggers the
        rate limit alert check.
        """
        self.stats['total_requests'] += 1
        outcome_key = 'successful_requests' if success else 'failed_requests'
        self.stats[outcome_key] += 1
        if rate_limited:
            self.stats['rate_limits_hit'] += 1
            self.check_rate_limit_alert()
        if robots_blocked:
            self.stats['robots_txt_blocks'] += 1
        self.stats['total_data_mb'] += data_size / (1024 * 1024)
        # Log
        status = 'SUCCESS' if success else 'FAILED'
        self.logger.info(f"{status}: {url} - Size: {data_size} bytes")

    def check_rate_limit_alert(self):
        """Send an alert on every 10th rate limit hit.

        The counter itself is left untouched so that reports show the true
        cumulative number of rate limit hits.
        """
        hits = self.stats['rate_limits_hit']
        if hits and hits % 10 == 0:
            self.send_alert(
                "High Rate Limit Hits",
                f"Hit rate limits {hits} times"
            )

    def send_alert(self, subject, message):
        """Record and log an alert; prepare an email if ``alert_email`` is set.

        NOTE: the SMTP delivery lines are left commented out as a template,
        so no email is actually sent until they are configured.
        """
        alert = {
            'timestamp': datetime.now(),
            'subject': subject,
            'message': message
        }
        self.alerts.append(alert)
        self.logger.warning(f"ALERT: {subject} - {message}")
        if self.alert_email:
            # Send email alert
            try:
                msg = MIMEText(message)
                msg['Subject'] = f"Scraping Alert: {subject}"
                msg['From'] = 'scraper@example.com'
                msg['To'] = self.alert_email
                # Configure SMTP server
                # smtp = smtplib.SMTP('localhost')
                # smtp.send_message(msg)
                # smtp.quit()
            except Exception as e:
                self.logger.error(f"Failed to send email alert: {e}")

    def get_report(self):
        """Return a dict summarising the statistics collected so far.

        ``requests_per_minute`` divides by at least one minute, so short
        runs report the raw request count.
        """
        runtime = datetime.now() - self.stats['start_time']
        total = self.stats['total_requests']
        report = {
            'runtime': str(runtime),
            'total_requests': total,
            'success_rate': self.stats['successful_requests'] / max(total, 1),
            'failed_requests': self.stats['failed_requests'],
            'rate_limits_hit': self.stats['rate_limits_hit'],
            'robots_blocks': self.stats['robots_txt_blocks'],
            'data_scraped_mb': self.stats['total_data_mb'],
            'requests_per_minute': total / max(runtime.total_seconds() / 60, 1),
            'alerts': self.alerts
        }
        return report

    def print_summary(self):
        """Print the report in a human-readable block, with the last 5 alerts."""
        report = self.get_report()
        print("\n" + "="*50)
        print("SCRAPING SUMMARY")
        print("="*50)
        print(f"Runtime: {report['runtime']}")
        print(f"Total Requests: {report['total_requests']}")
        print(f"Success Rate: {report['success_rate']:.2%}")
        print(f"Failed Requests: {report['failed_requests']}")
        print(f"Rate Limits Hit: {report['rate_limits_hit']}")
        print(f"Robots.txt Blocks: {report['robots_blocks']}")
        print(f"Data Scraped: {report['data_scraped_mb']:.2f} MB")
        print(f"Requests/Minute: {report['requests_per_minute']:.2f}")
        if report['alerts']:
            print(f"\nAlerts: {len(report['alerts'])}")
            for alert in report['alerts'][-5:]:  # Show last 5 alerts
                print(f" - {alert['timestamp']}: {alert['subject']}")
        print("="*50)
Best Practices Summary
# Complete ethical scraping template
class BestPracticesScraper:
    """Template scraper combining robots.txt checks, rate limiting, retries,
    caching and monitoring.

    Responses are cached in memory per URL for the life of the instance.
    """

    def __init__(self, name="EthicalScraper", version="1.0"):
        """Build the session, rate limiter, retry strategy and monitor."""
        self.user_agent = f"{name}/{version} (Contact: your-email@example.com)"
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': self.user_agent})
        # Rate limiting
        self.rate_limiter = TokenBucketRateLimiter(tokens_per_second=1)
        # Retry strategy
        self.retry_strategy = RetryStrategy(
            max_retries=3,
            backoff_type='exponential',
            initial_delay=1.0
        )
        # Monitoring
        self.monitor = ScrapingMonitor()
        # Cache
        self.cache = {}
        # Parsed robots.txt per scheme://host (None = could not be read)
        self._robots_cache = {}
        # Best practices checklist
        self.checklist = {
            'robots_txt_checked': False,
            'terms_of_service_reviewed': False,
            'rate_limiting_enabled': True,
            'user_agent_set': True,
            'caching_enabled': True,
            'monitoring_enabled': True,
            'gdpr_compliant': False,
            'error_handling': True
        }

    def check_robots_txt(self, url):
        """Return ``(allowed, crawl_delay)`` for ``url`` according to robots.txt.

        If robots.txt cannot be read the URL is treated as allowed and
        ``crawl_delay`` is None.
        """
        from urllib.parse import urlparse
        from urllib.robotparser import RobotFileParser

        parts = urlparse(url)
        base_url = f"{parts.scheme}://{parts.netloc}"
        if base_url not in self._robots_cache:
            parser = RobotFileParser()
            parser.set_url(f"{base_url}/robots.txt")
            try:
                parser.read()
            except Exception as e:
                # Best effort: allow by default when robots.txt is unreadable
                print(f"Could not read robots.txt for {base_url}: {e}")
                parser = None
            self._robots_cache[base_url] = parser
        parser = self._robots_cache[base_url]
        if parser is None:
            return True, None
        return parser.can_fetch(self.user_agent, url), parser.crawl_delay(self.user_agent)

    def pre_scrape_checklist(self, url):
        """Print the pre-scrape checklist for ``url``; return True if robots.txt allows it."""
        print("Pre-Scraping Checklist:")
        print("-" * 30)
        # Check robots.txt
        allowed, delay = self.check_robots_txt(url)
        self.checklist['robots_txt_checked'] = True
        print(f"✓ Robots.txt checked: {'Allowed' if allowed else 'Blocked'}")
        # Check rate limiting
        print(f"✓ Rate limiting: {self.rate_limiter.tokens_per_second} req/sec")
        # User agent
        print(f"✓ User-Agent: {self.user_agent}")
        # Recommendations
        if not allowed:
            print("\n⚠️ WARNING: Robots.txt prohibits scraping this URL")
            print("Consider:")
            print(" - Looking for an official API")
            print(" - Contacting the website owner")
            print(" - Finding alternative data sources")
        return allowed

    def scrape_responsibly(self, urls):
        """Scrape each allowed URL with rate limiting, retries and caching.

        URLs disallowed by robots.txt are skipped. Returns the list of
        responses (cached or fresh) and prints the monitor summary at the end.
        """
        results = []
        for url in urls:
            # Pre-scrape checks
            if not self.pre_scrape_checklist(url):
                continue
            # Check cache
            if url in self.cache:
                print(f"Using cached result for {url}")
                results.append(self.cache[url])
                continue
            # Rate limiting
            self.rate_limiter.wait_if_needed()
            # Scrape with retry
            try:
                response = self.retry_strategy.execute(
                    self._make_request, url
                )
                # Cache result
                self.cache[url] = response
                results.append(response)
                # Monitor
                self.monitor.log_request(
                    url,
                    success=True,
                    data_size=len(response.content)
                )
            except Exception as e:
                print(f"Failed to scrape {url}: {e}")
                self.monitor.log_request(url, success=False)
        # Print summary
        self.monitor.print_summary()
        return results

    def _make_request(self, url):
        """GET ``url`` with a 30 second timeout; raise on HTTP error statuses."""
        response = self.session.get(url, timeout=30)
        response.raise_for_status()
        return response
# Usage
# Build a scraper with the default name and version
scraper = BestPracticesScraper()
# Pages to fetch; each is checked against robots.txt before any request
urls = [
    'https://example.com/page1',
    'https://example.com/page2',
]
# Runs the full pipeline and prints the monitor summary
results = scraper.scrape_responsibly(urls)
Practice Exercises
Exercise 1: Build a Rate-Limited API Client
Create a comprehensive API client that:
- Implements adaptive rate limiting
- Handles all rate limit response headers
- Uses exponential backoff with jitter
- Monitors and reports API usage
- Caches responses appropriately
Exercise 2: Ethical Scraper Framework
Build a framework that:
- Checks robots.txt and ToS automatically
- Implements per-domain rate limiting
- Detects and handles blocking
- Anonymizes personal data
- Generates compliance reports
Exercise 3: Distributed Scraping System
Create a distributed scraper that:
- Coordinates multiple workers
- Implements domain-based queuing
- Handles failures gracefully
- Provides real-time monitoring
- Respects rate limits across workers
Key Takeaways
- ⏱️ Always implement rate limiting to avoid overwhelming servers
- 📋 Check and respect robots.txt and Terms of Service
- 🔄 Use exponential backoff with jitter for retries
- 🤖 Set descriptive User-Agent headers
- 💾 Cache responses to minimize requests
- 📊 Monitor and log scraping activities
- 🔒 Handle personal data responsibly (GDPR compliance)
- ⚖️ Consider legal and ethical implications
- 🎯 Look for official APIs before scraping
- 🤝 Be a good citizen of the web