data-quality-frameworks

Par wshobson · agents

Implémentez la validation de la qualité des données avec Great Expectations, les tests dbt et les data contracts. À utiliser lors de la construction de pipelines de qualité des données, de l'implémentation de règles de validation ou de l'établissement de data contracts.

npx skills add https://github.com/wshobson/agents --skill data-quality-frameworks

Cadres de Qualité des Données

Modèles de production pour implémenter la qualité des données avec Great Expectations, les tests dbt et les contrats de données afin d'assurer des pipelines de données fiables.

Quand Utiliser Cette Compétence

  • Implémenter des vérifications de qualité des données dans les pipelines
  • Mettre en place la validation Great Expectations
  • Construire des suites de tests dbt complètes
  • Établir des contrats de données entre les équipes
  • Surveiller les métriques de qualité des données
  • Automatiser la validation des données en CI/CD

Concepts Fondamentaux

1. Dimensions de la Qualité des Données

Dimension Description Vérification Exemple
Complétude Aucune valeur manquante expect_column_values_to_not_be_null
Unicité Aucun doublon expect_column_values_to_be_unique
Validité Les valeurs dans la plage attendue expect_column_values_to_be_in_set
Exactitude Les données correspondent à la réalité Validation de référence croisée
Cohérence Aucune contradiction expect_column_pair_values_A_to_be_greater_than_B
Actualité Les données sont récentes expect_column_max_to_be_between

2. Pyramide de Test pour les Données

          /\
         /  \     Tests d'Intégration (entre tables)
        /────\
       /      \   Tests Unitaires (colonne unique)
      /────────\
     /          \ Tests de Schéma (structure)
    /────────────\

Démarrage Rapide

Configuration de Great Expectations

# Installer
pip install great_expectations

# Initialiser le projet
great_expectations init

# Créer une source de données
great_expectations datasource new
# great_expectations/checkpoints/daily_validation.yml
import great_expectations as gx

# Créer le contexte
context = gx.get_context()

# Créer la suite d'attentes
suite = context.add_expectation_suite("orders_suite")

# Ajouter les attentes
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)

# Valider
results = context.run_checkpoint(checkpoint_name="daily_orders")

Modèles

Modèle 1 : Suite Great Expectations

# expectations/orders_suite.py
import great_expectations as gx
from great_expectations.core import ExpectationSuite
from great_expectations.core.expectation_configuration import ExpectationConfiguration

def build_orders_suite() -> ExpectationSuite:
    """Construire une suite d'attentes complète pour les commandes"""

    suite = ExpectationSuite(expectation_suite_name="orders_suite")

    # Attentes de schéma
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_columns_to_match_set",
        kwargs={
            "column_set": ["order_id", "customer_id", "amount", "status", "created_at"],
            "exact_match": False  # Permettre des colonnes supplémentaires
        }
    ))

    # Clé primaire
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "order_id"}
    ))
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_unique",
        kwargs={"column": "order_id"}
    ))

    # Clé étrangère
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_not_be_null",
        kwargs={"column": "customer_id"}
    ))

    # Valeurs catégoriques
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_in_set",
        kwargs={
            "column": "status",
            "value_set": ["pending", "processing", "shipped", "delivered", "cancelled"]
        }
    ))

    # Plages numériques
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 0,
            "max_value": 100000,
            "strict_min": True  # amount > 0
        }
    ))

    # Validité de la date
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_values_to_be_dateutil_parseable",
        kwargs={"column": "created_at"}
    ))

    # Fraîcheur - les données doivent être récentes
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_max_to_be_between",
        kwargs={
            "column": "created_at",
            "min_value": {"$PARAMETER": "now - timedelta(days=1)"},
            "max_value": {"$PARAMETER": "now"}
        }
    ))

    # Vérification du nombre de lignes
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_table_row_count_to_be_between",
        kwargs={
            "min_value": 1000,  # S'attendre à au moins 1000 lignes
            "max_value": 10000000
        }
    ))

    # Attentes statistiques
    suite.add_expectation(ExpectationConfiguration(
        expectation_type="expect_column_mean_to_be_between",
        kwargs={
            "column": "amount",
            "min_value": 50,
            "max_value": 500
        }
    ))

    return suite

Modèle 2 : Point de Contrôle Great Expectations

# great_expectations/checkpoints/orders_checkpoint.yml
name: orders_checkpoint
config_version: 1.0
class_name: Checkpoint
run_name_template: "%Y%m%d-%H%M%S-orders-validation"

