async-python-patterns

Par wshobson · agents

Maîtrisez Python asyncio, la programmation concurrente et les patterns async/await pour des applications haute performance. À utiliser lors de la création d'APIs async, de systèmes concurrents ou d'applications I/O-bound nécessitant des opérations non bloquantes.

npx skills add https://github.com/wshobson/agents --skill async-python-patterns

Motifs asynchrones Python

Guidance complète pour implémenter des applications Python asynchrones en utilisant asyncio, des motifs de programmation concurrente et async/await pour construire des systèmes haute performance et non-bloquants.

Quand utiliser cette compétence

  • Construire des API web asynchrones (FastAPI, aiohttp, Sanic)
  • Implémenter des opérations I/O concurrentes (base de données, fichier, réseau)
  • Créer des web scrapers avec requêtes concurrentes
  • Développer des applications temps réel (serveurs WebSocket, systèmes de chat)
  • Traiter plusieurs tâches indépendantes simultanément
  • Construire des microservices avec communication asynchrone
  • Optimiser les charges de travail liées à l'I/O
  • Implémenter des tâches de fond asynchrones et des files d'attente

Guide de décision Sync vs Async

Avant d'adopter async, considérez si c'est le bon choix pour votre cas d'usage.

Cas d'usage Approche recommandée
Beaucoup d'appels réseau/DB concurrents asyncio
Calcul lié au CPU multiprocessing ou thread pool
I/O mixte + CPU Déporter le travail CPU avec asyncio.to_thread()
Scripts simples, peu de connexions Sync (plus simple, plus facile à déboguer)
API web avec haute concurrence Frameworks asynchrones (FastAPI, aiohttp)

Règle clé : Restez entièrement sync ou entièrement async dans un chemin d'appel. Le mélange crée des blocages cachés et de la complexité.

Concepts fondamentaux

1. Event Loop

L'event loop est le cœur d'asyncio, gérant et ordonnançant les tâches asynchrones.

Caractéristiques clés :

  • Multitâche coopératif mono-thread
  • Ordonnance les coroutines pour l'exécution
  • Gère les opérations I/O sans blocage
  • Gère les callbacks et futures

2. Coroutines

Fonctions définies avec async def qui peuvent être suspendues et reprises.

Syntaxe :

async def my_coroutine():
    result = await some_async_operation()
    return result

3. Tasks

Coroutines ordonnancées qui s'exécutent de manière concurrente sur l'event loop.

4. Futures

Objets bas niveau représentant les résultats éventuels d'opérations asynchrones.

5. Async Context Managers

Ressources qui supportent async with pour un nettoyage approprié.

6. Async Iterators

Objets qui supportent async for pour itérer sur des sources de données asynchrones.

Démarrage rapide

import asyncio

async def main():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# Python 3.7+
asyncio.run(main())

Motifs fondamentaux

Motif 1 : Async/Await basique

import asyncio

async def fetch_data(url: str) -> dict:
    """Récupérer les données d'une URL de manière asynchrone."""
    await asyncio.sleep(1)  # Simuler l'I/O
    return {"url": url, "data": "result"}

async def main():
    result = await fetch_data("https://api.example.com")
    print(result)

asyncio.run(main())

Motif 2 : Exécution concurrente avec gather()

import asyncio
from typing import List

async def fetch_user(user_id: int) -> dict:
    """Récupérer les données utilisateur."""
    await asyncio.sleep(0.5)
    return {"id": user_id, "name": f"User {user_id}"}

async def fetch_all_users(user_ids: List[int]) -> List[dict]:
    """Récupérer plusieurs utilisateurs de manière concurrente."""
    tasks = [fetch_user(uid) for uid in user_ids]
    results = await asyncio.gather(*tasks)
    return results

async def main():
    user_ids = [1, 2, 3, 4, 5]
    users = await fetch_all_users(user_ids)
    print(f"Fetched {len(users)} users")

asyncio.run(main())

Motif 3 : Création et gestion de tâches

import asyncio

async def background_task(name: str, delay: int):
    """Tâche de fond longue durée."""
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} completed")
    return f"Result from {name}"

async def main():
    # Créer les tâches
    task1 = asyncio.create_task(background_task("Task 1", 2))
    task2 = asyncio.create_task(background_task("Task 2", 1))

    # Faire autre chose
    print("Main: doing other work")
    await asyncio.sleep(0.5)

    # Attendre les tâches
    result1 = await task1
    result2 = await task2

    print(f"Results: {result1}, {result2}")

asyncio.run(main())

Motif 4 : Gestion d'erreurs dans le code asynchrone

import asyncio
from typing import List, Optional

