Skip to content

Part 3: CRUD Operations

Now that we have our database models, let’s build the complete CRUD (Create, Read, Update, Delete) operations for our TaskFlow API. You’ll see how ZenithModel transforms verbose, error-prone traditional CRUD into clean, intuitive operations.

By the end of this part, you’ll have:

  • Complete CRUD for users, projects, and tasks
  • Proper error handling with custom exceptions
  • Pagination and filtering for list endpoints
  • Transaction handling for data consistency
  • Service layer for business logic
  • Input validation and sanitization

Let’s explore Zenith’s approach to CRUD operations:

ZenithModel - Clean and Type-Safe

# Creating a user - so many manual steps!
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@app.post("/users")
async def create_user(
email: str,
password: str,
full_name: str,
db: AsyncSession = Depends(get_db) # Manual dependency injection
):
# Manual password hashing
hashed_password = pwd_context.hash(password)
# Manual existence check
stmt = select(User).where(User.email == email)
result = await db.execute(stmt)
if result.first():
raise HTTPException(status_code=400, detail="Email already exists")
# Manual model creation
user = User(
email=email,
hashed_password=hashed_password,
full_name=full_name
)
# Manual session management
db.add(user)
try:
await db.commit()
await db.refresh(user)
except Exception as e:
await db.rollback()
raise HTTPException(status_code=500, detail="Database error")
return user
# Finding users - verbose and error-prone
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
# Pagination - complex and repetitive
@app.get("/users")
async def list_users(
skip: int = 0,
limit: int = 100,
db: AsyncSession = Depends(get_db)
):
# Count query for pagination
count_stmt = select(func.count()).select_from(User)
total_result = await db.execute(count_stmt)
total = total_result.scalar()
# Data query
query = select(User).where(User.is_active == True)
query = query.offset(skip).limit(limit)
result = await db.execute(query)
users = result.scalars().all()
return {"users": users, "total": total, "skip": skip, "limit": limit}
# This is just 3 endpoints and already ~80 lines!
# Imagine multiply this by every model in your app...

ZenithModel - Clean and Intuitive

# Creating a user - one clean line!
@app.post("/users")
async def create_user(email: str, password: str, full_name: str):
"""Create a new user with automatic validation and security."""
# ZenithModel handles:
# - Password hashing automatically
# - Duplicate email checking
# - Session management
# - Error handling
# - Transaction rollback on failure
user = await User.create(
email=email,
password=password, # Automatically hashed with bcrypt
full_name=full_name
)
return {"user": user.model_dump()} # Built-in serialization
# Finding users - simple and safe
@app.get("/users/{user_id}")
async def get_user(user_id: int):
"""Get user with automatic 404 handling."""
# find_or_404() automatically raises HTTP 404 if not found
user = await User.find_or_404(user_id)
return {"user": user.model_dump()}
# Pagination - built-in and powerful
@app.get("/users")
async def list_users(page: int = 1, per_page: int = 20, active: bool = True):
"""List users with automatic pagination and filtering."""
# Chainable query methods with built-in pagination
query = User.where(is_active=active) # Clean filtering
# Paginate() handles count query and data query automatically
result = await query.paginate(page=page, per_page=per_page)
return {
"users": [u.model_dump() for u in result.items],
"total": result.total,
"page": result.page,
"pages": result.pages,
"per_page": result.per_page
}
# Update user - with validation and partial updates
@app.put("/users/{user_id}")
async def update_user(user_id: int, **updates):
"""Update user with automatic validation."""
user = await User.find_or_404(user_id)
# Built-in update method with validation
await user.update(**updates) # Only updates provided fields
return {"user": user.model_dump()}
# Delete user - with soft delete option
@app.delete("/users/{user_id}")
async def delete_user(user_id: int, soft: bool = True):
"""Delete user (soft delete by default)."""
user = await User.find_or_404(user_id)
if soft:
await user.update(is_active=False) # Soft delete
return {"message": "User deactivated"}
else:
await user.destroy() # Hard delete
return {"message": "User deleted"}
# All CRUD operations in ~40 lines vs 200+ traditional!
# Plus you get:
# Automatic password hashing
# Built-in pagination
# Automatic 404 handling
# Session management
# Transaction safety
# Input validation
# Clean serialization
# Finding users - intuitive and readable
user = await User.where(email=email).first()
# Pagination - simple method chaining
users = await User.where(is_active=True).offset(skip).limit(limit).all()
total = await User.where(is_active=True).count()

