Skip to content

Service API

The Service system is Zenith’s approach to organizing business logic separately from web concerns. Services provide dependency injection, database access, and service composition.

from zenith import Service, Inject
from zenith.db import Session
from sqlmodel import select
from typing import Optional
# WHY SERVICES?
# Problem: Putting business logic in routes makes them huge and untestable
# Solution: Services organize business logic in reusable, testable classes
class UserService(Service):
"""All user-related business logic in one place.
Services provide:
- Database access (self.db)
- Current user info (self.current_user)
- Request context (self.request)
- Caching (self.cache)
- Logging (self.logger)
"""
# No __init__ needed - inherits from Service base class
# Container is optional and automatically provided when used with DI
async def create_user(self, email: str, name: str) -> User:
"""Create a new user - business logic, not HTTP logic.
This method:
1. Creates user object
2. Saves to database
3. Returns fresh user with ID
Notice: No HTTP status codes, no request/response objects!
"""
# Step 1: Create the user object (in memory)
user = User(email=email, name=name)
# Step 2: Add to database session (staged for insert)
self.db.add(user) # self.db is automatically injected!
# Step 3: Commit the transaction (actually saves)
await self.db.commit()
# Step 4: Refresh to get generated fields (ID, timestamps)
await self.db.refresh(user)
return user # Now has ID from database
async def get_user_by_email(self, email: str) -> Optional[User]:
"""Find user by email address.
Returns None if not found (not HTTP 404!).
Let the route decide what HTTP response to send.
"""
# Build SQL query using SQLModel
statement = select(User).where(User.email == email)
# Execute query using injected database session
result = await self.db.exec(statement)
# Get first result or None
return result.first()
# HOW TO USE IN ROUTES:
# @app.post("/users")
# async def create_user(
# user_data: UserCreate,
# users: UserService = Inject(UserService) # Service injected automatically!
# ):
# user = await users.create_user(user_data.email, user_data.name)
# return user # Route decides the HTTP response

Service with Dependency Injection (Composing Services)

Section titled “Service with Dependency Injection (Composing Services)”
from zenith import Service, Inject
from app.services.email import EmailService
from app.services.storage import StorageService
class UserService(Service):
"""User service that depends on other services.
Real-world services often need:
- Email sending
- File storage
- Payment processing
- Notification systems
- Analytics tracking
"""
def __init__(
self,
email_service: EmailService,
storage: StorageService
):
"""Initialize with dependencies (auto-injected by framework).
Dependencies are automatically resolved and injected based on type hints.
For manual/test usage, pass instances directly to constructor.
Args:
email_service: Email service instance (auto-injected)
storage: Storage service instance (auto-injected)
"""
self.email_service = email_service
self.storage = storage
async def register_user(self, user_data: UserCreate) -> User:
"""Complete user registration flow.
This method orchestrates multiple services:
1. Database (inherited from Service)
2. Email service (injected)
3. Storage service (injected)
"""
# Step 1: Check if email already exists (business rule)
existing = await self.get_user_by_email(user_data.email)
if existing:
# Raise domain exception (not HTTP exception!)
raise ValueError("Email already registered")
# The route will convert this to HTTP 400
# Step 2: Create user in database
user = User(**user_data.model_dump()) # Convert Pydantic to SQLModel
self.db.add(user) # Stage for insert
await self.db.commit() # Save to database
await self.db.refresh(user) # Get generated ID
# Step 3: Send welcome email (using injected service)
try:
await self.email_service.send_welcome_email(user)
except Exception as e:
# Log but don't fail registration
self.logger.error(f"Failed to send welcome email: {e}")
# Could queue for retry instead
# Step 4: Create default avatar (using injected service)
if self.storage:
await self.storage.create_default_avatar(user.id)
return user # Registration complete!
async def get_user_by_email(self, email: str) -> Optional[User]:
"""Helper method used by register_user."""
statement = select(User).where(User.email == email)
result = await self.db.exec(statement)
return result.first()
# TESTING IS EASY WITH CONSTRUCTOR INJECTION:
# async def test_register_user():
# # Create mock dependencies
# mock_email = Mock(EmailService)
# mock_storage = Mock(StorageService)
#
# # Pass mocks directly to constructor - no framework needed!
# service = UserService(
# email_service=mock_email,
# storage=mock_storage
# )
#
# # Test the business logic
# user = await service.register_user(test_data)
#
# # Verify mocks were called
# mock_email.send_welcome_email.assert_called_once()

