Explore More
Ready for more? Check out our advanced guides and API reference
You’ve built a complete API - now let’s deploy it to production. In this final part, we’ll containerize your application, set up CI/CD pipelines, configure monitoring, and deploy to multiple platforms with production-grade security and performance.
By the end of this part:
Before deploying, ensure you have:
Create Dockerfile:
# Multi-stage build for smaller production imageFROM python:3.12-slim as builder
# Install build dependenciesRUN apt-get update && apt-get install -y \ gcc \ && rm -rf /var/lib/apt/lists/*
# Create virtual environmentRUN python -m venv /opt/venvENV PATH="/opt/venv/bin:$PATH"
# Install Python dependenciesCOPY requirements.txt .RUN pip install --no-cache-dir -r requirements.txt
# Production stageFROM python:3.12-slim
# Install runtime dependenciesRUN apt-get update && apt-get install -y \ postgresql-client \ curl \ && rm -rf /var/lib/apt/lists/*
# Copy virtual environment from builderCOPY --from=builder /opt/venv /opt/venvENV PATH="/opt/venv/bin:$PATH"
# Create non-root user for securityRUN useradd -m -u 1000 appuser && \ mkdir -p /app && \ chown -R appuser:appuser /app
WORKDIR /app
# Copy application codeCOPY --chown=appuser:appuser . .
# Switch to non-root userUSER appuser
# Health checkHEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \ CMD curl -f http://localhost:8000/health || exit 1
# Expose portEXPOSE 8000
# Run with gunicorn for productionCMD ["gunicorn", "app.main:app", \ "--worker-class", "uvicorn.workers.UvicornWorker", \ "--workers", "4", \ "--bind", "0.0.0.0:8000", \ "--access-logfile", "-", \ "--error-logfile", "-"]Create docker-compose.yml for local production testing:
version: '3.8'
services: # PostgreSQL database postgres: image: postgres:15-alpine environment: POSTGRES_USER: ${DB_USER:-taskflow} POSTGRES_PASSWORD: ${DB_PASSWORD:-secret} POSTGRES_DB: ${DB_NAME:-taskflow} volumes: - postgres_data:/var/lib/postgresql/data ports: - "5432:5432" healthcheck: test: ["CMD-SHELL", "pg_isready -U ${DB_USER:-taskflow}"] interval: 10s timeout: 5s retries: 5
# Redis for caching and queues redis: image: redis:7-alpine command: redis-server --requirepass ${REDIS_PASSWORD:-secret} ports: - "6379:6379" volumes: - redis_data:/data healthcheck: test: ["CMD", "redis-cli", "ping"] interval: 10s timeout: 5s retries: 5
# Main application app: build: . environment: DATABASE_URL: postgresql+asyncpg://${DB_USER:-taskflow}:${DB_PASSWORD:-secret}@postgres:5432/${DB_NAME:-taskflow} REDIS_URL: redis://:${REDIS_PASSWORD:-secret}@redis:6379/0 SECRET_KEY: ${SECRET_KEY} ENVIRONMENT: production ports: - "8000:8000" depends_on: postgres: condition: service_healthy redis: condition: service_healthy volumes: - ./static:/app/static - ./uploads:/app/uploads restart: unless-stopped
# Celery worker worker: build: . command: celery -A app.celery_app worker --loglevel=info --concurrency=4 environment: DATABASE_URL: postgresql+asyncpg://${DB_USER:-taskflow}:${DB_PASSWORD:-secret}@postgres:5432/${DB_NAME:-taskflow} REDIS_URL: redis://:${REDIS_PASSWORD:-secret}@redis:6379/0 SECRET_KEY: ${SECRET_KEY} depends_on: - postgres - redis restart: unless-stopped
# Celery beat scheduler beat: build: . command: celery -A app.celery_app beat --loglevel=info environment: DATABASE_URL: postgresql+asyncpg://${DB_USER:-taskflow}:${DB_PASSWORD:-secret}@postgres:5432/${DB_NAME:-taskflow} REDIS_URL: redis://:${REDIS_PASSWORD:-secret}@redis:6379/0 SECRET_KEY: ${SECRET_KEY} depends_on: - postgres - redis restart: unless-stopped
# Nginx reverse proxy nginx: image: nginx:alpine volumes: - ./nginx.conf:/etc/nginx/nginx.conf:ro - ./static:/usr/share/nginx/html/static:ro - ./ssl:/etc/nginx/ssl:ro ports: - "80:80" - "443:443" depends_on: - app restart: unless-stopped
volumes: postgres_data: redis_data:Create app/config/production.py:
"""Production configuration with security and performance settings."""
import osfrom typing import Listimport secrets
class ProductionConfig: """Production-specific configuration."""
# Environment validation REQUIRED_ENV_VARS = [ "DATABASE_URL", "SECRET_KEY", "REDIS_URL", "SENTRY_DSN", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY" ]
def __init__(self): self._validate_environment()
def _validate_environment(self): """Ensure all required environment variables are set.""" missing = [] for var in self.REQUIRED_ENV_VARS: if not os.getenv(var): missing.append(var)
if missing: raise ValueError(f"Missing required environment variables: {missing}")
# Security SECRET_KEY = os.getenv("SECRET_KEY") if not SECRET_KEY or SECRET_KEY == "development-secret": raise ValueError("SECRET_KEY must be set and secure in production")
# Ensure key is strong enough if len(SECRET_KEY) < 32: raise ValueError("SECRET_KEY must be at least 32 characters")
# Database DATABASE_URL = os.getenv("DATABASE_URL") DATABASE_POOL_SIZE = int(os.getenv("DATABASE_POOL_SIZE", "20")) DATABASE_MAX_OVERFLOW = int(os.getenv("DATABASE_MAX_OVERFLOW", "40")) DATABASE_POOL_TIMEOUT = int(os.getenv("DATABASE_POOL_TIMEOUT", "30"))
# Redis REDIS_URL = os.getenv("REDIS_URL") REDIS_MAX_CONNECTIONS = int(os.getenv("REDIS_MAX_CONNECTIONS", "50"))
# Security Headers SECURITY_HEADERS = { "Strict-Transport-Security": "max-age=63072000; includeSubDomains; preload", "X-Content-Type-Options": "nosniff", "X-Frame-Options": "DENY", "X-XSS-Protection": "1; mode=block", "Content-Security-Policy": "default-src 'self'; script-src 'self' 'unsafe-inline';", "Referrer-Policy": "strict-origin-when-cross-origin" }
# CORS CORS_ORIGINS: List[str] = os.getenv("CORS_ORIGINS", "").split(",") CORS_ALLOW_CREDENTIALS = True CORS_ALLOW_METHODS = ["GET", "POST", "PUT", "DELETE", "OPTIONS"] CORS_MAX_AGE = 86400 # 24 hours
# Rate Limiting RATE_LIMIT_ENABLED = True RATE_LIMIT_DEFAULT = "100/minute" RATE_LIMIT_STORAGE_URL = os.getenv("REDIS_URL")
# Session SESSION_COOKIE_SECURE = True # HTTPS only SESSION_COOKIE_HTTPONLY = True # No JS access SESSION_COOKIE_SAMESITE = "Strict" SESSION_LIFETIME_SECONDS = 3600 # 1 hour
# Monitoring SENTRY_DSN = os.getenv("SENTRY_DSN") SENTRY_ENVIRONMENT = "production" SENTRY_TRACES_SAMPLE_RATE = 0.1 # 10% of requests
# Logging LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") LOG_FORMAT = "json" # Structured logging LOG_FILE = "/var/log/taskflow/app.log"
# Performance CACHE_TTL = 300 # 5 minutes PAGINATION_MAX_SIZE = 100 REQUEST_TIMEOUT = 30 # seconds WORKER_CONNECTIONS = 1000
# File Upload MAX_UPLOAD_SIZE = 10 * 1024 * 1024 # 10MB ALLOWED_UPLOAD_EXTENSIONS = {".jpg", ".jpeg", ".png", ".pdf", ".csv"}
# Email SMTP_HOST = os.getenv("SMTP_HOST") SMTP_PORT = int(os.getenv("SMTP_PORT", "587")) SMTP_USER = os.getenv("SMTP_USER") SMTP_PASSWORD = os.getenv("SMTP_PASSWORD") SMTP_USE_TLS = True EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@taskflow.com")
# AWS (for S3, etc.) AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") AWS_REGION = os.getenv("AWS_REGION", "us-east-1") AWS_S3_BUCKET = os.getenv("AWS_S3_BUCKET", "taskflow-uploads")
# Create singletonconfig = ProductionConfig()Create scripts/migrate.py for production migrations:
#!/usr/bin/env python"""Run database migrations in production.
This script ensures migrations are applied safely withproper error handling and rollback capabilities."""
import asyncioimport sysfrom pathlib import Pathfrom alembic import commandfrom alembic.config import Configfrom sqlalchemy import create_engine, textimport logging
logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)
def check_database_connection(database_url: str) -> bool: """Verify database is accessible.""" try: engine = create_engine(database_url.replace("+asyncpg", "")) with engine.connect() as conn: conn.execute(text("SELECT 1")) logger.info("Database connection successful") return True except Exception as e: logger.error(f"Database connection failed: {e}") return False
def backup_database(database_url: str, backup_path: str) -> bool: """Create database backup before migration.""" try: import subprocess
# Parse database URL from urllib.parse import urlparse parsed = urlparse(database_url)
# Run pg_dump cmd = [ "pg_dump", "-h", parsed.hostname, "-p", str(parsed.port or 5432), "-U", parsed.username, "-d", parsed.path[1:], # Remove leading / "-f", backup_path, "--verbose" ]
# Set password via environment import os env = os.environ.copy() env["PGPASSWORD"] = parsed.password
result = subprocess.run(cmd, env=env, capture_output=True, text=True)
if result.returncode == 0: logger.info(f"Database backed up to {backup_path}") return True else: logger.error(f"Backup failed: {result.stderr}") return False
except Exception as e: logger.error(f"Backup error: {e}") return False
def run_migrations(database_url: str, backup: bool = True) -> bool: """ Run Alembic migrations with safety checks.
Args: database_url: Database connection string backup: Whether to backup before migrating
Returns: True if successful """ # Check connection if not check_database_connection(database_url): logger.error("Cannot connect to database") return False
# Backup if requested if backup: backup_path = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.sql" if not backup_database(database_url, backup_path): logger.warning("Backup failed, continue anyway? (y/n)") if input().lower() != 'y': return False
try: # Configure Alembic alembic_cfg = Config("alembic.ini") alembic_cfg.set_main_option("sqlalchemy.url", database_url)
# Show current revision logger.info("Current database revision:") command.current(alembic_cfg)
# Show pending migrations logger.info("Pending migrations:") command.history(alembic_cfg)
# Run migrations logger.info("Running migrations...") command.upgrade(alembic_cfg, "head")
logger.info("Migrations completed successfully") return True
except Exception as e: logger.error(f"Migration failed: {e}") logger.error("Run rollback if needed: alembic downgrade -1") return False
if __name__ == "__main__": import os from dotenv import load_dotenv
load_dotenv()
database_url = os.getenv("DATABASE_URL") if not database_url: logger.error("DATABASE_URL not set") sys.exit(1)
# Run with backup by default success = run_migrations(database_url, backup=True) sys.exit(0 if success else 1)Create nginx.conf:
worker_processes auto;error_log /var/log/nginx/error.log warn;pid /var/run/nginx.pid;
events { worker_connections 4096; use epoll;}
http { include /etc/nginx/mime.types; default_type application/octet-stream;
# Logging log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for" ' 'rt=$request_time uct="$upstream_connect_time" ' 'uht="$upstream_header_time" urt="$upstream_response_time"';
access_log /var/log/nginx/access.log main;
# Performance sendfile on; tcp_nopush on; tcp_nodelay on; keepalive_timeout 65; types_hash_max_size 2048;
# Gzip gzip on; gzip_vary on; gzip_min_length 1024; gzip_types text/plain text/css text/xml text/javascript application/json application/javascript application/xml+rss application/rss+xml application/atom+xml image/svg+xml text/x-js text/x-cross-domain-policy application/x-font-ttf application/x-font-opentype application/vnd.ms-fontobject image/x-icon;
# Rate limiting limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s; limit_req_zone $binary_remote_addr zone=auth:10m rate=5r/m;
# Upstream upstream app { least_conn; server app:8000 max_fails=3 fail_timeout=30s; # Add more servers for load balancing # server app2:8000 max_fails=3 fail_timeout=30s; keepalive 32; }
# Redirect HTTP to HTTPS server { listen 80; server_name taskflow.com www.taskflow.com; return 301 https://$server_name$request_uri; }
# HTTPS server server { listen 443 ssl http2; server_name taskflow.com;
# SSL ssl_certificate /etc/nginx/ssl/cert.pem; ssl_certificate_key /etc/nginx/ssl/key.pem; ssl_protocols TLSv1.2 TLSv1.3; ssl_ciphers HIGH:!aNULL:!MD5; ssl_prefer_server_ciphers on; ssl_session_cache shared:SSL:10m; ssl_session_timeout 10m;
# Security headers add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always; add_header X-Frame-Options DENY always; add_header X-Content-Type-Options nosniff always; add_header X-XSS-Protection "1; mode=block" always;
# Max body size for uploads client_max_body_size 10M;
# Static files location /static/ { alias /usr/share/nginx/html/static/; expires 30d; add_header Cache-Control "public, immutable"; }
# Health check location /health { proxy_pass http://app; access_log off; }
# Authentication endpoints (strict rate limiting) location ~ ^/(auth|login|register) { limit_req zone=auth burst=2 nodelay; proxy_pass http://app; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; }
# API endpoints location / { limit_req zone=api burst=20 nodelay;
proxy_pass http://app; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme;
# Timeouts proxy_connect_timeout 60s; proxy_send_timeout 60s; proxy_read_timeout 60s;
# Buffering proxy_buffering off; proxy_request_buffering off; }
# WebSocket support location /ws/ { proxy_pass http://app; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; } }}Create scripts/deploy.sh:
#!/bin/bashset -e
echo "🚀 Starting deployment..."
# ConfigurationENVIRONMENT=${1:-staging}VERSION=$(git rev-parse --short HEAD)TIMESTAMP=$(date +%Y%m%d-%H%M%S)
# Colors for outputRED='\033[0;31m'GREEN='\033[0;32m'YELLOW='\033[1;33m'NC='\033[0m'
echo -e "${YELLOW}Deploying version ${VERSION} to ${ENVIRONMENT}${NC}"
# Pre-deployment checksecho "📋 Running pre-deployment checks..."
# Check if all tests passecho "Running tests..."pytest tests/ --quietif [ $? -ne 0 ]; then echo -e "${RED}Tests failed! Aborting deployment.${NC}" exit 1fi
# Check for uncommitted changesif [ -n "$(git status --porcelain)" ]; then echo -e "${YELLOW}Warning: Uncommitted changes detected${NC}" read -p "Continue anyway? (y/n) " -n 1 -r echo if [[ ! $REPLY =~ ^[Yy]$ ]]; then exit 1 fifi
# Build Docker imageecho "🐳 Building Docker image..."docker build -t taskflow:${VERSION} .
# Tag for registrydocker tag taskflow:${VERSION} registry.example.com/taskflow:${VERSION}docker tag taskflow:${VERSION} registry.example.com/taskflow:latest
# Push to registryecho "📤 Pushing to registry..."docker push registry.example.com/taskflow:${VERSION}docker push registry.example.com/taskflow:latest
# Run database migrationsecho "🗄️ Running database migrations..."python scripts/migrate.py
# Deploy based on environmentcase $ENVIRONMENT in staging) echo "🎬 Deploying to staging..." kubectl set image deployment/taskflow-api \ taskflow=registry.example.com/taskflow:${VERSION} \ -n staging ;;
production) echo "🚀 Deploying to production..."
# Create backup echo "Creating backup..." kubectl exec -it postgres-0 -n production -- \ pg_dump -U taskflow taskflow > backup-${TIMESTAMP}.sql
# Rolling deployment kubectl set image deployment/taskflow-api \ taskflow=registry.example.com/taskflow:${VERSION} \ -n production \ --record
# Wait for rollout kubectl rollout status deployment/taskflow-api -n production ;;
*) echo -e "${RED}Unknown environment: ${ENVIRONMENT}${NC}" exit 1 ;;esac
# Post-deployment checksecho "Running post-deployment checks..."
# Health checkHEALTH_URL="https://${ENVIRONMENT}.taskflow.com/health"HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" $HEALTH_URL)
if [ $HTTP_CODE -eq 200 ]; then echo -e "${GREEN}Health check passed!${NC}"else echo -e "${RED}Health check failed! HTTP ${HTTP_CODE}${NC}" echo "Rolling back..." kubectl rollout undo deployment/taskflow-api -n ${ENVIRONMENT} exit 1fi
# Smoke testsecho "Running smoke tests..."python scripts/smoke_tests.py --env ${ENVIRONMENT}
# Send deployment notificationcurl -X POST https://hooks.slack.com/services/YOUR/WEBHOOK/URL \ -H 'Content-Type: application/json' \ -d "{ \"text\": \"Deployment successful!\", \"attachments\": [{ \"color\": \"good\", \"fields\": [ {\"title\": \"Environment\", \"value\": \"${ENVIRONMENT}\", \"short\": true}, {\"title\": \"Version\", \"value\": \"${VERSION}\", \"short\": true}, {\"title\": \"Deployed by\", \"value\": \"$(git config user.name)\", \"short\": true}, {\"title\": \"Time\", \"value\": \"${TIMESTAMP}\", \"short\": true} ] }] }"
echo -e "${GREEN}Deployment complete!${NC}"Create railway.json:
{ "$schema": "https://railway.app/railway.schema.json", "build": { "builder": "DOCKERFILE" }, "deploy": { "startCommand": "python scripts/migrate.py && gunicorn app.main:app --worker-class uvicorn.workers.UvicornWorker", "healthcheckPath": "/health", "restartPolicyType": "ON_FAILURE", "restartPolicyMaxRetries": 3 }, "environments": { "production": { "ENVIRONMENT": "production", "LOG_LEVEL": "INFO" } }}Deploy with Railway CLI:
# Install Railway CLInpm install -g @railway/cli
# Loginrailway login
# Initialize projectrailway init
# Add PostgreSQL and Redisrailway add postgresqlrailway add redis
# Deployrailway upCreate fly.toml:
app = "taskflow-api"
[build] dockerfile = "Dockerfile"
[env] PORT = "8000" ENVIRONMENT = "production"
[experimental] auto_rollback = true
[[services]] http_checks = [] internal_port = 8000 protocol = "tcp" script_checks = []
[services.concurrency] hard_limit = 100 soft_limit = 80 type = "connections"
[[services.ports]] force_https = true handlers = ["http"] port = 80
[[services.ports]] handlers = ["tls", "http"] port = 443
[[services.tcp_checks]] grace_period = "5s" interval = "15s" restart_limit = 0 timeout = "2s"
[[services.http_checks]] interval = 10000 grace_period = "5s" method = "get" path = "/health" protocol = "http" restart_limit = 0 timeout = 2000 tls_skip_verify = false
[mounts] destination = "/app/data" source = "taskflow_data"Deploy:
# Install Fly CLIcurl -L https://fly.io/install.sh | sh
# Loginfly auth login
# Create appfly apps create taskflow-api
# Add PostgreSQLfly postgres createfly postgres attach
# Add Redisfly redis create
# Deployfly deploy
# Scalefly scale count 2Add metrics to your app:
from prometheus_client import Counter, Histogram, Gauge, generate_latestimport time
# Metricsrequest_count = Counter( 'http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
request_duration = Histogram( 'http_request_duration_seconds', 'HTTP request duration', ['method', 'endpoint'])
active_users = Gauge( 'active_users_total', 'Number of active users')
task_queue_size = Gauge( 'task_queue_size', 'Number of tasks in queue', ['queue'])
@app.middleware("http")async def metrics_middleware(request, call_next): """Track metrics for each request.""" start_time = time.time()
response = await call_next(request)
duration = time.time() - start_time request_count.labels( method=request.method, endpoint=request.url.path, status=response.status_code ).inc()
request_duration.labels( method=request.method, endpoint=request.url.path ).observe(duration)
return response
@app.get("/metrics")async def metrics(): """Prometheus metrics endpoint.""" return Response( content=generate_latest(), media_type="text/plain" )Configure production logging:
import loggingimport jsonfrom pythonjsonlogger import jsonlogger
def setup_logging(environment: str): """Configure structured logging for production."""
# JSON formatter formatter = jsonlogger.JsonFormatter( fmt='%(asctime)s %(levelname)s %(name)s %(message)s', rename_fields={ 'asctime': 'timestamp', 'levelname': 'level', 'name': 'logger' } )
# Console handler console_handler = logging.StreamHandler() console_handler.setFormatter(formatter)
# File handler for production if environment == "production": file_handler = logging.FileHandler('/var/log/taskflow/app.log') file_handler.setFormatter(formatter) logging.root.addHandler(file_handler)
# Configure root logger logging.root.setLevel(logging.INFO) logging.root.addHandler(console_handler)
# Add request ID to logs import contextvars request_id_var = contextvars.ContextVar('request_id', default=None)
class RequestIdFilter(logging.Filter): def filter(self, record): record.request_id = request_id_var.get() return True
logging.root.addFilter(RequestIdFilter())Configure Sentry:
import sentry_sdkfrom sentry_sdk.integrations.asgi import SentryAsgiMiddlewarefrom sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
def init_sentry(app, dsn: str, environment: str): """Initialize Sentry error tracking."""
sentry_sdk.init( dsn=dsn, environment=environment, integrations=[ SqlalchemyIntegration(), ], traces_sample_rate=0.1, # 10% of transactions profiles_sample_rate=0.1, # 10% profiling attach_stacktrace=True, send_default_pii=False, # Don't send personally identifiable info
# Filter sensitive data before_send=filter_sensitive_data )
# Add middleware app.add_middleware(SentryAsgiMiddleware)
def filter_sensitive_data(event, hint): """Remove sensitive data before sending to Sentry."""
# Remove passwords, tokens, etc. if 'request' in event and 'data' in event['request']: data = event['request']['data'] for key in ['password', 'token', 'secret', 'api_key']: if key in data: data[key] = '[FILTERED]'
return eventImplement multi-level caching:
import redisfrom functools import wrapsimport pickleimport hashlib
redis_client = redis.from_url(settings.REDIS_URL)
def cache_key(*args, **kwargs): """Generate cache key from arguments.""" key_data = f"{args}:{kwargs}".encode() return hashlib.md5(key_data).hexdigest()
def cached(ttl=300): """Cache decorator for expensive operations."""
def decorator(func): @wraps(func) async def wrapper(*args, **kwargs): # Generate cache key key = f"cache:{func.__name__}:{cache_key(*args, **kwargs)}"
# Try to get from cache cached_value = redis_client.get(key) if cached_value: return pickle.loads(cached_value)
# Execute function result = await func(*args, **kwargs)
# Store in cache redis_client.setex( key, ttl, pickle.dumps(result) )
return result
return wrapper return decorator
# Usage@cached(ttl=600)async def get_user_stats(user_id: int): # Expensive calculation return calculate_stats(user_id)Optimize queries and connections:
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSessionfrom sqlalchemy.pool import NullPool, QueuePool
def create_optimized_engine(database_url: str, environment: str): """Create optimized database engine for production."""
if environment == "production": return create_async_engine( database_url, # Connection pooling poolclass=QueuePool, pool_size=20, max_overflow=40, pool_timeout=30, pool_recycle=1800, # Recycle connections after 30 minutes
# Performance echo=False, connect_args={ "server_settings": { "application_name": "taskflow_api", "jit": "off" }, "command_timeout": 60, "prepared_statement_cache_size": 0, # Disable for PgBouncer } ) else: # Development settings return create_async_engine( database_url, echo=True, poolclass=NullPool # No pooling in dev )Implement these security measures:
from zenith import Request, HTTPExceptionfrom zenith.security import HTTPBearerimport hmacimport hashlib
class SecurityMiddleware: """Production security middleware."""
def __init__(self, app): self.app = app
async def __call__(self, request: Request, call_next): # Check for common attacks if self._is_suspicious_request(request): raise HTTPException(status_code=403, detail="Forbidden")
# Add security headers response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["X-XSS-Protection"] = "1; mode=block" response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# HSTS for HTTPS if request.url.scheme == "https": response.headers["Strict-Transport-Security"] = ( "max-age=63072000; includeSubDomains; preload" )
return response
def _is_suspicious_request(self, request: Request) -> bool: """Detect suspicious requests."""
# Check for SQL injection attempts suspicious_patterns = [ "union select", "drop table", "<script>", "javascript:", "../", "..\\", ]
url = str(request.url) for pattern in suspicious_patterns: if pattern in url.lower(): return True
# Check for excessive parameters if len(request.query_params) > 20: return True
return False
# API key validation for admin endpointsdef verify_api_key(api_key: str) -> bool: """Verify API key using constant-time comparison.""" expected = settings.ADMIN_API_KEY.encode() provided = api_key.encode() return hmac.compare_digest(expected, provided)Solution: Check health endpoint, ensure database migrations ran, verify environment variables
Solution: Limit worker connections, implement connection pooling, use pagination
Solution: Add caching, optimize database queries, use CDN for static files
In this final part, you’ve implemented: Docker containerization with multi-stage builds Production configuration management Database migrations and backups CI/CD pipeline with GitHub Actions Deployment to multiple platforms Nginx reverse proxy configuration Monitoring with Prometheus and Grafana Structured logging and error tracking Security hardening and SSL Performance optimization strategies
Congratulations! You’ve built and deployed a production-ready API. You now have:
Continue learning with:
Explore More
Ready for more? Check out our advanced guides and API reference
Thank you for following this tutorial! If you built something cool, share it in our GitHub Discussions.