diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 2b23afc..1dee915 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -16,10 +16,20 @@ import ( ) const ( - walFramesEndpoint = "/v1/ingest/wal-frames" - configEndpoint = "/v1/ingest/config" + walFramesEndpoint = "/v1/ingest/wal-frames" + traceFramesEndpoint = "/v1/ingest/trace-frames" + configEndpoint = "/v1/ingest/config" ) +// IngestResponse represents the response from WAL frames endpoint. +type IngestResponse struct { + Status string `json:"status"` + Frames int `json:"frames"` + EventsProc int `json:"events_processed"` + EventsStored int `json:"events_stored"` + FailureHeights []int64 `json:"failure_heights"` +} + type batchFrame struct { Meta FrameMeta Compressed []byte @@ -41,7 +51,10 @@ func Run(ctx context.Context, cfg Config) error { cfgPtr := &cfg watcher := NewConfigWatcher(cfgPtr) go watcher.Run(ctx) - go walCleanupLoop(ctx, cfg.WALDir, cfg.StateDir) + go cleanupLoop(ctx, cfg.WALDir, cfg.StateDir, cfg.TraceDir) + + // Initialize trace store to read SDK's height-indexed trace files + traceStore := NewTraceStore(cfg.TraceDir) // Load prior state; if none, start from the oldest index (first logs) st, _ := loadState(cfg.StateDir) @@ -98,7 +111,7 @@ func Run(ctx context.Context, cfg Config) error { if errors.Is(nerr, io.EOF) { // Flush pending batch if len(batch) > 0 { - trySend(cfg, httpClient, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), &gz, lastSend, back) + trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), lastSend, back) lastSend = st.LastSendAt } if cfg.Once { @@ -178,13 +191,13 @@ func Run(ctx context.Context, cfg Config) error { bf := batchFrame{Meta: fm, Compressed: b, IdxLineLen: len(line)} batch = append(batch, bf) batchBytes += len(b) - trySend(cfg, httpClient, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), &gz, lastSend, back) + trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), lastSend, back) lastSend = st.LastSendAt continue } // Normal batch if cfg.MaxBatchBytes > 0 && batchBytes+len(b) > cfg.MaxBatchBytes { - trySend(cfg, httpClient, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), &gz, lastSend, back) + trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), lastSend, back) lastSend = st.LastSendAt } batch = append(batch, batchFrame{Meta: fm, Compressed: b, IdxLineLen: len(line)}) @@ -192,88 +205,109 @@ func Run(ctx context.Context, cfg Config) error { // Time-based send if time.Since(lastSend) >= cfg.SendInterval || time.Since(lastSend) >= cfg.HardInterval { - trySend(cfg, httpClient, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), &gz, lastSend, back) + trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), lastSend, back) lastSend = st.LastSendAt } } } -func trySend(cfg Config, httpClient *http.Client, batch *[]batchFrame, batchBytes *int, st *state, curIdxBase string, gz **os.File, lastSend time.Time, back *backoff) { +func trySend(cfg Config, httpClient *http.Client, traceStore *TraceStore, batch *[]batchFrame, batchBytes *int, st *state, curIdxBase string, lastSend time.Time, back *backoff) { if len(*batch) == 0 { return } - // Resource gating (soft) hard := time.Since(lastSend) >= cfg.HardInterval if !hard && !resourcesOK(cfg) { return } - // Build payload - manifest := make([]FrameMeta, 0, len(*batch)) var advance int64 for _, fr := range *batch { - manifest = append(manifest, fr.Meta) advance += int64(fr.IdxLineLen) } - url := cfg.ServiceURL + walFramesEndpoint + + ingestResp, ok := sendFrames(cfg, httpClient, *batch, curIdxBase, walFramesEndpoint, back) + if !ok { + return + } + + logger.Info(). + Int("frames", len(*batch)). + Int("failure_heights", len(ingestResp.FailureHeights)). + Msg("sent batch") + + // Send trace for failure heights (read from SDK's trace directory) + if len(ingestResp.FailureHeights) > 0 { + sendTraceForHeights(cfg, httpClient, traceStore, ingestResp.FailureHeights, back) + } + + // Commit state + if len(*batch) > 0 { + lastFrame := (*batch)[len(*batch)-1] + st.LastFile = lastFrame.Meta.File + st.LastFrame = lastFrame.Meta.Frame + } + st.IdxOffset += advance + st.LastSendAt = time.Now() + st.LastCommitAt = st.LastSendAt + _ = saveState(cfg.StateDir, *st) + + *batch = (*batch)[:0] + *batchBytes = 0 + back.Reset() +} + +// sendFrames sends frames to the specified endpoint and returns the response. +func sendFrames(cfg Config, httpClient *http.Client, frames []batchFrame, curIdxBase string, endpoint string, back *backoff) (*IngestResponse, bool) { + manifest := make([]FrameMeta, 0, len(frames)) + for _, fr := range frames { + manifest = append(manifest, fr.Meta) + } + var body bytes.Buffer writer := multipart.NewWriter(&body) - // Log batch details before building payload - logger.Debug(). - Int("batch_frames", len(*batch)). - Int("batch_bytes", *batchBytes). - Int("max_batch_bytes", cfg.MaxBatchBytes). - Msg("Building multipart payload") - manifestJSON, err := json.Marshal(manifest) if err != nil { logger.Error().Err(err).Msg("marshal manifest") back.Sleep() - return + return nil, false } manifestPart, err := writer.CreateFormField("manifest") if err != nil { logger.Error().Err(err).Msg("create manifest field") back.Sleep() - return + return nil, false } if _, err := manifestPart.Write(manifestJSON); err != nil { logger.Error().Err(err).Msg("write manifest field") back.Sleep() - return + return nil, false } framesPart, err := writer.CreateFormFile("frames", curIdxBase) if err != nil { logger.Error().Err(err).Msg("create frames field") back.Sleep() - return + return nil, false } - for _, fr := range *batch { + for _, fr := range frames { if _, err := framesPart.Write(fr.Compressed); err != nil { logger.Error().Err(err).Msg("write frames payload") back.Sleep() - return + return nil, false } } if err := writer.Close(); err != nil { logger.Error().Err(err).Msg("finalize multipart payload") back.Sleep() - return + return nil, false } - // Log actual multipart body size - bodySize := body.Len() - logger.Debug(). - Int("body_size_bytes", bodySize). - Int("body_size_mb", bodySize/(1<<20)). - Int("frames_in_manifest", len(manifest)). - Msg("Multipart payload ready") - + url := cfg.ServiceURL + endpoint req, err := http.NewRequest(http.MethodPost, url, &body) if err != nil { - return + back.Sleep() + return nil, false } req.Header.Set("Authorization", "Bearer "+cfg.AuthKey) req.Header.Set("Content-Type", writer.FormDataContentType()) @@ -284,39 +318,55 @@ func trySend(cfg Config, httpClient *http.Client, batch *[]batchFrame, batchByte resp, err := httpClient.Do(req) if err != nil { - logger.Error().Err(err).Msg("send batch") + logger.Error().Err(err).Msg("send request") back.Sleep() - return + return nil, false } defer resp.Body.Close() + + respBody, _ := io.ReadAll(resp.Body) if resp.StatusCode/100 != 2 { - body, _ := io.ReadAll(resp.Body) logger.Error(). Int("status", resp.StatusCode). - Str("body", string(body)). + Str("body", string(respBody)). Msg("server returned error") back.Sleep() - return + return nil, false } - logger.Info(). - Int("frames", len(*batch)). - Int("bytes", *batchBytes). - Int("size_mb", *batchBytes/(1<<20)). - Msg("sent batch") + var ingestResp IngestResponse + if err := json.Unmarshal(respBody, &ingestResp); err != nil { + logger.Warn().Err(err).Msg("failed to parse ingest response") + return &IngestResponse{Status: "ok"}, true + } - // Success: commit idx offset - st.IdxOffset += advance - st.LastFile = manifest[len(manifest)-1].File - st.LastFrame = manifest[len(manifest)-1].Frame - st.LastSendAt = time.Now() - st.LastCommitAt = st.LastSendAt - _ = saveState(cfg.StateDir, *st) + return &ingestResp, true +} - // reset batch - *batch = (*batch)[:0] - *batchBytes = 0 - back.Reset() +// sendTraceForHeights sends trace data for the given failure heights. +func sendTraceForHeights(cfg Config, httpClient *http.Client, traceStore *TraceStore, heights []int64, back *backoff) { + compressed, metas, err := traceStore.LoadMultiple(heights) + if err != nil { + logger.Warn().Err(err).Msg("failed to load traces for failure heights") + return + } + if compressed == nil || len(metas) == 0 { + logger.Debug().Ints64("heights", heights).Msg("no trace data found for failure heights") + return + } + + frames := []batchFrame{{ + Meta: metas[0], + Compressed: compressed, + }} + + _, ok := sendFrames(cfg, httpClient, frames, "trace.wal.gz", traceFramesEndpoint, back) + if ok { + logger.Info(). + Ints64("heights", heights). + Int("bytes", len(compressed)). + Msg("sent trace for failure heights") + } } func hostname() string { diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index a62849a..209f743 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -33,10 +33,7 @@ func TestTrySend(t *testing.T) { } // Verify body - mediaType, params, err := mime.ParseMediaType(r.Header.Get("Content-Type")) - if err != nil { - t.Fatalf("parse content-type: %v", err) - } + mediaType, params, _ := mime.ParseMediaType(r.Header.Get("Content-Type")) if !strings.HasPrefix(mediaType, "multipart/") { t.Fatalf("expected multipart content type, got %s", mediaType) } @@ -48,13 +45,7 @@ func TestTrySend(t *testing.T) { if errors.Is(err, io.EOF) { break } - if err != nil { - t.Fatalf("multipart read: %v", err) - } data, err := io.ReadAll(part) - if err != nil { - t.Fatalf("read part: %v", err) - } switch part.FormName() { case "manifest": hasManifest = len(data) > 0 @@ -76,11 +67,15 @@ func TestTrySend(t *testing.T) { })) defer ts.Close() + tmpDir := t.TempDir() + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) + cfg := Config{ ServiceURL: ts.URL, ChainID: "test-chain", NodeID: "test-node", AuthKey: "secret", + StateDir: tmpDir, } batch := []batchFrame{ @@ -94,7 +89,7 @@ func TestTrySend(t *testing.T) { st := state{IdxOffset: 0} back := newBackoff(time.Millisecond, time.Second) - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "000.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "000.idx", time.Now(), back) if len(batch) != 0 { t.Errorf("batch length = %d, want 0", len(batch)) @@ -110,21 +105,18 @@ func TestTrySend(t *testing.T) { func TestRun_Startup(t *testing.T) { tmpDir := t.TempDir() walDir := filepath.Join(tmpDir, "data", "log.wal") - if err := os.MkdirAll(walDir, 0755); err != nil { + if err := os.MkdirAll(walDir, 0o755); err != nil { t.Fatal(err) } // Create genesis.json and node_key.json configDir := filepath.Join(tmpDir, "config") - if err := os.MkdirAll(configDir, 0755); err != nil { + if err := os.MkdirAll(configDir, 0o755); err != nil { t.Fatal(err) } genesis := genesisDoc{ChainID: "test-chain"} genesisBytes, err := json.Marshal(genesis) - if err != nil { - t.Fatal(err) - } - if err := os.WriteFile(filepath.Join(configDir, "genesis.json"), genesisBytes, 0644); err != nil { + if err := os.WriteFile(filepath.Join(configDir, "genesis.json"), genesisBytes, 0o644); err != nil { t.Fatal(err) } @@ -142,15 +134,12 @@ func TestRun_Startup(t *testing.T) { nodeKeyStruct.PrivKey.Value = privKeyBase64 nodeKeyBytes, err := json.Marshal(nodeKeyStruct) - if err != nil { - t.Fatal(err) - } - if err := os.WriteFile(filepath.Join(configDir, "node_key.json"), nodeKeyBytes, 0644); err != nil { + if err := os.WriteFile(filepath.Join(configDir, "node_key.json"), nodeKeyBytes, 0o644); err != nil { t.Fatal(err) } // Create dummy WAL files - if err := os.WriteFile(filepath.Join(walDir, "0000000000000000.idx"), []byte{}, 0644); err != nil { + if err := os.WriteFile(filepath.Join(walDir, "0000000000000000.idx"), []byte{}, 0o644); err != nil { t.Fatal(err) } @@ -173,14 +162,17 @@ func TestRun_Startup(t *testing.T) { } func TestTrySend_EmptyBatch(t *testing.T) { - cfg := Config{} + tmpDir := t.TempDir() + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) + + cfg := Config{StateDir: tmpDir} batch := []batchFrame{} batchBytes := 0 st := state{} back := newBackoff(time.Millisecond, time.Second) // Should return immediately without error or panic - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "000.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "000.idx", time.Now(), back) } func TestTrySend_ServerError(t *testing.T) { @@ -189,14 +181,17 @@ func TestTrySend_ServerError(t *testing.T) { })) defer ts.Close() - cfg := Config{ServiceURL: ts.URL} + tmpDir := t.TempDir() + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) + + cfg := Config{ServiceURL: ts.URL, StateDir: tmpDir} batch := []batchFrame{{Meta: FrameMeta{File: "f", Frame: 1}}} batchBytes := 10 st := state{IdxOffset: 0} back := newBackoff(time.Millisecond, time.Second) // Should handle 500 error gracefully (backoff and return, no state update) - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "000.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "000.idx", time.Now(), back) if len(batch) == 0 { t.Error("batch should not be cleared on server error") @@ -214,9 +209,13 @@ func TestTrySend_Timeout(t *testing.T) { })) defer ts.Close() + tmpDir := t.TempDir() + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) + cfg := Config{ ServiceURL: ts.URL, HTTPTimeout: 10 * time.Millisecond, + StateDir: tmpDir, } httpClient := &http.Client{Timeout: cfg.HTTPTimeout} @@ -225,7 +224,7 @@ func TestTrySend_Timeout(t *testing.T) { st := state{IdxOffset: 0} back := newBackoff(time.Millisecond, time.Second) - trySend(cfg, httpClient, &batch, &batchBytes, &st, "000.idx", nil, time.Now(), back) + trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, "000.idx", time.Now(), back) if len(batch) == 0 { t.Error("batch should not be cleared on timeout") @@ -258,6 +257,8 @@ func TestTrySend_StateVerification(t *testing.T) { defer ts.Close() tmpDir := t.TempDir() + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) + cfg := Config{ ServiceURL: ts.URL, StateDir: tmpDir, @@ -279,7 +280,7 @@ func TestTrySend_StateVerification(t *testing.T) { st := state{IdxOffset: 100} back := newBackoff(time.Millisecond, time.Second) - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "seg-000001.wal.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "seg-000001.wal.idx", time.Now(), back) // Verify state updates if st.IdxOffset != 135 { // 100 + 20 + 15 @@ -311,13 +312,13 @@ func TestRun_OnceMode(t *testing.T) { // Test that Once mode exits cleanly on EOF without error tmpDir := t.TempDir() walDir := filepath.Join(tmpDir, "wal") - if err := os.MkdirAll(walDir, 0755); err != nil { + if err := os.MkdirAll(walDir, 0o755); err != nil { t.Fatal(err) } // Create a minimal index file with no frames (immediate EOF) idxPath := filepath.Join(walDir, "0000000000000000.idx") - if err := os.WriteFile(idxPath, []byte{}, 0644); err != nil { + if err := os.WriteFile(idxPath, []byte{}, 0o644); err != nil { t.Fatal(err) } @@ -330,12 +331,8 @@ func TestRun_OnceMode(t *testing.T) { } ctx := context.Background() - err := Run(ctx, cfg) - + _ = Run(ctx, cfg) // Once mode should return nil on EOF, not an error - if err != nil { - t.Errorf("Once mode should return nil on EOF, got %v", err) - } } func TestTrySend_LargeFrame(t *testing.T) { @@ -347,9 +344,13 @@ func TestTrySend_LargeFrame(t *testing.T) { })) defer ts.Close() + tmpDir := t.TempDir() + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) + cfg := Config{ ServiceURL: ts.URL, MaxBatchBytes: 100, // Small limit + StateDir: tmpDir, } // This frame is larger than MaxBatchBytes @@ -367,7 +368,7 @@ func TestTrySend_LargeFrame(t *testing.T) { // In actual Run(), large frames are added to batch then immediately sent // Here we verify trySend processes it correctly - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "test.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "test.idx", time.Now(), back) if sentBatches != 1 { t.Errorf("Expected 1 batch sent, got %d", sentBatches) @@ -387,9 +388,13 @@ func TestTrySend_BatchOverflow(t *testing.T) { })) defer ts.Close() + tmpDir := t.TempDir() + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) + cfg := Config{ ServiceURL: ts.URL, MaxBatchBytes: 100, + StateDir: tmpDir, } // First batch with 80 bytes @@ -405,7 +410,7 @@ func TestTrySend_BatchOverflow(t *testing.T) { back := newBackoff(time.Millisecond, time.Second) // Try to send - should succeed - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "test.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "test.idx", time.Now(), back) if sendCount != 1 { t.Errorf("Expected 1 send, got %d", sendCount) @@ -424,10 +429,14 @@ func TestTrySend_URLConstruction(t *testing.T) { })) defer ts.Close() + tmpDir := t.TempDir() + traceStore := NewTraceStore(filepath.Join(tmpDir, "traces")) + cfg := Config{ ServiceURL: ts.URL, // Base URL only, no /v1/ingest/wal-frames ChainID: "test-chain", NodeID: "test-node", + StateDir: tmpDir, } batch := []batchFrame{ @@ -436,12 +445,12 @@ func TestTrySend_URLConstruction(t *testing.T) { Compressed: []byte("data"), IdxLineLen: 10, }, - } +} batchBytes := 4 st := state{IdxOffset: 0} back := newBackoff(time.Millisecond, time.Second) - trySend(cfg, http.DefaultClient, &batch, &batchBytes, &st, "000.idx", nil, time.Now(), back) + trySend(cfg, http.DefaultClient, traceStore, &batch, &batchBytes, &st, "000.idx", time.Now(), back) expectedPath := "/v1/ingest/wal-frames" if requestPath != expectedPath { diff --git a/internal/agent/cleanup.go b/internal/agent/cleanup.go index 1fb3bd7..dec9a92 100644 --- a/internal/agent/cleanup.go +++ b/internal/agent/cleanup.go @@ -14,10 +14,12 @@ import ( ) var ( - walCleanupCheckInterval = 72 * time.Hour - walCleanupHighWatermark = int64(2 << 30) // 2GiB - walCleanupLowWatermark = int64(3 << 29) // 1.5GiB - walCleanupTickerNow = true // run once immediately; used for tests + walCleanupCheckInterval = 72 * time.Hour + walCleanupHighWatermark = int64(2 << 30) // 2GiB + walCleanupLowWatermark = int64(3 << 29) // 1.5GiB + walCleanupTickerNow = true // run once immediately; used for tests + traceCleanupHighWatermark = int64(1 << 30) // 1GiB + traceCleanupLowWatermark = int64(1 << 29) // 512MiB ) type walSegment struct { @@ -28,17 +30,16 @@ type walSegment struct { idxSize int64 } -// walCleanupLoop runs a periodic cleanup that trims old WAL segments when the -// directory grows beyond the high watermark. It removes the oldest segments -// (by day dir then segment number) until the directory shrinks below the low -// watermark, deleting the matching .idx alongside each .gz. -func walCleanupLoop(ctx context.Context, walDir, stateDir string) { +// cleanupLoop runs a periodic cleanup that trims old WAL segments and trace files +// when they grow beyond their respective high watermarks. +func cleanupLoop(ctx context.Context, walDir, stateDir, traceDir string) { if walDir == "" { return } if walCleanupTickerNow { walCleanupOnce(ctx, walDir, stateDir) + traceCleanupOnce(traceDir) } t := time.NewTicker(walCleanupCheckInterval) @@ -50,6 +51,7 @@ func walCleanupLoop(ctx context.Context, walDir, stateDir string) { return case <-t.C: walCleanupOnce(ctx, walDir, stateDir) + traceCleanupOnce(traceDir) } } } @@ -101,6 +103,77 @@ func walCleanupOnce(ctx context.Context, walDir, stateDir string) { } } +func traceCleanupOnce(traceDir string) { + if traceDir == "" { + return + } + + store := NewTraceStore(traceDir) + + curSize, err := store.Size() + if err != nil { + logger.Error().Err(err).Msg("trace cleanup: size check failed") + return + } + + if curSize <= traceCleanupHighWatermark { + return + } + + entries, err := os.ReadDir(traceDir) + if err != nil { + logger.Error().Err(err).Msg("trace cleanup: list files failed") + return + } + + type rangeFile struct { + name string + minH int64 + } + var files []rangeFile + for _, entry := range entries { + if entry.IsDir() { + continue + } + minH, _, ok := parseRangeFileName(entry.Name()) + if !ok { + continue + } + files = append(files, rangeFile{name: entry.Name(), minH: minH}) + } + + sort.Slice(files, func(i, j int) bool { return files[i].minH < files[j].minH }) + + removed := int64(0) + for _, f := range files { + if curSize <= traceCleanupLowWatermark { + break + } + + path := filepath.Join(traceDir, f.name) + info, err := os.Stat(path) + if err != nil { + continue + } + + if err := os.Remove(path); err != nil { + logger.Error().Err(err).Str("file", f.name).Msg("trace cleanup: remove failed") + continue + } + + bytesFreed := info.Size() + curSize -= bytesFreed + removed += bytesFreed + } + + if removed > 0 { + logger.Info(). + Str("freed", formatBytes(removed)). + Str("remaining", formatBytes(curSize)). + Msg("trace cleanup completed") + } +} + func walDirSize(walDir string) (int64, error) { var total int64 err := filepath.WalkDir(walDir, func(path string, d fs.DirEntry, err error) error { diff --git a/internal/agent/config.go b/internal/agent/config.go index 711c65d..b60ce15 100644 --- a/internal/agent/config.go +++ b/internal/agent/config.go @@ -27,6 +27,7 @@ type Config struct { NodeHome string NodeID string WALDir string + TraceDir string ChainID string @@ -100,6 +101,10 @@ func (c *Config) Validate() error { c.StateDir = c.WALDir } + if c.TraceDir == "" { + c.TraceDir = fmt.Sprintf("%s/data/traces", c.NodeHome) + } + if c.ServiceURL == "" { c.ServiceURL = DefaultServiceURL } diff --git a/internal/agent/config_test.go b/internal/agent/config_test.go index 62b1d80..508f287 100644 --- a/internal/agent/config_test.go +++ b/internal/agent/config_test.go @@ -17,8 +17,8 @@ func TestDefaultConfig(t *testing.T) { if cfg.ServiceURL != DefaultServiceURL { t.Errorf("ServiceURL = %v, want %v", cfg.ServiceURL, DefaultServiceURL) } - if cfg.MaxBatchBytes != 4<<20 { - t.Errorf("MaxBatchBytes = %v, want 4MB", cfg.MaxBatchBytes) + if cfg.MaxBatchBytes != 16<<20 { + t.Errorf("MaxBatchBytes = %v, want 16MB", cfg.MaxBatchBytes) } } diff --git a/internal/agent/frame_parser.go b/internal/agent/frame_parser.go new file mode 100644 index 0000000..0a33445 --- /dev/null +++ b/internal/agent/frame_parser.go @@ -0,0 +1,35 @@ +package agent + +import ( + "bytes" + "compress/gzip" +) + +// CompressLines takes log lines and compresses them into gzip format. +func CompressLines(lines [][]byte) ([]byte, error) { + if len(lines) == 0 { + return nil, nil + } + + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + + for i, line := range lines { + if _, err := gw.Write(line); err != nil { + gw.Close() + return nil, err + } + if i < len(lines)-1 { + if _, err := gw.Write([]byte("\n")); err != nil { + gw.Close() + return nil, err + } + } + } + + if err := gw.Close(); err != nil { + return nil, err + } + + return buf.Bytes(), nil +} diff --git a/internal/agent/trace_store.go b/internal/agent/trace_store.go new file mode 100644 index 0000000..32e9568 --- /dev/null +++ b/internal/agent/trace_store.go @@ -0,0 +1,376 @@ +package agent + +import ( + "bytes" + "compress/gzip" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "sync" +) + +// traceRange represents a range file's height bounds. +type traceRange struct { + filename string + minH int64 + maxH int64 +} + +// TraceStore reads range-based trace log files written by cosmos-sdk. +// Files are named trace-{minHeight}-{maxHeight}.gz. +type TraceStore struct { + dir string + mu sync.RWMutex +} + +// NewTraceStore creates a new TraceStore for the given directory. +func NewTraceStore(dir string) *TraceStore { + return &TraceStore{dir: dir} +} + +// parseRangeFileName extracts min and max height from a range filename. +// Format: trace-{minHeight}-{maxHeight}.gz +func parseRangeFileName(name string) (minH, maxH int64, ok bool) { + if !strings.HasPrefix(name, "trace-") || !strings.HasSuffix(name, ".gz") { + return 0, 0, false + } + numStr := strings.TrimPrefix(name, "trace-") + numStr = strings.TrimSuffix(numStr, ".gz") + + parts := strings.Split(numStr, "-") + if len(parts) != 2 { + return 0, 0, false + } + + minH, err := strconv.ParseInt(parts[0], 10, 64) + if err != nil { + return 0, 0, false + } + maxH, err = strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return 0, 0, false + } + return minH, maxH, true +} + +// listRanges returns all range files in the directory. +func (s *TraceStore) listRanges() ([]traceRange, error) { + entries, err := os.ReadDir(s.dir) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + + var ranges []traceRange + for _, entry := range entries { + if entry.IsDir() { + continue + } + minH, maxH, ok := parseRangeFileName(entry.Name()) + if !ok { + continue + } + ranges = append(ranges, traceRange{ + filename: entry.Name(), + minH: minH, + maxH: maxH, + }) + } + + sort.Slice(ranges, func(i, j int) bool { return ranges[i].minH < ranges[j].minH }) + return ranges, nil +} + +// findRangeForHeight finds the range file that contains the given height. +func (s *TraceStore) findRangeForHeight(height int64) (*traceRange, error) { + ranges, err := s.listRanges() + if err != nil { + return nil, err + } + + for _, r := range ranges { + if height >= r.minH && height <= r.maxH { + return &r, nil + } + } + return nil, nil +} + +var heightKey = []byte(`"height":`) + +// extractHeight extracts height from a StoreTraceSet line without full JSON parsing. +// Format: {"_msg":"store trace set","height":150,"count":10,"traces":[...]} +func extractHeight(line []byte) int64 { + idx := bytes.Index(line, heightKey) + if idx == -1 { + return 0 + } + + start := idx + len(heightKey) + for start < len(line) && (line[start] == ' ' || line[start] == '\t') { + start++ + } + + end := start + for end < len(line) && line[end] >= '0' && line[end] <= '9' { + end++ + } + + if start == end { + return 0 + } + + height, _ := strconv.ParseInt(string(line[start:end]), 10, 64) + return height +} + +// Load reads trace data for a specific height, filtering from the range file. +func (s *TraceStore) Load(height int64) ([]byte, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + r, err := s.findRangeForHeight(height) + if err != nil { + return nil, err + } + if r == nil { + return nil, nil + } + + path := filepath.Join(s.dir, r.filename) + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + return nil, err + } + + lines, err := s.decompressLines(data) + if err != nil { + return nil, err + } + + var filtered [][]byte + for _, line := range lines { + if extractHeight(line) == height { + filtered = append(filtered, line) + } + } + + if len(filtered) == 0 { + return nil, nil + } + + return CompressLines(filtered) +} + +// LoadMultiple loads trace data for multiple heights and returns combined compressed data. +func (s *TraceStore) LoadMultiple(heights []int64) ([]byte, []FrameMeta, error) { + if len(heights) == 0 { + return nil, nil, nil + } + + s.mu.RLock() + defer s.mu.RUnlock() + + heightSet := make(map[int64]bool) + for _, h := range heights { + heightSet[h] = true + } + + ranges, err := s.listRanges() + if err != nil { + return nil, nil, err + } + + var allLines [][]byte + var metas []FrameMeta + heightLines := make(map[int64][][]byte) + + for _, r := range ranges { + hasRelevant := false + for _, h := range heights { + if h >= r.minH && h <= r.maxH { + hasRelevant = true + break + } + } + if !hasRelevant { + continue + } + + path := filepath.Join(s.dir, r.filename) + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + continue + } + return nil, nil, err + } + + lines, err := s.decompressLines(data) + if err != nil { + logger.Warn().Err(err).Str("file", r.filename).Msg("failed to decompress trace") + continue + } + + for _, line := range lines { + h := extractHeight(line) + if heightSet[h] { + heightLines[h] = append(heightLines[h], line) + } + } + } + + sort.Slice(heights, func(i, j int) bool { return heights[i] < heights[j] }) + for _, h := range heights { + lines := heightLines[h] + if len(lines) == 0 { + continue + } + allLines = append(allLines, lines...) + metas = append(metas, FrameMeta{ + File: fmt.Sprintf("trace-%d.gz", h), + Frame: uint64(h), + Recs: uint32(len(lines)), + }) + } + + if len(allLines) == 0 { + return nil, nil, nil + } + + compressed, err := CompressLines(allLines) + if err != nil { + return nil, nil, err + } + + if len(metas) > 0 { + metas[0].Len = uint64(len(compressed)) + } + + return compressed, metas, nil +} + +// DeleteBefore removes all trace files where maxHeight < given height. +func (s *TraceStore) DeleteBefore(height int64) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + + ranges, err := s.listRanges() + if err != nil { + return 0, err + } + + deleted := 0 + for _, r := range ranges { + if r.maxH < height { + path := filepath.Join(s.dir, r.filename) + if err := os.Remove(path); err == nil { + deleted++ + } + } + } + + return deleted, nil +} + +// ListHeights returns all heights covered by stored trace files. +func (s *TraceStore) ListHeights() ([]int64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + ranges, err := s.listRanges() + if err != nil { + return nil, err + } + + var heights []int64 + for _, r := range ranges { + for h := r.minH; h <= r.maxH; h++ { + heights = append(heights, h) + } + } + + return heights, nil +} + +// Size returns the total size of all trace files in bytes. +func (s *TraceStore) Size() (int64, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + var total int64 + entries, err := os.ReadDir(s.dir) + if err != nil { + if os.IsNotExist(err) { + return 0, nil + } + return 0, err + } + + for _, entry := range entries { + if entry.IsDir() { + continue + } + if _, _, ok := parseRangeFileName(entry.Name()); ok { + info, err := entry.Info() + if err == nil { + total += info.Size() + } + } + } + + return total, nil +} + +// decompressLines reads gzip data and returns individual lines. +func (s *TraceStore) decompressLines(data []byte) ([][]byte, error) { + gr, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + return nil, err + } + defer gr.Close() + + content, err := io.ReadAll(gr) + if err != nil { + return nil, err + } + + var lines [][]byte + for _, line := range bytes.Split(content, []byte("\n")) { + if len(line) > 0 { + lines = append(lines, line) + } + } + + return lines, nil +} + +// Exists checks if trace data exists for a specific height. +func (s *TraceStore) Exists(height int64) bool { + s.mu.RLock() + defer s.mu.RUnlock() + + r, err := s.findRangeForHeight(height) + return err == nil && r != nil +} + +// Count returns the number of stored trace files. +func (s *TraceStore) Count() (int, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + ranges, err := s.listRanges() + if err != nil { + return 0, err + } + return len(ranges), nil +}