azure-eventhub-dotnet

npx skills add https://github.com/microsoft/skills --skill azure-eventhub-dotnet

Azure.Messaging.EventHubs (.NET)

SDK de streaming d'événements haute performance pour envoyer et recevoir des événements via Azure Event Hubs.

Installation

# Package principal (envoi et réception simple)
dotnet add package Azure.Messaging.EventHubs

# Package Processor (réception en production avec points de contrôle)
dotnet add package Azure.Messaging.EventHubs.Processor

# Authentification
dotnet add package Azure.Identity

# Pour les points de contrôle (requis par EventProcessorClient)
dotnet add package Azure.Storage.Blobs

Versions actuelles : Azure.Messaging.EventHubs v5.12.2, Azure.Messaging.EventHubs.Processor v5.12.2

Variables d'environnement

EVENTHUB_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=<event-hub-name>

# Pour les points de contrôle (EventProcessorClient)
BLOB_STORAGE_CONNECTION_STRING=<storage-connection-string>
BLOB_CONTAINER_NAME=<checkpoint-container>

# Alternative : authentification par chaîne de connexion (non recommandée en production)
EVENTHUB_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...

Authentification

using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

// Utilisez toujours DefaultAzureCredential en production
var credential = new DefaultAzureCredential();

var fullyQualifiedNamespace = Environment.GetEnvironmentVariable("EVENTHUB_FULLY_QUALIFIED_NAMESPACE");
var eventHubName = Environment.GetEnvironmentVariable("EVENTHUB_NAME");

var producer = new EventHubProducerClient(
    fullyQualifiedNamespace,
    eventHubName,
    credential);

Rôles RBAC requis :

  • Envoi : Azure Event Hubs Data Sender
  • Réception : Azure Event Hubs Data Receiver
  • Les deux : Azure Event Hubs Data Owner

Types de clients

Client Objectif Quand l'utiliser
EventHubProducerClient Envoyer des événements immédiatement par lots Envoi en temps réel, contrôle total du lot
EventHubBufferedProducerClient Lot automatique avec envoi en arrière-plan Très haut volume, scénarios fire-and-forget
EventHubConsumerClient Lecture simple d'événements Prototypage uniquement, PAS pour la production
EventProcessorClient Traitement d'événements en production Toujours l'utiliser pour la réception en production

Flux de travail principal

1. Envoyer des événements (Lot)

using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

await using var producer = new EventHubProducerClient(
    fullyQualifiedNamespace,
    eventHubName,
    new DefaultAzureCredential());

// Créer un lot (respecte automatiquement les limites de taille)
using EventDataBatch batch = await producer.CreateBatchAsync();

// Ajouter des événements au lot
var events = new[]
{
    new EventData(BinaryData.FromString("{\"id\": 1, \"message\": \"Hello\"}")),
    new EventData(BinaryData.FromString("{\"id\": 2, \"message\": \"World\"}"))
};

foreach (var eventData in events)
{
    if (!batch.TryAdd(eventData))
    {
        // Lot plein - l'envoyer et en créer un nouveau
        await producer.SendAsync(batch);
        batch = await producer.CreateBatchAsync();

        if (!batch.TryAdd(eventData))
        {
            throw new Exception("Event too large for empty batch");
        }
    }
}

// Envoyer les événements restants
if (batch.Count > 0)
{
    await producer.SendAsync(batch);
}

2. Envoyer des événements (Bufférisé - Haut volume)

using Azure.Messaging.EventHubs.Producer;

var options = new EventHubBufferedProducerClientOptions
{
    MaximumWaitTime = TimeSpan.FromSeconds(1)
};

await using var producer = new EventHubBufferedProducerClient(
    fullyQualifiedNamespace,
    eventHubName,
    new DefaultAzureCredential(),
    options);

// Gérer succès/échec d'envoi
producer.SendEventBatchSucceededAsync += args =>
{
    Console.WriteLine($"Batch sent: {args.EventBatch.Count} events");
    return Task.CompletedTask;
};

producer.SendEventBatchFailedAsync += args =>
{
    Console.WriteLine($"Batch failed: {args.Exception.Message}");
    return Task.CompletedTask;
};

// Mettre en file d'attente les événements (envoyés automatiquement en arrière-plan)
for (int i = 0; i < 1000; i++)
{
    await producer.EnqueueEventAsync(new EventData($"Event {i}"));
}

// Vider les événements restants avant suppression
await producer.FlushAsync();

3. Recevoir des événements (Production - EventProcessorClient)

using Azure.Identity;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;

// Conteneur blob pour les points de contrôle
var blobClient = new BlobContainerClient(
    Environment.GetEnvironmentVariable("BLOB_STORAGE_CONNECTION_STRING"),
    Environment.GetEnvironmentVariable("BLOB_CONTAINER_NAME"));

await blobClient.CreateIfNotExistsAsync();

// Créer le processeur
var processor = new EventProcessorClient(
    blobClient,
    EventHubConsumerClient.DefaultConsumerGroup,
    fullyQualifiedNamespace,
    eventHubName,
    new DefaultAzureCredential());

