Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
41412df
mem2 recording fixes
ruthwikdasyam May 7, 2026
3d6adae
feat: hosted teleop module
ruthwikdasyam May 7, 2026
33c2571
misc: hosted tests
ruthwikdasyam May 7, 2026
fb43817
misc: no-cloudflare broker
ruthwikdasyam May 7, 2026
7b59671
fix: auth updates
ruthwikdasyam May 8, 2026
4e31bae
updated html + web address
ruthwikdasyam May 8, 2026
ed2d275
working commit
ruthwikdasyam May 9, 2026
b6914e9
feat/workingcommit/keyboard-teleop
ruthwikdasyam May 9, 2026
027dae0
feat/TwistStamped + recording msgs
ruthwikdasyam May 10, 2026
838a2c0
Merge branch 'main' into ruthwik/hosted-teleop
ruthwikdasyam May 18, 2026
a560a77
[autofix.ci] apply automated fixes
autofix-ci[bot] May 18, 2026
0ed9fcd
feat: added seq to Timestamped msg
ruthwikdasyam May 19, 2026
d5ea14e
no seperate sim blueprints needed
ruthwikdasyam May 19, 2026
06516f5
feat: increase workers
ruthwikdasyam May 19, 2026
f88746f
feat: benchmarking
ruthwikdasyam May 19, 2026
d18e495
fix: pre-commit errors
ruthwikdasyam May 19, 2026
b2b11d6
fix: logging spam
ruthwikdasyam May 19, 2026
e20bceb
added seq to cal loss
ruthwikdasyam May 19, 2026
996be8d
benchmarks for twist
ruthwikdasyam May 19, 2026
1b0829d
fix: pre-commit fixes
ruthwikdasyam May 19, 2026
38e51d4
[autofix.ci] apply automated fixes
autofix-ci[bot] May 19, 2026
2043f13
fix: notebook file import
ruthwikdasyam May 19, 2026
3a4cf68
fix: pre-commit
ruthwikdasyam May 19, 2026
b5dfe71
[autofix.ci] apply automated fixes
autofix-ci[bot] May 19, 2026
ef15f4a
feat(transport): add WebRTC DataChannel pubsub via Cloudflare Realtim…
spomichter May 11, 2026
646f85e
feat(transport): add BrokerProvider and typed LCM mode for WebRTCTran…
spomichter May 12, 2026
2c263eb
chore: regenerate uv.lock + add WebRTC loopback benchmark
spomichter May 19, 2026
10de4fe
fix(uv): restore exclude-newer-span = "P7D" to uv.lock
spomichter May 19, 2026
b8f81e0
fix: address CI failures and Greptile review comments
spomichter May 20, 2026
f821dca
fix: resolve mypy errors and remove unnecessary comment
spomichter May 20, 2026
3baba45
chore: remove unused CloudflareSession alias
spomichter May 20, 2026
0f01d93
chore: remove WebRTC from benchmark suite
spomichter May 20, 2026
95401f0
chore: mark CF integration tests as tool (excluded from CI)
spomichter May 20, 2026
7b6d67c
chore: remove trivial tests, apply tool marker per-test
spomichter May 20, 2026
23423e9
feat: added pingpong for time offset cal
ruthwikdasyam May 20, 2026
e3cf7e9
feat: video stream
ruthwikdasyam May 20, 2026
0544ba2
feat: videostream
ruthwikdasyam May 21, 2026
07b7f6c
feat: state_back channel and mediatrack added
ruthwikdasyam May 22, 2026
83ff3f2
fix: pre-commit
ruthwikdasyam May 22, 2026
88d389f
refactor(pubsub): WebRTCPubSub inherits AllPubSub[str, bytes]
spomichter May 22, 2026
89c4509
misc: debug lines removed
ruthwikdasyam May 23, 2026
ee1f1f0
[autofix.ci] apply automated fixes
autofix-ci[bot] May 23, 2026
79b5867
misc: remove debug and comments
ruthwikdasyam May 25, 2026
51ff352
feat: video streams + joystick controls
ruthwikdasyam May 26, 2026
1a22b87
fix: print video stats
ruthwikdasyam May 26, 2026
86ec4e9
feat: live stats compute in mocule
ruthwikdasyam May 29, 2026
69e1141
feat: all stats computation methods from utils
ruthwikdasyam May 29, 2026
494f3fd
docs: readme for hosted module setup
ruthwikdasyam May 29, 2026
4a4bd58
fix: pre-commit fixes
ruthwikdasyam May 29, 2026
3410213
Merge branch 'main' into ruthwik/hosted-teleop
ruthwikdasyam May 29, 2026
2781368
misc: remove dev_local_broker and data_analyzer files
ruthwikdasyam Jun 1, 2026
25a151b
docs: updated folder structure + files
ruthwikdasyam Jun 1, 2026
3cf4f75
fix: remove live logs
ruthwikdasyam Jun 2, 2026
8c3d772
feat: benchmarks added with recorder module
ruthwikdasyam Jun 2, 2026
a141dae
fix: pre-commit issues
ruthwikdasyam Jun 2, 2026
ffecde5
Merge remote-tracking branch 'origin/main' into ruthwik/hosted-teleop
ruthwikdasyam Jun 2, 2026
a3644ee
fix: local teleop blueprint changes
ruthwikdasyam Jun 2, 2026
de2c3ec
fix: max threshold for teleop
ruthwikdasyam Jun 2, 2026
af32aa9
fix: test_clock_sync move
ruthwikdasyam Jun 3, 2026
3397d3f
fix: split video_track code and sdpbundling
ruthwikdasyam Jun 3, 2026
f2684f4
Merge branch 'main' into ruthwik/hosted-teleop
ruthwikdasyam Jun 6, 2026
107f81b
fix: blueprints order
ruthwikdasyam Jun 6, 2026
31c4c67
fix: mpy issues
ruthwikdasyam Jun 6, 2026
75458a4
fix: blurprints
ruthwikdasyam Jun 6, 2026
41eb367
fix: video_stats msg to utils
ruthwikdasyam Jun 6, 2026
fe65780
fix: treat 0 as valid and float32-tolerant equality in teleop stats
ruthwikdasyam Jun 6, 2026
645c595
fix: route pong over state_reliable_back and fix restart event clear
ruthwikdasyam Jun 6, 2026
8a7aaf9
fix: pytest fixes
ruthwikdasyam Jun 6, 2026
559a587
fix: global config - no rerun
ruthwikdasyam Jun 10, 2026
7582f98
fix: strip shebangfrom import onlytest modules
ruthwikdasyam Jun 10, 2026
49cc362
fix: unwanted classifye2e function
ruthwikdasyam Jun 10, 2026
7ff1b38
fix: main remove
ruthwikdasyam Jun 10, 2026
a1b8918
fix: env-resolve teleop creds
ruthwikdasyam Jun 10, 2026
e368bd6
fix: logger warnings add
ruthwikdasyam Jun 10, 2026
90fc720
refactor: removed event loop, using basemodule loop
ruthwikdasyam Jun 10, 2026
7ecb6cb
remove png generate + record dir add
ruthwikdasyam Jun 10, 2026
2793a54
default with STATE_DIR
ruthwikdasyam Jun 10, 2026
cc53394
fix: restart compounding issue
ruthwikdasyam Jun 10, 2026
01b6755
fix: readme
ruthwikdasyam Jun 10, 2026
b60511f
Merge remote-tracking branch 'origin/ruthwik/hosted-teleop' into revi…
spomichter Jun 10, 2026
a87175e
refactor(webrtc): restructure transport into impl/webrtc with picklab…
spomichter Jun 10, 2026
ee4a7b5
chore(webrtc): drop unused operator API, correct CF message size cap …
spomichter Jun 10, 2026
56f9384
docs(webrtc): mark CloudflareProvider as test/benchmark only
spomichter Jun 10, 2026
9515e04
Merge remote-tracking branch 'origin/main' into review/webrtc-transport
spomichter Jun 11, 2026
dad9d10
fix(webrtc): drop __init__.py re-exports, dedup subscribe_all, lock l…
spomichter Jun 11, 2026
5810db5
fix(merge): restore memory2/module.py and TwistStamped.py to main
spomichter Jun 11, 2026
b4cbc05
test(webrtc): operator->broker->CF->transport end-to-end harness
spomichter Jun 11, 2026
83b59f6
feat(webrtc): bidirectional BrokerProvider, deprecate HostedTeleopModule
spomichter Jun 11, 2026
c44480f
docs(webrtc): blueprint runfile instructions for hosted-teleop-as-tra…
spomichter Jun 11, 2026
9181cce
feat(webrtc): TELEOP_ROBOT_ID no longer required
spomichter Jun 11, 2026
2e97255
feat(webrtc): video through BrokerProvider — full HostedTeleopModule …
spomichter Jun 11, 2026
831e80c
refactor(webrtc): provider-agnostic WebRTCVideoTransport base
spomichter Jun 11, 2026
80b639f
refactor(webrtc): drop _wire_fingerprint — demux by try-decode
spomichter Jun 11, 2026
6a5bf4d
chore(webrtc): lazy heavy imports, trim redundant tests
spomichter Jun 11, 2026
2968751
docs(webrtc): tighten find_spec comment
spomichter Jun 11, 2026
7236c7a
chore(bench): drop unused DURATION/MAX_MESSAGES env overrides
spomichter Jun 11, 2026
d84c27a
feat(webrtc): TURN relay fallback via broker credentials (#2488)
ruthwikdasyam Jun 14, 2026
7554b7d
Add transport config to blueprints (#2499)
Dreamsorcerer Jun 19, 2026
eaef4f2
Merge branch 'main' into feat/webrtc-transport-rebase
spomichter Jun 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 88 additions & 47 deletions dimos/control/blueprints/mobile.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,29 @@ def _flowbase_twist_base(


# Mock holonomic twist base (3-DOF: vx, vy, wz)
coordinator_mock_twist_base = ControlCoordinator.blueprint(
hardware=[_mock_twist_base()],
tasks=[
TaskConfig(
name="vel_base",
type="velocity",
joint_names=_base_joints,
priority=10,
),
],
).remappings([(ControlCoordinator, "twist_command", "cmd_vel")])
coordinator_mock_twist_base = (
ControlCoordinator.blueprint(
hardware=[_mock_twist_base()],
tasks=[
TaskConfig(
name="vel_base",
type="velocity",
joint_names=_base_joints,
priority=10,
),
],
)
.transports(
{
("joint_state", JointState): LCMTransport.spec("/coordinator/joint_state", JointState),
("twist_command", Twist): LCMTransport.spec("/cmd_vel", Twist),
}
)
.remappings([(ControlCoordinator, "twist_command", "cmd_vel")])
)

# FlowBase holonomic twist base (3-DOF: vx, vy, wz) over Portal RPC
coordinator_flowbase = ControlCoordinator.blueprint(
hardware=[_flowbase_twist_base()],
tasks=[
TaskConfig(
name="vel_base",
type="velocity",
joint_names=_base_joints,
priority=10,
),
],
).remappings([(ControlCoordinator, "twist_command", "cmd_vel")])

# FlowBase + WASD pygame keyboard teleop in a single blueprint
coordinator_flowbase_keyboard_teleop = autoconnect(
coordinator_flowbase = (
ControlCoordinator.blueprint(
hardware=[_flowbase_twist_base()],
tasks=[
Expand All @@ -110,9 +106,40 @@ def _flowbase_twist_base(
priority=10,
),
],
),
KeyboardTeleop.blueprint(),
).remappings([(ControlCoordinator, "twist_command", "cmd_vel")])
)
.transports(
{
("joint_state", JointState): LCMTransport.spec("/coordinator/joint_state", JointState),
("twist_command", Twist): LCMTransport.spec("/cmd_vel", Twist),
}
)
.remappings([(ControlCoordinator, "twist_command", "cmd_vel")])
)

# FlowBase + WASD pygame keyboard teleop in a single blueprint
coordinator_flowbase_keyboard_teleop = (
autoconnect(
ControlCoordinator.blueprint(
hardware=[_flowbase_twist_base()],
tasks=[
TaskConfig(
name="vel_base",
type="velocity",
joint_names=_base_joints,
priority=10,
),
],
),
KeyboardTeleop.blueprint(),
)
.transports(
{
("twist_command", Twist): LCMTransport.spec("/cmd_vel", Twist),
("joint_state", JointState): LCMTransport.spec("/coordinator/joint_state", JointState),
}
)
.remappings([(ControlCoordinator, "twist_command", "cmd_vel")])
)

# FlowBase + Livox MID-360 + FastLio2 SLAM + nav stack with click-to-drive in Rerun. The velocity
# sink is ControlCoordinator + FlowBaseAdapter
Expand Down Expand Up @@ -170,21 +197,26 @@ def _flowbase_twist_base(
),
RerunWebSocketServer.blueprint(),
)
.transports(
{
("twist_command", Twist): LCMTransport.spec("/cmd_vel", Twist),
("joint_state", JointState): LCMTransport.spec("/coordinator/joint_state", JointState),
}
)
.remappings(
[
(FastLio2, "lidar", "registered_scan"),
# SimplePlanner / FarPlanner owns way_point — disconnect MovementManager's
# redundant pass-through copy (matches unitree-g1-nav-onboard).
(MovementManager, "way_point", "_mgr_way_point_unused"),
# MovementManager.cmd_vel publishes to LCM /cmd_vel by default; the
# coordinator's twist_command listens on the same name.
# coordinator's twist_command listens on the same topic.
(ControlCoordinator, "twist_command", "cmd_vel"),
]
)
.global_config(n_workers=8)
)


