Dask
Vue d'ensemble
Dask est une bibliothèque Python pour le calcul parallèle et distribué qui active trois capacités critiques :
- Exécution dépassant la mémoire disponible sur des machines uniques pour les données excédant la RAM disponible
- Traitement parallèle pour améliorer la vitesse de calcul sur plusieurs cœurs
- Calcul distribué supportant des datasets à l'échelle du téraoctet sur plusieurs machines
Dask se met à l'échelle depuis les ordinateurs portables (traitement ~100 GiO) jusqu'aux clusters (traitement ~100 Tio) tout en conservant des APIs Python familières.
Quand utiliser cette skill
Cette skill doit être utilisée quand :
- Traiter des datasets qui dépassent la RAM disponible
- Mettre à l'échelle les opérations pandas ou NumPy sur des datasets plus grands
- Paralléliser les calculs pour améliorer les performances
- Traiter efficacement plusieurs fichiers (CSV, Parquet, JSON, journaux texte)
- Construire des workflows parallèles personnalisés avec dépendances entre tâches
- Distribuer les charges de travail sur plusieurs cœurs ou machines
Capacités principales
Dask fournit cinq composants principaux, chacun adapté à différents cas d'usage :
1. DataFrames - Opérations Pandas parallèles
Objectif : Mettre à l'échelle les opérations pandas sur des datasets plus grands via le traitement parallèle.
Quand utiliser :
- Les données tabulaires dépassent la RAM disponible
- Besoin de traiter plusieurs fichiers CSV/Parquet ensemble
- Les opérations pandas sont lentes et nécessitent une parallélisation
- Passer d'un prototype pandas à la production
Documentation de référence : Pour des conseils complets sur les DataFrames Dask, consultez references/dataframes.md qui inclut :
- Lecture des données (fichiers uniques, multiples, patterns glob)
- Opérations courantes (filtrage, groupby, jointures, agrégations)
- Opérations personnalisées avec
map_partitions - Conseils d'optimisation des performances
- Motifs courants (ETL, séries temporelles, traitement multi-fichiers)
Exemple rapide :
import dask.dataframe as dd
# Lire plusieurs fichiers comme un seul DataFrame
ddf = dd.read_csv('data/2024-*.csv')
# Les opérations sont lazy jusqu'à l'appel de compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category').mean().compute()
Points clés :
- Les opérations sont lazy (construisent un graphe de tâches) jusqu'à l'appel de
.compute() - Utiliser
map_partitionspour des opérations personnalisées efficaces - Convertir en DataFrame tôt quand on travaille avec des données structurées provenant d'autres sources
2. Arrays - Opérations NumPy parallèles
Objectif : Étendre les capacités de NumPy aux datasets plus grands que la mémoire en utilisant des algorithmes par blocs.
Quand utiliser :
- Les arrays dépassent la RAM disponible
- Les opérations NumPy nécessitent une parallélisation
- Travailler avec des datasets scientifiques (HDF5, Zarr, NetCDF)
- Besoin d'algèbre linéaire parallèle ou d'opérations sur arrays
Documentation de référence : Pour des conseils complets sur les Arrays Dask, consultez references/arrays.md qui inclut :
- Création d'arrays (à partir de NumPy, aléatoires, depuis le disque)
- Stratégies de chunking et optimisation
- Opérations courantes (arithmétique, réductions, algèbre linéaire)
- Opérations personnalisées avec
map_blocks - Intégration avec HDF5, Zarr et XArray
Exemple rapide :
import dask.array as da
# Créer un grand array avec chunks
x = da.random.random((100000, 100000), chunks=(10000, 10000))
# Les opérations sont lazy
y = x + 100
z = y.mean(axis=0)
# Calculer le résultat
result = z.compute()
Points clés :
- La taille des chunks est critique (viser ~100 Mo par chunk)
- Les opérations travaillent sur les chunks en parallèle
- Réchunker les données quand nécessaire pour des opérations efficaces
- Utiliser
map_blockspour les opérations non disponibles dans Dask
3. Bags - Traitement parallèle de données non structurées
Objectif : Traiter des données non structurées ou semi-structurées (texte, JSON, journaux) avec des opérations fonctionnelles.
Quand utiliser :
- Traiter des fichiers texte, journaux ou enregistrements JSON
- Nettoyage de données et ETL avant analyse structurée
- Travailler avec des objets Python qui ne correspondent pas aux formats array/dataframe
- Besoin de traitement streaming efficace en mémoire
Documentation de référence : Pour des conseils complets sur les Bags Dask, consultez references/bags.md qui inclut :
- Lecture de fichiers texte et JSON
- Opérations fonctionnelles (map, filter, fold, groupby)
- Conversion en DataFrames
- Motifs courants (analyse de journaux, traitement JSON, traitement de texte)
- Considérations de performance
Exemple rapide :
import dask.bag as db
import json
# Lire et parser les fichiers JSON
bag = db.read_text('logs/*.json').map(json.loads)
# Filtrer et transformer
valid = bag.filter(lambda x: x['status'] == 'valid')
processed = valid.map(lambda x: {'id': x['id'], 'value': x['value']})
# Convertir en DataFrame pour analyse
ddf = processed.to_dataframe()
Points clés :
- Utiliser pour le nettoyage initial des données, puis convertir en DataFrame/Array
- Utiliser
foldbyau lieu degroupbypour de meilleures performances - Les opérations sont streaming et efficaces en mémoire
- Convertir en formats structurés (DataFrame) pour des opérations complexes
4. Futures - Parallélisation basée sur les tâches
Objectif : Construire des workflows parallèles personnalisés avec un contrôle fin sur l'exécution des tâches et leurs dépendances.
Quand utiliser :
- Construire des workflows dynamiques, évolutifs
- Besoin d'exécution immédiate des tâches (pas lazy)
- Les calculs dépendent de conditions à l'exécution
- Implémenter des algorithmes parallèles personnalisés
- Besoin de calculs avec état
Documentation de référence : Pour des conseils complets sur les Futures Dask, consultez references/futures.md qui inclut :
- Configuration du client distribué
- Soumission de tâches et travail avec les futures
- Dépendances entre tâches et mouvement des données
- Coordination avancée (queues, locks, events, actors)
- Motifs courants (balayage de paramètres, tâches dynamiques, algorithmes itératifs)
Exemple rapide :
from dask.distributed import Client
client = Client() # Créer un cluster local
# Soumettre des tâches (exécution immédiate)
def process(x):
return x ** 2
futures = client.map(process, range(100))
# Récupérer les résultats
results = client.gather(futures)
client.close()
Points clés :
- Requiert un client distribué (même pour une machine unique)
- Les tâches s'exécutent immédiatement lors de la soumission
- Pré-scatter les grandes données pour éviter les transferts répétés
- ~1 ms de surcharge par tâche (pas approprié pour millions de petites tâches)
- Utiliser les actors pour les workflows avec état
5. Schedulers - Backends d'exécution
Objectif : Contrôler comment et où les tâches Dask s'exécutent (threads, processes, distribué).
Quand choisir un Scheduler :
- Threads (défaut) : Opérations NumPy/Pandas, bibliothèques libérant le GIL, bénéfice de la mémoire partagée
- Processes : Code Python pur, traitement de texte, opérations limitées par le GIL
- Synchronous : Débogage avec pdb, profiling, compréhension des erreurs
- Distributed : Besoin du dashboard, clusters multi-machines, fonctionnalités avancées
Documentation de référence : Pour des conseils complets sur les Schedulers Dask, consultez references/schedulers.md qui inclut :
- Descriptions et caractéristiques des schedulers détaillées
- Méthodes de configuration (globale, context manager, par-compute)
- Considérations de performance et surcharge
- Motifs courants et dépannage
- Configuration des threads pour des performances optimales
Exemple rapide :
import dask
import dask.dataframe as dd
# Utiliser threads pour DataFrame (défaut, bon pour numérique)
ddf = dd.read_csv('data.csv')
result1 = ddf.mean().compute() # Utilise threads
# Utiliser processes pour travail lourd en Python
import dask.bag as db
bag = db.read_text('logs/*.txt')
result2 = bag.map(python_function).compute(scheduler='processes')
# Utiliser synchronous pour déboguer
dask.config.set(scheduler='synchronous')
result3 = problematic_computation.compute() # Peut utiliser pdb
# Utiliser distributed pour monitoring et scaling
from dask.distributed import Client
client = Client()
result4 = computation.compute() # Utilise distributed avec dashboard
Points clés :
- Threads : Surcharge minimale (~10 µs/tâche), meilleur pour travail numérique
- Processes : Évite le GIL (~10 ms/tâche), meilleur pour travail Python
- Distributed : Dashboard de monitoring (~1 ms/tâche), se met à l'échelle aux clusters
- Peut changer de scheduler par calcul ou globalement
Bonnes pratiques
Pour des conseils complets sur l'optimisation des performances, les stratégies de gestion mémoire et les pièges courants à éviter, consultez references/best-practices.md. Les principes clés incluent :
Commencer par des solutions plus simples
Avant d'utiliser Dask, explorez :
- De meilleurs algorithmes
- Des formats de fichiers efficaces (Parquet au lieu de CSV)
- Du code compilé (Numba, Cython)
- L'échantillonnage des données
Règles de performance critiques
1. Ne pas charger les données localement puis les passer à Dask
# Mauvais : Charge toutes les données en mémoire d'abord
import pandas as pd
df = pd.read_csv('large.csv')
ddf = dd.from_pandas(df, npartitions=10)
# Correct : Laisser Dask gérer le chargement
import dask.dataframe as dd
ddf = dd.read_csv('large.csv')
2. Éviter les appels compute() répétés
# Mauvais : Chaque compute est séparé
for item in items:
result = dask_computation(item).compute()
# Correct : Un seul compute pour tous
computations = [dask_computation(item) for item in items]
results = dask.compute(*computations)
3. Ne pas construire des graphes de tâches excessivement grands
- Augmenter les tailles de chunks si millions de tâches
- Utiliser
map_partitions/map_blockspour fusionner les opérations - Vérifier la taille du graphe de tâches :
len(ddf.__dask_graph__())
4. Choisir des tailles de chunks appropriées
- Cible : ~100 Mo par chunk (ou 10 chunks par cœur dans la mémoire du worker)
- Trop grand : Débordement mémoire
- Trop petit : Surcharge de scheduling
5. Utiliser le Dashboard
from dask.distributed import Client
client = Client()
print(client.dashboard_link) # Monitorer les performances, identifier les goulots
Motifs de workflow courants
Pipeline ETL
import dask.dataframe as dd
# Extract : Lire les données
ddf = dd.read_csv('raw_data/*.csv')
# Transform : Nettoyer et traiter
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].astype('float64')
ddf = ddf.dropna(subset=['important_col'])
# Load : Agréger et sauvegarder
summary = ddf.groupby('category').agg({'amount': ['sum', 'mean']})
summary.to_parquet('output/summary.parquet')
Pipeline non-structuré vers structuré
import dask.bag as db
import json
# Commencer avec Bag pour données non-structurées
bag = db.read_text('logs/*.json').map(json.loads)
bag = bag.filter(lambda x: x['status'] == 'valid')
# Convertir en DataFrame pour analyse structurée
ddf = bag.to_dataframe()
result = ddf.groupby('category').mean().compute()
Calcul sur array à grande échelle
import dask.array as da
# Charger ou créer un grand array
x = da.from_zarr('large_dataset.zarr')
# Traiter par chunks
normalized = (x - x.mean()) / x.std()
# Sauvegarder le résultat
da.to_zarr(normalized, 'normalized.zarr')
Workflow parallèle personnalisé
from dask.distributed import Client
client = Client()
# Scatter le grand dataset une fois
data = client.scatter(large_dataset)
# Traiter en parallèle avec dépendances
futures = []
for param in parameters:
future = client.submit(process, data, param)
futures.append(future)
# Récupérer les résultats
results = client.gather(futures)
Sélectionner le bon composant
Utilisez ce guide de décision pour choisir le composant Dask approprié :
Type de données :
- Données tabulaires → DataFrames
- Arrays numériques → Arrays
- Texte/JSON/journaux → Bags (puis convertir en DataFrame)
- Objets Python personnalisés → Bags ou Futures
Type d'opération :
- Opérations pandas standard → DataFrames
- Opérations NumPy standard → Arrays
- Tâches parallèles personnalisées → Futures
- Traitement texte/ETL → Bags
Niveau de contrôle :
- Haut-niveau, automatique → DataFrames/Arrays
- Bas-niveau, manuel → Futures
Type de workflow :
- Graphe de calcul statique → DataFrames/Arrays/Bags
- Dynamique, évolutif → Futures
Considérations d'intégration
Formats de fichiers
- Efficaces : Parquet, HDF5, Zarr (colonnaires, compressés, parallèles)
- Compatibles mais plus lents : CSV (utiliser pour l'ingestion initiale uniquement)
- Pour Arrays : HDF5, Zarr, NetCDF
Conversion entre collections
# Bag → DataFrame
ddf = bag.to_dataframe()
# DataFrame → Array (pour données numériques)
arr = ddf.to_dask_array(lengths=True)
# Array → DataFrame
ddf = dd.from_dask_array(arr, columns=['col1', 'col2'])
Avec d'autres bibliothèques
- XArray : Enveloppe les arrays Dask avec dimensions étiquetées (géospatial, imaging)
- Dask-ML : Machine learning avec APIs compatibles scikit-learn
- Distributed : Gestion de clusters avancée et monitoring
Débogage et développement
Workflow de développement itératif
-
Tester sur petites données avec scheduler synchronous :
dask.config.set(scheduler='synchronous') result = computation.compute() # Peut utiliser pdb, débogage facile -
Valider avec threads sur un échantillon :
sample = ddf.head(1000) # Petit échantillon # Tester la logique, puis mettre à l'échelle au dataset complet -
Mettre à l'échelle avec distributed pour monitoring :
from dask.distributed import Client client = Client() print(client.dashboard_link) # Monitorer les performances result = computation.compute()
Problèmes courants
Erreurs mémoire :
- Réduire les tailles de chunks
- Utiliser
persist()stratégiquement et supprimer après utilisation - Vérifier les fuites mémoire dans les fonctions personnalisées
Démarrage lent :
- Graphe de tâches trop grand (augmenter les tailles de chunks)
- Utiliser
map_partitionsoumap_blockspour réduire les tâches
Parallélisation faible :
- Chunks trop grands (augmenter le nombre de partitions)
- Utiliser threads avec code Python (passer à processes)
- Dépendances de données empêchant le parallélisme
Fichiers de référence
Tous les fichiers de documentation de référence peuvent être lus selon les besoins pour des informations détaillées :
references/dataframes.md- Guide complet Dask DataFramereferences/arrays.md- Guide complet Dask Arrayreferences/bags.md- Guide complet Dask Bagreferences/futures.md- Guide complet Dask Futures et calcul distribuéreferences/schedulers.md- Guide complet de sélection et configuration des schedulersreferences/best-practices.md- Optimisation des performances et dépannage complets
Charger ces fichiers quand les utilisateurs ont besoin d'informations détaillées sur des composants, opérations ou motifs Dask spécifiques au-delà des conseils rapides fournis ici.