Skip to content

Authentication

Zenith provides a comprehensive authentication system with JWT tokens, password hashing, and flexible authorization. It includes built-in support for user registration, login, and protected routes.

from zenith import Zenith, Depends, HTTPException, Auth
from zenith.auth import create_access_token, get_current_user
from zenith.auth.password import hash_password, verify_password
from pydantic import BaseModel, EmailStr
# Application instance; the route decorators below register handlers on it.
app = Zenith()
# Input validation models
class UserLogin(BaseModel):
    """Credentials a client submits in order to authenticate."""

    # EmailStr validates the email format automatically.
    email: EmailStr
    # Arrives as plain text; only a hash of it is ever stored.
    password: str
class UserCreate(UserLogin):
    """Registration payload: the login credentials plus a display name."""

    # Extra field required only when registering.
    name: str
# In-memory user store for demo (use database in production!)
# Keyed by email; each value is a dict with the keys "email", "name" and
# "password_hash". Contents are lost whenever the process restarts.
users_db = {}
# REGISTRATION ENDPOINT - Create new user accounts
@app.post("/auth/register")
async def register(user: UserCreate):
    """Create an account and return a token so the client is logged in at once.

    Flow:
    1. Reject the request if the email is already registered.
    2. Store the user record with a hashed password (never plain text).
    3. Issue a JWT whose "sub" claim is the user's email.
    4. Return the token in the standard OAuth2 response shape.

    Raises:
        HTTPException: 400 when the email is already registered.
    """
    email = user.email

    # 400 = Bad Request is what this example returns;
    # 409 = Conflict would be more semantic.
    if email in users_db:
        raise HTTPException(400, "Email already registered")

    # hash_password uses bcrypt - industry standard, salted, slow by design.
    record = {
        "email": email,
        "name": user.name,
        "password_hash": hash_password(user.password),
    }
    # Only the hash is written to the store; the plain-text password is not.
    users_db[email] = record

    # "sub" (subject) is the JWT standard claim for user identity.
    token = create_access_token({"sub": email})

    # "bearer" tells the client to send: Authorization: Bearer <token>
    return {"access_token": token, "token_type": "bearer"}
# LOGIN ENDPOINT - Authenticate existing users

# Hash that is checked when the email is unknown, so an unknown-email request
# performs the same (deliberately slow) verification as a wrong-password one.
# Computed once at import time.
_DUMMY_PASSWORD_HASH = hash_password("Dummy-Passw0rd!-for-timing-only")


@app.post("/auth/login")
async def login(credentials: UserLogin):
    """Authenticate a user and return a fresh access token.

    Flow:
    1. Look up the user by email.
    2. Verify the submitted password against a stored hash.
    3. Issue a new JWT whose "sub" claim is the email.
    4. Return the token together with basic user info.

    Raises:
        HTTPException: 401 for an unknown email or a wrong password. Both
            cases produce the same message so valid emails cannot be
            discovered from the response body.
    """
    # Step 1: Look up user (None if not found)
    user = users_db.get(credentials.email)

    # Step 2: Always run verify_password, even when the user does not exist.
    # Skipping it for unknown emails would make those requests measurably
    # faster and reveal which emails are registered.
    stored_hash = user["password_hash"] if user else _DUMMY_PASSWORD_HASH
    password_ok = verify_password(credentials.password, stored_hash)
    if not user or not password_ok:
        raise HTTPException(401, "Invalid credentials")  # 401 = Unauthorized

    # Step 3: Generate JWT token (user is authenticated)
    token = create_access_token({"sub": credentials.email})

    # Step 4: Return token; user info is included to save a round trip
    return {
        "access_token": token,
        "token_type": "bearer",
        "user": {"email": user["email"], "name": user["name"]},
    }
# PROTECTED ENDPOINT - Requires valid JWT token
@app.get("/protected")
async def protected_route(
    user=Depends(get_current_user),  # Automatically validates JWT
):
    """Greet the authenticated caller.

    How it works:
    1. Client sends: Authorization: Bearer <token>
    2. get_current_user extracts and validates the token.
    3. If valid, the user info is injected as `user`.
    4. If invalid, 401 Unauthorized is returned.

    No manual token validation is needed in the handler.
    """
    greeting = f"Hello {user['email']}!"
    return {
        "message": greeting,
        "user": user,
        "note": "This endpoint requires valid JWT token",
    }
