diff --git a/src/climatebenchpress/compressor/plotting/plot_metrics.py b/src/climatebenchpress/compressor/plotting/plot_metrics.py index 648d189..0b0f468 100644 --- a/src/climatebenchpress/compressor/plotting/plot_metrics.py +++ b/src/climatebenchpress/compressor/plotting/plot_metrics.py @@ -7,7 +7,7 @@ import seaborn as sns import xarray as xr -from ..scripts.collect_metrics import parse_error_bounds +from ..scripts.compute_metrics import parse_error_bounds from .error_dist_plotter import ErrorDistPlotter from .variable_plotters import PLOTTERS @@ -58,7 +58,7 @@ def get_legend_name(compressor: str) -> str: def plot_metrics( basepath: Path = Path(), - data_loader_base_path: None | Path = None, + data_loader_basepath: None | Path = None, bound_names: list[str] = ["low", "mid", "high"], normalizer: str = "sz3", exclude_dataset: list[str] = [], @@ -68,7 +68,7 @@ def plot_metrics( ): metrics_path = basepath / "metrics" plots_path = basepath / "plots" - datasets = (data_loader_base_path or basepath) / "datasets" + datasets = (data_loader_basepath or basepath) / "datasets" compressed_datasets = basepath / "compressed-datasets" df = pd.read_csv(metrics_path / "all_results.csv") @@ -696,11 +696,17 @@ def savefig(outfile: Path): parser.add_argument("--exclude-compressor", type=str, nargs="+", default=[]) parser.add_argument("--tiny-datasets", action="store_true", default=False) parser.add_argument("--avoid-latex", action="store_true", default=False) + parser.add_argument("--basepath", type=Path, default=Path()) + parser.add_argument( + "--data-loader-basepath", + type=Path, + default=Path() / ".." / "data-loader", + ) args = parser.parse_args() plot_metrics( - basepath=Path(), - data_loader_base_path=Path() / ".." / "data-loader", + basepath=args.basepath, + data_loader_basepath=args.data_loader_basepath, exclude_compressor=args.exclude_compressor, exclude_dataset=args.exclude_dataset, tiny_datasets=args.tiny_datasets, diff --git a/src/climatebenchpress/compressor/scripts/compress.py b/src/climatebenchpress/compressor/scripts/compress.py index 08c3b9f..24059cb 100644 --- a/src/climatebenchpress/compressor/scripts/compress.py +++ b/src/climatebenchpress/compressor/scripts/compress.py @@ -26,10 +26,10 @@ def compress( include_dataset: None | Container[str] = None, exclude_compressor: Container[str] = tuple(), include_compressor: None | Container[str] = None, - data_loader_base_path: None | Path = None, + data_loader_basepath: None | Path = None, progress: bool = True, ): - datasets = (data_loader_base_path or basepath) / "datasets" + datasets = (data_loader_basepath or basepath) / "datasets" compressed_datasets = basepath / "compressed-datasets" datasets_error_bounds = basepath / "datasets-error-bounds" @@ -187,6 +187,10 @@ def get_error_bounds( parser.add_argument("--include-dataset", type=str, nargs="+", default=None) parser.add_argument("--exclude-compressor", type=str, nargs="+", default=[]) parser.add_argument("--include-compressor", type=str, nargs="+", default=None) + parser.add_argument("--basepath", type=Path, default=Path()) + parser.add_argument( + "--data-loader-basepath", type=Path, default=Path() / ".." / "data-loader" + ) args = parser.parse_args() compress( @@ -195,6 +199,6 @@ def get_error_bounds( include_dataset=args.include_dataset, exclude_compressor=args.exclude_compressor, include_compressor=args.include_compressor, - data_loader_base_path=Path() / ".." / "data-loader", + data_loader_basepath=args.data_loader_basepath, progress=True, ) diff --git a/src/climatebenchpress/compressor/scripts/collect_metrics.py b/src/climatebenchpress/compressor/scripts/compute_metrics.py similarity index 71% rename from src/climatebenchpress/compressor/scripts/collect_metrics.py rename to src/climatebenchpress/compressor/scripts/compute_metrics.py index 20b7147..8db5bc3 100644 --- a/src/climatebenchpress/compressor/scripts/collect_metrics.py +++ b/src/climatebenchpress/compressor/scripts/compute_metrics.py @@ -1,9 +1,10 @@ -__all__ = ["collect_metrics"] +__all__ = ["compute_metrics"] +import argparse import json import re from pathlib import Path -from typing import Optional +from typing import Iterable import pandas as pd import xarray as xr @@ -25,30 +26,35 @@ } -def collect_metrics( +def compute_metrics( basepath: Path = Path(), - data_loader_base_path: None | Path = None, + data_loader_basepath: None | Path = None, + exclude_dataset: Iterable[str] = tuple(), + include_dataset: None | Iterable[str] = None, + exclude_compressor: Iterable[str] = tuple(), + include_compressor: None | Iterable[str] = None, ): - datasets = (data_loader_base_path or basepath) / "datasets" + exclude_compressor = add_compressor_suffixes(exclude_compressor) + include_compressor = add_compressor_suffixes(include_compressor) + + datasets = (data_loader_basepath or basepath) / "datasets" compressed_datasets = basepath / "compressed-datasets" - error_bounds_dir = basepath / "datasets-error-bounds" metrics_dir = basepath / "metrics" - all_results = [] for dataset in compressed_datasets.iterdir(): - if dataset.name == ".gitignore": + if dataset.name == ".gitignore" or dataset.name in exclude_dataset: + continue + if include_dataset and dataset.name not in include_dataset: continue - - with (error_bounds_dir / dataset.name / "error_bounds.json").open() as f: - error_bound_list = json.load(f) for error_bound in dataset.iterdir(): variable2error_bound = parse_error_bounds(error_bound.name) - error_bound_name = get_error_bound_name( - variable2error_bound, error_bound_list - ) for compressor in error_bound.iterdir(): + if compressor.stem in exclude_compressor: + continue + if include_compressor and compressor.stem not in include_compressor: + continue print(f"Evaluating {compressor.stem} on {dataset.name}...") compressed_dataset = ( @@ -74,70 +80,21 @@ def collect_metrics( ) compressor_metrics.mkdir(parents=True, exist_ok=True) - metrics = compute_metrics(compressor_metrics, ds, ds_new) - tests = compute_tests( - compressor_metrics, variable2error_bound, ds, ds_new - ) - measurements = load_measurements(compressed_dataset, compressor) - - df = merge_metrics(measurements, metrics, tests) - df["Dataset"] = dataset.name - df["Error Bound"] = error_bound.name - df["Error Bound Name"] = error_bound_name - all_results.append(df) + compute_compressor_metrics(compressor_metrics, ds, ds_new) + compute_tests(compressor_metrics, variable2error_bound, ds, ds_new) - all_results_df = pd.concat(all_results) - all_results_df.to_csv(metrics_dir / "all_results.csv", index=False) +def add_compressor_suffixes(compressors: None | Iterable[str]) -> list[str]: + if compressors is None: + return [] -def get_error_bound_name( - variable2bound: dict[str, tuple[str, float]], - error_bound_list: list[dict[str, dict[str, Optional[float]]]], - bound_names: list[str] = ["low", "mid", "high"], -) -> str: - """The function returns either "low", "mid", or "high" depending on which error bound - from the variable2bound dictionary matches the exact error bound in the error_bound_list. - - error_bound_list contains one dictionary for each error bound (low, mid, high). - Each of these dictionaries contains the error bounds for - each variable. The variable names in the dictionaries should exactly match the variable names - in the variable2bound dictionary. - - Parameters - ---------- - variable2bound : dict[str, tuple[str, float]] - A dictionary representing a single error bound, mapping variable names to - tuples of error type and error bound. The error type is either "abs_error" - or "rel_error", and the error bound is a float. - error_bound_list : list[dict[str, dict[str, Optional[float]]]] - A list of dictionaries, each representing an error bound (low, mid, high). - Each dictionary contains variable names as keys and a dictionary of error types - and bounds as values. - bound_names : list[str], optional - A list of names for the error bounds, by default ["low", "mid", "high"]. - """ + extended_compressors = [] + for compressor in compressors: + extended_compressors.append(compressor) + extended_compressors.append(compressor + "-conservative-rel") + extended_compressors.append(compressor + "-conservative-abs") - # Convert the variable2bound dictionary to match the format of error_bound_list. - new_bound_format = dict() - for k in variable2bound.keys(): - new_bound_format[k] = { - "abs_error": ( - variable2bound[k][1] if variable2bound[k][0] == "abs_error" else None - ), - "rel_error": ( - variable2bound[k][1] if variable2bound[k][0] == "rel_error" else None - ), - } - - # Return the name of the error bound that matches new_bound_format. - for bound_name, error_bound in zip(bound_names, error_bound_list): - if new_bound_format == error_bound: - return bound_name - - raise ValueError( - f"Error bounds {new_bound_format} do not match any of the error bounds " - f"{error_bound_list}." - ) + return extended_compressors def parse_error_bounds(error_bound_str: str) -> dict[str, tuple[str, float]]: @@ -187,7 +144,7 @@ def parse_error_bounds(error_bound_str: str) -> dict[str, tuple[str, float]]: return result -def compute_metrics( +def compute_compressor_metrics( compressor_metrics: Path, ds: xr.Dataset, ds_new: xr.Dataset ) -> pd.DataFrame: metrics_path = compressor_metrics / "metrics.csv" @@ -197,7 +154,15 @@ def compute_metrics( metric_list = [] for name, metric in EVALUATION_METRICS.items(): for v in ds_new: - error = metric(ds[v], ds_new[v]) + try: + error = metric(ds[v], ds_new[v]) + except Exception as e: + print( + f"Error computing metric {name} for variable {v} on " + f"{compressor_metrics.parent.name}: {e}" + ) + error = float("nan") + metric_list.append( { "Metric": name, @@ -223,7 +188,16 @@ def compute_tests( test_list = [] for name, test in PASSFAIL_TESTS.items(): for v in ds_new: - test_result, test_value = test(ds[v], ds_new[v]) + try: + test_result, test_value = test(ds[v], ds_new[v]) + except Exception as e: + print( + f"Error computing test {name} for variable {v} on " + f"{compressor_metrics.parent.name}: {e}" + ) + test_result = False + test_value = float("nan") + test_list.append( { "Test": name, @@ -327,7 +301,21 @@ def merge_metrics( if __name__ == "__main__": - collect_metrics( - basepath=Path(), - data_loader_base_path=Path() / ".." / "data-loader", + parser = argparse.ArgumentParser() + parser.add_argument("--basepath", type=Path, default=Path()) + parser.add_argument( + "--data-loader-basepath", type=Path, default=Path() / ".." / "data-loader" + ) + parser.add_argument("--exclude-dataset", type=str, nargs="+", default=[]) + parser.add_argument("--include-dataset", type=str, nargs="+", default=None) + parser.add_argument("--exclude-compressor", type=str, nargs="+", default=[]) + parser.add_argument("--include-compressor", type=str, nargs="+", default=None) + args = parser.parse_args() + compute_metrics( + basepath=args.basepath, + data_loader_basepath=args.data_loader_basepath, + exclude_dataset=args.exclude_dataset, + include_dataset=args.include_dataset, + exclude_compressor=args.exclude_compressor, + include_compressor=args.include_compressor, ) diff --git a/src/climatebenchpress/compressor/scripts/concatenate_metrics.py b/src/climatebenchpress/compressor/scripts/concatenate_metrics.py new file mode 100644 index 0000000..02459d0 --- /dev/null +++ b/src/climatebenchpress/compressor/scripts/concatenate_metrics.py @@ -0,0 +1,184 @@ +__all__ = ["concatenate_metrics"] + +import argparse +import json +from pathlib import Path +from typing import Optional + +import pandas as pd + +from .compute_metrics import parse_error_bounds + + +def concatenate_metrics(basepath: Path = Path()): + compressed_datasets = basepath / "compressed-datasets" + error_bounds_dir = basepath / "datasets-error-bounds" + metrics_dir = basepath / "metrics" + + all_results = [] + for dataset in metrics_dir.iterdir(): + if not dataset.is_dir(): + continue + + with (error_bounds_dir / dataset.name / "error_bounds.json").open() as f: + error_bound_list = json.load(f) + + for error_bound in dataset.iterdir(): + variable2error_bound = parse_error_bounds(error_bound.name) + error_bound_name = get_error_bound_name( + variable2error_bound, error_bound_list + ) + + for compressor in error_bound.iterdir(): + metrics_csv = compressor / "metrics.csv" + metrics = pd.read_csv(metrics_csv) + tests_csv = compressor / "tests.csv" + tests = pd.read_csv(tests_csv) + compressed_dataset = ( + compressed_datasets + / dataset.name + / error_bound.name + / compressor.stem + ) + measurements = load_measurements(compressed_dataset, compressor) + + df = merge_metrics(measurements, metrics, tests) + df["Dataset"] = dataset.name + df["Error Bound"] = error_bound.name + df["Error Bound Name"] = error_bound_name + all_results.append(df) + + all_results_df = pd.concat(all_results) + all_results_df.to_csv(metrics_dir / "all_results.csv", index=False) + + +def load_measurements(compressed_dataset: Path, compressor: Path) -> pd.DataFrame: + with (compressed_dataset / "measurements.json").open() as f: + measurements = json.load(f) + + rows = [] + for var, variable_measurements in measurements.items(): + rows.append( + { + "Compressor": compressor.stem, + "Variable": var, + "Compression Ratio [raw B / enc B]": variable_measurements[ + "decoded_bytes" + ] + / variable_measurements["encoded_bytes"], + "Encode Instructions [# / raw B]": ( + None + if variable_measurements["encode_instructions"] is None + else ( + variable_measurements["encode_instructions"] + / variable_measurements["decoded_bytes"] + ) + ), + "Decode Instructions [# / raw B]": ( + None + if variable_measurements["decode_instructions"] is None + else ( + variable_measurements["decode_instructions"] + / variable_measurements["decoded_bytes"] + ) + ), + "Encode Throughput [raw B / s]": ( + None + if variable_measurements["encode_timing"] == 0 + else variable_measurements["decoded_bytes"] + / variable_measurements["encode_timing"] + ), + "Decode Throughput [raw B / s]": ( + None + if variable_measurements["decode_timing"] == 0 + else variable_measurements["decoded_bytes"] + / variable_measurements["decode_timing"] + ), + } + ) + return pd.DataFrame(rows) + + +def merge_metrics( + measurements: pd.DataFrame, metrics: pd.DataFrame, tests: pd.DataFrame +) -> pd.DataFrame: + # Turn each metric/test into a column. Merge on "variable" to avoid duplicating + # the "variable" column. + test_per_variable = tests.pivot( + index="Variable", columns="Test", values=["Passed", "Value"] + ) + # mypy cannot infer that test_per_variable.columns is a MultiIndex and therefore + # gives spurious errors for this assignment. + test_per_variable.columns = [ # type: ignore + f"{metric_name} ({passed_or_val})" # type: ignore + for passed_or_val, metric_name in test_per_variable.columns # type: ignore + ] + return pd.merge( + measurements, + metrics.pivot(index="Variable", columns="Metric", values="Error") + .reset_index() + .merge( + test_per_variable.reset_index(), + on="Variable", + ), + on="Variable", + ) + + +def get_error_bound_name( + variable2bound: dict[str, tuple[str, float]], + error_bound_list: list[dict[str, dict[str, Optional[float]]]], + bound_names: list[str] = ["low", "mid", "high"], +) -> str: + """The function returns either "low", "mid", or "high" depending on which error bound + from the variable2bound dictionary matches the exact error bound in the error_bound_list. + + error_bound_list contains one dictionary for each error bound (low, mid, high). + Each of these dictionaries contains the error bounds for + each variable. The variable names in the dictionaries should exactly match the variable names + in the variable2bound dictionary. + + Parameters + ---------- + variable2bound : dict[str, tuple[str, float]] + A dictionary representing a single error bound, mapping variable names to + tuples of error type and error bound. The error type is either "abs_error" + or "rel_error", and the error bound is a float. + error_bound_list : list[dict[str, dict[str, Optional[float]]]] + A list of dictionaries, each representing an error bound (low, mid, high). + Each dictionary contains variable names as keys and a dictionary of error types + and bounds as values. + bound_names : list[str], optional + A list of names for the error bounds, by default ["low", "mid", "high"]. + """ + + # Convert the variable2bound dictionary to match the format of error_bound_list. + new_bound_format = dict() + for k in variable2bound.keys(): + new_bound_format[k] = { + "abs_error": ( + variable2bound[k][1] if variable2bound[k][0] == "abs_error" else None + ), + "rel_error": ( + variable2bound[k][1] if variable2bound[k][0] == "rel_error" else None + ), + } + + # Return the name of the error bound that matches new_bound_format. + for bound_name, error_bound in zip(bound_names, error_bound_list): + if new_bound_format == error_bound: + return bound_name + + raise ValueError( + f"Error bounds {new_bound_format} do not match any of the error bounds " + f"{error_bound_list}." + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--basepath", type=Path, default=Path()) + args = parser.parse_args() + + concatenate_metrics(basepath=args.basepath) + concatenate_metrics(basepath=args.basepath) diff --git a/src/climatebenchpress/compressor/scripts/create_error_bounds.py b/src/climatebenchpress/compressor/scripts/create_error_bounds.py index e64be6a..057dfba 100644 --- a/src/climatebenchpress/compressor/scripts/create_error_bounds.py +++ b/src/climatebenchpress/compressor/scripts/create_error_bounds.py @@ -1,5 +1,6 @@ __all__ = ["create_error_bounds"] +import argparse import json from dataclasses import dataclass from pathlib import Path @@ -73,9 +74,9 @@ def create_error_bounds( basepath: Path = Path(), - data_loader_base_path: None | Path = None, + data_loader_basepath: None | Path = None, ): - datasets = (data_loader_base_path or basepath) / "datasets" + datasets = (data_loader_basepath or basepath) / "datasets" datasets_error_bounds = basepath / "datasets-error-bounds" era5_error_bounds = pd.read_csv(ERROR_BOUNDS) @@ -255,7 +256,14 @@ def compute_ensemble_spread_bounds( if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Create error bounds for datasets") + parser.add_argument("--basepath", type=Path, default=Path()) + parser.add_argument( + "--data-loader-basepath", type=Path, default=Path() / ".." / "data-loader" + ) + args = parser.parse_args() + create_error_bounds( - basepath=Path(), - data_loader_base_path=Path() / ".." / "data-loader", + basepath=args.basepath, + data_loader_basepath=args.data_loader_basepath, )