# Mock arm (7-DOF) + mock holonomic base (3-DOF)
_mock_arm_hw = HardwareComponent(
hardware_id="arm",
Expand All @@ -193,20 +225,29 @@ def _flowbase_twist_base(
adapter_type="mock",
)

coordinator_mobile_manip_mock = ControlCoordinator.blueprint(
hardware=[_mock_arm_hw, _mock_twist_base()],
tasks=[
TaskConfig(
name="traj_arm",
type="trajectory",
joint_names=_mock_arm_hw.joints,
priority=10,
),
TaskConfig(
name="vel_base",
type="velocity",
joint_names=_base_joints,
priority=10,
),
],
).remappings([(ControlCoordinator, "twist_command", "cmd_vel")])
coordinator_mobile_manip_mock = (
ControlCoordinator.blueprint(
hardware=[_mock_arm_hw, _mock_twist_base()],
tasks=[
TaskConfig(
name="traj_arm",
type="trajectory",
joint_names=_mock_arm_hw.joints,
priority=10,
),
TaskConfig(
name="vel_base",
type="velocity",
joint_names=_base_joints,
priority=10,
),
],
)
.transports(
{
("joint_state", JointState): LCMTransport.spec("/coordinator/joint_state", JointState),
("twist_command", Twist): LCMTransport.spec("/cmd_vel", Twist),
}
)
.remappings([(ControlCoordinator, "twist_command", "cmd_vel")])
)
49 changes: 44 additions & 5 deletions dimos/core/coordination/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Literal, Union, get_args, get_origin, get_type_hints

from pydantic import create_model
from pydantic import BaseModel, create_model

if TYPE_CHECKING:
from dimos.protocol.service.system_configurator.base import SystemConfigurator

from dimos.core.global_config import GlobalConfig
from dimos.core.module import ModuleBase, is_module_type
from dimos.core.stream import In, Out
from dimos.core.transport import PubSubTransport
from dimos.core.stream import In, Out, Transport
from dimos.spec.utils import Spec, is_spec
from dimos.utils.logging_config import setup_logger

Expand Down Expand Up @@ -141,6 +140,29 @@ def create(cls, module: type[ModuleBase], kwargs: dict[str, Any]) -> Self:
)


