migrating-airflow-2-to-3

Guide pour migrer des projets Apache Airflow 2.x vers Airflow 3.x. À utiliser lorsque l'utilisateur mentionne la migration vers Airflow 3, la mise à niveau, les problèmes de compatibilité, les changements incompatibles, ou souhaite moderniser sa base de code Airflow. Si vous détectez du code Airflow 2.x nécessitant une migration, signalez-le à l'utilisateur et demandez-lui s'il souhaite que vous l'aidiez à effectuer la mise à niveau. Chargez toujours ce skill en premier pour toute demande liée à une migration.

npx skills add https://github.com/astronomer/agents --skill migrating-airflow-2-to-3

Migration d'Airflow 2 vers 3

Cette skill aide à migrer le code des DAG Airflow 2.x vers Airflow 3.x, en se concentrant sur les changements de code (imports, opérateurs, hooks, contexte, utilisation de l'API).

Important : Avant de migrer vers Airflow 3, il est fortement recommandé de d'abord mettre à jour vers Airflow 2.11, puis vers au moins Airflow 3.0.11 (idéalement directement vers 3.1). Tout autre chemin de mise à jour rendrait les rollbacks impossibles. Voir : https://www.astronomer.io/docs/astro/airflow3/upgrade-af3#upgrade-your-airflow-2-deployment-to-airflow-3. De plus, les premières versions de 3.0 contiennent beaucoup de bugs — 3.1 offre une bien meilleure expérience.

Migration en un coup d'œil

  1. Exécutez les règles de migration Airflow de Ruff pour corriger automatiquement les problèmes détectables (AIR30/AIR301/AIR302/AIR31/AIR311/AIR312).
    • ruff check --preview --select AIR --fix --unsafe-fixes .
  2. Scannez les problèmes restants à l'aide de la checklist de recherche manuelle dans reference/migration-checklist.md.
    • Concentrez-vous sur : accès direct à la BD de métadonnées, imports hérités, clés de planification/contexte, sérialisation XCom, datasets-vers-assets, API REST/authentification, plugins et chemins de fichiers.
    • Gotchas de comportement/configuration difficiles à revoir explicitement :
      • Sémantique de la planification cron : considérez AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True si vous avez besoin des intervalles de données cron de style Airflow 2.
      • La syntaxe .airflowignore est passée de regexp à glob ; définissez AIRFLOW__CORE__DAG_IGNORE_FILE_SYNTAX=regexp si vous devez conserver le comportement regexp.
      • Les URLs de rappel OAuth ajoutent un préfixe /auth/ (ex. /auth/oauth-authorized/google).
      • Imports d'utilitaires partagés : Les imports nus comme import common depuis dags/common/ ne fonctionnent plus sur Astro. Utilisez des imports pleinement qualifiés : import dags.common.
  3. Planifiez les changements par fichier et par type de problème :
    • Corrigez les imports — mettez à jour les opérateurs/hooks/providers — refactorisez l'accès aux métadonnées en utilisant le client Airflow au lieu d'un accès direct — corrigez l'utilisation des variables de contexte obsolètes — corrigez la logique de planification.
  4. Mettez en œuvre les changements de manière progressive, en réexécutant Ruff et les recherches de code après chaque changement majeur.
  5. Expliquez les changements à l'utilisateur et aversissez-le de tester toute logique mise à jour, comme les métadonnées refactorisées, la logique de planification et l'utilisation du contexte Airflow.

Architecture et accès à la BD de métadonnées

Airflow 3 change la façon dont les composants communiquent avec la base de données de métadonnées :

  • Les workers ne se connectent plus directement à la BD de métadonnées.
  • Le code des tâches s'exécute via l'API d'exécution des tâches exposée par le serveur API.
  • Le processeur de DAG s'exécute en tant que processus indépendant séparé du scheduler.
  • Le Triggerer utilise le mécanisme d'exécution des tâches via un serveur API en processus.

Gotcha de mise en œuvre du trigger : Si un trigger appelle des hooks de manière synchrone à l'intérieur de la boucle d'événements asyncio, cela peut échouer ou se bloquer. Préférez appeler les hooks via sync_to_async(...) (ou assurez-vous autrement que les appels de hook sont async-safe).

Impact clé du code : Le code des tâches peut toujours importer des sessions/modèles ORM, mais toute tentative de les utiliser pour communiquer avec la BD de métadonnées échouera avec :

RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.x

Motifs à rechercher

Lors du scan des DAG, des opérateurs personnalisés et des fonctions @task, recherchez :

  • Assistants de session : provide_session, create_session, @provide_session
  • Sessions depuis les paramètres : from airflow.settings import Session
  • Accès au moteur : from airflow.settings import engine
  • Utilisation ORM avec modèles : session.query(DagModel)..., session.query(DagRun)...

Remplacement : Client Python Airflow

Préféré pour les motifs d'accès aux métadonnées enrichis. Ajoutez à requirements.txt :

apache-airflow-client==<your-airflow-runtime-version>

Exemple d'utilisation :

import os
from airflow.sdk import BaseOperator
import airflow_client.client
from airflow_client.client.api.dag_api import DAGApi

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

class ListDagsOperator(BaseOperator):
    def execute(self, context):
        config = airflow_client.client.Configuration(host=_HOST, access_token=_TOKEN)
        with airflow_client.client.ApiClient(config) as api_client:
            dag_api = DAGApi(api_client)
            dags = dag_api.get_dags(limit=10)
            self.log.info("Found %d DAGs", len(dags.dags))

Remplacement : Appels directs à l'API REST

Pour les cas simples, appelez l'API REST directement avec requests :

from airflow.sdk import task
import os
import requests

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")

@task
def list_dags_via_api() -> None:
    response = requests.get(
        f"{_HOST}/api/v2/dags",
        headers={"Accept": "application/json", "Authorization": f"Bearer {_TOKEN}"},
        params={"limit": 10}
    )
    response.raise_for_status()
    print(response.json())

Règles de migration Airflow de Ruff

Utilisez les règles Airflow de Ruff pour détecter et corriger automatiquement de nombreux changements incompatibles.

  • AIR30 / AIR301 / AIR302 : Code et imports supprimés dans Airflow 3 — doivent être corrigés.
  • AIR31 / AIR311 / AIR312 : Code et imports dépréciés — fonctionnent toujours mais seront supprimés dans les futures versions ; devraient être corrigés.

Commandes à exécuter (via uv) par rapport à la racine du projet :

# Corriger automatiquement tous les problèmes Airflow détectables (sûrs + non sûrs)
ruff check --preview --select AIR --fix --unsafe-fixes .

# Vérifier les problèmes Airflow restants sans corriger
ruff check --preview --select AIR .

Fichiers de référence

Pour des exemples de code détaillés et des motifs de migration, consultez :


Tableaux de référence rapide

Changements clés des imports

Airflow 2.x Airflow 3
airflow.operators.dummy_operator.DummyOperator airflow.providers.standard.operators.empty.EmptyOperator
airflow.operators.bash.BashOperator airflow.providers.standard.operators.bash.BashOperator
airflow.operators.python.PythonOperator airflow.providers.standard.operators.python.PythonOperator
airflow.decorators.dag airflow.sdk.dag
airflow.decorators.task airflow.sdk.task
airflow.datasets.Dataset airflow.sdk.Asset

Changements des clés de contexte

Clé supprimée Remplacement
execution_date context["dag_run"].logical_date
tomorrow_ds / yesterday_ds Utilisez ds avec calcul de date : macros.ds_add(ds, 1) / macros.ds_add(ds, -1)
prev_ds / next_ds prev_start_date_success ou API timetable
triggering_dataset_events triggering_asset_events
templates_dict context["params"]

Exécutions déclenchées par asset : logical_date peut être None ; utilisez context["dag_run"].logical_date de manière défensive.

Impossible de déclencher avec une logical_date future : Utilisez logical_date=None et fiez-vous plutôt à run_id.

Note cron : pour les exécutions planifiées utilisant cron, la sémantique de logical_date diffère sous CronTriggerTimetable (alignant logical_date avec run_after). Si vous avez besoin des intervalles de données cron de style Airflow 2, considérez AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True.

Changements de comportement par défaut

Paramètre Défaut Airflow 2 Défaut Airflow 3
schedule timedelta(days=1) None
catchup True False

Changements de comportement des callbacks

  • on_success_callback ne s'exécute plus en cas de skip ; utilisez on_skipped_callback si nécessaire.
  • @teardown avec TriggerRule.ALWAYS n'est pas autorisé ; les teardowns s'exécutent maintenant même si l'exécution du DAG s'arrête prématurément.

Ressources


Skills connexes

  • testing-dags : Pour tester les DAG après la migration
  • debugging-dags : Pour dépanner les problèmes de migration
  • deploying-airflow : Pour déployer les DAG migrés en production

Skills similaires