Database Guide

This comprehensive guide covers Zenith’s database integration, from basic setup to advanced patterns.

Zenith provides seamless database integration with modern development patterns:

Key Features:

  • Zero Configuration: Works out of the box, no setup files needed
  • Request-Scoped Sessions: Automatic session management, no manual get_db() functions
  • Rails-like Query API: User.where(active=True).limit(10).all()
  • Built-in CRUD: User.create(), User.find(), User.update(), User.destroy()
  • Auto-404 Handling: User.find_or_404(id) raises HTTP 404 automatically
  • Type Safety: Full SQLModel + Pydantic validation
  • Any Database: PostgreSQL, MySQL, SQLite - same code works everywhere
  • Connection Pooling: Optimized for high performance
  • Transaction Safety: Automatic rollback on errors

Architecture: clean database operations with intuitive APIs. Compare the manual setup you'd normally write with the Zenith sketch that follows it:

# What you'd normally write for database setup
import os

from sqlalchemy import Column, Integer, String, select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# Manual engine configuration
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./app.db")
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)
Base = declarative_base()

# Manual model definition
class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    email = Column(String)

# Manual session management everywhere
async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

# Manual CRUD operations
async def get_all_users():
    async with AsyncSessionLocal() as session:
        result = await session.execute(select(User))
        return result.scalars().all()

# Multiple files and extensive configuration required
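
For contrast, here's a minimal sketch of the Zenith side, using the ZenithModel API documented below; the Zenith() app object and its import path are assumed from the framework quickstart, not shown in this guide:

from typing import Optional

from sqlmodel import Field
from zenith import Zenith  # assumed quickstart import
from zenith.db import ZenithModel

app = Zenith()  # Zero configuration: DATABASE_URL is detected automatically

class User(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    email: str

@app.get("/users")
async def list_users():
    # Request-scoped session is managed for you (no get_db() plumbing)
    return await User.all()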

Different databases require specific URL formats:

# PostgreSQL (recommended for production)
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/dbname"
# MySQL/MariaDB
DATABASE_URL = "mysql+aiomysql://user:password@localhost:3306/dbname"
# SQLite (development only)
DATABASE_URL = "sqlite+aiosqlite:///./app.db"
# With connection parameters
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname?ssl=require&timeout=10"

Optimize connection usage with pooling:

from sqlalchemy.pool import NullPool, StaticPool

# Production settings (async engines use AsyncAdaptedQueuePool by default;
# the plain QueuePool is not compatible with asyncio engines)
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,        # Number of persistent connections
    max_overflow=40,     # Maximum overflow connections
    pool_timeout=30,     # Seconds to wait for a connection
    pool_recycle=1800,   # Recycle connections after 30 minutes
    pool_pre_ping=True,  # Test connections before use
)

# Development settings (no pooling)
engine = create_async_engine(
    DATABASE_URL,
    poolclass=NullPool,  # No connection pooling
    echo=True            # Log all SQL
)

# In-memory SQLite (testing)
engine = create_async_engine(
    "sqlite+aiosqlite:///:memory:",
    poolclass=StaticPool,  # Single connection for SQLite
    connect_args={"check_same_thread": False}
)

ZenithModel extends SQLModel with convenient methods:

from datetime import datetime
from typing import List, Optional

from pydantic import field_validator
from sqlalchemy import JSON, CheckConstraint, Index, Text, UniqueConstraint
from sqlmodel import Field, Relationship
from zenith.db import ZenithModel

class BaseModel(ZenithModel):
    """Base model with common fields."""
    id: Optional[int] = Field(default=None, primary_key=True)
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        # Pydantic configuration
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }

class User(BaseModel, table=True):
    """User model with all field types."""
    __tablename__ = "users"  # Custom table name

    # Required fields
    email: str = Field(unique=True, index=True, nullable=False)
    username: str = Field(index=True, min_length=3, max_length=50)

    # Optional fields
    full_name: Optional[str] = Field(default=None, max_length=100)
    bio: Optional[str] = Field(default=None, sa_type=Text)

    # Booleans with defaults
    is_active: bool = Field(default=True)
    is_verified: bool = Field(default=False)

    # Numeric fields
    age: Optional[int] = Field(default=None, ge=0, le=150)
    balance: float = Field(default=0.0, ge=0.0)

    # JSON field ("metadata" is reserved by SQLAlchemy, so pick another name)
    meta: dict = Field(default_factory=dict, sa_type=JSON)

    # Relationships
    posts: List["Post"] = Relationship(back_populates="author", cascade_delete=True)
    profile: Optional["Profile"] = Relationship(
        back_populates="user", sa_relationship_kwargs={"uselist": False}
    )

    # Computed property
    @property
    def display_name(self) -> str:
        return self.full_name or self.username

    # Validation
    @field_validator("email")
    @classmethod
    def validate_email(cls, v: str) -> str:
        if "@" not in v:
            raise ValueError("Invalid email")
        return v.lower()

    # Indexes and constraints
    __table_args__ = (
        Index("idx_user_email_active", "email", "is_active"),
        CheckConstraint("age >= 0", name="check_age_positive"),
        UniqueConstraint("email", "username", name="uq_email_username"),
    )

ZenithModel provides intuitive query methods:

# Create
user = await User.create(
    email="alice@example.com",
    username="alice",
    full_name="Alice Smith"
)

# Find by ID
user = await User.find(1)         # Returns None if not found
user = await User.find_or_404(1)  # Raises 404 if not found

# Find by field
user = await User.find_by(email="alice@example.com")

# Get all
all_users = await User.all()

# Filtering
active_users = await User.where(is_active=True).all()
verified_users = await User.where(is_active=True, is_verified=True).all()

# Complex queries with operators
from sqlmodel import or_, and_, not_

adults = await User.where(User.age >= 18).all()
users_with_balance = await User.where(User.balance > 0).all()
matching_users = await User.where(
    or_(
        User.email.contains("example.com"),
        User.username.startswith("admin")
    )
).all()

# Ordering
recent_users = await User.order_by("-created_at").limit(10).all()
alphabetical = await User.order_by("username").all()

# Pagination
page1 = await User.offset(0).limit(10).all()
page2 = await User.offset(10).limit(10).all()

# Counting
total_users = await User.count()
active_count = await User.where(is_active=True).count()

# Updates
user = await User.find(1)
user.email = "newemail@example.com"
await user.save()
# Or update directly
await User.update(1, email="newemail@example.com", is_verified=True)

# Delete
await user.delete()
# Or
await User.delete(1)

# Bulk operations
await User.bulk_create([
    User(email="user1@example.com", username="user1"),
    User(email="user2@example.com", username="user2"),
])
await User.bulk_update(
    [{"id": 1, "is_active": False}, {"id": 2, "is_active": False}]
)
await User.where(is_active=False).delete_all()

Build complex queries with seamless method chaining:

# Seamless single-line chaining - no intermediate awaits needed!
user = await User.where(email="alice@example.com").first()
users = await User.where(is_active=True).order_by("-created_at").limit(10).all()
count = await User.where(is_active=True).count()

# Chain multiple conditions
results = await (User
    .where(is_active=True)
    .where(User.age >= 18)
    .order_by("-created_at")
    .limit(20)
    .offset(40)
    .all())

# With relationships (eager loading)
users_with_posts = await (User
    .where(is_active=True)
    .includes("posts")  # Eager load posts
    .all())

# Select specific columns
emails = await (User
    .select(User.email, User.username)
    .where(is_verified=True)
    .all())

# Aggregate functions
from sqlmodel import func

user_stats = await (User
    .select(
        func.count(User.id).label("total"),
        func.avg(User.age).label("avg_age"),
        func.max(User.created_at).label("latest_signup")
    )
    .where(is_active=True)
    .one())
One-to-many relationships link a parent record to many children:

class Author(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    # One author has many books
    books: List["Book"] = Relationship(back_populates="author")

class Book(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str
    author_id: int = Field(foreign_key="author.id")
    # Many books belong to one author
    author: Author = Relationship(back_populates="books")

# Usage
author = await Author.create(name="Stephen King")
book = await Book.create(title="The Shining", author_id=author.id)

# Eager loading
author_with_books = await Author.find(1).includes("books")
for book in author_with_books.books:
    print(book.title)

# Reverse query
books_by_author = await Book.where(author_id=author.id).all()
One-to-one relationships pair exactly one record with another (note uselist=False):

class User(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    email: str
    # One user has one profile
    profile: Optional["Profile"] = Relationship(
        back_populates="user",
        sa_relationship_kwargs={"uselist": False}
    )

class Profile(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    user_id: int = Field(foreign_key="user.id", unique=True)
    bio: str
    # One profile belongs to one user
    user: User = Relationship(back_populates="profile")

# Usage
user = await User.create(email="alice@example.com")
profile = await Profile.create(user_id=user.id, bio="Developer")

# Access the relationship
user_with_profile = await User.find(user.id).includes("profile")
print(user_with_profile.profile.bio)
Many-to-many relationships go through an association table (the link model):

# Association table (a ZenithModel so it gets .create() too)
class PostTag(ZenithModel, table=True):
    post_id: int = Field(foreign_key="post.id", primary_key=True)
    tag_id: int = Field(foreign_key="tag.id", primary_key=True)
    # Optional: additional fields on the relationship
    added_at: datetime = Field(default_factory=datetime.utcnow)

class Post(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str
    # Many posts have many tags
    tags: List["Tag"] = Relationship(
        back_populates="posts",
        link_model=PostTag
    )

class Tag(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str = Field(unique=True)
    # Many tags belong to many posts
    posts: List[Post] = Relationship(
        back_populates="tags",
        link_model=PostTag
    )

# Usage
post = await Post.create(title="Python Tips")
tag1 = await Tag.create(name="python")
tag2 = await Tag.create(name="programming")

# Add tags to a post
await PostTag.create(post_id=post.id, tag_id=tag1.id)
await PostTag.create(post_id=post.id, tag_id=tag2.id)

# Query with tags
post_with_tags = await Post.find(post.id).includes("tags")
for tag in post_with_tags.tags:
    print(tag.name)

# Find posts by tag
python_posts = await Post.where(
    Post.tags.any(Tag.name == "python")
).all()
Self-referential relationships model hierarchies like org charts:

class Employee(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    manager_id: Optional[int] = Field(default=None, foreign_key="employee.id")
    # Self-referential relationship
    manager: Optional["Employee"] = Relationship(
        back_populates="subordinates",
        sa_relationship_kwargs={
            "remote_side": "Employee.id"
        }
    )
    subordinates: List["Employee"] = Relationship(
        back_populates="manager"
    )

# Usage
ceo = await Employee.create(name="Alice CEO")
manager = await Employee.create(name="Bob Manager", manager_id=ceo.id)
employee = await Employee.create(name="Charlie Employee", manager_id=manager.id)

# Get the hierarchy
manager_with_team = await Employee.find(manager.id).includes("subordinates")
Eager loading avoids the N+1 query problem:

# BAD: N+1 queries
users = await User.all()  # 1 query
for user in users:
    posts = await Post.where(author_id=user.id).all()  # N queries
    print(f"{user.name}: {len(posts)} posts")

# GOOD: Eager loading
users = await User.includes("posts").all()  # 2 queries total
for user in users:
    print(f"{user.name}: {len(user.posts)} posts")  # No additional queries

# BETTER: Join loading for a single query
from sqlalchemy import select
from sqlalchemy.orm import joinedload, selectinload

stmt = select(User).options(
    joinedload(User.posts)  # Single query with JOIN
)
users = (await session.exec(stmt)).unique().all()  # unique() is required with joined collections
# 1. Eager load one-to-one and foreign-key relationships
posts = await Post.includes("author").all()  # Joins in a single query

# 2. Eager load many-to-many relationships
posts = await Post.includes("tags").all()  # Separate query for tags

# 3. Only select the columns you need
emails = await User.select(User.email).where(is_active=True).all()

# 4. Use exists() for existence checks
user_exists = await User.where(email="test@example.com").exists()

# 5. Batch operations
await User.bulk_create(users)  # Single INSERT for multiple rows

# 6. Use raw SQL for complex queries
from sqlalchemy import text

result = (await session.exec(
    text("""
        SELECT u.id, u.email, COUNT(p.id) AS post_count
        FROM users u
        LEFT JOIN posts p ON u.id = p.author_id
        WHERE u.is_active = true
        GROUP BY u.id, u.email
        HAVING COUNT(p.id) > 10
    """)
)).all()
Design indexes around your actual query patterns:

class OptimizedModel(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    # Single-column index
    email: str = Field(index=True)
    # Unique index
    username: str = Field(unique=True)

    # Composite indexes
    __table_args__ = (
        # Multi-column index
        Index("idx_user_active_verified", "is_active", "is_verified"),
        # Unique constraint across multiple columns
        UniqueConstraint("email", "tenant_id", name="uq_email_per_tenant"),
        # Partial index (PostgreSQL)
        Index(
            "idx_active_users",
            "email",
            postgresql_where="is_active = true"
        ),
        # Full-text search index (PostgreSQL)
        Index(
            "idx_fulltext",
            "title",
            postgresql_using="gin",
            postgresql_ops={"title": "gin_trgm_ops"}
        ),
    )
Time queries in application code and lean on the engine's logging:

import logging
import time
from contextlib import asynccontextmanager

logger = logging.getLogger(__name__)

@asynccontextmanager
async def query_timer(description: str):
    """Monitor query execution time."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        if elapsed > 1.0:  # Log slow queries
            logger.warning(f"Slow query ({elapsed:.2f}s): {description}")

# Usage
async with query_timer("Load users with posts"):
    users = await User.includes("posts").all()

# Enable query logging
engine = create_async_engine(
    DATABASE_URL,
    echo=True,         # Log all queries
    echo_pool="debug"  # Log connection pool events
)

# Query explain plan (PostgreSQL)
from sqlalchemy import text

explain = (await session.exec(
    text("EXPLAIN ANALYZE SELECT * FROM users WHERE is_active = true")
)).all()
Manage schema changes with Alembic:
# Initialize Alembic
alembic init alembic
# Create first migration
alembic revision --autogenerate -m "Initial migration"
# Apply migrations
alembic upgrade head
# Rollback
alembic downgrade -1

Edit alembic/env.py:

import asyncio
import os

from alembic import context
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from sqlmodel import SQLModel

from app.models import *  # Import all models so autogenerate sees them

config = context.config
target_metadata = SQLModel.metadata

def run_migrations_online():
    """Run migrations in online mode with async support."""
    configuration = config.get_section(config.config_ini_section)
    configuration["sqlalchemy.url"] = os.getenv("DATABASE_URL")
    connectable = async_engine_from_config(
        configuration,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
        future=True
    )

    async def run_async_migrations():
        async with connectable.connect() as connection:
            await connection.run_sync(do_run_migrations)

    asyncio.run(run_async_migrations())

def do_run_migrations(connection):
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,            # Detect column type changes
        compare_server_default=True,  # Detect default changes
    )
    with context.begin_transaction():
        context.run_migrations()

run_migrations_online()
"""Add full text search
Revision ID: abc123
Revises: def456
Create Date: 2024-01-01 00:00:00
"""
from alembic import op
import sqlalchemy as sa
def upgrade():
# Create extension
op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")
# Add GIN index for full-text search
op.create_index(
"idx_posts_search",
"posts",
[sa.text("to_tsvector('english', title || ' ' || content)")],
postgresql_using="gin"
)
# Add computed column
op.add_column(
"users",
sa.Column(
"full_name",
sa.String(),
sa.Computed("first_name || ' ' || last_name", persisted=True)
)
)
def downgrade():
op.drop_index("idx_posts_search")
op.drop_column("users", "full_name")
Soft deletes keep records recoverable instead of removing them:

from datetime import datetime
from typing import Optional

class SoftDeleteMixin(ZenithModel):
    """Mixin for soft delete functionality."""
    is_deleted: bool = Field(default=False, index=True)
    deleted_at: Optional[datetime] = Field(default=None)
    deleted_by: Optional[int] = Field(default=None)

    async def soft_delete(self, user_id: Optional[int] = None):
        """Mark a record as deleted without removing it from the database."""
        self.is_deleted = True
        self.deleted_at = datetime.utcnow()
        self.deleted_by = user_id
        await self.save()

    async def restore(self):
        """Restore a soft-deleted record."""
        self.is_deleted = False
        self.deleted_at = None
        self.deleted_by = None
        await self.save()

    @classmethod
    def active(cls):
        """Filter to only non-deleted records."""
        return cls.where(is_deleted=False)

    @classmethod
    def deleted(cls):
        """Filter to only deleted records."""
        return cls.where(is_deleted=True)

class Document(SoftDeleteMixin, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str

# Usage
doc = await Document.find(1)
await doc.soft_delete(user_id=current_user.id)

# Query only active documents
active_docs = await Document.active().all()

# Include deleted documents
all_docs = await Document.all()

# Restore
await doc.restore()
Audit mixins record who changed what, and when:

class AuditMixin(ZenithModel):
    """Track all changes to records."""
    created_by: Optional[int] = Field(default=None, foreign_key="user.id")
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_by: Optional[int] = Field(default=None, foreign_key="user.id")
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    version: int = Field(default=1)

    @classmethod
    async def create_with_audit(cls, user_id: int, **data):
        """Create with audit information."""
        instance = cls(
            **data,
            created_by=user_id,
            updated_by=user_id
        )
        return await instance.save()

    async def update_with_audit(self, user_id: int, **data):
        """Update with audit information."""
        for key, value in data.items():
            setattr(self, key, value)
        self.updated_by = user_id
        self.updated_at = datetime.utcnow()
        self.version += 1
        return await self.save()

# Separate audit log table
class AuditLog(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    table_name: str
    record_id: int
    action: str  # CREATE, UPDATE, DELETE
    changes: dict = Field(default_factory=dict, sa_type=JSON)
    user_id: int
    timestamp: datetime = Field(default_factory=datetime.utcnow)

    @classmethod
    async def log_change(cls, model, action: str, user_id: int, changes: dict = None):
        """Log a change to the audit trail."""
        await cls.create(
            table_name=model.__tablename__,
            record_id=model.id,
            action=action,
            changes=changes or {},
            user_id=user_id
        )
Tenant mixins scope every query to one tenant:

class TenantMixin(ZenithModel):
    """Multi-tenant data isolation."""
    tenant_id: int = Field(index=True)

    @classmethod
    def for_tenant(cls, tenant_id: int):
        """Filter by tenant."""
        return cls.where(tenant_id=tenant_id)

    @classmethod
    async def create_for_tenant(cls, tenant_id: int, **data):
        """Create a record for a specific tenant."""
        return await cls.create(tenant_id=tenant_id, **data)

class TenantAwareModel(TenantMixin, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str

    __table_args__ = (
        # Ensure uniqueness within a tenant
        UniqueConstraint("tenant_id", "name", name="uq_name_per_tenant"),
    )

# Usage with dependency injection
async def get_current_tenant(request: Request) -> int:
    """Extract the tenant from the request."""
    # Could come from a subdomain, header, or JWT claim
    return int(request.headers["X-Tenant-ID"])

@app.get("/items")
async def list_items(tenant_id: int = Depends(get_current_tenant)):
    return await TenantAwareModel.for_tenant(tenant_id).all()
Cache hot records in Redis and invalidate on save:

import hashlib
import pickle

class CachedModel(ZenithModel):
    """Model with built-in caching."""

    @classmethod
    def cache_key(cls, **filters):
        """Generate a cache key from filters."""
        key_str = f"{cls.__name__}:{filters}"
        return hashlib.md5(key_str.encode()).hexdigest()

    @classmethod
    async def find_cached(cls, id: int, ttl: int = 300):
        """Find with caching."""
        cache_key = f"{cls.__name__}:{id}"
        # Try the cache first
        cached = await redis.get(cache_key)
        if cached:
            return pickle.loads(cached)
        # Fall back to the database
        instance = await cls.find(id)
        if instance:
            await redis.setex(
                cache_key,
                ttl,
                pickle.dumps(instance)
            )
        return instance

    async def invalidate_cache(self):
        """Remove from the cache after an update."""
        cache_key = f"{self.__class__.__name__}:{self.id}"
        await redis.delete(cache_key)

    async def save(self):
        """Save and invalidate the cache."""
        result = await super().save()
        await self.invalidate_cache()
        return result
PostgreSQL adds JSON, array, full-text, and window-function support:

# JSON operations
from sqlalchemy.dialects.postgresql import JSONB

class Configuration(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    settings: dict = Field(default_factory=dict, sa_type=JSONB)

# Query JSON fields
configs = await Configuration.where(
    Configuration.settings["theme"].astext == "dark"
).all()

# Array fields
from sqlalchemy import String
from sqlalchemy.dialects.postgresql import ARRAY

class Article(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    tags: List[str] = Field(default_factory=list, sa_type=ARRAY(String))

# Query arrays
articles = await Article.where(
    Article.tags.contains(["python", "async"])
).all()

# Full-text search
from sqlalchemy import func

results = await Post.where(
    func.to_tsvector("english", Post.content).match("python async")
).all()

# Window functions
from sqlalchemy import select

stmt = select(
    User.id,
    User.name,
    func.row_number().over(
        partition_by=User.department_id,
        order_by=User.salary.desc()
    ).label("rank")
)
# SQLite doesn't support some features
if settings.DATABASE_URL.startswith("sqlite"):
    # No concurrent writes
    engine = create_async_engine(
        DATABASE_URL,
        connect_args={"check_same_thread": False},
        poolclass=StaticPool  # Single connection
    )
    # Limited ALTER TABLE: dropping columns or changing constraints
    # usually requires a table rebuild
    # No native arrays or JSON operators: store as TEXT and parse in Python
# Reuse sessions with dependency injection
async def get_db():
    async with async_session() as session:
        yield session

@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_db)):
    return (await db.exec(select(User))).all()

# Use context managers for transactions
async with async_session() as session:
    async with session.begin():
        # All operations run in one transaction
        await perform_operations(session)

# Instead of many individual inserts...
for data in user_data_list:
    await User.create(**data)  # BAD: N queries

# ...use bulk operations
await User.bulk_create([
    User(**data) for data in user_data_list
])  # GOOD: 1 query

# Control lazy loading to prevent unexpected queries
class Post(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str
    # Lazy load by default
    author: User = Relationship(back_populates="posts")
    # Always eager load
    tags: List[Tag] = Relationship(
        back_populates="posts",
        link_model=PostTag,
        sa_relationship_kwargs={"lazy": "joined"}
    )

# functools.lru_cache does not work on async functions (it would cache the
# coroutine object, not the result), so use an async-aware cache instead
_user_cache: dict[int, Optional[User]] = {}

async def get_user_cached(user_id: int) -> Optional[User]:
    """Cache frequently accessed users."""
    if user_id not in _user_cache:
        _user_cache[user_id] = await User.find(user_id)
    return _user_cache[user_id]

# Clear the cache when a user updates
_user_cache.clear()

Connection pool exhausted: increase the pool size, make sure sessions are closed promptly, and verify pooling is actually enabled.
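
To confirm the pool is the bottleneck, SQLAlchemy's Pool.status() gives a quick snapshot. A diagnostic sketch, assuming the async engine configured in the pooling section above:

# Reports pool size, checked-in, overflow, and checked-out connections
print(engine.sync_engine.pool.status())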

Slow queries: add indexes that match your filters, use eager loading to eliminate N+1 patterns, and analyze plans with EXPLAIN (see the monitoring example above).

SQLite "database is locked" errors: enable WAL mode, limit concurrent writes, and prefer PostgreSQL in production. A minimal sketch of enabling WAL follows.
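
This sketch turns on WAL mode at connect time, assuming the aiosqlite engine from earlier; the event hook attaches to the engine's underlying sync engine, which is the standard SQLAlchemy pattern for SQLite pragmas:

from sqlalchemy import event

@event.listens_for(engine.sync_engine, "connect")
def enable_wal(dbapi_connection, connection_record):
    # Write-ahead logging lets readers proceed during a write
    cursor = dbapi_connection.cursor()
    cursor.execute("PRAGMA journal_mode=WAL")
    cursor.close()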

Migration conflicts: review the migration history, merge or reorder divergent heads manually, and always test migrations in staging first.

# BAD: String concatenation
query = f"SELECT * FROM users WHERE email = '{email}'"
# BAD: String formatting
query = "SELECT * FROM users WHERE email = '%s'" % email

# GOOD: Parameterized queries
stmt = select(User).where(User.email == email)

# GOOD: Parameterized raw SQL
from sqlalchemy import text

stmt = text("SELECT * FROM users WHERE email = :email")
result = await session.exec(stmt, params={"email": email})
Since SQLModel's Field() is a function rather than a class, it can't be subclassed; a SQLAlchemy TypeDecorator is the usual way to encrypt values at rest:

from cryptography.fernet import Fernet
from sqlalchemy import Column, String, TypeDecorator

fernet = Fernet(settings.ENCRYPTION_KEY)

class EncryptedString(TypeDecorator):
    """Column type that encrypts data at rest."""
    impl = String
    cache_ok = True

    def process_bind_param(self, value, dialect):
        # Encrypt on the way into the database
        return fernet.encrypt(value.encode()).decode() if value is not None else None

    def process_result_value(self, value, dialect):
        # Decrypt on the way out
        return fernet.decrypt(value.encode()).decode() if value is not None else None

class SecureModel(ZenithModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    ssn: str = Field(sa_column=Column(EncryptedString))  # Encrypted at rest

Need help? Check our FAQ or ask in GitHub Discussions.