-
Notifications
You must be signed in to change notification settings - Fork 100
[Draft] Phase 1: PD-aware autoscaling (soft role coordination) #948
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| /* | ||
| Copyright The Volcano Authors. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package v1alpha1 | ||
|
|
||
| // Hand-written DeepCopy implementations for the Phase 1 PD-aware | ||
| // coordination types. | ||
| // | ||
| // IMPORTANT (reviewers): zz_generated.deepcopy.go is intentionally left | ||
| // untouched. Before merging, run `make generate` so controller-gen | ||
| // regenerates that file — this will (a) duplicate the methods below into | ||
| // zz_generated.deepcopy.go and (b) extend HeterogeneousTarget.DeepCopyInto | ||
| // to deep-copy the new Coordination pointer. After regeneration, this file | ||
| // can be deleted. | ||
|
|
||
| // DeepCopyInto copies the receiver into out. in must be non-nil. | ||
| func (in *Coordination) DeepCopyInto(out *Coordination) { | ||
| *out = *in | ||
| if in.PreferredRatio != nil { | ||
| in, out := &in.PreferredRatio, &out.PreferredRatio | ||
| *out = make(map[string]RoleRange, len(*in)) | ||
| for k, v := range *in { | ||
| (*out)[k] = v | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // DeepCopy returns a deep copy of the receiver, or nil if the receiver is nil. | ||
| func (in *Coordination) DeepCopy() *Coordination { | ||
| if in == nil { | ||
| return nil | ||
| } | ||
| out := new(Coordination) | ||
| in.DeepCopyInto(out) | ||
| return out | ||
| } | ||
|
|
||
| // DeepCopyInto copies the receiver into out. in must be non-nil. | ||
| // RoleRange has no pointer fields, so a value copy suffices. | ||
| func (in *RoleRange) DeepCopyInto(out *RoleRange) { *out = *in } | ||
|
|
||
| // DeepCopy returns a deep copy of the receiver, or nil if the receiver is nil. | ||
| func (in *RoleRange) DeepCopy() *RoleRange { | ||
| if in == nil { | ||
| return nil | ||
| } | ||
| out := new(RoleRange) | ||
| in.DeepCopyInto(out) | ||
| return out | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ package autoscaler | |
|
|
||
| import ( | ||
| "context" | ||
| "math" | ||
| "sort" | ||
|
|
||
| workload "github.com/volcano-sh/kthena/pkg/apis/workload/v1alpha1" | ||
|
|
@@ -152,6 +153,9 @@ func (optimizer *Optimizer) Optimize(ctx context.Context, podLister listerv1.Pod | |
| size := len(optimizer.Meta.Config.Params) | ||
| unreadyInstancesCount := int32(0) | ||
| readyInstancesMetrics := make([]algorithm.Metrics, 0, size) | ||
| // readyMetricsByName mirrors readyInstancesMetrics but keyed by target name | ||
| // so coordination can correlate pressure signals with each role. | ||
| readyMetricsByName := make(map[string]algorithm.Metrics, size) | ||
| instancesCountSum := int32(0) | ||
| // Update all model serving instances' metrics | ||
| for _, param := range optimizer.Meta.Config.Params { | ||
|
|
@@ -169,6 +173,7 @@ func (optimizer *Optimizer) Optimize(ctx context.Context, podLister listerv1.Pod | |
| } | ||
| unreadyInstancesCount += currentUnreadyInstancesCount | ||
| readyInstancesMetrics = append(readyInstancesMetrics, currentReadyInstancesMetrics) | ||
| readyMetricsByName[param.Target.TargetRef.Name] = currentReadyInstancesMetrics | ||
| } | ||
| // Get recommended replicas of all model serving instances | ||
| instancesAlgorithm := algorithm.RecommendedInstancesAlgorithm{ | ||
|
|
@@ -204,5 +209,220 @@ func (optimizer *Optimizer) Optimize(ctx context.Context, podLister listerv1.Pod | |
| optimizer.Status.AppendCorrected(recommendedInstances) | ||
|
|
||
| replicasMap := optimizer.Meta.RestoreReplicasOfEachBackend(recommendedInstances) | ||
|
|
||
| // PD-aware coordination (Phase 1): when enabled and not in panic mode, | ||
| // bias the cost-based distribution using pressure signals and a soft | ||
| // preferred-ratio band. Disabled by default — behavior is unchanged. | ||
| if coord := optimizer.Meta.Config.Coordination; coord != nil && | ||
| coord.Mode == workload.CoordinationModePreferred && | ||
| !optimizer.Status.IsPanicMode() { | ||
| replicasMap = optimizer.Meta.applyCoordination(coord, recommendedInstances, replicasMap, readyMetricsByName) | ||
| } | ||
| return replicasMap, nil | ||
| } | ||
|
|
||
// pressureMetricNames are the Phase 1 placeholder signals the optimizer
// consults to estimate per-role load. They are read by name from whatever
// the configured MetricCollector exposes — there is no hard coupling: a role
// missing a metric is simply skipped by pressureShareNormalized, so it
// contributes zero for that signal. Phase 2 will let users configure these
// per role; for now keeping them as a small constant set keeps the blast
// radius small and the diff reviewable.
var pressureMetricNames = []string{
	"queue_depth",          // prefill + decode: pending requests
	"kv_cache_utilization", // decode: KV-cache pressure (typically 0..1)
	"ttft",                 // prefill: time-to-first-token (seconds)
}
|
Comment on lines
+224
to
+234
|
||
|
|
||
// coordinationAlpha is the fixed blend weight used by applyCoordination in
// Phase 1: 0 would mean cost-share only, 1 pressure-share only, and 0.5
// weighs the two equally. Intended to become configurable in Phase 2.
const coordinationAlpha = 0.5
|
|
||
| // applyCoordination biases the cost-based baseline allocation toward roles | ||
| // under pressure, clipped to a per-role preferred ratio band. | ||
| // | ||
| // Phase 1 simplifications worth flagging for reviewers: | ||
| // - alpha is hardcoded. | ||
| // - pressure metric names are a fixed set (see pressureMetricNames). | ||
| // - infeasible bands (e.g. mins summing > 1) cause "drift" rather than an | ||
| // error: the caller still gets a best-effort allocation. | ||
| // - panic mode is handled by the caller (it bypasses this function), per | ||
| // the proposal. | ||
| func (meta *OptimizerMeta) applyCoordination( | ||
| coord *workload.Coordination, | ||
| total int32, | ||
| baseline map[string]int32, | ||
| metricsByName map[string]algorithm.Metrics, | ||
| ) map[string]int32 { | ||
| params := meta.Config.Params | ||
| n := len(params) | ||
| if n == 0 || total <= 0 { | ||
| return baseline | ||
| } | ||
|
|
||
| names := make([]string, 0, n) | ||
| minReps := make(map[string]int32, n) | ||
| maxReps := make(map[string]int32, n) | ||
| for _, p := range params { | ||
| nm := p.Target.TargetRef.Name | ||
| names = append(names, nm) | ||
| minReps[nm] = p.MinReplicas | ||
| maxReps[nm] = p.MaxReplicas | ||
| } | ||
|
|
||
| costShare := costShareFromBaseline(names, baseline) | ||
| pressureShare := pressureShareNormalized(names, metricsByName) | ||
| if pressureShare == nil { | ||
| // No usable pressure data — degrade gracefully to cost-only blend, | ||
| // which is equivalent to the existing cost-based behavior. | ||
| pressureShare = costShare | ||
| } | ||
|
|
||
| // Blend, clip into the preferred band, renormalize. | ||
| raw := make(map[string]float64, n) | ||
| for _, nm := range names { | ||
| raw[nm] = (1-coordinationAlpha)*costShare[nm] + coordinationAlpha*pressureShare[nm] | ||
| } | ||
| for _, nm := range names { | ||
| r, ok := coord.PreferredRatio[nm] | ||
| if !ok { | ||
| continue | ||
| } | ||
| lo, hi := float64(r.Min)/100.0, float64(r.Max)/100.0 | ||
| if raw[nm] < lo { | ||
| raw[nm] = lo | ||
| } | ||
| if raw[nm] > hi { | ||
| raw[nm] = hi | ||
| } | ||
|
Comment on lines
+285
to
+296
|
||
| } | ||
| var sumRaw float64 | ||
| for _, v := range raw { | ||
| sumRaw += v | ||
| } | ||
| if sumRaw <= 0 { | ||
| return baseline | ||
| } | ||
| for nm := range raw { | ||
| raw[nm] /= sumRaw | ||
| } | ||
|
|
||
| // Convert to integers via largest-remainder so the sum is preserved, | ||
| // then clamp to each param's Min/Max. Drift after clamping is allowed. | ||
| return roundSharesToReplicas(names, raw, total, minReps, maxReps) | ||
| } | ||
|
Comment on lines
+309
to
+312
|
||
|
|
||
// costShareFromBaseline derives a per-role share in [0,1] summing to 1 from
// the cost-based replica baseline. If the baseline is empty (or all zeros),
// fall back to a uniform split so blending still has a sensible cost
// component.
func costShareFromBaseline(names []string, baseline map[string]int32) map[string]float64 {
	shares := make(map[string]float64, len(names))
	total := 0.0
	for _, name := range names {
		total += float64(baseline[name])
	}
	if total <= 0 {
		// Uniform fallback keeps the blend well-defined without a baseline.
		uniform := 1.0 / float64(len(names))
		for _, name := range names {
			shares[name] = uniform
		}
		return shares
	}
	for _, name := range names {
		shares[name] = float64(baseline[name]) / total
	}
	return shares
}
|
|
||
| // pressureShareNormalized produces a per-role pressure share summing to 1. | ||
| // | ||
| // Each metric in pressureMetricNames is normalized *across roles | ||
| // independently* before being summed, so signals on different scales | ||
| // (counts, ratios, seconds) contribute equally instead of the largest-scale | ||
| // metric dominating. Returns nil when no role reports any usable signal — | ||
| // the caller falls back to cost-only. | ||
| func pressureShareNormalized(names []string, metricsByName map[string]algorithm.Metrics) map[string]float64 { | ||
| contrib := make(map[string]float64, len(names)) | ||
| any := false | ||
| for _, key := range pressureMetricNames { | ||
| var sum float64 | ||
| vals := make(map[string]float64, len(names)) | ||
| for _, nm := range names { | ||
| // Treat a missing per-role map as "no signal" rather than | ||
| // indexing into a nil map and relying on Go's zero-value read. | ||
| m, ok := metricsByName[nm] | ||
| if !ok || m == nil { | ||
| continue | ||
| } | ||
| v, ok := m[key] | ||
| if !ok || v <= 0 || math.IsNaN(v) || math.IsInf(v, 0) { | ||
| continue | ||
| } | ||
| vals[nm] = v | ||
| sum += v | ||
| } | ||
| if sum <= 0 { | ||
| continue | ||
| } | ||
| any = true | ||
| for nm, v := range vals { | ||
| contrib[nm] += v / sum | ||
| } | ||
| } | ||
| if !any { | ||
| return nil | ||
| } | ||
| var total float64 | ||
| for _, v := range contrib { | ||
| total += v | ||
| } | ||
| if total <= 0 { | ||
| return nil | ||
| } | ||
| out := make(map[string]float64, len(names)) | ||
| for _, nm := range names { | ||
| out[nm] = contrib[nm] / total | ||
| } | ||
| return out | ||
| } | ||
|
|
||
// roundSharesToReplicas converts fractional shares to integer replica counts
// via the largest-remainder method (preserving the total), then clamps each
// to its [min,max] band. If clamping drifts the total, that drift is
// accepted: Phase 1 prefers a feasible, predictable allocation over a
// retry/repair loop.
func roundSharesToReplicas(names []string, shares map[string]float64, total int32, minReps, maxReps map[string]int32) map[string]int32 {
	type fraction struct {
		name string
		frac float64
	}
	result := make(map[string]int32, len(names))
	fractions := make([]fraction, 0, len(names))
	var allocated int32
	for _, name := range names {
		exact := shares[name] * float64(total)
		whole := int32(exact) // truncation == floor for non-negative shares
		result[name] = whole
		allocated += whole
		fractions = append(fractions, fraction{name: name, frac: exact - float64(whole)})
	}
	// Largest remainder first; equal remainders break ties by name so the
	// allocation is deterministic (avoids flaky tests and run-to-run drift).
	sort.SliceStable(fractions, func(a, b int) bool {
		if fractions[a].frac != fractions[b].frac {
			return fractions[a].frac > fractions[b].frac
		}
		return fractions[a].name < fractions[b].name
	})
	for _, f := range fractions {
		if allocated >= total {
			break
		}
		result[f.name]++
		allocated++
	}
	// Clamp every role into its band; any resulting drift of the total is
	// accepted in Phase 1.
	for _, name := range names {
		switch {
		case result[name] < minReps[name]:
			result[name] = minReps[name]
		case result[name] > maxReps[name]:
			result[name] = maxReps[name]
		}
	}
	return result
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API types were updated to add Coordination, but the generated deepcopy file (zz_generated.deepcopy.go) hasn’t been regenerated, so AutoscalingPolicyBinding/HeterogeneousTarget DeepCopy* currently won’t deep-copy the new Coordination pointer. This can lead to shared pointers/maps across copies. Also, keeping these hand-written DeepCopy methods in-tree will cause duplicate-method compile errors the next time `make generate` runs. Please run code generation and commit the updated zz_generated.deepcopy.go (including deep-copying HeterogeneousTarget.Coordination), then remove this temporary file.