diff --git a/code/sonalyze/daemon/kafka.go b/code/sonalyze/daemon/kafka.go index a951c19e..a01133a0 100644 --- a/code/sonalyze/daemon/kafka.go +++ b/code/sonalyze/daemon/kafka.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "fmt" - "log" "github.com/NordicHPC/sonar/util/formats/newfmt" "github.com/twmb/franz-go/pkg/kgo" @@ -37,42 +36,42 @@ func runKafka(kafkaBroker, cluster, consumerGroup string, ds db.AppendablePersis if err != nil { // This should be surfaced somehow, but probably we should just back off and retry later, // the broker could be down - depends on the error! - log.Printf("%s: Failed to create client: %v", cluster, err) + Log.Infof("%s: Failed to create client: %v", cluster, err) return } defer cl.Close() if Verbose { - log.Printf("%s: Connected!", cluster) + Log.Infof("%s: Connected!", cluster) } ctx := context.Background() for { if Verbose { - log.Printf("%s: Fetching data", cluster) + Log.Infof("%s: Fetching data", cluster) } fetches := cl.PollFetches(ctx) if Verbose { - log.Printf("%s: Fetched data", cluster) + Log.Infof("%s: Fetched data", cluster) } if errs := fetches.Errors(); len(errs) > 0 { // All errors are retried internally when fetching, but non-retriable errors are // returned from polls so that users can notice and take action. - log.Printf("%s: SOFT ERROR: Failed to fetch data! %v", cluster, errs) + Log.Infof("%s: SOFT ERROR: Failed to fetch data! %v", cluster, errs) } iter := fetches.RecordIter() for !iter.Done() { record := iter.Next() if Verbose { - log.Printf(" %s: %s", cluster, record.Topic) + Log.Infof(" %s: %s", cluster, record.Topic) } err := handler.dispatch(record.Topic, record.Key, record.Value) if err != nil { - log.Printf(" %s: SOFT ERROR: Topic handler %s failed: %v", cluster, record.Topic, err) + Log.Infof(" %s: SOFT ERROR: Topic handler %s failed: %v", cluster, record.Topic, err) } } if err := cl.CommitUncommittedOffsets(ctx); err != nil { - log.Printf(" %s: SOFT ERROR: Commit records failed: %v", cluster, err) + Log.Infof(" %s: SOFT ERROR: Commit records failed: %v", cluster, err) } } } @@ -128,12 +127,12 @@ func handleSample(ch *clusterHandler, topic, host string, data []byte) error { } if info.Data != nil { if Verbose { - log.Printf("%s: Got a good sample %s %s", ch.cluster, topic, host) + Log.Infof("%s: Got a good sample %s %s", ch.cluster, topic, host) } return ch.ds.AppendSamplesAsync(db.DataSampleV0JSON, host, string(info.Data.Attributes.Time), data) } if Verbose { - log.Printf("%s: Dropping a sample error object on the floor", ch.cluster) + Log.Infof("%s: Dropping a sample error object on the floor", ch.cluster) } return nil } @@ -146,12 +145,12 @@ func handleSysinfo(ch *clusterHandler, topic, host string, data []byte) error { } if info.Data != nil { if Verbose { - log.Printf("%s: Got a good sysinfo %s %s", ch.cluster, topic, host) + Log.Infof("%s: Got a good sysinfo %s %s", ch.cluster, topic, host) } return ch.ds.AppendSysinfoAsync(db.DataSysinfoV0JSON, host, string(info.Data.Attributes.Time), data) } if Verbose { - log.Printf("%s: Dropping a sysinfo error object on the floor", ch.cluster) + Log.Infof("%s: Dropping a sysinfo error object on the floor", ch.cluster) } return nil } @@ -164,12 +163,12 @@ func handleSlurmJobs(ch *clusterHandler, topic, host string, data []byte) error } if info.Data != nil { if Verbose { - log.Printf("%s: Got a good jobs %s %s", ch.cluster, topic, host) + Log.Infof("%s: Got a good jobs %s %s", ch.cluster, topic, host) } return ch.ds.AppendSlurmSacctAsync(db.DataSlurmV0JSON, string(info.Data.Attributes.Time), data) } if Verbose { - log.Printf("%s: Dropping a job error object on the floor", ch.cluster) + Log.Infof("%s: Dropping a job error object on the floor", ch.cluster) } return nil } @@ -182,12 +181,12 @@ func handleCluster(ch *clusterHandler, topic, host string, data []byte) error { } if info.Data != nil { if Verbose { - log.Printf("%s: Got a good cluster %s %s", ch.cluster, topic, host) + Log.Infof("%s: Got a good cluster %s %s", ch.cluster, topic, host) } return ch.ds.AppendCluzterAsync(db.DataCluzterV0JSON, string(info.Data.Attributes.Time), data) } if Verbose { - log.Printf("%s: Dropping a cluster error object on the floor", ch.cluster) + Log.Infof("%s: Dropping a cluster error object on the floor", ch.cluster) } return nil }