Skip to main content

Requests Library

HTTP for Humans - Master Web Requests! 🌐

Requests is the most popular Python library for making HTTP requests. With its elegant and simple API, you can interact with web services, APIs, and websites effortlessly. From simple GET requests to complex authentication flows, Requests makes HTTP feel Pythonic and intuitive.

Understanding HTTP

graph LR A[Client Python] --> B[HTTP Request] B --> C[Web Server] C --> D[Process Request] D --> E[HTTP Response] E --> A B --> F[Method: GET/POST/PUT/DELETE] B --> G[Headers] B --> H[Body/Payload] E --> I[Status Code] E --> J[Response Headers] E --> K[Response Body]

Installation and Basic Usage

# Installation
"""
pip install requests
"""

import requests
import json
from pprint import pprint
import time
from datetime import datetime

# Check version
print(f"Requests version: {requests.__version__}")

# Basic GET request
response = requests.get('https://httpbin.org/get')

# Check response
print(f"Status Code: {response.status_code}")
print(f"Response Type: {type(response)}")
print(f"Encoding: {response.encoding}")
print(f"Headers: {response.headers}")

# Different ways to access response content
print(response.text)      # As string
print(response.content)   # As bytes
print(response.json())    # Parse as JSON

# Response attributes
print(f"URL: {response.url}")
print(f"Status: {response.status_code}")
print(f"OK: {response.ok}")  # True if status < 400
print(f"Elapsed time: {response.elapsed}")
print(f"Cookies: {response.cookies}")
print(f"History: {response.history}")  # Redirects

HTTP Methods

GET Requests

# GET request with parameters
params = {
    'q': 'python requests',
    'limit': 10,
    'offset': 0
}

response = requests.get('https://httpbin.org/get', params=params)
print(response.url)  # Shows URL with parameters

# Multiple values for same parameter
params = {'category': ['python', 'data-science']}
response = requests.get('https://httpbin.org/get', params=params)

# URL encoding is handled automatically
params = {'search': 'python & data science'}
response = requests.get('https://httpbin.org/get', params=params)

# Working with query strings
from urllib.parse import urlencode, parse_qs

# Manual URL construction
base_url = 'https://api.example.com/search'
query_string = urlencode({'q': 'python', 'page': 1})
full_url = f"{base_url}?{query_string}"
print(full_url)

# Parse query string from URL
from urllib.parse import urlparse
parsed = urlparse(response.url)
query_params = parse_qs(parsed.query)
print(query_params)

POST Requests

# POST with form data
form_data = {
    'username': 'john_doe',
    'email': 'john@example.com',
    'age': 30
}

response = requests.post('https://httpbin.org/post', data=form_data)
print(response.json())

# POST with JSON data
json_data = {
    'name': 'Product A',
    'price': 29.99,
    'categories': ['electronics', 'gadgets'],
    'metadata': {
        'created_at': datetime.now().isoformat(),
        'updated_by': 'admin'
    }
}

# Method 1: Using json parameter
response = requests.post('https://httpbin.org/post', json=json_data)

# Method 2: Manual JSON encoding
headers = {'Content-Type': 'application/json'}
response = requests.post('https://httpbin.org/post', 
                        data=json.dumps(json_data),
                        headers=headers)

# File upload
files = {'file': open('data.csv', 'rb')}
response = requests.post('https://httpbin.org/post', files=files)
files['file'].close()

# Multiple files
files = [
    ('files', ('file1.txt', open('file1.txt', 'rb'), 'text/plain')),
    ('files', ('file2.txt', open('file2.txt', 'rb'), 'text/plain'))
]
response = requests.post('https://httpbin.org/post', files=files)

# Mixed form data and files
data = {'name': 'John', 'age': 30}
files = {'avatar': open('avatar.jpg', 'rb')}
response = requests.post('https://httpbin.org/post', data=data, files=files)

Other HTTP Methods

