dag-factory

Créez des DAGs Apache Airflow de manière déclarative avec des configurations YAML dag-factory. À utiliser lors de la création de templates dag-factory, de la composition de DAGs depuis YAML pour dag-factory, de la configuration des valeurs par défaut/tâches dynamiques/datasets/callbacks pour dag-factory, ou de la validation des configurations dag-factory.

npx skills add https://github.com/astronomer/agents --skill dag-factory

DAG Factory

Vous aidez un utilisateur à construire des DAGs Apache Airflow de manière déclarative avec dag-factory, une bibliothèque qui transforme des fichiers de configuration YAML en DAGs Airflow. Exécutez les étapes dans l'ordre et préférez la configuration la plus simple qui répond aux besoins de l'utilisateur.

Package : dag-factory sur PyPI Repo : https://github.com/astronomer/dag-factory Docs : https://astronomer.github.io/dag-factory/latest/ Cibles : dag-factory v1.0+ uniquement. Pour les projets pré-1.0, consultez reference/migration.md avant d'appliquer les conseils de cette compétence. Prérequis : Python 3.10+, Airflow 2.4+ (Airflow 3 supporté)

Avant de commencer

Confirmez avec l'utilisateur :

  1. Version d'Airflow ≥2.4
  2. Version de Python ≥3.10
  3. Version de dag-factory : cette compétence cible v1.0+. Si le projet est en <1.0, suivez reference/migration.md pour mettre à jour avant de continuer.
  4. Cas d'usage : dag-factory est pour la création déclarative et sans code de DAGs. Si l'utilisateur a besoin de modèles Pythonic réutilisables et validés avec Pydantic, suggérez blueprint à la place. S'il a besoin de toute la flexibilité Python, suggérez la compétence authoring-dags.

Déterminez ce dont l'utilisateur a besoin

Demande de l'utilisateur Action
« Créer un DAG YAML » / « Convertir ce DAG Python en YAML » Allez à Defining a DAG in YAML
« Configurer dag-factory dans mon projet » Allez à Project Setup
« Partager les paramètres par défaut entre les DAGs » / « Définir start_date une fois » Allez à Defaults
« Utiliser un opérateur personnalisé » / « Utiliser KPO / Slack / Snowflake » Allez à Custom & Provider Operators
« Tâches dynamiques / mappées » / « expand / partial » Allez à Dynamic Task Mapping
« Planifier sur dataset » / « Outlets et inlets » Allez à Datasets
« Ajouter un callback » / « Slack en cas d'échec » Allez à Callbacks
« Utiliser une timetable » / « datetime en YAML » / « timedelta en YAML » Allez à Custom Python Objects (__type__)
« Linter mon YAML » / « Valider » Allez à Validation Commands
« Convertir Airflow 2 YAML en Airflow 3 » Allez à Validation Commands (dagfactory convert)
« Migrer depuis dag-factory <1.0 » Consultez reference/migration.md
Erreurs dag-factory / dépannage Allez à Troubleshooting

Project Setup

1. Installer le package

Ajoutez à requirements.txt :

dag-factory>=1.0.0

dag-factory n'installe pas automatiquement les fournisseurs Airflow. Installez tous les packages de fournisseurs que votre YAML référence (par exemple, apache-airflow-providers-slack, apache-airflow-providers-cncf-kubernetes).

2. Créer le Loader

Créez dags/load_dags.py pour que le processeur DAG d'Airflow le détecte :

import os
from pathlib import Path

from dagfactory import load_yaml_dags

CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", "/usr/local/airflow/dags/"))

# Option A: charger tous les *.yml / *.yaml sous un dossier
load_yaml_dags(globals_dict=globals(), dags_folder=str(CONFIG_ROOT_DIR))

# Option B: charger un seul fichier
# load_yaml_dags(globals_dict=globals(), config_filepath=str(CONFIG_ROOT_DIR / "my_dag.yml"))

# Option C: charger depuis un dict en Python
# load_yaml_dags(globals_dict=globals(), config_dict={...})

globals_dict=globals() est obligatoire pour que les objets DAG générés soient enregistrés dans l'espace de noms du module où Airflow peut les découvrir.

3. Vérifier l'installation

dagfactory --version

Defining a DAG in YAML

Chaque clé YAML de niveau supérieur (autre que default) définit un DAG. La clé devient le dag_id. Utilisez le format de liste pour tasks et task_groups — c'est le format recommandé depuis v1.0.0.

# dags/example_dag_factory.yml
default:
  default_args:
    start_date: 2024-11-11

