API Authentication
Secure Your API Connections! 🔐
API authentication is the gateway to accessing protected resources and services. From simple API keys to complex OAuth flows, understanding authentication mechanisms is crucial for working with modern APIs. Master these patterns to securely integrate with any service, protect your credentials, and build robust data pipelines.
Authentication Methods Overview
graph TD
A[API Authentication] --> B[API Keys]
A --> C[Basic Auth]
A --> D[Bearer Tokens]
A --> E[OAuth 2.0]
A --> F[JWT]
A --> G[HMAC]
B --> H[Header/Query/Body]
C --> I[Username/Password]
D --> J[Access Tokens]
E --> K[Authorization Code]
E --> L[Client Credentials]
E --> M[Implicit]
F --> N[Self-contained Tokens]
G --> O[Signature-based]
Environment Setup and Security
Secure Credential Management
# NEVER hardcode credentials in your code!
import os
from dotenv import load_dotenv
import json
import keyring
from cryptography.fernet import Fernet
import base64
# Method 1: Environment Variables (.env file)
"""
Create a .env file:
API_KEY=your-secret-api-key
CLIENT_ID=your-client-id
CLIENT_SECRET=your-client-secret
Add .env to .gitignore!
"""
# Load variables from the .env file described above into the environment
load_dotenv()
# Read each credential from the environment; an unset variable yields None
api_key, client_id, client_secret = (
    os.environ.get(variable)
    for variable in ('API_KEY', 'CLIENT_ID', 'CLIENT_SECRET')
)
# Method 2: Configuration Files
def load_config(config_file='config.json'):
    """Parse a JSON configuration file and return its contents.

    Args:
        config_file: Path of the JSON file to read.

    Returns:
        The decoded JSON value, or an empty dict (after printing a notice)
        when the file does not exist.
    """
    try:
        handle = open(config_file, 'r')
    except FileNotFoundError:
        print(f"Config file {config_file} not found")
        return {}
    with handle:
        return json.load(handle)
# Method 3: System Keyring
def store_credential(service, username, password):
    """Save *password* for (*service*, *username*) via the keyring library."""
    keyring.set_password(
        service,
        username,
        password,
    )
def get_credential(service, username):
    """Return whatever the keyring library holds for (*service*, *username*)."""
    stored_secret = keyring.get_password(service, username)
    return stored_secret
# Example usage
# store_credential('my_api', 'api_key', 'secret123')
# api_key = get_credential('my_api', 'api_key')
# Method 4: Encrypted Configuration
class EncryptedConfig:
    """Encrypt and decrypt configuration dictionaries with Fernet.

    Attributes:
        key: The Fernet key used by this instance. When no key is passed to
            the constructor a fresh one is generated; read it from here and
            persist it, otherwise data encrypted by this instance cannot be
            decrypted by any later instance.
        cipher: The ``Fernet`` object built from ``key``.
    """

    def __init__(self, key=None):
        """Create the cipher from *key*, generating a key when none is given."""
        # Keep the key on the instance: previously an auto-generated key was
        # thrown away, making everything encrypted with it unrecoverable.
        self.key = key if key else Fernet.generate_key()
        self.cipher = Fernet(self.key)

    def encrypt_config(self, config_dict):
        """Serialize *config_dict* to JSON, encrypt it, and return a str.

        The Fernet token is additionally base64-encoded; `decrypt_config`
        reverses both layers.
        """
        json_string = json.dumps(config_dict)
        encrypted = self.cipher.encrypt(json_string.encode())
        return base64.b64encode(encrypted).decode()

    def decrypt_config(self, encrypted_string):
        """Reverse `encrypt_config` and return the decoded JSON value."""
        encrypted = base64.b64decode(encrypted_string.encode())
        decrypted = self.cipher.decrypt(encrypted)
        return json.loads(decrypted.decode())
# Best practice: Configuration hierarchy
class ConfigManager:
    """Layered configuration: defaults, then environment, then a JSON file.

    Later layers overwrite earlier ones, so a value in the config file wins
    over an environment variable, which wins over a built-in default.
    Environment variables are stored under their lower-cased names
    (``API_KEY`` becomes ``api_key``).
    """

    def __init__(self, config_file='config.json'):
        """Build the configuration.

        Args:
            config_file: Path of the optional JSON file applied as the last
                layer. Defaults to ``config.json`` in the working directory.
        """
        self.config_file = config_file
        self.config = {}
        self._load_defaults()
        self._load_environment()
        self._load_config_file()

    def _load_defaults(self):
        """Reset the configuration to the built-in defaults."""
        self.config = {
            'api_base_url': 'https://api.example.com',
            'timeout': 30,
            'retry_count': 3
        }

    def _load_environment(self):
        """Overlay credentials found in environment variables."""
        for key in ['API_KEY', 'CLIENT_ID', 'CLIENT_SECRET']:
            value = os.getenv(key)
            # Unset and empty variables are both ignored
            if value:
                self.config[key.lower()] = value

    def _load_config_file(self):
        """Overlay values from the JSON config file, if it exists."""
        # Open directly rather than checking os.path.exists first, so the
        # file cannot disappear between the check and the read.
        try:
            with open(self.config_file, 'r') as f:
                file_config = json.load(f)
        except FileNotFoundError:
            return
        self.config.update(file_config)

    def get(self, key, default=None):
        """Return the value stored under *key*, or *default* if absent."""
        return self.config.get(key, default)
API Key Authentication
Different API Key Patterns
import requests
from urllib.parse import urlencode
class APIKeyAuth:
    """Attach an API key to a request via header, query string, or body."""

    def __init__(self, api_key, key_name='api_key'):
        """Remember the key and the field/header name it is sent under."""
        self.api_key = api_key
        self.key_name = key_name

    def auth_via_header(self, session=None):
        """Put the API key in a header.

        Returns:
            *session* with the header added when a session is given,
            otherwise a new headers dict.
        """
        headers = {self.key_name: self.api_key}
        if session:
            session.headers.update(headers)
            return session
        return headers

    def auth_via_query(self, url, params=None):
        """Return *url* with *params* and the API key appended as a query.

        Args:
            url: Base URL; it may already contain a query string.
            params: Optional extra query parameters. The dict is not modified.

        Returns:
            The complete URL string.
        """
        # Work on a copy so the caller's dict never gains the secret key
        merged = dict(params) if params else {}
        merged[self.key_name] = self.api_key
        # Extend an existing query string instead of starting a second one;
        # previously the caller's params were dropped in that case.
        separator = '&' if '?' in url else '?'
        return f"{url}{separator}{urlencode(merged)}"

    def auth_via_body(self, data=None):
        """Add the API key to the request body dict and return it.

        When *data* is supplied it is updated in place and returned.
        """
        if data is None:
            data = {}
        data[self.key_name] = self.api_key
        return data
# Example implementations for popular APIs
class OpenAIClient:
    """Small OpenAI client that sends the API key as a bearer token."""

    def __init__(self, api_key):
        """Create a session whose default headers carry the bearer token."""
        self.api_key = api_key
        self.base_url = 'https://api.openai.com/v1'
        default_headers = {
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json',
        }
        self.session = requests.Session()
        self.session.headers.update(default_headers)

    def complete(self, prompt, model='gpt-3.5-turbo'):
        """POST a single-user-message chat completion and return the JSON.

        Raises whatever ``raise_for_status`` raises on an error status.
        """
        endpoint = f'{self.base_url}/chat/completions'
        request_body = {
            'model': model,
            'messages': [{'role': 'user', 'content': prompt}],
        }
        reply = self.session.post(endpoint, json=request_body)
        reply.raise_for_status()
        return reply.json()
class GoogleMapsClient:
    """Google Maps client that passes the API key as a query parameter."""

    def __init__(self, api_key):
        """Store the key and the API base URL."""
        self.base_url = 'https://maps.googleapis.com/maps/api'
        self.api_key = api_key

    def geocode(self, address):
        """Request the geocode endpoint for *address* and return the JSON.

        Raises whatever ``raise_for_status`` raises on an error status.
        """
        # The key travels in the query string under the name "key"
        query = {'address': address, 'key': self.api_key}
        reply = requests.get(f'{self.base_url}/geocode/json', params=query)
        reply.raise_for_status()
        return reply.json()
class RapidAPIClient:
    """RapidAPI client that authenticates with key and host headers."""

    def __init__(self, api_key, api_host):
        """Create a session that sends both RapidAPI headers by default."""
        self.api_key = api_key
        self.api_host = api_host
        rapidapi_headers = {
            'X-RapidAPI-Key': api_key,
            'X-RapidAPI-Host': api_host,
        }
        self.session = requests.Session()
        self.session.headers.update(rapidapi_headers)

    def request(self, endpoint, method='GET', **kwargs):
        """Call *endpoint* on the configured host and return the JSON body.

        Extra keyword arguments are forwarded to ``Session.request``.
        Raises whatever ``raise_for_status`` raises on an error status.
        """
        target = f"https://{self.api_host}/{endpoint}"
        reply = self.session.request(method, target, **kwargs)
        reply.raise_for_status()
        return reply.json()
Basic Authentication
import requests
from requests.auth import HTTPBasicAuth
import base64
class BasicAuthClient:
    """HTTP Basic Authentication via a requests session."""

    def __init__(self, username, password):
        """Store the credentials and attach them to a new session."""
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.session.auth = (username, password)

    def manual_header(self):
        """Build the ``Authorization: Basic ...`` header by hand."""
        raw = f"{self.username}:{self.password}".encode()
        token = base64.b64encode(raw).decode()
        return {'Authorization': f'Basic {token}'}

    def test_auth(self, url):
        """Return True when a GET to *url* answers with status 200."""
        status = self.session.get(url).status_code
        return status == 200
# Example: JIRA API with Basic Auth
class JIRAClient:
    """JIRA REST client that uses Basic Authentication."""

    def __init__(self, domain, email, api_token):
        """Build the base URL for *domain* and an authenticated session."""
        self.domain = domain
        self.base_url = f'https://{domain}.atlassian.net/rest/api/3'
        json_headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
        }
        self.session = requests.Session()
        # Basic auth here is the account email plus an API token
        self.session.auth = (email, api_token)
        self.session.headers.update(json_headers)

    def get_issue(self, issue_key):
        """Fetch one issue by key and return the decoded JSON."""
        reply = self.session.get(f'{self.base_url}/issue/{issue_key}')
        reply.raise_for_status()
        return reply.json()

    def create_issue(self, project_key, summary, description, issue_type='Task'):
        """Create an issue in *project_key* and return the decoded JSON."""
        fields = {
            'project': {'key': project_key},
            'summary': summary,
            'description': description,
            'issuetype': {'name': issue_type},
        }
        reply = self.session.post(
            f'{self.base_url}/issue',
            json={'fields': fields},
        )
        reply.raise_for_status()
        return reply.json()
