Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions app/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
from datetime import UTC, datetime
from functools import lru_cache

import structlog

from app.config import Settings, get_settings
from app.metrics import observe_bulk_delete
from app.ranking import _parse_timestamp

_log = structlog.get_logger()


def _provider_config(model: str, api_key: str | None) -> dict:
config = {"model": model}
Expand Down Expand Up @@ -290,6 +295,72 @@ def keyword_search(
return {"results": _substring_matches(points, needle, limit)}


# Hard ceiling on deletions per bulk_delete call. Bounds request duration (each
# delete is a mem0 round-trip) so a huge purge can't outlive the reverse proxy's
# timeout; callers loop on has_more instead.
BULK_DELETE_MAX = 1000


def bulk_delete(*, filters: dict, confirm: bool = False, max_delete: int = BULK_DELETE_MAX) -> dict:
    """Delete (or, by default, just count) every memory matching exact filters.

    Dry-run by default: with confirm=False nothing is deleted; the response
    reports how many memories matched and a small sample so the caller can
    verify the blast radius before re-posting with confirm=true.

    Deletions go through mem0's Memory.delete per ID — never a raw vector-store
    filter delete — so mem0's history/bookkeeping stays consistent. At most
    `max_delete` memories are removed per call; `has_more` tells the caller to
    loop. The caller is responsible for ensuring `filters` is meaningfully
    scoped (see the REST endpoint).

    Args:
        filters: Exact-match filters handed to the vector store's list call.
        confirm: False (default) only counts and samples; True deletes.
        max_delete: Per-call cap on matched/deleted memories; must be >= 1.

    Returns:
        A dict with `matched`, `deleted`, `dry_run`, `has_more` and `sample`
        (up to 10 `{"id", "memory"}` entries). When a delete fails partway it
        also carries `error` = "delete_failed_partway" and `has_more` is forced
        to True.

    Raises:
        ValueError: If `max_delete` is less than 1.
    """
    # A non-positive cap can never make progress: 0 yields an empty batch while
    # has_more stays true (a caller looping on has_more would spin forever), and
    # a negative value would make the slice below cut from the tail of the list.
    # Reject it up front, before touching the store.
    if max_delete < 1:
        raise ValueError(f"max_delete must be >= 1, got {max_delete}")

    memory = get_memory()
    # Fetch one extra point purely as the has_more signal.
    result = memory.vector_store.list(filters=filters, top_k=max_delete + 1)
    # The store may return either the points or a tuple whose first item is them.
    points = result[0] if isinstance(result, tuple) else result
    points = list(points or [])
    has_more = len(points) > max_delete
    points = points[:max_delete]
    sample = [
        {"id": item["id"], "memory": item.get("memory")}
        for item in (_point_to_result(p) for p in points[:10])
    ]
    response = {
        "matched": len(points),
        "deleted": 0,
        "dry_run": not confirm,
        "has_more": has_more,
        "sample": sample,
    }
    if not confirm:
        return response

    deleted = 0
    error: str | None = None
    for point in points:
        point_id = getattr(point, "id", None)
        if point_id is None:
            # Nothing to address the delete with; skip without counting it.
            continue
        try:
            memory.delete(memory_id=point_id)
        except Exception:
            # Report partial progress instead of losing the response: deletes
            # are idempotent, so the caller just re-posts until has_more is
            # false. The traceback is logged here.
            error = "delete_failed_partway"
            _log.exception("bulk_delete_item_failed", memory_id=point_id)
            break
        deleted += 1

    observe_bulk_delete(deleted)
    _log.warning(
        "bulk_delete", filters=filters, matched=len(points), deleted=deleted,
        has_more=has_more, error=error,
    )
    response["deleted"] = deleted
    if error:
        response["error"] = error
        response["has_more"] = True  # the remainder was not attempted
    return response


# Upper bound on the list-pagination offset. Offset paging is emulated by
# over-fetching offset+limit+1 items and slicing (mem0's get_all has no native
# offset), so the bound caps the per-request fetch size. 10k is far beyond a
Expand Down
9 changes: 9 additions & 0 deletions app/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
)


# Running total of memories removed through bulk delete; incremented by
# observe_bulk_delete() with the number of deletions a call completed.
BULK_DELETED = Counter(
    "memories_bulk_deleted_total",
    "Memories removed via the bulk delete endpoint.",
)

