Skip to content

Service System

Services are Zenith’s way of organizing business logic. They provide a clean separation between your domain logic and web layer, making your code more maintainable, testable, and reusable.

# Business logic mixed with web layer - messy and hard to maintain
@app.post("/users")
async def create_user(user_data: UserCreate, db: Session):
# Problem 1: Business validation mixed with HTTP handling
if len(user_data.password) < 8:
raise HTTPException(400, "Password too short") # HTTP concern in business logic!
# Problem 2: Database queries directly in route handler
existing = db.query(User).filter_by(email=user_data.email).first()
if existing:
raise HTTPException(400, "Email already exists") # More HTTP mixing
# Problem 3: Security logic (password hashing) in route
password_hash = bcrypt.hashpw(user_data.password.encode(), bcrypt.gensalt())
# Problem 4: Database operations scattered everywhere
user = User(
email=user_data.email,
password_hash=password_hash
)
db.add(user)
db.commit() # What if this fails?
# Problem 5: Side effects mixed in
send_email(user.email, "Welcome!") # What if email fails?
# Problem 6: Logging scattered through routes
logger.info(f"User created: {user.email}")
# Problem 7: How do you test this? Mock everything?
# Problem 8: How do you reuse this logic in CLI commands?
# Problem 9: How do you add more features without making this huge?
return user
# Clean separation of concerns - business logic in service
class UserService(Service):
"""Encapsulates ALL user-related business logic.
Benefits:
1. Testable - mock only what you need
2. Reusable - same logic for API, CLI, background jobs
3. Maintainable - all user logic in one place
4. Scalable - easy to add features
"""
async def create_user(self, user_data: UserCreate) -> User:
"""Create a new user with all business rules applied."""
# Step 1: Validate business rules (not HTTP validation!)
await self._validate_password(user_data.password)
await self._ensure_email_unique(user_data.email)
# Step 2: Core business operation
user = await self._save_user(user_data)
# Step 3: Side effects (organized, not scattered)
await self._send_welcome_email(user)
await self._log_user_creation(user)
return user # Just the user, no HTTP concerns
# Private methods organize complex logic
async def _validate_password(self, password: str):
"""Business rule: passwords must be secure."""
if len(password) < 8:
raise ValueError("Password must be at least 8 characters") # Domain exception
if not any(c.isdigit() for c in password):
raise ValueError("Password must contain a number")
async def _ensure_email_unique(self, email: str):
"""Business rule: emails must be unique."""
if await self.db.exists(User, email=email):
raise ValueError(f"Email {email} already registered") # Domain exception
# Route handler becomes trivially simple
@app.post("/users")
async def create_user(
user_data: UserCreate,
users: UserService = Inject(UserService) # Service injected automatically
) -> User:
# That's it! Route only handles HTTP concerns:
# 1. Receive request (handled by Zenith)
# 2. Call business logic
# 3. Return response (handled by Zenith)
return await users.create_user(user_data)
# Now you can reuse the same logic everywhere:
# CLI command
@cli.command()
async def create_admin(email: str, password: str):
users = UserService()
await users.create_user(UserCreate(email=email, password=password, role="admin"))
# Background job
@task_queue.job
async def import_users(csv_file: str):
users = UserService()
for row in read_csv(csv_file):
await users.create_user(UserCreate(**row))
from zenith import Service, Inject
from typing import List, Optional
from datetime import datetime
class ProductService(Service):
"""Handles all product-related business logic.
Services in Zenith:
- Inherit from Service base class
- Get automatic dependency injection
- Have access to self.db (database session)
- Can emit and listen to events
- Are singletons (one instance shared across requests)
"""
async def list_products(
self,
category: Optional[str] = None,
limit: int = 100
) -> List[Product]:
"""List products with optional filtering.
This method shows:
- Query building based on parameters
- Automatic limit for performance
- Type-safe return values
"""
# Start with base query
query = select(Product)
# Add filters conditionally (clean pattern)
if category:
query = query.where(Product.category == category)
# Always limit results to prevent memory issues
# Business rule: never return more than 100 products at once
products = await self.db.exec(query.limit(min(limit, 100)))
return products.all() # Returns List[Product] automatically
async def get_product(self, product_id: int) -> Optional[Product]:
"""Get a single product by ID.
Returns None if not found - let the route decide
if that should be a 404 or something else.
"""
# self.db is automatically injected and managed
return await self.db.get(Product, product_id)
async def create_product(self, data: ProductCreate) -> Product:
"""Create a new product with validation.
Shows the typical create pattern:
1. Validate business rules
2. Create entity
3. Save to database
4. Return fresh entity with ID
"""
# Business validation (not HTTP validation)
if data.price < 0:
raise ValueError("Product price cannot be negative")
if await self._product_name_exists(data.name):
raise ValueError(f"Product '{data.name}' already exists")
# Create the entity
product = Product(**data.model_dump())
product.created_at = datetime.utcnow() # Add metadata
# Database operations
self.db.add(product) # Stage for insert
await self.db.commit() # Save to database
await self.db.refresh(product) # Get generated ID and defaults
# Emit event for other parts of the system
await self.events.emit("product.created", product)
return product # Now has ID from database
async def update_stock(self, product_id: int, quantity: int) -> Product:
"""Update product stock level - shows complex business operation.
This method demonstrates:
- Fetching before updating
- Business rule validation
- Audit trail (updated_at)
- Event emission for side effects
"""
# Step 1: Fetch the product (reuse existing method)
product = await self.get_product(product_id)
# Step 2: Validate it exists (business rule)
if not product:
raise ValueError(f"Product {product_id} not found")
# Step 3: Validate business rules
if quantity < 0:
raise ValueError("Stock quantity cannot be negative")
# Step 4: Check for low stock warning
old_quantity = product.stock_quantity
if old_quantity > 10 and quantity <= 10:
# Business event: stock is running low
await self.events.emit("product.stock_low", {
"product_id": product_id,
"old_quantity": old_quantity,
"new_quantity": quantity
})
# Step 5: Update the entity
product.stock_quantity = quantity
product.updated_at = datetime.utcnow() # Audit trail
# Step 6: Save changes
await self.db.commit()
await self.db.refresh(product)
# Step 7: Emit event for other systems (inventory, analytics, etc.)
await self.events.emit("product.stock_updated", {
"product_id": product_id,
"old_quantity": old_quantity,
"new_quantity": quantity,
"updated_by": self.current_user.id if self.current_user else None
})
return product
# Private helper methods keep public API clean
async def _product_name_exists(self, name: str) -> bool:
"""Check if product name already exists."""
query = select(Product).where(Product.name == name)
result = await self.db.exec(query).first()
return result is not None