ZenithModel Benefits:

  • 📝 Readable: User.where(active=True) - clean, intuitive syntax
  • 🔗 Chainable: Method chaining flows naturally
  • 🎯 Focused: Write business logic, not database plumbing
  • 🔒 Type Safe: Full type hints and IDE support

Before diving into endpoints, let’s create a service layer. This separates business logic from HTTP concerns - a pattern used by every serious production application.

Create app/services/__init__.py:

"""
Service layer for business logic.
Services contain the "what" and "why" of your application,
while routes handle the "how" of HTTP communication.
"""
from typing import Optional, List
from sqlmodel import select
from sqlalchemy.exc import IntegrityError
from app.database import get_session
from app.models import User, Project, Task
from app.exceptions import (
NotFoundError,
ConflictError,
ValidationError,
PermissionError
)
class BaseService:
"""Base service with common patterns."""
def __init__(self, session):
self.session = session
async def commit(self):
"""Commit changes with error handling."""
try:
await self.session.commit()
except IntegrityError as e:
await self.session.rollback()
# Convert database errors to API errors
if "UNIQUE constraint" in str(e):
raise ConflictError("Resource already exists")
raise ValidationError(f"Database constraint violation: {e}")

Create app/services/users.py:

"""
User service for authentication and user management.
This service handles all user-related business logic including
password hashing, email validation, and authentication.
"""
from typing import Optional, List
from datetime import datetime, timedelta
from sqlmodel import select, func
from app.services import BaseService
from app.models import User, UserCreate, UserUpdate
from app.auth import hash_password, verify_password, create_access_token
from app.exceptions import NotFoundError, ConflictError, ValidationError
class UserService(BaseService):
"""Handles user operations and authentication."""
async def create_user(self, user_data: UserCreate) -> User:
"""
Create a new user with validation.
ZenithModel provides clean, intuitive patterns for database operations.
"""
# Check if email already exists - Zenith's clean syntax
existing = await User.where(email=user_data.email).first()
if existing:
raise ConflictError(f"Email {user_data.email} is already registered")
# Validate password strength
if len(user_data.password) < 8:
raise ValidationError("Password must be at least 8 characters")
# Create user with ZenithModel's create method
user = await User.create(
name=user_data.name,
email=user_data.email,
password_hash=hash_password(user_data.password)
)
return user
async def get_user(self, user_id: int) -> User:
"""Get user by ID with ZenithModel's find_or_404."""
return await User.find_or_404(user_id)
async def get_user_by_email(self, email: str) -> Optional[User]:
"""Get user by email - showcasing Zenith's clean query syntax."""
return await User.where(email=email).first()
async def list_users(
self,
skip: int = 0,
limit: int = 100,
search: Optional[str] = None
) -> tuple[List[User], int]:
"""
List users with pagination and search.
Showcasing ZenithModel's chainable query patterns.
"""
# Build query with ZenithModel's chainable methods
query = User.where(is_active=True)
# Add search filter if provided
if search:
query = query.where(
name__contains=search,
email__contains=search,
_operator="OR" # ZenithModel supports OR queries
)
# Get total count for pagination
total = await query.count()
# Apply pagination and ordering with method chaining
users = await query.order_by("-created_at").offset(skip).limit(limit).all()
return users, total
async def update_user(
self,
user_id: int,
user_update: UserUpdate,
current_user_id: int
) -> User:
"""
Update user with ZenithModel's convenient update patterns.
Users can only update their own profile unless they're admin.
"""
# Check permissions
if user_id != current_user_id:
raise PermissionError("You can only update your own profile")
# Get existing user
user = await User.find_or_404(user_id)
# Prepare update data
update_data = user_update.model_dump(exclude_unset=True)
# Handle password update separately
if "password" in update_data:
password = update_data.pop("password")
if password and len(password) >= 8:
update_data["password_hash"] = hash_password(password)
# Add timestamp
update_data["updated_at"] = datetime.utcnow()
# Update with ZenithModel's clean update method
await user.update(**update_data)
return user
async def delete_user(self, user_id: int, current_user_id: int) -> bool:
"""
Soft delete with ZenithModel patterns.
Instead of actually deleting, we mark as inactive.
This preserves data integrity and audit trails.
"""
if user_id != current_user_id:
raise PermissionError("You can only delete your own account")
# Soft delete with ZenithModel's update method
user = await User.find_or_404(user_id)
await user.update(
is_active=False,
deleted_at=datetime.utcnow()
)
return True
async def authenticate(self, email: str, password: str) -> Optional[dict]:
"""
Authenticate user and return tokens.
Returns:
Dict with access_token and user info, or None if auth fails
"""
user = await self.get_user_by_email(email)
if not user or not user.is_active:
return None
if not verify_password(password, user.password_hash):
return None
# Generate JWT token
access_token = create_access_token(
data={"sub": user.email, "user_id": user.id}
)
return {
"access_token": access_token,
"token_type": "bearer",
"user": user
}

