Azure Event Hubs SDK pour Python
Plateforme de streaming big data pour l'ingestion d'événements haute débit.
Installation
pip install azure-eventhub azure-identity
# Pour le checkpointing avec le stockage blob
pip install azure-eventhub-checkpointstoreblob-aio
Variables d'environnement
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net # Requis pour toutes les méthodes d'auth
EVENT_HUB_NAME=my-eventhub # Requis pour toutes les méthodes d'auth
STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net # Requis pour le stockage des checkpoints
CHECKPOINT_CONTAINER=checkpoints # Requis pour le stockage des checkpoints
AZURE_TOKEN_CREDENTIALS=prod # Requis uniquement si DefaultAzureCredential est utilisée en production
Authentification
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.eventhub import EventHubProducerClient, EventHubConsumerClient
# Dev local : DefaultAzureCredential. Production : définir AZURE_TOKEN_CREDENTIALS=prod ou AZURE_TOKEN_CREDENTIALS=<specific_credential>
credential = DefaultAzureCredential(require_envvar=True)
# Ou utiliser une credential spécifique directement en production :
# Voir https://learn.microsoft.com/python/api/overview/azure/identity-readme?view=azure-python#credential-classes
# credential = ManagedIdentityCredential()
namespace = "<namespace>.servicebus.windows.net"
eventhub_name = "my-eventhub"
# Producer
producer = EventHubProducerClient(
fully_qualified_namespace=namespace,
eventhub_name=eventhub_name,
credential=credential
)
# Consumer
consumer = EventHubConsumerClient(
fully_qualified_namespace=namespace,
eventhub_name=eventhub_name,
consumer_group="$Default",
credential=credential
)
Types de client
| Client |
Objectif |
EventHubProducerClient |
Envoyer des événements à Event Hub |
EventHubConsumerClient |
Recevoir des événements depuis Event Hub |
BlobCheckpointStore |
Suivre la progression du consumer |
Envoyer des événements
from azure.eventhub import EventHubProducerClient, EventData
from azure.identity import DefaultAzureCredential
producer = EventHubProducerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
credential=DefaultAzureCredential()
)
with producer:
# Créer un batch (gère les limites de taille)
event_data_batch = producer.create_batch()
for i in range(10):
try:
event_data_batch.add(EventData(f"Event {i}"))
except ValueError:
# Le batch est plein, envoyer et en créer un nouveau
producer.send_batch(event_data_batch)
event_data_batch = producer.create_batch()
event_data_batch.add(EventData(f"Event {i}"))
# Envoyer les restants
producer.send_batch(event_data_batch)
Envoyer vers une partition spécifique
# Par ID de partition
event_data_batch = producer.create_batch(partition_id="0")
# Par clé de partition (hachage cohérent)
event_data_batch = producer.create_batch(partition_key="user-123")
Recevoir des événements
Réception simple
from azure.eventhub import EventHubConsumerClient
def on_event(partition_context, event):
print(f"Partition: {partition_context.partition_id}")
print(f"Data: {event.body_as_str()}")
partition_context.update_checkpoint(event)
consumer = EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential()
)
with consumer:
consumer.receive(
on_event=on_event,
starting_position="-1", # Début du stream
)
Avec Blob Checkpoint Store (Production)
from azure.eventhub import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblob import BlobCheckpointStore
from azure.identity import DefaultAzureCredential
checkpoint_store = BlobCheckpointStore(
blob_account_url="https://<account>.blob.core.windows.net",
container_name="checkpoints",
credential=DefaultAzureCredential()
)
consumer = EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential(),
checkpoint_store=checkpoint_store
)
def on_event(partition_context, event):
print(f"Received: {event.body_as_str()}")
# Checkpoint après le traitement
partition_context.update_checkpoint(event)
with consumer:
consumer.receive(on_event=on_event)
Client async
from azure.eventhub.aio import EventHubProducerClient, EventHubConsumerClient
from azure.identity.aio import DefaultAzureCredential
import asyncio
async def send_events():
credential = DefaultAzureCredential()
async with EventHubProducerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
credential=credential
) as producer:
batch = await producer.create_batch()
batch.add(EventData("Async event"))
await producer.send_batch(batch)
async def receive_events():
async def on_event(partition_context, event):
print(event.body_as_str())
await partition_context.update_checkpoint(event)
async with EventHubConsumerClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
eventhub_name="my-eventhub",
consumer_group="$Default",
credential=DefaultAzureCredential()
) as consumer:
await consumer.receive(on_event=on_event)
asyncio.run(send_events())
Propriétés d'événement
event = EventData("My event body")
# Définir les propriétés
event.properties = {"custom_property": "value"}
event.content_type = "application/json"
# Lire les propriétés (à la réception)
print(event.body_as_str())
print(event.sequence_number)
print(event.offset)
print(event.enqueued_time)
print(event.partition_key)
Obtenir les informations Event Hub
with producer:
info = producer.get_eventhub_properties()
print(f"Name: {info['name']}")
print(f"Partitions: {info['partition_ids']}")
for partition_id in info['partition_ids']:
partition_info = producer.get_partition_properties(partition_id)
print(f"Partition {partition_id}: {partition_info['last_enqueued_sequence_number']}")
Bonnes pratiques
- Utiliser les batches pour envoyer plusieurs événements
- Utiliser le checkpoint store en production pour un traitement fiable
- Utiliser le client async pour les scénarios haute débit
- Utiliser les clés de partition pour une livraison ordonnée dans une partition
- Gérer les limites de taille de batch — capturer ValueError quand le batch est plein
- Utiliser les gestionnaires de contexte (
with/async with) pour un nettoyage correct
- Définir les groupes de consumers appropriés pour différentes applications
Fichiers de référence