SDK Azure Event Hubs pour TypeScript
Streaming d'événements haut débit et ingestion de données en temps réel.
Installation
npm install @azure/event-hubs @azure/identity
Pour le checkpointing avec des groupes de consommateurs :
npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob
Variables d'environnement
EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<storage-account>
STORAGE_CONTAINER_NAME=checkpoints
AZURE_TOKEN_CREDENTIALS=prod # Requis uniquement si DefaultAzureCredential est utilisé en production
Authentification
import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential, ManagedIdentityCredential } from "@azure/identity";
const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
// Dev local : DefaultAzureCredential. Production : définissez AZURE_TOKEN_CREDENTIALS=prod ou AZURE_TOKEN_CREDENTIALS=<specific_credential>
const credential = new DefaultAzureCredential({requiredEnvVars: ["AZURE_TOKEN_CREDENTIALS"]});
// Ou utilisez une credential spécifique directement en production :
// Voir https://learn.microsoft.com/javascript/api/overview/azure/identity-readme?view=azure-node-latest#credential-classes
// const credential = new ManagedIdentityCredential();
// Producer
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);
// Consumer
const consumer = new EventHubConsumerClient(
"$Default", // Consumer group
fullyQualifiedNamespace,
eventHubName,
credential
);
Flux de travail principal
Envoyer des événements
const producer = new EventHubProducerClient(namespace, eventHubName, credential);
// Créer un batch et ajouter des événements
const batch = await producer.createBatch();
batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } });
batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });
await producer.sendBatch(batch);
await producer.close();
Envoyer à une partition spécifique
// Par ID de partition
const batch = await producer.createBatch({ partitionId: "0" });
// Par clé de partition (hachage cohérent)
const batch = await producer.createBatch({ partitionKey: "device-123" });
Recevoir des événements (simple)
const consumer = new EventHubConsumerClient("$Default", namespace, eventHubName, credential);
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
}
},
processError: async (err, context) => {
console.error(`Erreur sur la partition ${context.partitionId}: ${err.message}`);
},
});
// S'arrêter après un certain temps
setTimeout(async () => {
await subscription.close();
await consumer.close();
}, 60000);
Recevoir avec checkpointing (Production)
import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
const containerClient = new ContainerClient(
`https://${storageAccount}.blob.core.windows.net/${containerName}`,
credential
);
const checkpointStore = new BlobCheckpointStore(containerClient);
const consumer = new EventHubConsumerClient(
"$Default",
namespace,
eventHubName,
credential,
checkpointStore
);
const subscription = consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Traitement: ${JSON.stringify(event.body)}`);
}
// Checkpoint après le traitement du batch
if (events.length > 0) {
await context.updateCheckpoint(events[events.length - 1]);
}
},
processError: async (err, context) => {
console.error(`Erreur: ${err.message}`);
},
});
Recevoir à partir d'une position spécifique
const subscription = consumer.subscribe({
processEvents: async (events, context) => { /* ... */ },
processError: async (err, context) => { /* ... */ },
}, {
startPosition: {
// Commencer du début
"0": { offset: "@earliest" },
// Commencer de la fin (nouveaux événements uniquement)
"1": { offset: "@latest" },
// Commencer à partir d'un offset spécifique
"2": { offset: "12345" },
// Commencer à partir d'une heure spécifique
"3": { enqueuedOn: new Date("2024-01-01") },
},
});
Propriétés de l'Event Hub
// Obtenir les informations du hub
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds}`);
// Obtenir les informations de la partition
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Dernier numéro de séquence: ${partitionProperties.lastEnqueuedSequenceNumber}`);
Options de traitement par batch
const subscription = consumer.subscribe(
{
processEvents: async (events, context) => { /* ... */ },
processError: async (err, context) => { /* ... */ },
},
{
maxBatchSize: 100, // Max événements par batch
maxWaitTimeInSeconds: 30, // Attente max pour un batch
}
);
Types clés
import {
EventHubProducerClient,
EventHubConsumerClient,
EventData,
ReceivedEventData,
PartitionContext,
Subscription,
SubscriptionEventHandlers,
CreateBatchOptions,
EventPosition,
} from "@azure/event-hubs";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";
Propriétés des événements
// Envoyer avec des propriétés
const batch = await producer.createBatch();
batch.tryAdd({
body: { data: "payload" },
properties: {
eventType: "telemetry",
deviceId: "sensor-1",
},
contentType: "application/json",
correlationId: "request-123",
});
// Accéder dans le récepteur
consumer.subscribe({
processEvents: async (events, context) => {
for (const event of events) {
console.log(`Type: ${event.properties?.eventType}`);
console.log(`Séquence: ${event.sequenceNumber}`);
console.log(`Enqueued: ${event.enqueuedTimeUtc}`);
console.log(`Offset: ${event.offset}`);
}
},
});
Gestion des erreurs
consumer.subscribe({
processEvents: async (events, context) => {
try {
for (const event of events) {
await processEvent(event);
}
await context.updateCheckpoint(events[events.length - 1]);
} catch (error) {
// Ne pas faire de checkpoint en cas d'erreur - les événements seront retraités
console.error("Le traitement a échoué:", error);
}
},
processError: async (err, context) => {
if (err.name === "MessagingError") {
// Erreur transitoire - le SDK va réessayer
console.warn("Erreur transitoire:", err.message);
} else {
// Erreur fatale
console.error("Erreur fatale:", err);
}
},
});
Bonnes pratiques
- Utiliser le checkpointing - Toujours faire des checkpoints en production pour un traitement exactly-once
- Envoyer par batch - Utiliser
createBatch()pour un envoi efficace - Clés de partition - Utiliser des clés de partition pour garantir l'ordre des événements connexes
- Groupes de consommateurs - Utiliser des groupes de consommateurs séparés pour différents pipelines de traitement
- Gérer les erreurs correctement - Ne pas faire de checkpoint en cas d'échec du traitement
- Fermer les clients - Toujours fermer le producer/consumer quand c'est terminé
- Surveiller le lag - Suivre
lastEnqueuedSequenceNumberpar rapport à la séquence traitée