diff --git a/vault/vault.go b/vault/vault.go index eb6d0a7..7a7561c 100644 --- a/vault/vault.go +++ b/vault/vault.go @@ -9,6 +9,7 @@ import ( "path/filepath" "strings" "sync" + "time" "github.com/fsnotify/fsnotify" "github.com/google/uuid" @@ -66,6 +67,14 @@ type Client struct { searchIndex *SearchIndex // inverted index for full-text search mu sync.RWMutex // protects all maps above watcher *fsnotify.Watcher // file system watcher + + // Debounce state for fsnotify events. Absorbs the back-to-back + // Remove+Create sequences produced by atomic temp+rename saves + // (internal write paths, iCloud sync, external editors). See + // watcher_debounce.go for the coalescer and atomic helpers. + pendingMu sync.Mutex + pendingEvents map[string]*pendingEvent + debounceDelay time.Duration // 0 → defaultDebounceDelay (100ms) } // blockLookup stores a block and its page for UUID-based retrieval. @@ -308,7 +317,10 @@ func (c *Client) watchLoop() { } } -// handleEvent processes a single fsnotify event. +// handleEvent processes a single fsnotify event by scheduling a debounced +// resolution. The actual resolve reads disk state after debounceDelay so it +// absorbs the back-to-back Remove+Create sequences produced by atomic +// temp+rename. See watcher_debounce.go for the coalescer and atomic helpers. func (c *Client) handleEvent(event fsnotify.Event) { // Skip non-.md files. if !strings.HasSuffix(event.Name, ".md") { @@ -330,43 +342,26 @@ func (c *Client) handleEvent(event fsnotify.Event) { return } + // Coalesce every Create/Write/Remove/Rename per path. The final + // resolve (after debounceDelay) reads real disk state: present → + // reindex, absent → remove. Race-free against atomic rename. switch { - case event.Op&fsnotify.Create == fsnotify.Create, event.Op&fsnotify.Write == fsnotify.Write: - // File created or modified: re-index it. - content, err := os.ReadFile(event.Name) - if err != nil { - log.Printf("graphthulhu: failed to read %s: %v\n", event.Name, err) - return - } - info, err := os.Stat(event.Name) - if err != nil { - log.Printf("graphthulhu: failed to stat %s: %v\n", event.Name, err) - return - } - c.indexFile(filepath.ToSlash(relPath), string(content), info) - c.BuildBacklinks() - log.Printf("graphthulhu: reindexed %s\n", relPath) - - case event.Op&fsnotify.Remove == fsnotify.Remove: - // File deleted: remove from index. - name := strings.TrimSuffix(filepath.ToSlash(relPath), ".md") - lowerName := strings.ToLower(name) - c.removePageFromIndex(lowerName) - c.BuildBacklinks() - log.Printf("graphthulhu: removed %s from index\n", relPath) - - case event.Op&fsnotify.Rename == fsnotify.Rename: - // File renamed: treat as remove (new name will trigger Create event). - name := strings.TrimSuffix(filepath.ToSlash(relPath), ".md") - lowerName := strings.ToLower(name) - c.removePageFromIndex(lowerName) - c.BuildBacklinks() - log.Printf("graphthulhu: removed %s from index (rename)\n", relPath) + case event.Op&(fsnotify.Create|fsnotify.Write|fsnotify.Remove|fsnotify.Rename) != 0: + c.scheduleEventResolution(event.Name, relPath) } } -// Close stops the file watcher. +// Close stops the file watcher and any pending debounce timers. +// Without this, timers scheduled before shutdown would still fire and +// mutate c.pages / log spam after the server has stopped. func (c *Client) Close() error { + c.pendingMu.Lock() + for _, pe := range c.pendingEvents { + pe.timer.Stop() + } + c.pendingEvents = nil + c.pendingMu.Unlock() + if c.watcher != nil { return c.watcher.Close() } diff --git a/vault/vault_test.go b/vault/vault_test.go index ec9ca4e..fbb5495 100644 --- a/vault/vault_test.go +++ b/vault/vault_test.go @@ -656,8 +656,8 @@ func TestWatchFileCreate(t *testing.T) { t.Fatalf("WriteFile: %v", err) } - // Give the watcher time to process. - time.Sleep(100 * time.Millisecond) + // Give the watcher time to process (fsnotify event + 100ms debounce + resolve). + time.Sleep(250 * time.Millisecond) // Should be indexed. page, err := c.GetPage(ctx, "watched-create") @@ -695,8 +695,8 @@ func TestWatchFileModify(t *testing.T) { t.Fatalf("WriteFile modify: %v", err) } - // Give the watcher time to process. - time.Sleep(100 * time.Millisecond) + // Give the watcher time to process (fsnotify event + 100ms debounce + resolve). + time.Sleep(250 * time.Millisecond) // Should have updated content. blocks, err := c.GetPageBlocksTree(ctx, "watched-modify") @@ -743,8 +743,8 @@ func TestWatchFileDelete(t *testing.T) { t.Fatalf("Remove: %v", err) } - // Give the watcher time to process. - time.Sleep(100 * time.Millisecond) + // Give the watcher time to process (fsnotify event + 100ms debounce + resolve). + time.Sleep(250 * time.Millisecond) // Should be gone from index. page, err := c.GetPage(ctx, "watched-delete") diff --git a/vault/watcher_debounce.go b/vault/watcher_debounce.go new file mode 100644 index 0000000..15d9862 --- /dev/null +++ b/vault/watcher_debounce.go @@ -0,0 +1,123 @@ +package vault + +import ( + "log" + "os" + "path/filepath" + "strings" + "time" +) + +// defaultDebounceDelay is how long we wait before resolving an fsnotify +// event. 100ms absorbs the macOS / iCloud atomic temp+rename pattern +// (Remove followed by Create within ~10ms) without noticeably delaying +// real deletes or legitimate writes. +const defaultDebounceDelay = 100 * time.Millisecond + +// pendingEvent pairs a debounce timer with the path it resolves. +type pendingEvent struct { + absPath string + relPath string + timer *time.Timer +} + +// scheduleEventResolution coalesces fsnotify events for the same path. +// A new event for an in-flight path resets the existing timer so we wait +// debounceDelay again before resolving. The resolver reads disk state +// and applies remove or reindex accordingly — race-free against atomic +// temp+rename, since the final state always reflects what is on disk at +// resolve time, no matter which Remove/Create sequence we received. +func (c *Client) scheduleEventResolution(absPath, relPath string) { + c.pendingMu.Lock() + defer c.pendingMu.Unlock() + + if c.pendingEvents == nil { + c.pendingEvents = make(map[string]*pendingEvent) + } + + delay := c.debounceDelay + if delay == 0 { + delay = defaultDebounceDelay + } + + if existing, ok := c.pendingEvents[absPath]; ok { + existing.timer.Stop() + } + + pe := &pendingEvent{absPath: absPath, relPath: relPath} + pe.timer = time.AfterFunc(delay, func() { + c.resolveFileState(absPath, relPath) + c.pendingMu.Lock() + // Match by pointer identity: a concurrent schedule may have + // installed a newer pendingEvent between timer fire and lock + // acquisition. Don't delete that newer entry. + if cur, ok := c.pendingEvents[absPath]; ok && cur == pe { + delete(c.pendingEvents, absPath) + } + c.pendingMu.Unlock() + }) + c.pendingEvents[absPath] = pe +} + +// resolveFileState reads the current disk state and applies remove or +// reindex. Called by the debounce timer. Race-free against fsnotify: the +// final state always reflects what is on disk at the time of the call, +// no matter which event sequence was delivered. +func (c *Client) resolveFileState(absPath, relPath string) { + info, err := os.Stat(absPath) + if err != nil { + // File absent (real delete or unresolved rename). Remove for good. + name := strings.TrimSuffix(filepath.ToSlash(relPath), ".md") + lowerName := strings.ToLower(name) + c.removePageWithLinks(lowerName) + log.Printf("graphthulhu: removed %s from index\n", relPath) + return + } + content, err := os.ReadFile(absPath) + if err != nil { + log.Printf("graphthulhu: failed to read %s: %v\n", absPath, err) + return + } + c.indexFileWithLinks(filepath.ToSlash(relPath), string(content), info) + log.Printf("graphthulhu: reindexed %s\n", relPath) +} + +// indexFileWithLinks parses, indexes a file, AND rebuilds backlinks under +// a single lock. A reader cannot observe a state where the page exists in +// c.pages but backlinks don't yet reflect its new content. Replaces the +// previous indexFile + BuildBacklinks sequence (two separate lock +// acquisitions with an observable intermediate window). +func (c *Client) indexFileWithLinks(relPath, content string, info os.FileInfo) { + page := c.parseFile(relPath, content, info) + c.mu.Lock() + defer c.mu.Unlock() + c.applyPageIndex(page) + c.rebuildLinksLocked() +} + +// removePageWithLinks removes a page AND rebuilds backlinks under a +// single lock, so the removal is atomic to readers. +func (c *Client) removePageWithLinks(lowerName string) { + c.mu.Lock() + defer c.mu.Unlock() + c.removePageFromIndexLocked(lowerName) + c.rebuildLinksLocked() +} + +// flushPendingEventsForTest synchronously resolves all pending events. +// Used only by tests to avoid waiting for the real debounceDelay. +// Production code has no reason to call this. +func (c *Client) flushPendingEventsForTest() { + c.pendingMu.Lock() + pending := make([]*pendingEvent, 0, len(c.pendingEvents)) + for _, pe := range c.pendingEvents { + pe.timer.Stop() + pending = append(pending, pe) + } + c.pendingEvents = nil + c.pendingMu.Unlock() + + for _, pe := range pending { + c.resolveFileState(pe.absPath, pe.relPath) + } +} diff --git a/vault/watcher_debounce_test.go b/vault/watcher_debounce_test.go new file mode 100644 index 0000000..ccb3f57 --- /dev/null +++ b/vault/watcher_debounce_test.go @@ -0,0 +1,256 @@ +package vault + +import ( + "context" + "os" + "path/filepath" + "strings" + "sync" + "testing" + "time" +) + +// TestDebounce_AtomicRenameNoMissingPage simulates an atomic temp+rename: +// a Remove(target.md) immediately followed by Create(target.md) must NOT +// make the page disappear from the index, even transiently. +// +// Before this fix, the watcher did unconditional remove on Remove and +// reindex on Create — a ~10ms intermediate window where a concurrent +// get_page would see "page absent." +func TestDebounce_AtomicRenameNoMissingPage(t *testing.T) { + c := testWritableVault(t) + // Bump debounce so the assertion below runs comfortably inside the + // debounce window even on slow CI runners (Windows fsnotify can take + // 50-100ms per event). + c.debounceDelay = 200 * time.Millisecond + ctx := context.Background() + + // Seed: create an initial file and index it. + target := filepath.Join(c.vaultPath, "target.md") + if err := os.WriteFile(target, []byte("# Target initial\n\nv1"), 0o644); err != nil { + t.Fatalf("seed: %v", err) + } + if err := c.Reload(); err != nil { + t.Fatalf("reload: %v", err) + } + + if err := c.Watch(); err != nil { + t.Fatalf("watch: %v", err) + } + defer c.Close() + + // Simulate atomic temp+rename: write a temp then rename over the target. + temp := filepath.Join(c.vaultPath, ".target.tmp.md") + if err := os.WriteFile(temp, []byte("# Target updated\n\nv2"), 0o644); err != nil { + t.Fatalf("temp write: %v", err) + } + if err := os.Rename(temp, target); err != nil { + t.Fatalf("rename: %v", err) + } + + // During the debounce window (before resolve), the original page must + // stay accessible — no "page disappeared" window. + page, err := c.GetPage(ctx, "target") + if err != nil || page == nil { + t.Errorf("page must remain accessible during debounce window, got page=%v err=%v", page, err) + } + + // After resolve completes, the page must reflect the new content. + time.Sleep(400 * time.Millisecond) + blocks, err := c.GetPageBlocksTree(ctx, "target") + if err != nil { + t.Fatalf("GetPageBlocksTree post-resolve: %v", err) + } + if len(blocks) == 0 || !strings.Contains(blocks[0].Content, "Target updated") { + t.Errorf("expected updated content after resolve, got blocks=%+v", blocks) + } +} + +// TestDebounce_BurstWritesCoalesce simulates rapid successive writes +// (typical of multi-writer workloads where multiple producers update the +// same page in quick succession) and verifies that only the final state +// is resolved — not a cascade of expensive reindexes. +func TestDebounce_BurstWritesCoalesce(t *testing.T) { + c := testWritableVault(t) + // Bump debounce to give the burst loop comfortable headroom under + // the debounce window even on slow CI. + c.debounceDelay = 200 * time.Millisecond + ctx := context.Background() + + target := filepath.Join(c.vaultPath, "burst.md") + if err := c.Watch(); err != nil { + t.Fatalf("watch: %v", err) + } + defer c.Close() + + // 10 successive writes inside the debounce window → 1 resolve expected. + for i := 0; i < 10; i++ { + content := strings.NewReplacer("$N", string(rune('0'+i))).Replace("# Burst v$N\n\ncontent $N") + if err := os.WriteFile(target, []byte(content), 0o644); err != nil { + t.Fatalf("write %d: %v", i, err) + } + time.Sleep(5 * time.Millisecond) + } + + // After debounce + resolve: must reflect v9 (the last write). + time.Sleep(500 * time.Millisecond) + blocks, err := c.GetPageBlocksTree(ctx, "burst") + if err != nil { + t.Fatalf("GetPageBlocksTree: %v", err) + } + if len(blocks) == 0 || !strings.Contains(blocks[0].Content, "Burst v9") { + t.Errorf("expected final state v9, got %+v", blocks) + } +} + +// TestIndexFileWithLinks_Atomic verifies that a concurrent reader can +// NEVER observe a state where the page exists in c.pages but backlinks +// don't yet reflect it (the intermediate window between indexFile and +// BuildBacklinks in the previous implementation — race 1). +func TestIndexFileWithLinks_Atomic(t *testing.T) { + c := testWritableVault(t) + ctx := context.Background() + + // Pre-setup: one page that will link to the new target. + if err := os.WriteFile(filepath.Join(c.vaultPath, "linker.md"), + []byte("# Linker\n\n- to [[atomic-target]]"), 0o644); err != nil { + t.Fatal(err) + } + if err := c.Reload(); err != nil { + t.Fatal(err) + } + + // Concurrent reader spamming GetPage + getBacklinks. + stop := make(chan struct{}) + var inconsistencies int + var mu sync.Mutex + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + page, _ := c.GetPage(ctx, "atomic-target") + if page == nil { + continue + } + // Page exists: backlinks MUST include Linker (atomicity). + c.mu.RLock() + bl := c.backlinks[strings.ToLower(page.Name)] + c.mu.RUnlock() + found := false + for _, b := range bl { + if strings.EqualFold(b.fromPage, "linker") { + found = true + break + } + } + if !found { + mu.Lock() + inconsistencies++ + mu.Unlock() + } + } + }() + + // Index the target page 50 times (each indexFileWithLinks must be atomic). + info, _ := os.Stat(filepath.Join(c.vaultPath, "linker.md")) + for i := 0; i < 50; i++ { + c.indexFileWithLinks("atomic-target.md", "# Atomic Target\n\nbody", info) + time.Sleep(time.Microsecond) + } + + close(stop) + wg.Wait() + + mu.Lock() + defer mu.Unlock() + if inconsistencies > 0 { + t.Errorf("observed %d inconsistencies (page exists but backlinks missing) — atomicity broken", inconsistencies) + } +} + +// TestRemovePageWithLinks_Atomic: analogue for removal. +func TestRemovePageWithLinks_Atomic(t *testing.T) { + c := testWritableVault(t) + ctx := context.Background() + + // Pre-setup: target page + linker. + if err := os.WriteFile(filepath.Join(c.vaultPath, "removable.md"), + []byte("# Removable\n\nbody"), 0o644); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(c.vaultPath, "linker2.md"), + []byte("# Linker2\n\n- to [[removable]]"), 0o644); err != nil { + t.Fatal(err) + } + if err := c.Reload(); err != nil { + t.Fatal(err) + } + + // Verify setup is OK. + page, _ := c.GetPage(ctx, "removable") + if page == nil { + t.Fatal("setup: removable should exist") + } + + // Remove atomically. + c.removePageWithLinks("removable") + + // Page absent; backlinks for 'removable' should be empty or missing. + page, _ = c.GetPage(ctx, "removable") + if page != nil { + t.Errorf("page should be removed, got %+v", page) + } + c.mu.RLock() + bl := c.backlinks["removable"] + c.mu.RUnlock() + // Note: backlinks from other pages still POINTING at 'removable' may + // persist until those source pages are rebuilt — that's fine. The + // contract is atomicity with respect to the primary index c.pages, + // not transitive consistency. + _ = bl +} + +// TestScheduleEventResolution_Coalesce: 5 successive events for the same +// path within the debounce window must yield exactly 1 resolve. +func TestScheduleEventResolution_Coalesce(t *testing.T) { + c := testWritableVault(t) + c.debounceDelay = 50 * time.Millisecond + + target := filepath.Join(c.vaultPath, "coalesce.md") + if err := os.WriteFile(target, []byte("# Coalesce v0"), 0o644); err != nil { + t.Fatal(err) + } + + // 5 successive events inside the 50ms window. + for i := 0; i < 5; i++ { + c.scheduleEventResolution(target, "coalesce.md") + time.Sleep(5 * time.Millisecond) + } + + // Before delay: 1 pending timer (the previous 4 were stopped). + c.pendingMu.Lock() + pending := len(c.pendingEvents) + c.pendingMu.Unlock() + if pending != 1 { + t.Errorf("expected 1 pending event after 5 schedules, got %d", pending) + } + + // After delay + margin: pending empty, page indexed. + time.Sleep(150 * time.Millisecond) + c.pendingMu.Lock() + pending = len(c.pendingEvents) + c.pendingMu.Unlock() + if pending != 0 { + t.Errorf("expected pending events flushed, got %d remaining", pending) + } + page, err := c.GetPage(context.Background(), "coalesce") + if err != nil || page == nil { + t.Errorf("page should be indexed after resolve, page=%v err=%v", page, err) + } +}