annotating-task-lineage

Annotez les tâches Airflow avec des métadonnées de lignage de données en utilisant les `inlets` et `outlets`. À utiliser lorsque l'utilisateur souhaite ajouter des métadonnées de lignage aux tâches, spécifier des datasets d'entrée/sortie, ou activer le suivi du lignage pour des opérateurs sans extraction OpenLineage intégrée.

npx skills add https://github.com/astronomer/agents --skill annotating-task-lineage

Annoter la lignée des tâches avec les entrées et sorties

Cette skill vous guide dans l'ajout d'annotations de lignée manuelles aux tâches Airflow en utilisant inlets et outlets.

Référence : Consultez le guide du développeur du fournisseur OpenLineage pour les derniers opérateurs et motifs supportés.

Sur Astro

Les annotations de lignée définies avec inlets et outlets sont visualisées dans l'onglet Lineage amélioré d'Astro, qui fournit des vues de lignée inter-DAG et inter-déploiement. Cela signifie que vos annotations sont immédiatement visibles dans l'interface utilisateur d'Astro, vous donnant une vue unifiée du flux de données dans toute votre organisation Astro.

Quand utiliser cette approche

Scénario Utiliser Inlets/Outlets ?
L'opérateur dispose de méthodes OpenLineage (get_openlineage_facets_on_*) ❌ Modifier directement la méthode OL
L'opérateur n'a pas d'extracteur OpenLineage intégré ✅ Oui
Une lignée au niveau table simple est suffisante ✅ Oui
Configuration rapide de lignée sans code personnalisé ✅ Oui
Besoin de lignée au niveau colonne ❌ Utiliser les méthodes OpenLineage ou un extracteur personnalisé
Logique d'extraction complexe requise ❌ Utiliser les méthodes OpenLineage ou un extracteur personnalisé

Remarque : Les inlets/outlets sont le recours de plus basse priorité. Si un extracteur ou une méthode OpenLineage existe pour l'opérateur, il prend la priorité. Utilisez cette approche pour les opérateurs sans extracteurs.


Types supportés pour les inlets/outlets

Vous pouvez utiliser des objets OpenLineage Dataset ou des Airflow Assets pour les inlets et outlets :

OpenLineage Datasets (Recommandé)

from openlineage.client.event_v2 import Dataset

# Database tables
source_table = Dataset(
    namespace="postgres://mydb:5432",
    name="public.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)

# Files
input_file = Dataset(
    namespace="s3://my-bucket",
    name="raw/events/2024-01-01.json",
)

Airflow Assets (Airflow 3+)

from airflow.sdk import Asset

# Using Airflow's native Asset type
orders_asset = Asset(uri="s3://my-bucket/data/orders")

Airflow Datasets (Airflow 2.4+)

from airflow.datasets import Dataset

# Using Airflow's Dataset type (Airflow 2.4-2.x)
orders_dataset = Dataset(uri="s3://my-bucket/data/orders")

Utilisation basique

Définir les inlets et outlets sur les opérateurs

from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset
import pendulum

# Define your lineage datasets
source_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="raw.orders",
)
target_table = Dataset(
    namespace="snowflake://account.snowflakecomputing.com",
    name="staging.orders_clean",
)
output_file = Dataset(
    namespace="s3://my-bucket",
    name="exports/orders.parquet",
)

with DAG(
    dag_id="etl_with_lineage",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
) as dag:

    transform = BashOperator(
        task_id="transform_orders",
        bash_command="echo 'transforming...'",
        inlets=[source_table],           # What this task reads
        outlets=[target_table],          # What this task writes
    )

    export = BashOperator(
        task_id="export_to_s3",
        bash_command="echo 'exporting...'",
        inlets=[target_table],           # Reads from previous output
        outlets=[output_file],           # Writes to S3
    )

    transform >> export

Plusieurs entrées et sorties

Les tâches lisent souvent à partir de plusieurs sources et écrivent à plusieurs destinations :

from openlineage.client.event_v2 import Dataset

# Multiple source tables
customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")

# Multiple output tables
daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")

aggregate_task = PythonOperator(
    task_id="build_daily_aggregates",
    python_callable=build_aggregates,
    inlets=[customers, orders, products],      # All inputs
    outlets=[daily_summary, customer_metrics], # All outputs
)

Définir la lignée dans les opérateurs personnalisés

Lors de la création d'opérateurs personnalisés, vous avez deux options :

Option 1 : Implémenter les méthodes OpenLineage (Recommandé)

C'est l'approche préférée car elle vous donne un contrôle total sur l'extraction de lignée :

from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

    def get_openlineage_facets_on_complete(self, task_instance):
        """Return lineage after successful execution."""
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
            outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
        )

Option 2 : Définir les inlets/outlets dynamiquement

Pour les cas plus simples, définissez la lignée dans la méthode execute (opérateurs non-deferrable uniquement) :