@dataclass(frozen=True)
class TransportSpec:
"""Deferred transport construction: a transport class plus its ctor args.
Blueprint authors declare transports via ``Cls.spec(...)`` so nothing is
constructed at blueprint-definition time. The coordinator materializes
specs at build time, once CLI/env/config overrides have resolved.
"""

cls: type[Transport[Any]]
args: tuple[Any, ...] = ()
kwargs: Mapping[str, Any] = field(default_factory=dict)

@property
def config_cls(self) -> type[BaseModel] | None:
# Set by transports that expose a pydantic config override surface
return self.cls._config_cls

def build(self, config: BaseModel | None = None) -> Transport[Any]:
extra = {"config": config} if config is not None else {}
return self.cls(*self.args, **self.kwargs, **extra)


# These fields cannot be pickled.
_PROXY_FIELDS = ("transport_map", "global_config_overrides", "remapping_map")

Expand All @@ -149,7 +171,7 @@ def create(cls, module: type[ModuleBase], kwargs: dict[str, Any]) -> Self:
class Blueprint:
blueprints: tuple[BlueprintAtom, ...]
disabled_modules_tuple: tuple[type[ModuleBase], ...] = field(default_factory=tuple)
transport_map: Mapping[tuple[str, type], PubSubTransport[Any]] = field(
transport_map: Mapping[tuple[str, type], TransportSpec] = field(
default_factory=lambda: MappingProxyType({})
)
global_config_overrides: Mapping[str, Any] = field(default_factory=lambda: MappingProxyType({}))
Expand Down Expand Up @@ -185,9 +207,22 @@ def config(self) -> type:
for b in self.blueprints
}
configs["g"] = (GlobalConfig | None, None)
transport_fields: dict[str, Any] = {}
seen: set[type] = set()
for spec in self.transport_map.values():
cls = spec.config_cls
if cls is None or cls in seen:
continue
seen.add(cls)
transport_fields[transport_config_name(cls)] = (cls | None, None)
if transport_fields:
transports_model = create_model(
"TransportsConfig", __config__={"extra": "forbid"}, **transport_fields
)
configs["transports"] = (transports_model | None, None)
return create_model("BlueprintConfig", __config__={"extra": "forbid"}, **configs) # type: ignore[call-overload,no-any-return]