async def risky_operation(item_id: int) -> dict:
    """Opération qui pourrait échouer."""
    await asyncio.sleep(0.1)
    if item_id % 3 == 0:
        raise ValueError(f"Item {item_id} failed")
    return {"id": item_id, "status": "success"}

async def safe_operation(item_id: int) -> Optional[dict]:
    """Wrapper avec gestion d'erreurs."""
    try:
        return await risky_operation(item_id)
    except ValueError as e:
        print(f"Error: {e}")
        return None

async def process_items(item_ids: List[int]):
    """Traiter plusieurs éléments avec gestion d'erreurs."""
    tasks = [safe_operation(iid) for iid in item_ids]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filtrer les échecs
    successful = [r for r in results if r is not None and not isinstance(r, Exception)]
    failed = [r for r in results if isinstance(r, Exception)]

    print(f"Success: {len(successful)}, Failed: {len(failed)}")
    return successful

asyncio.run(process_items([1, 2, 3, 4, 5, 6]))

Motif 5 : Gestion des délais

import asyncio

async def slow_operation(delay: int) -> str:
    """Opération qui prend du temps."""
    await asyncio.sleep(delay)
    return f"Completed after {delay}s"

async def with_timeout():
    """Exécuter l'opération avec timeout."""
    try:
        result = await asyncio.wait_for(slow_operation(5), timeout=2.0)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out")

asyncio.run(with_timeout())

Motifs avancés

Motif 6 : Async Context Managers

import asyncio
from typing import Optional

class AsyncDatabaseConnection:
    """Context manager de connexion à base de données asynchrone."""

    def __init__(self, dsn: str):
        self.dsn = dsn
        self.connection: Optional[object] = None

    async def __aenter__(self):
        print("Opening connection")
        await asyncio.sleep(0.1)  # Simuler la connexion
        self.connection = {"dsn": self.dsn, "connected": True}
        return self.connection

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Closing connection")
        await asyncio.sleep(0.1)  # Simuler le nettoyage
        self.connection = None

async def query_database():
    """Utiliser un context manager asynchrone."""
    async with AsyncDatabaseConnection("postgresql://localhost") as conn:
        print(f"Using connection: {conn}")
        await asyncio.sleep(0.2)  # Simuler la requête
        return {"rows": 10}

asyncio.run(query_database())

Motif 7 : Async Iterators et Generators

import asyncio
from typing import AsyncIterator

async def async_range(start: int, end: int, delay: float = 0.1) -> AsyncIterator[int]:
    """Générateur asynchrone qui cède des nombres avec délai."""
    for i in range(start, end):
        await asyncio.sleep(delay)
        yield i

async def fetch_pages(url: str, max_pages: int) -> AsyncIterator[dict]:
    """Récupérer les données paginées de manière asynchrone."""
    for page in range(1, max_pages + 1):
        await asyncio.sleep(0.2)  # Simuler l'appel API
        yield {
            "page": page,
            "url": f"{url}?page={page}",
            "data": [f"item_{page}_{i}" for i in range(5)]
        }

async def consume_async_iterator():
    """Consommer un async iterator."""
    async for number in async_range(1, 5):
        print(f"Number: {number}")

    print("\nFetching pages:")
    async for page_data in fetch_pages("https://api.example.com/items", 3):
        print(f"Page {page_data['page']}: {len(page_data['data'])} items")

asyncio.run(consume_async_iterator())

Motif 8 : Motif Producteur-Consommateur

import asyncio
from asyncio import Queue
from typing import Optional

async def producer(queue: Queue, producer_id: int, num_items: int):
    """Produire des éléments et les mettre dans la file d'attente."""
    for i in range(num_items):
        item = f"Item-{producer_id}-{i}"
        await queue.put(item)
        print(f"Producer {producer_id} produced: {item}")
        await asyncio.sleep(0.1)
    await queue.put(None)  # Signal de fin

async def consumer(queue: Queue, consumer_id: int):
    """Consommer des éléments de la file d'attente."""
    while True:
        item = await queue.get()
        if item is None:
            queue.task_done()
            break

        print(f"Consumer {consumer_id} processing: {item}")
        await asyncio.sleep(0.2)  # Simuler le travail
        queue.task_done()

async def producer_consumer_example():
    """Exécuter le motif producteur-consommateur."""
    queue = Queue(maxsize=10)

    # Créer les tâches
    producers = [
        asyncio.create_task(producer(queue, i, 5))
        for i in range(2)
    ]

    consumers = [
        asyncio.create_task(consumer(queue, i))
        for i in range(3)
    ]

    # Attendre les producteurs
    await asyncio.gather(*producers)

    # Attendre que la file soit vide
    await queue.join()

    # Annuler les consommateurs
    for c in consumers:
        c.cancel()

