Motifs asynchrones Python
Guidance complète pour implémenter des applications Python asynchrones en utilisant asyncio, des motifs de programmation concurrente et async/await pour construire des systèmes haute performance et non-bloquants.
Quand utiliser cette compétence
- Construire des API web asynchrones (FastAPI, aiohttp, Sanic)
- Implémenter des opérations I/O concurrentes (base de données, fichier, réseau)
- Créer des web scrapers avec requêtes concurrentes
- Développer des applications temps réel (serveurs WebSocket, systèmes de chat)
- Traiter plusieurs tâches indépendantes simultanément
- Construire des microservices avec communication asynchrone
- Optimiser les charges de travail liées à l'I/O
- Implémenter des tâches de fond asynchrones et des files d'attente
Guide de décision Sync vs Async
Avant d'adopter async, considérez si c'est le bon choix pour votre cas d'usage.
| Cas d'usage | Approche recommandée |
|---|---|
| Beaucoup d'appels réseau/DB concurrents | asyncio |
| Calcul lié au CPU | multiprocessing ou thread pool |
| I/O mixte + CPU | Déporter le travail CPU avec asyncio.to_thread() |
| Scripts simples, peu de connexions | Sync (plus simple, plus facile à déboguer) |
| API web avec haute concurrence | Frameworks asynchrones (FastAPI, aiohttp) |
Règle clé : Restez entièrement sync ou entièrement async dans un chemin d'appel. Le mélange crée des blocages cachés et de la complexité.
Concepts fondamentaux
1. Event Loop
L'event loop est le cœur d'asyncio, gérant et ordonnançant les tâches asynchrones.
Caractéristiques clés :
- Multitâche coopératif mono-thread
- Ordonnance les coroutines pour l'exécution
- Gère les opérations I/O sans blocage
- Gère les callbacks et futures
2. Coroutines
Fonctions définies avec async def qui peuvent être suspendues et reprises.
Syntaxe :
async def my_coroutine():
result = await some_async_operation()
return result
3. Tasks
Coroutines ordonnancées qui s'exécutent de manière concurrente sur l'event loop.
4. Futures
Objets bas niveau représentant les résultats éventuels d'opérations asynchrones.
5. Async Context Managers
Ressources qui supportent async with pour un nettoyage approprié.
6. Async Iterators
Objets qui supportent async for pour itérer sur des sources de données asynchrones.
Démarrage rapide
import asyncio
async def main():
print("Hello")
await asyncio.sleep(1)
print("World")
# Python 3.7+
asyncio.run(main())
Motifs fondamentaux
Motif 1 : Async/Await basique
import asyncio
async def fetch_data(url: str) -> dict:
"""Récupérer les données d'une URL de manière asynchrone."""
await asyncio.sleep(1) # Simuler l'I/O
return {"url": url, "data": "result"}
async def main():
result = await fetch_data("https://api.example.com")
print(result)
asyncio.run(main())
Motif 2 : Exécution concurrente avec gather()
import asyncio
from typing import List
async def fetch_user(user_id: int) -> dict:
"""Récupérer les données utilisateur."""
await asyncio.sleep(0.5)
return {"id": user_id, "name": f"User {user_id}"}
async def fetch_all_users(user_ids: List[int]) -> List[dict]:
"""Récupérer plusieurs utilisateurs de manière concurrente."""
tasks = [fetch_user(uid) for uid in user_ids]
results = await asyncio.gather(*tasks)
return results
async def main():
user_ids = [1, 2, 3, 4, 5]
users = await fetch_all_users(user_ids)
print(f"Fetched {len(users)} users")
asyncio.run(main())
Motif 3 : Création et gestion de tâches
import asyncio
async def background_task(name: str, delay: int):
"""Tâche de fond longue durée."""
print(f"{name} started")
await asyncio.sleep(delay)
print(f"{name} completed")
return f"Result from {name}"
async def main():
# Créer les tâches
task1 = asyncio.create_task(background_task("Task 1", 2))
task2 = asyncio.create_task(background_task("Task 2", 1))
# Faire autre chose
print("Main: doing other work")
await asyncio.sleep(0.5)
# Attendre les tâches
result1 = await task1
result2 = await task2
print(f"Results: {result1}, {result2}")
asyncio.run(main())
Motif 4 : Gestion d'erreurs dans le code asynchrone
import asyncio
from typing import List, Optional
async def risky_operation(item_id: int) -> dict:
"""Opération qui pourrait échouer."""
await asyncio.sleep(0.1)
if item_id % 3 == 0:
raise ValueError(f"Item {item_id} failed")
return {"id": item_id, "status": "success"}
async def safe_operation(item_id: int) -> Optional[dict]:
"""Wrapper avec gestion d'erreurs."""
try:
return await risky_operation(item_id)
except ValueError as e:
print(f"Error: {e}")
return None
async def process_items(item_ids: List[int]):
"""Traiter plusieurs éléments avec gestion d'erreurs."""
tasks = [safe_operation(iid) for iid in item_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filtrer les échecs
successful = [r for r in results if r is not None and not isinstance(r, Exception)]
failed = [r for r in results if isinstance(r, Exception)]
print(f"Success: {len(successful)}, Failed: {len(failed)}")
return successful
asyncio.run(process_items([1, 2, 3, 4, 5, 6]))
Motif 5 : Gestion des délais
import asyncio
async def slow_operation(delay: int) -> str:
"""Opération qui prend du temps."""
await asyncio.sleep(delay)
return f"Completed after {delay}s"
async def with_timeout():
"""Exécuter l'opération avec timeout."""
try:
result = await asyncio.wait_for(slow_operation(5), timeout=2.0)
print(result)
except asyncio.TimeoutError:
print("Operation timed out")
asyncio.run(with_timeout())
Motifs avancés
Motif 6 : Async Context Managers
import asyncio
from typing import Optional
class AsyncDatabaseConnection:
"""Context manager de connexion à base de données asynchrone."""
def __init__(self, dsn: str):
self.dsn = dsn
self.connection: Optional[object] = None
async def __aenter__(self):
print("Opening connection")
await asyncio.sleep(0.1) # Simuler la connexion
self.connection = {"dsn": self.dsn, "connected": True}
return self.connection
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Closing connection")
await asyncio.sleep(0.1) # Simuler le nettoyage
self.connection = None
async def query_database():
"""Utiliser un context manager asynchrone."""
async with AsyncDatabaseConnection("postgresql://localhost") as conn:
print(f"Using connection: {conn}")
await asyncio.sleep(0.2) # Simuler la requête
return {"rows": 10}
asyncio.run(query_database())
Motif 7 : Async Iterators et Generators
import asyncio
from typing import AsyncIterator
async def async_range(start: int, end: int, delay: float = 0.1) -> AsyncIterator[int]:
"""Générateur asynchrone qui cède des nombres avec délai."""
for i in range(start, end):
await asyncio.sleep(delay)
yield i
async def fetch_pages(url: str, max_pages: int) -> AsyncIterator[dict]:
"""Récupérer les données paginées de manière asynchrone."""
for page in range(1, max_pages + 1):
await asyncio.sleep(0.2) # Simuler l'appel API
yield {
"page": page,
"url": f"{url}?page={page}",
"data": [f"item_{page}_{i}" for i in range(5)]
}
async def consume_async_iterator():
"""Consommer un async iterator."""
async for number in async_range(1, 5):
print(f"Number: {number}")
print("\nFetching pages:")
async for page_data in fetch_pages("https://api.example.com/items", 3):
print(f"Page {page_data['page']}: {len(page_data['data'])} items")
asyncio.run(consume_async_iterator())
Motif 8 : Motif Producteur-Consommateur
import asyncio
from asyncio import Queue
from typing import Optional
async def producer(queue: Queue, producer_id: int, num_items: int):
"""Produire des éléments et les mettre dans la file d'attente."""
for i in range(num_items):
item = f"Item-{producer_id}-{i}"
await queue.put(item)
print(f"Producer {producer_id} produced: {item}")
await asyncio.sleep(0.1)
await queue.put(None) # Signal de fin
async def consumer(queue: Queue, consumer_id: int):
"""Consommer des éléments de la file d'attente."""
while True:
item = await queue.get()
if item is None:
queue.task_done()
break
print(f"Consumer {consumer_id} processing: {item}")
await asyncio.sleep(0.2) # Simuler le travail
queue.task_done()
async def producer_consumer_example():
"""Exécuter le motif producteur-consommateur."""
queue = Queue(maxsize=10)
# Créer les tâches
producers = [
asyncio.create_task(producer(queue, i, 5))
for i in range(2)
]
consumers = [
asyncio.create_task(consumer(queue, i))
for i in range(3)
]
# Attendre les producteurs
await asyncio.gather(*producers)
# Attendre que la file soit vide
await queue.join()
# Annuler les consommateurs
for c in consumers:
c.cancel()
asyncio.run(producer_consumer_example())
Motif 9 : Semaphore pour limiter le débit
import asyncio
from typing import List
async def api_call(url: str, semaphore: asyncio.Semaphore) -> dict:
"""Effectuer un appel API avec limitation du débit."""
async with semaphore:
print(f"Calling {url}")
await asyncio.sleep(0.5) # Simuler l'appel API
return {"url": url, "status": 200}
async def rate_limited_requests(urls: List[str], max_concurrent: int = 5):
"""Effectuer plusieurs requêtes avec limitation du débit."""
semaphore = asyncio.Semaphore(max_concurrent)
tasks = [api_call(url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [f"https://api.example.com/item/{i}" for i in range(20)]
results = await rate_limited_requests(urls, max_concurrent=3)
print(f"Completed {len(results)} requests")
asyncio.run(main())
Motif 10 : Async Locks et Synchronisation
import asyncio
class AsyncCounter:
"""Compteur asynchrone thread-safe."""
def __init__(self):
self.value = 0
self.lock = asyncio.Lock()
async def increment(self):
"""Incrémenter le compteur de manière sûre."""
async with self.lock:
current = self.value
await asyncio.sleep(0.01) # Simuler le travail
self.value = current + 1
async def get_value(self) -> int:
"""Obtenir la valeur actuelle."""
async with self.lock:
return self.value
async def worker(counter: AsyncCounter, worker_id: int):
"""Worker qui incrémente le compteur."""
for _ in range(10):
await counter.increment()
print(f"Worker {worker_id} incremented")
async def test_counter():
"""Tester le compteur concurrent."""
counter = AsyncCounter()
workers = [asyncio.create_task(worker(counter, i)) for i in range(5)]
await asyncio.gather(*workers)
final_value = await counter.get_value()
print(f"Final counter value: {final_value}")
asyncio.run(test_counter())
Applications réelles
Web Scraping avec aiohttp
import asyncio
import aiohttp
from typing import List, Dict
async def fetch_url(session: aiohttp.ClientSession, url: str) -> Dict:
"""Récupérer une seule URL."""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
text = await response.text()
return {
"url": url,
"status": response.status,
"length": len(text)
}
except Exception as e:
return {"url": url, "error": str(e)}
async def scrape_urls(urls: List[str]) -> List[Dict]:
"""Scraper plusieurs URLs de manière concurrente."""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/status/404",
]
results = await scrape_urls(urls)
for result in results:
print(result)
asyncio.run(main())
Opérations asynchrones sur base de données
import asyncio
from typing import List, Optional
# Client de base de données asynchrone simulé
class AsyncDB:
"""Base de données asynchrone simulée."""
async def execute(self, query: str) -> List[dict]:
"""Exécuter une requête."""
await asyncio.sleep(0.1)
return [{"id": 1, "name": "Example"}]
async def fetch_one(self, query: str) -> Optional[dict]:
"""Récupérer une seule ligne."""
await asyncio.sleep(0.1)
return {"id": 1, "name": "Example"}
async def get_user_data(db: AsyncDB, user_id: int) -> dict:
"""Récupérer l'utilisateur et les données associées de manière concurrente."""
user_task = db.fetch_one(f"SELECT * FROM users WHERE id = {user_id}")
orders_task = db.execute(f"SELECT * FROM orders WHERE user_id = {user_id}")
profile_task = db.fetch_one(f"SELECT * FROM profiles WHERE user_id = {user_id}")
user, orders, profile = await asyncio.gather(user_task, orders_task, profile_task)
return {
"user": user,
"orders": orders,
"profile": profile
}
async def main():
db = AsyncDB()
user_data = await get_user_data(db, 1)
print(user_data)
asyncio.run(main())
Serveur WebSocket
import asyncio
from typing import Set
# Connexion WebSocket simulée
class WebSocket:
"""WebSocket simulé."""
def __init__(self, client_id: str):
self.client_id = client_id
async def send(self, message: str):
"""Envoyer un message."""
print(f"Sending to {self.client_id}: {message}")
await asyncio.sleep(0.01)
async def recv(self) -> str:
"""Recevoir un message."""
await asyncio.sleep(1)
return f"Message from {self.client_id}"
class WebSocketServer:
"""Serveur WebSocket simple."""
def __init__(self):
self.clients: Set[WebSocket] = set()
async def register(self, websocket: WebSocket):
"""Enregistrer un nouveau client."""
self.clients.add(websocket)
print(f"Client {websocket.client_id} connected")
async def unregister(self, websocket: WebSocket):
"""Désenregistrer un client."""
self.clients.remove(websocket)
print(f"Client {websocket.client_id} disconnected")
async def broadcast(self, message: str):
"""Diffuser un message à tous les clients."""
if self.clients:
tasks = [client.send(message) for client in self.clients]
await asyncio.gather(*tasks)
async def handle_client(self, websocket: WebSocket):
"""Gérer une connexion client individuelle."""
await self.register(websocket)
try:
async for message in self.message_iterator(websocket):
await self.broadcast(f"{websocket.client_id}: {message}")
finally:
await self.unregister(websocket)
async def message_iterator(self, websocket: WebSocket):
"""Itérer sur les messages du client."""
for _ in range(3): # Simuler 3 messages
yield await websocket.recv()
Meilleures pratiques de performance
1. Utiliser des pools de connexion
import asyncio
import aiohttp
async def with_connection_pool():
"""Utiliser un pool de connexion pour l'efficacité."""
connector = aiohttp.TCPConnector(limit=100, limit_per_host=10)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [session.get(f"https://api.example.com/item/{i}") for i in range(50)]
responses = await asyncio.gather(*tasks)
return responses
2. Opérations par lot
async def batch_process(items: List[str], batch_size: int = 10):
"""Traiter les éléments par lot."""
for i in range(0, len(items), batch_size):
batch = items[i:i + batch_size]
tasks = [process_item(item) for item in batch]
await asyncio.gather(*tasks)
print(f"Processed batch {i // batch_size + 1}")
async def process_item(item: str):
"""Traiter un élément unique."""
await asyncio.sleep(0.1)
return f"Processed: {item}"
3. Éviter les opérations de blocage
Ne bloquez jamais l'event loop avec des opérations synchrones. Un seul appel de blocage arrête toutes les tâches concurrentes.
# MAUVAIS - bloque l'event loop entière
async def fetch_data_bad():
import time
import requests
time.sleep(1) # Bloque!
response = requests.get(url) # Bloque aussi!
# BON - utiliser des bibliothèques asynchrones natives (par ex. httpx pour HTTP async)
import httpx
async def fetch_data_good(url: str):
await asyncio.sleep(1)
async with httpx.AsyncClient() as client:
response = await client.get(url)
Encapsuler le code bloquant avec asyncio.to_thread() (Python 3.9+) :
Quand vous devez utiliser des bibliothèques synchrones, déporter vers un thread pool:
import asyncio
from pathlib import Path
async def read_file_async(path: str) -> str:
"""Lire un fichier sans bloquer l'event loop."""
# asyncio.to_thread() exécute le code sync dans un thread pool
return await asyncio.to_thread(Path(path).read_text)
async def call_sync_library(data: dict) -> dict:
"""Encapsuler un appel de bibliothèque synchrone."""
# Utile pour les drivers de base de données sync, l'I/O fichier, le travail CPU
return await asyncio.to_thread(sync_library.process, data)
Approche bas niveau avec run_in_executor() :
import asyncio
import concurrent.futures
from typing import Any
def blocking_operation(data: Any) -> Any:
"""Opération bloquante intensive en CPU."""
import time
time.sleep(1)
return data * 2
async def run_in_executor(data: Any) -> Any:
"""Exécuter l'opération bloquante dans un thread pool."""
loop = asyncio.get_running_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(pool, blocking_operation, data)
return result
async def main():
results = await asyncio.gather(*[run_in_executor(i) for i in range(5)])
print(results)
asyncio.run(main())
Pièges courants
1. Oublier await
# Mauvais - retourne un objet coroutine, ne s'exécute pas
result = async_function()
# Correct
result = await async_function()
2. Bloquer l'Event Loop
# Mauvais - bloque l'event loop
import time
async def bad():
time.sleep(1) # Bloque!
# Correct
async def good():
await asyncio.sleep(1) # Non-bloquant
3. Ne pas gérer l'annulation
async def cancelable_task():
"""Tâche qui gère l'annulation."""
try:
while True:
await asyncio.sleep(1)
print("Working...")
except asyncio.CancelledError:
print("Task cancelled, cleaning up...")
# Effectuer le nettoyage
raise # Re-lever pour propager l'annulation
4. Mélanger le code sync et async
# Mauvais - ne peut pas appeler async depuis sync directement
def sync_function():
result = await async_function() # SyntaxError!
# Correct
def sync_function():
result = asyncio.run(async_function())
Tester le code asynchrone
import asyncio
import pytest
# Utiliser pytest-asyncio
@pytest.mark.asyncio
async def test_async_function():
"""Tester une fonction asynchrone."""
result = await fetch_data("https://api.example.com")
assert result is not None
@pytest.mark.asyncio
async def test_with_timeout():
"""Tester avec timeout."""
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(slow_operation(5), timeout=1.0)