# Example usage:
# 1. Register: POST /auth/register {"email": "user@example.com", "password": "SecurePass123!", "name": "John"}
# 2. Get token from response
# 3. Use token: GET /protected with header "Authorization: Bearer <token>"
from zenith.auth import JWTConfig
import os
# Configure JWT token generation and validation
jwt_config = JWTConfig(
    # SECRET_KEY is critical - NEVER commit to git!
    # It is read with os.environ[...] so that a missing variable raises
    # KeyError at startup instead of silently signing tokens with a
    # publicly known placeholder value.
    # In production: use long, random key (32+ bytes)
    # Generate with: openssl rand -hex 32
    secret_key=os.environ["SECRET_KEY"],
    # Algorithm for signing tokens
    # Other options: RS256 (RSA, asymmetric), ES256 (ECDSA)
    algorithm="HS256",  # HMAC with SHA-256 (symmetric)
    # Token expiration times
    access_token_expire_minutes=30,  # Short-lived for security
    refresh_token_expire_days=7,  # Long-lived for convenience
)

# Apply configuration to your app
app = Zenith(auth_config=jwt_config)
# Now all auth functions use these settings automatically
# Read the PEM files inside `with` blocks so the file handles are closed as
# soon as the contents have been read.
# Generate RSA keys with:
#   openssl genrsa -out private.pem 2048
#   openssl rsa -in private.pem -pubout -out public.pem
with open("public.pem", encoding="utf-8") as public_key_file:
    public_key_pem = public_key_file.read()
with open("private.pem", encoding="utf-8") as private_key_file:
    private_key_pem = private_key_file.read()

jwt_config = JWTConfig(
    # RSA ASYMMETRIC KEYS (more secure for distributed systems)
    algorithm="RS256",  # RSA with SHA-256
    # Public key for verification (can be shared)
    public_key=public_key_pem,
    # Private key for signing (keep secret!)
    private_key=private_key_pem,
    # TOKEN LIFETIMES (balance security vs convenience)
    access_token_expire_minutes=15,  # Very short for high security
    refresh_token_expire_days=30,  # Long enough to avoid frequent logins
    # JWT STANDARD CLAIMS (for multi-service architectures)
    issuer="https://api.example.com",  # Who created the token
    audience="https://app.example.com",  # Who can use the token
    # VALIDATION OPTIONS (all should be True in production)
    verify_signature=True,  # Check token hasn't been tampered with
    verify_exp=True,  # Check token hasn't expired
    verify_aud=True,  # Check token is for this service
    require_exp=True,  # Reject tokens without expiration
    # Additional security
    leeway_seconds=0,  # No tolerance for expired tokens
    verify_iat=True,  # Verify "issued at" time
    verify_nbf=True,  # Verify "not before" time
)
# Why use RSA over HMAC?
# - Public key can verify tokens without ability to create them
# - Perfect for microservices where multiple services verify tokens
# - API gateway can verify without knowing signing secret

Hashing Passwords (Never Store Plain Text!)
from zenith.auth.password import (
hash_password,
verify_password,
PasswordConfig
)
# Configure password requirements and hashing
password_config = PasswordConfig(
    # BCRYPT WORK FACTOR (cost parameter): 12 means 2^12 iterations.
    # Each +1 doubles the work, so higher is more secure but slower.
    # Actual timings depend on hardware - measure on your own servers and
    # adjust to your capacity.
    bcrypt_rounds=12,
    # PASSWORD COMPLEXITY REQUIREMENTS
    min_length=8,  # Minimum characters
    require_uppercase=True,  # Must have A-Z
    require_lowercase=True,  # Must have a-z
    require_digits=True,  # Must have 0-9
    require_special=True,  # Must have !@#$%^&* etc.
    # Additional options
    max_length=128,  # Prevent DoS with huge passwords
    # Common passwords to reject
    banned_passwords=["password", "12345678", "qwerty"],
)
# HASHING - One way transformation: the plain text cannot be recovered from
# the result.
hashed = hash_password("SecurePass123!", config=password_config)
# Example result (the salt is random, so the value differs on every call):
# $2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/LewY.hsHL8jjF3DRM
# ^^^^ ^^ ^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# alg cost salt hash

