Skip to content

Routing & Request Handling

Zenith’s routing system provides type-safe, organized API endpoints with automatic validation and minimal configuration.

Zenith’s routing system provides:

  • Decorator-based routing - Clean, intuitive endpoint definition
  • Automatic validation - Pydantic models validate all inputs
  • Type safety - Full IDE support with autocomplete
  • Router organization - Group related endpoints together
  • Minimal configuration - Focus on business logic
| Use Case | Pattern | Example |
| --- | --- | --- |
| Simple APIs | Flat routing | `@app.get("/users")` |
| Large APIs | Router groups | `users_router = Router()` |
| Microservices | Versioned routes | `/api/v1/users` |
| Multi-tenant | Prefix routing | `/{tenant}/users` |
| File serving | Path matching | `/{file_path:path}` |

Zenith provides automatic validation and type conversion:

# Zenith automatic validation - clean and type-safe
@app.get("/users")
async def get_users(
    page: int = 1,    # the int annotation is what switches validation on
    limit: int = 10,  # arrives as query-string text, handed over as an int
):
    """Return one page of users.

    For a request such as ``/users?page=2&limit=20`` Zenith:

    * pulls ``page`` and ``limit`` out of the query string,
    * converts the raw strings to integers,
    * falls back to the defaults above when a parameter is absent,
    * replies with a detailed 422 error when a value fails validation.

    The annotations also give editors enough information for autocomplete
    and static type checking, so the body holds business logic only.
    """
    users = await fetch_users(page, limit)
    return users

Zenith provides an intuitive decorator-based routing system with full type safety.

from zenith import Zenith
# Create your application - zero configuration needed
app = Zenith()
# GET endpoint - the simplest possible route
@app.get("/")
async def root():
    """Greet the caller; Zenith turns the returned dict into a JSON response."""
    greeting = {"message": "Hello World"}
    return greeting
# POST endpoint - receives JSON body
@app.post("/items")
async def create_item(item: dict):
    """Echo the JSON document sent in the request body.

    Declaring ``item`` as ``dict`` asks Zenith to expect a JSON body and to
    parse it before the handler runs; a body that is not valid JSON is
    answered with a 400.
    """
    response = {"created": item}
    return response
# PUT endpoint - combines path parameter and body
@app.put("/items/{item_id}")
async def update_item(
    item_id: int,  # taken from the URL path and converted to int
    item: dict,    # taken from the JSON request body
):
    """Combine a path parameter with a request body in one handler.

    Both arguments have already been validated when the body runs.
    """
    response = {"updated": item_id, "data": item}
    return response
# DELETE endpoint - typically just needs ID
@app.delete("/items/{item_id}")
async def delete_item(item_id: int):
    """Delete by ID and confirm which ID was removed (the usual DELETE shape)."""
    confirmation = {"deleted": item_id}
    return confirmation

Extract values directly from the URL path with automatic type conversion:

# Simple path parameter - most common pattern
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Look a user up by the integer ID embedded in the path.

    The ``int`` annotation doubles as the validator:

    * ``/users/123``   -> ``user_id == 123`` (converted to int)
    * ``/users/alice`` -> 422, "not a valid integer"
    * ``/users/``      -> 404, no route matches

    Inside the body ``user_id`` is therefore always an integer.
    """
    response = {"user_id": user_id}
    return response
# Multiple path parameters - perfect for hierarchical data
@app.get("/posts/{year}/{month}/{slug}")
async def get_post(
    year: int,   # path text converted to int
    month: int,  # converted and validated the same way
    slug: str,   # already a string, passed through untouched
):
    """Fetch a post addressed by date and slug (typical blog-style URL).

    ``/posts/2024/03/my-awesome-post`` is delivered as
    ``year=2024, month=3, slug="my-awesome-post"``.
    """
    return await fetch_post_by_date_and_slug(year, month, slug)
# Special :path converter - captures everything including slashes
@app.get("/files/{file_path:path}")
async def get_file(file_path: str):
    """Serve a file addressed by an arbitrarily deep path.

    The ``:path`` converter captures every remaining URL segment, slashes
    included:

    * ``/files/document.pdf``         -> ``"document.pdf"``
    * ``/files/2024/reports/q1.pdf``  -> ``"2024/reports/q1.pdf"``
    * ``/files/deep/nested/path.txt`` -> ``"deep/nested/path.txt"``

    That makes it a good fit for file servers and request proxies.
    """
    served = serve_file(file_path)
    return served

Extract and validate values from the URL query string (everything after the ?):

from typing import Optional
@app.get("/search")
async def search(
    q: str,                      # no default, so the caller has to supply it
    limit: int = 10,             # page size, 10 when omitted
    offset: int = 0,             # matches to skip, 0 means start at the top
    sort: Optional[str] = None,  # may legitimately be absent (no ordering requested)
):
    """Search endpoint showing the common query-parameter patterns.

    How requests map onto the arguments:

    * ``GET /search?q=python``
      -> ``q="python", limit=10, offset=0, sort=None``
    * ``GET /search?q=python&limit=5&offset=10``
      -> ``q="python", limit=5, offset=10, sort=None``
      (five results, beginning with the 11th match)
    * ``GET /search?q=python&sort=date&limit=20``
      -> ``q="python", limit=20, offset=0, sort="date"``
    * ``GET /search`` (required ``q`` missing)
      -> 422 Unprocessable Entity with
      ``{"detail": [{"loc": ["query", "q"], "msg": "field required"}]}``
    * ``GET /search?q=python&limit=abc``
      -> 422, because ``limit`` must be an integer

    Every argument is validated and converted before the body executes.
    """
    matches = await database.search(
        query=q, limit=limit, offset=offset, order_by=sort
    )
    # "total" is the number of rows in this result list, i.e. len(matches).
    page_info = {"limit": limit, "offset": offset, "total": len(matches)}
    return {
        "query": q,
        "results": matches,
        "pagination": page_info,
        "sort": sort,
    }
With Pydantic Models (Recommended for Complex Data)
from datetime import datetime

from pydantic import BaseModel, Field, field_validator, validator
class PostCreate(BaseModel):
    """Request body for creating a new blog post.

    A Pydantic model provides automatic validation, type conversion,
    default values, custom validators and auto-generated documentation.
    """

    # Required fields: ``...`` as the default marks a field as mandatory.
    title: str = Field(
        ...,
        min_length=1,
        max_length=200,
        description="Post title for display",
    )
    content: str = Field(
        ...,
        min_length=10,
        description="The main post content in markdown",
    )

    # Optional fields with defaults.
    published: bool = False  # posts start out as drafts
    tags: list[str] = []     # empty list when the client sends none

    # ``field_validator`` is the Pydantic v2 API; the rest of this page
    # already relies on v2 (``model_dump``), where ``validator`` is deprecated.
    @field_validator('tags')
    @classmethod
    def validate_tags(cls, v):
        """Reject more than five tags and normalise the rest to lowercase.

        Raises:
            ValueError: if more than five tags are supplied.
        """
        if len(v) > 5:
            raise ValueError('Too many tags (max 5)')
        return [tag.lower() for tag in v]
@app.post("/posts", response_model=PostResponse)
async def create_post(post: PostCreate):
    """Create a blog post from a validated request body.

    Request lifecycle:

    1. Zenith receives the JSON body.
    2. It tries to parse the body into a ``PostCreate`` model.
    3. Field validations run, followed by the custom validators.
    4. A valid model is passed to this function; an invalid one produces
       a 422 response with detailed errors.

    Valid request::

        POST /posts
        {
            "title": "Getting Started with Zenith",
            "content": "Zenith is a modern Python web framework...",
            "tags": ["python", "web", "framework"]
        }

    Invalid request ``POST /posts`` with body ``{"title": ""}`` returns 422::

        {
            "detail": [
                {"loc": ["body", "title"],
                 "msg": "ensure this value has at least 1 characters"},
                {"loc": ["body", "content"],
                 "msg": "field required"}
            ]
        }
    """
    # ``post`` is fully validated here; the database layer wants plain
    # keyword arguments, so flatten the model to a dict first.
    fields = post.model_dump()
    saved_post = await database.posts.create(**fields)
    return saved_post  # serialised to JSON by Zenith
from zenith import UploadFile, File
# Single file upload endpoint
@app.post("/upload")
async def upload_file(
    file: UploadFile = File(),  # File() makes Zenith expect multipart/form-data
):
    """Accept a single uploaded file and report its basic metadata.

    The request must be ``multipart/form-data`` (not JSON) and the form
    field must be named ``file``.

    curl::

        curl -X POST "http://localhost:8000/upload" -F "file=@document.pdf"

    JavaScript::

        const formData = new FormData();
        formData.append('file', fileInput.files[0]);
        await fetch('/upload', {method: 'POST', body: formData});

    ``UploadFile`` exposes ``filename`` (the client's original name),
    ``content_type`` (MIME type such as "image/png") and ``file`` (a
    SpooledTemporaryFile suitable for streaming).
    """
    # Loads the whole upload into memory, so mind large files.
    # For large uploads, stream to disk in chunks instead:
    #     async with aiofiles.open(f'uploads/{file.filename}', 'wb') as f:
    #         while chunk := await file.read(1024 * 1024):  # 1MB chunks
    #             await f.write(chunk)
    payload = await file.read()
    byte_count = len(payload)
    return {
        "filename": file.filename,
        "size": byte_count,
        "content_type": file.content_type,
    }
# Multiple file upload endpoint
@app.post("/upload-multiple")
async def upload_multiple(
    files: list[UploadFile] = File(),  # a list annotation means several files
):
    """Accept several files sent under the same form field name.

    HTML form::

        <form method="POST" enctype="multipart/form-data">
            <input type="file" name="files" multiple>
            <button type="submit">Upload</button>
        </form>

    JavaScript::

        const formData = new FormData();
        for (const file of fileInput.files) {
            formData.append('files', file);  // same field name, many values
        }
    """
    summaries = []
    for upload in files:
        # Read the file first so its size is known before building the entry.
        byte_count = len(await upload.read())
        entry = {
            "filename": upload.filename,
            "size": byte_count,
            "type": upload.content_type,
        }
        summaries.append(entry)
    return {"uploaded": summaries, "count": len(files)}

Response Models (Control What Clients See)

Section titled “Response Models (Control What Clients See)”
from pydantic import BaseModel, Field
from datetime import datetime
class PostResponse(BaseModel):
    """Shape of a post as it is sent back to API clients.

    A dedicated response model:

    1. keeps sensitive fields (password_hash, internal_notes) private,
    2. leaves room for computed fields (e.g. full_name from first + last),
    3. keeps responses consistent across endpoints,
    4. feeds accurate API documentation.

    Anything not declared below stays out of the response even when the
    underlying data has it (for example ``password_hash``,
    ``internal_notes`` or ``deleted_at``).
    """

    # The complete list of fields clients are allowed to see.
    id: int
    title: str
    content: str
    author: str
    created_at: datetime
    tags: list[str]

    class Config:
        # Serialise datetimes as ISO 8601 strings.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
# Use response_model to enforce the output schema
@app.get("/posts/{post_id}", response_model=PostResponse)
async def get_post(post_id: int) -> PostResponse:
    """Return a single post, limited to the fields of ``PostResponse``.

    ``response_model`` ensures that the return value is validated against
    ``PostResponse``, that only its fields reach the client, that the
    OpenAPI docs show the exact response structure, and that type errors
    are caught before the response is sent.

    Raises:
        HTTPException: 404 when no post exists with ``post_id``.
    """
    # The database record may hold extra fields (password_hash,
    # internal_notes, ...) that must not be exposed.
    post_data = await database.posts.find_by_id(post_id)

    # Without this guard a missing post surfaces as an AttributeError (a 500)
    # on the attribute reads below; answer 404 like the item endpoints do.
    if not post_data:
        raise HTTPException(
            status_code=404,
            detail=f"Post with id {post_id} not found"
        )

    # Building the model explicitly filters the record to the allowed fields.
    return PostResponse(
        id=post_data.id,
        title=post_data.title,
        content=post_data.content,
        author=post_data.author_name,  # field names can be remapped here
        created_at=post_data.created_at,
        tags=post_data.tags or [],     # default when the record has no tags
    )
    # Alternative when the database model already matches the response model:
    # return PostResponse(**post_data.model_dump())
# Versioned API structure
# One router per API version; the prefix carries the version number.
v1_router = Router(prefix="/api/v1")
v2_router = Router(prefix="/api/v2")


@v1_router.get("/users")
async def get_users_v1():
    """Version 1 of the user listing: version marker plus the users."""
    return {"version": "1.0", "users": []}


@v2_router.get("/users")
async def get_users_v2():
    """Version 2 of the user listing, extended with pagination details."""
    pagination = {"page": 1, "total": 0}
    return {
        "version": "2.0",
        "users": [],
        "pagination": pagination,
    }


# Register both versions on the application.
app.include_router(v1_router)
app.include_router(v2_router)
from zenith import Router, HTTPException
from typing import List
# Create a router for grouping related endpoints
router = Router(
prefix="/api/items", # All routes will start with /api/items
tags=["Items"] # Groups endpoints in API docs
)
# GET /api/items - List all items (with pagination)
@router.get("/", response_model=List[Item])
async def list_items(
    skip: int = 0,     # pagination: how many items to skip
    limit: int = 100,  # upper bound on the number of items returned
):
    """List items, one page at a time.

    REST conventions followed here: GET on the collection returns a list,
    the list is always paginated to avoid memory problems, and the
    structure is consistent.

    Example: ``GET /api/items?skip=20&limit=10`` returns items 21-30.
    """
    # Protect the server: never fetch more than 100 rows, whatever was asked.
    page_size = min(limit, 100)
    # Zenith serialises the returned list of Pydantic models on its own.
    return await database.items.find(skip=skip, limit=page_size)
# GET /api/items/{id} - Get specific item
@router.get("/{item_id}", response_model=Item)
async def get_item(item_id: int):
    """Fetch one item by ID.

    REST conventions: GET on a resource (``/items/123``) returns that
    single item, and a missing item is a 404 rather than an empty body.
    """
    found = await database.items.find_by_id(item_id)
    if found:
        return found
    # Be explicit about missing resources; HTTPException produces a proper
    # error response.
    raise HTTPException(
        status_code=404,
        detail=f"Item with id {item_id} not found"
    )
# POST /api/items - Create new item
@router.post(
    "/",
    response_model=Item,
    status_code=201,  # 201 Created, not 200 OK, for a new resource
)
async def create_item(item: ItemCreate):
    """Create an item and return it, ID included.

    REST conventions: POST to the collection creates a resource, the
    status is 201 Created, the created resource is returned, and a
    Location header is included (Zenith adds it).
    """
    # Names must be unique; a duplicate is reported as 409 Conflict.
    duplicate = await database.items.find_by_name(item.name)
    if duplicate:
        raise HTTPException(
            status_code=409,
            detail=f"Item with name '{item.name}' already exists"
        )

    # The stored row comes back from the database with its new ID.
    fields = item.model_dump()
    return await database.items.create(**fields)
# PUT /api/items/{id} - Full update
@router.put("/{item_id}", response_model=Item)
async def update_item(
    item_id: int,
    item: ItemUpdate,  # the full replacement payload
):
    """Replace an existing item in full.

    REST conventions: PUT replaces the entire resource, so every field
    must be supplied; the updated resource is returned. Use PATCH for
    partial updates.
    """
    # Only existing items can be replaced.
    current = await database.items.find_by_id(item_id)
    if not current:
        raise HTTPException(404, f"Item {item_id} not found")

    replacement = item.model_dump()
    return await database.items.update(item_id, **replacement)
# DELETE /api/items/{id} - Remove item
@router.delete(
    "/{item_id}",
    status_code=204,  # 204 No Content: success without a response body
)
async def delete_item(item_id: int):
    """Remove an item.

    REST conventions: DELETE removes the resource and answers 204 No
    Content (not 200) with no body. The operation is idempotent, so
    deleting an item that is already gone still counts as success.
    """
    target = await database.items.find_by_id(item_id)
    if target:
        await database.items.delete(item_id)
    # Nothing is returned on either path; Zenith sends the 204 itself.
@app.get("/{tenant_id}/dashboard")
async def tenant_dashboard(
    tenant_id: str,
    user = Depends(get_current_user),
):
    """Return dashboard data for one tenant in a multi-tenant layout.

    The tenant comes from the first path segment; a caller without access
    to that tenant is rejected with 403.
    """
    allowed = user.has_tenant_access(tenant_id)
    if not allowed:
        raise HTTPException(403, "Access denied")
    return await get_tenant_data(tenant_id)
# File: routes/users.py
from zenith import Router
# Every route in this module lives under /users and is tagged "Users".
router = Router(prefix="/users", tags=["Users"])


@router.get("/")
async def list_users():
    """List users (placeholder result)."""
    users = []
    return {"users": users}


@router.get("/{user_id}")
async def get_user(user_id: int):
    """Return the user identified by the integer path parameter."""
    response = {"user_id": user_id}
    return response


@router.post("/")
async def create_user(user: dict):
    """Create a user from the JSON body and echo it back."""
    response = {"created": user}
    return response
# File: main.py
from zenith import Zenith
from routes import users, posts, auth
app = Zenith()
# Include routers
app.include_router(users.router)
app.include_router(posts.router)
app.include_router(auth.router, prefix="/api/auth")
# Create parent router
api_router = Router(prefix="/api/v1")
# Create child routers
users_router = Router(prefix="/users", tags=["Users"])
posts_router = Router(prefix="/posts", tags=["Posts"])
# Include child routers in parent
api_router.include_router(users_router)
api_router.include_router(posts_router)
# Include parent in app
app.include_router(api_router)
from zenith import Depends, Header, HTTPException
# Database connection dependency
async def get_db():
    """Yield a database session scoped to a single request.

    The session is created when the request starts, shared by every
    operation in that request, committed when the handler finishes
    cleanly, rolled back when it raises, and closed in all cases.
    """
    session = DatabaseSession()  # a fresh session for this request
    try:
        # The yield turns this function into a generator: the route
        # handler runs while execution is parked here.
        yield session
        # Handler finished without raising, so persist pending work.
        await session.commit()
    except Exception:
        # Undo the transaction, then let the error continue upwards.
        await session.rollback()
        raise
    finally:
        # Close the session whatever happened above.
        await session.close()
# Authentication dependency
async def get_current_user(
    authorization: str = Header(),  # value of the Authorization header
):
    """Resolve the authenticated user from a bearer JWT.

    Steps: read the token from the header, decode and verify it, load the
    matching user, and hand that user to the route.

    Raises:
        HTTPException: 401 when the header is not of the form
            ``Bearer <token>``, when the token is invalid or expired, or
            when no user matches the token's ``user_id``.
    """
    scheme = "Bearer "
    if not authorization.startswith(scheme):
        raise HTTPException(401, "Invalid authorization header")
    token = authorization[len(scheme):]  # drop the "Bearer " prefix

    try:
        claims = decode_jwt_token(token)  # your JWT logic
        user_id = claims.get("user_id")
    except InvalidTokenError:
        raise HTTPException(401, "Invalid or expired token")

    account = await database.users.find_by_id(user_id)
    if account:
        return account
    raise HTTPException(401, "User not found")
# Optional dependency - user might not be logged in
async def get_optional_user(
    authorization: Optional[str] = Header(None),  # None default = header is optional
):
    """Return the current user, or ``None`` for an anonymous request.

    ``None`` is returned only when no Authorization header was sent (or it
    is empty). When a header is present the work is delegated to
    ``get_current_user``, so a malformed header, a bad token or an unknown
    user still raises its 401 ``HTTPException``.
    """
    if not authorization:
        return None
    return await get_current_user(authorization)
# Use dependencies in routes
@app.get("/protected")
async def protected_route(
    user = Depends(get_current_user),  # required: caller must be logged in
    db = Depends(get_db),              # per-request database connection
):
    """Endpoint that is only reachable with valid authentication.

    How ``Depends()`` resolves the arguments:

    1. ``get_current_user()`` runs first.
    2. If it raises ``HTTPException``, processing stops and the error is
       returned.
    3. Otherwise ``get_db()`` runs.
    4. Both results are passed to this function.
    5. Cleanup code (``finally`` blocks) runs after the response.
    """
    # Here the user is authenticated and loaded, and db is ready for use.
    rows = await db.query("SELECT * FROM posts WHERE user_id = ?", user.id)
    post_count = len(rows)
    return {
        "user": user.username,
        "posts": post_count,
        "db_connected": True,
    }
# Dependencies can depend on other dependencies!
@app.get("/admin")
async def admin_route(
    user = Depends(get_current_user),
    # A dependency built on another dependency: the lambda receives the
    # resolved user and reduces it to a boolean.
    is_admin = Depends(lambda u=Depends(get_current_user): u.role == "admin"),
):
    """Serve the admin dashboard; callers who are not admins get 403."""
    if is_admin:
        return {"message": "Admin dashboard"}
    raise HTTPException(403, "Admin access required")
@app.post("/items", status_code=201)
async def create_item(item: dict):
    """Create an item; the decorator makes a success answer 201."""
    response = {"created": item}
    return response
@app.delete("/items/{item_id}", status_code=204)
async def delete_item(item_id: int):
    """Delete an item; 204 means there is deliberately no response body."""
    return None
@app.get(
    "/users",
    tags=["Users"],
    summary="List all users",
    description="""
Retrieve a paginated list of all users.
Requires authentication.
""",
    response_description="List of users",
)
async def list_users():
    """List users; the decorator arguments supply the OpenAPI metadata."""
    users = []
    return {"users": users}
from zenith import JSONResponse, HTMLResponse, FileResponse, Response
# Custom JSON response with headers
@app.get("/json")
async def json_response():
    """Return JSON with an explicit status code and extra headers.

    Typical reasons to build the response by hand: adding cache headers,
    choosing a custom status code, or setting CORS headers dynamically.
    """
    body = {"message": "Custom JSON", "timestamp": datetime.now().isoformat()}
    extra_headers = {
        "X-Custom": "Header",
        "Cache-Control": "max-age=3600",  # cache for one hour
        "X-Process-Time": "0.123",        # custom metric
    }
    return JSONResponse(
        content=body,
        status_code=200,  # could equally be 201, 202, ...
        headers=extra_headers,
    )
# Return HTML directly
@app.get("/html")
async def html_response():
    """Serve an HTML page assembled inline.

    Handy for simple pages that need no template engine, for dynamically
    generated HTML and for error pages.
    """
    # The markup lines stay flush left so the emitted text is unchanged.
    page = f"""
<!DOCTYPE html>
<html>
<head>
<title>Zenith App</title>
<style>
body {{ font-family: system-ui; padding: 2rem; }}
.time {{ color: #666; }}
</style>
</head>
<body>
<h1>Hello from Zenith!</h1>
<p class="time">Generated at: {datetime.now()}</p>
</body>
</html>
"""
    return HTMLResponse(content=page, status_code=200)
# File download endpoint
@app.get("/download/{file_id}")
async def download_file(file_id: int):
    """Send a stored file to the client as a download.

    ``FileResponse`` streams large files efficiently, sets the correct
    Content-Type, handles Range requests (resumable downloads) and adds
    Content-Disposition for downloads.

    Raises:
        HTTPException: 404 when no file record exists for ``file_id``.
    """
    record = await database.files.find_by_id(file_id)
    if not record:
        raise HTTPException(404, "File not found")

    # Security: check here that the caller may access this file, e.g.
    # if not user_can_access(current_user, file_record):
    #     raise HTTPException(403, "Access denied")

    # Optional: force a download instead of inline display.
    # NOTE(review): original_name is interpolated into the header as-is;
    # confirm it cannot contain quotes or line breaks.
    disposition = f'attachment; filename="{record.original_name}"'
    return FileResponse(
        path=record.storage_path,       # location on the server's filesystem
        filename=record.original_name,  # name offered to the client
        media_type=record.mime_type,    # e.g. "application/pdf"
        headers={"Content-Disposition": disposition},
    )
# Streaming response for large data
@app.get("/stream")
async def stream_data():
    """Stream a large plain-text response instead of building it in memory.

    The body comes from an async generator that yields one encoded line at
    a time and hands control back to the event loop every 1000 lines.
    """
    # Imported here because this snippet never imports asyncio, so the
    # sleep call below would otherwise raise NameError.
    import asyncio

    async def generate():
        # Produce the data chunk by chunk.
        for i in range(1000000):
            yield f"Data line {i}\n".encode()
            if i % 1000 == 0:
                # Give other requests a chance to run.
                await asyncio.sleep(0)

    # NOTE(review): assumes Response accepts an async generator as content;
    # confirm against the Zenith API reference.
    return Response(
        content=generate(),
        media_type="text/plain",
        headers={"X-Content-Type-Options": "nosniff"},
    )
# More specific routes should be defined first
# Registered before /users/{user_id} so the literal "me" segment wins.
# NOTE(review): this handler shares its name with the authentication
# dependency shown earlier on this page; keep them in separate modules.
@app.get("/users/me")
async def get_current_user():
    """Return the caller's own user record (placeholder)."""
    response = {"user": "current"}
    return response


@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Return the user with the given integer ID."""
    response = {"user_id": user_id}
    return response
@app.get("/items/{item_id:^[0-9]+$}")
async def get_item_numeric(item_id: str):
    """Handle item IDs made of digits only (enforced by the path regex)."""
    numeric_id = int(item_id)
    return {"item_id": numeric_id}


@app.get("/items/{item_id:^[a-z]+$}")
async def get_item_alpha(item_id: str):
    """Handle item IDs made of lowercase letters only (enforced by the path regex)."""
    response = {"item_id": item_id}
    return response
@app.get("/{full_path:path}")
async def catch_all(full_path: str):
    """Fallback for any path that no other route matched."""
    response = {"path": full_path, "message": "Not found"}
    return response
from zenith import WebSocket
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Echo server: accept the connection, then return every text message."""
    await websocket.accept()
    while True:
        message = await websocket.receive_text()
        reply = f"Echo: {message}"
        await websocket.send_text(reply)
  • Group related endpoints in router modules
  • Use consistent naming (plural for collections, singular for items)
  • Version your API from the start (/api/v1/)
  • Return proper status codes (201 for creation, 204 for deletion)
  • Document endpoints with docstrings and OpenAPI metadata
  • Validate everything with Pydantic models
  • Don’t mix business logic in route handlers
  • Don’t use generic exception messages
  • Don’t forget to handle edge cases (404, 409, etc.)
  • Don’t expose internal errors to clients
  • Don’t use synchronous operations in async routes

“Route not found (404)”

  • Check route registration order (specific before generic)
  • Verify router is included in app
  • Check for typos in path parameters

“Validation error”

  • Ensure request body matches Pydantic model
  • Check required vs optional fields
  • Verify type annotations are correct

“Method not allowed (405)”

  • Verify HTTP method matches decorator
  • Check if route supports the method

“Depends() not working”

  • Import Depends from zenith, not from another framework
  • Ensure dependency returns/yields value
  • Check async vs sync consistency
# Debug: List all registered routes
def print_routes(app):
    """Print a table of every route registered on *app*.

    Useful when chasing "route not found" errors, checking that routes
    were registered, or working out route precedence.

    Args:
        app: Any object with a ``routes`` iterable. Each route may expose
            ``methods``, ``path`` and ``name``; missing or ``None`` values
            are tolerated.
    """
    print("\nRegistered Routes:")
    print("-" * 50)
    for route in app.routes:
        # Routes without HTTP methods (e.g. websockets) are shown as WS.
        # A methods attribute that exists but is None is treated the same,
        # instead of crashing inside str.join.
        methods = getattr(route, 'methods', None) or ['WS']
        path = getattr(route, 'path', None) or ''
        name = getattr(route, 'name', None) or ''
        # Sorted so a set of methods always prints in the same order.
        # Format: GET,POST /api/users/{user_id}
        print(f"{','.join(sorted(methods)):10} {path:30} {name}")
    print("-" * 50)
    print(f"Total routes: {len(app.routes)}")
# Call it after all routes are registered
print_routes(app)
# Test specific routes programmatically
from zenith.testing import TestClient
async def test_route_access():
    """Request GET /api/users and print a hint for the status returned."""
    async with TestClient(app) as client:
        # First attempt: no credentials.
        response = await client.get("/api/users")
        status = response.status_code
        print("\nTesting GET /api/users:")
        print(f"Status: {status}")

        if status == 200:
            print(f"Success! Body: {response.json()}")
        elif status == 404:
            print("ERROR: Route not found - check registration")
        elif status == 401:
            print("Requires authentication - add token")
        elif status == 422:
            print(f"Validation error: {response.json()}")

        # Second attempt: same endpoint with a bearer token.
        auth_headers = {"Authorization": "Bearer test-token"}
        response = await client.get("/api/users", headers=auth_headers)
        print(f"\nWith auth: {response.status_code}")
# Run the test. A bare top-level ``await`` is a SyntaxError in a script,
# so start an event loop explicitly.
import asyncio

asyncio.run(test_route_access())
# Debug route matching issues
@app.get("/debug/test/{path:path}")
async def debug_route(path: str):
    """Temporary catch-all that reports which path was requested.

    Add it while debugging route matching and remove it afterwards.
    """
    report = {
        "message": "Debug route caught this",
        "path_received": path,
        "tip": "If you see this, your intended route isn't registered",
    }
    return report
  1. Use path parameters for IDs - Faster than query params

    # Good: O(1) lookup
    @app.get("/users/{user_id}")
    # Slower: Requires parsing
    @app.get("/users?id={user_id}")
  2. Limit query results - Always paginate

    @app.get("/items")
    async def list_items(limit: int = 100):
        """Cap the requested page size at 1000 (fragment; the query itself is omitted)."""
        limit = min(limit, 1000)  # never allow more than 1000 per request
  3. Cache route handlers - For expensive operations

    from zenith.cache import cached
    @app.get("/stats")
    @cached(ttl=60)  # keep the result for one minute
    async def get_stats():
        """Return statistics that are expensive to compute, served from cache."""
        stats = await calculate_expensive_stats()
        return stats
from zenith.testing import TestClient
async def test_routes():
    """Smoke-test a GET, a POST and an authenticated GET through TestClient."""
    async with TestClient(app) as client:
        # Plain GET.
        home = await client.get("/")
        assert home.status_code == 200

        # POST with a JSON body.
        created = await client.post("/items", json={"name": "Test Item"})
        assert created.status_code == 201

        # GET with an Authorization header.
        auth_headers = {"Authorization": "Bearer token"}
        guarded = await client.get("/protected", headers=auth_headers)
        assert guarded.status_code == 200
# Multiple HTTP methods on same route
# Route: /users/<user_id>
def user_detail(user_id_str, method):
    """Handle ``/users/<user_id>`` the manual way, for comparison.

    Everything a framework would normally do is spelled out by hand:
    reading and validating the body for POST, and converting the ID.

    Returns:
        ``{"user_id": <int>}`` on success, or the tuple
        ``({"error": "Invalid ID"}, 400)`` when the ID is not an integer.
    """
    if method == 'POST':
        payload = get_json_data()  # fetch the request data somehow
        validate_data(payload)     # validation is the caller's job

    # The type conversion is manual as well.
    try:
        parsed_id = int(user_id_str)
    except ValueError:
        return {"error": "Invalid ID"}, 400
    else:
        return {"user_id": parsed_id}
# Zenith - Separate handlers for clarity
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Read a user; the ``int`` annotation performs the conversion."""
    response = {"user_id": user_id}
    return response
@app.post("/users/{user_id}")
async def update_user(user_id: int, data: UserUpdate):
    """Update a user; the body is validated by the ``UserUpdate`` model."""
    response = {"updated": user_id}
    return response
// Express.js
app.get('/users/:id', (req, res) => {
  // Express delivers path parameters as strings, so conversion and
  // validation are manual. The explicit radix forces base-10 parsing.
  const id = Number.parseInt(req.params.id, 10);
  if (Number.isNaN(id)) {
    return res.status(400).json({error: 'Invalid ID'});
  }
  res.json({user_id: id});
});
# Zenith
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Zenith counterpart: the annotation validates the integer ID."""
    response = {"user_id": user_id}
    return response

Learn more in the API Reference or explore routing examples.