diff --git a/docs/configuration.rst b/docs/configuration.rst
index 247c0de..037a875 100644
--- a/docs/configuration.rst
+++ b/docs/configuration.rst
@@ -223,6 +223,25 @@ Console Output (``output=console``)
 Prints the analysis summary directly to the console. This is the **default**
 output.
 
+JSON Output (``output=json``)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Saves a JSON summary report to disk. This output includes raw stats, per-view
+summary metrics, per-layer metrics, and additional metrics statistics.
+
+.. list-table::
+   :widths: 25 15 30 30
+   :header-rows: 1
+
+   * - Parameter
+     - Type
+     - Default
+     - Description
+   * - ``output.file_path``
+     - string
+     - ""
+     - JSON output file path. If empty, writes to ``<hydra_output_dir>/dfanalyzer_output.json``.
+
 CSV Output (``output=csv``)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/python/dftracer/analyzer/__init__.py b/python/dftracer/analyzer/__init__.py
index 5b0ab24..f35704b 100644
--- a/python/dftracer/analyzer/__init__.py
+++ b/python/dftracer/analyzer/__init__.py
@@ -13,7 +13,7 @@
 from .cluster import ClusterType, ExternalCluster
 from .config import CLUSTER_RESTART_TIMEOUT_SECONDS, init_hydra_config_store
 from .dftracer import DFTracerAnalyzer
-from .output import ConsoleOutput, CSVOutput, SQLiteOutput
+from .output import ConsoleOutput, CSVOutput, JSONOutput, SQLiteOutput
 from .recorder import RecorderAnalyzer
 from .types import ViewType
 from .utils.log_utils import configure_logging, log_block
@@ -37,7 +37,7 @@
 DarshanAnalyzer = Analyzer
 
 AnalyzerType = Union[DarshanAnalyzer, DFTracerAnalyzer, RecorderAnalyzer]
-OutputType = Union[ConsoleOutput, CSVOutput, SQLiteOutput]
+OutputType = Union[ConsoleOutput, JSONOutput, CSVOutput, SQLiteOutput]
 
 
 @dataclass
diff --git a/python/dftracer/analyzer/config.py b/python/dftracer/analyzer/config.py
index 36b50db..0bca952 100644
--- a/python/dftracer/analyzer/config.py
+++ b/python/dftracer/analyzer/config.py
@@ -334,6 +334,12 @@ class ConsoleOutputConfig(OutputConfig):
     show_header: Optional[bool] = True
 
 
+@dc.dataclass
+class JSONOutputConfig(OutputConfig):
+    _target_: str = "dftracer.analyzer.output.JSONOutput"
+    file_path: Optional[str] = ""
+
+
 @dc.dataclass
 class CSVOutputConfig(OutputConfig):
     _target_: str = "dftracer.analyzer.output.CSVOutput"
@@ -423,6 +429,7 @@ def init_hydra_config_store() -> ConfigStore:
     cs.store(group="cluster", name="pbs", node=PBSClusterConfig)
     cs.store(group="cluster", name="slurm", node=SLURMClusterConfig)
     cs.store(group="output", name="console", node=ConsoleOutputConfig)
+    cs.store(group="output", name="json", node=JSONOutputConfig)
     cs.store(group="output", name="csv", node=CSVOutputConfig)
     cs.store(group="output", name="sqlite", node=SQLiteOutputConfig)
     return cs
diff --git a/python/dftracer/analyzer/output.py b/python/dftracer/analyzer/output.py
index 2aa1713..8f9c129 100644
--- a/python/dftracer/analyzer/output.py
+++ b/python/dftracer/analyzer/output.py
@@ -3,8 +3,11 @@
 import dask
 import dataclasses as dc
 import inflect
+import json
 import numpy as np
 import pandas as pd
+from hydra.core.hydra_config import HydraConfig
+from pathlib import Path
 from rich.console import Console
 from rich.table import Table
 from typing import Dict, List, Optional
@@ -15,6 +18,7 @@
     RawStats,
     ViewKey,
     humanized_view_name,
+    view_name,
 )
 
 
@@ -63,11 +67,16 @@ def handle_result(self, result: AnalyzerResultType):
         raise NotImplementedError
 
-    def _create_summary(self, result: AnalyzerResultType, view_key: ViewKey) -> OutputSummary:
-        flat_view = result.flat_views[view_key]
+    def _compute_raw_stats(self, result: AnalyzerResultType) -> RawStats:
         raw_stats = dask.compute(result.raw_stats)[0]
         if isinstance(raw_stats, dict):
             raw_stats = RawStats(**raw_stats)
+        return raw_stats
+
+    def _create_summary(self, result: AnalyzerResultType, view_key: ViewKey, raw_stats: Optional[RawStats] = None) -> OutputSummary:
+        flat_view = result.flat_views[view_key]
+        if raw_stats is None:
+            raw_stats = self._compute_raw_stats(result)
 
         summary = OutputSummary(
             job_time=float(raw_stats.job_time),
             layer_metrics={},
@@ -132,6 +141,19 @@ def _humanized_layer_name(self, name: str) -> str:
             .replace('Ssd', '(SSD)')
         )
 