Create app/exceptions.py for consistent error handling:

"""
Custom exceptions for clean error handling.
These exceptions are caught by our error handlers and converted
to appropriate HTTP responses.
"""
from typing import Any, Optional
class APIException(Exception):
"""Base exception for all API errors."""
def __init__(
self,
message: str,
status_code: int = 400,
details: Optional[dict] = None
):
self.message = message
self.status_code = status_code
self.details = details or {}
super().__init__(message)
class NotFoundError(APIException):
"""Resource not found."""
def __init__(self, message: str = "Resource not found"):
super().__init__(message, status_code=404)
class ConflictError(APIException):
"""Resource conflict (duplicate, etc)."""
def __init__(self, message: str = "Resource conflict"):
super().__init__(message, status_code=409)
class ValidationError(APIException):
"""Validation failed."""
def __init__(self, message: str = "Validation failed", details: dict = None):
super().__init__(message, status_code=422, details=details)
class PermissionError(APIException):
"""Permission denied."""
def __init__(self, message: str = "Permission denied"):
super().__init__(message, status_code=403)
class AuthenticationError(APIException):
"""Authentication failed."""
def __init__(self, message: str = "Authentication required"):
super().__init__(message, status_code=401)

Now let’s create the routes. Update app/main.py:

"""
TaskFlow API - Main application with CRUD operations.
"""
from typing import Optional
from zenith import Zenith
from zenith.responses import JSONResponse
from app.database import get_session
from app.services.users import UserService
from app.models import UserCreate, UserUpdate, UserResponse
from app.auth import get_current_user
from app.exceptions import APIException
app = Zenith(
title="TaskFlow API",
version="0.0.1",
description="Complete task management system"
)
# Error handler for our custom exceptions
@app.exception_handler(APIException)
async def api_exception_handler(request, exc: APIException):
"""Convert our custom exceptions to JSON responses."""
return JSONResponse(
status_code=exc.status_code,
content={
"error": exc.message,
"details": exc.details
}
)
# ============= USER ENDPOINTS =============
@app.post("/users", response_model=UserResponse, status_code=201)
async def create_user(
user_data: UserCreate,
session = Depends(get_session)
):
"""
Register a new user.
Requirements:
- Email must be unique
- Password must be at least 8 characters
- Name is required
Returns:
Created user (without password)
"""
service = UserService(session)
user = await service.create_user(user_data)
return user
@app.get("/users", response_model=list[UserResponse])
async def list_users(
skip: int = Query(0, ge=0, description="Number of users to skip"),
limit: int = Query(100, ge=1, le=1000, description="Max users to return"),
search: Optional[str] = Query(None, description="Search in name/email"),
session = Depends(get_session)
):
"""
List all users with pagination.
Features:
- Pagination with skip/limit
- Search by name or email
- Returns total count in headers
"""
service = UserService(session)
users, total = await service.list_users(skip, limit, search)
# Add total count to response headers for pagination
return JSONResponse(
content=[user.model_dump() for user in users],
headers={"X-Total-Count": str(total)}
)
@app.get("/users/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int = Path(..., ge=1, description="User ID"),
session = Depends(get_session)
):
"""
Get a specific user by ID.
Raises:
404: User not found
"""
service = UserService(session)
user = await service.get_user(user_id)
return user
@app.patch("/users/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int = Path(..., ge=1),
user_update: UserUpdate,
current_user = Depends(get_current_user),
session = Depends(get_session)
):
"""
Update user profile.
Rules:
- Users can only update their own profile
- Email changes require re-verification (not implemented)
- Password changes require old password (not implemented)
Raises:
403: Permission denied
404: User not found
"""
service = UserService(session)
user = await service.update_user(
user_id,
user_update,
current_user.id
)
return user
@app.delete("/users/{user_id}", status_code=204)
async def delete_user(
user_id: int = Path(..., ge=1),
current_user = Depends(get_current_user),
session = Depends(get_session)
):
"""
Delete (deactivate) a user account.
Note: This performs a soft delete, marking the user as inactive
rather than removing from database.
Raises:
403: Permission denied
404: User not found
"""
service = UserService(session)
await service.delete_user(user_id, current_user.id)
return None # 204 No Content

