Skip to content

Part 6: Background Jobs

Tutorial Part 6: Background Jobs & Async Processing

Section titled “Tutorial Part 6: Background Jobs & Async Processing”

Not everything should happen during a web request. Long-running tasks, email notifications, and scheduled jobs need to run in the background. In this part, we’ll build a robust background job system that keeps your API responsive while handling complex work asynchronously.

By the end of this part:

  • Task queue with Celery and Redis
  • Email notifications with templates
  • Scheduled jobs with Celery Beat
  • File processing and uploads
  • Progress tracking for long-running tasks
  • Error handling and retries
  • Job monitoring dashboard

Background jobs solve three problems:

  1. Response time: Don’t make users wait for slow operations
  2. Reliability: Retry failed operations automatically
  3. Scalability: Process jobs on separate workers
# Without background jobs (slow, blocks response)
@app.post("/send-report")
async def send_report(email: str):
    """Generate the report and e-mail it before the request is answered."""
    # Report generation (~30 seconds) and delivery (~2 seconds) both run
    # inside the request, so the user waits 32+ seconds for this response.
    send_email(email, generate_report())
    response = {"status": "sent"}
    return response
# With background jobs (fast, non-blocking)
@app.post("/send-report")
async def send_report(email: str, background_tasks: BackgroundTasks):
    """Queue report generation and delivery, then answer immediately."""
    response = {"status": "queued"}  # Instant response!
    # The slow work is handed to the framework's background task runner.
    background_tasks.add_task(generate_and_send_report, email)
    return response

Install required packages:

Terminal window
# Using uv
# The extra is quoted because [redis] is a glob pattern to zsh/bash; unquoted,
# zsh aborts with "no matches found".
uv add "celery[redis]" redis flower python-dotenv jinja2
# Or using pip
pip install "celery[redis]" redis flower python-dotenv jinja2

Configure Redis (our message broker):

Terminal window
# macOS (Homebrew): install Redis and run it as a background service
brew install redis
brew services start redis
# Ubuntu/Debian: install the distribution package and start it through systemd
sudo apt-get install redis-server
sudo systemctl start redis
# Docker: run the redis:alpine image detached (-d) and publish port 6379 on the host
docker run -d -p 6379:6379 redis:alpine

Create app/celery_app.py:

"""
Celery configuration for background tasks.
Celery handles:
- Task queuing and execution
- Retries and error handling
- Scheduled tasks
- Result storage
"""
from celery import Celery
from celery.schedules import crontab
from app.config import settings
# Create Celery instance
# Broker and result backend share one Redis URL; fall back to a local Redis
# when settings.REDIS_URL is unset or empty.
_redis_url = settings.REDIS_URL or "redis://localhost:6379/0"

celery_app = Celery(
    "taskflow",
    broker=_redis_url,
    backend=_redis_url,
    # Modules the worker imports at start-up so that their tasks are registered.
    # The periodic tasks live in app/tasks/scheduled.py; importing "app.tasks"
    # alone does not load that submodule, so it is listed explicitly.
    include=["app.tasks", "app.tasks.scheduled"],
)

# Configuration
celery_app.conf.update(
    # Task execution settings
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    # Performance settings
    worker_prefetch_multiplier=4,  # Prefetch 4 tasks per worker
    task_acks_late=True,  # Acknowledge after task completes
    # Result backend settings
    result_expires=3600,  # Results expire after 1 hour
    # Task routing (optional)
    # These glob patterns match tasks defined in modules named app.tasks.email
    # and app.tasks.heavy. A worker only consumes these queues when started
    # with a matching -Q option.
    task_routes={
        "app.tasks.email.*": {"queue": "email"},
        "app.tasks.heavy.*": {"queue": "heavy"},
    },
    # Per-task attribute overrides
    task_annotations={
        # Retry settings: "task_default_retry_delay" / "task_max_retries" are
        # not Celery settings and would be silently ignored, so the retry
        # delay is applied as a task attribute instead (retry after 1 minute).
        # max_retries is deliberately not set here: Celery's default is
        # already 3, and annotating "*" would override per-task values such
        # as @task(max_retries=5).
        "*": {"default_retry_delay": 60},
        # Rate limiting
        "app.tasks.send_email": {"rate_limit": "100/m"},  # 100 emails per minute
    },
)

# Scheduled tasks (Celery Beat)
# Task names are the fully qualified names Celery derives from the defining
# module, i.e. app.tasks.scheduled.<function>.
celery_app.conf.beat_schedule = {
    # Daily summary email at 9 AM (UTC, per the timezone setting above)
    "daily-summary": {
        "task": "app.tasks.scheduled.send_daily_summary",
        "schedule": crontab(hour=9, minute=0),
    },
    # Clean up old sessions every hour
    "cleanup-sessions": {
        "task": "app.tasks.scheduled.cleanup_expired_sessions",
        "schedule": crontab(minute=0),  # Every hour at :00
    },
    # Check for overdue tasks every 30 minutes
    "check-overdue": {
        "task": "app.tasks.scheduled.check_overdue_tasks",
        "schedule": crontab(minute="*/30"),
    },
}

