python-resource-management

Par wshobson · agents

Gestion des ressources Python avec les gestionnaires de contexte, les patterns de nettoyage et le streaming. À utiliser pour gérer des connexions, des descripteurs de fichiers, implémenter une logique de nettoyage ou construire des réponses en streaming avec état accumulé.

npx skills add https://github.com/wshobson/agents --skill python-resource-management

Gestion des ressources Python

Gérez les ressources de manière déterministe en utilisant les gestionnaires de contexte. Les ressources comme les connexions de base de données, les descripteurs de fichiers et les sockets réseau doivent être libérées de manière fiable, même en cas d'exception.

Quand utiliser cette compétence

  • Gérer les connexions de base de données et les pools de connexions
  • Travailler avec des descripteurs de fichiers et les E/S
  • Implémenter des gestionnaires de contexte personnalisés
  • Construire des réponses en streaming avec état
  • Gérer le nettoyage imbriqué des ressources
  • Créer des gestionnaires de contexte asynchrones

Concepts fondamentaux

1. Gestionnaires de contexte

L'instruction with garantit que les ressources sont libérées automatiquement, même en cas d'exception.

2. Méthodes de protocole

__enter__/__exit__ pour la gestion synchrone, __aenter__/__aexit__ pour la gestion asynchrone des ressources.

3. Nettoyage inconditionnel

__exit__ s'exécute toujours, indépendamment du fait qu'une exception se soit produite.

4. Gestion des exceptions

Retournez True depuis __exit__ pour supprimer les exceptions, False pour les propager.

Démarrage rapide

from contextlib import contextmanager

@contextmanager
def managed_resource():
    resource = acquire_resource()
    try:
        yield resource
    finally:
        resource.cleanup()

with managed_resource() as r:
    r.do_work()

Patterns fondamentaux

Pattern 1 : Gestionnaire de contexte basé sur une classe

Implémentez le protocole du gestionnaire de contexte pour les ressources complexes.

class DatabaseConnection:
    """Database connection with automatic cleanup."""

    def __init__(self, dsn: str) -> None:
        self._dsn = dsn
        self._conn: Connection | None = None

    def connect(self) -> None:
        """Establish database connection."""
        self._conn = psycopg.connect(self._dsn)

    def close(self) -> None:
        """Close connection if open."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> "DatabaseConnection":
        """Enter context: connect and return self."""
        self.connect()
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit context: always close connection."""
        self.close()

# Usage with context manager (preferred)
with DatabaseConnection(dsn) as db:
    result = db.execute(query)

# Manual management when needed
db = DatabaseConnection(dsn)
db.connect()
try:
    result = db.execute(query)
finally:
    db.close()

Pattern 2 : Gestionnaire de contexte asynchrone

Pour les ressources asynchrones, implémentez le protocole asynchrone.

class AsyncDatabasePool:
    """Async database connection pool."""

    def __init__(self, dsn: str, min_size: int = 1, max_size: int = 10) -> None:
        self._dsn = dsn
        self._min_size = min_size
        self._max_size = max_size
        self._pool: asyncpg.Pool | None = None

    async def __aenter__(self) -> "AsyncDatabasePool":
        """Create connection pool."""
        self._pool = await asyncpg.create_pool(
            self._dsn,
            min_size=self._min_size,
            max_size=self._max_size,
        )
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Close all connections in pool."""
        if self._pool is not None:
            await self._pool.close()

    async def execute(self, query: str, *args) -> list[dict]:
        """Execute query using pooled connection."""
        async with self._pool.acquire() as conn:
            return await conn.fetch(query, *args)

# Usage
async with AsyncDatabasePool(dsn) as pool:
    users = await pool.execute("SELECT * FROM users WHERE active = $1", True)

Pattern 3 : Utiliser le décorateur @contextmanager

Simplifiez les gestionnaires de contexte avec le décorateur pour les cas simples.

from contextlib import contextmanager, asynccontextmanager
import time
import structlog

logger = structlog.get_logger()

@contextmanager
def timed_block(name: str):
    """Time a block of code."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.info(f"{name} completed", duration_seconds=round(elapsed, 3))

# Usage
with timed_block("data_processing"):
    process_large_dataset()

@asynccontextmanager
async def database_transaction(conn: AsyncConnection):
    """Manage database transaction."""
    await conn.execute("BEGIN")
    try:
        yield conn
        await conn.execute("COMMIT")
    except Exception:
        await conn.execute("ROLLBACK")
        raise

# Usage
async with database_transaction(conn) as tx:
    await tx.execute("INSERT INTO users ...")
    await tx.execute("INSERT INTO audit_log ...")

Pattern 4 : Libération inconditionnelle des ressources

Nettoyez toujours les ressources dans __exit__, indépendamment des exceptions.

