Skip to main content

API Authentication

Secure Your API Connections! 🔐

API authentication is the gateway to accessing protected resources and services. From simple API keys to complex OAuth flows, understanding authentication mechanisms is crucial for working with modern APIs. Master these patterns to securely integrate with any service, protect your credentials, and build robust data pipelines.

Authentication Methods Overview

graph TD A[API Authentication] --> B[API Keys] A --> C[Basic Auth] A --> D[Bearer Tokens] A --> E[OAuth 2.0] A --> F[JWT] A --> G[HMAC] B --> H[Header/Query/Body] C --> I[Username/Password] D --> J[Access Tokens] E --> K[Authorization Code] E --> L[Client Credentials] E --> M[Implicit] F --> N[Self-contained Tokens] G --> O[Signature-based]

Environment Setup and Security

Secure Credential Management

# NEVER hardcode credentials in your code!

import os
from dotenv import load_dotenv
import json
import keyring
from cryptography.fernet import Fernet
import base64

# Method 1: Environment Variables (.env file)
"""
Create a .env file:
API_KEY=your-secret-api-key
CLIENT_ID=your-client-id
CLIENT_SECRET=your-client-secret

Add .env to .gitignore!
"""

# Load environment variables
load_dotenv()

# Access credentials
api_key = os.getenv('API_KEY')
client_id = os.getenv('CLIENT_ID')
client_secret = os.getenv('CLIENT_SECRET')

# Method 2: Configuration Files
def load_config(config_file='config.json'):
    """Load configuration from JSON file"""
    try:
        with open(config_file, 'r') as f:
            config = json.load(f)
        return config
    except FileNotFoundError:
        print(f"Config file {config_file} not found")
        return {}

# Method 3: System Keyring
def store_credential(service, username, password):
    """Store credential in system keyring"""
    keyring.set_password(service, username, password)

def get_credential(service, username):
    """Retrieve credential from system keyring"""
    return keyring.get_password(service, username)

# Example usage
# store_credential('my_api', 'api_key', 'secret123')
# api_key = get_credential('my_api', 'api_key')

# Method 4: Encrypted Configuration
class EncryptedConfig:
    """Encrypted configuration manager"""
    
    def __init__(self, key=None):
        if key:
            self.cipher = Fernet(key)
        else:
            self.cipher = Fernet(Fernet.generate_key())
    
    def encrypt_config(self, config_dict):
        """Encrypt configuration dictionary"""
        json_string = json.dumps(config_dict)
        encrypted = self.cipher.encrypt(json_string.encode())
        return base64.b64encode(encrypted).decode()
    
    def decrypt_config(self, encrypted_string):
        """Decrypt configuration string"""
        encrypted = base64.b64decode(encrypted_string.encode())
        decrypted = self.cipher.decrypt(encrypted)
        return json.loads(decrypted.decode())

# Best practice: Configuration hierarchy
class ConfigManager:
    """Hierarchical configuration management"""
    
    def __init__(self):
        self.config = {}
        self._load_defaults()
        self._load_environment()
        self._load_config_file()
    
    def _load_defaults(self):
        """Load default configuration"""
        self.config = {
            'api_base_url': 'https://api.example.com',
            'timeout': 30,
            'retry_count': 3
        }
    
    def _load_environment(self):
        """Override with environment variables"""
        for key in ['API_KEY', 'CLIENT_ID', 'CLIENT_SECRET']:
            value = os.getenv(key)
            if value:
                self.config[key.lower()] = value
    
    def _load_config_file(self):
        """Override with config file if exists"""
        if os.path.exists('config.json'):
            with open('config.json', 'r') as f:
                file_config = json.load(f)
                self.config.update(file_config)
    
    def get(self, key, default=None):
        """Get configuration value"""
        return self.config.get(key, default)

API Key Authentication

Different API Key Patterns

import requests
from urllib.parse import urlencode

