saga-orchestration

Par wshobson · agents

Implémentez des patterns saga pour les transactions distribuées et les workflows inter-agrégats. Utilisez cette compétence pour implémenter des transactions distribuées entre microservices lorsque le 2PC n'est pas disponible, concevoir des actions compensatoires pour les workflows de commandes échouées qui couvrent les services d'inventaire, de paiement et d'expédition, construire des coordinateurs de saga orientés événements pour les systèmes de réservation de voyage qui doivent annuler atomiquement les réservations d'hôtels, de vols et de voitures, ou déboguer des états de saga bloqués en production où les étapes de compensation ne se terminent jamais.

npx skills add https://github.com/wshobson/agents --skill saga-orchestration

Orchestration de Saga

Patterns pour gérer les transactions distribuées et les processus métier longue durée sans validation en deux phases.

Entrées et Sorties

Ce que vous fournissez :

  • Limites de service et responsabilités (quel service possède quelle étape)
  • Exigences transactionnelles (quelles étapes doivent être atomiques, lesquelles peuvent être finalement cohérentes)
  • Modes de défaillance pour chaque étape (transitoires vs. permanents, politique de retry)
  • Exigences SLA par étape (informe la configuration des timeouts)
  • Infrastructure de messaging/événements existante (Kafka, RabbitMQ, SQS, etc.)

Ce que cette skill produit :

  • Définition de saga avec étapes ordonnées, commandes d'action et commandes de compensation
  • Implémentation orchestratrice ou chorégraphique pour le pattern choisi
  • Logique de compensation pour chaque service participant (idempotente, toujours réussie)
  • Configuration des timeouts d'étape avec délais par étape
  • Configuration du monitoring : métriques de machine d'état, détection de saga bloquée, récupération DLQ

Quand utiliser cette skill

  • Coordonner des transactions multi-service sans verrous distribués
  • Implémenter les transactions compensatoires pour les défaillances partielles
  • Gérer les workflows métier longue durée (minutes à heures)
  • Gérer les défaillances dans les systèmes distribués où l'atomicité est requise
  • Construire des processus de traitement de commande, approbation ou réservation
  • Remplacer la fragile validation en deux phases par une compensation async

Concepts Clés

Types de Pattern Saga

Chorégraphie                        Orchestration
┌─────┐  ┌─────┐  ┌─────┐         ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│         │ Orchestrator│
└─────┘  └─────┘  └─────┘         └──────┬──────┘
   │        │        │                   │
   ▼        ▼        ▼             ┌─────┼─────┐
 Event    Event    Event           ▼     ▼     ▼
                                ┌────┐┌────┐┌────┐
Chaque service réagit à         │Svc1││Svc2││Svc3│
l'événement du service          └────┘└────┘└────┘
précédent.
Pas de coordinateur central.    Coordinateur central envoie
                                des commandes et suit l'état.

Choisir l'orchestration quand : Vous avez besoin d'un suivi explicite des étapes, de retries et de visibilité centralisée. Plus facile à déboguer.

Choisir la chorégraphie quand : Vous voulez un couplage faible et des services qui peuvent évoluer indépendamment. Plus difficile à tracer.

États d'Exécution de Saga

État Description
Started Saga initiée, première étape envoyée
Pending En attente de réponse d'étape d'un participant
Compensating Une étape a échoué ; rollback des étapes réussies
Completed Toutes les étapes avant ont réussi
Failed Saga échouée et toutes les compensations terminées

Règles de Compensation

Situation Gestion
Étape jamais démarrée Aucune compensation nécessaire (sauter)
Étape complétée avec succès Exécuter la commande de compensation
Étape échouée avant la fin Aucune compensation nécessaire ; marquer échouée
La compensation elle-même échoue Retry avec backoff → DLQ → alerte intervention manuelle
Résultat d'étape n'existe plus Traiter compensation comme succès (idempotence)

Templates

Template 1 : Saga de Traitement de Commande (Orchestration)

Sous-classe concrète de l'orchestrateur de base. Définit quatre étapes couvrant l'inventaire, le paiement, l'expédition et la notification. Voir references/advanced-patterns.md pour la classe de base abstraite SagaOrchestrator complète.

from saga_orchestrator import SagaOrchestrator, SagaStep
from typing import Dict, List


class OrderFulfillmentSaga(SagaOrchestrator):
    """Orchestre le traitement des commandes entre quatre services participants."""

    @property
    def saga_type(self) -> str:
        return "OrderFulfillment"

    def define_steps(self, data: Dict) -> List[SagaStep]:
        return [
            SagaStep(
                name="reserve_inventory",
                action="InventoryService.ReserveItems",
                compensation="InventoryService.ReleaseReservation"
            ),
            SagaStep(
                name="process_payment",
                action="PaymentService.ProcessPayment",
                compensation="PaymentService.RefundPayment"
            ),
            SagaStep(
                name="create_shipment",
                action="ShippingService.CreateShipment",
                compensation="ShippingService.CancelShipment"
            ),
            SagaStep(
                name="send_confirmation",
                action="NotificationService.SendOrderConfirmation",
                compensation="NotificationService.SendCancellationNotice"
            ),
        ]


