Mode Dynamic DALI
Le mode dynamique est l'API Python impérative de DALI. Appelez les opérateurs DALI comme des fonctions Python ordinaires avec un contrôle de flux standard -- pas de graphe de pipeline, pas de pipe.build(), pas de pipe.run().
import nvidia.dali.experimental.dynamic as ndd
Types de données principaux
Tensor -- un seul échantillon
t = ndd.tensor(data) # copie
t = ndd.as_tensor(data) # enveloppe, pas de copie si possible
t.cpu() # déplacer vers CPU
t.gpu() # déplacer vers GPU
t.torch(copy=False) # conversion en tenseur PyTorch sans copie (par défaut)
t[1:3] # slicing supporté
np.asarray(t) # NumPy via __array__ (CPU uniquement)
Supporte __dlpack__, __cuda_array_interface__, __array__, opérateurs arithmétiques.
Batch -- collection d'échantillons (formes variables OK)
b = ndd.batch([arr1, arr2]) # copie
b = ndd.as_batch(data) # enveloppe, pas de copie si possible
Batch n'a pas __getitem__ -- batch[i] lève TypeError car l'indexation est ambiguë (sélection d'échantillon vs. slicing par échantillon). Utilisez plutôt les API explicites :
| Intention | Méthode | Retourne |
|---|---|---|
| Obtenir l'échantillon i | batch.select(i) |
Tensor |
| Obtenir un sous-ensemble d'échantillons | batch.select(slice_or_list) |
Batch |
| Slicing dans chaque échantillon | batch.slice[...] |
Batch (même batch_size) |
.select() choisit quels échantillons. .slice indexe à l'intérieur de chaque échantillon.
xy = ndd.random.uniform(batch_size=16, range=[0, 1], shape=2)
crop_x = xy.slice[0] # Batch de 16 scalaires, premier élément de chaque échantillon
crop_y = xy.slice[1] # Batch de 16 scalaires, second élément de chaque échantillon
sample_0 = xy.select(0) # Tensor, l'intégralité du premier échantillon [x, y]
Conversion PyTorch :
batch.torch()-- fonctionne pour les formes uniformes ; lève pour les batches irréguliersbatch.torch(pad=True)-- complète les batches irréguliers avec des zéros à la forme maximale (utiliser pour l'audio de longueur variable, les boîtes de détection, etc.)batch.torch(copy=None)est la valeur par défaut (évite la copie si possible)- Batch n'a pas
__dlpack__-- utilisezndd.as_tensor(batch)en premier pour les consommateurs DLPack.ndd.as_tensorsupporte aussipad. Tensor.torch(copy=False)est la valeur par défaut (pas de copie)
Itération : for sample in batch: produit des Tensors.
Lecteurs
Les lecteurs sont des objets avec état -- créez une fois, réutilisez à travers les epochs. C'est important car les lecteurs conservent un état interne comme l'ordre de mélange et la position du shard.
reader = ndd.readers.File(file_root=image_dir, random_shuffle=True)
for epoch in range(num_epochs):
for jpegs, labels in reader.next_epoch(batch_size=64):
# jpegs, labels sont des objets Batch
...
Points clés :
- Les sorties du lecteur (jpegs, labels, etc.) sont des tenseurs/batches CPU. Les labels restent généralement sur CPU jusqu'à ce que vous les convertiez pour votre framework (par ex.
labels.torch().to(device)). - Les classes de lecteur sont en PascalCase :
ndd.readers.File(...),ndd.readers.COCO(...),ndd.readers.TFRecord(...) batch_sizeva ànext_epoch(), pas au constructeur du lecteurnext_epoch(batch_size=N)produit des tuples deBatch;next_epoch()sans batch_size produit des tuples deTensor- L'itérateur de
next_epoch()doit être entièrement consommé avant d'appelernext_epoch()à nouveau - Une fois qu'un lecteur est utilisé avec une batch_size donnée, il ne peut pas être changé. De même, un lecteur utilisé en mode batch ne peut pas basculer vers le mode échantillon ou vice versa.
Lecture shardée pour l'entraînement distribué :
reader = ndd.readers.File(
file_root=image_dir,
shard_id=rank, num_shards=world_size,
stick_to_shard=True,
pad_last_batch=True,
)
Gestion des périphériques
- Le périphérique est déduit des entrées -- GPU si l'une des entrées est sur GPU
- Pour le décodage hybride : utilisez
device="gpu"(NON"mixed"). Le mot-clé"mixed"est un concept du mode pipeline pour le transfert CPU-vers-GPU implicite ; en mode dynamique, passerdevice="gpu"active le même chemin de décodage accéléré par matériel. - N'appelez pas
.cpu()avant de passer à un modèle GPU --.torch()vous donne directement un tenseur GPU..cpu()n'est nécessaire que pour les consommateurs requérant la mémoire hôte (numpy,__array__). - La synchronisation du flux CUDA entre DALI et PyTorch est automatique via DLPack -- aucune gestion manuelle de flux nécessaire.
Modèle d'exécution
Le mode par défaut est eager -- exécution asynchrone dans un thread d'arrière-plan, retour immédiat.
Pas besoin d'.evaluate() dans la plupart des cas. Toute consommation de données (.torch(), __dlpack__, __array__, .shape, accès aux propriétés, itération) déclenche l'évaluation automatiquement.
Pour le débogage, basculez vers le mode synchrone pour que les erreurs remontent au site d'appel exact plutôt que plus tard dans la queue asynchrone :
with ndd.EvalMode.sync_full:
images = ndd.decoders.image(jpegs, device="gpu")
images = ndd.resize(images, size=[224, 224])
# Toute erreur remonte ici, à l'exact op qui a échoué
Modes (synchronicité croissante) : deferred < eager < sync_cpu < sync_full
Utilisez EvalMode.sync_full pour le débogage au lieu de disperser des appels .evaluate() -- c'est plus propre et capture tous les problèmes à la fois. sync_cpu est souvent suffisant et plus léger que sync_full.
Configuration des threads
ndd.set_num_threads(4) # Appelez une fois au démarrage, seulement si nécessaire pour surcharger les défauts
Contrôle les threads de travail internes de DALI pour les opérateurs CPU. Défaut : nombre d'affinités CPU ou variable d'env DALI_NUM_THREADS. Non lié au threading au niveau Python.
RNG
Deux approches (utilisez l'une, pas les deux) :
# Approche 1 : définir la seed par défaut thread-local (simple, suffisant pour la plupart des cas)
ndd.random.set_seed(42)
angles = ndd.random.uniform(batch_size=64, range=(-30, 30))
# Approche 2 : objet RNG explicite (contrôle plus fin, passer rng= à chaque op)
rng = ndd.random.RNG(seed=42)
values = ndd.random.uniform(batch_size=64, range=[0, 1], shape=2, rng=rng)
Quand rng= est passé à un op aléatoire, le RNG explicite surcharge la seed par défaut. Thread-local : chaque thread a un état aléatoire indépendant.
Les ops aléatoires ont besoin d'une batch_size explicite quand on travaille avec des batches -- il n'y a pas de batch_size au niveau du pipeline à hériter.
Exemple : Pipeline de classification d'images
import nvidia.dali.experimental.dynamic as ndd
ndd.set_num_threads(4)
reader = ndd.readers.File(file_root="/data/imagenet/train", random_shuffle=True)
for epoch in range(num_epochs):
for jpegs, labels in reader.next_epoch(batch_size=64):
images = ndd.decoders.image(jpegs, device="gpu")
images = ndd.resize(images, size=[224, 224])
images = ndd.crop_mirror_normalize(
images,
mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
)
train_step(images.torch(), labels.torch())
Erreurs courantes
| Mauvais | Correct | Pourquoi |
|---|---|---|
device="mixed" |
device="gpu" |
"mixed" est en mode pipeline uniquement |
batch[i] |
batch.select(i) |
Batch n'a pas __getitem__ |
batch.select(0) pour le slicing par échantillon |
batch.slice[0] |
.select() choisit les échantillons ; .slice slice à l'intérieur de chaque échantillon |
.evaluate() après chaque op |
Laisser la consommation déclencher eval | .torch(), .shape, etc. la déclenchent automatiquement |
.cpu() avant un modèle GPU |
.torch() directement |
Évite un round-trip wasteful D2H + H2D |
| Recréer un lecteur à chaque epoch | reader.next_epoch() |
Les lecteurs sont stateful -- créez une fois, réutilisez |
ndd.readers.file(...) |
ndd.readers.File(...) |
Les classes de lecteur sont PascalCase |
break de la boucle next_epoch() |
Épuiser l'itérateur ou créer un nouveau lecteur | L'itérateur doit être entièrement consommé avant le prochain next_epoch() |
Pas de batch_size aux ops aléatoires |
ndd.random.uniform(batch_size=N, ...) |
Pas de batch_size au niveau du pipeline à hériter |
Migration depuis le mode pipeline
| Mode pipeline | Mode dynamique |
|---|---|
@pipeline_def / pipe.build() / pipe.run() |
Appels de fonction directs dans une boucle |
fn.readers.file(...) |
ndd.readers.File(...) (PascalCase, stateful) |
fn.decoders.image(jpegs, device="mixed") |
ndd.decoders.image(jpegs, device="gpu") |
fn.op_name(...) |
ndd.op_name(...) |
Pipeline-level batch_size=64 |
reader.next_epoch(batch_size=64) + ops aléatoires batch_size=64 |
Pipeline-level seed=42 |
ndd.random.set_seed(42) ou ndd.random.RNG(seed=42) |
Pipeline-level num_threads=4 |
ndd.set_num_threads(4) au démarrage |
output.at(i) |
batch.select(i) |
output.as_cpu() |
batch.cpu() |
pipe.run() retourne tuple de TensorList |
reader.next_epoch(batch_size=N) produit tuples de Batch |