creating-openlineage-extractors

Créez des extracteurs OpenLineage personnalisés pour les opérateurs Airflow. À utiliser lorsque l'utilisateur a besoin de la traçabilité depuis des opérateurs non pris en charge ou tiers, souhaite une traçabilité au niveau des colonnes, ou a besoin d'une logique d'extraction complexe au-delà de ce que fournissent les inlets/outlets.

npx skills add https://github.com/astronomer/agents --skill creating-openlineage-extractors

Créer des Extracteurs OpenLineage

Cette compétence vous guide dans la création d'extracteurs OpenLineage personnalisés pour capturer la lignée à partir d'opérateurs Airflow qui n'ont pas de support intégré.

Référence : Consultez le guide du développeur du fournisseur OpenLineage pour les derniers modèles et la liste des opérateurs/hooks pris en charge.

Quand utiliser chaque approche

Scénario Approche
Opérateur que vous possédez/maintenez Méthodes OpenLineage (recommandé, plus simple)
Opérateur tiers que vous ne pouvez pas modifier Extracteur personnalisé
Besoin de lignée au niveau des colonnes Méthodes OpenLineage ou Extracteur personnalisé
Logique d'extraction complexe Méthodes OpenLineage ou Extracteur personnalisé
Lignée simple au niveau des tables Inlets/Outlets (plus simple, mais priorité basse)

Important : Préférez toujours les méthodes OpenLineage aux extracteurs personnalisés si possible. Les extracteurs sont plus difficiles à écrire, plus susceptibles de diverger du comportement de l'opérateur après des modifications, et plus difficiles à déboguer.

Sur Astro

Astro inclut l'intégration OpenLineage intégrée — aucune configuration de transport supplémentaire n'est nécessaire. Les événements de lignée sont automatiquement collectés et affichés dans l'onglet Lineage de l'interface Astro. Les extracteurs personnalisés déployés sur un projet Astro sont automatiquement détectés, vous devez donc seulement les enregistrer dans airflow.cfg ou via une variable d'environnement et déployer.


Deux approches

1. Méthodes OpenLineage (Recommandé)

À utiliser quand vous pouvez ajouter des méthodes directement à votre opérateur personnalisé. C'est la solution de référence pour les opérateurs que vous possédez.

2. Extracteurs personnalisés

À utiliser quand vous avez besoin de lignée d'opérateurs tiers ou de fournisseurs que vous ne pouvez pas modifier.


Approche 1 : Méthodes OpenLineage (Recommandé)

Quand vous possédez l'opérateur, ajoutez les méthodes OpenLineage directement :

from airflow.models import BaseOperator


class MyCustomOperator(BaseOperator):
    """Custom operator with built-in OpenLineage support."""

    def __init__(self, source_table: str, target_table: str, **kwargs):
        super().__init__(**kwargs)
        self.source_table = source_table
        self.target_table = target_table
        self._rows_processed = 0  # Set during execution

    def execute(self, context):
        # Do the actual work
        self._rows_processed = self._process_data()
        return self._rows_processed

    def get_openlineage_facets_on_start(self):
        """Called when task starts. Return known inputs/outputs."""
        # Import locally to avoid circular imports
        from openlineage.client.event_v2 import Dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[Dataset(namespace="postgres://db", name=self.target_table)],
        )

    def get_openlineage_facets_on_complete(self, task_instance):
        """Called after success. Add runtime metadata."""
        from openlineage.client.event_v2 import Dataset
        from openlineage.client.facet_v2 import output_statistics_output_dataset
        from airflow.providers.openlineage.extractors import OperatorLineage

        return OperatorLineage(
            inputs=[Dataset(namespace="postgres://db", name=self.source_table)],
            outputs=[
                Dataset(
                    namespace="postgres://db",
                    name=self.target_table,
                    facets={
                        "outputStatistics": output_statistics_output_dataset.OutputStatisticsOutputDatasetFacet(
                            rowCount=self._rows_processed
                        )
                    },
                )
            ],
        )

    def get_openlineage_facets_on_failure(self, task_instance):
        """Called after failure. Optional - for partial lineage."""
        return None