class APIKeyAuth:
    """Handle different API key authentication patterns"""
    
    def __init__(self, api_key, key_name='api_key'):
        self.api_key = api_key
        self.key_name = key_name
    
    def auth_via_header(self, session=None):
        """API key in header"""
        headers = {self.key_name: self.api_key}
        
        if session:
            session.headers.update(headers)
            return session
        else:
            return headers
    
    def auth_via_query(self, url, params=None):
        """API key in query parameters"""
        if params is None:
            params = {}
        
        params[self.key_name] = self.api_key
        
        # Build complete URL
        if '?' in url:
            return f"{url}&{urlencode({self.key_name: self.api_key})}"
        else:
            return f"{url}?{urlencode(params)}"
    
    def auth_via_body(self, data=None):
        """API key in request body"""
        if data is None:
            data = {}
        
        data[self.key_name] = self.api_key
        return data

# Example implementations for popular APIs
class OpenAIClient:
    """OpenAI API authentication example"""
    
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = 'https://api.openai.com/v1'
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json'
        })
    
    def complete(self, prompt, model='gpt-3.5-turbo'):
        """Make completion request"""
        response = self.session.post(
            f'{self.base_url}/chat/completions',
            json={
                'model': model,
                'messages': [{'role': 'user', 'content': prompt}]
            }
        )
        response.raise_for_status()
        return response.json()

class GoogleMapsClient:
    """Google Maps API authentication example"""
    
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = 'https://maps.googleapis.com/maps/api'
    
    def geocode(self, address):
        """Geocode an address"""
        response = requests.get(
            f'{self.base_url}/geocode/json',
            params={
                'address': address,
                'key': self.api_key  # API key in query parameter
            }
        )
        response.raise_for_status()
        return response.json()

class RapidAPIClient:
    """RapidAPI authentication example"""
    
    def __init__(self, api_key, api_host):
        self.api_key = api_key
        self.api_host = api_host
        self.session = requests.Session()
        self.session.headers.update({
            'X-RapidAPI-Key': api_key,
            'X-RapidAPI-Host': api_host
        })
    
    def request(self, endpoint, method='GET', **kwargs):
        """Make authenticated request"""
        url = f"https://{self.api_host}/{endpoint}"
        response = self.session.request(method, url, **kwargs)
        response.raise_for_status()
        return response.json()

Basic Authentication

import requests
from requests.auth import HTTPBasicAuth
import base64

class BasicAuthClient:
    """HTTP Basic Authentication implementation"""
    
    def __init__(self, username, password):
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.session.auth = (username, password)
    
    def manual_header(self):
        """Create Authorization header manually"""
        credentials = f"{self.username}:{self.password}"
        encoded = base64.b64encode(credentials.encode()).decode()
        return {'Authorization': f'Basic {encoded}'}
    
    def test_auth(self, url):
        """Test authentication"""
        response = self.session.get(url)
        return response.status_code == 200

# Example: JIRA API with Basic Auth
class JIRAClient:
    """JIRA API client with Basic Authentication"""
    
    def __init__(self, domain, email, api_token):
        self.domain = domain
        self.base_url = f'https://{domain}.atlassian.net/rest/api/3'
        self.session = requests.Session()
        # JIRA uses email and API token for basic auth
        self.session.auth = (email, api_token)
        self.session.headers.update({
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        })
    
    def get_issue(self, issue_key):
        """Get JIRA issue details"""
        response = self.session.get(f'{self.base_url}/issue/{issue_key}')
        response.raise_for_status()
        return response.json()
    
    def create_issue(self, project_key, summary, description, issue_type='Task'):
        """Create new JIRA issue"""
        data = {
            'fields': {
                'project': {'key': project_key},
                'summary': summary,
                'description': description,
                'issuetype': {'name': issue_type}
            }
        }
        response = self.session.post(f'{self.base_url}/issue', json=data)
        response.raise_for_status()
        return response.json()

# Digest Authentication (less common)
from requests.auth import HTTPDigestAuth

