tracing-upstream-lineage

Tracer la lignée des données en amont. À utiliser lorsque l'utilisateur demande d'où proviennent les données, ce qui alimente une table, les dépendances en amont, les sources de données, ou lorsqu'il a besoin de comprendre l'origine des données.

npx skills add https://github.com/astronomer/agents --skill tracing-upstream-lineage

Lineage en amont : Sources

Tracez les origines des données - répondez à « D'où viennent ces données ? »

Enquête de lineage

Étape 1 : Identifier le type de cible

Déterminez ce que nous traçons :

  • Table : Tracez ce qui remplit cette table
  • Colonne : Tracez d'où vient cette colonne spécifique
  • DAG : Tracez quelles sources de données ce DAG lit

Étape 2 : Trouver le DAG producteur

Les tables sont généralement remplies par des DAGs Airflow. Trouvez la connexion :

  1. Recherchez les DAGs par nom : Utilisez af dags list et cherchez les noms de DAG correspondant au nom de la table

    • load_customers -> table customers
    • etl_daily_orders -> table orders
  2. Explorez le code source du DAG : Utilisez af dags source <dag_id> pour lire la définition du DAG

    • Cherchez les instructions INSERT, MERGE, CREATE TABLE
    • Trouvez la table cible dans le code
  3. Vérifiez les tâches du DAG : Utilisez af tasks list <dag_id> pour voir quelles opérations le DAG effectue

Sur Astro

Si vous utilisez Astro, l'onglet Lineage dans l'interface Astro offre une exploration visuelle du lineage entre DAGs et datasets. Utilisez-le pour tracer rapidement les dépendances en amont sans chercher manuellement le code source des DAGs.

Sur OSS Airflow

Utilisez le code source des DAGs et les logs des tâches pour tracer le lineage (pas d'interface multi-DAG intégrée).

Étape 3 : Tracer les sources de données

À partir du code du DAG, identifiez les tables et systèmes sources :

Sources SQL (cherchez les clauses FROM) :

# Dans le code du DAG :
SELECT * FROM source_schema.source_table  # <- Ceci est une source en amont

Sources externes (cherchez les références de connexion) :

  • S3Operator -> source de bucket S3
  • PostgresOperator -> source de base de données Postgres
  • SalesforceOperator -> source d'API Salesforce
  • HttpOperator -> source d'API REST

Sources de fichiers :

  • Fichiers CSV/Parquet dans le stockage objet
  • Dépôts SFTP
  • Chemins de fichiers locaux

Étape 4 : Construire la chaîne de lineage

Tracez récursivement chaque source :

CIBLE : analytics.orders_daily
    ^
    +-- DAG : etl_daily_orders
            ^
            +-- SOURCE : raw.orders (table)
            |       ^
            |       +-- DAG : ingest_orders
            |               ^
            |               +-- SOURCE : API Salesforce (externe)
            |
            +-- SOURCE : dim.customers (table)
                    ^
                    +-- DAG : load_customers
                            ^
                            +-- SOURCE : PostgreSQL (BD externe)

Étape 5 : Vérifier la santé de la source

Pour chaque source en amont :

  • Tables : Vérifiez la fraîcheur avec la skill checking-freshness
  • DAGs : Vérifiez le statut des exécutions récentes avec af dags stats
  • Systèmes externes : Notez les informations de connexion du code du DAG

Lineage pour les colonnes

Lors du traçage d'une colonne spécifique :

  1. Trouvez la colonne dans le schéma de la table cible
  2. Cherchez dans le code source du DAG les références à ce nom de colonne
  3. Tracez les transformations :
    • Mappages directs : source.col AS target_col
    • Transformations : COALESCE(a.col, b.col) AS target_col
    • Agrégations : SUM(detail.amount) AS total_amount

Résultat : Rapport de lineage

Résumé

Réponse en une ligne : « Cette table est remplie par le DAG X à partir des sources Y et Z »

Diagramme de lineage

[Salesforce] --> [raw.opportunities] --> [stg.opportunities] --> [fct.sales]
                        |                        |
                   DAG : ingest_sfdc         DAG : transform_sales

Détails des sources

Source Type Connexion Fraîcheur Propriétaire
raw.orders Table Interne il y a 2h data-team
Salesforce API salesforce_conn Temps réel sales-ops

Chaîne de transformation

Décrivez comment les données circulent et se transforment :

  1. Les données brutes arrivent dans raw.orders via la synchronisation de l'API Salesforce
  2. Le DAG transform_orders nettoie et déduplique dans stg.orders
  3. Le DAG build_order_facts fusionne avec les dimensions dans fct.orders

Implications de qualité des données

  • Points de défaillance uniques ?
  • Sources en amont obsolètes ?
  • Chaînes de transformation complexes qui pourraient se casser ?

Skills associées

  • Vérifier la fraîcheur de la source : skill checking-freshness
  • Déboguer un DAG source : skill debugging-dags
  • Tracer les impacts en aval : skill tracing-downstream-lineage
  • Ajouter des annotations manuelles de lineage : skill annotating-task-lineage
  • Construire des extracteurs de lineage personnalisés : skill creating-openlineage-extractors

Skills similaires