Kit SDK Azure Blob Storage pour Python
Bibliothèque cliente pour Azure Blob Storage — stockage d'objets pour données non structurées.
Installation
pip install azure-storage-blob azure-identity
Variables d'environnement
AZURE_STORAGE_ACCOUNT_NAME=<your-storage-account> # Requis pour toutes les méthodes d'auth
# Ou utiliser l'URL complète
AZURE_STORAGE_ACCOUNT_URL=https://<account>.blob.core.windows.net # Alternative au nom du compte
AZURE_TOKEN_CREDENTIALS=prod # Requis uniquement si DefaultAzureCredential est utilisé en production
Authentification
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.storage.blob import BlobServiceClient
# Dev local : DefaultAzureCredential. Production : définir AZURE_TOKEN_CREDENTIALS=prod ou AZURE_TOKEN_CREDENTIALS=<specific_credential>
credential = DefaultAzureCredential(require_envvar=True)
# Ou utiliser une credential spécifique directement en production :
# Voir https://learn.microsoft.com/python/api/overview/azure/identity-readme?view=azure-python#credential-classes
# credential = ManagedIdentityCredential()
account_url = "https://<account>.blob.core.windows.net"
blob_service_client = BlobServiceClient(account_url, credential=credential)
Hiérarchie des clients
| Client | Objectif | Obtenir de |
|---|---|---|
BlobServiceClient |
Opérations au niveau du compte | Instanciation directe |
ContainerClient |
Opérations sur les conteneurs | blob_service_client.get_container_client() |
BlobClient |
Opérations sur un seul blob | container_client.get_blob_client() |
Flux de travail principal
Créer un conteneur
container_client = blob_service_client.get_container_client("mycontainer")
container_client.create_container()
Charger un blob
# À partir d'un chemin de fichier
blob_client = blob_service_client.get_blob_client(
container="mycontainer",
blob="sample.txt"
)
with open("./local-file.txt", "rb") as data:
blob_client.upload_blob(data, overwrite=True)
# À partir de bytes/string
blob_client.upload_blob(b"Hello, World!", overwrite=True)
# À partir d'un flux
import io
stream = io.BytesIO(b"Stream content")
blob_client.upload_blob(stream, overwrite=True)
Télécharger un blob
blob_client = blob_service_client.get_blob_client(
container="mycontainer",
blob="sample.txt"
)
# Vers un fichier
with open("./downloaded.txt", "wb") as file:
download_stream = blob_client.download_blob()
file.write(download_stream.readall())
# En mémoire
download_stream = blob_client.download_blob()
content = download_stream.readall() # bytes
# Lire dans un buffer existant
stream = io.BytesIO()
num_bytes = blob_client.download_blob().readinto(stream)
Lister les blobs
container_client = blob_service_client.get_container_client("mycontainer")
# Lister tous les blobs
for blob in container_client.list_blobs():
print(f"{blob.name} - {blob.size} bytes")
# Lister avec préfixe (style dossier)
for blob in container_client.list_blobs(name_starts_with="logs/"):
print(blob.name)
# Parcourir la hiérarchie des blobs (répertoires virtuels)
for item in container_client.walk_blobs(delimiter="/"):
if item.get("prefix"):
print(f"Directory: {item['prefix']}")
else:
print(f"Blob: {item.name}")
Supprimer un blob
blob_client.delete_blob()
# Supprimer avec snapshots
blob_client.delete_blob(delete_snapshots="include")
Optimisation des performances
# Configurer les tailles de bloc pour les gros téléchargements/chargements
blob_client = BlobClient(
account_url=account_url,
container_name="mycontainer",
blob_name="large-file.zip",
credential=credential,
max_block_size=4 * 1024 * 1024, # Blocs de 4 MiB
max_single_put_size=64 * 1024 * 1024 # Limite de chargement unique de 64 MiB
)
# Chargement parallèle
blob_client.upload_blob(data, max_concurrency=4)
# Téléchargement parallèle
download_stream = blob_client.download_blob(max_concurrency=4)
Jetons SAS
from datetime import datetime, timedelta, timezone
from azure.storage.blob import generate_blob_sas, BlobSasPermissions
sas_token = generate_blob_sas(
account_name="<account>",
container_name="mycontainer",
blob_name="sample.txt",
account_key="<account-key>", # Ou utiliser une clé de délégation utilisateur
permission=BlobSasPermissions(read=True),
expiry=datetime.now(timezone.utc) + timedelta(hours=1)
)
# Utiliser le jeton SAS
blob_url = f"https://<account>.blob.core.windows.net/mycontainer/sample.txt?{sas_token}"
Propriétés et métadonnées des blobs
# Obtenir les propriétés
properties = blob_client.get_blob_properties()
print(f"Size: {properties.size}")
print(f"Content-Type: {properties.content_settings.content_type}")
print(f"Last modified: {properties.last_modified}")
# Définir les métadonnées
blob_client.set_blob_metadata(metadata={"category": "logs", "year": "2024"})
# Définir le type de contenu
from azure.storage.blob import ContentSettings
blob_client.set_http_headers(
content_settings=ContentSettings(content_type="application/json")
)
Client asynchrone
from azure.identity.aio import DefaultAzureCredential
from azure.storage.blob.aio import BlobServiceClient
async def upload_async():
credential = DefaultAzureCredential()
async with BlobServiceClient(account_url, credential=credential) as client:
blob_client = client.get_blob_client("mycontainer", "sample.txt")
with open("./file.txt", "rb") as data:
await blob_client.upload_blob(data, overwrite=True)
# Télécharger de façon asynchrone
async def download_async():
async with BlobServiceClient(account_url, credential=credential) as client:
blob_client = client.get_blob_client("mycontainer", "sample.txt")
stream = await blob_client.download_blob()
data = await stream.readall()
Bonnes pratiques
- Utiliser DefaultAzureCredential au lieu de chaînes de connexion
- Utiliser les gestionnaires de contexte pour les clients asynchrones
- Définir
overwrite=Trueexplicitement lors du rechargement - Utiliser
max_concurrencypour les transferts de gros fichiers - Préférer
readinto()àreadall()pour l'efficacité mémoire - Utiliser
walk_blobs()pour l'énumération hiérarchique - Définir les types de contenu appropriés pour les blobs servis sur le web