dask

Par mkurman · zorai

Calcul distribué pour les workflows pandas/NumPy dépassant la capacité RAM. À utiliser lorsque vous devez faire évoluer du code pandas/NumPy existant au-delà de la mémoire disponible ou sur des clusters. Idéal pour le traitement de fichiers en parallèle, le ML distribué et l'intégration avec du code pandas existant. Pour de l'analytique out-of-core sur une machine unique, préférez vaex ; pour la performance en mémoire, utilisez polars.

npx skills add https://github.com/mkurman/zorai --skill dask

Dask

Vue d'ensemble

Dask est une bibliothèque Python pour le calcul parallèle et distribué qui active trois capacités critiques :

  • Exécution dépassant la mémoire disponible sur des machines uniques pour les données excédant la RAM disponible
  • Traitement parallèle pour améliorer la vitesse de calcul sur plusieurs cœurs
  • Calcul distribué supportant des datasets à l'échelle du téraoctet sur plusieurs machines

Dask se met à l'échelle depuis les ordinateurs portables (traitement ~100 GiO) jusqu'aux clusters (traitement ~100 Tio) tout en conservant des APIs Python familières.

Quand utiliser cette skill

Cette skill doit être utilisée quand :

  • Traiter des datasets qui dépassent la RAM disponible
  • Mettre à l'échelle les opérations pandas ou NumPy sur des datasets plus grands
  • Paralléliser les calculs pour améliorer les performances
  • Traiter efficacement plusieurs fichiers (CSV, Parquet, JSON, journaux texte)
  • Construire des workflows parallèles personnalisés avec dépendances entre tâches
  • Distribuer les charges de travail sur plusieurs cœurs ou machines

Capacités principales

Dask fournit cinq composants principaux, chacun adapté à différents cas d'usage :

1. DataFrames - Opérations Pandas parallèles

Objectif : Mettre à l'échelle les opérations pandas sur des datasets plus grands via le traitement parallèle.

Quand utiliser :

  • Les données tabulaires dépassent la RAM disponible
  • Besoin de traiter plusieurs fichiers CSV/Parquet ensemble
  • Les opérations pandas sont lentes et nécessitent une parallélisation
  • Passer d'un prototype pandas à la production

Documentation de référence : Pour des conseils complets sur les DataFrames Dask, consultez references/dataframes.md qui inclut :

  • Lecture des données (fichiers uniques, multiples, patterns glob)
  • Opérations courantes (filtrage, groupby, jointures, agrégations)
  • Opérations personnalisées avec map_partitions
  • Conseils d'optimisation des performances
  • Motifs courants (ETL, séries temporelles, traitement multi-fichiers)

Exemple rapide :

import dask.dataframe as dd

# Lire plusieurs fichiers comme un seul DataFrame
ddf = dd.read_csv('data/2024-*.csv')

# Les opérations sont lazy jusqu'à l'appel de compute()
filtered = ddf[ddf['value'] > 100]
result = filtered.groupby('category').mean().compute()

Points clés :

  • Les opérations sont lazy (construisent un graphe de tâches) jusqu'à l'appel de .compute()
  • Utiliser map_partitions pour des opérations personnalisées efficaces
  • Convertir en DataFrame tôt quand on travaille avec des données structurées provenant d'autres sources

2. Arrays - Opérations NumPy parallèles

Objectif : Étendre les capacités de NumPy aux datasets plus grands que la mémoire en utilisant des algorithmes par blocs.

Quand utiliser :

  • Les arrays dépassent la RAM disponible
  • Les opérations NumPy nécessitent une parallélisation
  • Travailler avec des datasets scientifiques (HDF5, Zarr, NetCDF)
  • Besoin d'algèbre linéaire parallèle ou d'opérations sur arrays

Documentation de référence : Pour des conseils complets sur les Arrays Dask, consultez references/arrays.md qui inclut :

  • Création d'arrays (à partir de NumPy, aléatoires, depuis le disque)
  • Stratégies de chunking et optimisation
  • Opérations courantes (arithmétique, réductions, algèbre linéaire)
  • Opérations personnalisées avec map_blocks
  • Intégration avec HDF5, Zarr et XArray

