Plugins Airflow 3
Les plugins Airflow 3 vous permettent d'intégrer directement des applications FastAPI, des interfaces React, des middlewares, des macros, des boutons d'opérateur et des calendriers personnalisés dans le processus Airflow. Aucun sidecar, aucun serveur supplémentaire.
CRITIQUE : Les composants de plugin (fastapi_apps, react_apps, external_views) nécessitent Airflow 3.1+. NE JAMAIS importer
flask,flask_appbuilder, ou utiliserappbuilder_views/flask_blueprints— ce sont des patterns Airflow 2 qui ne fonctionnent pas dans Airflow 3. Si le code existant les utilise, réécrivez l'intégralité du bloc d'enregistrement en utilisant FastAPI.Sécurité : Les endpoints des plugins FastAPI ne sont pas automatiquement protégés par l'authentification Airflow. Si vos endpoints doivent être privés, implémentez explicitement l'authentification en utilisant les utilitaires de sécurité FastAPI.
Redémarrage requis : Les changements apportés aux fichiers de plugin Python nécessitent de redémarrer le serveur API. Les changements de fichiers statiques (HTML, JS, CSS) sont chargés immédiatement. Définissez
AIRFLOW__CORE__LAZY_LOAD_PLUGINS=Falseen développement pour charger les plugins au démarrage plutôt que de façon lazy.Toujours des chemins relatifs : Dans
external_views,hrefne doit pas avoir de slash initial. En HTML et JavaScript, utilisez des chemins relatifs pour tous les actifs et appelsfetch(). Les chemins absolus se cassent derrière les proxies inverses.
Avant d'écrire du code, vérifiez
- Utilisez-vous
fastapi_apps/ FastAPI — pasappbuilder_views/ Flask ? - Tous les chemins d'actifs HTML/JS et appels
fetch()sont-ils relatifs (pas de slash initial) ? - Tous les appels SDK synchrones ou SQLAlchemy sont-ils encapsulés dans
asyncio.to_thread()? - Les répertoires
static/etassets/existent-ils avant le montage de l'application FastAPI ? - Si l'endpoint doit être privé, avez-vous ajouté une authentification FastAPI explicite ?
Étape 1 : Choisir les composants du plugin
Une classe de plugin unique peut enregistrer plusieurs types de composants à la fois.
| Composant | Ce qu'il fait | Champ |
|---|---|---|
| Endpoints API personnalisés | Application FastAPI montée dans le processus Airflow | fastapi_apps |
| Lien nav / page | Intègre une URL en iframe ou crée un lien externe | external_views |
| Composant React | Application React personnalisée intégrée dans l'interface Airflow | react_apps |
| Middleware API | Intercepte toutes les requêtes/réponses de l'API Airflow | fastapi_root_middlewares |
| Macros Jinja | Fonctions Python réutilisables dans les templates DAG | macros |
| Bouton task instance | Bouton de lien supplémentaire dans la vue Détail de la tâche | operator_extra_links / global_operator_extra_links |
| Calendrier personnalisé | Logique de planification personnalisée | timetables |
| Event hooks | Callbacks listener pour les événements Airflow | listeners |
Étape 2 : Skeleton d'enregistrement du plugin
Structure de fichiers du projet
Donnez à chaque plugin son propre sous-répertoire sous plugins/ — cela garde le fichier Python, les actifs statiques et les templates ensemble et rend les projets multi-plugins gérables :
plugins/
my-plugin/
plugin.py # sous-classe AirflowPlugin — découverte auto par Airflow
static/
index.html
app.js
assets/
icon.svg
BASE_DIR = Path(__file__).parent dans plugin.py se résout en plugins/my-plugin/ — les chemins statiques et actifs seront corrects relativement à cela. Créez le sous-répertoire et tous les dossiers static/assets avant de démarrer Airflow, sinon StaticFiles lèvera une exception à l'import.
from pathlib import Path
from airflow.plugins_manager import AirflowPlugin
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
BASE_DIR = Path(__file__).parent
app = FastAPI(title="My Plugin")
# Les deux répertoires doivent exister avant le démarrage d'Airflow ou FastAPI lève une exception à l'import
app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
app.mount("/assets", StaticFiles(directory=BASE_DIR / "assets"), name="assets")
class MyPlugin(AirflowPlugin):
name = "my_plugin"
fastapi_apps = [
{
"app": app,
"url_prefix": "/my-plugin", # plugin disponible à {AIRFLOW_HOST}/my-plugin/
"name": "My Plugin",
}
]
external_views = [
{
"name": "My Plugin",
"href": "my-plugin/ui", # PAS de slash initial — se casse sur Astro et proxies inverses
"destination": "nav", # voir table des emplacements ci-dessous
"category": "browse", # catégorie barre nav (destination nav uniquement)
"url_route": "my-plugin", # nom de route unique (requis pour les apps React)
"icon": "/my-plugin/static/icon.svg" # UTILISE un slash initial — servi par FastAPI
}
]
Emplacements des vues externes
destination |
Où cela apparaît |
|---|---|
"nav" |
Barre de navigation à gauche (définissez aussi category) |
"dag" |
Onglet supplémentaire sur chaque page Dag |
"dag_run" |
Onglet supplémentaire sur chaque page de run Dag |
"task" |
Onglet supplémentaire sur chaque page de tâche |
"task_instance" |
Onglet supplémentaire sur chaque page de task instance |
Catégories de la barre nav (destination: "nav")
Définissez "category" pour placer le lien sous un groupe nav spécifique : "browse", "admin", ou omettez pour le niveau supérieur.
URLs externes et plugins minimaux
href peut être un chemin relatif vers un endpoint interne ("my-plugin/ui") ou une URL externe complète. Un plugin avec seulement external_views et pas de fastapi_apps est valide — pas de backend nécessaire pour un simple lien ou onglet :
from airflow.plugins_manager import AirflowPlugin
class LearnViewPlugin(AirflowPlugin):
name = "learn_view_plugin"
external_views = [
{
"name": "Learn Airflow 3",
"href": "https://www.astronomer.io/docs/learn",
"destination": "dag", # ajoute un onglet à chaque page Dag
"url_route": "learn"
}
]
La règle du pas-de-slash-initial s'applique uniquement aux chemins internes — les URLs complètes https:// sont acceptées.
Étape 3 : Servir le point d'entrée de l'interface
@app.get("/ui", response_class=FileResponse)
async def serve_ui():
return FileResponse(BASE_DIR / "static" / "index.html")
En HTML, utilisez toujours des chemins relatifs. Les chemins absolus se cassent quand Airflow est monté à un sous-chemin :
<!-- correct -->
<link rel="stylesheet" href="static/app.css" />
<script src="static/app.js?v=20240315"></script>
<!-- se casse derrière un proxy inverse -->
<script src="/my-plugin/static/app.js"></script>
Même règle en JavaScript :
fetch('api/dags') // correct — relatif à la page actuelle
fetch('/my-plugin/api/dags') // se casse sur Astro et déploiements sub-path
Étape 4 : Appeler l'API Airflow depuis votre plugin
Seulement si votre plugin appelle l'API REST Airflow. Les plugins qui servent uniquement des fichiers statiques, enregistrent
external_views, ou utilisent l'accès direct à la DB n'ont pas besoin de cette étape — passez à l'étape 5 ou 6.
Ajouter la dépendance
Seulement si la communication API REST est implémentée : ajoutez apache-airflow-client aux dépendances du projet. Vérifiez quel fichier existe et agissez en conséquence :
| Fichier trouvé | Action |
|---|---|
requirements.txt |
Ajoutez apache-airflow-client |
pyproject.toml (uv / poetry) |
uv add apache-airflow-client ou poetry add apache-airflow-client |
| Aucun des deux | Dites à l'utilisateur : « Ajoutez apache-airflow-client à vos dépendances avant d'exécuter le plugin. » |
Utilisez apache-airflow-client pour communiquer avec l'API REST d'Airflow lui-même. Le SDK est synchrone mais les routes FastAPI sont async — n'appelez jamais directement les méthodes SDK bloquantes dans async def ou vous allez bloquer la boucle d'événements et geler toutes les requêtes concurrentes.
Gestion des tokens JWT
Mettez en cache un token par processus. Rafraîchissez 5 minutes avant l'expiration d'une heure. Utilisez le double-checked locking pour que plusieurs requêtes concurrentes ne se disputent pas le rafraîchissement simultanément :
Remplacez
MYPLUGIN_par un court préfixe majuscule dérivé du nom du plugin (par ex. si le plugin s'appelle « Trip Analyzer », utilisezTRIP_ANALYZER_). Si aucun nom de plugin n'a été donné, demandez à l'utilisateur avant d'écrire les noms des variables d'environnement.
import asyncio
import os
import threading
import time
import airflow_client.client as airflow_sdk
import requests
AIRFLOW_HOST = os.environ.get("MYPLUGIN_HOST", "http://localhost:8080")
AIRFLOW_USER = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS = os.environ.get("MYPLUGIN_PASSWORD", "admin")
AIRFLOW_TOKEN = os.environ.get("MYPLUGIN_TOKEN") # Astronomer Astro : token API de déploiement
_cached_token: str | None = None
_token_expires_at: float = 0.0
_token_lock = threading.Lock()
def _fetch_fresh_token() -> str:
"""Échange username/password pour un JWT via l'endpoint d'auth d'Airflow."""
response = requests.post(
f"{AIRFLOW_HOST}/auth/token",
json={"username": AIRFLOW_USER, "password": AIRFLOW_PASS},
timeout=10,
)
response.raise_for_status()
return response.json()["access_token"]
def _get_token() -> str:
# Astronomer Astro production : utiliser le token API de déploiement statique directement
if AIRFLOW_TOKEN:
return AIRFLOW_TOKEN
global _cached_token, _token_expires_at
now = time.monotonic()
# Chemin rapide — pas de lock si toujours valide
if _cached_token and now < _token_expires_at:
return _cached_token
# Chemin lent — un thread rafraîchit, les autres attendent
with _token_lock:
if _cached_token and now < _token_expires_at:
return _cached_token
_cached_token = _fetch_fresh_token()
_token_expires_at = now + 55 * 60 # rafraîchir 5 min avant l'expiration d'une heure
return _cached_token
def _make_config() -> airflow_sdk.Configuration:
config = airflow_sdk.Configuration(host=AIRFLOW_HOST)
config.access_token = _get_token()
return config
Après implémentation de l'auth, dites à l'utilisateur :
-
Développement local : définissez
MYPLUGIN_USERNAMEetMYPLUGIN_PASSWORDdans.env— l'échange JWT se fait automatiquement. -
Astronomer Astro (production) : créez un token API de déploiement et définissez-le en tant que
MYPLUGIN_TOKEN— l'échange JWT est complètement ignoré :- Interface Astro → ouvrir le déploiement → Access → API Tokens → + Deployment API Token
- Copier la valeur du token (affichée une seule fois)
astro deployment variable create MYPLUGIN_TOKEN=<token>
MYPLUGIN_USERNAMEetMYPLUGIN_PASSWORDne sont pas nécessaires sur Astro.
Encapsuler les appels SDK avec asyncio.to_thread
from fastapi import HTTPException
from airflow_client.client.api import DAGApi
@app.get("/api/dags")
async def list_dags():
try:
def _fetch():
with airflow_sdk.ApiClient(_make_config()) as client:
return DAGApi(client).get_dags(limit=100).dags
dags = await asyncio.to_thread(_fetch)
return [{"dag_id": d.dag_id, "is_paused": d.is_paused, "timetable_summary": d.timetable_summary} for d in dags]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
Noms de champs API : Ne devinez jamais les noms de champs de réponse — vérifiez contre la référence API REST. Champs clés de
DAGResponse:dag_id,dag_display_name,description,is_paused,timetable_summary,timetable_description,fileloc,owners,tags.
Le pattern est toujours : définissez une def interne simple _fetch() avec toute la logique SDK, puis await asyncio.to_thread(_fetch).
Alternative : Accès direct à la base de données
Avertissement — utiliser avec prudence et le signaler à l'utilisateur. La métadb d'Airflow n'est pas une interface publique. Les écritures directes ou les requêtes mal formées peuvent corrompre l'état du scheduler. Chaque fois que vous utilisez ce pattern, dites explicitement à l'utilisateur : « Ceci accède directement à la base de données interne d'Airflow. Les modèles internes ne font pas partie de l'API publique, peuvent changer entre les versions d'Airflow, et les requêtes incorrectes peuvent causer des problèmes dans la métadb. Préférez
apache-airflow-clientsauf si l'opération n'est pas exposée via l'API REST. »
Puisque les endpoints du plugin FastAPI s'exécutent à l'intérieur du processus du serveur API (pas dans un worker de tâche), ils ont accès direct aux modèles SQLAlchemy internes d'Airflow — pas d'aller-retour HTTP ou JWT nécessaire. Utilisez uniquement pour les opérations de lecture non exposées via l'API REST, ou quand le surcoût HTTP supplémentaire compte vraiment. Encapsulez toujours les appels DB dans asyncio.to_thread() — les requêtes SQLAlchemy sont bloquantes.
from airflow.models import DagBag, DagModel
from airflow.utils.db import provide_session
@app.get("/api/dags/status")
async def dag_status():
def _fetch():
@provide_session
def _query(session=None):
dagbag = DagBag()
paused = sum(
1 for dag_id in dagbag.dags
if (m := session.query(DagModel).filter(DagModel.dag_id == dag_id).first())
and m.is_paused
)
return {"total": len(dagbag.dags), "paused": paused}
return _query()
return await asyncio.to_thread(_fetch)
Étape 5 : Patterns d'endpoints API courants
Si vous avez besoin d'une méthode SDK ou d'un champ non montré dans les exemples ci-dessous, vérifiez-le avant de générer du code — ne devinez pas. Soit exécutez
python3 -c "from airflow_client.client.api import <Class>; print([m for m in dir(<Class>) if not m.startswith('_')])"dans n'importe quel environnement où le SDK est installé, soit cherchez la définition de la classe dans le repoapache/airflow-client-python.
from airflow_client.client.api import DAGApi, DagRunApi
from airflow_client.client.models import TriggerDAGRunPostBody, DAGPatchBody
@app.post("/api/dags/{dag_id}/trigger")
async def trigger_dag(dag_id: str):
def _run():
with airflow_sdk.ApiClient(_make_config()) as client:
return DagRunApi(client).trigger_dag_run(dag_id, TriggerDAGRunPostBody())
result = await asyncio.to_thread(_run)
return {"run_id": result.dag_run_id, "state": normalize_state(result.state)}
@app.patch("/api/dags/{dag_id}/pause")
async def toggle_pause(dag_id: str, is_paused: bool):
def _run():
with airflow_sdk.ApiClient(_make_config()) as client:
DAGApi(client).patch_dag(dag_id, DAGPatchBody(is_paused=is_paused))
await asyncio.to_thread(_run)
return {"dag_id": dag_id, "is_paused": is_paused}
@app.delete("/api/dags/{dag_id}")
async def delete_dag(dag_id: str):
def _run():
with airflow_sdk.ApiClient(_make_config()) as client:
DAGApi(client).delete_dag(dag_id)
await asyncio.to_thread(_run)
return {"deleted": dag_id}
def normalize_state(raw) -> str:
"""Convertir les objets énumérés du SDK en simples chaînes avant envoi au frontend."""
if raw is None:
return "never_run"
return str(raw).lower()
Runs DAG, task instances et logs
Ce sont les appels les plus courants au-delà du CRUD DAG basique. Pour tout ce qui n'est pas montré ici, consultez la référence API REST pour les endpoints disponibles et les noms des classes/méthodes SDK correspondants.
from airflow_client.client.api import DagRunApi, TaskInstanceApi
# Dernier run pour un DAG
@app.get("/api/dags/{dag_id}/runs/latest")
async def latest_run(dag_id: str):
def _fetch():
with airflow_sdk.ApiClient(_make_config()) as client:
runs = DagRunApi(client).get_dag_runs(dag_id, limit=1, order_by="-start_date").dag_runs
return runs[0] if runs else None
run = await asyncio.to_thread(_fetch)
if not run:
return {"state": "never_run"}
return {"run_id": run.dag_run_id, "state": normalize_state(run.state)}
# Task instances pour un run spécifique
@app.get("/api/dags/{dag_id}/runs/{run_id}/tasks")
async def task_instances(dag_id: str, run_id: str):
def _fetch():
with airflow_sdk.ApiClient(_make_config()) as client:
return TaskInstanceApi(client).get_task_instances(dag_id, run_id).task_instances
tasks = await asyncio.to_thread(_fetch)
return [{"task_id": t.task_id, "state": normalize_state(t.state)} for t in tasks]
# Log de tâche (try_number commence à 1)
@app.get("/api/dags/{dag_id}/runs/{run_id}/tasks/{task_id}/logs/{try_number}")
async def task_log(dag_id: str, run_id: str, task_id: str, try_number: int):
def _fetch():
with airflow_sdk.ApiClient(_make_config()) as client:
return TaskInstanceApi(client).get_log(
dag_id, run_id, task_id, try_number, map_index=-1
)
result = await asyncio.to_thread(_fetch)
return {"log": result.content if hasattr(result, "content") else str(result)}
Proxy de streaming
Utilisez StreamingResponse pour proxifier du contenu binaire depuis une URL externe à travers le plugin — utile quand le navigateur ne peut pas récupérer la ressource directement (CORS, auth, etc.) :
import requests
from starlette.responses import StreamingResponse
@app.get("/api/files/{filename}")
async def proxy_file(filename: str):
def _stream():
r = requests.get(f"https://files.example.com/{filename}", stream=True)
r.raise_for_status()
return r
response = await asyncio.to_thread(_stream)
return StreamingResponse(
response.iter_content(chunk_size=8192),
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
Notez que requests.get() est bloquant — récupérez dans asyncio.to_thread pour que la boucle d'événements ne soit pas figée en attendant le serveur distant.
Étape 6 : Autres types de composants de plugin
Macros
Les macros sont chargées par le scheduler (et le processeur DAG), pas le serveur API. Redémarrez le scheduler après les changements.
from airflow.plugins_manager import AirflowPlugin
def format_confidence(confidence: float) -> str:
return f"{confidence * 100:.2f}%"
class MyPlugin(AirflowPlugin):
name = "my_plugin"
macros = [format_confidence]
Utilisez dans n'importe quel champ templé — y compris avec XCom :
{{ macros.my_plugin.format_confidence(0.95) }}
{{ macros.my_plugin.format_confidence(ti.xcom_pull(task_ids='score_task')['confidence']) }}
Le pattern de nommage est toujours macros.{nom_plugin}.{nom_fonction}.
Middleware
Le middleware s'applique à toutes les requêtes de l'API Airflow, y compris l'API REST intégrée et tous les plugins FastAPI. Utilisez avec prudence et filtrez explicitement les requêtes si nécessaire :
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request, Response
class AuditMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next) -> Response:
# s'exécute avant chaque requête au serveur API Airflow
response = await call_next(request)
return response
class MyPlugin(AirflowPlugin):
name = "my_plugin"
fastapi_root_middlewares = [
{"middleware": AuditMiddleware, "args": [], "kwargs": {}, "name": "Audit"}
]
Liens supplémentaires d'opérateur
from airflow.sdk.bases.operatorlink import BaseOperatorLink
class MyDashboardLink(BaseOperatorLink):
name = "Open in Dashboard"
def get_link(self, operator, *, ti_key, **context) -> str:
return f"https://my-dashboard.example.com/tasks/{ti_key.task_id}"
class MyPlugin(AirflowPlugin):
name = "my_plugin"
global_operator_extra_links = [MyDashboardLink()] # apparaît sur chaque tâche
# operator_extra_links = [MyDashboardLink()] # attacher à un opérateur spécifique à la place
Apps React
Les apps React sont intégrées en tant que bundles JavaScript servis via FastAPI. Le bundle doit s'exposer en tant que variable globale correspondant au nom du plugin :
// Dans votre bundle (par ex. my-app.js)
globalThis['My Plugin'] = MyComponent; // correspond au nom du plugin
globalThis.AirflowPlugin = MyComponent; // fallback qu'Airflow cherche
class MyPlugin(AirflowPlugin):
name = "my_plugin"
fastapi_apps = [{"app": app, "url_prefix": "/my-plugin", "name": "My Plugin"}]
react_apps = [
{
"name": "My Plugin",
"bundle_url": "/my-plugin/my-app.js",
"destination": "nav",
"category": "browse",
"url_route": "my-plugin",
}
]
Le même bundle peut être enregistré à plusieurs destinations en ajoutant plusieurs entrées — chacune nécessite un url_route unique :
react_apps = [
{"name": "My Widget", "bundle_url": "/my-plugin/widget.js", "destination": "nav", "url_route": "my-widget-nav"},
{"name": "My Widget", "bundle_url": "/my-plugin/widget.js", "destination": "dag", "url_route": "my-widget-dag"},
]
L'intégration des apps React est expérimentale dans Airflow 3.1. Les interfaces peuvent changer dans les versions futures.
Étape 7 : Variables d'environnement et déploiement
Ne codez jamais en dur les identifiants :
AIRFLOW_HOST = os.environ.get("MYPLUGIN_HOST", "http://localhost:8080")
AIRFLOW_USER = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS = os.environ.get("MYPLUGIN_PASSWORD", "admin")
Astro CLI local :
# .env
MYPLUGIN_HOST=http://localhost:8080
MYPLUGIN_USERNAME=admin
MYPLUGIN_PASSWORD=admin
astro dev restart # requis après tout changement de plugin Python
# Vérifier les logs par composant (CLI Astro) :
astro dev logs --api-server # FastAPI apps, external_views — les erreurs d'import de plugin s'affichent ici
astro dev logs --scheduler # macros, timetables, listeners, operator links
astro dev logs --dag-processor # erreurs d'analyse DAG
# Non-Astro :
airflow plugins # CLI — liste tous les plugins chargés
Astronomer en production :
astro deployment variable create --deployment-id <id> MYPLUGIN_HOST=https://airflow.example.com
Auto-reload en développement (saute le lazy loading) :
AIRFLOW__CORE__LAZY_LOAD_PLUGINS=False
Cache busting pour les fichiers statiques après déploiement :
<script src="static/app.js?v=20240315-1"></script>
Vérifier que le plugin a été chargé : ouvrir Admin > Plugins dans l'interface Airflow.
Docs OpenAPI sont auto-générés pour les plugins FastAPI :
- Interface Swagger :
{AIRFLOW_HOST}/{url_prefix}/docs - JSON OpenAPI :
{AIRFLOW_HOST}/{url_prefix}/openapi.json
Pièges courants
| Problème | Cause | Solution |
|---|---|---|
| Lien nav va à 404 | / initial dans href |
"my-plugin/ui" pas "/my-plugin/ui" |
| Icône nav ne s'affiche pas | / manquant dans icon |
icon prend un chemin absolu : "/my-plugin/static/icon.svg" |
| Boucle d'événements figée sous charge | SDK synchrone appelé directement dans async def |
Encapsuler avec asyncio.to_thread() |
| Erreurs 401 après 1 heure | JWT expire sans rafraîchissement | Utiliser le pattern de rafraîchissement 5 minutes avant expiration |
StaticFiles lève une exception au démarrage |
Répertoire manquant | Créer assets/ et static/ avant de démarrer |
| Plugin ne s'affiche pas | Fichier Python modifié sans redémarrage | astro dev restart |
| Endpoints accessibles sans connexion | Les apps FastAPI ne sont pas auto-authentifiées | Ajouter la sécurité FastAPI (par ex. OAuth2, clé API) si les endpoints doivent être privés |
| Middleware affectant les mauvaises routes | Le middleware s'applique à tout le trafic API | Filtrer par request.url.path à l'intérieur de dispatch() |
JS fetch() se casse sur Astro |
Chemin absolu dans fetch() |
Toujours utiliser des chemins relatifs : fetch('api/dags') |
Références
- Documentation des plugins Airflow
- Référence API REST Airflow — liste complète des endpoints avec noms des classes/méthodes SDK
- Astronomer : Utilisation des plugins Airflow