fix(trtllm): flush rank log handler so MPI workers' transfer metrics …#272
fix(trtllm): flush rank log handler so MPI workers' transfer metrics …#272KavinKrishnan wants to merge 2 commits into
Conversation
WalkthroughMxLiveWeightLoader updates per-rank logging from FileHandler to a line-buffered StreamHandler with explicit handler/stream tracking on self. Before returning fallback weights and in cleanup, the loader explicitly flushes the tracked stream with os.fsync to guarantee transfer metrics are written even during early MPI worker termination. ChangesPer-rank logging durability
Estimated code review effort🎯 2 (Simple) | ⏱️ ~10 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
fda4d92 to
a2afc9a
Compare
…persist MPI worker processes get killed via SIGKILL when the engine init completes, which skips Python's normal logging shutdown. The buffered FileHandler loses all logged Gbps/timing metrics. Switch to line-buffered stream and explicit flush+fsync at end of load_weights(). Signed-off-by: Kavin Krishnan <kavinkrishnan@gmail.com> Made-with: Cursor
Signed-off-by: Kavin Krishnan <kavink@nvidia.com>
a2afc9a to
4f642cb
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
modelexpress_client/python/modelexpress/trtllm_live_transfer.py (1)
520-526: ⚡ Quick wincleanup() should also remove the handler and close the stream.
After flushing,
cleanup()leaves theStreamHandlerattached to the"modelexpress"logger and the file descriptor open. If the loader is constructed/used again in the same process (or simply outlives this call), this leaks fds and double-logs. Consider:def cleanup(self): if hasattr(self, "_rank_log_handler"): try: self._rank_log_handler.flush() self._rank_log_stream.flush() + logging.getLogger("modelexpress").removeHandler(self._rank_log_handler) + self._rank_log_stream.close() except Exception: pass🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@modelexpress_client/python/modelexpress/trtllm_live_transfer.py` around lines 520 - 526, cleanup() currently only flushes _rank_log_handler/_rank_log_stream; after that you must remove the StreamHandler from the "modelexpress" logger, close the handler, and close the stream to avoid fd leaks and duplicate logging. Locate the cleanup method and: call logger = logging.getLogger("modelexpress") then logger.removeHandler(self._rank_log_handler), call self._rank_log_handler.close(), call self._rank_log_stream.close(), and optionally delete the attributes (del self._rank_log_handler, del self._rank_log_stream) to prevent reuse.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@modelexpress_client/python/modelexpress/trtllm_live_transfer.py`:
- Around line 318-334: The dedup loop in load_weights() currently only removes
logging.FileHandler instances so previously added StreamHandler and its open
file (rank_log_stream) are not removed or closed, causing duplicated handlers
and leaked file descriptors; update the dedup logic that iterates
mx_logger.handlers to remove both logging.StreamHandler and logging.FileHandler
whose base stream/file matches os.path.abspath(rank_log), and when removing a
matching handler close it and if it owns an underlying file-like object (the
rank_log_stream) close that stream as well; also ensure cleanup() not only
flushes self._rank_log_stream and handler but explicitly closes
self._rank_log_stream and removes/closes self._rank_log_handler (and set them to
None) to avoid overwriting references and leaking FDs on subsequent
load_weights() calls.
---
Nitpick comments:
In `@modelexpress_client/python/modelexpress/trtllm_live_transfer.py`:
- Around line 520-526: cleanup() currently only flushes
_rank_log_handler/_rank_log_stream; after that you must remove the StreamHandler
from the "modelexpress" logger, close the handler, and close the stream to avoid
fd leaks and duplicate logging. Locate the cleanup method and: call logger =
logging.getLogger("modelexpress") then
logger.removeHandler(self._rank_log_handler), call
self._rank_log_handler.close(), call self._rank_log_stream.close(), and
optionally delete the attributes (del self._rank_log_handler, del
self._rank_log_stream) to prevent reuse.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 68d54b7e-f208-487c-bc0f-d15dd64f58b6
📒 Files selected for processing (1)
modelexpress_client/python/modelexpress/trtllm_live_transfer.py
| for h in list(mx_logger.handlers): | ||
| if isinstance(h, logging.FileHandler) and getattr( | ||
| h, "baseFilename", "") == os.path.abspath(rank_log): | ||
| mx_logger.removeHandler(h) | ||
| h.close() | ||
|
|
||
| # Open with line buffering so each log line hits disk immediately. | ||
| rank_log_stream = open(rank_log, "w", buffering=1) | ||
| fh = logging.StreamHandler(rank_log_stream) | ||
| fh.setLevel(logging.INFO) | ||
| fh.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")) | ||
| logging.getLogger("modelexpress").addHandler(fh) | ||
| fh.setFormatter(logging.Formatter( | ||
| "%(asctime)s %(name)s %(levelname)s %(message)s")) | ||
| mx_logger.addHandler(fh) | ||
| # Track for explicit flush before return (safety net in case Python's | ||
| # exit handlers don't run, e.g. when MPI rank gets killed by mpirun). | ||
| self._rank_log_handler = fh | ||
| self._rank_log_stream = rank_log_stream |
There was a problem hiding this comment.
Duplicate-handler dedup no longer matches; rank log stream is leaked.
Two related concerns on the new handler setup:
- The dedup loop at lines 318–322 still checks
isinstance(h, logging.FileHandler), but the handler being added on line 326 is a plainlogging.StreamHandler(not aFileHandler). On a second call toload_weights()the previously attachedStreamHandlerwon't be matched/removed, so handlers (and underlying open file streams) accumulate onmx_loggerand every log line gets written N times. rank_log_streamopened on line 325 is never closed.cleanup()only flushes; the previous handler/stream reference onselfis overwritten on each call, leaking the file descriptor. Closing it in the dedup branch and incleanup()would fix both.
Proposed fix
- mx_logger = logging.getLogger("modelexpress")
- mx_logger.setLevel(logging.INFO)
- # Avoid duplicate handlers across multiple load_weights() calls.
- for h in list(mx_logger.handlers):
- if isinstance(h, logging.FileHandler) and getattr(
- h, "baseFilename", "") == os.path.abspath(rank_log):
- mx_logger.removeHandler(h)
- h.close()
-
- # Open with line buffering so each log line hits disk immediately.
- rank_log_stream = open(rank_log, "w", buffering=1)
- fh = logging.StreamHandler(rank_log_stream)
+ mx_logger = logging.getLogger("modelexpress")
+ mx_logger.setLevel(logging.INFO)
+ # Avoid duplicate handlers across multiple load_weights() calls.
+ rank_log_abspath = os.path.abspath(rank_log)
+ for h in list(mx_logger.handlers):
+ stream = getattr(h, "stream", None)
+ if stream is not None and getattr(stream, "name", None) == rank_log_abspath:
+ mx_logger.removeHandler(h)
+ try:
+ stream.close()
+ except Exception:
+ pass
+
+ # Open with line buffering so each log line hits disk immediately.
+ rank_log_stream = open(rank_log, "w", buffering=1)
+ fh = logging.StreamHandler(rank_log_stream)And mirror the close in cleanup() after flushing.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@modelexpress_client/python/modelexpress/trtllm_live_transfer.py` around lines
318 - 334, The dedup loop in load_weights() currently only removes
logging.FileHandler instances so previously added StreamHandler and its open
file (rank_log_stream) are not removed or closed, causing duplicated handlers
and leaked file descriptors; update the dedup logic that iterates
mx_logger.handlers to remove both logging.StreamHandler and logging.FileHandler
whose base stream/file matches os.path.abspath(rank_log), and when removing a
matching handler close it and if it owns an underlying file-like object (the
rank_log_stream) close that stream as well; also ensure cleanup() not only
flushes self._rank_log_stream and handler but explicitly closes
self._rank_log_stream and removes/closes self._rank_log_handler (and set them to
None) to avoid overwriting references and leaking FDs on subsequent
load_weights() calls.
Cherry-picked from PR #218 (kavink/trtllm_clean), which is being retired now that TRT-LLM PR #13531 has merged. MPI workers get SIGKILLed when engine init completes, which skips Python's normal logging shutdown — the buffered FileHandler in trtllm_live_transfer.py loses Gbps/timing metrics. Switches to a line-buffered stream with explicit flush() + os.fsync() at end of load_weights(). Independent of any patch-shim work; useful regardless of base image.
Summary by CodeRabbit
Bug Fixes
Chores