Skip to content

Part 2: Data Models

In Part 1, we stored tasks in memory - not very useful for a real API! Now we’ll add a proper database and explore how ZenithModel streamlines traditional ORM setup.

By the end of this part, you’ll understand:

  • Setting up a database with Zenith
  • Creating models with ZenithModel
  • Defining relationships between models
  • Running database migrations
  • Validating data with Pydantic
  • Querying data efficiently
# Traditional SQLAlchemy approach
from sqlalchemy import create_engine, Column, Integer, String, Boolean, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship, Session
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from datetime import datetime
import os
# Manual database setup
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./taskflow.db")
engine = create_async_engine(DATABASE_URL)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
Base = declarative_base()
# Manual model definitions with verbose syntax
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String, unique=True, index=True)
hashed_password = Column(String)
full_name = Column(String)
is_active = Column(Boolean, default=True)
created_at = Column(DateTime, default=datetime.utcnow)
# Manual relationship definition
projects = relationship("Project", back_populates="owner")
class Project(Base):
__tablename__ = "projects"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True)
description = Column(String)
owner_id = Column(Integer, ForeignKey("users.id"))
created_at = Column(DateTime, default=datetime.utcnow)
# Manual relationships
owner = relationship("User", back_populates="projects")
tasks = relationship("Task", back_populates="project")
# Manual dependency injection for database sessions
async def get_db():
async with AsyncSessionLocal() as session:
yield session
# Manual CRUD operations
async def create_user(db: AsyncSession, email: str, password: str):
# Hash password manually
hashed_password = hash_password(password)
db_user = User(email=email, hashed_password=hashed_password)
db.add(db_user)
await db.commit()
await db.refresh(db_user)
return db_user
# This is just the beginning - you need hundreds more lines!
# With ZenithModel - same functionality, cleaner and more concise
from zenith import Zenith
from zenith.db import ZenithModel
from sqlmodel import Field, Relationship
from datetime import datetime
from typing import Optional, List
# Zero-config database setup
app = Zenith() # Database automatically configured!
# Clean model definitions with built-in methods
class User(ZenithModel, table=True):
id: Optional[int] = Field(primary_key=True)
email: str = Field(unique=True, index=True)
full_name: str
is_active: bool = Field(default=True)
created_at: datetime = Field(default_factory=datetime.now)
# Clean relationship syntax
projects: List["Project"] = Relationship(back_populates="owner")
class Project(ZenithModel, table=True):
id: Optional[int] = Field(primary_key=True)
name: str = Field(index=True)
description: Optional[str] = None
owner_id: int = Field(foreign_key="user.id")
created_at: datetime = Field(default_factory=datetime.now)
# Relationships are simple and type-safe
owner: Optional[User] = Relationship(back_populates="projects")
tasks: List["Task"] = Relationship(back_populates="project")
# Built-in CRUD operations - no manual session management!
@app.post("/users")
async def create_user(email: str, full_name: str, password: str):
# One line to create and save - password hashing automatic!
user = await User.create(
email=email,
full_name=full_name,
password=password # ZenithModel handles hashing!
)
return {"user": user.model_dump()}
@app.get("/users/{user_id}/projects")
async def get_user_projects(user_id: int):
# Chainable queries with relationships
projects = await (
Project.where(owner_id=user_id)
.includes('tasks') # Eager load tasks
.order_by('-created_at')
.all()
)
return {"projects": [p.model_dump() for p in projects]}
# No manual session management needed - ZenithModel handles it!
# No manual CRUD functions - built into the models!
# Clean, integrated approach

ZenithModel provides clean, type-safe models with built-in functionality.

Terminal window
# Database drivers are included with Zenith
# Optional: Add migration tools
# Using uv
uv add alembic # For advanced migrations (optional)
# Or using pip
pip install alembic

Update app/config.py:

import os
from enum import Enum
class Environment(str, Enum):
"""Application environments."""
DEVELOPMENT = "development"
TESTING = "testing"
PRODUCTION = "production"
class Settings:
"""Enhanced configuration with database settings."""
# Environment
ENVIRONMENT: Environment = Environment(
os.getenv("ENVIRONMENT", "development")
)
# Database
DATABASE_URL: str = os.getenv(
"DATABASE_URL",
"sqlite+aiosqlite:///./taskflow.db" # Async SQLite for dev
)
# For production PostgreSQL:
# DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/taskflow"
# Database pool settings (for production)
DATABASE_POOL_SIZE: int = int(os.getenv("DATABASE_POOL_SIZE", "5"))
DATABASE_MAX_OVERFLOW: int = int(os.getenv("DATABASE_MAX_OVERFLOW", "10"))
DATABASE_POOL_TIMEOUT: int = int(os.getenv("DATABASE_POOL_TIMEOUT", "30"))
@property
def is_production(self) -> bool:
return self.ENVIRONMENT == Environment.PRODUCTION
@property
def is_testing(self) -> bool:
return self.ENVIRONMENT == Environment.TESTING
# Database configuration based on environment
def get_database_url(self) -> str:
"""Get database URL with proper async driver."""
if "sqlite" in self.DATABASE_URL:
# Ensure async SQLite driver
return self.DATABASE_URL.replace("sqlite://", "sqlite+aiosqlite://")
elif "postgresql" in self.DATABASE_URL:
# Ensure async PostgreSQL driver
return self.DATABASE_URL.replace("postgresql://", "postgresql+asyncpg://")
return self.DATABASE_URL
settings = Settings()

Create app/database.py:

"""
Database configuration and session management.
This module sets up the database engine, session factory,
and base model class for all our models.
"""
from sqlmodel import SQLModel, create_engine, Session
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
from app.config import settings
from zenith.db import ZenithModel
import logging
logger = logging.getLogger(__name__)
# Create async engine
engine = create_async_engine(
settings.get_database_url(),
echo=settings.DEBUG, # Log SQL queries in debug mode
future=True,
pool_size=settings.DATABASE_POOL_SIZE,
max_overflow=settings.DATABASE_MAX_OVERFLOW,
pool_timeout=settings.DATABASE_POOL_TIMEOUT,
)
# Create async session factory
async_session = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False # Don't expire objects after commit
)
async def init_db():
"""
Initialize database tables.
In production, use Alembic migrations instead!
"""
async with engine.begin() as conn:
# Create all tables - Note: In Zenith, use app.database.create_all()
await conn.run_sync(SQLModel.metadata.create_all)
logger.info("Database tables created successfully")
async def get_session() -> AsyncSession:
"""
Get database session.
This is a dependency that will be injected into route handlers.
"""
async with async_session() as session:
try:
yield session
finally:
await session.close()
# For Zenith's automatic session management
from zenith import Session
from sqlalchemy.ext.asyncio import AsyncSession
# Session is a pre-configured dependency for database access

Now let’s create our models. We’ll build a complete task management system with users, projects, and tasks.

Create app/models/user.py:

"""
User model for authentication and ownership.
Users can create projects and tasks, and collaborate with others.
"""
from zenith.db import ZenithModel, Field
from typing import Optional, List
from datetime import datetime
from enum import Enum
import re
class UserRole(str, Enum):
"""User roles for authorization."""
ADMIN = "admin"
USER = "user"
GUEST = "guest"
class User(ZenithModel, table=True):
"""
User database model with Zenith's enhanced patterns.
ZenithModel provides automatic session management and intuitive query methods.
"""
__tablename__ = "users"
id: Optional[int] = Field(default=None, primary_key=True)
# User information
email: str = Field(
unique=True,
index=True,
description="User's email address"
)
username: str = Field(
unique=True,
index=True,
min_length=3,
max_length=50,
description="Unique username"
)
full_name: Optional[str] = Field(
default=None,
max_length=100,
description="User's full name"
)
is_active: bool = Field(
default=True,
description="Is user account active"
)
role: UserRole = Field(
default=UserRole.USER,
description="User role for permissions"
)
# Authentication
password_hash: str = Field(description="Bcrypt password hash")
# Timestamps - ZenithModel can auto-manage these
created_at: datetime = Field(
default_factory=datetime.utcnow,
description="When user registered"
)
updated_at: Optional[datetime] = Field(
default=None,
description="Last profile update"
)
last_login: Optional[datetime] = Field(
default=None,
description="Last successful login"
)
# Relationships - these create foreign keys
projects: List["Project"] = Relationship(
back_populates="owner",
cascade_delete=True # Delete projects when user is deleted
)
assigned_tasks: List["Task"] = Relationship(
back_populates="assignee",
sa_relationship_kwargs={"foreign_keys": "[Task.assignee_id]"}
)
created_tasks: List["Task"] = Relationship(
back_populates="creator",
sa_relationship_kwargs={"foreign_keys": "[Task.creator_id]"}
)
def __repr__(self) -> str:
return f"<User {self.username}>"
class Config:
# Pydantic config
validate_assignment = True # Validate on attribute assignment
use_enum_values = True # Use enum values not objects
# Models for API requests/responses
class UserCreate(UserBase):
"""Model for creating a new user."""
password: str = Field(min_length=8, max_length=100)
class UserUpdate(SQLModel):
"""Model for updating user - all fields optional."""
email: Optional[str] = None
username: Optional[str] = None
full_name: Optional[str] = None
password: Optional[str] = None
class UserResponse(UserBase):
"""Model for API responses - no password."""
id: int
created_at: datetime
projects_count: int = 0
tasks_count: int = 0

