Implémentation CQRS
Guide complet pour implémenter des patterns CQRS (Command Query Responsibility Segregation).
Quand utiliser cette compétence
- Séparer les préoccupations de lecture et d'écriture
- Mettre à l'échelle les lectures indépendamment des écritures
- Construire des systèmes basés sur l'event sourcing
- Optimiser les scénarios de requêtes complexes
- Besoin de modèles de données distincts pour la lecture et l'écriture
- Exigences de reporting haute performance
Concepts fondamentaux
1. Architecture CQRS
               ┌─────────────┐
               │   Client    │
               └──────┬──────┘
                      │
         ┌────────────┴────────────┐
         │                         │
         ▼                         ▼
  ┌─────────────┐           ┌─────────────┐
  │  Commands   │           │   Queries   │
  │    API      │           │    API      │
  └──────┬──────┘           └──────┬──────┘
         │                         │
         ▼                         ▼
  ┌─────────────┐           ┌─────────────┐
  │  Command    │           │   Query     │
  │  Handlers   │           │  Handlers   │
  └──────┬──────┘           └──────┬──────┘
         │                         │
         ▼                         ▼
  ┌─────────────┐           ┌─────────────┐
  │   Write     │──────────►│    Read     │
  │   Model     │  Events   │   Model     │
  └─────────────┘           └─────────────┘
2. Composants clés
| Composant | Responsabilité |
|---|---|
| Command | Intention de changer l'état |
| Command Handler | Valide et exécute les commandes |
| Event | Enregistrement du changement |
| Query | Demande de données |
| Query Handler | Récupère les données du modèle de lecture |
| Projector | Met à jour le modèle de lecture depuis les événements |
Templates
Template 1 : Infrastructure de commandes
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, Dict, Any, Type
from datetime import datetime
import uuid
# Base de commande
@dataclass
class Command:
command_id: str = None
timestamp: datetime = None
def __post_init__(self):
self.command_id = self.command_id or str(uuid.uuid4())
self.timestamp = self.timestamp or datetime.utcnow()
# Commandes concrètes
@dataclass
class CreateOrder(Command):
    """Command asking for a new order to be created."""

    customer_id: str
    items: list  # CreateOrderHandler rejects the command when this is empty
    shipping_address: dict
@dataclass
class AddOrderItem(Command):
    """Command asking for one product line to be added to an order."""

    order_id: str
    product_id: str
    quantity: int
    price: float
@dataclass
class CancelOrder(Command):
    """Command asking for an order to be cancelled, with the stated reason."""

    order_id: str
    reason: str
# Base du gestionnaire de commandes
# Type variable for the concrete command type a handler accepts.
T = TypeVar("T", bound=Command)


class CommandHandler(ABC, Generic[T]):
    """Abstract base for objects that execute one kind of command."""

    @abstractmethod
    async def handle(self, command: T) -> Any:
        """Process ``command``; the concrete handler decides what is returned."""
        ...
# Bus de commandes
class CommandBus:
    """Routes each command to the handler registered for its exact type."""

    def __init__(self):
        # Keyed by the concrete command class; dispatch does no subclass lookup.
        self._handlers: Dict[Type[Command], CommandHandler] = {}

    def register(self, command_type: Type[Command], handler: CommandHandler):
        """Associate ``handler`` with ``command_type``, replacing any previous entry."""
        self._handlers[command_type] = handler

    async def dispatch(self, command: Command) -> Any:
        """Run the handler registered for ``type(command)`` and return its result.

        Raises:
            ValueError: if no handler is registered for the command's type.
        """
        handler = self._handlers.get(type(command))
        if handler:
            return await handler.handle(command)
        raise ValueError(f"No handler for {type(command).__name__}")
# Implémentation du gestionnaire de commandes
class CreateOrderHandler(CommandHandler[CreateOrder]):
    """Handles ``CreateOrder`` by creating an ``Order`` and storing its events."""

    def __init__(self, order_repository, event_store):
        self.order_repository = order_repository
        self.event_store = event_store

    async def handle(self, command: CreateOrder) -> str:
        """Validate the command, create the aggregate and append its events.

        Returns:
            The ``id`` of the order that was created.

        Raises:
            ValueError: if the command carries no items.
        """
        # Guard clause: reject the command before any aggregate is created.
        if not command.items:
            raise ValueError("Order must have at least one item")

        order = Order.create(
            customer_id=command.customer_id,
            items=command.items,
            shipping_address=command.shipping_address,
        )

        # The aggregate's pending events go to a stream named after the order.
        stream_id = f"Order-{order.id}"
        pending_events = order.uncommitted_events
        await self.event_store.append_events(
            stream_id=stream_id,
            stream_type="Order",
            events=pending_events,
        )
        return order.id
