Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions app/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,30 @@ def keyword_search(
user_id: str | None = None,
limit: int = 10,
scan_limit: int = DEFAULT_KEYWORD_SCAN_LIMIT,
extra_filters: dict | None = None,
) -> dict:
"""Case-insensitive substring search over stored memory text.

A literal-match fallback for terms semantic search misses (names, IDs, URLs,
rare tokens). Scans up to `scan_limit` of the user's memories via the vector
store's payload listing and matches `query` as a case-insensitive substring
of each memory's text, returning the most recent matches first. Scoped by
`user_id` only (it spans the whole user store, like the MCP read tools).
`user_id` only (it spans the whole user store, like the MCP read tools);
`extra_filters` adds exact-match payload conditions (e.g. provenance fields).
An empty/whitespace query matches nothing. Fail-open: any store error
returns no results.
"""
needle = query.strip().casefold()
if not needle:
return {"results": []}
memory = get_memory()
filters = {"user_id": user_id} if user_id else None
filters: dict = {}
if user_id:
filters["user_id"] = user_id
if extra_filters:
filters.update(extra_filters)
try:
result = memory.vector_store.list(filters=filters, top_k=scan_limit)
result = memory.vector_store.list(filters=filters or None, top_k=scan_limit)
except Exception:
return {"results": []}
points = result[0] if isinstance(result, tuple) else result
Expand All @@ -183,3 +189,32 @@ def keyword_search(
]
matches.sort(key=_point_recency, reverse=True) # most recently touched first
return {"results": [_point_to_result(p) for p in matches[:limit]]}


def _result_expiry(item) -> datetime | None:
"""Parse an `expires_at` from a result item (top-level or nested metadata)."""
if not isinstance(item, dict):
return None
ts = _parse_timestamp(item.get("expires_at"))
if ts is None and isinstance(item.get("metadata"), dict):
ts = _parse_timestamp(item["metadata"].get("expires_at"))
return ts


def drop_expired(results: dict, now: datetime | None = None) -> dict:
"""Remove memories whose `expires_at` is at/before `now` from a results dict.

Supports the provenance convention's expiry field so stale facts can be
filtered out of reads. Items without a (parseable) `expires_at` are kept.
Anything not shaped like ``{"results": [...]}`` is returned unchanged.
"""
if not isinstance(results, dict):
return results
items = results.get("results")
if not isinstance(items, list):
return results
now = now or datetime.now(UTC)
results["results"] = [
item for item in items if (exp := _result_expiry(item)) is None or exp > now
]
return results
43 changes: 36 additions & 7 deletions app/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,25 @@ class SearchRequest(BaseModel):
# (unchanged), 1 = order almost entirely by how recently a memory was touched.
recency_weight: float = Field(default=0.0, ge=0.0, le=1.0)
recency_half_life_days: float = Field(default=30.0, gt=0.0)
# Provenance/review-metadata filters (exact match); see the metadata convention.
source: str | None = None
confidence: str | None = None
review_status: str | None = None
exclude_expired: bool = False


class UpdateMemoryRequest(BaseModel):
content: str


def _provenance_filters(
source: str | None, confidence: str | None, review_status: str | None
) -> dict:
"""Exact-match payload filters for the provenance/review metadata convention."""
pairs = (("source", source), ("confidence", confidence), ("review_status", review_status))
return {key: value for key, value in pairs if value}


def _scope_kwargs(
user_id: str | None, agent_id: str | None = None, run_id: str | None = None
) -> dict:
Expand All @@ -75,24 +88,40 @@ def add_memory(req: AddMemoryRequest) -> dict:

@router.post("/memories/search")
def search_memories(req: SearchRequest) -> dict:
filters = _scope_kwargs(req.user_id, req.agent_id, req.run_id)
prov = _provenance_filters(req.source, req.confidence, req.review_status)
if req.mode == "keyword":
return memory_mod.keyword_search(req.query, user_id=filters["user_id"], limit=req.limit)
memory = memory_mod.get_memory()
results = memory.search(query=req.query, filters=filters, top_k=req.limit)
return rerank_by_recency(results, req.recency_weight, req.recency_half_life_days)
results = memory_mod.keyword_search(
req.query,
user_id=_scope_kwargs(req.user_id)["user_id"],
limit=req.limit,
extra_filters=prov or None,
)
else:
filters = {**_scope_kwargs(req.user_id, req.agent_id, req.run_id), **prov}
memory = memory_mod.get_memory()
raw = memory.search(query=req.query, filters=filters, top_k=req.limit)
results = rerank_by_recency(raw, req.recency_weight, req.recency_half_life_days)
return memory_mod.drop_expired(results) if req.exclude_expired else results


