Conception d'Event Store
Guide complet pour concevoir des event stores pour des applications event-sourcées.
Quand utiliser cette compétence
- Concevoir l'infrastructure d'event sourcing
- Choisir entre les technologies d'event store
- Implémenter des event stores personnalisés
- Optimiser le stockage et la récupération d'événements
- Configurer les schémas d'event store
- Planifier la scalabilité de l'event store
Concepts fondamentaux
1. Architecture de l'Event Store
┌─────────────────────────────────────────────────────┐
│ Event Store │
├─────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Stream 1 │ │ Stream 2 │ │ Stream 3 │ │
│ │ (Aggregate) │ │ (Aggregate) │ │ (Aggregate) │ │
│ ├─────────────┤ ├─────────────┤ ├─────────────┤ │
│ │ Event 1 │ │ Event 1 │ │ Event 1 │ │
│ │ Event 2 │ │ Event 2 │ │ Event 2 │ │
│ │ Event 3 │ │ ... │ │ Event 3 │ │
│ │ ... │ │ │ │ Event 4 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────┤
│ Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ... │
└─────────────────────────────────────────────────────┘
2. Prérequis de l'Event Store
| Prérequis |
Description |
| Append-only |
Les événements sont immuables, uniquement des ajouts |
| Ordonné |
Ordonnancement par stream et global |
| Versionné |
Contrôle d'optimistic concurrency |
| Subscriptions |
Notifications d'événements en temps réel |
| Idempotent |
Gérer les écritures dupliquées de manière sûre |
Comparaison des technologies
| Technologie |
Idéal pour |
Limitations |
| EventStoreDB |
Event sourcing pur |
Monobjectif |
| PostgreSQL |
Stack Postgres existant |
Implémentation manuelle |
| Kafka |
Streaming haute débit |
Pas idéal pour les requêtes par stream |
| DynamoDB |
Serverless, natif AWS |
Limitations de requête |
| Marten |
Écosystèmes .NET |
Spécifique à .NET |
Modèles
Modèle 1 : Schéma PostgreSQL Event Store
-- Table des événements
CREATE TABLE events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
stream_id VARCHAR(255) NOT NULL,
stream_type VARCHAR(255) NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB DEFAULT '{}',
version BIGINT NOT NULL,
global_position BIGSERIAL,
created_at TIMESTAMPTZ DEFAULT NOW(),
CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);
-- Index pour les requêtes de stream
CREATE INDEX idx_events_stream_id ON events(stream_id, version);
-- Index pour la souscription globale
CREATE INDEX idx_events_global_position ON events(global_position);
-- Index pour les requêtes de type d'événement
CREATE INDEX idx_events_event_type ON events(event_type);
-- Index pour les requêtes basées sur le temps
CREATE INDEX idx_events_created_at ON events(created_at);
-- Table des snapshots
CREATE TABLE snapshots (
stream_id VARCHAR(255) PRIMARY KEY,
stream_type VARCHAR(255) NOT NULL,
snapshot_data JSONB NOT NULL,
version BIGINT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Table de checkpoint des souscriptions
CREATE TABLE subscription_checkpoints (
subscription_id VARCHAR(255) PRIMARY KEY,
last_position BIGINT NOT NULL DEFAULT 0,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
Modèle 2 : Implémentation Python Event Store
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional, List
from uuid import UUID, uuid4
import json
import asyncpg
@dataclass
class Event:
stream_id: str
event_type: str
data: dict
metadata: dict = field(default_factory=dict)
event_id: UUID = field(default_factory=uuid4)
version: Optional[int] = None
global_position: Optional[int] = None
created_at: datetime = field(default_factory=datetime.utcnow)
class EventStore:
def __init__(self, pool: asyncpg.Pool):
self.pool = pool
async def append_events(
self,
stream_id: str,
stream_type: str,
events: List[Event],
expected_version: Optional[int] = None
) -> List[Event]:
"""Ajouter des événements à un stream avec optimistic concurrency."""
async with self.pool.acquire() as conn:
async with conn.transaction():
# Vérifier la version attendue
if expected_version is not None:
current = await conn.fetchval(
"SELECT MAX(version) FROM events WHERE stream_id = $1",
stream_id
)
current = current or 0
if current != expected_version:
raise ConcurrencyError(
f"Expected version {expected_version}, got {current}"
)
# Obtenir la version de départ
start_version = await conn.fetchval(
"SELECT COALESCE(MAX(version), 0) + 1 FROM events WHERE stream_id = $1",
stream_id
)
# Insérer les événements
saved_events = []
for i, event in enumerate(events):
event.version = start_version + i
row = await conn.fetchrow(
"""
INSERT INTO events (id, stream_id, stream_type, event_type,
event_data, metadata, version, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING global_position
""",
event.event_id,
stream_id,
stream_type,
event.event_type,
json.dumps(event.data),
json.dumps(event.metadata),
event.version,
event.created_at
)
event.global_position = row['global_position']
saved_events.append(event)
return saved_events
async def read_stream(
self,
stream_id: str,
from_version: int = 0,
limit: int = 1000
) -> List[Event]:
"""Lire les événements d'un stream."""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, stream_id, event_type, event_data, metadata,
version, global_position, created_at
FROM events
WHERE stream_id = $1 AND version >= $2
ORDER BY version
LIMIT $3
""",
stream_id, from_version, limit
)
return [self._row_to_event(row) for row in rows]
async def read_all(
self,
from_position: int = 0,
limit: int = 1000
) -> List[Event]:
"""Lire tous les événements globalement."""
async with self.pool.acquire() as conn:
rows = await conn.fetch(
"""
SELECT id, stream_id, event_type, event_data, metadata,
version, global_position, created_at
FROM events
WHERE global_position > $1
ORDER BY global_position
LIMIT $2
""",
from_position, limit
)
return [self._row_to_event(row) for row in rows]
async def subscribe(
self,
subscription_id: str,
handler,
from_position: int = 0,
batch_size: int = 100
):
"""S'abonner à tous les événements à partir d'une position."""
# Obtenir le checkpoint
async with self.pool.acquire() as conn:
checkpoint = await conn.fetchval(
"""
SELECT last_position FROM subscription_checkpoints
WHERE subscription_id = $1
""",
subscription_id
)
position = checkpoint or from_position
while True:
events = await self.read_all(position, batch_size)
if not events:
await asyncio.sleep(1) # Intervalle de polling
continue
for event in events:
await handler(event)
position = event.global_position
# Sauvegarder le checkpoint
async with self.pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO subscription_checkpoints (subscription_id, last_position)
VALUES ($1, $2)
ON CONFLICT (subscription_id)
DO UPDATE SET last_position = $2, updated_at = NOW()
""",
subscription_id, position
)
def _row_to_event(self, row) -> Event:
return Event(
event_id=row['id'],
stream_id=row['stream_id'],
event_type=row['event_type'],
data=json.loads(row['event_data']),
metadata=json.loads(row['metadata']),
version=row['version'],
global_position=row['global_position'],
created_at=row['created_at']
)
class ConcurrencyError(Exception):
"""Levée quand la vérification d'optimistic concurrency échoue."""
pass
Modèle 3 : Utilisation d'EventStoreDB
from esdbclient import EventStoreDBClient, NewEvent, StreamState
import json
# Connexion
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")
# Ajouter des événements
def append_events(stream_name: str, events: list, expected_revision=None):
new_events = [
NewEvent(
type=event['type'],
data=json.dumps(event['data']).encode(),
metadata=json.dumps(event.get('metadata', {})).encode()
)
for event in events
]
if expected_revision is None:
state = StreamState.ANY
elif expected_revision == -1:
state = StreamState.NO_STREAM
else:
state = expected_revision
return client.append_to_stream(
stream_name=stream_name,
events=new_events,
current_version=state
)
# Lire un stream
def read_stream(stream_name: str, from_revision: int = 0):
events = client.get_stream(
stream_name=stream_name,
stream_position=from_revision
)
return [
{
'type': event.type,
'data': json.loads(event.data),
'metadata': json.loads(event.metadata) if event.metadata else {},
'stream_position': event.stream_position,
'commit_position': event.commit_position
}
for event in events
]
# S'abonner à tous
async def subscribe_to_all(handler, from_position: int = 0):
subscription = client.subscribe_to_all(commit_position=from_position)
async for event in subscription:
await handler({
'type': event.type,
'data': json.loads(event.data),
'stream_id': event.stream_name,
'position': event.commit_position
})
# Projection de catégorie ($ce-Category)
def read_category(category: str):
"""Lire tous les événements d'une catégorie en utilisant la projection système."""
return read_stream(f"$ce-{category}")
Modèle 4 : Event Store DynamoDB
import boto3
from boto3.dynamodb.conditions import Key
from datetime import datetime
import json
import uuid
class DynamoEventStore:
def __init__(self, table_name: str):
self.dynamodb = boto3.resource('dynamodb')
self.table = self.dynamodb.Table(table_name)
def append_events(self, stream_id: str, events: list, expected_version: int = None):
"""Ajouter des événements avec écriture conditionnelle pour la concurrence."""
with self.table.batch_writer() as batch:
for i, event in enumerate(events):
version = (expected_version or 0) + i + 1
item = {
'PK': f"STREAM#{stream_id}",
'SK': f"VERSION#{version:020d}",
'GSI1PK': 'EVENTS',
'GSI1SK': datetime.utcnow().isoformat(),
'event_id': str(uuid.uuid4()),
'stream_id': stream_id,
'event_type': event['type'],
'event_data': json.dumps(event['data']),
'version': version,
'created_at': datetime.utcnow().isoformat()
}
batch.put_item(Item=item)
return events
def read_stream(self, stream_id: str, from_version: int = 0):
"""Lire les événements d'un stream."""
response = self.table.query(
KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}") &
Key('SK').gte(f"VERSION#{from_version:020d}")
)
return [
{
'event_type': item['event_type'],
'data': json.loads(item['event_data']),
'version': item['version']
}
for item in response['Items']
]
# Définition de la table (CloudFormation/Terraform)
"""
Table DynamoDB :
- PK (Partition Key) : String
- SK (Sort Key) : String
- GSI1PK, GSI1SK pour l'ordonnancement global
Capacité : À la demande ou provisionné selon les besoins de débit
"""
Bonnes pratiques
À faire
- Utiliser des IDs de stream qui incluent le type d'agrégat -
Order-{uuid}
- Inclure des IDs de corrélation/causalité - Pour le tracing
- Versionner les événements dès le jour 1 - Planifier l'évolution du schéma
- Implémenter l'idempotence - Utiliser les IDs d'événement pour la déduplication
- Indexer correctement - En fonction de vos motifs de requête
À ne pas faire
- Ne pas mettre à jour ou supprimer des événements - Ce sont des faits immuables
- Ne pas stocker de grandes charges utiles - Garder les événements petits
- Ne pas ignorer l'optimistic concurrency - Prévient la corruption de données
- Ne pas ignorer la contrepression - Gérer les consommateurs lents