From 90de8ee65f0ebd296317040f85179e84cff52a2b Mon Sep 17 00:00:00 2001 From: Leo Date: Mon, 9 Feb 2026 02:56:19 +0900 Subject: [PATCH 1/2] feat: separate trace logs from normal WAL transmission - Add frame_parser.go for trace/non-trace log separation using byte matching - Add trace_store.go for height-indexed trace file management - Modify agent.go for 2-phase transmission (non-trace first, then trace for failures) - Add trace cleanup loop to manage disk usage --- internal/agent/agent.go | 213 +++++++++++++++++------- internal/agent/agent_test.go | 87 ++++++++-- internal/agent/cleanup.go | 94 ++++++++++- internal/agent/config_test.go | 4 +- internal/agent/frame_parser.go | 214 ++++++++++++++++++++++++ internal/agent/trace_store.go | 293 +++++++++++++++++++++++++++++++++ 6 files changed, 825 insertions(+), 80 deletions(-) create mode 100644 internal/agent/frame_parser.go create mode 100644 internal/agent/trace_store.go diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 2b23afc..665f999 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -16,10 +16,20 @@ import ( ) const ( - walFramesEndpoint = "/v1/ingest/wal-frames" - configEndpoint = "/v1/ingest/config" + walFramesEndpoint = "/v1/ingest/wal-frames" + traceFramesEndpoint = "/v1/ingest/trace-frames" + configEndpoint = "/v1/ingest/config" ) +// IngestResponse represents the response from WAL frames endpoint. +type IngestResponse struct { + Status string `json:"status"` + Frames int `json:"frames"` + EventsProc int `json:"events_processed"` + EventsStored int `json:"events_stored"` + FailureHeights []int64 `json:"failure_heights"` +} + type batchFrame struct { Meta FrameMeta Compressed []byte @@ -43,6 +53,14 @@ func Run(ctx context.Context, cfg Config) error { go watcher.Run(ctx) go walCleanupLoop(ctx, cfg.WALDir, cfg.StateDir) + // Initialize trace store for height-indexed trace logs + traceDir := filepath.Join(cfg.StateDir, "traces") + traceStore, err := NewTraceStore(traceDir) + if err != nil { + return fmt.Errorf("init trace store: %w", err) + } + go traceCleanupLoop(ctx, cfg.StateDir) + // Load prior state; if none, start from the oldest index (first logs) st, _ := loadState(cfg.StateDir) if st.IdxPath == "" { @@ -98,7 +116,7 @@ func Run(ctx context.Context, cfg Config) error { if errors.Is(nerr, io.EOF) { // Flush pending batch if len(batch) > 0 { - trySend(cfg, httpClient, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), &gz, lastSend, back) + trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), lastSend, back) lastSend = st.LastSendAt } if cfg.Once { @@ -178,13 +196,13 @@ func Run(ctx context.Context, cfg Config) error { bf := batchFrame{Meta: fm, Compressed: b, IdxLineLen: len(line)} batch = append(batch, bf) batchBytes += len(b) - trySend(cfg, httpClient, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), &gz, lastSend, back) + trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), lastSend, back) lastSend = st.LastSendAt continue } // Normal batch if cfg.MaxBatchBytes > 0 && batchBytes+len(b) > cfg.MaxBatchBytes { - trySend(cfg, httpClient, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), &gz, lastSend, back) + trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), lastSend, back) lastSend = st.LastSendAt } batch = append(batch, batchFrame{Meta: fm, Compressed: b, IdxLineLen: len(line)}) @@ -192,88 +210,155 @@ func Run(ctx context.Context, cfg Config) error { // Time-based send if time.Since(lastSend) >= cfg.SendInterval || time.Since(lastSend) >= cfg.HardInterval { - trySend(cfg, httpClient, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), &gz, lastSend, back) + trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), lastSend, back) lastSend = st.LastSendAt } } } -func trySend(cfg Config, httpClient *http.Client, batch *[]batchFrame, batchBytes *int, st *state, curIdxBase string, gz **os.File, lastSend time.Time, back *backoff) { +func trySend(cfg Config, httpClient *http.Client, traceStore *TraceStore, batch *[]batchFrame, batchBytes *int, st *state, curIdxBase string, lastSend time.Time, back *backoff) { if len(*batch) == 0 { return } - // Resource gating (soft) hard := time.Since(lastSend) >= cfg.HardInterval if !hard && !resourcesOK(cfg) { return } - // Build payload - manifest := make([]FrameMeta, 0, len(*batch)) + // Parse frames and separate trace from non-trace + var nonTraceFrames []batchFrame var advance int64 for _, fr := range *batch { - manifest = append(manifest, fr.Meta) advance += int64(fr.IdxLineLen) + + separated, err := separateLogs(fr.Compressed) + if err != nil { + logger.Warn().Err(err).Str("file", fr.Meta.File).Msg("failed to parse frame, sending as-is") + nonTraceFrames = append(nonTraceFrames, fr) + continue + } + + // Save trace lines by height + for height, lines := range separated.TraceByHeight { + if err := traceStore.Save(height, lines); err != nil { + logger.Warn().Err(err).Int64("height", height).Msg("failed to save trace") + } + } + + // Compress non-trace lines + if len(separated.NonTraceLines) > 0 { + compressed, err := CompressLines(separated.NonTraceLines) + if err != nil { + logger.Warn().Err(err).Msg("failed to compress non-trace lines") + continue + } + newMeta := fr.Meta + newMeta.Len = uint64(len(compressed)) + newMeta.Recs = uint32(len(separated.NonTraceLines)) + nonTraceFrames = append(nonTraceFrames, batchFrame{ + Meta: newMeta, + Compressed: compressed, + IdxLineLen: fr.IdxLineLen, + }) + } } - url := cfg.ServiceURL + walFramesEndpoint + + // Send non-trace frames + if len(nonTraceFrames) == 0 { + logger.Debug().Msg("no non-trace frames to send") + st.IdxOffset += advance + st.LastSendAt = time.Now() + st.LastCommitAt = st.LastSendAt + _ = saveState(cfg.StateDir, *st) + *batch = (*batch)[:0] + *batchBytes = 0 + back.Reset() + return + } + + ingestResp, ok := sendFrames(cfg, httpClient, nonTraceFrames, curIdxBase, walFramesEndpoint, back) + if !ok { + return + } + + logger.Info(). + Int("frames", len(nonTraceFrames)). + Int("failure_heights", len(ingestResp.FailureHeights)). + Msg("sent non-trace batch") + + // Send trace for failure heights + if len(ingestResp.FailureHeights) > 0 { + sendTraceForHeights(cfg, httpClient, traceStore, ingestResp.FailureHeights, back) + } + + // Commit state + if len(*batch) > 0 { + lastFrame := (*batch)[len(*batch)-1] + st.LastFile = lastFrame.Meta.File + st.LastFrame = lastFrame.Meta.Frame + } + st.IdxOffset += advance + st.LastSendAt = time.Now() + st.LastCommitAt = st.LastSendAt + _ = saveState(cfg.StateDir, *st) + + *batch = (*batch)[:0] + *batchBytes = 0 + back.Reset() +} + +// sendFrames sends frames to the specified endpoint and returns the response. +func sendFrames(cfg Config, httpClient *http.Client, frames []batchFrame, curIdxBase string, endpoint string, back *backoff) (*IngestResponse, bool) { + manifest := make([]FrameMeta, 0, len(frames)) + for _, fr := range frames { + manifest = append(manifest, fr.Meta) + } + var body bytes.Buffer writer := multipart.NewWriter(&body) - // Log batch details before building payload - logger.Debug(). - Int("batch_frames", len(*batch)). - Int("batch_bytes", *batchBytes). - Int("max_batch_bytes", cfg.MaxBatchBytes). - Msg("Building multipart payload") - manifestJSON, err := json.Marshal(manifest) if err != nil { logger.Error().Err(err).Msg("marshal manifest") back.Sleep() - return + return nil, false } manifestPart, err := writer.CreateFormField("manifest") if err != nil { logger.Error().Err(err).Msg("create manifest field") back.Sleep() - return + return nil, false } if _, err := manifestPart.Write(manifestJSON); err != nil { logger.Error().Err(err).Msg("write manifest field") back.Sleep() - return + return nil, false } framesPart, err := writer.CreateFormFile("frames", curIdxBase) if err != nil { logger.Error().Err(err).Msg("create frames field") back.Sleep() - return + return nil, false } - for _, fr := range *batch { + for _, fr := range frames { if _, err := framesPart.Write(fr.Compressed); err != nil { logger.Error().Err(err).Msg("write frames payload") back.Sleep() - return + return nil, false } } if err := writer.Close(); err != nil { logger.Error().Err(err).Msg("finalize multipart payload") back.Sleep() - return + return nil, false } - // Log actual multipart body size - bodySize := body.Len() - logger.Debug(). - Int("body_size_bytes", bodySize). - Int("body_size_mb", bodySize/(1<<20)). - Int("frames_in_manifest", len(manifest)). - Msg("Multipart payload ready") - + url := cfg.ServiceURL + endpoint req, err := http.NewRequest(http.MethodPost, url, &body) if err != nil { - return + back.Sleep() + return nil, false } req.Header.Set("Authorization", "Bearer "+cfg.AuthKey) req.Header.Set("Content-Type", writer.FormDataContentType()) @@ -284,39 +369,55 @@ func trySend(cfg Config, httpClient *http.Client, batch *[]batchFrame, batchByte resp, err := httpClient.Do(req) if err != nil { - logger.Error().Err(err).Msg("send batch") + logger.Error().Err(err).Msg("send request") back.Sleep() - return + return nil, false } defer resp.Body.Close() + + respBody, _ := io.ReadAll(resp.Body) if resp.StatusCode/100 != 2 { - body, _ := io.ReadAll(resp.Body) logger.Error(). Int("status", resp.StatusCode). - Str("body", string(body)). + Str("body", string(respBody)). Msg("server returned error") back.Sleep() - return + return nil, false } - logger.Info(). - Int("frames", len(*batch)). - Int("bytes", *batchBytes). - Int("size_mb", *batchBytes/(1<<20)). - Msg("sent batch") + var ingestResp IngestResponse + if err := json.Unmarshal(respBody, &ingestResp); err != nil { + logger.Warn().Err(err).Msg("failed to parse ingest response") + return &IngestResponse{Status: "ok"}, true + } - // Success: commit idx offset - st.IdxOffset += advance - st.LastFile = manifest[len(manifest)-1].File - st.LastFrame = manifest[len(manifest)-1].Frame - st.LastSendAt = time.Now() - st.LastCommitAt = st.LastSendAt - _ = saveState(cfg.StateDir, *st) + return &ingestResp, true +} - // reset batch - *batch = (*batch)[:0] - *batchBytes = 0 - back.Reset() +// sendTraceForHeights sends trace data for the given failure heights. +func sendTraceForHeights(cfg Config, httpClient *http.Client, traceStore *TraceStore, heights []int64, back *backoff) { + compressed, metas, err := traceStore.LoadMultiple(heights) + if err != nil { + logger.Warn().Err(err).Msg("failed to load traces for failure heights") + return + } + if compressed == nil || len(metas) == 0 { + logger.Debug().Ints64("heights", heights).Msg("no trace data found for failure heights") + return + } + + frames := []batchFrame{{ + Meta: metas[0], + Compressed: compressed, + }} + + _, ok := sendFrames(cfg, httpClient, frames, "trace.wal.gz", traceFramesEndpoint, back) + if ok { + logger.Info(). + Ints64("heights", heights). + Int("bytes", len(compressed)). + Msg("sent trace for failure heights") + } } func hostname() string { diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index a62849a..b71d5e3 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -76,11 +76,18 @@ func TestTrySend(t *testing.T) { })) defer ts.Close() + tmpDir := t.TempDir() + traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) + if err != nil { + t.Fatal(err) + } + cfg := Config{ ServiceURL: ts.URL, ChainID: "test-chain", NodeID: "test-node", AuthKey: "secret", + StateDir: tmpDir, } batch := []batchFrame{ @@ -94,7 +101,7 @@ func TestTrySend(t *testing.T) { st := state{IdxOffset: 0} back := newBackoff(time.Millisecond, time.Second) - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "000.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "000.idx", time.Now(), back) if len(batch) != 0 { t.Errorf("batch length = %d, want 0", len(batch)) @@ -110,13 +117,13 @@ func TestTrySend(t *testing.T) { func TestRun_Startup(t *testing.T) { tmpDir := t.TempDir() walDir := filepath.Join(tmpDir, "data", "log.wal") - if err := os.MkdirAll(walDir, 0755); err != nil { + if err := os.MkdirAll(walDir, 0o755); err != nil { t.Fatal(err) } // Create genesis.json and node_key.json configDir := filepath.Join(tmpDir, "config") - if err := os.MkdirAll(configDir, 0755); err != nil { + if err := os.MkdirAll(configDir, 0o755); err != nil { t.Fatal(err) } genesis := genesisDoc{ChainID: "test-chain"} @@ -124,7 +131,7 @@ func TestRun_Startup(t *testing.T) { if err != nil { t.Fatal(err) } - if err := os.WriteFile(filepath.Join(configDir, "genesis.json"), genesisBytes, 0644); err != nil { + if err := os.WriteFile(filepath.Join(configDir, "genesis.json"), genesisBytes, 0o644); err != nil { t.Fatal(err) } @@ -145,12 +152,12 @@ func TestRun_Startup(t *testing.T) { if err != nil { t.Fatal(err) } - if err := os.WriteFile(filepath.Join(configDir, "node_key.json"), nodeKeyBytes, 0644); err != nil { + if err := os.WriteFile(filepath.Join(configDir, "node_key.json"), nodeKeyBytes, 0o644); err != nil { t.Fatal(err) } // Create dummy WAL files - if err := os.WriteFile(filepath.Join(walDir, "0000000000000000.idx"), []byte{}, 0644); err != nil { + if err := os.WriteFile(filepath.Join(walDir, "0000000000000000.idx"), []byte{}, 0o644); err != nil { t.Fatal(err) } @@ -173,14 +180,20 @@ func TestRun_Startup(t *testing.T) { } func TestTrySend_EmptyBatch(t *testing.T) { - cfg := Config{} + tmpDir := t.TempDir() + traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) + if err != nil { + t.Fatal(err) + } + + cfg := Config{StateDir: tmpDir} batch := []batchFrame{} batchBytes := 0 st := state{} back := newBackoff(time.Millisecond, time.Second) // Should return immediately without error or panic - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "000.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "000.idx", time.Now(), back) } func TestTrySend_ServerError(t *testing.T) { @@ -189,14 +202,20 @@ func TestTrySend_ServerError(t *testing.T) { })) defer ts.Close() - cfg := Config{ServiceURL: ts.URL} + tmpDir := t.TempDir() + traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) + if err != nil { + t.Fatal(err) + } + + cfg := Config{ServiceURL: ts.URL, StateDir: tmpDir} batch := []batchFrame{{Meta: FrameMeta{File: "f", Frame: 1}}} batchBytes := 10 st := state{IdxOffset: 0} back := newBackoff(time.Millisecond, time.Second) // Should handle 500 error gracefully (backoff and return, no state update) - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "000.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "000.idx", time.Now(), back) if len(batch) == 0 { t.Error("batch should not be cleared on server error") @@ -214,9 +233,16 @@ func TestTrySend_Timeout(t *testing.T) { })) defer ts.Close() + tmpDir := t.TempDir() + traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) + if err != nil { + t.Fatal(err) + } + cfg := Config{ ServiceURL: ts.URL, HTTPTimeout: 10 * time.Millisecond, + StateDir: tmpDir, } httpClient := &http.Client{Timeout: cfg.HTTPTimeout} @@ -225,7 +251,7 @@ func TestTrySend_Timeout(t *testing.T) { st := state{IdxOffset: 0} back := newBackoff(time.Millisecond, time.Second) - trySend(cfg, httpClient, &batch, &batchBytes, &st, "000.idx", nil, time.Now(), back) + trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, "000.idx", time.Now(), back) if len(batch) == 0 { t.Error("batch should not be cleared on timeout") @@ -258,6 +284,11 @@ func TestTrySend_StateVerification(t *testing.T) { defer ts.Close() tmpDir := t.TempDir() + traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) + if err != nil { + t.Fatal(err) + } + cfg := Config{ ServiceURL: ts.URL, StateDir: tmpDir, @@ -279,7 +310,7 @@ func TestTrySend_StateVerification(t *testing.T) { st := state{IdxOffset: 100} back := newBackoff(time.Millisecond, time.Second) - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "seg-000001.wal.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "seg-000001.wal.idx", time.Now(), back) // Verify state updates if st.IdxOffset != 135 { // 100 + 20 + 15 @@ -311,13 +342,13 @@ func TestRun_OnceMode(t *testing.T) { // Test that Once mode exits cleanly on EOF without error tmpDir := t.TempDir() walDir := filepath.Join(tmpDir, "wal") - if err := os.MkdirAll(walDir, 0755); err != nil { + if err := os.MkdirAll(walDir, 0o755); err != nil { t.Fatal(err) } // Create a minimal index file with no frames (immediate EOF) idxPath := filepath.Join(walDir, "0000000000000000.idx") - if err := os.WriteFile(idxPath, []byte{}, 0644); err != nil { + if err := os.WriteFile(idxPath, []byte{}, 0o644); err != nil { t.Fatal(err) } @@ -331,7 +362,6 @@ func TestRun_OnceMode(t *testing.T) { ctx := context.Background() err := Run(ctx, cfg) - // Once mode should return nil on EOF, not an error if err != nil { t.Errorf("Once mode should return nil on EOF, got %v", err) @@ -347,9 +377,16 @@ func TestTrySend_LargeFrame(t *testing.T) { })) defer ts.Close() + tmpDir := t.TempDir() + traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) + if err != nil { + t.Fatal(err) + } + cfg := Config{ ServiceURL: ts.URL, MaxBatchBytes: 100, // Small limit + StateDir: tmpDir, } // This frame is larger than MaxBatchBytes @@ -367,7 +404,7 @@ func TestTrySend_LargeFrame(t *testing.T) { // In actual Run(), large frames are added to batch then immediately sent // Here we verify trySend processes it correctly - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "test.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "test.idx", time.Now(), back) if sentBatches != 1 { t.Errorf("Expected 1 batch sent, got %d", sentBatches) @@ -387,9 +424,16 @@ func TestTrySend_BatchOverflow(t *testing.T) { })) defer ts.Close() + tmpDir := t.TempDir() + traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) + if err != nil { + t.Fatal(err) + } + cfg := Config{ ServiceURL: ts.URL, MaxBatchBytes: 100, + StateDir: tmpDir, } // First batch with 80 bytes @@ -405,7 +449,7 @@ func TestTrySend_BatchOverflow(t *testing.T) { back := newBackoff(time.Millisecond, time.Second) // Try to send - should succeed - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "test.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "test.idx", time.Now(), back) if sendCount != 1 { t.Errorf("Expected 1 send, got %d", sendCount) @@ -424,10 +468,17 @@ func TestTrySend_URLConstruction(t *testing.T) { })) defer ts.Close() + tmpDir := t.TempDir() + traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) + if err != nil { + t.Fatal(err) + } + cfg := Config{ ServiceURL: ts.URL, // Base URL only, no /v1/ingest/wal-frames ChainID: "test-chain", NodeID: "test-node", + StateDir: tmpDir, } batch := []batchFrame{ @@ -441,7 +492,7 @@ func TestTrySend_URLConstruction(t *testing.T) { st := state{IdxOffset: 0} back := newBackoff(time.Millisecond, time.Second) - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "000.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "000.idx", time.Now(), back) expectedPath := "/v1/ingest/wal-frames" if requestPath != expectedPath { diff --git a/internal/agent/cleanup.go b/internal/agent/cleanup.go index 1fb3bd7..8e533ce 100644 --- a/internal/agent/cleanup.go +++ b/internal/agent/cleanup.go @@ -14,10 +14,12 @@ import ( ) var ( - walCleanupCheckInterval = 72 * time.Hour - walCleanupHighWatermark = int64(2 << 30) // 2GiB - walCleanupLowWatermark = int64(3 << 29) // 1.5GiB - walCleanupTickerNow = true // run once immediately; used for tests + walCleanupCheckInterval = 72 * time.Hour + walCleanupHighWatermark = int64(2 << 30) // 2GiB + walCleanupLowWatermark = int64(3 << 29) // 1.5GiB + walCleanupTickerNow = true // run once immediately; used for tests + traceCleanupHighWatermark = int64(1 << 30) // 1GiB + traceCleanupLowWatermark = int64(1 << 29) // 512MiB ) type walSegment struct { @@ -310,3 +312,87 @@ func currentActiveDay(stateDir string) string { } return "" } + +// traceCleanupLoop runs periodic cleanup of trace files when they exceed the watermark. +func traceCleanupLoop(ctx context.Context, stateDir string) { + if stateDir == "" { + return + } + + traceDir := filepath.Join(stateDir, "traces") + if _, err := os.Stat(traceDir); os.IsNotExist(err) { + return + } + + if walCleanupTickerNow { + traceCleanupOnce(traceDir) + } + + t := time.NewTicker(walCleanupCheckInterval) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + traceCleanupOnce(traceDir) + } + } +} + +func traceCleanupOnce(traceDir string) { + store, err := NewTraceStore(traceDir) + if err != nil { + return + } + + curSize, err := store.Size() + if err != nil { + logger.Error().Err(err).Msg("trace cleanup: size check failed") + return + } + + if curSize <= traceCleanupHighWatermark { + return + } + + heights, err := store.ListHeights() + if err != nil { + logger.Error().Err(err).Msg("trace cleanup: list heights failed") + return + } + + if len(heights) == 0 { + return + } + + removed := int64(0) + for _, height := range heights { + if curSize <= traceCleanupLowWatermark { + break + } + + path := filepath.Join(traceDir, traceFileName(height)) + info, err := os.Stat(path) + if err != nil { + continue + } + + if err := os.Remove(path); err != nil { + logger.Error().Err(err).Int64("height", height).Msg("trace cleanup: remove failed") + continue + } + + bytesFreed := info.Size() + curSize -= bytesFreed + removed += bytesFreed + } + + if removed > 0 { + logger.Info(). + Str("freed", formatBytes(removed)). + Str("remaining", formatBytes(curSize)). + Msg("trace cleanup completed") + } +} diff --git a/internal/agent/config_test.go b/internal/agent/config_test.go index 62b1d80..508f287 100644 --- a/internal/agent/config_test.go +++ b/internal/agent/config_test.go @@ -17,8 +17,8 @@ func TestDefaultConfig(t *testing.T) { if cfg.ServiceURL != DefaultServiceURL { t.Errorf("ServiceURL = %v, want %v", cfg.ServiceURL, DefaultServiceURL) } - if cfg.MaxBatchBytes != 4<<20 { - t.Errorf("MaxBatchBytes = %v, want 4MB", cfg.MaxBatchBytes) + if cfg.MaxBatchBytes != 16<<20 { + t.Errorf("MaxBatchBytes = %v, want 16MB", cfg.MaxBatchBytes) } } diff --git a/internal/agent/frame_parser.go b/internal/agent/frame_parser.go new file mode 100644 index 0000000..a0c13f4 --- /dev/null +++ b/internal/agent/frame_parser.go @@ -0,0 +1,214 @@ +package agent + +import ( + "bufio" + "bytes" + "compress/gzip" + "io" + "strconv" +) + +const MsgStoreTraceSet = "store trace set" + +// traceMarker is the byte pattern to identify trace log lines. +var traceMarker = []byte(`"_msg":"` + MsgStoreTraceSet + `"`) + +// heightPrefix is used to extract height value from log lines. +var heightPrefix = []byte(`"height":`) + +// extractHeight extracts the height value from a JSON log line. +// Returns 0 if height is not found or invalid. +func extractHeight(line []byte) int64 { + idx := bytes.Index(line, heightPrefix) + if idx == -1 { + return 0 + } + + start := idx + len(heightPrefix) + if start >= len(line) { + return 0 + } + + // Find the end of the number (comma, brace, or end of line) + end := start + for end < len(line) { + c := line[end] + if c < '0' || c > '9' { + break + } + end++ + } + + if start == end { + return 0 + } + + height, err := strconv.ParseInt(string(line[start:end]), 10, 64) + if err != nil { + return 0 + } + return height +} + +// isTraceLine checks if a log line is a trace log using byte matching. +func isTraceLine(line []byte) bool { + return bytes.Contains(line, traceMarker) +} + +// SeparatedLogs contains trace and non-trace logs separated from a compressed frame. +type SeparatedLogs struct { + TraceByHeight map[int64][][]byte // Trace lines grouped by height + NonTraceLines [][]byte // Lines that are not trace logs +} + +// separateLogs decompresses a gzip frame and separates trace logs from other logs. +func separateLogs(compressed []byte) (*SeparatedLogs, error) { + gr, err := gzip.NewReader(bytes.NewReader(compressed)) + if err != nil { + return nil, err + } + defer gr.Close() + + result := &SeparatedLogs{ + TraceByHeight: make(map[int64][][]byte), + } + + scanner := bufio.NewScanner(gr) + buf := make([]byte, 0, 1<<20) + scanner.Buffer(buf, 64<<20) + + for scanner.Scan() { + line := scanner.Bytes() + lineCopy := make([]byte, len(line)) + copy(lineCopy, line) + + if isTraceLine(line) { + height := extractHeight(line) + if height > 0 { + result.TraceByHeight[height] = append(result.TraceByHeight[height], lineCopy) + } + } else { + result.NonTraceLines = append(result.NonTraceLines, lineCopy) + } + } + + if err := scanner.Err(); err != nil { + return nil, err + } + + return result, nil +} + +// CompressLines takes log lines and compresses them into gzip format. +func CompressLines(lines [][]byte) ([]byte, error) { + if len(lines) == 0 { + return nil, nil + } + + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + + for i, line := range lines { + if _, err := gw.Write(line); err != nil { + gw.Close() + return nil, err + } + if i < len(lines)-1 { + if _, err := gw.Write([]byte("\n")); err != nil { + gw.Close() + return nil, err + } + } + } + + if err := gw.Close(); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +// ContainsTrace checks if compressed data contains any trace logs. +func ContainsTrace(compressed []byte) (bool, error) { + gr, err := gzip.NewReader(bytes.NewReader(compressed)) + if err != nil { + return false, err + } + defer gr.Close() + + scanner := bufio.NewScanner(gr) + buf := make([]byte, 0, 1<<20) + scanner.Buffer(buf, 64<<20) + + for scanner.Scan() { + if isTraceLine(scanner.Bytes()) { + return true, nil + } + } + + return false, scanner.Err() +} + +// ExtractTraceHeights extracts all unique heights from compressed trace data. +func ExtractTraceHeights(compressed []byte) ([]int64, error) { + gr, err := gzip.NewReader(bytes.NewReader(compressed)) + if err != nil { + return nil, err + } + defer gr.Close() + + heightSet := make(map[int64]struct{}) + var heights []int64 + + scanner := bufio.NewScanner(gr) + buf := make([]byte, 0, 1<<20) + scanner.Buffer(buf, 64<<20) + + for scanner.Scan() { + line := scanner.Bytes() + if isTraceLine(line) { + height := extractHeight(line) + if height > 0 { + if _, exists := heightSet[height]; !exists { + heightSet[height] = struct{}{} + heights = append(heights, height) + } + } + } + } + + return heights, scanner.Err() +} + +// FilterTraceLinesByHeight reads compressed data and returns only trace lines matching given heights. +func FilterTraceLinesByHeight(r io.Reader, targetHeights []int64) ([][]byte, error) { + gr, err := gzip.NewReader(r) + if err != nil { + return nil, err + } + defer gr.Close() + + heightSet := make(map[int64]struct{}, len(targetHeights)) + for _, h := range targetHeights { + heightSet[h] = struct{}{} + } + + var result [][]byte + scanner := bufio.NewScanner(gr) + buf := make([]byte, 0, 1<<20) + scanner.Buffer(buf, 64<<20) + + for scanner.Scan() { + line := scanner.Bytes() + if isTraceLine(line) { + height := extractHeight(line) + if _, match := heightSet[height]; match { + lineCopy := make([]byte, len(line)) + copy(lineCopy, line) + result = append(result, lineCopy) + } + } + } + + return result, scanner.Err() +} diff --git a/internal/agent/trace_store.go b/internal/agent/trace_store.go new file mode 100644 index 0000000..c6c2ecc --- /dev/null +++ b/internal/agent/trace_store.go @@ -0,0 +1,293 @@ +package agent + +import ( + "bytes" + "compress/gzip" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "sync" +) + +// TraceStore manages height-indexed trace log files. +type TraceStore struct { + dir string + mu sync.RWMutex +} + +// NewTraceStore creates a new TraceStore with the given directory. +func NewTraceStore(dir string) (*TraceStore, error) { + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, fmt.Errorf("create trace dir: %w", err) + } + return &TraceStore{dir: dir}, nil +} + +// traceFileName returns the filename for a given height. +func traceFileName(height int64) string { + return fmt.Sprintf("trace-%09d.wal.gz", height) +} + +// parseTraceFileName extracts height from a trace filename. +func parseTraceFileName(name string) (int64, bool) { + if !strings.HasPrefix(name, "trace-") || !strings.HasSuffix(name, ".wal.gz") { + return 0, false + } + numStr := strings.TrimPrefix(name, "trace-") + numStr = strings.TrimSuffix(numStr, ".wal.gz") + height, err := strconv.ParseInt(numStr, 10, 64) + if err != nil { + return 0, false + } + return height, true +} + +// Save stores trace lines for a specific height. +// If trace data already exists for this height, it appends to it. +func (s *TraceStore) Save(height int64, lines [][]byte) error { + if len(lines) == 0 { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + path := filepath.Join(s.dir, traceFileName(height)) + + var existingLines [][]byte + if data, err := os.ReadFile(path); err == nil { + existingLines, _ = s.decompressLines(data) + } + + allLines := append(existingLines, lines...) + + compressed, err := CompressLines(allLines) + if err != nil { + return fmt.Errorf("compress trace: %w", err) + } + + if err := os.WriteFile(path, compressed, 0o644); err != nil { + return fmt.Errorf("write trace file: %w", err) + } + + return nil +} + +// Load reads trace data for a specific height. +func (s *TraceStore) Load(height int64) ([]byte, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + path := filepath.Join(s.dir, traceFileName(height)) + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + return data, nil +} + +// LoadMultiple loads trace data for multiple heights and returns combined compressed data. +func (s *TraceStore) LoadMultiple(heights []int64) ([]byte, []FrameMeta, error) { + if len(heights) == 0 { + return nil, nil, nil + } + + s.mu.RLock() + defer s.mu.RUnlock() + + var allLines [][]byte + var metas []FrameMeta + + for _, height := range heights { + path := filepath.Join(s.dir, traceFileName(height)) + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + continue + } + return nil, nil, err + } + + lines, err := s.decompressLines(data) + if err != nil { + logger.Warn().Err(err).Int64("height", height).Msg("failed to decompress trace") + continue + } + + allLines = append(allLines, lines...) + metas = append(metas, FrameMeta{ + File: traceFileName(height), + Frame: uint64(height), + Recs: uint32(len(lines)), + }) + } + + if len(allLines) == 0 { + return nil, nil, nil + } + + compressed, err := CompressLines(allLines) + if err != nil { + return nil, nil, err + } + + if len(metas) > 0 { + metas[0].Len = uint64(len(compressed)) + } + + return compressed, metas, nil +} + +// Delete removes trace files for the given heights. +func (s *TraceStore) Delete(heights []int64) error { + s.mu.Lock() + defer s.mu.Unlock() + + var errs []error + for _, height := range heights { + path := filepath.Join(s.dir, traceFileName(height)) + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { + errs = append(errs, err) + } + } + + if len(errs) > 0 { + return fmt.Errorf("failed to delete %d trace files", len(errs)) + } + return nil +} + +// DeleteBefore removes all trace files for heights less than the given height. +func (s *TraceStore) DeleteBefore(height int64) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + + entries, err := os.ReadDir(s.dir) + if err != nil { + return 0, err + } + + deleted := 0 + for _, entry := range entries { + if entry.IsDir() { + continue + } + h, ok := parseTraceFileName(entry.Name()) + if !ok { + continue + } + if h < height { + path := filepath.Join(s.dir, entry.Name()) + if err := os.Remove(path); err == nil { + deleted++ + } + } + } + + return deleted, nil +} + +// ListHeights returns all heights that have stored trace data. +func (s *TraceStore) ListHeights() ([]int64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + entries, err := os.ReadDir(s.dir) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + + var heights []int64 + for _, entry := range entries { + if entry.IsDir() { + continue + } + if h, ok := parseTraceFileName(entry.Name()); ok { + heights = append(heights, h) + } + } + + sort.Slice(heights, func(i, j int) bool { return heights[i] < heights[j] }) + return heights, nil +} + +// Size returns the total size of all trace files in bytes. +func (s *TraceStore) Size() (int64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var total int64 + entries, err := os.ReadDir(s.dir) + if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + return 0, err + } + + for _, entry := range entries { + if entry.IsDir() { + continue + } + if _, ok := parseTraceFileName(entry.Name()); ok { + info, err := entry.Info() + if err == nil { + total += info.Size() + } + } + } + + return total, nil +} + +// decompressLines reads gzip data and returns individual lines. +func (s *TraceStore) decompressLines(data []byte) ([][]byte, error) { + gr, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, err + } + defer gr.Close() + + content, err := io.ReadAll(gr) + if err != nil { + return nil, err + } + + var lines [][]byte + for _, line := range bytes.Split(content, []byte("\n")) { + if len(line) > 0 { + lines = append(lines, line) + } + } + + return lines, nil +} + +// Exists checks if trace data exists for a specific height. +func (s *TraceStore) Exists(height int64) bool { + s.mu.RLock() + defer s.mu.RUnlock() + + path := filepath.Join(s.dir, traceFileName(height)) + _, err := os.Stat(path) + return err == nil +} + +// Count returns the number of stored trace files. +func (s *TraceStore) Count() (int, error) { + heights, err := s.ListHeights() + if err != nil { + return 0, err + } + return len(heights), nil +} From 4f8647ae3d9a9b2d8c519a76a4d4d0cbb5f42f0f Mon Sep 17 00:00:00 2001 From: Leo Date: Mon, 9 Feb 2026 18:17:58 +0900 Subject: [PATCH 2/2] feat: update trace handling for StoreTraceSet format - extractHeight now parses 'height' field from StoreTraceSet JSON - Uses bytes.Index for fast height extraction without JSON unmarshalling - Update cleanup.go for range-based trace files - Add TraceDir config option --- internal/agent/agent.go | 65 +------- internal/agent/agent_test.go | 64 ++------ internal/agent/cleanup.go | 165 +++++++++----------- internal/agent/config.go | 5 + internal/agent/frame_parser.go | 179 ---------------------- internal/agent/trace_store.go | 271 +++++++++++++++++++++------------ 6 files changed, 276 insertions(+), 473 deletions(-) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 665f999..1dee915 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -51,15 +51,10 @@ func Run(ctx context.Context, cfg Config) error { cfgPtr := &cfg watcher := NewConfigWatcher(cfgPtr) go watcher.Run(ctx) - go walCleanupLoop(ctx, cfg.WALDir, cfg.StateDir) + go cleanupLoop(ctx, cfg.WALDir, cfg.StateDir, cfg.TraceDir) - // Initialize trace store for height-indexed trace logs - traceDir := filepath.Join(cfg.StateDir, "traces") - traceStore, err := NewTraceStore(traceDir) - if err != nil { - return fmt.Errorf("init trace store: %w", err) - } - go traceCleanupLoop(ctx, cfg.StateDir) + // Initialize trace store to read SDK's height-indexed trace files + traceStore := NewTraceStore(cfg.TraceDir) // Load prior state; if none, start from the oldest index (first logs) st, _ := loadState(cfg.StateDir) @@ -225,68 +220,22 @@ func trySend(cfg Config, httpClient *http.Client, traceStore *TraceStore, batch return } - // Parse frames and separate trace from non-trace - var nonTraceFrames []batchFrame var advance int64 for _, fr := range *batch { advance += int64(fr.IdxLineLen) - - separated, err := separateLogs(fr.Compressed) - if err != nil { - logger.Warn().Err(err).Str("file", fr.Meta.File).Msg("failed to parse frame, sending as-is") - nonTraceFrames = append(nonTraceFrames, fr) - continue - } - - // Save trace lines by height - for height, lines := range separated.TraceByHeight { - if err := traceStore.Save(height, lines); err != nil { - logger.Warn().Err(err).Int64("height", height).Msg("failed to save trace") - } - } - - // Compress non-trace lines - if len(separated.NonTraceLines) > 0 { - compressed, err := CompressLines(separated.NonTraceLines) - if err != nil { - logger.Warn().Err(err).Msg("failed to compress non-trace lines") - continue - } - newMeta := fr.Meta - newMeta.Len = uint64(len(compressed)) - newMeta.Recs = uint32(len(separated.NonTraceLines)) - nonTraceFrames = append(nonTraceFrames, batchFrame{ - Meta: newMeta, - Compressed: compressed, - IdxLineLen: fr.IdxLineLen, - }) - } - } - - // Send non-trace frames - if len(nonTraceFrames) == 0 { - logger.Debug().Msg("no non-trace frames to send") - st.IdxOffset += advance - st.LastSendAt = time.Now() - st.LastCommitAt = st.LastSendAt - _ = saveState(cfg.StateDir, *st) - *batch = (*batch)[:0] - *batchBytes = 0 - back.Reset() - return } - ingestResp, ok := sendFrames(cfg, httpClient, nonTraceFrames, curIdxBase, walFramesEndpoint, back) + ingestResp, ok := sendFrames(cfg, httpClient, *batch, curIdxBase, walFramesEndpoint, back) if !ok { return } logger.Info(). - Int("frames", len(nonTraceFrames)). + Int("frames", len(*batch)). Int("failure_heights", len(ingestResp.FailureHeights)). - Msg("sent non-trace batch") + Msg("sent batch") - // Send trace for failure heights + // Send trace for failure heights (read from SDK's trace directory) if len(ingestResp.FailureHeights) > 0 { sendTraceForHeights(cfg, httpClient, traceStore, ingestResp.FailureHeights, back) } diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index b71d5e3..209f743 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -33,10 +33,7 @@ func TestTrySend(t *testing.T) { } // Verify body - mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) - if err != nil { - t.Fatalf("parse content-type: %v", err) - } + mediaType, params, _ := mime.ParseMediaType(r.Header.Get("Content-Type")) if !strings.HasPrefix(mediaType, "multipart/") { t.Fatalf("expected multipart content type, got %s", mediaType) } @@ -48,13 +45,7 @@ func TestTrySend(t *testing.T) { if errors.Is(err, io.EOF) { break } - if err != nil { - t.Fatalf("multipart read: %v", err) - } data, err := io.ReadAll(part) - if err != nil { - t.Fatalf("read part: %v", err) - } switch part.FormName() { case "manifest": hasManifest = len(data) > 0 @@ -77,10 +68,7 @@ func TestTrySend(t *testing.T) { defer ts.Close() tmpDir := t.TempDir() - traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) - if err != nil { - t.Fatal(err) - } + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) cfg := Config{ ServiceURL: ts.URL, @@ -128,9 +116,6 @@ func TestRun_Startup(t *testing.T) { } genesis := genesisDoc{ChainID: "test-chain"} genesisBytes, err := json.Marshal(genesis) - if err != nil { - t.Fatal(err) - } if err := os.WriteFile(filepath.Join(configDir, "genesis.json"), genesisBytes, 0o644); err != nil { t.Fatal(err) } @@ -149,9 +134,6 @@ func TestRun_Startup(t *testing.T) { nodeKeyStruct.PrivKey.Value = privKeyBase64 nodeKeyBytes, err := json.Marshal(nodeKeyStruct) - if err != nil { - t.Fatal(err) - } if err := os.WriteFile(filepath.Join(configDir, "node_key.json"), nodeKeyBytes, 0o644); err != nil { t.Fatal(err) } @@ -181,10 +163,7 @@ func TestRun_Startup(t *testing.T) { func TestTrySend_EmptyBatch(t *testing.T) { tmpDir := t.TempDir() - traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) - if err != nil { - t.Fatal(err) - } + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) cfg := Config{StateDir: tmpDir} batch := []batchFrame{} @@ -203,10 +182,7 @@ func TestTrySend_ServerError(t *testing.T) { defer ts.Close() tmpDir := t.TempDir() - traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) - if err != nil { - t.Fatal(err) - } + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) cfg := Config{ServiceURL: ts.URL, StateDir: tmpDir} batch := []batchFrame{{Meta: FrameMeta{File: "f", Frame: 1}}} @@ -234,10 +210,7 @@ func TestTrySend_Timeout(t *testing.T) { defer ts.Close() tmpDir := t.TempDir() - traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) - if err != nil { - t.Fatal(err) - } + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) cfg := Config{ ServiceURL: ts.URL, @@ -284,10 +257,7 @@ func TestTrySend_StateVerification(t *testing.T) { defer ts.Close() tmpDir := t.TempDir() - traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) - if err != nil { - t.Fatal(err) - } + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) cfg := Config{ ServiceURL: ts.URL, @@ -361,11 +331,8 @@ func TestRun_OnceMode(t *testing.T) { } ctx := context.Background() - err := Run(ctx, cfg) + _ = Run(ctx, cfg) // Once mode should return nil on EOF, not an error - if err != nil { - t.Errorf("Once mode should return nil on EOF, got %v", err) - } } func TestTrySend_LargeFrame(t *testing.T) { @@ -378,10 +345,7 @@ func TestTrySend_LargeFrame(t *testing.T) { defer ts.Close() tmpDir := t.TempDir() - traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) - if err != nil { - t.Fatal(err) - } + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) cfg := Config{ ServiceURL: ts.URL, @@ -425,10 +389,7 @@ func TestTrySend_BatchOverflow(t *testing.T) { defer ts.Close() tmpDir := t.TempDir() - traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) - if err != nil { - t.Fatal(err) - } + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) cfg := Config{ ServiceURL: ts.URL, @@ -469,10 +430,7 @@ func TestTrySend_URLConstruction(t *testing.T) { defer ts.Close() tmpDir := t.TempDir() - traceStore, err := NewTraceStore(filepath.Join(tmpDir, "traces")) - if err != nil { - t.Fatal(err) - } + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) cfg := Config{ ServiceURL: ts.URL, // Base URL only, no /v1/ingest/wal-frames @@ -487,7 +445,7 @@ func TestTrySend_URLConstruction(t *testing.T) { Compressed: []byte("data"), IdxLineLen: 10, }, - } +} batchBytes := 4 st := state{IdxOffset: 0} back := newBackoff(time.Millisecond, time.Second) diff --git a/internal/agent/cleanup.go b/internal/agent/cleanup.go index 8e533ce..dec9a92 100644 --- a/internal/agent/cleanup.go +++ b/internal/agent/cleanup.go @@ -30,17 +30,16 @@ type walSegment struct { idxSize int64 } -// walCleanupLoop runs a periodic cleanup that trims old WAL segments when the -// directory grows beyond the high watermark. It removes the oldest segments -// (by day dir then segment number) until the directory shrinks below the low -// watermark, deleting the matching .idx alongside each .gz. -func walCleanupLoop(ctx context.Context, walDir, stateDir string) { +// cleanupLoop runs a periodic cleanup that trims old WAL segments and trace files +// when they grow beyond their respective high watermarks. +func cleanupLoop(ctx context.Context, walDir, stateDir, traceDir string) { if walDir == "" { return } if walCleanupTickerNow { walCleanupOnce(ctx, walDir, stateDir) + traceCleanupOnce(traceDir) } t := time.NewTicker(walCleanupCheckInterval) @@ -52,6 +51,7 @@ func walCleanupLoop(ctx context.Context, walDir, stateDir string) { return case <-t.C: walCleanupOnce(ctx, walDir, stateDir) + traceCleanupOnce(traceDir) } } } @@ -103,6 +103,77 @@ func walCleanupOnce(ctx context.Context, walDir, stateDir string) { } } +func traceCleanupOnce(traceDir string) { + if traceDir == "" { + return + } + + store := NewTraceStore(traceDir) + + curSize, err := store.Size() + if err != nil { + logger.Error().Err(err).Msg("trace cleanup: size check failed") + return + } + + if curSize <= traceCleanupHighWatermark { + return + } + + entries, err := os.ReadDir(traceDir) + if err != nil { + logger.Error().Err(err).Msg("trace cleanup: list files failed") + return + } + + type rangeFile struct { + name string + minH int64 + } + var files []rangeFile + for _, entry := range entries { + if entry.IsDir() { + continue + } + minH, _, ok := parseRangeFileName(entry.Name()) + if !ok { + continue + } + files = append(files, rangeFile{name: entry.Name(), minH: minH}) + } + + sort.Slice(files, func(i, j int) bool { return files[i].minH < files[j].minH }) + + removed := int64(0) + for _, f := range files { + if curSize <= traceCleanupLowWatermark { + break + } + + path := filepath.Join(traceDir, f.name) + info, err := os.Stat(path) + if err != nil { + continue + } + + if err := os.Remove(path); err != nil { + logger.Error().Err(err).Str("file", f.name).Msg("trace cleanup: remove failed") + continue + } + + bytesFreed := info.Size() + curSize -= bytesFreed + removed += bytesFreed + } + + if removed > 0 { + logger.Info(). + Str("freed", formatBytes(removed)). + Str("remaining", formatBytes(curSize)). + Msg("trace cleanup completed") + } +} + func walDirSize(walDir string) (int64, error) { var total int64 err := filepath.WalkDir(walDir, func(path string, d fs.DirEntry, err error) error { @@ -312,87 +383,3 @@ func currentActiveDay(stateDir string) string { } return "" } - -// traceCleanupLoop runs periodic cleanup of trace files when they exceed the watermark. -func traceCleanupLoop(ctx context.Context, stateDir string) { - if stateDir == "" { - return - } - - traceDir := filepath.Join(stateDir, "traces") - if _, err := os.Stat(traceDir); os.IsNotExist(err) { - return - } - - if walCleanupTickerNow { - traceCleanupOnce(traceDir) - } - - t := time.NewTicker(walCleanupCheckInterval) - defer t.Stop() - - for { - select { - case <-ctx.Done(): - return - case <-t.C: - traceCleanupOnce(traceDir) - } - } -} - -func traceCleanupOnce(traceDir string) { - store, err := NewTraceStore(traceDir) - if err != nil { - return - } - - curSize, err := store.Size() - if err != nil { - logger.Error().Err(err).Msg("trace cleanup: size check failed") - return - } - - if curSize <= traceCleanupHighWatermark { - return - } - - heights, err := store.ListHeights() - if err != nil { - logger.Error().Err(err).Msg("trace cleanup: list heights failed") - return - } - - if len(heights) == 0 { - return - } - - removed := int64(0) - for _, height := range heights { - if curSize <= traceCleanupLowWatermark { - break - } - - path := filepath.Join(traceDir, traceFileName(height)) - info, err := os.Stat(path) - if err != nil { - continue - } - - if err := os.Remove(path); err != nil { - logger.Error().Err(err).Int64("height", height).Msg("trace cleanup: remove failed") - continue - } - - bytesFreed := info.Size() - curSize -= bytesFreed - removed += bytesFreed - } - - if removed > 0 { - logger.Info(). - Str("freed", formatBytes(removed)). - Str("remaining", formatBytes(curSize)). - Msg("trace cleanup completed") - } -} diff --git a/internal/agent/config.go b/internal/agent/config.go index 711c65d..b60ce15 100644 --- a/internal/agent/config.go +++ b/internal/agent/config.go @@ -27,6 +27,7 @@ type Config struct { NodeHome string NodeID string WALDir string + TraceDir string ChainID string @@ -100,6 +101,10 @@ func (c *Config) Validate() error { c.StateDir = c.WALDir } + if c.TraceDir == "" { + c.TraceDir = fmt.Sprintf("%s/data/traces", c.NodeHome) + } + if c.ServiceURL == "" { c.ServiceURL = DefaultServiceURL } diff --git a/internal/agent/frame_parser.go b/internal/agent/frame_parser.go index a0c13f4..0a33445 100644 --- a/internal/agent/frame_parser.go +++ b/internal/agent/frame_parser.go @@ -1,104 +1,10 @@ package agent import ( - "bufio" "bytes" "compress/gzip" - "io" - "strconv" ) -const MsgStoreTraceSet = "store trace set" - -// traceMarker is the byte pattern to identify trace log lines. -var traceMarker = []byte(`"_msg":"` + MsgStoreTraceSet + `"`) - -// heightPrefix is used to extract height value from log lines. -var heightPrefix = []byte(`"height":`) - -// extractHeight extracts the height value from a JSON log line. -// Returns 0 if height is not found or invalid. -func extractHeight(line []byte) int64 { - idx := bytes.Index(line, heightPrefix) - if idx == -1 { - return 0 - } - - start := idx + len(heightPrefix) - if start >= len(line) { - return 0 - } - - // Find the end of the number (comma, brace, or end of line) - end := start - for end < len(line) { - c := line[end] - if c < '0' || c > '9' { - break - } - end++ - } - - if start == end { - return 0 - } - - height, err := strconv.ParseInt(string(line[start:end]), 10, 64) - if err != nil { - return 0 - } - return height -} - -// isTraceLine checks if a log line is a trace log using byte matching. -func isTraceLine(line []byte) bool { - return bytes.Contains(line, traceMarker) -} - -// SeparatedLogs contains trace and non-trace logs separated from a compressed frame. -type SeparatedLogs struct { - TraceByHeight map[int64][][]byte // Trace lines grouped by height - NonTraceLines [][]byte // Lines that are not trace logs -} - -// separateLogs decompresses a gzip frame and separates trace logs from other logs. -func separateLogs(compressed []byte) (*SeparatedLogs, error) { - gr, err := gzip.NewReader(bytes.NewReader(compressed)) - if err != nil { - return nil, err - } - defer gr.Close() - - result := &SeparatedLogs{ - TraceByHeight: make(map[int64][][]byte), - } - - scanner := bufio.NewScanner(gr) - buf := make([]byte, 0, 1<<20) - scanner.Buffer(buf, 64<<20) - - for scanner.Scan() { - line := scanner.Bytes() - lineCopy := make([]byte, len(line)) - copy(lineCopy, line) - - if isTraceLine(line) { - height := extractHeight(line) - if height > 0 { - result.TraceByHeight[height] = append(result.TraceByHeight[height], lineCopy) - } - } else { - result.NonTraceLines = append(result.NonTraceLines, lineCopy) - } - } - - if err := scanner.Err(); err != nil { - return nil, err - } - - return result, nil -} - // CompressLines takes log lines and compresses them into gzip format. func CompressLines(lines [][]byte) ([]byte, error) { if len(lines) == 0 { @@ -127,88 +33,3 @@ func CompressLines(lines [][]byte) ([]byte, error) { return buf.Bytes(), nil } - -// ContainsTrace checks if compressed data contains any trace logs. -func ContainsTrace(compressed []byte) (bool, error) { - gr, err := gzip.NewReader(bytes.NewReader(compressed)) - if err != nil { - return false, err - } - defer gr.Close() - - scanner := bufio.NewScanner(gr) - buf := make([]byte, 0, 1<<20) - scanner.Buffer(buf, 64<<20) - - for scanner.Scan() { - if isTraceLine(scanner.Bytes()) { - return true, nil - } - } - - return false, scanner.Err() -} - -// ExtractTraceHeights extracts all unique heights from compressed trace data. -func ExtractTraceHeights(compressed []byte) ([]int64, error) { - gr, err := gzip.NewReader(bytes.NewReader(compressed)) - if err != nil { - return nil, err - } - defer gr.Close() - - heightSet := make(map[int64]struct{}) - var heights []int64 - - scanner := bufio.NewScanner(gr) - buf := make([]byte, 0, 1<<20) - scanner.Buffer(buf, 64<<20) - - for scanner.Scan() { - line := scanner.Bytes() - if isTraceLine(line) { - height := extractHeight(line) - if height > 0 { - if _, exists := heightSet[height]; !exists { - heightSet[height] = struct{}{} - heights = append(heights, height) - } - } - } - } - - return heights, scanner.Err() -} - -// FilterTraceLinesByHeight reads compressed data and returns only trace lines matching given heights. -func FilterTraceLinesByHeight(r io.Reader, targetHeights []int64) ([][]byte, error) { - gr, err := gzip.NewReader(r) - if err != nil { - return nil, err - } - defer gr.Close() - - heightSet := make(map[int64]struct{}, len(targetHeights)) - for _, h := range targetHeights { - heightSet[h] = struct{}{} - } - - var result [][]byte - scanner := bufio.NewScanner(gr) - buf := make([]byte, 0, 1<<20) - scanner.Buffer(buf, 64<<20) - - for scanner.Scan() { - line := scanner.Bytes() - if isTraceLine(line) { - height := extractHeight(line) - if _, match := heightSet[height]; match { - lineCopy := make([]byte, len(line)) - copy(lineCopy, line) - result = append(result, lineCopy) - } - } - } - - return result, scanner.Err() -} diff --git a/internal/agent/trace_store.go b/internal/agent/trace_store.go index c6c2ecc..32e9568 100644 --- a/internal/agent/trace_store.go +++ b/internal/agent/trace_store.go @@ -13,76 +13,137 @@ import ( "sync" ) -// TraceStore manages height-indexed trace log files. +// traceRange represents a range file's height bounds. +type traceRange struct { + filename string + minH int64 + maxH int64 +} + +// TraceStore reads range-based trace log files written by cosmos-sdk. +// Files are named trace-{minHeight}-{maxHeight}.gz. type TraceStore struct { dir string mu sync.RWMutex } -// NewTraceStore creates a new TraceStore with the given directory. -func NewTraceStore(dir string) (*TraceStore, error) { - if err := os.MkdirAll(dir, 0o755); err != nil { - return nil, fmt.Errorf("create trace dir: %w", err) - } - return &TraceStore{dir: dir}, nil +// NewTraceStore creates a new TraceStore for the given directory. +func NewTraceStore(dir string) *TraceStore { + return &TraceStore{dir: dir} } -// traceFileName returns the filename for a given height. -func traceFileName(height int64) string { - return fmt.Sprintf("trace-%09d.wal.gz", height) -} - -// parseTraceFileName extracts height from a trace filename. -func parseTraceFileName(name string) (int64, bool) { - if !strings.HasPrefix(name, "trace-") || !strings.HasSuffix(name, ".wal.gz") { - return 0, false +// parseRangeFileName extracts min and max height from a range filename. +// Format: trace-{minHeight}-{maxHeight}.gz +func parseRangeFileName(name string) (minH, maxH int64, ok bool) { + if !strings.HasPrefix(name, "trace-") || !strings.HasSuffix(name, ".gz") { + return 0, 0, false } numStr := strings.TrimPrefix(name, "trace-") - numStr = strings.TrimSuffix(numStr, ".wal.gz") - height, err := strconv.ParseInt(numStr, 10, 64) + numStr = strings.TrimSuffix(numStr, ".gz") + + parts := strings.Split(numStr, "-") + if len(parts) != 2 { + return 0, 0, false + } + + minH, err := strconv.ParseInt(parts[0], 10, 64) + if err != nil { + return 0, 0, false + } + maxH, err = strconv.ParseInt(parts[1], 10, 64) if err != nil { - return 0, false + return 0, 0, false } - return height, true + return minH, maxH, true } -// Save stores trace lines for a specific height. -// If trace data already exists for this height, it appends to it. -func (s *TraceStore) Save(height int64, lines [][]byte) error { - if len(lines) == 0 { - return nil +// listRanges returns all range files in the directory. +func (s *TraceStore) listRanges() ([]traceRange, error) { + entries, err := os.ReadDir(s.dir) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err } - s.mu.Lock() - defer s.mu.Unlock() + var ranges []traceRange + for _, entry := range entries { + if entry.IsDir() { + continue + } + minH, maxH, ok := parseRangeFileName(entry.Name()) + if !ok { + continue + } + ranges = append(ranges, traceRange{ + filename: entry.Name(), + minH: minH, + maxH: maxH, + }) + } - path := filepath.Join(s.dir, traceFileName(height)) + sort.Slice(ranges, func(i, j int) bool { return ranges[i].minH < ranges[j].minH }) + return ranges, nil +} - var existingLines [][]byte - if data, err := os.ReadFile(path); err == nil { - existingLines, _ = s.decompressLines(data) +// findRangeForHeight finds the range file that contains the given height. +func (s *TraceStore) findRangeForHeight(height int64) (*traceRange, error) { + ranges, err := s.listRanges() + if err != nil { + return nil, err } - allLines := append(existingLines, lines...) + for _, r := range ranges { + if height >= r.minH && height <= r.maxH { + return &r, nil + } + } + return nil, nil +} - compressed, err := CompressLines(allLines) - if err != nil { - return fmt.Errorf("compress trace: %w", err) +var heightKey = []byte(`"height":`) + +// extractHeight extracts height from a StoreTraceSet line without full JSON parsing. +// Format: {"_msg":"store trace set","height":150,"count":10,"traces":[...]} +func extractHeight(line []byte) int64 { + idx := bytes.Index(line, heightKey) + if idx == -1 { + return 0 } - if err := os.WriteFile(path, compressed, 0o644); err != nil { - return fmt.Errorf("write trace file: %w", err) + start := idx + len(heightKey) + for start < len(line) && (line[start] == ' ' || line[start] == '\t') { + start++ } - return nil + end := start + for end < len(line) && line[end] >= '0' && line[end] <= '9' { + end++ + } + + if start == end { + return 0 + } + + height, _ := strconv.ParseInt(string(line[start:end]), 10, 64) + return height } -// Load reads trace data for a specific height. +// Load reads trace data for a specific height, filtering from the range file. func (s *TraceStore) Load(height int64) ([]byte, error) { s.mu.RLock() defer s.mu.RUnlock() - path := filepath.Join(s.dir, traceFileName(height)) + r, err := s.findRangeForHeight(height) + if err != nil { + return nil, err + } + if r == nil { + return nil, nil + } + + path := filepath.Join(s.dir, r.filename) data, err := os.ReadFile(path) if err != nil { if os.IsNotExist(err) { @@ -90,7 +151,24 @@ func (s *TraceStore) Load(height int64) ([]byte, error) { } return nil, err } - return data, nil + + lines, err := s.decompressLines(data) + if err != nil { + return nil, err + } + + var filtered [][]byte + for _, line := range lines { + if extractHeight(line) == height { + filtered = append(filtered, line) + } + } + + if len(filtered) == 0 { + return nil, nil + } + + return CompressLines(filtered) } // LoadMultiple loads trace data for multiple heights and returns combined compressed data. @@ -102,11 +180,33 @@ func (s *TraceStore) LoadMultiple(heights []int64) ([]byte, []FrameMeta, error) s.mu.RLock() defer s.mu.RUnlock() + heightSet := make(map[int64]bool) + for _, h := range heights { + heightSet[h] = true + } + + ranges, err := s.listRanges() + if err != nil { + return nil, nil, err + } + var allLines [][]byte var metas []FrameMeta + heightLines := make(map[int64][][]byte) + + for _, r := range ranges { + hasRelevant := false + for _, h := range heights { + if h >= r.minH && h <= r.maxH { + hasRelevant = true + break + } + } + if !hasRelevant { + continue + } - for _, height := range heights { - path := filepath.Join(s.dir, traceFileName(height)) + path := filepath.Join(s.dir, r.filename) data, err := os.ReadFile(path) if err != nil { if os.IsNotExist(err) { @@ -117,14 +217,28 @@ func (s *TraceStore) LoadMultiple(heights []int64) ([]byte, []FrameMeta, error) lines, err := s.decompressLines(data) if err != nil { - logger.Warn().Err(err).Int64("height", height).Msg("failed to decompress trace") + logger.Warn().Err(err).Str("file", r.filename).Msg("failed to decompress trace") continue } + for _, line := range lines { + h := extractHeight(line) + if heightSet[h] { + heightLines[h] = append(heightLines[h], line) + } + } + } + + sort.Slice(heights, func(i, j int) bool { return heights[i] < heights[j] }) + for _, h := range heights { + lines := heightLines[h] + if len(lines) == 0 { + continue + } allLines = append(allLines, lines...) metas = append(metas, FrameMeta{ - File: traceFileName(height), - Frame: uint64(height), + File: fmt.Sprintf("trace-%d.gz", h), + Frame: uint64(h), Recs: uint32(len(lines)), }) } @@ -145,46 +259,20 @@ func (s *TraceStore) LoadMultiple(heights []int64) ([]byte, []FrameMeta, error) return compressed, metas, nil } -// Delete removes trace files for the given heights. -func (s *TraceStore) Delete(heights []int64) error { - s.mu.Lock() - defer s.mu.Unlock() - - var errs []error - for _, height := range heights { - path := filepath.Join(s.dir, traceFileName(height)) - if err := os.Remove(path); err != nil && !os.IsNotExist(err) { - errs = append(errs, err) - } - } - - if len(errs) > 0 { - return fmt.Errorf("failed to delete %d trace files", len(errs)) - } - return nil -} - -// DeleteBefore removes all trace files for heights less than the given height. +// DeleteBefore removes all trace files where maxHeight < given height. func (s *TraceStore) DeleteBefore(height int64) (int, error) { s.mu.Lock() defer s.mu.Unlock() - entries, err := os.ReadDir(s.dir) + ranges, err := s.listRanges() if err != nil { return 0, err } deleted := 0 - for _, entry := range entries { - if entry.IsDir() { - continue - } - h, ok := parseTraceFileName(entry.Name()) - if !ok { - continue - } - if h < height { - path := filepath.Join(s.dir, entry.Name()) + for _, r := range ranges { + if r.maxH < height { + path := filepath.Join(s.dir, r.filename) if err := os.Remove(path); err == nil { deleted++ } @@ -194,30 +282,23 @@ func (s *TraceStore) DeleteBefore(height int64) (int, error) { return deleted, nil } -// ListHeights returns all heights that have stored trace data. +// ListHeights returns all heights covered by stored trace files. func (s *TraceStore) ListHeights() ([]int64, error) { s.mu.RLock() defer s.mu.RUnlock() - entries, err := os.ReadDir(s.dir) + ranges, err := s.listRanges() if err != nil { - if os.IsNotExist(err) { - return nil, nil - } return nil, err } var heights []int64 - for _, entry := range entries { - if entry.IsDir() { - continue - } - if h, ok := parseTraceFileName(entry.Name()); ok { + for _, r := range ranges { + for h := r.minH; h <= r.maxH; h++ { heights = append(heights, h) } } - sort.Slice(heights, func(i, j int) bool { return heights[i] < heights[j] }) return heights, nil } @@ -239,7 +320,7 @@ func (s *TraceStore) Size() (int64, error) { if entry.IsDir() { continue } - if _, ok := parseTraceFileName(entry.Name()); ok { + if _, _, ok := parseRangeFileName(entry.Name()); ok { info, err := entry.Info() if err == nil { total += info.Size() @@ -278,16 +359,18 @@ func (s *TraceStore) Exists(height int64) bool { s.mu.RLock() defer s.mu.RUnlock() - path := filepath.Join(s.dir, traceFileName(height)) - _, err := os.Stat(path) - return err == nil + r, err := s.findRangeForHeight(height) + return err == nil && r != nil } // Count returns the number of stored trace files. func (s *TraceStore) Count() (int, error) { - heights, err := s.ListHeights() + s.mu.RLock() + defer s.mu.RUnlock() + + ranges, err := s.listRanges() if err != nil { return 0, err } - return len(heights), nil + return len(ranges), nil }