Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@
# Get COHERE_API_KEY at: https://dashboard.cohere.com/api-keys
# COHERE_API_KEY=... # Optional: improves ranking by ~7%

# ─── OPTIONAL: Cost estimator pricing (USD per 1M tokens) ───
# Used by end-of-run terminal cost summary.
NEURICO_COST_CLAUDE_INPUT_PER_1M=15
NEURICO_COST_CLAUDE_OUTPUT_PER_1M=75
NEURICO_COST_CODEX_INPUT_PER_1M=5
NEURICO_COST_CODEX_OUTPUT_PER_1M=15
NEURICO_COST_GEMINI_INPUT_PER_1M=1.25
NEURICO_COST_GEMINI_OUTPUT_PER_1M=5

# ===========================================
# SETUP TIERS
# ===========================================
Expand Down
210 changes: 210 additions & 0 deletions src/core/cost_estimator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
"""
Cost estimation utilities for NeuriCo runs.

This module reads provider pricing from environment variables, extracts
token-usage hints from transcript JSONL files, and computes an estimated cost.
"""

from __future__ import annotations

import json
import math
import os

from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional


PROVIDERS = ("claude", "codex", "gemini")


@dataclass
class ProviderPricing:
    """Per-provider pricing, USD per 1M tokens.

    A rate of None means the corresponding environment variable was
    missing or invalid; estimate_cost_usd() then reports the cost as
    unavailable rather than guessing.
    """

    # Lowercase provider key, e.g. "claude" (one of PROVIDERS).
    provider: str
    # USD per 1M input tokens; None when unconfigured.
    input_per_1m: Optional[float]
    # USD per 1M output tokens; None when unconfigured.
    output_per_1m: Optional[float]


@dataclass
class UsageTotals:
    """Extracted token-usage totals for a run.

    All counts default to 0 so an "empty" total can represent a missing
    or unparsable transcript.
    """

    # Tokens consumed as model input (prompt side).
    input_tokens: int = 0
    # Tokens produced as model output (completion side).
    output_tokens: int = 0
    # Reported grand total; falls back to input + output when absent.
    total_tokens: int = 0


def _float_env(name: str) -> Optional[float]:
"""Read float env var, returning None if missing/invalid."""
raw = os.getenv(name)
if raw is None or raw.strip() == "":
return None
try:
value = float(raw)
except ValueError:
return None
if value < 0:
return None
return value


def load_pricing_table() -> Dict[str, ProviderPricing]:
    """Build the provider -> pricing map from environment variables.

    For each provider in PROVIDERS, reads:
      - NEURICO_COST_<PROVIDER>_INPUT_PER_1M
      - NEURICO_COST_<PROVIDER>_OUTPUT_PER_1M
    Missing or invalid variables yield None rates.
    """

    def _pricing_for(name: str) -> ProviderPricing:
        prefix = f"NEURICO_COST_{name.upper()}"
        return ProviderPricing(
            provider=name,
            input_per_1m=_float_env(f"{prefix}_INPUT_PER_1M"),
            output_per_1m=_float_env(f"{prefix}_OUTPUT_PER_1M"),
        )

    return {name: _pricing_for(name) for name in PROVIDERS}


def _as_int(value: Any) -> Optional[int]:
"""Convert supported numeric/string values to non-negative int."""
if value is None:
return None
if isinstance(value, bool):
return None
if isinstance(value, (int, float)):
iv = int(value)
return iv if iv >= 0 else None
if isinstance(value, str):
stripped = value.strip()
if not stripped:
return None
if stripped.isdigit():
return int(stripped)
return None


def _find_usage_dicts(value: Any, out: List[Dict[str, Any]]) -> None:
"""Recursively collect dict nodes that look like token-usage blocks."""
if isinstance(value, list):
for item in value:
_find_usage_dicts(item, out)
return

if not isinstance(value, dict):
return

usage_keys = {
"usage",
"input_tokens",
"output_tokens",
"prompt_tokens",
"completion_tokens",
"total_tokens",
}
if any(k in value for k in usage_keys):
out.append(value)

for nested in value.values():
_find_usage_dicts(nested, out)