# Brute-force signal: failed auth attempts and rate-limited rejections, by
# auth surface ("rest", "mcp", "oauth_consent", "oauth_token").
AUTH_FAILURES = Counter(
Expand All @@ -33,6 +38,10 @@ def observe_request(method: str, path: str, status: int, duration_s: float) -> N
REQUEST_LATENCY.labels(method=method, path=path).observe(duration_s)


def observe_bulk_delete(count: int) -> None:
    """Add `count` to the bulk-deleted-memories counter."""
    counter = BULK_DELETED
    counter.inc(count)


def observe_auth_failure(surface: str) -> None:
    """Count one auth failure against the given auth surface label."""
    per_surface = AUTH_FAILURES.labels(surface=surface)
    per_surface.inc()

Expand Down
35 changes: 35 additions & 0 deletions app/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ class UpdateMemoryRequest(BaseModel):
content: str


class BulkDeleteRequest(BaseModel):
    # Request body for POST /memories/delete_bulk.
    #
    # Exact-match filters; at least one besides user_id is required, so a bare
    # POST can never wipe the whole store. Every field is optional at the model
    # level — the "at least one" rule is checked by the delete_bulk handler,
    # which answers 422 when agent_id, run_id and the provenance filters are
    # all absent.
    agent_id: str | None = None
    run_id: str | None = None
    # Provenance filters: the handler passes these three together to
    # _provenance_filters().
    source: str | None = None
    confidence: str | None = None
    review_status: str | None = None
    # Scope only: user_id on its own does not satisfy the filter requirement.
    user_id: str | None = None
    # False (default) = dry run: count + sample only, nothing deleted.
    confirm: bool = False


# --- Response models ---------------------------------------------------------
# These document and validate the *stable* parts of mem0's payloads without
# freezing them: extra="allow" passes unexpected mem0 fields through untouched,
Expand Down Expand Up @@ -185,6 +198,28 @@ def list_memories(
return memory_mod.drop_expired(results) if exclude_expired else results


@router.post("/memories/delete_bulk")
def delete_bulk(req: BulkDeleteRequest) -> dict:
    """Delete every memory matching the given filters; dry-run by default.

    A POST (not DELETE) for the same reason search is: it takes a JSON body,
    and DELETE request bodies are ambiguous across proxies/clients.
    """
    provenance = _provenance_filters(req.source, req.confidence, req.review_status)

    # user_id alone never counts as a filter: something must narrow the match,
    # otherwise the request would target the whole store.
    narrowed = bool(req.agent_id or req.run_id or provenance)
    if not narrowed:
        message = (
            "Provide at least one filter (agent_id, run_id, source, "
            "confidence, review_status). Deleting the entire store is not "
            "supported through this endpoint; if you really mean to start "
            "over, drop the Qdrant collection instead."
        )
        raise HTTPException(status_code=422, detail=message)

    # Scope keys first, then provenance keys layered on top.
    scope = _scope_kwargs(req.user_id, req.agent_id, req.run_id)
    filters = {**scope, **provenance}
    return memory_mod.bulk_delete(filters=filters, confirm=req.confirm)


def _get_or_404(memory, memory_id: str) -> dict:
result = memory.get(memory_id=memory_id)
if not result:
Expand Down
7 changes: 6 additions & 1 deletion docs/DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ app/
is past. list_paginated() implements offset paging for list reads: mem0's
get_all has no offset, so it over-fetches offset+limit+1 (capped by
MAX_LIST_OFFSET) and slices, using the extra item as the has_more signal.
The most tweak-prone file.
bulk_delete() backs POST /memories/delete_bulk: dry-run by default, capped
at BULK_DELETE_MAX per call (has_more signals the caller to loop), and
deletes through Memory.delete per ID — never a raw vector-store filter
delete — so mem0's history stays consistent. Deliberately NOT exposed as
an MCP tool: a destructive filter-delete is an operator action, not
something a model should reach for. The most tweak-prone file.
mcp_server.py build_mcp(): the six MCP tools, each thinly wrapping a mem0 op with
user_id defaulted to MEM0_DEFAULT_USER_ID. list_memories pages (default 50,
max 100 per call) so the whole store is never returned in one response.
Expand Down
36 changes: 36 additions & 0 deletions docs/USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,42 @@ Body: `{"content": "new text"}`. Returns 404 if the memory does not exist.

Returns `{"deleted": true, "memory_id": "…"}`, or 404 if the memory does not exist.

### Bulk delete — `POST /api/v1/memories/delete_bulk`

Deletes every memory matching exact-match filters: `agent_id`, `run_id`, `source`,
`confidence`, `review_status` (plus optional `user_id`). **At least one filter
besides `user_id` is required** — wiping the whole store through this endpoint is
deliberately impossible.

It is a **dry run by default**: with `"confirm": false` (or omitted) nothing is
deleted; the response reports the match count and a sample of up to 10 items so
you can verify the blast radius first. Re-post the same body with
`"confirm": true` to actually delete. `matched` and `deleted` are **per call**,
capped at 1000; if `has_more` is `true`, more memories match than this call
covered — repeat the call until it's `false`. If a deletion fails partway, the
response carries `"error": "delete_failed_partway"` with the partial `deleted`
count and `has_more` forced to `true` (the remainder was not attempted); deletes
are idempotent, so just re-post. Deletions go through mem0 (not a raw
vector-store filter delete), so each memory's history stays consistent.

Typical use — undo a bad import run:

```bash
# 1. Dry run: how much would this delete?
curl -X POST https://mem0.your-domain.com/api/v1/memories/delete_bulk \
-H "Authorization: Bearer $MEM0_API_KEY" -H "Content-Type: application/json" \
-d '{"source": "import:chatgpt"}'
# -> {"matched": 412, "deleted": 0, "dry_run": true, "has_more": false, "sample": [...]}

# 2. Looks right - confirm:
curl -X POST https://mem0.your-domain.com/api/v1/memories/delete_bulk \
-H "Authorization: Bearer $MEM0_API_KEY" -H "Content-Type: application/json" \
-d '{"source": "import:chatgpt", "confirm": true}'
# -> {"matched": 412, "deleted": 412, "dry_run": false, "has_more": false, ...}
```

There is intentionally **no MCP equivalent** — a destructive filter-delete is an
operator/script action, not something a connected agent should be able to reach for.

### History — `GET /api/v1/memories/{memory_id}/history`

Returns the change history for a memory.
Expand Down
59 changes: 59 additions & 0 deletions tests/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,65 @@ def _patch_indexed(monkeypatch, indexed_points, scanned_points=()):
return fake


# --- bulk_delete ---------------------------------------------------------------


def _patch_bulk(monkeypatch, points):
    """Swap app.memory.get_memory for a mock whose vector store lists `points`.

    Returns the mock so tests can inspect the calls made against it.
    """
    import app.memory as memory_module

    stub = MagicMock()
    stub.vector_store.list.return_value = (points, None)

    def _fake_get_memory():
        return stub

    monkeypatch.setattr(memory_module, "get_memory", _fake_get_memory)
    return stub


def test_bulk_delete_caps_and_reports_has_more(monkeypatch):
    """With more matches than max_delete, only the cap is deleted and has_more is set."""
    from app.memory import bulk_delete

    five_points = [_point(f"m{n}", f"fact {n}") for n in range(5)]
    stub = _patch_bulk(monkeypatch, five_points)

    result = bulk_delete(filters={"agent_id": "a"}, confirm=True, max_delete=3)

    assert result["matched"] == 3
    assert result["deleted"] == 3
    assert result["has_more"] is True
    assert stub.delete.call_count == 3
    # One extra point is fetched purely as the has_more signal.
    list_kwargs = stub.vector_store.list.call_args[1]
    assert list_kwargs["top_k"] == 4


def test_bulk_delete_sample_capped_at_ten(monkeypatch):
    """A dry run over 15 matches reports all 15 but samples only 10 of them."""
    from app.memory import bulk_delete

    fifteen_points = [_point(f"m{n}", f"fact {n}") for n in range(15)]
    _patch_bulk(monkeypatch, fifteen_points)

    result = bulk_delete(filters={"agent_id": "a"})

    assert result["matched"] == 15
    assert len(result["sample"]) == 10
    assert result["dry_run"] is True


def test_bulk_delete_goes_through_mem0_not_vector_store(monkeypatch):
    """A confirmed delete calls Memory.delete per ID and never the store's delete."""
    from app.memory import bulk_delete

    single_point = [_point("m1", "x")]
    stub = _patch_bulk(monkeypatch, single_point)

    bulk_delete(filters={"agent_id": "a"}, confirm=True)

    stub.delete.assert_called_once_with(memory_id="m1")
    stub.vector_store.delete.assert_not_called()


def test_bulk_delete_partial_failure_reports_progress(monkeypatch):
    """A failure on the second delete stops the loop and reports the one success."""
    from app.memory import bulk_delete

    three_points = [_point(f"m{n}", "x") for n in range(3)]
    stub = _patch_bulk(monkeypatch, three_points)
    # First delete succeeds, second raises, third would succeed if attempted.
    stub.delete.side_effect = [None, RuntimeError("qdrant hiccup"), None]

    result = bulk_delete(filters={"agent_id": "a"}, confirm=True)

    assert result["deleted"] == 1
    assert result["error"] == "delete_failed_partway"
    assert result["has_more"] is True  # remainder not attempted; caller re-posts
    assert stub.delete.call_count == 2  # stopped at the failure


def test_keyword_search_uses_index_and_skips_full_scan(monkeypatch):
from qdrant_client import models

Expand Down
76 changes: 76 additions & 0 deletions tests/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,82 @@ def test_healthz_unreachable(app_instance):
assert resp.json()["ok"] is False


def _bulk(c, auth_header, body):
    """POST `body` as JSON to the bulk-delete endpoint and return the response."""
    endpoint = "/api/v1/memories/delete_bulk"
    return c.post(endpoint, json=body, headers=auth_header)


def test_bulk_delete_requires_a_filter(app_instance, mem, auth_header):
    """Requests carrying no narrowing filter get a 422 and never reach the store."""
    client = _client(app_instance)
    # No filter at all, user_id alone, and confirm alone must all be rejected.
    unscoped_bodies = ({}, {"user_id": "default-user"}, {"confirm": True})
    for body in unscoped_bodies:
        assert _bulk(client, auth_header, body).status_code == 422
    mem.vector_store.list.assert_not_called()
    mem.delete.assert_not_called()


def test_bulk_delete_dry_run_by_default(app_instance, mem, auth_header):
    """Without confirm the endpoint reports matches and a sample but deletes nothing."""
    from types import SimpleNamespace

    stored = []
    for n in range(3):
        stored.append(SimpleNamespace(id=f"m{n}", payload={"data": f"fact {n}"}))
    mem.vector_store.list.return_value = (stored, None)

    client = _client(app_instance)
    resp = _bulk(client, auth_header, {"agent_id": "capture:telegram"})
    assert resp.status_code == 200

    payload = resp.json()
    assert payload["dry_run"] is True
    assert payload["matched"] == 3
    assert payload["deleted"] == 0
    assert payload["has_more"] is False
    assert payload["sample"][0] == {"id": "m0", "memory": "fact 0"}
    mem.delete.assert_not_called()

    # The store is queried with the default user scope plus the given agent.
    list_kwargs = mem.vector_store.list.call_args[1]
    expected_filters = {
        "user_id": "default-user",
        "agent_id": "capture:telegram",
    }
    assert list_kwargs["filters"] == expected_filters


def test_bulk_delete_confirm_deletes_each_id(app_instance, mem, auth_header):
    """With confirm=true every listed point is deleted by ID, in listing order."""
    from types import SimpleNamespace

    stored = [SimpleNamespace(id=f"m{n}", payload={"data": "x"}) for n in range(3)]
    mem.vector_store.list.return_value = (stored, None)

    client = _client(app_instance)
    request_body = {"source": "import:chatgpt", "confirm": True}
    payload = _bulk(client, auth_header, request_body).json()

    assert payload["dry_run"] is False
    assert payload["deleted"] == 3
    deleted_ids = [call.kwargs["memory_id"] for call in mem.delete.call_args_list]
    assert deleted_ids == ["m0", "m1", "m2"]


def test_bulk_delete_composes_provenance_and_agent_filters(
    app_instance, mem, auth_header
):
    """Agent and provenance filters are merged with the default user scope."""
    mem.vector_store.list.return_value = ([], None)
    client = _client(app_instance)

    request_body = {
        "agent_id": "n8n",
        "review_status": "rejected",
        "confidence": "low",
    }
    resp = _bulk(client, auth_header, request_body)
    assert resp.status_code == 200

    list_kwargs = mem.vector_store.list.call_args[1]
    expected_filters = {
        "user_id": "default-user",
        "agent_id": "n8n",
        "review_status": "rejected",
        "confidence": "low",
    }
    assert list_kwargs["filters"] == expected_filters


def test_update_missing_memory_404(app_instance, mem, auth_header):
mem.get.return_value = None
c = _client(app_instance)
Expand Down
Loading