azure-cosmos-py

npx skills add https://github.com/microsoft/skills --skill azure-cosmos-py

SDK Azure Cosmos DB pour Python

Bibliothèque client pour l'API NoSQL d'Azure Cosmos DB — base de données distribuée mondialement et multi-modèle.

Installation

pip install azure-cosmos azure-identity

Variables d'environnement

COSMOS_ENDPOINT=https://<account>.documents.azure.com:443/  # Requis pour toutes les méthodes d'authentification
COSMOS_DATABASE=mydb  # Requis pour toutes les méthodes d'authentification
COSMOS_CONTAINER=mycontainer  # Requis pour toutes les méthodes d'authentification
AZURE_TOKEN_CREDENTIALS=prod # Requis uniquement si DefaultAzureCredential est utilisé en production

Authentification

import os
from azure.identity import DefaultAzureCredential, ManagedIdentityCredential
from azure.cosmos import CosmosClient

# Dev local : DefaultAzureCredential. Production : définir AZURE_TOKEN_CREDENTIALS=prod ou AZURE_TOKEN_CREDENTIALS=<specific_credential>
credential = DefaultAzureCredential(require_envvar=True)
# Ou utiliser une credential spécifique directement en production :
# Voir https://learn.microsoft.com/python/api/overview/azure/identity-readme?view=azure-python#credential-classes
# credential = ManagedIdentityCredential()

endpoint = "https://<account>.documents.azure.com:443/"

client = CosmosClient(url=endpoint, credential=credential)

Hiérarchie des clients

Client Objectif Obtenu via
CosmosClient Opérations au niveau du compte Instanciation directe
DatabaseProxy Opérations de base de données client.get_database_client()
ContainerProxy Opérations de conteneur/élément database.get_container_client()

Flux de travail principal

Configurer une base de données et un conteneur

# Obtenir ou créer une base de données
database = client.create_database_if_not_exists(id="mydb")

# Obtenir ou créer un conteneur avec clé de partition
container = database.create_container_if_not_exists(
    id="mycontainer",
    partition_key=PartitionKey(path="/category")
)

# Obtenir un existant
database = client.get_database_client("mydb")
container = database.get_container_client("mycontainer")

Créer un élément

item = {
    "id": "item-001",           # Requis : unique dans la partition
    "category": "electronics",   # Valeur de clé de partition
    "name": "Laptop",
    "price": 999.99,
    "tags": ["computer", "portable"]
}

created = container.create_item(body=item)
print(f"Created: {created['id']}")

Lire un élément

# La lecture requiert l'id ET la clé de partition
item = container.read_item(
    item="item-001",
    partition_key="electronics"
)
print(f"Name: {item['name']}")

Mettre à jour un élément (Remplacer)

item = container.read_item(item="item-001", partition_key="electronics")
item["price"] = 899.99
item["on_sale"] = True

updated = container.replace_item(item=item["id"], body=item)

Upsert un élément

# Créer s'il n'existe pas, remplacer s'il existe
item = {
    "id": "item-002",
    "category": "electronics",
    "name": "Tablet",
    "price": 499.99
}

result = container.upsert_item(body=item)

Supprimer un élément

container.delete_item(
    item="item-001",
    partition_key="electronics"
)

Requêtes

Requête simple

# Requête dans une partition (efficace)
query = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
    query=query,
    parameters=[{"name": "@max_price", "value": 500}],
    partition_key="electronics"
)

for item in items:
    print(f"{item['name']}: ${item['price']}")

Requête entre partitions

# Entre partitions (plus coûteux, à utiliser avec parcimonie)
query = "SELECT * FROM c WHERE c.price < @max_price"
items = container.query_items(
    query=query,
    parameters=[{"name": "@max_price", "value": 500}],
    enable_cross_partition_query=True
)

for item in items:
    print(item)

Requête avec projection

query = "SELECT c.id, c.name, c.price FROM c WHERE c.category = @category"
items = container.query_items(
    query=query,
    parameters=[{"name": "@category", "value": "electronics"}],
    partition_key="electronics"
)

Lire tous les éléments

# Lire tous les éléments d'une partition
items = container.read_all_items()  # Entre partitions
# Ou avec clé de partition
items = container.query_items(
    query="SELECT * FROM c",
    partition_key="electronics"
)

Clés de partition

Critique : Toujours inclure la clé de partition pour des opérations efficaces.

from azure.cosmos import PartitionKey

# Clé de partition unique
container = database.create_container_if_not_exists(
    id="orders",
    partition_key=PartitionKey(path="/customer_id")
)

# Clé de partition hiérarchique (préversion)
container = database.create_container_if_not_exists(
    id="events",
    partition_key=PartitionKey(path=["/tenant_id", "/user_id"])
)

Débit

# Créer un conteneur avec débit approvisionné
container = database.create_container_if_not_exists(
    id="mycontainer",
    partition_key=PartitionKey(path="/pk"),
    offer_throughput=400  # RU/s
)

# Lire le débit actuel
offer = container.read_offer()
print(f"Throughput: {offer.offer_throughput} RU/s")

# Mettre à jour le débit
container.replace_throughput(throughput=1000)

Client asynchrone

from azure.cosmos.aio import CosmosClient
from azure.identity.aio import DefaultAzureCredential

async def cosmos_operations():
    credential = DefaultAzureCredential()

    async with CosmosClient(endpoint, credential=credential) as client:
        database = client.get_database_client("mydb")
        container = database.get_container_client("mycontainer")

        # Créer
        await container.create_item(body={"id": "1", "pk": "test"})

        # Lire
        item = await container.read_item(item="1", partition_key="test")

        # Requête
        async for item in container.query_items(
            query="SELECT * FROM c",
            partition_key="test"
        ):
            print(item)

import asyncio
asyncio.run(cosmos_operations())

Gestion des erreurs

from azure.cosmos.exceptions import CosmosHttpResponseError

try:
    item = container.read_item(item="nonexistent", partition_key="pk")
except CosmosHttpResponseError as e:
    if e.status_code == 404:
        print("Item not found")
    elif e.status_code == 429:
        print(f"Rate limited. Retry after: {e.headers.get('x-ms-retry-after-ms')}ms")
    else:
        raise

Bonnes pratiques

  1. Toujours spécifier la clé de partition pour les lectures ponctuelles et les requêtes
  2. Utiliser des requêtes paramétrées pour prévenir les injections et améliorer la mise en cache
  3. Éviter les requêtes entre partitions autant que possible
  4. Utiliser upsert_item pour les écritures idempotentes
  5. Utiliser le client asynchrone pour les scénarios de haut débit
  6. Concevoir la clé de partition pour une distribution de données uniforme
  7. Utiliser read_item au lieu d'une requête pour la récupération d'un seul document

Fichiers de référence

Fichier Contenu
references/partitioning.md Stratégies de clé de partition, clés hiérarchiques, détection et atténuation des partitions chaudes
references/query-patterns.md Optimisation des requêtes, agrégations, pagination, transactions, change feed
scripts/setup_cosmos_container.py Outil CLI pour créer des conteneurs avec partitionnement, débit et indexation

Skills similaires