diff --git a/.github/tests/rm_tests.py b/.github/tests/rm_tests.py
index dc65b178..d4a06ef8 100644
--- a/.github/tests/rm_tests.py
+++ b/.github/tests/rm_tests.py
@@ -5,7 +5,7 @@
 import lotus
 from lotus.models import CrossEncoderReranker, LiteLLMRM, SentenceTransformersRM
-from lotus.vector_store import FaissVS
+from lotus.vector_store import FaissVS, PineconeVS
 
 ################################################################################
 # Setup
@@ -33,6 +33,7 @@
 VECTOR_STORE_TO_CLS = {
     'local': FaissVS,
+    'pinecone': PineconeVS,
 }
@@ -169,7 +170,7 @@ def test_dedup(setup_models):
 ################################################################################
-@pytest.mark.parametrize("vs", VECTOR_STORE_TO_CLS.keys())
+@pytest.mark.parametrize("vs", [name for name in VECTOR_STORE_TO_CLS if name != "pinecone"])
 @pytest.mark.parametrize("model", get_enabled("intfloat/e5-small-v2", "text-embedding-3-small"))
 def test_vs_cluster_by(setup_models, setup_vs, vs, model):
     rm = setup_models[model]
@@ -219,7 +220,7 @@
     df = df.sem_search("Course Name", "Optimization", K=1)
     assert df["Course Name"].tolist() == ["Optimization Methods in Engineering"]
 
-@pytest.mark.parametrize("vs", VECTOR_STORE_TO_CLS.keys())
+@pytest.mark.parametrize("vs", [name for name in VECTOR_STORE_TO_CLS if name != "pinecone"])
 @pytest.mark.parametrize("model", get_enabled("intfloat/e5-small-v2", "text-embedding-3-small"))
 def test_vs_sim_join(setup_models, setup_vs, vs, model):
     rm = setup_models[model]
@@ -252,7 +253,7 @@
     "intfloat/e5-small-v2" not in ENABLED_MODEL_NAMES,
     reason="Skipping test because intfloat/e5-small-v2 is not enabled",
 )
-@pytest.mark.parametrize("vs", VECTOR_STORE_TO_CLS.keys())
+@pytest.mark.parametrize("vs", [name for name in VECTOR_STORE_TO_CLS if name != "pinecone"])
 def test_vs_dedup(setup_models, setup_vs, vs):
     rm = setup_models["intfloat/e5-small-v2"]
     my_vs = setup_vs[vs]
@@ -320,8 +321,9 @@
     df = df.sem_search("Course Name", "Optimization", K=2, n_rerank=1)
     assert df["Course Name"].tolist() == ["Optimization Methods in Engineering"]
 
+@pytest.mark.parametrize("vs", VECTOR_STORE_TO_CLS.keys())
 @pytest.mark.parametrize("model", get_enabled("intfloat/e5-small-v2", "text-embedding-3-small"))
-def test_filtered_vector_search(setup_models, model):
+def test_filtered_vector_search(setup_models, setup_vs, vs, model):
     """
     Test filtered vector search.
@@ -336,7 +338,7 @@
     expected to pick out the culinary course "Gourmet Cooking Advanced".
     """
     rm = setup_models[model]
-    vs = FaissVS()
+    vs = setup_vs[vs]
     lotus.settings.configure(rm=rm, vs=vs)
 
     data = {
@@ -355,7 +357,7 @@
     }
     df = pd.DataFrame(data)
     # Index the 'Course Name' column to generate semantic embeddings.
-    df = df.sem_index("Course Name", "filtered_index_dir")
+    df = df.sem_index("Course Name", "filteredindexdir")
     # Filter the DataFrame to only include Culinary courses.
     df_filtered = df[df["Category"] == "Culinary"]
     # Perform semantic search on the filtered DataFrame.
diff --git a/lotus/models/rm.py b/lotus/models/rm.py
index 37a73c22..11855735 100644
--- a/lotus/models/rm.py
+++ b/lotus/models/rm.py
@@ -34,4 +34,4 @@ def convert_query_to_query_vector(self, queries: Union[pd.Series, str, Image.Image, list]):
             queries = queries.tolist()
         # Create embeddings for text queries
         query_vectors = self._embed(queries)
