Get a server and client running locally in under 5 minutes.
Prerequisites:

- Python 3.11+
Install the SDK:

```bash
# Edge server (no Ray dependency)
pip install inferential

# Or with Ray Serve for distributed serving
pip install inferential[ray]
```

For the edge server (LocalDispatcher, no Ray), save the following as `server.py`:
```python
import asyncio

import numpy as np

from inferential import Server, LocalDispatcher


def mock_policy(obs: dict) -> dict:
    # Stand-in for a real model: infer the action dimension from the first
    # 1-D observation array and return random actions of that size.
    dim = 7
    for v in obs.values():
        if isinstance(v, np.ndarray) and v.ndim == 1:
            dim = v.shape[0]
            break
    return {"actions": np.random.randn(dim).astype(np.float32)}


# Map model names to callables; LocalDispatcher runs them in-process.
dispatcher = LocalDispatcher({"policy-v2": mock_policy})
server = Server(bind="tcp://*:5555", dispatcher=dispatcher)


@server.on_metric
def log(name, value, labels):
    if name == "inference_latency_ms":
        client = labels.get("client", "?")
        print(f" [{client}] {value:.1f}ms")


asyncio.run(server.run())
```

Alternatively, if you installed the Ray Serve extra, start a local Ray cluster first:

```bash
ray start --head
```

and save this as `server.py` instead:
```python
import asyncio

import numpy as np
from ray import serve

from inferential import Server


@serve.deployment
class MockPolicy:
    def infer(self, obs: dict) -> dict:
        # Stand-in for a real model: infer the action dimension from the
        # first 1-D observation array and return random actions of that size.
        dim = 7
        for v in obs.values():
            if isinstance(v, np.ndarray) and v.ndim == 1:
                dim = v.shape[0]
                break
        return {"actions": np.random.randn(dim).astype(np.float32)}


# Deploy the policy on Ray Serve; the server routes to it by model name.
serve.run(MockPolicy.bind(), name="policy-v2")
server = Server(bind="tcp://*:5555", models=["policy-v2"])


@server.on_metric
def log(name, value, labels):
    if name == "inference_latency_ms":
        client = labels.get("client", "?")
        print(f" [{client}] {value:.1f}ms")


asyncio.run(server.run())
```

Run it:
```bash
python server.py
```

In a second terminal, save this as `client.py`:
```python
import time

import numpy as np

from inferential import Connection

conn = Connection(server="tcp://localhost:5555", client_id="demo-01", client_type="sim")

# priority=0 is highest; lower-priority clients yield GPU slots under contention
model = conn.model("policy-v2", latency_budget_ms=50.0, priority=0)

for step in range(10):
    state = np.random.randn(7).astype(np.float32)
    model.observe(urgency=0.5, state=state)
    result = model.get_result(timeout_ms=100)
    if result is not None:
        actions = result["actions"]
        latency = result["inference_latency_ms"]
        print(f"step {step}: actions={actions[:3]}... latency={latency:.1f}ms")
    time.sleep(0.05)

conn.close()
```

Run it:
```bash
python client.py
```

Expected output:
```
step 0: actions=[ 0.42 -1.07 0.83]... latency=11.2ms
step 1: actions=[-0.31 0.55 1.22]... latency=1.4ms
step 2: actions=[ 0.91 -0.18 0.03]... latency=1.2ms
...
```
The first request may be slower (cold start). Subsequent requests settle around 1-2ms for this mock model.
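If you want your own measurements to exclude the cold start, one option is a throwaway warm-up request before the timed loop (a sketch reusing the `model` handle from `client.py` above):

```python
# Warm-up: send one request and discard the result so later steps measure
# steady-state latency rather than first-request setup cost.
model.observe(urgency=0.5, state=np.zeros(7, dtype=np.float32))
model.get_result(timeout_ms=500)
```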
On the server terminal you'll see metric callbacks firing:
```
 [demo-01] 11.2ms
 [demo-01] 1.4ms
 [demo-01] 1.2ms
```
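The `priority` argument becomes relevant once several clients share the server. As a sketch (same `Connection` API as above; the client id here is illustrative), a background consumer can use a lower priority and a looser latency budget so the control loop above wins GPU slots under contention:

```python
import numpy as np

from inferential import Connection

# Illustrative second client: a larger priority number means lower priority,
# so under load the priority=0 sim client above is served first.
bg_conn = Connection(server="tcp://localhost:5555", client_id="logger-01", client_type="sim")
bg_model = bg_conn.model("policy-v2", latency_budget_ms=200.0, priority=2)

bg_model.observe(urgency=0.1, state=np.random.randn(7).astype(np.float32))
result = bg_model.get_result(timeout_ms=500)  # may be None if the deadline is missed
bg_conn.close()
```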
The SDK also provides `AsyncConnection` for asyncio-based control loops. Save as `async_client.py`:
```python
import asyncio

import numpy as np

from inferential import AsyncConnection


async def main():
    async with AsyncConnection(
        server="tcp://localhost:5555", client_id="async-01", client_type="sim"
    ) as conn:
        model = conn.model("policy-v2", latency_budget_ms=50.0, priority=0)
        for step in range(10):
            state = np.random.randn(7).astype(np.float32)
            await model.observe(urgency=0.5, state=state)
            result = await model.get_result(timeout_ms=100)
            if result is not None:
                actions = result["actions"]
                latency = result["inference_latency_ms"]
                print(f"step {step}: actions={actions[:3]}... latency={latency:.1f}ms")
            await asyncio.sleep(0.05)


asyncio.run(main())
```

Run it:
```bash
python async_client.py
```

The API mirrors the sync client: `observe()` and `get_result()` are awaited instead of blocking. `AsyncConnection` supports `async with` for automatic cleanup.
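Because the calls are coroutines, several control loops can share one event loop. A minimal sketch (same `AsyncConnection` calls as above; the second client id is illustrative) runs two clients concurrently with `asyncio.gather`:

```python
import asyncio

import numpy as np

from inferential import AsyncConnection


async def control_loop(client_id: str, steps: int = 10):
    # One connection per loop; both loops interleave on a single event loop.
    async with AsyncConnection(
        server="tcp://localhost:5555", client_id=client_id, client_type="sim"
    ) as conn:
        model = conn.model("policy-v2", latency_budget_ms=50.0, priority=0)
        for _ in range(steps):
            await model.observe(urgency=0.5, state=np.random.randn(7).astype(np.float32))
            await model.get_result(timeout_ms=100)
            await asyncio.sleep(0.05)


async def main():
    # Awaiting both loops together overlaps their round-trips to the server.
    await asyncio.gather(control_loop("async-01"), control_loop("async-02"))


asyncio.run(main())
```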
When you're done:

```bash
# Stop the server with Ctrl+C
ray stop   # only if using Ray Serve
```

For production use with multiple models (e.g. a manipulation policy and a telemetry stream), use the `model_deadline` scheduling strategy plus per-model concurrency limits:
```python
from inferential import Server
from inferential.config.schema import InferentialConfig, ModelConfig, ModelsConfig

config = InferentialConfig(
    models=ModelsConfig(
        known={
            "manipulation-policy": ModelConfig(max_inflight=4),
            "telemetry": ModelConfig(max_inflight=1),
        },
    ),
)
config.transport.bind = "tcp://*:5555"
config.scheduling.strategy = "model_deadline"
config.scheduling.pipeline_dispatch.enabled = True

server = Server(config=config, models=["manipulation-policy", "telemetry"])
```

`max_inflight` controls per-model dispatch concurrency. With Ray Serve, match it to `num_replicas`. With `LocalDispatcher`, set it to 1 for sequential GPU inference, or higher if your model supports concurrent calls.
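For example, if the manipulation policy runs as a Ray Serve deployment with four replicas, setting `max_inflight=4` lets the dispatcher keep all replicas busy. A sketch under that assumption (the deployment class and its body are placeholders, not part of the SDK):

```python
from ray import serve

from inferential import Server
from inferential.config.schema import InferentialConfig, ModelConfig, ModelsConfig


@serve.deployment(num_replicas=4)  # four replicas serving "manipulation-policy"
class ManipulationPolicy:
    def infer(self, obs: dict) -> dict:
        ...  # real model inference goes here


serve.run(ManipulationPolicy.bind(), name="manipulation-policy")

# Match per-model dispatch concurrency to the replica count so the dispatcher
# can keep every replica busy without over-queuing any single one.
config = InferentialConfig(
    models=ModelsConfig(known={"manipulation-policy": ModelConfig(max_inflight=4)}),
)
server = Server(config=config, models=["manipulation-policy"])
```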
Next steps:

- Architecture — system design, wire protocol, schedulers, configuration reference
- Examples — multi-language client demos, server extensions, custom schedulers
- C++ SDK · Rust SDK — other language clients