162 changes: 129 additions & 33 deletions how-to/integrating-cycles-with-async-openai.md
@@ -20,6 +20,8 @@ The same lifecycle composes against other Rust LLM clients (Anthropic, Bedrock,
- Error-aware patterns using `ReservationGuard` that preserve typed `OpenAIError` for the caller (`with_cycles()` wraps closure errors as `Error::Validation` and loses the original type)
- Token-to-USD conversion at commit time for spend-denominated budgets

**Loud-failure stance.** The examples on this page error out on missing `usage`, missing `content`, or non-positive `caps.max_tokens` rather than silently committing zero or sending `max_completion_tokens=0` to OpenAI. This matches the shipped [`examples/async_openai_completion.rs`](https://github.com/runcycles/cycles-client-rust/blob/main/examples/async_openai_completion.rs) in the runcycles crate. Production code that prefers a fallback (e.g. commit the reservation estimate on missing usage) should opt into that fallback explicitly — the default in a teaching example should not be silent under-billing.
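
The explicit opt-in described above could be modeled as a small policy type, so that the fallback is visible at the call site. This is a minimal sketch using only the standard library; `MissingUsagePolicy` and `tokens_to_commit` are hypothetical names for illustration and are not part of runcycles or async-openai.

```rust
/// Hypothetical policy for a response that arrives without `usage`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MissingUsagePolicy {
    /// Default: refuse to commit anything and surface an error.
    FailLoud,
    /// Explicit opt-in: commit the amount that was reserved up front.
    CommitReservationEstimate,
}

/// Decide how many tokens to commit for one completed call.
/// `reported_total` stands in for `response.usage.map(|u| u.total_tokens)`.
pub fn tokens_to_commit(
    reported_total: Option<u32>,
    reserved_estimate: i64,
    policy: MissingUsagePolicy,
) -> Result<i64, String> {
    match (reported_total, policy) {
        // Usage was reported: always commit the real number.
        (Some(total), _) => Ok(i64::from(total)),
        // Usage missing and the caller opted into the fallback.
        (None, MissingUsagePolicy::CommitReservationEstimate) => Ok(reserved_estimate),
        // Usage missing and no opt-in: fail loudly.
        (None, MissingUsagePolicy::FailLoud) => {
            Err("response omitted usage; refusing to commit a guessed amount".to_string())
        }
    }
}
```

With `FailLoud`, a missing `usage` becomes an `Err` that the caller can turn into a release; the fallback only runs when `CommitReservationEstimate` is passed explicitly.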

## Cargo.toml

```toml
@@ -28,6 +30,7 @@ runcycles = "0.2"
async-openai = { version = "0.38", default-features = false, features = ["chat-completion", "rustls"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3" # for stream consumption
thiserror = "2" # for the error-aware section
```

`async-openai` 0.31+ splits its surface behind per-API features — the `chat-completion` feature is what makes `Client` and the chat-completion types available. The 0.30.x line bundled everything by default; if you're upgrading from there, the example uses `async_openai::types::chat::` paths (the chat types moved out of the top-level `types::` module in 0.31). The 0.30.x line also pulled `backoff` transitively, which has been replaced with `tower` in 0.31+ — worth the version bump for the cleaner dependency tree alone.
@@ -64,7 +67,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|_ctx| async move {
let request = CreateChatCompletionRequestArgs::default()
.model("gpt-4o-mini")
.max_tokens(800u32)
// max_completion_tokens is the current field; max_tokens is
// deprecated upstream for chat completions.
.max_completion_tokens(800u32)
.messages([ChatCompletionRequestUserMessageArgs::default()
.content(prompt)
.build()?
@@ -73,17 +78,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let response = openai.chat().create(request).await?;

// Loud-failure stance: a successful HTTP response with no choices
// / no content is a malformed result. Surfacing it as `Err` lets
// `with_cycles` release the reservation rather than commit on an
// empty reply.
let text = response
.choices
.first()
.and_then(|c| c.message.content.clone())
.unwrap_or_default();
.ok_or("OpenAI response had no message content")?;

// usage is `Option<CompletionUsage>`; treat missing as zero
let actual = response
// Same stance for missing usage: committing zero tokens against a
// successful-looking call silently under-bills the budget. Error
// out and let the caller decide whether to fall back.
let usage = response
.usage
.map(|u| u.total_tokens as i64)
.unwrap_or(0);
.ok_or("OpenAI response omitted usage — refusing to commit a guessed amount")?;
let actual = i64::from(usage.total_tokens);

Ok((text, Amount::tokens(actual)))
},
@@ -117,27 +128,37 @@ let reply = with_cycles(
.action("llm.completion", "gpt-4o-mini")
.subject(Subject { tenant: Some("acme-corp".into()), ..Default::default() }),
|ctx| async move {
// Default ceiling; override if Cycles capped lower
// Default ceiling; override if Cycles capped lower. A non-positive
// cap is treated as an explicit refusal — sending max_completion_tokens=0
// would charge the request for zero output, which is never the intent.
let mut max_tokens: u32 = 800;
if let Some(caps) = &ctx.caps {
if let Some(cap) = caps.max_tokens {
max_tokens = (cap as u32).min(max_tokens);
let cap_u32 = u32::try_from(cap)
.map_err(|_| "caps.max_tokens is negative — refusing to call OpenAI")?;
if cap_u32 == 0 {
return Err("caps.max_tokens is 0 — refusing to call OpenAI".into());
}
max_tokens = cap_u32.min(max_tokens);
}
}

let request = CreateChatCompletionRequestArgs::default()
.model("gpt-4o-mini")
.max_tokens(max_tokens)
.max_completion_tokens(max_tokens)
.messages([ChatCompletionRequestUserMessageArgs::default()
.content(prompt)
.build()?
.into()])
.build()?;

let response = openai.chat().create(request).await?;
let actual = response.usage.map(|u| u.total_tokens as i64).unwrap_or(0);
let text = response.choices.first().and_then(|c| c.message.content.clone()).unwrap_or_default();
Ok((text, Amount::tokens(actual)))
let text = response.choices.first()
.and_then(|c| c.message.content.clone())
.ok_or("OpenAI response had no message content")?;
let usage = response.usage
.ok_or("OpenAI response omitted usage")?;
Ok((text, Amount::tokens(i64::from(usage.total_tokens))))
},
).await?;
```
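
The cap handling in the closure above can be pulled into a pure function, which makes the refusal cases easy to unit-test without a Cycles server or an OpenAI key. This is a sketch under assumptions: `effective_max_tokens` is a hypothetical helper, and `caps.max_tokens` is assumed to be a signed integer (`i64` here), which is what the `u32::try_from(cap)` call in the example suggests.

```rust
/// Clamp a default output ceiling against an optional server-provided cap.
/// Hypothetical helper; `cap` is assumed to be an `Option<i64>`.
pub fn effective_max_tokens(default_ceiling: u32, cap: Option<i64>) -> Result<u32, String> {
    // No cap from the server: keep the caller's default.
    let Some(cap) = cap else {
        return Ok(default_ceiling);
    };
    // Negative (or otherwise out-of-range) caps are rejected instead of
    // being wrapped by an `as` cast.
    let cap = u32::try_from(cap)
        .map_err(|_| format!("caps.max_tokens {cap} is out of range; refusing to call OpenAI"))?;
    // A zero cap is an explicit refusal, not a request for zero output.
    if cap == 0 {
        return Err("caps.max_tokens is 0; refusing to call OpenAI".to_string());
    }
    // The server may lower the ceiling but never raise it.
    Ok(cap.min(default_ceiling))
}
```

Inside the closure, the result would feed `.max_completion_tokens(...)` directly, and an `Err` would propagate through `?` so that no request is sent.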
@@ -178,24 +199,40 @@ let guard = cycles.reserve(
.build()
).await?;

// Apply caps before building the request
// Apply caps before building the request. Non-positive caps are an explicit
// refusal — release the guard and bail rather than send max_completion_tokens=0.
// Note `let _ = ... .await` on release: if the release itself errors (rare —
// network failure between the agent and the Cycles server), the caller still
// sees the original zero-cap error rather than the release error swallowing
// it.
let mut max_tokens: u32 = 1_500;
if let Some(caps) = guard.caps() {
if let Some(cap) = caps.max_tokens {
max_tokens = (cap as u32).min(max_tokens);
let cap_u32 = u32::try_from(cap)
.map_err(|_| "caps.max_tokens is negative — refusing to call OpenAI")?;
if cap_u32 == 0 {
let _ = guard.release("caps.max_tokens is 0".to_string()).await;
return Err("caps.max_tokens is 0 — refusing to call OpenAI".into());
}
max_tokens = cap_u32.min(max_tokens);
}
}

let request = CreateChatCompletionRequestArgs::default()
.model("gpt-4o-mini")
.max_tokens(max_tokens)
.max_completion_tokens(max_tokens)
.messages([ChatCompletionRequestUserMessageArgs::default()
.content(prompt)
.build()?
.into()])
.stream(true)
// Required for the stream to emit a final usage chunk.
.stream_options(ChatCompletionStreamOptions { include_usage: true })
// Required for the stream to emit a final usage chunk. The struct's
// fields are `Option<bool>` in async-openai 0.38.x — `include_obfuscation`
// is set to `None` to keep the upstream default.
.stream_options(ChatCompletionStreamOptions {
include_usage: Some(true),
include_obfuscation: None,
})
.build()?;

let mut stream = openai.chat().create_stream(request).await?;
@@ -212,14 +249,33 @@ while let Some(chunk_result) = stream.next().await {
}
// The final chunk carries usage when include_usage was set.
if let Some(usage) = chunk.usage {
final_usage_tokens = usage.total_tokens as i64;
final_usage_tokens = i64::from(usage.total_tokens);
}
}

// Defensive fallback: if usage didn't arrive (some OpenAI-compatible
// providers don't honor include_usage), estimate locally.
// Two edge cases at end-of-stream:
//
// - `full_text` is empty: the stream produced no content chunks. Treat as
// a malformed result and release the guard rather than commit on a
// zero-output response.
// - `final_usage_tokens` is zero: the stream completed but the provider
// didn't honor `include_usage`. Some OpenAI-compatible servers (Ollama,
// vLLM, certain LiteLLM configs) silently drop the usage chunk. Either
// estimate locally with a tokenizer, or release and error.
//
// The example below takes the loud path (release + error) to match the
// non-streaming sections' stance. For production code that prefers a
// fallback, plug in the `tiktoken-rs` crate's `o200k_base()` encoder and
// commit the estimate — see the snippet at the end of this section.
if full_text.is_empty() {
let _ = guard.release("openai_stream_no_content".to_string()).await;
return Err("OpenAI stream produced no content".into());
}
if final_usage_tokens == 0 {
final_usage_tokens = estimate_tokens_with_tiktoken(&prompt, &full_text);
let _ = guard.release("openai_stream_no_usage".to_string()).await;
return Err(
"OpenAI stream omitted usage — set stream_options.include_usage or estimate locally".into(),
);
}

guard.commit(
@@ -239,6 +295,24 @@ guard.commit(

If the stream errors midway (network failure, rate limit, content policy violation), call `guard.release(...).await?` — the reservation is returned to the pool with a reason code. The guard's `Drop` implementation provides best-effort release on panic / early `?` return, but explicit release with a reason code is preferred for clean audit records.
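
The commit-or-release decision for a stream can be sketched as a synchronous function over already-received chunks, which keeps the branching testable without a network. Everything here is a hypothetical stand-in: `Chunk` and `Settlement` are illustration types, not async-openai or runcycles types, and the `openai_stream_error` reason code is an assumed name (the other two reason codes come from the example above).

```rust
/// Hypothetical stand-in for one streamed chunk: a content delta and,
/// on the final chunk only, the total token count.
pub struct Chunk {
    pub delta: Option<String>,
    pub total_tokens: Option<u32>,
}

/// What the caller should do with the reservation once the stream ends.
#[derive(Debug, PartialEq, Eq)]
pub enum Settlement {
    Commit { text: String, tokens: i64 },
    Release { reason: &'static str },
}

/// Fold a sequence of chunk results into a single settlement decision.
pub fn settle<I>(chunks: I) -> Settlement
where
    I: IntoIterator<Item = Result<Chunk, String>>,
{
    let mut text = String::new();
    let mut tokens: Option<i64> = None;
    for item in chunks {
        let chunk = match item {
            Ok(chunk) => chunk,
            // Mid-stream failure: release with an explicit reason code.
            Err(_) => return Settlement::Release { reason: "openai_stream_error" },
        };
        if let Some(delta) = chunk.delta {
            text.push_str(&delta);
        }
        if let Some(total) = chunk.total_tokens {
            tokens = Some(i64::from(total));
        }
    }
    // No content at all is treated as a malformed result.
    if text.is_empty() {
        return Settlement::Release { reason: "openai_stream_no_content" };
    }
    // Content without usage: release rather than commit a guess.
    match tokens {
        Some(tokens) => Settlement::Commit { text, tokens },
        None => Settlement::Release { reason: "openai_stream_no_usage" },
    }
}
```

In the async version, each `Release` arm would map to `guard.release(reason.to_string()).await` and the `Commit` arm to `guard.commit(...)`. Using `Option<i64>` for the token count, instead of a zero sentinel, keeps "usage never arrived" distinct from "usage was zero".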

### Optional: tokenizer fallback for missing-usage chunks

If the loud-failure path on missing usage is too pessimistic for your deployment — for instance, you're routing through an OpenAI-compatible proxy that doesn't honor `include_usage` and you can't change the proxy — plug in a real tokenizer instead of erroring out. The `tiktoken-rs` crate's `o200k_base` encoder matches the tokenizer used by gpt-4o-family models:

```rust
// Add to Cargo.toml: tiktoken-rs = "0.6" (check crates.io for current)
use tiktoken_rs::o200k_base;

fn estimate_tokens(prompt: &str, output: &str) -> Result<i64, Box<dyn std::error::Error + Send + Sync>> {
let bpe = o200k_base()?;
let input = i64::try_from(bpe.encode_with_special_tokens(prompt).len())?;
let out = i64::try_from(bpe.encode_with_special_tokens(output).len())?;
Ok(input + out)
}
```

Then commit `estimate_tokens(&prompt, &full_text)` instead of releasing the guard on the missing-usage branch. The estimate will be approximate — it doesn't account for system prompts, tool definitions, or the model's actual tokenization of formatting tokens — but it beats committing zero.

## Error handling: preserving the OpenAI error type

`async-openai` returns `OpenAIError`; Cycles returns `runcycles::Error`. Callers usually want to act on these differently:
@@ -282,7 +356,7 @@ async fn run_completion(

let request = CreateChatCompletionRequestArgs::default()
.model("gpt-4o-mini")
.max_tokens(800u32)
.max_completion_tokens(800u32)
.messages([ChatCompletionRequestUserMessageArgs::default()
.content(prompt)
.build()?
@@ -298,14 +372,31 @@ async fn run_completion(
}
};

let text = response.choices.first()
.and_then(|c| c.message.content.clone())
.unwrap_or_default();
let actual = response.usage.map(|u| u.total_tokens as i64).unwrap_or(0);
// Loud failure on malformed-but-successful responses: missing content or
// missing usage releases the reservation and surfaces as a typed error,
// rather than committing zero and silently under-billing.
let text = match response.choices.first().and_then(|c| c.message.content.clone()) {
Some(t) => t,
None => {
let _ = guard.release("openai_no_content".to_string()).await;
return Err(CompletionError::Cycles(CyclesError::Validation(
"OpenAI response had no message content".into(),
)));
}
};
let usage = match response.usage {
Some(u) => u,
None => {
let _ = guard.release("openai_no_usage".to_string()).await;
return Err(CompletionError::Cycles(CyclesError::Validation(
"OpenAI response omitted usage".into(),
)));
}
};

guard.commit(
CommitRequest::builder()
.actual(Amount::tokens(actual))
.actual(Amount::tokens(i64::from(usage.total_tokens)))
.build()
).await?;

@@ -356,9 +447,12 @@ fn tokens_to_microcents(prompt_tokens: u32, completion_tokens: u32, model: &str)
}

// Inside the with_cycles closure:
let usage = response.usage.unwrap_or_default();
let usage = response.usage
.ok_or("OpenAI response omitted usage — refusing to commit a guessed amount")?;
let microcents = tokens_to_microcents(usage.prompt_tokens, usage.completion_tokens, "gpt-4o-mini");
Ok((text, Amount::usd_microcents(microcents as i64)))
let amount = i64::try_from(microcents)
.map_err(|_| "microcents overflow when converting to i64")?;
Ok((text, Amount::usd_microcents(amount)))
```

Keeping the rate table in one helper makes provider rate changes a single-edit fix. For multi-provider deployments, hoist it to your shared `costs` module.
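
One possible shape for such a single-helper rate table, with checked arithmetic all the way to the `i64` that the commit expects, is sketched below. The names `Rate`, `rate_for`, and `cost_microcents` are hypothetical, the per-token numbers are placeholders rather than published pricing, and a microcent is assumed to mean one millionth of a cent.

```rust
/// Hypothetical per-token rates in microcents (assumed: 1 microcent = 10^-8 USD).
/// The numbers below are placeholders for illustration, not published pricing.
struct Rate {
    input_per_token: u64,
    output_per_token: u64,
}

/// The single place to edit when a provider changes its rates.
fn rate_for(model: &str) -> Option<Rate> {
    match model {
        "gpt-4o-mini" => Some(Rate { input_per_token: 15, output_per_token: 60 }),
        _ => None,
    }
}

/// Price one call, failing loudly on an unknown model or on overflow.
pub fn cost_microcents(
    model: &str,
    prompt_tokens: u32,
    completion_tokens: u32,
) -> Result<i64, String> {
    let rate = rate_for(model).ok_or_else(|| format!("no rate configured for model {model}"))?;
    let input = u64::from(prompt_tokens).checked_mul(rate.input_per_token);
    let output = u64::from(completion_tokens).checked_mul(rate.output_per_token);
    let total = input
        .zip(output)
        .and_then(|(i, o)| i.checked_add(o))
        .ok_or("microcents overflow while pricing tokens")?;
    // Checked conversion instead of `as i64`.
    i64::try_from(total).map_err(|_| "microcents overflow when converting to i64".to_string())
}
```

Returning an error for an unconfigured model is a design choice of this sketch: it avoids silently pricing a new model at a stale rate.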
@@ -371,7 +465,7 @@ The reserve-commit shape is the same for any Rust LLM client. The four things yo

1. **The request builder type** — `CreateChatCompletionRequestArgs` for async-openai, `MessageCreateBuilder` / `MessageCreateParams` for Anthropic's `anthropic-sdk-rust`, the provider-specific equivalent elsewhere.
2. **The call method** — `client.chat().create(req)` for async-openai; consult the provider crate's docs for the equivalent.
3. **The response usage extraction** — `response.usage.map(|u| u.total_tokens as i64)` for async-openai; Anthropic returns `input_tokens` + `output_tokens` separately on its response usage object; check the crate.
3. **The response usage extraction** — `response.usage.ok_or(...)?` then `i64::from(usage.total_tokens)` for async-openai (loud failure on missing usage, no `as` cast). Anthropic returns `input_tokens` + `output_tokens` separately on its response usage object; the same `ok_or(...)?` / `i64::from(...)` pattern applies, you just sum the two fields.
4. **The model name in the action label** — `.action("llm.completion", "claude-3-5-sonnet-20241022")` rather than `"gpt-4o-mini"`.

Pin to the specific crate version you're using and verify each of those four points against its current docs before copy-pasting. The Rust Anthropic ecosystem in particular has churn across crate names and major versions; the reserve-commit lifecycle is unchanged, but the provider-side type paths are not portable.
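
The usage-extraction difference in point 3 can be isolated behind a small trait, so the commit path stays identical across providers. This is a sketch with hypothetical types: `OpenAiStyleUsage` and `AnthropicStyleUsage` only mirror the field names mentioned above and are not the real crate types, which should be checked against the crate version in use.

```rust
/// Hypothetical usage shapes that mirror the field names described above.
pub struct OpenAiStyleUsage {
    pub total_tokens: u32,
}

pub struct AnthropicStyleUsage {
    pub input_tokens: u32,
    pub output_tokens: u32,
}

/// Hypothetical trait: how many tokens to bill for one response.
pub trait BillableTokens {
    fn billable_tokens(&self) -> i64;
}

impl BillableTokens for OpenAiStyleUsage {
    fn billable_tokens(&self) -> i64 {
        i64::from(self.total_tokens)
    }
}

impl BillableTokens for AnthropicStyleUsage {
    fn billable_tokens(&self) -> i64 {
        // Two u32 values always fit in an i64 sum, so no checked add is needed.
        i64::from(self.input_tokens) + i64::from(self.output_tokens)
    }
}

/// Loud failure on missing usage, independent of the provider.
pub fn tokens_or_fail<U: BillableTokens>(usage: Option<U>) -> Result<i64, String> {
    usage
        .map(|u| u.billable_tokens())
        .ok_or_else(|| "response omitted usage".to_string())
}
```

In real code the trait would be implemented for each provider crate's own usage type, which keeps the `ok_or` and `i64::from` handling in one place.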
@@ -380,16 +474,18 @@ The [`Error Handling in Rust`](/how-to/error-handling-patterns-in-rust) patterns

## Common gotchas

1. **Streaming without `include_usage` reports zero tokens.** OpenAI's official streaming endpoint emits usage only when `stream_options.include_usage = true` is set on the request. Without it, you'll commit zero tokens and the budget will not reflect actual spend. Set the option, or fall back to a tokenizer estimate.
1. **Streaming without `include_usage` reports zero tokens.** OpenAI's official streaming endpoint emits usage only when `stream_options.include_usage` is set on the request. Without it, you'll commit zero tokens and the budget will not reflect actual spend. Set the option, and have a tokenizer fallback for OpenAI-compatible providers that don't honor it.

2. **`response.usage` is `Option`.** Some compatible servers (Ollama, vLLM, certain LiteLLM configs) don't return usage. Treat `None` as "estimate it locally" rather than "no spend."
2. **`response.usage` is `Option`.** Some compatible servers (Ollama, vLLM, certain LiteLLM configs) don't return usage. For **non-streaming** calls, the cleanest pattern is loud failure — return `Err`, let `with_cycles` release the reservation, surface the issue to the caller (the examples above follow this stance, matching the shipped `cycles-client-rust/examples/async_openai_completion.rs`). Streaming is the genuine exception: you've already consumed the stream so re-issuing is expensive, and a tokenizer estimate beats committing zero.

3. **`response.choices[0].message.content` can be `None`** when the model returns a tool-call or refusal. Handle the `None` case (commit zero or commit the prompt-token cost only) rather than unwrapping.
3. **`response.choices[0].message.content` can be `None`** when the model returns only a tool call or a refusal, or when generation stops with finish reason `length` before producing any content. Treat that as a malformed result (fail loud and release) rather than committing on an empty reply.

4. **Don't include the OpenAI API key in the Cycles reservation metadata.** Cycles records actions, not credentials. If you're tagging the reservation with provider info, use the action name (`gpt-4o-mini`) — never the key.

5. **Mismatched async runtimes.** `async-openai` uses `tokio`; the blocking `runcycles` variant requires not being inside a Tokio runtime. Pick one — for most LLM workloads, the async client is correct.

6. **`as u32` / `as i64` on values you got from elsewhere.** `cap as u32` silently wraps on a negative `cap.max_tokens`; `microcents as i64` silently wraps on overflow. Use `u32::try_from(...)` / `i64::try_from(...)` and surface a typed error instead.
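
The wrap-around described in gotcha 6 can be confirmed with the standard library alone. The two helpers below are hypothetical and exist only to put the `as` result next to the `try_from` result for the same input; the `i64` and `u64` input types are assumptions about where the values come from.

```rust
/// Returns (value produced by `as u32`, value produced by `u32::try_from`).
pub fn cap_conversions(cap: i64) -> (u32, Option<u32>) {
    (cap as u32, u32::try_from(cap).ok())
}

/// Returns (value produced by `as i64`, value produced by `i64::try_from`).
pub fn microcent_conversions(microcents: u64) -> (i64, Option<i64>) {
    (microcents as i64, i64::try_from(microcents).ok())
}
```

For a cap of `-1`, the `as` cast yields `u32::MAX`, which would then survive a `.min(800)` clamp as `800` and look like a valid ceiling, while `try_from` reports the problem. For `u64::MAX` microcents, the `as` cast yields `-1`, and `try_from` again returns nothing.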

## Next steps

- [Rust Client Quickstart](/quickstart/getting-started-with-the-rust-client) — the lifecycle this page composes against