-        return query_vectors
\ No newline at end of file
+        return query_vectors
diff --git a/lotus/sem_ops/sem_search.py b/lotus/sem_ops/sem_search.py
index f470c2cc..4cdf973d 100644
--- a/lotus/sem_ops/sem_search.py
+++ b/lotus/sem_ops/sem_search.py
@@ -61,7 +61,6 @@ def __call__(
         df_idxs = self._obj.index
 
         cur_min = len(df_idxs)
-        K = min(K, cur_min)
 
         search_K = K
diff --git a/lotus/vector_store/__init__.py b/lotus/vector_store/__init__.py
index f1f130da..69f5ddc1 100644
--- a/lotus/vector_store/__init__.py
+++ b/lotus/vector_store/__init__.py
@@ -1,4 +1,5 @@
 from lotus.vector_store.vs import VS
 from lotus.vector_store.faiss_vs import FaissVS
+from lotus.vector_store.pinecone_vs import PineconeVS
 
-__all__ = ["VS", "FaissVS"]
+__all__ = ["VS", "FaissVS", "PineconeVS"]
diff --git a/lotus/vector_store/pinecone_vs.py b/lotus/vector_store/pinecone_vs.py
new file mode 100644
index 00000000..0bbeeef0
--- /dev/null
+++ b/lotus/vector_store/pinecone_vs.py
@@ -0,0 +1,136 @@
+import os
+from typing import Any, Optional
+
+import numpy as np
+import pandas as pd
+from numpy.typing import NDArray
+from tqdm import tqdm
+
+from lotus.types import RMOutput
+from lotus.vector_store.vs import VS
+
+try:
+    from pinecone import Index, Pinecone, ServerlessSpec
+except ImportError as err:
+    raise ImportError(
+        "The pinecone library is required to use PineconeVS. Install it with `pip install pinecone`",
+    ) from err
+
+
+class PineconeVS(VS):
+    def __init__(self, max_batch_size: int = 64, api_key: Optional[str] = None):
+        """Initialize the Pinecone client.
+
+        The API key comes from the api_key argument or, if omitted, the
+        PINECONE_API_KEY environment variable; credentials are never
+        hardcoded in source.
+        """
+        super().__init__()
+        key = api_key or os.environ.get("PINECONE_API_KEY")
+        if not key:
+            raise ValueError(
+                "A Pinecone API key is required: pass api_key or set the "
+                "PINECONE_API_KEY environment variable."
+            )
+        self.pinecone = Pinecone(api_key=key)
+        self.pc_index: Optional[Index] = None
+        self.max_batch_size = max_batch_size
+
+    def __del__(self):
+        # The index lives server-side; nothing local to release.
+        return
+
+    def index(self, docs: pd.Series, embeddings: Any, index_dir: str, **kwargs: dict[str, Any]):
+        """Create (or recreate on dimension mismatch) an index and upsert all docs."""
+        self.index_dir = index_dir
+        dimension = embeddings.shape[1]
+
+        existing = self.pinecone.list_indexes().names()
+        if index_dir in existing and self.pinecone.describe_index(index_dir).dimension != dimension:
+            # Recreate when the embedding dimension changed (different model).
+            self.pinecone.delete_index(index_dir)
+            existing.remove(index_dir)
+        if index_dir not in existing:
+            self.pinecone.create_index(
+                name=index_dir,
+                dimension=dimension,
+                metric="cosine",
+                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
+            )
+
+        self.pc_index = self.pinecone.Index(index_dir)
+
+        docs_list = docs.tolist() if isinstance(docs, pd.Series) else docs
+
+        # Pinecone expects plain lists, not numpy arrays.
+        vectors = [
+            {
+                "id": str(idx),
+                "values": embedding.tolist(),
+                "metadata": {"content": doc, "doc_id": idx},
+            }
+            for idx, (embedding, doc) in enumerate(zip(embeddings, docs_list))
+        ]
+
+        # Upsert in batches to respect request-size limits.
+        step = self.max_batch_size
+        for i in tqdm(range(0, len(vectors), step), desc="Uploading to Pinecone"):
+            self.pc_index.upsert(vectors=vectors[i:i + step])
+
+    def load_index(self, index_dir: str):
+        """Connect to an existing Pinecone index."""
+        # list_indexes() yields index descriptions; .names() gives the strings.
+        if index_dir not in self.pinecone.list_indexes().names():
+            raise ValueError(f"Index {index_dir} not found")
+
+        self.index_dir = index_dir
+        self.pc_index = self.pinecone.Index(index_dir)
+
+    def __call__(
+        self,
+        query_vectors,
+        K: int,
+        ids: Optional[list[int]] = None,
+        **kwargs: dict[str, Any],
+    ) -> RMOutput:
+        """Search the index; always returns exactly K results per query,
+        padded with index -1 / distance 0.0 when fewer matches exist."""
+        if self.pc_index is None:
+            raise ValueError("No index loaded. Call load_index first.")
+        K = min(K, 10000)  # Pinecone's top_k hard limit
+
+        query_filter = {"doc_id": {"$in": ids}} if ids is not None else None
+
+        all_distances = []
+        all_indices = []
+
+        for query_vector in query_vectors:
+            results = self.pc_index.query(
+                vector=query_vector.tolist(),
+                top_k=max(K, 1),
+                include_metadata=True,
+                filter=query_filter,
+                **kwargs,
+            )
+
+            distances = []
+            indices = []
+            for match in results.matches[:K]:
+                indices.append(int(match.metadata["doc_id"]))
+                distances.append(match.score)
+
+            while len(indices) < K:
+                indices.append(-1)
+                distances.append(0.0)
+
+            all_distances.append(distances)
+            all_indices.append(indices)
+
+        return RMOutput(
+            distances=np.array(all_distances, dtype=np.float32).tolist(),
+            indices=np.array(all_indices, dtype=np.int64).tolist(),
+        )
+
+    def get_vectors_from_index(self, index_dir: str, ids: list[int]) -> NDArray[np.float64]:
+        """Bulk vector export is not supported by the Pinecone backend."""
+        raise NotImplementedError("get_vectors_from_index is not supported by PineconeVS")