Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 55 additions & 7 deletions src/bd_archive/commands/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@
from bd_archive.shell.deps import check_deps
from bd_archive.shell.format import human_bytes
from bd_archive.tools import dar, par2
from bd_archive.tools import eject as eject_tool
from bd_archive.tools.optical import resolve_device
from bd_archive.tools.par2 import VerifyResult, is_par2_index
from bd_archive.ui.keypress import cbreak_stdin, read_keypress
from bd_archive.ui.logger import log
from bd_archive.ui.progress import Progress, copy_with_progress
from bd_archive.ui.prompts import prompt_disc, prompt_yn

SPINNER_FRAMES = "⠋⠙⠹⠸⠼⠴⠦⠧"
POLL_INTERVAL_S = 0.3

# A dar slice or catalog filename ends in ".NNNN.dar"; stripping that
# off yields the dar archive basename (e.g. "photos-gen1" or, on legacy
# pre-Phase-2 archives, just "photos"). That basename is what dar -x
Expand All @@ -40,6 +45,41 @@ def _mount_with_prompt(dio: DiscIO, mount_dir: Path, prompt_msg: str) -> Path |
return None


def _wait_for_next_disc(dio: DiscIO, mount_dir: Path, target: int) -> Path | None:
"""Poll drive + stdin until a disc is mountable or the user presses 'e'.

Returns the mount path when a disc is detected and mounts cleanly.
Returns None when the user pressed 'e' — caller should break the
disc-collection loop and proceed to the extraction phase.
"""
is_stdout_tty = sys.stdout.isatty()
log.info(f"Waiting for disc {target}... (press 'e' to extract all collected discs)")
if not sys.stdin.isatty():
log.warn("stdin not a TTY — press Ctrl+C to abort instead of 'e'")

frame = 0
try:
with cbreak_stdin():
while True:
if is_stdout_tty:
sys.stdout.write(f"\r {SPINNER_FRAMES[frame]} polling drive...")
sys.stdout.flush()
frame = (frame + 1) % len(SPINNER_FRAMES)

key = read_keypress(POLL_INTERVAL_S)
if key == "e":
return None

if eject_tool.drive_status(dio.device) == eject_tool.CDS_DISC_OK:
mounted = dio.mount(mount_dir)
if mounted is not None:
return mounted
finally:
if is_stdout_tty:
sys.stdout.write("\r\033[K")
sys.stdout.flush()


def _copy_disc_data(
mounted: Path, disc_basename: str, staging: Path, catalog_verified: bool
) -> list[Path]:
Expand Down Expand Up @@ -180,11 +220,22 @@ def cmd_extract(args):
target = disc_num + 1

# ── 1. Mount disc ─────────────────────────────────────────────────
# Disc 1 keeps the classic press-Enter prompt so the user can read
# the header info before the run starts. Discs ≥ 2 auto-detect:
# poll drive + stdin, return on disc-ready or user pressing 'e'.
mount_dir = Path(tempfile.mkdtemp(prefix="bd-mount-"))
mounted = _mount_with_prompt(dio, mount_dir, f"Insert disc {target}")
if mounted is None:
mount_dir.rmdir()
sys.exit(1)
if target == 1:
mounted = _mount_with_prompt(dio, mount_dir, f"Insert disc {target}")
if mounted is None:
mount_dir.rmdir()
sys.exit(1)
else:
mounted = _wait_for_next_disc(dio, mount_dir, target)
if mounted is None:
# 'e' pressed → done collecting, proceed to extraction
with contextlib.suppress(OSError):
mount_dir.rmdir()
break

try:
# Detect chain name + generation from any slice filename.
Expand Down Expand Up @@ -273,9 +324,6 @@ def cmd_extract(args):
gens_collected = sorted(gen_basenames)
log.info(f"Chain so far: Gen {gens_collected} ({disc_num} disc(s) total)")

if not prompt_yn("Insert another disc?"):
break

if chain_name is None:
log.error("No discs processed")
sys.exit(1)
Expand Down
51 changes: 51 additions & 0 deletions src/bd_archive/ui/keypress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""Single-keypress stdin handling for interactive waits.

Used by `extract`'s auto-detect-next-disc loop to read a single 'e'
without requiring Enter. Linux-only (termios), which matches the rest
of the project (sysfs, ioctl, udisks).
"""

import contextlib
import select
import sys
import termios
import time
import tty


@contextlib.contextmanager
def cbreak_stdin():
"""Put stdin into cbreak mode for the duration of the block.

On exit (including exceptions) the original terminal attributes are
restored — leaving the user's shell in cbreak mode after a crash
would be a nasty surprise. No-op when stdin is not a TTY.
"""
if not sys.stdin.isatty():
yield
return
fd = sys.stdin.fileno()
old = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
yield
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old)


def read_keypress(timeout: float) -> str | None:
"""Wait up to `timeout` seconds for a single keypress.

Returns the character (lowercased) or None if nothing was pressed.
Must be called from within a `cbreak_stdin()` context to read raw
single chars. On non-TTY stdin, sleeps for the timeout and returns
None — the caller's poll loop still ticks at the normal rate.
"""
if not sys.stdin.isatty():
time.sleep(timeout)
return None
rlist, _, _ = select.select([sys.stdin], [], [], timeout)
if not rlist:
return None
ch = sys.stdin.read(1)
return ch.lower() if ch else None
Loading