Opérations Airflow
Utilisez les commandes af pour interroger, gérer et dépanner les workflows Airflow.
Astro CLI
Astro CLI est la façon recommandée de lancer Airflow en local et de déployer en production. Elle fournit un environnement Airflow containerisé fonctionnant out-of-the-box :
# Initialiser un nouveau projet
astro dev init
# Lancer Airflow localement (webserver sur http://localhost:8080)
astro dev start
# Parser les DAGs pour détecter les erreurs rapidement (pas besoin de lancer Airflow)
astro dev parse
# Exécuter pytest sur vos DAGs
astro dev pytest
# Déployer en production
astro deploy # Déploiement complet (image + DAGs)
astro deploy --dags # Déploiement DAGs uniquement (rapide, pas de build d'image)
Pour plus de détails :
- Nouveau projet ? Voir la skill setting-up-astro-project
- Environnement local ? Voir la skill managing-astro-local-env
- Déploiement ? Voir la skill deploying-airflow
Exécuter la CLI
Ces commandes supposent que af est dans le PATH. Lancez-le via astro otto pour l'obtenir automatiquement, ou installez-le en standalone avec uv tool install astro-airflow-mcp.
Configuration des instances
Gérez plusieurs instances Airflow avec configuration persistante :
# Ajouter une nouvelle instance
af instance add prod --url https://airflow.example.com --token "$API_TOKEN"
af instance add staging --url https://staging.example.com --username admin --password admin
# Lister et basculer entre instances
af instance list # Affiche toutes les instances dans un tableau
af instance use prod # Basculer vers l'instance prod
af instance current # Afficher l'instance actuelle
af instance delete old-instance
# Auto-découverte d'instances (utiliser --dry-run pour aperçu)
af instance discover --dry-run # Aperçu de toutes les instances découvrables
af instance discover # Découvrir depuis tous les backends (astro, local)
af instance discover astro # Découvrir uniquement les déploiements Astro
af instance discover astro --all-workspaces # Inclure tous les espaces de travail accessibles
af instance discover local # Scanner les ports Airflow locaux courants
af instance discover local --scan # Scan profond de tous les ports 1024-65535
# IMPORTANT : Toujours exécuter avec --dry-run en premier et demander la
# consentement de l'utilisateur avant de lancer discover sans. Le mode non-dry-run
# crée des jetons API dans Astro Cloud, une action sensible nécessitant une approbation explicite.
# Surcharger l'instance pour une commande unique
af --instance staging dags list
Fichier config : ~/.af/config.yaml (surcharger avec --config ou la variable d'env AF_CONFIG)
Les jetons dans la config peuvent référencer des variables d'environnement en utilisant la syntaxe ${VAR} :
instances:
- name: prod
url: https://airflow.example.com
auth:
token: ${AIRFLOW_API_TOKEN}
Ou utiliser directement les variables d'environnement (pas besoin de fichier config) :
export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
# Ou username/password :
export AIRFLOW_USERNAME=admin
export AIRFLOW_PASSWORD=admin
Ou les flags CLI : af --airflow-url http://localhost:8080 --token "$TOKEN" <command>
Référence rapide
| Commande | Description |
|---|---|
af health |
Vérification de la santé du système |
af dags list |
Lister tous les DAGs |
af dags get <dag_id> |
Obtenir les détails d'un DAG |
af dags explore <dag_id> |
Investigation complète d'un DAG |
af dags source <dag_id> |
Obtenir le code source du DAG |
af dags pause <dag_id> |
Pause l'ordonnancement du DAG |
af dags unpause <dag_id> |
Reprendre l'ordonnancement du DAG |
af dags errors |
Lister les erreurs d'importation |
af dags warnings |
Lister les avertissements DAG |
af dags stats |
Statistiques des exécutions DAG |
af runs list |
Lister les exécutions DAG |
af runs get <dag_id> <run_id> |
Obtenir les détails d'une exécution |
af runs trigger <dag_id> |
Déclencher une exécution DAG |
af runs trigger-wait <dag_id> |
Déclencher et attendre la fin |
af runs delete <dag_id> <run_id> |
Supprimer définitivement une exécution DAG |
af runs clear <dag_id> <run_id> |
Effacer une exécution pour la réexécution |
af runs diagnose <dag_id> <run_id> |
Diagnostiquer une exécution échouée |
af tasks list <dag_id> |
Lister les tâches d'un DAG |
af tasks get <dag_id> <task_id> |
Obtenir la définition d'une tâche |
af tasks instance <dag_id> <run_id> <task_id> |
Obtenir l'instance de tâche |
af tasks logs <dag_id> <run_id> <task_id> |
Obtenir les logs de tâche |
af config version |
Version d'Airflow |
af config show |
Configuration complète |
af config connections |
Lister les connexions |
af config variables |
Lister les variables |
af config variable <key> |
Obtenir une variable spécifique |
af config pools |
Lister les pools |
af config pool <name> |
Obtenir les détails d'un pool |
af config plugins |
Lister les plugins |
af config providers |
Lister les providers |
af config assets |
Lister les assets/datasets |
af api <endpoint> |
Accès direct à l'API REST |
af api ls |
Lister les endpoints API disponibles |
af api ls --filter X |
Lister les endpoints correspondant au motif |
af registry providers |
Lister les providers dans le Registry Airflow |
af registry modules <provider> |
Lister les opérateurs/hooks/sensors/transfers d'un provider |
af registry parameters <provider> |
Signatures de constructeurs (nom, type, défaut, requis) pour les classes d'un provider |
af registry connections <provider> |
Types de connexion exposés par un provider |
Motifs d'intention utilisateur
Prise en main
- "Comment lancer Airflow localement ?" / "Configurer Airflow" -> utiliser la skill managing-astro-local-env (utilise Astro CLI)
- "Créer un nouveau projet Airflow" / "Initialiser le projet" -> utiliser la skill setting-up-astro-project (utilise Astro CLI)
- "Comment installer Airflow ?" / "Démarrer avec Airflow" -> utiliser la skill setting-up-astro-project
Opérations DAG
- "Quels DAGs existent ?" / "Lister tous les DAGs" ->
af dags list - "Parlez-moi du DAG X" / "Qu'est-ce que le DAG Y ?" ->
af dags explore <dag_id> - "Quel est l'ordonnancement du DAG X ?" ->
af dags get <dag_id> - "Montrez-moi le code du DAG X" ->
af dags source <dag_id> - "Arrêter le DAG X" / "Pause ce workflow" ->
af dags pause <dag_id> - "Reprendre le DAG X" ->
af dags unpause <dag_id> - "Y a-t-il des erreurs DAG ?" ->
af dags errors - "Créer un nouveau DAG" / "Écrire un pipeline" -> utiliser la skill authoring-dags
Opérations d'exécution
- "Quelles exécutions ont eu lieu ?" ->
af runs list - "Exécuter le DAG X" / "Déclencher le pipeline" ->
af runs trigger <dag_id> - "Exécuter le DAG X et attendre" ->
af runs trigger-wait <dag_id> - "Pourquoi cette exécution a-t-elle échouée ?" ->
af runs diagnose <dag_id> <run_id> - "Supprimer cette exécution" / "Supprimer l'exécution bloquée" ->
af runs delete <dag_id> <run_id> - "Effacer cette exécution" / "Réessayer cette exécution" / "Relancer ceci" ->
af runs clear <dag_id> <run_id> - "Tester ce DAG et corriger s'il échoue" -> utiliser la skill testing-dags
Opérations de tâche
- "Quelles tâches sont dans le DAG X ?" ->
af tasks list <dag_id> - "Obtenir les logs de tâche" / "Pourquoi la tâche a-t-elle échouée ?" ->
af tasks logs <dag_id> <run_id> <task_id> - "Analyse complète des causes racines" / "Diagnostiquer et corriger" -> utiliser la skill debugging-dags
Opérations de données
- "Les données sont-elles fraîches ?" / "Quand cette table a-t-elle été mise à jour en dernier ?" -> utiliser la skill checking-freshness
- "D'où provient cette donnée ?" -> utiliser la skill tracing-upstream-lineage
- "Qu'est-ce qui dépend de cette table ?" / "Qu'est-ce qui se casse si je change cela ?" -> utiliser la skill tracing-downstream-lineage
Opérations de déploiement
- "Déployer mes DAGs" / "Pousser en production" -> utiliser la skill deploying-airflow
- "Configurer CI/CD" / "Automatiser les déploiements" -> utiliser la skill deploying-airflow
- "Déployer sur Kubernetes" / "Configurer Helm" -> utiliser la skill deploying-airflow
- "astro deploy" / "Déploiement DAGs uniquement" -> utiliser la skill deploying-airflow
Opérations système
- "Quelle version d'Airflow ?" ->
af config version - "Quelles connexions existent ?" ->
af config connections - "Les pools sont-ils pleins ?" ->
af config pools - "Airflow est-il sain ?" ->
af health
Exploration API
- "Quels endpoints API sont disponibles ?" ->
af api ls - "Trouver les endpoints des variables" ->
af api ls --filter variable - "Accéder aux valeurs XCom" / "Obtenir XCom" ->
af api xcom-entries -F dag_id=X -F task_id=Y - "Obtenir les event logs" / "Historique d'audit" ->
af api event-logs -F dag_id=X - "Créer une connexion via API" ->
af api connections -X POST --body '{...}' - "Créer une variable via API" ->
af api variables -X POST -F key=name -f value=val
Découverte du Registry
- "Quels opérateurs le provider X a-t-il ?" ->
af registry modules <provider> - "Quels sont les paramètres du constructeur pour l'opérateur Y ?" ->
af registry parameters <provider> - "Quels providers existent ?" / "Y a-t-il un provider pour Z ?" ->
af registry providers - "Quels types de connexion le provider X expose-t-il ?" ->
af registry connections <provider> - "Écrire un DAG avec un opérateur spécifique" -> utiliser le registry pour vérifier la signature actuelle avant de copier des exemples
Workflows courants
Valider les DAGs avant le déploiement
Si vous utilisez Astro CLI, vous pouvez valider les DAGs sans instance Airflow en cours d'exécution :
# Parser les DAGs pour détecter les erreurs d'importation et de syntaxe
astro dev parse
# Exécuter les tests unitaires
astro dev pytest
Sinon, valider sur une instance en cours d'exécution :
af dags errors # Vérifier les erreurs de parsing/importation
af dags warnings # Vérifier les avertissements de dépréciation
Découvrir les signatures d'opérateurs avant d'écrire du code
Le Registry Airflow à airflow.apache.org/registry est la source d'autorité pour les classes provider et leurs signatures de constructeur actuelles. Préférez-le à la mémoire ou à la documentation périmée lors de l'authoring de DAGs — le registry reflète la version provider en direct.
# Lister tous les providers et choisir celui dont vous avez besoin
af registry providers | jq '.providers[] | {id, name, version}'
# Lister tous les opérateurs / hooks / sensors d'un provider (ex. standard, amazon, google)
af registry modules standard \
| jq '.modules[] | {name, type, import_path, docs_url}'
# Obtenir la signature de constructeur actuelle d'une classe spécifique
af registry parameters standard \
| jq '.classes["airflow.providers.standard.operators.hitl.ApprovalOperator"].parameters'
# Filtrer les modules par substring (utile quand vous connaissez le concept mais pas la classe)
af registry modules standard \
| jq '.modules[] | select(.import_path | test("hitl"))'
Les résultats sont cachés localement : 1 heure pour la dernière version, 30 jours pour les versions épinglées (qui sont immuables). Ajouter --version X.Y.Z à tout appel modules / parameters / connections pour cibler une version spécifique.
Investiguer une exécution échouée
# 1. Lister les exécutions récentes pour trouver l'échec
af runs list --dag-id my_dag
# 2. Diagnostiquer l'exécution spécifique
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00
# 3. Obtenir les logs pour la tâche échouée (à partir de la sortie diagnose)
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data
# 4. Après correction, effacer l'exécution pour réessayer toutes les tâches
af runs clear my_dag manual__2024-01-15T10:00:00+00:00
Vérification de santé du matin
# 1. Santé générale du système
af health
# 2. Vérifier les DAGs cassés
af dags errors
# 3. Vérifier l'utilisation des pools
af config pools
Comprendre un DAG
# Obtenir un aperçu complet (métadonnées + tâches + source)
af dags explore my_dag
Vérifier pourquoi le DAG n'exécute pas
# Vérifier si en pause
af dags get my_dag
# Vérifier les erreurs d'importation
af dags errors
# Vérifier les exécutions récentes
af runs list --dag-id my_dag
Déclencher et surveiller
# Option 1 : Déclencher et attendre (bloquant)
af runs trigger-wait my_dag --timeout 1800
# Option 2 : Déclencher et vérifier plus tard
af runs trigger my_dag
af runs get my_dag <run_id>
Format de sortie
Toutes les commandes produisent du JSON (sauf les commandes instance qui utilisent des tableaux lisibles par l'humain) :
af dags list
# {
# "total_dags": 5,
# "returned_count": 5,
# "dags": [...]
# }
Utiliser jq pour filtrer :
# Trouver les exécutions échouées
af runs list | jq '.dag_runs[] | select(.state == "failed")'
# Obtenir uniquement les IDs de DAG
af dags list | jq '.dags[].dag_id'
# Trouver les DAGs en pause
af dags list | jq '[.dags[] | select(.is_paused == true)]'
Options des logs de tâche
# Obtenir les logs pour une tentative de réessai spécifique
af tasks logs my_dag run_id task_id --try 2
# Obtenir les logs pour un index de tâche mappée
af tasks logs my_dag run_id task_id --map-index 5
Accès direct à l'API avec af api
Utiliser af api pour les endpoints non couverts par les commandes de haut niveau (XCom, event-logs, backfills, etc).
# Découvrir les endpoints disponibles
af api ls
af api ls --filter variable
# Usage basique
af api dags
af api dags -F limit=10 -F only_active=true
af api variables -X POST -F key=my_var -f value="my value"
af api variables/old_var -X DELETE
Syntaxe des champs : -F key=value convertit automatiquement les types, -f key=value garde en tant que chaîne.
Référence complète : Voir api-reference.md pour toutes les options, endpoints courants (XCom, event-logs, backfills), et exemples.
Skills associées
| Skill | Utiliser quand... |
|---|---|
| authoring-dags | Créer ou éditer des fichiers DAG avec les bonnes pratiques |
| testing-dags | Cycles itératifs test -> debug -> correction -> re-test |
| debugging-dags | Analyse approfondie des causes racines et diagnostic d'échec |
| checking-freshness | Vérifier si les données sont à jour ou périmées |
| tracing-upstream-lineage | Trouver d'où proviennent les données |
| tracing-downstream-lineage | Analyse d'impact — qu'est-ce qui se casse si quelque chose change |
| deploying-airflow | Déployer les DAGs en production (Astro, Docker Compose, Kubernetes) |
| migrating-airflow-2-to-3 | Upgrader les DAGs d'Airflow 2.x vers 3.x |
| managing-astro-local-env | Démarrer, arrêter, ou dépanner Airflow localement |
| setting-up-astro-project | Initialiser un nouveau projet Astro/Airflow |