Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions code/sonalyze/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
// Kafka channel for the clusters found in the data directory. It should be the only consumer
// for those data. The broker-address is normally on the form hostname:port.
//
// -kafka-group <group-name>
//
// For Kafka, use this consumer group name (mostly useful for testing and advanced usage). Once
// ingested for a particular consumer group, a message will not be ingested again for the group
// even if the client is restarted.
//
// -rest-api <interface>
//
// The daemon will present various APIs on the given interface (in the form interface:port,
Expand Down Expand Up @@ -112,6 +118,7 @@ import (
const (
defaultListenPort = 8087
logTag = "jobanalyzer/sonalyze"
defaultKafkaGroup = "jobanalyzer-ingest"
)

// MT: Immutable (no mutator operations) and thread-safe.
Expand All @@ -122,14 +129,15 @@ type DaemonCommand struct {
DevArgs
VerboseArgs
DatabaseArgs
getAuthFile string
postAuthFile string
kafkaBroker string
restAPI string
insert bool
v0 bool
v1 bool
v2 bool
getAuthFile string
postAuthFile string
kafkaBroker string
consumerGroup string
restAPI string
insert bool
v0 bool
v1 bool
v2 bool

getAuthenticator *auth.Authenticator
postAuthenticator *auth.Authenticator
Expand All @@ -150,7 +158,8 @@ func (dc *DaemonCommand) Add(fs *CLI) {
fs.StringVar(&dc.getAuthFile, "analysis-auth", "", "Authentication info `filename` for analysis access")
fs.StringVar(&dc.postAuthFile, "upload-auth", "", "Authentication info `filename` for data upload access")
fs.StringVar(&dc.getAuthFile, "password-file", "", "Alias for -analysis-auth")
fs.StringVar(&dc.kafkaBroker, "kafka", "", "Ingest data from this broker for all known clusters")
fs.StringVar(&dc.kafkaBroker, "kafka", "", "Ingest data from this `broker` for all known clusters")
fs.StringVar(&dc.consumerGroup, "kafka-group", defaultKafkaGroup, "Kafka consumer `group name`")
fs.StringVar(&dc.restAPI, "rest-api", "", "Serve /api/v0, /api/v1 and /api/v2 on this interface:port")
fs.BoolVar(&dc.insert, "insert", false, "Enable the /api/v1/insert points")
fs.BoolVar(&dc.v0, "v0", false, "Enable the v0 API")
Expand Down Expand Up @@ -189,6 +198,9 @@ func (dc *DaemonCommand) Validate() error {
if dc.insert && dc.DatabaseURI() != "" {
return fmt.Errorf("Can't have both -database-uri and -insert")
}
if dc.consumerGroup != defaultKafkaGroup && dc.kafkaBroker == "" {
return fmt.Errorf("Can't have -kafka-group without -kafka")
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions code/sonalyze/daemon/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ const (

// This runs on a goroutine - one goroutine per cluster, just to be a little resilient.

func runKafka(kafkaBroker, cluster string, ds db.AppendablePersistentDataProvider) {
func runKafka(kafkaBroker, cluster, consumerGroup string, ds db.AppendablePersistentDataProvider) {
defer ds.Close()
handler := newClusterHandler(cluster, ds)
cl, err := kgo.NewClient(
kgo.SeedBrokers(kafkaBroker),
kgo.ConsumerGroup("jobanalyzer-ingest"),
kgo.ConsumerGroup(consumerGroup),
kgo.ConsumeTopics(
handler.add(tySample),
handler.add(tySysinfo),
Expand Down
2 changes: 1 addition & 1 deletion code/sonalyze/daemon/perform.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (dc *DaemonCommand) RunDaemon(_ io.Reader, _, stderr io.Writer) error {
if Verbose {
Log.Infof("Starting listener for %s", cl.Name)
}
go runKafka(dc.kafkaBroker, cl.Name, ds)
go runKafka(dc.kafkaBroker, cl.Name, dc.consumerGroup, ds)
}
}

Expand Down
Loading