Master performance optimization in Zenith with profiling, caching, database optimization, and scaling strategies.
- **Profiling**: Identify bottlenecks and optimize critical paths
- **Caching**: Multi-layer caching for maximum performance
- **Database**: Query optimization and connection pooling
- **Scaling**: Horizontal scaling and load balancing
```python
# Performance baseline (measured on M3 Max, Python 3.13)
# - Simple endpoints:     13,074 req/s (+69% from routing optimization)
# - JSON endpoints:       12,274 req/s (+24% from routing optimization)
# - With full middleware:  8,781 req/s (72% retention)
# - Memory usage: <100MB for 1000 concurrent requests
# - Startup time: <100ms
```
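Numbers like these come from synthetic load tests. The probe below is a minimal sketch (not a Zenith API; it assumes `httpx` is installed and the app is running locally) that measures raw requests per second:

```python
import asyncio
import time

import httpx

async def probe(url: str, total: int = 1000, concurrency: int = 100) -> float:
    """Fire `total` GET requests with bounded concurrency; return req/s."""
    semaphore = asyncio.Semaphore(concurrency)

    async with httpx.AsyncClient() as client:
        async def one_request() -> None:
            async with semaphore:
                await client.get(url)

        start = time.perf_counter()
        await asyncio.gather(*(one_request() for _ in range(total)))
        return total / (time.perf_counter() - start)

print(f"{asyncio.run(probe('http://localhost:8000/')):.0f} req/s")
```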
```python
from zenith import Zenith

# Automatic performance optimizations in v0.0.1+
app = Zenith()
# Includes:
# - Connection pooling
# - Response caching headers
# - Efficient JSON serialization
# - Async request handling
# - Optimized middleware ordering
```
@app.get("/api/users")@track_performance(threshold_ms=100) # Log if slower than 100msasync def get_users(): users = await User.all() return {"users": users}
@app.get("/api/analytics")@profile # Detailed profiling outputasync def get_analytics(): # Complex computation data = await calculate_analytics() return data
@app.get("/api/report")@measure("report_generation") # Custom metric nameasync def generate_report(): report = await create_report() return reportfrom zenith.performance import performance_timer
@app.get("/api/complex")async def complex_operation(): with performance_timer("database_query"): users = await User.where(active=True).all()
with performance_timer("processing"): results = process_users(users)
with performance_timer("serialization"): return {"results": results}
# Logs:# Performance: database_query took 45.2ms# Performance: processing took 120.5ms# Performance: serialization took 15.3msfrom zenith.middleware.performance import PerformanceMiddleware
```python
from zenith.middleware.performance import PerformanceMiddleware

app = Zenith(
    middleware=[
        PerformanceMiddleware(
            slow_request_threshold=500,  # Log requests > 500ms
            track_memory=True,           # Track memory usage
            track_cpu=True,              # Track CPU usage
            include_routes=["/api/*"],   # Only track API routes
            exclude_routes=["/health"],  # Skip health checks
        )
    ]
)

# Automatic metrics at the /metrics endpoint:
# - request_duration_seconds
# - request_size_bytes
# - response_size_bytes
# - memory_usage_bytes
# - cpu_usage_percent
```
```python
from zenith.monitoring import metrics

# Counter for tracking events
user_signups = metrics.counter(
    "user_signups_total",
    "Total number of user signups",
)

# Histogram for tracking distributions
request_latency = metrics.histogram(
    "request_latency_seconds",
    "Request latency distribution",
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0],
)

# Gauge for current values
active_connections = metrics.gauge(
    "active_connections",
    "Number of active WebSocket connections",
)

# Summary for percentiles
processing_time = metrics.summary(
    "processing_time_seconds",
    "Processing time summary",
)

@app.post("/api/signup")
async def signup(user_data: UserCreate):
    with request_latency.time():
        user = await create_user(user_data)
        user_signups.inc()
        return user
```
```python
from datetime import timedelta

from zenith.caching import cached, cache_response

# Function-level caching
@cached(ttl=300)  # Cache for 5 minutes
async def get_expensive_data():
    # Complex computation
    return await compute_data()

# Route-level caching
@app.get("/api/products")
@cache_response(
    ttl=timedelta(minutes=15),
    vary_by=["category", "page"],                    # Cache key includes these params
    condition=lambda resp: resp.status_code == 200,  # Only cache successful responses
)
async def get_products(category: str | None = None, page: int = 1):
    products = await fetch_products(category, page)
    return {"products": products}
```
```python
from zenith.caching import CacheManager, MemoryCache, RedisCache

# Configure cache layers
cache = CacheManager(
    layers=[
        MemoryCache(max_size=1000, ttl=60),                   # L1: Fast memory cache
        RedisCache(redis_url="redis://localhost", ttl=3600),  # L2: Redis
    ]
)

@app.get("/api/user/{user_id}")
async def get_user(user_id: int):
    # Try cache first
    cache_key = f"user:{user_id}"
    user = await cache.get(cache_key)

    if user is None:
        # Cache miss - fetch from database
        user = await User.find(user_id)
        if user:
            await cache.set(cache_key, user.model_dump())

    return user
```
```python
from zenith.caching import cache_invalidate, CacheTags

# Tag-based invalidation
@app.get("/api/posts/{post_id}")
@cache_response(ttl=3600, tags=["posts"])
async def get_post(post_id: int):
    return await Post.find(post_id)

@app.put("/api/posts/{post_id}")
async def update_post(post_id: int, data: PostUpdate):
    post = await Post.find(post_id)
    await post.update(**data.dict())

    # Invalidate all cached posts
    await cache_invalidate(tags=["posts"])

    return post

# Pattern-based invalidation
@app.delete("/api/users/{user_id}")
async def delete_user(user_id: int):
    await User.delete(user_id)

    # Invalidate specific patterns
    await cache_invalidate(patterns=[
        f"user:{user_id}",
        f"user:{user_id}:*",
        "users:list",
    ])
```
@app.get("/api/search")@conditional_cache( # Cache only for common queries condition=lambda req: len(req.query_params.get("q", "")) > 3, # Different TTL based on query popularity ttl_func=lambda req: 3600 if is_popular_query(req) else 300, # Include user ID in cache key for personalized results key_func=lambda req: f"search:{req.user.id}:{req.query_params}")async def search(q: str, user: User = Auth): results = await perform_search(q, user) return {"results": results}from zenith.db import create_async_enginefrom sqlalchemy.pool import NullPool, QueuePool
```python
from sqlalchemy.pool import NullPool, QueuePool

from zenith.db import create_async_engine

# Optimized connection pooling
engine = create_async_engine(
    "postgresql+asyncpg://localhost/db",
    pool_size=20,        # Number of connections
    max_overflow=10,     # Extra connections when needed
    pool_timeout=30,     # Wait time for a connection
    pool_recycle=3600,   # Recycle connections after 1 hour
    pool_pre_ping=True,  # Test connections before use
    echo_pool=True,      # Log pool checkouts/checkins
)

# For serverless/edge functions
serverless_engine = create_async_engine(
    "postgresql+asyncpg://localhost/db",
    poolclass=NullPool,  # No connection pooling
    connect_args={
        "server_settings": {
            "jit": "off"  # Disable JIT for faster cold starts
        }
    },
)
```
```python
from sqlalchemy import Index

from zenith.db import select, join, exists, func

class Post(ZenithModel, table=True):
    __tablename__ = "posts"

    id: int = Field(primary_key=True)
    user_id: int = Field(foreign_key="users.id", index=True)
    title: str = Field(index=True)
    content: str
    published: bool = Field(default=False, index=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)

    # Composite indexes for common queries
    __table_args__ = (
        Index("idx_user_published", "user_id", "published"),
        Index("idx_published_created", "published", "created_at"),
    )
```
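A composite index only speeds up queries whose filters match its leading columns. The two handlers below are illustrative routes (written in the same query-builder style used throughout this page) that line up with the indexes defined above:

```python
@app.get("/api/users/{user_id}/drafts")
async def get_drafts(user_id: int):
    # Served by idx_user_published: filters on user_id and published
    drafts = await Post.where(user_id=user_id, published=False).all()
    return {"drafts": drafts}

@app.get("/api/posts/latest")
async def latest_posts():
    # Served by idx_published_created: published filter plus created_at ordering
    posts = await Post.where(published=True).order_by("-created_at").limit(20)
    return {"posts": posts}
```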
```python
# Efficient queries with eager loading
@app.get("/api/posts")
async def get_posts():
    # N+1 query problem - BAD:
    posts = await Post.where(published=True).all()
    for post in posts:
        post.author = await User.find(post.user_id)  # Extra query per post!

    # Eager loading - GOOD
    posts = await Post.where(published=True).includes("author").all()

    # Select only the needed columns
    posts = await Post.select("id", "title", "created_at").where(published=True).all()

    return {"posts": posts}
```
```python
from zenith.db import cached_query

class User(ZenithModel, table=True):
    @classmethod
    @cached_query(ttl=300)  # Cache query results for 5 minutes
    async def get_active_users(cls):
        return await cls.where(active=True).order_by("-last_login").limit(100)

    @classmethod
    @cached_query(
        ttl=600,
        key=lambda user_id: f"user_posts:{user_id}",
    )
    async def get_user_posts(cls, user_id: int):
        return await Post.where(user_id=user_id).includes("comments").all()
```

```python
# Bulk insert - 10x faster than individual inserts
@app.post("/api/users/bulk")
async def bulk_create_users(users_data: list[UserCreate]):
    # Inefficient - individual inserts:
    # for data in users_data:
    #     await User.create(**data.dict())

    # Efficient - bulk insert
    users = [User(**data.dict()) for data in users_data]
    await User.bulk_create(users)

    return {"created": len(users)}
```
```python
# Bulk update
@app.put("/api/posts/publish")
async def bulk_publish(post_ids: list[int]):
    await Post.where(Post.id.in_(post_ids)).update(published=True)
    return {"updated": len(post_ids)}
```
@app.get("/api/dashboard")async def get_dashboard(user: User = Auth): # Sequential - SLOW # stats = await get_user_stats(user.id) # posts = await get_recent_posts(user.id) # notifications = await get_notifications(user.id)
# Concurrent - FAST stats, posts, notifications = await asyncio.gather( get_user_stats(user.id), get_recent_posts(user.id), get_notifications(user.id) )
return { "stats": stats, "posts": posts, "notifications": notifications }
# Controlled concurrency for many operations@app.get("/api/process-all")async def process_all(): items = await Item.all()
# Process with max 10 concurrent operations results = await gather_with_concurrency( [process_item(item) for item in items], max_concurrent=10 )
return {"processed": len(results)}from zenith.background import BackgroundTasks, priority_task
@app.post("/api/import")async def import_data( file: UploadFile, background: BackgroundTasks): # Return immediately, process in background task_id = background.add_task( process_import, file_path=file.filename, priority="high" # Process high-priority tasks first )
return {"task_id": task_id, "status": "processing"}
@priority_task(priority="low", max_retries=3)async def cleanup_old_data(): """Low-priority cleanup task.""" deleted = await OldData.delete_where( created_at__lt=datetime.utcnow() - timedelta(days=30) ) logger.info(f"Cleaned up {deleted} old records")from zenith.middleware import CompressionMiddleware
```python
from zenith.middleware import CompressionMiddleware

app = Zenith(
    middleware=[
        CompressionMiddleware(
            minimum_size=1000,                   # Only compress responses > 1KB
            compression_level=6,                 # 1-9, higher = better compression
            exclude_types=["image/", "video/"],  # Don't compress these
            prefer_algorithm="br",               # Prefer Brotli over gzip
        )
    ]
)
```
```python
# Response-specific compression
@app.get("/api/large-data")
async def get_large_data():
    data = await fetch_large_dataset()

    return Response(
        content=data,
        headers={
            "Content-Encoding": "gzip",
            "Vary": "Accept-Encoding",
        },
    )
```
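To verify that compression is actually applied, request the endpoint with an `Accept-Encoding` header and inspect the response headers. A quick check using `httpx` (an assumption here, not part of Zenith):

```python
import httpx

response = httpx.get(
    "http://localhost:8000/api/large-data",
    headers={"Accept-Encoding": "br, gzip"},
)
# Prints "br" or "gzip" when the response body was compressed
print(response.headers.get("Content-Encoding"))
```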
@app.get("/api/stream")async def stream_data(): async def generate(): for i in range(10000): chunk = f"Data chunk {i}\n" yield chunk.encode() await asyncio.sleep(0.01) # Simulate processing
return StreamingResponse( generate(), media_type="text/plain", headers={"X-Content-Type-Options": "nosniff"} )
# Server-Sent Events (SSE)@app.get("/api/events")async def event_stream(): async def generate_events(): while True: event = await get_next_event() yield f"data: {json.dumps(event)}\n\n"
return StreamingResponse( generate_events(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive" } )from zenith.pagination import Paginator, CursorPagination
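On the wire, SSE is just a stream of `data: ...` lines, so a consumer only needs line-by-line reads. A minimal client sketch (assuming `httpx`; any streaming HTTP client works):

```python
import asyncio
import json

import httpx

async def listen(url: str) -> None:
    # Read the event stream and decode each "data:" payload as JSON
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url) as response:
            async for line in response.aiter_lines():
                if line.startswith("data: "):
                    print(json.loads(line[len("data: "):]))

asyncio.run(listen("http://localhost:8000/api/events"))
```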
@app.get("/api/items")async def get_items( page: int = 1, size: int = 20, sort: str = "-created_at"): # Offset pagination (simple but less efficient for large datasets) paginator = Paginator(page=page, size=size) items = await Item.paginate(paginator, sort=sort)
return { "items": items, "page": page, "size": size, "total": paginator.total, "pages": paginator.pages }
# Cursor pagination (efficient for large datasets)@app.get("/api/feed")async def get_feed(cursor: str = None, limit: int = 20): pagination = CursorPagination(cursor=cursor, limit=limit) posts = await Post.cursor_paginate( pagination, order_by="created_at" )
return { "posts": posts, "next_cursor": pagination.next_cursor, "has_more": pagination.has_more }from zenith.caching import LRUCache, TTLCachefrom weakref import WeakValueDictionary
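A client pages through by echoing `next_cursor` back until `has_more` is false. A hypothetical consumer sketch (the endpoint shape matches the handler above; `httpx` is an assumption):

```python
import httpx

async def fetch_entire_feed(base_url: str) -> list[dict]:
    """Walk the cursor-paginated feed to the end."""
    posts: list[dict] = []
    cursor: str | None = None

    async with httpx.AsyncClient(base_url=base_url) as client:
        while True:
            params = {"limit": 20}
            if cursor:
                params["cursor"] = cursor
            page = (await client.get("/api/feed", params=params)).json()

            posts.extend(page["posts"])
            if not page["has_more"]:
                return posts
            cursor = page["next_cursor"]
```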
```python
from weakref import WeakValueDictionary

from zenith.caching import LRUCache, TTLCache

# Bounded LRU cache
lru_cache = LRUCache(maxsize=1000)  # Evicts the least recently used entries

# Time-based cache with a size limit
ttl_cache = TTLCache(maxsize=5000, ttl=300)  # 5 min TTL

# Weak reference cache (automatic cleanup)
weak_cache = WeakValueDictionary()

@app.get("/api/compute/{key}")
async def compute_value(key: str):
    # Check caches in order
    if key in lru_cache:
        return lru_cache[key]

    # Compute expensive value
    value = await expensive_computation(key)

    # Store in cache
    lru_cache[key] = value
    weak_cache[key] = value  # Will be GC'd if memory is needed

    return value
```
```python
import sys
from dataclasses import dataclass
from datetime import datetime
from typing import NamedTuple

# Use slots to reduce per-instance memory overhead (Python 3.10+)
@dataclass(slots=True)
class User:
    id: int
    name: str
    email: str
    created_at: datetime

# NamedTuple for immutable data
class Point(NamedTuple):
    x: float
    y: float
    z: float

# Memory comparison
regular_user = {"id": 1, "name": "Alice", "email": "alice@example.com"}
slotted_user = User(1, "Alice", "alice@example.com", datetime.now())

print(sys.getsizeof(regular_user))  # ~296 bytes
print(sys.getsizeof(slotted_user))  # ~56 bytes (~81% reduction)
```

```python
@app.get("/api/export")
async def export_data():
    # Memory-inefficient - loads all data at once:
    # all_users = await User.all()
    # return {"users": [u.model_dump() for u in all_users]}

    # Memory-efficient - stream with a generator
    async def generate_users():
        async for user in User.stream():  # Fetches rows in batches
            yield json.dumps(user.model_dump()) + "\n"

    return StreamingResponse(
        generate_users(),
        media_type="application/x-ndjson",
    )
```
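The matching consumer decodes the NDJSON export one line at a time, so neither side ever materializes the full dataset. Again an `httpx` sketch (the `email` field is illustrative):

```python
import json

import httpx

async def consume_export(url: str) -> None:
    # Stream the response and parse one JSON document per line
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url) as response:
            async for line in response.aiter_lines():
                if line:
                    user = json.loads(line)
                    print(user["email"])  # handle one record at a time
```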
```python
import random

from locust import HttpUser, task, between

class ZenithUser(HttpUser):
    wait_time = between(1, 3)  # Wait 1-3 seconds between requests

    @task(3)  # Weight: 3x more likely than the other tasks
    def get_users(self):
        self.client.get("/api/users")

    @task(1)
    def get_user(self):
        user_id = random.randint(1, 1000)
        self.client.get(f"/api/users/{user_id}")

    @task(2)
    def create_user(self):
        self.client.post("/api/users", json={
            "name": "Test User",
            "email": f"user{random.randint(1, 10000)}@example.com",
        })

    def on_start(self):
        # Log in before testing
        self.client.post("/auth/login", json={
            "username": "testuser",
            "password": "testpass",
        })

# Run: locust -f locustfile.py -H http://localhost:8000 -u 100 -r 10
```
```python
import asyncio
import time

from zenith.testing import PerformanceTest

class APIBenchmark(PerformanceTest):
    async def setup(self):
        """Create test data."""
        self.users = await User.bulk_create([
            User(name=f"User {i}", email=f"user{i}@example.com")
            for i in range(1000)
        ])

    async def benchmark_list_users(self):
        """Benchmark the user listing endpoint."""
        start = time.perf_counter()

        tasks = [
            self.client.get("/api/users?page=1&size=100")
            for _ in range(100)
        ]
        responses = await asyncio.gather(*tasks)

        duration = time.perf_counter() - start

        assert all(r.status_code == 200 for r in responses)

        return {
            "requests": 100,
            "duration": duration,
            "rps": 100 / duration,
            "avg_latency": duration / 100 * 1000,  # ms
        }

    async def teardown(self):
        """Clean up test data."""
        await User.delete_all()

# Run the benchmarks
async def run_benchmarks():
    benchmark = APIBenchmark()
    results = await benchmark.run_all()

    print("Benchmark Results:")
    for name, metrics in results.items():
        print(f"{name}: {metrics['rps']:.2f} req/s, {metrics['avg_latency']:.2f}ms avg")

asyncio.run(run_benchmarks())
```
```python
# gunicorn_config.py
import multiprocessing

# Workers
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"
worker_connections = 1000

# Server
bind = "0.0.0.0:8000"
keepalive = 5

# Performance
preload_app = True        # Load the app before forking workers
max_requests = 1000       # Restart workers after N requests
max_requests_jitter = 50  # Randomize restarts

# Logging
accesslog = "-"
errorlog = "-"
loglevel = "info"
```
```bash
# Run with Gunicorn
gunicorn main:app -c gunicorn_config.py

# Or with Uvicorn directly
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
```

```nginx
upstream zenith_backend {
    least_conn;  # Use the least-connections algorithm

    server app1:8000 weight=3;  # Higher-capacity server
    server app2:8000 weight=2;
    server app3:8000 weight=1;

    keepalive 32;  # Keep upstream connections alive
}

# Cache storage (declared at the http level)
proxy_cache_path /var/cache/nginx levels=1:2 keys_zone=api_cache:10m max_size=1g;

server {
    listen 80;
    server_name api.example.com;

    # Enable HTTP/2
    listen 443 ssl http2;

    # Compression
    gzip on;
    gzip_types application/json text/plain;

    location /api/ {
        proxy_pass http://zenith_backend;
        proxy_http_version 1.1;

        # Headers
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

        # Connection pooling
        proxy_set_header Connection "";

        # Caching
        proxy_cache api_cache;
        proxy_cache_valid 200 5m;
        proxy_cache_key "$request_method$request_uri$args";

        # Timeouts
        proxy_connect_timeout 5s;
        proxy_send_timeout 10s;
        proxy_read_timeout 30s;
    }
}
```

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: zenith-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: zenith-api
  template:
    metadata:
      labels:
        app: zenith-api
    spec:
      containers:
        - name: api
          image: zenith:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: db-secret
                  key: url
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "1000m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /health/ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
```
```yaml
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: zenith-api-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: zenith-api
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
```

- **Deployment Guide**: Deploy and scale your optimized application. Read the Deployment Guide →
- **Monitoring Guide**: Set up comprehensive monitoring. Read the Monitoring Guide →