Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion crates/harness-server/src/amp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ pub struct AmpEventNormalizer {

impl AmpEventNormalizer {
fn normalize(&mut self, event: AnthropicStreamEvent) -> Vec<NormalizedEvent> {
let token_usage = event.token_usage();
let normalized = self.anthropic.normalize(event);
match normalized {
let mut out = match normalized {
NormalizedEvent::AgentTextDelta { ref item_id, delta } => {
if !delta.is_empty() {
self.pre_final_text_items.insert(item_id.clone());
Expand All @@ -49,7 +50,11 @@ impl AmpEventNormalizer {
content,
} => self.chunk_final_assistant_message(stop_reason, content),
event => vec![event],
};
if let Some(usage) = token_usage {
out.insert(0, NormalizedEvent::TokenUsage { usage });
}
out
}

fn chunk_final_assistant_message(
Expand Down
122 changes: 121 additions & 1 deletion crates/harness-server/src/anthropic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::collections::HashMap;
use serde::Deserialize;
use serde_json::Value;

use crate::{NormalizedContent, NormalizedEvent, NormalizedToolResult, Result, stable_id};
use crate::{
NormalizedContent, NormalizedEvent, NormalizedTokenUsage, NormalizedToolResult, Result,
stable_id,
};

#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
Expand Down Expand Up @@ -31,6 +34,7 @@ pub enum AnthropicStreamEvent {
is_error: bool,
error: Option<Value>,
message: Option<String>,
usage: Option<Value>,
},
Error {
error: Option<Value>,
Expand Down Expand Up @@ -61,6 +65,16 @@ impl AnthropicStreamEvent {
_ => None,
}
}

pub fn token_usage(&self) -> Option<NormalizedTokenUsage> {
match self {
Self::Assistant { message, .. } => {
token_usage_from_value(message.usage.as_ref(), message.model.clone())
}
Self::Result { usage, .. } => token_usage_from_value(usage.as_ref(), None),
_ => None,
}
}
}

#[derive(Debug, Clone, Deserialize)]
Expand Down Expand Up @@ -227,6 +241,7 @@ impl AnthropicEventNormalizer {
is_error,
error,
message,
usage: _,
} => NormalizedEvent::Result {
error: result_error_text(subtype, is_error, error, message, result),
},
Expand Down Expand Up @@ -254,7 +269,9 @@ impl From<AnthropicStreamEvent> for NormalizedEvent {
#[derive(Debug, Clone, Deserialize)]
pub struct AnthropicMessage {
pub id: Option<String>,
pub model: Option<String>,
pub stop_reason: Option<String>,
pub usage: Option<Value>,
#[serde(default)]
pub content: Vec<AnthropicContentBlock>,
}
Expand Down Expand Up @@ -439,6 +456,75 @@ fn runtime_exit_code(value: Option<&Value>) -> Option<i32> {
.and_then(|code| i32::try_from(code).ok())
}

fn token_usage_from_value(
value: Option<&Value>,
model: Option<String>,
) -> Option<NormalizedTokenUsage> {
let value = value?;
if value.is_null() {
return None;
}
let usage = NormalizedTokenUsage {
model: model.or_else(|| token_string(value, &["model"])),
input_tokens: token_count(value, &["input_tokens", "inputTokens", "inputTokenCount"]),
output_tokens: token_count(
value,
&["output_tokens", "outputTokens", "outputTokenCount"],
),
cache_creation_input_tokens: token_count(
value,
&[
"cache_creation_input_tokens",
"cacheCreationInputTokens",
"cacheCreationTokens",
],
),
cache_read_input_tokens: token_count(
value,
&[
"cache_read_input_tokens",
"cached_input_tokens",
"cacheReadInputTokens",
"cachedInputTokens",
],
),
reasoning_output_tokens: token_count(
value,
&[
"reasoning_output_tokens",
"reasoning_tokens",
"reasoningOutputTokens",
"reasoningTokens",
],
),
total_tokens: token_count(value, &["total_tokens", "totalTokens", "totalTokenCount"]),
};
usage.has_counts().then_some(usage)
}

fn token_count(value: &Value, keys: &[&str]) -> Option<i64> {
keys.iter()
.filter_map(|key| value.get(*key))
.find_map(value_as_i64)
}

fn token_string(value: &Value, keys: &[&str]) -> Option<String> {
keys.iter()
.filter_map(|key| value.get(*key))
.filter_map(Value::as_str)
.map(str::trim)
.find(|value| !value.is_empty())
.map(str::to_owned)
}

fn value_as_i64(value: &Value) -> Option<i64> {
value
.as_i64()
.or_else(|| value.as_u64().and_then(|value| i64::try_from(value).ok()))
.or_else(|| value.as_str()?.trim().parse().ok())
.filter(|value| *value >= 0)
}

fn parse_json_tool_result(content: &str) -> Option<(String, Option<i32>)> {
let value: Value = serde_json::from_str(content).ok()?;
let object = value.as_object()?;
Expand All @@ -459,3 +545,37 @@ fn exit_code_from_prefix(content: &str) -> Option<i32> {
let code_text = rest.split_once('\n').map_or(rest, |(code, _)| code);
code_text.trim().parse().ok()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn assistant_event_exposes_token_usage() {
let event: AnthropicStreamEvent = serde_json::from_str(
r#"{"type":"assistant","message":{"model":"claude-fable-5","id":"msg_1","content":[{"type":"text","text":"hi"}],"usage":{"input_tokens":2,"cache_creation_input_tokens":3,"cache_read_input_tokens":5,"output_tokens":7}}}"#,
)
.expect("assistant event");
let usage = event.token_usage().expect("usage");

assert_eq!(usage.model.as_deref(), Some("claude-fable-5"));
assert_eq!(usage.input_tokens, Some(2));
assert_eq!(usage.cache_creation_input_tokens, Some(3));
assert_eq!(usage.cache_read_input_tokens, Some(5));
assert_eq!(usage.output_tokens, Some(7));
}

#[test]
fn result_event_exposes_camel_case_token_usage() {
let event: AnthropicStreamEvent = serde_json::from_str(
r#"{"type":"result","subtype":"success","usage":{"inputTokens":11,"cachedInputTokens":13,"outputTokens":17,"totalTokens":41}}"#,
)
.expect("result event");
let usage = event.token_usage().expect("usage");

assert_eq!(usage.input_tokens, Some(11));
assert_eq!(usage.cache_read_input_tokens, Some(13));
assert_eq!(usage.output_tokens, Some(17));
assert_eq!(usage.total_tokens, Some(41));
}
}
5 changes: 5 additions & 0 deletions crates/harness-server/src/claude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ impl PendingAgentMessage {

impl ClaudeEventNormalizer {
pub fn normalize(&mut self, event: AnthropicStreamEvent) -> Vec<NormalizedEvent> {
let token_usage = event.token_usage();
let message_stop_reason = event.message_stop_reason().map(str::to_string);
let normalized = self.inner.normalize(event);
let mut out = Vec::new();
if let Some(usage) = token_usage {
out.push(NormalizedEvent::TokenUsage { usage });
}
if let Some(stop_reason) = message_stop_reason {
self.flush_pending(Some(stop_reason), &mut out);
}
Expand All @@ -73,6 +77,7 @@ impl ClaudeEventNormalizer {
self.flush_pending(None, &mut out);
out.push(event);
}
NormalizedEvent::TokenUsage { .. } => {}
NormalizedEvent::Ignored => {}
event => out.push(event),
}
Expand Down
2 changes: 1 addition & 1 deletion crates/harness-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub use error::{HarnessServerError, Result};
pub use server::{run_blocks_server, run_harness_server, run_validate_jsonrpc, server_for};
pub use traits::{
AppServerNormalizer, AppServerRuntime, HarnessKind, HarnessServer, NormalizedContent,
NormalizedEvent, NormalizedToolResult, ThreadState,
NormalizedEvent, NormalizedTokenUsage, NormalizedToolResult, ThreadState,
};
pub use turn::{BridgeConfig, CodexTurnNormalizer};
pub use validation::run_validate_agent_deltas;
Expand Down
Loading
Loading