diff --git a/go.mod b/go.mod index e88fae0..e766047 100644 --- a/go.mod +++ b/go.mod @@ -18,13 +18,14 @@ require ( github.com/onsi/ginkgo/v2 v2.27.2 github.com/onsi/gomega v1.38.2 github.com/ontai-dev/conductor-sdk v0.0.0-00010101000000-000000000000 + github.com/ontai-dev/dispatcher v0.0.0-00010101000000-000000000000 github.com/ontai-dev/guardian v0.0.0-00010101000000-000000000000 github.com/ontai-dev/platform v0.0.0-00010101000000-000000000000 - github.com/ontai-dev/dispatcher v0.0.0-00010101000000-000000000000 github.com/ontai-dev/seam v0.0.0-00010101000000-000000000000 github.com/ontai-dev/seam-sdk v0.0.0-00010101000000-000000000000 github.com/prometheus/client_golang v1.23.2 github.com/siderolabs/talos/pkg/machinery v1.12.6 + golang.org/x/time v0.14.0 google.golang.org/grpc v1.79.3 gopkg.in/yaml.v3 v3.0.1 helm.sh/helm/v3 v3.17.3 @@ -158,7 +159,6 @@ require ( golang.org/x/sys v0.41.0 // indirect golang.org/x/term v0.40.0 // indirect golang.org/x/text v0.34.0 // indirect - golang.org/x/time v0.14.0 // indirect golang.org/x/tools v0.41.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect diff --git a/internal/federation/metrics.go b/internal/federation/metrics.go new file mode 100644 index 0000000..33e11c2 --- /dev/null +++ b/internal/federation/metrics.go @@ -0,0 +1,29 @@ +package federation + +import ( + "github.com/prometheus/client_golang/prometheus" + ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics" +) + +var ( + // metricActiveStreams tracks the current number of live streams accepted by FederationServer. + // ADR-F6 D4. + metricActiveStreams = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "conductor_federation_stream_active_count", + Help: "Current number of live streams accepted by FederationServer.", + }) + + // metricReconnectsTotal counts reconnect events observed per tenant cluster. + // ADR-F6 D4. + metricReconnectsTotal = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "conductor_federation_stream_reconnects_total", + Help: "Total number of stream reconnect events observed, labeled by cluster ID.", + }, + []string{"cluster_id"}, + ) +) + +func init() { + ctrlmetrics.Registry.MustRegister(metricActiveStreams, metricReconnectsTotal) +} diff --git a/internal/federation/server.go b/internal/federation/server.go index 6fa4daf..8f78ecb 100644 --- a/internal/federation/server.go +++ b/internal/federation/server.go @@ -7,8 +7,10 @@ import ( "fmt" "net" "sync" + "sync/atomic" "time" + "golang.org/x/time/rate" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -47,6 +49,19 @@ type clusterStatus struct { missedHeartbeats int } +// FederationServerOptions configures the stream admission limits for FederationServer. +// Zero values disable the corresponding limit. ADR-F6. +type FederationServerOptions struct { + // MaxConcurrentStreams is the maximum number of simultaneous active streams. + // When reached, new connections receive codes.ResourceExhausted. + // Must be in [1, 1000]; 0 means unlimited (no semaphore). Default: 50 via env. + MaxConcurrentStreams int + + // AdmissionRate is the token-bucket refill rate in tokens per second. + // Burst capacity is 2x this value. 0 means unlimited. Default: 5 via env. + AdmissionRate int +} + // FederationServer is the management-side federation gRPC server. // It listens on the federation port with mutual TLS, extracts cluster IDs // from client certificate SANs, and maintains the bidirectional stream with @@ -63,31 +78,82 @@ type FederationServer struct { mu sync.RWMutex // connectedClusters maps clusterID → stream status for heartbeat tracking. connectedClusters map[string]*clusterStatus + + // semaphore limits concurrent active streams. nil = unlimited. ADR-F6 D1. + semaphore chan struct{} + + // admissionLimiter rate-limits new stream accepts. nil = unlimited. ADR-F6 D2. + admissionLimiter *rate.Limiter + + // activeCount is the live stream count, kept in sync with the semaphore. ADR-F6 D4. + activeCount atomic.Int64 } // NewFederationServer constructs a FederationServer from certificate paths. // The server does not start until Start is called. // conductor-schema.md §18. -func NewFederationServer(caCertPath, serverCertPath, serverKeyPath string, kubeClient kubernetes.Interface) (*FederationServer, error) { +func NewFederationServer(caCertPath, serverCertPath, serverKeyPath string, kubeClient kubernetes.Interface, opts FederationServerOptions) (*FederationServer, error) { tlsCfg, err := BuildServerTLSConfig(caCertPath, serverCertPath, serverKeyPath) if err != nil { return nil, fmt.Errorf("federation server TLS config: %w", err) } - return &FederationServer{ - tlsCfg: tlsCfg, - kubeClient: kubeClient, - connectedClusters: make(map[string]*clusterStatus), - }, nil + return newFederationServer(tlsCfg, kubeClient, opts), nil } // NewFederationServerFromTLS constructs a FederationServer from an already-built // tls.Config. Used in tests to inject a test TLS config directly. -func NewFederationServerFromTLS(tlsCfg *tls.Config, kubeClient kubernetes.Interface) *FederationServer { - return &FederationServer{ +func NewFederationServerFromTLS(tlsCfg *tls.Config, kubeClient kubernetes.Interface, opts FederationServerOptions) *FederationServer { + return newFederationServer(tlsCfg, kubeClient, opts) +} + +func newFederationServer(tlsCfg *tls.Config, kubeClient kubernetes.Interface, opts FederationServerOptions) *FederationServer { + s := &FederationServer{ tlsCfg: tlsCfg, kubeClient: kubeClient, connectedClusters: make(map[string]*clusterStatus), } + if opts.MaxConcurrentStreams > 0 { + s.semaphore = make(chan struct{}, opts.MaxConcurrentStreams) + } + if opts.AdmissionRate > 0 { + s.admissionLimiter = rate.NewLimiter(rate.Limit(opts.AdmissionRate), 2*opts.AdmissionRate) + } + return s +} + +// ActiveStreamCount returns the number of currently active streams. ADR-F6 D4. +func (s *FederationServer) ActiveStreamCount() int64 { + return s.activeCount.Load() +} + +// ParseFederationMaxStreams parses FEDERATION_MAX_CONCURRENT_STREAMS env value. +// Valid range: [1, 1000]. Returns 50 (default) if empty, 0 on invalid input. +// ADR-F6 D1. +func ParseFederationMaxStreams(v string) int { + if v == "" { + return 50 + } + var n int + if _, err := fmt.Sscanf(v, "%d", &n); err != nil || n <= 0 || n > 1000 { + fmt.Printf("federation server: invalid FEDERATION_MAX_CONCURRENT_STREAMS %q (must be 1-1000) — using default 50\n", v) + return 50 + } + return n +} + +// ParseFederationAdmissionRate parses FEDERATION_ADMISSION_RATE env value. +// Returns 5 (default) if empty, 0 on invalid input (disables rate limiting). +// ADR-F6 D2. +func ParseFederationAdmissionRate(v string) int { + if v == "" { + return 5 + } + var n int + if _, err := fmt.Sscanf(v, "%d", &n); err != nil || n <= 0 { + fmt.Printf("federation server: invalid FEDERATION_ADMISSION_RATE %q (must be >0) — using default 5\n", v) + return 5 + } + return n } // Start begins listening on addr and serves the federation gRPC stream with mutual TLS. @@ -138,12 +204,39 @@ func (s *FederationServer) ConnectedClusterIDs() []string { // federationStream handles a single bidirectional stream from a connected tenant. // It implements the grpc.ServerStream interface handler for the FederationService/Stream method. func (s *FederationServer) federationStream(stream grpc.ServerStream) error { + // D2: admission rate-limit check before semaphore acquisition. ADR-F6. + if s.admissionLimiter != nil && !s.admissionLimiter.Allow() { + return status.Errorf(codes.ResourceExhausted, "federation server: admission rate limit exceeded") + } + + // D1: semaphore -- reject when max concurrent streams reached. ADR-F6. + if s.semaphore != nil { + select { + case s.semaphore <- struct{}{}: + // slot acquired + default: + return status.Errorf(codes.ResourceExhausted, "federation server: max concurrent stream limit reached") + } + defer func() { <-s.semaphore }() + } + + // Track active count and update the Prometheus gauge. ADR-F6 D4. + s.activeCount.Add(1) + metricActiveStreams.Inc() + defer func() { + s.activeCount.Add(-1) + metricActiveStreams.Dec() + }() + // Extract cluster ID from the peer TLS certificate SAN. clusterID, err := s.clusterIDFromStream(stream) if err != nil { return status.Errorf(codes.Unauthenticated, "cluster ID extraction: %v", err) } + // Count this as a reconnect event (every stream accept = one connection). ADR-F6 D4. + metricReconnectsTotal.WithLabelValues(clusterID).Inc() + // Register this cluster as connected. cs := &clusterStatus{} s.mu.Lock() diff --git a/internal/federation/server_pool_test.go b/internal/federation/server_pool_test.go new file mode 100644 index 0000000..8816c6a --- /dev/null +++ b/internal/federation/server_pool_test.go @@ -0,0 +1,155 @@ +package federation + +import ( + "sync" + "testing" + "time" +) + +// acquireSlot tests the semaphore directly, bypassing TLS cert extraction. +// Returns true if the slot was acquired (semaphore not full), false otherwise. +func acquireSlot(s *FederationServer) (release func(), ok bool) { + if s.semaphore == nil { + return func() {}, true + } + select { + case s.semaphore <- struct{}{}: + return func() { <-s.semaphore }, true + default: + return nil, false + } +} + +// TestFederationServer_RejectsWhenLimitReached verifies that a server with +// limit=2 rejects the third concurrent connection with RESOURCE_EXHAUSTED. +// ADR-F6 D1. +func TestFederationServer_RejectsWhenLimitReached(t *testing.T) { + opts := FederationServerOptions{MaxConcurrentStreams: 2} + s := newFederationServer(nil, nil, opts) + + // Acquire both slots. + rel1, ok1 := acquireSlot(s) + if !ok1 { + t.Fatal("expected slot 1 to be acquired") + } + defer rel1() + rel2, ok2 := acquireSlot(s) + if !ok2 { + t.Fatal("expected slot 2 to be acquired") + } + defer rel2() + + // Third attempt must be rejected. + _, ok3 := acquireSlot(s) + if ok3 { + t.Error("expected slot 3 to be rejected (limit=2 reached)") + } +} + +// TestFederationServer_AdmitsUpToLimit verifies that a server with limit=2 +// admits exactly two concurrent streams and both are recorded as active. +// ADR-F6 D1. +func TestFederationServer_AdmitsUpToLimit(t *testing.T) { + opts := FederationServerOptions{MaxConcurrentStreams: 2} + s := newFederationServer(nil, nil, opts) + + var mu sync.Mutex + admitted := 0 + + var wg sync.WaitGroup + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + defer wg.Done() + rel, ok := acquireSlot(s) + if !ok { + return + } + defer rel() + mu.Lock() + admitted++ + mu.Unlock() + // Hold the slot briefly. + time.Sleep(10 * time.Millisecond) + }() + } + wg.Wait() + + if admitted != 2 { + t.Errorf("expected 2 admitted streams, got %d", admitted) + } +} + +// TestActiveStreamCount_DecreasesOnDisconnect verifies that ActiveStreamCount +// increments when a slot is acquired and decrements when it is released. +// ADR-F6 D4. +func TestActiveStreamCount_DecreasesOnDisconnect(t *testing.T) { + opts := FederationServerOptions{MaxConcurrentStreams: 5} + s := newFederationServer(nil, nil, opts) + + if n := s.ActiveStreamCount(); n != 0 { + t.Fatalf("expected ActiveStreamCount=0 before any stream, got %d", n) + } + + // Simulate what federationStream does: acquire semaphore + track activeCount. + rel, ok := acquireSlot(s) + if !ok { + t.Fatal("expected slot to be acquired") + } + s.activeCount.Add(1) + metricActiveStreams.Inc() + + if n := s.ActiveStreamCount(); n != 1 { + t.Errorf("expected ActiveStreamCount=1 after connect, got %d", n) + } + + // Simulate disconnect. + s.activeCount.Add(-1) + metricActiveStreams.Dec() + rel() + + if n := s.ActiveStreamCount(); n != 0 { + t.Errorf("expected ActiveStreamCount=0 after disconnect, got %d", n) + } +} + +// TestParseFederationMaxStreams verifies the env var parser. ADR-F6 D1. +func TestParseFederationMaxStreams(t *testing.T) { + cases := []struct { + input string + want int + }{ + {"", 50}, + {"10", 10}, + {"1000", 1000}, + {"0", 50}, // out of range: default + {"1001", 50}, // out of range: default + {"bad", 50}, // invalid: default + {"-5", 50}, // negative: default + } + for _, tc := range cases { + if got := ParseFederationMaxStreams(tc.input); got != tc.want { + t.Errorf("ParseFederationMaxStreams(%q) = %d, want %d", tc.input, got, tc.want) + } + } +} + +// TestParseFederationAdmissionRate verifies the env var parser. ADR-F6 D2. +func TestParseFederationAdmissionRate(t *testing.T) { + cases := []struct { + input string + want int + }{ + {"", 5}, + {"10", 10}, + {"1", 1}, + {"0", 5}, // zero invalid: default + {"-1", 5}, // negative: default + {"bad", 5}, // invalid: default + } + for _, tc := range cases { + if got := ParseFederationAdmissionRate(tc.input); got != tc.want { + t.Errorf("ParseFederationAdmissionRate(%q) = %d, want %d", tc.input, got, tc.want) + } + } +} diff --git a/internal/kernel/agent.go b/internal/kernel/agent.go index 19d766a..b52f655 100644 --- a/internal/kernel/agent.go +++ b/internal/kernel/agent.go @@ -354,7 +354,11 @@ func RunAgent(goCtx context.Context, execCtx config.ExecutionContext, client kub if fedCACertPath != "" && fedServerCertPath != "" && fedServerKeyPath != "" { // Management Conductor: start the federation server. - fedServer, fedErr := federation.NewFederationServer(fedCACertPath, fedServerCertPath, fedServerKeyPath, nil) + fedOpts := federation.FederationServerOptions{ + MaxConcurrentStreams: federation.ParseFederationMaxStreams(os.Getenv("FEDERATION_MAX_CONCURRENT_STREAMS")), + AdmissionRate: federation.ParseFederationAdmissionRate(os.Getenv("FEDERATION_ADMISSION_RATE")), + } + fedServer, fedErr := federation.NewFederationServer(fedCACertPath, fedServerCertPath, fedServerKeyPath, nil, fedOpts) if fedErr != nil { return fmt.Errorf("conductor agent: build federation server: %w", fedErr) } diff --git a/test/integration/federation/stream_integration_test.go b/test/integration/federation/stream_integration_test.go index cb12e8b..4ffa890 100644 --- a/test/integration/federation/stream_integration_test.go +++ b/test/integration/federation/stream_integration_test.go @@ -209,7 +209,7 @@ func TestStream_HeartBeat_ServerRespondsWithACK(t *testing.T) { if err != nil { t.Fatalf("server TLS: %v", err) } - srv := federation.NewFederationServerFromTLS(serverTLS, nil) + srv := federation.NewFederationServerFromTLS(serverTLS, nil, federation.FederationServerOptions{}) addr, _ := startStreamServer(t, srv) clientTLS, err := federation.BuildClientTLSConfig(caPath, clientCertPath, clientKeyPath) @@ -264,7 +264,7 @@ func TestStream_AuditEventBatch_ServerRespondsWithAck(t *testing.T) { t.Fatalf("server TLS: %v", err) } // kubeClient is nil — server skips ConfigMap creation but still ACKs. - srv := federation.NewFederationServerFromTLS(serverTLS, nil) + srv := federation.NewFederationServerFromTLS(serverTLS, nil, federation.FederationServerOptions{}) addr, _ := startStreamServer(t, srv) clientTLS, err := federation.BuildClientTLSConfig(caPath, clientCertPath, clientKeyPath) @@ -327,7 +327,7 @@ func TestStream_ClusterID_ExtractedFromClientCert(t *testing.T) { if err != nil { t.Fatalf("server TLS: %v", err) } - srv := federation.NewFederationServerFromTLS(serverTLS, nil) + srv := federation.NewFederationServerFromTLS(serverTLS, nil, federation.FederationServerOptions{}) addr, _ := startStreamServer(t, srv) clientTLS, err := federation.BuildClientTLSConfig(caPath, clientCertPath, clientKeyPath) @@ -374,7 +374,7 @@ func TestStream_WALReplay_OnReconnect(t *testing.T) { if err != nil { t.Fatalf("server TLS: %v", err) } - srv := federation.NewFederationServerFromTLS(serverTLS, nil) + srv := federation.NewFederationServerFromTLS(serverTLS, nil, federation.FederationServerOptions{}) addr, _ := startStreamServer(t, srv) // Pre-populate WAL with 3 entries; ACK sequence 1. diff --git a/test/unit/federation/federation_stream_test.go b/test/unit/federation/federation_stream_test.go index 6ba1aec..2eac4f6 100644 --- a/test/unit/federation/federation_stream_test.go +++ b/test/unit/federation/federation_stream_test.go @@ -38,7 +38,7 @@ func setupStreamTest(t *testing.T) *streamTestEnv { serverCertPath, serverKeyPath, caPath := writeTempCerts(t, serverCertPEM, serverKeyPEM, ca.caPEM()) // Use a fake kubeClient in tests (nil — server skips ConfigMap creation). - srv, err := federation.NewFederationServer(caPath, serverCertPath, serverKeyPath, nil) + srv, err := federation.NewFederationServer(caPath, serverCertPath, serverKeyPath, nil, federation.FederationServerOptions{}) if err != nil { t.Fatalf("NewFederationServer: %v", err) } diff --git a/test/unit/federation/federation_tls_test.go b/test/unit/federation/federation_tls_test.go index 03d347f..2def2ca 100644 --- a/test/unit/federation/federation_tls_test.go +++ b/test/unit/federation/federation_tls_test.go @@ -286,7 +286,7 @@ func TestFederationServer_gRPC_AcceptsValidCert(t *testing.T) { serverCertPEM, serverKeyPEM := ca.issueServerCert(t, []string{"localhost"}) serverCertPath, serverKeyPath, caPath := writeTempCerts(t, serverCertPEM, serverKeyPEM, ca.caPEM()) - srv, err := federation.NewFederationServer(caPath, serverCertPath, serverKeyPath, nil) + srv, err := federation.NewFederationServer(caPath, serverCertPath, serverKeyPath, nil, federation.FederationServerOptions{}) if err != nil { t.Fatalf("NewFederationServer: %v", err) } @@ -334,7 +334,7 @@ func TestFederationServer_gRPC_RejectsNoCert(t *testing.T) { serverCertPEM, serverKeyPEM := ca.issueServerCert(t, []string{"localhost"}) serverCertPath, serverKeyPath, caPath := writeTempCerts(t, serverCertPEM, serverKeyPEM, ca.caPEM()) - srv, err := federation.NewFederationServer(caPath, serverCertPath, serverKeyPath, nil) + srv, err := federation.NewFederationServer(caPath, serverCertPath, serverKeyPath, nil, federation.FederationServerOptions{}) if err != nil { t.Fatalf("NewFederationServer: %v", err) } @@ -389,7 +389,7 @@ func TestFederationServer_gRPC_RejectsWrongCA(t *testing.T) { serverCertPEM, serverKeyPEM := serverCA.issueServerCert(t, []string{"localhost"}) serverCertPath, serverKeyPath, caPath := writeTempCerts(t, serverCertPEM, serverKeyPEM, serverCA.caPEM()) - srv, err := federation.NewFederationServer(caPath, serverCertPath, serverKeyPath, nil) + srv, err := federation.NewFederationServer(caPath, serverCertPath, serverKeyPath, nil, federation.FederationServerOptions{}) if err != nil { t.Fatalf("NewFederationServer: %v", err) } @@ -460,7 +460,7 @@ func TestFederationClient_ClusterIDExtraction(t *testing.T) { serverCertPEM, serverKeyPEM := ca.issueServerCert(t, []string{"localhost"}) serverCertPath, serverKeyPath, caPath := writeTempCerts(t, serverCertPEM, serverKeyPEM, ca.caPEM()) - srv, err := federation.NewFederationServer(caPath, serverCertPath, serverKeyPath, nil) + srv, err := federation.NewFederationServer(caPath, serverCertPath, serverKeyPath, nil, federation.FederationServerOptions{}) if err != nil { t.Fatalf("NewFederationServer: %v", err) }