diff --git a/.env.example b/.env.example
index 12b9d89..78d5d95 100644
--- a/.env.example
+++ b/.env.example
@@ -36,6 +36,15 @@
 # Get COHERE_API_KEY at: https://dashboard.cohere.com/api-keys
 # COHERE_API_KEY=... # Optional: improves ranking by ~7%
 
+# ─── OPTIONAL: Cost estimator pricing (USD per 1M tokens) ───
+# Used by end-of-run terminal cost summary.
+NEURICO_COST_CLAUDE_INPUT_PER_1M=15
+NEURICO_COST_CLAUDE_OUTPUT_PER_1M=75
+NEURICO_COST_CODEX_INPUT_PER_1M=5
+NEURICO_COST_CODEX_OUTPUT_PER_1M=15
+NEURICO_COST_GEMINI_INPUT_PER_1M=1.25
+NEURICO_COST_GEMINI_OUTPUT_PER_1M=5
+
 # ===========================================
 # SETUP TIERS
 # ===========================================
diff --git a/src/core/cost_estimator.py b/src/core/cost_estimator.py
new file mode 100644
index 0000000..487916e
--- /dev/null
+++ b/src/core/cost_estimator.py
@@ -0,0 +1,210 @@
+"""
+Cost estimation utilities for NeuriCo runs.
+
+This module reads provider pricing from environment variables, extracts
+token-usage hints from transcript JSONL files, and computes an estimated cost.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+import json
+import os
+
+
+PROVIDERS = ("claude", "codex", "gemini")
+
+
+@dataclass
+class ProviderPricing:
+    """Per-provider pricing, USD per 1M tokens."""
+
+    provider: str
+    input_per_1m: Optional[float]
+    output_per_1m: Optional[float]
+
+
+@dataclass
+class UsageTotals:
+    """Extracted usage totals for a run."""
+
+    input_tokens: int = 0
+    output_tokens: int = 0
+    total_tokens: int = 0
+
+
+def _float_env(name: str) -> Optional[float]:
+    """Read float env var, returning None if missing/invalid."""
+    raw = os.getenv(name)
+    if raw is None or raw.strip() == "":
+        return None
+    try:
+        value = float(raw)
+    except ValueError:
+        return None
+    if value < 0:
+        return None
+    return value
+
+
+def load_pricing_table() -> Dict[str, ProviderPricing]:
+    """
+    Load pricing from environment variables.
+
+    Required format per provider:
+    - NEURICO_COST_<PROVIDER>_INPUT_PER_1M
+    - NEURICO_COST_<PROVIDER>_OUTPUT_PER_1M
+    """
+    table: Dict[str, ProviderPricing] = {}
+    for provider in PROVIDERS:
+        upper = provider.upper()
+        table[provider] = ProviderPricing(
+            provider=provider,
+            input_per_1m=_float_env(f"NEURICO_COST_{upper}_INPUT_PER_1M"),
+            output_per_1m=_float_env(f"NEURICO_COST_{upper}_OUTPUT_PER_1M"),
+        )
+    return table
+
+
+def _as_int(value: Any) -> Optional[int]:
+    """Convert supported numeric/string values to non-negative int."""
+    if value is None:
+        return None
+    if isinstance(value, bool):
+        return None
+    if isinstance(value, (int, float)):
+        iv = int(value)
+        return iv if iv >= 0 else None
+    if isinstance(value, str):
+        stripped = value.strip()
+        if not stripped:
+            return None
+        if stripped.isdigit():
+            return int(stripped)
+    return None
+
+
+def _find_usage_dicts(value: Any, out: List[Dict[str, Any]]) -> None:
+    """Recursively collect dict nodes that look like token-usage blocks."""
+    if isinstance(value, list):
+        for item in value:
+            _find_usage_dicts(item, out)
+        return
+
+    if not isinstance(value, dict):
+        return
+
+    usage_keys = {
+        "usage",
+        "input_tokens",
+        "output_tokens",
+        "prompt_tokens",
+        "completion_tokens",
+        "total_tokens",
+    }
+    if any(k in value for k in usage_keys):
+        out.append(value)
+
+    for nested in value.values():
+        _find_usage_dicts(nested, out)
+
+
+def _normalize_usage_block(block: Dict[str, Any]) -> UsageTotals:
+    """Normalize a usage block across provider-specific key names."""
+    # Some events wrap usage under "usage".
+    if isinstance(block.get("usage"), dict):
+        return _normalize_usage_block(block["usage"])
+
+    input_tokens = (
+        _as_int(block.get("input_tokens"))
+        or _as_int(block.get("prompt_tokens"))
+        or 0
+    )
+    output_tokens = (
+        _as_int(block.get("output_tokens"))
+        or _as_int(block.get("completion_tokens"))
+        or 0
+    )
+    total_tokens = (
+        _as_int(block.get("total_tokens"))
+        or (input_tokens + output_tokens)
+    )
+
+    return UsageTotals(
+        input_tokens=input_tokens,
+        output_tokens=output_tokens,
+        total_tokens=total_tokens,
+    )
+
+
+def extract_usage_from_transcript(transcript_path: Path) -> UsageTotals:
+    """
+    Extract best-effort usage totals from a transcript JSONL file.
+
+    Strategy:
+    - Parse each JSON line.
+    - Recursively find usage-like dicts.
+    - Use max observed usage block values (common in streaming events).
+    """
+    path = Path(transcript_path)
+    if not path.exists():
+        return UsageTotals()
+
+    max_input = 0
+    max_output = 0
+    max_total = 0
+
+    with open(path, "r", encoding="utf-8", errors="replace") as f:
+        for line in f:
+            stripped = line.strip()
+            if not stripped:
+                continue
+            try:
+                payload = json.loads(stripped)
+            except json.JSONDecodeError:
+                continue
+
+            usage_blocks: List[Dict[str, Any]] = []
+            _find_usage_dicts(payload, usage_blocks)
+            for block in usage_blocks:
+                normalized = _normalize_usage_block(block)
+                max_input = max(max_input, normalized.input_tokens)
+                max_output = max(max_output, normalized.output_tokens)
+                max_total = max(max_total, normalized.total_tokens)
+
+    # NOTE:
+    # We assume streaming transcripts include cumulative usage counters
+    # and take the maximum observed values per file.
+    # This may undercount if transcripts emit only incremental usage.
+
+    if max_total == 0:
+        max_total = max_input + max_output
+
+    return UsageTotals(
+        input_tokens=max_input,
+        output_tokens=max_output,
+        total_tokens=max_total,
+    )
+
+
+def aggregate_usage(transcript_files: List[Path]) -> UsageTotals:
+    """Aggregate usage totals across multiple transcript files."""
+    total = UsageTotals()
+    for file_path in transcript_files:
+        usage = extract_usage_from_transcript(file_path)
+        total.input_tokens += usage.input_tokens
+        total.output_tokens += usage.output_tokens
+        total.total_tokens += usage.total_tokens
+    if total.total_tokens == 0:
+        total.total_tokens = total.input_tokens + total.output_tokens
+    return total
+
+
+def estimate_cost_usd(usage: UsageTotals, pricing: ProviderPricing) -> Optional[float]:
+    """Estimate USD cost from usage and pricing; returns None if unavailable."""
+    if pricing.input_per_1m is None or pricing.output_per_1m is None:
+        return None
+    input_cost = (usage.input_tokens / 1_000_000) * pricing.input_per_1m
+    output_cost = (usage.output_tokens / 1_000_000) * pricing.output_per_1m
+    return input_cost + output_cost
diff --git a/src/core/runner.py b/src/core/runner.py
index 98f621f..39b1703 100644
--- a/src/core/runner.py
+++ b/src/core/runner.py
@@ -24,6 +24,12 @@
 from core.idea_manager import IdeaManager
 from core.config_loader import ConfigLoader
 from core.security import sanitize_text
+from core.cost_estimator import (
+    PROVIDERS,
+    aggregate_usage,
+    estimate_cost_usd,
+    load_pricing_table,
+)
 
 from templates.prompt_generator import PromptGenerator
 from templates.research_agent_instructions import generate_instructions
@@ -344,6 +350,7 @@ def run_research(self, idea_id: str,
         self._finalize_research(idea_id, work_dir, github_url, title, provider, success)
 
         # Return result info
+        self._print_terminal_cost_summary(work_dir=work_dir, provider=provider)
         return {
             'work_dir': work_dir,
             'github_url': github_url,
@@ -402,6 +409,7 @@
 
         # Prepare command
         log_file = work_dir / "logs" / f"execution_{provider}.log"
+        transcript_file = work_dir / "logs" / f"execution_{provider}_transcript.jsonl"
 
         # Build command - raw CLI by default, scribe if requested
         if use_scribe:
@@ -428,13 +436,14 @@
         print(f" Command: {cmd}")
         print(f" Log file: {log_file}")
+        print(f" Transcript: {transcript_file}")
         print()
 
         print("=" * 80)
         print("AGENT OUTPUT (streaming)")
         print("=" * 80)
         print()
 
-        with open(log_file, 'w') as log_f:
+        with open(log_file, 'w') as log_f, open(transcript_file, 'w') as transcript_f:
             # Start process in workspace directory
             process = subprocess.Popen(
                 shlex.split(cmd),
@@ -457,6 +466,7 @@
                 sanitized_line = sanitize_text(line)
                 print(sanitized_line, end='')
                 log_f.write(sanitized_line)
+                transcript_f.write(sanitized_line)
 
             # Wait for completion
             return_code = process.wait(timeout=timeout)
@@ -523,6 +533,7 @@
             print(f" GitHub: {github_url}")
 
         # Return result info
+        self._print_terminal_cost_summary(work_dir=work_dir, provider=provider)
         return {
             'work_dir': work_dir,
             'github_url': github_url,
@@ -647,12 +658,93 @@ def run_comment_mode(
             print(f"Warning: Failed to push to GitHub: {e}")
             print(" Changes are available locally")
 
+        self._print_terminal_cost_summary(work_dir=work_dir, provider=provider)
         return {
             'work_dir': work_dir,
             'github_url': github_url,
             'success': result['success']
         }
 
+    def _collect_transcript_files(self, work_dir: Path) -> List[Path]:
+        """Collect transcript files generated during a run."""
+        logs_dir = Path(work_dir) / "logs"
+        if not logs_dir.exists():
+            return []
+        return sorted(logs_dir.glob("*_transcript.jsonl"))
+
+    def _print_terminal_cost_summary(self, work_dir: Path, provider: str) -> None:
+        """
+        Print provider pricing table and estimated run cost.
+
+        Pricing is read from environment variables:
+        - NEURICO_COST_CLAUDE_INPUT_PER_1M
+        - NEURICO_COST_CLAUDE_OUTPUT_PER_1M
+        - NEURICO_COST_CODEX_INPUT_PER_1M
+        - NEURICO_COST_CODEX_OUTPUT_PER_1M
+        - NEURICO_COST_GEMINI_INPUT_PER_1M
+        - NEURICO_COST_GEMINI_OUTPUT_PER_1M
+        """
+        all_transcript_files = self._collect_transcript_files(work_dir)
+        if not all_transcript_files:
+            print("\n⚠️ No transcript files found, skipping cost summary.")
+            return
+
+        # NOTE: assumes single-provider run; may extend to multi-provider later.
+        transcript_files = [
+            path for path in all_transcript_files
+            if path.name.endswith(f"_{provider}_transcript.jsonl")
+        ]
+        if not transcript_files:
+            print(
+                f"\n⚠️ No transcript files found for provider '{provider}', "
+                "skipping cost summary."
+            )
+            return
+
+        usage = aggregate_usage(transcript_files)
+        pricing_table = load_pricing_table()
+
+        # Build a compact fixed-width table for terminal output.
+        header = (
+            f"{'Provider':<10} {'Input/M':>10} {'Output/M':>10} "
+            f"{'InTok':>12} {'OutTok':>12} {'Est USD':>12}"
+        )
+        print()
+        print("=" * 80)
+        print("COST ESTIMATOR")
+        print("=" * 80)
+        print(header)
+        print("-" * len(header))
+
+        for row_provider in PROVIDERS:
+            pricing = pricing_table[row_provider]
+            in_rate = "N/A" if pricing.input_per_1m is None else f"{pricing.input_per_1m:.4f}"
+            out_rate = "N/A" if pricing.output_per_1m is None else f"{pricing.output_per_1m:.4f}"
+
+            if row_provider == provider:
+                in_tok = usage.input_tokens
+                out_tok = usage.output_tokens
+                est_cost = estimate_cost_usd(usage, pricing)
+                est_cost_text = "N/A" if est_cost is None else f"{est_cost:.6f}"
+            else:
+                in_tok = 0
+                out_tok = 0
+                est_cost_text = "-"
+
+            print(
+                f"{row_provider:<10} {in_rate:>10} {out_rate:>10} "
+                f"{in_tok:>12} {out_tok:>12} {est_cost_text:>12}"
+            )
+
+        print("-" * len(header))
+        print(f"Transcripts scanned: {len(transcript_files)}")
+        print(f"Run provider: {provider}")
+        print(
+            "Pricing env vars: NEURICO_COST_<PROVIDER>_INPUT_PER_1M / "
+            "NEURICO_COST_<PROVIDER>_OUTPUT_PER_1M"
+        )
+        print("=" * 80)
+
     def _copy_workspace_resources(self, work_dir: Path):
         """
         Copy helper scripts and resources to workspace.