# PUT request (update entire resource)
update_data = {
    'id': 123,
    'name': 'Updated Product',
    'price': 39.99
}
response = requests.put('https://httpbin.org/put', json=update_data)

# PATCH request (partial update)
partial_update = {'price': 34.99}
response = requests.patch('https://httpbin.org/patch', json=partial_update)

# DELETE request
response = requests.delete('https://httpbin.org/delete', 
                          params={'id': 123})

# HEAD request (headers only, no body)
response = requests.head('https://httpbin.org/get')
print(response.headers)
print(response.text)  # Empty

# OPTIONS request (check allowed methods)
response = requests.options('https://httpbin.org/get')
print(response.headers.get('Allow'))

Headers and Authentication

Custom Headers

# Setting custom headers
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'application/json',
    'Accept-Language': 'en-US,en;q=0.9',
    'Cache-Control': 'no-cache',
    'X-Custom-Header': 'CustomValue'
}

response = requests.get('https://httpbin.org/headers', headers=headers)
print(response.json())

# Common header patterns
def get_with_headers(url, auth_token=None):
    """Make request with common headers"""
    headers = {
        'User-Agent': 'Python Requests/2.28.0',
        'Accept': 'application/json',
        'Accept-Encoding': 'gzip, deflate',
    }
    
    if auth_token:
        headers['Authorization'] = f'Bearer {auth_token}'
    
    return requests.get(url, headers=headers)

# Response headers
response = requests.get('https://httpbin.org/get')
print(f"Content-Type: {response.headers.get('Content-Type')}")
print(f"Server: {response.headers.get('Server')}")
print(f"Date: {response.headers.get('Date')}")

# Case-insensitive header access
print(response.headers['content-type'])
print(response.headers['Content-Type'])
print(response.headers['CONTENT-TYPE'])  # All work the same

Authentication Methods

# Basic Authentication
from requests.auth import HTTPBasicAuth

# Method 1: Using auth parameter
response = requests.get('https://httpbin.org/basic-auth/user/pass',
                       auth=('user', 'pass'))

# Method 2: Using HTTPBasicAuth
response = requests.get('https://httpbin.org/basic-auth/user/pass',
                       auth=HTTPBasicAuth('user', 'pass'))

# Bearer Token Authentication
token = 'your-api-token-here'
headers = {'Authorization': f'Bearer {token}'}
response = requests.get('https://api.example.com/data', headers=headers)

# API Key Authentication
# Method 1: In header
headers = {'X-API-Key': 'your-api-key'}
response = requests.get('https://api.example.com/data', headers=headers)

# Method 2: In query parameter
params = {'api_key': 'your-api-key'}
response = requests.get('https://api.example.com/data', params=params)

# Custom Authentication Class
class CustomAuth(requests.auth.AuthBase):
    """Custom authentication handler"""
    def __init__(self, token):
        self.token = token
    
    def __call__(self, request):
        # Modify request headers
        request.headers['X-Custom-Auth'] = self.token
        return request

response = requests.get('https://api.example.com/data',
                       auth=CustomAuth('my-token'))

# OAuth 2.0 (using requests-oauthlib)
"""
pip install requests-oauthlib
"""
from requests_oauthlib import OAuth2Session

client_id = 'your-client-id'
client_secret = 'your-client-secret'
redirect_uri = 'http://localhost:8080/callback'
authorization_base_url = 'https://github.com/login/oauth/authorize'
token_url = 'https://github.com/login/oauth/access_token'

# OAuth2 flow
github = OAuth2Session(client_id, redirect_uri=redirect_uri)
authorization_url, state = github.authorization_url(authorization_base_url)

Sessions and Cookies

Using Sessions

# Sessions maintain state across requests
session = requests.Session()

# Set default headers for all requests in session
session.headers.update({
    'User-Agent': 'Python Bot 1.0',
    'Accept': 'application/json'
})

# Cookies persist across requests in same session
session.get('https://httpbin.org/cookies/set/session_id/12345')
response = session.get('https://httpbin.org/cookies')
print(response.json())  # Shows the cookie

