Not everything should happen during a web request. Long-running tasks, email notifications, and scheduled jobs need to run in the background. In this part, we’ll build a robust background job system that keeps your API responsive while handling complex work asynchronously.
By the end of this part, you'll have:

- A task queue backed by Celery and Redis
- Templated email notifications
- Scheduled jobs with Celery Beat
- Progress tracking for long-running work
- Job monitoring with Flower
Background jobs solve three problems: slow responses (users shouldn't wait while long work runs), reliability (failed work needs to be retried), and scheduling (some work runs on a timetable rather than per request):
```python
# Without background jobs (slow, blocks response)
@app.post("/send-report")
async def send_report(email: str):
    report = generate_report()  # Takes 30 seconds
    send_email(email, report)   # Takes 2 seconds
    return {"status": "sent"}   # User waits 32+ seconds!
```
```python
# With background jobs (fast, non-blocking)
@app.post("/send-report")
async def send_report(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(generate_and_send_report, email)
    return {"status": "queued"}  # Instant response!
```

In-process background tasks like the one above are fine for quick jobs, but they disappear if the server process dies. For durable queues, retries, and scheduled work, we'll use Celery. Install the required packages:
```bash
# Using uv
uv add "celery[redis]" redis flower python-dotenv jinja2
```
```bash
# Or using pip
pip install "celery[redis]" redis flower python-dotenv jinja2
```

Configure Redis (our message broker):
```bash
# macOS
brew install redis
brew services start redis
```
```bash
# Ubuntu/Debian
sudo apt-get install redis-server
sudo systemctl start redis
```
```bash
# Docker
docker run -d -p 6379:6379 redis:alpine
```
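Before wiring Celery to the broker, a quick connectivity check saves debugging later. A minimal sketch, assuming Redis is on the default local port:

```python
import redis

# Quick sanity check that the broker is reachable
r = redis.Redis.from_url("redis://localhost:6379/0")
assert r.ping()  # Raises if Redis isn't running or unreachable
```

Create app/celery_app.py: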
"""Celery configuration for background tasks.
Celery handles:- Task queuing and execution- Retries and error handling- Scheduled tasks- Result storage"""
from celery import Celeryfrom celery.schedules import crontabfrom app.config import settings
# Create Celery instancecelery_app = Celery( "taskflow", broker=settings.REDIS_URL or "redis://localhost:6379/0", backend=settings.REDIS_URL or "redis://localhost:6379/0", include=["app.tasks"] # Auto-discover tasks in this module)
# Configurationcelery_app.conf.update( # Task execution settings task_serializer="json", accept_content=["json"], result_serializer="json", timezone="UTC", enable_utc=True,
# Performance settings worker_prefetch_multiplier=4, # Prefetch 4 tasks per worker task_acks_late=True, # Acknowledge after task completes
# Retry settings task_default_retry_delay=60, # Retry after 1 minute task_max_retries=3, # Max 3 retries
# Result backend settings result_expires=3600, # Results expire after 1 hour
# Task routing (optional) task_routes={ "app.tasks.email.*": {"queue": "email"}, "app.tasks.heavy.*": {"queue": "heavy"}, },
# Rate limiting task_annotations={ "app.tasks.send_email": {"rate_limit": "100/m"}, # 100 emails per minute })
# Scheduled tasks (Celery Beat)celery_app.conf.beat_schedule = { # Daily summary email at 9 AM "daily-summary": { "task": "app.tasks.send_daily_summary", "schedule": crontab(hour=9, minute=0), },
# Clean up old sessions every hour "cleanup-sessions": { "task": "app.tasks.cleanup_expired_sessions", "schedule": crontab(minute=0), # Every hour at :00 },
# Check for overdue tasks every 30 minutes "check-overdue": { "task": "app.tasks.check_overdue_tasks", "schedule": crontab(minute="*/30"), },}Create app/tasks/__init__.py:
"""Background tasks for async processing.
Tasks are functions that run outside the request cycle,allowing long-running operations without blocking responses."""
from typing import List, Dict, Anyfrom datetime import datetime, timedeltafrom celery import Taskfrom app.celery_app import celery_appfrom app.database import get_sessionfrom app.models import User, Task as TaskModel, Projectfrom app.email import EmailService
# Custom task base class for better error handlingclass CallbackTask(Task): """Task that runs callbacks on success/failure."""
def on_success(self, retval, task_id, args, kwargs): """Called on successful execution.""" print(f"Task {task_id} succeeded with result: {retval}")
def on_failure(self, exc, task_id, args, kwargs, einfo): """Called on task failure.""" print(f"Task {task_id} failed with error: {exc}") # Could send alert to monitoring service
@celery_app.task(bind=True, base=CallbackTask)def send_email( self, to: str, subject: str, template: str, context: Dict[str, Any]) -> bool: """ Send email with template.
This is a bound task (bind=True) which gives access to self for retries and task metadata.
Args: to: Recipient email subject: Email subject template: Template name context: Template variables
Returns: True if sent successfully
Raises: Retry: If email fails (automatic retry) """ try: email_service = EmailService()
# Render template html_content = email_service.render_template(template, context)
# Send email email_service.send( to=to, subject=subject, html_content=html_content )
return True
except Exception as exc: # Retry with exponential backoff raise self.retry(exc=exc, countdown=2 ** self.request.retries)
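Calling the task is a one-liner from anywhere in the app. Two standard Celery invocation patterns worth knowing:

```python
# Fire-and-forget: enqueue with default options
send_email.delay(
    to="user@example.com",
    subject="Hello",
    template="welcome",
    context={"name": "Ada"},
)

# More control: delay execution by 30 seconds and target a specific queue
send_email.apply_async(
    kwargs={
        "to": "user@example.com",
        "subject": "Hello",
        "template": "welcome",
        "context": {"name": "Ada"},
    },
    countdown=30,
    queue="email",
)
```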
```python
@celery_app.task
def send_welcome_email(user_id: int) -> None:
    """
    Send welcome email to new user.

    This task is called after user registration.
    """
    import asyncio

    # Get user from database
    async def get_user():
        async for session in get_session():
            return await session.get(User, user_id)

    user = asyncio.run(get_user())

    if not user:
        return

    # Send welcome email
    send_email.delay(
        to=user.email,
        subject="Welcome to TaskFlow!",
        template="welcome",
        context={
            "name": user.name,
            "verification_url": (
                f"{settings.FRONTEND_URL}/verify/{user.email_verification_token}"
            ),
        },
    )
```
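The asyncio.run bridge shows up in every task that touches the database. A tiny helper can cut that boilerplate; a sketch, assuming you keep the one-event-loop-per-task approach (run_async is a hypothetical helper, not part of the original code):

```python
import asyncio
from typing import Awaitable, TypeVar

T = TypeVar("T")


def run_async(coro: Awaitable[T]) -> T:
    """Run a coroutine to completion inside a synchronous Celery task."""
    return asyncio.run(coro)


# Usage inside a task:
#     user = run_async(get_user())
```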
```python
@celery_app.task
def send_task_reminder(task_id: int) -> None:
    """
    Send reminder for upcoming task due date.

    Checks task due date and sends reminder to assignee.
    """
    import asyncio

    async def process():
        async for session in get_session():
            # Get task with assignee; skip tasks without an assignee or due date
            task = await session.get(TaskModel, task_id)
            if not task or not task.assignee_id or not task.due_date:
                return

            await session.refresh(task, ["assignee", "project"])

            # Check if reminder needed (due in next 24 hours)
            time_until_due = task.due_date - datetime.utcnow()
            if timedelta(0) < time_until_due < timedelta(days=1):
                send_email.delay(
                    to=task.assignee.email,
                    subject=f"Task Due Soon: {task.title}",
                    template="task_reminder",
                    context={
                        "task": task.title,
                        "project": task.project.name,
                        "due_date": task.due_date.strftime("%Y-%m-%d %H:%M"),
                        "hours_left": int(time_until_due.total_seconds() / 3600),
                    },
                )

    asyncio.run(process())
```
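Nothing enqueues this reminder yet. One approach is to schedule it when a due date is set, using Celery's eta option; a sketch, with the exact hook depending on where your task-update code lives:

```python
from datetime import timedelta

# When a task's due date is created or updated:
reminder_time = task.due_date - timedelta(hours=24)
send_task_reminder.apply_async(
    args=[task.id],
    eta=reminder_time,  # Celery holds the message until this time
)
```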
```python
@celery_app.task(bind=True, max_retries=5)
def process_file_upload(
    self,
    file_path: str,
    user_id: int,
    project_id: int,
) -> Dict[str, Any]:
    """
    Process uploaded file (e.g., CSV import).

    This demonstrates a long-running task with progress tracking.

    Args:
        file_path: Path to uploaded file
        user_id: User who uploaded
        project_id: Project to import into

    Returns:
        Processing result with statistics
    """
    import asyncio
    import csv
    import os

    async def create_task(row: Dict[str, str]) -> None:
        async for session in get_session():
            task = TaskModel(
                title=row["title"],
                description=row.get("description", ""),
                project_id=project_id,
                created_by=user_id,
                priority=int(row.get("priority", 1)),
            )
            session.add(task)
            await session.commit()

    try:
        # Update task state for progress tracking
        self.update_state(
            state="PROCESSING",
            meta={"current": 0, "total": 100, "status": "Reading file..."},
        )

        # Read and process file
        tasks_created = 0
        errors = []

        with open(file_path, "r") as file:
            reader = csv.DictReader(file)
            rows = list(reader)
        total = len(rows)

        for i, row in enumerate(rows):
            # Update progress
            self.update_state(
                state="PROCESSING",
                meta={
                    "current": i + 1,
                    "total": total,
                    "status": f"Processing row {i + 1}/{total}",
                },
            )

            # Create task from row
            try:
                asyncio.run(create_task(row))
                tasks_created += 1
            except Exception as e:
                errors.append(f"Row {i + 1}: {str(e)}")

        # Clean up file
        os.remove(file_path)

        return {
            "tasks_created": tasks_created,
            "errors": errors,
            "status": "completed",
        }

    except Exception as exc:
        # Retry on failure
        raise self.retry(exc=exc, countdown=60)
```
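The importer expects a CSV with a title column plus optional description and priority columns. An illustrative sample that would create two tasks:

```csv
title,description,priority
Write onboarding docs,First draft for new hires,2
Fix login redirect,,3
```

Create app/templates/email/base.html: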
```html
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{% block title %}TaskFlow{% endblock %}</title>
    <style>
        body {
            font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Helvetica, Arial, sans-serif;
            line-height: 1.6;
            color: #333;
            max-width: 600px;
            margin: 0 auto;
            padding: 20px;
        }
        .header {
            background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
            color: white;
            padding: 30px;
            text-align: center;
            border-radius: 10px 10px 0 0;
        }
        .content {
            background: white;
            padding: 30px;
            border: 1px solid #e1e4e8;
            border-radius: 0 0 10px 10px;
        }
        .button {
            display: inline-block;
            padding: 12px 24px;
            background: #667eea;
            color: white;
            text-decoration: none;
            border-radius: 5px;
            margin: 20px 0;
        }
        .footer {
            margin-top: 30px;
            padding-top: 20px;
            border-top: 1px solid #e1e4e8;
            font-size: 12px;
            color: #666;
            text-align: center;
        }
    </style>
</head>
<body>
    <div class="header">
        <h1>TaskFlow</h1>
        {% block header %}{% endblock %}
    </div>

    <div class="content">
        {% block content %}{% endblock %}
    </div>

    <div class="footer">
        <p>© 2024 TaskFlow. All rights reserved.</p>
        <p>
            <a href="{{ settings.FRONTEND_URL }}/unsubscribe">Unsubscribe</a> |
            <a href="{{ settings.FRONTEND_URL }}/preferences">Email Preferences</a>
        </p>
    </div>
</body>
</html>
```

Create app/templates/email/welcome.html:
```html
{% extends "email/base.html" %}

{% block header %}
<h2>Welcome to TaskFlow!</h2>
{% endblock %}

{% block content %}
<p>Hi {{ name }},</p>

<p>Welcome to TaskFlow! We're excited to have you on board.</p>

<p>To get started, please verify your email address:</p>

<a href="{{ verification_url }}" class="button">Verify Email</a>

<p>Once verified, you can:</p>
<ul>
    <li>Create projects to organize your work</li>
    <li>Add tasks with due dates and priorities</li>
    <li>Collaborate with team members</li>
    <li>Track progress and productivity</li>
</ul>

<p>If you have any questions, reply to this email or check out our
<a href="{{ settings.FRONTEND_URL }}/help">help center</a>.</p>

<p>Best regards,<br>The TaskFlow Team</p>
{% endblock %}
```

Create app/tasks/scheduled.py:
"""Scheduled tasks that run periodically.
These tasks are triggered by Celery Beat accordingto the schedule defined in celery_app.py."""
from datetime import datetime, timedeltafrom app.celery_app import celery_appfrom app.database import get_sessionfrom app.models import User, Task, UserSessionfrom sqlmodel import select, and_
@celery_app.taskdef send_daily_summary() -> None: """ Send daily summary email to all active users.
Runs every day at 9 AM. """ async def process(): async for session in get_session(): # Get all active users stmt = select(User).where( and_( User.is_active == True, User.is_verified == True ) ) result = await session.exec(stmt) users = result.all()
for user in users: # Get user's tasks for today today = datetime.utcnow().date() tomorrow = today + timedelta(days=1)
stmt = select(Task).where( and_( Task.assignee_id == user.id, Task.due_date >= today, Task.due_date < tomorrow, Task.is_completed == False ) ) result = await session.exec(stmt) tasks_today = result.all()
if tasks_today: # Send summary email from app.tasks import send_email send_email.delay( to=user.email, subject=f"Your TaskFlow Daily Summary - {len(tasks_today)} tasks due", template="daily_summary", context={ "name": user.name, "tasks": [ { "title": task.title, "project": task.project.name, "priority": task.priority, "due_time": task.due_date.strftime("%H:%M") } for task in tasks_today ], "date": today.strftime("%B %d, %Y") } )
import asyncio asyncio.run(process())
```python
@celery_app.task
def cleanup_expired_sessions() -> int:
    """
    Clean up expired user sessions.

    Runs every hour to remove old sessions
    and free up database space.

    Returns:
        Number of sessions deleted
    """
    import asyncio

    async def cleanup():
        async for session in get_session():
            # Find expired sessions
            now = datetime.utcnow()
            stmt = select(UserSession).where(UserSession.expires_at < now)
            result = await session.exec(stmt)
            expired_sessions = result.all()

            # Delete them
            count = 0
            for expired in expired_sessions:
                await session.delete(expired)
                count += 1

            await session.commit()
            return count

    deleted = asyncio.run(cleanup())
    print(f"Cleaned up {deleted} expired sessions")
    return deleted
```
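Deleting row by row is fine at small scale; for a large sessions table, a single bulk DELETE avoids loading every row into memory. A sketch using SQLAlchemy's delete() construct, assuming your AsyncSession setup supports it:

```python
from datetime import datetime

from sqlalchemy import delete

from app.models import UserSession


async def cleanup_bulk(session) -> int:
    """Delete all expired sessions in one statement."""
    result = await session.execute(
        delete(UserSession).where(UserSession.expires_at < datetime.utcnow())
    )
    await session.commit()
    return result.rowcount  # Number of rows deleted
```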
```python
@celery_app.task
def check_overdue_tasks() -> None:
    """
    Check for overdue tasks and notify assignees.

    Runs every 30 minutes to find tasks
    that just became overdue.
    """
    import asyncio

    async def check():
        async for session in get_session():
            # Find tasks that became overdue in the last 30 minutes
            now = datetime.utcnow()
            thirty_minutes_ago = now - timedelta(minutes=30)

            stmt = select(Task).where(
                and_(
                    Task.due_date < now,
                    Task.due_date >= thirty_minutes_ago,
                    Task.is_completed == False,
                    Task.assignee_id != None,
                )
            )
            result = await session.exec(stmt)
            overdue_tasks = result.all()

            for task in overdue_tasks:
                await session.refresh(task, ["assignee", "project"])

                # Send overdue notification
                from app.tasks import send_email

                send_email.delay(
                    to=task.assignee.email,
                    subject=f"⚠️ Overdue Task: {task.title}",
                    template="task_overdue",
                    context={
                        "task": task.title,
                        "project": task.project.name,
                        "assignee": task.assignee.name,
                        "due_date": task.due_date.strftime("%Y-%m-%d %H:%M"),
                        "task_url": f"{settings.FRONTEND_URL}/tasks/{task.id}",
                    },
                )

    asyncio.run(check())
```
```python
@celery_app.task
def generate_weekly_report(user_id: int) -> str:
    """
    Generate weekly productivity report for a user.

    This is a heavy task that analyzes user activity
    and generates insights.

    Args:
        user_id: User to generate report for

    Returns:
        Report ID for retrieval
    """
    import asyncio
    import json
    import uuid

    report_id = str(uuid.uuid4())

    async def generate():
        async for session in get_session():
            # Get user
            user = await session.get(User, user_id)
            if not user:
                return

            # Calculate stats for the past week
            week_ago = datetime.utcnow() - timedelta(days=7)

            # Tasks completed
            stmt = select(Task).where(
                and_(
                    Task.assignee_id == user_id,
                    Task.completed_at >= week_ago,
                )
            )
            result = await session.exec(stmt)
            completed_tasks = result.all()

            # Tasks created
            stmt = select(Task).where(
                and_(
                    Task.created_by == user_id,
                    Task.created_at >= week_ago,
                )
            )
            result = await session.exec(stmt)
            created_tasks = result.all()

            # Build report
            report = {
                "user_id": user_id,
                "user_name": user.name,
                "period": {
                    "start": week_ago.isoformat(),
                    "end": datetime.utcnow().isoformat(),
                },
                "stats": {
                    "tasks_completed": len(completed_tasks),
                    "tasks_created": len(created_tasks),
                    "productivity_score": calculate_productivity_score(
                        completed_tasks, created_tasks
                    ),
                },
                "insights": generate_insights(completed_tasks, created_tasks),
            }

            # Store report in cache
            import redis

            r = redis.Redis.from_url(settings.REDIS_URL)
            r.setex(
                f"report:{report_id}",
                3600,  # Expire after 1 hour
                json.dumps(report),
            )

            # Send report via email
            from app.tasks import send_email

            send_email.delay(
                to=user.email,
                subject="Your Weekly TaskFlow Report",
                template="weekly_report",
                context=report,
            )

    asyncio.run(generate())

    return report_id
```
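The scoring helpers used above follow shortly. First, a note on retrieval: the report sits in Redis under report:{report_id} for an hour, so a small endpoint can hand it to the client. A sketch for app/main.py (hypothetical route, reusing the API's usual dependencies):

```python
import json

import redis


@app.get("/reports/{report_id}")
async def get_report(report_id: str, current_user = Depends(get_current_user)):
    """Fetch a generated weekly report from the cache by ID."""
    r = redis.Redis.from_url(settings.REDIS_URL)
    raw = r.get(f"report:{report_id}")
    if raw is None:
        raise HTTPException(status_code=404, detail="Report not found or expired")
    return json.loads(raw)
```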
```python
def calculate_productivity_score(completed_tasks, created_tasks) -> int:
    """Calculate productivity score (0-100)."""
    # Simple scoring algorithm
    base_score = len(completed_tasks) * 10
    bonus = len(completed_tasks) - len(created_tasks)  # Completing more than creating

    return min(100, max(0, base_score + bonus * 5))
```
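To make the formula concrete: a user who completed 8 tasks and created 5 gets base_score = 80 and bonus = 3, so the score is min(100, max(0, 80 + 15)) = 95. Creating more tasks than you complete pulls the score down.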
```python
def generate_insights(completed_tasks, created_tasks) -> List[str]:
    """Generate insights from task data."""
    insights = []

    if len(completed_tasks) > 10:
        insights.append("🎉 Great job! You completed over 10 tasks this week!")

    if len(created_tasks) > len(completed_tasks) * 2:
        insights.append("📝 You created many new tasks. Consider prioritizing completion.")

    # Add more insights based on patterns
    return insights
```

Update your API endpoints to use background tasks:
```python
# In app/main.py

from zenith import File, UploadFile

from app.celery_app import celery_app
from app.tasks import process_file_upload, send_welcome_email


@app.post("/auth/register")
async def register(
    user_data: UserCreate,
    session = Depends(get_session),
):
    """Register user, then send the welcome email in the background."""
    service = UserService(session)
    user = await service.create_user(user_data)

    # Queue welcome email (non-blocking)
    send_welcome_email.delay(user.id)

    return user
```
```python
@app.post("/projects/{project_id}/import")
async def import_tasks(
    project_id: int,
    file: UploadFile = File(...),
    current_user = Depends(get_current_user),
    session = Depends(get_session),
):
    """
    Import tasks from CSV file.

    This endpoint returns immediately while processing
    happens in the background.
    """
    import shutil
    import tempfile

    # Verify project ownership
    project = await session.get(Project, project_id)
    if not project or project.owner_id != current_user.id:
        raise HTTPException(status_code=403)

    # Save uploaded file
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
        shutil.copyfileobj(file.file, tmp)
        temp_path = tmp.name

    # Queue processing task
    task = process_file_upload.delay(
        file_path=temp_path,
        user_id=current_user.id,
        project_id=project_id,
    )

    return {
        "task_id": task.id,
        "status": "processing",
        "status_url": f"/tasks/status/{task.id}",
    }
```
```python
@app.get("/tasks/status/{task_id}")
async def get_task_status(task_id: str):
    """
    Get status of a background task.

    Returns progress information for long-running tasks.
    """
    from celery.result import AsyncResult

    task = AsyncResult(task_id, app=celery_app)

    if task.state == "PENDING":
        return {"status": "pending", "message": "Task not started"}
    elif task.state == "PROCESSING":
        return {
            "status": "processing",
            "current": task.info.get("current", 0),
            "total": task.info.get("total", 1),
            "message": task.info.get("status", ""),
        }
    elif task.state == "SUCCESS":
        return {"status": "completed", "result": task.result}
    else:
        return {"status": "failed", "error": str(task.info)}
```
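From the client's side the flow is: upload, receive a task_id, then poll status_url until the job finishes. A quick sketch with httpx (hypothetical client script, assuming the API runs on localhost:8000):

```python
import time

import httpx

with httpx.Client(base_url="http://localhost:8000") as client:
    # Upload the CSV and get a task handle back
    with open("tasks.csv", "rb") as f:
        resp = client.post("/projects/1/import", files={"file": f})
    status_url = resp.json()["status_url"]

    # Poll until the background task completes or fails
    while True:
        status = client.get(status_url).json()
        if status["status"] in ("completed", "failed"):
            break
        print(f"Progress: {status.get('current', 0)}/{status.get('total', '?')}")
        time.sleep(1)

print(status)
```

Create start_worker.py: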
```python
#!/usr/bin/env python
"""Start Celery worker for processing background tasks."""

import sys
from pathlib import Path

# Add project to path
sys.path.insert(0, str(Path(__file__).parent))

from app.celery_app import celery_app

if __name__ == "__main__":
    # Equivalent to: celery -A app.celery_app worker --loglevel=info
    celery_app.worker_main(argv=["worker", "--loglevel=info"])
```

Start workers and scheduler:
```bash
# Start worker (in a separate terminal)
celery -A app.celery_app worker --loglevel=info

# For development with auto-reload (requires the watchdog package)
watchmedo auto-restart -d app -p '*.py' -- celery -A app.celery_app worker --loglevel=info

# Start beat scheduler (for periodic tasks)
celery -A app.celery_app beat --loglevel=info

# Start Flower (monitoring dashboard)
celery -A app.celery_app flower

# Access Flower at http://localhost:5555
```

Flower provides a web interface for monitoring:
Flower is configured from the command line rather than from Python. The options below add basic authentication, persist state between restarts, and cap the task history:

```bash
celery -A app.celery_app flower \
    --port=5555 \
    --basic_auth=admin:password \
    --persistent=True \
    --db=flower.db \
    --max_tasks=10000
```

Add monitoring to your API:
```python
@app.get("/admin/jobs/stats")
async def get_job_stats(current_user = Depends(require_admin)):
    """Get background job statistics."""
    # Ask the running workers what they're doing
    inspect = celery_app.control.inspect()
    active = inspect.active()
    scheduled = inspect.scheduled()
    reserved = inspect.reserved()

    # Get task counts by state
    stats = {
        "active": sum(len(tasks) for tasks in active.values()) if active else 0,
        "scheduled": sum(len(tasks) for tasks in scheduled.values()) if scheduled else 0,
        "reserved": sum(len(tasks) for tasks in reserved.values()) if reserved else 0,
        "workers": len(active) if active else 0,
    }

    return stats
```
```python
@app.post("/admin/jobs/retry/{task_id}")
async def retry_failed_task(
    task_id: str,
    current_user = Depends(require_admin),
):
    """Re-submit a failed task.

    AsyncResult has no retry() method, so the closest equivalent is to
    re-send the original task. Its name, args, and kwargs are only
    available here if the result backend stores them; set
    result_extended=True in the Celery config to enable that.
    """
    from celery.result import AsyncResult

    task = AsyncResult(task_id, app=celery_app)

    if task.state == "FAILURE":
        # Re-send the task with its original arguments
        new_task = celery_app.send_task(task.name, args=task.args, kwargs=task.kwargs)
        return {"status": "retrying", "task_id": new_task.id}
    else:
        return {"status": task.state, "message": "Task not in failed state"}
```

Implement robust error handling:
```python
import logging

from celery import Task
from celery.exceptions import MaxRetriesExceededError

logger = logging.getLogger(__name__)


class BaseTask(Task):
    """
    Base task with automatic retrying and error handling.
    """

    autoretry_for = (Exception,)       # Retry on any exception
    retry_kwargs = {"max_retries": 3}  # Max 3 retries
    retry_backoff = True               # Exponential backoff
    retry_backoff_max = 600            # Max 10 minutes between retries
    retry_jitter = True                # Add randomness to prevent thundering herd

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Handle task failure."""
        logger.error(f"Task {task_id} failed: {exc}")

        # Send alert for critical tasks
        if self.name in ["send_password_reset", "process_payment"]:
            send_admin_alert(
                subject=f"Critical task failed: {self.name}",
                body=f"Task {task_id} failed with error: {exc}",
            )

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Handle task retry."""
        logger.warning(f"Task {task_id} retrying: {exc}")

    def on_success(self, retval, task_id, args, kwargs):
        """Handle task success."""
        logger.info(f"Task {task_id} completed successfully")


# Use the base task (process_data, TemporaryError, PermanentError, and
# send_admin_alert are placeholders for your own code)
@celery_app.task(base=BaseTask, bind=True)
def resilient_task(self, data):
    """Task that automatically retries on failure."""
    try:
        result = process_data(data)
        return result

    except TemporaryError as exc:
        # Retry with backoff; self.retry raises MaxRetriesExceededError
        # once the retry budget is exhausted, so catch it right here
        try:
            raise self.retry(exc=exc)
        except MaxRetriesExceededError:
            logger.error("Task failed after maximum retries")
            send_admin_alert("Task permanently failed", str(self.request))
            raise

    except PermanentError as exc:
        # Don't retry permanent errors
        logger.error(f"Permanent error: {exc}")
        raise
```

Create tests/test_tasks.py:
"""Test background tasks."""
import pytestfrom unittest.mock import patch, MagicMockfrom app.tasks import send_email, send_welcome_emailfrom app.tasks.scheduled import cleanup_expired_sessions
class TestEmailTasks: """Test email sending tasks."""
@patch('app.tasks.EmailService') def test_send_email_task(self, mock_email_service): """Test email sending task.""" # Mock email service mock_service = MagicMock() mock_email_service.return_value = mock_service
# Call task directly (not async) result = send_email( to="test@example.com", subject="Test Subject", template="welcome", context={"name": "Test User"} )
# Verify email service was called mock_service.send.assert_called_once() assert result is True
@patch('app.tasks.send_email.delay') async def test_welcome_email_task(self, mock_send_delay, session): """Test welcome email task.""" # Create test user from tests.factories import UserFactory user = UserFactory() session.add(user) await session.commit()
# Call task send_welcome_email(user.id)
# Verify email was queued mock_send_delay.assert_called_once() call_args = mock_send_delay.call_args[1] assert call_args["to"] == user.email assert "Welcome" in call_args["subject"]
```python
class TestScheduledTasks:
    """Test scheduled tasks."""

    async def test_cleanup_expired_sessions(self, session):
        """Test session cleanup task."""
        from datetime import datetime, timedelta

        from app.models import UserSession

        # Create expired and valid sessions
        expired_session = UserSession(
            user_id=1,
            refresh_token="expired",
            expires_at=datetime.utcnow() - timedelta(hours=1),
        )
        valid_session = UserSession(
            user_id=1,
            refresh_token="valid",
            expires_at=datetime.utcnow() + timedelta(hours=1),
        )

        session.add_all([expired_session, valid_session])
        await session.commit()

        # Run cleanup
        deleted_count = cleanup_expired_sessions()

        assert deleted_count == 1

        # Verify only the expired session was deleted
        from sqlmodel import select

        stmt = select(UserSession)
        result = await session.exec(stmt)
        remaining = result.all()

        assert len(remaining) == 1
        assert remaining[0].refresh_token == "valid"
```
```python
class TestTaskRetries:
    """Test task retry logic."""

    @patch("app.tasks.EmailService")
    def test_email_retry_on_failure(self, mock_email_service):
        """Test that email task retries on failure."""
        # Mock email service to fail
        mock_service = MagicMock()
        mock_service.send.side_effect = Exception("Network error")
        mock_email_service.return_value = mock_service

        # Task should raise a Retry exception
        with pytest.raises(Exception):
            send_email(
                to="test@example.com",
                subject="Test",
                template="test",
                context={},
            )
```
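For integration tests, Celery's task_always_eager setting runs tasks synchronously in-process, so .delay() calls execute immediately with no broker or worker. A conftest.py sketch, assuming pytest fixtures:

```python
import pytest

from app.celery_app import celery_app


@pytest.fixture(autouse=True)
def eager_celery():
    """Run Celery tasks synchronously during tests (no broker/worker needed)."""
    celery_app.conf.task_always_eager = True
    celery_app.conf.task_eager_propagate = True  # Re-raise task exceptions
    yield
    celery_app.conf.task_always_eager = False
```

Route tasks to appropriate queues: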
```python
celery_app.conf.task_routes = {
    "app.tasks.send_email": {"queue": "email", "priority": 5},
    "app.tasks.process_file_upload": {"queue": "heavy", "priority": 1},
    "app.tasks.scheduled.*": {"queue": "scheduled", "priority": 3},
}

# Start workers for specific queues:
# celery -A app.celery_app worker -Q email --concurrency=4
# celery -A app.celery_app worker -Q heavy --concurrency=2
```

Implement priority queues:
```python
from kombu import Queue, Exchange

celery_app.conf.task_queues = (
    Queue("default", Exchange("default"), routing_key="default"),
    Queue("high_priority", Exchange("high_priority"), routing_key="high_priority"),
    Queue("low_priority", Exchange("low_priority"), routing_key="low_priority"),
)

# Send a high-priority task
send_urgent_email.apply_async(
    args=[user_id],
    queue="high_priority",
    priority=9,  # 0-9, higher is more important
)
```

Common problems and fixes:

Tasks never execute. Solution: Ensure Redis is running, the worker is started, and the tasks are properly registered.

Tasks run more than once. Solution: Use task deduplication with unique task IDs.

Worker memory grows over time. Solution: Set a max task count per child process: --max-tasks-per-child=100
In this part, you've implemented:

- Task queue with Celery and Redis
- Email notifications with templates
- Scheduled jobs with Celery Beat
- File processing and uploads
- Progress tracking for long tasks
- Error handling and retries
- Job monitoring with Flower
- Testing strategies for async tasks
In Part 7: Deployment, we'll take everything we've built to production.
Continue to Part 7
Ready to deploy? Continue to Deployment →
Questions? Check our FAQ or ask in GitHub Discussions.