class DigestAuthClient:
    """HTTP Digest Authentication"""
    
    def __init__(self, username, password):
        self.auth = HTTPDigestAuth(username, password)
    
    def request(self, url):
        response = requests.get(url, auth=self.auth)
        return response

OAuth 2.0 Authentication

OAuth 2.0 Flows

import requests
from requests_oauthlib import OAuth2Session
import webbrowser
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import threading

class OAuth2Client:
    """Generic OAuth 2.0 client implementation"""
    
    def __init__(self, client_id, client_secret, redirect_uri):
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri
        self.token = None
    
    def get_authorization_url(self, auth_base_url, scope=None, state=None):
        """Generate authorization URL"""
        oauth = OAuth2Session(
            self.client_id,
            redirect_uri=self.redirect_uri,
            scope=scope,
            state=state
        )
        authorization_url, state = oauth.authorization_url(auth_base_url)
        return authorization_url, state
    
    def fetch_token(self, token_url, authorization_response):
        """Exchange authorization code for access token"""
        oauth = OAuth2Session(
            self.client_id,
            redirect_uri=self.redirect_uri
        )
        
        token = oauth.fetch_token(
            token_url,
            authorization_response=authorization_response,
            client_secret=self.client_secret
        )
        
        self.token = token
        return token
    
    def refresh_token(self, token_url, refresh_token):
        """Refresh expired access token"""
        data = {
            'grant_type': 'refresh_token',
            'refresh_token': refresh_token,
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }
        
        response = requests.post(token_url, data=data)
        response.raise_for_status()
        
        self.token = response.json()
        return self.token

# Authorization Code Flow (most secure for web apps)
class GitHubOAuth:
    """GitHub OAuth implementation"""
    
    def __init__(self, client_id, client_secret):
        self.client_id = client_id
        self.client_secret = client_secret
        self.authorization_base_url = 'https://github.com/login/oauth/authorize'
        self.token_url = 'https://github.com/login/oauth/access_token'
        self.redirect_uri = 'http://localhost:8080/callback'
        self.token = None
    
    def authorize(self):
        """Complete OAuth flow"""
        # Step 1: Get authorization URL
        oauth = OAuth2Session(self.client_id, redirect_uri=self.redirect_uri)
        authorization_url, state = oauth.authorization_url(
            self.authorization_base_url,
            scope=['repo', 'user']
        )
        
        print(f'Please visit this URL to authorize: {authorization_url}')
        webbrowser.open(authorization_url)
        
        # Step 2: Get authorization code from callback
        authorization_code = self._get_authorization_code()
        
        # Step 3: Exchange code for token
        self._fetch_token(authorization_code)
    
    def _get_authorization_code(self):
        """Start local server to receive callback"""
        code = {'value': None}
        
        class CallbackHandler(BaseHTTPRequestHandler):
            def do_GET(self):
                query = urlparse(self.path).query
                params = parse_qs(query)
                code['value'] = params.get('code', [None])[0]
                
                self.send_response(200)
                self.send_header('Content-type', 'text/html')
                self.end_headers()
                self.wfile.write(b'

Authorization successful!

') def log_message(self, format, *args): pass # Suppress log messages server = HTTPServer(('localhost', 8080), CallbackHandler) server.handle_request() return code['value'] def _fetch_token(self, code): """Exchange authorization code for access token""" data = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'code': code } headers = {'Accept': 'application/json'} response = requests.post(self.token_url, data=data, headers=headers) response.raise_for_status() self.token = response.json()['access_token'] return self.token def get_user(self): """Get authenticated user information""" headers = {'Authorization': f'token {self.token}'} response = requests.get('https://api.github.com/user', headers=headers) response.raise_for_status() return response.json() # Client Credentials Flow (for server-to-server) class SpotifyClient: """Spotify API client credentials flow""" def __init__(self, client_id, client_secret): self.client_id = client_id self.client_secret = client_secret self.token_url = 'https://accounts.spotify.com/api/token' self.base_url = 'https://api.spotify.com/v1' self.token = None def authenticate(self): """Get access token using client credentials""" auth = (self.client_id, self.client_secret) data = {'grant_type': 'client_credentials'} response = requests.post(self.token_url, auth=auth, data=data) response.raise_for_status() self.token = response.json()['access_token'] return self.token def search(self, query, type='track', limit=10): """Search Spotify catalog""" if not self.token: self.authenticate() headers = {'Authorization': f'Bearer {self.token}'} params = { 'q': query, 'type': type, 'limit': limit } response = requests.get( f'{self.base_url}/search', headers=headers, params=params ) # Handle token expiration if response.status_code == 401: self.authenticate() headers = {'Authorization': f'Bearer {self.token}'} response = requests.get( f'{self.base_url}/search', headers=headers, params=params ) response.raise_for_status() return response.json()

JWT (JSON Web Tokens)

import jwt
import time
from datetime import datetime, timedelta
import requests

class JWTAuth:
    """JWT authentication handler"""
    
    def __init__(self, secret_key, algorithm='HS256'):
        self.secret_key = secret_key
        self.algorithm = algorithm
    
    def create_token(self, payload, expires_in=3600):
        """Create JWT token"""
        # Add standard claims
        payload['iat'] = datetime.utcnow()
        payload['exp'] = datetime.utcnow() + timedelta(seconds=expires_in)
        
        # Generate token
        token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
        return token
    
    def decode_token(self, token):
        """Decode and verify JWT token"""
        try:
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm]
            )
            return payload
        except jwt.ExpiredSignatureError:
            raise Exception("Token has expired")
        except jwt.InvalidTokenError as e:
            raise Exception(f"Invalid token: {e}")
    
    def is_token_valid(self, token):
        """Check if token is valid"""
        try:
            self.decode_token(token)
            return True
        except:
            return False

