diff --git a/go.mod b/go.mod index aceb438e44..fa5bb582f6 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/go-mysql-org/go-mysql v1.13.0 github.com/go-oauth2/oauth2/v4 v4.5.4 github.com/go-ozzo/ozzo-validation/v4 v4.3.0 - github.com/go-sql-driver/mysql v1.7.1 + github.com/go-sql-driver/mysql v1.8.1 github.com/goccy/go-json v0.10.5 github.com/gogo/gateway v1.1.0 github.com/gogo/protobuf v1.3.2 diff --git a/go.sum b/go.sum index 1cc75fead9..52c62b3488 100644 --- a/go.sum +++ b/go.sum @@ -452,8 +452,8 @@ github.com/go-resty/resty/v2 v2.11.0 h1:i7jMfNOJYMp69lq7qozJP+bjgzfAzeOhuGlyDrqx github.com/go-resty/resty/v2 v2.11.0/go.mod h1:iiP/OpA0CkcL3IGt1O0+/SIItFUbkkyw5BGXiVdTu+A= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= -github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI= -github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM= github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= diff --git a/pkg/logutil/log_test.go b/pkg/logutil/log_test.go index 479c8240fd..fce0b4f8e0 100644 --- a/pkg/logutil/log_test.go +++ b/pkg/logutil/log_test.go @@ -22,6 +22,7 @@ import ( "path/filepath" "regexp" "runtime" + "strings" "sync" "testing" "time" @@ -236,10 +237,15 @@ func TestMySQLLogger(t *testing.T) { defer cancel() err = db.PingContext(ctx) require.Error(t, err) - // Log: [ERROR] [packets.go:37] ["unexpected EOF"] [component="[mysql]"] - require.Contains(t, buffer.Stripped(), "[ERROR]") - require.Contains(t, buffer.Stripped(), "packets.go") - require.Contains(t, buffer.Stripped(), "unexpected EOF") - require.Contains(t, buffer.Stripped(), "[mysql]") + // Log: [ERROR] [] ["unexpected EOF"] [component="[mysql]"] + // Caller file depends on go-sql-driver/mysql version: v1.7.x logs from + // packets.go; v1.8.x logs from connection.go. + out := buffer.Stripped() + require.Contains(t, out, "[ERROR]") + require.True(t, + strings.Contains(out, "packets.go") || + strings.Contains(out, "connection.go")) + require.Contains(t, out, "unexpected EOF") + require.Contains(t, out, "[mysql]") wg.Wait() } diff --git a/pkg/sink/codec/common/helper.go b/pkg/sink/codec/common/helper.go index b42599a7c4..ee2f4cfbc6 100644 --- a/pkg/sink/codec/common/helper.go +++ b/pkg/sink/codec/common/helper.go @@ -34,6 +34,7 @@ type ColumnsHolder struct { Values []interface{} ValuePointers []interface{} Types []*sql.ColumnType + rawValues [][]byte } func newColumnHolder(rows *sql.Rows) (*ColumnsHolder, error) { @@ -42,17 +43,23 @@ func newColumnHolder(rows *sql.Rows) (*ColumnsHolder, error) { return nil, errors.Trace(err) } - values := make([]interface{}, len(columnTypes)) - valuePointers := make([]interface{}, len(columnTypes)) - for i := range values { - valuePointers[i] = &values[i] - } - - return &ColumnsHolder{ - Values: values, - ValuePointers: valuePointers, + n := len(columnTypes) + h := &ColumnsHolder{ + Values: make([]interface{}, n), + ValuePointers: make([]interface{}, n), Types: columnTypes, - }, nil + rawValues: make([][]byte, n), + } + // Scan into *[]byte so every column lands as []byte regardless of its + // underlying Go type. go-sql-driver/mysql v1.8 returns typed values + // (int64, float64, ...) when scanning into *interface{}; downstream + // decoders type-assert .([]uint8). Routing through database/sql's + // convertAssign for *[]byte formats numbers/bools/etc. as their textual + // representation, preserving the pre-v1.8 contract. + for i := range h.rawValues { + h.ValuePointers[i] = &h.rawValues[i] + } + return h, nil } // Length return the column count @@ -256,6 +263,16 @@ func MustSnapshotQuery( zap.String("schema", schema), zap.String("table", table), zap.Uint64("commitTs", commitTs), zap.Error(err)) } + for i, b := range holder.rawValues { + // A nil byte slice from Scan signals SQL NULL. Preserve that as an + // untyped nil so downstream `value == nil` checks behave the same + // way they did before the *[]byte scan was introduced. + if b == nil { + holder.Values[i] = nil + } else { + holder.Values[i] = b + } + } } return holder }