Create app/services/projects.py:

"""
Project service for managing projects and team collaboration.
"""
from typing import List, Optional
from datetime import datetime
from sqlmodel import select, func
from app.services import BaseService
from app.models import Project, ProjectCreate, ProjectUpdate, User
from app.exceptions import NotFoundError, PermissionError
class ProjectService(BaseService):
"""Manages project operations."""
async def create_project(
self,
project_data: ProjectCreate,
owner_id: int
) -> Project:
"""
Create a new project.
The creating user becomes the project owner.
"""
project = Project(
**project_data.model_dump(),
owner_id=owner_id
)
self.session.add(project)
await self.commit()
await self.session.refresh(project)
# Load owner relationship
await self.session.refresh(project, ["owner"])
return project
async def get_project(self, project_id: int) -> Project:
"""Get project with owner loaded."""
project = await self.session.get(Project, project_id)
if not project:
raise NotFoundError(f"Project {project_id} not found")
# Eager load relationships
await self.session.refresh(project, ["owner", "tasks"])
return project
async def list_projects(
self,
user_id: Optional[int] = None,
skip: int = 0,
limit: int = 100,
include_archived: bool = False
) -> tuple[List[Project], int]:
"""
List projects with filters.
Args:
user_id: Filter by owner
skip: Pagination offset
limit: Max items to return
include_archived: Include archived projects
Returns:
Tuple of (projects, total_count)
"""
query = select(Project)
# Filter by owner if specified
if user_id:
query = query.where(Project.owner_id == user_id)
# Exclude archived unless requested
if not include_archived:
query = query.where(Project.is_archived == False)
# Get total count
count_stmt = select(func.count()).select_from(query.subquery())
total = await self.session.exec(count_stmt).one()
# Apply pagination and ordering
query = (query
.order_by(Project.created_at.desc())
.offset(skip)
.limit(limit))
result = await self.session.exec(query)
projects = result.all()
# Load owners for all projects
for project in projects:
await self.session.refresh(project, ["owner"])
return projects, total
async def update_project(
self,
project_id: int,
project_update: ProjectUpdate,
user_id: int
) -> Project:
"""
Update project if user is owner.
Only the project owner can update project details.
"""
project = await self.get_project(project_id)
# Check ownership
if project.owner_id != user_id:
raise PermissionError("Only the project owner can update it")
# Update fields
update_data = project_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(project, field, value)
project.updated_at = datetime.utcnow()
await self.commit()
await self.session.refresh(project)
return project
async def archive_project(
self,
project_id: int,
user_id: int
) -> Project:
"""
Archive (soft delete) a project.
Archived projects are hidden but not deleted,
preserving task history.
"""
project = await self.get_project(project_id)
if project.owner_id != user_id:
raise PermissionError("Only the project owner can archive it")
project.is_archived = True
project.archived_at = datetime.utcnow()
await self.commit()
return project
async def get_project_stats(self, project_id: int) -> dict:
"""
Get project statistics.
Returns task counts by status, completion rate, etc.
"""
project = await self.get_project(project_id)
# In a real app, we'd use SQL aggregation
total_tasks = len(project.tasks)
completed_tasks = sum(1 for t in project.tasks if t.is_completed)
return {
"total_tasks": total_tasks,
"completed_tasks": completed_tasks,
"pending_tasks": total_tasks - completed_tasks,
"completion_rate": (
completed_tasks / total_tasks * 100
if total_tasks > 0 else 0
)
}