Template 2 : Infrastructure de requêtes
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TypeVar, Generic, List, Optional
# Base de requête
@dataclass
class Query:
    """Marker base class for all read-side queries; it declares no fields."""
# Requêtes concrètes
@dataclass
class GetOrderById(Query):
    """Query for a single order, identified by ``order_id``."""

    order_id: str
@dataclass
class GetCustomerOrders(Query):
    """Paginated query for one customer's orders, optionally filtered by status."""

    customer_id: str
    status: Optional[str] = None  # None means "no status filter"
    page: int = 1
    page_size: int = 20
@dataclass
class SearchOrders(Query):
    """Free-text order search with optional filters and a sort specification."""

    query: str
    filters: dict = None  # defaults to None rather than an empty dict
    sort_by: str = "created_at"
    sort_order: str = "desc"
# Types de résultat de requête
@dataclass
class OrderView:
    """Read-model row describing one order, as returned by the query handlers."""

    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: datetime
    shipped_at: Optional[datetime] = None  # stays None unless a value is supplied
# Item type of a page.  Declared here so the class does not depend on the
# command/query-bound ``T`` type variables declared elsewhere in the file.
_ItemT = TypeVar("_ItemT")


@dataclass
class PaginatedResult(Generic[_ItemT]):
    """One page of results together with the paging metadata.

    Attributes are the page's ``items``, the ``total`` number of matching
    rows across all pages, and the ``page`` / ``page_size`` that were requested.
    """

    items: List[_ItemT]
    total: int
    page: int
    page_size: int

    @property
    def total_pages(self) -> int:
        """Return the number of pages needed for ``total`` rows.

        Returns 0 when ``page_size`` is not positive instead of raising
        ``ZeroDivisionError``.
        """
        if self.page_size <= 0:
            return 0
        # Ceiling division without floats.
        return (self.total + self.page_size - 1) // self.page_size
# Base du gestionnaire de requête
# T: the concrete query type a handler accepts; R: the type it returns.
T = TypeVar("T", bound=Query)
R = TypeVar("R")


class QueryHandler(ABC, Generic[T, R]):
    """Abstract base for objects that answer one kind of query."""

    @abstractmethod
    async def handle(self, query: T) -> R:
        """Answer ``query`` and return the handler's result type."""
        ...
# Bus de requêtes
class QueryBus:
    """Routes each query to the handler registered for its exact type."""

    def __init__(self):
        # Keyed by the concrete query class; dispatch does no subclass lookup.
        self._handlers: Dict[Type[Query], QueryHandler] = {}

    def register(self, query_type: Type[Query], handler: QueryHandler):
        """Associate ``handler`` with ``query_type``, replacing any previous entry."""
        self._handlers[query_type] = handler

    async def dispatch(self, query: Query) -> Any:
        """Run the handler registered for ``type(query)`` and return its result.

        Raises:
            ValueError: if no handler is registered for the query's type.
        """
        handler = self._handlers.get(type(query))
        if handler:
            return await handler.handle(query)
        raise ValueError(f"No handler for {type(query).__name__}")
# Implémentation du gestionnaire de requête
class GetOrderByIdHandler(QueryHandler[GetOrderById, Optional[OrderView]]):
    """Looks up a single row of ``order_views`` by order id."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: GetOrderById) -> Optional[OrderView]:
        """Return the matching ``OrderView``, or ``None`` when no row is found."""
        # Assembled from single-line pieces so the statement text does not
        # depend on how this method is indented.
        sql = (
            "\n"
            "SELECT order_id, customer_id, status, total_amount,\n"
            "item_count, created_at, shipped_at\n"
            "FROM order_views\n"
            "WHERE order_id = $1\n"
        )
        async with self.read_db.acquire() as conn:
            row = await conn.fetchrow(sql, query.order_id)
            # The selected column names match the OrderView field names.
            return OrderView(**dict(row)) if row else None
class GetCustomerOrdersHandler(QueryHandler[GetCustomerOrders, PaginatedResult[OrderView]]):
    """Returns one page of a customer's rows from ``order_views``."""

    def __init__(self, read_db):
        self.read_db = read_db

    async def handle(self, query: GetCustomerOrders) -> PaginatedResult[OrderView]:
        """Count the matching rows, fetch the requested page and wrap both.

        The status filter is applied only when ``query.status`` is truthy.
        Rows are ordered by ``created_at`` descending.
        """
        # Each condition uses the positional placeholder of the parameter
        # that was just appended, so placeholders and params stay in step.
        conditions = ["customer_id = $1"]
        params = [query.customer_id]
        if query.status:
            params.append(query.status)
            conditions.append(f"status = ${len(params)}")
        where_clause = " AND ".join(conditions)

        count_sql = f"SELECT COUNT(*) FROM order_views WHERE {where_clause}"

        # LIMIT and OFFSET take the two placeholders after the filter params.
        limit_slot = len(params) + 1
        offset_slot = len(params) + 2
        page_sql = (
            "\n"
            "SELECT order_id, customer_id, status, total_amount,\n"
            "item_count, created_at, shipped_at\n"
            "FROM order_views\n"
            f"WHERE {where_clause}\n"
            "ORDER BY created_at DESC\n"
            f"LIMIT ${limit_slot} OFFSET ${offset_slot}\n"
        )
        offset = (query.page - 1) * query.page_size

        async with self.read_db.acquire() as conn:
            total = await conn.fetchval(count_sql, *params)
            rows = await conn.fetch(page_sql, *params, query.page_size, offset)
            views = [OrderView(**dict(row)) for row in rows]
            return PaginatedResult(
                items=views,
                total=total,
                page=query.page,
                page_size=query.page_size,
            )
