Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 126 additions & 0 deletions bigtable/internal/bidi/config_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bidi

import (
"context"
"fmt"
"math/rand"
"net/url"
"sync"
"time"

btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
"google.golang.org/grpc/metadata"
)

// ConfigManager handles fetching and applying client configuration.
type ConfigManager struct {
client btpb.BigtableClient
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The RPC call to GetClientConfiguration lacks a timeout. If the network hangs, the polling goroutine could be blocked indefinitely. Ensure the RPC's context is tied to a lifecycle with a timeout to prevent resource leaks or long delays.

References
  1. Propagate the RPC's context to factory calls that create new gRPC connections to ensure that connection attempts are tied to the RPC's lifecycle and can be cancelled or timed out, preventing resource leaks or long delays.

mu sync.RWMutex
currentConfig *btpb.ClientConfiguration
cancel context.CancelFunc
}

// NewConfigManager creates a new ConfigManager.
func NewConfigManager(client btpb.BigtableClient) *ConfigManager {
return &ConfigManager{client: client}
}

// GetClientConfiguration fetches the client configuration from the server.
Comment on lines +35 to +42
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

ShouldUseSession is defined as a method on ConfigManager but it doesn't use the manager's internal state, requiring the caller to pass the config manually. It would be more idiomatic for the manager to provide a parameterless ShouldUseSession() method that uses m.currentConfig. Avoid converting this to a standalone helper function to maintain API consistency and follow repository patterns.

References
  1. Avoid redundant top-level functions if a similar method exists on a struct, especially if it leads to API inconsistency or deviates from parity with other SDKs.

func (m *ConfigManager) GetClientConfiguration(ctx context.Context, instanceName string, appProfileId string) (*btpb.ClientConfiguration, error) {

Check failure on line 43 in bigtable/internal/bidi/config_manager.go

View workflow job for this annotation

GitHub Actions / vet

method parameter appProfileId should be appProfileID
req := &btpb.GetClientConfigurationRequest{
InstanceName: instanceName,
AppProfileId: appProfileId,
}

requestParamsMD := metadata.Pairs(requestParamsHeader,
fmt.Sprintf("name=%s&app_profile_id=%s", url.QueryEscape(instanceName), url.QueryEscape(appProfileId)))

originalContextMd, _ := metadata.FromOutgoingContext(ctx)
ctx = metadata.NewOutgoingContext(ctx, metadata.Join(originalContextMd, requestParamsMD))

return m.client.GetClientConfiguration(ctx, req)
Comment on lines +49 to +55
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Calling StartPolling multiple times will overwrite the m.cancel function, leading to a leak of previous polling goroutines because they can no longer be stopped via Close(). Additionally, the goroutine should ensure m.cancel is cleared upon exit so that polling can be restarted if needed. The suggested fix uses a lock to prevent concurrent starts and ensures the field is cleared upon exit, while avoiding performing any slow operations while holding the exclusive lock.

func (m *ConfigManager) StartPolling(ctx context.Context, instanceName, appProfileId string) {
	m.mu.Lock()
	if m.cancel != nil {
		m.mu.Unlock()
		return
	}
	pollCtx, cancel := context.WithCancel(ctx)
	m.cancel = cancel
	m.mu.Unlock()

	go func() {
		defer func() {
			cancel()
			m.mu.Lock()
			m.cancel = nil
			m.mu.Unlock()
		}()
References
  1. Avoid performing slow operations, such as gRPC channel creation, while holding an exclusive lock, as this can block other operations and slow down the entire client.

}

// ShouldUseSession returns true if the request should be routed to session protocol based on configuration.
func (m *ConfigManager) ShouldUseSession(config *btpb.ClientConfiguration) bool {
if config == nil || config.SessionConfiguration == nil {
return false
}
// Implement routing logic based on SessionLoad fraction.
// Returns true with probability equal to SessionLoad.
return rand.Float32() < config.SessionConfiguration.SessionLoad
}
func (m *ConfigManager) GetConfig() *btpb.ClientConfiguration {

Check failure on line 67 in bigtable/internal/bidi/config_manager.go

View workflow job for this annotation

GitHub Actions / vet

exported method ConfigManager.GetConfig should have comment or be unexported
m.mu.RLock()
defer m.mu.RUnlock()
return m.currentConfig
}

func (m *ConfigManager) StartPolling(ctx context.Context, instanceName, appProfileId string) {

Check failure on line 73 in bigtable/internal/bidi/config_manager.go

View workflow job for this annotation

GitHub Actions / vet

method parameter appProfileId should be appProfileID

Check failure on line 73 in bigtable/internal/bidi/config_manager.go

View workflow job for this annotation

GitHub Actions / vet

exported method ConfigManager.StartPolling should have comment or be unexported
pollCtx, cancel := context.WithCancel(ctx)
m.mu.Lock()
m.cancel = cancel
m.mu.Unlock()

go func() {

ticker := time.NewTicker(1 * time.Hour) // Default interval to poll at beginning
defer ticker.Stop()

for {
config, err := m.GetClientConfiguration(pollCtx, instanceName, appProfileId)
if err == nil && config != nil {
m.mu.Lock()
m.currentConfig = config
m.mu.Unlock()

// Extract PollingInterval if available
var interval time.Duration
if config.Polling != nil {
switch p := config.Polling.(type) {
case *btpb.ClientConfiguration_StopPolling:
if p.StopPolling {
return
}

case *btpb.ClientConfiguration_PollingConfiguration_:
if p.PollingConfiguration != nil && p.PollingConfiguration.PollingInterval != nil {
interval = time.Duration(p.PollingConfiguration.PollingInterval.Seconds) * time.Second
}
}
}
if interval > 0 {
ticker.Reset(interval)
}
}

select {
case <-ticker.C:
case <-pollCtx.Done():
return
}
}
}()
}

func (m *ConfigManager) Close() {

Check failure on line 120 in bigtable/internal/bidi/config_manager.go

View workflow job for this annotation

GitHub Actions / vet

exported method ConfigManager.Close should have comment or be unexported
m.mu.Lock()
defer m.mu.Unlock()
if m.cancel != nil {
m.cancel()
}
}
61 changes: 61 additions & 0 deletions bigtable/internal/bidi/config_manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package bidi

import (
"testing"

btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
)

func TestShouldUseSession(t *testing.T) {
cm := &ConfigManager{}

t.Run("Always false on nil config", func(t *testing.T) {
if cm.ShouldUseSession(nil) {
t.Error("Expected false on nil config")
}
})

t.Run("Always false on nil SessionConfiguration", func(t *testing.T) {
config := &btpb.ClientConfiguration{}
if cm.ShouldUseSession(config) {
t.Error("Expected false on nil SessionConfiguration")
}
})

t.Run("Probabilistic routing", func(t *testing.T) {
config := &btpb.ClientConfiguration{
SessionConfiguration: &btpb.SessionClientConfiguration{
SessionLoad: 0.1, // 10%
},
}

iterations := 10000
sessionCount := 0
for i := 0; i < iterations; i++ {
if cm.ShouldUseSession(config) {
sessionCount++
}
}

expected := int(float32(iterations) * config.SessionConfiguration.SessionLoad)
tolerance := 200 // Allow some variance

if sessionCount < expected-tolerance || sessionCount > expected+tolerance {
t.Errorf("Expected approximately %d session calls, got %d", expected, sessionCount)
}
})
}
Loading