Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
vendor/
dist

# Locally built emulator binary (produced by `make emulator/build`)
/bigquery-emulator

# Local output of the e2e conformance suite
/test/e2e/.out/
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
vendor/
dist

# Locally built emulator binary (produced by `make emulator/build`)
/bigquery-emulator

# Local output of the e2e conformance suite
/test/e2e/.out/

# Local reproduction scripts and logs
/repro/

# Local development log
devlog.md
238 changes: 121 additions & 117 deletions internal/connection/manager.go
Original file line number Diff line number Diff line change
@@ -1,117 +1,121 @@
package connection

import (
"context"
"database/sql"
"fmt"

"github.com/goccy/googlesqlite"
)

type Manager struct {
db *sql.DB
}

func NewManager(db *sql.DB) *Manager {
return &Manager{db: db}
}

func (m *Manager) Connection(ctx context.Context, projectID, datasetID string) (*Conn, error) {
if projectID == "" {
return nil, fmt.Errorf("invalid projectID. projectID is empty")
}
conn, err := m.db.Conn(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get connection: %w", err)
}
return &Conn{
ProjectID: projectID,
DatasetID: datasetID,
Conn: conn,
}, nil
}

type Tx struct {
tx *sql.Tx
conn *Conn
committed bool
}

func (t *Tx) Tx() *sql.Tx {
return t.tx
}

func (t *Tx) RollbackIfNotCommitted() error {
if t.committed {
return nil
}
defer t.conn.Conn.Close()
return t.tx.Rollback()
}

func (t *Tx) Commit() error {
if err := t.tx.Commit(); err != nil {
return err
}
t.committed = true
t.conn.Conn.Close()
return nil
}

func (t *Tx) SetProjectAndDataset(projectID, datasetID string) {
t.conn.ProjectID = projectID
t.conn.DatasetID = datasetID
}

func (t *Tx) MetadataRepoMode() error {
if err := t.conn.Conn.Raw(func(c interface{}) error {
gsqlConn, ok := c.(*googlesqlite.Conn)
if !ok {
return fmt.Errorf("failed to get *googlesqlite.Conn from %T", c)
}
_ = gsqlConn.SetNamePath([]string{})
return nil
}); err != nil {
return fmt.Errorf("failed to setup connection: %w", err)
}
return nil
}

func (t *Tx) ContentRepoMode() error {
if err := t.conn.Conn.Raw(func(c interface{}) error {
gsqlConn, ok := c.(*googlesqlite.Conn)
if !ok {
return fmt.Errorf("failed to get *googlesqlite.Conn from %T", c)
}
if t.conn.DatasetID == "" {
_ = gsqlConn.SetNamePath([]string{t.conn.ProjectID})
} else {
_ = gsqlConn.SetNamePath([]string{t.conn.ProjectID, t.conn.DatasetID})
}
const maxNamePath = 3 // projectID and datasetID and tableID
gsqlConn.SetMaxNamePath(maxNamePath)
return nil
}); err != nil {
return fmt.Errorf("failed to setup connection: %w", err)
}
return nil
}

type Conn struct {
ProjectID string
DatasetID string
Conn *sql.Conn
}

func (c *Conn) Begin(ctx context.Context) (*Tx, error) {
tx, err := c.Conn.BeginTx(ctx, nil)
if err != nil {
// The pooled connection is owned by the Tx once BeginTx succeeds and
// is released by Commit/RollbackIfNotCommitted. When BeginTx fails no
// Tx is created, so the connection must be returned to the pool here
// or it leaks for the lifetime of the process.
_ = c.Conn.Close()
return nil, err
}
return &Tx{tx: tx, conn: c}, nil
}
package connection

import (
"context"
"database/sql"
"fmt"

"github.com/goccy/googlesqlite"
)

type Manager struct {
db *sql.DB
}

func NewManager(db *sql.DB) *Manager {
return &Manager{db: db}
}

func (m *Manager) Connection(ctx context.Context, projectID, datasetID string) (*Conn, error) {
if projectID == "" {
return nil, fmt.Errorf("invalid projectID. projectID is empty")
}
conn, err := m.db.Conn(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get connection: %w", err)
}
return &Conn{
ProjectID: projectID,
DatasetID: datasetID,
Conn: conn,
}, nil
}

type Tx struct {
tx *sql.Tx
conn *Conn
committed bool
}

func (t *Tx) Tx() *sql.Tx {
return t.tx
}

func (t *Tx) RollbackIfNotCommitted() error {
if t.committed {
return nil
}
defer t.conn.Conn.Close()
return t.tx.Rollback()
}

func (t *Tx) Commit() error {
if err := t.tx.Commit(); err != nil {
return err
}
t.committed = true
t.conn.Conn.Close()
return nil
}

func (t *Tx) SetProjectAndDataset(projectID, datasetID string) {
t.conn.ProjectID = projectID
t.conn.DatasetID = datasetID
}

func (t *Tx) MetadataRepoMode() error {
if err := t.conn.Conn.Raw(func(c interface{}) error {
gsqlConn, ok := c.(*googlesqlite.Conn)
if !ok {
return fmt.Errorf("failed to get *googlesqlite.Conn from %T", c)
}
_ = gsqlConn.SetNamePath([]string{})
return nil
}); err != nil {
return fmt.Errorf("failed to setup connection: %w", err)
}
return nil
}

func (t *Tx) ContentRepoMode() error {
if err := t.conn.Conn.Raw(func(c interface{}) error {
gsqlConn, ok := c.(*googlesqlite.Conn)
if !ok {
return fmt.Errorf("failed to get *googlesqlite.Conn from %T", c)
}
if t.conn.ProjectID == "" {
return fmt.Errorf("invalid projectID. projectID is empty")
}
namePath := []string{t.conn.ProjectID}
if t.conn.DatasetID != "" {
namePath = append(namePath, t.conn.DatasetID)
}
_ = gsqlConn.SetNamePath(namePath)

const maxNamePath = 3 // projectID and datasetID and tableID
gsqlConn.SetMaxNamePath(maxNamePath)
return nil
}); err != nil {
return fmt.Errorf("failed to setup connection: %w", err)
}
return nil
}

type Conn struct {
ProjectID string
DatasetID string
Conn *sql.Conn
}

func (c *Conn) Begin(ctx context.Context) (*Tx, error) {
tx, err := c.Conn.BeginTx(ctx, nil)
if err != nil {
// The pooled connection is owned by the Tx once BeginTx succeeds and
// is released by Commit/RollbackIfNotCommitted. When BeginTx fails no
// Tx is created, so the connection must be returned to the pool here
// or it leaks for the lifetime of the process.
_ = c.Conn.Close()
return nil, err
}
return &Tx{tx: tx, conn: c}, nil
}
4 changes: 2 additions & 2 deletions internal/contentdata/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func (r *Repository) convertValueToCell(value interface{}, schema *bigqueryv2.Ta

func (r *Repository) CreateOrReplaceTable(ctx context.Context, tx *connection.Tx, projectID, datasetID string, table *types.Table) error {
tx.SetProjectAndDataset(projectID, datasetID)
if err := tx.ContentRepoMode(); err != nil {
if err := tx.MetadataRepoMode(); err != nil {
return err
}
defer func() {
Expand Down Expand Up @@ -577,7 +577,7 @@ func (r *Repository) AddTableData(ctx context.Context, tx *connection.Tx, projec
return nil
}
tx.SetProjectAndDataset(projectID, datasetID)
if err := tx.ContentRepoMode(); err != nil {
if err := tx.MetadataRepoMode(); err != nil {
return err
}
defer func() {
Expand Down
Loading