Service System
What are Services?
Section titled “What are Services?”Services are Zenith’s way of organizing business logic. They provide a clean separation between your domain logic and web layer, making your code more maintainable, testable, and reusable.
Why Use Services?
Section titled “Why Use Services?”Traditional Approach (Without Services)
Section titled “Traditional Approach (Without Services)”# Business logic mixed with web layer - messy and hard to maintain@app.post("/users")async def create_user(user_data: UserCreate, db: Session): # Problem 1: Business validation mixed with HTTP handling if len(user_data.password) < 8: raise HTTPException(400, "Password too short") # HTTP concern in business logic!
# Problem 2: Database queries directly in route handler existing = db.query(User).filter_by(email=user_data.email).first() if existing: raise HTTPException(400, "Email already exists") # More HTTP mixing
# Problem 3: Security logic (password hashing) in route password_hash = bcrypt.hashpw(user_data.password.encode(), bcrypt.gensalt())
# Problem 4: Database operations scattered everywhere user = User( email=user_data.email, password_hash=password_hash ) db.add(user) db.commit() # What if this fails?
# Problem 5: Side effects mixed in send_email(user.email, "Welcome!") # What if email fails?
# Problem 6: Logging scattered through routes logger.info(f"User created: {user.email}")
# Problem 7: How do you test this? Mock everything? # Problem 8: How do you reuse this logic in CLI commands? # Problem 9: How do you add more features without making this huge?
return userZenith Approach (With Services)
Section titled “Zenith Approach (With Services)”# Clean separation of concerns - business logic in serviceclass UserService(Service): """Encapsulates ALL user-related business logic.
Benefits: 1. Testable - mock only what you need 2. Reusable - same logic for API, CLI, background jobs 3. Maintainable - all user logic in one place 4. Scalable - easy to add features """
async def create_user(self, user_data: UserCreate) -> User: """Create a new user with all business rules applied."""
# Step 1: Validate business rules (not HTTP validation!) await self._validate_password(user_data.password) await self._ensure_email_unique(user_data.email)
# Step 2: Core business operation user = await self._save_user(user_data)
# Step 3: Side effects (organized, not scattered) await self._send_welcome_email(user) await self._log_user_creation(user)
return user # Just the user, no HTTP concerns
# Private methods organize complex logic async def _validate_password(self, password: str): """Business rule: passwords must be secure.""" if len(password) < 8: raise ValueError("Password must be at least 8 characters") # Domain exception if not any(c.isdigit() for c in password): raise ValueError("Password must contain a number")
async def _ensure_email_unique(self, email: str): """Business rule: emails must be unique.""" if await self.db.exists(User, email=email): raise ValueError(f"Email {email} already registered") # Domain exception
# Route handler becomes trivially simple@app.post("/users")async def create_user( user_data: UserCreate, users: UserService = Inject(UserService) # Service injected automatically) -> User: # That's it! Route only handles HTTP concerns: # 1. Receive request (handled by Zenith) # 2. Call business logic # 3. Return response (handled by Zenith) return await users.create_user(user_data)
# Now you can reuse the same logic everywhere:# CLI command@cli.command()async def create_admin(email: str, password: str): users = UserService() await users.create_user(UserCreate(email=email, password=password, role="admin"))
# Background job@task_queue.jobasync def import_users(csv_file: str): users = UserService() for row in read_csv(csv_file): await users.create_user(UserCreate(**row))Creating Services
Section titled “Creating Services”Basic Service
Section titled “Basic Service”from zenith import Service, Injectfrom typing import List, Optionalfrom datetime import datetime
class ProductService(Service): """Handles all product-related business logic.
Services in Zenith: - Inherit from Service base class - Get automatic dependency injection - Have access to self.db (database session) - Can emit and listen to events - Are singletons (one instance shared across requests) """
async def list_products( self, category: Optional[str] = None, limit: int = 100 ) -> List[Product]: """List products with optional filtering.
This method shows: - Query building based on parameters - Automatic limit for performance - Type-safe return values """
# Start with base query query = select(Product)
# Add filters conditionally (clean pattern) if category: query = query.where(Product.category == category)
# Always limit results to prevent memory issues # Business rule: never return more than 100 products at once products = await self.db.exec(query.limit(min(limit, 100)))
return products.all() # Returns List[Product] automatically
async def get_product(self, product_id: int) -> Optional[Product]: """Get a single product by ID.
Returns None if not found - let the route decide if that should be a 404 or something else. """
# self.db is automatically injected and managed return await self.db.get(Product, product_id)
async def create_product(self, data: ProductCreate) -> Product: """Create a new product with validation.
Shows the typical create pattern: 1. Validate business rules 2. Create entity 3. Save to database 4. Return fresh entity with ID """
# Business validation (not HTTP validation) if data.price < 0: raise ValueError("Product price cannot be negative")
if await self._product_name_exists(data.name): raise ValueError(f"Product '{data.name}' already exists")
# Create the entity product = Product(**data.model_dump()) product.created_at = datetime.utcnow() # Add metadata
# Database operations self.db.add(product) # Stage for insert await self.db.commit() # Save to database await self.db.refresh(product) # Get generated ID and defaults
# Emit event for other parts of the system await self.events.emit("product.created", product)
return product # Now has ID from database
async def update_stock(self, product_id: int, quantity: int) -> Product: """Update product stock level - shows complex business operation.
This method demonstrates: - Fetching before updating - Business rule validation - Audit trail (updated_at) - Event emission for side effects """
# Step 1: Fetch the product (reuse existing method) product = await self.get_product(product_id)
# Step 2: Validate it exists (business rule) if not product: raise ValueError(f"Product {product_id} not found")
# Step 3: Validate business rules if quantity < 0: raise ValueError("Stock quantity cannot be negative")
# Step 4: Check for low stock warning old_quantity = product.stock_quantity if old_quantity > 10 and quantity <= 10: # Business event: stock is running low await self.events.emit("product.stock_low", { "product_id": product_id, "old_quantity": old_quantity, "new_quantity": quantity })
# Step 5: Update the entity product.stock_quantity = quantity product.updated_at = datetime.utcnow() # Audit trail
# Step 6: Save changes await self.db.commit() await self.db.refresh(product)
# Step 7: Emit event for other systems (inventory, analytics, etc.) await self.events.emit("product.stock_updated", { "product_id": product_id, "old_quantity": old_quantity, "new_quantity": quantity, "updated_by": self.current_user.id if self.current_user else None })
return product
# Private helper methods keep public API clean async def _product_name_exists(self, name: str) -> bool: """Check if product name already exists.""" query = select(Product).where(Product.name == name) result = await self.db.exec(query).first() return result is not NoneService with Dependencies (Composing Services)
Section titled “Service with Dependencies (Composing Services)”class OrderService(Service): """Handles order processing logic.
This service orchestrates multiple other services to complete a complex business operation. """
def __init__( self, container: DIContainer | None = None, products: ProductService | None = None, payments: PaymentService | None = None, notifications: NotificationService | None = None ): """Initialize with optional dependencies.
Dependency injection benefits: 1. Easy to test (mock dependencies) 2. Loose coupling 3. Single responsibility 4. Easy to swap implementations
Args: container: DI container (automatically provided in routes) products: Product service (optional, for testing) payments: Payment service (optional, for testing) notifications: Notification service (optional, for testing) """ super().__init__(container) self.products = products self.payments = payments self.notifications = notifications
async def create_order(self, order_data: OrderCreate) -> Order: """Create a new order with full processing.
This shows a complex business transaction that: 1. Validates inventory 2. Processes payment 3. Creates order 4. Updates inventory 5. Sends notifications
If ANY step fails, everything rolls back! """
# Use database transaction for consistency async with self.db.begin(): # Auto-rollback on error
# Step 1: Validate all products exist and have stock for item in order_data.items: # Reuse ProductService logic product = await self.products.get_product(item.product_id)
# Business validation if not product: raise ValueError(f"Product {item.product_id} not found")
if product.stock_quantity < item.quantity: # Detailed error for better UX raise ValueError( f"Insufficient stock for {product.name}: " f"requested {item.quantity}, available {product.stock_quantity}" )
# Additional business rule: max quantity per order if item.quantity > 100: raise ValueError(f"Maximum 100 units per item")
# Step 2: Calculate order total total = await self._calculate_total(order_data.items)
# Business rule: minimum order amount if total < 10.00: raise ValueError("Minimum order amount is $10.00")
# Step 3: Process payment (might fail!) try: payment = await self.payments.process_payment( amount=total, payment_method=order_data.payment_method, customer_id=order_data.customer_id ) except PaymentFailedError as e: # Log for monitoring logger.warning(f"Payment failed for customer {order_data.customer_id}: {e}") # Re-raise with better message raise ValueError(f"Payment processing failed: {e}")
# Step 4: Create the order record order = Order( customer_id=order_data.customer_id, items=order_data.items, total=total, payment_id=payment.id, status=OrderStatus.CONFIRMED, created_at=datetime.utcnow() )
self.db.add(order) await self.db.commit() # Get order ID
# Step 5: Update inventory for each item for item in order_data.items: # This also emits stock events await self.products.update_stock( item.product_id, product.stock_quantity - item.quantity # Reduce stock )
# Step 6: Send confirmation (non-critical) try: await self.notifications.send_order_confirmation(order) except Exception as e: # Don't fail order for notification failure logger.error(f"Failed to send confirmation for order {order.id}: {e}") # Could queue for retry instead await self.queue_notification_retry(order.id)
# Step 7: Emit domain event await self.events.emit("order.created", { "order_id": order.id, "customer_id": order.customer_id, "total": order.total, "item_count": len(order.items) })
return order
async def _calculate_total(self, items: List[OrderItem]) -> float: """Calculate order total with business rules.""" total = 0.0
for item in items: product = await self.products.get_product(item.product_id)
# Apply quantity discounts unit_price = product.price if item.quantity >= 10: unit_price *= 0.95 # 5% discount for 10+ if item.quantity >= 50: unit_price *= 0.90 # 10% discount for 50+
total += unit_price * item.quantity
return round(total, 2) # Always round money to 2 decimalsService Patterns
Section titled “Service Patterns”Repository Pattern (Advanced Data Layer Separation)
Section titled “Repository Pattern (Advanced Data Layer Separation)”class UserRepository: """Data access layer for users.
Repository pattern benefits: 1. Separates SQL from business logic 2. Makes testing easier (mock the repository) 3. Can swap data stores (PostgreSQL → MongoDB) 4. Centralizes query optimization """
def __init__(self, db: Session): self.db = db
async def find_by_email(self, email: str) -> Optional[User]: """Find user by email address.
This method only knows about data access, not business rules. """ # Build the query statement = select(User).where(User.email == email)
# Execute and return first result (or None) result = await self.db.exec(statement) return result.first()
async def find_by_id(self, user_id: int) -> Optional[User]: """Find user by primary key.""" # Use ORM's optimized get method return await self.db.get(User, user_id)
async def find_active_users( self, limit: int = 100, offset: int = 0 ) -> List[User]: """Find all active users with pagination.""" statement = ( select(User) .where(User.is_active == True) .order_by(User.created_at.desc()) # Newest first .limit(limit) .offset(offset) ) result = await self.db.exec(statement) return result.all()
async def save(self, user: User) -> User: """Persist user to database.""" self.db.add(user) # Add to session await self.db.commit() # Save to database await self.db.refresh(user) # Get generated fields (ID, timestamps) return user
async def delete(self, user: User) -> None: """Remove user from database.""" await self.db.delete(user) await self.db.commit()
# Complex queries stay in repository async def find_users_with_expired_subscriptions(self) -> List[User]: """Find users whose subscriptions have expired.
Complex SQL stays in repository, not in service! """ statement = ( select(User) .join(Subscription) .where(Subscription.expires_at < datetime.utcnow()) .where(Subscription.status == "active") ) result = await self.db.exec(statement) return result.all()
class UserService(Service): """Business logic layer for users.
Service uses repository for data access, focuses on business rules and orchestration. """
def __init__(self, repo: UserRepository = Inject(UserService)): """Initialize with repository dependency.""" self.repo = repo
async def get_user_by_email(self, email: str) -> User: """Get user by email with business validation.
Service adds business logic on top of repository. """ # Validate input (business rule) if not self._is_valid_email(email): raise ValueError(f"Invalid email format: {email}")
# Use repository for data access user = await self.repo.find_by_email(email)
# Apply business rule: user must exist if not user: raise UserNotFoundError(f"No user with email {email}")
# Apply business rule: account must be active if not user.is_active: raise UserInactiveError(f"User account is deactivated")
return user
async def deactivate_expired_users(self) -> int: """Deactivate users with expired subscriptions.
Business logic that uses repository's complex query. """ # Get expired users from repository expired_users = await self.repo.find_users_with_expired_subscriptions()
count = 0 for user in expired_users: # Apply business rules if user.is_premium: # Don't deactivate premium users await self._send_renewal_reminder(user) continue
# Deactivate user user.is_active = False user.deactivated_at = datetime.utcnow() await self.repo.save(user)
# Send notification await self._send_deactivation_notice(user) count += 1
logger.info(f"Deactivated {count} users with expired subscriptions") return count
def _is_valid_email(self, email: str) -> bool: """Validate email format (business rule).""" import re pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' return re.match(pattern, email) is not NoneEvent-Driven Services
Section titled “Event-Driven Services”Services can emit and listen to events:
class NotificationService(Service): """Handles notification logic."""
async def on_startup(self): """Subscribe to events on startup.""" self.events.subscribe("user.created", self.send_welcome_email) self.events.subscribe("order.completed", self.send_order_email) self.events.subscribe("payment.failed", self.send_payment_failed_email)
async def send_welcome_email(self, event: Event): """Send welcome email when user is created.""" user = event.data await self.email_service.send( to=user.email, subject="Welcome to our platform!", template="welcome", context={"user": user} )Standalone Service Usage (Outside DI)
Section titled “Standalone Service Usage (Outside DI)”Sometimes you need to use Services outside of route handlers - in helper functions, middleware, background jobs, or CLI commands. The Service.create() factory method makes this easy:
# In a helper function (not a route handler)async def get_user_from_request(request: Request) -> User | None: """Extract and validate user from API key in request headers."""
# Get API key from header api_key = request.headers.get("X-API-Key") if not api_key: return None
# Create service instance for standalone usage auth_service = await ApiKeyService.create()
# Use service business logic user = await auth_service.validate_api_key(api_key) return user
# In middlewareclass CustomAuthMiddleware: async def __call__(self, request: Request, call_next): # Use Service.create() to access business logic user_service = await UserService.create()
# Authenticate user using service user = await user_service.authenticate_from_request(request)
# Attach to request request.state.user = user
return await call_next(request)
# In a background jobasync def send_daily_reports(): """Background task that sends daily reports to active users."""
# Create services without DI users = await UserService.create() reports = await ReportService.create()
# Use business logic active_users = await users.get_active_users() for user in active_users: report = await reports.generate_daily_report(user.id) await reports.send_email(user.email, report)
# In a CLI commandasync def cli_create_user(email: str, name: str): """CLI command to create a user."""
# Create service for CLI usage users = await UserService.create()
# Use same business logic as the API try: user = await users.create_user(UserCreate(email=email, name=name)) print(f"Created user: {user.email}") except ValueError as e: print(f"Error: {e}")How it works:
Service.create()creates a new instance of the Service- Calls
initialize()automatically - Returns a ready-to-use service without DI container
- Service has
container = Noneandevents = None - Perfect for standalone usage patterns
Example: Refactoring from direct DB queries to Service:
# Before: Direct database queries (Issue #2 pattern)async def get_user_from_request(request: Request) -> User | None: authorization = request.headers.get("authorization") if not authorization or not authorization.startswith("Bearer "): return None
key = authorization[7:]
# Direct database query - business logic scattered api_key = await ApiKey.find_by(key=key, is_active=True) if not api_key: return None
return await User.find(api_key.user_id)
# After: Using Service.create() (clean business logic)async def get_user_from_request(request: Request) -> User | None: authorization = request.headers.get("authorization") if not authorization or not authorization.startswith("Bearer "): return None
key = authorization[7:]
# Use service for business logic auth_service = await ApiKeyService.create() return await auth_service.get_user_by_key(key)This pattern was introduced to solve Issue #2 from real-world usage.
Testing Services
Section titled “Testing Services”Services are easy to test in isolation:
import pytestfrom zenith.testing import TestService
@pytest.mark.asyncioasync def test_user_creation(): """Test user creation logic.
TestService provides: - Isolated database (in-memory) - Automatic cleanup - Mocked dependencies - No HTTP layer needed """
# TestService creates isolated environment for testing async with TestService(UserService) as users: # Step 1: Prepare test data user_data = UserCreate( email="test@example.com", password="securepassword123", # Will be hashed name="Test User" )
# Step 2: Execute business logic (no HTTP!) user = await users.create_user(user_data)
# Step 3: Assert business rules were applied assert user.email == "test@example.com" assert user.name == "Test User" assert user.id is not None # Database generated ID
# Step 4: Verify security was applied assert user.password_hash != "securepassword123" # Must be hashed assert user.password_hash.startswith("$2b$") # bcrypt format
# Step 5: Verify side effects (if needed) # In real test, you might check: # - Email was queued # - Event was emitted # - Audit log was created
@pytest.mark.asyncioasync def test_duplicate_email(): """Test that duplicate emails are rejected.
This tests a business rule, not HTTP validation! """
async with TestService(UserService) as users: # Create first user successfully await users.create_user(UserCreate( email="test@example.com", password="password123", name="First User" ))
# Try to create second user with same email # Note: pytest.raises catches the domain exception with pytest.raises(ValueError, match="already exists"): await users.create_user(UserCreate( email="test@example.com", # Duplicate! password="password456", name="Second User" ))
@pytest.mark.asyncioasync def test_order_processing(): """Test complex order processing with mocked dependencies."""
async with TestService(OrderService) as orders: # Mock the payment service orders.payments.process_payment = AsyncMock( return_value=Payment(id=123, status="completed") )
# Mock the notification service orders.notifications.send_order_confirmation = AsyncMock()
# Create test order order_data = OrderCreate( customer_id=1, items=[ OrderItem(product_id=1, quantity=2), OrderItem(product_id=2, quantity=1) ], payment_method="credit_card" )
# Process order order = await orders.create_order(order_data)
# Verify order was created assert order.id is not None assert order.status == OrderStatus.CONFIRMED
# Verify payment was processed orders.payments.process_payment.assert_called_once()
# Verify notification was sent orders.notifications.send_order_confirmation.assert_called_once_with(order)
# Test error handling@pytest.mark.asyncioasync def test_order_fails_on_insufficient_stock(): """Test that orders fail gracefully when stock is insufficient."""
async with TestService(OrderService) as orders: # Setup: Create product with limited stock product = await orders.products.create_product( ProductCreate(name="Limited Item", price=10.00, stock_quantity=1) )
# Try to order more than available order_data = OrderCreate( customer_id=1, items=[OrderItem(product_id=product.id, quantity=5)], # Want 5, have 1 payment_method="credit_card" )
# Should raise business error with pytest.raises(ValueError, match="Insufficient stock"): await orders.create_order(order_data)
# Verify stock wasn't changed updated_product = await orders.products.get_product(product.id) assert updated_product.stock_quantity == 1 # Still 1Advanced Patterns
Section titled “Advanced Patterns”Service Composition (Complex Business Operations)
Section titled “Service Composition (Complex Business Operations)”class CheckoutService(Service): """Orchestrates the entire checkout process.
This pattern is called 'Service Orchestration': - One service coordinates multiple others - Handles complex transactions - Manages rollback on failure - Perfect for multi-step business processes """
def __init__( self, # Each service handles its own domain cart: CartService = Inject(CartService), inventory: InventoryService = Inject(InventoryService), payment: PaymentService = Inject(PaymentService), shipping: ShippingService = Inject(ShippingService), email: EmailService = Inject(EmailService) ): """Initialize with all required services.
Each service is independently testable and reusable. """ super().__init__() self.cart = cart self.inventory = inventory self.payment = payment self.shipping = shipping self.email = email
async def process_checkout( self, user_id: int, payment_info: PaymentInfo, shipping_info: ShippingInfo ) -> Order: """Complete the entire checkout process.
This method demonstrates: 1. Database transactions for consistency 2. Inventory reservation pattern 3. Payment processing with rollback 4. Multi-service coordination 5. Error recovery strategies """
# Start database transaction for all-or-nothing execution async with self.db.transaction(): # Step 1: Get and validate cart cart_items = await self.cart.get_items(user_id)
if not cart_items: raise ValueError("Cart is empty")
# Business rule: max items per order if len(cart_items) > 100: raise ValueError("Maximum 100 items per order")
# Step 2: Reserve inventory (prevents overselling) # This is the 'two-phase commit' pattern reservations = await self.inventory.reserve_items(cart_items)
try: # Step 3: Calculate final price (might include discounts, taxes) subtotal = self.calculate_subtotal(cart_items) tax = self.calculate_tax(subtotal, shipping_info.state) shipping_cost = await self.shipping.calculate_cost( items=cart_items, destination=shipping_info ) total = subtotal + tax + shipping_cost
# Step 4: Process payment (external service, might fail!) logger.info(f"Processing payment of ${total} for user {user_id}")
payment = await self.payment.charge( amount=total, payment_info=payment_info, metadata={ "user_id": user_id, "item_count": len(cart_items) } )
# Step 5: Create the order (payment succeeded) order = await self.create_order( user_id=user_id, items=cart_items, payment_id=payment.id, subtotal=subtotal, tax=tax, shipping_cost=shipping_cost, total=total )
# Step 6: Convert reservations to confirmed inventory deduction await self.inventory.confirm_reservations(reservations)
# Step 7: Schedule shipping shipment = await self.shipping.schedule( order=order, shipping_info=shipping_info, items=cart_items ) order.shipment_id = shipment.id
# Step 8: Clear the cart (order complete) await self.cart.clear(user_id)
# Step 9: Send confirmation email (non-critical) try: await self.email.send_order_confirmation( order=order, user_email=await self.get_user_email(user_id) ) except Exception as e: # Don't fail order for email issues logger.error(f"Failed to send confirmation email: {e}") # Queue for retry later await self.queue.enqueue( "send_order_confirmation", order_id=order.id )
# Step 10: Emit success event await self.events.emit("checkout.completed", { "order_id": order.id, "user_id": user_id, "total": total, "item_count": len(cart_items) })
logger.info(f"Checkout completed: Order {order.id} for user {user_id}") return order
except PaymentFailedError as e: # Payment failed - rollback everything! logger.warning(f"Payment failed for user {user_id}: {e}")
# Release the inventory reservations await self.inventory.release_reservations(reservations)
# Emit failure event for monitoring await self.events.emit("checkout.payment_failed", { "user_id": user_id, "error": str(e), "amount": total })
# Re-raise with user-friendly message raise ValueError( "Payment could not be processed. " "Please check your payment information and try again." )
except Exception as e: # Any other error - ensure cleanup logger.error(f"Checkout failed for user {user_id}: {e}")
# Always release reservations on failure await self.inventory.release_reservations(reservations)
# Emit generic failure event await self.events.emit("checkout.failed", { "user_id": user_id, "error": str(e) })
raise # Re-raise original errorService Middleware
Section titled “Service Middleware”Add cross-cutting concerns to all context methods:
class AuditedService(Service): """Base service with audit logging."""
async def __call__(self, method_name: str, *args, **kwargs): """Log all service method calls.""" start_time = time.time() user = self.request.user if hasattr(self.request, 'user') else None
try: result = await super().__call__(method_name, *args, **kwargs)
# Log success await self.audit_log.create( user_id=user.id if user else None, action=f"{self.__class__.__name__}.{method_name}", status="success", duration=time.time() - start_time )
return result
except Exception as e: # Log failure await self.audit_log.create( user_id=user.id if user else None, action=f"{self.__class__.__name__}.{method_name}", status="error", error=str(e), duration=time.time() - start_time ) raiseBest Practices
Section titled “Best Practices”- Keep services focused on a single domain
- Use type hints for all methods
- Write comprehensive tests for service logic
- Use dependency injection for flexibility
- Document complex business rules
Don’ts
Section titled “Don’ts”- Don’t import web framework code in services
- Don’t handle HTTP errors in services
- Don’t make services too large (split if needed)
- Don’t mix different domain concerns
- Don’t bypass services for business logic
Next Steps
Section titled “Next Steps”- Learn about Dependency Injection
- Explore Testing Services
- See Real-world Examples