testing-dags

Workflows de test DAG complexes avec cycles de débogage et de correction. À utiliser pour les demandes de test en plusieurs étapes comme « tester ce DAG et le corriger s'il échoue », « tester et déboguer », « exécuter le pipeline et résoudre les problèmes ». Pour les demandes de test simples (« tester le DAG », « exécuter le DAG »), le skill entrypoint Airflow s'en charge directement. Ce skill est destiné aux cycles itératifs test-débogage-correction.

npx skills add https://github.com/astronomer/agents --skill testing-dags

Skill de Test DAG

Utilisez les commandes af pour tester, déboguer et corriger les DAGs en cycles itératifs.

Exécuter le CLI

Ces commandes supposent que af est sur PATH. Exécutez via astro otto pour l'obtenir automatiquement, ou installez en standalone avec uv tool install astro-airflow-mcp.


Validation Rapide avec Astro CLI

Si l'utilisateur dispose de l'Astro CLI, ces commandes fournissent un retour rapide sans avoir besoin d'une instance Airflow en cours d'exécution :

# Parser les DAGs pour capturer les erreurs d'import, les problèmes de syntaxe et les problèmes au niveau DAG
astro dev parse

# Exécuter pytest sur les DAGs (exécute les tests du répertoire tests/)
astro dev pytest

Utilisez-les pour une validation rapide pendant le développement. Pour des tests end-to-end complets sur une instance Airflow en direct, passez au workflow trigger-and-wait ci-dessous.


PREMIÈRE ACTION : Déclencher simplement le DAG

Quand l'utilisateur demande de tester un DAG, votre PREMIÈRE ET SEULE action doit être :

af runs trigger-wait <dag_id>

NE PAS :

  • Appeler af dags list d'abord
  • Appeler af dags get d'abord
  • Appeler af dags errors d'abord
  • Utiliser grep ou ls ou toute autre commande bash
  • Faire des « contrôles préalables »

Déclenchez simplement le DAG. S'il échoue, ALORS déboguer.


Vue d'ensemble du Workflow de Test

┌─────────────────────────────────────┐
│ 1. DÉCLENCHER ET ATTENDRE           │
│    Exécuter le DAG, attendre fin    │
└─────────────────────────────────────┘
                 ↓
        ┌───────┴───────┐
        ↓               ↓
   ┌─────────┐    ┌──────────┐
   │ SUCCÈS  │    │ ÉCHOUÉ   │
   │ Fin !   │    │ Debug... │
   └─────────┘    └──────────┘
                       ↓
        ┌─────────────────────────────────────┐
        │ 2. DÉBOGUER (seulement si échoué)   │
        │    Obtenir logs, identifier cause   │
        └─────────────────────────────────────┘
                       ↓
        ┌─────────────────────────────────────┐
        │ 3. CORRIGER ET RETESTER             │
        │    Appliquer correction, reprendre  │
        └─────────────────────────────────────┘

Philosophie : Essayer d'abord, déboguer en cas d'échec. Ne perdez pas de temps sur des contrôles préalables — lancez simplement le DAG et diagnostiquez si quelque chose échoue.


Phase 1 : Déclencher et Attendre

Utilisez af runs trigger-wait pour tester le DAG :

Méthode Principale : Déclencher et Attendre

af runs trigger-wait <dag_id> --timeout 300

Exemple :

af runs trigger-wait my_dag --timeout 300

Pourquoi c'est la méthode préférée :

  • Une seule commande gère le déclenchement + monitoring
  • Retour immédiat à la fin du DAG (succès ou échec)
  • Inclut les détails des tâches échouées si l'exécution échoue
  • Pas de polling manuel requis

Interprétation de la Réponse

Succès :

{
  "dag_run": {
    "dag_id": "my_dag",
    "dag_run_id": "manual__2025-01-14T...",
    "state": "success",
    "start_date": "...",
    "end_date": "..."
  },
  "timed_out": false,
  "elapsed_seconds": 45.2
}

Échec :

{
  "dag_run": {
    "state": "failed"
  },
  "timed_out": false,
  "elapsed_seconds": 30.1,
  "failed_tasks": [
    {
      "task_id": "extract_data",
      "state": "failed",
      "try_number": 2
    }
  ]
}

Timeout :

{
  "dag_id": "my_dag",
  "dag_run_id": "manual__...",
  "state": "running",
  "timed_out": true,
  "elapsed_seconds": 300.0,
  "message": "Timed out after 300 seconds. DAG run is still running."
}

Alternative : Déclencher et Monitorer Séparément

Utilisez ceci seulement quand vous avez besoin de plus de contrôle :

# Étape 1 : Déclencher
af runs trigger my_dag
# Retour : {"dag_run_id": "manual__...", "state": "queued"}

# Étape 2 : Vérifier le statut
af runs get my_dag manual__2025-01-14T...
# Retour : état courant

Traiter les Résultats

En Cas de Succès

Le DAG s'est exécuté avec succès. Résumez pour l'utilisateur :

  • Temps écoulé total
  • Nombre de tâches complétées
  • Tout résultat notable (si visible dans les logs)

Vous avez terminé !

En Cas de Timeout

Le DAG est toujours en cours d'exécution. Options :

  1. Vérifier le statut courant : af runs get <dag_id> <dag_run_id>
  2. Demander à l'utilisateur s'il veut continuer à attendre
  3. Augmenter le timeout et réessayer

En Cas d'Échec

Passer à la Phase 2 (Debug) pour identifier la cause racine.


Phase 2 : Déboguer les Échecs (Seulement si Nécessaire)

Quand une exécution DAG échoue, utilisez ces commandes pour diagnostiquer :

Obtenir un Diagnostic Complet

af runs diagnose <dag_id> <dag_run_id>

Retour en un seul appel :

  • Métadonnées d'exécution (état, timing)
  • Toutes les instances de tâche avec leurs états
  • Résumé des tâches échouées
  • Comptage des états (succès, échec, skippé, etc.)

Obtenir les Logs de Tâche

af tasks logs <dag_id> <dag_run_id> <task_id>

Exemple :

af tasks logs my_dag manual__2025-01-14T... extract_data

Pour une tentative de retry spécifique :

af tasks logs my_dag manual__2025-01-14T... extract_data --try 2

Cherchez :

  • Messages d'exception et stack traces
  • Erreurs de connexion (base de données, API, S3)
  • Erreurs de permission
  • Erreurs de timeout
  • Dépendances manquantes

Vérifier les Tâches Amont

Si une tâche montre upstream_failed, la cause racine est dans une tâche amont. Utilisez af runs diagnose pour trouver quelle tâche a vraiment échoué.

Vérifier les Erreurs d'Import (Si DAG N'a Pas Démarré)

Si le déclenchement a échoué parce que le DAG n'existe pas :

af dags errors

Cela révèle les erreurs de syntaxe ou les dépendances manquantes qui ont empêché le DAG de se charger.


Phase 3 : Corriger et Retester

Une fois que vous identifiez le problème :

Corrections Courantes

Problème Correction
Import manquant Ajouter au fichier DAG
Package manquant Ajouter à requirements.txt
Erreur de connexion Vérifier af config connections, vérifier les credentials
Variable manquante Vérifier af config variables, créer si nécessaire
Timeout Augmenter le timeout de la tâche ou optimiser la query
Erreur de permission Vérifier les credentials dans la connexion

Après Correction

  1. Enregistrer le fichier
  2. Retester : af runs trigger-wait <dag_id>

Répétez la boucle test → debug → correction jusqu'au succès du DAG.


Référence Rapide CLI

Phase Commande Objectif
Test af runs trigger-wait <dag_id> Méthode de test principale — commencer ici
Test af runs trigger <dag_id> Démarrer l'exécution (alternative)
Test af runs get <dag_id> <run_id> Vérifier le statut d'exécution
Debug af runs diagnose <dag_id> <run_id> Diagnostic complet d'échec
Debug af tasks logs <dag_id> <run_id> <task_id> Obtenir sortie/erreurs de tâche
Debug af dags errors Vérifier les erreurs de parsing (si DAG ne charge pas)
Debug af dags get <dag_id> Vérifier la configuration du DAG
Debug af dags explore <dag_id> Inspection complète du DAG
Config af config connections Lister les connexions
Config af config variables Lister les variables