Create app/tasks/__init__.py:

"""
Background tasks for async processing.
Tasks are functions that run outside the request cycle,
allowing long-running operations without blocking responses.
"""
from typing import List, Dict, Any
from datetime import datetime, timedelta
from celery import Task
from app.celery_app import celery_app
from app.database import get_session
from app.models import User, Task as TaskModel, Project
from app.email import EmailService
# Custom task base class for better error handling
class CallbackTask(Task):
    """Task base class that prints the outcome of every run."""

    def on_success(self, retval, task_id, args, kwargs):
        """Print the task id together with the value the task returned."""
        message = f"Task {task_id} succeeded with result: {retval}"
        print(message)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Print the task id together with the exception that ended the task."""
        message = f"Task {task_id} failed with error: {exc}"
        print(message)
        # Could send alert to monitoring service
@celery_app.task(bind=True, base=CallbackTask)
def send_email(
    self,
    to: str,
    subject: str,
    template: str,
    context: Dict[str, Any]
) -> bool:
    """
    Render an e-mail template and send the result.

    The task is bound (bind=True), so ``self`` is the task instance and gives
    access to the retry machinery and the current request.

    Args:
        to: Recipient email
        subject: Email subject
        template: Template name passed to EmailService.render_template
        context: Template variables

    Returns:
        True once EmailService.send has returned

    Raises:
        Whatever ``self.retry`` raises when rendering or sending fails; the
        retry is scheduled with a delay of 2 ** <retries so far> seconds.
    """
    try:
        mailer = EmailService()
        body = mailer.render_template(template, context)
        mailer.send(to=to, subject=subject, html_content=body)
    except Exception as exc:
        # Exponential backoff: 1 s, 2 s, 4 s, ... per attempt already made.
        backoff_seconds = 2 ** self.request.retries
        raise self.retry(exc=exc, countdown=backoff_seconds)
    return True
@celery_app.task
def send_welcome_email(user_id: int) -> None:
    """
    Queue the welcome e-mail for a newly registered user.

    Looks the user up by primary key and, if found, enqueues ``send_email``
    with the "welcome" template. Does nothing when the user does not exist.

    Args:
        user_id: Primary key of the user to greet
    """
    import asyncio

    # Imported here because this module's import block does not bring
    # `settings` into scope, although the verification URL below needs it.
    from app.config import settings

    async def get_user():
        # get_session() is an async generator; take the first session it yields.
        async for session in get_session():
            return await session.get(User, user_id)

    # Celery tasks are synchronous, so the coroutine is driven to completion here.
    user = asyncio.run(get_user())
    if not user:
        return

    # Send welcome email
    send_email.delay(
        to=user.email,
        subject="Welcome to TaskFlow!",
        template="welcome",
        context={
            "name": user.name,
            "verification_url": f"{settings.FRONTEND_URL}/verify/{user.email_verification_token}"
        }
    )
@celery_app.task
def send_task_reminder(task_id: int) -> None:
    """
    Send a reminder when a task is due within the next 24 hours.

    Nothing is sent when the task does not exist, has no assignee, has no due
    date, is already past due, or is due more than a day from now.

    Args:
        task_id: Primary key of the task to check
    """
    import asyncio

    async def process():
        async for session in get_session():
            # Get task with assignee
            task = await session.get(TaskModel, task_id)
            if not task or not task.assignee_id:
                return
            # A task without a due date cannot be "due soon"; without this
            # guard the subtraction below raises TypeError on None.
            if task.due_date is None:
                return
            # Load the relationships that are read below.
            await session.refresh(task, ["assignee", "project"])

            # Check if reminder needed (due in next 24 hours)
            # NOTE(review): assumes due_date is stored as naive UTC, matching
            # datetime.utcnow() - confirm against the model.
            time_until_due = task.due_date - datetime.utcnow()
            if timedelta(0) < time_until_due < timedelta(days=1):
                send_email.delay(
                    to=task.assignee.email,
                    subject=f"Task Due Soon: {task.title}",
                    template="task_reminder",
                    context={
                        "task": task.title,
                        "project": task.project.name,
                        "due_date": task.due_date.strftime("%Y-%m-%d %H:%M"),
                        "hours_left": int(time_until_due.total_seconds() / 3600)
                    }
                )

    asyncio.run(process())
@celery_app.task(bind=True, max_retries=5)
def process_file_upload(
    self,
    file_path: str,
    user_id: int,
    project_id: int
) -> Dict[str, Any]:
    """
    Import tasks from an uploaded CSV file, reporting progress as it goes.

    Each row becomes one TaskModel, committed on its own so that a bad row
    does not discard the rows before it. Progress is published through
    ``update_state`` with the custom state "PROCESSING".

    Only a transient failure while *reading* the file is retried. Once rows
    have been committed the task never retries, because a retry would import
    the same rows a second time.

    Args:
        file_path: Path to uploaded file (deleted after processing)
        user_id: User who uploaded; stored as ``created_by``
        project_id: Project to import into

    Returns:
        Dict with "tasks_created" (int), "errors" (list of messages, one per
        failed row plus a cleanup message if the file could not be removed)
        and "status" ("completed").

    Raises:
        FileNotFoundError: the upload is gone; retrying cannot help.
        Whatever ``self.retry`` raises for other OS-level read errors.
    """
    import asyncio
    import csv
    import os

    # Update task state for progress tracking
    self.update_state(
        state="PROCESSING",
        meta={"current": 0, "total": 100, "status": "Reading file..."}
    )

    # Read the whole file up front so the total row count is known.
    try:
        # newline="" lets the csv module handle embedded line breaks itself.
        with open(file_path, "r", newline="") as file:
            rows = list(csv.DictReader(file))
    except FileNotFoundError:
        raise
    except OSError as exc:
        raise self.retry(exc=exc, countdown=60)

    total = len(rows)

    async def import_rows():
        """Create one task per row; return (created_count, error_messages)."""
        created = 0
        problems = []
        for number, row in enumerate(rows, start=1):
            # Update progress
            self.update_state(
                state="PROCESSING",
                meta={
                    "current": number,
                    "total": total,
                    "status": f"Processing row {number}/{total}"
                }
            )
            # Create task from row; a failure is recorded and the import continues.
            try:
                async for session in get_session():
                    session.add(
                        TaskModel(
                            title=row["title"],
                            description=row.get("description", ""),
                            project_id=project_id,
                            created_by=user_id,
                            priority=int(row.get("priority", 1))
                        )
                    )
                    await session.commit()
                created += 1
            except Exception as e:
                problems.append(f"Row {number}: {str(e)}")
        return created, problems

    # One event loop for the whole import instead of one per row.
    tasks_created, errors = asyncio.run(import_rows())

    # Clean up file. A failure here is reported, not raised: the rows are
    # already committed, so failing (or retrying) the task would be wrong.
    try:
        os.remove(file_path)
    except OSError as exc:
        errors.append(f"Cleanup: {exc}")

    return {
        "tasks_created": tasks_created,
        "errors": errors,
        "status": "completed"
    }

Create app/templates/email/base.html:

<!DOCTYPE html>
<html>
<head>
{# Shared layout for TaskFlow e-mails. Child templates may override the "title", "header" and "content" blocks. #}
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{% block title %}TaskFlow{% endblock %}</title>
{# All styling is embedded in this <style> element; no external stylesheet is referenced. #}
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, Helvetica, Arial, sans-serif;
line-height: 1.6;
color: #333;
max-width: 600px;
margin: 0 auto;
padding: 20px;
}
.header {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 30px;
text-align: center;
border-radius: 10px 10px 0 0;
}
.content {
background: white;
padding: 30px;
border: 1px solid #e1e4e8;
border-radius: 0 0 10px 10px;
}
.button {
display: inline-block;
padding: 12px 24px;
background: #667eea;
color: white;
text-decoration: none;
border-radius: 5px;
margin: 20px 0;
}
.footer {
margin-top: 30px;
padding-top: 20px;
border-top: 1px solid #e1e4e8;
font-size: 12px;
color: #666;
text-align: center;
}
</style>
</head>
<body>
<div class="header">
<h1>TaskFlow</h1>
{# Optional per-message heading rendered under the product name. #}
{% block header %}{% endblock %}
</div>
<div class="content">
{# Main message body supplied by the child template. #}
{% block content %}{% endblock %}
</div>
<div class="footer">
<p>© 2024 TaskFlow. All rights reserved.</p>
<p>
{# These links read settings.FRONTEND_URL, so "settings" must be present in the render context. NOTE(review): confirm EmailService.render_template supplies it. #}
<a href="{{ settings.FRONTEND_URL }}/unsubscribe">Unsubscribe</a> |
<a href="{{ settings.FRONTEND_URL }}/preferences">Email Preferences</a>
</p>
</div>
</body>
</html>

Create app/templates/email/welcome.html:

{% extends "email/base.html" %}
{# Welcome e-mail. Reads "name" and "verification_url" from the context, plus settings.FRONTEND_URL for the help-center link. #}
{% block header %}
<h2>Welcome to TaskFlow!</h2>
{% endblock %}
{% block content %}
<p>Hi {{ name }},</p>
<p>Welcome to TaskFlow! We're excited to have you on board.</p>
<p>To get started, please verify your email address:</p>
{# Styled by the .button rule in email/base.html. #}
<a href="{{ verification_url }}" class="button">Verify Email</a>
<p>Once verified, you can:</p>
<ul>
<li>Create projects to organize your work</li>
<li>Add tasks with due dates and priorities</li>
<li>Collaborate with team members</li>
<li>Track progress and productivity</li>
</ul>
<p>If you have any questions, reply to this email or check out our
<a href="{{ settings.FRONTEND_URL }}/help">help center</a>.</p>
<p>Best regards,<br>The TaskFlow Team</p>
{% endblock %}

Create app/tasks/scheduled.py:

"""
Scheduled tasks that run periodically.
These tasks are triggered by Celery Beat according
to the schedule defined in celery_app.py.
"""
from datetime import datetime, timedelta
from typing import List

from sqlmodel import select, and_

from app.celery_app import celery_app
from app.config import settings
from app.database import get_session
from app.models import User, Task, UserSession
@celery_app.task
def send_daily_summary() -> None:
    """
    Queue a daily summary e-mail for every active, verified user.

    For each such user the open tasks due today (UTC) are collected; users
    with no task due today receive nothing. Scheduled by Celery Beat.
    """
    import asyncio

    # Imported inside the function, as the sibling tasks in this module do.
    from app.tasks import send_email

    async def process():
        # Today's window as datetimes [00:00 today, 00:00 tomorrow), computed
        # once for all users. Datetime bounds are used so the comparison with
        # due_date (formatted below with %H:%M, so it carries a time) does not
        # mix date and datetime values.
        today = datetime.utcnow().date()
        day_start = datetime.combine(today, datetime.min.time())
        day_end = day_start + timedelta(days=1)

        async for session in get_session():
            # Get all active users
            stmt = select(User).where(
                and_(
                    User.is_active == True,
                    User.is_verified == True
                )
            )
            result = await session.exec(stmt)
            users = result.all()

            for user in users:
                # Get user's tasks for today
                stmt = select(Task).where(
                    and_(
                        Task.assignee_id == user.id,
                        Task.due_date >= day_start,
                        Task.due_date < day_end,
                        Task.is_completed == False
                    )
                )
                result = await session.exec(stmt)
                tasks_today = result.all()
                if not tasks_today:
                    continue

                # Load the project relationship before reading task.project,
                # the same way the other tasks in this module refresh
                # relationships before use.
                for task in tasks_today:
                    await session.refresh(task, ["project"])

                # Send summary email
                send_email.delay(
                    to=user.email,
                    subject=f"Your TaskFlow Daily Summary - {len(tasks_today)} tasks due",
                    template="daily_summary",
                    context={
                        "name": user.name,
                        "tasks": [
                            {
                                "title": task.title,
                                "project": task.project.name,
                                "priority": task.priority,
                                "due_time": task.due_date.strftime("%H:%M")
                            }
                            for task in tasks_today
                        ],
                        "date": today.strftime("%B %d, %Y")
                    }
                )

    asyncio.run(process())
@celery_app.task
def cleanup_expired_sessions() -> int:
    """
    Delete every UserSession whose ``expires_at`` lies in the past.

    Scheduled hourly by Celery Beat.

    Returns:
        Number of sessions deleted
    """
    import asyncio

    async def cleanup():
        async for session in get_session():
            # Everything that expired before "now" (UTC) is stale.
            cutoff = datetime.utcnow()
            query = select(UserSession).where(UserSession.expires_at < cutoff)
            stale_rows = (await session.exec(query)).all()

            # Delete row by row, then commit once.
            count = 0
            for stale in stale_rows:
                await session.delete(stale)
                count = count + 1
            await session.commit()
            return count

    deleted = asyncio.run(cleanup())
    print(f"Cleaned up {deleted} expired sessions")
    return deleted
@celery_app.task
def check_overdue_tasks() -> None:
    """
    Notify assignees about tasks that became overdue in the last 30 minutes.

    Scheduled every 30 minutes by Celery Beat. The window matches that
    interval so that a task is picked up by one run only.

    Uses ``settings.FRONTEND_URL`` (imported at module level from app.config)
    to build the link to the task.
    """
    import asyncio

    # Imported once per run rather than once per overdue task.
    from app.tasks import send_email

    async def check():
        async for session in get_session():
            # Find tasks that became overdue in last 30 minutes
            now = datetime.utcnow()
            thirty_minutes_ago = now - timedelta(minutes=30)
            stmt = select(Task).where(
                and_(
                    Task.due_date < now,
                    Task.due_date >= thirty_minutes_ago,
                    Task.is_completed == False,
                    # "!= None" is intentional: it builds a SQL expression on
                    # the column; "is not None" would be evaluated in Python.
                    Task.assignee_id != None
                )
            )
            result = await session.exec(stmt)
            overdue_tasks = result.all()

            for task in overdue_tasks:
                # Load the relationships read below.
                await session.refresh(task, ["assignee", "project"])
                # Send overdue notification
                send_email.delay(
                    to=task.assignee.email,
                    subject=f"⚠️ Overdue Task: {task.title}",
                    template="task_overdue",
                    context={
                        "task": task.title,
                        "project": task.project.name,
                        "assignee": task.assignee.name,
                        "due_date": task.due_date.strftime("%Y-%m-%d %H:%M"),
                        "task_url": f"{settings.FRONTEND_URL}/tasks/{task.id}"
                    }
                )

    asyncio.run(check())
@celery_app.task
def generate_weekly_report(user_id: int) -> str:
    """
    Build a user's productivity report for the past seven days.

    The report is cached in Redis under ``report:<report_id>`` for one hour
    and also queued as an e-mail to the user. Redis is reached through
    ``settings.REDIS_URL`` (imported at module level from app.config).

    Args:
        user_id: User to generate report for

    Returns:
        Report ID for retrieval. The ID is returned even when the user does
        not exist; in that case no report is stored or sent.
    """
    import asyncio
    import json
    import uuid

    import redis

    from app.tasks import send_email

    report_id = str(uuid.uuid4())

    async def generate():
        async for session in get_session():
            # Get user
            user = await session.get(User, user_id)
            if not user:
                return

            # Read the clock once so the reported period and the queries
            # agree on where the week starts and ends.
            now = datetime.utcnow()
            week_ago = now - timedelta(days=7)

            # Tasks completed
            stmt = select(Task).where(
                and_(
                    Task.assignee_id == user_id,
                    Task.completed_at >= week_ago
                )
            )
            result = await session.exec(stmt)
            completed_tasks = result.all()

            # Tasks created
            stmt = select(Task).where(
                and_(
                    Task.created_by == user_id,
                    Task.created_at >= week_ago
                )
            )
            result = await session.exec(stmt)
            created_tasks = result.all()

            # Build report (plain JSON-serializable values only)
            report = {
                "user_id": user_id,
                "user_name": user.name,
                "period": {
                    "start": week_ago.isoformat(),
                    "end": now.isoformat()
                },
                "stats": {
                    "tasks_completed": len(completed_tasks),
                    "tasks_created": len(created_tasks),
                    "productivity_score": calculate_productivity_score(
                        completed_tasks,
                        created_tasks
                    )
                },
                "insights": generate_insights(completed_tasks, created_tasks)
            }

            # Store report in cache
            r = redis.Redis.from_url(settings.REDIS_URL)
            r.setex(
                f"report:{report_id}",
                3600,  # Expire after 1 hour
                json.dumps(report)
            )

            # Send report via email
            send_email.delay(
                to=user.email,
                subject="Your Weekly TaskFlow Report",
                template="weekly_report",
                context=report
            )

    asyncio.run(generate())
    return report_id
def calculate_productivity_score(completed_tasks, created_tasks) -> int:
    """
    Score a week's activity on a 0-100 scale.

    Every completed task is worth 10 points. The difference between completed
    and created tasks adds (or, when negative, subtracts) 5 points per task.
    The total is clamped to the range 0..100.
    """
    done = len(completed_tasks)
    opened = len(created_tasks)
    raw_score = done * 10 + (done - opened) * 5
    # Clamp: never below 0, never above 100.
    if raw_score < 0:
        return 0
    if raw_score > 100:
        return 100
    return raw_score
def generate_insights(completed_tasks, created_tasks) -> List[str]:
    """
    Turn task counts into short, human-readable observations.

    Only the lengths of the two collections are used.

    Args:
        completed_tasks: Tasks the user completed in the period
        created_tasks: Tasks the user created in the period

    Returns:
        Zero, one or two messages: one when more than 10 tasks were
        completed, one when more than twice as many tasks were created as
        completed.
    """
    # The List annotation relies on "from typing import List" in the module's
    # import block.
    completed_count = len(completed_tasks)
    created_count = len(created_tasks)

    insights = []
    if completed_count > 10:
        insights.append("🎉 Great job! You completed over 10 tasks this week!")
    if created_count > completed_count * 2:
        insights.append("📝 You created many new tasks. Consider prioritizing completion.")
    # Add more insights based on patterns
    return insights

Update your API endpoints to use background tasks:

# In app/main.py
from app.tasks import send_welcome_email, process_file_upload
from zenith import BackgroundTasks, UploadFile, File
@app.post("/auth/register")
async def register(
    user_data: UserCreate,
    background_tasks: BackgroundTasks,
    session = Depends(get_session)
):
    """Create the user, then queue the welcome e-mail as a Celery task."""
    new_user = await UserService(session).create_user(user_data)
    # .delay() only enqueues the task; the e-mail is sent by a worker.
    # (background_tasks is accepted but not used by this handler.)
    send_welcome_email.delay(new_user.id)
    return new_user
@app.post("/projects/{project_id}/import")
async def import_tasks(
    project_id: int,
    file: UploadFile = File(...),
    current_user = Depends(get_current_user),
    session = Depends(get_session)
):
    """
    Accept a CSV upload and queue it for import into a project.

    The upload is copied to a temporary file and handed to the
    ``process_file_upload`` task; the response carries the task id and the
    URL to poll for progress. Responds with 403 unless the project exists
    and is owned by the current user.
    """
    import tempfile
    import shutil

    # Verify project ownership
    project = await session.get(Project, project_id)
    if not (project and project.owner_id == current_user.id):
        raise HTTPException(status_code=403)

    # Save uploaded file. delete=False keeps it on disk after the handle is
    # closed so the background task can read it.
    # NOTE(review): this presumes the worker can see the same filesystem as
    # the API process - confirm for multi-host deployments.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as upload_copy:
        shutil.copyfileobj(file.file, upload_copy)
        temp_path = upload_copy.name

    # Queue processing task
    queued = process_file_upload.delay(
        file_path=temp_path,
        user_id=current_user.id,
        project_id=project_id
    )
    job_id = queued.id
    return {
        "task_id": job_id,
        "status": "processing",
        "status_url": f"/tasks/status/{job_id}"
    }
@app.get("/tasks/status/{task_id}")
async def get_task_status(task_id: str):
    """
    Get status of background task.

    Maps the Celery state of ``task_id`` to a small JSON document:

    - PENDING    -> {"status": "pending", ...}
    - PROCESSING -> progress counters published by the task via update_state
    - SUCCESS    -> {"status": "completed", "result": ...}
    - RETRY      -> {"status": "retrying", "error": ...}; the task failed an
                    attempt but is scheduled to run again, so it is not
                    reported as failed
    - anything else -> {"status": "failed", "error": ...}
    """
    from celery.result import AsyncResult

    task = AsyncResult(task_id, app=celery_app)
    # Read the state once: each access to .state queries the result backend,
    # so repeated reads could observe different states across the branches.
    state = task.state

    if state == "PENDING":
        return {"status": "pending", "message": "Task not started"}
    if state == "PROCESSING":
        # For the custom PROCESSING state, .info is the meta dict the task
        # passed to update_state; fall back to {} if it is missing.
        progress = task.info or {}
        return {
            "status": "processing",
            "current": progress.get("current", 0),
            "total": progress.get("total", 1),
            "message": progress.get("status", "")
        }
    if state == "SUCCESS":
        return {
            "status": "completed",
            "result": task.result
        }
    if state == "RETRY":
        return {
            "status": "retrying",
            "error": str(task.info)
        }
    return {
        "status": "failed",
        "error": str(task.info)
    }

Create start_worker.py:

#!/usr/bin/env python
"""
Start Celery worker for processing background tasks.