Service with Dependencies (Composing Services)

Section titled “Service with Dependencies (Composing Services)”
class OrderService(Service):
"""Handles order processing logic.
This service orchestrates multiple other services
to complete a complex business operation.
"""
def __init__(
self,
container: DIContainer | None = None,
products: ProductService | None = None,
payments: PaymentService | None = None,
notifications: NotificationService | None = None
):
"""Initialize with optional dependencies.
Dependency injection benefits:
1. Easy to test (mock dependencies)
2. Loose coupling
3. Single responsibility
4. Easy to swap implementations
Args:
container: DI container (automatically provided in routes)
products: Product service (optional, for testing)
payments: Payment service (optional, for testing)
notifications: Notification service (optional, for testing)
"""
super().__init__(container)
self.products = products
self.payments = payments
self.notifications = notifications
async def create_order(self, order_data: OrderCreate) -> Order:
"""Create a new order with full processing.
This shows a complex business transaction that:
1. Validates inventory
2. Processes payment
3. Creates order
4. Updates inventory
5. Sends notifications
If ANY step fails, everything rolls back!
"""
# Use database transaction for consistency
async with self.db.begin(): # Auto-rollback on error
# Step 1: Validate all products exist and have stock
for item in order_data.items:
# Reuse ProductService logic
product = await self.products.get_product(item.product_id)
# Business validation
if not product:
raise ValueError(f"Product {item.product_id} not found")
if product.stock_quantity < item.quantity:
# Detailed error for better UX
raise ValueError(
f"Insufficient stock for {product.name}: "
f"requested {item.quantity}, available {product.stock_quantity}"
)
# Additional business rule: max quantity per order
if item.quantity > 100:
raise ValueError(f"Maximum 100 units per item")
# Step 2: Calculate order total
total = await self._calculate_total(order_data.items)
# Business rule: minimum order amount
if total < 10.00:
raise ValueError("Minimum order amount is $10.00")
# Step 3: Process payment (might fail!)
try:
payment = await self.payments.process_payment(
amount=total,
payment_method=order_data.payment_method,
customer_id=order_data.customer_id
)
except PaymentFailedError as e:
# Log for monitoring
logger.warning(f"Payment failed for customer {order_data.customer_id}: {e}")
# Re-raise with better message
raise ValueError(f"Payment processing failed: {e}")
# Step 4: Create the order record
order = Order(
customer_id=order_data.customer_id,
items=order_data.items,
total=total,
payment_id=payment.id,
status=OrderStatus.CONFIRMED,
created_at=datetime.utcnow()
)
self.db.add(order)
await self.db.commit() # Get order ID
# Step 5: Update inventory for each item
for item in order_data.items:
# This also emits stock events
await self.products.update_stock(
item.product_id,
product.stock_quantity - item.quantity # Reduce stock
)
# Step 6: Send confirmation (non-critical)
try:
await self.notifications.send_order_confirmation(order)
except Exception as e:
# Don't fail order for notification failure
logger.error(f"Failed to send confirmation for order {order.id}: {e}")
# Could queue for retry instead
await self.queue_notification_retry(order.id)
# Step 7: Emit domain event
await self.events.emit("order.created", {
"order_id": order.id,
"customer_id": order.customer_id,
"total": order.total,
"item_count": len(order.items)
})
return order
async def _calculate_total(self, items: List[OrderItem]) -> float:
"""Calculate order total with business rules."""
total = 0.0
for item in items:
product = await self.products.get_product(item.product_id)
# Apply quantity discounts
unit_price = product.price
if item.quantity >= 10:
unit_price *= 0.95 # 5% discount for 10+
if item.quantity >= 50:
unit_price *= 0.90 # 10% discount for 50+
total += unit_price * item.quantity
return round(total, 2) # Always round money to 2 decimals

