Chargement en parallèle de données fragmentées -> cupynumeric load
Pourquoi cette skill existe. cupynumeric reflète l'API NumPy,
incluant cupynumeric.load pour un seul fichier .npy. Au-delà de cela,
le chargement de fichiers vit dans Legate, pas dans cupynumeric :
| Format | Loader intégré |
|---|---|
.npy unique |
cupynumeric.load(path) (parité API NumPy) |
| HDF5 (fichier unique) | legate.io.hdf5.from_file / from_file_batched |
| Multi-fichier fragmenté (tout format), Parquet/Arrow, binaire brut, layouts personnalisés | Pas de loader intégré — cette skill. |
Cette skill montre le moyen canonique de combler le vide de la dernière ligne :
écrire une tâche Python Legate qui appelle le lecteur tiers que le
format nécessite (h5py, pyarrow, np.memmap, ...) à l'intérieur du
corps de la tâche, et laisser Legate distribuer les lectures sur les GPUs / nœuds.
Pour les formats avec un loader intégré, préférez-le sauf si vous avez besoin d'un
corps de tâche personnalisé (loader basé sur mmap, décodeur spécifique au format,
métadonnées annexes, lectures partielles / fragmentées).
Motif canonique : partitionnement manuel + lancement manuel, dimensionné à la
machine, pas aux fichiers. Seul l'axe 0 est fragmenté ; les axes trailing
voyagent ensemble à l'intérieur de chaque tuile. Les comptes de lignes par fragment
peuvent différer d'un fichier à l'autre (seuls dtype et les axes trailing doivent
correspondre) ; le lancement remplit tous les processeurs disponibles quel que soit
le nombre de fichiers.
.npy est l'exemple travaillé car l'en-tête porte la forme et le
dtype sur disque, mais le squelette s'applique à tout format avec des lectures
bon marché par plage/tranche (binaire brut, HDF5, Parquet/Arrow — voir « Autres
formats » ci-dessous). Implémentation de référence :
assets/examples/parallel_npy_load.py.
Hypothèse de disposition des données
Cette skill concerne purement le chargement — elle suppose que les données sont déjà
disposées sur un système de fichiers partagé de manière prévisible et indexable.
Produire ces fichiers sort du cadre (l'exemple embarque une sous-commande write
pour la commodité, mais les vrais utilisateurs apportent les leurs).
L'exemple travaillé suppose une disposition spécifique :
- Un répertoire contenant des fichiers nommés
shard_0000.npy,shard_0001.npy, ... dans une séquence d'entiers contigue (largeur remplie de zéros 4). - Tous les fragments partagent le même
dtypeet les mêmes axes trailing (shape[1:]) ; l'axe 0 (lignes par fragment) peut différer d'un fichier à l'autre — la recette construit un tableau de décalage de lignes cumulatif et lit la tranche chevauchante de chaque fichier de l'intérieur de la tâche feuille. - Le répertoire est visible par chaque rang (système de fichiers partagé pour les exécutions multi-nœuds).
La fonction discover_layout() de l'exemple affiche ce qu'elle a trouvé et échoue
fermement avec un message d'erreur descriptif quand la disposition est incorrecte
(répertoire manquant, pas de fragments, dtype / axes trailing incompatibles, ou
un trou dans la séquence shard_NNNN.npy contigue).
Si vos données résident dans une disposition différente — binaire brut à pas constant, un fichier HDF5 avec un dataset par fragment, un arborescence de répertoires, ... — seul le modèle de glob, le lecteur par fichier (étape 4 ci-dessous), et la découverte des métadonnées (étape 1 ci-dessous) changent. La machinerie de partitionnement et lancement est agnostique à la disposition.
Quand utiliser
Voir le tableau des formats ci-dessus pour la décision de routage (loader intégré vs. cette skill). Au-delà, deux indices supplémentaires indiquent que cette skill est le bon choix :
- Remplacer les lectures séquentielles
np.concatenate([read(f) for f in files])par des lectures parallèles par GPU. - Démontrer comment une tâche Python Legate définie par l'utilisateur écrit dans un tableau de sortie cupynumeric via un lancement manuel.
Exemples
Les chemins ci-dessous sont écrits par rapport au répertoire de cette skill (le script
se trouve à assets/examples/parallel_npy_load.py). Ajustez le préfixe pour
correspondre à l'endroit où votre skill est installée (par ex.
skills/cupynumeric-parallel-data-load/assets/... si la skill vit
sous un répertoire skills/ au niveau supérieur).
# Nœud unique, 4 GPUs.
legate --gpus 4 --fbmem 4000 --min-gpu-chunk 1 \
assets/examples/parallel_npy_load.py \
read --shard-dir /shared/scratch/demo
# Multi-nœuds, 2 nœuds x 4 GPUs (slurm), système de fichiers partagé à --shard-dir.
# Générer les fragments une fois sur le rang 0, puis re-lancer `read` à n'importe quelle échelle.
legate --launcher srun --nodes 2 --cpus 1 \
assets/examples/parallel_npy_load.py \
write --shard-dir /shared/scratch/demo
legate --launcher srun --nodes 2 --ranks-per-node 4 \
--gpus 4 --fbmem 4000 --min-gpu-chunk 1 \
assets/examples/parallel_npy_load.py \
read --shard-dir /shared/scratch/demo
Pas de drapeaux de disposition — le pilote de lecture parcourt chaque en-tête .npy
pour récupérer les comptes de lignes par fichier, la forme trailing et le dtype,
puis dérive tile_rows du nombre de processeurs disponibles.
--min-gpu-chunk 1 n'est nécessaire que quand le nombre d'éléments par tuile est
inférieur à la taille de chunk minimale par défaut de Legate pour les lancements GPU
(par ex. les valeurs par défaut de l'exemple travaillé — lignes totales divisées sur
4 GPUs à ~1M par tuile — tombent sous le seuil et seraient autrement repliées sur
un seul GPU). Pour les datasets de taille production (des dizaines de
millions d'éléments par tuile ou plus) vous pouvez laisser le drapeau et
laisser Legate utiliser sa valeur par défaut. Le porter à une valeur modérée (par ex.
--min-gpu-chunk 1024) est correct quand chaque tuile est assez grande pour que
le surcoût par tâche importe plus que d'obtenir chaque GPU une tuile.
Instructions
Cinq étapes à partir d'un exemple travaillé .npy ; seule l'étape 1 (analyser l'en-tête
de format) et l'étape 4 (le lecteur par fichier à l'intérieur du corps de tâche)
sont spécifiques au format. Les trois autres (allouer la destination, partitionner,
attendre) sont réutilisées inchangées sur les formats — voir « Autres formats »
ci-dessous pour les points de permutation.
1. Lire les métadonnées de chaque fragment
Parcourir le répertoire et jeter un œil à chaque en-tête .npy (mmap_mode="r"
ne lit que l'en-tête). L'en-tête porte la forme et le dtype par fragment, donc le
pilote peut récupérer les lignes totales, la forme trailing et un tableau de
décalage de lignes cumulatif sans jamais charger les données :
paths = sorted(SHARD_DIR.glob("shard_*.npy"))
per_file_rows = [] # lignes selon l'axe 0 par fichier
trailing_shape = None # shape[1:], doit correspondre d'un fichier à l'autre
dtype = None
for p in paths:
hdr = np.load(p, mmap_mode="r")
if trailing_shape is None:
trailing_shape = tuple(hdr.shape[1:])
dtype = hdr.dtype
elif tuple(hdr.shape[1:]) != trailing_shape or hdr.dtype != dtype:
raise RuntimeError(
f"{p.name}: incompatibilité de forme trailing / dtype "
f"({hdr.shape[1:]}/{hdr.dtype} vs {trailing_shape}/{dtype})"
)
per_file_rows.append(int(hdr.shape[0]))
cum_rows = np.cumsum([0] + per_file_rows, dtype=np.int64) # longueur N+1
total_rows = int(cum_rows[-1])
L'extrait ci-dessus impose que dtype et trailing_shape (soit
shape[1:]) correspondent d'un fichier à l'autre. Les comptes de lignes par fragment
peuvent différer — le tableau des lignes cumulatives gère cela. Le code production
devrait aussi vérifier que les noms forment une séquence contigue
shard_0000.npy ... shard_NNNN.npy (omis de l'extrait par brièveté ; voir
discover_layout() dans l'exemple travaillé). La découverte repose seulement sur
ce que le format sur disque expose lui-même (l'en-tête .npy ici, .shape /
.dtype pour HDF5, etc.) ; tout annexe (manifeste, hashes de contenu) est une
étape de vérification séparée par-dessus.
2. Créer le magasin de sortie cupynumeric à partir des métadonnées
Le tableau total s'étend sur total_rows selon l'axe 0 ; les axes trailing viennent
de trailing_shape inchangés. Utilisez cn.empty — la tâche écrase chaque cellule,
l'init à zéro serait du gaspillage.
import cupynumeric as cn
total_shape = (total_rows,) + trailing_shape
out = cn.empty(total_shape, dtype=dtype)
3. Tuiler le magasin par nombre de processeurs
La forme du lancement est dimensionnée aux processeurs disponibles, pas au
nombre de fichiers. Choisissez tile_rows = ceil(total_rows / num_processors) et
partitionnez l'axe 0 par cette taille de tuile. Les axes trailing ne sont pas
partitionnés (la tuile s'étend sur toute l'étendue là). La dernière tuile peut
être courte — c'est exactement ce que partition_by_tiling supporte — donc la
recette n'a pas besoin de contrainte de divisibilité.
from legate.core import TaskTarget, get_legate_runtime
from legate.core.data_interface import as_logical_array
runtime = get_legate_runtime()
machine = runtime.get_machine()
num_processors = max(
machine.count(TaskTarget.GPU),
machine.count(TaskTarget.OMP),
machine.count(TaskTarget.CPU),
1,
)
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
tile_shape = (tile_rows,) + trailing_shape
partition = as_logical_array(out).data.partition_by_tiling(tile_shape)
num_tasks = (total_rows + tile_rows - 1) // tile_rows # correspondre au nombre de tuiles de la partition
4. Définir la tâche feuille et la lancer manuellement
PATHS et CUM_ROWS (les chemins de fichiers et le tableau de décalage de lignes
cumulatif de l'étape 1) plus TILE_ROWS sont remplis comme variables globales de
module par le pilote avant le lancement ; la réplication du contrôle exécute le
pilote sur chaque rang, donc chaque travailleur voit des valeurs identiques.
Chaque tâche construit d'abord sa vue consommatrice (cupy sur GPU, numpy sur
CPU/OMP) et lit le nombre de lignes réel de la tuile à partir de view.shape[0]
— PhysicalStore lui-même n'a pas d'attribut .shape, donc passer par la
vue est requis. Elle calcule alors sa plage de lignes globale à partir de sa
coordonnée de lancement et ce nombre de lignes, bisecte cum_rows pour la(les)
fichier(s) chevauchant(s), et copie chaque tranche de fichier chevauchante dans la
tranche de destination correspondante. Enregistrez les variantes CPU, OMP et GPU
pour que le même lancement s'exécute inchangé partout ; la dispatch sur
ctx.get_variant_kind() choisit le consommateur correspondant à l'endroit où le
OutputStore réside (cp.from_dlpack(dst) pour FBMEM,
np.asarray(dst) pour SYSMEM). cupy est importé seulement à l'intérieur de la
branche GPU, donc le corps de tâche se charge sur les machines sans cupy.
import bisect
from legate.core import TaskContext, VariantCode
from legate.core.task import OutputStore, task
@task(variants=(VariantCode.CPU, VariantCode.OMP, VariantCode.GPU))
def load_tile(ctx: TaskContext, dst: OutputStore) -> None:
t = ctx.task_index[0] # index de tuile 0..num_tasks-1
variant = ctx.get_variant_kind()
if variant == VariantCode.GPU:
import cupy as cp # lazy : seulement sur GPU
view = cp.from_dlpack(dst)
else:
view = np.asarray(dst) # vue numpy sans copie
tile_rows_actual = view.shape[0] # courte sur la dernière tuile
row_start = t * TILE_ROWS # départ global selon l'axe 0
row_end = row_start + tile_rows_actual
# Trouver la plage semi-ouverte d'indices de fichier qui chevauchent [row_start, row_end).
first_file = bisect.bisect_right(CUM_ROWS, row_start) - 1
last_file = bisect.bisect_right(CUM_ROWS, row_end - 1) - 1
for f in range(first_file, last_file + 1):
# Intersection de la tuile [row_start, row_end) avec le fichier [cum[f], cum[f+1]).
lo = max(row_start, int(CUM_ROWS[f]))
hi = min(row_end, int(CUM_ROWS[f + 1]))
file_lo = lo - int(CUM_ROWS[f])
file_hi = hi - int(CUM_ROWS[f])
dst_lo = lo - row_start
dst_hi = hi - row_start
chunk = np.ascontiguousarray(
np.load(PATHS[f], mmap_mode="r")[file_lo:file_hi]
)
if variant == VariantCode.GPU:
view[dst_lo:dst_hi].set(chunk) # cudaMemcpyAsync H2D
else:
view[dst_lo:dst_hi] = chunk # écriture numpy sans copie
manual_task = runtime.create_manual_task(
load_tile.library,
load_tile.task_id,
(num_tasks,), # domaine de lancement == nombre de tuiles
)
manual_task.add_output(partition)
manual_task.execute()
Les deux consommateurs passent par les producteurs natifs de PhysicalStore
(__dlpack__ pour cupy, __array_interface__ pour np.asarray) —
vues sans copie de la tuile locale. Le coût de bisect est O(log num_shards)
et la boucle interne itère généralement 1–2 fois (les tuiles chevauchent au
plus quelques fichiers).
5. Attendre et vérifier
get_legate_runtime().issue_execution_fence(block=True)
Contraintes strictes
-
Tous les fragments doivent partager
dtypeet les axes trailing (shape[1:]). La recette empile les fragments selon l'axe 0 ; les axes trailing de la destination viennent detrailing_shape, que l'étape de découverte verrouille à la valeur du premier fichier. Les comptes de lignes par fragment (shape[0]) peuvent différer librement — le tableau de décalage cumulatif les gère. L'exemple rejette tout fragment dont ledtypeou la forme trailing diffère du premier avec un message d'erreur descriptif. -
Choisissez le consommateur qui correspond à la variante.
cp.from_dlpackrejette les magasins résidant en SYSMEM ;np.asarrayretourne silencieusement une vue hôte d'un magasin résidant en FBMEM que vous ne pouvez pas réellement écrire au travers. Dispatcher surctx.get_variant_kind()pour que chaque variante utilise son propre consommateur — voir l'étape 4. -
Les vues mmap ne sont pas toujours C-contiguës — enveloppez chaque tranche par fichier avec
np.ascontiguousarray(arr[file_lo:file_hi])avant.set()ou l'écriture numpy sur place. -
Multi-nœuds :
SHARD_DIRdoit être sur un système de fichiers partagé. Chaque travailleur (sur chaque rang) ouvre les fragments par chemin ; les chemins/tmplocaux aux nœuds ne fonctionnent que pour les démos nœud unique.
Variantes
Chemin rapide fragment uniforme (une tâche par fichier)
Quand chaque fragment a déjà le même (shape, dtype) et que vous avez
justement num_shards processeurs disponibles, la machinerie cum-rows / bisect
est du surcoût. Définissez tile_rows = shard_shape[0] et
num_tasks = num_shards ; la partition a alors une tuile par fichier
et chaque tâche lit exactement un fichier d'un bout à l'autre (pas de bisect,
pas de boucle interne). Le commutateur côté pilote est une ligne :
if all(r == per_file_rows[0] for r in per_file_rows) and num_shards == num_processors:
tile_rows = per_file_rows[0]
else:
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
Le même corps de tâche load_tile fonctionne toujours en l'un ou l'autre mode —
la boucle interne itère juste exactement une fois par tâche. Il n'y a pas besoin
d'un corps de tâche séparé pour le chemin rapide.
Sur-décomposer pour un meilleur équilibrage de charge
Le défaut tile_rows = ceil(total_rows / num_processors) donne une tuile par
processeur. Pour sur-décomposer par un facteur K (tuiles plus petites,
plus de point de tâches, queueing à granularité plus fine), divisez par
K * num_processors à la place :
tile_rows = max(1, (total_rows + K * num_processors - 1) // (K * num_processors))
num_tasks = ceil(total_rows / tile_rows) s'étend alors à environ
K * num_processors. Le même corps de tâche fonctionne toujours — bisect
arrive juste sur plus de tâches par fichier.
Autres formats
Seul le lecteur par fichier à l'intérieur de load_tile change. Le contrat du
lecteur : étant donné un chemin de fichier et une plage de lignes semi-ouverte
[file_lo, file_hi) selon l'axe 0, retourner un tableau numpy de forme
(file_hi - file_lo,) + trailing_shape qui peut être rendu C-contigu.
Les lectures bon marché par plage/tranche sont requises — les formats qui ne
supportent que « lire tout le fichier » défont le cas de chevauchement partiel
(une tuile qui couvre seulement partie d'un fichier).
| Format | Lecteur à l'intérieur de la tâche feuille |
|---|---|
.npy (exemple travaillé) |
host = np.ascontiguousarray(np.load(p, mmap_mode="r")[file_lo:file_hi]) |
| Binaire brut (forme fixe) | arr = np.memmap(p, dtype=DTYPE, mode="r", shape=(rows_in_file, *trailing_shape)); host = np.ascontiguousarray(arr[file_lo:file_hi]) |
| HDF5 | with h5py.File(p, "r") as f: host = np.ascontiguousarray(f["data"][file_lo:file_hi]) |
| Parquet / Arrow | tbl = pq.read_table(p, columns=..., use_threads=False).slice(file_lo, file_hi - file_lo); host = tbl.to_pandas().values |
(Pour les loaders intégrés monolithiques par format, voir le tableau « Pourquoi cette skill existe » en haut de ce fichier.)
L'étape de découverte (étape 1) analyse les métadonnées de chaque format : .npy /
HDF5 / Parquet portent tous le nombre de lignes par fichier + dtype sur disque.
Le binaire brut ne le fait pas — annexe ou dériver de la taille de fichier.
Pièges courants
cn.asarray(dst) est illégal dans une tâche feuille
À l'intérieur d'un corps @task, toute opération cupynumeric qui touche le
runtime au niveau supérieur — cn.asarray(store), assignation de tranche
cn_dst[s] = host_np — déclenche create_index_space du mauvais contexte et
Legion abandonne :
LEGION API USAGE EXCEPTION: Invalid task context passed to runtime call
create_index_space
Correction : consommer la capsule DLPack avec une bibliothèque tiers (cupy /
torch / numpy) à l'intérieur des tâches feuille. cn.asarray est correct dans
le pilote, simplement pas dans les tâches feuille. Voir
examples/dlpack/leaf_task_interop.py pour le contournement flaveur torch.
L'assert dans la tâche abandonne le runtime
Legate traite les exceptions non levées dans une @task comme une violation de
contrat et abandonne sauf si la tâche a été enregistrée avec throws_exception().
Sanity-check sur l'hôte avant le lancement.
Le domaine de lancement doit correspondre au nombre de tuiles de la partition
create_manual_task(launch_shape=...) et partition_by_tiling(...) sont
indépendants — le runtime ne détecte pas une incompatibilité. Domaine de lancement
plus grand → tuiles hors plage ; plus petit → tuiles non écrites. Dérivez toujours
les deux du même (total_rows, tile_rows) via deux divisions ceil séparées
(dimensionner le domaine de lancement à num_processors directement sur-lancerait
quand num_processors > total_rows) :
tile_rows = max(1, (total_rows + num_processors - 1) // num_processors)
num_tasks = (total_rows + tile_rows - 1) // tile_rows
partition = ...partition_by_tiling((tile_rows,) + trailing_shape)
runtime.create_manual_task(load_tile.library, load_tile.task_id, (num_tasks,))