similarity-search-patterns

Par wshobson · agents

Implémentez une recherche par similarité efficace avec des bases de données vectorielles. À utiliser pour construire une recherche sémantique, implémenter des requêtes de plus proches voisins ou optimiser les performances de récupération.

npx skills add https://github.com/wshobson/agents --skill similarity-search-patterns

Patterns de Recherche par Similarité

Patterns pour implémenter une recherche par similarité efficace dans les systèmes de production.

Quand Utiliser Cette Skill

  • Construire des systèmes de recherche sémantique
  • Implémenter la récupération RAG
  • Créer des moteurs de recommandation
  • Optimiser la latence de recherche
  • Passer à l'échelle sur des millions de vecteurs
  • Combiner la recherche sémantique et par mots-clés

Concepts Fondamentaux

1. Métriques de Distance

Métrique Formule Idéale Pour
Cosine 1 - (A·B)/(‖A‖‖B‖) Embeddings normalisés
Euclidienne (L2) √Σ(a-b)² Embeddings bruts
Produit Scalaire A·B La magnitude compte
Manhattan (L1) Σ |a-b| Vecteurs creux

2. Types d'Index

┌─────────────────────────────────────────────────┐
│                 Types d'Index                    │
├─────────────┬───────────────┬───────────────────┤
│    Flat     │     HNSW      │    IVF+PQ         │
│ (Exact)     │ (Graph-based) │ (Quantized)       │
├─────────────┼───────────────┼───────────────────┤
│ O(n) search │ O(log n)      │ O(√n)             │
│ 100% recall │ ~95-99%       │ ~90-95%           │
│ Small data  │ Medium-Large  │ Very Large        │
└─────────────┴───────────────┴───────────────────┘

Templates

Template 1 : Implémentation Pinecone

from pinecone import Pinecone, ServerlessSpec
from typing import List, Dict, Optional
import hashlib

class PineconeVectorStore:
    def __init__(
        self,
        api_key: str,
        index_name: str,
        dimension: int = 1536,
        metric: str = "cosine"
    ):
        self.pc = Pinecone(api_key=api_key)

        # Create index if not exists
        if index_name not in self.pc.list_indexes().names():
            self.pc.create_index(
                name=index_name,
                dimension=dimension,
                metric=metric,
                spec=ServerlessSpec(cloud="aws", region="us-east-1")
            )

        self.index = self.pc.Index(index_name)

    def upsert(
        self,
        vectors: List[Dict],
        namespace: str = ""
    ) -> int:
        """
        Upsert vectors.
        vectors: [{"id": str, "values": List[float], "metadata": dict}]
        """
        # Batch upsert
        batch_size = 100
        total = 0

        for i in range(0, len(vectors), batch_size):
            batch = vectors[i:i + batch_size]
            self.index.upsert(vectors=batch, namespace=namespace)
            total += len(batch)

        return total

    def search(
        self,
        query_vector: List[float],
        top_k: int = 10,
        namespace: str = "",
        filter: Optional[Dict] = None,
        include_metadata: bool = True
    ) -> List[Dict]:
        """Rechercher des vecteurs similaires."""
        results = self.index.query(
            vector=query_vector,
            top_k=top_k,
            namespace=namespace,
            filter=filter,
            include_metadata=include_metadata
        )

        return [
            {
                "id": match.id,
                "score": match.score,
                "metadata": match.metadata
            }
            for match in results.matches
        ]

    def search_with_rerank(
        self,
        query: str,
        query_vector: List[float],
        top_k: int = 10,
        rerank_top_n: int = 50,
        namespace: str = ""
    ) -> List[Dict]:
        """Rechercher et réclasser les résultats."""
        # Over-fetch for reranking
        initial_results = self.search(
            query_vector,
            top_k=rerank_top_n,
            namespace=namespace
        )

        # Rerank with cross-encoder or LLM
        reranked = self._rerank(query, initial_results)

        return reranked[:top_k]

    def _rerank(self, query: str, results: List[Dict]) -> List[Dict]:
        """Réclasser les résultats avec un cross-encoder."""
        from sentence_transformers import CrossEncoder

        model = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

        pairs = [(query, r["metadata"]["text"]) for r in results]
        scores = model.predict(pairs)

        for result, score in zip(results, scores):
            result["rerank_score"] = float(score)

        return sorted(results, key=lambda x: x["rerank_score"], reverse=True)

    def delete(self, ids: List[str], namespace: str = ""):
        """Supprimer des vecteurs par ID."""
        self.index.delete(ids=ids, namespace=namespace)

    def delete_by_filter(self, filter: Dict, namespace: str = ""):
        """Supprimer les vecteurs correspondant au filtre."""
        self.index.delete(filter=filter, namespace=namespace)

Template 2 : Implémentation Qdrant

from qdrant_client import QdrantClient
from qdrant_client.http import models
from typing import List, Dict, Optional

