diff --git a/tests/test_download_session.py b/tests/test_download_session.py index bb4cfb4..bef19c2 100644 --- a/tests/test_download_session.py +++ b/tests/test_download_session.py @@ -60,6 +60,36 @@ def test_wire_bytes_monotonic_across_file_boundary(): assert s.session_bytes == 1500 # 1000 (a) + 500 (b) +def test_wire_bytes_monotonic_across_parallel_files(): + s, c = _mk() + s.note_started("a", 1000) + s.note_started("b", 2000) + c.t = 1.0 + s.note_progress("a", 100, 1000) + c.t = 2.0 + s.note_progress("b", 200, 2000) + c.t = 3.0 + s.note_progress("a", 300, 1000) + + assert s.session_bytes == 500 + assert s.avg_speed_bps == 200.0 + + +def test_eta_includes_all_active_parallel_files(): + s, c = _mk(remaining=1000) + s.note_started("a", 1000) + s.note_started("b", 2000) + c.t = 1.0 + s.note_progress("a", 500, 1000) + c.t = 5.0 + s.note_progress("b", 1000, 2000) + + # speed = (1500 - 500) / (5 - 1) = 250 B/s + # remaining = pending 1000 + a remainder 500 + b remainder 1000 + assert s.avg_speed_bps == 250.0 + assert s.eta_seconds == 10.0 + + def test_retry_within_file_no_negative_delta(): """download_file retries reset bytes_done WITHOUT a new item_started. The wire-byte counter must clamp the backward jump to zero.""" diff --git a/tests/test_hub.py b/tests/test_hub.py index e169620..813b0da 100644 --- a/tests/test_hub.py +++ b/tests/test_hub.py @@ -112,10 +112,32 @@ async def test_initial_last_state_includes_new_keys() -> None: assert "disk_pct" in hub.last_state assert "sync_status" in hub.last_state assert "sync_status_reason" in hub.last_state + assert "current_items" in hub.last_state assert hub.last_state["sync_error"] is None assert hub.last_state["disk_pct"] is None assert hub.last_state["sync_status"] is None assert hub.last_state["sync_status_reason"] is None + assert hub.last_state["current_items"] == {} + + +async def test_broadcast_tracks_multiple_current_items() -> None: + hub = Hub() + await hub.broadcast({"type": "item_started", "filename": "a.MP4", "total": 100}) + await hub.broadcast({"type": "item_started", "filename": "b.MP4", "total": 200}) + await hub.broadcast({ + "type": "item_progress", + "filename": "a.MP4", + "bytes": 40, + "total": 100, + "speed": 10, + }) + + assert set(hub.last_state["current_items"]) == {"a.MP4", "b.MP4"} + assert hub.last_state["current_items"]["a.MP4"]["bytes"] == 40 + + await hub.broadcast({"type": "item_finished", "filename": "a.MP4"}) + assert set(hub.last_state["current_items"]) == {"b.MP4"} + assert hub.last_state["current_item"]["filename"] == "b.MP4" def _stub_provider(**snap_overrides): diff --git a/tests/test_queue_ro_only.py b/tests/test_queue_ro_only.py index 7e97fbf..0ba77da 100644 --- a/tests/test_queue_ro_only.py +++ b/tests/test_queue_ro_only.py @@ -48,3 +48,33 @@ def test_next_pending_default_unchanged(db: Database) -> None: item = queue.next_pending(db) assert item is not None assert item.filename == "DRV.MP4" + + +def test_claim_next_pending_marks_row_downloading(db: Database) -> None: + _add_pending(db, filename="A.MP4", source_dir="/DCIM/Movie", enq=1) + + item = queue.claim_next_pending(db) + + assert item is not None + assert item.filename == "A.MP4" + assert item.state == "downloading" + assert item.attempts == 1 + with db.conn() as c: + row = c.execute( + "SELECT state, attempts FROM download_queue WHERE filename=?", + ("A.MP4",), + ).fetchone() + assert row["state"] == "downloading" + assert row["attempts"] == 1 + + +def test_claim_next_pending_does_not_reclaim_downloading_row( + db: Database, +) -> None: + _add_pending(db, filename="A.MP4", source_dir="/DCIM/Movie", enq=1) + + first = queue.claim_next_pending(db) + second = queue.claim_next_pending(db) + + assert first is not None + assert second is None diff --git a/tests/test_settings_api.py b/tests/test_settings_api.py index 5948f09..09b548b 100644 --- a/tests/test_settings_api.py +++ b/tests/test_settings_api.py @@ -134,6 +134,17 @@ def test_get_settings_includes_delete_after_download(authed_client) -> None: assert body["editable"]["DELETE_AFTER_DOWNLOAD"] is False +def test_download_concurrency_round_trips(authed_client) -> None: + put = authed_client.put( + "/api/settings", json={"DOWNLOAD_CONCURRENCY": 2} + ) + assert put.status_code == 200 + assert put.json()["editable"]["DOWNLOAD_CONCURRENCY"] == 2 + + got = authed_client.get("/api/settings") + assert got.json()["editable"]["DOWNLOAD_CONCURRENCY"] == 2 + + def test_put_delete_after_download_persists(authed_client) -> None: r = authed_client.put("/api/settings", json={"DELETE_AFTER_DOWNLOAD": True}) assert r.status_code == 200 diff --git a/tests/test_settings_schema.py b/tests/test_settings_schema.py index a5fdce4..30b05f1 100644 --- a/tests/test_settings_schema.py +++ b/tests/test_settings_schema.py @@ -62,6 +62,19 @@ def test_timeout_range() -> None: SettingsModel(**{**DEFAULT_VALUES, "TIMEOUT": 61}) +def test_download_concurrency_range() -> None: + SettingsModel(**{**DEFAULT_VALUES, "DOWNLOAD_CONCURRENCY": 1}) + SettingsModel(**{**DEFAULT_VALUES, "DOWNLOAD_CONCURRENCY": 4}) + with pytest.raises(ValueError): + SettingsModel(**{**DEFAULT_VALUES, "DOWNLOAD_CONCURRENCY": 0}) + with pytest.raises(ValueError): + SettingsModel(**{**DEFAULT_VALUES, "DOWNLOAD_CONCURRENCY": 5}) + + +def test_download_concurrency_is_editable() -> None: + assert "DOWNLOAD_CONCURRENCY" in EDITABLE_KEYS + + def test_web_port_range() -> None: SettingsModel(**{**DEFAULT_VALUES, "WEB_PORT": 1}) SettingsModel(**{**DEFAULT_VALUES, "WEB_PORT": 65535}) diff --git a/tests/test_sync_worker_concurrency.py b/tests/test_sync_worker_concurrency.py new file mode 100644 index 0000000..08432e5 --- /dev/null +++ b/tests/test_sync_worker_concurrency.py @@ -0,0 +1,156 @@ +"""SyncWorker parallel download drain.""" +from __future__ import annotations + +import asyncio +import time +import types + +from web.db import Database +from web.services import queue as q +from web.services import sync_worker as sw_mod +from web.services.sync_worker import SyncWorker + + +class _Hub: + def __init__(self): + self.events = [] + + async def broadcast(self, event): + self.events.append(event) + + def schedule_broadcast(self, loop, event): + self.events.append(event) + + +def _add_pending(db: Database, filename: str, enq: int) -> None: + with db.write() as c: + c.execute( + "INSERT INTO download_queue " + "(filename, source_dir, state, enqueued_at, remote_size, " + "recorded_at) VALUES (?, ?, 'pending', ?, ?, ?)", + (filename, "/DCIM/Movie", enq, 1024, int(time.time())), + ) + + +async def test_cycle_runs_downloads_up_to_configured_concurrency( + tmp_path, + monkeypatch, +): + db = Database(str(tmp_path / "v.db")) + rec_dir = tmp_path / "rec" + rec_dir.mkdir() + for idx in range(3): + _add_pending(db, f"{idx}.MP4", idx) + + snap = types.SimpleNamespace( + download_concurrency=2, + sync_ro_only=False, + recordings=str(rec_dir), + grouping="none", + retention_max_days=0, + retention_disk_pct=0, + retention_protect_ro=True, + recordings_quota_gb=0, + import_path="", + ) + worker = SyncWorker(db, types.SimpleNamespace(get=lambda: snap), _Hub()) + worker._active_address = "192.0.2.10" + + async def yes(*args, **kwargs): + return True + + async def active_address(): + return "192.0.2.10", "primary" + + worker._emit_disk_pct = yes + worker._check_recordings_writable = yes + worker._select_active_address = active_address + worker._refresh_listing_and_reconcile = yes + worker._probe_one = yes + + monkeypatch.setattr(sw_mod.scanner, "scan", lambda *a, **k: 0) + monkeypatch.setattr(sw_mod._retention, "sweep", lambda *a, **k: None) + + active = 0 + max_active = 0 + seen = [] + + async def fake_download(item, **kwargs): + nonlocal active, max_active + seen.append(item.filename) + active += 1 + max_active = max(max_active, active) + await asyncio.sleep(0.05) + q.mark_done(db, item.id) + active -= 1 + return True + + worker._download_one = fake_download + + did = await worker._cycle() + + assert did is True + assert max_active == 2 + assert sorted(seen) == ["0.MP4", "1.MP4", "2.MP4"] + + +async def test_cycle_refills_slot_as_soon_as_download_finishes( + tmp_path, + monkeypatch, +): + db = Database(str(tmp_path / "v.db")) + rec_dir = tmp_path / "rec" + rec_dir.mkdir() + for idx in range(5): + _add_pending(db, f"{idx}.MP4", idx) + + snap = types.SimpleNamespace( + download_concurrency=4, + sync_ro_only=False, + recordings=str(rec_dir), + grouping="none", + retention_max_days=0, + retention_disk_pct=0, + retention_protect_ro=True, + recordings_quota_gb=0, + import_path="", + ) + worker = SyncWorker(db, types.SimpleNamespace(get=lambda: snap), _Hub()) + worker._active_address = "192.0.2.10" + + async def yes(*args, **kwargs): + return True + + async def active_address(): + return "192.0.2.10", "primary" + + worker._emit_disk_pct = yes + worker._check_recordings_writable = yes + worker._select_active_address = active_address + worker._refresh_listing_and_reconcile = yes + worker._probe_one = yes + + monkeypatch.setattr(sw_mod.scanner, "scan", lambda *a, **k: 0) + monkeypatch.setattr(sw_mod._retention, "sweep", lambda *a, **k: None) + + fifth_started = asyncio.Event() + seen = [] + + async def fake_download(item, **kwargs): + seen.append(item.filename) + if item.filename == "0.MP4": + await asyncio.sleep(0.01) + elif item.filename in {"1.MP4", "2.MP4", "3.MP4"}: + await asyncio.wait_for(fifth_started.wait(), timeout=1) + elif item.filename == "4.MP4": + fifth_started.set() + q.mark_done(db, item.id) + return True + + worker._download_one = fake_download + + did = await worker._cycle() + + assert did is True + assert seen[:4] == ["0.MP4", "1.MP4", "2.MP4", "3.MP4"] + assert "4.MP4" in seen diff --git a/web/routers/settings.py b/web/routers/settings.py index 0b63e0d..aff05d9 100644 --- a/web/routers/settings.py +++ b/web/routers/settings.py @@ -48,6 +48,7 @@ def _editable_values(snap) -> dict[str, Any]: "RECORDINGS_QUOTA_GB": snap.recordings_quota_gb, "DISK_CRITICAL_PCT": snap.disk_critical_pct, "TIMEOUT": int(snap.timeout), + "DOWNLOAD_CONCURRENCY": snap.download_concurrency, "DOWNLOAD_ATTEMPTS": snap.download_attempts, "MAX_DOWNLOAD_ATTEMPTS": snap.max_attempts, "SYNC_INTERVAL": snap.sync_interval_seconds, diff --git a/web/services/download_session.py b/web/services/download_session.py index dd17e7e..66a5eea 100644 --- a/web/services/download_session.py +++ b/web/services/download_session.py @@ -12,10 +12,17 @@ from __future__ import annotations import time +from dataclasses import dataclass from collections import deque from typing import Callable, Deque, Optional, Tuple +@dataclass +class _ActiveFile: + bytes_done: int = 0 + total: Optional[int] = None + + class DownloadSession: # Below this many seconds of sample span the windowed speed is too # noisy to report. @@ -38,6 +45,7 @@ def __init__( self._cur_file: Optional[str] = None self._cur_file_bytes = 0 self._cur_total: Optional[int] = None + self._active_files: dict[str, _ActiveFile] = {} self._remaining_pending = 0 # ---- event feeds (called on the loop) ---- @@ -51,6 +59,7 @@ def note_started(self, filename: str, total: Optional[int]) -> None: self._cur_file = filename self._cur_file_bytes = 0 self._cur_total = total + self._active_files[filename] = _ActiveFile(total=total) self._refresh_remaining() def note_progress( @@ -58,17 +67,20 @@ def note_progress( ) -> None: if not self._active: self.note_started(filename, total) - if filename == self._cur_file: - delta = bytes_done - self._cur_file_bytes - else: + cur = self._active_files.get(filename) + if cur is None: # A file we never saw an item_started for. - self._cur_file = filename - delta = bytes_done + cur = _ActiveFile(total=total) + self._active_files[filename] = cur + delta = bytes_done - cur.bytes_done if delta < 0: # A retry reset bytes_done within the same file — never let the # monotonic counter go backwards. delta = 0 self._wire_bytes += delta + cur.bytes_done = bytes_done + cur.total = total + self._cur_file = filename self._cur_file_bytes = bytes_done self._cur_total = total now = self._mono() @@ -80,6 +92,7 @@ def note_finished( ) -> None: # Progress ticks already accounted for the bytes; just clear the # per-file cursor so the next file starts fresh. + self._active_files.pop(filename, None) self._cur_file = None self._cur_file_bytes = 0 self._cur_total = None @@ -93,6 +106,7 @@ def note_idle(self) -> None: self._cur_file = None self._cur_file_bytes = 0 self._cur_total = None + self._active_files.clear() self._remaining_pending = 0 def refresh_remaining(self) -> None: @@ -131,8 +145,9 @@ def eta_seconds(self) -> Optional[float]: if not speed: # None or 0 return None remaining = self._remaining_pending - if self._cur_total: - remaining += max(0, self._cur_total - self._cur_file_bytes) + for cur in self._active_files.values(): + if cur.total: + remaining += max(0, cur.total - cur.bytes_done) if remaining <= 0: return 0.0 return remaining / speed diff --git a/web/services/hub.py b/web/services/hub.py index 7c5a685..ca10b60 100644 --- a/web/services/hub.py +++ b/web/services/hub.py @@ -64,6 +64,7 @@ def __init__(self, settings_provider: Any = None, session: Any = None) -> None: "dashcam_source": None, "dashcam_address": None, "current_item": None, + "current_items": {}, # Session-wide download stats (see download_session.py). Always # present so the WS snapshot and MQTT state_fn never KeyError. "session": { @@ -118,22 +119,40 @@ async def broadcast(self, event: Dict[str, Any]) -> None: elif t == "dashcam_offline": self.last_state["dashcam_online"] = False elif t == "item_started": - self.last_state["current_item"] = { + item = { "filename": event.get("filename"), "total": event.get("total"), "bytes": 0, } + self.last_state["current_item"] = item + items = dict(self.last_state.get("current_items") or {}) + if item["filename"] is not None: + items[item["filename"]] = item + self.last_state["current_items"] = items elif t == "item_progress": - ci = self.last_state.get("current_item") or {} - ci.update( - filename=event.get("filename"), - bytes=event.get("bytes"), - total=event.get("total"), - speed=event.get("speed"), - ) + filename = event.get("filename") + ci = { + "filename": event.get("filename"), + "bytes": event.get("bytes"), + "total": event.get("total"), + "speed": event.get("speed"), + } self.last_state["current_item"] = ci + items = dict(self.last_state.get("current_items") or {}) + if filename is not None: + existing = dict(items.get(filename) or {}) + existing.update(ci) + items[filename] = existing + self.last_state["current_items"] = items elif t == "item_finished": - self.last_state["current_item"] = None + filename = event.get("filename") + items = dict(self.last_state.get("current_items") or {}) + if filename is not None: + items.pop(filename, None) + self.last_state["current_items"] = items + self.last_state["current_item"] = next( + iter(items.values()), None + ) elif t == "sync_state": self.last_state["sync_state"] = { "running": event.get("running"), diff --git a/web/services/queue.py b/web/services/queue.py index d857cf2..a788fe8 100644 --- a/web/services/queue.py +++ b/web/services/queue.py @@ -252,6 +252,55 @@ def next_pending( ) +def claim_next_pending( + db: Database, *, ro_only: bool = False, +) -> Optional[QueueItem]: + """Atomically move the next pending row to ``downloading``. + + Parallel sync workers must not perform ``next_pending()`` and + ``mark_downloading()`` as two separate steps, otherwise two workers + can select the same row before either marks it. ``Database.write()`` + serializes this select+update within the process. + """ + now = int(time.time()) + sql = ( + "SELECT * FROM download_queue " + "WHERE state='pending'" + ) + if ro_only: + sql += " AND (source_dir LIKE '%/RO/%' OR source_dir LIKE '%/RO')" + sql += " ORDER BY priority DESC, enqueued_at ASC LIMIT 1" + + with db.write() as c: + row = c.execute(sql).fetchone() + if row is None: + return None + c.execute( + "UPDATE download_queue SET state='downloading', " + "started_at=?, attempts=attempts+1, " + "last_attempt_at=? WHERE id=?", + (now, now, row["id"]), + ) + updated = c.execute( + "SELECT * FROM download_queue WHERE id=?", (row["id"],) + ).fetchone() + + return QueueItem( + id=updated["id"], + filename=updated["filename"], + source_dir=updated["source_dir"], + remote_size=updated["remote_size"], + recorded_at=updated["recorded_at"], + camera=updated["camera"], + event_type=updated["event_type"], + state=updated["state"], + priority=updated["priority"], + attempts=updated["attempts"], + last_error=updated["last_error"], + last_attempt_at=updated["last_attempt_at"], + ) + + def reconcile_orphan_downloads(db: Database) -> int: """Reset rows stuck at ``state='downloading'`` back to ``'pending'`` so the next sync cycle picks them up. diff --git a/web/services/sync_worker.py b/web/services/sync_worker.py index c1112fc..d281e6b 100644 --- a/web/services/sync_worker.py +++ b/web/services/sync_worker.py @@ -321,6 +321,7 @@ def __init__( self._cancel_current = threading.Event() self._kick = asyncio.Event() self._paused = threading.Event() # set = paused + self._cancel_events: dict[int, threading.Event] = {} # Restore the last-used pause toggle (persisted in kv) so a # restart resumes in the state the user left it. ENABLE_SCHEDULED_SYNC # still gates whether the schedule runs at all. Best-effort: with @@ -331,6 +332,7 @@ def __init__( self._loop: Optional[asyncio.AbstractEventLoop] = None self._running_cycle = False self._current_filename: Optional[str] = None + self._current_filenames: set[str] = set() # The address chosen for the in-flight cycle (primary or the # alternative). Selected once per cycle and held for the whole # drain — no mid-download switching. None when offline. @@ -432,12 +434,16 @@ def cancel_current(self) -> None: presses Stop.""" log.info("aborting current download") self._cancel_current.set() + for ev in list(getattr(self, "_cancel_events", {}).values()): + ev.set() def skip_current(self) -> None: """Cancel the in-flight download and move on to the next queue item. Unlike pause, the worker keeps running.""" log.info("skipping current download") self._cancel_current.set() + for ev in list(getattr(self, "_cancel_events", {}).values()): + ev.set() def pause(self) -> None: """Pause the worker: finish the current chunk then stop @@ -446,6 +452,8 @@ def pause(self) -> None: self._paused.set() self.db.kv_set(_PAUSED_KV_KEY, "1") self._cancel_current.set() + for ev in list(getattr(self, "_cancel_events", {}).values()): + ev.set() self._broadcast_sync_state() def resume(self) -> None: @@ -469,6 +477,9 @@ def get_status(self) -> dict: "running": self._is_running(), "paused": self.paused, "current_filename": self._current_filename, + "current_filenames": sorted( + getattr(self, "_current_filenames", set()) + ), } def _broadcast_sync_state(self) -> None: @@ -480,6 +491,9 @@ def _broadcast_sync_state(self) -> None: "running": self._is_running(), "paused": self.paused, "current_filename": self._current_filename, + "current_filenames": sorted( + getattr(self, "_current_filenames", set()) + ), }) ) ) @@ -739,49 +753,121 @@ async def _cycle(self) -> bool: }) return False - # Drain the queue. After each successful download the - # loop re-checks ``next_pending`` so a priority update - # mid-cycle takes effect immediately. + # Drain the queue with a rolling task pool. Each task atomically + # claims one pending row before it starts, so concurrent downloads + # cannot pick the same file. When one finishes, immediately refill + # that slot instead of waiting for the slowest sibling. did_any = False + refresh_after_success = False + active: dict[ + asyncio.Task[bool], + tuple[q.QueueItem, threading.Event], + ] = {} + + def _set_current_from_active() -> None: + self._current_filename = ( + sorted(self._current_filenames)[0] + if self._current_filenames else None + ) + self._broadcast_sync_state() + + def _forget_active_task(task: asyncio.Task[bool]) -> q.QueueItem: + item, _cancel_event = active.pop(task) + self._cancel_events.pop(item.id, None) + self._current_filenames.discard(item.filename) + return item + + def _forget_all_active() -> None: + for item, _cancel_event in list(active.values()): + self._cancel_events.pop(item.id, None) + self._current_filenames.discard(item.filename) + active.clear() + _set_current_from_active() + while not self._stop.is_set(): - if self._paused.is_set(): + snap = self._provider.get() + concurrency = max(1, int(getattr(snap, "download_concurrency", 1))) + + while ( + len(active) < concurrency + and not self._stop.is_set() + and not self._paused.is_set() + ): + # Re-probe before filling an open slot so we don't burn a + # retry budget on a dashcam that's already gone. + if did_any and not await self._probe_one(self._active_address): + self.cancel_current() + await self.hub.broadcast({ + "type": "dashcam_offline", + }) + if active: + await asyncio.gather( + *active.keys(), return_exceptions=True, + ) + _forget_all_active() + return True + + # Refresh listing after successful completions and before + # claiming the next item, so new clips can enter the queue + # without draining the whole current pool first. + if refresh_after_success: + await self._refresh_listing_and_reconcile() + refresh_after_success = False + + item = q.claim_next_pending( + self.db, + ro_only=snap.sync_ro_only, + ) + if item is None: + break + cancel_event = threading.Event() + self._cancel_events[item.id] = cancel_event + self._current_filenames.add(item.filename) + _set_current_from_active() + task = asyncio.create_task( + self._download_one( + item, + cancel_event=cancel_event, + already_claimed=True, + ) + ) + active[task] = (item, cancel_event) + + if not active: break - item = q.next_pending( - self.db, - ro_only=self._provider.get().sync_ro_only, + + done, _pending = await asyncio.wait( + active.keys(), return_when=asyncio.FIRST_COMPLETED, ) - if item is None: - break - # Re-probe occasionally so we don't burn a whole - # retry budget on a dashcam that's already gone. - if did_any and not await self._probe_one(self._active_address): - await self.hub.broadcast({ - "type": "dashcam_offline", - }) - return True - self._current_filename = item.filename - self._broadcast_sync_state() - try: - ok = await self._download_one(item) - except DiskFullError: + disk_full = False + for task in done: + item = _forget_active_task(task) + try: + result = task.result() + except DiskFullError: + disk_full = True + result = False + except Exception: + log.exception("download task crashed for %s", item.filename) + result = False + + did_any = True + if result is True: + refresh_after_success = True + _set_current_from_active() + + if disk_full: + self.cancel_current() log.warning( "recordings volume full — stopping this cycle's " "downloads; will retry next cycle" ) - self._current_filename = None + if active: + await asyncio.gather( + *active.keys(), return_exceptions=True, + ) + _forget_all_active() return did_any - self._current_filename = None - did_any = True - if not ok: - # Transient failure. Loop continues with next - # pending item, which may well succeed. - continue - # Refresh listing between downloads so clips the - # dashcam recorded during this transfer show up in - # the queue before we pick the next pending one. - # Best-effort: a transient listing failure here - # leaves the existing queue intact. - await self._refresh_listing_and_reconcile() # Re-index + sweep thumbs so new clips appear in the UI. # Both calls are idempotent; the did_any gate is just to @@ -836,10 +922,19 @@ def _present_filenames(self): # ---- single item download ---- - async def _download_one(self, item: q.QueueItem) -> bool: + async def _download_one( + self, + item: q.QueueItem, + *, + cancel_event: threading.Event | None = None, + already_claimed: bool = False, + ) -> bool: snap = self._provider.get() - q.mark_downloading(self.db, item.id) - self._cancel_current.clear() + if not already_claimed: + q.mark_downloading(self.db, item.id) + if cancel_event is None: + cancel_event = self._cancel_current + cancel_event.clear() loop = asyncio.get_running_loop() sink = WebSink(self.hub, loop) @@ -871,7 +966,7 @@ def _blocking(): base, rec, snap.recordings, group_name, progress_sink=sink, - cancel_check=self._cancel_current.is_set, + cancel_check=cancel_event.is_set, max_attempts=snap.download_attempts, socket_timeout=snap.timeout, ) diff --git a/web/settings.py b/web/settings.py index b08b1f1..e5ef841 100644 --- a/web/settings.py +++ b/web/settings.py @@ -51,6 +51,7 @@ class Snapshot: derive_filmstrips_eager: bool delete_after_download: bool timeout: float + download_concurrency: int download_attempts: int max_attempts: int sync_interval_seconds: int @@ -250,6 +251,7 @@ def _make_snapshot(self, data: dict) -> Snapshot: derive_filmstrips_eager=m.DERIVE_FILMSTRIPS_EAGER, delete_after_download=m.DELETE_AFTER_DOWNLOAD, timeout=float(m.TIMEOUT), + download_concurrency=m.DOWNLOAD_CONCURRENCY, download_attempts=m.DOWNLOAD_ATTEMPTS, max_attempts=m.MAX_DOWNLOAD_ATTEMPTS, sync_interval_seconds=m.SYNC_INTERVAL, diff --git a/web/settings_schema.py b/web/settings_schema.py index 768876f..7249b45 100644 --- a/web/settings_schema.py +++ b/web/settings_schema.py @@ -43,6 +43,7 @@ class SettingsModel(BaseModel): DERIVE_FILMSTRIPS_EAGER: bool = False DELETE_AFTER_DOWNLOAD: bool = False TIMEOUT: int = Field(default=10, ge=1, le=60) + DOWNLOAD_CONCURRENCY: int = Field(default=1, ge=1, le=4) DOWNLOAD_ATTEMPTS: int = Field(default=3, ge=1, le=10) MAX_DOWNLOAD_ATTEMPTS: int = Field(default=5, ge=1, le=20) SYNC_INTERVAL: int = Field(default=600, ge=60, le=86400) @@ -163,7 +164,8 @@ def _validate_disk_critical(self): "ADDRESS", "ADDRESS_FALLBACK", "IMPORT_PATH", "GROUPING", "HTML", "GPS_EXTRACT", "DERIVE_THUMBS_EAGER", "DERIVE_FILMSTRIPS_EAGER", "DELETE_AFTER_DOWNLOAD", - "TIMEOUT", "DOWNLOAD_ATTEMPTS", "MAX_DOWNLOAD_ATTEMPTS", "SYNC_INTERVAL", + "TIMEOUT", "DOWNLOAD_CONCURRENCY", "DOWNLOAD_ATTEMPTS", + "MAX_DOWNLOAD_ATTEMPTS", "SYNC_INTERVAL", "ENABLE_SCHEDULED_SYNC", "WEB_HOST", "WEB_PORT", "EXPORT_ENCODER", "NOMINATIM_EMAIL", "GEOCODE_ENABLED", "SYNC_RO_ONLY", "RETENTION_MAX_DAYS", "RETENTION_DISK_PCT", diff --git a/web/static/app.js b/web/static/app.js index 4fa779f..4040d8d 100644 --- a/web/static/app.js +++ b/web/static/app.js @@ -50,6 +50,7 @@ const state = { syncRunning: false, syncPaused: false, currentFilename: null, + currentDownloads: {}, // Mirrored from /api/settings on login + on Save so display // helpers (fmtDistance) don't need to read from settingsState // (which is only loaded when the Settings tab is visited). @@ -2633,7 +2634,12 @@ function handleEvent(ev) { if (ev.state.sync_status) { applyStatus(ev.state.sync_status, ev.state.sync_status_reason); } - if (ev.state.current_item) updateCurrent(ev.state.current_item); + if (ev.state.current_items) { + state.currentDownloads = { ...ev.state.current_items }; + renderCurrentDownloads(); + } else if (ev.state.current_item) { + updateCurrent(ev.state.current_item); + } if (ev.state.session) updateSessionStats(ev.state.session); if (ev.state.sync_state) { state.syncRunning = ev.state.sync_state.running; @@ -2666,8 +2672,8 @@ function handleEvent(ev) { updateCurrent(ev); break; case "item_finished": - document.getElementById("current-download").innerHTML = ""; - state.currentFilename = null; + delete state.currentDownloads[ev.filename]; + renderCurrentDownloads(); refreshQueueIfVisible(); break; case "session_stats": @@ -2739,24 +2745,51 @@ document.getElementById("current-download").addEventListener("click", (e) => { }); function updateCurrent(info) { + if (!info || !info.filename) return; + state.currentDownloads[info.filename] = { + ...(state.currentDownloads[info.filename] || {}), + ...info, + }; + renderCurrentDownloads(); +} + +function renderCurrentDownloads() { const el = document.getElementById("current-download"); - const pct = info.total ? (100 * info.bytes / info.total).toFixed(1) : 0; - const done = fmtBytes(info.bytes); - const total = info.total ? fmtBytes(info.total) : "?"; - const speed = info.speed ? `${fmtBytes(info.speed)}/s` : ""; + const items = Object.values(state.currentDownloads || {}) + .filter((item) => item && item.filename) + .sort((a, b) => String(a.filename).localeCompare(String(b.filename))); + if (!items.length) { + el.innerHTML = ""; + state.currentFilename = null; + return; + } + const skipLabel = items.length > 1 ? "Skip active downloads" : "Skip this file"; el.innerHTML = `
- ${escHtml(info.filename)} + ${items.length > 1 ? `${items.length} active downloads` : escHtml(items[0].filename)} + title="${skipLabel}" aria-label="${skipLabel}">×
-
- ${done} / ${total} · ${pct}% · ${speed} +
+ ${items.map((info) => { + const pct = info.total ? (100 * (info.bytes || 0) / info.total).toFixed(1) : 0; + const done = fmtBytes(info.bytes || 0); + const total = info.total ? fmtBytes(info.total) : "?"; + const speed = info.speed ? `${fmtBytes(info.speed)}/s` : ""; + return ` +
+
${escHtml(info.filename)}
+
+ ${done} / ${total} · ${pct}%${speed ? ` · ${speed}` : ""} +
+
+
+ `; + }).join("")}
-
`; - state.currentFilename = info.filename; + state.currentFilename = items[0].filename; } function updateSessionStats(s) { @@ -3051,6 +3084,14 @@ function renderSyncSection(pane) { renderField(pane, "ENABLE_SCHEDULED_SYNC", "Run scheduled sync", checkbox("ENABLE_SCHEDULED_SYNC")); renderField(pane, "SYNC_INTERVAL", "Sync interval (seconds)", textInput("SYNC_INTERVAL", { type: "number", min: 60, max: 86400 })); + renderField(pane, "DOWNLOAD_CONCURRENCY", "Simultaneous downloads", + textInput("DOWNLOAD_CONCURRENCY", { type: "number", min: 1, max: 4 })); + const concurrencyNote = document.createElement("p"); + concurrencyNote.className = "hint"; + concurrencyNote.textContent = + "Downloads multiple clips at once. Start with 2; higher values may " + + "overload some dashcams or Wi-Fi links."; + pane.appendChild(concurrencyNote); renderField(pane, "DOWNLOAD_ATTEMPTS", "Per-cycle retry count", textInput("DOWNLOAD_ATTEMPTS", { type: "number", min: 1, max: 10 })); renderField(pane, "MAX_DOWNLOAD_ATTEMPTS", "Total retry budget", diff --git a/web/static/styles.css b/web/static/styles.css index 484af08..844e834 100644 --- a/web/static/styles.css +++ b/web/static/styles.css @@ -790,6 +790,25 @@ main { height: 100%; background: var(--accent); width: 0%; transition: width 0.2s; } +.current-items { + display: grid; + gap: 12px; + margin-top: 12px; +} +.current-item + .current-item { + border-top: 1px solid var(--border); + padding-top: 12px; +} +.current-file { + font-weight: 700; + overflow-wrap: anywhere; +} +.current-meta { + color: var(--muted); + font-size: 12px; + margin-top: 4px; + font-variant-numeric: tabular-nums; +} .queue-filters { display: flex; gap: 10px; margin-bottom: 12px; align-items: center; flex-wrap: wrap;