Skip to content

Database Models

ZenithModel provides a streamlined database interface with automatic session management and intuitive query patterns.

ZenithModel simplifies database operations with:

  • Automatic session management - No more async with session() blocks
  • Chainable queries - Intuitive User.where().order_by().limit() patterns
  • Built-in CRUD methods - create(), find(), update(), delete()
  • Type safety - Full IDE support and type checking
  • Automatic configuration - Just inherit and use
| Use ZenithModel When | Use Plain SQLModel When |
| --- | --- |
| Building typical web APIs | Need fine-grained transaction control |
| Want automatic session management | Working with multiple databases |
| Need rapid development | Implementing complex stored procedures |
| Standard CRUD operations | Raw SQL performance optimization |
| Request-scoped sessions work | Need connection pooling customization |
# ZenithModel provides intuitive, chainable queries
async def get_active_users():
    """Return up to ten active users, newest first.

    The chain reads like English: "users where active is true, ordered by
    created_at descending, limited to 10".
    """
    # Build the query first; nothing is awaited until .all() is called.
    newest_active = User.where(active=True).order_by('-created_at').limit(10)
    return await newest_active.all()

# Features:
# • Automatic session management
# • Type-safe query building
# • No boilerplate code
# • Clean, readable syntax
from zenith.db import ZenithModel # Our enhanced model base class
from sqlmodel import Field # Field definitions from SQLModel
from datetime import datetime
class User(ZenithModel, table=True):
    """User model with automatic session management.

    Just by inheriting from ZenithModel, you get:
    - Automatic session management (no more async with blocks!)
    - Built-in CRUD methods (create, find, update, delete)
    - Chainable queries (where, order_by, limit)
    - Type safety and IDE autocomplete
    - Automatic JSON serialization
    """

    # Primary key - generated by the database. ``default=None`` lets a new
    # instance be built before the database has assigned its ID; without it
    # the nullable annotation alone still leaves the field required.
    id: int | None = Field(default=None, primary_key=True)

    # Unique email with database index for fast lookups
    email: str = Field(
        unique=True,  # Database constraint: no duplicates
        index=True,   # Create index for fast WHERE email = ? queries
    )

    # Required field with length constraint
    name: str = Field(
        max_length=100,  # Prevents overly long names
    )

    # Boolean with default value
    active: bool = Field(
        default=True,  # New users are active by default
    )

    # Timestamp automatically set on creation
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12 and
    # returns a naive datetime; switching to an aware value depends on the
    # column type, so confirm before changing.
    created_at: datetime = Field(
        default_factory=datetime.utcnow,  # Function called on insert
    )

    # That's it! No __init__, no session management, no boilerplate
# 📝 CREATE - Insert new records
# Only the fields without defaults need to be passed.
user = await User.create(
email="alice@example.com",
name="Alice Smith"
# Note: 'active' defaults to True
# Note: 'created_at' is set automatically
)
# Returns: User instance with generated ID
# Behind the scenes: INSERT INTO user (email, name, active, created_at) VALUES (...)
# 🔍 FIND - Retrieve single records
# The two calls below are alternatives; each one rebinds `user`.
user = await User.find(1) # Returns None if not found
user = await User.find_or_404(1) # Raises HTTPException(404) if not found
# When to use which:
# - find(): When absence is normal (e.g., checking if username taken)
# - find_or_404(): When absence is an error (e.g., viewing user profile)
# 📄 QUERY - Retrieve multiple records
# Simple query
active_users = await User.where(active=True).all()
# Chained query (reads like English!)
# The outer parentheses only allow the chain to span several lines;
# a single await covers the final .all() call.
recent_users = await (
User.where(active=True) # Filter: only active users
.order_by('-created_at') # Sort: newest first (- means DESC)
.limit(5) # Limit: only 5 results
.all() # Execute: get all matching records
)
# Note: order_by() validates column names to prevent SQL injection
# Using invalid column names will raise ValueError with available columns
# ✏️ UPDATE - Modify existing records
# update() is awaited on the `user` instance obtained above.
await user.update(
name="Alice Johnson",
active=False # Can update multiple fields at once
)
# Behind the scenes: UPDATE user SET name = ?, active = ? WHERE id = ?
# 🗑️ DELETE - Remove records
await user.delete()
# Behind the scenes: DELETE FROM user WHERE id = ?
# Note: This is permanent! Consider soft deletes for important data
# 📊 AGGREGATE - Count and statistics
user_count = await User.where(active=True).count() # Count rows matching the filter
total_users = await User.count() # Count all
# 🎢 CHAINING - Build complex queries step by step
query = User.where(active=True)
if search_term:
query = query.where(User.name.contains(search_term))
if min_age:
query = query.where(User.age >= min_age)
users = await query.order_by('name').all()
from zenith import Zenith
from pydantic import BaseModel
app = Zenith()
# Input validation model
class UserCreate(BaseModel):
    """Fields a client must supply to create a user."""

    email: str
    name: str