Create app/models/project.py:

"""
Project model for organizing tasks.
Projects group related tasks and can have multiple collaborators.
"""
from sqlmodel import Field, Relationship, SQLModel
from typing import Optional, List
from datetime import datetime
from enum import Enum
class ProjectStatus(str, Enum):
"""Project lifecycle states."""
PLANNING = "planning"
ACTIVE = "active"
ON_HOLD = "on_hold"
COMPLETED = "completed"
ARCHIVED = "archived"
class ProjectBase(SQLModel):
"""Base project properties."""
name: str = Field(
min_length=1,
max_length=200,
description="Project name"
)
description: Optional[str] = Field(
default=None,
max_length=1000,
description="Project description"
)
status: ProjectStatus = Field(
default=ProjectStatus.PLANNING,
description="Current project status"
)
color: Optional[str] = Field(
default="#3B82F6", # Nice blue
regex=r"^#[0-9A-Fa-f]{6}$",
description="Project color for UI"
)
is_public: bool = Field(
default=False,
description="Is project visible to non-members"
)
class Project(ProjectBase, table=True):
"""Project database model."""
__tablename__ = "projects"
id: Optional[int] = Field(default=None, primary_key=True)
# Foreign keys
owner_id: int = Field(
foreign_key="users.id",
description="Project owner/creator"
)
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
# Relationships
owner: "User" = Relationship(back_populates="projects")
tasks: List["Task"] = Relationship(
back_populates="project",
cascade_delete=True
)
# Statistics (computed properties)
@property
def tasks_count(self) -> int:
"""Total number of tasks."""
return len(self.tasks) if self.tasks else 0
@property
def completed_tasks_count(self) -> int:
"""Number of completed tasks."""
if not self.tasks:
return 0
return sum(1 for task in self.tasks if task.is_completed)
@property
def completion_percentage(self) -> float:
"""Project completion percentage."""
if not self.tasks:
return 0.0
if self.tasks_count == 0:
return 0.0
return (self.completed_tasks_count / self.tasks_count) * 100
class Config:
validate_assignment = True
use_enum_values = True
# API models
class ProjectCreate(ProjectBase):
"""Model for creating a project."""
pass
class ProjectUpdate(SQLModel):
"""Model for updating a project."""
name: Optional[str] = None
description: Optional[str] = None
status: Optional[ProjectStatus] = None
color: Optional[str] = None
is_public: Optional[bool] = None
class ProjectResponse(ProjectBase):
"""Model for API responses."""
id: int
owner_id: int
created_at: datetime
tasks_count: int = 0
completed_tasks_count: int = 0
completion_percentage: float = 0.0

Create app/models/task.py:

"""
Task model - the core of our application.
Tasks belong to projects and can be assigned to users.
"""
from sqlmodel import Field, Relationship, SQLModel
from typing import Optional, List
from datetime import datetime, date
from enum import Enum
class TaskPriority(str, Enum):
"""Task priority levels."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
URGENT = "urgent"
class TaskStatus(str, Enum):
"""Task workflow states."""
TODO = "todo"
IN_PROGRESS = "in_progress"
IN_REVIEW = "in_review"
DONE = "done"
CANCELLED = "cancelled"
class TaskBase(SQLModel):
"""Base task properties."""
title: str = Field(
min_length=1,
max_length=200,
description="Task title"
)
description: Optional[str] = Field(
default=None,
max_length=5000,
description="Detailed task description"
)
priority: TaskPriority = Field(
default=TaskPriority.MEDIUM,
description="Task priority"
)
status: TaskStatus = Field(
default=TaskStatus.TODO,
description="Current status"
)
due_date: Optional[date] = Field(
default=None,
description="Task deadline"
)
estimated_hours: Optional[float] = Field(
default=None,
ge=0,
le=1000,
description="Estimated hours to complete"
)
class Task(TaskBase, table=True):
"""Task database model."""
__tablename__ = "tasks"
id: Optional[int] = Field(default=None, primary_key=True)
# Foreign keys
project_id: int = Field(
foreign_key="projects.id",
description="Parent project"
)
creator_id: int = Field(
foreign_key="users.id",
description="Who created the task"
)
assignee_id: Optional[int] = Field(
default=None,
foreign_key="users.id",
description="Who is assigned to the task"
)
# Timestamps
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: Optional[datetime] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
# Relationships
project: "Project" = Relationship(back_populates="tasks")
creator: "User" = Relationship(
back_populates="created_tasks",
sa_relationship_kwargs={"foreign_keys": "[Task.creator_id]"}
)
assignee: Optional["User"] = Relationship(
back_populates="assigned_tasks",
sa_relationship_kwargs={"foreign_keys": "[Task.assignee_id]"}
)
# Computed properties
@property
def is_completed(self) -> bool:
"""Check if task is done."""
return self.status == TaskStatus.DONE
@property
def is_overdue(self) -> bool:
"""Check if task is past due date."""
if not self.due_date:
return False
if self.is_completed:
return False
return date.today() > self.due_date
@property
def days_until_due(self) -> Optional[int]:
"""Days remaining until due date."""
if not self.due_date:
return None
if self.is_completed:
return None
delta = self.due_date - date.today()
return delta.days
class Config:
validate_assignment = True
use_enum_values = True
# API models
class TaskCreate(TaskBase):
"""Model for creating a task."""
project_id: int
assignee_id: Optional[int] = None
class TaskUpdate(SQLModel):
"""Model for updating a task."""
title: Optional[str] = None
description: Optional[str] = None
priority: Optional[TaskPriority] = None
status: Optional[TaskStatus] = None
due_date: Optional[date] = None
estimated_hours: Optional[float] = None
assignee_id: Optional[int] = None
class TaskResponse(TaskBase):
"""Model for API responses."""
id: int
project_id: int
creator_id: int
assignee_id: Optional[int]
created_at: datetime
is_completed: bool
is_overdue: bool
days_until_due: Optional[int]
project_name: str = ""
assignee_name: Optional[str] = None

Create app/models/__init__.py to export all models:

"""
Data models for TaskFlow API.
This module exports all models for easy importing.
"""
from app.models.user import (
User,
UserCreate,
UserUpdate,
UserResponse,
UserRole
)
from app.models.project import (
Project,
ProjectCreate,
ProjectUpdate,
ProjectResponse,
ProjectStatus
)
from app.models.task import (
Task,
TaskCreate,
TaskUpdate,
TaskResponse,
TaskPriority,
TaskStatus
)
# This allows: from app.models import User, Project, Task
__all__ = [
# User models
"User",
"UserCreate",
"UserUpdate",
"UserResponse",
"UserRole",
# Project models
"Project",
"ProjectCreate",
"ProjectUpdate",
"ProjectResponse",
"ProjectStatus",
# Task models
"Task",
"TaskCreate",
"TaskUpdate",
"TaskResponse",
"TaskPriority",
"TaskStatus",
]

Now let’s set up Alembic for database migrations. This tracks schema changes over time.

Terminal window
# Initialize Alembic in your project
alembic init migrations

Update alembic.ini:

# Update the database URL line
sqlalchemy.url = sqlite:///./taskflow.db
# For async support, update:
[alembic]
# Add async template
script_location = migrations
compare_type = true
compare_server_default = true

Update migrations/env.py:

"""Alembic environment configuration."""
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from app.database import SQLModel
import asyncio
# Import all models so Alembic can detect them
from app.models import * # noqa
# Alembic Config object
config = context.config
# Configure logging
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Model metadata for autogenerate
target_metadata = SQLModel.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection):
"""Execute migrations."""
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations():
"""Run migrations in async mode."""
from app.config import settings
configuration = config.get_section(config.config_ini_section)
configuration["sqlalchemy.url"] = settings.get_database_url()
connectable = async_engine_from_config(
configuration,
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode."""
asyncio.run(run_async_migrations())
# Determine which mode to run in
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
Terminal window
# Create initial migration
alembic revision --autogenerate -m "Initial models: users, projects, tasks"
# Apply the migration
alembic upgrade head

Let’s update app/main.py to use our new models:

"""Enhanced TaskFlow API with database models."""
from zenith import Zenith, DB
from app.config import settings
from app.database import init_db
from app.models import User, Project, Task, ProjectCreate, TaskCreate
from typing import List
app = Zenith(
title=settings.APP_NAME,
version=settings.APP_VERSION,
debug=settings.DEBUG
)
@app.on_event("startup")
async def startup():
"""Initialize database on startup."""
await init_db()
print("Database initialized")
# Test endpoint to create sample data
@app.post("/seed")
async def seed_database(session: AsyncSession = Session):
"""
Seed database with sample data for testing.
WARNING: This is for development only!
"""
# Create a test user
user = User(
email="alice@example.com",
username="alice",
full_name="Alice Johnson",
password_hash="hashed_password_here" # In reality, hash properly!
)
db.add(user)
await db.commit()
# Create a project
project = Project(
name="Learn Zenith",
description="Complete the Zenith tutorial",
owner_id=user.id
)
db.add(project)
await db.commit()
# Create some tasks
tasks = [
Task(
title="Complete Part 1",
project_id=project.id,
creator_id=user.id,
status="done"
),
Task(
title="Complete Part 2",
project_id=project.id,
creator_id=user.id,
status="in_progress"
),
Task(
title="Complete Part 3",
project_id=project.id,
creator_id=user.id,
assignee_id=user.id
),
]
for task in tasks:
db.add(task)
await db.commit()
return {
"message": "Database seeded successfully",
"data": {
"users": 1,
"projects": 1,
"tasks": len(tasks)
}
}
@app.get("/users")
async def list_users(session: AsyncSession = Session) -> List[User]:
"""List all users."""
users = await session.exec(select(User))
return users.all()
@app.get("/projects")
async def list_projects(session: AsyncSession = Session) -> List[Project]:
"""List all projects with task counts."""
projects = await session.exec(
select(Project).options(selectinload(Project.tasks))
)
return projects.all()
@app.get("/tasks")
async def list_tasks(
status: Optional[str] = None,
session: AsyncSession = Session
) -> List[Task]:
"""List tasks with optional status filter."""
query = select(Task)
if status:
query = query.where(Task.status == status)
tasks = await db.exec(query)
return tasks.all()

Test the new endpoints:

Terminal window
# Seed the database
curl -X POST http://localhost:8000/seed
# List users
curl http://localhost:8000/users
# List projects
curl http://localhost:8000/projects
# List tasks
curl http://localhost:8000/tasks?status=todo

Our models have three types of relationships:

# One user owns many projects
user.projects # List of projects owned by user
project.owner # The user who owns this project
# One project contains many tasks
project.tasks # List of tasks in project
task.project # The project this task belongs to

Many-to-One: Tasks → User (two relationships)

Section titled “Many-to-One: Tasks → User (two relationships)”
# Tasks have a creator and optional assignee
task.creator # User who created the task
task.assignee # User assigned to the task (optional)

Here are common query patterns you’ll use:

from sqlmodel import select, and_, or_
# Get user with all their projects
user = await db.exec(
select(User)
.where(User.id == 1)
.options(selectinload(User.projects))
)
# Get tasks assigned to a user
tasks = await db.exec(
select(Task)
.where(Task.assignee_id == user.id)
.where(Task.status != "done")
)
# Complex query: Get overdue tasks in active projects
overdue_tasks = await db.exec(
select(Task)
.join(Project)
.where(
and_(
Task.due_date < date.today(),
Task.status != "done",
Project.status == "active"
)
)
)
# Count tasks per project
from sqlmodel import func
task_counts = await db.exec(
select(
Project.name,
func.count(Task.id).label("task_count")
)
.join(Task)
.group_by(Project.id)
)

Our models use Pydantic validation automatically:

# This will raise ValidationError
user = UserCreate(
email="invalid-email", # Invalid email format
username="a", # Too short (min 3 chars)
password="123" # Too short (min 8 chars)
)
# This passes validation
user = UserCreate(
email="alice@example.com",
username="alice",
password="SecurePass123!"
)

Common validations:

  • Field(min_length=X, max_length=Y) - String length
  • Field(ge=X, le=Y) - Number range (greater/less than or equal)
  • Field(regex=r"pattern") - Regex pattern matching
  • Field(unique=True) - Database uniqueness constraint
  • Custom validators with @validator decorator

In this part, you’ve: Set up a database connection Created complex data models with relationships Learned about database migrations Implemented data validation Understood different relationship types Written complex queries

Solution: Either drop the database and recreate, or use migrations:

Terminal window
# Drop all tables (loses data!)
rm taskflow.db
# Then recreate
alembic upgrade head

Solution: Use eager loading:

# Eager load relationships
query = select(Project).options(selectinload(Project.tasks))

Solution: Catch and format validation errors:

from pydantic import ValidationError
try:
user = UserCreate(**data)
except ValidationError as e:
return {"errors": e.errors()}

In Part 3: CRUD Operations, we’ll:

  • Build complete CRUD endpoints
  • Implement proper error handling
  • Add pagination and filtering
  • Learn about transactions
  • Create service layers

Questions? Check our Database Guide for more details.