# Session with authentication
session.auth = ('user', 'pass')
response = session.get('https://httpbin.org/basic-auth/user/pass')

# Context manager for automatic cleanup
with requests.Session() as session:
    session.get('https://httpbin.org/cookies/set/temp/value')
    response = session.get('https://httpbin.org/cookies')
    print(response.json())

# Advanced session configuration
session = requests.Session()
session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3))
session.timeout = 30  # Default timeout for all requests

# Connection pooling with session
class APIClient:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'X-API-Key': api_key,
            'Accept': 'application/json'
        })
    
    def get(self, endpoint, **kwargs):
        url = f"{self.base_url}/{endpoint}"
        return self.session.get(url, **kwargs)
    
    def post(self, endpoint, **kwargs):
        url = f"{self.base_url}/{endpoint}"
        return self.session.post(url, **kwargs)
    
    def close(self):
        self.session.close()

# Usage
client = APIClient('https://api.example.com', 'your-api-key')
response = client.get('users', params={'page': 1})
client.close()

Cookie Management

# Working with cookies
import requests
from http.cookiejar import CookieJar

# Send cookies with request
cookies = {'session_id': 'abc123', 'user_pref': 'dark_mode'}
response = requests.get('https://httpbin.org/cookies', cookies=cookies)

# Access response cookies
response = requests.get('https://httpbin.org/cookies/set/new_cookie/value123')
print(response.cookies['new_cookie'])

# Cookie jar for advanced management
jar = CookieJar()
session = requests.Session()
session.cookies = jar

# Save and load cookies
import pickle

# Save cookies to file
with open('cookies.pkl', 'wb') as f:
    pickle.dump(session.cookies, f)

# Load cookies from file
with open('cookies.pkl', 'rb') as f:
    cookies = pickle.load(f)
    session.cookies.update(cookies)

# Extract cookies as dict
cookies_dict = requests.utils.dict_from_cookiejar(session.cookies)
print(cookies_dict)

# Create cookie jar from dict
from requests.utils import cookiejar_from_dict
new_jar = cookiejar_from_dict(cookies_dict)

Error Handling and Timeouts

Handling Response Errors

# Check response status
response = requests.get('https://httpbin.org/status/404')

# Method 1: Check status_code
if response.status_code == 200:
    print("Success!")
elif response.status_code == 404:
    print("Not found!")
else:
    print(f"Error: {response.status_code}")

# Method 2: Use response.ok
if response.ok:  # True if status_code < 400
    data = response.json()
else:
    print(f"Request failed: {response.status_code}")

# Method 3: Raise exception for bad status
try:
    response = requests.get('https://httpbin.org/status/500')
    response.raise_for_status()  # Raises HTTPError for bad status
    data = response.json()
except requests.exceptions.HTTPError as e:
    print(f"HTTP Error: {e}")
except requests.exceptions.RequestException as e:
    print(f"Request Error: {e}")

# Complete error handling
def safe_request(url, **kwargs):
    """Make request with comprehensive error handling"""
    try:
        response = requests.get(url, **kwargs)
        response.raise_for_status()
        return response
    
    except requests.exceptions.Timeout:
        print(f"Request timed out for {url}")
    except requests.exceptions.TooManyRedirects:
        print(f"Too many redirects for {url}")
    except requests.exceptions.ConnectionError:
        print(f"Connection error for {url}")
    except requests.exceptions.HTTPError as e:
        print(f"HTTP {e.response.status_code} error for {url}")
    except requests.exceptions.RequestException as e:
        print(f"Request exception for {url}: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    
    return None

Timeouts and Retries

# Set timeout for requests
# Timeout as single value (both connect and read)
response = requests.get('https://httpbin.org/delay/1', timeout=5)

# Separate connect and read timeouts
response = requests.get('https://httpbin.org/delay/1', 
                       timeout=(3.05, 27))  # (connect, read)

# No timeout (wait forever) - NOT RECOMMENDED
response = requests.get('https://httpbin.org/delay/1', timeout=None)

# Retry logic with exponential backoff
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

def create_session_with_retries(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(500, 502, 504)
):
    """Create session with automatic retries"""
    session = requests.Session()
    
    retry = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        allowed_methods=["HEAD", "GET", "PUT", "DELETE", "OPTIONS", "TRACE"]
    )
    
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    
    return session

