Requests Library
HTTP for Humans - Master Web Requests! 🌐
Requests is the most popular Python library for making HTTP requests. With its elegant and simple API, you can interact with web services, APIs, and websites effortlessly. From simple GET requests to complex authentication flows, Requests makes HTTP feel Pythonic and intuitive.
Understanding HTTP
graph LR
A[Client Python] --> B[HTTP Request]
B --> C[Web Server]
C --> D[Process Request]
D --> E[HTTP Response]
E --> A
B --> F[Method: GET/POST/PUT/DELETE]
B --> G[Headers]
B --> H[Body/Payload]
E --> I[Status Code]
E --> J[Response Headers]
E --> K[Response Body]
Installation and Basic Usage
# Installation (run this in a shell, not inside Python)
"""
pip install requests
"""
import requests
import json
from pprint import pprint
import time
from datetime import datetime
# Check which version of Requests is installed
print(f"Requests version: {requests.__version__}")
# Basic GET request; the returned object is inspected by the lines below
response = requests.get('https://httpbin.org/get')
# Check response metadata
print(f"Status Code: {response.status_code}")
print(f"Response Type: {type(response)}")
print(f"Encoding: {response.encoding}")
print(f"Headers: {response.headers}")
# Different ways to access the response content
print(response.text) # As string
print(response.content) # As bytes
print(response.json()) # Parse as JSON
# Other response attributes
print(f"URL: {response.url}")
print(f"Status: {response.status_code}")
print(f"OK: {response.ok}") # True if status < 400
print(f"Elapsed time: {response.elapsed}")
print(f"Cookies: {response.cookies}")
print(f"History: {response.history}") # Redirects
HTTP Methods
GET Requests
# GET request with query parameters supplied as a dict
params = {
    'q': 'python requests',
    'limit': 10,
    'offset': 0
}
response = requests.get('https://httpbin.org/get', params=params)
print(response.url) # Shows URL with parameters
# Multiple values for the same parameter: pass a list as the value
params = {'category': ['python', 'data-science']}
response = requests.get('https://httpbin.org/get', params=params)
# URL encoding is handled automatically (note the spaces and '&' in the value)
params = {'search': 'python & data science'}
response = requests.get('https://httpbin.org/get', params=params)
# Working with query strings using the standard library
from urllib.parse import urlencode, parse_qs
# Manual URL construction: encode the query, then append it to the base URL
base_url = 'https://api.example.com/search'
query_string = urlencode({'q': 'python', 'page': 1})
full_url = f"{base_url}?{query_string}"
print(full_url)
# Parse the query string back out of the URL of the last response
from urllib.parse import urlparse
parsed = urlparse(response.url)
query_params = parse_qs(parsed.query)
print(query_params)
POST Requests
# POST with form data (passed via `data=`)
form_data = {
    'username': 'john_doe',
    'email': 'john@example.com',
    'age': 30
}
response = requests.post('https://httpbin.org/post', data=form_data)
print(response.json())

# POST with JSON data
json_data = {
    'name': 'Product A',
    'price': 29.99,
    'categories': ['electronics', 'gadgets'],
    'metadata': {
        'created_at': datetime.now().isoformat(),
        'updated_by': 'admin'
    }
}
# Method 1: Using json parameter
response = requests.post('https://httpbin.org/post', json=json_data)
# Method 2: Manual JSON encoding
headers = {'Content-Type': 'application/json'}
response = requests.post('https://httpbin.org/post',
                         data=json.dumps(json_data),
                         headers=headers)

# File upload. `with` closes the handle even if the request raises, so no
# manual close() call is needed.
with open('data.csv', 'rb') as upload:
    files = {'file': upload}
    response = requests.post('https://httpbin.org/post', files=files)

# Multiple files under the same form field name
with open('file1.txt', 'rb') as first, open('file2.txt', 'rb') as second:
    files = [
        ('files', ('file1.txt', first, 'text/plain')),
        ('files', ('file2.txt', second, 'text/plain'))
    ]
    response = requests.post('https://httpbin.org/post', files=files)

# Mixed form data and files
data = {'name': 'John', 'age': 30}
with open('avatar.jpg', 'rb') as avatar:
    files = {'avatar': avatar}
    response = requests.post('https://httpbin.org/post', data=data, files=files)
Other HTTP Methods
# PUT request (update entire resource)
update_data = {
    'id': 123,
    'name': 'Updated Product',
    'price': 39.99
}
response = requests.put('https://httpbin.org/put', json=update_data)
# PATCH request (partial update): only the changed field is sent
partial_update = {'price': 34.99}
response = requests.patch('https://httpbin.org/patch', json=partial_update)
# DELETE request; the target id is passed as a query parameter here
response = requests.delete('https://httpbin.org/delete',
                           params={'id': 123})
