Tâches de fond Python et files d'attente
Découple le travail de longue durée ou peu fiable des cycles requête/réponse. Retourne immédiatement à l'utilisateur tandis que les workers de fond gèrent le travail lourd de manière asynchrone.
Quand utiliser cette compétence
- Traiter des tâches qui prennent plus de quelques secondes
- Envoyer des e-mails, notifications ou webhooks
- Générer des rapports ou exporter des données
- Traiter les uploads ou transformations médias
- Intégrer des services externes peu fiables
- Construire des architectures pilotées par événements
Concepts fondamentaux
1. Motif file d'attente de tâches
L'API accepte une requête, enfile une tâche, retourne immédiatement avec un ID de tâche. Les workers traitent les tâches de manière asynchrone.
2. Idempotence
Les tâches peuvent être relancées en cas d'échec. Concevez pour une ré-exécution sûre.
3. Machine d'état des tâches
Les tâches transitent par les états : en attente → en cours → réussi/échoué.
4. Livraison au moins une fois
La plupart des files d'attente garantissent une livraison au moins une fois. Votre code doit gérer les doublons.
Démarrage rapide
Cette compétence utilise Celery pour les exemples, une file d'attente de tâches largement adoptée. Les alternatives comme RQ, Dramatiq et les solutions natives cloud (AWS SQS, GCP Tasks) sont tout aussi valables.
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379")
@app.task
def send_email(to: str, subject: str, body: str) -> None:
# This runs in a background worker
email_client.send(to, subject, body)
# In your API handler
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
Motifs fondamentaux
Motif 1 : Retourner l'ID de tâche immédiatement
Pour les opérations dépassant quelques secondes, retournez un ID de tâche et traitez de manière asynchrone.
from uuid import uuid4
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class JobStatus(Enum):
PENDING = "pending"
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
@dataclass
class Job:
id: str
status: JobStatus
created_at: datetime
started_at: datetime | None = None
completed_at: datetime | None = None
result: dict | None = None
error: str | None = None
# API endpoint
async def start_export(request: ExportRequest) -> JobResponse:
"""Start export job and return job ID."""
job_id = str(uuid4())
# Persist job record
await jobs_repo.create(Job(
id=job_id,
status=JobStatus.PENDING,
created_at=datetime.utcnow(),
))
# Enqueue task for background processing
await task_queue.enqueue(
"export_data",
job_id=job_id,
params=request.model_dump(),
)
# Return immediately with job ID
return JobResponse(
job_id=job_id,
status="pending",
poll_url=f"/jobs/{job_id}",
)
Motif 2 : Configuration des tâches Celery
Configurez les tâches Celery avec des paramètres appropriés de retry et timeout.
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379")
# Global configuration
app.conf.update(
task_time_limit=3600, # Hard limit: 1 hour
task_soft_time_limit=3000, # Soft limit: 50 minutes
task_acks_late=True, # Acknowledge after completion
task_reject_on_worker_lost=True,
worker_prefetch_multiplier=1, # Don't prefetch too many tasks
)
@app.task(
bind=True,
max_retries=3,
default_retry_delay=60,
autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
"""Process payment with automatic retry on transient errors."""
try:
result = payment_gateway.charge(payment_id)
return {"status": "success", "transaction_id": result.id}
except PaymentDeclinedError as e:
# Don't retry permanent failures
return {"status": "declined", "reason": str(e)}
except TransientError as e:
# Retry with exponential backoff
raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
Motif 3 : Rendre les tâches idempotentes
Les workers peuvent relancer en cas de crash ou timeout. Concevez pour une ré-exécution sûre.
@app.task(bind=True)
def process_order(self, order_id: str) -> None:
"""Process order idempotently."""
order = orders_repo.get(order_id)
# Already processed? Return early
if order.status == OrderStatus.COMPLETED:
logger.info("Order already processed", order_id=order_id)
return
# Already in progress? Check if we should continue
if order.status == OrderStatus.PROCESSING:
# Use idempotency key to avoid double-charging
pass
# Process with idempotency key
result = payment_provider.charge(
amount=order.total,
idempotency_key=f"order-{order_id}", # Critical!
)
orders_repo.update(order_id, status=OrderStatus.COMPLETED)
Stratégies d'idempotence :
- Vérifier avant d'écrire : Vérifier l'état avant l'action
- Clés d'idempotence : Utiliser des tokens uniques avec les services externes
- Motifs d'upsert :
INSERT ... ON CONFLICT UPDATE - Fenêtre de déduplication : Tracker les IDs traités pendant N heures
Motif 4 : Gestion de l'état des tâches
Persistez les transitions d'état des tâches pour la visibilité et le débogage.
class JobRepository:
"""Repository for managing job state."""
async def create(self, job: Job) -> Job:
"""Create new job record."""
await self._db.execute(
"""INSERT INTO jobs (id, status, created_at)
VALUES ($1, $2, $3)""",
job.id, job.status.value, job.created_at,
)
return job
async def update_status(
self,
job_id: str,
status: JobStatus,
**fields,
) -> None:
"""Update job status with timestamp."""
updates = {"status": status.value, **fields}
if status == JobStatus.RUNNING:
updates["started_at"] = datetime.utcnow()
elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
updates["completed_at"] = datetime.utcnow()
await self._db.execute(
"UPDATE jobs SET status = $1, ... WHERE id = $2",
updates, job_id,
)
logger.info(
"Job status updated",
job_id=job_id,
status=status.value,
)
Motifs avancés
Motif 5 : File de lettres mortes
Gérez les tâches définitivement échouées pour inspection manuelle.
@app.task(bind=True, max_retries=3)
def process_webhook(self, webhook_id: str, payload: dict) -> None:
"""Process webhook with DLQ for failures."""
try:
result = send_webhook(payload)
if not result.success:
raise WebhookFailedError(result.error)
except Exception as e:
if self.request.retries >= self.max_retries:
# Move to dead letter queue for manual inspection
dead_letter_queue.send({
"task": "process_webhook",
"webhook_id": webhook_id,
"payload": payload,
"error": str(e),
"attempts": self.request.retries + 1,
"failed_at": datetime.utcnow().isoformat(),
})
logger.error(
"Webhook moved to DLQ after max retries",
webhook_id=webhook_id,
error=str(e),
)
return
# Exponential backoff retry
raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
Motif 6 : Point de terminaison de polling de statut
Fournissez un point de terminaison pour que les clients vérifient le statut de la tâche.
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str) -> JobStatusResponse:
"""Get current status of a background job."""
job = await jobs_repo.get(job_id)
if job is None:
raise HTTPException(404, f"Job {job_id} not found")
return JobStatusResponse(
job_id=job.id,
status=job.status.value,
created_at=job.created_at,
started_at=job.started_at,
completed_at=job.completed_at,
result=job.result if job.status == JobStatus.SUCCEEDED else None,
error=job.error if job.status == JobStatus.FAILED else None,
# Helpful for clients
is_terminal=job.status in (JobStatus.SUCCEEDED, JobStatus.FAILED),
)
Motif 7 : Chaînage de tâches et workflows
Composez des workflows complexes à partir de tâches simples.
from celery import chain, group, chord
# Simple chain: A → B → C
workflow = chain(
extract_data.s(source_id),
transform_data.s(),
load_data.s(destination_id),
)
# Parallel execution: A, B, C all at once
parallel = group(
send_email.s(user_email),
send_sms.s(user_phone),
update_analytics.s(event_data),
)
# Chord: Run tasks in parallel, then a callback
# Process all items, then send completion notification
workflow = chord(
[process_item.s(item_id) for item_id in item_ids],
send_completion_notification.s(batch_id),
)
workflow.apply_async()
Motif 8 : Files d'attente de tâches alternatives
Choisissez le bon outil pour vos besoins.
RQ (Redis Queue) : Simple, basé sur Redis
from rq import Queue
from redis import Redis
queue = Queue(connection=Redis())
job = queue.enqueue(send_email, "user@example.com", "Subject", "Body")
Dramatiq : Alternative moderne à Celery
import dramatiq
from dramatiq.brokers.redis import RedisBroker
dramatiq.set_broker(RedisBroker())
@dramatiq.actor
def send_email(to: str, subject: str, body: str) -> None:
email_client.send(to, subject, body)
Options natives cloud :
- AWS SQS + Lambda
- Google Cloud Tasks
- Azure Functions
Résumé des bonnes pratiques
- Retourner immédiatement - Ne bloquez pas les requêtes pour les opérations longues
- Persister l'état des tâches - Activer le polling de statut et le débogage
- Rendre les tâches idempotentes - Sûr de relancer en cas d'échec
- Utiliser des clés d'idempotence - Pour les appels de services externes
- Définir des timeouts - Limites souples et dures
- Implémenter une DLQ - Capturer les tâches définitivement échouées
- Logger les transitions - Tracker les changements d'état des tâches
- Relancer de manière appropriée - Backoff exponentiel pour les erreurs transitoires
- Ne pas relancer les échecs permanents - Erreurs de validation, identifiants invalides
- Surveiller la profondeur de la file - Alerter sur la croissance des arriérés