dali-dynamic-mode

Par nvidia · skills

À utiliser lors de l'écriture de code de chargement ou de prétraitement de données DALI avec `nvidia.dali.experimental.dynamic` (ndd), ou lors de la conversion de code DALI en mode pipeline vers le mode dynamique, ou lorsque l'utilisateur pose des questions sur le mode dynamique DALI, DALI impératif, ou ndd. Utilisez cette skill dès qu'une personne mentionne « ndd », « dynamic mode », ou souhaite charger/augmenter des données avec DALI en dehors d'une définition de pipeline.

npx skills add https://github.com/nvidia/skills --skill dali-dynamic-mode

Mode Dynamic DALI

Le mode dynamique est l'API Python impérative de DALI. Appelez les opérateurs DALI comme des fonctions Python ordinaires avec un contrôle de flux standard -- pas de graphe de pipeline, pas de pipe.build(), pas de pipe.run().

import nvidia.dali.experimental.dynamic as ndd

Types de données principaux

Tensor -- un seul échantillon

t = ndd.tensor(data)           # copie
t = ndd.as_tensor(data)        # enveloppe, pas de copie si possible
t.cpu()                        # déplacer vers CPU
t.gpu()                        # déplacer vers GPU
t.torch(copy=False)            # conversion en tenseur PyTorch sans copie (par défaut)
t[1:3]                         # slicing supporté
np.asarray(t)                  # NumPy via __array__ (CPU uniquement)

Supporte __dlpack__, __cuda_array_interface__, __array__, opérateurs arithmétiques.

Batch -- collection d'échantillons (formes variables OK)

b = ndd.batch([arr1, arr2])    # copie
b = ndd.as_batch(data)         # enveloppe, pas de copie si possible

Batch n'a pas __getitem__ -- batch[i] lève TypeError car l'indexation est ambiguë (sélection d'échantillon vs. slicing par échantillon). Utilisez plutôt les API explicites :

Intention Méthode Retourne
Obtenir l'échantillon i batch.select(i) Tensor
Obtenir un sous-ensemble d'échantillons batch.select(slice_or_list) Batch
Slicing dans chaque échantillon batch.slice[...] Batch (même batch_size)

.select() choisit quels échantillons. .slice indexe à l'intérieur de chaque échantillon.

xy = ndd.random.uniform(batch_size=16, range=[0, 1], shape=2)
crop_x = xy.slice[0]       # Batch de 16 scalaires, premier élément de chaque échantillon
crop_y = xy.slice[1]       # Batch de 16 scalaires, second élément de chaque échantillon
sample_0 = xy.select(0)    # Tensor, l'intégralité du premier échantillon [x, y]

Conversion PyTorch :

  • batch.torch() -- fonctionne pour les formes uniformes ; lève pour les batches irréguliers
  • batch.torch(pad=True) -- complète les batches irréguliers avec des zéros à la forme maximale (utiliser pour l'audio de longueur variable, les boîtes de détection, etc.)
  • batch.torch(copy=None) est la valeur par défaut (évite la copie si possible)
  • Batch n'a pas __dlpack__ -- utilisez ndd.as_tensor(batch) en premier pour les consommateurs DLPack. ndd.as_tensor supporte aussi pad.
  • Tensor.torch(copy=False) est la valeur par défaut (pas de copie)

Itération : for sample in batch: produit des Tensors.

Lecteurs

Les lecteurs sont des objets avec état -- créez une fois, réutilisez à travers les epochs. C'est important car les lecteurs conservent un état interne comme l'ordre de mélange et la position du shard.

reader = ndd.readers.File(file_root=image_dir, random_shuffle=True)

for epoch in range(num_epochs):
    for jpegs, labels in reader.next_epoch(batch_size=64):
        # jpegs, labels sont des objets Batch
        ...

Points clés :

  • Les sorties du lecteur (jpegs, labels, etc.) sont des tenseurs/batches CPU. Les labels restent généralement sur CPU jusqu'à ce que vous les convertiez pour votre framework (par ex. labels.torch().to(device)).
  • Les classes de lecteur sont en PascalCase : ndd.readers.File(...), ndd.readers.COCO(...), ndd.readers.TFRecord(...)
  • batch_size va à next_epoch(), pas au constructeur du lecteur
  • next_epoch(batch_size=N) produit des tuples de Batch ; next_epoch() sans batch_size produit des tuples de Tensor
  • L'itérateur de next_epoch() doit être entièrement consommé avant d'appeler next_epoch() à nouveau
  • Une fois qu'un lecteur est utilisé avec une batch_size donnée, il ne peut pas être changé. De même, un lecteur utilisé en mode batch ne peut pas basculer vers le mode échantillon ou vice versa.

Lecture shardée pour l'entraînement distribué :

reader = ndd.readers.File(
    file_root=image_dir,
    shard_id=rank, num_shards=world_size,
    stick_to_shard=True,
    pad_last_batch=True,
)

Gestion des périphériques

  • Le périphérique est déduit des entrées -- GPU si l'une des entrées est sur GPU
  • Pour le décodage hybride : utilisez device="gpu" (NON "mixed"). Le mot-clé "mixed" est un concept du mode pipeline pour le transfert CPU-vers-GPU implicite ; en mode dynamique, passer device="gpu" active le même chemin de décodage accéléré par matériel.
  • N'appelez pas .cpu() avant de passer à un modèle GPU -- .torch() vous donne directement un tenseur GPU. .cpu() n'est nécessaire que pour les consommateurs requérant la mémoire hôte (numpy, __array__).
  • La synchronisation du flux CUDA entre DALI et PyTorch est automatique via DLPack -- aucune gestion manuelle de flux nécessaire.

Modèle d'exécution

Le mode par défaut est eager -- exécution asynchrone dans un thread d'arrière-plan, retour immédiat.

Pas besoin d'.evaluate() dans la plupart des cas. Toute consommation de données (.torch(), __dlpack__, __array__, .shape, accès aux propriétés, itération) déclenche l'évaluation automatiquement.

Pour le débogage, basculez vers le mode synchrone pour que les erreurs remontent au site d'appel exact plutôt que plus tard dans la queue asynchrone :

with ndd.EvalMode.sync_full:
    images = ndd.decoders.image(jpegs, device="gpu")
    images = ndd.resize(images, size=[224, 224])
    # Toute erreur remonte ici, à l'exact op qui a échoué

Modes (synchronicité croissante) : deferred < eager < sync_cpu < sync_full

Utilisez EvalMode.sync_full pour le débogage au lieu de disperser des appels .evaluate() -- c'est plus propre et capture tous les problèmes à la fois. sync_cpu est souvent suffisant et plus léger que sync_full.

Configuration des threads

ndd.set_num_threads(4)  # Appelez une fois au démarrage, seulement si nécessaire pour surcharger les défauts

Contrôle les threads de travail internes de DALI pour les opérateurs CPU. Défaut : nombre d'affinités CPU ou variable d'env DALI_NUM_THREADS. Non lié au threading au niveau Python.

RNG

Deux approches (utilisez l'une, pas les deux) :

# Approche 1 : définir la seed par défaut thread-local (simple, suffisant pour la plupart des cas)
ndd.random.set_seed(42)
angles = ndd.random.uniform(batch_size=64, range=(-30, 30))

# Approche 2 : objet RNG explicite (contrôle plus fin, passer rng= à chaque op)
rng = ndd.random.RNG(seed=42)
values = ndd.random.uniform(batch_size=64, range=[0, 1], shape=2, rng=rng)

Quand rng= est passé à un op aléatoire, le RNG explicite surcharge la seed par défaut. Thread-local : chaque thread a un état aléatoire indépendant.

Les ops aléatoires ont besoin d'une batch_size explicite quand on travaille avec des batches -- il n'y a pas de batch_size au niveau du pipeline à hériter.

Exemple : Pipeline de classification d'images

import nvidia.dali.experimental.dynamic as ndd

ndd.set_num_threads(4)
reader = ndd.readers.File(file_root="/data/imagenet/train", random_shuffle=True)

for epoch in range(num_epochs):
    for jpegs, labels in reader.next_epoch(batch_size=64):
        images = ndd.decoders.image(jpegs, device="gpu")
        images = ndd.resize(images, size=[224, 224])
        images = ndd.crop_mirror_normalize(
            images,
            mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
            std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
        )
        train_step(images.torch(), labels.torch())

Erreurs courantes

Mauvais Correct Pourquoi
device="mixed" device="gpu" "mixed" est en mode pipeline uniquement
batch[i] batch.select(i) Batch n'a pas __getitem__
batch.select(0) pour le slicing par échantillon batch.slice[0] .select() choisit les échantillons ; .slice slice à l'intérieur de chaque échantillon
.evaluate() après chaque op Laisser la consommation déclencher eval .torch(), .shape, etc. la déclenchent automatiquement
.cpu() avant un modèle GPU .torch() directement Évite un round-trip wasteful D2H + H2D
Recréer un lecteur à chaque epoch reader.next_epoch() Les lecteurs sont stateful -- créez une fois, réutilisez
ndd.readers.file(...) ndd.readers.File(...) Les classes de lecteur sont PascalCase
break de la boucle next_epoch() Épuiser l'itérateur ou créer un nouveau lecteur L'itérateur doit être entièrement consommé avant le prochain next_epoch()
Pas de batch_size aux ops aléatoires ndd.random.uniform(batch_size=N, ...) Pas de batch_size au niveau du pipeline à hériter

Migration depuis le mode pipeline

Mode pipeline Mode dynamique
@pipeline_def / pipe.build() / pipe.run() Appels de fonction directs dans une boucle
fn.readers.file(...) ndd.readers.File(...) (PascalCase, stateful)
fn.decoders.image(jpegs, device="mixed") ndd.decoders.image(jpegs, device="gpu")
fn.op_name(...) ndd.op_name(...)
Pipeline-level batch_size=64 reader.next_epoch(batch_size=64) + ops aléatoires batch_size=64
Pipeline-level seed=42 ndd.random.set_seed(42) ou ndd.random.RNG(seed=42)
Pipeline-level num_threads=4 ndd.set_num_threads(4) au démarrage
output.at(i) batch.select(i)
output.as_cpu() batch.cpu()
pipe.run() retourne tuple de TensorList reader.next_epoch(batch_size=N) produit tuples de Batch

Skills similaires