# HEAD request (headers only, no body)
response = requests.head('https://httpbin.org/get')
print(response.headers)
print(response.text) # Empty
# OPTIONS request (check allowed methods)
response = requests.options('https://httpbin.org/get')
print(response.headers.get('Allow'))
Headers and Authentication
Custom Headers
# Setting custom headers: pass a dict via `headers=`
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'application/json',
    'Accept-Language': 'en-US,en;q=0.9',
    'Cache-Control': 'no-cache',
    'X-Custom-Header': 'CustomValue'
}
response = requests.get('https://httpbin.org/headers', headers=headers)
# Print the JSON body returned by the /headers endpoint
print(response.json())
# Common header patterns
def get_with_headers(url, auth_token=None, timeout=30):
    """Send a GET request with a common set of headers.

    Args:
        url: Address to request.
        auth_token: Optional bearer token. When truthy it is sent as
            ``Authorization: Bearer <token>``.
        timeout: Seconds to wait before giving up; forwarded to
            ``requests.get`` so the call cannot block indefinitely.

    Returns:
        The ``requests.Response`` for the request.
    """
    headers = {
        'User-Agent': 'Python Requests/2.28.0',
        'Accept': 'application/json',
        'Accept-Encoding': 'gzip, deflate',
    }
    if auth_token:
        headers['Authorization'] = f'Bearer {auth_token}'
    return requests.get(url, headers=headers, timeout=timeout)
# Response headers: read individual values with .get()
response = requests.get('https://httpbin.org/get')
print(f"Content-Type: {response.headers.get('Content-Type')}")
print(f"Server: {response.headers.get('Server')}")
print(f"Date: {response.headers.get('Date')}")
# Case-insensitive header access: the three spellings below read the same header
print(response.headers['content-type'])
print(response.headers['Content-Type'])
print(response.headers['CONTENT-TYPE']) # All work the same
Authentication Methods
# Basic Authentication
from requests.auth import HTTPBasicAuth
# Method 1: Using auth parameter with a (username, password) tuple
response = requests.get('https://httpbin.org/basic-auth/user/pass',
                        auth=('user', 'pass'))
# Method 2: Using an explicit HTTPBasicAuth object
response = requests.get('https://httpbin.org/basic-auth/user/pass',
                        auth=HTTPBasicAuth('user', 'pass'))
# Bearer Token Authentication: the token goes in the Authorization header
token = 'your-api-token-here'
headers = {'Authorization': f'Bearer {token}'}
response = requests.get('https://api.example.com/data', headers=headers)
# API Key Authentication
# Method 1: In header
headers = {'X-API-Key': 'your-api-key'}
response = requests.get('https://api.example.com/data', headers=headers)
# Method 2: In query parameter
params = {'api_key': 'your-api-key'}
response = requests.get('https://api.example.com/data', params=params)
# Custom Authentication Class
class CustomAuth(requests.auth.AuthBase):
    """Custom authentication handler that sends a token in ``X-Custom-Auth``.

    Pass an instance as the ``auth=`` argument of a request; Requests then
    calls the instance with the outgoing request before it is sent.
    """

    def __init__(self, token):
        """Store the token that will be attached to each request."""
        self.token = token

    def __call__(self, request):
        """Attach the token header to ``request`` and return the request."""
        # Modify request headers
        request.headers['X-Custom-Auth'] = self.token
        return request
# Use the custom handler by passing an instance through `auth=`
response = requests.get('https://api.example.com/data',
                        auth=CustomAuth('my-token'))
# OAuth 2.0 (using requests-oauthlib, a separate package)
"""
pip install requests-oauthlib
"""
from requests_oauthlib import OAuth2Session
# Placeholder credentials and endpoints; replace with real values
client_id = 'your-client-id'
client_secret = 'your-client-secret'  # defined here but not used by the lines shown
redirect_uri = 'http://localhost:8080/callback'
authorization_base_url = 'https://github.com/login/oauth/authorize'
token_url = 'https://github.com/login/oauth/access_token'  # not used by the lines shown
# OAuth2 flow: only the first step (building the authorization URL) is shown
github = OAuth2Session(client_id, redirect_uri=redirect_uri)
authorization_url, state = github.authorization_url(authorization_base_url)
Sessions and Cookies
Using Sessions
# Sessions maintain state across requests
session = requests.Session()
# Set default headers for all requests in session
session.headers.update({
    'User-Agent': 'Python Bot 1.0',
    'Accept': 'application/json'
})
# Cookies persist across requests in same session
session.get('https://httpbin.org/cookies/set/session_id/12345')
response = session.get('https://httpbin.org/cookies')
print(response.json())  # Shows the cookie
# Session with authentication
session.auth = ('user', 'pass')
response = session.get('https://httpbin.org/basic-auth/user/pass')
# Release pooled connections before the name is reused below
session.close()

# Context manager for automatic cleanup
with requests.Session() as session:
    session.get('https://httpbin.org/cookies/set/temp/value')
    response = session.get('https://httpbin.org/cookies')
    print(response.json())

# Advanced session configuration
import functools

