Skip to content

Repeated RECORD field access intermittently reads values from the wrong field #483

@oshiro3

Description

@oshiro3

What happened?

Although the contents of the REPEATED RECORD column look correct when inspected with TO_JSON_STRING, accessing individual fields such as item.name, item.skip, or item.enabled after UNNEST can return values from the wrong field; the field mapping becomes misaligned.

As a result, a STRING value can be read where a BOOLEAN field is expected. For a record such as:

  {"name": "item-1", "enabled": true}

the read fails with an error like:

  strconv.ParseBool: parsing "item-1": invalid syntax

Furthermore, the behavior is not consistently reproducible: after restarting the emulator and running the same schema, data, and query, the query may either succeed or fail.

What did you expect to happen?

I expected querying fields inside a repeated RECORD after UNNEST to consistently return the values that were loaded into the table.

Given this row:

  {
    "id": 1,
    "items": [
      {
        "name": "item-1",
        "skip": false,
        "enabled": true
      }
    ]
  }

This query:

  SELECT id, item.name, item.skip, item.enabled
  FROM dataset.repro
  LEFT JOIN UNNEST(items) AS item

should always return:

1, "item-1", false, true

Also, this query:

  SELECT id, ARRAY_AGG(item.name)
  FROM dataset.repro
  LEFT JOIN UNNEST(items) AS item
  WHERE item.skip = FALSE AND item.enabled = TRUE
  GROUP BY id

should always return:

1, ["item-1"]

I expected the results to be deterministic across emulator instances for the same schema, data, and query.

How can we reproduce it (as minimally and precisely as possible)?

I have prepared the following test.
It fails with bigquery-emulator v0.7.2 but always passes with v0.6.6.

// TestBigQueryEmulatorRepeatedRecordFieldAccessRepro reproduces the issue
// where fields of a REPEATED RECORD column read after UNNEST may come back
// from the wrong field.
//
// Subtests:
//   - the row is loaded (checked via TO_JSON_STRING);
//   - individual field access is stable across repeated queries on one emulator;
//   - a boolean filter on record fields returns the expected aggregate;
//   - individual field access is stable across freshly created emulators.
func TestBigQueryEmulatorRepeatedRecordFieldAccessRepro(t *testing.T) {
	// maxReportedFailures caps how many failure lines are printed, so a fully
	// broken run does not flood the test log. Both stability subtests share it.
	const maxReportedFailures = 10

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	client, cleanup := setupRepeatedRecordRepro(t, ctx)
	defer cleanup()

	// expect to pass
	t.Run("data is loaded", func(t *testing.T) {
		rows := queryRows(t, ctx, client, `
SELECT id, TO_JSON_STRING(items)
FROM dataset.repro
`)
		if len(rows) != 1 {
			t.Fatalf("row count mismatch: got %#v", rows)
		}
		assertJSONContains(t, rows[0][1].(string), "item-1", `"skip":false`, `"enabled":true`)
	})

	t.Run("individual repeated record field access is stable", func(t *testing.T) {
		const attempts = 50
		want := [][]bigquery.Value{{int64(1), "item-1", false, true}}
		var successes int
		var failures []string

		for i := range attempts {
			rows, err := queryRowsE(ctx, client, `
SELECT id, item.name, item.skip, item.enabled
FROM dataset.repro
LEFT JOIN UNNEST(items) AS item
`)
			if err != nil {
				failures = append(failures, fmt.Sprintf("attempt %02d: error: %v", i+1, err))
				continue
			}
			if diff := rowsDiff(rows, want); diff != "" {
				failures = append(failures, fmt.Sprintf("attempt %02d: mismatch: %s", i+1, diff))
				continue
			}
			successes++
		}

		if len(failures) > 0 {
			// Clamp so that only the "first failures" are reported, as the message says.
			limit := min(len(failures), maxReportedFailures)
			t.Fatalf("individual field access was not stable: successes=%d failures=%d attempts=%d\nfirst failures:\n%s", successes, len(failures), attempts, strings.Join(failures[:limit], "\n"))
		}
	})

	t.Run("boolean filter inside repeated record", func(t *testing.T) {
		rows := queryRows(t, ctx, client, `
SELECT id, ARRAY_AGG(item.name)
FROM dataset.repro
LEFT JOIN UNNEST(items) AS item
WHERE item.skip = FALSE AND item.enabled = TRUE
GROUP BY id
`)
		want := [][]bigquery.Value{{int64(1), []bigquery.Value{"item-1"}}}
		assertRows(t, rows, want)
	})

	t.Run("individual repeated record field access across fresh emulators", func(t *testing.T) {
		const attempts = 20
		want := [][]bigquery.Value{{int64(1), "item-1", false, true}}
		var successes int
		var failures []string

		for i := range attempts {
			// Each attempt runs in its own function so the deferred cleanup and
			// cancel still run if setup aborts the subtest via t.Fatal.
			rows, err := func() ([][]bigquery.Value, error) {
				attemptCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
				defer cancel()
				attemptClient, attemptCleanup := setupRepeatedRecordRepro(t, attemptCtx)
				defer attemptCleanup()
				return queryRowsE(attemptCtx, attemptClient, `
SELECT id, item.name, item.skip, item.enabled
FROM dataset.repro
LEFT JOIN UNNEST(items) AS item
`)
			}()

			if err != nil {
				failures = append(failures, fmt.Sprintf("attempt %02d: error: %v", i+1, err))
				continue
			}
			if diff := rowsDiff(rows, want); diff != "" {
				failures = append(failures, fmt.Sprintf("attempt %02d: mismatch: %s", i+1, diff))
				continue
			}
			successes++
		}

		if len(failures) > 0 {
			limit := min(len(failures), maxReportedFailures)
			t.Fatalf("individual field access was not stable across fresh emulators: successes=%d failures=%d attempts=%d\nfirst failures:\n%s", successes, len(failures), attempts, strings.Join(failures[:limit], "\n"))
		}
	})
}

// setupRepeatedRecordRepro starts an in-process bigquery-emulator on temporary
// storage and loads project "test" with table dataset.repro. The table has an
// INTEGER column "id" and a REPEATED RECORD column "items" with fields
// name STRING, skip BOOLEAN and enabled BOOLEAN, and it holds a single row.
//
// It returns a BigQuery client pointed at the emulator plus a cleanup function
// that closes the client and the HTTP test server. Any setup failure aborts
// the test via t.Fatal after releasing the resources already created.
func setupRepeatedRecordRepro(t *testing.T, ctx context.Context) (*bigquery.Client, func()) {
	t.Helper()

	bqs, err := server.New(server.TempStorage)
	if err != nil {
		t.Fatal(err)
	}
	testServer := bqs.TestServer()

	client, err := bigquery.NewClient(ctx, "test", option.WithEndpoint(testServer.URL), option.WithoutAuthentication())
	if err != nil {
		// Don't leak the test server's listener when the client can't be built.
		testServer.Close()
		t.Fatal(err)
	}

	cleanup := func() {
		client.Close()
		testServer.Close()
	}

	// Release the client and the server if a later setup step aborts the test.
	// t.Fatal stops execution via runtime.Goexit, which still runs deferred calls.
	ready := false
	defer func() {
		if !ready {
			cleanup()
		}
	}()

	schema := []*types.Column{
		types.NewColumn("id", types.INTEGER),
		types.NewColumn(
			"items",
			types.RECORD,
			types.ColumnMode(types.RepeatedMode),
			types.ColumnFields(
				types.NewColumn("name", types.STRING),
				types.NewColumn("skip", types.BOOLEAN),
				types.NewColumn("enabled", types.BOOLEAN),
			),
		),
	}

	rows := mustUnmarshalRows(t, `[
  {
    "id": 1,
    "items": [
      {"name": "item-1", "skip": false, "enabled": true}
    ]
  }
]`)

	if err := bqs.Load(server.StructSource(types.NewProject(
		"test",
		types.NewDataset("dataset", types.NewTable("repro", schema, types.Data(rows))),
	))); err != nil {
		t.Fatal(err)
	}

	ready = true
	return client, cleanup
}