Repository Pattern (Advanced Data Layer Separation)

Section titled “Repository Pattern (Advanced Data Layer Separation)”
class UserRepository:
"""Data access layer for users.
Repository pattern benefits:
1. Separates SQL from business logic
2. Makes testing easier (mock the repository)
3. Can swap data stores (PostgreSQL → MongoDB)
4. Centralizes query optimization
"""
def __init__(self, db: Session):
self.db = db
async def find_by_email(self, email: str) -> Optional[User]:
"""Find user by email address.
This method only knows about data access,
not business rules.
"""
# Build the query
statement = select(User).where(User.email == email)
# Execute and return first result (or None)
result = await self.db.exec(statement)
return result.first()
async def find_by_id(self, user_id: int) -> Optional[User]:
"""Find user by primary key."""
# Use ORM's optimized get method
return await self.db.get(User, user_id)
async def find_active_users(
self,
limit: int = 100,
offset: int = 0
) -> List[User]:
"""Find all active users with pagination."""
statement = (
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc()) # Newest first
.limit(limit)
.offset(offset)
)
result = await self.db.exec(statement)
return result.all()
async def save(self, user: User) -> User:
"""Persist user to database."""
self.db.add(user) # Add to session
await self.db.commit() # Save to database
await self.db.refresh(user) # Get generated fields (ID, timestamps)
return user
async def delete(self, user: User) -> None:
"""Remove user from database."""
await self.db.delete(user)
await self.db.commit()
# Complex queries stay in repository
async def find_users_with_expired_subscriptions(self) -> List[User]:
"""Find users whose subscriptions have expired.
Complex SQL stays in repository, not in service!
"""
statement = (
select(User)
.join(Subscription)
.where(Subscription.expires_at < datetime.utcnow())
.where(Subscription.status == "active")
)
result = await self.db.exec(statement)
return result.all()
class UserService(Service):
"""Business logic layer for users.
Service uses repository for data access,
focuses on business rules and orchestration.
"""
def __init__(self, repo: UserRepository = Inject(UserService)):
"""Initialize with repository dependency."""
self.repo = repo
async def get_user_by_email(self, email: str) -> User:
"""Get user by email with business validation.
Service adds business logic on top of repository.
"""
# Validate input (business rule)
if not self._is_valid_email(email):
raise ValueError(f"Invalid email format: {email}")
# Use repository for data access
user = await self.repo.find_by_email(email)
# Apply business rule: user must exist
if not user:
raise UserNotFoundError(f"No user with email {email}")
# Apply business rule: account must be active
if not user.is_active:
raise UserInactiveError(f"User account is deactivated")
return user
async def deactivate_expired_users(self) -> int:
"""Deactivate users with expired subscriptions.
Business logic that uses repository's complex query.
"""
# Get expired users from repository
expired_users = await self.repo.find_users_with_expired_subscriptions()
count = 0
for user in expired_users:
# Apply business rules
if user.is_premium: # Don't deactivate premium users
await self._send_renewal_reminder(user)
continue
# Deactivate user
user.is_active = False
user.deactivated_at = datetime.utcnow()
await self.repo.save(user)
# Send notification
await self._send_deactivation_notice(user)
count += 1
logger.info(f"Deactivated {count} users with expired subscriptions")
return count
def _is_valid_email(self, email: str) -> bool:
"""Validate email format (business rule)."""
import re
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return re.match(pattern, email) is not None

