Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions tests/test_download_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,36 @@ def test_wire_bytes_monotonic_across_file_boundary():
assert s.session_bytes == 1500 # 1000 (a) + 500 (b)


def test_wire_bytes_monotonic_across_parallel_files():
s, c = _mk()
s.note_started("a", 1000)
s.note_started("b", 2000)
c.t = 1.0
s.note_progress("a", 100, 1000)
c.t = 2.0
s.note_progress("b", 200, 2000)
c.t = 3.0
s.note_progress("a", 300, 1000)

assert s.session_bytes == 500
assert s.avg_speed_bps == 200.0


def test_eta_includes_all_active_parallel_files():
s, c = _mk(remaining=1000)
s.note_started("a", 1000)
s.note_started("b", 2000)
c.t = 1.0
s.note_progress("a", 500, 1000)
c.t = 5.0
s.note_progress("b", 1000, 2000)

# speed = (1500 - 500) / (5 - 1) = 250 B/s
# remaining = pending 1000 + a remainder 500 + b remainder 1000
assert s.avg_speed_bps == 250.0
assert s.eta_seconds == 10.0


def test_retry_within_file_no_negative_delta():
"""download_file retries reset bytes_done WITHOUT a new item_started.
The wire-byte counter must clamp the backward jump to zero."""
Expand Down
22 changes: 22 additions & 0 deletions tests/test_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,32 @@ async def test_initial_last_state_includes_new_keys() -> None:
assert "disk_pct" in hub.last_state
assert "sync_status" in hub.last_state
assert "sync_status_reason" in hub.last_state
assert "current_items" in hub.last_state
assert hub.last_state["sync_error"] is None
assert hub.last_state["disk_pct"] is None
assert hub.last_state["sync_status"] is None
assert hub.last_state["sync_status_reason"] is None
assert hub.last_state["current_items"] == {}


async def test_broadcast_tracks_multiple_current_items() -> None:
hub = Hub()
await hub.broadcast({"type": "item_started", "filename": "a.MP4", "total": 100})
await hub.broadcast({"type": "item_started", "filename": "b.MP4", "total": 200})
await hub.broadcast({
"type": "item_progress",
"filename": "a.MP4",
"bytes": 40,
"total": 100,
"speed": 10,
})

assert set(hub.last_state["current_items"]) == {"a.MP4", "b.MP4"}
assert hub.last_state["current_items"]["a.MP4"]["bytes"] == 40

await hub.broadcast({"type": "item_finished", "filename": "a.MP4"})
assert set(hub.last_state["current_items"]) == {"b.MP4"}
assert hub.last_state["current_item"]["filename"] == "b.MP4"


def _stub_provider(**snap_overrides):
Expand Down
30 changes: 30 additions & 0 deletions tests/test_queue_ro_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,33 @@ def test_next_pending_default_unchanged(db: Database) -> None:
item = queue.next_pending(db)
assert item is not None
assert item.filename == "DRV.MP4"


def test_claim_next_pending_marks_row_downloading(db: Database) -> None:
_add_pending(db, filename="A.MP4", source_dir="/DCIM/Movie", enq=1)

item = queue.claim_next_pending(db)

assert item is not None
assert item.filename == "A.MP4"
assert item.state == "downloading"
assert item.attempts == 1
with db.conn() as c:
row = c.execute(
"SELECT state, attempts FROM download_queue WHERE filename=?",
("A.MP4",),
).fetchone()
assert row["state"] == "downloading"
assert row["attempts"] == 1


def test_claim_next_pending_does_not_reclaim_downloading_row(
db: Database,
) -> None:
_add_pending(db, filename="A.MP4", source_dir="/DCIM/Movie", enq=1)

first = queue.claim_next_pending(db)
second = queue.claim_next_pending(db)

assert first is not None
assert second is None
11 changes: 11 additions & 0 deletions tests/test_settings_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,17 @@ def test_get_settings_includes_delete_after_download(authed_client) -> None:
assert body["editable"]["DELETE_AFTER_DOWNLOAD"] is False


def test_download_concurrency_round_trips(authed_client) -> None:
put = authed_client.put(
"/api/settings", json={"DOWNLOAD_CONCURRENCY": 2}
)
assert put.status_code == 200
assert put.json()["editable"]["DOWNLOAD_CONCURRENCY"] == 2

got = authed_client.get("/api/settings")
assert got.json()["editable"]["DOWNLOAD_CONCURRENCY"] == 2


