event-store-design

Par wshobson · agents

Concevez et implémentez des event stores pour les systèmes basés sur l'event sourcing. À utiliser lors de la construction d'une infrastructure d'event sourcing, du choix de technologies d'event store, ou de l'implémentation de patterns de persistance d'événements.

npx skills add https://github.com/wshobson/agents --skill event-store-design

Conception d'Event Store

Guide complet pour concevoir des event stores pour des applications event-sourcées.

Quand utiliser cette compétence

  • Concevoir l'infrastructure d'event sourcing
  • Choisir entre les technologies d'event store
  • Implémenter des event stores personnalisés
  • Optimiser le stockage et la récupération d'événements
  • Configurer les schémas d'event store
  • Planifier la scalabilité de l'event store

Concepts fondamentaux

1. Architecture de l'Event Store

┌─────────────────────────────────────────────────────┐
│                    Event Store                       │
├─────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │   Stream 1   │  │   Stream 2   │  │   Stream 3   │ │
│  │ (Aggregate)  │  │ (Aggregate)  │  │ (Aggregate)  │ │
│  ├─────────────┤  ├─────────────┤  ├─────────────┤ │
│  │ Event 1     │  │ Event 1     │  │ Event 1     │ │
│  │ Event 2     │  │ Event 2     │  │ Event 2     │ │
│  │ Event 3     │  │ ...         │  │ Event 3     │ │
│  │ ...         │  │             │  │ Event 4     │ │
│  └─────────────┘  └─────────────┘  └─────────────┘ │
├─────────────────────────────────────────────────────┤
│  Global Position: 1 → 2 → 3 → 4 → 5 → 6 → ...     │
└─────────────────────────────────────────────────────┘

2. Prérequis de l'Event Store

Prérequis Description
Append-only Les événements sont immuables, uniquement des ajouts
Ordonné Ordonnancement par stream et global
Versionné Contrôle d'optimistic concurrency
Subscriptions Notifications d'événements en temps réel
Idempotent Gérer les écritures dupliquées de manière sûre

Comparaison des technologies

Technologie Idéal pour Limitations
EventStoreDB Event sourcing pur Monobjectif
PostgreSQL Stack Postgres existant Implémentation manuelle
Kafka Streaming haute débit Pas idéal pour les requêtes par stream
DynamoDB Serverless, natif AWS Limitations de requête
Marten Écosystèmes .NET Spécifique à .NET

Modèles

Modèle 1 : Schéma PostgreSQL Event Store

-- Table des événements
CREATE TABLE events (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    stream_id VARCHAR(255) NOT NULL,
    stream_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    version BIGINT NOT NULL,
    global_position BIGSERIAL,
    created_at TIMESTAMPTZ DEFAULT NOW(),

    CONSTRAINT unique_stream_version UNIQUE (stream_id, version)
);

-- Index pour les requêtes de stream
CREATE INDEX idx_events_stream_id ON events(stream_id, version);

-- Index pour la souscription globale
CREATE INDEX idx_events_global_position ON events(global_position);

-- Index pour les requêtes de type d'événement
CREATE INDEX idx_events_event_type ON events(event_type);

-- Index pour les requêtes basées sur le temps
CREATE INDEX idx_events_created_at ON events(created_at);

