python-observability

Par wshobson · agents

Modèles d'observabilité Python incluant la journalisation structurée, les métriques et le tracing distribué. À utiliser lors de l'ajout de logs, de l'implémentation de la collecte de métriques, de la mise en place du tracing ou du débogage de systèmes en production.

npx skills add https://github.com/wshobson/agents --skill python-observability

Observabilité Python

Instrumentez les applications Python avec des logs structurés, des métriques et des traces. Quand quelque chose se casse en production, vous devez répondre à « quoi, où, et pourquoi » sans déployer de nouveau code.

Quand utiliser cette compétence

  • Ajouter des logs structurés aux applications
  • Implémenter la collecte de métriques avec Prometheus
  • Configurer le tracing distribué entre les services
  • Propager les ID de corrélation à travers les chaînes de requêtes
  • Déboguer les problèmes en production
  • Construire des tableaux de bord d'observabilité

Concepts fondamentaux

1. Logs structurés

Émettre les logs au format JSON avec des champs cohérents pour les environnements de production. Les logs lisibles par machine permettent des requêtes et des alertes puissantes. Pour le développement local, envisagez des formats lisibles par l'homme.

2. Les quatre signaux dorés

Suivez la latence, le trafic, les erreurs et la saturation pour chaque limite de service.

3. ID de corrélation

Transmettez un ID unique à travers tous les logs et spans pour une seule requête, permettant le tracing de bout en bout.

4. Cardinalité bornée

Maintenez les valeurs des labels de métriques bornées. Les labels non bornés (comme les ID utilisateur) explosent les coûts de stockage.

Démarrage rapide

import structlog

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)

logger = structlog.get_logger()
logger.info("Request processed", user_id="123", duration_ms=45)

Patterns fondamentaux

Pattern 1 : Logs structurés avec Structlog

Configurez structlog pour une sortie JSON avec des champs cohérents.

import logging
import structlog

def configure_logging(log_level: str = "INFO") -> None:
    """Configure structured logging for the application."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper())
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

# Initialize at application startup
configure_logging("INFO")
logger = structlog.get_logger()

Pattern 2 : Champs de log cohérents

Chaque entrée de log doit inclure des champs standard pour le filtrage et la corrélation.

import structlog
from contextvars import ContextVar

# Store correlation ID in context
correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

logger = structlog.get_logger()

def process_request(request: Request) -> Response:
    """Process request with structured logging."""
    logger.info(
        "Request received",
        correlation_id=correlation_id.get(),
        method=request.method,
        path=request.path,
        user_id=request.user_id,
    )

    try:
        result = handle_request(request)
        logger.info(
            "Request completed",
            correlation_id=correlation_id.get(),
            status_code=200,
            duration_ms=elapsed,
        )
        return result
    except Exception as e:
        logger.error(
            "Request failed",
            correlation_id=correlation_id.get(),
            error_type=type(e).__name__,
            error_message=str(e),
        )
        raise

Pattern 3 : Niveaux de log sémantiques

Utilisez les niveaux de log de manière cohérente dans l'application.

Niveau Objectif Exemples
DEBUG Diagnostics de développement Valeurs de variables, état interne
INFO Cycle de vie des requêtes, opérations Début/fin de requête, fin de job
WARNING Anomalies récupérables Tentatives de relance, fallback utilisé
ERROR Défaillances nécessitant attention Exceptions, service indisponible
# DEBUG: Detailed internal information
logger.debug("Cache lookup", key=cache_key, hit=cache_hit)

# INFO: Normal operational events
logger.info("Order created", order_id=order.id, total=order.total)

# WARNING: Abnormal but handled situations
logger.warning(
    "Rate limit approaching",
    current_rate=950,
    limit=1000,
    reset_seconds=30,
)

# ERROR: Failures requiring investigation
logger.error(
    "Payment processing failed",
    order_id=order.id,
    error=str(e),
    payment_provider="stripe",
)

Ne loggez jamais le comportement attendu au niveau ERROR. Un utilisateur qui entre un mauvais mot de passe est INFO, pas ERROR.

Pattern 4 : Propagation des ID de corrélation

Générez un ID unique à l'entrée et transmettez-le à travers toutes les opérations.

from contextvars import ContextVar
import uuid
import structlog

correlation_id: ContextVar[str] = ContextVar("correlation_id", default="")

def set_correlation_id(cid: str | None = None) -> str:
    """Set correlation ID for current context."""
    cid = cid or str(uuid.uuid4())
    correlation_id.set(cid)
    structlog.contextvars.bind_contextvars(correlation_id=cid)
    return cid

# FastAPI middleware example
from fastapi import Request

async def correlation_middleware(request: Request, call_next):
    """Middleware to set and propagate correlation ID."""
    # Use incoming header or generate new
    cid = request.headers.get("X-Correlation-ID") or str(uuid.uuid4())
    set_correlation_id(cid)

    response = await call_next(request)
    response.headers["X-Correlation-ID"] = cid
    return response

Propagez vers les requêtes sortantes :

import httpx

async def call_downstream_service(endpoint: str, data: dict) -> dict:
    """Call downstream service with correlation ID."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            endpoint,
            json=data,
            headers={"X-Correlation-ID": correlation_id.get()},
        )
        return response.json()

