Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 107 additions & 57 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,20 @@ import (
)

const (
walFramesEndpoint = "/v1/ingest/wal-frames"
configEndpoint = "/v1/ingest/config"
walFramesEndpoint = "/v1/ingest/wal-frames"
traceFramesEndpoint = "/v1/ingest/trace-frames"
configEndpoint = "/v1/ingest/config"
)

// IngestResponse represents the response from WAL frames endpoint.
type IngestResponse struct {
Status string `json:"status"`
Frames int `json:"frames"`
EventsProc int `json:"events_processed"`
EventsStored int `json:"events_stored"`
FailureHeights []int64 `json:"failure_heights"`
}

type batchFrame struct {
Meta FrameMeta
Compressed []byte
Expand All @@ -41,7 +51,10 @@ func Run(ctx context.Context, cfg Config) error {
cfgPtr := &cfg
watcher := NewConfigWatcher(cfgPtr)
go watcher.Run(ctx)
go walCleanupLoop(ctx, cfg.WALDir, cfg.StateDir)
go cleanupLoop(ctx, cfg.WALDir, cfg.StateDir, cfg.TraceDir)

// Initialize trace store to read SDK's height-indexed trace files
traceStore := NewTraceStore(cfg.TraceDir)

// Load prior state; if none, start from the oldest index (first logs)
st, _ := loadState(cfg.StateDir)
Expand Down Expand Up @@ -98,7 +111,7 @@ func Run(ctx context.Context, cfg Config) error {
if errors.Is(nerr, io.EOF) {
// Flush pending batch
if len(batch) > 0 {
trySend(cfg, httpClient, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), &gz, lastSend, back)
trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), lastSend, back)
lastSend = st.LastSendAt
}
if cfg.Once {
Expand Down Expand Up @@ -178,102 +191,123 @@ func Run(ctx context.Context, cfg Config) error {
bf := batchFrame{Meta: fm, Compressed: b, IdxLineLen: len(line)}
batch = append(batch, bf)
batchBytes += len(b)
trySend(cfg, httpClient, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), &gz, lastSend, back)
trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), lastSend, back)
lastSend = st.LastSendAt
continue
}
// Normal batch
if cfg.MaxBatchBytes > 0 && batchBytes+len(b) > cfg.MaxBatchBytes {
trySend(cfg, httpClient, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), &gz, lastSend, back)
trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), lastSend, back)
lastSend = st.LastSendAt
}
batch = append(batch, batchFrame{Meta: fm, Compressed: b, IdxLineLen: len(line)})
batchBytes += len(b)

// Time-based send
if time.Since(lastSend) >= cfg.SendInterval || time.Since(lastSend) >= cfg.HardInterval {
trySend(cfg, httpClient, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), &gz, lastSend, back)
trySend(cfg, httpClient, traceStore, &batch, &batchBytes, &st, filepath.Base(st.IdxPath), lastSend, back)
lastSend = st.LastSendAt
}
}
}

