Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,4 @@ examples/start_and_test.sh
# Old tutorials directory (superseded by examples/tutorial_*.py)
tutorials/
site/
.claude/
36 changes: 27 additions & 9 deletions docs/concepts/providers/anthropic.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,32 +109,50 @@ result = agent.run_sync("This page is broken!")
print(result.parsed) # Triage(severity='high', needs_human=True)
```

### Prompt caching — automatic for long prompts
### Prompt caching — opt in for long prompts

This is the biggest cost saver if your system prompt or tool block is
long (skills, playbooks, RAG context). Anthropic's prompt-caching
mechanism marks a span of the request as cacheable; subsequent turns
within the cache window pay **1/10th** the input cost on the cached
span.

locus reads the request shape and applies `cache_control` to anything
beyond a small threshold automatically. You don't opt in.
Opt in with `prompt_cache=True` on `AnthropicModel`. Locus then sends
the system prompt as a block list with `cache_control: ephemeral` and
tags the last entry of the tool catalog the same way (a marker caches
the whole prefix up to and including the block it sits on, so tagging
the last tool covers the entire catalog).

```python
# Force or suppress caching explicitly:
from locus import Agent
from locus.models.native.anthropic import AnthropicModel

agent = Agent(
model="anthropic:claude-sonnet-4-20250514",
model_config={"prompt_cache": True}, # or False to opt out
model=AnthropicModel(
model="claude-sonnet-4-20250514",
prompt_cache=True,
),
tools=[...],
system_prompt="<a long system prompt — skills, playbooks, RAG context>",
)

result = agent.run_sync("...")
print(f"cache writes: {result.metrics.cache_creation_input_tokens}")
print(f"cache reads: {result.metrics.cache_read_input_tokens}")
# → cache writes: 4092 (turn 1, written once)
# → cache reads: 4092 (turn 2 — same prefix, ~10× cheaper input)
```

When it kicks in:

- A 5-minute "ephemeral" cache (rolling window) — the default.
- A 5-minute "ephemeral" cache (rolling window).
- Subsequent turns reusing the same prefix pay `0.1× input rate` on
the cached portion; the turn that first writes the cache pays a
one-off premium (`1.25× input rate`) on that span.
- Effective when system prompts > ~1024 tokens, or you've loaded a
big skill / playbook / RAG block.
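- Only takes effect once the cached prefix reaches Anthropic's minimum
cacheable length (1024 tokens for Claude Sonnet 4; some models require
more) — shorter prompts are processed uncached even with the marker.
A big skill / playbook / RAG block clears that bar easily.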

`cache_creation_input_tokens` and `cache_read_input_tokens` surface
on `AgentResult.metrics` so observability hooks can chart cache hits
and the cost saved.

### Extended thinking — visible reasoning

Expand Down
29 changes: 26 additions & 3 deletions src/locus/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,15 @@ async def run(
)
prompt_toks = response.usage.get("prompt_tokens", 0)
completion_toks = response.usage.get("completion_tokens", 0)
cache_creation_toks = response.usage.get("cache_creation_input_tokens", 0)
cache_read_toks = response.usage.get("cache_read_input_tokens", 0)
_total_tokens += prompt_toks + completion_toks
state = state.with_token_usage(prompt_toks, completion_toks)
state = state.with_token_usage(
prompt_toks,
completion_toks,
cache_creation_tokens=cache_creation_toks,
cache_read_tokens=cache_read_toks,
)

summary = (
response.message.content
Expand Down Expand Up @@ -579,8 +586,15 @@ async def run(
response, state = await self._get_model_response(state)
prompt_toks = response.usage.get("prompt_tokens", 0)
completion_toks = response.usage.get("completion_tokens", 0)
cache_creation_toks = response.usage.get("cache_creation_input_tokens", 0)
cache_read_toks = response.usage.get("cache_read_input_tokens", 0)
_total_tokens += prompt_toks + completion_toks
state = state.with_token_usage(prompt_toks, completion_toks)
state = state.with_token_usage(
prompt_toks,
completion_toks,
cache_creation_tokens=cache_creation_toks,
cache_read_tokens=cache_read_toks,
)
_last_assistant_content = response.message.content
# Track for the user-supplied termination condition. Updated again
# below if a Cohere-style text tool call is parsed out of the body.
Expand Down Expand Up @@ -1064,6 +1078,8 @@ async def _run() -> AgentResult:
total_tokens=state.total_tokens_used,
prompt_tokens=state.prompt_tokens_used,
completion_tokens=state.completion_tokens_used,
cache_creation_input_tokens=state.cache_creation_tokens_used,
cache_read_input_tokens=state.cache_read_tokens_used,
duration_ms=elapsed_ms,
)

Expand Down Expand Up @@ -1322,8 +1338,15 @@ async def _run_from_state(
response, state = await self._get_model_response(state)
prompt_toks = response.usage.get("prompt_tokens", 0)
completion_toks = response.usage.get("completion_tokens", 0)
cache_creation_toks = response.usage.get("cache_creation_input_tokens", 0)
cache_read_toks = response.usage.get("cache_read_input_tokens", 0)
_total_tokens += prompt_toks + completion_toks
state = state.with_token_usage(prompt_toks, completion_toks)
state = state.with_token_usage(
prompt_toks,
completion_toks,
cache_creation_tokens=cache_creation_toks,
cache_read_tokens=cache_read_toks,
)
_last_assistant_content = response.message.content
_last_no_tool_calls = not response.message.tool_calls

Expand Down
6 changes: 6 additions & 0 deletions src/locus/agent/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ class ExecutionMetrics(BaseModel):
duration_ms: float = 0.0
reflexion_evaluations: int = 0
grounding_evaluations: int = 0
# Anthropic prompt-caching token counts. Populated only when the
# AnthropicModel is configured with `prompt_cache=True` and the
# provider returns cache_creation_input_tokens / cache_read_input_tokens
# on the response usage. Zero on other providers.
cache_creation_input_tokens: int = 0
cache_read_input_tokens: int = 0

model_config = {"frozen": True}

Expand Down
25 changes: 23 additions & 2 deletions src/locus/core/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ class AgentState(BaseModel):
total_tokens_used: int = 0
prompt_tokens_used: int = 0
completion_tokens_used: int = 0
# Anthropic prompt-cache token counts. Populated only when an
# AnthropicModel is configured with prompt_cache=True. Zero on
# other providers.
cache_creation_tokens_used: int = 0
cache_read_tokens_used: int = 0
token_budget: int | None = None

# Completion mode
Expand Down Expand Up @@ -202,13 +207,29 @@ def with_metadata(self, key: str, value: Any) -> AgentState:
}
)

def with_token_usage(self, prompt_tokens: int, completion_tokens: int) -> AgentState:
"""Record token usage from a model response."""
def with_token_usage(
    self,
    prompt_tokens: int,
    completion_tokens: int,
    cache_creation_tokens: int = 0,
    cache_read_tokens: int = 0,
) -> AgentState:
    """Return a copy of this state with one response's token usage added.

    The prompt and completion counts are added to their own running
    totals and, combined, to ``total_tokens_used``. The two cache
    counters accumulate separately and are not folded into the total.
    ``updated_at`` is set to the current UTC time.

    ``cache_creation_tokens`` and ``cache_read_tokens`` default to 0, so
    callers whose provider reports no prompt-cache statistics can omit
    them (they are expected to be non-zero only for an AnthropicModel
    configured with ``prompt_cache=True``).
    """
    # Tokens billed for this single response (cache counters excluded).
    turn_tokens = prompt_tokens + completion_tokens
    changes = {
        "total_tokens_used": self.total_tokens_used + turn_tokens,
        "prompt_tokens_used": self.prompt_tokens_used + prompt_tokens,
        "completion_tokens_used": self.completion_tokens_used + completion_tokens,
        "cache_creation_tokens_used": self.cache_creation_tokens_used + cache_creation_tokens,
        "cache_read_tokens_used": self.cache_read_tokens_used + cache_read_tokens,
        "updated_at": datetime.now(UTC),
    }
    return self.model_copy(update=changes)
Expand Down
48 changes: 46 additions & 2 deletions src/locus/models/native/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ class AnthropicConfig(ModelConfig):
top_p: float = 0.9
api_key: str | None = Field(default=None, description="Anthropic API key")
base_url: str | None = Field(default=None, description="Custom API base URL")
prompt_cache: bool = Field(
default=False,
description=(
"When True, mark the system prompt and tool catalog with "
"Anthropic's `cache_control: ephemeral` so subsequent turns "
"reuse the cached input at ~1/10x cost. Default False for "
"backward compatibility."
),
)


class AnthropicModel(BaseModel):
Expand All @@ -53,6 +62,7 @@ def __init__(
base_url: str | None = None,
max_tokens: int = 4096,
temperature: float = 0.7,
prompt_cache: bool = False,
**kwargs: Any,
) -> None:
config = AnthropicConfig(
Expand All @@ -61,6 +71,7 @@ def __init__(
base_url=base_url,
max_tokens=max_tokens,
temperature=temperature,
prompt_cache=prompt_cache,
**kwargs,
)
super().__init__(config=config)
Expand Down Expand Up @@ -195,7 +206,20 @@ async def complete(
"temperature": kwargs.get("temperature", self.config.temperature),
}
if system_prompt:
params["system"] = system_prompt
# When prompt-caching is enabled, send the system prompt as a
# block list with ``cache_control: ephemeral`` so subsequent
# turns reuse the cached input at ~1/10x cost (Anthropic
# ephemeral cache TTL is ~5 min).
if self.config.prompt_cache:
params["system"] = [
{
"type": "text",
"text": system_prompt,
"cache_control": {"type": "ephemeral"},
}
]
else:
params["system"] = system_prompt

# Structured-output mode: emulate ``response_format`` via tool-use.
response_format = kwargs.get("response_format")
Expand All @@ -211,6 +235,17 @@ async def complete(
}

if anthropic_tools:
# Cache the tool catalog too — it's typically the same across
# turns and can be large. Anthropic walks the cache_control
# markers in order; tagging the last tool covers the catalog.
if self.config.prompt_cache and anthropic_tools:
anthropic_tools = [
*anthropic_tools[:-1],
{
**anthropic_tools[-1],
"cache_control": {"type": "ephemeral"},
},
]
params["tools"] = anthropic_tools

response = await self.client.messages.create(**params)
Expand Down Expand Up @@ -240,12 +275,21 @@ async def complete(
if structured_mode and structured_payload is not None:
content = _json.dumps(structured_payload)

usage = {}
usage: dict[str, int] = {}
if response.usage:
usage = {
"prompt_tokens": response.usage.input_tokens,
"completion_tokens": response.usage.output_tokens,
}
# Anthropic returns these only when prompt caching is in play.
# Surface them on usage so AgentResult.metrics can show
# cache hits/misses and cost-saved estimates.
cache_creation = getattr(response.usage, "cache_creation_input_tokens", None)
cache_read = getattr(response.usage, "cache_read_input_tokens", None)
if cache_creation is not None:
usage["cache_creation_input_tokens"] = cache_creation
if cache_read is not None:
usage["cache_read_input_tokens"] = cache_read

return ModelResponse(
message=Message.assistant(content=content, tool_calls=tool_calls),
Expand Down
Loading
Loading