dbt-transformation-patterns

By wshobson · agents

Master dbt (data build tool) for analytics engineering: model organization, testing, documentation, and incremental strategies. Use when building data transformations, creating data models, or implementing analytics engineering best practices.

npx skills add https://github.com/wshobson/agents --skill dbt-transformation-patterns

dbt Transformation Patterns

Production-ready patterns for dbt (data build tool), covering model organization, testing strategies, documentation, and incremental processing.

When to Use This Skill

  • Building data transformation pipelines with dbt
  • Organizing models into staging, intermediate, and marts layers
  • Implementing data quality tests
  • Creating incremental models for large datasets
  • Documenting data models and lineage
  • Setting up dbt project structure

Core Concepts

1. Model Layers (Medallion Architecture)

sources/          Raw data definitions
    ↓
staging/          1:1 mapping with the source, light cleaning
    ↓
intermediate/     Business logic, joins, aggregations
    ↓
marts/            Final analytics tables

2. Naming Conventions

Layer         Prefix      Example
Staging       stg_        stg_stripe__payments
Intermediate  int_        int_payments_pivoted_to_customer
Marts         dim_, fct_  dim_customers, fct_orders

Quick Start

# dbt_project.yml
name: "analytics"
version: "1.0.0"
profile: "analytics"

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]

vars:
  start_date: "2020-01-01"

models:
  analytics:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: analytics

# Project structure
models/
├── staging/
│   ├── stripe/
│   │   ├── _stripe__sources.yml
│   │   ├── _stripe__models.yml
│   │   ├── stg_stripe__customers.sql
│   │   └── stg_stripe__payments.sql
│   └── shopify/
│       ├── _shopify__sources.yml
│       └── stg_shopify__orders.sql
├── intermediate/
│   └── finance/
│       └── int_payments_pivoted_to_customer.sql
└── marts/
    ├── core/
    │   ├── _core__models.yml
    │   ├── dim_customers.sql
    │   └── fct_orders.sql
    └── finance/
        └── fct_revenue.sql

Patterns

Pattern 1: Source Definitions

# models/staging/stripe/_stripe__sources.yml
version: 2

sources:
  - name: stripe
    description: Raw Stripe data loaded via Fivetran
    database: raw
    schema: stripe
    loader: fivetran
    loaded_at_field: _fivetran_synced
    freshness:
      warn_after: { count: 12, period: hour }
      error_after: { count: 24, period: hour }
    tables:
      - name: customers
        description: Stripe customer records
        columns:
          - name: id
            description: Primary key
            tests:
              - unique
              - not_null
          - name: email
            description: Customer email
          - name: created
            description: Account creation timestamp

      - name: payments
        description: Stripe payment transactions
        columns:
          - name: id
            tests:
              - unique
              - not_null
          - name: customer_id
            tests:
              - not_null
              - relationships:
                  to: source('stripe', 'customers')
                  field: id
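
The `freshness` block above compares the newest `loaded_at_field` value against the time of the check. As a rough sketch of the idea (not the exact query dbt generates, which varies by adapter), the check for the `customers` table amounts to something like the following. The `raw.stripe.customers` relation is assembled from the `database` and `schema` settings above.

```sql
-- Illustrative only: approximates what a source freshness check measures.
select
    max(_fivetran_synced) as max_loaded_at,   -- newest row loaded by Fivetran
    current_timestamp as snapshotted_at       -- time of the check
from raw.stripe.customers
```

With the thresholds above, a gap of more than 12 hours between the two timestamps produces a warning and more than 24 hours produces an error. The check is run with `dbt source freshness`.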

Pattern 2: Staging Models

-- models/staging/stripe/stg_stripe__customers.sql
with source as (
    select * from {{ source('stripe', 'customers') }}
),

renamed as (
    select
        -- ids
        id as customer_id,

        -- strings
        lower(email) as email,
        name as customer_name,

        -- timestamps
        created as created_at,

        -- metadata
        _fivetran_synced as _loaded_at

    from source
)

select * from renamed

-- models/staging/stripe/stg_stripe__payments.sql
{{
    config(
        materialized='incremental',
        unique_key='payment_id',
        on_schema_change='append_new_columns'
    )
}}

with source as (
    select * from {{ source('stripe', 'payments') }}

    {% if is_incremental() %}
    where _fivetran_synced > (select max(_loaded_at) from {{ this }})
    {% endif %}
),

renamed as (
    select
        -- ids
        id as payment_id,
        customer_id,
        invoice_id,

        -- amounts (convert cents to dollars)
        amount / 100.0 as amount,
        amount_refunded / 100.0 as amount_refunded,

        -- status
        status as payment_status,

        -- timestamps
        created as created_at,

        -- metadata
        _fivetran_synced as _loaded_at

    from source
)

select * from renamed
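
The mart models later in this document reference `stg_shopify__orders`, which is never shown. A minimal sketch of what it might look like follows. The `shopify.orders` source and every raw column name on the left-hand side of an `as` are assumptions, chosen only so that the output columns match those used by `fct_orders`.

```sql
-- models/staging/shopify/stg_shopify__orders.sql
-- Hypothetical sketch: raw column names are assumed, not taken from a real schema.
with source as (
    select * from {{ source('shopify', 'orders') }}
),

renamed as (
    select
        -- ids
        id as order_id,
        customer_id,

        -- dimensions
        status as order_status,
        fulfillment_status,
        financial_status as payment_status,

        -- measures
        subtotal_price as subtotal,
        total_tax as tax,
        total_shipping as shipping,
        total_price,
        total_discounts as total_discount,
        line_item_count as item_count,

        -- timestamps
        created_at,
        updated_at,
        fulfilled_at

    from source
)

select * from renamed
```

For this to compile, a `shopify` source with an `orders` table would need to be declared in `_shopify__sources.yml`, in the same way as the Stripe source in Pattern 1.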

Pattern 3: Intermediate Models

-- models/intermediate/finance/int_payments_pivoted_to_customer.sql
with payments as (
    select * from {{ ref('stg_stripe__payments') }}
),

customers as (
    select * from {{ ref('stg_stripe__customers') }}
),

payment_summary as (
    select
        customer_id,
        count(*) as total_payments,
        count(case when payment_status = 'succeeded' then 1 end) as successful_payments,
        sum(case when payment_status = 'succeeded' then amount else 0 end) as total_amount_paid,
        min(created_at) as first_payment_at,
        max(created_at) as last_payment_at
    from payments
    group by customer_id
)

select
    customers.customer_id,
    customers.email,
    customers.created_at as customer_created_at,
    coalesce(payment_summary.total_payments, 0) as total_payments,
    coalesce(payment_summary.successful_payments, 0) as successful_payments,
    coalesce(payment_summary.total_amount_paid, 0) as lifetime_value,
    payment_summary.first_payment_at,
    payment_summary.last_payment_at

from customers
left join payment_summary using (customer_id)

Pattern 4: Mart Models (Dimensions and Facts)

-- models/marts/core/dim_customers.sql
{{
    config(
        materialized='table'
    )
}}

with customers as (
    select * from {{ ref('int_payments_pivoted_to_customer') }}
),

orders as (
    select * from {{ ref('stg_shopify__orders') }}
),

order_summary as (
    select
        customer_id,
        count(*) as total_orders,
        sum(total_price) as total_order_value,
        min(created_at) as first_order_at,
        max(created_at) as last_order_at
    from orders
    group by customer_id
),

final as (
    select
        -- surrogate key
        {{ dbt_utils.generate_surrogate_key(['customers.customer_id']) }} as customer_key,

        -- natural key
        customers.customer_id,

        -- attributes
        customers.email,
        customers.customer_created_at,

        -- payment metrics
        customers.total_payments,
        customers.successful_payments,
        customers.lifetime_value,
        customers.first_payment_at,
        customers.last_payment_at,

        -- order metrics
        coalesce(order_summary.total_orders, 0) as total_orders,
        coalesce(order_summary.total_order_value, 0) as total_order_value,
        order_summary.first_order_at,
        order_summary.last_order_at,

        -- calculated fields
        case
            when customers.lifetime_value >= 1000 then 'high'
            when customers.lifetime_value >= 100 then 'medium'
            else 'low'
        end as customer_tier,

        -- timestamps
        current_timestamp as _loaded_at

    from customers
    left join order_summary using (customer_id)
)

select * from final

-- models/marts/core/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge'
    )
}}

with orders as (
    select * from {{ ref('stg_shopify__orders') }}

    {% if is_incremental() %}
    where updated_at > (select max(updated_at) from {{ this }})
    {% endif %}
),

customers as (
    select * from {{ ref('dim_customers') }}
),

final as (
    select
        -- keys
        orders.order_id,
        customers.customer_key,
        orders.customer_id,

        -- dimensions
        orders.order_status,
        orders.fulfillment_status,
        orders.payment_status,

        -- measures
        orders.subtotal,
        orders.tax,
        orders.shipping,
        orders.total_price,
        orders.total_discount,
        orders.item_count,

        -- timestamps
        orders.created_at,
        orders.updated_at,
        orders.fulfilled_at,

        -- metadata
        current_timestamp as _loaded_at

    from orders
    left join customers on orders.customer_id = customers.customer_id
)

select * from final
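
The project structure lists `marts/finance/fct_revenue.sql` without showing it. One possible shape is sketched below, assuming that revenue means succeeded Stripe payments net of refunds, aggregated per day. That definition is an assumption, not something the source specifies.

```sql
-- models/marts/finance/fct_revenue.sql
-- Hypothetical sketch: the revenue definition below is an assumption.
with payments as (
    select * from {{ ref('stg_stripe__payments') }}
    where payment_status = 'succeeded'
),

daily as (
    select
        cast(created_at as date) as revenue_date,
        count(*) as payment_count,
        sum(amount) as gross_revenue,
        sum(coalesce(amount_refunded, 0)) as refunded_amount,
        sum(amount - coalesce(amount_refunded, 0)) as net_revenue
    from payments
    group by cast(created_at as date)
)

select * from daily
```

No `config` block is needed here: the model inherits `materialized: table` from the `marts` settings in `dbt_project.yml`.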

Pattern 5: Tests and Documentation

# models/marts/core/_core__models.yml
version: 2

models:
  - name: dim_customers
    description: Customer dimension with payment and order metrics
    columns:
      - name: customer_key
        description: Surrogate key for the customer dimension
        tests:
          - unique
          - not_null

      - name: customer_id
        description: Natural key from the source system
        tests:
          - unique
          - not_null

      - name: email
        description: Customer email address
        tests:
          - not_null

      - name: customer_tier
        description: Customer value tier based on lifetime value
        tests:
          - accepted_values:
              values: ["high", "medium", "low"]

      - name: lifetime_value
        description: Total amount paid by the customer
        tests:
          - dbt_utils.expression_is_true:
              expression: ">= 0"

  - name: fct_orders
    description: Orders fact table containing all order transactions
    tests:
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_key
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_key
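
Generic tests declared in YAML cover single-column rules. For checks that span several columns, dbt also supports singular tests: a SQL file under the `tests/` directory that passes when it returns zero rows. A minimal sketch follows; the file name is hypothetical, and the rule simply asserts an invariant implied by how `int_payments_pivoted_to_customer` computes its counts.

```sql
-- tests/assert_successful_payments_not_above_total.sql
-- Hypothetical singular test: any row returned counts as a failure.
select
    customer_id,
    total_payments,
    successful_payments
from {{ ref('dim_customers') }}
where successful_payments > total_payments
```

Because `test-paths` is set to `["tests"]` in `dbt_project.yml`, a file like this would be picked up by `dbt test` alongside the YAML-defined tests.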

Pattern 6: Macros and DRY Code

-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
    round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}

-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) %}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name }}
    {%- endif -%}
{% endmacro %}

-- macros/limit_data_in_dev.sql
{% macro limit_data_in_dev(column_name, days=3) %}
    {% if target.name == 'dev' %}
        where {{ column_name }} >= dateadd(day, -{{ days }}, current_date)
    {% endif %}
{% endmacro %}

-- Usage in a model
select * from {{ ref('stg_shopify__orders') }}
{{ limit_data_in_dev('created_at') }}
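
As an illustration of the first macro, the amount columns in `stg_stripe__payments` could call `cents_to_dollars` instead of repeating the division inline. This is a sketch of the select list only, and the choice of `precision=4` for the second column is arbitrary.

```sql
-- Fragment of models/staging/stripe/stg_stripe__payments.sql (sketch)
select
    id as payment_id,
    {{ cents_to_dollars('amount') }} as amount,
    {{ cents_to_dollars('amount_refunded', precision=4) }} as amount_refunded
from {{ source('stripe', 'payments') }}
```

Apart from whitespace, the two macro calls compile to `round(amount / 100.0, 2)` and `round(amount_refunded / 100.0, 4)`, which can be confirmed with `dbt compile`.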

Pattern 7: Incremental Strategies

-- Delete+Insert (the default strategy varies by adapter)
{{
    config(
        materialized='incremental',
        unique_key='id',
        incremental_strategy='delete+insert'
    )
}}

-- Merge (well suited to late-arriving data)
{{
    config(
        materialized='incremental',
        unique_key='id',
        incremental_strategy='merge',
        merge_update_columns=['status', 'amount', 'updated_at']
    )
}}

-- Insert Overwrite (partition-based; partition_by shown in BigQuery syntax)
{{
    config(
        materialized='incremental',
        incremental_strategy='insert_overwrite',
        partition_by={
            "field": "created_date",
            "data_type": "date",
            "granularity": "day"
        }
    )
}}

select
    *,
    date(created_at) as created_date
from {{ ref('stg_events') }}

{% if is_incremental() %}
where date(created_at) >= date_sub(current_date(), interval 3 day)
{% endif %}
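
A strict `>` filter on the maximum timestamp can miss rows that arrive late. One common mitigation is to reprocess a short lookback window and rely on `unique_key` to avoid duplicates. The sketch below assumes a three-day window and uses the cross-database `dbt.dateadd` macro (dbt-core 1.2 and later) so the date arithmetic is not tied to one warehouse.

```sql
-- Sketch: incremental model with a lookback window for late-arriving rows.
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge'
    )
}}

select * from {{ ref('stg_shopify__orders') }}

{% if is_incremental() %}
-- Reprocess the last 3 days (assumed window); merging on order_id prevents duplicates.
where updated_at > (
    select {{ dbt.dateadd('day', -3, 'max(updated_at)') }}
    from {{ this }}
)
{% endif %}
```

The window trades extra scanned data for tolerance of late rows, so its size should reflect how late the source data can realistically arrive. On some adapters the value produced by `dbt.dateadd` may need an explicit cast to match the type of `updated_at`.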

dbt Commands

# Development
dbt run                          # Run all models
dbt run --select staging         # Run staging models only
dbt run --select +fct_orders     # Run fct_orders and its upstream dependencies
dbt run --select fct_orders+     # Run fct_orders and its downstream dependents
dbt run --full-refresh           # Rebuild incremental models from scratch

# Testing
dbt test                         # Run all tests
dbt test --select stg_stripe__payments  # Test a specific model
dbt build                        # Run models, tests, seeds, and snapshots in DAG order

# Documentation
dbt docs generate                # Generate the documentation
dbt docs serve                   # Serve the documentation locally

# Debugging
dbt compile                      # Compile SQL without running it
dbt debug                        # Test the connection
dbt ls --select tag:critical     # List models by tag

Best Practices

Do

  • Use the staging layer - Clean data once, use it everywhere
  • Test aggressively - Not null, unique, relationships
  • Document everything - Column descriptions, model descriptions
  • Use incremental models - For tables > 1M rows
  • Use version control - Keep the dbt project in Git

Don't

  • Don't skip staging - Raw → mart is technical debt
  • Don't hardcode dates - Use {{ var('start_date') }}
  • Don't repeat logic - Extract it into macros
  • Don't test in production - Use the dev target
  • Don't ignore freshness - Monitor source data
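
The `start_date` variable defined in `dbt_project.yml` can stand in for a hardcoded date literal. A minimal sketch, applied to a hypothetical filter on the staged payments:

```sql
-- Sketch: filter on the project variable instead of a hardcoded date.
select *
from {{ ref('stg_stripe__payments') }}
where created_at >= '{{ var("start_date") }}'
```

The value can be overridden at run time, for example with `dbt run --vars '{"start_date": "2023-01-01"}'`. Depending on the warehouse, the string may need an explicit cast to a date or timestamp.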
