Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions .github/workflows/pr-checks.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# CI checks that run on every pull request targeting main.
# NOTE(review): indentation below is reconstructed — the scraped diff had
# stripped all leading whitespace; the key structure itself is unambiguous.
name: PR Checks

on:
  pull_request:
    branches:
      - main

# Read-only token: none of these steps push, tag, or comment.
permissions:
  contents: read

# Cancel in-progress runs when a new commit is pushed to the same PR.
# head_ref is empty outside pull_request events, so run_id keeps groups unique.
concurrency:
  group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
  cancel-in-progress: true

jobs:
  checks:
    runs-on: ubuntu-latest
    timeout-minutes: 15

    steps:
      - name: Checkout
        uses: actions/checkout@v4
        with:
          # Shallow clone is enough: no step here inspects history.
          fetch-depth: 1

      - name: Set up Go
        uses: actions/setup-go@v5
        with:
          # Keep the toolchain in lockstep with the module's declared version.
          go-version-file: go.mod
          cache: true

      - name: Verify modules tidy
        # Fails the build when go.mod/go.sum drift from the source tree.
        run: |
          go mod tidy
          git diff --exit-code go.mod go.sum || (echo "::error::go.mod/go.sum need update; run 'go mod tidy' locally" && exit 1)

      - name: Static analysis (go vet)
        run: go vet ./...

      - name: Static analysis (golangci-lint)
        uses: golangci/golangci-lint-action@v6
        with:
          version: v1.62
          args: --timeout=5m ./...

      - name: Run unit tests
        # -race catches data races; the 10m cap leaves headroom under the
        # 15-minute job timeout for the earlier steps.
        run: go test ./... -v -race -timeout=10m