def _normalize_usage_block(block: Dict[str, Any]) -> UsageTotals:
    """Normalize a usage block across provider-specific key names.

    Prefers the canonical keys (input_tokens/output_tokens) and falls back
    to OpenAI-style names (prompt_tokens/completion_tokens). Uses explicit
    None checks rather than `or`-chaining so a legitimate value of 0 under
    the preferred key is not overridden by the fallback key.
    """
    # Some events wrap usage under "usage"; unwrap one level and recurse.
    wrapped = block.get("usage")
    if isinstance(wrapped, dict):
        return _normalize_usage_block(wrapped)

    def _first_valid(*keys: str) -> Optional[int]:
        # First key that converts to a valid non-negative int (0 included).
        for key in keys:
            parsed = _as_int(block.get(key))
            if parsed is not None:
                return parsed
        return None

    input_tokens = _first_valid("input_tokens", "prompt_tokens")
    if input_tokens is None:
        input_tokens = 0
    output_tokens = _first_valid("output_tokens", "completion_tokens")
    if output_tokens is None:
        output_tokens = 0
    total_tokens = _first_valid("total_tokens")
    if not total_tokens:
        # Missing or zero totals fall back to the component sum (matches
        # the original falsy-fallback semantics for totals).
        total_tokens = input_tokens + output_tokens

    return UsageTotals(
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        total_tokens=total_tokens,
    )


def extract_usage_from_transcript(transcript_path: Path) -> UsageTotals:
    """
    Extract best-effort usage totals from a transcript JSONL file.

    Strategy:
    - Parse each JSON line (blank or malformed lines are skipped).
    - Recursively collect usage-like dicts from each payload.
    - Keep the maximum observed value per counter, since streaming
      transcripts commonly report cumulative usage.
    """
    path = Path(transcript_path)
    if not path.exists():
        return UsageTotals()

    best = UsageTotals()

    with path.open("r", encoding="utf-8", errors="replace") as handle:
        for raw_line in handle:
            text = raw_line.strip()
            if not text:
                continue
            try:
                payload = json.loads(text)
            except json.JSONDecodeError:
                continue

            found: List[Dict[str, Any]] = []
            _find_usage_dicts(payload, found)
            for candidate in found:
                usage = _normalize_usage_block(candidate)
                best.input_tokens = max(best.input_tokens, usage.input_tokens)
                best.output_tokens = max(best.output_tokens, usage.output_tokens)
                best.total_tokens = max(best.total_tokens, usage.total_tokens)
            # NOTE:
            # We assume streaming transcripts include cumulative usage counters
            # and take the maximum observed values per file.
            # This may undercount if transcripts emit only incremental usage.

    if best.total_tokens == 0:
        best.total_tokens = best.input_tokens + best.output_tokens

    return best


def aggregate_usage(transcript_files: List[Path]) -> UsageTotals:
    """Sum usage totals across multiple transcript files.

    When no file reports a total, the grand total is reconstructed from
    the input and output sums.
    """
    per_file = [extract_usage_from_transcript(path) for path in transcript_files]
    combined = UsageTotals(
        input_tokens=sum(usage.input_tokens for usage in per_file),
        output_tokens=sum(usage.output_tokens for usage in per_file),
        total_tokens=sum(usage.total_tokens for usage in per_file),
    )
    if combined.total_tokens == 0:
        combined.total_tokens = combined.input_tokens + combined.output_tokens
    return combined


def estimate_cost_usd(usage: UsageTotals, pricing: ProviderPricing) -> Optional[float]:
    """Estimate USD cost from usage and pricing; None when either rate is unset."""
    rates = (pricing.input_per_1m, pricing.output_per_1m)
    if any(rate is None for rate in rates):
        return None
    token_counts = (usage.input_tokens, usage.output_tokens)
    return sum((count / 1_000_000) * rate for count, rate in zip(token_counts, rates))
94 changes: 93 additions & 1 deletion src/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
from core.idea_manager import IdeaManager
from core.config_loader import ConfigLoader
from core.security import sanitize_text
from core.cost_estimator import (
PROVIDERS,
aggregate_usage,
estimate_cost_usd,
load_pricing_table,
)
from templates.prompt_generator import PromptGenerator
from templates.research_agent_instructions import generate_instructions

