azure-eventhub-ts

Créez des applications de streaming d'événements avec le SDK Azure Event Hubs pour JavaScript (`@azure/event-hubs`). À utiliser pour implémenter une ingestion d'événements à haut débit, des analyses en temps réel, de la télémétrie IoT ou des architectures orientées événements avec des consommateurs partitionnés.

npx skills add https://github.com/microsoft/skills --skill azure-eventhub-ts

SDK Azure Event Hubs pour TypeScript

Streaming d'événements haut débit et ingestion de données en temps réel.

Installation

npm install @azure/event-hubs @azure/identity

Pour le checkpointing avec des groupes de consommateurs :

npm install @azure/eventhubs-checkpointstore-blob @azure/storage-blob

Variables d'environnement

EVENTHUB_NAMESPACE=<namespace>.servicebus.windows.net
EVENTHUB_NAME=my-eventhub
STORAGE_ACCOUNT_NAME=<storage-account>
STORAGE_CONTAINER_NAME=checkpoints
AZURE_TOKEN_CREDENTIALS=prod # Requis uniquement si DefaultAzureCredential est utilisé en production

Authentification

import { EventHubProducerClient, EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential, ManagedIdentityCredential } from "@azure/identity";

const fullyQualifiedNamespace = process.env.EVENTHUB_NAMESPACE!;
const eventHubName = process.env.EVENTHUB_NAME!;
// Dev local : DefaultAzureCredential. Production : définissez AZURE_TOKEN_CREDENTIALS=prod ou AZURE_TOKEN_CREDENTIALS=<specific_credential>
const credential = new DefaultAzureCredential({requiredEnvVars: ["AZURE_TOKEN_CREDENTIALS"]});
// Ou utilisez une credential spécifique directement en production :
// Voir https://learn.microsoft.com/javascript/api/overview/azure/identity-readme?view=azure-node-latest#credential-classes
// const credential = new ManagedIdentityCredential();

// Producer
const producer = new EventHubProducerClient(fullyQualifiedNamespace, eventHubName, credential);

// Consumer
const consumer = new EventHubConsumerClient(
  "$Default", // Consumer group
  fullyQualifiedNamespace,
  eventHubName,
  credential
);

Flux de travail principal

Envoyer des événements

const producer = new EventHubProducerClient(namespace, eventHubName, credential);

// Créer un batch et ajouter des événements
const batch = await producer.createBatch();
batch.tryAdd({ body: { temperature: 72.5, deviceId: "sensor-1" } });
batch.tryAdd({ body: { temperature: 68.2, deviceId: "sensor-2" } });

await producer.sendBatch(batch);
await producer.close();

Envoyer à une partition spécifique

// Par ID de partition
const batch = await producer.createBatch({ partitionId: "0" });

// Par clé de partition (hachage cohérent)
const batch = await producer.createBatch({ partitionKey: "device-123" });

Recevoir des événements (simple)

const consumer = new EventHubConsumerClient("$Default", namespace, eventHubName, credential);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Partition: ${context.partitionId}, Body: ${JSON.stringify(event.body)}`);
    }
  },
  processError: async (err, context) => {
    console.error(`Erreur sur la partition ${context.partitionId}: ${err.message}`);
  },
});

// S'arrêter après un certain temps
setTimeout(async () => {
  await subscription.close();
  await consumer.close();
}, 60000);

Recevoir avec checkpointing (Production)

import { EventHubConsumerClient } from "@azure/event-hubs";
import { ContainerClient } from "@azure/storage-blob";
import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";

const containerClient = new ContainerClient(
  `https://${storageAccount}.blob.core.windows.net/${containerName}`,
  credential
);

const checkpointStore = new BlobCheckpointStore(containerClient);

const consumer = new EventHubConsumerClient(
  "$Default",
  namespace,
  eventHubName,
  credential,
  checkpointStore
);

