Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
AvroEnableWatermark: oldConfig.AvroEnableWatermark,
AvroDecimalHandlingMode: oldConfig.AvroDecimalHandlingMode,
AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode,
AvroIncludeBeforeValue: oldConfig.AvroIncludeBeforeValue,
EncodingFormat: oldConfig.EncodingFormat,
}
}
Expand Down Expand Up @@ -614,6 +615,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
AvroEnableWatermark: oldConfig.AvroEnableWatermark,
AvroDecimalHandlingMode: oldConfig.AvroDecimalHandlingMode,
AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode,
AvroIncludeBeforeValue: oldConfig.AvroIncludeBeforeValue,
EncodingFormat: oldConfig.EncodingFormat,
}
}
Expand Down Expand Up @@ -1192,6 +1194,7 @@ type CodecConfig struct {
AvroEnableWatermark *bool `json:"avro_enable_watermark,omitempty"`
AvroDecimalHandlingMode *string `json:"avro_decimal_handling_mode,omitempty"`
AvroBigintUnsignedHandlingMode *string `json:"avro_bigint_unsigned_handling_mode,omitempty"`
AvroIncludeBeforeValue *bool `json:"avro_include_before_value,omitempty"`
EncodingFormat *string `json:"encoding_format,omitempty"`
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ const (
"avro-enable-watermark": true,
"avro-decimal-handling-mode": "string",
"avro-bigint-unsigned-handling-mode": "string",
"avro-include-before-value": true,
"encoding-format": "json"
},
"large-message-handle": {
Expand Down Expand Up @@ -469,6 +470,7 @@ const (
"avro-enable-watermark": true,
"avro-decimal-handling-mode": "string",
"avro-bigint-unsigned-handling-mode": "string",
"avro-include-before-value": true,
"encoding-format": "json"
},
"large-message-handle": {
Expand Down
1 change: 1 addition & 0 deletions pkg/config/replica_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func TestReplicaConfigMarshal(t *testing.T) {
AvroEnableWatermark: aws.Bool(true),
AvroDecimalHandlingMode: aws.String("string"),
AvroBigintUnsignedHandlingMode: aws.String("string"),
AvroIncludeBeforeValue: aws.Bool(true),
EncodingFormat: aws.String("json"),
},
LargeMessageHandle: &LargeMessageHandleConfig{
Expand Down
1 change: 1 addition & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ type CodecConfig struct {
AvroEnableWatermark *bool `toml:"avro-enable-watermark" json:"avro-enable-watermark"`
AvroDecimalHandlingMode *string `toml:"avro-decimal-handling-mode" json:"avro-decimal-handling-mode,omitempty"`
AvroBigintUnsignedHandlingMode *string `toml:"avro-bigint-unsigned-handling-mode" json:"avro-bigint-unsigned-handling-mode,omitempty"`
AvroIncludeBeforeValue *bool `toml:"avro-include-before-value" json:"avro-include-before-value,omitempty"`
EncodingFormat *string `toml:"encoding-format" json:"encoding-format,omitempty"`
}

Expand Down
68 changes: 68 additions & 0 deletions pkg/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ func (a *BatchEncoder) encodeValue(ctx context.Context, topic string, e *model.R
log.Error("avro: converting value to native failed", zap.Error(err))
return nil, errors.Trace(err)
}
if a.config.AvroIncludeBeforeValue {
native, err = a.nativeValueWithBeforeValue(native, e)
if err != nil {
return nil, errors.Trace(err)
}
}
if a.config.EnableTiDBExtension {
native = a.nativeValueWithExtension(native, e)
}
Expand Down Expand Up @@ -326,6 +332,41 @@ func getOperation(e *model.RowChangedEvent) string {
return ""
}

// beforeValueRecordName derives the Avro record name used for a table's
// before-value sub-record: the sanitized table name plus a "_before" suffix.
func beforeValueRecordName(tableName model.TableName) string {
	sanitized := common.SanitizeName(tableName.Table)
	return sanitized + "_before"
}

// beforeValueRecordFullName returns the fully-qualified Avro name of the
// before-value record, prefixing the configured namespace when present.
func (a *BatchEncoder) beforeValueRecordFullName(tableName model.TableName) string {
	name := beforeValueRecordName(tableName)
	if ns := getAvroNamespace(a.namespace, tableName.Schema); ns != "" {
		name = ns + "." + name
	}
	return name
}

// nativeValueWithBeforeValue attaches the row's before image to the native
// Avro value under the _ticdc_before field. Only update events carry a
// meaningful pre-image; everything else encodes the null union branch.
func (a *BatchEncoder) nativeValueWithBeforeValue(
	native map[string]interface{},
	e *model.RowChangedEvent,
) (map[string]interface{}, error) {
	if !e.IsUpdate() {
		// Inserts and deletes have no "before" row: encode the null branch.
		native[ticdcBefore] = goavro.Union("null", nil)
		return native, nil
	}

	before, err := a.columns2AvroData(avroEncodeInput{
		TableInfo: e.TableInfo,
		columns:   e.PreColumns,
		colInfos:  e.TableInfo.GetColInfosForRowChangedEvent(),
	})
	if err != nil {
		log.Error("avro: converting before value to native failed", zap.Error(err))
		return nil, errors.Trace(err)
	}

	// goavro unions are keyed by the fully-qualified name of the chosen branch.
	native[ticdcBefore] = goavro.Union(a.beforeValueRecordFullName(e.TableInfo.TableName), before)
	return native, nil
}

func (a *BatchEncoder) nativeValueWithExtension(
native map[string]interface{},
e *model.RowChangedEvent,
Expand Down Expand Up @@ -354,6 +395,7 @@ const (
tidbOp = "_tidb_op"
tidbCommitTs = "_tidb_commit_ts"
tidbPhysicalTime = "_tidb_commit_physical_time"
ticdcBefore = "_ticdc_before"

// row level checksum related fields
tidbRowLevelChecksum = "_tidb_row_level_checksum"
Expand Down Expand Up @@ -522,6 +564,25 @@ func (a *BatchEncoder) schemaWithExtension(
return top
}

// schemaWithBeforeValue appends an optional _ticdc_before field to the value
// schema. The field is a ["null", <row record>] union defaulting to null, so
// payloads without a before image still decode successfully.
func (a *BatchEncoder) schemaWithBeforeValue(
	top *avroSchemaTop,
	tableName model.TableName,
	input avroEncodeInput,
) (*avroSchemaTop, error) {
	beforeSchema, err := a.columns2AvroSchema(tableName, input)
	if err != nil {
		return nil, err
	}
	// The nested record needs its own name so it does not clash with the
	// enclosing value record.
	beforeSchema.Name = beforeValueRecordName(tableName)

	field := map[string]interface{}{
		"name":    ticdcBefore,
		"type":    []interface{}{"null", beforeSchema},
		"default": nil,
	}
	top.Fields = append(top.Fields, field)
	return top, nil
}

func (a *BatchEncoder) columns2AvroSchema(tableName model.TableName, input avroEncodeInput) (*avroSchemaTop, error) {
top := &avroSchemaTop{
Tp: "record",
Expand Down Expand Up @@ -588,6 +649,13 @@ func (a *BatchEncoder) value2AvroSchema(tableName model.TableName, input avroEnc
return "", err
}

if a.config.AvroIncludeBeforeValue {
top, err = a.schemaWithBeforeValue(top, tableName, input)
if err != nil {
return "", err
}
}

if a.config.EnableTiDBExtension {
top = a.schemaWithExtension(top)
}
Expand Down
81 changes: 81 additions & 0 deletions pkg/sink/codec/avro/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,87 @@ func TestAvroEncode(t *testing.T) {
}
}

// TestAvroEncodeIncludeBeforeValue verifies that enabling
// AvroIncludeBeforeValue embeds the pre-image of an update event in the
// encoded value, and that the decoder round-trips it back into PreColumns.
func TestAvroEncodeIncludeBeforeValue(t *testing.T) {
	codecConfig := common.NewConfig(config.ProtocolAvro)
	codecConfig.EnableTiDBExtension = true
	codecConfig.AvroIncludeBeforeValue = true

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	encoder, err := SetupEncoderAndSchemaRegistry4Testing(ctx, codecConfig)
	defer TeardownEncoderAndSchemaRegistry4Testing()
	require.NoError(t, err)
	require.NotNil(t, encoder)

	cols := []*model.Column{
		{Name: "id", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag},
		{Name: "name", Type: mysql.TypeVarchar},
		{Name: "age", Type: mysql.TypeLong},
	}
	tableInfo := model.BuildTableInfo("test", "person", cols, [][]int{{0}})

	// Pre-image and post-image of a single updated row.
	oldRow := []*model.Column{
		{Name: "id", Value: int64(1)},
		{Name: "name", Value: "old"},
		{Name: "age", Value: int64(18)},
	}
	newRow := []*model.Column{
		{Name: "id", Value: int64(1)},
		{Name: "name", Value: "new"},
		{Name: "age", Value: int64(20)},
	}
	event := &model.RowChangedEvent{
		CommitTs:   1024,
		TableInfo:  tableInfo,
		PreColumns: model.Columns2ColumnDatas(oldRow, tableInfo),
		Columns:    model.Columns2ColumnDatas(newRow, tableInfo),
	}

	topic := "default"
	bin, err := encoder.encodeValue(ctx, topic, event)
	require.NoError(t, err)

	cid, payload, err := extractConfluentSchemaIDAndBinaryData(bin)
	require.NoError(t, err)

	valueCodec, err := encoder.schemaM.Lookup(ctx, topic, schemaID{confluentSchemaID: cid})
	require.NoError(t, err)

	native, _, err := valueCodec.NativeFromBinary(payload)
	require.NoError(t, err)
	require.NotNil(t, native)

	// The after image sits at the top level of the decoded record.
	valueMap, ok := native.(map[string]interface{})
	require.True(t, ok)
	require.Equal(t, int32(20), valueMap["age"])
	require.Equal(t, "new", valueMap["name"])

	// The before image is wrapped in a union keyed by its record full name.
	beforeUnion, ok := valueMap[ticdcBefore].(map[string]interface{})
	require.True(t, ok)
	before, ok := beforeUnion[encoder.beforeValueRecordFullName(tableInfo.TableName)].(map[string]interface{})
	require.True(t, ok)
	require.Equal(t, int32(18), before["age"])
	require.Equal(t, "old", before["name"])

	// Round-trip through the decoder and check PreColumns is restored.
	key, err := encoder.encodeKey(ctx, topic, event)
	require.NoError(t, err)
	decoder := NewDecoder(codecConfig, encoder.schemaM, topic, nil)
	err = decoder.AddKeyValue(key, bin)
	require.NoError(t, err)

	messageType, exist, err := decoder.HasNext()
	require.NoError(t, err)
	require.True(t, exist)
	require.Equal(t, model.MessageTypeRow, messageType)

	decoded, err := decoder.NextRowChangedEvent()
	require.NoError(t, err)
	require.True(t, decoded.IsUpdate())
	require.Equal(t, uint64(1024), decoded.CommitTs)
	require.Equal(t, "old", decoded.GetPreColumns()[1].Value)
	require.Equal(t, "new", decoded.GetColumns()[1].Value)
}

func TestAvroEnvelope(t *testing.T) {
t.Parallel()
cManager := &confluentSchemaManager{}
Expand Down
123 changes: 90 additions & 33 deletions pkg/sink/codec/avro/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,93 @@ func assembleEvent(
return nil, errors.New("schema fields should be a map")
}

columns, err := avroData2Columns(keyMap, valueMap, fields)
if err != nil {
return nil, errors.Trace(err)
}

beforeMap, hasBefore, err := extractBeforeValueMap(valueMap)
if err != nil {
return nil, errors.Trace(err)
}
var beforeColumns []*model.Column
if hasBefore {
beforeColumns, err = avroData2Columns(keyMap, beforeMap, fields)
if err != nil {
return nil, errors.Trace(err)
}
}

// "namespace.schema"
namespace := schema["namespace"].(string)
schemaName := strings.Split(namespace, ".")[1]
tableName := schema["name"].(string)
Comment on lines +203 to +205
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

These type assertions and the string split operation are fragile and could lead to a panic if the schema metadata is unexpected (e.g., missing namespace or namespace without a dot). It is safer to use comma-ok assertions and check the length of the split result.

Suggested change
namespace := schema["namespace"].(string)
schemaName := strings.Split(namespace, ".")[1]
tableName := schema["name"].(string)
namespace, ok := schema["namespace"].(string)
if !ok {
return nil, errors.New("namespace not found or not a string")
}
parts := strings.Split(namespace, ".")
if len(parts) < 2 {
return nil, errors.New("invalid namespace format")
}
schemaName := parts[1]
tableName, ok := schema["name"].(string)
if !ok {
return nil, errors.New("table name not found or not a string")
}


var commitTs int64
if !isDelete {
o, ok := valueMap[tidbCommitTs]
if !ok {
return nil, errors.New("commit ts not found")
}
commitTs = o.(int64)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This type assertion could panic if the value is not an int64. Using a comma-ok assertion is safer.

commitTs, ok = o.(int64)
		if !ok {
			return nil, errors.New("commit ts is not an int64")
		}

}

event := new(model.RowChangedEvent)
event.CommitTs = uint64(commitTs)
pkNameSet := make(map[string]struct{}, len(keyMap))
for name := range keyMap {
pkNameSet[name] = struct{}{}
}
event.TableInfo = model.BuildTableInfoWithPKNames4Test(schemaName, tableName, columns, pkNameSet)

if isDelete {
event.PreColumns = model.Columns2ColumnDatas(columns, event.TableInfo)
} else {
event.Columns = model.Columns2ColumnDatas(columns, event.TableInfo)
if hasBefore {
event.PreColumns = model.Columns2ColumnDatas(beforeColumns, event.TableInfo)
}
}

return event, nil
}

// isAvroExtensionField reports whether name is one of the TiCDC/TiDB
// metadata fields injected into the value schema rather than a real column.
func isAvroExtensionField(name string) bool {
	return name == tidbOp ||
		name == tidbCommitTs ||
		name == tidbPhysicalTime ||
		name == tidbRowLevelChecksum ||
		name == tidbChecksumVersion ||
		name == tidbCorrupted ||
		name == ticdcBefore
}

// extractBeforeValueMap pulls the decoded before image out of an Avro value
// map. It returns (before, true, nil) when the event carried a before record,
// (nil, false, nil) when the field is absent or the union holds the null
// branch, and an error when the payload has an unexpected shape.
func extractBeforeValueMap(valueMap map[string]interface{}) (map[string]interface{}, bool, error) {
	raw, ok := valueMap[ticdcBefore]
	if !ok || raw == nil {
		return nil, false, nil
	}

	union, ok := raw.(map[string]interface{})
	if !ok {
		return nil, false, errors.New("before value should be a map")
	}

	// goavro decodes a union as a single-entry map keyed by the branch name.
	for branch, v := range union {
		if branch == "null" || v == nil {
			return nil, false, nil
		}
		record, ok := v.(map[string]interface{})
		if !ok {
			return nil, false, errors.New("before record should be a map")
		}
		return record, true, nil
	}
	return nil, false, nil
}

func avroData2Columns(
keyMap, valueMap map[string]interface{}, fields []interface{},
) ([]*model.Column, error) {
columns := make([]*model.Column, 0, len(valueMap))
// fields is ordered by the column id, so iterate over it to build columns
// it's also the order to calculate the checksum.
Expand All @@ -191,10 +278,9 @@ func assembleEvent(
return nil, errors.New("schema field should be a map")
}

// `tidbOp` is the first extension field in the schema,
// it's not real columns, so break here.
// Extension fields are not real columns, so break here.
colName := field["name"].(string)
if colName == tidbOp {
if isAvroExtensionField(colName) {
break
}

Expand Down Expand Up @@ -241,36 +327,7 @@ func assembleEvent(
}
columns = append(columns, col)
}

// "namespace.schema"
namespace := schema["namespace"].(string)
schemaName := strings.Split(namespace, ".")[1]
tableName := schema["name"].(string)

var commitTs int64
if !isDelete {
o, ok := valueMap[tidbCommitTs]
if !ok {
return nil, errors.New("commit ts not found")
}
commitTs = o.(int64)
}

event := new(model.RowChangedEvent)
event.CommitTs = uint64(commitTs)
pkNameSet := make(map[string]struct{}, len(keyMap))
for name := range keyMap {
pkNameSet[name] = struct{}{}
}
event.TableInfo = model.BuildTableInfoWithPKNames4Test(schemaName, tableName, columns, pkNameSet)

if isDelete {
event.PreColumns = model.Columns2ColumnDatas(columns, event.TableInfo)
} else {
event.Columns = model.Columns2ColumnDatas(columns, event.TableInfo)
}

return event, nil
return columns, nil
}

func isCorrupted(valueMap map[string]interface{}) bool {
Expand Down
Loading
Loading