azure-eventhub-java

Créez des applications de streaming en temps réel avec le SDK Azure Event Hubs pour Java. À utiliser lors de l'implémentation de l'event streaming, de l'ingestion de données à haut débit, ou de la construction d'architectures événementielles.

npx skills add https://github.com/microsoft/skills --skill azure-eventhub-java

SDK Azure Event Hubs pour Java

Créez des applications de streaming en temps réel avec le SDK Azure Event Hubs pour Java.

Installation

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs</artifactId>
    <version>5.19.0</version>
</dependency>

<!-- Pour le magasin de points de contrôle (production) -->
<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    <version>1.20.0</version>
</dependency>

Création du client

EventHubProducerClient

import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.EventHubClientBuilder;

// Avec une chaîne de connexion
EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .buildProducerClient();

// Chaîne de connexion complète avec EntityPath
EventHubProducerClient producer = new EventHubClientBuilder()
    .connectionString("<connection-string-with-entity-path>")
    .buildProducerClient();

Avec DefaultAzureCredential

import com.azure.identity.DefaultAzureCredentialBuilder;

EventHubProducerClient producer = new EventHubClientBuilder()
    .fullyQualifiedNamespace("<namespace>.servicebus.windows.net")
    .eventHubName("<event-hub-name>")
    .credential(new DefaultAzureCredentialBuilder().build())
    .buildProducerClient();

EventHubConsumerClient

import com.azure.messaging.eventhubs.EventHubConsumerClient;

EventHubConsumerClient consumer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();

Clients asynchrones

import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
import com.azure.messaging.eventhubs.EventHubConsumerAsyncClient;

EventHubProducerAsyncClient asyncProducer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .buildAsyncProducerClient();

EventHubConsumerAsyncClient asyncConsumer = new EventHubClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup("$Default")
    .buildAsyncConsumerClient();

Modèles courants

Envoyer un événement unique

import com.azure.messaging.eventhubs.EventData;

EventData eventData = new EventData("Hello, Event Hubs!");
producer.send(Collections.singletonList(eventData));

Envoyer un lot d'événements

import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;

// Créer un lot
EventDataBatch batch = producer.createBatch();

// Ajouter des événements (retourne false si le lot est plein)
for (int i = 0; i < 100; i++) {
    EventData event = new EventData("Event " + i);
    if (!batch.tryAdd(event)) {
        // Le lot est plein, l'envoyer et créer un nouveau lot
        producer.send(batch);
        batch = producer.createBatch();
        batch.tryAdd(event);
    }
}

// Envoyer les événements restants
if (batch.getCount() > 0) {
    producer.send(batch);
}

Envoyer vers une partition spécifique

CreateBatchOptions options = new CreateBatchOptions()
    .setPartitionId("0");

EventDataBatch batch = producer.createBatch(options);
batch.tryAdd(new EventData("Partition 0 event"));
producer.send(batch);

Envoyer avec une clé de partition

CreateBatchOptions options = new CreateBatchOptions()
    .setPartitionKey("customer-123");

EventDataBatch batch = producer.createBatch(options);
batch.tryAdd(new EventData("Customer event"));
producer.send(batch);

Événement avec propriétés

EventData event = new EventData("Order created");
event.getProperties().put("orderId", "ORD-123");
event.getProperties().put("customerId", "CUST-456");
event.getProperties().put("priority", 1);

producer.send(Collections.singletonList(event));

Recevoir des événements (simple)

import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;

// Recevoir depuis une partition spécifique
Iterable<PartitionEvent> events = consumer.receiveFromPartition(
    "0",                           // partitionId
    10,                            // maxEvents
    EventPosition.earliest(),      // startingPosition
    Duration.ofSeconds(30)         // timeout
);

for (PartitionEvent partitionEvent : events) {
    EventData event = partitionEvent.getData();
    System.out.println("Body: " + event.getBodyAsString());
    System.out.println("Sequence: " + event.getSequenceNumber());
    System.out.println("Offset: " + event.getOffset());
}

EventProcessorClient (Production)

import com.azure.messaging.eventhubs.EventProcessorClient;
import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;

// Créer le magasin de points de contrôle
BlobContainerAsyncClient blobClient = new BlobContainerClientBuilder()
    .connectionString("<storage-connection-string>")
    .containerName("checkpoints")
    .buildAsyncClient();

// Créer le processeur
EventProcessorClient processor = new EventProcessorClientBuilder()
    .connectionString("<eventhub-connection-string>", "<event-hub-name>")
    .consumerGroup("$Default")
    .checkpointStore(new BlobCheckpointStore(blobClient))
    .processEvent(eventContext -> {
        EventData event = eventContext.getEventData();
        System.out.println("Processing: " + event.getBodyAsString());

        // Créer un point de contrôle après le traitement
        eventContext.updateCheckpoint();
    })
    .processError(errorContext -> {
        System.err.println("Error: " + errorContext.getThrowable().getMessage());
        System.err.println("Partition: " + errorContext.getPartitionContext().getPartitionId());
    })
    .buildEventProcessorClient();

