sink(ticdc): add before field for avro protocol#12632
Conversation
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
before field fro avro protocolbefore field for avro protocol
There was a problem hiding this comment.
Code Review
This pull request introduces the AvroIncludeBeforeValue configuration option, which allows the Avro codec to include pre-update column values in the encoded output. The implementation includes updates to the API models, configuration parsing, and both the Avro encoder and decoder, along with corresponding unit tests. Review feedback identifies several locations in the decoder where unsafe type assertions and string operations could lead to panics, recommending the use of comma-ok assertions and more rigorous validation of schema metadata.
| namespace := schema["namespace"].(string) | ||
| schemaName := strings.Split(namespace, ".")[1] | ||
| tableName := schema["name"].(string) |
There was a problem hiding this comment.
These type assertions and the string split operation are fragile and could lead to a panic if the schema metadata is unexpected (e.g., missing namespace or namespace without a dot). It is safer to use comma-ok assertions and check the length of the split result.
| namespace := schema["namespace"].(string) | |
| schemaName := strings.Split(namespace, ".")[1] | |
| tableName := schema["name"].(string) | |
| namespace, ok := schema["namespace"].(string) | |
| if !ok { | |
| return nil, errors.New("namespace not found or not a string") | |
| } | |
| parts := strings.Split(namespace, ".") | |
| if len(parts) < 2 { | |
| return nil, errors.New("invalid namespace format") | |
| } | |
| schemaName := parts[1] | |
| tableName, ok := schema["name"].(string) | |
| if !ok { | |
| return nil, errors.New("table name not found or not a string") | |
| } |
| if !ok { | ||
| return nil, errors.New("commit ts not found") | ||
| } | ||
| commitTs = o.(int64) |
|
/retest |
|
@wk989898: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
What problem does this PR solve?
Issue Number: close #12631 close #11824
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note