From a313df834428b6b8081eee728f4a9ffec0af6e29 Mon Sep 17 00:00:00 2001 From: Xitee <59659167+Xitee1@users.noreply.github.com> Date: Mon, 18 May 2026 14:47:57 +0200 Subject: [PATCH] feat(extract): auto-detect next disc, drop double confirmation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Discs ≥ 2 are detected by polling drive_status. Press 'e' (single keypress via cbreak stdin) to stop collecting and proceed to the extraction phase. Disc 1 keeps the classic Enter prompt so the user can read the header info before the run starts. Drops the trailing `prompt_yn("Insert another disc?")` and the `prompt_disc("Insert disc N")` confirmation for every N ≥ 2. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/bd_archive/commands/extract.py | 62 ++++++++++++++++++++++++++---- src/bd_archive/ui/keypress.py | 51 ++++++++++++++++++++++++ 2 files changed, 106 insertions(+), 7 deletions(-) create mode 100644 src/bd_archive/ui/keypress.py diff --git a/src/bd_archive/commands/extract.py b/src/bd_archive/commands/extract.py index 4fc963a..3bde814 100644 --- a/src/bd_archive/commands/extract.py +++ b/src/bd_archive/commands/extract.py @@ -11,12 +11,17 @@ from bd_archive.shell.deps import check_deps from bd_archive.shell.format import human_bytes from bd_archive.tools import dar, par2 +from bd_archive.tools import eject as eject_tool from bd_archive.tools.optical import resolve_device from bd_archive.tools.par2 import VerifyResult, is_par2_index +from bd_archive.ui.keypress import cbreak_stdin, read_keypress from bd_archive.ui.logger import log from bd_archive.ui.progress import Progress, copy_with_progress from bd_archive.ui.prompts import prompt_disc, prompt_yn +SPINNER_FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧" +POLL_INTERVAL_S = 0.3 + # A dar slice or catalog filename ends in ".NNNN.dar"; stripping that # off yields the dar archive basename (e.g. "photos-gen1" or, on legacy # pre-Phase-2 archives, just "photos"). That basename is what dar -x @@ -40,6 +45,41 @@ def _mount_with_prompt(dio: DiscIO, mount_dir: Path, prompt_msg: str) -> Path | return None +def _wait_for_next_disc(dio: DiscIO, mount_dir: Path, target: int) -> Path | None: + """Poll drive + stdin until a disc is mountable or the user presses 'e'. + + Returns the mount path when a disc is detected and mounts cleanly. + Returns None when the user pressed 'e' — caller should break the + disc-collection loop and proceed to the extraction phase. + """ + is_stdout_tty = sys.stdout.isatty() + log.info(f"Waiting for disc {target}... (press 'e' to extract all collected discs)") + if not sys.stdin.isatty(): + log.warn("stdin not a TTY — press Ctrl+C to abort instead of 'e'") + + frame = 0 + try: + with cbreak_stdin(): + while True: + if is_stdout_tty: + sys.stdout.write(f"\r {SPINNER_FRAMES[frame]} polling drive...") + sys.stdout.flush() + frame = (frame + 1) % len(SPINNER_FRAMES) + + key = read_keypress(POLL_INTERVAL_S) + if key == "e": + return None + + if eject_tool.drive_status(dio.device) == eject_tool.CDS_DISC_OK: + mounted = dio.mount(mount_dir) + if mounted is not None: + return mounted + finally: + if is_stdout_tty: + sys.stdout.write("\r\033[K") + sys.stdout.flush() + + def _copy_disc_data( mounted: Path, disc_basename: str, staging: Path, catalog_verified: bool ) -> list[Path]: @@ -180,11 +220,22 @@ def cmd_extract(args): target = disc_num + 1 # ── 1. Mount disc ───────────────────────────────────────────────── + # Disc 1 keeps the classic press-Enter prompt so the user can read + # the header info before the run starts. Discs ≥ 2 auto-detect: + # poll drive + stdin, return on disc-ready or user pressing 'e'. mount_dir = Path(tempfile.mkdtemp(prefix="bd-mount-")) - mounted = _mount_with_prompt(dio, mount_dir, f"Insert disc {target}") - if mounted is None: - mount_dir.rmdir() - sys.exit(1) + if target == 1: + mounted = _mount_with_prompt(dio, mount_dir, f"Insert disc {target}") + if mounted is None: + mount_dir.rmdir() + sys.exit(1) + else: + mounted = _wait_for_next_disc(dio, mount_dir, target) + if mounted is None: + # 'e' pressed → done collecting, proceed to extraction + with contextlib.suppress(OSError): + mount_dir.rmdir() + break try: # Detect chain name + generation from any slice filename. @@ -273,9 +324,6 @@ def cmd_extract(args): gens_collected = sorted(gen_basenames) log.info(f"Chain so far: Gen {gens_collected} ({disc_num} disc(s) total)") - if not prompt_yn("Insert another disc?"): - break - if chain_name is None: log.error("No discs processed") sys.exit(1) diff --git a/src/bd_archive/ui/keypress.py b/src/bd_archive/ui/keypress.py new file mode 100644 index 0000000..51826bd --- /dev/null +++ b/src/bd_archive/ui/keypress.py @@ -0,0 +1,51 @@ +"""Single-keypress stdin handling for interactive waits. + +Used by `extract`'s auto-detect-next-disc loop to read a single 'e' +without requiring Enter. Linux-only (termios), which matches the rest +of the project (sysfs, ioctl, udisks). +""" + +import contextlib +import select +import sys +import termios +import time +import tty + + +@contextlib.contextmanager +def cbreak_stdin(): + """Put stdin into cbreak mode for the duration of the block. + + On exit (including exceptions) the original terminal attributes are + restored — leaving the user's shell in cbreak mode after a crash + would be a nasty surprise. No-op when stdin is not a TTY. + """ + if not sys.stdin.isatty(): + yield + return + fd = sys.stdin.fileno() + old = termios.tcgetattr(fd) + try: + tty.setcbreak(fd) + yield + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old) + + +def read_keypress(timeout: float) -> str | None: + """Wait up to `timeout` seconds for a single keypress. + + Returns the character (lowercased) or None if nothing was pressed. + Must be called from within a `cbreak_stdin()` context to read raw + single chars. On non-TTY stdin, sleeps for the timeout and returns + None — the caller's poll loop still ticks at the normal rate. + """ + if not sys.stdin.isatty(): + time.sleep(timeout) + return None + rlist, _, _ = select.select([sys.stdin], [], [], timeout) + if not rlist: + return None + ch = sys.stdin.read(1) + return ch.lower() if ch else None