session = requests.Session()
session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3))
# A Session has no `timeout` setting: assigning `session.timeout = 30` is
# silently ignored. Wrapping `request` gives every call made through this
# session a default timeout, which an explicit `timeout=` argument overrides.
session.request = functools.partial(session.request, timeout=30)
# Connection pooling with session
class APIClient:
    """Small API client that reuses one ``requests.Session``.

    Reusing the session shares its connection pool and default headers across
    calls. The client can be used as a context manager so the session is
    closed automatically.
    """

    def __init__(self, base_url, api_key):
        """Create the session and set the API key and Accept headers.

        Args:
            base_url: Root URL of the API; a trailing slash is ignored.
            api_key: Value sent in the ``X-API-Key`` header of every request.
        """
        # Strip the trailing slash so joining with an endpoint never yields '//'
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        self.session.headers.update({
            'X-API-Key': api_key,
            'Accept': 'application/json'
        })

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _url(self, endpoint):
        """Join ``endpoint`` onto the base URL with exactly one slash."""
        return f"{self.base_url}/{endpoint.lstrip('/')}"

    def get(self, endpoint, **kwargs):
        """Send a GET to ``endpoint``; ``kwargs`` go to ``Session.get``."""
        return self.session.get(self._url(endpoint), **kwargs)

    def post(self, endpoint, **kwargs):
        """Send a POST to ``endpoint``; ``kwargs`` go to ``Session.post``."""
        return self.session.post(self._url(endpoint), **kwargs)

    def close(self):
        """Close the underlying session."""
        self.session.close()
# Usage: create the client, make a request, then close its session
client = APIClient('https://api.example.com', 'your-api-key')
response = client.get('users', params={'page': 1})
client.close()
Cookie Management
# Working with cookies
import requests
from http.cookiejar import CookieJar
from requests.cookies import RequestsCookieJar

# Send cookies with request
cookies = {'session_id': 'abc123', 'user_pref': 'dark_mode'}
response = requests.get('https://httpbin.org/cookies', cookies=cookies)

# Access response cookies. This endpoint sets the cookie on a redirect, and
# `response.cookies` only holds cookies set by the response itself, so the
# redirect must not be followed or the lookup raises KeyError.
response = requests.get('https://httpbin.org/cookies/set/new_cookie/value123',
                        allow_redirects=False)
print(response.cookies['new_cookie'])

# Cookie jar for advanced management. RequestsCookieJar is used instead of a
# plain CookieJar because it can be pickled and has a dict-style update(),
# both of which the steps below rely on.
jar = RequestsCookieJar()
session = requests.Session()
session.cookies = jar

# Save and load cookies
import pickle

# Save cookies to file
with open('cookies.pkl', 'wb') as f:
    pickle.dump(session.cookies, f)

# Load cookies from file.
# SECURITY: pickle.load can execute arbitrary code; only load files that this
# program wrote itself.
with open('cookies.pkl', 'rb') as f:
    cookies = pickle.load(f)
session.cookies.update(cookies)

# Extract cookies as dict
cookies_dict = requests.utils.dict_from_cookiejar(session.cookies)
print(cookies_dict)

# Create cookie jar from dict
from requests.utils import cookiejar_from_dict
new_jar = cookiejar_from_dict(cookies_dict)
Error Handling and Timeouts
Handling Response Errors
# Check response status
response = requests.get('https://httpbin.org/status/404')

# Method 1: Check status_code
if response.status_code == 200:
    print("Success!")
elif response.status_code == 404:
    print("Not found!")
else:
    print(f"Error: {response.status_code}")

# Method 2: Use response.ok
if response.ok:  # True if status_code < 400
    data = response.json()
else:
    print(f"Request failed: {response.status_code}")

# Method 3: Raise exception for bad status
try:
    response = requests.get('https://httpbin.org/status/500')
    response.raise_for_status()  # Raises HTTPError for bad status
    data = response.json()
except requests.exceptions.HTTPError as e:
    # Listed first: HTTPError is a subclass of RequestException
    print(f"HTTP Error: {e}")
except requests.exceptions.RequestException as e:
    print(f"Request Error: {e}")
# Complete error handling
def safe_request(url, **kwargs):
    """Make a GET request and report failures instead of raising.

    Args:
        url: Address to request.
        **kwargs: Forwarded to ``requests.get``. A ``timeout`` of 30 seconds is
            applied when the caller does not pass one.

    Returns:
        The ``requests.Response`` on success, or ``None`` after printing a
        message when the request fails or returns an error status.
    """
    # Without a timeout a stalled server would block this call indefinitely
    kwargs.setdefault('timeout', 30)
    try:
        response = requests.get(url, **kwargs)
        response.raise_for_status()
        return response
    # Timeout is checked before ConnectionError so that connect timeouts
    # are reported as timeouts.
    except requests.exceptions.Timeout:
        print(f"Request timed out for {url}")
    except requests.exceptions.TooManyRedirects:
        print(f"Too many redirects for {url}")
    except requests.exceptions.ConnectionError:
        print(f"Connection error for {url}")
    except requests.exceptions.HTTPError as e:
        print(f"HTTP {e.response.status_code} error for {url}")
    except requests.exceptions.RequestException as e:
        print(f"Request exception for {url}: {e}")
    except Exception as e:
        # Deliberate catch-all: this helper's contract is "never raise"
        print(f"Unexpected error: {e}")
    return None
Timeouts and Retries
# Set timeout for requests
# Timeout as single value (both connect and read)
response = requests.get('https://httpbin.org/delay/1', timeout=5)
# Separate connect and read timeouts, passed as a tuple
response = requests.get('https://httpbin.org/delay/1',
                        timeout=(3.05, 27)) # (connect, read)
