diff --git a/bigtable/internal/bidi/config_manager.go b/bigtable/internal/bidi/config_manager.go new file mode 100644 index 000000000000..16fc23cbfe53 --- /dev/null +++ b/bigtable/internal/bidi/config_manager.go @@ -0,0 +1,126 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bidi + +import ( + "context" + "fmt" + "math/rand" + "net/url" + "sync" + "time" + + btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb" + "google.golang.org/grpc/metadata" +) + +// ConfigManager handles fetching and applying client configuration. +type ConfigManager struct { + client btpb.BigtableClient + mu sync.RWMutex + currentConfig *btpb.ClientConfiguration + cancel context.CancelFunc +} + +// NewConfigManager creates a new ConfigManager. +func NewConfigManager(client btpb.BigtableClient) *ConfigManager { + return &ConfigManager{client: client} +} + +// GetClientConfiguration fetches the client configuration from the server. +func (m *ConfigManager) GetClientConfiguration(ctx context.Context, instanceName string, appProfileId string) (*btpb.ClientConfiguration, error) { + req := &btpb.GetClientConfigurationRequest{ + InstanceName: instanceName, + AppProfileId: appProfileId, + } + + requestParamsMD := metadata.Pairs(requestParamsHeader, + fmt.Sprintf("name=%s&app_profile_id=%s", url.QueryEscape(instanceName), url.QueryEscape(appProfileId))) + + originalContextMd, _ := metadata.FromOutgoingContext(ctx) + ctx = metadata.NewOutgoingContext(ctx, metadata.Join(originalContextMd, requestParamsMD)) + + return m.client.GetClientConfiguration(ctx, req) +} + +// ShouldUseSession returns true if the request should be routed to session protocol based on configuration. +func (m *ConfigManager) ShouldUseSession(config *btpb.ClientConfiguration) bool { + if config == nil || config.SessionConfiguration == nil { + return false + } + // Implement routing logic based on SessionLoad fraction. + // Returns true with probability equal to SessionLoad. + return rand.Float32() < config.SessionConfiguration.SessionLoad +} +func (m *ConfigManager) GetConfig() *btpb.ClientConfiguration { + m.mu.RLock() + defer m.mu.RUnlock() + return m.currentConfig +} + +func (m *ConfigManager) StartPolling(ctx context.Context, instanceName, appProfileId string) { + pollCtx, cancel := context.WithCancel(ctx) + m.mu.Lock() + m.cancel = cancel + m.mu.Unlock() + + go func() { + + ticker := time.NewTicker(1 * time.Hour) // Default interval to poll at beginning + defer ticker.Stop() + + for { + config, err := m.GetClientConfiguration(pollCtx, instanceName, appProfileId) + if err == nil && config != nil { + m.mu.Lock() + m.currentConfig = config + m.mu.Unlock() + + // Extract PollingInterval if available + var interval time.Duration + if config.Polling != nil { + switch p := config.Polling.(type) { + case *btpb.ClientConfiguration_StopPolling: + if p.StopPolling { + return + } + + case *btpb.ClientConfiguration_PollingConfiguration_: + if p.PollingConfiguration != nil && p.PollingConfiguration.PollingInterval != nil { + interval = time.Duration(p.PollingConfiguration.PollingInterval.Seconds) * time.Second + } + } + } + if interval > 0 { + ticker.Reset(interval) + } + } + + select { + case <-ticker.C: + case <-pollCtx.Done(): + return + } + } + }() +} + +func (m *ConfigManager) Close() { + m.mu.Lock() + defer m.mu.Unlock() + if m.cancel != nil { + m.cancel() + } +} diff --git a/bigtable/internal/bidi/config_manager_test.go b/bigtable/internal/bidi/config_manager_test.go new file mode 100644 index 000000000000..6cd3a8914849 --- /dev/null +++ b/bigtable/internal/bidi/config_manager_test.go @@ -0,0 +1,61 @@ +// Copyright 2026 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bidi + +import ( + "testing" + + btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb" +) + +func TestShouldUseSession(t *testing.T) { + cm := &ConfigManager{} + + t.Run("Always false on nil config", func(t *testing.T) { + if cm.ShouldUseSession(nil) { + t.Error("Expected false on nil config") + } + }) + + t.Run("Always false on nil SessionConfiguration", func(t *testing.T) { + config := &btpb.ClientConfiguration{} + if cm.ShouldUseSession(config) { + t.Error("Expected false on nil SessionConfiguration") + } + }) + + t.Run("Probabilistic routing", func(t *testing.T) { + config := &btpb.ClientConfiguration{ + SessionConfiguration: &btpb.SessionClientConfiguration{ + SessionLoad: 0.1, // 10% + }, + } + + iterations := 10000 + sessionCount := 0 + for i := 0; i < iterations; i++ { + if cm.ShouldUseSession(config) { + sessionCount++ + } + } + + expected := int(float32(iterations) * config.SessionConfiguration.SessionLoad) + tolerance := 200 // Allow some variance + + if sessionCount < expected-tolerance || sessionCount > expected+tolerance { + t.Errorf("Expected approximately %d session calls, got %d", expected, sessionCount) + } + }) +}