# Digest Authentication (less common)
from requests.auth import HTTPDigestAuth
class DigestAuthClient:
    """HTTP Digest Authentication helper."""

    def __init__(self, username, password):
        """Prepare a digest-auth handler for the given credentials."""
        self.auth = HTTPDigestAuth(username, password)

    def request(self, url):
        """GET *url* with digest auth and return the response object as-is."""
        return requests.get(url, auth=self.auth)
OAuth 2.0 Authentication
OAuth 2.0 Flows
import requests
from requests_oauthlib import OAuth2Session
import webbrowser
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import threading
class OAuth2Client:
    """Generic OAuth 2.0 client built on ``OAuth2Session``."""

    def __init__(self, client_id, client_secret, redirect_uri):
        """Store the client registration details; no token is held yet."""
        self.token = None
        self.client_id = client_id
        self.client_secret = client_secret
        self.redirect_uri = redirect_uri

    def get_authorization_url(self, auth_base_url, scope=None, state=None):
        """Return ``(authorization_url, state)`` for the given endpoint."""
        session = OAuth2Session(
            self.client_id,
            redirect_uri=self.redirect_uri,
            scope=scope,
            state=state,
        )
        url, issued_state = session.authorization_url(auth_base_url)
        return url, issued_state

    def fetch_token(self, token_url, authorization_response):
        """Exchange the authorization response for a token and remember it."""
        session = OAuth2Session(self.client_id, redirect_uri=self.redirect_uri)
        self.token = session.fetch_token(
            token_url,
            authorization_response=authorization_response,
            client_secret=self.client_secret,
        )
        return self.token

    def refresh_token(self, token_url, refresh_token):
        """POST a refresh_token grant and store the JSON reply as the token.

        Raises whatever ``raise_for_status`` raises on an error status.
        """
        form = {
            'grant_type': 'refresh_token',
            'refresh_token': refresh_token,
            'client_id': self.client_id,
            'client_secret': self.client_secret,
        }
        reply = requests.post(token_url, data=form)
        reply.raise_for_status()
        self.token = reply.json()
        return self.token
# Authorization Code Flow (most secure for web apps)
class GitHubOAuth:
    """GitHub OAuth authorization-code flow for a locally run program.

    The flow opens the authorization page in a browser, waits for a single
    callback request on ``localhost:8080``, and exchanges the received code
    for an access token stored in ``self.token``.
    """

    def __init__(self, client_id, client_secret):
        """Store the client registration and the fixed GitHub endpoints."""
        self.client_id = client_id
        self.client_secret = client_secret
        self.authorization_base_url = 'https://github.com/login/oauth/authorize'
        self.token_url = 'https://github.com/login/oauth/access_token'
        self.redirect_uri = 'http://localhost:8080/callback'
        self.token = None

    def authorize(self):
        """Run the complete flow; on success ``self.token`` is populated.

        Raises:
            ValueError: If the callback's ``state`` does not match the value
                issued with the authorization URL.
        """
        # Step 1: Get authorization URL
        oauth = OAuth2Session(self.client_id, redirect_uri=self.redirect_uri)
        authorization_url, state = oauth.authorization_url(
            self.authorization_base_url,
            scope=['repo', 'user']
        )
        print(f'Please visit this URL to authorize: {authorization_url}')
        webbrowser.open(authorization_url)
        # Step 2: Get authorization code from callback, checking the state
        authorization_code = self._get_authorization_code(expected_state=state)
        # Step 3: Exchange code for token
        self._fetch_token(authorization_code)

    def _get_authorization_code(self, expected_state=None):
        """Serve one request on localhost:8080 and return its ``code``.

        Args:
            expected_state: When given, the callback's ``state`` query
                parameter must equal it.

        Returns:
            The ``code`` query parameter, or None if the request had none.

        Raises:
            ValueError: On a state mismatch.
        """
        received = {'code': None, 'state': None}

        class CallbackHandler(BaseHTTPRequestHandler):
            def do_GET(self):
                params = parse_qs(urlparse(self.path).query)
                received['code'] = params.get('code', [None])[0]
                received['state'] = params.get('state', [None])[0]
                self.send_response(200)
                self.send_header('Content-type', 'text/html')
                self.end_headers()
                self.wfile.write(b'Authorization successful!\n')

            def log_message(self, format, *args):
                pass  # Suppress log messages

        server = HTTPServer(('localhost', 8080), CallbackHandler)
        try:
            server.handle_request()
        finally:
            # Release the listening socket so the port can be reused
            server.server_close()
        # A mismatched state means the callback did not originate from the
        # authorization request started here.
        if expected_state is not None and received['state'] != expected_state:
            raise ValueError('OAuth state mismatch in authorization callback')
        return received['code']

    def _fetch_token(self, code):
        """Exchange *code* for an access token and store it on the instance.

        Raises whatever ``raise_for_status`` raises on an error status, and
        KeyError when the reply carries no ``access_token``.
        """
        data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'code': code
        }
        # Ask for a JSON reply
        headers = {'Accept': 'application/json'}
        response = requests.post(self.token_url, data=data, headers=headers)
        response.raise_for_status()
        self.token = response.json()['access_token']
        return self.token

    def get_user(self):
        """Return the authenticated user's profile as decoded JSON."""
        headers = {'Authorization': f'token {self.token}'}
        response = requests.get('https://api.github.com/user', headers=headers)
        response.raise_for_status()
        return response.json()
