Annoter la lignée des tâches avec les entrées et sorties
Cette skill vous guide dans l'ajout d'annotations de lignée manuelles aux tâches Airflow en utilisant inlets et outlets.
Référence : Consultez le guide du développeur du fournisseur OpenLineage pour les derniers opérateurs et motifs supportés.
Sur Astro
Les annotations de lignée définies avec inlets et outlets sont visualisées dans l'onglet Lineage amélioré d'Astro, qui fournit des vues de lignée inter-DAG et inter-déploiement. Cela signifie que vos annotations sont immédiatement visibles dans l'interface utilisateur d'Astro, vous donnant une vue unifiée du flux de données dans toute votre organisation Astro.
Quand utiliser cette approche
| Scénario | Utiliser Inlets/Outlets ? |
|---|---|
L'opérateur dispose de méthodes OpenLineage (get_openlineage_facets_on_*) |
❌ Modifier directement la méthode OL |
| L'opérateur n'a pas d'extracteur OpenLineage intégré | ✅ Oui |
| Une lignée au niveau table simple est suffisante | ✅ Oui |
| Configuration rapide de lignée sans code personnalisé | ✅ Oui |
| Besoin de lignée au niveau colonne | ❌ Utiliser les méthodes OpenLineage ou un extracteur personnalisé |
| Logique d'extraction complexe requise | ❌ Utiliser les méthodes OpenLineage ou un extracteur personnalisé |
Remarque : Les inlets/outlets sont le recours de plus basse priorité. Si un extracteur ou une méthode OpenLineage existe pour l'opérateur, il prend la priorité. Utilisez cette approche pour les opérateurs sans extracteurs.
Types supportés pour les inlets/outlets
Vous pouvez utiliser des objets OpenLineage Dataset ou des Airflow Assets pour les inlets et outlets :
OpenLineage Datasets (Recommandé)
from openlineage.client.event_v2 import Dataset
# Database tables
source_table = Dataset(
namespace="postgres://mydb:5432",
name="public.orders",
)
target_table = Dataset(
namespace="snowflake://account.snowflakecomputing.com",
name="staging.orders_clean",
)
# Files
input_file = Dataset(
namespace="s3://my-bucket",
name="raw/events/2024-01-01.json",
)
Airflow Assets (Airflow 3+)
from airflow.sdk import Asset
# Using Airflow's native Asset type
orders_asset = Asset(uri="s3://my-bucket/data/orders")
Airflow Datasets (Airflow 2.4+)
from airflow.datasets import Dataset
# Using Airflow's Dataset type (Airflow 2.4-2.x)
orders_dataset = Dataset(uri="s3://my-bucket/data/orders")
Utilisation basique
Définir les inlets et outlets sur les opérateurs
from airflow import DAG
from airflow.operators.bash import BashOperator
from openlineage.client.event_v2 import Dataset
import pendulum
# Define your lineage datasets
source_table = Dataset(
namespace="snowflake://account.snowflakecomputing.com",
name="raw.orders",
)
target_table = Dataset(
namespace="snowflake://account.snowflakecomputing.com",
name="staging.orders_clean",
)
output_file = Dataset(
namespace="s3://my-bucket",
name="exports/orders.parquet",
)
with DAG(
dag_id="etl_with_lineage",
start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
schedule="@daily",
) as dag:
transform = BashOperator(
task_id="transform_orders",
bash_command="echo 'transforming...'",
inlets=[source_table], # What this task reads
outlets=[target_table], # What this task writes
)
export = BashOperator(
task_id="export_to_s3",
bash_command="echo 'exporting...'",
inlets=[target_table], # Reads from previous output
outlets=[output_file], # Writes to S3
)
transform >> export
Plusieurs entrées et sorties
Les tâches lisent souvent à partir de plusieurs sources et écrivent à plusieurs destinations :
from openlineage.client.event_v2 import Dataset
# Multiple source tables
customers = Dataset(namespace="postgres://crm:5432", name="public.customers")
orders = Dataset(namespace="postgres://sales:5432", name="public.orders")
products = Dataset(namespace="postgres://inventory:5432", name="public.products")
# Multiple output tables
daily_summary = Dataset(namespace="snowflake://account", name="analytics.daily_summary")
customer_metrics = Dataset(namespace="snowflake://account", name="analytics.customer_metrics")
aggregate_task = PythonOperator(
task_id="build_daily_aggregates",
python_callable=build_aggregates,
inlets=[customers, orders, products], # All inputs
outlets=[daily_summary, customer_metrics], # All outputs
)
Définir la lignée dans les opérateurs personnalisés
Lors de la création d'opérateurs personnalisés, vous avez deux options :
Option 1 : Implémenter les méthodes OpenLineage (Recommandé)
C'est l'approche préférée car elle vous donne un contrôle total sur l'extraction de lignée :
from airflow.models import BaseOperator
class MyCustomOperator(BaseOperator):
def __init__(self, source_table: str, target_table: str, **kwargs):
super().__init__(**kwargs)
self.source_table = source_table
self.target_table = target_table
def execute(self, context):
# ... perform the actual work ...
self.log.info(f"Processing {self.source_table} -> {self.target_table}")
def get_openlineage_facets_on_complete(self, task_instance):
"""Return lineage after successful execution."""
from openlineage.client.event_v2 import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[Dataset(namespace="warehouse://db", name=self.source_table)],
outputs=[Dataset(namespace="warehouse://db", name=self.target_table)],
)
Option 2 : Définir les inlets/outlets dynamiquement
Pour les cas plus simples, définissez la lignée dans la méthode execute (opérateurs non-deferrable uniquement) :
from airflow.models import BaseOperator
from openlineage.client.event_v2 import Dataset
class MyCustomOperator(BaseOperator):
def __init__(self, source_table: str, target_table: str, **kwargs):
super().__init__(**kwargs)
self.source_table = source_table
self.target_table = target_table
def execute(self, context):
# Set lineage dynamically based on operator parameters
self.inlets = [
Dataset(namespace="warehouse://db", name=self.source_table)
]
self.outlets = [
Dataset(namespace="warehouse://db", name=self.target_table)
]
# ... perform the actual work ...
self.log.info(f"Processing {self.source_table} -> {self.target_table}")
Assistants de nommage des Dataset
Utilisez les assistants de nommage des dataset OpenLineage pour assurer un nommage cohérent sur les plateformes :
from openlineage.client.event_v2 import Dataset
# Snowflake
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming
naming = SnowflakeDatasetNaming(
account_identifier="myorg-myaccount",
database="mydb",
schema="myschema",
table="mytable",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "snowflake://myorg-myaccount", name: "mydb.myschema.mytable"
# BigQuery
from openlineage.client.naming.bigquery import BigQueryDatasetNaming
naming = BigQueryDatasetNaming(
project="my-project",
dataset="my_dataset",
table="my_table",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "bigquery", name: "my-project.my_dataset.my_table"
# S3
from openlineage.client.naming.s3 import S3DatasetNaming
naming = S3DatasetNaming(bucket="my-bucket", key="path/to/file.parquet")
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "s3://my-bucket", name: "path/to/file.parquet"
# PostgreSQL
from openlineage.client.naming.postgres import PostgresDatasetNaming
naming = PostgresDatasetNaming(
host="localhost",
port=5432,
database="mydb",
schema="public",
table="users",
)
dataset = Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# -> namespace: "postgres://localhost:5432", name: "mydb.public.users"
Remarque : Utilisez toujours les assistants de nommage au lieu de construire les espaces de noms manuellement. Si un assistant manque pour votre plateforme, consultez le dépôt OpenLineage ou demandez-le.
Règles de priorité
OpenLineage utilise cette priorité pour l'extraction de lignée :
- Extracteurs personnalisés (plus élevée) - Extracteurs enregistrés par l'utilisateur
- Méthodes OpenLineage -
get_openlineage_facets_on_*dans l'opérateur - Lignée au niveau hook - Lignée collectée à partir de hooks via
HookLineageCollector - Inlets/Outlets (plus basse) - Recours si rien d'autre n'extrait la lignée
Remarque : Si un extracteur ou une méthode existe mais ne retourne aucun dataset, OpenLineage vérifiera la lignée au niveau hook, puis recourra aux inlets/outlets.
Bonnes pratiques
Utiliser les assistants de nommage
Utilisez toujours les assistants de nommage OpenLineage pour une création cohérente de dataset :
from openlineage.client.event_v2 import Dataset
from openlineage.client.naming.snowflake import SnowflakeDatasetNaming
def snowflake_dataset(schema: str, table: str) -> Dataset:
"""Create a Snowflake Dataset using the naming helper."""
naming = SnowflakeDatasetNaming(
account_identifier="mycompany",
database="analytics",
schema=schema,
table=table,
)
return Dataset(namespace=naming.get_namespace(), name=naming.get_name())
# Usage
source = snowflake_dataset("raw", "orders")
target = snowflake_dataset("staging", "orders_clean")
Documenter votre lignée
Ajoutez des commentaires expliquant le flux de données :
transform = SqlOperator(
task_id="transform_orders",
sql="...",
# Lineage: Reads raw orders, joins with customers, writes to staging
inlets=[
snowflake_dataset("raw", "orders"),
snowflake_dataset("raw", "customers"),
],
outlets=[
snowflake_dataset("staging", "order_details"),
],
)
Maintenir la lignée exacte
- Mettez à jour les inlets/outlets lorsque les requêtes SQL changent
- Incluez toutes les tables référencées dans les JOINs comme inlets
- Incluez toutes les tables écrites (y compris les tables temporaires si pertinent)
- Les annotations outlet-only et inlet-only sont valides. Les annotations unilatérales sont encouragées pour la visibilité de lignée même sans inlet ou outlet correspondant dans un autre DAG.
Limitations
| Limitation | Solution |
|---|---|
| Au niveau table uniquement (pas de lignée colonne) | Utiliser les méthodes OpenLineage ou un extracteur personnalisé |
| Remplacée par les extracteurs/méthodes | Utiliser uniquement pour les opérateurs sans extracteurs |
| Statique au moment du parsing du DAG | Définir dynamiquement dans execute() ou utiliser les méthodes OL |
| Les opérateurs deferrable perdent la lignée dynamique | Utiliser les méthodes OL à la place ; les attributs définis dans execute() sont perdus lors du deferring |
Skills connexes
- creating-openlineage-extractors : Pour la lignée au niveau colonne ou l'extraction complexe
- tracing-upstream-lineage : Enquêter sur l'origine des données
- tracing-downstream-lineage : Enquêter sur ce qui dépend des données