airflow-hitl

À utiliser lorsque l'utilisateur a besoin de workflows human-in-the-loop dans Airflow (approbation/rejet, saisie de formulaire, ou branchement piloté par un humain). Couvre ApprovalOperator, HITLOperator, HITLBranchOperator, HITLEntryOperator, HITLTrigger. Nécessite Airflow 3.1+. Ne couvre pas les appels IA/LLM (voir airflow-ai).

npx skills add https://github.com/astronomer/agents --skill airflow-hitl

Opérateurs Airflow Human-in-the-Loop

Mettez en pause un DAG jusqu'à ce qu'un humain réponde via l'interface Airflow ou l'API REST. Les opérateurs HITL sont deferrable — ils libèrent leur slot worker pendant l'attente.

Nécessite Airflow 3.1+ (af config version).

Localisation UI : Browse → Required Actions. Répondez depuis l'onglet Required Actions de la page de l'instance de tâche.

Références croisées : airflow-ai pour les décorateurs de tâche IA/LLM ; airflow pour les commandes de découverte de registre et API utilisées ci-dessous.


Étape 1 — Choisissez la capacité dont vous avez besoin

Capacité Classe (vérifier à l'étape 2)
Approuver ou rejeter ; les tâches en aval sont ignorées en cas de rejet ApprovalOperator
Présenter N options et retourner celles qui ont été choisies HITLOperator
Brancher vers une ou plusieurs tâches en aval selon un choix HITLBranchOperator
Collecter un formulaire (sans étape d'approbation/sélection) HITLEntryOperator
Utiliser le trigger HITL directement (avancé / opérateurs personnalisés) HITLTrigger

C'est le seul endroit où les noms de classe sont codés en dur. Le provider ajoute, renomme et supprime des paramètres à chaque version — ne copiez pas les listes de paramètres de mémoire. Récupérez la signature actuelle avant d'écrire du code.


Étape 2 — Découvrez les signatures actuelles depuis le Registre Airflow

Avant d'écrire du code HITL, exécutez ces commandes pour voir la liste en direct et les paramètres de constructeur (consultez la skill airflow pour la référence complète de af registry) :

# Tous les modules liés à HITL du provider standard
af registry modules standard \
  | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, type, import_path, short_description, docs_url}'

# Signatures de constructeur : name, type, default, required, description
af registry parameters standard \
  | jq '.classes | to_entries[] | select(.key | test("\\.hitl\\.")) | {fqn: .key, parameters: .value.parameters}'

# Épinglez à la version exacte du provider installé
af config providers \
  | jq '.providers[] | select(.package_name == "apache-airflow-providers-standard") | .version'
# ensuite : af registry parameters standard --version <VERSION>

Si le registre affiche un paramètre que cette skill ne mentionne pas, préférez le registre. Si le registre affiche une classe qui ne figure pas à l'étape 1, traitez-la comme additive — le tableau de décision ci-dessus peut être obsolète.


Étape 3 — Exemple canonique (portail d'approbation)

Point de départ pour toute tâche HITL. Adaptez en remplaçant le nom de la classe et les paramètres selon l'étape 2.

from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",              # Auto-sélectionné en cas de timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)

approval_example()

Pour les autres classes de l'étape 1, la structure est la même (task_id, subject, plus les paramètres spécifiques à la classe). Vérifiez chaque constructeur via l'étape 2 — par exemple, HITLBranchOperator nécessite que chaque option corresponde directement à un identifiant de tâche en aval ou soit résolue via un paramètre de mapping exposé dans le registre.


Étape 4 — Contrats de comportement (stables entre les versions)

Timeout

  • Avec defaults défini : la tâche réussit en cas de timeout, option(s) par défaut sélectionnée(s).
  • Sans defaults : la tâche échoue en cas de timeout.

Markdown + Jinja dans body

body supporte le Markdown et est templatable en Jinja. Rendez le contexte XCom directement :

body = """**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}

| Category | Amount |
|----------|--------|
| Marketing | $1M |
"""

Callbacks

Tous les opérateurs HITL acceptent les kwargs de callback Airflow standard (on_success_callback, on_failure_callback, etc.).

Notifiers

Les opérateurs HITL acceptent une liste notifiers. Dans la méthode notify(context) d'un notifier, construisez un lien vers la tâche en attente avec HITLOperator.generate_link_to_ui_from_context(context, base_url=...).

Restreindre qui peut répondre

Le nom du paramètre et le format d'identifiant accepté dépendent du gestionnaire d'authentification actif. Ne codez pas en dur — vérifiez lequel est actif et quel kwarg le provider actuel expose :

af config show | jq '.auth_manager // .core.auth_manager'

Ensuite, recherchez le kwarg actuel à l'étape 2 (au moment de la rédaction, c'est assigned_users, acceptant les identifiants dans le format que le gestionnaire d'authentification actif utilise — Astro utilise l'ID utilisateur Astro, FabAuthManager utilise l'email, SimpleAuthManager utilise le nom d'utilisateur).


Étape 5 — Répondre depuis des intégrations externes

Pour les bots Slack, les applications personnalisées ou les scripts. Découvrez le endpoint en direct plutôt que de coder en dur un chemin :

af api ls --filter hitl           # liste des endpoints en direct
af api spec \
  | jq '.paths | to_entries[] | select(.key | test("hitl"))'   # schémas de requête/réponse

Le motif PATCH-pour-répondre est stable ; le chemin exact est découvert. Forme typique :

import os, requests

HOST = os.environ["AIRFLOW_HOST"]
TOKEN = os.environ["AIRFLOW_API_TOKEN"]
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

# Lister les en attente — utilisez le chemin de `af api ls --filter hitl`
requests.get(f"{HOST}/<path>", headers=HEADERS, params={"state": "pending"})

# Répondre — même famille de chemin découverte, PATCH
requests.patch(
    f"{HOST}/<path>/{dag_id}/{run_id}/{task_id}",
    headers=HEADERS,
    json={"chosen_options": ["Approve"], "params_input": {"comments": "ok"}},
)

Étape 6 — Vérifications de sécurité

  • [ ] Version Airflow ≥ 3.1 (af config version).
  • [ ] Les kwargs du constructeur correspondent à la sortie actuelle du registre de l'étape 2 — pas de dérives du style respondents-vs-assigned_users.
  • [ ] Pour le branching : chaque option résout vers un identifiant de tâche en aval (directement ou via le kwarg de mapping de l'étape 2).
  • [ ] Chaque valeur dans defaults figure aussi dans options.
  • [ ] execution_timeout défini ; defaults configuré si le timeout doit réussir plutôt que d'échouer.
  • [ ] Token API configuré si les répondants externes font partie du flux.

Références

L'URL des docs en amont est exposée par module par le registre — ne la codez pas en dur :

af registry modules standard \
  | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, docs_url}'

Skills connexes

  • airflow — référence de commande af registry, af api, af config.
  • airflow-ai — décorateurs de tâche IA/LLM et motifs GenAI.
  • authoring-dags — bonnes pratiques générales d'écriture de DAG.
  • testing-dags — cycles itératifs de test → débogage → correction.

Skills similaires