Exemple rapide :

import dask.array as da

# Créer un grand array avec chunks
x = da.random.random((100000, 100000), chunks=(10000, 10000))

# Les opérations sont lazy
y = x + 100
z = y.mean(axis=0)

# Calculer le résultat
result = z.compute()

Points clés :

  • La taille des chunks est critique (viser ~100 Mo par chunk)
  • Les opérations travaillent sur les chunks en parallèle
  • Réchunker les données quand nécessaire pour des opérations efficaces
  • Utiliser map_blocks pour les opérations non disponibles dans Dask

3. Bags - Traitement parallèle de données non structurées

Objectif : Traiter des données non structurées ou semi-structurées (texte, JSON, journaux) avec des opérations fonctionnelles.

Quand utiliser :

  • Traiter des fichiers texte, journaux ou enregistrements JSON
  • Nettoyage de données et ETL avant analyse structurée
  • Travailler avec des objets Python qui ne correspondent pas aux formats array/dataframe
  • Besoin de traitement streaming efficace en mémoire

Documentation de référence : Pour des conseils complets sur les Bags Dask, consultez references/bags.md qui inclut :

  • Lecture de fichiers texte et JSON
  • Opérations fonctionnelles (map, filter, fold, groupby)
  • Conversion en DataFrames
  • Motifs courants (analyse de journaux, traitement JSON, traitement de texte)
  • Considérations de performance

Exemple rapide :

import dask.bag as db
import json

# Lire et parser les fichiers JSON
bag = db.read_text('logs/*.json').map(json.loads)

# Filtrer et transformer
valid = bag.filter(lambda x: x['status'] == 'valid')
processed = valid.map(lambda x: {'id': x['id'], 'value': x['value']})

# Convertir en DataFrame pour analyse
ddf = processed.to_dataframe()

Points clés :

  • Utiliser pour le nettoyage initial des données, puis convertir en DataFrame/Array
  • Utiliser foldby au lieu de groupby pour de meilleures performances
  • Les opérations sont streaming et efficaces en mémoire
  • Convertir en formats structurés (DataFrame) pour des opérations complexes

4. Futures - Parallélisation basée sur les tâches

Objectif : Construire des workflows parallèles personnalisés avec un contrôle fin sur l'exécution des tâches et leurs dépendances.

Quand utiliser :

  • Construire des workflows dynamiques, évolutifs
  • Besoin d'exécution immédiate des tâches (pas lazy)
  • Les calculs dépendent de conditions à l'exécution
  • Implémenter des algorithmes parallèles personnalisés
  • Besoin de calculs avec état

Documentation de référence : Pour des conseils complets sur les Futures Dask, consultez references/futures.md qui inclut :

  • Configuration du client distribué
  • Soumission de tâches et travail avec les futures
  • Dépendances entre tâches et mouvement des données
  • Coordination avancée (queues, locks, events, actors)
  • Motifs courants (balayage de paramètres, tâches dynamiques, algorithmes itératifs)

Exemple rapide :

from dask.distributed import Client

client = Client()  # Créer un cluster local

# Soumettre des tâches (exécution immédiate)
def process(x):
    return x ** 2

futures = client.map(process, range(100))

# Récupérer les résultats
results = client.gather(futures)

client.close()

Points clés :

  • Requiert un client distribué (même pour une machine unique)
  • Les tâches s'exécutent immédiatement lors de la soumission
  • Pré-scatter les grandes données pour éviter les transferts répétés
  • ~1 ms de surcharge par tâche (pas approprié pour millions de petites tâches)
  • Utiliser les actors pour les workflows avec état

5. Schedulers - Backends d'exécution

Objectif : Contrôler comment et où les tâches Dask s'exécutent (threads, processes, distribué).

Quand choisir un Scheduler :

  • Threads (défaut) : Opérations NumPy/Pandas, bibliothèques libérant le GIL, bénéfice de la mémoire partagée
  • Processes : Code Python pur, traitement de texte, opérations limitées par le GIL
  • Synchronous : Débogage avec pdb, profiling, compréhension des erreurs
  • Distributed : Besoin du dashboard, clusters multi-machines, fonctionnalités avancées