# No timeout (wait forever) - NOT RECOMMENDED
response = requests.get('https://httpbin.org/delay/1', timeout=None)
# Retry logic with exponential backoff
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
def create_session_with_retries(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504),
    session=None,
):
    """Create (or configure) a session that retries failed requests.

    Args:
        retries: Maximum number of retries, used for the total, read and
            connect limits alike.
        backoff_factor: Backoff factor passed to ``Retry``.
        status_forcelist: HTTP status codes that trigger a retry.
        session: Existing session to configure. A new ``requests.Session`` is
            created when omitted.

    Returns:
        The session with the retrying adapter mounted for both ``http://``
        and ``https://``.
    """
    if session is None:
        session = requests.Session()
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        # POST and PATCH are left out of the retried methods
        allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"],
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
# Use session with retries
session = create_session_with_retries()
try:
    response = session.get('https://httpbin.org/status/500')  # Will retry
except requests.exceptions.RetryError as e:
    # Raised once the retries are used up and the server still answers with
    # a status from status_forcelist.
    print(f"Giving up after retries: {e}")
# Custom retry logic
def retry_request(url, max_retries=3, delay=1, backoff=2):
    """GET ``url``, retrying failures with exponential backoff.

    Args:
        url: Address to request.
        max_retries: Total number of attempts.
        delay: Seconds to wait after the first failed attempt.
        backoff: Multiplier applied to the wait after each further failure.

    Returns:
        The successful ``requests.Response``, or ``None`` when ``max_retries``
        is zero or negative (no attempt is made).

    Raises:
        requests.exceptions.RequestException: The error from the final
            attempt, once all attempts have failed.
    """
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response
        # HTTPError is a subclass of RequestException, so one clause covers both
        except requests.exceptions.RequestException:
            if attempt == max_retries - 1:
                raise
            wait_time = delay * (backoff ** attempt)
            print(f"Attempt {attempt + 1} failed. Retrying in {wait_time}s...")
            time.sleep(wait_time)
    return None
Advanced Features
Streaming Responses
# Stream large responses. The `with` block releases the connection when the
# loop finishes or fails; a streamed response is otherwise left open.
with requests.get('https://httpbin.org/stream/100', stream=True) as response:
    # Process line by line
    for line in response.iter_lines():
        if line:  # skip empty lines
            data = json.loads(line)
            print(data)
# Download large file in chunks
def download_file(url, filename, chunk_size=8192, timeout=30):
    """Download ``url`` to ``filename`` in chunks, printing progress.

    Args:
        url: Address of the file to download.
        filename: Path the content is written to (opened in binary mode).
        chunk_size: Number of bytes requested per chunk.
        timeout: Seconds to wait on the connection before giving up.

    Raises:
        requests.exceptions.HTTPError: If the server returns an error status.
    """
    with requests.get(url, stream=True, timeout=timeout) as response:
        response.raise_for_status()
        # 0 means the server sent no Content-Length; progress is then skipped
        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0
        with open(filename, 'wb') as file:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:
                    file.write(chunk)
                    downloaded += len(chunk)
                    if total_size > 0:
                        progress = (downloaded / total_size) * 100
                        # '\r' rewrites the same console line each time
                        print(f"Progress: {progress:.2f}%", end='\r')
    print(f"\nDownloaded {filename}")
# Stream JSON data
def stream_json_array(url):
    """Yield the elements of a large top-level JSON array one at a time.

    Requires the third-party ``ijson`` package (``pip install ijson``).

    Args:
        url: Address that returns a JSON array.

    Yields:
        Each element of the array as it is parsed.

    Raises:
        requests.exceptions.HTTPError: If the server returns an error status.
    """
    # Imported here because the file never imports ijson at module level
    import ijson

    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        # The raw stream is not decompressed by default; enable decoding so
        # gzip/deflate responses parse correctly.
        response.raw.decode_content = True
        for item in ijson.items(response.raw, 'item'):
            yield item
# Server-Sent Events (SSE)
def stream_sse(url):
    """Stream Server-Sent Events, yielding each event's JSON payload.

    Args:
        url: Address of the event stream.

    Yields:
        The decoded JSON value of every ``data:`` line.

    Raises:
        requests.exceptions.HTTPError: If the server returns an error status.
    """
    # `with` releases the connection when the generator finishes or is closed
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        for line in response.iter_lines():
            if not line:
                continue
            line = line.decode('utf-8')
            if line.startswith('data:'):
                payload = line[5:]
                # The space after the colon is optional in the SSE format
                if payload.startswith(' '):
                    payload = payload[1:]
                yield json.loads(payload)
Concurrent Requests with Threading
import concurrent.futures
import threading
from queue import Queue
# Parallel requests using ThreadPoolExecutor
def fetch_url(url):
    """Fetch a single URL and summarise the outcome as a dict.

    Args:
        url: Address to request.

    Returns:
        ``{'url', 'status', 'size'}`` on success, or ``{'url', 'error'}`` when
        the request fails or returns an error status. This function never
        raises.
    """
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return {'url': url, 'status': response.status_code, 'size': len(response.content)}
    except Exception as e:
        # Broad on purpose: this runs inside worker threads, where failures
        # are reported as results rather than raised.
        return {'url': url, 'error': str(e)}
urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/2',
    'https://httpbin.org/delay/3',
    'https://httpbin.org/get',
    'https://httpbin.org/post'
]

# Fetch URLs in parallel; executor.map returns results in the order of `urls`
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fetch_url, urls))

for result in results:
    print(result)
# Async pattern with queue
class AsyncRequester:
    """Fetch many URLs concurrently with worker threads and a queue.

    Results accumulate in ``self.results`` in completion order, which is not
    necessarily the order of the input URLs.
    """

    # Private stop marker, so that no URL value can be mistaken for it
    _STOP = object()

    def __init__(self, num_workers=5, fetch=None):
        """Set up the queue and result list.

        Args:
            num_workers: Number of worker threads started by ``fetch_all``.
            fetch: Callable applied to each URL. Defaults to the module-level
                ``fetch_url`` function.
        """
        self.queue = Queue()
        self.results = []
        self.num_workers = num_workers
        self.fetch = fetch

    def worker(self):
        """Worker thread function: process queued URLs until told to stop."""
        fetch = self.fetch if self.fetch is not None else fetch_url
        while True:
            url = self.queue.get()
            if url is self._STOP:
                break
            try:
                self.results.append(fetch(url))
            except Exception as e:
                # Record the failure in the same shape fetch_url uses, so one
                # bad URL cannot kill the worker.
                self.results.append({'url': url, 'error': str(e)})
            finally:
                # Always acknowledge the item, otherwise queue.join() in
                # fetch_all would block forever.
                self.queue.task_done()

    def fetch_all(self, urls):
        """Fetch all URLs concurrently and return the accumulated results.

        Args:
            urls: Iterable of URLs to fetch.

        Returns:
            ``self.results``, which also contains results from earlier calls
            on the same instance.
        """
        # Start worker threads
        threads = []
        for _ in range(self.num_workers):
            t = threading.Thread(target=self.worker)
            t.start()
            threads.append(t)
        # Add URLs to queue
        for url in urls:
            self.queue.put(url)
        # Wait for completion
        self.queue.join()
        # Stop workers: one stop marker per thread
        for _ in range(self.num_workers):
            self.queue.put(self._STOP)
        for t in threads:
            t.join()
        return self.results
# Usage: fetch the `urls` list defined above with five worker threads
requester = AsyncRequester(num_workers=5)
results = requester.fetch_all(urls)
Working with APIs
REST API Client
class RESTClient:
    """Generic REST API client built on one shared ``requests.Session``.

    Every request raises on an error status and returns the decoded JSON body
    when there is one, or the raw text otherwise. The client can be used as a
    context manager so the session is closed automatically.
    """

    def __init__(self, base_url, auth_token=None):
        """Create the session and its default headers.

        Args:
            base_url: Root URL of the API; a trailing slash is ignored.
            auth_token: Optional bearer token sent with every request.
        """
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        # Default headers
        self.session.headers.update({
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        })
        if auth_token:
            self.session.headers['Authorization'] = f'Bearer {auth_token}'

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying session."""
        self.session.close()

    def _request(self, method, endpoint, **kwargs):
        """Make an HTTP request and decode the response.

        Args:
            method: HTTP method name, e.g. ``'GET'``.
            endpoint: Path relative to ``base_url``; a leading slash is
                optional.
            **kwargs: Forwarded to ``Session.request``. ``timeout`` defaults
                to 30 seconds.

        Returns:
            The parsed JSON body, or the response text when the body is not
            valid JSON.

        Raises:
            requests.exceptions.HTTPError: If the server returns an error
                status.
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        # Set default timeout if not provided
        kwargs.setdefault('timeout', 30)
        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        # Return JSON if possible, otherwise return the text
        try:
            return response.json()
        except ValueError:
            return response.text

    def get(self, endpoint, params=None):
        """GET request with optional query parameters."""
        return self._request('GET', endpoint, params=params)

    def post(self, endpoint, data=None):
        """POST request; ``data`` is sent as the JSON body."""
        return self._request('POST', endpoint, json=data)

    def put(self, endpoint, data=None):
        """PUT request; ``data`` is sent as the JSON body."""
        return self._request('PUT', endpoint, json=data)

    def patch(self, endpoint, data=None):
        """PATCH request; ``data`` is sent as the JSON body."""
        return self._request('PATCH', endpoint, json=data)

    def delete(self, endpoint):
        """DELETE request."""
        return self._request('DELETE', endpoint)
# Example: GitHub API client
class GitHubClient(RESTClient):
    """GitHub API client built on ``RESTClient``."""

    def __init__(self, token=None):
        """Point the client at api.github.com.

        Args:
            token: Optional token, sent as a bearer token by the base class.
        """
        super().__init__('https://api.github.com', token)
        # Replace the generic Accept header with GitHub's v3 media type
        self.session.headers['Accept'] = 'application/vnd.github.v3+json'

    def get_user(self, username):
        """Get user information for ``username``."""
        return self.get(f'/users/{username}')

    def get_repos(self, username):
        """Get the repositories of ``username``."""
        return self.get(f'/users/{username}/repos')

    def create_gist(self, description, files):
        """Create a new public gist.

        Args:
            description: Gist description.
            files: Value sent as the ``files`` field of the request body.
        """
        data = {
            'description': description,
            'public': True,
            'files': files
        }
        return self.post('/gists', data)
