Migrer airflow-ai-sdk vers apache-airflow-providers-common-ai
Cette skill migre les projets Airflow de airflow-ai-sdk vers apache-airflow-providers-common-ai (0.1.0+), le provider IA officiel d'Airflow construit sur PydanticAI.
CRITIQUE : Le nouveau provider nécessite Airflow 3.0+ et pydantic-ai-slim >= 1.34.0. La surface API a changé : la configuration LLM passe du code (chaînes/objets de modèle) aux connexions Airflow (type
pydanticai). Il n'y a pas de@task.embeddans le nouveau provider.
Avant de commencer
Utilisez l'outil Grep avec le motif ci-dessous pour inventorier tout ce qui doit être migré :
airflow_ai_sdk|airflow-ai-sdk|ai_sdk|@task\.llm|@task\.agent|@task\.llm_branch|@task\.embed
À partir des résultats, capturez :
- Tous les fichiers important
airflow-ai-sdk/airflow_ai_sdk - Quels décorateurs sont utilisés :
@task.llm,@task.agent,@task.llm_branch,@task.embed - Le motif de configuration du modèle (noms de chaîne comme
"gpt-5", ou objetsOpenAIModel(...)) - Toutes les sous-classes
airflow_ai_sdk.BaseModelutilisées commeoutput_type
Utilisez cet inventaire pour guider les étapes ci-dessous.
Étape 1 : Mettre à jour requirements.txt
Supprimez :
airflow-ai-sdk[openai]
# ou toute variante : airflow-ai-sdk[openai]==0.1.7, airflow-ai-sdk[anthropic], etc.
Ajoutez :
apache-airflow-providers-common-ai[openai]>=0.1.0
Utilisez la dernière version 0.x disponible, sauf si l'utilisateur en a épinglé une spécifique. Les extras disponibles correspondent au provider LLM : [openai], [anthropic], [google], [bedrock], [groq], [mistral], [mcp].
Conservez sentence-transformers et torch si le projet utilise des embeddings (ils s'exécutent désormais via un simple @task au lieu de @task.embed).
Étape 2 : Créer une connexion PydanticAI
Le nouveau provider utilise une connexion Airflow au lieu de chaînes de modèle ou d'objets dans le code.
Type de connexion : pydanticai
ID de connexion par défaut : pydanticai_default
Via variable d'environnement (.env)
AIRFLOW_CONN_PYDANTICAI_DEFAULT='{
"conn_type": "pydanticai",
"password": "<api-key>",
"extra": {
"model": "<provider>:<model-name>"
}
}'
Format du modèle
Le champ modèle utilise le format provider:model :
| Provider | Exemple de valeur de modèle |
|---|---|
| OpenAI | openai:gpt-5 |
| Anthropic | anthropic:claude-sonnet-4-20250514 |
google:gemini-2.5-pro |
|
| Groq | groq:llama-3.3-70b-versatile |
| Mistral | mistral:mistral-large-latest |
| Bedrock | bedrock:us.anthropic.claude-sonnet-4-20250514-v1:0 |
Endpoints personnalisés (Ollama, vLLM, Snowflake Cortex, etc.)
Définissez host sur l'URL de base :
AIRFLOW_CONN_PYDANTICAI_CORTEX='{
"conn_type": "pydanticai",
"password": "<api-key>",
"host": "https://my-endpoint.com/v1",
"extra": {
"model": "openai:<model-name>"
}
}'
Utilisez le préfixe openai: pour toute API compatible OpenAI, quel que soit le provider réel.
Convention d'ID de connexion
Le nom de la variable d'environnement détermine l'ID de connexion :
AIRFLOW_CONN_PYDANTICAI_DEFAULTcréepydanticai_defaultAIRFLOW_CONN_PYDANTICAI_CORTEXcréepydanticai_cortex
Priorité de résolution du modèle
- Paramètre
model_idsur le décorateur/opérateur (plus élevé) modeldans l'extra JSON de la connexion (fallback)
Étape 3 : Migrer les décorateurs
@task.llm
# AVANT (airflow-ai-sdk)
import airflow_ai_sdk as ai_sdk
class MyOutput(ai_sdk.BaseModel):
field: str
@task.llm(
model="gpt-5", # ou model=OpenAIModel(...)
system_prompt="You are helpful.",
output_type=MyOutput,
)
def my_task(text: str) -> str:
return text
# APRÈS (apache-airflow-providers-common-ai)
from pydantic import BaseModel
class MyOutput(BaseModel):
field: str
@task.llm(
llm_conn_id="pydanticai_default", # ID de connexion Airflow
system_prompt="You are helpful.",
output_type=MyOutput,
)
def my_task(text: str) -> str:
return text
Mappage des paramètres :
| airflow-ai-sdk | common-ai provider | Notes |
|---|---|---|
model="gpt-5" |
llm_conn_id="pydanticai_default" |
Modèle spécifié dans la connexion |
model=OpenAIModel(...) |
llm_conn_id="pydanticai_default" |
Modèle + endpoint dans la connexion |
system_prompt="..." |
system_prompt="..." |
Inchangé |
output_type=MyModel |
output_type=MyModel |
Inchangé |
result_type=MyModel |
output_type=MyModel |
result_type était déjà déprécié |
| (non disponible) | model_id="openai:gpt-5" |
Substituer le modèle de la connexion |
| (non disponible) | require_approval=True |
Examen HITL intégré |
| (non disponible) | agent_params={...} |
Kwargs supplémentaires pour pydantic-ai Agent |
@task.llm_branch
# AVANT
@task.llm_branch(
model="gpt-5",
system_prompt="Choose a team...",
allow_multiple_branches=False,
)
def route(text: str) -> str:
return text
# APRÈS
@task.llm_branch(
llm_conn_id="pydanticai_default",
system_prompt="Choose a team...",
allow_multiple_branches=False, # même paramètre, inchangé
)
def route(text: str) -> str:
return text
Seul changement : model= devient llm_conn_id=.
@task.agent
C'est le plus grand changement d'API. L'Agent n'est plus pré-construit dans le code utilisateur.
# AVANT (airflow-ai-sdk) - Agent construit au niveau du module
from pydantic_ai import Agent
my_agent = Agent(
"gpt-5",
system_prompt="You are a research assistant.",
tools=[search_tool, lookup_tool],
)
@task.agent(agent=my_agent)
def research(question: str) -> str:
return question
# APRÈS (common-ai provider) - Pas d'objet Agent, config via paramètres
@task.agent(
llm_conn_id="pydanticai_default",
system_prompt="You are a research assistant.",
agent_params={"tools": [search_tool, lookup_tool]},
)
def research(question: str) -> str:
return question
Mappage des paramètres :
| airflow-ai-sdk | common-ai provider | Notes |
|---|---|---|
agent=Agent(model, ...) |
llm_conn_id="..." |
Modèle depuis la connexion |
system_prompt de l'Agent |
system_prompt="..." |
Désormais un paramètre du décorateur |
tools=[...] de l'Agent |
agent_params={"tools": [...]} |
Outils via dict agent_params |
output_type de l'Agent |
output_type=MyModel |
Désormais un paramètre du décorateur |
| (non disponible) | toolsets=[...] |
Objets Toolset de pydantic-ai 1.x |
| (non disponible) | durable=True |
Mise en cache au niveau des étapes |
| (non disponible) | enable_hitl_review=True |
Boucle d'examen humain itérative |
Idée clé : Tout ce qui était configuré sur le constructeur Agent() va désormais soit dans un paramètre de décorateur de premier niveau, soit dans agent_params. Le dict agent_params est transmis directement au constructeur Agent de pydantic-ai.
@task.embed (PAS D'ÉQUIVALENT)
Le nouveau provider n'inclut PAS de décorateur embed. Remplacez par un simple @task :
# AVANT (airflow-ai-sdk)
@task.embed(
model_name="all-MiniLM-L6-v2",
encode_kwargs={"normalize_embeddings": True},
max_active_tis_per_dagrun=1,
)
def embed_text(text: str) -> str:
return text
# APRÈS (simple @task avec sentence-transformers)
@task(max_active_tis_per_dagrun=1)
def embed_text(text: str) -> list[float]:
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
return model.encode(text, normalize_embeddings=True).tolist()
Remarque : Le modèle est chargé à chaque exécution de tâche. Pour les petites charges de travail, c'est correct. Pour les gros lots, envisagez d'encoder tous les textes dans une seule tâche au lieu d'utiliser .expand().
Étape 4 : Mettre à jour les imports
| Ancien import | Nouvel import |
|---|---|
import airflow_ai_sdk as ai_sdk |
Supprimez entièrement |
from airflow_ai_sdk import BaseModel |
from pydantic import BaseModel |
from airflow_ai_sdk.models.base import BaseModel |
from pydantic import BaseModel |
class Foo(ai_sdk.BaseModel): |
class Foo(BaseModel): |
from pydantic_ai import Agent |
Supprimez si Agent était utilisé uniquement pour @task.agent |
from pydantic_ai.models.openai import OpenAIModel |
Supprimez (configuration du modèle maintenant dans la connexion) |
Les décorateurs @task.llm, @task.agent, @task.llm_branch sont auto-enregistrés par le provider. Aucun import explicite nécessaire au-delà de from airflow.sdk import task.
Les imports pydantic_ai pour une utilisation non-décorateur (par ex. BinaryContent pour multimodal) restent valides puisque le nouveau provider dépend de pydantic-ai-slim>=1.34.0.
Étape 5 : Mettre à jour connections.yaml (si utilisé pour les tests locaux)
pydanticai_default:
conn_type: pydanticai
password: <api-key>
extra:
model: "openai:gpt-5"
Pour les endpoints personnalisés :
pydanticai_cortex:
conn_type: pydanticai
password: <api-key>
host: https://my-endpoint.com/v1
extra:
model: "openai:llama3.1-8b"
Étape 6 : Nettoyer les variables d'environnement
Le nouveau provider lit la configuration du modèle depuis la connexion pydanticai, donc les variables d'environnement qui alimentaient précédemment le modèle dans le code sont généralement redondantes. Avant de supprimer l'une d'elles, cherchez dans le projet (et tout script/service sibling) pour confirmer que rien d'autre ne les référence :
OPENAI_API_KEY|OPENAI_BASE_URL|ANTHROPIC_API_KEY|GOOGLE_API_KEY
Candidats à la suppression uniquement si aucun autre code les référence :
OPENAI_API_KEY(maintenant dans le champ password de la connexion pydanticai)OPENAI_BASE_URL(maintenant dans le champ host de la connexion)- Variables de noms de modèle personnalisés (maintenant dans la extra.model de la connexion)
Si quoi que ce soit en dehors des DAG migrés les utilise toujours (autres DAG non encore migrés, scripts d'aide, services non-Airflow partageant le .env), laissez-les en place.
Conservez les variables d'environnement AIRFLOW_CONN_* pour toutes les connexions.
Étape 7 : Vérifier
Après la migration, cherchez dans la base de code pour confirmer qu'aucune référence obsolète ne subsiste :
airflow_ai_sdk|airflow-ai-sdk|ai_sdk\.BaseModel|from pydantic_ai import Agent|from pydantic_ai.models
Vérifiez :
- [ ] Aucun import depuis
airflow_ai_sdk - [ ] Aucun objet
Agent()créé pour@task.agent(sauf s'il est utilisé en dehors des décorateurs) - [ ] Aucun paramètre
model=sur les décorateurs LLM (devrait êtrellm_conn_id=) - [ ] Tous les
@task.embedremplacés par un simple@task - [ ] Connexion
pydanticaiconfigurée dans.envou connections.yaml - [ ]
requirements.txtcontientapache-airflow-providers-common-ai[...]au lieu deairflow-ai-sdk[...]
Référence rapide : Nouvelles fonctionnalités du provider common-ai
Ces fonctionnalités sont disponibles après la migration mais n'ont pas d'équivalent airflow-ai-sdk :
| Fonctionnalité | Paramètre | Description |
|---|---|---|
| Approbation HITL | require_approval=True sur @task.llm |
Pause pour examen humain avant retour |
| Boucle d'examen HITL | enable_hitl_review=True sur @task.agent |
Examen itérative avec régénération |
| Exécution durable | durable=True sur @task.agent |
Mise en cache au niveau des étapes pour la résilience |
| Journalisation des outils | enable_tool_logging=True sur @task.agent |
Logs d'appel d'outil au niveau INFO (défaut : activé) |
| Substitution de modèle | model_id="openai:gpt-5" |
Substituer le modèle de la connexion par tâche |
| Analyse de fichier | @task.llm_file_analysis |
Analyser des fichiers/images via ObjectStoragePath |
| NL-to-SQL | @task.llm_sql |
Générer SQL à partir du langage naturel |