From 4b0af422a1b7bcb8e6b78fa4ebf526430604b384 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Jun 2026 20:23:54 +0000 Subject: [PATCH 1/2] Push keyword search down to Qdrant via a full-text payload index keyword_search() previously transferred up to 5000 full payloads per query and substring-matched in Python; past that cap, older memories silently became unsearchable. Now a full-text index on the data field (created lazily and idempotently on first keyword search; outcome cached) lets Qdrant prefilter candidates server-side with MatchText, and the original substring check verifies them so semantics are unchanged. MatchText is token-based, so queries that only match inside a word (or yield no surviving candidates) transparently fall back to the legacy scan - recall never regresses, and stores on Qdrant versions that cannot create the index keep working via the scan path. Closes #64 https://claude.ai/code/session_01H2Dbh6kD8bseWZZEf7kGhx --- app/memory.py | 124 ++++++++++++++++++++++++++++++++++------ docs/DEVELOPER_GUIDE.md | 14 +++-- docs/USER_GUIDE.md | 13 ++++- tests/conftest.py | 9 +++ tests/test_memory.py | 104 +++++++++++++++++++++++++++++++++ 5 files changed, 240 insertions(+), 24 deletions(-) diff --git a/app/memory.py b/app/memory.py index 05697a2..173d099 100644 --- a/app/memory.py +++ b/app/memory.py @@ -148,6 +148,85 @@ def _point_recency(point) -> datetime: return ts or datetime.min.replace(tzinfo=UTC) +# Tri-state cache for the full-text index on the `data` payload field: +# None = not attempted yet, True = created/confirmed, False = creation failed +# (old Qdrant, permissions) — don't retry on every search, use the scan path. +_keyword_index_state: bool | None = None + + +def reset_keyword_index_state() -> None: + """Forget whether the keyword index exists (test hook / ops escape hatch).""" + global _keyword_index_state + _keyword_index_state = None + + +def _ensure_keyword_index(memory) -> bool: + """Idempotently create the full-text index on `data`; cache the outcome.""" + global _keyword_index_state + if _keyword_index_state is None: + from qdrant_client import models + + try: + memory.vector_store.client.create_payload_index( + collection_name=memory.vector_store.collection_name, + field_name="data", + field_schema=models.TextIndexParams( + type=models.TextIndexType.TEXT, + tokenizer=models.TokenizerType.WORD, + lowercase=True, + ), + ) + _keyword_index_state = True + except Exception: + _keyword_index_state = False + return _keyword_index_state + + +def _indexed_keyword_points(memory, query: str, filters: dict, scan_limit: int) -> list: + """Fetch candidate points server-side via the full-text index. + + Qdrant's MatchText requires every (lowercased) query token to appear in the + document, so this transfers only plausible candidates instead of the whole + store. It is a prefilter, not the final answer — substring verification + still happens in keyword_search(). + """ + from qdrant_client import models + + conditions = [ + models.FieldCondition(key=key, match=models.MatchValue(value=value)) + for key, value in filters.items() + ] + conditions.append( + models.FieldCondition(key="data", match=models.MatchText(text=query)) + ) + points, _ = memory.vector_store.client.scroll( + collection_name=memory.vector_store.collection_name, + scroll_filter=models.Filter(must=conditions), + limit=scan_limit, + with_payload=True, + with_vectors=False, + ) + return points or [] + + +def _scanned_points(memory, filters: dict, scan_limit: int) -> list: + """Legacy path: pull up to scan_limit payloads and match client-side.""" + result = memory.vector_store.list(filters=filters or None, top_k=scan_limit) + points = result[0] if isinstance(result, tuple) else result + return list(points or []) + + +def _substring_matches(points: list, needle: str, limit: int) -> list[dict]: + matches = [ + point + for point in points + if isinstance((getattr(point, "payload", None) or {}).get("data"), str) + and needle in point.payload["data"].casefold() + ] + matches.sort(key=_point_recency, reverse=True) # most recently touched first + return [_point_to_result(p) for p in matches[:limit]] + + def keyword_search( query: str, *, @@ -159,13 +238,22 @@ def keyword_search( """Case-insensitive substring search over stored memory text. A literal-match fallback for terms semantic search misses (names, IDs, URLs, - rare tokens). Scans up to `scan_limit` of the user's memories via the vector - store's payload listing and matches `query` as a case-insensitive substring - of each memory's text, returning the most recent matches first. Scoped by - `user_id` only (it spans the whole user store, like the MCP read tools); - `extra_filters` adds exact-match payload conditions (e.g. provenance fields). - An empty/whitespace query matches nothing. Fail-open: any store error - returns no results. + rare tokens). Matching is done in two stages so the store isn't shipped over + the network on every query: + + 1. Indexed prefilter: a full-text payload index on `data` (created lazily, + idempotent) lets Qdrant return only points containing every query token. + 2. Substring verification: the original case-insensitive substring check + runs over the candidates, preserving exact semantics (e.g. phrase + contiguity). + + If the index is unavailable, the indexed query fails, or it yields no + surviving matches (a mid-token fragment like "hil" never token-matches), + the legacy full scan of up to `scan_limit` payloads runs instead, so recall + is never worse than before. Scoped by `user_id` only (it spans the whole + user store, like the MCP read tools); `extra_filters` adds exact-match + payload conditions (e.g. provenance fields). An empty/whitespace query + matches nothing. Fail-open: any store error returns no results. """ needle = query.strip().casefold() if not needle: @@ -176,19 +264,21 @@ def keyword_search( filters["user_id"] = user_id if extra_filters: filters.update(extra_filters) + if _ensure_keyword_index(memory): + try: + candidates = _indexed_keyword_points( + memory, query.strip(), filters, scan_limit + ) + except Exception: + candidates = [] + results = _substring_matches(candidates, needle, limit) + if results: + return {"results": results} try: - result = memory.vector_store.list(filters=filters or None, top_k=scan_limit) + points = _scanned_points(memory, filters, scan_limit) except Exception: return {"results": []} - points = result[0] if isinstance(result, tuple) else result - matches = [ - point - for point in (points or []) - if isinstance((getattr(point, "payload", None) or {}).get("data"), str) - and needle in point.payload["data"].casefold() - ] - matches.sort(key=_point_recency, reverse=True) # most recently touched first - return {"results": [_point_to_result(p) for p in matches[:limit]]} + return {"results": _substring_matches(points, needle, limit)} def _result_expiry(item) -> datetime | None: diff --git a/docs/DEVELOPER_GUIDE.md b/docs/DEVELOPER_GUIDE.md index ddb96b0..5ee1653 100644 --- a/docs/DEVELOPER_GUIDE.md +++ b/docs/DEVELOPER_GUIDE.md @@ -63,10 +63,16 @@ app/ cheap content-fingerprint dedup: it SHA-256s the normalized raw input, stores it in the `content_fp` payload field, and skips the LLM extraction if a memory with that fingerprint already exists (fail-open — a lookup error just proceeds). - keyword_search() is the substring-match fallback behind search mode="keyword": - it scans the user's memories via vector_store.list() and matches the query as a - case-insensitive substring of the `data` payload (fail-open). drop_expired() - removes results whose provenance `expires_at` is past. The most tweak-prone file. + keyword_search() backs search mode="keyword" in two stages: a Qdrant + full-text index on the `data` field (created lazily+idempotently on first + use; outcome cached in _keyword_index_state) prefilters candidates server- + side via MatchText, then the original case-insensitive substring check + verifies them (MatchText is token-based; substring semantics are the + contract). If the index is unavailable, the query fails, or nothing + survives verification (mid-token fragments never token-match), it falls + back to the legacy vector_store.list() scan, so recall never regresses + (fail-open). drop_expired() removes results whose provenance `expires_at` + is past. The most tweak-prone file. mcp_server.py build_mcp(): the six MCP tools, each thinly wrapping a mem0 op with user_id defaulted to MEM0_DEFAULT_USER_ID. rest.py REST router under /api/v1 (mounted with prefix in main.py). Pydantic request diff --git a/docs/USER_GUIDE.md b/docs/USER_GUIDE.md index 2d37127..602db75 100644 --- a/docs/USER_GUIDE.md +++ b/docs/USER_GUIDE.md @@ -591,11 +591,18 @@ curl -X POST https://mem0.your-domain.com/api/v1/memories/search \ ``` The default is `"mode": "semantic"`. The MCP `search_memories` tool accepts the -same `mode` argument. Keyword mode spans the whole user store and scans up to a -few thousand of the most recent memories per query — ample for a personal store; -it's a literal-match fallback, not a replacement for semantic retrieval. +same `mode` argument. Keyword mode spans the whole user store; it's a +literal-match fallback, not a replacement for semantic retrieval. (`recency_weight` applies to semantic mode only.) +Under the hood, keyword matching is pushed down to Qdrant via a full-text payload +index that the server creates automatically on first use (no setup or maintenance +needed). Whole-word queries are answered from the index; queries that only match +*inside* a word (e.g. `hil` matching `Philips`) transparently fall back to a scan +of up to a few thousand memories, so results are the same as before — exact, +case-insensitive substring matches, most recent first. If your Qdrant version +can't create the index, everything still works via the scan path. + ### List memories — `GET /api/v1/memories` Query params: `agent_id`, `run_id`, `user_id`, `limit` (1–100, default 50), plus the diff --git a/tests/conftest.py b/tests/conftest.py index e5aa6d2..c9f5c86 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -31,6 +31,15 @@ memory_mod.get_memory = lambda: FAKE_MEMORY +@pytest.fixture(autouse=True) +def _reset_keyword_index_state(): + # The keyword-index existence check is cached module-wide; tests must not + # inherit another test's cached outcome. + memory_mod.reset_keyword_index_state() + yield + memory_mod.reset_keyword_index_state() + + @pytest.fixture def mem(): FAKE_MEMORY.reset_mock() diff --git a/tests/test_memory.py b/tests/test_memory.py index 73525c7..941d736 100644 --- a/tests/test_memory.py +++ b/tests/test_memory.py @@ -156,6 +156,9 @@ def _patch_keyword(monkeypatch, points): fake = MagicMock() fake.vector_store.list.return_value = (points, None) + # Indexed prefilter finds nothing, so these tests exercise the scan path's + # substring/ordering semantics; the indexed path has its own tests below. + fake.vector_store.client.scroll.return_value = ([], None) monkeypatch.setattr(m, "get_memory", lambda: fake) return fake @@ -275,3 +278,104 @@ def test_keyword_search_fails_open(monkeypatch): fake.vector_store.list.side_effect = RuntimeError("qdrant down") monkeypatch.setattr(m, "get_memory", lambda: fake) assert keyword_search("x", user_id="ian") == {"results": []} + + +# --- keyword_search indexed path ---------------------------------------------- + + +def _patch_indexed(monkeypatch, indexed_points, scanned_points=()): + import app.memory as m + + fake = MagicMock() + fake.vector_store.collection_name = "test_memories" + fake.vector_store.client.scroll.return_value = (list(indexed_points), None) + fake.vector_store.list.return_value = (list(scanned_points), None) + monkeypatch.setattr(m, "get_memory", lambda: fake) + return fake + + +def test_keyword_search_uses_index_and_skips_full_scan(monkeypatch): + from qdrant_client import models + + fake = _patch_indexed( + monkeypatch, [_point("1", "Ian uses Philips Hue lights", user_id="ian")] + ) + out = keyword_search("philips", user_id="ian") + assert [r["id"] for r in out["results"]] == ["1"] + fake.vector_store.list.assert_not_called() # no full-store transfer + + # The index is created once, on the data field, as a full-text index. + fake.vector_store.client.create_payload_index.assert_called_once() + _, idx_kwargs = fake.vector_store.client.create_payload_index.call_args + assert idx_kwargs["field_name"] == "data" + assert idx_kwargs["field_schema"].tokenizer == models.TokenizerType.WORD + + # The scroll filter combines exact-match scoping with the MatchText clause. + _, kwargs = fake.vector_store.client.scroll.call_args + conditions = kwargs["scroll_filter"].must + matches = {c.key: c.match for c in conditions} + assert matches["user_id"] == models.MatchValue(value="ian") + assert matches["data"] == models.MatchText(text="philips") + + +def test_keyword_index_created_once_across_searches(monkeypatch): + fake = _patch_indexed(monkeypatch, [_point("1", "alpha", user_id="ian")]) + keyword_search("alpha", user_id="ian") + keyword_search("alpha", user_id="ian") + assert fake.vector_store.client.create_payload_index.call_count == 1 + + +def test_keyword_indexed_results_still_substring_verified(monkeypatch): + # MatchText is token-based: both tokens present but not contiguous must not + # match the substring query. + fake = _patch_indexed( + monkeypatch, [_point("1", "oat in my milk", user_id="ian")] + ) + out = keyword_search("oat milk", user_id="ian") + assert out["results"] == [] + fake.vector_store.list.assert_called_once() # fell back to the scan + + +def test_keyword_falls_back_to_scan_for_mid_token_fragment(monkeypatch): + # "hil" never token-matches, but the scan still finds it in "Philips". + fake = _patch_indexed( + monkeypatch, + indexed_points=[], + scanned_points=[_point("1", "Ian uses Philips Hue lights", user_id="ian")], + ) + out = keyword_search("hil", user_id="ian") + assert [r["id"] for r in out["results"]] == ["1"] + fake.vector_store.list.assert_called_once() + + +def test_keyword_falls_back_when_index_creation_fails(monkeypatch): + fake = _patch_indexed( + monkeypatch, + indexed_points=[], + scanned_points=[_point("1", "alpha", user_id="ian")], + ) + fake.vector_store.client.create_payload_index.side_effect = RuntimeError("old qdrant") + out = keyword_search("alpha", user_id="ian") + assert [r["id"] for r in out["results"]] == ["1"] + fake.vector_store.client.scroll.assert_not_called() + # The failed creation is cached: the next search doesn't retry it. + keyword_search("alpha", user_id="ian") + assert fake.vector_store.client.create_payload_index.call_count == 1 + + +def test_keyword_falls_back_when_indexed_query_raises(monkeypatch): + fake = _patch_indexed( + monkeypatch, + indexed_points=[], + scanned_points=[_point("1", "alpha", user_id="ian")], + ) + fake.vector_store.client.scroll.side_effect = RuntimeError("scroll broke") + out = keyword_search("alpha", user_id="ian") + assert [r["id"] for r in out["results"]] == ["1"] + + +def test_keyword_empty_query_touches_nothing(monkeypatch): + fake = _patch_indexed(monkeypatch, []) + assert keyword_search(" ", user_id="ian") == {"results": []} + fake.vector_store.client.create_payload_index.assert_not_called() + fake.vector_store.list.assert_not_called() From bcf9f2706bf0822b519f321c8984fd86d863aac3 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Jun 2026 20:31:06 +0000 Subject: [PATCH 2/2] Address review: lock index-state cache, explicit scroll stub in REST test https://claude.ai/code/session_01H2Dbh6kD8bseWZZEf7kGhx --- app/memory.py | 47 +++++++++++++++++++++++++++------------------- tests/test_rest.py | 3 +++ 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/app/memory.py b/app/memory.py index 173d099..85264ad 100644 --- a/app/memory.py +++ b/app/memory.py @@ -1,5 +1,6 @@ import hashlib import json +import threading from datetime import UTC, datetime from functools import lru_cache @@ -152,34 +153,42 @@ def _point_recency(point) -> datetime: # None = not attempted yet, True = created/confirmed, False = creation failed # (old Qdrant, permissions) — don't retry on every search, use the scan path. _keyword_index_state: bool | None = None +_keyword_index_lock = threading.Lock() def reset_keyword_index_state() -> None: """Forget whether the keyword index exists (test hook / ops escape hatch).""" global _keyword_index_state - _keyword_index_state = None + with _keyword_index_lock: + _keyword_index_state = None def _ensure_keyword_index(memory) -> bool: - """Idempotently create the full-text index on `data`; cache the outcome.""" - global _keyword_index_state - if _keyword_index_state is None: - from qdrant_client import models + """Idempotently create the full-text index on `data`; cache the outcome. - try: - memory.vector_store.client.create_payload_index( - collection_name=memory.vector_store.collection_name, - field_name="data", - field_schema=models.TextIndexParams( - type=models.TextIndexType.TEXT, - tokenizer=models.TokenizerType.WORD, - lowercase=True, - ), - ) - _keyword_index_state = True - except Exception: - _keyword_index_state = False - return _keyword_index_state + Locked so concurrent first searches (FastAPI runs sync endpoints in a + threadpool) attempt creation exactly once — without it, a transient failure + in a racing duplicate attempt could overwrite a successful True with False. + """ + global _keyword_index_state + with _keyword_index_lock: + if _keyword_index_state is None: + from qdrant_client import models + + try: + memory.vector_store.client.create_payload_index( + collection_name=memory.vector_store.collection_name, + field_name="data", + field_schema=models.TextIndexParams( + type=models.TextIndexType.TEXT, + tokenizer=models.TokenizerType.WORD, + lowercase=True, + ), + ) + _keyword_index_state = True + except Exception: + _keyword_index_state = False + return _keyword_index_state def _indexed_keyword_points(memory, query: str, filters: dict, scan_limit: int) -> list: diff --git a/tests/test_rest.py b/tests/test_rest.py index 8309b8b..b006042 100644 --- a/tests/test_rest.py +++ b/tests/test_rest.py @@ -135,6 +135,9 @@ def test_search_keyword_mode(app_instance, mem, auth_header): point = SimpleNamespace(id="1", payload={"data": "the Philips hub", "created_at": "2026-06-01T00:00:00+00:00"}) # noqa: E501 mem.vector_store.list.return_value = ([point], None) + # Indexed prefilter finds nothing -> exercises the scan fallback explicitly + # (rather than via a MagicMock unpack failure). + mem.vector_store.client.scroll.return_value = ([], None) c = _client(app_instance) resp = c.post( "/api/v1/memories/search",