Run without arguments to start a worker at log level "info". Any arguments
given are passed to the Celery command line unchanged, e.g.
``python start_worker.py worker --loglevel=debug``.
"""
import sys
from pathlib import Path

# Add project to path so that "app" is importable when run as a script
sys.path.insert(0, str(Path(__file__).parent))

from app.celery_app import celery_app

if __name__ == "__main__":
    if len(sys.argv) > 1:
        # Explicit arguments: let Celery parse sys.argv as before.
        celery_app.start()
    else:
        # A bare start() with no sub-command does not launch a worker, so
        # name the sub-command explicitly.
        celery_app.worker_main(["worker", "--loglevel=info"])

Start workers and scheduler:

Terminal window
# Start worker (in separate terminal)
celery -A app.celery_app worker --loglevel=info
# For development with auto-reload
watchmedo auto-restart -d app -p '*.py' -- celery -A app.celery_app worker --loglevel=info
# Start beat scheduler (for periodic tasks)
celery -A app.celery_app beat --loglevel=info
# Start Flower (monitoring dashboard)
celery -A app.celery_app flower
# Access Flower at http://localhost:5555

Flower provides a web interface for monitoring:

app/monitoring.py
from flower import Flower
from app.celery_app import celery_app
def setup_monitoring():
    """
    Build the Flower monitoring application for ``celery_app``.

    The dashboard credentials are read from the FLOWER_BASIC_AUTH environment
    variable ("user:password"). The built-in fallback exists only so local
    development keeps working; set the variable in any shared environment.

    Returns:
        The configured Flower application object
    """
    import os

    # Never ship the fallback credentials: they are public knowledge.
    basic_auth = os.environ.get("FLOWER_BASIC_AUTH", "admin:password")

    # NOTE(review): confirm that the installed Flower version accepts a plain
    # dict for `options` and these option names.
    flower_app = Flower(
        capp=celery_app,
        options={
            "port": 5555,
            "basic_auth": [basic_auth],  # Add authentication
            "persistent": True,  # Save state
            "db": "flower.db",  # Database file
            "max_tasks": 10000,  # Task history limit
        }
    )
    return flower_app

Add monitoring to your API:

@app.get("/admin/jobs/stats")
async def get_job_stats(
    current_user = Depends(require_admin)
):
    """Report how many tasks are active, scheduled and reserved, and how many workers replied."""
    from celery import states
    from celery.result import AsyncResult

    def count_tasks(per_worker):
        """Total number of tasks across all workers; 0 when there was no reply."""
        if not per_worker:
            return 0
        return sum(len(tasks) for tasks in per_worker.values())

    # Each inspect call returns a mapping of worker -> task list, or a falsy
    # value when no worker answered.
    inspector = celery_app.control.inspect()
    active = inspector.active()
    scheduled = inspector.scheduled()
    reserved = inspector.reserved()

    worker_count = len(active) if active else 0
    return {
        "active": count_tasks(active),
        "scheduled": count_tasks(scheduled),
        "reserved": count_tasks(reserved),
        "workers": worker_count
    }
@app.post("/admin/jobs/retry/{task_id}")
async def retry_failed_task(
    task_id: str,
    current_user = Depends(require_admin)
):
    """
    Re-submit a failed task with its original arguments.

    AsyncResult has no retry() method, so the task is sent again by name.
    The name and arguments are only available from the result backend when
    Celery stores extended result metadata (``result_extended = True``);
    without them the task cannot be re-submitted and a message says so.

    Returns:
        {"status": "retrying", "task_id": ..., "new_task_id": ...} when the
        task was re-submitted, otherwise the current state and a message.
    """
    from celery.result import AsyncResult

    task = AsyncResult(task_id, app=celery_app)
    state = task.state
    if state != "FAILURE":
        return {"status": state, "message": "Task not in failed state"}

    task_name = task.name
    if not task_name:
        return {
            "status": state,
            "message": "Task name and arguments are not stored in the result "
                       "backend; enable result_extended to allow retries"
        }

    # The re-submitted task gets a new id; the failed result is left untouched.
    resubmitted = celery_app.send_task(
        task_name,
        args=task.args,
        kwargs=task.kwargs
    )
    return {
        "status": "retrying",
        "task_id": task_id,
        "new_task_id": resubmitted.id
    }

Implement robust error handling:

app/tasks/error_handling.py
from celery import Task
from celery.exceptions import MaxRetriesExceededError
import logging
logger = logging.getLogger(__name__)
class BaseTask(Task):
    """
    Base task with automatic retrying and error handling.

    Any exception triggers an automatic retry (up to 3 times) with
    exponential backoff capped at 10 minutes and random jitter. Failures,
    retries and successes are logged; failures of critical tasks also raise
    an admin alert.
    """
    autoretry_for = (Exception,)  # Retry on any exception
    retry_kwargs = {"max_retries": 3}  # Max 3 retries
    retry_backoff = True  # Exponential backoff
    retry_backoff_max = 600  # Max 10 minutes between retries
    retry_jitter = True  # Add randomness to prevent thundering herd

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """Log the failure and alert the admins for critical tasks."""
        logger.error("Task %s failed: %s", task_id, exc)
        # Send alert for critical tasks.
        # Celery names tasks "<module>.<function>" by default, so compare the
        # last component; comparing self.name directly would never match.
        short_name = (self.name or "").rsplit(".", 1)[-1]
        if short_name in ("send_password_reset", "process_payment"):
            send_admin_alert(
                subject=f"Critical task failed: {self.name}",
                body=f"Task {task_id} failed with error: {exc}"
            )

    def on_retry(self, exc, task_id, args, kwargs, einfo):
        """Log that the task is about to be retried."""
        logger.warning("Task %s retrying: %s", task_id, exc)

    def on_success(self, retval, task_id, args, kwargs):
        """Log that the task finished without error."""
        logger.info("Task %s completed successfully", task_id)
# Use the base task
@celery_app.task(base=BaseTask, bind=True)
def resilient_task(self, data):
    """
    Process ``data``, retrying temporary errors and surfacing permanent ones.

    Args:
        data: Payload passed to process_data

    Returns:
        Whatever process_data returns

    Raises:
        TemporaryError: after the last permitted attempt has failed
        PermanentError: immediately, after logging
    """
    try:
        # Do work
        return process_data(data)
    except TemporaryError as exc:
        # An exception raised inside this handler cannot be caught by a
        # sibling "except" clause of the same try, and retry(exc=...)
        # re-raises the original error when retries run out. The final
        # failure is therefore detected here, before asking for a retry.
        out_of_retries = (
            self.max_retries is not None
            and self.request.retries >= self.max_retries
        )
        if out_of_retries:
            # Final failure after all retries
            logger.error("Task failed after maximum retries")
            send_admin_alert("Task permanently failed", str(self.request))
            raise
        # Retry with backoff
        raise self.retry(exc=exc)
    except PermanentError as exc:
        # Don't retry permanent errors
        # NOTE(review): BaseTask sets autoretry_for = (Exception,), which may
        # still retry this error after it is re-raised - confirm, and exclude
        # PermanentError from auto-retry if so.
        logger.error(f"Permanent error: {exc}")
        raise

Create tests/test_tasks.py:

"""
Test background tasks.
"""
import pytest
from unittest.mock import patch, MagicMock
from app.tasks import send_email, send_welcome_email
from app.tasks.scheduled import cleanup_expired_sessions
class TestEmailTasks:
    """Test email sending tasks."""

    @patch('app.tasks.EmailService')
    def test_send_email_task(self, mock_email_service):
        """send_email renders the template, sends the mail and returns True."""
        # Mock email service
        mock_service = MagicMock()
        mock_email_service.return_value = mock_service

        # Call task directly (runs synchronously, no worker involved)
        result = send_email(
            to="test@example.com",
            subject="Test Subject",
            template="welcome",
            context={"name": "Test User"}
        )

        # Verify email service was called
        mock_service.render_template.assert_called_once()
        mock_service.send.assert_called_once()
        assert result is True

    # NOTE(review): async tests need an asyncio-aware pytest plugin (for
    # example pytest-asyncio in auto mode) - confirm the project's setup.
    @patch('app.tasks.send_email.delay')
    async def test_welcome_email_task(self, mock_send_delay, session):
        """send_welcome_email queues a welcome mail for the given user."""
        import asyncio

        # Create test user
        from tests.factories import UserFactory
        user = UserFactory()
        session.add(user)
        await session.commit()

        # Call task in a worker thread: it uses asyncio.run() internally,
        # which raises RuntimeError when called from a thread whose event
        # loop is already running (as it is inside this async test).
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, send_welcome_email, user.id)

        # Verify email was queued
        mock_send_delay.assert_called_once()
        call_args = mock_send_delay.call_args[1]
        assert call_args["to"] == user.email
        assert "Welcome" in call_args["subject"]
class TestScheduledTasks:
    """Test scheduled tasks."""

    # NOTE(review): async tests need an asyncio-aware pytest plugin (for
    # example pytest-asyncio in auto mode) - confirm the project's setup.
    async def test_cleanup_expired_sessions(self, session):
        """Only sessions whose expiry lies in the past are deleted."""
        import asyncio
        from datetime import datetime, timedelta
        from app.models import UserSession

        # Create expired and valid sessions
        expired_session = UserSession(
            user_id=1,
            refresh_token="expired",
            expires_at=datetime.utcnow() - timedelta(hours=1)
        )
        valid_session = UserSession(
            user_id=1,
            refresh_token="valid",
            expires_at=datetime.utcnow() + timedelta(hours=1)
        )
        session.add_all([expired_session, valid_session])
        await session.commit()

        # Run cleanup in a worker thread: the task calls asyncio.run(), which
        # raises RuntimeError when invoked from a thread whose event loop is
        # already running (as it is inside this async test).
        loop = asyncio.get_running_loop()
        deleted_count = await loop.run_in_executor(None, cleanup_expired_sessions)
        assert deleted_count == 1

        # Verify only expired session was deleted
        from sqlmodel import select
        stmt = select(UserSession)
        result = await session.exec(stmt)
        remaining = result.all()
        assert len(remaining) == 1
        assert remaining[0].refresh_token == "valid"
class TestTaskRetries:
    """Test task retry logic."""

    @patch('app.tasks.EmailService')
    def test_email_retry_on_failure(self, mock_email_service):
        """A failing send reaches the retry path and surfaces the send error."""
        # Mock email service to fail
        mock_service = MagicMock()
        mock_service.send.side_effect = Exception("Network error")
        mock_email_service.return_value = mock_service

        # When a task is called directly (not by a worker), self.retry()
        # re-raises the original exception instead of scheduling a retry, so
        # match on its message rather than accepting any exception at all.
        with pytest.raises(Exception, match="Network error"):
            send_email(
                to="test@example.com",
                subject="Test",
                template="test",
                context={}
            )

        # The failure came from the send call, not from an earlier step.
        mock_service.send.assert_called_once()

Route tasks to appropriate queues:

app/celery_app.py
# Map task names (or glob patterns) to the queue and priority they are sent with.
# NOTE(review): this assignment replaces any task_routes set earlier on
# celery_app.conf - confirm that is intended.
celery_app.conf.task_routes = {
# E-mail delivery goes to the "email" queue
'app.tasks.send_email': {'queue': 'email', 'priority': 5},
# CSV imports go to the "heavy" queue
'app.tasks.process_file_upload': {'queue': 'heavy', 'priority': 1},
# Every task defined in app/tasks/scheduled.py goes to the "scheduled" queue
'app.tasks.scheduled.*': {'queue': 'scheduled', 'priority': 3},
}
# Start workers for specific queues
# A worker consumes only the queues named with -Q, so each routed queue needs
# at least one worker listening on it.
# celery -A app.celery_app worker -Q email --concurrency=4
# celery -A app.celery_app worker -Q heavy --concurrency=2

Implement priority queues:

from kombu import Queue, Exchange

# Declare one queue per urgency level, each bound to an exchange of the same name.
celery_app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('high_priority'), routing_key='high_priority'),
    Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
)

# Send high priority task
# The direction of `priority` depends on the broker. With the Redis broker
# used in this tutorial, 0 is the most urgent and 9 the least. RabbitMQ is the
# other way round (a larger number is more urgent).
send_urgent_email.apply_async(
    args=[user_id],
    queue='high_priority',
    priority=0  # 0-9 on Redis, lower is more important
)

Solution: Ensure Redis is running, worker is started, and tasks are properly registered

Solution: Use task deduplication with unique task IDs

Solution: Recycle worker processes regularly by starting the worker with --max-tasks-per-child=100, so that each child process is replaced after it has run 100 tasks.

In this part, you’ve implemented: a task queue with Celery and Redis; email notifications with templates; scheduled jobs with Celery Beat; file processing and uploads; progress tracking for long tasks; error handling and retries; job monitoring with Flower; and testing strategies for async tasks.

In Part 7: Deployment, we’ll:

  • Containerize with Docker
  • Deploy to production
  • Set up CI/CD pipelines
  • Configure monitoring and logging
  • Implement scaling strategies

Questions? Check our FAQ or ask in GitHub Discussions.