3 changes: 1 addition & 2 deletions benchmarks/tpch/streams/stream_gen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ func TestGenerateStreams(t *testing.T) {
reader, err := os.Open("query_streams.csv")
assert.Nil(t, err)
scanner := bufio.NewScanner(reader)
var streams [][]int
streams = make([][]int, 40)
streams := make([][]int, 40)
for row := 0; scanner.Scan(); row++ {
line := scanner.Text()
if row != 0 {
Expand Down
19 changes: 10 additions & 9 deletions cmd/cmp/main.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package cmp

import (
"github.com/spf13/cobra"
"os"
"os/exec"
"path/filepath"
"pbench/log"
"pbench/utils"
"regexp"

"github.com/spf13/cobra"
)

var (
Expand Down Expand Up @@ -117,14 +118,14 @@ func buildFileIdMap(path string) (map[string]string, error) {
return fileIdMap, nil
}

func readFileIntoString(filePath string) string {
if bytes, err := os.ReadFile(filePath); err != nil {
log.Error().Err(err).Str("path", filePath).Msg("failed to read file")
return ""
} else {
return string(bytes)
}
}
// func readFileIntoString(filePath string) string {
// if bytes, err := os.ReadFile(filePath); err != nil {
// log.Error().Err(err).Str("path", filePath).Msg("failed to read file")
// return ""
// } else {
// return string(bytes)
// }
// }

func generateDiff(buildSideFilePath, probeSideFilePath string) (string, error) {
cmd := exec.Command("diff", "-u", buildSideFilePath, probeSideFilePath)
Expand Down
8 changes: 3 additions & 5 deletions cmd/genddl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package genddl
import (
"encoding/json"
"fmt"
"github.com/spf13/cobra"
"io/fs"
"os"
"path/filepath"
Expand All @@ -12,6 +11,8 @@ import (
"strconv"
"strings"
"text/template"

"github.com/spf13/cobra"
)

type Schema struct {
Expand Down Expand Up @@ -258,10 +259,7 @@ func cleanOutputDir(dir string) error {
}

func (s *Schema) shouldGenInsert() bool {
if !s.Iceberg {
return false
}
return true
return s.Iceberg
}

func isRegisterTable(table *Table, schema *Schema) bool {
Expand Down
2 changes: 1 addition & 1 deletion cmd/loadjson/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func processFile(ctx context.Context, path string) {
}
if queryInfo.ErrorCode != nil {
// Need to set this so the run recorders will mark this query as failed.
queryResult.QueryError = fmt.Errorf(*queryInfo.ErrorCode.Name)
queryResult.QueryError = fmt.Errorf("%s", *queryInfo.ErrorCode.Name)
}
// Unlike benchmarks run by pbench, we do not know when did the run start and finish when loading them from files.
// We infer that the whole run starts at min(queryStartTime) and ends at max(queryEndTime).
Expand Down
12 changes: 9 additions & 3 deletions cmd/queryplan/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ func init() {
}

func run(c *cobra.Command, args []string) {
c.ValidateRequiredFlags()
if err := c.ValidateRequiredFlags(); err != nil {
log.Fatal().Err(err).Msg("invalid flags")
}
csvFile := args[0]

log.Info().Msgf("parsing the query plan at column %d in %s", queryPlanColumn, csvFile)
Expand Down Expand Up @@ -116,13 +118,17 @@ func processFile(csvFile string) error {
log.Err(err).Msgf("failed to serialize the joins at row:%d", rowNum)
failureCounter++
} else {
output.WriteString(fmt.Sprintf(`%s "%d":`, newline, rowNum))
if _, err := output.WriteString(fmt.Sprintf(`%s "%d":`, newline, rowNum)); err != nil {
return fmt.Errorf("failed to write to output: %w", err)
}
fmt.Fprint(output, string(out))
newline = ",\n"
}
}
}
output.WriteString("\n}")
if _, err := output.WriteString("\n}"); err != nil {
return fmt.Errorf("failed to write closing brace: %w", err)
}
return nil
}

Expand Down
10 changes: 5 additions & 5 deletions presto/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ type Client struct {
serverUrl *url.URL
userInfo *url.Userinfo
sessionParams map[string]any
clientTags []string
baseHeader http.Header
isTrino bool
forceHttps bool
headerMutex sync.RWMutex
//clientTags []string
baseHeader http.Header
isTrino bool
forceHttps bool
headerMutex sync.RWMutex
}

func NewClient(serverUrl string, isTrino bool) (*Client, error) {
Expand Down
15 changes: 9 additions & 6 deletions presto/plan_node/plan_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ const (
)

var (
nodeDepthCtxKey = struct{}{}
NoRootPlanNodeError = errors.New("no root plan node found")
NonExistentRemoteSourceError = errors.New("non-existent remote source")
IsJoin = map[string]bool{
ErrNoRootPlanNode = errors.New("no root plan node found")
ErrNonExistentRemoteSource = errors.New("non-existent remote source")
IsJoin = map[string]bool{
LeftJoin: true,
RightJoin: true,
InnerJoin: true,
Expand All @@ -45,6 +44,10 @@ type PlanNode struct {
Estimates []PlanEstimate `json:"estimates"`
}

type nodeDepthCtxKeyType struct{}

var nodeDepthCtxKey = nodeDepthCtxKeyType{}

func (n *PlanNode) GetTraverseDepth(ctx context.Context) int {
depth, ok := ctx.Value(nodeDepthCtxKey).(int)
if !ok {
Expand Down Expand Up @@ -86,7 +89,7 @@ func (n *PlanNode) Traverse(ctx context.Context, fn PlanNodeTraverseFunction, pl
return err
}
} else {
return fmt.Errorf("%w %s", NonExistentRemoteSourceError, remoteSourceId)
return fmt.Errorf("%w %s", ErrNonExistentRemoteSource, remoteSourceId)
}
}
} else if err := fn(ctx, n); err != nil {
Expand All @@ -106,7 +109,7 @@ func (t PlanTree) Traverse(ctx context.Context, fn PlanNodeTraverseFunction, mod
if node, exists := t["0"]; exists {
return node.Plan.Traverse(node.Plan.incrementTraverseDepth(ctx), fn, t, mode...)
}
return NoRootPlanNodeError
return ErrNoRootPlanNode
}

func traceValue(assignmentMap map[string]Value, tableHandle *HiveTableHandle, value Value) Value {
Expand Down
5 changes: 3 additions & 2 deletions presto/query_json/duration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@ package query_json
import (
"encoding/json"
"fmt"
"github.com/xhit/go-str2duration/v2"
"time"

"github.com/xhit/go-str2duration/v2"
)

type Duration struct {
time.Duration
}

func (d *Duration) String() string {
return d.String()
return d.Duration.String()
}

func (d *Duration) MarshalJSON() ([]byte, error) {
Expand Down
7 changes: 4 additions & 3 deletions presto/unmarshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"testing"

"github.com/stretchr/testify/assert"
)

func TestBuiltinRows(t *testing.T) {
Expand Down Expand Up @@ -38,11 +39,11 @@ func TestPrestoUnmarshal(t *testing.T) {
rows, columnHeaders := getBuiltinRows(t)
var nilPtr *[]string
err := UnmarshalQueryData(rows, columnHeaders, nilPtr)
assert.ErrorIs(t, err, UnmarshalError) // nil pointer
assert.ErrorIs(t, err, ErrUnmarshal) // nil pointer

columnsStats := make([]ColumnStats, 8, 17)
err = UnmarshalQueryData(rows, columnHeaders, columnsStats)
assert.ErrorIs(t, err, UnmarshalError) // not a pointer
assert.ErrorIs(t, err, ErrUnmarshal) // not a pointer

newFloat64 := func(f float64) *float64 {
return &f
Expand Down
12 changes: 6 additions & 6 deletions presto/unmarshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

var (
RawJsonMessageType = reflect.TypeOf((*json.RawMessage)(nil)).Elem()
UnmarshalError = errors.New("unmarshall: receiving value")
ErrUnmarshal = errors.New("unmarshall: receiving value")
structColumnMapCache = make(map[reflect.Type]map[string]int)
)

Expand All @@ -19,7 +19,7 @@ func buildColumnMap(t reflect.Type) map[string]int {
k := t.Kind()
if k == reflect.Interface || k == reflect.Pointer {
t = t.Elem()
k = t.Kind()
t.Kind()
}
if t.Kind() != reflect.Struct {
return nil
Expand Down Expand Up @@ -67,7 +67,7 @@ func unmarshalRow(rawRowData json.RawMessage, v reflect.Value, columnFieldIndexe
v.Set(rawRowDataValue.Convert(vType))
return nil
}
return fmt.Errorf("%w cannot be set", UnmarshalError)
return fmt.Errorf("%w cannot be set", ErrUnmarshal)
}

row := make([]any, len(columnFieldIndexes))
Expand Down Expand Up @@ -95,12 +95,12 @@ func UnmarshalQueryData(data []json.RawMessage, columns []Column, v any) error {
}
vPtr := reflect.ValueOf(v)
if vPtr.Kind() != reflect.Pointer {
return fmt.Errorf("%w must be a pointer, but it is %T", UnmarshalError, v)
return fmt.Errorf("%w must be a pointer, but it is %T", ErrUnmarshal, v)
} else if vPtr.IsNil() {
if vPtr.CanAddr() {
vPtr.Set(reflect.New(vPtr.Type().Elem()))
} else {
return fmt.Errorf("%w non-addressable value", UnmarshalError)
return fmt.Errorf("%w non-addressable value", ErrUnmarshal)
}
}

Expand All @@ -122,7 +122,7 @@ func UnmarshalQueryData(data []json.RawMessage, columns []Column, v any) error {
} else {
// Then this is a scalar value!
if len(data) > 1 {
return fmt.Errorf("%w must be a pointer to an array, slice, or struct. But it is a pointer to %v", UnmarshalError, vArrayOrStruct.Type())
return fmt.Errorf("%w must be a pointer to an array, slice, or struct. But it is a pointer to %v", ErrUnmarshal, vArrayOrStruct.Type())
} else {
var cols []any
if err := json.Unmarshal(data[0], &cols); err != nil {
Expand Down
45 changes: 28 additions & 17 deletions stage/result.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,53 @@
package stage

import (
"github.com/rs/zerolog"
"pbench/log"
"time"

"github.com/rs/zerolog"
)

type QueryResult struct {
StageId string
Query *Query
QueryId string
InfoUrl string
QueryError error
RowCount int
StartTime time.Time
EndTime *time.Time
Duration *time.Duration
simpleLogging bool
StageId string
Query *Query
QueryId string
InfoUrl string
QueryError error
RowCount int
StartTime time.Time
EndTime *time.Time
Duration *time.Duration
}

// simpleQueryResult is a wrapper for logging QueryResult with reduced output
type simpleQueryResult struct {
*QueryResult
}

func (q *QueryResult) SimpleLogging() *QueryResult {
q.simpleLogging = true
return q
func (q *QueryResult) SimpleLogging() zerolog.LogObjectMarshaler {
return simpleQueryResult{q}
}

func (q *QueryResult) MarshalZerologObject(e *zerolog.Event) {
q.marshalZerologObject(e, false)
}

func (s simpleQueryResult) MarshalZerologObject(e *zerolog.Event) {
s.QueryResult.marshalZerologObject(e, true)
}

func (q *QueryResult) marshalZerologObject(e *zerolog.Event, simpleLogging bool) {
e.Str("benchmark_stage_id", q.StageId)
if q.Query.File != nil {
e.Str("query_file", *q.Query.File)
} else if !q.simpleLogging {
} else if !simpleLogging {
e.Str("query", q.Query.Text)
}
e.Int("query_index", q.Query.Index)
e.Bool("cold_run", q.Query.ColdRun)
e.Int("sequence_no", q.Query.SequenceNo)
e.Str("info_url", q.InfoUrl)
if q.simpleLogging {
q.simpleLogging = false
if simpleLogging {
return
}
e.Str("query_id", q.QueryId)
Expand Down
Loading