Azure.Messaging.ServiceBus (.NET)
SDK de messagerie d'entreprise pour la livraison fiable de messages avec files d'attente, rubriques, abonnements et sessions.
Installation
dotnet add package Azure.Messaging.ServiceBus
dotnet add package Azure.Identity
Version actuelle : v7.20.1 (stable)
Variables d'environnement
AZURE_SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net
# Ou chaîne de connexion (moins sécurisé)
AZURE_SERVICEBUS_CONNECTION_STRING=Endpoint=sb://...
Authentification
Microsoft Entra ID (Recommandé)
using Azure.Identity;
using Azure.Messaging.ServiceBus;
string fullyQualifiedNamespace = "<namespace>.servicebus.windows.net";
await using ServiceBusClient client = new(fullyQualifiedNamespace, new DefaultAzureCredential());
Chaîne de connexion
string connectionString = "<connection_string>";
await using ServiceBusClient client = new(connectionString);
Injection de dépendances ASP.NET Core
services.AddAzureClients(builder =>
{
builder.AddServiceBusClientWithNamespace("<namespace>.servicebus.windows.net");
builder.UseCredential(new DefaultAzureCredential());
});
Hiérarchie des clients
ServiceBusClient
├── CreateSender(queueOrTopicName) → ServiceBusSender
├── CreateReceiver(queueName) → ServiceBusReceiver
├── CreateReceiver(topicName, subName) → ServiceBusReceiver
├── AcceptNextSessionAsync(queueName) → ServiceBusSessionReceiver
├── CreateProcessor(queueName) → ServiceBusProcessor
└── CreateSessionProcessor(queueName) → ServiceBusSessionProcessor
ServiceBusAdministrationClient (client séparé pour CRUD)
Workflows principaux
1. Envoyer des messages
await using ServiceBusClient client = new(fullyQualifiedNamespace, new DefaultAzureCredential());
ServiceBusSender sender = client.CreateSender("my-queue");
// Message unique
ServiceBusMessage message = new("Hello world!");
await sender.SendMessageAsync(message);
// Batching sécurisé (recommandé)
using ServiceBusMessageBatch batch = await sender.CreateMessageBatchAsync();
if (batch.TryAddMessage(new ServiceBusMessage("Message 1")))
{
// Message ajouté avec succès
}
if (batch.TryAddMessage(new ServiceBusMessage("Message 2")))
{
// Message ajouté avec succès
}
await sender.SendMessagesAsync(batch);
2. Recevoir des messages
ServiceBusReceiver receiver = client.CreateReceiver("my-queue");
// Message unique
ServiceBusReceivedMessage message = await receiver.ReceiveMessageAsync();
string body = message.Body.ToString();
Console.WriteLine(body);
// Confirmer le message (le retire de la file d'attente)
await receiver.CompleteMessageAsync(message);
// Réception par lot
IReadOnlyList<ServiceBusReceivedMessage> messages = await receiver.ReceiveMessagesAsync(maxMessages: 10);
foreach (var msg in messages)
{
Console.WriteLine(msg.Body.ToString());
await receiver.CompleteMessageAsync(msg);
}
3. Traitement des messages
// Complete - retire le message de la file d'attente
await receiver.CompleteMessageAsync(message);
// Abandon - libère le verrou, le message peut être reçu à nouveau
await receiver.AbandonMessageAsync(message);
// Différé - empêche la réception normale, utilisez ReceiveDeferredMessageAsync
await receiver.DeferMessageAsync(message);
// Dead Letter - déplace vers la sous-file d'attente des lettres mortes
await receiver.DeadLetterMessageAsync(message, "InvalidFormat", "Message body was not valid JSON");
4. Traitement en arrière-plan avec Processor
ServiceBusProcessor processor = client.CreateProcessor("my-queue", new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 2
});
processor.ProcessMessageAsync += async (args) =>
{
try
{
string body = args.Message.Body.ToString();
Console.WriteLine($"Received: {body}");
await args.CompleteMessageAsync(args.Message);
}
catch (Exception ex)
{
Console.WriteLine($"Error processing: {ex.Message}");
await args.AbandonMessageAsync(args.Message);
}
};
processor.ProcessErrorAsync += (args) =>
{
Console.WriteLine($"Error source: {args.ErrorSource}");
Console.WriteLine($"Entity: {args.EntityPath}");
Console.WriteLine($"Exception: {args.Exception}");
return Task.CompletedTask;
};
await processor.StartProcessingAsync();
// ... l'application s'exécute
await processor.StopProcessingAsync();
5. Sessions (Traitement ordonné)
// Envoyer un message de session
ServiceBusMessage message = new("Hello")
{
SessionId = "order-123"
};
await sender.SendMessageAsync(message);
// Recevoir de la prochaine session disponible
ServiceBusSessionReceiver receiver = await client.AcceptNextSessionAsync("my-queue");
// Ou recevoir d'une session spécifique
ServiceBusSessionReceiver receiver = await client.AcceptSessionAsync("my-queue", "order-123");
// Gestion de l'état de la session
await receiver.SetSessionStateAsync(new BinaryData("processing"));
BinaryData state = await receiver.GetSessionStateAsync();
// Renouveler le verrou de session
await receiver.RenewSessionLockAsync();
6. File d'attente des lettres mortes
// Recevoir de la file d'attente des lettres mortes
ServiceBusReceiver dlqReceiver = client.CreateReceiver("my-queue", new ServiceBusReceiverOptions
{
SubQueue = SubQueue.DeadLetter
});
ServiceBusReceivedMessage dlqMessage = await dlqReceiver.ReceiveMessageAsync();
// Accéder aux métadonnées de lettre morte
string reason = dlqMessage.DeadLetterReason;
string description = dlqMessage.DeadLetterErrorDescription;
Console.WriteLine($"Dead letter reason: {reason} - {description}");
7. Rubriques et abonnements
// Envoyer à une rubrique
ServiceBusSender topicSender = client.CreateSender("my-topic");
await topicSender.SendMessageAsync(new ServiceBusMessage("Broadcast message"));
// Recevoir d'un abonnement
ServiceBusReceiver subReceiver = client.CreateReceiver("my-topic", "my-subscription");
var message = await subReceiver.ReceiveMessageAsync();
8. Administration (CRUD)
var adminClient = new ServiceBusAdministrationClient(
fullyQualifiedNamespace,
new DefaultAzureCredential());
// Créer une file d'attente
var options = new CreateQueueOptions("my-queue")
{
MaxDeliveryCount = 10,
LockDuration = TimeSpan.FromSeconds(30),
RequiresSession = true,
DeadLetteringOnMessageExpiration = true
};
QueueProperties queue = await adminClient.CreateQueueAsync(options);
// Mettre à jour une file d'attente
queue.LockDuration = TimeSpan.FromSeconds(60);
await adminClient.UpdateQueueAsync(queue);
// Créer une rubrique et un abonnement
await adminClient.CreateTopicAsync(new CreateTopicOptions("my-topic"));
await adminClient.CreateSubscriptionAsync(new CreateSubscriptionOptions("my-topic", "my-subscription"));
// Supprimer
await adminClient.DeleteQueueAsync("my-queue");
9. Transactions entre entités
var options = new ServiceBusClientOptions { EnableCrossEntityTransactions = true };
await using var client = new ServiceBusClient(connectionString, options);
ServiceBusReceiver receiverA = client.CreateReceiver("queueA");
ServiceBusSender senderB = client.CreateSender("queueB");
ServiceBusReceivedMessage receivedMessage = await receiverA.ReceiveMessageAsync();
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await receiverA.CompleteMessageAsync(receivedMessage);
await senderB.SendMessageAsync(new ServiceBusMessage("Forwarded"));
ts.Complete();
}
Référence des types clés
| Type | Objectif |
|---|---|
ServiceBusClient |
Point d'entrée principal, gère la connexion |
ServiceBusSender |
Envoie des messages aux files d'attente/rubriques |
ServiceBusReceiver |
Reçoit les messages des files d'attente/abonnements |
ServiceBusSessionReceiver |
Reçoit les messages de session |
ServiceBusProcessor |
Traitement des messages en arrière-plan |
ServiceBusSessionProcessor |
Traitement des sessions en arrière-plan |
ServiceBusAdministrationClient |
CRUD pour files d'attente/rubriques/abonnements |
ServiceBusMessage |
Message à envoyer |
ServiceBusReceivedMessage |
Message reçu avec métadonnées |
ServiceBusMessageBatch |
Lot de messages |
Bonnes pratiques
- Utiliser des singletons — Les clients, senders, receivers et processors sont thread-safe
- Toujours disposer — Utilisez
await usingou appelezDisposeAsync() - Ordre de suppression — Fermez d'abord les senders/receivers/processors, puis le client
- Utiliser DefaultAzureCredential — Préférez-le aux chaînes de connexion en production
- Utiliser les processors pour le travail en arrière-plan — Gère automatiquement le renouvellement des verrous
- Utiliser le batching sécurisé —
CreateMessageBatchAsync()etTryAddMessage() - Gérer les erreurs transitoires — Utilisez
ServiceBusException.Reason - Configurer le transport — Utilisez
AmqpWebSocketssi les ports 5 671/5 672 sont bloqués - Définir une durée de verrou appropriée — La valeur par défaut est 30 secondes
- Utiliser les sessions pour le tri — FIFO au sein d'une session
Gestion des erreurs
try
{
await sender.SendMessageAsync(message);
}
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.ServiceBusy)
{
// Réessayer avec backoff
}
catch (ServiceBusException ex)
{
Console.WriteLine($"Service Bus Error: {ex.Reason} - {ex.Message}");
}
SDKs associés
| SDK | Objectif | Installation |
|---|---|---|
Azure.Messaging.ServiceBus |
Service Bus (ce SDK) | dotnet add package Azure.Messaging.ServiceBus |
Azure.Messaging.EventHubs |
Streaming d'événements | dotnet add package Azure.Messaging.EventHubs |
Azure.Messaging.EventGrid |
Routage d'événements | dotnet add package Azure.Messaging.EventGrid |