# Use session with retries
session = create_session_with_retries()
response = session.get('https://httpbin.org/status/500')  # Will retry

# Custom retry logic
def retry_request(url, max_retries=3, delay=1, backoff=2):
    """Custom retry implementation"""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response
        
        except (requests.exceptions.RequestException, 
                requests.exceptions.HTTPError) as e:
            if attempt == max_retries - 1:
                raise
            
            wait_time = delay * (backoff ** attempt)
            print(f"Attempt {attempt + 1} failed. Retrying in {wait_time}s...")
            time.sleep(wait_time)
    
    return None

Advanced Features

Streaming Responses

# Stream large responses
response = requests.get('https://httpbin.org/stream/100', stream=True)

# Process line by line
for line in response.iter_lines():
    if line:
        data = json.loads(line)
        print(data)

# Download large file in chunks
def download_file(url, filename, chunk_size=8192):
    """Download file with progress tracking"""
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        
        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0
        
        with open(filename, 'wb') as file:
            for chunk in response.iter_content(chunk_size=chunk_size):
                if chunk:
                    file.write(chunk)
                    downloaded += len(chunk)
                    
                    if total_size > 0:
                        progress = (downloaded / total_size) * 100
                        print(f"Progress: {progress:.2f}%", end='\r')
        
        print(f"\nDownloaded {filename}")

# Stream JSON data
def stream_json_array(url):
    """Stream large JSON array"""
    with requests.get(url, stream=True) as response:
        response.raise_for_status()
        
        parser = ijson.items(response.raw, 'item')
        for item in parser:
            yield item

# Server-Sent Events (SSE)
def stream_sse(url):
    """Stream Server-Sent Events"""
    response = requests.get(url, stream=True)
    
    for line in response.iter_lines():
        if line:
            line = line.decode('utf-8')
            if line.startswith('data: '):
                data = line[6:]  # Remove 'data: ' prefix
                yield json.loads(data)

Async Requests with Threading

import concurrent.futures
import threading
from queue import Queue

# Parallel requests using ThreadPoolExecutor
def fetch_url(url):
    """Fetch single URL"""
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return {'url': url, 'status': response.status_code, 'size': len(response.content)}
    except Exception as e:
        return {'url': url, 'error': str(e)}

urls = [
    'https://httpbin.org/delay/1',
    'https://httpbin.org/delay/2',
    'https://httpbin.org/delay/3',
    'https://httpbin.org/get',
    'https://httpbin.org/post'
]

# Fetch URLs in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(fetch_url, urls))

for result in results:
    print(result)

# Async pattern with queue
class AsyncRequester:
    def __init__(self, num_workers=5):
        self.queue = Queue()
        self.results = []
        self.num_workers = num_workers
    
    def worker(self):
        """Worker thread function"""
        while True:
            url = self.queue.get()
            if url is None:
                break
            
            result = fetch_url(url)
            self.results.append(result)
            self.queue.task_done()
    
    def fetch_all(self, urls):
        """Fetch all URLs asynchronously"""
        # Start worker threads
        threads = []
        for _ in range(self.num_workers):
            t = threading.Thread(target=self.worker)
            t.start()
            threads.append(t)
        
        # Add URLs to queue
        for url in urls:
            self.queue.put(url)
        
        # Wait for completion
        self.queue.join()
        
        # Stop workers
        for _ in range(self.num_workers):
            self.queue.put(None)
        
        for t in threads:
            t.join()
        
        return self.results

# Usage
requester = AsyncRequester(num_workers=5)
results = requester.fetch_all(urls)

