Orchestration de Saga
Patterns pour gérer les transactions distribuées et les processus métier longue durée sans validation en deux phases.
Entrées et Sorties
Ce que vous fournissez :
- Limites de service et responsabilités (quel service possède quelle étape)
- Exigences transactionnelles (quelles étapes doivent être atomiques, lesquelles peuvent être finalement cohérentes)
- Modes de défaillance pour chaque étape (transitoires vs. permanents, politique de retry)
- Exigences SLA par étape (informe la configuration des timeouts)
- Infrastructure de messaging/événements existante (Kafka, RabbitMQ, SQS, etc.)
Ce que cette skill produit :
- Définition de saga avec étapes ordonnées, commandes d'action et commandes de compensation
- Implémentation orchestratrice ou chorégraphique pour le pattern choisi
- Logique de compensation pour chaque service participant (idempotente, toujours réussie)
- Configuration des timeouts d'étape avec délais par étape
- Configuration du monitoring : métriques de machine d'état, détection de saga bloquée, récupération DLQ
Quand utiliser cette skill
- Coordonner des transactions multi-service sans verrous distribués
- Implémenter les transactions compensatoires pour les défaillances partielles
- Gérer les workflows métier longue durée (minutes à heures)
- Gérer les défaillances dans les systèmes distribués où l'atomicité est requise
- Construire des processus de traitement de commande, approbation ou réservation
- Remplacer la fragile validation en deux phases par une compensation async
Concepts Clés
Types de Pattern Saga
Chorégraphie Orchestration
┌─────┐ ┌─────┐ ┌─────┐ ┌─────────────┐
│Svc A│─►│Svc B│─►│Svc C│ │ Orchestrator│
└─────┘ └─────┘ └─────┘ └──────┬──────┘
│ │ │ │
▼ ▼ ▼ ┌─────┼─────┐
Event Event Event ▼ ▼ ▼
┌────┐┌────┐┌────┐
Chaque service réagit à │Svc1││Svc2││Svc3│
l'événement du service └────┘└────┘└────┘
précédent.
Pas de coordinateur central. Coordinateur central envoie
des commandes et suit l'état.
Choisir l'orchestration quand : Vous avez besoin d'un suivi explicite des étapes, de retries et de visibilité centralisée. Plus facile à déboguer.
Choisir la chorégraphie quand : Vous voulez un couplage faible et des services qui peuvent évoluer indépendamment. Plus difficile à tracer.
États d'Exécution de Saga
| État | Description |
|---|---|
| Started | Saga initiée, première étape envoyée |
| Pending | En attente de réponse d'étape d'un participant |
| Compensating | Une étape a échoué ; rollback des étapes réussies |
| Completed | Toutes les étapes avant ont réussi |
| Failed | Saga échouée et toutes les compensations terminées |
Règles de Compensation
| Situation | Gestion |
|---|---|
| Étape jamais démarrée | Aucune compensation nécessaire (sauter) |
| Étape complétée avec succès | Exécuter la commande de compensation |
| Étape échouée avant la fin | Aucune compensation nécessaire ; marquer échouée |
| La compensation elle-même échoue | Retry avec backoff → DLQ → alerte intervention manuelle |
| Résultat d'étape n'existe plus | Traiter compensation comme succès (idempotence) |
Templates
Template 1 : Saga de Traitement de Commande (Orchestration)
Sous-classe concrète de l'orchestrateur de base. Définit quatre étapes couvrant l'inventaire, le paiement, l'expédition et la notification. Voir references/advanced-patterns.md pour la classe de base abstraite SagaOrchestrator complète.
from saga_orchestrator import SagaOrchestrator, SagaStep
from typing import Dict, List
class OrderFulfillmentSaga(SagaOrchestrator):
"""Orchestre le traitement des commandes entre quatre services participants."""
@property
def saga_type(self) -> str:
return "OrderFulfillment"
def define_steps(self, data: Dict) -> List[SagaStep]:
return [
SagaStep(
name="reserve_inventory",
action="InventoryService.ReserveItems",
compensation="InventoryService.ReleaseReservation"
),
SagaStep(
name="process_payment",
action="PaymentService.ProcessPayment",
compensation="PaymentService.RefundPayment"
),
SagaStep(
name="create_shipment",
action="ShippingService.CreateShipment",
compensation="ShippingService.CancelShipment"
),
SagaStep(
name="send_confirmation",
action="NotificationService.SendOrderConfirmation",
compensation="NotificationService.SendCancellationNotice"
),
]
# Démarrer une saga
async def create_order(order_data: Dict, saga_store, event_publisher):
saga = OrderFulfillmentSaga(saga_store, event_publisher)
return await saga.start({
"order_id": order_data["order_id"],
"customer_id": order_data["customer_id"],
"items": order_data["items"],
"payment_method": order_data["payment_method"],
"shipping_address": order_data["shipping_address"],
})
# Service participant — gère la commande et publie la réponse
class InventoryService:
async def handle_reserve_items(self, command: Dict):
try:
reservation = await self.reserve(command["items"], command["order_id"])
await self.event_publisher.publish("SagaStepCompleted", {
"saga_id": command["saga_id"],
"step_name": "reserve_inventory",
"result": {"reservation_id": reservation.id}
})
except InsufficientInventoryError as e:
await self.event_publisher.publish("SagaStepFailed", {
"saga_id": command["saga_id"],
"step_name": "reserve_inventory",
"error": str(e)
})
async def handle_release_reservation(self, command: Dict):
"""Compensation — idempotente, publie toujours la fin."""
try:
await self.release_reservation(
command["original_result"]["reservation_id"]
)
except ReservationNotFoundError:
pass # Déjà libéré — traiter comme succès
await self.event_publisher.publish("SagaCompensationCompleted", {
"saga_id": command["saga_id"],
"step_name": "reserve_inventory"
})
Template 2 : Saga Basée sur la Chorégraphie
Chaque service écoute l'événement du service précédent et réagit. Pas de coordinateur central. La compensation est déclenchée par les événements d'échec qui se propagent en arrière.
from dataclasses import dataclass
from typing import Dict, Any
@dataclass
class SagaContext:
"""Transporté à travers tous les événements d'une saga chorégraphiée."""
saga_id: str
step: int
data: Dict[str, Any]
completed_steps: list
class OrderChoreographySaga:
"""Saga basée sur la chorégraphie — les services réagissent aux événements les uns des autres."""
def __init__(self, event_bus):
self.event_bus = event_bus
self._register_handlers()
def _register_handlers(self):
# Chemin avant
self.event_bus.subscribe("OrderCreated", self._on_order_created)
self.event_bus.subscribe("InventoryReserved", self._on_inventory_reserved)
self.event_bus.subscribe("PaymentProcessed", self._on_payment_processed)
self.event_bus.subscribe("ShipmentCreated", self._on_shipment_created)
# Chemin de compensation
self.event_bus.subscribe("PaymentFailed", self._on_payment_failed)
self.event_bus.subscribe("ShipmentFailed", self._on_shipment_failed)
async def _on_order_created(self, event: Dict):
await self.event_bus.publish("ReserveInventory", {
"saga_id": event["order_id"],
"order_id": event["order_id"],
"items": event["items"],
})
async def _on_inventory_reserved(self, event: Dict):
await self.event_bus.publish("ProcessPayment", {
"saga_id": event["saga_id"],
"order_id": event["order_id"],
"amount": event["total_amount"],
"reservation_id": event["reservation_id"],
})
async def _on_payment_processed(self, event: Dict):
await self.event_bus.publish("CreateShipment", {
"saga_id": event["saga_id"],
"order_id": event["order_id"],
"payment_id": event["payment_id"],
})
async def _on_shipment_created(self, event: Dict):
await self.event_bus.publish("OrderFulfilled", {
"saga_id": event["saga_id"],
"order_id": event["order_id"],
"tracking_number": event["tracking_number"],
})
# Gestionnaires de compensation
async def _on_payment_failed(self, event: Dict):
"""Paiement échoué — libérer l'inventaire et marquer la commande comme échouée."""
await self.event_bus.publish("ReleaseInventory", {
"saga_id": event["saga_id"],
"reservation_id": event["reservation_id"],
})
await self.event_bus.publish("OrderFailed", {
"order_id": event["order_id"],
"reason": "Payment failed",
})
async def _on_shipment_failed(self, event: Dict):
"""Expédition échouée — rembourser le paiement et libérer l'inventaire."""
await self.event_bus.publish("RefundPayment", {
"saga_id": event["saga_id"],
"payment_id": event["payment_id"],
})
await self.event_bus.publish("ReleaseInventory", {
"saga_id": event["saga_id"],
"reservation_id": event["reservation_id"],
})
Template 3 : Gardes d'Étape Idempotentes
Chaque participant doit se protéger contre la livraison de commande dupliquée. Stocker une clé d'idempotence avant l'exécution et retourner le résultat mis en cache lors de la rejeu.
async def handle_reserve_items(self, command: Dict):
"""Étape de réservation protégée par l'idempotence."""
idempotency_key = f"reserve-{command['order_id']}"
existing = await self.reservation_store.find_by_key(idempotency_key)
if existing:
# Déjà exécuté — retourner le résultat précédent sans effets secondaires
await self.event_publisher.publish("SagaStepCompleted", {
"saga_id": command["saga_id"],
"step_name": "reserve_inventory",
"result": {"reservation_id": existing.id}
})
return
# Première exécution
reservation = await self.reserve(
items=command["items"],
order_id=command["order_id"],
idempotency_key=idempotency_key
)
await self.event_publisher.publish("SagaStepCompleted", {
"saga_id": command["saga_id"],
"step_name": "reserve_inventory",
"result": {"reservation_id": reservation.id}
})
Bonnes Pratiques
À Faire
- Rendre chaque étape idempotente — Les commandes peuvent être rejouées lors de la reconnexion du broker
- Concevoir les compensations soigneusement — C'est le chemin de code le plus critique
- Utiliser des correlation IDs — Le
saga_iddoit circuler dans chaque événement et log - Implémenter des timeouts par étape — Ne jamais attendre indéfiniment une réponse de participant
- Logger les transitions d'état —
saga_id,step_name,old_state → new_stateà chaque changement - Tester explicitement les chemins de compensation — Injecter des défaillances à chaque index d'étape dans les tests d'intégration
À Ne Pas Faire
- Ne pas supposer une fin instantanée — Les sagas sont async et peuvent prendre des minutes
- Ne pas sauter les tests de compensation — Le chemin de rollback est le plus difficile à bien faire
- Ne pas coupler les services directement — Utiliser le messaging async, jamais d'appels synchrones dans une étape de saga
- Ne pas ignorer les défaillances partielles — Une étape partiellement exécutée a encore besoin de compensation
- Ne pas utiliser un timeout global — Chaque étape a des caractéristiques de latence différentes
Dépannage
Saga bloquée en état COMPENSATING
Une saga entre en compensation mais ne atteint jamais FAILED. Cela signifie qu'un gestionnaire de compensation lève une exception non gérée et ne publie jamais SagaCompensationCompleted. Ajouter la gestion de dead-letter queue (DLQ) aux consumers de compensation et s'assurer que chaque action de compensation publie un événement de résultat même quand l'opération sous-jacente avait déjà été annulée.
async def handle_release_reservation(self, command: Dict):
try:
await self.release_reservation(command["original_result"]["reservation_id"])
except ReservationNotFoundError:
pass # Déjà libéré — traiter comme succès
# Toujours publier la fin, quel que soit le résultat
await self.event_publisher.publish("SagaCompensationCompleted", {
"saga_id": command["saga_id"],
"step_name": "reserve_inventory"
})
Exécutions de saga dupliquées au redémarrage
Si votre service d'orchestrateur redémarre au milieu d'une saga, il peut rejour les événements et réexécuter les étapes déjà complétées. Protéger chaque action d'étape avec une clé d'idempotence — voir Template 3 ci-dessus.
Saga de chorégraphie perdant des événements
Dans une saga basée sur la chorégraphie, un service aval peut manquer un événement s'il était hors ligne lors de la publication. Utiliser un broker de messages durable (Kafka avec réplication, RabbitMQ avec persistance) et stocker l'état actuel de la saga dans une table saga_log dédiée pour pouvoir rejour depuis la dernière étape valide connue.
Timeout se déclenchant avant qu'une étape lente mais valide ne se complète
Une étape comme create_shipment peut prendre jusqu'à 15 minutes lors des pics de charge mais votre timeout global est de 5 minutes, causant une compensation erronée. Rendre les timeouts d'étape configurables par type d'étape — voir references/advanced-patterns.md pour l'implémentation TimeoutSagaOrchestrator et le pattern dict STEP_TIMEOUTS.
Ordre de compensation ne correspondant pas à l'ordre d'exécution
Quand deux étapes se complètent avant qu'une défaillance ne soit détectée, la compensation doit s'exécuter en ordre strictement inverse ou vous laissez les données dans un état incohérent. Vérifier que _compensate() itère de current_step - 1 à 0, et ajouter un test d'intégration qui échoue délibérément à chaque index d'étape pour confirmer le bon ordre de rollback.
Patterns Avancés
Le répertoire references/ contient des implémentations prêtes pour la production non nécessaires pour la plupart des sagas :
references/advanced-patterns.md— Classe de base abstraiteSagaOrchestratorcomplète,TimeoutSagaOrchestratoravec délais par étape, chaîne de transaction compensatoire de transfert bancaire détaillée, instrumentation Prometheus, alertes PromQL de saga bloquée, et worker de récupération DLQ.
Skills Connexes
cqrs-implementation— Associer les sagas avec CQRS pour les mises à jour du read-model après la fin de chaque étapeevent-store-design— Stocker les événements de saga dans un event store pour une piste d'audit complète et la capacité de rejeuworkflow-orchestration-patterns— Moteurs de workflow haut niveau (Temporal, Conductor) qui s'appuient sur les concepts de saga