validations:
  - batch_request:
      datasource_name: warehouse
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: orders
      data_connector_query:
        index: -1 # Dernier lot
    expectation_suite_name: orders_suite

action_list:
  - name: store_validation_result
    action:
      class_name: StoreValidationResultAction

  - name: store_evaluation_parameters
    action:
      class_name: StoreEvaluationParametersAction

  - name: update_data_docs
    action:
      class_name: UpdateDataDocsAction

  # Notification Slack en cas d'échec
  - name: send_slack_notification
    action:
      class_name: SlackNotificationAction
      slack_webhook: ${SLACK_WEBHOOK}
      notify_on: failure
      renderer:
        module_name: great_expectations.render.renderer.slack_renderer
        class_name: SlackRenderer
# Exécuter le point de contrôle
import great_expectations as gx

context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="orders_checkpoint")

if not result.success:
    failed_expectations = [
        r for r in result.run_results.values()
        if not r.success
    ]
    raise ValueError(f"Data quality check failed: {failed_expectations}")

Modèle 3 : Tests de Données dbt

# models/marts/core/_core__models.yml
version: 2

models:
  - name: fct_orders
    description: Tableau de faits des commandes
    tests:
      # Tests au niveau de la table
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
      - dbt_utils.at_least_one
      - dbt_utils.expression_is_true:
          expression: "total_amount >= 0"

    columns:
      - name: order_id
        description: Clé primaire
        tests:
          - unique
          - not_null

      - name: customer_id
        description: Clé étrangère vers dim_customers
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_id

      - name: order_status
        tests:
          - accepted_values:
              values:
                ["pending", "processing", "shipped", "delivered", "cancelled"]

      - name: total_amount
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: ">= 0"

      - name: created_at
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "<= current_timestamp"

  - name: dim_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null

      - name: email
        tests:
          - unique
          - not_null
          # Test regex personnalisé
          - dbt_utils.expression_is_true:
              expression: "email ~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$'"

Modèle 4 : Tests dbt Personnalisés

-- tests/generic/test_row_count_in_range.sql
{% test row_count_in_range(model, min_count, max_count) %}

with row_count as (
    select count(*) as cnt from {{ model }}
)

select cnt
from row_count
where cnt < {{ min_count }} or cnt > {{ max_count }}

{% endtest %}

-- Utilisation dans schema.yml :
-- tests:
--   - row_count_in_range:
--       min_count: 1000
--       max_count: 10000000
-- tests/generic/test_sequential_values.sql
{% test sequential_values(model, column_name, interval=1) %}

with lagged as (
    select
        {{ column_name }},
        lag({{ column_name }}) over (order by {{ column_name }}) as prev_value
    from {{ model }}
)

select *
from lagged
where {{ column_name }} - prev_value != {{ interval }}
  and prev_value is not null

{% endtest %}
-- tests/singular/assert_orders_customers_match.sql
-- Test singulier : règle métier spécifique

with orders_customers as (
    select distinct customer_id from {{ ref('fct_orders') }}
),

dim_customers as (
    select customer_id from {{ ref('dim_customers') }}
),

orphaned_orders as (
    select o.customer_id
    from orders_customers o
    left join dim_customers c using (customer_id)
    where c.customer_id is null
)

select * from orphaned_orders
-- Le test réussit si cela retourne 0 lignes

Modèle 5 : Contrats de Données

# contracts/orders_contract.yaml
apiVersion: datacontract.com/v1.0.0
kind: DataContract
metadata:
  name: orders
  version: 1.0.0
  owner: data-platform-team
  contact: data-team@company.com

info:
  title: Contrat de Données pour les Commandes
  description: Contrat pour les données d'événements de commande de la plateforme e-commerce
  purpose: Analytique, reporting et fonctionnalités ML

servers:
  production:
    type: snowflake
    account: company.us-east-1
    database: ANALYTICS
    schema: CORE

terms:
  usage: Analytique interne uniquement
  limitations: Les PII ne doivent pas être exposées dans les marts en aval
  billing: Facturé par To de données scannées

schema:
  type: object
  properties:
    order_id:
      type: string
      format: uuid
      description: Identifiant de commande unique
      required: true
      unique: true
      pii: false

    customer_id:
      type: string
      format: uuid
      description: Identifiant du client
      required: true
      pii: true
      piiClassification: indirect

    total_amount:
      type: number
      minimum: 0
      maximum: 100000
      description: Total de la commande en USD

    created_at:
      type: string
      format: date-time
      description: Horodatage de création de la commande
      required: true

    status:
      type: string
      enum: [pending, processing, shipped, delivered, cancelled]
      description: Statut actuel de la commande