Working with APIs

REST API Client

class RESTClient:
    """Generic REST API client"""
    
    def __init__(self, base_url, auth_token=None):
        self.base_url = base_url.rstrip('/')
        self.session = requests.Session()
        
        # Default headers
        self.session.headers.update({
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        })
        
        if auth_token:
            self.session.headers['Authorization'] = f'Bearer {auth_token}'
    
    def _request(self, method, endpoint, **kwargs):
        """Make HTTP request"""
        url = f"{self.base_url}/{endpoint.lstrip('/')}"
        
        # Set default timeout if not provided
        kwargs.setdefault('timeout', 30)
        
        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        
        # Return JSON if possible, otherwise return response
        try:
            return response.json()
        except ValueError:
            return response.text
    
    def get(self, endpoint, params=None):
        """GET request"""
        return self._request('GET', endpoint, params=params)
    
    def post(self, endpoint, data=None):
        """POST request"""
        return self._request('POST', endpoint, json=data)
    
    def put(self, endpoint, data=None):
        """PUT request"""
        return self._request('PUT', endpoint, json=data)
    
    def patch(self, endpoint, data=None):
        """PATCH request"""
        return self._request('PATCH', endpoint, json=data)
    
    def delete(self, endpoint):
        """DELETE request"""
        return self._request('DELETE', endpoint)

# Example: GitHub API client
class GitHubClient(RESTClient):
    """GitHub API client"""
    
    def __init__(self, token=None):
        super().__init__('https://api.github.com', token)
        self.session.headers['Accept'] = 'application/vnd.github.v3+json'
    
    def get_user(self, username):
        """Get user information"""
        return self.get(f'/users/{username}')
    
    def get_repos(self, username):
        """Get user repositories"""
        return self.get(f'/users/{username}/repos')
    
    def create_gist(self, description, files):
        """Create a new gist"""
        data = {
            'description': description,
            'public': True,
            'files': files
        }
        return self.post('/gists', data)

# Usage
client = GitHubClient()
user = client.get_user('octocat')
repos = client.get_repos('octocat')

Pagination Handling

class PaginatedAPI:
    """Handle paginated API responses"""
    
    def __init__(self, base_url):
        self.base_url = base_url
        self.session = requests.Session()
    
    def fetch_all_pages(self, endpoint, method='offset', **kwargs):
        """Fetch all pages of paginated data"""
        all_data = []
        
        if method == 'offset':
            return self._fetch_offset_pagination(endpoint, **kwargs)
        elif method == 'cursor':
            return self._fetch_cursor_pagination(endpoint, **kwargs)
        elif method == 'link':
            return self._fetch_link_pagination(endpoint, **kwargs)
        else:
            raise ValueError(f"Unknown pagination method: {method}")
    
    def _fetch_offset_pagination(self, endpoint, page_size=100):
        """Handle offset/limit pagination"""
        offset = 0
        all_data = []
        
        while True:
            params = {'offset': offset, 'limit': page_size}
            response = self.session.get(f"{self.base_url}/{endpoint}", 
                                       params=params)
            response.raise_for_status()
            
            data = response.json()
            items = data.get('items', data.get('results', []))
            
            if not items:
                break
            
            all_data.extend(items)
            offset += page_size
            
            # Check if we have all data
            total = data.get('total', data.get('count'))
            if total and offset >= total:
                break
            
            time.sleep(0.1)  # Rate limiting
        
        return all_data
    
    def _fetch_cursor_pagination(self, endpoint, page_size=100):
        """Handle cursor-based pagination"""
        all_data = []
        cursor = None
        
        while True:
            params = {'limit': page_size}
            if cursor:
                params['cursor'] = cursor
            
            response = self.session.get(f"{self.base_url}/{endpoint}", 
                                       params=params)
            response.raise_for_status()
            
            data = response.json()
            items = data.get('items', data.get('results', []))
            
            if not items:
                break
            
            all_data.extend(items)
            cursor = data.get('next_cursor')
            
            if not cursor:
                break
            
            time.sleep(0.1)
        
        return all_data
    
    def _fetch_link_pagination(self, endpoint):
        """Handle Link header pagination (GitHub style)"""
        all_data = []
        url = f"{self.base_url}/{endpoint}"
        
        while url:
            response = self.session.get(url)
            response.raise_for_status()
            
            all_data.extend(response.json())
            
            # Parse Link header
            links = self._parse_link_header(response.headers.get('Link', ''))
            url = links.get('next')
            
            time.sleep(0.1)
        
        return all_data
    
    def _parse_link_header(self, link_header):
        """Parse Link header for pagination"""
        links = {}
        
        if not link_header:
            return links
        
        for link in link_header.split(','):
            parts = link.split(';')
            if len(parts) == 2:
                url = parts[0].strip()[1:-1]  # Remove < and >
                rel = parts[1].split('=')[1].strip()[1:-1]  # Remove quotes
                links[rel] = url
        
        return links

