Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ dist

# Local output of the e2e conformance suite
/test/e2e/.out/

# Local reproduction scripts and logs
/repro/

# Local development log (not for version control)
devlog.md
312 changes: 195 additions & 117 deletions internal/connection/manager.go
Original file line number Diff line number Diff line change
@@ -1,117 +1,195 @@
package connection

import (
"context"
"database/sql"
"fmt"

"github.com/goccy/googlesqlite"
)

// Manager hands out database connections tagged with a project and dataset.
type Manager struct {
// db is the database handle from which Connection obtains connections.
db *sql.DB
}

// NewManager returns a Manager that obtains its connections from db.
func NewManager(db *sql.DB) *Manager {
	manager := Manager{db: db}
	return &manager
}

// Connection reserves a single connection from the manager's database handle
// and tags it with the given project and dataset IDs.
//
// It returns an error when projectID is empty or when the database handle
// cannot provide a connection. datasetID is stored as given and is not
// validated here.
func (m *Manager) Connection(ctx context.Context, projectID, datasetID string) (*Conn, error) {
	if len(projectID) == 0 {
		return nil, fmt.Errorf("invalid projectID. projectID is empty")
	}

	sqlConn, err := m.db.Conn(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get connection: %w", err)
	}

	scoped := &Conn{Conn: sqlConn, ProjectID: projectID, DatasetID: datasetID}
	return scoped, nil
}

// Tx pairs a *sql.Tx with the Conn it was started on.
type Tx struct {
// tx is the underlying database/sql transaction.
tx *sql.Tx
// conn is the connection the transaction runs on; Commit and
// RollbackIfNotCommitted close it.
conn *Conn
// committed is set to true by Commit once the commit has succeeded.
committed bool
}

// Tx exposes the wrapped *sql.Tx.
func (t *Tx) Tx() *sql.Tx {
	underlying := t.tx
	return underlying
}

// RollbackIfNotCommitted rolls the transaction back and closes its connection,
// unless Commit has already succeeded, in which case it does nothing and
// returns nil. The result of the rollback is returned; the result of closing
// the connection is discarded.
func (t *Tx) RollbackIfNotCommitted() error {
	if !t.committed {
		// Deferred so the connection is closed only after Rollback returns.
		defer t.conn.Conn.Close()
		return t.tx.Rollback()
	}
	return nil
}

// Commit commits the transaction. On success it marks the Tx as committed and
// closes the underlying connection. On failure the error is returned and the
// connection is left open, so a later RollbackIfNotCommitted still closes it.
func (t *Tx) Commit() error {
	err := t.tx.Commit()
	if err != nil {
		return err
	}
	t.committed = true
	// The commit has already succeeded, so the close result is not reported.
	_ = t.conn.Conn.Close()
	return nil
}

// SetProjectAndDataset overwrites the project and dataset IDs stored on the
// transaction's connection. ContentRepoMode reads these values when it builds
// the name path.
func (t *Tx) SetProjectAndDataset(projectID, datasetID string) {
	target := t.conn
	target.ProjectID, target.DatasetID = projectID, datasetID
}

// MetadataRepoMode sets an empty name path on the underlying googlesqlite
// driver connection.
//
// It returns an error when the driver connection is not a *googlesqlite.Conn
// or when the raw connection cannot be accessed. The value returned by
// SetNamePath is discarded.
func (t *Tx) MetadataRepoMode() error {
	clearNamePath := func(c interface{}) error {
		if gsqlConn, ok := c.(*googlesqlite.Conn); ok {
			_ = gsqlConn.SetNamePath([]string{})
			return nil
		}
		return fmt.Errorf("failed to get *googlesqlite.Conn from %T", c)
	}

	err := t.conn.Conn.Raw(clearNamePath)
	if err != nil {
		return fmt.Errorf("failed to setup connection: %w", err)
	}
	return nil
}

// ContentRepoMode sets the name path of the underlying googlesqlite driver
// connection to the connection's project ID, followed by its dataset ID when
// that is non-empty, and then sets the maximum name path length to 3.
//
// It returns an error when the driver connection is not a *googlesqlite.Conn
// or when the raw connection cannot be accessed. The value returned by
// SetNamePath is discarded.
func (t *Tx) ContentRepoMode() error {
	const maxNamePath = 3 // projectID and datasetID and tableID

	applyNamePath := func(c interface{}) error {
		gsqlConn, ok := c.(*googlesqlite.Conn)
		if !ok {
			return fmt.Errorf("failed to get *googlesqlite.Conn from %T", c)
		}
		// The project is always present; the dataset is appended only when set.
		namePath := []string{t.conn.ProjectID}
		if t.conn.DatasetID != "" {
			namePath = append(namePath, t.conn.DatasetID)
		}
		_ = gsqlConn.SetNamePath(namePath)
		gsqlConn.SetMaxNamePath(maxNamePath)
		return nil
	}

	err := t.conn.Conn.Raw(applyNamePath)
	if err != nil {
		return fmt.Errorf("failed to setup connection: %w", err)
	}
	return nil
}

// Conn is a database connection tagged with the project and dataset it is
// used for.
type Conn struct {
// ProjectID is the project the connection is scoped to.
ProjectID string
// DatasetID is the dataset the connection is scoped to; it may be empty.
DatasetID string
// Conn is the underlying database/sql connection.
Conn *sql.Conn
}

// Begin starts a transaction on the connection with default options and
// returns it wrapped in a Tx that refers back to this Conn.
//
// If the transaction cannot be started, the connection is closed and the
// BeginTx error is returned.
func (c *Conn) Begin(ctx context.Context) (*Tx, error) {
	sqlTx, err := c.Conn.BeginTx(ctx, nil)
	if err == nil {
		return &Tx{tx: sqlTx, conn: c}, nil
	}
	// A successful BeginTx hands the connection to the Tx, which closes it in
	// Commit/RollbackIfNotCommitted. No Tx exists on this path, so the
	// connection has to be closed here or it would never be returned to the
	// pool.
	_ = c.Conn.Close()
	return nil, err
}
package connection

import (
"context"
"database/sql"
"fmt"
"reflect"
"unsafe"

"github.com/goccy/googlesqlite"
)

// Manager hands out database connections tagged with a project and dataset.
type Manager struct {
// db is the database handle from which Connection obtains connections.
db *sql.DB
}

// NewManager returns a Manager that obtains its connections from db.
func NewManager(db *sql.DB) *Manager {
	manager := Manager{db: db}
	return &manager
}

// Connection reserves a single connection from the manager's database handle
// and tags it with the given project and dataset IDs.
//
// It returns an error when projectID is empty or when the database handle
// cannot provide a connection. datasetID is stored as given and is not
// validated here.
func (m *Manager) Connection(ctx context.Context, projectID, datasetID string) (*Conn, error) {
	if len(projectID) == 0 {
		return nil, fmt.Errorf("invalid projectID. projectID is empty")
	}

	sqlConn, err := m.db.Conn(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get connection: %w", err)
	}

	scoped := &Conn{Conn: sqlConn, ProjectID: projectID, DatasetID: datasetID}
	return scoped, nil
}

// Tx pairs a *sql.Tx with the Conn it was started on.
type Tx struct {
// tx is the underlying database/sql transaction.
tx *sql.Tx
// conn is the connection the transaction runs on; Commit and
// RollbackIfNotCommitted close it.
conn *Conn
// committed is set to true by Commit once the commit has succeeded.
committed bool
}

// Tx exposes the wrapped *sql.Tx.
func (t *Tx) Tx() *sql.Tx {
	underlying := t.tx
	return underlying
}

// RollbackIfNotCommitted rolls the transaction back and closes its connection,
// unless Commit has already succeeded, in which case it does nothing and
// returns nil. The result of the rollback is returned; the result of closing
// the connection is discarded.
func (t *Tx) RollbackIfNotCommitted() error {
	if !t.committed {
		// Deferred so the connection is closed only after Rollback returns.
		defer t.conn.Conn.Close()
		return t.tx.Rollback()
	}
	return nil
}

// Commit commits the transaction. On success it marks the Tx as committed and
// closes the underlying connection. On failure the error is returned and the
// connection is left open, so a later RollbackIfNotCommitted still closes it.
func (t *Tx) Commit() error {
	err := t.tx.Commit()
	if err != nil {
		return err
	}
	t.committed = true
	// The commit has already succeeded, so the close result is not reported.
	_ = t.conn.Conn.Close()
	return nil
}

// SetProjectAndDataset overwrites the project and dataset IDs stored on the
// transaction's connection. ContentRepoMode reads these values when it builds
// the name path.
func (t *Tx) SetProjectAndDataset(projectID, datasetID string) {
	target := t.conn
	target.ProjectID, target.DatasetID = projectID, datasetID
}

// MetadataRepoMode sets an empty name path on the underlying googlesqlite
// driver connection.
//
// It returns an error when the driver connection is not a *googlesqlite.Conn
// or when the raw connection cannot be accessed. The value returned by
// SetNamePath is discarded.
func (t *Tx) MetadataRepoMode() error {
	clearNamePath := func(c interface{}) error {
		if gsqlConn, ok := c.(*googlesqlite.Conn); ok {
			_ = gsqlConn.SetNamePath([]string{})
			return nil
		}
		return fmt.Errorf("failed to get *googlesqlite.Conn from %T", c)
	}

	err := t.conn.Conn.Raw(clearNamePath)
	if err != nil {
		return fmt.Errorf("failed to setup connection: %w", err)
	}
	return nil
}

// ContentRepoMode sets the name path of the underlying googlesqlite driver
// connection to the connection's project ID, followed by its dataset ID when
// that is non-empty, and then sets the maximum name path length to 3.
//
// It returns an error when the driver connection is not a *googlesqlite.Conn,
// when the connection's project ID is empty, or when the raw connection cannot
// be accessed. The value returned by SetNamePath is discarded.
func (t *Tx) ContentRepoMode() error {
	const maxNamePath = 3 // projectID and datasetID and tableID

	applyNamePath := func(c interface{}) error {
		gsqlConn, ok := c.(*googlesqlite.Conn)
		if !ok {
			return fmt.Errorf("failed to get *googlesqlite.Conn from %T", c)
		}
		if len(t.conn.ProjectID) == 0 {
			return fmt.Errorf("invalid projectID. projectID is empty")
		}

		// The project is always present; the dataset is included only when set.
		var namePath []string
		if t.conn.DatasetID == "" {
			namePath = []string{t.conn.ProjectID}
		} else {
			namePath = []string{t.conn.ProjectID, t.conn.DatasetID}
		}
		_ = gsqlConn.SetNamePath(namePath)
		gsqlConn.SetMaxNamePath(maxNamePath)
		return nil
	}

	err := t.conn.Conn.Raw(applyNamePath)
	if err != nil {
		return fmt.Errorf("failed to setup connection: %w", err)
	}
	return nil
}

// Conn is a database connection tagged with the project and dataset it is
// used for.
type Conn struct {
// ProjectID is the project the connection is scoped to.
ProjectID string
// DatasetID is the dataset the connection is scoped to; it may be empty.
DatasetID string
// Conn is the underlying database/sql connection.
Conn *sql.Conn
}

// RawExec runs a statement on the *sql.Conn that innerSQLConn extracts from
// the googlesqlite driver connection, instead of going through the
// googlesqlite driver connection itself. This is intended to bypass
// googlesqlite's own query parsing.
//
// It returns an error when the driver connection is not a *googlesqlite.Conn,
// when the inner connection cannot be extracted, or when the statement itself
// fails.
func (t *Tx) RawExec(ctx context.Context, query string, args ...interface{}) error {
	var execErr error

	run := func(c interface{}) error {
		gsqlConn, ok := c.(*googlesqlite.Conn)
		if !ok {
			return fmt.Errorf("unexpected driver connection type %T", c)
		}
		innerConn, err := innerSQLConn(gsqlConn)
		if err != nil {
			return err
		}
		// The statement's own error is carried out through execErr rather
		// than through the callback's return value.
		_, execErr = innerConn.ExecContext(ctx, query, args...)
		return nil
	}

	if rawErr := t.conn.Conn.Raw(run); rawErr != nil {
		return rawErr
	}
	return execErr
}
Comment on lines +115 to +132

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit 5b3b693. Extracted the reflect+unsafe access into a dedicated innerSQLConn helper that validates f.Kind() == reflect.Ptr and f.Type() == reflect.TypeOf((*sql.Conn)(nil)) before the pointer reinterpretation. A type mismatch now returns a clear error instead of causing a panic or memory corruption.


// RawQueryRow runs a single-row query on the *sql.Conn that innerSQLConn
// extracts from the googlesqlite driver connection, instead of going through
// the googlesqlite driver connection itself. This is intended to bypass
// googlesqlite's own query parsing.
//
// The returned *sql.Row carries any query error, which is reported when the
// row is scanned. The caller must scan the row before the next call. A non-nil
// error is returned only when the driver connection is not a
// *googlesqlite.Conn or the inner connection cannot be extracted.
func (t *Tx) RawQueryRow(ctx context.Context, query string, args ...interface{}) (*sql.Row, error) {
	var result *sql.Row

	run := func(c interface{}) error {
		gsqlConn, ok := c.(*googlesqlite.Conn)
		if !ok {
			return fmt.Errorf("unexpected driver connection type %T", c)
		}
		innerConn, err := innerSQLConn(gsqlConn)
		if err != nil {
			return err
		}
		result = innerConn.QueryRowContext(ctx, query, args...)
		return nil
	}

	if rawErr := t.conn.Conn.Raw(run); rawErr != nil {
		return nil, rawErr
	}
	return result, nil
}

