Cosmos + dbt Core : Liste de contrôle d'implémentation
Exécutez les étapes dans l'ordre. Préférez la configuration la plus simple qui répond aux contraintes de l'utilisateur.
Note de version : Cette skill cible Cosmos 1.11+ et Airflow 3.x. Si l'utilisateur est sur Airflow 2.x, ajustez les imports en conséquence (voir Appendice A).
Référence : Dernière version stable : https://pypi.org/project/astronomer-cosmos/
Avant de commencer, confirmez : (1) moteur dbt = Core (pas Fusion → utilisez cosmos-dbt-fusion), (2) type d'entrepôt, (3) version Airflow, (4) environnement d'exécution (env Airflow / venv / conteneur), (5) DbtDag vs DbtTaskGroup vs opérateurs individuels, (6) disponibilité du manifest.
1. Configurer le projet (ProjectConfig)
| Approche | À utiliser quand | Paramètre requis |
|---|---|---|
| Chemin du projet | Fichiers disponibles localement | dbt_project_path |
| Manifest uniquement | Chargement dbt_manifest |
manifest_path + project_name |
from cosmos import ProjectConfig
_project_config = ProjectConfig(
dbt_project_path="/path/to/dbt/project",
# manifest_path="/path/to/manifest.json", # for dbt_manifest load mode
# project_name="my_project", # if using manifest_path without dbt_project_path
# install_dbt_deps=False, # if deps precomputed in CI
)
2. Choisir la stratégie d'analyse (RenderConfig)
Choisissez UN mode de chargement selon les contraintes :
| Mode de chargement | À utiliser quand | Entrées requises | Contraintes |
|---|---|---|---|
dbt_manifest |
Grands projets ; exécution conteneurisée ; plus rapide | ProjectConfig.manifest_path |
Le manifest distant nécessite manifest_conn_id |
dbt_ls |
Sélecteurs complexes ; besoin de sélection native dbt | dbt installé OU dbt_executable_path |
Peut également être utilisé avec exécution conteneurisée |
dbt_ls_file |
Sélection dbt_ls sans exécuter dbt_ls à chaque analyse | RenderConfig.dbt_ls_path |
select/exclude ne fonctionneront pas |
automatic (par défaut) |
Configurations simples ; laisser Cosmos choisir | (aucune) | Retombée : manifest → dbt_ls → custom |
CRITIQUE : Exécution conteneurisée (
DOCKER/KUBERNETES/etc.)
from cosmos import RenderConfig, LoadMode
_render_config = RenderConfig(
load_method=LoadMode.DBT_MANIFEST, # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)
3. Choisir le mode d'exécution (ExecutionConfig)
Référence : Consultez reference/cosmos-config.md pour des exemples de configuration détaillés par mode.
Choisissez UN mode d'exécution :
| Mode d'exécution | À utiliser quand | Vitesse | Configuration requise |
|---|---|---|---|
WATCHER |
Plus rapide ; visibilité single dbt build |
Plus rapide | Adaptateur dbt dans env OU dbt_executable_path ou dbt Fusion |
WATCHER_KUBERNETES |
Méthode isolée la plus rapide ; visibilité single dbt build |
Rapide | dbt installé dans le conteneur |
LOCAL + DBT_RUNNER |
dbt + adaptateur dans la même installation Python qu'Airflow | Rapide | dbt 1.5+ dans requirements.txt |
LOCAL + SUBPROCESS |
dbt + adaptateur disponibles dans le déploiement Airflow, dans une installation Python isolée | Moyen | dbt_executable_path |
AIRFLOW_ASYNC |
BigQuery + transformations longue durée | Rapide | Airflow ≥2.8 ; dépendances du provider |
KUBERNETES |
Isolation entre Airflow et dbt | Moyen | Airflow ≥2.8 ; dépendances du provider |
VIRTUALENV |
Impossible de modifier l'image ; venv à l'exécution | Plus lent | py_requirements dans operator_args |
| Autres approches conteneurisées | Support isolation Airflow et dbt | Moyen | configuration conteneur |
from cosmos import ExecutionConfig, ExecutionMode
_execution_config = ExecutionConfig(
execution_mode=ExecutionMode.WATCHER, # or LOCAL, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)
4. Configurer la connexion à l'entrepôt (ProfileConfig)
Référence : Consultez reference/cosmos-config.md pour les options ProfileConfig détaillées et toutes les classes ProfileMapping.
Option A : Connexion Airflow + ProfileMapping (Recommandé)
from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
_profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=SnowflakeUserPasswordProfileMapping(
conn_id="snowflake_default",
profile_args={"schema": "my_schema"},
),
)
Option B : profiles.yml existant
CRITIQUE : Ne codez pas en dur les secrets ; utilisez des variables d'environnement.
from cosmos import ProfileConfig
_profile_config = ProfileConfig(
profile_name="my_profile",
target_name="dev",
profiles_yml_filepath="/path/to/profiles.yml",
)
5. Configurer le comportement des tests (RenderConfig)
Référence : Consultez reference/cosmos-config.md pour les options de test détaillées.
| TestBehavior | Comportement |
|---|---|
AFTER_EACH (par défaut) |
Les tests s'exécutent immédiatement après chaque modèle (par défaut) |
BUILD |
Combiner run + test en un single dbt build |
AFTER_ALL |
Tous les tests après la fin de tous les modèles |
NONE |
Ignorer les tests |
from cosmos import RenderConfig, TestBehavior
_render_config = RenderConfig(
test_behavior=TestBehavior.AFTER_EACH,
)
6. Configurer operator_args
Référence : Consultez reference/cosmos-config.md pour les options operator_args détaillées.
_operator_args = {
# Paramètres BaseOperator
"retries": 3,
# Paramètres spécifiques à Cosmos
"install_deps": False,
"full_refresh": False,
"quiet": True,
# Variables dbt d'exécution (XCom / params)
"vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}
7. Assembler DAG / TaskGroup
Option A : DbtDag (Autonome)
from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime
_project_config = ProjectConfig(
dbt_project_path="/usr/local/airflow/dbt/my_project",
)
_profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=SnowflakeUserPasswordProfileMapping(
conn_id="snowflake_default",
),
)
_execution_config = ExecutionConfig()
_render_config = RenderConfig()
my_cosmos_dag = DbtDag(
dag_id="my_cosmos_dag",
project_config=_project_config,
profile_config=_profile_config,
execution_config=_execution_config,
render_config=_render_config,
operator_args={},
start_date=datetime(2025, 1, 1),
schedule="@daily",
)
Option B : DbtTaskGroup (À l'intérieur d'un DAG existant)
from airflow.sdk import dag, task # Airflow 3.x
# from airflow.decorators import dag, task # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime
_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()
@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
@task
def pre_dbt():
return "some_value"
dbt = DbtTaskGroup(
group_id="dbt_project",
project_config=_project_config,
profile_config=_profile_config,
execution_config=_execution_config,
render_config=_render_config,
)
@task
def post_dbt():
pass
chain(pre_dbt(), dbt, post_dbt())
my_dag()
Option C : Utiliser les opérateurs Cosmos directement
import os
from datetime import datetime
from pathlib import Path
from typing import Any
from airflow import DAG
try:
from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
from airflow.operators.python import PythonOperator
from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3
DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"
profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profiles_yml_filepath=DBT_PROFILE_PATH,
)
def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
"""Check if a file exists in the given S3 bucket."""
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
print(f"Checking if file {s3_key} exists in S3 bucket...")
hook = S3Hook(aws_conn_id=aws_conn_id)
return hook.check_for_key(key=s3_key, bucket_name=bucket_name)
with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
seed_operator = DbtSeedLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
task_id="seed",
dbt_cmd_flags=["--select", "raw_customers"],
install_deps=True,
append_env=True,
)
check_file_uploaded_task = PythonOperator(
task_id="check_file_uploaded_task",
python_callable=check_s3_file,
op_kwargs={
"aws_conn_id": "aws_s3_conn",
"bucket_name": "cosmos-artifacts-upload",
"file_key": "target/run_results.json",
},
)
run_operator = DbtRunLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
task_id="run",
dbt_cmd_flags=["--models", "stg_customers"],
install_deps=True,
append_env=True,
)
clone_operator = DbtCloneLocalOperator(
profile_config=profile_config,
project_dir=DBT_PROJ_DIR,
task_id="clone",
dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],
install_deps=True,
append_env=True,
)
seed_operator >> run_operator >> clone_operator
seed_operator >> check_file_uploaded_task
Définir les dépendances sur les tâches Cosmos individuelles
from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain
with DbtDag(...) as dag:
@task
def upstream_task():
pass
_upstream = upstream_task()
for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
if dbt_node.resource_type == DbtResourceType.SEED:
my_dbt_task = dag.tasks_map[unique_id]
chain(_upstream, my_dbt_task)
8. Vérifications de sécurité
Avant de finaliser, vérifiez :
- [ ] Le mode d'exécution correspond aux contraintes (AIRFLOW_ASYNC → BigQuery uniquement)
- [ ] L'adaptateur d'entrepôt installé pour le mode d'exécution choisi
- [ ] Les secrets via connexions Airflow ou variables env, PAS en texte brut
- [ ] Le mode de chargement correspond à l'exécution (sélecteurs complexes → dbt_ls)
- [ ] Les URIs d'assets Airflow 3 si les DAGs en aval sont planifiés sur les assets Cosmos (voir Appendice A)
Appendice A : Compatibilité Airflow 3
Différences d'imports
| Airflow 3.x | Airflow 2.x |
|---|---|
from airflow.sdk import dag, task |
from airflow.decorators import dag, task |
from airflow.sdk import chain |
from airflow.models.baseoperator import chain |
Changement de format URI Asset/Dataset
Cosmos ≤1.9 (Airflow 2 Datasets) :
postgres://0.0.0.0:5434/postgres.public.orders
Cosmos ≥1.10 (Airflow 3 Assets) :
postgres://0.0.0.0:5434/postgres/public/orders
CRITIQUE : Mettez à jour les URIs d'assets lors de la mise à niveau vers Airflow 3.
Appendice B : Extras opérationnels
Mise en cache
Cosmos met en cache les artefacts pour accélérer l'analyse. Activé par défaut.
Référence : https://astronomer.github.io/astronomer-cosmos/configuration/caching.html
Imports optimisés pour la mémoire
AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True
Quand activé :
from cosmos.airflow.dag import DbtDag # instead of: from cosmos import DbtDag
Upload d'artefacts vers le stockage d'objets
AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
from cosmos.io import upload_to_cloud_storage
my_dag = DbtDag(
# ...
operator_args={"callback": upload_to_cloud_storage},
)
Hébergement de dbt Docs (Airflow 3.1+ / Cosmos 1.11+)
AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
"my_project": {
"dir": "s3://bucket/docs/",
"index": "index.html",
"conn_id": "aws_default",
"name": "My Project"
}
}'
Référence : https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html
Skills connexes
- cosmos-dbt-fusion : Pour les projets dbt Fusion (pas dbt Core)
- authoring-dags : Motifs généraux d'authoring de DAGs
- testing-dags : Test de DAGs après leur création