Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions docs/how-to/oci-dac.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# OCI Dedicated AI Cluster (DAC) endpoints

OCI GenAI exposes two serving modes:

- **On-demand** — pay-per-token against a shared model id (`openai.gpt-5.5`,
`cohere.command-r-plus-08-2024`, …). What `Agent(model="oci:openai.gpt-5.5")`
has been using by default.
- **Dedicated AI Cluster (DAC)** — provisioned capacity exposed as a
*generative AI endpoint* OCID
(`ocid1.generativeaiendpoint.oc1.<region>....`). Inference is routed
to your cluster, with predictable latency and isolation guarantees.

Locus auto-routes DAC endpoint OCIDs to the SDK transport (`OCIModel`)
because the V1 OpenAI-compatible endpoint doesn't speak
`DedicatedServingMode`. Pass the endpoint OCID exactly the way you'd
pass a model id:

```python
from locus import Agent

agent = Agent(
model="oci:ocid1.generativeaiendpoint.oc1.<region>....<id>",
compartment_id="ocid1.compartment.oc1...", # required for DAC
profile_name="DEFAULT", # any profile in ~/.oci/config
system_prompt="...",
)
```

Behind the scenes:

```text
get_model("oci:ocid1.generativeaiendpoint....")
→ OCIModel(model_id="ocid1.generativeaiendpoint....")
→ OCIClient.get_serving_mode(...)
returns DedicatedServingMode(endpoint_id=...)
→ SDK chat() routes to your DAC.
```

## Streaming

`OCIModel.stream()` flips `is_stream=True` on the underlying
`GenericChatRequest` / `CohereChatRequest` and iterates the SSE event
stream the SDK returns. Works for both on-demand and DAC serving
modes, and for both Generic (Llama / OpenAI / xAI / Mistral / Gemini
on OCI) and Cohere R-series request shapes:

```python
async for event in agent.run("Plan Q3"):
if isinstance(event, ModelChunkEvent) and event.content:
print(event.content, end="", flush=True)
```

Each SSE event carries a JSON delta. The provider's
`parse_stream_chunk()` extracts text deltas and tool-call deltas; if
the endpoint rejects `is_stream` (some custom DAC deployments do),
the stream falls back to the non-streaming `complete()` path and
yields a single chunk with the full content — so a misconfigured
endpoint never hard-fails the stream.

## Auth

DAC endpoints accept the same auth methods as on-demand:

| Method | When | Kwarg |
|---|---|---|
| API key | local dev with `~/.oci/config` | `profile_name="DEFAULT"` |
| Session token | corporate SSO | `auth_type="security_token", profile_name="..."` |
| Instance principal | OCI VM / OKE / Functions | `auth_type="instance_principal"` |
| Resource principal | OCI Functions, Data Science | `auth_type="resource_principal"` |

`compartment_id` is **required** for DAC — the dedicated endpoint
exists in a specific compartment, and the SDK validates the
`compartment_id` field on every chat request.

## Tutorial-style env-var workflow

`examples/config.py`'s `_pick_oci_transport()` recognises DAC OCIDs
and routes them to the SDK transport automatically:

```bash
export LOCUS_MODEL_PROVIDER=oci
export LOCUS_MODEL_ID="ocid1.generativeaiendpoint.oc1.<region>....<id>"
export LOCUS_OCI_PROFILE=MY_PROFILE
export LOCUS_OCI_COMPARTMENT="ocid1.compartment.oc1..."
python examples/tutorial_01_basic_agent.py
```

`LOCUS_OCI_TRANSPORT=sdk` forces the SDK transport explicitly if you
have a hosted model that uses an OCID-shaped name but isn't a real
DAC endpoint.

## Things that go wrong

| Symptom | Likely cause |
|---|---|
| `404 Not Found` on chat | Endpoint OCID is from a different region than the SDK is talking to. Pass the right `service_endpoint=` (or set `LOCUS_OCI_REGION`) to match the endpoint's region. |
| `compartment_id is required` | Pass `compartment_id=` on `Agent()` — DAC enforces it even when on-demand wouldn't. |
| Stream yields one big chunk instead of deltas | The endpoint rejected `is_stream`. The fall-back path swallows the failure and emits the full response as one chunk; check `OCI_LOG_REQUESTS=1` to see the API error. |
| `You are not authorized to perform this request` | The principal you're authenticating with doesn't have the `inspect generative-ai-endpoints` policy in the endpoint's compartment. |

