Skip to content
12 changes: 12 additions & 0 deletions ding.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,15 @@ rules:
# message: "job failed: exit code {{ .value }} after {{ .duration_seconds }}s"
# alert:
# - notifier: github_actions

# Whole-run aggregate alert — fires once at run exit if avg memory across
# the entire run exceeded 80%. The "over run" window is bounded by the
# ding run subprocess lifetime; no events are evicted by wall-clock time.
# - name: high_avg_mem
# match:
# metric: mem_pct
# condition: avg(value) over run > 80
# mode: end-of-run
# message: "avg memory was {{ .avg }}% across the run"
# alert:
# - notifier: slack
46 changes: 46 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ A list of alerting rules. Rules are evaluated independently; each has its own co
| `match.metric` | string | no | Metric name filter |
| `condition` | string | yes | Evaluation expression (see below) |
| `cooldown` | duration | no | Minimum time between consecutive alerts for the same label-set |
| `mode` | string | no | Set to `end-of-run` to defer evaluation until `ding run` exits; omit for immediate (mid-run) evaluation |
| `message` | string | no | Alert message template (Go `text/template` syntax) |
| `alert` | list | yes | List of `{notifier: <name>}` targets |

Expand Down Expand Up @@ -301,6 +302,51 @@ value < 5 OR count(value) over 1m > 100

Comparison operators: `>`, `>=`, `<`, `<=`, `==`, `!=`

#### Run-lifetime windows: `over run`

In addition to wall-clock durations like `over 5m`, the windowed-condition
grammar accepts the literal `run`, which bounds the window to the lifetime
of the `ding run` subprocess. Run-bounded windows do not evict entries by
time — every event observed during the run is included in the aggregate,
subject only to the configured `max_buffer_size` cap.

```yaml
rules:
# Whole-run aggregate, fires once at exit
- name: high_avg_mem
match: { metric: mem_pct }
condition: avg(value) over run > 80
mode: end-of-run
message: "avg memory was {{ .avg }}% across the run"
alert:
- notifier: slack

# Run-bounded sliding window, fires mid-run on threshold cross
- name: errors_pile_up
match: { metric: errors }
condition: count(value) over run > 10
cooldown: 30s
message: "errors in this run: {{ .count }}"
alert:
- notifier: slack
```

The behavior matrix:

| condition window | `mode: end-of-run`? | result |
|---|---|---|
| `over 5m` | no | wall-clock sliding (default) |
| `over 5m` | yes | aggregate of last 5m of run, fires at exit |
| `over run` | no | run-bounded sliding, fires mid-run when threshold crosses |
| `over run` | yes | whole-run aggregate, fires once at exit |

**Cooldown caveat.** Aggregates like `count` are monotonically non-decreasing
under `over run` — once `count > 10`, it stays `> 10`. Without `mode:
end-of-run` or a `cooldown:`, such a rule fires on every subsequent matching
event. Pair `over run` mid-run rules with a meaningful `cooldown:` (or use
`mode: end-of-run` for fire-once-at-exit semantics).

**`ding serve` mode.** `over run` is also accepted in daemon mode, where it
means "since daemon start" (the buffer accumulates indefinitely, capped by
`max_buffer_size`). The feature is designed primarily for `ding run`; prefer
wall-clock windows in long-running serve deployments.

### Message template variables

| Variable | Available | Description |
Expand Down
98 changes: 98 additions & 0 deletions internal/cli/test_rule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,104 @@ rules:
}
}