# VERIFICATION - Compare plain text with hash
is_valid = verify_password("SecurePass123!", hashed)
# Returns: True if password matches, False otherwise
# Important: verify_password is constant-time to prevent timing attacks
# Example: Registration flow
@app.post("/register")
async def register(email: str, password: str):
    """Create a user whose password is stored only as a hash.

    Only the minimum length from `password_config` is enforced here.

    Raises:
        HTTPException: 400 when the password is shorter than
            `password_config.min_length`.
    """
    # Validate password strength first
    if len(password) < password_config.min_length:
        raise HTTPException(400, "Password too short")

    # Hash before storing. The config is passed by keyword, matching the
    # other hash_password call in this guide.
    password_hash = hash_password(password, config=password_config)
    user = User(email=email, password_hash=password_hash)
    await user.save()

    # Only the hash was persisted - the plain-text password is never stored.
    return {"message": "User created"}
from zenith.auth.password import validate_password_strength
@app.post("/auth/register")
async def register(user: UserCreate):
    """Reject weak passwords before continuing with registration.

    Raises:
        HTTPException: 400 carrying {"errors": ...} when
            validate_password_strength returns a truthy result.
    """
    # Validate password strength
    # presumably a collection of human-readable problems; empty/falsy when the
    # password is acceptable - confirm against the zenith API.
    errors = validate_password_strength(user.password)
    if errors:
        raise HTTPException(400, {"errors": errors})
    # Continue with registration...
    # NOTE(review): the rest of the flow is elided in this snippet; as shown,
    # the handler returns None for an acceptable password.
from zenith.auth import create_access_token, create_refresh_token
from datetime import timedelta
# STANDARD ACCESS TOKEN - Short-lived, contains user info
# Use for: API requests, checking permissions
# Token size: ~200-500 bytes
access_token = create_access_token(
    data={
        "sub": user.email,  # 'subject' - WHO is this token for?
        "role": user.role,  # What can they do?
    },
    expires_delta=timedelta(minutes=15),  # Dies quickly for security
)

# REFRESH TOKEN - Long-lived, minimal info
# Use for: Getting new access tokens without re-login
# Store: HttpOnly cookie or secure storage
refresh_token = create_refresh_token(
    data={"sub": user.email},  # Only identity, no permissions
    expires_delta=timedelta(days=30),  # Lives longer for convenience
)
# CUSTOM CLAIMS - Add app-specific data
token = create_access_token(
    data={
        # Standard claims
        "sub": user.email,  # Subject (user identity)
        # "iat" (issued at) and "exp" (expiration) are set automatically,
        # so they are deliberately not supplied here.
        # Custom claims for your app
        "role": user.role,  # For role-based access
        "permissions": [  # For fine-grained access
            "posts:read",
            "posts:write",
            "users:read",
        ],
        "org_id": user.organization_id,  # Multi-tenant apps
        "subscription": "premium",  # Feature flags
    }
)
# WARNING: Don't put sensitive data in JWT!
# JWTs are encoded, NOT encrypted - anyone can read them
# BAD: password, SSN, credit card
# GOOD: user_id, email, role, permissions
from zenith.auth import decode_token
@app.get("/auth/me")
async def get_current_user_info(token: str = Header()):
    """Decode the supplied token and return the identity claims it carries.

    Returns:
        A dict with "email" (the "sub" claim), "role" (None when the claim is
        absent) and "expires" (the "exp" claim).

    Raises:
        HTTPException: 401 when the token cannot be decoded.

    NOTE(review): `Header` and `JWTError` are not imported anywhere in this
    snippet - confirm their import paths before using it.
    """
    try:
        payload = decode_token(token)
    except JWTError as e:
        # Chain the cause so the original decoding error stays in tracebacks.
        raise HTTPException(401, str(e)) from e

    return {
        "email": payload["sub"],
        "role": payload.get("role"),
        "expires": payload["exp"],
    }
@app.post("/auth/refresh")
async def refresh_token(refresh_token: str):
    """Exchange a valid refresh token for a new access token.

    Raises:
        HTTPException: 401 when the refresh token fails to decode, or when
            the user named in its "sub" claim no longer exists.

    NOTE(review): `JWTError` and `get_user_by_email` are not defined in this
    snippet - confirm where they come from.
    """
    # Keep the try body to the one call that is expected to raise JWTError,
    # so unrelated failures further down are not reported as a bad token.
    try:
        payload = decode_token(refresh_token, verify_exp=True)
    except JWTError as err:
        raise HTTPException(401, "Invalid refresh token") from err

    # Get user
    user = await get_user_by_email(payload["sub"])
    if not user:
        raise HTTPException(401, "User not found")

    # Issue new access token
    access_token = create_access_token({"sub": user.email})
    return {"access_token": access_token}