def test_put_delete_after_download_persists(authed_client) -> None:
r = authed_client.put("/api/settings", json={"DELETE_AFTER_DOWNLOAD": True})
assert r.status_code == 200
Expand Down
13 changes: 13 additions & 0 deletions tests/test_settings_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ def test_timeout_range() -> None:
SettingsModel(**{**DEFAULT_VALUES, "TIMEOUT": 61})


def test_download_concurrency_range() -> None:
SettingsModel(**{**DEFAULT_VALUES, "DOWNLOAD_CONCURRENCY": 1})
SettingsModel(**{**DEFAULT_VALUES, "DOWNLOAD_CONCURRENCY": 4})
with pytest.raises(ValueError):
SettingsModel(**{**DEFAULT_VALUES, "DOWNLOAD_CONCURRENCY": 0})
with pytest.raises(ValueError):
SettingsModel(**{**DEFAULT_VALUES, "DOWNLOAD_CONCURRENCY": 5})


def test_download_concurrency_is_editable() -> None:
assert "DOWNLOAD_CONCURRENCY" in EDITABLE_KEYS


def test_web_port_range() -> None:
SettingsModel(**{**DEFAULT_VALUES, "WEB_PORT": 1})
SettingsModel(**{**DEFAULT_VALUES, "WEB_PORT": 65535})
Expand Down
156 changes: 156 additions & 0 deletions tests/test_sync_worker_concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
"""SyncWorker parallel download drain."""
from __future__ import annotations

import asyncio
import time
import types

from web.db import Database
from web.services import queue as q
from web.services import sync_worker as sw_mod
from web.services.sync_worker import SyncWorker


class _Hub:
def __init__(self):
self.events = []

async def broadcast(self, event):
self.events.append(event)

def schedule_broadcast(self, loop, event):
self.events.append(event)


def _add_pending(db: Database, filename: str, enq: int) -> None:
with db.write() as c:
c.execute(
"INSERT INTO download_queue "
"(filename, source_dir, state, enqueued_at, remote_size, "
"recorded_at) VALUES (?, ?, 'pending', ?, ?, ?)",
(filename, "/DCIM/Movie", enq, 1024, int(time.time())),
)


async def test_cycle_runs_downloads_up_to_configured_concurrency(
tmp_path,
monkeypatch,
):
db = Database(str(tmp_path / "v.db"))
rec_dir = tmp_path / "rec"
rec_dir.mkdir()
for idx in range(3):
_add_pending(db, f"{idx}.MP4", idx)

snap = types.SimpleNamespace(
download_concurrency=2,
sync_ro_only=False,
recordings=str(rec_dir),
grouping="none",
retention_max_days=0,
retention_disk_pct=0,
retention_protect_ro=True,
recordings_quota_gb=0,
import_path="",
)
worker = SyncWorker(db, types.SimpleNamespace(get=lambda: snap), _Hub())
worker._active_address = "192.0.2.10"

async def yes(*args, **kwargs):
return True

async def active_address():
return "192.0.2.10", "primary"

worker._emit_disk_pct = yes
worker._check_recordings_writable = yes
worker._select_active_address = active_address
worker._refresh_listing_and_reconcile = yes
worker._probe_one = yes

monkeypatch.setattr(sw_mod.scanner, "scan", lambda *a, **k: 0)
monkeypatch.setattr(sw_mod._retention, "sweep", lambda *a, **k: None)

active = 0
max_active = 0
seen = []

async def fake_download(item, **kwargs):
nonlocal active, max_active
seen.append(item.filename)
active += 1
max_active = max(max_active, active)
await asyncio.sleep(0.05)
q.mark_done(db, item.id)
active -= 1
return True

worker._download_one = fake_download

did = await worker._cycle()

assert did is True
assert max_active == 2
assert sorted(seen) == ["0.MP4", "1.MP4", "2.MP4"]


async def test_cycle_refills_slot_as_soon_as_download_finishes(
tmp_path,
monkeypatch,
):
db = Database(str(tmp_path / "v.db"))
rec_dir = tmp_path / "rec"
rec_dir.mkdir()
for idx in range(5):
_add_pending(db, f"{idx}.MP4", idx)

snap = types.SimpleNamespace(
download_concurrency=4,
sync_ro_only=False,
recordings=str(rec_dir),
grouping="none",
retention_max_days=0,
retention_disk_pct=0,
retention_protect_ro=True,
recordings_quota_gb=0,
import_path="",
)
worker = SyncWorker(db, types.SimpleNamespace(get=lambda: snap), _Hub())
worker._active_address = "192.0.2.10"

async def yes(*args, **kwargs):
return True