# Usage: no token is passed, so these requests are sent without an Authorization header
client = GitHubClient()
user = client.get_user('octocat')
repos = client.get_repos('octocat')
Pagination Handling
class PaginatedAPI:
    """Collect every page of a paginated API response.

    Supports three schemes: offset/limit, cursor, and Link header.
    """

    def __init__(self, base_url):
        """Store the base URL and create the shared session."""
        self.base_url = base_url
        self.session = requests.Session()

    def fetch_all_pages(self, endpoint, method='offset', **kwargs):
        """Fetch all pages of paginated data.

        Args:
            endpoint: Path appended to ``base_url``.
            method: Pagination scheme: ``'offset'``, ``'cursor'`` or ``'link'``.
            **kwargs: Forwarded to the scheme-specific fetcher
                (e.g. ``page_size``).

        Returns:
            A list with the items of every page.

        Raises:
            ValueError: If ``method`` is not a known scheme.
        """
        if method == 'offset':
            return self._fetch_offset_pagination(endpoint, **kwargs)
        if method == 'cursor':
            return self._fetch_cursor_pagination(endpoint, **kwargs)
        if method == 'link':
            return self._fetch_link_pagination(endpoint, **kwargs)
        raise ValueError(f"Unknown pagination method: {method}")

    def _fetch_offset_pagination(self, endpoint, page_size=100):
        """Handle offset/limit pagination.

        Items are read from the ``items`` key, falling back to ``results``.
        Fetching stops on an empty page or once the offset reaches the
        reported ``total`` (or ``count``).
        """
        offset = 0
        all_data = []
        while True:
            params = {'offset': offset, 'limit': page_size}
            response = self.session.get(f"{self.base_url}/{endpoint}",
                                        params=params)
            response.raise_for_status()
            data = response.json()
            items = data.get('items', data.get('results', []))
            if not items:
                break
            all_data.extend(items)
            offset += page_size
            # Check if we have all data
            total = data.get('total', data.get('count'))
            if total and offset >= total:
                break
            time.sleep(0.1)  # Rate limiting
        return all_data

    def _fetch_cursor_pagination(self, endpoint, page_size=100):
        """Handle cursor-based pagination.

        The cursor for the next page is read from ``next_cursor``; fetching
        stops on an empty page or when no cursor is returned.
        """
        all_data = []
        cursor = None
        while True:
            params = {'limit': page_size}
            if cursor:
                params['cursor'] = cursor
            response = self.session.get(f"{self.base_url}/{endpoint}",
                                        params=params)
            response.raise_for_status()
            data = response.json()
            items = data.get('items', data.get('results', []))
            if not items:
                break
            all_data.extend(items)
            cursor = data.get('next_cursor')
            if not cursor:
                break
            time.sleep(0.1)  # Rate limiting
        return all_data

    def _fetch_link_pagination(self, endpoint):
        """Handle Link header pagination (GitHub style).

        Follows the ``next`` relation of each response's Link header until
        there is none.
        """
        all_data = []
        url = f"{self.base_url}/{endpoint}"
        while url:
            response = self.session.get(url)
            response.raise_for_status()
            all_data.extend(response.json())
            # Parse Link header
            links = self._parse_link_header(response.headers.get('Link', ''))
            url = links.get('next')
            time.sleep(0.1)  # Rate limiting
        return all_data

    def _parse_link_header(self, link_header):
        """Parse a Link header into a ``{rel: url}`` dict.

        Tolerates commas inside URLs, extra parameters such as ``title``, and
        quoted or unquoted ``rel`` values. A ``rel`` listing several relation
        types maps each of them to the URL.

        Args:
            link_header: Raw header value; may be empty or ``None``.

        Returns:
            Mapping of relation name to target URL; empty when nothing parses.
        """
        import re  # local import: only this method needs it

        links = {}
        if not link_header:
            return links
        # Each link is '<url>' followed by its parameters, which run up to the
        # next '<'. Matching on the brackets avoids splitting on commas that
        # belong to the URL itself.
        for match in re.finditer(r'<([^>]*)>([^<]*)', link_header):
            url, params = match.group(1), match.group(2)
            rel = re.search(r';\s*rel\s*=\s*"?([^";,]+)"?', params, re.IGNORECASE)
            if rel is None:
                continue
            for name in rel.group(1).split():
                links[name] = url
        return links
