Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions docs/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,25 @@ Console Output (``output=console``)

Prints the analysis summary directly to the console. This is the **default** output.

JSON Output (``output=json``)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Saves a JSON summary report to disk. This output includes raw stats, per-view
summary metrics, per-layer metrics, and additional metrics statistics.

.. list-table::
:widths: 25 15 30 30
:header-rows: 1

* - Parameter
- Type
- Default
- Description
* - ``output.file_path``
- string
- ""
- JSON output file path. If empty, writes to ``<hydra.runtime.output_dir>/dfanalyzer_output.json``.

CSV Output (``output=csv``)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
4 changes: 2 additions & 2 deletions python/dftracer/analyzer/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .cluster import ClusterType, ExternalCluster
from .config import CLUSTER_RESTART_TIMEOUT_SECONDS, init_hydra_config_store
from .dftracer import DFTracerAnalyzer
from .output import ConsoleOutput, CSVOutput, SQLiteOutput
from .output import ConsoleOutput, CSVOutput, JSONOutput, SQLiteOutput
from .recorder import RecorderAnalyzer
from .types import ViewType
from .utils.log_utils import configure_logging, log_block
Expand All @@ -37,7 +37,7 @@
DarshanAnalyzer = Analyzer

AnalyzerType = Union[DarshanAnalyzer, DFTracerAnalyzer, RecorderAnalyzer]
OutputType = Union[ConsoleOutput, CSVOutput, SQLiteOutput]
OutputType = Union[ConsoleOutput, JSONOutput, CSVOutput, SQLiteOutput]


@dataclass
Expand Down
7 changes: 7 additions & 0 deletions python/dftracer/analyzer/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ class ConsoleOutputConfig(OutputConfig):
show_header: Optional[bool] = True


@dc.dataclass
class JSONOutputConfig(OutputConfig):
    # Hydra instantiation target: resolved to the JSONOutput handler class.
    _target_: str = "dftracer.analyzer.output.JSONOutput"
    # Destination of the JSON report. An empty string means the handler
    # falls back to <hydra.runtime.output_dir>/dfanalyzer_output.json.
    file_path: Optional[str] = ""


@dc.dataclass
class CSVOutputConfig(OutputConfig):
_target_: str = "dftracer.analyzer.output.CSVOutput"
Expand Down Expand Up @@ -423,6 +429,7 @@ def init_hydra_config_store() -> ConfigStore:
cs.store(group="cluster", name="pbs", node=PBSClusterConfig)
cs.store(group="cluster", name="slurm", node=SLURMClusterConfig)
cs.store(group="output", name="console", node=ConsoleOutputConfig)
cs.store(group="output", name="json", node=JSONOutputConfig)
cs.store(group="output", name="csv", node=CSVOutputConfig)
cs.store(group="output", name="sqlite", node=SQLiteOutputConfig)
return cs
166 changes: 150 additions & 16 deletions python/dftracer/analyzer/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import dask
import dataclasses as dc
import inflect
import json
import numpy as np
import pandas as pd
from hydra.core.hydra_config import HydraConfig
from pathlib import Path
from rich.console import Console
from rich.table import Table
from typing import Dict, List, Optional
Expand All @@ -15,6 +18,7 @@
RawStats,
ViewKey,
humanized_view_name,
view_name,
)