// TestRunTestRule_OverRunWindow_AggregatesAcrossRun feeds runTestRule a rule
// that combines `over run` with `mode: end-of-run` and asserts that the
// rendered message reflects the average of every replayed event and that the
// output contains exactly one "would fire" line.
func TestRunTestRule_OverRunWindow_AggregatesAcrossRun(t *testing.T) {
	const ruleConfig = `
notifiers:
slack:
type: webhook
url: https://example.invalid/webhook
rules:
- name: high_avg_mem
match: { metric: mem }
condition: avg(value) over run > 50
mode: end-of-run
message: "avg mem: {{ .avg }}"
alert:
- notifier: slack
`

	// The three samples are one hour apart (two hours end to end), which is
	// wider than any wall-clock window this test would reasonably configure.
	// avg(40, 60, 80) = 60, and 60 > 50, so the rule is expected to fire
	// with avg=60 once the run ends.
	const replay = `{"metric":"mem","value":40,"timestamp":"2026-05-08T10:00:00Z"}
{"metric":"mem","value":60,"timestamp":"2026-05-08T11:00:00Z"}
{"metric":"mem","value":80,"timestamp":"2026-05-08T12:00:00Z"}
`

	cfgPath := filepath.Join(t.TempDir(), "ding.yaml")
	writeErr := os.WriteFile(cfgPath, []byte(ruleConfig), 0644)
	if writeErr != nil {
		t.Fatalf("write config: %v", writeErr)
	}

	var stdout, stderr bytes.Buffer
	if err := runTestRule(cfgPath, "text", false, strings.NewReader(replay), &stdout, &stderr); err != nil {
		t.Fatalf("runTestRule: %v\nstderr: %s", err, stderr.String())
	}

	report := stdout.String()
	if !strings.Contains(report, "high_avg_mem") {
		t.Errorf("expected high_avg_mem rule to fire at end-of-run:\n%s", report)
	}
	if !strings.Contains(report, "avg mem: 60") {
		t.Errorf("expected rendered message to show avg=60 across whole run:\n%s", report)
	}
	if fires := strings.Count(report, "would fire"); fires != 1 {
		t.Errorf("expected exactly 1 fire (mode: end-of-run), got %d:\n%s", fires, report)
	}
}

// TestRunTestRule_WindowedRule_EvictsOldestBeforeWindow complements
// TestRunTestRule_WindowedRule_FiresAfterEnoughEvents. That sibling keeps all
// of its events inside the 5m window and therefore never exercises eviction.
// Here the two events sit at t0 and t0+10m against a 5m window, so the older
// one has to be dropped before the newer one is evaluated and the average is
// taken over the second event only.
func TestRunTestRule_WindowedRule_EvictsOldestBeforeWindow(t *testing.T) {
	const ruleConfig = `
notifiers:
slack:
type: webhook
url: https://example.invalid/webhook
rules:
- name: hot_avg
match: { metric: temp }
condition: avg(value) over 5m > 50
message: "avg high: {{ .avg }}"
alert:
- notifier: slack
`

	// Sample one (t0) is far below the threshold: 10.
	// Sample two (t0+10m) is above it: 90.
	// With working eviction only {90} remains, avg = 90, and the condition
	// holds. Without eviction the average would be (10+90)/2 = 50, which is
	// not strictly greater than 50, so the rule would stay silent.
	const replay = `{"metric":"temp","value":10,"timestamp":"2026-05-08T10:00:00Z"}
{"metric":"temp","value":90,"timestamp":"2026-05-08T10:10:00Z"}
`

	cfgPath := filepath.Join(t.TempDir(), "ding.yaml")
	writeErr := os.WriteFile(cfgPath, []byte(ruleConfig), 0644)
	if writeErr != nil {
		t.Fatalf("write config: %v", writeErr)
	}

	var stdout, stderr bytes.Buffer
	if err := runTestRule(cfgPath, "text", false, strings.NewReader(replay), &stdout, &stderr); err != nil {
		t.Fatalf("runTestRule: %v\nstderr: %s", err, stderr.String())
	}

	report := stdout.String()
	if !strings.Contains(report, "hot_avg") {
		t.Errorf("expected hot_avg to fire (eviction should drop the first event so avg=90):\n%s", report)
	}
	if !strings.Contains(report, "avg high: 90") {
		t.Errorf("expected avg=90 (only second event remains after 5m eviction):\n%s", report)
	}
}