Services can emit and listen to events:

class NotificationService(Service):
"""Handles notification logic."""
async def on_startup(self):
"""Subscribe to events on startup."""
self.events.subscribe("user.created", self.send_welcome_email)
self.events.subscribe("order.completed", self.send_order_email)
self.events.subscribe("payment.failed", self.send_payment_failed_email)
async def send_welcome_email(self, event: Event):
"""Send welcome email when user is created."""
user = event.data
await self.email_service.send(
to=user.email,
subject="Welcome to our platform!",
template="welcome",
context={"user": user}
)

Sometimes you need to use Services outside of route handlers - in helper functions, middleware, background jobs, or CLI commands. The Service.create() factory method makes this easy:

# In a helper function (not a route handler)
async def get_user_from_request(request: Request) -> User | None:
"""Extract and validate user from API key in request headers."""
# Get API key from header
api_key = request.headers.get("X-API-Key")
if not api_key:
return None
# Create service instance for standalone usage
auth_service = await ApiKeyService.create()
# Use service business logic
user = await auth_service.validate_api_key(api_key)
return user
# In middleware
class CustomAuthMiddleware:
async def __call__(self, request: Request, call_next):
# Use Service.create() to access business logic
user_service = await UserService.create()
# Authenticate user using service
user = await user_service.authenticate_from_request(request)
# Attach to request
request.state.user = user
return await call_next(request)
# In a background job
async def send_daily_reports():
"""Background task that sends daily reports to active users."""
# Create services without DI
users = await UserService.create()
reports = await ReportService.create()
# Use business logic
active_users = await users.get_active_users()
for user in active_users:
report = await reports.generate_daily_report(user.id)
await reports.send_email(user.email, report)
# In a CLI command
async def cli_create_user(email: str, name: str):
"""CLI command to create a user."""
# Create service for CLI usage
users = await UserService.create()
# Use same business logic as the API
try:
user = await users.create_user(UserCreate(email=email, name=name))
print(f"Created user: {user.email}")
except ValueError as e:
print(f"Error: {e}")

How it works:

  • Service.create() creates a new instance of the Service
  • Calls initialize() automatically
  • Returns a ready-to-use service without DI container
  • Service has container = None and events = None
  • Perfect for standalone usage patterns

Example: Refactoring from direct DB queries to Service:

# Before: Direct database queries (Issue #2 pattern)
async def get_user_from_request(request: Request) -> User | None:
authorization = request.headers.get("authorization")
if not authorization or not authorization.startswith("Bearer "):
return None
key = authorization[7:]
# Direct database query - business logic scattered
api_key = await ApiKey.find_by(key=key, is_active=True)
if not api_key:
return None
return await User.find(api_key.user_id)
# After: Using Service.create() (clean business logic)
async def get_user_from_request(request: Request) -> User | None:
authorization = request.headers.get("authorization")
if not authorization or not authorization.startswith("Bearer "):
return None
key = authorization[7:]
# Use service for business logic
auth_service = await ApiKeyService.create()
return await auth_service.get_user_by_key(key)

This pattern was introduced to solve Issue #2 from real-world usage.

Services are easy to test in isolation:

import pytest
from zenith.testing import TestService
@pytest.mark.asyncio
async def test_user_creation():
"""Test user creation logic.
TestService provides:
- Isolated database (in-memory)
- Automatic cleanup
- Mocked dependencies
- No HTTP layer needed
"""
# TestService creates isolated environment for testing
async with TestService(UserService) as users:
# Step 1: Prepare test data
user_data = UserCreate(
email="test@example.com",
password="securepassword123", # Will be hashed
name="Test User"
)
# Step 2: Execute business logic (no HTTP!)
user = await users.create_user(user_data)
# Step 3: Assert business rules were applied
assert user.email == "test@example.com"
assert user.name == "Test User"
assert user.id is not None # Database generated ID
# Step 4: Verify security was applied
assert user.password_hash != "securepassword123" # Must be hashed
assert user.password_hash.startswith("$2b$") # bcrypt format
# Step 5: Verify side effects (if needed)
# In real test, you might check:
# - Email was queued
# - Event was emitted
# - Audit log was created
@pytest.mark.asyncio
async def test_duplicate_email():
"""Test that duplicate emails are rejected.
This tests a business rule, not HTTP validation!
"""
async with TestService(UserService) as users:
# Create first user successfully
await users.create_user(UserCreate(
email="test@example.com",
password="password123",
name="First User"
))
# Try to create second user with same email
# Note: pytest.raises catches the domain exception
with pytest.raises(ValueError, match="already exists"):
await users.create_user(UserCreate(
email="test@example.com", # Duplicate!
password="password456",
name="Second User"
))
@pytest.mark.asyncio
async def test_order_processing():
"""Test complex order processing with mocked dependencies."""
async with TestService(OrderService) as orders:
# Mock the payment service
orders.payments.process_payment = AsyncMock(
return_value=Payment(id=123, status="completed")
)
# Mock the notification service
orders.notifications.send_order_confirmation = AsyncMock()
# Create test order
order_data = OrderCreate(
customer_id=1,
items=[
OrderItem(product_id=1, quantity=2),
OrderItem(product_id=2, quantity=1)
],
payment_method="credit_card"
)
# Process order
order = await orders.create_order(order_data)
# Verify order was created
assert order.id is not None
assert order.status == OrderStatus.CONFIRMED
# Verify payment was processed
orders.payments.process_payment.assert_called_once()
# Verify notification was sent
orders.notifications.send_order_confirmation.assert_called_once_with(order)
# Test error handling
@pytest.mark.asyncio
async def test_order_fails_on_insufficient_stock():
"""Test that orders fail gracefully when stock is insufficient."""
async with TestService(OrderService) as orders:
# Setup: Create product with limited stock
product = await orders.products.create_product(
ProductCreate(name="Limited Item", price=10.00, stock_quantity=1)
)
# Try to order more than available
order_data = OrderCreate(
customer_id=1,
items=[OrderItem(product_id=product.id, quantity=5)], # Want 5, have 1
payment_method="credit_card"
)
# Should raise business error
with pytest.raises(ValueError, match="Insufficient stock"):
await orders.create_order(order_data)
# Verify stock wasn't changed
updated_product = await orders.products.get_product(product.id)
assert updated_product.stock_quantity == 1 # Still 1

Service Composition (Complex Business Operations)

