blueprint

Définissez des modèles de groupes de tâches Airflow réutilisables avec validation Pydantic et composez des DAGs à partir de YAML. À utiliser pour créer des modèles blueprint, composer des DAGs depuis YAML, valider des configurations ou permettre la création de DAGs sans code pour les non-ingénieurs.

npx skills add https://github.com/astronomer/agents --skill blueprint

Implémentation Blueprint

Vous aidez un utilisateur à travailler avec Blueprint, un système pour composer des DAGs Airflow à partir de YAML en utilisant des templates Python réutilisables. Exécutez les étapes dans l'ordre et préférez la configuration la plus simple qui répond aux besoins de l'utilisateur.

Package : airflow-blueprint sur PyPI Repo : https://github.com/astronomer/blueprint Requiert : Python 3.10+, Airflow 2.5+, Blueprint 0.2.0+

Avant de commencer

Confirmez auprès de l'utilisateur :

  1. Version Airflow ≥2.5
  2. Version Python ≥3.10
  3. Cas d'usage : Blueprint est destiné à des templates standardisés et validés. Si l'utilisateur a besoin de la flexibilité complète d'Airflow, suggérez d'écrire les DAGs directement ou d'utiliser DAG Factory à la place.

Déterminez ce dont l'utilisateur a besoin

Demande utilisateur Action
« Créer un blueprint » / « Définir un template » Aller à Créer des Blueprints
« Créer un DAG à partir de YAML » / « Composer des étapes » Aller à Composer des DAGs en YAML
« Personnaliser les arguments DAG » / « Ajouter des tags au DAG » Aller à Personnaliser la configuration au niveau du DAG
« Surcharger la config à l'exécution » / « Déclencher avec des paramètres » Aller à Surcharges de paramètres à l'exécution
« Post-traiter les DAGs » / « Ajouter un callback » Aller à Callbacks post-construction
« Valider mon YAML » / « Linter le blueprint » Aller à Commandes de validation
« Configurer un blueprint dans mon projet » Aller à Configuration du projet
« Verser mon blueprint » Aller à Versioning
« Générer un schéma » / « Configuration Astro IDE » Aller à Génération de schéma
Erreurs Blueprint / dépannage Aller à Dépannage

Configuration du projet

Si l'utilisateur part de zéro, guidez-le dans la configuration :

1. Installer le package

# Ajouter à requirements.txt
airflow-blueprint>=0.2.0

# Ou installer directement
pip install airflow-blueprint

2. Créer le Loader

Créez dags/loader.py :

from blueprint import build_all

build_all()

La configuration au niveau du DAG (schedule, description, tags, default_args, etc.) est gérée via les champs YAML et les templates BlueprintDagArgs — voir Personnaliser la configuration au niveau du DAG.

3. Vérifier l'installation

uvx --from airflow-blueprint blueprint list

Si aucun blueprint n'est trouvé, l'utilisateur doit d'abord créer des classes blueprint.


Créer des Blueprints

Quand l'utilisateur veut créer un nouveau template blueprint :

Structure Blueprint

# dags/templates/my_blueprints.py
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from blueprint import Blueprint, BaseModel, Field

class MyConfig(BaseModel):
    # Champ obligatoire avec description (utilisé dans la sortie CLI et le schéma JSON)
    source_table: str = Field(description="Source table name")
    # Champ optionnel avec défaut et validation
    batch_size: int = Field(default=1000, ge=1)

class MyBlueprint(Blueprint[MyConfig]):
    """Docstring becomes blueprint description."""

    def render(self, config: MyConfig) -> TaskGroup:
        with TaskGroup(group_id=self.step_id) as group:
            BashOperator(
                task_id="my_task",
                bash_command=f"echo '{config.source_table}'"
            )
        return group

Règles clés

Élément Requête
Classe Config Doit hériter de BaseModel
Classe Blueprint Doit hériter de Blueprint[ConfigClass]
Méthode render() Doit retourner TaskGroup ou BaseOperator
IDs de tâche Utiliser self.step_id pour l'ID de groupe/tâche

Recommandez une validation stricte

Suggérez d'ajouter extra="forbid" pour détecter les fautes de frappe dans YAML :

from pydantic import ConfigDict

class MyConfig(BaseModel):
    model_config = ConfigDict(extra="forbid")
    # fields...

Composer des DAGs en YAML

Quand l'utilisateur veut créer un DAG à partir de blueprints :

Structure YAML

# dags/my_pipeline.dag.yaml
dag_id: my_pipeline
schedule: "@daily"
description: "My data pipeline"

steps:
  step_one:
    blueprint: my_blueprint
    source_table: raw.customers
    batch_size: 500

  step_two:
    blueprint: another_blueprint
    depends_on: [step_one]
    target: analytics.output

Par défaut, seuls schedule et description sont supportés comme champs au niveau du DAG (via le DefaultDagArgs intégré). Pour d'autres champs comme tags, default_args, catchup, etc., voir Personnaliser la configuration au niveau du DAG.

Clés réservées dans les étapes

Clé Objectif
blueprint Nom du template (obligatoire)
depends_on Liste des noms d'étapes en amont
version Épingler à une version spécifique du blueprint

Tout le reste est passé à la config du blueprint.

Support Jinja2

YAML supporte le templating Jinja2 avec accès aux variables d'environnement, variables/connexions Airflow, et contexte d'exécution :

dag_id: "{{ env.get('ENV', 'dev') }}_pipeline"
schedule: "{{ var.value.schedule | default('@daily') }}"

steps:
  extract:
    blueprint: extract
    output_path: "/data/{{ context.ds_nodash }}/output.csv"
    run_id: "{{ context.dag_run.run_id }}"

Variables de template disponibles :

  • env — variables d'environnement
  • var — Variables Airflow
  • conn — Connexions Airflow
  • context — proxy qui génère des expressions de template Airflow pour les macros d'exécution (ex. context.ds_nodash, context.dag_run.conf, context.task_instance.xcom_pull(...))

Personnaliser la configuration au niveau du DAG

Par défaut, Blueprint supporte schedule et description comme champs YAML au niveau du DAG. Pour utiliser d'autres arguments du constructeur DAG (tags, default_args, catchup, etc.), définissez une sous-classe BlueprintDagArgs.

Quand l'utiliser

  • L'utilisateur veut tags, default_args, catchup, start_date, ou tout autre kwarg DAG en YAML
  • L'utilisateur veut dériver les propriétés du DAG à partir de la config (ex. nom d'équipe → owner, tier → retries)

Définir une sous-classe BlueprintDagArgs

# dags/templates/my_dag_args.py
from pydantic import BaseModel
from blueprint import BlueprintDagArgs

class MyDagArgsConfig(BaseModel):
    schedule: str | None = None
    description: str | None = None
    tags: list[str] = []
    owner: str = "data-team"
    retries: int = 2

class MyDagArgs(BlueprintDagArgs[MyDagArgsConfig]):
    def render(self, config: MyDagArgsConfig) -> dict[str, Any]:
        return {
            "schedule": config.schedule,
            "description": config.description,
            "tags": config.tags,
            "default_args": {
                "owner": config.owner,
                "retries": config.retries,
            },
        }

Ensuite en YAML, les champs supplémentaires sont validés par le modèle de config :

dag_id: my_pipeline
schedule: "@daily"
tags: [etl, production]
owner: data-team
retries: 3

steps:
  extract:
    blueprint: extract
    source_table: raw.data

