
Performance Guide

Master performance optimization in Zenith with profiling, caching, database optimization, and scaling strategies.

Profiling

Identify bottlenecks and optimize critical paths

Caching

Multi-layer caching for maximum performance

Database

Query optimization and connection pooling

Scaling

Horizontal scaling and load balancing

# Performance baseline (measured on M3 Max, Python 3.13)
# - Simple endpoints: 13,074 req/s (+69% from routing optimization)
# - JSON endpoints: 12,274 req/s (+24% from routing optimization)
# - With full middleware: 8,781 req/s (72% retention)
# - Memory usage: <100MB for 1000 concurrent requests
# - Startup time: <100ms
from zenith import Zenith
# Automatic performance optimizations in v0.0.1+
app = Zenith() # Includes:
# - Connection pooling
# - Response caching headers
# - Efficient JSON serialization
# - Async request handling
# - Optimized middleware ordering

from zenith.performance import track_performance, profile, measure

@app.get("/api/users")
@track_performance(threshold_ms=100)  # Log if slower than 100ms
async def get_users():
    users = await User.all()
    return {"users": users}

@app.get("/api/analytics")
@profile  # Detailed profiling output
async def get_analytics():
    # Complex computation
    data = await calculate_analytics()
    return data

@app.get("/api/report")
@measure("report_generation")  # Custom metric name
async def generate_report():
    report = await create_report()
    return report

from zenith.performance import performance_timer

@app.get("/api/complex")
async def complex_operation():
    with performance_timer("database_query"):
        users = await User.where(active=True).all()
    with performance_timer("processing"):
        results = process_users(users)
    with performance_timer("serialization"):
        return {"results": results}

# Logs:
# Performance: database_query took 45.2ms
# Performance: processing took 120.5ms
# Performance: serialization took 15.3ms
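
If you want the same timing pattern outside Zenith, a context-manager timer is a few lines of standard library. A minimal sketch (illustrative only, not Zenith's actual implementation):

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger("performance")

@contextmanager
def simple_timer(label: str):
    # Times the enclosed block and logs it, mirroring the output above.
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.info("Performance: %s took %.1fms", label, elapsed_ms)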

from zenith.middleware.performance import PerformanceMiddleware

app = Zenith(
    middleware=[
        PerformanceMiddleware(
            slow_request_threshold=500,  # Log requests > 500ms
            track_memory=True,           # Track memory usage
            track_cpu=True,              # Track CPU usage
            include_routes=["/api/*"],   # Only track API routes
            exclude_routes=["/health"]   # Skip health checks
        )
    ]
)

# Automatic metrics at /metrics endpoint
# - request_duration_seconds
# - request_size_bytes
# - response_size_bytes
# - memory_usage_bytes
# - cpu_usage_percent

from zenith.monitoring import metrics

# Counter for tracking events
user_signups = metrics.counter(
    "user_signups_total",
    "Total number of user signups"
)

# Histogram for tracking distributions
request_latency = metrics.histogram(
    "request_latency_seconds",
    "Request latency distribution",
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0]
)

# Gauge for current values
active_connections = metrics.gauge(
    "active_connections",
    "Number of active WebSocket connections"
)

# Summary for percentiles
processing_time = metrics.summary(
    "processing_time_seconds",
    "Processing time summary"
)

@app.post("/api/signup")
async def signup(user_data: UserCreate):
    with request_latency.time():
        user = await create_user(user_data)
    user_signups.inc()
    return user

from zenith.caching import cached, cache_response
from datetime import timedelta

# Function-level caching
@cached(ttl=300)  # Cache for 5 minutes
async def get_expensive_data():
    # Complex computation
    return await compute_data()
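
Under the hood, a TTL cache decorator amounts to keying on the call arguments and expiring stored results. A minimal sketch - illustrative only, not Zenith's implementation - assuming hashable positional arguments:

import time
from functools import wraps

def simple_cached(ttl: int):
    def decorator(func):
        store = {}  # key -> (expires_at, value)

        @wraps(func)
        async def wrapper(*args):
            key = (func.__qualname__, args)
            hit = store.get(key)
            if hit and hit[0] > time.monotonic():
                return hit[1]
            value = await func(*args)
            store[key] = (time.monotonic() + ttl, value)
            return value
        return wrapper
    return decorator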

# Route-level caching
@app.get("/api/products")
@cache_response(
    ttl=timedelta(minutes=15),
    vary_by=["category", "page"],  # Cache key includes these params
    condition=lambda resp: resp.status_code == 200
)
async def get_products(category: str = None, page: int = 1):
    products = await fetch_products(category, page)
    return {"products": products}

from zenith.caching import CacheManager, MemoryCache, RedisCache

# Configure cache layers
cache = CacheManager(
    layers=[
        MemoryCache(max_size=1000, ttl=60),                  # L1: fast in-process cache
        RedisCache(redis_url="redis://localhost", ttl=3600)  # L2: shared Redis cache
    ]
)

@app.get("/api/user/{user_id}")
async def get_user(user_id: int):
    # Try the cache first
    cache_key = f"user:{user_id}"
    user = await cache.get(cache_key)
    if user is None:
        # Cache miss - fetch from the database
        user = await User.find(user_id)
        if user:
            await cache.set(cache_key, user.model_dump())
    return user
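
The miss-then-fill pattern above gets repetitive quickly; a small get-or-set helper keeps handlers terse. A sketch against the same CacheManager API shown above (store serializable values, e.g. model_dump() output, if a Redis layer is involved):

async def get_or_set(cache, key, loader):
    # Cache-aside in one place: return the cached value, or load and store it.
    value = await cache.get(key)
    if value is None:
        value = await loader()
        if value is not None:
            await cache.set(key, value)
    return value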

from zenith.caching import cache_invalidate, CacheTags

# Tag-based invalidation
@app.get("/api/posts/{post_id}")
@cache_response(ttl=3600, tags=["posts"])
async def get_post(post_id: int):
    return await Post.find(post_id)

@app.put("/api/posts/{post_id}")
async def update_post(post_id: int, data: PostUpdate):
    post = await Post.find(post_id)
    await post.update(**data.dict())
    # Invalidate all cached posts
    await cache_invalidate(tags=["posts"])
    return post

# Pattern-based invalidation
@app.delete("/api/users/{user_id}")
async def delete_user(user_id: int):
    await User.delete(user_id)
    # Invalidate specific keys and patterns
    await cache_invalidate(patterns=[
        f"user:{user_id}",
        f"user:{user_id}:*",
        "users:list"
    ])

from zenith.caching import conditional_cache

@app.get("/api/search")
@conditional_cache(
    # Cache only sufficiently specific queries
    condition=lambda req: len(req.query_params.get("q", "")) > 3,
    # Longer TTL for popular queries
    ttl_func=lambda req: 3600 if is_popular_query(req) else 300,
    # Include the user ID in the cache key for personalized results
    key_func=lambda req: f"search:{req.user.id}:{req.query_params}"
)
async def search(q: str, user: User = Auth):
    results = await perform_search(q, user)
    return {"results": results}

from zenith.db import create_async_engine
from sqlalchemy.pool import NullPool

# Optimized connection pooling
engine = create_async_engine(
    "postgresql+asyncpg://localhost/db",
    pool_size=20,        # Persistent connections in the pool
    max_overflow=10,     # Extra connections opened under load
    pool_timeout=30,     # Seconds to wait for a free connection
    pool_recycle=3600,   # Recycle connections after 1 hour
    pool_pre_ping=True,  # Test connections before use
    echo_pool=True       # Log pool checkouts/checkins
)
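
Note that the pool is per worker process: with, say, 4 Gunicorn workers, the settings above allow up to 4 × (20 + 10) = 120 simultaneous connections, so size pool_size and max_overflow against your database's max_connections (PostgreSQL defaults to 100).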

# For serverless/edge functions
serverless_engine = create_async_engine(
    "postgresql+asyncpg://localhost/db",
    poolclass=NullPool,  # No connection pooling
    connect_args={
        "server_settings": {
            "jit": "off"  # Disable JIT for faster cold starts
        }
    }
)

from datetime import datetime

from zenith.db import select, join, exists, func
from sqlalchemy import Index

class Post(ZenithModel, table=True):
    __tablename__ = "posts"

    id: int = Field(primary_key=True)
    user_id: int = Field(foreign_key="users.id", index=True)
    title: str = Field(index=True)
    content: str
    published: bool = Field(default=False, index=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)

    # Composite indexes for common queries
    __table_args__ = (
        Index("idx_user_published", "user_id", "published"),
        Index("idx_published_created", "published", "created_at"),
    )
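
Each composite index only pays off when queries filter on its leading columns. For reference, these are the query shapes the two indexes above serve (written against the same query API used throughout this guide; whether the planner actually picks an index is up to the database):

# Queries matching the composite indexes above - illustrative only
posts = await Post.where(user_id=42, published=True).all()               # idx_user_published
recent = await Post.where(published=True).order_by("-created_at").all()  # idx_published_created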

# Efficient queries with eager loading
@app.get("/api/posts")
async def get_posts():
    # N+1 query problem - BAD (one extra query per post for the author):
    # posts = await Post.where(published=True).all()
    # for post in posts:
    #     post.author = await User.find(post.user_id)

    # Eager loading - GOOD (authors fetched up front):
    posts = await Post.where(published=True).includes("author").all()

    # Or select only the columns you need:
    # posts = await Post.select("id", "title", "created_at").where(published=True).all()
    return {"posts": posts}

from zenith.db import cached_query

class User(ZenithModel, table=True):
    @classmethod
    @cached_query(ttl=300)  # Cache query results for 5 minutes
    async def get_active_users(cls):
        return await cls.where(active=True).order_by("-last_login").limit(100).all()

    @classmethod
    @cached_query(
        ttl=600,
        key=lambda user_id: f"user_posts:{user_id}"
    )
    async def get_user_posts(cls, user_id: int):
        return await Post.where(user_id=user_id).includes("comments").all()

# Bulk insert - roughly 10x faster than row-by-row inserts
@app.post("/api/users/bulk")
async def bulk_create_users(users_data: list[UserCreate]):
    # Inefficient - one INSERT per user:
    # for data in users_data:
    #     await User.create(**data.dict())

    # Efficient - a single bulk insert:
    users = [User(**data.dict()) for data in users_data]
    await User.bulk_create(users)
    return {"created": len(users)}

# Bulk update
@app.put("/api/posts/publish")
async def bulk_publish(post_ids: list[int]):
    await Post.where(Post.id.in_(post_ids)).update(published=True)
    return {"updated": len(post_ids)}

import asyncio
from zenith.performance import gather_with_concurrency

@app.get("/api/dashboard")
async def get_dashboard(user: User = Auth):
    # Sequential - SLOW:
    # stats = await get_user_stats(user.id)
    # posts = await get_recent_posts(user.id)
    # notifications = await get_notifications(user.id)

    # Concurrent - FAST:
    stats, posts, notifications = await asyncio.gather(
        get_user_stats(user.id),
        get_recent_posts(user.id),
        get_notifications(user.id)
    )
    return {
        "stats": stats,
        "posts": posts,
        "notifications": notifications
    }

# Controlled concurrency for many operations
@app.get("/api/process-all")
async def process_all():
    items = await Item.all()
    # Process with at most 10 concurrent operations
    results = await gather_with_concurrency(
        [process_item(item) for item in items],
        max_concurrent=10
    )
    return {"processed": len(results)}

import logging
from datetime import datetime, timedelta

from zenith.background import BackgroundTasks, priority_task

logger = logging.getLogger(__name__)

@app.post("/api/import")
async def import_data(
    file: UploadFile,
    background: BackgroundTasks
):
    # Return immediately; process in the background
    task_id = background.add_task(
        process_import,
        file_path=file.filename,
        priority="high"  # Process high-priority tasks first
    )
    return {"task_id": task_id, "status": "processing"}

@priority_task(priority="low", max_retries=3)
async def cleanup_old_data():
    """Low-priority cleanup task."""
    deleted = await OldData.delete_where(
        created_at__lt=datetime.utcnow() - timedelta(days=30)
    )
    logger.info(f"Cleaned up {deleted} old records")

import gzip
import json

from zenith.middleware import CompressionMiddleware

app = Zenith(
    middleware=[
        CompressionMiddleware(
            minimum_size=1000,                   # Only compress responses > 1KB
            compression_level=6,                 # 1-9; higher = better compression, more CPU
            exclude_types=["image/", "video/"],  # Skip already-compressed media
            prefer_algorithm="br"                # Prefer Brotli over gzip
        )
    ]
)

# Response-specific compression
@app.get("/api/large-data")
async def get_large_data():
    data = await fetch_large_dataset()
    # Compress the body yourself when bypassing the middleware; sending
    # Content-Encoding: gzip with an uncompressed body breaks clients.
    body = gzip.compress(json.dumps(data).encode())
    return Response(
        content=body,
        headers={
            "Content-Encoding": "gzip",
            "Vary": "Accept-Encoding"
        }
    )

from zenith.responses import StreamingResponse
import asyncio

@app.get("/api/stream")
async def stream_data():
    async def generate():
        for i in range(10000):
            chunk = f"Data chunk {i}\n"
            yield chunk.encode()
            await asyncio.sleep(0.01)  # Simulate processing

    return StreamingResponse(
        generate(),
        media_type="text/plain",
        headers={"X-Content-Type-Options": "nosniff"}
    )

import json

# Server-Sent Events (SSE)
@app.get("/api/events")
async def event_stream():
    async def generate_events():
        while True:
            event = await get_next_event()
            yield f"data: {json.dumps(event)}\n\n"

    return StreamingResponse(
        generate_events(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive"
        }
    )

from zenith.pagination import Paginator, CursorPagination

@app.get("/api/items")
async def get_items(
    page: int = 1,
    size: int = 20,
    sort: str = "-created_at"
):
    # Offset pagination (simple, but slower for deep pages on large datasets)
    paginator = Paginator(page=page, size=size)
    items = await Item.paginate(paginator, sort=sort)
    return {
        "items": items,
        "page": page,
        "size": size,
        "total": paginator.total,
        "pages": paginator.pages
    }

# Cursor pagination (efficient for large datasets)
@app.get("/api/feed")
async def get_feed(cursor: str = None, limit: int = 20):
    pagination = CursorPagination(cursor=cursor, limit=limit)
    posts = await Post.cursor_paginate(
        pagination,
        order_by="created_at"
    )
    return {
        "posts": posts,
        "next_cursor": pagination.next_cursor,
        "has_more": pagination.has_more
    }
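
A cursor is usually nothing more than the sort key of the last row returned, encoded so clients treat it as opaque. Zenith's CursorPagination may use its own format internally; a common encoding looks like this:

import base64
import json

def encode_cursor(last_created_at: str, last_id: int) -> str:
    raw = json.dumps({"created_at": last_created_at, "id": last_id})
    return base64.urlsafe_b64encode(raw.encode()).decode()

def decode_cursor(cursor: str) -> dict:
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))

# The next page becomes WHERE (created_at, id) > (:created_at, :id) - an index
# seek - instead of scanning and discarding OFFSET rows.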

from zenith.caching import LRUCache, TTLCache
from weakref import WeakValueDictionary

# Bounded LRU cache
lru_cache = LRUCache(maxsize=1000)  # Evicts the least recently used entries

# Time-based cache with a size limit
ttl_cache = TTLCache(maxsize=5000, ttl=300)  # 5 min TTL

# Weak-reference cache: entries disappear once nothing else references them
weak_cache = WeakValueDictionary()

@app.get("/api/compute/{key}")
async def compute_value(key: str):
    # Check the cache first
    if key in lru_cache:
        return lru_cache[key]

    # Compute the expensive value
    value = await expensive_computation(key)

    # Store it
    lru_cache[key] = value
    weak_cache[key] = value  # Dropped automatically when no strong refs remain
    return value

from dataclasses import dataclass
from datetime import datetime
from typing import NamedTuple
import sys

# Use slots to reduce per-instance memory overhead
@dataclass(slots=True)
class User:
    id: int
    name: str
    email: str
    created_at: datetime

# NamedTuple for immutable data
class Point(NamedTuple):
    x: float
    y: float
    z: float

# Memory comparison (sizes are approximate and vary by Python version)
regular_user = {"id": 1, "name": "Alice", "email": "alice@example.com"}
slotted_user = User(1, "Alice", "alice@example.com", datetime.now())
print(sys.getsizeof(regular_user))  # ~296 bytes
print(sys.getsizeof(slotted_user))  # ~56 bytes (roughly 80% smaller)
@app.get("/api/export")
async def export_data():
# Memory-inefficient - loads all data
# all_users = await User.all()
# return {"users": [u.model_dump() for u in all_users]}
# Memory-efficient - streaming with generator
async def generate_users():
async for user in User.stream(): # Fetch in batches
yield json.dumps(user.model_dump()) + "\n"
return StreamingResponse(
generate_users(),
media_type="application/x-ndjson"
)
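
On the consuming side, NDJSON can be processed one record at a time without buffering the whole response. For example with httpx (an example client choice, not part of Zenith):

import json

import httpx

async def consume_export(url: str):
    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url) as response:
            async for line in response.aiter_lines():
                if line:
                    record = json.loads(line)
                    ...  # process one record at a time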

locustfile.py

import random

from locust import HttpUser, task, between

class ZenithUser(HttpUser):
    wait_time = between(1, 3)  # Wait 1-3 seconds between requests

    @task(3)  # Weight: 3x more likely than a weight-1 task
    def get_users(self):
        self.client.get("/api/users")

    @task(1)
    def get_user(self):
        user_id = random.randint(1, 1000)
        self.client.get(f"/api/users/{user_id}")

    @task(2)
    def create_user(self):
        self.client.post("/api/users", json={
            "name": "Test User",
            "email": f"user{random.randint(1, 10000)}@example.com"
        })

    def on_start(self):
        # Log in before the test starts
        self.client.post("/auth/login", json={
            "username": "testuser",
            "password": "testpass"
        })

# Run: locust -f locustfile.py -H http://localhost:8000 -u 100 -r 10

benchmarks/api_benchmark.py

import asyncio
import time

from zenith.testing import PerformanceTest

class APIBenchmark(PerformanceTest):
    async def setup(self):
        """Create test data."""
        self.users = await User.bulk_create([
            User(name=f"User {i}", email=f"user{i}@example.com")
            for i in range(1000)
        ])

    async def benchmark_list_users(self):
        """Benchmark the user listing endpoint."""
        start = time.perf_counter()
        tasks = [
            self.client.get("/api/users?page=1&size=100")
            for _ in range(100)
        ]
        responses = await asyncio.gather(*tasks)
        duration = time.perf_counter() - start

        assert all(r.status_code == 200 for r in responses)
        return {
            "requests": 100,
            "duration": duration,
            "rps": 100 / duration,
            "avg_latency": duration / 100 * 1000  # ms
        }

    async def teardown(self):
        """Clean up test data."""
        await User.delete_all()

# Run the benchmarks
async def run_benchmarks():
    benchmark = APIBenchmark()
    results = await benchmark.run_all()
    print("Benchmark Results:")
    for name, metrics in results.items():
        print(f"{name}: {metrics['rps']:.2f} req/s, {metrics['avg_latency']:.2f}ms avg")

if __name__ == "__main__":
    asyncio.run(run_benchmarks())

gunicorn_config.py

import multiprocessing

# Workers
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000

# Server
bind = "0.0.0.0:8000"
keepalive = 5

# Performance
preload_app = True        # Load the app before forking workers
max_requests = 1000       # Restart workers after N requests (guards against slow leaks)
max_requests_jitter = 50  # Stagger restarts so workers don't recycle simultaneously

# Logging
accesslog = "-"
errorlog = "-"
loglevel = "info"

Terminal window

# Run with Gunicorn
gunicorn main:app -c gunicorn_config.py

# Or run Uvicorn directly with multiple worker processes
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

nginx.conf

upstream zenith_backend {
    least_conn;                 # Route to the server with the fewest active connections
    server app1:8000 weight=3;  # Higher-capacity server
    server app2:8000 weight=2;
    server app3:8000 weight=1;
    keepalive 32;               # Keep upstream connections alive
}

# Cache storage (must be declared at http level, outside the server block)
proxy_cache_path /var/cache/nginx levels=1:2 keys_zone=api_cache:10m max_size=1g;

server {
    listen 80;
    listen 443 ssl http2;  # Enable HTTP/2 over TLS
    server_name api.example.com;

    # Compression
    gzip on;
    gzip_types application/json text/plain;

    location /api/ {
        proxy_pass http://zenith_backend;
        proxy_http_version 1.1;

        # Headers
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # Reuse upstream connections
        proxy_set_header Connection "";

        # Caching ($request_uri already includes the query string)
        proxy_cache api_cache;
        proxy_cache_valid 200 5m;
        proxy_cache_key "$request_method$request_uri";

        # Timeouts
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 30s;
    }
}

k8s/deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: zenith-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: zenith-api
  template:
    metadata:
      labels:
        app: zenith-api
    spec:
      containers:
        - name: api
          image: zenith:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: zenith-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: zenith-api
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

Development checklist

  • Use performance decorators during development
  • Profile critical code paths
  • Implement caching for expensive operations
  • Use bulk operations for database writes
  • Implement pagination for list endpoints
  • Use eager loading to prevent N+1 queries

Testing checklist

  • Run performance benchmarks
  • Load test with expected traffic
  • Monitor memory usage under load
  • Test cache hit rates
  • Verify connection pooling
  • Check for memory leaks

Production checklist

  • Enable response compression
  • Configure connection pooling
  • Set up monitoring and alerts
  • Implement auto-scaling
  • Use a CDN for static assets
  • Configure database indexes

Monitoring checklist

  • Track request latency percentiles (p50, p95, p99)
  • Monitor error rates
  • Track memory and CPU usage
  • Monitor database query times
  • Check cache hit rates
  • Set up alerting thresholds