cosmos-dbt-fusion

À utiliser lors de l'exécution d'un projet dbt Fusion avec Astronomer Cosmos. Couvre la configuration Cosmos 1.11+ pour Fusion sur Snowflake/Databricks avec `ExecutionMode.LOCAL`. Avant de mettre en œuvre, vérifiez que le moteur dbt est Fusion (et non Core), que l'entrepôt de données est pris en charge et que l'exécution locale est acceptable. Ne couvre pas dbt Core.

npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-fusion

Cosmos + dbt Fusion : Checklist d'implémentation

Exécutez les étapes dans l'ordre. Cette skill couvre uniquement les contraintes spécifiques à Fusion.

Note de version : le support dbt Fusion a été introduit dans Cosmos 1.11.0. Nécessite Cosmos ≥1.11.

Référence : Consultez reference/cosmos-config.md pour les détails sur ProfileConfig, operator_args et la compatibilité Airflow 3.

Avant de commencer, confirmez : (1) dbt engine = Fusion (pas Core → utilisez cosmos-dbt-core), (2) warehouse = Snowflake, Databricks, Bigquery et Redshift uniquement.

Contraintes spécifiques à Fusion

Contrainte Détails
Pas d'async AIRFLOW_ASYNC non supporté
Pas de virtualenv Fusion est un binaire, pas un package Python
Support warehouse Snowflake, Databricks, Bigquery et Redshift supportés en phase preview

1. Confirmez la version de Cosmos

CRITIQUE : Cosmos 1.11.0 a introduit la compatibilité dbt Fusion.

# Vérifiez la version installée
pip show astronomer-cosmos

# Installez/mettez à jour si nécessaire
pip install "astronomer-cosmos>=1.11.0"

Valider : pip show astronomer-cosmos rapporte version ≥ 1.11.0


2. Installez le binaire dbt Fusion (OBLIGATOIRE)

dbt Fusion n'est PAS fourni avec Cosmos ou dbt Core. Installez-le dans le runtime/image Airflow.

Déterminez où installer le binaire Fusion (Dockerfile / image de base / runtime).

Exemple d'installation Dockerfile

USER root
RUN apt-get update && apt-get install -y curl
ENV SHELL=/bin/bash
RUN curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update
USER astro

Chemins d'installation courants

Environnement Chemin typique
Astro Runtime /home/astro/.local/bin/dbt
Système-wide /usr/local/bin/dbt

Valider : Le binaire dbt existe au chemin choisi et dbt --version réussit.


3. Choisissez la stratégie de parsing (RenderConfig)

La stratégie de parsing est identique à dbt Core. Choisissez UNE :

Mode de chargement Quand l'utiliser Entrées requises
dbt_manifest Grands projets ; parsing le plus rapide ProjectConfig.manifest_path
dbt_ls Sélecteurs complexes ; nécessite la sélection native dbt Binaire Fusion accessible au scheduler
automatic Configurations simples ; laisser Cosmos décider (aucune)
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.AUTOMATIC,  # ou DBT_MANIFEST, DBT_LS
)

4. Configurez la connexion warehouse (ProfileConfig)

Référence : Consultez reference/cosmos-config.md pour les options ProfileConfig complètes et des exemples.

from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

5. Configurez ExecutionConfig (LOCAL uniquement)

CRITIQUE : dbt Fusion avec Cosmos nécessite ExecutionMode.LOCAL avec dbt_executable_path pointant vers le binaire Fusion.

from cosmos import ExecutionConfig
from cosmos.constants import InvocationMode

_execution_config = ExecutionConfig(
    invocation_mode=InvocationMode.SUBPROCESS,
    dbt_executable_path="/home/astro/.local/bin/dbt",  # OBLIGATOIRE : chemin vers le binaire Fusion
    # execution_mode est LOCAL par défaut - ne pas changer
)

6. Configurez Project (ProjectConfig)

from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # pour le mode de chargement dbt_manifest
    # install_dbt_deps=False,  # si les deps sont précalculées en CI
)

7. Assemblez DAG / TaskGroup

Option A : DbtDag (Autonome)

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig(
    dbt_executable_path="/home/astro/.local/bin/dbt",  # Binaire Fusion
)

_render_config = RenderConfig()

my_fusion_dag = DbtDag(
    dag_id="my_fusion_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)

Option B : DbtTaskGroup (Dans un DAG existant)

from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig(dbt_executable_path="/home/astro/.local/bin/dbt")

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_fusion_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()

8. Validation finale

Avant de finaliser, vérifiez :

  • [ ] Version Cosmos : ≥1.11.0
  • [ ] Binaire Fusion installé : Le chemin existe et est exécutable
  • [ ] Warehouse supporté : Snowflake, Databricks, Bigquery ou Redshift uniquement
  • [ ] Gestion des secrets : Connexions Airflow ou variables d'env, JAMAIS en texte brut

Dépannage

Si l'utilisateur signale des régressions dbt Core après l'activation de Fusion :

AIRFLOW__COSMOS__PRE_DBT_FUSION=1

L'utilisateur doit tester

  • [ ] Le DAG se parse dans l'UI Airflow (aucune erreur d'import/parse-time)
  • [ ] Une exécution manuelle réussit contre le warehouse cible (au moins un modèle)

Référence


Skills connexes

  • cosmos-dbt-core : Pour les projets dbt Core (pas Fusion)
  • authoring-dags : Patterns généraux d'authoring de DAG
  • testing-dags : Tester les DAG après création

Skills similaires