Créer des Extracteurs OpenLineage
Cette compétence vous guide dans la création d'extracteurs OpenLineage personnalisés pour capturer la lignée à partir d'opérateurs Airflow qui n'ont pas de support intégré.
Référence : Consultez le guide du développeur du fournisseur OpenLineage pour les derniers modèles et la liste des opérateurs/hooks pris en charge.
Quand utiliser chaque approche
| Scénario | Approche |
|---|---|
| Opérateur que vous possédez/maintenez | Méthodes OpenLineage (recommandé, plus simple) |
| Opérateur tiers que vous ne pouvez pas modifier | Extracteur personnalisé |
| Besoin de lignée au niveau des colonnes | Méthodes OpenLineage ou Extracteur personnalisé |
| Logique d'extraction complexe | Méthodes OpenLineage ou Extracteur personnalisé |
| Lignée simple au niveau des tables | Inlets/Outlets (plus simple, mais priorité basse) |
Important : Préférez toujours les méthodes OpenLineage aux extracteurs personnalisés si possible. Les extracteurs sont plus difficiles à écrire, plus susceptibles de diverger du comportement de l'opérateur après des modifications, et plus difficiles à déboguer.
Sur Astro
Astro inclut l'intégration OpenLineage intégrée — aucune configuration de transport supplémentaire n'est nécessaire. Les événements de lignée sont automatiquement collectés et affichés dans l'onglet Lineage de l'interface Astro. Les extracteurs personnalisés déployés sur un projet Astro sont automatiquement détectés, vous devez donc seulement les enregistrer dans airflow.cfg ou via une variable d'environnement et déployer.
Deux approches
1. Méthodes OpenLineage (Recommandé)
À utiliser quand vous pouvez ajouter des méthodes directement à votre opérateur personnalisé. C'est la solution de référence pour les opérateurs que vous possédez.
2. Extracteurs personnalisés
À utiliser quand vous avez besoin de lignée d'opérateurs tiers ou de fournisseurs que vous ne pouvez pas modifier.
Approche 1 : Méthodes OpenLineage (Recommandé)
Quand vous possédez l'opérateur, ajoutez les méthodes OpenLineage directement :
from airflow.models import BaseOperator
class MyCustomOperator(BaseOperator):
"""Custom operator with built-in OpenLineage support."""
def __init__(self, source_table: str, target_table: str, **kwargs):
super().__init__(**kwargs)
self.source_table = source_table
self.target_table = target_table
self._rows_processed = 0 # Set during execution
def execute(self, context):
# Do the actual work
self._rows_processed = self._process_data()
return self._rows_processed
def get_openlineage_facets_on_start(self):
"""Called when task starts. Return known inputs/outputs."""
# Import locally to avoid circular imports
from openlineage.client.event_v2 import Dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
)
def get_openlineage_facets_on_complete(self, task_instance):
"""Called after success. Add runtime metadata."""
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import output_statistics_output_dataset
from airflow.providers.openlineage.extractors import OperatorLineage
return OperatorLineage(
inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
outputs=[
Dataset(
namespace="postgres://db",
name=self.target_table,
facets={
"outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
rowCount=self._rows_processed
)
},
)
],
)
def get_openlineage_facets_on_failure(self, task_instance):
"""Called after failure. Optional - for partial lineage."""
return None
Référence des méthodes OpenLineage
| Méthode | Quand appelée | Obligatoire |
|---|---|---|
get_openlineage_facets_on_start() |
La tâche entre en RUNNING | Non |
get_openlineage_facets_on_complete(ti) |
La tâche réussit | Non |
get_openlineage_facets_on_failure(ti) |
La tâche échoue | Non |
Implémentez uniquement les méthodes dont vous avez besoin. Les méthodes non implémentées tombent en cascade vers la lignée au niveau du Hook ou les inlets/outlets.
Approche 2 : Extracteurs personnalisés
Utilisez cette approche uniquement quand vous ne pouvez pas modifier l'opérateur (par ex., opérateurs tiers ou de fournisseurs).
Structure de base
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
class MyOperatorExtractor(BaseExtractor):
"""Extract lineage from MyCustomOperator."""
@classmethod
def get_operator_classnames(cls) -> list[str]:
"""Return operator class names this extractor handles."""
return ["MyCustomOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
"""Called BEFORE operator executes. Use for known inputs/outputs."""
# Access operator properties via self.operator
source_table = self.operator.source_table
target_table = self.operator.target_table
return OperatorLineage(
inputs=[
Dataset(
namespace="postgres://mydb:5432",
name=f"public.{source_table}",
)
],
outputs=[
Dataset(
namespace="postgres://mydb:5432",
name=f"public.{target_table}",
)
],
)
def extract_on_complete(self, task_instance) -> OperatorLineage | None:
"""Called AFTER operator executes. Use for runtime-determined lineage."""
# Access properties set during execution
# Useful for operators that determine outputs at runtime
return None
Structure OperatorLineage
from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job
lineage = OperatorLineage(
inputs=[Dataset(namespace="...", name="...")], # Input datasets
outputs=[Dataset(namespace="...", name="...")], # Output datasets
run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")}, # Run metadata
job_facets={}, # Job metadata
)
Méthodes d'extraction
| Méthode | Quand appelée | À utiliser pour |
|---|---|---|
_execute_extraction() |
Avant l'exécution de l'opérateur | Lignée statique/connue |
extract_on_complete(task_instance) |
Après succès | Lignée déterminée à l'exécution |
extract_on_failure(task_instance) |
Après échec | Lignée partielle en cas d'erreur |
Enregistrement des extracteurs
Option 1 : Fichier de configuration (airflow.cfg)
[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor
Option 2 : Variable d'environnement
AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'
Important : Le chemin doit être importable depuis le worker Airflow. Placez les extracteurs dans votre dossier DAGs ou dans un paquet installé.
Modèles courants
Extracteur d'opérateur SQL
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job
class MySqlOperatorExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["MySqlOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
sql = self.operator.sql
conn_id = self.operator.conn_id
# Parse SQL to find tables (simplified example)
# In practice, use a SQL parser like sqlglot
inputs, outputs = self._parse_sql(sql)
namespace = f"postgres://{conn_id}"
return OperatorLineage(
inputs=[Dataset(namespace=namespace, name=t) for t in inputs],
outputs=[Dataset(namespace=namespace, name=t) for t in outputs],
job_facets={
"sql": sql_job.SQLJobFacet(query=sql)
},
)
def _parse_sql(self, sql: str) -> tuple[list[str], list[str]]:
"""Parse SQL to extract table names. Use sqlglot for real parsing."""
# Simplified example - use proper SQL parser in production
inputs = []
outputs = []
# ... parsing logic ...
return inputs, outputs
Extracteur de transfert de fichiers
from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
class S3ToSnowflakeExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["S3ToSnowflakeOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
s3_bucket = self.operator.s3_bucket
s3_key = self.operator.s3_key
table = self.operator.table
schema = self.operator.schema
return OperatorLineage(
inputs=[
Dataset(
namespace=f"s3://{s3_bucket}",
name=s3_key,
)
],
outputs=[
Dataset(
namespace="snowflake://myaccount.snowflakecomputing.com",
name=f"{schema}.{table}",
)
],
)
Lignée dynamique à partir de l'exécution
from openlineage.client.event_v2 import Dataset
class DynamicOutputExtractor(BaseExtractor):
@classmethod
def get_operator_classnames(cls) -> list[str]:
return ["DynamicOutputOperator"]
def _execute_extraction(self) -> OperatorLineage | None:
# Only inputs known before execution
return OperatorLineage(
inputs=[Dataset(namespace="...", name=self.operator.source)],
)
def extract_on_complete(self, task_instance) -> OperatorLineage | None:
# Outputs determined during execution
# Access via operator properties set in execute()
outputs = self.operator.created_tables # Set during execute()
return OperatorLineage(
inputs=[Dataset(namespace="...", name=self.operator.source)],
outputs=[Dataset(namespace="...", name=t) for t in outputs],
)
Pièges courants
1. Importations circulaires
Problème : Importer des modules Airflow au niveau supérieur cause des importations circulaires.
# ❌ MAUVAIS - peut causer des problèmes d'importation circulaire
from airflow.models import TaskInstance
from openlineage.client.event_v2 import Dataset
class MyExtractor(BaseExtractor):
...
# ✅ BON - importer à l'intérieur des méthodes
class MyExtractor(BaseExtractor):
def _execute_extraction(self):
from openlineage.client.event_v2 import Dataset
# ...
2. Mauvais chemin d'importation
Problème : Le chemin de l'extracteur ne correspond pas à l'emplacement du module réel.
# ❌ Mauvais - le chemin n'existe pas
AIRFLOW__OPENLINEAGE__EXTRACTORS='extractors.MyExtractor'
# ✅ Correct - chemin importable complet
AIRFLOW__OPENLINEAGE__EXTRACTORS='dags.extractors.my_extractor.MyExtractor'
3. Ne pas gérer None
Problème : L'extraction échoue quand les propriétés de l'opérateur sont None.
# ✅ Gérer les propriétés optionnelles
def _execute_extraction(self) -> OperatorLineage | None:
if not self.operator.source_table:
return None # Skip extraction
return OperatorLineage(...)
Tester les extracteurs
Tests unitaires
import pytest
from unittest.mock import MagicMock
from mypackage.extractors import MyOperatorExtractor
def test_extractor():
# Mock the operator
operator = MagicMock()
operator.source_table = "input_table"
operator.target_table = "output_table"
# Create extractor
extractor = MyOperatorExtractor(operator)
# Test extraction
lineage = extractor._execute_extraction()
assert len(lineage.inputs) == 1
assert lineage.inputs[0].name == "input_table"
assert len(lineage.outputs) == 1
assert lineage.outputs[0].name == "output_table"
Règles de précédence
OpenLineage vérifie la lignée dans cet ordre :
- Extracteurs personnalisés (priorité la plus élevée)
- Méthodes OpenLineage sur l'opérateur
- Lignée au niveau du Hook (depuis
HookLineageCollector) - Inlets/Outlets (priorité la plus basse)
Si un extracteur personnalisé existe, il remplace l'extraction intégrée et les inlets/outlets.
Compétences associées
- annotating-task-lineage : Pour la lignée simple au niveau des tables avec inlets/outlets
- tracing-upstream-lineage : Investiguer les origines des données
- tracing-downstream-lineage : Investiguer les dépendances de données