Section titled “Service Composition (Complex Business Operations)”
class CheckoutService(Service):
"""Orchestrates the entire checkout process.
This pattern is called 'Service Orchestration':
- One service coordinates multiple others
- Handles complex transactions
- Manages rollback on failure
- Perfect for multi-step business processes
"""
def __init__(
self,
# Each service handles its own domain
cart: CartService = Inject(CartService),
inventory: InventoryService = Inject(InventoryService),
payment: PaymentService = Inject(PaymentService),
shipping: ShippingService = Inject(ShippingService),
email: EmailService = Inject(EmailService)
):
"""Initialize with all required services.
Each service is independently testable and reusable.
"""
super().__init__()
self.cart = cart
self.inventory = inventory
self.payment = payment
self.shipping = shipping
self.email = email
async def process_checkout(
self,
user_id: int,
payment_info: PaymentInfo,
shipping_info: ShippingInfo
) -> Order:
"""Complete the entire checkout process.
This method demonstrates:
1. Database transactions for consistency
2. Inventory reservation pattern
3. Payment processing with rollback
4. Multi-service coordination
5. Error recovery strategies
"""
# Start database transaction for all-or-nothing execution
async with self.db.transaction():
# Step 1: Get and validate cart
cart_items = await self.cart.get_items(user_id)
if not cart_items:
raise ValueError("Cart is empty")
# Business rule: max items per order
if len(cart_items) > 100:
raise ValueError("Maximum 100 items per order")
# Step 2: Reserve inventory (prevents overselling)
# This is the 'two-phase commit' pattern
reservations = await self.inventory.reserve_items(cart_items)
try:
# Step 3: Calculate final price (might include discounts, taxes)
subtotal = self.calculate_subtotal(cart_items)
tax = self.calculate_tax(subtotal, shipping_info.state)
shipping_cost = await self.shipping.calculate_cost(
items=cart_items,
destination=shipping_info
)
total = subtotal + tax + shipping_cost
# Step 4: Process payment (external service, might fail!)
logger.info(f"Processing payment of ${total} for user {user_id}")
payment = await self.payment.charge(
amount=total,
payment_info=payment_info,
metadata={
"user_id": user_id,
"item_count": len(cart_items)
}
)
# Step 5: Create the order (payment succeeded)
order = await self.create_order(
user_id=user_id,
items=cart_items,
payment_id=payment.id,
subtotal=subtotal,
tax=tax,
shipping_cost=shipping_cost,
total=total
)
# Step 6: Convert reservations to confirmed inventory deduction
await self.inventory.confirm_reservations(reservations)
# Step 7: Schedule shipping
shipment = await self.shipping.schedule(
order=order,
shipping_info=shipping_info,
items=cart_items
)
order.shipment_id = shipment.id
# Step 8: Clear the cart (order complete)
await self.cart.clear(user_id)
# Step 9: Send confirmation email (non-critical)
try:
await self.email.send_order_confirmation(
order=order,
user_email=await self.get_user_email(user_id)
)
except Exception as e:
# Don't fail order for email issues
logger.error(f"Failed to send confirmation email: {e}")
# Queue for retry later
await self.queue.enqueue(
"send_order_confirmation",
order_id=order.id
)
# Step 10: Emit success event
await self.events.emit("checkout.completed", {
"order_id": order.id,
"user_id": user_id,
"total": total,
"item_count": len(cart_items)
})
logger.info(f"Checkout completed: Order {order.id} for user {user_id}")
return order
except PaymentFailedError as e:
# Payment failed - rollback everything!
logger.warning(f"Payment failed for user {user_id}: {e}")
# Release the inventory reservations
await self.inventory.release_reservations(reservations)
# Emit failure event for monitoring
await self.events.emit("checkout.payment_failed", {
"user_id": user_id,
"error": str(e),
"amount": total
})
# Re-raise with user-friendly message
raise ValueError(
"Payment could not be processed. "
"Please check your payment information and try again."
)
except Exception as e:
# Any other error - ensure cleanup
logger.error(f"Checkout failed for user {user_id}: {e}")
# Always release reservations on failure
await self.inventory.release_reservations(reservations)
# Emit generic failure event
await self.events.emit("checkout.failed", {
"user_id": user_id,
"error": str(e)
})
raise # Re-raise original error

Add cross-cutting concerns to all context methods:

class AuditedService(Service):
"""Base service with audit logging."""
async def __call__(self, method_name: str, *args, **kwargs):
"""Log all service method calls."""
start_time = time.time()
user = self.request.user if hasattr(self.request, 'user') else None
try:
result = await super().__call__(method_name, *args, **kwargs)
# Log success
await self.audit_log.create(
user_id=user.id if user else None,
action=f"{self.__class__.__name__}.{method_name}",
status="success",
duration=time.time() - start_time
)
return result
except Exception as e:
# Log failure
await self.audit_log.create(
user_id=user.id if user else None,
action=f"{self.__class__.__name__}.{method_name}",
status="error",
error=str(e),
duration=time.time() - start_time
)
raise
  • Keep services focused on a single domain
  • Use type hints for all methods
  • Write comprehensive tests for service logic
  • Use dependency injection for flexibility
  • Document complex business rules
  • Don’t import web framework code in services
  • Don’t handle HTTP errors in services
  • Don’t make services too large (split if needed)
  • Don’t mix different domain concerns
  • Don’t bypass services for business logic