Cadres de Qualité des Données
Modèles de production pour implémenter la qualité des données avec Great Expectations, les tests dbt et les contrats de données afin d'assurer des pipelines de données fiables.
Quand Utiliser Cette Compétence
- Implémenter des vérifications de qualité des données dans les pipelines
- Mettre en place la validation Great Expectations
- Construire des suites de tests dbt complètes
- Établir des contrats de données entre les équipes
- Surveiller les métriques de qualité des données
- Automatiser la validation des données en CI/CD
Concepts Fondamentaux
1. Dimensions de la Qualité des Données
| Dimension | Description | Vérification Exemple |
|---|---|---|
| Complétude | Aucune valeur manquante | expect_column_values_to_not_be_null |
| Unicité | Aucun doublon | expect_column_values_to_be_unique |
| Validité | Les valeurs dans la plage attendue | expect_column_values_to_be_in_set |
| Exactitude | Les données correspondent à la réalité | Validation de référence croisée |
| Cohérence | Aucune contradiction | expect_column_pair_values_A_to_be_greater_than_B |
| Actualité | Les données sont récentes | expect_column_max_to_be_between |
2. Pyramide de Test pour les Données
/\
/ \ Tests d'Intégration (entre tables)
/────\
/ \ Tests Unitaires (colonne unique)
/────────\
/ \ Tests de Schéma (structure)
/────────────\
Démarrage Rapide
Configuration de Great Expectations
# Installer
pip install great_expectations
# Initialiser le projet
great_expectations init
# Créer une source de données
great_expectations datasource new
# great_expectations/checkpoints/daily_validation.yml
import great_expectations as gx
# Créer le contexte
context = gx.get_context()
# Créer la suite d'attentes
suite = context.add_expectation_suite("orders_suite")
# Ajouter les attentes
suite.add_expectation(
gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
# Valider
results = context.run_checkpoint(checkpoint_name="daily_orders")
Modèles
Modèle 1 : Suite Great Expectations
# expectations/orders_suite.py
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration
def build_orders_suite() -> ExpectationSuite:
"""Construire une suite d'attentes complète pour les commandes"""
suite = ExpectationSuite(expectation_suite_name="orders_suite")
# Attentes de schéma
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_table_columns_to_match_set",
kwargs={
"column_set": ["order_id", "customer_id", "amount", "status", "created_at"],
"exact_match": False # Permettre des colonnes supplémentaires
}
))
# Clé primaire
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "order_id"}
))
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_be_unique",
kwargs={"column": "order_id"}
))
# Clé étrangère
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_not_be_null",
kwargs={"column": "customer_id"}
))
# Valeurs catégoriques
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_be_in_set",
kwargs={
"column": "status",
"value_set": ["pending", "processing", "shipped", "delivered", "cancelled"]
}
))
# Plages numériques
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_be_between",
kwargs={
"column": "amount",
"min_value": 0,
"max_value": 100000,
"strict_min": True # amount > 0
}
))
# Validité de la date
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_values_to_be_dateutil_parseable",
kwargs={"column": "created_at"}
))
# Fraîcheur - les données doivent être récentes
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_max_to_be_between",
kwargs={
"column": "created_at",
"min_value": {"$PARAMETER": "now - timedelta(days=1)"},
"max_value": {"$PARAMETER": "now"}
}
))
# Vérification du nombre de lignes
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_table_row_count_to_be_between",
kwargs={
"min_value": 1000, # S'attendre à au moins 1000 lignes
"max_value": 10000000
}
))
# Attentes statistiques
suite.add_expectation(ExpectationConfiguration(
expectation_type="expect_column_mean_to_be_between",
kwargs={
"column": "amount",
"min_value": 50,
"max_value": 500
}
))
return suite
Modèle 2 : Point de Contrôle Great Expectations
# great_expectations/checkpoints/orders_checkpoint.yml
name: orders_checkpoint
config_version: 1.0
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-orders-validation"
validations:
- batch_request:
datasource_name: warehouse
data_connector_name: default_inferred_data_connector_name
data_asset_name: orders
data_connector_query:
index: -1 # Dernier lot
expectation_suite_name: orders_suite
action_list:
- name: store_validation_result
action:
class_name: StoreValidationResultAction
- name: store_evaluation_parameters
action:
class_name: StoreEvaluationParametersAction
- name: update_data_docs
action:
class_name: UpdateDataDocsAction
# Notification Slack en cas d'échec
- name: send_slack_notification
action:
class_name: SlackNotificationAction
slack_webhook: ${SLACK_WEBHOOK}
notify_on: failure
renderer:
module_name: great_expectations.render.renderer.slack_renderer
class_name: SlackRenderer
# Exécuter le point de contrôle
import great_expectations as gx
context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="orders_checkpoint")
if not result.success:
failed_expectations = [
r for r in result.run_results.values()
if not r.success
]
raise ValueError(f"Data quality check failed: {failed_expectations}")
Modèle 3 : Tests de Données dbt
# models/marts/core/_core__models.yml
version: 2
models:
- name: fct_orders
description: Tableau de faits des commandes
tests:
# Tests au niveau de la table
- dbt_utils.recency:
datepart: day
field: created_at
interval: 1
- dbt_utils.at_least_one
- dbt_utils.expression_is_true:
expression: "total_amount >= 0"
columns:
- name: order_id
description: Clé primaire
tests:
- unique
- not_null
- name: customer_id
description: Clé étrangère vers dim_customers
tests:
- not_null
- relationships:
to: ref('dim_customers')
field: customer_id
- name: order_status
tests:
- accepted_values:
values:
["pending", "processing", "shipped", "delivered", "cancelled"]
- name: total_amount
tests:
- not_null
- dbt_utils.expression_is_true:
expression: ">= 0"
- name: created_at
tests:
- not_null
- dbt_utils.expression_is_true:
expression: "<= current_timestamp"
- name: dim_customers
columns:
- name: customer_id
tests:
- unique
- not_null
- name: email
tests:
- unique
- not_null
# Test regex personnalisé
- dbt_utils.expression_is_true:
expression: "email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"
Modèle 4 : Tests dbt Personnalisés
-- tests/generic/test_row_count_in_range.sql
{% test row_count_in_range(model, min_count, max_count) %}
with row_count as (
select count(*) as cnt from {{ model }}
)
select cnt
from row_count
where cnt < {{ min_count }} or cnt > {{ max_count }}
{% endtest %}
-- Utilisation dans schema.yml :
-- tests:
-- - row_count_in_range:
-- min_count: 1000
-- max_count: 10000000
-- tests/generic/test_sequential_values.sql
{% test sequential_values(model, column_name, interval=1) %}
with lagged as (
select
{{ column_name }},
lag({{ column_name }}) over (order by {{ column_name }}) as prev_value
from {{ model }}
)
select *
from lagged
where {{ column_name }} - prev_value != {{ interval }}
and prev_value is not null
{% endtest %}
-- tests/singular/assert_orders_customers_match.sql
-- Test singulier : règle métier spécifique
with orders_customers as (
select distinct customer_id from {{ ref('fct_orders') }}
),
dim_customers as (
select customer_id from {{ ref('dim_customers') }}
),
orphaned_orders as (
select o.customer_id
from orders_customers o
left join dim_customers c using (customer_id)
where c.customer_id is null
)
select * from orphaned_orders
-- Le test réussit si cela retourne 0 lignes
Modèle 5 : Contrats de Données
# contracts/orders_contract.yaml
apiVersion: datacontract.com/v1.0.0
kind: DataContract
metadata:
name: orders
version: 1.0.0
owner: data-platform-team
contact: data-team@company.com
info:
title: Contrat de Données pour les Commandes
description: Contrat pour les données d'événements de commande de la plateforme e-commerce
purpose: Analytique, reporting et fonctionnalités ML
servers:
production:
type: snowflake
account: company.us-east-1
database: ANALYTICS
schema: CORE
terms:
usage: Analytique interne uniquement
limitations: Les PII ne doivent pas être exposées dans les marts en aval
billing: Facturé par To de données scannées
schema:
type: object
properties:
order_id:
type: string
format: uuid
description: Identifiant de commande unique
required: true
unique: true
pii: false
customer_id:
type: string
format: uuid
description: Identifiant du client
required: true
pii: true
piiClassification: indirect
total_amount:
type: number
minimum: 0
maximum: 100000
description: Total de la commande en USD
created_at:
type: string
format: date-time
description: Horodatage de création de la commande
required: true
status:
type: string
enum: [pending, processing, shipped, delivered, cancelled]
description: Statut actuel de la commande
quality:
type: SodaCL
specification:
checks for orders:
- row_count > 0
- missing_count(order_id) = 0
- duplicate_count(order_id) = 0
- invalid_count(status) = 0:
valid values: [pending, processing, shipped, delivered, cancelled]
- freshness(created_at) < 24h
sla:
availability: 99,9 %
freshness: 1 heure
latency: 5 minutes
Modèle 6 : Pipeline de Qualité Automatisé
# quality_pipeline.py
from dataclasses import dataclass
from typing import List, Dict, Any
import great_expectations as gx
from datetime import datetime
@dataclass
class QualityResult:
table: str
passed: bool
total_expectations: int
failed_expectations: int
details: List[Dict[str, Any]]
timestamp: datetime
class DataQualityPipeline:
"""Orchestrer les vérifications de qualité des données entre les tables"""
def __init__(self, context: gx.DataContext):
self.context = context
self.results: List[QualityResult] = []
def validate_table(self, table: str, suite: str) -> QualityResult:
"""Valider une seule table par rapport à la suite d'attentes"""
checkpoint_config = {
"name": f"{table}_validation",
"config_version": 1.0,
"class_name": "Checkpoint",
"validations": [{
"batch_request": {
"datasource_name": "warehouse",
"data_asset_name": table,
},
"expectation_suite_name": suite,
}],
}
result = self.context.run_checkpoint(**checkpoint_config)
# Analyser les résultats
validation_result = list(result.run_results.values())[0]
results = validation_result.results
failed = [r for r in results if not r.success]
return QualityResult(
table=table,
passed=result.success,
total_expectations=len(results),
failed_expectations=len(failed),
details=[{
"expectation": r.expectation_config.expectation_type,
"success": r.success,
"observed_value": r.result.get("observed_value"),
} for r in results],
timestamp=datetime.now()
)
def run_all(self, tables: Dict[str, str]) -> Dict[str, QualityResult]:
"""Exécuter la validation pour toutes les tables"""
results = {}
for table, suite in tables.items():
print(f"Validating {table}...")
results[table] = self.validate_table(table, suite)
return results
def generate_report(self, results: Dict[str, QualityResult]) -> str:
"""Générer un rapport de qualité"""
report = ["# Rapport de Qualité des Données", f"Généré : {datetime.now()}", ""]
total_passed = sum(1 for r in results.values() if r.passed)
total_tables = len(results)
report.append(f"## Résumé : {total_passed}/{total_tables} tables validées")
report.append("")
for table, result in results.items():
status = "✅" if result.passed else "❌"
report.append(f"### {status} {table}")
report.append(f"- Attentes : {result.total_expectations}")
report.append(f"- Échouées : {result.failed_expectations}")
if not result.passed:
report.append("- Vérifications échouées :")
for detail in result.details:
if not detail["success"]:
report.append(f" - {detail['expectation']}: {detail['observed_value']}")
report.append("")
return "\n".join(report)
# Utilisation
context = gx.get_context()
pipeline = DataQualityPipeline(context)
tables_to_validate = {
"orders": "orders_suite",
"customers": "customers_suite",
"products": "products_suite",
}
results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)
# Échouer le pipeline si une table a échoué
if not all(r.passed for r in results.values()):
print(report)
raise ValueError("Data quality checks failed!")
Bonnes Pratiques
À Faire
- Tester tôt - Valider les données sources avant les transformations
- Tester progressivement - Ajouter des tests à mesure que vous trouvez des problèmes
- Documenter les attentes - Descriptions claires pour chaque test
- Alerte en cas d'échec - Intégrer avec la surveillance
- Versionnifier les contrats - Suivre les changements de schéma
À Éviter
- Ne pas tout tester - Se concentrer sur les colonnes critiques
- Ne pas ignorer les avertissements - Ils précèdent souvent les échecs
- Ne pas sauter la fraîcheur - Les données obsolètes sont de mauvaises données
- Ne pas coder en dur les seuils - Utiliser des lignes de base dynamiques
- Ne pas tester isolément - Tester les relations aussi