Azure.Messaging.EventHubs (.NET)
SDK de streaming d'événements haute performance pour envoyer et recevoir des événements via Azure Event Hubs.
Installation
# Package principal (envoi et réception simple)
dotnet add package Azure.Messaging.EventHubs
# Package Processor (réception en production avec points de contrôle)
dotnet add package Azure.Messaging.EventHubs.Processor
# Authentification
dotnet add package Azure.Identity
# Pour les points de contrôle (requis par EventProcessorClient)
dotnet add package Azure.Storage.Blobs
Versions actuelles : Azure.Messaging.EventHubs v5.12.2, Azure.Messaging.EventHubs.Processor v5.12.2
Variables d'environnement
EVENTHUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=<event-hub-name>
# Pour les points de contrôle (EventProcessorClient)
BLOB_STORAGE_CONNECTION_STRING=<storage-connection-string>
BLOB_CONTAINER_NAME=<checkpoint-container>
# Alternative : authentification par chaîne de connexion (non recommandée en production)
EVENTHUB_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...
Authentification
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
// Utilisez toujours DefaultAzureCredential en production
var credential = new DefaultAzureCredential();
var fullyQualifiedNamespace = Environment.GetEnvironmentVariable("EVENTHUB_FULLY_QUALIFIED_NAMESPACE");
var eventHubName = Environment.GetEnvironmentVariable("EVENTHUB_NAME");
var producer = new EventHubProducerClient(
fullyQualifiedNamespace,
eventHubName,
credential);
Rôles RBAC requis :
- Envoi :
Azure Event Hubs Data Sender - Réception :
Azure Event Hubs Data Receiver - Les deux :
Azure Event Hubs Data Owner
Types de clients
| Client | Objectif | Quand l'utiliser |
|---|---|---|
EventHubProducerClient |
Envoyer des événements immédiatement par lots | Envoi en temps réel, contrôle total du lot |
EventHubBufferedProducerClient |
Lot automatique avec envoi en arrière-plan | Très haut volume, scénarios fire-and-forget |
EventHubConsumerClient |
Lecture simple d'événements | Prototypage uniquement, PAS pour la production |
EventProcessorClient |
Traitement d'événements en production | Toujours l'utiliser pour la réception en production |
Flux de travail principal
1. Envoyer des événements (Lot)
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
await using var producer = new EventHubProducerClient(
fullyQualifiedNamespace,
eventHubName,
new DefaultAzureCredential());
// Créer un lot (respecte automatiquement les limites de taille)
using EventDataBatch batch = await producer.CreateBatchAsync();
// Ajouter des événements au lot
var events = new[]
{
new EventData(BinaryData.FromString("{\"id\": 1, \"message\": \"Hello\"}")),
new EventData(BinaryData.FromString("{\"id\": 2, \"message\": \"World\"}"))
};
foreach (var eventData in events)
{
if (!batch.TryAdd(eventData))
{
// Lot plein - l'envoyer et en créer un nouveau
await producer.SendAsync(batch);
batch = await producer.CreateBatchAsync();
if (!batch.TryAdd(eventData))
{
throw new Exception("Event too large for empty batch");
}
}
}
// Envoyer les événements restants
if (batch.Count > 0)
{
await producer.SendAsync(batch);
}
2. Envoyer des événements (Bufférisé - Haut volume)
using Azure.Messaging.EventHubs.Producer;
var options = new EventHubBufferedProducerClientOptions
{
MaximumWaitTime = TimeSpan.FromSeconds(1)
};
await using var producer = new EventHubBufferedProducerClient(
fullyQualifiedNamespace,
eventHubName,
new DefaultAzureCredential(),
options);
// Gérer succès/échec d'envoi
producer.SendEventBatchSucceededAsync += args =>
{
Console.WriteLine($"Batch sent: {args.EventBatch.Count} events");
return Task.CompletedTask;
};
producer.SendEventBatchFailedAsync += args =>
{
Console.WriteLine($"Batch failed: {args.Exception.Message}");
return Task.CompletedTask;
};
// Mettre en file d'attente les événements (envoyés automatiquement en arrière-plan)
for (int i = 0; i < 1000; i++)
{
await producer.EnqueueEventAsync(new EventData($"Event {i}"));
}
// Vider les événements restants avant suppression
await producer.FlushAsync();
3. Recevoir des événements (Production - EventProcessorClient)
using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
// Conteneur blob pour les points de contrôle
var blobClient = new BlobContainerClient(
Environment.GetEnvironmentVariable("BLOB_STORAGE_CONNECTION_STRING"),
Environment.GetEnvironmentVariable("BLOB_CONTAINER_NAME"));
await blobClient.CreateIfNotExistsAsync();
// Créer le processeur
var processor = new EventProcessorClient(
blobClient,
EventHubConsumerClient.DefaultConsumerGroup,
fullyQualifiedNamespace,
eventHubName,
new DefaultAzureCredential());
// Gérer les événements
processor.ProcessEventAsync += async args =>
{
Console.WriteLine($"Partition: {args.Partition.PartitionId}");
Console.WriteLine($"Data: {args.Data.EventBody}");
// Point de contrôle après traitement (ou points de contrôle par lot)
await args.UpdateCheckpointAsync();
};
// Gérer les erreurs
processor.ProcessErrorAsync += args =>
{
Console.WriteLine($"Error: {args.Exception.Message}");
Console.WriteLine($"Partition: {args.PartitionId}");
return Task.CompletedTask;
};
// Démarrer le traitement
await processor.StartProcessingAsync();
// Exécuter jusqu'à annulation
await Task.Delay(Timeout.Infinite, cancellationToken);
// Arrêter proprement
await processor.StopProcessingAsync();
4. Opérations sur les partitions
// Obtenir les IDs des partitions
string[] partitionIds = await producer.GetPartitionIdsAsync();
// Envoyer à une partition spécifique (à utiliser avec parcimonie)
var options = new SendEventOptions
{
PartitionId = "0"
};
await producer.SendAsync(events, options);
// Utiliser une clé de partition (recommandé pour l'ordre)
var batchOptions = new CreateBatchOptions
{
PartitionKey = "customer-123" // Les événements avec la même clé vont à la même partition
};
using var batch = await producer.CreateBatchAsync(batchOptions);
Options EventPosition
Contrôler où commencer la lecture :
// Commencer depuis le début
EventPosition.Earliest
// Commencer depuis la fin (nouveaux événements uniquement)
EventPosition.Latest
// Commencer à partir d'un offset spécifique
EventPosition.FromOffset(12345)
// Commencer à partir d'un numéro de séquence spécifique
EventPosition.FromSequenceNumber(100)
// Commencer à partir d'une heure spécifique
EventPosition.FromEnqueuedTime(DateTimeOffset.UtcNow.AddHours(-1))
Intégration ASP.NET Core
// Program.cs
using Azure.Identity;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.Azure;
builder.Services.AddAzureClients(clientBuilder =>
{
clientBuilder.AddEventHubProducerClient(
builder.Configuration["EventHub:FullyQualifiedNamespace"],
builder.Configuration["EventHub:Name"]);
clientBuilder.UseCredential(new DefaultAzureCredential());
});
// Injecter dans un contrôleur/service
public class EventService
{
private readonly EventHubProducerClient _producer;
public EventService(EventHubProducerClient producer)
{
_producer = producer;
}
public async Task SendAsync(string message)
{
using var batch = await _producer.CreateBatchAsync();
batch.TryAdd(new EventData(message));
await _producer.SendAsync(batch);
}
}
Meilleures pratiques
- Utiliser
EventProcessorClientpour la réception — Ne jamais utiliserEventHubConsumerClienten production - Faire des points de contrôle de manière stratégique — Après N événements ou un intervalle de temps, pas à chaque événement
- Utiliser des clés de partition — Pour les garanties d'ordre dans une partition
- Réutiliser les clients — Créer une fois, utiliser comme singleton (thread-safe)
- Utiliser
await using— Assure une suppression appropriée - Gérer
ProcessErrorAsync— Toujours enregistrer un gestionnaire d'erreurs - Faire des lots d'événements — Utiliser
CreateBatchAsync()pour respecter les limites de taille - Utiliser le producteur bufférisé — Pour les scénarios de très haut volume avec lot automatique
Gestion des erreurs
using Azure.Messaging.EventHubs;
try
{
await producer.SendAsync(batch);
}
catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ServiceBusy)
{
// Réessayer avec backoff
await Task.Delay(TimeSpan.FromSeconds(5));
}
catch (EventHubsException ex) when (ex.IsTransient)
{
// Erreur transitoire - sûr de réessayer
Console.WriteLine($"Transient error: {ex.Message}");
}
catch (EventHubsException ex)
{
// Erreur non transitoire
Console.WriteLine($"Error: {ex.Reason} - {ex.Message}");
}
Stratégies de points de contrôle
| Stratégie | Quand l'utiliser |
|---|---|
| Chaque événement | Faible volume, données critiques |
| Tous les N événements | Débit équilibré/fiabilité |
| Basée sur le temps | Intervalles de points de contrôle constants |
| Complétion de lot | Après traitement d'un lot logique |
// Point de contrôle tous les 100 événements
private int _eventCount = 0;
processor.ProcessEventAsync += async args =>
{
// Traiter l'événement...
_eventCount++;
if (_eventCount >= 100)
{
await args.UpdateCheckpointAsync();
_eventCount = 0;
}
};
SDKs associés
| SDK | Objectif | Installation |
|---|---|---|
Azure.Messaging.EventHubs |
Envoi/réception principal | dotnet add package Azure.Messaging.EventHubs |
Azure.Messaging.EventHubs.Processor |
Traitement en production | dotnet add package Azure.Messaging.EventHubs.Processor |
Azure.ResourceManager.EventHubs |
Plan de gestion (créer des hubs) | dotnet add package Azure.ResourceManager.EventHubs |
Microsoft.Azure.WebJobs.Extensions.EventHubs |
Liaison Azure Functions | dotnet add package Microsoft.Azure.WebJobs.Extensions.EventHubs |