// Gérer les événements
processor.ProcessEventAsync += async args =>
{
    Console.WriteLine($"Partition: {args.Partition.PartitionId}");
    Console.WriteLine($"Data: {args.Data.EventBody}");

    // Point de contrôle après traitement (ou points de contrôle par lot)
    await args.UpdateCheckpointAsync();
};

// Gérer les erreurs
processor.ProcessErrorAsync += args =>
{
    Console.WriteLine($"Error: {args.Exception.Message}");
    Console.WriteLine($"Partition: {args.PartitionId}");
    return Task.CompletedTask;
};

// Démarrer le traitement
await processor.StartProcessingAsync();

// Exécuter jusqu'à annulation
await Task.Delay(Timeout.Infinite, cancellationToken);

// Arrêter proprement
await processor.StopProcessingAsync();

4. Opérations sur les partitions

// Obtenir les IDs des partitions
string[] partitionIds = await producer.GetPartitionIdsAsync();

// Envoyer à une partition spécifique (à utiliser avec parcimonie)
var options = new SendEventOptions
{
    PartitionId = "0"
};
await producer.SendAsync(events, options);

// Utiliser une clé de partition (recommandé pour l'ordre)
var batchOptions = new CreateBatchOptions
{
    PartitionKey = "customer-123"  // Les événements avec la même clé vont à la même partition
};
using var batch = await producer.CreateBatchAsync(batchOptions);

Options EventPosition

Contrôler où commencer la lecture :

// Commencer depuis le début
EventPosition.Earliest

// Commencer depuis la fin (nouveaux événements uniquement)
EventPosition.Latest

// Commencer à partir d'un offset spécifique
EventPosition.FromOffset(12345)

// Commencer à partir d'un numéro de séquence spécifique
EventPosition.FromSequenceNumber(100)

// Commencer à partir d'une heure spécifique
EventPosition.FromEnqueuedTime(DateTimeOffset.UtcNow.AddHours(-1))

Intégration ASP.NET Core

// Program.cs
using Azure.Identity;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Extensions.Azure;

builder.Services.AddAzureClients(clientBuilder =>
{
    clientBuilder.AddEventHubProducerClient(
        builder.Configuration["EventHub:FullyQualifiedNamespace"],
        builder.Configuration["EventHub:Name"]);

    clientBuilder.UseCredential(new DefaultAzureCredential());
});

// Injecter dans un contrôleur/service
public class EventService
{
    private readonly EventHubProducerClient _producer;

    public EventService(EventHubProducerClient producer)
    {
        _producer = producer;
    }

    public async Task SendAsync(string message)
    {
        using var batch = await _producer.CreateBatchAsync();
        batch.TryAdd(new EventData(message));
        await _producer.SendAsync(batch);
    }
}

Meilleures pratiques

  1. Utiliser EventProcessorClient pour la réception — Ne jamais utiliser EventHubConsumerClient en production
  2. Faire des points de contrôle de manière stratégique — Après N événements ou un intervalle de temps, pas à chaque événement
  3. Utiliser des clés de partition — Pour les garanties d'ordre dans une partition
  4. Réutiliser les clients — Créer une fois, utiliser comme singleton (thread-safe)
  5. Utiliser await using — Assure une suppression appropriée
  6. Gérer ProcessErrorAsync — Toujours enregistrer un gestionnaire d'erreurs
  7. Faire des lots d'événements — Utiliser CreateBatchAsync() pour respecter les limites de taille
  8. Utiliser le producteur bufférisé — Pour les scénarios de très haut volume avec lot automatique

Gestion des erreurs

using Azure.Messaging.EventHubs;

try
{
    await producer.SendAsync(batch);
}
catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ServiceBusy)
{
    // Réessayer avec backoff
    await Task.Delay(TimeSpan.FromSeconds(5));
}
catch (EventHubsException ex) when (ex.IsTransient)
{
    // Erreur transitoire - sûr de réessayer
    Console.WriteLine($"Transient error: {ex.Message}");
}
catch (EventHubsException ex)
{
    // Erreur non transitoire
    Console.WriteLine($"Error: {ex.Reason} - {ex.Message}");
}

Stratégies de points de contrôle

Stratégie Quand l'utiliser
Chaque événement Faible volume, données critiques
Tous les N événements Débit équilibré/fiabilité
Basée sur le temps Intervalles de points de contrôle constants
Complétion de lot Après traitement d'un lot logique
// Point de contrôle tous les 100 événements
private int _eventCount = 0;

processor.ProcessEventAsync += async args =>
{
    // Traiter l'événement...

    _eventCount++;
    if (_eventCount >= 100)
    {
        await args.UpdateCheckpointAsync();
        _eventCount = 0;
    }
};

SDKs associés

SDK Objectif Installation
Azure.Messaging.EventHubs Envoi/réception principal dotnet add package Azure.Messaging.EventHubs
Azure.Messaging.EventHubs.Processor Traitement en production dotnet add package Azure.Messaging.EventHubs.Processor
Azure.ResourceManager.EventHubs Plan de gestion (créer des hubs) dotnet add package Azure.ResourceManager.EventHubs
Microsoft.Azure.WebJobs.Extensions.EventHubs Liaison Azure Functions dotnet add package Microsoft.Azure.WebJobs.Extensions.EventHubs

Skills similaires