Real-world Examples
Weather API Integration
# OpenWeatherMap API example
class WeatherAPI:
    """OpenWeatherMap API client."""

    def __init__(self, api_key, timeout=10):
        """Store the credentials and create the shared session.

        Args:
            api_key: Key sent as the ``appid`` query parameter.
            timeout: Seconds to wait for each request before giving up.
        """
        self.api_key = api_key
        self.base_url = 'https://api.openweathermap.org/data/2.5'
        self.timeout = timeout
        self.session = requests.Session()

    def get_current_weather(self, city, country_code=None):
        """Get current weather for a city.

        Args:
            city: City name.
            country_code: Optional country code appended as ``city,code``.

        Returns:
            A dict with ``city``, ``country``, ``temperature``, ``feels_like``,
            ``humidity``, ``description``, ``wind_speed`` and ``timestamp``.
            Values are requested in metric units.

        Raises:
            requests.exceptions.HTTPError: If the API returns an error status.
        """
        q = f"{city},{country_code}" if country_code else city
        params = {
            'q': q,
            'appid': self.api_key,
            'units': 'metric'
        }
        response = self.session.get(f"{self.base_url}/weather",
                                    params=params, timeout=self.timeout)
        response.raise_for_status()
        data = response.json()
        return {
            'city': data['name'],
            'country': data['sys']['country'],
            'temperature': data['main']['temp'],
            'feels_like': data['main']['feels_like'],
            'humidity': data['main']['humidity'],
            'description': data['weather'][0]['description'],
            'wind_speed': data['wind']['speed'],
            # Naive datetime in the local timezone of this machine
            'timestamp': datetime.fromtimestamp(data['dt'])
        }

    def get_forecast(self, city, days=5):
        """Get the weather forecast for a city.

        Args:
            city: City name.
            days: Number of days to request.

        Returns:
            A list of dicts with ``datetime``, ``temperature``,
            ``description`` and ``precipitation`` (0 when no rain is reported).

        Raises:
            requests.exceptions.HTTPError: If the API returns an error status.
        """
        params = {
            'q': city,
            'appid': self.api_key,
            'units': 'metric',
            'cnt': days * 8  # 8 forecasts per day (3-hour intervals)
        }
        response = self.session.get(f"{self.base_url}/forecast",
                                    params=params, timeout=self.timeout)
        response.raise_for_status()
        data = response.json()
        return [
            {
                'datetime': datetime.fromtimestamp(item['dt']),
                'temperature': item['main']['temp'],
                'description': item['weather'][0]['description'],
                'precipitation': item.get('rain', {}).get('3h', 0)
            }
            for item in data['list']
        ]
