Database Integration
Overview
Section titled “Overview”Zenith provides integrated database integration with async SQLAlchemy, automatic migrations, and connection pooling. It supports PostgreSQL, MySQL, SQLite, and other SQL databases.
Quick Start (Database in 5 Minutes)
Section titled “Quick Start (Database in 5 Minutes)”from zenith import Zenithfrom zenith.core import DB # ✨ Magic dependency injection shortcutfrom zenith.db import ZenithModel # Enhanced model with built-in methodsfrom sqlmodel import Fieldfrom typing import Optionalfrom datetime import datetime
app = Zenith()
# STEP 1: Define your models with ZenithModel (not SQLModel!)class User(ZenithModel, table=True): # table=True means this is a database table """User model with automatic session management.
ZenithModel gives you: - No manual session management - Built-in CRUD methods (create, find, update, delete) - Chainable queries (where, order_by, limit) - Automatic JSON serialization """
# Primary key - auto-incremented by database id: Optional[int] = Field(default=None, primary_key=True)
# Unique email with index for fast lookups email: str = Field( unique=True, # Database enforces uniqueness index=True # Creates index for WHERE email = ? queries )
# Required field name: str # No default = required
# Auto-set timestamp created_at: datetime = Field( default_factory=datetime.utcnow # Function called on insert )
# STEP 2: Use in routes - NO SESSION MANAGEMENT NEEDED!@app.post("/users")async def create_user( user: User, # Pydantic validates input session: AsyncSession = Session # This injects the database session automatically!): """Create a new user.
Notice what's missing: - No async with get_session() - No session.add() - No session.commit() - No session.refresh()
ZenithModel handles ALL of this! """
# One line to create and save! new_user = await User.create( name=user.name, email=user.email # created_at is set automatically )
return new_user # Automatically converted to JSON
@app.get("/users")async def get_users(): """List users with intuitive query syntax.
Look how this reads like English! """
# Chainable query methods users = await ( User.where(active=True) # Filter .order_by('-created_at') # Sort (- means DESC) .limit(10) # Limit results .all() # Execute and get all )
return {"users": users} # Auto-serialized to JSON
# That's it! Database integration complete! 🎉Database Configuration
Section titled “Database Configuration”Connection URLs (One URL to Rule Them All)
Section titled “Connection URLs (One URL to Rule Them All)”PostgreSQL (Production Choice):
# Standard connection (synchronous - avoid in production)DATABASE_URL = "postgresql://user:password@localhost/dbname"# ^protocol^ ^user^ ^pass^ ^host^ ^database^
# ASYNC with asyncpg (RECOMMENDED for production)DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"# ^^^^^^^^ Async driver for 10x performance
# With SSL (required for cloud databases)DATABASE_URL = "postgresql+asyncpg://user:pass@host/db?sslmode=require"# ^^^^^^^^^^^^^^^^# sslmode options:# - disable: No SSL (dev only!)# - require: Use SSL (production)# - verify-full: SSL + hostname verification (most secure)
# Full production exampleDATABASE_URL = ( "postgresql+asyncpg://" "myapp:SecurePass123!@" # Credentials "db.example.com:5432/" # Host and port "production_db" # Database name "?sslmode=require" # SSL required "&pool_size=20" # Connection pool "&max_overflow=0" # No overflow connections)MySQL (Alternative Choice):
# Standard (synchronous)DATABASE_URL = "mysql://user:password@localhost/dbname"
# ASYNC with aiomysql (for async operations)DATABASE_URL = "mysql+aiomysql://user:password@localhost/dbname"
# With proper character encoding (IMPORTANT for emojis 🚀)DATABASE_URL = "mysql+aiomysql://user:pass@localhost/db?charset=utf8mb4"# ^^^^^^^^^^^^^^^^# utf8mb4 = Full UTF-8 support including emojis# utf8 = Old MySQL UTF-8 (doesn't support all characters)SQLite (Development/Testing):
# File-based database (good for development)DATABASE_URL = "sqlite:///./app.db"# ^^^^^^^^ ^^ ^^^^^^^^# protocol // path to file
# ASYNC with aiosqlite (for async operations)DATABASE_URL = "sqlite+aiosqlite:///./app.db"# ^^^^^^^^^^^ Async driver
# In-memory database (perfect for testing - super fast!)DATABASE_URL = "sqlite:///:memory:"# ^^^^^^^^ Special path for RAM storage
# Development with absolute pathDATABASE_URL = "sqlite+aiosqlite:////Users/me/project/dev.db"# ^^ Four slashes for absolute path!Environment-based configuration (RECOMMENDED):
import os
# Use different databases per environmentDATABASE_URL = os.getenv( "DATABASE_URL", "sqlite+aiosqlite:///./dev.db" # Default for development)
# Production: Set DATABASE_URL environment variable# Development: Uses SQLite automaticallyConnection Pooling (Don’t DDoS Your Database!)
Section titled “Connection Pooling (Don’t DDoS Your Database!)”from zenith.db import create_engine
# Configure connection pool for productionengine = create_engine( DATABASE_URL,
# POOL SIZE - How many connections to keep ready pool_size=20, # Rule of thumb: 2-4 connections per CPU core # Too small: requests wait for connections # Too large: database overhead, memory usage
# OVERFLOW - Extra connections when pool is full max_overflow=40, # Creates up to 40 MORE connections if needed # Total possible: pool_size + max_overflow = 60 # Set to 0 to disable (recommended for cloud databases)
# TIMEOUT - How long to wait for a connection pool_timeout=30, # After 30 seconds, give up and raise error # Prevents infinite waiting
# RECYCLE - Replace old connections pool_recycle=3600, # 1 hour in seconds # Why? Some databases close idle connections # MySQL: wait_timeout default is 8 hours # PostgreSQL: No default timeout # Cloud databases: Often 5-15 minutes!
# PRE-PING - Test connection before use pool_pre_ping=True, # Sends "SELECT 1" before each connection use # Small overhead but prevents "connection lost" errors # ALWAYS use in production!
# DEBUG - SQL query logging echo=False # Set True to see ALL SQL queries (dev only!) # echo="debug" for even more detail)
# Cloud database optimized settingscloud_engine = create_engine( DATABASE_URL, pool_size=10, # Smaller pool for cloud max_overflow=0, # No overflow (cloud connection limits) pool_timeout=10, # Fail fast pool_recycle=300, # 5 minutes (aggressive recycling) pool_pre_ping=True, # Essential for cloud!)Models with SQLModel
Section titled “Models with SQLModel”SQLModel combines SQLAlchemy and Pydantic for type-safe database models:
Basic Model (Your Data Blueprint)
Section titled “Basic Model (Your Data Blueprint)”from zenith.db import SQLModel, Fieldfrom typing import Optionalfrom datetime import datetime
class Product(SQLModel, table=True): # table=True creates database table """Product model with all common patterns.
This becomes a database table with columns matching the fields. """
# PRIMARY KEY - Unique identifier for each row id: Optional[int] = Field( default=None, # None means auto-generate primary_key=True # This is the unique ID ) # Database generates: 1, 2, 3, 4...
# REQUIRED FIELDS - Must provide when creating name: str = Field( index=True # Create index for fast searches ) # Index makes WHERE name = 'iPhone' fast
price: float = Field( ge=0 # Validation: greater than or equal to 0 ) # Prevents negative prices at database level
# OPTIONAL FIELDS - Can be NULL in database description: Optional[str] = None # None = NULL in database # Optional[str] tells type checker this can be None
# FIELD WITH DEFAULT - Auto-set if not provided stock: int = Field( default=0, # Default value if not specified ge=0 # Can't have negative stock )
# TIMESTAMPS - Track when created/modified created_at: datetime = Field( default_factory=datetime.utcnow # Function called on insert ) # Automatically set to current time when created
updated_at: Optional[datetime] = None # Manually set this when updating # Better: Use database triggers or middleware
# UNIQUE CONSTRAINT - No duplicates allowed sku: str = Field( unique=True, # Database enforces uniqueness index=True # Also index for fast lookups ) # Example: "PROD-12345" can only exist once
# What this creates in PostgreSQL: # CREATE TABLE product ( # id SERIAL PRIMARY KEY, # name VARCHAR NOT NULL, # price FLOAT NOT NULL CHECK (price >= 0), # description TEXT, # stock INTEGER DEFAULT 0 CHECK (stock >= 0), # created_at TIMESTAMP NOT NULL, # updated_at TIMESTAMP, # sku VARCHAR UNIQUE NOT NULL # ); # CREATE INDEX ix_product_name ON product(name); # CREATE INDEX ix_product_sku ON product(sku);Relationships (Connecting Your Data)
Section titled “Relationships (Connecting Your Data)”from sqlmodel import Relationship
# ONE-TO-MANY: One user has many postsclass User(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) email: str = Field(unique=True) name: str
# One user -> Many posts posts: list["Post"] = Relationship( back_populates="author" # Post model has 'author' field ) # Forward reference "Post" because Post class defined below # This creates NO database column - it's Python-only navigation
class Post(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) title: str content: str
# FOREIGN KEY - Links to User table author_id: int = Field( foreign_key="user.id" # References user table, id column ) # This DOES create a database column! # Database enforces: author_id must exist in user.id
# Many posts -> One author (reverse of User.posts) author: User = Relationship( back_populates="posts" # User model has 'posts' field )
# MANY-TO-MANY: Posts can have multiple tags, tags can be on multiple posts tags: list["Tag"] = Relationship( back_populates="posts", link_model="PostTag" # Join table defined below )
# Usage example:# user = await User.find(1)# user.posts # List of all posts by this user (lazy loaded)## post = await Post.find(1)# post.author # The User who wrote this post# post.tags # List of all tags on this post
class Tag(SQLModel, table=True): id: Optional[int] = Field(default=None, primary_key=True) name: str = Field(unique=True) # "python", "tutorial", etc.
# Reverse relationship posts: list[Post] = Relationship( back_populates="tags", link_model="PostTag" # Same join table )
# JOIN TABLE for many-to-many (required!)class PostTag(SQLModel, table=True): """Link table connecting posts and tags.
This creates the actual database table: CREATE TABLE posttag ( post_id INT REFERENCES post(id), tag_id INT REFERENCES tag(id), PRIMARY KEY (post_id, tag_id) ) """
# Composite primary key (both fields together are unique) post_id: int = Field( foreign_key="post.id", primary_key=True # Part 1 of composite key ) tag_id: int = Field( foreign_key="tag.id", primary_key=True # Part 2 of composite key )
# No additional fields needed for basic many-to-many # But you could add: created_at, added_by, etc.
# Example: Tagging a post# post = await Post.find(1)# python_tag = await Tag.where(name="python").first()# post.tags.append(python_tag)# await post.save()Model Inheritance
Section titled “Model Inheritance”# Base model for shared fieldsclass TimestampMixin(SQLModel): created_at: datetime = Field(default_factory=datetime.utcnow) updated_at: Optional[datetime] = None
# Inherit from baseclass Article(TimestampMixin, table=True): id: Optional[int] = Field(default=None, primary_key=True) title: str content: str
class Comment(TimestampMixin, table=True): id: Optional[int] = Field(default=None, primary_key=True) text: str article_id: int = Field(foreign_key="article.id")Database Sessions
Section titled “Database Sessions”Dependency Injection
Section titled “Dependency Injection”from zenith import Dependsfrom zenith.core import DB
@app.get("/users")async def list_users(session: AsyncSession = Session): # Use ZenithModel for intuitive queries users = await User.all() return usersManual Session Management
Section titled “Manual Session Management”from zenith.db import SessionLocal
@app.post("/users")async def create_user(user_data: UserCreate): async with SessionLocal() as session: user = User(**user_data.model_dump()) session.add(user) await session.commit() await session.refresh(user) return userTransaction Management (All or Nothing!)
Section titled “Transaction Management (All or Nothing!)”@app.post("/transfer")async def transfer_funds( from_account: int, to_account: int, amount: float, session: AsyncSession = Session # Injected database session): """ Bank transfer - MUST be atomic!
Transaction ensures: - ALL changes succeed together - OR all changes rollback on error - No partial transfers (money disappearing!) """
# Begin transaction - everything inside succeeds or fails together async with db.begin(): # Auto-commit on success, auto-rollback on error
# Step 1: Get both accounts (with row locking) sender = await db.get( Account, from_account, with_for_update=True # Lock row to prevent race conditions ) receiver = await db.get( Account, to_account, with_for_update=True )
# Step 2: Validate accounts exist if not sender or not receiver: raise HTTPException(404, "Account not found")
# Step 3: Check sufficient balance if sender.balance < amount: raise HTTPException(400, "Insufficient funds") # Transaction automatically rolls back!
# Step 4: Perform transfer sender.balance -= amount # Deduct from sender receiver.balance += amount # Add to receiver
# Step 5: Audit log (also in transaction) transfer_log = TransferLog( from_account=from_account, to_account=to_account, amount=amount, timestamp=datetime.utcnow() ) db.add(transfer_log)
# Transaction commits here if no exceptions # If ANY exception occurs, EVERYTHING rolls back
# Outside transaction - changes are committed return { "status": "success", "from_balance": sender.balance, "to_balance": receiver.balance }
# What happens on error:# 1. Exception raised anywhere in 'with' block# 2. Database rolls back ALL changes# 3. Sender keeps money, receiver gets nothing# 4. Database stays consistent!Queries
Section titled “Queries”Basic Queries (Finding Your Data)
Section titled “Basic Queries (Finding Your Data)”from sqlmodel import select, col
# GET ALL RECORDSstatement = select(User) # Build SELECT * FROM userusers = await session.exec(statement).all() # Execute and fetch all# SQL: SELECT * FROM user
# GET BY PRIMARY KEY (fastest)user = await session.get(User, user_id) # Direct lookup by ID# SQL: SELECT * FROM user WHERE id = ?# Returns None if not found
# FILTER WITH WHEREstatement = select(User).where( User.age >= 18 # Note: >= not Python's >=)adult_users = await session.exec(statement).all()# SQL: SELECT * FROM user WHERE age >= 18
# MULTIPLE CONDITIONS (AND logic)statement = select(User).where( User.age >= 18, # Condition 1 AND User.is_active == True # Condition 2)# SQL: SELECT * FROM user WHERE age >= 18 AND is_active = true
# OR CONDITIONSfrom sqlmodel import or_
statement = select(User).where( or_( User.role == "admin", # Either admin User.role == "moderator" # OR moderator ))# SQL: SELECT * FROM user WHERE role = 'admin' OR role = 'moderator'
# COMBINING AND/ORfrom sqlmodel import and_, or_
statement = select(User).where( and_( User.is_active == True, or_( User.role == "admin", User.subscription == "premium" ) ))# SQL: SELECT * FROM user# WHERE is_active = true# AND (role = 'admin' OR subscription = 'premium')
# NULL CHECKSstatement = select(User).where( User.deleted_at == None # IS NULL)# SQL: SELECT * FROM user WHERE deleted_at IS NULL
statement = select(User).where( User.bio != None # IS NOT NULL)# SQL: SELECT * FROM user WHERE bio IS NOT NULLAdvanced Queries
Section titled “Advanced Queries”# Joinsstatement = ( select(Post, User) .join(User) .where(User.is_active == True))results = session.exec(statement).all()
# Aggregationsfrom sqlmodel import funcstatement = ( select(func.count(User.id)) .where(User.created_at >= datetime(2024, 1, 1)))user_count = session.exec(statement).one()
# Group bystatement = ( select(User.country, func.count(User.id)) .group_by(User.country) .having(func.count(User.id) > 10))
# Order and limitstatement = ( select(Post) .order_by(Post.created_at.desc()) .limit(10) .offset(20))Pagination
Section titled “Pagination”from typing import Generic, TypeVar, Listfrom pydantic import BaseModel
T = TypeVar('T')
class Page(BaseModel, Generic[T]): items: List[T] total: int page: int per_page: int pages: int
@app.get("/users", response_model=Page[User])async def list_users( page: int = 1, per_page: int = 20, session: AsyncSession = Session): # Count total count_statement = select(func.count(User.id)) total = session.exec(count_statement).one()
# Get page statement = ( select(User) .offset((page - 1) * per_page) .limit(per_page) ) users = session.exec(statement).all()
return Page( items=users, total=total, page=page, per_page=per_page, pages=(total + per_page - 1) // per_page )Migrations with Alembic
Section titled “Migrations with Alembic”# Initialize migrations manuallyalembic init migrations
# Create migrationalembic revision --autogenerate -m "add users table"
# Apply migrationsalembic upgrade head
# Rollbackalembic downgrade -1Configuration
Section titled “Configuration”[alembic]script_location = migrationssqlalchemy.url = postgresql://localhost/mydb
# migrations/env.pyfrom zenith.db import SQLModelfrom app.models import * # Import all models
target_metadata = SQLModel.metadataAuto-generating Migrations
Section titled “Auto-generating Migrations”# After model changesalembic revision --autogenerate -m "add email field to users"
# Review generated migrationcat migrations/versions/xxx_add_email_field_to_users.py
# Applyalembic upgrade headTesting with Databases
Section titled “Testing with Databases”import pytestfrom zenith.testing import TestClientfrom zenith.db import create_engine, SQLModel
@pytest.fixtureasync def test_db(): """Create test database.""" engine = create_engine("sqlite:///:memory:") SQLModel.metadata.create_all(engine) yield engine engine.dispose()
@pytest.mark.asyncioasync def test_create_user(test_db): app.state.engine = test_db
async with TestClient(app) as client: response = await client.post("/users", json={ "email": "test@example.com", "name": "Test User" }) assert response.status_code == 201 user = response.json() assert user["email"] == "test@example.com"Performance Optimization
Section titled “Performance Optimization”Eager Loading
Section titled “Eager Loading”from sqlmodel import selectinload
# Avoid N+1 queriesstatement = ( select(User) .options(selectinload(User.posts)) # Load posts in one query)users = session.exec(statement).all()
# Multiple relationshipsstatement = ( select(Post) .options( selectinload(Post.author), selectinload(Post.tags) ))Bulk Operations
Section titled “Bulk Operations”# Bulk insertusers = [ User(email=f"user{i}@example.com", name=f"User {i}") for i in range(1000)]session.bulk_insert_mappings(User, users)await session.commit()
# Bulk updatesession.bulk_update_mappings( User, [{"id": 1, "is_active": False}, {"id": 2, "is_active": False}])Query Optimization
Section titled “Query Optimization”# Use indexesclass User(SQLModel, table=True): email: str = Field(index=True) # Single column index
class Config: # Composite index __table_args__ = ( Index("ix_user_email_active", "email", "is_active"), )
# Only select needed columnsstatement = select(User.id, User.email).where(User.is_active == True)
# Use exists for checksfrom sqlmodel import existshas_admin = session.exec( select(exists().where(User.role == "admin"))).one()Next Steps
Section titled “Next Steps”- Implement Authentication with database users
- Learn about Testing database operations
- Explore Performance optimization