Create app/services/tasks.py:

"""
Task service with transaction support.
Shows how to handle complex operations that need
to update multiple tables atomically.
"""
from typing import List, Optional
from datetime import datetime, timedelta
from sqlmodel import select, and_
from app.services import BaseService
from app.models import Task, TaskCreate, TaskUpdate, Project
from app.exceptions import NotFoundError, PermissionError, ValidationError
class TaskService(BaseService):
"""Manages task operations with transaction support."""
async def create_task(
self,
task_data: TaskCreate,
user_id: int
) -> Task:
"""
Create a task within a project.
This demonstrates transaction handling:
1. Verify project exists and user has access
2. Create the task
3. Update project's task count
4. Commit all changes atomically
"""
# Start transaction context
async with self.session.begin():
# Verify project exists and user owns it
project = await self.session.get(Project, task_data.project_id)
if not project:
raise NotFoundError(f"Project {task_data.project_id} not found")
if project.owner_id != user_id:
raise PermissionError("You can only add tasks to your own projects")
# Create the task
task = Task(
**task_data.model_dump(),
created_by=user_id
)
# Set default due date if not provided (7 days from now)
if not task.due_date:
task.due_date = datetime.utcnow() + timedelta(days=7)
self.session.add(task)
# Update project's last activity
project.updated_at = datetime.utcnow()
# Transaction commits automatically when exiting the context
# Refresh to get the committed data
await self.session.refresh(task)
await self.session.refresh(task, ["project", "assignee"])
return task
async def list_tasks(
self,
project_id: Optional[int] = None,
assignee_id: Optional[int] = None,
status: Optional[str] = None,
skip: int = 0,
limit: int = 100
) -> tuple[List[Task], int]:
"""
List tasks with multiple filters.
This shows how to build complex queries dynamically.
"""
# Start with base query
query = select(Task)
# Build filter conditions
conditions = []
if project_id:
conditions.append(Task.project_id == project_id)
if assignee_id:
conditions.append(Task.assignee_id == assignee_id)
if status:
if status == "completed":
conditions.append(Task.is_completed == True)
elif status == "pending":
conditions.append(Task.is_completed == False)
elif status == "overdue":
conditions.append(
and_(
Task.is_completed == False,
Task.due_date < datetime.utcnow()
)
)
# Apply all conditions
if conditions:
query = query.where(and_(*conditions))
# Get count
count_stmt = select(func.count()).select_from(query.subquery())
total = await self.session.exec(count_stmt).one()
# Order by priority and due date
query = (query
.order_by(Task.priority.desc(), Task.due_date)
.offset(skip)
.limit(limit))
result = await self.session.exec(query)
tasks = result.all()
# Load relationships
for task in tasks:
await self.session.refresh(task, ["project", "assignee"])
return tasks, total
async def update_task(
self,
task_id: int,
task_update: TaskUpdate,
user_id: int
) -> Task:
"""
Update task with permission checks.
Rules:
- Project owner can update any task
- Assignee can update completion status
- Others have no access
"""
task = await self.session.get(Task, task_id)
if not task:
raise NotFoundError(f"Task {task_id} not found")
# Load project to check ownership
await self.session.refresh(task, ["project"])
# Permission checks
is_owner = task.project.owner_id == user_id
is_assignee = task.assignee_id == user_id
if not (is_owner or is_assignee):
raise PermissionError("You don't have permission to update this task")
# Assignees can only update completion status
if is_assignee and not is_owner:
update_data = task_update.model_dump(exclude_unset=True)
allowed_fields = {"is_completed", "completed_at"}
if set(update_data.keys()) - allowed_fields:
raise PermissionError("Assignees can only update completion status")
# Apply updates
update_data = task_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(task, field, value)
# Auto-set completed_at when marking as complete
if "is_completed" in update_data:
if update_data["is_completed"]:
task.completed_at = datetime.utcnow()
else:
task.completed_at = None
task.updated_at = datetime.utcnow()
await self.commit()
await self.session.refresh(task)
return task
async def bulk_update_tasks(
self,
task_ids: List[int],
update_data: dict,
user_id: int
) -> int:
"""
Update multiple tasks at once.
This demonstrates bulk operations with transactions.
Returns the count of updated tasks.
"""
updated_count = 0
async with self.session.begin():
for task_id in task_ids:
task = await self.session.get(Task, task_id)
if not task:
continue
# Check permissions (simplified)
await self.session.refresh(task, ["project"])
if task.project.owner_id != user_id:
continue
# Apply updates
for field, value in update_data.items():
if hasattr(task, field):
setattr(task, field, value)
task.updated_at = datetime.utcnow()
updated_count += 1
return updated_count

