From 7998de9c0959732d4c9f611c1915559b4c553798 Mon Sep 17 00:00:00 2001 From: Alex Peng Date: Thu, 25 Jun 2026 16:35:10 +0800 Subject: [PATCH] Refactor _hf_download to improve subprocess handling and stdout capture - Removed the --format=json option to allow tqdm progress bars to display correctly. - Changed from subprocess.run to subprocess.Popen for better control over the child process. - Captured stdout lines to ensure the last line (local snapshot path) is returned correctly. - Added error handling to ensure the child process is killed on exceptions, preventing stale file-lock deadlocks. --- cosmos_framework/utils/checkpoint_db.py | 30 +++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/cosmos_framework/utils/checkpoint_db.py b/cosmos_framework/utils/checkpoint_db.py index 580f92f..2142d8f 100644 --- a/cosmos_framework/utils/checkpoint_db.py +++ b/cosmos_framework/utils/checkpoint_db.py @@ -144,6 +144,13 @@ def _hf_download(cmd_args: list[str]) -> str: Uses a newer Hugging Face CLI version to download checkpoint. The dependency version is very old and not robust. + + --format=json is intentionally omitted: it silences tqdm progress bars. + Without it, hf download streams tqdm to stderr (visible on rank0) and + prints the local snapshot path as the last stdout line. + + stdout is captured via Popen so we can guarantee the child is killed on + KeyboardInterrupt, preventing stale file-lock deadlocks on the next run. """ is_rank0 = os.environ.get("RANK", "0") == "0" cmd = [ @@ -152,18 +159,33 @@ def _hf_download(cmd_args: list[str]) -> str: "click", f"hf@{HF_VERSION}", "download", - "--format=json", *cmd_args, ] log.info(f"{shlex.join(cmd)}") - output = subprocess.run( + proc = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=None if is_rank0 else subprocess.PIPE, text=True, - check=True, ) - return json.loads(output.stdout)["path"] + stdout_lines: list[str] = [] + try: + assert proc.stdout is not None + for line in proc.stdout: + stdout_lines.append(line) + proc.wait() + except BaseException: + # Ensure the child is always reaped (covers KeyboardInterrupt too), + # so it cannot keep holding HuggingFace file-system locks. + proc.kill() + proc.wait() + raise + if proc.returncode != 0: + raise subprocess.CalledProcessError(proc.returncode, cmd) + # Without --format=json, hf download prints the local path as the last + # (and typically only) stdout line. + last_line = next((ln for ln in reversed(stdout_lines) if ln.strip()), "") + return last_line.strip() class RepositoryType(StrEnum):