@router.get("/memories")
def list_memories(
user_id: str | None = None,
agent_id: str | None = None,
run_id: str | None = None,
source: str | None = None,
confidence: str | None = None,
review_status: str | None = None,
exclude_expired: bool = False,
limit: int = Query(default=50, ge=1, le=100),
) -> dict:
memory = memory_mod.get_memory()
filters = _scope_kwargs(user_id, agent_id, run_id)
return memory.get_all(filters=filters, top_k=limit)
filters = {
**_scope_kwargs(user_id, agent_id, run_id),
**_provenance_filters(source, confidence, review_status),
}
results = memory.get_all(filters=filters, top_k=limit)
return memory_mod.drop_expired(results) if exclude_expired else results


@router.get("/memories/{memory_id}")
Expand Down
7 changes: 4 additions & 3 deletions docs/DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,13 @@ app/
with that fingerprint already exists (fail-open — a lookup error just proceeds).
keyword_search() is the substring-match fallback behind search mode="keyword":
it scans the user's memories via vector_store.list() and matches the query as a
case-insensitive substring of the `data` payload (fail-open). The most tweak-prone
file.
case-insensitive substring of the `data` payload (fail-open). drop_expired()
removes results whose provenance `expires_at` is past. The most tweak-prone file.
mcp_server.py build_mcp(): the six MCP tools, each thinly wrapping a mem0 op with
user_id defaulted to MEM0_DEFAULT_USER_ID.
rest.py REST router under /api/v1 (mounted with prefix in main.py). Pydantic request
models, _scope_kwargs() for user/agent/run scoping, check_qdrant() helper.
models, _scope_kwargs() for user/agent/run scoping, _provenance_filters() for
the source/confidence/review_status metadata convention, check_qdrant() helper.
ranking.py rerank_by_recency(): optional, opt-in post-search re-ranking that blends
mem0's similarity score with a recency decay. No-op when recency_weight=0,
so default REST/MCP search behavior is unchanged.
Expand Down
49 changes: 48 additions & 1 deletion docs/USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,43 @@ Memories can optionally be tagged with:

`user_id` always defaults to `MEM0_DEFAULT_USER_ID`; you rarely need to set it.

### Provenance and review metadata (convention)

For "governed" agent memory — distinguishing trusted from untrusted, fresh from
stale — there's a small **convention** of reserved `metadata` keys. They're just
ordinary metadata (nothing enforces them), but the REST read endpoints can filter
on them, and agents can reason about them:

| Key | Recommended values | Meaning |
|---|---|---|
| `source` | free-form, e.g. `user`, `agent`, `import:chatgpt`, `capture:telegram`, `tool:n8n` | Where the memory came from. The import scripts and capture bot already set this. |
| `confidence` | `high`, `medium`, `low`, `unknown` | How much to trust the content. |
| `review_status` | `unreviewed`, `approved`, `rejected`, `stale` | Whether a human/agent has vetted it. |
| `reviewed_by` / `reviewed_at` | free-form / ISO 8601 | Who vetted it and when (optional). |
| `expires_at` | ISO 8601 timestamp | When the fact should stop being trusted. |

`confidence` and `review_status` are **independent** — a memory can be
high-confidence but unreviewed, or approved but deliberately low-confidence.
`expires_at` addresses the common agent-memory failure mode where *old memory
stays trusted after the world changed*: set it on facts that age out, then pass
`exclude_expired=true` on reads (below) to drop them. This is a flat convention
layered on `metadata`; the existing top-level `agent_id` remains the writer tag.

Set them on any write, e.g.:

```bash
curl -X POST https://mem0.your-domain.com/api/v1/memories \
-H "Authorization: Bearer $MEM0_API_KEY" -H "Content-Type: application/json" \
-d '{"content": "Q3 OKRs are finalized", "agent_id": "claude-code",
"metadata": {"source": "user", "confidence": "high",
"review_status": "approved", "expires_at": "2026-10-01T00:00:00Z"}}'
```

