diff --git a/code/sonalyze/daemon/daemon.go b/code/sonalyze/daemon/daemon.go index 46066c20..7ab98900 100644 --- a/code/sonalyze/daemon/daemon.go +++ b/code/sonalyze/daemon/daemon.go @@ -51,6 +51,12 @@ // Kafka channel for the clusters found in the data directory. It should be the only consumer // for those data. The broker-address is normally on the form hostname:port. // +// -kafka-group +// +// For Kafka, use this consumer group name (mostly useful for testing and advanced usage). Once +// ingested for a particular consumer group, a message will not be ingested again for the group +// even if the client is restarted. +// // -rest-api // // The daemon will present various APIs on the given interface (in the form interface:port, @@ -112,6 +118,7 @@ import ( const ( defaultListenPort = 8087 logTag = "jobanalyzer/sonalyze" + defaultKafkaGroup = "jobanalyzer-ingest" ) // MT: Immutable (no mutator operations) and thread-safe. @@ -122,14 +129,15 @@ type DaemonCommand struct { DevArgs VerboseArgs DatabaseArgs - getAuthFile string - postAuthFile string - kafkaBroker string - restAPI string - insert bool - v0 bool - v1 bool - v2 bool + getAuthFile string + postAuthFile string + kafkaBroker string + consumerGroup string + restAPI string + insert bool + v0 bool + v1 bool + v2 bool getAuthenticator *auth.Authenticator postAuthenticator *auth.Authenticator @@ -150,7 +158,8 @@ func (dc *DaemonCommand) Add(fs *CLI) { fs.StringVar(&dc.getAuthFile, "analysis-auth", "", "Authentication info `filename` for analysis access") fs.StringVar(&dc.postAuthFile, "upload-auth", "", "Authentication info `filename` for data upload access") fs.StringVar(&dc.getAuthFile, "password-file", "", "Alias for -analysis-auth") - fs.StringVar(&dc.kafkaBroker, "kafka", "", "Ingest data from this broker for all known clusters") + fs.StringVar(&dc.kafkaBroker, "kafka", "", "Ingest data from this `broker` for all known clusters") + fs.StringVar(&dc.consumerGroup, "kafka-group", defaultKafkaGroup, "Kafka consumer `group name`") fs.StringVar(&dc.restAPI, "rest-api", "", "Serve /api/v0, /api/v1 and /api/v2 on this interface:port") fs.BoolVar(&dc.insert, "insert", false, "Enable the /api/v1/insert points") fs.BoolVar(&dc.v0, "v0", false, "Enable the v0 API") @@ -189,6 +198,9 @@ func (dc *DaemonCommand) Validate() error { if dc.insert && dc.DatabaseURI() != "" { return fmt.Errorf("Can't have both -database-uri and -insert") } + if dc.consumerGroup != defaultKafkaGroup && dc.kafkaBroker == "" { + return fmt.Errorf("Can't have -kafka-group without -kafka") + } return nil } diff --git a/code/sonalyze/daemon/kafka.go b/code/sonalyze/daemon/kafka.go index 8a7be911..a951c19e 100644 --- a/code/sonalyze/daemon/kafka.go +++ b/code/sonalyze/daemon/kafka.go @@ -22,12 +22,12 @@ const ( // This runs on a goroutine - one goroutine per cluster, just to be a little resilient. -func runKafka(kafkaBroker, cluster string, ds db.AppendablePersistentDataProvider) { +func runKafka(kafkaBroker, cluster, consumerGroup string, ds db.AppendablePersistentDataProvider) { defer ds.Close() handler := newClusterHandler(cluster, ds) cl, err := kgo.NewClient( kgo.SeedBrokers(kafkaBroker), - kgo.ConsumerGroup("jobanalyzer-ingest"), + kgo.ConsumerGroup(consumerGroup), kgo.ConsumeTopics( handler.add(tySample), handler.add(tySysinfo), diff --git a/code/sonalyze/daemon/perform.go b/code/sonalyze/daemon/perform.go index fc6e8365..d88ede65 100644 --- a/code/sonalyze/daemon/perform.go +++ b/code/sonalyze/daemon/perform.go @@ -50,7 +50,7 @@ func (dc *DaemonCommand) RunDaemon(_ io.Reader, _, stderr io.Writer) error { if Verbose { Log.Infof("Starting listener for %s", cl.Name) } - go runKafka(dc.kafkaBroker, cl.Name, ds) + go runKafka(dc.kafkaBroker, cl.Name, dc.consumerGroup, ds) } }