## Where the wiring lives

- [`src/locus/models/registry.py`](https://github.com/oracle-samples/locus/blob/main/src/locus/models/registry.py)
— DAC OCIDs are detected by `lowered.startswith("ocid1.generativeaiendpoint.")`
and routed to `OCIModel`.
- [`src/locus/models/providers/oci/client.py`](https://github.com/oracle-samples/locus/blob/main/src/locus/models/providers/oci/client.py)
— `OCIClient.get_serving_mode()` returns `DedicatedServingMode(endpoint_id=...)`
for OCID-shaped model ids.
- [`src/locus/models/providers/oci/__init__.py`](https://github.com/oracle-samples/locus/blob/main/src/locus/models/providers/oci/__init__.py)
— `OCIModel.stream()` does the real SSE iteration.
- [`tests/unit/test_oci_dac.py`](https://github.com/oracle-samples/locus/blob/main/tests/unit/test_oci_dac.py)
— 12 unit tests covering routing, serving-mode selection, and stream-chunk parsing.

## Related

- [OCI GenAI](../concepts/providers/oci.md) — overview of the V1 vs SDK transports.
- [OCI models how-to](oci-models.md) — full transport story for on-demand.
8 changes: 7 additions & 1 deletion examples/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,13 @@ def _pick_oci_transport(model_id: str) -> str:
forced = os.environ.get("LOCUS_OCI_TRANSPORT")
if forced in ("v1", "sdk"):
return forced
return "sdk" if model_id.lower().startswith("cohere.command-r") else "v1"
lowered = model_id.lower()
# DAC endpoint OCIDs and Cohere R-series both need the SDK
# transport (DedicatedServingMode for the former, the proprietary
# Cohere chat shape for the latter).
if lowered.startswith(("ocid1.generativeaiendpoint.", "cohere.command-r")):
return "sdk"
return "v1"


def _get_oci_model(**kwargs: Any) -> Any:
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ nav:
- Build a custom tool: how-to/custom-tools.md
- Add a checkpointer backend: how-to/custom-checkpointer.md
- OCI GenAI models: how-to/oci-models.md
- OCI Dedicated AI Cluster (DAC): how-to/oci-dac.md
- API reference:
- Agent: api/agent.md
- Checkpointers: api/checkpointers.md
Expand Down
101 changes: 85 additions & 16 deletions src/locus/models/providers/oci/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,24 +244,93 @@ async def stream(
tools: list[dict[str, Any]] | None = None,
**kwargs: Any,
) -> AsyncIterator[ModelChunkEvent]:
"""Stream a chat response.
"""Stream a chat response via the OCI GenAI SDK.

Note: OCI GenAI streaming is limited. This implementation
falls back to non-streaming and yields the full response.
Sets ``is_stream=True`` on the chat request so the SDK returns
an SSE event stream. Each ``data:`` event carries a JSON chunk
with ``message.content`` deltas and (on the last event)
``finishReason``. Works for both ``OnDemandServingMode``
(model id) and ``DedicatedServingMode`` (DAC endpoint OCID).

On any exception the stream falls back to the non-streaming
``complete()`` path and yields a single chunk with the full
content — robust to providers that reject ``is_stream``.
"""
# OCI GenAI has limited streaming support
# Fall back to complete and yield in chunks
response = await self.complete(messages, tools, **kwargs)

if response.content:
# Yield content in chunks for better UX
chunk_size = 50
content = response.content
for i in range(0, len(content), chunk_size):
yield ModelChunkEvent(content=content[i : i + chunk_size])

if response.tool_calls:
yield ModelChunkEvent(tool_calls=response.tool_calls)
import json as _json

from oci.generative_ai_inference import models

# Build the same request shape as ``complete()`` but with
# ``is_stream=True`` so the SDK returns a streaming response.
converted_messages = self.provider.convert_messages(messages, model_id=self.config.model_id)
converted_tools = self.provider.convert_tools(tools)
request_kwargs = {
"max_tokens": self.config.max_tokens,
"temperature": self.config.temperature,
"top_p": self.config.top_p,
"model_id": self.config.model_id,
"is_stream": True,
}

chat_request = self.provider.build_request(
converted_messages,
converted_tools,
**request_kwargs,
)
# Some provider request builders (Cohere) take messages
# under a different field — they may have ignored is_stream
# in build_request. Set it on the resulting object as a
# belt-and-braces step.
if hasattr(chat_request, "is_stream"):
chat_request.is_stream = True

chat_details = models.ChatDetails(
compartment_id=self.client.compartment_id,
serving_mode=self.client.get_serving_mode(self.config.model_id),
chat_request=chat_request,
)

loop = asyncio.get_running_loop()
try:
response = await loop.run_in_executor(None, lambda: self.client.chat(chat_details))
except Exception: # noqa: BLE001 — fall back on any provider error
# Some DAC endpoints / model versions reject is_stream.
# Hand the user a working stream by chunking the
# non-streaming response.
non_stream = await self.complete(messages, tools, **kwargs)
if non_stream.content:
yield ModelChunkEvent(content=non_stream.content)
if non_stream.tool_calls:
yield ModelChunkEvent(tool_calls=non_stream.tool_calls)
yield ModelChunkEvent(done=True)
return

# ``response.data`` is the raw streaming body. Iterate the SSE
# event stream synchronously in a worker thread so the asyncio
# loop stays responsive — each event is a small JSON delta.
events_iter = response.data.events()
sentinel = object()

def _next_event() -> Any:
return next(events_iter, sentinel)

while True:
event = await loop.run_in_executor(None, _next_event)
if event is sentinel:
break
data = getattr(event, "data", None)
if not data:
continue
try:
chunk = _json.loads(data)
except (ValueError, TypeError):
# Skip malformed deltas — keep the stream alive.
continue
content_delta, tool_calls_delta, _is_done = self.provider.parse_stream_chunk(chunk)
if content_delta:
yield ModelChunkEvent(content=content_delta)
if tool_calls_delta:
yield ModelChunkEvent(tool_calls=tool_calls_delta)

yield ModelChunkEvent(done=True)

Expand Down
31 changes: 22 additions & 9 deletions src/locus/models/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,22 +104,35 @@ def _register_defaults() -> None:
pass

# OCI GenAI — pick the right transport per model family.
# Cohere R-series (cohere.command-r-*) needs the OCI SDK's
# proprietary chat shape and is routed through OCIModel.
# Everything else (OpenAI / Meta / xAI / Mistral / Gemini and
# non-R Cohere) goes through OCIOpenAIModel against
# /openai/v1/chat/completions — real SSE streaming, day-0 model
# support, no Project OCID required. See
# docs/how-to/oci-models.md.
#
# Three transport rules, evaluated top-down:
# 1. Dedicated AI Cluster (DAC) endpoint OCIDs — strings starting
# with ``ocid1.generativeaiendpoint.`` — go through ``OCIModel``
# (SDK transport). The DAC endpoint OCID is passed verbatim to
# ``DedicatedServingMode(endpoint_id=...)``; the V1 transport
# doesn't speak that mode.
# 2. Cohere R-series (``cohere.command-r-*``) needs the OCI SDK's
# proprietary chat shape — also ``OCIModel``.
# 3. Everything else (OpenAI / Meta / xAI / Mistral / Gemini and
# non-R Cohere on-demand) goes through ``OCIOpenAIModel``
# against ``/openai/v1/chat/completions`` — real SSE streaming,
# day-0 model support, no Project OCID required.
#
# See docs/how-to/oci-models.md and docs/how-to/oci-dac.md.
try:
from locus.models.providers.oci import OCIModel, OCIOpenAIModel

def _make_oci(m: str, **kw: Any) -> ModelProtocol:
if m.lower().startswith("cohere.command-r"):
lowered = m.lower()
# Rule 1: DAC endpoint OCID → SDK transport.
if lowered.startswith("ocid1.generativeaiendpoint."):
return OCIModel(model_id=m, **kw)
# Rule 2: Cohere R-series → SDK transport.
if lowered.startswith("cohere.command-r"):
# SDK transport: defaults to profile_name="DEFAULT" + API_KEY,
# so no env-var fallback needed for one-line ergonomics.
return OCIModel(model_id=m, **kw)
# V1 transport: strictly requires profile= or auth_type=.
# Rule 3: V1 transport. Strictly requires profile= or auth_type=.
# Fall back to OCI_PROFILE env var so `Agent(model="oci:...")`
# works in one line. Explicit kwargs always win.
if "profile" not in kw and "auth_type" not in kw:
Expand Down
Loading
Loading