Tutorial Part 4: Authentication & Authorization

Security is critical for any API. In this part, we’ll implement a complete authentication system with JWT tokens, secure password handling, role-based access control, and best practices you’ll use in production.

By the end of this part, you’ll have:

  • JWT token authentication with refresh tokens
  • Secure password hashing with Argon2 (via pwdlib)
  • Login, logout, and registration endpoints
  • Password reset flow with email tokens
  • Role-based access control (RBAC)
  • Session management
  • Rate limiting for security endpoints

Understanding Authentication vs Authorization

Before we code, let’s clarify these often-confused concepts:

  • Authentication: Who are you? (Identity verification)
  • Authorization: What can you do? (Permission checking)
# Authentication: Is this a valid user?
user = authenticate(email="alice@example.com", password="secret")

# Authorization: Can this user perform this action?
if not user.can_delete_project(project):
    raise PermissionError("You cannot delete this project")

First, install the required packages:

# Using uv (Zenith already includes pwdlib with Argon2)
# The extras are quoted so shells such as zsh don't treat [..] as a glob
uv add "python-jose[cryptography]" python-multipart
# Or using pip
pip install "python-jose[cryptography]" python-multipart "pwdlib[argon2]"

Never store passwords in plain text. We’ll use Argon2id for secure hashing (via pwdlib, included with Zenith).
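To make the idea of salted, memory-hard hashing concrete, here is a minimal standard-library sketch. It uses scrypt rather than Argon2id (the standard library has no Argon2), and the names toy_hash_password and toy_verify_password are hypothetical. It only illustrates the concept; the tutorial's real code uses pwdlib.

```python
import hashlib
import hmac
import os

# scrypt cost parameters chosen for illustration only (assumption, not a recommendation)
SCRYPT_PARAMS = {"n": 2**14, "r": 8, "p": 1}


def toy_hash_password(password: str) -> str:
    """Return 'salt_hex$digest_hex' for the given password."""
    salt = os.urandom(16)  # a fresh random salt for every password
    digest = hashlib.scrypt(password.encode(), salt=salt, **SCRYPT_PARAMS)
    return f"{salt.hex()}${digest.hex()}"


def toy_verify_password(password: str, stored: str) -> bool:
    """Recompute the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.scrypt(
        password.encode(), salt=bytes.fromhex(salt_hex), **SCRYPT_PARAMS
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))
```

Because the salt is random, hashing the same password twice gives two different strings, yet both verify. The tests later in this part check the same property for Argon2id.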

Update app/auth.py:

"""
Authentication system with JWT tokens and secure password handling.
This module handles:
- Password hashing with Argon2id (modern, secure)
- JWT token generation and validation
- User authentication
- Permission checking
"""
from typing import Optional, Dict, Any
from datetime import datetime, timedelta
from jose import JWTError, jwt
from pwdlib import PasswordHash
from app.config import settings
# Password hashing configuration
# Argon2id is the modern recommended algorithm (winner of Password Hashing Competition 2015)
pwd_hash = PasswordHash.recommended() # Uses Argon2id with secure defaults
# JWT Configuration
SECRET_KEY = settings.SECRET_KEY
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
def hash_password(password: str) -> str:
    """
    Hash a password using Argon2id.

    Argon2id automatically handles:
    - Salt generation (unique per password)
    - Memory-hard computation (resistant to GPU/ASIC attacks)
    - Timing attack resistance

    Args:
        password: Plain text password

    Returns:
        Hashed password safe for storage
    """
    return pwd_hash.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Verify a password against its hash.

    This is timing-attack resistant - takes the same time
    whether the password is correct or not.

    Args:
        plain_password: Password to check
        hashed_password: Stored hash to check against

    Returns:
        True if password matches, False otherwise
    """
    return pwd_hash.verify(plain_password, hashed_password)
def create_access_token(
    data: Dict[str, Any],
    expires_delta: Optional[timedelta] = None
) -> str:
    """
    Create a JWT access token.

    Access tokens are short-lived (30 minutes default) and
    contain minimal user information.

    Args:
        data: Payload to encode (user_id, email, etc)
        expires_delta: Custom expiration time

    Returns:
        Encoded JWT token
    """
    to_encode = data.copy()

    # Set expiration
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode.update({
        "exp": expire,
        "type": "access"
    })

    # Create and return the token
    encoded_jwt = jwt.encode(
        to_encode,
        SECRET_KEY,
        algorithm=ALGORITHM
    )
    return encoded_jwt
def create_refresh_token(
    data: Dict[str, Any]
) -> str:
    """
    Create a JWT refresh token.

    Refresh tokens are long-lived (7 days) and used to
    get new access tokens without re-authenticating.

    Args:
        data: Minimal user identification

    Returns:
        Encoded refresh token
    """
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
    to_encode.update({
        "exp": expire,
        "type": "refresh"
    })

    encoded_jwt = jwt.encode(
        to_encode,
        SECRET_KEY,
        algorithm=ALGORITHM
    )
    return encoded_jwt
def decode_token(token: str) -> Dict[str, Any]:
    """
    Decode and validate a JWT token.

    Checks:
    - Signature validity
    - Expiration time
    - Token structure

    Args:
        token: JWT token to decode

    Returns:
        Decoded token payload

    Raises:
        JWTError: If token is invalid or expired
    """
    # jwt.decode raises JWTError (or a subclass such as ExpiredSignatureError)
    # on failure, so the exception simply propagates to the caller.
    return jwt.decode(
        token,
        SECRET_KEY,
        algorithms=[ALGORITHM]
    )
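To show what the signature and expiration checks involve, here is a minimal HS256 sketch using only the standard library. The names toy_encode and toy_decode are hypothetical, and this is an illustration of the token format, not how python-jose is implemented. Keep using jwt.encode and jwt.decode in real code.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def _b64url_decode(text: str) -> bytes:
    """Restore the stripped padding before decoding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _sign(signing_input: str, secret: str) -> bytes:
    return hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()


def toy_encode(payload: dict, secret: str) -> str:
    """Build header.payload.signature for an HS256 token."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    return f"{signing_input}.{_b64url(_sign(signing_input, secret))}"


def toy_decode(token: str, secret: str) -> dict:
    """Check structure, signature, and the exp claim; raise ValueError on failure."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts
    expected = _sign(f"{header_b64}.{payload_b64}", secret)
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    if "exp" in payload and payload["exp"] < time.time():
        raise ValueError("token expired")
    return payload
```

The payload is only encoded, not encrypted, so anyone holding the token can read it. That is why the tokens in this part carry only minimal user information.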

Update app/models.py to add role support:

from enum import Enum


class UserRole(str, Enum):
    """User roles for authorization."""
    ADMIN = "admin"
    MANAGER = "manager"
    USER = "user"


class User(UserBase, table=True):
    """Enhanced user model with roles and security fields."""
    __tablename__ = "users"

    id: Optional[int] = Field(default=None, primary_key=True)
    password_hash: str
    role: UserRole = Field(default=UserRole.USER)

    # Security fields
    is_active: bool = Field(default=True)
    is_verified: bool = Field(default=False)  # Email verified
    last_login: Optional[datetime] = None
    failed_login_attempts: int = Field(default=0)
    locked_until: Optional[datetime] = None  # Account lockout

    # Tokens for various flows
    email_verification_token: Optional[str] = None
    password_reset_token: Optional[str] = None
    password_reset_expires: Optional[datetime] = None

    # Timestamps
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

    # Relationships
    projects: List["Project"] = Relationship(back_populates="owner")
    sessions: List["UserSession"] = Relationship(back_populates="user")

    # Authorization methods
    def has_role(self, role: UserRole) -> bool:
        """Check if user has a specific role or higher."""
        role_hierarchy = {
            UserRole.USER: 0,
            UserRole.MANAGER: 1,
            UserRole.ADMIN: 2
        }
        return role_hierarchy.get(self.role, 0) >= role_hierarchy.get(role, 0)

    def can_manage_project(self, project: "Project") -> bool:
        """Check if user can manage a project."""
        return (
            self.role == UserRole.ADMIN or
            project.owner_id == self.id or
            self.id in project.member_ids
        )

    def is_locked(self) -> bool:
        """Check if account is locked due to failed attempts."""
        if self.locked_until:
            if datetime.utcnow() < self.locked_until:
                return True
            # Unlock if time has passed
            self.locked_until = None
            self.failed_login_attempts = 0
        return False
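The "role or higher" rule in has_role can be tried in isolation. The sketch below redefines a small Role enum and a standalone has_role function (both hypothetical stand-ins for UserRole and User.has_role) so it runs without the database models.

```python
from enum import Enum


class Role(str, Enum):
    """Stand-in for UserRole, redefined so the example is self-contained."""
    USER = "user"
    MANAGER = "manager"
    ADMIN = "admin"


# Higher rank means more privileges
RANK = {Role.USER: 0, Role.MANAGER: 1, Role.ADMIN: 2}


def has_role(actual: Role, required: Role) -> bool:
    """True if `actual` is the required role or any role ranked above it."""
    return RANK[actual] >= RANK[required]


print(has_role(Role.ADMIN, Role.MANAGER))  # admin outranks manager
print(has_role(Role.USER, Role.MANAGER))   # user does not
```

A rank table like this suits strictly ordered roles. If roles stop forming a single ladder, a per-role permission list such as the PermissionChecker shown later is likely the better fit.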

Create app/services/auth.py:

"""
Authentication service handling login, registration, and tokens.
"""
from typing import Any, Dict, Optional
from datetime import datetime, timedelta
import secrets

from jose import JWTError  # raised by decode_token, caught in refresh_access_token
from sqlmodel import select

from app.config import settings  # FRONTEND_URL is used in the email helpers
from app.services import BaseService
from app.models import User, UserRole, UserSession
from app.auth import (
    hash_password,
    verify_password,
    create_access_token,
    create_refresh_token,
    decode_token
)
from app.exceptions import (
    AuthenticationError,
    ValidationError,
    ConflictError,
    NotFoundError
)
from app.email import send_email
class AuthService(BaseService):
    """Handles authentication and authorization."""

    # Security constants
    MAX_LOGIN_ATTEMPTS = 5
    LOCKOUT_DURATION_MINUTES = 30
    TOKEN_LENGTH = 32

    async def register(
        self,
        email: str,
        password: str,
        name: str,
        send_verification: bool = True
    ) -> User:
        """
        Register a new user.

        Steps:
        1. Validate email uniqueness
        2. Validate password strength
        3. Hash password
        4. Create user account
        5. Send verification email

        Args:
            email: User's email address
            password: Plain text password
            name: User's display name
            send_verification: Whether to send email verification

        Returns:
            Created user object

        Raises:
            ConflictError: If email already exists
            ValidationError: If password is too weak
        """
        # Check email uniqueness.
        # Assumes self.session is an async session: await exec() first,
        # then call .first() on the result.
        stmt = select(User).where(User.email == email)
        existing = (await self.session.exec(stmt)).first()
        if existing:
            raise ConflictError("Email already registered")

        # Validate password strength
        self._validate_password_strength(password)

        # Create user
        user = User(
            email=email,
            password_hash=hash_password(password),
            name=name,
            role=UserRole.USER,
            is_verified=not send_verification  # Skip verification in tests
        )

        # Generate email verification token
        if send_verification:
            user.email_verification_token = secrets.token_urlsafe(self.TOKEN_LENGTH)

        self.session.add(user)
        await self.commit()

        # Send verification email
        if send_verification:
            await self._send_verification_email(user)

        return user
    async def login(
        self,
        email: str,
        password: str,
        ip_address: Optional[str] = None,
        user_agent: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Authenticate user and create session.

        Security features:
        - Account lockout after failed attempts
        - Session tracking
        - Refresh token for extended access

        Args:
            email: User's email
            password: Plain text password
            ip_address: Client IP for session tracking
            user_agent: Client user agent

        Returns:
            Dict with tokens and user info

        Raises:
            AuthenticationError: If login fails
        """
        # Get user (await exec() before calling .first())
        stmt = select(User).where(User.email == email)
        user = (await self.session.exec(stmt)).first()
        if not user:
            # Don't reveal whether email exists
            raise AuthenticationError("Invalid email or password")

        # Check if account is locked
        if user.is_locked():
            # total_seconds() covers the whole interval; .seconds would drop any days component
            remaining = user.locked_until - datetime.utcnow()
            minutes_left = int(remaining.total_seconds() // 60)
            raise AuthenticationError(
                f"Account locked. Try again in {minutes_left} minutes."
            )

        # Verify password
        if not verify_password(password, user.password_hash):
            # Increment failed attempts
            user.failed_login_attempts += 1

            # Lock account if too many failures
            if user.failed_login_attempts >= self.MAX_LOGIN_ATTEMPTS:
                user.locked_until = (
                    datetime.utcnow() +
                    timedelta(minutes=self.LOCKOUT_DURATION_MINUTES)
                )
                await self.commit()
                raise AuthenticationError("Too many failed attempts. Account locked.")

            await self.commit()
            raise AuthenticationError("Invalid email or password")

        # Check if email is verified
        if not user.is_verified:
            raise AuthenticationError("Please verify your email first")

        # Check if account is active
        if not user.is_active:
            raise AuthenticationError("Account has been deactivated")

        # Reset failed attempts on successful login
        user.failed_login_attempts = 0
        user.last_login = datetime.utcnow()

        # Create tokens
        access_token = create_access_token(
            data={
                "sub": user.email,
                "user_id": user.id,
                "role": user.role
            }
        )
        refresh_token = create_refresh_token(
            data={
                "sub": user.email,
                "user_id": user.id
            }
        )

        # Create session record
        session = UserSession(
            user_id=user.id,
            refresh_token=refresh_token,
            ip_address=ip_address,
            user_agent=user_agent,
            expires_at=datetime.utcnow() + timedelta(days=7)
        )
        self.session.add(session)
        await self.commit()

        return {
            "access_token": access_token,
            "refresh_token": refresh_token,
            "token_type": "bearer",
            "user": user
        }
    async def refresh_access_token(
        self,
        refresh_token: str
    ) -> Dict[str, str]:
        """
        Generate new access token from refresh token.

        This allows users to stay logged in without
        re-entering credentials.

        Args:
            refresh_token: Valid refresh token

        Returns:
            New access token

        Raises:
            AuthenticationError: If refresh token is invalid
        """
        try:
            # Decode refresh token
            payload = decode_token(refresh_token)
            if payload.get("type") != "refresh":
                raise AuthenticationError("Invalid token type")

            # Check if session exists and is valid
            stmt = select(UserSession).where(
                UserSession.refresh_token == refresh_token
            )
            session = (await self.session.exec(stmt)).first()
            if not session or session.is_expired():
                raise AuthenticationError("Session expired")

            # Get user
            user = await self.session.get(User, payload["user_id"])
            if not user or not user.is_active:
                raise AuthenticationError("User not found or inactive")

            # Create new access token
            access_token = create_access_token(
                data={
                    "sub": user.email,
                    "user_id": user.id,
                    "role": user.role
                }
            )
            return {
                "access_token": access_token,
                "token_type": "bearer"
            }
        except JWTError:
            raise AuthenticationError("Invalid refresh token")
    async def logout(
        self,
        user_id: int,
        refresh_token: Optional[str] = None
    ) -> bool:
        """
        Logout user by invalidating session.

        Args:
            user_id: User to logout
            refresh_token: Specific session to invalidate

        Returns:
            True if successful
        """
        if refresh_token:
            # Invalidate specific session
            stmt = select(UserSession).where(
                (UserSession.user_id == user_id) &
                (UserSession.refresh_token == refresh_token)
            )
            session = (await self.session.exec(stmt)).first()
            if session:
                await self.session.delete(session)
        else:
            # Invalidate all sessions
            stmt = select(UserSession).where(UserSession.user_id == user_id)
            sessions = (await self.session.exec(stmt)).all()
            for session in sessions:
                await self.session.delete(session)

        await self.commit()
        return True
    async def request_password_reset(
        self,
        email: str
    ) -> bool:
        """
        Initiate password reset process.

        Generates a secure token and sends reset email.

        Args:
            email: User's email address

        Returns:
            True (always, to prevent email enumeration)
        """
        # Get user (but don't reveal if exists)
        stmt = select(User).where(User.email == email)
        user = (await self.session.exec(stmt)).first()

        if user and user.is_active:
            # Generate reset token
            user.password_reset_token = secrets.token_urlsafe(self.TOKEN_LENGTH)
            user.password_reset_expires = (
                datetime.utcnow() + timedelta(hours=1)
            )
            await self.commit()

            # Send reset email
            await self._send_password_reset_email(user)

        # Always return True to prevent email enumeration
        return True
    async def reset_password(
        self,
        token: str,
        new_password: str
    ) -> bool:
        """
        Reset password using token.

        Args:
            token: Password reset token
            new_password: New password

        Returns:
            True if successful

        Raises:
            ValidationError: If token invalid or expired
        """
        # Find user with token
        stmt = select(User).where(User.password_reset_token == token)
        user = (await self.session.exec(stmt)).first()
        if not user:
            raise ValidationError("Invalid reset token")

        # Check expiration (a missing expiry is treated as expired)
        if (
            user.password_reset_expires is None or
            user.password_reset_expires < datetime.utcnow()
        ):
            raise ValidationError("Reset token has expired")

        # Validate new password
        self._validate_password_strength(new_password)

        # Update password
        user.password_hash = hash_password(new_password)
        user.password_reset_token = None
        user.password_reset_expires = None

        # Invalidate all sessions (force re-login)
        stmt = select(UserSession).where(UserSession.user_id == user.id)
        sessions = (await self.session.exec(stmt)).all()
        for session in sessions:
            await self.session.delete(session)

        await self.commit()

        # Send confirmation email
        await self._send_password_changed_email(user)
        return True
    def _validate_password_strength(self, password: str) -> None:
        """
        Validate password meets security requirements.

        Requirements:
        - At least 8 characters
        - Contains uppercase and lowercase
        - Contains at least one number
        - Not a common password

        Raises:
            ValidationError: If password is weak
        """
        # Check against common passwords first. None of these entries can
        # satisfy the composition rules below, so checking them last would
        # make this test unreachable.
        common_passwords = ["password", "12345678", "qwerty", "abc123"]
        if password.lower() in common_passwords:
            raise ValidationError("Password is too common")

        if len(password) < 8:
            raise ValidationError("Password must be at least 8 characters")
        if not any(c.isupper() for c in password):
            raise ValidationError("Password must contain uppercase letter")
        if not any(c.islower() for c in password):
            raise ValidationError("Password must contain lowercase letter")
        if not any(c.isdigit() for c in password):
            raise ValidationError("Password must contain a number")
    async def _send_verification_email(self, user: User) -> None:
        """Send email verification link."""
        verification_url = (
            f"{settings.FRONTEND_URL}/verify-email"
            f"?token={user.email_verification_token}"
        )
        await send_email(
            to=user.email,
            subject="Verify your email",
            body=f"Click here to verify: {verification_url}"
        )

    async def _send_password_reset_email(self, user: User) -> None:
        """Send password reset link."""
        reset_url = (
            f"{settings.FRONTEND_URL}/reset-password"
            f"?token={user.password_reset_token}"
        )
        await send_email(
            to=user.email,
            subject="Reset your password",
            body=f"Click here to reset: {reset_url}"
        )

    async def _send_password_changed_email(self, user: User) -> None:
        """Send password change confirmation."""
        await send_email(
            to=user.email,
            subject="Password changed",
            body="Your password has been successfully changed."
        )
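The lockout rules in login() are easier to follow without the database calls. The sketch below models them as a small in-memory state machine; LoginState and record_attempt are hypothetical names, and the clock is passed in so the behavior can be checked deterministically.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional

MAX_LOGIN_ATTEMPTS = 5
LOCKOUT_DURATION = timedelta(minutes=30)


@dataclass
class LoginState:
    """The two lockout fields the User model tracks."""
    failed_attempts: int = 0
    locked_until: Optional[datetime] = None


def record_attempt(state: LoginState, password_ok: bool, now: datetime) -> str:
    """Update `state` for one login attempt and return 'ok', 'invalid', or 'locked'."""
    if state.locked_until is not None:
        if now < state.locked_until:
            return "locked"  # even a correct password is refused while locked
        # The lock has expired: clear it and start counting again
        state.locked_until = None
        state.failed_attempts = 0

    if password_ok:
        state.failed_attempts = 0
        return "ok"

    state.failed_attempts += 1
    if state.failed_attempts >= MAX_LOGIN_ATTEMPTS:
        state.locked_until = now + LOCKOUT_DURATION
        return "locked"
    return "invalid"
```

With these constants, the fifth consecutive failure locks the account, and a correct password is refused until 30 minutes have passed.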

Create a session model to track user sessions:

class UserSession(SQLModel, table=True):
    """Track user sessions for security and analytics."""
    __tablename__ = "user_sessions"

    id: Optional[int] = Field(default=None, primary_key=True)
    user_id: int = Field(foreign_key="users.id")
    refresh_token: str = Field(unique=True, index=True)
    ip_address: Optional[str]
    user_agent: Optional[str]
    created_at: datetime = Field(default_factory=datetime.utcnow)
    last_used: datetime = Field(default_factory=datetime.utcnow)
    expires_at: datetime

    # Relationship
    user: User = Relationship(back_populates="sessions")

    def is_expired(self) -> bool:
        """Check if session has expired."""
        return datetime.utcnow() > self.expires_at

    def touch(self) -> None:
        """Update last used timestamp."""
        self.last_used = datetime.utcnow()

Create app/dependencies.py:

"""
Dependency injection for authentication and authorization.
"""
from typing import Optional
from zenith import Depends, HTTPException, status
from zenith.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError
from sqlmodel import select
from app.database import get_session
from app.models import User, UserRole
from app.auth import decode_token
# Bearer token scheme
security = HTTPBearer()
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    session = Depends(get_session)
) -> User:
    """
    Get current authenticated user from JWT token.

    Validates:
    - Token signature
    - Token expiration
    - User exists and is active

    Args:
        credentials: Bearer token from Authorization header
        session: Database session

    Returns:
        Current user object

    Raises:
        HTTPException: 401 if authentication fails
    """
    token = credentials.credentials
    try:
        # Decode token
        payload = decode_token(token)

        # Must be access token
        if payload.get("type") != "access":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token type"
            )

        # Get user
        user_id = payload.get("user_id")
        if not user_id:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token payload"
            )

        user = await session.get(User, user_id)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User not found"
            )
        if not user.is_active:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="User account is deactivated"
            )
        return user
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid or expired token"
        )
async def get_current_active_user(
    current_user: User = Depends(get_current_user)
) -> User:
    """
    Get current user and ensure they're active.

    Args:
        current_user: User from token

    Returns:
        Active user

    Raises:
        HTTPException: 403 if user is inactive
    """
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user"
        )
    return current_user
def require_role(role: UserRole):
    """
    Dependency to require specific role.

    Usage:
        @app.get("/admin", dependencies=[Depends(require_role(UserRole.ADMIN))])

    Args:
        role: Required role

    Returns:
        Dependency function
    """
    async def role_checker(
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        if not current_user.has_role(role):
            # Use role.value so the message reads "Requires admin role"
            # regardless of how the Python version formats enum members
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Requires {role.value} role"
            )
        return current_user

    return role_checker
class PermissionChecker:
    """
    Flexible permission checking.

    Usage:
        @app.get("/resource", dependencies=[Depends(PermissionChecker("read:resource"))])
    """

    def __init__(self, required_permission: str):
        self.required_permission = required_permission

    async def __call__(
        self,
        current_user: User = Depends(get_current_active_user)
    ) -> User:
        # Check if user has permission (implement your logic)
        if not self._user_has_permission(current_user, self.required_permission):
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing permission: {self.required_permission}"
            )
        return current_user

    def _user_has_permission(
        self,
        user: User,
        permission: str
    ) -> bool:
        """Check if user has specific permission."""
        # Simple role-based check (extend as needed)
        permission_map = {
            UserRole.ADMIN: ["*"],  # Admin has all permissions
            UserRole.MANAGER: ["read:*", "write:project", "write:task"],
            UserRole.USER: ["read:*", "write:own"]
        }
        user_permissions = permission_map.get(user.role, [])

        # Check exact match or wildcard
        for perm in user_permissions:
            if perm == "*" or perm == permission:
                return True
            if perm.endswith("*") and permission.startswith(perm[:-1]):
                return True
        return False
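The wildcard matching inside _user_has_permission can be pulled out and exercised on its own. The function name permission_granted below is hypothetical; the matching logic mirrors the loop above.

```python
from typing import Iterable


def permission_granted(granted: Iterable[str], required: str) -> bool:
    """Return True if any granted permission covers the required one.

    "*" covers everything, "read:*" covers any permission starting with
    "read:", and anything else must match exactly.
    """
    for perm in granted:
        if perm == "*" or perm == required:
            return True
        if perm.endswith("*") and required.startswith(perm[:-1]):
            return True
    return False


manager = ["read:*", "write:project", "write:task"]
print(permission_granted(manager, "read:resource"))  # covered by the read:* wildcard
print(permission_granted(manager, "write:user"))     # not in the manager's list
```

Note that a string such as "write:own" is matched literally here. Checking real ownership still needs a per-object test like can_manage_project.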

Add rate limiting to prevent brute force attacks:

from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# Create limiter
limiter = Limiter(
    key_func=get_remote_address,
    default_limits=["1000 per hour"]
)

# Add to app
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Apply to auth endpoints.
# slowapi needs each limited endpoint to accept the incoming request as an
# argument named `request`; the other parameters and the bodies are omitted.
@app.post("/auth/login")
@limiter.limit("5 per minute")  # Strict limit for login
async def login(request):
    ...

@app.post("/auth/register")
@limiter.limit("3 per hour")  # Prevent spam registrations
async def register(request):
    ...

@app.post("/auth/password-reset")
@limiter.limit("3 per hour")  # Prevent email flooding
async def request_password_reset(request):
    ...
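To see what a limit such as "5 per minute" means per client, here is a minimal sliding-window limiter built on the standard library. SlidingWindowLimiter is a hypothetical class for illustration and is not how slowapi is implemented. It also keeps state in process memory, so it would not work across multiple workers.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most `max_requests` per key within any `window_seconds` span."""

    def __init__(
        self,
        max_requests: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock  # injectable so tests can control time
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        """Record a request for `key` (for example a client IP) if it is within the limit."""
        now = self._clock()
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True
```

Keying by IP address, as get_remote_address does, limits each client separately, so one noisy client does not block everyone else.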

Create tests/test_auth.py:

"""
Test authentication and authorization.
"""
import pytest
from datetime import timedelta
from jose import JWTError
from app.auth import (
    hash_password,
    verify_password,
    create_access_token,
    decode_token
)


class TestPasswordHashing:
    """Test password hashing functions."""

    def test_hash_password(self):
        """Test password hashing."""
        password = "SecurePassword123"
        hashed = hash_password(password)
        # Hash should be different from original
        assert hashed != password
        # Should be a valid Argon2 hash
        assert hashed.startswith("$argon2id$")

    def test_verify_password(self):
        """Test password verification."""
        password = "TestPassword456"
        hashed = hash_password(password)
        # Correct password should verify
        assert verify_password(password, hashed)
        # Wrong password should fail
        assert not verify_password("WrongPassword", hashed)

    def test_different_hashes(self):
        """Test that same password produces different hashes."""
        password = "SamePassword789"
        hash1 = hash_password(password)
        hash2 = hash_password(password)
        # Different hashes due to random salt
        assert hash1 != hash2
        # But both verify correctly
        assert verify_password(password, hash1)
        assert verify_password(password, hash2)


class TestJWTTokens:
    """Test JWT token generation and validation."""

    def test_create_access_token(self):
        """Test access token creation."""
        data = {"user_id": 123, "email": "test@example.com"}
        token = create_access_token(data)
        # Decode and verify
        payload = decode_token(token)
        assert payload["user_id"] == 123
        assert payload["email"] == "test@example.com"
        assert payload["type"] == "access"

    def test_token_expiration(self):
        """Test that expired tokens are rejected."""
        data = {"user_id": 123}
        # Create token that is already expired
        token = create_access_token(
            data,
            expires_delta=timedelta(seconds=-1)
        )
        # Decoding should raise JWTError (expired signature)
        with pytest.raises(JWTError):
            decode_token(token)
class TestAuthEndpoints:
    """Test authentication endpoints."""

    async def test_register_success(self, client):
        """Test successful registration."""
        response = await client.post(
            "/auth/register",
            json={
                "email": "newuser@example.com",
                "password": "SecurePass123",
                "name": "New User"
            }
        )
        assert response.status_code == 201
        data = response.json()
        assert data["email"] == "newuser@example.com"
        assert "password" not in data

    async def test_register_weak_password(self, client):
        """Test registration with weak password."""
        response = await client.post(
            "/auth/register",
            json={
                "email": "weak@example.com",
                "password": "weak",  # Too short
                "name": "Weak User"
            }
        )
        assert response.status_code == 422
        assert "at least 8 characters" in response.json()["detail"]

    async def test_login_lockout(self, client):
        """Test account lockout after failed attempts."""
        # Register user
        await client.post(
            "/auth/register",
            json={
                "email": "lockout@example.com",
                "password": "CorrectPass123",
                "name": "Test User"
            }
        )

        # Try login with wrong password 5 times
        for _ in range(5):
            response = await client.post(
                "/auth/login",
                json={
                    "email": "lockout@example.com",
                    "password": "WrongPassword"
                }
            )

        # 6th attempt should show lockout
        response = await client.post(
            "/auth/login",
            json={
                "email": "lockout@example.com",
                "password": "CorrectPass123"  # Even correct password
            }
        )
        assert response.status_code == 401
        assert "locked" in response.json()["detail"].lower()

    async def test_refresh_token(self, client):
        """Test refresh token flow."""
        # Register and login
        await client.post(
            "/auth/register",
            json={
                "email": "refresh@example.com",
                "password": "TestPass123",
                "name": "Test User"
            }
        )
        login_response = await client.post(
            "/auth/login",
            json={
                "email": "refresh@example.com",
                "password": "TestPass123"
            }
        )
        tokens = login_response.json()
        refresh_token = tokens["refresh_token"]

        # Use refresh token to get new access token
        response = await client.post(
            "/auth/refresh",
            json={"refresh_token": refresh_token}
        )
        assert response.status_code == 200
        assert "access_token" in response.json()
class TestAuthorization:
    """Test role-based authorization."""

    async def test_admin_only_endpoint(self, client, admin_token, user_token):
        """Test that admin endpoints require admin role."""
        # Admin should succeed
        response = await client.get(
            "/admin/users",
            headers={"Authorization": f"Bearer {admin_token}"}
        )
        assert response.status_code == 200

        # Regular user should fail
        response = await client.get(
            "/admin/users",
            headers={"Authorization": f"Bearer {user_token}"}
        )
        assert response.status_code == 403
        assert "admin role" in response.json()["detail"].lower()

    async def test_project_ownership(self, client, user1_token, user2_token):
        """Test that users can only modify their own projects."""
        # User1 creates a project
        response = await client.post(
            "/projects",
            json={"name": "User1 Project"},
            headers={"Authorization": f"Bearer {user1_token}"}
        )
        project_id = response.json()["id"]

        # User1 can update their project
        response = await client.patch(
            f"/projects/{project_id}",
            json={"name": "Updated Project"},
            headers={"Authorization": f"Bearer {user1_token}"}
        )
        assert response.status_code == 200

        # User2 cannot update User1's project
        response = await client.patch(
            f"/projects/{project_id}",
            json={"name": "Hacked Project"},
            headers={"Authorization": f"Bearer {user2_token}"}
        )
        assert response.status_code == 403

Never store tokens in localStorage (XSS vulnerable). Use httpOnly cookies or secure memory storage:

// Bad: Vulnerable to XSS
localStorage.setItem('token', accessToken);

// Good: HttpOnly cookie (set by server)
// Or in-memory for SPAs
class TokenStore {
  constructor() {
    this.accessToken = null;
    this.refreshToken = null;
  }

  setTokens(access, refresh) {
    this.accessToken = access;
    // Store refresh in httpOnly cookie via API
  }
}
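On the server side, "set by server" means sending a Set-Cookie header with the right attributes. The sketch below builds such a header value with Python's standard http.cookies module. The function name, the cookie name refresh_token, and the /auth path are assumptions for illustration, and how you attach the header to a response depends on your framework.

```python
from http.cookies import SimpleCookie


def build_refresh_cookie(refresh_token: str, max_age_days: int = 7) -> str:
    """Return a Set-Cookie header value for an httpOnly refresh-token cookie."""
    cookie = SimpleCookie()
    cookie["refresh_token"] = refresh_token
    morsel = cookie["refresh_token"]
    morsel["httponly"] = True        # not readable from JavaScript
    morsel["secure"] = True          # only sent over HTTPS
    morsel["samesite"] = "Strict"    # not sent on cross-site requests
    morsel["path"] = "/auth"         # only sent to the auth endpoints (assumed path)
    morsel["max-age"] = max_age_days * 24 * 60 * 60
    return morsel.OutputString()


print(build_refresh_cookie("example.token.value"))
```

Because the cookie is httpOnly, a script injected through XSS cannot read the refresh token, while the short-lived access token can stay in memory as the TokenStore above does.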

Always use HTTPS in production:

@app.on_event("startup")
async def enforce_https():
    """Redirect HTTP to HTTPS in production."""
    if settings.ENVIRONMENT == "production":
        app.add_middleware(
            HTTPSRedirectMiddleware,
            force_https=True
        )

Add security headers to prevent common attacks:

from zenith.middleware.cors import CORSMiddleware
from secure import SecureHeaders

# Security headers
secure_headers = SecureHeaders()

@app.middleware("http")
async def add_security_headers(request, call_next):
    response = await call_next(request)
    secure_headers.framework.zenith(response)
    return response

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"]
)
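For reference, these are the kinds of response headers a security-headers middleware typically sets. The sketch below is framework-independent; the function names and the specific values are illustrative assumptions, not the defaults of any particular library, so review them against your own requirements.

```python
from typing import Dict


def default_security_headers(hsts_max_age: int = 31536000) -> Dict[str, str]:
    """Return a starting set of security headers (values are illustrative)."""
    return {
        # Tell browsers to use HTTPS only for the next `hsts_max_age` seconds
        "Strict-Transport-Security": f"max-age={hsts_max_age}; includeSubDomains",
        # Stop browsers from guessing content types
        "X-Content-Type-Options": "nosniff",
        # Disallow embedding the site in frames (clickjacking protection)
        "X-Frame-Options": "DENY",
        # Limit how much of the URL is sent in the Referer header
        "Referrer-Policy": "strict-origin-when-cross-origin",
        # Only load resources from the same origin
        "Content-Security-Policy": "default-src 'self'",
    }


def merge_headers(response_headers: Dict[str, str], defaults: Dict[str, str]) -> Dict[str, str]:
    """Add defaults without overwriting headers the endpoint already set."""
    merged = dict(defaults)
    merged.update(response_headers)
    return merged
```

Merging this way lets an individual endpoint override a default, for example to loosen the Content-Security-Policy on a single page.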

Always validate and sanitize input:

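from email_validator import validate_email, EmailNotValidError

from app.exceptions import ValidationError


def validate_email_address(email: str) -> str:
    """Validate and normalize email."""
    try:
        # Validate and get normalized result
        validation = validate_email(email)
        # email-validator 2.x exposes the normalized address as `.normalized`
        # (`.email` is the older, deprecated name)
        return validation.normalized
    except EmailNotValidError as exc:
        raise ValidationError("Invalid email address") from exc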

Tokens rejected as invalid. Solution: Check that SECRET_KEY is set, that the token hasn’t expired, and that the Authorization header has the form Bearer <token>.

Reset or verification tokens reused. Solution: Always invalidate tokens after use and set expiration times.

Session fixation. Solution: Regenerate session IDs after login and use secure session cookies.
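Invalidating a token after use, combined with an expiration time, can be sketched with a small in-memory store. ResetTokenStore is a hypothetical class. Unlike the tutorial's User model, it keeps only a SHA-256 digest of each token, which is an optional hardening step assumed here rather than something the code above does.

```python
import hashlib
import secrets
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple


class ResetTokenStore:
    """Issue single-use, expiring tokens (in memory, for illustration only)."""

    def __init__(self, ttl: timedelta = timedelta(hours=1)):
        self._ttl = ttl
        # Maps sha256(token) -> (email, expiry time)
        self._tokens: Dict[str, Tuple[str, datetime]] = {}

    @staticmethod
    def _digest(token: str) -> str:
        return hashlib.sha256(token.encode()).hexdigest()

    def issue(self, email: str, now: datetime) -> str:
        """Create a token for `email` that expires `ttl` after `now`."""
        token = secrets.token_urlsafe(32)
        self._tokens[self._digest(token)] = (email, now + self._ttl)
        return token

    def consume(self, token: str, now: datetime) -> Optional[str]:
        """Return the email if the token is valid, and remove it either way."""
        entry = self._tokens.pop(self._digest(token), None)
        if entry is None:
            return None  # unknown or already used
        email, expires = entry
        return email if now <= expires else None
```

Because consume() removes the entry before checking the expiry, a token can never be presented twice, which matches how reset_password clears password_reset_token after a successful reset.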

In this part, you’ve implemented:

  • JWT authentication with access and refresh tokens
  • Secure password hashing with Argon2id (via pwdlib)
  • Account lockout for brute force protection
  • Password reset flow with email tokens
  • Role-based access control (RBAC)
  • Session management and tracking
  • Rate limiting for security endpoints
  • Security best practices

In Part 5: Testing, we’ll:

  • Write comprehensive test suites
  • Test authentication flows
  • Mock external dependencies
  • Measure test coverage
  • Set up continuous integration

Questions? Check our FAQ or ask in GitHub Discussions.