Authentication
Overview
Section titled “Overview”
Zenith provides a comprehensive authentication system with JWT tokens, password hashing, and flexible authorization. It includes built-in support for user registration, login, and protected routes.
Quick Start (Complete Auth in 5 Minutes)
Section titled “Quick Start (Complete Auth in 5 Minutes)”from zenith import Zenith, Depends, HTTPException, Authfrom zenith.auth import create_access_token, get_current_userfrom zenith.auth.password import hash_password, verify_passwordfrom pydantic import BaseModel, EmailStr
app = Zenith()
# Input validation modelsclass UserLogin(BaseModel): """Login credentials - what users send to authenticate.""" email: EmailStr # EmailStr validates email format automatically password: str # Plain text (will be hashed, never stored)
class UserCreate(UserLogin): """Registration data - extends login with additional fields.""" name: str # Additional field for registration
# In-memory user store for demo (use database in production!)users_db = {} # {email: {email, name, password_hash}}
# REGISTRATION ENDPOINT - Create new user accounts
@app.post("/auth/register")
async def register(user: UserCreate):
    """Create an account in ``users_db`` and hand back a bearer token.

    Steps:
        1. Reject the request if the email is already registered.
        2. Store the user with a hashed password (never the plain text).
        3. Issue a JWT so the new user is logged in immediately.

    Raises:
        HTTPException: 400 when the email is already a key in ``users_db``.
    """
    email = user.email

    # Duplicate check comes first. 400 (Bad Request) is returned here;
    # 409 (Conflict) would be the more semantic status code.
    if email in users_db:
        raise HTTPException(400, "Email already registered")

    # Only the hash produced by hash_password is kept in the store.
    record = {
        "email": email,
        "name": user.name,
        "password_hash": hash_password(user.password),
    }
    users_db[email] = record

    # Standard OAuth2-style response; "sub" (subject) carries the identity.
    return {
        "access_token": create_access_token({"sub": email}),
        "token_type": "bearer",  # Client sends: Authorization: Bearer <token>
    }
# LOGIN ENDPOINT - Authenticate existing users
@app.post("/auth/login")
async def login(credentials: UserLogin):
    """Authenticate against ``users_db`` and return a fresh bearer token.

    Steps:
        1. Look the user up by email.
        2. Check the submitted password against the stored hash.
        3. Issue a new JWT and return it with basic user info.

    Raises:
        HTTPException: 401 for an unknown email or a wrong password.
    """
    account = users_db.get(credentials.email)  # None when the email is unknown

    # The password check only runs when an account was found.
    authenticated = bool(account) and verify_password(
        credentials.password, account["password_hash"]
    )
    if not authenticated:
        # Deliberately the same error for "user not found" and
        # "wrong password", so valid emails cannot be discovered.
        raise HTTPException(401, "Invalid credentials")  # 401 = Unauthorized

    return {
        "access_token": create_access_token({"sub": credentials.email}),
        "token_type": "bearer",
        # Optional: user info is included to save the client a round trip
        "user": {"email": account["email"], "name": account["name"]},
    }
# PROTECTED ENDPOINT - Requires valid JWT token
@app.get("/protected")
async def protected_route(
    user=Depends(get_current_user),  # Dependency validates the JWT for us
):
    """Return a greeting for the authenticated caller.

    How it works:
        1. Client sends: Authorization: Bearer <token>
        2. get_current_user extracts and validates the token
        3. If valid: the user info is injected as ``user``
        4. If invalid: 401 Unauthorized is returned

    No manual token validation is needed in the handler.
    """
    greeting = f"Hello {user['email']}!"
    return {
        "message": greeting,
        "user": user,
        "note": "This endpoint requires valid JWT token",
    }