async def active_address():
return "192.0.2.10", "primary"

worker._emit_disk_pct = yes
worker._check_recordings_writable = yes
worker._select_active_address = active_address
worker._refresh_listing_and_reconcile = yes
worker._probe_one = yes

monkeypatch.setattr(sw_mod.scanner, "scan", lambda *a, **k: 0)
monkeypatch.setattr(sw_mod._retention, "sweep", lambda *a, **k: None)

fifth_started = asyncio.Event()
seen = []

async def fake_download(item, **kwargs):
seen.append(item.filename)
if item.filename == "0.MP4":
await asyncio.sleep(0.01)
elif item.filename in {"1.MP4", "2.MP4", "3.MP4"}:
await asyncio.wait_for(fifth_started.wait(), timeout=1)
elif item.filename == "4.MP4":
fifth_started.set()
q.mark_done(db, item.id)
return True

worker._download_one = fake_download

did = await worker._cycle()

assert did is True
assert seen[:4] == ["0.MP4", "1.MP4", "2.MP4", "3.MP4"]
assert "4.MP4" in seen
1 change: 1 addition & 0 deletions web/routers/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def _editable_values(snap) -> dict[str, Any]:
"RECORDINGS_QUOTA_GB": snap.recordings_quota_gb,
"DISK_CRITICAL_PCT": snap.disk_critical_pct,
"TIMEOUT": int(snap.timeout),
"DOWNLOAD_CONCURRENCY": snap.download_concurrency,
"DOWNLOAD_ATTEMPTS": snap.download_attempts,
"MAX_DOWNLOAD_ATTEMPTS": snap.max_attempts,
"SYNC_INTERVAL": snap.sync_interval_seconds,
Expand Down
29 changes: 22 additions & 7 deletions web/services/download_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@
from __future__ import annotations

import time
from dataclasses import dataclass
from collections import deque
from typing import Callable, Deque, Optional, Tuple


@dataclass
class _ActiveFile:
bytes_done: int = 0
total: Optional[int] = None


class DownloadSession:
# Below this many seconds of sample span the windowed speed is too
# noisy to report.
Expand All @@ -38,6 +45,7 @@ def __init__(
self._cur_file: Optional[str] = None
self._cur_file_bytes = 0
self._cur_total: Optional[int] = None
self._active_files: dict[str, _ActiveFile] = {}
self._remaining_pending = 0

# ---- event feeds (called on the loop) ----
Expand All @@ -51,24 +59,28 @@ def note_started(self, filename: str, total: Optional[int]) -> None:
self._cur_file = filename
self._cur_file_bytes = 0
self._cur_total = total
self._active_files[filename] = _ActiveFile(total=total)
self._refresh_remaining()

def note_progress(
self, filename: str, bytes_done: int, total: Optional[int],
) -> None:
if not self._active:
self.note_started(filename, total)
if filename == self._cur_file:
delta = bytes_done - self._cur_file_bytes
else:
cur = self._active_files.get(filename)
if cur is None:
# A file we never saw an item_started for.
self._cur_file = filename
delta = bytes_done
cur = _ActiveFile(total=total)
self._active_files[filename] = cur
delta = bytes_done - cur.bytes_done
if delta < 0:
# A retry reset bytes_done within the same file — never let the
# monotonic counter go backwards.
delta = 0
self._wire_bytes += delta
cur.bytes_done = bytes_done
cur.total = total
self._cur_file = filename
self._cur_file_bytes = bytes_done
self._cur_total = total
now = self._mono()
Expand All @@ -80,6 +92,7 @@ def note_finished(
) -> None:
# Progress ticks already accounted for the bytes; just clear the
# per-file cursor so the next file starts fresh.
self._active_files.pop(filename, None)
self._cur_file = None
self._cur_file_bytes = 0
self._cur_total = None
Expand All @@ -93,6 +106,7 @@ def note_idle(self) -> None:
self._cur_file = None
self._cur_file_bytes = 0
self._cur_total = None
self._active_files.clear()
self._remaining_pending = 0

def refresh_remaining(self) -> None:
Expand Down Expand Up @@ -131,8 +145,9 @@ def eta_seconds(self) -> Optional[float]:
if not speed: # None or 0
return None
remaining = self._remaining_pending
if self._cur_total:
remaining += max(0, self._cur_total - self._cur_file_bytes)
for cur in self._active_files.values():
if cur.total:
remaining += max(0, cur.total - cur.bytes_done)
if remaining <= 0:
return 0.0
return remaining / speed
Expand Down
Loading
Loading