Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 57 additions & 48 deletions code/sonalyze/db/timescaledb.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@
//
// Computed fields can sometimes become NULL, notably sum() over an empty set of rows, and must be
// handled the same way.
//
// Note that PSQL turns *some* field names into syntax, "user" is known to be a problem (issue 904).
// As a consequence, all field names should either be quoted with double quotes (so, `"user" in
// tbl`, not `user in tbl`) or should be qualified with a table name (`t1.user`, not `user`). The
// responsibility for this lies with whoever introduces a field name into a string that is used
// literally in a select, so it's shared between querySlice() and the callers of that function.
//
// Also, field names seem to be folded to lower case, so anything with upper case characters must
// similarly be quoted ("AllocTRES" not AllocTRES).

package db

Expand Down Expand Up @@ -189,19 +198,19 @@ func (cdb *connectedDB) ReadProcessSamples(
)

// Alpha order and KEEP THE FIELD AND BOX LISTS COMPLETELY IN SYNC OR YOU WILL BE SORRY!
t1Fields := "t1.cmd, t1.cpu_avg, t1.cpu_time, t1.cpu_util, t1.data_cancelled, t1.data_read, " +
"t1.data_written, t1.epoch, t1.job, t1.node, t1.num_threads, t1.pid, t1.ppid, " +
"t1.resident_memory, t1.rolledup, t1.time, t1.user, t1.virtual_memory"
t1Fields := `t1.cmd, t1.cpu_avg, t1.cpu_time, t1.cpu_util, t1.data_cancelled, t1.data_read, ` +
`t1.data_written, t1.epoch, t1.job, t1.node, t1.num_threads, t1.pid, t1.ppid, ` +
`t1.resident_memory, t1.rolledup, t1.time, t1.user, t1.virtual_memory`
t1Boxes := []any{
&cmd, &cpuAvg, &cpuTime, &cpuUtil, &dataCancelled, &dataRead,
&dataWritten, &epoch, &job, &node, &numThreads, &pid, &ppid,
&residentMemory, &rolledup, &timestamp, &user, &virtualMemory,
}
t2Fields := "sum(t2.gpu_memory), sum(t2.gpu_util), sum(t2.gpu_memory_util), count(t2.uuid)"
t2Fields := `sum(t2.gpu_memory), sum(t2.gpu_util), sum(t2.gpu_memory_util), count(t2.uuid)`
t2Boxes := []any{&gpuMemoryp, &gpuUtilp, &gpuMemoryUtilp, &gpuCount}
joinBy := "left join sample_process_gpu as t2 on t1.cluster = t2.cluster and t1.node = t2.node " +
"and t1.time = t2.time and t1.pid = t2.pid and t1.job = t2.job and t1.epoch = t2.epoch " +
"group by " + t1Fields
joinBy := `left join sample_process_gpu as t2 on t1.cluster = t2.cluster and t1.node = t2.node ` +
`and t1.time = t2.time and t1.pid = t2.pid and t1.job = t2.job and t1.epoch = t2.epoch ` +
`group by ` + t1Fields
q := query{
DataProviderFilter: filter,
table: "sample_process",
Expand Down Expand Up @@ -288,8 +297,8 @@ func (cdb *connectedDB) ReadNodeSamples(
DataProviderFilter: filter,
table: "sample_system",
// Alpha order and KEEP THESE TWO LISTS COMPLETELY IN SYNC OR YOU WILL BE SORRY!
fields: "boot, existing_entities, load1, load15, load5, node, " +
"runnable_entities, time, used_memory",
fields: `"boot", "existing_entities", "load1", "load15", "load5", "node", ` +
`"runnable_entities", "time", "used_memory"`,
boxes: []any{
&bootp, &existingEntities, &load1, &load15, &load5, &node,
&runnableEntities, &timestamp, &usedMemory,
Expand Down Expand Up @@ -334,11 +343,11 @@ func (cdb *connectedDB) ReadDiskSamples(
DataProviderFilter: filter,
table: "sample_disk",
// Alpha order and KEEP THESE TWO LISTS COMPLETELY IN SYNC OR YOU WILL BE SORRY!
fields: "discards_completed, discards_merged, flush_requests_completed, " +
"ios_currently_in_progress, major, minor, ms_spent_discarding, " +
"ms_spent_doing_ios, ms_spent_flushing, ms_spent_reading, ms_spent_writing, " +
"name, node, reads_completed, reads_merged, sectors_discarded, sectors_read, " +
"sectors_written, time, weighted_ms_spent_doing_ios, writes_completed, writes_merged",
fields: `"discards_completed", "discards_merged", "flush_requests_completed", ` +
`"ios_currently_in_progress", "major", "minor", "ms_spent_discarding", ` +
`"ms_spent_doing_ios", "ms_spent_flushing", "ms_spent_reading", "ms_spent_writing", ` +
`"name", "node", "reads_completed", "reads_merged", "sectors_discarded", "sectors_read", ` +
`"sectors_written", "time", "weighted_ms_spent_doing_ios", "writes_completed", "writes_merged"`,
boxes: []any{&discards_completed, &discards_merged, &flush_requests_completed,
&ios_currently_in_progress, &major, &minor, &ms_spent_discarding,
&ms_spent_doing_ios, &ms_spent_flushing, &ms_spent_reading, &ms_spent_writing,
Expand Down Expand Up @@ -390,7 +399,7 @@ func (cdb *connectedDB) ReadCpuSamples(
DataProviderFilter: filter,
table: "sample_system",
// Alpha order and KEEP THESE TWO LISTS COMPLETELY IN SYNC OR YOU WILL BE SORRY!
fields: "cpus, node, time",
fields: `"cpus", "node", "time"`,
boxes: []any{&cpus, &node, &timestamp},
}

Expand Down Expand Up @@ -440,12 +449,12 @@ func (cdb *connectedDB) ReadGpuSamples(
q := query{
DataProviderFilter: filter,
table: "sysinfo_gpu_card_config",
join: "join sample_gpu as t2 on t1.uuid = t2.uuid and " + extra +
"age(t1.time, t2.time) < interval '15 minutes'",
join: `join sample_gpu as t2 on t1.uuid = t2.uuid and ` + extra +
`age(t1.time, t2.time) < interval '15 minutes'`,
// Alpha order and KEEP THESE TWO LISTS COMPLETELY IN SYNC OR YOU WILL BE SORRY!
fields: "t2.ce_clock, t2.ce_util, t2.compute_mode, t2.failing, t2.fan, t2.index, " +
"t2.memory, t2.memory_clock, t2.memory_util, t1.node, t2.performance_state, " +
"t2.power, t2.power_limit, t2.temperature, t2.time, t2.uuid",
fields: `t2.ce_clock, t2.ce_util, t2.compute_mode, t2.failing, t2.fan, t2.index, ` +
`t2.memory, t2.memory_clock, t2.memory_util, t1.node, t2.performance_state, ` +
`t2.power, t2.power_limit, t2.temperature, t2.time, t2.uuid`,
boxes: []any{
&ce_clock, &ce_util, &compute_mode, &failing, &fan, &indexp,
&memory, &memory_clock, &memory_util, &node, &performance_state,
Expand Down Expand Up @@ -505,8 +514,8 @@ func (cdb *connectedDB) ReadSysinfoNodeData(
DataProviderFilter: filter,
table: "sysinfo_attributes",
// Alpha order and KEEP THESE TWO LISTS COMPLETELY IN SYNC OR YOU WILL BE SORRY!
fields: "architecture, cards, cluster, cores_per_socket, cpu_model, distances, memory, " +
"node, numa_nodes, os_name, os_release, sockets, threads_per_core, time, topo_svg, topo_text",
fields: `"architecture", "cards", "cluster", "cores_per_socket", "cpu_model", "distances", "memory", ` +
`"node", "numa_nodes", "os_name", "os_release", "sockets", "threads_per_core", "time", "topo_svg", "topo_text"`,
boxes: []any{
&architecture, &cards, &cluster, &coresPerSocket, &cpuModel, &distances, &memory,
&node, &numaNodesBox, &osName, &osRelease, &sockets, &threadsPerCore, &timestamp, &topoSvgp, &topoTextp,
Expand Down Expand Up @@ -599,10 +608,10 @@ func (cdb *connectedDB) ReadSysinfoCardData(
// sysinfo_gpu_card. That needs to be joined to sysinfo_gpu_card_config here (by UUID) to
// get the full story.
table: "sysinfo_gpu_card_config",
join: "join sysinfo_gpu_card t2 on t1.uuid = t2.uuid",
join: `join sysinfo_gpu_card t2 on t1.uuid = t2.uuid`,
// Alpha field name order and KEEP THESE TWO LISTS COMPLETELY IN SYNC OR YOU WILL BE SORRY!
fields: "address, architecture, driver, firmware, index, manufacturer, max_ce_clock, max_memory_clock, " +
"max_power_limit, memory, min_power_limit, model, node, power_limit, time, t1.uuid",
fields: `"address", "architecture", "driver", "firmware", "index", "manufacturer", "max_ce_clock", "max_memory_clock", ` +
`"max_power_limit", "memory", "min_power_limit", "model", "node", "power_limit", "time", t1.uuid`,
boxes: []any{
&address, &architecture, &driver, &firmware, &index, &manufacturer, &maxCeClock, &maxMemoryClock,
&maxPowerLimit, &memory, &minPowerLimit, &model, &node, &powerLimit, &timestamp, &uuid,
Expand Down Expand Up @@ -661,17 +670,17 @@ func (cdb *connectedDB) ReadSacctData(
DataProviderFilter: filter,
fieldMap: map[string]string{"time": "time", "user": "user_name", "job": "job_name", "node": "nodes"},
table: "sample_slurm_job",
join: "join sample_slurm_job_acc as t2 on " +
"t1.cluster = t2.cluster and t1.job_id = t2.job_id and t1.job_step = t2.job_step and " +
"t1.time = t2.time",
join: `join sample_slurm_job_acc as t2 on ` +
`t1.cluster = t2.cluster and t1.job_id = t2.job_id and t1.job_step = t2.job_step and ` +
`t1.time = t2.time`,
// Alpha order and KEEP THESE TWO LISTS COMPLETELY IN SYNC OR YOU WILL BE SORRY!
fields: "account, allocated_resources, \"AllocTRES\", array_job_id, array_task_id, \"AveCPU\", " +
"\"AveDiskRead\", \"AveDiskWrite\", \"AveRSS\", \"AveVMSize\", t1.cluster, distribution, \"ElapsedRaw\", " +
"end_time, exit_code, het_job_id, het_job_offset, t1.job_id, job_name, " +
"job_state, t1.job_step, \"MaxRSS\", \"MaxVMSize\", \"MinCPU\", nodes, " +
"partition, priority, requested_cpus, requested_memory_per_node, requested_node_count, " +
"requested_resources, reservation, start_time, submit_time, suspend_time, \"SystemCPU\", " +
"t1.time, time_limit, \"UserCPU\", user_name",
fields: `"account", "allocated_resources", "AllocTRES", "array_job_id", "array_task_id", "AveCPU", ` +
`"AveDiskRead", "AveDiskWrite", "AveRSS", "AveVMSize", t1.cluster, "distribution", "ElapsedRaw", ` +
`"end_time", "exit_code", "het_job_id", "het_job_offset", t1.job_id, "job_name", ` +
`"job_state", t1.job_step, "MaxRSS", "MaxVMSize", "MinCPU", "nodes", ` +
`"partition", "priority", "requested_cpus", "requested_memory_per_node", "requested_node_count", ` +
`"requested_resources", "reservation", "start_time", "submit_time", "suspend_time", "SystemCPU", ` +
`t1.time, "time_limit", "UserCPU", "user_name"`,
boxes: []any{
&account, &allocatedResourcesp, &allocTRES, &arrayJobId, &arrayTaskIdp, &aveCPU,
&aveDiskRead, &aveDiskWrite, &aveRSS, &aveVMSize, &cluster, &distribution, &elapsedRaw,
Expand Down Expand Up @@ -809,7 +818,7 @@ func (cdb *connectedDB) ReadCluzterAttributeData(
DataProviderFilter: filter,
table: "cluster_attributes",
// Alpha order and KEEP THESE TWO LISTS COMPLETELY IN SYNC OR YOU WILL BE SORRY!
fields: "cluster, slurm, time",
fields: `"cluster", "slurm", "time"`,
boxes: []any{&cluster, &slurm, &timestamp},
}

Expand Down Expand Up @@ -839,7 +848,7 @@ func (cdb *connectedDB) ReadCluzterPartitionData(
DataProviderFilter: filter,
table: "partition",
// Alpha order and KEEP THESE TWO LISTS COMPLETELY IN SYNC OR YOU WILL BE SORRY!
fields: "cluster, nodes_compact, partition, time",
fields: `"cluster", "nodes_compact", "partition", "time"`,
boxes: []any{&cluster, &nodeNamesCompact, &partName, &timestamp},
}

Expand Down Expand Up @@ -883,7 +892,7 @@ func (cdb *connectedDB) ReadCluzterNodeData(
DataProviderFilter: filter,
table: "node_state",
// Alpha order and KEEP THESE TWO LISTS COMPLETELY IN SYNC OR YOU WILL BE SORRY!
fields: "cluster, node, states, time",
fields: `"cluster", "node", "states", "time"`,
boxes: []any{&cluster, &nodeName, &states, &timestamp},
}

Expand Down Expand Up @@ -922,7 +931,7 @@ func makeRefillTimeCache(
return func() (low, high time.Time, err error) {
err = dbc.QueryRowAndScan(
context.Background(),
"SELECT MIN(time) FROM sysinfo_attributes WHERE cluster = $1",
`SELECT MIN(time) FROM sysinfo_attributes WHERE "cluster" = $1`,
[]any{clusterName},
[]any{&low},
)
Expand All @@ -931,7 +940,7 @@ func makeRefillTimeCache(
}
err = dbc.QueryRowAndScan(
context.Background(),
"SELECT MAX(time) FROM sample_process WHERE cluster = $1",
`SELECT MAX(time) FROM sample_process WHERE "cluster" = $1`,
[]any{clusterName},
[]any{&high},
)
Expand All @@ -956,7 +965,7 @@ func querySlice[T any](
q *query,
unbox func() *T,
) (finalRows [][]*T, softErrors int, err error) {
primary := "SELECT * FROM " + q.table + " WHERE " + "cluster=$1"
primary := "SELECT * FROM " + q.table + " WHERE \"cluster\"=$1"
qarg := []any{cdb.cx.ClusterName()}

// A note about adding field filters against values. For values we don't control we have to use
Expand All @@ -971,11 +980,11 @@ func querySlice[T any](

timeField := mapField("time", q.fieldMap)
if !q.FromDate.IsZero() {
primary += fmt.Sprintf(" AND %s >= $%d", timeField, len(qarg)+1)
primary += fmt.Sprintf(" AND \"%s\" >= $%d", timeField, len(qarg)+1)
qarg = append(qarg, q.FromDate.Format(time.DateOnly))
}
if !q.ToDate.IsZero() {
primary += fmt.Sprintf(" AND %s < $%d", timeField, len(qarg)+1)
primary += fmt.Sprintf(" AND \"%s\" < $%d", timeField, len(qarg)+1)
qarg = append(qarg, q.ToDate.Add(time.Hour*24).Format(time.DateOnly))
}

Expand Down Expand Up @@ -1009,7 +1018,7 @@ func querySlice[T any](
elements = append(elements, fmt.Sprintf("$%d::character varying", len(qarg)+1))
qarg = append(qarg, e)
}
primary += " AND " + fieldname + " && ARRAY[" + strings.Join(elements, ",") + "]"
primary += " AND \"" + fieldname + "\" && ARRAY[" + strings.Join(elements, ",") + "]"
}
} else {
conds := make([]string, 0)
Expand All @@ -1018,7 +1027,7 @@ func querySlice[T any](
for _, p := range q.Node.Patterns() {
loc := strings.IndexAny(p, "[*")
if !q.Node.IsPrefix() && loc == -1 {
conds = append(conds, fmt.Sprintf("%s = $%d", fieldname, nextIx))
conds = append(conds, fmt.Sprintf("\"%s\" = $%d", fieldname, nextIx))
} else {
// TODO: We can and should do more here:
//
Expand All @@ -1035,7 +1044,7 @@ func querySlice[T any](
//
// We should get the hostglobber involved since it has an exact parse of the pattern
// set and is already in q.hosts.
conds = append(conds, fmt.Sprintf("%s like $%d", fieldname, nextIx))
conds = append(conds, fmt.Sprintf("\"%s\" like $%d", fieldname, nextIx))
if loc != -1 {
p = p[:loc]
}
Expand All @@ -1057,7 +1066,7 @@ func querySlice[T any](
fieldname := mapField("job", q.fieldMap)
var jobs []string
for j := range q.Jobs {
jobs = append(jobs, fmt.Sprintf("%s = %d", fieldname, j))
jobs = append(jobs, fmt.Sprintf("\"%s\" = %d", fieldname, j))
}
primary += " AND (" + strings.Join(jobs, " OR ") + ")"
}
Expand All @@ -1068,7 +1077,7 @@ func querySlice[T any](
fieldname := mapField("user", q.fieldMap)
var conds []string
for u := range q.Users {
conds = append(conds, fmt.Sprintf("%s = $%d", fieldname, len(qarg)+1))
conds = append(conds, fmt.Sprintf("\"%s\" = $%d", fieldname, len(qarg)+1))
qarg = append(qarg, u.String())
}
primary += " AND (" + strings.Join(conds, " OR ") + ")"
Expand Down
Loading