# Client Credentials Flow (for server-to-server)
class SpotifyClient:
    """Spotify client using the OAuth client-credentials grant."""

    def __init__(self, client_id, client_secret):
        """Store the credentials and endpoints; no token is fetched yet."""
        self.token = None
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = 'https://accounts.spotify.com/api/token'
        self.base_url = 'https://api.spotify.com/v1'

    def authenticate(self):
        """Request an access token, store it, and return it."""
        reply = requests.post(
            self.token_url,
            auth=(self.client_id, self.client_secret),
            data={'grant_type': 'client_credentials'},
        )
        reply.raise_for_status()
        self.token = reply.json()['access_token']
        return self.token

    def search(self, query, type='track', limit=10):
        """Search the catalog and return the decoded JSON.

        Authenticates first when no token is held, and re-authenticates and
        retries once when the first attempt answers 401.
        """
        if not self.token:
            self.authenticate()
        endpoint = f'{self.base_url}/search'
        params = {'q': query, 'type': type, 'limit': limit}

        def send():
            # Build the header at call time so a refreshed token is used
            bearer = {'Authorization': f'Bearer {self.token}'}
            return requests.get(endpoint, headers=bearer, params=params)

        reply = send()
        if reply.status_code == 401:
            # Token was rejected: obtain a new one and try once more
            self.authenticate()
            reply = send()
        reply.raise_for_status()
        return reply.json()
JWT (JSON Web Tokens)
import jwt
import time
from datetime import datetime, timedelta
import requests
class JWTAuth:
    """Create and verify JWTs signed with a shared secret."""

    def __init__(self, secret_key, algorithm='HS256'):
        """Store the signing key and the algorithm used for both directions."""
        self.secret_key = secret_key
        self.algorithm = algorithm

    def create_token(self, payload, expires_in=3600):
        """Return a signed token carrying *payload* plus ``iat`` and ``exp``.

        Args:
            payload: Claims to embed. The dict is copied, not modified.
            expires_in: Lifetime in seconds from now.
        """
        from datetime import timezone

        # Timezone-aware "now": datetime.utcnow() is deprecated and naive
        issued_at = datetime.now(timezone.utc)
        claims = dict(payload)
        claims['iat'] = issued_at
        claims['exp'] = issued_at + timedelta(seconds=expires_in)
        return jwt.encode(claims, self.secret_key, algorithm=self.algorithm)

    def decode_token(self, token):
        """Verify *token* and return its payload.

        Raises:
            Exception: "Token has expired" or "Invalid token: ...", chained
                to the underlying jwt error.
        """
        try:
            return jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm]
            )
        except jwt.ExpiredSignatureError as e:
            raise Exception("Token has expired") from e
        except jwt.InvalidTokenError as e:
            raise Exception(f"Invalid token: {e}") from e

    def is_token_valid(self, token):
        """Return True when `decode_token` accepts *token*, else False."""
        try:
            self.decode_token(token)
        except Exception:
            # A bare except would also swallow KeyboardInterrupt/SystemExit
            return False
        return True
# Example: Auth0 JWT authentication
class Auth0Client:
    """Auth0 client-credentials client with an in-memory token cache."""

    def __init__(self, domain, client_id, client_secret, audience):
        """Store the tenant settings; the cache starts empty."""
        self.domain = domain
        self.client_id = client_id
        self.client_secret = client_secret
        self.audience = audience
        self.token = None
        self.token_expires_at = 0

    def get_token(self):
        """Return the cached token, requesting a new one when needed."""
        cache_is_fresh = bool(self.token) and time.time() < self.token_expires_at
        if cache_is_fresh:
            return self.token
        credentials = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'audience': self.audience,
            'grant_type': 'client_credentials',
        }
        reply = requests.post(f'https://{self.domain}/oauth/token', json=credentials)
        reply.raise_for_status()
        granted = reply.json()
        self.token = granted['access_token']
        # Treat the token as expired 60 seconds before its stated lifetime
        self.token_expires_at = time.time() + granted['expires_in'] - 60
        return self.token

    def make_authenticated_request(self, url, method='GET', **kwargs):
        """Send a request with a bearer token, retrying once after a 401.

        Extra keyword arguments are forwarded to ``requests.request``; the
        ``headers`` dict, if supplied, gains the Authorization entry.
        """
        headers = kwargs.setdefault('headers', {})
        headers['Authorization'] = f'Bearer {self.get_token()}'
        reply = requests.request(method, url, **kwargs)
        if reply.status_code == 401:
            # Drop the cached token so get_token requests a new one
            self.token = None
            headers['Authorization'] = f'Bearer {self.get_token()}'
            reply = requests.request(method, url, **kwargs)
        return reply
# Example: Firebase JWT authentication
class FirebaseAuth:
    """Firebase authentication helper that signs its own JWT."""

    def __init__(self, service_account_key):
        """Keep the service-account info dict; no token is held yet."""
        self.token_expires_at = 0
        self.token = None
        self.service_account = service_account_key

    def _create_custom_token(self):
        """Return an RS256 JWT signed with the service account's private key.

        The token is issued for the service-account email and lasts one hour.
        """
        import google.auth
        from google.oauth2 import service_account

        # Built from the same info dict; the object is not used further here
        credentials = service_account.Credentials.from_service_account_info(
            self.service_account,
            scopes=['https://www.googleapis.com/auth/firebase'],
        )
        issued_at = time.time()
        account_email = self.service_account['client_email']
        claims = {
            'iss': account_email,
            'sub': account_email,
            'aud': 'https://identitytoolkit.googleapis.com/google.identity.identitytoolkit.v1.IdentityToolkit',
            'iat': issued_at,
            'exp': issued_at + 3600,
            'uid': 'service-account',
        }
        return jwt.encode(
            claims,
            self.service_account['private_key'],
            algorithm='RS256',
        )