class QdrantVectorStore:
    def __init__(
        self,
        url: str = "localhost",
        port: int = 6333,
        collection_name: str = "documents",
        vector_size: int = 1536
    ):
        self.client = QdrantClient(url=url, port=port)
        self.collection_name = collection_name

        # Create collection if not exists
        collections = self.client.get_collections().collections
        if collection_name not in [c.name for c in collections]:
            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=models.VectorParams(
                    size=vector_size,
                    distance=models.Distance.COSINE
                ),
                # Optional: enable quantization for memory efficiency
                quantization_config=models.ScalarQuantization(
                    scalar=models.ScalarQuantizationConfig(
                        type=models.ScalarType.INT8,
                        quantile=0.99,
                        always_ram=True
                    )
                )
            )

    def upsert(self, points: List[Dict]) -> int:
        """
        Upsert points.
        points: [{"id": str/int, "vector": List[float], "payload": dict}]
        """
        qdrant_points = [
            models.PointStruct(
                id=p["id"],
                vector=p["vector"],
                payload=p.get("payload", {})
            )
            for p in points
        ]

        self.client.upsert(
            collection_name=self.collection_name,
            points=qdrant_points
        )
        return len(points)

    def search(
        self,
        query_vector: List[float],
        limit: int = 10,
        filter: Optional[models.Filter] = None,
        score_threshold: Optional[float] = None
    ) -> List[Dict]:
        """Rechercher des vecteurs similaires."""
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            limit=limit,
            query_filter=filter,
            score_threshold=score_threshold
        )

        return [
            {
                "id": r.id,
                "score": r.score,
                "payload": r.payload
            }
            for r in results
        ]

    def search_with_filter(
        self,
        query_vector: List[float],
        must_conditions: List[Dict] = None,
        should_conditions: List[Dict] = None,
        must_not_conditions: List[Dict] = None,
        limit: int = 10
    ) -> List[Dict]:
        """Rechercher avec des filtres complexes."""
        conditions = []

        if must_conditions:
            conditions.extend([
                models.FieldCondition(
                    key=c["key"],
                    match=models.MatchValue(value=c["value"])
                )
                for c in must_conditions
            ])

        filter = models.Filter(must=conditions) if conditions else None

        return self.search(query_vector, limit=limit, filter=filter)

    def search_with_sparse(
        self,
        dense_vector: List[float],
        sparse_vector: Dict[int, float],
        limit: int = 10,
        dense_weight: float = 0.7
    ) -> List[Dict]:
        """Recherche hybride avec vecteurs denses et creux."""
        # Requires collection with named vectors
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=models.NamedVector(
                name="dense",
                vector=dense_vector
            ),
            limit=limit
        )
        return [{"id": r.id, "score": r.score, "payload": r.payload} for r in results]

Template 3 : pgvector avec PostgreSQL

import asyncpg
from typing import List, Dict, Optional
import numpy as np

class PgVectorStore:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string

    async def init(self):
        """Initialiser le pool de connexion et l'extension."""
        self.pool = await asyncpg.create_pool(self.connection_string)

        async with self.pool.acquire() as conn:
            # Enable extension
            await conn.execute("CREATE EXTENSION IF NOT EXISTS vector")

            # Create table
            await conn.execute("""
                CREATE TABLE IF NOT EXISTS documents (
                    id TEXT PRIMARY KEY,
                    content TEXT,
                    metadata JSONB,
                    embedding vector(1536)
                )
            """)

            # Create index (HNSW for better performance)
            await conn.execute("""
                CREATE INDEX IF NOT EXISTS documents_embedding_idx
                ON documents
                USING hnsw (embedding vector_cosine_ops)
                WITH (m = 16, ef_construction = 64)
            """)

    async def upsert(self, documents: List[Dict]):
        """Upsert des documents avec embeddings."""
        async with self.pool.acquire() as conn:
            await conn.executemany(
                """
                INSERT INTO documents (id, content, metadata, embedding)
                VALUES ($1, $2, $3, $4)
                ON CONFLICT (id) DO UPDATE SET
                    content = EXCLUDED.content,
                    metadata = EXCLUDED.metadata,
                    embedding = EXCLUDED.embedding
                """,
                [
                    (
                        doc["id"],
                        doc["content"],
                        doc.get("metadata", {}),
                        np.array(doc["embedding"]).tolist()
                    )
                    for doc in documents
                ]
            )

    async def search(
        self,
        query_embedding: List[float],
        limit: int = 10,
        filter_metadata: Optional[Dict] = None
    ) -> List[Dict]:
        """Rechercher des documents similaires."""
        query = """
            SELECT id, content, metadata,
                   1 - (embedding <=> $1::vector) as similarity
            FROM documents
        """

        params = [query_embedding]

        if filter_metadata:
            conditions = []
            for key, value in filter_metadata.items():
                params.append(value)
                conditions.append(f"metadata->>'{key}' = ${len(params)}")
            query += " WHERE " + " AND ".join(conditions)

        query += f" ORDER BY embedding <=> $1::vector LIMIT ${len(params) + 1}"
        params.append(limit)

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)

        return [
            {
                "id": row["id"],
                "content": row["content"],
                "metadata": row["metadata"],
                "score": row["similarity"]
            }
            for row in rows
        ]

    async def hybrid_search(
        self,
        query_embedding: List[float],
        query_text: str,
        limit: int = 10,
        vector_weight: float = 0.5
    ) -> List[Dict]:
        """Recherche hybride combinant vecteur et texte complet."""
        async with self.pool.acquire() as conn:
            rows = await conn.fetch(
                """
                WITH vector_results AS (
                    SELECT id, content, metadata,
                           1 - (embedding <=> $1::vector) as vector_score
                    FROM documents
                    ORDER BY embedding <=> $1::vector
                    LIMIT $3 * 2
                ),
                text_results AS (
                    SELECT id, content, metadata,
                           ts_rank(to_tsvector('english', content),
                                   plainto_tsquery('english', $2)) as text_score
                    FROM documents
                    WHERE to_tsvector('english', content) @@ plainto_tsquery('english', $2)
                    LIMIT $3 * 2
                )
                SELECT
                    COALESCE(v.id, t.id) as id,
                    COALESCE(v.content, t.content) as content,
                    COALESCE(v.metadata, t.metadata) as metadata,
                    COALESCE(v.vector_score, 0) * $4 +
                    COALESCE(t.text_score, 0) * (1 - $4) as combined_score
                FROM vector_results v
                FULL OUTER JOIN text_results t ON v.id = t.id
                ORDER BY combined_score DESC
                LIMIT $3
                """,
                query_embedding, query_text, limit, vector_weight
            )

        return [dict(row) for row in rows]

Template 4 : Implémentation Weaviate

import weaviate
from weaviate.util import generate_uuid5
from typing import List, Dict, Optional

class WeaviateVectorStore:
    def __init__(
        self,
        url: str = "http://localhost:8080",
        class_name: str = "Document"
    ):
        self.client = weaviate.Client(url=url)
        self.class_name = class_name
        self._ensure_schema()

    def _ensure_schema(self):
        """Créer le schéma s'il n'existe pas."""
        schema = {
            "class": self.class_name,
            "vectorizer": "none",  # We provide vectors
            "properties": [
                {"name": "content", "dataType": ["text"]},
                {"name": "source", "dataType": ["string"]},
                {"name": "chunk_id", "dataType": ["int"]}
            ]
        }

        if not self.client.schema.exists(self.class_name):
            self.client.schema.create_class(schema)

    def upsert(self, documents: List[Dict]):
        """Upsert en batch des documents."""
        with self.client.batch as batch:
            batch.batch_size = 100

            for doc in documents:
                batch.add_data_object(
                    data_object={
                        "content": doc["content"],
                        "source": doc.get("source", ""),
                        "chunk_id": doc.get("chunk_id", 0)
                    },
                    class_name=self.class_name,
                    uuid=generate_uuid5(doc["id"]),
                    vector=doc["embedding"]
                )

    def search(
        self,
        query_vector: List[float],
        limit: int = 10,
        where_filter: Optional[Dict] = None
    ) -> List[Dict]:
        """Recherche par vecteur."""
        query = (
            self.client.query
            .get(self.class_name, ["content", "source", "chunk_id"])
            .with_near_vector({"vector": query_vector})
            .with_limit(limit)
            .with_additional(["distance", "id"])
        )

        if where_filter:
            query = query.with_where(where_filter)

        results = query.do()

        return [
            {
                "id": item["_additional"]["id"],
                "content": item["content"],
                "source": item["source"],
                "score": 1 - item["_additional"]["distance"]
            }
            for item in results["data"]["Get"][self.class_name]
        ]

    def hybrid_search(
        self,
        query: str,
        query_vector: List[float],
        limit: int = 10,
        alpha: float = 0.5  # 0 = keyword, 1 = vector
    ) -> List[Dict]:
        """Recherche hybride combinant BM25 et vecteur."""
        results = (
            self.client.query
            .get(self.class_name, ["content", "source"])
            .with_hybrid(query=query, vector=query_vector, alpha=alpha)
            .with_limit(limit)
            .with_additional(["score"])
            .do()
        )

        return [
            {
                "content": item["content"],
                "source": item["source"],
                "score": item["_additional"]["score"]
            }
            for item in results["data"]["Get"][self.class_name]
        ]

Bonnes Pratiques

À Faire

  • Utiliser l'index approprié - HNSW dans la plupart des cas
  • Ajuster les paramètres - ef_search, nprobe pour recall/speed
  • Implémenter la recherche hybride - Combiner avec la recherche par mots-clés
  • Monitorer le recall - Mesurer la qualité de la recherche
  • Pré-filtrer quand possible - Réduire l'espace de recherche

À Éviter

  • Ne pas ignorer l'évaluation - Mesurer avant d'optimiser
  • Ne pas sur-indexer - Commencer par flat, passer à l'échelle progressivement
  • Ne pas ignorer la latence - P99 compte pour l'UX
  • Ne pas oublier les coûts - Le stockage de vecteurs s'ajoute

Skills similaires