-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgroup_coordinator_evaluation.go
More file actions
115 lines (99 loc) · 4.33 KB
/
group_coordinator_evaluation.go
File metadata and controls
115 lines (99 loc) · 4.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/*
Copyright 2026 The ARCORIS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bufferpool
import "time"
// PoolGroupControllerEvaluation is a pure group controller projection.
//
// An evaluation turns two group samples into an aggregate window, rates derived
// from that window, and advisory score values. It does not mutate PoolGroup
// state, publish runtime policy, execute trim, redistribute budgets, or start
// background work.
//
// NewPoolGroupControllerEvaluation populates all three fields in one call.
type PoolGroupControllerEvaluation struct {
// Window is the aggregate window built from the previous and current group
// samples (presumably the counter movement between them).
Window PoolGroupWindow
// Rates contains the aggregate ratios and throughput derived from Window and
// the elapsed time between the two samples.
Rates PoolGroupWindowRates
// Scores contains the pure scalar score projections that a
// PoolGroupScoreEvaluator computes from Rates together with the budget and
// pressure snapshots.
Scores PoolGroupScoreValues
}
// NewPoolGroupControllerEvaluation builds a pure group control evaluation.
//
// The aggregate window is derived from previous and current, the rates from
// that window and elapsed, and the scores by asking evaluator to score those
// rates against the budget and pressure snapshots. The three results are
// returned together as one PoolGroupControllerEvaluation value.
func NewPoolGroupControllerEvaluation(
	previous PoolGroupSample,
	current PoolGroupSample,
	elapsed time.Duration,
	budget PoolGroupBudgetSnapshot,
	pressure PoolGroupPressureSnapshot,
	evaluator PoolGroupScoreEvaluator,
) PoolGroupControllerEvaluation {
	var evaluation PoolGroupControllerEvaluation

	// Each stage feeds the next: window -> rates -> scores.
	evaluation.Window = NewPoolGroupWindow(previous, current)
	evaluation.Rates = NewPoolGroupTimedWindowRates(evaluation.Window, elapsed)
	evaluation.Scores = evaluator.ScoreValues(evaluation.Rates, budget, pressure)

	return evaluation
}
// evaluateGroupCoordinatorCycle runs one group projection without committing it.
//
// The cycle advances the generation, captures the runtime snapshot and the
// coordinator clock, samples the group into a copy of dst.Sample, and resets a
// copy of dst.Window against the coordinator's previous sample. When no
// previous sample exists, the fresh sample is compared against itself with zero
// elapsed time. From that window it derives aggregate rates, metrics, budget
// and pressure snapshots, aggregate and partition-level scores, and the
// group-to-partition budget allocation report.
//
// Evaluation does not publish partition targets, commit coordinator
// previous-sample state, start partition schedulers, execute Pool trim, or
// inspect Pool shards/classes; the returned budget publication report is marked
// as not published. The generation advance stays here because existing TickInto
// semantics allocate an attempt generation before publication or status
// selection is known.
func (g *PoolGroup) evaluateGroupCoordinatorCycle(dst *PoolGroupCoordinatorReport) groupCoordinatorCycleEvaluation {
	// Order matters: the attempt generation is allocated before anything else
	// is observed.
	gen := g.generation.Advance()
	snapshot := g.currentRuntimeSnapshot()
	observedAt := clockNow(g.coordinator.clock)

	// Sample into a copy of the caller's report sample (presumably so its
	// existing storage can be reused — confirm against the sampler).
	cur := dst.Sample
	g.sampleWithRuntimeAndGeneration(&cur, snapshot, gen)

	// Without history the window compares the sample to itself over zero time.
	prev, dt := cur, time.Duration(0)
	if g.coordinator.hasPreviousSample {
		prev = g.coordinator.previousSample
		dt = clockElapsed(g.coordinator.previousSampleTime, observedAt)
	}

	win := dst.Window
	win.Reset(prev, cur)

	// Aggregate projections for the whole group.
	rates := NewPoolGroupTimedWindowRates(win, dt)
	metrics := newPoolGroupMetrics(g.name, cur)
	budget := newGroupBudgetSnapshot(snapshot.Policy.Budget, cur)
	pressure := newGroupPressureSnapshot(snapshot.Policy.Pressure, cur)

	// The same evaluator scores the aggregate and each partition.
	scorer := NewPoolGroupScoreEvaluator(snapshot.Policy.Score)
	scores := scorer.ScoreValues(rates, budget, pressure)
	perPartition := g.groupPartitionScores(win, dt, scorer)
	allocation := g.coordinatorPartitionBudgetReport(gen, snapshot, win, perPartition)

	return groupCoordinatorCycleEvaluation{
		generation:                gen,
		runtime:                   snapshot,
		now:                       observedAt,
		sample:                    cur,
		previous:                  prev,
		elapsed:                   dt,
		window:                    win,
		rates:                     rates,
		metrics:                   metrics,
		budget:                    budget,
		pressure:                  pressure,
		scores:                    scores,
		partitionScores:           perPartition,
		partitionBudgetAllocation: allocation,
		partitionBudgetTargets:    allocation.Targets,
		// Targets are planned here but deliberately reported as unpublished.
		budgetPublication: PoolGroupBudgetPublicationReport{
			Generation: gen,
			Allocation: newBudgetAllocationDiagnostics(allocation.Allocation),
			Targets:    allocation.Targets,
			Published:  false,
		},
	}
}