From 917511d7dfeb2228bd6eb5464ad78ae793499095 Mon Sep 17 00:00:00 2001 From: Alex Sweet Date: Fri, 27 Mar 2026 12:58:24 -0700 Subject: [PATCH 1/2] Fix _default stream path format for Java client compatibility The Java BigQuery client library sends the _default stream name as: projects/{project}/datasets/{dataset}/tables/{table}/_default But createDefaultStream only accepted the format with /streams/: projects/{project}/datasets/{dataset}/tables/{table}/streams/_default This caused GetWriteStream to fail with "unexpected stream id" when called from Java clients, making the Storage Write API unusable. Handle both path formats by checking for /streams/_default first, then falling back to /_default. Fixes #246 Fixes #342 Co-Authored-By: Claude Opus 4.6 (1M context) --- server/storage_handler.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/server/storage_handler.go b/server/storage_handler.go index 7e2ace2c9..82a80c86b 100644 --- a/server/storage_handler.go +++ b/server/storage_handler.go @@ -785,20 +785,27 @@ According to google documentation (https://pkg.go.dev/cloud.google.com/go/bigque every table has a special stream named ‘_default’ to which data can be written. This stream doesn’t need to be created using CreateWriteStream Here we create the default stream and add it to map in case it not exists yet, the GetWriteStreamRequest given as second -argument should have Name in this format: projects//datasets//tables//streams/_default +argument should have Name in one of these formats: + - projects/{project}/datasets/{dataset}/tables/{table}/streams/_default + - projects/{project}/datasets/{dataset}/tables/{table}/_default +The Java client library uses the shorter format (without /streams/). 
*/ func (s *storageWriteServer) createDefaultStream(ctx context.Context, req *storagepb.GetWriteStreamRequest) (*storagepb.WriteStream, error) { streamId := req.Name suffix := "_default" - streams := "/streams/" if !strings.HasSuffix(streamId, suffix) { - return nil, fmt.Errorf("unexpected stream id: %s, expected '%s' suffix", streamId, suffix) - } - index := strings.LastIndex(streamId, streams) - if index == -1 { - return nil, fmt.Errorf("unexpected stream id: %s, expected containg '%s'", streamId, streams) + return nil, fmt.Errorf("unexpected stream id: %s, expected ‘%s’ suffix", streamId, suffix) + } + // Extract the table path by stripping the stream suffix. + // Handle both "…/streams/_default" and "…/_default" formats. + var streamPart string + if index := strings.LastIndex(streamId, "/streams/"); index != -1 { + streamPart = streamId[:index] + } else if index := strings.LastIndex(streamId, "/_default"); index != -1 { + streamPart = streamId[:index] + } else { + return nil, fmt.Errorf("unexpected stream id format: %s", streamId) } - streamPart := streamId[:index] writeStreamReq := &storagepb.CreateWriteStreamRequest{ Parent: streamPart, WriteStream: &storagepb.WriteStream{ From bf6b646bd956749ae6febf85c28ac5ebada963b3 Mon Sep 17 00:00:00 2001 From: Alex Sweet Date: Fri, 27 Mar 2026 14:02:50 -0700 Subject: [PATCH 2/2] Add test for _default stream path format compatibility Tests that GetWriteStream works with both path formats for the _default stream: - projects/{p}/datasets/{d}/tables/{t}/streams/_default - projects/{p}/datasets/{d}/tables/{t}/_default The Java BigQuery client library uses the shorter format (without /streams/), which was previously rejected by the emulator. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- server/storage_test.go | 108 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/server/storage_test.go b/server/storage_test.go index fa80bdf14..4b016cef1 100644 --- a/server/storage_test.go +++ b/server/storage_test.go @@ -640,6 +640,114 @@ func TestStorageWrite(t *testing.T) { } } +// TestDefaultStreamPathFormats verifies that the _default stream can be +// accessed using both path formats: +// - projects/{p}/datasets/{d}/tables/{t}/streams/_default (Go client format) +// - projects/{p}/datasets/{d}/tables/{t}/_default (Java client format) +// +// This is a regression test for https://github.com/goccy/bigquery-emulator/issues/246 +func TestDefaultStreamPathFormats(t *testing.T) { + for _, test := range []struct { + name string + streamPath string // path format for the _default stream + }{ + { + name: "with_streams_prefix", + streamPath: "projects/%s/datasets/%s/tables/%s/streams/_default", + }, + { + name: "without_streams_prefix", + streamPath: "projects/%s/datasets/%s/tables/%s/_default", + }, + } { + t.Run(test.name, func(t *testing.T) { + const ( + projectID = "test" + datasetID = "test" + tableID = "default_stream_test" + ) + + ctx := context.Background() + bqServer, err := server.New(server.TempStorage) + if err != nil { + t.Fatal(err) + } + if err := bqServer.Load( + server.StructSource( + types.NewProject( + projectID, + types.NewDataset( + datasetID, + types.NewTable( + tableID, + []*types.Column{ + types.NewColumn("string_col", types.STRING), + types.NewColumn("int64_col", types.INT64), + }, + nil, + ), + ), + ), + ), + ); err != nil { + t.Fatal(err) + } + testServer := bqServer.TestServer() + defer func() { + testServer.Close() + bqServer.Close() + }() + opts, err := testServer.GRPCClientOptions(ctx) + if err != nil { + t.Fatal(err) + } + + // Use the raw BigQueryWriteClient (not managedwriter) so we + // control the exact stream path sent in the request. 
+ writeClient, err := bqStorage.NewBigQueryWriteClient(ctx, opts...) + if err != nil { + t.Fatal(err) + } + defer writeClient.Close() + + streamName := fmt.Sprintf(test.streamPath, projectID, datasetID, tableID) + + // GetWriteStream should succeed and return the table schema. + writeStream, err := writeClient.GetWriteStream(ctx, &storagepb.GetWriteStreamRequest{ + Name: streamName, + }) + if err != nil { + t.Fatalf("GetWriteStream(%s) failed: %v", streamName, err) + } + if writeStream.GetTableSchema() == nil { + t.Fatal("GetWriteStream returned nil TableSchema") + } + if len(writeStream.GetTableSchema().GetFields()) == 0 { + t.Fatal("GetWriteStream returned empty schema fields") + } + t.Logf("GetWriteStream succeeded: %d schema fields", len(writeStream.GetTableSchema().GetFields())) + + // Verify we can also query the table via REST to confirm it's accessible. + bqClient, err := bigquery.NewClient( + ctx, + projectID, + option.WithEndpoint(testServer.URL), + option.WithoutAuthentication(), + ) + if err != nil { + t.Fatal(err) + } + defer bqClient.Close() + + iter := bqClient.Dataset(datasetID).Table(tableID).Read(ctx) + rowCount := countRows(t, iter) + if rowCount != 0 { + t.Fatalf("expected 0 rows in fresh table, got %d", rowCount) + } + }) + } +} + func countRows(t *testing.T, iter *bigquery.RowIterator) int { var resultRowCount int for {