def transports(self, transports: dict[tuple[str, type], Any]) -> "Blueprint":
def transports(self, transports: dict[tuple[str, type], TransportSpec]) -> "Blueprint":
return replace(self, transport_map=MappingProxyType({**self.transport_map, **transports}))

def global_config(self, **kwargs: Any) -> "Blueprint":
Expand Down Expand Up @@ -219,6 +254,10 @@ def active_blueprints(self) -> tuple[BlueprintAtom, ...]:
return tuple(bp for bp in self.blueprints if bp.module not in disabled)


def transport_config_name(cls: type) -> str:
return cls.__name__.removesuffix("Config").lower()


def autoconnect(*blueprints: Blueprint) -> Blueprint:
all_blueprints = tuple(_eliminate_duplicates([bp for bs in blueprints for bp in bs.blueprints]))
all_transports = dict( # type: ignore[var-annotated]
Expand Down
50 changes: 37 additions & 13 deletions dimos/core/coordination/module_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import threading
from typing import TYPE_CHECKING, Any, NamedTuple, cast

from dimos.core.coordination.blueprints import transport_config_name
from dimos.core.coordination.coordinator_rpc import CoordinatorRPC
from dimos.core.coordination.worker_manager import WorkerManager
from dimos.core.coordination.worker_manager_python import WorkerManagerPython
from dimos.core.global_config import GlobalConfig, global_config
from dimos.core.module import ModuleBase, ModuleSpec
from dimos.core.resource import Resource
from dimos.core.stream import Transport
from dimos.core.transport import LCMTransport, PubSubTransport, pLCMTransport
from dimos.spec.utils import is_spec, spec_annotation_compliance, spec_structural_compliance
from dimos.utils.generic import short_id
Expand Down Expand Up @@ -65,9 +67,9 @@ def __init__(
self._deployed_modules = {}
self._deployed_atoms: dict[type[ModuleBase], BlueprintAtom] = {}
self._resolved_module_refs: dict[tuple[type[ModuleBase], str], type[ModuleBase]] = {}
self._transport_registry: dict[tuple[str, type], PubSubTransport[Any]] = {}
self._transport_registry: dict[tuple[str, type], Transport[Any]] = {}
self._class_aliases: dict[type[ModuleBase], type[ModuleBase]] = {}
self._module_transports: dict[type[ModuleBase], dict[str, PubSubTransport[Any]]] = {}
self._module_transports: dict[type[ModuleBase], dict[str, Transport[Any]]] = {}
self._started = False
self._modules_lock = threading.RLock()
self._coordinator_rpc: CoordinatorRPC | None = None
Expand Down Expand Up @@ -249,7 +251,9 @@ def _send_on_system_modules(self) -> None:
if hasattr(module, "on_system_modules"):
module.on_system_modules(modules)

def _connect_streams(self, blueprint: Blueprint) -> None:
def _connect_streams(
self, blueprint: Blueprint, transports: Mapping[tuple[str, type], Transport[Any]]
) -> None:
streams: dict[tuple[str, type], list[tuple[type, str]]] = defaultdict(list)

for bp in blueprint.active_blueprints:
Expand All @@ -263,7 +267,9 @@ def _connect_streams(self, blueprint: Blueprint) -> None:
if key in self._transport_registry:
transport = self._transport_registry[key]
else:
transport = _get_transport_for(blueprint, remapped_name, stream_type)
transport = transports.get(key)
if transport is None:
transport = _get_transport_for(blueprint, remapped_name, stream_type)
self._transport_registry[key] = transport
for module, original_name in streams[key]:
instance = self.get_instance(module) # type: ignore[assignment]
Expand All @@ -290,6 +296,8 @@ def build(
blueprint_args = blueprint_args or {}
if "g" in blueprint_args:
global_config.update(**blueprint_args.pop("g"))
transport_overrides = blueprint_args.pop("transports", None) or {}
transports = _materialize_transports(blueprint, transport_overrides)

_run_configurators(blueprint)
_check_requirements(blueprint)
Expand All @@ -300,7 +308,7 @@ def build(
coordinator.start()

_deploy_all_modules(blueprint, coordinator, global_config, blueprint_args)
coordinator._connect_streams(blueprint)
coordinator._connect_streams(blueprint, transports)
_connect_module_refs(blueprint, coordinator)

coordinator.build_all_modules()
Expand Down Expand Up @@ -337,6 +345,8 @@ def _load_blueprint(
blueprint_args = blueprint_args or {}
if "g" in blueprint_args:
self._global_config.update(**blueprint_args.pop("g"))
transport_overrides = blueprint_args.pop("transports", None) or {}
transports = _materialize_transports(blueprint, transport_overrides)

# Scale worker pool.
n_extra = int(blueprint.global_config_overrides.get("n_workers", 0))
Expand All @@ -361,7 +371,7 @@ def _load_blueprint(
before = set(self._deployed_modules)

_deploy_all_modules(blueprint, self, self._global_config, blueprint_args)
self._connect_streams(blueprint)
self._connect_streams(blueprint, transports)
_connect_module_refs(blueprint, self, existing_modules=before)

new_modules = [proxy for cls, proxy in self._deployed_modules.items() if cls not in before]
Expand Down Expand Up @@ -572,15 +582,29 @@ def _is_name_unique(blueprint: Blueprint, name: str) -> bool:


def _get_transport_for(blueprint: Blueprint, name: str, stream_type: type) -> PubSubTransport[Any]:
transport = blueprint.transport_map.get((name, stream_type), None)
if transport:
return transport

use_pickled = getattr(stream_type, "lcm_encode", None) is None
topic = f"/{name}" if _is_name_unique(blueprint, name) else f"/{short_id()}"
transport = pLCMTransport(topic) if use_pickled else LCMTransport(topic, stream_type)
return pLCMTransport(topic) if use_pickled else LCMTransport(topic, stream_type)


return transport
def _materialize_transports(
blueprint: Blueprint, overrides: Mapping[str, Mapping[str, Any]]
) -> dict[tuple[str, type], Transport[Any]]:
"""Build the blueprint's declared transports, merging CLI/env config overrides.

WebRTC transports get a freshly constructed provider config from the
resolved ``transports.<name>.*`` overrides; everything else builds from the
spec as-is. Returns ready-to-use instances pickled into module workers.
"""
materialized: dict[tuple[str, type], Transport[Any]] = {}
for key, spec in blueprint.transport_map.items():
config = None
config_cls = spec.config_cls
if config_cls is not None:
sub = overrides.get(transport_config_name(config_cls), {})
config = config_cls(**sub)
materialized[key] = spec.build(config=config)
return materialized


def _verify_no_name_conflicts(blueprint: Blueprint) -> None:
Expand Down Expand Up @@ -621,7 +645,7 @@ def _verify_no_name_conflicts(blueprint: Blueprint) -> None:

def _verify_no_conflicts_with_existing(
blueprint: Blueprint,
existing_registry: dict[tuple[str, type], PubSubTransport[Any]],
existing_registry: dict[tuple[str, type], Transport[Any]],
) -> None:
"""Check that a new blueprint's streams don't conflict with already-registered transports."""
if not existing_registry:
Expand Down
Loading
Loading