// Commencer le traitement
processor.start();

// Continuer en arrière-plan...
Thread.sleep(Duration.ofMinutes(5).toMillis());

// Arrêter proprement
processor.stop();

Traitement par lots

EventProcessorClient processor = new EventProcessorClientBuilder()
    .connectionString("<connection-string>", "<event-hub-name>")
    .consumerGroup("$Default")
    .checkpointStore(new BlobCheckpointStore(blobClient))
    .processEventBatch(eventBatchContext -> {
        List<EventData> events = eventBatchContext.getEvents();
        System.out.printf("Received %d events%n", events.size());

        for (EventData event : events) {
            // Traiter chaque événement
            System.out.println(event.getBodyAsString());
        }

        // Créer un point de contrôle après le lot
        eventBatchContext.updateCheckpoint();
    }, 50) // maxBatchSize
    .processError(errorContext -> {
        System.err.println("Error: " + errorContext.getThrowable());
    })
    .buildEventProcessorClient();

Réception asynchrone

asyncConsumer.receiveFromPartition("0", EventPosition.latest())
    .subscribe(
        partitionEvent -> {
            EventData event = partitionEvent.getData();
            System.out.println("Received: " + event.getBodyAsString());
        },
        error -> System.err.println("Error: " + error),
        () -> System.out.println("Complete")
    );

Obtenir les propriétés du hub

// Obtenir les infos du hub
EventHubProperties hubProps = producer.getEventHubProperties();
System.out.println("Hub: " + hubProps.getName());
System.out.println("Partitions: " + hubProps.getPartitionIds());

// Obtenir les infos de la partition
PartitionProperties partitionProps = producer.getPartitionProperties("0");
System.out.println("Begin sequence: " + partitionProps.getBeginningSequenceNumber());
System.out.println("Last sequence: " + partitionProps.getLastEnqueuedSequenceNumber());
System.out.println("Last offset: " + partitionProps.getLastEnqueuedOffset());

Positions d'événements

// Commencer par le début
EventPosition.earliest()

// Commencer par la fin (nouveaux événements uniquement)
EventPosition.latest()

// À partir d'un offset spécifique
EventPosition.fromOffset(12345L)

// À partir d'un numéro de séquence spécifique
EventPosition.fromSequenceNumber(100L)

// À partir d'une heure spécifique
EventPosition.fromEnqueuedTime(Instant.now().minus(Duration.ofHours(1)))

Gestion des erreurs

import com.azure.messaging.eventhubs.models.ErrorContext;

.processError(errorContext -> {
    Throwable error = errorContext.getThrowable();
    String partitionId = errorContext.getPartitionContext().getPartitionId();

    if (error instanceof AmqpException) {
        AmqpException amqpError = (AmqpException) error;
        if (amqpError.isTransient()) {
            System.out.println("Transient error, will retry");
        }
    }

    System.err.printf("Error on partition %s: %s%n", partitionId, error.getMessage());
})

Libération des ressources

// Toujours fermer les clients
try {
    producer.send(batch);
} finally {
    producer.close();
}

// Ou utiliser try-with-resources
try (EventHubProducerClient producer = new EventHubClientBuilder()
        .connectionString(connectionString, eventHubName)
        .buildProducerClient()) {
    producer.send(events);
}

Variables d'environnement

EVENT_HUBS_CONNECTION_STRING=Endpoint=sb://<namespace>.servicebus.windows.net/;SharedAccessKeyName=...
EVENT_HUBS_NAME=<event-hub-name>
STORAGE_CONNECTION_STRING=<for-checkpointing>

Bonnes pratiques

  1. Utiliser EventProcessorClient : En production, fournit l'équilibrage de charge et les points de contrôle
  2. Traiter par lots : Utiliser EventDataBatch pour un envoi efficace
  3. Clés de partition : Utiliser pour les garanties de tri dans une partition
  4. Points de contrôle : Créer un point de contrôle après le traitement pour éviter la réitération
  5. Gestion des erreurs : Gérer les erreurs transitoires avec des tentatives
  6. Fermer les clients : Toujours fermer le producteur/consommateur quand terminé

Expressions déclencheurs

  • "Event Hubs Java"
  • "streaming d'événements Azure"
  • "ingestion de données en temps réel"
  • "EventProcessorClient"
  • "producteur consommateur de hub d'événements"
  • "traitement de partition"

Skills similaires