Testing Guide
Learn to test database operations effectively Testing Guide →
This comprehensive guide covers Zenith’s database integration, from basic setup to advanced patterns.
Zenith provides seamless database integration with modern development patterns:
Key Features:
- Request-scoped sessions via get_db() functions
- Chainable queries: User.where(active=True).limit(10).all()
- CRUD methods: User.create(), User.find(), User.update(), User.destroy()
- User.find_or_404(id) raises HTTP 404 automatically

Architecture: Clean database operations with intuitive APIs
# What you'd normally write for database setupfrom sqlalchemy.ext.asyncio import create_async_engine, AsyncSessionfrom sqlalchemy.orm import sessionmaker, declarative_basefrom sqlalchemy import Column, Integer, Stringimport os
# Manual engine configurationDATABASE_URL = os.getenv("DATABASE_URL", "sqlite+aiosqlite:///./app.db")engine = create_async_engine(DATABASE_URL, echo=True)AsyncSessionLocal = sessionmaker( engine, class_=AsyncSession, expire_on_commit=False)Base = declarative_base()
# Manual model definitionclass User(Base): __tablename__ = "users" id = Column(Integer, primary_key=True) name = Column(String) email = Column(String)
# Manual session management everywhereasync def get_db(): async with AsyncSessionLocal() as session: yield session
# Manual CRUD operationsasync def get_all_users(): async with AsyncSessionLocal() as session: result = await session.execute(select(User)) return result.scalars().all()
# Multiple files and extensive configuration required# With Zenith - clean and streamlinedfrom zenith import Zenithfrom zenith.db import ZenithModelfrom sqlmodel import Field
# Zenith automatically configures:# - Database engine with connection pooling# - Session management# - Transaction handling# - Error handling# - Development vs production settingsapp = Zenith()
# Clean model definition with built-in methodsclass User(ZenithModel, table=True): id: int | None = Field(primary_key=True) name: str email: str = Field(unique=True)
# Built-in query methods - no session management needed!users = await User.all() # Get all usersuser = await User.find(123) # Find by IDuser = await User.find_or_404(123) # With auto-404users = await User.where(name="Alice").all() # Filter query
# ZenithModel automatically:# Manages request-scoped sessions# Handles transactions and rollbacks# Provides Rails-like query methods# Includes serialization (model_dump())# Validates data with Pydantic# Works with any database (PostgreSQL, MySQL, SQLite)
# Zero configuration, maximum productivity!from sqlalchemy.ext.asyncio import create_async_engine, AsyncSessionfrom sqlalchemy.orm import sessionmakerfrom sqlmodel import SQLModel
# Create engine with connection poolengine = create_async_engine( "postgresql+asyncpg://user:password@localhost/dbname", echo=True, # SQL logging pool_size=20, # Connection pool size max_overflow=40, # Maximum overflow connections pool_pre_ping=True, # Verify connections before use pool_recycle=3600, # Recycle connections after 1 hour)
# Create session factoryasync_session = sessionmaker( engine, class_=AsyncSession, expire_on_commit=False)
# Initialize databaseasync def init_db(): async with engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all)

Different databases require specific URL formats:
# PostgreSQL (recommended for production)DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/dbname"
# MySQL/MariaDBDATABASE_URL = "mysql+aiomysql://user:password@localhost:3306/dbname"
# SQLite (development only)DATABASE_URL = "sqlite+aiosqlite:///./app.db"
# With connection parametersDATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname?ssl=require&timeout=10"

Optimize connection usage with pooling:
from sqlalchemy.pool import NullPool, QueuePool, StaticPool
# Production settingsengine = create_async_engine( DATABASE_URL, poolclass=QueuePool, pool_size=20, # Number of persistent connections max_overflow=40, # Maximum overflow connections pool_timeout=30, # Timeout waiting for connection pool_recycle=1800, # Recycle connections after 30 minutes pool_pre_ping=True, # Test connections before use)
# Development settings (no pooling)engine = create_async_engine( DATABASE_URL, poolclass=NullPool, # No connection pooling echo=True # Log all SQL)
# In-memory SQLite (testing)engine = create_async_engine( "sqlite+aiosqlite:///:memory:", poolclass=StaticPool, # Single connection for SQLite connect_args={"check_same_thread": False})

