
Feat/webrtc transport with Sam config merged on top #2595

Draft
spomichter wants to merge 99 commits into main from feat/webrtc-transport-rebase

Conversation

@spomichter

Contributor

Problem

Closes DIM-XXX

Solution

How to Test

Contributor License Agreement

  • I have read and approved the CLA.

ruthwikdasyam and others added 30 commits May 7, 2026 15:28
…e SFU

Implements a new pubsub transport backed by WebRTC DataChannels over
Cloudflare's Realtime SFU. Two new classes in
dimos/protocol/pubsub/impl/webrtcpubsub.py:

- CloudflareSession: manages the WebRTC PeerConnection lifecycle.
  Opens two CF sessions (publisher + subscriber) so a single process
  can do loopback pubsub. Runs aiortc on a dedicated background
  asyncio thread with its own ThreadPoolExecutor (so we don't leak
  asyncio_N worker threads). Uses negotiated=True placeholder DCs
  with id=100 during transport establishment to avoid stream-id
  collisions with CF-assigned ids.

- WebRTCPubSub: bytes-on-the-wire pubsub facade matching the
  LCMPubSubBase / BytesSharedMemory interface (string topics, bytes
  payloads). Lazily creates pub/sub DataChannel pairs on first
  publish/subscribe per topic.
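
The lazy per-topic channel creation described above can be sketched in isolation. This is a minimal in-process illustration, not the code from webrtcpubsub.py: `LazyChannelPubSub`, the `(payload, topic)` callback signature, and the `created` list are assumptions made for the example, and a plain `object()` stands in for a real DataChannel pair.

```python
import threading
from collections import defaultdict
from typing import Callable

Callback = Callable[[bytes, str], None]  # (payload, topic); assumed signature


class LazyChannelPubSub:
    """Toy bytes pubsub: string topics, bytes payloads, channels made on first use."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._channels = {}                    # topic -> placeholder channel
        self._subscribers = defaultdict(list)  # topic -> callbacks
        self.created = []                      # topics in channel-creation order

    def _ensure_channel(self, topic: str) -> None:
        # Create the per-topic channel exactly once, on first publish/subscribe.
        with self._lock:
            if topic not in self._channels:
                self._channels[topic] = object()  # stand-in for a DataChannel pair
                self.created.append(topic)

    def subscribe(self, topic: str, callback: Callback) -> Callable[[], None]:
        self._ensure_channel(topic)
        with self._lock:
            self._subscribers[topic].append(callback)

        def unsubscribe() -> None:
            with self._lock:
                self._subscribers[topic].remove(callback)

        return unsubscribe

    def publish(self, topic: str, payload: bytes) -> None:
        self._ensure_channel(topic)
        with self._lock:
            callbacks = list(self._subscribers[topic])  # snapshot, then deliver unlocked
        for callback in callbacks:
            callback(payload, topic)
```

Publishing to a topic nobody subscribes to still creates its channel, and the payload is simply not delivered.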

Also adds:
- WebRTCTransport in dimos/core/transport.py (mirrors LCMTransport
  pattern, no encoding - bytes only).
- WebRTC benchmark testcase in
  dimos/protocol/pubsub/benchmark/testdata.py, gated on aiortc +
  CF_TELEOP_APP_ID / CF_TELEOP_APP_SECRET env vars.
- Integration test in
  dimos/protocol/pubsub/impl/test_webrtcpubsub.py covering basic
  pub/sub, latency, and throughput (all live tests skip without CF
  credentials).
- aiortc + httpx as new 'webrtc' optional extra in pyproject.toml.

Live benchmark (us-east-2 -> CF edge):
- 64-256B:  ~10K msgs/s, 0% loss
- 1KiB:     ~7K msgs/s, 0% loss
- >= 64KiB: dropped (above SCTP message size)
- Median single-RTT: ~2.5 ms
…sport

- Add BrokerProvider: DataChannelProvider that works through the hosted
  teleop broker (dimensional-teleop) instead of directly with CF credentials.
  Handles session registration, heartbeat loop, and DataChannel creation
  when an operator joins via the broker's bridge-datachannel API.

- Extend WebRTCTransport with optional msg_type parameter for typed LCM
  encode/decode with fingerprint-based filtering. Multiple transports can
  share a single multiplexed DataChannel and each receives only its type.

- Add hosted teleop blueprints (dimos/teleop/hosted/) demonstrating the
  module-free architecture: make_teleop_hosted_go2() uses pure transport
  (zero modules), make_teleop_hosted_go2_scaled() adds a thin
  TeleopScalerModule for speed scaling only.

- Add unit tests for typed mode, fingerprint filtering, multiplexed
  dispatch, and BrokerProvider credential validation.
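
As a rough illustration of fingerprint-based filtering on a shared channel: LCM-encoded messages begin with an 8-byte type hash, so a demultiplexer can inspect that prefix and hand each frame only to the subscribers registered for that type. `MultiplexedChannel`, `frame`, and the integer fingerprints below are hypothetical names and values for this sketch, not the PR's implementation.

```python
import struct
from collections import defaultdict
from typing import Callable

FINGERPRINT_SIZE = 8  # LCM-encoded messages begin with an 8-byte type hash


def frame(fingerprint: int, body: bytes) -> bytes:
    """Toy encoder: big-endian 8-byte fingerprint followed by the body."""
    return struct.pack(">Q", fingerprint) + body


class MultiplexedChannel:
    """One channel, many typed subscribers; each sees only its own type."""

    def __init__(self) -> None:
        self._handlers = defaultdict(list)  # fingerprint -> handlers

    def subscribe(self, fingerprint: int, handler: Callable[[bytes], None]) -> None:
        self._handlers[fingerprint].append(handler)

    def on_message(self, data: bytes) -> None:
        if len(data) < FINGERPRINT_SIZE:
            return  # too short to carry a fingerprint; ignore
        (fingerprint,) = struct.unpack(">Q", data[:FINGERPRINT_SIZE])
        for handler in self._handlers.get(fingerprint, ()):
            handler(data)  # pass the full frame so the handler can decode it
```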
- Rebase on main and regenerate uv.lock (resolve conflict)
- Add _LoopbackProvider (in-process, no network) to benchmark testdata
- Enables local WebRTC transport benchmarking without CF credentials
- All 12 message sizes pass locally (2.78s total)
The previous lock regen dropped the `exclude-newer-span` marker, leaving
only the frozen `exclude-newer` timestamp. uv then treats every resolve as
"cooldown was newly added" and forces a re-resolve against today minus 7
days — which currently excludes md-babel-py 1.2.0 (published 2026-05-15)
and breaks `uv sync --extra all` / `uv lock`.

Re-adding the span line tells uv the lock was generated with P7D
semantics, so the existing pinned versions are honored.
- Remove __init__.py files (project policy: no init files)
- Remove section markers from test_webrtcpubsub.py
- Regenerate all_blueprints.py (adds TeleopScalerModule)
- Fix WebRTCTransport.__reduce__ to preserve msg_type across pickle
- Fix CloudflareProvider.publish() race: snapshot loop ref before use
- Fix CloudflareProvider.subscribe() race: check sub_channels inside lock
- Add comment clarifying TwistStamped→Twist type safety in blueprint
- Add return type annotations to CloudflareProvider event handlers
- Fix type: ignore codes to match actual mypy errors (attr-defined)
- Add type annotation to __setstate__ dict parameter
- Add type: ignore[arg-type] for WebRTCPubSub→PubSub duck typing in benchmark
- Remove TwistStamped subclass comment from blueprint
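
The `__reduce__` fix in this list can be illustrated with a small stand-in. `ToyTransport` and `_rebuild` are hypothetical names, and the real WebRTCTransport is not shown on this page; the sketch only demonstrates the general pattern of returning every constructor input (including `msg_type`) from `__reduce__` so that unpicklable state such as a lock is recreated rather than copied.

```python
import threading
from typing import Optional


def _rebuild(cls, topic, msg_type):
    """Module-level helper so pickle can locate it by name."""
    return cls(topic, msg_type)


class ToyTransport:
    def __init__(self, topic: str, msg_type: Optional[type] = None) -> None:
        self.topic = topic
        self.msg_type = msg_type
        self._init_lock = threading.Lock()  # locks cannot be pickled

    def __reduce__(self):
        # Return only picklable constructor inputs. Passing type(self) keeps
        # subclasses intact; the lock is recreated by __init__ on rebuild.
        return (_rebuild, (type(self), self.topic, self.msg_type))
```

With this in place, `pickle.loads(pickle.dumps(transport))` calls `_rebuild` on the receiving side, so the copy gets a fresh lock and the same `msg_type`.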
ruthwikdasyam and others added 27 commits June 9, 2026 22:23
…ew/webrtc-transport

# Conflicts:
#	dimos/robot/all_blueprints.py
#	pyproject.toml
#	uv.lock
…le provider configs

- Move webrtcpubsub + providers into protocol/pubsub/impl/webrtc/ with
  providers/spec.py (Provider protocol, ProviderConfig, AsyncProviderBase)
- ProviderConfig: picklable, hashable factory resolving to a per-process
  singleton provider — transports survive pickling into module workers and
  share one PeerConnection per process
- WebRTCTransport rebuilt on DimosMsg-bound typevar; CloudflareTransport
  subclass binds BrokerConfig for blueprint use
- Fingerprint filter now derives from the wire format (TwistStamped inherits
  Twist's fingerprint but encodes as LCM TwistStamped)
- BrokerProvider: operator rejoin via SCTP id tracking, heartbeat task held
  and cancelled on disconnect, X-Robot-API-Key auth, id=0 throwaway channel,
  publish() raises (broker is receive-only for now)
- CloudflareProvider: locking discipline, asyncio channel-creation lock,
  collision-safe DC names
- Benchmark: WebRTC case in the standard harness, env-overridable knobs
  (DIMOS_BENCH_DURATION_S / _MAX_MESSAGES / _RECEIVE_TIMEOUT_S)
- teleop-hosted-go2-transport: transport-only go2 blueprint (3 lines)
- Delete dimos/teleop/hosted (duplicate scaler), add webrtc extra to all
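
One way to picture the "picklable, hashable factory resolving to a per-process singleton" idea is a frozen dataclass used as a key into a module-level registry. The names `ToyProviderConfig`, `ToyProvider`, and `resolve`, and the `app_id`/`region` fields, are assumptions for this sketch; the actual ProviderConfig in providers/spec.py may be structured differently.

```python
import threading
from dataclasses import dataclass


class ToyProvider:
    """Stand-in for a provider that would own one PeerConnection."""

    def __init__(self, config: "ToyProviderConfig") -> None:
        self.config = config


_PROVIDERS = {}  # config -> provider, one registry per process
_PROVIDERS_LOCK = threading.Lock()


@dataclass(frozen=True)  # frozen + eq gives value-based __hash__ and __eq__
class ToyProviderConfig:
    app_id: str
    region: str = "auto"

    def resolve(self) -> ToyProvider:
        # Equal configs map to the same provider within this process.
        with _PROVIDERS_LOCK:
            provider = _PROVIDERS.get(self)
            if provider is None:
                provider = ToyProvider(self)
                _PROVIDERS[self] = provider
            return provider
```

Because the config holds only plain values, it pickles by value; a worker process that unpickles it and calls `resolve()` builds its own provider, while every transport inside that worker shares it.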
# Conflicts:
#	dimos/robot/all_blueprints.py
#	dimos/robot/test_all_blueprints.py
#	dimos/teleop/quest_hosted/README.md
#	dimos/teleop/quest_hosted/blueprints.py
#	dimos/teleop/quest_hosted/hosted_extensions.py
#	dimos/teleop/quest_hosted/hosted_teleop_module.py
#	dimos/teleop/quest_hosted/video_track.py
#	dimos/teleop/utils/recorder.py
#	dimos/teleop/utils/report.py
#	dimos/teleop/utils/stream_stats.py
#	dimos/teleop/utils/video_stats.py
#	docs/capabilities/teleoperation/hosted.md
…azy start

- delete impl/webrtc/__init__.py + providers/__init__.py (repo forbids
  non-root inits — was the only red CI check); importers now hit the
  source modules directly
- subscribe_all callbacks fired once per *subscription* on a topic, not
  once per message; the dispatcher is now attached to each topic exactly
  once (regression test added)
- WebRTCTransport.start() guards first-use init with a lock so two
  threads racing subscribe()/broadcast() can't construct two WebRTCPubSub
  wrappers and orphan subscribe_all state
- test fixture annotation: Generator[X, None, None] -> Iterator[X]
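
The lock-guarded first-use initialization mentioned for `WebRTCTransport.start()` follows a common pattern, sketched here with hypothetical names (`LazyStartTransport`, `factory`). It is an illustration of the technique, not the PR's code.

```python
import threading
from typing import Callable, Optional


class LazyStartTransport:
    """Builds its underlying pubsub wrapper once, even under concurrent start()."""

    def __init__(self, factory: Callable[[], object]) -> None:
        self._factory = factory
        self._init_lock = threading.Lock()
        self._pubsub: Optional[object] = None

    def start(self) -> object:
        # Check and construct under the same lock so two racing threads
        # cannot both observe None and build two wrappers.
        with self._init_lock:
            if self._pubsub is None:
                self._pubsub = self._factory()
            return self._pubsub
```

Taking the lock on every call keeps the sketch simple; the cost is one uncontended lock acquisition per `start()`.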
The main merge silently kept stale hunks from ruthwik's branch at our
old merge point (mem2 recorder warned_frames/recv_ts, TwistStamped seq
field) that he later dropped before #2411's final squash — non-
overlapping hunks, so git auto-merged without conflict. This PR never
intended to touch either file; both now match main byte-for-byte.
aiortc stands in for the browser operator (same join/bridge/negotiated-
channel protocol as the teleop web client); the robot side is the exact
CloudflareTransport the teleop-hosted-go2-transport blueprint binds.
Env-gated on TELEOP_API_KEY/TELEOP_ROBOT_ID/TELEOP_OPERATOR_TOKEN, tool-
marked. Verified against the live broker: 40/40 TwistStamped delivered
and decoded.
BrokerProvider now owns all three broker-bridged channels (topic ==
DataChannel name): cmd_unreliable + state_reliable inbound, and
state_reliable_back outbound via publish() — robot telemetry flows
through CloudflareTransport instead of raising. Heartbeat acks carry all
three SCTP ids; channels are (re)opened per operator join/leave/rejoin.
Publishes drop while no operator is connected (normal pubsub semantics).

HostedTeleopModule is deprecated: data planes are covered by
CloudflareTransport; the module remains only for video-track publishing
until BrokerProvider grows media support.

Live e2e (test_broker_e2e.py) now covers both directions against
teleop.dimensionalos.com: operator-sim -> cmd_unreliable -> transport
(40/40 decoded) and transport -> state_reliable_back -> operator-sim.
The broker derives the canonical robot identity from the API key
(dimensional-teleop now accepts session create without robot_id), so
BrokerProvider only requires TELEOP_API_KEY. An explicit robot_id is
still sent for consistency-checking when configured. Kills the
'TELEOP_ROBOT_ID or BrokerConfig.robot_id required' failure mode when
the env var doesn't reach module worker processes.
…replacement

The provider's session offer now always carries a sendonly video track
(CameraVideoTrack + propagate_bundle_candidates moved from quest_hosted
into the webrtc impl; the deprecated module imports them from here).
CloudflareVideoTransport feeds a blueprint's Image stream into that
track via the shared provider singleton, so hosted teleop with video is
a transport mapping on the base blueprint — no module wrapper:

    ("cmd_vel", Twist): CloudflareTransport("cmd_unreliable", TwistStamped),
    ("color_image", Image): CloudflareVideoTransport(),

TELEOP_ROBOT_ID also no longer required by the e2e test (broker derives
identity from the key). Live e2e now covers all three planes: cmd
40/40 decoded, telemetry back, and video RTP forwarded to the operator
(decode asserted best-effort: aiortc's receiver is lazy about PLI when
joining mid-stream; the browser requests keyframes immediately).
Note: frames must be flowing before the operator bridges — CF infers
the pulled track's kind from live RTP.
Mirrors the WebRTCTransport -> CloudflareTransport pattern: the generic
base works with any ProviderConfig whose provider exposes
set_video_frame() (clear NotImplementedError otherwise);
CloudflareVideoTransport is the thin BrokerConfig binding. Pickling
rebuilds the concrete subclass.
lcm_decode already verifies the wire fingerprint and raises ValueError
on other types, so the typed subscriber just attempts decode and skips
mismatches. Kills the default-instance-encoding hack (which existed only
because _get_packed_fingerprint disagrees with the wire format for
subclasses like TwistStamped). The subclass regression test stays,
now exercised through lcm_decode.
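
A minimal sketch of the decode-and-skip approach, assuming only what the message above states: decoding raises ValueError when the wire fingerprint belongs to another type. `ToyMsg`, its subclasses, the byte-string fingerprints, and `typed_subscriber` are illustrative stand-ins, not dimos or LCM APIs.

```python
import struct
from typing import Callable


class ToyMsg:
    FINGERPRINT = b"\x00" * 8  # 8-byte type tag, overridden per subclass

    def __init__(self, value: float) -> None:
        self.value = value

    def encode(self) -> bytes:
        return self.FINGERPRINT + struct.pack(">d", self.value)

    @classmethod
    def decode(cls, data: bytes) -> "ToyMsg":
        if data[:8] != cls.FINGERPRINT:
            raise ValueError("Decode error")  # another type is on the wire
        (value,) = struct.unpack(">d", data[8:16])
        return cls(value)


class ToyTwist(ToyMsg):
    FINGERPRINT = b"TWIST001"


class ToyOdom(ToyMsg):
    FINGERPRINT = b"ODOM0001"


def typed_subscriber(msg_type, callback: Callable[[ToyMsg], None]):
    """Wrap a typed callback: attempt decode, silently skip other types."""

    def on_bytes(data: bytes) -> None:
        try:
            msg = msg_type.decode(data)
        except ValueError:
            return  # not our type; another subscriber may handle it
        callback(msg)

    return on_bytes
```

The subscriber needs no fingerprint logic of its own, since the decoder is the single source of truth for what counts as a match.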
- aiortc/av/httpx/CameraVideoTrack now import on first provider use, not
  at module scope: the broker chain hangs off dimos.core.transport, so
  eager imports taxed every dimos process (~320ms; import now 510->207ms).
  WEBRTC_AVAILABLE uses find_spec. Verified live: CF loopback tests +
  broker connect both pass on the lazy path.
- tests trimmed against the LCM coverage bar: fingerprint_filter merged
  into multiple_types_multiplexed (same demux contract), kwargs-plumbing
  constructor test dropped (pickle test covers config equality),
  loopback_rtt dropped (DIY benchmarking — the benchmark harness owns
  RTT/throughput measurement).
- WebRTCVideoTransport: custom __reduce__ + rebuild helper deleted —
  its only state is a picklable frozen dataclass, default pickling
  preserves subclass and config (unlike WebRTCTransport, which keeps its
  rebuild path for the unpicklable init lock).
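
The import-on-first-use pattern from the first bullet can be sketched with the standard library alone. `is_available` and `load_backend` are hypothetical helper names; only `importlib.util.find_spec` and `importlib.import_module` are real APIs here.

```python
import importlib
import importlib.util


def is_available(module_name: str) -> bool:
    """Check that a top-level module can be found without importing it."""
    return importlib.util.find_spec(module_name) is not None


# Cheap availability flag: no heavy import happens at module scope.
WEBRTC_AVAILABLE = is_available("aiortc")


def load_backend(module_name: str = "aiortc"):
    """Import the heavy dependency only when a provider is first used."""
    if not is_available(module_name):
        raise ImportError(f"optional dependency {module_name!r} is not installed")
    return importlib.import_module(module_name)
```

Processes that never touch the WebRTC path pay only for the `find_spec` lookup, not for importing the dependency.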
Nothing sets them; only DIMOS_BENCH_RECEIVE_TIMEOUT_S is exercised
(networked drain window for the webrtc/CF benchmark runs).
Co-authored-by: Paul Nechifor <paul@nechifor.net>
@codecov

codecov Bot commented Jun 25, 2026


❌ 4 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
|---|---|---|---|
| 1989 | 4 | 1985 | 159 |
View the top 3 failed test(s) by shortest run time
dimos.protocol.pubsub.impl.webrtc.test_transport::test_transport_overrides_coerce_string_values
Stack Traces | 0.001s run time
def test_transport_overrides_coerce_string_values() -> None:
        """CLI/env overrides arrive as raw strings; non-str fields must coerce, not pass through."""
        bp = Blueprint(blueprints=()).transports({("topic", FakeLCMMsg): MockTransport("topic")})
>       new_bp = _apply_transport_overrides(bp, {"mock": {"count": "5"}})
E       NameError: name '_apply_transport_overrides' is not defined

bp         = Blueprint(blueprints=(), disabled_modules_tuple=(), transport_map=mappingproxy({('topic', <class 'dimos.protocol.pubsu...lobal_config_overrides=mappingproxy({}), remapping_map=mappingproxy({}), requirement_checks=(), configurator_checks=())

.../impl/webrtc/test_transport.py:280: NameError
dimos.robot.test_all_blueprints::test_blueprint_is_valid[unitree-go2-keyboard-teleop]
Stack Traces | 0.003s run time
blueprint_name = 'unitree-go2-keyboard-teleop'

    @pytest.mark.parametrize("blueprint_name", UBUNTU_BLUEPRINTS)
    def test_blueprint_is_valid(blueprint_name: str) -> None:
        """Validate blueprints that should import on the ubuntu-latest runner."""
>       _check_blueprint(blueprint_name)

blueprint_name = 'unitree-go2-keyboard-teleop'

dimos/robot/test_all_blueprints.py:104: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dimos/robot/test_all_blueprints.py:80: in _check_blueprint
    blueprint = get_blueprint_by_name(blueprint_name)
        blueprint_name = 'unitree-go2-keyboard-teleop'
        message    = "name 'Twist' is not defined"
dimos/robot/get_all_blueprints.py:47: in get_blueprint_by_name
    module = __import__(module_path, fromlist=[attr])
        attr       = 'unitree_go2_keyboard_teleop'
        module_path = 'dimos.robot.unitree.go2.blueprints.basic.unitree_go2_keyboard_teleop'
        name       = 'unitree-go2-keyboard-teleop'
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

    #!.../usr/bin/env python3
    # Copyright 2025-2026 Dimensional Inc.
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    
    """Unitree Go2 keyboard teleop via ControlCoordinator (DDS/SDK2 path).
    
    WASD keys -> Twist -> coordinator twist_command -> UnitreeGo2TwistAdapter (DDS).
    
    Usage:
        dimos run unitree-go2-keyboard-teleop
    """
    
    from __future__ import annotations
    
    from dimos.control.components import HardwareComponent, HardwareType, make_twist_base_joints
    from dimos.control.coordinator import ControlCoordinator, TaskConfig
    from dimos.core.coordination.blueprints import autoconnect
    from dimos.robot.unitree.keyboard_teleop import KeyboardTeleop
    
    _go2_joints = make_twist_base_joints("go2")
    
    unitree_go2_keyboard_teleop = (
        autoconnect(
            ControlCoordinator.blueprint(
                hardware=[
                    HardwareComponent(
                        hardware_id="go2",
                        hardware_type=HardwareType.BASE,
                        joints=_go2_joints,
                        adapter_type="unitree_go2",
                        adapter_kwargs={"rage_mode": False},
                    ),
                ],
                tasks=[
                    TaskConfig(
                        name="vel_go2",
                        type="velocity",
                        joint_names=_go2_joints,
                        priority=10,
                    ),
                ],
            ),
            KeyboardTeleop.blueprint(),
        )
        .transports(
            {
>               ("twist_command", Twist): LCMTransport.spec("/cmd_vel", Twist),
                ("joint_state", JointState): LCMTransport.spec("/coordinator/joint_state", JointState),
            }
        )
        .remappings([(ControlCoordinator, "twist_command", "cmd_vel")])
        .global_config(obstacle_avoidance=True)
    )
E   NameError: name 'Twist' is not defined

ControlCoordinator = <class 'dimos.control.coordinator.ControlCoordinator'>
HardwareComponent = <class 'dimos.control.components.HardwareComponent'>
HardwareType = <enum 'HardwareType'>
KeyboardTeleop = <class 'dimos.robot.unitree.keyboard_teleop.KeyboardTeleop'>
TaskConfig = <class 'dimos.control.coordinator.TaskConfig'>
__builtins__ = <builtins>
__cached__ = '.../basic/__pycache__/unitree_go2_keyboard_teleop.cpython-312.pyc'
__doc__    = 'Unitree Go2 keyboard teleop via ControlCoordinator (DDS/SDK2 path).\n\nWASD keys -> Twist -> coordinator twist_command -> UnitreeGo2TwistAdapter (DDS).\n\nUsage:\n    dimos run unitree-go2-keyboard-teleop\n'
__file__   = '.../blueprints/basic/unitree_go2_keyboard_teleop.py'
__loader__ = <_frozen_importlib_external.SourceFileLoader object at 0xffeae6a37a70>
__name__   = 'dimos.robot.unitree.go2.blueprints.basic.unitree_go2_keyboard_teleop'
__package__ = 'dimos.robot.unitree.go2.blueprints.basic'
__spec__   = ModuleSpec(name='dimos.robot.unitree.go2.blueprints.basic.unitree_go2_keyboard_teleop', loader=<_frozen_importlib_exte...37a70>, origin='.../blueprints/basic/unitree_go2_keyboard_teleop.py')
_go2_joints = ['go2/vx', 'go2/vy', 'go2/wz']
annotations = _Feature((3, 7, 0, 'beta', 1), None, 16777216)
autoconnect = <function autoconnect at 0xffec54cf0c20>
make_twist_base_joints = <function make_twist_base_joints at 0xffeb38630e00>

.../blueprints/basic/unitree_go2_keyboard_teleop.py:58: NameError
dimos.core.test_mcp_integration.TestMCPErrorHandling::test_cli_call_tool_wrong_arg_format
Stack Traces | 0.024s run time
self = <dimos.core.test_mcp_integration.TestMCPErrorHandling object at 0xffeb30c0e7e0>
mcp_shared = <dimos.core.coordination.module_coordinator.ModuleCoordinator object at 0xffeae4fc3770>

    def test_cli_call_tool_wrong_arg_format(self, mcp_shared: ModuleCoordinator) -> None:
        """dimos mcp call with bad --arg format should error."""
        result = CliRunner().invoke(main, ["mcp", "call", "echo", "--arg", "no_equals_sign"])
>       assert result.exit_code == 2  # click ParamType validation error
E       AssertionError: assert 1 == 2
E        +  where 1 = <Result BadParameter('expected KEY=VALUE, got: no_equals_sign')>.exit_code

mcp_shared = <dimos.core.coordination.module_coordinator.ModuleCoordinator object at 0xffeae4fc3770>
result     = <Result BadParameter('expected KEY=VALUE, got: no_equals_sign')>
self       = <dimos.core.test_mcp_integration.TestMCPErrorHandling object at 0xffeb30c0e7e0>

dimos/core/test_mcp_integration.py:274: AssertionError
dimos.codebase_checks.test_no_all::test_no_all
Stack Traces | 4.74s run time
def test_no_all():
        """Fail if any file defines `__all__`."""
        dimos_dir = DIMOS_PROJECT_ROOT / "dimos"
        hits = find_all_definitions()
        if hits:
            listing = "\n".join(f"  - {p.relative_to(dimos_dir)}:{lineno}" for p, lineno in hits)
>           raise AssertionError(
                f"Found __all__ definition(s) in dimos/:\n{listing}\n\n"
                "__all__ is not allowed. We don't use `from x import *`, so __all__ "
                "lists serve no purpose and are tedious to maintain. Remove them. For "
                "an import that exists purely to be re-exported, use `# noqa: F401`."
            )
E           AssertionError: Found __all__ definition(s) in dimos/:
E             - .../webrtc/providers/broker.py:382
E             - .../webrtc/providers/cloudflare.py:323
E             - .../webrtc/providers/spec.py:211
E             - .../impl/webrtc/webrtcpubsub.py:115
E           
E           __all__ is not allowed. We don't use `from x import *`, so __all__ lists serve no purpose and are tedious to maintain. Remove them. For an import that exists purely to be re-exported, use `# noqa: F401`.

dimos_dir  = PosixPath('.../dimos/dimos/dimos')
hits       = [(PosixPath('.../dimos/dimos/dimos/.../webrtc/providers/broker.py'), 382), (PosixPath('...s/spec.py'), 211), (PosixPath('.../dimos/dimos/dimos/.../impl/webrtc/webrtcpubsub.py'), 115)]
listing    = '  - .../webrtc/providers/broker.py:382\n  - .../webrtc/providers/cloudflare.py:323\n  - .../webrtc/providers/spec.py:211\n  - .../impl/webrtc/webrtcpubsub.py:115'

dimos/codebase_checks/test_no_all.py:51: AssertionError
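
For readers who want to find such definitions locally before pushing, a check of this kind can be approximated with the `ast` module. The repository's own `find_all_definitions` is not shown here, so `find_all_assignments` below is a hypothetical stand-in that only inspects module-level assignments in a single source string.

```python
import ast


def find_all_assignments(source: str) -> list:
    """Return line numbers of module-level assignments to __all__."""
    hits = []
    for node in ast.parse(source).body:
        if isinstance(node, ast.Assign):
            targets = node.targets
        elif isinstance(node, (ast.AnnAssign, ast.AugAssign)):
            targets = [node.target]
        else:
            continue
        for target in targets:
            if isinstance(target, ast.Name) and target.id == "__all__":
                hits.append(node.lineno)
    return hits
```

Running it over the text of each file named in the failure should point at the lines to delete.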


@spomichter spomichter changed the title test Feat/webrtc transport rebase Feat/webrtc transport with Sam config merged on top Jun 25, 2026
Comment thread pyproject.toml

webrtc = [
# WebRTC DataChannel pubsub (Cloudflare Realtime SFU)
"aiortc>=1.14.0",

Contributor

You have added just two packages, but uv.lock has thousands of changed lines.

I think you should reset uv.lock to main and run uv lock again. I assume you upgraded all the packages, which would explain such a large change.

4 participants