Référence des méthodes OpenLineage

Méthode Quand appelée Obligatoire
get_openlineage_facets_on_start() La tâche entre en RUNNING Non
get_openlineage_facets_on_complete(ti) La tâche réussit Non
get_openlineage_facets_on_failure(ti) La tâche échoue Non

Implémentez uniquement les méthodes dont vous avez besoin. Les méthodes non implémentées tombent en cascade vers la lignée au niveau du Hook ou les inlets/outlets.


Approche 2 : Extracteurs personnalisés

Utilisez cette approche uniquement quand vous ne pouvez pas modifier l'opérateur (par ex., opérateurs tiers ou de fournisseurs).

Structure de base

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class MyOperatorExtractor(BaseExtractor):
    """Extract lineage from MyCustomOperator."""

    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        """Return operator class names this extractor handles."""
        return ["MyCustomOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        """Called BEFORE operator executes. Use for known inputs/outputs."""
        # Access operator properties via self.operator
        source_table = self.operator.source_table
        target_table = self.operator.target_table

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{source_table}",
                )
            ],
            outputs=[
                Dataset(
                    namespace="postgres://mydb:5432",
                    name=f"public.{target_table}",
                )
            ],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        """Called AFTER operator executes. Use for runtime-determined lineage."""
        # Access properties set during execution
        # Useful for operators that determine outputs at runtime
        return None

Structure OperatorLineage

from airflow.providers.openlineage.extractors.base import OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job

lineage = OperatorLineage(
    inputs=[Dataset(namespace="...", name="...")],      # Input datasets
    outputs=[Dataset(namespace="...", name="...")],     # Output datasets
    run_facets={"sql": sql_job.SQLJobFacet(query="SELECT...")},  # Run metadata
    job_facets={},                                      # Job metadata
)

Méthodes d'extraction

Méthode Quand appelée À utiliser pour
_execute_extraction() Avant l'exécution de l'opérateur Lignée statique/connue
extract_on_complete(task_instance) Après succès Lignée déterminée à l'exécution
extract_on_failure(task_instance) Après échec Lignée partielle en cas d'erreur

Enregistrement des extracteurs

Option 1 : Fichier de configuration (airflow.cfg)

[openlineage]
extractors = mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor

Option 2 : Variable d'environnement

AIRFLOW__OPENLINEAGE__EXTRACTORS='mypackage.extractors.MyOperatorExtractor;mypackage.extractors.AnotherExtractor'

Important : Le chemin doit être importable depuis le worker Airflow. Placez les extracteurs dans votre dossier DAGs ou dans un paquet installé.


Modèles courants

Extracteur d'opérateur SQL

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import sql_job


class MySqlOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["MySqlOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        sql = self.operator.sql
        conn_id = self.operator.conn_id

        # Parse SQL to find tables (simplified example)
        # In practice, use a SQL parser like sqlglot
        inputs, outputs = self._parse_sql(sql)

        namespace = f"postgres://{conn_id}"

        return OperatorLineage(
            inputs=[Dataset(namespace=namespace, name=t) for t in inputs],
            outputs=[Dataset(namespace=namespace, name=t) for t in outputs],
            job_facets={
                "sql": sql_job.SQLJobFacet(query=sql)
            },
        )

    def _parse_sql(self, sql: str) -> tuple[list[str], list[str]]:
        """Parse SQL to extract table names. Use sqlglot for real parsing."""
        # Simplified example - use proper SQL parser in production
        inputs = []
        outputs = []
        # ... parsing logic ...
        return inputs, outputs

Extracteur de transfert de fichiers

from airflow.providers.openlineage.extractors.base import BaseExtractor, OperatorLineage
from openlineage.client.event_v2 import Dataset


class S3ToSnowflakeExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["S3ToSnowflakeOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        s3_bucket = self.operator.s3_bucket
        s3_key = self.operator.s3_key
        table = self.operator.table
        schema = self.operator.schema

        return OperatorLineage(
            inputs=[
                Dataset(
                    namespace=f"s3://{s3_bucket}",
                    name=s3_key,
                )
            ],
            outputs=[
                Dataset(
                    namespace="snowflake://myaccount.snowflakecomputing.com",
                    name=f"{schema}.{table}",
                )
            ],
        )

Lignée dynamique à partir de l'exécution

from openlineage.client.event_v2 import Dataset


class DynamicOutputExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls) -> list[str]:
        return ["DynamicOutputOperator"]

    def _execute_extraction(self) -> OperatorLineage | None:
        # Only inputs known before execution
        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
        )

    def extract_on_complete(self, task_instance) -> OperatorLineage | None:
        # Outputs determined during execution
        # Access via operator properties set in execute()
        outputs = self.operator.created_tables  # Set during execute()

        return OperatorLineage(
            inputs=[Dataset(namespace="...", name=self.operator.source)],
            outputs=[Dataset(namespace="...", name=t) for t in outputs],
        )

Pièges courants

1. Importations circulaires

Problème : Importer des modules Airflow au niveau supérieur cause des importations circulaires.

# ❌ MAUVAIS - peut causer des problèmes d'importation circulaire
from airflow.models import TaskInstance
from openlineage.client.event_v2 import Dataset

class MyExtractor(BaseExtractor):
    ...
# ✅ BON - importer à l'intérieur des méthodes
class MyExtractor(BaseExtractor):
    def _execute_extraction(self):
        from openlineage.client.event_v2 import Dataset
        # ...

2. Mauvais chemin d'importation

Problème : Le chemin de l'extracteur ne correspond pas à l'emplacement du module réel.

# ❌ Mauvais - le chemin n'existe pas
AIRFLOW__OPENLINEAGE__EXTRACTORS='extractors.MyExtractor'

# ✅ Correct - chemin importable complet
AIRFLOW__OPENLINEAGE__EXTRACTORS='dags.extractors.my_extractor.MyExtractor'

3. Ne pas gérer None

Problème : L'extraction échoue quand les propriétés de l'opérateur sont None.

# ✅ Gérer les propriétés optionnelles
def _execute_extraction(self) -> OperatorLineage | None:
    if not self.operator.source_table:
        return None  # Skip extraction

    return OperatorLineage(...)

Tester les extracteurs

Tests unitaires

import pytest
from unittest.mock import MagicMock
from mypackage.extractors import MyOperatorExtractor


def test_extractor():
    # Mock the operator
    operator = MagicMock()
    operator.source_table = "input_table"
    operator.target_table = "output_table"

    # Create extractor
    extractor = MyOperatorExtractor(operator)

    # Test extraction
    lineage = extractor._execute_extraction()

    assert len(lineage.inputs) == 1
    assert lineage.inputs[0].name == "input_table"
    assert len(lineage.outputs) == 1
    assert lineage.outputs[0].name == "output_table"

Règles de précédence

OpenLineage vérifie la lignée dans cet ordre :

  1. Extracteurs personnalisés (priorité la plus élevée)
  2. Méthodes OpenLineage sur l'opérateur
  3. Lignée au niveau du Hook (depuis HookLineageCollector)
  4. Inlets/Outlets (priorité la plus basse)

Si un extracteur personnalisé existe, il remplace l'extraction intégrée et les inlets/outlets.


Compétences associées

  • annotating-task-lineage : Pour la lignée simple au niveau des tables avec inlets/outlets
  • tracing-upstream-lineage : Investiguer les origines des données
  • tracing-downstream-lineage : Investiguer les dépendances de données

Skills similaires