-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgroup_scheduler.go
More file actions
110 lines (97 loc) · 4.22 KB
/
group_scheduler.go
File metadata and controls
110 lines (97 loc) · 4.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/*
Copyright 2026 The ARCORIS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bufferpool
import "errors"
// startCoordinatorScheduler launches the opt-in group-level coordinator
// scheduler using the coordinator policy held in the group's construction
// config.
//
// The policy is read from g.config.Policy after normalization and handed,
// together with tickerFactory, to startCoordinatorSchedulerForPolicy. That
// helper returns nil without starting anything when the coordinator policy is
// not enabled, so a group with a disabled policy stays manually ticked.
//
// Construction and PublishPolicy are both activation paths for the scheduler.
// Both wire in the same schedulerTick wrapper, so a scheduled cycle goes
// through exactly the TickInto path a manual caller uses.
func (g *PoolGroup) startCoordinatorScheduler(tickerFactory controllerSchedulerTickerFactory) error {
	return g.startCoordinatorSchedulerForPolicy(
		g.config.Policy.Normalize().Coordinator,
		tickerFactory,
	)
}
// stopCoordinatorScheduler stops the group-level coordinator scheduler by
// calling Stop on g.coordinatorScheduler.
//
// Lock ordering: Close calls this before taking runtimeMu.Lock. That ordering
// matters because a scheduled TickInto may already hold, or be waiting for,
// runtimeMu.RLock. Stop waits for the scheduler goroutine while no group lock
// that TickInto might need is held; only after it returns can Close safely
// acquire runtimeMu and close the child partitions.
func (g *PoolGroup) stopCoordinatorScheduler() {
g.coordinatorScheduler.Stop()
}
// startCoordinatorSchedulerForPolicy starts the coordinator scheduler for an
// explicitly supplied coordinator policy.
//
// The policy is normalized by wrapping it in a PoolGroupPolicy and calling
// Normalize. If the normalized policy is not enabled, nothing is started and
// nil is returned. Otherwise the start input built by
// coordinatorSchedulerStartInput is passed to the scheduler's Start, and the
// error from Start is returned unchanged.
func (g *PoolGroup) startCoordinatorSchedulerForPolicy(
	policy PoolGroupCoordinatorPolicy,
	tickerFactory controllerSchedulerTickerFactory,
) error {
	normalized := PoolGroupPolicy{Coordinator: policy}.Normalize().Coordinator
	if !normalized.Enabled {
		// Disabled coordinator policy: leave the scheduler untouched.
		return nil
	}

	input := g.coordinatorSchedulerStartInput(normalized, tickerFactory)
	return g.coordinatorScheduler.Start(input)
}
// prepareCoordinatorSchedulerStart returns the prepared scheduler start that
// prepareControllerSchedulerStart produces for the given coordinator policy.
//
// The policy is normalized the same way as in
// startCoordinatorSchedulerForPolicy. The ticker factory is taken from
// g.coordinatorSchedulerTickerFactory rather than from the caller.
//
// NOTE(review): unlike startCoordinatorSchedulerForPolicy, this does not check
// policy.Enabled; presumably callers only prepare a start for enabled
// policies. Confirm against the call sites.
func (g *PoolGroup) prepareCoordinatorSchedulerStart(
	policy PoolGroupCoordinatorPolicy,
) (controllerSchedulerPreparedStart, error) {
	normalized := PoolGroupPolicy{Coordinator: policy}.Normalize().Coordinator
	input := g.coordinatorSchedulerStartInput(normalized, g.coordinatorSchedulerTickerFactory)
	return prepareControllerSchedulerStart(input)
}
// coordinatorSchedulerStartInput assembles the controllerSchedulerStartInput
// for this group's coordinator scheduler.
//
// Interval is policy.TickInterval, Tick is bound to g.schedulerTick,
// IsClosedError reports whether an error matches ErrClosed via errors.Is, and
// TickerFactory is the supplied tickerFactory passed through unchanged. The
// policy is used as given; callers normalize it before calling.
func (g *PoolGroup) coordinatorSchedulerStartInput(
	policy PoolGroupCoordinatorPolicy,
	tickerFactory controllerSchedulerTickerFactory,
) controllerSchedulerStartInput {
	// Predicate over tick errors: true when the error wraps or equals ErrClosed.
	isClosed := func(err error) bool {
		return errors.Is(err, ErrClosed)
	}

	return controllerSchedulerStartInput{
		Interval:      policy.TickInterval,
		Tick:          g.schedulerTick,
		IsClosedError: isClosed,
		TickerFactory: tickerFactory,
	}
}
// applyCoordinatorSchedulerPolicyChange applies a scheduler transition derived
// from a policy update to the coordinator scheduler.
//
// It runs after policy publication has released runtimeMu. Because of that,
// the disable and retime transitions may wait for the scheduler to stop here
// without deadlocking a scheduled TickInto that needs runtimeMu.RLock.
//
// Dispatch on change.Kind:
//   - schedulerPolicyChangeNone: nothing to do, returns nil.
//   - schedulerPolicyChangeEnable: starts the scheduler from prepared.
//   - schedulerPolicyChangeDisable: stops the scheduler, returns nil.
//   - schedulerPolicyChangeRetime: replaces the running scheduler with prepared.
//   - any other kind: returns an ErrInvalidPolicy error tagged with
//     policyUpdateFailureSchedulerChange.
func (g *PoolGroup) applyCoordinatorSchedulerPolicyChange(
	change schedulerPolicyChange,
	prepared controllerSchedulerPreparedStart,
) error {
	switch change.Kind {
	case schedulerPolicyChangeEnable:
		return g.coordinatorScheduler.startPrepared(prepared)
	case schedulerPolicyChangeRetime:
		return g.coordinatorScheduler.replacePrepared(prepared)
	case schedulerPolicyChangeDisable:
		g.stopCoordinatorScheduler()
	case schedulerPolicyChangeNone:
		// No scheduler transition requested.
	default:
		return newError(ErrInvalidPolicy, policyUpdateFailureSchedulerChange)
	}
	return nil
}
// schedulerTick runs a single scheduled coordinator cycle for the group.
//
// It is a thin wrapper over TickInto: the report it passes in is a local value
// that is intentionally dropped as soon as TickInto returns. Everything of
// substance stays inside TickInto, which remains responsible for publishing
// the lightweight ControllerStatus, enforcing the no-overlap gate, publishing
// partition budget targets, and returning lifecycle errors to the scheduler
// runtime.
//
// This method itself does not tick partitions directly, call Pool.Get or
// Pool.Put, scan Pool internals, or execute Pool trim. ControllerStatus keeps
// reflecting the outcome of the last coordinator cycle; whether the group
// itself is closed is still answered by the lifecycle queries.
func (g *PoolGroup) schedulerTick() error {
	var discarded PoolGroupCoordinatorReport
	return g.TickInto(&discarded)
}