diff --git a/code/sonalyze/Makefile b/code/sonalyze/Makefile index 89c4a603..0d8fd830 100644 --- a/code/sonalyze/Makefile +++ b/code/sonalyze/Makefile @@ -6,7 +6,7 @@ SUBDIRS=application \ common \ daemon \ data/card data/common data/cpusample data/gpusample data/node data/sample \ - data/slurmjob data/slurmnode data/slurmpart \ + data/samplejob data/slurmjob data/slurmnode data/slurmpart \ db db/errs db/filedb db/parse db/repr db/special \ table diff --git a/code/sonalyze/application/local.go b/code/sonalyze/application/local.go index b2da4cff..5fbdfd7d 100644 --- a/code/sonalyze/application/local.go +++ b/code/sonalyze/application/local.go @@ -5,10 +5,15 @@ package application import ( "fmt" "io" + "time" . "sonalyze/cmd" + "sonalyze/cmd/jobs" + . "sonalyze/common" "sonalyze/data/sample" "sonalyze/db" + "sonalyze/db/repr" + "sonalyze/db/special" ) // Clearly, for `jobs` the file list thing is tricky b/c the list can be *either* sample data *or* @@ -18,22 +23,7 @@ import ( func LocalSampleOperation(command SampleAnalysisCommand, _ io.Reader, stdout, stderr io.Writer) error { args := command.SampleAnalysisFlags() - - var filter sample.QueryFilter - filter.AllUsers, filter.SkipSystemUsers, filter.ExcludeSystemCommands, filter.ExcludeHeartbeat = - command.DefaultRecordFilters() - filter.HaveFrom = args.SourceArgs.HaveFrom - filter.FromDate = args.SourceArgs.FromDate - filter.HaveTo = args.SourceArgs.HaveTo - filter.ToDate = args.SourceArgs.ToDate - filter.Host = args.HostArgs.Host - filter.ExcludeSystemJobs = args.RecordFilterArgs.ExcludeSystemJobs - filter.User = args.RecordFilterArgs.User - filter.ExcludeUser = args.RecordFilterArgs.ExcludeUser - filter.Command = args.RecordFilterArgs.Command - filter.ExcludeCommand = args.RecordFilterArgs.ExcludeCommand - filter.Job = args.RecordFilterArgs.Job - filter.ExcludeJob = args.RecordFilterArgs.ExcludeJob + filter := BuildSampleFilter(command) theLog, err := db.OpenReadOnlyDB( command.ConfigFile(), @@ -53,3 +43,97 @@ func 
LocalSampleOperation(command SampleAnalysisCommand, _ io.Reader, stdout, st return command.Perform(stdout, cfg, theLog, filter, hosts, recordFilter) } + +type FileListJobsDataProvider struct { + provider db.DataProvider + isSample bool +} + +func (fljdp *FileListJobsDataProvider) ReadSamples( + fromDate, toDate time.Time, + hosts *Hosts, + verbose bool, +) ( + sampleBlobs [][]*repr.Sample, + softErrors int, + err error, +) { + if fljdp.isSample { + return fljdp.provider.ReadSamples(fromDate, toDate, hosts, verbose) + } + return nil, 0, nil +} + +func (fljdp *FileListJobsDataProvider) ReadSacctData( + fromDate, toDate time.Time, + verbose bool, +) ( + recordBlobs [][]*repr.SacctInfo, + softErrors int, + err error, +) { + if !fljdp.isSample { + return fljdp.provider.ReadSacctData(fromDate, toDate, verbose) + } + return nil, 0, nil +} + +var _ = jobs.JobsDataProvider((*FileListJobsDataProvider)(nil)) + +func LocalJobsOperation(command *jobs.JobsCommand, _ io.Reader, stdout, stderr io.Writer) error { + args := command.SampleAnalysisFlags() + filter := BuildSampleFilter(command) + cfg, err := special.MaybeGetConfig(command.ConfigFile()) + if err != nil { + return err + } + hosts, recordFilter, err := sample.BuildSampleFilter(cfg, filter, args.Verbose) + if err != nil { + return fmt.Errorf("Failed to create record filter: %v", err) + } + + var theLog jobs.JobsDataProvider + if len(args.LogFiles) > 0 { + // We default to sample data, fall back to sacct data under a switch. 
+ fljdp := &FileListJobsDataProvider{} + theLog = fljdp + if command.SlurmJobData { + fljdp.provider, err = db.OpenFileListDB(db.FileListSlurmJobData, args.LogFiles, cfg) + } else { + fljdp.isSample = true + fljdp.provider, err = db.OpenFileListDB(db.FileListSampleData, args.LogFiles, cfg) + } + } else { + if args.DataDir == "" { + return fmt.Errorf("Must have either dataDir or logFiles") + } + theLog, err = db.OpenPersistentDirectoryDB(args.DataDir, cfg) + } + if err != nil { + return fmt.Errorf("Failed to open log store: %v", err) + } + + return command.Perform(stdout, cfg, theLog, filter, hosts, recordFilter) +} + +func BuildSampleFilter(params SampleAnalysisParameters) sample.QueryFilter { + args := params.SampleAnalysisFlags() + + var filter sample.QueryFilter + filter.AllUsers, filter.SkipSystemUsers, filter.ExcludeSystemCommands, filter.ExcludeHeartbeat = + params.DefaultRecordFilters() + filter.HaveFrom = args.SourceArgs.HaveFrom + filter.FromDate = args.SourceArgs.FromDate + filter.HaveTo = args.SourceArgs.HaveTo + filter.ToDate = args.SourceArgs.ToDate + filter.Host = args.HostArgs.Host + filter.ExcludeSystemJobs = args.RecordFilterArgs.ExcludeSystemJobs + filter.User = args.RecordFilterArgs.User + filter.ExcludeUser = args.RecordFilterArgs.ExcludeUser + filter.Command = args.RecordFilterArgs.Command + filter.ExcludeCommand = args.RecordFilterArgs.ExcludeCommand + filter.Job = args.RecordFilterArgs.Job + filter.ExcludeJob = args.RecordFilterArgs.ExcludeJob + + return filter +} diff --git a/code/sonalyze/cmd/config.go b/code/sonalyze/cmd/config.go index ec380094..a0eb140a 100644 --- a/code/sonalyze/cmd/config.go +++ b/code/sonalyze/cmd/config.go @@ -10,17 +10,17 @@ import ( // Standard method for cleaning an InputStreamSet relative to a config: the config must have a // definition for each host. No config at all is a fatal error. No config for a host means we -// remove the host from the set, we return the modified set. 
+// remove the host from the set (imperatively). // // Over time this may become more complicated, as the config becomes time-dependent. func EnsureConfigForInputStreams( cfg *config.ClusterConfig, streams sample.InputStreamSet, reason string, -) (sample.InputStreamSet, error) { +) error { // Bail if there's no config data at all. if cfg == nil { - return nil, fmt.Errorf("Configuration file required: %s", reason) + return fmt.Errorf("Configuration file required: %s", reason) } // Remove streams for which we have no config data. @@ -37,5 +37,5 @@ func EnsureConfigForInputStreams( delete(streams, b) } - return streams, nil + return nil } diff --git a/code/sonalyze/cmd/jobs/jobs-table.go b/code/sonalyze/cmd/jobs/jobs-table.go index 516ab016..17e81536 100644 --- a/code/sonalyze/cmd/jobs/jobs-table.go +++ b/code/sonalyze/cmd/jobs/jobs-table.go @@ -2,6 +2,8 @@ package jobs +import "sonalyze/data/samplejob" + import ( "cmp" "fmt" @@ -29,31 +31,46 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "Job": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatUint32((d.JobId), ctx) + if (d.sampleJob) != nil { + return FormatUint32((d.sampleJob.JobId), ctx) + } + return "?" }, Help: "(uint32) Job ID", }, "User": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatUstr((d.User), ctx) + if (d.sampleJob) != nil { + return FormatUstr((d.sampleJob.User), ctx) + } + return "?" }, Help: "(string) Name of user running the job", }, "Duration": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatDurationValue((d.Duration), ctx) + if (d.sampleJob) != nil { + return FormatDurationValue((d.sampleJob.Duration), ctx) + } + return "?" }, Help: "(DurationValue) Time of last observation minus time of first", }, "Start": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatDateTimeValue((d.Start), ctx) + if (d.sampleJob) != nil { + return FormatDateTimeValue((d.sampleJob.Start), ctx) + } + return "?" 
}, Help: "(DateTimeValue) Time of first observation", }, "End": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatDateTimeValue((d.End), ctx) + if (d.sampleJob) != nil { + return FormatDateTimeValue((d.sampleJob.End), ctx) + } + return "?" }, Help: "(DateTimeValue) Time of last observation", }, @@ -217,25 +234,37 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "Gpus": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatGpuSet((d.Gpus), ctx) + if (d.sampleJob) != nil { + return FormatGpuSet((d.sampleJob.Gpus), ctx) + } + return "?" }, Help: "(GpuSet) GPU device numbers used by the job, 'none' if none or 'unknown' in error states", }, "GpuFail": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatInt((d.GpuFail), ctx) + if (d.sampleJob) != nil { + return FormatInt((d.sampleJob.GpuFail), ctx) + } + return "?" }, Help: "(int) Flag indicating GPU status (0=Ok, 1=Failing)", }, "Cmd": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatString((d.Cmd), ctx) + if (d.sampleJob) != nil { + return FormatString((d.sampleJob.Cmd), ctx) + } + return "?" }, Help: "(string) The commands invoking the processes of the job", }, "Hosts": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatHostnames((d.Hosts), ctx) + if (d.sampleJob) != nil { + return FormatHostnames((d.sampleJob.Hosts), ctx) + } + return "?" }, Help: "(Hostnames) List of the host name(s) running the job", }, @@ -247,68 +276,98 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "Classification": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatInt((d.Classification), ctx) + if (d.sampleJob) != nil { + return FormatInt((d.sampleJob.Classification), ctx) + } + return "?" 
}, Help: "(int) Bit vector of live-at-start (2) and live-at-end (1) flags", }, "CpuTime": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatDurationValue((d.CpuTime), ctx) + if (d.sampleJob) != nil { + return FormatDurationValue((d.sampleJob.CpuTime), ctx) + } + return "?" }, Help: "(DurationValue) Total CPU time of the job across all cores", }, "GpuTime": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatDurationValue((d.GpuTime), ctx) + if (d.sampleJob) != nil { + return FormatDurationValue((d.sampleJob.GpuTime), ctx) + } + return "?" }, Help: "(DurationValue) Total GPU time of the job across all cards", }, "SomeGpu": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatBool((d.computedFlags&kUsesGpu != 0), ctx) + if (d.sampleJob) != nil { + return FormatBool((d.sampleJob.ComputedFlags&samplejob.KUsesGpu != 0), ctx) + } + return "?" }, Help: "(bool) True iff process was seen to use some GPU", }, "NoGpu": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatBool((d.computedFlags&kDoesNotUseGpu != 0), ctx) + if (d.sampleJob) != nil { + return FormatBool((d.sampleJob.ComputedFlags&samplejob.KDoesNotUseGpu != 0), ctx) + } + return "?" }, Help: "(bool) True iff process was seen to use no GPU", }, "Running": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatBool((d.computedFlags&kIsLiveAtEnd != 0), ctx) + if (d.sampleJob) != nil { + return FormatBool((d.sampleJob.ComputedFlags&samplejob.KIsLiveAtEnd != 0), ctx) + } + return "?" }, Help: "(bool) True iff process appears to still be running at end of time window", }, "Completed": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatBool((d.computedFlags&kIsNotLiveAtEnd != 0), ctx) + if (d.sampleJob) != nil { + return FormatBool((d.sampleJob.ComputedFlags&samplejob.KIsNotLiveAtEnd != 0), ctx) + } + return "?" 
}, Help: "(bool) True iff process appears not to be running at end of time window", }, "Zombie": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatBool((d.computedFlags&kIsZombie != 0), ctx) + if (d.sampleJob) != nil { + return FormatBool((d.sampleJob.ComputedFlags&samplejob.KIsZombie != 0), ctx) + } + return "?" }, Help: "(bool) True iff the process looks like a zombie", }, "Primordial": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatBool((d.computedFlags&kIsLiveAtStart != 0), ctx) + if (d.sampleJob) != nil { + return FormatBool((d.sampleJob.ComputedFlags&samplejob.KIsLiveAtStart != 0), ctx) + } + return "?" }, Help: "(bool) True iff the process appears to have been alive at the start of the time window", }, "BornLater": { Fmt: func(d *jobSummary, ctx PrintMods) string { - return FormatBool((d.computedFlags&kIsNotLiveAtStart != 0), ctx) + if (d.sampleJob) != nil { + return FormatBool((d.sampleJob.ComputedFlags&samplejob.KIsNotLiveAtStart != 0), ctx) + } + return "?" }, Help: "(bool) True iff the process appears not to have been alive at the start of the time window", }, "Submit": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatDateTimeValue((d.sacctInfo.Submit), ctx) + if (d.slurmJob.Main) != nil { + return FormatDateTimeValue((d.slurmJob.Main.Submit), ctx) } return "?" }, @@ -316,8 +375,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "JobName": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUstr((d.sacctInfo.JobName), ctx) + if (d.slurmJob.Main) != nil { + return FormatUstr((d.slurmJob.Main.JobName), ctx) } return "?" 
}, @@ -325,8 +384,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "State": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUstr((d.sacctInfo.State), ctx) + if (d.slurmJob.Main) != nil { + return FormatUstr((d.slurmJob.Main.State), ctx) } return "?" }, @@ -334,8 +393,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "Account": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUstr((d.sacctInfo.Account), ctx) + if (d.slurmJob.Main) != nil { + return FormatUstr((d.slurmJob.Main.Account), ctx) } return "?" }, @@ -343,8 +402,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "Layout": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUstr((d.sacctInfo.Layout), ctx) + if (d.slurmJob.Main) != nil { + return FormatUstr((d.slurmJob.Main.Layout), ctx) } return "?" }, @@ -352,8 +411,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "Reservation": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUstr((d.sacctInfo.Reservation), ctx) + if (d.slurmJob.Main) != nil { + return FormatUstr((d.slurmJob.Main.Reservation), ctx) } return "?" }, @@ -361,8 +420,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "Partition": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUstr((d.sacctInfo.Partition), ctx) + if (d.slurmJob.Main) != nil { + return FormatUstr((d.slurmJob.Main.Partition), ctx) } return "?" }, @@ -370,8 +429,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "RequestedGpus": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUstr((d.sacctInfo.ReqGPUS), ctx) + if (d.slurmJob.Main) != nil { + return FormatUstr((d.slurmJob.Main.ReqGPUS), ctx) } return "?" 
}, @@ -379,8 +438,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "DiskReadAvgGB": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUint32((d.sacctInfo.AveDiskRead), ctx) + if (d.slurmJob.Main) != nil { + return FormatUint32((d.slurmJob.Main.AveDiskRead), ctx) } return "?" }, @@ -388,8 +447,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "DiskWriteAvgGB": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUint32((d.sacctInfo.AveDiskWrite), ctx) + if (d.slurmJob.Main) != nil { + return FormatUint32((d.slurmJob.Main.AveDiskWrite), ctx) } return "?" }, @@ -397,8 +456,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "RequestedCpus": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUint32((d.sacctInfo.ReqCPUS), ctx) + if (d.slurmJob.Main) != nil { + return FormatUint32((d.slurmJob.Main.ReqCPUS), ctx) } return "?" }, @@ -406,8 +465,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "RequestedMemGB": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUint32((d.sacctInfo.ReqMem), ctx) + if (d.slurmJob.Main) != nil { + return FormatUint32((d.slurmJob.Main.ReqMem), ctx) } return "?" }, @@ -415,8 +474,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "RequestedNodes": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUint32((d.sacctInfo.ReqNodes), ctx) + if (d.slurmJob.Main) != nil { + return FormatUint32((d.slurmJob.Main.ReqNodes), ctx) } return "?" }, @@ -424,8 +483,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "TimeLimit": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatU32Duration((d.sacctInfo.TimelimitRaw), ctx) + if (d.slurmJob.Main) != nil { + return FormatU32Duration((d.slurmJob.Main.TimelimitRaw), ctx) } return "?" 
}, @@ -433,8 +492,8 @@ var jobsFormatters = map[string]Formatter[*jobSummary]{ }, "ExitCode": { Fmt: func(d *jobSummary, ctx PrintMods) string { - if (d.sacctInfo) != nil { - return FormatUint8((d.sacctInfo.ExitCode), ctx) + if (d.slurmJob.Main) != nil { + return FormatUint8((d.slurmJob.Main.ExitCode), ctx) } return "?" }, @@ -494,31 +553,46 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "Job": Predicate[*jobSummary]{ Convert: CvtString2Uint32, Compare: func(d *jobSummary, v any) int { - return cmp.Compare((d.JobId), v.(uint32)) + if (d.sampleJob) != nil { + return cmp.Compare((d.sampleJob.JobId), v.(uint32)) + } + return -1 }, }, "User": Predicate[*jobSummary]{ Convert: CvtString2Ustr, Compare: func(d *jobSummary, v any) int { - return cmp.Compare((d.User), v.(Ustr)) + if (d.sampleJob) != nil { + return cmp.Compare((d.sampleJob.User), v.(Ustr)) + } + return -1 }, }, "Duration": Predicate[*jobSummary]{ Convert: CvtString2DurationValue, Compare: func(d *jobSummary, v any) int { - return cmp.Compare((d.Duration), v.(DurationValue)) + if (d.sampleJob) != nil { + return cmp.Compare((d.sampleJob.Duration), v.(DurationValue)) + } + return -1 }, }, "Start": Predicate[*jobSummary]{ Convert: CvtString2DateTimeValue, Compare: func(d *jobSummary, v any) int { - return cmp.Compare((d.Start), v.(DateTimeValue)) + if (d.sampleJob) != nil { + return cmp.Compare((d.sampleJob.Start), v.(DateTimeValue)) + } + return -1 }, }, "End": Predicate[*jobSummary]{ Convert: CvtString2DateTimeValue, Compare: func(d *jobSummary, v any) int { - return cmp.Compare((d.End), v.(DateTimeValue)) + if (d.sampleJob) != nil { + return cmp.Compare((d.sampleJob.End), v.(DateTimeValue)) + } + return -1 }, }, "CpuAvgPct": Predicate[*jobSummary]{ @@ -668,24 +742,36 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "Gpus": Predicate[*jobSummary]{ Convert: CvtString2GpuSet, SetCompare: func(d *jobSummary, v any, op int) bool { - return SetCompareGpuSets((d.Gpus), v.(gpuset.GpuSet), op) + 
if (d.sampleJob) != nil { + return SetCompareGpuSets((d.sampleJob.Gpus), v.(gpuset.GpuSet), op) + } + return false }, }, "GpuFail": Predicate[*jobSummary]{ Convert: CvtString2Int, Compare: func(d *jobSummary, v any) int { - return cmp.Compare((d.GpuFail), v.(int)) + if (d.sampleJob) != nil { + return cmp.Compare((d.sampleJob.GpuFail), v.(int)) + } + return -1 }, }, "Cmd": Predicate[*jobSummary]{ Compare: func(d *jobSummary, v any) int { - return cmp.Compare((d.Cmd), v.(string)) + if (d.sampleJob) != nil { + return cmp.Compare((d.sampleJob.Cmd), v.(string)) + } + return -1 }, }, "Hosts": Predicate[*jobSummary]{ Convert: CvtString2Hostnames, SetCompare: func(d *jobSummary, v any, op int) bool { - return SetCompareHostnames((d.Hosts), v.(*Hostnames), op) + if (d.sampleJob) != nil { + return SetCompareHostnames((d.sampleJob.Hosts), v.(*Hostnames), op) + } + return false }, }, "Now": Predicate[*jobSummary]{ @@ -697,68 +783,98 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "Classification": Predicate[*jobSummary]{ Convert: CvtString2Int, Compare: func(d *jobSummary, v any) int { - return cmp.Compare((d.Classification), v.(int)) + if (d.sampleJob) != nil { + return cmp.Compare((d.sampleJob.Classification), v.(int)) + } + return -1 }, }, "CpuTime": Predicate[*jobSummary]{ Convert: CvtString2DurationValue, Compare: func(d *jobSummary, v any) int { - return cmp.Compare((d.CpuTime), v.(DurationValue)) + if (d.sampleJob) != nil { + return cmp.Compare((d.sampleJob.CpuTime), v.(DurationValue)) + } + return -1 }, }, "GpuTime": Predicate[*jobSummary]{ Convert: CvtString2DurationValue, Compare: func(d *jobSummary, v any) int { - return cmp.Compare((d.GpuTime), v.(DurationValue)) + if (d.sampleJob) != nil { + return cmp.Compare((d.sampleJob.GpuTime), v.(DurationValue)) + } + return -1 }, }, "SomeGpu": Predicate[*jobSummary]{ Convert: CvtString2Bool, Compare: func(d *jobSummary, v any) int { - return CompareBool((d.computedFlags&kUsesGpu != 0), v.(bool)) + if 
(d.sampleJob) != nil { + return CompareBool((d.sampleJob.ComputedFlags&samplejob.KUsesGpu != 0), v.(bool)) + } + return -1 }, }, "NoGpu": Predicate[*jobSummary]{ Convert: CvtString2Bool, Compare: func(d *jobSummary, v any) int { - return CompareBool((d.computedFlags&kDoesNotUseGpu != 0), v.(bool)) + if (d.sampleJob) != nil { + return CompareBool((d.sampleJob.ComputedFlags&samplejob.KDoesNotUseGpu != 0), v.(bool)) + } + return -1 }, }, "Running": Predicate[*jobSummary]{ Convert: CvtString2Bool, Compare: func(d *jobSummary, v any) int { - return CompareBool((d.computedFlags&kIsLiveAtEnd != 0), v.(bool)) + if (d.sampleJob) != nil { + return CompareBool((d.sampleJob.ComputedFlags&samplejob.KIsLiveAtEnd != 0), v.(bool)) + } + return -1 }, }, "Completed": Predicate[*jobSummary]{ Convert: CvtString2Bool, Compare: func(d *jobSummary, v any) int { - return CompareBool((d.computedFlags&kIsNotLiveAtEnd != 0), v.(bool)) + if (d.sampleJob) != nil { + return CompareBool((d.sampleJob.ComputedFlags&samplejob.KIsNotLiveAtEnd != 0), v.(bool)) + } + return -1 }, }, "Zombie": Predicate[*jobSummary]{ Convert: CvtString2Bool, Compare: func(d *jobSummary, v any) int { - return CompareBool((d.computedFlags&kIsZombie != 0), v.(bool)) + if (d.sampleJob) != nil { + return CompareBool((d.sampleJob.ComputedFlags&samplejob.KIsZombie != 0), v.(bool)) + } + return -1 }, }, "Primordial": Predicate[*jobSummary]{ Convert: CvtString2Bool, Compare: func(d *jobSummary, v any) int { - return CompareBool((d.computedFlags&kIsLiveAtStart != 0), v.(bool)) + if (d.sampleJob) != nil { + return CompareBool((d.sampleJob.ComputedFlags&samplejob.KIsLiveAtStart != 0), v.(bool)) + } + return -1 }, }, "BornLater": Predicate[*jobSummary]{ Convert: CvtString2Bool, Compare: func(d *jobSummary, v any) int { - return CompareBool((d.computedFlags&kIsNotLiveAtStart != 0), v.(bool)) + if (d.sampleJob) != nil { + return CompareBool((d.sampleJob.ComputedFlags&samplejob.KIsNotLiveAtStart != 0), v.(bool)) + } + return -1 }, }, 
"Submit": Predicate[*jobSummary]{ Convert: CvtString2DateTimeValue, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.Submit), v.(DateTimeValue)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.Submit), v.(DateTimeValue)) } return -1 }, @@ -766,8 +882,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "JobName": Predicate[*jobSummary]{ Convert: CvtString2Ustr, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.JobName), v.(Ustr)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.JobName), v.(Ustr)) } return -1 }, @@ -775,8 +891,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "State": Predicate[*jobSummary]{ Convert: CvtString2Ustr, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.State), v.(Ustr)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.State), v.(Ustr)) } return -1 }, @@ -784,8 +900,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "Account": Predicate[*jobSummary]{ Convert: CvtString2Ustr, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.Account), v.(Ustr)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.Account), v.(Ustr)) } return -1 }, @@ -793,8 +909,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "Layout": Predicate[*jobSummary]{ Convert: CvtString2Ustr, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.Layout), v.(Ustr)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.Layout), v.(Ustr)) } return -1 }, @@ -802,8 +918,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "Reservation": Predicate[*jobSummary]{ Convert: CvtString2Ustr, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return 
cmp.Compare((d.sacctInfo.Reservation), v.(Ustr)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.Reservation), v.(Ustr)) } return -1 }, @@ -811,8 +927,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "Partition": Predicate[*jobSummary]{ Convert: CvtString2Ustr, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.Partition), v.(Ustr)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.Partition), v.(Ustr)) } return -1 }, @@ -820,8 +936,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "RequestedGpus": Predicate[*jobSummary]{ Convert: CvtString2Ustr, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.ReqGPUS), v.(Ustr)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.ReqGPUS), v.(Ustr)) } return -1 }, @@ -829,8 +945,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "DiskReadAvgGB": Predicate[*jobSummary]{ Convert: CvtString2Uint32, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.AveDiskRead), v.(uint32)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.AveDiskRead), v.(uint32)) } return -1 }, @@ -838,8 +954,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "DiskWriteAvgGB": Predicate[*jobSummary]{ Convert: CvtString2Uint32, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.AveDiskWrite), v.(uint32)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.AveDiskWrite), v.(uint32)) } return -1 }, @@ -847,8 +963,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "RequestedCpus": Predicate[*jobSummary]{ Convert: CvtString2Uint32, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.ReqCPUS), v.(uint32)) + if (d.slurmJob.Main) != nil { + return 
cmp.Compare((d.slurmJob.Main.ReqCPUS), v.(uint32)) } return -1 }, @@ -856,8 +972,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "RequestedMemGB": Predicate[*jobSummary]{ Convert: CvtString2Uint32, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.ReqMem), v.(uint32)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.ReqMem), v.(uint32)) } return -1 }, @@ -865,8 +981,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "RequestedNodes": Predicate[*jobSummary]{ Convert: CvtString2Uint32, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.ReqNodes), v.(uint32)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.ReqNodes), v.(uint32)) } return -1 }, @@ -874,8 +990,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "TimeLimit": Predicate[*jobSummary]{ Convert: CvtString2U32Duration, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.TimelimitRaw), v.(U32Duration)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.TimelimitRaw), v.(U32Duration)) } return -1 }, @@ -883,8 +999,8 @@ var jobsPredicates = map[string]Predicate[*jobSummary]{ "ExitCode": Predicate[*jobSummary]{ Convert: CvtString2Uint8, Compare: func(d *jobSummary, v any) int { - if (d.sacctInfo) != nil { - return cmp.Compare((d.sacctInfo.ExitCode), v.(uint8)) + if (d.slurmJob.Main) != nil { + return cmp.Compare((d.slurmJob.Main.ExitCode), v.(uint8)) } return -1 }, diff --git a/code/sonalyze/cmd/jobs/jobs.go b/code/sonalyze/cmd/jobs/jobs.go index 85791d19..fbe38790 100644 --- a/code/sonalyze/cmd/jobs/jobs.go +++ b/code/sonalyze/cmd/jobs/jobs.go @@ -27,7 +27,7 @@ var uintArgs = []uintArg{ "Select only jobs with at least this many samples [default: 1]", 1, "min-samples", - -1, + kSampleCount, false, }, uintArg{ @@ -247,6 +247,7 @@ type JobsCommand struct /* implements 
SampleAnalysisCommand */ { MergeAll bool MergeNone bool MinRuntimeSec int64 + SlurmJobData bool // Print args NumJobs uint @@ -255,8 +256,6 @@ type JobsCommand struct /* implements SampleAnalysisCommand */ { minRuntimeStr string } -var _ SampleAnalysisCommand = (*JobsCommand)(nil) - func (jc *JobsCommand) lookupUint(s string) uint { if v, ok := jc.Uints[s]; ok { return *v @@ -268,6 +267,9 @@ func (jc *JobsCommand) Add(fs *CLI) { jc.SampleAnalysisArgs.Add(fs) jc.FormatArgs.Add(fs) + fs.Group("local-data-source") + fs.BoolVar(&jc.SlurmJobData, "slurm", false, "Interpret file arguments as slurm jobs data, not sonar sample data") + fs.Group("job-filter") jc.Uints = make(map[string]*uint) for _, v := range uintArgs { @@ -320,6 +322,7 @@ func (jc *JobsCommand) ReifyForRemote(x *ArgReifier) error { x.UintUnchecked(v.name, *box) } } + x.Bool("slurm", jc.SlurmJobData) x.Bool("no-gpu", jc.NoGpu) x.Bool("some-gpu", jc.SomeGpu) x.Bool("completed", jc.Completed) diff --git a/code/sonalyze/cmd/jobs/perform.go b/code/sonalyze/cmd/jobs/perform.go index 362fc46c..e3e6a634 100644 --- a/code/sonalyze/cmd/jobs/perform.go +++ b/code/sonalyze/cmd/jobs/perform.go @@ -3,8 +3,9 @@ package jobs import ( "fmt" "io" - "math" + "maps" "slices" + "strconv" "strings" "time" @@ -15,13 +16,12 @@ import ( . "sonalyze/cmd" . "sonalyze/common" "sonalyze/data/sample" + "sonalyze/data/samplejob" "sonalyze/data/slurmjob" "sonalyze/db" - "sonalyze/db/repr" . 
"sonalyze/table" ) -// Computed float64 fields in jobAggregate.computed const ( kCpuPctAvg = iota // Average CPU utilization, 1 core == 100% kCpuPctPeak // Peak CPU utilization ditto @@ -48,568 +48,487 @@ const ( kSgpuGBAvg // Average GPU memory utilization, all cards used by job == 100% kSgpuGBPeak // Peak GPU memory utilization ditto kDuration // Duration of job in seconds (wall clock, not CPU) + kSampleCount // Number of samples numF64Fields ) -// Computed flag bits in jobAggregate.computedFlags -const ( - kUsesGpu = (1 << iota) // True if there's reason to believe a gpu was used by job - kDoesNotUseGpu // Opposite - kGpuFail // GPU failed - kIsLiveAtStart // Job had record at earliest timestamp of input set for host - kIsNotLiveAtStart // Opposite - kIsLiveAtEnd // Job had record at latest timestamp of input set for host - kIsNotLiveAtEnd // Opposite - kIsZombie // Command contains or user starts with _zombie_ -) - -// Package for results from aggregation. +// A number of fields could come *either* from the sample job or the slurm job, depending on what +// data we have, and it's the result of that joining that we want to print when we print jobs. If +// there is a SlurmJob but not a SampleJob then a SampleJob is synthesized from the SlurmJob data. +// This keeps printing logic sane, and there are no samples exposed so that's fine. 
type jobSummary struct { - jobAggregate - JobId uint32 - User Ustr - JobAndMark string - Now DateTimeValue - Duration DurationValue - Start DateTimeValue // Earliest time seen for the job, seconds since epoch - End DateTimeValue // Latest time ditto - CpuTime DurationValue - GpuTime DurationValue - Classification int // Bit vector of flags - job sample.SampleStream - computedFlags int - selected bool // Initially true, used to deselect the record before printing - sacctInfo *repr.SacctInfo + Now DateTimeValue + JobAndMark string + selected bool + sampleJob *samplejob.SampleJob + slurmJob *slurmjob.SlurmJob + computed [numF64Fields]float64 } -// Aggregate figures for a job. For some cross-job data like user and host, go to the sample stream -// in the jobSummary that owns this aggregate. -// -// The float fields of this are *not* rounded in any way. -// -// GPU memory: If a system config is present and conf.GpuMemPct is true then kGpuGB* are derived -// from the recorded percentage figure, otherwise kRgpuGB* are derived from the recorded absolute -// figures. If a system config is not present then all fields will represent the recorded values -// (kRgpuKB * the recorded percentages). -type jobAggregate struct { - GpuFail int - Gpus gpuset.GpuSet - computed [numF64Fields]float64 - IsZombie bool - Cmd string - Hosts *Hostnames +type JobsDataProvider interface { + db.ProcessSampleDataProvider + db.SacctDataProvider +} + +// As we have a config, logclean will have computed proper GPU memory values for the job, so we need +// not look to sys.GpuMemPct here. 
+type hostResources struct { + cpuCores int + memGB int + gpuCards int + gpuMemGB int } func (jc *JobsCommand) Perform( out io.Writer, cfg *config.ClusterConfig, - theDb db.SampleDataProvider, + theDb JobsDataProvider, filter sample.QueryFilter, hosts *Hosts, recordFilter *sample.SampleFilter, ) error { - streams, bounds, read, dropped, err := - sample.ReadSampleStreamsAndMaybeBounds( - theDb, - filter.FromDate, - filter.ToDate, - hosts, - recordFilter, - true, - jc.Verbose, - ) - if err != nil { - return fmt.Errorf("Failed to read log records: %v", err) - } - if jc.Verbose { - Log.Infof("%d records read + %d dropped\n", read, dropped) - UstrStats(out, false) + var needConfig = NeedsConfig(jobsFormatters, jc.PrintFields) + if needConfig && cfg == nil { + return fmt.Errorf("Configuration file required for relative format arguments") } - if jc.Verbose { - Log.Infof("Streams constructed by postprocessing: %d", len(streams)) - numSamples := 0 - for _, stream := range streams { - numSamples += len(*stream) + isMergeable := func(k sample.InputStreamKey) bool { + // TODO: Eventually we'll need to use the epoch here. We now have it. + var sys *config.NodeConfigRecord + if cfg != nil { + sys = cfg.LookupHost(k.Host.String()) } - Log.Infof("Samples retained after filtering: %d", numSamples) + return sys != nil && sys.CrossNodeJobs + } + + // We want to: + // + // - compute an "OR" of slurm jobs and sample jobs so that if a job ID is in either data set + // then the job is in the result + // - synthesize SampleJob data for slurm jobs without a corresponding sample job + // - compute relative fields for all the jobs in the set, which depends either on the allocation + // for the job or on the node's configuration + // - filter the resulting summaries + // + // This needs to be done in a particular order to work at all. 
+ + sampleJobs, err := jc.findSampleJobs( + isMergeable, + theDb, + filter, + hosts, + recordFilter, + ) + if err != nil { + return err } - if NeedsConfig(jobsFormatters, jc.PrintFields) { - var err error - streams, err = EnsureConfigForInputStreams(cfg, streams, "relative format arguments") - if err != nil { - return err + slurmJobs, err := jc.findSlurmJobs(theDb, filter) + if err != nil { + return err + } + + var summaries []*jobSummary + if len(slurmJobs) > 0 { + // Join slurm jobs to sample jobs, synthesizing sample jobs from slurm jobs when there are + // no corresponding sample jobs. + // + // The sample data may contain samples for non-slurm jobs coming from both slurm and + // non-slurm nodes on slurm-managed clusters. In other words, the key for the join is not + // simply the JobId, but also one bit of information about slurm/non-slurm data. Both data + // sets must incorporate this bit, but for slurmJobs it is implicitly true always. The + // epoch field in sampleJobs will do this for us (for newer data). The epoch field further + // distinguishes non-slurm jobs from each other on the same host. + // + // It's unlikely that the epoch is ever the same on two different nodes, but it can happen + // if there's a power outage and the nodes come up at the same time (within the same + // second). So for non-zero epoch we must distinguish by node name. Fortunately, these + // nodes will all have hostname sets that have exactly one member. + // + // So the hash table probably has a + // This is not quite right because there can be non-slurm jobs on slurm nodes. The JobId is + // not unique. The sampleJobs are all unique. It is only when we join slurmJobs that we + // should find sampleJobs to attach them to, and when we synthesize a sampleJob it too must + // create a unique key. But the key must always be derivable from the data. + // + // On slurm systems, the JobId *is* unique for slurm jobs. 
+ // + // Otherwise, the JobId is at least pid+some host set identifier+something to do with time - the + // epoch again plus the time cutoff for reuse? + // + // Uniqueness (in this case) is sufficient for the set of jobs we're processing. But since the + // key can be computed from the job data, it'll still be globally unique. But we can compress + // eg via hash table. + // + // We only need the map when there is stuff to join - ie, when we have any slurm jobs at all. + // But in that case there could also be non-slurm jobs in the mix because clusters can have + // non-slurm nodes. But those jobs can maybe be tagged specially? + // + // the job id really needs to incorporate an is-slurm-node or is-slurm-job bit + + // really the epoch will fit in 32 bits so this could be simpler + // epoch *must* be 0 for batch jobs + // plus the key is more complicated, it needs to include host + // the host set for each sampleJob (whether synthesized or not) with nonzero epoch + // must contain exactly one host + // we can hash this host for a serial number, or include the string here + // for batch jobs the host can be blank here, the host set may be complicated in + // any case. or we could use the ustr repr of it. 
+ type key struct { + jobid uint32 + epoch uint64 + name string + } + var smap = make(map[uint32]*jobSummary) + for _, j := range sampleJobs { + smap[j.JobId] = &jobSummary{sampleJob: j, selected: true} + } + for _, j := range slurmJobs { + if probe := smap[j.Id]; probe != nil { + probe.slurmJob = j + } else { + smap[j.Id] = &jobSummary{ + sampleJob: jc.synthesizeSampleJob(j), + slurmJob: j, + selected: true, + } + } + } + summaries = slices.Collect(maps.Values(smap)) + } else { + summaries = make([]*jobSummary, 0, len(sampleJobs)) + for _, j := range sampleJobs { + summaries = append(summaries, &jobSummary{sampleJob: j, selected: true}) } } - summaries := jc.aggregateAndFilterJobs(cfg, theDb, streams, bounds) - if jc.Verbose { - Log.Infof("Jobs after aggregation filtering: %d", len(summaries)) + for _, j := range summaries { + jc.computeComputedFields(j, cfg) } - return jc.printJobSummaries(out, summaries) -} - -// Container for computations we would prefer not to do but will need to do if certain names are -// used for printing or in queries. - -type nameTester struct { - needCmd bool - needHosts bool - needJobAndMark bool - needSacctInfo bool -} - -func (nt *nameTester) testName(name string) { - switch name { - case "cmd", "Cmd": - nt.needCmd = true - case "host", "hosts", "Hosts": - nt.needHosts = true - case "jobm", "JobAndMark": - nt.needJobAndMark = true - case "Submit", "JobName", "State", "Account", "Layout", "Reservation", - "Partition", "RequestedGpus", "DiskReadAvgGB", "DiskWriteAvgGB", - "RequestedCpus", "RequestedMemGB", "RequestedNodes", "TimeLimit", - "ExitCode": - // Our names for the Slurm sacct data fields. Mostly these are the same as in the sacct - // data, but there's no shame in sticking to proper naming. 
- nt.needSacctInfo = true + if sampleFilter := jc.buildSampleFilter(cfg != nil); sampleFilter != nil { + summaries = slices.DeleteFunc(summaries, func(v *jobSummary) bool { + return !sampleFilter.apply(v) + }) } -} -// A sample stream is a quadruple (host, command, job-related-id, record-list). A stream is only -// ever about one job. There may be multiple streams per job, they will all have the same -// job-related-id which is unique but not necessarily equal to any field in any of the records. -// -// This function collects the data per job and returns a vector of (aggregate, records) pairs where -// the aggregate describes the job in aggregate and the records is a synthesized stream of sample -// records for the job, based on all the input streams for the job. The manner of the synthesis -// depends on arguments to the program: with --merge-all we merge across all hosts; with -// --merge-none we do not merge; otherwise the config file can specify the hosts to merge across; -// otherwise if there is no config we do not merge. - -func (jc *JobsCommand) aggregateAndFilterJobs( - cfg *config.ClusterConfig, - theDb db.SampleDataProvider, - streams sample.InputStreamSet, - bounds Timebounds, -) []*jobSummary { + var now = time.Now().UTC().Unix() - var anyMergeableNodes bool - if !jc.MergeNone && cfg != nil { - anyMergeableNodes = cfg.HasCrossNodeJobs() + for i := range summaries { + summaries[i].Now = now + mark := "" + flags := summaries[i].sampleJob.ComputedFlags + switch { + case flags&(samplejob.KIsLiveAtStart|samplejob.KIsLiveAtEnd) == (samplejob.KIsLiveAtStart | samplejob.KIsLiveAtEnd): + mark = "!" 
+ case flags&samplejob.KIsLiveAtStart != 0: + mark = "<" + case flags&samplejob.KIsLiveAtEnd != 0: + mark = ">" + } + summaries[i].JobAndMark = fmt.Sprint(summaries[i].sampleJob.JobId, mark) } - var jobs sample.SampleStreams - if jc.MergeAll { - jobs, bounds = sample.MergeByJob(streams, bounds) - } else if anyMergeableNodes { - jobs, bounds = mergeAcrossSomeNodes(cfg, streams, bounds) - } else { - jobs = sample.MergeByHostAndJob(streams) + return jc.printJobSummaries(out, summaries) +} + +func (jc *JobsCommand) findSampleJobs( + isMergeable func(sample.InputStreamKey) bool, + theDb JobsDataProvider, + filter sample.QueryFilter, + hosts *Hosts, + recordFilter *sample.SampleFilter, +) ([]*samplejob.SampleJob, error) { + var merge samplejob.Merge + switch { + case jc.MergeAll: + merge = samplejob.MergeAll + case jc.MergeNone: + merge = samplejob.MergeNone + } + + sampleJobs, err := samplejob.Query( + theDb, + isMergeable, + filter.FromDate, + filter.ToDate, + hosts, + recordFilter, + false, + merge, + jc.Verbose, + ) + if err != nil { + return nil, err } if jc.Verbose { - Log.Infof("Jobs constructed by merging: %d", len(jobs)) + Log.Infof("Sample jobs after aggregation filtering: %d", len(sampleJobs)) } - summaryFilter, slurmFilter := jc.buildFilters(cfg) + return sampleJobs, nil +} - summaries := make([]*jobSummary, 0) - minSamples := jc.lookupUint("min-samples") - if jc.Verbose && minSamples > 1 { - Log.Infof("Excluding jobs with fewer than %d samples", minSamples) - } - nt := nameTester{ - needSacctInfo: slurmFilter != nil, - } - for _, f := range jc.PrintFields { - nt.testName(f.Name) - } - if jc.ParsedQuery != nil { - names := make(map[string]bool) - QueryNames(jc.ParsedQuery, names) - for name := range names { - nt.testName(name) - } - } - discarded := 0 - for _, job := range jobs { - if uint(len(*job)) >= minSamples { - host := (*job)[0].Hostname - jobId := (*job)[0].Job - user := (*job)[0].User - first := (*job)[0].Timestamp - last := 
(*job)[len(*job)-1].Timestamp - duration := last - first - aggregate := jc.aggregateJob(cfg, host, *job, nt.needCmd, nt.needHosts, jc.Zombie) - aggregate.computed[kDuration] = float64(duration) - usesGpu := !aggregate.Gpus.IsEmpty() - flags := 0 - if usesGpu { - flags |= kUsesGpu - } else { - flags |= kDoesNotUseGpu - } - if aggregate.GpuFail != 0 { - flags |= kGpuFail - } - bound, haveBound := bounds[host] - if !haveBound { - panic("Expected to find bound") - } - if first == bound.Earliest { - flags |= kIsLiveAtStart - } else { - flags |= kIsNotLiveAtStart - } - if last == bound.Latest { - flags |= kIsLiveAtEnd - } else { - flags |= kIsNotLiveAtEnd - } - if aggregate.IsZombie { - flags |= kIsZombie - } - jobAndMark := "" - if nt.needJobAndMark { - mark := "" - switch { - case flags&(kIsLiveAtStart|kIsLiveAtEnd) == (kIsLiveAtStart | kIsLiveAtEnd): - mark = "!" - case flags&kIsLiveAtStart != 0: - mark = "<" - case flags&kIsLiveAtEnd != 0: - mark = ">" - } - jobAndMark = fmt.Sprint(jobId, mark) - } - classification := 0 - if (flags & kIsLiveAtStart) != 0 { - classification |= sonalyze.LIVE_AT_START - } - if (flags & kIsLiveAtEnd) != 0 { - classification |= sonalyze.LIVE_AT_END - } - summary := &jobSummary{ - jobAggregate: aggregate, - JobId: jobId, - JobAndMark: jobAndMark, - User: user, - CpuTime: DurationValue(math.Round(aggregate.computed[kCpuPctAvg] * float64(duration) / 100)), - GpuTime: DurationValue(math.Round(aggregate.computed[kGpuPctAvg] * float64(kDuration) / 100)), - Duration: DurationValue(duration), - Now: DateTimeValue(now), - Start: DateTimeValue(first), - End: DateTimeValue(last), - selected: true, - Classification: classification, - job: *job, - computedFlags: flags, - } - if summaryFilter == nil || summaryFilter.apply(summary) { - summaries = append(summaries, summary) - } - } else { - discarded++ +func (jc *JobsCommand) findSlurmJobs( + theDb JobsDataProvider, + filter sample.QueryFilter, +) ( + []*slurmjob.SlurmJob, + error, +) { + slurmFilter := 
jc.buildSlurmFilter() + if slurmFilter == nil { + slurmFilter = &slurmjob.QueryFilter{} + } + + slurmJobs, err := slurmjob.Query( + theDb, + filter.FromDate, + filter.ToDate, + *slurmFilter, + jc.Verbose, + ) + if err != nil { + if jc.Verbose { + Log.Warningf("Slurm data query failed: %v", err) } + return nil, err } if jc.Verbose { - Log.Infof("Jobs discarded by aggregation filtering: %d", discarded) - } - - if nt.needSacctInfo { - // TODO: If we have slurm data then those data may have precise measurements for some of the - // fields here and we might use them instead. If so, do so here and not in printing, to - // avoid messiness vis-a-vis filtering. - - if slurmDb, ok := theDb.(db.SacctDataProvider); ok { - - var err error - - // Two things happen here: - // - // - attach slurm info to summaries we have - // - reduce the set of summaries we have by filtering on slurm information for those - // summaries that do have slurm information - // - // Importantly, the first step cannot incorporate the second step, because it is valid - // for a job in the first set to not have a slurm aspect. 
- // - // So: - // - // - compute a set A of SlurmJobs from the job IDs alone - // - then another smaller set B of SlurmJobs from A with the other filters - // - then A \ B is the set of jobs to remove from the list of summaries - // - and B is the set of jobs contributing info for the remaining jobs - - jobIds := make([]uint32, 0) - for _, summary := range summaries { - if summary.JobId != 0 { - jobIds = append(jobIds, summary.JobId) - } - } - - var ( - aJobs, bJobs []*slurmjob.SlurmJob - bMap map[uint32]*slurmjob.SlurmJob - ) - aJobs, err = slurmjob.Query( - slurmDb, - jc.FromDate, - jc.ToDate, - slurmjob.QueryFilter{ - Job: jobIds, - }, - jc.Verbose, - ) - if err != nil { - if jc.Verbose { - Log.Warningf("Slurm data query failed: %v", err) - } - // Oh well - return summaries - } - - if slurmFilter != nil { - var err error - bJobs, err = slurmjob.FilterJobs( - aJobs, - *slurmFilter, - jc.Verbose, - ) - if err != nil { - if jc.Verbose { - Log.Warningf("Slurm data filter failed (bizarrely): %v", err) - } - bJobs = aJobs - // Ignore it, fall through to attach job info - } else { - bMap = make(map[uint32]*slurmjob.SlurmJob) - for _, j := range bJobs { - bMap[j.Id] = j - } - cullSet := make(map[uint32]bool) - for _, a := range aJobs { - if bMap[a.Id] == nil { - cullSet[a.Id] = true - } - } - summaries = slices.DeleteFunc(summaries, func(s *jobSummary) bool { - return cullSet[s.JobId] - }) - } - } else { - bJobs = aJobs - } - - if bMap == nil { - bMap = make(map[uint32]*slurmjob.SlurmJob) - for _, j := range bJobs { - bMap[j.Id] = j - } - } - - for _, summary := range summaries { - if probe, found := bMap[summary.JobId]; found { - summary.sacctInfo = probe.Main // Hm - } - } - } else { - if jc.Verbose { - Log.Warningf("Needed slurm data but can't read those from transient cluster") - } - } + Log.Infof("Slurm jobs after aggregation filtering: %d", len(slurmJobs)) } - return summaries + return slurmJobs, nil } -// Look to the config to find nodes that have CrossNodeJobs 
set, and merge their streams as if by -// --merge-all; the remaining streams are merged as if by --merge-none, and the two sets of merged -// jobs are combined into one set. +var ( + pending = StringToUstr("PENDING") + running = StringToUstr("RUNNING") +) -func mergeAcrossSomeNodes( - cfg *config.ClusterConfig, - streams sample.InputStreamSet, - bounds Timebounds, -) (sample.SampleStreams, Timebounds) { - mergeable := make(sample.InputStreamSet) - mBounds := make(Timebounds) - solo := make(sample.InputStreamSet) - sBounds := make(Timebounds) - for k, v := range streams { - bound := bounds[k.Host] - if sys := cfg.LookupHost(k.Host.String()); sys != nil && sys.CrossNodeJobs { - mBounds[k.Host] = bound - mergeable[k] = v - } else { - sBounds[k.Host] = bound - solo[k] = v +// Synthesize a SampleJob from the SlurmJob to hold common data. +func (jc *JobsCommand) synthesizeSampleJob(j *slurmjob.SlurmJob) *samplejob.SampleJob { + var gpus gpuset.GpuSet + // Compute gpus from ReqGPUS. This is not so easy b/c it does not necessarily have indices, + // just device counts. We fake it. We use only the first element in the list because the + // meaning of the list is that it is ordered from highest to lowest precedence. Not obvious how + // to incorporate the other steps here. But since we take the value from the request and not + // from any kind of usage data, it's probably OK to just use Main. + if j.Main.ReqGPUS != UstrEmpty { + var v uint32 + a, _, _ := strings.Cut(j.Main.ReqGPUS.String(), ",") + _, x, _ := strings.Cut(a, "=") + n, err := strconv.ParseUint(x, 10, 64) + if err != nil { + n = 1 + } + for i := uint64(0); i < n; i++ { + gpus, _ = gpuset.Adjoin(gpus, v) + v++ } } - mergedJobs, mergedBounds := sample.MergeByJob(mergeable, mBounds) - otherJobs := sample.MergeByHostAndJob(solo) - mergedJobs = append(mergedJobs, otherJobs...) 
- for k, v := range sBounds { - mergedBounds[k] = v + var hosts *Hostnames = NewHostnames() + err := hosts.AddCompressed(j.Main.NodeList.String()) + if err != nil { + Log.Warningf("Bad node list from slurm data: %s", j.Main.NodeList.String()) } - return mergedJobs, mergedBounds -} - -// Given a list of log entries for a job, sorted ascending by timestamp and with no duplicated -// timestamps, return a JobAggregate for the job, with values that are computed from all log -// entries. - -func (jc *JobsCommand) aggregateJob( - cfg *config.ClusterConfig, - host Ustr, - job sample.SampleStream, - needCmd, needHosts, needZombie bool, -) jobAggregate { - gpus := gpuset.EmptyGpuSet() - var ( - gpuFail uint8 - cpuPctAvg, cpuPctPeak float64 - rCpuPctAvg, rCpuPctPeak float64 - cpuGBAvg, cpuGBPeak float64 - rCpuGBAvg, rCpuGBPeak float64 - gpuPctAvg, gpuPctPeak float64 - rGpuPctAvg, rGpuPctPeak float64 - sGpuPctAvg, sGpuPctPeak float64 - rssAnonGBAvg, rssAnonGBPeak float64 - rRssAnonGBAvg, rRssAnonGBPeak float64 - gpuGBAvg, gpuGBPeak float64 - rGpuGBAvg, rGpuGBPeak float64 - sGpuGBAvg, sGpuGBPeak float64 - isZombie bool - ) - const kb2gb = 1.0 / (1024 * 1024) - - for _, s := range job { - gpus = gpuset.UnionGpuSets(gpus, s.Gpus) - gpuFail = sample.MergeGpuFail(gpuFail, s.GpuFail) - cpuPctAvg += float64(s.CpuUtilPct) - cpuPctPeak = math.Max(cpuPctPeak, float64(s.CpuUtilPct)) - gpuPctAvg += float64(s.GpuPct) - gpuPctPeak = math.Max(gpuPctPeak, float64(s.GpuPct)) - cpuGBAvg += float64(s.CpuKB) * kb2gb - cpuGBPeak = math.Max(cpuGBPeak, float64(s.CpuKB)*kb2gb) - rssAnonGBAvg += float64(s.RssAnonKB) * kb2gb - rssAnonGBPeak = math.Max(rssAnonGBPeak, float64(s.RssAnonKB)*kb2gb) - gpuGBAvg += float64(s.GpuKB) * kb2gb - gpuGBPeak = math.Max(gpuGBPeak, float64(s.GpuKB)*kb2gb) - - if needZombie && !isZombie { - cmd := s.Cmd.String() - isZombie = strings.Contains(cmd, "") || strings.HasPrefix(cmd, "_zombie_") - } + var classification int + if j.Main.State == pending || j.Main.State == 
running { + classification |= sonalyze.LIVE_AT_END + } + var flags int + if (classification & sonalyze.LIVE_AT_END) != 0 { + flags |= samplejob.KIsLiveAtEnd + } else { + flags |= samplejob.KIsNotLiveAtEnd } - usesGpu := !gpus.IsEmpty() + if !gpus.IsEmpty() { + flags |= samplejob.KUsesGpu + } else { + flags |= samplejob.KDoesNotUseGpu + } + // Set SampleCount to 1000 b/c we want the synthesized job to pass any plausible min-samples + // filter. Note this value will be used as a divisor for various averages in + // computeComputedFields. + sampleCount := 1000 + adjustedSampleCount := float64(sampleCount) / float64(len(j.Steps)+1) + + cpuPctSum := float64(j.Main.AveCPU) + cpuPctMax := float64(j.Main.AveCPU) + cpuKBSum := uint64(j.Main.AveVMSize) + rssAnonKBSum := uint64(j.Main.AveRSS) + cpuKBMax := uint64(j.Main.MaxVMSize) + rssAnonKBMax := uint64(j.Main.MaxRSS) + for _, s := range j.Steps { + cpuPctSum += float64(s.AveCPU) + cpuPctMax = max(cpuPctMax, float64(s.AveCPU)) + cpuKBSum += uint64(s.AveVMSize) + rssAnonKBSum += uint64(s.AveRSS) + cpuKBMax = max(cpuKBMax, uint64(s.MaxVMSize)) + rssAnonKBMax = max(rssAnonKBMax, uint64(s.MaxRSS)) + } + + // Scale GB -> KB + cpuKBSum *= 1024 * 1024 + rssAnonKBSum *= 1024 * 1024 + cpuKBMax *= 1024 * 1024 + rssAnonKBMax *= 1024 * 1024 + + // Adjust according to sampleCount so that later averaging will work out + cpuPctSum *= adjustedSampleCount + cpuKBSum = uint64(float64(cpuKBSum) * adjustedSampleCount) + rssAnonKBSum = uint64(float64(rssAnonKBSum) * adjustedSampleCount) + + var sampleJob = &samplejob.SampleJob{ + // `GpuFail` is not computable + Gpus: gpus, + // `IsZombie` is not applicable + Cmd: j.Main.JobName.String(), + Hosts: hosts, + JobId: j.Id, + User: j.Main.User, + Duration: DurationValue(j.Main.ElapsedRaw), + Start: DateTimeValue(j.Main.Start), + End: DateTimeValue(j.Main.End), + // `Job` is not applicable. 
+ SampleCount: uint64(sampleCount), + Classification: classification, + ComputedFlags: flags, + CpuPctSum: cpuPctSum, + CpuKBSum: cpuKBSum, + RssAnonKBSum: rssAnonKBSum, + // `GpuPctSum` is not applicable + // `GpuKBSum` is not applicable + CpuTime: DurationValue(j.Main.SystemCPU + j.Main.UserCPU), + // `GpuTime` is not applicable + CpuPctMax: cpuPctMax, + CpuKBMax: cpuKBMax, + RssAnonKBMax: rssAnonKBMax, + // `GpuPctMax` is not applicable + // `GpuKBMax` is not applicable + } + return sampleJob +} +// The computed fields are always relative to js.sampleJob but the available resources may be +// computed from the slurmJob as well (notably the allocation). + +func (jc *JobsCommand) computeComputedFields(js *jobSummary, cfg *config.ClusterConfig) { + j := js.sampleJob + js.computed[kDuration] = float64(j.Duration) + js.computed[kSampleCount] = float64(j.SampleCount) + + // What things mean: + // + // "Average cpu utilization" is the sum across all samples of cpu utilization (which can be + // >100% for individual samples in merged jobs, jobs with multiple threads, etc), divided by the + // number of samples. + // + // "Average relative cpu utilization" further divides that by the number of cores allocated (ie, + // "relative" is always "relative to allocated resources" or "a fraction of allocated resources"). + // + // If there's a peak here it would be the "peak cpu utilization" across the time series, ie, the + // peak observed value of the job's cpu utilization. Again this is a sum across streams and can + // easily be >100%. + // + // The "relative peak" divides the peak by the number of cores allocated. 
+ + var rCpuPctAvg, rCpuPctPeak float64 + var rCpuGBAvg, rCpuGBPeak float64 + var rRssAnonGBAvg, rRssAnonGBPeak float64 + var rGpuPctAvg, rGpuPctPeak float64 + var rGpuGBAvg, rGpuGBPeak float64 + var sGpuPctAvg, sGpuPctPeak float64 + var sGpuGBAvg, sGpuGBPeak float64 + // Division by number of samples happens below, as necessary if cfg != nil { - if sys := cfg.LookupHost(host.String()); sys != nil { + if sys := jc.allocatedResources(js, cfg); sys != nil { // Quantities can be zero in surprising ways, so always guard divisions - if cores := float64(sys.CpuCores); cores > 0 { - rCpuPctAvg = cpuPctAvg / cores - rCpuPctPeak = cpuPctPeak / cores + if cores := float64(sys.cpuCores); cores > 0 { + rCpuPctAvg = j.CpuPctSum / cores + rCpuPctPeak = j.CpuPctMax / cores } - if memory := float64(sys.MemGB); memory > 0 { - rCpuGBAvg = (cpuGBAvg * 100) / memory - rCpuGBPeak = (cpuGBPeak * 100) / memory - rRssAnonGBAvg = (rssAnonGBAvg * 100) / memory - rRssAnonGBPeak = (rssAnonGBPeak * 100) / memory + if memory := float64(sys.memGB); memory > 0 { + rCpuGBAvg = float64(j.CpuKBSum*100) / memory / (1024 * 1024) + rCpuGBPeak = float64(j.CpuKBMax*100) / memory / (1024 * 1024) + rRssAnonGBAvg = float64(j.RssAnonKBSum*100) / memory / (1024 * 1024) + rRssAnonGBPeak = float64(j.RssAnonKBMax*100) / memory / (1024 * 1024) } - if gpuCards := float64(sys.GpuCards); gpuCards > 0 { - rGpuPctAvg = gpuPctAvg / gpuCards - rGpuPctPeak = gpuPctPeak / gpuCards + if gpuCards := float64(sys.gpuCards); gpuCards > 0 { + rGpuPctAvg = j.GpuPctSum / gpuCards + rGpuPctPeak = j.GpuPctMax / gpuCards } - if gpuMemory := float64(sys.GpuMemGB); gpuMemory > 0 { - // As we have a config, logclean will have computed proper GPU memory values for the - // job, so we need not look to sys.GpuMemPct here. 
- rGpuGBAvg = (gpuGBAvg * 100) / gpuMemory - rGpuGBPeak = (gpuGBPeak * 100) / gpuMemory + if gpuMemory := float64(sys.gpuMemGB); gpuMemory > 0 { + rGpuGBAvg = float64(j.GpuKBSum*100) / gpuMemory / (1024 * 1024) + rGpuGBPeak = float64(j.GpuKBMax*100) / gpuMemory / (1024 * 1024) } - if usesGpu && !gpus.IsUnknown() { - nCards := float64(gpus.Size()) - sGpuPctAvg = gpuPctAvg / nCards - sGpuPctPeak = gpuPctPeak / nCards - if gpuCards := float64(sys.GpuCards); gpuCards > 0 { - if gpuMemory := float64(sys.GpuMemGB); gpuMemory > 0 { + if !js.sampleJob.Gpus.IsUnknown() && !js.sampleJob.Gpus.IsEmpty() { + nCards := float64(js.sampleJob.Gpus.Size()) + sGpuPctAvg = j.GpuPctSum / nCards + sGpuPctPeak = j.GpuPctMax / nCards + if gpuCards := float64(sys.gpuCards); gpuCards > 0 { + if gpuMemory := float64(sys.gpuMemGB); gpuMemory > 0 { jobGpuGB := nCards * (gpuMemory / gpuCards) - sGpuGBAvg = (gpuGBAvg * 100) / jobGpuGB - sGpuGBPeak = (gpuGBPeak * 100) / jobGpuGB + sGpuGBAvg = float64(j.GpuKBSum*100) / jobGpuGB / (1024 * 1024) + sGpuGBPeak = float64(j.GpuKBMax*100) / jobGpuGB / (1024 * 1024) } } } } } - cmd := "" - if needCmd { - names := make(map[Ustr]bool) - for _, sample := range job { - if _, found := names[sample.Cmd]; found { - continue - } - if cmd != "" { - cmd += ", " - } - cmd += sample.Cmd.String() - names[sample.Cmd] = true - } - } + n := float64(j.SampleCount) + js.computed[kCpuPctAvg] = j.CpuPctSum / n + js.computed[kCpuPctPeak] = j.CpuPctMax + js.computed[kRcpuPctAvg] = rCpuPctAvg / n + js.computed[kRcpuPctPeak] = rCpuPctPeak + + js.computed[kCpuGBAvg] = float64(j.CpuKBSum) / n / (1024 * 1024) + js.computed[kCpuGBPeak] = float64(j.CpuKBMax) / (1024 * 1024) + js.computed[kRcpuGBAvg] = rCpuGBAvg / n + js.computed[kRcpuGBPeak] = rCpuGBPeak + + js.computed[kRssAnonGBAvg] = float64(j.RssAnonKBSum) / n / (1024 * 1024) + js.computed[kRssAnonGBPeak] = float64(j.RssAnonKBMax) / (1024 * 1024) + js.computed[kRrssAnonGBAvg] = rRssAnonGBAvg / n + js.computed[kRrssAnonGBPeak] = 
rRssAnonGBPeak + + js.computed[kGpuPctAvg] = j.GpuPctSum / n + js.computed[kGpuPctPeak] = j.GpuPctMax + js.computed[kRgpuPctAvg] = rGpuPctAvg / n + js.computed[kRgpuPctPeak] = rGpuPctPeak + js.computed[kSgpuPctAvg] = sGpuPctAvg / n + js.computed[kSgpuPctPeak] = sGpuPctPeak + + js.computed[kGpuGBAvg] = float64(j.GpuKBSum) / n / (1024 * 1024) + js.computed[kGpuGBPeak] = float64(j.GpuKBMax) / (1024 * 1024) + js.computed[kRgpuGBAvg] = rGpuGBAvg / n + js.computed[kRgpuGBPeak] = rGpuGBPeak + js.computed[kSgpuGBAvg] = sGpuGBAvg / n + js.computed[kSgpuGBPeak] = sGpuGBPeak +} - var hosts *Hostnames - if needHosts { - hosts = NewHostnames() - for _, s := range job { - hosts.Add(s.Hostname.String()) +// If there is a single host then we just get the config's data. For multiple hosts we sum those +// data. We don't cache anything now b/c the underlying config code has a hashmap already. +// +// TODO: In the future, for jobs with a slurm aspect, we'll instead return the allocated resources, +// when available, falling back on config values when necessary. In the future after that, we're +// not going to have a config in the static sense, but a sysinfo blob that was valid at the time of +// the sample. That can be lazily computed and for slurm jobs it may not need to be computed at +// all. 
+ +func (jc *JobsCommand) allocatedResources(js *jobSummary, cfg *config.ClusterConfig) *hostResources { + sum := new(hostResources) + for name := range js.sampleJob.Hosts.FullNames { + if sys := cfg.LookupHost(name); sys != nil { + sum.cpuCores += sys.CpuCores + sum.memGB += sys.MemGB + sum.gpuCards += sys.GpuCards + sum.gpuMemGB += sys.GpuMemGB } } - n := float64(len(job)) - a := jobAggregate{ - Gpus: gpus, - GpuFail: int(gpuFail), - Cmd: cmd, - Hosts: hosts, - IsZombie: isZombie, - } - a.computed[kCpuPctAvg] = cpuPctAvg / n - a.computed[kCpuPctPeak] = cpuPctPeak - a.computed[kRcpuPctAvg] = rCpuPctAvg / n - a.computed[kRcpuPctPeak] = rCpuPctPeak - - a.computed[kCpuGBAvg] = cpuGBAvg / n - a.computed[kCpuGBPeak] = cpuGBPeak - a.computed[kRcpuGBAvg] = rCpuGBAvg / n - a.computed[kRcpuGBPeak] = rCpuGBPeak - - a.computed[kRssAnonGBAvg] = rssAnonGBAvg / n - a.computed[kRssAnonGBPeak] = rssAnonGBPeak - a.computed[kRrssAnonGBAvg] = rRssAnonGBAvg / n - a.computed[kRrssAnonGBPeak] = rRssAnonGBPeak - - a.computed[kGpuPctAvg] = gpuPctAvg / n - a.computed[kGpuPctPeak] = gpuPctPeak - a.computed[kRgpuPctAvg] = rGpuPctAvg / n - a.computed[kRgpuPctPeak] = rGpuPctPeak - a.computed[kSgpuPctAvg] = sGpuPctAvg / n - a.computed[kSgpuPctPeak] = sGpuPctPeak - - a.computed[kGpuGBAvg] = gpuGBAvg / n - a.computed[kGpuGBPeak] = gpuGBPeak - a.computed[kRgpuGBAvg] = rGpuGBAvg / n - a.computed[kRgpuGBPeak] = rGpuGBPeak - a.computed[kSgpuGBAvg] = sGpuGBAvg / n - a.computed[kSgpuGBPeak] = sGpuGBPeak - - return a + return sum } -// Aggregation filters. -// // Filtering is mostly wasted work. 
Very frequently, all the filters will pass because the coarse // filtering (job number, user, command, host) has been applied already and most of the filters // applied to the aggregate are not very interesting to many users and will not be used to reject @@ -623,13 +542,13 @@ type filterVal struct { ix int } -type aggregationFilter struct { +type sampleFilter struct { minFilters []filterVal maxFilters []filterVal flags int } -func (f *aggregationFilter) apply(s *jobSummary) bool { +func (f *sampleFilter) apply(s *jobSummary) bool { for _, v := range f.minFilters { if s.computed[v.ix] < v.limit { return false @@ -640,17 +559,15 @@ func (f *aggregationFilter) apply(s *jobSummary) bool { return false } } - return (f.flags & s.computedFlags) == f.flags + return (f.flags & s.sampleJob.ComputedFlags) == f.flags } -func (jc *JobsCommand) buildFilters( - cfg *config.ClusterConfig, -) (*aggregationFilter, *slurmjob.QueryFilter) { +func (jc *JobsCommand) buildSampleFilter(allowRelative bool) *sampleFilter { minFilters := make([]filterVal, 0) maxFilters := make([]filterVal, 0) for _, v := range uintArgs { - if v.aggregateIx != -1 && (cfg != nil || !v.relative) { + if v.aggregateIx != -1 && (allowRelative || !v.relative) { val := jc.lookupUint(v.name) if strings.HasPrefix(v.name, "min-") && val != 0 { if jc.Verbose { @@ -678,44 +595,44 @@ func (jc *JobsCommand) buildFilters( // computed flags. 
flags := 0 if jc.NoGpu { - flags |= kDoesNotUseGpu + flags |= samplejob.KDoesNotUseGpu } if jc.SomeGpu { - flags |= kUsesGpu + flags |= samplejob.KUsesGpu } if jc.Completed { - flags |= kIsNotLiveAtEnd + flags |= samplejob.KIsNotLiveAtEnd } if jc.Running { - flags |= kIsLiveAtEnd + flags |= samplejob.KIsLiveAtEnd } if jc.Zombie { - flags |= kIsZombie + flags |= samplejob.KIsZombie } if jc.Verbose && flags != 0 { Log.Infof("Flag-filtering (UTSL): %x", flags) } - var summaryFilter *aggregationFilter - var slurmFilter *slurmjob.QueryFilter - - if len(minFilters) > 0 || len(maxFilters) > 0 || flags != 0 { - summaryFilter = &aggregationFilter{ - minFilters, - maxFilters, - flags, - } + if len(minFilters) == 0 && len(maxFilters) == 0 && flags == 0 { + return nil } - if len(jc.Partition)+len(jc.Reservation)+len(jc.Account)+len(jc.State)+len(jc.GpuType) > 0 { - slurmFilter = &slurmjob.QueryFilter{ - Account: jc.Account, - Partition: jc.Partition, - Reservation: jc.Reservation, - GpuType: jc.GpuType, - State: jc.State, - } + return &sampleFilter{ + minFilters, + maxFilters, + flags, } +} - return summaryFilter, slurmFilter +func (jc *JobsCommand) buildSlurmFilter() *slurmjob.QueryFilter { + if len(jc.Partition)+len(jc.Reservation)+len(jc.Account)+len(jc.State)+len(jc.GpuType) == 0 { + return nil + } + return &slurmjob.QueryFilter{ + Account: jc.Account, + Partition: jc.Partition, + Reservation: jc.Reservation, + GpuType: jc.GpuType, + State: jc.State, + } } diff --git a/code/sonalyze/cmd/jobs/print.go b/code/sonalyze/cmd/jobs/print.go index 71673061..3215cf63 100644 --- a/code/sonalyze/cmd/jobs/print.go +++ b/code/sonalyze/cmd/jobs/print.go @@ -15,16 +15,18 @@ import ( package jobs +import "sonalyze/data/samplejob" + %% FIELDS *jobSummary JobAndMark string desc:"Job ID with mark indicating job running at start+end (!), start (<), or end (>) of time window" alias:"jobm" - Job uint32 desc:"Job ID" alias:"job" field:"JobId" - User Ustr desc:"Name of user running the job" 
alias:"user" - Duration DurationValue desc:"Time of last observation minus time of first" alias:"duration" - Start DateTimeValue desc:"Time of first observation" alias:"start" - End DateTimeValue desc:"Time of last observation" alias:"end" + Job uint32 desc:"Job ID" alias:"job" field:"JobId" indirect:"sampleJob" + User Ustr desc:"Name of user running the job" alias:"user" indirect:"sampleJob" + Duration DurationValue desc:"Time of last observation minus time of first" alias:"duration" indirect:"sampleJob" + Start DateTimeValue desc:"Time of first observation" alias:"start" indirect:"sampleJob" + End DateTimeValue desc:"Time of last observation" alias:"end" indirect:"sampleJob" CpuAvgPct F64Ceil desc:"Average CPU utilization in percent (100% = 1 core)" field:"computed[kCpuPctAvg]" alias:"cpu-avg" CpuPeakPct F64Ceil desc:"Peak CPU utilization in percent (100% = 1 core)" field:"computed[kCpuPctPeak]" alias:"cpu-peak" RelativeCpuAvgPct F64Ceil desc:"Average relative CPU utilization in percent (100% = all cores)" field:"computed[kRcpuPctAvg]" alias:"rcpu-avg" @@ -67,52 +69,52 @@ FIELDS *jobSummary OccupiedRelativeGpuMemPeakPct \ F64Ceil desc:"Peak relative GPU resident memory utilization in percent (100% = all GPU RAM on cards used by job)" \ field:"computed[kSgpuGBPeak]" alias:"sgpumem-peak" - Gpus gpuset.GpuSet desc:"GPU device numbers used by the job, 'none' if none or 'unknown' in error states" alias:"gpus" - GpuFail int desc:"Flag indicating GPU status (0=Ok, 1=Failing)" alias:"gpufail" - Cmd string desc:"The commands invoking the processes of the job" alias:"cmd" - Hosts *Hostnames desc:"List of the host name(s) running the job" alias:"host,hosts" + Gpus gpuset.GpuSet desc:"GPU device numbers used by the job, 'none' if none or 'unknown' in error states" alias:"gpus" indirect:"sampleJob" + GpuFail int desc:"Flag indicating GPU status (0=Ok, 1=Failing)" alias:"gpufail" indirect:"sampleJob" + Cmd string desc:"The commands invoking the processes of the job" 
alias:"cmd" indirect:"sampleJob" + Hosts *Hostnames desc:"List of the host name(s) running the job" alias:"host,hosts" indirect:"sampleJob" Now DateTimeValue desc:"The current time" alias:"now" - Classification int desc:"Bit vector of live-at-start (2) and live-at-end (1) flags" alias:"classification" - CpuTime DurationValue desc:"Total CPU time of the job across all cores" alias:"cputime" - GpuTime DurationValue desc:"Total GPU time of the job across all cards" alias:"gputime" + Classification int desc:"Bit vector of live-at-start (2) and live-at-end (1) flags" alias:"classification" indirect:"sampleJob" + CpuTime DurationValue desc:"Total CPU time of the job across all cores" alias:"cputime" indirect:"sampleJob" + GpuTime DurationValue desc:"Total GPU time of the job across all cards" alias:"gputime" indirect:"sampleJob" # The expressions extracting bit flags happen to work for well-understood reasons, but this is # brittle and works in Go only because the operator precedence is right (in C it would not work). # See TODO in generate-table/README.md. 
SomeGpu bool desc:"True iff process was seen to use some GPU" \ - field:"computedFlags & kUsesGpu != 0" + field:"ComputedFlags & samplejob.KUsesGpu != 0" indirect:"sampleJob" NoGpu bool desc:"True iff process was seen to use no GPU" \ - field:"computedFlags & kDoesNotUseGpu != 0" + field:"ComputedFlags & samplejob.KDoesNotUseGpu != 0" indirect:"sampleJob" Running bool desc:"True iff process appears to still be running at end of time window" \ - field:"computedFlags & kIsLiveAtEnd != 0" + field:"ComputedFlags & samplejob.KIsLiveAtEnd != 0" indirect:"sampleJob" Completed bool desc:"True iff process appears not to be running at end of time window" \ - field:"computedFlags & kIsNotLiveAtEnd != 0" + field:"ComputedFlags & samplejob.KIsNotLiveAtEnd != 0" indirect:"sampleJob" Zombie bool desc:"True iff the process looks like a zombie" \ - field:"computedFlags & kIsZombie != 0" + field:"ComputedFlags & samplejob.KIsZombie != 0" indirect:"sampleJob" Primordial bool desc:"True iff the process appears to have been alive at the start of the time window" \ - field:"computedFlags & kIsLiveAtStart != 0" + field:"ComputedFlags & samplejob.KIsLiveAtStart != 0" indirect:"sampleJob" BornLater bool desc:"True iff the process appears not to have been alive at the start of the time window" \ - field:"computedFlags & kIsNotLiveAtStart != 0" + field:"ComputedFlags & samplejob.KIsNotLiveAtStart != 0" indirect:"sampleJob" - # NOTE! The slurm fields (via *sacctInfo) are checked for in perform.go. We can add more slurm + # NOTE! The slurm fields (via *slurmJob) are checked for in perform.go. We can add more slurm # fields here but if so they must also be added there. 
- Submit DateTimeValue desc:"Submit time of job (Slurm)" indirect:"sacctInfo" - JobName Ustr desc:"Name of job (Slurm)" indirect:"sacctInfo" - State Ustr desc:"Completion state of job (Slurm)" indirect:"sacctInfo" - Account Ustr desc:"Name of job's account (Slurm)" indirect:"sacctInfo" - Layout Ustr desc:"Layout spec of job (Slurm)" indirect:"sacctInfo" - Reservation Ustr desc:"Name of job's reservation (Slurm)" indirect:"sacctInfo" - Partition Ustr desc:"Partition of job (Slurm)" indirect:"sacctInfo" - RequestedGpus Ustr desc:"Names of requested GPUs (Slurm AllocTRES)" indirect:"sacctInfo" field:"ReqGPUS" - DiskReadAvgGB uint32 desc:"Average disk read activity in GB/s (Slurm AveDiskRead)" indirect:"sacctInfo" field:"AveDiskRead" - DiskWriteAvgGB uint32 desc:"Average disk write activity in GB/s (Slurm AveDiskWrite)" indirect:"sacctInfo" field:"AveDiskWrite" - RequestedCpus uint32 desc:"Number of requested CPUs (Slurm)" indirect:"sacctInfo" field:"ReqCPUS" - RequestedMemGB uint32 desc:"Requested memory (Slurm)" indirect:"sacctInfo" field:"ReqMem" - RequestedNodes uint32 desc:"Number of requested nodes (Slurm)" indirect:"sacctInfo" field:"ReqNodes" - TimeLimit U32Duration desc:"Elapsed time limit (Slurm)" indirect:"sacctInfo" field:"TimelimitRaw" - ExitCode uint8 desc:"Exit code of job (Slurm)" indirect:"sacctInfo" + Submit DateTimeValue desc:"Submit time of job (Slurm)" indirect:"slurmJob.Main" + JobName Ustr desc:"Name of job (Slurm)" indirect:"slurmJob.Main" + State Ustr desc:"Completion state of job (Slurm)" indirect:"slurmJob.Main" + Account Ustr desc:"Name of job's account (Slurm)" indirect:"slurmJob.Main" + Layout Ustr desc:"Layout spec of job (Slurm)" indirect:"slurmJob.Main" + Reservation Ustr desc:"Name of job's reservation (Slurm)" indirect:"slurmJob.Main" + Partition Ustr desc:"Partition of job (Slurm)" indirect:"slurmJob.Main" + RequestedGpus Ustr desc:"Names of requested GPUs (Slurm AllocTRES)" indirect:"slurmJob.Main" field:"ReqGPUS" + DiskReadAvgGB 
uint32 desc:"Average disk read activity in GB/s (Slurm AveDiskRead)" indirect:"slurmJob.Main" field:"AveDiskRead" + DiskWriteAvgGB uint32 desc:"Average disk write activity in GB/s (Slurm AveDiskWrite)" indirect:"slurmJob.Main" field:"AveDiskWrite" + RequestedCpus uint32 desc:"Number of requested CPUs (Slurm)" indirect:"slurmJob.Main" field:"ReqCPUS" + RequestedMemGB uint32 desc:"Requested memory (Slurm)" indirect:"slurmJob.Main" field:"ReqMem" + RequestedNodes uint32 desc:"Number of requested nodes (Slurm)" indirect:"slurmJob.Main" field:"ReqNodes" + TimeLimit U32Duration desc:"Elapsed time limit (Slurm)" indirect:"slurmJob.Main" field:"TimelimitRaw" + ExitCode uint8 desc:"Exit code of job (Slurm)" indirect:"slurmJob.Main" SUMMARY JobsCommand @@ -201,9 +203,9 @@ func (jc *JobsCommand) printJobSummaries(out io.Writer, summaries []*jobSummary) // Sort ascending by lowest beginning timestamp, and if those are equal, by job number. slices.SortStableFunc(summaries, func(a, b *jobSummary) int { - c := cmp.Compare(a.Start, b.Start) + c := cmp.Compare(a.sampleJob.Start, b.sampleJob.Start) if c == 0 { - c = cmp.Compare(a.JobId, b.JobId) + c = cmp.Compare(a.sampleJob.JobId, b.sampleJob.JobId) } return c }) @@ -217,7 +219,7 @@ func (jc *JobsCommand) printJobSummaries(out io.Writer, summaries []*jobSummary) } counts := make(map[Ustr]uint) for i := len(summaries) - 1; i >= 0; i-- { - u := summaries[i].job[0].User + u := summaries[i].sampleJob.User c := counts[u] + 1 counts[u] = c if c > jc.NumJobs { diff --git a/code/sonalyze/cmd/load/perform.go b/code/sonalyze/cmd/load/perform.go index 66d12ac2..0b532674 100644 --- a/code/sonalyze/cmd/load/perform.go +++ b/code/sonalyze/cmd/load/perform.go @@ -55,7 +55,7 @@ func (lc *LoadCommand) Perform( if NeedsConfig(loadFormatters, lc.PrintFields) { var err error - streams, err = EnsureConfigForInputStreams(cfg, streams, "relative format arguments") + err = EnsureConfigForInputStreams(cfg, streams, "relative format arguments") if err != 
nil { return err } diff --git a/code/sonalyze/data/samplejob/samplejob.go b/code/sonalyze/data/samplejob/samplejob.go new file mode 100644 index 00000000..79b757bc --- /dev/null +++ b/code/sonalyze/data/samplejob/samplejob.go @@ -0,0 +1,374 @@ +// A materialized table of jobs constructed from one or more sample streams. Input streams are +// partitioned into mergeable sets (based on flags and mergeability) and then each such set is +// aggregated into a coherent "job". +// +// TODO: Presently we construct it afresh every time, but once a job has completed the values will +// not change (for a given record filter), and the results could be stored in the database, esp if +// the sample stream is not needed by the client, as it is usually not. However, the exact set of +// input streams and the input flags both play a role in how merging is performed and will affect +// how we can cache anything. +package samplejob + +import ( + "fmt" + "math" + "strings" + "time" + + "go-utils/gpuset" + "go-utils/sonalyze" + . "sonalyze/common" + "sonalyze/data/sample" + "sonalyze/db" + . "sonalyze/table" +) + +// Computed flag bits in SampleJob.ComputedFlags +const ( + KUsesGpu = (1 << iota) // True if there's reason to believe a gpu was used by job + KDoesNotUseGpu // Opposite + KGpuFail // GPU failed + KIsLiveAtStart // Job had record at earliest timestamp of input set for host + KIsNotLiveAtStart // Opposite + KIsLiveAtEnd // Job had record at latest timestamp of input set for host + KIsNotLiveAtEnd // Opposite + KIsZombie // Command contains <defunct> or user starts with _zombie_ +) + +// Aggregate figures for a job synthesized from a set of sample streams.
+type SampleJob struct { + // Nonzero if any gpu in the job had a failure + GpuFail int + + // Gpus involved in the job + Gpus gpuset.GpuSet + + // True if any process in the job is believed to be a zombie + IsZombie bool + + // Combined command line of processes in the job + Cmd string + + // Combined host set of the job + Hosts *Hostnames + + // Job's primary ID + JobId uint32 + + // Job's epoch + Epoch uint64 + + // User running the job + User Ustr + + // Real time taken by the job + Duration DurationValue + + // Earliest time seen for the job + Start DateTimeValue + + // Latest time ditto + End DateTimeValue + + // The list of sample records, may be shared with the database layer, may be nil. + Job sample.SampleStream + + // The number of samples (even if Job is nil) + SampleCount uint64 + + // Bitwise 'or' of sonalyze.LIVE_AT_START and sonalyze.LIVE_AT_END + Classification int + + // Bitwise 'or' of flag values above + ComputedFlags int + + // Sum of CPU utilization figures across time steps, 1 core = 100.0 + CpuPctSum float64 + + // Sum of main memory in use across time steps + CpuKBSum uint64 + + // Sum of resident main memory across time steps + RssAnonKBSum uint64 + + // Sum of GPU utilization figures across time steps, 1 card = 100.0 + GpuPctSum float64 + + // Sum of GPU memory use across time steps + GpuKBSum uint64 + + // Aggregated CPU time for the job, based on utilization data + CpuTime DurationValue + + // Aggregated GPU compute time for the job, based on utilization data + GpuTime DurationValue + + // Maximum CPU utilization across time steps + CpuPctMax float64 + + // Maximum main memory in use across time steps + CpuKBMax uint64 + + // Maximum resident memory use across time steps + RssAnonKBMax uint64 + + // Maximum GPU utilization across time steps + GpuPctMax float64 + + // Maximum GPU memory use across time steps + GpuKBMax uint64 +} + +type Merge int + +const ( + MergeDefault = 0 + MergeAll = 1 + MergeNone = 2 +) + +// Query the materialized 
jobs with a filter. +// +// TODO: This interface is just too weird. But it works. +func Query( + theLog db.ProcessSampleDataProvider, + isMergeable func(sample.InputStreamKey) bool, + fromDate time.Time, + toDate time.Time, + hosts *Hosts, + recordFilter *sample.SampleFilter, + needRecords bool, + merge Merge, + verbose bool, +) ( + []*SampleJob, + error, +) { + streams, bounds, read, dropped, err := + sample.ReadSampleStreamsAndMaybeBounds( + theLog, + fromDate, + toDate, + hosts, + recordFilter, + true, + verbose, + ) + if err != nil { + return nil, fmt.Errorf("Failed to read log records: %v", err) + } + if verbose { + Log.Infof("%d records read + %d dropped\n", read, dropped) + } + if verbose { + Log.Infof("Streams constructed by postprocessing: %d", len(streams)) + numSamples := 0 + for _, stream := range streams { + numSamples += len(*stream) + } + Log.Infof("Samples retained after filtering: %d", numSamples) + } + + summaries := aggregateAndFilterJobs(isMergeable, streams, bounds, needRecords, merge, verbose) + if verbose { + Log.Infof("Jobs after aggregation filtering: %d", len(summaries)) + } + return summaries, nil +} + +func aggregateAndFilterJobs( + isMergeable func(sample.InputStreamKey) bool, + streams sample.InputStreamSet, + bounds Timebounds, + needRecords bool, + merge Merge, + verbose bool, +) []*SampleJob { + var anyMergeableNodes bool + if merge != MergeNone && isMergeable != nil { + for k := range streams { + anyMergeableNodes = isMergeable(k) + if anyMergeableNodes { + break + } + } + } + + var jobs sample.SampleStreams + if merge == MergeAll { + jobs, bounds = sample.MergeByJob(streams, bounds) + } else if anyMergeableNodes { + jobs, bounds = mergeAcrossSomeNodes(isMergeable, streams, bounds) + } else { + jobs = sample.MergeByHostAndJob(streams) + } + if verbose { + Log.Infof("Jobs constructed by merging: %d", len(jobs)) + } + + summaries := make([]*SampleJob, 0) + for _, job := range jobs { + host := (*job)[0].Hostname + jobId := 
(*job)[0].Job + epoch := (*job)[0].Epoch + user := (*job)[0].User + first := (*job)[0].Timestamp + last := (*job)[len(*job)-1].Timestamp + duration := last - first + aggregate := computeAggregate(host, *job) + usesGpu := !aggregate.Gpus.IsEmpty() + flags := 0 + if usesGpu { + flags |= KUsesGpu + } else { + flags |= KDoesNotUseGpu + } + if aggregate.GpuFail != 0 { + flags |= KGpuFail + } + bound, haveBound := bounds[host] + if !haveBound { + panic("Expected to find bound") + } + if first == bound.Earliest { + flags |= KIsLiveAtStart + } else { + flags |= KIsNotLiveAtStart + } + if last == bound.Latest { + flags |= KIsLiveAtEnd + } else { + flags |= KIsNotLiveAtEnd + } + if aggregate.IsZombie { + flags |= KIsZombie + } + classification := 0 + if (flags & KIsLiveAtStart) != 0 { + classification |= sonalyze.LIVE_AT_START + } + if (flags & KIsLiveAtEnd) != 0 { + classification |= sonalyze.LIVE_AT_END + } + aggregate.JobId = jobId + aggregate.Epoch = epoch + aggregate.User = user + aggregate.CpuTime = DurationValue(math.Round(aggregate.CpuPctSum * float64(duration) / 100)) + aggregate.GpuTime = DurationValue(math.Round(aggregate.GpuPctSum * float64(duration) / 100)) + aggregate.Duration = DurationValue(duration) + aggregate.Start = DateTimeValue(first) + aggregate.End = DateTimeValue(last) + aggregate.Classification = classification + if needRecords { + aggregate.Job = *job + } + aggregate.ComputedFlags = flags + summaries = append(summaries, aggregate) + } + return summaries +} + +// Given a list of samples, sorted ascending by timestamp and with no duplicated timestamps, return +// a partial SampleJob, with values that are aggregated from all the samples. 
+func computeAggregate( + host Ustr, + job sample.SampleStream, +) *SampleJob { + gpus := gpuset.EmptyGpuSet() + var ( + gpuFail uint8 + cpuPctSum, cpuPctMax float64 + cpuKBSum, cpuKBMax uint64 + gpuPctSum, gpuPctMax float64 + rssAnonKBSum, rssAnonKBMax uint64 + gpuKBSum, gpuKBMax uint64 + isZombie bool + ) + for _, s := range job { + gpus = gpuset.UnionGpuSets(gpus, s.Gpus) + gpuFail = sample.MergeGpuFail(gpuFail, s.GpuFail) + cpuPctSum += float64(s.CpuUtilPct) + cpuPctMax = max(cpuPctMax, float64(s.CpuUtilPct)) + gpuPctSum += float64(s.GpuPct) + gpuPctMax = max(gpuPctMax, float64(s.GpuPct)) + cpuKBSum += s.CpuKB + cpuKBMax = max(cpuKBMax, s.CpuKB) + rssAnonKBSum += s.RssAnonKB + rssAnonKBMax = max(rssAnonKBMax, s.RssAnonKB) + gpuKBSum += s.GpuKB + gpuKBMax = max(gpuKBMax, s.GpuKB) + + if !isZombie { + cmd := s.Cmd.String() + isZombie = strings.Contains(cmd, "") || strings.HasPrefix(cmd, "_zombie_") + } + } + + cmd := "" + names := make(map[Ustr]bool) + for _, sample := range job { + if _, found := names[sample.Cmd]; found { + continue + } + if cmd != "" { + cmd += ", " + } + cmd += sample.Cmd.String() + names[sample.Cmd] = true + } + + var hosts = NewHostnames() + for _, s := range job { + hosts.Add(s.Hostname.String()) + } + a := SampleJob{ + Gpus: gpus, + GpuFail: int(gpuFail), + Cmd: cmd, + Hosts: hosts, + IsZombie: isZombie, + CpuPctSum: cpuPctSum, + CpuPctMax: cpuPctMax, + CpuKBSum: cpuKBSum, + CpuKBMax: cpuKBMax, + RssAnonKBSum: rssAnonKBSum, + RssAnonKBMax: rssAnonKBMax, + GpuPctSum: gpuPctSum, + GpuPctMax: gpuPctMax, + GpuKBSum: gpuKBSum, + GpuKBMax: gpuKBMax, + SampleCount: uint64(len(job)), + } + + return &a +} + +// Merge mergeable streams as if by --merge-all; the remaining streams are merged as if by +// --merge-none, and the two sets of merged streams are combined into one set. 
+func mergeAcrossSomeNodes( + isMergeable func(sample.InputStreamKey) bool, + streams sample.InputStreamSet, + bounds Timebounds, +) (sample.SampleStreams, Timebounds) { + mergeable := make(sample.InputStreamSet) + mBounds := make(Timebounds) + solo := make(sample.InputStreamSet) + sBounds := make(Timebounds) + for k, s := range streams { + bound := bounds[k.Host] + if isMergeable(k) { + mBounds[k.Host] = bound + mergeable[k] = s + } else { + sBounds[k.Host] = bound + solo[k] = s + } + } + mergedJobs, mergedBounds := sample.MergeByJob(mergeable, mBounds) + otherJobs := sample.MergeByHostAndJob(solo) + mergedJobs = append(mergedJobs, otherJobs...) + for k, v := range sBounds { + mergedBounds[k] = v + } + return mergedJobs, mergedBounds +} diff --git a/code/sonalyze/sonalyze.go b/code/sonalyze/sonalyze.go index 82ce33e7..4453f972 100644 --- a/code/sonalyze/sonalyze.go +++ b/code/sonalyze/sonalyze.go @@ -29,6 +29,7 @@ import ( "go-utils/status" "sonalyze/application" "sonalyze/cmd" + "sonalyze/cmd/jobs" . "sonalyze/common" "sonalyze/daemon" "sonalyze/db" @@ -270,6 +271,8 @@ func (_ *standardCommandLineHandler) HandleCommand( switch command := anyCmd.(type) { case *daemon.DaemonCommand: return command.RunDaemon(stdin, stdout, stderr) + case *jobs.JobsCommand: + return application.LocalJobsOperation(command, stdin, stdout, stderr) case cmd.SampleAnalysisCommand: return application.LocalSampleOperation(command, stdin, stdout, stderr) case cmd.SimpleCommand: