Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions server/storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,20 +785,27 @@ According to google documentation (https://pkg.go.dev/cloud.google.com/go/bigque
every table has a special stream named ‘_default’ to which data can be written. This stream doesn’t need to be created using CreateWriteStream

Here we create the default stream and add it to map in case it not exists yet, the GetWriteStreamRequest given as second
argument should have Name in this format: projects/<projectId>/datasets/<datasetId>/tables/<tableId>/streams/_default
argument should have Name in one of these formats:
- projects/<projectId>/datasets/<datasetId>/tables/<tableId>/streams/_default
- projects/<projectId>/datasets/<datasetId>/tables/<tableId>/_default
The Java client library uses the shorter format (without /streams/).
*/
func (s *storageWriteServer) createDefaultStream(ctx context.Context, req *storagepb.GetWriteStreamRequest) (*storagepb.WriteStream, error) {
streamId := req.Name
suffix := "_default"
streams := "/streams/"
if !strings.HasSuffix(streamId, suffix) {
return nil, fmt.Errorf("unexpected stream id: %s, expected '%s' suffix", streamId, suffix)
}
index := strings.LastIndex(streamId, streams)
if index == -1 {
return nil, fmt.Errorf("unexpected stream id: %s, expected containg '%s'", streamId, streams)
return nil, fmt.Errorf("unexpected stream id: %s, expected ‘%s’ suffix", streamId, suffix)
}
// Extract the table path by stripping the stream suffix.
// Handle both "…/streams/_default" and "…/_default" formats.
var streamPart string
if index := strings.LastIndex(streamId, "/streams/"); index != -1 {
streamPart = streamId[:index]
} else if index := strings.LastIndex(streamId, "/_default"); index != -1 {
streamPart = streamId[:index]
} else {
return nil, fmt.Errorf("unexpected stream id format: %s", streamId)
}
streamPart := streamId[:index]
writeStreamReq := &storagepb.CreateWriteStreamRequest{
Parent: streamPart,
WriteStream: &storagepb.WriteStream{
Expand Down
108 changes: 108 additions & 0 deletions server/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,114 @@ func TestStorageWrite(t *testing.T) {
}
}

// TestDefaultStreamPathFormats verifies that the _default stream can be
// accessed using both path formats:
// - projects/{p}/datasets/{d}/tables/{t}/streams/_default (Go client format)
// - projects/{p}/datasets/{d}/tables/{t}/_default (Java client format)
//
// This is a regression test for https://github.com/goccy/bigquery-emulator/issues/246
func TestDefaultStreamPathFormats(t *testing.T) {
for _, test := range []struct {
name string
streamPath string // path format for the _default stream
}{
{
name: "with_streams_prefix",
streamPath: "projects/%s/datasets/%s/tables/%s/streams/_default",
},
{
name: "without_streams_prefix",
streamPath: "projects/%s/datasets/%s/tables/%s/_default",
},
} {
t.Run(test.name, func(t *testing.T) {
const (
projectID = "test"
datasetID = "test"
tableID = "default_stream_test"
)

ctx := context.Background()
bqServer, err := server.New(server.TempStorage)
if err != nil {
t.Fatal(err)
}
if err := bqServer.Load(
server.StructSource(
types.NewProject(
projectID,
types.NewDataset(
datasetID,
types.NewTable(
tableID,
[]*types.Column{
types.NewColumn("string_col", types.STRING),
types.NewColumn("int64_col", types.INT64),
},
nil,
),
),
),
),
); err != nil {
t.Fatal(err)
}
testServer := bqServer.TestServer()
defer func() {
testServer.Close()
bqServer.Close()
}()
opts, err := testServer.GRPCClientOptions(ctx)
if err != nil {
t.Fatal(err)
}

// Use the raw BigQueryWriteClient (not managedwriter) so we
// control the exact stream path sent in the request.
writeClient, err := bqStorage.NewBigQueryWriteClient(ctx, opts...)
if err != nil {
t.Fatal(err)
}
defer writeClient.Close()

streamName := fmt.Sprintf(test.streamPath, projectID, datasetID, tableID)

// GetWriteStream should succeed and return the table schema.
writeStream, err := writeClient.GetWriteStream(ctx, &storagepb.GetWriteStreamRequest{
Name: streamName,
})
if err != nil {
t.Fatalf("GetWriteStream(%s) failed: %v", streamName, err)
}
if writeStream.GetTableSchema() == nil {
t.Fatal("GetWriteStream returned nil TableSchema")
}
if len(writeStream.GetTableSchema().GetFields()) == 0 {
t.Fatal("GetWriteStream returned empty schema fields")
}
t.Logf("GetWriteStream succeeded: %d schema fields", len(writeStream.GetTableSchema().GetFields()))

// Verify we can also query the table via REST to confirm it's accessible.
bqClient, err := bigquery.NewClient(
ctx,
projectID,
option.WithEndpoint(testServer.URL),
option.WithoutAuthentication(),
)
if err != nil {
t.Fatal(err)
}
defer bqClient.Close()

iter := bqClient.Dataset(datasetID).Table(tableID).Read(ctx)
rowCount := countRows(t, iter)
if rowCount != 0 {
t.Fatalf("expected 0 rows in fresh table, got %d", rowCount)
}
})
}
}

func countRows(t *testing.T, iter *bigquery.RowIterator) int {
var resultRowCount int
for {
Expand Down