Opérateurs Airflow Human-in-the-Loop
Mettez en pause un DAG jusqu'à ce qu'un humain réponde via l'interface Airflow ou l'API REST. Les opérateurs HITL sont deferrable — ils libèrent leur slot worker pendant l'attente.
Nécessite Airflow 3.1+ (
af config version).Localisation UI : Browse → Required Actions. Répondez depuis l'onglet Required Actions de la page de l'instance de tâche.
Références croisées :
airflow-aipour les décorateurs de tâche IA/LLM ;airflowpour les commandes de découverte de registre et API utilisées ci-dessous.
Étape 1 — Choisissez la capacité dont vous avez besoin
| Capacité | Classe (vérifier à l'étape 2) |
|---|---|
| Approuver ou rejeter ; les tâches en aval sont ignorées en cas de rejet | ApprovalOperator |
| Présenter N options et retourner celles qui ont été choisies | HITLOperator |
| Brancher vers une ou plusieurs tâches en aval selon un choix | HITLBranchOperator |
| Collecter un formulaire (sans étape d'approbation/sélection) | HITLEntryOperator |
| Utiliser le trigger HITL directement (avancé / opérateurs personnalisés) | HITLTrigger |
C'est le seul endroit où les noms de classe sont codés en dur. Le provider ajoute, renomme et supprime des paramètres à chaque version — ne copiez pas les listes de paramètres de mémoire. Récupérez la signature actuelle avant d'écrire du code.
Étape 2 — Découvrez les signatures actuelles depuis le Registre Airflow
Avant d'écrire du code HITL, exécutez ces commandes pour voir la liste en direct et les paramètres de constructeur (consultez la skill airflow pour la référence complète de af registry) :
# Tous les modules liés à HITL du provider standard
af registry modules standard \
| jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, type, import_path, short_description, docs_url}'
# Signatures de constructeur : name, type, default, required, description
af registry parameters standard \
| jq '.classes | to_entries[] | select(.key | test("\\.hitl\\.")) | {fqn: .key, parameters: .value.parameters}'
# Épinglez à la version exacte du provider installé
af config providers \
| jq '.providers[] | select(.package_name == "apache-airflow-providers-standard") | .version'
# ensuite : af registry parameters standard --version <VERSION>
Si le registre affiche un paramètre que cette skill ne mentionne pas, préférez le registre. Si le registre affiche une classe qui ne figure pas à l'étape 1, traitez-la comme additive — le tableau de décision ci-dessus peut être obsolète.
Étape 3 — Exemple canonique (portail d'approbation)
Point de départ pour toute tâche HITL. Adaptez en remplaçant le nom de la classe et les paramètres selon l'étape 2.
from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime
@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
@task
def prepare():
return "Review quarterly report"
approval = ApprovalOperator(
task_id="approve_report",
subject="Report Approval",
body="{{ ti.xcom_pull(task_ids='prepare') }}",
defaults="Approve", # Auto-sélectionné en cas de timeout
params={"comments": Param("", type="string")},
)
@task
def after_approval(result):
print(f"Decision: {result['chosen_options']}")
chain(prepare(), approval)
after_approval(approval.output)
approval_example()
Pour les autres classes de l'étape 1, la structure est la même (task_id, subject, plus les paramètres spécifiques à la classe). Vérifiez chaque constructeur via l'étape 2 — par exemple, HITLBranchOperator nécessite que chaque option corresponde directement à un identifiant de tâche en aval ou soit résolue via un paramètre de mapping exposé dans le registre.
Étape 4 — Contrats de comportement (stables entre les versions)
Timeout
- Avec
defaultsdéfini : la tâche réussit en cas de timeout, option(s) par défaut sélectionnée(s). - Sans
defaults: la tâche échoue en cas de timeout.
Markdown + Jinja dans body
body supporte le Markdown et est templatable en Jinja. Rendez le contexte XCom directement :
body = """**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}
| Category | Amount |
|----------|--------|
| Marketing | $1M |
"""
Callbacks
Tous les opérateurs HITL acceptent les kwargs de callback Airflow standard (on_success_callback, on_failure_callback, etc.).
Notifiers
Les opérateurs HITL acceptent une liste notifiers. Dans la méthode notify(context) d'un notifier, construisez un lien vers la tâche en attente avec HITLOperator.generate_link_to_ui_from_context(context, base_url=...).
Restreindre qui peut répondre
Le nom du paramètre et le format d'identifiant accepté dépendent du gestionnaire d'authentification actif. Ne codez pas en dur — vérifiez lequel est actif et quel kwarg le provider actuel expose :
af config show | jq '.auth_manager // .core.auth_manager'
Ensuite, recherchez le kwarg actuel à l'étape 2 (au moment de la rédaction, c'est assigned_users, acceptant les identifiants dans le format que le gestionnaire d'authentification actif utilise — Astro utilise l'ID utilisateur Astro, FabAuthManager utilise l'email, SimpleAuthManager utilise le nom d'utilisateur).
Étape 5 — Répondre depuis des intégrations externes
Pour les bots Slack, les applications personnalisées ou les scripts. Découvrez le endpoint en direct plutôt que de coder en dur un chemin :
af api ls --filter hitl # liste des endpoints en direct
af api spec \
| jq '.paths | to_entries[] | select(.key | test("hitl"))' # schémas de requête/réponse
Le motif PATCH-pour-répondre est stable ; le chemin exact est découvert. Forme typique :
import os, requests
HOST = os.environ["AIRFLOW_HOST"]
TOKEN = os.environ["AIRFLOW_API_TOKEN"]
HEADERS = {"Authorization": f"Bearer {TOKEN}"}
# Lister les en attente — utilisez le chemin de `af api ls --filter hitl`
requests.get(f"{HOST}/<path>", headers=HEADERS, params={"state": "pending"})
# Répondre — même famille de chemin découverte, PATCH
requests.patch(
f"{HOST}/<path>/{dag_id}/{run_id}/{task_id}",
headers=HEADERS,
json={"chosen_options": ["Approve"], "params_input": {"comments": "ok"}},
)
Étape 6 — Vérifications de sécurité
- [ ] Version Airflow ≥ 3.1 (
af config version). - [ ] Les kwargs du constructeur correspondent à la sortie actuelle du registre de l'étape 2 — pas de dérives du style
respondents-vs-assigned_users. - [ ] Pour le branching : chaque option résout vers un identifiant de tâche en aval (directement ou via le kwarg de mapping de l'étape 2).
- [ ] Chaque valeur dans
defaultsfigure aussi dansoptions. - [ ]
execution_timeoutdéfini ;defaultsconfiguré si le timeout doit réussir plutôt que d'échouer. - [ ] Token API configuré si les répondants externes font partie du flux.
Références
L'URL des docs en amont est exposée par module par le registre — ne la codez pas en dur :
af registry modules standard \
| jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, docs_url}'
Skills connexes
- airflow — référence de commande
af registry,af api,af config. - airflow-ai — décorateurs de tâche IA/LLM et motifs GenAI.
- authoring-dags — bonnes pratiques générales d'écriture de DAG.
- testing-dags — cycles itératifs de test → débogage → correction.