cosmos-dbt-core

À utiliser pour transformer un projet dbt Core en DAG/TaskGroup Airflow avec Astronomer Cosmos. Ne couvre pas dbt Fusion. Avant l'implémentation, vérifier le moteur dbt, l'entrepôt de données, la version d'Airflow, l'environnement d'exécution, le choix DAG vs TaskGroup, et la disponibilité du manifest.

npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-core

Cosmos + dbt Core : Liste de contrôle d'implémentation

Exécutez les étapes dans l'ordre. Préférez la configuration la plus simple qui répond aux contraintes de l'utilisateur.

Note de version : Cette skill cible Cosmos 1.11+ et Airflow 3.x. Si l'utilisateur est sur Airflow 2.x, ajustez les imports en conséquence (voir Appendice A).

Référence : Dernière version stable : https://pypi.org/project/astronomer-cosmos/

Avant de commencer, confirmez : (1) moteur dbt = Core (pas Fusion → utilisez cosmos-dbt-fusion), (2) type d'entrepôt, (3) version Airflow, (4) environnement d'exécution (env Airflow / venv / conteneur), (5) DbtDag vs DbtTaskGroup vs opérateurs individuels, (6) disponibilité du manifest.


1. Configurer le projet (ProjectConfig)

Approche À utiliser quand Paramètre requis
Chemin du projet Fichiers disponibles localement dbt_project_path
Manifest uniquement Chargement dbt_manifest manifest_path + project_name
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # project_name="my_project",  # if using manifest_path without dbt_project_path
    # install_dbt_deps=False,  # if deps precomputed in CI
)

2. Choisir la stratégie d'analyse (RenderConfig)

Choisissez UN mode de chargement selon les contraintes :

Mode de chargement À utiliser quand Entrées requises Contraintes
dbt_manifest Grands projets ; exécution conteneurisée ; plus rapide ProjectConfig.manifest_path Le manifest distant nécessite manifest_conn_id
dbt_ls Sélecteurs complexes ; besoin de sélection native dbt dbt installé OU dbt_executable_path Peut également être utilisé avec exécution conteneurisée
dbt_ls_file Sélection dbt_ls sans exécuter dbt_ls à chaque analyse RenderConfig.dbt_ls_path select/exclude ne fonctionneront pas
automatic (par défaut) Configurations simples ; laisser Cosmos choisir (aucune) Retombée : manifest → dbt_ls → custom

CRITIQUE : Exécution conteneurisée (DOCKER/KUBERNETES/etc.)

from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,  # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)

3. Choisir le mode d'exécution (ExecutionConfig)

Référence : Consultez reference/cosmos-config.md pour des exemples de configuration détaillés par mode.

Choisissez UN mode d'exécution :

Mode d'exécution À utiliser quand Vitesse Configuration requise
WATCHER Plus rapide ; visibilité single dbt build Plus rapide Adaptateur dbt dans env OU dbt_executable_path ou dbt Fusion
WATCHER_KUBERNETES Méthode isolée la plus rapide ; visibilité single dbt build Rapide dbt installé dans le conteneur
LOCAL + DBT_RUNNER dbt + adaptateur dans la même installation Python qu'Airflow Rapide dbt 1.5+ dans requirements.txt
LOCAL + SUBPROCESS dbt + adaptateur disponibles dans le déploiement Airflow, dans une installation Python isolée Moyen dbt_executable_path
AIRFLOW_ASYNC BigQuery + transformations longue durée Rapide Airflow ≥2.8 ; dépendances du provider
KUBERNETES Isolation entre Airflow et dbt Moyen Airflow ≥2.8 ; dépendances du provider
VIRTUALENV Impossible de modifier l'image ; venv à l'exécution Plus lent py_requirements dans operator_args
Autres approches conteneurisées Support isolation Airflow et dbt Moyen configuration conteneur
from cosmos import ExecutionConfig, ExecutionMode

_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.WATCHER,  # or LOCAL, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)

4. Configurer la connexion à l'entrepôt (ProfileConfig)

Référence : Consultez reference/cosmos-config.md pour les options ProfileConfig détaillées et toutes les classes ProfileMapping.

Option A : Connexion Airflow + ProfileMapping (Recommandé)

from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
        profile_args={"schema": "my_schema"},
    ),
)

Option B : profiles.yml existant

CRITIQUE : Ne codez pas en dur les secrets ; utilisez des variables d'environnement.

from cosmos import ProfileConfig

_profile_config = ProfileConfig(
    profile_name="my_profile",
    target_name="dev",
    profiles_yml_filepath="/path/to/profiles.yml",
)

5. Configurer le comportement des tests (RenderConfig)

Référence : Consultez reference/cosmos-config.md pour les options de test détaillées.

TestBehavior Comportement
AFTER_EACH (par défaut) Les tests s'exécutent immédiatement après chaque modèle (par défaut)
BUILD Combiner run + test en un single dbt build
AFTER_ALL Tous les tests après la fin de tous les modèles
NONE Ignorer les tests
from cosmos import RenderConfig, TestBehavior

_render_config = RenderConfig(
    test_behavior=TestBehavior.AFTER_EACH,
)

6. Configurer operator_args

Référence : Consultez reference/cosmos-config.md pour les options operator_args détaillées.

_operator_args = {
    # Paramètres BaseOperator
    "retries": 3,

    # Paramètres spécifiques à Cosmos
    "install_deps": False,
    "full_refresh": False,
    "quiet": True,

    # Variables dbt d'exécution (XCom / params)
    "vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}

7. Assembler DAG / TaskGroup

Option A : DbtDag (Autonome)

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig()
_render_config = RenderConfig()

my_cosmos_dag = DbtDag(
    dag_id="my_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    operator_args={},
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)

Option B : DbtTaskGroup (À l'intérieur d'un DAG existant)

from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        render_config=_render_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()

Option C : Utiliser les opérateurs Cosmos directement

import os
from datetime import datetime
from pathlib import Path
from typing import Any

from airflow import DAG

try:
    from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
    from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)


def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
    """Check if a file exists in the given S3 bucket."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
    print(f"Checking if file {s3_key} exists in S3 bucket...")
    hook = S3Hook(aws_conn_id=aws_conn_id)
    return hook.check_for_key(key=s3_key, bucket_name=bucket_name)


with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
    )

    check_file_uploaded_task = PythonOperator(
        task_id="check_file_uploaded_task",
        python_callable=check_s3_file,
        op_kwargs={
            "aws_conn_id": "aws_s3_conn",
            "bucket_name": "cosmos-artifacts-upload",
            "file_key": "target/run_results.json",
        },
    )

    run_operator = DbtRunLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="run",
        dbt_cmd_flags=["--models", "stg_customers"],
        install_deps=True,
        append_env=True,
    )

    clone_operator = DbtCloneLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="clone",
        dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],
        install_deps=True,
        append_env=True,
    )

    seed_operator >> run_operator >> clone_operator
    seed_operator >> check_file_uploaded_task

Définir les dépendances sur les tâches Cosmos individuelles

from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain

with DbtDag(...) as dag:
    @task
    def upstream_task():
        pass

    _upstream = upstream_task()

    for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
        if dbt_node.resource_type == DbtResourceType.SEED:
            my_dbt_task = dag.tasks_map[unique_id]
            chain(_upstream, my_dbt_task)

8. Vérifications de sécurité

Avant de finaliser, vérifiez :

  • [ ] Le mode d'exécution correspond aux contraintes (AIRFLOW_ASYNC → BigQuery uniquement)
  • [ ] L'adaptateur d'entrepôt installé pour le mode d'exécution choisi
  • [ ] Les secrets via connexions Airflow ou variables env, PAS en texte brut
  • [ ] Le mode de chargement correspond à l'exécution (sélecteurs complexes → dbt_ls)
  • [ ] Les URIs d'assets Airflow 3 si les DAGs en aval sont planifiés sur les assets Cosmos (voir Appendice A)

Appendice A : Compatibilité Airflow 3

Différences d'imports

Airflow 3.x Airflow 2.x
from airflow.sdk import dag, task from airflow.decorators import dag, task
from airflow.sdk import chain from airflow.models.baseoperator import chain

Changement de format URI Asset/Dataset

Cosmos ≤1.9 (Airflow 2 Datasets) :

postgres://0.0.0.0:5434/postgres.public.orders

Cosmos ≥1.10 (Airflow 3 Assets) :

postgres://0.0.0.0:5434/postgres/public/orders

CRITIQUE : Mettez à jour les URIs d'assets lors de la mise à niveau vers Airflow 3.


Appendice B : Extras opérationnels

Mise en cache

Cosmos met en cache les artefacts pour accélérer l'analyse. Activé par défaut.

Référence : https://astronomer.github.io/astronomer-cosmos/configuration/caching.html

Imports optimisés pour la mémoire

AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True

Quand activé :

from cosmos.airflow.dag import DbtDag  # instead of: from cosmos import DbtDag

Upload d'artefacts vers le stockage d'objets

AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
from cosmos.io import upload_to_cloud_storage

my_dag = DbtDag(
    # ...
    operator_args={"callback": upload_to_cloud_storage},
)

Hébergement de dbt Docs (Airflow 3.1+ / Cosmos 1.11+)

AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
    "my_project": {
        "dir": "s3://bucket/docs/",
        "index": "index.html",
        "conn_id": "aws_default",
        "name": "My Project"
    }
}'

Référence : https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html


Skills connexes

  • cosmos-dbt-fusion : Pour les projets dbt Fusion (pas dbt Core)
  • authoring-dags : Motifs généraux d'authoring de DAGs
  • testing-dags : Test de DAGs après leur création

Skills similaires