Every service has access to a database session:

class ProductService(Service):
async def create_product(self, product_data: ProductCreate) -> Product:
product = Product(**product_data.model_dump())
# Database session available as self.db
self.db.add(product)
await self.db.commit()
await self.db.refresh(product)
return product
async def bulk_update_prices(self, updates: List[PriceUpdate]):
"""Bulk update with transaction."""
async with self.db.begin(): # Transaction
for update in updates:
product = await self.db.get(Product, update.product_id)
if product:
product.price = update.new_price
# Auto-commit on success, rollback on error

Access current request information:

class AuditService(Service):
async def log_user_action(self, action: str, details: dict):
"""Log user action with request context."""
log_entry = AuditLog(
user_id=self.current_user.id if self.current_user else None,
action=action,
details=details,
ip_address=self.request.client.host,
user_agent=self.request.headers.get('user-agent'),
timestamp=datetime.utcnow()
)
self.db.add(log_entry)
await self.db.commit()

Built-in caching support:

from zenith.cache import cache_result
class ProductService(Service):
@cache_result(ttl=300) # Cache for 5 minutes
async def get_featured_products(self) -> List[Product]:
"""Get featured products with caching."""
statement = select(Product).where(
Product.is_featured == True,
Product.is_active == True
)
result = await self.db.exec(statement)
return result.all()
async def update_product(self, product_id: int, updates: ProductUpdate):
"""Update product and invalidate cache."""
product = await self.db.get(Product, product_id)
if not product:
return None
# Update product
for field, value in updates.model_dump(exclude_unset=True).items():
setattr(product, field, value)
await self.db.commit()
# Invalidate related caches
await self.cache.invalidate('get_featured_products')
return product
from zenith import Router, Inject, HTTPException
from app.services.users import UserService
from app.models import UserCreate, User
router = Router()
@router.post("/users")
async def create_user(
user_data: UserCreate, # Pydantic model from request body
users: UserService = Inject(UserService) # ✨ Magic dependency injection!
) -> User:
"""Create a new user.
WHAT HAPPENS HERE:
1. Zenith creates UserService instance
2. Injects database session into service
3. Passes service to this function
4. You use service for business logic
5. Zenith cleans up after request
YOU DON'T WORRY ABOUT:
- Creating service instances
- Managing database connections
- Cleaning up resources
- Transaction management
"""
# Just call the service method!
return await users.create_user(user_data)
# Service handles all business logic
# Route just connects HTTP to business logic
@router.get("/users/{user_id}")
async def get_user(
user_id: int, # From URL path
users: UserService = Inject(UserService) # Injected service
) -> User:
"""Get user by ID.
SEPARATION OF CONCERNS:
- Service: Finds user (business logic)
- Route: Returns 404 if not found (HTTP logic)
"""
# Service returns None if not found (business logic)
user = await users.get_user(user_id)
# Route decides HTTP response (web logic)
if not user:
raise HTTPException(
status_code=404,
detail=f"User with ID {user_id} not found"
)
return user # Zenith converts to JSON automatically
# COMPARE TO DOING IT WITHOUT SERVICES:
# @router.post("/users")
# async def create_user_bad(user_data: UserCreate):
# # All this logic in the route!
# async with get_session() as db:
# existing = db.query(User).filter_by(email=user_data.email).first()
# if existing:
# raise HTTPException(400, "Email taken")
#
# user = User(**user_data.model_dump())
# db.add(user)
# await db.commit()
#
# send_email(user.email, "Welcome!") # What if this fails?
#
# return user
#
# # Problems:
# # - Can't test without HTTP
# # - Can't reuse in CLI/background tasks
# # - Route is doing too much
# # - Hard to maintain

