
Part 7: Deployment

You’ve built a complete API; now let’s deploy it to production. In this final part, we’ll containerize the application, set up a CI/CD pipeline, configure monitoring, and deploy to multiple platforms with production-grade security and performance.

By the end of this part, you’ll have:

  • Docker containers for app and workers
  • Production configuration management
  • Database migrations in production
  • CI/CD pipeline with GitHub Actions
  • Deployment to Railway, Fly.io, and AWS
  • Monitoring with Prometheus and Grafana
  • Logging with structured logs
  • Security hardening and SSL

Before deploying, ensure you have:

  • Environment variables configured
  • Database migrations ready
  • Static files served properly
  • Security headers enabled
  • CORS configured correctly
  • Rate limiting active
  • Error tracking set up
  • Monitoring configured
  • Backup strategy defined
  • Rollback plan ready

Create Dockerfile:

# Multi-stage build for smaller production image
FROM python:3.12-slim AS builder
# Install build dependencies
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# Create virtual environment
RUN python -m venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Production stage
FROM python:3.12-slim
# Install runtime dependencies
RUN apt-get update && apt-get install -y \
postgresql-client \
curl \
&& rm -rf /var/lib/apt/lists/*
# Copy virtual environment from builder
COPY --from=builder /opt/venv /opt/venv
ENV PATH="/opt/venv/bin:$PATH"
# Create non-root user for security
RUN useradd -m -u 1000 appuser && \
mkdir -p /app && \
chown -R appuser:appuser /app
WORKDIR /app
# Copy application code
COPY --chown=appuser:appuser . .
# Switch to non-root user
USER appuser
# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
# Expose port
EXPOSE 8000
# Run with gunicorn for production
CMD ["gunicorn", "app.main:app", \
"--worker-class", "uvicorn.workers.UvicornWorker", \
"--workers", "4", \
"--bind", "0.0.0.0:8000", \
"--access-logfile", "-", \
"--error-logfile", "-"]

Create docker-compose.yml for local production testing:

version: '3.8'

services:
  # PostgreSQL database
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: ${DB_USER:-taskflow}
      POSTGRES_PASSWORD: ${DB_PASSWORD:-secret}
      POSTGRES_DB: ${DB_NAME:-taskflow}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${DB_USER:-taskflow}"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Redis for caching and queues
  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD:-secret}
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Main application
  app:
    build: .
    environment:
      DATABASE_URL: postgresql+asyncpg://${DB_USER:-taskflow}:${DB_PASSWORD:-secret}@postgres:5432/${DB_NAME:-taskflow}
      REDIS_URL: redis://:${REDIS_PASSWORD:-secret}@redis:6379/0
      SECRET_KEY: ${SECRET_KEY}
      ENVIRONMENT: production
    ports:
      - "8000:8000"
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
    volumes:
      - ./static:/app/static
      - ./uploads:/app/uploads
    restart: unless-stopped

  # Celery worker
  worker:
    build: .
    command: celery -A app.celery_app worker --loglevel=info --concurrency=4
    environment:
      DATABASE_URL: postgresql+asyncpg://${DB_USER:-taskflow}:${DB_PASSWORD:-secret}@postgres:5432/${DB_NAME:-taskflow}
      REDIS_URL: redis://:${REDIS_PASSWORD:-secret}@redis:6379/0
      SECRET_KEY: ${SECRET_KEY}
    depends_on:
      - postgres
      - redis
    restart: unless-stopped

  # Celery beat scheduler
  beat:
    build: .
    command: celery -A app.celery_app beat --loglevel=info
    environment:
      DATABASE_URL: postgresql+asyncpg://${DB_USER:-taskflow}:${DB_PASSWORD:-secret}@postgres:5432/${DB_NAME:-taskflow}
      REDIS_URL: redis://:${REDIS_PASSWORD:-secret}@redis:6379/0
      SECRET_KEY: ${SECRET_KEY}
    depends_on:
      - postgres
      - redis
    restart: unless-stopped

  # Nginx reverse proxy
  nginx:
    image: nginx:alpine
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf:ro
      - ./static:/usr/share/nginx/html/static:ro
      - ./ssl:/etc/nginx/ssl:ro
    ports:
      - "80:80"
      - "443:443"
    depends_on:
      - app
    restart: unless-stopped

volumes:
  postgres_data:
  redis_data:

Create app/config/production.py:

"""
Production configuration with security and performance settings.
"""
import os
from typing import List


class ProductionConfig:
    """Production-specific configuration."""

    # Environment validation
    REQUIRED_ENV_VARS = [
        "DATABASE_URL",
        "SECRET_KEY",
        "REDIS_URL",
        "SENTRY_DSN",
        "AWS_ACCESS_KEY_ID",
        "AWS_SECRET_ACCESS_KEY"
    ]

    def __init__(self):
        self._validate_environment()

    def _validate_environment(self):
        """Ensure all required environment variables are set."""
        missing = []
        for var in self.REQUIRED_ENV_VARS:
            if not os.getenv(var):
                missing.append(var)
        if missing:
            raise ValueError(f"Missing required environment variables: {missing}")

    # Security
    # These class-level checks run when the module is imported,
    # before __init__ validates the remaining variables.
    SECRET_KEY = os.getenv("SECRET_KEY")
    if not SECRET_KEY or SECRET_KEY == "development-secret":
        raise ValueError("SECRET_KEY must be set and secure in production")
    # Ensure key is strong enough
    if len(SECRET_KEY) < 32:
        raise ValueError("SECRET_KEY must be at least 32 characters")

    # Database
    DATABASE_URL = os.getenv("DATABASE_URL")
    DATABASE_POOL_SIZE = int(os.getenv("DATABASE_POOL_SIZE", "20"))
    DATABASE_MAX_OVERFLOW = int(os.getenv("DATABASE_MAX_OVERFLOW", "40"))
    DATABASE_POOL_TIMEOUT = int(os.getenv("DATABASE_POOL_TIMEOUT", "30"))

    # Redis
    REDIS_URL = os.getenv("REDIS_URL")
    REDIS_MAX_CONNECTIONS = int(os.getenv("REDIS_MAX_CONNECTIONS", "50"))

    # Security Headers
    SECURITY_HEADERS = {
        "Strict-Transport-Security": "max-age=63072000; includeSubDomains; preload",
        "X-Content-Type-Options": "nosniff",
        "X-Frame-Options": "DENY",
        "X-XSS-Protection": "1; mode=block",
        "Content-Security-Policy": "default-src 'self'; script-src 'self' 'unsafe-inline';",
        "Referrer-Policy": "strict-origin-when-cross-origin"
    }

    # CORS (empty entries are dropped, so an unset variable yields [] rather than [""])
    CORS_ORIGINS: List[str] = [
        origin.strip()
        for origin in os.getenv("CORS_ORIGINS", "").split(",")
        if origin.strip()
    ]
    CORS_ALLOW_CREDENTIALS = True
    CORS_ALLOW_METHODS = ["GET", "POST", "PUT", "DELETE", "OPTIONS"]
    CORS_MAX_AGE = 86400  # 24 hours

    # Rate Limiting
    RATE_LIMIT_ENABLED = True
    RATE_LIMIT_DEFAULT = "100/minute"
    RATE_LIMIT_STORAGE_URL = os.getenv("REDIS_URL")

    # Session
    SESSION_COOKIE_SECURE = True  # HTTPS only
    SESSION_COOKIE_HTTPONLY = True  # No JS access
    SESSION_COOKIE_SAMESITE = "Strict"
    SESSION_LIFETIME_SECONDS = 3600  # 1 hour

    # Monitoring
    SENTRY_DSN = os.getenv("SENTRY_DSN")
    SENTRY_ENVIRONMENT = "production"
    SENTRY_TRACES_SAMPLE_RATE = 0.1  # 10% of requests

    # Logging
    LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
    LOG_FORMAT = "json"  # Structured logging
    LOG_FILE = "/var/log/taskflow/app.log"

    # Performance
    CACHE_TTL = 300  # 5 minutes
    PAGINATION_MAX_SIZE = 100
    REQUEST_TIMEOUT = 30  # seconds
    WORKER_CONNECTIONS = 1000

    # File Upload
    MAX_UPLOAD_SIZE = 10 * 1024 * 1024  # 10MB
    ALLOWED_UPLOAD_EXTENSIONS = {".jpg", ".jpeg", ".png", ".pdf", ".csv"}

    # Email
    SMTP_HOST = os.getenv("SMTP_HOST")
    SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
    SMTP_USER = os.getenv("SMTP_USER")
    SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
    SMTP_USE_TLS = True
    EMAIL_FROM = os.getenv("EMAIL_FROM", "noreply@taskflow.com")

    # AWS (for S3, etc.)
    AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
    AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
    AWS_REGION = os.getenv("AWS_REGION", "us-east-1")
    AWS_S3_BUCKET = os.getenv("AWS_S3_BUCKET", "taskflow-uploads")


# Create singleton
config = ProductionConfig()
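
The configuration above rejects a SECRET_KEY shorter than 32 characters but does not show how to produce one. The following is a minimal sketch, assuming the standard-library `secrets` module is an acceptable source of randomness for your setup. The helper names `generate_secret_key` and `is_strong_secret` are illustrative and not part of the tutorial code.

```python
import secrets
from typing import Optional


def generate_secret_key(num_bytes: int = 48) -> str:
    """Return a URL-safe random string (roughly 1.3 characters per byte)."""
    return secrets.token_urlsafe(num_bytes)


def is_strong_secret(value: Optional[str], min_length: int = 32) -> bool:
    """Mirror the config checks: set, not the development default, long enough."""
    if not value or value == "development-secret":
        return False
    return len(value) >= min_length
```

Calling `print(generate_secret_key())` once and storing the result in your platform's secret store (rather than in the repository) keeps the key out of version control.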

Create scripts/migrate.py for production migrations:

#!/usr/bin/env python
"""
Run database migrations in production.

This script ensures migrations are applied safely with
proper error handling and rollback capabilities.
"""
import logging
import os
import subprocess
import sys
from datetime import datetime
from urllib.parse import urlparse

from alembic import command
from alembic.config import Config
from sqlalchemy import create_engine, text

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def check_database_connection(database_url: str) -> bool:
    """Verify database is accessible."""
    try:
        # Use the synchronous driver for this one-off check
        engine = create_engine(database_url.replace("+asyncpg", ""))
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        logger.info("Database connection successful")
        return True
    except Exception as e:
        logger.error(f"Database connection failed: {e}")
        return False


def backup_database(database_url: str, backup_path: str) -> bool:
    """Create database backup before migration."""
    try:
        # Parse database URL
        parsed = urlparse(database_url)
        # Run pg_dump
        cmd = [
            "pg_dump",
            "-h", parsed.hostname,
            "-p", str(parsed.port or 5432),
            "-U", parsed.username,
            "-d", parsed.path[1:],  # Remove leading /
            "-f", backup_path,
            "--verbose"
        ]
        # Set password via environment (skipped if the URL has none)
        env = os.environ.copy()
        if parsed.password:
            env["PGPASSWORD"] = parsed.password
        result = subprocess.run(cmd, env=env, capture_output=True, text=True)
        if result.returncode == 0:
            logger.info(f"Database backed up to {backup_path}")
            return True
        else:
            logger.error(f"Backup failed: {result.stderr}")
            return False
    except Exception as e:
        logger.error(f"Backup error: {e}")
        return False


def run_migrations(database_url: str, backup: bool = True) -> bool:
    """
    Run Alembic migrations with safety checks.

    Args:
        database_url: Database connection string
        backup: Whether to backup before migrating

    Returns:
        True if successful
    """
    # Check connection
    if not check_database_connection(database_url):
        logger.error("Cannot connect to database")
        return False

    # Backup if requested
    if backup:
        backup_path = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.sql"
        if not backup_database(database_url, backup_path):
            logger.warning("Backup failed, continue anyway? (y/n)")
            if input().lower() != 'y':
                return False

    try:
        # Configure Alembic
        alembic_cfg = Config("alembic.ini")
        alembic_cfg.set_main_option("sqlalchemy.url", database_url)

        # Show current revision
        logger.info("Current database revision:")
        command.current(alembic_cfg)

        # Show only the revisions between the current one and head
        logger.info("Pending migrations:")
        command.history(alembic_cfg, rev_range="current:head")

        # Run migrations
        logger.info("Running migrations...")
        command.upgrade(alembic_cfg, "head")
        logger.info("Migrations completed successfully")
        return True
    except Exception as e:
        logger.error(f"Migration failed: {e}")
        logger.error("Run rollback if needed: alembic downgrade -1")
        return False


if __name__ == "__main__":
    from dotenv import load_dotenv

    load_dotenv()
    database_url = os.getenv("DATABASE_URL")
    if not database_url:
        logger.error("DATABASE_URL not set")
        sys.exit(1)

    # Run with backup by default
    success = run_migrations(database_url, backup=True)
    sys.exit(0 if success else 1)
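
One detail the backup step depends on is how DATABASE_URL is split into pg_dump arguments. The sketch below isolates that step in a pure function so it can be checked without a database. It assumes credentials in the URL may be percent-encoded (for example `%40` for `@`), which `urlparse` does not decode on its own. The function name `pg_dump_args` and the example host are hypothetical.

```python
from urllib.parse import unquote, urlparse


def pg_dump_args(database_url: str, backup_path: str):
    """Return (argument list, extra environment) for a pg_dump call."""
    parsed = urlparse(database_url)  # a "+asyncpg" driver suffix stays in the scheme
    args = [
        "pg_dump",
        "-h", parsed.hostname or "localhost",
        "-p", str(parsed.port or 5432),
        "-U", unquote(parsed.username or ""),
        "-d", parsed.path.lstrip("/"),
        "-f", backup_path,
    ]
    # Percent-encoded characters in the password are decoded before use
    env = {"PGPASSWORD": unquote(parsed.password)} if parsed.password else {}
    return args, env


args, env = pg_dump_args(
    "postgresql+asyncpg://taskflow:p%40ss%2Fword@db.internal:6543/taskflow",
    "backup.sql",
)
print(args)
print(env)
```

Passing the password through the environment, as the script does, keeps it out of the process list.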

Create nginx.conf:

worker_processes auto;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 4096;
use epoll;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
# Logging
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for" '
'rt=$request_time uct="$upstream_connect_time" '
'uht="$upstream_header_time" urt="$upstream_response_time"';
access_log /var/log/nginx/access.log main;
# Performance
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
types_hash_max_size 2048;
# Gzip
gzip on;
gzip_vary on;
gzip_min_length 1024;
gzip_types text/plain text/css text/xml text/javascript
application/json application/javascript application/xml+rss
application/rss+xml application/atom+xml image/svg+xml
text/x-js text/x-cross-domain-policy application/x-font-ttf
application/x-font-opentype application/vnd.ms-fontobject
image/x-icon;
# Rate limiting
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
limit_req_zone $binary_remote_addr zone=auth:10m rate=5r/m;
# Upstream
upstream app {
least_conn;
server app:8000 max_fails=3 fail_timeout=30s;
# Add more servers for load balancing
# server app2:8000 max_fails=3 fail_timeout=30s;
keepalive 32;
}
# Redirect HTTP to HTTPS
server {
listen 80;
server_name taskflow.com www.taskflow.com;
return 301 https://$server_name$request_uri;
}
# HTTPS server
server {
listen 443 ssl http2;
server_name taskflow.com;
# SSL
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
# Security headers
add_header Strict-Transport-Security "max-age=63072000; includeSubDomains; preload" always;
add_header X-Frame-Options DENY always;
add_header X-Content-Type-Options nosniff always;
add_header X-XSS-Protection "1; mode=block" always;
# Max body size for uploads
client_max_body_size 10M;
# Static files
location /static/ {
alias /usr/share/nginx/html/static/;
expires 30d;
add_header Cache-Control "public, immutable";
}
# Health check
location /health {
proxy_pass http://app;
access_log off;
}
# Authentication endpoints (strict rate limiting)
location ~ ^/(auth|login|register) {
limit_req zone=auth burst=2 nodelay;
proxy_pass http://app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# API endpoints
location / {
limit_req zone=api burst=20 nodelay;
proxy_pass http://app;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# Timeouts
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
# Buffering
proxy_buffering off;
proxy_request_buffering off;
}
# WebSocket support
location /ws/ {
proxy_pass http://app;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
}
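
To get a feel for what `rate=10r/s` with `burst=20 nodelay` permits, here is a simplified Python model of the accounting that `limit_req` performs for a single client key. It is a sketch under stated assumptions: real nginx tracks the excess in thousandths of a request inside a shared memory zone, and the class name `LimitReqModel` is made up for this illustration.

```python
class LimitReqModel:
    """Simplified model of one nginx limit_req key with `nodelay`."""

    def __init__(self, rate_per_sec: float, burst: int):
        self.rate = rate_per_sec
        self.burst = burst
        self.excess = 0.0   # requests currently counted against the burst
        self.last = None    # time of the last accepted request

    def allow(self, now: float) -> bool:
        if self.last is None:
            # First request for this key starts with no excess
            self.last = now
            return True
        # Drain at the configured rate, then count this request
        excess = max(self.excess - self.rate * (now - self.last) + 1.0, 0.0)
        if excess > self.burst:
            return False    # nginx rejects (503 by default)
        self.excess = excess
        self.last = now
        return True


api = LimitReqModel(rate_per_sec=10, burst=20)
accepted_at_once = sum(api.allow(0.0) for _ in range(30))
accepted_after_1s = sum(api.allow(1.0) for _ in range(30))
print(accepted_at_once, accepted_after_1s)
```

In this model a client can send one request plus the burst immediately, after which capacity returns at the configured rate. The `auth` zone (`5r/m`, `burst=2`) is far stricter by the same logic.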

Create scripts/deploy.sh:

#!/bin/bash
set -e
echo "🚀 Starting deployment..."
# Configuration
ENVIRONMENT=${1:-staging}
VERSION=$(git rev-parse --short HEAD)
TIMESTAMP=$(date +%Y%m%d-%H%M%S)
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
echo -e "${YELLOW}Deploying version ${VERSION} to ${ENVIRONMENT}${NC}"
# Pre-deployment checks
echo "📋 Running pre-deployment checks..."
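# Check that all tests pass. With `set -e`, a failing command on its own line
# would exit the script before a `$?` check could run, so test it in the `if`.
echo "Running tests..."
if ! pytest tests/ --quiet; then
    echo -e "${RED}Tests failed! Aborting deployment.${NC}"
    exit 1
fi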
# Check for uncommitted changes
if [ -n "$(git status --porcelain)" ]; then
echo -e "${YELLOW}Warning: Uncommitted changes detected${NC}"
read -p "Continue anyway? (y/n) " -n 1 -r
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
exit 1
fi
fi
# Build Docker image
echo "🐳 Building Docker image..."
docker build -t taskflow:${VERSION} .
# Tag for registry
docker tag taskflow:${VERSION} registry.example.com/taskflow:${VERSION}
docker tag taskflow:${VERSION} registry.example.com/taskflow:latest
# Push to registry
echo "📤 Pushing to registry..."
docker push registry.example.com/taskflow:${VERSION}
docker push registry.example.com/taskflow:latest
# Run database migrations
echo "🗄️ Running database migrations..."
python scripts/migrate.py
# Deploy based on environment
case $ENVIRONMENT in
staging)
echo "🎬 Deploying to staging..."
kubectl set image deployment/taskflow-api \
taskflow=registry.example.com/taskflow:${VERSION} \
-n staging
;;
production)
echo "🚀 Deploying to production..."
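# Create backup (no -it: a TTY can add carriage returns to the redirected
# dump, and it fails in CI where stdin is not a terminal)
echo "Creating backup..."
kubectl exec postgres-0 -n production -- \
pg_dump -U taskflow taskflow > backup-${TIMESTAMP}.sql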
# Rolling deployment
kubectl set image deployment/taskflow-api \
taskflow=registry.example.com/taskflow:${VERSION} \
-n production \
--record
# Wait for rollout
kubectl rollout status deployment/taskflow-api -n production
;;
*)
echo -e "${RED}Unknown environment: ${ENVIRONMENT}${NC}"
exit 1
;;
esac
# Post-deployment checks
echo "Running post-deployment checks..."
# Health check
HEALTH_URL="https://${ENVIRONMENT}.taskflow.com/health"
# `|| true` keeps `set -e` from aborting before the rollback branch can run
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" "$HEALTH_URL" || true)
if [ "$HTTP_CODE" = "200" ]; then
echo -e "${GREEN}Health check passed!${NC}"
else
echo -e "${RED}Health check failed! HTTP ${HTTP_CODE}${NC}"
echo "Rolling back..."
kubectl rollout undo deployment/taskflow-api -n ${ENVIRONMENT}
exit 1
fi
# Smoke tests
echo "Running smoke tests..."
python scripts/smoke_tests.py --env ${ENVIRONMENT}
# Send deployment notification
curl -X POST https://hooks.slack.com/services/YOUR/WEBHOOK/URL \
-H 'Content-Type: application/json' \
-d "{
\"text\": \"Deployment successful!\",
\"attachments\": [{
\"color\": \"good\",
\"fields\": [
{\"title\": \"Environment\", \"value\": \"${ENVIRONMENT}\", \"short\": true},
{\"title\": \"Version\", \"value\": \"${VERSION}\", \"short\": true},
{\"title\": \"Deployed by\", \"value\": \"$(git config user.name)\", \"short\": true},
{\"title\": \"Time\", \"value\": \"${TIMESTAMP}\", \"short\": true}
]
}]
}"
echo -e "${GREEN}Deployment complete!${NC}"
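
The deploy script calls `scripts/smoke_tests.py --env ...`, which this part does not show. The following is one possible minimal version, assuming the same `https://<environment>.taskflow.com` URL scheme that the script uses for its health check and assuming that a 200 from `/health` is enough to call the deployment alive. The function names and retry defaults are illustrative.

```python
import argparse
import time
import urllib.request
from typing import Callable


def base_url(env: str) -> str:
    """Mirror the URL scheme deploy.sh uses for its health check."""
    return f"https://{env}.taskflow.com"


def is_healthy(url: str, timeout: float = 5.0) -> bool:
    """Return True if a GET request to url answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except OSError:  # covers URLError, HTTPError, and timeouts
        return False


def wait_until(check: Callable[[], bool], attempts: int = 5, delay: float = 2.0) -> bool:
    """Retry check() up to `attempts` times, sleeping `delay` seconds in between."""
    for attempt in range(attempts):
        if check():
            return True
        if attempt < attempts - 1:
            time.sleep(delay)
    return False


def main(argv=None) -> int:
    """Return 0 when the health endpoint responds, 1 otherwise."""
    parser = argparse.ArgumentParser(description="Post-deployment smoke tests")
    parser.add_argument("--env", required=True, choices=["staging", "production"])
    args = parser.parse_args(argv)
    url = f"{base_url(args.env)}/health"
    ok = wait_until(lambda: is_healthy(url))
    print(f"{'PASS' if ok else 'FAIL'} {url}")
    return 0 if ok else 1
```

To use it as a script, call `sys.exit(main())` under the usual `if __name__ == "__main__":` guard. The retry loop gives a rolling deployment a few seconds to settle before the script reports a failure.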

Create railway.json:

{
"$schema": "https://railway.app/railway.schema.json",
"build": {
"builder": "DOCKERFILE"
},
"deploy": {
"startCommand": "python scripts/migrate.py && gunicorn app.main:app --worker-class uvicorn.workers.UvicornWorker",
"healthcheckPath": "/health",
"restartPolicyType": "ON_FAILURE",
"restartPolicyMaxRetries": 3
},
"environments": {
"production": {
"ENVIRONMENT": "production",
"LOG_LEVEL": "INFO"
}
}
}

Deploy with Railway CLI:

# Install Railway CLI
npm install -g @railway/cli
# Login
railway login
# Initialize project
railway init
# Add PostgreSQL and Redis
railway add postgresql
railway add redis
# Deploy
railway up

Create fly.toml:

app = "taskflow-api"
[build]
dockerfile = "Dockerfile"
[env]
PORT = "8000"
ENVIRONMENT = "production"
[experimental]
auto_rollback = true
[[services]]
internal_port = 8000
protocol = "tcp"
script_checks = []
[services.concurrency]
hard_limit = 100
soft_limit = 80
type = "connections"
[[services.ports]]
force_https = true
handlers = ["http"]
port = 80
[[services.ports]]
handlers = ["tls", "http"]
port = 443
[[services.tcp_checks]]
grace_period = "5s"
interval = "15s"
restart_limit = 0
timeout = "2s"
[[services.http_checks]]
interval = 10000
grace_period = "5s"
method = "get"
path = "/health"
protocol = "http"
restart_limit = 0
timeout = 2000
tls_skip_verify = false
[mounts]
destination = "/app/data"
source = "taskflow_data"

Deploy:

# Install Fly CLI
curl -L https://fly.io/install.sh | sh
# Login
fly auth login
# Create app
fly apps create taskflow-api
# Add PostgreSQL
fly postgres create
fly postgres attach
# Add Redis
fly redis create
# Deploy
fly deploy
# Scale
fly scale count 2

Add metrics to your app:

# app/monitoring/metrics.py
import time

from prometheus_client import (
    CONTENT_TYPE_LATEST,
    Counter,
    Gauge,
    Histogram,
    generate_latest,
)

# `app` is the application instance (app.main:app in the Dockerfile) and
# `Response` is your framework's response class; import both from wherever
# the earlier parts of this tutorial define them.

# Metrics
request_count = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)
request_duration = Histogram(
    'http_request_duration_seconds',
    'HTTP request duration',
    ['method', 'endpoint']
)
active_users = Gauge(
    'active_users_total',
    'Number of active users'
)
task_queue_size = Gauge(
    'task_queue_size',
    'Number of tasks in queue',
    ['queue']
)


@app.middleware("http")
async def metrics_middleware(request, call_next):
    """Track metrics for each request."""
    start_time = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start_time
    request_count.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    request_duration.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(duration)
    return response


@app.get("/metrics")
async def metrics():
    """Prometheus metrics endpoint."""
    return Response(
        content=generate_latest(),
        media_type=CONTENT_TYPE_LATEST
    )
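
Using the raw request path as the `endpoint` label creates a new time series for every distinct URL, for example one per task ID. A common mitigation is to normalize the path before using it as a label. The sketch below assumes that IDs appear as purely numeric or UUID path segments; `normalize_path` is a hypothetical helper, not part of the tutorial code, and a framework-provided route template would be preferable if one is available.

```python
import re

_UUID = re.compile(r"^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$")


def normalize_path(path: str) -> str:
    """Replace numeric and UUID path segments with fixed placeholders."""
    segments = []
    for segment in path.split("/"):
        if segment.isdigit():
            segments.append("{id}")
        elif _UUID.match(segment):
            segments.append("{uuid}")
        else:
            segments.append(segment)
    return "/".join(segments)


print(normalize_path("/tasks/42"))
print(normalize_path("/users/123e4567-e89b-12d3-a456-426614174000/tasks"))
```

In the middleware, the label would then be `endpoint=normalize_path(request.url.path)`.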

Configure production logging:

# app/logging_config.py
import contextvars
import logging

from pythonjsonlogger import jsonlogger

# Request ID for the current request; set it from your request middleware
request_id_var = contextvars.ContextVar('request_id', default=None)


class RequestIdFilter(logging.Filter):
    """Attach the current request ID to every log record."""

    def filter(self, record):
        record.request_id = request_id_var.get()
        return True


def setup_logging(environment: str):
    """Configure structured logging for production."""
    # JSON formatter
    formatter = jsonlogger.JsonFormatter(
        fmt='%(asctime)s %(levelname)s %(name)s %(message)s',
        rename_fields={
            'asctime': 'timestamp',
            'levelname': 'level',
            'name': 'logger'
        }
    )

    # The filter goes on each handler rather than on the root logger:
    # logger-level filters are not applied to records that propagate
    # up from child loggers.
    request_id_filter = RequestIdFilter()

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    console_handler.addFilter(request_id_filter)

    # File handler for production
    if environment == "production":
        file_handler = logging.FileHandler('/var/log/taskflow/app.log')
        file_handler.setFormatter(formatter)
        file_handler.addFilter(request_id_filter)
        logging.root.addHandler(file_handler)

    # Configure root logger
    logging.root.setLevel(logging.INFO)
    logging.root.addHandler(console_handler)
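
The request ID only shows up in log lines if something sets `request_id_var` for each request. The sketch below demonstrates that round trip using only the standard library, so `MiniJsonFormatter` stands in for the JSON formatter used above and `demo` is a hypothetical driver. In the application, the `set`/`reset` pair would live in request middleware.

```python
import contextvars
import io
import json
import logging

request_id_var = contextvars.ContextVar("request_id", default=None)


class RequestIdFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True


class MiniJsonFormatter(logging.Formatter):
    """Stand-in JSON formatter so this sketch needs no third-party package."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", None),
        })


def demo() -> dict:
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(MiniJsonFormatter())
    handler.addFilter(RequestIdFilter())  # on the handler, so all loggers are covered

    logger = logging.getLogger("taskflow.demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    logger.addHandler(handler)

    token = request_id_var.set("req-123")  # normally done in request middleware
    try:
        logger.info("task created")
    finally:
        request_id_var.reset(token)        # restore the previous value
        logger.removeHandler(handler)
    return json.loads(stream.getvalue())


print(demo())
```

Because `ContextVar` values are scoped to the running task, concurrent requests handled on the same event loop do not see each other's IDs.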

Configure Sentry:

# app/sentry_config.py
import sentry_sdk
from sentry_sdk.integrations.asgi import SentryAsgiMiddleware
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration


def init_sentry(app, dsn: str, environment: str):
    """Initialize Sentry error tracking."""
    sentry_sdk.init(
        dsn=dsn,
        environment=environment,
        integrations=[
            SqlalchemyIntegration(),
        ],
        traces_sample_rate=0.1,  # 10% of transactions
        profiles_sample_rate=0.1,  # 10% profiling
        attach_stacktrace=True,
        send_default_pii=False,  # Don't send personally identifiable info
        # Filter sensitive data
        before_send=filter_sensitive_data
    )
    # Add middleware
    app.add_middleware(SentryAsgiMiddleware)


def filter_sensitive_data(event, hint):
    """Remove sensitive data before sending to Sentry."""
    # Remove passwords, tokens, etc. from the request body.
    # The body is not always a dict (it can be a raw string), so check first.
    data = event.get('request', {}).get('data')
    if isinstance(data, dict):
        for key in ['password', 'token', 'secret', 'api_key']:
            if key in data:
                data[key] = '[FILTERED]'
    return event
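
The filter above only inspects top-level keys of the request body. If your payloads nest credentials (for example `{"user": {"password": ...}}`), a recursive variant may be a better fit. This is a minimal sketch assuming payloads are built from dicts and lists; `scrub` and `SENSITIVE_KEYS` are illustrative names.

```python
from typing import Any

SENSITIVE_KEYS = {"password", "token", "secret", "api_key"}


def scrub(value: Any) -> Any:
    """Return a copy of value with sensitive keys masked at any nesting depth."""
    if isinstance(value, dict):
        return {
            key: "[FILTERED]" if str(key).lower() in SENSITIVE_KEYS else scrub(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [scrub(item) for item in value]
    return value


payload = {
    "user": {"email": "a@example.com", "Password": "hunter2"},
    "items": [{"token": "abc"}, 1],
}
print(scrub(payload))
```

Inside a `before_send` hook, the result could replace the original body, for instance `event['request']['data'] = scrub(data)`.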

Cache expensive operations in Redis:

# app/cache.py
import hashlib
import pickle
from functools import wraps

import redis.asyncio as redis

# `settings` is the application settings object defined in earlier parts.
# The asyncio client keeps cache lookups from blocking the event loop.
redis_client = redis.from_url(settings.REDIS_URL)


def cache_key(*args, **kwargs):
    """Generate cache key from arguments."""
    key_data = f"{args}:{kwargs}".encode()
    return hashlib.md5(key_data).hexdigest()


def cached(ttl=300):
    """Cache decorator for expensive operations."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Generate cache key
            key = f"cache:{func.__name__}:{cache_key(*args, **kwargs)}"
            # Try to get from cache
            cached_value = await redis_client.get(key)
            if cached_value is not None:
                # Only unpickle data from a Redis instance you trust
                return pickle.loads(cached_value)
            # Execute function
            result = await func(*args, **kwargs)
            # Store in cache
            await redis_client.setex(
                key,
                ttl,
                pickle.dumps(result)
            )
            return result
        return wrapper
    return decorator


# Usage
@cached(ttl=600)
async def get_user_stats(user_id: int):
    # Expensive calculation (calculate_stats is a placeholder)
    return calculate_stats(user_id)
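
The `cache_key` helper formats `kwargs` as a string, so the same call written with keyword arguments in a different order produces a different key and misses the cache. One way to avoid that is to serialize the arguments with sorted keys. A minimal sketch, assuming the arguments are JSON-serializable or have a meaningful `str()`; `stable_cache_key` is a hypothetical replacement, not the tutorial's function.

```python
import hashlib
import json


def stable_cache_key(func_name: str, *args, **kwargs) -> str:
    """Build a cache key that does not depend on keyword-argument order."""
    # sort_keys makes the encoding order-independent; default=str is a
    # fallback for values json cannot encode directly
    payload = json.dumps({"args": args, "kwargs": kwargs}, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode()).hexdigest()
    return f"cache:{func_name}:{digest}"


same = stable_cache_key("get_user_stats", 1, a=1, b=2) == stable_cache_key("get_user_stats", 1, b=2, a=1)
print(same)
```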

Optimize queries and connections:

# app/database_optimized.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.pool import AsyncAdaptedQueuePool, NullPool


def create_optimized_engine(database_url: str, environment: str):
    """Create optimized database engine for production."""
    if environment == "production":
        return create_async_engine(
            database_url,
            # Connection pooling. Async engines need the asyncio-adapted
            # queue pool; the plain QueuePool is rejected by
            # create_async_engine.
            poolclass=AsyncAdaptedQueuePool,
            pool_size=20,
            max_overflow=40,
            pool_timeout=30,
            pool_recycle=1800,  # Recycle connections after 30 minutes
            # Performance
            echo=False,
            connect_args={
                "server_settings": {
                    "application_name": "taskflow_api",
                    "jit": "off"
                },
                "command_timeout": 60,
                "prepared_statement_cache_size": 0,  # Disable for PgBouncer
            }
        )
    else:
        # Development settings
        return create_async_engine(
            database_url,
            echo=True,
            poolclass=NullPool  # No pooling in dev
        )
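
Pool settings apply per process, so it is worth checking the worst case against the database's connection limit. The arithmetic below uses the values from this part (4 Gunicorn workers from the Dockerfile, `pool_size=20`, `max_overflow=40`). It assumes each worker process creates exactly one engine; Celery workers that open their own engines would add to the total.

```python
def max_db_connections(processes: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on open connections when each process owns one pool."""
    return processes * (pool_size + max_overflow)


api_connections = max_db_connections(processes=4, pool_size=20, max_overflow=40)
print(api_connections)  # 240
```

PostgreSQL's default `max_connections` is 100, so with these numbers you would need to raise the server limit, lower the pool settings, or put a pooler such as PgBouncer in front of the database.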

Implement these security measures:

# app/security.py
from zenith import Request, HTTPException
from zenith.security import HTTPBearer
import hmac
import hashlib

# `settings` is the application settings object defined in earlier parts


class SecurityMiddleware:
    """Production security middleware."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, request: Request, call_next):
        # Check for common attacks
        if self._is_suspicious_request(request):
            raise HTTPException(status_code=403, detail="Forbidden")

        # Add security headers
        response = await call_next(request)
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        # HSTS for HTTPS
        if request.url.scheme == "https":
            response.headers["Strict-Transport-Security"] = (
                "max-age=63072000; includeSubDomains; preload"
            )
        return response

    def _is_suspicious_request(self, request: Request) -> bool:
        """Detect suspicious requests."""
        # Naive substring checks for SQL injection, XSS, and path traversal.
        # Treat this as defense in depth, not a substitute for parameterized
        # queries and output escaping.
        suspicious_patterns = [
            "union select",
            "drop table",
            "<script>",
            "javascript:",
            "../",
            "..\\",
        ]
        url = str(request.url).lower()
        for pattern in suspicious_patterns:
            if pattern in url:
                return True

        # Check for excessive parameters
        if len(request.query_params) > 20:
            return True
        return False


# API key validation for admin endpoints
def verify_api_key(api_key: str) -> bool:
    """Verify API key using constant-time comparison."""
    expected = settings.ADMIN_API_KEY.encode()
    provided = api_key.encode()
    return hmac.compare_digest(expected, provided)
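
A substring check against the raw URL is easy to sidestep with percent-encoding: `%2e%2e%2f` is `../` once decoded. The sketch below shows the difference, assuming one round of decoding with the standard library's `unquote` before matching; `looks_suspicious` is a hypothetical standalone version of the middleware check.

```python
from urllib.parse import unquote

SUSPICIOUS_PATTERNS = ["union select", "drop table", "<script>", "javascript:", "../", "..\\"]


def looks_suspicious(raw_url: str) -> bool:
    """Match the percent-decoded, lowercased URL against the pattern list."""
    decoded = unquote(raw_url).lower()
    return any(pattern in decoded for pattern in SUSPICIOUS_PATTERNS)


encoded = "/files?path=%2e%2e%2fetc/passwd"
raw_match = any(pattern in encoded.lower() for pattern in SUSPICIOUS_PATTERNS)
print(raw_match, looks_suspicious(encoded))
```

Even with decoding, pattern matching of this kind catches only unsophisticated probes, so it complements input validation rather than replacing it.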

Solution: Check health endpoint, ensure database migrations ran, verify environment variables

Solution: Limit worker connections, implement connection pooling, use pagination

Solution: Add caching, optimize database queries, use CDN for static files

In this final part, you’ve implemented:

  • Docker containerization with multi-stage builds
  • Production configuration management
  • Database migrations and backups
  • CI/CD pipeline with GitHub Actions
  • Deployment to multiple platforms
  • Nginx reverse proxy configuration
  • Monitoring with Prometheus and Grafana
  • Structured logging and error tracking
  • Security hardening and SSL
  • Performance optimization strategies

Congratulations! You’ve built and deployed a production-ready API. You now have:

  • A complete task management system
  • Professional development practices
  • Production deployment knowledge
  • Monitoring and observability
  • Security best practices


Thank you for following this tutorial! If you built something cool, share it in our GitHub Discussions.

Questions? Check our FAQ or ask in Discord.