# Example: Auth0 JWT authentication
class Auth0Client:
    """Auth0 JWT authentication client"""
    
    def __init__(self, domain, client_id, client_secret, audience):
        self.domain = domain
        self.client_id = client_id
        self.client_secret = client_secret
        self.audience = audience
        self.token = None
        self.token_expires_at = 0
    
    def get_token(self):
        """Get access token from Auth0"""
        # Check if current token is still valid
        if self.token and time.time() < self.token_expires_at:
            return self.token
        
        # Request new token
        url = f'https://{self.domain}/oauth/token'
        data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'audience': self.audience,
            'grant_type': 'client_credentials'
        }
        
        response = requests.post(url, json=data)
        response.raise_for_status()
        
        token_data = response.json()
        self.token = token_data['access_token']
        self.token_expires_at = time.time() + token_data['expires_in'] - 60
        
        return self.token
    
    def make_authenticated_request(self, url, method='GET', **kwargs):
        """Make authenticated API request"""
        token = self.get_token()
        
        headers = kwargs.get('headers', {})
        headers['Authorization'] = f'Bearer {token}'
        kwargs['headers'] = headers
        
        response = requests.request(method, url, **kwargs)
        
        # Retry with new token if unauthorized
        if response.status_code == 401:
            self.token = None
            token = self.get_token()
            headers['Authorization'] = f'Bearer {token}'
            response = requests.request(method, url, **kwargs)
        
        return response

# Example: Firebase JWT authentication
class FirebaseAuth:
    """Firebase authentication using JWT"""
    
    def __init__(self, service_account_key):
        self.service_account = service_account_key
        self.token = None
        self.token_expires_at = 0
    
    def _create_custom_token(self):
        """Create custom Firebase token"""
        import google.auth
        from google.oauth2 import service_account
        
        credentials = service_account.Credentials.from_service_account_info(
            self.service_account,
            scopes=['https://www.googleapis.com/auth/firebase']
        )
        
        # Create JWT claims
        now = time.time()
        payload = {
            'iss': self.service_account['client_email'],
            'sub': self.service_account['client_email'],
            'aud': 'https://identitytoolkit.googleapis.com/google.identity.identitytoolkit.v1.IdentityToolkit',
            'iat': now,
            'exp': now + 3600,
            'uid': 'service-account'
        }
        
        # Sign token
        token = jwt.encode(
            payload,
            self.service_account['private_key'],
            algorithm='RS256'
        )
        
        return token

