Observabilité Python
Instrumentez les applications Python avec des logs structurés, des métriques et des traces. Quand quelque chose se casse en production, vous devez répondre à « quoi, où, et pourquoi » sans déployer de nouveau code.
Quand utiliser cette compétence
- Ajouter des logs structurés aux applications
- Implémenter la collecte de métriques avec Prometheus
- Configurer le tracing distribué entre les services
- Propager les ID de corrélation à travers les chaînes de requêtes
- Déboguer les problèmes en production
- Construire des tableaux de bord d'observabilité
Concepts fondamentaux
1. Logs structurés
Émettre les logs au format JSON avec des champs cohérents pour les environnements de production. Les logs lisibles par machine permettent des requêtes et des alertes puissantes. Pour le développement local, envisagez des formats lisibles par l'homme.
2. Les quatre signaux dorés
Suivez la latence, le trafic, les erreurs et la saturation pour chaque limite de service.
3. ID de corrélation
Transmettez un ID unique à travers tous les logs et spans pour une seule requête, permettant le tracing de bout en bout.
4. Cardinalité bornée
Maintenez les valeurs des labels de métriques bornées. Les labels non bornés (comme les ID utilisateur) explosent les coûts de stockage.
Démarrage rapide
import structlog
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
],
)
logger = structlog.get_logger()
logger.info("Request processed", user_id="123", duration_ms=45)
Patterns fondamentaux
Pattern 1 : Logs structurés avec Structlog
Configurez structlog pour une sortie JSON avec des champs cohérents.
import logging
import structlog
def configure_logging(log_level: str = "INFO") -> None:
"""Configure structured logging for the application."""
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, log_level.upper())
),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
# Initialize at application startup
configure_logging("INFO")
logger = structlog.get_logger()
Pattern 2 : Champs de log cohérents
Chaque entrée de log doit inclure des champs standard pour le filtrage et la corrélation.
import structlog
from contextvars import ContextVar
# Store correlation ID in context
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
logger = structlog.get_logger()
def process_request(request: Request) -> Response:
"""Process request with structured logging."""
logger.info(
"Request received",
correlation_id=correlation_id.get(),
method=request.method,
path=request.path,
user_id=request.user_id,
)
try:
result = handle_request(request)
logger.info(
"Request completed",
correlation_id=correlation_id.get(),
status_code=200,
duration_ms=elapsed,
)
return result
except Exception as e:
logger.error(
"Request failed",
correlation_id=correlation_id.get(),
error_type=type(e).__name__,
error_message=str(e),
)
raise
Pattern 3 : Niveaux de log sémantiques
Utilisez les niveaux de log de manière cohérente dans l'application.
| Niveau | Objectif | Exemples |
|---|---|---|
DEBUG |
Diagnostics de développement | Valeurs de variables, état interne |
INFO |
Cycle de vie des requêtes, opérations | Début/fin de requête, fin de job |
WARNING |
Anomalies récupérables | Tentatives de relance, fallback utilisé |
ERROR |
Défaillances nécessitant attention | Exceptions, service indisponible |
# DEBUG: Detailed internal information
logger.debug("Cache lookup", key=cache_key, hit=cache_hit)
# INFO: Normal operational events
logger.info("Order created", order_id=order.id, total=order.total)
# WARNING: Abnormal but handled situations
logger.warning(
"Rate limit approaching",
current_rate=950,
limit=1000,
reset_seconds=30,
)
# ERROR: Failures requiring investigation
logger.error(
"Payment processing failed",
order_id=order.id,
error=str(e),
payment_provider="stripe",
)
Ne loggez jamais le comportement attendu au niveau ERROR. Un utilisateur qui entre un mauvais mot de passe est INFO, pas ERROR.
Pattern 4 : Propagation des ID de corrélation
Générez un ID unique à l'entrée et transmettez-le à travers toutes les opérations.
from contextvars import ContextVar
import uuid
import structlog
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")
def set_correlation_id(cid: str | None = None) -> str:
"""Set correlation ID for current context."""
cid = cid or str(uuid.uuid4())
correlation_id.set(cid)
structlog.contextvars.bind_contextvars(correlation_id=cid)
return cid
# FastAPI middleware example
from fastapi import Request
async def correlation_middleware(request: Request, call_next):
"""Middleware to set and propagate correlation ID."""
# Use incoming header or generate new
cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
set_correlation_id(cid)
response = await call_next(request)
response.headers["X-Correlation-ID"] = cid
return response
Propagez vers les requêtes sortantes :
import httpx
async def call_downstream_service(endpoint: str, data: dict) -> dict:
"""Call downstream service with correlation ID."""
async with httpx.AsyncClient() as client:
response = await client.post(
endpoint,
json=data,
headers={"X-Correlation-ID": correlation_id.get()},
)
return response.json()
Patterns avancés
Pattern 5 : Les quatre signaux dorés avec Prometheus
Suivez ces métriques pour chaque limite de service :
from prometheus_client import Counter, Histogram, Gauge
# Latency: How long requests take
REQUEST_LATENCY = Histogram(
"http_request_duration_seconds",
"Request latency in seconds",
["method", "endpoint", "status"],
buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
)
# Traffic: Request rate
REQUEST_COUNT = Counter(
"http_requests_total",
"Total HTTP requests",
["method", "endpoint", "status"],
)
# Errors: Error rate
ERROR_COUNT = Counter(
"http_errors_total",
"Total HTTP errors",
["method", "endpoint", "error_type"],
)
# Saturation: Resource utilization
DB_POOL_USAGE = Gauge(
"db_connection_pool_used",
"Number of database connections in use",
)
Instrumentez vos endpoints :
import time
from functools import wraps
def track_request(func):
"""Decorator to track request metrics."""
@wraps(func)
async def wrapper(request: Request, *args, **kwargs):
method = request.method
endpoint = request.url.path
start = time.perf_counter()
try:
response = await func(request, *args, **kwargs)
status = str(response.status_code)
return response
except Exception as e:
status = "500"
ERROR_COUNT.labels(
method=method,
endpoint=endpoint,
error_type=type(e).__name__,
).inc()
raise
finally:
duration = time.perf_counter() - start
REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration)
return wrapper
Pattern 6 : Cardinalité bornée
Évitez les labels avec des valeurs non bornées pour prévenir l'explosion des métriques.
# BAD: User ID has potentially millions of values
REQUEST_COUNT.labels(method="GET", user_id=user.id) # Don't do this!
# GOOD: Bounded values only
REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")
# If you need per-user metrics, use a different approach:
# - Log the user_id and query logs
# - Use a separate analytics system
# - Bucket users by type/tier
REQUEST_COUNT.labels(
method="GET",
endpoint="/users",
user_tier="premium", # Bounded set of values
)
Pattern 7 : Opérations chronométrées avec gestionnaire de contexte
Créez un gestionnaire de contexte de timing réutilisable pour les opérations.
from contextlib import contextmanager
import time
import structlog
logger = structlog.get_logger()
@contextmanager
def timed_operation(name: str, **extra_fields):
"""Context manager for timing and logging operations."""
start = time.perf_counter()
logger.debug("Operation started", operation=name, **extra_fields)
try:
yield
except Exception as e:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.error(
"Operation failed",
operation=name,
duration_ms=round(elapsed_ms, 2),
error=str(e),
**extra_fields,
)
raise
else:
elapsed_ms = (time.perf_counter() - start) * 1000
logger.info(
"Operation completed",
operation=name,
duration_ms=round(elapsed_ms, 2),
**extra_fields,
)
# Usage
with timed_operation("fetch_user_orders", user_id=user.id):
orders = await order_repository.get_by_user(user.id)
Pattern 8 : Tracing OpenTelemetry
Configurez le tracing distribué avec OpenTelemetry.
Note : OpenTelemetry évolue activement. Consultez la documentation officielle Python pour les derniers patterns d'API et les meilleures pratiques.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
"""Configure OpenTelemetry tracing."""
provider = TracerProvider()
processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
async def process_order(order_id: str) -> Order:
"""Process order with tracing."""
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_id)
with tracer.start_as_current_span("validate_order"):
validate_order(order_id)
with tracer.start_as_current_span("charge_payment"):
charge_payment(order_id)
with tracer.start_as_current_span("send_confirmation"):
send_confirmation(order_id)
return order
Résumé des meilleures pratiques
- Utilisez des logs structurés - Logs JSON avec des champs cohérents
- Propagez les ID de corrélation - Transmettez à travers toutes les requêtes et logs
- Suivez les quatre signaux dorés - Latence, trafic, erreurs, saturation
- Bornez la cardinalité des labels - N'utilisez jamais de valeurs non bornées comme labels de métriques
- Loggez aux niveaux appropriés - Ne criez pas au loup avec ERROR
- Incluez le contexte - ID utilisateur, ID requête, nom d'opération dans les logs
- Utilisez les gestionnaires de contexte - Timing et gestion d'erreurs cohérents
- Séparez les préoccupations - Le code d'observabilité ne doit pas polluer la logique métier
- Testez votre observabilité - Vérifiez les logs et métriques dans les tests d'intégration
- Configurez des alertes - Les métriques sont inutiles sans alertes