HMAC Authentication
import hmac
import hashlib
import time
import requests
from urllib.parse import urlencode
class HMACAuth:
    """Sign requests with HMAC-SHA256 over method, URL, query, body and time."""

    def __init__(self, api_key, api_secret):
        """Store the public key identifier and the signing secret."""
        self.api_key = api_key
        self.api_secret = api_secret

    @staticmethod
    def _serialize_body(body):
        """Return the canonical JSON text for *body* ('' when it is empty)."""
        return json.dumps(body, separators=(',', ':')) if body else ''

    def generate_signature(self, method, url, params=None, body=None):
        """Compute the request signature.

        The signed message is the method, URL, sorted URL-encoded query,
        compact JSON body and a Unix timestamp, joined by newlines.

        Returns:
            ``(signature_hex, timestamp_str)``.
        """
        timestamp = str(int(time.time()))
        # Sort parameters so the signature does not depend on dict order
        query_string = urlencode(sorted(params.items())) if params else ''
        body_string = self._serialize_body(body)
        message = f"{method}\n{url}\n{query_string}\n{body_string}\n{timestamp}"
        signature = hmac.new(
            self.api_secret.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()
        return signature, timestamp

    def make_request(self, method, url, params=None, json_data=None):
        """Send a signed request and return the response object.

        The body is transmitted as exactly the text that was signed. Passing
        the dict through ``json=`` would re-serialize it with different
        separators, so the bytes on the wire would not match the signature.
        """
        signature, timestamp = self.generate_signature(
            method, url, params, json_data
        )
        headers = {
            'X-API-Key': self.api_key,
            'X-Signature': signature,
            'X-Timestamp': timestamp
        }
        body_string = self._serialize_body(json_data)
        payload = None
        if body_string:
            payload = body_string.encode()
            headers['Content-Type'] = 'application/json'
        return requests.request(
            method, url,
            params=params,
            data=payload,
            headers=headers
        )
# Example: AWS Signature Version 4
class AWSSignatureV4:
    """Sign requests with AWS Signature Version 4 through botocore."""

    def __init__(self, access_key, secret_key, region, service):
        """Store the key pair and the region/service the signature targets."""
        self.service = service
        self.region = region
        self.access_key = access_key
        self.secret_key = secret_key

    def sign_request(self, request):
        """Add SigV4 headers to *request* and return the same object.

        *request* must expose ``method``, ``url``, ``body`` and ``headers``.
        """
        from botocore.auth import SigV4Auth
        from botocore.awsrequest import AWSRequest
        import boto3

        session = boto3.Session(
            aws_access_key_id=self.access_key,
            aws_secret_access_key=self.secret_key,
        )
        credentials = session.get_credentials()
        # Mirror the incoming request in the form botocore signs
        signable = AWSRequest(
            method=request.method,
            url=request.url,
            data=request.body,
            headers=request.headers,
        )
        signer = SigV4Auth(credentials, self.service, self.region)
        signer.add_auth(signable)
        # Copy the resulting headers back onto the caller's request
        request.headers.update(dict(signable.headers))
        return request
# Example: Binance API HMAC authentication
class BinanceClient:
    """Binance REST client that signs query strings with HMAC-SHA256."""

    def __init__(self, api_key, api_secret):
        """Store the key pair and the API base URL."""
        self.base_url = 'https://api.binance.com'
        self.api_key = api_key
        self.api_secret = api_secret

    def _generate_signature(self, params):
        """Return the hex HMAC-SHA256 of the URL-encoded *params*."""
        encoded_query = urlencode(params).encode()
        digest = hmac.new(self.api_secret.encode(), encoded_query, hashlib.sha256)
        return digest.hexdigest()

    def get_account(self):
        """Fetch account information and return the decoded JSON.

        Raises whatever ``raise_for_status`` raises on an error status.
        """
        query = {
            'timestamp': int(time.time() * 1000),  # milliseconds
            'recvWindow': 5000,
        }
        # The signature covers the parameters gathered so far and is then
        # sent alongside them
        query['signature'] = self._generate_signature(query)
        reply = requests.get(
            f'{self.base_url}/api/v3/account',
            params=query,
            headers={'X-MBX-APIKEY': self.api_key},
        )
        reply.raise_for_status()
        return reply.json()
Multi-Factor Authentication
import pyotp
import qrcode
from io import BytesIO
import base64
class MFAManager:
    """TOTP-based multi-factor authentication helpers."""

    def __init__(self, issuer_name='MyApp'):
        """Remember the issuer name used in provisioning URIs."""
        self.issuer_name = issuer_name

    def generate_secret(self):
        """Return a new random base32 TOTP secret."""
        return pyotp.random_base32()

    def generate_qr_code(self, email, secret):
        """Return a PNG QR code for *secret* as a ``data:`` URI string."""
        provisioning_uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=email,
            issuer_name=self.issuer_name,
        )
        code = qrcode.QRCode(version=1, box_size=10, border=5)
        code.add_data(provisioning_uri)
        code.make(fit=True)
        image = code.make_image(fill_color="black", back_color="white")
        # Render to memory and base64-encode so the image can be embedded
        png = BytesIO()
        image.save(png, format='PNG')
        encoded = base64.b64encode(png.getvalue()).decode()
        return f"data:image/png;base64,{encoded}"

    def verify_token(self, secret, token, window=1):
        """Check *token* against *secret*, passing *window* as valid_window."""
        return pyotp.TOTP(secret).verify(token, valid_window=window)

    def generate_backup_codes(self, count=10):
        """Return *count* random codes shaped like ``XXXX-XXXX`` (hex)."""
        import secrets

        raw_codes = (secrets.token_hex(4).upper() for _ in range(count))
        return [f"{raw[:4]}-{raw[4:]}" for raw in raw_codes]