quality:
  type: SodaCL
  specification:
    checks for orders:
      - row_count > 0
      - missing_count(order_id) = 0
      - duplicate_count(order_id) = 0
      - invalid_count(status) = 0:
          valid values: [pending, processing, shipped, delivered, cancelled]
      - freshness(created_at) < 24h

sla:
  availability: 99,9 %
  freshness: 1 heure
  latency: 5 minutes

Modèle 6 : Pipeline de Qualité Automatisé

# quality_pipeline.py
from dataclasses import dataclass
from typing import List, Dict, Any
import great_expectations as gx
from datetime import datetime

@dataclass
class QualityResult:
    table: str
    passed: bool
    total_expectations: int
    failed_expectations: int
    details: List[Dict[str, Any]]
    timestamp: datetime

class DataQualityPipeline:
    """Orchestrer les vérifications de qualité des données entre les tables"""

    def __init__(self, context: gx.DataContext):
        self.context = context
        self.results: List[QualityResult] = []

    def validate_table(self, table: str, suite: str) -> QualityResult:
        """Valider une seule table par rapport à la suite d'attentes"""

        checkpoint_config = {
            "name": f"{table}_validation",
            "config_version": 1.0,
            "class_name": "Checkpoint",
            "validations": [{
                "batch_request": {
                    "datasource_name": "warehouse",
                    "data_asset_name": table,
                },
                "expectation_suite_name": suite,
            }],
        }

        result = self.context.run_checkpoint(**checkpoint_config)

        # Analyser les résultats
        validation_result = list(result.run_results.values())[0]
        results = validation_result.results

        failed = [r for r in results if not r.success]

        return QualityResult(
            table=table,
            passed=result.success,
            total_expectations=len(results),
            failed_expectations=len(failed),
            details=[{
                "expectation": r.expectation_config.expectation_type,
                "success": r.success,
                "observed_value": r.result.get("observed_value"),
            } for r in results],
            timestamp=datetime.now()
        )

    def run_all(self, tables: Dict[str, str]) -> Dict[str, QualityResult]:
        """Exécuter la validation pour toutes les tables"""
        results = {}

        for table, suite in tables.items():
            print(f"Validating {table}...")
            results[table] = self.validate_table(table, suite)

        return results

    def generate_report(self, results: Dict[str, QualityResult]) -> str:
        """Générer un rapport de qualité"""
        report = ["# Rapport de Qualité des Données", f"Généré : {datetime.now()}", ""]

        total_passed = sum(1 for r in results.values() if r.passed)
        total_tables = len(results)

        report.append(f"## Résumé : {total_passed}/{total_tables} tables validées")
        report.append("")

        for table, result in results.items():
            status = "✅" if result.passed else "❌"
            report.append(f"### {status} {table}")
            report.append(f"- Attentes : {result.total_expectations}")
            report.append(f"- Échouées : {result.failed_expectations}")

            if not result.passed:
                report.append("- Vérifications échouées :")
                for detail in result.details:
                    if not detail["success"]:
                        report.append(f"  - {detail['expectation']}: {detail['observed_value']}")
            report.append("")

        return "\n".join(report)

# Utilisation
context = gx.get_context()
pipeline = DataQualityPipeline(context)

tables_to_validate = {
    "orders": "orders_suite",
    "customers": "customers_suite",
    "products": "products_suite",
}

results = pipeline.run_all(tables_to_validate)
report = pipeline.generate_report(results)

# Échouer le pipeline si une table a échoué
if not all(r.passed for r in results.values()):
    print(report)
    raise ValueError("Data quality checks failed!")

Bonnes Pratiques

À Faire

  • Tester tôt - Valider les données sources avant les transformations
  • Tester progressivement - Ajouter des tests à mesure que vous trouvez des problèmes
  • Documenter les attentes - Descriptions claires pour chaque test
  • Alerte en cas d'échec - Intégrer avec la surveillance
  • Versionnifier les contrats - Suivre les changements de schéma

À Éviter

  • Ne pas tout tester - Se concentrer sur les colonnes critiques
  • Ne pas ignorer les avertissements - Ils précèdent souvent les échecs
  • Ne pas sauter la fraîcheur - Les données obsolètes sont de mauvaises données
  • Ne pas coder en dur les seuils - Utiliser des lignes de base dynamiques
  • Ne pas tester isolément - Tester les relations aussi

Skills similaires