Filter reads by them with the query params on `GET /api/v1/memories` (and the same
fields on search) — see the [REST API reference](#rest-api-reference). Per the
shared-store design, the **MCP** read tools never filter by these (they span the
whole store); metadata filtering is a REST-only affordance for scripts.

## Prerequisites

Before deploying you need:
Expand Down Expand Up @@ -561,13 +598,23 @@ it's a literal-match fallback, not a replacement for semantic retrieval.

### List memories — `GET /api/v1/memories`

Query params: `agent_id`, `run_id`, `user_id`, `limit` (1–100, default 50).
Query params: `agent_id`, `run_id`, `user_id`, `limit` (1–100, default 50), plus the
provenance/review filters `source`, `confidence`, `review_status` (exact match), and
`exclude_expired` (drop memories whose `expires_at` is in the past). See
[Provenance and review metadata](#provenance-and-review-metadata-convention).

```bash
curl https://mem0.your-domain.com/api/v1/memories?limit=20 \
-H "Authorization: Bearer $MEM0_API_KEY"

# Only approved, non-expired memories imported from ChatGPT:
curl "https://mem0.your-domain.com/api/v1/memories?source=import:chatgpt&review_status=approved&exclude_expired=true" \
-H "Authorization: Bearer $MEM0_API_KEY"
```

The same `source` / `confidence` / `review_status` / `exclude_expired` fields are
accepted on `POST /api/v1/memories/search` (in both `semantic` and `keyword` modes).

### Get one — `GET /api/v1/memories/{memory_id}`

Returns 404 if the memory does not exist.
Expand Down
32 changes: 32 additions & 0 deletions tests/test_memory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import UTC, datetime
from types import SimpleNamespace
from unittest.mock import MagicMock

Expand All @@ -7,9 +8,12 @@
_existing_fingerprint_id,
add_memory,
content_fingerprint,
drop_expired,
keyword_search,
)

EXPIRY_NOW = datetime(2026, 6, 7, tzinfo=UTC)


def _point(id, data, created_at="2026-06-01T00:00:00+00:00", **extra):
return SimpleNamespace(id=id, payload={"data": data, "created_at": created_at, **extra})
Expand Down Expand Up @@ -236,6 +240,34 @@ def test_keyword_search_empty_query_matches_nothing(monkeypatch):
fake.vector_store.list.assert_not_called() # short-circuits before scanning


def test_keyword_search_passes_extra_filters(monkeypatch):
fake = _patch_keyword(monkeypatch, [_point("1", "Philips hub")])
keyword_search("philips", user_id="ian", extra_filters={"review_status": "approved"})
_, kwargs = fake.vector_store.list.call_args
assert kwargs["filters"] == {"user_id": "ian", "review_status": "approved"}


# --- drop_expired ------------------------------------------------------------


def test_drop_expired_removes_past_keeps_future_and_missing():
results = {
"results": [
{"id": "past", "metadata": {"expires_at": "2020-01-01T00:00:00+00:00"}},
{"id": "future", "metadata": {"expires_at": "2030-01-01T00:00:00Z"}},
{"id": "no_expiry", "metadata": {}},
{"id": "toplevel_past", "expires_at": "2019-01-01T00:00:00+00:00"},
]
}
out = drop_expired(results, now=EXPIRY_NOW)
assert [i["id"] for i in out["results"]] == ["future", "no_expiry"]


def test_drop_expired_passes_through_non_results():
assert drop_expired([], now=EXPIRY_NOW) == []
assert drop_expired({"results": "x"}, now=EXPIRY_NOW) == {"results": "x"}


def test_keyword_search_fails_open(monkeypatch):
import app.memory as m

Expand Down
60 changes: 60 additions & 0 deletions tests/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,66 @@ def test_search_invalid_mode_rejected(app_instance, mem, auth_header):
assert resp.status_code == 422


def test_list_filters_by_provenance_metadata(app_instance, mem, auth_header):
mem.get_all.return_value = {"results": []}
c = _client(app_instance)
resp = c.get(
"/api/v1/memories?source=import:chatgpt&confidence=high&review_status=approved",
headers=auth_header,
)
assert resp.status_code == 200
_, kwargs = mem.get_all.call_args
f = kwargs["filters"]
assert f["user_id"] == "default-user"
assert f["source"] == "import:chatgpt"
assert f["confidence"] == "high"
assert f["review_status"] == "approved"


def test_list_exclude_expired(app_instance, mem, auth_header):
mem.get_all.return_value = {
"results": [
{"id": "stale", "metadata": {"expires_at": "2000-01-01T00:00:00+00:00"}},
{"id": "fresh", "metadata": {}},
]
}
c = _client(app_instance)
resp = c.get("/api/v1/memories?exclude_expired=true", headers=auth_header)
assert resp.status_code == 200
assert [i["id"] for i in resp.json()["results"]] == ["fresh"]


def test_search_filters_by_provenance_metadata(app_instance, mem, auth_header):
mem.search.return_value = {"results": []}
c = _client(app_instance)
resp = c.post(
"/api/v1/memories/search",
json={"query": "x", "review_status": "approved"},
headers=auth_header,
)
assert resp.status_code == 200
_, kwargs = mem.search.call_args
assert kwargs["filters"]["review_status"] == "approved"
assert kwargs["filters"]["user_id"] == "default-user"


def test_search_exclude_expired(app_instance, mem, auth_header):
mem.search.return_value = {
"results": [
{"id": "stale", "metadata": {"expires_at": "2000-01-01T00:00:00+00:00"}},
{"id": "fresh", "metadata": {}},
]
}
c = _client(app_instance)
resp = c.post(
"/api/v1/memories/search",
json={"query": "x", "exclude_expired": True},
headers=auth_header,
)
assert resp.status_code == 200
assert [i["id"] for i in resp.json()["results"]] == ["fresh"]


def test_search_scoped_by_run_id(app_instance, mem, auth_header):
mem.search.return_value = {"results": []}
c = _client(app_instance)
Expand Down
Loading