diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 25718939..58965aa1 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -6012,10 +6012,10 @@ async fn pyramid_publish_question_set( async fn pyramid_check_staleness( state: tauri::State<'_, SharedState>, slug: String, - files: Option>, + files: Option>, threshold: Option, ) -> Result { - use wire_node_lib::pyramid::staleness_bridge; + use wire_node_lib::pyramid::staleness; let threshold = threshold.unwrap_or(state.pyramid.operational.tier2.staleness_threshold); @@ -6024,13 +6024,13 @@ async fn pyramid_check_staleness( let explicit = files .as_ref() .filter(|f| !f.is_empty()) - .map(|f| staleness_bridge::entries_to_changed_files(f)); + .map(|f| staleness::entries_to_changed_files(f)); if let Some(files) = explicit { (files, "explicit".to_string()) } else { let conn = state.pyramid.reader.lock().await; - let files = staleness_bridge::auto_detect_changed_files(&conn, &slug) + let files = staleness::auto_detect_changed_files(&conn, &slug) .map_err(|e| format!("failed to auto-detect changed files: {}", e))?; (files, "auto_detect_observation_events".to_string()) } @@ -6043,13 +6043,13 @@ async fn pyramid_check_staleness( let slug_owned = slug.clone(); let result = tokio::task::spawn_blocking(move || { let c = conn.blocking_lock(); - staleness_bridge::run_trace_decide_check(&c, &slug_owned, &changed_files, threshold) + staleness::run_trace_decide_check(&c, &slug_owned, &changed_files, threshold) }) .await; match result { Ok(Ok((report, decide_work, deltas_processed))) => { - let response = staleness_bridge::CheckStalenessResponse { + let response = staleness::CheckStalenessResponse { source, files_processed, report, diff --git a/src-tauri/src/pyramid/dadbear_extend.rs b/src-tauri/src/pyramid/dadbear_extend.rs index cf1afa1a..ef597748 100644 --- a/src-tauri/src/pyramid/dadbear_extend.rs +++ b/src-tauri/src/pyramid/dadbear_extend.rs @@ -620,7 +620,7 @@ pub async fn run_tick_for_config( } // Tombstone deleted paths: route through the canonical file_deleted -> tombstone - // primitive (staleness_bridge -> dadbear_compiler -> dispatch_tombstone) so the + // primitive (staleness -> dadbear_compiler -> dispatch_tombstone) so the // deletion propagates (supersedes only THAT file's L0 + upstream delta). Previously // this called db::mark_ingest_stale, which only flips status='stale'; get_pending_ingests // is pending-only, so the deletion was never re-claimed NOR tombstoned — silently dropped. diff --git a/src-tauri/src/pyramid/mod.rs b/src-tauri/src/pyramid/mod.rs index e281b256..2bcca09b 100644 --- a/src-tauri/src/pyramid/mod.rs +++ b/src-tauri/src/pyramid/mod.rs @@ -126,7 +126,6 @@ pub mod stale_check_decision; pub mod stale_helpers; pub mod stale_helpers_upper; pub mod staleness; -pub mod staleness_bridge; pub mod step_context; pub mod structural_extract; pub mod supersession; diff --git a/src-tauri/src/pyramid/routes.rs b/src-tauri/src/pyramid/routes.rs index 711459ca..8f4013aa 100644 --- a/src-tauri/src/pyramid/routes.rs +++ b/src-tauri/src/pyramid/routes.rs @@ -35,7 +35,7 @@ use super::query; use super::reading_modes; use super::recovery; use super::slug; -use super::staleness_bridge; +use super::staleness; use super::types::CharacterizationResult; use super::types::*; use super::vine; @@ -1919,7 +1919,7 @@ pub fn pyramid_routes( .and(warp::path::end()) .and(warp::post()) .and(with_auth_state(state.clone())) - .and(warp::body::json::()) + .and(warp::body::json::()) .and_then(handle_check_staleness)); // REMOTE-SAFE: GET /pyramid/:slug/question-overlays — read-only, dual auth @@ -10974,7 +10974,7 @@ async fn handle_publish_question_set( async fn handle_check_staleness( slug_name: String, state: Arc, - body: staleness_bridge::CheckStalenessRequest, + body: staleness::CheckStalenessRequest, ) -> Result { let threshold = body .threshold @@ -10986,13 +10986,13 @@ async fn handle_check_staleness( .files .as_ref() .filter(|f| !f.is_empty()) - .map(|f| staleness_bridge::entries_to_changed_files(f)); + .map(|f| staleness::entries_to_changed_files(f)); if let Some(files) = explicit { (files, "explicit".to_string()) } else { let conn = state.reader.lock().await; - match staleness_bridge::auto_detect_changed_files(&conn, &slug_name) { + match staleness::auto_detect_changed_files(&conn, &slug_name) { Ok(files) => (files, "auto_detect_observation_events".to_string()), Err(e) => { return Ok(json_error( @@ -11011,13 +11011,13 @@ async fn handle_check_staleness( let slug_owned = slug_name.clone(); let result = tokio::task::spawn_blocking(move || { let c = conn.blocking_lock(); - staleness_bridge::run_trace_decide_check(&c, &slug_owned, &changed_files, threshold) + staleness::run_trace_decide_check(&c, &slug_owned, &changed_files, threshold) }) .await; match result { Ok(Ok((report, decide_work, deltas_processed))) => { - let response = staleness_bridge::CheckStalenessResponse { + let response = staleness::CheckStalenessResponse { source, files_processed, report, @@ -12109,7 +12109,7 @@ async fn handle_ingest_scan( } // Tombstone deleted paths: route through the canonical file_deleted -> tombstone - // primitive (staleness_bridge -> dadbear_compiler -> dispatch_tombstone) so the + // primitive (staleness -> dadbear_compiler -> dispatch_tombstone) so the // deletion propagates (supersedes only THAT file's L0 + upstream delta). Previously // this called db::mark_ingest_stale, which only flips status='stale'; get_pending_ingests // is pending-only, so the deletion was never re-claimed NOR tombstoned — silently dropped. diff --git a/src-tauri/src/pyramid/staleness.rs b/src-tauri/src/pyramid/staleness.rs index c3333c19..909ef37c 100644 --- a/src-tauri/src/pyramid/staleness.rs +++ b/src-tauri/src/pyramid/staleness.rs @@ -15,6 +15,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use tracing::{debug, info, warn}; +use super::dadbear_compiler::{self, CompilationResult}; use super::db; use super::types::{EvidenceVerdict, SourceDelta}; @@ -27,6 +28,39 @@ pub struct ChangedFile { pub change_type: ChangeType, } +/// A single file change entry in the manual check-staleness request body. +#[derive(Debug, Clone, Deserialize)] +pub struct FileChangeEntry { + pub path: String, + pub change_type: String, +} + +/// Request body for `POST /pyramid/:slug/check-staleness`. +/// If `files` is None or empty, auto-detect from DADBEAR observation events. +#[derive(Debug, Clone, Deserialize, Default)] +pub struct CheckStalenessRequest { + #[serde(default)] + pub files: Option>, + /// Override the configured staleness threshold. + #[serde(default)] + pub threshold: Option, +} + +/// Full manual TRACE -> DECIDE check response. +#[derive(Debug, Clone, Serialize)] +pub struct CheckStalenessResponse { + /// How the changed files were determined. + pub source: String, + /// Number of changed files processed. + pub files_processed: usize, + /// The deterministic TRACE report. + pub report: StalenessReport, + /// Work items compiled for the supervisor-owned DECIDE step. + pub decide_work: CompilationResult, + /// Number of source deltas marked processed after DECIDE work was compiled. + pub deltas_processed: usize, +} + /// How a source file changed. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(rename_all = "lowercase")] @@ -109,6 +143,185 @@ pub fn detect_source_changes( Ok(deltas) } +/// Convert request body entries to internal `ChangedFile` format. +pub fn entries_to_changed_files(entries: &[FileChangeEntry]) -> Vec { + entries + .iter() + .map(|e| ChangedFile { + path: e.path.clone(), + change_type: ChangeType::from_str(&e.change_type), + }) + .collect() +} + +/// Run the unified manual staleness path: +/// 1. detect_source_changes — save deltas, get pending set +/// 2. propagate_staleness — deterministic evidence-DAG TRACE, no LLM +/// 3. compile_source_deltas_for_decide — enqueue supervisor DECIDE work items +/// +/// `changed_files` is the list of files that changed. If empty, this is a no-op +/// that returns an empty report. +pub fn run_trace_decide_check( + conn: &Connection, + slug: &str, + changed_files: &[ChangedFile], + threshold: f64, +) -> Result<(StalenessReport, CompilationResult, usize)> { + if changed_files.is_empty() { + info!( + slug, + "No changed files provided, returning empty trace-decide report" + ); + return Ok(( + StalenessReport { + affected_questions: vec![], + max_depth_reached: 0, + staleness_scores: Default::default(), + }, + CompilationResult { + new_cursor: 0, + items_compiled: 0, + deps_created: 0, + deduped: 0, + }, + 0, + )); + } + + let deltas = detect_source_changes(conn, slug, changed_files)?; + + info!( + slug, + delta_count = deltas.len(), + file_count = changed_files.len(), + "Source changes detected, compiling direct TRACE->DECIDE work" + ); + + let trace_run_id = uuid::Uuid::new_v4().to_string(); + let run = dadbear_compiler::compile_source_deltas_for_decide( + conn, + slug, + &trace_run_id, + &deltas, + threshold, + None, + None, + )?; + + info!( + slug, + trace_run_id, + affected = run.report.affected_questions.len(), + items_compiled = run.compilation.items_compiled, + deduped = run.compilation.deduped, + deltas_processed = run.deltas_processed, + "Trace-decide staleness check complete" + ); + + Ok((run.report, run.compilation, run.deltas_processed)) +} + +/// Ensure the `last_bridge_observation_id` column exists on `pyramid_build_metadata`. +fn ensure_observation_cursor_column(conn: &Connection) { + let has_column: bool = conn + .prepare( + "SELECT 1 FROM pragma_table_info('pyramid_build_metadata') + WHERE name = 'last_bridge_observation_id'", + ) + .and_then(|mut stmt| stmt.exists([])) + .unwrap_or(false); + + if !has_column { + let _ = conn.execute_batch( + "ALTER TABLE pyramid_build_metadata + ADD COLUMN last_bridge_observation_id INTEGER DEFAULT 0;", + ); + } +} + +/// Get the current observation cursor for a slug. +fn get_observation_cursor(conn: &Connection, slug: &str) -> i64 { + conn.query_row( + "SELECT COALESCE(last_bridge_observation_id, 0) + FROM pyramid_build_metadata WHERE slug = ?1", + rusqlite::params![slug], + |row| row.get(0), + ) + .unwrap_or(0) +} + +/// Advance the observation cursor to the given event ID. +fn advance_observation_cursor(conn: &Connection, slug: &str, new_cursor: i64) { + let _ = conn.execute( + "INSERT INTO pyramid_build_metadata (slug, last_bridge_observation_id, updated_at) + VALUES (?1, ?2, datetime('now')) + ON CONFLICT(slug) DO UPDATE SET + last_bridge_observation_id = ?2, + updated_at = datetime('now')", + rusqlite::params![slug, new_cursor], + ); +} + +/// Read file-level observation events and convert them into `ChangedFile` entries. +pub fn auto_detect_changed_files(conn: &Connection, slug: &str) -> Result> { + ensure_observation_cursor_column(conn); + + let cursor = get_observation_cursor(conn, slug); + let mut stmt = conn.prepare( + "SELECT id, event_type, file_path FROM dadbear_observation_events + WHERE slug = ?1 AND id > ?2 + AND event_type IN ('file_modified', 'file_created', 'file_deleted', 'file_renamed') + AND file_path IS NOT NULL AND file_path != '' + ORDER BY id ASC", + )?; + + let rows: Vec<(i64, String, String)> = stmt + .query_map(rusqlite::params![slug, cursor], |row| { + Ok(( + row.get::<_, i64>(0)?, + row.get::<_, String>(1)?, + row.get::<_, String>(2)?, + )) + })? + .filter_map(|r| r.ok()) + .collect(); + + if rows.is_empty() { + return Ok(Vec::new()); + } + + let max_id = rows.iter().map(|(id, _, _)| *id).max().unwrap_or(cursor); + let mut seen = std::collections::HashSet::new(); + let mut changed_files = Vec::new(); + for (_id, event_type, file_path) in &rows { + if !seen.insert((file_path.clone(), event_type.clone())) { + continue; + } + let change_type = match event_type.as_str() { + "file_created" => ChangeType::Addition, + "file_deleted" => ChangeType::Deletion, + "file_modified" | "file_renamed" => ChangeType::Modification, + _ => ChangeType::Modification, + }; + changed_files.push(ChangedFile { + path: file_path.clone(), + change_type, + }); + } + + advance_observation_cursor(conn, slug, max_id); + + info!( + slug, + count = changed_files.len(), + cursor_from = cursor, + cursor_to = max_id, + "Auto-detected changed files from observation events" + ); + + Ok(changed_files) +} + // ── Step 2: Propagate Staleness ─────────────────────────────────────────────── /// Traces evidence weights upward from changed L0 nodes through the pyramid @@ -510,6 +723,126 @@ mod tests { assert!(ChangeType::Deletion.skip_first_attenuation()); } + #[test] + fn test_entries_to_changed_files() { + let entries = vec![ + FileChangeEntry { + path: "src/main.rs".to_string(), + change_type: "modification".to_string(), + }, + FileChangeEntry { + path: "src/new.rs".to_string(), + change_type: "addition".to_string(), + }, + FileChangeEntry { + path: "src/old.rs".to_string(), + change_type: "deletion".to_string(), + }, + ]; + + let files = entries_to_changed_files(&entries); + assert_eq!(files.len(), 3); + assert_eq!(files[0].change_type, ChangeType::Modification); + assert_eq!(files[1].change_type, ChangeType::Addition); + assert_eq!(files[2].change_type, ChangeType::Deletion); + } + + #[test] + fn test_empty_changed_files_returns_empty_report() { + let conn = crate::pyramid::test_support::test_db_conn(); + let (report, decide_work, deltas_processed) = + run_trace_decide_check(&conn, "test", &[], 0.3).unwrap(); + assert!(report.affected_questions.is_empty()); + assert_eq!(decide_work.items_compiled, 0); + assert_eq!(decide_work.deduped, 0); + assert_eq!(deltas_processed, 0); + } + + #[test] + fn trace_decide_check_compiles_work_without_legacy_queue() { + let conn = crate::pyramid::test_support::test_db_conn(); + crate::pyramid::db::init_pyramid_db(&conn).unwrap(); + + let slug = "trace-decide"; + conn.execute( + "INSERT INTO pyramid_slugs (slug, content_type, source_path) + VALUES (?1, 'code', '/src')", + rusqlite::params![slug], + ) + .unwrap(); + conn.execute( + "INSERT INTO pyramid_file_hashes (slug, file_path, hash, chunk_count, node_ids) + VALUES (?1, 'src/main.rs', 'abc123', 1, '[\"L0-001\"]')", + rusqlite::params![slug], + ) + .unwrap(); + conn.execute( + "INSERT INTO pyramid_evidence + (slug, source_node_id, target_node_id, verdict, weight, reason) + VALUES (?1, 'L0-001', 'L1-001', 'KEEP', 0.95, 'direct citation')", + rusqlite::params![slug], + ) + .unwrap(); + + let changed = vec![ChangedFile { + path: "src/main.rs".to_string(), + change_type: ChangeType::Modification, + }]; + let (report, decide_work, deltas_processed) = + run_trace_decide_check(&conn, slug, &changed, 0.3).unwrap(); + + assert_eq!(report.affected_questions, vec!["L1-001".to_string()]); + assert_eq!(decide_work.items_compiled, 1); + assert_eq!(decide_work.deduped, 0); + assert_eq!(deltas_processed, 1); + + let queued_count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM pyramid_staleness_queue WHERE slug = ?1", + rusqlite::params![slug], + |row| row.get(0), + ) + .unwrap(); + assert_eq!( + queued_count, 0, + "manual TRACE->DECIDE must not write the legacy staleness queue" + ); + + let work_count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM dadbear_work_items + WHERE slug = ?1 AND primitive = 'stale_check' + AND step_name = 'cascade_stale_check'", + rusqlite::params![slug], + |row| row.get(0), + ) + .unwrap(); + assert_eq!(work_count, 1); + + let unprocessed = crate::pyramid::db::get_unprocessed_source_deltas(&conn, slug).unwrap(); + assert!( + unprocessed.is_empty(), + "source deltas advance only after DECIDE work has been compiled" + ); + } + + #[test] + fn test_default_request_deserializes() { + let json = r#"{}"#; + let req: CheckStalenessRequest = serde_json::from_str(json).unwrap(); + assert!(req.files.is_none()); + assert!(req.threshold.is_none()); + } + + #[test] + fn test_request_with_files_deserializes() { + let json = + r#"{"files": [{"path": "src/main.rs", "change_type": "modification"}], "threshold": 0.5}"#; + let req: CheckStalenessRequest = serde_json::from_str(json).unwrap(); + assert_eq!(req.files.as_ref().unwrap().len(), 1); + assert_eq!(req.threshold, Some(0.5)); + } + #[test] fn test_path_normalization() { assert_eq!(normalize_file_path("src/main.rs"), "src/main.rs"); diff --git a/src-tauri/src/pyramid/staleness_bridge.rs b/src-tauri/src/pyramid/staleness_bridge.rs deleted file mode 100644 index 17f72d8e..00000000 --- a/src-tauri/src/pyramid/staleness_bridge.rs +++ /dev/null @@ -1,369 +0,0 @@ -// pyramid/staleness_bridge.rs — Manual bridge into unified DADBEAR TRACE -> DECIDE -// -// The route handler (`POST /pyramid/:slug/check-staleness`) can be called: -// 1. Manually (e.g., from the frontend or CLI) with explicit changed files -// 2. With no body to auto-detect from DADBEAR observation events -// -// This bridge must not write to `pyramid_staleness_queue`. It records source -// deltas, runs deterministic TRACE, and compiles supervisor-owned DECIDE work. - -use anyhow::Result; -use rusqlite::Connection; -use serde::{Deserialize, Serialize}; -use tracing::info; - -use super::dadbear_compiler::{self, CompilationResult}; -use super::staleness::{self, ChangeType, ChangedFile, StalenessReport}; - -// ── Request / Response Types ───────────────────────────────────────────────── - -/// A single file change entry in the request body. -#[derive(Debug, Clone, Deserialize)] -pub struct FileChangeEntry { - pub path: String, - pub change_type: String, // "addition", "modification", "deletion" -} - -/// Request body for `POST /pyramid/:slug/check-staleness`. -/// If `files` is None or empty, auto-detect from pending mutations. -#[derive(Debug, Clone, Deserialize, Default)] -pub struct CheckStalenessRequest { - #[serde(default)] - pub files: Option>, - /// Override the default staleness threshold (0.3). - #[serde(default)] - pub threshold: Option, -} - -/// Full staleness check response. -#[derive(Debug, Clone, Serialize)] -pub struct CheckStalenessResponse { - /// How the changed files were determined. - pub source: String, - /// Number of changed files processed. - pub files_processed: usize, - /// The staleness propagation report. - pub report: StalenessReport, - /// Work items compiled for the supervisor-owned DECIDE step. - pub decide_work: CompilationResult, - /// Number of source deltas marked processed after DECIDE work was compiled. - pub deltas_processed: usize, -} - -// ── Core Bridge Function ───────────────────────────────────────────────────── - -/// Run the unified manual staleness path: -/// 1. detect_source_changes — save deltas, get pending set -/// 2. trace_staleness — deterministic evidence-DAG walk, no LLM -/// 3. compile_source_deltas_for_decide — enqueue supervisor DECIDE work items -/// -/// `changed_files` is the list of files that changed. If empty, this is a no-op -/// that returns an empty report (the caller should auto-detect before calling). -pub fn run_trace_decide_check( - conn: &Connection, - slug: &str, - changed_files: &[ChangedFile], - threshold: f64, -) -> Result<(StalenessReport, CompilationResult, usize)> { - if changed_files.is_empty() { - info!( - slug, - "No changed files provided, returning empty trace-decide report" - ); - return Ok(( - StalenessReport { - affected_questions: vec![], - max_depth_reached: 0, - staleness_scores: Default::default(), - }, - CompilationResult { - new_cursor: 0, - items_compiled: 0, - deps_created: 0, - deduped: 0, - }, - 0, - )); - } - - let deltas = staleness::detect_source_changes(conn, slug, changed_files)?; - - info!( - slug, - delta_count = deltas.len(), - file_count = changed_files.len(), - "Source changes detected, compiling direct TRACE->DECIDE work" - ); - - let trace_run_id = uuid::Uuid::new_v4().to_string(); - let run = dadbear_compiler::compile_source_deltas_for_decide( - conn, - slug, - &trace_run_id, - &deltas, - threshold, - None, - None, - )?; - - info!( - slug, - trace_run_id, - affected = run.report.affected_questions.len(), - items_compiled = run.compilation.items_compiled, - deduped = run.compilation.deduped, - deltas_processed = run.deltas_processed, - "Trace-decide staleness check complete" - ); - - Ok((run.report, run.compilation, run.deltas_processed)) -} - -// ── Auto-detect from Observation Events (with WAL fallback) ───────────────── - -/// Ensure the `last_bridge_observation_id` column exists on `pyramid_build_metadata`. -/// Uses ALTER TABLE IF NOT EXISTS pattern (idempotent). -fn ensure_bridge_cursor_column(conn: &Connection) { - // SQLite doesn't have ALTER TABLE ... ADD COLUMN IF NOT EXISTS, so we - // check pragma table_info first. - let has_column: bool = conn - .prepare("SELECT 1 FROM pragma_table_info('pyramid_build_metadata') WHERE name = 'last_bridge_observation_id'") - .and_then(|mut stmt| stmt.exists([])) - .unwrap_or(false); - - if !has_column { - let _ = conn.execute_batch( - "ALTER TABLE pyramid_build_metadata ADD COLUMN last_bridge_observation_id INTEGER DEFAULT 0;" - ); - } -} - -/// Get the current bridge cursor for a slug. -fn get_bridge_cursor(conn: &Connection, slug: &str) -> i64 { - conn.query_row( - "SELECT COALESCE(last_bridge_observation_id, 0) FROM pyramid_build_metadata WHERE slug = ?1", - rusqlite::params![slug], - |row| row.get(0), - ) - .unwrap_or(0) -} - -/// Advance the bridge cursor to the given observation event ID. -fn advance_bridge_cursor(conn: &Connection, slug: &str, new_cursor: i64) { - let _ = conn.execute( - "INSERT INTO pyramid_build_metadata (slug, last_bridge_observation_id, updated_at) - VALUES (?1, ?2, datetime('now')) - ON CONFLICT(slug) DO UPDATE SET last_bridge_observation_id = ?2, updated_at = datetime('now')", - rusqlite::params![slug, new_cursor], - ); -} - -/// Read observation events from `dadbear_observation_events` using a cursor, -/// converting file-level events to `ChangedFile` entries for the staleness pipeline. -/// -/// Read observation events from `dadbear_observation_events` using a cursor, -/// converting file-level events to `ChangedFile` entries for the staleness pipeline. -pub fn auto_detect_changed_files(conn: &Connection, slug: &str) -> Result> { - // Ensure the cursor column exists (idempotent migration) - ensure_bridge_cursor_column(conn); - - // Try the new observation events path first - let cursor = get_bridge_cursor(conn, slug); - - let mut stmt = conn.prepare( - "SELECT id, event_type, file_path FROM dadbear_observation_events - WHERE slug = ?1 AND id > ?2 - AND event_type IN ('file_modified', 'file_created', 'file_deleted', 'file_renamed') - AND file_path IS NOT NULL AND file_path != '' - ORDER BY id ASC", - )?; - - let rows: Vec<(i64, String, String)> = stmt - .query_map(rusqlite::params![slug, cursor], |row| { - Ok(( - row.get::<_, i64>(0)?, - row.get::<_, String>(1)?, - row.get::<_, String>(2)?, - )) - })? - .filter_map(|r| r.ok()) - .collect(); - - if !rows.is_empty() { - // New path: read from observation events and advance cursor - let max_id = rows.iter().map(|(id, _, _)| *id).max().unwrap_or(cursor); - - let mut seen = std::collections::HashSet::new(); - let mut changed_files = Vec::new(); - for (_id, event_type, file_path) in &rows { - if !seen.insert((file_path.clone(), event_type.clone())) { - continue; - } - let change_type = match event_type.as_str() { - "file_created" => ChangeType::Addition, - "file_deleted" => ChangeType::Deletion, - "file_modified" => ChangeType::Modification, - "file_renamed" => ChangeType::Modification, - _ => ChangeType::Modification, - }; - changed_files.push(ChangedFile { - path: file_path.clone(), - change_type, - }); - } - - advance_bridge_cursor(conn, slug, max_id); - - info!( - slug, - count = changed_files.len(), - cursor_from = cursor, - cursor_to = max_id, - "Auto-detected changed files from observation events (cursor advanced)" - ); - - return Ok(changed_files); - } - - // No fallback — the old WAL (pyramid_pending_mutations) has been dropped. - // If observation events are empty, return empty vec. - Ok(Vec::new()) -} - -/// Convert request body entries to internal `ChangedFile` format. -pub fn entries_to_changed_files(entries: &[FileChangeEntry]) -> Vec { - entries - .iter() - .map(|e| ChangedFile { - path: e.path.clone(), - change_type: ChangeType::from_str(&e.change_type), - }) - .collect() -} - -// ── Tests ──────────────────────────────────────────────────────────────────── - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_entries_to_changed_files() { - let entries = vec![ - FileChangeEntry { - path: "src/main.rs".to_string(), - change_type: "modification".to_string(), - }, - FileChangeEntry { - path: "src/new.rs".to_string(), - change_type: "addition".to_string(), - }, - FileChangeEntry { - path: "src/old.rs".to_string(), - change_type: "deletion".to_string(), - }, - ]; - - let files = entries_to_changed_files(&entries); - assert_eq!(files.len(), 3); - assert_eq!(files[0].change_type, ChangeType::Modification); - assert_eq!(files[1].change_type, ChangeType::Addition); - assert_eq!(files[2].change_type, ChangeType::Deletion); - } - - #[test] - fn test_empty_changed_files_returns_empty_report() { - // Use an in-memory DB (we won't hit it since changed_files is empty) - let conn = crate::pyramid::test_support::test_db_conn(); - let (report, decide_work, deltas_processed) = - run_trace_decide_check(&conn, "test", &[], 0.3).unwrap(); - assert!(report.affected_questions.is_empty()); - assert_eq!(decide_work.items_compiled, 0); - assert_eq!(decide_work.deduped, 0); - assert_eq!(deltas_processed, 0); - } - - #[test] - fn trace_decide_check_compiles_work_without_legacy_queue() { - let conn = crate::pyramid::test_support::test_db_conn(); - crate::pyramid::db::init_pyramid_db(&conn).unwrap(); - - let slug = "bridge-trace-decide"; - conn.execute( - "INSERT INTO pyramid_slugs (slug, content_type, source_path) - VALUES (?1, 'code', '/src')", - rusqlite::params![slug], - ) - .unwrap(); - conn.execute( - "INSERT INTO pyramid_file_hashes (slug, file_path, hash, chunk_count, node_ids) - VALUES (?1, 'src/main.rs', 'abc123', 1, '[\"L0-001\"]')", - rusqlite::params![slug], - ) - .unwrap(); - conn.execute( - "INSERT INTO pyramid_evidence - (slug, source_node_id, target_node_id, verdict, weight, reason) - VALUES (?1, 'L0-001', 'L1-001', 'KEEP', 0.95, 'direct citation')", - rusqlite::params![slug], - ) - .unwrap(); - - let changed = vec![ChangedFile { - path: "src/main.rs".to_string(), - change_type: ChangeType::Modification, - }]; - let (report, decide_work, deltas_processed) = - run_trace_decide_check(&conn, slug, &changed, 0.3).unwrap(); - - assert_eq!(report.affected_questions, vec!["L1-001".to_string()]); - assert_eq!(decide_work.items_compiled, 1); - assert_eq!(decide_work.deduped, 0); - assert_eq!(deltas_processed, 1); - - let queued_count: i64 = conn - .query_row( - "SELECT COUNT(*) FROM pyramid_staleness_queue WHERE slug = ?1", - rusqlite::params![slug], - |row| row.get(0), - ) - .unwrap(); - assert_eq!( - queued_count, 0, - "manual bridge must not write the legacy staleness queue" - ); - - let work_count: i64 = conn - .query_row( - "SELECT COUNT(*) FROM dadbear_work_items - WHERE slug = ?1 AND primitive = 'stale_check' - AND step_name = 'cascade_stale_check'", - rusqlite::params![slug], - |row| row.get(0), - ) - .unwrap(); - assert_eq!(work_count, 1); - - let unprocessed = crate::pyramid::db::get_unprocessed_source_deltas(&conn, slug).unwrap(); - assert!( - unprocessed.is_empty(), - "source deltas advance only after DECIDE work has been compiled" - ); - } - - #[test] - fn test_default_request_deserializes() { - let json = r#"{}"#; - let req: CheckStalenessRequest = serde_json::from_str(json).unwrap(); - assert!(req.files.is_none()); - assert!(req.threshold.is_none()); - } - - #[test] - fn test_request_with_files_deserializes() { - let json = r#"{"files": [{"path": "src/main.rs", "change_type": "modification"}], "threshold": 0.5}"#; - let req: CheckStalenessRequest = serde_json::from_str(json).unwrap(); - assert_eq!(req.files.as_ref().unwrap().len(), 1); - assert_eq!(req.threshold, Some(0.5)); - } -}