azure-servicebus-py

npx skills add https://github.com/microsoft/skills --skill azure-servicebus-py

SDK Azure Service Bus pour Python

Messagerie d'entreprise pour une communication cloud fiable avec files d'attente et rubriques pub/sub.

Installation

pip install azure-servicebus azure-identity

Variables d'environnement

SERVICEBUS_FULLY_QUALIFIED_NAMESPACE=<namespace>.servicebus.windows.net  # Requis pour toutes les méthodes d'authentification
SERVICEBUS_QUEUE_NAME=myqueue  # Requis pour les opérations sur files d'attente
SERVICEBUS_TOPIC_NAME=mytopic  # Requis pour les opérations sur rubriques
SERVICEBUS_SUBSCRIPTION_NAME=mysubscription  # Requis pour les opérations sur abonnements
AZURE_TOKEN_CREDENTIALS=prod # Requis uniquement si DefaultAzureCredential est utilisé en production

Authentification

from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.servicebus import ServiceBusClient

# Dev local : DefaultAzureCredential. Production : définir AZURE_TOKEN_CREDENTIALS=prod ou AZURE_TOKEN_CREDENTIALS=<specific_credential>
credential = DefaultAzureCredential(require_envvar=True)
# Ou utiliser directement des identifiants spécifiques en production :
# Voir https://learn.microsoft.com/python/api/overview/azure/identity-readme?view=azure-python#credential-classes
# credential = ManagedIdentityCredential()
namespace = "<namespace>.servicebus.windows.net"

client = ServiceBusClient(
    fully_qualified_namespace=namespace,
    credential=credential
)

Types de clients

Client Objectif Obtenir depuis
ServiceBusClient Gestion des connexions Instanciation directe
ServiceBusSender Envoyer des messages client.get_queue_sender() / get_topic_sender()
ServiceBusReceiver Recevoir des messages client.get_queue_receiver() / get_subscription_receiver()

Envoyer des messages (Async)

import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential

async def send_messages():
    credential = DefaultAzureCredential()

    async with ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=credential
    ) as client:
        sender = client.get_queue_sender(queue_name="myqueue")

        async with sender:
            # Message unique
            message = ServiceBusMessage("Hello, Service Bus!")
            await sender.send_messages(message)

            # Lot de messages
            messages = [ServiceBusMessage(f"Message {i}") for i in range(10)]
            await sender.send_messages(messages)

            # Batch de messages (pour contrôler la taille)
            batch = await sender.create_message_batch()
            for i in range(100):
                try:
                    batch.add_message(ServiceBusMessage(f"Batch message {i}"))
                except ValueError:  # Batch plein
                    await sender.send_messages(batch)
                    batch = await sender.create_message_batch()
                    batch.add_message(ServiceBusMessage(f"Batch message {i}"))
            await sender.send_messages(batch)

asyncio.run(send_messages())

Recevoir des messages (Async)

async def receive_messages():
    credential = DefaultAzureCredential()

    async with ServiceBusClient(
        fully_qualified_namespace="<namespace>.servicebus.windows.net",
        credential=credential
    ) as client:
        receiver = client.get_queue_receiver(queue_name="myqueue")

        async with receiver:
            # Recevoir par lot
            messages = await receiver.receive_messages(
                max_message_count=10,
                max_wait_time=5  # secondes
            )

            for msg in messages:
                print(f"Received: {str(msg)}")
                await receiver.complete_message(msg)  # Retirer de la file d'attente

asyncio.run(receive_messages())

Modes de réception

Mode Comportement Cas d'usage
PEEK_LOCK (défaut) Message verrouillé, doit être complété/abandonné Traitement fiable
RECEIVE_AND_DELETE Supprimé immédiatement à la réception Livraison au maximum une fois
from azure.servicebus import ServiceBusReceiveMode

receiver = client.get_queue_receiver(
    queue_name="myqueue",
    receive_mode=ServiceBusReceiveMode.RECEIVE_AND_DELETE
)