Real-world Examples

Weather API Integration

# OpenWeatherMap API example
class WeatherAPI:
    """OpenWeatherMap API client"""
    
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = 'https://api.openweathermap.org/data/2.5'
        self.session = requests.Session()
    
    def get_current_weather(self, city, country_code=None):
        """Get current weather for a city"""
        if country_code:
            q = f"{city},{country_code}"
        else:
            q = city
        
        params = {
            'q': q,
            'appid': self.api_key,
            'units': 'metric'
        }
        
        response = self.session.get(f"{self.base_url}/weather", 
                                   params=params)
        response.raise_for_status()
        
        data = response.json()
        
        return {
            'city': data['name'],
            'country': data['sys']['country'],
            'temperature': data['main']['temp'],
            'feels_like': data['main']['feels_like'],
            'humidity': data['main']['humidity'],
            'description': data['weather'][0]['description'],
            'wind_speed': data['wind']['speed'],
            'timestamp': datetime.fromtimestamp(data['dt'])
        }
    
    def get_forecast(self, city, days=5):
        """Get weather forecast"""
        params = {
            'q': city,
            'appid': self.api_key,
            'units': 'metric',
            'cnt': days * 8  # 8 forecasts per day (3-hour intervals)
        }
        
        response = self.session.get(f"{self.base_url}/forecast", 
                                   params=params)
        response.raise_for_status()
        
        data = response.json()
        forecasts = []
        
        for item in data['list']:
            forecasts.append({
                'datetime': datetime.fromtimestamp(item['dt']),
                'temperature': item['main']['temp'],
                'description': item['weather'][0]['description'],
                'precipitation': item.get('rain', {}).get('3h', 0)
            })
        
        return forecasts

# Usage
# weather_api = WeatherAPI('your-api-key')
# weather = weather_api.get_current_weather('London', 'UK')
# forecast = weather_api.get_forecast('London', days=3)

Data Collection Pipeline

import csv
import sqlite3
from datetime import datetime, timedelta