HMAC Authentication

import hmac
import hashlib
import time
import requests
from urllib.parse import urlencode

class HMACAuth:
    """HMAC-based authentication"""
    
    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret
    
    def generate_signature(self, method, url, params=None, body=None):
        """Generate HMAC signature"""
        # Create string to sign
        timestamp = str(int(time.time()))
        
        if params:
            query_string = urlencode(sorted(params.items()))
        else:
            query_string = ''
        
        if body:
            body_string = json.dumps(body, separators=(',', ':'))
        else:
            body_string = ''
        
        # Construct message
        message = f"{method}\n{url}\n{query_string}\n{body_string}\n{timestamp}"
        
        # Generate signature
        signature = hmac.new(
            self.api_secret.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()
        
        return signature, timestamp
    
    def make_request(self, method, url, params=None, json_data=None):
        """Make authenticated request with HMAC"""
        signature, timestamp = self.generate_signature(
            method, url, params, json_data
        )
        
        headers = {
            'X-API-Key': self.api_key,
            'X-Signature': signature,
            'X-Timestamp': timestamp
        }
        
        response = requests.request(
            method, url,
            params=params,
            json=json_data,
            headers=headers
        )
        
        return response

# Example: AWS Signature Version 4
class AWSSignatureV4:
    """AWS Signature Version 4 authentication"""
    
    def __init__(self, access_key, secret_key, region, service):
        self.access_key = access_key
        self.secret_key = secret_key
        self.region = region
        self.service = service
    
    def sign_request(self, request):
        """Sign AWS API request"""
        from botocore.auth import SigV4Auth
        from botocore.awsrequest import AWSRequest
        import boto3
        
        # Create credentials
        credentials = boto3.Session(
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key
        ).get_credentials()
        
        # Create AWS request
        aws_request = AWSRequest(
            method=request.method,
            url=request.url,
            data=request.body,
            headers=request.headers
        )
        
        # Sign request
        SigV4Auth(credentials, self.service, self.region).add_auth(aws_request)
        
        # Update original request headers
        request.headers.update(dict(aws_request.headers))
        
        return request

# Example: Binance API HMAC authentication
class BinanceClient:
    """Binance API client with HMAC authentication"""
    
    def __init__(self, api_key, api_secret):
        self.api_key = api_key
        self.api_secret = api_secret
        self.base_url = 'https://api.binance.com'
    
    def _generate_signature(self, params):
        """Generate Binance API signature"""
        query_string = urlencode(params)
        signature = hmac.new(
            self.api_secret.encode(),
            query_string.encode(),
            hashlib.sha256
        ).hexdigest()
        return signature
    
    def get_account(self):
        """Get account information"""
        endpoint = '/api/v3/account'
        
        params = {
            'timestamp': int(time.time() * 1000),
            'recvWindow': 5000
        }
        
        # Add signature
        params['signature'] = self._generate_signature(params)
        
        headers = {'X-MBX-APIKEY': self.api_key}
        
        response = requests.get(
            f'{self.base_url}{endpoint}',
            params=params,
            headers=headers
        )
        response.raise_for_status()
        
        return response.json()

Multi-Factor Authentication

import pyotp
import qrcode
from io import BytesIO
import base64

class MFAManager:
    """Multi-factor authentication manager"""
    
    def __init__(self, issuer_name='MyApp'):
        self.issuer_name = issuer_name
    
    def generate_secret(self):
        """Generate new TOTP secret"""
        return pyotp.random_base32()
    
    def generate_qr_code(self, email, secret):
        """Generate QR code for authenticator app"""
        totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=email,
            issuer_name=self.issuer_name
        )
        
        # Generate QR code
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(totp_uri)
        qr.make(fit=True)
        
        img = qr.make_image(fill_color="black", back_color="white")
        
        # Convert to base64 for embedding
        buffer = BytesIO()
        img.save(buffer, format='PNG')
        img_str = base64.b64encode(buffer.getvalue()).decode()
        
        return f"data:image/png;base64,{img_str}"
    
    def verify_token(self, secret, token, window=1):
        """Verify TOTP token"""
        totp = pyotp.TOTP(secret)
        return totp.verify(token, valid_window=window)
    
    def generate_backup_codes(self, count=10):
        """Generate backup codes"""
        import secrets
        codes = []
        for _ in range(count):
            code = secrets.token_hex(4).upper()
            codes.append(f"{code[:4]}-{code[4:]}")
        return codes

