diff --git a/docs/index.rst b/docs/index.rst
index 67899c7c..8ad9d72f 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -56,6 +56,7 @@ LOTUS implements the semantic operator programming model and provides an optimiz
    retriever_models
    reranker_models
    multimodal_models
+   vector_store
    usage
 
 .. toctree::
diff --git a/docs/vector_store.rst b/docs/vector_store.rst
new file mode 100644
index 00000000..162dc196
--- /dev/null
+++ b/docs/vector_store.rst
@@ -0,0 +1,124 @@
+Vector Stores
+=====================
+
+Lotus supports multiple vector store backends for efficient semantic indexing and search. This document describes how to use and configure the available vector stores, including Qdrant, Faiss, and Weaviate.
+
+Supported Vector Stores
+-----------------------
+- QdrantVS
+- FaissVS
+- WeaviateVS
+
+QdrantVS
+--------
+
+**Installation**
+^^^^^^^^^^^^^^^^
+Install the Qdrant client and Lotus with Qdrant support:
+
+.. code-block:: bash
+
+    pip install qdrant-client lotus[qdrant]
+
+**Running Qdrant**
+^^^^^^^^^^^^^^^^^^
+You can run Qdrant locally using Docker:
+
+.. code-block:: bash
+
+    docker run -p 6333:6333 -p 6334:6334 \
+        -v "$(pwd)/qdrant_storage:/qdrant/storage:z" \
+        qdrant/qdrant
+
+**Example Usage**
+^^^^^^^^^^^^^^^^^
+
+.. code-block:: python
+
+    import pandas as pd
+    from qdrant_client import QdrantClient
+    import lotus
+    from lotus.models import LiteLLMRM  # or SentenceTransformersRM
+    from lotus.vector_store import QdrantVS
+
+    # Start Qdrant server before running this code
+    client = QdrantClient(url="http://localhost:6333")
+    rm = LiteLLMRM(model="text-embedding-3-small")
+    vs = QdrantVS(client)
+    lotus.settings.configure(rm=rm, vs=vs)
+
+    data = {"Course Name": ["Machine Learning 101", "Introduction to Cooking"]}
+    df = pd.DataFrame(data)
+    df = df.sem_index("Course Name", "my_qdrant_index")
+    result = df.sem_search("Course Name", "Find the course about machine learning", K=1)
+    print(result)
+
+FaissVS
+-------
+
+**Installation**
+^^^^^^^^^^^^^^^^
+
+.. code-block:: bash
+
+    pip install faiss-cpu lotus
+
+**Example Usage**
+^^^^^^^^^^^^^^^^^
+
+.. code-block:: python
+
+    import pandas as pd
+    import lotus
+    from lotus.models import LiteLLMRM
+    from lotus.vector_store import FaissVS
+
+    rm = LiteLLMRM(model="text-embedding-3-small")
+    vs = FaissVS()
+    lotus.settings.configure(rm=rm, vs=vs)
+
+    data = {"Course Name": ["Machine Learning 101", "Introduction to Cooking"]}
+    df = pd.DataFrame(data)
+    df = df.sem_index("Course Name", "my_faiss_index")
+    result = df.sem_search("Course Name", "Find the course about machine learning", K=1)
+    print(result)
+
+WeaviateVS
+----------
+
+**Installation**
+^^^^^^^^^^^^^^^^
+
+.. code-block:: bash
+
+    pip install weaviate-client lotus[weaviate]
+
+**Running Weaviate**
+^^^^^^^^^^^^^^^^^^^^
+You can run Weaviate locally using Docker:
+
+.. code-block:: bash
+
+    docker run -p 8080:8080 -p 50051:50051 cr.weaviate.io/semitechnologies/weaviate:1.29.1
+
+**Example Usage**
+^^^^^^^^^^^^^^^^^
+
+.. code-block:: python
+
+    import pandas as pd
+    import weaviate
+    import lotus
+    from lotus.models import LiteLLMRM
+    from lotus.vector_store import WeaviateVS
+
+    client = weaviate.Client("http://localhost:8080")
+    rm = LiteLLMRM(model="text-embedding-3-small")
+    vs = WeaviateVS(client)
+    lotus.settings.configure(rm=rm, vs=vs)
+
+    data = {"Course Name": ["Machine Learning 101", "Introduction to Cooking"]}
+    df = pd.DataFrame(data)
+    df = df.sem_index("Course Name", "my_weaviate_index")
+    result = df.sem_search("Course Name", "Find the course about machine learning", K=1)
+    print(result)
diff --git a/examples/vs_examples/search_qdrant.py b/examples/vs_examples/search_qdrant.py
new file mode 100644
index 00000000..de829d18
--- /dev/null
+++ b/examples/vs_examples/search_qdrant.py
@@ -0,0 +1,42 @@
+import pandas as pd
+from qdrant_client import QdrantClient
+
+import lotus
+from lotus.models import SentenceTransformersRM
+from lotus.vector_store import QdrantVS
+
+# Run this command to start the qdrant server
+# docker run -p 6333:6333 -p 6334:6334 \
+#     -v "$(pwd)/qdrant_storage:/qdrant/storage:z" \
+#     qdrant/qdrant
+client = QdrantClient(url="http://localhost:6333")
+rm = SentenceTransformersRM(model="intfloat/e5-base-v2")
+vs = QdrantVS(client)
+
+lotus.settings.configure(rm=rm, vs=vs)
+data = {
+    "Course Name": [
+        "Probability and Random Processes",
+        "Optimization Methods in Engineering",
+        "Digital Design and Integrated Circuits",
+        "Computer Security",
+        "Introduction to Computer Science",
+        "Introduction to Data Science",
+        "Introduction to Machine Learning",
+        "Introduction to Artificial Intelligence",
+        "Introduction to Robotics",
+        "Introduction to Computer Vision",
+        "Introduction to Natural Language Processing",
+        "Introduction to Reinforcement Learning",
+        "Introduction to Deep Learning",
+        "Introduction to Computer Networks",
+    ]
+}
+df = pd.DataFrame(data)
+
+df = df.sem_index("Course Name", "index_dir").sem_search(
+    "Course Name",
+    "Which course name is most related to machine learning?",
+    K=8,
+)
+print(df)
diff --git a/lotus/vector_store/__init__.py b/lotus/vector_store/__init__.py
index f207eebb..c6aa0c8a 100644
--- a/lotus/vector_store/__init__.py
+++ b/lotus/vector_store/__init__.py
@@ -1,5 +1,6 @@
 from lotus.vector_store.vs import VS
 from lotus.vector_store.faiss_vs import FaissVS
 from lotus.vector_store.weaviate_vs import WeaviateVS