# GET /users - List all active users
@app.get("/users")
async def list_users():
    """List every active user.

    Notice what's NOT here:
    - No session: AsyncSession parameter
    - No async with get_session() as session
    - No dependency injection for database
    - No transaction management

    ZenithModel handles ALL of this automatically!
    """
    # Just write your query - it reads like English
    active_users = await User.where(active=True).all()

    # Convert each model to a dict for JSON serialization.
    # Use explicit response models to exclude sensitive fields.
    serialized = [member.model_dump() for member in active_users]
    return {"users": serialized}
# GET /users/{id} - Get specific user
@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Return a single user looked up by ID.

    find_or_404 pattern:
    - If found: returns the user
    - If not found: raises HTTPException(404)
    - No manual error handling needed!
    """
    found = await User.find_or_404(user_id)

    # Best practice: Use explicit response models to exclude sensitive fields
    payload = found.model_dump()
    return payload
# POST /users - Create new user
@app.post("/users")
async def create_user(data: UserCreate):
    """Create a user from validated input.

    Creating records is simple:
    1. Validate input with Pydantic (automatic)
    2. Create record with ZenithModel
    3. Return created record
    """
    # Business rule: an email address may only be registered once.
    duplicate = await User.where(email=data.email).first()
    if duplicate:
        # NOTE(review): HTTPException is not imported anywhere in this
        # snippet — add the import from the framework before running.
        raise HTTPException(409, "Email already registered")

    # Unpack all fields from the Pydantic model into the new record.
    fields = data.model_dump()
    created = await User.create(**fields)

    # Return 201 Created with the new user
    return {"user": created.model_dump()}, 201
# Partial-update payload. Every field is optional so a client sends only the
# values it wants to change; unset fields are dropped by exclude_unset below.
class UserUpdate(BaseModel):
    """Fields a client may change on an existing user."""

    email: str | None = None
    name: str | None = None
    active: bool | None = None


# PUT /users/{id} - Update user
@app.put("/users/{user_id}")
async def update_user(user_id: int, data: UserUpdate):
    """Apply a partial update to a user and return the updated record.

    Raises HTTPException(404) via find_or_404 when the user does not exist.
    """
    # Find the user
    user = await User.find_or_404(user_id)

    # Update only provided fields: exclude_unset keeps fields the client
    # never sent out of the UPDATE, so they are not overwritten with None.
    changes = data.model_dump(exclude_unset=True)
    await user.update(**changes)
    return user.model_dump()
# DELETE /users/{id} - Delete user
@app.delete("/users/{user_id}")
async def delete_user(user_id: int):
    """Hard-delete a user; find_or_404 raises a 404 when the ID is unknown."""
    doomed = await User.find_or_404(user_id)

    # Soft delete (recommended) vs hard delete
    # await user.update(deleted_at=datetime.utcnow()) # Soft delete
    await doomed.delete()  # Hard delete

    # NOTE(review): HTTP 204 means "No Content", yet a message body is
    # returned alongside it — confirm whether 200 or an empty body is intended.
    confirmation = {"message": "User deleted"}
    return confirmation, 204
@app.get("/users")
async def list_users(
    page: int = 1,       # Current page (1-indexed for better UX)
    per_page: int = 20,  # Items per page (default 20, capped at 100)
):
    """Return one page of active users plus pagination metadata.

    Pagination prevents memory issues and improves response times.

    URL examples:
    - /users -> Page 1, 20 items
    - /users?page=2 -> Page 2, 20 items
    - /users?page=3&per_page=50 -> Page 3, 50 items

    Out-of-range inputs are clamped rather than rejected: ``page`` is raised
    to at least 1 and ``per_page`` is kept within 1..100.
    """
    # Normalize the inputs BEFORE deriving the offset. Capping per_page after
    # the offset was computed made oversized requests skip rows (for example
    # page=2&per_page=200 jumped to row 200 but returned only 100 rows).
    # per_page <= 0 would also divide by zero in the page-count maths below.
    page = max(page, 1)
    per_page = max(1, min(per_page, 100))

    # Page 1 -> offset 0, Page 2 -> offset per_page, etc.
    offset = (page - 1) * per_page

    # Build the paginated query
    users = await (
        User.where(active=True)       # Filter condition
        .order_by('-created_at')      # Sort order (consistent pagination!)
        .offset(offset)               # Skip previous pages
        .limit(per_page)              # Take only current page
        .all()                        # Execute query
    )

    # Get total count for pagination metadata.
    # This is a separate query but necessary for UI.
    total = await User.where(active=True).count()

    # Calculate total pages (ceiling division)
    total_pages = (total + per_page - 1) // per_page

    # Return paginated response with metadata
    return {
        "users": [u.model_dump() for u in users],
        "pagination": {
            "page": page,
            "per_page": per_page,
            "total": total,
            "pages": total_pages,
            "has_next": page < total_pages,
            "has_prev": page > 1,
        },
    }
# Alternative: Cursor-based pagination (better for real-time data)
@app.get("/users/feed")
async def user_feed(
    cursor: int | None = None,  # Last seen user ID; None starts at the newest
    limit: int = 20,            # Page size, kept within 1..100
):
    """Cursor pagination - better for feeds that update frequently.

    Returns active users in descending ID order together with
    ``next_cursor``, the ID of the last user returned (None when the page
    is empty). Pass that value back as ``cursor`` to fetch the next page.
    """
    # Bound the page size so one request cannot pull the whole table.
    limit = max(1, min(limit, 100))

    query = User.where(active=True)

    # Continue from cursor if provided. Compare against None so that a
    # cursor of 0 is honoured instead of silently restarting the feed.
    if cursor is not None:
        query = query.where(User.id < cursor)  # IDs before cursor

    users = await query.order_by('-id').limit(limit).all()

    # Next cursor is the last item's ID
    next_cursor = users[-1].id if users else None

    return {
        "users": [u.model_dump() for u in users],
        "next_cursor": next_cursor,
    }
@app.get("/users/search")
async def search_users(
    q: str | None = None,
    active: bool | None = None,
    role: str | None = None,
):
    """Search users with optional, combinable filters.

    - ``q``: substring matched against name OR email
    - ``active``: exact match; False is a real filter, so only None skips it
    - ``role``: exact match

    Results are ordered newest first.
    """
    query = User.query()

    if q:
        query = query.where(User.name.contains(q) | User.email.contains(q))

    # `is not None` rather than truthiness: active=False must still filter.
    if active is not None:
        query = query.where(active=active)

    if role:
        # NOTE(review): assumes the User model declares a `role` column — confirm.
        query = query.where(role=role)

    users = await query.order_by('-created_at').all()
    return {"users": [u.model_dump() for u in users]}
from sqlmodel import Relationship
class Post(ZenithModel, table=True):
    """Blog post with author relationship."""

    # ``default=None`` lets a Post be built before the database assigns its ID.
    id: int | None = Field(default=None, primary_key=True)
    title: str
    content: str

    # Publication flag; needed by the ``where(published=True)`` queries that
    # the eager-loading examples run against this model.
    published: bool = Field(default=False)

    # Foreign key to User table
    author_id: int = Field(
        foreign_key="user.id",  # References user.id column
    )

    # Relationship property for ORM navigation
    author: User = Relationship()

    # Reverse relationship (optional)
    # NOTE(review): expects a `Comment` model with a `post` relationship;
    # it is not defined in this snippet — confirm it exists and is imported.
    comments: list["Comment"] = Relationship(back_populates="post")
# BAD: N+1 Query Problem
async def bad_example():
    """Anti-pattern: fetch posts, then look up each author one at a time."""
    published_posts = await Post.where(published=True).all()  # 1 query

    for post in published_posts:
        # Each iteration triggers a new query!
        author = await User.find(post.author_id)  # N queries
        print(f"{post.title} by {author.name}")

    # Total: 1 + N queries (bad for performance!)
# GOOD: Eager Loading with includes()
async def good_example():
    """Load posts together with their authors, then print each title/author."""
    # Load posts WITH their authors in a single query
    # SQL: SELECT * FROM post LEFT JOIN user ON post.author_id = user.id
    eager_query = Post.includes('author').where(published=True)  # JOIN with User table
    posts = await eager_query.all()

    # Now accessing author doesn't trigger queries
    for post in posts:
        print(f"{post.title} by {post.author.name}")  # No extra queries!

    # Total: 1 query (much better!)
# Multiple relationships
posts = await (
Post.includes('author', 'comments') # Load both relationships
.where(published=True)
.all()
)
# Nested relationships
posts = await (
Post.includes('comments.author') # Load comments and their authors
.where(published=True)
.all()
)
# Conditional relationship loading
if include_author:
query = Post.includes('author')
else:
query = Post.query()
posts = await query.where(published=True).all()
# Bulk create
users = await User.bulk_create([
{"email": "user1@example.com", "name": "User 1"},
{"email": "user2@example.com", "name": "User 2"},
{"email": "user3@example.com", "name": "User 3"},
])
# Bulk update
await User.where(role="trial").update_all(role="free")
# Bulk delete
await User.where(active=False, created_at__lt=thirty_days_ago).delete_all()
from pydantic import EmailStr, field_validator, validator
class User(ZenithModel, table=True):
    """User model with field constraints and a custom email-domain rule."""

    email: EmailStr = Field(unique=True)
    age: int = Field(ge=13, le=120)  # Inclusive bounds: 13 <= age <= 120

    # Pydantic v2 style: @field_validator with an explicit @classmethod
    # replaces the deprecated v1 @validator decorator.
    @field_validator('email')
    @classmethod
    def validate_email_domain(cls, v):
        """Accept only emails from allowed domains and return them lowercased.

        Raises:
            ValueError: if the domain is not in the allowed list.
        """
        allowed = ['company.com', 'partner.org']
        # Normalize case before comparing so "User@Company.com" is accepted.
        normalized = v.lower()
        # rsplit takes the text after the LAST '@' as the domain.
        domain = normalized.rsplit('@', 1)[1]
        if domain not in allowed:
            raise ValueError(f'Email domain must be one of: {allowed}')
        return normalized
class User(ZenithModel, table=True):
    """User model that keeps the password hash out of serialized output."""

    email: str
    password_hash: str = Field(exclude=True)  # Never serialize

    def to_public_dict(self) -> dict:
        """Control what gets exposed: only id, email and ISO-formatted created_at."""
        # NOTE(review): `id` and `created_at` are read here but not declared
        # in this snippet — presumably defined on the full model; confirm.
        public_view = {}
        public_view['id'] = self.id
        public_view['email'] = self.email
        public_view['created_at'] = self.created_at.isoformat()
        return public_view
class SoftDeleteModel(ZenithModel):
deleted_at: datetime | None = None
@classmethod
def active(cls):
"""Query only non-deleted records."""
return cls.where(deleted_at=None)
async def soft_delete(self):
"""Mark as deleted instead of removing."""
await self.update(deleted_at=datetime.utcnow())
class User(SoftDeleteModel, table=True):
email: str
name: str
# Use soft delete pattern
active_users = await User.active().all()
await user.soft_delete() # Doesn't actually delete
  • Use ZenithModel for standard web applications
  • Leverage built-in methods for common operations
  • Add custom validation with Pydantic validators
  • Create base models for shared fields
  • Use type hints for better IDE support
  • Don’t put business logic in models
  • Don’t expose sensitive fields in serialization
  • Don’t use for complex multi-database transactions
  • Don’t ignore validation errors
  • Don’t bypass the ORM for simple queries

“No session available”

  • Ensure you’re using ZenithModel, not SQLModel
  • Check that the Zenith app middleware is configured
  • Verify you’re in an async request context

“Relationship not loaded”

  • Use .includes('relationship') for eager loading
  • Check foreign key definitions
  • Ensure related models are imported

“Validation error on create”

  • Check field constraints match your data
  • Verify required fields are provided
  • Look for custom validator errors
  1. Use includes() to eager-load relationships

    posts = await Post.includes('author', 'comments').all()
  2. Paginate large datasets

    users = await User.paginate(page=1, per_page=20)
  3. Index frequently queried fields

    email: str = Field(unique=True, index=True)
  4. Use bulk operations

    await User.bulk_create(user_list)
import pytest
from zenith.testing import TestClient
@pytest.mark.asyncio
async def test_user_creation():
    """POST /users with valid data returns 201 and the created user's email."""
    payload = {
        "email": "test@company.com",
        "name": "Test User",
    }
    async with TestClient(app) as client:
        response = await client.post("/users", json=payload)
        assert response.status_code == 201

        created = response.json()["user"]
        assert created["email"] == "test@company.com"
@pytest.mark.asyncio
async def test_user_validation():
    """Constructing a User with a disallowed email domain raises ValueError."""
    expect_rejection = pytest.raises(ValueError, match="Email domain")
    with expect_rejection:
        User(email="test@invalid.com", name="Test")
# Before: SQLAlchemy (session.query API, called without await)
session.query(User).filter(User.active == True).limit(10).all()
# After: ZenithModel (no explicit session; the chain is awaited)
await User.where(active=True).limit(10).all()
# Traditional ORM pattern (Django-style manager syntax: objects.filter, slicing for LIMIT)
User.objects.filter(active=True).order_by('-created_at')[:10]
# ZenithModel - Same intuitive syntax, async-first
await User.where(active=True).order_by('-created_at').limit(10).all()
# The syntax is familiar to developers from various backgrounds
# while providing modern async capabilities

Learn more in the complete documentation or explore example applications.