cupynumeric-parallel-data-load

Par nvidia · skills

Chargez un dataset fragmenté sur disque (fichiers `.npy` fragmentés, Parquet/Arrow, binaire brut, HDF5 fragmenté, dispositions personnalisées) dans un ndarray cuPyNumeric distribué via une partition manuelle et un lancement de `@task` feuille avec variantes CPU/OMP/GPU. À utiliser quand aucun chargeur en un seul appel ne convient, notamment lorsque le nombre de lignes par fragment diffère selon les fichiers. Préférez `cupynumeric.load` ou `legate.io.hdf5.from_file` lorsque ces fonctions s'appliquent.

npx skills add https://github.com/nvidia/skills --skill cupynumeric-parallel-data-load

Chargement en parallèle de données fragmentées -> cupynumeric load

Pourquoi cette skill existe. cupynumeric reflète l'API NumPy, incluant cupynumeric.load pour un seul fichier .npy. Au-delà de cela, le chargement de fichiers vit dans Legate, pas dans cupynumeric :

Format Loader intégré
.npy unique cupynumeric.load(path) (parité API NumPy)
HDF5 (fichier unique) legate.io.hdf5.from_file / from_file_batched
Multi-fichier fragmenté (tout format), Parquet/Arrow, binaire brut, layouts personnalisés Pas de loader intégré — cette skill.

Cette skill montre le moyen canonique de combler le vide de la dernière ligne : écrire une tâche Python Legate qui appelle le lecteur tiers que le format nécessite (h5py, pyarrow, np.memmap, ...) à l'intérieur du corps de la tâche, et laisser Legate distribuer les lectures sur les GPUs / nœuds. Pour les formats avec un loader intégré, préférez-le sauf si vous avez besoin d'un corps de tâche personnalisé (loader basé sur mmap, décodeur spécifique au format, métadonnées annexes, lectures partielles / fragmentées).

Motif canonique : partitionnement manuel + lancement manuel, dimensionné à la machine, pas aux fichiers. Seul l'axe 0 est fragmenté ; les axes trailing voyagent ensemble à l'intérieur de chaque tuile. Les comptes de lignes par fragment peuvent différer d'un fichier à l'autre (seuls dtype et les axes trailing doivent correspondre) ; le lancement remplit tous les processeurs disponibles quel que soit le nombre de fichiers.

.npy est l'exemple travaillé car l'en-tête porte la forme et le dtype sur disque, mais le squelette s'applique à tout format avec des lectures bon marché par plage/tranche (binaire brut, HDF5, Parquet/Arrow — voir « Autres formats » ci-dessous). Implémentation de référence : assets/examples/parallel_npy_load.py.

Hypothèse de disposition des données