+from lotus.vector_store.qdrant_vs import QdrantVS
 
-__all__ = ["VS", "FaissVS", "WeaviateVS"]
+__all__ = ["VS", "FaissVS", "WeaviateVS", "QdrantVS"]
diff --git a/lotus/vector_store/qdrant_vs.py b/lotus/vector_store/qdrant_vs.py
new file mode 100644
index 00000000..6214fcea
--- /dev/null
+++ b/lotus/vector_store/qdrant_vs.py
@@ -0,0 +1,229 @@
+from typing import Any
+
+import numpy as np
+from numpy.typing import NDArray
+
+from lotus.types import RMOutput
+from lotus.vector_store.vs import VS
+
+try:
+    from qdrant_client import QdrantClient
+    from qdrant_client.http import models
+except ImportError:
+    QdrantClient = None
+
+
+class QdrantVS(VS):
+    """Vector store backed by a Qdrant collection.
+
+    Every indexed document becomes one point whose id, and ``doc_id`` payload
+    field, is the document's position in the list passed to ``index``.
+    """
+
+    def __init__(self, client, max_batch_size: int = 128):
+        """Wrap an already-constructed Qdrant client.
+
+        Args:
+            client: Client object used for every collection operation.
+            max_batch_size: Maximum number of points sent per upsert call.
+
+        Raises:
+            ImportError: If ``qdrant_client`` could not be imported.
+            ValueError: If ``max_batch_size`` is not positive.
+        """
+        if QdrantClient is None:
+            raise ImportError("Please install the qdrant client using `pip install lotus[qdrant]`")
+        # A zero step makes range() raise; a negative one would silently upsert nothing.
+        if max_batch_size <= 0:
+            raise ValueError(f"max_batch_size must be positive, got {max_batch_size}")
+
+        super().__init__()
+        self.client = client
+        self.max_batch_size = max_batch_size
+
+        self.index_dir: str | None = None
+        self.embedding_dim: int | None = None
+
+    def _collection_exists(self, name: str) -> bool:
+        """Return True if the server lists a collection called ``name``."""
+        collections = self.client.get_collections().collections
+        return any(collection.name == name for collection in collections)
+
+    def index(self, docs: list[str], embeddings: NDArray[np.float64], index_dir: str, **kwargs: dict[str, Any]):
+        """Create (or recreate) collection ``index_dir`` holding ``docs`` and their embeddings.
+
+        Args:
+            docs: Documents to store; each one's position becomes its point id.
+            embeddings: One embedding per document, in the same order as ``docs``.
+            index_dir: Name of the Qdrant collection to (re)create.
+            **kwargs: Accepted for interface compatibility; unused.
+
+        Raises:
+            ValueError: If ``docs`` is empty, or ``docs`` and ``embeddings`` differ in length.
+        """
+        if len(docs) == 0:
+            raise ValueError("Cannot index an empty list of documents")
+        # zip() would silently drop the surplus items, so reject a mismatch up front.
+        if len(docs) != len(embeddings):
+            raise ValueError(f"Got {len(docs)} docs but {len(embeddings)} embeddings")
+
+        # One flat row per document, so every stored vector has exactly embedding_dim entries.
+        matrix = np.reshape(embeddings, (len(embeddings), -1))
+        self.index_dir = index_dir
+        self.embedding_dim = matrix.shape[1]
+
+        # Drop a stale collection of the same name so the index is rebuilt from scratch.
+        if self._collection_exists(index_dir):
+            self.client.delete_collection(collection_name=index_dir)
+
+        self.client.create_collection(
+            collection_name=index_dir,
+            vectors_config=models.VectorParams(
+                size=self.embedding_dim,
+                distance=models.Distance.COSINE,
+            ),
+        )
+
+        points = [
+            models.PointStruct(
+                id=idx,
+                vector=row.tolist(),
+                payload={"content": doc, "doc_id": idx},
+            )
+            for idx, (doc, row) in enumerate(zip(docs, matrix))
+        ]
+
+        # Upsert in bounded batches.
+        for start in range(0, len(points), self.max_batch_size):
+            self.client.upsert(
+                collection_name=index_dir,
+                points=points[start : start + self.max_batch_size],
+                wait=True,
+            )
+
+    def load_index(self, index_dir: str):
+        """Select an existing collection and record its vector size.
+
+        Args:
+            index_dir: Name of the collection to use for later searches.
+
+        Raises:
+            ValueError: If the server has no collection named ``index_dir``.
+        """
+        if not self._collection_exists(index_dir):
+            raise ValueError(f"Collection {index_dir} not found")
+        # Only switch once the collection is known to exist.
+        self.index_dir = index_dir
+
+        collection_info = self.client.get_collection(collection_name=index_dir)
+        vectors = collection_info.config.params.vectors
+        # A dict means named vectors; take the size of the first one.
+        if isinstance(vectors, dict):
+            self.embedding_dim = next(iter(vectors.values())).size
+        else:
+            self.embedding_dim = vectors.size
+
+    def __call__(
+        self, query_vectors: NDArray[np.float64], K: int, ids: list[int] | None = None, **kwargs: dict[str, Any]
+    ) -> RMOutput:
+        """Return the top-``K`` matches for each pre-computed query vector.
+
+        Args:
+            query_vectors: Query embeddings, one per row.
+            K: Number of results per query; shorter result lists are padded
+                with index ``-1`` and score ``0.0``.
+            ids: If given, only points whose ``doc_id`` payload is in this list are searched.
+            **kwargs: Accepted for interface compatibility; unused.
+
+        Returns:
+            RMOutput whose ``distances`` holds the scores reported by Qdrant and
+            whose ``indices`` holds the matching ``doc_id`` values, per query.
+
+        Raises:
+            ValueError: If no collection has been indexed or loaded yet.
+        """
+        if self.index_dir is None:
+            raise ValueError("No collection loaded. Call load_index first.")
+
+        # The filter does not depend on the query vector, so build it once.
+        id_filter = None
+        if ids is not None:
+            id_filter = models.Filter(
+                must=[
+                    models.FieldCondition(
+                        key="doc_id",
+                        match=models.MatchAny(any=ids),
+                    )
+                ]
+            )
+
+        all_distances = []
+        all_indices = []
+        for query_vector in query_vectors:
+            hits = self.client.search(
+                collection_name=self.index_dir,
+                query_vector=query_vector.tolist(),
+                limit=K,
+                query_filter=id_filter,
+                with_payload=True,
+            )
+
+            indices = [hit.payload.get("doc_id", -1) for hit in hits]
+            # The score is used as-is; a missing score counts as 0.0.
+            distances = [hit.score if hit.score is not None else 0.0 for hit in hits]
+
+            # Pad so that every row has exactly K entries.
+            missing = K - len(indices)
+            if missing > 0:
+                indices.extend([-1] * missing)
+                distances.extend([0.0] * missing)
+
+            all_distances.append(distances)
+            all_indices.append(indices)
+
+        return RMOutput(
+            distances=np.array(all_distances, dtype=np.float32).tolist(),  # type: ignore
+            indices=np.array(all_indices, dtype=np.int64).tolist(),  # type: ignore
+        )
+
+    def get_vectors_from_index(self, index_dir: str, ids: list[int]) -> NDArray[np.float64]:
+        """Fetch the stored vectors for ``ids`` from collection ``index_dir``.
+
+        Args:
+            index_dir: Collection to read from; loaded first if it is not the current one.
+            ids: Point ids to fetch; an id may appear more than once.
+
+        Returns:
+            Array of shape ``(len(ids), embedding_dim)`` whose row ``i`` is the vector
+            of ``ids[i]``. Rows for ids the server does not return stay all-zero.
+
+        Raises:
+            ValueError: If the collection does not exist or its vector size is unknown.
+        """
+        if self.index_dir != index_dir:
+            self.load_index(index_dir)
+        # Raise instead of assert: asserts are stripped under ``python -O``.
+        if self.embedding_dim is None:
+            raise ValueError(f"Vector size of collection {index_dir} is unknown")
+
+        points = self.client.retrieve(
+            collection_name=index_dir,
+            ids=ids,
+            with_vectors=True,
+        )
+
+        # Track every output row of each id so that repeated ids are all filled in.
+        rows_by_id: dict[int, list[int]] = {}
+        for row, point_id in enumerate(ids):
+            rows_by_id.setdefault(point_id, []).append(row)
+
+        vectors = np.zeros((len(ids), self.embedding_dim), dtype=np.float64)
+        for point in points:
+            vector = point.vector
+            # Named vectors arrive as a dict; use the first one, mirroring load_index.
+            if isinstance(vector, dict):
+                vector = next(iter(vector.values()))
+            for row in rows_by_id.get(point.id, ()):
+                vectors[row] = np.array(vector, dtype=np.float64)
+
+        return vectors
diff --git a/pyproject.toml b/pyproject.toml
index 447fe882..4cf66e63 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -56,6 +56,9 @@ file_extractor = [
 weaviate = [
     "weaviate-client",
 ]
+qdrant = [
+    "qdrant-client",
+]
 data_connectors = [
     "sqlalchemy",
     "boto3",
diff --git a/tests/test_qdrant.py b/tests/test_qdrant.py
new file mode 100644
index 00000000..43069e3a
--- /dev/null
+++ b/tests/test_qdrant.py
@@ -0,0 +1,146 @@
+import pandas as pd
+import pytest
+from qdrant_client import QdrantClient
+
+import lotus
+from lotus.models import LM, LiteLLMRM
+from lotus.types import CascadeArgs, ProxyModel
+from lotus.vector_store import QdrantVS
+from tests.base_test import BaseTest
+
+
+@pytest.fixture(scope="module")
+def qdrant_vs():
+    client = QdrantClient(url="http://localhost:6333")
+    vs = QdrantVS(client)
+    return vs
+
+
+@pytest.fixture(scope="module")
+def rm():
+    return LiteLLMRM(model="text-embedding-3-small")
+
+
+@pytest.fixture
+def sample_df():
+    return pd.DataFrame(
+        {
+            "Course Name": [
+                "Introduction to Cooking",
+                "Advanced Cooking",
+                "Basic Mathematics",
+                "Advanced Mathematics",
+                "Machine Learning 101",
+                "Deep Learning for Beginners",
+                "History of Art",
+                "Modern Art Techniques",
+            ],
+            "Department": ["Culinary", "Culinary", "Math", "Math", "CS", "CS", "Art", "Art"],
+            "Level": [100, 200, 100, 200, 300, 400, 100, 200],
+        }
+    )
+
+
+class TestQdrantVS(BaseTest):
+    @pytest.fixture(autouse=True)
+    def setup_vs(self, rm, qdrant_vs):
+        lotus.settings.configure(rm=rm, vs=qdrant_vs)
+
+    def test_index_creation_and_reload(self, sample_df):
+        df = sample_df.copy()
+        df = df.sem_index("Course Name", "qdrant_test_index")
+        # Now reload the index and check it works
+        lotus.settings.vs.load_index("qdrant_test_index")
+        # Should not raise
+        assert lotus.settings.vs.index_dir == "qdrant_test_index"
+
+    def test_simple_sem_search(self, sample_df):
+        df = sample_df.copy().sem_index("Course Name", "qdrant_search_index")
+        result = df.sem_search("Course Name", "Find the course about machine learning", K=1)
+        assert len(result) == 1
+        assert result["Course Name"].iloc[0] == "Machine Learning 101"
+
+    def test_sem_search_with_filter(self, sample_df):
+        df = sample_df.copy().sem_index("Course Name", "qdrant_filter_index")
+        filtered_df = df[df["Department"] == "CS"]
+        result = filtered_df.sem_search("Course Name", "Find the course about deep learning", K=1)
+        assert len(result) == 1
+        assert result["Course Name"].iloc[0] == "Deep Learning for Beginners"
+        assert all(dept == "CS" for dept in result["Department"])
+
+    def test_sem_join(self, sample_df):
+        left = pd.DataFrame({"Skill": ["Machine Learning", "Cooking"]})
+        right = sample_df.copy().sem_index("Course Name", "qdrant_join_index")
+        joined = left.sem_sim_join(right, left_on="Skill", right_on="Course Name", K=1)
+        assert len(joined) == 2
+        # Each skill should match the most obvious course(s)
+        expected = {
+            "Machine Learning": ["Machine Learning 101"],
+            "Cooking": ["Introduction to Cooking", "Advanced Cooking"],
+        }
+        for _, row in joined.iterrows():
+            assert any(exp in row["Course Name"] for exp in expected[row["Skill"]])
+
+    def test_sem_cluster_by(self):
+        df = pd.DataFrame(
+            {
+                "Item": [
+                    "Apple",
+                    "Banana",
+                    "Orange",
+                    "Cat",
+                    "Dog",
+                    "Tiger",
+                ],
+                "Category": [
+                    "Fruit",
+                    "Fruit",
+                    "Fruit",
+                    "Animal",
+                    "Animal",
+                    "Animal",
+                ],
+            }
+        )
+        df = df.sem_index("Item", "qdrant_cluster_index")
+        clustered = df.sem_cluster_by("Item", 2)
+
+        # Map cluster ids to categories
+        cluster_map = clustered.groupby("Category")["cluster_id"].agg(lambda x: x.mode()[0])
+        # All items in the same category should have the same cluster id
+        for category, group in clustered.groupby("Category"):
+            cluster_ids = group["cluster_id"].unique()
+            assert len(cluster_ids) == 1, f"Category {category} split across clusters: {cluster_ids}"
+        # The two categories should be assigned to different clusters
+        assert cluster_map.nunique() == 2, f"Both categories mapped to the same cluster: {cluster_map}"
+
+    def test_sem_filter_cascade(self):
+        lm = LM(model="gpt-4o-mini")
+        lotus.settings.configure(lm=lm)
+        df = pd.DataFrame(
+            {
+                "Course Name": [
+                    "Introduction to Cooking",
+                    "Advanced Cooking",
+                    "Basic Mathematics",
+                    "Advanced Mathematics",
+                    "Machine Learning 101",
+                    "Deep Learning for Beginners",
+                    "History of Art",
+                    "Modern Art Techniques",
+                ]
+            }
+        )
+        df = df.sem_index("Course Name", "qdrant_filter_cascade_index")
+        cascade_args = CascadeArgs(
+            recall_target=0.9,
+            precision_target=0.9,
+            sampling_percentage=0.5,
+            failure_probability=0.2,
+            proxy_model=ProxyModel.EMBEDDING_MODEL,
+        )
+        filtered_df, stats = df.sem_filter(
+            user_instruction="{Course Name} is about cooking", cascade_args=cascade_args, return_stats=True
+        )
+        print(filtered_df)
+        print(stats)