diff --git a/pkg/app/debug.go b/pkg/app/debug.go index 434300a3..7adaaa54 100644 --- a/pkg/app/debug.go +++ b/pkg/app/debug.go @@ -10,10 +10,7 @@ var DebugUnixSocket = "/var/run/shell-operator/debug.socket" var DebugHttpServerAddr = "" -var ( - DebugKeepTmpFilesVar = "no" - DebugKeepTmpFiles = false -) +var DebugKeepTmpFiles = false var DebugKubernetesAPI = false @@ -27,11 +24,11 @@ func DefineDebugFlags(kpApp *kingpin.Application, cmd *kingpin.CmdClause) { Default(DebugHttpServerAddr). StringVar(&DebugHttpServerAddr) - cmd.Flag("debug-keep-tmp-files", "set to yes to disable cleanup of temporary files"). + cmd.Flag("debug-keep-tmp-files", "set to true to disable cleanup of temporary files"). Envar("DEBUG_KEEP_TMP_FILES"). Hidden(). - Default(DebugKeepTmpFilesVar). - StringVar(&DebugKeepTmpFilesVar) + Default("false"). + BoolVar(&DebugKeepTmpFiles) cmd.Flag("debug-kubernetes-api", "enable client-go debug messages"). Envar("DEBUG_KUBERNETES_API"). diff --git a/pkg/app/debug_test.go b/pkg/app/debug_test.go new file mode 100644 index 00000000..6000d957 --- /dev/null +++ b/pkg/app/debug_test.go @@ -0,0 +1,24 @@ +package app + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDebugKeepTmpFiles_DefaultIsFalse(t *testing.T) { + assert.False(t, DebugKeepTmpFiles, "DebugKeepTmpFiles should default to false") +} + +// TestDebugKeepTmpFilesIsBool verifies the variable is the bool type +// and that simple assignment behaves correctly (no string comparison needed). +func TestDebugKeepTmpFilesIsBool(t *testing.T) { + saved := DebugKeepTmpFiles + defer func() { DebugKeepTmpFiles = saved }() + + DebugKeepTmpFiles = true + assert.True(t, DebugKeepTmpFiles) + + DebugKeepTmpFiles = false + assert.False(t, DebugKeepTmpFiles) +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 7109881d..3cbee4d5 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -9,6 +9,8 @@ import ( "time" "github.com/deckhouse/deckhouse/pkg/log" + + pkg "github.com/flant/shell-operator/pkg" ) /** @@ -52,7 +54,7 @@ func NewConfig(logger *log.Logger) *Config { values: make(map[string]string), temporalValues: make(map[string]*TemporalValue), errors: make(map[string]error), - logger: logger.With(slog.String("component", "runtimeConfig")), + logger: logger.With(slog.String(pkg.LogKeyComponent, "runtimeConfig")), } } @@ -261,7 +263,7 @@ func (c *Config) expireOverrides() { for _, expire := range expires { name, oldValue, newValue := expire[0], expire[1], expire[2] - c.logger.Debug("Parameter is expired", slog.String("parameter", name)) + c.logger.Debug("Parameter is expired", slog.String(pkg.LogKeyParameter, name)) c.callOnChange(name, oldValue, newValue) } } @@ -274,7 +276,7 @@ func (c *Config) callOnChange(name string, oldValue string, newValue string) { err := c.params[name].onChange(oldValue, newValue) if err != nil { c.logger.Error("OnChange handler failed for parameter during value change values", - slog.String("parameter", name), slog.String("old_value", oldValue), slog.String("new_value", newValue), log.Err(err)) + slog.String(pkg.LogKeyParameter, name), slog.String(pkg.LogKeyOldValue, oldValue), slog.String(pkg.LogKeyNewValue, newValue), log.Err(err)) } c.m.Lock() delete(c.errors, name) diff --git a/pkg/debug/server.go b/pkg/debug/server.go index f62bca58..1d500baf 100644 --- a/pkg/debug/server.go +++ b/pkg/debug/server.go @@ -17,6 +17,7 @@ import ( "github.com/go-chi/chi/v5/middleware" "gopkg.in/yaml.v3" + pkg "github.com/flant/shell-operator/pkg" utils 
"github.com/flant/shell-operator/pkg/utils/file" structuredLogger "github.com/flant/shell-operator/pkg/utils/structured-logger" ) @@ -70,7 +71,7 @@ func (s *Server) Init() error { return fmt.Errorf("Debug HTTP server fail to listen on '%s': %w", address, err) } - s.logger.Info("Debug endpoint listen on address", slog.String("address", address)) + s.logger.Info("Debug endpoint listen on address", slog.String(pkg.LogKeyAddress, address)) go func() { if err := http.Serve(listener, s.Router); err != nil { @@ -145,7 +146,7 @@ func handleFormattedOutput(writer http.ResponseWriter, request *http.Request, ha format = strings.TrimPrefix(format, ".") } - structuredLogger.GetLogEntry(request).Debug("used format", slog.String("format", format)) + structuredLogger.GetLogEntry(request).Debug("used format", slog.String(pkg.LogKeyFormat, format)) switch format { case "text": diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 165e711b..c43ea0a6 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -16,6 +16,7 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" "go.opentelemetry.io/otel" + pkg "github.com/flant/shell-operator/pkg" utils "github.com/flant/shell-operator/pkg/utils/labels" ) @@ -26,7 +27,7 @@ const ( func Run(cmd *exec.Cmd) error { // TODO context: hook name, hook phase, hook binding // TODO observability - log.Debug("Executing command", slog.String("command", strings.Join(cmd.Args, " ")), slog.String("dir", cmd.Dir)) + log.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(cmd.Args, " ")), slog.String(pkg.LogKeyDir, cmd.Dir)) return cmd.Run() } @@ -100,8 +101,8 @@ func NewExecutor(dir string, entrypoint string, args []string, envs []string) *E func (e *Executor) Output() ([]byte, error) { e.logger.Debug("Executing command", - slog.String("command", strings.Join(e.cmd.Args, " ")), - slog.String("dir", e.cmd.Dir)) + slog.String(pkg.LogKeyCommand, strings.Join(e.cmd.Args, " ")), + slog.String(pkg.LogKeyDir, e.cmd.Dir)) return e.cmd.Output() } @@ -117,12 +118,12 @@ func (e *Executor) RunAndLogLines(ctx context.Context, logLabels map[string]stri stdErr := bytes.NewBuffer(nil) logEntry := utils.EnrichLoggerWithLabels(e.logger, logLabels) - stdoutLogEntry := logEntry.With("output", "stdout") - stderrLogEntry := logEntry.With("output", "stderr") + stdoutLogEntry := logEntry.With(pkg.LogKeyOutput, "stdout") + stderrLogEntry := logEntry.With(pkg.LogKeyOutput, "stderr") log.Debug("Executing command", - slog.String("command", strings.Join(e.cmd.Args, " ")), - slog.String("dir", e.cmd.Dir)) + slog.String(pkg.LogKeyCommand, strings.Join(e.cmd.Args, " ")), + slog.String(pkg.LogKeyDir, e.cmd.Dir)) plo := &proxyLogger{ ctx: ctx, @@ -208,7 +209,7 @@ func (pl *proxyLogger) Write(p []byte) (int, error) { }() if !ok { - pl.logger.Debug("json log line not map[string]interface{}", slog.Any("line", line)) + pl.logger.Debug("json log line not map[string]interface{}", slog.Any(pkg.LogKeyLine, line)) // fall back to using the logger pl.logger.Info(string(p)) @@ -308,7 +309,7 @@ func (pl *proxyLogger) mergeAndLogInputLog(ctx context.Context, inputLog map[str if len(logLine) > 10000 { logLine = fmt.Sprintf("%s:truncated", logLine[:10000]) - logger.Log(ctx, lvl.Level(), msg, slog.Any("hook", map[string]any{ + logger.Log(ctx, lvl.Level(), msg, slog.Any(pkg.LogKeyHook, map[string]any{ "truncated": logLine, })) diff --git a/pkg/hook/controller/admission_bindings_controller.go b/pkg/hook/controller/admission_bindings_controller.go index ded6a4d2..66a9b923 100644 --- 
a/pkg/hook/controller/admission_bindings_controller.go +++ b/pkg/hook/controller/admission_bindings_controller.go @@ -7,6 +7,7 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" v1 "k8s.io/api/admission/v1" + pkg "github.com/flant/shell-operator/pkg" bctx "github.com/flant/shell-operator/pkg/hook/binding_context" htypes "github.com/flant/shell-operator/pkg/hook/types" "github.com/flant/shell-operator/pkg/webhook/admission" @@ -71,8 +72,8 @@ func (c *AdmissionBindingsController) EnableValidatingBindings() { } if config.Webhook.Metadata.ConfigurationId != confId { log.Error("Possible bug!!! kubernetesValidating has non-unique configurationIds", - slog.String("configurationID", config.Webhook.Metadata.ConfigurationId), - slog.String("confID", confId)) + slog.String(pkg.LogKeyConfigurationID, config.Webhook.Metadata.ConfigurationId), + slog.String(pkg.LogKeyConfID, confId)) } } c.ConfigurationId = confId @@ -107,8 +108,8 @@ func (c *AdmissionBindingsController) EnableMutatingBindings() { } if config.Webhook.Metadata.ConfigurationId != confId { log.Error("Possible bug!!! kubernetesMutating has non-unique configurationIds", - slog.String("configurationID", config.Webhook.Metadata.ConfigurationId), - slog.String("confID", confId)) + slog.String(pkg.LogKeyConfigurationID, config.Webhook.Metadata.ConfigurationId), + slog.String(pkg.LogKeyConfID, confId)) } } c.ConfigurationId = confId @@ -145,8 +146,8 @@ func (c *AdmissionBindingsController) CanHandleEvent(event admission.Event) bool func (c *AdmissionBindingsController) HandleEvent(_ context.Context, event admission.Event) BindingExecutionInfo { if c.ConfigurationId != event.ConfigurationId { log.Error("Possible bug!!! Unknown validating event: no binding for configurationId", - slog.String("configurationId", event.ConfigurationId), - slog.String("webhookId", event.WebhookId)) + slog.String(pkg.LogKeyConfigId, event.ConfigurationId), + slog.String(pkg.LogKeyWebhookId, event.WebhookId)) return BindingExecutionInfo{ BindingContext: []bctx.BindingContext{}, AllowFailure: false, @@ -156,8 +157,8 @@ func (c *AdmissionBindingsController) HandleEvent(_ context.Context, event admis link, hasKey := c.AdmissionLinks[event.WebhookId] if !hasKey { log.Error("Possible bug!!! 
Unknown validating event: no binding for configurationId", - slog.String("configurationId", event.ConfigurationId), - slog.String("webhookId", event.WebhookId)) + slog.String(pkg.LogKeyConfigId, event.ConfigurationId), + slog.String(pkg.LogKeyWebhookId, event.WebhookId)) return BindingExecutionInfo{ BindingContext: []bctx.BindingContext{}, AllowFailure: false, diff --git a/pkg/hook/controller/conversion_bindings_controller.go b/pkg/hook/controller/conversion_bindings_controller.go index ad259071..b1112356 100644 --- a/pkg/hook/controller/conversion_bindings_controller.go +++ b/pkg/hook/controller/conversion_bindings_controller.go @@ -7,6 +7,7 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + pkg "github.com/flant/shell-operator/pkg" bctx "github.com/flant/shell-operator/pkg/hook/binding_context" htypes "github.com/flant/shell-operator/pkg/hook/types" "github.com/flant/shell-operator/pkg/webhook/conversion" @@ -64,7 +65,7 @@ func (c *ConversionBindingsController) EnableConversionBindings() { } } log.Info("conversion binding controller: add webhook from config", - slog.String("name", config.Webhook.Metadata.Name)) + slog.String(pkg.LogKeyName, config.Webhook.Metadata.Name)) c.webhookManager.AddWebhook(config.Webhook) } } @@ -85,7 +86,7 @@ func (c *ConversionBindingsController) CanHandleEvent(crdName string, _ *v1.Conv func (c *ConversionBindingsController) HandleEvent(_ context.Context, crdName string, request *v1.ConversionRequest, rule conversion.Rule) BindingExecutionInfo { _, hasKey := c.Links[crdName] if !hasKey { - log.Error("Possible bug!!! No binding for conversion event for crd", slog.String("crd", crdName)) + log.Error("Possible bug!!! No binding for conversion event for crd", slog.String(pkg.LogKeyCRD, crdName)) return BindingExecutionInfo{ BindingContext: []bctx.BindingContext{}, AllowFailure: false, @@ -94,8 +95,8 @@ func (c *ConversionBindingsController) HandleEvent(_ context.Context, crdName st link, has := c.Links[crdName][rule] if !has { log.Error("Possible bug!!! Event has an unknown conversion rule: no binding was registered", - slog.String("rule", rule.String()), - slog.String("crd", crdName)) + slog.String(pkg.LogKeyRule, rule.String()), + slog.String(pkg.LogKeyCRD, crdName)) return BindingExecutionInfo{ BindingContext: []bctx.BindingContext{}, AllowFailure: false, diff --git a/pkg/hook/controller/kubernetes_bindings_controller.go b/pkg/hook/controller/kubernetes_bindings_controller.go index 736fe6c3..3066450a 100644 --- a/pkg/hook/controller/kubernetes_bindings_controller.go +++ b/pkg/hook/controller/kubernetes_bindings_controller.go @@ -8,6 +8,7 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" + pkg "github.com/flant/shell-operator/pkg" bctx "github.com/flant/shell-operator/pkg/hook/binding_context" htypes "github.com/flant/shell-operator/pkg/hook/types" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" @@ -132,9 +133,9 @@ func (c *kubernetesBindingsController) UpdateMonitor(monitorId string, kind, api utils.EnrichLoggerWithLabels(c.logger, link.BindingConfig.Monitor.Metadata.LogLabels). 
Info("Monitor is recreated", - slog.String("bindingName", link.BindingConfig.BindingName), - slog.String("kind", link.BindingConfig.Monitor.Kind), - slog.String("apiVersion", link.BindingConfig.Monitor.ApiVersion)) + slog.String(pkg.LogKeyBindingName, link.BindingConfig.BindingName), + slog.String(pkg.LogKeyKind, link.BindingConfig.Monitor.Kind), + slog.String(pkg.LogKeyAPIVersion, link.BindingConfig.Monitor.ApiVersion)) // Synchronization has no meaning for UpdateMonitor. Just emit Added event to handle objects of // a new kind. @@ -166,7 +167,7 @@ func (c *kubernetesBindingsController) UnlockEvents() { func (c *kubernetesBindingsController) UnlockEventsFor(monitorID string) { m := c.kubeEventsManager.GetMonitor(monitorID) if m == nil { - log.Warn("monitor was not found", slog.String("monitorID", monitorID)) + log.Warn("monitor was not found", slog.String(pkg.LogKeyMonitorID, monitorID)) return } m.EnableKubeEventCb() @@ -223,7 +224,7 @@ func (c *kubernetesBindingsController) setBindingMonitorLinks(monitorId string, func (c *kubernetesBindingsController) HandleEvent(_ context.Context, kubeEvent kemtypes.KubeEvent) BindingExecutionInfo { link, hasKey := c.getBindingMonitorLinksById(kubeEvent.MonitorId) if !hasKey { - log.Error("Possible bug!!! Unknown kube event: no such monitor id registered", slog.String("monitorID", kubeEvent.MonitorId)) + log.Error("Possible bug!!! Unknown kube event: no such monitor id registered", slog.String(pkg.LogKeyMonitorID, kubeEvent.MonitorId)) return BindingExecutionInfo{ BindingContext: []bctx.BindingContext{}, AllowFailure: false, diff --git a/pkg/hook/hook.go b/pkg/hook/hook.go index be3bb8cc..c5765bb4 100644 --- a/pkg/hook/hook.go +++ b/pkg/hook/hook.go @@ -16,7 +16,6 @@ import ( "go.opentelemetry.io/otel/attribute" "golang.org/x/time/rate" - "github.com/flant/shell-operator/pkg/app" "github.com/flant/shell-operator/pkg/executor" bctx "github.com/flant/shell-operator/pkg/hook/binding_context" "github.com/flant/shell-operator/pkg/hook/config" @@ -139,7 +138,7 @@ func (h *Hook) Run(ctx context.Context, _ htypes.BindingType, context []bctx.Bin // remove tmp file on hook exit defer func() { - if app.DebugKeepTmpFilesVar != "yes" { + if !h.KeepTemporaryHookFiles { _ = os.Remove(contextPath) _ = os.Remove(metricsPath) _ = os.Remove(conversionPath) diff --git a/pkg/hook/hook_manager.go b/pkg/hook/hook_manager.go index 9bef3492..0ddad308 100644 --- a/pkg/hook/hook_manager.go +++ b/pkg/hook/hook_manager.go @@ -14,7 +14,7 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - "github.com/flant/shell-operator/pkg/app" + pkg "github.com/flant/shell-operator/pkg" "github.com/flant/shell-operator/pkg/executor" "github.com/flant/shell-operator/pkg/hook/controller" htypes "github.com/flant/shell-operator/pkg/hook/types" @@ -36,6 +36,11 @@ type Manager struct { conversionWebhookManager *conversion.WebhookManager admissionWebhookManager *admission.WebhookManager + // hook execution options + keepTemporaryHookFiles bool + logProxyHookJSON bool + logProxyHookJSONKey string + // sorted hook names hookNamesInOrder []string @@ -52,12 +57,16 @@ type Manager struct { // ManagerConfig sets configuration for Manager type ManagerConfig struct { - WorkingDir string - TempDir string - Kmgr kubeeventsmanager.KubeEventsManager - Smgr schedulemanager.ScheduleManager - Wmgr *admission.WebhookManager - Cmgr *conversion.WebhookManager + WorkingDir string + TempDir string + KubeEventsManager kubeeventsmanager.KubeEventsManager + 
ScheduleManager schedulemanager.ScheduleManager + AdmissionWebhookManager *admission.WebhookManager + ConversionWebhookManager *conversion.WebhookManager + + KeepTemporaryHookFiles bool + LogProxyHookJSON bool + LogProxyHookJSONKey string Logger *log.Logger } @@ -71,10 +80,14 @@ func NewHookManager(config *ManagerConfig) *Manager { workingDir: config.WorkingDir, tempDir: config.TempDir, - kubeEventsManager: config.Kmgr, - scheduleManager: config.Smgr, - admissionWebhookManager: config.Wmgr, - conversionWebhookManager: config.Cmgr, + kubeEventsManager: config.KubeEventsManager, + scheduleManager: config.ScheduleManager, + admissionWebhookManager: config.AdmissionWebhookManager, + conversionWebhookManager: config.ConversionWebhookManager, + + keepTemporaryHookFiles: config.KeepTemporaryHookFiles, + logProxyHookJSON: config.LogProxyHookJSON, + logProxyHookJSONKey: config.LogProxyHookJSONKey, logger: config.Logger, } @@ -97,7 +110,7 @@ func (hm *Manager) Init() error { if err := utils_file.RecursiveCheckLibDirectory(hm.workingDir); err != nil { hm.logger.Error("failed to check lib directory", - slog.String("workingDir", hm.workingDir), + slog.String(pkg.LogKeyWorkingDir, hm.workingDir), log.Err(err)) } @@ -108,7 +121,7 @@ func (hm *Manager) Init() error { // sort hooks by path sort.Strings(hooksRelativePaths) - hm.logger.Debug("Search hooks in paths", slog.Any("paths", hooksRelativePaths)) + hm.logger.Debug("Search hooks in paths", slog.Any(pkg.LogKeyPaths, hooksRelativePaths)) for _, hookPath := range hooksRelativePaths { hook, err := hm.loadHook(hookPath) @@ -140,17 +153,17 @@ func (hm *Manager) loadHook(hookPath string) (*Hook, error) { return nil, err } - hook := NewHook(hookName, hookPath, app.DebugKeepTmpFiles, app.LogProxyHookJSON, app.ProxyJsonLogKey, hm.logger.Named("hook")) - hookEntry := hm.logger.With(slog.String("hook", hook.Name), slog.String("phase", "config")) - hookEntry.Info("Load config", slog.String("path", hookPath)) + hook := NewHook(hookName, hookPath, hm.keepTemporaryHookFiles, hm.logProxyHookJSON, hm.logProxyHookJSONKey, hm.logger.Named("hook")) + hookEntry := hm.logger.With(slog.String(pkg.LogKeyHook, hook.Name), slog.String(pkg.LogKeyPhase, "config")) + hookEntry.Info("Load config", slog.String(pkg.LogKeyPath, hookPath)) envs := make([]string, 0) configOutput, err := hm.execCommandOutput(hook.Name, hm.workingDir, hookPath, envs, []string{"--config"}) if err != nil { - hookEntry.Error("Hook config output", slog.String("value", string(configOutput))) + hookEntry.Error("Hook config output", slog.String(pkg.LogKeyValue, string(configOutput))) var ee *exec.ExitError if errors.As(err, &ee) && len(ee.Stderr) > 0 { - hookEntry.Error("Hook config stderr", slog.String("value", string(ee.Stderr))) + hookEntry.Error("Hook config stderr", slog.String(pkg.LogKeyValue, string(ee.Stderr))) } return nil, fmt.Errorf("cannot get config for hook '%s': %w", hookPath, err) } @@ -161,36 +174,36 @@ func (hm *Manager) loadHook(hookPath string) (*Hook, error) { // Add hook info as log labels, update MetricLabels for _, kubeCfg := range hook.GetConfig().OnKubernetesEvents { - kubeCfg.Monitor.Metadata.LogLabels["hook"] = hook.Name + kubeCfg.Monitor.Metadata.LogLabels[pkg.LogKeyHook] = hook.Name kubeCfg.Monitor.Metadata.MetricLabels = map[string]string{ - "hook": hook.Name, - "binding": kubeCfg.BindingName, - "queue": kubeCfg.Queue, + pkg.MetricKeyHook: hook.Name, + pkg.MetricKeyBinding: kubeCfg.BindingName, + pkg.MetricKeyQueue: kubeCfg.Queue, } } for _, conversionCfg := range 
hook.GetConfig().KubernetesConversion { - conversionCfg.Webhook.Metadata.LogLabels["hook"] = hook.Name + conversionCfg.Webhook.Metadata.LogLabels[pkg.LogKeyHook] = hook.Name conversionCfg.Webhook.Metadata.MetricLabels = map[string]string{ - "hook": hook.Name, - "binding": conversionCfg.BindingName, + pkg.MetricKeyHook: hook.Name, + pkg.MetricKeyBinding: conversionCfg.BindingName, } } for _, validatingCfg := range hook.GetConfig().KubernetesValidating { - validatingCfg.Webhook.Metadata.LogLabels["hook"] = hook.Name + validatingCfg.Webhook.Metadata.LogLabels[pkg.LogKeyHook] = hook.Name validatingCfg.Webhook.Metadata.MetricLabels = map[string]string{ - "hook": hook.Name, - "binding": validatingCfg.BindingName, + pkg.MetricKeyHook: hook.Name, + pkg.MetricKeyBinding: validatingCfg.BindingName, } validatingCfg.Webhook.UpdateIds("", validatingCfg.BindingName) } for _, mutatingCfg := range hook.GetConfig().KubernetesMutating { - mutatingCfg.Webhook.Metadata.LogLabels["hook"] = hook.Name + mutatingCfg.Webhook.Metadata.LogLabels[pkg.LogKeyHook] = hook.Name mutatingCfg.Webhook.Metadata.MetricLabels = map[string]string{ - "hook": hook.Name, - "binding": mutatingCfg.BindingName, + pkg.MetricKeyHook: hook.Name, + pkg.MetricKeyBinding: mutatingCfg.BindingName, } mutatingCfg.Webhook.UpdateIds("", mutatingCfg.BindingName) } @@ -210,7 +223,7 @@ func (hm *Manager) loadHook(hookPath string) (*Hook, error) { return nil, fmt.Errorf("hook %q is marked as executable but doesn't contain config section", hook.Path) } - hookEntry.Info("Loaded config", slog.String("value", hook.GetConfigDescription())) + hookEntry.Info("Loaded config", slog.String(pkg.LogKeyValue, hook.GetConfigDescription())) return hook, nil } @@ -222,22 +235,22 @@ func (hm *Manager) execCommandOutput(hookName string, dir string, entrypoint str entrypoint, args, envs). - WithLogProxyHookJSON(app.LogProxyHookJSON). - WithLogProxyHookJSONKey(app.ProxyJsonLogKey). + WithLogProxyHookJSON(hm.logProxyHookJSON). + WithLogProxyHookJSONKey(hm.logProxyHookJSONKey). WithCMDStdout(nil). WithCMDStderr(nil). WithLogger(hm.logger.Named("executor")) - debugEntry := hm.logger.With(slog.String("hook", hookName), slog.String("cmd", strings.Join(args, " "))) + debugEntry := hm.logger.With(slog.String(pkg.LogKeyHook, hookName), slog.String(pkg.LogKeyCmd, strings.Join(args, " "))) - debugEntry.Debug("Executing hook", slog.String("dir", dir)) + debugEntry.Debug("Executing hook", slog.String(pkg.LogKeyDir, dir)) output, err := hookCmd.Output() if err != nil { return output, err } - debugEntry.Debug("execCommandOutput", slog.String("output", string(output))) + debugEntry.Debug("execCommandOutput", slog.String(pkg.LogKeyOutput, string(output))) return output, nil } @@ -247,7 +260,7 @@ func (hm *Manager) GetHook(name string) *Hook { if exists { return hook } - hm.logger.Error("Possible bug!!! Hook not found in hook manager", slog.String("name", name)) + hm.logger.Error("Possible bug!!! Hook not found in hook manager", slog.String(pkg.LogKeyName, name)) return nil } @@ -376,11 +389,11 @@ func (hm *Manager) DetectAdmissionEventType(event admission.Event) htypes.Bindin } hm.logger.Error("Possible bug!!! 
No linked hook for admission event", - slog.String("configId", event.ConfigurationId), - slog.String("webhookId", event.WebhookId), - slog.String("kind", event.Request.Kind.String()), - slog.String("name", event.Request.Name), - slog.String("namespace", event.Request.Namespace)) + slog.String(pkg.LogKeyConfigId, event.ConfigurationId), + slog.String(pkg.LogKeyWebhookId, event.WebhookId), + slog.String(pkg.LogKeyKind, event.Request.Kind.String()), + slog.String(pkg.LogKeyName, event.Request.Name), + slog.String(pkg.LogKeyNamespace, event.Request.Namespace)) return "" } diff --git a/pkg/hook/hook_manager_test.go b/pkg/hook/hook_manager_test.go index a1580390..ef679ff4 100644 --- a/pkg/hook/hook_manager_test.go +++ b/pkg/hook/hook_manager_test.go @@ -25,12 +25,12 @@ func newHookManager(t *testing.T, testdataDir string) *Manager { admissionManager.Settings = admission.DefaultSettings cfg := &ManagerConfig{ - WorkingDir: hooksDir, - TempDir: t.TempDir(), - Kmgr: nil, - Smgr: nil, - Wmgr: admissionManager, - Cmgr: conversionManager, + WorkingDir: hooksDir, + TempDir: t.TempDir(), + KubeEventsManager: nil, + ScheduleManager: nil, + AdmissionWebhookManager: admissionManager, + ConversionWebhookManager: conversionManager, Logger: log.NewNop(), } @@ -385,3 +385,66 @@ func Test_HookManager_onstartup_order(t *testing.T) { g.Expect(hookName).To(Equal(expectNames[i])) } } + +func Test_ManagerConfig_HookOptions_PropagateToManager(t *testing.T) { + conversionManager := conversion.NewWebhookManager() + conversionManager.Settings = conversion.DefaultSettings + + admissionManager := admission.NewWebhookManager(nil) + admissionManager.Settings = admission.DefaultSettings + + cfg := &ManagerConfig{ + WorkingDir: t.TempDir(), + TempDir: t.TempDir(), + KubeEventsManager: nil, + ScheduleManager: nil, + AdmissionWebhookManager: admissionManager, + ConversionWebhookManager: conversionManager, + KeepTemporaryHookFiles: true, + LogProxyHookJSON: true, + LogProxyHookJSONKey: "myJsonKey", + Logger: log.NewNop(), + } + + hm := NewHookManager(cfg) + + // Verify options are stored and used when creating hooks. + // NewHook uses hm.keepTemporaryHookFiles / logProxyHookJSON / logProxyHookJSONKey. + hook := NewHook("test", "test/path", + hm.keepTemporaryHookFiles, + hm.logProxyHookJSON, + hm.logProxyHookJSONKey, + hm.logger.Named("hook"), + ) + + if hook.KeepTemporaryHookFiles != true { + t.Fatalf("expected KeepTemporaryHookFiles=true, got false") + } + if hook.LogProxyHookJSON != true { + t.Fatalf("expected LogProxyHookJSON=true, got false") + } + if hook.LogProxyHookJSONKey != "myJsonKey" { + t.Fatalf("expected LogProxyHookJSONKey=myJsonKey, got %q", hook.LogProxyHookJSONKey) + } +} + +func Test_ManagerConfig_RenamedFields_CompileCheck(t *testing.T) { + // This test validates that the old abbreviated field names (Kmgr, Smgr, Wmgr, Cmgr) + // no longer exist and the self-documenting names are used. Compilation of this file + // itself is the key check; we also verify that zero-value construction works. 
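+	// Referencing the old abbreviated names (Kmgr, Smgr, Wmgr, Cmgr) here would now be a compile error.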
+ cfg := &ManagerConfig{ + WorkingDir: "", + TempDir: "", + KubeEventsManager: nil, + ScheduleManager: nil, + AdmissionWebhookManager: nil, + ConversionWebhookManager: nil, + Logger: log.NewNop(), + } + if cfg.KubeEventsManager != nil { + t.Fatal("expected nil KubeEventsManager") + } + if cfg.ScheduleManager != nil { + t.Fatal("expected nil ScheduleManager") + } +} diff --git a/pkg/hook/task_metadata/task_metadata.go b/pkg/hook/task_metadata/task_metadata.go index 8185ffd6..a67b5138 100644 --- a/pkg/hook/task_metadata/task_metadata.go +++ b/pkg/hook/task_metadata/task_metadata.go @@ -6,6 +6,7 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" + pkg "github.com/flant/shell-operator/pkg" bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context" "github.com/flant/shell-operator/pkg/hook/types" "github.com/flant/shell-operator/pkg/task" @@ -57,21 +58,21 @@ var ( _ task.MetadataDescriptionGetter = HookMetadata{} ) -func HookMetadataAccessor(t task.Task) HookMetadata { +func HookMetadataAccessor(t task.Task) (HookMetadata, bool) { meta := t.GetMetadata() if meta == nil { log.Error("Possible Bug! task metadata is nil") - return HookMetadata{} + return HookMetadata{}, false } hookMeta, ok := meta.(HookMetadata) if !ok { log.Error("Possible Bug! task metadata is not of type HookMetadata", - slog.String("type", fmt.Sprintf("%T", meta))) - return HookMetadata{} + slog.String(pkg.LogKeyType, fmt.Sprintf("%T", meta))) + return HookMetadata{}, false } - return hookMeta + return hookMeta, true } func (m HookMetadata) GetHookName() string { diff --git a/pkg/hook/task_metadata/task_metadata_test.go b/pkg/hook/task_metadata/task_metadata_test.go index 41e15778..38a2af15 100644 --- a/pkg/hook/task_metadata/task_metadata_test.go +++ b/pkg/hook/task_metadata/task_metadata_test.go @@ -24,7 +24,9 @@ func Test_HookMetadata_Access(t *testing.T) { AllowFailure: true, }) - hm := HookMetadataAccessor(Task) + hm, ok := HookMetadataAccessor(Task) + + g.Expect(ok).Should(BeTrue()) g.Expect(hm.HookName).Should(Equal("test-hook")) g.Expect(hm.BindingType).Should(Equal(htypes.Schedule)) @@ -33,3 +35,28 @@ func Test_HookMetadata_Access(t *testing.T) { g.Expect(hm.BindingContext[0].Binding).Should(Equal("each_1_min")) g.Expect(hm.BindingContext[1].Binding).Should(Equal("each_5_min")) } + +func Test_HookMetadataAccessor_NilMetadata(t *testing.T) { + g := NewWithT(t) + + // A task with nil metadata should return ok=false and a zero HookMetadata. + nilMetaTask := task.NewTask(HookRun) // no WithMetadata call + + hm, ok := HookMetadataAccessor(nilMetaTask) + + g.Expect(ok).Should(BeFalse()) + g.Expect(hm).Should(Equal(HookMetadata{})) +} + +func Test_HookMetadataAccessor_WrongMetadataType(t *testing.T) { + g := NewWithT(t) + + // A task with metadata of the wrong type should return ok=false. 
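+	// Any value other than HookMetadata exercises the failed type assertion branch in the accessor.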
+ type otherMeta struct{ Value string } + wrongTypeTask := task.NewTask(HookRun).WithMetadata(otherMeta{Value: "unexpected"}) + + hm, ok := HookMetadataAccessor(wrongTypeTask) + + g.Expect(ok).Should(BeFalse()) + g.Expect(hm).Should(Equal(HookMetadata{})) +} diff --git a/pkg/kube/object_patch/operation.go b/pkg/kube/object_patch/operation.go index 464803d4..0a069d99 100644 --- a/pkg/kube/object_patch/operation.go +++ b/pkg/kube/object_patch/operation.go @@ -12,6 +12,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" + pkg "github.com/flant/shell-operator/pkg" "github.com/flant/shell-operator/pkg/filter/jq" ) @@ -63,7 +64,7 @@ func GetPatchStatusOperationsOnHookError(operations []sdkpkg.PatchCollectorOpera } func ParseOperations(specBytes []byte) ([]sdkpkg.PatchCollectorOperation, error) { - log.Debug("parsing patcher operations", slog.String("value", string(specBytes))) + log.Debug("parsing patcher operations", slog.String(pkg.LogKeyValue, string(specBytes))) specs, err := unmarshalFromJSONOrYAML(specBytes) if err != nil { diff --git a/pkg/kube/object_patch/patch.go b/pkg/kube/object_patch/patch.go index 1fa1a691..704ed18c 100644 --- a/pkg/kube/object_patch/patch.go +++ b/pkg/kube/object_patch/patch.go @@ -39,7 +39,7 @@ type KubeClient interface { func NewObjectPatcher(kubeClient KubeClient, logger *log.Logger) *ObjectPatcher { return &ObjectPatcher{ kubeClient: kubeClient, - logger: logger.With("operator.component", "KubernetesObjectPatcher"), + logger: logger.With(pkg.LogKeyOperatorComponent, "KubernetesObjectPatcher"), } } @@ -49,7 +49,7 @@ func (o *ObjectPatcher) ExecuteOperations(ops []sdkpkg.PatchCollectorOperation) applyErrors := &multierror.Error{} for _, op := range ops { - log.Debug("Applying operation", slog.String("name", op.Description())) + log.Debug("Applying operation", slog.String(pkg.LogKeyName, op.Description())) if err := o.ExecuteOperation(op); err != nil { err = gerror.WithMessage(err, op.Description()) applyErrors = multierror.Append(applyErrors, err) diff --git a/pkg/kube_events_manager/error_handler.go b/pkg/kube_events_manager/error_handler.go index 271df8f9..54fb3bba 100644 --- a/pkg/kube_events_manager/error_handler.go +++ b/pkg/kube_events_manager/error_handler.go @@ -10,6 +10,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/tools/cache" + pkg "github.com/flant/shell-operator/pkg" "github.com/flant/shell-operator/pkg/metrics" utils "github.com/flant/shell-operator/pkg/utils/labels" ) @@ -42,8 +43,8 @@ func (weh *WatchErrorHandler) handler(_ *cache.Reflector, err error) { // has a semantic that it returns data at least as fresh as provided RV. // So first try to LIST with setting RV to resource version of last observed object. 
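+		// This branch is classified as "expired"; the classification reaches Prometheus via the error_type metric label set at the end of handler.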
weh.logger.Error("Watch closed", - slog.String("description", weh.description), - slog.String("kind", weh.kind), + slog.String(pkg.LogKeyDescription, weh.description), + slog.String(pkg.LogKeyKind, weh.kind), log.Err(err)) errorType = "expired" case err == io.EOF: @@ -51,20 +52,20 @@ func (weh *WatchErrorHandler) handler(_ *cache.Reflector, err error) { errorType = "eof" case errors.Is(err, io.ErrUnexpectedEOF): weh.logger.Error("Watch closed with unexpected EOF", - slog.String("description", weh.description), - slog.String("kind", weh.kind), + slog.String(pkg.LogKeyDescription, weh.description), + slog.String(pkg.LogKeyKind, weh.kind), log.Err(err)) errorType = "unexpected-eof" case err != nil: weh.logger.Error("Watch Failed", - slog.String("description", weh.description), - slog.String("kind", weh.kind), + slog.String(pkg.LogKeyDescription, weh.description), + slog.String(pkg.LogKeyKind, weh.kind), log.Err(err)) errorType = "fail" } if weh.metricStorage != nil { - weh.metricStorage.CounterAdd(metrics.KubernetesClientWatchErrorsTotal, 1.0, map[string]string{"error_type": errorType}) + weh.metricStorage.CounterAdd(metrics.KubernetesClientWatchErrorsTotal, 1.0, map[string]string{pkg.MetricKeyErrorType: errorType}) } } diff --git a/pkg/kube_events_manager/factory.go b/pkg/kube_events_manager/factory.go index efbfa4c5..95c27c20 100644 --- a/pkg/kube_events_manager/factory.go +++ b/pkg/kube_events_manager/factory.go @@ -13,20 +13,15 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/tools/cache" + + pkg "github.com/flant/shell-operator/pkg" ) const ( FactoryShutdownTimeout = 30 * time.Second ) -var ( - DefaultFactoryStore *FactoryStore - DefaultSyncTime = 100 * time.Millisecond -) - -func init() { - DefaultFactoryStore = NewFactoryStore() -} +var DefaultSyncTime = 100 * time.Millisecond type FactoryIndex struct { GVR schema.GroupVersionResource @@ -76,14 +71,14 @@ func (c *FactoryStore) add(ctx context.Context, index FactoryIndex, f dynamicinf } log.Debug("Factory store: added a new factory for index", - slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + slog.String(pkg.LogKeyNamespace, index.Namespace), slog.String(pkg.LogKeyGVR, index.GVR.String())) } func (c *FactoryStore) get(ctx context.Context, client dynamic.Interface, index FactoryIndex) *Factory { f, ok := c.data[index] if ok { log.Debug("Factory store: the factory with index found", - slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + slog.String(pkg.LogKeyNamespace, index.Namespace), slog.String(pkg.LogKeyGVR, index.GVR.String())) return f } @@ -121,15 +116,15 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna registration, err := informer.AddEventHandler(handler) if err != nil { log.Warn("Factory store: couldn't add event handler to the factory's informer", - slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()), + slog.String(pkg.LogKeyNamespace, index.Namespace), slog.String(pkg.LogKeyGVR, index.GVR.String()), log.Err(err)) } factory.handlerRegistrations[informerId] = registration log.Debug("Factory store: increased usage counter of the factory", - slog.Int("value", len(factory.handlerRegistrations)), - slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + slog.Int(pkg.LogKeyValue, len(factory.handlerRegistrations)), + slog.String(pkg.LogKeyNamespace, index.Namespace), slog.String(pkg.LogKeyGVR, index.GVR.String())) // 
Ensure informer.Run is started once and tracked if factory.done == nil { @@ -141,7 +136,7 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna close(factory.done) log.Debug("Factory store: informer goroutine exited", - slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + slog.String(pkg.LogKeyNamespace, index.Namespace), slog.String(pkg.LogKeyGVR, index.GVR.String())) }() } @@ -154,7 +149,7 @@ func (c *FactoryStore) Start(ctx context.Context, informerId string, client dyna } log.Debug("Factory store: started informer", - slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + slog.String(pkg.LogKeyNamespace, index.Namespace), slog.String(pkg.LogKeyGVR, index.GVR.String())) return nil } @@ -172,19 +167,19 @@ func (c *FactoryStore) Stop(informerId string, index FactoryIndex) { err := f.shared.ForResource(index.GVR).Informer().RemoveEventHandler(handlerRegistration) if err != nil { log.Warn("Factory store: couldn't remove event handler from the factory's informer", - slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String()), + slog.String(pkg.LogKeyNamespace, index.Namespace), slog.String(pkg.LogKeyGVR, index.GVR.String()), log.Err(err)) } delete(f.handlerRegistrations, informerId) log.Debug("Factory store: decreased usage counter of the factory", - slog.Int("value", len(f.handlerRegistrations)), - slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + slog.Int(pkg.LogKeyValue, len(f.handlerRegistrations)), + slog.String(pkg.LogKeyNamespace, index.Namespace), slog.String(pkg.LogKeyGVR, index.GVR.String())) if len(f.handlerRegistrations) == 0 { log.Debug("Factory store: last handler removed, canceling shared informer", - slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + slog.String(pkg.LogKeyNamespace, index.Namespace), slog.String(pkg.LogKeyGVR, index.GVR.String())) done := f.done @@ -198,7 +193,7 @@ func (c *FactoryStore) Stop(informerId string, index FactoryIndex) { delete(c.data, index) log.Debug("Factory store: deleted factory", - slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + slog.String(pkg.LogKeyNamespace, index.Namespace), slog.String(pkg.LogKeyGVR, index.GVR.String())) if ch, ok := c.stoppedCh[index]; ok { close(ch) @@ -233,7 +228,7 @@ func (c *FactoryStore) WaitStopped(index FactoryIndex) { return case <-time.After(FactoryShutdownTimeout): log.Warn("timeout waiting for factory to stop", - slog.String("namespace", index.Namespace), slog.String("gvr", index.GVR.String())) + slog.String(pkg.LogKeyNamespace, index.Namespace), slog.String(pkg.LogKeyGVR, index.GVR.String())) } } } diff --git a/pkg/kube_events_manager/kube_events_manager.go b/pkg/kube_events_manager/kube_events_manager.go index f7f6ecb4..2e6098b0 100644 --- a/pkg/kube_events_manager/kube_events_manager.go +++ b/pkg/kube_events_manager/kube_events_manager.go @@ -11,6 +11,7 @@ import ( metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" klient "github.com/flant/kube-client/client" + pkg "github.com/flant/shell-operator/pkg" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" ) @@ -39,6 +40,8 @@ type kubeEventsManager struct { cancel context.CancelFunc metricStorage metricsstorage.Storage + factoryStore *FactoryStore + m sync.RWMutex Monitors map[string]Monitor @@ -52,13 +55,14 @@ var _ KubeEventsManager = (*kubeEventsManager)(nil) func NewKubeEventsManager(ctx 
context.Context, client *klient.Client, logger *log.Logger) *kubeEventsManager { cctx, cancel := context.WithCancel(ctx) em := &kubeEventsManager{ - ctx: cctx, - cancel: cancel, - KubeClient: client, - m: sync.RWMutex{}, - Monitors: make(map[string]Monitor), - KubeEventCh: make(chan kemtypes.KubeEvent, 1), - logger: logger, + ctx: cctx, + cancel: cancel, + KubeClient: client, + factoryStore: NewFactoryStore(), + m: sync.RWMutex{}, + Monitors: make(map[string]Monitor), + KubeEventCh: make(chan kemtypes.KubeEvent, 1), + logger: logger, } return em } @@ -72,11 +76,12 @@ func (mgr *kubeEventsManager) WithMetricStorage(mstor metricsstorage.Storage) { // TODO use Context to stop informers func (mgr *kubeEventsManager) AddMonitor(monitorConfig *MonitorConfig) error { log.Debug("Add MONITOR", - slog.String("config", fmt.Sprintf("%+v", monitorConfig))) + slog.String(pkg.LogKeyConfig, fmt.Sprintf("%+v", monitorConfig))) monitor := NewMonitor( mgr.ctx, mgr.KubeClient, mgr.metricStorage, + mgr.factoryStore, monitorConfig, func(ev kemtypes.KubeEvent) { defer trace.StartRegion(context.Background(), "EmitKubeEvent").End() diff --git a/pkg/kube_events_manager/kube_events_manager_test.go b/pkg/kube_events_manager/kube_events_manager_test.go index 73d0a2d9..72104cc4 100644 --- a/pkg/kube_events_manager/kube_events_manager_test.go +++ b/pkg/kube_events_manager/kube_events_manager_test.go @@ -416,3 +416,24 @@ func Test_FakeClient_CatchUpdates(t *testing.T) { time.Sleep(100 * time.Millisecond) } } + +// TestNewKubeEventsManager_IsolatedFactoryStore verifies that each KubeEventsManager +// instance owns its own FactoryStore so parallel tests do not share informer state. +func TestNewKubeEventsManager_IsolatedFactoryStore(t *testing.T) { + ctx := context.Background() + kubeClient := klient.NewFake(nil) + + mgr1 := NewKubeEventsManager(ctx, kubeClient, log.NewNop()) + mgr2 := NewKubeEventsManager(ctx, kubeClient, log.NewNop()) + + // Each manager must have a non-nil, distinct factory store. 
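+	// NewKubeEventsManager constructs a fresh store via NewFactoryStore(), so pointer inequality below is the expected outcome.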
+ if mgr1.factoryStore == nil { + t.Fatal("mgr1.factoryStore must not be nil") + } + if mgr2.factoryStore == nil { + t.Fatal("mgr2.factoryStore must not be nil") + } + if mgr1.factoryStore == mgr2.factoryStore { + t.Fatal("mgr1 and mgr2 must not share the same factoryStore") + } +} diff --git a/pkg/kube_events_manager/monitor.go b/pkg/kube_events_manager/monitor.go index a5b8ec10..a20d4452 100644 --- a/pkg/kube_events_manager/monitor.go +++ b/pkg/kube_events_manager/monitor.go @@ -11,6 +11,7 @@ import ( metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" klient "github.com/flant/kube-client/client" + pkg "github.com/flant/shell-operator/pkg" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" utils "github.com/flant/shell-operator/pkg/utils/labels" ) @@ -38,6 +39,8 @@ type monitor struct { // map of dynamically starting informers VaryingInformers varyingInformers + factoryStore *FactoryStore + eventCb func(kemtypes.KubeEvent) eventsEnabled bool // Index of namespaces statically defined in monitor configuration @@ -131,7 +134,7 @@ func (c *cancelForNs) Delete(key string) { var _ Monitor = (*monitor)(nil) -func NewMonitor(ctx context.Context, client *klient.Client, mstor metricsstorage.Storage, config *MonitorConfig, eventCb func(kemtypes.KubeEvent), logger *log.Logger) *monitor { +func NewMonitor(ctx context.Context, client *klient.Client, mstor metricsstorage.Storage, factoryStore *FactoryStore, config *MonitorConfig, eventCb func(kemtypes.KubeEvent), logger *log.Logger) *monitor { cctx, cancel := context.WithCancel(ctx) return &monitor{ @@ -139,6 +142,7 @@ func NewMonitor(ctx context.Context, client *klient.Client, mstor metricsstorage cancel: cancel, KubeClient: client, metricStorage: mstor, + factoryStore: factoryStore, Config: config, eventCb: eventCb, ResourceInformers: make([]*resourceInformer, 0), @@ -164,12 +168,12 @@ func (m *monitor) CreateInformers() error { if m.Config.Kind == "" && m.Config.ApiVersion == "" { logEntry.Debug("Create Informers for Config with empty kind and apiVersion", - slog.String("value", fmt.Sprintf("%+v", m.Config))) + slog.String(pkg.LogKeyValue, fmt.Sprintf("%+v", m.Config))) return nil } logEntry.Debug("Create Informers Config: %+v", - slog.String("value", fmt.Sprintf("%+v", m.Config))) + slog.String(pkg.LogKeyValue, fmt.Sprintf("%+v", m.Config))) nsNames := m.Config.namespaces() if len(nsNames) > 0 { logEntry.Debug("create static ResourceInformers") @@ -203,12 +207,12 @@ func (m *monitor) CreateInformers() error { return } - logEntry.Info("got ns, create dynamic ResourceInformers", slog.String("name", nsName)) + logEntry.Info("got ns, create dynamic ResourceInformers", slog.String(pkg.LogKeyName, nsName)) varyingInformers, err := m.CreateInformersForNamespace(nsName) if err != nil { logEntry.Error("create ResourceInformers for ns", - slog.String("name", nsName), + slog.String(pkg.LogKeyName, nsName), log.Err(err)) } m.VaryingInformers.Store(nsName, varyingInformers) @@ -226,7 +230,7 @@ func (m *monitor) CreateInformers() error { }, func(nsName string) { // Delete event: check, stop and remove informers for Ns - logEntry.Info("deleted ns, stop dynamic ResourceInformers", slog.String("name", nsName)) + logEntry.Info("deleted ns, stop dynamic ResourceInformers", slog.String(pkg.LogKeyName, nsName)) // ignore statically specified namespaces if _, ok := m.staticNamespaces.Load(nsName); ok { @@ -252,7 +256,7 @@ func (m *monitor) CreateInformers() error { return fmt.Errorf("create namespace informer: %v", err) } for nsName 
:= range m.NamespaceInformer.getExistedObjects() { - logEntry.Info("got ns, create dynamic ResourceInformers", slog.String("name", nsName)) + logEntry.Info("got ns, create dynamic ResourceInformers", slog.String(pkg.LogKeyName, nsName)) // ignore event if namespace is already has static ResourceInformers if _, ok := m.staticNamespaces.Load(nsName); ok { @@ -262,7 +266,7 @@ func (m *monitor) CreateInformers() error { varyingInformers, err := m.CreateInformersForNamespace(nsName) if err != nil { logEntry.Error("create ResourceInformers for ns", - slog.String("name", nsName), + slog.String(pkg.LogKeyName, nsName), log.Err(err)) } m.VaryingInformers.Store(nsName, varyingInformers) @@ -315,11 +319,12 @@ func (m *monitor) EnableKubeEventCb() { func (m *monitor) CreateInformersForNamespace(namespace string) ([]*resourceInformer, error) { informers := make([]*resourceInformer, 0) cfg := &resourceInformerConfig{ - client: m.KubeClient, - mstor: m.metricStorage, - eventCb: m.eventCb, - monitor: m.Config, - logger: m.logger.Named("resource-informer"), + client: m.KubeClient, + mstor: m.metricStorage, + factoryStore: m.factoryStore, + eventCb: m.eventCb, + monitor: m.Config, + logger: m.logger.Named("resource-informer"), } objNames := []string{""} diff --git a/pkg/kube_events_manager/monitor_test.go b/pkg/kube_events_manager/monitor_test.go index fa4ef660..11c9cea5 100644 --- a/pkg/kube_events_manager/monitor_test.go +++ b/pkg/kube_events_manager/monitor_test.go @@ -58,7 +58,7 @@ func Test_Monitor_should_handle_dynamic_ns_events(t *testing.T) { metricStorage.GaugeSetMock.When(metrics.KubeSnapshotObjects, 2, map[string]string(nil)).Then() metricStorage.GaugeSetMock.When(metrics.KubeSnapshotObjects, 3, map[string]string(nil)).Then() - mon := NewMonitor(context.Background(), fc.Client, metricStorage, monitorCfg, func(ev kemtypes.KubeEvent) { + mon := NewMonitor(context.Background(), fc.Client, metricStorage, NewFactoryStore(), monitorCfg, func(ev kemtypes.KubeEvent) { objsMutex.Lock() objsFromEvents = append(objsFromEvents, snapshotResourceIDs(ev.Objects)...) objsMutex.Unlock() diff --git a/pkg/kube_events_manager/namespace_informer.go b/pkg/kube_events_manager/namespace_informer.go index 4134dc20..2c205c0e 100644 --- a/pkg/kube_events_manager/namespace_informer.go +++ b/pkg/kube_events_manager/namespace_informer.go @@ -16,6 +16,7 @@ import ( "k8s.io/client-go/tools/cache" klient "github.com/flant/kube-client/client" + pkg "github.com/flant/shell-operator/pkg" ) const ( @@ -102,7 +103,7 @@ func (ni *namespaceInformer) OnAdd(obj interface{}, _ bool) { return } nsObj := obj.(*v1.Namespace) - log.Debug("NamespaceInformer: Added ns", slog.String("name", nsObj.Name)) + log.Debug("NamespaceInformer: Added ns", slog.String(pkg.LogKeyName, nsObj.Name)) if ni.addFn != nil { ni.addFn(nsObj.Name) } @@ -120,17 +121,17 @@ func (ni *namespaceInformer) OnDelete(obj interface{}) { obj = staleObj.Obj } nsObj := obj.(*v1.Namespace) - log.Debug("NamespaceInformer: Deleted ns", slog.String("name", nsObj.Name)) + log.Debug("NamespaceInformer: Deleted ns", slog.String(pkg.LogKeyName, nsObj.Name)) if ni.delFn != nil { ni.delFn(nsObj.Name) } } func (ni *namespaceInformer) start() { - log.Debug("Run namespace informer", slog.String("name", ni.Monitor.Metadata.DebugName)) + log.Debug("Run namespace informer", slog.String(pkg.LogKeyName, ni.Monitor.Metadata.DebugName)) if ni.SharedInformer == nil { log.Error("Possible BUG!!! 
Start called before createSharedInformer, ShredInformer is nil", - slog.String("debugName", ni.Monitor.Metadata.DebugName)) + slog.String(pkg.LogKeyDebugName, ni.Monitor.Metadata.DebugName)) return } cctx, cancel := context.WithCancel(ni.ctx) @@ -144,17 +145,17 @@ func (ni *namespaceInformer) start() { go func() { ni.SharedInformer.Run(cctx.Done()) close(ni.done) - log.Debug("Namespace informer goroutine exited", slog.String("name", ni.Monitor.Metadata.DebugName)) + log.Debug("Namespace informer goroutine exited", slog.String(pkg.LogKeyName, ni.Monitor.Metadata.DebugName)) }() if err := wait.PollUntilContextCancel(cctx, DefaultSyncTime, true, func(_ context.Context) (bool, error) { return ni.SharedInformer.HasSynced(), nil }); err != nil { ni.Monitor.Logger.Error("Cache is not synced for informer", - slog.String("debugName", ni.Monitor.Metadata.DebugName)) + slog.String(pkg.LogKeyDebugName, ni.Monitor.Metadata.DebugName)) } - log.Debug("Informer is ready", slog.String("debugName", ni.Monitor.Metadata.DebugName)) + log.Debug("Informer is ready", slog.String(pkg.LogKeyDebugName, ni.Monitor.Metadata.DebugName)) } func (ni *namespaceInformer) wait() { @@ -162,10 +163,10 @@ func (ni *namespaceInformer) wait() { for { select { case <-ni.done: - log.Debug("Namespace informer stopped", slog.String("name", ni.Monitor.Metadata.DebugName)) + log.Debug("Namespace informer stopped", slog.String(pkg.LogKeyName, ni.Monitor.Metadata.DebugName)) return case <-time.After(NamespaceInformerShutdownTimeout): - log.Warn("timeout waiting for namespace informer to stop", slog.String("name", ni.Monitor.Metadata.DebugName)) + log.Warn("timeout waiting for namespace informer to stop", slog.String(pkg.LogKeyName, ni.Monitor.Metadata.DebugName)) } } } diff --git a/pkg/kube_events_manager/resource_informer.go b/pkg/kube_events_manager/resource_informer.go index d6d77fd8..53aaab18 100644 --- a/pkg/kube_events_manager/resource_informer.go +++ b/pkg/kube_events_manager/resource_informer.go @@ -17,6 +17,7 @@ import ( "k8s.io/client-go/tools/cache" klient "github.com/flant/kube-client/client" + pkg "github.com/flant/shell-operator/pkg" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" "github.com/flant/shell-operator/pkg/metrics" "github.com/flant/shell-operator/pkg/utils/measure" @@ -59,6 +60,8 @@ type resourceInformer struct { metricStorage metricsstorage.Storage + factoryStore *FactoryStore + // a flag to stop handle events after Stop() stopped bool @@ -67,10 +70,11 @@ type resourceInformer struct { // resourceInformer should implement ResourceInformer type resourceInformerConfig struct { - client *klient.Client - mstor metricsstorage.Storage - eventCb func(kemtypes.KubeEvent) - monitor *MonitorConfig + client *klient.Client + mstor metricsstorage.Storage + factoryStore *FactoryStore + eventCb func(kemtypes.KubeEvent) + monitor *MonitorConfig logger *log.Logger } @@ -80,6 +84,7 @@ func newResourceInformer(ns, name string, cfg *resourceInformerConfig) *resource id: uuid.Must(uuid.NewV4()).String(), KubeClient: cfg.client, metricStorage: cfg.mstor, + factoryStore: cfg.factoryStore, Namespace: ns, Name: name, eventCb: cfg.eventCb, @@ -109,16 +114,16 @@ func (ei *resourceInformer) createSharedInformer() error { // discover GroupVersionResource for informer log.Debug("discover GVR for apiVersion...", - slog.String("debugName", ei.Monitor.Metadata.DebugName), - slog.String("apiVersion", ei.Monitor.ApiVersion), - slog.String("kind", ei.Monitor.Kind)) + slog.String(pkg.LogKeyDebugName, 
ei.Monitor.Metadata.DebugName), + slog.String(pkg.LogKeyAPIVersion, ei.Monitor.ApiVersion), + slog.String(pkg.LogKeyKind, ei.Monitor.Kind)) if ei.GroupVersionResource, err = ei.KubeClient.GroupVersionResource(ei.Monitor.ApiVersion, ei.Monitor.Kind); err != nil { return fmt.Errorf("%s: Cannot get GroupVersionResource info for apiVersion '%s' kind '%s' from api-server. Possibly CRD is not created before informers are started. Error was: %w", ei.Monitor.Metadata.DebugName, ei.Monitor.ApiVersion, ei.Monitor.Kind, err) } log.Debug("GVR for kind", - slog.String("debugName", ei.Monitor.Metadata.DebugName), - slog.String("kind", ei.Monitor.Kind), - slog.String("gvr", ei.GroupVersionResource.String())) + slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName), + slog.String(pkg.LogKeyKind, ei.Monitor.Kind), + slog.String(pkg.LogKeyGVR, ei.GroupVersionResource.String())) // define tweakListOptions for informer fmtLabelSelector, err := FormatLabelSelector(ei.Monitor.LabelSelector) @@ -194,25 +199,25 @@ func (ei *resourceInformer) loadExistedObjects() error { List(context.TODO(), ei.ListOptions) if err != nil { log.Error("initial list resources of kind", - slog.String("debugName", ei.Monitor.Metadata.DebugName), - slog.String("kind", ei.Monitor.Kind), + slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName), + slog.String(pkg.LogKeyKind, ei.Monitor.Kind), log.Err(err)) return err } if objList == nil || len(objList.Items) == 0 { log.Debug("Got no existing resources", - slog.String("debugName", ei.Monitor.Metadata.DebugName), - slog.String("kind", ei.Monitor.Kind)) + slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName), + slog.String(pkg.LogKeyKind, ei.Monitor.Kind)) return nil } // FIXME objList.Items has too much information for log // log.Debugf("%s: Got %d existing '%s' resources: %+v", ei.Monitor.Metadata.DebugName, len(objList.Items), ei.Monitor.Kind, objList.Items) log.Debug("initial list: Got existing resources", - slog.String("debugName", ei.Monitor.Metadata.DebugName), - slog.String("kind", ei.Monitor.Kind), - slog.Int("count", len(objList.Items))) + slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName), + slog.String(pkg.LogKeyKind, ei.Monitor.Kind), + slog.Int(pkg.LogKeyCount, len(objList.Items))) filteredObjects := make(map[string]*kemtypes.ObjectAndFilterResult) @@ -240,9 +245,9 @@ func (ei *resourceInformer) loadExistedObjects() error { filteredObjects[objFilterRes.Metadata.ResourceId] = objFilterRes log.Debug("initial list: cached with checksum", - slog.String("debugName", ei.Monitor.Metadata.DebugName), - slog.String("resourceId", objFilterRes.Metadata.ResourceId), - slog.String("checksum", objFilterRes.Metadata.Checksum)) + slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName), + slog.String(pkg.LogKeyResourceId, objFilterRes.Metadata.ResourceId), + slog.String(pkg.LogKeyChecksum, objFilterRes.Metadata.Checksum)) } // Save objects to the cache. 
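Context for the next hunk: handleWatchEvent caches the filtered result of every object and compares checksums, so an update whose jq-filtered content is unchanged only refreshes the cache and fires no KubeEvent. A minimal sketch of that dedup idea, using illustrative names (cachedChecksums, shouldSkipEvent) rather than the real shell-operator types:

package main

import "fmt"

// cachedChecksums maps a resource ID to the checksum of its last filtered result.
type cachedChecksums map[string]string

// shouldSkipEvent refreshes the cache entry and reports whether the event can
// be swallowed: an unchanged checksum means no KubeEvent needs to be fired.
func (c cachedChecksums) shouldSkipEvent(resourceID, checksum string) bool {
	old, seen := c[resourceID]
	c[resourceID] = checksum
	return seen && old == checksum
}

func main() {
	cache := cachedChecksums{}
	fmt.Println(cache.shouldSkipEvent("ns/pod-1", "abc")) // false: first sighting
	fmt.Println(cache.shouldSkipEvent("ns/pod-1", "abc")) // true: filtered content unchanged
	fmt.Println(cache.shouldSkipEvent("ns/pod-1", "xyz")) // false: content changed
}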
@@ -278,10 +283,10 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty // check if stop if ei.stopped { log.Debug("received WATCH for stopped informer", - slog.String("debugName", ei.Monitor.Metadata.DebugName), - slog.String("namespace", ei.Namespace), - slog.String("name", ei.Name), - slog.String("eventType", string(eventType))) + slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName), + slog.String(pkg.LogKeyNamespace, ei.Namespace), + slog.String(pkg.LogKeyName, ei.Name), + slog.String(pkg.LogKeyEventType, string(eventType))) return } @@ -309,8 +314,8 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty }() if err != nil { log.Error("handleWatchEvent: applyFilter error", - slog.String("debugName", ei.Monitor.Metadata.DebugName), - slog.String("eventType", string(eventType)), + slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName), + slog.String(pkg.LogKeyEventType, string(eventType)), log.Err(err)) return } @@ -332,9 +337,9 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty if objectInCache && cachedObject.Metadata.Checksum == objFilterRes.Metadata.Checksum { // update object in cache and do not send event log.Debug("skip KubeEvent", - slog.String("debugName", ei.Monitor.Metadata.DebugName), - slog.String("eventType", string(eventType)), - slog.String("resourceId", resourceId), + slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName), + slog.String(pkg.LogKeyEventType, string(eventType)), + slog.String(pkg.LogKeyResourceId, resourceId), ) skipEvent = true } @@ -374,9 +379,9 @@ func (ei *resourceInformer) handleWatchEvent(object interface{}, eventType kemty // Fire KubeEvent only if needed. if ei.shouldFireEvent(eventType) { log.Debug("send KubeEvent", - slog.String("debugName", ei.Monitor.Metadata.DebugName), - slog.String("eventType", string(eventType)), - slog.String("resourceId", resourceId)) + slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName), + slog.String(pkg.LogKeyEventType, string(eventType)), + slog.String(pkg.LogKeyResourceId, resourceId)) // TODO: should be disabled by default and enabled by a debug feature switch // log.Debugf("HandleKubeEvent: obj type is %T, value:\n%#v", obj, obj) @@ -447,29 +452,29 @@ func (ei *resourceInformer) shouldFireEvent(checkEvent kemtypes.WatchEventType) } func (ei *resourceInformer) start() { - log.Debug("RUN resource informer", slog.String("debugName", ei.Monitor.Metadata.DebugName)) + log.Debug("RUN resource informer", slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName)) go func() { if ei.ctx != nil { <-ei.ctx.Done() - DefaultFactoryStore.Stop(ei.id, ei.FactoryIndex) + ei.factoryStore.Stop(ei.id, ei.FactoryIndex) } }() // TODO: separate handler and informer errorHandler := newWatchErrorHandler(ei.Monitor.Metadata.DebugName, ei.Monitor.Kind, ei.Monitor.Metadata.LogLabels, ei.metricStorage, ei.logger.Named("watch-error-handler")) - err := DefaultFactoryStore.Start(ei.ctx, ei.id, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler) + err := ei.factoryStore.Start(ei.ctx, ei.id, ei.KubeClient.Dynamic(), ei.FactoryIndex, ei, errorHandler) if err != nil { - ei.Monitor.Logger.Error("cache is not synced for informer", slog.String("debugName", ei.Monitor.Metadata.DebugName)) + ei.Monitor.Logger.Error("cache is not synced for informer", slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName)) return } - log.Debug("informer is ready", slog.String("debugName", ei.Monitor.Metadata.DebugName)) 
+ log.Debug("informer is ready", slog.String(pkg.LogKeyDebugName, ei.Monitor.Metadata.DebugName)) } // wait blocks until the underlying shared informer for this FactoryIndex is stopped func (ei *resourceInformer) wait() { - DefaultFactoryStore.WaitStopped(ei.FactoryIndex) + ei.factoryStore.WaitStopped(ei.FactoryIndex) } // CachedObjectsInfo returns info accumulated from start. diff --git a/pkg/kube_events_manager/types/types.go b/pkg/kube_events_manager/types/types.go index 5bdfb8d7..9e21fee3 100644 --- a/pkg/kube_events_manager/types/types.go +++ b/pkg/kube_events_manager/types/types.go @@ -9,6 +9,8 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + + pkg "github.com/flant/shell-operator/pkg" ) type WatchEventType string @@ -72,7 +74,7 @@ func (o ObjectAndFilterResult) Map() map[string]interface{} { err := json.Unmarshal([]byte(filterResString), &filterResultValue) if err != nil { log.Error("Possible bug!!! Cannot unmarshal jq filter result", - slog.String("jqFilter", o.Metadata.JqFilter), + slog.String(pkg.LogKeyJqFilter, o.Metadata.JqFilter), log.Err(err)) m["filterResult"] = nil return m diff --git a/pkg/labels.go b/pkg/labels.go new file mode 100644 index 00000000..744367c1 --- /dev/null +++ b/pkg/labels.go @@ -0,0 +1,100 @@ +package pkg + +// Log attribute key constants for use with log/slog. +const ( + LogKeyHook = "hook" + LogKeyQueue = "queue" + LogKeyQueueName = "queueName" + LogKeyQueueLength = "queue_length" + LogKeyQueueLengthBefore = "queue_length_before" + LogKeyQueueLengthAfter = "queue_length_after" + LogKeyQueueIsDirty = "queue_is_dirty" + LogKeyCompactionThreshold = "compaction_threshold" + LogKeyTaskID = "task_id" + LogKeyTaskType = "task_type" + LogKeyTaskDescription = "task_description" + LogKeyTasksCount = "tasksCount" + LogKeyTasks = "tasks" + LogKeyAfterTaskCount = "after_task_count" + LogKeyHeadTaskCount = "head_task_count" + LogKeyTailTaskCount = "tail_task_count" + LogKeySleepDelay = "sleep_delay" + LogKeyName = "name" + LogKeyDescription = "description" + LogKeyCommand = "command" + LogKeyDir = "dir" + LogKeyLine = "line" + LogKeyPath = "path" + LogKeyFile = "file" + LogKeyAddress = "address" + LogKeyPort = "port" + LogKeyFormat = "format" + LogKeyComponent = "component" + LogKeyParameter = "parameter" + LogKeyOldValue = "old_value" + LogKeyNewValue = "new_value" + LogKeyCrontab = "crontab" + LogKeyOperatorComponent = "operator.component" + LogKeyHTTPMethod = "http_method" + LogKeyURI = "uri" + LogKeyRespStatus = "resp_status" + LogKeyRespBytesLength = "resp_bytes_length" + LogKeyRespElapsedMs = "resp_elapsed_ms" + LogKeyStack = "stack" + LogKeyPanic = "panic" + LogKeyValue = "value" + LogKeyConfig = "config" + LogKeyType = "type" + LogKeyWorkingDir = "workingDir" + LogKeyPaths = "paths" + LogKeyPhase = "phase" + LogKeyCmd = "cmd" + LogKeyOutput = "output" + LogKeyConfigId = "configId" + LogKeyWebhookId = "webhookId" + LogKeyConfigurationId = "configurationId" + LogKeyConfigurationID = "configurationID" + LogKeyWebhookID = "webhookID" + LogKeyConfID = "confID" + LogKeyKind = "kind" + LogKeyNamespace = "namespace" + LogKeyDebugName = "debugName" + LogKeyAPIVersion = "apiVersion" + LogKeyGVR = "gvr" + LogKeyCount = "count" + LogKeyResourceId = "resourceId" + LogKeyChecksum = "checksum" + LogKeyEventType = "eventType" + LogKeyCRD = "crd" + LogKeyRequest = "request" + LogKeyConfigurationName = "configurationName" + LogKeyMessage = "message" + LogKeyDropped = 
"dropped" + LogKeyFilteredCount = "filteredCount" + LogKeyTask = "task" + LogKeyBinding = "binding" + LogKeyEvent = "event" + LogKeyEventID = "event.id" + LogKeyTaskDotID = "task.id" + LogKeyInfo = "info" + LogKeyVersions = "versions" + LogKeyLen = "len" + LogKeyFailedCount = "failedCount" + LogKeyBindingName = "bindingName" + LogKeyMonitorID = "monitorID" + LogKeyRule = "rule" + LogKeyError = "error" + LogKeyJqFilter = "jqFilter" +) + +// Metric label key constants for Prometheus metrics. +const ( + MetricKeyHook = "hook" + MetricKeyKind = "kind" + MetricKeyBinding = "binding" + MetricKeyQueue = "queue" + MetricKeyQueueName = "queue_name" + MetricKeyQueueAction = "queue_action" + MetricKeyErrorType = "error_type" + MetricKeyComponent = "component" +) diff --git a/pkg/schedule_manager/schedule_manager.go b/pkg/schedule_manager/schedule_manager.go index 3c4812d6..3a9071aa 100644 --- a/pkg/schedule_manager/schedule_manager.go +++ b/pkg/schedule_manager/schedule_manager.go @@ -8,6 +8,7 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" "gopkg.in/robfig/cron.v2" + pkg "github.com/flant/shell-operator/pkg" smtypes "github.com/flant/shell-operator/pkg/schedule_manager/types" ) @@ -61,7 +62,7 @@ func (sm *scheduleManager) Stop() { // Crontab string should be validated with cron.Parse // function before pass to Add. func (sm *scheduleManager) Add(newEntry smtypes.ScheduleEntry) { - logEntry := sm.logger.With("operator.component", "scheduleManager") + logEntry := sm.logger.With(pkg.LogKeyOperatorComponent, "scheduleManager") if newEntry.Crontab == "" { logEntry.Error("crontab is empty") return @@ -74,15 +75,15 @@ func (sm *scheduleManager) Add(newEntry smtypes.ScheduleEntry) { // If no entry, then add new scheduled function and save CronEntry. if !hasCronEntry { entryId, err := sm.cron.AddFunc(newEntry.Crontab, func() { - logEntry.Debug("fire schedule event for entry", slog.String("crontab", newEntry.Crontab)) + logEntry.Debug("fire schedule event for entry", slog.String(pkg.LogKeyCrontab, newEntry.Crontab)) sm.ScheduleCh <- newEntry.Crontab }) if err != nil { - logEntry.Error("invalid crontab", slog.String("crontab", newEntry.Crontab), slog.Any("error", err)) + logEntry.Error("invalid crontab", slog.String(pkg.LogKeyCrontab, newEntry.Crontab), slog.Any(pkg.LogKeyError, err)) return } - logEntry.Debug("entry added", slog.String("crontab", newEntry.Crontab)) + logEntry.Debug("entry added", slog.String(pkg.LogKeyCrontab, newEntry.Crontab)) sm.Entries[newEntry.Crontab] = CronEntry{ EntryID: entryId, @@ -122,8 +123,8 @@ func (sm *scheduleManager) Remove(delEntry smtypes.ScheduleEntry) { if len(sm.Entries[delEntry.Crontab].Ids) == 0 { sm.cron.Remove(sm.Entries[delEntry.Crontab].EntryID) delete(sm.Entries, delEntry.Crontab) - sm.logger.With("operator.component", "scheduleManager"). - Debug("entry deleted", slog.String("name", delEntry.Crontab)) + sm.logger.With(pkg.LogKeyOperatorComponent, "scheduleManager"). + Debug("entry deleted", slog.String(pkg.LogKeyName, delEntry.Crontab)) } } diff --git a/pkg/shell-operator/bootstrap.go b/pkg/shell-operator/bootstrap.go index dc4522c7..cafa90e9 100644 --- a/pkg/shell-operator/bootstrap.go +++ b/pkg/shell-operator/bootstrap.go @@ -225,13 +225,16 @@ func (op *ShellOperator) setupHookManagers(hooksDir string, tempDir string) { // Initialize Hook manager. 
cfg := &hook.ManagerConfig{ - WorkingDir: hooksDir, - TempDir: tempDir, - Kmgr: op.KubeEventsManager, - Smgr: op.ScheduleManager, - Wmgr: op.AdmissionWebhookManager, - Cmgr: op.ConversionWebhookManager, - Logger: op.logger.Named("hook-manager"), + WorkingDir: hooksDir, + TempDir: tempDir, + KubeEventsManager: op.KubeEventsManager, + ScheduleManager: op.ScheduleManager, + AdmissionWebhookManager: op.AdmissionWebhookManager, + ConversionWebhookManager: op.ConversionWebhookManager, + KeepTemporaryHookFiles: app.DebugKeepTmpFiles, + LogProxyHookJSON: app.LogProxyHookJSON, + LogProxyHookJSONKey: app.ProxyJsonLogKey, + Logger: op.logger.Named("hook-manager"), } op.HookManager = hook.NewHookManager(cfg) }
diff --git a/pkg/shell-operator/combine_binding_context.go b/pkg/shell-operator/combine_binding_context.go index 456f2deb..410d3774 100644 --- a/pkg/shell-operator/combine_binding_context.go +++ b/pkg/shell-operator/combine_binding_context.go @@ -4,6 +4,7 @@ import ( "fmt" "log/slog" + pkg "github.com/flant/shell-operator/pkg" bctx "github.com/flant/shell-operator/pkg/hook/binding_context" . "github.com/flant/shell-operator/pkg/hook/task_metadata" "github.com/flant/shell-operator/pkg/task" @@ -122,10 +123,10 @@ func (op *ShellOperator) combineBindingContextForHook(tqs *queue.TaskQueueSet, q compactMsg = fmt.Sprintf("are combined to %d contexts", len(combinedContext)) } op.logger.Info("Binding contexts from tasks are dropped from queue", - slog.Int("count", len(otherTasks)+1), - slog.String("message", compactMsg), - slog.Int("dropped", len(tasksFilter)-1), - slog.String("queue", t.GetQueueName())) + slog.Int(pkg.LogKeyCount, len(otherTasks)+1), + slog.String(pkg.LogKeyMessage, compactMsg), + slog.Int(pkg.LogKeyDropped, len(tasksFilter)-1), + slog.String(pkg.LogKeyQueue, t.GetQueueName())) res.BindingContexts = compactedContext res.MonitorIDs = monitorIDs
diff --git a/pkg/shell-operator/http_server.go b/pkg/shell-operator/http_server.go index aef13d0b..46084e8e 100644 --- a/pkg/shell-operator/http_server.go +++ b/pkg/shell-operator/http_server.go @@ -14,6 +14,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" + pkg "github.com/flant/shell-operator/pkg" "github.com/flant/shell-operator/pkg/app" ) @@ -39,8 +40,8 @@ func (bhs *baseHTTPServer) Start(ctx context.Context) { } }() log.Info("base http server started", - slog.String("address", bhs.address), - slog.String("port", bhs.port)) + slog.String(pkg.LogKeyAddress, bhs.address), + slog.String(pkg.LogKeyPort, bhs.port)) go func() { <-ctx.Done()
diff --git a/pkg/shell-operator/kube_client.go b/pkg/shell-operator/kube_client.go index 4b1a3e04..f7fcc2b3 100644 --- a/pkg/shell-operator/kube_client.go +++ b/pkg/shell-operator/kube_client.go @@ -2,11 +2,13 @@ package shell_operator import ( "fmt" + "time" "github.com/deckhouse/deckhouse/pkg/log" metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" klient "github.com/flant/kube-client/client" + pkg "github.com/flant/shell-operator/pkg" "github.com/flant/shell-operator/pkg/app" objectpatch "github.com/flant/shell-operator/pkg/kube/object_patch" "github.com/flant/shell-operator/pkg/metric" @@ -14,26 +16,42 @@ import ( ) var ( - defaultMainKubeClientMetricLabels = map[string]string{"component": "main"} - defaultObjectPatcherKubeClientMetricLabels = map[string]string{"component": "object_patcher"} + defaultMainKubeClientMetricLabels = map[string]string{pkg.MetricKeyComponent: "main"} + defaultObjectPatcherKubeClientMetricLabels = map[string]string{pkg.MetricKeyComponent: 
"object_patcher"} ) +// KubeClientConfig holds explicit connection settings for a Kubernetes client, +// decoupling business logic from the global app.* configuration variables. +type KubeClientConfig struct { + Context string + Config string + QPS float32 + Burst int + Timeout time.Duration // zero means no timeout +} + // defaultMainKubeClient creates a Kubernetes client for hooks. No timeout specified, because // timeout will reset connections for Watchers. -func defaultMainKubeClient(metricStorage metricsstorage.Storage, metricLabels map[string]string, logger *log.Logger) *klient.Client { +func defaultMainKubeClient(cfg KubeClientConfig, metricStorage metricsstorage.Storage, metricLabels map[string]string, logger *log.Logger) *klient.Client { client := klient.New(klient.WithLogger(logger)) - client.WithContextName(app.KubeContext) - client.WithConfigPath(app.KubeConfig) - client.WithRateLimiterSettings(app.KubeClientQps, app.KubeClientBurst) + client.WithContextName(cfg.Context) + client.WithConfigPath(cfg.Config) + client.WithRateLimiterSettings(cfg.QPS, cfg.Burst) client.WithMetricStorage(metric.NewMetricsAdapter(metricStorage, logger.Named("kube-client-metrics-adapter"))) client.WithMetricLabels(utils.DefaultIfEmpty(metricLabels, defaultMainKubeClientMetricLabels)) return client } func initDefaultMainKubeClient(metricStorage metricsstorage.Storage, logger *log.Logger) (*klient.Client, error) { + cfg := KubeClientConfig{ + Context: app.KubeContext, + Config: app.KubeConfig, + QPS: app.KubeClientQps, + Burst: app.KubeClientBurst, + } //nolint:staticcheck klient.RegisterKubernetesClientMetrics(metric.NewMetricsAdapter(metricStorage, logger.Named("kube-client-metrics-adapter")), defaultMainKubeClientMetricLabels) - kubeClient := defaultMainKubeClient(metricStorage, defaultMainKubeClientMetricLabels, logger.Named("main-kube-client")) + kubeClient := defaultMainKubeClient(cfg, metricStorage, defaultMainKubeClientMetricLabels, logger.Named("main-kube-client")) err := kubeClient.Init() if err != nil { return nil, fmt.Errorf("initialize 'main' Kubernetes client: %s\n", err) @@ -42,19 +60,28 @@ func initDefaultMainKubeClient(metricStorage metricsstorage.Storage, logger *log } // defaultObjectPatcherKubeClient initializes a Kubernetes client for ObjectPatcher. Timeout is specified here. 
-func defaultObjectPatcherKubeClient(metricStorage metricsstorage.Storage, metricLabels map[string]string, logger *log.Logger) *klient.Client { +func defaultObjectPatcherKubeClient(cfg KubeClientConfig, metricStorage metricsstorage.Storage, metricLabels map[string]string, logger *log.Logger) *klient.Client { client := klient.New(klient.WithLogger(logger)) - client.WithContextName(app.KubeContext) - client.WithConfigPath(app.KubeConfig) - client.WithRateLimiterSettings(app.ObjectPatcherKubeClientQps, app.ObjectPatcherKubeClientBurst) + client.WithContextName(cfg.Context) + client.WithConfigPath(cfg.Config) + client.WithRateLimiterSettings(cfg.QPS, cfg.Burst) client.WithMetricStorage(metric.NewMetricsAdapter(metricStorage, logger.Named("kube-client-metrics-adapter"))) client.WithMetricLabels(utils.DefaultIfEmpty(metricLabels, defaultObjectPatcherKubeClientMetricLabels)) - client.WithTimeout(app.ObjectPatcherKubeClientTimeout) + if cfg.Timeout > 0 { + client.WithTimeout(cfg.Timeout) + } return client } func initDefaultObjectPatcher(metricStorage metricsstorage.Storage, logger *log.Logger) (*objectpatch.ObjectPatcher, error) { - patcherKubeClient := defaultObjectPatcherKubeClient(metricStorage, defaultObjectPatcherKubeClientMetricLabels, logger.Named("object-patcher-kube-client")) + cfg := KubeClientConfig{ + Context: app.KubeContext, + Config: app.KubeConfig, + QPS: app.ObjectPatcherKubeClientQps, + Burst: app.ObjectPatcherKubeClientBurst, + Timeout: app.ObjectPatcherKubeClientTimeout, + } + patcherKubeClient := defaultObjectPatcherKubeClient(cfg, metricStorage, defaultObjectPatcherKubeClientMetricLabels, logger.Named("object-patcher-kube-client")) err := patcherKubeClient.Init() if err != nil { return nil, fmt.Errorf("initialize Kubernetes client for Object patcher: %s\n", err)
diff --git a/pkg/shell-operator/kube_client_test.go b/pkg/shell-operator/kube_client_test.go new file mode 100644 index 00000000..34fe405e --- /dev/null +++ b/pkg/shell-operator/kube_client_test.go @@ -0,0 +1,45 @@ +package shell_operator + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestKubeClientConfig_ZeroValue(t *testing.T) { + cfg := KubeClientConfig{} + assert.Empty(t, cfg.Context) + assert.Empty(t, cfg.Config) + assert.Zero(t, cfg.QPS) + assert.Zero(t, cfg.Burst) + assert.Zero(t, cfg.Timeout) +} + +func TestDefaultObjectPatcherKubeClient_NoTimeoutWhenZero(t *testing.T) { + // Document the zero-Timeout convention in KubeClientConfig: a zero value means "no timeout". + // This test only constructs the config; the guard that actually skips WithTimeout lives in defaultObjectPatcherKubeClient.
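+	// For context: the 'main' hooks client never sets a timeout, because a client-side
+	// timeout would reset long-lived Watch connections; only the object patcher client
+	// opts in, and only when cfg.Timeout > 0.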
+ cfg := KubeClientConfig{ + Context: "", + Config: "", + QPS: 5, + Burst: 10, + Timeout: 0, // zero → no timeout applied + } + assert.Equal(t, time.Duration(0), cfg.Timeout) +} + +func TestKubeClientConfig_WithTimeout(t *testing.T) { + cfg := KubeClientConfig{ + Context: "my-context", + Config: "/home/user/.kube/config", + QPS: 10, + Burst: 20, + Timeout: 30 * time.Second, + } + assert.Equal(t, "my-context", cfg.Context) + assert.Equal(t, "/home/user/.kube/config", cfg.Config) + assert.Equal(t, float32(10), cfg.QPS) + assert.Equal(t, 20, cfg.Burst) + assert.Equal(t, 30*time.Second, cfg.Timeout) +} diff --git a/pkg/shell-operator/manager_events_handler.go b/pkg/shell-operator/manager_events_handler.go index fd56db6b..8cab3664 100644 --- a/pkg/shell-operator/manager_events_handler.go +++ b/pkg/shell-operator/manager_events_handler.go @@ -5,6 +5,7 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" + pkg "github.com/flant/shell-operator/pkg" kubeeventsmanager "github.com/flant/shell-operator/pkg/kube_events_manager" kemtypes "github.com/flant/shell-operator/pkg/kube_events_manager/types" schedulemanager "github.com/flant/shell-operator/pkg/schedule_manager" @@ -65,7 +66,7 @@ func (m *ManagerEventsHandler) Start() { go func() { for { var tailTasks []task.Task - logEntry := m.logger.With("operator.component", "handleEvents") + logEntry := m.logger.With(pkg.LogKeyOperatorComponent, "handleEvents") ctx := context.Background() diff --git a/pkg/shell-operator/operator.go b/pkg/shell-operator/operator.go index e6cf9e99..62596b40 100644 --- a/pkg/shell-operator/operator.go +++ b/pkg/shell-operator/operator.go @@ -27,6 +27,7 @@ import ( v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" klient "github.com/flant/kube-client/client" + pkg "github.com/flant/shell-operator/pkg" "github.com/flant/shell-operator/pkg/hook" bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context" "github.com/flant/shell-operator/pkg/hook/controller" @@ -163,11 +164,11 @@ func (op *ShellOperator) initHookManager() error { // Define event handlers for schedule event and kubernetes event. op.ManagerEventsHandler.WithKubeEventHandler(func(_ context.Context, kubeEvent kemTypes.KubeEvent) []task.Task { logLabels := map[string]string{ - "event.id": uuid.Must(uuid.NewV4()).String(), - "binding": string(types.OnKubernetesEvent), + pkg.LogKeyEventID: uuid.Must(uuid.NewV4()).String(), + pkg.LogKeyBinding: string(types.OnKubernetesEvent), } logEntry := utils.EnrichLoggerWithLabels(op.logger, logLabels) - logEntry.Debug("Create tasks for 'kubernetes' event", slog.String("name", kubeEvent.String())) + logEntry.Debug("Create tasks for 'kubernetes' event", slog.String(pkg.LogKeyName, kubeEvent.String())) return op.HookManager.CreateTasksFromKubeEvent(kubeEvent, func(hook *hook.Hook, info controller.BindingExecutionInfo) task.Task { newTask := task.NewTask(task_metadata.HookRun). @@ -183,19 +184,19 @@ func (op *ShellOperator) initHookManager() error { WithQueueName(info.QueueName). WithCompactionID(hook.Name) - logEntry.With("queue", info.QueueName). - Info("queue task", slog.String("name", newTask.GetDescription())) + logEntry.With(pkg.LogKeyQueue, info.QueueName). 
+ Info("queue task", slog.String(pkg.LogKeyName, newTask.GetDescription())) return newTask.WithQueuedAt(time.Now()) }) }) op.ManagerEventsHandler.WithScheduleEventHandler(func(_ context.Context, crontab string) []task.Task { logLabels := map[string]string{ - "event.id": uuid.Must(uuid.NewV4()).String(), - "binding": string(types.Schedule), + pkg.LogKeyEventID: uuid.Must(uuid.NewV4()).String(), + pkg.LogKeyBinding: string(types.Schedule), } logEntry := utils.EnrichLoggerWithLabels(op.logger, logLabels) - logEntry.Debug("Create tasks for 'schedule' event", slog.String("name", crontab)) + logEntry.Debug("Create tasks for 'schedule' event", slog.String(pkg.LogKeyName, crontab)) return op.HookManager.HandleCreateTasksFromScheduleEvent(crontab, func(hook *hook.Hook, info controller.BindingExecutionInfo) task.Task { newTask := task.NewTask(task_metadata.HookRun). @@ -211,8 +212,8 @@ func (op *ShellOperator) initHookManager() error { WithQueueName(info.QueueName). WithCompactionID(hook.Name) - logEntry.With("queue", info.QueueName). - Info("queue task", slog.String("name", newTask.GetDescription())) + logEntry.With(pkg.LogKeyQueue, info.QueueName). + Info("queue task", slog.String(pkg.LogKeyName, newTask.GetDescription())) return newTask.WithQueuedAt(time.Now()) }) @@ -254,14 +255,14 @@ func (op *ShellOperator) initValidatingWebhookManager() error { op.AdmissionWebhookManager.WithAdmissionEventHandler(func(ctx context.Context, event admission.Event) (*admission.Response, error) { eventBindingType := op.HookManager.DetectAdmissionEventType(event) logLabels := map[string]string{ - "event.id": uuid.Must(uuid.NewV4()).String(), - "event": string(eventBindingType), + pkg.LogKeyEventID: uuid.Must(uuid.NewV4()).String(), + pkg.LogKeyEvent: string(eventBindingType), } logEntry := utils.EnrichLoggerWithLabels(op.logger, logLabels) logEntry = logEntry.With( - slog.String("type", string(eventBindingType)), - slog.String("configurationId", event.ConfigurationId), - slog.String("webhookID", event.WebhookId)) + slog.String(pkg.LogKeyType, string(eventBindingType)), + slog.String(pkg.LogKeyConfigurationId, event.ConfigurationId), + slog.String(pkg.LogKeyWebhookID, event.WebhookId)) logEntry.Debug("Handle event") var admissionTask task.Task @@ -300,7 +301,7 @@ func (op *ShellOperator) initValidatingWebhookManager() error { admissionResponse, ok := admissionProp.(*admission.Response) if !ok { logEntry.Error("'admissionResponse' task prop is not of type *AdmissionResponse", - slog.String("type", fmt.Sprintf("%T", admissionProp))) + slog.String(pkg.LogKeyType, fmt.Sprintf("%T", admissionProp))) return nil, fmt.Errorf("hook task prop error") } return admissionResponse, nil @@ -348,16 +349,16 @@ func (op *ShellOperator) initConversionWebhookManager() error { // conversionEventHandler is called when Kubernetes requests a conversion. 
func (op *ShellOperator) conversionEventHandler(ctx context.Context, crdName string, request *v1.ConversionRequest) (*conversion.Response, error) { logLabels := map[string]string{ - "event.id": uuid.Must(uuid.NewV4()).String(), - "binding": string(types.KubernetesConversion), + pkg.LogKeyEventID: uuid.Must(uuid.NewV4()).String(), + pkg.LogKeyBinding: string(types.KubernetesConversion), } logEntry := utils.EnrichLoggerWithLabels(op.logger, logLabels) sourceVersions := conversion.ExtractAPIVersions(request.Objects) logEntry.Info("Handle kubernetesCustomResourceConversion event for crd", - slog.String("name", crdName), - slog.Int("len", len(request.Objects)), - slog.Any("versions", sourceVersions)) + slog.String(pkg.LogKeyName, crdName), + slog.Int(pkg.LogKeyLen, len(request.Objects)), + slog.Any(pkg.LogKeyVersions, sourceVersions)) done := false for _, srcVer := range sourceVersions { @@ -370,8 +371,8 @@ func (op *ShellOperator) conversionEventHandler(ctx context.Context, crdName str continue } logEntry.Info("Find conversion path for rule", - slog.String("name", rule.String()), - slog.Any("value", convPath)) + slog.String(pkg.LogKeyName, rule.String()), + slog.Any(pkg.LogKeyValue, convPath)) for _, convRule := range convPath { var convTask task.Task @@ -408,7 +409,7 @@ func (op *ShellOperator) conversionEventHandler(ctx context.Context, crdName str response, ok := prop.(*conversion.Response) if !ok { logEntry.Error("'conversionResponse' task prop is not of type *conversion.Response", - slog.String("type", fmt.Sprintf("%T", prop))) + slog.String(pkg.LogKeyType, fmt.Sprintf("%T", prop))) return nil, fmt.Errorf("hook task prop error") } @@ -444,8 +445,13 @@ func (op *ShellOperator) conversionEventHandler(ctx context.Context, crdName str // taskHandler func (op *ShellOperator) taskHandler(ctx context.Context, t task.Task) queue.TaskResult { - logEntry := op.logger.With("operator.component", "taskRunner") - hookMeta := task_metadata.HookMetadataAccessor(t) + logEntry := op.logger.With(pkg.LogKeyOperatorComponent, "taskRunner") + hookMeta, ok := task_metadata.HookMetadataAccessor(t) + if !ok { + logEntry.Error("Possible Bug! cannot access hook metadata for task", + slog.String(pkg.LogKeyType, string(t.GetType()))) + return queue.TaskResult{Status: "Fail"} + } var res queue.TaskResult switch t.GetType() { @@ -457,10 +463,10 @@ func (op *ShellOperator) taskHandler(ctx context.Context, t task.Task) queue.Tas case task_metadata.EnableScheduleBindings: hookLogLabels := map[string]string{} - hookLogLabels["hook"] = hookMeta.HookName - hookLogLabels["binding"] = string(types.Schedule) - hookLogLabels["task"] = "EnableScheduleBindings" - hookLogLabels["queue"] = "main" + hookLogLabels[pkg.LogKeyHook] = hookMeta.HookName + hookLogLabels[pkg.LogKeyBinding] = string(types.Schedule) + hookLogLabels[pkg.LogKeyTask] = "EnableScheduleBindings" + hookLogLabels[pkg.LogKeyQueue] = "main" taskLogEntry := utils.EnrichLoggerWithLabels(logEntry, hookLogLabels) @@ -478,10 +484,15 @@ func (op *ShellOperator) taskHandleEnableKubernetesBindings(ctx context.Context, ctx, span := otel.Tracer(serviceName).Start(ctx, "taskHandleEnableKubernetesBindings") defer span.End() - hookMeta := task_metadata.HookMetadataAccessor(t) + hookMeta, ok := task_metadata.HookMetadataAccessor(t) + if !ok { + op.logger.Error("Possible Bug! 
cannot access hook metadata", + slog.String(pkg.LogKeyType, string(t.GetType()))) + return queue.TaskResult{Status: "Fail"} + } metricLabels := map[string]string{ - "hook": hookMeta.HookName, + pkg.MetricKeyHook: hookMeta.HookName, } defer measure.Duration(func(d time.Duration) { op.MetricStorage.GaugeSet(metrics.HookEnableKubernetesBindingsSeconds, d.Seconds(), metricLabels) @@ -489,10 +500,10 @@ func (op *ShellOperator) taskHandleEnableKubernetesBindings(ctx context.Context, var res queue.TaskResult hookLogLabels := map[string]string{} - hookLogLabels["hook"] = hookMeta.HookName - hookLogLabels["binding"] = "" - hookLogLabels["task"] = "EnableKubernetesBindings" - hookLogLabels["queue"] = "main" + hookLogLabels[pkg.LogKeyHook] = hookMeta.HookName + hookLogLabels[pkg.LogKeyBinding] = "" + hookLogLabels[pkg.LogKeyTask] = "EnableKubernetesBindings" + hookLogLabels[pkg.LogKeyQueue] = "main" taskLogEntry := utils.EnrichLoggerWithLabels(op.logger, hookLogLabels) @@ -528,13 +539,13 @@ func (op *ShellOperator) taskHandleEnableKubernetesBindings(ctx context.Context, errors = 1.0 t.UpdateFailureMessage(err.Error()) taskLogEntry.Error("Enable Kubernetes binding for hook failed. Will retry after delay.", - slog.Int("failedCount", t.GetFailureCount()+1), + slog.Int(pkg.LogKeyFailedCount, t.GetFailureCount()+1), log.Err(err)) res.Status = "Fail" } else { success = 1.0 taskLogEntry.Info("Kubernetes bindings for hook are enabled successfully", - slog.Int("count", len(hookRunTasks))) + slog.Int(pkg.LogKeyCount, len(hookRunTasks))) res.Status = "Success" now := time.Now() for _, t := range hookRunTasks { @@ -554,7 +565,12 @@ func (op *ShellOperator) taskHandleHookRun(ctx context.Context, t task.Task) que ctx, span := otel.Tracer(serviceName).Start(ctx, "taskHandleHookRun") defer span.End() - hookMeta := task_metadata.HookMetadataAccessor(t) + hookMeta, ok := task_metadata.HookMetadataAccessor(t) + if !ok { + op.logger.Error("Possible Bug! 
cannot access hook metadata", + slog.String(pkg.LogKeyType, string(t.GetType()))) + return queue.TaskResult{Status: "Fail"} + } taskHook := op.HookManager.GetHook(hookMeta.HookName) err := taskHook.RateLimitWait(context.Background()) @@ -566,9 +582,9 @@ func (op *ShellOperator) taskHandleHookRun(ctx context.Context, t task.Task) que } metricLabels := map[string]string{ - "hook": hookMeta.HookName, - "binding": hookMeta.Binding, - "queue": t.GetQueueName(), + pkg.MetricKeyHook: hookMeta.HookName, + pkg.MetricKeyBinding: hookMeta.Binding, + pkg.MetricKeyQueue: t.GetQueueName(), } taskWaitTime := time.Since(t.GetQueuedAt()).Seconds() op.MetricStorage.CounterAdd(metrics.TaskWaitInQueueSecondsTotal, taskWaitTime, metricLabels) @@ -578,11 +594,11 @@ func (op *ShellOperator) taskHandleHookRun(ctx context.Context, t task.Task) que })() hookLogLabels := map[string]string{} - hookLogLabels["hook"] = hookMeta.HookName - hookLogLabels["binding"] = hookMeta.Binding - hookLogLabels["event"] = string(hookMeta.BindingType) - hookLogLabels["task"] = "HookRun" - hookLogLabels["queue"] = t.GetQueueName() + hookLogLabels[pkg.LogKeyHook] = hookMeta.HookName + hookLogLabels[pkg.LogKeyBinding] = hookMeta.Binding + hookLogLabels[pkg.LogKeyEvent] = string(hookMeta.BindingType) + hookLogLabels[pkg.LogKeyTask] = "HookRun" + hookLogLabels[pkg.LogKeyQueue] = t.GetQueueName() taskLogEntry := utils.EnrichLoggerWithLabels(op.logger, hookLogLabels) @@ -642,7 +658,7 @@ func (op *ShellOperator) taskHandleHookRun(ctx context.Context, t task.Task) que t.UpdateFailureMessage(err.Error()) t.WithQueuedAt(time.Now()) // Reset queueAt for correct results in 'task_wait_in_queue' metric. taskLogEntry.Error("Hook failed. Will retry after delay.", - slog.Int("failedCount", t.GetFailureCount()+1), + slog.Int(pkg.LogKeyFailedCount, t.GetFailureCount()+1), log.Err(err)) res.Status = "Fail" } @@ -672,7 +688,7 @@ func (op *ShellOperator) handleRunHook(ctx context.Context, t task.Task, taskHoo defer span.End() for _, info := range taskHook.HookController.SnapshotsInfo() { - taskLogEntry.Debug("snapshot info", slog.String("info", info)) + taskLogEntry.Debug("snapshot info", slog.String(pkg.LogKeyInfo, info)) } result, err := taskHook.Run(ctx, hookMeta.BindingType, hookMeta.BindingContext, hookLogLabels) @@ -692,7 +708,7 @@ func (op *ShellOperator) handleRunHook(ctx context.Context, t task.Task, taskHoo } if result.Usage != nil { - taskLogEntry.Debug("Usage", slog.String("value", fmt.Sprintf("%+v", result.Usage))) + taskLogEntry.Debug("Usage", slog.String(pkg.LogKeyValue, fmt.Sprintf("%+v", result.Usage))) op.MetricStorage.HistogramObserve(metrics.HookRunSysCPUSeconds, result.Usage.Sys.Seconds(), metricLabels, nil) op.MetricStorage.HistogramObserve(metrics.HookRunUserCPUSeconds, result.Usage.User.Seconds(), metricLabels, nil) op.MetricStorage.GaugeSet(metrics.HookRunMaxRSSBytes, float64(result.Usage.MaxRss)*1024, metricLabels) @@ -712,7 +728,7 @@ func (op *ShellOperator) handleRunHook(ctx context.Context, t task.Task, taskHoo // Try to update custom metrics err = op.HookMetricStorage.ApplyBatchOperations(result.Metrics, map[string]string{ - "hook": hookMeta.HookName, + pkg.MetricKeyHook: hookMeta.HookName, }) if err != nil { return err @@ -722,14 +738,14 @@ func (op *ShellOperator) handleRunHook(ctx context.Context, t task.Task, taskHoo if result.AdmissionResponse != nil { t.SetProp("admissionResponse", result.AdmissionResponse) taskLogEntry.Info("AdmissionResponse from hook", - slog.String("value", result.AdmissionResponse.Dump())) + 
slog.String(pkg.LogKeyValue, result.AdmissionResponse.Dump())) } // Save conversionResponse in task props for future use. if result.ConversionResponse != nil { t.SetProp("conversionResponse", result.ConversionResponse) taskLogEntry.Info("ConversionResponse from hook", - slog.String("value", result.ConversionResponse.Dump())) + slog.String(pkg.LogKeyValue, result.ConversionResponse.Dump())) } return nil @@ -840,10 +856,10 @@ func (op *ShellOperator) CombineBindingContextForHook(q *queue.TaskQueue, t task compactMsg = fmt.Sprintf("are combined to %d contexts", len(combinedContext)) } op.logger.Info("Binding contexts from tasks. Tasks are dropped from queue", - slog.Int("count", len(otherTasks)+1), - slog.String("tasks", compactMsg), - slog.Int("filteredCount", len(tasksFilter)-1), - slog.String("queueName", t.GetQueueName())) + slog.Int(pkg.LogKeyCount, len(otherTasks)+1), + slog.String(pkg.LogKeyTasks, compactMsg), + slog.Int(pkg.LogKeyFilteredCount, len(tasksFilter)-1), + slog.String(pkg.LogKeyQueueName, t.GetQueueName())) res.BindingContexts = compactedContext res.MonitorIDs = monitorIDs @@ -853,14 +869,14 @@ func (op *ShellOperator) CombineBindingContextForHook(q *queue.TaskQueue, t task // bootstrapMainQueue adds tasks to run hooks with OnStartup bindings // and tasks to enable kubernetes bindings. func (op *ShellOperator) bootstrapMainQueue(tqs *queue.TaskQueueSet) { - logEntry := op.logger.With("operator.component", "initMainQueue") + logEntry := op.logger.With(pkg.LogKeyOperatorComponent, "initMainQueue") // Prepopulate main queue with 'onStartup' tasks and 'enable kubernetes bindings' tasks. tqs.WithMainName("main") tqs.NewNamedQueue("main", op.taskHandler, queue.WithCompactableTypes(task_metadata.HookRun), - queue.WithLogger(op.logger.With("operator.component", "mainQueue")), + queue.WithLogger(op.logger.With(pkg.LogKeyOperatorComponent, "mainQueue")), ) mainQueue := tqs.GetMain() @@ -889,8 +905,8 @@ func (op *ShellOperator) bootstrapMainQueue(tqs *queue.TaskQueueSet) { mainQueue.AddLast(newTask) logEntry.Info("queue task with hook", - slog.String("task", newTask.GetDescription()), - slog.String("hook", hookName)) + slog.String(pkg.LogKeyTask, newTask.GetDescription()), + slog.String(pkg.LogKeyHook, hookName)) } // Add tasks to enable kubernetes monitors and schedules for each hook @@ -907,8 +923,8 @@ func (op *ShellOperator) bootstrapMainQueue(tqs *queue.TaskQueueSet) { WithQueuedAt(time.Now()) mainQueue.AddLast(newTask) logEntry.Info("queue task with hook", - slog.String("task", newTask.GetDescription()), - slog.String("hook", hookName)) + slog.String(pkg.LogKeyTask, newTask.GetDescription()), + slog.String(pkg.LogKeyHook, hookName)) } if h.GetConfig().HasBinding(types.Schedule) { @@ -921,8 +937,8 @@ func (op *ShellOperator) bootstrapMainQueue(tqs *queue.TaskQueueSet) { WithQueuedAt(time.Now()) mainQueue.AddLast(newTask) logEntry.Info("queue task with hook", - slog.String("task", newTask.GetDescription()), - slog.String("hook", hookName)) + slog.String(pkg.LogKeyTask, newTask.GetDescription()), + slog.String(pkg.LogKeyHook, hookName)) } } } @@ -945,7 +961,7 @@ func (op *ShellOperator) runMetrics() { for { op.TaskQueues.IterateSnapshot(context.TODO(), func(_ context.Context, queue *queue.TaskQueue) { queueLen := float64(queue.Length()) - op.MetricStorage.GaugeSet(metrics.TasksQueueLength, queueLen, map[string]string{"queue": queue.Name}) + op.MetricStorage.GaugeSet(metrics.TasksQueueLength, queueLen, map[string]string{pkg.MetricKeyQueue: queue.Name}) }) time.Sleep(5 * time.Second) 
@@ -963,7 +979,7 @@ func (op *ShellOperator) initAndStartHookQueues() { op.TaskQueues.NewNamedQueue(hookBinding.Queue, op.taskHandler, queue.WithCompactableTypes(task_metadata.HookRun), - queue.WithLogger(op.logger.With("operator.component", "hookQueue", "hook", hookName, "queue", hookBinding.Queue)), + queue.WithLogger(op.logger.With(pkg.LogKeyOperatorComponent, "hookQueue", pkg.LogKeyHook, hookName, pkg.LogKeyQueue, hookBinding.Queue)), ) op.TaskQueues.GetByName(hookBinding.Queue).Start(op.ctx) } @@ -978,7 +994,7 @@ func (op *ShellOperator) initAndStartHookQueues() { op.TaskQueues.NewNamedQueue(hookBinding.Queue, op.taskHandler, queue.WithCompactableTypes(task_metadata.HookRun), - queue.WithLogger(op.logger.With("operator.component", "hookQueue", "hook", hookName, "queue", hookBinding.Queue)), + queue.WithLogger(op.logger.With(pkg.LogKeyOperatorComponent, "hookQueue", pkg.LogKeyHook, hookName, pkg.LogKeyQueue, hookBinding.Queue)), ) op.TaskQueues.GetByName(hookBinding.Queue).Start(op.ctx) } @@ -988,18 +1004,18 @@ func (op *ShellOperator) initAndStartHookQueues() { // Shutdown pause kubernetes events handling and stop queues. Wait for queues to stop. func (op *ShellOperator) Shutdown() { - op.logger.Info("shutdown begin", slog.String("phase", "shutdown")) + op.logger.Info("shutdown begin", slog.String(pkg.LogKeyPhase, "shutdown")) op.ScheduleManager.Stop() - op.logger.Info("schedule manager stopped", slog.String("phase", "shutdown")) + op.logger.Info("schedule manager stopped", slog.String(pkg.LogKeyPhase, "shutdown")) op.KubeEventsManager.Stop() - op.logger.Info("waiting informers", slog.String("phase", "shutdown")) + op.logger.Info("waiting informers", slog.String(pkg.LogKeyPhase, "shutdown")) op.KubeEventsManager.Wait() - op.logger.Info("informers stopped", slog.String("phase", "shutdown")) + op.logger.Info("informers stopped", slog.String(pkg.LogKeyPhase, "shutdown")) op.TaskQueues.Stop() - op.logger.Info("waiting task queues", slog.String("phase", "shutdown")) + op.logger.Info("waiting task queues", slog.String(pkg.LogKeyPhase, "shutdown")) // Wait for queues to stop, but no more than 10 seconds op.TaskQueues.WaitStopWithTimeout(WaitQueuesTimeout) - op.logger.Info("task queues stopped", slog.String("phase", "shutdown")) + op.logger.Info("task queues stopped", slog.String(pkg.LogKeyPhase, "shutdown")) } diff --git a/pkg/shell-operator/operator_test.go b/pkg/shell-operator/operator_test.go index 570c9ceb..572c8ab1 100644 --- a/pkg/shell-operator/operator_test.go +++ b/pkg/shell-operator/operator_test.go @@ -73,7 +73,8 @@ func Test_Operator_startup_tasks(t *testing.T) { } expect := expectTasks[i] - hm := HookMetadataAccessor(tsk) + hm, ok := HookMetadataAccessor(tsk) + g.Expect(ok).To(BeTrue(), "HookMetadataAccessor should succeed for task %d", i) g.Expect(tsk.GetType()).To(Equal(expect.taskType), "task type should match for task %d, got %+v %+v", i, tsk, hm) g.Expect(hm.BindingType).To(Equal(expect.bindingType), "binding should match for task %d, got %+v %+v", i, tsk, hm) g.Expect(hm.HookName).To(HavePrefix(expect.hookPrefix), "hook name should match for task %d, got %+v %+v", i, tsk, hm) diff --git a/pkg/task/queue/queue_set.go b/pkg/task/queue/queue_set.go index e1bbdc48..fd7abf27 100644 --- a/pkg/task/queue/queue_set.go +++ b/pkg/task/queue/queue_set.go @@ -8,6 +8,7 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" + pkg "github.com/flant/shell-operator/pkg" "github.com/flant/shell-operator/pkg/task" ) @@ -70,7 
+71,7 @@ func (tqs *TaskQueueSet) Start(ctx context.Context) { tqs.IterateSnapshot(ctx, func(ctx context.Context, queue *TaskQueue) { defer func() { if r := recover(); r != nil { - tqs.logger.Warn("panic recovered in Start", slog.Any("error", r)) + tqs.logger.Warn("panic recovered in Start", slog.Any(pkg.LogKeyError, r)) } }() @@ -116,22 +117,22 @@ func (tqs *TaskQueueSet) GetMain() *TaskQueue { } func (tqs *TaskQueueSet) AddTailTasks(tasks ...task.Task) { - tqs.logger.Debug("AddTailTasks: adding tasks to queues", slog.Int("tasksCount", len(tasks))) + tqs.logger.Debug("AddTailTasks: adding tasks to queues", slog.Int(pkg.LogKeyTasksCount, len(tasks))) for _, resTask := range tasks { q, ok := tqs.Queues.Get(resTask.GetQueueName()) if ok { tqs.logger.Debug("AddTailTasks: adding task to queue", - slog.String("queueName", resTask.GetQueueName()), - slog.String("description", resTask.GetDescription())) + slog.String(pkg.LogKeyQueueName, resTask.GetQueueName()), + slog.String(pkg.LogKeyDescription, resTask.GetDescription())) q.AddLast(resTask) continue } log.Error("Possible bug!!! Got task for queue but queue is not created yet.", - slog.String("queueName", resTask.GetQueueName()), - slog.String("description", resTask.GetDescription())) + slog.String(pkg.LogKeyQueueName, resTask.GetQueueName()), + slog.String(pkg.LogKeyDescription, resTask.GetDescription())) } tqs.logger.Debug("AddTailTasks: adding tasks to queues done") @@ -190,7 +191,7 @@ func (tqs *TaskQueueSet) IterateSnapshot(ctx context.Context, doFn func(ctx cont // Execute callbacks without holding any locks defer func() { if r := recover(); r != nil { - tqs.logger.Warn("panic recovered in IterateSnapshot", slog.Any("error", r)) + tqs.logger.Warn("panic recovered in IterateSnapshot", slog.Any(pkg.LogKeyError, r)) } }() diff --git a/pkg/task/queue/task_counter.go b/pkg/task/queue/task_counter.go index 6c08ed45..9d23f31f 100644 --- a/pkg/task/queue/task_counter.go +++ b/pkg/task/queue/task_counter.go @@ -5,6 +5,7 @@ import ( metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" + pkg "github.com/flant/shell-operator/pkg" "github.com/flant/shell-operator/pkg/metrics" "github.com/flant/shell-operator/pkg/task" ) @@ -131,8 +132,8 @@ func (tc *TaskCounter) UpdateHookMetricsFromSnapshot(hookCounts map[string]uint) for hookName, count := range hookCounts { if count > compactionMetricsThreshold { labels := map[string]string{ - "queue_name": tc.queueName, - "hook": hookName, + pkg.MetricKeyQueueName: tc.queueName, + pkg.MetricKeyHook: hookName, } tc.metricStorage.GaugeSet(metrics.TasksQueueCompactionTasksByHook, float64(count), labels) } diff --git a/pkg/task/queue/task_queue.go b/pkg/task/queue/task_queue.go index b83739ee..79199e74 100644 --- a/pkg/task/queue/task_queue.go +++ b/pkg/task/queue/task_queue.go @@ -14,6 +14,7 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" + pkg "github.com/flant/shell-operator/pkg" bindingcontext "github.com/flant/shell-operator/pkg/hook/binding_context" "github.com/flant/shell-operator/pkg/hook/task_metadata" "github.com/flant/shell-operator/pkg/metrics" @@ -257,7 +258,7 @@ func (q *TaskQueue) MeasureActionTime(action string) func() { q.measureActionFn = func() {} } else { q.measureActionFn = measure.Duration(func(d time.Duration) { - q.metricStorage.HistogramObserve(metrics.TasksQueueActionDurationSeconds, d.Seconds(), map[string]string{"queue_name": q.Name, "queue_action": action}, nil) + 
q.metricStorage.HistogramObserve(metrics.TasksQueueActionDurationSeconds, d.Seconds(), map[string]string{pkg.MetricKeyQueueName: q.Name, pkg.MetricKeyQueueAction: action}, nil) }) } }) @@ -328,16 +329,16 @@ func (q *TaskQueue) AddLast(tasks ...task.Task) { for _, t := range tasks { q.lazydebug("adding task to queue", func() []any { return []any{ - slog.String("queue", q.Name), - slog.String("task_id", t.GetId()), - slog.String("task_type", string(t.GetType())), - slog.String("task_description", t.GetDescription()), - slog.Int("queue_length_before", q.storage.Length()), + slog.String(pkg.LogKeyQueue, q.Name), + slog.String(pkg.LogKeyTaskID, t.GetId()), + slog.String(pkg.LogKeyTaskType, string(t.GetType())), + slog.String(pkg.LogKeyTaskDescription, t.GetDescription()), + slog.Int(pkg.LogKeyQueueLengthBefore, q.storage.Length()), } }) if q.storage.Get(t.GetId()) != nil { - q.logger.Warn("task collision detected, unexpected behavior possible", slog.String("queue", q.Name), slog.String("task_id", t.GetId())) + q.logger.Warn("task collision detected, unexpected behavior possible", slog.String(pkg.LogKeyQueue, q.Name), slog.String(pkg.LogKeyTaskID, t.GetId())) } q.storage.AddLast(t) @@ -349,11 +350,11 @@ func (q *TaskQueue) AddLast(tasks ...task.Task) { if _, ok := q.compactableTypes[taskType]; ok { q.lazydebug("task is mergeable, marking queue as dirty", func() []any { return []any{ - slog.String("queue", q.Name), - slog.String("task_id", t.GetId()), - slog.String("task_type", string(taskType)), - slog.Int("queue_length", q.storage.Length()), - slog.Bool("queue_is_dirty", q.queueTasksCounter.IsAnyCapReached()), + slog.String(pkg.LogKeyQueue, q.Name), + slog.String(pkg.LogKeyTaskID, t.GetId()), + slog.String(pkg.LogKeyTaskType, string(taskType)), + slog.Int(pkg.LogKeyQueueLength, q.storage.Length()), + slog.Bool(pkg.LogKeyQueueIsDirty, q.queueTasksCounter.IsAnyCapReached()), } }) @@ -361,9 +362,9 @@ func (q *TaskQueue) AddLast(tasks ...task.Task) { if q.storage.Length() > compactionThreshold && q.queueTasksCounter.IsAnyCapReached() { q.lazydebug("triggering compaction due to queue length", func() []any { return []any{ - slog.String("queue", q.Name), - slog.Int("queue_length", q.storage.Length()), - slog.Int("compaction_threshold", compactionThreshold), + slog.String(pkg.LogKeyQueue, q.Name), + slog.Int(pkg.LogKeyQueueLength, q.storage.Length()), + slog.Int(pkg.LogKeyCompactionThreshold, compactionThreshold), } }) @@ -372,18 +373,18 @@ func (q *TaskQueue) AddLast(tasks ...task.Task) { q.lazydebug("compaction finished", func() []any { return []any{ - slog.String("queue", q.Name), - slog.Int("queue_length_before", currentQueue), - slog.Int("queue_length_after", q.storage.Length()), + slog.String(pkg.LogKeyQueue, q.Name), + slog.Int(pkg.LogKeyQueueLengthBefore, currentQueue), + slog.Int(pkg.LogKeyQueueLengthAfter, q.storage.Length()), } }) } } else { q.lazydebug("task is not mergeable", func() []any { return []any{ - slog.String("queue", q.Name), - slog.String("task_id", t.GetId()), - slog.String("task_type", string(taskType)), + slog.String(pkg.LogKeyQueue, q.Name), + slog.String(pkg.LogKeyTaskID, t.GetId()), + slog.String(pkg.LogKeyTaskType, string(taskType)), } }) } @@ -757,7 +758,7 @@ func (q *TaskQueue) Start(ctx context.Context) { } if q.Handler == nil { - log.Error("should set handler before start in queue", slog.String("name", q.Name)) + log.Error("should set handler before start in queue", slog.String(pkg.LogKeyName, q.Name)) q.SetStatus(QueueStatusNoHandlerSet) return } @@ -769,24 +770,24 
@@ func (q *TaskQueue) Start(ctx context.Context) { q.SetStatus(QueueStatusIdle) var sleepDelay time.Duration for { - q.logger.Debug("queue: wait for task", slog.String("queue", q.Name), slog.Duration("sleep_delay", sleepDelay)) + q.logger.Debug("queue: wait for task", slog.String(pkg.LogKeyQueue, q.Name), slog.Duration(pkg.LogKeySleepDelay, sleepDelay)) t := q.waitForTask(sleepDelay) if t == nil { q.SetStatus(QueueStatusStop) - q.logger.Info("queue stopped", slog.String("name", q.Name)) + q.logger.Info("queue stopped", slog.String(pkg.LogKeyName, q.Name)) return } q.withLock(func() { if q.queueTasksCounter.IsAnyCapReached() { q.lazydebug("triggering compaction before task processing", func() []any { - return []any{slog.String("queue", q.Name), slog.String("task_id", t.GetId()), slog.String("task_type", string(t.GetType())), slog.Int("queue_length", q.storage.Length())} + return []any{slog.String(pkg.LogKeyQueue, q.Name), slog.String(pkg.LogKeyTaskID, t.GetId()), slog.String(pkg.LogKeyTaskType, string(t.GetType())), slog.Int(pkg.LogKeyQueueLength, q.storage.Length())} }) q.compaction(q.queueTasksCounter.GetReachedCap()) q.lazydebug("compaction completed, queue no longer dirty", func() []any { - return []any{slog.String("queue", q.Name), slog.Int("queue_length_after", q.storage.Length())} + return []any{slog.String(pkg.LogKeyQueue, q.Name), slog.Int(pkg.LogKeyQueueLengthAfter, q.storage.Length())} }) } }) @@ -797,15 +798,15 @@ func (q *TaskQueue) Start(ctx context.Context) { // use lazydebug because it dumps whole queue and task q.lazydebug("queue tasks after wait", func() []any { return []any{ - slog.String("queue", q.Name), - slog.String("tasks", q.String()), + slog.String(pkg.LogKeyQueue, q.Name), + slog.String(pkg.LogKeyTasks, q.String()), } }) q.lazydebug("queue task to handle", func() []any { return []any{ - slog.String("queue", q.Name), - slog.String("task_type", string(t.GetType())), + slog.String(pkg.LogKeyQueue, q.Name), + slog.String(pkg.LogKeyTaskType, string(t.GetType())), } }) @@ -814,7 +815,7 @@ func (q *TaskQueue) Start(ctx context.Context) { defer func() { if r := recover(); r != nil { - q.logger.Warn("panic recovered in Start", slog.Any("error", r)) + q.logger.Warn("panic recovered in Start", slog.Any(pkg.LogKeyError, r)) } }() taskRes := q.Handler(ctx, t) @@ -822,7 +823,7 @@ func (q *TaskQueue) Start(ctx context.Context) { // Check Done channel after long-running operation. 
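+		// The handler may have run for a long time; if the queue was stopped
+		// meanwhile, return before applying the task result.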
select { case <-q.ctx.Done(): - q.logger.Info("queue stopped after task handling", slog.String("name", q.Name)) + q.logger.Info("queue stopped after task handling", slog.String(pkg.LogKeyName, q.Name)) q.SetStatus(QueueStatusStop) return default: @@ -841,9 +842,9 @@ func (q *TaskQueue) Start(ctx context.Context) { case Fail: if len(taskRes.GetAfterTasks()) > 0 || len(taskRes.GetHeadTasks()) > 0 || len(taskRes.GetTailTasks()) > 0 { q.logger.Warn("result is fail, cannot process tasks in result", - slog.Int("after_task_count", len(taskRes.GetAfterTasks())), - slog.Int("head_task_count", len(taskRes.GetHeadTasks())), - slog.Int("tail_task_count", len(taskRes.GetTailTasks()))) + slog.Int(pkg.LogKeyAfterTaskCount, len(taskRes.GetAfterTasks())), + slog.Int(pkg.LogKeyHeadTaskCount, len(taskRes.GetHeadTasks())), + slog.Int(pkg.LogKeyTailTaskCount, len(taskRes.GetTailTasks()))) } t.SetProcessing(false) @@ -854,9 +855,9 @@ func (q *TaskQueue) Start(ctx context.Context) { case Repeat: if len(taskRes.GetAfterTasks()) > 0 || len(taskRes.GetHeadTasks()) > 0 || len(taskRes.GetTailTasks()) > 0 { q.logger.Warn("result is repeat, cannot process tasks in result", - slog.Int("after_task_count", len(taskRes.GetAfterTasks())), - slog.Int("head_task_count", len(taskRes.GetHeadTasks())), - slog.Int("tail_task_count", len(taskRes.GetTailTasks()))) + slog.Int(pkg.LogKeyAfterTaskCount, len(taskRes.GetAfterTasks())), + slog.Int(pkg.LogKeyHeadTaskCount, len(taskRes.GetHeadTasks())), + slog.Int(pkg.LogKeyTailTaskCount, len(taskRes.GetTailTasks()))) } // repeat a current task after a small delay @@ -877,7 +878,7 @@ func (q *TaskQueue) Start(ctx context.Context) { } // use lazydebug because it dumps whole queue q.lazydebug("queue: tasks after handle", func() []any { - return []any{slog.String("queue", q.Name), slog.String("tasks", q.String())} + return []any{slog.String(pkg.LogKeyQueue, q.Name), slog.String(pkg.LogKeyTasks, q.String())} }) } }() @@ -1006,7 +1007,7 @@ func (q *TaskQueue) IterateSnapshot(doFn func(task.Task)) { defer func() { if r := recover(); r != nil { - q.logger.Warn("panic recovered in IterateSnapshot", slog.Any("error", r)) + q.logger.Warn("panic recovered in IterateSnapshot", slog.Any(pkg.LogKeyError, r)) } }() @@ -1071,7 +1072,7 @@ func (q *TaskQueue) withLock(fn func()) { q.m.Unlock() if r := recover(); r != nil { - q.logger.Warn("panic recovered in withLock", slog.Any("error", r)) + q.logger.Warn("panic recovered in withLock", slog.Any(pkg.LogKeyError, r)) } }() diff --git a/pkg/task/queue/task_queue_compaction_test.go b/pkg/task/queue/task_queue_compaction_test.go index a81cbcd1..a4e278a0 100644 --- a/pkg/task/queue/task_queue_compaction_test.go +++ b/pkg/task/queue/task_queue_compaction_test.go @@ -334,8 +334,8 @@ func TestTaskQueueList_AddLast_GreedyMerge(t *testing.T) { assert.Equal(t, tt.expectedIDs, finalIDs, "Task IDs and order should match expected") q.IterateSnapshot(func(task task.Task) { if mt, ok := task.(*mockTask); ok && mt.GetType() == task_metadata.HookRun { - hm := task_metadata.HookMetadataAccessor(mt) - require.NotNil(t, hm, "HookMetadataAccessor should not return nil for hook task") + hm, ok := task_metadata.HookMetadataAccessor(mt) + require.True(t, ok, "HookMetadataAccessor should return ok=true for hook task") assert.Equal(t, tt.expectedBCs[mt.GetId()], hm.BindingContext[0].Binding, "BindingContext for task %s should match", mt.GetId()) } }) diff --git a/pkg/task/task.go b/pkg/task/task.go index a2aeea70..487ad1b7 100644 --- a/pkg/task/task.go +++ b/pkg/task/task.go @@ -8,6 
+8,7 @@ import ( "github.com/gofrs/uuid/v5" + pkg "github.com/flant/shell-operator/pkg" utils "github.com/flant/shell-operator/pkg/utils/labels" ) @@ -60,7 +61,7 @@ func NewTask(taskType TaskType) *BaseTask { Id: taskId, FailureCount: 0, Type: taskType, - LogLabels: map[string]string{"task.id": taskId}, + LogLabels: map[string]string{pkg.LogKeyTaskDotID: taskId}, Props: make(map[string]interface{}), compactionID: taskId, } @@ -101,7 +102,7 @@ func (t *BaseTask) deepCopy() *BaseTask { func (t *BaseTask) DeepCopyWithNewUUID() Task { newTask := t.deepCopy() newTask.Id = uuid.Must(uuid.NewV4()).String() - newTask.LogLabels["task.id"] = newTask.Id + newTask.LogLabels[pkg.LogKeyTaskDotID] = newTask.Id return newTask } diff --git a/pkg/utils/file/file.go b/pkg/utils/file/file.go index 18deeeec..f5e17b5d 100644 --- a/pkg/utils/file/file.go +++ b/pkg/utils/file/file.go @@ -9,6 +9,8 @@ import ( "strings" "github.com/deckhouse/deckhouse/pkg/log" + + pkg "github.com/flant/shell-operator/pkg" ) // FileExists returns true if path exists @@ -52,12 +54,12 @@ func RecursiveGetExecutablePaths(dir string, excludedDirs ...string) ([]string, if err := checkExecutableHookFile(f); err != nil { if errors.Is(err, ErrFileNoExecutablePermissions) { - log.Warn("file is skipped", slog.String("path", path), log.Err(err)) + log.Warn("file is skipped", slog.String(pkg.LogKeyPath, path), log.Err(err)) return nil } - log.Debug("file is skipped", slog.String("path", path), log.Err(err)) + log.Debug("file is skipped", slog.String(pkg.LogKeyPath, path), log.Err(err)) return nil } @@ -96,7 +98,7 @@ func RecursiveCheckLibDirectory(dir string) error { if err := checkExecutableHookFile(f); err == nil { log.Warn("file has executable permissions and is located in the ignored 'lib' directory", - slog.String("file", strings.TrimPrefix(path, dir))) + slog.String(pkg.LogKeyFile, strings.TrimPrefix(path, dir))) } return nil diff --git a/pkg/utils/signal/signal.go b/pkg/utils/signal/signal.go index a8e366f4..48039810 100644 --- a/pkg/utils/signal/signal.go +++ b/pkg/utils/signal/signal.go @@ -7,6 +7,8 @@ import ( "syscall" "github.com/deckhouse/deckhouse/pkg/log" + + pkg "github.com/flant/shell-operator/pkg" ) // WaitForProcessInterruption wait for SIGINT or SIGTERM and run a callback function. 
@@ -19,7 +21,7 @@ func WaitForProcessInterruption(cb ...func()) { interruptCh := make(chan os.Signal, 1) forcedExit := func(s os.Signal) { - log.Info("Forced shutdown by signal", slog.String("name", s.String())) + log.Info("Forced shutdown by signal", slog.String(pkg.LogKeyName, s.String())) signum := 0 if v, ok := s.(syscall.Signal); ok { @@ -35,7 +37,7 @@ func WaitForProcessInterruption(cb ...func()) { switch allowedCount { case 0: if len(cb) > 0 { - log.Info("Grace shutdown by signal", slog.String("name", sig.String())) + log.Info("Grace shutdown by signal", slog.String(pkg.LogKeyName, sig.String())) cb[0]() } else { forcedExit(sig) diff --git a/pkg/utils/structured-logger/structured_logger.go b/pkg/utils/structured-logger/structured_logger.go index 191ccd32..b3d1c44d 100644 --- a/pkg/utils/structured-logger/structured_logger.go +++ b/pkg/utils/structured-logger/structured_logger.go @@ -8,6 +8,8 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" "github.com/go-chi/chi/v5/middleware" + + pkg "github.com/flant/shell-operator/pkg" ) // StructuredLogger is a simple, but powerful implementation of a custom structured @@ -31,9 +33,9 @@ func (l *StructuredLogger) NewLogEntry(r *http.Request) middleware.LogEntry { entry.Logger = entry.Logger.With( // TODO: make snake_case - slog.String("operator.component", l.ComponentLabel), - slog.String("http_method", r.Method), - slog.String("uri", r.RequestURI), + slog.String(pkg.LogKeyOperatorComponent, l.ComponentLabel), + slog.String(pkg.LogKeyHTTPMethod, r.Method), + slog.String(pkg.LogKeyURI, r.RequestURI), ) // entry.Logger.Info("request started") @@ -46,9 +48,9 @@ type StructuredLoggerEntry struct { func (l *StructuredLoggerEntry) Write(status, bytes int, _ http.Header, elapsed time.Duration, _ interface{}) { l.Logger = l.Logger.With( - slog.Int("resp_status", status), - slog.Int("resp_bytes_length", bytes), - slog.Float64("resp_elapsed_ms", float64(elapsed.Truncate(10*time.Microsecond))/100.0), + slog.Int(pkg.LogKeyRespStatus, status), + slog.Int(pkg.LogKeyRespBytesLength, bytes), + slog.Float64(pkg.LogKeyRespElapsedMs, float64(elapsed.Truncate(10*time.Microsecond))/100.0), ) l.Logger.Info("complete") @@ -57,8 +59,8 @@ func (l *StructuredLoggerEntry) Write(status, bytes int, _ http.Header, elapsed // This will log panics to log func (l *StructuredLoggerEntry) Panic(v interface{}, stack []byte) { l.Logger = l.Logger.With( - slog.String("stack", string(stack)), - slog.String("panic", fmt.Sprintf("%+v", v)), + slog.String(pkg.LogKeyStack, string(stack)), + slog.String(pkg.LogKeyPanic, fmt.Sprintf("%+v", v)), ) } diff --git a/pkg/webhook/admission/handler.go b/pkg/webhook/admission/handler.go index 8df88b06..61873bcc 100644 --- a/pkg/webhook/admission/handler.go +++ b/pkg/webhook/admission/handler.go @@ -14,6 +14,7 @@ import ( v1 "k8s.io/api/admission/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + pkg "github.com/flant/shell-operator/pkg" structuredLogger "github.com/flant/shell-operator/pkg/utils/structured-logger" ) @@ -70,10 +71,10 @@ func (h *WebhookHandler) serveReviewRequest(w http.ResponseWriter, r *http.Reque } logger := h.Logger.With( - slog.String("request", string(admissionReview.Request.UID)), - slog.String("name", admissionReview.Request.Name), - slog.String("namespace", admissionReview.Request.Namespace), - slog.String("kind", admissionReview.Request.Kind.Kind), + slog.String(pkg.LogKeyRequest, string(admissionReview.Request.UID)), + slog.String(pkg.LogKeyName, admissionReview.Request.Name), + slog.String(pkg.LogKeyNamespace, 
admissionReview.Request.Namespace), + slog.String(pkg.LogKeyKind, admissionReview.Request.Kind.Kind), ) admissionResponse, err := h.handleReviewRequest(ctx, r.URL.Path, admissionReview.Request) @@ -100,11 +101,11 @@ func (h *WebhookHandler) serveReviewRequest(w http.ResponseWriter, r *http.Reque func (h *WebhookHandler) handleReviewRequest(ctx context.Context, path string, request *v1.AdmissionRequest) (*v1.AdmissionResponse, error) { configurationID, webhookID := detectConfigurationAndWebhook(path) h.Logger.Info("Got AdmissionReview request", - slog.String("configurationID", configurationID), - slog.String("webhookID", webhookID), - slog.String("kind", request.Kind.Kind), - slog.String("name", request.Name), - slog.String("namespace", request.Namespace), + slog.String(pkg.LogKeyConfigurationID, configurationID), + slog.String(pkg.LogKeyWebhookID, webhookID), + slog.String(pkg.LogKeyKind, request.Kind.Kind), + slog.String(pkg.LogKeyName, request.Name), + slog.String(pkg.LogKeyNamespace, request.Namespace), ) if h.Handler == nil { diff --git a/pkg/webhook/admission/resource.go b/pkg/webhook/admission/resource.go index cf4d67d4..f6d6b8af 100644 --- a/pkg/webhook/admission/resource.go +++ b/pkg/webhook/admission/resource.go @@ -65,8 +65,8 @@ func (w *ValidatingWebhookResource) Register() error { } w.logger.Info("Add path to config", - slog.String("path", *webhook.ClientConfig.Service.Path), - slog.String("configurationName", w.opts.ConfigurationName)) + slog.String(pkg.LogKeyPath, *webhook.ClientConfig.Service.Path), + slog.String(pkg.LogKeyConfigurationName, w.opts.ConfigurationName)) configuration.Webhooks = append(configuration.Webhooks, *webhook.ValidatingWebhook) } @@ -97,7 +97,7 @@ func createWebhookPath(webhook IWebhookConfig) *string { } func (w *ValidatingWebhookResource) submit(conf *v1.ValidatingWebhookConfiguration) error { - logger := w.logger.With(slog.String("name", conf.Name)) + logger := w.logger.With(slog.String(pkg.LogKeyName, conf.Name)) client := w.opts.KubeClient.AdmissionregistrationV1().ValidatingWebhookConfigurations() listOpts := metav1.ListOptions{ @@ -166,8 +166,8 @@ func (w *MutatingWebhookResource) Register() error { } w.logger.Info("Add path to config", - slog.String("path", *webhook.ClientConfig.Service.Path), - slog.String("configurationName", w.opts.ConfigurationName)) + slog.String(pkg.LogKeyPath, *webhook.ClientConfig.Service.Path), + slog.String(pkg.LogKeyConfigurationName, w.opts.ConfigurationName)) configuration.Webhooks = append(configuration.Webhooks, *webhook.MutatingWebhook) } @@ -181,7 +181,7 @@ func (w *MutatingWebhookResource) Unregister() error { } func (w *MutatingWebhookResource) submit(conf *v1.MutatingWebhookConfiguration) error { - logger := w.logger.With(slog.String("name", conf.Name)) + logger := w.logger.With(slog.String(pkg.LogKeyName, conf.Name)) client := w.opts.KubeClient.AdmissionregistrationV1().MutatingWebhookConfigurations() listOpts := metav1.ListOptions{ diff --git a/pkg/webhook/conversion/handler.go b/pkg/webhook/conversion/handler.go index 94c4778d..be761971 100644 --- a/pkg/webhook/conversion/handler.go +++ b/pkg/webhook/conversion/handler.go @@ -16,6 +16,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + pkg "github.com/flant/shell-operator/pkg" structuredLogger "github.com/flant/shell-operator/pkg/utils/structured-logger" ) @@ -55,7 +56,7 @@ func (h *WebhookHandler) serveReviewRequest(w http.ResponseWriter, r *http.Reque crdName := detectCrdName(r.URL.Path) - logger := 
h.Logger.With(slog.String("crd", crdName)) + logger := h.Logger.With(slog.String(pkg.LogKeyCRD, crdName)) logger.Info("serving ConversionReview request") var convertReview v1.ConversionReview @@ -75,8 +76,8 @@ func (h *WebhookHandler) serveReviewRequest(w http.ResponseWriter, r *http.Reque } logger = logger.With( - slog.String("request", string(convertReview.Request.UID)), - slog.String("kind", convertReview.Kind)) + slog.String(pkg.LogKeyRequest, string(convertReview.Request.UID)), + slog.String(pkg.LogKeyKind, convertReview.Kind)) conversionResponse, err := h.handleReviewRequest(ctx, crdName, convertReview.Request) if err != nil { diff --git a/pkg/webhook/server/server.go b/pkg/webhook/server/server.go index cb117016..999dc841 100644 --- a/pkg/webhook/server/server.go +++ b/pkg/webhook/server/server.go @@ -12,6 +12,8 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" "github.com/go-chi/chi/v5" + + pkg "github.com/flant/shell-operator/pkg" ) type WebhookServer struct { @@ -91,7 +93,7 @@ func (s *WebhookServer) Start() error { } go func() { - s.Logger.Info("Webhook server listens", slog.String("address", listenAddr)) + s.Logger.Info("Webhook server listens", slog.String(pkg.LogKeyAddress, listenAddr)) err := srv.ServeTLS(listener, "", "") if err != nil { s.Logger.Error("Error starting Webhook https server", log.Err(err)) diff --git a/test/hook/context/generator.go b/test/hook/context/generator.go index cbcc7724..3feaabe7 100644 --- a/test/hook/context/generator.go +++ b/test/hook/context/generator.go @@ -11,7 +11,6 @@ import ( metricsstorage "github.com/deckhouse/deckhouse/pkg/metrics-storage" "github.com/flant/kube-client/fake" - "github.com/flant/shell-operator/pkg/app" "github.com/flant/shell-operator/pkg/hook" bctx "github.com/flant/shell-operator/pkg/hook/binding_context" "github.com/flant/shell-operator/pkg/hook/controller" @@ -69,8 +68,6 @@ func NewBindingContextController(config string, logger *log.Logger, version ...f b.KubeEventsManager.WithMetricStorage(metricsstorage.NewMetricStorage( metricsstorage.WithLogger(log.NewNop()), )) - // Re-create factory to drop informers created using different b.fakeCluster.Client. - kubeeventsmanager.DefaultFactoryStore.Reset() b.ScheduleManager = schedulemanager.NewScheduleManager(ctx, b.logger.Named("schedule-manager")) @@ -107,7 +104,7 @@ func (b *BindingContextController) Run(initialState string) (GeneratedBindingCon } if b.Hook == nil { - testHook := hook.NewHook("test", "test", app.DebugKeepTmpFiles, app.LogProxyHookJSON, app.ProxyJsonLogKey, b.logger.Named("hook")) + testHook := hook.NewHook("test", "test", false, false, "", b.logger.Named("hook")) testHook, err = testHook.LoadConfig([]byte(b.HookConfig)) if err != nil { return GeneratedBindingContexts{}, fmt.Errorf("couldn't load or validate hook configuration: %v", err)
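
For illustration, a minimal standalone sketch (not part of the patch) of how the centralized pkg.LogKey* constants from the new pkg/labels.go are used with log/slog. The handler setup and the hook name here are made up for the example; only the constants and the slog calls mirror the diff:

package main

import (
	"log/slog"
	"os"

	pkg "github.com/flant/shell-operator/pkg"
)

func main() {
	// Any slog handler works; the constants are plain string keys.
	logger := slog.New(slog.NewJSONHandler(os.Stderr, nil))

	// Instead of ad-hoc literals like "queue" and "hook", every component
	// references the same shared constants, so attribute names stay
	// consistent and greppable across the codebase.
	logger.Info("queue task",
		slog.String(pkg.LogKeyQueue, "main"),
		slog.String(pkg.LogKeyHook, "001-startup.sh"), // hypothetical hook name
	)
}

This is the pattern the patch applies mechanically throughout: each string-literal key is swapped for the matching constant, leaving the emitted attribute names unchanged.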