Stratégies d'Embedding
Guide pour sélectionner et optimiser les modèles d'embedding pour les applications de recherche vectorielle.
Quand utiliser cette compétence
- Choisir des modèles d'embedding pour RAG
- Optimiser les stratégies de chunking
- Affiner les embeddings pour des domaines spécifiques
- Comparer les performances des modèles d'embedding
- Réduire les dimensions d'embedding
- Gérer du contenu multilingue
Concepts fondamentaux
1. Comparaison des modèles d'embedding (2026)
| Modèle | Dimensions | Tokens max | Meilleur pour |
|---|---|---|---|
| voyage-3-large | 1024 | 32000 | Applications Claude (recommandé par Anthropic) |
| voyage-3 | 1024 | 32000 | Applications Claude, économique |
| voyage-code-3 | 1024 | 32000 | Recherche de code |
| voyage-finance-2 | 1024 | 32000 | Documents financiers |
| voyage-law-2 | 1024 | 32000 | Documents juridiques |
| text-embedding-3-large | 3072 | 8191 | Applications OpenAI, haute précision |
| text-embedding-3-small | 1536 | 8191 | Applications OpenAI, économique |
| bge-large-en-v1.5 | 1024 | 512 | Open source, déploiement local |
| all-MiniLM-L6-v2 | 384 | 256 | Rapide, léger |
| multilingual-e5-large | 1024 | 512 | Multilingue |
2. Pipeline d'embedding
Document → Chunking → Prétraitement → Modèle d'embedding → Vecteur
↓
[Chevauchement, Taille] [Nettoyage, Normalisation] [API/Local]
Templates
Template 1 : Embeddings Voyage AI (Recommandé pour Claude)
from langchain_voyageai import VoyageAIEmbeddings
from typing import List
import os
# Initialiser les embeddings Voyage AI (recommandé par Anthropic pour Claude)
embeddings = VoyageAIEmbeddings(
model="voyage-3-large",
voyage_api_key=os.environ.get("VOYAGE_API_KEY")
)
def get_embeddings(texts: List[str]) -> List[List[float]]:
"""Obtenir les embeddings de Voyage AI."""
return embeddings.embed_documents(texts)
def get_query_embedding(query: str) -> List[float]:
"""Obtenir l'embedding d'une seule requête."""
return embeddings.embed_query(query)
# Modèles spécialisés pour les domaines
code_embeddings = VoyageAIEmbeddings(model="voyage-code-3")
finance_embeddings = VoyageAIEmbeddings(model="voyage-finance-2")
legal_embeddings = VoyageAIEmbeddings(model="voyage-law-2")
Template 2 : Embeddings OpenAI
from openai import OpenAI
from typing import List
import numpy as np
client = OpenAI()
def get_embeddings(
texts: List[str],
model: str = "text-embedding-3-small",
dimensions: int = None
) -> List[List[float]]:
"""Obtenir les embeddings d'OpenAI avec réduction de dimension optionnelle."""
# Gérer le batch pour les listes importantes
batch_size = 100
all_embeddings = []
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
kwargs = {"input": batch, "model": model}
if dimensions:
# Réduction de dimensionnalité Matryoshka
kwargs["dimensions"] = dimensions
response = client.embeddings.create(**kwargs)
embeddings = [item.embedding for item in response.data]
all_embeddings.extend(embeddings)
return all_embeddings
def get_embedding(text: str, **kwargs) -> List[float]:
"""Obtenir un embedding unique."""
return get_embeddings([text], **kwargs)[0]
# Réduction de dimension avec embeddings Matryoshka
def get_reduced_embedding(text: str, dimensions: int = 512) -> List[float]:
"""Obtenir un embedding avec dimensions réduites (Matryoshka)."""
return get_embedding(
text,
model="text-embedding-3-small",
dimensions=dimensions
)
Template 3 : Embeddings locaux avec Sentence Transformers
from sentence_transformers import SentenceTransformer
from typing import List, Optional
import numpy as np
class LocalEmbedder:
"""Embedding local avec sentence-transformers."""
def __init__(
self,
model_name: str = "BAAI/bge-large-en-v1.5",
device: str = "cuda"
):
self.model = SentenceTransformer(model_name, device=device)
self.model_name = model_name
def embed(
self,
texts: List[str],
normalize: bool = True,
show_progress: bool = False
) -> np.ndarray:
"""Incorporer les textes avec normalisation optionnelle."""
embeddings = self.model.encode(
texts,
normalize_embeddings=normalize,
show_progress_bar=show_progress,
convert_to_numpy=True
)
return embeddings
def embed_query(self, query: str) -> np.ndarray:
"""Incorporer une requête avec le préfixe approprié pour les modèles de récupération."""
# Les modèles BGE et similaires bénéficient d'un préfixe de requête
if "bge" in self.model_name.lower():
query = f"Represent this sentence for searching relevant passages: {query}"
return self.embed([query])[0]
def embed_documents(self, documents: List[str]) -> np.ndarray:
"""Incorporer les documents pour l'indexation."""
return self.embed(documents)
# Modèle E5 avec instructions
class E5Embedder:
def __init__(self, model_name: str = "intfloat/multilingual-e5-large"):
self.model = SentenceTransformer(model_name)
def embed_query(self, query: str) -> np.ndarray:
"""E5 nécessite le préfixe 'query:' pour les requêtes."""
return self.model.encode(f"query: {query}")
def embed_document(self, document: str) -> np.ndarray:
"""E5 nécessite le préfixe 'passage:' pour les documents."""
return self.model.encode(f"passage: {document}")
Template 4 : Stratégies de chunking
from typing import List, Tuple
import re
def chunk_by_tokens(
text: str,
chunk_size: int = 512,
chunk_overlap: int = 50,
tokenizer=None
) -> List[str]:
"""Découper le texte par nombre de tokens."""
import tiktoken
tokenizer = tokenizer or tiktoken.get_encoding("cl100k_base")
tokens = tokenizer.encode(text)
chunks = []
start = 0
while start < len(tokens):
end = start + chunk_size
chunk_tokens = tokens[start:end]
chunk_text = tokenizer.decode(chunk_tokens)
chunks.append(chunk_text)
start = end - chunk_overlap
return chunks
def chunk_by_sentences(
text: str,
max_chunk_size: int = 1000,
min_chunk_size: int = 100
) -> List[str]:
"""Découper le texte par phrases, en respectant les limites de taille."""
import nltk
sentences = nltk.sent_tokenize(text)
chunks = []
current_chunk = []
current_size = 0
for sentence in sentences:
sentence_size = len(sentence)
if current_size + sentence_size > max_chunk_size and current_chunk:
chunks.append(" ".join(current_chunk))
current_chunk = []
current_size = 0
current_chunk.append(sentence)
current_size += sentence_size
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
def chunk_by_semantic_sections(
text: str,
headers_pattern: str = r'^#{1,3}\s+.+$'
) -> List[Tuple[str, str]]:
"""Découper le markdown par en-têtes, en préservant la hiérarchie."""
lines = text.split('\n')
chunks = []
current_header = ""
current_content = []
for line in lines:
if re.match(headers_pattern, line, re.MULTILINE):
if current_content:
chunks.append((current_header, '\n'.join(current_content)))
current_header = line
current_content = []
else:
current_content.append(line)
if current_content:
chunks.append((current_header, '\n'.join(current_content)))
return chunks
def recursive_character_splitter(
text: str,
chunk_size: int = 1000,
chunk_overlap: int = 200,
separators: List[str] = None
) -> List[str]:
"""Séparateur récursif style LangChain."""
separators = separators or ["\n\n", "\n", ". ", " ", ""]
def split_text(text: str, separators: List[str]) -> List[str]:
if not text:
return []
separator = separators[0]
remaining_separators = separators[1:]
if separator == "":
# Division au niveau des caractères
return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size - chunk_overlap)]
splits = text.split(separator)
chunks = []
current_chunk = []
current_length = 0
for split in splits:
split_length = len(split) + len(separator)
if current_length + split_length > chunk_size and current_chunk:
chunk_text = separator.join(current_chunk)
# Diviser récursivement si toujours trop volumineux
if len(chunk_text) > chunk_size and remaining_separators:
chunks.extend(split_text(chunk_text, remaining_separators))
else:
chunks.append(chunk_text)
# Commencer un nouveau chunk avec chevauchement
overlap_splits = []
overlap_length = 0
for s in reversed(current_chunk):
if overlap_length + len(s) <= chunk_overlap:
overlap_splits.insert(0, s)
overlap_length += len(s)
else:
break
current_chunk = overlap_splits
current_length = overlap_length
current_chunk.append(split)
current_length += split_length
if current_chunk:
chunks.append(separator.join(current_chunk))
return chunks
return split_text(text, separators)
Template 5 : Pipeline d'embedding spécifique au domaine
import re
from typing import List, Optional
from dataclasses import dataclass
@dataclass
class EmbeddedDocument:
id: str
document_id: str
chunk_index: int
text: str
embedding: List[float]
metadata: dict
class DomainEmbeddingPipeline:
"""Pipeline pour les embeddings spécifiques au domaine."""
def __init__(
self,
embedding_model: str = "voyage-3-large",
chunk_size: int = 512,
chunk_overlap: int = 50,
preprocessing_fn=None
):
self.embeddings = VoyageAIEmbeddings(model=embedding_model)
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.preprocess = preprocessing_fn or self._default_preprocess
def _default_preprocess(self, text: str) -> str:
"""Prétraitement par défaut."""
# Supprimer les espaces excessifs
text = re.sub(r'\s+', ' ', text)
# Supprimer les caractères spéciaux (personnaliser pour votre domaine)
text = re.sub(r'[^\w\s.,!?-]', '', text)
return text.strip()
async def process_documents(
self,
documents: List[dict],
id_field: str = "id",
content_field: str = "content",
metadata_fields: Optional[List[str]] = None
) -> List[EmbeddedDocument]:
"""Traiter les documents pour le stockage vectoriel."""
processed = []
for doc in documents:
content = doc[content_field]
doc_id = doc[id_field]
# Prétraiter
cleaned = self.preprocess(content)
# Découper
chunks = chunk_by_tokens(
cleaned,
self.chunk_size,
self.chunk_overlap
)
# Créer les embeddings
embeddings = await self.embeddings.aembed_documents(chunks)
# Créer les enregistrements
for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
metadata = {"document_id": doc_id, "chunk_index": i}
# Ajouter les champs de métadonnées spécifiés
if metadata_fields:
for field in metadata_fields:
if field in doc:
metadata[field] = doc[field]
processed.append(EmbeddedDocument(
id=f"{doc_id}_chunk_{i}",
document_id=doc_id,
chunk_index=i,
text=chunk,
embedding=embedding,
metadata=metadata
))
return processed
# Pipeline spécifique au code
class CodeEmbeddingPipeline:
"""Pipeline spécialisé pour les embeddings de code."""
def __init__(self):
# Utiliser le modèle spécifique au code de Voyage
self.embeddings = VoyageAIEmbeddings(model="voyage-code-3")
def chunk_code(self, code: str, language: str) -> List[dict]:
"""Découper le code par fonctions/classes en utilisant tree-sitter."""
try:
import tree_sitter_languages
parser = tree_sitter_languages.get_parser(language)
tree = parser.parse(bytes(code, "utf8"))
chunks = []
# Extraire les définitions de fonctions et de classes
self._extract_nodes(tree.root_node, code, chunks)
return chunks
except ImportError:
# Revenir à un chunking simple
return [{"text": code, "type": "module"}]
def _extract_nodes(self, node, source_code: str, chunks: list):
"""Extraire récursivement les définitions de fonctions/classes."""
if node.type in ['function_definition', 'class_definition', 'method_definition']:
text = source_code[node.start_byte:node.end_byte]
chunks.append({
"text": text,
"type": node.type,
"name": self._get_name(node),
"start_line": node.start_point[0],
"end_line": node.end_point[0]
})
for child in node.children:
self._extract_nodes(child, source_code, chunks)
def _get_name(self, node) -> str:
"""Extraire le nom du nœud fonction/classe."""
for child in node.children:
if child.type == 'identifier' or child.type == 'name':
return child.text.decode('utf8')
return "unknown"
async def embed_with_context(
self,
chunk: str,
context: str = ""
) -> List[float]:
"""Incorporer le code avec le contexte environnant."""
if context:
combined = f"Context: {context}\n\nCode:\n{chunk}"
else:
combined = chunk
return await self.embeddings.aembed_query(combined)
Template 6 : Évaluation de la qualité des embeddings
import numpy as np
from typing import List, Dict
def evaluate_retrieval_quality(
queries: List[str],
relevant_docs: List[List[str]], # Liste des ID de documents pertinents par requête
retrieved_docs: List[List[str]], # Liste des ID de documents récupérés par requête
k: int = 10
) -> Dict[str, float]:
"""Évaluer la qualité des embeddings pour la récupération."""
def precision_at_k(relevant: set, retrieved: List[str], k: int) -> float:
retrieved_k = retrieved[:k]
relevant_retrieved = len(set(retrieved_k) & relevant)
return relevant_retrieved / k if k > 0 else 0
def recall_at_k(relevant: set, retrieved: List[str], k: int) -> float:
retrieved_k = retrieved[:k]
relevant_retrieved = len(set(retrieved_k) & relevant)
return relevant_retrieved / len(relevant) if relevant else 0
def mrr(relevant: set, retrieved: List[str]) -> float:
for i, doc in enumerate(retrieved):
if doc in relevant:
return 1 / (i + 1)
return 0
def ndcg_at_k(relevant: set, retrieved: List[str], k: int) -> float:
dcg = sum(
1 / np.log2(i + 2) if doc in relevant else 0
for i, doc in enumerate(retrieved[:k])
)
ideal_dcg = sum(1 / np.log2(i + 2) for i in range(min(len(relevant), k)))
return dcg / ideal_dcg if ideal_dcg > 0 else 0
metrics = {
f"precision@{k}": [],
f"recall@{k}": [],
"mrr": [],
f"ndcg@{k}": []
}
for relevant, retrieved in zip(relevant_docs, retrieved_docs):
relevant_set = set(relevant)
metrics[f"precision@{k}"].append(precision_at_k(relevant_set, retrieved, k))
metrics[f"recall@{k}"].append(recall_at_k(relevant_set, retrieved, k))
metrics["mrr"].append(mrr(relevant_set, retrieved))
metrics[f"ndcg@{k}"].append(ndcg_at_k(relevant_set, retrieved, k))
return {name: np.mean(values) for name, values in metrics.items()}
def compute_embedding_similarity(
embeddings1: np.ndarray,
embeddings2: np.ndarray,
metric: str = "cosine"
) -> np.ndarray:
"""Calculer la matrice de similarité entre les ensembles d'embeddings."""
if metric == "cosine":
# Normaliser et calculer le produit scalaire
norm1 = embeddings1 / np.linalg.norm(embeddings1, axis=1, keepdims=True)
norm2 = embeddings2 / np.linalg.norm(embeddings2, axis=1, keepdims=True)
return norm1 @ norm2.T
elif metric == "euclidean":
from scipy.spatial.distance import cdist
return -cdist(embeddings1, embeddings2, metric='euclidean')
elif metric == "dot":
return embeddings1 @ embeddings2.T
else:
raise ValueError(f"Unknown metric: {metric}")
def compare_embedding_models(
texts: List[str],
models: Dict[str, callable],
queries: List[str],
relevant_indices: List[List[int]],
k: int = 5
) -> Dict[str, Dict[str, float]]:
"""Comparer plusieurs modèles d'embedding sur la qualité de la récupération."""
results = {}
for model_name, embed_fn in models.items():
# Incorporer tous les textes
doc_embeddings = np.array(embed_fn(texts))
retrieved_per_query = []
for query in queries:
query_embedding = np.array(embed_fn([query])[0])
# Calculer les similarités
similarities = compute_embedding_similarity(
query_embedding.reshape(1, -1),
doc_embeddings,
metric="cosine"
)[0]
# Obtenir les indices des top-k
top_k_indices = np.argsort(similarities)[::-1][:k]
retrieved_per_query.append([str(i) for i in top_k_indices])
# Convertir les indices pertinents en ID de chaînes
relevant_docs = [[str(i) for i in indices] for indices in relevant_indices]
results[model_name] = evaluate_retrieval_quality(
queries, relevant_docs, retrieved_per_query, k
)
return results
Bonnes pratiques
À faire
- Adapter le modèle au cas d'usage : Code vs prose vs multilingue
- Découper intelligemment : Préserver les limites sémantiques
- Normaliser les embeddings : Pour la recherche par similarité cosinus
- Traiter par batch : Plus efficace qu'un par un
- Mettre en cache les embeddings : Éviter de recalculer pour le contenu statique
- Utiliser Voyage AI pour les applications Claude : Recommandé par Anthropic
À éviter
- Ignorer les limites de tokens : La troncature perd des informations
- Mélanger les modèles d'embedding : Espaces vectoriels incompatibles
- Sauter le prétraitement : Garbage in, garbage out
- Sur-découper : Perdre du contexte important
- Oublier les métadonnées : Essentielles pour le filtrage et le débogage