From e7e21bdd2190e4dc1cf9bf33da493f8ab6232ded Mon Sep 17 00:00:00 2001 From: donmahallem Date: Thu, 5 Mar 2026 07:41:56 +0100 Subject: [PATCH] feat(python): setup pytest benchmark --- python/cmd/__init__.py | 6 -- python/cmd/cli_output.py | 50 ---------- python/cmd/cmd_benchmark.py | 167 ---------------------------------- python/cmd/cmd_list.py | 68 -------------- python/cmd/cmd_parser.py | 50 ---------- python/cmd/collect_solvers.py | 18 ---- python/cmd/const.py | 27 ------ python/cmd/get_part.py | 25 ----- python/cmd/testdata.py | 67 -------------- python/conftest.py | 46 ++++++++++ python/test_benchmark.py | 80 ++++++++++++++++ 11 files changed, 126 insertions(+), 478 deletions(-) delete mode 100644 python/cmd/__init__.py delete mode 100644 python/cmd/cli_output.py delete mode 100644 python/cmd/cmd_benchmark.py delete mode 100644 python/cmd/cmd_list.py delete mode 100644 python/cmd/cmd_parser.py delete mode 100644 python/cmd/collect_solvers.py delete mode 100644 python/cmd/const.py delete mode 100644 python/cmd/get_part.py delete mode 100644 python/cmd/testdata.py create mode 100644 python/conftest.py create mode 100644 python/test_benchmark.py diff --git a/python/cmd/__init__.py b/python/cmd/__init__.py deleted file mode 100644 index aad0f870..00000000 --- a/python/cmd/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -from .cmd_benchmark import BenchmarkArgs, BenchmarkResult -from .cmd_list import ListArgs, ListResult -from .cmd_parser import SolveArgs, SolverResult -from .collect_solvers import collect_solvers -from .const import CommonArgs, SUPPORTED_YEARS, SUPPORTED_DAYS, SUPPORTED_PARTS, Solver -from .cli_output import CliOutput diff --git a/python/cmd/cli_output.py b/python/cmd/cli_output.py deleted file mode 100644 index 4d97a32d..00000000 --- a/python/cmd/cli_output.py +++ /dev/null @@ -1,50 +0,0 @@ -from typing import Any, Optional -import tqdm -import json as _json -import sys -from dataclasses import asdict - - -class CliOutput: - def __init__( - self, - json: bool = False, - verbose: bool = False, - output: Optional[str] = None, - ): - self.json = bool(json) - self.verbose = bool(verbose) - self.output = output - - def progress(self, iterable, **kwargs): - """Return a tqdm progress bar on stderr, unless JSON is streaming to - stdout *and* verbose is off (would interleave with the JSON output).""" - # json_to_stdout: structured output is going directly to the terminal. - json_to_stdout = self.json and self.output is None - if not json_to_stdout or self.verbose: - return tqdm.tqdm(iterable, file=sys.stderr, **kwargs) - return iterable - - def print(self, *args, **kwargs) -> None: - """Print a diagnostic / warning message to stderr.""" - print(*args, file=sys.stderr, **kwargs) - - def render(self, result: Any) -> None: - """Render a result to the output file (if set) or stdout. - - JSON mode writes JSON; plain mode writes render_text(). - The output file receives exactly one result and is always UTF-8. - """ - if self.json: - data = result.to_json() if hasattr(result, "to_json") else asdict(result) - text = _json.dumps(data, indent=2) - else: - text = ( - result.render_text() if hasattr(result, "render_text") else str(result) - ) - - if self.output: - with open(self.output, "w", encoding="utf-8") as f: - f.write(text + "\n") - else: - print(text) diff --git a/python/cmd/cmd_benchmark.py b/python/cmd/cmd_benchmark.py deleted file mode 100644 index 3dd08d05..00000000 --- a/python/cmd/cmd_benchmark.py +++ /dev/null @@ -1,167 +0,0 @@ -from .cli_output import CliOutput - -from .const import CommonArgs -from typing import Optional, List -import pathlib -import time -import io -from dataclasses import dataclass, asdict - - -@dataclass -class BenchmarkStats: - iterations: int - avg_ms: float - total_time_sec: float - - -@dataclass -class BenchmarkEntry: - year: int - day: int - part: int - name: str - stats: BenchmarkStats - - def __lt__(self, other: "BenchmarkEntry") -> bool: - return (self.year, self.day, self.part, self.name) < ( - other.year, - other.day, - other.part, - other.name, - ) - - -@dataclass(slots=True) -class BenchmarkArgs(CommonArgs): - year: Optional[list[int]] - day: Optional[list[int]] - part: Optional[list[int]] - timeout: Optional[float] - - -@dataclass -class BenchmarkResult: - entries: List["BenchmarkEntry"] - - @staticmethod - def execute(cfg: CliOutput, args: BenchmarkArgs): - from .collect_solvers import collect_solvers - from .testdata import TestData - - solvers = collect_solvers() - - if args.year is not None: - solvers = [s for s in solvers if s.year in args.year] - if args.day is not None: - solvers = [s for s in solvers if s.day in args.day] - if args.part is not None: - solvers = [s for s in solvers if s.part in args.part] - - data_path = pathlib.Path(__file__).parent.parent.parent / "test" / "data.json" - if not data_path.exists(): - cfg.print("Error: Test data not found.") - return None - test_data = TestData.load(data_path) - - tasks = [] - for solver in solvers: - if not test_data.has_year(solver.year): - continue - if not test_data.has_day(solver.year, solver.day): - continue - - cases = test_data[solver.year][solver.day] - for idx, case in enumerate(cases): - name = case.get("name", f"case{idx}") - if "input" in case: - input_data = case["input"] - else: - file_ref = case.get("file") - if not file_ref: - continue - input_path = data_path.parent / file_ref - if not input_path.exists(): - cfg.print(f"Warning: Input file {input_path} not found.") - continue - input_data = input_path.read_text() - if f"part{solver.part}" in case: - tasks.append( - { - "year": solver.year, - "day": solver.day, - "part": solver.part, - "name": name, - "solver": solver.func, - "input_data": input_data, - } - ) - - timeout_limit = args.timeout if args.timeout else 1.0 - entries = [] - - progress_bar = cfg.progress(tasks, desc="Benchmarking") - for task in progress_bar: - year, day, part, name = ( - task["year"], - task["day"], - task["part"], - task["name"], - ) - solver, input_data = task["solver"], task["input_data"] - - iterations = 0 - elapsed = 0.0 - bench_start = time.perf_counter() - - while elapsed < timeout_limit: - stream = io.StringIO(input_data) - solver(stream) - iterations += 1 - elapsed = time.perf_counter() - bench_start - - avg_ms = (elapsed / iterations) * 1000 if iterations else 0.0 - - stats = BenchmarkStats( - iterations=iterations, - avg_ms=avg_ms, - total_time_sec=elapsed, - ) - entries.append( - BenchmarkEntry(year=year, day=day, part=part, name=name, stats=stats) - ) - - return BenchmarkResult(entries=entries) - - def render_text(self) -> str: - lines = [] - total_avg = sum(e.stats.avg_ms for e in self.entries) - - lines.append("") - lines.append( - f"{'YEAR':<6} | {'DAY':<4} | {'PART':<4} | {'NAME':<15} | {'ITERATIONS':>12} | {'AVG TIME (ms)':>15} | {'%':>6}" - ) - lines.append("-" * 103) - lines.append("") - - for entry in sorted(self.entries): - pct = (entry.stats.avg_ms / total_avg * 100) if total_avg > 0 else 0.0 - lines.append( - f"{entry.year:<6} | {entry.day:02d} | {entry.part:<4} | {entry.name:<15} | {entry.stats.iterations:>12} | {entry.stats.avg_ms:>15.4f} | {pct:6.2f}" - ) - - return "\n".join(lines) - - def to_json(self) -> dict: - """Convert to JSON-serializable nested dict.""" - output: dict[int, dict[int, dict[int, dict[str, dict]]]] = {} - for entry in self.entries: - y, d, p = entry.year, entry.day, entry.part - if y not in output: - output[y] = {} - if d not in output[y]: - output[y][d] = {} - if p not in output[y][d]: - output[y][d][p] = {} - output[y][d][p][entry.name] = asdict(entry.stats) - return output diff --git a/python/cmd/cmd_list.py b/python/cmd/cmd_list.py deleted file mode 100644 index 0fe7007c..00000000 --- a/python/cmd/cmd_list.py +++ /dev/null @@ -1,68 +0,0 @@ -from dataclasses import dataclass -from typing import TYPE_CHECKING, List - -from .cli_output import CliOutput - -from .const import CommonArgs - -if TYPE_CHECKING: - from .const import Solver - - -@dataclass(slots=True) -class ListArgs(CommonArgs): - pass - - -@dataclass -class ListResult: - solvers: List["Solver"] - - @staticmethod - def execute(cfg: CliOutput, args: ListArgs) -> "ListResult": - from .collect_solvers import collect_solvers - - solvers = collect_solvers() - return ListResult(solvers=solvers) - - def render_text(self) -> str: - lines = [] - current_year = None - printed_days = set() - - for solver in self.solvers: - if solver.year != current_year: - if current_year is not None: - lines.append("") - lines.append(f"Year {solver.year}:") - current_year = solver.year - - # Print once per day - day_key = (solver.year, solver.day) - if day_key not in printed_days: - parts = sorted( - [ - s.part - for s in self.solvers - if s.year == solver.year and s.day == solver.day - ] - ) - line = f" Day {solver.day:02}:" - for p in parts: - line += f" part{p}=true" - lines.append(line) - printed_days.add(day_key) - - return "\n".join(lines) - - def to_json(self) -> dict: - output: dict[int, dict[int, list[int]]] = {} - for solver in self.solvers: - y, d, p = solver.year, solver.day, solver.part - if y not in output: - output[y] = {} - if d not in output[y]: - output[y][d] = [] - if p not in output[y][d]: - output[y][d].append(p) - return output diff --git a/python/cmd/cmd_parser.py b/python/cmd/cmd_parser.py deleted file mode 100644 index 92bd55f4..00000000 --- a/python/cmd/cmd_parser.py +++ /dev/null @@ -1,50 +0,0 @@ -from .cli_output import CliOutput - -from .const import CommonArgs -from typing import Optional, Any -from dataclasses import dataclass -import sys - - -@dataclass(slots=True) -class SolveArgs(CommonArgs): - year: int - day: int - part: int - input: Optional[str] - - -@dataclass -class SolverResult: - year: int - day: int - part: int - result: Any - - @staticmethod - def execute(cfg: CliOutput, args: SolveArgs) -> Optional["SolverResult"]: - from .get_part import getPart - - solver = getPart(args.year, args.day, args.part) - if not solver: - cfg.print("Could not find requested solver") - return None - - if args.input: - with open(args.input, "r") as f: - result = solver(f) - else: - result = solver(sys.stdin) - - return SolverResult(year=args.year, day=args.day, part=args.part, result=result) - - def render_text(self) -> str: - return f"Result (Year {self.year}, Day {self.day}, Part {self.part}): {self.result}" - - def to_json(self) -> dict: - return { - "year": self.year, - "day": self.day, - "part": self.part, - "result": self.result, - } diff --git a/python/cmd/collect_solvers.py b/python/cmd/collect_solvers.py deleted file mode 100644 index f3d46a3d..00000000 --- a/python/cmd/collect_solvers.py +++ /dev/null @@ -1,18 +0,0 @@ -from .const import SUPPORTED_YEARS, SUPPORTED_DAYS, SUPPORTED_PARTS, Solver -from typing import List - -from .get_part import getPart - - -def collect_solvers() -> List[Solver]: - solvers: List[Solver] = [] - for year in SUPPORTED_YEARS: - for day in SUPPORTED_DAYS: - for part in SUPPORTED_PARTS: - part_func = getPart(year, day, part) - if part_func: - solvers.append( - Solver(year=year, day=day, part=part, func=part_func) - ) - - return sorted(solvers) diff --git a/python/cmd/const.py b/python/cmd/const.py deleted file mode 100644 index 99aa3b7a..00000000 --- a/python/cmd/const.py +++ /dev/null @@ -1,27 +0,0 @@ -from typing import Final, Optional, Callable -from dataclasses import dataclass - -SUPPORTED_YEARS: Final[tuple[int, ...]] = (23, 24, 25) - -SUPPORTED_DAYS: Final[tuple[int, ...]] = tuple(range(1, 26)) - -SUPPORTED_PARTS: Final[tuple[int, ...]] = (1, 2) - - -@dataclass(slots=True) -class CommonArgs: - json: bool - verbose: bool - output: Optional[str] - - -@dataclass(slots=True) -class Solver: - year: int - day: int - part: int - func: Optional[Callable] = None # None when only listing, populated when executing - - def __lt__(self, other: "Solver") -> bool: - """Enable sorting: by year, then day, then part.""" - return (self.year, self.day, self.part) < (other.year, other.day, other.part) diff --git a/python/cmd/get_part.py b/python/cmd/get_part.py deleted file mode 100644 index 21759230..00000000 --- a/python/cmd/get_part.py +++ /dev/null @@ -1,25 +0,0 @@ -import importlib -import importlib.util -import sys -import pathlib - -root_path = pathlib.Path(__file__).resolve().parent.parent - -# Add it to sys.path so 'import aoc24' works -if str(root_path) not in sys.path: - sys.path.insert(0, str(root_path)) - - -def getPart(year, day, part): - compound = f"aoc{year}.day{day:02}.part_{part}" - try: - spec = importlib.util.find_spec(compound, package=__package__) - if not spec: - return None - mod = importlib.import_module(compound, package=__package__) - partName = f"Part{part}" - if hasattr(mod, partName): - return getattr(mod, partName) - except Exception as exc: - pass - return None diff --git a/python/cmd/testdata.py b/python/cmd/testdata.py deleted file mode 100644 index bd025819..00000000 --- a/python/cmd/testdata.py +++ /dev/null @@ -1,67 +0,0 @@ -import pathlib -import json -from typing import Dict, TypedDict, List, Optional, Union, Literal, Mapping - -type Language = Literal["go", "python"] -type ResultType = Literal["int", "int16", "string"] - -type TestResult = Union[int, str, List[Union[int, str]]] - - -class PartExpectation(TypedDict, total=False): - result: TestResult - type: ResultType - skip_languages: List[Language] - - -class TestCase(TypedDict, total=False): - name: str - input: Optional[str] - file: Optional[str] - skip_languages: List[Language] - part1: PartExpectation - part2: PartExpectation - - -type ParsedTestData = Mapping[int, Mapping[int, List[TestCase]]] - - -class TestData: - def __init__(self, data: ParsedTestData): - self._data = data - - @classmethod - def load(cls, path: str | pathlib.Path) -> "TestData": - """Loads test data from a JSON file and parses keys to integers.""" - path = pathlib.Path(path) - if not path.exists(): - raise FileNotFoundError(f"Test data file not found: {path}") - - raw: Dict[str, Dict[str, List[TestCase]]] = json.loads(path.read_text()) - - parsed: Dict[int, Dict[int, List[TestCase]]] = {} - - for y_str, days in raw.items(): - year = int(y_str) - parsed[year] = {int(d_str): cases for d_str, cases in days.items()} - - return cls(parsed) - - def get_year(self, year: int) -> Mapping[int, List[TestCase]]: - return self._data.get(year, {}) - - def get_day(self, year: int, day: int) -> List[TestCase]: - return self._data.get(year, {}).get(day, []) - - def has_year(self, year: int) -> bool: - return year in self._data - - def has_day(self, year: int, day: int) -> bool: - return day in self._data.get(year, {}) - - def __getitem__(self, key: int) -> Mapping[int, List[TestCase]]: - return self._data[key] - - def __repr__(self) -> str: - years = list(self._data.keys()) - return f"" diff --git a/python/conftest.py b/python/conftest.py new file mode 100644 index 00000000..680cde91 --- /dev/null +++ b/python/conftest.py @@ -0,0 +1,46 @@ +from pytest_benchmark.stats import Stats +from pytest import Config +import json + +def pytest_benchmark_generate_json(config: Config, benchmarks, include_data, machine_info, commit_info): + print(config) + import os + import re + from datetime import datetime, timezone + measurements = [] + for bench in benchmarks: + param = getattr(bench, "param", "") + if not param: + continue + + m = re.match(r"^(\d+)_day(\d+)_(.+)_part(\d+)$", param) + if not m: + continue + + year, day, name, part = m.groups() + + series_key = f"{year}/{day}/part{part}" + group_key = name + + mean_sec = bench.stats.mean + duration_ns = str(int(mean_sec * 1_000_000_000)) + + iterations = getattr(bench.stats, "rounds", None) + if not iterations: + iterations = getattr(bench, "iterations", 1) + + measurements.append({ + "series_key": series_key, + "group_key": group_key, + "duration": duration_ns, + "iterations": iterations + }) + + output = { + "name": "python", + "hash": os.environ.get("GITHUB_SHA", "unknown"), + "timestamp": datetime.now(timezone.utc).isoformat(), + "measurements": measurements + } + + return output diff --git a/python/test_benchmark.py b/python/test_benchmark.py new file mode 100644 index 00000000..bb044317 --- /dev/null +++ b/python/test_benchmark.py @@ -0,0 +1,80 @@ +import json +import os +import io +import pytest +from pathlib import Path +from importlib import import_module + +CURRENT_DIR = Path(__file__).resolve().parent +TEST_DIR = CURRENT_DIR.parent / "test" + +def _get_test_cases(): + data_json_path = CURRENT_DIR.parent / "test" / "data.json" + if not data_json_path.exists(): + return [] + + with open(data_json_path, "r", encoding="utf-8") as f: + data = json.load(f) + + cases_list = [] + + for year_str, days in data.items(): + year = int(year_str) + for day_str, cases in days.items(): + day = int(day_str) + for i, case in enumerate(cases): + name = case.get("name", f"case_{i}") + input_data = case.get("input") + file_path = case.get("file") + + for part_num, pkey in [(1, "part1"), (2, "part2")]: + if pkey in case: + expected_data = case[pkey] + if "result" in expected_data: + cases_list.append({ + "id": f"{year}_day{day}_{name}_part{part_num}", + "year": year, + "day": day, + "part": part_num, + "expected": expected_data["result"], + "input_data": input_data, + "file_path": file_path + }) + return cases_list + +def pytest_generate_tests(metafunc): + if "aoc_case" in metafunc.fixturenames: + cases = _get_test_cases() + metafunc.parametrize("aoc_case", cases, ids=[c["id"] for c in cases]) + +def test_aoc_benchmark(benchmark, aoc_case): + module_name = f"aoc{aoc_case['year']}.day{aoc_case['day']:02d}.part_{aoc_case['part']}" + try: + mod = import_module(module_name) + except ImportError: + pytest.skip(f"Module {module_name} not found") + + func_name = f"Part{aoc_case['part']}" + func = getattr(mod, func_name, None) + if not func: + pytest.skip(f"Function {func_name} not found in {module_name}") + + if aoc_case["input_data"] is not None: + raw_text = aoc_case["input_data"] + elif aoc_case["file_path"]: + abs_path = TEST_DIR.joinpath(aoc_case["file_path"]).resolve() + if not os.path.exists(abs_path): + pytest.fail(f"Input file not found: {abs_path}") + with open(abs_path, "r", encoding="utf-8") as f: + raw_text = f.read() + else: + pytest.fail("No input provided for test case") + + # This wrapper isolates the I/O string instantiation, but the main cost is the solution + def run_part(): + f_obj = io.StringIO(raw_text) + return func(f_obj) + + # benchmark measures the time it takes to execute run_part + result = benchmark(run_part) + assert result == aoc_case["expected"]