from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset


class MyCustomOperator(BaseOperator):
    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table

    def execute(self, context):
        # Set lineage dynamically based on operator parameters
        self.inlets = [
            Dataset(namespace="warehouse://db", name=self.source_table)
        ]
        self.outlets = [
            Dataset(namespace="warehouse://db", name=self.target_table)
        ]

        # ... perform the actual work ...
        self.log.info(f"Processing {self.source_table} -> {self.target_table}")

Assistants de nommage des Dataset

Utilisez les assistants de nommage des dataset OpenLineage pour assurer un nommage cohérent sur les plateformes :

from openlineage.client.event_v2 import Dataset

# Snowflake
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming

naming = SnowflakeDatasetNaming(
    account_identifier="myorg-myaccount",
    database="mydb",
    schema="myschema",
    table="mytable",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "snowflake://myorg-myaccount", name: "mydb.myschema.mytable"

# BigQuery
from openlineage.client.naming.bigquery import BigQueryDatasetNaming

naming = BigQueryDatasetNaming(
    project="my-project",
    dataset="my_dataset",
    table="my_table",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "bigquery", name: "my-project.my_dataset.my_table"

# S3
from openlineage.client.naming.s3 import S3DatasetNaming

naming = S3DatasetNaming(bucket="my-bucket", key="path/to/file.parquet")
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "s3://my-bucket", name: "path/to/file.parquet"

# PostgreSQL
from openlineage.client.naming.postgres import PostgresDatasetNaming

naming = PostgresDatasetNaming(
    host="localhost",
    port=5432,
    database="mydb",
    schema="public",
    table="users",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "postgres://localhost:5432", name: "mydb.public.users"

Remarque : Utilisez toujours les assistants de nommage au lieu de construire les espaces de noms manuellement. Si un assistant manque pour votre plateforme, consultez le dépôt OpenLineage ou demandez-le.


Règles de priorité

OpenLineage utilise cette priorité pour l'extraction de lignée :

  1. Extracteurs personnalisés (plus élevée) - Extracteurs enregistrés par l'utilisateur
  2. Méthodes OpenLineage - get_openlineage_facets_on_* dans l'opérateur
  3. Lignée au niveau hook - Lignée collectée à partir de hooks via HookLineageCollector
  4. Inlets/Outlets (plus basse) - Recours si rien d'autre n'extrait la lignée

Remarque : Si un extracteur ou une méthode existe mais ne retourne aucun dataset, OpenLineage vérifiera la lignée au niveau hook, puis recourra aux inlets/outlets.


Bonnes pratiques

Utiliser les assistants de nommage

Utilisez toujours les assistants de nommage OpenLineage pour une création cohérente de dataset :

from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming


def snowflake_dataset(schema: str, table: str) -> Dataset:
    """Create a Snowflake Dataset using the naming helper."""
    naming = SnowflakeDatasetNaming(
        account_identifier="mycompany",
        database="analytics",
        schema=schema,
        table=table,
    )
    return Dataset(namespace=naming.get_namespace(), name=naming.get_name())


# Usage
source = snowflake_dataset("raw", "orders")
target = snowflake_dataset("staging", "orders_clean")

Documenter votre lignée

Ajoutez des commentaires expliquant le flux de données :

transform = SqlOperator(
    task_id="transform_orders",
    sql="...",
    # Lineage: Reads raw orders, joins with customers, writes to staging
    inlets=[
        snowflake_dataset("raw", "orders"),
        snowflake_dataset("raw", "customers"),
    ],
    outlets=[
        snowflake_dataset("staging", "order_details"),
    ],
)

Maintenir la lignée exacte

  • Mettez à jour les inlets/outlets lorsque les requêtes SQL changent
  • Incluez toutes les tables référencées dans les JOINs comme inlets
  • Incluez toutes les tables écrites (y compris les tables temporaires si pertinent)
  • Les annotations outlet-only et inlet-only sont valides. Les annotations unilatérales sont encouragées pour la visibilité de lignée même sans inlet ou outlet correspondant dans un autre DAG.

Limitations

Limitation Solution
Au niveau table uniquement (pas de lignée colonne) Utiliser les méthodes OpenLineage ou un extracteur personnalisé
Remplacée par les extracteurs/méthodes Utiliser uniquement pour les opérateurs sans extracteurs
Statique au moment du parsing du DAG Définir dynamiquement dans execute() ou utiliser les méthodes OL
Les opérateurs deferrable perdent la lignée dynamique Utiliser les méthodes OL à la place ; les attributs définis dans execute() sont perdus lors du deferring

Skills connexes

  • creating-openlineage-extractors : Pour la lignée au niveau colonne ou l'extraction complexe
  • tracing-upstream-lineage : Enquêter sur l'origine des données
  • tracing-downstream-lineage : Enquêter sur ce qui dépend des données