Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ For complete setup, usage examples, hardware acceleration configuration, and tro
files. You can change the number of parallel processes used with `-j`. It
defaults to 2x number of logical cores.

Downloaded archives are cached in `resources/.cache/` during the run,
deduplicated across test suites, and downloaded in parallel. HTTP concurrency is
capped at 8 by default to avoid hammering remote mirrors. With `--keep`,
the cache is preserved after the run.

Note: running multiple `fluster.py download` instances against the same
`resources/` directory concurrently is not supported — one instance's
cleanup may remove files another is still extracting. Use `--keep` or
point each instance at a separate resource dir if you need parallel runs.

Use the `-c/--codec` option to download test suites for specific codecs:
- `./fluster.py download -c H.264,H.265` downloads all H.264 and H.265 test suites
- `./fluster.py download AV1-TEST-VECTORS VP9-TEST-VECTORS` downloads the specific AV1-TEST-VECTORS and VP9-TEST-VECTORS test suite
Expand Down
99 changes: 91 additions & 8 deletions fluster/fluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import os
import os.path
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from enum import Enum
from functools import lru_cache
from shutil import rmtree
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple

from fluster import utils
from fluster.codec import Codec, Profile
from fluster.decoder import DECODERS, Decoder

Expand Down Expand Up @@ -883,11 +885,92 @@ def download_test_suites(
download_test_suites = self.test_suites
print(f"Test suites: {[ts.name for ts in download_test_suites]}")

for test_suite in download_test_suites:
test_suite.download(
jobs,
self.resources_dir,
verify=True,
keep_file=keep_file,
retries=retries,
)
if not download_test_suites:
print("No test suites to download.")
return

cache_dir = os.path.join(self.resources_dir, ".cache")
with utils.DownloadManager(cache_dir=cache_dir, verify=True, keep_file=keep_file, retries=retries) as manager:
# Phase 1: collect every (url, checksum) across all selected suites,
# deduplicated. Different suites can share a URL (e.g. AV1-ARGON
# archive).
url_checksums: Dict[str, str] = {}
checksum_conflicts: List[Tuple[str, str, str]] = []
for ts in download_test_suites:
for tv in ts.test_vectors.values():
existing = url_checksums.get(tv.source)
if existing is None or existing == "__skip__":
# Prefer a real checksum over an unset/__skip__ one.
url_checksums[tv.source] = tv.source_checksum
elif tv.source_checksum not in (existing, "__skip__"):
checksum_conflicts.append((tv.source, existing, tv.source_checksum))
if checksum_conflicts:
for src, kept, other in checksum_conflicts:
print(
f"ERROR: conflicting checksums for {src}: "
f"{kept} vs {other} — the test-suite definitions disagree."
)
sys.exit(
f"{len(checksum_conflicts)} URL(s) have conflicting checksums across "
f"selected suites; refusing to download (fix the test-suite JSON)."
)

# Phase 2: parallel pre-download. The manager's BoundedSemaphore
# caps actual HTTP concurrency; this pool just feeds it work.
if url_checksums:
max_workers = max(1, min(jobs, len(url_checksums), utils.MAX_PREDOWNLOAD_POOL_WORKERS))
print(
f"Pre-downloading {len(url_checksums)} unique source(s) across "
f"{len(download_test_suites)} suite(s) using {max_workers} workers"
)
pre_errors: List[Tuple[str, Exception]] = []
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {pool.submit(manager.get, url, checksum): url for url, checksum in url_checksums.items()}
for fut in as_completed(futures):
url = futures[fut]
try:
fut.result()
except Exception as exc: # noqa: BLE001 - report all
pre_errors.append((url, exc))
if pre_errors:
failed_urls = {err_url for err_url, _ in pre_errors}
for err_url, err_exc in pre_errors:
print(f"Error pre-downloading {err_url}: {type(err_exc).__name__}: {err_exc}")
print(f"{len(pre_errors)} URL(s) failed to pre-download — skipping affected suites.")
else:
failed_urls = set()
else:
failed_urls = set()

# Phase 3: extract per suite. Cache is now warm — TestSuite.download
# hits manager.get() which short-circuits to the cached path.
# Suites whose URLs intersect with failed_urls are skipped so the
# rest of the batch still extracts.
skipped_suites: List[str] = []
failed_extractions: List[str] = []
for test_suite in download_test_suites:
suite_urls = {tv.source for tv in test_suite.test_vectors.values()}
if suite_urls & failed_urls:
skipped_suites.append(test_suite.name)
continue
try:
test_suite.download(
jobs,
self.resources_dir,
download_manager=manager,
)
except utils.BadArchiveError as exc:
# The cache entry was already invalidated inside download();
# report cleanly and keep going so the rest of the batch
# still extracts.
print(f"\n{test_suite.name}: {exc}")
failed_extractions.append(test_suite.name)
if skipped_suites:
print(
f"\nSkipped {len(skipped_suites)} suite(s) due to pre-download failures: "
f"{skipped_suites}"
)
if failed_extractions:
print(f"Corrupt archive(s) invalidated for: {failed_extractions} (re-run to retry)")
if skipped_suites or failed_extractions:
sys.exit(1)
4 changes: 2 additions & 2 deletions fluster/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,8 @@ def _add_download_cmd(self, subparsers: Any) -> None:
subparser.add_argument(
"-k",
"--keep",
help="keep original downloaded file after extracting. Only applicable to compressed "
"files such as .zip, .tar.gz, etc",
help="keep original downloaded file after extracting. Archives are stored in "
"<resources>/.cache/. Only applicable to compressed files (.zip, .tar.gz, etc)",
action="store_true",
)
subparser.add_argument(
Expand Down
Loading
Loading