from zenith.auth import require_auth
@app.get("/profile")
@require_auth
async def get_profile(current_user=Depends(get_current_user)):
    """Return the injected current user unchanged."""
    profile = current_user
    return profile
from zenith.auth import require_role
# SINGLE ROLE REQUIREMENT
@app.delete("/users/{user_id}")
@require_role("admin")  # Decorator checks role BEFORE function runs
async def delete_user(
    user_id: int,
    current_user=Depends(get_current_user),  # Still need user info
):
    """Delete a user. Only admins can do this.

    Flow:
    1. Client sends: DELETE /users/123 with JWT token
    2. require_role decorator extracts and validates token
    3. Checks if token contains role="admin"
    4. If yes: proceeds to function
    5. If no: returns 403 Forbidden

    Raises:
        HTTPException: 404 when no user with `user_id` exists.
    """
    # At this point, we KNOW user is admin.
    # Await the lookup first: calling .delete() directly on the un-awaited
    # result of User.find() would not operate on the loaded record.
    target = await User.find(user_id)
    if not target:
        raise HTTPException(404, "User not found")
    await target.delete()

    # Log admin action for audit (lazy %-args: formatted only if emitted)
    logger.info("Admin %s deleted user %s", current_user["email"], user_id)
    return {"deleted": user_id}
# MULTIPLE ROLES (OR logic)
@app.get("/admin/dashboard")
@require_role(["admin", "moderator"])  # Admin OR moderator
async def admin_dashboard(
    current_user=Depends(get_current_user),
):
    """Describe the dashboard available to the caller's role.

    Passing several roles to require_role means ANY one of them is enough.
    """
    is_admin = current_user["role"] == "admin"

    # Anyone who is not an admin gets the moderator view.
    if not is_admin:
        return {
            "role": "moderator",
            "features": ["users", "content"],
            "can_delete": False,
        }

    return {
        "role": "admin",
        "features": ["users", "settings", "billing"],
        "can_delete": True,
    }
# HIERARCHICAL ROLES
# Maps each role to every role it implicitly includes (already flattened, so
# no recursive walk is needed).
ROLE_HIERARCHY = {
    "super_admin": ["admin", "moderator", "user"],
    "admin": ["moderator", "user"],
    "moderator": ["user"],
    "user": [],
}


def has_role(user_role: str, required_role: str) -> bool:
    """Return True when `user_role` is, or includes, `required_role`.

    A role always satisfies itself, even when it is missing from
    ROLE_HIERARCHY; an unknown role includes no other roles.
    """
    implied_roles = ROLE_HIERARCHY.get(user_role, [])
    return user_role == required_role or required_role in implied_roles
from zenith.auth import require_permission
@app.post("/posts")
@require_permission("posts:write")
async def create_post(
    post: PostCreate,
    current_user=Depends(get_current_user),
):
    """Echo the submitted post back; gated by the "posts:write" permission."""
    response = {"created": post}
    return response
@app.put("/posts/{post_id}")
@require_permission("posts:edit")
async def edit_post(
    post_id: int,
    post: PostUpdate,
    current_user=Depends(get_current_user),
):
    """Report the edited post's id; gated by the "posts:edit" permission."""
    response = {"updated": post_id}
    return response
from zenith.auth import AuthorizationChecker
class OwnershipChecker(AuthorizationChecker):
    """Authorize by resource ownership rather than by role.

    Use when roles aren't enough:
    - Users can only edit THEIR OWN posts
    - Users can only delete THEIR OWN comments
    - Users can only view THEIR OWN orders
    """

    async def check(
        self,
        user: dict,  # Current user from JWT
        resource_id: int,  # ID of resource to check
        resource_type: str,  # Type for polymorphic checks
    ) -> bool:
        """Return True if `user` owns the given resource.

        Unknown resource types and missing resources both yield False.
        """
        # Only posts and comments are supported.
        if resource_type != "post" and resource_type != "comment":
            return False

        # Look up the resource on the model matching its type.
        model = Post if resource_type == "post" else Comment
        resource = await model.find(resource_id)

        # Resource doesn't exist
        if not resource:
            return False

        # Is the caller the owner?
        return resource.owner_id == user["id"]
