SDK Azure Service Bus pour Python
Messagerie d'entreprise pour une communication cloud fiable avec files d'attente et rubriques pub/sub.
Installation
pip install azure-servicebus azure-identity
Variables d'environnement
SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net # Requis pour toutes les méthodes d'authentification
SERVICEBUS_QUEUE_NAME=myqueue # Requis pour les opérations sur files d'attente
SERVICEBUS_TOPIC_NAME=mytopic # Requis pour les opérations sur rubriques
SERVICEBUS_SUBSCRIPTION_NAME=mysubscription # Requis pour les opérations sur abonnements
AZURE_TOKEN_CREDENTIALS=prod # Requis uniquement si DefaultAzureCredential est utilisé en production
Authentification
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.servicebus import ServiceBusClient
# Dev local : DefaultAzureCredential. Production : définir AZURE_TOKEN_CREDENTIALS=prod ou AZURE_TOKEN_CREDENTIALS=<specific_credential>
credential = DefaultAzureCredential(require_envvar=True)
# Ou utiliser directement des identifiants spécifiques en production :
# Voir https://learn.microsoft.com/python/api/overview/azure/identity-readme?view=azure-python#credential-classes
# credential = ManagedIdentityCredential()
namespace = "<namespace>.servicebus.windows.net"
client = ServiceBusClient(
fully_qualified_namespace=namespace,
credential=credential
)
Types de clients
| Client |
Objectif |
Obtenir depuis |
ServiceBusClient |
Gestion des connexions |
Instanciation directe |
ServiceBusSender |
Envoyer des messages |
client.get_queue_sender() / get_topic_sender() |
ServiceBusReceiver |
Recevoir des messages |
client.get_queue_receiver() / get_subscription_receiver() |
Envoyer des messages (Async)
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential
async def send_messages():
credential = DefaultAzureCredential()
async with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=credential
) as client:
sender = client.get_queue_sender(queue_name="myqueue")
async with sender:
# Message unique
message = ServiceBusMessage("Hello, Service Bus!")
await sender.send_messages(message)
# Lot de messages
messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
await sender.send_messages(messages)
# Batch de messages (pour contrôler la taille)
batch = await sender.create_message_batch()
for i in range(100):
try:
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
except ValueError: # Batch plein
await sender.send_messages(batch)
batch = await sender.create_message_batch()
batch.add_message(ServiceBusMessage(f"Batch message {i}"))
await sender.send_messages(batch)
asyncio.run(send_messages())
Recevoir des messages (Async)
async def receive_messages():
credential = DefaultAzureCredential()
async with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=credential
) as client:
receiver = client.get_queue_receiver(queue_name="myqueue")
async with receiver:
# Recevoir par lot
messages = await receiver.receive_messages(
max_message_count=10,
max_wait_time=5 # secondes
)
for msg in messages:
print(f"Received: {str(msg)}")
await receiver.complete_message(msg) # Retirer de la file d'attente
asyncio.run(receive_messages())
Modes de réception
| Mode |
Comportement |
Cas d'usage |
PEEK_LOCK (défaut) |
Message verrouillé, doit être complété/abandonné |
Traitement fiable |
RECEIVE_AND_DELETE |
Supprimé immédiatement à la réception |
Livraison au maximum une fois |
from azure.servicebus import ServiceBusReceiveMode
receiver = client.get_queue_receiver(
queue_name="myqueue",
receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE
)
Règlement des messages
async with receiver:
messages = await receiver.receive_messages(max_message_count=1)
for msg in messages:
try:
# Traiter le message...
await receiver.complete_message(msg) # Succès - retirer de la file d'attente
except ProcessingError:
await receiver.abandon_message(msg) # Réessayer plus tard
except PermanentError:
await receiver.dead_letter_message(
msg,
reason="ProcessingFailed",
error_description="Could not process"
)
| Action |
Effet |
complete_message() |
Retirer de la file d'attente (succès) |
abandon_message() |
Relâcher le verrou, réessayer immédiatement |
dead_letter_message() |
Déplacer vers la file d'attente de lettres mortes |
defer_message() |
Mettre de côté, recevoir par numéro de séquence |
Rubriques et abonnements
# Envoyer vers une rubrique
sender = client.get_topic_sender(topic_name="mytopic")
async with sender:
await sender.send_messages(ServiceBusMessage("Topic message"))
# Recevoir depuis un abonnement
receiver = client.get_subscription_receiver(
topic_name="mytopic",
subscription_name="mysubscription"
)
async with receiver:
messages = await receiver.receive_messages(max_message_count=10)
Sessions (FIFO)
# Envoyer avec session
message = ServiceBusMessage("Session message")
message.session_id = "order-123"
await sender.send_messages(message)
# Recevoir depuis une session spécifique
receiver = client.get_queue_receiver(
queue_name="session-queue",
session_id="order-123"
)
# Recevoir depuis la session disponible suivante
from azure.servicebus import NEXT_AVAILABLE_SESSION
receiver = client.get_queue_receiver(
queue_name="session-queue",
session_id=NEXT_AVAILABLE_SESSION
)
Messages planifiés
from datetime import datetime, timedelta, timezone
message = ServiceBusMessage("Scheduled message")
scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=10)
# Planifier le message
sequence_number = await sender.schedule_messages(message, scheduled_time)
# Annuler le message planifié
await sender.cancel_scheduled_messages(sequence_number)
File d'attente de lettres mortes
from azure.servicebus import ServiceBusSubQueue
# Recevoir depuis la file d'attente de lettres mortes
dlq_receiver = client.get_queue_receiver(
queue_name="myqueue",
sub_queue=ServiceBusSubQueue.DEAD_LETTER
)
async with dlq_receiver:
messages = await dlq_receiver.receive_messages(max_message_count=10)
for msg in messages:
print(f"Dead-lettered: {msg.dead_letter_reason}")
await dlq_receiver.complete_message(msg)
Client synchrone (pour les scripts simples)
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential
with ServiceBusClient(
fully_qualified_namespace="<namespace>.servicebus.windows.net",
credential=DefaultAzureCredential()
) as client:
with client.get_queue_sender("myqueue") as sender:
sender.send_messages(ServiceBusMessage("Sync message"))
with client.get_queue_receiver("myqueue") as receiver:
for msg in receiver:
print(str(msg))
receiver.complete_message(msg)
Bonnes pratiques
- Utiliser le client async pour les charges de travail en production
- Utiliser des context managers (
async with) pour un nettoyage approprié
- Compléter les messages après un traitement réussi
- Utiliser la file d'attente de lettres mortes pour les messages toxiques
- Utiliser les sessions pour un traitement ordonné, FIFO
- Utiliser des batches de messages pour les scénarios haut débit
- Définir
max_wait_time pour éviter un blocage infini
Fichiers de référence