# SMS-based 2FA
class SMS2FA:
    """SMS two-factor authentication backed by Twilio.

    Pending codes are kept in ``self.verification_codes``, an in-memory dict
    keyed by phone number (in production, use Redis or a database).
    """

    def __init__(self, twilio_sid, twilio_token, from_number):
        """Create the Twilio client and an empty store of pending codes."""
        from twilio.rest import Client
        self.client = Client(twilio_sid, twilio_token)
        self.from_number = from_number
        self.verification_codes = {}

    def send_verification(self, phone_number):
        """Generate a 6-digit code, store it, send it by SMS.

        Returns:
            The ``sid`` of the created message.
        """
        import secrets

        # Use the CSPRNG: codes from `random` are predictable and must not
        # guard authentication. Range stays 100000-999999.
        code = str(secrets.randbelow(900000) + 100000)
        self.verification_codes[phone_number] = {
            'code': code,
            'timestamp': time.time()
        }
        message = self.client.messages.create(
            body=f"Your verification code is: {code}",
            from_=self.from_number,
            to=phone_number
        )
        return message.sid

    def verify_code(self, phone_number, code, expiry_seconds=300):
        """Check *code* for *phone_number*.

        A code is accepted once and only within *expiry_seconds* of being
        issued; an expired code is discarded.

        Returns:
            True on a match, otherwise False.
        """
        import hmac

        stored = self.verification_codes.get(phone_number)
        if stored is None:
            return False
        if time.time() - stored['timestamp'] > expiry_seconds:
            del self.verification_codes[phone_number]
            return False
        # Constant-time comparison avoids leaking how many digits matched
        matches = isinstance(code, str) and hmac.compare_digest(
            stored['code'].encode(), code.encode()
        )
        if matches:
            del self.verification_codes[phone_number]
            return True
        return False
Token Management
import pickle
import json
from datetime import datetime, timedelta
import sqlite3
class TokenManager:
    """Persist authentication tokens in a SQLite database file.

    Each service name maps to one row holding the token, an optional refresh
    token, an optional expiry timestamp and optional JSON metadata.
    """

    def __init__(self, storage_path='tokens.db'):
        """Remember the database path and create the table if missing."""
        self.storage_path = storage_path
        self._init_db()

    def _run(self, sql, params=(), fetch_one=False):
        """Execute one statement on a fresh connection and always close it.

        Returns the first result row when *fetch_one* is true, else None.
        """
        conn = sqlite3.connect(self.storage_path)
        try:
            cursor = conn.execute(sql, params)
            row = cursor.fetchone() if fetch_one else None
            conn.commit()
            return row
        finally:
            # Runs even when execute/commit raises, so nothing is leaked
            conn.close()

    def _init_db(self):
        """Create the ``tokens`` table when it does not exist yet."""
        self._run('''
        CREATE TABLE IF NOT EXISTS tokens (
        service TEXT PRIMARY KEY,
        token TEXT NOT NULL,
        refresh_token TEXT,
        expires_at TIMESTAMP,
        metadata TEXT
        )
        ''')

    def save_token(self, service, token, refresh_token=None, expires_in=None, metadata=None):
        """Insert or replace the token row for *service*.

        Args:
            service: Key identifying the row.
            token: Access token text.
            refresh_token: Optional refresh token.
            expires_in: Optional lifetime in seconds from now; a falsy value
                stores no expiry.
            metadata: Optional JSON-serializable value.
        """
        expires_at = None
        if expires_in:
            # Store the text explicitly rather than relying on sqlite3's
            # implicit datetime adapter; same "YYYY-MM-DD HH:MM:SS" layout.
            expires_at = (datetime.now() + timedelta(seconds=expires_in)).isoformat(' ')
        metadata_json = json.dumps(metadata) if metadata else None
        self._run('''
        INSERT OR REPLACE INTO tokens
        (service, token, refresh_token, expires_at, metadata)
        VALUES (?, ?, ?, ?, ?)
        ''', (service, token, refresh_token, expires_at, metadata_json))

    def get_token(self, service):
        """Return the stored token record for *service*.

        Returns:
            A dict with ``token``, ``refresh_token``, ``expires_at`` (the
            stored text) and ``metadata``; or None when there is no row or
            the token has expired.
        """
        row = self._run('''
        SELECT token, refresh_token, expires_at, metadata
        FROM tokens
        WHERE service = ?
        ''', (service,), fetch_one=True)
        if not row:
            return None
        token, refresh_token, expires_at, metadata_json = row
        if expires_at and datetime.fromisoformat(expires_at) < datetime.now():
            return None  # Token expired
        metadata = json.loads(metadata_json) if metadata_json else None
        return {
            'token': token,
            'refresh_token': refresh_token,
            'expires_at': expires_at,
            'metadata': metadata
        }

    def delete_token(self, service):
        """Remove the row for *service*, if any."""
        self._run('DELETE FROM tokens WHERE service = ?', (service,))