# USE IN ROUTES - Combine with role checks
@app.put("/posts/{post_id}")
async def update_post(
    post_id: int,
    post_update: PostUpdate,
    current_user=Depends(get_current_user),
    checker: OwnershipChecker = Depends(),  # Inject checker
):
    """Update a post that the caller owns; admins may update any post.

    Authorization logic:
    1. User must be authenticated (get_current_user)
    2. User must own the post (OwnershipChecker)
    3. OR user must be admin (bypasses the ownership check)

    Raises:
        HTTPException: 403 when a non-admin caller does not own the post.
    """
    if current_user.get("role") == "admin":
        # Admin bypass - log the override for audit
        logger.info(f"Admin {current_user['email']} editing post {post_id}")
    else:
        # Regular user - must own the post
        owns_post = await checker.check(current_user, post_id, "post")
        if not owns_post:
            raise HTTPException(403, "You can only edit your own posts")

    # Authorized: apply the update
    post = await Post.find_or_404(post_id)
    changes = post_update.model_dump()
    await post.update(**changes)
    return {"updated": post_id, "post": post.model_dump()}
# COMPLEX AUTHORIZATION - Multiple checks
@app.post("/teams/{team_id}/invite")
async def invite_to_team(
    team_id: int,
    email: str,
    current_user=Depends(get_current_user),
):
    """Invite `email` to a team.

    The caller must be the team owner, OR an admin, OR a team member who
    holds the "team:invite" permission.

    Raises:
        HTTPException: 403 when none of those conditions holds.
    """
    team = await Team.find_or_404(team_id)
    caller_id = current_user["id"]

    # Evaluate every authorization path up front
    is_owner = team.owner_id == caller_id
    is_admin = current_user.get("role") == "admin"
    has_permission = "team:invite" in current_user.get("permissions", [])
    is_member = caller_id in team.member_ids

    # The permission only counts for callers who are members of the team.
    member_may_invite = is_member and has_permission
    if not is_owner and not is_admin and not member_may_invite:
        raise HTTPException(
            403,
            "You need to be team owner or have invite permission",
        )

    # Proceed with invitation
    await send_invitation(team_id, email)
    return {"invited": email}
from zenith.auth.oauth import GoogleOAuth
# Google OAuth client; the credentials are read from the environment
# (os.getenv yields None for any variable that is unset).
google_oauth = GoogleOAuth(
    client_id=os.getenv("GOOGLE_CLIENT_ID"),
    client_secret=os.getenv("GOOGLE_CLIENT_SECRET"),
    redirect_uri="https://api.example.com/auth/google/callback",
)
@app.get("/auth/google")
async def google_login():
    """Return the URL the client should visit to start the Google flow."""
    authorization_url = google_oauth.get_authorization_url()
    return {"url": authorization_url}
@app.get("/auth/google/callback")
async def google_callback(code: str):
    """Finish the Google flow: trade `code` for user info and issue a JWT."""
    # Exchange the authorization code for a provider token
    provider_token = await google_oauth.get_access_token(code)

    # Fetch the profile that belongs to that token
    profile = await google_oauth.get_user_info(provider_token)

    # Create or update the local user record
    user = await get_or_create_user(
        email=profile["email"],
        name=profile["name"],
        provider="google",
    )

    # Issue this application's own JWT
    jwt_token = create_access_token({"sub": user.email})
    return {"access_token": jwt_token}
from zenith.auth.oauth import GitHubOAuth
# GitHub OAuth client; the credentials are read from the environment
# (os.getenv yields None for any variable that is unset).
github_oauth = GitHubOAuth(
    client_id=os.getenv("GITHUB_CLIENT_ID"),
    client_secret=os.getenv("GITHUB_CLIENT_SECRET"),
    redirect_uri="https://api.example.com/auth/github/callback",
)
@app.get("/auth/github")
async def github_login():
    """Return the GitHub authorization URL, requesting the user:email scope."""
    authorization_url = github_oauth.get_authorization_url(scope="user:email")
    return {"url": authorization_url}