diff --git a/src/core/transcript_parser.py b/src/core/transcript_parser.py
new file mode 100644
index 0000000..67c37d0
--- /dev/null
+++ b/src/core/transcript_parser.py
@@ -0,0 +1,239 @@
+"""
+Transcript parser for provider streaming JSON logs.
+
+Parses newline-delimited transcript files produced by Claude/Codex/Gemini runs
+and normalizes events into a consistent structure.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, asdict
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+import json
+
+
+@dataclass
+class TranscriptEvent:
+    """Normalized transcript event."""
+
+    index: int
+    line_number: int
+    provider: str
+    event_type: str
+    role: Optional[str]
+    tool_name: Optional[str]
+    text: str
+    raw: Dict[str, Any]
+
+
+@dataclass
+class ParsedTranscript:
+    """Parsed transcript payload with summary metadata."""
+
+    file_path: str
+    provider: str
+    total_lines: int
+    parsed_json_lines: int
+    invalid_json_lines: int
+    events: List[TranscriptEvent]
+    event_type_counts: Dict[str, int]
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Convert to serializable dictionary."""
+        return {
+            "file_path": self.file_path,
+            "provider": self.provider,
+            "total_lines": self.total_lines,
+            "parsed_json_lines": self.parsed_json_lines,
+            "invalid_json_lines": self.invalid_json_lines,
+            "events": [asdict(event) for event in self.events],
+            "event_type_counts": self.event_type_counts,
+        }
+
+    def extracted_text(self) -> str:
+        """Concatenate non-empty text fields from normalized events."""
+        parts = [event.text for event in self.events if event.text.strip()]
+        return "\n".join(parts)
+
+
+def infer_provider_from_path(path: Path) -> str:
+    """Infer provider from transcript filename/path."""
+    name = path.name.lower()
+    if "claude" in name:
+        return "claude"
+    if "codex" in name:
+        return "codex"
+    if "gemini" in name:
+        return "gemini"
+    return "unknown"
+
+
+def _extract_role(payload: Dict[str, Any]) -> Optional[str]:
+    """Best-effort role extraction across common transcript shapes."""
+    for key in ("role", "speaker", "author", "actor"):
+        value = payload.get(key)
+        if isinstance(value, str) and value.strip():
+            return value.strip()
+
+    message = payload.get("message")
+    if isinstance(message, dict):
+        value = message.get("role")
+        if isinstance(value, str) and value.strip():
+            return value.strip()
+
+    return None
+
+
+def _extract_tool_name(payload: Dict[str, Any]) -> Optional[str]:
+    """Best-effort tool name extraction."""
+    for key in ("tool_name", "tool", "name"):
+        value = payload.get(key)
+        if isinstance(value, str) and value.strip():
+            # Avoid treating generic event name as tool when explicit type exists.
+            if key == "name" and any(k in payload for k in ("type", "event", "kind")):
+                continue
+            return value.strip()
+
+    if isinstance(payload.get("tool"), dict):
+        maybe_name = payload["tool"].get("name")
+        if isinstance(maybe_name, str) and maybe_name.strip():
+            return maybe_name.strip()
+
+    return None
+
+
+def _extract_event_type(payload: Dict[str, Any]) -> str:
+    """Best-effort event type extraction."""
+    for key in ("type", "event", "kind", "op", "action", "name"):
+        value = payload.get(key)
+        if isinstance(value, str) and value.strip():
+            return value.strip()
+    return "unknown"
+
+
+def _append_text(chunks: List[str], value: Any) -> None:
+    """Recursively collect text from common nested transcript fields."""
+    if value is None:
+        return
+
+    if isinstance(value, str):
+        stripped = value.strip()
+        if stripped:
+            chunks.append(stripped)
+        return
+
+    if isinstance(value, list):
+        for item in value:
+            _append_text(chunks, item)
+        return
+
+    if not isinstance(value, dict):
+        return
+
+    # Common shapes in streaming responses from different CLIs.
+    candidate_keys = (
+        "text",
+        "delta",
+        "content",
+        "message",
+        "output",
+        "response",
+        "value",
+        "arguments",
+        "input",
+    )
+    for key in candidate_keys:
+        if key in value:
+            _append_text(chunks, value[key])
+
+    # Content blocks like [{"type":"text","text":"..."}]
+    if "items" in value:
+        _append_text(chunks, value["items"])
+    if "parts" in value:
+        _append_text(chunks, value["parts"])
+
+
+def _dedupe_preserve_order(values: List[str]) -> List[str]:
+    """Remove duplicates while preserving the first occurrence order."""
+    seen = set()
+    out: List[str] = []
+    for value in values:
+        if value not in seen:
+            seen.add(value)
+            out.append(value)
+    return out
+
+
+def parse_transcript_file(path: Path) -> ParsedTranscript:
+    """
+    Parse a transcript file into normalized events.
+
+    Args:
+        path: Path to newline-delimited transcript (`*.jsonl`).
+
+    Returns:
+        ParsedTranscript with normalized events and summary counts.
+    """
+    transcript_path = Path(path)
+    if not transcript_path.exists():
+        raise FileNotFoundError(f"Transcript file not found: {transcript_path}")
+
+    provider = infer_provider_from_path(transcript_path)
+    events: List[TranscriptEvent] = []
+    event_type_counts: Dict[str, int] = {}
+
+    total_lines = 0
+    parsed_json_lines = 0
+    invalid_json_lines = 0
+
+    with open(transcript_path, "r", encoding="utf-8", errors="replace") as f:
+        for line_number, line in enumerate(f, start=1):
+            total_lines += 1
+            stripped = line.strip()
+            if not stripped:
+                continue
+
+            payload: Dict[str, Any]
+            try:
+                obj = json.loads(stripped)
+            except json.JSONDecodeError:
+                invalid_json_lines += 1
+                payload = {"type": "raw_text", "text": stripped}
+            else:
+                parsed_json_lines += 1
+                if isinstance(obj, dict):
+                    payload = obj
+                else:
+                    payload = {"type": "json_scalar", "value": obj}
+
+            event_type = _extract_event_type(payload)
+            role = _extract_role(payload)
+            tool_name = _extract_tool_name(payload)
+
+            text_chunks: List[str] = []
+            _append_text(text_chunks, payload)
+            text = "\n".join(_dedupe_preserve_order(text_chunks))
+
+            event = TranscriptEvent(
+                index=len(events),
+                line_number=line_number,
+                provider=provider,
+                event_type=event_type,
+                role=role,
+                tool_name=tool_name,
+                text=text,
+                raw=payload,
+            )
+            events.append(event)
+            event_type_counts[event_type] = event_type_counts.get(event_type, 0) + 1
+
+    return ParsedTranscript(
+        file_path=str(transcript_path),
+        provider=provider,
+        total_lines=total_lines,
+        parsed_json_lines=parsed_json_lines,
+        invalid_json_lines=invalid_json_lines,
+        events=events,
+        event_type_counts=dict(sorted(event_type_counts.items(), key=lambda kv: kv[0])),
+    )