// mustUnmarshalRows decodes data, a JSON array of objects, into one generic
// map per object and fails the test immediately if the JSON is invalid.
// Values use encoding/json's default types (numbers become float64).
func mustUnmarshalRows(t *testing.T, data string) []map[string]any {
	t.Helper()
	var decoded []map[string]any
	err := json.Unmarshal([]byte(data), &decoded)
	if err != nil {
		t.Fatal(err)
	}
	return decoded
}

// queryRows runs sql and returns every result row, failing the test
// immediately if the query or row iteration returns an error.
func queryRows(t *testing.T, ctx context.Context, client *bigquery.Client, sql string) [][]bigquery.Value {
	t.Helper()
	rows, err := queryRowsE(ctx, client, sql)
	if err == nil {
		return rows
	}
	t.Fatalf("query failed: %v", err)
	return nil
}

// queryRowsE runs sql through client and collects all result rows.
// On any error from starting the query or advancing the iterator, it returns
// a nil slice together with that error.
func queryRowsE(ctx context.Context, client *bigquery.Client, sql string) ([][]bigquery.Value, error) {
	it, err := client.Query(sql).Read(ctx)
	if err != nil {
		return nil, err
	}

	var collected [][]bigquery.Value
	for {
		var row []bigquery.Value
		switch nextErr := it.Next(&row); {
		case nextErr == iterator.Done:
			// Normal end of the result set.
			return collected, nil
		case nextErr != nil:
			return nil, nextErr
		}
		collected = append(collected, row)
	}
}

// assertJSONContains fails the test at the first fragment in parts that does
// not occur as a substring of value. It is a plain substring check; the JSON
// is not parsed.
func assertJSONContains(t *testing.T, value string, parts ...string) {
	t.Helper()
	for idx := 0; idx < len(parts); idx++ {
		fragment := parts[idx]
		if strings.Contains(value, fragment) {
			continue
		}
		t.Fatalf("JSON %q does not contain %q", value, fragment)
	}
}

// assertRows fails the test immediately, reporting the first difference found
// by rowsDiff, when got does not match want.
func assertRows(t *testing.T, got, want [][]bigquery.Value) {
	t.Helper()
	diff := rowsDiff(got, want)
	if diff == "" {
		return
	}
	t.Fatal(diff)
}

// rowsDiff compares query results row by row and cell by cell (via
// valuesEqual). It returns a human-readable description of the first
// difference, or "" when got matches want exactly.
func rowsDiff(got, want [][]bigquery.Value) string {
	if len(got) != len(want) {
		return fmt.Sprintf("row count mismatch: got %#v, want %#v", got, want)
	}
	for r, wantRow := range want {
		gotRow := got[r]
		if len(gotRow) != len(wantRow) {
			return fmt.Sprintf("column count mismatch at row %d: got %#v, want %#v", r, gotRow, wantRow)
		}
		for c, wantCell := range wantRow {
			if valuesEqual(gotRow[c], wantCell) {
				continue
			}
			return fmt.Sprintf("value mismatch at row %d column %d: got %#v, want %#v; all rows got %#v", r, c, gotRow[c], wantCell, got)
		}
	}
	return ""
}

// valuesEqual reports whether got and want hold the same value.
//
// If either side is a []bigquery.Value, both must be slices of equal length,
// and their elements are compared recursively. Recursion matters because an
// element may itself be a []bigquery.Value; comparing two such slices with ==
// through interfaces would panic at runtime. All other values are compared
// with ==.
//
// NOTE(review): values of other uncomparable dynamic types (e.g. maps) would
// still panic in the == comparison; widen this if such values appear.
func valuesEqual(got, want bigquery.Value) bool {
	gotValues, gotOK := got.([]bigquery.Value)
	wantValues, wantOK := want.([]bigquery.Value)
	if !gotOK && !wantOK {
		return got == want
	}
	if !gotOK || !wantOK || len(gotValues) != len(wantValues) {
		return false
	}
	for i := range wantValues {
		if !valuesEqual(gotValues[i], wantValues[i]) {
			return false
		}
	}
	return true
}

Anything else we need to know?

Although I used BOOLEAN as an example, the issue appears to occur regardless of the field's data type.
I hope this can be resolved soon.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions