airflow

Interroge, gère et dépanne Apache Airflow via le CLI `af`. Couvre le listing des DAGs, le déclenchement de runs, la lecture des logs de tâches, le diagnostic des échecs, le débogage des erreurs d'import de DAGs, la vérification des connexions, variables, pools et la surveillance de l'état de santé. Redirige également vers des sous-skills pour écrire des DAGs, déboguer, déployer et migrer d'Airflow 2 vers 3. À utiliser quand l'utilisateur mentionne « Airflow », « DAG », « DAG run », « task log », « import error », « parse error », « broken DAG », ou demande à « déclencher un pipeline », « déboguer des erreurs d'import », « vérifier la santé d'Airflow », « lister les connexions », « relancer un run », ou toute opération Airflow. NE PAS utiliser pour l'analytique warehouse/SQL sur les tables de métadonnées Airflow — utiliser analyzing-data à la place.

npx skills add https://github.com/astronomer/agents --skill airflow

Opérations Airflow

Utilisez les commandes af pour interroger, gérer et dépanner les workflows Airflow.

Astro CLI

Astro CLI est la façon recommandée de lancer Airflow en local et de déployer en production. Elle fournit un environnement Airflow containerisé fonctionnant out-of-the-box :

# Initialiser un nouveau projet
astro dev init

# Lancer Airflow localement (webserver sur http://localhost:8080)
astro dev start

# Parser les DAGs pour détecter les erreurs rapidement (pas besoin de lancer Airflow)
astro dev parse

# Exécuter pytest sur vos DAGs
astro dev pytest

# Déployer en production
astro deploy            # Déploiement complet (image + DAGs)
astro deploy --dags     # Déploiement DAGs uniquement (rapide, pas de build d'image)

Pour plus de détails :

  • Nouveau projet ? Voir la skill setting-up-astro-project
  • Environnement local ? Voir la skill managing-astro-local-env
  • Déploiement ? Voir la skill deploying-airflow

Exécuter la CLI

Ces commandes supposent que af est dans le PATH. Lancez-le via astro otto pour l'obtenir automatiquement, ou installez-le en standalone avec uv tool install astro-airflow-mcp.

Configuration des instances

Gérez plusieurs instances Airflow avec configuration persistante :

# Ajouter une nouvelle instance
af instance add prod --url https://airflow.example.com --token "$API_TOKEN"
af instance add staging --url https://staging.example.com --username admin --password admin

# Lister et basculer entre instances
af instance list      # Affiche toutes les instances dans un tableau
af instance use prod  # Basculer vers l'instance prod
af instance current   # Afficher l'instance actuelle
af instance delete old-instance

# Auto-découverte d'instances (utiliser --dry-run pour aperçu)
af instance discover --dry-run        # Aperçu de toutes les instances découvrables
af instance discover                  # Découvrir depuis tous les backends (astro, local)
af instance discover astro            # Découvrir uniquement les déploiements Astro
af instance discover astro --all-workspaces  # Inclure tous les espaces de travail accessibles
af instance discover local            # Scanner les ports Airflow locaux courants
af instance discover local --scan     # Scan profond de tous les ports 1024-65535

# IMPORTANT : Toujours exécuter avec --dry-run en premier et demander la 
# consentement de l'utilisateur avant de lancer discover sans. Le mode non-dry-run 
# crée des jetons API dans Astro Cloud, une action sensible nécessitant une approbation explicite.

# Surcharger l'instance pour une commande unique
af --instance staging dags list

Fichier config : ~/.af/config.yaml (surcharger avec --config ou la variable d'env AF_CONFIG)

Les jetons dans la config peuvent référencer des variables d'environnement en utilisant la syntaxe ${VAR} :

instances:
- name: prod
  url: https://airflow.example.com
  auth:
    token: ${AIRFLOW_API_TOKEN}

Ou utiliser directement les variables d'environnement (pas besoin de fichier config) :

export AIRFLOW_API_URL=http://localhost:8080
export AIRFLOW_AUTH_TOKEN=your-token-here
# Ou username/password :
export AIRFLOW_USERNAME=admin
export AIRFLOW_PASSWORD=admin

Ou les flags CLI : af --airflow-url http://localhost:8080 --token "$TOKEN" <command>

Référence rapide

Commande Description
af health Vérification de la santé du système
af dags list Lister tous les DAGs
af dags get <dag_id> Obtenir les détails d'un DAG
af dags explore <dag_id> Investigation complète d'un DAG
af dags source <dag_id> Obtenir le code source du DAG
af dags pause <dag_id> Pause l'ordonnancement du DAG
af dags unpause <dag_id> Reprendre l'ordonnancement du DAG
af dags errors Lister les erreurs d'importation
af dags warnings Lister les avertissements DAG
af dags stats Statistiques des exécutions DAG
af runs list Lister les exécutions DAG
af runs get <dag_id> <run_id> Obtenir les détails d'une exécution
af runs trigger <dag_id> Déclencher une exécution DAG
af runs trigger-wait <dag_id> Déclencher et attendre la fin
af runs delete <dag_id> <run_id> Supprimer définitivement une exécution DAG
af runs clear <dag_id> <run_id> Effacer une exécution pour la réexécution
af runs diagnose <dag_id> <run_id> Diagnostiquer une exécution échouée
af tasks list <dag_id> Lister les tâches d'un DAG
af tasks get <dag_id> <task_id> Obtenir la définition d'une tâche
af tasks instance <dag_id> <run_id> <task_id> Obtenir l'instance de tâche
af tasks logs <dag_id> <run_id> <task_id> Obtenir les logs de tâche
af config version Version d'Airflow
af config show Configuration complète
af config connections Lister les connexions
af config variables Lister les variables
af config variable <key> Obtenir une variable spécifique
af config pools Lister les pools
af config pool <name> Obtenir les détails d'un pool
af config plugins Lister les plugins
af config providers Lister les providers
af config assets Lister les assets/datasets
af api <endpoint> Accès direct à l'API REST
af api ls Lister les endpoints API disponibles
af api ls --filter X Lister les endpoints correspondant au motif
af registry providers Lister les providers dans le Registry Airflow
af registry modules <provider> Lister les opérateurs/hooks/sensors/transfers d'un provider
af registry parameters <provider> Signatures de constructeurs (nom, type, défaut, requis) pour les classes d'un provider
af registry connections <provider> Types de connexion exposés par un provider

Motifs d'intention utilisateur

Prise en main

  • "Comment lancer Airflow localement ?" / "Configurer Airflow" -> utiliser la skill managing-astro-local-env (utilise Astro CLI)
  • "Créer un nouveau projet Airflow" / "Initialiser le projet" -> utiliser la skill setting-up-astro-project (utilise Astro CLI)
  • "Comment installer Airflow ?" / "Démarrer avec Airflow" -> utiliser la skill setting-up-astro-project

Opérations DAG

  • "Quels DAGs existent ?" / "Lister tous les DAGs" -> af dags list
  • "Parlez-moi du DAG X" / "Qu'est-ce que le DAG Y ?" -> af dags explore <dag_id>
  • "Quel est l'ordonnancement du DAG X ?" -> af dags get <dag_id>
  • "Montrez-moi le code du DAG X" -> af dags source <dag_id>
  • "Arrêter le DAG X" / "Pause ce workflow" -> af dags pause <dag_id>
  • "Reprendre le DAG X" -> af dags unpause <dag_id>
  • "Y a-t-il des erreurs DAG ?" -> af dags errors
  • "Créer un nouveau DAG" / "Écrire un pipeline" -> utiliser la skill authoring-dags

Opérations d'exécution

  • "Quelles exécutions ont eu lieu ?" -> af runs list
  • "Exécuter le DAG X" / "Déclencher le pipeline" -> af runs trigger <dag_id>
  • "Exécuter le DAG X et attendre" -> af runs trigger-wait <dag_id>
  • "Pourquoi cette exécution a-t-elle échouée ?" -> af runs diagnose <dag_id> <run_id>
  • "Supprimer cette exécution" / "Supprimer l'exécution bloquée" -> af runs delete <dag_id> <run_id>
  • "Effacer cette exécution" / "Réessayer cette exécution" / "Relancer ceci" -> af runs clear <dag_id> <run_id>
  • "Tester ce DAG et corriger s'il échoue" -> utiliser la skill testing-dags

Opérations de tâche

  • "Quelles tâches sont dans le DAG X ?" -> af tasks list <dag_id>
  • "Obtenir les logs de tâche" / "Pourquoi la tâche a-t-elle échouée ?" -> af tasks logs <dag_id> <run_id> <task_id>
  • "Analyse complète des causes racines" / "Diagnostiquer et corriger" -> utiliser la skill debugging-dags

Opérations de données

  • "Les données sont-elles fraîches ?" / "Quand cette table a-t-elle été mise à jour en dernier ?" -> utiliser la skill checking-freshness
  • "D'où provient cette donnée ?" -> utiliser la skill tracing-upstream-lineage
  • "Qu'est-ce qui dépend de cette table ?" / "Qu'est-ce qui se casse si je change cela ?" -> utiliser la skill tracing-downstream-lineage

Opérations de déploiement

  • "Déployer mes DAGs" / "Pousser en production" -> utiliser la skill deploying-airflow
  • "Configurer CI/CD" / "Automatiser les déploiements" -> utiliser la skill deploying-airflow
  • "Déployer sur Kubernetes" / "Configurer Helm" -> utiliser la skill deploying-airflow
  • "astro deploy" / "Déploiement DAGs uniquement" -> utiliser la skill deploying-airflow

Opérations système

  • "Quelle version d'Airflow ?" -> af config version
  • "Quelles connexions existent ?" -> af config connections
  • "Les pools sont-ils pleins ?" -> af config pools
  • "Airflow est-il sain ?" -> af health

Exploration API

  • "Quels endpoints API sont disponibles ?" -> af api ls
  • "Trouver les endpoints des variables" -> af api ls --filter variable
  • "Accéder aux valeurs XCom" / "Obtenir XCom" -> af api xcom-entries -F dag_id=X -F task_id=Y
  • "Obtenir les event logs" / "Historique d'audit" -> af api event-logs -F dag_id=X
  • "Créer une connexion via API" -> af api connections -X POST --body '{...}'
  • "Créer une variable via API" -> af api variables -X POST -F key=name -f value=val

Découverte du Registry

  • "Quels opérateurs le provider X a-t-il ?" -> af registry modules <provider>
  • "Quels sont les paramètres du constructeur pour l'opérateur Y ?" -> af registry parameters <provider>
  • "Quels providers existent ?" / "Y a-t-il un provider pour Z ?" -> af registry providers
  • "Quels types de connexion le provider X expose-t-il ?" -> af registry connections <provider>
  • "Écrire un DAG avec un opérateur spécifique" -> utiliser le registry pour vérifier la signature actuelle avant de copier des exemples

Workflows courants

Valider les DAGs avant le déploiement

Si vous utilisez Astro CLI, vous pouvez valider les DAGs sans instance Airflow en cours d'exécution :

# Parser les DAGs pour détecter les erreurs d'importation et de syntaxe
astro dev parse

# Exécuter les tests unitaires
astro dev pytest

Sinon, valider sur une instance en cours d'exécution :

af dags errors     # Vérifier les erreurs de parsing/importation
af dags warnings   # Vérifier les avertissements de dépréciation

Découvrir les signatures d'opérateurs avant d'écrire du code

Le Registry Airflow à airflow.apache.org/registry est la source d'autorité pour les classes provider et leurs signatures de constructeur actuelles. Préférez-le à la mémoire ou à la documentation périmée lors de l'authoring de DAGs — le registry reflète la version provider en direct.

# Lister tous les providers et choisir celui dont vous avez besoin
af registry providers | jq '.providers[] | {id, name, version}'

# Lister tous les opérateurs / hooks / sensors d'un provider (ex. standard, amazon, google)
af registry modules standard \
  | jq '.modules[] | {name, type, import_path, docs_url}'

# Obtenir la signature de constructeur actuelle d'une classe spécifique
af registry parameters standard \
  | jq '.classes["airflow.providers.standard.operators.hitl.ApprovalOperator"].parameters'

# Filtrer les modules par substring (utile quand vous connaissez le concept mais pas la classe)
af registry modules standard \
  | jq '.modules[] | select(.import_path | test("hitl"))'

Les résultats sont cachés localement : 1 heure pour la dernière version, 30 jours pour les versions épinglées (qui sont immuables). Ajouter --version X.Y.Z à tout appel modules / parameters / connections pour cibler une version spécifique.

Investiguer une exécution échouée

# 1. Lister les exécutions récentes pour trouver l'échec
af runs list --dag-id my_dag

# 2. Diagnostiquer l'exécution spécifique
af runs diagnose my_dag manual__2024-01-15T10:00:00+00:00

# 3. Obtenir les logs pour la tâche échouée (à partir de la sortie diagnose)
af tasks logs my_dag manual__2024-01-15T10:00:00+00:00 extract_data

# 4. Après correction, effacer l'exécution pour réessayer toutes les tâches
af runs clear my_dag manual__2024-01-15T10:00:00+00:00

Vérification de santé du matin

# 1. Santé générale du système
af health

# 2. Vérifier les DAGs cassés
af dags errors

# 3. Vérifier l'utilisation des pools
af config pools

Comprendre un DAG

# Obtenir un aperçu complet (métadonnées + tâches + source)
af dags explore my_dag

Vérifier pourquoi le DAG n'exécute pas

# Vérifier si en pause
af dags get my_dag

# Vérifier les erreurs d'importation
af dags errors

# Vérifier les exécutions récentes
af runs list --dag-id my_dag

Déclencher et surveiller

# Option 1 : Déclencher et attendre (bloquant)
af runs trigger-wait my_dag --timeout 1800

# Option 2 : Déclencher et vérifier plus tard
af runs trigger my_dag
af runs get my_dag <run_id>

Format de sortie

Toutes les commandes produisent du JSON (sauf les commandes instance qui utilisent des tableaux lisibles par l'humain) :

af dags list
# {
#   "total_dags": 5,
#   "returned_count": 5,
#   "dags": [...]
# }

Utiliser jq pour filtrer :

# Trouver les exécutions échouées
af runs list | jq '.dag_runs[] | select(.state == "failed")'

# Obtenir uniquement les IDs de DAG
af dags list | jq '.dags[].dag_id'

# Trouver les DAGs en pause
af dags list | jq '[.dags[] | select(.is_paused == true)]'

Options des logs de tâche

# Obtenir les logs pour une tentative de réessai spécifique
af tasks logs my_dag run_id task_id --try 2

# Obtenir les logs pour un index de tâche mappée
af tasks logs my_dag run_id task_id --map-index 5

Accès direct à l'API avec af api

Utiliser af api pour les endpoints non couverts par les commandes de haut niveau (XCom, event-logs, backfills, etc).

# Découvrir les endpoints disponibles
af api ls
af api ls --filter variable

# Usage basique
af api dags
af api dags -F limit=10 -F only_active=true
af api variables -X POST -F key=my_var -f value="my value"
af api variables/old_var -X DELETE

Syntaxe des champs : -F key=value convertit automatiquement les types, -f key=value garde en tant que chaîne.

Référence complète : Voir api-reference.md pour toutes les options, endpoints courants (XCom, event-logs, backfills), et exemples.

Skills associées

Skill Utiliser quand...
authoring-dags Créer ou éditer des fichiers DAG avec les bonnes pratiques
testing-dags Cycles itératifs test -> debug -> correction -> re-test
debugging-dags Analyse approfondie des causes racines et diagnostic d'échec
checking-freshness Vérifier si les données sont à jour ou périmées
tracing-upstream-lineage Trouver d'où proviennent les données
tracing-downstream-lineage Analyse d'impact — qu'est-ce qui se casse si quelque chose change
deploying-airflow Déployer les DAGs en production (Astro, Docker Compose, Kubernetes)
migrating-airflow-2-to-3 Upgrader les DAGs d'Airflow 2.x vers 3.x
managing-astro-local-env Démarrer, arrêter, ou dépanner Airflow localement
setting-up-astro-project Initialiser un nouveau projet Astro/Airflow

Skills similaires