Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ require (
github.com/onsi/ginkgo/v2 v2.27.2
github.com/onsi/gomega v1.38.2
github.com/ontai-dev/conductor-sdk v0.0.0-00010101000000-000000000000
github.com/ontai-dev/dispatcher v0.0.0-00010101000000-000000000000
github.com/ontai-dev/guardian v0.0.0-00010101000000-000000000000
github.com/ontai-dev/platform v0.0.0-00010101000000-000000000000
github.com/ontai-dev/dispatcher v0.0.0-00010101000000-000000000000
github.com/ontai-dev/seam v0.0.0-00010101000000-000000000000
github.com/ontai-dev/seam-sdk v0.0.0-00010101000000-000000000000
github.com/prometheus/client_golang v1.23.2
github.com/siderolabs/talos/pkg/machinery v1.12.6
golang.org/x/time v0.14.0
google.golang.org/grpc v1.79.3
gopkg.in/yaml.v3 v3.0.1
helm.sh/helm/v3 v3.17.3
Expand Down Expand Up @@ -158,7 +159,6 @@ require (
golang.org/x/sys v0.41.0 // indirect
golang.org/x/term v0.40.0 // indirect
golang.org/x/text v0.34.0 // indirect
golang.org/x/time v0.14.0 // indirect
golang.org/x/tools v0.41.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251202230838-ff82c1b0f217 // indirect
Expand Down
29 changes: 29 additions & 0 deletions internal/federation/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package federation

import (
"github.com/prometheus/client_golang/prometheus"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"
)

var (
// metricActiveStreams tracks the current number of live streams accepted by FederationServer.
// ADR-F6 D4.
metricActiveStreams = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "conductor_federation_stream_active_count",
Help: "Current number of live streams accepted by FederationServer.",
})

// metricReconnectsTotal counts reconnect events observed per tenant cluster.
// ADR-F6 D4.
metricReconnectsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "conductor_federation_stream_reconnects_total",
Help: "Total number of stream reconnect events observed, labeled by cluster ID.",
},
[]string{"cluster_id"},
)
)

func init() {
ctrlmetrics.Registry.MustRegister(metricActiveStreams, metricReconnectsTotal)
}
109 changes: 101 additions & 8 deletions internal/federation/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"fmt"
"net"
"sync"
"sync/atomic"
"time"

"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
Expand Down Expand Up @@ -47,6 +49,19 @@ type clusterStatus struct {
missedHeartbeats int
}

// FederationServerOptions configures the stream admission limits for FederationServer.
// Zero values disable the corresponding limit. ADR-F6.
type FederationServerOptions struct {
// MaxConcurrentStreams is the maximum number of simultaneous active streams.
// When reached, new connections receive codes.ResourceExhausted.
// Must be in [1, 1000]; 0 means unlimited (no semaphore). Default: 50 via env.
MaxConcurrentStreams int

// AdmissionRate is the token-bucket refill rate in tokens per second.
// Burst capacity is 2x this value. 0 means unlimited. Default: 5 via env.
AdmissionRate int
}

// FederationServer is the management-side federation gRPC server.
// It listens on the federation port with mutual TLS, extracts cluster IDs
// from client certificate SANs, and maintains the bidirectional stream with
Expand All @@ -63,31 +78,82 @@ type FederationServer struct {
mu sync.RWMutex
// connectedClusters maps clusterID → stream status for heartbeat tracking.
connectedClusters map[string]*clusterStatus

// semaphore limits concurrent active streams. nil = unlimited. ADR-F6 D1.
semaphore chan struct{}

// admissionLimiter rate-limits new stream accepts. nil = unlimited. ADR-F6 D2.
admissionLimiter *rate.Limiter

// activeCount is the live stream count, kept in sync with the semaphore. ADR-F6 D4.
activeCount atomic.Int64
}

