
Database Integration

Zenith provides integrated database support built on async SQLAlchemy, with automatic migrations and connection pooling. It works with PostgreSQL, MySQL, SQLite, and other SQL databases.

from zenith import Zenith
from zenith.core import DB  # ✨ Magic dependency injection shortcut
from zenith.db import ZenithModel  # Enhanced model with built-in methods
from sqlmodel import Field
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Optional
from datetime import datetime

app = Zenith()

# STEP 1: Define your models with ZenithModel (not SQLModel!)
class User(ZenithModel, table=True):  # table=True means this is a database table
    """User model with automatic session management.

    ZenithModel gives you:
    - No manual session management
    - Built-in CRUD methods (create, find, update, delete)
    - Chainable queries (where, order_by, limit)
    - Automatic JSON serialization
    """
    # Primary key - auto-incremented by the database
    id: Optional[int] = Field(default=None, primary_key=True)

    # Unique email with an index for fast lookups
    email: str = Field(
        unique=True,  # Database enforces uniqueness
        index=True    # Creates an index for WHERE email = ? queries
    )

    # Required field
    name: str  # No default = required

    # Active flag (used by the query example below)
    is_active: bool = Field(default=True)

    # Auto-set timestamp
    created_at: datetime = Field(
        default_factory=datetime.utcnow  # Function called on insert
    )

# STEP 2: Use in routes - NO SESSION MANAGEMENT NEEDED!
@app.post("/users")
async def create_user(
    user: User,                 # Pydantic validates the input
    session: AsyncSession = DB  # This injects the database session automatically!
):
    """Create a new user.

    Notice what's missing:
    - No async with get_session()
    - No session.add()
    - No session.commit()
    - No session.refresh()

    ZenithModel handles ALL of this!
    """
    # One line to create and save!
    new_user = await User.create(
        name=user.name,
        email=user.email
        # created_at is set automatically
    )
    return new_user  # Automatically converted to JSON

@app.get("/users")
async def get_users():
    """List users with intuitive query syntax.

    Look how this reads like English!
    """
    # Chainable query methods
    users = await (
        User.where(is_active=True)  # Filter
        .order_by('-created_at')    # Sort (- means DESC)
        .limit(10)                  # Limit results
        .all()                      # Execute and get all
    )
    return {"users": users}  # Auto-serialized to JSON
# That's it! Database integration complete! 🎉
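
To exercise these routes end-to-end, here is a minimal sketch using the TestClient from zenith.testing (introduced in the testing section below); it assumes the app defined above is importable:

from zenith.testing import TestClient

async def demo() -> None:
    async with TestClient(app) as client:
        # POST /users - ZenithModel.create() persists the row
        created = await client.post("/users", json={
            "name": "Ada",
            "email": "ada@example.com",
        })
        print(created.json())

        # GET /users - runs the chained where/order_by/limit query
        listed = await client.get("/users")
        print(listed.json()["users"])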

Connection URLs (One URL to Rule Them All)


PostgreSQL (Production Choice):

# Standard connection (synchronous - avoid in production)
DATABASE_URL = "postgresql://user:password@localhost/dbname"
#               ^protocol^  ^user^ ^pass^  ^host^   ^database^

# ASYNC with asyncpg (RECOMMENDED for production)
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"
#                          ^^^^^^^ Async driver for non-blocking I/O

# With SSL (required for most cloud databases)
DATABASE_URL = "postgresql+asyncpg://user:pass@host/db?sslmode=require"
# sslmode options:
# - disable: No SSL (dev only!)
# - require: Use SSL (production)
# - verify-full: SSL + hostname verification (most secure)

# Full production example
DATABASE_URL = (
    "postgresql+asyncpg://"
    "myapp:SecurePass123!@"  # Credentials
    "db.example.com:5432/"   # Host and port
    "production_db"          # Database name
    "?sslmode=require"       # SSL required
)
# Pool settings (pool_size, max_overflow, ...) are passed to
# create_engine(), not the URL - see "Connection Pooling" below.

MySQL (Alternative Choice):

# Standard (synchronous)
DATABASE_URL = "mysql://user:password@localhost/dbname"

# ASYNC with aiomysql (for async operations)
DATABASE_URL = "mysql+aiomysql://user:password@localhost/dbname"

# With proper character encoding (IMPORTANT for emojis 🚀)
DATABASE_URL = "mysql+aiomysql://user:pass@localhost/db?charset=utf8mb4"
#                                                       ^^^^^^^^^^^^^^^
# utf8mb4 = Full UTF-8 support, including emojis
# utf8    = Old MySQL UTF-8 (doesn't support all characters)

SQLite (Development/Testing):

# File-based database (good for development)
DATABASE_URL = "sqlite:///./app.db"
#               ^^^^^^   ^^^^^^^^
#               protocol path to file

# ASYNC with aiosqlite (for async operations)
DATABASE_URL = "sqlite+aiosqlite:///./app.db"
#                      ^^^^^^^^^ Async driver

# In-memory database (perfect for testing - super fast!)
DATABASE_URL = "sqlite:///:memory:"
#                         ^^^^^^^^ Special path for RAM storage

# Development with an absolute path
DATABASE_URL = "sqlite+aiosqlite:////Users/me/project/dev.db"
#                                ^^^^ Four slashes for an absolute path!

Environment-based configuration (RECOMMENDED):

import os

# Use a different database per environment
DATABASE_URL = os.getenv(
    "DATABASE_URL",
    "sqlite+aiosqlite:///./dev.db"  # Default for development
)
# Production: set the DATABASE_URL environment variable
# Development: uses SQLite automatically
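
For larger apps, a typed settings object beats raw os.getenv calls. A minimal sketch, assuming the pydantic-settings package is installed (it is a separate dependency, not part of Zenith):

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env")

    # Reads DATABASE_URL from the environment (or a .env file),
    # falling back to SQLite for local development
    database_url: str = "sqlite+aiosqlite:///./dev.db"

settings = Settings()
DATABASE_URL = settings.database_url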

Connection Pooling (Don’t DDoS Your Database!)

from zenith.db import create_engine

# Configure the connection pool for production
engine = create_engine(
    DATABASE_URL,

    # POOL SIZE - How many connections to keep ready
    pool_size=20,
    # Rule of thumb: 2-4 connections per CPU core
    # Too small: requests wait for connections
    # Too large: database overhead, memory usage

    # OVERFLOW - Extra connections when the pool is full
    max_overflow=40,
    # Creates up to 40 MORE connections if needed
    # Total possible: pool_size + max_overflow = 60
    # Set to 0 to disable (recommended for cloud databases)

    # TIMEOUT - How long to wait for a connection
    pool_timeout=30,
    # After 30 seconds, give up and raise an error
    # Prevents infinite waiting

    # RECYCLE - Replace old connections
    pool_recycle=3600,  # 1 hour in seconds
    # Why? Some databases close idle connections
    # MySQL: wait_timeout default is 8 hours
    # PostgreSQL: No default timeout
    # Cloud databases: Often 5-15 minutes!

    # PRE-PING - Test each connection before use
    pool_pre_ping=True,
    # Issues a lightweight ping (typically SELECT 1) before each checkout
    # Small overhead, but prevents "connection lost" errors
    # ALWAYS use in production!

    # DEBUG - SQL query logging
    echo=False  # Set True to see ALL SQL queries (dev only!)
    # echo="debug" for even more detail
)

# Cloud-database-optimized settings
cloud_engine = create_engine(
    DATABASE_URL,
    pool_size=10,        # Smaller pool for cloud limits
    max_overflow=0,      # No overflow (cloud connection limits)
    pool_timeout=10,     # Fail fast
    pool_recycle=300,    # 5 minutes (aggressive recycling)
    pool_pre_ping=True,  # Essential for cloud!
)
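
To check whether your pool numbers are sane under load, SQLAlchemy's pool exposes runtime counters. A small sketch using standard QueuePool methods (if your engine is async, reach the pool via engine.sync_engine):

# Inspect the connection pool at runtime
print(engine.pool.status())
# e.g. "Pool size: 20  Connections in pool: 18
#       Current Overflow: 0  Current Checked out connections: 2"

# Individual counters are also available
print(engine.pool.checkedout())  # Connections currently in use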

SQLModel combines SQLAlchemy and Pydantic for type-safe database models:

from zenith.db import SQLModel, Field
from typing import Optional
from datetime import datetime

class Product(SQLModel, table=True):  # table=True creates a database table
    """Product model with all the common patterns.

    This becomes a database table with columns matching the fields.
    """
    # PRIMARY KEY - Unique identifier for each row
    id: Optional[int] = Field(
        default=None,     # None means auto-generate
        primary_key=True  # This is the unique ID
    )
    # Database generates: 1, 2, 3, 4...

    # REQUIRED FIELDS - Must be provided when creating
    name: str = Field(
        index=True  # Create an index for fast searches
    )
    # The index makes WHERE name = 'iPhone' fast

    price: float = Field(
        ge=0  # Validation: greater than or equal to 0
    )
    # Rejects negative prices at the validation layer
    # (Pydantic, not a database CHECK constraint)

    # OPTIONAL FIELDS - Can be NULL in the database
    description: Optional[str] = None
    # None = NULL in the database
    # Optional[str] tells the type checker this can be None

    # FIELD WITH DEFAULT - Auto-set if not provided
    stock: int = Field(
        default=0,  # Default value if not specified
        ge=0        # Can't have negative stock
    )

    # TIMESTAMPS - Track when created/modified
    created_at: datetime = Field(
        default_factory=datetime.utcnow  # Function called on insert
    )
    # Automatically set to the current time when created

    updated_at: Optional[datetime] = None
    # Manually set this when updating
    # Better: use a column-level onupdate (see the mixin example below)

    # UNIQUE CONSTRAINT - No duplicates allowed
    sku: str = Field(
        unique=True,  # Database enforces uniqueness
        index=True    # Also indexed for fast lookups
    )
    # Example: "PROD-12345" can only exist once

# What this creates in PostgreSQL (roughly):
# CREATE TABLE product (
#     id SERIAL PRIMARY KEY,
#     name VARCHAR NOT NULL,
#     price FLOAT NOT NULL,
#     description VARCHAR,
#     stock INTEGER NOT NULL,
#     created_at TIMESTAMP NOT NULL,
#     updated_at TIMESTAMP,
#     sku VARCHAR UNIQUE NOT NULL
# );
# CREATE INDEX ix_product_name ON product(name);
# CREATE INDEX ix_product_sku ON product(sku);
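
Since Product is a plain SQLModel table (not a ZenithModel), you work with it through a session. A minimal sketch, assuming an AsyncSession like the injected ones above:

from sqlmodel import select

async def create_and_find(session) -> Optional[Product]:
    # INSERT a row - validation (ge=0, required fields) runs first
    phone = Product(name="iPhone", price=999.0, sku="PROD-12345")
    session.add(phone)
    await session.commit()
    await session.refresh(phone)  # Load the database-generated id

    # SELECT it back by its unique SKU
    statement = select(Product).where(Product.sku == "PROD-12345")
    return (await session.exec(statement)).first()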
from sqlmodel import Relationship

# ONE-TO-MANY: One user has many posts
class User(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    email: str = Field(unique=True)
    name: str

    # One user -> Many posts
    posts: list["Post"] = Relationship(
        back_populates="author"  # The Post model has an 'author' field
    )
    # Forward reference "Post" because the Post class is defined below
    # This creates NO database column - it's Python-only navigation

# JOIN TABLE for many-to-many (required!)
# Defined before Post and Tag, because link_model takes the class itself.
class PostTag(SQLModel, table=True):
    """Link table connecting posts and tags.

    This creates the actual database table:
    CREATE TABLE posttag (
        post_id INT REFERENCES post(id),
        tag_id INT REFERENCES tag(id),
        PRIMARY KEY (post_id, tag_id)
    )
    """
    # Composite primary key (both fields together are unique)
    post_id: int = Field(
        foreign_key="post.id",
        primary_key=True  # Part 1 of the composite key
    )
    tag_id: int = Field(
        foreign_key="tag.id",
        primary_key=True  # Part 2 of the composite key
    )
    # No additional fields needed for a basic many-to-many,
    # but you could add: created_at, added_by, etc.

class Post(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str
    content: str

    # FOREIGN KEY - Links to the user table
    author_id: int = Field(
        foreign_key="user.id"  # References the user table, id column
    )
    # This DOES create a database column!
    # The database enforces: author_id must exist in user.id

    # Many posts -> One author (reverse of User.posts)
    author: User = Relationship(
        back_populates="posts"  # The User model has a 'posts' field
    )

    # MANY-TO-MANY: Posts can have multiple tags, tags can be on multiple posts
    tags: list["Tag"] = Relationship(
        back_populates="posts",
        link_model=PostTag  # The join table class defined above
    )

class Tag(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(unique=True)  # "python", "tutorial", etc.

    # Reverse relationship
    posts: list[Post] = Relationship(
        back_populates="tags",
        link_model=PostTag  # Same join table
    )

# Usage example:
# user = await User.find(1)
# user.posts   # List of all posts by this user (lazy loaded)
#
# post = await Post.find(1)
# post.author  # The User who wrote this post
# post.tags    # List of all tags on this post
#
# Example: tagging a post
# post = await Post.find(1)
# python_tag = await Tag.where(name="python").first()
# post.tags.append(python_tag)
# await post.save()
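
In async code, touching a lazy-loaded relationship outside the session's context raises an error, so eager-load what you need up front. A sketch using standard SQLAlchemy selectinload with a plain session (covered further in the performance section below):

from sqlalchemy.orm import selectinload
from sqlmodel import select

async def tags_for_post(session, post_id: int) -> list[str]:
    # Load the tags relationship in the same round trip, so no
    # lazy load fires after the rows come back
    statement = (
        select(Post)
        .where(Post.id == post_id)
        .options(selectinload(Post.tags))
    )
    post = (await session.exec(statement)).first()
    return [tag.name for tag in post.tags] if post else []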
# Base model for shared fields
class TimestampMixin(SQLModel):
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: Optional[datetime] = None

# Inherit from the base
class Article(TimestampMixin, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str
    content: str

class Comment(TimestampMixin, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    text: str
    article_id: int = Field(foreign_key="article.id")
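
To avoid setting updated_at by hand, you can pass SQLAlchemy's column-level onupdate through SQLModel's sa_column_kwargs. A minimal sketch as a variant of the mixin above (AutoTimestampMixin is an illustrative name):

class AutoTimestampMixin(SQLModel):
    created_at: datetime = Field(default_factory=datetime.utcnow)
    # sa_column_kwargs is forwarded to the underlying SQLAlchemy Column,
    # so updated_at is refreshed on every UPDATE automatically
    updated_at: Optional[datetime] = Field(
        default=None,
        sa_column_kwargs={"onupdate": datetime.utcnow},
    )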
You can ask for an injected session even when ZenithModel handles the queries:

from zenith.core import DB

@app.get("/users")
async def list_users(session: AsyncSession = DB):
    # Use ZenithModel for intuitive queries
    users = await User.all()
    return users
For full manual control, open a session yourself:

from zenith.db import SessionLocal

@app.post("/users")
async def create_user(user_data: UserCreate):  # UserCreate: your input schema
    async with SessionLocal() as session:
        user = User(**user_data.model_dump())
        session.add(user)
        await session.commit()
        await session.refresh(user)
        return user
Transactions make multi-step changes atomic - everything commits together or rolls back together. (Account and TransferLog are models assumed to be defined elsewhere.)

@app.post("/transfer")
async def transfer_funds(
    from_account: int,
    to_account: int,
    amount: float,
    db: AsyncSession = DB  # Injected database session
):
    """Bank transfer - MUST be atomic!

    The transaction ensures:
    - ALL changes succeed together
    - OR all changes roll back on error
    - No partial transfers (money disappearing!)
    """
    # Begin transaction - everything inside succeeds or fails together
    async with db.begin():  # Auto-commit on success, auto-rollback on error
        # Step 1: Get both accounts (with row locking)
        sender = await db.get(
            Account,
            from_account,
            with_for_update=True  # Lock the row to prevent race conditions
        )
        receiver = await db.get(
            Account,
            to_account,
            with_for_update=True
        )

        # Step 2: Validate that both accounts exist
        if not sender or not receiver:
            raise HTTPException(404, "Account not found")

        # Step 3: Check for sufficient balance
        if sender.balance < amount:
            raise HTTPException(400, "Insufficient funds")
            # The transaction automatically rolls back!

        # Step 4: Perform the transfer
        sender.balance -= amount    # Deduct from sender
        receiver.balance += amount  # Add to receiver

        # Step 5: Audit log (also in the transaction)
        transfer_log = TransferLog(
            from_account=from_account,
            to_account=to_account,
            amount=amount,
            timestamp=datetime.utcnow()
        )
        db.add(transfer_log)
        # The transaction commits here if no exceptions were raised.
        # If ANY exception occurs, EVERYTHING rolls back.

    # Outside the transaction - changes are committed
    return {
        "status": "success",
        "from_balance": sender.balance,
        "to_balance": receiver.balance
    }

# What happens on error:
# 1. An exception is raised anywhere in the 'with' block
# 2. The database rolls back ALL changes
# 3. The sender keeps the money, the receiver gets nothing
# 4. The database stays consistent!
For complex queries, use SQLModel's select() directly. These examples assume an AsyncSession named session:

from sqlmodel import select
from sqlalchemy import func, and_, or_

# GET ALL RECORDS
statement = select(User)                       # Build SELECT * FROM user
users = (await session.exec(statement)).all()  # Execute and fetch all
# SQL: SELECT * FROM user

# GET BY PRIMARY KEY (fastest)
user = await session.get(User, user_id)  # Direct lookup by ID
# SQL: SELECT * FROM user WHERE id = ?
# Returns None if not found

# FILTER WITH WHERE
statement = select(User).where(
    User.age >= 18  # Comparison operators build SQL expressions, not booleans
)
adult_users = (await session.exec(statement)).all()
# SQL: SELECT * FROM user WHERE age >= 18

# MULTIPLE CONDITIONS (AND logic)
statement = select(User).where(
    User.age >= 18,         # Condition 1 AND
    User.is_active == True  # Condition 2
)
# SQL: SELECT * FROM user WHERE age >= 18 AND is_active = true

# OR CONDITIONS
statement = select(User).where(
    or_(
        User.role == "admin",     # Either admin
        User.role == "moderator"  # OR moderator
    )
)
# SQL: SELECT * FROM user WHERE role = 'admin' OR role = 'moderator'

# COMBINING AND/OR
statement = select(User).where(
    and_(
        User.is_active == True,
        or_(
            User.role == "admin",
            User.subscription == "premium"
        )
    )
)
# SQL: SELECT * FROM user
#      WHERE is_active = true
#        AND (role = 'admin' OR subscription = 'premium')

# NULL CHECKS
statement = select(User).where(
    User.deleted_at == None  # IS NULL
)
# SQL: SELECT * FROM user WHERE deleted_at IS NULL

statement = select(User).where(
    User.bio != None  # IS NOT NULL
)
# SQL: SELECT * FROM user WHERE bio IS NOT NULL
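
# MORE FILTER OPERATORS (a supplement using standard SQLAlchemy
# column operators - same session and model assumptions as above)
statement = select(User).where(User.email.like("%@example.com"))
# SQL: SELECT * FROM user WHERE email LIKE '%@example.com'

statement = select(User).where(User.id.in_([1, 2, 3]))
# SQL: SELECT * FROM user WHERE id IN (1, 2, 3)

statement = select(User).where(User.age.between(18, 65))
# SQL: SELECT * FROM user WHERE age BETWEEN 18 AND 65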
# Joins
statement = (
    select(Post, User)
    .join(User)
    .where(User.is_active == True)
)
results = (await session.exec(statement)).all()

# Aggregations
statement = (
    select(func.count(User.id))
    .where(User.created_at >= datetime(2024, 1, 1))
)
user_count = (await session.exec(statement)).one()

# Group by
statement = (
    select(User.country, func.count(User.id))
    .group_by(User.country)
    .having(func.count(User.id) > 10)
)

# Order and limit
statement = (
    select(Post)
    .order_by(Post.created_at.desc())
    .limit(10)
    .offset(20)
)
from typing import Generic, TypeVar, List
from pydantic import BaseModel

T = TypeVar('T')

class Page(BaseModel, Generic[T]):
    items: List[T]
    total: int
    page: int
    per_page: int
    pages: int

@app.get("/users", response_model=Page[User])
async def list_users(
    page: int = 1,
    per_page: int = 20,
    session: AsyncSession = DB
):
    # Count the total
    count_statement = select(func.count(User.id))
    total = (await session.exec(count_statement)).one()

    # Get one page
    statement = (
        select(User)
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    users = (await session.exec(statement)).all()

    return Page(
        items=users,
        total=total,
        page=page,
        per_page=per_page,
        pages=(total + per_page - 1) // per_page
    )
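
OFFSET pagination re-scans every skipped row, so deep pages get slow on large tables. A hedged sketch of keyset (cursor) pagination as an alternative; the route shape mirrors the one above but is illustrative:

@app.get("/users/after")
async def list_users_after(
    after_id: int = 0,  # Cursor: the last id the client saw
    per_page: int = 20,
    session: AsyncSession = DB
):
    # Seek directly to the cursor using the primary-key index -
    # the cost stays constant no matter how deep the client pages
    statement = (
        select(User)
        .where(User.id > after_id)
        .order_by(User.id)
        .limit(per_page)
    )
    users = (await session.exec(statement)).all()
    next_cursor = users[-1].id if users else None
    return {"items": users, "next_cursor": next_cursor}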
Migrations are handled with Alembic:
# Initialize migrations manually
alembic init migrations
# Create migration
alembic revision --autogenerate -m "add users table"
# Apply migrations
alembic upgrade head
# Rollback
alembic downgrade -1
alembic.ini
[alembic]
script_location = migrations
sqlalchemy.url = postgresql://localhost/mydb
# migrations/env.py
from zenith.db import SQLModel
from app.models import * # Import all models
target_metadata = SQLModel.metadata
# After model changes
alembic revision --autogenerate -m "add email field to users"
# Review generated migration
cat migrations/versions/xxx_add_email_field_to_users.py
# Apply
alembic upgrade head
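
An autogenerated migration file looks roughly like this (illustrative - your revision IDs and column details will differ):

"""add email field to users

Revision ID: 3f2a1c9d8e7b   (illustrative)
Revises: 1a2b3c4d5e6f
"""
from alembic import op
import sqlalchemy as sa

def upgrade() -> None:
    op.add_column("user", sa.Column("email", sa.String(), nullable=True))
    op.create_index("ix_user_email", "user", ["email"], unique=True)

def downgrade() -> None:
    op.drop_index("ix_user_email", table_name="user")
    op.drop_column("user", "email")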
import pytest
from zenith.testing import TestClient
from zenith.db import create_engine, SQLModel

@pytest.fixture
async def test_db():
    """Create a test database (requires pytest-asyncio for async fixtures)."""
    engine = create_engine("sqlite:///:memory:")
    SQLModel.metadata.create_all(engine)
    yield engine
    engine.dispose()

@pytest.mark.asyncio
async def test_create_user(test_db):
    app.state.engine = test_db
    async with TestClient(app) as client:
        response = await client.post("/users", json={
            "email": "test@example.com",
            "name": "Test User"
        })
        assert response.status_code == 201
        user = response.json()
        assert user["email"] == "test@example.com"
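
If your app runs on an async engine, the fixture needs the async variants. A sketch using plain SQLAlchemy (create_async_engine and run_sync are standard):

import pytest
from sqlalchemy.ext.asyncio import create_async_engine

@pytest.fixture
async def async_test_db():
    engine = create_async_engine("sqlite+aiosqlite:///:memory:")
    # create_all is synchronous, so run it on the async connection via run_sync
    async with engine.begin() as conn:
        await conn.run_sync(SQLModel.metadata.create_all)
    yield engine
    await engine.dispose()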
from sqlalchemy.orm import selectinload  # selectinload lives in SQLAlchemy

# Avoid N+1 queries
statement = (
    select(User)
    .options(selectinload(User.posts))  # Load posts in one extra query
)
users = (await session.exec(statement)).all()

# Multiple relationships
statement = (
    select(Post)
    .options(
        selectinload(Post.author),
        selectinload(Post.tags)
    )
)
# Bulk insert
users = [
    User(email=f"user{i}@example.com", name=f"User {i}")
    for i in range(1000)
]
session.add_all(users)  # add_all takes model instances
await session.commit()
# (bulk_insert_mappings expects plain dicts and a sync Session;
# add_all is the async-friendly equivalent here)

# Bulk update - one UPDATE statement, no objects loaded
from sqlalchemy import update
await session.execute(
    update(User)
    .where(User.id.in_([1, 2]))
    .values(is_active=False)
)
await session.commit()
# Use indexes
from sqlalchemy import Index

class User(SQLModel, table=True):
    # Composite index for queries filtering on both columns.
    # __table_args__ goes directly on the model class, not in Config.
    __table_args__ = (
        Index("ix_user_email_active", "email", "is_active"),
    )

    id: Optional[int] = Field(default=None, primary_key=True)
    email: str = Field(index=True)  # Single-column index
    is_active: bool = Field(default=True)
# Only select the columns you need
statement = select(User.id, User.email).where(User.is_active == True)

# Use EXISTS for existence checks
from sqlalchemy import exists
has_admin = (await session.exec(
    select(exists().where(User.role == "admin"))
)).one()
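
When a query is still slow, ask the database for its plan. A minimal sketch using SQLAlchemy's text(); EXPLAIN QUERY PLAN is the SQLite spelling, PostgreSQL uses EXPLAIN ANALYZE:

from sqlalchemy import text

async def explain(session):
    result = await session.execute(
        text("EXPLAIN QUERY PLAN SELECT * FROM user WHERE email = :email"),
        {"email": "test@example.com"}
    )
    for row in result:
        print(row)  # Shows whether the email index is actually used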