Documentation de référence : Pour des conseils complets sur les Schedulers Dask, consultez references/schedulers.md qui inclut :

  • Descriptions et caractéristiques des schedulers détaillées
  • Méthodes de configuration (globale, context manager, par-compute)
  • Considérations de performance et surcharge
  • Motifs courants et dépannage
  • Configuration des threads pour des performances optimales

Exemple rapide :

import dask
import dask.dataframe as dd

# Utiliser threads pour DataFrame (défaut, bon pour numérique)
ddf = dd.read_csv('data.csv')
result1 = ddf.mean().compute()  # Utilise threads

# Utiliser processes pour travail lourd en Python
import dask.bag as db
bag = db.read_text('logs/*.txt')
result2 = bag.map(python_function).compute(scheduler='processes')

# Utiliser synchronous pour déboguer
dask.config.set(scheduler='synchronous')
result3 = problematic_computation.compute()  # Peut utiliser pdb

# Utiliser distributed pour monitoring et scaling
from dask.distributed import Client
client = Client()
result4 = computation.compute()  # Utilise distributed avec dashboard

Points clés :

  • Threads : Surcharge minimale (~10 µs/tâche), meilleur pour travail numérique
  • Processes : Évite le GIL (~10 ms/tâche), meilleur pour travail Python
  • Distributed : Dashboard de monitoring (~1 ms/tâche), se met à l'échelle aux clusters
  • Peut changer de scheduler par calcul ou globalement

Bonnes pratiques

Pour des conseils complets sur l'optimisation des performances, les stratégies de gestion mémoire et les pièges courants à éviter, consultez references/best-practices.md. Les principes clés incluent :

Commencer par des solutions plus simples

Avant d'utiliser Dask, explorez :

  • De meilleurs algorithmes
  • Des formats de fichiers efficaces (Parquet au lieu de CSV)
  • Du code compilé (Numba, Cython)
  • L'échantillonnage des données

Règles de performance critiques

1. Ne pas charger les données localement puis les passer à Dask

# Mauvais : Charge toutes les données en mémoire d'abord
import pandas as pd
df = pd.read_csv('large.csv')
ddf = dd.from_pandas(df, npartitions=10)

# Correct : Laisser Dask gérer le chargement
import dask.dataframe as dd
ddf = dd.read_csv('large.csv')

2. Éviter les appels compute() répétés

# Mauvais : Chaque compute est séparé
for item in items:
    result = dask_computation(item).compute()

# Correct : Un seul compute pour tous
computations = [dask_computation(item) for item in items]
results = dask.compute(*computations)

3. Ne pas construire des graphes de tâches excessivement grands

  • Augmenter les tailles de chunks si millions de tâches
  • Utiliser map_partitions/map_blocks pour fusionner les opérations
  • Vérifier la taille du graphe de tâches : len(ddf.__dask_graph__())

4. Choisir des tailles de chunks appropriées

  • Cible : ~100 Mo par chunk (ou 10 chunks par cœur dans la mémoire du worker)
  • Trop grand : Débordement mémoire
  • Trop petit : Surcharge de scheduling

5. Utiliser le Dashboard

from dask.distributed import Client
client = Client()
print(client.dashboard_link)  # Monitorer les performances, identifier les goulots

Motifs de workflow courants

Pipeline ETL

import dask.dataframe as dd

# Extract : Lire les données
ddf = dd.read_csv('raw_data/*.csv')

# Transform : Nettoyer et traiter
ddf = ddf[ddf['status'] == 'valid']
ddf['amount'] = ddf['amount'].astype('float64')
ddf = ddf.dropna(subset=['important_col'])

# Load : Agréger et sauvegarder
summary = ddf.groupby('category').agg({'amount': ['sum', 'mean']})
summary.to_parquet('output/summary.parquet')

Pipeline non-structuré vers structuré

import dask.bag as db
import json

# Commencer avec Bag pour données non-structurées
bag = db.read_text('logs/*.json').map(json.loads)
bag = bag.filter(lambda x: x['status'] == 'valid')

# Convertir en DataFrame pour analyse structurée
ddf = bag.to_dataframe()
result = ddf.groupby('category').mean().compute()

Calcul sur array à grande échelle