# Démarrer une saga
async def create_order(order_data: Dict, saga_store, event_publisher):
    saga = OrderFulfillmentSaga(saga_store, event_publisher)
    return await saga.start({
        "order_id": order_data["order_id"],
        "customer_id": order_data["customer_id"],
        "items": order_data["items"],
        "payment_method": order_data["payment_method"],
        "shipping_address": order_data["shipping_address"],
    })


# Service participant — gère la commande et publie la réponse
class InventoryService:
    async def handle_reserve_items(self, command: Dict):
        try:
            reservation = await self.reserve(command["items"], command["order_id"])
            await self.event_publisher.publish("SagaStepCompleted", {
                "saga_id": command["saga_id"],
                "step_name": "reserve_inventory",
                "result": {"reservation_id": reservation.id}
            })
        except InsufficientInventoryError as e:
            await self.event_publisher.publish("SagaStepFailed", {
                "saga_id": command["saga_id"],
                "step_name": "reserve_inventory",
                "error": str(e)
            })

    async def handle_release_reservation(self, command: Dict):
        """Compensation — idempotente, publie toujours la fin."""
        try:
            await self.release_reservation(
                command["original_result"]["reservation_id"]
            )
        except ReservationNotFoundError:
            pass  # Déjà libéré — traiter comme succès
        await self.event_publisher.publish("SagaCompensationCompleted", {
            "saga_id": command["saga_id"],
            "step_name": "reserve_inventory"
        })

Template 2 : Saga Basée sur la Chorégraphie

Chaque service écoute l'événement du service précédent et réagit. Pas de coordinateur central. La compensation est déclenchée par les événements d'échec qui se propagent en arrière.

from dataclasses import dataclass
from typing import Dict, Any


@dataclass
class SagaContext:
    """Transporté à travers tous les événements d'une saga chorégraphiée."""
    saga_id: str
    step: int
    data: Dict[str, Any]
    completed_steps: list


class OrderChoreographySaga:
    """Saga basée sur la chorégraphie — les services réagissent aux événements les uns des autres."""

    def __init__(self, event_bus):
        self.event_bus = event_bus
        self._register_handlers()

    def _register_handlers(self):
        # Chemin avant
        self.event_bus.subscribe("OrderCreated",       self._on_order_created)
        self.event_bus.subscribe("InventoryReserved",  self._on_inventory_reserved)
        self.event_bus.subscribe("PaymentProcessed",   self._on_payment_processed)
        self.event_bus.subscribe("ShipmentCreated",    self._on_shipment_created)
        # Chemin de compensation
        self.event_bus.subscribe("PaymentFailed",      self._on_payment_failed)
        self.event_bus.subscribe("ShipmentFailed",     self._on_shipment_failed)

    async def _on_order_created(self, event: Dict):
        await self.event_bus.publish("ReserveInventory", {
            "saga_id": event["order_id"],
            "order_id": event["order_id"],
            "items": event["items"],
        })

    async def _on_inventory_reserved(self, event: Dict):
        await self.event_bus.publish("ProcessPayment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "amount": event["total_amount"],
            "reservation_id": event["reservation_id"],
        })

    async def _on_payment_processed(self, event: Dict):
        await self.event_bus.publish("CreateShipment", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "payment_id": event["payment_id"],
        })

    async def _on_shipment_created(self, event: Dict):
        await self.event_bus.publish("OrderFulfilled", {
            "saga_id": event["saga_id"],
            "order_id": event["order_id"],
            "tracking_number": event["tracking_number"],
        })

    # Gestionnaires de compensation
    async def _on_payment_failed(self, event: Dict):
        """Paiement échoué — libérer l'inventaire et marquer la commande comme échouée."""
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })
        await self.event_bus.publish("OrderFailed", {
            "order_id": event["order_id"],
            "reason": "Payment failed",
        })

    async def _on_shipment_failed(self, event: Dict):
        """Expédition échouée — rembourser le paiement et libérer l'inventaire."""
        await self.event_bus.publish("RefundPayment", {
            "saga_id": event["saga_id"],
            "payment_id": event["payment_id"],
        })
        await self.event_bus.publish("ReleaseInventory", {
            "saga_id": event["saga_id"],
            "reservation_id": event["reservation_id"],
        })

Template 3 : Gardes d'Étape Idempotentes

Chaque participant doit se protéger contre la livraison de commande dupliquée. Stocker une clé d'idempotence avant l'exécution et retourner le résultat mis en cache lors de la rejeu.

async def handle_reserve_items(self, command: Dict):
    """Étape de réservation protégée par l'idempotence."""
    idempotency_key = f"reserve-{command['order_id']}"
    existing = await self.reservation_store.find_by_key(idempotency_key)
    if existing:
        # Déjà exécuté — retourner le résultat précédent sans effets secondaires
        await self.event_publisher.publish("SagaStepCompleted", {
            "saga_id": command["saga_id"],
            "step_name": "reserve_inventory",
            "result": {"reservation_id": existing.id}
        })
        return

    # Première exécution
    reservation = await self.reserve(
        items=command["items"],
        order_id=command["order_id"],
        idempotency_key=idempotency_key
    )
    await self.event_publisher.publish("SagaStepCompleted", {
        "saga_id": command["saga_id"],
        "step_name": "reserve_inventory",
        "result": {"reservation_id": reservation.id}
    })

Bonnes Pratiques

À Faire

  • Rendre chaque étape idempotente — Les commandes peuvent être rejouées lors de la reconnexion du broker
  • Concevoir les compensations soigneusement — C'est le chemin de code le plus critique
  • Utiliser des correlation IDs — Le saga_id doit circuler dans chaque événement et log
  • Implémenter des timeouts par étape — Ne jamais attendre indéfiniment une réponse de participant
  • Logger les transitions d'étatsaga_id, step_name, old_state → new_state à chaque changement
  • Tester explicitement les chemins de compensation — Injecter des défaillances à chaque index d'étape dans les tests d'intégration

À Ne Pas Faire

  • Ne pas supposer une fin instantanée — Les sagas sont async et peuvent prendre des minutes
  • Ne pas sauter les tests de compensation — Le chemin de rollback est le plus difficile à bien faire
  • Ne pas coupler les services directement — Utiliser le messaging async, jamais d'appels synchrones dans une étape de saga
  • Ne pas ignorer les défaillances partielles — Une étape partiellement exécutée a encore besoin de compensation
  • Ne pas utiliser un timeout global — Chaque étape a des caractéristiques de latence différentes

Dépannage

Saga bloquée en état COMPENSATING

Une saga entre en compensation mais ne atteint jamais FAILED. Cela signifie qu'un gestionnaire de compensation lève une exception non gérée et ne publie jamais SagaCompensationCompleted. Ajouter la gestion de dead-letter queue (DLQ) aux consumers de compensation et s'assurer que chaque action de compensation publie un événement de résultat même quand l'opération sous-jacente avait déjà été annulée.

async def handle_release_reservation(self, command: Dict):
    try:
        await self.release_reservation(command["original_result"]["reservation_id"])
    except ReservationNotFoundError:
        pass  # Déjà libéré — traiter comme succès
    # Toujours publier la fin, quel que soit le résultat
    await self.event_publisher.publish("SagaCompensationCompleted", {
        "saga_id": command["saga_id"],
        "step_name": "reserve_inventory"
    })

Exécutions de saga dupliquées au redémarrage

Si votre service d'orchestrateur redémarre au milieu d'une saga, il peut rejour les événements et réexécuter les étapes déjà complétées. Protéger chaque action d'étape avec une clé d'idempotence — voir Template 3 ci-dessus.

Saga de chorégraphie perdant des événements

Dans une saga basée sur la chorégraphie, un service aval peut manquer un événement s'il était hors ligne lors de la publication. Utiliser un broker de messages durable (Kafka avec réplication, RabbitMQ avec persistance) et stocker l'état actuel de la saga dans une table saga_log dédiée pour pouvoir rejour depuis la dernière étape valide connue.

Timeout se déclenchant avant qu'une étape lente mais valide ne se complète

Une étape comme create_shipment peut prendre jusqu'à 15 minutes lors des pics de charge mais votre timeout global est de 5 minutes, causant une compensation erronée. Rendre les timeouts d'étape configurables par type d'étape — voir references/advanced-patterns.md pour l'implémentation TimeoutSagaOrchestrator et le pattern dict STEP_TIMEOUTS.

Ordre de compensation ne correspondant pas à l'ordre d'exécution

Quand deux étapes se complètent avant qu'une défaillance ne soit détectée, la compensation doit s'exécuter en ordre strictement inverse ou vous laissez les données dans un état incohérent. Vérifier que _compensate() itère de current_step - 1 à 0, et ajouter un test d'intégration qui échoue délibérément à chaque index d'étape pour confirmer le bon ordre de rollback.


Patterns Avancés

Le répertoire references/ contient des implémentations prêtes pour la production non nécessaires pour la plupart des sagas :

  • references/advanced-patterns.md — Classe de base abstraite SagaOrchestrator complète, TimeoutSagaOrchestrator avec délais par étape, chaîne de transaction compensatoire de transfert bancaire détaillée, instrumentation Prometheus, alertes PromQL de saga bloquée, et worker de récupération DLQ.

Skills Connexes

  • cqrs-implementation — Associer les sagas avec CQRS pour les mises à jour du read-model après la fin de chaque étape
  • event-store-design — Stocker les événements de saga dans un event store pour une piste d'audit complète et la capacité de rejeu
  • workflow-orchestration-patterns — Moteurs de workflow haut niveau (Temporal, Conductor) qui s'appuient sur les concepts de saga

Skills similaires