[DNM] kafka-consumer: Add logging for DDL and DML events to improve downstream write visibility #3671
base: master
Conversation
- Add logging for DDL events before writing to downstream when no DML events are pending
- Add logging for each DML event when writing to downstream in both DDL flush and watermark flush paths
- Move existing DML flush log to more appropriate location with additional details
- Include schema, table, commitTs, row count, and row data in logs for better observability

Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
[APPROVALNOTIFIER] This PR is NOT APPROVED.

Needs approval from an approver in each of these files. Approvers can indicate their approval by writing `/approve` in a comment. The full list of commands accepted by this bot can be found here.
Summary of Changes

Hello @tenfyzhong, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the observability of the Kafka consumer's writer component by integrating detailed logging for DDL and DML events.
Code Review
This pull request enhances observability by adding detailed logging for DDL and DML events. The changes are well-targeted and improve visibility into the data being written to the downstream sink.
My review includes a few suggestions:
- A high-severity concern regarding the logging of full DML row data at the `Info` level, which could impact performance and security. I've recommended removing this verbose field.
- A medium-severity suggestion to improve the readability of DDL event logs by logging the event type as a string.
- A medium-severity suggestion to refactor the new logging logic into helper functions to reduce code duplication and improve maintainability.
Overall, these are valuable additions for debugging and monitoring. Addressing the feedback will make the implementation more robust and production-ready.
| log.Info("write DML event to downstream", | ||
| zap.String("schema", e.TableInfo.GetSchemaName()), | ||
| zap.String("table", e.TableInfo.GetTableName()), | ||
| zap.Int64("tableID", e.GetTableID()), | ||
| zap.Uint64("commitTs", e.GetCommitTs()), | ||
| zap.Uint64("startTs", e.GetStartTs()), | ||
| zap.Int32("rowCount", e.Len()), | ||
| zap.String("rows", e.String())) |
A couple of points on this DML logging:

- (High Severity) Logging the full row content with `zap.String("rows", e.String())` at the `Info` level can cause significant performance issues and log bloat, and may leak sensitive data in production environments. It's strongly recommended to remove this field from the log statement. The existing metadata provides good visibility. If full row data is needed for debugging, it should be at the `Debug` level.
- (Maintainability) This logging logic is duplicated for the watermark flush path (lines 315-323). Extracting this into a shared helper function would improve code clarity and reduce redundancy; a sketch of one possible helper appears after the suggested change below.
log.Info("write DML event to downstream",
zap.String("schema", e.TableInfo.GetSchemaName()),
zap.String("table", e.TableInfo.GetTableName()),
zap.Int64("tableID", e.GetTableID()),
zap.Uint64("commitTs", e.GetCommitTs()),
zap.Uint64("startTs", e.GetStartTs()),
zap.Int32("rowCount", e.Len()))| log.Info("write DML event to downstream", | ||
| zap.String("schema", e.TableInfo.GetSchemaName()), | ||
| zap.String("table", e.TableInfo.GetTableName()), | ||
| zap.Int64("tableID", e.GetTableID()), | ||
| zap.Uint64("commitTs", e.GetCommitTs()), | ||
| zap.Uint64("startTs", e.GetStartTs()), | ||
| zap.Int32("rowCount", e.Len()), | ||
| zap.String("rows", e.String())) |
Logging the full DML event content via `e.String()` at the `Info` level can introduce significant performance overhead and log bloat, especially with large transactions or frequent updates. It may also expose sensitive data in production logs. It's recommended to remove `zap.String("rows", e.String())` to only log metadata about the DML event. If detailed row data is necessary for debugging, it should be logged at the `Debug` level (see the sketch after the suggested change below).
log.Info("write DML event to downstream",
zap.String("schema", e.TableInfo.GetSchemaName()),
zap.String("table", e.TableInfo.GetTableName()),
zap.Int64("tableID", e.GetTableID()),
zap.Uint64("commitTs", e.GetCommitTs()),
zap.Uint64("startTs", e.GetStartTs()),
zap.Int32("rowCount", e.Len()))| log.Info("write DDL event to downstream", | ||
| zap.String("schema", ddl.GetSchemaName()), | ||
| zap.String("table", ddl.GetTableName()), | ||
| zap.Uint64("commitTs", ddl.GetCommitTs()), | ||
| zap.String("query", ddl.Query), | ||
| zap.Any("type", ddl.Type)) |
A few suggestions for this logging block:

- The DDL event type is being logged as a raw byte value with `zap.Any("type", ddl.Type)`. It would be more readable to log its string representation using `ddl.GetDDLType()`, for example with `zap.Stringer("type", ddl.GetDDLType())`.
- This logging logic is duplicated later in the file (lines 215-221). To improve maintainability, consider extracting this into a helper function, for example `logDDLEvent(ddl *commonEvent.DDLEvent)`; a sketch of such a helper follows the suggested change below. A similar refactoring could be applied to the DML event logging.
| log.Info("write DDL event to downstream", | |
| zap.String("schema", ddl.GetSchemaName()), | |
| zap.String("table", ddl.GetTableName()), | |
| zap.Uint64("commitTs", ddl.GetCommitTs()), | |
| zap.String("query", ddl.Query), | |
| zap.Any("type", ddl.Type)) | |
| log.Info("write DDL event to downstream", | |
| zap.String("schema", ddl.GetSchemaName()), | |
| zap.String("table", ddl.GetTableName()), | |
| zap.Uint64("commitTs", ddl.GetCommitTs()), | |
| zap.String("query", ddl.Query), | |
| zap.Stringer("type", ddl.GetDDLType())) |
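For concreteness, a minimal sketch of the `logDDLEvent` helper named above. The `ddlEvent` struct is a stand-in invented for this sketch; the real helper would take `*commonEvent.DDLEvent` and call its accessors:

```go
package writer // hypothetical package name

import (
	"fmt"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// ddlEvent carries only the fields the log statement uses.
type ddlEvent struct {
	Schema, Table, Query string
	CommitTs             uint64
	Type                 fmt.Stringer // e.g. the model.ActionType from GetDDLType()
}

// logDDLEvent consolidates the two duplicated DDL log statements so any
// change to the logged fields only needs to happen in one place.
func logDDLEvent(d ddlEvent) {
	log.Info("write DDL event to downstream",
		zap.String("schema", d.Schema),
		zap.String("table", d.Table),
		zap.Uint64("commitTs", d.CommitTs),
		zap.String("query", d.Query),
		zap.Stringer("type", d.Type))
}
```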
| log.Info("write DDL event to downstream", | ||
| zap.String("schema", ddl.GetSchemaName()), | ||
| zap.String("table", ddl.GetTableName()), | ||
| zap.Uint64("commitTs", ddl.GetCommitTs()), | ||
| zap.String("query", ddl.Query), | ||
| zap.Any("type", ddl.Type)) |
The DDL event type is being logged as a raw byte value using `zap.Any("type", ddl.Type)`. This makes the log less readable as it will show a number instead of the action type. It would be more informative to log the string representation of the DDL type. You can achieve this by using `ddl.GetDDLType()`, which returns a `model.ActionType` that has a `String()` method. Using `zap.Stringer` is a good practice for this.
log.Info("write DDL event to downstream",
zap.String("schema", ddl.GetSchemaName()),
zap.String("table", ddl.GetTableName()),
zap.Uint64("commitTs", ddl.GetCommitTs()),
zap.String("query", ddl.Query),
zap.Stringer("type", ddl.GetDDLType()))- Add `build_kafka_consumer` target to build Docker image for Kafka consumer - Use DOCKER_BUILDKIT=1 for improved build performance - Tag image with KAFKA_CONSUMER_TAG or fallback to GITBRANCH Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
…vents" This reverts commit ac94e7a.
…events" This reverts commit d13b978.
[FORMAT CHECKER NOTIFICATION] 📖 For more info, you can check the "Contribute Code" section in the development guide.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
This PR enhances the observability of the Kafka consumer's writer component by adding detailed logging for DDL and DML events before they are written to the downstream sink. The changes improve debugging and monitoring capabilities by providing a clear audit trail of data being processed.
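For orientation, a minimal sketch of the pattern this PR follows, namely logging immediately before the sink call. Every identifier below except the log message and the `WriteBlockEvent` method name (both taken from this PR) is a hypothetical stand-in:

```go
package writer // hypothetical package; all types below are stand-ins

import (
	"context"

	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// DDLEvent stands in for the real DDL event type, with just the logged fields.
type DDLEvent struct {
	Schema, Table, Query string
	CommitTs             uint64
}

// blockSink stands in for the downstream sink; WriteBlockEvent is the call
// the PR description names for DDL writes.
type blockSink interface {
	WriteBlockEvent(ctx context.Context, ddl *DDLEvent) error
}

type writer struct{ sink blockSink }

// flushDDL logs the event immediately before handing it to the sink, so the
// logged fields reflect exactly what is written downstream.
func (w *writer) flushDDL(ctx context.Context, ddl *DDLEvent) error {
	log.Info("write DDL event to downstream",
		zap.String("schema", ddl.Schema),
		zap.String("table", ddl.Table),
		zap.Uint64("commitTs", ddl.CommitTs),
		zap.String("query", ddl.Query))
	return w.sink.WriteBlockEvent(ctx, ddl)
}
```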
Key changes include:
- Added `Info` level logs for DDL events being written to the downstream, capturing schema, table, commit timestamp, query, and DDL type.
- Added `Info` level logs for DML events being written to the downstream, capturing schema, table, table ID, commit timestamp, start timestamp, row count, and a string representation of the rows.
- Logs are emitted right before the actual write calls (`WriteBlockEvent` for DDL and `AddDMLEvent` for DML) to ensure the logged information reflects the exact data being sent.

Check List
Tests
Manual testing steps:
1. Run DDL statements (`CREATE TABLE`, `ALTER TABLE`) on the upstream TiDB.
2. Run DML statements (`INSERT`, `UPDATE`, `DELETE`) on the upstream TiDB.

Questions
Will it cause performance regression or break compatibility?
No. This change only adds logging at the `Info` level. It does not alter the data flow, processing logic, or any external interfaces. The performance impact is negligible as logging operations are inexpensive compared to the sink operations.

Do you need to update user documentation, design documentation or monitoring documentation?
No. This is an internal observability improvement and does not require updates to user-facing documentation.
Release note