Règlement des messages

async with receiver:
    messages = await receiver.receive_messages(max_message_count=1)

    for msg in messages:
        try:
            # Traiter le message...
            await receiver.complete_message(msg)  # Succès - retirer de la file d'attente
        except ProcessingError:
            await receiver.abandon_message(msg)  # Réessayer plus tard
        except PermanentError:
            await receiver.dead_letter_message(
                msg,
                reason="ProcessingFailed",
                error_description="Could not process"
            )
Action Effet
complete_message() Retirer de la file d'attente (succès)
abandon_message() Relâcher le verrou, réessayer immédiatement
dead_letter_message() Déplacer vers la file d'attente de lettres mortes
defer_message() Mettre de côté, recevoir par numéro de séquence

Rubriques et abonnements

# Envoyer vers une rubrique
sender = client.get_topic_sender(topic_name="mytopic")
async with sender:
    await sender.send_messages(ServiceBusMessage("Topic message"))

# Recevoir depuis un abonnement
receiver = client.get_subscription_receiver(
    topic_name="mytopic",
    subscription_name="mysubscription"
)
async with receiver:
    messages = await receiver.receive_messages(max_message_count=10)

Sessions (FIFO)

# Envoyer avec session
message = ServiceBusMessage("Session message")
message.session_id = "order-123"
await sender.send_messages(message)

# Recevoir depuis une session spécifique
receiver = client.get_queue_receiver(
    queue_name="session-queue",
    session_id="order-123"
)

# Recevoir depuis la session disponible suivante
from azure.servicebus import NEXT_AVAILABLE_SESSION
receiver = client.get_queue_receiver(
    queue_name="session-queue",
    session_id=NEXT_AVAILABLE_SESSION
)

Messages planifiés

from datetime import datetime, timedelta, timezone

message = ServiceBusMessage("Scheduled message")
scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=10)

# Planifier le message
sequence_number = await sender.schedule_messages(message, scheduled_time)

# Annuler le message planifié
await sender.cancel_scheduled_messages(sequence_number)

File d'attente de lettres mortes

from azure.servicebus import ServiceBusSubQueue

# Recevoir depuis la file d'attente de lettres mortes
dlq_receiver = client.get_queue_receiver(
    queue_name="myqueue",
    sub_queue=ServiceBusSubQueue.DEAD_LETTER
)

async with dlq_receiver:
    messages = await dlq_receiver.receive_messages(max_message_count=10)
    for msg in messages:
        print(f"Dead-lettered: {msg.dead_letter_reason}")
        await dlq_receiver.complete_message(msg)

Client synchrone (pour les scripts simples)

from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential

with ServiceBusClient(
    fully_qualified_namespace="<namespace>.servicebus.windows.net",
    credential=DefaultAzureCredential()
) as client:
    with client.get_queue_sender("myqueue") as sender:
        sender.send_messages(ServiceBusMessage("Sync message"))

    with client.get_queue_receiver("myqueue") as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.complete_message(msg)

Bonnes pratiques

  1. Utiliser le client async pour les charges de travail en production
  2. Utiliser des context managers (async with) pour un nettoyage approprié
  3. Compléter les messages après un traitement réussi
  4. Utiliser la file d'attente de lettres mortes pour les messages toxiques
  5. Utiliser les sessions pour un traitement ordonné, FIFO
  6. Utiliser des batches de messages pour les scénarios haut débit
  7. Définir max_wait_time pour éviter un blocage infini

Fichiers de référence

Fichier Contenu
references/patterns.md Consommateurs concurrents, sessions, patterns de retry, requête-réponse, transactions
references/dead-letter.md Gestion DLQ, messages toxiques, stratégies de retraitement
scripts/setup_servicebus.py CLI pour la gestion des files d'attente/rubriques/abonnements et la surveillance DLQ

Skills similaires