From e923da8a1c460c7a1494390db0d8d58fcda41f47 Mon Sep 17 00:00:00 2001 From: zuchka Date: Fri, 8 May 2026 15:21:28 -0700 Subject: [PATCH 1/8] feat(evaluator): parse 'over run' as run-lifetime windowed condition Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/evaluator/condition.go | 33 ++++++++++-------- internal/evaluator/evaluator_test.go | 50 ++++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 13 deletions(-) diff --git a/internal/evaluator/condition.go b/internal/evaluator/condition.go index 39ea9ab..34ad080 100644 --- a/internal/evaluator/condition.go +++ b/internal/evaluator/condition.go @@ -10,16 +10,17 @@ import ( var ( reEventCond = regexp.MustCompile(`^value\s*(>|>=|<|<=|==|!=)\s*(-?\d+(?:\.\d+)?)$`) - reWindowedCond = regexp.MustCompile(`^(avg|max|min|count|sum)\(value\)\s+over\s+(\d+[smh])\s*(>|>=|<|<=|==|!=)\s*(-?\d+(?:\.\d+)?)$`) + reWindowedCond = regexp.MustCompile(`^(avg|max|min|count|sum)\(value\)\s+over\s+(\d+[smh]|run)\s*(>|>=|<|<=|==|!=)\s*(-?\d+(?:\.\d+)?)$`) ) // Condition is a parsed alert condition. type Condition struct { - Windowed bool - Op string - Literal float64 - Func string // windowed only: avg|max|min|count|sum - Window time.Duration // windowed only + Windowed bool + RunBounded bool // true when "over run"; Window is zero in this case + Op string + Literal float64 + Func string // windowed only: avg|max|min|count|sum + Window time.Duration // windowed only; zero when RunBounded } // ParseCondition parses a condition string from ding.yaml. 
@@ -30,12 +31,17 @@ func ParseCondition(s string) (Condition, error) { } if m := reWindowedCond.FindStringSubmatch(s); m != nil { fn := m[1] - dur, err := time.ParseDuration(m[2]) + windowToken := m[2] + op := m[3] + lit, _ := strconv.ParseFloat(m[4], 64) + if windowToken == "run" { + return Condition{Windowed: true, RunBounded: true, Func: fn, Window: 0, Op: op, Literal: lit}, nil + } + dur, err := time.ParseDuration(windowToken) if err != nil { return Condition{}, fmt.Errorf("invalid duration in condition %q: %w", s, err) } - lit, _ := strconv.ParseFloat(m[4], 64) - return Condition{Windowed: true, Func: fn, Window: dur, Op: m[3], Literal: lit}, nil + return Condition{Windowed: true, Func: fn, Window: dur, Op: op, Literal: lit}, nil } return Condition{}, fmt.Errorf("unrecognized condition syntax: %q", s) } @@ -82,9 +88,10 @@ type evalContext struct { // ID is the sequential integer assigned at parse time (zero-based per rule) and // is the same integer used as the middle segment of the buffer key. type windowedLeaf struct { - ID int - Func string - Window time.Duration + ID int + Func string + Window time.Duration + RunBounded bool } // leafExpr is a leaf node wrapping a single parsed Condition. 
@@ -106,7 +113,7 @@ func (l *leafExpr) eval(ctx evalContext) bool { func (l *leafExpr) collectWindowedLeaves() []windowedLeaf { if l.Windowed { - return []windowedLeaf{{ID: l.id, Func: l.Func, Window: l.Window}} + return []windowedLeaf{{ID: l.id, Func: l.Func, Window: l.Window, RunBounded: l.RunBounded}} } return nil } diff --git a/internal/evaluator/evaluator_test.go b/internal/evaluator/evaluator_test.go index ea57c56..2d84dc0 100644 --- a/internal/evaluator/evaluator_test.go +++ b/internal/evaluator/evaluator_test.go @@ -71,6 +71,56 @@ func TestParseCondition_WindowedNegative(t *testing.T) { } } +func TestParseCondition_OverRun(t *testing.T) { + c, err := evaluator.ParseCondition("avg(value) over run > 80") + if err != nil { + t.Fatal(err) + } + if !c.Windowed { + t.Fatal("expected windowed condition") + } + if !c.RunBounded { + t.Fatal("expected RunBounded=true") + } + if c.Func != "avg" || c.Op != ">" || c.Literal != 80 { + t.Errorf("unexpected condition: %+v", c) + } + if c.Window != 0 { + t.Errorf("expected Window=0 for over run, got %v", c.Window) + } +} + +func TestParseCondition_OverNm_NotRunBounded(t *testing.T) { + // Backward-compat: existing duration syntax must still parse with RunBounded=false. 
+ c, err := evaluator.ParseCondition("avg(value) over 5m > 80") + if err != nil { + t.Fatal(err) + } + if c.RunBounded { + t.Fatal("expected RunBounded=false for over 5m") + } + if c.Window != 5*time.Minute { + t.Errorf("expected Window=5m, got %v", c.Window) + } +} + +func TestParseCondition_OverRunAllAggregators(t *testing.T) { + for _, fn := range []string{"avg", "max", "min", "count", "sum"} { + input := fn + "(value) over run > 0" + c, err := evaluator.ParseCondition(input) + if err != nil { + t.Errorf("%s: parse error: %v", input, err) + continue + } + if !c.RunBounded { + t.Errorf("%s: expected RunBounded=true", input) + } + if c.Func != fn { + t.Errorf("%s: got Func=%s", input, c.Func) + } + } +} + func TestParseCondition_Invalid(t *testing.T) { _, err := evaluator.ParseCondition("value OVER 95") if err == nil { From f5ff42a7a911d4ef4d36f76321572eb4cc0caf97 Mon Sep 17 00:00:00 2001 From: zuchka Date: Fri, 8 May 2026 15:29:07 -0700 Subject: [PATCH 2/8] feat(evaluator): runBounded flag on RingBuffer skips time eviction Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/evaluator/engine.go | 2 +- internal/evaluator/evaluator_test.go | 73 ++++++++++++++++++++++++++-- internal/evaluator/ringbuffer.go | 24 ++++++--- internal/evaluator/state.go | 2 +- 4 files changed, 88 insertions(+), 13 deletions(-) diff --git a/internal/evaluator/engine.go b/internal/evaluator/engine.go index 5e85173..905ad39 100644 --- a/internal/evaluator/engine.go +++ b/internal/evaluator/engine.go @@ -370,7 +370,7 @@ func (e *Engine) getOrCreateBuffer(key string, window time.Duration) *RingBuffer if buf, ok := e.buffers[key]; ok { return buf } - buf := NewRingBuffer(window, e.maxBuf) + buf := NewRingBuffer(window, e.maxBuf, false) e.buffers[key] = buf return buf } diff --git a/internal/evaluator/evaluator_test.go b/internal/evaluator/evaluator_test.go index 2d84dc0..6187499 100644 --- a/internal/evaluator/evaluator_test.go +++ b/internal/evaluator/evaluator_test.go @@ -163,7 +163,7 
@@ func TestMatch_EmptyMatchBlock(t *testing.T) { // ---- Ring buffer ---- func TestRingBuffer_EvictsOldEntries(t *testing.T) { - rb := evaluator.NewRingBuffer(5*time.Minute, 1000) + rb := evaluator.NewRingBuffer(5*time.Minute, 1000, false) now := time.Now() rb.Add(10.0, now.Add(-6*time.Minute)) // old, should be evicted @@ -178,7 +178,7 @@ func TestRingBuffer_EvictsOldEntries(t *testing.T) { } func TestRingBuffer_MaxSize(t *testing.T) { - rb := evaluator.NewRingBuffer(5*time.Minute, 3) + rb := evaluator.NewRingBuffer(5*time.Minute, 3, false) now := time.Now() rb.Add(1.0, now) rb.Add(2.0, now) @@ -191,7 +191,7 @@ func TestRingBuffer_MaxSize(t *testing.T) { } func TestRingBuffer_EmptyReturnsZeroAndFalse(t *testing.T) { - rb := evaluator.NewRingBuffer(5*time.Minute, 1000) + rb := evaluator.NewRingBuffer(5*time.Minute, 1000, false) now := time.Now() if rb.HasEntries(now) { t.Error("empty buffer should have no entries") @@ -199,7 +199,7 @@ func TestRingBuffer_EmptyReturnsZeroAndFalse(t *testing.T) { } func TestRingBuffer_Aggregates(t *testing.T) { - rb := evaluator.NewRingBuffer(5*time.Minute, 1000) + rb := evaluator.NewRingBuffer(5*time.Minute, 1000, false) now := time.Now() for _, v := range []float64{10, 20, 30, 40, 50} { rb.Add(v, now) @@ -221,6 +221,71 @@ func TestRingBuffer_Aggregates(t *testing.T) { } } +func TestRingBuffer_RunBounded_NoTimeEviction(t *testing.T) { + // A run-bounded buffer must NOT evict by time, regardless of how far + // "now" advances past the entries' timestamps. + rb := evaluator.NewRingBuffer(0, 1000, true) + t0 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + rb.Add(10, t0) + rb.Add(20, t0.Add(1*time.Hour)) + rb.Add(30, t0.Add(24*time.Hour)) + + // Query 30 days later. A wall-clock buffer would evict everything. 
+ now := t0.Add(30 * 24 * time.Hour) + if got := rb.Count(now); got != 3 { + t.Errorf("expected Count=3 (no time eviction), got %v", got) + } + if got := rb.Avg(now); got != 20 { + t.Errorf("expected Avg=20, got %v", got) + } + if got := rb.Min(now); got != 10 { + t.Errorf("expected Min=10, got %v", got) + } + if got := rb.Max(now); got != 30 { + t.Errorf("expected Max=30, got %v", got) + } +} + +func TestRingBuffer_RunBounded_RespectsMaxSize(t *testing.T) { + // Run-bounded does NOT mean unbounded memory; maxSize still clips + // from oldest. This protects ding from runaway buffers in long runs. + rb := evaluator.NewRingBuffer(0, 3, true) + t0 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + rb.Add(1, t0) + rb.Add(2, t0.Add(1*time.Second)) + rb.Add(3, t0.Add(2*time.Second)) + rb.Add(4, t0.Add(3*time.Second)) + rb.Add(5, t0.Add(4*time.Second)) + + now := t0.Add(1 * time.Hour) + if got := rb.Count(now); got != 3 { + t.Errorf("expected Count=3 (maxSize cap), got %v", got) + } + // Oldest two (1, 2) are dropped; remaining are 3, 4, 5. + if got := rb.Min(now); got != 3 { + t.Errorf("expected Min=3, got %v", got) + } + if got := rb.Max(now); got != 5 { + t.Errorf("expected Max=5, got %v", got) + } +} + +func TestRingBuffer_WallClock_StillEvicts(t *testing.T) { + // Backward-compat: a buffer constructed with runBounded=false must + // continue to evict by time. + rb := evaluator.NewRingBuffer(5*time.Minute, 1000, false) + t0 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC) + rb.Add(10, t0) + rb.Add(20, t0.Add(10*time.Minute)) // outside any 5-minute window of t0 + now := t0.Add(11 * time.Minute) + + // At now=t0+11m, the cutoff is now-5m = t0+6m. Only the second entry + // (at t0+10m) is after the cutoff. The first (at t0) is evicted. 
+ if got := rb.Count(now); got != 1 { + t.Errorf("expected Count=1 (wall-clock eviction), got %v", got) + } +} + // ---- Cooldown ---- func TestCooldown_NotActive(t *testing.T) { diff --git a/internal/evaluator/ringbuffer.go b/internal/evaluator/ringbuffer.go index d30ea34..21064bd 100644 --- a/internal/evaluator/ringbuffer.go +++ b/internal/evaluator/ringbuffer.go @@ -11,17 +11,23 @@ type entry struct { } // RingBuffer is a time-based sliding window of float64 values. +// When runBounded is true, evict() is a no-op — the buffer accumulates +// all entries (subject to maxSize) for the lifetime of the process, +// implementing "over run" condition semantics. // Thread-safe. type RingBuffer struct { - mu sync.Mutex - entries []entry - window time.Duration - maxSize int + mu sync.Mutex + entries []entry + window time.Duration + maxSize int + runBounded bool } -// NewRingBuffer creates a new RingBuffer with the given window and max size. -func NewRingBuffer(window time.Duration, maxSize int) *RingBuffer { - return &RingBuffer{window: window, maxSize: maxSize} +// NewRingBuffer creates a new RingBuffer. When runBounded is true, the +// window argument is ignored and entries are never evicted by time +// (only by maxSize, oldest-first). +func NewRingBuffer(window time.Duration, maxSize int, runBounded bool) *RingBuffer { + return &RingBuffer{window: window, maxSize: maxSize, runBounded: runBounded} } // Add inserts a new value observed at time at. @@ -36,7 +42,11 @@ func (rb *RingBuffer) Add(value float64, at time.Time) { } // evict removes entries older than the window. Must be called with lock held. +// Run-bounded buffers do not evict by time; this method is a no-op for them. 
func (rb *RingBuffer) evict(now time.Time) { + if rb.runBounded { + return + } cutoff := now.Add(-rb.window) i := 0 for i < len(rb.entries) && rb.entries[i].at.Before(cutoff) { diff --git a/internal/evaluator/state.go b/internal/evaluator/state.go index e1ca9f8..980ea51 100644 --- a/internal/evaluator/state.go +++ b/internal/evaluator/state.go @@ -72,7 +72,7 @@ func SnapshotEngine(e *Engine) StateSnapshot { func RestoreEngine(e *Engine, snap StateSnapshot, now time.Time) { e.bufMu.Lock() for key, bs := range snap.Buffers { - rb := NewRingBuffer(bs.Window, bs.MaxSize) + rb := NewRingBuffer(bs.Window, bs.MaxSize, false) cutoff := now.Add(-bs.Window) for _, ent := range bs.Entries { if ent.At.After(cutoff) { From 0069b51ca93bda98e9021748572892005647bc98 Mon Sep 17 00:00:00 2001 From: zuchka Date: Fri, 8 May 2026 15:34:41 -0700 Subject: [PATCH 3/8] feat(evaluator): plumb RunBounded from condition leaf to ring buffer Co-Authored-By: Claude Sonnet 4.6 --- internal/evaluator/end_of_run_test.go | 92 +++++++++++++++++++++++++++ internal/evaluator/engine.go | 9 +-- 2 files changed, 97 insertions(+), 4 deletions(-) diff --git a/internal/evaluator/end_of_run_test.go b/internal/evaluator/end_of_run_test.go index 263a95f..6ee95a3 100644 --- a/internal/evaluator/end_of_run_test.go +++ b/internal/evaluator/end_of_run_test.go @@ -214,6 +214,98 @@ func TestProcess_DuringRunStillFiresAlongsideEndOfRun(t *testing.T) { } } +// TestProcessEndOfRun_OverRun_AggregatesAcrossWholeRun verifies that a rule +// with `condition: avg(value) over run > X` and `mode: end-of-run` aggregates +// across the entire run, regardless of how long the run lasted, and fires +// correctly at run exit. Distinguishes "over run" from "over Nm" by spanning +// timestamps wider than any plausible wall-clock window. 
+func TestProcessEndOfRun_OverRun_AggregatesAcrossWholeRun(t *testing.T) { + rules := []EngineRule{ + { + Name: "whole_run_avg", + Match: map[string]string{"metric": "mem"}, + Condition: "avg(value) over run > 50", + Message: "avg mem was {{ .avg }}", + Alerts: []string{"stdout"}, + Mode: "end-of-run", + }, + } + eng, err := NewEngine(rules, 1000) + if err != nil { + t.Fatalf("NewEngine: %v", err) + } + + // Feed events spanning 2 hours — far beyond any wall-clock window. + t0 := time.Now() + eng.Process(ingester.Event{Metric: "mem", Value: 40, At: t0}, t0) + eng.Process(ingester.Event{Metric: "mem", Value: 60, At: t0.Add(1 * time.Hour)}, t0.Add(1*time.Hour)) + eng.Process(ingester.Event{Metric: "mem", Value: 80, At: t0.Add(2 * time.Hour)}, t0.Add(2*time.Hour)) + + // At end-of-run the buffer should still hold all three entries: + // avg(40, 60, 80) = 60, which is > 50. + alerts := eng.ProcessEndOfRun(t0.Add(2 * time.Hour)) + if len(alerts) != 1 { + t.Fatalf("expected 1 end-of-run alert, got %d", len(alerts)) + } + if alerts[0].Avg != 60 { + t.Errorf("alert.Avg = %v, want 60 (avg of full run)", alerts[0].Avg) + } + if alerts[0].Count != 3 { + t.Errorf("alert.Count = %v, want 3 (all events in run)", alerts[0].Count) + } +} + +// TestProcess_OverRun_FiresMidRunWithCooldown verifies the orthogonal +// composition: `over run` without `mode: end-of-run` fires mid-run when +// the threshold crosses, and cooldown prevents repeated firing. +func TestProcess_OverRun_FiresMidRunWithCooldown(t *testing.T) { + rules := []EngineRule{ + { + Name: "errors_pile_up", + Match: map[string]string{"metric": "errors"}, + Condition: "count(value) over run > 2", + Cooldown: 10 * time.Minute, + Message: "errors: {{ .count }}", + Alerts: []string{"stdout"}, + }, + } + eng, err := NewEngine(rules, 1000) + if err != nil { + t.Fatalf("NewEngine: %v", err) + } + + t0 := time.Now() + // Events 1, 2 — count is at most 2, condition (count > 2) false. 
+ for i := 0; i < 2; i++ { + alerts := eng.Process(ingester.Event{ + Metric: "errors", Value: 1, + At: t0.Add(time.Duration(i) * time.Second), + }, t0.Add(time.Duration(i)*time.Second)) + if len(alerts) > 0 { + t.Fatalf("event %d: rule fired prematurely (count=%v)", i, alerts[0].Count) + } + } + // Event 3 — count crosses to 3, condition true, alert fires. + alerts := eng.Process(ingester.Event{ + Metric: "errors", Value: 1, + At: t0.Add(2 * time.Second), + }, t0.Add(2*time.Second)) + if len(alerts) != 1 { + t.Fatalf("expected 1 alert at event 3, got %d", len(alerts)) + } + if alerts[0].Count != 3 { + t.Errorf("alert.Count = %v, want 3", alerts[0].Count) + } + // Event 4 — count is 4, condition true, but cooldown blocks the alert. + alerts = eng.Process(ingester.Event{ + Metric: "errors", Value: 1, + At: t0.Add(3 * time.Second), + }, t0.Add(3*time.Second)) + if len(alerts) != 0 { + t.Errorf("expected cooldown to block alert at event 4, got %d", len(alerts)) + } +} + // TestParseLabelKey verifies that the labelKey reverser handles the formats // produced by LabelSetKey. 
func TestParseLabelKey(t *testing.T) { diff --git a/internal/evaluator/engine.go b/internal/evaluator/engine.go index 905ad39..4363366 100644 --- a/internal/evaluator/engine.go +++ b/internal/evaluator/engine.go @@ -135,7 +135,7 @@ func (e *Engine) Process(event ingester.Event, now time.Time) []Alert { } for _, leaf := range leaves { leafBufKey := rule.Name + ":" + strconv.Itoa(leaf.ID) + ":" + labelKey - buf := e.getOrCreateBuffer(leafBufKey, leaf.Window) + buf := e.getOrCreateBuffer(leafBufKey, leaf.Window, leaf.RunBounded) buf.Add(event.Value, event.At) if buf.HasEntries(now) { ctx.Available[leaf.ID] = true @@ -189,7 +189,7 @@ func (e *Engine) Process(event ingester.Event, now time.Time) []Alert { if len(leaves) == 1 { leaf := leaves[0] leafBufKey := rule.Name + ":" + strconv.Itoa(leaf.ID) + ":" + labelKey - buf := e.getOrCreateBuffer(leafBufKey, leaf.Window) + buf := e.getOrCreateBuffer(leafBufKey, leaf.Window, leaf.RunBounded) alert.Avg = buf.Avg(now) alert.Max = buf.Max(now) alert.Min = buf.Min(now) @@ -364,13 +364,14 @@ func (e *Engine) trackLabelKey(ruleName, labelKey string) { // getOrCreateBuffer returns the ring buffer for a buffer key, creating it if needed. // Uses bufMu independently of the RWMutex so it is safe to call from Process() under RLock. -func (e *Engine) getOrCreateBuffer(key string, window time.Duration) *RingBuffer { +// runBounded is honored only on first creation; subsequent calls return the existing buffer. 
+func (e *Engine) getOrCreateBuffer(key string, window time.Duration, runBounded bool) *RingBuffer { e.bufMu.Lock() defer e.bufMu.Unlock() if buf, ok := e.buffers[key]; ok { return buf } - buf := NewRingBuffer(window, e.maxBuf, false) + buf := NewRingBuffer(window, e.maxBuf, runBounded) e.buffers[key] = buf return buf } From 417d9f7a52e3a32f88723bbf5d2918d5cf434f7a Mon Sep 17 00:00:00 2001 From: zuchka Date: Fri, 8 May 2026 15:40:02 -0700 Subject: [PATCH 4/8] feat(evaluator): round-trip RunBounded through snapshot/restore Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/evaluator/state.go | 29 ++++++++++------ internal/evaluator/state_test.go | 59 ++++++++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 10 deletions(-) diff --git a/internal/evaluator/state.go b/internal/evaluator/state.go index 980ea51..0689e09 100644 --- a/internal/evaluator/state.go +++ b/internal/evaluator/state.go @@ -17,9 +17,10 @@ type StateSnapshot struct { } type BufferSnapshot struct { - Window time.Duration `json:"window_ns"` - MaxSize int `json:"max_size"` - Entries []EntrySnapshot `json:"entries"` + Window time.Duration `json:"window_ns"` + MaxSize int `json:"max_size"` + RunBounded bool `json:"run_bounded,omitempty"` + Entries []EntrySnapshot `json:"entries"` } type EntrySnapshot struct { @@ -47,9 +48,10 @@ func SnapshotEngine(e *Engine) StateSnapshot { entries[i] = EntrySnapshot{Value: ent.value, At: ent.at.UTC()} } snap.Buffers[key] = BufferSnapshot{ - Window: rb.window, - MaxSize: rb.maxSize, - Entries: entries, + Window: rb.window, + MaxSize: rb.maxSize, + RunBounded: rb.runBounded, + Entries: entries, } rb.mu.Unlock() } @@ -72,12 +74,19 @@ func SnapshotEngine(e *Engine) StateSnapshot { func RestoreEngine(e *Engine, snap StateSnapshot, now time.Time) { e.bufMu.Lock() for key, bs := range snap.Buffers { - rb := NewRingBuffer(bs.Window, bs.MaxSize, false) - cutoff := now.Add(-bs.Window) - for _, ent := range bs.Entries { - if ent.At.After(cutoff) { + rb := 
NewRingBuffer(bs.Window, bs.MaxSize, bs.RunBounded) + if bs.RunBounded { + // Run-bounded buffers don't evict by time; restore all entries. + for _, ent := range bs.Entries { rb.entries = append(rb.entries, entry{value: ent.Value, at: ent.At}) } + } else { + cutoff := now.Add(-bs.Window) + for _, ent := range bs.Entries { + if ent.At.After(cutoff) { + rb.entries = append(rb.entries, entry{value: ent.Value, at: ent.At}) + } + } } if len(rb.entries) > 0 { e.buffers[key] = rb diff --git a/internal/evaluator/state_test.go b/internal/evaluator/state_test.go index 7ab2416..ef8fb5c 100644 --- a/internal/evaluator/state_test.go +++ b/internal/evaluator/state_test.go @@ -408,3 +408,62 @@ func keysOf(m map[string]evaluator.BufferSnapshot) []string { } return keys } + +// TestSnapshotRestore_RunBoundedRoundtrips verifies that a run-bounded +// buffer's RunBounded flag and full entry list survive a snapshot/restore +// cycle. The wall-clock cutoff filter must NOT be applied during restore +// for run-bounded buffers — otherwise all entries older than `now` would +// be dropped on restore. 
+func TestSnapshotRestore_RunBoundedRoundtrips(t *testing.T) { + rules := []evaluator.EngineRule{ + { + Name: "whole_run_avg", + Match: map[string]string{"metric": "mem"}, + Condition: "avg(value) over run > 50", + Mode: "end-of-run", + Alerts: []string{"stdout"}, + }, + } + eng1, err := evaluator.NewEngine(rules, 1000) + if err != nil { + t.Fatalf("NewEngine: %v", err) + } + + t0 := time.Now().Add(-1 * time.Hour) // entries are 1h old + eng1.Process(ingester.Event{Metric: "mem", Value: 60, At: t0}, t0) + eng1.Process(ingester.Event{Metric: "mem", Value: 80, At: t0.Add(30 * time.Minute)}, t0.Add(30*time.Minute)) + + snap := evaluator.SnapshotEngine(eng1) + if len(snap.Buffers) == 0 { + t.Fatal("expected at least one buffer in snapshot") + } + for key, bs := range snap.Buffers { + if !bs.RunBounded { + t.Errorf("snapshot buffer %q has RunBounded=false; expected true", key) + } + if len(bs.Entries) != 2 { + t.Errorf("snapshot buffer %q has %d entries; expected 2", key, len(bs.Entries)) + } + } + + // Restore into a fresh engine. Entries are 1 hour old; if the cutoff + // filter were applied to a run-bounded buffer it would drop them. 
+ eng2, err := evaluator.NewEngine(rules, 1000) + if err != nil { + t.Fatalf("NewEngine: %v", err) + } + evaluator.RestoreEngine(eng2, snap, time.Now()) + + snap2 := evaluator.SnapshotEngine(eng2) + if len(snap2.Buffers) == 0 { + t.Fatal("expected restored buffer to have entries") + } + for key, bs := range snap2.Buffers { + if !bs.RunBounded { + t.Errorf("restored buffer %q has RunBounded=false; expected true", key) + } + if len(bs.Entries) != 2 { + t.Errorf("restored buffer %q has %d entries; expected 2 (cutoff must not apply)", key, len(bs.Entries)) + } + } +} From 777057105c45e7d36e97c40370aeceb26ea45c20 Mon Sep 17 00:00:00 2001 From: zuchka Date: Fri, 8 May 2026 15:43:53 -0700 Subject: [PATCH 5/8] test(cli): end-to-end test-rule verification for over-run windowing Co-Authored-By: Claude Sonnet 4.6 --- internal/cli/test_rule_test.go | 44 ++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/internal/cli/test_rule_test.go b/internal/cli/test_rule_test.go index 6a57af7..e58a5af 100644 --- a/internal/cli/test_rule_test.go +++ b/internal/cli/test_rule_test.go @@ -90,6 +90,50 @@ rules: } } +func TestRunTestRule_OverRunWindow_AggregatesAcrossRun(t *testing.T) { + dir := t.TempDir() + cfg := filepath.Join(dir, "ding.yaml") + if err := os.WriteFile(cfg, []byte(` +notifiers: + slack: + type: webhook + url: https://example.invalid/webhook +rules: + - name: high_avg_mem + match: { metric: mem } + condition: avg(value) over run > 50 + mode: end-of-run + message: "avg mem: {{ .avg }}" + alert: + - notifier: slack +`), 0644); err != nil { + t.Fatalf("write config: %v", err) + } + + // Three events spanning 2 hours — far wider than any wall-clock window + // the test would plausibly use. avg(40, 60, 80) = 60 > 50, so the + // end-of-run rule should fire with avg=60. 
+ events := `{"metric":"mem","value":40,"timestamp":"2026-05-08T10:00:00Z"} +{"metric":"mem","value":60,"timestamp":"2026-05-08T11:00:00Z"} +{"metric":"mem","value":80,"timestamp":"2026-05-08T12:00:00Z"} +` + in := strings.NewReader(events) + var out, errBuf bytes.Buffer + + err := runTestRule(cfg, "text", false, in, &out, &errBuf) + if err != nil { + t.Fatalf("runTestRule: %v\nstderr: %s", err, errBuf.String()) + } + + got := out.String() + if !strings.Contains(got, "high_avg_mem") { + t.Errorf("expected high_avg_mem rule to fire at end-of-run:\n%s", got) + } + if !strings.Contains(got, "avg mem: 60") { + t.Errorf("expected rendered message to show avg=60 across whole run:\n%s", got) + } +} + func TestRunTestRule_NoMatch_SilentStdout(t *testing.T) { dir := t.TempDir() cfg := writeFixtureConfig(t, dir) From 4dcfb28f36f0629970ca48045cb7bcd98e6a8cf8 Mon Sep 17 00:00:00 2001 From: zuchka Date: Fri, 8 May 2026 15:47:05 -0700 Subject: [PATCH 6/8] test(cli): assert single fire for end-of-run over-run rule Co-Authored-By: Claude Sonnet 4.6 --- internal/cli/test_rule_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/cli/test_rule_test.go b/internal/cli/test_rule_test.go index e58a5af..36845b1 100644 --- a/internal/cli/test_rule_test.go +++ b/internal/cli/test_rule_test.go @@ -132,6 +132,9 @@ rules: if !strings.Contains(got, "avg mem: 60") { t.Errorf("expected rendered message to show avg=60 across whole run:\n%s", got) } + if got := strings.Count(out.String(), "would fire"); got != 1 { + t.Errorf("expected exactly 1 fire (mode: end-of-run), got %d:\n%s", got, out.String()) + } } func TestRunTestRule_NoMatch_SilentStdout(t *testing.T) { From 9c18ea2b566334acaf1edca063e14333dd3d6420 Mon Sep 17 00:00:00 2001 From: zuchka Date: Fri, 8 May 2026 15:48:15 -0700 Subject: [PATCH 7/8] test(cli): verify wall-clock eviction in test-rule windowed path Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/cli/test_rule_test.go | 51 
++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/internal/cli/test_rule_test.go b/internal/cli/test_rule_test.go index 36845b1..25ea6eb 100644 --- a/internal/cli/test_rule_test.go +++ b/internal/cli/test_rule_test.go @@ -137,6 +137,57 @@ rules: } } +func TestRunTestRule_WindowedRule_EvictsOldestBeforeWindow(t *testing.T) { + // Sibling to TestRunTestRule_WindowedRule_FiresAfterEnoughEvents. + // That test fits all events inside the 5m window so it doesn't + // exercise eviction. This one spans the boundary: events at t0 and + // t0+10m with a 5m window — the older event must be evicted by the + // time the newer one is processed, leaving the rule's avg computed + // over the second event alone. + dir := t.TempDir() + cfg := filepath.Join(dir, "ding.yaml") + if err := os.WriteFile(cfg, []byte(` +notifiers: + slack: + type: webhook + url: https://example.invalid/webhook +rules: + - name: hot_avg + match: { metric: temp } + condition: avg(value) over 5m > 50 + message: "avg high: {{ .avg }}" + alert: + - notifier: slack +`), 0644); err != nil { + t.Fatalf("write config: %v", err) + } + + // First event at t0 is well below threshold (10). + // Second event 10 minutes later is above threshold (90). + // With a 5m window, the first event is evicted before the second + // is evaluated. The avg of {90} alone is 90, condition true. + // If eviction were broken (no eviction), avg would be (10+90)/2=50, + // which is NOT > 50, and the rule would NOT fire. 
+ events := `{"metric":"temp","value":10,"timestamp":"2026-05-08T10:00:00Z"} +{"metric":"temp","value":90,"timestamp":"2026-05-08T10:10:00Z"} +` + in := strings.NewReader(events) + var out, errBuf bytes.Buffer + + err := runTestRule(cfg, "text", false, in, &out, &errBuf) + if err != nil { + t.Fatalf("runTestRule: %v\nstderr: %s", err, errBuf.String()) + } + + got := out.String() + if !strings.Contains(got, "hot_avg") { + t.Errorf("expected hot_avg to fire (eviction should drop the first event so avg=90):\n%s", got) + } + if !strings.Contains(got, "avg high: 90") { + t.Errorf("expected avg=90 (only second event remains after 5m eviction):\n%s", got) + } +} + func TestRunTestRule_NoMatch_SilentStdout(t *testing.T) { dir := t.TempDir() cfg := writeFixtureConfig(t, dir) From 603e0a0c0f4fc58305cacbd8be038d4172537d24 Mon Sep 17 00:00:00 2001 From: zuchka Date: Fri, 8 May 2026 15:51:19 -0700 Subject: [PATCH 8/8] docs: over run window-mode (configuration.md + ding.yaml.example) Co-Authored-By: Claude Sonnet 4.6 --- ding.yaml.example | 12 +++++++++++ docs/configuration.md | 46 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/ding.yaml.example b/ding.yaml.example index 9ae3800..b1be8a7 100644 --- a/ding.yaml.example +++ b/ding.yaml.example @@ -146,3 +146,15 @@ rules: # message: "job failed: exit code {{ .value }} after {{ .duration_seconds }}s" # alert: # - notifier: github_actions + + # Whole-run aggregate alert — fires once at run exit if avg memory across + # the entire run exceeded 80%. The "over run" window is bounded by the + # ding run subprocess lifetime; no events are evicted by wall-clock time. 
+ # - name: high_avg_mem + # match: + # metric: mem_pct + # condition: avg(value) over run > 80 + # mode: end-of-run + # message: "avg memory was {{ .avg }}% across the run" + # alert: + # - notifier: slack diff --git a/docs/configuration.md b/docs/configuration.md index d3e94cd..bb0a8c6 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -266,6 +266,7 @@ A list of alerting rules. Rules are evaluated independently; each has its own co | `match.metric` | string | no | Metric name filter | | `condition` | string | yes | Evaluation expression (see below) | | `cooldown` | duration | no | Minimum time between consecutive alerts for the same label-set | +| `mode` | string | no | Set to `end-of-run` to defer evaluation until `ding run` exits; omit for immediate (mid-run) evaluation | | `message` | string | no | Alert message template (Go `text/template` syntax) | | `alert` | list | yes | List of `{notifier: }` targets | @@ -301,6 +302,51 @@ value < 5 OR count(value) over 1m > 100 Comparison operators: `>`, `>=`, `<`, `<=`, `==`, `!=` +#### Run-lifetime windows: `over run` + +In addition to wall-clock durations like `over 5m`, the windowed-condition +grammar accepts the literal `run`, which bounds the window to the lifetime +of the `ding run` subprocess. Run-bounded windows do not evict entries by +time — every event observed during the run is included in the aggregate, +subject only to the configured `max_buffer_size` cap. + +```yaml +rules: + # Whole-run aggregate, fires once at exit + - name: high_avg_mem + match: { metric: mem_pct } + condition: avg(value) over run > 80 + mode: end-of-run + message: "avg memory was {{ .avg }}% across the run" + + # Run-bounded sliding window, fires mid-run on threshold cross + - name: errors_pile_up + match: { metric: errors } + condition: count(value) over run > 10 + cooldown: 30s + message: "errors in this run: {{ .count }}" +``` + +The behavior matrix: + +| condition window | `mode: end-of-run`? 
| result | +|---|---|---| +| `over 5m` | no | wall-clock sliding (default) | +| `over 5m` | yes | aggregate of last 5m of run, fires at exit | +| `over run` | no | run-bounded sliding, fires mid-run when threshold crosses | +| `over run` | yes | whole-run aggregate, fires once at exit | + +**Cooldown caveat.** Aggregates like `count` are monotonically non-decreasing +under `over run` — once `count > 10`, it stays `> 10`. Without `mode: +end-of-run` or a `cooldown:`, such a rule fires on every subsequent matching +event. Pair `over run` mid-run rules with a meaningful `cooldown:` (or use +`mode: end-of-run` for fire-once-at-exit semantics). + +**`ding serve` mode.** `over run` is supported syntactically in the daemon +mode, where it means "since daemon start" (the buffer accumulates indefinitely, +capped by `max_buffer_size`). The primary use case is `ding run`; prefer +wall-clock windows in long-running serve deployments. + ### Message template variables | Variable | Available | Description |