Lineage en amont : Sources
Tracez les origines des données - répondez à « D'où viennent ces données ? »
Enquête de lineage
Étape 1 : Identifier le type de cible
Déterminez ce que nous traçons :
- Table : Tracez ce qui remplit cette table
- Colonne : Tracez d'où vient cette colonne spécifique
- DAG : Tracez quelles sources de données ce DAG lit
Étape 2 : Trouver le DAG producteur
Les tables sont généralement remplies par des DAGs Airflow. Trouvez la connexion :
-
Recherchez les DAGs par nom : Utilisez
af dags listet cherchez les noms de DAG correspondant au nom de la tableload_customers-> tablecustomersetl_daily_orders-> tableorders
-
Explorez le code source du DAG : Utilisez
af dags source <dag_id>pour lire la définition du DAG- Cherchez les instructions INSERT, MERGE, CREATE TABLE
- Trouvez la table cible dans le code
-
Vérifiez les tâches du DAG : Utilisez
af tasks list <dag_id>pour voir quelles opérations le DAG effectue
Sur Astro
Si vous utilisez Astro, l'onglet Lineage dans l'interface Astro offre une exploration visuelle du lineage entre DAGs et datasets. Utilisez-le pour tracer rapidement les dépendances en amont sans chercher manuellement le code source des DAGs.
Sur OSS Airflow
Utilisez le code source des DAGs et les logs des tâches pour tracer le lineage (pas d'interface multi-DAG intégrée).
Étape 3 : Tracer les sources de données
À partir du code du DAG, identifiez les tables et systèmes sources :
Sources SQL (cherchez les clauses FROM) :
# Dans le code du DAG :
SELECT * FROM source_schema.source_table # <- Ceci est une source en amont
Sources externes (cherchez les références de connexion) :
S3Operator-> source de bucket S3PostgresOperator-> source de base de données PostgresSalesforceOperator-> source d'API SalesforceHttpOperator-> source d'API REST
Sources de fichiers :
- Fichiers CSV/Parquet dans le stockage objet
- Dépôts SFTP
- Chemins de fichiers locaux
Étape 4 : Construire la chaîne de lineage
Tracez récursivement chaque source :
CIBLE : analytics.orders_daily
^
+-- DAG : etl_daily_orders
^
+-- SOURCE : raw.orders (table)
| ^
| +-- DAG : ingest_orders
| ^
| +-- SOURCE : API Salesforce (externe)
|
+-- SOURCE : dim.customers (table)
^
+-- DAG : load_customers
^
+-- SOURCE : PostgreSQL (BD externe)
Étape 5 : Vérifier la santé de la source
Pour chaque source en amont :
- Tables : Vérifiez la fraîcheur avec la skill checking-freshness
- DAGs : Vérifiez le statut des exécutions récentes avec
af dags stats - Systèmes externes : Notez les informations de connexion du code du DAG
Lineage pour les colonnes
Lors du traçage d'une colonne spécifique :
- Trouvez la colonne dans le schéma de la table cible
- Cherchez dans le code source du DAG les références à ce nom de colonne
- Tracez les transformations :
- Mappages directs :
source.col AS target_col - Transformations :
COALESCE(a.col, b.col) AS target_col - Agrégations :
SUM(detail.amount) AS total_amount
- Mappages directs :
Résultat : Rapport de lineage
Résumé
Réponse en une ligne : « Cette table est remplie par le DAG X à partir des sources Y et Z »
Diagramme de lineage
[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
| |
DAG : ingest_sfdc DAG : transform_sales
Détails des sources
| Source | Type | Connexion | Fraîcheur | Propriétaire |
|---|---|---|---|---|
| raw.orders | Table | Interne | il y a 2h | data-team |
| Salesforce | API | salesforce_conn | Temps réel | sales-ops |
Chaîne de transformation
Décrivez comment les données circulent et se transforment :
- Les données brutes arrivent dans
raw.ordersvia la synchronisation de l'API Salesforce - Le DAG
transform_ordersnettoie et déduplique dansstg.orders - Le DAG
build_order_factsfusionne avec les dimensions dansfct.orders
Implications de qualité des données
- Points de défaillance uniques ?
- Sources en amont obsolètes ?
- Chaînes de transformation complexes qui pourraient se casser ?
Skills associées
- Vérifier la fraîcheur de la source : skill checking-freshness
- Déboguer un DAG source : skill debugging-dags
- Tracer les impacts en aval : skill tracing-downstream-lineage
- Ajouter des annotations manuelles de lineage : skill annotating-task-lineage
- Construire des extracteurs de lineage personnalisés : skill creating-openlineage-extractors