func TestRunTestRule_NoMatch_SilentStdout(t *testing.T) {
dir := t.TempDir()
cfg := writeFixtureConfig(t, dir)
Expand Down
33 changes: 20 additions & 13 deletions internal/evaluator/condition.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (

var (
reEventCond = regexp.MustCompile(`^value\s*(>|>=|<|<=|==|!=)\s*(-?\d+(?:\.\d+)?)$`)
reWindowedCond = regexp.MustCompile(`^(avg|max|min|count|sum)\(value\)\s+over\s+(\d+[smh])\s*(>|>=|<|<=|==|!=)\s*(-?\d+(?:\.\d+)?)$`)
reWindowedCond = regexp.MustCompile(`^(avg|max|min|count|sum)\(value\)\s+over\s+(\d+[smh]|run)\s*(>|>=|<|<=|==|!=)\s*(-?\d+(?:\.\d+)?)$`)
)

// Condition is a parsed alert condition.
type Condition struct {
Windowed bool
Op string
Literal float64
Func string // windowed only: avg|max|min|count|sum
Window time.Duration // windowed only
Windowed bool
RunBounded bool // true when "over run"; Window is zero in this case
Op string
Literal float64
Func string // windowed only: avg|max|min|count|sum
Window time.Duration // windowed only; zero when RunBounded
}

// ParseCondition parses a condition string from ding.yaml.
Expand All @@ -30,12 +31,17 @@ func ParseCondition(s string) (Condition, error) {
}
if m := reWindowedCond.FindStringSubmatch(s); m != nil {
fn := m[1]
dur, err := time.ParseDuration(m[2])
windowToken := m[2]
op := m[3]
lit, _ := strconv.ParseFloat(m[4], 64)
if windowToken == "run" {
return Condition{Windowed: true, RunBounded: true, Func: fn, Window: 0, Op: op, Literal: lit}, nil
}
dur, err := time.ParseDuration(windowToken)
if err != nil {
return Condition{}, fmt.Errorf("invalid duration in condition %q: %w", s, err)
}
lit, _ := strconv.ParseFloat(m[4], 64)
return Condition{Windowed: true, Func: fn, Window: dur, Op: m[3], Literal: lit}, nil
return Condition{Windowed: true, Func: fn, Window: dur, Op: op, Literal: lit}, nil
}
return Condition{}, fmt.Errorf("unrecognized condition syntax: %q", s)
}
Expand Down Expand Up @@ -82,9 +88,10 @@ type evalContext struct {
// ID is the sequential integer assigned at parse time (zero-based per rule) and
// is the same integer used as the middle segment of the buffer key.
type windowedLeaf struct {
ID int
Func string
Window time.Duration
ID int
Func string
Window time.Duration
RunBounded bool
}

// leafExpr is a leaf node wrapping a single parsed Condition.
Expand All @@ -106,7 +113,7 @@ func (l *leafExpr) eval(ctx evalContext) bool {

func (l *leafExpr) collectWindowedLeaves() []windowedLeaf {
if l.Windowed {
return []windowedLeaf{{ID: l.id, Func: l.Func, Window: l.Window}}
return []windowedLeaf{{ID: l.id, Func: l.Func, Window: l.Window, RunBounded: l.RunBounded}}
}
return nil
}
Expand Down
92 changes: 92 additions & 0 deletions internal/evaluator/end_of_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,98 @@ func TestProcess_DuringRunStillFiresAlongsideEndOfRun(t *testing.T) {
}
}

// TestProcessEndOfRun_OverRun_AggregatesAcrossWholeRun verifies that a rule
// with `condition: avg(value) over run > X` and `mode: end-of-run` aggregates
// across the entire run, regardless of how long the run lasted, and fires
// correctly at run exit. Distinguishes "over run" from "over Nm" by spanning
// timestamps wider than any plausible wall-clock window.
func TestProcessEndOfRun_OverRun_AggregatesAcrossWholeRun(t *testing.T) {
rules := []EngineRule{
{
Name: "whole_run_avg",
Match: map[string]string{"metric": "mem"},
Condition: "avg(value) over run > 50",
Message: "avg mem was {{ .avg }}",
Alerts: []string{"stdout"},
Mode: "end-of-run",
},
}
eng, err := NewEngine(rules, 1000)
if err != nil {
t.Fatalf("NewEngine: %v", err)
}

// Feed events spanning 2 hours — far beyond any wall-clock window.
t0 := time.Now()
eng.Process(ingester.Event{Metric: "mem", Value: 40, At: t0}, t0)
eng.Process(ingester.Event{Metric: "mem", Value: 60, At: t0.Add(1 * time.Hour)}, t0.Add(1*time.Hour))
eng.Process(ingester.Event{Metric: "mem", Value: 80, At: t0.Add(2 * time.Hour)}, t0.Add(2*time.Hour))

// At end-of-run the buffer should still hold all three entries:
// avg(40, 60, 80) = 60, which is > 50.
alerts := eng.ProcessEndOfRun(t0.Add(2 * time.Hour))
if len(alerts) != 1 {
t.Fatalf("expected 1 end-of-run alert, got %d", len(alerts))
}
if alerts[0].Avg != 60 {
t.Errorf("alert.Avg = %v, want 60 (avg of full run)", alerts[0].Avg)
}
if alerts[0].Count != 3 {
t.Errorf("alert.Count = %v, want 3 (all events in run)", alerts[0].Count)
}
}

// TestProcess_OverRun_FiresMidRunWithCooldown verifies the orthogonal
// composition: `over run` without `mode: end-of-run` fires mid-run when
// the threshold crosses, and cooldown prevents repeated firing.
func TestProcess_OverRun_FiresMidRunWithCooldown(t *testing.T) {
rules := []EngineRule{
{
Name: "errors_pile_up",
Match: map[string]string{"metric": "errors"},
Condition: "count(value) over run > 2",
Cooldown: 10 * time.Minute,
Message: "errors: {{ .count }}",
Alerts: []string{"stdout"},
},
}
eng, err := NewEngine(rules, 1000)
if err != nil {
t.Fatalf("NewEngine: %v", err)
}

t0 := time.Now()
// Events 1, 2 — count is at most 2, condition (count > 2) false.
for i := 0; i < 2; i++ {
alerts := eng.Process(ingester.Event{
Metric: "errors", Value: 1,
At: t0.Add(time.Duration(i) * time.Second),
}, t0.Add(time.Duration(i)*time.Second))
if len(alerts) > 0 {
t.Fatalf("event %d: rule fired prematurely (count=%v)", i, alerts[0].Count)
}
}
// Event 3 — count crosses to 3, condition true, alert fires.
alerts := eng.Process(ingester.Event{
Metric: "errors", Value: 1,
At: t0.Add(2 * time.Second),
}, t0.Add(2*time.Second))
if len(alerts) != 1 {
t.Fatalf("expected 1 alert at event 3, got %d", len(alerts))
}
if alerts[0].Count != 3 {
t.Errorf("alert.Count = %v, want 3", alerts[0].Count)
}
// Event 4 — count is 4, condition true, but cooldown blocks the alert.
alerts = eng.Process(ingester.Event{
Metric: "errors", Value: 1,
At: t0.Add(3 * time.Second),
}, t0.Add(3*time.Second))
if len(alerts) != 0 {
t.Errorf("expected cooldown to block alert at event 4, got %d", len(alerts))
}
}

// TestParseLabelKey verifies that the labelKey reverser handles the formats
// produced by LabelSetKey.
func TestParseLabelKey(t *testing.T) {
Expand Down
9 changes: 5 additions & 4 deletions internal/evaluator/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (e *Engine) Process(event ingester.Event, now time.Time) []Alert {
}
for _, leaf := range leaves {
leafBufKey := rule.Name + ":" + strconv.Itoa(leaf.ID) + ":" + labelKey
buf := e.getOrCreateBuffer(leafBufKey, leaf.Window)
buf := e.getOrCreateBuffer(leafBufKey, leaf.Window, leaf.RunBounded)
buf.Add(event.Value, event.At)
if buf.HasEntries(now) {
ctx.Available[leaf.ID] = true
Expand Down Expand Up @@ -189,7 +189,7 @@ func (e *Engine) Process(event ingester.Event, now time.Time) []Alert {
if len(leaves) == 1 {
leaf := leaves[0]
leafBufKey := rule.Name + ":" + strconv.Itoa(leaf.ID) + ":" + labelKey
buf := e.getOrCreateBuffer(leafBufKey, leaf.Window)
buf := e.getOrCreateBuffer(leafBufKey, leaf.Window, leaf.RunBounded)
alert.Avg = buf.Avg(now)
alert.Max = buf.Max(now)
alert.Min = buf.Min(now)
Expand Down Expand Up @@ -364,13 +364,14 @@ func (e *Engine) trackLabelKey(ruleName, labelKey string) {

// getOrCreateBuffer returns the ring buffer for a buffer key, creating it if needed.
// Uses bufMu independently of the RWMutex so it is safe to call from Process() under RLock.
func (e *Engine) getOrCreateBuffer(key string, window time.Duration) *RingBuffer {
// runBounded is honored only on first creation; subsequent calls return the existing buffer.
func (e *Engine) getOrCreateBuffer(key string, window time.Duration, runBounded bool) *RingBuffer {
e.bufMu.Lock()
defer e.bufMu.Unlock()
if buf, ok := e.buffers[key]; ok {
return buf
}
buf := NewRingBuffer(window, e.maxBuf)
buf := NewRingBuffer(window, e.maxBuf, runBounded)
e.buffers[key] = buf
return buf
}
Expand Down
Loading
Loading