From 08378290652cb513fff3e780dcb4b3b9d8e26e93 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Jun 2026 20:33:37 +0000 Subject: [PATCH 1/2] Bulk delete memories by filter with dry-run default POST /api/v1/memories/delete_bulk deletes every memory matching exact filters (agent_id, run_id, source, confidence, review_status) so a bad import run or a misbehaving capture agent's writes can be undone in two calls instead of scripted one-by-one deletes against a 100-item page cap. Safety rails: dry-run by default (count + 10-item sample, nothing deleted) until the caller re-posts with confirm=true; at least one filter besides user_id is required so an unscoped POST can never wipe the store; deletions are capped at 1000 per call with has_more signalling the caller to loop, bounding request duration under proxy timeouts. Deletions go through Memory.delete per ID, keeping mem0's history consistent. Deliberately not exposed as an MCP tool. Deletions are logged and counted in a new memories_bulk_deleted_total metric. Closes #67 https://claude.ai/code/session_01H2Dbh6kD8bseWZZEf7kGhx --- app/memory.py | 56 ++++++++++++++++++++++++++++++ app/metrics.py | 10 ++++++ app/rest.py | 35 +++++++++++++++++++ docs/DEVELOPER_GUIDE.md | 8 ++++- docs/USER_GUIDE.md | 34 ++++++++++++++++++ tests/test_memory.py | 47 +++++++++++++++++++++++++ tests/test_rest.py | 76 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 265 insertions(+), 1 deletion(-) diff --git a/app/memory.py b/app/memory.py index 05697a2..1bfc9e0 100644 --- a/app/memory.py +++ b/app/memory.py @@ -3,9 +3,14 @@ from datetime import UTC, datetime from functools import lru_cache +import structlog + from app.config import Settings, get_settings +from app.metrics import observe_bulk_delete from app.ranking import _parse_timestamp +_log = structlog.get_logger() + def _provider_config(model: str, api_key: str | None) -> dict: config = {"model": model} @@ -191,6 +196,57 @@ def keyword_search( return {"results": [_point_to_result(p) for p in matches[:limit]]} +# Hard ceiling on deletions per bulk_delete call. Bounds request duration (each +# delete is a mem0 round-trip) so a huge purge can't outlive the reverse proxy's +# timeout; callers loop on has_more instead. +BULK_DELETE_MAX = 1000 + + +def bulk_delete(*, filters: dict, confirm: bool = False, max_delete: int = BULK_DELETE_MAX) -> dict: + """Delete (or, by default, just count) every memory matching exact filters. + + Dry-run by default: with confirm=False nothing is deleted; the response + reports how many memories matched and a small sample so the caller can + verify the blast radius before re-posting with confirm=true. + + Deletions go through mem0's Memory.delete per ID — never a raw vector-store + filter delete — so mem0's history/bookkeeping stays consistent. At most + `max_delete` memories are removed per call; `has_more` tells the caller to + loop. The caller is responsible for ensuring `filters` is meaningfully + scoped (see the REST endpoint). + """ + memory = get_memory() + result = memory.vector_store.list(filters=filters, top_k=max_delete + 1) + points = result[0] if isinstance(result, tuple) else result + points = list(points or []) + has_more = len(points) > max_delete + points = points[:max_delete] + sample = [ + {"id": item["id"], "memory": item.get("memory")} + for item in (_point_to_result(p) for p in points[:10]) + ] + response = { + "matched": len(points), + "deleted": 0, + "dry_run": not confirm, + "has_more": has_more, + "sample": sample, + } + if not confirm: + return response + deleted = 0 + for point in points: + memory.delete(memory_id=point.id) + deleted += 1 + observe_bulk_delete(deleted) + _log.warning( + "bulk_delete", filters=filters, matched=len(points), deleted=deleted, + has_more=has_more, + ) + response["deleted"] = deleted + return response + + def _result_expiry(item) -> datetime | None: """Parse an `expires_at` from a result item (top-level or nested metadata).""" if not isinstance(item, dict): diff --git a/app/metrics.py b/app/metrics.py index 939e643..344eb2a 100644 --- a/app/metrics.py +++ b/app/metrics.py @@ -14,6 +14,16 @@ ) +BULK_DELETED = Counter( + "memories_bulk_deleted_total", + "Memories removed via the bulk delete endpoint.", +) + + def observe_request(method: str, path: str, status: int, duration_s: float) -> None: REQUEST_COUNT.labels(method=method, path=path, status=str(status)).inc() REQUEST_LATENCY.labels(method=method, path=path).observe(duration_s) + + +def observe_bulk_delete(count: int) -> None: + BULK_DELETED.inc(count) diff --git a/app/rest.py b/app/rest.py index 013ee1c..09126f8 100644 --- a/app/rest.py +++ b/app/rest.py @@ -55,6 +55,19 @@ class UpdateMemoryRequest(BaseModel): content: str +class BulkDeleteRequest(BaseModel): + # Exact-match filters; at least one besides user_id is required, so a bare + # POST can never wipe the whole store. + agent_id: str | None = None + run_id: str | None = None + source: str | None = None + confidence: str | None = None + review_status: str | None = None + user_id: str | None = None + # False (default) = dry run: count + sample only, nothing deleted. + confirm: bool = False + + def _provenance_filters( source: str | None, confidence: str | None, review_status: str | None ) -> dict: @@ -124,6 +137,28 @@ def list_memories( return memory_mod.drop_expired(results) if exclude_expired else results +@router.post("/memories/delete_bulk") +def delete_bulk(req: BulkDeleteRequest) -> dict: + """Delete every memory matching the given filters; dry-run by default. + + A POST (not DELETE) for the same reason search is: it takes a JSON body, + and DELETE request bodies are ambiguous across proxies/clients. + """ + prov = _provenance_filters(req.source, req.confidence, req.review_status) + if not (req.agent_id or req.run_id or prov): + raise HTTPException( + status_code=422, + detail=( + "Provide at least one filter (agent_id, run_id, source, " + "confidence, review_status). Deleting the entire store is not " + "supported through this endpoint — restore from a backup-less " + "state by dropping the Qdrant collection instead." + ), + ) + filters = {**_scope_kwargs(req.user_id, req.agent_id, req.run_id), **prov} + return memory_mod.bulk_delete(filters=filters, confirm=req.confirm) + + @router.get("/memories/{memory_id}") def get_memory_by_id(memory_id: str) -> dict: memory = memory_mod.get_memory() diff --git a/docs/DEVELOPER_GUIDE.md b/docs/DEVELOPER_GUIDE.md index ddb96b0..440a1d0 100644 --- a/docs/DEVELOPER_GUIDE.md +++ b/docs/DEVELOPER_GUIDE.md @@ -66,7 +66,13 @@ app/ keyword_search() is the substring-match fallback behind search mode="keyword": it scans the user's memories via vector_store.list() and matches the query as a case-insensitive substring of the `data` payload (fail-open). drop_expired() - removes results whose provenance `expires_at` is past. The most tweak-prone file. + removes results whose provenance `expires_at` is past. bulk_delete() backs + POST /memories/delete_bulk: dry-run by default, capped at BULK_DELETE_MAX + per call (has_more signals the caller to loop), and deletes through + Memory.delete per ID — never a raw vector-store filter delete — so mem0's + history stays consistent. Deliberately NOT exposed as an MCP tool: a + destructive filter-delete is an operator action, not something a model + should reach for. The most tweak-prone file. mcp_server.py build_mcp(): the six MCP tools, each thinly wrapping a mem0 op with user_id defaulted to MEM0_DEFAULT_USER_ID. rest.py REST router under /api/v1 (mounted with prefix in main.py). Pydantic request diff --git a/docs/USER_GUIDE.md b/docs/USER_GUIDE.md index 2d37127..ec04ca5 100644 --- a/docs/USER_GUIDE.md +++ b/docs/USER_GUIDE.md @@ -627,6 +627,40 @@ Body: `{"content": "new text"}`. Returns `{"deleted": true, "memory_id": "…"}`. +### Bulk delete — `POST /api/v1/memories/delete_bulk` + +Deletes every memory matching exact-match filters: `agent_id`, `run_id`, `source`, +`confidence`, `review_status` (plus optional `user_id`). **At least one filter +besides `user_id` is required** — wiping the whole store through this endpoint is +deliberately impossible. + +It is a **dry run by default**: with `"confirm": false` (or omitted) nothing is +deleted; the response reports the match count and a sample of up to 10 items so +you can verify the blast radius first. Re-post the same body with +`"confirm": true` to actually delete. At most 1000 memories are removed per call; +if `has_more` is `true`, repeat the call until it isn't. Deletions go through +mem0 (not a raw vector-store filter delete), so each memory's history stays +consistent. + +Typical use — undo a bad import run: + +```bash +# 1. Dry run: how much would this delete? +curl -X POST https://mem0.your-domain.com/api/v1/memories/delete_bulk \ + -H "Authorization: Bearer $MEM0_API_KEY" -H "Content-Type: application/json" \ + -d '{"source": "import:chatgpt"}' +# -> {"matched": 412, "deleted": 0, "dry_run": true, "has_more": false, "sample": [...]} + +# 2. Looks right - confirm: +curl -X POST https://mem0.your-domain.com/api/v1/memories/delete_bulk \ + -H "Authorization: Bearer $MEM0_API_KEY" -H "Content-Type: application/json" \ + -d '{"source": "import:chatgpt", "confirm": true}' +# -> {"matched": 412, "deleted": 412, "dry_run": false, "has_more": false, ...} +``` + +There is intentionally **no MCP equivalent** — a destructive filter-delete is an +operator/script action, not something a connected agent should be able to reach for. + ### History — `GET /api/v1/memories/{memory_id}/history` Returns the change history for a memory. diff --git a/tests/test_memory.py b/tests/test_memory.py index 73525c7..77df099 100644 --- a/tests/test_memory.py +++ b/tests/test_memory.py @@ -275,3 +275,50 @@ def test_keyword_search_fails_open(monkeypatch): fake.vector_store.list.side_effect = RuntimeError("qdrant down") monkeypatch.setattr(m, "get_memory", lambda: fake) assert keyword_search("x", user_id="ian") == {"results": []} + + +# --- bulk_delete --------------------------------------------------------------- + + +def _patch_bulk(monkeypatch, points): + import app.memory as m + + fake = MagicMock() + fake.vector_store.list.return_value = (points, None) + monkeypatch.setattr(m, "get_memory", lambda: fake) + return fake + + +def test_bulk_delete_caps_and_reports_has_more(monkeypatch): + from app.memory import bulk_delete + + points = [_point(f"m{i}", f"fact {i}") for i in range(5)] + fake = _patch_bulk(monkeypatch, points) + out = bulk_delete(filters={"agent_id": "a"}, confirm=True, max_delete=3) + assert out["matched"] == 3 + assert out["deleted"] == 3 + assert out["has_more"] is True + assert fake.delete.call_count == 3 + # One extra point is fetched purely as the has_more signal. + _, kwargs = fake.vector_store.list.call_args + assert kwargs["top_k"] == 4 + + +def test_bulk_delete_sample_capped_at_ten(monkeypatch): + from app.memory import bulk_delete + + points = [_point(f"m{i}", f"fact {i}") for i in range(15)] + _patch_bulk(monkeypatch, points) + out = bulk_delete(filters={"agent_id": "a"}) + assert out["matched"] == 15 + assert len(out["sample"]) == 10 + assert out["dry_run"] is True + + +def test_bulk_delete_goes_through_mem0_not_vector_store(monkeypatch): + from app.memory import bulk_delete + + fake = _patch_bulk(monkeypatch, [_point("m1", "x")]) + bulk_delete(filters={"agent_id": "a"}, confirm=True) + fake.delete.assert_called_once_with(memory_id="m1") + fake.vector_store.delete.assert_not_called() diff --git a/tests/test_rest.py b/tests/test_rest.py index 8309b8b..d6a5fea 100644 --- a/tests/test_rest.py +++ b/tests/test_rest.py @@ -308,3 +308,79 @@ def test_healthz_unreachable(app_instance): resp = c.get("/healthz") assert resp.status_code == 503 assert resp.json()["ok"] is False + + +def _bulk(c, auth_header, body): + return c.post("/api/v1/memories/delete_bulk", json=body, headers=auth_header) + + +def test_bulk_delete_requires_a_filter(app_instance, mem, auth_header): + c = _client(app_instance) + # No filter at all, and user_id alone, must both be rejected. + assert _bulk(c, auth_header, {}).status_code == 422 + assert _bulk(c, auth_header, {"user_id": "default-user"}).status_code == 422 + assert _bulk(c, auth_header, {"confirm": True}).status_code == 422 + mem.vector_store.list.assert_not_called() + mem.delete.assert_not_called() + + +def test_bulk_delete_dry_run_by_default(app_instance, mem, auth_header): + from types import SimpleNamespace + + points = [ + SimpleNamespace(id=f"m{i}", payload={"data": f"fact {i}"}) for i in range(3) + ] + mem.vector_store.list.return_value = (points, None) + c = _client(app_instance) + resp = _bulk(c, auth_header, {"agent_id": "capture:telegram"}) + assert resp.status_code == 200 + body = resp.json() + assert body["dry_run"] is True + assert body["matched"] == 3 + assert body["deleted"] == 0 + assert body["has_more"] is False + assert body["sample"][0] == {"id": "m0", "memory": "fact 0"} + mem.delete.assert_not_called() + _, kwargs = mem.vector_store.list.call_args + assert kwargs["filters"] == { + "user_id": "default-user", + "agent_id": "capture:telegram", + } + + +def test_bulk_delete_confirm_deletes_each_id(app_instance, mem, auth_header): + from types import SimpleNamespace + + points = [SimpleNamespace(id=f"m{i}", payload={"data": "x"}) for i in range(3)] + mem.vector_store.list.return_value = (points, None) + c = _client(app_instance) + body = _bulk( + c, auth_header, {"source": "import:chatgpt", "confirm": True} + ).json() + assert body["dry_run"] is False + assert body["deleted"] == 3 + assert [c.kwargs["memory_id"] for c in mem.delete.call_args_list] == [ + "m0", + "m1", + "m2", + ] + + +def test_bulk_delete_composes_provenance_and_agent_filters( + app_instance, mem, auth_header +): + mem.vector_store.list.return_value = ([], None) + c = _client(app_instance) + resp = _bulk( + c, + auth_header, + {"agent_id": "n8n", "review_status": "rejected", "confidence": "low"}, + ) + assert resp.status_code == 200 + _, kwargs = mem.vector_store.list.call_args + assert kwargs["filters"] == { + "user_id": "default-user", + "agent_id": "n8n", + "review_status": "rejected", + "confidence": "low", + } From 8e8ba9e4d6544d40614d5af5d3d6cbd17e9c41d9 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 9 Jun 2026 20:40:57 +0000 Subject: [PATCH 2/2] Address review: partial-failure reporting, defensive ids, clearer docs https://claude.ai/code/session_01H2Dbh6kD8bseWZZEf7kGhx --- app/memory.py | 19 +++++++++++++++++-- app/rest.py | 4 ++-- docs/USER_GUIDE.md | 10 ++++++---- tests/test_memory.py | 12 ++++++++++++ 4 files changed, 37 insertions(+), 8 deletions(-) diff --git a/app/memory.py b/app/memory.py index 1bfc9e0..70dd166 100644 --- a/app/memory.py +++ b/app/memory.py @@ -235,15 +235,30 @@ def bulk_delete(*, filters: dict, confirm: bool = False, max_delete: int = BULK_ if not confirm: return response deleted = 0 + error: str | None = None for point in points: - memory.delete(memory_id=point.id) + point_id = getattr(point, "id", None) + if point_id is None: + continue + try: + memory.delete(memory_id=point_id) + except Exception: + # Report partial progress instead of losing the response: deletes + # are idempotent, so the caller just re-posts until has_more is + # false. The traceback is logged below. + error = "delete_failed_partway" + _log.exception("bulk_delete_item_failed", memory_id=point_id) + break deleted += 1 observe_bulk_delete(deleted) _log.warning( "bulk_delete", filters=filters, matched=len(points), deleted=deleted, - has_more=has_more, + has_more=has_more, error=error, ) response["deleted"] = deleted + if error: + response["error"] = error + response["has_more"] = True # the remainder was not attempted return response diff --git a/app/rest.py b/app/rest.py index 09126f8..c8821f5 100644 --- a/app/rest.py +++ b/app/rest.py @@ -151,8 +151,8 @@ def delete_bulk(req: BulkDeleteRequest) -> dict: detail=( "Provide at least one filter (agent_id, run_id, source, " "confidence, review_status). Deleting the entire store is not " - "supported through this endpoint — restore from a backup-less " - "state by dropping the Qdrant collection instead." + "supported through this endpoint; if you really mean to start " + "over, drop the Qdrant collection instead." ), ) filters = {**_scope_kwargs(req.user_id, req.agent_id, req.run_id), **prov} diff --git a/docs/USER_GUIDE.md b/docs/USER_GUIDE.md index ec04ca5..0670bdd 100644 --- a/docs/USER_GUIDE.md +++ b/docs/USER_GUIDE.md @@ -637,10 +637,12 @@ deliberately impossible. It is a **dry run by default**: with `"confirm": false` (or omitted) nothing is deleted; the response reports the match count and a sample of up to 10 items so you can verify the blast radius first. Re-post the same body with -`"confirm": true` to actually delete. At most 1000 memories are removed per call; -if `has_more` is `true`, repeat the call until it isn't. Deletions go through -mem0 (not a raw vector-store filter delete), so each memory's history stays -consistent. +`"confirm": true` to actually delete. `matched` and `deleted` are **per call**, +capped at 1000; if `has_more` is `true`, more memories match than this call +covered — repeat the call until it's `false`. If a deletion fails partway, the +response carries `"error": "delete_failed_partway"` with the partial `deleted` +count; deletes are idempotent, so just re-post. Deletions go through mem0 (not a +raw vector-store filter delete), so each memory's history stays consistent. Typical use — undo a bad import run: diff --git a/tests/test_memory.py b/tests/test_memory.py index 77df099..c7559f3 100644 --- a/tests/test_memory.py +++ b/tests/test_memory.py @@ -322,3 +322,15 @@ def test_bulk_delete_goes_through_mem0_not_vector_store(monkeypatch): bulk_delete(filters={"agent_id": "a"}, confirm=True) fake.delete.assert_called_once_with(memory_id="m1") fake.vector_store.delete.assert_not_called() + + +def test_bulk_delete_partial_failure_reports_progress(monkeypatch): + from app.memory import bulk_delete + + fake = _patch_bulk(monkeypatch, [_point(f"m{i}", "x") for i in range(3)]) + fake.delete.side_effect = [None, RuntimeError("qdrant hiccup"), None] + out = bulk_delete(filters={"agent_id": "a"}, confirm=True) + assert out["deleted"] == 1 + assert out["error"] == "delete_failed_partway" + assert out["has_more"] is True # remainder not attempted; caller re-posts + assert fake.delete.call_count == 2 # stopped at the failure