From f2d370c8b48e9c95938973d164e57cec5afef0c0 Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Wed, 30 Jul 2025 11:38:30 +0300 Subject: [PATCH 1/2] feat: add compaction and refactor queue as doubly-linked list Signed-off-by: Timur Tuktamyshev --- pkg/hook/task_metadata/task_metadata.go | 19 + pkg/hook/task_metadata/task_metadata_test.go | 76 -- .../combine_binding_context_test.go | 146 ++- pkg/shell-operator/operator.go | 21 +- pkg/task/dump/dump_test.go | 18 +- pkg/task/queue/queue_set.go | 20 +- pkg/task/queue/task_queue.go | 419 +++++-- pkg/task/queue/task_queue_benchmark_test.go | 289 +++++ pkg/task/queue/task_queue_compaction_test.go | 275 +++++ pkg/task/queue/task_queue_list.go | 1058 +++++++++++++++++ pkg/task/queue/task_queue_requeue_test.go | 239 ++++ pkg/task/queue/task_queue_test.go | 138 +++ pkg/task/task.go | 13 + test/hook/context/context_combiner.go | 6 +- 14 files changed, 2508 insertions(+), 229 deletions(-) create mode 100644 pkg/task/queue/task_queue_benchmark_test.go create mode 100644 pkg/task/queue/task_queue_compaction_test.go create mode 100644 pkg/task/queue/task_queue_list.go create mode 100644 pkg/task/queue/task_queue_requeue_test.go diff --git a/pkg/hook/task_metadata/task_metadata.go b/pkg/hook/task_metadata/task_metadata.go index b3567903..8185ffd6 100644 --- a/pkg/hook/task_metadata/task_metadata.go +++ b/pkg/hook/task_metadata/task_metadata.go @@ -26,6 +26,14 @@ type BindingContextAccessor interface { GetBindingContext() []bindingcontext.BindingContext } +type BindingContextSetter interface { + SetBindingContext([]bindingcontext.BindingContext) interface{} +} + +type MonitorIDSetter interface { + SetMonitorIDs([]string) interface{} +} + type MonitorIDAccessor interface { GetMonitorIDs() []string } @@ -74,6 +82,12 @@ func (m HookMetadata) GetBindingContext() []bindingcontext.BindingContext { return m.BindingContext } +func (m HookMetadata) SetBindingContext(context []bindingcontext.BindingContext) interface{} { + m.BindingContext = context + + return m +} + func (m HookMetadata) GetAllowFailure() bool { return m.AllowFailure } @@ -82,6 +96,11 @@ func (m HookMetadata) GetMonitorIDs() []string { return m.MonitorIDs } +func (m HookMetadata) SetMonitorIDs(monitorIDs []string) interface{} { + m.MonitorIDs = monitorIDs + return m +} + func (m *HookMetadata) WithHookName(name string) *HookMetadata { m.HookName = name return m diff --git a/pkg/hook/task_metadata/task_metadata_test.go b/pkg/hook/task_metadata/task_metadata_test.go index ac3f37ab..41e15778 100644 --- a/pkg/hook/task_metadata/task_metadata_test.go +++ b/pkg/hook/task_metadata/task_metadata_test.go @@ -1,18 +1,13 @@ package task_metadata import ( - "fmt" - "strings" "testing" . 
"github.com/onsi/gomega" - "github.com/stretchr/testify/assert" bctx "github.com/flant/shell-operator/pkg/hook/binding_context" htypes "github.com/flant/shell-operator/pkg/hook/types" - "github.com/flant/shell-operator/pkg/metric" "github.com/flant/shell-operator/pkg/task" - "github.com/flant/shell-operator/pkg/task/queue" ) func Test_HookMetadata_Access(t *testing.T) { @@ -38,74 +33,3 @@ func Test_HookMetadata_Access(t *testing.T) { g.Expect(hm.BindingContext[0].Binding).Should(Equal("each_1_min")) g.Expect(hm.BindingContext[1].Binding).Should(Equal("each_5_min")) } - -func Test_HookMetadata_QueueDump_Task_Description(t *testing.T) { - g := NewWithT(t) - - logLabels := map[string]string{ - "hook": "hook1.sh", - } - - metricStorage := metric.NewStorageMock(t) - metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { - assert.Equal(t, metric, "{PREFIX}tasks_queue_action_duration_seconds") - assert.NotZero(t, value) - assert.Equal(t, map[string]string{ - "queue_action": "AddLast", - "queue_name": "", - }, labels) - assert.Nil(t, buckets) - }) - - q := queue.NewTasksQueue().WithMetricStorage(metricStorage) - - q.AddLast(task.NewTask(EnableKubernetesBindings). - WithMetadata(HookMetadata{ - HookName: "hook1.sh", - Binding: string(EnableKubernetesBindings), - })) - - q.AddLast(task.NewTask(HookRun). - WithMetadata(HookMetadata{ - HookName: "hook1.sh", - BindingType: htypes.OnKubernetesEvent, - Binding: "monitor_pods", - }). - WithLogLabels(logLabels). - WithQueueName("main")) - - q.AddLast(task.NewTask(HookRun). - WithMetadata(HookMetadata{ - HookName: "hook1.sh", - BindingType: htypes.Schedule, - AllowFailure: true, - Binding: "every 1 sec", - Group: "monitor_pods", - }). - WithLogLabels(logLabels). - WithQueueName("main")) - - queueDump := taskQueueToText(q) - - g.Expect(queueDump).Should(ContainSubstring("hook1.sh"), "Queue dump should reveal a hook name.") - g.Expect(queueDump).Should(ContainSubstring("EnableKubernetesBindings"), "Queue dump should reveal EnableKubernetesBindings.") - g.Expect(queueDump).Should(ContainSubstring(":kubernetes:"), "Queue dump should show kubernetes binding.") - g.Expect(queueDump).Should(ContainSubstring(":schedule:"), "Queue dump should show schedule binding.") - g.Expect(queueDump).Should(ContainSubstring("group=monitor_pods"), "Queue dump should show group name.") -} - -func taskQueueToText(q *queue.TaskQueue) string { - var buf strings.Builder - buf.WriteString(fmt.Sprintf("Queue '%s': length %d, status: '%s'\n", q.Name, q.Length(), q.Status)) - buf.WriteString("\n") - - index := 1 - q.Iterate(func(task task.Task) { - buf.WriteString(fmt.Sprintf("%2d. ", index)) - buf.WriteString(task.GetDescription()) - buf.WriteString("\n") - index++ - }) - - return buf.String() -} diff --git a/pkg/shell-operator/combine_binding_context_test.go b/pkg/shell-operator/combine_binding_context_test.go index 64bdb3be..ea5b0b87 100644 --- a/pkg/shell-operator/combine_binding_context_test.go +++ b/pkg/shell-operator/combine_binding_context_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/assert" bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context" - . 
"github.com/flant/shell-operator/pkg/hook/task_metadata" + "github.com/flant/shell-operator/pkg/hook/task_metadata" "github.com/flant/shell-operator/pkg/hook/types" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" "github.com/flant/shell-operator/pkg/metric" @@ -33,16 +33,20 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) { TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) TaskQueues.WithContext(context.Background()) - TaskQueues.NewNamedQueue("test_multiple_hooks", func(_ context.Context, _ task.Task) queue.TaskResult { - return queue.TaskResult{ - Status: "Success", - } + TaskQueues.NewNamedQueue("test_multiple_hooks", queue.QueueOpts{ + Handler: func(_ context.Context, _ task.Task) queue.TaskResult { + return queue.TaskResult{ + Status: "Success", + } + }, + CompactableTypes: []task.TaskType{task_metadata.HookRun}, + CompactionCallback: nil, }) tasks := []task.Task{ - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -51,9 +55,9 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -62,9 +66,9 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -72,9 +76,9 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -83,9 +87,9 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook2.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -94,9 +98,9 @@ func Test_CombineBindingContext_MultipleHooks(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -138,16 +142,20 @@ func Test_CombineBindingContext_Nil_On_NoCombine(t *testing.T) { TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) TaskQueues.WithContext(context.Background()) - TaskQueues.NewNamedQueue("test_no_combine", func(_ context.Context, _ task.Task) queue.TaskResult { - return queue.TaskResult{ - Status: "Success", - } + TaskQueues.NewNamedQueue("test_no_combine", queue.QueueOpts{ + Handler: func(_ context.Context, _ task.Task) queue.TaskResult { + return queue.TaskResult{ + Status: "Success", + } + }, + CompactableTypes: []task.TaskType{task_metadata.HookRun}, + CompactionCallback: nil, }) tasks := []task.Task{ - task.NewTask(HookRun). 
+ task.NewTask(task_metadata.HookRun). WithQueueName("test_no_combine"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -156,9 +164,9 @@ func Test_CombineBindingContext_Nil_On_NoCombine(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_no_combine"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook2.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -167,9 +175,9 @@ func Test_CombineBindingContext_Nil_On_NoCombine(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_no_combine"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook3.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -208,10 +216,14 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) { TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) TaskQueues.WithContext(context.Background()) - TaskQueues.NewNamedQueue("test_multiple_hooks", func(_ context.Context, _ task.Task) queue.TaskResult { - return queue.TaskResult{ - Status: "Success", - } + TaskQueues.NewNamedQueue("test_multiple_hooks", queue.QueueOpts{ + Handler: func(_ context.Context, _ task.Task) queue.TaskResult { + return queue.TaskResult{ + Status: "Success", + } + }, + CompactableTypes: []task.TaskType{task_metadata.HookRun}, + CompactionCallback: nil, }) bcMeta := bindingcontext.BindingContext{}.Metadata @@ -219,9 +231,9 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) { tasks := []task.Task{ // 3 tasks with Group should be compacted - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -231,9 +243,9 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -243,9 +255,9 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -254,9 +266,9 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -266,9 +278,9 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) { }, }), // Should not combine with next tasks (different hook name) - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook2.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -277,9 +289,9 @@ func Test_CombineBindingContext_Group_Compaction(t *testing.T) { }, }, }), - task.NewTask(HookRun). 
+ task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -321,10 +333,14 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) { TaskQueues := queue.NewTaskQueueSet().WithMetricStorage(metricStorage) TaskQueues.WithContext(context.Background()) - TaskQueues.NewNamedQueue("test_multiple_hooks", func(_ context.Context, _ task.Task) queue.TaskResult { - return queue.TaskResult{ - Status: "Success", - } + TaskQueues.NewNamedQueue("test_multiple_hooks", queue.QueueOpts{ + Handler: func(_ context.Context, _ task.Task) queue.TaskResult { + return queue.TaskResult{ + Status: "Success", + } + }, + CompactableTypes: []task.TaskType{task_metadata.HookRun}, + CompactionCallback: nil, }) bcMeta := bindingcontext.BindingContext{}.Metadata @@ -336,9 +352,9 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) { schMeta.BindingType = types.Schedule tasks := []task.Task{ - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -348,9 +364,9 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -360,9 +376,9 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -374,9 +390,9 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) { // stop compaction for group // bcList[1] type == kemtypes.TypeEvent - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -386,9 +402,9 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) { }, }), // bcList[2] type == Group - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -398,9 +414,9 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) { }, }), // bcList[3] type == Schedule - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -414,9 +430,9 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) { }, }), // bcList[4] type == Group - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). 
- WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -427,9 +443,9 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) { }, }), // Should not combine with task that has different type - task.NewTask(EnableScheduleBindings). + task.NewTask(task_metadata.EnableScheduleBindings). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { @@ -438,9 +454,9 @@ func Test_CombineBindingContext_Group_Type(t *testing.T) { }, }, }), - task.NewTask(HookRun). + task.NewTask(task_metadata.HookRun). WithQueueName("test_multiple_hooks"). - WithMetadata(HookMetadata{ + WithMetadata(task_metadata.HookMetadata{ HookName: "hook1.sh", BindingContext: []bindingcontext.BindingContext{ { diff --git a/pkg/shell-operator/operator.go b/pkg/shell-operator/operator.go index 9efe1611..41ab16ce 100644 --- a/pkg/shell-operator/operator.go +++ b/pkg/shell-operator/operator.go @@ -822,7 +822,12 @@ func (op *ShellOperator) bootstrapMainQueue(tqs *queue.TaskQueueSet) { // Prepopulate main queue with 'onStartup' tasks and 'enable kubernetes bindings' tasks. tqs.WithMainName("main") - tqs.NewNamedQueue("main", op.taskHandler) + tqs.NewNamedQueue("main", queue.QueueOpts{ + Handler: op.taskHandler, + CompactableTypes: []task.TaskType{task_metadata.HookRun}, + CompactionCallback: nil, + Logger: op.logger.With("operator.component", "mainQueue"), + }) mainQueue := tqs.GetMain() @@ -891,7 +896,12 @@ func (op *ShellOperator) initAndStartHookQueues() { h := op.HookManager.GetHook(hookName) for _, hookBinding := range h.Config.Schedules { if op.TaskQueues.GetByName(hookBinding.Queue) == nil { - op.TaskQueues.NewNamedQueue(hookBinding.Queue, op.taskHandler) + op.TaskQueues.NewNamedQueue(hookBinding.Queue, queue.QueueOpts{ + Handler: op.taskHandler, + CompactableTypes: []task.TaskType{task_metadata.HookRun}, + CompactionCallback: nil, + Logger: op.logger.With("operator.component", "hookQueue", "hook", hookName, "queue", hookBinding.Queue), + }) op.TaskQueues.GetByName(hookBinding.Queue).Start(op.ctx) } } @@ -902,7 +912,12 @@ func (op *ShellOperator) initAndStartHookQueues() { h := op.HookManager.GetHook(hookName) for _, hookBinding := range h.Config.OnKubernetesEvents { if op.TaskQueues.GetByName(hookBinding.Queue) == nil { - op.TaskQueues.NewNamedQueue(hookBinding.Queue, op.taskHandler) + op.TaskQueues.NewNamedQueue(hookBinding.Queue, queue.QueueOpts{ + Handler: op.taskHandler, + CompactableTypes: []task.TaskType{task_metadata.HookRun}, + CompactionCallback: nil, + Logger: op.logger.With("operator.component", "hookQueue", "hook", hookName, "queue", hookBinding.Queue), + }) op.TaskQueues.GetByName(hookBinding.Queue).Start(op.ctx) } } diff --git a/pkg/task/dump/dump_test.go b/pkg/task/dump/dump_test.go index dfaf3310..5edf9c43 100644 --- a/pkg/task/dump/dump_test.go +++ b/pkg/task/dump/dump_test.go @@ -107,7 +107,11 @@ func Test_Dump(t *testing.T) { // Create and fill main queue. t.Run("single main queue", func(t *testing.T) { - tqs.NewNamedQueue("main", nil) + tqs.NewNamedQueue("main", queue.QueueOpts{ + Handler: nil, + CompactableTypes: []task.TaskType{task_metadata.HookRun}, + CompactionCallback: nil, + }) // Single empty main should be reported only as summary. dump := testDumpQueuesWrapper(tqs, "text", true) @@ -131,7 +135,11 @@ func Test_Dump(t *testing.T) { // Create and fill active queue. 
t.Run("fill active queue", func(_ *testing.T) {
-		tqs.NewNamedQueue("active-queue", nil)
+		tqs.NewNamedQueue("active-queue", queue.QueueOpts{
+			Handler:            nil,
+			CompactableTypes:   []task.TaskType{task_metadata.HookRun},
+			CompactionCallback: nil,
+		})
 		fillQueue(tqs.GetByName("active-queue"), activeTasks)
 
 		dump := testDumpQueuesWrapper(tqs, "text", true)
@@ -142,7 +150,11 @@
 	// Create empty queue.
 	t.Run("create empty queue", func(t *testing.T) {
-		tqs.NewNamedQueue("empty", nil)
+		tqs.NewNamedQueue("empty", queue.QueueOpts{
+			Handler:            nil,
+			CompactableTypes:   []task.TaskType{task_metadata.HookRun},
+			CompactionCallback: nil,
+		})
 
 		dump := testDumpQueuesWrapper(tqs, "text", true)
 		t.Log(dump)
diff --git a/pkg/task/queue/queue_set.go b/pkg/task/queue/queue_set.go
index 1a353f28..54b30dd3 100644
--- a/pkg/task/queue/queue_set.go
+++ b/pkg/task/queue/queue_set.go
@@ -5,6 +5,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/deckhouse/deckhouse/pkg/log"
+
 	"github.com/flant/shell-operator/pkg/metric"
 	"github.com/flant/shell-operator/pkg/task"
 )
@@ -73,12 +75,26 @@ func (tqs *TaskQueueSet) Add(queue *TaskQueue) {
 	tqs.m.Unlock()
 }
 
-func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(ctx context.Context, t task.Task) TaskResult) {
+type QueueOpts struct {
+	Handler            func(ctx context.Context, t task.Task) TaskResult
+	CompactableTypes   []task.TaskType
+	CompactionCallback func(compactedTasks []task.Task, targetTask task.Task)
+	Logger             *log.Logger
+}
+
+func (tqs *TaskQueueSet) NewNamedQueue(name string, opts QueueOpts) {
 	q := NewTasksQueue()
 	q.WithName(name)
-	q.WithHandler(handler)
+	q.WithHandler(opts.Handler)
 	q.WithContext(tqs.ctx)
 	q.WithMetricStorage(tqs.metricStorage)
+	q.WithCompactableTypes(opts.CompactableTypes)
+	if opts.CompactionCallback != nil {
+		q.WithCompactionCallback(opts.CompactionCallback)
+	}
+	if opts.Logger != nil {
+		q.WithLogger(opts.Logger)
+	}
 	tqs.m.Lock()
 	tqs.Queues[name] = q
 	tqs.m.Unlock()
diff --git a/pkg/task/queue/task_queue.go b/pkg/task/queue/task_queue.go
index 6cbe23c7..f90a53d6 100644
--- a/pkg/task/queue/task_queue.go
+++ b/pkg/task/queue/task_queue.go
@@ -1,3 +1,4 @@
+// Deprecated: this slice-based queue is superseded by the list-based TaskQueue in task_queue_list.go.
 package queue
 
 import (
@@ -5,12 +6,15 @@ import (
 	"fmt"
 	"log/slog"
 	"os"
+	"sort"
 	"strings"
 	"sync"
 	"time"
 
 	"github.com/deckhouse/deckhouse/pkg/log"
 
+	bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context"
+	"github.com/flant/shell-operator/pkg/hook/task_metadata"
 	"github.com/flant/shell-operator/pkg/metric"
 	"github.com/flant/shell-operator/pkg/task"
 	"github.com/flant/shell-operator/pkg/utils/exponential_backoff"
 )
 
 /*
-A working queue (a pipeline) for sequential execution of tasks.
-
-Tasks are added to the tail and executed from the head. Also a task can be pushed
-to the head to implement a meta-tasks.
-
-Each task is executed until success. This can be controlled with allowFailure: true
-config parameter.
-
+WARNING: This file is deprecated and will be removed in the future.
+It is kept mainly for benchmark comparisons against the list-based implementation.
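+
+A minimal migration sketch (an illustration based on this patch, not a
+guaranteed API): construct the list-based queue from task_queue_list.go
+instead of this slice-based one, e.g.
+
+	q := NewTasksQueue()
+	q.WithName("main")
+	q.WithCompactableTypes([]task.TaskType{task_metadata.HookRun})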
*/ -var ( - DefaultWaitLoopCheckInterval = 125 * time.Millisecond - DefaultDelayOnQueueIsEmpty = 250 * time.Millisecond - DefaultInitialDelayOnFailedTask = 5 * time.Second - DefaultDelayOnRepeat = 25 * time.Millisecond -) - -type TaskStatus string - -const ( - Success TaskStatus = "Success" - Fail TaskStatus = "Fail" - Repeat TaskStatus = "Repeat" - Keep TaskStatus = "Keep" -) +type TaskQueueSlice struct { + logger *log.Logger -type TaskResult struct { - Status TaskStatus - HeadTasks []task.Task - TailTasks []task.Task - AfterTasks []task.Task - - DelayBeforeNextTask time.Duration - - AfterHandle func() -} - -type TaskQueue struct { m sync.RWMutex metricStorage metric.Storage ctx context.Context @@ -65,6 +38,9 @@ type TaskQueue struct { waitInProgress bool cancelDelay bool + isCompactable bool + CompactableTypes map[task.TaskType]struct{} + items []task.Task started bool // a flag to ignore multiple starts @@ -75,6 +51,9 @@ type TaskQueue struct { Handler func(ctx context.Context, t task.Task) TaskResult Status string + // Callback for task compaction events + CompactionCallback func(compactedTasks []task.Task, targetTask task.Task) + measureActionFn func() measureActionFnOnce sync.Once @@ -85,10 +64,11 @@ type TaskQueue struct { ExponentialBackoffFn func(failureCount int) time.Duration } -func NewTasksQueue() *TaskQueue { - return &TaskQueue{ +func NewTasksQueueSlice() *TaskQueueSlice { + return &TaskQueueSlice{ items: make([]task.Task, 0), // Default timings + logger: log.NewNop(), WaitLoopCheckInterval: DefaultWaitLoopCheckInterval, DelayOnQueueIsEmpty: DefaultDelayOnQueueIsEmpty, DelayOnRepeat: DefaultDelayOnRepeat, @@ -98,28 +78,48 @@ func NewTasksQueue() *TaskQueue { } } -func (q *TaskQueue) WithContext(ctx context.Context) { +func (q *TaskQueueSlice) WithContext(ctx context.Context) { q.ctx, q.cancel = context.WithCancel(ctx) } -func (q *TaskQueue) WithMetricStorage(mstor metric.Storage) *TaskQueue { +func (q *TaskQueueSlice) WithLogger(logger *log.Logger) { + q.logger = logger +} + +func (q *TaskQueueSlice) WithMetricStorage(mstor metric.Storage) *TaskQueueSlice { q.metricStorage = mstor return q } -func (q *TaskQueue) WithName(name string) *TaskQueue { +func (q *TaskQueueSlice) WithCompactableTypes(taskTypes []task.TaskType) *TaskQueueSlice { + q.CompactableTypes = make(map[task.TaskType]struct{}, len(taskTypes)) + for _, taskType := range taskTypes { + q.CompactableTypes[taskType] = struct{}{} + } + return q +} + +func (q *TaskQueueSlice) WithName(name string) *TaskQueueSlice { q.Name = name return q } -func (q *TaskQueue) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueue { +func (q *TaskQueueSlice) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueueSlice { q.Handler = fn return q } +func (q *TaskQueueSlice) WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) *TaskQueueSlice { + q.CompactionCallback = callback + return q +} + // MeasureActionTime is a helper to measure execution time of queue's actions -func (q *TaskQueue) MeasureActionTime(action string) func() { +func (q *TaskQueueSlice) MeasureActionTime(action string) func() { + if q.metricStorage == nil { + return func() {} + } q.measureActionFnOnce.Do(func() { if os.Getenv("QUEUE_ACTIONS_METRICS") == "no" { q.measureActionFn = func() {} @@ -132,31 +132,33 @@ func (q *TaskQueue) MeasureActionTime(action string) func() { return q.measureActionFn } -func (q *TaskQueue) GetStatus() string { +func (q *TaskQueueSlice) GetStatus() string { 
defer q.MeasureActionTime("GetStatus")() q.m.RLock() defer q.m.RUnlock() return q.Status } -func (q *TaskQueue) SetStatus(status string) { +func (q *TaskQueueSlice) SetStatus(status string) { q.m.Lock() q.Status = status q.m.Unlock() } -func (q *TaskQueue) IsEmpty() bool { +func (q *TaskQueueSlice) IsEmpty() bool { defer q.MeasureActionTime("IsEmpty")() q.m.RLock() defer q.m.RUnlock() - return q.isEmpty() + isEmpty := q.isEmpty() + + return isEmpty } -func (q *TaskQueue) isEmpty() bool { +func (q *TaskQueueSlice) isEmpty() bool { return len(q.items) == 0 } -func (q *TaskQueue) Length() int { +func (q *TaskQueueSlice) Length() int { defer q.MeasureActionTime("Length")() q.m.RLock() defer q.m.RUnlock() @@ -164,7 +166,7 @@ func (q *TaskQueue) Length() int { } // AddFirst adds new head element. -func (q *TaskQueue) AddFirst(t task.Task) { +func (q *TaskQueueSlice) AddFirst(t task.Task) { defer q.MeasureActionTime("AddFirst")() q.withLock(func() { q.addFirst(t) @@ -172,12 +174,12 @@ func (q *TaskQueue) AddFirst(t task.Task) { } // addFirst adds new head element. -func (q *TaskQueue) addFirst(t task.Task) { +func (q *TaskQueueSlice) addFirst(t task.Task) { q.items = append([]task.Task{t}, q.items...) } // RemoveFirst deletes a head element, so head is moved. -func (q *TaskQueue) RemoveFirst() task.Task { +func (q *TaskQueueSlice) RemoveFirst() task.Task { defer q.MeasureActionTime("RemoveFirst")() var t task.Task @@ -189,7 +191,7 @@ func (q *TaskQueue) RemoveFirst() task.Task { } // removeFirst deletes a head element, so head is moved. -func (q *TaskQueue) removeFirst() task.Task { +func (q *TaskQueueSlice) removeFirst() task.Task { if q.isEmpty() { return nil } @@ -201,18 +203,19 @@ func (q *TaskQueue) removeFirst() task.Task { } // GetFirst returns a head element. -func (q *TaskQueue) GetFirst() task.Task { +func (q *TaskQueueSlice) GetFirst() task.Task { defer q.MeasureActionTime("GetFirst")() q.m.RLock() defer q.m.RUnlock() if q.isEmpty() { return nil } - return q.items[0] + task := q.items[0] + return task } // AddLast adds new tail element. -func (q *TaskQueue) AddLast(t task.Task) { +func (q *TaskQueueSlice) AddLast(t task.Task) { defer q.MeasureActionTime("AddLast")() q.withLock(func() { q.addLast(t) @@ -220,12 +223,248 @@ func (q *TaskQueue) AddLast(t task.Task) { } // addFirst adds new tail element. -func (q *TaskQueue) addLast(t task.Task) { +func (q *TaskQueueSlice) addLast(t task.Task) { q.items = append(q.items, t) + taskType := t.GetType() + + if _, ok := q.CompactableTypes[taskType]; ok { + q.isCompactable = true + } + + if q.isCompactable && len(q.items) > 100 { + q.compaction() + q.isCompactable = false + } +} + +// compaction merges HookRun tasks for the same hook. +// DEV WARNING! Do not use HookMetadataAccessor here. Use only *Accessor interfaces because this method is used from addon-operator. 
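+// Illustrative example (hypothetical tasks): given queued HookRun tasks
+// [A(hook1, bc1), B(hook2, bc2), C(hook1, bc3)], compaction merges C into A,
+// so the queue becomes [A(hook1, bc1+bc3), B(hook2, bc2)]; monitor IDs are
+// merged the same way and the original relative order is preserved.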
+func (q *TaskQueueSlice) compaction() {
+	if len(q.items) == 0 {
+		return
+	}
+
+	// Preallocate memory for the result
+	result := make([]task.Task, 0, len(q.items))
+
+	hookGroups := make(map[string][]int, 10) // hookName -> []indices
+	var hookOrder []string
+
+	for i, task := range q.items {
+		if _, ok := q.CompactableTypes[task.GetType()]; !ok {
+			result = append(result, task)
+			continue
+		}
+		hm := task.GetMetadata()
+		if isNil(hm) || task.IsProcessing() {
+			result = append(result, task) // Tasks with nil metadata and tasks in processing go straight to the result
+			continue
+		}
+
+		// Safety check to ensure we can access hook name
+		hookNameAccessor, ok := hm.(task_metadata.HookNameAccessor)
+		if !ok {
+			result = append(result, task) // Cannot access hook name, skip compaction
+			continue
+		}
+		hookName := hookNameAccessor.GetHookName()
+		if _, exists := hookGroups[hookName]; !exists {
+			hookOrder = append(hookOrder, hookName)
+		}
+		hookGroups[hookName] = append(hookGroups[hookName], i)
+	}
+
+	// Process the hook groups - O(N) in the worst case
+	for _, hookName := range hookOrder {
+		indices := hookGroups[hookName]
+
+		if len(indices) == 1 {
+			// Only one task - add it as is
+			result = append(result, q.items[indices[0]])
+			continue
+		}
+
+		// Use the task with the minimal index as the target
+		minIndex := indices[0]
+		for _, idx := range indices {
+			if idx < minIndex {
+				minIndex = idx
+			}
+		}
+
+		// Safety check to ensure minIndex is valid
+		if minIndex < 0 || minIndex >= len(q.items) {
+			continue
+		}
+
+		targetTask := q.items[minIndex]
+		targetHm := targetTask.GetMetadata()
+		if targetHm == nil {
+			continue
+		}
+
+		// Safety checks for type assertions
+		bindingContextAccessor, ok := targetHm.(task_metadata.BindingContextAccessor)
+		if !ok {
+			continue
+		}
+		monitorIDAccessor, ok := targetHm.(task_metadata.MonitorIDAccessor)
+		if !ok {
+			continue
+		}
+
+		contexts := bindingContextAccessor.GetBindingContext()
+		monitorIDs := monitorIDAccessor.GetMonitorIDs()
+		// Precompute the combined sizes
+		totalContexts := len(contexts)
+		totalMonitorIDs := len(monitorIDs)
+
+		for _, idx := range indices {
+			if idx == minIndex {
+				continue // Skip the target task
+			}
+			existingHm := q.items[idx].GetMetadata()
+			if existingHm != nil {
+				if bindingContextAccessor, ok := existingHm.(task_metadata.BindingContextAccessor); ok {
+					totalContexts += len(bindingContextAccessor.GetBindingContext())
+				}
+				if monitorIDAccessor, ok := existingHm.(task_metadata.MonitorIDAccessor); ok {
+					totalMonitorIDs += len(monitorIDAccessor.GetMonitorIDs())
+				}
+			}
+		}
+
+		// Create new slices of the right size
+		// Safety check to ensure we don't create negative-sized slices
+		if totalContexts < 0 {
+			totalContexts = 0
+		}
+		if totalMonitorIDs < 0 {
+			totalMonitorIDs = 0
+		}
+		newContexts := make([]bindingcontext.BindingContext, totalContexts)
+		newMonitorIDs := make([]string, totalMonitorIDs)
+
+		// Copy the target task's contexts
+		if len(contexts) > 0 && len(newContexts) > 0 {
+			copySize := len(contexts)
+			if copySize > len(newContexts) {
+				copySize = len(newContexts)
+			}
+			copy(newContexts[:copySize], contexts[:copySize])
+		}
+		if len(monitorIDs) > 0 && len(newMonitorIDs) > 0 {
+			copySize := len(monitorIDs)
+			if copySize > len(newMonitorIDs) {
+				copySize = len(newMonitorIDs)
+			}
+			copy(newMonitorIDs[:copySize], monitorIDs[:copySize])
+		}
+
+		// Copy contexts from the remaining tasks
+		contextIndex := len(contexts)
+		monitorIndex := len(monitorIDs)
+
+		for _, idx := range indices {
+			if idx == minIndex {
+				continue
+			}
+			// Safety check to ensure idx is valid
+			if idx < 0 || idx >= len(q.items) {
+				continue
+			}
+			existingHm := q.items[idx].GetMetadata()
+			if existingHm == nil {
+				continue
+			}
+
+			// Safety checks for type assertions
+			bindingContextAccessor, ok := existingHm.(task_metadata.BindingContextAccessor)
+			if !ok {
+				continue
+			}
+			monitorIDAccessor, ok := existingHm.(task_metadata.MonitorIDAccessor)
+			if !ok {
+				continue
+			}
+
+			existingContexts := bindingContextAccessor.GetBindingContext()
+			existingMonitorIDs := monitorIDAccessor.GetMonitorIDs()
+
+			if len(existingContexts) > 0 && contextIndex < len(newContexts) {
+				// Safety check to ensure we don't exceed slice bounds
+				remainingSpace := len(newContexts) - contextIndex
+				if remainingSpace > 0 {
+					copySize := len(existingContexts)
+					if copySize > remainingSpace {
+						copySize = remainingSpace
+					}
+					copy(newContexts[contextIndex:contextIndex+copySize], existingContexts[:copySize])
+				}
+			}
+			contextIndex += len(existingContexts)
+
+			if len(existingMonitorIDs) > 0 && monitorIndex < len(newMonitorIDs) {
+				// Safety check to ensure we don't exceed slice bounds
+				remainingSpace := len(newMonitorIDs) - monitorIndex
+				if remainingSpace > 0 {
+					copySize := len(existingMonitorIDs)
+					if copySize > remainingSpace {
+						copySize = remainingSpace
+					}
+					copy(newMonitorIDs[monitorIndex:monitorIndex+copySize], existingMonitorIDs[:copySize])
+				}
+			}
+			monitorIndex += len(existingMonitorIDs)
+		}
+
+		// Update the metadata
+		bindingContextSetter, ok := targetHm.(task_metadata.BindingContextSetter)
+		if !ok {
+			continue
+		}
+		withContext := bindingContextSetter.SetBindingContext(compactBindingContexts(newContexts))
+
+		monitorIDSetter, ok := withContext.(task_metadata.MonitorIDSetter)
+		if !ok {
+			continue
+		}
+		withContext = monitorIDSetter.SetMonitorIDs(newMonitorIDs)
+		targetTask.UpdateMetadata(withContext)
+
+		// Just append to the end; we sort by original position afterwards
+		result = append(result, targetTask)
+
+		// Call compaction callback if set
+		if q.CompactionCallback != nil && len(indices) > 1 {
+			compactedTasks := make([]task.Task, 0, len(indices)-1)
+			for _, idx := range indices {
+				if idx != minIndex {
+					compactedTasks = append(compactedTasks, q.items[idx])
+				}
+			}
+			q.CompactionCallback(compactedTasks, targetTask)
+		}
+	}
+
+	positionMap := make(map[task.Task]int, len(q.items))
+	for i, task := range q.items {
+		positionMap[task] = i
+	}
+
+	sort.Slice(result, func(i, j int) bool {
+		posI := positionMap[result[i]]
+		posJ := positionMap[result[j]]
+		return posI < posJ
+	})
+
+	q.items = result
+	// reset dirty flag
+	q.isCompactable = false
+}
 
 // RemoveLast deletes a tail element, so tail is moved.
-func (q *TaskQueue) RemoveLast() task.Task {
+func (q *TaskQueueSlice) RemoveLast() task.Task {
 	defer q.MeasureActionTime("RemoveLast")()
 
 	var t task.Task
@@ -237,7 +476,7 @@
 }
 
 // RemoveLast deletes a tail element, so tail is moved.
-func (q *TaskQueue) removeLast() task.Task {
+func (q *TaskQueueSlice) removeLast() task.Task {
 	if q.isEmpty() {
 		return nil
 	}
@@ -253,7 +492,7 @@
 }
 
 // GetLast returns a tail element.
-func (q *TaskQueue) GetLast() task.Task {
+func (q *TaskQueueSlice) GetLast() task.Task {
 	defer q.MeasureActionTime("GetLast")()
 
 	var t task.Task
@@ -265,7 +504,7 @@
 }
 
 // GetLast returns a tail element.
-func (q *TaskQueue) getLast() task.Task { +func (q *TaskQueueSlice) getLast() task.Task { if q.isEmpty() { return nil } @@ -274,7 +513,7 @@ func (q *TaskQueue) getLast() task.Task { } // Get returns a task by id. -func (q *TaskQueue) Get(id string) task.Task { +func (q *TaskQueueSlice) Get(id string) task.Task { defer q.MeasureActionTime("Get")() var t task.Task @@ -286,7 +525,7 @@ func (q *TaskQueue) Get(id string) task.Task { } // Get returns a task by id. -func (q *TaskQueue) get(id string) task.Task { +func (q *TaskQueueSlice) get(id string) task.Task { for _, t := range q.items { if t.GetId() == id { return t @@ -297,7 +536,7 @@ func (q *TaskQueue) get(id string) task.Task { } // AddAfter inserts a task after the task with specified id. -func (q *TaskQueue) AddAfter(id string, newTask task.Task) { +func (q *TaskQueueSlice) AddAfter(id string, newTask task.Task) { defer q.MeasureActionTime("AddAfter")() q.withLock(func() { q.addAfter(id, newTask) @@ -305,7 +544,7 @@ func (q *TaskQueue) AddAfter(id string, newTask task.Task) { } // addAfter inserts a task after the task with specified id. -func (q *TaskQueue) addAfter(id string, newTask task.Task) { +func (q *TaskQueueSlice) addAfter(id string, newTask task.Task) { newItems := make([]task.Task, len(q.items)+1) idFound := false @@ -324,11 +563,15 @@ func (q *TaskQueue) addAfter(id string, newTask task.Task) { } } + if !idFound { + newItems[len(q.items)] = newTask + } + q.items = newItems } // AddBefore inserts a task before the task with specified id. -func (q *TaskQueue) AddBefore(id string, newTask task.Task) { +func (q *TaskQueueSlice) AddBefore(id string, newTask task.Task) { defer q.MeasureActionTime("AddBefore")() q.withLock(func() { q.addBefore(id, newTask) @@ -336,7 +579,7 @@ func (q *TaskQueue) AddBefore(id string, newTask task.Task) { } // addBefore inserts a task before the task with specified id. -func (q *TaskQueue) addBefore(id string, newTask task.Task) { +func (q *TaskQueueSlice) addBefore(id string, newTask task.Task) { newItems := make([]task.Task, len(q.items)+1) idFound := false @@ -362,7 +605,7 @@ func (q *TaskQueue) addBefore(id string, newTask task.Task) { } // Remove finds element by id and deletes it. 
-func (q *TaskQueue) Remove(id string) task.Task { +func (q *TaskQueueSlice) Remove(id string) task.Task { defer q.MeasureActionTime("Remove")() var t task.Task @@ -373,7 +616,7 @@ func (q *TaskQueue) Remove(id string) task.Task { return t } -func (q *TaskQueue) remove(id string) task.Task { +func (q *TaskQueueSlice) remove(id string) task.Task { delId := -1 for i, item := range q.items { if item.GetId() == id { @@ -391,24 +634,24 @@ func (q *TaskQueue) remove(id string) task.Task { return t } -func (q *TaskQueue) SetDebug(debug bool) { +func (q *TaskQueueSlice) SetDebug(debug bool) { q.debug = debug } -func (q *TaskQueue) debugf(format string, args ...interface{}) { +func (q *TaskQueueSlice) debugf(format string, args ...interface{}) { if !q.debug { return } log.Debug("DEBUG", fmt.Sprintf(format, args...)) } -func (q *TaskQueue) Stop() { +func (q *TaskQueueSlice) Stop() { if q.cancel != nil { q.cancel() } } -func (q *TaskQueue) Start(ctx context.Context) { +func (q *TaskQueueSlice) Start(ctx context.Context) { if q.started { return } @@ -435,6 +678,17 @@ func (q *TaskQueue) Start(ctx context.Context) { q.debugf("queue %s: tasks after wait %s", q.Name, q.String()) q.debugf("queue %s: task to handle '%s'", q.Name, t.GetType()) + // compact queue if it's dirty + q.withLock(func() { + if q.isCompactable { + q.compaction() + q.isCompactable = false + } + }) + + // set that current task is being processed, so we don't merge it with other tasks + t.SetProcessing(true) + // Now the task can be handled! var nextSleepDelay time.Duration q.SetStatus("run first task") @@ -451,6 +705,8 @@ func (q *TaskQueue) Start(ctx context.Context) { switch taskRes.Status { case Fail: + // Reset processing flag for failed task + t.SetProcessing(false) // Exponential backoff delay before retry. nextSleepDelay = q.ExponentialBackoffFn(t.GetFailureCount()) t.IncrementFailureCount() @@ -464,6 +720,9 @@ func (q *TaskQueue) Start(ctx context.Context) { // Remove current task on success. if taskRes.Status == Success { q.remove(t.GetId()) + } else { + // Reset processing flag for kept task + t.SetProcessing(false) } // Also, add HeadTasks in reverse order // at the start of the queue. The first task in HeadTasks @@ -478,6 +737,8 @@ func (q *TaskQueue) Start(ctx context.Context) { }) q.SetStatus("") case Repeat: + // Reset processing flag for repeated task + t.SetProcessing(false) // repeat a current task after a small delay nextSleepDelay = q.DelayOnRepeat q.SetStatus("repeat head task") @@ -503,7 +764,7 @@ func (q *TaskQueue) Start(ctx context.Context) { // waitForTask returns a task that can be processed or a nil if context is canceled. // sleepDelay is used to sleep before check a task, e.g. in case of failed previous task. // If queue is empty, then it will be checked every DelayOnQueueIsEmpty. -func (q *TaskQueue) waitForTask(sleepDelay time.Duration) task.Task { +func (q *TaskQueueSlice) waitForTask(sleepDelay time.Duration) task.Task { // Check Done channel. select { case <-q.ctx.Done(): @@ -589,7 +850,7 @@ func (q *TaskQueue) waitForTask(sleepDelay time.Duration) task.Task { } // CancelTaskDelay breaks wait loop. Useful to break the possible long sleep delay. -func (q *TaskQueue) CancelTaskDelay() { +func (q *TaskQueueSlice) CancelTaskDelay() { q.waitMu.Lock() if q.waitInProgress { q.cancelDelay = true @@ -598,7 +859,7 @@ func (q *TaskQueue) CancelTaskDelay() { } // Iterate run doFn for every task. 
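+// A minimal usage sketch (illustrative only):
+//
+//	ids := make([]string, 0, q.Length())
+//	q.Iterate(func(t task.Task) { ids = append(ids, t.GetId()) })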
-func (q *TaskQueue) Iterate(doFn func(task.Task)) { +func (q *TaskQueueSlice) Iterate(doFn func(task.Task)) { if doFn == nil { return } @@ -613,7 +874,7 @@ func (q *TaskQueue) Iterate(doFn func(task.Task)) { } // Filter run filterFn on every task and remove each with false result. -func (q *TaskQueue) Filter(filterFn func(task.Task) bool) { +func (q *TaskQueueSlice) Filter(filterFn func(task.Task) bool) { if filterFn == nil { return } @@ -634,7 +895,7 @@ func (q *TaskQueue) Filter(filterFn func(task.Task) bool) { // TODO define mapping method with QueueAction to insert, modify and delete tasks. // Dump tasks in queue to one line -func (q *TaskQueue) String() string { +func (q *TaskQueueSlice) String() string { var buf strings.Builder var index int qLen := q.Length() @@ -650,13 +911,13 @@ func (q *TaskQueue) String() string { return buf.String() } -func (q *TaskQueue) withLock(fn func()) { +func (q *TaskQueueSlice) withLock(fn func()) { q.m.Lock() fn() q.m.Unlock() } -func (q *TaskQueue) withRLock(fn func()) { +func (q *TaskQueueSlice) withRLock(fn func()) { q.m.RLock() fn() q.m.RUnlock() diff --git a/pkg/task/queue/task_queue_benchmark_test.go b/pkg/task/queue/task_queue_benchmark_test.go new file mode 100644 index 00000000..815c180a --- /dev/null +++ b/pkg/task/queue/task_queue_benchmark_test.go @@ -0,0 +1,289 @@ +package queue + +import ( + "fmt" + "strconv" + "sync/atomic" + "testing" + "time" + + "github.com/gofrs/uuid/v5" + + bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context" + "github.com/flant/shell-operator/pkg/hook/task_metadata" + "github.com/flant/shell-operator/pkg/task" +) + +// mockTask is a mock implementation of the task.Task interface for benchmarks. +// It's a simplified version to avoid dependencies on other test files. 
+type mockTaskBench struct { + Id string + Type task.TaskType + FailureCount int + FailureMessage string + Metadata interface{} + processing atomic.Bool +} + +func newBenchmarkTask() task.Task { + uuid, _ := uuid.NewV4() + id := uuid.String() + + return &mockTaskBench{ + Id: id, + Type: "BenchmarkTask", + } +} + +func (t *mockTaskBench) GetId() string { return t.Id } +func (t *mockTaskBench) GetType() task.TaskType { return t.Type } +func (t *mockTaskBench) GetQueueName() string { return "benchmark_queue" } +func (t *mockTaskBench) GetDescription() string { return fmt.Sprintf("task '%s'", t.Id) } +func (t *mockTaskBench) GetMetadata() interface{} { return t.Metadata } +func (t *mockTaskBench) UpdateMetadata(m interface{}) { t.Metadata = m } +func (t *mockTaskBench) SetProcessing(val bool) { t.processing.Store(val) } +func (t *mockTaskBench) IsProcessing() bool { return t.processing.Load() } +func (t *mockTaskBench) GetLogLabels() map[string]string { return map[string]string{"id": t.Id} } +func (t *mockTaskBench) GetFailureCount() int { return t.FailureCount } +func (t *mockTaskBench) IncrementFailureCount() { t.FailureCount++ } +func (t *mockTaskBench) GetFailureMessage() string { return t.FailureMessage } +func (t *mockTaskBench) UpdateFailureMessage(msg string) { t.FailureMessage = msg } +func (t *mockTaskBench) GetProp(_ string) interface{} { return nil } +func (t *mockTaskBench) SetProp(_ string, _ interface{}) {} +func (t *mockTaskBench) GetQueuedAt() time.Time { return time.Now() } +func (t *mockTaskBench) WithQueuedAt(_ time.Time) task.Task { return t } + +type Queue interface { + AddLast(t task.Task) + AddFirst(t task.Task) + GetFirst() task.Task + RemoveFirst() task.Task + Get(id string) task.Task +} + +func benchmarkAddLast(b *testing.B, queue Queue, size int) { + for i := 0; i < size; i++ { + queue.AddLast(newBenchmarkTask()) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + queue.AddLast(newBenchmarkTask()) + } +} + +func benchmarkAddFirst(b *testing.B, queue Queue, size int) { + for i := 0; i < size; i++ { + queue.AddFirst(newBenchmarkTask()) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + queue.AddFirst(newBenchmarkTask()) + } +} + +func benchmarkRemoveFirst(b *testing.B, queue Queue, size int) { + for i := 0; i < size; i++ { + queue.AddLast(newBenchmarkTask()) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + queue.RemoveFirst() + } +} + +func benchmarkGetFirst(b *testing.B, queue Queue, size int) { + for i := 0; i < size; i++ { + queue.AddLast(newBenchmarkTask()) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + queue.GetFirst() + } +} + +func benchmarkGetByID(b *testing.B, queue Queue, size int) { + uuids := make([]string, 0, size) + for i := 0; i < size; i++ { + task := newBenchmarkTask() + queue.AddLast(task) + uuids = append(uuids, task.GetId()) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + queue.Get(uuids[i%size]) + } +} + +/* Old code */ +func BenchmarkTaskQueueSlice_AddLast_100(b *testing.B) { + benchmarkAddLast(b, NewTasksQueueSlice(), 100) +} + +func BenchmarkTaskQueueSlice_AddLast_1000(b *testing.B) { + benchmarkAddLast(b, NewTasksQueueSlice(), 1000) +} + +func BenchmarkTaskQueueSlice_AddFirst_100(b *testing.B) { + benchmarkAddFirst(b, NewTasksQueueSlice(), 100) +} + +func BenchmarkTaskQueueSlice_AddFirst_1000(b *testing.B) { + benchmarkAddFirst(b, NewTasksQueueSlice(), 1000) +} + +func BenchmarkTaskQueueSlice_RemoveFirst_100(b *testing.B) { + benchmarkRemoveFirst(b, NewTasksQueueSlice(), 100) +} + +func BenchmarkTaskQueueSlice_RemoveFirst_1000(b 
*testing.B) { + benchmarkRemoveFirst(b, NewTasksQueueSlice(), 1000) +} + +func BenchmarkTaskQueueSlice_GetFirst_100(b *testing.B) { + benchmarkGetFirst(b, NewTasksQueueSlice(), 100) +} + +func BenchmarkTaskQueueSlice_GetFirst_1000(b *testing.B) { + benchmarkGetFirst(b, NewTasksQueueSlice(), 1000) +} + +func BenchmarkTaskQueueSlice_GetByID_100(b *testing.B) { + benchmarkGetByID(b, NewTasksQueueSlice(), 100) +} + +func BenchmarkTaskQueueSlice_GetByID_1000(b *testing.B) { + benchmarkGetByID(b, NewTasksQueueSlice(), 1000) +} + +/* New code */ +func BenchmarkTaskQueue_AddLast_100(b *testing.B) { benchmarkAddLast(b, NewTasksQueue(), 100) } +func BenchmarkTaskQueue_AddLast_1000(b *testing.B) { benchmarkAddLast(b, NewTasksQueue(), 1000) } +func BenchmarkTaskQueue_AddFirst_100(b *testing.B) { benchmarkAddFirst(b, NewTasksQueue(), 100) } +func BenchmarkTaskQueue_AddFirst_1000(b *testing.B) { benchmarkAddFirst(b, NewTasksQueue(), 1000) } +func BenchmarkTaskQueue_RemoveFirst_100(b *testing.B) { + benchmarkRemoveFirst(b, NewTasksQueue(), 100) +} + +func BenchmarkTaskQueue_RemoveFirst_1000(b *testing.B) { + benchmarkRemoveFirst(b, NewTasksQueue(), 1000) +} + +func BenchmarkTaskQueue_GetFirst_100(b *testing.B) { + benchmarkGetFirst(b, NewTasksQueue(), 100) +} + +func BenchmarkTaskQueue_GetFirst_1000(b *testing.B) { + benchmarkGetFirst(b, NewTasksQueue(), 1000) +} + +func BenchmarkTaskQueue_GetByID_100(b *testing.B) { + benchmarkGetByID(b, NewTasksQueue(), 100) +} + +func BenchmarkTaskQueue_GetByID_1000(b *testing.B) { + benchmarkGetByID(b, NewTasksQueue(), 1000) +} + +// --- Compaction Benchmarks --- + +// mockTaskForCompaction is a more realistic mock for compaction tests. +type mockTaskForCompaction struct { + mockTaskBench +} + +func (t *mockTaskForCompaction) GetType() task.TaskType { + return task_metadata.HookRun +} + +func newCompactionHookTask(id int, hookName string) task.Task { + t := &mockTaskForCompaction{} + t.Id = "task-" + strconv.Itoa(id) + t.Metadata = task_metadata.HookMetadata{ + HookName: hookName, + BindingContext: []bindingcontext.BindingContext{ + {Binding: fmt.Sprintf("bc_for_%d", id)}, + }, + } + return t +} + +func createCompactionBenchmarkData(b *testing.B, size int) []task.Task { + b.Helper() + tasks := make([]task.Task, 0, size) + // Create a mix of hooks for a realistic test + hookNames := []string{"hook-a", "hook-b", "hook-c", "hook-d", "hook-e"} + for i := 0; i < size; i++ { + hookName := hookNames[i%len(hookNames)] + t := newCompactionHookTask(i, hookName) + // Mark some tasks as processing + if i%20 == 0 { + t.SetProcessing(true) + } + tasks = append(tasks, t) + } + return tasks +} + +func benchmarkTaskQueueCompaction(b *testing.B, size int) { + for i := 0; i < b.N; i++ { + b.StopTimer() + q := NewTasksQueue() + q.WithCompactableTypes([]task.TaskType{task_metadata.HookRun}) + tasks := createCompactionBenchmarkData(b, size) + // Setup queue without triggering compaction + for _, t := range tasks { + q.items.PushBack(t) + } + + b.StartTimer() + q.compaction() + } +} + +func benchmarkTaskQueueSliceCompaction(b *testing.B, size int) { + for i := 0; i < b.N; i++ { + b.StopTimer() + q := NewTasksQueueSlice() + q.WithCompactableTypes([]task.TaskType{task_metadata.HookRun}) + tasks := createCompactionBenchmarkData(b, size) + // Setup queue without triggering compaction + q.items = append(q.items, tasks...) 
+ + b.StartTimer() + q.compaction() + } +} + +/* Old code */ +func BenchmarkTaskQueueSlice_Compaction_10(b *testing.B) { + benchmarkTaskQueueSliceCompaction(b, 10) +} + +func BenchmarkTaskQueueSlice_Compaction_100(b *testing.B) { + benchmarkTaskQueueSliceCompaction(b, 100) +} + +func BenchmarkTaskQueueSlice_Compaction_500(b *testing.B) { + benchmarkTaskQueueSliceCompaction(b, 500) +} + +func BenchmarkTaskQueueSlice_Compaction_1000(b *testing.B) { + benchmarkTaskQueueSliceCompaction(b, 1000) +} + +/* New code */ +func BenchmarkTaskQueue_Compaction_10(b *testing.B) { + benchmarkTaskQueueCompaction(b, 10) +} + +func BenchmarkTaskQueue_Compaction_100(b *testing.B) { + benchmarkTaskQueueCompaction(b, 100) +} + +func BenchmarkTaskQueue_Compaction_500(b *testing.B) { + benchmarkTaskQueueCompaction(b, 500) +} + +func BenchmarkTaskQueue_Compaction_1000(b *testing.B) { + benchmarkTaskQueueCompaction(b, 1000) +} diff --git a/pkg/task/queue/task_queue_compaction_test.go b/pkg/task/queue/task_queue_compaction_test.go new file mode 100644 index 00000000..a3fb5719 --- /dev/null +++ b/pkg/task/queue/task_queue_compaction_test.go @@ -0,0 +1,275 @@ +package queue + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context" + "github.com/flant/shell-operator/pkg/hook/task_metadata" + "github.com/flant/shell-operator/pkg/hook/types" + "github.com/flant/shell-operator/pkg/task" +) + +// mockTask is a mock implementation of the task.Task interface for testing. +type mockTask struct { + Id string + Type task.TaskType + LogLabels map[string]string + FailureCount int + FailureMessage string + QueueName string + QueuedAt time.Time + + Props map[string]interface{} + + Metadata interface{} + + processing atomic.Bool +} + +func newMockTask(id, hookName string, taskType task.TaskType) *mockTask { + return &mockTask{ + Id: id, + Type: taskType, + Metadata: task_metadata.HookMetadata{ + Group: "group", + HookName: hookName, + BindingContext: []bindingcontext.BindingContext{ + { + Metadata: struct { + Version string + BindingType types.BindingType + JqFilter string + IncludeSnapshots []string + IncludeAllSnapshots bool + Group string + }{ + Group: "group", + }, + Binding: fmt.Sprintf("bc_for_%s", id), + }, + }, + }, + } +} + +func newHookTask(id, hookName string) *mockTask { + return newMockTask(id, hookName, task_metadata.HookRun) +} + +func newServiceTask(id string) *mockTask { + return newMockTask(id, "", "Service") +} + +func (t *mockTask) GetId() string { + return t.Id +} + +func (t *mockTask) GetType() task.TaskType { + return t.Type +} + +func (t *mockTask) GetQueueName() string { + return "main" +} + +func (t *mockTask) GetDescription() string { + return fmt.Sprintf("task '%s'", t.Id) +} + +func (t *mockTask) GetMetadata() interface{} { + if t.Type != task_metadata.HookRun { + return nil + } + return t.Metadata +} + +func (t *mockTask) UpdateMetadata(m interface{}) { + t.Metadata = m +} + +func (t *mockTask) SetProcessing(val bool) { + t.processing.Store(val) +} + +func (t *mockTask) IsProcessing() bool { + return t.processing.Load() +} + +func (t *mockTask) GetLogLabels() map[string]string { + return map[string]string{"id": t.Id} +} + +func (t *mockTask) GetFailureCount() int { + return t.FailureCount +} + +func (t *mockTask) GetFailureMessage() string { + return t.FailureMessage +} + +func (t *mockTask) UpdateFailureMessage(msg string) { + t.FailureMessage = 
msg +} + +func (t *mockTask) GetProp(key string) interface{} { + return t.Props[key] +} + +func (t *mockTask) SetProp(key string, value interface{}) { + t.Props[key] = value +} + +func (t *mockTask) GetQueuedAt() time.Time { + return t.QueuedAt +} + +func (t *mockTask) WithQueuedAt(queuedAt time.Time) task.Task { + t.QueuedAt = queuedAt + return t +} + +func (t *mockTask) IncrementFailureCount() { + t.FailureCount++ +} + +func TestTaskQueueList_AddLast_GreedyMerge(t *testing.T) { + tests := []struct { + name string + initialQueue []task.Task + taskToAdd task.Task + expectedIDs []string + expectedBCs map[string]string // map[taskID] -> expected number of binding contexts + }{ + { + name: "Simple merge into last task", + initialQueue: []task.Task{newHookTask("h1_A", "hook-1")}, + taskToAdd: newHookTask("h1_B", "hook-1"), + expectedIDs: []string{"h1_A"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_B"}, + }, + { + name: "No merge for different hook", + initialQueue: []task.Task{newHookTask("h1_A", "hook-1")}, + taskToAdd: newHookTask("h2_B", "hook-2"), + expectedIDs: []string{"h1_A", "h2_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_A", "h2_B": "bc_for_h2_B"}, + }, + { + name: "Greedy merge over a different hook task", + initialQueue: []task.Task{newHookTask("h1_A", "hook-1"), newHookTask("h2_B", "hook-2")}, + taskToAdd: newHookTask("h1_C", "hook-1"), + expectedIDs: []string{"h1_A", "h2_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_C", "h2_B": "bc_for_h2_B"}, + }, + { + name: "Do not merge into a processing task, add new", + initialQueue: []task.Task{func() task.Task { + t := newHookTask("h1_A", "hook-1") + t.SetProcessing(true) + return t + }()}, + taskToAdd: newHookTask("h1_B", "hook-1"), + expectedIDs: []string{"h1_A", "h1_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_A", "h1_B": "bc_for_h1_B"}, + }, + { + name: "Merge into the second pile, not the processing one", + initialQueue: []task.Task{ + func() task.Task { + t := newHookTask("h1_A", "hook-1") + t.SetProcessing(true) + return t + }(), + newHookTask("h1_B", "hook-1"), + }, + taskToAdd: newHookTask("h1_C", "hook-1"), + expectedIDs: []string{"h1_A", "h1_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_A", "h1_B": "bc_for_h1_C"}, + }, + { + name: "Merge over a processing task of the same kind", + initialQueue: []task.Task{ + newHookTask("h1_A", "hook-1"), + func() task.Task { + t := newHookTask("h1_B", "hook-1") + t.SetProcessing(true) + return t + }(), + }, + taskToAdd: newHookTask("h1_C", "hook-1"), + expectedIDs: []string{"h1_A", "h1_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_C", "h1_B": "bc_for_h1_B"}, + }, + { + name: "Add service task, no merge", + initialQueue: []task.Task{newHookTask("h1_A", "hook-1")}, + taskToAdd: newServiceTask("service_B"), + expectedIDs: []string{"h1_A", "service_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_A"}, + }, + { + name: "Merge hook task over a service task", + initialQueue: []task.Task{newHookTask("h1_A", "hook-1"), newServiceTask("service_B")}, + taskToAdd: newHookTask("h1_C", "hook-1"), + expectedIDs: []string{"h1_A", "service_B"}, + expectedBCs: map[string]string{"h1_A": "bc_for_h1_C"}, + }, + { + name: "Greedy merge should compact the entire queue", + initialQueue: []task.Task{ + func() task.Task { + t := newHookTask("h1_A", "hook-1") + t.SetProcessing(true) + return t + }(), + newHookTask("h1_B", "hook-1"), + newServiceTask("service-1"), + newHookTask("h2_A", "hook-2"), + newHookTask("h1_C", "hook-1"), + 
newHookTask("h1_D", "hook-1"),
+				newHookTask("h2_B", "hook-2"),
+			},
+			taskToAdd:   newHookTask("h1_E", "hook-1"),
+			expectedIDs: []string{"h1_A", "h1_B", "service-1", "h2_A"},
+			expectedBCs: map[string]string{
+				"h1_A": "bc_for_h1_A", // because it's processing
+				"h1_B": "bc_for_h1_E", // own (dropped) + h1_C (dropped) + h1_D (dropped) + h1_E (latest kept)
+				"h2_A": "bc_for_h2_B", // own (dropped) + h2_B (latest kept)
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			q := NewTasksQueue()
+			q.WithName("test_queue")
+			q.WithCompactableTypes([]task.TaskType{task_metadata.HookRun})
+			for _, task := range tt.initialQueue {
+				q.addLast(task)
+			}
+
+			q.addLast(tt.taskToAdd)
+
+			q.compaction()
+			// Verify IDs and order
+			finalIDs := make([]string, 0, q.Length())
+			q.Iterate(func(t task.Task) {
+				finalIDs = append(finalIDs, t.GetId())
+			})
+			assert.Equal(t, tt.expectedIDs, finalIDs, "Task IDs and order should match expected")
+			// Verify binding contexts of the remaining tasks
+			q.Iterate(func(task task.Task) {
+				if mt, ok := task.(*mockTask); ok && mt.GetType() == task_metadata.HookRun {
+					hm := task_metadata.HookMetadataAccessor(mt)
+					require.NotNil(t, hm, "HookMetadataAccessor should not return nil for hook task")
+					assert.Equal(t, tt.expectedBCs[mt.GetId()], hm.BindingContext[0].Binding, "BindingContext for task %s should match", mt.GetId())
+				}
+			})
+		})
+	}
+}
diff --git a/pkg/task/queue/task_queue_list.go b/pkg/task/queue/task_queue_list.go
new file mode 100644
index 00000000..18d006f5
--- /dev/null
+++ b/pkg/task/queue/task_queue_list.go
@@ -0,0 +1,1058 @@
+package queue
+
+import (
+	"container/list"
+	"context"
+	"fmt"
+	"log/slog"
+	"os"
+	"reflect"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/deckhouse/deckhouse/pkg/log"
+
+	bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context"
+	"github.com/flant/shell-operator/pkg/hook/task_metadata"
+	"github.com/flant/shell-operator/pkg/metric"
+	"github.com/flant/shell-operator/pkg/task"
+	"github.com/flant/shell-operator/pkg/utils/exponential_backoff"
+	"github.com/flant/shell-operator/pkg/utils/measure"
+)
+
+/*
+A working queue (a pipeline) for sequential execution of tasks.
+
+Tasks are added to the tail and executed from the head. A task can also be pushed
+to the head to implement meta-tasks.
+
+Each task is executed until success. This can be controlled with the allowFailure: true
+config parameter.
+
+This implementation uses container/list for O(1) queue operations and a map for O(1) task lookup by ID.
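+
+When created with compactable task types (see WithCompactableTypes), the queue also
+merges pending tasks of those types for the same hook: binding contexts and monitor
+IDs of later tasks are folded into the earliest task of that hook that is not being
+processed, so a burst of events for one hook collapses into a single queued run.
+
+A minimal setup sketch (illustrative only; the handler body is a placeholder):
+
+	ctx := context.Background()
+	q := NewTasksQueue().
+		WithName("main").
+		WithCompactableTypes([]task.TaskType{task_metadata.HookRun}).
+		WithHandler(func(_ context.Context, t task.Task) TaskResult {
+			return TaskResult{Status: Success}
+		})
+	q.WithContext(ctx)
+	q.Start(ctx)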
+*/ + +var ( + DefaultWaitLoopCheckInterval = 125 * time.Millisecond + DefaultDelayOnQueueIsEmpty = 250 * time.Millisecond + DefaultInitialDelayOnFailedTask = 5 * time.Second + DefaultDelayOnRepeat = 25 * time.Millisecond +) + +type TaskStatus string + +const ( + Success TaskStatus = "Success" + Fail TaskStatus = "Fail" + Repeat TaskStatus = "Repeat" + Keep TaskStatus = "Keep" +) + +const compactionThreshold = 100 + +type TaskResult struct { + Status TaskStatus + HeadTasks []task.Task + TailTasks []task.Task + AfterTasks []task.Task + + DelayBeforeNextTask time.Duration + + AfterHandle func() +} + +// Global object pools for reducing allocations +var ( + compactionGroupPool = sync.Pool{ + New: func() interface{} { + return &compactionGroup{ + elementsToMerge: make([]*list.Element, 0, 8), + } + }, + } + + contextSlicePool = sync.Pool{ + New: func() interface{} { + slice := make([]bindingcontext.BindingContext, 0, 64) + return &slice + }, + } + + monitorIDSlicePool = sync.Pool{ + New: func() interface{} { + slice := make([]string, 0, 64) + return &slice + }, + } + + hookGroupsMapPool = sync.Pool{ + New: func() interface{} { + return make(map[string]*compactionGroup, 32) + }, + } +) + +type compactionGroup struct { + targetElement *list.Element + elementsToMerge []*list.Element + totalContexts int + totalMonitorIDs int +} + +// reset resets the compaction group for reuse +func (cg *compactionGroup) reset() { + cg.targetElement = nil + cg.elementsToMerge = cg.elementsToMerge[:0] // Reuse slice + cg.totalContexts = 0 + cg.totalMonitorIDs = 0 +} + +type TaskQueue struct { + logger *log.Logger + + m sync.RWMutex + metricStorage metric.Storage + ctx context.Context + cancel context.CancelFunc + + waitMu sync.Mutex + waitInProgress bool + cancelDelay bool + + items *list.List + idIndex map[string]*list.Element + + started bool // a flag to ignore multiple starts + + Name string + Handler func(ctx context.Context, t task.Task) TaskResult + Status string + + measureActionFn func() + measureActionFnOnce sync.Once + + // Timing settings. 
+ WaitLoopCheckInterval time.Duration + DelayOnQueueIsEmpty time.Duration + DelayOnRepeat time.Duration + ExponentialBackoffFn func(failureCount int) time.Duration + + // Compaction + CompactionCallback func(compactedTasks []task.Task, targetTask task.Task) + + isCompactable bool + compactableTypes map[task.TaskType]struct{} + + // Pre-allocated buffers for compaction + contextBuffer []bindingcontext.BindingContext + monitorIDBuffer []string + groupBuffer map[string]*compactionGroup +} + +func NewTasksQueue() *TaskQueue { + return &TaskQueue{ + items: list.New(), + idIndex: make(map[string]*list.Element), + // Default timings + WaitLoopCheckInterval: DefaultWaitLoopCheckInterval, + DelayOnQueueIsEmpty: DefaultDelayOnQueueIsEmpty, + DelayOnRepeat: DefaultDelayOnRepeat, + ExponentialBackoffFn: func(failureCount int) time.Duration { + return exponential_backoff.CalculateDelay(DefaultInitialDelayOnFailedTask, failureCount) + }, + logger: log.NewNop(), + // Pre-allocate buffers + contextBuffer: make([]bindingcontext.BindingContext, 0, 128), + monitorIDBuffer: make([]string, 0, 128), + groupBuffer: make(map[string]*compactionGroup, 32), + } +} + +func (q *TaskQueue) getCompactionGroup() *compactionGroup { + return compactionGroupPool.Get().(*compactionGroup) +} + +func (q *TaskQueue) putCompactionGroup(cg *compactionGroup) { + cg.reset() + compactionGroupPool.Put(cg) +} + +func (q *TaskQueue) getContextSlice() []bindingcontext.BindingContext { + return *contextSlicePool.Get().(*[]bindingcontext.BindingContext) +} + +func (q *TaskQueue) putContextSlice(slice *[]bindingcontext.BindingContext) { + *slice = (*slice)[:0] // Reset slice + contextSlicePool.Put(slice) +} + +func (q *TaskQueue) getMonitorIDSlice() []string { + return *monitorIDSlicePool.Get().(*[]string) +} + +func (q *TaskQueue) putMonitorIDSlice(slice *[]string) { + *slice = (*slice)[:0] // Reset slice + monitorIDSlicePool.Put(slice) +} + +func (q *TaskQueue) getHookGroupsMap() map[string]*compactionGroup { + return hookGroupsMapPool.Get().(map[string]*compactionGroup) +} + +func (q *TaskQueue) putHookGroupsMap(m map[string]*compactionGroup) { + // Clear the map + for k := range m { + delete(m, k) + } + hookGroupsMapPool.Put(m) +} + +func (q *TaskQueue) WithContext(ctx context.Context) { + q.ctx, q.cancel = context.WithCancel(ctx) +} + +func (q *TaskQueue) WithMetricStorage(mstor metric.Storage) *TaskQueue { + q.metricStorage = mstor + + return q +} + +func (q *TaskQueue) WithName(name string) *TaskQueue { + q.Name = name + return q +} + +func (q *TaskQueue) WithHandler(fn func(ctx context.Context, t task.Task) TaskResult) *TaskQueue { + q.Handler = fn + return q +} + +func (q *TaskQueue) WithLogger(logger *log.Logger) *TaskQueue { + q.logger = logger + return q +} + +func (q *TaskQueue) WithCompactableTypes(taskTypes []task.TaskType) *TaskQueue { + q.compactableTypes = make(map[task.TaskType]struct{}, len(taskTypes)) + for _, taskType := range taskTypes { + q.compactableTypes[taskType] = struct{}{} + } + return q +} + +// MeasureActionTime is a helper to measure execution time of queue's actions +func (q *TaskQueue) MeasureActionTime(action string) func() { + if q.metricStorage == nil { + return func() {} + } + + q.measureActionFnOnce.Do(func() { + if os.Getenv("QUEUE_ACTIONS_METRICS") == "no" { + q.measureActionFn = func() {} + } else { + q.measureActionFn = measure.Duration(func(d time.Duration) { + q.metricStorage.HistogramObserve("{PREFIX}tasks_queue_action_duration_seconds", d.Seconds(), map[string]string{"queue_name": q.Name, 
"queue_action": action}, nil) + }) + } + }) + return q.measureActionFn +} + +func (q *TaskQueue) GetStatus() string { + defer q.MeasureActionTime("GetStatus")() + q.m.RLock() + defer q.m.RUnlock() + return q.Status +} + +func (q *TaskQueue) SetStatus(status string) { + q.m.Lock() + q.Status = status + q.m.Unlock() +} + +func (q *TaskQueue) IsEmpty() bool { + defer q.MeasureActionTime("IsEmpty")() + q.m.RLock() + defer q.m.RUnlock() + return q.isEmpty() +} + +func (q *TaskQueue) isEmpty() bool { + return q.items.Len() == 0 +} + +func (q *TaskQueue) Length() int { + defer q.MeasureActionTime("Length")() + q.m.RLock() + defer q.m.RUnlock() + return q.items.Len() +} + +// AddFirst adds new head element. +func (q *TaskQueue) AddFirst(t task.Task) { + defer q.MeasureActionTime("AddFirst")() + q.withLock(func() { + q.addFirst(t) + }) +} + +// addFirst adds new head element. +func (q *TaskQueue) addFirst(t task.Task) { + element := q.items.PushFront(t) + q.idIndex[t.GetId()] = element +} + +// RemoveFirst deletes a head element, so head is moved. +func (q *TaskQueue) RemoveFirst() task.Task { + defer q.MeasureActionTime("RemoveFirst")() + var t task.Task + + q.withLock(func() { + t = q.removeFirst() + }) + + return t +} + +// removeFirst deletes a head element, so head is moved. +func (q *TaskQueue) removeFirst() task.Task { + if q.isEmpty() { + return nil + } + + element := q.items.Front() + t := q.items.Remove(element).(task.Task) + delete(q.idIndex, t.GetId()) + return t +} + +// GetFirst returns a head element. +func (q *TaskQueue) GetFirst() task.Task { + defer q.MeasureActionTime("GetFirst")() + q.m.RLock() + defer q.m.RUnlock() + if q.isEmpty() { + return nil + } + return q.items.Front().Value.(task.Task) +} + +// AddLast adds new tail element. +func (q *TaskQueue) AddLast(t task.Task) { + defer q.MeasureActionTime("AddLast")() + q.withLock(func() { + q.addLast(t) + }) +} + +// addLast adds a new tail element. +// It implements the merging logic for HookRun tasks by scanning the whole queue. 
+func (q *TaskQueue) addLast(t task.Task) { + q.logger.Debug("adding task to queue", + slog.String("queue", q.Name), + slog.String("task_id", t.GetId()), + slog.String("task_type", string(t.GetType())), + slog.String("task_description", t.GetDescription()), + slog.Int("queue_length_before", q.items.Len()), + ) + + if _, ok := q.idIndex[t.GetId()]; ok { + q.logger.Warn("task collision detected, unexpected behavior possible", slog.String("queue", q.Name), slog.String("task_id", t.GetId())) + } + + element := q.items.PushBack(t) + q.idIndex[t.GetId()] = element + + taskType := t.GetType() + if _, ok := q.compactableTypes[taskType]; ok { + q.isCompactable = true + + q.logger.Debug("task is mergeable, marking queue as dirty", + slog.String("queue", q.Name), + slog.String("task_id", t.GetId()), + slog.String("task_type", string(taskType)), + slog.Int("queue_length", q.items.Len()), + slog.Bool("queue_is_dirty", q.isCompactable), + ) + + // Only trigger compaction if queue is getting long and we have mergeable tasks + if q.items.Len() > compactionThreshold && q.isCompactable { + q.logger.Debug("triggering compaction due to queue length", + slog.String("queue", q.Name), + slog.Int("queue_length", q.items.Len()), + slog.Int("compaction_threshold", compactionThreshold), + ) + currentQueue := q.items.Len() + q.compaction() + q.logger.Debug("compaction finished", + slog.String("queue", q.Name), + slog.Int("queue_length_before", currentQueue), + slog.Int("queue_length_after", q.items.Len()), + ) + q.isCompactable = false + } + } else { + q.logger.Debug("task is not mergeable", + slog.String("queue", q.Name), + slog.String("task_id", t.GetId()), + slog.String("task_type", string(taskType)), + ) + } +} + +// compaction merges HookRun tasks for the same hook. +// It iterates through the list once, making it an O(N) operation. +// DEV WARNING! Do not use HookMetadataAccessor here. Use only *Accessor interfaces because this method is used from addon-operator. 
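+//
+// Two passes are made: the first identifies per-hook groups of compactable,
+// non-processing tasks, the second folds each group into its first task.
+// Illustrative example: a queue [A(hook-1), B(hook-2), A'(hook-1)] compacts to
+// [A(hook-1), B(hook-2)], where A carries the combined binding contexts and
+// monitor IDs of A and A', and A' is removed from the list and the id index.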
+func (q *TaskQueue) compaction() {
+	if q.items.Len() < 2 {
+		return
+	}
+
+	// Get objects from pools
+	hookGroups := q.getHookGroupsMap()
+	defer func() {
+		// Return all compaction groups to pool
+		for _, group := range hookGroups {
+			q.putCompactionGroup(group)
+		}
+		q.putHookGroupsMap(hookGroups)
+	}()
+
+	// First pass: identify groups and calculate sizes
+	for e := q.items.Front(); e != nil; e = e.Next() {
+		t := e.Value.(task.Task)
+		taskType := t.GetType()
+
+		if _, ok := q.compactableTypes[taskType]; !ok {
+			continue
+		}
+
+		metadata := t.GetMetadata()
+		if isNil(metadata) || t.IsProcessing() {
+			continue
+		}
+
+		hookNameAccessor, ok := metadata.(task_metadata.HookNameAccessor)
+		if !ok {
+			continue
+		}
+		bindingContextAccessor, ok := metadata.(task_metadata.BindingContextAccessor)
+		if !ok {
+			continue
+		}
+		monitorIDsAccessor, ok := metadata.(task_metadata.MonitorIDAccessor)
+		if !ok {
+			continue
+		}
+
+		hookName := hookNameAccessor.GetHookName()
+		bindingContext := bindingContextAccessor.GetBindingContext()
+		monitorIDs := monitorIDsAccessor.GetMonitorIDs()
+
+		if group, exists := hookGroups[hookName]; exists {
+			// Add to existing group
+			group.elementsToMerge = append(group.elementsToMerge, e)
+			group.totalContexts += len(bindingContext)
+			group.totalMonitorIDs += len(monitorIDs)
+		} else {
+			// Get new group from pool
+			group := q.getCompactionGroup()
+			group.targetElement = e
+			group.totalContexts = len(bindingContext)
+			group.totalMonitorIDs = len(monitorIDs)
+			hookGroups[hookName] = group
+		}
+	}
+
+	// Second pass: merge with pooled slices
+	for _, group := range hookGroups {
+		if len(group.elementsToMerge) == 0 {
+			continue
+		}
+
+		targetTask := group.targetElement.Value.(task.Task)
+		targetMetadata := targetTask.GetMetadata()
+
+		targetBindingContextAccessor, ok := targetMetadata.(task_metadata.BindingContextAccessor)
+		if !ok {
+			continue
+		}
+		targetMonitorIDsAccessor, ok := targetMetadata.(task_metadata.MonitorIDAccessor)
+		if !ok {
+			continue
+		}
+
+		// Get slices from pools only after the accessor checks above, so an
+		// early continue cannot leak pooled slices.
+		newContexts := q.getContextSlice()
+		newMonitorIDs := q.getMonitorIDSlice()
+
+		// Ensure capacity
+		if cap(newContexts) < group.totalContexts {
+			newContexts = make([]bindingcontext.BindingContext, 0, group.totalContexts)
+		}
+		if cap(newMonitorIDs) < group.totalMonitorIDs {
+			newMonitorIDs = make([]string, 0, group.totalMonitorIDs)
+		}
+
+		// Add target's contexts first
+		targetContexts := targetBindingContextAccessor.GetBindingContext()
+		targetMonitorIDs := targetMonitorIDsAccessor.GetMonitorIDs()
+
+		newContexts = append(newContexts, targetContexts...)
+		newMonitorIDs = append(newMonitorIDs, targetMonitorIDs...)
+
+		// Append contexts from other tasks and remove them
+		for _, elementToMerge := range group.elementsToMerge {
+			taskToMerge := elementToMerge.Value.(task.Task)
+			mergeMetadata := taskToMerge.GetMetadata()
+
+			mergeBindingContextAccessor, ok := mergeMetadata.(task_metadata.BindingContextAccessor)
+			if !ok {
+				continue
+			}
+			mergeMonitorIDsAccessor, ok := mergeMetadata.(task_metadata.MonitorIDAccessor)
+			if !ok {
+				continue
+			}
+
+			mergeContexts := mergeBindingContextAccessor.GetBindingContext()
+			mergeMonitorIDs := mergeMonitorIDsAccessor.GetMonitorIDs()
+
+			newContexts = append(newContexts, mergeContexts...)
+			newMonitorIDs = append(newMonitorIDs, mergeMonitorIDs...)
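+			// At this point the merged task's contexts and monitor IDs live on
+			// the target; the element itself is dropped from the list and the
+			// id index next.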
+ + q.items.Remove(elementToMerge) + delete(q.idIndex, taskToMerge.GetId()) + } + + // Update target task with compacted slices + compactedContexts := compactBindingContexts(newContexts) + + // Setters return new metadata, not the same pointer, so we need to update targetTask with the new metadata + targetBindingContextSetter, ok := targetMetadata.(task_metadata.BindingContextSetter) + if !ok { + continue + } + + withContext := targetBindingContextSetter.SetBindingContext(compactedContexts) + + targetMonitorIDsSetter, ok := withContext.(task_metadata.MonitorIDSetter) + if !ok { + continue + } + + withContext = targetMonitorIDsSetter.SetMonitorIDs(newMonitorIDs) + targetTask.UpdateMetadata(withContext) + + // Call compaction callback if set + if q.CompactionCallback != nil && len(group.elementsToMerge) > 0 { + compactedTasks := make([]task.Task, 0, len(group.elementsToMerge)) + for _, elementToMerge := range group.elementsToMerge { + compactedTasks = append(compactedTasks, elementToMerge.Value.(task.Task)) + } + q.CompactionCallback(compactedTasks, targetTask) + } + + // Return slices to pools + q.putContextSlice(&newContexts) + q.putMonitorIDSlice(&newMonitorIDs) + } +} + +// compactBindingContexts mimics the logic from shell-operator's CombineBindingContextForHook. +// It removes intermediate states for the same group, keeping only the most recent one. +func compactBindingContexts(combinedContext []bindingcontext.BindingContext) []bindingcontext.BindingContext { + if len(combinedContext) < 2 { + return combinedContext + } + + // Count ungrouped contexts and estimate result size + ungroupedCount := 0 + groupCount := 0 + for _, bc := range combinedContext { + if bc.Metadata.Group == "" { + ungroupedCount++ + } else { + groupCount++ + } + } + + // If no grouped contexts, return as is + if groupCount == 0 { + return combinedContext + } + + // Use a single pass approach with a map to track the last occurrence of each group + lastIndexForGroup := make(map[string]int, groupCount) + ungroupedIndices := make([]int, 0, ungroupedCount) + + for i, bc := range combinedContext { + group := bc.Metadata.Group + if group == "" { + ungroupedIndices = append(ungroupedIndices, i) + } else { + lastIndexForGroup[group] = i + } + } + + // Pre-allocate result slice with exact capacity + resultSize := len(ungroupedIndices) + len(lastIndexForGroup) + result := make([]bindingcontext.BindingContext, 0, resultSize) + + // Add ungrouped contexts first (preserving their order) + for _, idx := range ungroupedIndices { + result = append(result, combinedContext[idx]) + } + + // Add the last occurrence of each group + for _, idx := range lastIndexForGroup { + result = append(result, combinedContext[idx]) + } + + return result +} + +// RemoveLast deletes a tail element, so tail is moved. +func (q *TaskQueue) RemoveLast() task.Task { + defer q.MeasureActionTime("RemoveLast")() + var t task.Task + + q.withLock(func() { + t = q.removeLast() + }) + + return t +} + +// removeLast deletes a tail element, so tail is moved. +func (q *TaskQueue) removeLast() task.Task { + if q.isEmpty() { + return nil + } + + element := q.items.Back() + t := q.items.Remove(element).(task.Task) + delete(q.idIndex, t.GetId()) + + return t +} + +// GetLast returns a tail element. +func (q *TaskQueue) GetLast() task.Task { + defer q.MeasureActionTime("GetLast")() + var t task.Task + q.withRLock(func() { + t = q.getLast() + }) + + return t +} + +// getLast returns a tail element. 
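+// Callers must hold q.m; GetLast is the lock-acquiring variant.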
+func (q *TaskQueue) getLast() task.Task { + if q.isEmpty() { + return nil + } + + return q.items.Back().Value.(task.Task) +} + +// Get returns a task by id. +func (q *TaskQueue) Get(id string) task.Task { + defer q.MeasureActionTime("Get")() + var t task.Task + + q.withRLock(func() { + t = q.get(id) + }) + + return t +} + +// get returns a task by id. +func (q *TaskQueue) get(id string) task.Task { + if element, ok := q.idIndex[id]; ok { + return element.Value.(task.Task) + } + + return nil +} + +// AddAfter inserts a task after the task with specified id. +func (q *TaskQueue) AddAfter(id string, newTask task.Task) { + defer q.MeasureActionTime("AddAfter")() + q.withLock(func() { + q.addAfter(id, newTask) + }) +} + +// addAfter inserts a task after the task with specified id. +func (q *TaskQueue) addAfter(id string, newTask task.Task) { + if element, ok := q.idIndex[id]; ok { + newElement := q.items.InsertAfter(newTask, element) + q.idIndex[newTask.GetId()] = newElement + } +} + +// AddBefore inserts a task before the task with specified id. +func (q *TaskQueue) AddBefore(id string, newTask task.Task) { + defer q.MeasureActionTime("AddBefore")() + q.withLock(func() { + q.addBefore(id, newTask) + }) +} + +// addBefore inserts a task before the task with specified id. +func (q *TaskQueue) addBefore(id string, newTask task.Task) { + if element, ok := q.idIndex[id]; ok { + newElement := q.items.InsertBefore(newTask, element) + q.idIndex[newTask.GetId()] = newElement + } +} + +// Remove finds element by id and deletes it. +func (q *TaskQueue) Remove(id string) task.Task { + defer q.MeasureActionTime("Remove")() + var t task.Task + + q.withLock(func() { + t = q.remove(id) + }) + + return t +} + +func (q *TaskQueue) remove(id string) task.Task { + if element, ok := q.idIndex[id]; ok { + t := q.items.Remove(element).(task.Task) + delete(q.idIndex, id) + return t + } + return nil +} + +func (q *TaskQueue) Stop() { + if q.cancel != nil { + q.cancel() + } +} + +// lazydebug evaluates args only if debug log is enabled. +// It is used to avoid unnecessary allocations when logging is disabled. +// Queue MUST remain fast and not allocate memory when logging is disabled. +func (q *TaskQueue) lazydebug(msg string, argsFn func() []any) { + if q.logger != nil && q.logger.Enabled(context.Background(), slog.LevelDebug) { + q.logger.Debug(msg, argsFn()...) 
+	}
+}
+
+func (q *TaskQueue) Start(ctx context.Context) {
+	if q.started {
+		return
+	}
+
+	if q.Handler == nil {
+		log.Error("should set handler before start in queue", slog.String("name", q.Name))
+		q.SetStatus("no handler set")
+		return
+	}
+
+	go func() {
+		q.SetStatus("")
+		var sleepDelay time.Duration
+		for {
+			q.logger.Debug("queue: wait for task", slog.String("queue", q.Name), slog.Duration("sleep_delay", sleepDelay))
+			t := q.waitForTask(sleepDelay)
+			if t == nil {
+				q.SetStatus("stop")
+				q.logger.Info("queue stopped", slog.String("name", q.Name))
+				return
+			}
+
+			q.withLock(func() {
+				if q.isCompactable {
+					q.lazydebug("triggering compaction before task processing", func() []any {
+						return []any{slog.String("queue", q.Name), slog.String("task_id", t.GetId()), slog.String("task_type", string(t.GetType())), slog.Int("queue_length", q.items.Len())}
+					})
+					q.compaction()
+					q.isCompactable = false
+
+					q.lazydebug("compaction completed, queue no longer dirty", func() []any {
+						return []any{slog.String("queue", q.Name), slog.Int("queue_length_after", q.items.Len())}
+					})
+				}
+			})
+
+			// Mark the current task as being processed so it is not merged with other tasks.
+			t.SetProcessing(true)
+
+			// use lazydebug because it dumps the whole queue and task
+			q.lazydebug("queue tasks after wait", func() []any {
+				return []any{
+					slog.String("queue", q.Name),
+					slog.String("tasks", q.String()),
+				}
+			})
+
+			q.lazydebug("queue task to handle", func() []any {
+				return []any{
+					slog.String("queue", q.Name),
+					slog.String("task_type", string(t.GetType())),
+				}
+			})
+
+			var nextSleepDelay time.Duration
+			q.SetStatus("run first task")
+			taskRes := q.Handler(ctx, t)
+
+			// Check Done channel after long-running operation.
+			select {
+			case <-q.ctx.Done():
+				q.logger.Info("queue stopped after task handling", slog.String("name", q.Name))
+				q.SetStatus("stop")
+				return
+			default:
+			}
+
+			switch taskRes.Status {
+			case Fail:
+				t.SetProcessing(false)
+				// Exponential backoff delay before retry.
+				nextSleepDelay = q.ExponentialBackoffFn(t.GetFailureCount())
+				t.IncrementFailureCount()
+				q.SetStatus(fmt.Sprintf("sleep after fail for %s", nextSleepDelay.String()))
+			case Success, Keep:
+				// Insert new tasks right after the current task in reverse order.
+				q.withLock(func() {
+					for i := len(taskRes.AfterTasks) - 1; i >= 0; i-- {
+						q.addAfter(t.GetId(), taskRes.AfterTasks[i])
+					}
+
+					if taskRes.Status == Success {
+						q.remove(t.GetId())
+					}
+					t.SetProcessing(false) // release processing flag
+
+					// Also, add HeadTasks in reverse order
+					// at the start of the queue. The first task in HeadTasks
+					// becomes the new first task in the queue.
+					for i := len(taskRes.HeadTasks) - 1; i >= 0; i-- {
+						q.addFirst(taskRes.HeadTasks[i])
+					}
+					// Add tasks to the end of the queue
+					for _, newTask := range taskRes.TailTasks {
+						q.addLast(newTask)
+					}
+				})
+				q.SetStatus("")
+			case Repeat:
+				// Repeat the current task after a small delay.
+				t.SetProcessing(false)
+				nextSleepDelay = q.DelayOnRepeat
+				q.SetStatus("repeat head task")
+			}
+
+			if taskRes.DelayBeforeNextTask != 0 {
+				nextSleepDelay = taskRes.DelayBeforeNextTask
+				q.SetStatus(fmt.Sprintf("sleep for %s", nextSleepDelay.String()))
+			}
+
+			sleepDelay = nextSleepDelay
+
+			if taskRes.AfterHandle != nil {
+				taskRes.AfterHandle()
+			}
+			// use lazydebug because it dumps the whole queue
+			q.lazydebug("queue: tasks after handle", func() []any {
+				return []any{slog.String("queue", q.Name), slog.String("tasks", q.String())}
+			})
+		}
+	}()
+	q.started = true
+}
+
+// waitForTask returns a task that can be processed, or nil if the context is canceled.
+// sleepDelay is used to sleep before checking for a task, e.g. after a failed task.
+// If the queue is empty, it is checked again every DelayOnQueueIsEmpty.
+func (q *TaskQueue) waitForTask(sleepDelay time.Duration) task.Task {
+	// Check Done channel.
+	select {
+	case <-q.ctx.Done():
+		return nil
+	default:
+	}
+
+	// Shortcut: return the first task if the queue is not empty and delay is not required.
+	if !q.IsEmpty() && sleepDelay == 0 {
+		return q.GetFirst()
+	}
+
+	// Initialize wait settings.
+	waitBegin := time.Now()
+	waitUntil := q.DelayOnQueueIsEmpty
+	if sleepDelay != 0 {
+		waitUntil = sleepDelay
+	}
+
+	checkTicker := time.NewTicker(q.WaitLoopCheckInterval)
+	q.waitMu.Lock()
+	q.waitInProgress = true
+	q.cancelDelay = false
+	q.waitMu.Unlock()
+
+	origStatus := q.GetStatus()
+
+	defer func() {
+		checkTicker.Stop()
+		q.waitMu.Lock()
+		q.waitInProgress = false
+		q.cancelDelay = false
+		q.waitMu.Unlock()
+		q.SetStatus(origStatus)
+	}()
+
+	// Wait for the queued task with some delay.
+	// Every tick increases the 'elapsed' counter until it outgrows the waitUntil value.
+	// Or, the delay can be canceled to handle a new head task immediately.
+	for {
+		checkTask := false
+		select {
+		case <-q.ctx.Done():
+			// Queue is stopped.
+			return nil
+		case <-checkTicker.C:
+			// Check and update waitUntil.
+			elapsed := time.Since(waitBegin)
+
+			q.waitMu.Lock()
+			if q.cancelDelay {
+				// Reset waitUntil to check task immediately.
+				waitUntil = elapsed
+			}
+			q.waitMu.Unlock()
+
+			// Wait loop is done or canceled: break select to check for the head task.
+			if elapsed >= waitUntil {
+				// Delay has elapsed: go check for the head task.
+				checkTask = true
+			}
+		}
+
+		// Break the for-loop to see if the head task can be returned.
+		if checkTask {
+			if q.IsEmpty() {
+				// No task to return: increase wait time.
+				waitUntil += q.DelayOnQueueIsEmpty
+			} else {
+				return q.GetFirst()
+			}
+		}
+
+		// Wait loop still in progress: update queue status.
+		waitTime := time.Since(waitBegin).Truncate(time.Second)
+		if sleepDelay == 0 {
+			q.SetStatus(fmt.Sprintf("waiting for task %s", waitTime.String()))
+		} else {
+			delay := sleepDelay.Truncate(time.Second)
+			q.SetStatus(fmt.Sprintf("%s (%s left of %s delay)", origStatus, (delay - waitTime).String(), delay.String()))
+		}
+	}
+}
+
+// CancelTaskDelay breaks the wait loop. Useful to cut short a possibly long sleep delay.
+func (q *TaskQueue) CancelTaskDelay() {
+	q.waitMu.Lock()
+	if q.waitInProgress {
+		q.cancelDelay = true
+	}
+	q.waitMu.Unlock()
+}
+
+// Iterate runs doFn for every task.
+func (q *TaskQueue) Iterate(doFn func(task.Task)) {
+	if doFn == nil {
+		return
+	}
+
+	defer q.MeasureActionTime("Iterate")()
+
+	q.withRLock(func() {
+		for e := q.items.Front(); e != nil; e = e.Next() {
+			doFn(e.Value.(task.Task))
+		}
+	})
+}
+
+// Filter runs filterFn on every task and removes each task for which it returns false.
+func (q *TaskQueue) Filter(filterFn func(task.Task) bool) {
+	if filterFn == nil {
+		return
+	}
+
+	defer q.MeasureActionTime("Filter")()
+
+	q.withLock(func() {
+		for e := q.items.Front(); e != nil; {
+			current := e
+			e = e.Next()
+			t := current.Value.(task.Task)
+			if !filterFn(t) {
+				q.items.Remove(current)
+				delete(q.idIndex, t.GetId())
+			}
+		}
+	})
+}
+
+func (q *TaskQueue) WithCompactionCallback(callback func(compactedTasks []task.Task, targetTask task.Task)) *TaskQueue {
+	q.CompactionCallback = callback
+	return q
+}
+
+// TODO define mapping method with QueueAction to insert, modify and delete tasks.
+
+// String dumps the tasks in the queue to one line.
+func (q *TaskQueue) String() string {
+	var buf strings.Builder
+	var index int
+	qLen := q.Length()
+	q.Iterate(func(t task.Task) {
+		buf.WriteString(fmt.Sprintf("[%s,id=%10.10s]", t.GetDescription(), t.GetId()))
+		index++
+		if index == qLen {
+			return
+		}
+		buf.WriteString(", ")
+	})
+
+	return buf.String()
+}
+
+func (q *TaskQueue) withLock(fn func()) {
+	q.m.Lock()
+	fn()
+	q.m.Unlock()
+}
+
+func (q *TaskQueue) withRLock(fn func()) {
+	q.m.RLock()
+	fn()
+	q.m.RUnlock()
+}
+
+func isNil(v interface{}) bool {
+	if v == nil {
+		return true
+	}
+
+	// Use reflection to check if the interface contains a nil concrete value
+	rv := reflect.ValueOf(v)
+	switch rv.Kind() {
+	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
+		return rv.IsNil()
+	default:
+		return false
+	}
+}
diff --git a/pkg/task/queue/task_queue_requeue_test.go b/pkg/task/queue/task_queue_requeue_test.go
new file mode 100644
index 00000000..76968951
--- /dev/null
+++ b/pkg/task/queue/task_queue_requeue_test.go
@@ -0,0 +1,239 @@
+package queue
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	. "github.com/onsi/gomega"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/flant/shell-operator/pkg/hook/task_metadata"
+	"github.com/flant/shell-operator/pkg/metric"
+	"github.com/flant/shell-operator/pkg/task"
+)
+
+func Test_TaskQueueList_Requeue(t *testing.T) {
+	g := NewWithT(t)
+
+	metricStorage := metric.NewStorageMock(t)
+	metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) {
+		assert.Equal(t, metric, "{PREFIX}tasks_queue_action_duration_seconds")
+		assert.NotZero(t, value)
+		assert.Equal(t, map[string]string{
+			"queue_action": "AddLast",
+			"queue_name":   "requeue-test-queue",
+		}, labels)
+		assert.Nil(t, buckets)
+	})
+
+	// A channel to control when RequeueTask can finish.
+	requeueTaskCanFinish := make(chan struct{})
+
+	// A channel to signal that RequeueTask has finished.
+	requeueTaskFinished := make(chan struct{})
+
+	// Store execution order.
+ executionOrder := make([]string, 0) + mu := &sync.Mutex{} + + // Create a new task queue + q := NewTasksQueue() + q.WithMetricStorage(metricStorage) + q.WithName("requeue-test-queue") + q.WithContext(context.Background()) + + q.WaitLoopCheckInterval = 5 * time.Millisecond + q.DelayOnQueueIsEmpty = 5 * time.Millisecond + q.DelayOnRepeat = 5 * time.Millisecond + + // Define the handler for tasks + q.WithHandler(func(_ context.Context, tsk task.Task) TaskResult { + mu.Lock() + executionOrder = append(executionOrder, tsk.GetId()) + mu.Unlock() + + if tsk.GetId() == "RequeueTask" { + // If there are other tasks in the queue, move this task to the end. + if q.Length() > 1 { + res := TaskResult{Status: Success} + res.TailTasks = append(res.TailTasks, tsk) + + return res + } + + // If no other tasks, wait for the signal to finish. + <-requeueTaskCanFinish + close(requeueTaskFinished) + return TaskResult{Status: Success} + } + + // For simple tasks, just succeed. + return TaskResult{Status: Success} + }) + + // Add the "requeue" task first. + requeueTask := task.BaseTask{Id: "RequeueTask", Type: task_metadata.HookRun} + q.AddLast(&requeueTask) + + // Add a few simple tasks. + for i := 0; i < 3; i++ { + simpleTask := task.BaseTask{Id: fmt.Sprintf("SimpleTask-%d", i), Type: task_metadata.HookRun} + q.AddLast(&simpleTask) + } + + g.Expect(q.Length()).To(Equal(4)) + + // Start processing the queue in a separate goroutine. + go q.Start(context.Background()) + defer q.Stop() + + // Wait until all simple tasks are processed. + g.Eventually(func() int { + mu.Lock() + defer mu.Unlock() + count := 0 + for _, id := range executionOrder { + if id != "RequeueTask" { + count++ + } + } + return count + }, "5s", "10ms").Should(Equal(3), "All simple tasks should run") + + // Verify the order of execution so far. + // The RequeueTask should have been processed and moved to the back. + mu.Lock() + // The first task should be the RequeueTask. + g.Expect(executionOrder[0]).To(Equal("RequeueTask")) + // The next 3 tasks should be SimpleTasks + g.Expect(executionOrder[1:4]).To(Equal([]string{"SimpleTask-0", "SimpleTask-1", "SimpleTask-2"})) + mu.Unlock() + + // Allow the RequeueTask to finish. + close(requeueTaskCanFinish) + + // Wait for the RequeueTask to finish. + g.Eventually(requeueTaskFinished, "5s", "10ms").Should(BeClosed()) + + // Check final execution order. + mu.Lock() + g.Expect(len(executionOrder)).To(BeNumerically(">=", 5)) + // Last executed task must be RequeueTask + g.Expect(executionOrder[len(executionOrder)-1]).To(Equal("RequeueTask")) + mu.Unlock() + + g.Expect(q.Length()).To(Equal(0)) +} + +func Test_TaskQueue_Requeue(t *testing.T) { + g := NewWithT(t) + + metricStorage := metric.NewStorageMock(t) + metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { + assert.Equal(t, metric, "{PREFIX}tasks_queue_action_duration_seconds") + assert.NotZero(t, value) + assert.Equal(t, map[string]string{ + "queue_action": "AddLast", + "queue_name": "requeue-test-queue", + }, labels) + assert.Nil(t, buckets) + }) + + // A channel to control when RequeueTask can finish. + requeueTaskCanFinish := make(chan struct{}) + + // A channel to signal that RequeueTask has finished. + requeueTaskFinished := make(chan struct{}) + + // Store execution order. 
+ executionOrder := make([]string, 0) + mu := &sync.Mutex{} + + // Create a new task queue + q := NewTasksQueueSlice() + q.WithMetricStorage(metricStorage) + q.WithName("requeue-test-queue") + q.WithContext(context.Background()) + + q.WaitLoopCheckInterval = 5 * time.Millisecond + q.DelayOnQueueIsEmpty = 5 * time.Millisecond + q.DelayOnRepeat = 5 * time.Millisecond + + // Define the handler for tasks + q.WithHandler(func(_ context.Context, tsk task.Task) TaskResult { + mu.Lock() + executionOrder = append(executionOrder, tsk.GetId()) + mu.Unlock() + + if tsk.GetId() == "RequeueTask" { + // If there are other tasks in the queue, move this task to the end. + if q.Length() > 1 { + return TaskResult{Status: Success, TailTasks: []task.Task{tsk}} + } + + // If no other tasks, wait for the signal to finish. + <-requeueTaskCanFinish + close(requeueTaskFinished) + return TaskResult{Status: Success} + } + + // For simple tasks, just succeed. + return TaskResult{Status: Success} + }) + + // Add the "requeue" task first. + requeueTask := task.BaseTask{Id: "RequeueTask", Type: task_metadata.HookRun} + q.AddLast(&requeueTask) + + // Add a few simple tasks. + for i := 0; i < 3; i++ { + simpleTask := task.BaseTask{Id: fmt.Sprintf("SimpleTask-%d", i), Type: task_metadata.HookRun} + q.AddLast(&simpleTask) + } + + g.Expect(q.Length()).To(Equal(4)) + + // Start processing the queue in a separate goroutine. + go q.Start(context.Background()) + defer q.Stop() + + // Wait until all simple tasks are processed. + g.Eventually(func() int { + mu.Lock() + defer mu.Unlock() + count := 0 + for _, id := range executionOrder { + if id != "RequeueTask" { + count++ + } + } + return count + }, "5s", "10ms").Should(Equal(3), "All simple tasks should run") + + // Verify the order of execution so far. + // The RequeueTask should have been processed and moved to the back. + mu.Lock() + // The first task should be the RequeueTask. + g.Expect(executionOrder[0]).To(Equal("RequeueTask")) + // The next 3 tasks should be SimpleTasks + g.Expect(executionOrder[1:4]).To(Equal([]string{"SimpleTask-0", "SimpleTask-1", "SimpleTask-2"})) + mu.Unlock() + + // Allow the RequeueTask to finish. + close(requeueTaskCanFinish) + + // Wait for the RequeueTask to finish. + g.Eventually(requeueTaskFinished, "5s", "10ms").Should(BeClosed()) + + // Check final execution order. + mu.Lock() + g.Expect(len(executionOrder)).To(BeNumerically(">=", 5)) + // Last executed task must be RequeueTask + g.Expect(executionOrder[len(executionOrder)-1]).To(Equal("RequeueTask")) + mu.Unlock() + + g.Expect(q.Length()).To(Equal(0)) +} diff --git a/pkg/task/queue/task_queue_test.go b/pkg/task/queue/task_queue_test.go index 6a39ca01..0d347d33 100644 --- a/pkg/task/queue/task_queue_test.go +++ b/pkg/task/queue/task_queue_test.go @@ -4,12 +4,15 @@ import ( "bytes" "context" "fmt" + "strings" "testing" "time" . 
"github.com/onsi/gomega" "github.com/stretchr/testify/assert" + "github.com/flant/shell-operator/pkg/hook/task_metadata" + htypes "github.com/flant/shell-operator/pkg/hook/types" "github.com/flant/shell-operator/pkg/metric" "github.com/flant/shell-operator/pkg/task" ) @@ -89,6 +92,70 @@ func Test_TasksQueue_Remove(t *testing.T) { )) } +func Test_TasksQueue_RemoveFirst(t *testing.T) { + g := NewWithT(t) + + metricStorage := metric.NewStorageMock(t) + metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { + assert.Equal(t, metric, "{PREFIX}tasks_queue_action_duration_seconds") + assert.NotZero(t, value) + assert.Equal(t, map[string]string{ + "queue_action": "AddFirst", + "queue_name": "", + }, labels) + assert.Nil(t, buckets) + }) + + q := NewTasksQueue().WithMetricStorage(metricStorage) + + // Remove just one element + Task := &task.BaseTask{Id: "First one"} + q.AddFirst(Task) + g.Expect(q.Length()).To(Equal(1)) + q.RemoveFirst() + g.Expect(q.Length()).To(Equal(0)) + + // Remove element in the middle + for i := 0; i < 5; i++ { + Task := &task.BaseTask{Id: fmt.Sprintf("task_%02d", i)} + q.AddFirst(Task) + } + g.Expect(q.Length()).To(Equal(5)) + q.RemoveFirst() + g.Expect(q.Length()).To(Equal(4)) + + idsDump := DumpTaskIds(q) + + g.Expect(idsDump).To(And( + ContainSubstring("task_00"), + ContainSubstring("task_01"), + ContainSubstring("task_02"), + )) + + // Remove last element + q.RemoveFirst() + g.Expect(q.Length()).To(Equal(3)) + + idsDump = DumpTaskIds(q) + + g.Expect(idsDump).To(And( + ContainSubstring("task_00"), + ContainSubstring("task_01"), + ContainSubstring("task_02"), + )) + + // Remove first element by id + q.RemoveFirst() + g.Expect(q.Length()).To(Equal(2)) + + idsDump = DumpTaskIds(q) + + g.Expect(idsDump).To(And( + ContainSubstring("task_01"), + ContainSubstring("task_00"), + )) +} + func Test_ExponentialBackoff(t *testing.T) { g := NewWithT(t) @@ -279,3 +346,74 @@ func Test_CancelDelay(t *testing.T) { "Should stop delaying after CancelTaskDelay call. Got delay of %s, expect less than %s. Check cancel delay not broken in Start or waitForTask.", elapsed.String(), (2 * mockExponentialDelay).String()) } + +func Test_QueueDump_HookMetadata_Task_Description(t *testing.T) { + g := NewWithT(t) + + logLabels := map[string]string{ + "hook": "hook1.sh", + } + + metricStorage := metric.NewStorageMock(t) + metricStorage.HistogramObserveMock.Set(func(metric string, value float64, labels map[string]string, buckets []float64) { + assert.Equal(t, metric, "{PREFIX}tasks_queue_action_duration_seconds") + assert.NotZero(t, value) + assert.Equal(t, map[string]string{ + "queue_action": "AddLast", + "queue_name": "", + }, labels) + assert.Nil(t, buckets) + }) + + q := NewTasksQueue().WithMetricStorage(metricStorage) + + q.AddLast(task.NewTask(task_metadata.EnableKubernetesBindings). + WithMetadata(task_metadata.HookMetadata{ + HookName: "hook1.sh", + Binding: string(task_metadata.EnableKubernetesBindings), + })) + + q.AddLast(task.NewTask(task_metadata.HookRun). + WithMetadata(task_metadata.HookMetadata{ + HookName: "hook1.sh", + BindingType: htypes.OnKubernetesEvent, + Binding: "monitor_pods", + }). + WithLogLabels(logLabels). + WithQueueName("main")) + + q.AddLast(task.NewTask(task_metadata.HookRun). + WithMetadata(task_metadata.HookMetadata{ + HookName: "hook1.sh", + BindingType: htypes.Schedule, + AllowFailure: true, + Binding: "every 1 sec", + Group: "monitor_pods", + }). + WithLogLabels(logLabels). 
+ WithQueueName("main")) + + queueDump := taskQueueToText(q) + + g.Expect(queueDump).Should(ContainSubstring("hook1.sh"), "Queue dump should reveal a hook name.") + g.Expect(queueDump).Should(ContainSubstring("EnableKubernetesBindings"), "Queue dump should reveal EnableKubernetesBindings.") + g.Expect(queueDump).Should(ContainSubstring(":kubernetes:"), "Queue dump should show kubernetes binding.") + g.Expect(queueDump).Should(ContainSubstring(":schedule:"), "Queue dump should show schedule binding.") + g.Expect(queueDump).Should(ContainSubstring("group=monitor_pods"), "Queue dump should show group name.") +} + +func taskQueueToText(q *TaskQueue) string { + var buf strings.Builder + buf.WriteString(fmt.Sprintf("Queue '%s': length %d, status: '%s'\n", q.Name, q.Length(), q.Status)) + buf.WriteString("\n") + + index := 1 + q.Iterate(func(task task.Task) { + buf.WriteString(fmt.Sprintf("%2d. ", index)) + buf.WriteString(task.GetDescription()) + buf.WriteString("\n") + index++ + }) + + return buf.String() +} diff --git a/pkg/task/task.go b/pkg/task/task.go index 351fba5e..33d89ff4 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -3,6 +3,7 @@ package task import ( "fmt" "sync" + "sync/atomic" "time" "github.com/gofrs/uuid/v5" @@ -28,6 +29,8 @@ type Task interface { GetDescription() string GetProp(key string) interface{} SetProp(key string, value interface{}) + IsProcessing() bool + SetProcessing(bool) } type BaseTask struct { @@ -43,6 +46,8 @@ type BaseTask struct { lock sync.RWMutex Metadata interface{} + + processing atomic.Bool } func NewTask(taskType TaskType) *BaseTask { @@ -155,3 +160,11 @@ func (t *BaseTask) GetDescription() string { return fmt.Sprintf("%s:%s%s%s", t.GetType(), t.GetQueueName(), metaDescription, failDescription) } + +func (t *BaseTask) SetProcessing(val bool) { + t.processing.Store(val) +} + +func (t *BaseTask) IsProcessing() bool { + return t.processing.Load() +} diff --git a/test/hook/context/context_combiner.go b/test/hook/context/context_combiner.go index e83ebc74..01a0cd91 100644 --- a/test/hook/context/context_combiner.go +++ b/test/hook/context/context_combiner.go @@ -35,7 +35,11 @@ func NewContextCombiner() *ContextCombiner { op.MetricStorage = metricstorage.NewMetricStorage(context.Background(), "test-prefix", false, log.NewNop()) op.TaskQueues = queue.NewTaskQueueSet().WithMetricStorage(op.MetricStorage) op.TaskQueues.WithContext(context.Background()) - op.TaskQueues.NewNamedQueue(TestQueueName, nil) + op.TaskQueues.NewNamedQueue(TestQueueName, queue.QueueOpts{ + Handler: nil, + CompactableTypes: []task.TaskType{task_metadata.HookRun}, + CompactionCallback: nil, + }) return &ContextCombiner{ op: op, From 512ef2a43c345560eaf3d2199e1c82757cedb95e Mon Sep 17 00:00:00 2001 From: Timur Tuktamyshev Date: Fri, 8 Aug 2025 11:57:51 +0300 Subject: [PATCH 2/2] fix: change debug to lazy eval in hot paths Signed-off-by: Timur Tuktamyshev --- pkg/task/queue/task_queue_list.go | 68 ++++++++++++++++++------------- 1 file changed, 39 insertions(+), 29 deletions(-) diff --git a/pkg/task/queue/task_queue_list.go b/pkg/task/queue/task_queue_list.go index 18d006f5..c2fb3566 100644 --- a/pkg/task/queue/task_queue_list.go +++ b/pkg/task/queue/task_queue_list.go @@ -350,13 +350,15 @@ func (q *TaskQueue) AddLast(t task.Task) { // addLast adds a new tail element. // It implements the merging logic for HookRun tasks by scanning the whole queue. 
func (q *TaskQueue) addLast(t task.Task) { - q.logger.Debug("adding task to queue", - slog.String("queue", q.Name), - slog.String("task_id", t.GetId()), - slog.String("task_type", string(t.GetType())), - slog.String("task_description", t.GetDescription()), - slog.Int("queue_length_before", q.items.Len()), - ) + q.lazydebug("adding task to queue", func() []any { + return []any{ + slog.String("queue", q.Name), + slog.String("task_id", t.GetId()), + slog.String("task_type", string(t.GetType())), + slog.String("task_description", t.GetDescription()), + slog.Int("queue_length_before", q.items.Len()), + } + }) if _, ok := q.idIndex[t.GetId()]; ok { q.logger.Warn("task collision detected, unexpected behavior possible", slog.String("queue", q.Name), slog.String("task_id", t.GetId())) @@ -369,36 +371,44 @@ func (q *TaskQueue) addLast(t task.Task) { if _, ok := q.compactableTypes[taskType]; ok { q.isCompactable = true - q.logger.Debug("task is mergeable, marking queue as dirty", - slog.String("queue", q.Name), - slog.String("task_id", t.GetId()), - slog.String("task_type", string(taskType)), - slog.Int("queue_length", q.items.Len()), - slog.Bool("queue_is_dirty", q.isCompactable), - ) + q.lazydebug("task is mergeable, marking queue as dirty", func() []any { + return []any{ + slog.String("queue", q.Name), + slog.String("task_id", t.GetId()), + slog.String("task_type", string(taskType)), + slog.Int("queue_length", q.items.Len()), + slog.Bool("queue_is_dirty", q.isCompactable), + } + }) // Only trigger compaction if queue is getting long and we have mergeable tasks if q.items.Len() > compactionThreshold && q.isCompactable { - q.logger.Debug("triggering compaction due to queue length", - slog.String("queue", q.Name), - slog.Int("queue_length", q.items.Len()), - slog.Int("compaction_threshold", compactionThreshold), - ) + q.lazydebug("triggering compaction due to queue length", func() []any { + return []any{ + slog.String("queue", q.Name), + slog.Int("queue_length", q.items.Len()), + slog.Int("compaction_threshold", compactionThreshold), + } + }) currentQueue := q.items.Len() q.compaction() - q.logger.Debug("compaction finished", - slog.String("queue", q.Name), - slog.Int("queue_length_before", currentQueue), - slog.Int("queue_length_after", q.items.Len()), - ) + q.lazydebug("compaction finished", func() []any { + return []any{ + slog.String("queue", q.Name), + slog.Int("queue_length_before", currentQueue), + slog.Int("queue_length_after", q.items.Len()), + } + }) q.isCompactable = false } } else { - q.logger.Debug("task is not mergeable", - slog.String("queue", q.Name), - slog.String("task_id", t.GetId()), - slog.String("task_type", string(taskType)), - ) + q.lazydebug("task is not mergeable", func() []any { + return []any{ + slog.String("queue", q.Name), + slog.String("task_id", t.GetId()), + slog.String("task_type", string(taskType)), + } + }) } }