Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions driver/controller/startup/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ go_library(
"@com_google_cloud_go_pubsub//:pubsub",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//encoding/gzip",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//types/known/timestamppb",
Expand Down
27 changes: 23 additions & 4 deletions driver/controller/startup/startup.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
shell "github.com/ipfs/go-ipfs-api"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"

"sentioxyz/sentio-core/common/chx"
Expand Down Expand Up @@ -73,9 +74,20 @@ func (c *baseStartupController) releaseAll() {
}
}

// dialOptions returns the gRPC dial options for backend services. When the
// driver binary injects DialCredentials (e.g. the repo's mTLS credentials), they
// override the rpc package's insecure default; otherwise the default is kept.
func (c *baseStartupController) dialOptions() []grpc.DialOption {
opts := []grpc.DialOption{rpc.RetryDialOption}
if c.config.DialCredentials != nil {
opts = append(opts, grpc.WithTransportCredentials(c.config.DialCredentials))
}
return opts
}

func (c *baseStartupController) connectToProcessorService(ctx context.Context) error {
_, logger := log.FromContext(ctx)
conn, err := rpc.DialAuto(c.config.ProcessorService, rpc.RetryDialOption)
conn, err := rpc.Dial(c.config.ProcessorService, c.dialOptions()...)
if err != nil {
return errors.Wrapf(err, "dial to processor service %s failed", c.config.ProcessorService)
}
Expand All @@ -94,7 +106,7 @@ func (c *baseStartupController) connectToUsageService(ctx context.Context) error
logger.Warnf("no usage service so will not connect to usage service")
return nil
}
conn, err := rpc.DialAuto(c.config.UsageService, rpc.RetryDialOption)
conn, err := rpc.Dial(c.config.UsageService, c.dialOptions()...)
if err != nil {
return errors.Wrapf(err, "dial to usage service %s failed", c.config.UsageService)
}
Expand All @@ -117,7 +129,7 @@ func (c *baseStartupController) connectToDBRegistryService(ctx context.Context)
logger.Warnf("no db registry service configured so will not connect to it")
return nil
}
conn, err := rpc.DialAuto(c.config.DBRegistryService, rpc.RetryDialOption)
conn, err := rpc.Dial(c.config.DBRegistryService, c.dialOptions()...)
if err != nil {
return errors.Wrapf(err, "dial to db registry service %s failed", c.config.DBRegistryService)
}
Expand Down Expand Up @@ -165,7 +177,7 @@ func (c *baseStartupController) createWebhookSubscription(ctx context.Context) e
logger.Warn("no webhook service so will not create webhook subscription")
return nil
}
conn, err := rpc.DialAuto(c.config.WebhookService, rpc.RetryDialOption)
conn, err := rpc.Dial(c.config.WebhookService, c.dialOptions()...)
if err != nil {
return errors.Wrapf(err, "dial to webhook service %s failed", c.config.WebhookService)
}
Expand Down Expand Up @@ -569,6 +581,13 @@ type Config struct {
// the otel instrument and is built by the driver binary so the controller
// does not import the binary's metrics package.
EntityMetricsMonitor persistent.MetricsMonitor

// DialCredentials are the gRPC transport credentials used to dial the backend
// services (processor / usage / db-registry / webhook). The rpc package here
// defaults to insecure; the driver binary injects its own credentials (e.g.
// mTLS) so the TLS policy and certificates stay out of sentio-core. Nil keeps
// the insecure default.
DialCredentials credentials.TransportCredentials
}

func Main(config Config) {
Expand Down
Loading