SDK Azure Cosmos DB pour Python
Bibliothèque client pour l'API NoSQL d'Azure Cosmos DB — base de données distribuée mondialement et multi-modèle.
Installation
pip install azure-cosmos azure-identity
Variables d'environnement
COSMOS_ENDPOINT=https://<account>.documents.azure.com:443/ # Requis pour toutes les méthodes d'authentification
COSMOS_DATABASE=mydb # Requis pour toutes les méthodes d'authentification
COSMOS_CONTAINER=mycontainer # Requis pour toutes les méthodes d'authentification
AZURE_TOKEN_CREDENTIALS=prod # Requis uniquement si DefaultAzureCredential est utilisé en production
Authentification
import os
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.cosmos import CosmosClient
# Dev local : DefaultAzureCredential. Production : définir AZURE_TOKEN_CREDENTIALS=prod ou AZURE_TOKEN_CREDENTIALS=<specific_credential>
credential = DefaultAzureCredential(require_envvar=True)
# Ou utiliser une credential spécifique directement en production :
# Voir https://learn.microsoft.com/python/api/overview/azure/identity-readme?view=azure-python#credential-classes
# credential = ManagedIdentityCredential()
endpoint = "https://<account>.documents.azure.com:443/"
client = CosmosClient(url=endpoint, credential=credential)
Hiérarchie des clients
| Client | Objectif | Obtenu via |
|---|---|---|
CosmosClient |
Opérations au niveau du compte | Instanciation directe |
DatabaseProxy |
Opérations de base de données | client.get_database_client() |
ContainerProxy |
Opérations de conteneur/élément | database.get_container_client() |
Flux de travail principal
Configurer une base de données et un conteneur
# Obtenir ou créer une base de données
database = client.create_database_if_not_exists(id="mydb")
# Obtenir ou créer un conteneur avec clé de partition
container = database.create_container_if_not_exists(
id="mycontainer",
partition_key=PartitionKey(path="/category")
)
# Obtenir un existant
database = client.get_database_client("mydb")
container = database.get_container_client("mycontainer")
Créer un élément
item = {
"id": "item-001", # Requis : unique dans la partition
"category": "electronics", # Valeur de clé de partition
"name": "Laptop",
"price": 999.99,
"tags": ["computer", "portable"]
}
created = container.create_item(body=item)
print(f"Created: {created['id']}")
Lire un élément
# La lecture requiert l'id ET la clé de partition
item = container.read_item(
item="item-001",
partition_key="electronics"
)
print(f"Name: {item['name']}")
Mettre à jour un élément (Remplacer)
item = container.read_item(item="item-001", partition_key="electronics")
item["price"] = 899.99
item["on_sale"] = True
updated = container.replace_item(item=item["id"], body=item)
Upsert un élément
# Créer s'il n'existe pas, remplacer s'il existe
item = {
"id": "item-002",
"category": "electronics",
"name": "Tablet",
"price": 499.99
}
result = container.upsert_item(body=item)
Supprimer un élément
container.delete_item(
item="item-001",
partition_key="electronics"
)
Requêtes
Requête simple
# Requête dans une partition (efficace)
query = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
query=query,
parameters=[{"name": "@max_price", "value": 500}],
partition_key="electronics"
)
for item in items:
print(f"{item['name']}: ${item['price']}")
Requête entre partitions
# Entre partitions (plus coûteux, à utiliser avec parcimonie)
query = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
query=query,
parameters=[{"name": "@max_price", "value": 500}],
enable_cross_partition_query=True
)
for item in items:
print(item)
Requête avec projection
query = "SELECT c.id, c.name, c.price FROM c WHERE c.category = @category"
items = container.query_items(
query=query,
parameters=[{"name": "@category", "value": "electronics"}],
partition_key="electronics"
)
Lire tous les éléments
# Lire tous les éléments d'une partition
items = container.read_all_items() # Entre partitions
# Ou avec clé de partition
items = container.query_items(
query="SELECT * FROM c",
partition_key="electronics"
)
Clés de partition
Critique : Toujours inclure la clé de partition pour des opérations efficaces.
from azure.cosmos import PartitionKey
# Clé de partition unique
container = database.create_container_if_not_exists(
id="orders",
partition_key=PartitionKey(path="/customer_id")
)
# Clé de partition hiérarchique (préversion)
container = database.create_container_if_not_exists(
id="events",
partition_key=PartitionKey(path=["/tenant_id", "/user_id"])
)
Débit
# Créer un conteneur avec débit approvisionné
container = database.create_container_if_not_exists(
id="mycontainer",
partition_key=PartitionKey(path="/pk"),
offer_throughput=400 # RU/s
)
# Lire le débit actuel
offer = container.read_offer()
print(f"Throughput: {offer.offer_throughput} RU/s")
# Mettre à jour le débit
container.replace_throughput(throughput=1000)
Client asynchrone
from azure.cosmos.aio import CosmosClient
from azure.identity.aio import DefaultAzureCredential
async def cosmos_operations():
credential = DefaultAzureCredential()
async with CosmosClient(endpoint, credential=credential) as client:
database = client.get_database_client("mydb")
container = database.get_container_client("mycontainer")
# Créer
await container.create_item(body={"id": "1", "pk": "test"})
# Lire
item = await container.read_item(item="1", partition_key="test")
# Requête
async for item in container.query_items(
query="SELECT * FROM c",
partition_key="test"
):
print(item)
import asyncio
asyncio.run(cosmos_operations())
Gestion des erreurs
from azure.cosmos.exceptions import CosmosHttpResponseError
try:
item = container.read_item(item="nonexistent", partition_key="pk")
except CosmosHttpResponseError as e:
if e.status_code == 404:
print("Item not found")
elif e.status_code == 429:
print(f"Rate limited. Retry after: {e.headers.get('x-ms-retry-after-ms')}ms")
else:
raise
Bonnes pratiques
- Toujours spécifier la clé de partition pour les lectures ponctuelles et les requêtes
- Utiliser des requêtes paramétrées pour prévenir les injections et améliorer la mise en cache
- Éviter les requêtes entre partitions autant que possible
- Utiliser
upsert_itempour les écritures idempotentes - Utiliser le client asynchrone pour les scénarios de haut débit
- Concevoir la clé de partition pour une distribution de données uniforme
- Utiliser
read_itemau lieu d'une requête pour la récupération d'un seul document
Fichiers de référence
| Fichier | Contenu |
|---|---|
| references/partitioning.md | Stratégies de clé de partition, clés hiérarchiques, détection et atténuation des partitions chaudes |
| references/query-patterns.md | Optimisation des requêtes, agrégations, pagination, transactions, change feed |
| scripts/setup_cosmos_container.py | Outil CLI pour créer des conteneurs avec partitionnement, débit et indexation |