# Automatic token refresh
class AutoRefreshClient:
    """HTTP client that keeps its bearer token fresh through a token manager.

    ``token_manager`` must provide ``get_token(service)`` returning a dict
    with ``token``, ``refresh_token`` and ``expires_at`` (ISO text or None),
    or None, and ``save_token(service, token, refresh_token, expires_in)``.
    """

    def __init__(self, client_id, client_secret, token_manager):
        """Store the client credentials and the token store to use."""
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_manager = token_manager
        self.service_name = 'my_service'

    def _renew(self, refresh_token):
        """Refresh when a refresh token exists, otherwise authenticate anew."""
        if refresh_token:
            return self._refresh_token(refresh_token)
        return self._authenticate()

    def _get_valid_token(self):
        """Return a usable access token, renewing it when needed.

        A stored token is renewed when it expires in under five minutes.
        """
        token_data = self.token_manager.get_token(self.service_name)
        if not token_data:
            # Nothing stored (or the stored token already expired)
            return self._authenticate()
        if token_data['expires_at']:
            expires = datetime.fromisoformat(token_data['expires_at'])
            if expires - datetime.now() < timedelta(minutes=5):
                return self._renew(token_data.get('refresh_token'))
        return token_data['token']

    def _authenticate(self):
        """Perform initial authentication and return an access token.

        The implementation depends on the auth method; override this.

        Raises:
            NotImplementedError: Always, in this base implementation, so a
                missing override cannot silently send "Bearer None".
        """
        raise NotImplementedError(
            '_authenticate must be implemented for the chosen auth method'
        )

    def _refresh_token(self, refresh_token):
        """Exchange *refresh_token* for a new access token and persist it.

        Raises whatever ``raise_for_status`` raises on an error status.
        """
        response = requests.post(
            'https://api.example.com/token',
            data={
                'grant_type': 'refresh_token',
                'refresh_token': refresh_token,
                'client_id': self.client_id,
                'client_secret': self.client_secret
            }
        )
        response.raise_for_status()
        token_data = response.json()
        self.token_manager.save_token(
            self.service_name,
            token_data['access_token'],
            token_data.get('refresh_token'),
            token_data.get('expires_in')
        )
        return token_data['access_token']

    def make_request(self, url, **kwargs):
        """GET *url* with a bearer token, renewing and retrying once on 401.

        Extra keyword arguments are forwarded to ``requests.get``. A supplied
        ``headers`` dict is copied, not modified.
        """
        token = self._get_valid_token()
        headers = dict(kwargs.get('headers') or {})
        headers['Authorization'] = f'Bearer {token}'
        kwargs['headers'] = headers
        response = requests.get(url, **kwargs)
        if response.status_code == 401:
            # The store returns None for missing or expired tokens, so it
            # cannot be subscripted unconditionally.
            stored = self.token_manager.get_token(self.service_name)
            token = self._renew(stored.get('refresh_token') if stored else None)
            headers['Authorization'] = f'Bearer {token}'
            response = requests.get(url, **kwargs)
        return response
Security Best Practices
# Security best practices for API authentication
import hashlib
import secrets
from cryptography.fernet import Fernet
class SecureAPIClient:
    """API client preconfigured with conservative transport settings."""

    def __init__(self):
        """Create the session and apply the security configuration."""
        self.session = requests.Session()
        self._configure_security()

    def _configure_security(self):
        """Restrict the session to HTTPS with certificate verification."""
        # Imported here because the file's top-level imports do not provide
        # HTTPAdapter; the original reference raised NameError.
        from requests.adapters import HTTPAdapter
        from requests.exceptions import InvalidSchema

        class _RejectPlainHTTP(HTTPAdapter):
            """Adapter that refuses to send anything over plain HTTP."""

            def send(self, request, **kwargs):
                raise InvalidSchema(
                    f"Refusing non-HTTPS request to {request.url}"
                )

        # 1. Use HTTPS only: a stock adapter on http:// would still send the
        #    request, so mount one that rejects it.
        self.session.mount('http://', _RejectPlainHTTP(max_retries=0))
        # 2. Verify SSL certificates
        self.session.verify = True
        # 3. Set secure headers
        self.session.headers.update({
            'User-Agent': 'SecureAPIClient/1.0',
            'Accept': 'application/json',
            'X-Requested-With': 'XMLHttpRequest'
        })
        # 4. Enable certificate pinning (optional)
        # self.session.mount('https://', PinnedHTTPAdapter())

    def validate_response(self, response):
        """Check that *response* is JSON and warn about missing headers.

        Returns:
            True when the content type contains ``application/json``.

        Raises:
            ValueError: If the content type is anything else.
        """
        content_type = response.headers.get('Content-Type', '')
        if 'application/json' not in content_type:
            raise ValueError(f"Unexpected content type: {content_type}")
        security_headers = [
            'X-Content-Type-Options',
            'X-Frame-Options',
            'Strict-Transport-Security'
        ]
        # Missing security headers are reported but do not fail validation
        for header in security_headers:
            if header not in response.headers:
                print(f"Warning: Missing security header {header}")
        return True
# Secure credential storage
class CredentialVault:
    """In-memory store of credentials encrypted with a password-derived key.

    Attributes:
        salt: Salt used for key derivation. Persist it next to the
            ciphertext if the vault must be reopened later.
        credentials: Mapping of service name to encrypted bytes.
    """

    def __init__(self, master_password, salt=b'stable_salt', iterations=100000):
        """Derive the encryption key from *master_password*.

        Args:
            master_password: Password the key is derived from.
            salt: PBKDF2 salt. The default is a fixed value kept for
                backward compatibility only; in production pass a random
                salt (e.g. ``os.urandom(16)``) and store it.
            iterations: PBKDF2-HMAC-SHA256 iteration count.
        """
        self.salt = salt
        derived = hashlib.pbkdf2_hmac(
            'sha256', master_password.encode(), salt, iterations
        )
        # Fernet expects a urlsafe-base64 encoding of a 32-byte key
        self.cipher = Fernet(base64.urlsafe_b64encode(derived[:32]))
        self.credentials = {}

    def store_credential(self, service, credential):
        """Encrypt *credential* (a str) and keep it under *service*."""
        self.credentials[service] = self.cipher.encrypt(credential.encode())

    def get_credential(self, service):
        """Return the decrypted credential for *service*, or None if absent."""
        encrypted = self.credentials.get(service)
        if encrypted is None:
            return None
        return self.cipher.decrypt(encrypted).decode()
