Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
350 changes: 306 additions & 44 deletions src/exgentic/interfaces/cli/commands/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,205 @@ def _collect_results_rows(
return rows, failures


def _collect_full_results_rows(
config_paths: list[str],
) -> tuple[list[dict[str, Any]], list[tuple[str, str]]]:
"""Load full results (including all fields) from config paths.

Unlike _collect_results_rows, this keeps all fields for EEE conversion.
"""
failures: list[tuple[str, str]] = []
rows: list[dict[str, Any]] = []

for config_path in config_paths:
try:
raw_config = _load_config_file(config_path)
run_id = raw_config.get("run_id")
if not run_id:
raise click.ClickException(f"Missing run_id in config: {config_path}")
output_dir = Path(config_path).parent
results_path = (output_dir / run_id / "results.json").resolve()
if not results_path.is_file():
raise click.ClickException(f"Results not found: {results_path}")
payload = _load_config_file(str(results_path))
if not isinstance(payload, dict):
raise click.ClickException(f"Results JSON is not an object: {results_path}")
if "benchmark_score" not in payload:
raise click.ClickException(f"Missing benchmark_score in results: {results_path}")
# Include agent/benchmark from the config for EEE metadata
payload.setdefault("agent", raw_config.get("agent"))
payload.setdefault("benchmark", raw_config.get("benchmark"))
rows.append(payload)
except Exception as exc:
failures.append((config_path, str(exc)))

return rows, failures


# Mapping from exgentic model_name to (developer, clean_model_name, model_id)
_MODEL_DEVELOPER_MAP: dict[str, str] = {
"claude": "Anthropic",
"gpt": "OpenAI",
"gemini": "Google",
}


def _parse_model_info(model_name: str) -> tuple[str, str]:
"""Extract developer and model ID from exgentic model_name like 'openai/aws/claude-opus-4-5'."""
# Strip the provider prefix (e.g. "openai/aws/" or "openai/gcp/" or "openai/Azure/")
parts = model_name.split("/")
raw_model = parts[-1] if parts else model_name

# Determine developer from the model name
developer = "unknown"
lower = raw_model.lower()
for prefix, dev in _MODEL_DEVELOPER_MAP.items():
if lower.startswith(prefix):
developer = dev
break

return developer, raw_model


def _convert_to_eee(
rows: list[dict[str, Any]],
retrieved_timestamp: str,
) -> list[dict[str, Any]]:
"""Convert exgentic results rows to Every Eval Ever EvaluationLog dicts."""
import re

eee_logs: list[dict[str, Any]] = []

for row in rows:
model_name_raw = row.get("model_name") or "unknown"
developer, model_slug = _parse_model_info(model_name_raw)
model_id = f"{developer.lower()}/{model_slug}"

benchmark = row.get("benchmark_name") or row.get("benchmark") or "unknown"
agent_name = row.get("agent_name") or row.get("agent") or "unknown"
agent_slug = re.sub(r"[^a-z0-9]+", "-", agent_name.lower()).strip("-")
subset = row.get("subset_name")

eval_name = benchmark.lower().replace(" ", "-")
if subset:
eval_name = f"{eval_name}/{subset}"

score = row.get("benchmark_score")
if score is None:
score = row.get("average_score", 0.0)

# Build score details with uncertainty from session counts
total = row.get("total_sessions")
uncertainty = None
if total and total > 0:
uncertainty = {"num_samples": total}

# Build evaluation result
eval_result: dict[str, Any] = {
"evaluation_name": eval_name,
"source_data": {
"dataset_name": eval_name,
"source_type": "url",
"url": ["https://github.com/Exgentic/exgentic"],
},
"metric_config": {
"evaluation_description": f"{benchmark} benchmark evaluation"
+ (f" ({subset} subset)" if subset else ""),
"lower_is_better": False,
"score_type": "continuous",
"min_score": 0.0,
"max_score": 1.0,
},
"score_details": {
"score": round(score, 4) if score is not None else 0.0,
},
"generation_config": {
"generation_args": {
"agentic_eval_config": {
"additional_details": {
"agent_name": agent_name,
"agent_framework": row.get("agent") or agent_slug,
},
},
},
},
}

if uncertainty:
eval_result["score_details"]["uncertainty"] = uncertainty

# Additional score details
details: dict[str, str] = {}
if row.get("average_agent_cost") is not None:
details["average_agent_cost"] = str(round(row["average_agent_cost"], 2))
if row.get("total_run_cost") is not None:
details["total_run_cost"] = str(round(row["total_run_cost"], 2))
if row.get("average_steps") is not None:
details["average_steps"] = str(round(row["average_steps"], 2))
if row.get("percent_finished") is not None:
details["percent_finished"] = str(round(row["percent_finished"], 4))
if details:
eval_result["score_details"]["details"] = details

sanitized_model_id = model_id.replace("/", "_")
evaluation_id = f"{eval_name}/{agent_slug}__{sanitized_model_id}/{retrieved_timestamp}"

eee_log: dict[str, Any] = {
"schema_version": "0.2.2",
"evaluation_id": evaluation_id,
"retrieved_timestamp": retrieved_timestamp,
"source_metadata": {
"source_name": "Exgentic Open Agent Leaderboard",
"source_type": "evaluation_run",
"source_organization_name": "Exgentic",
"source_organization_url": "https://github.com/Exgentic",
"evaluator_relationship": "third_party",
},
"eval_library": {
"name": "exgentic",
"version": "0.1.0",
},
"model_info": {
"name": model_slug,
"id": model_id,
"developer": developer,
"additional_details": {
"agent_name": agent_name,
"agent_framework": row.get("agent") or agent_slug,
},
},
"evaluation_results": [eval_result],
}

eee_logs.append(eee_log)

return eee_logs


def _save_eee_files(eee_logs: list[dict[str, Any]], output_dir: str) -> list[str]:
"""Save EEE JSON files to disk in the standard directory structure."""
import re
import uuid

saved: list[str] = []
base = Path(output_dir)

for log in eee_logs:
model_info = log.get("model_info", {})
developer = re.sub(r'[<>:"/\\|?*]', "_", model_info.get("developer", "unknown"))
model_name = re.sub(r'[<>:"/\\|?*]', "_", model_info.get("name", "unknown"))

dir_path = base / developer / model_name
dir_path.mkdir(parents=True, exist_ok=True)

filepath = dir_path / f"{uuid.uuid4()}.json"
with open(filepath, "w", encoding="utf-8") as f:
json.dump(log, f, indent=2, ensure_ascii=False)
saved.append(str(filepath))

return saved


@batch_cmd.command(
"publish",
context_settings={"allow_extra_args": True},
Expand All @@ -937,7 +1136,7 @@ def _collect_results_rows(
@click.option(
"--repo",
"repo_id",
required=True,
default=None,
help="HuggingFace dataset repo ID (e.g. 'Exgentic/open-agent-leaderboard-results').",
)
@click.option(
Expand All @@ -954,63 +1153,126 @@ def _collect_results_rows(
show_default=True,
help="Append to existing dataset or overwrite it.",
)
@click.option(
"--format",
"fmt",
type=click.Choice(["exgentic", "eee"]),
default="exgentic",
show_default=True,
help="Output format: 'exgentic' (HF dataset) or 'eee' (Every Eval Ever JSON files).",
)
@click.option(
"--output-dir",
"output_dir",
default=None,
help="Directory to save EEE JSON files locally (only used with --format eee).",
)
@click.pass_context
def batch_publish_cmd(
ctx: click.Context,
config_values: tuple[str, ...],
repo_id: str,
repo_id: str | None,
private: bool,
append: bool,
fmt: str,
output_dir: str | None,
) -> None:
"""Publish run results to a HuggingFace dataset."""
try:
from datasets import Dataset, load_dataset
except ImportError as err:
raise click.ClickException(
"The 'datasets' package is required for publishing. Install it with: pip install datasets"
) from err
"""Publish run results to a HuggingFace dataset or in Every Eval Ever format."""
import time

config_paths = _expand_config_inputs(config_values, list(ctx.args))
rows, failures = _collect_results_rows(config_paths)

if not rows:
raise click.ClickException("No valid results to publish.")
if fmt == "eee":
full_rows, failures = _collect_full_results_rows(config_paths)
if not full_rows:
raise click.ClickException("No valid results to publish.")

retrieved_timestamp = str(time.time())
eee_logs = _convert_to_eee(full_rows, retrieved_timestamp)

# Save locally if output_dir is specified
if output_dir:
saved = _save_eee_files(eee_logs, output_dir)
click.echo(f"Saved {len(saved)} EEE JSON file(s) to {output_dir}/")

# Push to HF if repo is specified
if repo_id:
try:
from datasets import Dataset, load_dataset
except ImportError as err:
raise click.ClickException(
"The 'datasets' package is required for publishing. Install it with: pip install datasets"
) from err

if append:
try:
existing_ds = load_dataset(repo_id, split="train")
existing_rows = list(existing_ds)
click.echo(f"Loaded {len(existing_rows)} existing row(s) from {repo_id}.")
all_rows = existing_rows + eee_logs
except Exception:
click.echo("No existing dataset found, creating new one.")
all_rows = eee_logs
else:
all_rows = eee_logs

ds = Dataset.from_list(all_rows)
ds.push_to_hub(repo_id, private=private)
click.echo(f"Published {len(all_rows)} EEE row(s) to https://huggingface.co/datasets/{repo_id}")

if not output_dir and not repo_id:
raise click.ClickException("Specify --output-dir and/or --repo for EEE format.")

else:
# Original exgentic format
if not repo_id:
raise click.ClickException("--repo is required for exgentic format.")

if append:
try:
existing_ds = load_dataset(repo_id, split="train")
existing_rows = list(existing_ds)
click.echo(f"Loaded {len(existing_rows)} existing row(s) from {repo_id}.")

# Deduplicate by (benchmark, agent, model) triple
existing_keys = set()
for r in existing_rows:
key = (r.get("benchmark"), r.get("agent"), r.get("model"))
existing_keys.add(key)

new_rows = []
updated = 0
for row in rows:
key = (row.get("benchmark"), row.get("agent"), row.get("model"))
if key in existing_keys:
# Replace existing row with updated one
existing_rows = [
r for r in existing_rows if (r.get("benchmark"), r.get("agent"), r.get("model")) != key
]
updated += 1
new_rows.append(row)

all_rows = existing_rows + new_rows
click.echo(f"Publishing {len(all_rows)} row(s) " f"({len(new_rows) - updated} new, {updated} updated).")
except Exception:
click.echo("No existing dataset found, creating new one.")
from datasets import Dataset, load_dataset
except ImportError as err:
raise click.ClickException(
"The 'datasets' package is required for publishing. Install it with: pip install datasets"
) from err

rows, failures = _collect_results_rows(config_paths)

if not rows:
raise click.ClickException("No valid results to publish.")

if append:
try:
existing_ds = load_dataset(repo_id, split="train")
existing_rows = list(existing_ds)
click.echo(f"Loaded {len(existing_rows)} existing row(s) from {repo_id}.")

existing_keys = set()
for r in existing_rows:
key = (r.get("benchmark"), r.get("agent"), r.get("model"))
existing_keys.add(key)

new_rows = []
updated = 0
for row in rows:
key = (row.get("benchmark"), row.get("agent"), row.get("model"))
if key in existing_keys:
existing_rows = [
r for r in existing_rows if (r.get("benchmark"), r.get("agent"), r.get("model")) != key
]
updated += 1
new_rows.append(row)

all_rows = existing_rows + new_rows
click.echo(f"Publishing {len(all_rows)} row(s) ({len(new_rows) - updated} new, {updated} updated).")
except Exception:
click.echo("No existing dataset found, creating new one.")
all_rows = rows
else:
all_rows = rows
else:
all_rows = rows

ds = Dataset.from_list(all_rows)
ds.push_to_hub(repo_id, private=private)
click.echo(f"Published {len(all_rows)} row(s) to https://huggingface.co/datasets/{repo_id}")
ds = Dataset.from_list(all_rows)
ds.push_to_hub(repo_id, private=private)
click.echo(f"Published {len(all_rows)} row(s) to https://huggingface.co/datasets/{repo_id}")

if failures:
click.echo(f"Warning: {len(failures)} config(s) had errors and were skipped:")
Expand Down
Loading