diff --git a/.gitignore b/.gitignore index f8a1c16..4a429eb 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ tmp/* __pycache__/ +*.egg-info/ +.pytest_cache/ diff --git a/README.md b/README.md index daaa9bc..82b7c6c 100644 --- a/README.md +++ b/README.md @@ -2,291 +2,245 @@ Subconscious

-

Subconscious SDK

+

Subconscious Python SDK

The official Python SDK for the Subconscious API

-

- PyPI version - PyPI downloads - docs - python version - license -

- --- ## Installation ```bash -pip install subconscious-python -# or -uv add subconscious-python -# or -poetry add subconscious-python +pip install subconscious-sdk ``` -> **Note**: The package name is `subconscious-python` but you import it as `subconscious`. - -## Quick Start +## Quick start ```python -from subconscious import Subconscious +from subconscious import Subconscious, tools +from pydantic import BaseModel -client = Subconscious(api_key="your-api-key") +client = Subconscious(api_key="...") -run = client.run( - engine="tim-large", +class Summary(BaseModel): + summary: str + score: float + +run = client.run_and_wait( + engine="tim-claude", input={ - "instructions": "Search for the latest AI news and summarize the top 3 stories", - "tools": [{"type": "platform", "id": "parallel_search"}], + "instructions": "Summarize and score this article: …", + "tools": [tools.platform("parallel_search")], + "answerFormat": Summary, # Pydantic class — auto-converted }, - options={"await_completion": True}, ) print(run.result.answer) ``` -## Get Your API Key +## Three ways to start a run -Create an API key in the [Subconscious dashboard](https://www.subconscious.dev/platform). +### 1. Fire-and-forget — `client.run` -## Usage +Returns the run with only `run_id` populated. Use this when a background +worker polls or when you've persisted the id and pick it up later. -### Run and Wait +```python +run = client.run(engine="tim-claude", input={"instructions": "Search AI news"}) +db.insert({"run_id": run.run_id, "status": "queued"}) +``` -The simplest way to use the SDK—create a run and wait for completion: +### 2. Block until done — `client.run_and_wait` ```python -run = client.run( - engine="tim-large", - input={ - "instructions": "Analyze the latest trends in renewable energy", - "tools": [{"type": "platform", "id": "parallel_search"}], - }, - options={"await_completion": True}, -) - +run = client.run_and_wait(engine="tim-claude", input={"instructions": "Search AI news"}) print(run.result.answer) -print(run.result.reasoning) # Structured reasoning nodes ``` -### Fire and Forget +### 3. Stream — `client.stream` -Start a run without waiting, then check status later: +Yields typed events. The first event is always `StartedEvent` +(carrying `run_id`); the last is always `DoneEvent`. Exactly one +`ResultEvent` (success) or `ErrorEvent` (failure) fires before `done`. ```python -run = client.run( - engine="tim-large", - input={ - "instructions": "Generate a comprehensive report", - "tools": [], - }, +from subconscious.types import ( + DeltaEvent, StartedEvent, ResultEvent, ToolCallEvent, ErrorEvent, ) -print(f"Run started: {run.run_id}") - -# Check status later -status = client.get(run.run_id) -print(status.status) # 'queued' | 'running' | 'succeeded' | 'failed' | 'canceled' | 'timed_out' +for event in client.stream(engine="tim-claude", input={"instructions": "Write an essay"}): + if isinstance(event, StartedEvent): + print("run_id:", event.run_id) + elif isinstance(event, DeltaEvent): + print(event.content, end="", flush=True) + elif isinstance(event, ToolCallEvent): + print(f"\ntool: {event.call.tool_name} {event.call.parameters}") + elif isinstance(event, ResultEvent): + print("\nfinal answer:", event.result.answer) + elif isinstance(event, ErrorEvent): + print(f"[{event.code}] {event.message}") ``` -### Poll with Custom Options +## Re-attaching to a run — `client.observe` + +Pick up a live or already-finished run and stream its events from the +durable buffer. Same wire format and event taxonomy as `stream()`. ```python -run = client.run( - engine="tim-large", - input={ - "instructions": "Complex task", - "tools": [{"type": "platform", "id": "parallel_search"}], - }, -) +run = client.run(engine="tim-claude", input=...) +db.persist(run.run_id) -# Wait with custom polling options -result = client.wait( - run.run_id, - options={ - "interval_ms": 2000, # Poll every 2 seconds - "max_attempts": 60, # Give up after 60 attempts - }, -) +# … later, possibly in a different process: +for event in client.observe(run.run_id): + if isinstance(event, ResultEvent): + print(event.result.answer) ``` -### Streaming (Text Deltas) +## Tools -Stream text as it's generated: +```python +from subconscious import tools +from pydantic import BaseModel + +class EmailArgs(BaseModel): + to: str + body: str + +input = { + "instructions": "Look up customers and send a follow-up email", + "tools": [ + tools.platform("parallel_search"), + tools.resource("sandbox"), + tools.function( + name="sendEmail", + url="https://api.example.com/email", + parameters=EmailArgs, # Pydantic class — auto-converted + defaults={"sender_id": "svc_abc"}, # hidden from model, auto-promoted + headers={"Authorization": "Bearer xyz"}, + ), + tools.mcp( + url="https://mcp.example.com", + headers={"Authorization": "Bearer xyz"}, # header-based auth + ), + ], +} +``` + +### Client-level FunctionTool overlays ```python -for event in client.stream( - engine="tim-large", - input={ - "instructions": "Write a short essay about space exploration", - "tools": [{"type": "platform", "id": "parallel_search"}], - }, -): - if event.type == "delta": - print(event.content, end="", flush=True) - elif event.type == "done": - print(f"\n\nRun completed: {event.run_id}") - elif event.type == "error": - print(f"Error: {event.message}") +client = Subconscious( + api_key="...", + default_function_tool_headers={"Authorization": "Bearer xyz"}, + default_function_tool_defaults={"tenant_id": "t_abc"}, +) ``` -> **Note**: Rich streaming events (reasoning steps, tool calls) are coming soon. Currently, the stream provides text deltas as they're generated. +Per-tool values win on conflict. -### Structured Output +## Structured output -Get responses in a specific JSON schema format using Pydantic models: +Pass a Pydantic class directly — the SDK converts it for you: ```python from pydantic import BaseModel -from subconscious import Subconscious -class AnalysisResult(BaseModel): +class Result(BaseModel): summary: str - key_points: list[str] - sentiment: str + score: float -client = Subconscious(api_key="your-api-key") - -run = client.run( - engine="tim-large", +run = client.run_and_wait( + engine="tim-claude", input={ - "instructions": "Analyze the latest news about electric vehicles", - "tools": [{"type": "platform", "id": "parallel_search"}], - "answerFormat": AnalysisResult, # Pass the Pydantic class directly + "instructions": "Rate this article", + "answerFormat": Result, }, - options={"await_completion": True}, ) -# The answer will conform to your schema -print(run.result.answer) # JSON string matching AnalysisResult +print(run.result.answer) # already a dict matching Result; cast if needed ``` -The SDK automatically converts your Pydantic model to JSON Schema. You can also pass a raw JSON Schema dict if preferred. +You can still pass a hand-built JSON Schema dict if you'd rather not +depend on Pydantic. -For advanced use cases, you can also specify a `reasoningFormat` to structure the agent's reasoning output. +## Cancelling a run -### Tools +`client.cancel(run_id)` is **idempotent**. Call it whether the run is +running, queued, or already terminal — it returns the run's current +shape. Only network / auth failures raise. ```python -# Platform tools (hosted by Subconscious) -parallel_search = { - "type": "platform", - "id": "parallel_search", -} - -# Function tools (your own functions) -custom_function = { - "type": "function", - "name": "get_weather", - "description": "Get current weather for a location", - "parameters": { - "type": "object", - "properties": { - "location": {"type": "string"}, - }, - "required": ["location"], - }, - "url": "https://api.example.com/weather", - "method": "GET", - "timeout": 30, -} - -# MCP tools -mcp_tool = { - "type": "mcp", - "url": "https://mcp.example.com", - "allow": ["read", "write"], -} +run = client.run(engine="tim-claude", input=...) +client.cancel(run.run_id) # safe regardless of state +client.cancel(run.run_id) # also safe ``` -### Error Handling +## Error codes -```python -from subconscious import ( - Subconscious, - SubconsciousError, - AuthenticationError, - RateLimitError, -) +Every `ErrorEvent` and every raised `SubconsciousError` carries a +canonical `code` from this set: -try: - run = client.run(...) -except AuthenticationError: - print("Invalid API key") -except RateLimitError: - print("Rate limited, retry later") -except SubconsciousError as e: - print(f"API error: {e.code} - {e}") ``` - -### Cancellation - -```python -# Cancel a running run -client.cancel(run.run_id) +invalid_request authentication_failed permission_denied +not_found rate_limited internal_error +service_unavailable timeout canceled ``` -## API Reference - -### `Subconscious` +Pattern-match on `code`, never on `message`. -The main client class. +## Engines -#### Constructor Options +The SDK accepts any engine name as a string. Canonical live names: -| Option | Type | Required | Default | -| ---------- | ------ | -------- | --------------------------------- | -| `api_key` | `str` | Yes | - | -| `base_url` | `str` | No | `https://api.subconscious.dev/v1` | +- `tim`, `tim-edge` +- `tim-claude`, `tim-claude-heavy` +- `tim-omni`, `tim-omni-mini` -#### Methods +Legacy names (`tim-large`, `tim-gpt`, `tim-small`, `timini`, …) are still +accepted and resolved to a live engine server-side. -| Method | Description | -| --------------------------- | ------------------------ | -| `run(engine, input, options)` | Create a new run | -| `stream(engine, input)` | Stream text deltas | -| `get(run_id)` | Get run status | -| `wait(run_id, options)` | Poll until completion | -| `cancel(run_id)` | Cancel a running run | +## Back-compat & deprecations -### Engines +The SDK keeps a thin compatibility shim for callers from before the +`run` / `run_and_wait` split and the wire-format normalization. Existing +code keeps working without changes; new code should reach for the new +spellings. -| Engine | Type | Availability | Description | -| ------------------- | -------- | ------------ | ----------------------------------------------------------------- | -| `tim-small-preview` | Unified | Available | Fast and tuned for search tasks | -| `tim-large` | Compound | Available | Generalized reasoning engine backed by the power of OpenAI | -| `timini` | Compound | Coming soon | Generalized reasoning engine backed by the power of Google Gemini | +### `options.await_completion` — deprecated -### Run Status +The single-method `client.run(..., options={"await_completion": True})` +shape from older releases is still accepted. It transparently routes +through `client.run_and_wait()` and emits a one-shot +`DeprecationWarning` so the migration is visible in dev. Migrate by +calling `run_and_wait()` directly: -| Status | Description | -| ----------- | ---------------------- | -| `queued` | Waiting to start | -| `running` | Currently executing | -| `succeeded` | Completed successfully | -| `failed` | Encountered an error | -| `canceled` | Manually canceled | -| `timed_out` | Exceeded time limit | +```python +# Before (still works, emits DeprecationWarning once): +run = client.run( + engine="tim-claude", + input={"instructions": "..."}, + options={"await_completion": True}, +) -## Requirements +# After: +run = client.run_and_wait(engine="tim-claude", input={"instructions": "..."}) +``` -- Python ≥ 3.8 -- requests +`RunOptions` will be removed in a future minor release. -## Contributing +### Wire-format `runId` -Contributions are welcome! Please feel free to submit a pull request. +The canonical SSE event payload key is `runId` (camelCase, matching REST +responses). The SDK still accepts the legacy snake_case `run_id` payload +shape so callers running against older API builds keep working. -## License +### Error code spelling: `canceled` (one `l`) -Apache-2.0 +`ErrorCode` and `RunStatus` both use `canceled` (one `l`). The earlier +double-`l` `cancelled` form was removed. -## Support +## License -For support and questions: -- Documentation: https://docs.subconscious.dev -- Email: {hongyin,jack}@subconscious.dev +Apache-2.0. See `LICENSE`. diff --git a/pyproject.toml b/pyproject.toml index bd3e551..22d49b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "subconscious-sdk" -version = "0.1.5" +version = "0.1.6" description = "The official Python SDK for the Subconscious API" readme = "README.md" authors = [ diff --git a/subconscious/__init__.py b/subconscious/__init__.py index cf943e3..f185ece 100644 --- a/subconscious/__init__.py +++ b/subconscious/__init__.py @@ -1,53 +1,58 @@ """ -Subconscious Python SDK +Subconscious Python SDK. The official Python SDK for the Subconscious API. """ +from . import tools from .client import Subconscious +from .errors import ( + AuthenticationError, + ErrorCode, + NotFoundError, + RateLimitError, + SubconsciousError, + ValidationError, +) from .types import ( - # Run types - Run, - RunStatus, - RunResult, - RunInput, - RunOptions, - RunParams, - ReasoningNode, + DeltaEvent, + DoneEvent, Engine, - Usage, + ErrorEvent, + FunctionTool, + MCPAuth, + MCPTool, ModelUsage, + OutputSchema, + PlatformTool, PlatformToolUsage, PollOptions, - # Tool types - Tool, - PlatformTool, - FunctionTool, - MCPTool, - # Stream events + ReasoningNode, + ReasoningNodeEvent, + ResourceTool, + ResultEvent, + Run, + RunInput, + RunOptions, + RunParams, + RunResult, + RunStatus, + StartedEvent, StreamEvent, - DeltaEvent, - DoneEvent, - ErrorEvent, - # Structured output - OutputSchema, + Tool, + ToolCallEvent, + ToolUse, + Usage, pydantic_to_schema, ) -from .errors import ( - SubconsciousError, - AuthenticationError, - RateLimitError, - NotFoundError, - ValidationError, -) -__version__ = "0.2.0" +__version__ = "0.1.6" __author__ = "Subconscious Systems" __email__ = "contact@subconscious.dev" __all__ = [ - # Client "Subconscious", + "tools", # Run types "Run", "RunStatus", @@ -56,6 +61,7 @@ "RunOptions", "RunParams", "ReasoningNode", + "ToolUse", "Engine", "Usage", "ModelUsage", @@ -66,9 +72,15 @@ "PlatformTool", "FunctionTool", "MCPTool", - # Stream events + "MCPAuth", + "ResourceTool", + # Stream events (Stream Events v2) "StreamEvent", + "StartedEvent", "DeltaEvent", + "ReasoningNodeEvent", + "ToolCallEvent", + "ResultEvent", "DoneEvent", "ErrorEvent", # Structured output @@ -80,4 +92,5 @@ "RateLimitError", "NotFoundError", "ValidationError", + "ErrorCode", ] diff --git a/subconscious/_normalize_tools.py b/subconscious/_normalize_tools.py new file mode 100644 index 0000000..14d8f86 --- /dev/null +++ b/subconscious/_normalize_tools.py @@ -0,0 +1,127 @@ +"""Pre-flight tool normalization (R9, R12). + +Two responsibilities: + +1. **Inject client-level FunctionTool overlays.** When the SDK was + constructed with ``default_function_tool_headers`` / + ``default_function_tool_defaults``, merge those into every FunctionTool. + Per-tool values win on conflict so consumers can still override. + +2. **Auto-promote ``defaults`` keys into the JSON Schema** (R12). Defaults + are hidden values the engine never sees as model-controlled + parameters, but the schema needs to declare them anyway so the engine + can dispatch a complete payload. We synthesize a minimal property + descriptor (``{"type": "string"}``) for each defaults-only key that is + missing from ``parameters.properties``. Existing properties are left + untouched. +""" + +from dataclasses import asdict, is_dataclass +from typing import Any, Dict, List, Optional + + +def _to_dict(tool: Any) -> Dict[str, Any]: + """Convert a tool dataclass / dict into a wire-format dict.""" + if isinstance(tool, dict): + return {k: v for k, v in tool.items() if v is not None} + if is_dataclass(tool): + return {k: v for k, v in asdict(tool).items() if v is not None} + if hasattr(tool, "__dict__"): + return {k: v for k, v in tool.__dict__.items() if v is not None} + return tool + + +def _infer_property_shape(value: Any) -> Dict[str, Any]: + if isinstance(value, bool): + return {"type": "boolean"} + if isinstance(value, int) or isinstance(value, float): + return {"type": "number"} + if isinstance(value, list): + return {"type": "array", "items": {"type": "string"}} + if isinstance(value, dict): + return {"type": "object"} + return {"type": "string"} + + +def _promote_defaults( + parameters: Optional[Dict[str, Any]], + defaults: Optional[Dict[str, Any]], +) -> Dict[str, Any]: + """Auto-promote defaults-only keys to ``parameters.properties``. (R12.)""" + params: Dict[str, Any] = dict(parameters or {}) + if not defaults: + return params + + if params.get("type") != "object": + # Custom schema — leave it alone. + return params + + properties = dict(params.get("properties") or {}) + mutated = False + for key, value in defaults.items(): + if key not in properties: + properties[key] = _infer_property_shape(value) + mutated = True + + if not mutated: + return parameters or {} + + params["properties"] = properties + return params + + +def normalize_tools( + tools: Optional[List[Any]], + *, + default_function_tool_headers: Optional[Dict[str, str]] = None, + default_function_tool_defaults: Optional[Dict[str, Any]] = None, +) -> Optional[List[Dict[str, Any]]]: + """Normalize a list of tools to the wire format the API expects.""" + if tools is None: + return None + + out: List[Dict[str, Any]] = [] + for tool in tools: + d = _to_dict(tool) + if d.get("type") == "function": + out.append( + _normalize_function_tool( + d, + default_function_tool_headers, + default_function_tool_defaults, + ) + ) + else: + out.append(d) + return out + + +def _normalize_function_tool( + tool: Dict[str, Any], + default_headers: Optional[Dict[str, str]], + default_defaults: Optional[Dict[str, Any]], +) -> Dict[str, Any]: + """Apply R9 overlays + R12 defaults promotion to a single function tool dict.""" + # Per-tool values win on conflict. + merged_headers: Optional[Dict[str, str]] + if default_headers or tool.get("headers"): + merged_headers = {**(default_headers or {}), **(tool.get("headers") or {})} + else: + merged_headers = None + + merged_defaults: Optional[Dict[str, Any]] + if default_defaults or tool.get("defaults"): + merged_defaults = {**(default_defaults or {}), **(tool.get("defaults") or {})} + else: + merged_defaults = None + + parameters = _promote_defaults(tool.get("parameters"), merged_defaults) + + new_tool: Dict[str, Any] = {**tool, "parameters": parameters} + if merged_headers: + new_tool["headers"] = merged_headers + elif "headers" in new_tool and not merged_headers: + new_tool.pop("headers", None) + if merged_defaults: + new_tool["defaults"] = merged_defaults + return new_tool diff --git a/subconscious/client.py b/subconscious/client.py index 09cc1a9..b82cb6f 100644 --- a/subconscious/client.py +++ b/subconscious/client.py @@ -2,10 +2,13 @@ import json import time +import warnings +from dataclasses import asdict, is_dataclass from typing import Any, Dict, Generator, List, Optional, Union import requests +from ._normalize_tools import normalize_tools from .errors import raise_for_status from .types import ( DeltaEvent, @@ -13,83 +16,148 @@ Engine, ErrorEvent, PollOptions, + ReasoningNode, + ReasoningNodeEvent, + ResultEvent, Run, RunInput, RunOptions, - RunParams, RunResult, RunStatus, + StartedEvent, StreamEvent, - Tool, + ToolCallEvent, + ToolUse, Usage, ) +_AWAIT_COMPLETION_WARNING_SHOWN = False + + +def _warn_await_completion_deprecated() -> None: + global _AWAIT_COMPLETION_WARNING_SHOWN + if _AWAIT_COMPLETION_WARNING_SHOWN: + return + _AWAIT_COMPLETION_WARNING_SHOWN = True + warnings.warn( + "options.await_completion is deprecated. " + "Call client.run_and_wait(...) instead of " + "client.run(..., options={'await_completion': True}). " + "The legacy field will be removed in a future minor release.", + DeprecationWarning, + stacklevel=3, + ) + + +def _await_completion_requested(options: Any) -> bool: + """Read ``options.await_completion`` from a dict or RunOptions.""" + if options is None: + return False + if isinstance(options, RunOptions): + return bool(options.await_completion) + if isinstance(options, dict): + return bool(options.get("await_completion")) + return False + + +def _status_from_error_code(code: str) -> RunStatus: + if code == "canceled": + return "canceled" + if code == "timeout": + return "timed_out" + return "failed" + + def _resolve_schema(schema: Any) -> Optional[Dict[str, Any]]: """ Resolve a schema to a JSON Schema dict. - + Accepts: - - A Pydantic BaseModel class (calls model_json_schema() automatically) + - A Pydantic BaseModel class (calls model_json_schema() automatically) (R13) - A dict (passed through as-is) - None (returns None) """ if schema is None: return None - - # Check if it's a Pydantic model class + if isinstance(schema, type) and hasattr(schema, "model_json_schema"): return schema.model_json_schema() - - # Already a dict + if isinstance(schema, dict): return schema - - # Unknown type - try to use it as-is + return schema -TERMINAL_STATUSES: List[RunStatus] = ["succeeded", "failed", "canceled", "timed_out"] + +TERMINAL_STATUSES: List[RunStatus] = [ + "succeeded", + "failed", + "canceled", + "timed_out", +] class Subconscious: - """ - The main Subconscious API client. + """The main Subconscious API client. - Example: - ```python - from subconscious import Subconscious + Example:: - client = Subconscious(api_key="your-api-key") + from subconscious import Subconscious, tools + client = Subconscious(api_key="...") + + # Fire-and-forget (R18): run = client.run( - engine="tim-large", + engine="tim-claude", + input={"instructions": "Search AI news"}, + ) + + # Block until done (R18, R10): + run = client.run_and_wait( + engine="tim-claude", input={ - "instructions": "Search for the latest news about AI", - "tools": [{"type": "platform", "id": "parallel_search"}], + "instructions": "Summarize this article…", + "answerFormat": SummarySchema, # Pydantic class — auto-converted (R13) }, - options={"await_completion": True}, ) - print(run.result.answer) - ``` + + # Stream: + for event in client.stream(engine="tim-claude", input=...): + if event.type == "started": print("runId:", event.run_id) + if event.type == "delta": print(event.content, end="") + if event.type == "result": print("answer:", event.result.answer) """ def __init__( self, api_key: str, base_url: str = "https://api.subconscious.dev/v1", + *, + default_function_tool_headers: Optional[Dict[str, str]] = None, + default_function_tool_defaults: Optional[Dict[str, Any]] = None, ): """ Initialize the Subconscious client. Args: - api_key: Your Subconscious API key - base_url: API base URL (default: https://api.subconscious.dev/v1) + api_key: Your Subconscious API key. + base_url: API base URL. + default_function_tool_headers: Headers merged into every + FunctionTool dispatch. Use this for cross-cutting auth + instead of duplicating ``headers`` on every function tool. + Per-tool values win on conflict. (R9.) + default_function_tool_defaults: Hidden parameter values merged + into every FunctionTool's ``defaults``. Per-tool values + win on conflict. (R9.) """ if not api_key: raise ValueError("api_key is required") self._api_key = api_key self._base_url = base_url.rstrip("/") + self._default_function_tool_headers = default_function_tool_headers + self._default_function_tool_defaults = default_function_tool_defaults def _headers(self) -> Dict[str, str]: """Get default request headers.""" @@ -104,7 +172,6 @@ def _request( path: str, json_data: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: - """Make an HTTP request to the API.""" url = f"{self._base_url}{path}" response = requests.request( method=method, @@ -115,104 +182,109 @@ def _request( raise_for_status(response) return response.json() - def run( + # ------------------------------------------------------------------ + # Internal: build POST /v1/runs body + # ------------------------------------------------------------------ + + def _build_create_body( self, engine: Engine, input: Union[RunInput, Dict[str, Any]], - options: Optional[Union[RunOptions, Dict[str, Any]]] = None, - ) -> Run: - """ - Create a new run. - - Args: - engine: The engine to use ("tim-large" or "tim-small-preview") - input: Input configuration with instructions, tools, and optional answer/reasoning formats - options: Optional run options (await_completion, etc.) - - Returns: - The created run, with results if await_completion is True - - Example: - ```python - from pydantic import BaseModel - - class Result(BaseModel): - answer: str - confidence: float - - run = client.run( - engine="tim-large", - input={ - "instructions": "Search for AI news", - "tools": [{"type": "platform", "id": "parallel_search"}], - "answerFormat": Result, # Pass the Pydantic class directly - }, - options={"await_completion": True}, - ) - ``` - """ - # Normalize input + ) -> Dict[str, Any]: if isinstance(input, RunInput): input_dict: Dict[str, Any] = { "instructions": input.instructions, - "tools": input.tools, } + if input.tools is not None: + input_dict["tools"] = input.tools + if input.images is not None: + input_dict["images"] = input.images + if input.resources is not None: + input_dict["resources"] = input.resources + if input.skills is not None: + input_dict["skills"] = input.skills + if input.agent_id is not None: + input_dict["agentId"] = input.agent_id if input.answer_format is not None: input_dict["answerFormat"] = _resolve_schema(input.answer_format) if input.reasoning_format is not None: input_dict["reasoningFormat"] = _resolve_schema(input.reasoning_format) else: - input_dict = dict(input) # Make a copy to avoid mutating - # Resolve Pydantic models to JSON Schema + input_dict = dict(input) if "answerFormat" in input_dict: input_dict["answerFormat"] = _resolve_schema(input_dict["answerFormat"]) if "reasoningFormat" in input_dict: - input_dict["reasoningFormat"] = _resolve_schema(input_dict["reasoningFormat"]) - - # Normalize tools to dicts - tools = input_dict.get("tools", []) - normalized_tools = [] - for tool in tools: - if hasattr(tool, "__dict__"): - normalized_tools.append( - {k: v for k, v in tool.__dict__.items() if v is not None} + input_dict["reasoningFormat"] = _resolve_schema( + input_dict["reasoningFormat"] ) - else: - normalized_tools.append(tool) - input_dict["tools"] = normalized_tools - - # Make request - data = self._request( - "POST", - "/runs", - {"engine": engine, "input": input_dict}, - ) + # Camel-case agent id alias for ergonomic snake-case dict callers. + if "agent_id" in input_dict and "agentId" not in input_dict: + input_dict["agentId"] = input_dict.pop("agent_id") + + # Normalize tools (apply R9 overlays + R12 defaults promotion). + if "tools" in input_dict: + input_dict["tools"] = normalize_tools( + input_dict.get("tools"), + default_function_tool_headers=self._default_function_tool_headers, + default_function_tool_defaults=self._default_function_tool_defaults, + ) - run_id = data["runId"] + return {"engine": engine, "input": input_dict} - # Check if we should wait for completion - await_completion = False - if options: - if isinstance(options, RunOptions): - await_completion = options.await_completion - elif isinstance(options, dict): - await_completion = options.get("await_completion", False) + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ - if not await_completion: - return Run(run_id=run_id) + def run( + self, + engine: Engine, + input: Union[RunInput, Dict[str, Any]], + *, + options: Optional[Union[RunOptions, Dict[str, Any]]] = None, + poll_options: Optional[Union[PollOptions, Dict[str, Any]]] = None, + ) -> Run: + """Create a run and return its ``run_id`` immediately. Fire-and-forget. - return self.wait(run_id) + Use :py:meth:`run_and_wait` if you want to block until the run reaches + a terminal state. (R18.) - def get(self, run_id: str) -> Run: + Back-compat: passing ``options={"await_completion": True}`` (or the + ``RunOptions(await_completion=True)`` dataclass) transparently routes + through :py:meth:`run_and_wait` and emits a one-shot + :py:class:`DeprecationWarning`. New code should call + :py:meth:`run_and_wait` directly. """ - Get the current state of a run. + if _await_completion_requested(options): + _warn_await_completion_deprecated() + return self.run_and_wait(engine, input, poll_options=poll_options) + return self._create_run_only(engine, input) - Args: - run_id: The ID of the run to retrieve + def run_and_wait( + self, + engine: Engine, + input: Union[RunInput, Dict[str, Any]], + *, + poll_options: Optional[Union[PollOptions, Dict[str, Any]]] = None, + ) -> Run: + """Create a run and poll until it reaches a terminal state. (R18.)""" + # Use ``_create_run_only`` (the bare POST) instead of ``run()`` to + # avoid ping-ponging on the deprecated ``options.await_completion`` + # back-compat path. + run = self._create_run_only(engine, input) + return self.wait(run.run_id, poll_options) + + def _create_run_only( + self, + engine: Engine, + input: Union[RunInput, Dict[str, Any]], + ) -> Run: + """Internal: bare POST /runs and return the run_id only.""" + body = self._build_create_body(engine, input) + data = self._request("POST", "/runs", body) + return Run(run_id=data["runId"]) - Returns: - The run with its current status and result (if completed) - """ + def get(self, run_id: str) -> Run: + """Get the current state of a run.""" data = self._request("GET", f"/runs/{run_id}") return self._parse_run(data) @@ -221,30 +293,9 @@ def wait( run_id: str, options: Optional[Union[PollOptions, Dict[str, Any]]] = None, ) -> Run: - """ - Wait for a run to complete by polling. - - Args: - run_id: The ID of the run to wait for - options: Polling options (interval_ms, max_attempts) - - Returns: - The completed run - - Raises: - TimeoutError: If max_attempts is exceeded - - Example: - ```python - run = client.wait( - run_id, - options={"interval_ms": 2000, "max_attempts": 60}, - ) - ``` - """ - # Normalize options + """Wait for a run to complete by polling.""" interval_ms = 1000 - max_attempts = None + max_attempts: Optional[int] = None if options: if isinstance(options, PollOptions): interval_ms = options.interval_ms @@ -262,95 +313,42 @@ def wait( attempts += 1 if max_attempts is not None and attempts >= max_attempts: - raise TimeoutError(f"Polling exceeded max attempts ({max_attempts})") + raise TimeoutError( + f"Polling exceeded max attempts ({max_attempts})" + ) time.sleep(interval_ms / 1000) def cancel(self, run_id: str) -> Run: - """ - Cancel a running run. - - Args: - run_id: The ID of the run to cancel - - Returns: - The canceled run + """Cancel a run. + + **Idempotent** (R9). Callers may invoke this against a run in any + state (running, queued, already terminal) and receive the run's + current shape with a 200 response. Already-cancelled or already- + succeeded runs are returned unchanged with their existing status, + so you do not need to wrap this in ``try/except`` for the common + case. Errors are only raised for network/auth failures. """ data = self._request("POST", f"/runs/{run_id}/cancel") return self._parse_run(data) + # ------------------------------------------------------------------ + # Streaming + # ------------------------------------------------------------------ + def stream( self, engine: Engine, input: Union[RunInput, Dict[str, Any]], ) -> Generator[StreamEvent, None, Optional[Run]]: - """ - Create a streaming run that yields text deltas as they arrive. + """Create a streaming run and yield typed events. - Args: - engine: The engine to use - input: Input configuration with instructions, tools, and optional answer/reasoning formats - - Yields: - StreamEvent: Delta, done, or error events - - Returns: - The final run (if stream completes successfully) - - Example: - ```python - from pydantic import BaseModel - - class Result(BaseModel): - summary: str - - for event in client.stream( - engine="tim-large", - input={ - "instructions": "Write an essay", - "tools": [], - "answerFormat": Result, # Pass the Pydantic class directly - }, - ): - if event.type == "delta": - print(event.content, end="", flush=True) - elif event.type == "done": - print("\\nDone!") - ``` - - Note: - Rich streaming events (reasoning steps, tool calls) are coming soon. - Currently provides text deltas only. + Stream Events v2 (R8, R15) — yields ``StartedEvent`` first, + then any number of ``DeltaEvent`` / ``ReasoningNodeEvent`` / + ``ToolCallEvent`` events, exactly one of ``ResultEvent`` (success) + or ``ErrorEvent`` (failure), then ``DoneEvent`` last. """ - # Normalize input - if isinstance(input, RunInput): - input_dict: Dict[str, Any] = { - "instructions": input.instructions, - "tools": input.tools, - } - if input.answer_format is not None: - input_dict["answerFormat"] = _resolve_schema(input.answer_format) - if input.reasoning_format is not None: - input_dict["reasoningFormat"] = _resolve_schema(input.reasoning_format) - else: - input_dict = dict(input) # Make a copy to avoid mutating - # Resolve Pydantic models to JSON Schema - if "answerFormat" in input_dict: - input_dict["answerFormat"] = _resolve_schema(input_dict["answerFormat"]) - if "reasoningFormat" in input_dict: - input_dict["reasoningFormat"] = _resolve_schema(input_dict["reasoningFormat"]) - - # Normalize tools - tools = input_dict.get("tools", []) - normalized_tools = [] - for tool in tools: - if hasattr(tool, "__dict__"): - normalized_tools.append( - {k: v for k, v in tool.__dict__.items() if v is not None} - ) - else: - normalized_tools.append(tool) - input_dict["tools"] = normalized_tools + body = self._build_create_body(engine, input) url = f"{self._base_url}/runs/stream" headers = { @@ -361,85 +359,249 @@ class Result(BaseModel): response = requests.post( url, headers=headers, - json={"engine": engine, "input": input_dict}, + json=body, stream=True, ) raise_for_status(response) - run_id = response.headers.get("x-run-id", "") - is_error = False + header_run_id = response.headers.get("x-run-id", "") + # R8: emit `started` synchronously the moment we have a runId, even + # before the first server frame. + emitted_started_runid: Optional[str] = None + if header_run_id: + yield StartedEvent(run_id=header_run_id) + emitted_started_runid = header_run_id + + run_id = header_run_id + terminal_status: Optional[RunStatus] = None + terminal_result: Optional[RunResult[Any]] = None + terminal_usage: Optional[Usage] = None + + for event in self._iter_sse_events(response, run_id): + # Skip a duplicate `started` event for the same id we already synthesized. + if ( + isinstance(event, StartedEvent) + and event.run_id == emitted_started_runid + ): + continue + if isinstance(event, StartedEvent): + emitted_started_runid = event.run_id + run_id = event.run_id + elif isinstance(event, ResultEvent): + terminal_status = "succeeded" + terminal_result = event.result + terminal_usage = event.usage + elif isinstance(event, ErrorEvent): + terminal_status = _status_from_error_code(event.code) + yield event + + return ( + Run( + run_id=run_id, + status=terminal_status, + result=terminal_result, + usage=terminal_usage, + ) + if run_id + else None + ) + + def observe( + self, + run_id: str, + ) -> Generator[StreamEvent, None, Optional[Run]]: + """Re-attach to an in-flight (or finished) run and stream its events. (R16.) + + Same wire format and event taxonomy as :py:meth:`stream`. Useful + when a parent process restarts and needs to resume an existing + run. + + Example:: + + run = client.run(engine="tim-claude", input=...) + persist_to_db(run.run_id) + # … later, possibly in a different process … + for event in client.observe(run.run_id): + handle(event) + """ + url = f"{self._base_url}/runs/{run_id}/stream" + headers = { + "Authorization": f"Bearer {self._api_key}", + "Accept": "text/event-stream", + } + + response = requests.get(url, headers=headers, stream=True) + raise_for_status(response) + + terminal_status: Optional[RunStatus] = None + terminal_result: Optional[RunResult[Any]] = None + terminal_usage: Optional[Usage] = None + + for event in self._iter_sse_events(response, run_id): + if isinstance(event, StartedEvent): + run_id = event.run_id + elif isinstance(event, ResultEvent): + terminal_status = "succeeded" + terminal_result = event.result + terminal_usage = event.usage + elif isinstance(event, ErrorEvent): + terminal_status = _status_from_error_code(event.code) + yield event + + return ( + Run( + run_id=run_id, + status=terminal_status, + result=terminal_result, + usage=terminal_usage, + ) + if run_id + else None + ) - for line in response.iter_lines(decode_unicode=True): + # ------------------------------------------------------------------ + # SSE parsing + # ------------------------------------------------------------------ + + def _iter_sse_events( + self, + response: requests.Response, + run_id: str, + ) -> Generator[StreamEvent, None, None]: + """Parse a chunked SSE response into typed StreamEvent values.""" + pending_event: Optional[str] = None + + for raw_line in response.iter_lines(decode_unicode=True): + if raw_line is None: + continue + line = raw_line # already decoded if not line: + pending_event = None continue - line = line.strip() - - # Skip heartbeat comments if line.startswith(":"): + # Heartbeat / comment. continue - # Handle event type markers if line.startswith("event:"): - event_type = line[6:].strip() - is_error = event_type == "error" + pending_event = line[6:].strip() continue - # Handle data lines - if line.startswith("data:"): - data_content = line[5:].strip() - - # Stream end - if data_content == "[DONE]": - yield DoneEvent(type="done", run_id=run_id) - continue - - try: - payload = json.loads(data_content) - - # Meta event with run_id - if "run_id" in payload: - run_id = payload["run_id"] - continue - - # Error event - if is_error or "error" in payload: - yield ErrorEvent( - type="error", - run_id=run_id, - message=payload.get("details") or payload.get("error", "Unknown error"), - code=payload.get("code"), - ) - is_error = False - continue - - # OpenAI-compatible chunk with text delta - choices = payload.get("choices", []) - if choices: - content = choices[0].get("delta", {}).get("content") - if content: - yield DeltaEvent(type="delta", run_id=run_id, content=content) - - except json.JSONDecodeError: - pass - - return Run(run_id=run_id, status="succeeded") if run_id else None + if not line.startswith("data:"): + continue + data_content = line[5:].strip() + + if data_content == "[DONE]": + yield DoneEvent(run_id=run_id) + pending_event = None + continue + + try: + payload = json.loads(data_content) + except json.JSONDecodeError: + continue + + event = self._parse_sse_payload(pending_event, payload, run_id) + if event is None: + continue + + # Update tracked run_id from `started` events. + if isinstance(event, StartedEvent): + run_id = event.run_id + yield event + + def _parse_sse_payload( + self, + event_tag: Optional[str], + payload: Dict[str, Any], + run_id: str, + ) -> Optional[StreamEvent]: + if event_tag in ("started", "meta"): + # Canonical wire key is ``runId`` (camelCase). The legacy + # ``run_id`` snake_case form is accepted for one minor + # release of back-compat with older API builds. + new_id = payload.get("runId") or payload.get("run_id") or run_id + if not new_id: + return None + return StartedEvent(run_id=new_id) + + if event_tag == "reasoning_node": + node_data = payload.get("node", payload) + return ReasoningNodeEvent(run_id=run_id, node=_parse_reasoning_node(node_data)) + + if event_tag == "tool_call": + call_data = payload.get("call", payload) + return ToolCallEvent(run_id=run_id, call=_parse_tool_use(call_data)) + + if event_tag == "result": + result_data = payload.get("result", payload) + usage_data = payload.get("usage") + result = RunResult( + answer=result_data.get("answer"), + reasoning=_parse_reasoning_list(result_data.get("reasoning")), + ) + usage = ( + Usage( + input_tokens=usage_data.get("inputTokens", 0), + output_tokens=usage_data.get("outputTokens", 0), + ) + if usage_data + else None + ) + return ResultEvent(run_id=run_id, result=result, usage=usage) + + if event_tag == "error": + return ErrorEvent( + run_id=run_id, + code=payload.get("code") or "internal_error", + message=( + payload.get("message") + or payload.get("details") + or payload.get("error") + or "Unknown error" + ), + details=payload.get("details") if isinstance( + payload.get("details"), dict + ) + else None, + ) + + # No event tag — OpenAI-compat delta chunk. + choices = payload.get("choices", []) + if choices: + content = choices[0].get("delta", {}).get("content") + if content: + return DeltaEvent(run_id=run_id, content=content) + + # Bare `{"runId": "…"}` (or legacy `{"run_id": …}`) frames — synthesize started. + if "runId" in payload: + return StartedEvent(run_id=payload["runId"]) + if "run_id" in payload: + return StartedEvent(run_id=payload["run_id"]) + + return None + + # ------------------------------------------------------------------ + # JSON → dataclass parsing + # ------------------------------------------------------------------ def _parse_run(self, data: Dict[str, Any]) -> Run: - """Parse a run response from the API.""" result = None if "result" in data and data["result"]: + r = data["result"] result = RunResult( - answer=data["result"].get("answer", ""), - reasoning=data["result"].get("reasoning"), + answer=r.get("answer", ""), + reasoning=_parse_reasoning_list(r.get("reasoning")), ) usage = None if "usage" in data and data["usage"]: - usage = Usage( - models=data["usage"].get("models", []), - platform_tools=data["usage"].get("platformTools", []), - ) + u = data["usage"] + if isinstance(u, dict) and ("inputTokens" in u or "input_tokens" in u): + usage = Usage( + input_tokens=u.get("inputTokens", u.get("input_tokens", 0)), + output_tokens=u.get("outputTokens", u.get("output_tokens", 0)), + ) return Run( run_id=data.get("runId", data.get("run_id", "")), @@ -447,3 +609,41 @@ def _parse_run(self, data: Dict[str, Any]) -> Run: result=result, usage=usage, ) + + +# --------------------------------------------------------------------------- +# Module-level helpers +# --------------------------------------------------------------------------- + + +def _parse_tool_use(data: Any) -> Optional[ToolUse]: + if not isinstance(data, dict): + return None + return ToolUse( + tool_name=data.get("tool_name") or data.get("name") or "", + parameters=data.get("parameters"), + tool_result=data.get("tool_result"), + ) + + +def _parse_reasoning_node(data: Any) -> Optional[ReasoningNode]: + if not isinstance(data, dict): + return None + return ReasoningNode( + title=data.get("title", ""), + thought=data.get("thought", ""), + tooluse=_parse_tool_use(data.get("tooluse")), + subtasks=_parse_reasoning_list(data.get("subtasks")) or [], + conclusion=data.get("conclusion", ""), + ) + + +def _parse_reasoning_list(data: Any) -> Optional[List[ReasoningNode]]: + if not isinstance(data, list): + return None + out: List[ReasoningNode] = [] + for item in data: + node = _parse_reasoning_node(item) + if node is not None: + out.append(node) + return out diff --git a/subconscious/errors.py b/subconscious/errors.py index 9decac9..325f408 100644 --- a/subconscious/errors.py +++ b/subconscious/errors.py @@ -11,6 +11,7 @@ "internal_error", "service_unavailable", "timeout", + "canceled", ] diff --git a/subconscious/tools.py b/subconscious/tools.py new file mode 100644 index 0000000..152bcf8 --- /dev/null +++ b/subconscious/tools.py @@ -0,0 +1,112 @@ +"""Tool builders (R11). + +Tiny helpers that turn the discriminated-union verbosity of ``Tool`` into +a one-call API while preserving full type safety. Use these in preference +to building tool literals by hand. + +Example:: + + from subconscious import Subconscious, tools + from pydantic import BaseModel + + class EmailArgs(BaseModel): + to: str + body: str + + client = Subconscious(api_key=...) + run = client.run_and_wait( + engine="tim-claude", + input={ + "instructions": "Send a welcome email", + "tools": [ + tools.platform("parallel_search"), + tools.resource("sandbox"), + tools.function( + name="sendEmail", + url="https://api.example.com/email", + parameters=EmailArgs, # Pydantic OK (R13) + defaults={"sender_id": "svc_abc"}, # hidden from model (R12) + headers={"Authorization": "Bearer ..."}, + ), + tools.mcp( + url="https://mcp.example.com", + headers={"Authorization": "Bearer ..."}, # R7 + ), + ], + }, + ) +""" + +from typing import Any, Dict, List, Literal, Optional + +from .types import ( + FunctionTool, + MCPAuth, + MCPTool, + PlatformTool, + ResourceTool, + OutputSchema, + pydantic_to_schema, +) + + +def _coerce_parameters(parameters: Any, default_title: str) -> Dict[str, Any]: + """Accept either a Pydantic model class or a raw JSON Schema dict. (R13.)""" + if parameters is None: + return {} + if isinstance(parameters, dict): + return parameters + if isinstance(parameters, type) and hasattr(parameters, "model_json_schema"): + return dict(pydantic_to_schema(parameters, default_title)) + # Fall through — assume already JSON-Schema shaped. + return parameters # type: ignore[return-value] + + +def platform(id: str, options: Optional[Dict[str, Any]] = None) -> PlatformTool: + """Build a platform tool. ``options`` defaults to ``None`` (omitted).""" + return PlatformTool(id=id, options=options) + + +def function( + *, + name: str, + url: str, + parameters: Any, + description: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + defaults: Optional[Dict[str, Any]] = None, +) -> FunctionTool: + """Build a function tool. + + ``parameters`` accepts a Pydantic ``BaseModel`` class OR a raw JSON + Schema dict. (R13.) + """ + return FunctionTool( + name=name, + description=description, + url=url, + parameters=_coerce_parameters(parameters, name), + headers=headers, + defaults=defaults, + ) + + +def mcp( + *, + url: str, + allowed_tools: Optional[List[str]] = None, + headers: Optional[Dict[str, str]] = None, + auth: Optional[MCPAuth] = None, +) -> MCPTool: + """Build an MCP tool. Use ``headers`` for header-based auth. (R7.)""" + return MCPTool( + url=url, + allowed_tools=allowed_tools, + headers=headers, + auth=auth, + ) + + +def resource(id: Literal["sandbox", "memory", "browser"]) -> ResourceTool: + """Build a hosted runtime resource tool. (R17.)""" + return ResourceTool(id=id) diff --git a/subconscious/types.py b/subconscious/types.py index b8fad9a..3cb4d74 100644 --- a/subconscious/types.py +++ b/subconscious/types.py @@ -1,81 +1,135 @@ -"""Type definitions for the Subconscious SDK.""" +"""Type definitions for the Subconscious SDK (1.0).""" -from typing import Any, Dict, List, Literal, Optional, Union from dataclasses import dataclass, field - -# Engine types -Engine = Literal["tim-small-preview", "tim-large", "timini"] - - -# JSON Schema types for structured output +from typing import Any, Dict, Generic, List, Literal, Optional, TypeVar, Union + +# --------------------------------------------------------------------------- +# Engine identity +# --------------------------------------------------------------------------- +# +# Sourced from `packages/common/engines.ts`. The Literal lists the live +# canonical names plus legacy aliases the server still accepts and resolves +# at ingest. New code should prefer one of the live names below. +Engine = Literal[ + "tim", + "tim-edge", + "tim-claude", + "tim-claude-heavy", + "tim-omni", + "tim-omni-mini", + # Legacy aliases — accepted by the server, resolved to a live engine. + "tim-large", + "tim-small", + "tim-small-preview", + "tim-gpt", + "tim-gpt-heavy", + "timini", +] + + +# --------------------------------------------------------------------------- +# JSON Schema for structured output +# --------------------------------------------------------------------------- class OutputSchema(Dict[str, Any]): """ JSON Schema for structured output format. - + Expected structure: - $defs: Optional definitions for complex types - properties: Dict of property names to their JSON Schema definitions - required: List of required property names - title: Title for the schema - type: "object" - - Use pydantic_to_schema() to convert a Pydantic model to this format. + + Most users do not need to construct this directly; the SDK accepts a + Pydantic model in `answer_format` / `reasoning_format` and converts it + automatically. (R13.) """ + pass def pydantic_to_schema(model: type, title: Optional[str] = None) -> OutputSchema: """ Convert a Pydantic model to the JSON Schema format expected by Subconscious. - - Note: You typically don't need to call this directly. The SDK automatically - converts Pydantic models passed to answerFormat/reasoningFormat. - - Args: - model: A Pydantic BaseModel class - title: Optional title override (defaults to model class name) - - Returns: - An OutputSchema compatible with answerFormat/reasoningFormat + + Note: You typically don't need to call this directly. The SDK + automatically converts Pydantic models passed to `answer_format` / + `reasoning_format`. (R13.) """ - # Get the JSON Schema from Pydantic - schema = model.model_json_schema() - - # Create the output schema - result = OutputSchema({ - "type": "object", - "title": title or schema.get("title", model.__name__), - "properties": schema.get("properties", {}), - "required": schema.get("required", list(schema.get("properties", {}).keys())), - }) - - # Include $defs if present (for complex nested types) + schema = model.model_json_schema() # type: ignore[attr-defined] + result = OutputSchema( + { + "type": "object", + "title": title or schema.get("title", model.__name__), + "properties": schema.get("properties", {}), + "required": schema.get( + "required", list(schema.get("properties", {}).keys()) + ), + } + ) if "$defs" in schema: result["$defs"] = schema["$defs"] - return result -# Run status types -RunStatus = Literal["queued", "running", "succeeded", "failed", "canceled", "timed_out"] + +# Run lifecycle +RunStatus = Literal[ + "queued", "running", "succeeded", "failed", "canceled", "timed_out" +] + + +# --------------------------------------------------------------------------- +# Reasoning shapes (R2, R3) +# --------------------------------------------------------------------------- + + +@dataclass +class ToolUse: + """One completed tool invocation captured inside a reasoning node. + + Pre-1.0 the SDK typed this as ``List[Any]`` forcing every consumer to + cast and ``json.loads``. The structured shape eliminates that. (R3.) + """ + + tool_name: str + parameters: Optional[Any] = None + tool_result: Optional[Any] = None @dataclass class ReasoningNode: - """A node in the reasoning tree.""" + """A node in the reasoning tree. + + Note ``subtasks`` (plural). Earlier versions shipped ``subtask`` + (singular) which differed from the engine wire format. (R2.) + """ title: str thought: str - tooluse: List[Any] = field(default_factory=list) - subtask: List["ReasoningNode"] = field(default_factory=list) + tooluse: Optional[ToolUse] = None + subtasks: List["ReasoningNode"] = field(default_factory=list) conclusion: str = "" +# --------------------------------------------------------------------------- +# Run + result +# --------------------------------------------------------------------------- + +T = TypeVar("T") + + @dataclass -class RunResult: - """The result of a completed run.""" +class RunResult(Generic[T]): + """The structured result of a completed run. + + Generic over ``T`` — the answer type. Defaults to ``Any`` for free-form + completions. When using ``answer_format`` consumers may type-hint the + answer's shape. (R10.) + """ - answer: str - reasoning: Optional[ReasoningNode] = None + answer: T + reasoning: Optional[List[ReasoningNode]] = None @dataclass @@ -98,75 +152,141 @@ class PlatformToolUsage: @dataclass class Usage: - """Usage statistics for a run.""" + """Token usage for a run.""" - models: List[ModelUsage] = field(default_factory=list) - platform_tools: List[PlatformToolUsage] = field(default_factory=list) + input_tokens: int = 0 + output_tokens: int = 0 @dataclass -class Run: +class Run(Generic[T]): """Represents an agent run.""" run_id: str status: Optional[RunStatus] = None - result: Optional[RunResult] = None + result: Optional[RunResult[T]] = None usage: Optional[Usage] = None -# Tool types +# --------------------------------------------------------------------------- +# Tool definitions +# --------------------------------------------------------------------------- + + @dataclass class PlatformTool: """A platform-hosted tool.""" id: str type: Literal["platform"] = "platform" - options: Dict[str, Any] = field(default_factory=dict) + options: Optional[Dict[str, Any]] = None @dataclass class FunctionTool: - """A custom function tool.""" + """A custom function tool the engine dispatches to via HTTP. + + Body shape: flat JSON keyed by parameter name (no ``tool_name`` envelope). + + ``defaults`` are hidden parameter values merged into the dispatched + body server-side; the model never sees them. The SDK auto-promotes + keys-only-in-defaults into ``parameters`` at normalization time so the + engine has a complete schema. (R12.) + """ name: str type: Literal["function"] = "function" description: Optional[str] = None parameters: Dict[str, Any] = field(default_factory=dict) url: Optional[str] = None - method: Optional[str] = None - timeout: Optional[int] = None + headers: Optional[Dict[str, str]] = None + defaults: Optional[Dict[str, Any]] = None + + +@dataclass +class MCPAuth: + """Bearer / API-key auth for an MCP tool.""" + + type: Literal["bearer", "api_key"] + token: str + header: Optional[str] = None @dataclass class MCPTool: - """An MCP (Model Context Protocol) tool.""" + """An MCP (Model Context Protocol) tool. + + Use ``headers`` for arbitrary header-based auth (R7). Use ``auth`` for + the structured Bearer / API-key shape. + """ url: str type: Literal["mcp"] = "mcp" - allow: Optional[List[str]] = None + allowed_tools: Optional[List[str]] = None + headers: Optional[Dict[str, str]] = None + auth: Optional[MCPAuth] = None + + +@dataclass +class ResourceTool: + """A hosted runtime resource — sandbox, memory, or browser. (R17.)""" + + id: Literal["sandbox", "memory", "browser"] + type: Literal["resource"] = "resource" # Tool union type -Tool = Union[PlatformTool, FunctionTool, MCPTool, Dict[str, Any]] +Tool = Union[PlatformTool, FunctionTool, MCPTool, ResourceTool, Dict[str, Any]] + + +# --------------------------------------------------------------------------- +# Run input +# --------------------------------------------------------------------------- @dataclass class RunInput: - """Input configuration for a run.""" + """Input configuration for a run. + + Mirrors the API shape exactly. (R1.) + """ instructions: str tools: List[Tool] = field(default_factory=list) - answer_format: Optional[OutputSchema] = None - """JSON Schema for the answer output format. Use pydantic_to_schema() to generate from Pydantic.""" - reasoning_format: Optional[OutputSchema] = None - """JSON Schema for the reasoning output format. Use pydantic_to_schema() to generate from Pydantic.""" + images: Optional[List[str]] = None + """Inline image inputs — public URLs or base64 data URIs. (R1.)""" + resources: Optional[List[str]] = None + """**Deprecated** — pass ``ResourceTool`` blocks inside ``tools`` instead. (R17.)""" + skills: Optional[List[str]] = None + """Skill IDs the server resolves into a manifest. (R1.)""" + agent_id: Optional[str] = None + """Optional agent identifier — associates the run with an agent's config + memory. (R1.)""" + answer_format: Optional[Any] = None + """JSON Schema or Pydantic model for the answer output format. (R13.)""" + reasoning_format: Optional[Any] = None + """JSON Schema or Pydantic model for the reasoning output format. (R13.)""" @dataclass class RunOptions: - """Options for creating a run.""" + """**Deprecated.** Pre-split ``client.run()`` accepted + ``options.await_completion`` to choose between fire-and-forget and + polling-until-complete. The same thing is now expressed by calling + :py:meth:`Subconscious.run` (fire-and-forget) or + :py:meth:`Subconscious.run_and_wait` (polls until terminal). + + Existing call sites continue to work — passing + ``options=RunOptions(await_completion=True)`` (or the dict form + ``options={"await_completion": True}``) to ``client.run()`` + transparently routes through ``run_and_wait()`` and emits a one-shot + :py:class:`DeprecationWarning`. + + .. deprecated:: + Use :py:meth:`Subconscious.run_and_wait` instead. This shape will + be removed in a future minor release. + """ - await_completion: bool = False + await_completion: Optional[bool] = None @dataclass @@ -176,6 +296,9 @@ class RunParams: engine: Engine input: RunInput options: Optional[RunOptions] = None + """**Deprecated.** Use :py:meth:`Subconscious.run_and_wait` instead of + ``options.await_completion``. Kept for back-compat with pre-split callers. + """ @dataclass @@ -186,7 +309,20 @@ class PollOptions: max_attempts: Optional[int] = None -# Stream event types +# --------------------------------------------------------------------------- +# Stream events (Stream Events v2) +# --------------------------------------------------------------------------- + + +@dataclass +class StartedEvent: + """Emitted once, immediately after stream open. Carries the runId + synchronously so consumers can register cancellation handlers. (R8.)""" + + type: Literal["started"] = "started" + run_id: str = "" + + @dataclass class DeltaEvent: """Text delta event - emitted as text is generated.""" @@ -196,9 +332,38 @@ class DeltaEvent: content: str = "" +@dataclass +class ReasoningNodeEvent: + """One completed reasoning node. (R15.)""" + + type: Literal["reasoning_node"] = "reasoning_node" + run_id: str = "" + node: Optional[ReasoningNode] = None + + +@dataclass +class ToolCallEvent: + """One completed tool invocation. (R15.)""" + + type: Literal["tool_call"] = "tool_call" + run_id: str = "" + call: Optional[ToolUse] = None + + +@dataclass +class ResultEvent(Generic[T]): + """The final structured run envelope, emitted exactly once on success + immediately before ``DoneEvent``. (R15.)""" + + type: Literal["result"] = "result" + run_id: str = "" + result: Optional[RunResult[T]] = None + usage: Optional[Usage] = None + + @dataclass class DoneEvent: - """Stream completed successfully.""" + """Stream completed. Always the last event.""" type: Literal["done"] = "done" run_id: str = "" @@ -206,13 +371,21 @@ class DoneEvent: @dataclass class ErrorEvent: - """Stream encountered an error.""" + """Stream encountered an error. The ``code`` field is required (R5).""" type: Literal["error"] = "error" run_id: str = "" + code: str = "internal_error" message: str = "" - code: Optional[str] = None - - -StreamEvent = Union[DeltaEvent, DoneEvent, ErrorEvent] - + details: Optional[Dict[str, Any]] = None + + +StreamEvent = Union[ + StartedEvent, + DeltaEvent, + ReasoningNodeEvent, + ToolCallEvent, + ResultEvent, + DoneEvent, + ErrorEvent, +] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e02abfc --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ + diff --git a/tests/test_builders.py b/tests/test_builders.py new file mode 100644 index 0000000..cd624db --- /dev/null +++ b/tests/test_builders.py @@ -0,0 +1,88 @@ +"""Tests for tool builders (R7, R11, R12, R13, R17).""" + +from subconscious import tools +from subconscious.types import ( + FunctionTool, + MCPAuth, + MCPTool, + PlatformTool, + ResourceTool, +) + +try: + from pydantic import BaseModel # type: ignore[import-not-found] + + HAS_PYDANTIC = True +except Exception: # pragma: no cover + HAS_PYDANTIC = False + + +def test_platform_minimal(): + assert tools.platform("parallel_search") == PlatformTool(id="parallel_search") + + +def test_platform_with_options(): + out = tools.platform("parallel_search", {"region": "us"}) + assert out == PlatformTool(id="parallel_search", options={"region": "us"}) + + +def test_function_with_raw_schema(): + out = tools.function( + name="lookup", + url="https://api.example.com", + parameters={ + "type": "object", + "properties": {"id": {"type": "string"}}, + "required": ["id"], + }, + ) + assert isinstance(out, FunctionTool) + assert out.parameters["properties"]["id"] == {"type": "string"} + + +def test_function_preserves_headers_and_defaults(): + out = tools.function( + name="send", + url="https://api.example.com/send", + parameters={"type": "object", "properties": {}, "required": []}, + headers={"Authorization": "Bearer xyz"}, + defaults={"sender_id": "svc_abc"}, + ) + assert out.headers == {"Authorization": "Bearer xyz"} + assert out.defaults == {"sender_id": "svc_abc"} + + +def test_mcp_with_headers_R7(): + out = tools.mcp(url="https://mcp.example.com", headers={"X-Tenant": "acme"}) + assert isinstance(out, MCPTool) + assert out.headers == {"X-Tenant": "acme"} + + +def test_mcp_with_structured_auth(): + auth = MCPAuth(type="bearer", token="xyz") + out = tools.mcp(url="https://mcp.example.com", auth=auth) + assert out.auth == auth + + +def test_resource_R17(): + for rid in ("sandbox", "memory", "browser"): + out = tools.resource(rid) # type: ignore[arg-type] + assert isinstance(out, ResourceTool) + assert out.id == rid + + +if HAS_PYDANTIC: + + def test_function_accepts_pydantic_class_R13(): + class EmailArgs(BaseModel): # type: ignore[misc] + to: str + body: str + + out = tools.function( + name="sendEmail", + url="https://api.example.com/email", + parameters=EmailArgs, + ) + params = out.parameters + assert params["type"] == "object" + assert set(params["properties"].keys()) == {"to", "body"} diff --git a/tests/test_normalize_tools.py b/tests/test_normalize_tools.py new file mode 100644 index 0000000..974149c --- /dev/null +++ b/tests/test_normalize_tools.py @@ -0,0 +1,131 @@ +"""Tests for the tool normalization layer (R9, R12).""" + +from subconscious import tools +from subconscious._normalize_tools import normalize_tools + + +def test_promote_defaults_to_properties_R12(): + input_tools = [ + tools.function( + name="sendEmail", + url="https://api.example.com", + parameters={ + "type": "object", + "properties": {"to": {"type": "string"}}, + "required": ["to"], + }, + defaults={"sender_id": "svc_abc"}, + ) + ] + out = normalize_tools(input_tools) + assert out is not None + params = out[0]["parameters"] + assert params["properties"]["sender_id"] == {"type": "string"} + assert params["properties"]["to"] == {"type": "string"} + + +def test_does_not_overwrite_explicit_property(): + input_tools = [ + tools.function( + name="sendEmail", + url="https://api.example.com", + parameters={ + "type": "object", + "properties": { + "sender_id": {"type": "string", "description": "explicit"} + }, + "required": [], + }, + defaults={"sender_id": "svc_abc"}, + ) + ] + out = normalize_tools(input_tools) + assert out is not None + assert out[0]["parameters"]["properties"]["sender_id"] == { + "type": "string", + "description": "explicit", + } + + +def test_infers_shapes_for_non_string_defaults(): + input_tools = [ + tools.function( + name="createTicket", + url="https://api.example.com", + parameters={"type": "object", "properties": {}, "required": []}, + defaults={ + "priority": 3, + "urgent": True, + "tags": ["ops"], + "extra": {"foo": 1}, + }, + ) + ] + out = normalize_tools(input_tools) + assert out is not None + props = out[0]["parameters"]["properties"] + assert props["priority"] == {"type": "number"} + assert props["urgent"] == {"type": "boolean"} + assert props["tags"] == {"type": "array", "items": {"type": "string"}} + assert props["extra"] == {"type": "object"} + + +def test_default_function_tool_headers_overlay_R9(): + input_tools = [ + tools.function( + name="send", + url="https://api.example.com", + parameters={"type": "object", "properties": {}, "required": []}, + ) + ] + out = normalize_tools( + input_tools, + default_function_tool_headers={"X-Tenant": "acme"}, + ) + assert out is not None + assert out[0]["headers"] == {"X-Tenant": "acme"} + + +def test_per_tool_headers_win_on_conflict(): + input_tools = [ + tools.function( + name="send", + url="https://api.example.com", + parameters={"type": "object", "properties": {}, "required": []}, + headers={"X-Tenant": "beta"}, + ) + ] + out = normalize_tools( + input_tools, + default_function_tool_headers={"X-Tenant": "acme", "X-Trace": "t1"}, + ) + assert out is not None + assert out[0]["headers"] == {"X-Tenant": "beta", "X-Trace": "t1"} + + +def test_default_defaults_promoted_to_properties_R9_R12(): + input_tools = [ + tools.function( + name="send", + url="https://api.example.com", + parameters={"type": "object", "properties": {}, "required": []}, + ) + ] + out = normalize_tools( + input_tools, + default_function_tool_defaults={"tenant_id": "t_xyz"}, + ) + assert out is not None + assert out[0]["defaults"] == {"tenant_id": "t_xyz"} + assert out[0]["parameters"]["properties"]["tenant_id"] == {"type": "string"} + + +def test_non_function_tools_passthrough(): + input_tools = [tools.platform("parallel_search"), tools.resource("sandbox")] + out = normalize_tools( + input_tools, + default_function_tool_headers={"X-Tenant": "acme"}, + ) + assert out is not None + assert out[0] == {"id": "parallel_search", "type": "platform"} + assert out[1] == {"id": "sandbox", "type": "resource"} diff --git a/tests/test_run_split.py b/tests/test_run_split.py new file mode 100644 index 0000000..516cd5e --- /dev/null +++ b/tests/test_run_split.py @@ -0,0 +1,248 @@ +"""Tests for the run / run_and_wait split (R18) and client overlay options (R9, R13).""" + +import json +import warnings +from unittest.mock import patch + +from subconscious import Subconscious, tools +from subconscious.types import RunInput, RunOptions + +try: + from pydantic import BaseModel # type: ignore[import-not-found] + + HAS_PYDANTIC = True +except Exception: # pragma: no cover + HAS_PYDANTIC = False + + +class _FakeJSONResponse: + def __init__(self, body, status_code=200): + self._body = body + self.status_code = status_code + self.text = json.dumps(body) + self.headers = {"Content-Type": "application/json"} + + def json(self): + return self._body + + +def test_run_returns_immediately_R18(): + captured = {} + + def fake_request(method, url, headers=None, json=None): + captured["method"] = method + captured["url"] = url + captured["body"] = json + return _FakeJSONResponse({"runId": "run_abc"}) + + with patch("subconscious.client.requests.request", side_effect=fake_request): + client = Subconscious(api_key="k") + run = client.run(engine="tim-claude", input={"instructions": "hi"}) + + assert run.run_id == "run_abc" + assert run.status is None + assert run.result is None + assert captured["method"] == "POST" + assert captured["url"].endswith("/runs") + + +def test_run_and_wait_polls_until_terminal_R18(): + poll_count = {"n": 0} + + def fake_request(method, url, headers=None, json=None): + if method == "POST": + return _FakeJSONResponse({"runId": "run_xyz"}) + # GET poll + poll_count["n"] += 1 + if poll_count["n"] < 2: + return _FakeJSONResponse({"runId": "run_xyz", "status": "running"}) + return _FakeJSONResponse( + { + "runId": "run_xyz", + "status": "succeeded", + "result": {"answer": "done", "reasoning": None}, + } + ) + + with patch("subconscious.client.requests.request", side_effect=fake_request): + client = Subconscious(api_key="k") + run = client.run_and_wait( + engine="tim-claude", + input={"instructions": "hi"}, + poll_options={"interval_ms": 1}, + ) + + assert run.status == "succeeded" + assert run.result is not None + assert run.result.answer == "done" + assert poll_count["n"] >= 2 + + +def test_run_options_await_completion_routes_through_run_and_wait(): + """Back-compat: passing options.await_completion=True transparently + routes through run_and_wait and emits a one-shot DeprecationWarning.""" + poll_count = {"n": 0} + + def fake_request(method, url, headers=None, json=None): + if method == "POST": + return _FakeJSONResponse({"runId": "run_legacy"}) + poll_count["n"] += 1 + return _FakeJSONResponse( + { + "runId": "run_legacy", + "status": "succeeded", + "result": {"answer": "ok", "reasoning": None}, + } + ) + + # Reset the one-shot guard so the test sees the warning. + import subconscious.client as _client_mod + + _client_mod._AWAIT_COMPLETION_WARNING_SHOWN = False + + with patch("subconscious.client.requests.request", side_effect=fake_request): + client = Subconscious(api_key="k") + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter("always") + run = client.run( + engine="tim-claude", + input={"instructions": "hi"}, + options={"await_completion": True}, + poll_options={"interval_ms": 1}, + ) + + assert run.status == "succeeded" + assert run.result is not None + assert run.result.answer == "ok" + assert poll_count["n"] >= 1 + deprecation = [w for w in caught if issubclass(w.category, DeprecationWarning)] + assert any("await_completion" in str(w.message) for w in deprecation) + + +def test_run_options_dataclass_form_also_works(): + """The dataclass `RunOptions(await_completion=True)` form is equivalent + to the dict form for back-compat.""" + + def fake_request(method, url, headers=None, json=None): + if method == "POST": + return _FakeJSONResponse({"runId": "r"}) + return _FakeJSONResponse( + {"runId": "r", "status": "succeeded", "result": {"answer": "", "reasoning": None}} + ) + + import subconscious.client as _client_mod + + _client_mod._AWAIT_COMPLETION_WARNING_SHOWN = False + + with patch("subconscious.client.requests.request", side_effect=fake_request): + client = Subconscious(api_key="k") + with warnings.catch_warnings(): + warnings.simplefilter("ignore", DeprecationWarning) + run = client.run( + engine="tim-claude", + input={"instructions": "hi"}, + options=RunOptions(await_completion=True), + poll_options={"interval_ms": 1}, + ) + + assert run.status == "succeeded" + + +def test_run_without_await_completion_still_fire_and_forgets(): + """`run()` without options behaves the same as the new explicit + fire-and-forget mode.""" + polled = {"hit": False} + + def fake_request(method, url, headers=None, json=None): + if method == "POST": + return _FakeJSONResponse({"runId": "r"}) + polled["hit"] = True + return _FakeJSONResponse({"runId": "r", "status": "succeeded"}) + + with patch("subconscious.client.requests.request", side_effect=fake_request): + client = Subconscious(api_key="k") + run = client.run(engine="tim-claude", input={"instructions": "hi"}, options={}) + + assert run.run_id == "r" + assert run.status is None + assert polled["hit"] is False + + +def test_default_function_tool_headers_overlay_R9(): + captured = {} + + def fake_request(method, url, headers=None, json=None): + captured["body"] = json + return _FakeJSONResponse({"runId": "r"}) + + with patch("subconscious.client.requests.request", side_effect=fake_request): + client = Subconscious( + api_key="k", + default_function_tool_headers={"Authorization": "Bearer xyz"}, + ) + client.run( + engine="tim-claude", + input={ + "instructions": "hi", + "tools": [ + tools.function( + name="send", + url="https://api.example.com", + parameters={"type": "object", "properties": {}, "required": []}, + ) + ], + }, + ) + + sent_tools = captured["body"]["input"]["tools"] + assert sent_tools[0]["headers"] == {"Authorization": "Bearer xyz"} + + +def test_runinput_dataclass_input_supports_resources_skills_agent_id_R1(): + captured = {} + + def fake_request(method, url, headers=None, json=None): + captured["body"] = json + return _FakeJSONResponse({"runId": "r"}) + + with patch("subconscious.client.requests.request", side_effect=fake_request): + client = Subconscious(api_key="k") + client.run( + engine="tim-claude", + input=RunInput( + instructions="hi", + images=["data:image/png;base64,iVBORw0K"], + skills=["skill_42"], + agent_id="agent_x", + ), + ) + + body_input = captured["body"]["input"] + assert body_input["images"] == ["data:image/png;base64,iVBORw0K"] + assert body_input["skills"] == ["skill_42"] + assert body_input["agentId"] == "agent_x" + + +if HAS_PYDANTIC: + + def test_run_accepts_pydantic_answer_format_R13(): + captured = {} + + def fake_request(method, url, headers=None, json=None): + captured["body"] = json + return _FakeJSONResponse({"runId": "r"}) + + class Result(BaseModel): # type: ignore[misc] + summary: str + score: float + + with patch("subconscious.client.requests.request", side_effect=fake_request): + client = Subconscious(api_key="k") + client.run( + engine="tim-claude", + input={"instructions": "hi", "answerFormat": Result}, + ) + + af = captured["body"]["input"]["answerFormat"] + assert af["type"] == "object" + assert set(af["properties"].keys()) == {"summary", "score"} diff --git a/tests/test_stream_events.py b/tests/test_stream_events.py new file mode 100644 index 0000000..1694ed1 --- /dev/null +++ b/tests/test_stream_events.py @@ -0,0 +1,240 @@ +"""Tests for Stream Events v2 parsing in the Python SDK (R5, R8, R15, R16).""" + +from typing import Generator, List, Optional, Tuple +from unittest.mock import patch + +from subconscious import Run, Subconscious, StreamEvent +from subconscious.types import ( + DeltaEvent, + DoneEvent, + ErrorEvent, + ResultEvent, + StartedEvent, + ToolCallEvent, +) + + +class _FakeResponse: + """Minimal stand-in for ``requests.Response`` used in stream tests.""" + + def __init__(self, lines, status_code=200, headers=None): + self._lines = lines + self.status_code = status_code + self.headers = headers or {} + self.text = "" + + def iter_lines(self, decode_unicode=True): # noqa: D401 + for line in self._lines: + yield line + + def json(self): + return {} + + +def _frames_to_lines(frames: List[str]) -> List[str]: + """Split SSE frames into the per-line iteration ``requests`` would produce.""" + out: List[str] = [] + for f in frames: + for line in f.split("\n"): + out.append(line) + return out + + +def _consume_with_return( + stream: Generator[StreamEvent, None, Optional[Run]] +) -> Tuple[List[StreamEvent], Optional[Run]]: + events: List[StreamEvent] = [] + while True: + try: + events.append(next(stream)) + except StopIteration as exc: + return events, exc.value + + +def test_stream_emits_started_first_R8(): + """Canonical wire shape uses camelCase ``runId`` (matches REST responses).""" + frames = [ + "event: meta\ndata: {\"runId\":\"run_abc\"}\n\n", + "data: {\"choices\":[{\"delta\":{\"content\":\"hi\"}}]}\n\n", + "event: result\ndata: {\"result\":{\"answer\":\"hi\",\"reasoning\":null}}\n\n", + "data: [DONE]\n\n", + ] + fake = _FakeResponse(_frames_to_lines(frames), headers={"x-run-id": "run_abc"}) + + with patch("subconscious.client.requests.post", return_value=fake): + client = Subconscious(api_key="k") + events = list(client.stream(engine="tim-claude", input={"instructions": "hi"})) + + assert isinstance(events[0], StartedEvent) + assert events[0].run_id == "run_abc" + assert isinstance(events[-1], DoneEvent) + types = [type(e) for e in events] + assert DeltaEvent in types + assert ResultEvent in types + + +def test_stream_back_compat_legacy_run_id_snake_case(): + """Older API builds emitted snake_case ``run_id``. SDKs MUST keep + accepting the legacy shape for at least one minor release.""" + frames = [ + "event: started\ndata: {\"run_id\":\"r_legacy\"}\n\n", + "event: result\ndata: {\"result\":{\"answer\":\"ok\",\"reasoning\":null}}\n\n", + "data: [DONE]\n\n", + ] + fake = _FakeResponse(_frames_to_lines(frames), headers={"x-run-id": "r_legacy"}) + with patch("subconscious.client.requests.post", return_value=fake): + client = Subconscious(api_key="k") + events = list(client.stream(engine="tim-claude", input={"instructions": "hi"})) + + assert isinstance(events[0], StartedEvent) + assert events[0].run_id == "r_legacy" + + +def test_stream_parses_canceled_error_code_one_l(): + """Canonical spelling is ``canceled`` (one ``l``) — matches RunStatus.""" + frames = [ + "event: started\ndata: {\"runId\":\"r1\"}\n\n", + "event: error\ndata: {\"code\":\"canceled\"," + "\"message\":\"The run was canceled\"}\n\n", + "data: [DONE]\n\n", + ] + fake = _FakeResponse(_frames_to_lines(frames), headers={"x-run-id": "r1"}) + with patch("subconscious.client.requests.post", return_value=fake): + client = Subconscious(api_key="k") + events = list(client.stream(engine="tim-claude", input={"instructions": "hi"})) + + errors = [e for e in events if isinstance(e, ErrorEvent)] + assert len(errors) == 1 + assert errors[0].code == "canceled" + + +def test_stream_parses_result_with_usage_R15(): + frames = [ + "event: started\ndata: {\"run_id\":\"r1\"}\n\n", + "event: result\ndata: {\"result\":{\"answer\":\"42\",\"reasoning\":null}," + "\"usage\":{\"inputTokens\":1,\"outputTokens\":2}}\n\n", + "data: [DONE]\n\n", + ] + fake = _FakeResponse(_frames_to_lines(frames), headers={"x-run-id": "r1"}) + + with patch("subconscious.client.requests.post", return_value=fake): + client = Subconscious(api_key="k") + events = list(client.stream(engine="tim-claude", input={"instructions": "hi"})) + + result_events = [e for e in events if isinstance(e, ResultEvent)] + assert len(result_events) == 1 + re = result_events[0] + assert re.result is not None + assert re.result.answer == "42" + assert re.usage is not None + assert re.usage.input_tokens == 1 + assert re.usage.output_tokens == 2 + + +def test_stream_parses_tool_call_R15(): + frames = [ + "event: started\ndata: {\"run_id\":\"r1\"}\n\n", + "event: tool_call\ndata: {\"call\":{\"tool_name\":\"web_search\"," + "\"parameters\":{\"q\":\"x\"},\"tool_result\":{\"docs\":[]}}}\n\n", + "event: result\ndata: {\"result\":{\"answer\":\"done\",\"reasoning\":null}}\n\n", + "data: [DONE]\n\n", + ] + fake = _FakeResponse(_frames_to_lines(frames), headers={"x-run-id": "r1"}) + + with patch("subconscious.client.requests.post", return_value=fake): + client = Subconscious(api_key="k") + events = list(client.stream(engine="tim-claude", input={"instructions": "hi"})) + + tool_calls = [e for e in events if isinstance(e, ToolCallEvent)] + assert len(tool_calls) == 1 + assert tool_calls[0].call is not None + assert tool_calls[0].call.tool_name == "web_search" + + +def test_stream_parses_error_with_required_code_R5(): + frames = [ + "event: started\ndata: {\"run_id\":\"r1\"}\n\n", + "event: error\ndata: {\"code\":\"rate_limited\",\"message\":\"slow down\"," + "\"details\":{\"retryAfterMs\":1000}}\n\n", + "data: [DONE]\n\n", + ] + fake = _FakeResponse(_frames_to_lines(frames), headers={"x-run-id": "r1"}) + + with patch("subconscious.client.requests.post", return_value=fake): + client = Subconscious(api_key="k") + events = list(client.stream(engine="tim-claude", input={"instructions": "hi"})) + + errors = [e for e in events if isinstance(e, ErrorEvent)] + assert len(errors) == 1 + err = errors[0] + assert err.code == "rate_limited" + assert err.message == "slow down" + assert err.details == {"retryAfterMs": 1000} + + +def test_stream_returns_failed_run_after_error_event(): + frames = [ + "event: started\ndata: {\"runId\":\"r_failed\"}\n\n", + "event: error\ndata: {\"code\":\"rate_limited\",\"message\":\"slow down\"}\n\n", + "data: [DONE]\n\n", + ] + fake = _FakeResponse(_frames_to_lines(frames), headers={"x-run-id": "r_failed"}) + + with patch("subconscious.client.requests.post", return_value=fake): + client = Subconscious(api_key="k") + _, returned = _consume_with_return( + client.stream(engine="tim-claude", input={"instructions": "hi"}) + ) + + assert returned is not None + assert returned.run_id == "r_failed" + assert returned.status == "failed" + + +def test_stream_returns_succeeded_run_with_result_and_usage(): + frames = [ + "event: started\ndata: {\"runId\":\"r_ok\"}\n\n", + "event: result\ndata: {\"result\":{\"answer\":\"42\",\"reasoning\":null}," + "\"usage\":{\"inputTokens\":1,\"outputTokens\":2}}\n\n", + "data: [DONE]\n\n", + ] + fake = _FakeResponse(_frames_to_lines(frames), headers={"x-run-id": "r_ok"}) + + with patch("subconscious.client.requests.post", return_value=fake): + client = Subconscious(api_key="k") + _, returned = _consume_with_return( + client.stream(engine="tim-claude", input={"instructions": "hi"}) + ) + + assert returned is not None + assert returned.run_id == "r_ok" + assert returned.status == "succeeded" + assert returned.result is not None + assert returned.result.answer == "42" + assert returned.usage is not None + assert returned.usage.input_tokens == 1 + assert returned.usage.output_tokens == 2 + + +def test_observe_reads_run_stream_endpoint_R16(): + frames = [ + "event: started\ndata: {\"run_id\":\"r_obs\"}\n\n", + "data: {\"choices\":[{\"delta\":{\"content\":\"replay\"}}]}\n\n", + "event: result\ndata: {\"result\":{\"answer\":\"replay\",\"reasoning\":null}}\n\n", + "data: [DONE]\n\n", + ] + fake = _FakeResponse(_frames_to_lines(frames)) + + with patch("subconscious.client.requests.get", return_value=fake) as mocked_get: + client = Subconscious(api_key="k") + events = list(client.observe("r_obs")) + + # Endpoint: GET /v1/runs/r_obs/stream + call_url = mocked_get.call_args.args[0] + assert call_url.endswith("/runs/r_obs/stream") + + types = [type(e) for e in events] + assert StartedEvent in types + assert DeltaEvent in types + delta = next(e for e in events if isinstance(e, DeltaEvent)) + assert delta.content == "replay"