+    @staticmethod
+    def _additional_metric_scale_and_unit(metric: str):
+        metric_lower = metric.lower()
+        if metric_lower.endswith('_gbps'):
+            return GiB, 'GB/s'
+        if metric_lower.endswith('_mbps'):
+            return MiB, 'MB/s'
+        if metric_lower.endswith('_gb'):
+            return GiB, 'GB'
+        if metric_lower.endswith('_mb'):
+            return MiB, 'MB'
+        return 1.0, '-'
+
 
 class ConsoleOutput(Output):
     def __init__(
@@ -148,11 +170,12 @@ def __init__(
         self.show_header = show_header
 
     def handle_result(self, result: AnalyzerResultType):
+        raw_stats = self._compute_raw_stats(result)
         print_objects = []
         for view_key in result.flat_views:
             if view_key[-1] not in result.view_types:
                 continue
-            summary = self._create_summary(result=result, view_key=view_key)
+            summary = self._create_summary(result=result, view_key=view_key, raw_stats=raw_stats)
             summary_table = self._create_summary_table(summary=summary, view_key=view_key)
             layer_breakdown_table = self._create_layer_breakdown_table(summary=summary, view_key=view_key)
             print_objects.append(summary_table)
@@ -280,19 +303,6 @@ def _create_additional_metrics_table(self, result: AnalyzerResultType, view_key:
             return None
         return additional_table
 
-    @staticmethod
-    def _additional_metric_scale_and_unit(metric: str):
-        metric_lower = metric.lower()
-        if metric_lower.endswith('_gbps'):
-            return GiB, 'GB/s'
-        if metric_lower.endswith('_mbps'):
-            return MiB, 'MB/s'
-        if metric_lower.endswith('_gb'):
-            return GiB, 'GB'
-        if metric_lower.endswith('_mb'):
-            return MiB, 'MB'
-        return 1.0, '-'
-
     def _format_val(self, value: float, fmt_int=False) -> str:
         if value is None or value == 0:
             return '-'
@@ -324,6 +334,130 @@ def _percentage_color(self, percentage: float) -> str:
         return f"#{int(r * 255):02x}{int(g * 255):02x}{int(b * 255):02x}"
 
 
+class JSONOutput(Output):
+    def __init__(
+        self,
+        compact: bool = False,
+        file_path: str = "",
+        name: str = "",
+        root_only: bool = False,
+        view_names: List[str] = [],
+    ):
+        super().__init__(compact, name, root_only, view_names)
+        self.file_path = file_path
+
+    def handle_result(self, result: AnalyzerResultType):
+        raw_stats = self._compute_raw_stats(result)
+        output = {
+            "schema_version": "1",
+            "raw_stats": self._create_raw_stats(raw_stats=raw_stats),
+            "views": {},
+        }
+        for view_key in result.flat_views:
+            if view_key[-1] not in result.view_types:
+                continue
+            summary = self._create_summary(result=result, view_key=view_key, raw_stats=raw_stats)
+            output["views"][view_name(view_key, separator="/")] = {
+                "summary": self._create_summary_payload(summary=summary),
+                "additional_metrics": self._create_additional_metrics_payload(result=result, view_key=view_key),
+            }
+
+        output_path = self._resolve_output_path()
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+        with output_path.open("w", encoding="utf-8") as f:
+            json.dump(output, f, indent=2, allow_nan=False)
+            f.write("\n")
+
+    def _resolve_output_path(self) -> Path:
+        if self.file_path:
+            return Path(self.file_path)
+        try:
+            output_dir = HydraConfig.get().runtime.output_dir
+        except Exception:
+            output_dir = "."
+        return Path(output_dir) / "dfanalyzer_output.json"
+
+    @staticmethod
+    def _to_int_or_none(value):
+        if value is None or pd.isna(value):
+            return None
+        return int(value)
+
+    @staticmethod
+    def _to_float_or_none(value):
+        if value is None or pd.isna(value):
+            return None
+        return float(value)
+
+    def _create_raw_stats(self, raw_stats: RawStats):
+        return {
+            "job_time_s": self._to_float_or_none(raw_stats.job_time),
+            "time_granularity_s": self._to_float_or_none(raw_stats.time_granularity),
+            "time_resolution_ns": self._to_int_or_none(raw_stats.time_resolution),
+            "total_event_count": self._to_int_or_none(raw_stats.total_event_count),
+            "unique_file_count": self._to_int_or_none(raw_stats.unique_file_count),
+            "unique_host_count": self._to_int_or_none(raw_stats.unique_host_count),
+            "unique_process_count": self._to_int_or_none(raw_stats.unique_process_count),
+        }
+
+    def _create_summary_payload(self, summary: OutputSummary):
+        summary_payload = {
+            "job_time_s": self._to_float_or_none(summary.job_time),
+            "total_event_count": self._to_int_or_none(summary.total_event_count),
+            "unique_file_count": self._to_int_or_none(summary.unique_file_count),
+            "unique_host_count": self._to_int_or_none(summary.unique_host_count),
+            "unique_process_count": self._to_int_or_none(summary.unique_process_count),
+            "time_granularity_s": self._to_float_or_none(summary.time_granularity),
+            "time_resolution_ns": self._to_int_or_none(summary.time_resolution),
+            "layers": {},
+        }
+        for layer in summary.layers:
+            metrics = summary.layer_metrics[layer]
+            summary_payload["layers"][layer] = {
+                "time_s": self._to_float_or_none(metrics.time),
+                "count": self._to_int_or_none(metrics.count),
+                "size_bytes": self._to_float_or_none(metrics.size),
+                "ops_per_s": self._to_float_or_none(metrics.ops),
+                "bandwidth_bps": self._to_float_or_none(metrics.bandwidth),
+                "num_files": self._to_int_or_none(metrics.num_files),
+                "num_processes": self._to_int_or_none(metrics.num_processes),
+                "u_time_s": self._to_float_or_none(metrics.u_time),
+                "u_count": self._to_int_or_none(metrics.u_count),
+                "u_size_bytes": self._to_float_or_none(metrics.u_size),
+            }
+        return summary_payload
+
+    def _create_additional_metrics_payload(self, result: AnalyzerResultType, view_key: ViewKey):
+        payload = {}
+        flat_view = result.flat_views[view_key]
+        view_type = view_key[-1]
+        view_additional_metrics = result.additional_metrics.get(view_type, [])
+        for metric in view_additional_metrics:
+            if metric not in flat_view.columns:
+                continue
+            metric_series = pd.to_numeric(flat_view[metric], errors='coerce').replace([np.inf, -np.inf], np.nan)
+            scale, unit = self._additional_metric_scale_and_unit(metric)
+            metric_series = metric_series / scale
+            non_null = int(metric_series.notna().sum())
+            metric_payload = {
+                "unit": unit,
+                "non_null": non_null,
+                "min": None,
+                "mean": None,
+                "max": None,
+            }
+            if non_null > 0:
+                metric_payload.update(
+                    {
+                        "min": float(metric_series.min()),
+                        "mean": float(metric_series.mean()),
+                        "max": float(metric_series.max()),
+                    }
+                )
+            payload[metric] = metric_payload
+        return payload
+
+
 class CSVOutput(Output):
     def handle_result(self, result: AnalyzerResultType):
         raise NotImplementedError("CSVOutput is not implemented yet.")
diff --git a/tests/test_e2e.py b/tests/test_e2e.py
index 32c20c8..0f379d4 100644
--- a/tests/test_e2e.py
+++ b/tests/test_e2e.py
@@ -1,3 +1,4 @@
+import json
 import os
 import pathlib
 import pytest
@@ -60,6 +61,49 @@ def test_e2e_smoke(
     _test_e2e(analyzer, preset, trace_path, checkpoint, tmp_path, dask_cluster)
 
 
+@pytest.mark.smoke
+def test_json_output_file(tmp_path: pathlib.Path, dask_cluster: LocalCluster) -> None:
+    """Verify JSON output file is created with the expected schema and views."""
+    checkpoint_dir = f"{tmp_path}/checkpoints"
+    scheduler_address = dask_cluster.scheduler_address
+    output_path = tmp_path / "analysis.json"
+    hydra_overrides = [
+        "analyzer=dftracer",
+        "analyzer/preset=posix",
+        "analyzer.checkpoint=False",
+        f"analyzer.checkpoint_dir={checkpoint_dir}",
+        "cluster=external",
+        "cluster.restart_on_connect=True",
+        f"cluster.scheduler_address={scheduler_address}",
+        "output=json",
+        f"output.file_path={output_path}",
+        f"hydra.run.dir={tmp_path}",
+        f"hydra.runtime.output_dir={tmp_path}",
+        "trace_path=tests/data/extracted/dftracer-posix",
+        "view_types=[time_range,proc_name]",
+    ]
+
+    dfa = init_with_hydra(hydra_overrides=hydra_overrides)
+    result = dfa.analyze_trace()
+    dfa.output.handle_result(result)
+
+    assert output_path.exists(), f"Expected JSON output at {output_path}"
+    with output_path.open() as f:
+        payload = json.load(f)
+
+    assert payload["schema_version"] == "1"
+    assert "raw_stats" in payload
+    assert "views" in payload
+    assert "time_range" in payload["views"]
+    assert "proc_name" in payload["views"]
+    assert "summary" in payload["views"]["time_range"]
+    assert "additional_metrics" in payload["views"]["time_range"]
+    assert "flat_views" not in payload
+
+    dfa.shutdown()
+    assert dfa.client.status == "closed", "Dask client should be closed after shutdown"
+
+
 def _test_e2e(
     analyzer: str,
     preset: str,