import dask.array as da

# Charger ou créer un grand array
x = da.from_zarr('large_dataset.zarr')

# Traiter par chunks
normalized = (x - x.mean()) / x.std()

# Sauvegarder le résultat
da.to_zarr(normalized, 'normalized.zarr')

Workflow parallèle personnalisé

from dask.distributed import Client

client = Client()

# Scatter le grand dataset une fois
data = client.scatter(large_dataset)

# Traiter en parallèle avec dépendances
futures = []
for param in parameters:
    future = client.submit(process, data, param)
    futures.append(future)

# Récupérer les résultats
results = client.gather(futures)

Sélectionner le bon composant

Utilisez ce guide de décision pour choisir le composant Dask approprié :

Type de données :

  • Données tabulaires → DataFrames
  • Arrays numériques → Arrays
  • Texte/JSON/journaux → Bags (puis convertir en DataFrame)
  • Objets Python personnalisés → Bags ou Futures

Type d'opération :

  • Opérations pandas standard → DataFrames
  • Opérations NumPy standard → Arrays
  • Tâches parallèles personnalisées → Futures
  • Traitement texte/ETL → Bags

Niveau de contrôle :

  • Haut-niveau, automatique → DataFrames/Arrays
  • Bas-niveau, manuel → Futures

Type de workflow :

  • Graphe de calcul statique → DataFrames/Arrays/Bags
  • Dynamique, évolutif → Futures

Considérations d'intégration

Formats de fichiers

  • Efficaces : Parquet, HDF5, Zarr (colonnaires, compressés, parallèles)
  • Compatibles mais plus lents : CSV (utiliser pour l'ingestion initiale uniquement)
  • Pour Arrays : HDF5, Zarr, NetCDF

Conversion entre collections

# Bag → DataFrame
ddf = bag.to_dataframe()

# DataFrame → Array (pour données numériques)
arr = ddf.to_dask_array(lengths=True)

# Array → DataFrame
ddf = dd.from_dask_array(arr, columns=['col1', 'col2'])

Avec d'autres bibliothèques

  • XArray : Enveloppe les arrays Dask avec dimensions étiquetées (géospatial, imaging)
  • Dask-ML : Machine learning avec APIs compatibles scikit-learn
  • Distributed : Gestion de clusters avancée et monitoring

Débogage et développement

Workflow de développement itératif

  1. Tester sur petites données avec scheduler synchronous :

    dask.config.set(scheduler='synchronous')
    result = computation.compute()  # Peut utiliser pdb, débogage facile
  2. Valider avec threads sur un échantillon :

    sample = ddf.head(1000)  # Petit échantillon
    # Tester la logique, puis mettre à l'échelle au dataset complet
  3. Mettre à l'échelle avec distributed pour monitoring :

    from dask.distributed import Client
    client = Client()
    print(client.dashboard_link)  # Monitorer les performances
    result = computation.compute()

Problèmes courants

Erreurs mémoire :

  • Réduire les tailles de chunks
  • Utiliser persist() stratégiquement et supprimer après utilisation
  • Vérifier les fuites mémoire dans les fonctions personnalisées

Démarrage lent :

  • Graphe de tâches trop grand (augmenter les tailles de chunks)
  • Utiliser map_partitions ou map_blocks pour réduire les tâches

Parallélisation faible :

  • Chunks trop grands (augmenter le nombre de partitions)
  • Utiliser threads avec code Python (passer à processes)
  • Dépendances de données empêchant le parallélisme

Fichiers de référence

Tous les fichiers de documentation de référence peuvent être lus selon les besoins pour des informations détaillées :

  • references/dataframes.md - Guide complet Dask DataFrame
  • references/arrays.md - Guide complet Dask Array
  • references/bags.md - Guide complet Dask Bag
  • references/futures.md - Guide complet Dask Futures et calcul distribué
  • references/schedulers.md - Guide complet de sélection et configuration des schedulers
  • references/best-practices.md - Optimisation des performances et dépannage complets

Charger ces fichiers quand les utilisateurs ont besoin d'informations détaillées sur des composants, opérations ou motifs Dask spécifiques au-delà des conseils rapides fournis ici.

Skills similaires