ZenithModel extends SQLModel with convenient methods:
from zenith.db import ZenithModelfrom sqlmodel import Field, Relationshipfrom datetime import datetimefrom typing import Optional, List
class BaseModel(ZenithModel): """Base model with common fields."""
id: Optional[int] = Field(default=None, primary_key=True) created_at: datetime = Field(default_factory=datetime.utcnow) updated_at: datetime = Field(default_factory=datetime.utcnow)
class Config: # Pydantic configuration json_encoders = { datetime: lambda v: v.isoformat() }
class User(BaseModel, table=True): """User model with all field types.""" __tablename__ = "users" # Custom table name
# Required fields email: str = Field(unique=True, index=True, sa_column_kwargs={"nullable": False}) username: str = Field(index=True, min_length=3, max_length=50)
# Optional fields full_name: Optional[str] = Field(default=None, max_length=100) bio: Optional[str] = Field(default=None, sa_column_kwargs={"type_": Text})
# Boolean with default is_active: bool = Field(default=True) is_verified: bool = Field(default=False)
# Numeric fields age: Optional[int] = Field(default=None, ge=0, le=150) balance: float = Field(default=0.0, ge=0.0)
# JSON field metadata: dict = Field(default_factory=dict, sa_column_kwargs={"type_": JSON})
# Relationships posts: List["Post"] = Relationship(back_populates="author", cascade_delete=True) profile: Optional["Profile"] = Relationship(back_populates="user", sa_relationship_kwargs={"uselist": False})
# Computed property @property def display_name(self) -> str: return self.full_name or self.username
# Validation @field_validator("email") def validate_email(cls, v): if "@" not in v: raise ValueError("Invalid email") return v.lower()
# Indexes and constraints __table_args__ = ( Index("idx_user_email_active", "email", "is_active"), CheckConstraint("age >= 0", name="check_age_positive"), UniqueConstraint("email", "username", name="uq_email_username"), )

ZenithModel provides intuitive query methods:
# Createuser = await User.create( email="alice@example.com", username="alice", full_name="Alice Smith")
# Find by IDuser = await User.find(1) # Returns None if not founduser = await User.find_or_404(1) # Raises 404 if not found
# Find by fielduser = await User.find_by(email="alice@example.com")
# Get allall_users = await User.all()
# Filteringactive_users = await User.where(is_active=True).all()verified_users = await User.where(is_active=True, is_verified=True).all()
# Complex queries with operatorsfrom sqlmodel import or_, and_, not_
adults = await User.where(User.age >= 18).all()users_with_balance = await User.where(User.balance > 0).all()matching_users = await User.where( or_( User.email.contains("example.com"), User.username.startswith("admin") )).all()
# Orderingrecent_users = await User.order_by("-created_at").limit(10)alphabetical = await User.order_by("username").all()
# Paginationpage1 = await User.offset(0).limit(10)page2 = await User.offset(10).limit(10)
# Countingtotal_users = await User.count()active_count = await User.where(is_active=True).count()
# Updatesuser = await User.find(1)user.email = "newemail@example.com"await user.save()
# Or update directlyawait User.update(1, email="newemail@example.com", is_verified=True)
# Deleteawait user.delete()# Orawait User.delete(1)
# Bulk operationsawait User.bulk_create([ User(email="user1@example.com", username="user1"), User(email="user2@example.com", username="user2"),])
await User.bulk_update( [{"id": 1, "is_active": False}, {"id": 2, "is_active": False}])
await User.where(is_active=False).delete_all()