// NewFederationServer constructs a FederationServer from certificate paths.
// The server does not start until Start is called.
// conductor-schema.md §18.
func NewFederationServer(caCertPath, serverCertPath, serverKeyPath string, kubeClient kubernetes.Interface) (*FederationServer, error) {
func NewFederationServer(caCertPath, serverCertPath, serverKeyPath string, kubeClient kubernetes.Interface, opts FederationServerOptions) (*FederationServer, error) {
tlsCfg, err := BuildServerTLSConfig(caCertPath, serverCertPath, serverKeyPath)
if err != nil {
return nil, fmt.Errorf("federation server TLS config: %w", err)
}
return &FederationServer{
tlsCfg: tlsCfg,
kubeClient: kubeClient,
connectedClusters: make(map[string]*clusterStatus),
}, nil
return newFederationServer(tlsCfg, kubeClient, opts), nil
}

// NewFederationServerFromTLS constructs a FederationServer from an already-built
// tls.Config. Used in tests to inject a test TLS config directly.
func NewFederationServerFromTLS(tlsCfg *tls.Config, kubeClient kubernetes.Interface) *FederationServer {
return &FederationServer{
func NewFederationServerFromTLS(tlsCfg *tls.Config, kubeClient kubernetes.Interface, opts FederationServerOptions) *FederationServer {
return newFederationServer(tlsCfg, kubeClient, opts)
}

func newFederationServer(tlsCfg *tls.Config, kubeClient kubernetes.Interface, opts FederationServerOptions) *FederationServer {
s := &FederationServer{
tlsCfg: tlsCfg,
kubeClient: kubeClient,
connectedClusters: make(map[string]*clusterStatus),
}
if opts.MaxConcurrentStreams > 0 {
s.semaphore = make(chan struct{}, opts.MaxConcurrentStreams)
}
if opts.AdmissionRate > 0 {
s.admissionLimiter = rate.NewLimiter(rate.Limit(opts.AdmissionRate), 2*opts.AdmissionRate)
}
return s
}

// ActiveStreamCount returns the number of currently active streams. ADR-F6 D4.
func (s *FederationServer) ActiveStreamCount() int64 {
return s.activeCount.Load()
}

// ParseFederationMaxStreams parses FEDERATION_MAX_CONCURRENT_STREAMS env value.
// Valid range: [1, 1000]. Returns 50 (default) if empty, 0 on invalid input.
// ADR-F6 D1.
func ParseFederationMaxStreams(v string) int {
if v == "" {
return 50
}
var n int
if _, err := fmt.Sscanf(v, "%d", &n); err != nil || n <= 0 || n > 1000 {
fmt.Printf("federation server: invalid FEDERATION_MAX_CONCURRENT_STREAMS %q (must be 1-1000) — using default 50\n", v)
return 50
}
return n
}

// ParseFederationAdmissionRate parses FEDERATION_ADMISSION_RATE env value.
// Returns 5 (default) if empty, 0 on invalid input (disables rate limiting).
// ADR-F6 D2.
func ParseFederationAdmissionRate(v string) int {
if v == "" {
return 5
}
var n int
if _, err := fmt.Sscanf(v, "%d", &n); err != nil || n <= 0 {
fmt.Printf("federation server: invalid FEDERATION_ADMISSION_RATE %q (must be >0) — using default 5\n", v)
return 5
}
return n
}

// Start begins listening on addr and serves the federation gRPC stream with mutual TLS.
Expand Down Expand Up @@ -138,12 +204,39 @@ func (s *FederationServer) ConnectedClusterIDs() []string {
// federationStream handles a single bidirectional stream from a connected tenant.
// It implements the grpc.ServerStream interface handler for the FederationService/Stream method.
func (s *FederationServer) federationStream(stream grpc.ServerStream) error {
// D2: admission rate-limit check before semaphore acquisition. ADR-F6.
if s.admissionLimiter != nil && !s.admissionLimiter.Allow() {
return status.Errorf(codes.ResourceExhausted, "federation server: admission rate limit exceeded")
}

// D1: semaphore -- reject when max concurrent streams reached. ADR-F6.
if s.semaphore != nil {
select {
case s.semaphore <- struct{}{}:
// slot acquired
default:
return status.Errorf(codes.ResourceExhausted, "federation server: max concurrent stream limit reached")
}
defer func() { <-s.semaphore }()
}

// Track active count and update the Prometheus gauge. ADR-F6 D4.
s.activeCount.Add(1)
metricActiveStreams.Inc()
defer func() {
s.activeCount.Add(-1)
metricActiveStreams.Dec()
}()

// Extract cluster ID from the peer TLS certificate SAN.
clusterID, err := s.clusterIDFromStream(stream)
if err != nil {
return status.Errorf(codes.Unauthenticated, "cluster ID extraction: %v", err)
}

// Count this as a reconnect event (every stream accept = one connection). ADR-F6 D4.
metricReconnectsTotal.WithLabelValues(clusterID).Inc()

// Register this cluster as connected.
cs := &clusterStatus{}
s.mu.Lock()
Expand Down
155 changes: 155 additions & 0 deletions internal/federation/server_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package federation

import (
"sync"
"testing"
"time"
)

// acquireSlot tests the semaphore directly, bypassing TLS cert extraction.
// Returns true if the slot was acquired (semaphore not full), false otherwise.
func acquireSlot(s *FederationServer) (release func(), ok bool) {
if s.semaphore == nil {
return func() {}, true
}
select {
case s.semaphore <- struct{}{}:
return func() { <-s.semaphore }, true
default:
return nil, false
}
}

// TestFederationServer_RejectsWhenLimitReached verifies that a server with
// limit=2 rejects the third concurrent connection with RESOURCE_EXHAUSTED.
// ADR-F6 D1.
func TestFederationServer_RejectsWhenLimitReached(t *testing.T) {
opts := FederationServerOptions{MaxConcurrentStreams: 2}
s := newFederationServer(nil, nil, opts)

// Acquire both slots.
rel1, ok1 := acquireSlot(s)
if !ok1 {
t.Fatal("expected slot 1 to be acquired")
}
defer rel1()
rel2, ok2 := acquireSlot(s)
if !ok2 {
t.Fatal("expected slot 2 to be acquired")
}
defer rel2()

// Third attempt must be rejected.
_, ok3 := acquireSlot(s)
if ok3 {
t.Error("expected slot 3 to be rejected (limit=2 reached)")
}
}

// TestFederationServer_AdmitsUpToLimit verifies that a server with limit=2
// admits exactly two concurrent streams and both are recorded as active.
// ADR-F6 D1.
func TestFederationServer_AdmitsUpToLimit(t *testing.T) {
opts := FederationServerOptions{MaxConcurrentStreams: 2}
s := newFederationServer(nil, nil, opts)

var mu sync.Mutex
admitted := 0

var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
rel, ok := acquireSlot(s)
if !ok {
return
}
defer rel()
mu.Lock()
admitted++
mu.Unlock()
// Hold the slot briefly.
time.Sleep(10 * time.Millisecond)
}()
}
wg.Wait()

if admitted != 2 {
t.Errorf("expected 2 admitted streams, got %d", admitted)
}
}

// TestActiveStreamCount_DecreasesOnDisconnect verifies that ActiveStreamCount
// increments when a slot is acquired and decrements when it is released.
// ADR-F6 D4.
func TestActiveStreamCount_DecreasesOnDisconnect(t *testing.T) {
opts := FederationServerOptions{MaxConcurrentStreams: 5}
s := newFederationServer(nil, nil, opts)

if n := s.ActiveStreamCount(); n != 0 {
t.Fatalf("expected ActiveStreamCount=0 before any stream, got %d", n)
}

// Simulate what federationStream does: acquire semaphore + track activeCount.
rel, ok := acquireSlot(s)
if !ok {
t.Fatal("expected slot to be acquired")
}
s.activeCount.Add(1)
metricActiveStreams.Inc()

if n := s.ActiveStreamCount(); n != 1 {
t.Errorf("expected ActiveStreamCount=1 after connect, got %d", n)
}

// Simulate disconnect.
s.activeCount.Add(-1)
metricActiveStreams.Dec()
rel()

if n := s.ActiveStreamCount(); n != 0 {
t.Errorf("expected ActiveStreamCount=0 after disconnect, got %d", n)
}
}

// TestParseFederationMaxStreams verifies the env var parser. ADR-F6 D1.
func TestParseFederationMaxStreams(t *testing.T) {
cases := []struct {
input string
want int
}{
{"", 50},
{"10", 10},
{"1000", 1000},
{"0", 50}, // out of range: default
{"1001", 50}, // out of range: default
{"bad", 50}, // invalid: default
{"-5", 50}, // negative: default
}
for _, tc := range cases {
if got := ParseFederationMaxStreams(tc.input); got != tc.want {
t.Errorf("ParseFederationMaxStreams(%q) = %d, want %d", tc.input, got, tc.want)
}
}
}

// TestParseFederationAdmissionRate verifies the env var parser. ADR-F6 D2.
func TestParseFederationAdmissionRate(t *testing.T) {
cases := []struct {
input string
want int
}{
{"", 5},
{"10", 10},
{"1", 1},
{"0", 5}, // zero invalid: default
{"-1", 5}, // negative: default
{"bad", 5}, // invalid: default
}
for _, tc := range cases {
if got := ParseFederationAdmissionRate(tc.input); got != tc.want {
t.Errorf("ParseFederationAdmissionRate(%q) = %d, want %d", tc.input, got, tc.want)
}
}
}
6 changes: 5 additions & 1 deletion internal/kernel/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,11 @@ func RunAgent(goCtx context.Context, execCtx config.ExecutionContext, client kub

if fedCACertPath != "" && fedServerCertPath != "" && fedServerKeyPath != "" {
// Management Conductor: start the federation server.
fedServer, fedErr := federation.NewFederationServer(fedCACertPath, fedServerCertPath, fedServerKeyPath, nil)
fedOpts := federation.FederationServerOptions{
MaxConcurrentStreams: federation.ParseFederationMaxStreams(os.Getenv("FEDERATION_MAX_CONCURRENT_STREAMS")),
AdmissionRate: federation.ParseFederationAdmissionRate(os.Getenv("FEDERATION_ADMISSION_RATE")),
}
fedServer, fedErr := federation.NewFederationServer(fedCACertPath, fedServerCertPath, fedServerKeyPath, nil, fedOpts)
if fedErr != nil {
return fmt.Errorf("conductor agent: build federation server: %w", fedErr)
}
Expand Down
Loading
Loading