Update app/main.py to add the remaining routes:

# ============= PROJECT ENDPOINTS =============
@app.post("/projects", response_model=ProjectResponse, status_code=201)
async def create_project(
project_data: ProjectCreate,
current_user = Depends(get_current_user),
session = Depends(get_session)
):
"""Create a new project."""
service = ProjectService(session)
project = await service.create_project(project_data, current_user.id)
return project
@app.get("/projects", response_model=list[ProjectResponse])
async def list_projects(
my_projects_only: bool = Query(False, description="Only show my projects"),
include_archived: bool = Query(False, description="Include archived"),
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
current_user = Depends(get_current_user),
session = Depends(get_session)
):
"""List projects with filters."""
service = ProjectService(session)
user_id = current_user.id if my_projects_only else None
projects, total = await service.list_projects(
user_id=user_id,
skip=skip,
limit=limit,
include_archived=include_archived
)
return JSONResponse(
content=[p.model_dump() for p in projects],
headers={"X-Total-Count": str(total)}
)
@app.get("/projects/{project_id}/stats")
async def get_project_stats(
project_id: int,
session = Depends(get_session)
):
"""Get project statistics."""
service = ProjectService(session)
stats = await service.get_project_stats(project_id)
return stats
# ============= TASK ENDPOINTS =============
@app.post("/tasks", response_model=TaskResponse, status_code=201)
async def create_task(
task_data: TaskCreate,
current_user = Depends(get_current_user),
session = Depends(get_session)
):
"""Create a new task."""
service = TaskService(session)
task = await service.create_task(task_data, current_user.id)
return task
@app.get("/tasks", response_model=list[TaskResponse])
async def list_tasks(
project_id: Optional[int] = Query(None, description="Filter by project"),
assignee_id: Optional[int] = Query(None, description="Filter by assignee"),
status: Optional[str] = Query(None, enum=["pending", "completed", "overdue"]),
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
session = Depends(get_session)
):
"""List tasks with multiple filters."""
service = TaskService(session)
tasks, total = await service.list_tasks(
project_id=project_id,
assignee_id=assignee_id,
status=status,
skip=skip,
limit=limit
)
return JSONResponse(
content=[t.model_dump() for t in tasks],
headers={"X-Total-Count": str(total)}
)
@app.patch("/tasks/{task_id}", response_model=TaskResponse)
async def update_task(
task_id: int,
task_update: TaskUpdate,
current_user = Depends(get_current_user),
session = Depends(get_session)
):
"""Update a task."""
service = TaskService(session)
task = await service.update_task(task_id, task_update, current_user.id)
return task
@app.post("/tasks/bulk-update")
async def bulk_update_tasks(
task_ids: List[int] = Body(..., description="List of task IDs"),
update_data: dict = Body(..., description="Fields to update"),
current_user = Depends(get_current_user),
session = Depends(get_session)
):
"""
Update multiple tasks at once.
Useful for bulk operations like marking multiple tasks complete.
"""
service = TaskService(session)
count = await service.bulk_update_tasks(
task_ids,
update_data,
current_user.id
)
return {"updated_count": count}

