diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index eb20e3e979..784bfd776c 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -342,6 +342,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( AvroEnableWatermark: oldConfig.AvroEnableWatermark, AvroDecimalHandlingMode: oldConfig.AvroDecimalHandlingMode, AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode, + AvroIncludeBeforeValue: oldConfig.AvroIncludeBeforeValue, EncodingFormat: oldConfig.EncodingFormat, } } @@ -614,6 +615,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { AvroEnableWatermark: oldConfig.AvroEnableWatermark, AvroDecimalHandlingMode: oldConfig.AvroDecimalHandlingMode, AvroBigintUnsignedHandlingMode: oldConfig.AvroBigintUnsignedHandlingMode, + AvroIncludeBeforeValue: oldConfig.AvroIncludeBeforeValue, EncodingFormat: oldConfig.EncodingFormat, } } @@ -1192,6 +1194,7 @@ type CodecConfig struct { AvroEnableWatermark *bool `json:"avro_enable_watermark,omitempty"` AvroDecimalHandlingMode *string `json:"avro_decimal_handling_mode,omitempty"` AvroBigintUnsignedHandlingMode *string `json:"avro_bigint_unsigned_handling_mode,omitempty"` + AvroIncludeBeforeValue *bool `json:"avro_include_before_value,omitempty"` EncodingFormat *string `json:"encoding_format,omitempty"` } diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 38b7290fe6..b1ceba1237 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -292,6 +292,7 @@ const ( "avro-enable-watermark": true, "avro-decimal-handling-mode": "string", "avro-bigint-unsigned-handling-mode": "string", + "avro-include-before-value": true, "encoding-format": "json" }, "large-message-handle": { @@ -469,6 +470,7 @@ const ( "avro-enable-watermark": true, "avro-decimal-handling-mode": "string", "avro-bigint-unsigned-handling-mode": "string", + "avro-include-before-value": true, "encoding-format": "json" }, "large-message-handle": { diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 84c8322767..641f202950 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -103,6 +103,7 @@ func TestReplicaConfigMarshal(t *testing.T) { AvroEnableWatermark: aws.Bool(true), AvroDecimalHandlingMode: aws.String("string"), AvroBigintUnsignedHandlingMode: aws.String("string"), + AvroIncludeBeforeValue: aws.Bool(true), EncodingFormat: aws.String("json"), }, LargeMessageHandle: &LargeMessageHandleConfig{ diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 9b2f77b601..0a61a2d05b 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -408,6 +408,7 @@ type CodecConfig struct { AvroEnableWatermark *bool `toml:"avro-enable-watermark" json:"avro-enable-watermark"` AvroDecimalHandlingMode *string `toml:"avro-decimal-handling-mode" json:"avro-decimal-handling-mode,omitempty"` AvroBigintUnsignedHandlingMode *string `toml:"avro-bigint-unsigned-handling-mode" json:"avro-bigint-unsigned-handling-mode,omitempty"` + AvroIncludeBeforeValue *bool `toml:"avro-include-before-value" json:"avro-include-before-value,omitempty"` EncodingFormat *string `toml:"encoding-format" json:"encoding-format,omitempty"` } diff --git a/pkg/sink/codec/avro/avro.go b/pkg/sink/codec/avro/avro.go index 52cc83da4c..b4cd7a1edf 100644 --- a/pkg/sink/codec/avro/avro.go +++ b/pkg/sink/codec/avro/avro.go @@ -184,6 +184,12 @@ func (a *BatchEncoder) encodeValue(ctx context.Context, topic string, e *model.R log.Error("avro: converting value to native failed", zap.Error(err)) return nil, errors.Trace(err) } + if a.config.AvroIncludeBeforeValue { + native, err = a.nativeValueWithBeforeValue(native, e) + if err != nil { + return nil, errors.Trace(err) + } + } if a.config.EnableTiDBExtension { native = a.nativeValueWithExtension(native, e) } @@ -326,6 +332,41 @@ func getOperation(e *model.RowChangedEvent) string { return "" } +func beforeValueRecordName(tableName model.TableName) string { + return common.SanitizeName(tableName.Table) + "_before" +} + +func (a *BatchEncoder) beforeValueRecordFullName(tableName model.TableName) string { + namespace := getAvroNamespace(a.namespace, tableName.Schema) + if namespace == "" { + return beforeValueRecordName(tableName) + } + return namespace + "." + beforeValueRecordName(tableName) +} + +func (a *BatchEncoder) nativeValueWithBeforeValue( + native map[string]interface{}, + e *model.RowChangedEvent, +) (map[string]interface{}, error) { + if !e.IsUpdate() { + native[ticdcBefore] = goavro.Union("null", nil) + return native, nil + } + + input := avroEncodeInput{ + TableInfo: e.TableInfo, + columns: e.PreColumns, + colInfos: e.TableInfo.GetColInfosForRowChangedEvent(), + } + before, err := a.columns2AvroData(input) + if err != nil { + log.Error("avro: converting before value to native failed", zap.Error(err)) + return nil, errors.Trace(err) + } + native[ticdcBefore] = goavro.Union(a.beforeValueRecordFullName(e.TableInfo.TableName), before) + return native, nil +} + func (a *BatchEncoder) nativeValueWithExtension( native map[string]interface{}, e *model.RowChangedEvent, @@ -354,6 +395,7 @@ const ( tidbOp = "_tidb_op" tidbCommitTs = "_tidb_commit_ts" tidbPhysicalTime = "_tidb_commit_physical_time" + ticdcBefore = "_ticdc_before" // row level checksum related fields tidbRowLevelChecksum = "_tidb_row_level_checksum" @@ -522,6 +564,25 @@ func (a *BatchEncoder) schemaWithExtension( return top } +func (a *BatchEncoder) schemaWithBeforeValue( + top *avroSchemaTop, + tableName model.TableName, + input avroEncodeInput, +) (*avroSchemaTop, error) { + beforeValue, err := a.columns2AvroSchema(tableName, input) + if err != nil { + return nil, err + } + beforeValue.Name = beforeValueRecordName(tableName) + + top.Fields = append(top.Fields, map[string]interface{}{ + "name": ticdcBefore, + "type": []interface{}{"null", beforeValue}, + "default": nil, + }) + return top, nil +} + func (a *BatchEncoder) columns2AvroSchema(tableName model.TableName, input avroEncodeInput) (*avroSchemaTop, error) { top := &avroSchemaTop{ Tp: "record", @@ -588,6 +649,13 @@ func (a *BatchEncoder) value2AvroSchema(tableName model.TableName, input avroEnc return "", err } + if a.config.AvroIncludeBeforeValue { + top, err = a.schemaWithBeforeValue(top, tableName, input) + if err != nil { + return "", err + } + } + if a.config.EnableTiDBExtension { top = a.schemaWithExtension(top) } diff --git a/pkg/sink/codec/avro/avro_test.go b/pkg/sink/codec/avro/avro_test.go index 54b11ae4cc..87375fe734 100644 --- a/pkg/sink/codec/avro/avro_test.go +++ b/pkg/sink/codec/avro/avro_test.go @@ -243,6 +243,87 @@ func TestAvroEncode(t *testing.T) { } } +func TestAvroEncodeIncludeBeforeValue(t *testing.T) { + codecConfig := common.NewConfig(config.ProtocolAvro) + codecConfig.EnableTiDBExtension = true + codecConfig.AvroIncludeBeforeValue = true + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + encoder, err := SetupEncoderAndSchemaRegistry4Testing(ctx, codecConfig) + defer TeardownEncoderAndSchemaRegistry4Testing() + require.NoError(t, err) + require.NotNil(t, encoder) + + tableColumns := []*model.Column{ + {Name: "id", Type: mysql.TypeLong, Flag: model.HandleKeyFlag | model.PrimaryKeyFlag}, + {Name: "name", Type: mysql.TypeVarchar}, + {Name: "age", Type: mysql.TypeLong}, + } + tableInfo := model.BuildTableInfo("test", "person", tableColumns, [][]int{{0}}) + beforeColumns := []*model.Column{ + {Name: "id", Value: int64(1)}, + {Name: "name", Value: "old"}, + {Name: "age", Value: int64(18)}, + } + afterColumns := []*model.Column{ + {Name: "id", Value: int64(1)}, + {Name: "name", Value: "new"}, + {Name: "age", Value: int64(20)}, + } + event := &model.RowChangedEvent{ + CommitTs: 1024, + TableInfo: tableInfo, + PreColumns: model.Columns2ColumnDatas(beforeColumns, tableInfo), + Columns: model.Columns2ColumnDatas(afterColumns, tableInfo), + } + + topic := "default" + bin, err := encoder.encodeValue(ctx, topic, event) + require.NoError(t, err) + + cid, data, err := extractConfluentSchemaIDAndBinaryData(bin) + require.NoError(t, err) + + avroValueCodec, err := encoder.schemaM.Lookup(ctx, topic, schemaID{confluentSchemaID: cid}) + require.NoError(t, err) + + res, _, err := avroValueCodec.NativeFromBinary(data) + require.NoError(t, err) + require.NotNil(t, res) + + m, ok := res.(map[string]interface{}) + require.True(t, ok) + require.Equal(t, int32(20), m["age"]) + require.Equal(t, "new", m["name"]) + + beforeUnion, ok := m[ticdcBefore].(map[string]interface{}) + require.True(t, ok) + before, ok := beforeUnion[encoder.beforeValueRecordFullName(tableInfo.TableName)].(map[string]interface{}) + require.True(t, ok) + require.Equal(t, int32(18), before["age"]) + require.Equal(t, "old", before["name"]) + + key, err := encoder.encodeKey(ctx, topic, event) + require.NoError(t, err) + decoder := NewDecoder(codecConfig, encoder.schemaM, topic, nil) + err = decoder.AddKeyValue(key, bin) + require.NoError(t, err) + + messageType, exist, err := decoder.HasNext() + require.NoError(t, err) + require.True(t, exist) + require.Equal(t, model.MessageTypeRow, messageType) + + decoded, err := decoder.NextRowChangedEvent() + require.NoError(t, err) + require.True(t, decoded.IsUpdate()) + require.Equal(t, uint64(1024), decoded.CommitTs) + require.Equal(t, "old", decoded.GetPreColumns()[1].Value) + require.Equal(t, "new", decoded.GetColumns()[1].Value) +} + func TestAvroEnvelope(t *testing.T) { t.Parallel() cManager := &confluentSchemaManager{} diff --git a/pkg/sink/codec/avro/decoder.go b/pkg/sink/codec/avro/decoder.go index a8474e238f..47ca10b2a1 100644 --- a/pkg/sink/codec/avro/decoder.go +++ b/pkg/sink/codec/avro/decoder.go @@ -182,6 +182,93 @@ func assembleEvent( return nil, errors.New("schema fields should be a map") } + columns, err := avroData2Columns(keyMap, valueMap, fields) + if err != nil { + return nil, errors.Trace(err) + } + + beforeMap, hasBefore, err := extractBeforeValueMap(valueMap) + if err != nil { + return nil, errors.Trace(err) + } + var beforeColumns []*model.Column + if hasBefore { + beforeColumns, err = avroData2Columns(keyMap, beforeMap, fields) + if err != nil { + return nil, errors.Trace(err) + } + } + + // "namespace.schema" + namespace := schema["namespace"].(string) + schemaName := strings.Split(namespace, ".")[1] + tableName := schema["name"].(string) + + var commitTs int64 + if !isDelete { + o, ok := valueMap[tidbCommitTs] + if !ok { + return nil, errors.New("commit ts not found") + } + commitTs = o.(int64) + } + + event := new(model.RowChangedEvent) + event.CommitTs = uint64(commitTs) + pkNameSet := make(map[string]struct{}, len(keyMap)) + for name := range keyMap { + pkNameSet[name] = struct{}{} + } + event.TableInfo = model.BuildTableInfoWithPKNames4Test(schemaName, tableName, columns, pkNameSet) + + if isDelete { + event.PreColumns = model.Columns2ColumnDatas(columns, event.TableInfo) + } else { + event.Columns = model.Columns2ColumnDatas(columns, event.TableInfo) + if hasBefore { + event.PreColumns = model.Columns2ColumnDatas(beforeColumns, event.TableInfo) + } + } + + return event, nil +} + +func isAvroExtensionField(name string) bool { + switch name { + case tidbOp, tidbCommitTs, tidbPhysicalTime, tidbRowLevelChecksum, + tidbChecksumVersion, tidbCorrupted, ticdcBefore: + return true + default: + return false + } +} + +func extractBeforeValueMap(valueMap map[string]interface{}) (map[string]interface{}, bool, error) { + rawBefore, ok := valueMap[ticdcBefore] + if !ok || rawBefore == nil { + return nil, false, nil + } + + beforeUnion, ok := rawBefore.(map[string]interface{}) + if !ok { + return nil, false, errors.New("before value should be a map") + } + for unionName, value := range beforeUnion { + if unionName == "null" || value == nil { + return nil, false, nil + } + before, ok := value.(map[string]interface{}) + if !ok { + return nil, false, errors.New("before record should be a map") + } + return before, true, nil + } + return nil, false, nil +} + +func avroData2Columns( + keyMap, valueMap map[string]interface{}, fields []interface{}, +) ([]*model.Column, error) { columns := make([]*model.Column, 0, len(valueMap)) // fields is ordered by the column id, so iterate over it to build columns // it's also the order to calculate the checksum. @@ -191,10 +278,9 @@ func assembleEvent( return nil, errors.New("schema field should be a map") } - // `tidbOp` is the first extension field in the schema, - // it's not real columns, so break here. + // Extension fields are not real columns, so break here. colName := field["name"].(string) - if colName == tidbOp { + if isAvroExtensionField(colName) { break } @@ -241,36 +327,7 @@ func assembleEvent( } columns = append(columns, col) } - - // "namespace.schema" - namespace := schema["namespace"].(string) - schemaName := strings.Split(namespace, ".")[1] - tableName := schema["name"].(string) - - var commitTs int64 - if !isDelete { - o, ok := valueMap[tidbCommitTs] - if !ok { - return nil, errors.New("commit ts not found") - } - commitTs = o.(int64) - } - - event := new(model.RowChangedEvent) - event.CommitTs = uint64(commitTs) - pkNameSet := make(map[string]struct{}, len(keyMap)) - for name := range keyMap { - pkNameSet[name] = struct{}{} - } - event.TableInfo = model.BuildTableInfoWithPKNames4Test(schemaName, tableName, columns, pkNameSet) - - if isDelete { - event.PreColumns = model.Columns2ColumnDatas(columns, event.TableInfo) - } else { - event.Columns = model.Columns2ColumnDatas(columns, event.TableInfo) - } - - return event, nil + return columns, nil } func isCorrupted(valueMap map[string]interface{}) bool { diff --git a/pkg/sink/codec/common/config.go b/pkg/sink/codec/common/config.go index 93b85c708a..4d2d557e68 100644 --- a/pkg/sink/codec/common/config.go +++ b/pkg/sink/codec/common/config.go @@ -55,6 +55,7 @@ type Config struct { AvroDecimalHandlingMode string AvroBigintUnsignedHandlingMode string AvroGlueSchemaRegistry *config.GlueSchemaRegistryConfig + AvroIncludeBeforeValue bool // EnableWatermarkEvent set to true, avro encode DDL and checkpoint event // and send to the downstream kafka, they cannot be consumed by the confluent official consumer // and would cause error, so this is only used for ticdc internal testing purpose, should not be @@ -117,6 +118,7 @@ func NewConfig(protocol config.Protocol) *Config { AvroConfluentSchemaRegistry: "", AvroDecimalHandlingMode: "precise", AvroBigintUnsignedHandlingMode: "long", + AvroIncludeBeforeValue: false, AvroEnableWatermark: false, OnlyOutputUpdatedColumns: false, @@ -160,6 +162,7 @@ type urlConfig struct { MaxMessageBytes *int `form:"max-message-bytes"` AvroDecimalHandlingMode *string `form:"avro-decimal-handling-mode"` AvroBigintUnsignedHandlingMode *string `form:"avro-bigint-unsigned-handling-mode"` + AvroIncludeBeforeValue *bool `form:"include-before-value"` // AvroEnableWatermark is the option for enabling watermark in avro protocol // only used for internal testing, do not set this in the production environment since the @@ -209,6 +212,9 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er *urlParameter.AvroBigintUnsignedHandlingMode != "" { c.AvroBigintUnsignedHandlingMode = *urlParameter.AvroBigintUnsignedHandlingMode } + if urlParameter.AvroIncludeBeforeValue != nil && c.Protocol == config.ProtocolAvro { + c.AvroIncludeBeforeValue = *urlParameter.AvroIncludeBeforeValue + } if urlParameter.AvroEnableWatermark != nil { if c.EnableTiDBExtension && c.Protocol == config.ProtocolAvro { c.AvroEnableWatermark = *urlParameter.AvroEnableWatermark @@ -315,6 +321,7 @@ func mergeConfig( dest.AvroEnableWatermark = codecConfig.AvroEnableWatermark dest.AvroDecimalHandlingMode = codecConfig.AvroDecimalHandlingMode dest.AvroBigintUnsignedHandlingMode = codecConfig.AvroBigintUnsignedHandlingMode + dest.AvroIncludeBeforeValue = codecConfig.AvroIncludeBeforeValue dest.EncodingFormatType = codecConfig.EncodingFormat } } diff --git a/pkg/sink/codec/common/config_test.go b/pkg/sink/codec/common/config_test.go index 36ed54b441..a00c589750 100644 --- a/pkg/sink/codec/common/config_test.go +++ b/pkg/sink/codec/common/config_test.go @@ -36,6 +36,7 @@ func TestNewConfig(t *testing.T) { require.Equal(t, "precise", c.AvroDecimalHandlingMode) require.Equal(t, "long", c.AvroBigintUnsignedHandlingMode) require.Equal(t, "", c.AvroConfluentSchemaRegistry) + require.False(t, c.AvroIncludeBeforeValue) require.False(t, c.EnableRowChecksum) require.NotNil(t, c.LargeMessageHandle) } @@ -370,7 +371,7 @@ func TestMergeConfig(t *testing.T) { "protocol=avro&enable-tidb-extension=true&schema-registry=abc&" + "only-output-updated-columns=true&avro-enable-watermark=true&" + "avro-bigint-unsigned-handling-mode=ab&avro-decimal-handling-mode=cd&" + - "max-message-bytes=123&max-batch-size=456" + "include-before-value=true&max-message-bytes=123&max-batch-size=456" sinkURI, err := url.Parse(uri) require.NoError(t, err) @@ -381,6 +382,7 @@ func TestMergeConfig(t *testing.T) { require.Equal(t, "abc", c.AvroConfluentSchemaRegistry) require.True(t, c.OnlyOutputUpdatedColumns) require.True(t, c.AvroEnableWatermark) + require.True(t, c.AvroIncludeBeforeValue) require.False(t, c.ContentCompatible) require.Equal(t, "ab", c.AvroBigintUnsignedHandlingMode) require.Equal(t, "cd", c.AvroDecimalHandlingMode) @@ -402,6 +404,7 @@ func TestMergeConfig(t *testing.T) { AvroEnableWatermark: aws.Bool(true), AvroBigintUnsignedHandlingMode: aws.String("ab"), AvroDecimalHandlingMode: aws.String("cd"), + AvroIncludeBeforeValue: aws.Bool(true), EncodingFormat: aws.String("json"), }, LargeMessageHandle: &config.LargeMessageHandleConfig{ @@ -415,6 +418,7 @@ func TestMergeConfig(t *testing.T) { require.Equal(t, "abc", c.AvroConfluentSchemaRegistry) require.True(t, c.OnlyOutputUpdatedColumns) require.True(t, c.AvroEnableWatermark) + require.True(t, c.AvroIncludeBeforeValue) require.False(t, c.ContentCompatible) require.Equal(t, "ab", c.AvroBigintUnsignedHandlingMode) require.Equal(t, "cd", c.AvroDecimalHandlingMode) @@ -427,7 +431,7 @@ func TestMergeConfig(t *testing.T) { "protocol=avro&enable-tidb-extension=true&schema-registry=abc&" + "only-output-updated-columns=true&avro-enable-watermark=true&" + "avro-bigint-unsigned-handling-mode=ab&avro-decimal-handling-mode=cd&" + - "max-message-bytes=123&max-batch-size=456" + "include-before-value=true&max-message-bytes=123&max-batch-size=456" sinkURI, err = url.Parse(uri) require.NoError(t, err) replicaConfig.Sink.OnlyOutputUpdatedColumns = aws.Bool(false) @@ -441,6 +445,7 @@ func TestMergeConfig(t *testing.T) { AvroEnableWatermark: aws.Bool(false), AvroBigintUnsignedHandlingMode: aws.String("adb"), AvroDecimalHandlingMode: aws.String("cde"), + AvroIncludeBeforeValue: aws.Bool(false), EncodingFormat: aws.String("avro"), }, LargeMessageHandle: &config.LargeMessageHandleConfig{ @@ -455,6 +460,7 @@ func TestMergeConfig(t *testing.T) { require.Equal(t, "abc", c.AvroConfluentSchemaRegistry) require.True(t, c.OnlyOutputUpdatedColumns) require.True(t, c.AvroEnableWatermark) + require.True(t, c.AvroIncludeBeforeValue) require.False(t, c.ContentCompatible) require.Equal(t, "ab", c.AvroBigintUnsignedHandlingMode) require.Equal(t, "cd", c.AvroDecimalHandlingMode)