From 0a65fe6ba3112587edf39288f794df890a1f4964 Mon Sep 17 00:00:00 2001 From: Lars T Hansen Date: Thu, 16 Apr 2026 10:59:51 +0200 Subject: [PATCH 1/4] Fix #890 - api0, and insertion parts of api1 --- code/sonalyze/Makefile | 4 +- code/sonalyze/application/command.go | 7 +- code/sonalyze/application/remote.go | 21 +- code/sonalyze/cmd/add/add.go | 230 ------ code/sonalyze/cmd/add/summary.txt | 15 - code/sonalyze/cmd/args.go | 22 +- code/sonalyze/cmd/args_reify.go | 19 +- code/sonalyze/daemon/api0/api0.go | 765 ++++++++++++++++++ code/sonalyze/daemon/api1/api1.go | 201 +++++ code/sonalyze/daemon/api2/api2.go | 126 +-- code/sonalyze/daemon/api2/list_clusters.go | 3 +- .../daemon/api2/nodes_cpu_timeseries.go | 15 +- .../daemon/api2/nodes_diskstats_timeseries.go | 5 +- .../daemon/api2/nodes_gpu_timeseries.go | 7 +- code/sonalyze/daemon/api2/nodes_info.go | 6 +- .../daemon/api2/nodes_last_probe_timestamp.go | 5 +- .../daemon/api2/nodes_process_gpu_util.go | 7 +- code/sonalyze/daemon/api2/processes.go | 7 +- code/sonalyze/daemon/api2/processes_gpu.go | 7 +- .../daemon/api2/processes_timeseries.go | 7 +- code/sonalyze/daemon/apiutil/apiutil.go | 137 ++++ code/sonalyze/daemon/apiutil/auth.go | 25 + code/sonalyze/daemon/daemon.go | 75 +- code/sonalyze/daemon/perform.go | 433 +--------- code/sonalyze/doc/STALE.md | 22 +- 25 files changed, 1269 insertions(+), 902 deletions(-) delete mode 100644 code/sonalyze/cmd/add/add.go delete mode 100644 code/sonalyze/cmd/add/summary.txt create mode 100644 code/sonalyze/daemon/api0/api0.go create mode 100644 code/sonalyze/daemon/api1/api1.go create mode 100644 code/sonalyze/daemon/apiutil/apiutil.go create mode 100644 code/sonalyze/daemon/apiutil/auth.go diff --git a/code/sonalyze/Makefile b/code/sonalyze/Makefile index 3ad6b5ab..3cc66255 100644 --- a/code/sonalyze/Makefile +++ b/code/sonalyze/Makefile @@ -2,11 +2,11 @@ TARGET=sonalyze SUBDIRS=application \ - cmd cmd/add cmd/cards cmd/clusters cmd/configs cmd/diskprof cmd/gpus cmd/jobs 
cmd/load \ + cmd cmd/cards cmd/clusters cmd/configs cmd/diskprof cmd/gpus cmd/jobs cmd/load \ cmd/metadata cmd/nodeprof cmd/nodes cmd/parse cmd/profile cmd/report cmd/sacct cmd/snodes \ cmd/sparts cmd/top cmd/uptime cmd/version \ common \ - daemon daemon/api2 \ + daemon daemon/api0 daemon/api1 daemon/api2 daemon/apiutil \ data/card data/common data/config data/cpusample data/disksample data/gpusample data/node \ data/nodesample data/sample data/slurmjob data/slurmnode data/slurmpart \ db db/errs db/filedb db/filesys db/parse db/repr db/special db/types db/util \ diff --git a/code/sonalyze/application/command.go b/code/sonalyze/application/command.go index 3daf211a..4a4382e0 100644 --- a/code/sonalyze/application/command.go +++ b/code/sonalyze/application/command.go @@ -5,7 +5,6 @@ import ( "io" "sonalyze/cmd" - "sonalyze/cmd/add" "sonalyze/cmd/cards" "sonalyze/cmd/clusters" "sonalyze/cmd/configs" @@ -31,7 +30,6 @@ import ( func CommandHelp(out io.Writer) { // Keep these alphabetical. - fmt.Fprintf(out, " add - (obsolete) add old-format(!) data to the database\n") fmt.Fprintf(out, " card - print card information extracted from sysinfo table\n") fmt.Fprintf(out, " cluster - print cluster information\n") fmt.Fprintf(out, " config - print node information extracted from cluster config\n") @@ -56,12 +54,9 @@ func CommandHelp(out io.Writer) { // Keep these alphabetical // -// WHEN UPDATING THESE, ALSO UPDATE THE HELP ABOVE, HTTP HANDLERS IN ../daemon/perform.go, -// AND ANY WEB SERVER CONFIG. +// WHEN UPDATING THESE, ALSO UPDATE THE HELP ABOVE and API HANDLERS IN ../daemon/api0/api0.go. 
func ConstructCommand(verb string) (command cmd.Command, actualVerb string) { switch verb { - case "add": - command = new(add.AddCommand) case "card": command = new(cards.CardCommand) case "cluster": diff --git a/code/sonalyze/application/remote.go b/code/sonalyze/application/remote.go index a7e6df18..749cccf5 100644 --- a/code/sonalyze/application/remote.go +++ b/code/sonalyze/application/remote.go @@ -15,7 +15,6 @@ import ( "time" . "sonalyze/cmd" - "sonalyze/cmd/add" . "sonalyze/common" ) @@ -97,23 +96,7 @@ func RemoteOperation(rCmd Command, verb string, stdin io.Reader, stdout, stderr curlArgs = append(curlArgs, "-u", fmt.Sprintf("%s:%s", username, password)) } - switch cmd := rCmd.(type) { - case *add.AddCommand: - // This turns into a POST with data coming from the standard DataSource - var contentType string - switch { - case cmd.Sample, cmd.SlurmSacct: - contentType = "text/csv" - case cmd.Sysinfo: - contentType = "application/json" - default: - panic("Unknown state of AddCommand") - } - curlArgs = append( - curlArgs, - "--data-binary", "@-", - "-H", "Content-Type: "+contentType, - ) + switch rCmd.(type) { case SampleAnalysisCommand: curlArgs = append(curlArgs, "--get") case SimpleCommand: @@ -123,7 +106,7 @@ func RemoteOperation(rCmd Command, verb string, stdin io.Reader, stdout, stderr default: panic("Unimplemented") } - curlArgs = append(curlArgs, rCmd.RemoteHost()+"/"+verb+"?"+r.EncodedArguments()) + curlArgs = append(curlArgs, rCmd.RemoteHost()+"/api/v0/"+verb+"?"+r.EncodedArguments()) if Verbose { Log.Infof( diff --git a/code/sonalyze/cmd/add/add.go b/code/sonalyze/cmd/add/add.go deleted file mode 100644 index d4d0244f..00000000 --- a/code/sonalyze/cmd/add/add.go +++ /dev/null @@ -1,230 +0,0 @@ -package add - -import ( - "bufio" - "bytes" - _ "embed" - "encoding/csv" - "encoding/json" - "errors" - "fmt" - "io" - "strings" - - "go-utils/config" - . "sonalyze/cmd" - . 
"sonalyze/common" - "sonalyze/data/sample" - "sonalyze/db" - "sonalyze/db/types" -) - -type AddCommand struct { - DevArgs - VerboseArgs - DatabaseArgs - Sample bool - Sysinfo bool - SlurmSacct bool -} - -var _ = SimpleCommand((*AddCommand)(nil)) - -//go:embed summary.txt -var summary string - -func (ac *AddCommand) Summary(out io.Writer) { - fmt.Fprint(out, summary) -} - -func (ac *AddCommand) Add(fs *CLI) { - ac.DevArgs.Add(fs) - ac.VerboseArgs.Add(fs) - ac.DatabaseArgs.Add(fs, DBArgOptions{}) - - fs.Group("data-target") - fs.BoolVar(&ac.Sample, "sample", false, - "Insert sonar sample data from stdin (zero or more records)") - fs.BoolVar(&ac.Sysinfo, "sysinfo", false, - "Insert sonar sysinfo data from stdin (exactly one record)") - fs.BoolVar(&ac.SlurmSacct, "slurm-sacct", false, - "Insert sacct data from stdin (zero or more records)") -} - -func (ac *AddCommand) Validate() error { - var e1, e2, e3, e4 error - e1 = ac.DevArgs.Validate() - e2 = ac.VerboseArgs.Validate() - e3 = ac.DatabaseArgs.Validate() - opts := 0 - if ac.Sample { - opts++ - } - if ac.Sysinfo { - opts++ - } - if ac.SlurmSacct { - opts++ - } - if opts != 1 { - e4 = errors.New("Exactly one of -sample, -sysinfo, -slurm-sacct must be requested") - } - return errors.Join(e1, e2, e3, e4) -} - -func (ac *AddCommand) ReifyForRemote(x *ArgReifier) error { - e1 := ac.DevArgs.ReifyForRemote(x) - e2 := ac.DatabaseArgs.ReifyForRemote(x) - x.Bool("sample", ac.Sample) - x.Bool("sysinfo", ac.Sysinfo) - x.Bool("slurm-sacct", ac.SlurmSacct) - return errors.Join(e1, e2) -} - -func (ac *AddCommand) Perform(meta types.Context, stdin io.Reader, _, _ io.Writer) error { - data, err := io.ReadAll(stdin) - if err != nil { - return err - } - switch { - case ac.Sample: - return ac.addSonarFreeCsv(meta, data) - case ac.Sysinfo: - return ac.addSysinfoOldJson(meta, data) - case ac.SlurmSacct: - return ac.addSlurmSacctFreeCsv(meta, data) - default: - panic("Unexpected") - } -} - -func (ac *AddCommand) addSysinfoOldJson(meta 
types.Context, payload []byte) error { - if Verbose { - Log.Infof("Sysinfo record %d bytes", len(payload)) - } - var info config.NodeConfigRecord - err := json.Unmarshal(payload, &info) - if err != nil { - return fmt.Errorf("Can't unmarshal Sysinfo JSON: %v", err) - } - if info.Timestamp == "" || info.Hostname == "" { - // Older versions of `sysinfo` - // TODO: IMPROVEME: Benign if timestamp missing? - return errors.New("Missing timestamp or host in Sonar sysinfo data") - } - ds, err := db.OpenAppendablePersistentDirectoryDB(meta) - if err != nil { - return err - } - defer ds.FlushAsync() - err = ds.AppendSysinfoAsync(db.DataSysinfoOldJSON, info.Hostname, info.Timestamp, payload) - if err == sample.BadTimestampErr { - return nil - } - return err -} - -func (ac *AddCommand) addSonarFreeCsv(meta types.Context, payload []byte) error { - if Verbose { - Log.Infof("Sample records %d bytes", len(payload)) - } - ds, err := db.OpenAppendablePersistentDirectoryDB(meta) - if err != nil { - return err - } - defer ds.FlushAsync() - count := 0 - scanner := bufio.NewScanner(bytes.NewReader(payload)) - var result error - for scanner.Scan() { - count++ - text := scanner.Text() - fields, err := getCsvFields(text) - if err != nil { - return fmt.Errorf("Can't unmarshal Sonar free CSV: %v", err) - } - host := fields["host"] - time := fields["time"] - if host == "" || time == "" { - // TODO: IMPROVEME: Benign if timestamp missing (would have to drop data)? 
- return errors.New("Missing timestamp or host in Sonar sample data") - } - err = ds.AppendSamplesAsync(db.DataSampleCSV, host, time, text) - if err != nil && err != sample.BadTimestampErr { - result = errors.Join(result, err) - } - } - if Verbose { - Log.Infof("Sample records: %d", count) - } - return result -} - -func (ac *AddCommand) addSlurmSacctFreeCsv(meta types.Context, payload []byte) error { - if Verbose { - Log.Infof("Sacct records %d bytes", len(payload)) - } - ds, err := db.OpenAppendablePersistentDirectoryDB(meta) - if err != nil { - return err - } - defer ds.FlushAsync() - count := 0 - scanner := bufio.NewScanner(bytes.NewReader(payload)) - var result error - for scanner.Scan() { - count++ - text := scanner.Text() - fields, err := getCsvFields(text) - if err != nil { - return fmt.Errorf("Can't unmarshal sacct free CSV: %v", err) - } - // Data are stored in the time-based database according to the End time, which we expect - // always to have because we only look at completed jobs. - time := fields["End"] - if time == "" { - // TODO: IMPROVEME: Benign if timestamp missing (would have to drop data)? - return errors.New("Missing timestamp in sacct data") - } - err = ds.AppendSlurmSacctAsync(db.DataSlurmCSV, time, text) - if err != nil && err != sample.BadTimestampErr { - result = errors.Join(result, err) - } - } - if Verbose { - Log.Infof("Sacct records: %d", count) - } - return result -} - -// Given one line of text on free csv format, return the pairs of field names and values. -// -// Errors: -// - If the CSV reader returns an error err, returns (nil, err), including io.EOF. -// - If any field is seen not to have a field name, return (fields, errNoName) with -// fields that were valid. 
- -func getCsvFields(text string) (map[string]string, error) { - rdr := csv.NewReader(strings.NewReader(text)) - rdr.FieldsPerRecord = -1 // Free form, though should not matter - fields, err := rdr.Read() - if err != nil { - return nil, err - } - result := make(map[string]string) - for _, f := range fields { - ix := strings.IndexByte(f, '=') - if ix == -1 { - err = errNoName - continue - } - // TODO: I guess we should detect duplicates? - result[f[0:ix]] = f[ix+1:] - } - return result, err -} - -var ( - // MT: Constant after initialization; immutable - errNoName = errors.New("CSV field without a field name") -) diff --git a/code/sonalyze/cmd/add/summary.txt b/code/sonalyze/cmd/add/summary.txt deleted file mode 100644 index a8c7709e..00000000 --- a/code/sonalyze/cmd/add/summary.txt +++ /dev/null @@ -1,15 +0,0 @@ -Add new data to the database. - -Data are read from stdin, the type and format are implied by operations --sample, -sysinfo, or -slurm-sacct, one of which must be specified: - - `add -sample` adds `sonar ps` data. The format must be "free CSV", ie CSV - with name=value field syntax and no fixed colums. - - `add -sysinfo` adds `sonar sysinfo` data. The format must be the "old" sysinfo - JSON format. - - `add -slurm-sacct` adds `sonar slurm` data. The format must be free CSV. - -Mostly this command is only run by the daemon; manual use is for -experiments and bugfixes. 
diff --git a/code/sonalyze/cmd/args.go b/code/sonalyze/cmd/args.go index 9aaf9e5a..83fa0edc 100644 --- a/code/sonalyze/cmd/args.go +++ b/code/sonalyze/cmd/args.go @@ -565,12 +565,12 @@ type HostArgs struct { func (h *HostArgs) Add(fs *CLI) { fs.Group("record-filter") - fs.Var(NewRepeatableStringNoCommas(&h.Host), "host", + fs.Var(NewRepeatableSemiSeparated(&h.Host), "host", "Select records for this `host` (repeatable) [default: all]") } func (h *HostArgs) ReifyForRemote(x *ArgReifier) error { - x.RepeatableString("host", h.Host) + x.RepeatableSemiSeparated("host", h.Host) return nil } @@ -723,28 +723,28 @@ func NeedsConfig[T any](formatters map[string]Formatter[T], fields []FieldSpec) // Repeatable arguments. // Some string arguments can't be comma-separated because host patterns such as 'ml[1,2],ml9' would -// not really work without heroic effort. +// not really work without heroic effort. Allow them to be separated by semicolons instead. -type RepeatableStringNoCommas struct { +type RepeatableSemiSeparated struct { xs *[]string } -func NewRepeatableStringNoCommas(xs *[]string) *RepeatableStringNoCommas { - return &RepeatableStringNoCommas{xs} +func NewRepeatableSemiSeparated(xs *[]string) *RepeatableSemiSeparated { + return &RepeatableSemiSeparated{xs} } -func (rs *RepeatableStringNoCommas) String() string { +func (rs *RepeatableSemiSeparated) String() string { if rs == nil || rs.xs == nil { return "" } - return strings.Join(*rs.xs, ",") + return strings.Join(*rs.xs, ";") } -func (rs *RepeatableStringNoCommas) Set(s string) error { +func (rs *RepeatableSemiSeparated) Set(s string) error { if *rs.xs == nil { - *rs.xs = []string{s} + *rs.xs = strings.Split(s, ";") } else { - *rs.xs = append(*rs.xs, s) + *rs.xs = append(*rs.xs, strings.Split(s, ";")...) 
} return nil } diff --git a/code/sonalyze/cmd/args_reify.go b/code/sonalyze/cmd/args_reify.go index 8ee1fc56..ee626010 100644 --- a/code/sonalyze/cmd/args_reify.go +++ b/code/sonalyze/cmd/args_reify.go @@ -11,6 +11,7 @@ package cmd import ( "fmt" "net/url" + "strings" ) type ArgReifier struct { @@ -77,14 +78,24 @@ func (r *ArgReifier) String(n, v string) { } func (r *ArgReifier) RepeatableString(n string, vs []string) { - for _, v := range vs { - r.String(n, v) + if len(vs) != 0 { + r.String(n, strings.Join(vs, ",")) + } +} + +func (r *ArgReifier) RepeatableSemiSeparated(n string, vs []string) { + if len(vs) != 0 { + r.String(n, strings.Join(vs, ";")) } } func (r *ArgReifier) RepeatableUint32(n string, vs []uint32) { - for _, v := range vs { - r.Uint(n, uint(v)) + if len(vs) != 0 { + var ss []string + for _, v := range vs { + ss = append(ss, fmt.Sprint(v)) + } + r.String(n, strings.Join(ss, ",")) } } diff --git a/code/sonalyze/daemon/api0/api0.go b/code/sonalyze/daemon/api0/api0.go new file mode 100644 index 00000000..12bc5266 --- /dev/null +++ b/code/sonalyze/daemon/api0/api0.go @@ -0,0 +1,765 @@ +// The v0 REST API is a disciplined reimplementation of the query parts of the original sonalyze +// REST API. In v0, the successful result of a GET is always a single JSON string that must be +// parsed by the consumer (for casual use just pipe it through `jq -r`), and when parsed should +// yield exactly the same text as the original API did. +// +// The v0 API is probably the right API for traditional sonalyze remoting, but for scripting we want +// to phase out this API in favor of the v1 API which returns proper JSON. +// +// There is no v0 API insertion point, as the original sonalyze insertion points are no longer +// supported - they handled only old data types, and not even all of those. Use the v1 insertion +// points to insert data. 
+
+package api0
+
+import (
+	"context"
+	"path"
+	"strings"
+
+	"github.com/danielgtaylor/huma/v2"
+
+	"go-utils/auth"
+	"sonalyze/cmd"
+	. "sonalyze/common"
+	"sonalyze/daemon/apiutil"
+)
+
+var (
+	jobanalyzerDir   string
+	databaseURI      string
+	cmdlineHandler   cmd.CommandLineHandler
+	getAuthenticator *auth.Authenticator
+)
+
+func SetupAPI(
+	api huma.API,
+	jobanalyzerDir_ string,
+	databaseURI_ string,
+	cmdlineHandler_ cmd.CommandLineHandler,
+	getAuthenticator_ *auth.Authenticator,
+) {
+	jobanalyzerDir = jobanalyzerDir_
+	databaseURI = databaseURI_
+	cmdlineHandler = cmdlineHandler_
+	getAuthenticator = getAuthenticator_
+	grp := huma.NewGroup(api, "/api/v0")
+	// WHEN UPDATING THESE, ALSO UPDATE SWITCH IN ../../application/command.go and HELP TEXT IN THE
+	// SAME PLACE.
+	addCard(grp)
+	addCluster(grp)
+	addConfig(grp)
+	addDiskprof(grp)
+	addGpu(grp)
+	addJobs(grp)
+	addLoad(grp)
+	addMetadata(grp)
+	addNode(grp)
+	addNodeprof(grp)
+	addProfile(grp)
+	addSacct(grp)
+	addSample(grp)
+	addSnode(grp)
+	addSpart(grp)
+	addUptime(grp)
+	addVersion(grp)
+	// Omitting `add` because it was already obsolete; replaced by /api/v1/insert
+	// Omitting `parse` because that's the old name for `sample`
+	// Omitting `report` because it's obsolete, it was for dashboard-1
+	// Omitting `top` for now because it is very limited
+}
+
+// Query commands.
+
+type QueryResponse struct {
+	// Body is a JSON string holding the output of the sonalyze command and it must be parsed, see
+	// top comments.
+	Body string
+}
+
+func addCard(api huma.API) {
+	huma.Get(api, "/card", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			HostAnalysisParams
+			FormatParams
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"card",
+			input.Auth,
+			collectAll(&input.HostAnalysisParams, &input.FormatParams),
+		)
+	})
+}
+
+func addCluster(api huma.API) {
+	huma.Get(api, "/cluster", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			QueryParams
+			FormatParams
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"cluster",
+			input.Auth,
+			collectAll(&input.QueryParams, &input.FormatParams),
+		)
+	})
+}
+
+func addConfig(api huma.API) {
+	huma.Get(api, "/config", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			HostAnalysisParams
+			FormatParams
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"config",
+			input.Auth,
+			collectAll(&input.HostAnalysisParams, &input.FormatParams),
+		)
+	})
+}
+
+func addDiskprof(api huma.API) {
+	huma.Get(api, "/diskprof", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			HostAnalysisParams
+			FormatParams
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"diskprof",
+			input.Auth,
+			collectAll(&input.HostAnalysisParams, &input.FormatParams),
+		)
+	})
+}
+
+func addGpu(api huma.API) {
+	huma.Get(api, "/gpu", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			HostAnalysisParams
+			FormatParams
+			GpuIndexParam // just in-line this if it only has this one use
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"gpu",
+			input.Auth,
+			collectAll(&input.HostAnalysisParams, &input.FormatParams, &input.GpuIndexParam),
+		)
+	})
+}
+
+func addJobs(api huma.API) {
+	huma.Get(api, "/jobs", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			SampleAnalysisParams
+			NoGpu          string `query:"no-gpu"`
+			SomeGpu        string `query:"some-gpu"`
+			Completed      string `query:"completed"`
+			Running        string `query:"running"`
+			Zombie         string `query:"zombie"`
+			Partition      string `query:"partition"`
+			Account        string `query:"account"`
+			Reservation    string `query:"reservation"`
+			State          string `query:"state"`
+			GpuType        string `query:"gpu-type"`
+			MinRuntimeSec  string `query:"min-runtime"`
+			MergeAll       string `query:"merge-all"`
+			MergeNone      string `query:"merge-none"`
+			SacctFromSonar string `query:"sacct-from-sonar"`
+			NumJobs        string `query:"numjobs"` // [sic!]
+			MinSamples     string `query:"min-samples"`
+			MinCpuAvg      string `query:"min-cpu-avg"`
+			MinCpuPeak     string `query:"min-cpu-peak"`
+			MaxCpuAvg      string `query:"max-cpu-avg"`
+			MaxCpuPeak     string `query:"max-cpu-peak"`
+			MinRcpuAvg     string `query:"min-rcpu-avg"`
+			MinRcpuPeak    string `query:"min-rcpu-peak"`
+			MaxRcpuAvg     string `query:"max-rcpu-avg"`
+			MaxRcpuPeak    string `query:"max-rcpu-peak"`
+			MinMemAvg      string `query:"min-mem-avg"`
+			MinMemPeak     string `query:"min-mem-peak"`
+			MinRmemAvg     string `query:"min-rmem-avg"`
+			MinRmemPeak    string `query:"min-rmem-peak"`
+			MinResAvg      string `query:"min-res-avg"`
+			MinResPeak     string `query:"min-res-peak"`
+			MinRresAvg     string `query:"min-rres-avg"`
+			MinRresPeak    string `query:"min-rres-peak"`
+			MinGpuAvg      string `query:"min-gpu-avg"`
+			MinGpuPeak     string `query:"min-gpu-peak"`
+			MaxGpuAvg      string `query:"max-gpu-avg"`
+			MaxGpuPeak     string `query:"max-gpu-peak"`
+			MinRgpuAvg     string `query:"min-rgpu-avg"`
+			MinRgpuPeak    string `query:"min-rgpu-peak"`
+			MaxRgpuAvg     string `query:"max-rgpu-avg"`
+			MaxRgpuPeak    string `query:"max-rgpu-peak"`
+			MinGpumemAvg   string `query:"min-gpumem-avg"`
+			MinGpumemPeak  string `query:"min-gpumem-peak"`
+			MinRgpumemAvg  string `query:"min-rgpumem-avg"`
+			MinRgpumemPeak string `query:"min-rgpumem-peak"`
+			FormatParams
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"jobs",
+			input.Auth,
+			append(
+				collectAll(&input.SampleAnalysisParams, &input.FormatParams),
+				collect(
+					"no-gpu", input.NoGpu,
+					"some-gpu", input.SomeGpu,
+					"completed", input.Completed,
+					"running", input.Running,
+					"zombie", input.Zombie,
+					"partition", input.Partition,
+					"account", input.Account,
+					"reservation", input.Reservation,
+					"state", input.State,
+					"gpu-type", input.GpuType,
+					"min-runtime", input.MinRuntimeSec,
+					"merge-all", input.MergeAll,
+					"merge-none", input.MergeNone,
+					"sacct-from-sonar", input.SacctFromSonar,
+					"numjobs", input.NumJobs,
+					"min-samples", input.MinSamples,
+					"min-cpu-avg", input.MinCpuAvg,
+					"min-cpu-peak", input.MinCpuPeak,
+					"max-cpu-avg", input.MaxCpuAvg,
+					"max-cpu-peak", input.MaxCpuPeak,
+					"min-rcpu-avg", input.MinRcpuAvg,
+					"min-rcpu-peak", input.MinRcpuPeak,
+					"max-rcpu-avg", input.MaxRcpuAvg,
+					"max-rcpu-peak", input.MaxRcpuPeak,
+					"min-mem-avg", input.MinMemAvg,
+					"min-mem-peak", input.MinMemPeak,
+					"min-rmem-avg", input.MinRmemAvg,
+					"min-rmem-peak", input.MinRmemPeak,
+					"min-res-avg", input.MinResAvg,
+					"min-res-peak", input.MinResPeak,
+					"min-rres-avg", input.MinRresAvg,
+					"min-rres-peak", input.MinRresPeak,
+					"min-gpu-avg", input.MinGpuAvg,
+					"min-gpu-peak", input.MinGpuPeak,
+					"max-gpu-avg", input.MaxGpuAvg,
+					"max-gpu-peak", input.MaxGpuPeak,
+					"min-rgpu-avg", input.MinRgpuAvg,
+					"min-rgpu-peak", input.MinRgpuPeak,
+					"max-rgpu-avg", input.MaxRgpuAvg,
+					"max-rgpu-peak", input.MaxRgpuPeak,
+					"min-gpumem-avg", input.MinGpumemAvg,
+					"min-gpumem-peak", input.MinGpumemPeak,
+					"min-rgpumem-avg", input.MinRgpumemAvg,
+					"min-rgpumem-peak", input.MinRgpumemPeak,
+				)...,
+			),
+		)
+	})
+}
+
+func addLoad(api huma.API) {
+	huma.Get(api, "/load", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			SampleAnalysisParams
+			FormatParams
+			Hourly     string `query:"hourly"`
+			HalfHourly string `query:"half-hourly"`
+			Daily      string `query:"daily"`
+			HalfDaily  string `query:"half-daily"`
+			Weekly     string `query:"weekly"`
+			None       string `query:"none"`
+			Group      string `query:"group"`
+			All        string `query:"all"`
+			Last       string `query:"last"`
+			Compact    string `query:"compact"`
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"load",
+			input.Auth,
+			append(
+				collectAll(&input.SampleAnalysisParams, &input.FormatParams),
+				collect(
+					"hourly", input.Hourly,
+					"half-hourly", input.HalfHourly,
+					"daily", input.Daily,
+					"half-daily", input.HalfDaily,
+					"weekly", input.Weekly,
+					"none", input.None,
+					"group", input.Group,
+					"all", input.All,
+					"last", input.Last,
+					"compact", input.Compact,
+				)...,
+			),
+		)
+	})
+}
+
+func addMetadata(api huma.API) {
+	huma.Get(api, "/metadata", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			SampleAnalysisParams
+			FormatParams
+			MergeByHostAndJob string `query:"merge-by-host-and-job"`
+			MergeByJob        string `query:"merge-by-job"`
+			Times             string `query:"times"`
+			Files             string `query:"files"`
+			Bounds            string `query:"bounds"`
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"metadata",
+			input.Auth,
+			append(
+				collectAll(&input.SampleAnalysisParams, &input.FormatParams),
+				collect(
+					"merge-by-host-and-job", input.MergeByHostAndJob,
+					"merge-by-job", input.MergeByJob,
+					"times", input.Times,
+					"files", input.Files,
+					"bounds", input.Bounds,
+				)...,
+			),
+		)
+	})
+}
+
+func addNode(api huma.API) {
+	huma.Get(api, "/node", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			HostAnalysisParams
+			FormatParams
+			Newest string `query:"newest"`
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"node",
+			input.Auth,
+			append(
+				collectAll(&input.HostAnalysisParams, &input.FormatParams),
+				collect("newest", input.Newest)...,
+			),
+		)
+	})
+}
+
+func addNodeprof(api huma.API) {
+	huma.Get(api, "/nodeprof", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			HostAnalysisParams
+			FormatParams
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"nodeprof",
+			input.Auth,
+			collectAll(&input.HostAnalysisParams, &input.FormatParams),
+		)
+	})
+}
+
+func addProfile(api huma.API) {
+	huma.Get(api, "/profile", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			SampleAnalysisParams
+			FormatParams
+			Max    string `query:"max"`
+			Bucket string `query:"bucket"`
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"profile",
+			input.Auth,
+			append(
+				collectAll(&input.SampleAnalysisParams, &input.FormatParams),
+				collect("max", input.Max, "bucket", input.Bucket)...,
+			),
+		)
+	})
+}
+
+func addSacct(api huma.API) {
+	huma.Get(api, "/sacct", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			HostAnalysisParams
+			FormatParams
+			States           string `query:"state"`
+			Users            string `query:"user"`
+			Accounts         string `query:"account"`
+			Partitions       string `query:"partition"`
+			Jobs             string `query:"job"`
+			All              string `query:"all"`
+			MinRuntime       string `query:"min-runtime"`
+			MaxRuntime       string `query:"max-runtime"`
+			MinReservedMem   string `query:"min-reserved-mem"`
+			MaxReservedMem   string `query:"max-reserved-mem"`
+			MinReservedCores string `query:"min-reserved-cores"`
+			MaxReservedCores string `query:"max-reserved-cores"`
+			SomeGPU          string `query:"some-gpu"`
+			NoGPU            string `query:"no-gpu"`
+			Regular          string `query:"regular"`
+			Array            string `query:"array"`
+			Het              string `query:"het"`
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"sacct",
+			input.Auth,
+			append(
+				collectAll(&input.HostAnalysisParams, &input.FormatParams),
+				collect(
+					"state", input.States,
+					"user", input.Users,
+					"account", input.Accounts,
+					"partition", input.Partitions,
+					"job", input.Jobs,
+					"all", input.All,
+					"min-runtime", input.MinRuntime,
+					"max-runtime", input.MaxRuntime,
+					"min-reserved-mem", input.MinReservedMem,
+					"max-reserved-mem", input.MaxReservedMem,
+					"min-reserved-cores", input.MinReservedCores,
+					"max-reserved-cores", input.MaxReservedCores,
+					"some-gpu", input.SomeGPU,
+					"no-gpu", input.NoGPU,
+					"regular", input.Regular,
+					"array", input.Array,
+					"het", input.Het,
+				)...,
+			),
+		)
+	})
+}
+
+func addSample(api huma.API) {
+	huma.Get(api, "/sample", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			SampleAnalysisParams
+			FormatParams
+			MergeByHostAndJob string `query:"merge-by-host-and-job"`
+			MergeByJob        string `query:"merge-by-job"`
+			Clean             string `query:"clean"`
+			LastN             string `query:"last"`
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"sample",
+			input.Auth,
+			append(
+				collectAll(&input.SampleAnalysisParams, &input.FormatParams),
+				collect(
+					"merge-by-host-and-job", input.MergeByHostAndJob,
+					"merge-by-job", input.MergeByJob,
+					"clean", input.Clean,
+					"last", input.LastN,
+				)...,
+			),
+		)
+	})
+}
+
+func addSnode(api huma.API) {
+	huma.Get(api, "/snode", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			HostAnalysisParams
+			FormatParams
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"snode",
+			input.Auth,
+			collectAll(&input.HostAnalysisParams, &input.FormatParams),
+		)
+	})
+}
+
+func addSpart(api huma.API) {
+	huma.Get(api, "/spart", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			HostAnalysisParams
+			FormatParams
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"spart",
+			input.Auth,
+			collectAll(&input.HostAnalysisParams, &input.FormatParams),
+		)
+	})
+}
+
+func addUptime(api huma.API) {
+	huma.Get(api, "/uptime", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			SampleAnalysisParams
+			FormatParams
+			Interval string `query:"interval"`
+			OnlyUp   string `query:"only-up"`
+			OnlyDown string `query:"only-down"`
+		},
+	) (*QueryResponse, error) {
+		return queryCommand(
+			"uptime",
+			input.Auth,
+			append(
+				collectAll(&input.SampleAnalysisParams, &input.FormatParams),
+				collect(
+					"interval", input.Interval,
+					"only-up", input.OnlyUp,
+					"only-down", input.OnlyDown,
+				)...,
+			),
+		)
+	})
+}
+
+func addVersion(api huma.API) {
+	huma.Get(api, "/version", func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+		},
+	) (*QueryResponse, error) {
+		return queryCommand("version", input.Auth, []string{})
+	})
+}
+
+// Run a query command.
+//
+// This must return `error` to be API compatible with Huma, but the error return is always a
+// huma.StatusError.
+
+func queryCommand(command, auth string, params []string) (*QueryResponse, error) {
+	verbose := Verbose // saved here, restored after HandleCommand below
+	if getAuthenticator != nil {
+		user, pass := apiutil.DecodeAuth(auth)
+		if !getAuthenticator.Authenticate(user, pass) {
+			return nil, huma.Error401Unauthorized(command + ": Unknown user/pass combination")
+		}
+	}
+	if verbose && auth != "" {
+		Log.Infof("Auth: %q", auth) // NOTE(review): logs raw credentials; consider redacting
+	}
+	if jobanalyzerDir != "" {
+		params = append(params, "-jobanalyzer-dir", jobanalyzerDir)
+	}
+	if databaseURI != "" {
+		params = append(params, "-database-uri", databaseURI)
+	}
+	// not normally what we want but handy for debugging
+	// if verbose {
+	//     params = append(params, "-v")
+	// }
+	cmdName := ""
+	if verbose {
+		Log.Infof(
+			"Command: %s %s",
+			path.Join(jobanalyzerDir, cmdName),
+			command+" "+strings.Join(params, " "),
+		)
+	}
+
+	anyCmd, _ := cmdlineHandler.ParseVerb(cmdName, command)
+	if anyCmd == nil {
+		return nil, huma.Error500InternalServerError(command + ": Unknown")
+	}
+	fs := cmd.NewCLI(command, anyCmd, cmdName, false)
+	err := cmdlineHandler.ParseArgs(command, params, anyCmd, fs)
+	if err != nil {
+		return nil, huma.Error400BadRequest(command + ": " + err.Error())
+	}
+
+	// The -cpuprofile option is ignored here, it should have forced ParseArgs to error out.
+
+	var stdoutBuf, stderrBuf strings.Builder
+	err = cmdlineHandler.HandleCommand(anyCmd, nil, &stdoutBuf, &stderrBuf)
+	// In HandleCommand, the command line parser overrides the global setting.
+	Verbose = verbose
+	stdout := stdoutBuf.String()
+	stderr := stderrBuf.String()
+	if err != nil {
+		return nil, huma.Error400BadRequest(command + ": " + err.Error())
+	}
+	if stderr != "" {
+		Log.Warningf("%s", stderr) // stderr is data, never a format string
+	}
+
+	return &QueryResponse{Body: stdout}, nil
+}
+
+// Query arguments.
+//
+// These structures follow the code in ../../cmd/args.go closely, and it's a little sad that this
+// code is not there (as a general reification facility).  We should consider whether it can be
+// merged in.  But we have our own needs for Huma to think about too.
+
+// Collectable is implemented by every parameter group below: Collect renders the group's
+// non-empty fields as "-name=value" command-line options.
+type Collectable interface {
+	Collect() []string
+}
+
+// ClusterParams carries the `cluster` query parameter.
+type ClusterParams struct {
+	Cluster string `query:"cluster"`
+}
+
+// Collect returns the -cluster option if a cluster was given.
+func (x *ClusterParams) Collect() []string {
+	return collect("cluster", x.Cluster)
+}
+
+// SourceParams carries the `from` and `to` query parameters.
+type SourceParams struct {
+	FromDate string `query:"from" doc:"ISO time stamp"`
+	ToDate   string `query:"to" doc:"ISO time stamp"`
+}
+
+// Collect returns the -from and -to options for whichever of the two were given.
+func (x *SourceParams) Collect() []string {
+	return collect("from", x.FromDate, "to", x.ToDate)
+}
+
+// QueryParams carries the `q` query parameter.
+type QueryParams struct {
+	QueryStmt string `query:"q" doc:"Query expression"`
+}
+
+// Collect returns the -q option if a query expression was given.
+func (x *QueryParams) Collect() []string {
+	return collect("q", x.QueryStmt)
+}
+
+// HostParams carries the `host` query parameter.
+type HostParams struct {
+	// TODO: This is tricky.  The argument parser does not allow comma separation here, but does
+	// allow repeated arguments.  Not sure how this works with query args - repeats override, I
+	// think?  Do we use the host list parser to separate comma-separated hosts?  If so, we can just
+	// forward.  The real problem is if the cli depends on repeated args and we can't do them with
+	// the REST API.
+	Host string `query:"host" doc:"Comma-separated host ranges"`
+}
+
+// Collect returns the -host option if a host was given.
+func (x *HostParams) Collect() []string {
+	return collect("host", x.Host)
+}
+
+// RecordFilterParams carries the host parameter plus the user, command and job filters.  All
+// values are kept as strings and forwarded as they were received.
+type RecordFilterParams struct {
+	HostParams
+	User              string `query:"user" doc:"Comma-separated users"`
+	ExcludeUser       string `query:"exclude-user" doc:"Comma-separated users"`
+	Command           string `query:"command"`
+	ExcludeCommand    string `query:"exclude-command"`
+	ExcludeSystemJobs string `query:"exclude-system-jobs"`
+	Job               string `query:"job"`
+	ExcludeJob        string `query:"exclude-job"`
+}
+
+// Collect returns the host option followed by one option per non-empty filter field.
+func (x *RecordFilterParams) Collect() []string {
+	return append(
+		x.HostParams.Collect(),
+		collect(
+			"user", x.User, "exclude-user", x.ExcludeUser,
+			"command", x.Command, "exclude-command", x.ExcludeCommand,
+			"exclude-system-jobs", x.ExcludeSystemJobs,
+			"job", x.Job, "exclude-job", x.ExcludeJob,
+		)...,
+	)
+}
+
+// HostAnalysisParams combines the cluster, time range, query and host parameter groups.
+type HostAnalysisParams struct {
+	ClusterParams
+	SourceParams
+	QueryParams
+	HostParams
+}
+
+// Collect returns the options of all embedded groups, in declaration order.
+func (x *HostAnalysisParams) Collect() []string {
+	return collectAll(&x.ClusterParams, &x.SourceParams, &x.QueryParams, &x.HostParams)
+}
+
+// SampleAnalysisParams combines the cluster, time range, query and record filter parameter groups.
+type SampleAnalysisParams struct {
+	ClusterParams
+	SourceParams
+	QueryParams
+	RecordFilterParams
+}
+
+// Collect returns the options of all embedded groups, in declaration order.
+func (x *SampleAnalysisParams) Collect() []string {
+	return collectAll(&x.ClusterParams, &x.SourceParams, &x.QueryParams, &x.RecordFilterParams)
+}
+
+// FormatParams carries the `fmt` query parameter.
+type FormatParams struct {
+	Fmt string `query:"fmt" doc:"Format spec"`
+}
+
+// Collect returns the -fmt option if a format spec was given.
+func (x *FormatParams) Collect() []string {
+	return collect("fmt", x.Fmt)
+}
+
+// GpuIndexParam carries the `gpu` query parameter.
+type GpuIndexParam struct {
+	Gpu string `query:"gpu"`
+}
+
+// Collect returns the -gpu option if a gpu was given.
+func (x *GpuIndexParam) Collect() []string {
+	return collect("gpu", x.Gpu)
+}
+
+// collectAll concatenates the options of the given groups, in argument order.  The result is
+// never nil.
+func collectAll(xs ...Collectable) []string {
+	result := make([]string, 0)
+	for _, x := range xs {
+		result = append(result, x.Collect()...)
+	}
+	return result
+}
+
+// collect takes alternating option-name / value arguments and returns "-name=value" for every
+// pair whose value is a non-empty string; pairs with empty values are dropped.  The result is
+// never nil.
+//
+// Misuse is a programming error and panics: an odd number of arguments, or a value that is not a
+// string.  Names must be strings too (unchecked type assertion below).
+func collect(xs ...any) []string {
+	if len(xs)%2 == 1 {
+		panic("Bad")
+	}
+	result := make([]string, 0)
+	for i := range len(xs) / 2 {
+		var s string
+		var add bool
+		// Only string values are supported for now; the switch leaves room for other types.
+		switch v := xs[i*2+1].(type) {
+		case string:
+			if v != "" {
+				s = v
+				add = true
+			}
+		default:
+			panic("Type")
+		}
+		if add {
+			result = append(result, "-"+xs[i*2].(string)+"="+s)
+		}
+	}
+	return result
+}
diff --git a/code/sonalyze/daemon/api1/api1.go b/code/sonalyze/daemon/api1/api1.go
new file mode 100644
index 00000000..1337b4ca
--- /dev/null
+++ b/code/sonalyze/daemon/api1/api1.go
@@ -0,0 +1,201 @@
+// The v1 API follows the old v0 API but GET requests return proper JSON objects instead of strings, and the JSON
+// objects can have non-string values.
+//
+// The v1 API also has new insertion points for the new data, represented as JSON.  The result of a
+// POST is a JSON object with some data about the data that were received.
+
+package api1
+
+import (
+	"context"
+	"encoding/json"
+
+	"go-utils/auth"
+
+	"github.com/NordicHPC/sonar/util/formats/newfmt"
+	"github.com/danielgtaylor/huma/v2"
+
+	"sonalyze/daemon/apiutil"
+	"sonalyze/db"
+)
+
+// Authenticator for insertion requests; nil means insertion is not authenticated.
+var (
+	postAuthenticator *auth.Authenticator
+)
+
+// SetupAPI registers the v1 endpoints under /api/v1 on `api`.  The insertion endpoints are
+// registered only if `insertAPI` is true; `postAuthenticator_` (which may be nil) is remembered
+// for authenticating insertions.
+func SetupAPI(
+	api huma.API,
+	insertAPI bool,
+	postAuthenticator_ *auth.Authenticator,
+) {
+	postAuthenticator = postAuthenticator_
+	grp := huma.NewGroup(api, "/api/v1")
+
+	// v1 get apis go here, when we've implemented them
+
+	if insertAPI {
+		addInsertSysinfoData(grp)
+		addInsertSampleData(grp)
+		addInsertJobData(grp)
+		addInsertClusterData(grp)
+	}
+}
+
+// Insertion.
+//
+// Sonar does not require a specific return structure beyond the HTTP code.  Here, on successful
+// insertion, echo the cluster/node/topic/time back, since sonar assumes these will be unique.
+//
+// Insertion ops must return `error` to be API compatible with Huma, but the error return is always
+// a huma.StatusError.
+
+// InsertionResponse is the Huma response for a successful insertion.
+type InsertionResponse struct {
+	Body InsertionResponseBody
+}
+
+// InsertionResponseBody echoes the identifying attributes of the inserted data back to the sender.
+type InsertionResponseBody struct {
+	Cluster string `json:"cluster"`
+	Node    string `json:"node,omitempty"` // There's no node for jobs and cluster data
+	Topic   string `json:"topic"`
+	Time    string `json:"time"`
+}
+
+// Endpoint paths, relative to the API group: /insert/<data tag>.
+const (
+	insertSampleName  = "/insert/" + string(newfmt.DataTagSample)
+	insertSysinfoName = "/insert/" + string(newfmt.DataTagSysinfo)
+	insertJobsName    = "/insert/" + string(newfmt.DataTagJobs)
+	insertClusterName = "/insert/" + string(newfmt.DataTagCluster)
+)
+
+// addInsertSysinfoData registers the POST handler that appends one sysinfo envelope to the data
+// store of the cluster named in the envelope.
+func addInsertSysinfoData(api huma.API) {
+	huma.Post(api, insertSysinfoName, func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			Body newfmt.SysinfoEnvelope
+		},
+	) (*InsertionResponse, error) {
+		cluster := string(input.Body.Data.Attributes.Cluster)
+		ds, hErr := insertionSetup(insertSysinfoName, cluster, input.Auth)
+		if hErr != nil {
+			return nil, hErr
+		}
+		defer ds.FlushAsync()
+		nodename := string(input.Body.Data.Attributes.Node)
+		timestamp := string(input.Body.Data.Attributes.Time)
+		// The append call takes the payload as bytes, so re-encode the decoded body.
+		payload, err := json.Marshal(input.Body)
+		if err != nil {
+			return nil, huma.Error500InternalServerError("insert: " + err.Error())
+		}
+		err = ds.AppendSysinfoAsync(db.DataSysinfoV0JSON, nodename, timestamp, payload)
+		if err != nil {
+			return nil, huma.Error400BadRequest("insert: " + err.Error())
+		}
+		return insertionResponse(cluster, nodename, timestamp, newfmt.DataTagSysinfo), nil
+	})
+}
+
+// addInsertSampleData registers the POST handler that appends one sample envelope to the data
+// store of the cluster named in the envelope.
+func addInsertSampleData(api huma.API) {
+	huma.Post(api, insertSampleName, func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			Body newfmt.SampleEnvelope
+		},
+	) (*InsertionResponse, error) {
+		cluster := string(input.Body.Data.Attributes.Cluster)
+		ds, hErr := insertionSetup(insertSampleName, cluster, input.Auth)
+		if hErr != nil {
+			return nil, hErr
+		}
+		defer ds.FlushAsync()
+		nodename := string(input.Body.Data.Attributes.Node)
+		timestamp := string(input.Body.Data.Attributes.Time)
+		// The append call takes the payload as bytes, so re-encode the decoded body.
+		payload, err := json.Marshal(input.Body)
+		if err != nil {
+			return nil, huma.Error500InternalServerError("insert: " + err.Error())
+		}
+		err = ds.AppendSamplesAsync(db.DataSampleV0JSON, nodename, timestamp, payload)
+		if err != nil {
+			return nil, huma.Error400BadRequest("insert: " + err.Error())
+		}
+		return insertionResponse(cluster, nodename, timestamp, newfmt.DataTagSample), nil
+	})
+}
+
+// addInsertJobData registers the POST handler that appends one jobs envelope to the data store of
+// the cluster named in the envelope.  Job data carry no node name.
+func addInsertJobData(api huma.API) {
+	huma.Post(api, insertJobsName, func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			Body newfmt.JobsEnvelope
+		},
+	) (*InsertionResponse, error) {
+		cluster := string(input.Body.Data.Attributes.Cluster)
+		ds, hErr := insertionSetup(insertJobsName, cluster, input.Auth)
+		if hErr != nil {
+			return nil, hErr
+		}
+		defer ds.FlushAsync()
+		timestamp := string(input.Body.Data.Attributes.Time)
+		// The append call takes the payload as bytes, so re-encode the decoded body.
+		payload, err := json.Marshal(input.Body)
+		if err != nil {
+			return nil, huma.Error500InternalServerError("insert: " + err.Error())
+		}
+		err = ds.AppendSlurmSacctAsync(db.DataSlurmV0JSON, timestamp, payload)
+		if err != nil {
+			return nil, huma.Error400BadRequest("insert: " + err.Error())
+		}
+		return insertionResponse(cluster, "", timestamp, newfmt.DataTagJobs), nil
+	})
+}
+
+// addInsertClusterData registers the POST handler that appends one cluster envelope to the data
+// store of the cluster named in the envelope.  Cluster data carry no node name.
+func addInsertClusterData(api huma.API) {
+	huma.Post(api, insertClusterName, func(
+		ctx context.Context,
+		input *struct {
+			apiutil.AuthHeader
+			Body newfmt.ClusterEnvelope
+		},
+	) (*InsertionResponse, error) {
+		cluster := string(input.Body.Data.Attributes.Cluster)
+		ds, hErr := insertionSetup(insertClusterName, cluster, input.Auth)
+		if hErr != nil {
+			return nil, hErr
+		}
+		defer ds.FlushAsync()
+		timestamp := string(input.Body.Data.Attributes.Time)
+		// The append call takes the payload as bytes, so re-encode the decoded body.
+		payload, err := json.Marshal(input.Body)
+		if err != nil {
+			return nil, huma.Error500InternalServerError("insert: " + err.Error())
+		}
+		err = ds.AppendCluzterAsync(db.DataCluzterV0JSON, timestamp, payload)
+		if err != nil {
+			return nil, huma.Error400BadRequest("insert: " + err.Error())
+		}
+		return insertionResponse(cluster, "", timestamp, newfmt.DataTagCluster), nil
+	})
+}
+
+// insertionResponse builds the success response for an insertion of data of type `datatype`.
+// Pass an empty `nodename` for data that have no node; the field is then omitted from the JSON.
+func insertionResponse(
+	cluster, nodename, timestamp string,
+	datatype newfmt.DataType,
+) *InsertionResponse {
+	return &InsertionResponse{
+		Body: InsertionResponseBody{
+			Cluster: cluster,
+			Node:    nodename,
+			// Echo the topic of the data actually inserted, not a fixed one.
+			Topic: string(datatype),
+			Time:  timestamp,
+		},
+	}
+}
+
+// insertionSetup authenticates an insertion request and opens the appendable data store for
+// `cluster`.  `path` is the endpoint name, used to label cluster lookup errors; `auth` is the raw
+// Authorization header value.
+//
+// When a post authenticator is configured, the authenticated user name must equal the cluster
+// name in the data, so that one cluster's credentials can't be used to insert data for another.
+func insertionSetup(path, cluster, auth string) (db.AppendablePersistentDataProvider, huma.StatusError) {
+	if postAuthenticator != nil {
+		user, pass := apiutil.DecodeAuth(auth)
+		if user != cluster {
+			return nil, huma.Error401Unauthorized("insert: Cluster in data does not match user in auth")
+		}
+		if !postAuthenticator.Authenticate(user, pass) {
+			return nil, huma.Error401Unauthorized("insert: Unknown user/pass combination")
+		}
+	}
+	// Label the error with the endpoint that was actually called.
+	meta, hErr := apiutil.GetClusterContext(path, cluster)
+	if hErr != nil {
+		return nil, hErr
+	}
+	ds, err := db.OpenAppendablePersistentDirectoryDB(meta)
+	if err != nil {
+		return nil, huma.Error500InternalServerError("insert: incompatible database")
+	}
+	return ds, nil
+}
diff --git a/code/sonalyze/daemon/api2/api2.go b/code/sonalyze/daemon/api2/api2.go
index 2e16b006..7569de2b 100644
--- a/code/sonalyze/daemon/api2/api2.go
+++ b/code/sonalyze/daemon/api2/api2.go
@@ -2,21 +2,16 @@ package api2
 
 import (
 	"math"
-	"net/http"
 	"time"
 
 	"github.com/danielgtaylor/huma/v2"
-	"github.com/danielgtaylor/huma/v2/adapters/humago"
 	"go-utils/gpuset"
 
-	. "sonalyze/common"
 	"sonalyze/data/card"
 	"sonalyze/data/common"
 	"sonalyze/data/node"
 	"sonalyze/data/sample"
-	"sonalyze/db"
 	"sonalyze/db/repr"
-	"sonalyze/db/special"
 	"sonalyze/db/types"
 )
 
@@ -42,25 +37,20 @@ const (
 )
 
 // The iface is the local interface: name:port.
-func StartRestAPI(iface string) { - go func() { - router := http.NewServeMux() - api := humago.New(router, huma.DefaultConfig(apiName, apiVersion)) - grp := huma.NewGroup(api, "/api/v2") - addErrorMessages(grp) - addListClusters(grp) - addNodesInfo(grp) - addNodesLastProbeTimestamp(grp) - addNodesCpuTimeseries(grp) - addNodesMemoryTimeseries(grp) - addNodesGpuTimeseries(grp) - addNodesDiskstatsTimeseries(grp) - addNodesProcessGpuUtil(grp) - addProcesses(grp) - addProcessesGpu(grp) - addProcessesTimeseries(grp) - http.ListenAndServe(iface, router) - }() +func SetupAPI(api huma.API) { + grp := huma.NewGroup(api, "/api/v2") + addErrorMessages(grp) + addListClusters(grp) + addNodesInfo(grp) + addNodesLastProbeTimestamp(grp) + addNodesCpuTimeseries(grp) + addNodesMemoryTimeseries(grp) + addNodesGpuTimeseries(grp) + addNodesDiskstatsTimeseries(grp) + addNodesProcessGpuUtil(grp) + addProcesses(grp) + addProcessesGpu(grp) + addProcessesTimeseries(grp) } // This is called from the daemon's main thread when interrupted by signals. @@ -92,82 +82,6 @@ func canonicalizeInitialTimestep(t int64, resolution int64) int64 { return t } -func getClusterContext(opName, clusterName string) (types.Context, huma.StatusError) { - cluster := special.LookupCluster(clusterName) - if cluster == nil { - return nil, huma.Error400BadRequest(opName + ": Failed to find cluster " + clusterName) - } - return db.NewContextFromCluster(cluster), nil -} - -// Given a cluster, compute the from/to time based on the available data in the database for the cluster -// and any expressed from/to times. -func timeWindowFromData( - opName string, - meta types.Context, - startTimeInS, endTimeInS uint64, -) (from time.Time, to time.Time, hErr huma.StatusError) { - // TODO: Want to somehow document default timespan. - // - // Can we attach that to the api somehow without repeating it for every API? 
- - theLog, err := db.OpenReadOnlyDB(meta, types.MetaData) - if err != nil { - hErr = huma.Error500InternalServerError(opName+": Can't open database", err) - return - } - maxTime, err := theLog.MaxTime(true) - if err != nil { - maxTime = time.Now() - } - minTime, err := theLog.MinTime(true) - if err != nil { - minTime = maxTime - } - if Verbose { - Log.Infof("Min/max time: %v %v", minTime, maxTime) - } - - // Sensible defaults - to = maxTime - from = maxTime.Add(-defaultTimeWindow) - - // Overrides - start/end can be specified separately - if startTimeInS != 0 { - from = time.Unix(int64(startTimeInS), 0) - if endTimeInS == 0 { - to = from.Add(defaultTimeWindow) - } - } - if endTimeInS != 0 { - to = time.Unix(int64(endTimeInS), 0) - if startTimeInS == 0 { - from = to.Add(-defaultTimeWindow) - } - } - - // Validation - if from.After(to) { - hErr = huma.Error400BadRequest(opName+": Bad time value(s)", err) - return - } - - // Clamping to max window - if to.Sub(from) > maxTimeWindow { - from = to.Add(-maxTimeWindow) - } - - // Clamping to max/min times - if from.Before(minTime) { - from = minTime - } - if to.After(maxTime) { - to = maxTime - } - - return -} - func openSampleDataProvider(opName string, meta types.Context) (*sample.SampleDataProvider, huma.StatusError) { sdp, err := sample.OpenSampleDataProvider(meta) if err != nil { @@ -177,18 +91,6 @@ func openSampleDataProvider(opName string, meta types.Context) (*sample.SampleDa return sdp, nil } -func newHostFilter(opName, nodeName string) (*Hosts, huma.StatusError) { - var hostList []string - if nodeName != "" { - hostList = []string{nodeName} - } - hostFilter, err := NewHosts(true, hostList) - if err != nil { - return nil, huma.Error400BadRequest(opName+": Bad host list", err) - } - return hostFilter, nil -} - // Retrieve latest node metadata for the nodes within the time window. 
func getSysinfoAt( opName string, diff --git a/code/sonalyze/daemon/api2/list_clusters.go b/code/sonalyze/daemon/api2/list_clusters.go index 64c55340..bafc522b 100644 --- a/code/sonalyze/daemon/api2/list_clusters.go +++ b/code/sonalyze/daemon/api2/list_clusters.go @@ -9,6 +9,7 @@ import ( "github.com/danielgtaylor/huma/v2" + "sonalyze/daemon/apiutil" "sonalyze/data/common" "sonalyze/data/slurmpart" "sonalyze/db" @@ -53,7 +54,7 @@ func handleListClusters( resp := &ClusterResponse{} for _, c := range special.AllClusters() { meta := db.NewContextFromCluster(c) - _, to, hErr := timeWindowFromData(listClustersName, meta, 0, input.TimeInS) + _, to, hErr := apiutil.TimeWindowFromData(listClustersName, meta, 0, input.TimeInS) from := to.Add(-24 * time.Hour) if hErr != nil { return nil, hErr diff --git a/code/sonalyze/daemon/api2/nodes_cpu_timeseries.go b/code/sonalyze/daemon/api2/nodes_cpu_timeseries.go index fd325a8c..8f3c3a1d 100644 --- a/code/sonalyze/daemon/api2/nodes_cpu_timeseries.go +++ b/code/sonalyze/daemon/api2/nodes_cpu_timeseries.go @@ -6,6 +6,7 @@ import ( "github.com/danielgtaylor/huma/v2" + "sonalyze/daemon/apiutil" "sonalyze/data/sample" ) @@ -87,11 +88,11 @@ func computeProfile( startTimeInS, endTimeInS, resolutionInS uint64, nodename string, ) (map[string][]profStepData, huma.StatusError) { - meta, hErr := getClusterContext(opName, clusterName) + meta, hErr := apiutil.GetClusterContext(opName, clusterName) if hErr != nil { return nil, hErr } - from, to, hErr := timeWindowFromData(opName, meta, startTimeInS, endTimeInS) + from, to, hErr := apiutil.TimeWindowFromData(opName, meta, startTimeInS, endTimeInS) if hErr != nil { return nil, hErr } @@ -100,19 +101,15 @@ func computeProfile( bucket = int64(resolutionInS) } - var hostList []string - if nodename != "" { - hostList = []string{nodename} - } - sysinfo, hErr := getSysinfoAt(opName, meta, to, hostList) + hostFilter, hErr := apiutil.NewHostFilter(opName, nodename) if hErr != nil { return nil, hErr } - sdp, 
hErr := openSampleDataProvider(opName, meta) + sysinfo, hErr := getSysinfoAt(opName, meta, to, hostFilter.Patterns()) if hErr != nil { return nil, hErr } - hostFilter, hErr := newHostFilter(opName, nodename) + sdp, hErr := openSampleDataProvider(opName, meta) if hErr != nil { return nil, hErr } diff --git a/code/sonalyze/daemon/api2/nodes_diskstats_timeseries.go b/code/sonalyze/daemon/api2/nodes_diskstats_timeseries.go index 460dc8d5..ac53a873 100644 --- a/code/sonalyze/daemon/api2/nodes_diskstats_timeseries.go +++ b/code/sonalyze/daemon/api2/nodes_diskstats_timeseries.go @@ -9,6 +9,7 @@ import ( "github.com/danielgtaylor/huma/v2" . "sonalyze/common" + "sonalyze/daemon/apiutil" "sonalyze/data/common" "sonalyze/data/disksample" "sonalyze/db/repr" @@ -72,11 +73,11 @@ func handleNodesDiskstatsTimeseries( Nodename string `query:"nodename" doc:"Compressed node name list"` }, ) (*NodesDiskstatsTimeseriesResponse, error) { - meta, hErr := getClusterContext(nodesDiskstatsTimeseriesName, input.Cluster) + meta, hErr := apiutil.GetClusterContext(nodesDiskstatsTimeseriesName, input.Cluster) if hErr != nil { return nil, hErr } - from, to, hErr := timeWindowFromData( + from, to, hErr := apiutil.TimeWindowFromData( nodesDiskstatsTimeseriesName, meta, input.StartTimeInS, input.EndTimeInS) if hErr != nil { return nil, hErr diff --git a/code/sonalyze/daemon/api2/nodes_gpu_timeseries.go b/code/sonalyze/daemon/api2/nodes_gpu_timeseries.go index 6914cb58..64b3d9fb 100644 --- a/code/sonalyze/daemon/api2/nodes_gpu_timeseries.go +++ b/code/sonalyze/daemon/api2/nodes_gpu_timeseries.go @@ -6,6 +6,7 @@ import ( "github.com/danielgtaylor/huma/v2" + "sonalyze/daemon/apiutil" "sonalyze/data/gpusample" "sonalyze/db/repr" ) @@ -52,11 +53,11 @@ func handleNodesGpuTimeseries( Nodename string `query:"nodename" doc:"Compressed node name list"` }, ) (*NodesGpuTimeseriesResponse, error) { - meta, hErr := getClusterContext(nodesGpuTimeseriesName, input.Cluster) + meta, hErr := 
apiutil.GetClusterContext(nodesGpuTimeseriesName, input.Cluster)
 	if hErr != nil {
 		return nil, hErr
 	}
-	from, to, hErr := timeWindowFromData(
+	from, to, hErr := apiutil.TimeWindowFromData(
 		nodesGpuTimeseriesName, meta, input.StartTimeInS, input.EndTimeInS)
 	if hErr != nil {
 		return nil, hErr
@@ -81,7 +82,7 @@ func handleNodesGpuTimeseries(
 		return nil, huma.Error500InternalServerError(
 			nodesGpuTimeseriesName+": failed to open gpu sample store", err)
 	}
-	hostGlobber, hErr := newHostFilter(nodesGpuTimeseriesName, input.Nodename)
-	if err != nil {
+	hostGlobber, hErr := apiutil.NewHostFilter(nodesGpuTimeseriesName, input.Nodename)
+	if hErr != nil {
 		return nil, hErr
 
diff --git a/code/sonalyze/daemon/api2/nodes_info.go b/code/sonalyze/daemon/api2/nodes_info.go
index 33497160..6a912dda 100644
--- a/code/sonalyze/daemon/api2/nodes_info.go
+++ b/code/sonalyze/daemon/api2/nodes_info.go
@@ -5,6 +5,8 @@ import (
 	"net/http"
 
 	"github.com/danielgtaylor/huma/v2"
+
+	"sonalyze/daemon/apiutil"
 )
 
 // List all nodes in a cluster with the latest hardware and OS information.
Note that the time @@ -57,11 +59,11 @@ func handleNodesInfo( TimeInS uint64 `query:"time_in_s" doc:"Posix timestamp"` }, ) (*NodesInfoResponse, error) { - meta, hErr := getClusterContext(nodesInfoName, input.Cluster) + meta, hErr := apiutil.GetClusterContext(nodesInfoName, input.Cluster) if hErr != nil { return nil, hErr } - _, to, hErr := timeWindowFromData(nodesInfoName, meta, 0, input.TimeInS) + _, to, hErr := apiutil.TimeWindowFromData(nodesInfoName, meta, 0, input.TimeInS) if hErr != nil { return nil, hErr } diff --git a/code/sonalyze/daemon/api2/nodes_last_probe_timestamp.go b/code/sonalyze/daemon/api2/nodes_last_probe_timestamp.go index 25f070f5..e764f471 100644 --- a/code/sonalyze/daemon/api2/nodes_last_probe_timestamp.go +++ b/code/sonalyze/daemon/api2/nodes_last_probe_timestamp.go @@ -14,6 +14,7 @@ import ( "github.com/danielgtaylor/huma/v2" + "sonalyze/daemon/apiutil" "sonalyze/data/sample" ) @@ -45,11 +46,11 @@ func handleNodesLastProbeTimestamp( }, ) (*NodesLastProbeTimestampResponse, error) { // Logic from cmd/metadata - meta, hErr := getClusterContext(nodesLastProbeTimestampName, input.Cluster) + meta, hErr := apiutil.GetClusterContext(nodesLastProbeTimestampName, input.Cluster) if hErr != nil { return nil, hErr } - from, to, hErr := timeWindowFromData( + from, to, hErr := apiutil.TimeWindowFromData( nodesLastProbeTimestampName, meta, input.TimeInS, input.TimeInS) if hErr != nil { return nil, hErr diff --git a/code/sonalyze/daemon/api2/nodes_process_gpu_util.go b/code/sonalyze/daemon/api2/nodes_process_gpu_util.go index f3b757fa..a4fea4ce 100644 --- a/code/sonalyze/daemon/api2/nodes_process_gpu_util.go +++ b/code/sonalyze/daemon/api2/nodes_process_gpu_util.go @@ -16,6 +16,7 @@ import ( "github.com/danielgtaylor/huma/v2" + "sonalyze/daemon/apiutil" "sonalyze/data/sample" ) @@ -56,11 +57,11 @@ func handleNodesProcessGpuUtil( WindowInS uint64 `query:"window_in_s" doc:"Width of averaging interval, default 300"` }, ) (*NodesProcessGpuUtilResponse, error) 
{ - meta, hErr := getClusterContext(nodesProcessGpuUtilName, input.Cluster) + meta, hErr := apiutil.GetClusterContext(nodesProcessGpuUtilName, input.Cluster) if hErr != nil { return nil, hErr } - from, to, hErr := timeWindowFromData( + from, to, hErr := apiutil.TimeWindowFromData( nodesProcessGpuUtilName, meta, input.ReferenceTimeInS, input.ReferenceTimeInS) if hErr != nil { return nil, hErr @@ -70,7 +71,7 @@ func handleNodesProcessGpuUtil( from = from.Add(-w) to = to.Add(w) - hostFilter, hErr := newHostFilter(nodesProcessGpuUtilName, input.Nodename) + hostFilter, hErr := apiutil.NewHostFilter(nodesProcessGpuUtilName, input.Nodename) if hErr != nil { return nil, hErr } diff --git a/code/sonalyze/daemon/api2/processes.go b/code/sonalyze/daemon/api2/processes.go index 11e4443b..92c66896 100644 --- a/code/sonalyze/daemon/api2/processes.go +++ b/code/sonalyze/daemon/api2/processes.go @@ -5,6 +5,7 @@ import ( "net/http" "github.com/danielgtaylor/huma/v2" + "sonalyze/daemon/apiutil" "sonalyze/data/sample" ) @@ -55,15 +56,15 @@ func handleProcesses( TimeInS uint64 `query:"time_in_s" doc:"Posix timestamp"` }, ) (*ProcessesResponse, error) { - meta, hErr := getClusterContext(processesName, input.Cluster) + meta, hErr := apiutil.GetClusterContext(processesName, input.Cluster) if hErr != nil { return nil, hErr } - from, to, hErr := timeWindowFromData(processesName, meta, 0, input.TimeInS) + from, to, hErr := apiutil.TimeWindowFromData(processesName, meta, 0, input.TimeInS) if hErr != nil { return nil, hErr } - hostFilter, hErr := newHostFilter(processesName, input.Nodename) + hostFilter, hErr := apiutil.NewHostFilter(processesName, input.Nodename) if hErr != nil { return nil, hErr } diff --git a/code/sonalyze/daemon/api2/processes_gpu.go b/code/sonalyze/daemon/api2/processes_gpu.go index ab94a073..9905d805 100644 --- a/code/sonalyze/daemon/api2/processes_gpu.go +++ b/code/sonalyze/daemon/api2/processes_gpu.go @@ -7,6 +7,7 @@ import ( "github.com/danielgtaylor/huma/v2" . 
"sonalyze/common" + "sonalyze/daemon/apiutil" "sonalyze/data/gpusample" "sonalyze/data/sample" ) @@ -60,15 +61,15 @@ func handleProcessesGpu( TimeInS uint64 `query:"time_in_s" doc:"Posix timestamp"` }, ) (*ProcessesGpuResponse, error) { - meta, hErr := getClusterContext(processesGpuName, input.Cluster) + meta, hErr := apiutil.GetClusterContext(processesGpuName, input.Cluster) if hErr != nil { return nil, hErr } - from, to, hErr := timeWindowFromData(processesGpuName, meta, 0, input.TimeInS) + from, to, hErr := apiutil.TimeWindowFromData(processesGpuName, meta, 0, input.TimeInS) if hErr != nil { return nil, hErr } - hostFilter, hErr := newHostFilter(processesGpuName, input.Nodename) + hostFilter, hErr := apiutil.NewHostFilter(processesGpuName, input.Nodename) if hErr != nil { return nil, hErr } diff --git a/code/sonalyze/daemon/api2/processes_timeseries.go b/code/sonalyze/daemon/api2/processes_timeseries.go index e47f292a..8e9f4ffd 100644 --- a/code/sonalyze/daemon/api2/processes_timeseries.go +++ b/code/sonalyze/daemon/api2/processes_timeseries.go @@ -6,6 +6,7 @@ import ( "github.com/danielgtaylor/huma/v2" + "sonalyze/daemon/apiutil" "sonalyze/data/sample" ) @@ -57,11 +58,11 @@ func handleProcessesTimeseries( Nodename string `query:"nodename" doc:"Compressed node name list"` }, ) (*ProcessesTimeseriesResponse, error) { - meta, hErr := getClusterContext(processesTimeseriesName, input.Cluster) + meta, hErr := apiutil.GetClusterContext(processesTimeseriesName, input.Cluster) if hErr != nil { return nil, hErr } - from, to, hErr := timeWindowFromData(processesTimeseriesName, meta, input.StartTimeInS, input.EndTimeInS) + from, to, hErr := apiutil.TimeWindowFromData(processesTimeseriesName, meta, input.StartTimeInS, input.EndTimeInS) if hErr != nil { return nil, hErr } @@ -69,7 +70,7 @@ func handleProcessesTimeseries( if input.ResolutionInS != 0 { bucket = int64(input.ResolutionInS) } - hostFilter, hErr := newHostFilter(processesTimeseriesName, input.Nodename) + 
hostFilter, hErr := apiutil.NewHostFilter(processesTimeseriesName, input.Nodename)
 	if hErr != nil {
 		return nil, hErr
 	}
diff --git a/code/sonalyze/daemon/apiutil/apiutil.go b/code/sonalyze/daemon/apiutil/apiutil.go
new file mode 100644
index 00000000..cc7bb742
--- /dev/null
+++ b/code/sonalyze/daemon/apiutil/apiutil.go
@@ -0,0 +1,156 @@
+package apiutil
+
+import (
+	"net/http"
+	"slices"
+	"time"
+
+	"github.com/danielgtaylor/huma/v2"
+	"github.com/danielgtaylor/huma/v2/adapters/humago"
+
+	. "sonalyze/common"
+	"sonalyze/db"
+	"sonalyze/db/special"
+	"sonalyze/db/types"
+)
+
+// Time windows for searching for data corresponding to something else.  These ought to be
+// parameters, probably, not hardcoded.
+const (
+	// By default the time window is the last hour before "now", or latest datum available in the
+	// database.  This is compatible with slurm-monitor.
+	defaultTimeWindow = 1 * time.Hour
+
+	// The max time window for searching is 2 weeks, or we risk overloading the server.  This too is
+	// compatible with slurm-monitor.
+	maxTimeWindow = 24 * time.Hour * 14
+)
+
+const (
+	apiName    = "sonalyze API"
+	apiVersion = "2"
+)
+
+var router humago.Mux
+var iface string
+
+// CreateAPI remembers the listen address `iface_` (interface:port), creates the HTTP router, and
+// returns the Huma API on which the versioned API groups are to be registered.
+func CreateAPI(iface_ string) huma.API {
+	iface = iface_
+	router = http.NewServeMux()
+	// This sucks - one version to rule them all even for /api/vx.  But in that case it's best to
+	// report the highest version here.
+	return humago.New(router, huma.DefaultConfig(apiName, apiVersion))
+}
+
+// RunAPI starts serving the router created by CreateAPI on a background goroutine and returns
+// immediately.  A server failure (e.g. the address can't be bound) is logged.
+func RunAPI() {
+	// FIXME - Not quite what we want, cf our http abstraction, but OK for now
+	go func() {
+		// ListenAndServe only returns on failure; report that instead of dropping it.
+		if err := http.ListenAndServe(iface, router); err != nil {
+			Log.Warningf("REST API on %s stopped: %v", iface, err)
+		}
+	}()
+}
+
+// StopAPI is a placeholder; it currently does nothing.
+func StopAPI() {
+	// FIXME - implement StopAPI()
+}
+
+// GetClusterContext returns the database context for the named cluster, or a 400 error labelled
+// with `opName` if the cluster is unknown.
+func GetClusterContext(opName, clusterName string) (types.Context, huma.StatusError) {
+	cluster := special.LookupCluster(clusterName)
+	if cluster == nil {
+		return nil, huma.Error400BadRequest(opName + ": Failed to find cluster " + clusterName)
+	}
+	return db.NewContextFromCluster(cluster), nil
+}
+
+// Given a cluster, compute the from/to time based on the available data in the database for the cluster
+// and any expressed from/to times.
+//
+// A zero startTimeInS or endTimeInS means "not specified".  The result is a 500 error if the
+// database can't be opened and a 400 error if the requested window starts after it ends.
+func TimeWindowFromData(
+	opName string,
+	meta types.Context,
+	startTimeInS, endTimeInS uint64,
+) (from time.Time, to time.Time, hErr huma.StatusError) {
+	// TODO: Want to somehow document default timespan.
+	//
+	// Can we attach that to the api somehow without repeating it for every API?
+
+	theLog, err := db.OpenReadOnlyDB(meta, types.MetaData)
+	if err != nil {
+		hErr = huma.Error500InternalServerError(opName+": Can't open database", err)
+		return
+	}
+	maxTime, err := theLog.MaxTime(true)
+	if err != nil {
+		maxTime = time.Now()
+	}
+	minTime, err := theLog.MinTime(true)
+	if err != nil {
+		minTime = maxTime
+	}
+	if Verbose {
+		Log.Infof("Min/max time: %v %v", minTime, maxTime)
+	}
+
+	// Sensible defaults
+	to = maxTime
+	from = maxTime.Add(-defaultTimeWindow)
+
+	// Overrides - start/end can be specified separately
+	if startTimeInS != 0 {
+		from = time.Unix(int64(startTimeInS), 0)
+		if endTimeInS == 0 {
+			to = from.Add(defaultTimeWindow)
+		}
+	}
+	if endTimeInS != 0 {
+		to = time.Unix(int64(endTimeInS), 0)
+		if startTimeInS == 0 {
+			from = to.Add(-defaultTimeWindow)
+		}
+	}
+
+	// Validation
+	if from.After(to) {
+		// There is no underlying Go error to attach here: `err` at this point is only whatever
+		// the MinTime lookup above returned.
+		hErr = huma.Error400BadRequest(opName + ": Bad time value(s)")
+		return
+	}
+
+	// Clamping to max window
+	if to.Sub(from) > maxTimeWindow {
+		from = to.Add(-maxTimeWindow)
+	}
+
+	// Clamping to max/min times
+	if from.Before(minTime) {
+		from = minTime
+	}
+	if to.After(maxTime) {
+		to = maxTime
+	}
+
+	return
+}
+
+// NewHostFilter builds a host filter from the given patterns, ignoring empty strings.  A 400
+// error labelled with `opName` is returned if the patterns can't be parsed.
+func NewHostFilter(opName string, patternList ...string) (*Hosts, huma.StatusError) {
+	patternList = slices.DeleteFunc(patternList, func(s string) bool { return s == "" })
+	hostFilter, err := NewHosts(true, patternList)
+	if err != nil {
+		return nil, huma.Error400BadRequest(opName+": Bad host list", err)
+	}
+	return hostFilter, nil
+}
diff --git a/code/sonalyze/daemon/apiutil/auth.go b/code/sonalyze/daemon/apiutil/auth.go
new file mode 100644
index 00000000..7deeb3e6
--- /dev/null
+++ b/code/sonalyze/daemon/apiutil/auth.go
@@ -0,0 +1,29 @@
+// Authorization stuff
+
+package apiutil
+
+import (
+	"encoding/base64"
+	"strings"
+)
+
+// AuthHeader is embedded in Huma input structs to receive the Authorization header value.
+type AuthHeader struct {
+	Auth string `header:"Authorization"`
+}
+
+// DecodeAuth extracts the user name and password from an HTTP Basic Authorization header value.
+// Both are whitespace-trimmed.  If the value is not well-formed Basic auth, the placeholders
+// "-NO-USER-" and "-NO-PASS-" are returned instead.
+func DecodeAuth(auth string) (string, string) {
+	if strings.HasPrefix(auth, "Basic ") {
+		data, err := base64.StdEncoding.DecodeString(strings.TrimSpace(auth[6:]))
+		if err == nil {
+			user, pass, found := strings.Cut(string(data), ":")
+			if found {
+				return strings.TrimSpace(user), strings.TrimSpace(pass)
+			}
+		}
+	}
+	return "-NO-USER-", "-NO-PASS-"
+}
diff --git a/code/sonalyze/daemon/daemon.go b/code/sonalyze/daemon/daemon.go
index 256b4134..e133c070 100644
--- a/code/sonalyze/daemon/daemon.go
+++ b/code/sonalyze/daemon/daemon.go
@@ -37,10 +37,6 @@
 // If present, this specifies a database access point.  The database is used for data access rather
 // than the data/ subdirectory of the jobanalyzer directory.
 //
-// -port
-//
-// This is an optional argument.  It is the port number on which to listen, the default is 8087.
-//
 // -analysis-auth
 // -password-file
 //
@@ -55,25 +51,12 @@
 // header.  (If the connection is not HTTPS then the password may have been intercepted in
 // transit.)
 //
-// -match-user-and-cluster
-//
-// Optional but *strongly* recommended argument.  If set, and -upload-auth is also provided, then
-// the user name provided by the HTTP connection must match the cluster name in the data packet or
-// query string.  The effect is to make it possible for each cluster to have its own
-// username:password pair and for one cluster not to be able to upload data for another.
-//
 // -cache
 //
 // Cache raw or parboiled data in memory between operations.  The size is expressed as nnM
 // (megabytes) or nnG (gigabytes).  A sensible size *might* be about 256MB per 100 (slurm) nodes
 // per week.
 //
-// -no-add
-//
-// This disables the /add, /sysinfo and /sonar-freecsv endpoints and the options -upload-auth and
-// -match-user-and-cluster.  The implication is that we're either running on a read-only database
-// or we're using -kafka to handle all ingestion.
-//
 // -kafka
 //
 // EXPERIMENTAL.  The daemon will attempt to ingest data over a unencrypted and unauthenticated
@@ -82,9 +65,18 @@
 //
 // -rest-api
 //
-// EXPERIMENTAL.
The daemon will present a subset of the slurm-monitor REST API v2 on the given -// interface (in the form interface:port, e.g. "localhost:8888"). Access the /openapi.json or -// /openapi.yaml endpoint on that interface to retrieve API documentation. +// The daemon will present various APIs on the given interface (in the form interface:port, +// e.g. "localhost:8888"). Access the /openapi.json or /openapi.yaml endpoint on that interface +// to retrieve API documentation. Normally under /api/v0 there will be the old sonalyze API (so +// /api/v0/jobs corresponds to the old /jobs API), under /api/v1 there will be a "clean" API more +// or less aligned with the v0 API but with clean JSON output and not the idiosyncrasies of v0, +// and under /api/v2 there is a subset of the slurm-monitor REST API v2. +// +// -insert +// +// Enable the /api/v1/insert points in the REST API. Normally this API is enabled only when +// running without a -database-uri and without -kafka (though it is not incompatible with the +// latter). // // Termination: // @@ -107,7 +99,6 @@ package daemon import ( _ "embed" - "errors" "fmt" "io" @@ -118,7 +109,6 @@ import ( const ( defaultListenPort = 8087 logTag = "jobanalyzer/sonalyze" - authRealm = "Jobanalyzer remote access" ) // MT: Immutable (no mutator operations) and thread-safe. 
@@ -129,13 +119,12 @@ type DaemonCommand struct { DevArgs VerboseArgs DatabaseArgs - port uint getAuthFile string postAuthFile string matchUserAndCluster bool kafkaBroker string - noAdd bool - restAPI2 string + restAPI string + insert bool getAuthenticator *auth.Authenticator postAuthenticator *auth.Authenticator @@ -153,14 +142,13 @@ func (dc *DaemonCommand) Add(fs *CLI) { dc.VerboseArgs.Add(fs) dc.DatabaseArgs.Add(fs, DBArgOptions{RequireFullDatabase: true}) fs.Group("daemon-configuration") - fs.UintVar(&dc.port, "port", defaultListenPort, "Listen for connections on `port`") fs.StringVar(&dc.getAuthFile, "analysis-auth", "", "Authentication info `filename` for analysis access") fs.StringVar(&dc.postAuthFile, "upload-auth", "", "Authentication info `filename` for data upload access") fs.BoolVar(&dc.matchUserAndCluster, "match-user-and-cluster", false, "Require user name to match cluster name") fs.StringVar(&dc.getAuthFile, "password-file", "", "Alias for -analysis-auth") fs.StringVar(&dc.kafkaBroker, "kafka", "", "Ingest data from this broker for all known clusters") - fs.BoolVar(&dc.noAdd, "no-add", false, "Disable HTTPS ingestion") - fs.StringVar(&dc.restAPI2, "rest-api", "", "Enable subset slurm-monitor API v2 on this interface:port") + fs.StringVar(&dc.restAPI, "rest-api", "", "Serve /api/v0, /api/v1 and /api/v2 on this interface:port") + fs.BoolVar(&dc.insert, "insert", false, "Enable the /api/v1/insert points") } //go:embed summary.txt @@ -171,27 +159,30 @@ func (dc *DaemonCommand) Summary(out io.Writer) { } func (dc *DaemonCommand) Validate() error { - var e1, e2, e4, e5, e7, e8 error - e1 = dc.DevArgs.Validate() - e2 = dc.VerboseArgs.Validate() + if err := dc.DevArgs.Validate(); err != nil { + return err + } + if err := dc.VerboseArgs.Validate(); err != nil { + return err + } if dc.getAuthFile != "" { - dc.getAuthenticator, e4 = auth.ReadPasswords(dc.getAuthFile) - if e4 != nil { - e4 = fmt.Errorf("Failed to read analysis authentication file: %v", e4) + 
var err error + dc.getAuthenticator, err = auth.ReadPasswords(dc.getAuthFile) + if err != nil { + return fmt.Errorf("Failed to read analysis authentication file: %v", err) } } if dc.postAuthFile != "" { - dc.postAuthenticator, e5 = auth.ReadPasswords(dc.postAuthFile) - if e5 != nil { - return fmt.Errorf("Failed to read upload authentication file: %v", e5) + var err error + dc.postAuthenticator, err = auth.ReadPasswords(dc.postAuthFile) + if err != nil { + return fmt.Errorf("Failed to read upload authentication file: %v", err) } } - if dc.noAdd { - if dc.matchUserAndCluster || dc.postAuthFile != "" { - e8 = errors.New("The -no-add switch precludes https upload parameters") - } + if dc.insert && dc.DatabaseURI() != "" { + return fmt.Errorf("Can't have both -database-uri and -insert") } - return errors.Join(e1, e2, e4, e5, e7, e8) + return nil } func (dc *DaemonCommand) ReifyForRemote(x *ArgReifier) error { diff --git a/code/sonalyze/daemon/perform.go b/code/sonalyze/daemon/perform.go index c60d3a2c..2e9e30cd 100644 --- a/code/sonalyze/daemon/perform.go +++ b/code/sonalyze/daemon/perform.go @@ -1,36 +1,22 @@ -// See ../TECHNICAL.md for a definition of the protocol. +// See ../doc/TECHNICAL.md for a definition of the protocol. // -// When adding a new command to the daemon, several points in this file have to be updated: -// -// - a new handler has to be installed in RunDaemon() -// - any special argument construction has to be created in httpGetHandler() (several places) or -// httpPostHandler() -// - any local-only arguments that should never be forwarded need to be added to the blacklist -// in argOk() -// -// In addition, due to the structure of the URL syntax, a new command point may need to be added to -// the HTTP server's configuration file. +// When adding a new command to the daemon, several points in the API implementations have to be +// updated, see comments in the subdirectories. 
package daemon import ( - "bytes" - "errors" "fmt" "io" "log/syslog" - "net/http" - "net/url" - "path" - "strings" "syscall" - "go-utils/auth" - "go-utils/httpsrv" "go-utils/process" - . "sonalyze/cmd" . "sonalyze/common" + "sonalyze/daemon/api0" + "sonalyze/daemon/api1" "sonalyze/daemon/api2" + "sonalyze/daemon/apiutil" "sonalyze/db" "sonalyze/db/special" ) @@ -68,400 +54,29 @@ func (dc *DaemonCommand) RunDaemon(_ io.Reader, _, stderr io.Writer) error { } } - // Note "daemon" is not a command here - if !dc.noAdd { - http.HandleFunc("/add", httpAddHandler(dc)) - } - // Keep these alphabetical. - // - // WHEN UPDATING THESE, ALSO UPDATE SWITCH IN ../application/command.go, HELP TEXT IN THE SAME - // PLACE, AND ANY WEB SERVER CONFIG. - http.HandleFunc("/card", httpGetHandler(dc, "card")) - http.HandleFunc("/cluster", httpGetHandler(dc, "cluster")) - http.HandleFunc("/config", httpGetHandler(dc, "config")) - http.HandleFunc("/diskprof", httpGetHandler(dc, "diskprof")) - http.HandleFunc("/gpu", httpGetHandler(dc, "gpu")) - http.HandleFunc("/jobs", httpGetHandler(dc, "jobs")) - http.HandleFunc("/load", httpGetHandler(dc, "load")) - http.HandleFunc("/metadata", httpGetHandler(dc, "metadata")) - http.HandleFunc("/node", httpGetHandler(dc, "node")) - http.HandleFunc("/nodeprof", httpGetHandler(dc, "nodeprof")) - http.HandleFunc("/parse", httpGetHandler(dc, "sample")) - http.HandleFunc("/profile", httpGetHandler(dc, "profile")) - http.HandleFunc("/report", httpGetHandler(dc, "report")) - http.HandleFunc("/sacct", httpGetHandler(dc, "sacct")) - http.HandleFunc("/sample", httpGetHandler(dc, "sample")) - http.HandleFunc("/snode", httpGetHandler(dc, "snode")) - http.HandleFunc("/spart", httpGetHandler(dc, "spart")) - // Omitting `top` for now because it is very limited. 
- http.HandleFunc("/uptime", httpGetHandler(dc, "uptime")) - http.HandleFunc("/version", httpGetHandler(dc, "version")) - if !dc.noAdd { - // These request names are compatible with the older `infiltrate` and `sonalyzed`, and with the - // upload infra already running on the clusters. We'd like to get rid of them eventually. - http.HandleFunc("/sonar-freecsv", httpPostHandler(dc, "sample", "text/csv")) - http.HandleFunc("/sysinfo", httpPostHandler(dc, "sysinfo", "application/json")) - } - - if dc.restAPI2 != "" { - api2.StartRestAPI(dc.restAPI2) - } - - var programFailed bool - s := httpsrv.New(Verbose, int(dc.port), func(err error) { - programFailed = true - }) - go s.Start() - - // Wait here until we're stopped by SIGHUP (manual) or SIGTERM (from OS during shutdown). - // - // TODO: IMPROVEME: For SIGHUP, we should not exit but should instead reread the password file, - // the cluster aliases file, and the configuration files (we could purge the config object - // cache). Really we must be purging the entire LogFile cache in this case too. - process.WaitForSignal(syscall.SIGHUP, syscall.SIGTERM) - s.Stop() - if dc.restAPI2 != "" { - api2.StopRestAPI() - } - - if programFailed { - return errors.New("HTTP server failed to start, or errored out") - } - return nil -} - -// HTTP handlers -// -// Documented behavior: the server will close the request body, we don't need to do it. -// -// I can find no documentation about needing to consume the body in case of an early (error) -// return, nor anything obvious in the net/http source code to indicate this, nor has google -// turned up anything. So request handler code assumes it's not necessary. 
- -func httpGetHandler( - dc *DaemonCommand, - command string, -) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - _, _, _, ok := - requestPreamble(dc, command, w, r, "GET", dc.getAuthenticator, authRealm, "") - if !ok { - return - } - - verb := command - arguments := []string{ - "-jobanalyzer-dir", + if dc.restAPI != "" { + api := apiutil.CreateAPI(dc.restAPI) + api0.SetupAPI( + api, dc.JobanalyzerDir(), - } - - for name, vs := range r.URL.Query() { - if why := argOk(command, name); why != "" { - w.WriteHeader(400) - fmt.Fprintf(w, "Bad parameter %s: %s", name, why) - if Verbose { - Log.Warningf("Bad parameter %s: %s", name, why) - } - return - } - - // Repeats are OK, the commands allow them in a number of cases. - // - // Booleans carry the regular true/false values or, for backward compatibility, the old - // MagicBoolean value. See comments in ../command/reify.go. - - for _, v := range vs { - // The MagicBoolean is handled by transforming it to "true", for uniformity. - if v == MagicBoolean { - v = "true" - } - // Go requires "=" between parameter and name for boolean params, but allows it for - // every type, so do it uniformly. 
- arguments = append(arguments, "-"+name+"="+v) - } - } - - stdout, ok := runSonalyze(dc, w, verb, arguments, []byte{}) - if !ok { - return - } - - w.WriteHeader(200) - fmt.Fprint(w, stdout) - } -} - -func parseAddQuery(query url.Values, name string) (isSet bool, err error) { - vs, isName := query[name] - if !isName { - return - } - if len(vs) == 1 { - switch vs[0] { - case "true", MagicBoolean: - isSet = true - return - case "false": - return - } - } - err = fmt.Errorf("Bad `%s` parameter", name) - return -} - -func httpAddHandler(dc *DaemonCommand) func(http.ResponseWriter, *http.Request) { - forSample := httpPostHandler(dc, "sample", "text/csv") - forSlurmSacct := httpPostHandler(dc, "slurm-sacct", "text/csv") - forSysinfo := httpPostHandler(dc, "sysinfo", "application/json") - return func(w http.ResponseWriter, r *http.Request) { - query := r.URL.Query() - isSample, e1 := parseAddQuery(query, "sample") - isSysinfo, e2 := parseAddQuery(query, "sysinfo") - isSlurmSacct, e3 := parseAddQuery(query, "slurm-sacct") - n := 0 - if isSample { - n++ - } - if isSysinfo { - n++ - } - if isSlurmSacct { - n++ - } - var e4 error - if n != 1 { - e4 = errors.New("Need exactly one of `-sample`, `-sysinfo`, or `-slurm-sacct`") - } - if err := errors.Join(e1, e2, e3, e4); err != nil { - w.WriteHeader(400) - fmt.Fprintf(w, "Bad operation: %s", err.Error()) - if Verbose { - Log.Warningf("Bad operation: %s", err.Error()) - } - return - } - switch { - case isSample: - forSample(w, r) - case isSysinfo: - forSysinfo(w, r) - case isSlurmSacct: - forSlurmSacct(w, r) - default: - panic("Unexpected") - } - } -} - -func httpPostHandler( - dc *DaemonCommand, - dataType string, - contentType string, -) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - payload, userName, clusterName, ok := - requestPreamble(dc, "add", w, r, "POST", dc.postAuthenticator, "", contentType) - if !ok { - return - } - - if dc.matchUserAndCluster && userName != "" && 
clusterName != userName { - w.WriteHeader(400) - fmt.Fprintf(w, "Upload not authorized") - if Verbose { - Log.Warningf("Upload not authorized") - } - return - } - - verb := "add" - arguments := []string{ - "-" + dataType, - "-jobanalyzer-dir", - dc.JobanalyzerDir(), - "-cluster", - clusterName, - } - - stdout, ok := runSonalyze(dc, w, verb, arguments, payload) - if !ok { - return - } - - w.WriteHeader(200) - fmt.Fprint(w, stdout) - } -} - -func requestPreamble( - dc *DaemonCommand, - command string, - w http.ResponseWriter, - r *http.Request, - method string, - authenticator *auth.Authenticator, - realm string, - contentType string, -) (payload []byte, userName, clusterName string, ok bool) { - if Verbose { - // Header reveals auth info, don't put it into logs - Log.Infof("Request from %s: %v", r.RemoteAddr, r.URL.String()) - } - - if !httpsrv.AssertMethod(w, r, method, Verbose) { - return - } - - authOk, userName := httpsrv.Authenticate(w, r, authenticator, realm, Verbose) - if !authOk { - return - } - - payload, havePayload := httpsrv.ReadPayload(w, r, Verbose) - if !havePayload { - return - } - - if contentType != "" { - if !httpsrv.AssertContentType(w, r, contentType, Verbose) { - return - } - } - - clusterValues, found := r.URL.Query()["cluster"] - if command != "cluster" && command != "version" { - if !found || len(clusterValues) != 1 || clusterValues[0] == "" { - w.WriteHeader(400) - fmt.Fprintf(w, "Bad parameters - missing or empty or repeated 'cluster'") - if Verbose { - Log.Warningf("Bad parameters - missing or empty or repeated 'cluster'") - } - return - } - clusterName = special.ResolveClusterName(clusterValues[0]) - } else { - if found { - w.WriteHeader(400) - fmt.Fprintf(w, "Bad parameters - illegal 'cluster'") - if Verbose { - Log.Warningf("Bad parameters - illegal 'cluster'") - } - return - } - } - - ok = true - return -} - -func runSonalyze( - dc *DaemonCommand, - w http.ResponseWriter, - verb string, - arguments []string, - input []byte, -) 
(stdout string, ok bool) { - //Log.Warningf("%s %v", verb, arguments) - cmdName := "" - - // Run the command and report the result - - if Verbose { - Log.Infof( - "Command: %s %s", - path.Join(dc.JobanalyzerDir(), cmdName), - verb+" "+strings.Join(arguments, " "), + dc.DatabaseURI(), + dc.cmdlineHandler, + dc.getAuthenticator, ) + api1.SetupAPI( + api, + dc.insert, + dc.postAuthenticator, + ) + api2.SetupAPI(api) + apiutil.RunAPI() } - anyCmd, _ := dc.cmdlineHandler.ParseVerb(cmdName, verb) - if anyCmd == nil { - errResponse(w, 400, fmt.Errorf("Bad verb in daemon-dispatched command: %s", verb), "") - return - } - fs := NewCLI(verb, anyCmd, cmdName, false) - err := dc.cmdlineHandler.ParseArgs(verb, arguments, anyCmd, fs) - if err != nil { - errResponse(w, 400, err, "") - return - } - - // The -cpuprofile option is ignored here, it should have forced ParseArgs to error out. - - var stdoutBuf, stderrBuf strings.Builder - err = dc.cmdlineHandler.HandleCommand(anyCmd, bytes.NewReader(input), &stdoutBuf, &stderrBuf) - stdout = stdoutBuf.String() - stderr := stderrBuf.String() - if err != nil { - errResponse(w, 400, err, stderr) - return - } - if stderr != "" { - Log.Warningf(stderr, "") - } - - ok = true - return -} - -func errResponse(w http.ResponseWriter, code int, err error, stderr string) { - w.WriteHeader(code) - fmt.Fprint(w, err.Error()) - if stderr != "" { - fmt.Fprint(w, "\n", stderr) - } - if Verbose { - Log.Warningf("ERROR: %v", err) - } -} - -// Disallow argument names that are malformed or are specific values. This is not fabulous but -// maintaining a whitelist is a lot of work. 
- -func argOk(command, arg string) string { - // Args are alphabetic and lower-case only, except - is allowed except in the first position - for i, c := range arg { - switch { - case c >= 'a' && c <= 'z': - // OK - case c == '-' && i > 0: - // OK - default: - return "Bad character" - } - } + process.WaitForSignal(syscall.SIGHUP, syscall.SIGTERM) - // Disallow short options (pretty primitive) - // Except -q, good grief. - if len(arg) <= 1 && arg != "q" { - return "Short option" + if dc.restAPI != "" { + apiutil.StopAPI() } - // Specific names are excluded, for now, the names in the comments relate to structure names in - // sonalyze/src/sonalyze.rs or sonalyze/command/args.go. - switch arg { - case "cpuprofile": - // DevArgs (go) - return "Developer arg" - case "data-path", "data-dir": - // SourceArgs (rust), DataDirArgs (go) - return "Local arg" - case "remote", "auth-file": - // SourceArgs - return "Source arg" - case "config-file": - // ConfigFileArgs - return "Config arg" - case "report-dir": - // ReportCommand - return "Local arg" - case "verbose", "v": - // VerboseArgs - return "Developer arg" - case "raw": - // MetaArgs (rust) - return "Meta arg" - default: - return "" - } + return nil } diff --git a/code/sonalyze/doc/STALE.md b/code/sonalyze/doc/STALE.md index 918f1969..6ae4de31 100644 --- a/code/sonalyze/doc/STALE.md +++ b/code/sonalyze/doc/STALE.md @@ -63,27 +63,7 @@ values, in the manner of `-f1w`; it must be `-f 1w`. ### Data insertion operations -The `add` operation is obsolete but still available. It will be removed. - -The `add` operation is used to ingest data into the database. In addition to an option -`--data-path` that identifies the cluster directory, the main mode identifies the type of data. The -data are self-identifying and are always read from stdin. The data are always in older Sonar -formats (still supported by sonalyze, but considered obsolete and should not be used). 
- -`--sample` - - The data are "free CSV" data coming from `sonar ps`, representing samples for one or more systems, zero - or more records - -`--sysinfo` - - The data are old-format JSON sysinfo data coming from `sonar sysinfo`, identifying one particular - system, one record exactly. - -`--slurm-sacct` - - The data are "free CSV" data coming from `sacctd`, which extracts data from the Slurm databases - using `sacct`. +(Not currently supported, the old "add" command was removed.) ### Analysis and data extraction operations From 1a5c7b3df843d1f1f4ac08c24691c4cf5829fbf3 Mon Sep 17 00:00:00 2001 From: Lars T Hansen Date: Wed, 29 Apr 2026 09:54:38 +0200 Subject: [PATCH 2/4] Clean up tests --- code/sonalyze/daemon/api0/api0.go | 12 ++-- code/tests/sonalyze/add/add_smoketest.sh | 4 ++ code/tests/sonalyzed/cluster1-samples.json | 1 + code/tests/sonalyzed/cluster1-sysinfo.json | 10 +-- code/tests/sonalyzed/cluster2-samples.json | 1 + code/tests/sonalyzed/cluster2-sysinfo.json | 10 +-- .../{ => obsolete}/cluster1-samples.csv | 0 .../sonalyzed/obsolete/cluster1-sysinfo.json | 9 +++ .../{ => obsolete}/cluster2-samples.csv | 0 .../sonalyzed/obsolete/cluster2-sysinfo.json | 9 +++ .../sonalyzed/obsolete/simple.sh.obsolete | 72 +++++++++++++++++++ code/tests/sonalyzed/simple.sh | 60 ++++++++++------ 12 files changed, 139 insertions(+), 49 deletions(-) create mode 100644 code/tests/sonalyzed/cluster1-samples.json create mode 100644 code/tests/sonalyzed/cluster2-samples.json rename code/tests/sonalyzed/{ => obsolete}/cluster1-samples.csv (100%) create mode 100644 code/tests/sonalyzed/obsolete/cluster1-sysinfo.json rename code/tests/sonalyzed/{ => obsolete}/cluster2-samples.csv (100%) create mode 100644 code/tests/sonalyzed/obsolete/cluster2-sysinfo.json create mode 100755 code/tests/sonalyzed/obsolete/simple.sh.obsolete diff --git a/code/sonalyze/daemon/api0/api0.go b/code/sonalyze/daemon/api0/api0.go index 12bc5266..76b5ed3e 100644 --- a/code/sonalyze/daemon/api0/api0.go +++ 
b/code/sonalyze/daemon/api0/api0.go @@ -573,9 +573,10 @@ func queryCommand(command, auth string, params []string) (*QueryResponse, error) return nil, huma.Error401Unauthorized(command + ": Unknown user/pass combination") } } - if verbose && auth != "" { - Log.Infof("Auth: %q", auth) - } + // not normally what we want but handy for debugging + // if verbose && auth != "" { + // Log.Infof("Auth: %q", auth) + // } if jobanalyzerDir != "" { params = append(params, "-jobanalyzer-dir", jobanalyzerDir) } @@ -659,11 +660,6 @@ func (x *QueryParams) Collect() []string { } type HostParams struct { - // TODO: This is tricky. The argument parser does not allow comma separation here, but does - // allow repeated arguments. Not sure how this works with query args - repeats override, I - // think? Do we use the host list parser to separate comma-separated hosts? If so, we can just - // forward. The real problem is if the cli depends on repeated args and we can't do them with - // the REST API. Host string `query:"host" doc:"Comma-separated host ranges"` } diff --git a/code/tests/sonalyze/add/add_smoketest.sh b/code/tests/sonalyze/add/add_smoketest.sh index 2de70970..d9dba472 100755 --- a/code/tests/sonalyze/add/add_smoketest.sh +++ b/code/tests/sonalyze/add/add_smoketest.sh @@ -1,5 +1,9 @@ #!/bin/bash +# add was removed in sonalyze v0.15, replaced by the /api/v1/insert protocol, which has its own test +# in ../../sonalyzed. 
+exit 0 + set -e # No support for `add` in the rust version diff --git a/code/tests/sonalyzed/cluster1-samples.json b/code/tests/sonalyzed/cluster1-samples.json new file mode 100644 index 00000000..fbe1cce8 --- /dev/null +++ b/code/tests/sonalyzed/cluster1-samples.json @@ -0,0 +1 @@ +{"meta":{"producer":"sonar","version":"0.19.0-devel"},"data":{"type":"sample","attributes":{"time":"2026-04-29T09:07:26+02:00","cluster":"cluster1.naic.com","node":"slurm-monitor.uio.no","system":{"boot":"2026-03-17T12:31:36+00:00","cpus":[208738,221031,222182,205890],"disks":[{"name":"sda","major":8,"minor":0,"stats":[514021,5172,69403323,677564,44428518,2656134,720521918,24751595,0,9079067,25429159,0,0,0,0,0,0]},{"name":"sda1","major":8,"minor":1,"stats":[239,1,60844,174,1053,169,946076,4330,0,585,4505,0,0,0,0,0,0]},{"name":"sda2","major":8,"minor":2,"stats":[513436,5171,69319359,677276,44427465,2655965,719575842,24747264,0,9295997,25424541,0,0,0,0,0,0]},{"name":"sdb","major":8,"minor":16,"stats":[522956,5306,138000899,1231738,64519003,2360714,2044300575,35152895,0,11032153,36384633,0,0,0,0,0,0]},{"name":"dm-0","major":253,"minor":0,"stats":[4018,0,180582,2161,872138,0,10613585,446516,0,101628,448677,0,0,0,0,0,0]},{"name":"dm-1","major":253,"minor":1,"stats":[98,0,4440,42,0,0,0,0,0,32,42,0,0,0,0,0,0]},{"name":"dm-2","major":253,"minor":2,"stats":[22492,0,2206349,19000,161137,0,9951307,253045,0,34223,272045,0,0,0,0,0,0]},{"name":"dm-3","major":253,"minor":3,"stats":[602769,0,94293696,956493,31822086,0,1166025739,12132563,0,8287229,13089056,0,0,0,0,0,0]},{"name":"dm-4","major":253,"minor":4,"stats":[149,0,7166,50,69649,0,7363463,63904,0,12262,63954,0,0,0,0,0,0]},{"name":"dm-5","major":253,"minor":5,"stats":[8935,0,8791028,21586,337138,0,12301790,199531,0,83992,221117,0,0,0,0,0,0]},{"name":"dm-6","major":253,"minor":6,"stats":[308442,0,92825653,554902,80642264,0,1557619985,33000537,0,14011841,33555439,0,0,0,0,0,0]}],"used_memory":17741296,"load1":0.23,"load5":0.23,"load15":0.18,"runnable
_entities":1,"existing_entities":927},"jobs":[{"job":3210393,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":10200,"virtual_memory":10712,"cmd":"postgres","pid":3210393,"ppid":3208760,"in_container":true,"cpu_avg":0.2,"cpu_time":1444}]},{"job":3209333,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":3068,"virtual_memory":3156,"cmd":"postgres","pid":3209333,"ppid":3208760,"in_container":true,"cpu_time":7}]},{"job":3209025,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":2232,"virtual_memory":1764,"cmd":"postgres","pid":3209025,"ppid":3208758,"in_container":true,"cpu_time":4}]},{"job":799573,"user":"larstha","epoch":195913896,"processes":[{"resident_memory":2216,"virtual_memory":1720,"cmd":"sshd","pid":799577,"ppid":799573}]},{"job":806463,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":3248,"virtual_memory":3320,"cmd":"postgres","pid":806463,"ppid":3208760,"in_container":true,"cpu_avg":0.1}]},{"job":3209318,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":2176,"virtual_memory":1924,"cmd":"postgres","pid":3209318,"ppid":3208760,"in_container":true,"cpu_time":30}]},{"job":3209036,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":2488,"virtual_memory":2180,"cmd":"postgres","pid":3209036,"ppid":3208758,"in_container":true,"cpu_time":2}]},{"job":3210271,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":4072,"virtual_memory":3756,"cmd":"postgres","pid":3210271,"ppid":3208758,"in_container":true,"cpu_time":8}]},{"job":806492,"user":"larstha","epoch":195913896,"processes":[{"resident_memory":1160,"virtual_memory":600,"cmd":"sonar","pid":806492,"ppid":800146,"cpu_avg":100}]},{"job":3208795,"user":"_user_1000","epoch":195913896,"processes":[{"resident_memory":344,"virtual_memory":568,"cmd":"kc.sh","pid":3208795,"ppid":3208700,"in_container":true},{"resident_memory":750756,"virtual_memory":942740,"cmd":"java","pid":3209423,"ppid":32087
95,"in_container":true,"num_threads":54,"cpu_avg":0.2,"cpu_time":1630}]},{"job":10071,"user":"larstha","epoch":195913896,"processes":[{"resident_memory":4276,"virtual_memory":20040,"cmd":"(sd-pam)","pid":10072,"ppid":10071},{"resident_memory":2216,"virtual_memory":1472,"data_read":105,"data_written":4,"cmd":"systemd","pid":10071,"ppid":1,"cpu_time":78}]},{"job":3209337,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":3060,"virtual_memory":3160,"cmd":"postgres","pid":3209337,"ppid":3208760,"in_container":true,"cpu_time":6}]},{"job":3210583,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":8620,"virtual_memory":10204,"cmd":"postgres","pid":3210583,"ppid":3208760,"in_container":true,"cpu_time":197}]},{"job":3209024,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":2272,"virtual_memory":1740,"cmd":"postgres","pid":3209024,"ppid":3208758,"in_container":true}]},{"job":3208758,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":2192,"virtual_memory":1608,"cmd":"postgres","pid":3208758,"ppid":3208659,"in_container":true,"cpu_time":85}]},{"job":3208802,"user":"_user_1000","epoch":195913896,"processes":[{"resident_memory":1053096,"virtual_memory":1408184,"cmd":"java","pid":3208802,"ppid":3208662,"in_container":true,"num_threads":110,"cpu_avg":1.5,"cpu_time":10817}]},{"job":3280211,"user":"root","epoch":195913896,"processes":[{"resident_memory":624,"virtual_memory":636,"cmd":"bash","pid":3280211,"ppid":3208644,"in_container":true}]},{"job":3280283,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":26884,"virtual_memory":28428,"cmd":"postgres","pid":3280283,"ppid":3208760,"in_container":true,"cpu_time":7}]},{"job":3209317,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":1828,"virtual_memory":1344,"cmd":"postgres","pid":3209317,"ppid":3208760,"in_container":true,"cpu_time":52}]},{"job":3210388,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":27832,"virtu
al_memory":28216,"cmd":"postgres","pid":3210388,"ppid":3208760,"in_container":true,"cpu_time":116}]},{"job":3209033,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":2224,"virtual_memory":1608,"cmd":"postgres","pid":3209033,"ppid":3208758,"in_container":true,"cpu_time":4}]},{"job":806479,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":89124,"virtual_memory":103220,"cmd":"postgres","pid":806479,"ppid":3208760,"in_container":true,"cpu_avg":13.3,"cpu_time":3}]},{"job":3208809,"user":"_user_100","epoch":195913896,"processes":[{"resident_memory":626016,"virtual_memory":744540,"cmd":"java","pid":3208809,"ppid":3208683,"in_container":true,"num_threads":37,"cpu_avg":0.1,"cpu_time":717}]},{"job":3208760,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":1736,"virtual_memory":1248,"cmd":"postgres","pid":3208760,"ppid":3208644,"in_container":true,"cpu_time":37360}]},{"job":3209038,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":2448,"virtual_memory":2168,"cmd":"postgres","pid":3209038,"ppid":3208758,"in_container":true}]},{"job":3209271,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":1984,"virtual_memory":1552,"cmd":"postgres","pid":3209271,"ppid":3208760,"in_container":true,"cpu_time":340}]},{"job":800146,"user":"larstha","epoch":195913896,"processes":[{"resident_memory":620,"virtual_memory":696,"data_read":124108,"data_written":154308,"data_cancelled":72044,"cmd":"bash","pid":800146,"ppid":800121,"cpu_time":25}]},{"job":3209320,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":2108,"virtual_memory":1852,"cmd":"postgres","pid":3209320,"ppid":3208760,"in_container":true}]},{"job":806464,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":27404,"virtual_memory":29664,"cmd":"postgres","pid":806464,"ppid":3208760,"in_container":true,"cpu_avg":5.3,"cpu_time":1}]},{"job":3209272,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":1856,"virtu
al_memory":1404,"cmd":"postgres","pid":3209272,"ppid":3208760,"in_container":true,"cpu_time":22}]},{"job":3210230,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":4392,"virtual_memory":4792,"cmd":"postgres","pid":3210230,"ppid":3208758,"in_container":true,"cpu_time":9}]},{"job":3280282,"user":"root","epoch":195913896,"processes":[{"resident_memory":1108,"virtual_memory":552,"cmd":"psql","pid":3280282,"ppid":3280211,"in_container":true,"cpu_time":1}]},{"job":3209319,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":2136,"virtual_memory":1872,"cmd":"postgres","pid":3209319,"ppid":3208760,"in_container":true,"cpu_time":1}]},{"job":799578,"user":"larstha","epoch":195913896,"processes":[{"resident_memory":3568,"virtual_memory":3672,"data_read":256,"data_written":2176,"data_cancelled":1752,"cmd":"bash","pid":799578,"ppid":799577,"cpu_time":1}]},{"job":800121,"user":"larstha","epoch":195913896,"processes":[{"resident_memory":31004,"virtual_memory":36884,"data_read":13,"data_written":172,"cmd":"emacs","pid":800121,"ppid":799578,"cpu_avg":1,"cpu_time":9}]},{"job":3210387,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":17068,"virtual_memory":18096,"cmd":"postgres","pid":3210387,"ppid":3208760,"in_container":true,"cpu_time":14}]}]}}} diff --git a/code/tests/sonalyzed/cluster1-sysinfo.json b/code/tests/sonalyzed/cluster1-sysinfo.json index cd584467..8be3e12f 100644 --- a/code/tests/sonalyzed/cluster1-sysinfo.json +++ b/code/tests/sonalyzed/cluster1-sysinfo.json @@ -1,9 +1 @@ -{ - "timestamp": "2024-03-12T00:00:03+01:00", - "hostname": "c1.cluster1.naic.com", - "description": "2x14 (hyperthreaded) Intel(R) Xeon(R) Gold 5120 CPU @ 2.20GHz, 125 GB, 3x NVIDIA GeForce RTX 2080 Ti @ 11GB", - "cpu_cores": 56, - "mem_gb": 125, - "gpu_cards": 3, - "gpumem_gb": 33 -} 
+{"meta":{"producer":"sonar","version":"0.19.0-devel"},"data":{"type":"sysinfo","attributes":{"time":"2026-04-29T09:08:04+02:00","cluster":"cluster1.naic.com","node":"slurm-monitor.uio.no","os_name":"Linux","os_release":"5.14.0-611.38.1.el9_7.x86_64","numa_nodes":1,"sockets":4,"cores_per_socket":1,"threads_per_core":1,"cpu_model":"Intel(R) Xeon(R) Gold 6448Y","architecture":"x86_64","memory":49033964,"distances":[[10]]}}} diff --git a/code/tests/sonalyzed/cluster2-samples.json b/code/tests/sonalyzed/cluster2-samples.json new file mode 100644 index 00000000..0e13fe3d --- /dev/null +++ b/code/tests/sonalyzed/cluster2-samples.json @@ -0,0 +1 @@ +{"meta":{"producer":"sonar","version":"0.19.0-devel"},"data":{"type":"sample","attributes":{"time":"2026-04-29T09:07:46+02:00","cluster":"cluster2.naic.com","node":"naic-monitor.uio.no","system":{"boot":"2026-03-17T12:31:36+00:00","cpus":[208739,221032,222183,205890],"disks":[{"name":"sda","major":8,"minor":0,"stats":[514021,5172,69403323,677564,44428732,2656152,720525186,24751706,0,9079098,25429270,0,0,0,0,0,0]},{"name":"sda1","major":8,"minor":1,"stats":[239,1,60844,174,1053,169,946076,4330,0,585,4505,0,0,0,0,0,0]},{"name":"sda2","major":8,"minor":2,"stats":[513436,5171,69319359,677276,44427679,2655983,719579110,24747375,0,9296030,25424652,0,0,0,0,0,0]},{"name":"sdb","major":8,"minor":16,"stats":[522956,5306,138000899,1231738,64519244,2360726,2044305672,35153120,0,11032185,36384859,0,0,0,0,0,0]},{"name":"dm-0","major":253,"minor":0,"stats":[4018,0,180582,2161,872140,0,10613588,446516,0,101628,448677,0,0,0,0,0,0]},{"name":"dm-1","major":253,"minor":1,"stats":[98,0,4440,42,0,0,0,0,0,32,42,0,0,0,0,0,0]},{"name":"dm-2","major":253,"minor":2,"stats":[22492,0,2206349,19000,161139,0,9951310,253046,0,34224,272046,0,0,0,0,0,0]},{"name":"dm-3","major":253,"minor":3,"stats":[602769,0,94293696,956493,31822182,0,1166028742,12132584,0,8287239,13089077,0,0,0,0,0,0]},{"name":"dm-4","major":253,"minor":4,"stats":[149,0,7166,50,69651,0,7363466
,63905,0,12263,63955,0,0,0,0,0,0]},{"name":"dm-5","major":253,"minor":5,"stats":[8935,0,8791028,21586,337145,0,12301885,199531,0,83992,221117,0,0,0,0,0,0]},{"name":"dm-6","major":253,"minor":6,"stats":[308442,0,92825653,554902,80642640,0,1557625243,33000781,0,14011893,33555683,0,0,0,0,0,0]}],"used_memory":17726524,"load1":0.24,"load5":0.23,"load15":0.19,"runnable_entities":1,"existing_entities":928},"jobs":[{"job":799573,"user":"larstha","epoch":195913896,"processes":[{"resident_memory":2216,"virtual_memory":1720,"cmd":"sshd","pid":799577,"ppid":799573}]},{"job":800121,"user":"larstha","epoch":195913896,"processes":[{"resident_memory":32428,"virtual_memory":37964,"data_read":13,"data_written":172,"cmd":"emacs","pid":800121,"ppid":799578,"cpu_avg":1.1,"cpu_time":10}]},{"job":3209319,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":2136,"virtual_memory":1872,"cmd":"postgres","pid":3209319,"ppid":3208760,"in_container":true,"cpu_time":1}]},{"job":3209024,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":2272,"virtual_memory":1740,"cmd":"postgres","pid":3209024,"ppid":3208758,"in_container":true}]},{"job":3210393,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":10200,"virtual_memory":10712,"cmd":"postgres","pid":3210393,"ppid":3208760,"in_container":true,"cpu_avg":0.2,"cpu_time":1444}]},{"job":3209272,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":1856,"virtual_memory":1404,"cmd":"postgres","pid":3209272,"ppid":3208760,"in_container":true,"cpu_time":22}]},{"job":3209337,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":3060,"virtual_memory":3160,"cmd":"postgres","pid":3209337,"ppid":3208760,"in_container":true,"cpu_time":6}]},{"job":3209036,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":2488,"virtual_memory":2180,"cmd":"postgres","pid":3209036,"ppid":3208758,"in_container":true,"cpu_time":2}]},{"job":3209333,"user":"avahi","epoch":195913896,"processes"
:[{"resident_memory":3068,"virtual_memory":3156,"cmd":"postgres","pid":3209333,"ppid":3208760,"in_container":true,"cpu_time":7}]},{"job":3210271,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":4072,"virtual_memory":3756,"cmd":"postgres","pid":3210271,"ppid":3208758,"in_container":true,"cpu_time":8}]},{"job":3209038,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":2448,"virtual_memory":2168,"cmd":"postgres","pid":3209038,"ppid":3208758,"in_container":true}]},{"job":3209318,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":2176,"virtual_memory":1924,"cmd":"postgres","pid":3209318,"ppid":3208760,"in_container":true,"cpu_time":30}]},{"job":806511,"user":"larstha","epoch":195913896,"processes":[{"resident_memory":1160,"virtual_memory":600,"cmd":"sonar","pid":806511,"ppid":800146,"cpu_avg":100,"cpu_util":9.259}]},{"job":3210230,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":4392,"virtual_memory":4792,"cmd":"postgres","pid":3210230,"ppid":3208758,"in_container":true,"cpu_time":9}]},{"job":3280211,"user":"root","epoch":195913896,"processes":[{"resident_memory":624,"virtual_memory":636,"cmd":"bash","pid":3280211,"ppid":3208644,"in_container":true}]},{"job":799578,"user":"larstha","epoch":195913896,"processes":[{"resident_memory":3568,"virtual_memory":3672,"data_read":256,"data_written":2176,"data_cancelled":1752,"cmd":"bash","pid":799578,"ppid":799577,"cpu_time":1}]},{"job":3209033,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":2224,"virtual_memory":1608,"cmd":"postgres","pid":3209033,"ppid":3208758,"in_container":true,"cpu_time":4}]},{"job":3208758,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":2192,"virtual_memory":1608,"cmd":"postgres","pid":3208758,"ppid":3208659,"in_container":true,"cpu_time":85}]},{"job":806463,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":3248,"virtual_memory":3320,"cmd":"postgr
es","pid":806463,"ppid":3208760,"in_container":true}]},{"job":3209320,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":2108,"virtual_memory":1852,"cmd":"postgres","pid":3209320,"ppid":3208760,"in_container":true}]},{"job":3209317,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":1828,"virtual_memory":1344,"cmd":"postgres","pid":3209317,"ppid":3208760,"in_container":true,"cpu_time":52}]},{"job":3208760,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":1736,"virtual_memory":1248,"cmd":"postgres","pid":3208760,"ppid":3208644,"in_container":true,"cpu_time":37360}]},{"job":3280282,"user":"root","epoch":195913896,"processes":[{"resident_memory":1108,"virtual_memory":552,"cmd":"psql","pid":3280282,"ppid":3280211,"in_container":true,"cpu_time":1}]},{"job":806464,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":27404,"virtual_memory":29664,"cmd":"postgres","pid":806464,"ppid":3208760,"in_container":true,"cpu_avg":2.7,"cpu_time":1}]},{"job":3210583,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":8620,"virtual_memory":10204,"cmd":"postgres","pid":3210583,"ppid":3208760,"in_container":true,"cpu_time":197}]},{"job":800146,"user":"larstha","epoch":195913896,"processes":[{"resident_memory":620,"virtual_memory":696,"data_read":124108,"data_written":154320,"data_cancelled":72044,"cmd":"bash","pid":800146,"ppid":800121,"cpu_time":25}]},{"job":3208809,"user":"_user_100","epoch":195913896,"processes":[{"resident_memory":626016,"virtual_memory":744540,"cmd":"java","pid":3208809,"ppid":3208683,"in_container":true,"num_threads":37,"cpu_avg":0.1,"cpu_time":718}]},{"job":3209025,"user":"systemd-coredump","epoch":195913896,"processes":[{"resident_memory":2232,"virtual_memory":1764,"cmd":"postgres","pid":3209025,"ppid":3208758,"in_container":true,"cpu_time":4}]},{"job":3208802,"user":"_user_1000","epoch":195913896,"processes":[{"resident_memory":1053096,"virtual_memory":1408184,"cmd":"java","pid":3208802,"ppid":320
8662,"in_container":true,"num_threads":110,"cpu_avg":1.5,"cpu_time":10817}]},{"job":3280283,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":26884,"virtual_memory":28428,"cmd":"postgres","pid":3280283,"ppid":3208760,"in_container":true,"cpu_time":7}]},{"job":10071,"user":"larstha","epoch":195913896,"processes":[{"resident_memory":4276,"virtual_memory":20040,"cmd":"(sd-pam)","pid":10072,"ppid":10071},{"resident_memory":2216,"virtual_memory":1472,"data_read":105,"data_written":4,"cmd":"systemd","pid":10071,"ppid":1,"cpu_time":78}]},{"job":3209271,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":1984,"virtual_memory":1552,"cmd":"postgres","pid":3209271,"ppid":3208760,"in_container":true,"cpu_time":340}]},{"job":3210387,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":17068,"virtual_memory":18096,"cmd":"postgres","pid":3210387,"ppid":3208760,"in_container":true,"cpu_time":14}]},{"job":3210388,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":27832,"virtual_memory":28216,"cmd":"postgres","pid":3210388,"ppid":3208760,"in_container":true,"cpu_time":116}]},{"job":3208795,"user":"_user_1000","epoch":195913896,"processes":[{"resident_memory":344,"virtual_memory":568,"cmd":"kc.sh","pid":3208795,"ppid":3208700,"in_container":true},{"resident_memory":750756,"virtual_memory":942740,"cmd":"java","pid":3209423,"ppid":3208795,"in_container":true,"num_threads":54,"cpu_avg":0.2,"cpu_time":1630}]},{"job":806479,"user":"avahi","epoch":195913896,"processes":[{"resident_memory":89124,"virtual_memory":103220,"cmd":"postgres","pid":806479,"ppid":3208760,"in_container":true,"cpu_avg":6.7,"cpu_time":3}]}]}}} diff --git a/code/tests/sonalyzed/cluster2-sysinfo.json b/code/tests/sonalyzed/cluster2-sysinfo.json index 426cdff8..4dda0175 100644 --- a/code/tests/sonalyzed/cluster2-sysinfo.json +++ b/code/tests/sonalyzed/cluster2-sysinfo.json @@ -1,9 +1 @@ -{ - "timestamp": "2024-04-01T00:00:02+02:00", - "hostname": 
"c2.cluster2.naic.com", - "description": "2x48 (hyperthreaded) AMD EPYC 7642 48-Core Processor, 1007 GiB, 4x NVIDIA GeForce RTX 3090 @ 24GiB", - "cpu_cores": 192, - "mem_gb": 1007, - "gpu_cards": 4, - "gpumem_gb": 96 -} +{"meta":{"producer":"sonar","version":"0.19.0-devel"},"data":{"type":"sysinfo","attributes":{"time":"2026-04-29T09:08:11+02:00","cluster":"cluster2.naic.com","node":"naic-monitor.uio.no","os_name":"Linux","os_release":"5.14.0-611.38.1.el9_7.x86_64","numa_nodes":1,"sockets":4,"cores_per_socket":1,"threads_per_core":1,"cpu_model":"Intel(R) Xeon(R) Gold 6448Y","architecture":"x86_64","memory":49033964,"distances":[[10]]}}} diff --git a/code/tests/sonalyzed/cluster1-samples.csv b/code/tests/sonalyzed/obsolete/cluster1-samples.csv similarity index 100% rename from code/tests/sonalyzed/cluster1-samples.csv rename to code/tests/sonalyzed/obsolete/cluster1-samples.csv diff --git a/code/tests/sonalyzed/obsolete/cluster1-sysinfo.json b/code/tests/sonalyzed/obsolete/cluster1-sysinfo.json new file mode 100644 index 00000000..cd584467 --- /dev/null +++ b/code/tests/sonalyzed/obsolete/cluster1-sysinfo.json @@ -0,0 +1,9 @@ +{ + "timestamp": "2024-03-12T00:00:03+01:00", + "hostname": "c1.cluster1.naic.com", + "description": "2x14 (hyperthreaded) Intel(R) Xeon(R) Gold 5120 CPU @ 2.20GHz, 125 GB, 3x NVIDIA GeForce RTX 2080 Ti @ 11GB", + "cpu_cores": 56, + "mem_gb": 125, + "gpu_cards": 3, + "gpumem_gb": 33 +} diff --git a/code/tests/sonalyzed/cluster2-samples.csv b/code/tests/sonalyzed/obsolete/cluster2-samples.csv similarity index 100% rename from code/tests/sonalyzed/cluster2-samples.csv rename to code/tests/sonalyzed/obsolete/cluster2-samples.csv diff --git a/code/tests/sonalyzed/obsolete/cluster2-sysinfo.json b/code/tests/sonalyzed/obsolete/cluster2-sysinfo.json new file mode 100644 index 00000000..426cdff8 --- /dev/null +++ b/code/tests/sonalyzed/obsolete/cluster2-sysinfo.json @@ -0,0 +1,9 @@ +{ + "timestamp": "2024-04-01T00:00:02+02:00", + "hostname": 
"c2.cluster2.naic.com", + "description": "2x48 (hyperthreaded) AMD EPYC 7642 48-Core Processor, 1007 GiB, 4x NVIDIA GeForce RTX 3090 @ 24GiB", + "cpu_cores": 192, + "mem_gb": 1007, + "gpu_cards": 4, + "gpumem_gb": 96 +} diff --git a/code/tests/sonalyzed/obsolete/simple.sh.obsolete b/code/tests/sonalyzed/obsolete/simple.sh.obsolete new file mode 100755 index 00000000..f1b6c994 --- /dev/null +++ b/code/tests/sonalyzed/obsolete/simple.sh.obsolete @@ -0,0 +1,72 @@ +#!/bin/bash +# +# OBSOLETE test - the `add` verb is gone now. Preserving this for posterity, until we're sure we don't need it. + +set -e + +# No support for `add` in the rust version +if [[ $($SONALYZE version) =~ sonalyze-rs ]]; then + echo "Skipping sonalyzed tests for sonalyze-rs" + exit 0 +fi + +# Note if `sonalyze daemon` fails on startup the `set -e` will not catch it because the server is +# run in the background. In this case, $sonalyzed_pid will reference a process that is not there. + +rootdir=test-root +testport=24680 +rm -rf $rootdir + +# Set up a test jobanalyzer directory structure + +mkdir -p $rootdir $rootdir/cluster-config $rootdir/data/cluster{1,2}.naic.com +cp $SONALYZE $rootdir +cp cluster-aliases.json $rootdir/cluster-config +cp cluster1.naic.com-config.json $rootdir/cluster-config/cluster1.naic.com-config.json +cp cluster2.naic.com-config.json $rootdir/cluster-config/cluster2.naic.com-config.json + +# Run the server in the background against that directory + +$SONALYZE daemon -v \ + -jobanalyzer-dir $rootdir \ + -port $testport \ + -upload-auth upload-auth.txt \ + -analysis-auth analysis-auth.txt \ + -match-user-and-cluster & +sonalyzed_pid=$! + +# Always attempt to shut down the server on exit. (Not sure if the HUP/INT are necessary or if they +# are subsumed by EXIT.) 
+trap "kill -HUP $sonalyzed_pid" EXIT ERR SIGHUP SIGINT + +# Wait for sonalyzed to come up +sleep 1 + +# First, try to insert some data and verify that the data have been added as expected + +curl --fail-with-body --data-binary @cluster1-samples.csv -H 'Content-Type: text/csv' -u cluster1.naic.com:hohoho \ + http://localhost:$testport/sonar-freecsv?cluster=cluster1.naic.com + +curl --fail-with-body --data-binary @cluster1-sysinfo.json -H 'Content-Type: application/json' -u cluster1.naic.com:hohoho \ + http://localhost:$testport/sysinfo?cluster=cluster1.naic.com + +curl --fail-with-body --data-binary @cluster2-samples.csv -H 'Content-Type: text/csv' -u cluster2.naic.com:hahaha \ + http://localhost:$testport/sonar-freecsv?cluster=cluster2.naic.com + +curl --fail-with-body --data-binary @cluster2-sysinfo.json -H 'Content-Type: application/json' -u cluster2.naic.com:hahaha \ + http://localhost:$testport/sysinfo?cluster=cluster2.naic.com + +sleep 1 +# Note input data tagged say 2023-09-15T00:00:nn+02:00 are normalized to 2023-09-14. Etc. 
+cmp cluster1-samples.csv $rootdir/data/cluster1.naic.com/2023/09/14/c1.cluster1.naic.com.csv +cmp cluster1-sysinfo.json $rootdir/data/cluster1.naic.com/2024/03/11/sysinfo-c1.cluster1.naic.com.json +cmp cluster2-samples.csv $rootdir/data/cluster2.naic.com/2023/09/13/c2.cluster2.naic.com.csv +cmp cluster2-sysinfo.json $rootdir/data/cluster2.naic.com/2024/03/31/sysinfo-c2.cluster2.naic.com.json + +# Then, try to run a jobs command and verify that the result is what we expect + +output=$(curl --silent --fail-with-body -G -u john:jj \ + "http://localhost:$testport/jobs?cluster=cluster1.naic.com&job=2712710&from=2023-09-01&fmt=noheader,csv,std,cpu,mem") +CHECK "jobs_1" "2712710!,hermanno,0d0h20m,c1.cluster1.naic.com,3,7,14,14" "$output" + +rm -rf $rootdir diff --git a/code/tests/sonalyzed/simple.sh b/code/tests/sonalyzed/simple.sh index 441427d0..50c0ec3a 100755 --- a/code/tests/sonalyzed/simple.sh +++ b/code/tests/sonalyzed/simple.sh @@ -2,17 +2,11 @@ set -e -# No support for `add` in the rust version -if [[ $($SONALYZE version) =~ sonalyze-rs ]]; then - echo "Skipping sonalyzed tests for sonalyze-rs" - exit 0 -fi - # Note if `sonalyze daemon` fails on startup the `set -e` will not catch it because the server is # run in the background. In this case, $sonalyzed_pid will reference a process that is not there. rootdir=test-root -testport=24680 +testapi=127.0.0.1:4545 rm -rf $rootdir # Set up a test jobanalyzer directory structure @@ -27,10 +21,10 @@ cp cluster2.naic.com-config.json $rootdir/cluster-config/cluster2.naic.com-confi $SONALYZE daemon -v \ -jobanalyzer-dir $rootdir \ - -port $testport \ + -rest-api $testapi \ + -insert \ -upload-auth upload-auth.txt \ - -analysis-auth analysis-auth.txt \ - -match-user-and-cluster & + -analysis-auth analysis-auth.txt & sonalyzed_pid=$! # Always attempt to shut down the server on exit. 
(Not sure if the HUP/INT are necessary or if they @@ -42,29 +36,49 @@ sleep 1 # First, try to insert some data and verify that the data have been added as expected -curl --fail-with-body --data-binary @cluster1-samples.csv -H 'Content-Type: text/csv' -u cluster1.naic.com:hohoho \ - http://localhost:$testport/sonar-freecsv?cluster=cluster1.naic.com +curl --fail-with-body --data-binary @cluster1-samples.json -H 'Content-Type: application/json' -u cluster1.naic.com:hohoho \ + http://$testapi/api/v1/insert/sample curl --fail-with-body --data-binary @cluster1-sysinfo.json -H 'Content-Type: application/json' -u cluster1.naic.com:hohoho \ - http://localhost:$testport/sysinfo?cluster=cluster1.naic.com + http://$testapi/api/v1/insert/sysinfo -curl --fail-with-body --data-binary @cluster2-samples.csv -H 'Content-Type: text/csv' -u cluster2.naic.com:hahaha \ - http://localhost:$testport/sonar-freecsv?cluster=cluster2.naic.com +curl --fail-with-body --data-binary @cluster2-samples.json -H 'Content-Type: application/json' -u cluster2.naic.com:hahaha \ + http://$testapi/api/v1/insert/sample curl --fail-with-body --data-binary @cluster2-sysinfo.json -H 'Content-Type: application/json' -u cluster2.naic.com:hahaha \ - http://localhost:$testport/sysinfo?cluster=cluster2.naic.com + http://$testapi/api/v1/insert/sysinfo sleep 1 + # Note input data tagged say 2023-09-15T00:00:nn+02:00 are normalized to 2023-09-14. Etc. -cmp cluster1-samples.csv $rootdir/data/cluster1.naic.com/2023/09/14/c1.cluster1.naic.com.csv -cmp cluster1-sysinfo.json $rootdir/data/cluster1.naic.com/2024/03/11/sysinfo-c1.cluster1.naic.com.json -cmp cluster2-samples.csv $rootdir/data/cluster2.naic.com/2023/09/13/c2.cluster2.naic.com.csv -cmp cluster2-sysinfo.json $rootdir/data/cluster2.naic.com/2024/03/31/sysinfo-c2.cluster2.naic.com.json +# +# Also note that the /api/v1 ingestion path *parses* the input and then serializes it again, so the +# results are not bitwise comparable (unlike before). 
This is not ideal but was an inevitable +# consequence of the rewrite. In any case, we must use jq here to sort the fields before we compare +# the files. + +jq -S < cluster1-samples.json > $rootdir/a +jq -S < $rootdir/data/cluster1.naic.com/2026/04/29/0+sample-slurm-monitor.uio.no.json > $rootdir/b +cmp $rootdir/a $rootdir/b + +jq -S < cluster1-sysinfo.json > $rootdir/a +jq -S < $rootdir/data/cluster1.naic.com/2026/04/29/0+sysinfo-slurm-monitor.uio.no.json > $rootdir/b +cmp $rootdir/a $rootdir/b + +jq -S < cluster2-samples.json > $rootdir/a +jq -S < $rootdir/data/cluster2.naic.com/2026/04/29/0+sample-naic-monitor.uio.no.json > $rootdir/b +cmp $rootdir/a $rootdir/b + +jq -S < cluster2-sysinfo.json > $rootdir/a +jq -S < $rootdir/data/cluster2.naic.com/2026/04/29/0+sysinfo-naic-monitor.uio.no.json > $rootdir/b +cmp $rootdir/a $rootdir/b -# Then, try to run a jobs command and verify that the result is what we expect +# Then, try to run queries and verify that the result is what we expect. API 0 returns a JSON +# string that must be parsed to get the expected output. 
output=$(curl --silent --fail-with-body -G -u john:jj \ - "http://localhost:$testport/jobs?cluster=cluster1.naic.com&job=2712710&from=2023-09-01&fmt=noheader,csv,std,cpu,mem") -CHECK "jobs_1" "2712710!,hermanno,0d0h20m,c1.cluster1.naic.com,3,7,14,14" "$output" + "http://127.0.0.1:4545/api/v0/node?from=2026-04-29&cluster=cluster1.naic.com&fmt=default,csv,noheader" \ + | jq -r) +CHECK "node_1" 'slurm-monitor.uio.no,4,47,0,0,"4x1 Intel(R) Xeon(R) Gold 6448Y, 47 GiB"' "$output" rm -rf $rootdir From 0d72a06dc7f5e522263946c9e550082d8ae55fee Mon Sep 17 00:00:00 2001 From: Lars T Hansen Date: Wed, 29 Apr 2026 10:28:57 +0200 Subject: [PATCH 3/4] Tweaks and doc --- code/sonalyze/daemon/daemon.go | 47 ++++++------ code/sonalyze/daemon/perform.go | 32 +++++---- code/sonalyze/doc/HOWTO-AUTH.md | 17 +++-- code/sonalyze/doc/HOWTO-RESTAPI.md | 112 +++++++++++------------------ code/tests/sonalyzed/simple.sh | 2 + 5 files changed, 98 insertions(+), 112 deletions(-) diff --git a/code/sonalyze/daemon/daemon.go b/code/sonalyze/daemon/daemon.go index e133c070..46066c20 100644 --- a/code/sonalyze/daemon/daemon.go +++ b/code/sonalyze/daemon/daemon.go @@ -1,19 +1,7 @@ // `sonalyze daemon` - HTTP server that runs sonalyze on behalf of a remote client // // This server responds to GET and POST requests carrying parameters that specify how to run -// sonalyze against a local data store. The path for analysis commands is the sonalyze command -// name, eg, `GET /jobs?...` will run `sonalyze jobs`. The path for add commands is either `POST -// /add?...` with the appropriate arguments, or for backward compatibility with existing infra, a -// keyword describing the data, eg `POST /sonar-freecsv?...` will run `sonalyze add -sample`. -// -// A query parameter `cluster=clusterName` is required for all requests, it names the cluster we're -// operating within and determines a bunch of file paths. 
-// -// Other parameter names are always the long parameter names for sonalyze and the parameter values -// are always urlencoded as necessary; parameter-less flags default to not-present. Most parameters -// and names are forwarded to sonalyze, with eg --data-path and --config-file supplied by this code. -// The returned output is the raw output from sonalyze, whether for success or error. A successful -// runs yields 2xx and an error yields 4xx or 5xx. +// sonalyze against a local data store. See doc/HOWTO-RESTAPI.md. // // Arguments: // @@ -72,6 +60,21 @@ // or less aligned with the v0 API but with clean JSON output and not the idiosyncrasies of v0, // and under /api/v2 there is a subset of the slurm-monitor REST API v2. // +// The individual APIs are all disabled by default. Pass -v0, -v1, and -v2 to enable them. +// +// -v0 +// +// Enable the v0 API. +// +// -v1 +// +// Enable the v1 API. +// +// -v2 +// +// Enable the v2 API. Note this does not use the analysis-auth, having been set up for OAUTH +// authentication, and may require additional work to be safe. +// // -insert // // Enable the /api/v1/insert points in the REST API. 
Normally this API is enabled only when @@ -119,12 +122,14 @@ type DaemonCommand struct { DevArgs VerboseArgs DatabaseArgs - getAuthFile string - postAuthFile string - matchUserAndCluster bool - kafkaBroker string - restAPI string - insert bool + getAuthFile string + postAuthFile string + kafkaBroker string + restAPI string + insert bool + v0 bool + v1 bool + v2 bool getAuthenticator *auth.Authenticator postAuthenticator *auth.Authenticator @@ -144,11 +149,13 @@ func (dc *DaemonCommand) Add(fs *CLI) { fs.Group("daemon-configuration") fs.StringVar(&dc.getAuthFile, "analysis-auth", "", "Authentication info `filename` for analysis access") fs.StringVar(&dc.postAuthFile, "upload-auth", "", "Authentication info `filename` for data upload access") - fs.BoolVar(&dc.matchUserAndCluster, "match-user-and-cluster", false, "Require user name to match cluster name") fs.StringVar(&dc.getAuthFile, "password-file", "", "Alias for -analysis-auth") fs.StringVar(&dc.kafkaBroker, "kafka", "", "Ingest data from this broker for all known clusters") fs.StringVar(&dc.restAPI, "rest-api", "", "Serve /api/v0, /api/v1 and /api/v2 on this interface:port") fs.BoolVar(&dc.insert, "insert", false, "Enable the /api/v1/insert points") + fs.BoolVar(&dc.v0, "v0", false, "Enable the v0 API") + fs.BoolVar(&dc.v1, "v1", false, "Enable the v1 API") + fs.BoolVar(&dc.v2, "v2", false, "Enable the v2 API") } //go:embed summary.txt diff --git a/code/sonalyze/daemon/perform.go b/code/sonalyze/daemon/perform.go index 2e9e30cd..fc6e8365 100644 --- a/code/sonalyze/daemon/perform.go +++ b/code/sonalyze/daemon/perform.go @@ -56,19 +56,25 @@ func (dc *DaemonCommand) RunDaemon(_ io.Reader, _, stderr io.Writer) error { if dc.restAPI != "" { api := apiutil.CreateAPI(dc.restAPI) - api0.SetupAPI( - api, - dc.JobanalyzerDir(), - dc.DatabaseURI(), - dc.cmdlineHandler, - dc.getAuthenticator, - ) - api1.SetupAPI( - api, - dc.insert, - dc.postAuthenticator, - ) - api2.SetupAPI(api) + if dc.v0 { + api0.SetupAPI( + api, + 
dc.JobanalyzerDir(), + dc.DatabaseURI(), + dc.cmdlineHandler, + dc.getAuthenticator, + ) + } + if dc.v1 { + api1.SetupAPI( + api, + dc.insert, + dc.postAuthenticator, + ) + } + if dc.v2 { + api2.SetupAPI(api) + } apiutil.RunAPI() } diff --git a/code/sonalyze/doc/HOWTO-AUTH.md b/code/sonalyze/doc/HOWTO-AUTH.md index 13761062..159a6dbf 100644 --- a/code/sonalyze/doc/HOWTO-AUTH.md +++ b/code/sonalyze/doc/HOWTO-AUTH.md @@ -40,20 +40,19 @@ There is none: every known user has superuser privileges on all clusters, ie, th for all users. See "Better authorization" below. -## Upload (`add`, etc) authentication and authorization +## Upload (`/api/v1/insert`) authentication and authorization ### Design -For upload (the `add` command and its aliases) authentication first checks that the provided -username and password exist in the upload authorization data base. A request comes in to the HTTP -server with username/password U/P (encoded as an HTTP header asking for the simple HTTP -authentication). Identity checking is performed by the daemon before the request is acted on. If -the check fails the request is rejected. +For upload authentication first checks that the provided username and password exist in the upload +authorization data base. A request comes in to the HTTP server with username/password U/P (encoded +as an HTTP header asking for the simple HTTP authentication). Identity checking is performed by the +daemon before the request is acted on. If the check fails the request is rejected. Then an authorization check is made that the provided username matches the cluster argument given to -`add` - different clusters have different upload passwords, which are secrets they own. This check -makes it impossible for somebody with only the password for one cluster to spoof data for another -cluster. +`/api/v1/insert` - different clusters have different upload passwords, which are secrets they own. 
This +check makes it impossible for somebody with only the password for one cluster to spoof data for +another cluster. Note that if the cluster name is embedded in the data there is not generally a check that it matches the name given in the upload request. diff --git a/code/sonalyze/doc/HOWTO-RESTAPI.md b/code/sonalyze/doc/HOWTO-RESTAPI.md index 8849006f..4005e207 100644 --- a/code/sonalyze/doc/HOWTO-RESTAPI.md +++ b/code/sonalyze/doc/HOWTO-RESTAPI.md @@ -1,88 +1,60 @@ # The REST APIs -Sonalyze running in daemon mode presents two different REST-style APIs depending on options. +Sonalyze running in daemon mode can present a REST-style API to local and remote clients. This API has +multiple versions. -## Classical REST API +To enable the REST APIs, pass the `-rest-api` switch to the daemon. It takes an interface value, +frequently something like `127.0.0.1:8888`. Additionally, individual APIs must be enabled with +`-v0`, `-v1`, and `-v2` (several can be enabled at the same time). -The first is the "classical" API described by the doc comment at the start of daemon/daemon.go and -below as "REST API v0". This API is currently always active in daemon mode. The listen port is -8087 by default but can be overridden with the `-port` argument. +All requests to the API start with `/api/vK` where K is a version number, currently 0, 1, or 2. For +example, `/api/v2/cluster/my.cluster.name/nodes/info`. -Via this API, sonalyze processes top-level requests that take the form of sonalyze commands as -described for the non-daemon mode: `/jobs?user=x&from=y&to=z` corresponds directly to the command -`sonalyze jobs -user x -from y -to z`. - -At this time, there is no way of extracting API information from this API or generating client code -from such a spec; read below or the source code. (Almost certainly this API will be reimagined as a -proper REST API implementation before long, mitigating these problems.) 
- -Authentication is via HTTP basic authentication, ie, username/password headers. The API checks that -the credentials allow access to the data by looking them up in an internal user database - see -[TECHNICAL.md](TECHNICAL.md). There are separate authentication realms for insertion and lookup. - -For convenience, Sonalyze, with the -remote option, translates a "local" command to an API call in -the former style (with authentication), but there's nothing special about this: under the hood it is -currently just a curl invocation of the translated request. See MANUAL.md. - -## Slurm-monitor REST API - -The second API is a partial [slurm-monitor](https://github.com/2maz/slurm-monitor) style API. This -is a proper REST API built on modern infrastructure. It is off by default but is enabled with the -`-rest-api` argument to the daemon, which takes an interface value, frequently something like -`127.0.0.1:8888`. All requests start with `/api/v2` - `/api/v2/cluster/my.cluster.name/nodes/info`. -Documentation is available via `https://127.0.0.1:8888/openapi.yaml` (or .json) when the server is -running on that interface. +Documentation is available via `https://127.0.0.1:8888/openapi.yaml` (or `openapi.json`) when the +server is running on that interface; this will describe the v0, v1, and v2 versions. -Authentication for this API is via OAUTH and is in principle set up so that only a super-user can -query data for other users than themselves. +Authentication for v0 and v1 is via HTTP basic authentication, ie, username/password headers. The +API checks that the credentials allow access to the data by looking them up in an internal user +database - see [TECHNICAL.md](TECHNICAL.md). There are separate authentication realms for insertion +and lookup. ## REST API v0 -For the following to make sense you need to be familiar with data model and command line syntax, see -MANUAL.md.
The API mimics the sonalyze command line, and (except for some proscribed parameters) -every argument is effectively passed through to a recursive invocation of sonalyze on the server. A -request `/jobs?user=x&from=y&to=z` becomes the command `sonalyze jobs -user x -from y -to z`. - -The request URL is always `?`. The Jobanalyzer HTTP server is generally set up so that -these requests must be top-level: `/?`. - -The `` is one of the verbs accepted by sonalyze on the command line, run `sonalyze help` for -the full list. - -For the `add` verb the HTTP operation must be `POST` and the payload to be inserted into the -database is the body of the the request. +The v0 API is close to the "classical" Sonalyze REST API and follows the command line syntax +closely. A GET request to `/api/v0/jobs?...` will be a jobs query, for example, and the query +arguments are the same as for the jobs command at the command line. Parameter names are always the +long parameter names for sonalyze (`user` not `u`, `from` not `f`). -For the other verbs the HTTP operation must be `GET`. +The returned output is the raw output from sonalyze, whether for success or error, encoded as a JSON +string (which must be parsed by the consumer). A successful run yields 2xx and an error yields 4xx +or 5xx. -Query parameters are always URL-encoded and separated by `&` in the normal way. - -Query parameters that carry values are specified as `name=value`, with the value presented in the -syntax required by the sonalyze verb in question, eg `host=gpu-[1,4-8],c[1,2]-[8,9]` or -`user=frobnitz`. +Via this API, sonalyze processes top-level requests that take the form of sonalyze commands as +described for the non-daemon mode: `/api/v0/jobs?cluster=c&user=x&from=y&to=z` corresponds directly +to the command `sonalyze jobs -cluster c -user x -from y -to z`. -Value-less query parameters (flags) are a special case. The value must be a boolean value, `true` -or `false` (`some-gpu=true`). 
Passing a parameter with a `false` value is redundant, and it would -be better to omit the parameter. Also, while many "boolean" values are accepted by the current -flags parser, please stick to `true` or `false` if you use a value at all. +For convenience, Sonalyze, with the -remote option, translates a "local" command to a v0 API call +(with authentication), but there's nothing special about this: under the hood it is currently just a +curl invocation of the translated request. See MANUAL.md. -By and large, all parameters accepted by `sonalyze` are accepted as query parameters, with the same -name and syntax for both the parameter names (without the leading `-`) and parameter values. Try -`sonalyze help` or `sonalyze -h` for more information, read MANUAL.md in this directory, or -examine the code. +## REST API v1 -Some parameters are scrubbed by `sonalyze` when it constructs the remote request, and various -consistency checks are applied. Errors are signalled for bad behavior. +The v1 API is intended to follow the v0 API, with the difference that where the v0 API always +returns a JSON string for all output types, the v1 API will return plain JSON data. Additionally, +the v0 API (following the classical Sonalyze REST API), when it returns JSON encoded data (with +`-fmt=json`), encodes all field values as strings. The v1 API will use natural encodings. -When constructing a query by hand, however, there are no client-side restrictions, but the server -will quietly ignore the query parameters `cpuprofile`, `data-dir`, `data-path`, `remote`, -`auth-file`, `config-file`, `v`, `verbose`, and `raw`. +At the moment, the v1 API presents only a data insertion API that is new (the old v0 data insertion +API being obsoleted since those data formats are no longer supported). A POST to +`/api/v1/insert/<type>` will present data of the given `<type>` (sample, sysinfo, job, cluster) for +insertion in the data store.
The data must be presented as JSON and have the form defined by the +Sonar data format spec. -The `cluster` parameter is required except for with the `cluster` verb. +## REST API v2 - the slurm-monitor REST API -The server will infer `config-file` and `data-dir` from the `cluster` parameter, as appropriate. +The v2 API is a *partial* and *probably buggy* +[slurm-monitor](https://github.com/2maz/slurm-monitor) style API. -Query URLs are limited in length by parts of the infrastructure (and possibly by underlying web -standards). Very long lists of e.g. job IDs used for selection criteria may result in errors being -reported. The workaround for this is currently to either run multiple queries and merge the -results, or to query less selectively and filter the data on the client side. +Authentication for this API is via OAUTH and is in principle set up so that only a super-user can +query data for other users than themselves. Since this authentication scheme is poorly integrated +with Sonalyze at this time, the switch `-v2` must be passed to the daemon to enable this API. 
diff --git a/code/tests/sonalyzed/simple.sh b/code/tests/sonalyzed/simple.sh index 50c0ec3a..8b8d9f14 100755 --- a/code/tests/sonalyzed/simple.sh +++ b/code/tests/sonalyzed/simple.sh @@ -22,6 +22,8 @@ cp cluster2.naic.com-config.json $rootdir/cluster-config/cluster2.naic.com-confi $SONALYZE daemon -v \ -jobanalyzer-dir $rootdir \ -rest-api $testapi \ + -v0 \ + -v1 \ -insert \ -upload-auth upload-auth.txt \ -analysis-auth analysis-auth.txt & From 4cfe80e73985896ad2976c47cb6efe9a547a85ca Mon Sep 17 00:00:00 2001 From: Lars T Hansen Date: Wed, 29 Apr 2026 10:57:33 +0200 Subject: [PATCH 4/4] Decode the JSON-encoded string for /api/v0 --- code/sonalyze/application/remote.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/code/sonalyze/application/remote.go b/code/sonalyze/application/remote.go index 749cccf5..91e74b7d 100644 --- a/code/sonalyze/application/remote.go +++ b/code/sonalyze/application/remote.go @@ -5,6 +5,7 @@ package application import ( "bufio" "context" + "encoding/json" "errors" "fmt" "io" @@ -128,6 +129,14 @@ func RemoteOperation(rCmd Command, verb string, stdin io.Reader, stdout, stderr err = command.Run() + // In principle, stdout is *always* encoded as a single JSON string, but if decoding fails fall + // back to the raw output. + stdoutString := newStdout.String() + var out string + if json.Unmarshal([]byte(stdoutString), &out) != nil { + out = stdoutString + } + // If there is a processing error on the remote end then the server will respond with a 400 code // and the text that would otherwise go to stderr, see runSonalyze() in daemon/perform.go. 
That // translates as a non-nil error with code 22 here, and the error message is on our local @@ -143,7 +152,7 @@ func RemoteOperation(rCmd Command, verb string, stdin io.Reader, stdout, stderr if xe, ok := err.(*exec.ExitError); ok { switch xe.ExitCode() { case 22: - return fmt.Errorf("Remote: %s", newStdout.String()) + return fmt.Errorf("Remote: %s", out) case 5, 6, 7: return fmt.Errorf("Failed to resolve remote host (or proxy). Exit code %v, stderr=%s", xe.ExitCode(), string(xe.Stderr)) @@ -155,6 +164,6 @@ func RemoteOperation(rCmd Command, verb string, stdin io.Reader, stdout, stderr } // print, not println, or we end up adding a blank line that confuses consumers - fmt.Fprint(stdout, newStdout.String()) + fmt.Fprint(stdout, out) return nil }