# API key rotation
class APIKeyRotation:
    """Track and rotate an API key on a fixed interval.

    ``api_service`` must provide ``update_api_key(new_key)``.
    """

    def __init__(self, api_service):
        """Start with no keys and a 30-day rotation interval."""
        self.api_service = api_service
        self.current_key = None
        self.previous_key = None
        self.rotation_interval = timedelta(days=30)
        self.last_rotation = None

    def should_rotate(self):
        """Return True when no rotation happened yet or the interval passed."""
        if not self.last_rotation:
            return True
        return datetime.now() - self.last_rotation > self.rotation_interval

    def rotate_key(self):
        """Generate a new key, register it with the service, and return it.

        The former current key is kept as ``previous_key`` for a grace
        period. If the service call raises, local state is left unchanged.
        """
        new_key = secrets.token_urlsafe(32)
        # Register with the service first: committing local state before a
        # failing call would leave current_key pointing at a key the service
        # never accepted.
        self.api_service.update_api_key(new_key)
        self.previous_key = self.current_key
        self.current_key = new_key
        self.last_rotation = datetime.now()
        return new_key

    def get_active_keys(self):
        """Return the current and previous keys, newest first.

        Keys that are not set are omitted, so the list is empty before the
        first rotation.
        """
        return [key for key in (self.current_key, self.previous_key) if key]
# Rate limiting and retry with authentication
class AuthenticatedRateLimiter:
    """Per-endpoint rate limiter whose limit depends on the auth level."""

    def __init__(self, calls_per_second=1):
        """Configure the limiter.

        Args:
            calls_per_second: Rate applied to the 'standard' auth level and
                to unknown levels.

        Raises:
            ValueError: If *calls_per_second* is not positive.
        """
        if calls_per_second <= 0:
            raise ValueError('calls_per_second must be positive')
        self.calls_per_second = calls_per_second
        self.last_call = {}    # endpoint -> time of the last permitted call
        self.retry_after = {}  # endpoint -> earliest time a retry is allowed

    def wait_if_needed(self, endpoint, auth_level='standard'):
        """Sleep as long as required before calling *endpoint*.

        A pending Retry-After deadline takes precedence over the normal
        per-level spacing. The call time is recorded either way.
        """
        limits = {
            'anonymous': 0.1,                   # 1 call per 10 seconds
            'standard': self.calls_per_second,  # configured rate (default 1/s)
            'premium': 10                       # 10 calls per second
        }
        limit = limits.get(auth_level, self.calls_per_second)
        delay = 1.0 / limit
        now = time.time()
        # A server-imposed deadline takes precedence
        retry_time = self.retry_after.pop(endpoint, None)
        if retry_time is not None:
            if now < retry_time:
                time.sleep(retry_time - now)
            # Record the call so the next one is still spaced correctly
            self.last_call[endpoint] = time.time()
            return
        # Normal spacing between calls
        if endpoint in self.last_call:
            elapsed = now - self.last_call[endpoint]
            if elapsed < delay:
                time.sleep(delay - elapsed)
        self.last_call[endpoint] = time.time()

    def handle_rate_limit_response(self, response, endpoint):
        """Record when *endpoint* may be retried after a 429 response.

        ``Retry-After`` may be a number of seconds or an HTTP date. When the
        header is missing or unparsable, a 60-second backoff is used.
        Responses with any other status are ignored.
        """
        from datetime import timezone
        from email.utils import parsedate_to_datetime

        if response.status_code != 429:
            return
        default_deadline = time.time() + 60
        retry_after = response.headers.get('Retry-After')
        if not retry_after:
            self.retry_after[endpoint] = default_deadline
            return
        if retry_after.isdigit():
            self.retry_after[endpoint] = time.time() + int(retry_after)
            return
        try:
            retry_time = parsedate_to_datetime(retry_after)
        except (TypeError, ValueError):
            self.retry_after[endpoint] = default_deadline
            return
        # HTTP dates are GMT; a naive datetime would be read as local time
        # by timestamp(), shifting the deadline by the UTC offset.
        if retry_time.tzinfo is None:
            retry_time = retry_time.replace(tzinfo=timezone.utc)
        self.retry_after[endpoint] = retry_time.timestamp()
Practice Exercises
Exercise 1: Multi-Service API Client
Build a unified API client that:
- Supports multiple authentication methods
- Automatically detects auth type from configuration
- Handles token refresh automatically
- Implements retry logic with backoff
- Securely stores credentials
Exercise 2: OAuth 2.0 Flow Implementation
Implement a complete OAuth 2.0 flow:
- Authorization code flow with PKCE
- Token refresh handling
- Secure token storage
- Session management
- Logout functionality
Exercise 3: API Security Auditor
Create a security audit tool that:
- Tests authentication endpoints
- Checks for security headers
- Validates SSL certificates
- Tests rate limiting
- Generates security report
Key Takeaways
- 🔐 Never hardcode credentials - use environment variables or secure storage
- 🔑 Choose the right authentication method for your use case
- 🔄 Implement automatic token refresh for better UX
- 🛡️ Always verify SSL certificates and use HTTPS
- 💾 Securely store tokens with encryption
- ⏱️ Handle rate limiting and implement retry logic
- 🔒 Use MFA when available for enhanced security