Build complex queries fluently with seamless method chaining:
# Seamless single-line chaining - no intermediate awaits needed!user = await User.where(email="alice@example.com").first()users = await User.where(active=True).order_by("-created_at").limit(10).all()count = await User.where(active=True).count()
# Chain multiple conditionsresults = await (User .where(is_active=True) .where(User.age >= 18) .order_by("-created_at") .limit(20) .offset(40) .all())
# With relationships (eager loading)users_with_posts = await (User .where(is_active=True) .includes("posts") # Eager load posts .all())# Select specific columnsemails = await (User .select(User.email, User.username) .where(is_verified=True) .all())
# Aggregate functionsfrom sqlmodel import func
user_stats = await (User .select( func.count(User.id).label("total"), func.avg(User.age).label("avg_age"), func.max(User.created_at).label("latest_signup") ) .where(is_active=True) .one())class Author(ZenithModel, table=True): id: Optional[int] = Field(primary_key=True) name: str
# One author has many books books: List["Book"] = Relationship(back_populates="author")
class Book(ZenithModel, table=True): id: Optional[int] = Field(primary_key=True) title: str author_id: int = Field(foreign_key="author.id")
# Many books belong to one author author: Author = Relationship(back_populates="books")
# Usageauthor = await Author.create(name="Stephen King")book = await Book.create(title="The Shining", author_id=author.id)
# Eager loadingauthor_with_books = await Author.find(1).includes("books")for book in author_with_books.books: print(book.title)
# Reverse querybooks_by_author = await Book.where(author_id=author.id).all()class User(ZenithModel, table=True): id: Optional[int] = Field(primary_key=True) email: str
# One user has one profile profile: Optional["Profile"] = Relationship( back_populates="user", sa_relationship_kwargs={"uselist": False} )
class Profile(ZenithModel, table=True): id: Optional[int] = Field(primary_key=True) user_id: int = Field(foreign_key="user.id", unique=True) bio: str
# One profile belongs to one user user: User = Relationship(back_populates="profile")
# Usageuser = await User.create(email="alice@example.com")profile = await Profile.create(user_id=user.id, bio="Developer")
# Access relationshipuser_with_profile = await User.find(user.id).includes("profile")print(user_with_profile.profile.bio)# Association tableclass PostTag(SQLModel, table=True): post_id: int = Field(foreign_key="post.id", primary_key=True) tag_id: int = Field(foreign_key="tag.id", primary_key=True)
# Optional: additional fields on relationship added_at: datetime = Field(default_factory=datetime.utcnow)
class Post(ZenithModel, table=True): id: Optional[int] = Field(primary_key=True) title: str
# Many posts have many tags tags: List["Tag"] = Relationship( back_populates="posts", link_model=PostTag )
class Tag(ZenithModel, table=True): id: Optional[int] = Field(primary_key=True) name: str = Field(unique=True)
# Many tags belong to many posts posts: List[Post] = Relationship( back_populates="tags", link_model=PostTag )
# Usagepost = await Post.create(title="Python Tips")tag1 = await Tag.create(name="python")tag2 = await Tag.create(name="programming")
# Add tags to postawait PostTag.create(post_id=post.id, tag_id=tag1.id)await PostTag.create(post_id=post.id, tag_id=tag2.id)
# Query with tagspost_with_tags = await Post.find(post.id).includes("tags")for tag in post_with_tags.tags: print(tag.name)
# Find posts by tagpython_posts = await Post.where( Post.tags.any(Tag.name == "python")).all()class Employee(ZenithModel, table=True): id: Optional[int] = Field(primary_key=True) name: str manager_id: Optional[int] = Field(foreign_key="employee.id")
# Self-referential relationships manager: Optional["Employee"] = Relationship( back_populates="subordinates", sa_relationship_kwargs={ "remote_side": "Employee.id" } ) subordinates: List["Employee"] = Relationship( back_populates="manager" )
# Usageceo = await Employee.create(name="Alice CEO")manager = await Employee.create(name="Bob Manager", manager_id=ceo.id)employee = await Employee.create(name="Charlie Employee", manager_id=manager.id)
# Get hierarchymanager_with_team = await Employee.find(manager.id).includes("subordinates")# BAD: N+1 queriesusers = await User.all() # 1 queryfor user in users: posts = await Post.where(author_id=user.id).all() # N queries print(f"{user.name}: {len(posts)} posts")
# GOOD: Eager loadingusers = await User.includes("posts").all() # 2 queries totalfor user in users: print(f"{user.name}: {len(user.posts)} posts") # No additional queries
# BETTER: Join loading for single queryfrom sqlalchemy.orm import selectinload, joinedload
stmt = select(User).options( joinedload(User.posts) # Single query with JOIN)users = await session.exec(stmt).all()# 1. Use select_related for one-to-one and foreign keysposts = await Post.includes("author").all() # Joins in single query
# 2. Use prefetch_related for many-to-manyposts = await Post.includes("tags").all() # Separate query for tags
# 3. Only select needed columnsemails = await User.select(User.email).where(is_active=True).all()
# 4. Use exists() for existence checksuser_exists = await User.where(email="test@example.com").exists()
# 5. Batch operationsawait User.bulk_create(users) # Single INSERT for multiple rows
# 6. Use raw SQL for complex queriesfrom sqlalchemy import text
result = await session.exec( text(""" SELECT u.id, u.email, COUNT(p.id) as post_count FROM users u LEFT JOIN posts p ON u.id = p.author_id WHERE u.is_active = true GROUP BY u.id, u.email HAVING COUNT(p.id) > 10 """)).all()class OptimizedModel(ZenithModel, table=True): # Single column index email: str = Field(index=True)
# Unique index username: str = Field(unique=True)
# Composite indexes __table_args__ = ( # Multi-column index Index("idx_user_active_verified", "is_active", "is_verified"),
# Unique constraint on multiple columns UniqueConstraint("email", "tenant_id", name="uq_email_per_tenant"),
# Partial index (PostgreSQL) Index( "idx_active_users", "email", postgresql_where="is_active = true" ),
# Full-text search index (PostgreSQL) Index( "idx_fulltext", "title", postgresql_using="gin", postgresql_ops={"title": "gin_trgm_ops"} ), )import timefrom contextlib import asynccontextmanager
@asynccontextmanagerasync def query_timer(description: str): """Monitor query execution time.""" start = time.perf_counter() try: yield finally: elapsed = time.perf_counter() - start if elapsed > 1.0: # Log slow queries logger.warning(f"Slow query ({elapsed:.2f}s): {description}")
# Usageasync with query_timer("Load users with posts"): users = await User.includes("posts").all()
# Enable query loggingengine = create_async_engine( DATABASE_URL, echo=True, # Log all queries echo_pool="debug" # Log connection pool events)
# Query explain plan (PostgreSQL)from sqlalchemy import text
explain = await session.exec( text("EXPLAIN ANALYZE SELECT * FROM users WHERE is_active = true")).all()

# Initialize Alembic
alembic init alembic
# Create first migrationalembic revision --autogenerate -m "Initial migration"
# Apply migrationsalembic upgrade head
# Rollback
alembic downgrade -1

Edit alembic/env.py:
import asynciofrom sqlalchemy import poolfrom sqlalchemy.ext.asyncio import async_engine_from_configfrom alembic import contextfrom app.models import * # Import all models
def run_migrations_online(): """Run migrations in online mode with async support."""
configuration = config.get_section(config.config_ini_section) configuration["sqlalchemy.url"] = os.getenv("DATABASE_URL")
connectable = async_engine_from_config( configuration, prefix="sqlalchemy.", poolclass=pool.NullPool, future=True )
async def run_async_migrations(): async with connectable.connect() as connection: await connection.run_sync(do_run_migrations)
asyncio.run(run_async_migrations())
def do_run_migrations(connection): context.configure( connection=connection, target_metadata=target_metadata, compare_type=True, # Detect column type changes compare_server_default=True, # Detect default changes )
with context.begin_transaction(): context.run_migrations()"""Add full text search
Revision ID: abc123Revises: def456Create Date: 2024-01-01 00:00:00
"""from alembic import opimport sqlalchemy as sa
def upgrade(): # Create extension op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")
# Add GIN index for full-text search op.create_index( "idx_posts_search", "posts", [sa.text("to_tsvector('english', title || ' ' || content)")], postgresql_using="gin" )
# Add computed column op.add_column( "users", sa.Column( "full_name", sa.String(), sa.Computed("first_name || ' ' || last_name", persisted=True) ) )
def downgrade(): op.drop_index("idx_posts_search") op.drop_column("users", "full_name")from typing import Optionalfrom datetime import datetime
class SoftDeleteMixin(ZenithModel): """Mixin for soft delete functionality."""
is_deleted: bool = Field(default=False, index=True) deleted_at: Optional[datetime] = Field(default=None) deleted_by: Optional[int] = Field(default=None)
async def soft_delete(self, user_id: Optional[int] = None): """Mark record as deleted without removing from database.""" self.is_deleted = True self.deleted_at = datetime.utcnow() self.deleted_by = user_id await self.save()
async def restore(self): """Restore soft-deleted record.""" self.is_deleted = False self.deleted_at = None self.deleted_by = None await self.save()
@classmethod def active(cls): """Filter to only non-deleted records.""" return cls.where(is_deleted=False)
@classmethod def deleted(cls): """Filter to only deleted records.""" return cls.where(is_deleted=True)
class Document(SoftDeleteMixin, table=True): id: Optional[int] = Field(primary_key=True) title: str
# Usagedoc = await Document.find(1)await doc.soft_delete(user_id=current_user.id)
# Query only active documentsactive_docs = await Document.active().all()
# Include deleted documentsall_docs = await Document.all()
# Restoreawait doc.restore()class AuditMixin(ZenithModel): """Track all changes to records."""
created_by: Optional[int] = Field(foreign_key="user.id") created_at: datetime = Field(default_factory=datetime.utcnow) updated_by: Optional[int] = Field(foreign_key="user.id") updated_at: datetime = Field(default_factory=datetime.utcnow) version: int = Field(default=1)
@classmethod async def create_with_audit(cls, user_id: int, **data): """Create with audit information.""" instance = cls( **data, created_by=user_id, updated_by=user_id ) return await instance.save()
async def update_with_audit(self, user_id: int, **data): """Update with audit information.""" for key, value in data.items(): setattr(self, key, value)
self.updated_by = user_id self.updated_at = datetime.utcnow() self.version += 1
return await self.save()
# Separate audit log tableclass AuditLog(ZenithModel, table=True): id: Optional[int] = Field(primary_key=True) table_name: str record_id: int action: str # CREATE, UPDATE, DELETE changes: dict = Field(sa_column_kwargs={"type_": JSON}) user_id: int timestamp: datetime = Field(default_factory=datetime.utcnow)
@classmethod async def log_change(cls, model, action: str, user_id: int, changes: dict = None): """Log a change to audit trail.""" await cls.create( table_name=model.__tablename__, record_id=model.id, action=action, changes=changes or {}, user_id=user_id )class TenantMixin(ZenithModel): """Multi-tenant data isolation."""
tenant_id: int = Field(index=True)
@classmethod def for_tenant(cls, tenant_id: int): """Filter by tenant.""" return cls.where(tenant_id=tenant_id)
@classmethod async def create_for_tenant(cls, tenant_id: int, **data): """Create record for specific tenant.""" return await cls.create(tenant_id=tenant_id, **data)
class TenantAwareModel(TenantMixin, table=True): id: Optional[int] = Field(primary_key=True) name: str
__table_args__ = ( # Ensure uniqueness within tenant UniqueConstraint("tenant_id", "name", name="uq_name_per_tenant"), )
# Usage with dependency injectionasync def get_current_tenant(request: Request) -> int: """Extract tenant from request.""" # Could be from subdomain, header, or JWT return request.headers.get("X-Tenant-ID")
@app.get("/items")async def list_items(tenant_id: int = Depends(get_current_tenant)): return await TenantAwareModel.for_tenant(tenant_id).all()import hashlibimport picklefrom functools import wraps
class CachedModel(ZenithModel): """Model with built-in caching."""
@classmethod def cache_key(cls, **filters): """Generate cache key from filters.""" key_str = f"{cls.__name__}:{filters}" return hashlib.md5(key_str.encode()).hexdigest()
@classmethod async def find_cached(cls, id: int, ttl: int = 300): """Find with caching.""" cache_key = f"{cls.__name__}:{id}"
# Try cache first cached = await redis.get(cache_key) if cached: return pickle.loads(cached)
# Load from database instance = await cls.find(id) if instance: await redis.setex( cache_key, ttl, pickle.dumps(instance) )
return instance
async def invalidate_cache(self): """Remove from cache after update.""" cache_key = f"{self.__class__.__name__}:{self.id}" await redis.delete(cache_key)
async def save(self): """Save and invalidate cache.""" result = await super().save() await self.invalidate_cache() return result# JSON operationsfrom sqlalchemy.dialects.postgresql import JSONB
class Configuration(ZenithModel, table=True): settings: dict = Field(sa_column_kwargs={"type_": JSONB})
# Query JSON fieldsconfigs = await Configuration.where( Configuration.settings["theme"] == "dark").all()
# Array fieldsfrom sqlalchemy.dialects.postgresql import ARRAY
class Article(ZenithModel, table=True): tags: List[str] = Field(sa_column_kwargs={"type_": ARRAY(String)})
# Query arraysarticles = await Article.where( Article.tags.contains(["python", "async"])).all()
# Full-text searchfrom sqlalchemy import func
results = await Post.where( func.to_tsvector("english", Post.content).match("python async")).all()
# Window functionsfrom sqlalchemy import select, func
stmt = select( User.id, User.name, func.row_number().over( partition_by=User.department_id, order_by=User.salary.desc() ).label("rank"))# SQLite doesn't support some featuresif settings.DATABASE_URL.startswith("sqlite"): # No concurrent writes engine = create_async_engine( DATABASE_URL, connect_args={"check_same_thread": False}, poolclass=StaticPool # Single connection )
# Limited ALTER TABLE # Can't drop columns, modify constraints
# No arrays or JSON queries # Store as TEXT and parse in Python# Reuse sessions with dependency injectionasync def get_db(): async with async_session() as session: yield session
@app.get("/users")async def list_users(db: AsyncSession = Depends(get_db)): return await db.exec(select(User)).all()
# Use context managersasync with async_session() as session: async with session.begin(): # All operations in transaction await perform_operations(session)# Instead of many individual insertsfor data in user_data_list: await User.create(**data) # BAD: N queries
# Use bulk operationsawait User.bulk_create([ User(**data) for data in user_data_list]) # GOOD: 1 query# Prevent unexpected queriesclass Post(ZenithModel, table=True): # Lazy load by default author: User = Relationship(back_populates="posts")
# Always eager load tags: List[Tag] = Relationship( back_populates="posts", sa_relationship_kwargs={"lazy": "joined"} )from functools import lru_cache
@lru_cache(maxsize=100)async def get_user_cached(user_id: int): """Cache frequently accessed users.""" return await User.find(user_id)
# Clear cache when user updatesget_user_cached.cache_clear()

Solution: Increase pool size, ensure sessions are closed properly, use connection pooling
Solution: Add indexes, use eager loading, optimize N+1 queries, analyze with EXPLAIN
Solution: Use WAL mode, limit concurrent writes, consider PostgreSQL for production
Solution: Review migration history, resolve conflicts manually, test migrations in staging
# BAD: String concatenationquery = f"SELECT * FROM users WHERE email = '{email}'"
# BAD: String formattingquery = "SELECT * FROM users WHERE email = '%s'" % email
# GOOD: Parameterized queriesstmt = select(User).where(User.email == email)
# GOOD: With raw SQLfrom sqlalchemy import textstmt = text("SELECT * FROM users WHERE email = :email")result = await session.exec(stmt, {"email": email})from cryptography.fernet import Fernet
class EncryptedField(Field): """Field that encrypts data at rest."""
def __init__(self, *args, **kwargs): self.fernet = Fernet(settings.ENCRYPTION_KEY) super().__init__(*args, **kwargs)
def encrypt(self, value: str) -> bytes: return self.fernet.encrypt(value.encode())
def decrypt(self, value: bytes) -> str: return self.fernet.decrypt(value).decode()
class SecureModel(ZenithModel, table=True): ssn: str = EncryptedField() # Encrypted at rest

Testing Guide
Learn to test database operations effectively Testing Guide →
Need help? Check our FAQ or ask in GitHub Discussions.