# Example usage:# 1. Register: POST /auth/register {"email": "user@example.com", "password": "SecurePass123!", "name": "John"}# 2. Get token from response# 3. Use token: GET /protected with header "Authorization: Bearer <token>"JWT Configuration
Section titled “JWT Configuration”Basic Setup
Section titled “Basic Setup”from zenith.auth import JWTConfigimport os
# Configure JWT token generation and validation
jwt_config = JWTConfig(
    # SECRET_KEY is critical - NEVER commit to git!
    # It is read from the environment with no fallback value: a hard-coded
    # default would silently sign tokens with a publicly known key whenever
    # the variable is unset. os.environ[...] raises KeyError at startup instead.
    secret_key=os.environ["SECRET_KEY"],
    # In production: use long, random key (32+ bytes)
    # Generate with: openssl rand -hex 32

    # Algorithm for signing tokens
    algorithm="HS256",  # HMAC with SHA-256 (symmetric)
    # Other options: RS256 (RSA, asymmetric), ES256 (ECDSA)

    # Token expiration times
    access_token_expire_minutes=30,  # Short-lived for security
    refresh_token_expire_days=7,  # Long-lived for convenience
)
# Apply configuration to your appapp = Zenith(auth_config=jwt_config)
# Now all auth functions use these settings automaticallyAdvanced Configuration (Production-Grade)
Section titled “Advanced Configuration (Production-Grade)”jwt_config = JWTConfig( # RSA ASYMMETRIC KEYS (more secure for distributed systems) algorithm="RS256", # RSA with SHA-256
# Public key for verification (can be shared) public_key=open("public.pem").read(), # Private key for signing (keep secret!) private_key=open("private.pem").read(),
# Generate RSA keys with: # openssl genrsa -out private.pem 2048 # openssl rsa -in private.pem -pubout -out public.pem
# TOKEN LIFETIMES (balance security vs convenience) access_token_expire_minutes=15, # Very short for high security refresh_token_expire_days=30, # Long enough to avoid frequent logins
# JWT STANDARD CLAIMS (for multi-service architectures) issuer="https://api.example.com", # Who created the token audience="https://app.example.com", # Who can use the token
# VALIDATION OPTIONS (all should be True in production) verify_signature=True, # Check token hasn't been tampered with verify_exp=True, # Check token hasn't expired verify_aud=True, # Check token is for this service require_exp=True, # Reject tokens without expiration
# Additional security leeway_seconds=0, # No tolerance for expired tokens verify_iat=True, # Verify "issued at" time verify_nbf=True # Verify "not before" time)
# Why use RSA over HMAC?# - Public key can verify tokens without ability to create them# - Perfect for microservices where multiple services verify tokens# - API gateway can verify without knowing signing secretPassword Security
Section titled “Password Security”Hashing Passwords (Never Store Plain Text!)
Section titled “Hashing Passwords (Never Store Plain Text!)”from zenith.auth.password import ( hash_password, verify_password, PasswordConfig)
# Configure password requirements and hashingpassword_config = PasswordConfig( # BCRYPT WORK FACTOR (cost parameter) bcrypt_rounds=12, # 2^12 iterations # Higher = more secure but slower # 10 = ~10ms (development) # 12 = ~250ms (production) # 14 = ~1s (high security) # Adjust based on your server capacity
# PASSWORD COMPLEXITY REQUIREMENTS min_length=8, # Minimum characters require_uppercase=True, # Must have A-Z require_lowercase=True, # Must have a-z require_digits=True, # Must have 0-9 require_special=True, # Must have !@#$%^&* etc.
# Additional options max_length=128, # Prevent DoS with huge passwords banned_passwords=[ # Common passwords to reject "password", "12345678", "qwerty" ])
# HASHING - One way transformationhashed = hash_password("SecurePass123!", config=password_config)# Result: $2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/LewY.hsHL8jjF3DRM# ^^^^ ^^ ^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^# alg cost salt hash
# VERIFICATION - Compare plain text with hashis_valid = verify_password("SecurePass123!", hashed)# Returns: True if password matches, False otherwise
# Important: verify_password is constant-time to prevent timing attacks
# Example: Registration flow@app.post("/register")async def register(email: str, password: str): # Validate password strength first if len(password) < password_config.min_length: raise HTTPException(400, "Password too short")
# Hash before storing user = User( email=email, password_hash=hash_password(password, password_config) ) await user.save()
# Original password is now gone - only hash remains! return {"message": "User created"}Password Validation
Section titled “Password Validation”from zenith.auth.password import validate_password_strength
@app.post("/auth/register")async def register(user: UserCreate): # Validate password strength errors = validate_password_strength(user.password) if errors: raise HTTPException(400, {"errors": errors})
# Continue with registration...Token Management
Section titled “Token Management”Creating Tokens (What Goes in a JWT?)
Section titled “Creating Tokens (What Goes in a JWT?)”from zenith.auth import create_access_token, create_refresh_tokenfrom datetime import timedelta
# STANDARD ACCESS TOKEN - Short-lived, contains user infoaccess_token = create_access_token( data={ "sub": user.email, # 'subject' - WHO is this token for? "role": user.role # What can they do? }, expires_delta=timedelta(minutes=15) # Dies quickly for security)# Use for: API requests, checking permissions# Token size: ~200-500 bytes
# REFRESH TOKEN - Long-lived, minimal inforefresh_token = create_refresh_token( data={ "sub": user.email # Only identity, no permissions }, expires_delta=timedelta(days=30) # Lives longer for convenience)# Use for: Getting new access tokens without re-login# Store: HttpOnly cookie or secure storage
# CUSTOM CLAIMS - Add app-specific data
token = create_access_token(
    data={
        # Standard claims
        "sub": user.email,  # Subject (user identity)
        # "iat" (issued at) and "exp" (expiration) are set automatically,
        # so they are not passed in the payload. Use expires_delta to
        # control the token lifetime.

        # Custom claims for your app
        "role": user.role,  # For role-based access
        "permissions": [  # For fine-grained access
            "posts:read",
            "posts:write",
            "users:read",
        ],
        "org_id": user.organization_id,  # Multi-tenant apps
        "subscription": "premium",  # Feature flags
    }
)
# WARNING: Don't put sensitive data in JWT!# JWTs are encoded, NOT encrypted - anyone can read them# BAD: password, SSN, credit card# GOOD: user_id, email, role, permissionsDecoding Tokens
Section titled “Decoding Tokens”from zenith.auth import decode_token
@app.get("/auth/me")async def get_current_user_info(token: str = Header()): try: payload = decode_token(token) return { "email": payload["sub"], "role": payload.get("role"), "expires": payload["exp"] } except JWTError as e: raise HTTPException(401, str(e))Token Refresh
Section titled “Token Refresh”@app.post("/auth/refresh")async def refresh_token(refresh_token: str): try: # Verify refresh token payload = decode_token(refresh_token, verify_exp=True)
# Get user user = await get_user_by_email(payload["sub"]) if not user: raise HTTPException(401, "User not found")
# Issue new access token access_token = create_access_token({"sub": user.email}) return {"access_token": access_token}
except JWTError: raise HTTPException(401, "Invalid refresh token")Protected Routes
Section titled “Protected Routes”Basic Protection
Section titled “Basic Protection”from zenith.auth import require_auth
@app.get("/profile")@require_authasync def get_profile(current_user = Depends(get_current_user)): return current_userRole-Based Access Control (RBAC)
Section titled “Role-Based Access Control (RBAC)”from zenith.auth import require_role
# SINGLE ROLE REQUIREMENT
@app.delete("/users/{user_id}")
@require_role("admin")  # Decorator checks role BEFORE function runs
async def delete_user(
    user_id: int,
    current_user=Depends(get_current_user),  # Still need user info
):
    """Delete a user; only admins can call this endpoint.

    Flow:
        1. Client sends: DELETE /users/123 with JWT token
        2. require_role decorator extracts and validates token
        3. Checks if token contains role="admin"
        4. If yes: proceeds to function
        5. If no: returns 403 Forbidden
    """
    # At this point, we KNOW user is admin.
    # Await the lookup first, then delete the loaded record. Chaining
    # .delete() directly onto User.find(...) would call it on the
    # not-yet-awaited lookup instead of on the user.
    # NOTE(review): find_or_404 is used on Post and Team elsewhere in this
    # guide; presumably User offers it too and it responds 404 for a
    # missing id - confirm against the model API.
    user = await User.find_or_404(user_id)
    await user.delete()

    # Log admin action for audit
    logger.info(f"Admin {current_user['email']} deleted user {user_id}")

    return {"deleted": user_id}
# MULTIPLE ROLES (OR logic)
@app.get("/admin/dashboard")
@require_role(["admin", "moderator"])  # Admin OR moderator
async def admin_dashboard(
    current_user=Depends(get_current_user),
):
    """Return the dashboard description for the caller's role.

    A list of roles means ANY of them grants access; the response then
    differs depending on which role the caller actually has.
    """
    is_admin = current_user["role"] == "admin"

    # Anything that is not "admin" is treated as moderator.
    if not is_admin:
        return {
            "role": "moderator",
            "features": ["users", "content"],
            "can_delete": False,
        }

    return {
        "role": "admin",
        "features": ["users", "settings", "billing"],
        "can_delete": True,
    }
# HIERARCHICAL ROLESROLE_HIERARCHY = { "super_admin": ["admin", "moderator", "user"], "admin": ["moderator", "user"], "moderator": ["user"], "user": []}
def has_role(user_role: str, required_role: str) -> bool: """Check if user role includes required role.""" if user_role == required_role: return True return required_role in ROLE_HIERARCHY.get(user_role, [])Permission-Based Access
Section titled “Permission-Based Access”from zenith.auth import require_permission
@app.post("/posts")
@require_permission("posts:write")
async def create_post(
    post: PostCreate,
    current_user=Depends(get_current_user),
):
    """Echo the submitted post back; guarded by the "posts:write" permission."""
    response = {"created": post}
    return response
@app.put("/posts/{post_id}")@require_permission("posts:edit")async def edit_post( post_id: int, post: PostUpdate, current_user = Depends(get_current_user)): return {"updated": post_id}Custom Authorization (Beyond Roles)
Section titled “Custom Authorization (Beyond Roles)”from zenith.auth import AuthorizationChecker
class OwnershipChecker(AuthorizationChecker):
    """Authorization check based on resource ownership.

    Use when roles aren't enough:
    - Users can only edit THEIR OWN posts
    - Users can only delete THEIR OWN comments
    - Users can only view THEIR OWN orders
    """

    async def check(
        self,
        user: dict,  # Current user from JWT
        resource_id: int,  # ID of resource to check
        resource_type: str,  # Type for polymorphic checks
    ) -> bool:
        """Return True if ``user`` owns the resource, otherwise False.

        Unknown resource types and missing resources both yield False.
        """
        # Pick the model for the requested type; bail out on anything else.
        if resource_type == "post":
            model = Post
        elif resource_type == "comment":
            model = Comment
        else:
            return False  # Unknown resource type

        found = await model.find(resource_id)
        if not found:
            return False  # Resource doesn't exist

        # Ownership means the resource's owner_id matches the user's id.
        owner_matches = found.owner_id == user["id"]
        return owner_matches
# USE IN ROUTES - Combine with role checks
@app.put("/posts/{post_id}")
async def update_post(
    post_id: int,
    post_update: PostUpdate,
    current_user=Depends(get_current_user),
    checker: OwnershipChecker = Depends(),  # Inject checker
):
    """Update a post; users can only edit their own posts.

    Authorization logic:
        1. User must be authenticated (get_current_user)
        2. User must own the post (OwnershipChecker)
        3. OR user must be admin (bypass ownership)

    Raises:
        HTTPException: 403 when a non-admin caller does not own the post.
    """
    if current_user.get("role") == "admin":
        # Admin bypass - the override is logged for audit
        logger.info(f"Admin {current_user['email']} editing post {post_id}")
    else:
        # Regular user - ownership is mandatory
        owns_post = await checker.check(current_user, post_id, "post")
        if not owns_post:
            raise HTTPException(
                403,
                "You can only edit your own posts",
            )

    # Authorized from here on: load the post and apply the changes.
    post = await Post.find_or_404(post_id)
    changes = post_update.model_dump()
    await post.update(**changes)

    return {"updated": post_id, "post": post.model_dump()}
# COMPLEX AUTHORIZATION - Multiple checks@app.post("/teams/{team_id}/invite")async def invite_to_team( team_id: int, email: str, current_user = Depends(get_current_user)): """Complex auth: Must be team owner OR admin OR have invite permission."""
team = await Team.find_or_404(team_id)
# Check various authorization paths is_owner = team.owner_id == current_user["id"] is_admin = current_user.get("role") == "admin" has_permission = "team:invite" in current_user.get("permissions", []) is_member = current_user["id"] in team.member_ids
if not (is_owner or is_admin or (is_member and has_permission)): raise HTTPException( 403, "You need to be team owner or have invite permission" )
# Proceed with invitation await send_invitation(team_id, email) return {"invited": email}OAuth2 Integration
Section titled “OAuth2 Integration”Google OAuth
Section titled “Google OAuth”from zenith.auth.oauth import GoogleOAuth
google_oauth = GoogleOAuth( client_id=os.getenv("GOOGLE_CLIENT_ID"), client_secret=os.getenv("GOOGLE_CLIENT_SECRET"), redirect_uri="https://api.example.com/auth/google/callback")
@app.get("/auth/google")
async def google_login():
    """Return the Google authorization URL the client should be sent to."""
    authorization_url = google_oauth.get_authorization_url()
    return {"url": authorization_url}
@app.get("/auth/google/callback")async def google_callback(code: str): # Exchange code for token token = await google_oauth.get_access_token(code)
# Get user info user_info = await google_oauth.get_user_info(token)
# Create or update user user = await get_or_create_user( email=user_info["email"], name=user_info["name"], provider="google" )
# Generate JWT access_token = create_access_token({"sub": user.email}) return {"access_token": access_token}GitHub OAuth
Section titled “GitHub OAuth”from zenith.auth.oauth import GitHubOAuth
github_oauth = GitHubOAuth( client_id=os.getenv("GITHUB_CLIENT_ID"), client_secret=os.getenv("GITHUB_CLIENT_SECRET"), redirect_uri="https://api.example.com/auth/github/callback")
@app.get("/auth/github")
async def github_login():
    """Return the GitHub authorization URL, requesting the "user:email" scope."""
    authorization_url = github_oauth.get_authorization_url(scope="user:email")
    return {"url": authorization_url}
@app.get("/auth/github/callback")async def github_callback(code: str): # Similar to Google OAuth passSession Management
Section titled “Session Management”Cookie-Based Sessions
Section titled “Cookie-Based Sessions”from zenith.sessions import SessionMiddleware, SessionConfig
app.add_middleware(SessionMiddleware, { "secret_key": os.getenv("SESSION_SECRET"), "cookie_name": "session_id", "cookie_secure": True, # HTTPS only "cookie_httponly": True, "cookie_samesite": "lax", "max_age": 86400 # 24 hours})
@app.post("/auth/login")async def login(credentials: UserLogin, request: Request): # Verify credentials...
# Store in session request.session["user_id"] = user.id request.session["email"] = user.email
return {"status": "logged in"}
@app.post("/auth/logout")async def logout(request: Request): request.session.clear() return {"status": "logged out"}Multi-Factor Authentication
Section titled “Multi-Factor Authentication”from zenith.auth.mfa import TOTP, generate_secret
@app.post("/auth/mfa/enable")
async def enable_mfa(current_user=Depends(get_current_user)):
    """Start MFA enrollment: create a secret, store it, return QR data.

    Returns the provisioning URI (for a QR code) together with the raw
    secret.
    """
    new_secret = generate_secret()

    # Persist the secret for this user before handing it out.
    # NOTE(review): the original comment says it is stored encrypted -
    # confirm that save_user_mfa_secret actually does so.
    await save_user_mfa_secret(current_user["id"], new_secret)

    # Build the URI that authenticator apps consume as a QR code.
    provisioning_url = TOTP(new_secret).provisioning_uri(
        name=current_user["email"],
        issuer="MyApp",
    )

    return {"qr_url": provisioning_url, "secret": new_secret}
@app.post("/auth/mfa/verify")async def verify_mfa( code: str, current_user = Depends(get_current_user)): # Get user's secret secret = await get_user_mfa_secret(current_user["id"])
# Verify code totp = TOTP(secret) if not totp.verify(code): raise HTTPException(400, "Invalid code")
# Mark MFA as verified await mark_mfa_verified(current_user["id"])
return {"status": "MFA enabled"}Testing Authentication
Section titled “Testing Authentication”from zenith.testing import TestClient, MockAuth
@pytest.mark.asyncio
async def test_protected_route():
    """Check that /protected rejects anonymous calls and accepts a mocked user."""
    async with TestClient(app) as client:
        # Test without auth
        response = await client.get("/protected")
        assert response.status_code == 401

        # Test with mock auth
        with MockAuth(user={"email": "test@example.com", "role": "admin"}):
            response = await client.get("/protected")
            assert response.status_code == 200
            # /protected nests the authenticated user under the "user" key;
            # its response has no top-level "email" field.
            assert response.json()["user"]["email"] == "test@example.com"
@pytest.mark.asyncioasync def test_login(): async with TestClient(app) as client: # Register user response = await client.post("/auth/register", json={ "email": "test@example.com", "password": "SecurePass123!", "name": "Test User" }) assert response.status_code == 200 token = response.json()["access_token"]
# Use token response = await client.get( "/protected", headers={"Authorization": f"Bearer {token}"} ) assert response.status_code == 200Best Practices
Section titled “Best Practices”
- Use environment variables for secrets
- Implement rate limiting on auth endpoints
- Log authentication attempts for security monitoring
- Use secure cookies with HttpOnly and SameSite flags
- Implement token rotation for long-lived sessions
- Add CAPTCHA for registration and login after failures
- Require email verification for new accounts
- Implement password reset with secure tokens
Next Steps
Section titled “Next Steps”
- Implement Database user storage
- Add Middleware for auth
- Learn about Testing auth flows