-
Notifications
You must be signed in to change notification settings - Fork 116
Add a cassandra-backed storage option #246
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
pkg/blobstore/cassandra/README.md
Outdated
| The tables that are required can be set up using command similar to the | ||
| below in a `cqlsh` session: | ||
|
|
||
| ```shell |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/shell/sql/ or something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking the supported languages, I think sql is probably right.
| "github.com/buildbarn/bb-storage/pkg/blobstore" | ||
| "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" | ||
| "github.com/buildbarn/bb-storage/pkg/blobstore/slicing" | ||
| "github.com/buildbarn/bb-storage/pkg/capabilities" | ||
| bbdigest "github.com/buildbarn/bb-storage/pkg/digest" | ||
| "github.com/buildbarn/bb-storage/pkg/util" | ||
|
|
||
| remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" | ||
| "github.com/gocql/gocql" | ||
| "github.com/prometheus/client_golang/prometheus" | ||
| "golang.org/x/sync/errgroup" | ||
| "google.golang.org/grpc/codes" | ||
| "google.golang.org/grpc/status" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency with the rest of the code base, please group imports by second level domain. So first comes github.com, then an empty line, and then golang.org.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an automated formatter for this? Or suggested IJ settings?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| segmentWriteHist = prometheus.NewHistogram(prometheus.HistogramOpts{ | ||
| Namespace: "buildbarn", | ||
| Subsystem: "storage", | ||
| Name: "cassandra_segment_write_duration_in_seconds", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please following the Prometheus best practices on naming: https://prometheus.io/docs/practices/naming/
In particular, remove the _in_ part of the metric's name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| cluster := gocql.NewCluster(clusterHosts...) | ||
| cluster.Keyspace = keyspace | ||
| cluster.Consistency = writeConsistency | ||
| if preferredDc != "" { | ||
| cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy(preferredDc)) | ||
| } | ||
|
|
||
| if port != 0 { | ||
| cluster.Port = int(port) | ||
| } | ||
|
|
||
| if protoVersion != 0 { | ||
| cluster.ProtoVersion = int(protoVersion) | ||
| } | ||
|
|
||
| if username != "" { | ||
| cluster.Authenticator = gocql.PasswordAuthenticator{ | ||
| Username: username, | ||
| Password: password, | ||
| } | ||
| } | ||
|
|
||
| if tlsConfig != nil { | ||
| cluster.SslOpts = &gocql.SslOptions{ | ||
| Config: tlsConfig, | ||
| } | ||
| } else { | ||
| log.Printf("Not setting up TLS config") | ||
| } | ||
|
|
||
| if tablePrefix == "" { | ||
| log.Printf("No table prefix set. Cowardly refusing to proceed.") | ||
| return blobstore.NewErrorBlobAccess(errors.New("table prefix must be set")) | ||
| } | ||
| log.Printf("Table prefix: %s", tablePrefix) | ||
|
|
||
| log.Printf("Creating session in keyspace '%s' for %v", keyspace, clusterHosts) | ||
|
|
||
| session, err := cluster.CreateSession() | ||
| if err != nil { | ||
| log.Printf("Unable to create session: %v", err) | ||
| return blobstore.NewErrorBlobAccess(err) | ||
| } | ||
|
|
||
| // Create the worker pool for updating the last access times | ||
| jobs := make(chan job, lastAccessUpdateWorkerCount) | ||
| for i := 0; i < lastAccessUpdateWorkerCount; i++ { | ||
| go lastUpdateWorker(jobs) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, please move this into a separate function, as it prevents cassandraBlobAccess from being unit tested properly. Also remove the log.Printf() calls, as there is no need to repeat whatever is specified in config. Because once we start to tolerate this, the question becomes: "Why do we print the value of this one configuration option, but not of this other one?" Also get rid of the calls to NewErrorBlobAccess. Any misconfigurations should lead to a fatal startup error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extracted the session creation into a helper function and removed the logging.
| func (a *cassandraBlobAccess) GetCapabilities(ctx context.Context, instanceName bbdigest.InstanceName) (*remoteexecution.ServerCapabilities, error) { | ||
| return a.capabilitiesProvider.GetCapabilities(ctx, instanceName) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably nicer to embed CapabilitiesProvider directly into cassandraBlobAccess, so we don't need this wrapper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| return buffer.NewBufferFromError(err) | ||
| } | ||
|
|
||
| go a.tables.metadata.updateLastAccessTime(context.Background(), digest, metadata.instanceName, metadata.lastAccess, a.lastAccessUpdateInterval, a.lastAccessUpdate) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, even though having no concurrency limits in place might at this point never have been an issue for you, I'd like to see this addressed before this change lands.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
| g.SetLimit(simultaneousWorkers) | ||
| for _, digest := range digests.Items() { | ||
| g.Go(func() error { | ||
| return a.isMissing(errCtx, digest, &builder, &mu) | ||
| }) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A downside of using .SetLimit() in combination with a loop like this is that there is no cancelation at all during loop iteration. If a single .isMissing() fails, this code will fail to break immediately, meaning all remaining entries MUST be checked, even if we know they will fail.
Please consider getting rid of the .SetLimit() call and using a separate semaphore, like we do in most other places of the Buildbarn codebase.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How's that?
|
|
||
| func (a *cassandraBlobAccess) getInstanceName(digest bbdigest.Digest) string { | ||
| instanceName := digest.GetInstanceName().String() | ||
| if instanceName == a.universalInstanceName { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove support for "universal instance names". I don't think there's any need to do this as part of this backend specifically. Can't it be implemented as a separate decorator for BlobAccess?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed. I'll do a follow up PR to set the instance name to a default value if necessary.
| longBlob := bytes.Repeat([]byte("This is a very long string!"), 100) | ||
| query := session.Query("INSERT INTO some_table (id, first_name, long_blob) VALUES (?, ?, ?)", 1234, "Jon Snow", longBlob) | ||
| expectedOutput := `[query ANY "INSERT INTO some_table (id, first_name, long_blob) VALUES (?, ?, ?)" ` + | ||
| `[1234 Jon Snow [84 104 105 115 32 105 115 32 97 32 118 101 114 121 32 108 111 110 103 32 115 116 114 105 110 103 33...]]` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this mean that if failures occur, this storage backend will potentially dump megabytes of decimal numbers to stderr?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope. The formatQuery method caps the length of the output to 100 characters.
|
|
||
| // The Cassandra protocol version. This can normally be omitted from | ||
| // your config. | ||
| int32 protocolVersion = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
protocol_version, preferred_dc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed those. Fixed. Thank you
shs96c
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've landed changes for the simple things. Still need to update to handle the go routines piling up, and I'll get to that ASAP.
pkg/blobstore/cassandra/README.md
Outdated
| The tables that are required can be set up using command similar to the | ||
| below in a `cqlsh` session: | ||
|
|
||
| ```shell |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Checking the supported languages, I think sql is probably right.
| longBlob := bytes.Repeat([]byte("This is a very long string!"), 100) | ||
| query := session.Query("INSERT INTO some_table (id, first_name, long_blob) VALUES (?, ?, ?)", 1234, "Jon Snow", longBlob) | ||
| expectedOutput := `[query ANY "INSERT INTO some_table (id, first_name, long_blob) VALUES (?, ?, ?)" ` + | ||
| `[1234 Jon Snow [84 104 105 115 32 105 115 32 97 32 118 101 114 121 32 108 111 110 103 32 115 116 114 105 110 103 33...]]` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope. The formatQuery method caps the length of the output to 100 characters.
|
|
||
| // The Cassandra protocol version. This can normally be omitted from | ||
| // your config. | ||
| int32 protocolVersion = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed those. Fixed. Thank you
| "github.com/buildbarn/bb-storage/pkg/blobstore" | ||
| "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" | ||
| "github.com/buildbarn/bb-storage/pkg/blobstore/slicing" | ||
| "github.com/buildbarn/bb-storage/pkg/capabilities" | ||
| bbdigest "github.com/buildbarn/bb-storage/pkg/digest" | ||
| "github.com/buildbarn/bb-storage/pkg/util" | ||
|
|
||
| remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" | ||
| "github.com/gocql/gocql" | ||
| "github.com/prometheus/client_golang/prometheus" | ||
| "golang.org/x/sync/errgroup" | ||
| "google.golang.org/grpc/codes" | ||
| "google.golang.org/grpc/status" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an automated formatter for this? Or suggested IJ settings?
| "github.com/buildbarn/bb-storage/pkg/blobstore" | ||
| "github.com/buildbarn/bb-storage/pkg/blobstore/buffer" | ||
| "github.com/buildbarn/bb-storage/pkg/blobstore/slicing" | ||
| "github.com/buildbarn/bb-storage/pkg/capabilities" | ||
| bbdigest "github.com/buildbarn/bb-storage/pkg/digest" | ||
| "github.com/buildbarn/bb-storage/pkg/util" | ||
|
|
||
| remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2" | ||
| "github.com/gocql/gocql" | ||
| "github.com/prometheus/client_golang/prometheus" | ||
| "golang.org/x/sync/errgroup" | ||
| "google.golang.org/grpc/codes" | ||
| "google.golang.org/grpc/status" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| totalWritesHist = prometheus.NewHistogram(prometheus.HistogramOpts{ | ||
| Namespace: "buildbarn", | ||
| Subsystem: "storage", | ||
| Name: "cassandra_total_write_duration_in_seconds", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
| Namespace: "buildbarn", | ||
| Subsystem: "storage", | ||
| Name: "cassandra_total_write_duration_in_seconds", | ||
| Help: "Total amount of time in seconds it takes to write an entire bytestream to Cassandra", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
| type job struct { | ||
| task func() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| cluster := gocql.NewCluster(clusterHosts...) | ||
| cluster.Keyspace = keyspace | ||
| cluster.Consistency = writeConsistency | ||
| if preferredDc != "" { | ||
| cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.DCAwareRoundRobinPolicy(preferredDc)) | ||
| } | ||
|
|
||
| if port != 0 { | ||
| cluster.Port = int(port) | ||
| } | ||
|
|
||
| if protoVersion != 0 { | ||
| cluster.ProtoVersion = int(protoVersion) | ||
| } | ||
|
|
||
| if username != "" { | ||
| cluster.Authenticator = gocql.PasswordAuthenticator{ | ||
| Username: username, | ||
| Password: password, | ||
| } | ||
| } | ||
|
|
||
| if tlsConfig != nil { | ||
| cluster.SslOpts = &gocql.SslOptions{ | ||
| Config: tlsConfig, | ||
| } | ||
| } else { | ||
| log.Printf("Not setting up TLS config") | ||
| } | ||
|
|
||
| if tablePrefix == "" { | ||
| log.Printf("No table prefix set. Cowardly refusing to proceed.") | ||
| return blobstore.NewErrorBlobAccess(errors.New("table prefix must be set")) | ||
| } | ||
| log.Printf("Table prefix: %s", tablePrefix) | ||
|
|
||
| log.Printf("Creating session in keyspace '%s' for %v", keyspace, clusterHosts) | ||
|
|
||
| session, err := cluster.CreateSession() | ||
| if err != nil { | ||
| log.Printf("Unable to create session: %v", err) | ||
| return blobstore.NewErrorBlobAccess(err) | ||
| } | ||
|
|
||
| // Create the worker pool for updating the last access times | ||
| jobs := make(chan job, lastAccessUpdateWorkerCount) | ||
| for i := 0; i < lastAccessUpdateWorkerCount; i++ { | ||
| go lastUpdateWorker(jobs) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extracted the session creation into a helper function and removed the logging.
|
|
||
| func (a *cassandraBlobAccess) getInstanceName(digest bbdigest.Digest) string { | ||
| instanceName := digest.GetInstanceName().String() | ||
| if instanceName == a.universalInstanceName { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed. I'll do a follow up PR to set the instance name to a default value if necessary.
No description provided.