Continue to Part 5
Ready to test your API? Continue to Testing →
Security is critical for any API. In this part, we’ll implement a complete authentication system with JWT tokens, secure password handling, role-based access control, and best practices you’ll use in production.
By the end of this part:
Before we code, let’s clarify these often-confused concepts:
# Authentication: Is this a valid user?user = authenticate(email="alice@example.com", password="secret")
# Authorization: Can this user perform this action?if not user.can_delete_project(project): raise PermissionError("You cannot delete this project")First, install the required packages:
# Using uv (Zenith already includes pwdlib with Argon2)uv add python-jose[cryptography] python-multipart
# Or using pippip install python-jose[cryptography] python-multipart pwdlib[argon2]Never store passwords in plain text. We’ll use Argon2id for secure hashing (via pwdlib, included with Zenith).
Update app/auth.py:
"""Authentication system with JWT tokens and secure password handling.
This module handles:- Password hashing with Argon2id (modern, secure)- JWT token generation and validation- User authentication- Permission checking"""
from typing import Optional, Dict, Anyfrom datetime import datetime, timedeltafrom jose import JWTError, jwtfrom pwdlib import PasswordHashfrom app.config import settings
# Password hashing configuration# Argon2id is the modern recommended algorithm (winner of Password Hashing Competition 2015)pwd_hash = PasswordHash.recommended() # Uses Argon2id with secure defaults
# JWT ConfigurationSECRET_KEY = settings.SECRET_KEYALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30REFRESH_TOKEN_EXPIRE_DAYS = 7
def hash_password(password: str) -> str: """ Hash a password using Argon2id.
Argon2id automatically handles: - Salt generation (unique per password) - Memory-hard computation (resistant to GPU/ASIC attacks) - Timing attack resistance
Args: password: Plain text password
Returns: Hashed password safe for storage """ return pwd_hash.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool: """ Verify a password against its hash.
This is timing-attack resistant - takes the same time whether the password is correct or not.
Args: plain_password: Password to check hashed_password: Stored hash to check against
Returns: True if password matches, False otherwise """ return pwd_hash.verify(plain_password, hashed_password)
def create_access_token( data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: """ Create a JWT access token.
Access tokens are short-lived (30 minutes default) and contain minimal user information.
Args: data: Payload to encode (user_id, email, etc) expires_delta: Custom expiration time
Returns: Encoded JWT token """ to_encode = data.copy()
# Set expiration if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({ "exp": expire, "type": "access" })
# Create and return the token encoded_jwt = jwt.encode( to_encode, SECRET_KEY, algorithm=ALGORITHM ) return encoded_jwt
def create_refresh_token( data: Dict[str, Any]) -> str: """ Create a JWT refresh token.
Refresh tokens are long-lived (7 days) and used to get new access tokens without re-authenticating.
Args: data: Minimal user identification
Returns: Encoded refresh token """ to_encode = data.copy() expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)
to_encode.update({ "exp": expire, "type": "refresh" })
encoded_jwt = jwt.encode( to_encode, SECRET_KEY, algorithm=ALGORITHM ) return encoded_jwt
def decode_token(token: str) -> Dict[str, Any]: """ Decode and validate a JWT token.
Checks: - Signature validity - Expiration time - Token structure
Args: token: JWT token to decode
Returns: Decoded token payload
Raises: JWTError: If token is invalid or expired """ try: payload = jwt.decode( token, SECRET_KEY, algorithms=[ALGORITHM] ) return payload except JWTError: raiseUpdate app/models.py to add role support:
from enum import Enum
class UserRole(str, Enum): """User roles for authorization.""" ADMIN = "admin" MANAGER = "manager" USER = "user"
class User(UserBase, table=True): """Enhanced user model with roles and security fields.""" __tablename__ = "users"
id: Optional[int] = Field(default=None, primary_key=True) password_hash: str role: UserRole = Field(default=UserRole.USER)
# Security fields is_active: bool = Field(default=True) is_verified: bool = Field(default=False) # Email verified last_login: Optional[datetime] = None failed_login_attempts: int = Field(default=0) locked_until: Optional[datetime] = None # Account lockout
# Tokens for various flows email_verification_token: Optional[str] = None password_reset_token: Optional[str] = None password_reset_expires: Optional[datetime] = None
# Timestamps created_at: datetime = Field(default_factory=datetime.utcnow) updated_at: datetime = Field(default_factory=datetime.utcnow)
# Relationships projects: List["Project"] = Relationship(back_populates="owner") sessions: List["UserSession"] = Relationship(back_populates="user")
# Authorization methods def has_role(self, role: UserRole) -> bool: """Check if user has a specific role or higher.""" role_hierarchy = { UserRole.USER: 0, UserRole.MANAGER: 1, UserRole.ADMIN: 2 } return role_hierarchy.get(self.role, 0) >= role_hierarchy.get(role, 0)
def can_manage_project(self, project: "Project") -> bool: """Check if user can manage a project.""" return ( self.role == UserRole.ADMIN or project.owner_id == self.id or self.id in project.member_ids )
def is_locked(self) -> bool: """Check if account is locked due to failed attempts.""" if self.locked_until: if datetime.utcnow() < self.locked_until: return True # Unlock if time has passed self.locked_until = None self.failed_login_attempts = 0 return FalseCreate app/services/auth.py:
"""Authentication service handling login, registration, and tokens."""
from typing import Optionalfrom datetime import datetime, timedeltaimport secretsfrom sqlmodel import selectfrom app.services import BaseServicefrom app.models import User, UserRole, UserSessionfrom app.auth import ( hash_password, verify_password, create_access_token, create_refresh_token, decode_token)from app.exceptions import ( AuthenticationError, ValidationError, ConflictError, NotFoundError)from app.email import send_email
class AuthService(BaseService): """Handles authentication and authorization."""
# Security constants MAX_LOGIN_ATTEMPTS = 5 LOCKOUT_DURATION_MINUTES = 30 TOKEN_LENGTH = 32
async def register( self, email: str, password: str, name: str, send_verification: bool = True ) -> User: """ Register a new user.
Steps: 1. Validate email uniqueness 2. Validate password strength 3. Hash password 4. Create user account 5. Send verification email
Args: email: User's email address password: Plain text password name: User's display name send_verification: Whether to send email verification
Returns: Created user object
Raises: ConflictError: If email already exists ValidationError: If password is too weak """ # Check email uniqueness stmt = select(User).where(User.email == email) existing = await self.session.exec(stmt).first() if existing: raise ConflictError("Email already registered")
# Validate password strength self._validate_password_strength(password)
# Create user user = User( email=email, password_hash=hash_password(password), name=name, role=UserRole.USER, is_verified=not send_verification # Skip verification in tests )
# Generate email verification token if send_verification: user.email_verification_token = secrets.token_urlsafe(self.TOKEN_LENGTH)
self.session.add(user) await self.commit()
# Send verification email if send_verification: await self._send_verification_email(user)
return user
async def login( self, email: str, password: str, ip_address: Optional[str] = None, user_agent: Optional[str] = None ) -> Dict[str, Any]: """ Authenticate user and create session.
Security features: - Account lockout after failed attempts - Session tracking - Refresh token for extended access
Args: email: User's email password: Plain text password ip_address: Client IP for session tracking user_agent: Client user agent
Returns: Dict with tokens and user info
Raises: AuthenticationError: If login fails """ # Get user stmt = select(User).where(User.email == email) user = await self.session.exec(stmt).first()
if not user: # Don't reveal whether email exists raise AuthenticationError("Invalid email or password")
# Check if account is locked if user.is_locked(): minutes_left = (user.locked_until - datetime.utcnow()).seconds // 60 raise AuthenticationError( f"Account locked. Try again in {minutes_left} minutes." )
# Verify password if not verify_password(password, user.password_hash): # Increment failed attempts user.failed_login_attempts += 1
# Lock account if too many failures if user.failed_login_attempts >= self.MAX_LOGIN_ATTEMPTS: user.locked_until = ( datetime.utcnow() + timedelta(minutes=self.LOCKOUT_DURATION_MINUTES) ) await self.commit() raise AuthenticationError("Too many failed attempts. Account locked.")
await self.commit() raise AuthenticationError("Invalid email or password")
# Check if email is verified if not user.is_verified: raise AuthenticationError("Please verify your email first")
# Check if account is active if not user.is_active: raise AuthenticationError("Account has been deactivated")
# Reset failed attempts on successful login user.failed_login_attempts = 0 user.last_login = datetime.utcnow()
# Create tokens access_token = create_access_token( data={ "sub": user.email, "user_id": user.id, "role": user.role } )
refresh_token = create_refresh_token( data={ "sub": user.email, "user_id": user.id } )
# Create session record session = UserSession( user_id=user.id, refresh_token=refresh_token, ip_address=ip_address, user_agent=user_agent, expires_at=datetime.utcnow() + timedelta(days=7) ) self.session.add(session)
await self.commit()
return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer", "user": user }
async def refresh_access_token( self, refresh_token: str ) -> Dict[str, str]: """ Generate new access token from refresh token.
This allows users to stay logged in without re-entering credentials.
Args: refresh_token: Valid refresh token
Returns: New access token
Raises: AuthenticationError: If refresh token is invalid """ try: # Decode refresh token payload = decode_token(refresh_token)
if payload.get("type") != "refresh": raise AuthenticationError("Invalid token type")
# Check if session exists and is valid stmt = select(UserSession).where( UserSession.refresh_token == refresh_token ) session = await self.session.exec(stmt).first()
if not session or session.is_expired(): raise AuthenticationError("Session expired")
# Get user user = await self.session.get(User, payload["user_id"]) if not user or not user.is_active: raise AuthenticationError("User not found or inactive")
# Create new access token access_token = create_access_token( data={ "sub": user.email, "user_id": user.id, "role": user.role } )
return { "access_token": access_token, "token_type": "bearer" }
except JWTError: raise AuthenticationError("Invalid refresh token")
async def logout( self, user_id: int, refresh_token: Optional[str] = None ) -> bool: """ Logout user by invalidating session.
Args: user_id: User to logout refresh_token: Specific session to invalidate
Returns: True if successful """ if refresh_token: # Invalidate specific session stmt = select(UserSession).where( (UserSession.user_id == user_id) & (UserSession.refresh_token == refresh_token) ) session = await self.session.exec(stmt).first() if session: await self.session.delete(session) else: # Invalidate all sessions stmt = select(UserSession).where(UserSession.user_id == user_id) sessions = await self.session.exec(stmt).all() for session in sessions: await self.session.delete(session)
await self.commit() return True
async def request_password_reset( self, email: str ) -> bool: """ Initiate password reset process.
Generates a secure token and sends reset email.
Args: email: User's email address
Returns: True (always, to prevent email enumeration) """ # Get user (but don't reveal if exists) stmt = select(User).where(User.email == email) user = await self.session.exec(stmt).first()
if user and user.is_active: # Generate reset token user.password_reset_token = secrets.token_urlsafe(self.TOKEN_LENGTH) user.password_reset_expires = ( datetime.utcnow() + timedelta(hours=1) )
await self.commit()
# Send reset email await self._send_password_reset_email(user)
# Always return True to prevent email enumeration return True
async def reset_password( self, token: str, new_password: str ) -> bool: """ Reset password using token.
Args: token: Password reset token new_password: New password
Returns: True if successful
Raises: ValidationError: If token invalid or expired """ # Find user with token stmt = select(User).where(User.password_reset_token == token) user = await self.session.exec(stmt).first()
if not user: raise ValidationError("Invalid reset token")
# Check expiration if user.password_reset_expires < datetime.utcnow(): raise ValidationError("Reset token has expired")
# Validate new password self._validate_password_strength(new_password)
# Update password user.password_hash = hash_password(new_password) user.password_reset_token = None user.password_reset_expires = None
# Invalidate all sessions (force re-login) stmt = select(UserSession).where(UserSession.user_id == user.id) sessions = await self.session.exec(stmt).all() for session in sessions: await self.session.delete(session)
await self.commit()
# Send confirmation email await self._send_password_changed_email(user)
return True
def _validate_password_strength(self, password: str) -> None: """ Validate password meets security requirements.
Requirements: - At least 8 characters - Contains uppercase and lowercase - Contains at least one number - Not a common password
Raises: ValidationError: If password is weak """ if len(password) < 8: raise ValidationError("Password must be at least 8 characters")
if not any(c.isupper() for c in password): raise ValidationError("Password must contain uppercase letter")
if not any(c.islower() for c in password): raise ValidationError("Password must contain lowercase letter")
if not any(c.isdigit() for c in password): raise ValidationError("Password must contain a number")
# Check against common passwords common_passwords = ["password", "12345678", "qwerty", "abc123"] if password.lower() in common_passwords: raise ValidationError("Password is too common")
async def _send_verification_email(self, user: User) -> None: """Send email verification link.""" verification_url = ( f"{settings.FRONTEND_URL}/verify-email" f"?token={user.email_verification_token}" )
await send_email( to=user.email, subject="Verify your email", body=f"Click here to verify: {verification_url}" )
async def _send_password_reset_email(self, user: User) -> None: """Send password reset link.""" reset_url = ( f"{settings.FRONTEND_URL}/reset-password" f"?token={user.password_reset_token}" )
await send_email( to=user.email, subject="Reset your password", body=f"Click here to reset: {reset_url}" )
async def _send_password_changed_email(self, user: User) -> None: """Send password change confirmation.""" await send_email( to=user.email, subject="Password changed", body="Your password has been successfully changed." )Create a session model to track user sessions:
class UserSession(SQLModel, table=True): """Track user sessions for security and analytics.""" __tablename__ = "user_sessions"
id: Optional[int] = Field(default=None, primary_key=True) user_id: int = Field(foreign_key="users.id") refresh_token: str = Field(unique=True, index=True) ip_address: Optional[str] user_agent: Optional[str] created_at: datetime = Field(default_factory=datetime.utcnow) last_used: datetime = Field(default_factory=datetime.utcnow) expires_at: datetime
# Relationship user: User = Relationship(back_populates="sessions")
def is_expired(self) -> bool: """Check if session has expired.""" return datetime.utcnow() > self.expires_at
def touch(self) -> None: """Update last used timestamp.""" self.last_used = datetime.utcnow()Create app/dependencies.py:
"""Dependency injection for authentication and authorization."""
from typing import Optionalfrom zenith import Depends, HTTPException, statusfrom zenith.security import HTTPBearer, HTTPAuthorizationCredentialsfrom jose import JWTErrorfrom sqlmodel import selectfrom app.database import get_sessionfrom app.models import User, UserRolefrom app.auth import decode_token
# Bearer token schemesecurity = HTTPBearer()
async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), session = Depends(get_session)) -> User: """ Get current authenticated user from JWT token.
Validates: - Token signature - Token expiration - User exists and is active
Args: credentials: Bearer token from Authorization header session: Database session
Returns: Current user object
Raises: HTTPException: 401 if authentication fails """ token = credentials.credentials
try: # Decode token payload = decode_token(token)
# Must be access token if payload.get("type") != "access": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type" )
# Get user user_id = payload.get("user_id") if not user_id: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token payload" )
user = await session.get(User, user_id)
if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found" )
if not user.is_active: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="User account is deactivated" )
return user
except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired token" )
async def get_current_active_user( current_user: User = Depends(get_current_user)) -> User: """ Get current user and ensure they're active.
Args: current_user: User from token
Returns: Active user
Raises: HTTPException: 403 if user is inactive """ if not current_user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Inactive user" ) return current_user
def require_role(role: UserRole): """ Dependency to require specific role.
Usage: @app.get("/admin", dependencies=[Depends(require_role(UserRole.ADMIN))])
Args: role: Required role
Returns: Dependency function """ async def role_checker( current_user: User = Depends(get_current_active_user) ) -> User: if not current_user.has_role(role): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Requires {role} role" ) return current_user
return role_checker
class PermissionChecker: """ Flexible permission checking.
Usage: @app.get("/resource", dependencies=[Depends(PermissionChecker("read:resource"))]) """
def __init__(self, required_permission: str): self.required_permission = required_permission
async def __call__( self, current_user: User = Depends(get_current_active_user) ) -> User: # Check if user has permission (implement your logic) if not self._user_has_permission(current_user, self.required_permission): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail=f"Missing permission: {self.required_permission}" ) return current_user
def _user_has_permission( self, user: User, permission: str ) -> bool: """Check if user has specific permission.""" # Simple role-based check (extend as needed) permission_map = { UserRole.ADMIN: ["*"], # Admin has all permissions UserRole.MANAGER: ["read:*", "write:project", "write:task"], UserRole.USER: ["read:*", "write:own"] }
user_permissions = permission_map.get(user.role, [])
# Check exact match or wildcard for perm in user_permissions: if perm == "*" or perm == permission: return True if perm.endswith("*") and permission.startswith(perm[:-1]): return True
return FalseAdd rate limiting to prevent brute force attacks:
from slowapi import Limiter, _rate_limit_exceeded_handlerfrom slowapi.util import get_remote_addressfrom slowapi.errors import RateLimitExceeded
# Create limiterlimiter = Limiter( key_func=get_remote_address, default_limits=["1000 per hour"])
# Add to appapp.state.limiter = limiterapp.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
# Apply to auth endpoints@app.post("/auth/login")@limiter.limit("5 per minute") # Strict limit for loginasync def login(...): ...
@app.post("/auth/register")@limiter.limit("3 per hour") # Prevent spam registrationsasync def register(...): ...
@app.post("/auth/password-reset")@limiter.limit("3 per hour") # Prevent email floodingasync def request_password_reset(...): ...Create tests/test_auth.py:
"""Test authentication and authorization."""
import pytestfrom datetime import datetime, timedeltafrom jose import jwtfrom app.auth import ( hash_password, verify_password, create_access_token, decode_token)
class TestPasswordHashing: """Test password hashing functions."""
def test_hash_password(self): """Test password hashing.""" password = "SecurePassword123" hashed = hash_password(password)
# Hash should be different from original assert hashed != password
# Should be a valid Argon2 hash assert hashed.startswith("$argon2id$")
def test_verify_password(self): """Test password verification.""" password = "TestPassword456" hashed = hash_password(password)
# Correct password should verify assert verify_password(password, hashed)
# Wrong password should fail assert not verify_password("WrongPassword", hashed)
def test_different_hashes(self): """Test that same password produces different hashes.""" password = "SamePassword789" hash1 = hash_password(password) hash2 = hash_password(password)
# Different hashes due to random salt assert hash1 != hash2
# But both verify correctly assert verify_password(password, hash1) assert verify_password(password, hash2)
class TestJWTTokens: """Test JWT token generation and validation."""
def test_create_access_token(self): """Test access token creation.""" data = {"user_id": 123, "email": "test@example.com"} token = create_access_token(data)
# Decode and verify payload = decode_token(token) assert payload["user_id"] == 123 assert payload["email"] == "test@example.com" assert payload["type"] == "access"
def test_token_expiration(self): """Test that expired tokens are rejected.""" data = {"user_id": 123}
# Create token that expires immediately token = create_access_token( data, expires_delta=timedelta(seconds=-1) )
# Should raise error when decoding with pytest.raises(Exception): decode_token(token)
class TestAuthEndpoints: """Test authentication endpoints."""
async def test_register_success(self, client): """Test successful registration.""" response = await client.post( "/auth/register", json={ "email": "newuser@example.com", "password": "SecurePass123", "name": "New User" } )
assert response.status_code == 201 data = response.json() assert data["email"] == "newuser@example.com" assert "password" not in data
async def test_register_weak_password(self, client): """Test registration with weak password.""" response = await client.post( "/auth/register", json={ "email": "weak@example.com", "password": "weak", # Too short "name": "Weak User" } )
assert response.status_code == 422 assert "at least 8 characters" in response.json()["detail"]
async def test_login_lockout(self, client): """Test account lockout after failed attempts.""" # Register user await client.post( "/auth/register", json={ "email": "lockout@example.com", "password": "CorrectPass123", "name": "Test User" } )
# Try login with wrong password 5 times for i in range(5): response = await client.post( "/auth/login", json={ "email": "lockout@example.com", "password": "WrongPassword" } )
# 6th attempt should show lockout response = await client.post( "/auth/login", json={ "email": "lockout@example.com", "password": "CorrectPass123" # Even correct password } )
assert response.status_code == 401 assert "locked" in response.json()["detail"].lower()
async def test_refresh_token(self, client): """Test refresh token flow.""" # Register and login await client.post( "/auth/register", json={ "email": "refresh@example.com", "password": "TestPass123", "name": "Test User" } )
login_response = await client.post( "/auth/login", json={ "email": "refresh@example.com", "password": "TestPass123" } )
tokens = login_response.json() refresh_token = tokens["refresh_token"]
# Use refresh token to get new access token response = await client.post( "/auth/refresh", json={"refresh_token": refresh_token} )
assert response.status_code == 200 assert "access_token" in response.json()
class TestAuthorization: """Test role-based authorization."""
async def test_admin_only_endpoint(self, client, admin_token, user_token): """Test that admin endpoints require admin role.""" # Admin should succeed response = await client.get( "/admin/users", headers={"Authorization": f"Bearer {admin_token}"} ) assert response.status_code == 200
# Regular user should fail response = await client.get( "/admin/users", headers={"Authorization": f"Bearer {user_token}"} ) assert response.status_code == 403 assert "admin role" in response.json()["detail"].lower()
async def test_project_ownership(self, client, user1_token, user2_token): """Test that users can only modify their own projects.""" # User1 creates a project response = await client.post( "/projects", json={"name": "User1 Project"}, headers={"Authorization": f"Bearer {user1_token}"} ) project_id = response.json()["id"]
# User1 can update their project response = await client.patch( f"/projects/{project_id}", json={"name": "Updated Project"}, headers={"Authorization": f"Bearer {user1_token}"} ) assert response.status_code == 200
# User2 cannot update User1's project response = await client.patch( f"/projects/{project_id}", json={"name": "Hacked Project"}, headers={"Authorization": f"Bearer {user2_token}"} ) assert response.status_code == 403Never store tokens in localStorage (XSS vulnerable). Use httpOnly cookies or secure memory storage:
// Bad: Vulnerable to XSSlocalStorage.setItem('token', accessToken);
// Good: HttpOnly cookie (set by server)// Or in-memory for SPAsclass TokenStore { constructor() { this.accessToken = null; this.refreshToken = null; }
setTokens(access, refresh) { this.accessToken = access; // Store refresh in httpOnly cookie via API }}Always use HTTPS in production:
@app.on_event("startup")async def enforce_https(): """Redirect HTTP to HTTPS in production.""" if settings.ENVIRONMENT == "production": app.add_middleware( HTTPSRedirectMiddleware, force_https=True )Add security headers to prevent common attacks:
from zenith.middleware.cors import CORSMiddlewarefrom secure import SecureHeaders
# Security headerssecure_headers = SecureHeaders()
@app.middleware("http")async def add_security_headers(request, call_next): response = await call_next(request) secure_headers.framework.zenith(response) return response
# CORS configurationapp.add_middleware( CORSMiddleware, allow_origins=settings.ALLOWED_ORIGINS, allow_credentials=True, allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["Authorization", "Content-Type"])Always validate and sanitize input:
from email_validator import validate_email, EmailNotValidError
def validate_email_address(email: str) -> str: """Validate and normalize email.""" try: # Validate and get normalized result validation = validate_email(email) return validation.email except EmailNotValidError: raise ValidationError("Invalid email address")Solution: Check SECRET_KEY is set, token hasn’t expired, and Authorization header format is correct: Bearer <token>
Solution: Always invalidate tokens after use and set expiration times
Solution: Regenerate session IDs after login and use secure session cookies
In this part, you’ve implemented: JWT authentication with access and refresh tokens Secure password hashing with Argon2id (via pwdlib) Account lockout for brute force protection Password reset flow with email tokens Role-based access control (RBAC) Session management and tracking Rate limiting for security endpoints Security best practices
In Part 5: Testing, we’ll:
Continue to Part 5
Ready to test your API? Continue to Testing →
Questions? Check our FAQ or ask in GitHub Discussions.