const subscription = consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Traitement: ${JSON.stringify(event.body)}`);
    }
    // Checkpoint après le traitement du batch
    if (events.length > 0) {
      await context.updateCheckpoint(events[events.length - 1]);
    }
  },
  processError: async (err, context) => {
    console.error(`Erreur: ${err.message}`);
  },
});

Recevoir à partir d'une position spécifique

const subscription = consumer.subscribe({
  processEvents: async (events, context) => { /* ... */ },
  processError: async (err, context) => { /* ... */ },
}, {
  startPosition: {
    // Commencer du début
    "0": { offset: "@earliest" },
    // Commencer de la fin (nouveaux événements uniquement)
    "1": { offset: "@latest" },
    // Commencer à partir d'un offset spécifique
    "2": { offset: "12345" },
    // Commencer à partir d'une heure spécifique
    "3": { enqueuedOn: new Date("2024-01-01") },
  },
});

Propriétés de l'Event Hub

// Obtenir les informations du hub
const hubProperties = await producer.getEventHubProperties();
console.log(`Partitions: ${hubProperties.partitionIds}`);

// Obtenir les informations de la partition
const partitionProperties = await producer.getPartitionProperties("0");
console.log(`Dernier numéro de séquence: ${partitionProperties.lastEnqueuedSequenceNumber}`);

Options de traitement par batch

const subscription = consumer.subscribe(
  {
    processEvents: async (events, context) => { /* ... */ },
    processError: async (err, context) => { /* ... */ },
  },
  {
    maxBatchSize: 100,           // Max événements par batch
    maxWaitTimeInSeconds: 30,    // Attente max pour un batch
  }
);

Types clés

import {
  EventHubProducerClient,
  EventHubConsumerClient,
  EventData,
  ReceivedEventData,
  PartitionContext,
  Subscription,
  SubscriptionEventHandlers,
  CreateBatchOptions,
  EventPosition,
} from "@azure/event-hubs";

import { BlobCheckpointStore } from "@azure/eventhubs-checkpointstore-blob";

Propriétés des événements

// Envoyer avec des propriétés
const batch = await producer.createBatch();
batch.tryAdd({
  body: { data: "payload" },
  properties: {
    eventType: "telemetry",
    deviceId: "sensor-1",
  },
  contentType: "application/json",
  correlationId: "request-123",
});

// Accéder dans le récepteur
consumer.subscribe({
  processEvents: async (events, context) => {
    for (const event of events) {
      console.log(`Type: ${event.properties?.eventType}`);
      console.log(`Séquence: ${event.sequenceNumber}`);
      console.log(`Enqueued: ${event.enqueuedTimeUtc}`);
      console.log(`Offset: ${event.offset}`);
    }
  },
});

Gestion des erreurs

consumer.subscribe({
  processEvents: async (events, context) => {
    try {
      for (const event of events) {
        await processEvent(event);
      }
      await context.updateCheckpoint(events[events.length - 1]);
    } catch (error) {
      // Ne pas faire de checkpoint en cas d'erreur - les événements seront retraités
      console.error("Le traitement a échoué:", error);
    }
  },
  processError: async (err, context) => {
    if (err.name === "MessagingError") {
      // Erreur transitoire - le SDK va réessayer
      console.warn("Erreur transitoire:", err.message);
    } else {
      // Erreur fatale
      console.error("Erreur fatale:", err);
    }
  },
});

Bonnes pratiques

  1. Utiliser le checkpointing - Toujours faire des checkpoints en production pour un traitement exactly-once
  2. Envoyer par batch - Utiliser createBatch() pour un envoi efficace
  3. Clés de partition - Utiliser des clés de partition pour garantir l'ordre des événements connexes
  4. Groupes de consommateurs - Utiliser des groupes de consommateurs séparés pour différents pipelines de traitement
  5. Gérer les erreurs correctement - Ne pas faire de checkpoint en cas d'échec du traitement
  6. Fermer les clients - Toujours fermer le producer/consumer quand c'est terminé
  7. Surveiller le lag - Suivre lastEnqueuedSequenceNumber par rapport à la séquence traitée

Skills similaires