Expand Down Expand Up @@ -344,6 +350,7 @@ def run_research(self, idea_id: str,
self._finalize_research(idea_id, work_dir, github_url, title, provider, success)

# Return result info
self._print_terminal_cost_summary(work_dir=work_dir, provider=provider)
return {
'work_dir': work_dir,
'github_url': github_url,
Expand Down Expand Up @@ -402,6 +409,7 @@ def run_research(self, idea_id: str,

# Prepare command
log_file = work_dir / "logs" / f"execution_{provider}.log"
transcript_file = work_dir / "logs" / f"execution_{provider}_transcript.jsonl"

# Build command - raw CLI by default, scribe if requested
if use_scribe:
Expand All @@ -428,13 +436,14 @@ def run_research(self, idea_id: str,

print(f" Command: {cmd}")
print(f" Log file: {log_file}")
print(f" Transcript: {transcript_file}")
print()
print("=" * 80)
print("AGENT OUTPUT (streaming)")
print("=" * 80)
print()

with open(log_file, 'w') as log_f:
with open(log_file, 'w') as log_f, open(transcript_file, 'w') as transcript_f:
# Start process in workspace directory
process = subprocess.Popen(
shlex.split(cmd),
Expand All @@ -457,6 +466,7 @@ def run_research(self, idea_id: str,
sanitized_line = sanitize_text(line)
print(sanitized_line, end='')
log_f.write(sanitized_line)
transcript_f.write(sanitized_line)

# Wait for completion
return_code = process.wait(timeout=timeout)
Expand Down Expand Up @@ -523,6 +533,7 @@ def run_research(self, idea_id: str,
print(f" GitHub: {github_url}")

# Return result info
self._print_terminal_cost_summary(work_dir=work_dir, provider=provider)
return {
'work_dir': work_dir,
'github_url': github_url,
Expand Down Expand Up @@ -647,12 +658,93 @@ def run_comment_mode(
print(f"Warning: Failed to push to GitHub: {e}")
print(" Changes are available locally")

self._print_terminal_cost_summary(work_dir=work_dir, provider=provider)
return {
'work_dir': work_dir,
'github_url': github_url,
'success': result['success']
}

def _collect_transcript_files(self, work_dir: Path) -> List[Path]:
"""Collect transcript files generated during a run."""
logs_dir = Path(work_dir) / "logs"
if not logs_dir.exists():
return []
return sorted(logs_dir.glob("*_transcript.jsonl"))

def _print_terminal_cost_summary(self, work_dir: Path, provider: str) -> None:
    """
    Print a provider pricing table and the estimated cost of this run.

    Pricing rates come from environment variables of the form
    NEURICO_COST_<PROVIDER>_INPUT_PER_1M and
    NEURICO_COST_<PROVIDER>_OUTPUT_PER_1M for each provider in PROVIDERS
    (claude, codex, gemini). Token counts are aggregated from the run's
    transcript JSONL files under <work_dir>/logs.
    """
    candidates = self._collect_transcript_files(work_dir)
    if not candidates:
        print("\n⚠️ No transcript files found, skipping cost summary.")
        return

    # NOTE: assumes single-provider run; may extend to multi-provider later.
    suffix = f"_{provider}_transcript.jsonl"
    transcript_files = [path for path in candidates if path.name.endswith(suffix)]
    if not transcript_files:
        print(
            f"\n⚠️ No transcript files found for provider '{provider}', "
            "skipping cost summary."
        )
        return

    usage = aggregate_usage(transcript_files)
    pricing_table = load_pricing_table()

    # Fixed-width columns keep the table readable in a plain terminal.
    header = (
        f"{'Provider':<10} {'Input/M':>10} {'Output/M':>10} "
        f"{'InTok':>12} {'OutTok':>12} {'Est USD':>12}"
    )
    rule = "-" * len(header)
    print()
    print("=" * 80)
    print("COST ESTIMATOR")
    print("=" * 80)
    print(header)
    print(rule)

    def _fmt_rate(rate: Optional[float]) -> str:
        # Unconfigured rates render as "N/A" rather than a number.
        return "N/A" if rate is None else f"{rate:.4f}"

    for name in PROVIDERS:
        pricing = pricing_table[name]

        if name == provider:
            # Only the active provider has token counts for this run.
            est_cost = estimate_cost_usd(usage, pricing)
            in_tok, out_tok = usage.input_tokens, usage.output_tokens
            est_cost_text = "N/A" if est_cost is None else f"{est_cost:.6f}"
        else:
            in_tok, out_tok, est_cost_text = 0, 0, "-"

        print(
            f"{name:<10} {_fmt_rate(pricing.input_per_1m):>10} "
            f"{_fmt_rate(pricing.output_per_1m):>10} "
            f"{in_tok:>12} {out_tok:>12} {est_cost_text:>12}"
        )

    print(rule)
    print(f"Transcripts scanned: {len(transcript_files)}")
    print(f"Run provider: {provider}")
    print(
        "Pricing env vars: NEURICO_COST_<PROVIDER>_INPUT_PER_1M / "
        "NEURICO_COST_<PROVIDER>_OUTPUT_PER_1M"
    )
    print("=" * 80)

def _copy_workspace_resources(self, work_dir: Path):
"""
Copy helper scripts and resources to workspace.
Expand Down
Loading