Azure Queue Storage SDK pour Python
Simple, rentable message queuing pour la communication asynchrone.
Installation
pip install azure-storage-queue azure-identity
Variables d'environnement
AZURE_STORAGE_ACCOUNT_URL=https://<account>.queue.core.windows.net # Requis pour toutes les méthodes d'authentification
AZURE_TOKEN_CREDENTIALS=prod # Requis uniquement si DefaultAzureCredential est utilisé en production
Authentification
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.storage.queue import QueueServiceClient, QueueClient
# Dev local : DefaultAzureCredential. Production : définir AZURE_TOKEN_CREDENTIALS=prod ou AZURE_TOKEN_CREDENTIALS=<specific_credential>
credential = DefaultAzureCredential(require_envvar=True)
# Ou utiliser une credential spécifique directement en production :
# Voir https://learn.microsoft.com/python/api/overview/azure/identity-readme?view=azure-python#credential-classes
# credential = ManagedIdentityCredential()
account_url = "https://<account>.queue.core.windows.net"
# Client de service
service_client = QueueServiceClient(account_url=account_url, credential=credential)
# Client de queue
queue_client = QueueClient(account_url=account_url, queue_name="myqueue", credential=credential)
Opérations de Queue
# Créer une queue
service_client.create_queue("myqueue")
# Obtenir le client de queue
queue_client = service_client.get_queue_client("myqueue")
# Supprimer la queue
service_client.delete_queue("myqueue")
# Lister les queues
for queue in service_client.list_queues():
print(queue.name)
Envoyer des Messages
# Envoyer un message (string)
queue_client.send_message("Hello, Queue!")
# Envoyer avec options
queue_client.send_message(
content="Delayed message",
visibility_timeout=60, # Caché pendant 60 secondes
time_to_live=3600 # Expire dans 1 heure
)
# Envoyer JSON
import json
data = {"task": "process", "id": 123}
queue_client.send_message(json.dumps(data))
Recevoir des Messages
# Recevoir des messages (les rend temporairement invisibles)
messages = queue_client.receive_messages(
messages_per_page=10,
visibility_timeout=30 # 30 secondes pour traiter
)
for message in messages:
print(f"ID: {message.id}")
print(f"Content: {message.content}")
print(f"Dequeue count: {message.dequeue_count}")
# Traiter le message...
# Supprimer après traitement
queue_client.delete_message(message)
Consulter des Messages
# Consulter sans masquer (n'affecte pas la visibilité)
messages = queue_client.peek_messages(max_messages=5)
for message in messages:
print(message.content)
Mettre à Jour un Message
# Étendre la visibilité ou mettre à jour le contenu
messages = queue_client.receive_messages()
for message in messages:
# Étendre le timeout (besoin de plus de temps)
queue_client.update_message(
message,
visibility_timeout=60
)
# Mettre à jour le contenu et le timeout
queue_client.update_message(
message,
content="Updated content",
visibility_timeout=60
)
Supprimer un Message
# Supprimer après traitement réussi
messages = queue_client.receive_messages()
for message in messages:
try:
# Traiter...
queue_client.delete_message(message)
except Exception:
# Le message devient visible à nouveau après le timeout
pass
Vider la Queue
# Supprimer tous les messages
queue_client.clear_messages()
Propriétés de Queue
# Obtenir les propriétés de la queue
properties = queue_client.get_queue_properties()
print(f"Approximate message count: {properties.approximate_message_count}")
# Définir/obtenir les métadonnées
queue_client.set_queue_metadata(metadata={"environment": "production"})
properties = queue_client.get_queue_properties()
print(properties.metadata)
Client Asynchrone
from azure.storage.queue.aio import QueueServiceClient, QueueClient
from azure.identity.aio import DefaultAzureCredential
async def queue_operations():
credential = DefaultAzureCredential()
async with QueueClient(
account_url="https://<account>.queue.core.windows.net",
queue_name="myqueue",
credential=credential
) as client:
# Envoyer
await client.send_message("Async message")
# Recevoir
async for message in client.receive_messages():
print(message.content)
await client.delete_message(message)
import asyncio
asyncio.run(queue_operations())
Encodage Base64
from azure.storage.queue import QueueClient, BinaryBase64EncodePolicy, BinaryBase64DecodePolicy
# Pour les données binaires
queue_client = QueueClient(
account_url=account_url,
queue_name="myqueue",
credential=credential,
message_encode_policy=BinaryBase64EncodePolicy(),
message_decode_policy=BinaryBase64DecodePolicy()
)
# Envoyer des bytes
queue_client.send_message(b"Binary content")
Bonnes Pratiques
- Supprimer les messages après traitement pour éviter le retraitement
- Définir un visibility timeout approprié basé sur le temps de traitement
- Gérer
dequeue_countpour la détection de poison messages - Utiliser le client asynchrone pour les scénarios à haut débit
- Utiliser
peek_messagespour surveiller sans affecter la queue - Définir
time_to_livepour éviter les messages obsolètes - Considérer Service Bus pour les fonctionnalités avancées (sessions, topics)