func trySend(cfg Config, httpClient *http.Client, batch *[]batchFrame, batchBytes *int, st *state, curIdxBase string, gz **os.File, lastSend time.Time, back *backoff) {
func trySend(cfg Config, httpClient *http.Client, traceStore *TraceStore, batch *[]batchFrame, batchBytes *int, st *state, curIdxBase string, lastSend time.Time, back *backoff) {
if len(*batch) == 0 {
return
}
// Resource gating (soft)
hard := time.Since(lastSend) >= cfg.HardInterval
if !hard && !resourcesOK(cfg) {
return
}

// Build payload
manifest := make([]FrameMeta, 0, len(*batch))
var advance int64
for _, fr := range *batch {
manifest = append(manifest, fr.Meta)
advance += int64(fr.IdxLineLen)
}
url := cfg.ServiceURL + walFramesEndpoint

ingestResp, ok := sendFrames(cfg, httpClient, *batch, curIdxBase, walFramesEndpoint, back)
if !ok {
return
}

logger.Info().
Int("frames", len(*batch)).
Int("failure_heights", len(ingestResp.FailureHeights)).
Msg("sent batch")

// Send trace for failure heights (read from SDK's trace directory)
if len(ingestResp.FailureHeights) > 0 {
sendTraceForHeights(cfg, httpClient, traceStore, ingestResp.FailureHeights, back)
}

// Commit state
if len(*batch) > 0 {
lastFrame := (*batch)[len(*batch)-1]
st.LastFile = lastFrame.Meta.File
st.LastFrame = lastFrame.Meta.Frame
}
st.IdxOffset += advance
st.LastSendAt = time.Now()
st.LastCommitAt = st.LastSendAt
_ = saveState(cfg.StateDir, *st)

*batch = (*batch)[:0]
*batchBytes = 0
back.Reset()
}

// sendFrames sends frames to the specified endpoint and returns the response.
func sendFrames(cfg Config, httpClient *http.Client, frames []batchFrame, curIdxBase string, endpoint string, back *backoff) (*IngestResponse, bool) {
manifest := make([]FrameMeta, 0, len(frames))
for _, fr := range frames {
manifest = append(manifest, fr.Meta)
}

var body bytes.Buffer
writer := multipart.NewWriter(&body)

// Log batch details before building payload
logger.Debug().
Int("batch_frames", len(*batch)).
Int("batch_bytes", *batchBytes).
Int("max_batch_bytes", cfg.MaxBatchBytes).
Msg("Building multipart payload")

manifestJSON, err := json.Marshal(manifest)
if err != nil {
logger.Error().Err(err).Msg("marshal manifest")
back.Sleep()
return
return nil, false
}
manifestPart, err := writer.CreateFormField("manifest")
if err != nil {
logger.Error().Err(err).Msg("create manifest field")
back.Sleep()
return
return nil, false
}
if _, err := manifestPart.Write(manifestJSON); err != nil {
logger.Error().Err(err).Msg("write manifest field")
back.Sleep()
return
return nil, false
}

framesPart, err := writer.CreateFormFile("frames", curIdxBase)
if err != nil {
logger.Error().Err(err).Msg("create frames field")
back.Sleep()
return
return nil, false
}
for _, fr := range *batch {
for _, fr := range frames {
if _, err := framesPart.Write(fr.Compressed); err != nil {
logger.Error().Err(err).Msg("write frames payload")
back.Sleep()
return
return nil, false
}
}
if err := writer.Close(); err != nil {
logger.Error().Err(err).Msg("finalize multipart payload")
back.Sleep()
return
return nil, false
}

// Log actual multipart body size
bodySize := body.Len()
logger.Debug().
Int("body_size_bytes", bodySize).
Int("body_size_mb", bodySize/(1<<20)).
Int("frames_in_manifest", len(manifest)).
Msg("Multipart payload ready")

url := cfg.ServiceURL + endpoint
req, err := http.NewRequest(http.MethodPost, url, &body)
if err != nil {
return
back.Sleep()
return nil, false
}
req.Header.Set("Authorization", "Bearer "+cfg.AuthKey)
req.Header.Set("Content-Type", writer.FormDataContentType())
Expand All @@ -284,39 +318,55 @@ func trySend(cfg Config, httpClient *http.Client, batch *[]batchFrame, batchByte

resp, err := httpClient.Do(req)
if err != nil {
logger.Error().Err(err).Msg("send batch")
logger.Error().Err(err).Msg("send request")
back.Sleep()
return
return nil, false
}
defer resp.Body.Close()

respBody, _ := io.ReadAll(resp.Body)
if resp.StatusCode/100 != 2 {
body, _ := io.ReadAll(resp.Body)
logger.Error().
Int("status", resp.StatusCode).
Str("body", string(body)).
Str("body", string(respBody)).
Msg("server returned error")
back.Sleep()
return
return nil, false
}

logger.Info().
Int("frames", len(*batch)).
Int("bytes", *batchBytes).
Int("size_mb", *batchBytes/(1<<20)).
Msg("sent batch")
var ingestResp IngestResponse
if err := json.Unmarshal(respBody, &ingestResp); err != nil {
logger.Warn().Err(err).Msg("failed to parse ingest response")
return &IngestResponse{Status: "ok"}, true
}

// Success: commit idx offset
st.IdxOffset += advance
st.LastFile = manifest[len(manifest)-1].File
st.LastFrame = manifest[len(manifest)-1].Frame
st.LastSendAt = time.Now()
st.LastCommitAt = st.LastSendAt
_ = saveState(cfg.StateDir, *st)
return &ingestResp, true
}

// reset batch
*batch = (*batch)[:0]
*batchBytes = 0
back.Reset()
// sendTraceForHeights sends trace data for the given failure heights.
func sendTraceForHeights(cfg Config, httpClient *http.Client, traceStore *TraceStore, heights []int64, back *backoff) {
compressed, metas, err := traceStore.LoadMultiple(heights)
if err != nil {
logger.Warn().Err(err).Msg("failed to load traces for failure heights")
return
}
if compressed == nil || len(metas) == 0 {
logger.Debug().Ints64("heights", heights).Msg("no trace data found for failure heights")
return
}

frames := []batchFrame{{
Meta: metas[0],
Compressed: compressed,
}}

_, ok := sendFrames(cfg, httpClient, frames, "trace.wal.gz", traceFramesEndpoint, back)
if ok {
logger.Info().
Ints64("heights", heights).
Int("bytes", len(compressed)).
Msg("sent trace for failure heights")
}
}

func hostname() string {
Expand Down
Loading