Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 116 additions & 17 deletions app/memory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import hashlib
import json
import threading
from datetime import UTC, datetime
from functools import lru_cache

Expand Down Expand Up @@ -148,6 +149,93 @@ def _point_recency(point) -> datetime:
return ts or datetime.min.replace(tzinfo=UTC)


# Tri-state cache for the full-text index on the `data` payload field:
# None = not attempted yet, True = created/confirmed, False = creation failed
# (old Qdrant, permissions) — don't retry on every search, use the scan path.
_keyword_index_state: bool | None = None
_keyword_index_lock = threading.Lock()


def reset_keyword_index_state() -> None:
"""Forget whether the keyword index exists (test hook / ops escape hatch)."""
global _keyword_index_state
with _keyword_index_lock:
_keyword_index_state = None


def _ensure_keyword_index(memory) -> bool:
"""Idempotently create the full-text index on `data`; cache the outcome.

Locked so concurrent first searches (FastAPI runs sync endpoints in a
threadpool) attempt creation exactly once — without it, a transient failure
in a racing duplicate attempt could overwrite a successful True with False.
"""
global _keyword_index_state
with _keyword_index_lock:
if _keyword_index_state is None:
from qdrant_client import models

try:
memory.vector_store.client.create_payload_index(
collection_name=memory.vector_store.collection_name,
field_name="data",
field_schema=models.TextIndexParams(
type=models.TextIndexType.TEXT,
tokenizer=models.TokenizerType.WORD,
lowercase=True,
),
)
_keyword_index_state = True
except Exception:
_keyword_index_state = False
return _keyword_index_state


def _indexed_keyword_points(memory, query: str, filters: dict, scan_limit: int) -> list:
"""Fetch candidate points server-side via the full-text index.

Qdrant's MatchText requires every (lowercased) query token to appear in the
document, so this transfers only plausible candidates instead of the whole
store. It is a prefilter, not the final answer — substring verification
still happens in keyword_search().
"""
from qdrant_client import models

conditions = [
models.FieldCondition(key=key, match=models.MatchValue(value=value))
for key, value in filters.items()
]
conditions.append(
models.FieldCondition(key="data", match=models.MatchText(text=query))
)
points, _ = memory.vector_store.client.scroll(
collection_name=memory.vector_store.collection_name,
scroll_filter=models.Filter(must=conditions),
limit=scan_limit,
with_payload=True,
with_vectors=False,
)
return points or []


def _scanned_points(memory, filters: dict, scan_limit: int) -> list:
"""Legacy path: pull up to scan_limit payloads and match client-side."""
result = memory.vector_store.list(filters=filters or None, top_k=scan_limit)
points = result[0] if isinstance(result, tuple) else result
return list(points or [])


def _substring_matches(points: list, needle: str, limit: int) -> list[dict]:
matches = [
point
for point in points
if isinstance((getattr(point, "payload", None) or {}).get("data"), str)
and needle in point.payload["data"].casefold()
]
matches.sort(key=_point_recency, reverse=True) # most recently touched first
return [_point_to_result(p) for p in matches[:limit]]


