diff --git a/server/storage_handler.go b/server/storage_handler.go index 7e2ace2c9..82a80c86b 100644 --- a/server/storage_handler.go +++ b/server/storage_handler.go @@ -785,20 +785,27 @@ According to google documentation (https://pkg.go.dev/cloud.google.com/go/bigque every table has a special stream named ‘_default’ to which data can be written. This stream doesn’t need to be created using CreateWriteStream Here we create the default stream and add it to map in case it not exists yet, the GetWriteStreamRequest given as second -argument should have Name in this format: projects//datasets//tables//streams/_default +argument should have Name in one of these formats: + - projects//datasets//tables//streams/_default + - projects//datasets//tables//_default +The Java client library uses the shorter format (without /streams/). */ func (s *storageWriteServer) createDefaultStream(ctx context.Context, req *storagepb.GetWriteStreamRequest) (*storagepb.WriteStream, error) { streamId := req.Name suffix := "_default" - streams := "/streams/" if !strings.HasSuffix(streamId, suffix) { - return nil, fmt.Errorf("unexpected stream id: %s, expected '%s' suffix", streamId, suffix) - } - index := strings.LastIndex(streamId, streams) - if index == -1 { - return nil, fmt.Errorf("unexpected stream id: %s, expected containg '%s'", streamId, streams) + return nil, fmt.Errorf("unexpected stream id: %s, expected ‘%s’ suffix", streamId, suffix) + } + // Extract the table path by stripping the stream suffix. + // Handle both "…/streams/_default" and "…/_default" formats. + var streamPart string + if index := strings.LastIndex(streamId, "/streams/"); index != -1 { + streamPart = streamId[:index] + } else if index := strings.LastIndex(streamId, "/_default"); index != -1 { + streamPart = streamId[:index] + } else { + return nil, fmt.Errorf("unexpected stream id format: %s", streamId) } - streamPart := streamId[:index] writeStreamReq := &storagepb.CreateWriteStreamRequest{ Parent: streamPart, WriteStream: &storagepb.WriteStream{ diff --git a/server/storage_test.go b/server/storage_test.go index fa80bdf14..4b016cef1 100644 --- a/server/storage_test.go +++ b/server/storage_test.go @@ -640,6 +640,114 @@ func TestStorageWrite(t *testing.T) { } } +// TestDefaultStreamPathFormats verifies that the _default stream can be +// accessed using both path formats: +// - projects/{p}/datasets/{d}/tables/{t}/streams/_default (Go client format) +// - projects/{p}/datasets/{d}/tables/{t}/_default (Java client format) +// +// This is a regression test for https://github.com/goccy/bigquery-emulator/issues/246 +func TestDefaultStreamPathFormats(t *testing.T) { + for _, test := range []struct { + name string + streamPath string // path format for the _default stream + }{ + { + name: "with_streams_prefix", + streamPath: "projects/%s/datasets/%s/tables/%s/streams/_default", + }, + { + name: "without_streams_prefix", + streamPath: "projects/%s/datasets/%s/tables/%s/_default", + }, + } { + t.Run(test.name, func(t *testing.T) { + const ( + projectID = "test" + datasetID = "test" + tableID = "default_stream_test" + ) + + ctx := context.Background() + bqServer, err := server.New(server.TempStorage) + if err != nil { + t.Fatal(err) + } + if err := bqServer.Load( + server.StructSource( + types.NewProject( + projectID, + types.NewDataset( + datasetID, + types.NewTable( + tableID, + []*types.Column{ + types.NewColumn("string_col", types.STRING), + types.NewColumn("int64_col", types.INT64), + }, + nil, + ), + ), + ), + ), + ); err != nil { + t.Fatal(err) + } + testServer := bqServer.TestServer() + defer func() { + testServer.Close() + bqServer.Close() + }() + opts, err := testServer.GRPCClientOptions(ctx) + if err != nil { + t.Fatal(err) + } + + // Use the raw BigQueryWriteClient (not managedwriter) so we + // control the exact stream path sent in the request. + writeClient, err := bqStorage.NewBigQueryWriteClient(ctx, opts...) + if err != nil { + t.Fatal(err) + } + defer writeClient.Close() + + streamName := fmt.Sprintf(test.streamPath, projectID, datasetID, tableID) + + // GetWriteStream should succeed and return the table schema. + writeStream, err := writeClient.GetWriteStream(ctx, &storagepb.GetWriteStreamRequest{ + Name: streamName, + }) + if err != nil { + t.Fatalf("GetWriteStream(%s) failed: %v", streamName, err) + } + if writeStream.GetTableSchema() == nil { + t.Fatal("GetWriteStream returned nil TableSchema") + } + if len(writeStream.GetTableSchema().GetFields()) == 0 { + t.Fatal("GetWriteStream returned empty schema fields") + } + t.Logf("GetWriteStream succeeded: %d schema fields", len(writeStream.GetTableSchema().GetFields())) + + // Verify we can also query the table via REST to confirm it's accessible. + bqClient, err := bigquery.NewClient( + ctx, + projectID, + option.WithEndpoint(testServer.URL), + option.WithoutAuthentication(), + ) + if err != nil { + t.Fatal(err) + } + defer bqClient.Close() + + iter := bqClient.Dataset(datasetID).Table(tableID).Read(ctx) + rowCount := countRows(t, iter) + if rowCount != 0 { + t.Fatalf("expected 0 rows in fresh table, got %d", rowCount) + } + }) + } +} + func countRows(t *testing.T, iter *bigquery.RowIterator) int { var resultRowCount int for {