python-background-jobs

Par wshobson · agents

Modèles de tâches en arrière-plan en Python, incluant les files de tâches, les workers et l'architecture orientée événements. À utiliser lors de l'implémentation du traitement de tâches asynchrones, des files d'attente de jobs, des opérations longues, ou pour découpler le traitement du cycle requête/réponse.

npx skills add https://github.com/wshobson/agents --skill python-background-jobs

Tâches de fond Python et files d'attente

Découple le travail de longue durée ou peu fiable des cycles requête/réponse. Retourne immédiatement à l'utilisateur tandis que les workers de fond gèrent le travail lourd de manière asynchrone.

Quand utiliser cette compétence

  • Traiter des tâches qui prennent plus de quelques secondes
  • Envoyer des e-mails, notifications ou webhooks
  • Générer des rapports ou exporter des données
  • Traiter les uploads ou transformations médias
  • Intégrer des services externes peu fiables
  • Construire des architectures pilotées par événements

Concepts fondamentaux

1. Motif file d'attente de tâches

L'API accepte une requête, enfile une tâche, retourne immédiatement avec un ID de tâche. Les workers traitent les tâches de manière asynchrone.

2. Idempotence

Les tâches peuvent être relancées en cas d'échec. Concevez pour une ré-exécution sûre.

3. Machine d'état des tâches

Les tâches transitent par les états : en attente → en cours → réussi/échoué.

4. Livraison au moins une fois

La plupart des files d'attente garantissent une livraison au moins une fois. Votre code doit gérer les doublons.

Démarrage rapide

Cette compétence utilise Celery pour les exemples, une file d'attente de tâches largement adoptée. Les alternatives comme RQ, Dramatiq et les solutions natives cloud (AWS SQS, GCP Tasks) sont tout aussi valables.

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

@app.task
def send_email(to: str, subject: str, body: str) -> None:
    # This runs in a background worker
    email_client.send(to, subject, body)

# In your API handler
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")

Motifs fondamentaux

Motif 1 : Retourner l'ID de tâche immédiatement

Pour les opérations dépassant quelques secondes, retournez un ID de tâche et traitez de manière asynchrone.

from uuid import uuid4
from dataclasses import dataclass
from enum import Enum
from datetime import datetime

class JobStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCEEDED = "succeeded"
    FAILED = "failed"

@dataclass
class Job:
    id: str
    status: JobStatus
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
    result: dict | None = None
    error: str | None = None

# API endpoint
async def start_export(request: ExportRequest) -> JobResponse:
    """Start export job and return job ID."""
    job_id = str(uuid4())

    # Persist job record
    await jobs_repo.create(Job(
        id=job_id,
        status=JobStatus.PENDING,
        created_at=datetime.utcnow(),
    ))

    # Enqueue task for background processing
    await task_queue.enqueue(
        "export_data",
        job_id=job_id,
        params=request.model_dump(),
    )

    # Return immediately with job ID
    return JobResponse(
        job_id=job_id,
        status="pending",
        poll_url=f"/jobs/{job_id}",
    )

Motif 2 : Configuration des tâches Celery

Configurez les tâches Celery avec des paramètres appropriés de retry et timeout.

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379")

# Global configuration
app.conf.update(
    task_time_limit=3600,          # Hard limit: 1 hour
    task_soft_time_limit=3000,      # Soft limit: 50 minutes
    task_acks_late=True,            # Acknowledge after completion
    task_reject_on_worker_lost=True,
    worker_prefetch_multiplier=1,   # Don't prefetch too many tasks
)

@app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
    """Process payment with automatic retry on transient errors."""
    try:
        result = payment_gateway.charge(payment_id)
        return {"status": "success", "transaction_id": result.id}
    except PaymentDeclinedError as e:
        # Don't retry permanent failures
        return {"status": "declined", "reason": str(e)}
    except TransientError as e:
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)

Motif 3 : Rendre les tâches idempotentes

Les workers peuvent relancer en cas de crash ou timeout. Concevez pour une ré-exécution sûre.

@app.task(bind=True)
def process_order(self, order_id: str) -> None:
    """Process order idempotently."""
    order = orders_repo.get(order_id)

    # Already processed? Return early
    if order.status == OrderStatus.COMPLETED:
        logger.info("Order already processed", order_id=order_id)
        return

    # Already in progress? Check if we should continue
    if order.status == OrderStatus.PROCESSING:
        # Use idempotency key to avoid double-charging
        pass

    # Process with idempotency key
    result = payment_provider.charge(
        amount=order.total,
        idempotency_key=f"order-{order_id}",  # Critical!
    )

    orders_repo.update(order_id, status=OrderStatus.COMPLETED)

Stratégies d'idempotence :

  1. Vérifier avant d'écrire : Vérifier l'état avant l'action
  2. Clés d'idempotence : Utiliser des tokens uniques avec les services externes
  3. Motifs d'upsert : INSERT ... ON CONFLICT UPDATE
  4. Fenêtre de déduplication : Tracker les IDs traités pendant N heures

Motif 4 : Gestion de l'état des tâches

Persistez les transitions d'état des tâches pour la visibilité et le débogage.

class JobRepository:
    """Repository for managing job state."""

    async def create(self, job: Job) -> Job:
        """Create new job record."""
        await self._db.execute(
            """INSERT INTO jobs (id, status, created_at)
               VALUES ($1, $2, $3)""",
            job.id, job.status.value, job.created_at,
        )
        return job

    async def update_status(
        self,
        job_id: str,
        status: JobStatus,
        **fields,
    ) -> None:
        """Update job status with timestamp."""
        updates = {"status": status.value, **fields}

        if status == JobStatus.RUNNING:
            updates["started_at"] = datetime.utcnow()
        elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
            updates["completed_at"] = datetime.utcnow()

        await self._db.execute(
            "UPDATE jobs SET status = $1, ... WHERE id = $2",
            updates, job_id,
        )

        logger.info(
            "Job status updated",
            job_id=job_id,
            status=status.value,
        )

Motifs avancés

Motif 5 : File de lettres mortes

Gérez les tâches définitivement échouées pour inspection manuelle.

@app.task(bind=True, max_retries=3)
def process_webhook(self, webhook_id: str, payload: dict) -> None:
    """Process webhook with DLQ for failures."""
    try:
        result = send_webhook(payload)
        if not result.success:
            raise WebhookFailedError(result.error)
    except Exception as e:
        if self.request.retries >= self.max_retries:
            # Move to dead letter queue for manual inspection
            dead_letter_queue.send({
                "task": "process_webhook",
                "webhook_id": webhook_id,
                "payload": payload,
                "error": str(e),
                "attempts": self.request.retries + 1,
                "failed_at": datetime.utcnow().isoformat(),
            })
            logger.error(
                "Webhook moved to DLQ after max retries",
                webhook_id=webhook_id,
                error=str(e),
            )
            return

        # Exponential backoff retry
        raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)

Motif 6 : Point de terminaison de polling de statut

Fournissez un point de terminaison pour que les clients vérifient le statut de la tâche.

from fastapi import FastAPI, HTTPException

app = FastAPI()

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str) -> JobStatusResponse:
    """Get current status of a background job."""
    job = await jobs_repo.get(job_id)

    if job is None:
        raise HTTPException(404, f"Job {job_id} not found")

    return JobStatusResponse(
        job_id=job.id,
        status=job.status.value,
        created_at=job.created_at,
        started_at=job.started_at,
        completed_at=job.completed_at,
        result=job.result if job.status == JobStatus.SUCCEEDED else None,
        error=job.error if job.status == JobStatus.FAILED else None,
        # Helpful for clients
        is_terminal=job.status in (JobStatus.SUCCEEDED, JobStatus.FAILED),
    )

Motif 7 : Chaînage de tâches et workflows

Composez des workflows complexes à partir de tâches simples.

from celery import chain, group, chord

# Simple chain: A → B → C
workflow = chain(
    extract_data.s(source_id),
    transform_data.s(),
    load_data.s(destination_id),
)

# Parallel execution: A, B, C all at once
parallel = group(
    send_email.s(user_email),
    send_sms.s(user_phone),
    update_analytics.s(event_data),
)

# Chord: Run tasks in parallel, then a callback
# Process all items, then send completion notification
workflow = chord(
    [process_item.s(item_id) for item_id in item_ids],
    send_completion_notification.s(batch_id),
)

workflow.apply_async()

Motif 8 : Files d'attente de tâches alternatives

Choisissez le bon outil pour vos besoins.

RQ (Redis Queue) : Simple, basé sur Redis

from rq import Queue
from redis import Redis

queue = Queue(connection=Redis())
job = queue.enqueue(send_email, "user@example.com", "Subject", "Body")

Dramatiq : Alternative moderne à Celery

import dramatiq
from dramatiq.brokers.redis import RedisBroker

dramatiq.set_broker(RedisBroker())

@dramatiq.actor
def send_email(to: str, subject: str, body: str) -> None:
    email_client.send(to, subject, body)

Options natives cloud :

  • AWS SQS + Lambda
  • Google Cloud Tasks
  • Azure Functions

Résumé des bonnes pratiques

  1. Retourner immédiatement - Ne bloquez pas les requêtes pour les opérations longues
  2. Persister l'état des tâches - Activer le polling de statut et le débogage
  3. Rendre les tâches idempotentes - Sûr de relancer en cas d'échec
  4. Utiliser des clés d'idempotence - Pour les appels de services externes
  5. Définir des timeouts - Limites souples et dures
  6. Implémenter une DLQ - Capturer les tâches définitivement échouées
  7. Logger les transitions - Tracker les changements d'état des tâches
  8. Relancer de manière appropriée - Backoff exponentiel pour les erreurs transitoires
  9. Ne pas relancer les échecs permanents - Erreurs de validation, identifiants invalides
  10. Surveiller la profondeur de la file - Alerter sur la croissance des arriérés

Skills similaires