// innerSQLConn extracts the unexported *sql.Conn held in the "conn" field of a
// *googlesqlite.Conn using reflection.
//
// Every precondition of the final unsafe pointer reinterpretation is checked
// first: the argument is non-nil, its pointee is a struct, the field exists,
// and the field's type is exactly *sql.Conn. A layout change in googlesqlite
// therefore surfaces as a clear error rather than a panic or memory
// corruption.
//
// It returns an error when any of these checks fails or when the extracted
// connection is nil.
func innerSQLConn(gsqlConn *googlesqlite.Conn) (*sql.Conn, error) {
	// Elem on a nil pointer yields the zero Value, and FieldByName on the zero
	// Value panics, so nil has to be rejected before any reflection happens.
	if gsqlConn == nil {
		return nil, fmt.Errorf("googlesqlite.Conn is nil")
	}
	v := reflect.ValueOf(gsqlConn).Elem()
	// FieldByName panics on anything that is not a struct.
	if v.Kind() != reflect.Struct {
		return nil, fmt.Errorf("googlesqlite.Conn is a %s, not a struct", v.Kind())
	}
	f := v.FieldByName("conn")
	if !f.IsValid() {
		return nil, fmt.Errorf("googlesqlite.Conn has no 'conn' field")
	}
	wantType := reflect.TypeOf((*sql.Conn)(nil))
	if f.Kind() != reflect.Ptr || f.Type() != wantType {
		return nil, fmt.Errorf("googlesqlite.Conn.conn has unexpected type %s (want %s)", f.Type(), wantType)
	}
	// The field is unexported, so it cannot be read through Interface(); its
	// address is reinterpreted instead. The uintptr from UnsafeAddr is
	// converted to unsafe.Pointer within the same expression.
	inner := *(**sql.Conn)(unsafe.Pointer(f.UnsafeAddr()))
	// Calling methods on a nil *sql.Conn would panic in the caller.
	if inner == nil {
		return nil, fmt.Errorf("googlesqlite.Conn.conn is nil")
	}
	return inner, nil
}

// WithGSQLConn invokes f with the *googlesqlite.Conn behind the transaction's
// connection, giving code outside this package access to the driver
// connection (for example to manipulate googlesqlite internals).
//
// It returns f's error, or an error when the driver connection is not a
// *googlesqlite.Conn or the raw connection cannot be accessed.
func (t *Tx) WithGSQLConn(f func(*googlesqlite.Conn) error) error {
	adapter := func(c interface{}) error {
		if gsqlConn, ok := c.(*googlesqlite.Conn); ok {
			return f(gsqlConn)
		}
		return fmt.Errorf("unexpected driver connection type %T", c)
	}
	return t.conn.Conn.Raw(adapter)
}

// Begin starts a transaction on the connection with default options and
// returns it wrapped in a Tx that refers back to this Conn.
//
// If the transaction cannot be started, the connection is closed and the
// BeginTx error is returned.
func (c *Conn) Begin(ctx context.Context) (*Tx, error) {
	sqlTx, err := c.Conn.BeginTx(ctx, nil)
	if err == nil {
		return &Tx{tx: sqlTx, conn: c}, nil
	}
	// A successful BeginTx hands the connection to the Tx, which closes it in
	// Commit/RollbackIfNotCommitted. No Tx exists on this path, so the
	// connection has to be closed here or it would never be returned to the
	// pool.
	_ = c.Conn.Close()
	return nil, err
}
4 changes: 2 additions & 2 deletions internal/contentdata/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ func (r *Repository) convertValueToCell(value interface{}, schema *bigqueryv2.Ta

func (r *Repository) CreateOrReplaceTable(ctx context.Context, tx *connection.Tx, projectID, datasetID string, table *types.Table) error {
tx.SetProjectAndDataset(projectID, datasetID)
if err := tx.ContentRepoMode(); err != nil {
if err := tx.MetadataRepoMode(); err != nil {
return err
}
Comment on lines +544 to 546

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is part of the shared commits from PR #492 (fix for #470). The switch to MetadataRepoMode() in these calls is intentional: the table is being registered in the metadata catalog during the streaming-insert resolution path, not written to the content store. PR #492 contains the full explanation and test coverage for this change.

defer func() {
Expand Down Expand Up @@ -577,7 +577,7 @@ func (r *Repository) AddTableData(ctx context.Context, tx *connection.Tx, projec
return nil
}
tx.SetProjectAndDataset(projectID, datasetID)
if err := tx.ContentRepoMode(); err != nil {
if err := tx.MetadataRepoMode(); err != nil {
return err
}
Comment on lines +580 to 582

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above — this is part of the shared commits from PR #492. Addressed there.

defer func() {
Expand Down
Loading