Motifs de Transformation dbt
Motifs prêts pour la production avec dbt (data build tool) incluant l'organisation des modèles, les stratégies de test, la documentation et le traitement incrémental.
Quand utiliser cette compétence
- Construire des pipelines de transformation de données avec dbt
- Organiser les modèles en couches staging, intermediate et marts
- Implémenter des tests de qualité des données
- Créer des modèles incrémentiels pour les grands ensembles de données
- Documenter les modèles de données et la lignée
- Configurer la structure du projet dbt
Concepts fondamentaux
1. Couches de modèles (Architecture Medallion)
sources/ Définitions des données brutes
↓
staging/ Correspondance 1:1 avec la source, nettoyage léger
↓
intermediate/ Logique métier, jointures, agrégations
↓
marts/ Tables d'analytics finales
2. Conventions de nommage
| Couche | Préfixe | Exemple |
|---|---|---|
| Staging | stg_ |
stg_stripe__payments |
| Intermediate | int_ |
int_payments_pivoted |
| Marts | dim_, fct_ |
dim_customers, fct_orders |
Démarrage rapide
# dbt_project.yml
name: "analytics"
version: "1.0.0"
profile: "analytics"
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
vars:
start_date: "2020-01-01"
models:
analytics:
staging:
+materialized: view
+schema: staging
intermediate:
+materialized: ephemeral
marts:
+materialized: table
+schema: analytics
# Structure du projet
models/
├── staging/
│ ├── stripe/
│ │ ├── _stripe__sources.yml
│ │ ├── _stripe__models.yml
│ │ ├── stg_stripe__customers.sql
│ │ └── stg_stripe__payments.sql
│ └── shopify/
│ ├── _shopify__sources.yml
│ └── stg_shopify__orders.sql
├── intermediate/
│ └── finance/
│ └── int_payments_pivoted.sql
└── marts/
├── core/
│ ├── _core__models.yml
│ ├── dim_customers.sql
│ └── fct_orders.sql
└── finance/
└── fct_revenue.sql
Motifs
Motif 1 : Définitions des sources
# models/staging/stripe/_stripe__sources.yml
version: 2
sources:
- name: stripe
description: Données brutes de Stripe chargées via Fivetran
database: raw
schema: stripe
loader: fivetran
loaded_at_field: _fivetran_synced
freshness:
warn_after: { count: 12, period: hour }
error_after: { count: 24, period: hour }
tables:
- name: customers
description: Enregistrements de clients Stripe
columns:
- name: id
description: Clé primaire
tests:
- unique
- not_null
- name: email
description: Email du client
- name: created
description: Timestamp de création du compte
- name: payments
description: Transactions de paiement Stripe
columns:
- name: id
tests:
- unique
- not_null
- name: customer_id
tests:
- not_null
- relationships:
to: source('stripe', 'customers')
field: id
Motif 2 : Modèles de staging
-- models/staging/stripe/stg_stripe__customers.sql
with source as (
select * from {{ source('stripe', 'customers') }}
),
renamed as (
select
-- ids
id as customer_id,
-- strings
lower(email) as email,
name as customer_name,
-- timestamps
created as created_at,
-- metadata
_fivetran_synced as _loaded_at
from source
)
select * from renamed
-- models/staging/stripe/stg_stripe__payments.sql
{{
config(
materialized='incremental',
unique_key='payment_id',
on_schema_change='append_new_columns'
)
}}
with source as (
select * from {{ source('stripe', 'payments') }}
{% if is_incremental() %}
where _fivetran_synced > (select max(_loaded_at) from {{ this }})
{% endif %}
),
renamed as (
select
-- ids
id as payment_id,
customer_id,
invoice_id,
-- amounts (convert cents to dollars)
amount / 100.0 as amount,
amount_refunded / 100.0 as amount_refunded,
-- status
status as payment_status,
-- timestamps
created as created_at,
-- metadata
_fivetran_synced as _loaded_at
from source
)
select * from renamed
Motif 3 : Modèles intermédiaires
-- models/intermediate/finance/int_payments_pivoted_to_customer.sql
with payments as (
select * from {{ ref('stg_stripe__payments') }}
),
customers as (
select * from {{ ref('stg_stripe__customers') }}
),
payment_summary as (
select
customer_id,
count(*) as total_payments,
count(case when payment_status = 'succeeded' then 1 end) as successful_payments,
sum(case when payment_status = 'succeeded' then amount else 0 end) as total_amount_paid,
min(created_at) as first_payment_at,
max(created_at) as last_payment_at
from payments
group by customer_id
)
select
customers.customer_id,
customers.email,
customers.created_at as customer_created_at,
coalesce(payment_summary.total_payments, 0) as total_payments,
coalesce(payment_summary.successful_payments, 0) as successful_payments,
coalesce(payment_summary.total_amount_paid, 0) as lifetime_value,
payment_summary.first_payment_at,
payment_summary.last_payment_at
from customers
left join payment_summary using (customer_id)
Motif 4 : Modèles Mart (Dimensions et Faits)
-- models/marts/core/dim_customers.sql
{{
config(
materialized='table',
unique_key='customer_id'
)
}}
with customers as (
select * from {{ ref('int_payments_pivoted_to_customer') }}
),
orders as (
select * from {{ ref('stg_shopify__orders') }}
),
order_summary as (
select
customer_id,
count(*) as total_orders,
sum(total_price) as total_order_value,
min(created_at) as first_order_at,
max(created_at) as last_order_at
from orders
group by customer_id
),
final as (
select
-- surrogate key
{{ dbt_utils.generate_surrogate_key(['customers.customer_id']) }} as customer_key,
-- natural key
customers.customer_id,
-- attributes
customers.email,
customers.customer_created_at,
-- payment metrics
customers.total_payments,
customers.successful_payments,
customers.lifetime_value,
customers.first_payment_at,
customers.last_payment_at,
-- order metrics
coalesce(order_summary.total_orders, 0) as total_orders,
coalesce(order_summary.total_order_value, 0) as total_order_value,
order_summary.first_order_at,
order_summary.last_order_at,
-- calculated fields
case
when customers.lifetime_value >= 1000 then 'high'
when customers.lifetime_value >= 100 then 'medium'
else 'low'
end as customer_tier,
-- timestamps
current_timestamp as _loaded_at
from customers
left join order_summary using (customer_id)
)
select * from final
-- models/marts/core/fct_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge'
)
}}
with orders as (
select * from {{ ref('stg_shopify__orders') }}
{% if is_incremental() %}
where updated_at > (select max(updated_at) from {{ this }})
{% endif %}
),
customers as (
select * from {{ ref('dim_customers') }}
),
final as (
select
-- keys
orders.order_id,
customers.customer_key,
orders.customer_id,
-- dimensions
orders.order_status,
orders.fulfillment_status,
orders.payment_status,
-- measures
orders.subtotal,
orders.tax,
orders.shipping,
orders.total_price,
orders.total_discount,
orders.item_count,
-- timestamps
orders.created_at,
orders.updated_at,
orders.fulfilled_at,
-- metadata
current_timestamp as _loaded_at
from orders
left join customers on orders.customer_id = customers.customer_id
)
select * from final
Motif 5 : Tests et documentation
# models/marts/core/_core__models.yml
version: 2
models:
- name: dim_customers
description: Dimension client avec métriques de paiement et de commande
columns:
- name: customer_key
description: Clé de substitution pour la dimension client
tests:
- unique
- not_null
- name: customer_id
description: Clé naturelle du système source
tests:
- unique
- not_null
- name: email
description: Adresse email du client
tests:
- not_null
- name: customer_tier
description: Tier de valeur client basé sur la valeur de vie
tests:
- accepted_values:
values: ["high", "medium", "low"]
- name: lifetime_value
description: Montant total payé par le client
tests:
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: fct_orders
description: Table de faits de commandes avec tous les transactions de commandes
tests:
- dbt_utils.recency:
datepart: day
field: created_at
interval: 1
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_key
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_key
Motif 6 : Macros et code DRY
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}
-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) %}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name }}
{%- endif -%}
{% endmacro %}
-- macros/limit_data_in_dev.sql
{% macro limit_data_in_dev(column_name, days=3) %}
{% if target.name == 'dev' %}
where {{ column_name }} >= dateadd(day, -{{ days }}, current_date)
{% endif %}
{% endmacro %}
-- Usage in model
select * from {{ ref('stg_orders') }}
{{ limit_data_in_dev('created_at') }}
Motif 7 : Stratégies incrémentielles
-- Delete+Insert (par défaut pour la plupart des entrepôts)
{{
config(
materialized='incremental',
unique_key='id',
incremental_strategy='delete+insert'
)
}}
-- Merge (optimal pour les données en retard)
{{
config(
materialized='incremental',
unique_key='id',
incremental_strategy='merge',
merge_update_columns=['status', 'amount', 'updated_at']
)
}}
-- Insert Overwrite (basé sur partitions)
{{
config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={
"field": "created_date",
"data_type": "date",
"granularity": "day"
}
)
}}
select
*,
date(created_at) as created_date
from {{ ref('stg_events') }}
{% if is_incremental() %}
where created_date >= dateadd(day, -3, current_date)
{% endif %}
Commandes dbt
# Development
dbt run # Exécuter tous les modèles
dbt run --select staging # Exécuter les modèles de staging uniquement
dbt run --select +fct_orders # Exécuter fct_orders et ses dépendances en amont
dbt run --select fct_orders+ # Exécuter fct_orders et ses dépendances en aval
dbt run --full-refresh # Reconstruire les modèles incrémentiels
# Testing
dbt test # Exécuter tous les tests
dbt test --select stg_stripe # Tester des modèles spécifiques
dbt build # Exécuter + tester dans l'ordre du DAG
# Documentation
dbt docs generate # Générer la documentation
dbt docs serve # Servir la documentation localement
# Debugging
dbt compile # Compiler le SQL sans exécuter
dbt debug # Tester la connexion
dbt ls --select tag:critical # Lister les modèles par tag
Bonnes pratiques
À faire
- Utiliser la couche staging - Nettoyer les données une fois, les utiliser partout
- Tester agressivement - Not null, unique, relationships
- Documenter tout - Descriptions des colonnes, descriptions des modèles
- Utiliser le mode incrémental - Pour les tables > 1 M de lignes
- Contrôle de version - Projet dbt dans Git
À ne pas faire
- Ne pas sauter le staging - Raw → mart est une dette technique
- Ne pas coder les dates en dur - Utiliser
{{ var('start_date') }} - Ne pas répéter la logique - Extraire vers les macros
- Ne pas tester en production - Utiliser la cible dev
- Ne pas ignorer la fraîcheur - Surveiller les données source