asyncio.run(producer_consumer_example())

Motif 9 : Semaphore pour limiter le débit

import asyncio
from typing import List

async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
    """Effectuer un appel API avec limitation du débit."""
    async with semaphore:
        print(f"Calling {url}")
        await asyncio.sleep(0.5)  # Simuler l'appel API
        return {"url": url, "status": 200}

async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
    """Effectuer plusieurs requêtes avec limitation du débit."""
    semaphore = asyncio.Semaphore(max_concurrent)
    tasks = [api_call(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)
    return results

async def main():
    urls = [f"https://api.example.com/item/{i}" for i in range(20)]
    results = await rate_limited_requests(urls, max_concurrent=3)
    print(f"Completed {len(results)} requests")

asyncio.run(main())

Motif 10 : Async Locks et Synchronisation

import asyncio

class AsyncCounter:
    """Compteur asynchrone thread-safe."""

    def __init__(self):
        self.value = 0
        self.lock = asyncio.Lock()

    async def increment(self):
        """Incrémenter le compteur de manière sûre."""
        async with self.lock:
            current = self.value
            await asyncio.sleep(0.01)  # Simuler le travail
            self.value = current + 1

    async def get_value(self) -> int:
        """Obtenir la valeur actuelle."""
        async with self.lock:
            return self.value

async def worker(counter: AsyncCounter, worker_id: int):
    """Worker qui incrémente le compteur."""
    for _ in range(10):
        await counter.increment()
        print(f"Worker {worker_id} incremented")

async def test_counter():
    """Tester le compteur concurrent."""
    counter = AsyncCounter()

    workers = [asyncio.create_task(worker(counter, i)) for i in range(5)]
    await asyncio.gather(*workers)

    final_value = await counter.get_value()
    print(f"Final counter value: {final_value}")

asyncio.run(test_counter())

Applications réelles

Web Scraping avec aiohttp

import asyncio
import aiohttp
from typing import List, Dict

async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
    """Récupérer une seule URL."""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            text = await response.text()
            return {
                "url": url,
                "status": response.status,
                "length": len(text)
            }
    except Exception as e:
        return {"url": url, "error": str(e)}

async def scrape_urls(urls: List[str]) -> List[Dict]:
    """Scraper plusieurs URLs de manière concurrente."""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/status/404",
    ]

    results = await scrape_urls(urls)
    for result in results:
        print(result)

asyncio.run(main())

Opérations asynchrones sur base de données

import asyncio
from typing import List, Optional

# Client de base de données asynchrone simulé
class AsyncDB:
    """Base de données asynchrone simulée."""

    async def execute(self, query: str) -> List[dict]:
        """Exécuter une requête."""
        await asyncio.sleep(0.1)
        return [{"id": 1, "name": "Example"}]

    async def fetch_one(self, query: str) -> Optional[dict]:
        """Récupérer une seule ligne."""
        await asyncio.sleep(0.1)
        return {"id": 1, "name": "Example"}

async def get_user_data(db: AsyncDB, user_id: int) -> dict:
    """Récupérer l'utilisateur et les données associées de manière concurrente."""
    user_task = db.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
    orders_task = db.execute(f"SELECT * FROM orders WHERE user_id = {user_id}")
    profile_task = db.fetch_one(f"SELECT * FROM profiles WHERE user_id = {user_id}")

    user, orders, profile = await asyncio.gather(user_task, orders_task, profile_task)

    return {
        "user": user,
        "orders": orders,
        "profile": profile
    }

async def main():
    db = AsyncDB()
    user_data = await get_user_data(db, 1)
    print(user_data)

asyncio.run(main())

Serveur WebSocket

import asyncio
from typing import Set

# Connexion WebSocket simulée
class WebSocket:
    """WebSocket simulé."""

    def __init__(self, client_id: str):
        self.client_id = client_id

    async def send(self, message: str):
        """Envoyer un message."""
        print(f"Sending to {self.client_id}: {message}")
        await asyncio.sleep(0.01)

    async def recv(self) -> str:
        """Recevoir un message."""
        await asyncio.sleep(1)
        return f"Message from {self.client_id}"