class FileProcessor:
    """Process file with guaranteed cleanup."""

    def __init__(self, path: str) -> None:
        self._path = path
        self._file: IO | None = None
        self._temp_files: list[Path] = []

    def __enter__(self) -> "FileProcessor":
        self._file = open(self._path, "r")
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Clean up all resources unconditionally."""
        # Close main file
        if self._file is not None:
            self._file.close()

        # Clean up any temporary files
        for temp_file in self._temp_files:
            try:
                temp_file.unlink()
            except OSError:
                pass  # Best effort cleanup

        # Return None/False to propagate any exception

Patterns avancés

Pattern 5 : Suppression sélective des exceptions

Supprimez uniquement les exceptions spécifiques et documentées.

class StreamWriter:
    """Writer that handles broken pipe gracefully."""

    def __init__(self, stream) -> None:
        self._stream = stream

    def __enter__(self) -> "StreamWriter":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool:
        """Clean up, suppressing BrokenPipeError on shutdown."""
        self._stream.close()

        # Suppress BrokenPipeError (client disconnected)
        # This is expected behavior, not an error
        if exc_type is BrokenPipeError:
            return True  # Exception suppressed

        return False  # Propagate all other exceptions

Pattern 6 : Streaming avec état accumulé

Maintenez à la fois les chunks incrémentiels et l'état accumulé lors du streaming.

from collections.abc import Generator
from dataclasses import dataclass, field

@dataclass
class StreamingResult:
    """Accumulated streaming result."""

    chunks: list[str] = field(default_factory=list)
    _finalized: bool = False

    @property
    def content(self) -> str:
        """Get accumulated content."""
        return "".join(self.chunks)

    def add_chunk(self, chunk: str) -> None:
        """Add chunk to accumulator."""
        if self._finalized:
            raise RuntimeError("Cannot add to finalized result")
        self.chunks.append(chunk)

    def finalize(self) -> str:
        """Mark stream complete and return content."""
        self._finalized = True
        return self.content

def stream_with_accumulation(
    response: StreamingResponse,
) -> Generator[tuple[str, str], None, str]:
    """Stream response while accumulating content.

    Yields:
        Tuple of (accumulated_content, new_chunk) for each chunk.

    Returns:
        Final accumulated content.
    """
    result = StreamingResult()

    for chunk in response.iter_content():
        result.add_chunk(chunk)
        yield result.content, chunk

    return result.finalize()

Pattern 7 : Accumulation efficace de chaînes

Évitez la concaténation de chaînes en O(n²) lors de l'accumulation.

def accumulate_stream(stream) -> str:
    """Efficiently accumulate stream content."""
    # BAD: O(n²) due to string immutability
    # content = ""
    # for chunk in stream:
    #     content += chunk  # Creates new string each time

    # GOOD: O(n) with list and join
    chunks: list[str] = []
    for chunk in stream:
        chunks.append(chunk)
    return "".join(chunks)  # Single allocation

Pattern 8 : Suivi des métriques de streaming

Mesurez le délai jusqu'au premier octet et le temps de streaming total.

import time
from collections.abc import Generator

def stream_with_metrics(
    response: StreamingResponse,
) -> Generator[str, None, dict]:
    """Stream response while collecting metrics.

    Yields:
        Content chunks.

    Returns:
        Metrics dictionary.
    """
    start = time.perf_counter()
    first_chunk_time: float | None = None
    chunk_count = 0
    total_bytes = 0

    for chunk in response.iter_content():
        if first_chunk_time is None:
            first_chunk_time = time.perf_counter() - start

        chunk_count += 1
        total_bytes += len(chunk.encode())
        yield chunk

    total_time = time.perf_counter() - start

    return {
        "time_to_first_byte_ms": round((first_chunk_time or 0) * 1000, 2),
        "total_time_ms": round(total_time * 1000, 2),
        "chunk_count": chunk_count,
        "total_bytes": total_bytes,
    }

Pattern 9 : Gérer plusieurs ressources avec ExitStack

Gérez un nombre dynamique de ressources proprement.

from contextlib import ExitStack, AsyncExitStack
from pathlib import Path

def process_files(paths: list[Path]) -> list[str]:
    """Process multiple files with automatic cleanup."""
    results = []

    with ExitStack() as stack:
        # Open all files - they'll all be closed when block exits
        files = [stack.enter_context(open(p)) for p in paths]

        for f in files:
            results.append(f.read())

    return results

async def process_connections(hosts: list[str]) -> list[dict]:
    """Process multiple async connections."""
    results = []

    async with AsyncExitStack() as stack:
        connections = [
            await stack.enter_async_context(connect_to_host(host))
            for host in hosts
        ]

        for conn in connections:
            results.append(await conn.fetch_data())

    return results

Résumé des bonnes pratiques

  1. Utilisez toujours les gestionnaires de contexte - Pour toute ressource qui nécessite un nettoyage
  2. Nettoyez inconditionnellement - __exit__ s'exécute même en cas d'exception
  3. Ne supprimez pas inopinément - Retournez False sauf si la suppression est intentionnelle
  4. Utilisez @contextmanager - Pour les patterns de ressource simples
  5. Implémentez les deux protocoles - Supportez with et la gestion manuelle
  6. Utilisez ExitStack - Pour un nombre dynamique de ressources
  7. Accumulez efficacement - Liste + join, pas concaténation de chaînes
  8. Suivez les métriques - Le délai jusqu'au premier octet importe pour le streaming
  9. Documentez le comportement - En particulier la suppression des exceptions
  10. Testez les chemins de nettoyage - Vérifiez que les ressources sont libérées en cas d'erreurs

Skills similaires