Implémentation Blueprint
Vous aidez un utilisateur à travailler avec Blueprint, un système pour composer des DAGs Airflow à partir de YAML en utilisant des templates Python réutilisables. Exécutez les étapes dans l'ordre et préférez la configuration la plus simple qui répond aux besoins de l'utilisateur.
Package :
airflow-blueprintsur PyPI Repo : https://github.com/astronomer/blueprint Requiert : Python 3.10+, Airflow 2.5+, Blueprint 0.2.0+
Avant de commencer
Confirmez auprès de l'utilisateur :
- Version Airflow ≥2.5
- Version Python ≥3.10
- Cas d'usage : Blueprint est destiné à des templates standardisés et validés. Si l'utilisateur a besoin de la flexibilité complète d'Airflow, suggérez d'écrire les DAGs directement ou d'utiliser DAG Factory à la place.
Déterminez ce dont l'utilisateur a besoin
| Demande utilisateur | Action |
|---|---|
| « Créer un blueprint » / « Définir un template » | Aller à Créer des Blueprints |
| « Créer un DAG à partir de YAML » / « Composer des étapes » | Aller à Composer des DAGs en YAML |
| « Personnaliser les arguments DAG » / « Ajouter des tags au DAG » | Aller à Personnaliser la configuration au niveau du DAG |
| « Surcharger la config à l'exécution » / « Déclencher avec des paramètres » | Aller à Surcharges de paramètres à l'exécution |
| « Post-traiter les DAGs » / « Ajouter un callback » | Aller à Callbacks post-construction |
| « Valider mon YAML » / « Linter le blueprint » | Aller à Commandes de validation |
| « Configurer un blueprint dans mon projet » | Aller à Configuration du projet |
| « Verser mon blueprint » | Aller à Versioning |
| « Générer un schéma » / « Configuration Astro IDE » | Aller à Génération de schéma |
| Erreurs Blueprint / dépannage | Aller à Dépannage |
Configuration du projet
Si l'utilisateur part de zéro, guidez-le dans la configuration :
1. Installer le package
# Ajouter à requirements.txt
airflow-blueprint>=0.2.0
# Ou installer directement
pip install airflow-blueprint
2. Créer le Loader
Créez dags/loader.py :
from blueprint import build_all
build_all()
La configuration au niveau du DAG (schedule, description, tags, default_args, etc.) est gérée via les champs YAML et les templates BlueprintDagArgs — voir Personnaliser la configuration au niveau du DAG.
3. Vérifier l'installation
uvx --from airflow-blueprint blueprint list
Si aucun blueprint n'est trouvé, l'utilisateur doit d'abord créer des classes blueprint.
Créer des Blueprints
Quand l'utilisateur veut créer un nouveau template blueprint :
Structure Blueprint
# dags/templates/my_blueprints.py
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from blueprint import Blueprint, BaseModel, Field
class MyConfig(BaseModel):
# Champ obligatoire avec description (utilisé dans la sortie CLI et le schéma JSON)
source_table: str = Field(description="Source table name")
# Champ optionnel avec défaut et validation
batch_size: int = Field(default=1000, ge=1)
class MyBlueprint(Blueprint[MyConfig]):
"""Docstring becomes blueprint description."""
def render(self, config: MyConfig) -> TaskGroup:
with TaskGroup(group_id=self.step_id) as group:
BashOperator(
task_id="my_task",
bash_command=f"echo '{config.source_table}'"
)
return group
Règles clés
| Élément | Requête |
|---|---|
| Classe Config | Doit hériter de BaseModel |
| Classe Blueprint | Doit hériter de Blueprint[ConfigClass] |
Méthode render() |
Doit retourner TaskGroup ou BaseOperator |
| IDs de tâche | Utiliser self.step_id pour l'ID de groupe/tâche |
Recommandez une validation stricte
Suggérez d'ajouter extra="forbid" pour détecter les fautes de frappe dans YAML :
from pydantic import ConfigDict
class MyConfig(BaseModel):
model_config = ConfigDict(extra="forbid")
# fields...
Composer des DAGs en YAML
Quand l'utilisateur veut créer un DAG à partir de blueprints :
Structure YAML
# dags/my_pipeline.dag.yaml
dag_id: my_pipeline
schedule: "@daily"
description: "My data pipeline"
steps:
step_one:
blueprint: my_blueprint
source_table: raw.customers
batch_size: 500
step_two:
blueprint: another_blueprint
depends_on: [step_one]
target: analytics.output
Par défaut, seuls schedule et description sont supportés comme champs au niveau du DAG (via le DefaultDagArgs intégré). Pour d'autres champs comme tags, default_args, catchup, etc., voir Personnaliser la configuration au niveau du DAG.
Clés réservées dans les étapes
| Clé | Objectif |
|---|---|
blueprint |
Nom du template (obligatoire) |
depends_on |
Liste des noms d'étapes en amont |
version |
Épingler à une version spécifique du blueprint |
Tout le reste est passé à la config du blueprint.
Support Jinja2
YAML supporte le templating Jinja2 avec accès aux variables d'environnement, variables/connexions Airflow, et contexte d'exécution :
dag_id: "{{ env.get('ENV', 'dev') }}_pipeline"
schedule: "{{ var.value.schedule | default('@daily') }}"
steps:
extract:
blueprint: extract
output_path: "/data/{{ context.ds_nodash }}/output.csv"
run_id: "{{ context.dag_run.run_id }}"
Variables de template disponibles :
env— variables d'environnementvar— Variables Airflowconn— Connexions Airflowcontext— proxy qui génère des expressions de template Airflow pour les macros d'exécution (ex.context.ds_nodash,context.dag_run.conf,context.task_instance.xcom_pull(...))
Personnaliser la configuration au niveau du DAG
Par défaut, Blueprint supporte schedule et description comme champs YAML au niveau du DAG. Pour utiliser d'autres arguments du constructeur DAG (tags, default_args, catchup, etc.), définissez une sous-classe BlueprintDagArgs.
Quand l'utiliser
- L'utilisateur veut
tags,default_args,catchup,start_date, ou tout autre kwarg DAG en YAML - L'utilisateur veut dériver les propriétés du DAG à partir de la config (ex. nom d'équipe → owner, tier → retries)
Définir une sous-classe BlueprintDagArgs
# dags/templates/my_dag_args.py
from pydantic import BaseModel
from blueprint import BlueprintDagArgs
class MyDagArgsConfig(BaseModel):
schedule: str | None = None
description: str | None = None
tags: list[str] = []
owner: str = "data-team"
retries: int = 2
class MyDagArgs(BlueprintDagArgs[MyDagArgsConfig]):
def render(self, config: MyDagArgsConfig) -> dict[str, Any]:
return {
"schedule": config.schedule,
"description": config.description,
"tags": config.tags,
"default_args": {
"owner": config.owner,
"retries": config.retries,
},
}
Ensuite en YAML, les champs supplémentaires sont validés par le modèle de config :
dag_id: my_pipeline
schedule: "@daily"
tags: [etl, production]
owner: data-team
retries: 3
steps:
extract:
blueprint: extract
source_table: raw.data
Règles
- Une seule sous-classe
BlueprintDagArgspar projet (lèveMultipleDagArgsErrors'il en existe plusieurs) - La méthode
render()retourne un dictionnaire de kwargs passé au constructeur AirflowDAG() - Si aucune sous-classe personnalisée n'existe, le
DefaultDagArgsintégré est utilisé (supporte uniquementscheduleetdescription)
Surcharges de paramètres à l'exécution
Les champs de config Blueprint peuvent être surchargés au moment du déclenchement du DAG en utilisant les paramètres Airflow. Cela permet aux utilisateurs de personnaliser le comportement lors du déclenchement manuel des DAGs depuis l'interface Airflow.
Utiliser self.param() dans les champs de template d'opérateur
Utilisez self.param("field") dans les champs de template d'opérateur pour rendre un champ de config surcharge à l'exécution :
class ExtractConfig(BaseModel):
query: str = Field(description="SQL query to run")
batch_size: int = Field(default=1000, ge=1)
class Extract(Blueprint[ExtractConfig]):
def render(self, config: ExtractConfig) -> TaskGroup:
with TaskGroup(group_id=self.step_id) as group:
BashOperator(
task_id="run_query",
bash_command=f"run-etl --query {self.param('query')} --batch {self.param('batch_size')}"
)
return group
Utiliser self.resolve_config() dans les callables Python
Pour les callables @task ou PythonOperator, utilisez self.resolve_config() pour fusionner les paramètres d'exécution dans la config :
class Extract(Blueprint[ExtractConfig]):
def render(self, config: ExtractConfig) -> TaskGroup:
bp = self # capture reference for closure
@task(task_id="run_query")
def run_query(**context):
resolved = bp.resolve_config(config, context)
# resolved.query has the runtime override if one was provided
execute(resolved.query, resolved.batch_size)
with TaskGroup(group_id=self.step_id) as group:
run_query()
return group
Fonctionnement
- Les paramètres sont générés automatiquement à partir des modèles de config Pydantic et espacés par étape (ex.
step_name__field) - Les valeurs YAML deviennent des défauts de paramètre ; les métadonnées Pydantic (description, contraintes, valeurs enum) circulent vers le formulaire de déclenchement Airflow
- Les surcharges invalides lèvent
ValidationErrorau moment de l'exécution
Callbacks post-construction
Utilisez on_dag_built pour post-traiter les DAGs après leur construction. C'est utile pour ajouter des tags, des contrôles d'accès, des métadonnées d'audit, ou toute préoccupation transversale.
from pathlib import Path
from blueprint import build_all
def add_audit_tags(dag, yaml_path: Path) -> None:
dag.tags.append("managed-by-blueprint")
dag.tags.append(f"source:{yaml_path.name}")
build_all(on_dag_built=add_audit_tags)
Le callback reçoit :
dag— l'objet AirflowDAGconstruit (mutable)yaml_path— lePathvers le fichier YAML qui a défini le DAG
Commandes de validation
Exécutez les commandes CLI avec uvx :
uvx --from airflow-blueprint blueprint <command>
| Commande | Quand l'utiliser |
|---|---|
blueprint list |
Afficher les blueprints disponibles |
blueprint describe <name> |
Afficher le schéma de config pour un blueprint |
blueprint describe <name> -v N |
Afficher le schéma pour une version spécifique |
blueprint lint |
Valider tous les fichiers *.dag.yaml |
blueprint lint <path> |
Valider un fichier spécifique |
blueprint schema <name> |
Générer un schéma JSON |
blueprint new |
Création interactive de YAML DAG |
Flux de validation
# Vérifier tous les fichiers YAML
blueprint lint
# Sortie attendue pour les fichiers valides :
# PASS customer_pipeline.dag.yaml (dag_id=customer_pipeline)
Versioning
Quand l'utilisateur doit verser les blueprints pour la compatibilité ascendante :
Convention de nommage des versions
- v1 :
MyBlueprint(sans suffixe) - v2 :
MyBlueprintV2 - v3 :
MyBlueprintV3
# v1 - original
class ExtractConfig(BaseModel):
source_table: str
class Extract(Blueprint[ExtractConfig]):
def render(self, config): ...
# v2 - breaking changes, new class
class ExtractV2Config(BaseModel):
sources: list[dict] # Different schema
class ExtractV2(Blueprint[ExtractV2Config]):
def render(self, config): ...
Nom et version explicites
Comme alternative à la convention du nom de classe, les blueprints peuvent définir name et version directement :
class MyCustomExtractor(Blueprint[ExtractV3Config]):
name = "extract"
version = 3
def render(self, config): ...
C'est utile quand le nom de la classe ne suit pas la convention NameV{N} ou quand vous voulez un contrôle plus clair.
Utiliser les versions en YAML
steps:
# Épingler à v1
legacy_extract:
blueprint: extract
version: 1
source_table: raw.data
# Utiliser la plus récente (v2)
new_extract:
blueprint: extract
sources: [{table: orders}]
Génération de schéma
Générez des schémas JSON pour l'autocomplétion d'éditeur ou des outils externes :
# Générer le schéma pour un blueprint
blueprint schema extract > extract.schema.json
Détection automatique de projet Astro
Après avoir créé ou modifié un blueprint, vérifiez automatiquement si le projet est un projet Astro en cherchant un répertoire .astro/ (créé par astro dev init).
Si le projet est un projet Astro, régénérez automatiquement les schémas sans demander :
mkdir -p blueprint/generated-schemas
# Pour chaque nom de `blueprint list` : blueprint schema NAME > blueprint/generated-schemas/NAME.schema.json
L'IDE Astro lit blueprint/generated-schemas/ pour afficher les formulaires de configuration. Garder les schémas synchronisés garantit que le générateur visuel reflète toujours les dernières configs de blueprint.
Si vous ne pouvez pas déterminer si le projet est un projet Astro, posez la question à l'utilisateur une fois et mémorisez-la pour le reste de la session.
Dépannage
« Blueprint not found »
Cause : Classe Blueprint non dans le chemin Python.
Fix : Vérifiez le répertoire de template ou utilisez --template-dir :
blueprint list --template-dir dags/templates/
« Extra inputs are not permitted »
Cause : Faute de frappe dans le nom du champ YAML avec extra="forbid" activé.
Fix : Exécutez blueprint describe <name> pour voir les noms de champs valides.
Le DAG n'apparaît pas dans Airflow
Cause : Loader manquant ou cassé.
Fix : Assurez-vous que dags/loader.py existe et appelle build_all() :
from blueprint import build_all
build_all()
Les erreurs de validation affichées comme des erreurs d'import Airflow
À partir de v0.2.0, les erreurs de validation Pydantic sont affichées comme des erreurs d'import Airflow avec des messages utiles au lieu d'être silencieusement avalées. Le message d'erreur inclut des détails sur les champs manquants, les champs inattendus, et les incompatibilités de type, ainsi qu'un guide pour exécuter blueprint lint ou blueprint describe.
« Cyclic dependency detected »
Cause : Références circulaires dans depends_on.
Fix : Examinez les dépendances d'étape et supprimez les cycles.
« MultipleDagArgsError »
Cause : Plus d'une sous-classe BlueprintDagArgs découverte dans le projet.
Fix : Une seule sous-classe BlueprintDagArgs est autorisée. Supprimez ou fusionnez les doublons.
Déboguer dans l'interface Airflow
Chaque tâche Blueprint a des champs supplémentaires dans Rendered Template :
blueprint_step_config- config YAML résolueblueprint_step_code- source Python du blueprint
Liste de vérification de vérification
Avant de finir, vérifiez avec l'utilisateur :
- [ ]
blueprint listaffiche ses templates - [ ]
blueprint lintpasse pour tous les fichiers YAML - [ ]
dags/loader.pyexiste avecbuild_all() - [ ] Le DAG apparaît dans l'interface Airflow sans erreurs de parsing
Référence
Astro IDE
- Docs Blueprint Astro IDE : https://docs.astronomer.io/astro/ide-blueprint