# SMS-based 2FA
class SMS2FA:
    """SMS two-factor authentication"""
    
    def __init__(self, twilio_sid, twilio_token, from_number):
        from twilio.rest import Client
        self.client = Client(twilio_sid, twilio_token)
        self.from_number = from_number
        self.verification_codes = {}
    
    def send_verification(self, phone_number):
        """Send verification code via SMS"""
        import random
        
        # Generate 6-digit code
        code = str(random.randint(100000, 999999))
        
        # Store code (in production, use Redis or database)
        self.verification_codes[phone_number] = {
            'code': code,
            'timestamp': time.time()
        }
        
        # Send SMS
        message = self.client.messages.create(
            body=f"Your verification code is: {code}",
            from_=self.from_number,
            to=phone_number
        )
        
        return message.sid
    
    def verify_code(self, phone_number, code, expiry_seconds=300):
        """Verify SMS code"""
        if phone_number not in self.verification_codes:
            return False
        
        stored = self.verification_codes[phone_number]
        
        # Check expiry
        if time.time() - stored['timestamp'] > expiry_seconds:
            del self.verification_codes[phone_number]
            return False
        
        # Check code
        if stored['code'] == code:
            del self.verification_codes[phone_number]
            return True
        
        return False

Token Management

import pickle
import json
from datetime import datetime, timedelta
import sqlite3

