diff --git a/app/memory.py b/app/memory.py index 99fc5bb..e841f0f 100644 --- a/app/memory.py +++ b/app/memory.py @@ -4,9 +4,14 @@ from datetime import UTC, datetime from functools import lru_cache +import structlog + from app.config import Settings, get_settings +from app.metrics import observe_bulk_delete from app.ranking import _parse_timestamp +_log = structlog.get_logger() + def _provider_config(model: str, api_key: str | None) -> dict: config = {"model": model} @@ -290,6 +295,72 @@ def keyword_search( return {"results": _substring_matches(points, needle, limit)} +# Hard ceiling on deletions per bulk_delete call. Bounds request duration (each +# delete is a mem0 round-trip) so a huge purge can't outlive the reverse proxy's +# timeout; callers loop on has_more instead. +BULK_DELETE_MAX = 1000 + + +def bulk_delete(*, filters: dict, confirm: bool = False, max_delete: int = BULK_DELETE_MAX) -> dict: + """Delete (or, by default, just count) every memory matching exact filters. + + Dry-run by default: with confirm=False nothing is deleted; the response + reports how many memories matched and a small sample so the caller can + verify the blast radius before re-posting with confirm=true. + + Deletions go through mem0's Memory.delete per ID — never a raw vector-store + filter delete — so mem0's history/bookkeeping stays consistent. At most + `max_delete` memories are removed per call; `has_more` tells the caller to + loop. The caller is responsible for ensuring `filters` is meaningfully + scoped (see the REST endpoint). + """ + memory = get_memory() + result = memory.vector_store.list(filters=filters, top_k=max_delete + 1) + points = result[0] if isinstance(result, tuple) else result + points = list(points or []) + has_more = len(points) > max_delete + points = points[:max_delete] + sample = [ + {"id": item["id"], "memory": item.get("memory")} + for item in (_point_to_result(p) for p in points[:10]) + ] + response = { + "matched": len(points), + "deleted": 0, + "dry_run": not confirm, + "has_more": has_more, + "sample": sample, + } + if not confirm: + return response + deleted = 0 + error: str | None = None + for point in points: + point_id = getattr(point, "id", None) + if point_id is None: + continue + try: + memory.delete(memory_id=point_id) + except Exception: + # Report partial progress instead of losing the response: deletes + # are idempotent, so the caller just re-posts until has_more is + # false. The traceback is logged below. + error = "delete_failed_partway" + _log.exception("bulk_delete_item_failed", memory_id=point_id) + break + deleted += 1 + observe_bulk_delete(deleted) + _log.warning( + "bulk_delete", filters=filters, matched=len(points), deleted=deleted, + has_more=has_more, error=error, + ) + response["deleted"] = deleted + if error: + response["error"] = error + response["has_more"] = True # the remainder was not attempted + return response + + # Upper bound on the list-pagination offset. Offset paging is emulated by # over-fetching offset+limit+1 items and slicing (mem0's get_all has no native # offset), so the bound caps the per-request fetch size. 10k is far beyond a diff --git a/app/metrics.py b/app/metrics.py index 82c458f..3a92d6b 100644 --- a/app/metrics.py +++ b/app/metrics.py @@ -14,6 +14,11 @@ ) +BULK_DELETED = Counter( + "memories_bulk_deleted_total", + "Memories removed via the bulk delete endpoint.", +) + # Brute-force signal: failed auth attempts and rate-limited rejections, by # auth surface ("rest", "mcp", "oauth_consent", "oauth_token"). AUTH_FAILURES = Counter( @@ -33,6 +38,10 @@ def observe_request(method: str, path: str, status: int, duration_s: float) -> N REQUEST_LATENCY.labels(method=method, path=path).observe(duration_s) +def observe_bulk_delete(count: int) -> None: + BULK_DELETED.inc(count) + + def observe_auth_failure(surface: str) -> None: AUTH_FAILURES.labels(surface=surface).inc() diff --git a/app/rest.py b/app/rest.py index 981bd36..aa48bcd 100644 --- a/app/rest.py +++ b/app/rest.py @@ -55,6 +55,19 @@ class UpdateMemoryRequest(BaseModel): content: str +class BulkDeleteRequest(BaseModel): + # Exact-match filters; at least one besides user_id is required, so a bare + # POST can never wipe the whole store. + agent_id: str | None = None + run_id: str | None = None + source: str | None = None + confidence: str | None = None + review_status: str | None = None + user_id: str | None = None + # False (default) = dry run: count + sample only, nothing deleted. + confirm: bool = False + + # --- Response models --------------------------------------------------------- # These document and validate the *stable* parts of mem0's payloads without # freezing them: extra="allow" passes unexpected mem0 fields through untouched, @@ -185,6 +198,28 @@ def list_memories( return memory_mod.drop_expired(results) if exclude_expired else results +@router.post("/memories/delete_bulk") +def delete_bulk(req: BulkDeleteRequest) -> dict: + """Delete every memory matching the given filters; dry-run by default. + + A POST (not DELETE) for the same reason search is: it takes a JSON body, + and DELETE request bodies are ambiguous across proxies/clients. + """ + prov = _provenance_filters(req.source, req.confidence, req.review_status) + if not (req.agent_id or req.run_id or prov): + raise HTTPException( + status_code=422, + detail=( + "Provide at least one filter (agent_id, run_id, source, " + "confidence, review_status). Deleting the entire store is not " + "supported through this endpoint; if you really mean to start " + "over, drop the Qdrant collection instead." + ), + ) + filters = {**_scope_kwargs(req.user_id, req.agent_id, req.run_id), **prov} + return memory_mod.bulk_delete(filters=filters, confirm=req.confirm) + + def _get_or_404(memory, memory_id: str) -> dict: result = memory.get(memory_id=memory_id) if not result: diff --git a/docs/DEVELOPER_GUIDE.md b/docs/DEVELOPER_GUIDE.md index 51b20b3..a36aedd 100644 --- a/docs/DEVELOPER_GUIDE.md +++ b/docs/DEVELOPER_GUIDE.md @@ -75,7 +75,12 @@ app/ is past. list_paginated() implements offset paging for list reads: mem0's get_all has no offset, so it over-fetches offset+limit+1 (capped by MAX_LIST_OFFSET) and slices, using the extra item as the has_more signal. - The most tweak-prone file. + bulk_delete() backs POST /memories/delete_bulk: dry-run by default, capped + at BULK_DELETE_MAX per call (has_more signals the caller to loop), and + deletes through Memory.delete per ID — never a raw vector-store filter + delete — so mem0's history stays consistent. Deliberately NOT exposed as + an MCP tool: a destructive filter-delete is an operator action, not + something a model should reach for. The most tweak-prone file. mcp_server.py build_mcp(): the six MCP tools, each thinly wrapping a mem0 op with user_id defaulted to MEM0_DEFAULT_USER_ID. list_memories pages (default 50, max 100 per call) so the whole store is never returned in one response. diff --git a/docs/USER_GUIDE.md b/docs/USER_GUIDE.md index 9422308..362724c 100644 --- a/docs/USER_GUIDE.md +++ b/docs/USER_GUIDE.md @@ -681,6 +681,42 @@ Body: `{"content": "new text"}`. Returns 404 if the memory does not exist. Returns `{"deleted": true, "memory_id": "…"}`, or 404 if the memory does not exist. +### Bulk delete — `POST /api/v1/memories/delete_bulk` + +Deletes every memory matching exact-match filters: `agent_id`, `run_id`, `source`, +`confidence`, `review_status` (plus optional `user_id`). **At least one filter +besides `user_id` is required** — wiping the whole store through this endpoint is +deliberately impossible. + +It is a **dry run by default**: with `"confirm": false` (or omitted) nothing is +deleted; the response reports the match count and a sample of up to 10 items so +you can verify the blast radius first. Re-post the same body with +`"confirm": true` to actually delete. `matched` and `deleted` are **per call**, +capped at 1000; if `has_more` is `true`, more memories match than this call +covered — repeat the call until it's `false`. If a deletion fails partway, the +response carries `"error": "delete_failed_partway"` with the partial `deleted` +count; deletes are idempotent, so just re-post. Deletions go through mem0 (not a +raw vector-store filter delete), so each memory's history stays consistent. + +Typical use — undo a bad import run: + +```bash +# 1. Dry run: how much would this delete? +curl -X POST https://mem0.your-domain.com/api/v1/memories/delete_bulk \ + -H "Authorization: Bearer $MEM0_API_KEY" -H "Content-Type: application/json" \ + -d '{"source": "import:chatgpt"}' +# -> {"matched": 412, "deleted": 0, "dry_run": true, "has_more": false, "sample": [...]} + +# 2. Looks right - confirm: +curl -X POST https://mem0.your-domain.com/api/v1/memories/delete_bulk \ + -H "Authorization: Bearer $MEM0_API_KEY" -H "Content-Type: application/json" \ + -d '{"source": "import:chatgpt", "confirm": true}' +# -> {"matched": 412, "deleted": 412, "dry_run": false, "has_more": false, ...} +``` + +There is intentionally **no MCP equivalent** — a destructive filter-delete is an +operator/script action, not something a connected agent should be able to reach for. + ### History — `GET /api/v1/memories/{memory_id}/history` Returns the change history for a memory. diff --git a/tests/test_memory.py b/tests/test_memory.py index 8ec45d5..36f51c8 100644 --- a/tests/test_memory.py +++ b/tests/test_memory.py @@ -295,6 +295,65 @@ def _patch_indexed(monkeypatch, indexed_points, scanned_points=()): return fake +# --- bulk_delete --------------------------------------------------------------- + + +def _patch_bulk(monkeypatch, points): + import app.memory as m + + fake = MagicMock() + fake.vector_store.list.return_value = (points, None) + monkeypatch.setattr(m, "get_memory", lambda: fake) + return fake + + +def test_bulk_delete_caps_and_reports_has_more(monkeypatch): + from app.memory import bulk_delete + + points = [_point(f"m{i}", f"fact {i}") for i in range(5)] + fake = _patch_bulk(monkeypatch, points) + out = bulk_delete(filters={"agent_id": "a"}, confirm=True, max_delete=3) + assert out["matched"] == 3 + assert out["deleted"] == 3 + assert out["has_more"] is True + assert fake.delete.call_count == 3 + # One extra point is fetched purely as the has_more signal. + _, kwargs = fake.vector_store.list.call_args + assert kwargs["top_k"] == 4 + + +def test_bulk_delete_sample_capped_at_ten(monkeypatch): + from app.memory import bulk_delete + + points = [_point(f"m{i}", f"fact {i}") for i in range(15)] + _patch_bulk(monkeypatch, points) + out = bulk_delete(filters={"agent_id": "a"}) + assert out["matched"] == 15 + assert len(out["sample"]) == 10 + assert out["dry_run"] is True + + +def test_bulk_delete_goes_through_mem0_not_vector_store(monkeypatch): + from app.memory import bulk_delete + + fake = _patch_bulk(monkeypatch, [_point("m1", "x")]) + bulk_delete(filters={"agent_id": "a"}, confirm=True) + fake.delete.assert_called_once_with(memory_id="m1") + fake.vector_store.delete.assert_not_called() + + +def test_bulk_delete_partial_failure_reports_progress(monkeypatch): + from app.memory import bulk_delete + + fake = _patch_bulk(monkeypatch, [_point(f"m{i}", "x") for i in range(3)]) + fake.delete.side_effect = [None, RuntimeError("qdrant hiccup"), None] + out = bulk_delete(filters={"agent_id": "a"}, confirm=True) + assert out["deleted"] == 1 + assert out["error"] == "delete_failed_partway" + assert out["has_more"] is True # remainder not attempted; caller re-posts + assert fake.delete.call_count == 2 # stopped at the failure + + def test_keyword_search_uses_index_and_skips_full_scan(monkeypatch): from qdrant_client import models diff --git a/tests/test_rest.py b/tests/test_rest.py index 4738ee1..101bb4f 100644 --- a/tests/test_rest.py +++ b/tests/test_rest.py @@ -372,6 +372,82 @@ def test_healthz_unreachable(app_instance): assert resp.json()["ok"] is False +def _bulk(c, auth_header, body): + return c.post("/api/v1/memories/delete_bulk", json=body, headers=auth_header) + + +def test_bulk_delete_requires_a_filter(app_instance, mem, auth_header): + c = _client(app_instance) + # No filter at all, and user_id alone, must both be rejected. + assert _bulk(c, auth_header, {}).status_code == 422 + assert _bulk(c, auth_header, {"user_id": "default-user"}).status_code == 422 + assert _bulk(c, auth_header, {"confirm": True}).status_code == 422 + mem.vector_store.list.assert_not_called() + mem.delete.assert_not_called() + + +def test_bulk_delete_dry_run_by_default(app_instance, mem, auth_header): + from types import SimpleNamespace + + points = [ + SimpleNamespace(id=f"m{i}", payload={"data": f"fact {i}"}) for i in range(3) + ] + mem.vector_store.list.return_value = (points, None) + c = _client(app_instance) + resp = _bulk(c, auth_header, {"agent_id": "capture:telegram"}) + assert resp.status_code == 200 + body = resp.json() + assert body["dry_run"] is True + assert body["matched"] == 3 + assert body["deleted"] == 0 + assert body["has_more"] is False + assert body["sample"][0] == {"id": "m0", "memory": "fact 0"} + mem.delete.assert_not_called() + _, kwargs = mem.vector_store.list.call_args + assert kwargs["filters"] == { + "user_id": "default-user", + "agent_id": "capture:telegram", + } + + +def test_bulk_delete_confirm_deletes_each_id(app_instance, mem, auth_header): + from types import SimpleNamespace + + points = [SimpleNamespace(id=f"m{i}", payload={"data": "x"}) for i in range(3)] + mem.vector_store.list.return_value = (points, None) + c = _client(app_instance) + body = _bulk( + c, auth_header, {"source": "import:chatgpt", "confirm": True} + ).json() + assert body["dry_run"] is False + assert body["deleted"] == 3 + assert [c.kwargs["memory_id"] for c in mem.delete.call_args_list] == [ + "m0", + "m1", + "m2", + ] + + +def test_bulk_delete_composes_provenance_and_agent_filters( + app_instance, mem, auth_header +): + mem.vector_store.list.return_value = ([], None) + c = _client(app_instance) + resp = _bulk( + c, + auth_header, + {"agent_id": "n8n", "review_status": "rejected", "confidence": "low"}, + ) + assert resp.status_code == 200 + _, kwargs = mem.vector_store.list.call_args + assert kwargs["filters"] == { + "user_id": "default-user", + "agent_id": "n8n", + "review_status": "rejected", + "confidence": "low", + } + + def test_update_missing_memory_404(app_instance, mem, auth_header): mem.get.return_value = None c = _client(app_instance)