Règles

  • Une seule sous-classe BlueprintDagArgs par projet (lève MultipleDagArgsError s'il en existe plusieurs)
  • La méthode render() retourne un dictionnaire de kwargs passé au constructeur Airflow DAG()
  • Si aucune sous-classe personnalisée n'existe, le DefaultDagArgs intégré est utilisé (supporte uniquement schedule et description)

Surcharges de paramètres à l'exécution

Les champs de config Blueprint peuvent être surchargés au moment du déclenchement du DAG en utilisant les paramètres Airflow. Cela permet aux utilisateurs de personnaliser le comportement lors du déclenchement manuel des DAGs depuis l'interface Airflow.

Utiliser self.param() dans les champs de template d'opérateur

Utilisez self.param("field") dans les champs de template d'opérateur pour rendre un champ de config surcharge à l'exécution :

class ExtractConfig(BaseModel):
    query: str = Field(description="SQL query to run")
    batch_size: int = Field(default=1000, ge=1)

class Extract(Blueprint[ExtractConfig]):
    def render(self, config: ExtractConfig) -> TaskGroup:
        with TaskGroup(group_id=self.step_id) as group:
            BashOperator(
                task_id="run_query",
                bash_command=f"run-etl --query {self.param('query')} --batch {self.param('batch_size')}"
            )
        return group

Utiliser self.resolve_config() dans les callables Python

Pour les callables @task ou PythonOperator, utilisez self.resolve_config() pour fusionner les paramètres d'exécution dans la config :

class Extract(Blueprint[ExtractConfig]):
    def render(self, config: ExtractConfig) -> TaskGroup:
        bp = self  # capture reference for closure

        @task(task_id="run_query")
        def run_query(**context):
            resolved = bp.resolve_config(config, context)
            # resolved.query has the runtime override if one was provided
            execute(resolved.query, resolved.batch_size)

        with TaskGroup(group_id=self.step_id) as group:
            run_query()
        return group

Fonctionnement

  • Les paramètres sont générés automatiquement à partir des modèles de config Pydantic et espacés par étape (ex. step_name__field)
  • Les valeurs YAML deviennent des défauts de paramètre ; les métadonnées Pydantic (description, contraintes, valeurs enum) circulent vers le formulaire de déclenchement Airflow
  • Les surcharges invalides lèvent ValidationError au moment de l'exécution

Callbacks post-construction

Utilisez on_dag_built pour post-traiter les DAGs après leur construction. C'est utile pour ajouter des tags, des contrôles d'accès, des métadonnées d'audit, ou toute préoccupation transversale.

from pathlib import Path
from blueprint import build_all

def add_audit_tags(dag, yaml_path: Path) -> None:
    dag.tags.append("managed-by-blueprint")
    dag.tags.append(f"source:{yaml_path.name}")

build_all(on_dag_built=add_audit_tags)

Le callback reçoit :

  • dag — l'objet Airflow DAG construit (mutable)
  • yaml_path — le Path vers le fichier YAML qui a défini le DAG

Commandes de validation

Exécutez les commandes CLI avec uvx :

uvx --from airflow-blueprint blueprint <command>
Commande Quand l'utiliser
blueprint list Afficher les blueprints disponibles
blueprint describe <name> Afficher le schéma de config pour un blueprint
blueprint describe <name> -v N Afficher le schéma pour une version spécifique
blueprint lint Valider tous les fichiers *.dag.yaml
blueprint lint <path> Valider un fichier spécifique
blueprint schema <name> Générer un schéma JSON
blueprint new Création interactive de YAML DAG

Flux de validation

# Vérifier tous les fichiers YAML
blueprint lint

# Sortie attendue pour les fichiers valides :
# PASS customer_pipeline.dag.yaml (dag_id=customer_pipeline)

Versioning

Quand l'utilisateur doit verser les blueprints pour la compatibilité ascendante :

Convention de nommage des versions

  • v1 : MyBlueprint (sans suffixe)
  • v2 : MyBlueprintV2
  • v3 : MyBlueprintV3
# v1 - original
class ExtractConfig(BaseModel):
    source_table: str

class Extract(Blueprint[ExtractConfig]):
    def render(self, config): ...

# v2 - breaking changes, new class
class ExtractV2Config(BaseModel):
    sources: list[dict]  # Different schema

class ExtractV2(Blueprint[ExtractV2Config]):
    def render(self, config): ...

Nom et version explicites

Comme alternative à la convention du nom de classe, les blueprints peuvent définir name et version directement :

class MyCustomExtractor(Blueprint[ExtractV3Config]):
    name = "extract"
    version = 3

    def render(self, config): ...

C'est utile quand le nom de la classe ne suit pas la convention NameV{N} ou quand vous voulez un contrôle plus clair.

Utiliser les versions en YAML

steps:
  # Épingler à v1
  legacy_extract:
    blueprint: extract
    version: 1
    source_table: raw.data

  # Utiliser la plus récente (v2)
  new_extract:
    blueprint: extract
    sources: [{table: orders}]

Génération de schéma

Générez des schémas JSON pour l'autocomplétion d'éditeur ou des outils externes :

# Générer le schéma pour un blueprint
blueprint schema extract > extract.schema.json

Détection automatique de projet Astro

Après avoir créé ou modifié un blueprint, vérifiez automatiquement si le projet est un projet Astro en cherchant un répertoire .astro/ (créé par astro dev init).

Si le projet est un projet Astro, régénérez automatiquement les schémas sans demander :

mkdir -p blueprint/generated-schemas
# Pour chaque nom de `blueprint list` : blueprint schema NAME > blueprint/generated-schemas/NAME.schema.json

L'IDE Astro lit blueprint/generated-schemas/ pour afficher les formulaires de configuration. Garder les schémas synchronisés garantit que le générateur visuel reflète toujours les dernières configs de blueprint.

Si vous ne pouvez pas déterminer si le projet est un projet Astro, posez la question à l'utilisateur une fois et mémorisez-la pour le reste de la session.


Dépannage

« Blueprint not found »

Cause : Classe Blueprint non dans le chemin Python.

Fix : Vérifiez le répertoire de template ou utilisez --template-dir :

blueprint list --template-dir dags/templates/

« Extra inputs are not permitted »

Cause : Faute de frappe dans le nom du champ YAML avec extra="forbid" activé.

Fix : Exécutez blueprint describe <name> pour voir les noms de champs valides.

Le DAG n'apparaît pas dans Airflow

Cause : Loader manquant ou cassé.

Fix : Assurez-vous que dags/loader.py existe et appelle build_all() :

from blueprint import build_all
build_all()

Les erreurs de validation affichées comme des erreurs d'import Airflow

À partir de v0.2.0, les erreurs de validation Pydantic sont affichées comme des erreurs d'import Airflow avec des messages utiles au lieu d'être silencieusement avalées. Le message d'erreur inclut des détails sur les champs manquants, les champs inattendus, et les incompatibilités de type, ainsi qu'un guide pour exécuter blueprint lint ou blueprint describe.

« Cyclic dependency detected »

Cause : Références circulaires dans depends_on.

Fix : Examinez les dépendances d'étape et supprimez les cycles.

« MultipleDagArgsError »

Cause : Plus d'une sous-classe BlueprintDagArgs découverte dans le projet.

Fix : Une seule sous-classe BlueprintDagArgs est autorisée. Supprimez ou fusionnez les doublons.

Déboguer dans l'interface Airflow

Chaque tâche Blueprint a des champs supplémentaires dans Rendered Template :

  • blueprint_step_config - config YAML résolue
  • blueprint_step_code - source Python du blueprint

Liste de vérification de vérification

Avant de finir, vérifiez avec l'utilisateur :

  • [ ] blueprint list affiche ses templates
  • [ ] blueprint lint passe pour tous les fichiers YAML
  • [ ] dags/loader.py existe avec build_all()
  • [ ] Le DAG apparaît dans l'interface Airflow sans erreurs de parsing

Référence

Astro IDE

Skills similaires