Patterns avancés

Pattern 5 : Les quatre signaux dorés avec Prometheus

Suivez ces métriques pour chaque limite de service :

from prometheus_client import Counter, Histogram, Gauge

# Latency: How long requests take
REQUEST_LATENCY = Histogram(
    "http_request_duration_seconds",
    "Request latency in seconds",
    ["method", "endpoint", "status"],
    buckets=[0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10],
)

# Traffic: Request rate
REQUEST_COUNT = Counter(
    "http_requests_total",
    "Total HTTP requests",
    ["method", "endpoint", "status"],
)

# Errors: Error rate
ERROR_COUNT = Counter(
    "http_errors_total",
    "Total HTTP errors",
    ["method", "endpoint", "error_type"],
)

# Saturation: Resource utilization
DB_POOL_USAGE = Gauge(
    "db_connection_pool_used",
    "Number of database connections in use",
)

Instrumentez vos endpoints :

import time
from functools import wraps

def track_request(func):
    """Decorator to track request metrics."""
    @wraps(func)
    async def wrapper(request: Request, *args, **kwargs):
        method = request.method
        endpoint = request.url.path
        start = time.perf_counter()

        try:
            response = await func(request, *args, **kwargs)
            status = str(response.status_code)
            return response
        except Exception as e:
            status = "500"
            ERROR_COUNT.labels(
                method=method,
                endpoint=endpoint,
                error_type=type(e).__name__,
            ).inc()
            raise
        finally:
            duration = time.perf_counter() - start
            REQUEST_COUNT.labels(method=method, endpoint=endpoint, status=status).inc()
            REQUEST_LATENCY.labels(method=method, endpoint=endpoint, status=status).observe(duration)

    return wrapper

Pattern 6 : Cardinalité bornée

Évitez les labels avec des valeurs non bornées pour prévenir l'explosion des métriques.

# BAD: User ID has potentially millions of values
REQUEST_COUNT.labels(method="GET", user_id=user.id)  # Don't do this!

# GOOD: Bounded values only
REQUEST_COUNT.labels(method="GET", endpoint="/users", status="200")

# If you need per-user metrics, use a different approach:
# - Log the user_id and query logs
# - Use a separate analytics system
# - Bucket users by type/tier
REQUEST_COUNT.labels(
    method="GET",
    endpoint="/users",
    user_tier="premium",  # Bounded set of values
)

Pattern 7 : Opérations chronométrées avec gestionnaire de contexte

Créez un gestionnaire de contexte de timing réutilisable pour les opérations.

from contextlib import contextmanager
import time
import structlog

logger = structlog.get_logger()

@contextmanager
def timed_operation(name: str, **extra_fields):
    """Context manager for timing and logging operations."""
    start = time.perf_counter()
    logger.debug("Operation started", operation=name, **extra_fields)

    try:
        yield
    except Exception as e:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.error(
            "Operation failed",
            operation=name,
            duration_ms=round(elapsed_ms, 2),
            error=str(e),
            **extra_fields,
        )
        raise
    else:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.info(
            "Operation completed",
            operation=name,
            duration_ms=round(elapsed_ms, 2),
            **extra_fields,
        )

# Usage
with timed_operation("fetch_user_orders", user_id=user.id):
    orders = await order_repository.get_by_user(user.id)

Pattern 8 : Tracing OpenTelemetry

Configurez le tracing distribué avec OpenTelemetry.

Note : OpenTelemetry évolue activement. Consultez la documentation officielle Python pour les derniers patterns d'API et les meilleures pratiques.

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

def configure_tracing(service_name: str, otlp_endpoint: str) -> None:
    """Configure OpenTelemetry tracing."""
    provider = TracerProvider()
    processor = BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint))
    provider.add_span_processor(processor)
    trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

async def process_order(order_id: str) -> Order:
    """Process order with tracing."""
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        with tracer.start_as_current_span("validate_order"):
            validate_order(order_id)

        with tracer.start_as_current_span("charge_payment"):
            charge_payment(order_id)

        with tracer.start_as_current_span("send_confirmation"):
            send_confirmation(order_id)

        return order

Résumé des meilleures pratiques

  1. Utilisez des logs structurés - Logs JSON avec des champs cohérents
  2. Propagez les ID de corrélation - Transmettez à travers toutes les requêtes et logs
  3. Suivez les quatre signaux dorés - Latence, trafic, erreurs, saturation
  4. Bornez la cardinalité des labels - N'utilisez jamais de valeurs non bornées comme labels de métriques
  5. Loggez aux niveaux appropriés - Ne criez pas au loup avec ERROR
  6. Incluez le contexte - ID utilisateur, ID requête, nom d'opération dans les logs
  7. Utilisez les gestionnaires de contexte - Timing et gestion d'erreurs cohérents
  8. Séparez les préoccupations - Le code d'observabilité ne doit pas polluer la logique métier
  9. Testez votre observabilité - Vérifiez les logs et métriques dans les tests d'intégration
  10. Configurez des alertes - Les métriques sont inutiles sans alertes

Skills similaires