@app.get("/auth/github/callback")
async def github_callback(code: str):
    """Placeholder for the GitHub callback (similar to Google OAuth).

    Not implemented in this snippet: the handler does nothing and returns None.
    """
    return None
from zenith.sessions import SessionMiddleware, SessionConfig
# Register cookie-based sessions on the app.
app.add_middleware(SessionMiddleware, {
    # os.environ[...] raises KeyError at startup when SESSION_SECRET is unset;
    # os.getenv would silently hand None to the middleware as its secret key.
    "secret_key": os.environ["SESSION_SECRET"],
    "cookie_name": "session_id",
    "cookie_secure": True,  # HTTPS only
    "cookie_httponly": True,
    "cookie_samesite": "lax",
    "max_age": 86400,  # 24 hours
})
@app.post("/auth/login")
async def login(credentials: UserLogin, request: Request):
    """Log a user in by recording their id and email in request.session.

    NOTE(review): the credential check is elided in this snippet. `user` must
    be bound by that step before the session writes below - as shown, `user`
    is undefined.
    """
    # Verify credentials...
    # Store in session
    request.session["user_id"] = user.id
    request.session["email"] = user.email
    return {"status": "logged in"}
@app.post("/auth/logout")
async def logout(request: Request):
    """Log the caller out by emptying their session."""
    session = request.session
    session.clear()
    return {"status": "logged out"}
from zenith.auth.mfa import TOTP, generate_secret
@app.post("/auth/mfa/enable")
async def enable_mfa(current_user=Depends(get_current_user)):
    """Start MFA enrolment: create a secret, store it, return a provisioning URI.

    Returns:
        A dict with "qr_url" (the provisioning URI to render as a QR code)
        and "secret" (the newly generated secret itself).
    """
    # Generate a fresh secret and save it to the user (encrypted)
    secret = generate_secret()
    await save_user_mfa_secret(current_user["id"], secret)

    # Build the URI that authenticator apps consume
    provisioning_uri = TOTP(secret).provisioning_uri(
        name=current_user["email"],
        issuer="MyApp",
    )
    return {"qr_url": provisioning_uri, "secret": secret}
@app.post("/auth/mfa/verify")
async def verify_mfa(
    code: str,
    current_user=Depends(get_current_user),
):
    """Confirm MFA enrolment by checking one code against the stored secret.

    Raises:
        HTTPException: 400 when the code does not verify.
    """
    # Get user's secret
    secret = await get_user_mfa_secret(current_user["id"])

    # Verify code
    code_is_valid = TOTP(secret).verify(code)
    if not code_is_valid:
        raise HTTPException(400, "Invalid code")

    # Mark MFA as verified
    await mark_mfa_verified(current_user["id"])
    return {"status": "MFA enabled"}
from zenith.testing import TestClient, MockAuth
# NOTE(review): `pytest` is not imported in this snippet - add `import pytest`
# to the test module.
@pytest.mark.asyncio
async def test_protected_route():
    """/protected rejects anonymous calls and serves an authenticated user."""
    async with TestClient(app) as client:
        # Test without auth
        response = await client.get("/protected")
        assert response.status_code == 401

        # Test with mock auth
        with MockAuth(user={"email": "test@example.com", "role": "admin"}):
            response = await client.get("/protected")
            assert response.status_code == 200
            # /protected nests the injected user under the "user" key; its
            # response body has no top-level "email" field.
            assert response.json()["user"]["email"] == "test@example.com"
@pytest.mark.asyncio
async def test_login():
    """A token returned by registration grants access to /protected."""
    registration = {
        "email": "test@example.com",
        "password": "SecurePass123!",
        "name": "Test User",
    }
    async with TestClient(app) as client:
        # Register user
        register_response = await client.post("/auth/register", json=registration)
        assert register_response.status_code == 200
        token = register_response.json()["access_token"]

        # Use token
        auth_headers = {"Authorization": f"Bearer {token}"}
        protected_response = await client.get("/protected", headers=auth_headers)
        assert protected_response.status_code == 200
  1. Use environment variables for secrets
  2. Implement rate limiting on auth endpoints
  3. Log authentication attempts for security monitoring
  4. Use secure cookies with HttpOnly and SameSite flags
  5. Implement token rotation for long-lived sessions
  6. Add CAPTCHA for registration and login after failures
  7. Require email verification for new accounts
  8. Implement password reset with secure tokens