Template 3 : Application FastAPI CQRS
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
# Application instance; the dependency providers in this template read the
# command and query buses from ``app.state``.
app = FastAPI()
# Modèles de requête/réponse
class CreateOrderRequest(BaseModel):
    """Request body of ``POST /orders``; mirrors the ``CreateOrder`` command fields."""

    customer_id: str
    items: List[dict]
    shipping_address: dict
class OrderResponse(BaseModel):
    """Response model of ``GET /orders/{order_id}``."""

    order_id: str
    customer_id: str
    status: str
    total_amount: float
    item_count: int
    created_at: datetime
# Injection de dépendances
def get_command_bus() -> CommandBus:
    """Dependency provider: return the command bus stored on ``app.state``."""
    bus = app.state.command_bus
    return bus
def get_query_bus() -> QueryBus:
    """Dependency provider: return the query bus stored on ``app.state``."""
    bus = app.state.query_bus
    return bus
# Endpoints de commandes (POST, PUT, DELETE)
@app.post("/orders", response_model=dict)
async def create_order(
    request: CreateOrderRequest,
    command_bus: CommandBus = Depends(get_command_bus),
):
    """Dispatch a ``CreateOrder`` command and return the resulting order id."""
    order_id = await command_bus.dispatch(
        CreateOrder(
            customer_id=request.customer_id,
            items=request.items,
            shipping_address=request.shipping_address,
        )
    )
    return {"order_id": order_id}
@app.post("/orders/{order_id}/items")
async def add_item(
    order_id: str,
    product_id: str,
    quantity: int,
    price: float,
    command_bus: CommandBus = Depends(get_command_bus),
):
    """Dispatch an ``AddOrderItem`` command for the order named in the path."""
    await command_bus.dispatch(
        AddOrderItem(
            order_id=order_id,
            product_id=product_id,
            quantity=quantity,
            price=price,
        )
    )
    return {"status": "item_added"}
@app.delete("/orders/{order_id}")
async def cancel_order(
    order_id: str,
    reason: str,
    command_bus: CommandBus = Depends(get_command_bus),
):
    """Dispatch a ``CancelOrder`` command for the order named in the path."""
    await command_bus.dispatch(CancelOrder(order_id=order_id, reason=reason))
    return {"status": "cancelled"}
# Endpoints de requêtes (GET)
# NOTE: the fixed path "/orders/search" is registered before the
# parameterised "/orders/{order_id}".  FastAPI matches routes in declaration
# order, so declaring it afterwards would send search requests to get_order
# with order_id="search" and make this endpoint unreachable.
@app.get("/orders/search")
async def search_orders(
    q: str,
    sort_by: str = "created_at",
    query_bus: QueryBus = Depends(get_query_bus),
):
    """Dispatch a ``SearchOrders`` query built from ``q`` and ``sort_by``."""
    query = SearchOrders(query=q, sort_by=sort_by)
    return await query_bus.dispatch(query)


@app.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
    order_id: str,
    query_bus: QueryBus = Depends(get_query_bus),
):
    """Return the order with the given id.

    Raises:
        HTTPException: 404 when the query bus returns a falsy result.
    """
    query = GetOrderById(order_id=order_id)
    result = await query_bus.dispatch(query)
    if not result:
        raise HTTPException(status_code=404, detail="Order not found")
    return result


@app.get("/customers/{customer_id}/orders")
async def get_customer_orders(
    customer_id: str,
    status: Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    query_bus: QueryBus = Depends(get_query_bus),
):
    """Return one page of a customer's orders, optionally filtered by status."""
    query = GetCustomerOrders(
        customer_id=customer_id,
        status=status,
        page=page,
        page_size=page_size,
    )
    return await query_bus.dispatch(query)