basic_example_dag:
  default_args:
    owner: "custom_owner"
  description: "this is an example dag"
  schedule: "0 3 * * *"
  catchup: false
  task_groups:
    - group_name: "example_task_group"
      tooltip: "this is an example task group"
      dependencies: [task_1]
  tasks:
    - task_id: "task_1"
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo 1"
    - task_id: "task_2"
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]
    - task_id: "task_3"
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo 3"
      dependencies: [task_1]
      task_group_name: "example_task_group"

Champs clés

Champ Objectif
default niveau supérieur Arguments DAG partagés appliqués à chaque DAG dans ce fichier
default_args DAG ou bloc default Standard Airflow default_args (owner, retries, start_date, ...)
schedule DAG Expression cron, preset (@daily), liste Dataset, ou timetable __type__
catchup / description / tags DAG Standard Airflow DAG kwargs
tasks DAG Liste de dicts de tâches ; chacun exige task_id et operator
operator tâche Chemin d'import complet vers la classe d'opérateur (par exemple airflow.operators.bash.BashOperator)
dependencies tâche / task_group Liste des task_ids ou group_names en amont
task_groups DAG Liste de dicts de groupes ; chacun exige group_name
task_group_name tâche Assigne une tâche à un task group

Les tâches n'ont pas besoin d'être ordonnées par dépendance dans le YAML — dag-factory résout la topologie du DAG.

Format Dictionnaire (Hérité)

Le format dictionnaire pré-1.0 (où tasks est un dict indexé par task_id) fonctionne toujours pour la compatibilité rétroactive, mais préférez le format liste pour le nouveau code.


Defaults

Il y a quatre façons de définir les valeurs par défaut, dans l'ordre de précédence (plus haut en premier) :

  1. default_args / clés au niveau DAG à l'intérieur d'un DAG individuel
  2. Le bloc default: de niveau supérieur dans le même fichier YAML
  3. Argument defaults_config_dict= passé à load_yaml_dags
  4. Un fichier defaults.yml (ou defaults.yaml) via defaults_config_path= (ou auto-détecté à côté du YAML du DAG)

Note : les noms d'argument du loader et plusieurs autres noms de champs ont changé en v1.0.0. Consultez reference/migration.md si vous travaillez sur un projet plus ancien.

Bloc default dans le même fichier

Puissant pour créer des modèles avec plusieurs DAGs depuis un fichier :

default:
  default_args:
    owner: "data-team"
    start_date: 2025-01-01
    retries: 2
  catchup: false
  schedule: "@daily"

dag_one:
  description: "first DAG"
  tasks:
    - task_id: t1
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo one"

dag_two:
  description: "second DAG"
  tasks:
    - task_id: t1
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo two"

Fichier defaults.yml

Placez un defaults.yml à côté du YAML du DAG, ou pointez defaults_config_path vers un répertoire parent. dag-factory fusionne tous les fichiers defaults.yml en remontant l'arborescence, avec le fichier le plus proche du YAML du DAG gagnant. Les arguments au niveau DAG (par exemple schedule, catchup) vont à la racine de defaults.yml ; les paramètres par défaut par tâche vont sous default_args.

# defaults.yml
schedule: 0 1 * * *
catchup: false
default_args:
  start_date: '2024-12-31'
  owner: data-team

Custom & Provider Operators

Référencez n'importe quel opérateur par son chemin d'import Python complet. dag-factory passe toutes les autres clés de tâche comme kwargs à cet opérateur.

tasks:
  - task_id: begin
    operator: airflow.providers.standard.operators.empty.EmptyOperator
  - task_id: make_bread
    operator: customized.operators.breakfast_operators.MakeBreadOperator
    bread_type: 'Sourdough'

Le package de l'opérateur doit être installé et importable. Pour Airflow 3, préférez airflow.providers.standard.operators.* aux chemins hérités airflow.operators.* — l'CLI dagfactory convert les réécrit automatiquement.

KubernetesPodOperator

Spécifiez le chemin d'opérateur et passez les kwargs directement. À partir de v1.0, dag-factory ne fait plus de cast de type hérité — utilisez __type__ pour les objets k8s imbriqués.

tasks:
  - task_id: hello-world-pod
    operator: airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator
    image: "python:3.12-slim"
    cmds: ["python", "-c"]
    arguments: ["print('hi')"]
    name: example-pod
    namespace: default
    container_resources:
      __type__: kubernetes.client.models.V1ResourceRequirements
      limits: {cpu: "1", memory: "1024Mi"}
      requests: {cpu: "0.5", memory: "512Mi"}

