Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions code/sonalyze/daemon/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"log"

"github.com/NordicHPC/sonar/util/formats/newfmt"
"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -37,42 +36,42 @@ func runKafka(kafkaBroker, cluster, consumerGroup string, ds db.AppendablePersis
if err != nil {
// This should be surfaced somehow, but probably we should just back off and retry later,
// the broker could be down - depends on the error!
log.Printf("%s: Failed to create client: %v", cluster, err)
Log.Infof("%s: Failed to create client: %v", cluster, err)
return
}
defer cl.Close()
if Verbose {
log.Printf("%s: Connected!", cluster)
Log.Infof("%s: Connected!", cluster)
}

ctx := context.Background()
for {
if Verbose {
log.Printf("%s: Fetching data", cluster)
Log.Infof("%s: Fetching data", cluster)
}
fetches := cl.PollFetches(ctx)
if Verbose {
log.Printf("%s: Fetched data", cluster)
Log.Infof("%s: Fetched data", cluster)
}
if errs := fetches.Errors(); len(errs) > 0 {
// All errors are retried internally when fetching, but non-retriable errors are
// returned from polls so that users can notice and take action.
log.Printf("%s: SOFT ERROR: Failed to fetch data! %v", cluster, errs)
Log.Infof("%s: SOFT ERROR: Failed to fetch data! %v", cluster, errs)
}

iter := fetches.RecordIter()
for !iter.Done() {
record := iter.Next()
if Verbose {
log.Printf(" %s: %s", cluster, record.Topic)
Log.Infof(" %s: %s", cluster, record.Topic)
}
err := handler.dispatch(record.Topic, record.Key, record.Value)
if err != nil {
log.Printf(" %s: SOFT ERROR: Topic handler %s failed: %v", cluster, record.Topic, err)
Log.Infof(" %s: SOFT ERROR: Topic handler %s failed: %v", cluster, record.Topic, err)
}
}
if err := cl.CommitUncommittedOffsets(ctx); err != nil {
log.Printf(" %s: SOFT ERROR: Commit records failed: %v", cluster, err)
Log.Infof(" %s: SOFT ERROR: Commit records failed: %v", cluster, err)
}
}
}
Expand Down Expand Up @@ -128,12 +127,12 @@ func handleSample(ch *clusterHandler, topic, host string, data []byte) error {
}
if info.Data != nil {
if Verbose {
log.Printf("%s: Got a good sample %s %s", ch.cluster, topic, host)
Log.Infof("%s: Got a good sample %s %s", ch.cluster, topic, host)
}
return ch.ds.AppendSamplesAsync(db.DataSampleV0JSON, host, string(info.Data.Attributes.Time), data)
}
if Verbose {
log.Printf("%s: Dropping a sample error object on the floor", ch.cluster)
Log.Infof("%s: Dropping a sample error object on the floor", ch.cluster)
}
return nil
}
Expand All @@ -146,12 +145,12 @@ func handleSysinfo(ch *clusterHandler, topic, host string, data []byte) error {
}
if info.Data != nil {
if Verbose {
log.Printf("%s: Got a good sysinfo %s %s", ch.cluster, topic, host)
Log.Infof("%s: Got a good sysinfo %s %s", ch.cluster, topic, host)
}
return ch.ds.AppendSysinfoAsync(db.DataSysinfoV0JSON, host, string(info.Data.Attributes.Time), data)
}
if Verbose {
log.Printf("%s: Dropping a sysinfo error object on the floor", ch.cluster)
Log.Infof("%s: Dropping a sysinfo error object on the floor", ch.cluster)
}
return nil
}
Expand All @@ -164,12 +163,12 @@ func handleSlurmJobs(ch *clusterHandler, topic, host string, data []byte) error
}
if info.Data != nil {
if Verbose {
log.Printf("%s: Got a good jobs %s %s", ch.cluster, topic, host)
Log.Infof("%s: Got a good jobs %s %s", ch.cluster, topic, host)
}
return ch.ds.AppendSlurmSacctAsync(db.DataSlurmV0JSON, string(info.Data.Attributes.Time), data)
}
if Verbose {
log.Printf("%s: Dropping a job error object on the floor", ch.cluster)
Log.Infof("%s: Dropping a job error object on the floor", ch.cluster)
}
return nil
}
Expand All @@ -182,12 +181,12 @@ func handleCluster(ch *clusterHandler, topic, host string, data []byte) error {
}
if info.Data != nil {
if Verbose {
log.Printf("%s: Got a good cluster %s %s", ch.cluster, topic, host)
Log.Infof("%s: Got a good cluster %s %s", ch.cluster, topic, host)
}
return ch.ds.AppendCluzterAsync(db.DataCluzterV0JSON, string(info.Data.Attributes.Time), data)
}
if Verbose {
log.Printf("%s: Dropping a cluster error object on the floor", ch.cluster)
Log.Infof("%s: Dropping a cluster error object on the floor", ch.cluster)
}
return nil
}
Loading