Let’s write comprehensive tests. Create tests/test_crud.py:

"""
Test CRUD operations for TaskFlow API.
These tests demonstrate how to test:
- Success cases
- Error cases
- Permissions
- Transactions
"""
import pytest
from httpx import AsyncClient
from app.main import app
from app.database import get_session
from app.models import User, Project, Task
@pytest.fixture
async def client():
"""Create test client."""
async with AsyncClient(app=app, base_url="http://test") as ac:
yield ac
@pytest.fixture
async def auth_headers(client):
"""Create a user and return auth headers."""
# Create user
response = await client.post(
"/users",
json={
"name": "Test User",
"email": "test@example.com",
"password": "password123"
}
)
assert response.status_code == 201
# Login
response = await client.post(
"/auth/login",
json={
"email": "test@example.com",
"password": "password123"
}
)
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
class TestUserCRUD:
"""Test user CRUD operations."""
async def test_create_user_success(self, client):
"""Test successful user creation."""
response = await client.post(
"/users",
json={
"name": "Alice Smith",
"email": "alice@example.com",
"password": "secure123"
}
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "Alice Smith"
assert data["email"] == "alice@example.com"
assert "password" not in data # Password should never be returned
async def test_create_user_duplicate_email(self, client):
"""Test that duplicate emails are rejected."""
user_data = {
"name": "Bob Jones",
"email": "bob@example.com",
"password": "password123"
}
# Create first user
response = await client.post("/users", json=user_data)
assert response.status_code == 201
# Try to create duplicate
response = await client.post("/users", json=user_data)
assert response.status_code == 409
assert "already registered" in response.json()["error"]
async def test_list_users_pagination(self, client):
"""Test user listing with pagination."""
# Create multiple users
for i in range(15):
await client.post(
"/users",
json={
"name": f"User {i}",
"email": f"user{i}@example.com",
"password": "password123"
}
)
# Test pagination
response = await client.get("/users?skip=0&limit=10")
assert response.status_code == 200
assert len(response.json()) == 10
assert response.headers["X-Total-Count"] == "15"
# Test second page
response = await client.get("/users?skip=10&limit=10")
assert len(response.json()) == 5
class TestProjectCRUD:
"""Test project CRUD operations."""
async def test_create_project(self, client, auth_headers):
"""Test project creation."""
response = await client.post(
"/projects",
json={
"name": "New Website",
"description": "Redesign company website"
},
headers=auth_headers
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "New Website"
assert data["owner"]["email"] == "test@example.com"
async def test_project_permissions(self, client, auth_headers):
"""Test that only owners can update projects."""
# Create project as user 1
response = await client.post(
"/projects",
json={"name": "My Project"},
headers=auth_headers
)
project_id = response.json()["id"]
# Create another user
await client.post(
"/users",
json={
"name": "Other User",
"email": "other@example.com",
"password": "password123"
}
)
# Login as other user
response = await client.post(
"/auth/login",
json={
"email": "other@example.com",
"password": "password123"
}
)
other_token = response.json()["access_token"]
other_headers = {"Authorization": f"Bearer {other_token}"}
# Try to update project as other user
response = await client.patch(
f"/projects/{project_id}",
json={"name": "Hacked Project"},
headers=other_headers
)
assert response.status_code == 403
assert "permission" in response.json()["error"].lower()
class TestTaskCRUD:
"""Test task CRUD with transactions."""
async def test_create_task_with_transaction(self, client, auth_headers):
"""Test that task creation is transactional."""
# Create project first
response = await client.post(
"/projects",
json={"name": "Test Project"},
headers=auth_headers
)
project_id = response.json()["id"]
# Create task
response = await client.post(
"/tasks",
json={
"title": "Implement feature",
"description": "Add new functionality",
"project_id": project_id,
"priority": 3
},
headers=auth_headers
)
assert response.status_code == 201
task = response.json()
assert task["title"] == "Implement feature"
assert task["project"]["id"] == project_id
async def test_bulk_update_tasks(self, client, auth_headers):
"""Test updating multiple tasks at once."""
# Create project
response = await client.post(
"/projects",
json={"name": "Bulk Test"},
headers=auth_headers
)
project_id = response.json()["id"]
# Create multiple tasks
task_ids = []
for i in range(5):
response = await client.post(
"/tasks",
json={
"title": f"Task {i}",
"project_id": project_id
},
headers=auth_headers
)
task_ids.append(response.json()["id"])
# Bulk update
response = await client.post(
"/tasks/bulk-update",
json={
"task_ids": task_ids,
"update_data": {"is_completed": True}
},
headers=auth_headers
)
assert response.json()["updated_count"] == 5

Always return total count for pagination:

# In your service
async def list_items(skip: int, limit: int):
query = select(Item)
total = await session.exec(select(func.count()).select_from(query.subquery())).one()
items = await session.exec(query.offset(skip).limit(limit)).all()
return items, total
# In your route
@app.get("/items")
async def get_items(...):
items, total = await service.list_items(skip, limit)
return JSONResponse(
content=[item.model_dump() for item in items],
headers={"X-Total-Count": str(total)}
)

Preserve data integrity with soft deletes:

class SoftDeleteMixin:
is_active: bool = Field(default=True)
deleted_at: Optional[datetime] = Field(default=None)
def soft_delete(self):
self.is_active = False
self.deleted_at = datetime.utcnow()
# Usage in queries
query = select(Item).where(Item.is_active == True)

Use transactions for multi-step operations:

async def complex_operation():
async with session.begin(): # Starts transaction
# All operations here are atomic
user = await create_user(...)
project = await create_project(user.id, ...)
await send_welcome_email(user.email)
# Commits automatically on success
# Rolls back automatically on exception

Start your application:

Terminal window
# Run with hot reload
uvicorn app.main:app --reload
# Or with uv
uv run uvicorn app.main:app --reload

Test the endpoints:

Terminal window
# Create a user
curl -X POST http://localhost:8000/users \
-H "Content-Type: application/json" \
-d '{"name": "Alice", "email": "alice@example.com", "password": "secure123"}'
# List users with pagination
curl "http://localhost:8000/users?skip=0&limit=10"
# Get specific user
curl http://localhost:8000/users/1
# Run tests
pytest tests/test_crud.py -v
  1. Use select columns: Only fetch needed columns
  2. Eager loading: Load relationships when needed
  3. Indexes: Add indexes for frequently queried columns
  4. Pagination: Always paginate large result sets
from functools import lru_cache
from zenith.cache import cache
@cache(ttl=300) # Cache for 5 minutes
async def get_user_stats(user_id: int):
# Expensive calculation
return calculate_stats(user_id)

In this part, you’ve mastered: Service layer pattern for business logic Complete CRUD operations for all models Custom exception handling Permission checks and authorization Pagination with total counts Search and filtering Transaction handling Bulk operations Soft delete pattern Comprehensive testing

Solution: Ensure related records exist before creating dependent records. Use transactions to ensure atomicity.

Solution: Use eager loading with refresh() or join queries to load relationships efficiently.

Solution: Add pagination, limit default page size, add database indexes on filtered columns.

In Part 4: Authentication, we’ll:

  • Implement JWT authentication
  • Add login and logout endpoints
  • Create password reset flow
  • Add OAuth integration
  • Implement role-based access control

Questions? Check our FAQ or ask in GitHub Discussions.