From c958167dc292d2ba09deb7b5c25db7f8c944e8de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Wed, 18 Jun 2025 15:05:23 +0200 Subject: [PATCH 1/4] feature: introduce better timeout handling TCPQueueTimeoutServer, because ReadHeaderTimeoutServer and ReadTimeoutServer are not the right timeouts to use for a tcp queuerefactor: create metrics.NoMetrics to not have to check for nil accessing metrics.Metrics, this might be even an optimization because of omitting branches and compiler can likely figure out that we call an empty body function test: add good coverage for StackListener and a benchmark benchmark: stackListner and queueListener MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- config/config.go | 3 + config/config_test.go | 1 + go.mod | 3 + go.sum | 6 + metrics/metrics.go | 29 ++ queuelistener/listener.go | 7 +- queuelistener/stack.go | 44 ++ queuelistener/stack_listener.go | 161 ++++++ queuelistener/stack_listener_test.go | 749 +++++++++++++++++++++++++++ queuelistener/stack_other.go | 118 +++++ queuelistener/stack_other_test.go | 133 +++++ queuelistener/stack_test.go | 132 +++++ skipper.go | 10 +- skipper_test.go | 1 + 14 files changed, 1390 insertions(+), 7 deletions(-) create mode 100644 queuelistener/stack.go create mode 100644 queuelistener/stack_listener.go create mode 100644 queuelistener/stack_listener_test.go create mode 100644 queuelistener/stack_other.go create mode 100644 queuelistener/stack_other_test.go create mode 100644 queuelistener/stack_test.go diff --git a/config/config.go b/config/config.go index 2fa53c1962..23e8a3c036 100644 --- a/config/config.go +++ b/config/config.go @@ -252,6 +252,7 @@ type Config struct { BackendFlushInterval time.Duration `yaml:"backend-flush-interval"` ExperimentalUpgrade bool `yaml:"experimental-upgrade"` ExperimentalUpgradeAudit bool `yaml:"experimental-upgrade-audit"` + TCPQueueTimeoutServer time.Duration `yaml:"tcp-queue-timeout-server"` ReadTimeoutServer time.Duration `yaml:"read-timeout-server"` ReadHeaderTimeoutServer time.Duration `yaml:"read-header-timeout-server"` WriteTimeoutServer time.Duration `yaml:"write-timeout-server"` @@ -576,6 +577,7 @@ func NewConfig() *Config { flag.DurationVar(&cfg.BackendFlushInterval, "backend-flush-interval", 20*time.Millisecond, "flush interval for upgraded proxy connections") flag.BoolVar(&cfg.ExperimentalUpgrade, "experimental-upgrade", false, "enable experimental feature to handle upgrade protocol requests") flag.BoolVar(&cfg.ExperimentalUpgradeAudit, "experimental-upgrade-audit", false, "enable audit logging of the request line and the messages during the experimental web socket upgrades") + flag.DurationVar(&cfg.TCPQueueTimeoutServer, "tcp-queue-timeout-server", time.Second, "set timeout for how long TCP connections can be queued in http server connections") flag.DurationVar(&cfg.ReadTimeoutServer, "read-timeout-server", 5*time.Minute, "set ReadTimeout for http server connections") flag.DurationVar(&cfg.ReadHeaderTimeoutServer, "read-header-timeout-server", 60*time.Second, "set ReadHeaderTimeout for http server connections") flag.DurationVar(&cfg.WriteTimeoutServer, "write-timeout-server", 60*time.Second, "set WriteTimeout for http server connections") @@ -948,6 +950,7 @@ func (c *Config) ToOptions() skipper.Options { BackendFlushInterval: c.BackendFlushInterval, ExperimentalUpgrade: c.ExperimentalUpgrade, ExperimentalUpgradeAudit: c.ExperimentalUpgradeAudit, + TCPQueueTimeoutServer: c.TCPQueueTimeoutServer, ReadTimeoutServer: c.ReadTimeoutServer, ReadHeaderTimeoutServer: c.ReadHeaderTimeoutServer, WriteTimeoutServer: c.WriteTimeoutServer, diff --git a/config/config_test.go b/config/config_test.go index 0deac48612..1b83382089 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -129,6 +129,7 @@ func defaultConfig(with func(*Config)) *Config { IdleConnsPerHost: 64, CloseIdleConnsPeriod: 20 * time.Second, BackendFlushInterval: 20 * time.Millisecond, + TCPQueueTimeoutServer: time.Second, ReadTimeoutServer: 5 * time.Minute, ReadHeaderTimeoutServer: 1 * time.Minute, WriteTimeoutServer: 1 * time.Minute, diff --git a/go.mod b/go.mod index b9894519c1..40c2b6a05b 100644 --- a/go.mod +++ b/go.mod @@ -68,6 +68,8 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/agnivade/levenshtein v1.2.1 // indirect + github.com/amirylm/go-options v0.0.2 // indirect + github.com/amirylm/lockfree v0.0.4 // indirect github.com/armon/go-metrics v0.4.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect @@ -101,6 +103,7 @@ require ( github.com/go-ole/go-ole v1.2.6 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/btree v1.0.0 // indirect github.com/google/flatbuffers v25.2.10+incompatible // indirect diff --git a/go.sum b/go.sum index 5c5564bde1..a2e8209a43 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,10 @@ github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNg github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= +github.com/amirylm/go-options v0.0.2 h1:OvuFcKUg3+7jdKeY54XrRnAIxP9Dmultlg9dS7Q3TpA= +github.com/amirylm/go-options v0.0.2/go.mod h1:OmhJW65Aeyb74akzydI9SVgCjuwKlPNcTZeXk7TETPk= +github.com/amirylm/lockfree v0.0.4 h1:SAC96Droepe6HjDqymFY3E6UyJ6GR2crOGvbXFlk+kY= +github.com/amirylm/lockfree v0.0.4/go.mod h1:92tGIqOCCQdd9SR5nGLYwK4GN9PTKlQmRwXKxqfVz/U= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0 h1:jfIu9sQUG6Ig+0+Ap1h4unLjW6YQJpKZVmUzxsD4E/Q= github.com/arbovm/levenshtein v0.0.0-20160628152529-48b4e1c0c4d0/go.mod h1:t2tdKJDJF9BV14lnkjHmOQgcvEKgtqs5a1N3LNdJhGE= github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= @@ -169,6 +173,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= +github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3 h1:zN2lZNZRflqFyxVaTIU61KNKQ9C0055u9CAfpmqUvo4= +github.com/golang-collections/collections v0.0.0-20130729185459-604e922904d3/go.mod h1:nPpo7qLxd6XL3hWJG/O60sR8ZKfMCiIoNap5GvD12KU= github.com/golang-jwt/jwt/v4 v4.4.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXeUI= github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= diff --git a/metrics/metrics.go b/metrics/metrics.go index 08fa111736..206b9cbc7f 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -84,6 +84,35 @@ type Metrics interface { Close() } +type NoMetric struct{} + +func (NoMetric) MeasureSince(string, time.Time) {} +func (NoMetric) IncCounter(string) {} +func (NoMetric) IncCounterBy(string, int64) {} +func (NoMetric) IncFloatCounterBy(string, float64) {} +func (NoMetric) MeasureRouteLookup(time.Time) {} +func (NoMetric) MeasureFilterCreate(string, time.Time) {} +func (NoMetric) MeasureFilterRequest(string, time.Time) {} +func (NoMetric) MeasureAllFiltersRequest(string, time.Time) {} +func (NoMetric) MeasureBackend(string, time.Time) {} +func (NoMetric) MeasureBackendHost(string, time.Time) {} +func (NoMetric) MeasureFilterResponse(string, time.Time) {} +func (NoMetric) MeasureAllFiltersResponse(string, time.Time) {} +func (NoMetric) MeasureResponse(int, string, string, time.Time) {} +func (NoMetric) MeasureProxy(time.Duration, time.Duration) {} +func (NoMetric) MeasureServe(string, string, string, int, time.Time) {} +func (NoMetric) IncRoutingFailures() {} +func (NoMetric) IncErrorsBackend(string) {} +func (NoMetric) MeasureBackend5xx(time.Time) {} +func (NoMetric) IncErrorsStreaming(string) {} +func (NoMetric) RegisterHandler(string, *http.ServeMux) {} +func (NoMetric) UpdateGauge(string, float64) {} +func (NoMetric) IncValidRoutes() {} +func (NoMetric) IncInvalidRoutes(string) {} +func (NoMetric) Close() {} + +var _ Metrics = NoMetric{} + // Options for initializing metrics collection. type Options struct { // the metrics exposing format. diff --git a/queuelistener/listener.go b/queuelistener/listener.go index 527db456b6..11c6bd86c1 100644 --- a/queuelistener/listener.go +++ b/queuelistener/listener.go @@ -20,6 +20,7 @@ const ( maxCalculatedQueueSize = 50_000 acceptedConnectionsKey = "listener.accepted.connections" queuedConnectionsKey = "listener.queued.connections" + queueTimeoutKey = "listener.queued.timeouts" acceptLatencyKey = "listener.accept.latency" ) @@ -96,6 +97,7 @@ type listener struct { var ( token struct{} errListenerClosed = errors.New("listener closed") + errAcceptTimeout = errors.New("accept timeout") ) func (c *connection) Close() error { @@ -132,10 +134,7 @@ func (o Options) maxQueueSize() int64 { return int64(o.MaxQueueSize) } - maxQueueSize := 10 * o.maxConcurrency() - if maxQueueSize > maxCalculatedQueueSize { - maxQueueSize = maxCalculatedQueueSize - } + maxQueueSize := min(10*o.maxConcurrency(), maxCalculatedQueueSize) return maxQueueSize } diff --git a/queuelistener/stack.go b/queuelistener/stack.go new file mode 100644 index 0000000000..264cd485bb --- /dev/null +++ b/queuelistener/stack.go @@ -0,0 +1,44 @@ +package queuelistener + +import ( + "sync" +) + +const stackSize int = 10000 + +type naiveStack[T any] struct { + mu sync.Mutex + top int + items [stackSize]*T +} + +func NewStack() *naiveStack[external] { + return &naiveStack[external]{ + top: -1, + } +} + +func (s *naiveStack[T]) Push(data *T) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.top == len(s.items)-1 { + return + } + + s.top++ + s.items[s.top] = data +} + +func (s *naiveStack[T]) Pop() *T { + s.mu.Lock() + defer s.mu.Unlock() + + if s.top == -1 { + return nil + } else { + defer func() { s.top-- }() + } + + return s.items[s.top] +} diff --git a/queuelistener/stack_listener.go b/queuelistener/stack_listener.go new file mode 100644 index 0000000000..d8ab4016ff --- /dev/null +++ b/queuelistener/stack_listener.go @@ -0,0 +1,161 @@ +package queuelistener + +import ( + "errors" + "fmt" + "net" + "net/http" + "sync" + "time" + + "github.com/zalando/skipper/logging" + "github.com/zalando/skipper/metrics" +) + +type stackListener struct { + log logging.Logger + metrics metrics.Metrics + maxConcurrency int64 + maxQueueSize int64 + memoryLimitBytes int64 + connectionBytes int + queueTimeout time.Duration + stack *naiveStack[external] + externalListener net.Listener + acceptInternal chan external + quit chan struct{} + once sync.Once +} + +func StackListener(o Options) (net.Listener, error) { + nl, err := net.Listen(o.Network, o.Address) + if err != nil { + return nil, fmt.Errorf("StackListener failed net.Listen: %w", err) + } + + acceptCH := make(chan external) + + if o.Log == nil { + o.Log = &logging.DefaultLog{} + } + + if o.MemoryLimitBytes <= 0 { + o.MemoryLimitBytes = defaultMemoryLimitBytes + } + + if o.ConnectionBytes <= 0 { + o.ConnectionBytes = defaultConnectionBytes + } + + m := o.Metrics + if m == nil { + m = metrics.NoMetric{} + } + l := &stackListener{ + log: o.Log, + metrics: m, + externalListener: nl, + maxConcurrency: o.maxConcurrency(), + maxQueueSize: o.maxQueueSize(), + memoryLimitBytes: o.MemoryLimitBytes, + connectionBytes: o.ConnectionBytes, + queueTimeout: o.QueueTimeout, + stack: NewStack(), + acceptInternal: acceptCH, + quit: make(chan struct{}), + once: sync.Once{}, + } + l.log.Infof("TCP lifo listener config: %s", l) + + go l.listenExternal() + go l.listenInternal() + return l, nil +} + +func (l *stackListener) String() string { + return fmt.Sprintf("stackListener concurrency: %d, queue size: %d, memory limit: %d, bytes per connection: %d, queue timeout: %s", l.maxConcurrency, l.maxQueueSize, l.memoryLimitBytes, l.connectionBytes, l.queueTimeout) +} + +func (l *stackListener) Accept() (net.Conn, error) { + select { + case <-l.quit: + return nil, errListenerClosed + case c := <-l.acceptInternal: + l.metrics.MeasureSince(acceptLatencyKey, c.accepted) + d := time.Since(c.accepted) + if d > l.queueTimeout { + l.metrics.IncCounter(queueTimeoutKey) + if c.Conn != nil { + c.Conn.Close() + } + return nil, errAcceptTimeout + } + return c, nil + } +} + +func (l *stackListener) Addr() net.Addr { + return l.externalListener.Addr() +} + +func (l *stackListener) Close() error { + l.once.Do(func() { + close(l.quit) + l.externalListener.Close() + close(l.acceptInternal) + }) + + return nil +} + +func (l *stackListener) listenExternal() { + var ( + err error + c net.Conn + ) + for { + select { + case <-l.quit: + return + default: + } + + c, err = l.externalListener.Accept() + if err != nil { + if errors.Is(err, http.ErrServerClosed) { + l.log.Infof("Server closed: %v", err) + return + } + + // client closed for example + //l.log.Infof("Failed to accept connection (%T): %v", err, err) + if c != nil { + l.log.Info("close connection") + c.Close() + } + continue + } + cc := external{c, time.Now()} + l.stack.Push(&cc) + } +} + +func (l *stackListener) listenInternal() { + for { + select { + case <-l.quit: + return + default: + } + cc := l.stack.Pop() + if cc == nil { + // reduce cpu usage caused by busywait + time.Sleep(10 * time.Microsecond) + continue + } + l.metrics.IncCounter(acceptedConnectionsKey) + l.metrics.UpdateGauge(queuedConnectionsKey, float64(l.stack.top+1)) + + l.acceptInternal <- *cc + } +} diff --git a/queuelistener/stack_listener_test.go b/queuelistener/stack_listener_test.go new file mode 100644 index 0000000000..69b9295a8a --- /dev/null +++ b/queuelistener/stack_listener_test.go @@ -0,0 +1,749 @@ +package queuelistener + +import ( + "fmt" + "io" + "net" + "net/http" + "runtime" + "testing" + "time" + + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + + "github.com/zalando/skipper/metrics" + "github.com/zalando/skipper/metrics/metricstest" + skpnet "github.com/zalando/skipper/net" +) + +func TestStackListener(t *testing.T) { + for _, tt := range []struct { + name string + opt Options + want error + }{ + { + name: "wrong listener options", + want: fmt.Errorf("StackListener failed net.Listen:"), + }, + { + name: "listener with metrics", + opt: Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + Metrics: metrics.Default, + }, + want: nil, + }, + { + name: "listener without metrics", + opt: Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + Metrics: nil, + }, + want: nil, + }} { + t.Run(tt.name, func(t *testing.T) { + srv := &http.Server{ + Addr: tt.opt.Address, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 0, + KeepaliveRequests: 0, + Metrics: tt.opt.Metrics, + } + cm.Configure(srv) + + l, err := StackListener(tt.opt) + if err != nil { + if tt.want == nil { + t.Fatalf("Failed to create StackListener: %v", err) + } else { + // have err and want error + return + } + } + defer l.Close() + if tt.want != nil && err == nil { + t.Fatalf("Failed to get error from StackListener, want: %v", tt.want) + } + + go func() { + t.Logf("start server") + if err := srv.Serve(l); err != http.ErrServerClosed { + log.Errorf("Serve failed: %v", err) + } + return + }() + + dst := "http://" + l.Addr().String() + var rsp *http.Response + for range 3 { + rsp, err = http.DefaultClient.Get(dst) + if err != nil { + time.Sleep(time.Second) + continue + } + break + } + if err != nil { + t.Fatalf("Failed to do a GET request to %s", dst) + } + if rsp.StatusCode != http.StatusAccepted { + t.Fatalf("Failed to get response status code we expect, got: %d", rsp.StatusCode) + } + defer rsp.Body.Close() + io.Copy(io.Discard, rsp.Body) + http.DefaultClient.CloseIdleConnections() + }) + } +} + +func TestStackListenerRequestFlow(t *testing.T) { + opt := Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + Metrics: &metricstest.MockMetrics{}, + } + + srv := &http.Server{ + Addr: opt.Address, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 0, + KeepaliveRequests: 0, + Metrics: opt.Metrics, + } + cm.Configure(srv) + + l, err := StackListener(opt) + if err != nil { + t.Fatalf("Failed to create StackListener: %v", err) + } + defer l.Close() + + go func() { + t.Logf("start server") + if err := srv.Serve(l); err != http.ErrServerClosed { + log.Errorf("Serve failed: %v", err) + } + return + }() + + dst := "http://" + l.Addr().String() + var rsp *http.Response + for range 3 { + rsp, err = http.DefaultClient.Get(dst) + if err != nil { + time.Sleep(time.Second) + continue + } + break + } + if err != nil { + t.Fatalf("Failed to do a GET request to %s", dst) + } + if rsp.StatusCode != http.StatusAccepted { + t.Fatalf("Failed to get response status code we expect, got: %d", rsp.StatusCode) + } + defer rsp.Body.Close() + io.Copy(io.Discard, rsp.Body) + + for range 10 { + rsp, err = http.DefaultClient.Get(dst) + if err != nil { + t.Fatalf("Failed to do a GET request to %s", dst) + } + if rsp.StatusCode != http.StatusAccepted { + t.Fatalf("Failed to get response status code we expect, got: %d", rsp.StatusCode) + } + t.Logf("Response: %d", rsp.StatusCode) + io.Copy(io.Discard, rsp.Body) + rsp.Body.Close() + } + + t.Logf("opt.Metrics: %+v", opt.Metrics) +} + +func TestStackListenerTimeout(t *testing.T) { + for _, tt := range []struct { + name string + opt Options + want error + }{ + { + name: "listener timeout with metrics", + opt: Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Microsecond, + Metrics: &metricstest.MockMetrics{}, + }, + want: nil, + }, + { + name: "listener timeout without metrics", + opt: Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Microsecond, + Metrics: nil, + }, + want: nil, + }} { + t.Run(tt.name, func(t *testing.T) { + srv := &http.Server{ + Addr: tt.opt.Address, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 0, + KeepaliveRequests: 0, + Metrics: tt.opt.Metrics, + } + cm.Configure(srv) + + l, err := StackListener(tt.opt) + if err != nil { + if tt.want == nil { + t.Fatalf("Failed to create StackListener: %v", err) + } else { + // have err and want error + return + } + } + defer l.Close() + + go func() { + t.Logf("start server") + if err := srv.Serve(l); err != http.ErrServerClosed { + t.Logf("Server closed: %v", err) + } + return + }() + + dst := "http://" + l.Addr().String() + + _, err = http.DefaultClient.Get(dst) + if err == nil { + t.Fatal("Failed to create a fail") + } + + if tt.opt.Metrics != nil { + if m, ok := tt.opt.Metrics.(*metricstest.MockMetrics); ok { + m.WithCounters(func(c map[string]int64) { + t.Logf("counters: %v", c) + + assert.Equal(t, int64(1), c["listener.queued.timeouts"]) + assert.Equal(t, int64(1), c["listener.accepted.connections"]) + }) + } + } + + http.DefaultClient.CloseIdleConnections() + + }) + } +} + +func TestStackListenerShutdown(t *testing.T) { + for _, tt := range []struct { + name string + opt Options + backendTime time.Duration + want error + }{ + { + name: "listener timeout with metrics", + opt: Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: 100 * time.Millisecond, + Metrics: &metricstest.MockMetrics{}, + }, + backendTime: time.Second, + want: nil, + }, + { + name: "listener timeout without metrics", + opt: Options{ + Network: "tcp", + Address: ":9090", + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: 100 * time.Millisecond, + Metrics: nil, + }, + backendTime: time.Second, + want: nil, + }} { + t.Run(tt.name, func(t *testing.T) { + srv := &http.Server{ + Addr: tt.opt.Address, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + // backend hang + time.Sleep(tt.backendTime) + + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 0, + KeepaliveRequests: 0, + Metrics: tt.opt.Metrics, + } + cm.Configure(srv) + + l, err := StackListener(tt.opt) + if err != nil { + if tt.want == nil { + t.Fatalf("Failed to create StackListener: %v", err) + } else { + // have err and want error + return + } + } + defer l.Close() + + go func() { + t.Logf("start server") + if err := srv.Serve(l); err != http.ErrServerClosed { + t.Logf("Server closed: %v", err) + } + return + }() + + dst := "http://" + l.Addr().String() + errCH := make(chan error) + go func(ch chan error) { + _, err = http.DefaultClient.Get(dst) + ch <- err + }(errCH) + + time.Sleep(500 * time.Millisecond) + // server init close + err = l.Close() + t.Logf("Close: %v", err) + + err = <-errCH + t.Logf("client err: %v", err) + // if !strings.Contains(err.Error(), "connection refused") { + // t.Errorf("Failed to get client err: %v", err) + // } + + if tt.opt.Metrics != nil { + if m, ok := tt.opt.Metrics.(*metricstest.MockMetrics); ok { + m.WithCounters(func(c map[string]int64) { + t.Logf("counters: %v", c) + + // assert.Equal(t, int64(1), c["listener.queued.timeouts"]) + // assert.Equal(t, int64(1), c["listener.accepted.connections"]) + }) + } + } + + http.DefaultClient.CloseIdleConnections() + + }) + } +} + +func TestTCPListenerStackListener(t *testing.T) { + addr := ":9090" + l, err := StackListener(Options{ + Network: "tcp", + Address: addr, + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + Metrics: metrics.Default, + //Metrics: metrics.NoMetric{}, + }) + if err != nil { + t.Fatalf("Failed to create QueueListener: %v", err) + } + defer l.Close() + + srv := &http.Server{ + Addr: addr, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 10 * time.Second, + KeepaliveRequests: 10, + Metrics: metrics.Default, + } + cm.Configure(srv) + + go func() { + if err := srv.Serve(l); err != http.ErrServerClosed { + log.Errorf("Serve failed: %v", err) + } + }() + + // check ready + err = fmt.Errorf("an error") + var rsp *http.Response + for err != nil { + rsp, err = http.DefaultClient.Get("http://" + l.Addr().String() + "/") + time.Sleep(time.Millisecond) + } + io.Copy(io.Discard, rsp.Body) + rsp.Body.Close() + http.DefaultClient.CloseIdleConnections() + + buf := []byte("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") + result := make([]byte, 1024) + + for n := 0; n < 2; n++ { + c, err := net.Dial("tcp", l.Addr().String()) + if err != nil { + t.Fatalf("Failed to dial: %v", err) + return + } + _, err = c.Write(buf) + if err != nil { + t.Fatalf("Failed to write: %v", err) + c.Close() + return + } + + n, err := c.Read(result) + if err != nil { + t.Fatalf("Failed to write: %v", err) + c.Close() + return + } + + if n != 124 { + t.Fatalf("Read %d bytes: %q", n, string(result[0:n])) + } + t.Logf("Read %d bytes: %q", n, string(result[0:n])) + c.Close() + + } +} + +func TestTCPListenerQueueListener(t *testing.T) { + addr := ":9090" + l, err := Listen(Options{ + Network: "tcp", + Address: addr, + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + Metrics: metrics.Default, + }) + if err != nil { + t.Fatalf("Failed to create QueueListener: %v", err) + } + defer l.Close() + + srv := &http.Server{ + Addr: addr, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 10 * time.Second, + KeepaliveRequests: 10, + Metrics: metrics.Default, + } + cm.Configure(srv) + + go func() { + if err := srv.Serve(l); err != http.ErrServerClosed { + log.Errorf("Serve failed: %v", err) + } + }() + + // check ready + err = fmt.Errorf("an error") + var rsp *http.Response + for err != nil { + rsp, err = http.DefaultClient.Get("http://" + l.Addr().String() + "/") + time.Sleep(time.Millisecond) + } + io.Copy(io.Discard, rsp.Body) + rsp.Body.Close() + http.DefaultClient.CloseIdleConnections() + + buf := []byte("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") + result := make([]byte, 1024) + + for n := 0; n < 2; n++ { + c, err := net.Dial("tcp", l.Addr().String()) + if err != nil { + t.Fatalf("Failed to dial: %v", err) + return + } + _, err = c.Write(buf) + if err != nil { + t.Fatalf("Failed to write: %v", err) + c.Close() + return + } + + n, err := c.Read(result) + if err != nil { + t.Fatalf("Failed to write: %v", err) + c.Close() + return + } + + if n != 124 { + t.Fatalf("Read %d bytes: %q", n, string(result[0:n])) + } + c.Close() + + } +} + +func BenchmarkStackListener(b *testing.B) { + maxprocs := runtime.GOMAXPROCS(-5) + stackAddr := fmt.Sprintf(":90%02d", maxprocs) + stackListener, err := StackListener(Options{ + Log: &noLog{}, + Network: "tcp", + Address: stackAddr, + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + //Metrics: metrics.Default, + Metrics: metrics.NoMetric{}, + }) + if err != nil { + b.Fatalf("Failed to create StackListener: %v", err) + } + + benchmarkListener(b, stackAddr, stackListener) + stackListener.Close() + +} + +type noLog struct{} + +func (*noLog) Error(...interface{}) {} +func (*noLog) Errorf(string, ...interface{}) {} +func (*noLog) Warn(...interface{}) {} +func (*noLog) Warnf(string, ...interface{}) {} +func (*noLog) Info(...interface{}) {} +func (*noLog) Infof(string, ...interface{}) {} +func (*noLog) Debug(...interface{}) {} +func (*noLog) Debugf(string, ...interface{}) {} + +func BenchmarkQueueListener(b *testing.B) { + queueAddr := ":9090" + queueListener, err := Listen(Options{ + Log: &noLog{}, + Network: "tcp", + Address: queueAddr, + MaxConcurrency: 10000, + MaxQueueSize: 10000, + MemoryLimitBytes: 1000, + ConnectionBytes: 100, + QueueTimeout: time.Second, + //Metrics: metrics.Default, + Metrics: metrics.NoMetric{}, + }) + if err != nil { + b.Fatalf("Failed to create QueueListener: %v", err) + } + benchmarkListener(b, queueAddr, queueListener) + queueListener.Close() +} + +func benchmarkListener(b *testing.B, addr string, l net.Listener) { + srv := &http.Server{ + Addr: addr, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) + w.Write([]byte("OK")) + }), + ReadTimeout: time.Second, + ReadHeaderTimeout: time.Second, + WriteTimeout: time.Second, + IdleTimeout: time.Second, + MaxHeaderBytes: 1000, + } + defer srv.Close() + + cm := &skpnet.ConnManager{ + Keepalive: 10 * time.Second, + KeepaliveRequests: 10, + Metrics: metrics.Default, + } + cm.Configure(srv) + + go func() { + if err := srv.Serve(l); err != http.ErrServerClosed { + log.Errorf("Serve failed: %v", err) + } + }() + + // check ready + err := fmt.Errorf("an error") + var rsp *http.Response + for err != nil { + rsp, err = http.DefaultClient.Get("http://" + l.Addr().String() + "/") + time.Sleep(time.Millisecond) + } + io.Copy(io.Discard, rsp.Body) + rsp.Body.Close() + http.DefaultClient.CloseIdleConnections() + + // tr := http.Transport{ + // DisableKeepAlives: true, + // } + // client := http.Client{ + // Transport: &tr, + // } + + buf := []byte("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n") + result := make([]byte, 1024) + b.ResetTimer() + + for n := 0; n < b.N; n++ { + c, err := net.Dial("tcp", l.Addr().String()) + if err != nil { + b.Logf("Failed to dial: %v", err) + return + } + _, err = c.Write(buf) + if err != nil { + b.Logf("Failed to write: %v", err) + c.Close() + return + } + + n, err := c.Read(result) + if err != nil { + b.Logf("Failed to write: %v", err) + c.Close() + return + } + + if n != 124 { + b.Logf("Read %d bytes: %q", n, string(result[0:n])) + } + c.Close() + } + + /* + for n := 0; n < b.N; n++ { + // TODO(sszuecs): we should create connections TCP/IP + // and not use a pooled client + + rsp, err := client.Get("http://" + l.Addr().String() + "/") + if err != nil { + b.Fatalf("Failed to send request: %v", err) + } + if rsp.StatusCode != http.StatusAccepted { + b.Fatalf("Failed to get status code: %d != %d", rsp.StatusCode, http.StatusAccepted) + } + res, err := io.ReadAll(rsp.Body) + if result := string(res); result != "OK" { + b.Logf("Failed to get result: %q", result) + } + rsp.Body.Close() + } + client.CloseIdleConnections() + */ +} diff --git a/queuelistener/stack_other.go b/queuelistener/stack_other.go new file mode 100644 index 0000000000..a1743e6bee --- /dev/null +++ b/queuelistener/stack_other.go @@ -0,0 +1,118 @@ +package queuelistener + +import ( + "net" + "sync/atomic" + "unsafe" +) + +// https://englyk.com/book2/Lock-Free_Data_Structures/ + +// Node represents a node in the stack +type Node struct { + value any + next *Node +} + +// LockFreeStack represents a lock-free stack +type LockFreeStack struct { + head unsafe.Pointer // *Node +} + +// NewLockFreeStack creates a new lock-free stack +func NewLockFreeStack() *LockFreeStack { + return &LockFreeStack{} +} + +// Push adds a new value onto the stack +func (s *LockFreeStack) Push(value interface{}) { + newNode := &Node{value: value} + for { + oldHead := atomic.LoadPointer(&s.head) + newNode.next = (*Node)(oldHead) + if atomic.CompareAndSwapPointer(&s.head, oldHead, unsafe.Pointer(newNode)) { + break + } + } +} + +// Pop removes and returns the value from the top of the stack +func (s *LockFreeStack) Pop() (value any, ok bool) { + for { + oldHead := atomic.LoadPointer(&s.head) + if oldHead == nil { + return nil, false // Stack is empty + } + newHead := (*Node)(oldHead).next + if atomic.CompareAndSwapPointer(&s.head, oldHead, unsafe.Pointer(newHead)) { + return (*Node)(oldHead).value, true + } + } +} + +// TODO: Elimination-Backoff Stack + +// TODO: Semantic Relaxation and Elastic Designs +// highly concurrent systems where minor deviations from strict LIFO order are acceptable in exchange for significant performance gains + +// TODO Semantically Relaxed Stack + +// TODO Wait-Free Stack: Goel Stack + +// TODO Wait-Free Stack: SIM Stack + +// Treiber Stack +type node[T any] struct { + val T + next *node[T] +} +type treiberStack[T any] struct { + head atomic.Pointer[node[T]] // node[T] +} + +func NewTreiberStack() *treiberStack[net.Conn] { + return &treiberStack[net.Conn]{} +} + +func (s *treiberStack[T]) Push(data T) { + newHead := node[T]{ + val: data, + next: s.head.Load(), + } + if s.head.CompareAndSwap(newHead.next, &newHead) { + return + } else { + s.Push(data) + } +} + +func (s *treiberStack[T]) Pop() *T { + oldHead := s.head.Load() // maybe panic + if oldHead == nil { + return nil + } + if s.head.CompareAndSwap(oldHead, oldHead.next) { + return &oldHead.val + } else { + return s.Pop() + } +} + +// https://en.wikipedia.org/wiki/Treiber_stack +/* + class LockFreeStack { + private class Node(val value: T, val next: Node?) + + private val head = AtomicReference?>(null) + + tailrec fun push(value: T) { + val newHead = Node(value = value, next = head.get()) + if (head.compareAndSet(newHead.next, newHead)) return else push(value) + } + + tailrec fun pop(): T { + val oldHead = head.get() ?: throw NoSuchElementException("Stack is empty") + return if (head.compareAndSet(oldHead, oldHead.next)) oldHead.value else pop() + } + } +*/ diff --git a/queuelistener/stack_other_test.go b/queuelistener/stack_other_test.go new file mode 100644 index 0000000000..ebb06c8cac --- /dev/null +++ b/queuelistener/stack_other_test.go @@ -0,0 +1,133 @@ +package queuelistener + +import ( + "net" + "testing" + + "github.com/amirylm/lockfree/core" + lstack "github.com/amirylm/lockfree/stack" + "github.com/golang-collections/collections/stack" +) + +func TestTreiberStack(t *testing.T) { + var c1, c2, c3 net.Conn + s := NewTreiberStack() + if v := s.Pop(); v != nil { + t.Fatalf("Failed to get nil from empty stack: %v", v) + } + + s.Push(c1) + s.Push(c2) + s.Push(c3) + if v := s.Pop(); *v != c3 { + t.Fatalf("Failed to get c3 from stack: %v", v) + } + if v := s.Pop(); *v != c2 { + t.Fatalf("Failed to get c2 from stack: %v", v) + } + if v := s.Pop(); *v != c1 { + t.Fatalf("Failed to get c1 from stack: %v", v) + } + + if v := s.Pop(); v != nil { + t.Fatalf("Failed to get nil from empty stack: %v", v) + } +} + +// https://pkg.go.dev/github.com/golang-collections/collections/stack + +func BenchmarkGoStack(b *testing.B) { + gostack := stack.New() + for n := 0; n < b.N; n++ { + gostack.Push(n) + k := gostack.Pop() + if k != n { + b.Fatalf("%d != %d", k, n) + } + } +} + +// https://github.com/amirylm/lockfree/blob/main/stack/stack.go + +func BenchmarkAmirylmLockFreeStack(b *testing.B) { + // func WithCapacity(c int) options.Option[Options] { + // func New[Value any](opts ...options.Option[core.Options]) core.Stack[Value] { + var c *net.Conn + llstack := lstack.New[*net.Conn](core.WithCapacity(stackSize)) + for n := 0; n < b.N; n++ { + llstack.Push(c) + k, _ := llstack.Pop() + if k != c { + b.Fatalf("%p != %p", k, c) + } + } +} + +func BenchmarkLockFreeStack(b *testing.B) { + var c *net.Conn + llstack := NewLockFreeStack() + for n := 0; n < b.N; n++ { + llstack.Push(c) + k, _ := llstack.Pop() + if k != c { + b.Fatalf("%p != %p", k, c) + } + } +} + +func BenchmarkTreiberStack(b *testing.B) { + var c net.Conn + mystack := NewTreiberStack() + for n := 0; n < b.N; n++ { + mystack.Push(c) + k := mystack.Pop() + if *k != c { + b.Fatalf("%p != %p", k, &c) + } + } +} + +func BenchmarkAmirylmLockFreeStackParallel(b *testing.B) { + // func WithCapacity(c int) options.Option[Options] { + // func New[Value any](opts ...options.Option[core.Options]) core.Stack[Value] { + var c *net.Conn + llstack := lstack.New[*net.Conn](core.WithCapacity(stackSize)) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + llstack.Push(c) + k, _ := llstack.Pop() + if k != c { + b.Fatalf("%p != %p", k, c) + } + } + }) +} + +func BenchmarkLockFreeStackParallel(b *testing.B) { + var c *net.Conn + llstack := NewLockFreeStack() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + llstack.Push(c) + k, _ := llstack.Pop() + if k != c { + b.Fatalf("%p != %p", k, c) + } + } + }) +} + +func BenchmarkTreiberStackParallel(b *testing.B) { + var c net.Conn + mystack := NewTreiberStack() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + mystack.Push(c) + k := mystack.Pop() + if *k != c { + b.Fatalf("%v != %v", k, c) + } + + } + }) +} diff --git a/queuelistener/stack_test.go b/queuelistener/stack_test.go new file mode 100644 index 0000000000..85cd2be5bd --- /dev/null +++ b/queuelistener/stack_test.go @@ -0,0 +1,132 @@ +package queuelistener + +import ( + "net" + "testing" + "time" +) + +func TestNaiveStack(t *testing.T) { + var ( + c net.Conn + a [5]*external + ) + for i := range len(a) { + a[i] = &external{Conn: c} + } + for i := range len(a) { + if a[i] == nil { + t.Fatalf("a[%d] should not be nil", i) + } + } + + t.Run("pop before push should return nil until we get result", func(t *testing.T) { + s := NewStack() + + ch := make(chan struct{}) + dataCH := make(chan *external) + go func() { + <-ch + s.Push(a[0]) + }() + go func() { + v := s.Pop() + for v == nil { + time.Sleep(time.Millisecond) + v = s.Pop() + } + dataCH <- v + }() + close(ch) + if v := <-dataCH; v != a[0] { + t.Fatalf("Failed to get item from stack: %v", v) + } + }) + + t.Run("push pop push pop push pop ... should work", func(t *testing.T) { + s := NewStack() + // push pop; push pop; .. + for i := range len(a) { + s.Push(a[i]) + if v := s.Pop(); v != a[i] { + t.Fatalf("Failed to get %d from stack: %v", i, v) + } + } + }) + + t.Run("push push push .. pop pop pop... should work", func(t *testing.T) { + s := NewStack() + // push push ..; pop; pop;.. + for i := range len(a) { + s.Push(a[i]) + } + for i := range len(a) { + if v := s.Pop(); v != a[len(a)-i-1] { + t.Fatalf("Failed to get a[%d] from stack: %v", len(a)-i-1, v) + } + } + }) + + t.Run("test push max should return without change", func(t *testing.T) { + s := NewStack() + s.top = len(s.items) - 2 + + for i := range len(a) { + s.Push(a[i]) // only first will be pushed on our stack rest will be ignored + } + + if v := s.Pop(); v != a[0] { + t.Fatalf("Failed to get a[0] from stack: %v", v) + } + }) +} + +func BenchmarkNaiveStack(b *testing.B) { + var ( + c net.Conn + a [5]*external + ) + for i := range len(a) { + a[i] = &external{Conn: c} + } + + mystack := NewStack() + for n := 0; n < b.N; n++ { + for i := range len(a) { + mystack.Push(a[i]) + } + + for i := range len(a) { + k := mystack.Pop() + if k != a[len(a)-1-i] { + b.Fatalf("%p != %p", k, a[len(a)-1-i]) + } + } + } +} + +func BenchmarkNaiveStackParallel(b *testing.B) { + var ( + c net.Conn + a [5]*external + ) + for i := range 5 { + a[i] = &external{Conn: c} + } + + mystack := NewStack() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + for i := range len(a) { + mystack.Push(a[i]) + } + + for range len(a) { + k := mystack.Pop() + if k == nil { + b.Fatalf("%v", k) + } + } + } + }) +} diff --git a/skipper.go b/skipper.go index 944d282138..53df2e0c22 100644 --- a/skipper.go +++ b/skipper.go @@ -362,6 +362,9 @@ type Options struct { // by the proxy are closed. CloseIdleConnsPeriod time.Duration + // TCPQueueTimeoutServer is the timeout for the accept() handling in case StackListener is used + TCPQueueTimeoutServer time.Duration + // Defines ReadTimeoutServer for server http connections. ReadTimeoutServer time.Duration @@ -1336,12 +1339,13 @@ func listen(o *Options, address string, mtr metrics.Metrics) (net.Listener, erro } } - qto := o.ReadHeaderTimeoutServer + qto := o.TCPQueueTimeoutServer if qto <= 0 { - qto = o.ReadTimeoutServer + qto = time.Second } - return queuelistener.Listen(queuelistener.Options{ + return queuelistener.StackListener(queuelistener.Options{ + //return queuelistener.Listen(queuelistener.Options{ Network: "tcp", Address: address, MaxConcurrency: o.MaxTCPListenerConcurrency, diff --git a/skipper_test.go b/skipper_test.go index 051434ab29..0d21f720cc 100644 --- a/skipper_test.go +++ b/skipper_test.go @@ -526,6 +526,7 @@ func TestDataClients(t *testing.T) { ExpectedBytesPerRequest: 1024, ReadHeaderTimeoutServer: 0, ReadTimeoutServer: 1 * time.Second, + TCPQueueTimeoutServer: 1 * time.Second, MetricsFlavours: []string{"codahale"}, EnablePrometheusMetrics: true, LoadBalancerHealthCheckInterval: 3 * time.Second, From ece6ac94f930c4bd3a586606c1059543da4697b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Wed, 16 Jul 2025 10:32:27 +0200 Subject: [PATCH 2/4] adapt metrics upstream change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- metrics/metrics.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/metrics/metrics.go b/metrics/metrics.go index 206b9cbc7f..d100df4735 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -94,11 +94,13 @@ func (NoMetric) MeasureRouteLookup(time.Time) {} func (NoMetric) MeasureFilterCreate(string, time.Time) {} func (NoMetric) MeasureFilterRequest(string, time.Time) {} func (NoMetric) MeasureAllFiltersRequest(string, time.Time) {} +func (NoMetric) MeasureBackendRequestHeader(string, int) {} func (NoMetric) MeasureBackend(string, time.Time) {} func (NoMetric) MeasureBackendHost(string, time.Time) {} func (NoMetric) MeasureFilterResponse(string, time.Time) {} func (NoMetric) MeasureAllFiltersResponse(string, time.Time) {} func (NoMetric) MeasureResponse(int, string, string, time.Time) {} +func (NoMetric) MeasureResponseSize(string, int64) {} func (NoMetric) MeasureProxy(time.Duration, time.Duration) {} func (NoMetric) MeasureServe(string, string, string, int, time.Time) {} func (NoMetric) IncRoutingFailures() {} From 61dd8c96587c01ae94e6906d913272d2bc313b53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Wed, 16 Jul 2025 10:46:43 +0200 Subject: [PATCH 3/4] refactor: copy logs and metrics to listener to omit hoping through options MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- queuelistener/listener.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/queuelistener/listener.go b/queuelistener/listener.go index 11c6bd86c1..aa2868be64 100644 --- a/queuelistener/listener.go +++ b/queuelistener/listener.go @@ -80,6 +80,8 @@ type Options struct { } type listener struct { + metrics metrics.Metrics + log logging.Logger options Options maxConcurrency int64 maxQueueSize int64 @@ -154,6 +156,8 @@ func listenWith(nl net.Listener, o Options) (net.Listener, error) { l := &listener{ options: o, + log: o.Log, + metrics: o.Metrics, maxConcurrency: o.maxConcurrency(), maxQueueSize: o.maxQueueSize(), externalListener: nl, @@ -164,7 +168,10 @@ func listenWith(nl net.Listener, o Options) (net.Listener, error) { releaseConnection: make(chan struct{}), quit: make(chan struct{}), } - o.Log.Infof("TCP lifo listener config: %s", l) + if l.metrics == nil { + l.metrics = metrics.NoMetric{} + } + l.log.Infof("TCP lifo listener config: %s", l) go l.listenExternal() go l.listenInternal() @@ -232,7 +239,7 @@ func (l *listener) listenExternal() { //lint:ignore SA1019 Temporary is deprecated in Go 1.18, but keep it for now (https://github.com/zalando/skipper/issues/1992) if nerr, ok := err.(net.Error); ok && nerr.Temporary() { delay = bounce(delay) - l.options.Log.Errorf( + l.log.Errorf( "queue listener: accept error: %v, retrying in %v", err, delay, @@ -308,10 +315,8 @@ func (l *listener) listenInternal() { ) } - if l.options.Metrics != nil { - l.options.Metrics.UpdateGauge(acceptedConnectionsKey, float64(concurrency)) - l.options.Metrics.UpdateGauge(queuedConnectionsKey, float64(queue.size)) - } + l.metrics.UpdateGauge(acceptedConnectionsKey, float64(concurrency)) + l.metrics.UpdateGauge(queuedConnectionsKey, float64(queue.size)) select { case conn := <-l.acceptExternal: @@ -361,7 +366,7 @@ func (l *listener) listenInternal() { // Closing the real listener in a separate goroutine is based on inspecting the // stdlib. It's fair to just log the errors. if err := l.externalListener.Close(); err != nil { - l.options.Log.Errorf("Failed to close network listener: %v.", err) + l.log.Errorf("Failed to close network listener: %v.", err) } if l.closedHook != nil { @@ -376,9 +381,7 @@ func (l *listener) listenInternal() { func (l *listener) Accept() (net.Conn, error) { select { case c := <-l.acceptInternal: - if l.options.Metrics != nil { - l.options.Metrics.MeasureSince(acceptLatencyKey, c.external.accepted) - } + l.metrics.MeasureSince(acceptLatencyKey, c.external.accepted) return c, nil case err := <-l.internalError: return nil, err From ad4fe67f382c0ef2a03e7cb74d0b563665f3b432 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sandor=20Sz=C3=BCcs?= Date: Wed, 16 Jul 2025 10:49:02 +0200 Subject: [PATCH 4/4] adapt NoMetric on upstream changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Sandor Szücs --- metrics/metrics.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index d100df4735..300e5c0d3d 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -109,8 +109,7 @@ func (NoMetric) MeasureBackend5xx(time.Time) {} func (NoMetric) IncErrorsStreaming(string) {} func (NoMetric) RegisterHandler(string, *http.ServeMux) {} func (NoMetric) UpdateGauge(string, float64) {} -func (NoMetric) IncValidRoutes() {} -func (NoMetric) IncInvalidRoutes(string) {} +func (NoMetric) UpdateInvalidRoute(map[string]int) {} func (NoMetric) Close() {} var _ Metrics = NoMetric{}