Dynamic Task Mapping

Utilisez les clés expand et partial sur une tâche pour mapper dynamiquement. dag-factory a deux façons distinctes de référencer la sortie d'une tâche en amont :

  • task_id.output — Référence de style XCom, utilisée à l'intérieur de expand op_args / op_kwargs (et les kwargs équivalents d'autres opérateurs).
  • +task_id — Référence de valeur nue, utilisée quand la valeur se situe directement sous expand (par exemple expand: {number: +numbers_list}) ou comme argument du décorateur TaskFlow.

Ne mélangez pas : +request ne se résoudra pas à l'intérieur de op_args, et request.output ne se résoudra pas comme une valeur expand nue.

dynamic_task_map:
  default_args:
    start_date: 2025-01-01
  schedule: "0 3 * * *"
  tasks:
    - task_id: request
      operator: airflow.providers.standard.operators.python.PythonOperator
      python_callable_name: make_list
      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
    - task_id: process
      operator: airflow.providers.standard.operators.python.PythonOperator
      python_callable_name: consume_value
      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
      partial:
        op_kwargs:
          fixed_param: "test"
      expand:
        op_args: request.output    # XCom-style — utilisé à l'intérieur de op_args / op_kwargs
      dependencies: [request]

Forme de valeur nue (tâches du décorateur TaskFlow, ou tout mapping non-op_args) :

tasks:
  - task_id: numbers_list
    decorator: airflow.sdk.definitions.decorators.task
    python_callable: sample.build_numbers_list
  - task_id: double_number
    decorator: airflow.sdk.definitions.decorators.task
    python_callable: sample.double
    expand:
      number: +numbers_list   # + se résout vers le XComArg de la tâche en amont `numbers_list`

Pour les indices de map nommés (Airflow 2.9+), définissez map_index_template: "{{ task.custom_mapping_key }}" et faites en sorte que le callable assigne context["custom_mapping_key"].

Motifs testés : mapping simple, mapping généré par tâche, mapping répété, partial, mapping multi-paramètres, map_index_template. Non supporté / non testé : mapping sur task groups, zipping, transformation de données extensibles.


Datasets

Utilisez inlets / outlets sur les tâches pour déclarer les producteurs de datasets, et une liste d'URI de datasets comme schedule pour les consommer.

producer_dag:
  default_args:
    start_date: '2024-01-01'
  schedule: "0 5 * * *"
  catchup: false
  tasks:
    - task_id: task_1
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo 1"
      outlets: ['s3://bucket_example/raw/dataset1.json']

consumer_dag:
  default_args:
    start_date: '2024-01-01'
  schedule: ['s3://bucket_example/raw/dataset1.json']
  catchup: false
  tasks:
    - task_id: task_1
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo 'consumer'"

Planification conditionnelle de datasets (Airflow 2.9+ / dag-factory 0.22+)

Imbrication des opérateurs logiques __and__ / __or__ sous la clé datasets.

schedule:
  datasets:
    __or__:
      - __and__:
          - s3://bucket-cjmm/raw/dataset_custom_1
          - s3://bucket-cjmm/raw/dataset_custom_2
      - s3://bucket-cjmm/raw/dataset_custom_3

Callbacks

Trois styles, tous valides au niveau DAG, TaskGroup ou Tâche (ou sous default_args) :

1. Chaîne pointant vers un callable

- task_id: task_1
  operator: airflow.operators.bash.BashOperator
  bash_command: "echo task_1"
  on_failure_callback: include.custom_callbacks.output_standard_message

Avec kwargs :

- task_id: task_2
  operator: airflow.operators.bash.BashOperator
  bash_command: "echo task_2"
  on_success_callback:
    callback: include.custom_callbacks.output_custom_message
    param1: "Task status"
    param2: "Successful!"

2. Chemin de fichier + nom de fonction (pas de kwargs)

- task_id: task_3
  operator: airflow.operators.bash.BashOperator
  bash_command: "echo task_3"
  on_retry_callback_name: output_standard_message
  on_retry_callback_file: /usr/local/airflow/include/custom_callbacks.py

3. Callbacks du fournisseur

- task_id: task_4
  operator: airflow.operators.bash.BashOperator
  bash_command: "echo task_4"
  on_failure_callback:
    callback: airflow.providers.slack.notifications.slack.send_slack_notification
    slack_conn_id: slack_conn_id
    text: ":red_circle: Task Failed."
    channel: "#channel"

Le package du fournisseur doit être installé.


Custom Python Objects (__type__)

Pour tout ce qui n'est pas un scalaire simple — datetime, timedelta, Asset, timetables, objets k8s — utilisez la syntaxe d'objet généralisée :

start_date:
  __type__: datetime.datetime
  year: 2025
  month: 1
  day: 1

execution_timeout:
  __type__: datetime.timedelta
  hours: 1

schedule:
  __type__: airflow.timetables.trigger.CronTriggerTimetable
  cron: "0 1 * * 3"
  timezone: UTC
  • __type__ est le chemin d'import complet vers la classe
  • __args__ est une liste d'arguments positionnels
  • Les autres clés deviennent des arguments de mot-clé
  • Pour les listes d'objets typés, utilisez __type__: builtins.list avec une clé items:

Clés réservées

N'utilisez pas ces clés YAML pour vos propres données — dag-factory les réserve : __type__, __args__, __join__, __and__, __or__. La clé items est aussi réservée quand elle est utilisée à l'intérieur d'un bloc __type__: builtins.list — n'ajoutez pas de champ personnalisé nommé items à une construction de liste typée.


Validation Commands

Après l'installation, l'CLI dagfactory est sur PATH :

Commande Quand l'utiliser
dagfactory --version Confirmer l'installation / la version
dagfactory lint <path> Valider la syntaxe YAML pour un fichier ou un répertoire
dagfactory lint <path> --verbose Afficher un tableau de résultats par fichier
dagfactory convert <path> Afficher les diffs pour migrer les chemins d'import Airflow 2 → 3
dagfactory convert <path> --override Appliquer les conversions sur place

Workflow de validation

# 1. Linter le YAML
dagfactory lint dags/

# 2. Faire analyser par Airflow pour détecter les erreurs d'opérateur/import
#    (utilisateurs Astro CLI)
astro dev parse

dagfactory lint ne vérifie que la syntaxe YAML — les erreurs d'import d'opérateur et les kwargs manquants apparaissent au moment de l'analyse d'Airflow.


Troubleshooting

« Operator not found » / ModuleNotFoundError

Cause : Package du fournisseur non installé, ou chemin d'import incorrect.

Fix : Installez le fournisseur (pip install apache-airflow-providers-...) et vérifiez le chemin. Pour Airflow 3, exécutez dagfactory convert pour mettre à jour les chemins hérités airflow.operators.* vers airflow.providers.standard.operators.*.

Le YAML s'analyse mais le DAG n'apparaît pas dans Airflow

Cause : Fichier loader manquant ou globals_dict=globals() non passé.

Fix : Assurez-vous qu'un fichier Python dans dags/ appelle load_yaml_dags(globals_dict=globals(), ...). Vérifiez astro dev parse (ou airflow dags list-import-errors) pour les erreurs d'analyse.

« Argument is not JSON-serializable » / mauvais type de kwarg

Cause : Une chaîne scalaire est passée là où un objet Python est attendu (par exemple start_date: "2025-01-01" pour un champ qui a besoin d'un datetime).

Fix : Utilisez __type__: datetime.datetime (ou datetime.timedelta etc.) selon Custom Python Objects.

Planification conditionnelle de dataset ignorée

Cause : Airflow <2.9, dag-factory <0.22, ou utilisation des clés héritées !and/!or.

Fix : Mettez à jour et renommez vers __and__ / __or__.

Plusieurs defaults.yml ne fusionnent pas comme prévu

Cause : defaults_config_path ne pointe pas vers un répertoire parent du YAML du DAG.

Fix : Définissez defaults_config_path sur le dossier ancêtre le plus haut que vous voulez inclure ; dag-factory parcourt l'arborescence du fichier DAG → ancêtre et fusionne dans cet ordre, les fichiers les plus proches du DAG gagnant.


Verification Checklist

Avant de terminer, vérifiez avec l'utilisateur :

  • [ ] dagfactory lint dags/ réussit
  • [ ] Le fichier loader existe dans dags/ et appelle load_yaml_dags(globals_dict=globals(), ...)
  • [ ] Les fournisseurs Airflow requis sont dans requirements.txt
  • [ ] Le DAG apparaît dans l'interface utilisateur d'Airflow sans erreurs d'import

Related Skills

  • authoring-dags — Écrire des DAGs Airflow en pur Python avec la validation de l'CLI af. À utiliser quand YAML ne peut pas exprimer ce que vous avez besoin.
  • testing-dags : Pour tester les DAGs, déboguer les défaillances, et la boucle test → correction → retest
  • debugging-dags : Pour dépanner les DAGs défaillants

Reference

Skills similaires