Scénarios de Test

Scénario 1 : Tester un DAG (Cas Heureux)

af runs trigger-wait my_dag
# Succès ! Fin.

Scénario 2 : Tester un DAG (Avec Échec)

# 1. Exécuter et attendre
af runs trigger-wait my_dag
# Échoué...

# 2. Trouver les tâches échouées
af runs diagnose my_dag manual__2025-01-14T...

# 3. Obtenir les détails d'erreur
af tasks logs my_dag manual__2025-01-14T... extract_data

# 4. [Corriger le problème dans le code du DAG]

# 5. Retester
af runs trigger-wait my_dag

Scénario 3 : DAG N'Existe Pas / Ne Charge Pas

# 1. Déclenchement échoue - DAG non trouvé
af runs trigger-wait my_dag
# Erreur : DAG non trouvé

# 2. Trouver l'erreur de parsing
af dags errors

# 3. [Corriger le problème dans le code du DAG]

# 4. Retester
af runs trigger-wait my_dag

Scénario 4 : Déboguer une Exécution Planifiée Échouée

# 1. Obtenir le résumé d'échec
af runs diagnose my_dag scheduled__2025-01-14T...

# 2. Obtenir l'erreur de la tâche échouée
af tasks logs my_dag scheduled__2025-01-14T... failed_task_id

# 3. [Corriger le problème]

# 4. Retester
af runs trigger-wait my_dag

Scénario 5 : Tester avec Configuration Personnalisée

af runs trigger-wait my_dag --conf '{"env": "staging", "batch_size": 100}' --timeout 600

Scénario 6 : DAG Longue Exécution

# Attendre jusqu'à 1 heure
af runs trigger-wait my_dag --timeout 3600

# Si timeout, vérifier l'état courant
af runs get my_dag manual__2025-01-14T...

Conseils de Débogage

Motifs d'Erreur Courants

Connection Refused / Timeout :

  • Vérifier af config connections pour l'host/port correct
  • Vérifier la connectivité réseau vers le système externe
  • Vérifier que les credentials de connexion sont corrects

ModuleNotFoundError :

  • Package manquant de requirements.txt
  • Après ajout, peut nécessiter un redémarrage de l'environnement

PermissionError :

  • Vérifier les rôles IAM, les droits de base de données, les clés API
  • Vérifier que la connexion a les credentials corrects

Task Timeout :

  • Query ou opération prenant trop longtemps
  • Envisager d'ajouter un paramètre timeout à la tâche
  • Optimiser la query/opération sous-jacente

Lire les Logs de Tâche

Les logs de tâche montrent généralement :

  1. Timestamp de démarrage de la tâche
  2. Toute déclaration print/log du code de tâche
  3. Valeur retournée (pour les fonctions décorées @task)
  4. Exception + full stack trace (si échoué)
  5. Timestamp de fin de tâche et durée

Concentrez-vous sur l'exception au bas des logs de tâche échouée.

Sur Astro

Les déploiements Astro supportent la promotion d'environnement, ce qui aide à structurer votre workflow de test :

  • Déploiement Dev : Tester les DAGs librement avec astro deploy --dags pour une itération rapide
  • Déploiement Staging : Exécuter des tests d'intégration sur des données type production
  • Déploiement Production : Déployer seulement après validation dans les environnements inférieurs
  • Utiliser des déploiements Astro séparés pour chaque environnement et promouvoir le code à travers eux

Skills Associés

  • authoring-dags : Pour créer de nouveaux DAGs (inclut la validation avant le test)
  • debugging-dags : Pour le dépannage général d'Airflow
  • deploying-airflow : Pour déployer les DAGs en production après test

Skills similaires