azure-storage-queue-py

npx skills add https://github.com/microsoft/skills --skill azure-storage-queue-py

SDK Azure Queue Storage pour Python

Mise en file d'attente de messages simple et économique pour la communication asynchrone.

Installation

pip install azure-storage-queue azure-identity

Variables d'environnement

AZURE_STORAGE_ACCOUNT_URL=https://<account>.queue.core.windows.net  # Requis pour toutes les méthodes d'authentification
AZURE_TOKEN_CREDENTIALS=prod # Requis uniquement si DefaultAzureCredential est utilisé en production

Authentification

from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.storage.queue import QueueServiceClient, QueueClient

# Dev local : DefaultAzureCredential. Production : définir AZURE_TOKEN_CREDENTIALS=prod ou AZURE_TOKEN_CREDENTIALS=<specific_credential>
credential = DefaultAzureCredential(require_envvar=True)
# Ou utiliser une credential spécifique directement en production :
# Voir https://learn.microsoft.com/python/api/overview/azure/identity-readme?view=azure-python#credential-classes
# credential = ManagedIdentityCredential()
account_url = "https://<account>.queue.core.windows.net"

# Client de service
service_client = QueueServiceClient(account_url=account_url, credential=credential)

# Client de file d'attente
queue_client = QueueClient(account_url=account_url, queue_name="myqueue", credential=credential)

Opérations de file d'attente

# Créer une file d'attente
service_client.create_queue("myqueue")

# Obtenir le client de file d'attente
queue_client = service_client.get_queue_client("myqueue")

# Supprimer la file d'attente
service_client.delete_queue("myqueue")

# Lister les files d'attente
for queue in service_client.list_queues():
    print(queue.name)

Envoyer des messages

# Envoyer un message (chaîne)
queue_client.send_message("Hello, Queue!")

# Envoyer avec options
queue_client.send_message(
    content="Delayed message",
    visibility_timeout=60,  # Masqué pendant 60 secondes
    time_to_live=3600       # Expire dans 1 heure
)

# Envoyer du JSON
import json
data = {"task": "process", "id": 123}
queue_client.send_message(json.dumps(data))

Recevoir des messages

# Recevoir des messages (les rend invisibles temporairement)
messages = queue_client.receive_messages(
    messages_per_page=10,
    visibility_timeout=30  # 30 secondes pour traiter
)

for message in messages:
    print(f"ID: {message.id}")
    print(f"Content: {message.content}")
    print(f"Dequeue count: {message.dequeue_count}")

    # Traiter le message...

    # Supprimer après traitement
    queue_client.delete_message(message)

Consulter les messages

# Consulter sans masquer (n'affecte pas la visibilité)
messages = queue_client.peek_messages(max_messages=5)

for message in messages:
    print(message.content)

Mettre à jour un message

# Étendre la visibilité ou mettre à jour le contenu
messages = queue_client.receive_messages()
for message in messages:
    # Étendre le timeout (besoin de plus de temps)
    queue_client.update_message(
        message,
        visibility_timeout=60
    )

    # Mettre à jour le contenu et le timeout
    queue_client.update_message(
        message,
        content="Updated content",
        visibility_timeout=60
    )

Supprimer un message

# Supprimer après traitement réussi
messages = queue_client.receive_messages()
for message in messages:
    try:
        # Traiter...
        queue_client.delete_message(message)
    except Exception:
        # Le message redevient visible après le timeout
        pass

Vider la file d'attente

# Supprimer tous les messages
queue_client.clear_messages()

Propriétés de la file d'attente

# Obtenir les propriétés de la file d'attente
properties = queue_client.get_queue_properties()
print(f"Approximate message count: {properties.approximate_message_count}")

# Définir/obtenir les métadonnées
queue_client.set_queue_metadata(metadata={"environment": "production"})
properties = queue_client.get_queue_properties()
print(properties.metadata)

Client asynchrone

from azure.storage.queue.aio import QueueServiceClient, QueueClient
from azure.identity.aio import DefaultAzureCredential

async def queue_operations():
    credential = DefaultAzureCredential()

    async with QueueClient(
        account_url="https://<account>.queue.core.windows.net",
        queue_name="myqueue",
        credential=credential
    ) as client:
        # Envoyer
        await client.send_message("Async message")

        # Recevoir
        async for message in client.receive_messages():
            print(message.content)
            await client.delete_message(message)

import asyncio
asyncio.run(queue_operations())

Encodage Base64

from azure.storage.queue import QueueClient, BinaryBase64EncodePolicy, BinaryBase64DecodePolicy

# Pour les données binaires
queue_client = QueueClient(
    account_url=account_url,
    queue_name="myqueue",
    credential=credential,
    message_encode_policy=BinaryBase64EncodePolicy(),
    message_decode_policy=BinaryBase64DecodePolicy()
)

# Envoyer des bytes
queue_client.send_message(b"Binary content")

Bonnes pratiques

  1. Supprimer les messages après traitement pour éviter le retraitement
  2. Définir un timeout de visibilité approprié basé sur le temps de traitement
  3. Gérer dequeue_count pour la détection de messages empoisonnés
  4. Utiliser le client asynchrone pour les scénarios à haut débit
  5. Utiliser peek_messages pour surveiller sans affecter la file d'attente
  6. Définir time_to_live pour éviter les messages obsolètes
  7. Considérer Service Bus pour les fonctionnalités avancées (sessions, topics)

Skills similaires