Template 4 : Synchronisation du modèle de lecture
class ReadModelSynchronizer:
    """Keeps read models in sync with the events in the event store."""

    def __init__(self, event_store, read_db, projections: List[Projection]):
        self.event_store = event_store
        self.read_db = read_db
        # Indexed by name so rebuild_projection can look a projection up.
        self.projections = {
            projection.name: projection for projection in projections
        }

    async def run(self):
        """Sync every projection in an endless loop, sleeping 0.1 s between passes."""
        while True:
            for projection in self.projections.values():
                await self._sync_projection(projection)
            await asyncio.sleep(0.1)

    async def _sync_projection(self, projection: Projection):
        """Apply one batch of events (``limit=100``) read from the saved checkpoint.

        The checkpoint is saved after every event that was either applied
        successfully or not handled by the projection.

        NOTE(review): an event whose ``apply`` raises is logged and skipped
        without saving its checkpoint, but any later event in the batch moves
        the checkpoint past it, so the failed event is then never retried.
        """
        start = await self._get_checkpoint(projection.name)
        batch = await self.event_store.read_all(
            from_position=start,
            limit=100,
        )
        for event in batch:
            handled = event.event_type in projection.handles()
            if handled:
                try:
                    await projection.apply(event)
                except Exception as e:
                    # Log the error; this event's checkpoint is not saved.
                    logger.error(f"Projection error: {e}")
                    continue
            await self._save_checkpoint(projection.name, event.global_position)

    async def rebuild_projection(self, projection_name: str):
        """Rebuild a projection from scratch.

        Clears the projection, resets its checkpoint to 0, then replays the
        event store in batches (second argument 1000) until ``read_all``
        returns nothing.  The checkpoint is saved once per batch, and errors
        raised by ``apply`` propagate to the caller.

        Raises:
            KeyError: if ``projection_name`` is not a known projection.
        """
        target = self.projections[projection_name]

        await target.clear()
        await self._save_checkpoint(projection_name, 0)

        while True:
            position = await self._get_checkpoint(projection_name)
            batch = await self.event_store.read_all(position, 1000)
            if not batch:
                break
            for event in batch:
                if event.event_type in target.handles():
                    await target.apply(event)
            last_position = batch[-1].global_position
            await self._save_checkpoint(projection_name, last_position)
Template 5 : Gestion de la cohérence à terme (eventual consistency)
class ConsistentQueryHandler:
    """Query handler that can wait for the read model to catch up."""

    def __init__(self, read_db, event_store):
        self.read_db = read_db
        self.event_store = event_store

    async def query_after_command(
        self,
        query: Query,
        expected_version: int,
        stream_id: str,
        timeout: float = 5.0
    ):
        """Run ``query`` once the read model has reached ``expected_version``.

        Used for read-your-writes consistency.  The projection version of
        ``stream_id`` is polled every 0.1 s for up to ``timeout`` seconds.

        Returns:
            The query result when the read model caught up in time; otherwise
            a dict with the possibly stale result under ``"data"`` and a
            ``"_warning"`` message.
        """
        # A monotonic clock keeps the timeout unaffected by wall-clock changes.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            projection_version = await self._get_projection_version(stream_id)
            if projection_version >= expected_version:
                return await self.execute_query(query)
            # Not caught up yet: wait briefly, then check again.
            await asyncio.sleep(0.1)

        # Timed out: return the stale data together with a warning.
        return {
            "data": await self.execute_query(query),
            "_warning": "Data may be stale"
        }

    async def _get_projection_version(self, stream_id: str) -> int:
        """Return the last processed event version for a stream.

        Falls back to 0 when the lookup returns nothing (or a falsy value).
        """
        async with self.read_db.acquire() as conn:
            return await conn.fetchval(
                "SELECT last_event_version FROM projection_state WHERE stream_id = $1",
                stream_id
            ) or 0
Bonnes pratiques
À faire
- Séparer les modèles de commande et de requête - Besoins différents
- Utiliser la cohérence à terme (eventual consistency) - Accepter le délai de propagation
- Valider dans les gestionnaires de commande - Avant le changement d'état
- Dénormaliser les modèles de lecture - Optimiser pour les requêtes
- Versionner vos événements - Pour l'évolution du schéma
À ne pas faire
- Ne pas exécuter de requêtes de lecture dans les commandes - Les commandes servent uniquement aux écritures
- Ne pas coupler les schémas lecture/écriture - Évolution indépendante
- Ne pas faire de sur-ingénierie - Commencer simple
- Ne pas ignorer les SLA de cohérence - Définir le délai acceptable