Skill de Test DAG
Utilisez les commandes af pour tester, déboguer et corriger les DAGs en cycles itératifs.
Exécuter le CLI
Ces commandes supposent que af est sur PATH. Exécutez via astro otto pour l'obtenir automatiquement, ou installez en standalone avec uv tool install astro-airflow-mcp.
Validation Rapide avec Astro CLI
Si l'utilisateur dispose de l'Astro CLI, ces commandes fournissent un retour rapide sans avoir besoin d'une instance Airflow en cours d'exécution :
# Parser les DAGs pour capturer les erreurs d'import, les problèmes de syntaxe et les problèmes au niveau DAG
astro dev parse
# Exécuter pytest sur les DAGs (exécute les tests du répertoire tests/)
astro dev pytest
Utilisez-les pour une validation rapide pendant le développement. Pour des tests end-to-end complets sur une instance Airflow en direct, passez au workflow trigger-and-wait ci-dessous.
PREMIÈRE ACTION : Déclencher simplement le DAG
Quand l'utilisateur demande de tester un DAG, votre PREMIÈRE ET SEULE action doit être :
af runs trigger-wait <dag_id>
NE PAS :
- Appeler
af dags listd'abord - Appeler
af dags getd'abord - Appeler
af dags errorsd'abord - Utiliser
grepoulsou toute autre commande bash - Faire des « contrôles préalables »
Déclenchez simplement le DAG. S'il échoue, ALORS déboguer.
Vue d'ensemble du Workflow de Test
┌─────────────────────────────────────┐
│ 1. DÉCLENCHER ET ATTENDRE │
│ Exécuter le DAG, attendre fin │
└─────────────────────────────────────┘
↓
┌───────┴───────┐
↓ ↓
┌─────────┐ ┌──────────┐
│ SUCCÈS │ │ ÉCHOUÉ │
│ Fin ! │ │ Debug... │
└─────────┘ └──────────┘
↓
┌─────────────────────────────────────┐
│ 2. DÉBOGUER (seulement si échoué) │
│ Obtenir logs, identifier cause │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 3. CORRIGER ET RETESTER │
│ Appliquer correction, reprendre │
└─────────────────────────────────────┘
Philosophie : Essayer d'abord, déboguer en cas d'échec. Ne perdez pas de temps sur des contrôles préalables — lancez simplement le DAG et diagnostiquez si quelque chose échoue.
Phase 1 : Déclencher et Attendre
Utilisez af runs trigger-wait pour tester le DAG :
Méthode Principale : Déclencher et Attendre
af runs trigger-wait <dag_id> --timeout 300
Exemple :
af runs trigger-wait my_dag --timeout 300
Pourquoi c'est la méthode préférée :
- Une seule commande gère le déclenchement + monitoring
- Retour immédiat à la fin du DAG (succès ou échec)
- Inclut les détails des tâches échouées si l'exécution échoue
- Pas de polling manuel requis
Interprétation de la Réponse
Succès :
{
"dag_run": {
"dag_id": "my_dag",
"dag_run_id": "manual__2025-01-14T...",
"state": "success",
"start_date": "...",
"end_date": "..."
},
"timed_out": false,
"elapsed_seconds": 45.2
}
Échec :
{
"dag_run": {
"state": "failed"
},
"timed_out": false,
"elapsed_seconds": 30.1,
"failed_tasks": [
{
"task_id": "extract_data",
"state": "failed",
"try_number": 2
}
]
}
Timeout :
{
"dag_id": "my_dag",
"dag_run_id": "manual__...",
"state": "running",
"timed_out": true,
"elapsed_seconds": 300.0,
"message": "Timed out after 300 seconds. DAG run is still running."
}
Alternative : Déclencher et Monitorer Séparément
Utilisez ceci seulement quand vous avez besoin de plus de contrôle :
# Étape 1 : Déclencher
af runs trigger my_dag
# Retour : {"dag_run_id": "manual__...", "state": "queued"}
# Étape 2 : Vérifier le statut
af runs get my_dag manual__2025-01-14T...
# Retour : état courant
Traiter les Résultats
En Cas de Succès
Le DAG s'est exécuté avec succès. Résumez pour l'utilisateur :
- Temps écoulé total
- Nombre de tâches complétées
- Tout résultat notable (si visible dans les logs)
Vous avez terminé !
En Cas de Timeout
Le DAG est toujours en cours d'exécution. Options :
- Vérifier le statut courant :
af runs get <dag_id> <dag_run_id> - Demander à l'utilisateur s'il veut continuer à attendre
- Augmenter le timeout et réessayer
En Cas d'Échec
Passer à la Phase 2 (Debug) pour identifier la cause racine.
Phase 2 : Déboguer les Échecs (Seulement si Nécessaire)
Quand une exécution DAG échoue, utilisez ces commandes pour diagnostiquer :
Obtenir un Diagnostic Complet
af runs diagnose <dag_id> <dag_run_id>
Retour en un seul appel :
- Métadonnées d'exécution (état, timing)
- Toutes les instances de tâche avec leurs états
- Résumé des tâches échouées
- Comptage des états (succès, échec, skippé, etc.)
Obtenir les Logs de Tâche
af tasks logs <dag_id> <dag_run_id> <task_id>
Exemple :
af tasks logs my_dag manual__2025-01-14T... extract_data
Pour une tentative de retry spécifique :
af tasks logs my_dag manual__2025-01-14T... extract_data --try 2
Cherchez :
- Messages d'exception et stack traces
- Erreurs de connexion (base de données, API, S3)
- Erreurs de permission
- Erreurs de timeout
- Dépendances manquantes
Vérifier les Tâches Amont
Si une tâche montre upstream_failed, la cause racine est dans une tâche amont. Utilisez af runs diagnose pour trouver quelle tâche a vraiment échoué.
Vérifier les Erreurs d'Import (Si DAG N'a Pas Démarré)
Si le déclenchement a échoué parce que le DAG n'existe pas :
af dags errors
Cela révèle les erreurs de syntaxe ou les dépendances manquantes qui ont empêché le DAG de se charger.
Phase 3 : Corriger et Retester
Une fois que vous identifiez le problème :
Corrections Courantes
| Problème | Correction |
|---|---|
| Import manquant | Ajouter au fichier DAG |
| Package manquant | Ajouter à requirements.txt |
| Erreur de connexion | Vérifier af config connections, vérifier les credentials |
| Variable manquante | Vérifier af config variables, créer si nécessaire |
| Timeout | Augmenter le timeout de la tâche ou optimiser la query |
| Erreur de permission | Vérifier les credentials dans la connexion |
Après Correction
- Enregistrer le fichier
- Retester :
af runs trigger-wait <dag_id>
Répétez la boucle test → debug → correction jusqu'au succès du DAG.
Référence Rapide CLI
| Phase | Commande | Objectif |
|---|---|---|
| Test | af runs trigger-wait <dag_id> |
Méthode de test principale — commencer ici |
| Test | af runs trigger <dag_id> |
Démarrer l'exécution (alternative) |
| Test | af runs get <dag_id> <run_id> |
Vérifier le statut d'exécution |
| Debug | af runs diagnose <dag_id> <run_id> |
Diagnostic complet d'échec |
| Debug | af tasks logs <dag_id> <run_id> <task_id> |
Obtenir sortie/erreurs de tâche |
| Debug | af dags errors |
Vérifier les erreurs de parsing (si DAG ne charge pas) |
| Debug | af dags get <dag_id> |
Vérifier la configuration du DAG |
| Debug | af dags explore <dag_id> |
Inspection complète du DAG |
| Config | af config connections |
Lister les connexions |
| Config | af config variables |
Lister les variables |
Scénarios de Test
Scénario 1 : Tester un DAG (Cas Heureux)
af runs trigger-wait my_dag
# Succès ! Fin.
Scénario 2 : Tester un DAG (Avec Échec)
# 1. Exécuter et attendre
af runs trigger-wait my_dag
# Échoué...
# 2. Trouver les tâches échouées
af runs diagnose my_dag manual__2025-01-14T...
# 3. Obtenir les détails d'erreur
af tasks logs my_dag manual__2025-01-14T... extract_data
# 4. [Corriger le problème dans le code du DAG]
# 5. Retester
af runs trigger-wait my_dag
Scénario 3 : DAG N'Existe Pas / Ne Charge Pas
# 1. Déclenchement échoue - DAG non trouvé
af runs trigger-wait my_dag
# Erreur : DAG non trouvé
# 2. Trouver l'erreur de parsing
af dags errors
# 3. [Corriger le problème dans le code du DAG]
# 4. Retester
af runs trigger-wait my_dag
Scénario 4 : Déboguer une Exécution Planifiée Échouée
# 1. Obtenir le résumé d'échec
af runs diagnose my_dag scheduled__2025-01-14T...
# 2. Obtenir l'erreur de la tâche échouée
af tasks logs my_dag scheduled__2025-01-14T... failed_task_id
# 3. [Corriger le problème]
# 4. Retester
af runs trigger-wait my_dag
Scénario 5 : Tester avec Configuration Personnalisée
af runs trigger-wait my_dag --conf '{"env": "staging", "batch_size": 100}' --timeout 600
Scénario 6 : DAG Longue Exécution
# Attendre jusqu'à 1 heure
af runs trigger-wait my_dag --timeout 3600
# Si timeout, vérifier l'état courant
af runs get my_dag manual__2025-01-14T...
Conseils de Débogage
Motifs d'Erreur Courants
Connection Refused / Timeout :
- Vérifier
af config connectionspour l'host/port correct - Vérifier la connectivité réseau vers le système externe
- Vérifier que les credentials de connexion sont corrects
ModuleNotFoundError :
- Package manquant de
requirements.txt - Après ajout, peut nécessiter un redémarrage de l'environnement
PermissionError :
- Vérifier les rôles IAM, les droits de base de données, les clés API
- Vérifier que la connexion a les credentials corrects
Task Timeout :
- Query ou opération prenant trop longtemps
- Envisager d'ajouter un paramètre timeout à la tâche
- Optimiser la query/opération sous-jacente
Lire les Logs de Tâche
Les logs de tâche montrent généralement :
- Timestamp de démarrage de la tâche
- Toute déclaration print/log du code de tâche
- Valeur retournée (pour les fonctions décorées @task)
- Exception + full stack trace (si échoué)
- Timestamp de fin de tâche et durée
Concentrez-vous sur l'exception au bas des logs de tâche échouée.
Sur Astro
Les déploiements Astro supportent la promotion d'environnement, ce qui aide à structurer votre workflow de test :
- Déploiement Dev : Tester les DAGs librement avec
astro deploy --dagspour une itération rapide - Déploiement Staging : Exécuter des tests d'intégration sur des données type production
- Déploiement Production : Déployer seulement après validation dans les environnements inférieurs
- Utiliser des déploiements Astro séparés pour chaque environnement et promouvoir le code à travers eux
Skills Associés
- authoring-dags : Pour créer de nouveaux DAGs (inclut la validation avant le test)
- debugging-dags : Pour le dépannage général d'Airflow
- deploying-airflow : Pour déployer les DAGs en production après test