# Usage
# weather_api = WeatherAPI('your-api-key')
# weather = weather_api.get_current_weather('London', 'GB')
# forecast = weather_api.get_forecast('London', days=3)
Data Collection Pipeline
import csv
import sqlite3
from datetime import datetime, timedelta
class DataCollector:
    """Automated data collection pipeline.

    Fetches a list of sources over HTTP, stores each successful result in a
    SQLite database, and can export the stored rows to CSV.
    """

    def __init__(self, db_path='data.db'):
        """Create the HTTP session and make sure the database table exists.

        Args:
            db_path: Path of the SQLite database file.
        """
        self.db_path = db_path
        self.session = requests.Session()
        self.session.headers['User-Agent'] = 'DataCollector/1.0'
        self._init_database()

    def _init_database(self):
        """Create the ``collected_data`` table if it does not exist yet."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS collected_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    source TEXT,
                    url TEXT,
                    data TEXT,
                    collected_at TIMESTAMP,
                    status TEXT
                )
            ''')
            conn.commit()
        finally:
            # Close even when the statement fails, so the file is not locked
            conn.close()

    def collect(self, sources):
        """Collect data from multiple sources.

        Args:
            sources: Iterable of dicts with ``name`` and ``url`` and optional
                ``params``, ``headers``, ``timeout`` and ``delay`` (seconds to
                sleep after the source, default 1).

        Returns:
            One result dict per source. A failed source produces
            ``{'source', 'status': 'error', 'error'}`` and is not saved.
        """
        results = []
        for source in sources:
            try:
                result = self._collect_single(source)
                results.append(result)
                self._save_result(result)
                # Rate limiting
                time.sleep(source.get('delay', 1))
            except Exception as e:
                # Pipeline boundary: one failing source must not stop the rest
                print(f"Error collecting from {source['name']}: {e}")
                results.append({
                    'source': source['name'],
                    'status': 'error',
                    'error': str(e)
                })
        return results

    def _collect_single(self, source):
        """Collect from a single source.

        Returns:
            A dict with ``source``, ``url``, ``data``, ``collected_at`` and
            ``status``. ``data`` is the parsed JSON for JSON responses and the
            raw text for everything else.

        Raises:
            requests.exceptions.HTTPError: If the server returns an error
                status.
        """
        response = self.session.get(
            source['url'],
            params=source.get('params'),
            headers=source.get('headers', {}),
            timeout=source.get('timeout', 30)
        )
        response.raise_for_status()
        # Parse based on content type. XML is kept as raw text; parse it as
        # needed downstream.
        content_type = response.headers.get('content-type', '')
        if 'json' in content_type:
            data = response.json()
        else:
            data = response.text
        return {
            'source': source['name'],
            'url': response.url,
            'data': data,
            'collected_at': datetime.now(),
            'status': 'success'
        }

    def _save_result(self, result):
        """Save one result row to the database.

        Any ``data`` value that is not already a string is stored as JSON
        text, so JSON arrays are saved as well as objects.
        """
        payload = result['data']
        if not isinstance(payload, str):
            payload = json.dumps(payload)
        collected_at = result['collected_at']
        if isinstance(collected_at, datetime):
            # Same text format as sqlite3's implicit datetime adapter, without
            # relying on that deprecated adapter.
            collected_at = collected_at.isoformat(sep=' ')
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                INSERT INTO collected_data (source, url, data, collected_at, status)
                VALUES (?, ?, ?, ?, ?)
            ''', (
                result['source'],
                result['url'],
                payload,
                collected_at,
                result['status']
            ))
            conn.commit()
        finally:
            conn.close()

    def export_to_csv(self, filename='collected_data.csv'):
        """Export all collected rows to a CSV file, newest first.

        Args:
            filename: Path of the CSV file to write (UTF-8).
        """
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute(
                'SELECT * FROM collected_data ORDER BY collected_at DESC'
            ).fetchall()
        finally:
            conn.close()
        with open(filename, 'w', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['ID', 'Source', 'URL', 'Data', 'Collected At', 'Status'])
            writer.writerows(rows)
        print(f"Exported {len(rows)} records to {filename}")
# Usage: constructing the collector also initializes the database at its default path
collector = DataCollector()
# Each source needs 'name' and 'url'; 'params', 'headers', 'timeout' and
# 'delay' are optional and read with defaults by the collector.
sources = [
    {
        'name': 'API 1',
        'url': 'https://api.example.com/data',
        'params': {'limit': 100},
        'delay': 2
    },
    {
        'name': 'API 2',
        'url': 'https://another-api.com/feed',
        'headers': {'API-Key': 'secret'},
        'timeout': 10
    }
]
# results = collector.collect(sources)
# collector.export_to_csv()
Best Practices
# Best practices for using Requests

# 1. Always use sessions for multiple requests to same host
with requests.Session() as session:
    # Multiple requests share connection pool
    session.get('https://api.example.com/endpoint1', timeout=30)
    session.get('https://api.example.com/endpoint2', timeout=30)

# 2. Set timeouts on all requests
response = requests.get('https://api.example.com/data', timeout=30)

# 3. Use context managers for file uploads
with open('data.csv', 'rb') as f:
    files = {'file': f}
    response = requests.post('https://api.example.com/upload', files=files,
                             timeout=30)

# 4. Handle errors gracefully
try:
    response = requests.get('https://api.example.com/data', timeout=30)
    response.raise_for_status()
    data = response.json()
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
    data = None
# 5. Respect rate limits
class RateLimiter:
    """Space out calls so they do not exceed a fixed rate.

    Call ``wait()`` before each request; it sleeps just long enough to keep
    at least ``delay`` seconds between consecutive calls.
    """

    def __init__(self, calls_per_second=1):
        """Configure the limiter.

        Args:
            calls_per_second: Maximum number of calls allowed per second.

        Raises:
            ValueError: If ``calls_per_second`` is not positive.
        """
        if calls_per_second <= 0:
            raise ValueError("calls_per_second must be positive")
        self.delay = 1.0 / calls_per_second
        # -inf means "never called", so the first wait() returns immediately
        self.last_call = float('-inf')

    def wait(self):
        """Sleep if needed, then record the time of this call."""
        # The monotonic clock is unaffected by system clock adjustments
        elapsed = time.monotonic() - self.last_call
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)
        self.last_call = time.monotonic()
# 6. Log requests for debugging
import logging

logging.basicConfig(level=logging.DEBUG)
# Connection-level messages are emitted under the "urllib3" logger name;
# "requests.packages.urllib3" receives no records in current Requests releases.
requests_log = logging.getLogger("urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True

# 7. Validate SSL certificates (default, but worth mentioning)
response = requests.get('https://api.example.com', verify=True, timeout=30)

# 8. Use environment variables for sensitive data
import os

api_key = os.environ.get('API_KEY')
if api_key is None:
    # Fail early instead of silently sending "Bearer None"
    raise RuntimeError("API_KEY environment variable is not set")
headers = {'Authorization': f'Bearer {api_key}'}
Practice Exercises
Exercise 1: Build an API Wrapper
Create a complete API wrapper for a public API:
- Implement all CRUD operations
- Handle authentication
- Implement pagination
- Add retry logic with exponential backoff
- Include comprehensive error handling
Exercise 2: Multi-Source Data Aggregator
Build a system that:
- Fetches data from multiple APIs concurrently
- Handles different authentication methods
- Normalizes data from different sources
- Implements caching to reduce API calls
- Exports aggregated data
Exercise 3: API Performance Monitor
Create a monitoring tool that:
- Tests API endpoints periodically
- Measures response times
- Checks for status codes
- Alerts on failures or slow responses
- Generates performance reports
Key Takeaways
- 🌐 Requests makes HTTP simple and Pythonic
- 🔐 Multiple authentication methods supported
- 🍪 Sessions maintain state across requests
- ⚡ Connection pooling improves performance
- 🔄 Retry strategies handle transient failures
- 📊 Stream large responses to save memory
- ⚠️ Always handle errors and set timeouts