Expand Down Expand Up @@ -63,11 +67,16 @@ def __init__(
def handle_result(self, result: AnalyzerResultType):
raise NotImplementedError

def _create_summary(self, result: AnalyzerResultType, view_key: ViewKey) -> OutputSummary:
flat_view = result.flat_views[view_key]
def _compute_raw_stats(self, result: AnalyzerResultType) -> RawStats:
    """Materialize the (possibly lazy) raw stats of `result` as a RawStats.

    `result.raw_stats` may be a dask-delayed object; computing it can yield
    either a RawStats instance or a plain dict, which is coerced here.
    """
    (computed,) = dask.compute(result.raw_stats)
    return RawStats(**computed) if isinstance(computed, dict) else computed

def _create_summary(self, result: AnalyzerResultType, view_key: ViewKey, raw_stats: Optional[RawStats] = None) -> OutputSummary:
flat_view = result.flat_views[view_key]
if raw_stats is None:
raw_stats = self._compute_raw_stats(result)
summary = OutputSummary(
job_time=float(raw_stats.job_time),
layer_metrics={},
Expand Down Expand Up @@ -132,6 +141,19 @@ def _humanized_layer_name(self, name: str) -> str:
.replace('Ssd', '(SSD)')
)

@staticmethod
def _additional_metric_scale_and_unit(metric: str):
    """Map a metric-name suffix to a (scale divisor, display unit) pair.

    Unrecognized suffixes fall back to (1.0, '-'), i.e. no scaling.
    """
    # Ordered lookup: longer suffixes ('_gbps'/'_mbps') are matched before
    # their '_gb'/'_mb' prefixed counterparts.
    suffix_table = (
        ('_gbps', GiB, 'GB/s'),
        ('_mbps', MiB, 'MB/s'),
        ('_gb', GiB, 'GB'),
        ('_mb', MiB, 'MB'),
    )
    lowered = metric.lower()
    for suffix, scale, unit in suffix_table:
        if lowered.endswith(suffix):
            return scale, unit
    return 1.0, '-'


class ConsoleOutput(Output):
def __init__(
Expand All @@ -148,11 +170,12 @@ def __init__(
self.show_header = show_header

def handle_result(self, result: AnalyzerResultType):
raw_stats = self._compute_raw_stats(result)
print_objects = []
for view_key in result.flat_views:
if view_key[-1] not in result.view_types:
continue
summary = self._create_summary(result=result, view_key=view_key)
summary = self._create_summary(result=result, view_key=view_key, raw_stats=raw_stats)
summary_table = self._create_summary_table(summary=summary, view_key=view_key)
layer_breakdown_table = self._create_layer_breakdown_table(summary=summary, view_key=view_key)
print_objects.append(summary_table)
Expand Down Expand Up @@ -280,19 +303,6 @@ def _create_additional_metrics_table(self, result: AnalyzerResultType, view_key:
return None
return additional_table

@staticmethod
def _additional_metric_scale_and_unit(metric: str):
metric_lower = metric.lower()
if metric_lower.endswith('_gbps'):
return GiB, 'GB/s'
if metric_lower.endswith('_mbps'):
return MiB, 'MB/s'
if metric_lower.endswith('_gb'):
return GiB, 'GB'
if metric_lower.endswith('_mb'):
return MiB, 'MB'
return 1.0, '-'

def _format_val(self, value: float, fmt_int=False) -> str:
if value is None or value == 0:
return '-'
Expand Down Expand Up @@ -324,6 +334,130 @@ def _percentage_color(self, percentage: float) -> str:
return f"#{int(r * 255):02x}{int(g * 255):02x}{int(b * 255):02x}"


class JSONOutput(Output):
    """Output handler that serializes the analysis summary to a JSON file.

    The report contains job-level raw stats plus, for each selected view,
    per-view summary metrics, per-layer metrics, and additional metric
    statistics (min/mean/max).
    """

    def __init__(
        self,
        compact: bool = False,
        file_path: str = "",
        name: str = "",
        root_only: bool = False,
        view_names: Optional[List[str]] = None,
    ):
        # Fix: default to None instead of a mutable `[]` default argument
        # (shared across calls); normalize before delegating to the base.
        super().__init__(compact, name, root_only, [] if view_names is None else view_names)
        self.file_path = file_path

    def handle_result(self, result: AnalyzerResultType):
        """Build the JSON report for `result` and write it to disk."""
        raw_stats = self._compute_raw_stats(result)
        output = {
            "schema_version": "1",
            "raw_stats": self._create_raw_stats(raw_stats=raw_stats),
            "views": {},
        }
        for view_key in result.flat_views:
            # Skip views whose type was not requested for this analysis.
            if view_key[-1] not in result.view_types:
                continue
            summary = self._create_summary(result=result, view_key=view_key, raw_stats=raw_stats)
            output["views"][view_name(view_key, separator="/")] = {
                "summary": self._create_summary_payload(summary=summary),
                "additional_metrics": self._create_additional_metrics_payload(result=result, view_key=view_key),
            }

        output_path = self._resolve_output_path()
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # allow_nan=False guarantees strictly valid JSON; NaN/inf values are
        # already mapped to null by the _to_*_or_none helpers below.
        with output_path.open("w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, allow_nan=False)
            f.write("\n")

    def _resolve_output_path(self) -> Path:
        """Return the configured path, or a default under Hydra's output dir."""
        if self.file_path:
            return Path(self.file_path)
        try:
            output_dir = HydraConfig.get().runtime.output_dir
        except Exception:
            # HydraConfig raises when called outside a Hydra run; fall back
            # to the current working directory rather than failing.
            output_dir = "."
        return Path(output_dir) / "dfanalyzer_output.json"

    @staticmethod
    def _to_int_or_none(value):
        """Coerce a scalar to int; None/NaN become None (JSON null)."""
        if value is None or pd.isna(value):
            return None
        return int(value)

    @staticmethod
    def _to_float_or_none(value):
        """Coerce a scalar to float; None/NaN become None (JSON null)."""
        if value is None or pd.isna(value):
            return None
        return float(value)

    def _create_raw_stats(self, raw_stats: RawStats):
        """Serialize job-level raw stats; units are encoded in the key names."""
        return {
            "job_time_s": self._to_float_or_none(raw_stats.job_time),
            "time_granularity_s": self._to_float_or_none(raw_stats.time_granularity),
            "time_resolution_ns": self._to_int_or_none(raw_stats.time_resolution),
            "total_event_count": self._to_int_or_none(raw_stats.total_event_count),
            "unique_file_count": self._to_int_or_none(raw_stats.unique_file_count),
            "unique_host_count": self._to_int_or_none(raw_stats.unique_host_count),
            "unique_process_count": self._to_int_or_none(raw_stats.unique_process_count),
        }

    def _create_summary_payload(self, summary: OutputSummary):
        """Serialize an OutputSummary, including its per-layer metrics."""
        summary_payload = {
            "job_time_s": self._to_float_or_none(summary.job_time),
            "total_event_count": self._to_int_or_none(summary.total_event_count),
            "unique_file_count": self._to_int_or_none(summary.unique_file_count),
            "unique_host_count": self._to_int_or_none(summary.unique_host_count),
            "unique_process_count": self._to_int_or_none(summary.unique_process_count),
            "time_granularity_s": self._to_float_or_none(summary.time_granularity),
            "time_resolution_ns": self._to_int_or_none(summary.time_resolution),
            "layers": {},
        }
        for layer in summary.layers:
            metrics = summary.layer_metrics[layer]
            # "u_" prefixed fields mirror the source metrics' naming for
            # unoverlapped ("unique") time/count/size — TODO confirm semantics.
            summary_payload["layers"][layer] = {
                "time_s": self._to_float_or_none(metrics.time),
                "count": self._to_int_or_none(metrics.count),
                "size_bytes": self._to_float_or_none(metrics.size),
                "ops_per_s": self._to_float_or_none(metrics.ops),
                "bandwidth_bps": self._to_float_or_none(metrics.bandwidth),
                "num_files": self._to_int_or_none(metrics.num_files),
                "num_processes": self._to_int_or_none(metrics.num_processes),
                "u_time_s": self._to_float_or_none(metrics.u_time),
                "u_count": self._to_int_or_none(metrics.u_count),
                "u_size_bytes": self._to_float_or_none(metrics.u_size),
            }
        return summary_payload

    def _create_additional_metrics_payload(self, result: AnalyzerResultType, view_key: ViewKey):
        """Serialize min/mean/max stats for each additional metric of a view."""
        payload = {}
        flat_view = result.flat_views[view_key]
        view_type = view_key[-1]
        view_additional_metrics = result.additional_metrics.get(view_type, [])
        for metric in view_additional_metrics:
            if metric not in flat_view.columns:
                continue
            # Coerce to numeric and drop +/-inf so aggregates stay finite
            # (required by json.dump(..., allow_nan=False)).
            metric_series = pd.to_numeric(flat_view[metric], errors='coerce').replace([np.inf, -np.inf], np.nan)
            scale, unit = self._additional_metric_scale_and_unit(metric)
            metric_series = metric_series / scale
            non_null = int(metric_series.notna().sum())
            metric_payload = {
                "unit": unit,
                "non_null": non_null,
                "min": None,
                "mean": None,
                "max": None,
            }
            if non_null > 0:
                metric_payload.update(
                    {
                        "min": float(metric_series.min()),
                        "mean": float(metric_series.mean()),
                        "max": float(metric_series.max()),
                    }
                )
            payload[metric] = metric_payload
        return payload


class CSVOutput(Output):
    # Placeholder output handler: registered in the config store but any
    # attempt to handle a result raises until CSV serialization is written.
    def handle_result(self, result: AnalyzerResultType):
        raise NotImplementedError("CSVOutput is not implemented yet.")
Expand Down
44 changes: 44 additions & 0 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import os
import pathlib
import pytest
Expand Down Expand Up @@ -60,6 +61,49 @@ def test_e2e_smoke(
_test_e2e(analyzer, preset, trace_path, checkpoint, tmp_path, dask_cluster)


@pytest.mark.smoke
def test_json_output_file(tmp_path: pathlib.Path, dask_cluster: LocalCluster) -> None:
    """Verify JSON output file is created with the expected schema and views."""
    checkpoint_dir = f"{tmp_path}/checkpoints"
    scheduler_address = dask_cluster.scheduler_address
    output_path = tmp_path / "analysis.json"
    hydra_overrides = [
        "analyzer=dftracer",
        "analyzer/preset=posix",
        "analyzer.checkpoint=False",
        f"analyzer.checkpoint_dir={checkpoint_dir}",
        "cluster=external",
        "cluster.restart_on_connect=True",
        f"cluster.scheduler_address={scheduler_address}",
        "output=json",
        f"output.file_path={output_path}",
        f"hydra.run.dir={tmp_path}",
        f"hydra.runtime.output_dir={tmp_path}",
        "trace_path=tests/data/extracted/dftracer-posix",
        "view_types=[time_range,proc_name]",
    ]

    dfa = init_with_hydra(hydra_overrides=hydra_overrides)
    result = dfa.analyze_trace()
    dfa.output.handle_result(result)

    assert output_path.exists(), f"Expected JSON output at {output_path}"
    report = json.loads(output_path.read_text())

    assert report["schema_version"] == "1"
    # Top-level sections, configured views, and per-view sections must exist.
    for top_level_key in ("raw_stats", "views"):
        assert top_level_key in report
    for expected_view in ("time_range", "proc_name"):
        assert expected_view in report["views"]
    for section in ("summary", "additional_metrics"):
        assert section in report["views"]["time_range"]
    # Raw per-event views are intentionally excluded from the report.
    assert "flat_views" not in report

    dfa.shutdown()
    assert dfa.client.status == "closed", "Dask client should be closed after shutdown"


def _test_e2e(
analyzer: str,
preset: str,
Expand Down