class DataCollector:
    """Automated data collection pipeline"""
    
    def __init__(self, db_path='data.db'):
        self.db_path = db_path
        self.session = requests.Session()
        self.session.headers['User-Agent'] = 'DataCollector/1.0'
        self._init_database()
    
    def _init_database(self):
        """Initialize database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS collected_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source TEXT,
                url TEXT,
                data TEXT,
                collected_at TIMESTAMP,
                status TEXT
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def collect(self, sources):
        """Collect data from multiple sources"""
        results = []
        
        for source in sources:
            try:
                result = self._collect_single(source)
                results.append(result)
                self._save_result(result)
                
                # Rate limiting
                time.sleep(source.get('delay', 1))
                
            except Exception as e:
                print(f"Error collecting from {source['name']}: {e}")
                results.append({
                    'source': source['name'],
                    'status': 'error',
                    'error': str(e)
                })
        
        return results
    
    def _collect_single(self, source):
        """Collect from single source"""
        response = self.session.get(
            source['url'],
            params=source.get('params'),
            headers=source.get('headers', {}),
            timeout=source.get('timeout', 30)
        )
        response.raise_for_status()
        
        # Parse based on content type
        content_type = response.headers.get('content-type', '')
        
        if 'json' in content_type:
            data = response.json()
        elif 'xml' in content_type:
            data = response.text
            # Parse XML as needed
        else:
            data = response.text
        
        return {
            'source': source['name'],
            'url': response.url,
            'data': data,
            'collected_at': datetime.now(),
            'status': 'success'
        }
    
    def _save_result(self, result):
        """Save result to database"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO collected_data (source, url, data, collected_at, status)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            result['source'],
            result['url'],
            json.dumps(result['data']) if isinstance(result['data'], dict) else result['data'],
            result['collected_at'],
            result['status']
        ))
        
        conn.commit()
        conn.close()
    
    def export_to_csv(self, filename='collected_data.csv'):
        """Export collected data to CSV"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('SELECT * FROM collected_data ORDER BY collected_at DESC')
        rows = cursor.fetchall()
        
        with open(filename, 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(['ID', 'Source', 'URL', 'Data', 'Collected At', 'Status'])
            writer.writerows(rows)
        
        conn.close()
        print(f"Exported {len(rows)} records to {filename}")

# Usage
collector = DataCollector()

sources = [
    {
        'name': 'API 1',
        'url': 'https://api.example.com/data',
        'params': {'limit': 100},
        'delay': 2
    },
    {
        'name': 'API 2',
        'url': 'https://another-api.com/feed',
        'headers': {'API-Key': 'secret'},
        'timeout': 10
    }
]

# results = collector.collect(sources)
# collector.export_to_csv()

Best Practices

# Best practices for using Requests

# 1. Always use sessions for multiple requests to same host
with requests.Session() as session:
    # Multiple requests share connection pool
    session.get('https://api.example.com/endpoint1')
    session.get('https://api.example.com/endpoint2')

# 2. Set timeouts on all requests
response = requests.get('https://api.example.com/data', timeout=30)

# 3. Use context managers for file uploads
with open('data.csv', 'rb') as f:
    files = {'file': f}
    response = requests.post('https://api.example.com/upload', files=files)

# 4. Handle errors gracefully
try:
    response = requests.get('https://api.example.com/data')
    response.raise_for_status()
    data = response.json()
except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
    data = None

# 5. Respect rate limits
class RateLimiter:
    def __init__(self, calls_per_second=1):
        self.delay = 1.0 / calls_per_second
        self.last_call = 0
    
    def wait(self):
        elapsed = time.time() - self.last_call
        if elapsed < self.delay:
            time.sleep(self.delay - elapsed)
        self.last_call = time.time()

# 6. Log requests for debugging
import logging

logging.basicConfig(level=logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True

# 7. Validate SSL certificates (default, but worth mentioning)
response = requests.get('https://api.example.com', verify=True)

# 8. Use environment variables for sensitive data
import os

api_key = os.environ.get('API_KEY')
headers = {'Authorization': f'Bearer {api_key}'}

Practice Exercises

Exercise 1: Build an API Wrapper

Create a complete API wrapper for a public API:

  1. Implement all CRUD operations
  2. Handle authentication
  3. Implement pagination
  4. Add retry logic with exponential backoff
  5. Include comprehensive error handling

Exercise 2: Multi-Source Data Aggregator

Build a system that:

  1. Fetches data from multiple APIs concurrently
  2. Handles different authentication methods
  3. Normalizes data from different sources
  4. Implements caching to reduce API calls
  5. Exports aggregated data

Exercise 3: API Performance Monitor

Create a monitoring tool that:

  1. Tests API endpoints periodically
  2. Measures response times
  3. Checks for status codes
  4. Alerts on failures or slow responses
  5. Generates performance reports

Key Takeaways

Further Resources