def keyword_search(
query: str,
*,
Expand All @@ -159,13 +247,22 @@ def keyword_search(
"""Case-insensitive substring search over stored memory text.

A literal-match fallback for terms semantic search misses (names, IDs, URLs,
rare tokens). Scans up to `scan_limit` of the user's memories via the vector
store's payload listing and matches `query` as a case-insensitive substring
of each memory's text, returning the most recent matches first. Scoped by
`user_id` only (it spans the whole user store, like the MCP read tools);
`extra_filters` adds exact-match payload conditions (e.g. provenance fields).
An empty/whitespace query matches nothing. Fail-open: any store error
returns no results.
rare tokens). Matching is done in two stages so the store isn't shipped over
the network on every query:

1. Indexed prefilter: a full-text payload index on `data` (created lazily,
idempotent) lets Qdrant return only points containing every query token.
2. Substring verification: the original case-insensitive substring check
runs over the candidates, preserving exact semantics (e.g. phrase
contiguity).

If the index is unavailable, the indexed query fails, or it yields no
surviving matches (a mid-token fragment like "hil" never token-matches),
the legacy full scan of up to `scan_limit` payloads runs instead, so recall
is never worse than before. Scoped by `user_id` only (it spans the whole
user store, like the MCP read tools); `extra_filters` adds exact-match
payload conditions (e.g. provenance fields). An empty/whitespace query
matches nothing. Fail-open: any store error returns no results.
"""
needle = query.strip().casefold()
if not needle:
Expand All @@ -176,19 +273,21 @@ def keyword_search(
filters["user_id"] = user_id
if extra_filters:
filters.update(extra_filters)
if _ensure_keyword_index(memory):
try:
candidates = _indexed_keyword_points(
memory, query.strip(), filters, scan_limit
)
except Exception:
candidates = []
results = _substring_matches(candidates, needle, limit)
if results:
return {"results": results}
try:
result = memory.vector_store.list(filters=filters or None, top_k=scan_limit)
points = _scanned_points(memory, filters, scan_limit)
except Exception:
return {"results": []}
points = result[0] if isinstance(result, tuple) else result
matches = [
point
for point in (points or [])
if isinstance((getattr(point, "payload", None) or {}).get("data"), str)
and needle in point.payload["data"].casefold()
]
matches.sort(key=_point_recency, reverse=True) # most recently touched first
return {"results": [_point_to_result(p) for p in matches[:limit]]}
return {"results": _substring_matches(points, needle, limit)}


# Upper bound on the list-pagination offset. Offset paging is emulated by
Expand Down
20 changes: 13 additions & 7 deletions docs/DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,19 @@ app/
cheap content-fingerprint dedup: it SHA-256s the normalized raw input, stores
it in the `content_fp` payload field, and skips the LLM extraction if a memory
with that fingerprint already exists (fail-open — a lookup error just proceeds).
keyword_search() is the substring-match fallback behind search mode="keyword":
it scans the user's memories via vector_store.list() and matches the query as a
case-insensitive substring of the `data` payload (fail-open). drop_expired()
removes results whose provenance `expires_at` is past. list_paginated()
implements offset paging for list reads: mem0's get_all has no offset, so it
over-fetches offset+limit+1 (capped by MAX_LIST_OFFSET) and slices, using the
extra item as the has_more signal. The most tweak-prone file.
keyword_search() backs search mode="keyword" in two stages: a Qdrant
full-text index on the `data` field (created lazily+idempotently on first
use; outcome cached in _keyword_index_state) prefilters candidates server-
side via MatchText, then the original case-insensitive substring check
verifies them (MatchText is token-based; substring semantics are the
contract). If the index is unavailable, the query fails, or nothing
survives verification (mid-token fragments never token-match), it falls
back to the legacy vector_store.list() scan, so recall never regresses
(fail-open). drop_expired() removes results whose provenance `expires_at`
is past. list_paginated() implements offset paging for list reads: mem0's
get_all has no offset, so it over-fetches offset+limit+1 (capped by
MAX_LIST_OFFSET) and slices, using the extra item as the has_more signal.
The most tweak-prone file.
mcp_server.py build_mcp(): the six MCP tools, each thinly wrapping a mem0 op with
user_id defaulted to MEM0_DEFAULT_USER_ID. list_memories pages (default 50,
max 100 per call) so the whole store is never returned in one response.
Expand Down
13 changes: 10 additions & 3 deletions docs/USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -609,11 +609,18 @@ curl -X POST https://mem0.your-domain.com/api/v1/memories/search \
```

The default is `"mode": "semantic"`. The MCP `search_memories` tool accepts the
same `mode` argument. Keyword mode spans the whole user store and scans up to a
few thousand of the most recent memories per query — ample for a personal store;
it's a literal-match fallback, not a replacement for semantic retrieval.
same `mode` argument. Keyword mode spans the whole user store; it's a
literal-match fallback, not a replacement for semantic retrieval.
(`recency_weight` applies to semantic mode only.)

Under the hood, keyword matching is pushed down to Qdrant via a full-text payload
index that the server creates automatically on first use (no setup or maintenance
needed). Whole-word queries are answered from the index; queries that only match
*inside* a word (e.g. `hil` matching `Philips`) transparently fall back to a scan
of up to a few thousand memories, so results are the same as before — exact,
case-insensitive substring matches, most recent first. If your Qdrant version
can't create the index, everything still works via the scan path.

### List memories — `GET /api/v1/memories`

Query params: `agent_id`, `run_id`, `user_id`, `limit` (1–100, default 50), `offset`
Expand Down
9 changes: 9 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@
memory_mod.get_memory = lambda: FAKE_MEMORY


@pytest.fixture(autouse=True)
def _reset_keyword_index_state():
# The keyword-index existence check is cached module-wide; tests must not
# inherit another test's cached outcome.
memory_mod.reset_keyword_index_state()
yield
memory_mod.reset_keyword_index_state()


@pytest.fixture(autouse=True)
def _reset_rate_limiters():
# Failed-auth counts are keyed by client IP and the TestClient always
Expand Down
107 changes: 107 additions & 0 deletions tests/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ def _patch_keyword(monkeypatch, points):

fake = MagicMock()
fake.vector_store.list.return_value = (points, None)
# Indexed prefilter finds nothing, so these tests exercise the scan path's
# substring/ordering semantics; the indexed path has its own tests below.
fake.vector_store.client.scroll.return_value = ([], None)
monkeypatch.setattr(m, "get_memory", lambda: fake)
return fake

Expand Down Expand Up @@ -278,6 +281,110 @@ def test_keyword_search_fails_open(monkeypatch):
assert keyword_search("x", user_id="ian") == {"results": []}


# --- keyword_search indexed path ----------------------------------------------


def _patch_indexed(monkeypatch, indexed_points, scanned_points=()):
import app.memory as m

fake = MagicMock()
fake.vector_store.collection_name = "test_memories"
fake.vector_store.client.scroll.return_value = (list(indexed_points), None)
fake.vector_store.list.return_value = (list(scanned_points), None)
monkeypatch.setattr(m, "get_memory", lambda: fake)
return fake


def test_keyword_search_uses_index_and_skips_full_scan(monkeypatch):
from qdrant_client import models

fake = _patch_indexed(
monkeypatch, [_point("1", "Ian uses Philips Hue lights", user_id="ian")]
)
out = keyword_search("philips", user_id="ian")
assert [r["id"] for r in out["results"]] == ["1"]
fake.vector_store.list.assert_not_called() # no full-store transfer

# The index is created once, on the data field, as a full-text index.
fake.vector_store.client.create_payload_index.assert_called_once()
_, idx_kwargs = fake.vector_store.client.create_payload_index.call_args
assert idx_kwargs["field_name"] == "data"
assert idx_kwargs["field_schema"].tokenizer == models.TokenizerType.WORD

# The scroll filter combines exact-match scoping with the MatchText clause.
_, kwargs = fake.vector_store.client.scroll.call_args
conditions = kwargs["scroll_filter"].must
matches = {c.key: c.match for c in conditions}
assert matches["user_id"] == models.MatchValue(value="ian")
assert matches["data"] == models.MatchText(text="philips")


def test_keyword_index_created_once_across_searches(monkeypatch):
fake = _patch_indexed(monkeypatch, [_point("1", "alpha", user_id="ian")])
keyword_search("alpha", user_id="ian")
keyword_search("alpha", user_id="ian")
assert fake.vector_store.client.create_payload_index.call_count == 1


def test_keyword_indexed_results_still_substring_verified(monkeypatch):
# MatchText is token-based: both tokens present but not contiguous must not
# match the substring query.
fake = _patch_indexed(
monkeypatch, [_point("1", "oat in my milk", user_id="ian")]
)
out = keyword_search("oat milk", user_id="ian")
assert out["results"] == []
fake.vector_store.list.assert_called_once() # fell back to the scan


def test_keyword_falls_back_to_scan_for_mid_token_fragment(monkeypatch):
# "hil" never token-matches, but the scan still finds it in "Philips".
fake = _patch_indexed(
monkeypatch,
indexed_points=[],
scanned_points=[_point("1", "Ian uses Philips Hue lights", user_id="ian")],
)
out = keyword_search("hil", user_id="ian")
assert [r["id"] for r in out["results"]] == ["1"]
fake.vector_store.list.assert_called_once()


def test_keyword_falls_back_when_index_creation_fails(monkeypatch):
fake = _patch_indexed(
monkeypatch,
indexed_points=[],
scanned_points=[_point("1", "alpha", user_id="ian")],
)
fake.vector_store.client.create_payload_index.side_effect = RuntimeError("old qdrant")
out = keyword_search("alpha", user_id="ian")
assert [r["id"] for r in out["results"]] == ["1"]
fake.vector_store.client.scroll.assert_not_called()
# The failed creation is cached: the next search doesn't retry it.
keyword_search("alpha", user_id="ian")
assert fake.vector_store.client.create_payload_index.call_count == 1


def test_keyword_falls_back_when_indexed_query_raises(monkeypatch):
fake = _patch_indexed(
monkeypatch,
indexed_points=[],
scanned_points=[_point("1", "alpha", user_id="ian")],
)
fake.vector_store.client.scroll.side_effect = RuntimeError("scroll broke")
out = keyword_search("alpha", user_id="ian")
assert [r["id"] for r in out["results"]] == ["1"]


def test_keyword_empty_query_touches_nothing(monkeypatch):
fake = _patch_indexed(monkeypatch, [])
assert keyword_search(" ", user_id="ian") == {"results": []}
fake.vector_store.client.create_payload_index.assert_not_called()
fake.vector_store.list.assert_not_called()


# --- list_paginated -------------------------------------------------------------


def test_list_paginated_shapes_and_slices(mem):
mem.get_all.return_value = {
"results": [{"id": f"m{i}"} for i in range(7)]
Expand Down
3 changes: 3 additions & 0 deletions tests/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ def test_search_keyword_mode(app_instance, mem, auth_header):

point = SimpleNamespace(id="1", payload={"data": "the Philips hub", "created_at": "2026-06-01T00:00:00+00:00"}) # noqa: E501
mem.vector_store.list.return_value = ([point], None)
# Indexed prefilter finds nothing -> exercises the scan fallback explicitly
# (rather than via a MagicMock unpack failure).
mem.vector_store.client.scroll.return_value = ([], None)
c = _client(app_instance)
resp = c.post(
"/api/v1/memories/search",
Expand Down
Loading