Migration d'Airflow 2 vers 3
Cette skill aide à migrer le code des DAG Airflow 2.x vers Airflow 3.x, en se concentrant sur les changements de code (imports, opérateurs, hooks, contexte, utilisation de l'API).
Important : Avant de migrer vers Airflow 3, il est fortement recommandé de d'abord mettre à jour vers Airflow 2.11, puis vers au moins Airflow 3.0.11 (idéalement directement vers 3.1). Tout autre chemin de mise à jour rendrait les rollbacks impossibles. Voir : https://www.astronomer.io/docs/astro/airflow3/upgrade-af3#upgrade-your-airflow-2-deployment-to-airflow-3. De plus, les premières versions de 3.0 contiennent beaucoup de bugs — 3.1 offre une bien meilleure expérience.
Migration en un coup d'œil
- Exécutez les règles de migration Airflow de Ruff pour corriger automatiquement les problèmes détectables (AIR30/AIR301/AIR302/AIR31/AIR311/AIR312).
ruff check --preview --select AIR --fix --unsafe-fixes .
- Scannez les problèmes restants à l'aide de la checklist de recherche manuelle dans reference/migration-checklist.md.
- Concentrez-vous sur : accès direct à la BD de métadonnées, imports hérités, clés de planification/contexte, sérialisation XCom, datasets-vers-assets, API REST/authentification, plugins et chemins de fichiers.
- Gotchas de comportement/configuration difficiles à revoir explicitement :
- Sémantique de la planification cron : considérez
AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=Truesi vous avez besoin des intervalles de données cron de style Airflow 2. - La syntaxe
.airflowignoreest passée de regexp à glob ; définissezAIRFLOW__CORE__DAG_IGNORE_FILE_SYNTAX=regexpsi vous devez conserver le comportement regexp. - Les URLs de rappel OAuth ajoutent un préfixe
/auth/(ex./auth/oauth-authorized/google). - Imports d'utilitaires partagés : Les imports nus comme
import commondepuisdags/common/ne fonctionnent plus sur Astro. Utilisez des imports pleinement qualifiés :import dags.common.
- Sémantique de la planification cron : considérez
- Planifiez les changements par fichier et par type de problème :
- Corrigez les imports — mettez à jour les opérateurs/hooks/providers — refactorisez l'accès aux métadonnées en utilisant le client Airflow au lieu d'un accès direct — corrigez l'utilisation des variables de contexte obsolètes — corrigez la logique de planification.
- Mettez en œuvre les changements de manière progressive, en réexécutant Ruff et les recherches de code après chaque changement majeur.
- Expliquez les changements à l'utilisateur et aversissez-le de tester toute logique mise à jour, comme les métadonnées refactorisées, la logique de planification et l'utilisation du contexte Airflow.
Architecture et accès à la BD de métadonnées
Airflow 3 change la façon dont les composants communiquent avec la base de données de métadonnées :
- Les workers ne se connectent plus directement à la BD de métadonnées.
- Le code des tâches s'exécute via l'API d'exécution des tâches exposée par le serveur API.
- Le processeur de DAG s'exécute en tant que processus indépendant séparé du scheduler.
- Le Triggerer utilise le mécanisme d'exécution des tâches via un serveur API en processus.
Gotcha de mise en œuvre du trigger : Si un trigger appelle des hooks de manière synchrone à l'intérieur de la boucle d'événements asyncio, cela peut échouer ou se bloquer. Préférez appeler les hooks via sync_to_async(...) (ou assurez-vous autrement que les appels de hook sont async-safe).
Impact clé du code : Le code des tâches peut toujours importer des sessions/modèles ORM, mais toute tentative de les utiliser pour communiquer avec la BD de métadonnées échouera avec :
RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.x
Motifs à rechercher
Lors du scan des DAG, des opérateurs personnalisés et des fonctions @task, recherchez :
- Assistants de session :
provide_session,create_session,@provide_session - Sessions depuis les paramètres :
from airflow.settings import Session - Accès au moteur :
from airflow.settings import engine - Utilisation ORM avec modèles :
session.query(DagModel)...,session.query(DagRun)...
Remplacement : Client Python Airflow
Préféré pour les motifs d'accès aux métadonnées enrichis. Ajoutez à requirements.txt :
apache-airflow-client==<your-airflow-runtime-version>
Exemple d'utilisation :
import os
from airflow.sdk import BaseOperator
import airflow_client.client
from airflow_client.client.api.dag_api import DAGApi
_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")
class ListDagsOperator(BaseOperator):
def execute(self, context):
config = airflow_client.client.Configuration(host=_HOST, access_token=_TOKEN)
with airflow_client.client.ApiClient(config) as api_client:
dag_api = DAGApi(api_client)
dags = dag_api.get_dags(limit=10)
self.log.info("Found %d DAGs", len(dags.dags))
Remplacement : Appels directs à l'API REST
Pour les cas simples, appelez l'API REST directement avec requests :
from airflow.sdk import task
import os
import requests
_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")
@task
def list_dags_via_api() -> None:
response = requests.get(
f"{_HOST}/api/v2/dags",
headers={"Accept": "application/json", "Authorization": f"Bearer {_TOKEN}"},
params={"limit": 10}
)
response.raise_for_status()
print(response.json())
Règles de migration Airflow de Ruff
Utilisez les règles Airflow de Ruff pour détecter et corriger automatiquement de nombreux changements incompatibles.
- AIR30 / AIR301 / AIR302 : Code et imports supprimés dans Airflow 3 — doivent être corrigés.
- AIR31 / AIR311 / AIR312 : Code et imports dépréciés — fonctionnent toujours mais seront supprimés dans les futures versions ; devraient être corrigés.
Commandes à exécuter (via uv) par rapport à la racine du projet :
# Corriger automatiquement tous les problèmes Airflow détectables (sûrs + non sûrs)
ruff check --preview --select AIR --fix --unsafe-fixes .
# Vérifier les problèmes Airflow restants sans corriger
ruff check --preview --select AIR .
Fichiers de référence
Pour des exemples de code détaillés et des motifs de migration, consultez :
- reference/config-changes.md — Déplacements, renommages et suppressions de sections
airflow.cfg - reference/migration-patterns.md — Exemples de code pour les imports, la planification, XCom, les Assets, les bundles DAG, les changements de comportement à l'exécution
- reference/removed-methods.md — Méthodes de modèle supprimées avec chemins de migration SDK/API
- reference/migration-checklist.md — Motifs de recherche et correctifs pour les problèmes que Ruff ne détecte pas
Tableaux de référence rapide
Changements clés des imports
| Airflow 2.x | Airflow 3 |
|---|---|
airflow.operators.dummy_operator.DummyOperator |
airflow.providers.standard.operators.empty.EmptyOperator |
airflow.operators.bash.BashOperator |
airflow.providers.standard.operators.bash.BashOperator |
airflow.operators.python.PythonOperator |
airflow.providers.standard.operators.python.PythonOperator |
airflow.decorators.dag |
airflow.sdk.dag |
airflow.decorators.task |
airflow.sdk.task |
airflow.datasets.Dataset |
airflow.sdk.Asset |
Changements des clés de contexte
| Clé supprimée | Remplacement |
|---|---|
execution_date |
context["dag_run"].logical_date |
tomorrow_ds / yesterday_ds |
Utilisez ds avec calcul de date : macros.ds_add(ds, 1) / macros.ds_add(ds, -1) |
prev_ds / next_ds |
prev_start_date_success ou API timetable |
triggering_dataset_events |
triggering_asset_events |
templates_dict |
context["params"] |
Exécutions déclenchées par asset : logical_date peut être None ; utilisez context["dag_run"].logical_date de manière défensive.
Impossible de déclencher avec une logical_date future : Utilisez logical_date=None et fiez-vous plutôt à run_id.
Note cron : pour les exécutions planifiées utilisant cron, la sémantique de logical_date diffère sous CronTriggerTimetable (alignant logical_date avec run_after). Si vous avez besoin des intervalles de données cron de style Airflow 2, considérez AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True.
Changements de comportement par défaut
| Paramètre | Défaut Airflow 2 | Défaut Airflow 3 |
|---|---|---|
schedule |
timedelta(days=1) |
None |
catchup |
True |
False |
Changements de comportement des callbacks
on_success_callbackne s'exécute plus en cas de skip ; utilisezon_skipped_callbacksi nécessaire.@teardownavecTriggerRule.ALWAYSn'est pas autorisé ; les teardowns s'exécutent maintenant même si l'exécution du DAG s'arrête prématurément.
Ressources
Skills connexes
- testing-dags : Pour tester les DAG après la migration
- debugging-dags : Pour dépanner les problèmes de migration
- deploying-airflow : Pour déployer les DAG migrés en production