Multiple Services (Complex Business Operations)

Section titled “Multiple Services (Complex Business Operations)”
@router.post("/orders")
async def create_order(
order_data: OrderCreate, # From request body
# Inject multiple services and dependencies
current_user: User = Inject(get_current_user), # Auth dependency
orders: OrderService = Inject(OrderService), # Order logic
inventory: InventoryService = Inject(InventoryService), # Stock management
payments: PaymentService = Inject(PaymentService) # Payment processing
) -> Order:
"""Create an order - orchestrating multiple services.
This endpoint shows real-world complexity:
- Multiple services working together
- Transaction coordination
- Error handling across services
- Business rule validation
Each service handles its own domain!
"""
# Step 1: CHECK INVENTORY (before taking payment!)
available = await inventory.check_availability(order_data.items)
if not available:
# Return user-friendly error
unavailable_items = await inventory.get_unavailable_items(order_data.items)
raise HTTPException(
status_code=400,
detail={
"error": "Some items are not available",
"unavailable": unavailable_items
}
)
# Step 2: PROCESS PAYMENT (might fail!)
try:
payment = await payments.process_payment(
user_id=current_user.id,
payment_method=order_data.payment_method,
amount=order_data.total
)
except PaymentFailedError as e:
# Payment failed - don't create order!
raise HTTPException(
status_code=402, # Payment Required
detail={
"error": "Payment failed",
"reason": str(e),
"suggestion": "Please check your payment method"
}
)
# Step 3: CREATE ORDER (payment succeeded)
order = await orders.create_order(
current_user,
order_data,
payment_id=payment.id # Link to payment
)
# Step 4: RESERVE INVENTORY (mark items as sold)
try:
await inventory.reserve_items(
order_data.items,
order_id=order.id
)
except Exception as e:
# Inventory reservation failed - need to refund!
await payments.refund(payment.id)
await orders.cancel_order(order.id)
raise HTTPException(
status_code=500,
detail="Order processing failed. Payment will be refunded."
)
# Success! All services coordinated successfully
return order
# WHY SERVICES MAKE THIS BETTER:
# 1. Each service is independently testable
# 2. Services can be mocked for testing
# 3. Business logic is reusable (CLI, background jobs)
# 4. Clear separation of concerns
# 5. Easy to add new services (shipping, tax, etc.)
class OrderService(Service):
def __init__(
self,
users: UserService,
products: ProductService,
notifications: NotificationService
):
"""Dependencies auto-injected by framework based on type hints."""
self.users = users
self.products = products
self.notifications = notifications
async def process_order(self, order_data: OrderCreate, user_id: int) -> Order:
# Get user (using nested service)
user = await self.users.get_user(user_id)
if not user:
raise ValueError("User not found")
# Validate products
for item in order_data.items:
product = await self.products.get_product(item.product_id)
if not product or not product.is_available:
raise ValueError(f"Product {item.product_id} not available")
# Create order
order = Order(
user_id=user_id,
items=order_data.items,
total=sum(item.price * item.quantity for item in order_data.items)
)
self.db.add(order)
await self.db.commit()
await self.db.refresh(order)
# Send confirmation (using nested service)
await self.notifications.send_order_confirmation(user, order)
return order
class BaseService(Service):
"""Base service with common functionality."""
async def log_action(self, action: str, entity_id: int):
"""Log action for audit trail."""
log = ActivityLog(
action=action,
entity_id=entity_id,
user_id=self.current_user.id if self.current_user else None,
timestamp=datetime.utcnow()
)
self.db.add(log)
await self.db.commit()
async def check_permission(self, permission: str) -> bool:
"""Check if current user has permission."""
if not self.current_user:
return False
return permission in self.current_user.permissions
class UserService(BaseService):
"""User service extending base functionality."""
async def update_user(self, user_id: int, updates: UserUpdate) -> User:
# Check permission (inherited method)
if not await self.check_permission('users:edit'):
raise PermissionError("Insufficient permissions")
user = await self.db.get(User, user_id)
if not user:
return None
# Update user
for field, value in updates.model_dump(exclude_unset=True).items():
setattr(user, field, value)
await self.db.commit()
# Log action (inherited method)
await self.log_action('user_updated', user_id)
return user
from zenith.exceptions import BusinessLogicError, ValidationError
class PaymentService(Service):
async def process_payment(
self,
user_id: int,
amount: float,
payment_method: str
) -> Payment:
try:
# Validate amount
if amount <= 0:
raise ValidationError("Amount must be positive")
# Get user
user = await self.db.get(User, user_id)
if not user:
raise ValidationError("User not found")
# Check payment method
if not user.has_payment_method(payment_method):
raise BusinessLogicError("Payment method not available")
# Process with external service
result = await self.payment_service.charge(
user.stripe_customer_id,
amount,
payment_method
)
if not result.success:
raise BusinessLogicError(f"Payment failed: {result.error}")
# Create payment record
payment = Payment(
user_id=user_id,
amount=amount,
payment_method=payment_method,
external_id=result.transaction_id,
status='completed'
)
self.db.add(payment)
await self.db.commit()
await self.db.refresh(payment)
return payment
except Exception as e:
# Log error for debugging
self.logger.error(f"Payment processing failed: {e}")
# Re-raise business logic errors as-is
if isinstance(e, (BusinessLogicError, ValidationError)):
raise
# Wrap other exceptions
raise BusinessLogicError("Payment processing failed") from e
from zenith import Zenith
from app.services import UserService, ProductService
app = Zenith()
# Configure service dependencies
app.configure_service(UserService, {
'cache_ttl': 300,
'audit_enabled': True
})
app.configure_service(ProductService, {
'enable_inventory_sync': True,
'price_change_notifications': True
})
class UserService(Service):
def __init__(self, config: dict | None = None):
"""Initialize with optional configuration."""
self.config = config or {}
self.audit_enabled = self.config.get('audit_enabled', False)
self.cache_ttl = self.config.get('cache_ttl', 60)
async def create_user(self, user_data: UserCreate) -> User:
user = User(**user_data.model_dump())
self.db.add(user)
await self.db.commit()
# Optional audit logging
if self.audit_enabled:
await self.log_user_creation(user)
return user
from zenith.testing import TestService
import pytest
@pytest.mark.asyncio
async def test_user_creation():
async with TestService(UserService) as users:
# Create test user
user_data = UserCreate(
email="test@example.com",
name="Test User"
)
user = await users.create_user(user_data)
assert user.email == "test@example.com"
assert user.name == "Test User"
assert user.id is not None
@pytest.mark.asyncio
async def test_user_service_with_mocks():
# Mock dependencies
mock_email_service = Mock()
mock_storage = Mock()
async with TestService(UserService, {
'email_service': mock_email_service,
'storage': mock_storage
}) as users:
user_data = UserCreate(
email="test@example.com",
name="Test User"
)
user = await users.register_user(user_data)
# Verify email service was called
mock_email_service.send_welcome_email.assert_called_once_with(user)
assert user.email == "test@example.com"
class UserService(Service):
async def bulk_create_users(self, users_data: List[UserCreate]) -> List[User]:
"""Efficient bulk user creation."""
users = [User(**data.model_dump()) for data in users_data]
# Use bulk operations
self.db.add_all(users)
await self.db.commit()
# Refresh all at once
for user in users:
await self.db.refresh(user)
return users
class ProductService(Service):
@property
def search_service(self):
"""Lazy-loaded search service."""
if not hasattr(self, '_search_service'):
self._search_service = SearchService()
return self._search_service
async def search_products(self, query: str) -> List[Product]:
# Search service only initialized when needed
return await self.search_service.search(query)
  1. Creation - Service instantiated with dependencies
  2. Database Session - Automatic session management
  3. Request Binding - Access to current request/user
  4. Method Execution - Business logic execution
  5. Cleanup - Automatic resource cleanup

Services handle the complete lifecycle automatically, ensuring proper resource management and transaction handling.