class TokenManager:
    """Manage and persist authentication tokens"""
    
    def __init__(self, storage_path='tokens.db'):
        self.storage_path = storage_path
        self._init_db()
    
    def _init_db(self):
        """Initialize token database"""
        conn = sqlite3.connect(self.storage_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS tokens (
                service TEXT PRIMARY KEY,
                token TEXT NOT NULL,
                refresh_token TEXT,
                expires_at TIMESTAMP,
                metadata TEXT
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def save_token(self, service, token, refresh_token=None, expires_in=None, metadata=None):
        """Save token to database"""
        conn = sqlite3.connect(self.storage_path)
        cursor = conn.cursor()
        
        expires_at = None
        if expires_in:
            expires_at = datetime.now() + timedelta(seconds=expires_in)
        
        metadata_json = json.dumps(metadata) if metadata else None
        
        cursor.execute('''
            INSERT OR REPLACE INTO tokens 
            (service, token, refresh_token, expires_at, metadata)
            VALUES (?, ?, ?, ?, ?)
        ''', (service, token, refresh_token, expires_at, metadata_json))
        
        conn.commit()
        conn.close()
    
    def get_token(self, service):
        """Get token from database"""
        conn = sqlite3.connect(self.storage_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            SELECT token, refresh_token, expires_at, metadata
            FROM tokens
            WHERE service = ?
        ''', (service,))
        
        row = cursor.fetchone()
        conn.close()
        
        if not row:
            return None
        
        token, refresh_token, expires_at, metadata_json = row
        
        # Check expiry
        if expires_at:
            expires = datetime.fromisoformat(expires_at)
            if expires < datetime.now():
                return None  # Token expired
        
        metadata = json.loads(metadata_json) if metadata_json else None
        
        return {
            'token': token,
            'refresh_token': refresh_token,
            'expires_at': expires_at,
            'metadata': metadata
        }
    
    def delete_token(self, service):
        """Delete token from database"""
        conn = sqlite3.connect(self.storage_path)
        cursor = conn.cursor()
        
        cursor.execute('DELETE FROM tokens WHERE service = ?', (service,))
        
        conn.commit()
        conn.close()

# Automatic token refresh
class AutoRefreshClient:
    """Client with automatic token refresh"""
    
    def __init__(self, client_id, client_secret, token_manager):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_manager = token_manager
        self.service_name = 'my_service'
    
    def _get_valid_token(self):
        """Get valid token, refreshing if necessary"""
        token_data = self.token_manager.get_token(self.service_name)
        
        if not token_data:
            # No token, need to authenticate
            return self._authenticate()
        
        # Check if token needs refresh
        if token_data['expires_at']:
            expires = datetime.fromisoformat(token_data['expires_at'])
            if expires - datetime.now() < timedelta(minutes=5):
                # Token expires soon, refresh it
                return self._refresh_token(token_data['refresh_token'])
        
        return token_data['token']
    
    def _authenticate(self):
        """Perform initial authentication"""
        # Implementation depends on auth method
        pass
    
    def _refresh_token(self, refresh_token):
        """Refresh access token"""
        response = requests.post(
            'https://api.example.com/token',
            data={
                'grant_type': 'refresh_token',
                'refresh_token': refresh_token,
                'client_id': self.client_id,
                'client_secret': self.client_secret
            }
        )
        response.raise_for_status()
        
        token_data = response.json()
        
        # Save new token
        self.token_manager.save_token(
            self.service_name,
            token_data['access_token'],
            token_data.get('refresh_token'),
            token_data.get('expires_in')
        )
        
        return token_data['access_token']
    
    def make_request(self, url, **kwargs):
        """Make authenticated request with automatic refresh"""
        token = self._get_valid_token()
        
        headers = kwargs.get('headers', {})
        headers['Authorization'] = f'Bearer {token}'
        kwargs['headers'] = headers
        
        response = requests.get(url, **kwargs)
        
        # Retry with new token if unauthorized
        if response.status_code == 401:
            token = self._refresh_token(
                self.token_manager.get_token(self.service_name)['refresh_token']
            )
            headers['Authorization'] = f'Bearer {token}'
            response = requests.get(url, **kwargs)
        
        return response

Security Best Practices

# Security best practices for API authentication

import hashlib
import secrets
from cryptography.fernet import Fernet

class SecureAPIClient:
    """API client with security best practices"""
    
    def __init__(self):
        self.session = requests.Session()
        self._configure_security()
    
    def _configure_security(self):
        """Configure security settings"""
        # 1. Use HTTPS only
        self.session.mount('http://', HTTPAdapter(max_retries=0))
        
        # 2. Verify SSL certificates
        self.session.verify = True
        
        # 3. Set secure headers
        self.session.headers.update({
            'User-Agent': 'SecureAPIClient/1.0',
            'Accept': 'application/json',
            'X-Requested-With': 'XMLHttpRequest'
        })
        
        # 4. Enable certificate pinning (optional)
        # self.session.mount('https://', PinnedHTTPAdapter())
    
    def validate_response(self, response):
        """Validate API response"""
        # Check content type
        content_type = response.headers.get('Content-Type', '')
        if 'application/json' not in content_type:
            raise ValueError(f"Unexpected content type: {content_type}")
        
        # Check for security headers
        security_headers = [
            'X-Content-Type-Options',
            'X-Frame-Options',
            'Strict-Transport-Security'
        ]
        
        for header in security_headers:
            if header not in response.headers:
                print(f"Warning: Missing security header {header}")
        
        return True

# Secure credential storage
class CredentialVault:
    """Secure credential storage"""
    
    def __init__(self, master_password):
        # Derive encryption key from master password
        salt = b'stable_salt'  # In production, use random salt
        kdf = hashlib.pbkdf2_hmac('sha256', master_password.encode(), salt, 100000)
        self.cipher = Fernet(base64.urlsafe_b64encode(kdf[:32]))
        self.credentials = {}
    
    def store_credential(self, service, credential):
        """Store encrypted credential"""
        encrypted = self.cipher.encrypt(credential.encode())
        self.credentials[service] = encrypted
    
    def get_credential(self, service):
        """Retrieve and decrypt credential"""
        if service not in self.credentials:
            return None
        
        encrypted = self.credentials[service]
        decrypted = self.cipher.decrypt(encrypted)
        return decrypted.decode()

# API key rotation
class APIKeyRotation:
    """Manage API key rotation"""
    
    def __init__(self, api_service):
        self.api_service = api_service
        self.current_key = None
        self.previous_key = None
        self.rotation_interval = timedelta(days=30)
        self.last_rotation = None
    
    def should_rotate(self):
        """Check if key should be rotated"""
        if not self.last_rotation:
            return True
        
        return datetime.now() - self.last_rotation > self.rotation_interval
    
    def rotate_key(self):
        """Rotate API key"""
        # Generate new key
        new_key = secrets.token_urlsafe(32)
        
        # Keep previous key for grace period
        self.previous_key = self.current_key
        self.current_key = new_key
        self.last_rotation = datetime.now()
        
        # Update key with API service
        self.api_service.update_api_key(new_key)
        
        return new_key
    
    def get_active_keys(self):
        """Get list of active keys"""
        keys = [self.current_key]
        if self.previous_key:
            keys.append(self.previous_key)
        return keys

# Rate limiting and retry with authentication
class AuthenticatedRateLimiter:
    """Rate limiter with authentication awareness"""
    
    def __init__(self, calls_per_second=1):
        self.calls_per_second = calls_per_second
        self.last_call = {}
        self.retry_after = {}
    
    def wait_if_needed(self, endpoint, auth_level='standard'):
        """Wait if rate limit requires"""
        # Different limits for different auth levels
        limits = {
            'anonymous': 0.1,  # 1 call per 10 seconds
            'standard': 1,      # 1 call per second
            'premium': 10       # 10 calls per second
        }
        
        limit = limits.get(auth_level, 1)
        delay = 1.0 / limit
        
        now = time.time()
        
        # Check retry-after header
        if endpoint in self.retry_after:
            retry_time = self.retry_after[endpoint]
            if now < retry_time:
                time.sleep(retry_time - now)
                del self.retry_after[endpoint]
                return
        
        # Check normal rate limit
        if endpoint in self.last_call:
            elapsed = now - self.last_call[endpoint]
            if elapsed < delay:
                time.sleep(delay - elapsed)
        
        self.last_call[endpoint] = time.time()
    
    def handle_rate_limit_response(self, response, endpoint):
        """Handle rate limit response"""
        if response.status_code == 429:
            # Check Retry-After header
            retry_after = response.headers.get('Retry-After')
            if retry_after:
                if retry_after.isdigit():
                    self.retry_after[endpoint] = time.time() + int(retry_after)
                else:
                    # Parse HTTP date
                    retry_time = datetime.strptime(retry_after, '%a, %d %b %Y %H:%M:%S GMT')
                    self.retry_after[endpoint] = retry_time.timestamp()
            else:
                # Default backoff
                self.retry_after[endpoint] = time.time() + 60

Practice Exercises

Exercise 1: Multi-Service API Client

Build a unified API client that:

  1. Supports multiple authentication methods
  2. Automatically detects auth type from configuration
  3. Handles token refresh automatically
  4. Implements retry logic with backoff
  5. Securely stores credentials

Exercise 2: OAuth 2.0 Flow Implementation

Implement a complete OAuth 2.0 flow:

  1. Authorization code flow with PKCE
  2. Token refresh handling
  3. Secure token storage
  4. Session management
  5. Logout functionality

Exercise 3: API Security Auditor

Create a security audit tool that:

  1. Tests authentication endpoints
  2. Checks for security headers
  3. Validates SSL certificates
  4. Tests rate limiting
  5. Generates security report

Key Takeaways

Further Resources