From 343b9bdd8e690dc7ab595d65375d7b70836e4084 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 8 Apr 2026 11:15:41 +0000 Subject: [PATCH] feat: cross-session memory persistence with hybrid search (closes #133) Wire up the 3-tier memory system so conversation context survives across REPL sessions: - Tier 2: ConversationEngine._maybe_compact() now calls compact_and_save() when a summaries directory is configured, writing a Markdown summary to disk and auto-indexing it into Tier 3. - Tier 3: MemorySearcher gains an optional embedding_fn callable. When provided, summaries are embedded and stored in a new session_embeddings table; search() fuses BM25 keyword hits with cosine-similarity vector hits via reciprocal rank fusion. Keyword-only and vector-only search paths still work on their own. - Cross-session loading: qracer repl instantiates a file-backed MemorySearcher at ~/.qracer/memory_index.duckdb, indexes every Markdown file in ~/.qracer/summaries/ on startup, and reports how many past contexts were loaded. Side improvements driven by the tests: - FTS extension loading is now lazy (deferred to first keyword search) and cached per process, so pure-vector and offline workflows aren't blocked by a missing fts extension. - _keyword_search() degrades gracefully to an empty result set when the FTS extension can't be loaded, allowing vector-only hybrid search to proceed. Updated docs/memory-system.md to reflect the new wiring. 
--- docs/memory-system.md | 16 +- qracer/cli.py | 11 + qracer/conversation/engine.py | 37 +- qracer/memory/memory_searcher.py | 479 +++++++++++++++------- tests/conversation/test_engine.py | 91 ++++ tests/conversation/test_topic_resolver.py | 8 +- tests/memory/test_memory_searcher.py | 147 ++++++- 7 files changed, 620 insertions(+), 169 deletions(-) diff --git a/docs/memory-system.md b/docs/memory-system.md index be90012..1a394b5 100644 --- a/docs/memory-system.md +++ b/docs/memory-system.md @@ -17,22 +17,22 @@ Complete audit trail — reconstructs exactly what the agent did and why. ## Tier 2 — Compressed Summary (Markdown) -> **구현 예정** — 현재 SessionCompactor가 존재하지만 요약이 파일로 저장되지 않습니다. - -When a session exceeds 8,000 tokens (measured via `tiktoken`), SessionManager compacts the conversation into a Markdown summary using the reporter role (Haiku). The summary replaces raw turns in the active context window; the JSONL is preserved. +When a session exceeds 8,000 tokens (rough `len // 4` estimate of the JSONL log), the `ConversationEngine` invokes `SessionCompactor.compact_and_save()` after each turn via `_maybe_compact()`. The reporter role (Haiku) condenses the turns into a concise Markdown summary, which is written to `~/.qracer/summaries/<session_id>.md`. The raw JSONL log is preserved untouched. ## Tier 3 — Search Index (DuckDB) -> **구현 예정** — 현재 MemorySearcher가 DuckDB FTS로 키워드 검색을 지원하지만, 벡터 임베딩(HNSW) 검색은 미구현입니다. - -DuckDB indexes all Tier 2 Markdown summaries for hybrid retrieval: keyword (FTS) + vector similarity (VSS/HNSW). Writes occur only at compaction time. +`MemorySearcher` indexes Tier 2 Markdown summaries in DuckDB for hybrid retrieval: keyword (BM25 via FTS) and, when an embedding function is supplied, vector similarity via DuckDB's `list_cosine_similarity`. The two branches are fused with reciprocal rank fusion so scores from different scales can be combined without normalisation. 
-- Embedding model: `text-embedding-3-small` (OpenAI API) or `all-MiniLM-L6-v2` (local fallback). -- Tables: `session_index` (FTS), `session_embeddings` (HNSW). +- Embedding is pluggable via the `embedding_fn: Callable[[str], list[float]]` parameter — callers can back it with the Claude API, `text-embedding-3-small`, `sentence-transformers`, or any other model. When `embedding_fn` is `None` the searcher falls back to keyword-only search. +- Tables: `session_index` (FTS) and `session_embeddings` (cosine similarity). - Source of truth is the Markdown files; DuckDB is the index only. The agent calls `memory_search` autonomously when past context may be relevant. +## Cross-Session Loading + +On `qracer repl` startup, the CLI instantiates a file-backed `MemorySearcher` at `~/.qracer/memory_index.duckdb` and re-indexes every Markdown file in `~/.qracer/summaries/`. When at least one summary is loaded, the count is printed so returning users immediately know how much prior memory is in scope. + ## MEMORY.md vs. Tier 2 > **구현 예정** — MEMORY.md, BOOTSTRAP.md 기반 크로스 세션 메모리는 아직 구현되지 않았습니다. diff --git a/qracer/cli.py b/qracer/cli.py index ec47581..8c2edda 100644 --- a/qracer/cli.py +++ b/qracer/cli.py @@ -867,6 +867,7 @@ def repl() -> None: from qracer.alert_monitor import AlertMonitor from qracer.alerts import AlertStore from qracer.conversation.engine import ConversationEngine + from qracer.memory.memory_searcher import MemorySearcher from qracer.memory.session_logger import SessionLogger from qracer.watchlist import Watchlist @@ -894,6 +895,14 @@ def repl() -> None: session_id = uuid.uuid4().hex[:12] session_logger = SessionLogger(sessions_dir / f"{session_id}.jsonl") + # Cross-session memory (Tier 2 summaries + Tier 3 search index). 
+ summaries_dir = _user_dir() / "summaries" + summaries_dir.mkdir(parents=True, exist_ok=True) + memory_searcher = MemorySearcher(_user_dir() / "memory_index.duckdb") + loaded_contexts = memory_searcher.index_directory(summaries_dir) + if loaded_contexts: + click.echo(f" ✓ Loaded {loaded_contexts} past session summaries from {summaries_dir}") + reports_dir = _user_dir() / "reports" watchlist = Watchlist(_user_dir() / "watchlist.json") @@ -915,6 +924,8 @@ def repl() -> None: session_logger=session_logger, report_dir=reports_dir, language=app_cfg.language, + memory_searcher=memory_searcher, + summaries_dir=summaries_dir, ) task_executor = TaskExecutor(task_store, data_registry, llm_registry, engine=engine) diff --git a/qracer/conversation/engine.py b/qracer/conversation/engine.py index a628763..46dc32c 100644 --- a/qracer/conversation/engine.py +++ b/qracer/conversation/engine.py @@ -70,6 +70,7 @@ def __init__( report_dir: Path | None = None, memory_searcher: MemorySearcher | None = None, language: str = "en", + summaries_dir: Path | None = None, ) -> None: self._llm = llm_registry self._data = data_registry @@ -77,6 +78,7 @@ def __init__( self._portfolio_config = portfolio_config or PortfolioConfig() self._memory_searcher = memory_searcher self._language = language + self._summaries_dir = summaries_dir analysis_loop = AnalysisLoop( llm_registry, @@ -190,19 +192,34 @@ def _log_turn(self, role: str, content: str, **kwargs: object) -> None: ) async def _maybe_compact(self) -> None: - """Trigger compaction if the session log exceeds the token threshold.""" + """Trigger compaction if the session log exceeds the token threshold. + + When a ``summaries_dir`` is configured the compacted summary is also + persisted to disk (Tier 2) and, if a ``memory_searcher`` is present, + indexed into the search index (Tier 3) so future sessions can find + it. 
+ """ if self._compactor is None or self._session_logger is None: return - if self._compactor.needs_compaction(self._session_logger): - try: - result = await self._compactor.compact(self._session_logger) - logger.info( - "Session compacted: %d turns → %d tokens summary", - result.turn_count, - result.output_tokens, + if not self._compactor.needs_compaction(self._session_logger): + return + try: + if self._summaries_dir is not None: + result = await self._compactor.compact_and_save( + self._session_logger, self._summaries_dir ) - except Exception: - logger.warning("Session compaction failed", exc_info=True) + if self._memory_searcher is not None: + session_id = self._session_logger.path.stem + self._memory_searcher.index_summary(session_id, result.summary) + else: + result = await self._compactor.compact(self._session_logger) + logger.info( + "Session compacted: %d turns → %d tokens summary", + result.turn_count, + result.output_tokens, + ) + except Exception: + logger.warning("Session compaction failed", exc_info=True) async def query(self, user_input: str) -> EngineResponse: """Process a user query through the full pipeline.""" diff --git a/qracer/memory/memory_searcher.py b/qracer/memory/memory_searcher.py index 6060253..2d5dc9d 100644 --- a/qracer/memory/memory_searcher.py +++ b/qracer/memory/memory_searcher.py @@ -1,148 +1,331 @@ -"""MemorySearcher — hybrid retrieval over session summaries (Tier 3). - -Indexes Tier 2 Markdown summaries in DuckDB for keyword search via full-text -search (FTS). The Markdown files on disk remain the source of truth; DuckDB -is the search index only. 
-""" - -from __future__ import annotations - -import logging -from dataclasses import dataclass -from datetime import datetime -from pathlib import Path - -import duckdb - -logger = logging.getLogger(__name__) - -_SCHEMA_SQL = """ -CREATE TABLE IF NOT EXISTS session_index ( - session_id VARCHAR PRIMARY KEY, - summary VARCHAR NOT NULL, - indexed_at TIMESTAMP NOT NULL -); -""" - -_FTS_INDEX_SQL = """ -PRAGMA create_fts_index('session_index', 'session_id', 'summary', overwrite=1); -""" - - -@dataclass(frozen=True) -class SearchResult: - """A single search hit from the memory index.""" - - session_id: str - summary: str - score: float - indexed_at: datetime - - -class MemorySearcher: - """Keyword search over compacted session summaries using DuckDB FTS. - - Usage:: - - searcher = MemorySearcher() # in-memory - searcher = MemorySearcher("memory_index.db") # file-backed - searcher.index_summary("sess_001", "# AAPL analysis ...") - results = searcher.search("AAPL earnings") - searcher.close() - """ - - def __init__(self, path: str | Path | None = None) -> None: - db_path = str(path) if path else ":memory:" - self._conn = duckdb.connect(db_path) - self._init_schema() - self._fts_dirty = True - - def _init_schema(self) -> None: - self._conn.execute("INSTALL fts") - self._conn.execute("LOAD fts") - self._conn.execute(_SCHEMA_SQL) - - def _rebuild_fts(self) -> None: - """Rebuild the FTS index if data has changed since last build.""" - if not self._fts_dirty: - return - count = self._conn.execute("SELECT count(*) FROM session_index").fetchone() - if count and count[0] > 0: - self._conn.execute(_FTS_INDEX_SQL) - self._fts_dirty = False - - @property - def connection(self) -> duckdb.DuckDBPyConnection: - return self._conn - - def index_summary(self, session_id: str, summary: str) -> None: - """Insert or replace a session summary in the search index.""" - self._conn.execute( - """ - INSERT OR REPLACE INTO session_index (session_id, summary, indexed_at) - VALUES (?, ?, ?) 
- """, - [session_id, summary, datetime.now()], - ) - self._fts_dirty = True - - def index_directory(self, summaries_dir: Path) -> int: - """Index all ``.md`` files in a directory. Returns count of files indexed.""" - count = 0 - for md_file in sorted(summaries_dir.glob("*.md")): - session_id = md_file.stem - summary = md_file.read_text(encoding="utf-8") - if summary.strip(): - self.index_summary(session_id, summary) - count += 1 - return count - - def search(self, query: str, limit: int = 10) -> list[SearchResult]: - """Run a keyword search over indexed summaries. - - Returns results ordered by relevance score (descending). - """ - self._rebuild_fts() - - row_count = self._conn.execute("SELECT count(*) FROM session_index").fetchone() - if not row_count or row_count[0] == 0: - return [] - - rows = self._conn.execute( - """ - SELECT - session_id, - summary, - fts_main_session_index.match_bm25(session_id, ?) AS score, - indexed_at - FROM session_index - WHERE score IS NOT NULL - ORDER BY score DESC - LIMIT ? - """, - [query, limit], - ).fetchall() - - return [ - SearchResult( - session_id=r[0], - summary=r[1], - score=r[2], - indexed_at=r[3], - ) - for r in rows - ] - - def remove(self, session_id: str) -> None: - """Remove a session from the index.""" - self._conn.execute("DELETE FROM session_index WHERE session_id = ?", [session_id]) - self._fts_dirty = True - - def close(self) -> None: - self._conn.close() - - def __enter__(self) -> MemorySearcher: - return self - - def __exit__(self, *_: object) -> None: - self.close() +"""MemorySearcher — hybrid retrieval over session summaries (Tier 3). + +Indexes Tier 2 Markdown summaries in DuckDB for keyword search via full-text +search (FTS) and, optionally, semantic similarity via stored embeddings. The +Markdown files on disk remain the source of truth; DuckDB is the search index +only. 
+""" + +from __future__ import annotations + +import logging +from collections.abc import Callable +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path + +import duckdb + +logger = logging.getLogger(__name__) + +EmbeddingFn = Callable[[str], list[float]] + +_SCHEMA_SQL = """ +CREATE TABLE IF NOT EXISTS session_index ( + session_id VARCHAR PRIMARY KEY, + summary VARCHAR NOT NULL, + indexed_at TIMESTAMP NOT NULL +); +""" + +_EMBEDDINGS_SCHEMA_SQL = """ +CREATE TABLE IF NOT EXISTS session_embeddings ( + session_id VARCHAR PRIMARY KEY, + embedding FLOAT[] NOT NULL, + indexed_at TIMESTAMP NOT NULL +); +""" + +_FTS_INDEX_SQL = """ +PRAGMA create_fts_index('session_index', 'session_id', 'summary', overwrite=1); +""" + +# Reciprocal rank fusion constant (Cormack et al. 2009). +_RRF_K = 60 + +# Process-wide cache: the DuckDB FTS extension download can take 80+s to time +# out in offline environments, so we avoid repeating the attempt. +_FTS_AVAILABLE: bool | None = None + + +@dataclass(frozen=True) +class SearchResult: + """A single search hit from the memory index.""" + + session_id: str + summary: str + score: float + indexed_at: datetime + + +class MemorySearcher: + """Hybrid keyword + vector search over compacted session summaries. + + Keyword search uses DuckDB FTS (BM25). Vector search is optional and + requires an ``embedding_fn`` callable that maps a string to a dense + vector; when provided, :meth:`search` runs both branches and fuses + results via reciprocal rank fusion. 
+ + Usage:: + + searcher = MemorySearcher() # keyword only + searcher = MemorySearcher(embedding_fn=my_embed) # hybrid search + searcher.index_summary("sess_001", "# AAPL analysis ...") + results = searcher.search("AAPL earnings") + searcher.close() + """ + + def __init__( + self, + path: str | Path | None = None, + *, + embedding_fn: EmbeddingFn | None = None, + ) -> None: + db_path = str(path) if path else ":memory:" + self._conn = duckdb.connect(db_path) + self._embedding_fn = embedding_fn + self._fts_loaded = False + self._init_schema() + self._fts_dirty = True + + def _init_schema(self) -> None: + self._conn.execute(_SCHEMA_SQL) + self._conn.execute(_EMBEDDINGS_SCHEMA_SQL) + + def _ensure_fts_loaded(self) -> None: + """Install/load the DuckDB FTS extension on first use. + + Deferred until a keyword search actually runs so that pure + vector-search workloads (and offline test environments that + cannot download DuckDB extensions) are not impacted. Failures + are cached at process level to avoid repeatedly paying the 80+s + download timeout. 
+ """ + global _FTS_AVAILABLE + if self._fts_loaded: + return + if _FTS_AVAILABLE is False: + raise RuntimeError("DuckDB FTS extension unavailable") + try: + self._conn.execute("INSTALL fts") + self._conn.execute("LOAD fts") + except Exception: + _FTS_AVAILABLE = False + raise + _FTS_AVAILABLE = True + self._fts_loaded = True + + def _rebuild_fts(self) -> None: + """Rebuild the FTS index if data has changed since last build.""" + if not self._fts_dirty: + return + self._ensure_fts_loaded() + count = self._conn.execute("SELECT count(*) FROM session_index").fetchone() + if count and count[0] > 0: + self._conn.execute(_FTS_INDEX_SQL) + self._fts_dirty = False + + @property + def connection(self) -> duckdb.DuckDBPyConnection: + return self._conn + + @property + def has_embeddings(self) -> bool: + """Whether this searcher was configured with an embedding function.""" + return self._embedding_fn is not None + + def index_summary(self, session_id: str, summary: str) -> None: + """Insert or replace a session summary in the search index. + + When an embedding function is configured, the summary is also + embedded and stored in ``session_embeddings``. + """ + now = datetime.now() + self._conn.execute( + """ + INSERT OR REPLACE INTO session_index (session_id, summary, indexed_at) + VALUES (?, ?, ?) + """, + [session_id, summary, now], + ) + self._fts_dirty = True + + if self._embedding_fn is not None: + try: + vector = self._embedding_fn(summary) + except Exception: + logger.warning( + "Embedding function failed for session %s", session_id, exc_info=True + ) + return + self._conn.execute( + """ + INSERT OR REPLACE INTO session_embeddings (session_id, embedding, indexed_at) + VALUES (?, ?, ?) + """, + [session_id, vector, now], + ) + + def index_directory(self, summaries_dir: Path) -> int: + """Index all ``.md`` files in a directory. 
Returns count of files indexed.""" + count = 0 + for md_file in sorted(summaries_dir.glob("*.md")): + session_id = md_file.stem + summary = md_file.read_text(encoding="utf-8") + if summary.strip(): + self.index_summary(session_id, summary) + count += 1 + return count + + def search(self, query: str, limit: int = 10) -> list[SearchResult]: + """Run a keyword (or hybrid) search over indexed summaries. + + Returns results ordered by relevance score (descending). When an + embedding function is configured, keyword and vector hits are fused + with reciprocal rank fusion; otherwise only keyword results are + returned. + """ + kw_results = self._keyword_search(query, limit) + if self._embedding_fn is None: + return kw_results + + vec_results = self._vector_search(query, limit) + return self._merge_results(kw_results, vec_results, limit) + + def _keyword_search(self, query: str, limit: int) -> list[SearchResult]: + """Run BM25 keyword search via DuckDB FTS. + + If the FTS extension cannot be loaded (for instance in an offline + environment), this branch degrades to an empty result set rather + than failing the whole hybrid search. + """ + try: + self._rebuild_fts() + except Exception: + logger.warning("FTS keyword search unavailable", exc_info=True) + return [] + + row_count = self._conn.execute("SELECT count(*) FROM session_index").fetchone() + if not row_count or row_count[0] == 0: + return [] + + rows = self._conn.execute( + """ + SELECT + session_id, + summary, + fts_main_session_index.match_bm25(session_id, ?) AS score, + indexed_at + FROM session_index + WHERE score IS NOT NULL + ORDER BY score DESC + LIMIT ? + """, + [query, limit], + ).fetchall() + + return [ + SearchResult( + session_id=r[0], + summary=r[1], + score=r[2], + indexed_at=r[3], + ) + for r in rows + ] + + def _vector_search(self, query: str, limit: int) -> list[SearchResult]: + """Run cosine similarity search using the stored embeddings. 
+ + Returns an empty list when no embedding function is configured, the + embedding call fails, or the embeddings table is empty. + """ + if self._embedding_fn is None: + return [] + + row_count = self._conn.execute("SELECT count(*) FROM session_embeddings").fetchone() + if not row_count or row_count[0] == 0: + return [] + + try: + query_vector = self._embedding_fn(query) + except Exception: + logger.warning("Embedding function failed for query", exc_info=True) + return [] + + rows = self._conn.execute( + """ + SELECT + si.session_id, + si.summary, + list_cosine_similarity(se.embedding, ?::FLOAT[]) AS score, + si.indexed_at + FROM session_embeddings se + JOIN session_index si USING (session_id) + WHERE score IS NOT NULL + ORDER BY score DESC + LIMIT ? + """, + [query_vector, limit], + ).fetchall() + + return [ + SearchResult( + session_id=r[0], + summary=r[1], + score=float(r[2]), + indexed_at=r[3], + ) + for r in rows + ] + + def _merge_results( + self, + keyword_results: list[SearchResult], + vector_results: list[SearchResult], + limit: int, + ) -> list[SearchResult]: + """Fuse keyword and vector hits with reciprocal rank fusion. + + RRF combines rankings without needing to normalise the underlying + scores (BM25 and cosine similarity are on different scales). Each + document's fused score is ``sum(1 / (k + rank_i))`` across input + lists that contain it, where *rank* is 1-based. + """ + scores: dict[str, float] = {} + records: dict[str, SearchResult] = {} + + for rank, result in enumerate(keyword_results, start=1): + scores[result.session_id] = scores.get(result.session_id, 0.0) + 1.0 / (_RRF_K + rank) + records[result.session_id] = result + + for rank, result in enumerate(vector_results, start=1): + scores[result.session_id] = scores.get(result.session_id, 0.0) + 1.0 / (_RRF_K + rank) + # Prefer the keyword-side record if present; otherwise use vector. 
+ records.setdefault(result.session_id, result) + + fused = [ + SearchResult( + session_id=sid, + summary=records[sid].summary, + score=score, + indexed_at=records[sid].indexed_at, + ) + for sid, score in sorted(scores.items(), key=lambda kv: kv[1], reverse=True) + ] + return fused[:limit] + + def remove(self, session_id: str) -> None: + """Remove a session from the index.""" + self._conn.execute("DELETE FROM session_index WHERE session_id = ?", [session_id]) + self._conn.execute("DELETE FROM session_embeddings WHERE session_id = ?", [session_id]) + self._fts_dirty = True + + def close(self) -> None: + self._conn.close() + + def __enter__(self) -> MemorySearcher: + return self + + def __exit__(self, *_: object) -> None: + self.close() diff --git a/tests/conversation/test_engine.py b/tests/conversation/test_engine.py index b63fb70..547832b 100644 --- a/tests/conversation/test_engine.py +++ b/tests/conversation/test_engine.py @@ -745,3 +745,94 @@ async def test_default_language_is_english(self) -> None: response = await engine.query("AAPL price") assert "unavailable" in response.text + + +# --------------------------------------------------------------------------- +# Cross-session memory: compact_and_save + Tier 3 auto-indexing +# --------------------------------------------------------------------------- + + +class TestCompactionPersistence: + async def test_maybe_compact_saves_to_disk_and_indexes(self, tmp_path) -> None: + """When summaries_dir and memory_searcher are set, compaction should + write a Markdown summary to disk AND auto-index it into Tier 3.""" + from qracer.memory.memory_searcher import MemorySearcher + from qracer.memory.session_compactor import CompactionResult + from qracer.memory.session_logger import SessionLogger, TurnRecord + + session_logger = SessionLogger(tmp_path / "sessions" / "abc123.jsonl") + for i in range(1, 4): + session_logger.append(TurnRecord(turn=i, role="user", content=f"query {i}")) + session_logger.append(TurnRecord(turn=i, 
role="assistant", content=f"answer {i}")) + + summaries_dir = tmp_path / "summaries" + searcher = MemorySearcher() + + llm = _mock_llm_registry({Role.RESEARCHER: "", Role.ANALYST: "", Role.STRATEGIST: ""}) + engine = ConversationEngine( + llm, + DataRegistry(), + session_logger=session_logger, + memory_searcher=searcher, + summaries_dir=summaries_dir, + ) + + # Stub the compactor so we don't depend on reporter LLM wiring. + engine._compactor.needs_compaction = lambda _: True # type: ignore[union-attr] + + async def fake_compact_and_save(sl, out_dir): # type: ignore[no-untyped-def] + out_dir.mkdir(parents=True, exist_ok=True) + md_path = out_dir / (sl.path.stem + ".md") + md_path.write_text("# Session Summary\n\n- Discussed AAPL earnings", encoding="utf-8") + return CompactionResult( + summary="# Session Summary\n\n- Discussed AAPL earnings", + turn_count=6, + input_tokens=100, + output_tokens=20, + cost=0.0, + ) + + engine._compactor.compact_and_save = fake_compact_and_save # type: ignore[union-attr,method-assign] + engine._compactor.compact = AsyncMock() # type: ignore[union-attr,method-assign] + + await engine._maybe_compact() + + # Tier 2: file on disk. + assert (summaries_dir / "abc123.md").exists() + # Tier 3: the auto-indexed row is directly observable on the index + # connection without relying on the FTS extension (which may be + # unavailable in sandboxed environments). + row = searcher.connection.execute( + "SELECT summary FROM session_index WHERE session_id = 'abc123'" + ).fetchone() + assert row is not None + assert "AAPL earnings" in row[0] + # Verified we took the save-branch, not the in-memory branch. 
+ engine._compactor.compact.assert_not_called() # type: ignore[union-attr] + + searcher.close() + + async def test_maybe_compact_falls_back_to_in_memory_without_dir(self, tmp_path) -> None: + """Without summaries_dir, compaction should call compact() (no disk).""" + from qracer.memory.session_compactor import CompactionResult + from qracer.memory.session_logger import SessionLogger, TurnRecord + + session_logger = SessionLogger(tmp_path / "sessions" / "abc123.jsonl") + session_logger.append(TurnRecord(turn=1, role="user", content="hi")) + + llm = _mock_llm_registry({Role.RESEARCHER: "", Role.ANALYST: "", Role.STRATEGIST: ""}) + engine = ConversationEngine(llm, DataRegistry(), session_logger=session_logger) + assert engine._compactor is not None + + engine._compactor.needs_compaction = lambda _: True # type: ignore[method-assign] + engine._compactor.compact = AsyncMock( # type: ignore[method-assign] + return_value=CompactionResult( + summary="x", turn_count=1, input_tokens=1, output_tokens=1, cost=0.0 + ) + ) + engine._compactor.compact_and_save = AsyncMock() # type: ignore[method-assign] + + await engine._maybe_compact() + + engine._compactor.compact.assert_awaited_once() + engine._compactor.compact_and_save.assert_not_called() diff --git a/tests/conversation/test_topic_resolver.py b/tests/conversation/test_topic_resolver.py index 03dabff..700b98c 100644 --- a/tests/conversation/test_topic_resolver.py +++ b/tests/conversation/test_topic_resolver.py @@ -44,9 +44,11 @@ async def test_empty_context(self): class TestEmbeddingSearch: async def test_found_in_memory(self): ctx = _ctx() + searcher = MemorySearcher() try: - searcher = MemorySearcher() + searcher._ensure_fts_loaded() except Exception: + searcher.close() pytest.skip("DuckDB FTS extension unavailable") searcher.index_summary("sess_aapl", "# AAPLA Analysis\nAAPL beat earnings...") result = await resolve_unknown_topic( @@ -58,9 +60,11 @@ async def test_found_in_memory(self): async def 
test_not_found_in_memory(self): ctx = _ctx() + searcher = MemorySearcher() try: - searcher = MemorySearcher() + searcher._ensure_fts_loaded() except Exception: + searcher.close() pytest.skip("DuckDB FTS extension unavailable") result = await resolve_unknown_topic("Unknown stock", ctx, memory_searcher=searcher) assert result.resolved is False diff --git a/tests/memory/test_memory_searcher.py b/tests/memory/test_memory_searcher.py index 138e6f1..3916dd6 100644 --- a/tests/memory/test_memory_searcher.py +++ b/tests/memory/test_memory_searcher.py @@ -12,9 +12,13 @@ @pytest.fixture def searcher() -> Iterator[MemorySearcher]: + s = MemorySearcher() try: - s = MemorySearcher() + # Pre-warm the FTS extension so tests that rely on keyword search + # skip cleanly when the extension cannot be installed (e.g. offline). + s._ensure_fts_loaded() except Exception: + s.close() pytest.skip("DuckDB FTS extension unavailable") yield s s.close() @@ -73,6 +77,13 @@ def test_search_limit(self, searcher: MemorySearcher) -> None: assert len(results) <= 2 def test_context_manager(self) -> None: + try: + s = MemorySearcher() + s._ensure_fts_loaded() + except Exception: + pytest.skip("DuckDB FTS extension unavailable") + s.close() + with MemorySearcher() as s: s.index_summary("sess_001", "Test content") results = s.search("Test") @@ -88,3 +99,137 @@ def test_search_result_fields(self, searcher: MemorySearcher) -> None: assert r.summary == "AAPL earnings beat expectations" assert isinstance(r.score, float) assert r.indexed_at is not None + + +# --------------------------------------------------------------------------- +# Hybrid (keyword + vector) search +# --------------------------------------------------------------------------- + + +def _fake_embedding(text: str) -> list[float]: + """Deterministic 3-dim embedding keyed on which topic words appear. + + Used by tests to exercise vector search without pulling in a real + embedding model. The axes represent: AAPL/earnings, Fed/rates, crypto. 
+ """ + text_lower = text.lower() + dims = [ + 1.0 if any(k in text_lower for k in ("aapl", "earnings")) else 0.0, + 1.0 if any(k in text_lower for k in ("fed", "rate", "inflation")) else 0.0, + 1.0 if any(k in text_lower for k in ("crypto", "bitcoin")) else 0.0, + ] + # Avoid all-zero vectors so cosine similarity is defined. + if sum(dims) == 0: + dims[0] = 0.01 + return dims + + +@pytest.fixture +def hybrid_searcher() -> Iterator[MemorySearcher]: + """Hybrid searcher whose tests do not require the FTS extension. + + Vector-only paths do not depend on FTS, so we avoid pre-warming it here + and only skip if DuckDB itself fails to open. + """ + s = MemorySearcher(embedding_fn=_fake_embedding) + yield s + s.close() + + +class TestHybridSearch: + def test_has_embeddings_flag(self, hybrid_searcher: MemorySearcher) -> None: + assert hybrid_searcher.has_embeddings is True + + def test_kw_searcher_has_no_embeddings(self, searcher: MemorySearcher) -> None: + assert searcher.has_embeddings is False + + def test_index_summary_stores_embedding(self, hybrid_searcher: MemorySearcher) -> None: + hybrid_searcher.index_summary("sess_001", "AAPL earnings were strong") + row = hybrid_searcher.connection.execute( + "SELECT embedding FROM session_embeddings WHERE session_id = 'sess_001'" + ).fetchone() + assert row is not None + assert list(row[0]) == _fake_embedding("AAPL earnings were strong") + + def test_vector_search_finds_semantic_match(self, hybrid_searcher: MemorySearcher) -> None: + hybrid_searcher.index_summary("sess_aapl", "Quarterly earnings for the iPhone maker") + hybrid_searcher.index_summary("sess_fed", "Fed held rates steady amid sticky inflation") + hybrid_searcher.index_summary( + "sess_btc", "Crypto market rallied as bitcoin broke resistance" + ) + + # Query shares the Fed/rates axis but not exact keywords. 
+ results = hybrid_searcher._vector_search("rate decision outlook", limit=5) + assert results, "vector search should return at least one hit" + assert results[0].session_id == "sess_fed" + + def test_search_is_hybrid_when_embedding_fn_set(self, hybrid_searcher: MemorySearcher) -> None: + hybrid_searcher.index_summary("sess_aapl", "AAPL earnings beat expectations") + hybrid_searcher.index_summary("sess_fed", "Fed policy and inflation trajectory") + + results = hybrid_searcher.search("AAPL earnings", limit=5) + assert results + session_ids = [r.session_id for r in results] + assert "sess_aapl" in session_ids + # RRF scores are always positive, and both branches contributed. + assert all(r.score > 0 for r in results) + + def test_merge_results_reciprocal_rank_fusion(self, hybrid_searcher: MemorySearcher) -> None: + from datetime import datetime + + from qracer.memory.memory_searcher import SearchResult + + now = datetime.now() + keyword = [ + SearchResult("sess_a", "a", 5.0, now), + SearchResult("sess_b", "b", 3.0, now), + ] + vector = [ + SearchResult("sess_b", "b", 0.95, now), + SearchResult("sess_c", "c", 0.80, now), + ] + + merged = hybrid_searcher._merge_results(keyword, vector, limit=10) + + ids = [r.session_id for r in merged] + # sess_b appears in both lists → should rank first under RRF. + assert ids[0] == "sess_b" + assert set(ids) == {"sess_a", "sess_b", "sess_c"} + # Scores must be strictly decreasing. 
+ scores = [r.score for r in merged] + assert scores == sorted(scores, reverse=True) + + def test_remove_clears_embedding_row(self, hybrid_searcher: MemorySearcher) -> None: + hybrid_searcher.index_summary("sess_001", "AAPL earnings") + hybrid_searcher.remove("sess_001") + row = hybrid_searcher.connection.execute( + "SELECT count(*) FROM session_embeddings" + ).fetchone() + assert row is not None and row[0] == 0 + + def test_vector_search_returns_empty_without_embedding_fn( + self, searcher: MemorySearcher + ) -> None: + searcher.index_summary("sess_001", "AAPL earnings") + assert searcher._vector_search("AAPL", limit=5) == [] + + def test_embedding_failure_does_not_block_indexing(self) -> None: + def broken(_: str) -> list[float]: + raise RuntimeError("embedding service down") + + with MemorySearcher(embedding_fn=broken) as s: + # Indexing must still succeed even though the embedding fn + # throws — the summary row is written before embedding is + # attempted. + s.index_summary("sess_001", "AAPL earnings beat") + + row = s.connection.execute( + "SELECT count(*) FROM session_index WHERE session_id = 'sess_001'" + ).fetchone() + assert row is not None and row[0] == 1 + + # The embedding row should be absent because the fn raised. + emb_row = s.connection.execute( + "SELECT count(*) FROM session_embeddings WHERE session_id = 'sess_001'" + ).fetchone() + assert emb_row is not None and emb_row[0] == 0