azure-eventhub-py

npx skills add https://github.com/microsoft/skills --skill azure-eventhub-py

Azure Event Hubs SDK pour Python

Plateforme de streaming big data pour l'ingestion d'événements haute débit.

Installation

pip install azure-eventhub azure-identity
# Pour le checkpointing avec le stockage blob
pip install azure-eventhub-checkpointstoreblob-aio

Variables d'environnement

EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net  # Requis pour toutes les méthodes d'auth
EVENT_HUB_NAME=my-eventhub  # Requis pour toutes les méthodes d'auth
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net  # Requis pour le stockage des checkpoints
CHECKPOINT_CONTAINER=checkpoints  # Requis pour le stockage des checkpoints
AZURE_TOKEN_CREDENTIALS=prod # Requis uniquement si DefaultAzureCredential est utilisée en production

Authentification

from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient

# Dev local : DefaultAzureCredential. Production : définir AZURE_TOKEN_CREDENTIALS=prod ou AZURE_TOKEN_CREDENTIALS=<specific_credential>
credential = DefaultAzureCredential(require_envvar=True)
# Ou utiliser une credential spécifique directement en production :
# Voir https://learn.microsoft.com/python/api/overview/azure/identity-readme?view=azure-python#credential-classes
# credential = ManagedIdentityCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"

# Producer
producer = EventHubProducerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    credential=credential
)

# Consumer
consumer = EventHubConsumerClient(
    fully_qualified_namespace=namespace,
    eventhub_name=eventhub_name,
    consumer_group="$Default",
    credential=credential
)

Types de client

Client Objectif
EventHubProducerClient Envoyer des événements à Event Hub
EventHubConsumerClient Recevoir des événements depuis Event Hub
BlobCheckpointStore Suivre la progression du consumer

Envoyer des événements

from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential

producer = EventHubProducerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    credential=DefaultAzureCredential()
)

with producer:
    # Créer un batch (gère les limites de taille)
    event_data_batch = producer.create_batch()

    for i in range(10):
        try:
            event_data_batch.add(EventData(f"Event {i}"))
        except ValueError:
            # Le batch est plein, envoyer et en créer un nouveau
            producer.send_batch(event_data_batch)
            event_data_batch = producer.create_batch()
            event_data_batch.add(EventData(f"Event {i}"))

    # Envoyer les restants
    producer.send_batch(event_data_batch)

Envoyer vers une partition spécifique

# Par ID de partition
event_data_batch = producer.create_batch(partition_id="0")

# Par clé de partition (hachage cohérent)
event_data_batch = producer.create_batch(partition_key="user-123")

Recevoir des événements

Réception simple

from azure.eventhub import EventHubConsumerClient

def on_event(partition_context, event):
    print(f"Partition: {partition_context.partition_id}")
    print(f"Data: {event.body_as_str()}")
    partition_context.update_checkpoint(event)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential()
)

with consumer:
    consumer.receive(
        on_event=on_event,
        starting_position="-1",  # Début du stream
    )

Avec Blob Checkpoint Store (Production)

from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential

checkpoint_store = BlobCheckpointStore(
    blob_account_url="https://<account>.blob.core.windows.net",
    container_name="checkpoints",
    credential=DefaultAzureCredential()
)

consumer = EventHubConsumerClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    eventhub_name="my-eventhub",
    consumer_group="$Default",
    credential=DefaultAzureCredential(),
    checkpoint_store=checkpoint_store
)

def on_event(partition_context, event):
    print(f"Received: {event.body_as_str()}")
    # Checkpoint après le traitement
    partition_context.update_checkpoint(event)

with consumer:
    consumer.receive(on_event=on_event)

Client async

from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
import asyncio

async def send_events():
    credential = DefaultAzureCredential()

    async with EventHubProducerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        eventhub_name="my-eventhub",
        credential=credential
    ) as producer:
        batch = await producer.create_batch()
        batch.add(EventData("Async event"))
        await producer.send_batch(batch)

async def receive_events():
    async def on_event(partition_context, event):
        print(event.body_as_str())
        await partition_context.update_checkpoint(event)

    async with EventHubConsumerClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        eventhub_name="my-eventhub",
        consumer_group="$Default",
        credential=DefaultAzureCredential()
    ) as consumer:
        await consumer.receive(on_event=on_event)

asyncio.run(send_events())

Propriétés d'événement

event = EventData("My event body")

# Définir les propriétés
event.properties = {"custom_property": "value"}
event.content_type = "application/json"

# Lire les propriétés (à la réception)
print(event.body_as_str())
print(event.sequence_number)
print(event.offset)
print(event.enqueued_time)
print(event.partition_key)

Obtenir les informations Event Hub

with producer:
    info = producer.get_eventhub_properties()
    print(f"Name: {info['name']}")
    print(f"Partitions: {info['partition_ids']}")

    for partition_id in info['partition_ids']:
        partition_info = producer.get_partition_properties(partition_id)
        print(f"Partition {partition_id}: {partition_info['last_enqueued_sequence_number']}")

Bonnes pratiques

  1. Utiliser les batches pour envoyer plusieurs événements
  2. Utiliser le checkpoint store en production pour un traitement fiable
  3. Utiliser le client async pour les scénarios haute débit
  4. Utiliser les clés de partition pour une livraison ordonnée dans une partition
  5. Gérer les limites de taille de batch — capturer ValueError quand le batch est plein
  6. Utiliser les gestionnaires de contexte (with/async with) pour un nettoyage correct
  7. Définir les groupes de consumers appropriés pour différentes applications

Fichiers de référence

Fichier Contenu
references/checkpointing.md Modèles de checkpoint store, checkpointing blob, stratégies de checkpoint
references/partitions.md Gestion des partitions, équilibrage de charge, positions de démarrage
scripts/setup_consumer.py CLI pour les informations Event Hub, configuration du consumer, envoi/réception d'événements

Skills similaires