Skip to content

fix(flink): add close function in AbstractStreamWriteFunction#18743

Closed
HuangZhenQiu wants to merge 1 commit into
apache:release-0.14.2from
HuangZhenQiu:add-close-function
Closed

fix(flink): add close function in AbstractStreamWriteFunction#18743
HuangZhenQiu wants to merge 1 commit into
apache:release-0.14.2from
HuangZhenQiu:add-close-function

Conversation

@HuangZhenQiu
Copy link
Copy Markdown
Member

Describe the issue this Pull Request addresses

Add close function in AbstractStreamWriteFunction to prevent orphan parquet file creation after rollback

Summary and Changelog

  1. Add close function in AbstractStreamWriteFunction to prevent orphan parquet file creation after rollback

Impact

none

Risk Level

none

Documentation Update

none

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@HuangZhenQiu
Copy link
Copy Markdown
Member Author

This close function is later introduced by the PR #17838 for Flink upsert support. But we don't what to cherry-pick such a large PR to 0.14.2.

@HuangZhenQiu HuangZhenQiu requested a review from danny0405 May 15, 2026 04:59
Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This adds a close() override on AbstractStreamWriteFunction to release the write client. One thing worth double-checking around super.close() chaining noted inline. Please take a look at any inline comments, and this should be ready for a Hudi committer or PMC member to take it from here.


@Override
public void close() throws Exception {
if (this.writeClient != null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Should this also call super.close() to chain to the parent (ProcessFunctionAdapter / RichFunction)? Subclasses like StreamWriteFunction and IndexWriteFunction invoke super.close() at the end of their own close(), which now lands here — and stops, leaving anything the Flink base class needs to clean up unhandled. Wrapping the writeClient.close() in a try/finally with super.close() in the finally would also ensure cleanup proceeds if the write client throws.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The current diff already calls super.close() at line 313, so the chaining concern looks addressed. The try/finally point still has merit though — if writeClient.close() throws, super.close() would be skipped. Worth noting that the sibling StreamWriteFunction.close() already uses this try/finally pattern for memorySegmentPool, so mirroring it here would be consistent.

Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR adds a close() override in AbstractStreamWriteFunction to close the write client and avoid orphan parquet files after rollback. One issue worth fixing in the inline comments — super.close() appears to be invoked twice. Please take a look at the inline comments, and this should be ready for a Hudi committer or PMC member to take it from here.

this.writeClient.close();
}
super.close();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 super.close() is invoked twice here (once before writeClient.close() and once after). Could you remove the duplicate trailing call? Depending on the parent's implementation, double-invoking could trigger errors or repeat side effects during teardown.

- AI-generated; verify before applying. React 👍/👎 to flag quality.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Looking at the current diff, super.close() is only invoked once — at the end of the new close() method after writeClient.close(). There's no duplicate call. The try/finally suggestion from the sibling comment still seems worth considering though, so super.close() isn't skipped if writeClient.close() throws.


@Override
public void close() throws Exception {
if (this.writeClient != null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The current diff already calls super.close() at line 313, so the chaining concern looks addressed. The try/finally point still has merit though — if writeClient.close() throws, super.close() would be skipped. Worth noting that the sibling StreamWriteFunction.close() already uses this try/finally pattern for memorySegmentPool, so mirroring it here would be consistent.

@github-actions github-actions Bot added the size:XS PR with lines of changes in <= 10 label May 15, 2026
Copy link
Copy Markdown
Contributor

@hudi-agent hudi-agent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for the contribution! This PR adds a close() method to AbstractStreamWriteFunction so that the underlying writeClient is released (preventing orphan parquet files after rollback). No new issues flagged from this automated pass beyond what prior reviewers have already noted — a Hudi committer or PMC member can take it from here for a final review.

cc @yihua

this.writeClient.close();
}
super.close();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 Looking at the current diff, super.close() is only invoked once — at the end of the new close() method after writeClient.close(). There's no duplicate call. The try/finally suggestion from the sibling comment still seems worth considering though, so super.close() isn't skipped if writeClient.close() throws.

@hudi-bot
Copy link
Copy Markdown
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:XS PR with lines of changes in <= 10

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants