DAG Factory
Vous aidez un utilisateur à construire des DAGs Apache Airflow de manière déclarative avec dag-factory, une bibliothèque qui transforme des fichiers de configuration YAML en DAGs Airflow. Exécutez les étapes dans l'ordre et préférez la configuration la plus simple qui répond aux besoins de l'utilisateur.
Package :
dag-factorysur PyPI Repo : https://github.com/astronomer/dag-factory Docs : https://astronomer.github.io/dag-factory/latest/ Cibles : dag-factory v1.0+ uniquement. Pour les projets pré-1.0, consultez reference/migration.md avant d'appliquer les conseils de cette compétence. Prérequis : Python 3.10+, Airflow 2.4+ (Airflow 3 supporté)
Avant de commencer
Confirmez avec l'utilisateur :
- Version d'Airflow ≥2.4
- Version de Python ≥3.10
- Version de dag-factory : cette compétence cible v1.0+. Si le projet est en <1.0, suivez reference/migration.md pour mettre à jour avant de continuer.
- Cas d'usage : dag-factory est pour la création déclarative et sans code de DAGs. Si l'utilisateur a besoin de modèles Pythonic réutilisables et validés avec Pydantic, suggérez blueprint à la place. S'il a besoin de toute la flexibilité Python, suggérez la compétence authoring-dags.
Déterminez ce dont l'utilisateur a besoin
| Demande de l'utilisateur | Action |
|---|---|
| « Créer un DAG YAML » / « Convertir ce DAG Python en YAML » | Allez à Defining a DAG in YAML |
| « Configurer dag-factory dans mon projet » | Allez à Project Setup |
| « Partager les paramètres par défaut entre les DAGs » / « Définir start_date une fois » | Allez à Defaults |
| « Utiliser un opérateur personnalisé » / « Utiliser KPO / Slack / Snowflake » | Allez à Custom & Provider Operators |
| « Tâches dynamiques / mappées » / « expand / partial » | Allez à Dynamic Task Mapping |
| « Planifier sur dataset » / « Outlets et inlets » | Allez à Datasets |
| « Ajouter un callback » / « Slack en cas d'échec » | Allez à Callbacks |
| « Utiliser une timetable » / « datetime en YAML » / « timedelta en YAML » | Allez à Custom Python Objects (__type__) |
| « Linter mon YAML » / « Valider » | Allez à Validation Commands |
| « Convertir Airflow 2 YAML en Airflow 3 » | Allez à Validation Commands (dagfactory convert) |
| « Migrer depuis dag-factory <1.0 » | Consultez reference/migration.md |
| Erreurs dag-factory / dépannage | Allez à Troubleshooting |
Project Setup
1. Installer le package
Ajoutez à requirements.txt :
dag-factory>=1.0.0
dag-factory n'installe pas automatiquement les fournisseurs Airflow. Installez tous les packages de fournisseurs que votre YAML référence (par exemple, apache-airflow-providers-slack, apache-airflow-providers-cncf-kubernetes).
2. Créer le Loader
Créez dags/load_dags.py pour que le processeur DAG d'Airflow le détecte :
import os
from pathlib import Path
from dagfactory import load_yaml_dags
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", "/usr/local/airflow/dags/"))
# Option A: charger tous les *.yml / *.yaml sous un dossier
load_yaml_dags(globals_dict=globals(), dags_folder=str(CONFIG_ROOT_DIR))
# Option B: charger un seul fichier
# load_yaml_dags(globals_dict=globals(), config_filepath=str(CONFIG_ROOT_DIR / "my_dag.yml"))
# Option C: charger depuis un dict en Python
# load_yaml_dags(globals_dict=globals(), config_dict={...})
globals_dict=globals() est obligatoire pour que les objets DAG générés soient enregistrés dans l'espace de noms du module où Airflow peut les découvrir.
3. Vérifier l'installation
dagfactory --version
Defining a DAG in YAML
Chaque clé YAML de niveau supérieur (autre que default) définit un DAG. La clé devient le dag_id. Utilisez le format de liste pour tasks et task_groups — c'est le format recommandé depuis v1.0.0.
# dags/example_dag_factory.yml
default:
default_args:
start_date: 2024-11-11
basic_example_dag:
default_args:
owner: "custom_owner"
description: "this is an example dag"
schedule: "0 3 * * *"
catchup: false
task_groups:
- group_name: "example_task_group"
tooltip: "this is an example task group"
dependencies: [task_1]
tasks:
- task_id: "task_1"
operator: airflow.operators.bash.BashOperator
bash_command: "echo 1"
- task_id: "task_2"
operator: airflow.operators.bash.BashOperator
bash_command: "echo 2"
dependencies: [task_1]
- task_id: "task_3"
operator: airflow.operators.bash.BashOperator
bash_command: "echo 3"
dependencies: [task_1]
task_group_name: "example_task_group"
Champs clés
| Champ | Où | Objectif |
|---|---|---|
default |
niveau supérieur | Arguments DAG partagés appliqués à chaque DAG dans ce fichier |
default_args |
DAG ou bloc default |
Standard Airflow default_args (owner, retries, start_date, ...) |
schedule |
DAG | Expression cron, preset (@daily), liste Dataset, ou timetable __type__ |
catchup / description / tags |
DAG | Standard Airflow DAG kwargs |
tasks |
DAG | Liste de dicts de tâches ; chacun exige task_id et operator |
operator |
tâche | Chemin d'import complet vers la classe d'opérateur (par exemple airflow.operators.bash.BashOperator) |
dependencies |
tâche / task_group | Liste des task_ids ou group_names en amont |
task_groups |
DAG | Liste de dicts de groupes ; chacun exige group_name |
task_group_name |
tâche | Assigne une tâche à un task group |
Les tâches n'ont pas besoin d'être ordonnées par dépendance dans le YAML — dag-factory résout la topologie du DAG.
Format Dictionnaire (Hérité)
Le format dictionnaire pré-1.0 (où tasks est un dict indexé par task_id) fonctionne toujours pour la compatibilité rétroactive, mais préférez le format liste pour le nouveau code.
Defaults
Il y a quatre façons de définir les valeurs par défaut, dans l'ordre de précédence (plus haut en premier) :
default_args/ clés au niveau DAG à l'intérieur d'un DAG individuel- Le bloc
default:de niveau supérieur dans le même fichier YAML - Argument
defaults_config_dict=passé àload_yaml_dags - Un fichier
defaults.yml(oudefaults.yaml) viadefaults_config_path=(ou auto-détecté à côté du YAML du DAG)
Note : les noms d'argument du loader et plusieurs autres noms de champs ont changé en v1.0.0. Consultez reference/migration.md si vous travaillez sur un projet plus ancien.
Bloc default dans le même fichier
Puissant pour créer des modèles avec plusieurs DAGs depuis un fichier :
default:
default_args:
owner: "data-team"
start_date: 2025-01-01
retries: 2
catchup: false
schedule: "@daily"
dag_one:
description: "first DAG"
tasks:
- task_id: t1
operator: airflow.operators.bash.BashOperator
bash_command: "echo one"
dag_two:
description: "second DAG"
tasks:
- task_id: t1
operator: airflow.operators.bash.BashOperator
bash_command: "echo two"
Fichier defaults.yml
Placez un defaults.yml à côté du YAML du DAG, ou pointez defaults_config_path vers un répertoire parent. dag-factory fusionne tous les fichiers defaults.yml en remontant l'arborescence, avec le fichier le plus proche du YAML du DAG gagnant. Les arguments au niveau DAG (par exemple schedule, catchup) vont à la racine de defaults.yml ; les paramètres par défaut par tâche vont sous default_args.
# defaults.yml
schedule: 0 1 * * *
catchup: false
default_args:
start_date: '2024-12-31'
owner: data-team
Custom & Provider Operators
Référencez n'importe quel opérateur par son chemin d'import Python complet. dag-factory passe toutes les autres clés de tâche comme kwargs à cet opérateur.
tasks:
- task_id: begin
operator: airflow.providers.standard.operators.empty.EmptyOperator
- task_id: make_bread
operator: customized.operators.breakfast_operators.MakeBreadOperator
bread_type: 'Sourdough'
Le package de l'opérateur doit être installé et importable. Pour Airflow 3, préférez airflow.providers.standard.operators.* aux chemins hérités airflow.operators.* — l'CLI dagfactory convert les réécrit automatiquement.
KubernetesPodOperator
Spécifiez le chemin d'opérateur et passez les kwargs directement. À partir de v1.0, dag-factory ne fait plus de cast de type hérité — utilisez __type__ pour les objets k8s imbriqués.
tasks:
- task_id: hello-world-pod
operator: airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator
image: "python:3.12-slim"
cmds: ["python", "-c"]
arguments: ["print('hi')"]
name: example-pod
namespace: default
container_resources:
__type__: kubernetes.client.models.V1ResourceRequirements
limits: {cpu: "1", memory: "1024Mi"}
requests: {cpu: "0.5", memory: "512Mi"}
Dynamic Task Mapping
Utilisez les clés expand et partial sur une tâche pour mapper dynamiquement. dag-factory a deux façons distinctes de référencer la sortie d'une tâche en amont :
task_id.output— Référence de style XCom, utilisée à l'intérieur deexpandop_args/op_kwargs(et les kwargs équivalents d'autres opérateurs).+task_id— Référence de valeur nue, utilisée quand la valeur se situe directement sousexpand(par exempleexpand: {number: +numbers_list}) ou comme argument du décorateur TaskFlow.
Ne mélangez pas : +request ne se résoudra pas à l'intérieur de op_args, et request.output ne se résoudra pas comme une valeur expand nue.
dynamic_task_map:
default_args:
start_date: 2025-01-01
schedule: "0 3 * * *"
tasks:
- task_id: request
operator: airflow.providers.standard.operators.python.PythonOperator
python_callable_name: make_list
python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
- task_id: process
operator: airflow.providers.standard.operators.python.PythonOperator
python_callable_name: consume_value
python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
partial:
op_kwargs:
fixed_param: "test"
expand:
op_args: request.output # XCom-style — utilisé à l'intérieur de op_args / op_kwargs
dependencies: [request]
Forme de valeur nue (tâches du décorateur TaskFlow, ou tout mapping non-op_args) :
tasks:
- task_id: numbers_list
decorator: airflow.sdk.definitions.decorators.task
python_callable: sample.build_numbers_list
- task_id: double_number
decorator: airflow.sdk.definitions.decorators.task
python_callable: sample.double
expand:
number: +numbers_list # + se résout vers le XComArg de la tâche en amont `numbers_list`
Pour les indices de map nommés (Airflow 2.9+), définissez map_index_template: "{{ task.custom_mapping_key }}" et faites en sorte que le callable assigne context["custom_mapping_key"].
Motifs testés : mapping simple, mapping généré par tâche, mapping répété, partial, mapping multi-paramètres, map_index_template.
Non supporté / non testé : mapping sur task groups, zipping, transformation de données extensibles.
Datasets
Utilisez inlets / outlets sur les tâches pour déclarer les producteurs de datasets, et une liste d'URI de datasets comme schedule pour les consommer.
producer_dag:
default_args:
start_date: '2024-01-01'
schedule: "0 5 * * *"
catchup: false
tasks:
- task_id: task_1
operator: airflow.operators.bash.BashOperator
bash_command: "echo 1"
outlets: ['s3://bucket_example/raw/dataset1.json']
consumer_dag:
default_args:
start_date: '2024-01-01'
schedule: ['s3://bucket_example/raw/dataset1.json']
catchup: false
tasks:
- task_id: task_1
operator: airflow.operators.bash.BashOperator
bash_command: "echo 'consumer'"
Planification conditionnelle de datasets (Airflow 2.9+ / dag-factory 0.22+)
Imbrication des opérateurs logiques __and__ / __or__ sous la clé datasets.
schedule:
datasets:
__or__:
- __and__:
- s3://bucket-cjmm/raw/dataset_custom_1
- s3://bucket-cjmm/raw/dataset_custom_2
- s3://bucket-cjmm/raw/dataset_custom_3
Callbacks
Trois styles, tous valides au niveau DAG, TaskGroup ou Tâche (ou sous default_args) :
1. Chaîne pointant vers un callable
- task_id: task_1
operator: airflow.operators.bash.BashOperator
bash_command: "echo task_1"
on_failure_callback: include.custom_callbacks.output_standard_message
Avec kwargs :
- task_id: task_2
operator: airflow.operators.bash.BashOperator
bash_command: "echo task_2"
on_success_callback:
callback: include.custom_callbacks.output_custom_message
param1: "Task status"
param2: "Successful!"
2. Chemin de fichier + nom de fonction (pas de kwargs)
- task_id: task_3
operator: airflow.operators.bash.BashOperator
bash_command: "echo task_3"
on_retry_callback_name: output_standard_message
on_retry_callback_file: /usr/local/airflow/include/custom_callbacks.py
3. Callbacks du fournisseur
- task_id: task_4
operator: airflow.operators.bash.BashOperator
bash_command: "echo task_4"
on_failure_callback:
callback: airflow.providers.slack.notifications.slack.send_slack_notification
slack_conn_id: slack_conn_id
text: ":red_circle: Task Failed."
channel: "#channel"
Le package du fournisseur doit être installé.
Custom Python Objects (__type__)
Pour tout ce qui n'est pas un scalaire simple — datetime, timedelta, Asset, timetables, objets k8s — utilisez la syntaxe d'objet généralisée :
start_date:
__type__: datetime.datetime
year: 2025
month: 1
day: 1
execution_timeout:
__type__: datetime.timedelta
hours: 1
schedule:
__type__: airflow.timetables.trigger.CronTriggerTimetable
cron: "0 1 * * 3"
timezone: UTC
__type__est le chemin d'import complet vers la classe__args__est une liste d'arguments positionnels- Les autres clés deviennent des arguments de mot-clé
- Pour les listes d'objets typés, utilisez
__type__: builtins.listavec une cléitems:
Clés réservées
N'utilisez pas ces clés YAML pour vos propres données — dag-factory les réserve : __type__, __args__, __join__, __and__, __or__. La clé items est aussi réservée quand elle est utilisée à l'intérieur d'un bloc __type__: builtins.list — n'ajoutez pas de champ personnalisé nommé items à une construction de liste typée.
Validation Commands
Après l'installation, l'CLI dagfactory est sur PATH :
| Commande | Quand l'utiliser |
|---|---|
dagfactory --version |
Confirmer l'installation / la version |
dagfactory lint <path> |
Valider la syntaxe YAML pour un fichier ou un répertoire |
dagfactory lint <path> --verbose |
Afficher un tableau de résultats par fichier |
dagfactory convert <path> |
Afficher les diffs pour migrer les chemins d'import Airflow 2 → 3 |
dagfactory convert <path> --override |
Appliquer les conversions sur place |
Workflow de validation
# 1. Linter le YAML
dagfactory lint dags/
# 2. Faire analyser par Airflow pour détecter les erreurs d'opérateur/import
# (utilisateurs Astro CLI)
astro dev parse
dagfactory lint ne vérifie que la syntaxe YAML — les erreurs d'import d'opérateur et les kwargs manquants apparaissent au moment de l'analyse d'Airflow.
Troubleshooting
« Operator not found » / ModuleNotFoundError
Cause : Package du fournisseur non installé, ou chemin d'import incorrect.
Fix : Installez le fournisseur (pip install apache-airflow-providers-...) et vérifiez le chemin. Pour Airflow 3, exécutez dagfactory convert pour mettre à jour les chemins hérités airflow.operators.* vers airflow.providers.standard.operators.*.
Le YAML s'analyse mais le DAG n'apparaît pas dans Airflow
Cause : Fichier loader manquant ou globals_dict=globals() non passé.
Fix : Assurez-vous qu'un fichier Python dans dags/ appelle load_yaml_dags(globals_dict=globals(), ...). Vérifiez astro dev parse (ou airflow dags list-import-errors) pour les erreurs d'analyse.
« Argument is not JSON-serializable » / mauvais type de kwarg
Cause : Une chaîne scalaire est passée là où un objet Python est attendu (par exemple start_date: "2025-01-01" pour un champ qui a besoin d'un datetime).
Fix : Utilisez __type__: datetime.datetime (ou datetime.timedelta etc.) selon Custom Python Objects.
Planification conditionnelle de dataset ignorée
Cause : Airflow <2.9, dag-factory <0.22, ou utilisation des clés héritées !and/!or.
Fix : Mettez à jour et renommez vers __and__ / __or__.
Plusieurs defaults.yml ne fusionnent pas comme prévu
Cause : defaults_config_path ne pointe pas vers un répertoire parent du YAML du DAG.
Fix : Définissez defaults_config_path sur le dossier ancêtre le plus haut que vous voulez inclure ; dag-factory parcourt l'arborescence du fichier DAG → ancêtre et fusionne dans cet ordre, les fichiers les plus proches du DAG gagnant.
Verification Checklist
Avant de terminer, vérifiez avec l'utilisateur :
- [ ]
dagfactory lint dags/réussit - [ ] Le fichier loader existe dans
dags/et appelleload_yaml_dags(globals_dict=globals(), ...) - [ ] Les fournisseurs Airflow requis sont dans
requirements.txt - [ ] Le DAG apparaît dans l'interface utilisateur d'Airflow sans erreurs d'import
Related Skills
- authoring-dags — Écrire des DAGs Airflow en pur Python avec la validation de l'CLI
af. À utiliser quand YAML ne peut pas exprimer ce que vous avez besoin. - testing-dags : Pour tester les DAGs, déboguer les défaillances, et la boucle test → correction → retest
- debugging-dags : Pour dépanner les DAGs défaillants