class WebSocketServer:
    """Serveur WebSocket simple."""

    def __init__(self):
        self.clients: Set[WebSocket] = set()

    async def register(self, websocket: WebSocket):
        """Enregistrer un nouveau client."""
        self.clients.add(websocket)
        print(f"Client {websocket.client_id} connected")

    async def unregister(self, websocket: WebSocket):
        """Désenregistrer un client."""
        self.clients.remove(websocket)
        print(f"Client {websocket.client_id} disconnected")

    async def broadcast(self, message: str):
        """Diffuser un message à tous les clients."""
        if self.clients:
            tasks = [client.send(message) for client in self.clients]
            await asyncio.gather(*tasks)

    async def handle_client(self, websocket: WebSocket):
        """Gérer une connexion client individuelle."""
        await self.register(websocket)
        try:
            async for message in self.message_iterator(websocket):
                await self.broadcast(f"{websocket.client_id}: {message}")
        finally:
            await self.unregister(websocket)

    async def message_iterator(self, websocket: WebSocket):
        """Itérer sur les messages du client."""
        for _ in range(3):  # Simuler 3 messages
            yield await websocket.recv()

Meilleures pratiques de performance

1. Utiliser des pools de connexion

import asyncio
import aiohttp

async def with_connection_pool():
    """Utiliser un pool de connexion pour l'efficacité."""
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)

    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [session.get(f"https://api.example.com/item/{i}") for i in range(50)]
        responses = await asyncio.gather(*tasks)
        return responses

2. Opérations par lot

async def batch_process(items: List[str], batch_size: int = 10):
    """Traiter les éléments par lot."""
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        tasks = [process_item(item) for item in batch]
        await asyncio.gather(*tasks)
        print(f"Processed batch {i // batch_size + 1}")

async def process_item(item: str):
    """Traiter un élément unique."""
    await asyncio.sleep(0.1)
    return f"Processed: {item}"

3. Éviter les opérations de blocage

Ne bloquez jamais l'event loop avec des opérations synchrones. Un seul appel de blocage arrête toutes les tâches concurrentes.

# MAUVAIS - bloque l'event loop entière
async def fetch_data_bad():
    import time
    import requests
    time.sleep(1)  # Bloque!
    response = requests.get(url)  # Bloque aussi!

# BON - utiliser des bibliothèques asynchrones natives (par ex. httpx pour HTTP async)
import httpx

async def fetch_data_good(url: str):
    await asyncio.sleep(1)
    async with httpx.AsyncClient() as client:
        response = await client.get(url)

Encapsuler le code bloquant avec asyncio.to_thread() (Python 3.9+) :

Quand vous devez utiliser des bibliothèques synchrones, déporter vers un thread pool:

import asyncio
from pathlib import Path

async def read_file_async(path: str) -> str:
    """Lire un fichier sans bloquer l'event loop."""
    # asyncio.to_thread() exécute le code sync dans un thread pool
    return await asyncio.to_thread(Path(path).read_text)

async def call_sync_library(data: dict) -> dict:
    """Encapsuler un appel de bibliothèque synchrone."""
    # Utile pour les drivers de base de données sync, l'I/O fichier, le travail CPU
    return await asyncio.to_thread(sync_library.process, data)

Approche bas niveau avec run_in_executor() :

import asyncio
import concurrent.futures
from typing import Any

def blocking_operation(data: Any) -> Any:
    """Opération bloquante intensive en CPU."""
    import time
    time.sleep(1)
    return data * 2

async def run_in_executor(data: Any) -> Any:
    """Exécuter l'opération bloquante dans un thread pool."""
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, blocking_operation, data)
        return result

async def main():
    results = await asyncio.gather(*[run_in_executor(i) for i in range(5)])
    print(results)

asyncio.run(main())

Pièges courants

1. Oublier await

# Mauvais - retourne un objet coroutine, ne s'exécute pas
result = async_function()

# Correct
result = await async_function()

2. Bloquer l'Event Loop

# Mauvais - bloque l'event loop
import time
async def bad():
    time.sleep(1)  # Bloque!

# Correct
async def good():
    await asyncio.sleep(1)  # Non-bloquant

3. Ne pas gérer l'annulation

async def cancelable_task():
    """Tâche qui gère l'annulation."""
    try:
        while True:
            await asyncio.sleep(1)
            print("Working...")
    except asyncio.CancelledError:
        print("Task cancelled, cleaning up...")
        # Effectuer le nettoyage
        raise  # Re-lever pour propager l'annulation

4. Mélanger le code sync et async

# Mauvais - ne peut pas appeler async depuis sync directement
def sync_function():
    result = await async_function()  # SyntaxError!

# Correct
def sync_function():
    result = asyncio.run(async_function())

Tester le code asynchrone

import asyncio
import pytest

# Utiliser pytest-asyncio
@pytest.mark.asyncio
async def test_async_function():
    """Tester une fonction asynchrone."""
    result = await fetch_data("https://api.example.com")
    assert result is not None

@pytest.mark.asyncio
async def test_with_timeout():
    """Tester avec timeout."""
    with pytest.raises(asyncio.TimeoutError):
        await asyncio.wait_for(slow_operation(5), timeout=1.0)

Skills similaires