Cette skill concerne purement le chargement — elle suppose que les données sont déjà disposées sur un système de fichiers partagé de manière prévisible et indexable. Produire ces fichiers sort du cadre (l'exemple embarque une sous-commande write pour la commodité, mais les vrais utilisateurs apportent les leurs).

L'exemple travaillé suppose une disposition spécifique :

  • Un répertoire contenant des fichiers nommés shard_0000.npy, shard_0001.npy, ... dans une séquence d'entiers contigue (largeur remplie de zéros 4).
  • Tous les fragments partagent le même dtype et les mêmes axes trailing (shape[1:]) ; l'axe 0 (lignes par fragment) peut différer d'un fichier à l'autre — la recette construit un tableau de décalage de lignes cumulatif et lit la tranche chevauchante de chaque fichier de l'intérieur de la tâche feuille.
  • Le répertoire est visible par chaque rang (système de fichiers partagé pour les exécutions multi-nœuds).

La fonction discover_layout() de l'exemple affiche ce qu'elle a trouvé et échoue fermement avec un message d'erreur descriptif quand la disposition est incorrecte (répertoire manquant, pas de fragments, dtype / axes trailing incompatibles, ou un trou dans la séquence shard_NNNN.npy contigue).

Si vos données résident dans une disposition différente — binaire brut à pas constant, un fichier HDF5 avec un dataset par fragment, un arborescence de répertoires, ... — seul le modèle de glob, le lecteur par fichier (étape 4 ci-dessous), et la découverte des métadonnées (étape 1 ci-dessous) changent. La machinerie de partitionnement et lancement est agnostique à la disposition.

Quand utiliser

Voir le tableau des formats ci-dessus pour la décision de routage (loader intégré vs. cette skill). Au-delà, deux indices supplémentaires indiquent que cette skill est le bon choix :

  • Remplacer les lectures séquentielles np.concatenate([read(f) for f in files]) par des lectures parallèles par GPU.
  • Démontrer comment une tâche Python Legate définie par l'utilisateur écrit dans un tableau de sortie cupynumeric via un lancement manuel.

Exemples

Les chemins ci-dessous sont écrits par rapport au répertoire de cette skill (le script se trouve à assets/examples/parallel_npy_load.py). Ajustez le préfixe pour correspondre à l'endroit où votre skill est installée (par ex. skills/cupynumeric-parallel-data-load/assets/... si la skill vit sous un répertoire skills/ au niveau supérieur).

# Nœud unique, 4 GPUs.
legate --gpus 4 --fbmem 4000 --min-gpu-chunk 1 \
    assets/examples/parallel_npy_load.py \
    read --shard-dir /shared/scratch/demo
# Multi-nœuds, 2 nœuds x 4 GPUs (slurm), système de fichiers partagé à --shard-dir.
# Générer les fragments une fois sur le rang 0, puis re-lancer `read` à n'importe quelle échelle.
legate --launcher srun --nodes 2 --cpus 1 \
    assets/examples/parallel_npy_load.py \
    write --shard-dir /shared/scratch/demo

legate --launcher srun --nodes 2 --ranks-per-node 4 \
    --gpus 4 --fbmem 4000 --min-gpu-chunk 1 \
    assets/examples/parallel_npy_load.py \
    read --shard-dir /shared/scratch/demo

Pas de drapeaux de disposition — le pilote de lecture parcourt chaque en-tête .npy pour récupérer les comptes de lignes par fichier, la forme trailing et le dtype, puis dérive tile_rows du nombre de processeurs disponibles.

--min-gpu-chunk 1 n'est nécessaire que quand le nombre d'éléments par tuile est inférieur à la taille de chunk minimale par défaut de Legate pour les lancements GPU (par ex. les valeurs par défaut de l'exemple travaillé — lignes totales divisées sur 4 GPUs à ~1M par tuile — tombent sous le seuil et seraient autrement repliées sur un seul GPU). Pour les datasets de taille production (des dizaines de millions d'éléments par tuile ou plus) vous pouvez laisser le drapeau et laisser Legate utiliser sa valeur par défaut. Le porter à une valeur modérée (par ex. --min-gpu-chunk 1024) est correct quand chaque tuile est assez grande pour que le surcoût par tâche importe plus que d'obtenir chaque GPU une tuile.

Instructions

Cinq étapes à partir d'un exemple travaillé .npy ; seule l'étape 1 (analyser l'en-tête de format) et l'étape 4 (le lecteur par fichier à l'intérieur du corps de tâche) sont spécifiques au format. Les trois autres (allouer la destination, partitionner, attendre) sont réutilisées inchangées sur les formats — voir « Autres formats » ci-dessous pour les points de permutation.

1. Lire les métadonnées de chaque fragment

Parcourir le répertoire et jeter un œil à chaque en-tête .npy (mmap_mode="r" ne lit que l'en-tête). L'en-tête porte la forme et le dtype par fragment, donc le pilote peut récupérer les lignes totales, la forme trailing et un tableau de décalage de lignes cumulatif sans jamais charger les données :

paths = sorted(SHARD_DIR.glob("shard_*.npy"))

per_file_rows = []                       # lignes selon l'axe 0 par fichier
trailing_shape = None                    # shape[1:], doit correspondre d'un fichier à l'autre
dtype = None
for p in paths:
    hdr = np.load(p, mmap_mode="r")
    if trailing_shape is None:
        trailing_shape = tuple(hdr.shape[1:])
        dtype = hdr.dtype
    elif tuple(hdr.shape[1:]) != trailing_shape or hdr.dtype != dtype:
        raise RuntimeError(
            f"{p.name}: incompatibilité de forme trailing / dtype "
            f"({hdr.shape[1:]}/{hdr.dtype} vs {trailing_shape}/{dtype})"
        )
    per_file_rows.append(int(hdr.shape[0]))

cum_rows = np.cumsum([0] + per_file_rows, dtype=np.int64)  # longueur N+1
total_rows = int(cum_rows[-1])

L'extrait ci-dessus impose que dtype et trailing_shape (soit shape[1:]) correspondent d'un fichier à l'autre. Les comptes de lignes par fragment peuvent différer — le tableau des lignes cumulatives gère cela. Le code production devrait aussi vérifier que les noms forment une séquence contigue shard_0000.npy ... shard_NNNN.npy (omis de l'extrait par brièveté ; voir discover_layout() dans l'exemple travaillé). La découverte repose seulement sur ce que le format sur disque expose lui-même (l'en-tête .npy ici, .shape / .dtype pour HDF5, etc.) ; tout annexe (manifeste, hashes de contenu) est une étape de vérification séparée par-dessus.

2. Créer le magasin de sortie cupynumeric à partir des métadonnées

Le tableau total s'étend sur total_rows selon l'axe 0 ; les axes trailing viennent de trailing_shape inchangés. Utilisez cn.empty — la tâche écrase chaque cellule, l'init à zéro serait du gaspillage.

import cupynumeric as cn

total_shape = (total_rows,) + trailing_shape
out = cn.empty(total_shape, dtype=dtype)

3. Tuiler le magasin par nombre de processeurs

La forme du lancement est dimensionnée aux processeurs disponibles, pas au nombre de fichiers. Choisissez tile_rows = ceil(total_rows / num_processors) et partitionnez l'axe 0 par cette taille de tuile. Les axes trailing ne sont pas partitionnés (la tuile s'étend sur toute l'étendue là). La dernière tuile peut être courte — c'est exactement ce que partition_by_tiling supporte — donc la recette n'a pas besoin de contrainte de divisibilité.

from legate.core import TaskTarget, get_legate_runtime
from legate.core.data_interface import as_logical_array

runtime = get_legate_runtime()
machine = runtime.get_machine()
num_processors = max(
    machine.count(TaskTarget.GPU),
    machine.count(TaskTarget.OMP),
    machine.count(TaskTarget.CPU),
    1,
)

tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
tile_shape = (tile_rows,) + trailing_shape
partition = as_logical_array(out).data.partition_by_tiling(tile_shape)

num_tasks = (total_rows + tile_rows - 1) // tile_rows  # correspondre au nombre de tuiles de la partition

4. Définir la tâche feuille et la lancer manuellement

PATHS et CUM_ROWS (les chemins de fichiers et le tableau de décalage de lignes cumulatif de l'étape 1) plus TILE_ROWS sont remplis comme variables globales de module par le pilote avant le lancement ; la réplication du contrôle exécute le pilote sur chaque rang, donc chaque travailleur voit des valeurs identiques.

Chaque tâche construit d'abord sa vue consommatrice (cupy sur GPU, numpy sur CPU/OMP) et lit le nombre de lignes réel de la tuile à partir de view.shape[0]PhysicalStore lui-même n'a pas d'attribut .shape, donc passer par la vue est requis. Elle calcule alors sa plage de lignes globale à partir de sa coordonnée de lancement et ce nombre de lignes, bisecte cum_rows pour la(les) fichier(s) chevauchant(s), et copie chaque tranche de fichier chevauchante dans la tranche de destination correspondante. Enregistrez les variantes CPU, OMP et GPU pour que le même lancement s'exécute inchangé partout ; la dispatch sur ctx.get_variant_kind() choisit le consommateur correspondant à l'endroit où le OutputStore réside (cp.from_dlpack(dst) pour FBMEM, np.asarray(dst) pour SYSMEM). cupy est importé seulement à l'intérieur de la branche GPU, donc le corps de tâche se charge sur les machines sans cupy.

import bisect
from legate.core import TaskContext, VariantCode
from legate.core.task import OutputStore, task

@task(variants=(VariantCode.CPU, VariantCode.OMP, VariantCode.GPU))
def load_tile(ctx: TaskContext, dst: OutputStore) -> None:
    t = ctx.task_index[0]                              # index de tuile 0..num_tasks-1

    variant = ctx.get_variant_kind()
    if variant == VariantCode.GPU:
        import cupy as cp                              # lazy : seulement sur GPU
        view = cp.from_dlpack(dst)
    else:
        view = np.asarray(dst)                         # vue numpy sans copie

    tile_rows_actual = view.shape[0]                   # courte sur la dernière tuile
    row_start = t * TILE_ROWS                          # départ global selon l'axe 0
    row_end = row_start + tile_rows_actual

    # Trouver la plage semi-ouverte d'indices de fichier qui chevauchent [row_start, row_end).
    first_file = bisect.bisect_right(CUM_ROWS, row_start) - 1
    last_file = bisect.bisect_right(CUM_ROWS, row_end - 1) - 1

    for f in range(first_file, last_file + 1):
        # Intersection de la tuile [row_start, row_end) avec le fichier [cum[f], cum[f+1]).
        lo = max(row_start, int(CUM_ROWS[f]))
        hi = min(row_end, int(CUM_ROWS[f + 1]))
        file_lo = lo - int(CUM_ROWS[f])
        file_hi = hi - int(CUM_ROWS[f])
        dst_lo = lo - row_start
        dst_hi = hi - row_start
        chunk = np.ascontiguousarray(
            np.load(PATHS[f], mmap_mode="r")[file_lo:file_hi]
        )
        if variant == VariantCode.GPU:
            view[dst_lo:dst_hi].set(chunk)             # cudaMemcpyAsync H2D
        else:
            view[dst_lo:dst_hi] = chunk                # écriture numpy sans copie

manual_task = runtime.create_manual_task(
    load_tile.library,
    load_tile.task_id,
    (num_tasks,),                                      # domaine de lancement == nombre de tuiles
)
manual_task.add_output(partition)
manual_task.execute()

Les deux consommateurs passent par les producteurs natifs de PhysicalStore (__dlpack__ pour cupy, __array_interface__ pour np.asarray) — vues sans copie de la tuile locale. Le coût de bisect est O(log num_shards) et la boucle interne itère généralement 1–2 fois (les tuiles chevauchent au plus quelques fichiers).

5. Attendre et vérifier

get_legate_runtime().issue_execution_fence(block=True)

Contraintes strictes

  1. Tous les fragments doivent partager dtype et les axes trailing (shape[1:]). La recette empile les fragments selon l'axe 0 ; les axes trailing de la destination viennent de trailing_shape, que l'étape de découverte verrouille à la valeur du premier fichier. Les comptes de lignes par fragment (shape[0]) peuvent différer librement — le tableau de décalage cumulatif les gère. L'exemple rejette tout fragment dont le dtype ou la forme trailing diffère du premier avec un message d'erreur descriptif.

  2. Choisissez le consommateur qui correspond à la variante. cp.from_dlpack rejette les magasins résidant en SYSMEM ; np.asarray retourne silencieusement une vue hôte d'un magasin résidant en FBMEM que vous ne pouvez pas réellement écrire au travers. Dispatcher sur ctx.get_variant_kind() pour que chaque variante utilise son propre consommateur — voir l'étape 4.

  3. Les vues mmap ne sont pas toujours C-contiguës — enveloppez chaque tranche par fichier avec np.ascontiguousarray(arr[file_lo:file_hi]) avant .set() ou l'écriture numpy sur place.

  4. Multi-nœuds : SHARD_DIR doit être sur un système de fichiers partagé. Chaque travailleur (sur chaque rang) ouvre les fragments par chemin ; les chemins /tmp locaux aux nœuds ne fonctionnent que pour les démos nœud unique.

Variantes

Chemin rapide fragment uniforme (une tâche par fichier)

Quand chaque fragment a déjà le même (shape, dtype) et que vous avez justement num_shards processeurs disponibles, la machinerie cum-rows / bisect est du surcoût. Définissez tile_rows = shard_shape[0] et num_tasks = num_shards ; la partition a alors une tuile par fichier et chaque tâche lit exactement un fichier d'un bout à l'autre (pas de bisect, pas de boucle interne). Le commutateur côté pilote est une ligne :

if all(r == per_file_rows[0] for r in per_file_rows) and num_shards == num_processors:
    tile_rows = per_file_rows[0]
else:
    tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)

Le même corps de tâche load_tile fonctionne toujours en l'un ou l'autre mode — la boucle interne itère juste exactement une fois par tâche. Il n'y a pas besoin d'un corps de tâche séparé pour le chemin rapide.

Sur-décomposer pour un meilleur équilibrage de charge

Le défaut tile_rows = ceil(total_rows / num_processors) donne une tuile par processeur. Pour sur-décomposer par un facteur K (tuiles plus petites, plus de point de tâches, queueing à granularité plus fine), divisez par K * num_processors à la place :

tile_rows = max(1, (total_rows + K * num_processors - 1) // (K * num_processors))

num_tasks = ceil(total_rows / tile_rows) s'étend alors à environ K * num_processors. Le même corps de tâche fonctionne toujours — bisect arrive juste sur plus de tâches par fichier.

Autres formats

Seul le lecteur par fichier à l'intérieur de load_tile change. Le contrat du lecteur : étant donné un chemin de fichier et une plage de lignes semi-ouverte [file_lo, file_hi) selon l'axe 0, retourner un tableau numpy de forme (file_hi - file_lo,) + trailing_shape qui peut être rendu C-contigu. Les lectures bon marché par plage/tranche sont requises — les formats qui ne supportent que « lire tout le fichier » défont le cas de chevauchement partiel (une tuile qui couvre seulement partie d'un fichier).

Format Lecteur à l'intérieur de la tâche feuille
.npy (exemple travaillé) host = np.ascontiguousarray(np.load(p, mmap_mode="r")[file_lo:file_hi])
Binaire brut (forme fixe) arr = np.memmap(p, dtype=DTYPE, mode="r", shape=(rows_in_file, *trailing_shape)); host = np.ascontiguousarray(arr[file_lo:file_hi])
HDF5 with h5py.File(p, "r") as f: host = np.ascontiguousarray(f["data"][file_lo:file_hi])
Parquet / Arrow tbl = pq.read_table(p, columns=..., use_threads=False).slice(file_lo, file_hi - file_lo); host = tbl.to_pandas().values

(Pour les loaders intégrés monolithiques par format, voir le tableau « Pourquoi cette skill existe » en haut de ce fichier.)

L'étape de découverte (étape 1) analyse les métadonnées de chaque format : .npy / HDF5 / Parquet portent tous le nombre de lignes par fichier + dtype sur disque. Le binaire brut ne le fait pas — annexe ou dériver de la taille de fichier.

Pièges courants

cn.asarray(dst) est illégal dans une tâche feuille

À l'intérieur d'un corps @task, toute opération cupynumeric qui touche le runtime au niveau supérieur — cn.asarray(store), assignation de tranche cn_dst[s] = host_np — déclenche create_index_space du mauvais contexte et Legion abandonne :

LEGION API USAGE EXCEPTION: Invalid task context passed to runtime call
create_index_space

Correction : consommer la capsule DLPack avec une bibliothèque tiers (cupy / torch / numpy) à l'intérieur des tâches feuille. cn.asarray est correct dans le pilote, simplement pas dans les tâches feuille. Voir examples/dlpack/leaf_task_interop.py pour le contournement flaveur torch.

L'assert dans la tâche abandonne le runtime

Legate traite les exceptions non levées dans une @task comme une violation de contrat et abandonne sauf si la tâche a été enregistrée avec throws_exception(). Sanity-check sur l'hôte avant le lancement.

Le domaine de lancement doit correspondre au nombre de tuiles de la partition

create_manual_task(launch_shape=...) et partition_by_tiling(...) sont indépendants — le runtime ne détecte pas une incompatibilité. Domaine de lancement plus grand → tuiles hors plage ; plus petit → tuiles non écrites. Dérivez toujours les deux du même (total_rows, tile_rows) via deux divisions ceil séparées (dimensionner le domaine de lancement à num_processors directement sur-lancerait quand num_processors > total_rows) :

tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
num_tasks = (total_rows + tile_rows - 1) // tile_rows
partition = ...partition_by_tiling((tile_rows,) + trailing_shape)
runtime.create_manual_task(load_tile.library, load_tile.task_id, (num_tasks,))

Skills similaires