-- Table des snapshots
CREATE TABLE snapshots (
    stream_id VARCHAR(255) PRIMARY KEY,
    stream_type VARCHAR(255) NOT NULL,
    snapshot_data JSONB NOT NULL,
    version BIGINT NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Table de checkpoint des souscriptions
CREATE TABLE subscription_checkpoints (
    subscription_id VARCHAR(255) PRIMARY KEY,
    last_position BIGINT NOT NULL DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

Modèle 2 : Implémentation Python Event Store

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Optional, List
from uuid import UUID, uuid4
import json
import asyncpg

@dataclass
class Event:
    stream_id: str
    event_type: str
    data: dict
    metadata: dict = field(default_factory=dict)
    event_id: UUID = field(default_factory=uuid4)
    version: Optional[int] = None
    global_position: Optional[int] = None
    created_at: datetime = field(default_factory=datetime.utcnow)


class EventStore:
    def __init__(self, pool: asyncpg.Pool):
        self.pool = pool

    async def append_events(
        self,
        stream_id: str,
        stream_type: str,
        events: List[Event],
        expected_version: Optional[int] = None
    ) -> List[Event]:
        """Ajouter des événements à un stream avec optimistic concurrency."""
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                # Vérifier la version attendue
                if expected_version is not None:
                    current = await conn.fetchval(
                        "SELECT MAX(version) FROM events WHERE stream_id = $1",
                        stream_id
                    )
                    current = current or 0
                    if current != expected_version:
                        raise ConcurrencyError(
                            f"Expected version {expected_version}, got {current}"
                        )

                # Obtenir la version de départ
                start_version = await conn.fetchval(
                    "SELECT COALESCE(MAX(version), 0) + 1 FROM events WHERE stream_id = $1",
                    stream_id
                )

                # Insérer les événements
                saved_events = []
                for i, event in enumerate(events):
                    event.version = start_version + i
                    row = await conn.fetchrow(
                        """
                        INSERT INTO events (id, stream_id, stream_type, event_type,
                                          event_data, metadata, version, created_at)
                        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                        RETURNING global_position
                        """,
                        event.event_id,
                        stream_id,
                        stream_type,
                        event.event_type,
                        json.dumps(event.data),
                        json.dumps(event.metadata),
                        event.version,
                        event.created_at
                    )
                    event.global_position = row['global_position']
                    saved_events.append(event)

                return saved_events

    async def read_stream(
        self,
        stream_id: str,
        from_version: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Lire les événements d'un stream."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE stream_id = $1 AND version >= $2
                ORDER BY version
                LIMIT $3
                """,
                stream_id, from_version, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def read_all(
        self,
        from_position: int = 0,
        limit: int = 1000
    ) -> List[Event]:
        """Lire tous les événements globalement."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT id, stream_id, event_type, event_data, metadata,
                       version, global_position, created_at
                FROM events
                WHERE global_position > $1
                ORDER BY global_position
                LIMIT $2
                """,
                from_position, limit
            )
            return [self._row_to_event(row) for row in rows]

    async def subscribe(
        self,
        subscription_id: str,
        handler,
        from_position: int = 0,
        batch_size: int = 100
    ):
        """S'abonner à tous les événements à partir d'une position."""
        # Obtenir le checkpoint
        async with self.pool.acquire() as conn:
            checkpoint = await conn.fetchval(
                """
                SELECT last_position FROM subscription_checkpoints
                WHERE subscription_id = $1
                """,
                subscription_id
            )
            position = checkpoint or from_position

        while True:
            events = await self.read_all(position, batch_size)
            if not events:
                await asyncio.sleep(1)  # Intervalle de polling
                continue

            for event in events:
                await handler(event)
                position = event.global_position

            # Sauvegarder le checkpoint
            async with self.pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO subscription_checkpoints (subscription_id, last_position)
                    VALUES ($1, $2)
                    ON CONFLICT (subscription_id)
                    DO UPDATE SET last_position = $2, updated_at = NOW()
                    """,
                    subscription_id, position
                )

    def _row_to_event(self, row) -> Event:
        return Event(
            event_id=row['id'],
            stream_id=row['stream_id'],
            event_type=row['event_type'],
            data=json.loads(row['event_data']),
            metadata=json.loads(row['metadata']),
            version=row['version'],
            global_position=row['global_position'],
            created_at=row['created_at']
        )


class ConcurrencyError(Exception):
    """Levée quand la vérification d'optimistic concurrency échoue."""
    pass

Modèle 3 : Utilisation d'EventStoreDB

from esdbclient import EventStoreDBClient, NewEvent, StreamState
import json

# Connexion
client = EventStoreDBClient(uri="esdb://localhost:2113?tls=false")

# Ajouter des événements
def append_events(stream_name: str, events: list, expected_revision=None):
    new_events = [
        NewEvent(
            type=event['type'],
            data=json.dumps(event['data']).encode(),
            metadata=json.dumps(event.get('metadata', {})).encode()
        )
        for event in events
    ]

    if expected_revision is None:
        state = StreamState.ANY
    elif expected_revision == -1:
        state = StreamState.NO_STREAM
    else:
        state = expected_revision

    return client.append_to_stream(
        stream_name=stream_name,
        events=new_events,
        current_version=state
    )

# Lire un stream
def read_stream(stream_name: str, from_revision: int = 0):
    events = client.get_stream(
        stream_name=stream_name,
        stream_position=from_revision
    )
    return [
        {
            'type': event.type,
            'data': json.loads(event.data),
            'metadata': json.loads(event.metadata) if event.metadata else {},
            'stream_position': event.stream_position,
            'commit_position': event.commit_position
        }
        for event in events
    ]

# S'abonner à tous
async def subscribe_to_all(handler, from_position: int = 0):
    subscription = client.subscribe_to_all(commit_position=from_position)
    async for event in subscription:
        await handler({
            'type': event.type,
            'data': json.loads(event.data),
            'stream_id': event.stream_name,
            'position': event.commit_position
        })

# Projection de catégorie ($ce-Category)
def read_category(category: str):
    """Lire tous les événements d'une catégorie en utilisant la projection système."""
    return read_stream(f"$ce-{category}")

Modèle 4 : Event Store DynamoDB

import boto3
from boto3.dynamodb.conditions import Key
from datetime import datetime
import json
import uuid

class DynamoEventStore:
    def __init__(self, table_name: str):
        self.dynamodb = boto3.resource('dynamodb')
        self.table = self.dynamodb.Table(table_name)

    def append_events(self, stream_id: str, events: list, expected_version: int = None):
        """Ajouter des événements avec écriture conditionnelle pour la concurrence."""
        with self.table.batch_writer() as batch:
            for i, event in enumerate(events):
                version = (expected_version or 0) + i + 1
                item = {
                    'PK': f"STREAM#{stream_id}",
                    'SK': f"VERSION#{version:020d}",
                    'GSI1PK': 'EVENTS',
                    'GSI1SK': datetime.utcnow().isoformat(),
                    'event_id': str(uuid.uuid4()),
                    'stream_id': stream_id,
                    'event_type': event['type'],
                    'event_data': json.dumps(event['data']),
                    'version': version,
                    'created_at': datetime.utcnow().isoformat()
                }
                batch.put_item(Item=item)
        return events

    def read_stream(self, stream_id: str, from_version: int = 0):
        """Lire les événements d'un stream."""
        response = self.table.query(
            KeyConditionExpression=Key('PK').eq(f"STREAM#{stream_id}") &
                                  Key('SK').gte(f"VERSION#{from_version:020d}")
        )
        return [
            {
                'event_type': item['event_type'],
                'data': json.loads(item['event_data']),
                'version': item['version']
            }
            for item in response['Items']
        ]

# Définition de la table (CloudFormation/Terraform)
"""
Table DynamoDB :
  - PK (Partition Key) : String
  - SK (Sort Key) : String
  - GSI1PK, GSI1SK pour l'ordonnancement global

Capacité : À la demande ou provisionné selon les besoins de débit
"""

Bonnes pratiques

À faire

  • Utiliser des IDs de stream qui incluent le type d'agrégat - Order-{uuid}
  • Inclure des IDs de corrélation/causalité - Pour le tracing
  • Versionner les événements dès le jour 1 - Planifier l'évolution du schéma
  • Implémenter l'idempotence - Utiliser les IDs d'événement pour la déduplication
  • Indexer correctement - En fonction de vos motifs de requête

À ne pas faire

  • Ne pas mettre à jour ou supprimer des événements - Ce sont des faits immuables
  • Ne pas stocker de grandes charges utiles - Garder les événements petits
  • Ne pas ignorer l'optimistic concurrency - Prévient la corruption de données
  • Ne pas ignorer la contrepression - Gérer les consommateurs lents

Skills similaires