diff --git a/dimos/manipulation/manipulation_module.py b/dimos/manipulation/manipulation_module.py
index 0d4dfff226..9b22dc0468 100644
--- a/dimos/manipulation/manipulation_module.py
+++ b/dimos/manipulation/manipulation_module.py
@@ -38,7 +38,13 @@
from dimos.core.core import rpc
from dimos.core.module import Module, ModuleConfig
from dimos.core.stream import In
-from dimos.manipulation.planning.factory import create_planning_specs, create_world
+from dimos.manipulation.planning.factory import (
+ KinematicsName,
+ PlannerName,
+ WorldBackend,
+ create_planning_specs,
+ create_world,
+)
from dimos.manipulation.planning.kinematics.config import (
ManipulationKinematicsConfig,
PinkKinematicsConfig,
@@ -105,13 +111,14 @@ class ManipulationModuleConfig(ModuleConfig):
robots: list[RobotModelConfig] = Field(default_factory=list)
planning_timeout: float = 10.0
+ world_backend: WorldBackend = "drake"
visualization: ManipulationVisualizationConfig = Field(
default_factory=NoManipulationVisualizationConfig
)
- planner_name: str = "rrt_connect" # "rrt_connect"
+ planner_name: PlannerName = "rrt_connect"
kinematics: ManipulationKinematicsConfig = Field(default_factory=PinkKinematicsConfig)
# Deprecated: use kinematics.backend instead.
- kinematics_name: str | None = None # "jacobian", "drake_optimization", or "pink"
+ kinematics_name: KinematicsName | None = None
# Floor plane Z height (meters). When set, a box obstacle is added at startup
# to prevent the planner from routing trajectories below this height.
# Set to None to disable.
@@ -186,9 +193,13 @@ def _initialize_planning(self) -> None:
logger.warning("No robots configured, planning disabled")
return
- world = create_world(visualization=self.config.visualization)
+ world = create_world(
+ backend=self.config.world_backend,
+ visualization=self.config.visualization,
+ )
planning_specs = create_planning_specs(
world=world,
+ world_backend=self.config.world_backend,
planner_name=self.config.planner_name,
kinematics_name=self.config.kinematics_name,
kinematics=self.config.kinematics,
diff --git a/dimos/manipulation/planning/factory.py b/dimos/manipulation/planning/factory.py
index c1eca50e90..cf00da0f1f 100644
--- a/dimos/manipulation/planning/factory.py
+++ b/dimos/manipulation/planning/factory.py
@@ -17,7 +17,7 @@
from __future__ import annotations
from dataclasses import dataclass
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, Literal, TypeAlias, get_args
from dimos.manipulation.planning.kinematics.config import (
DrakeOptimizationKinematicsConfig,
@@ -26,6 +26,7 @@
PinkKinematicsConfig,
kinematics_config_from_name,
)
+from dimos.manipulation.planning.spec.protocols import PlannerSpec
from dimos.manipulation.visualization.config import (
ManipulationVisualizationConfig,
NoManipulationVisualizationConfig,
@@ -35,7 +36,6 @@
from dimos.manipulation.planning.monitor.world_monitor import WorldMonitor
from dimos.manipulation.planning.spec.protocols import (
KinematicsSpec,
- PlannerSpec,
WorldSpec,
)
@@ -49,23 +49,68 @@ class PlanningSpecs:
planner: PlannerSpec
+WorldBackend: TypeAlias = Literal["drake", "roboplan"]
+PlannerName: TypeAlias = Literal["rrt_connect", "roboplan"]
+KinematicsName: TypeAlias = Literal["jacobian", "drake_optimization", "pink"]
+
+SUPPORTED_WORLD_BACKENDS = get_args(WorldBackend)
+SUPPORTED_PLANNERS = get_args(PlannerName)
+SUPPORTED_KINEMATICS = get_args(KinematicsName)
+
+_ROBOPLAN_PLANNER_REQUIRES_ROBOPLAN_WORLD = (
+ 'planner_name="roboplan" requires world_backend="roboplan"'
+)
+
+DEFAULT_KINEMATICS_NAME: KinematicsName = "pink"
+
+
+def validate_backend_combination(
+ *,
+ world_backend: str = "drake",
+ planner_name: str = "rrt_connect",
+ kinematics_name: str = "jacobian",
+) -> None:
+ """Validate manipulation backend choices before constructing the stack."""
+ if world_backend not in SUPPORTED_WORLD_BACKENDS:
+ raise ValueError(
+ f"Unknown backend: {world_backend}. Available: {list(SUPPORTED_WORLD_BACKENDS)}"
+ )
+ if planner_name not in SUPPORTED_PLANNERS:
+ raise ValueError(f"Unknown planner: {planner_name}. Available: {list(SUPPORTED_PLANNERS)}")
+ if kinematics_name not in SUPPORTED_KINEMATICS:
+ raise ValueError(
+ f"Unknown kinematics solver: {kinematics_name}. Available: {list(SUPPORTED_KINEMATICS)}"
+ )
+
+ if planner_name == "roboplan" and world_backend != "roboplan":
+ raise ValueError(_ROBOPLAN_PLANNER_REQUIRES_ROBOPLAN_WORLD)
+ if kinematics_name == "drake_optimization" and world_backend != "drake":
+ raise ValueError('kinematics_name="drake_optimization" requires world_backend="drake"')
+
+
def create_world(
backend: str = "drake",
visualization: ManipulationVisualizationConfig | None = None,
**kwargs: Any,
) -> WorldSpec:
- """Create a world instance. backend='drake' only for now."""
+ """Create a world instance for the selected planning backend."""
visualization = visualization or NoManipulationVisualizationConfig()
+ enable_viz = visualization.requires_world_visualization
+
if backend == "drake":
from dimos.manipulation.planning.world.drake_world import DrakeWorld
- return DrakeWorld(enable_viz=visualization.requires_world_visualization, **kwargs)
- else:
- raise ValueError(f"Unknown backend: {backend}. Available: ['drake']")
+ return DrakeWorld(enable_viz=enable_viz, **kwargs)
+ if backend == "roboplan":
+ from dimos.manipulation.planning.world.roboplan_world import RoboPlanWorld
+
+ return RoboPlanWorld(enable_viz=enable_viz, **kwargs)
+
+ raise ValueError(f"Unknown backend: {backend}. Available: {list(SUPPORTED_WORLD_BACKENDS)}")
def create_kinematics(
- name: str = "pink",
+ name: str = DEFAULT_KINEMATICS_NAME,
config: ManipulationKinematicsConfig | None = None,
**kwargs: Any,
) -> KinematicsSpec:
@@ -93,19 +138,32 @@ def create_kinematics(
def create_planner(
name: str = "rrt_connect",
+ world: WorldSpec | None = None,
+ world_backend: str | None = None,
**kwargs: Any,
) -> PlannerSpec:
- """Create motion planner. name='rrt_connect'."""
+ """Create motion planner. name='rrt_connect'|'roboplan'.
+
+ RoboPlan-native planning is scene/backend-coupled, so `name='roboplan'`
+ returns the RoboPlan world object itself as the planner.
+ """
if name == "rrt_connect":
from dimos.manipulation.planning.planners.rrt_planner import RRTConnectPlanner
return RRTConnectPlanner(**kwargs)
- else:
- raise ValueError(f"Unknown planner: {name}. Available: ['rrt_connect']")
+ if name == "roboplan":
+ if world_backend != "roboplan" or world is None:
+ raise ValueError(_ROBOPLAN_PLANNER_REQUIRES_ROBOPLAN_WORLD)
+ if not isinstance(world, PlannerSpec):
+ raise ValueError("RoboPlan-native planner requires a RoboPlan world planner object")
+ return world
+
+ raise ValueError(f"Unknown planner: {name}. Available: {list(SUPPORTED_PLANNERS)}")
def create_planning_specs(
world: WorldSpec,
+ world_backend: str = "drake",
planner_name: str = "rrt_connect",
kinematics_name: str | None = None,
kinematics: ManipulationKinematicsConfig | None = None,
@@ -115,25 +173,35 @@ def create_planning_specs(
if kinematics_name is not None:
kinematics = kinematics_config_from_name(kinematics_name)
+ if kinematics is None:
+ kinematics = kinematics_config_from_name(DEFAULT_KINEMATICS_NAME)
+
+ validate_backend_combination(
+ world_backend=world_backend,
+ planner_name=planner_name,
+ kinematics_name=kinematics.backend,
+ )
return PlanningSpecs(
world_monitor=WorldMonitor(world=world),
kinematics=create_kinematics(config=kinematics),
- planner=create_planner(name=planner_name),
+ planner=create_planner(name=planner_name, world=world, world_backend=world_backend),
)
def create_planning_stack(
robot_config: Any,
+ world_backend: str = "drake",
visualization: ManipulationVisualizationConfig | None = None,
planner_name: str = "rrt_connect",
kinematics_name: str | None = None,
kinematics: ManipulationKinematicsConfig | None = None,
) -> tuple[WorldSpec, KinematicsSpec, PlannerSpec, str]:
"""Create complete planning stack. Returns (world, kinematics, planner, robot_id)."""
- world = create_world(visualization=visualization)
+ world = create_world(backend=world_backend, visualization=visualization)
planning_specs = create_planning_specs(
world=world,
+ world_backend=world_backend,
planner_name=planner_name,
kinematics_name=kinematics_name,
kinematics=kinematics,
diff --git a/dimos/manipulation/planning/kinematics/test_pink_ik.py b/dimos/manipulation/planning/kinematics/test_pink_ik.py
index 2c2fd863e2..9bfb1abeff 100644
--- a/dimos/manipulation/planning/kinematics/test_pink_ik.py
+++ b/dimos/manipulation/planning/kinematics/test_pink_ik.py
@@ -23,9 +23,11 @@
import numpy as np
import pytest
+from pytest_mock import MockerFixture
from dimos.manipulation.planning.factory import create_kinematics
from dimos.manipulation.planning.kinematics.config import PinkKinematicsConfig
+import dimos.manipulation.planning.kinematics.pink_ik as pink_ik
from dimos.manipulation.planning.kinematics.pink_ik import (
PinkIK,
PinkIKConfig,
@@ -172,15 +174,11 @@ def _robot_config() -> RobotModelConfig:
)
-class _TestPinkIK(PinkIK):
- def __init__(self, converge: bool = True) -> None:
- self.config = PinkIKConfig(max_iterations=3)
- self._modules = _fake_modules(converge=converge)
- self._robot_contexts = {}
-
-
-def _pink_ik(converge: bool = True) -> PinkIK:
- return _TestPinkIK(converge=converge)
+def _pink_ik(mocker: MockerFixture, converge: bool = True) -> PinkIK:
+ mocker.patch.object(
+ pink_ik, "_load_optional_dependencies", return_value=_fake_modules(converge=converge)
+ )
+ return PinkIK(PinkIKConfig(max_iterations=3))
def _context() -> _PinkRobotContext:
@@ -222,16 +220,14 @@ def check_config_collision_free(self, robot_id: str, joint_state: JointState) ->
def test_create_kinematics_pink_missing_dependency_is_actionable(
- monkeypatch: pytest.MonkeyPatch,
+ mocker: MockerFixture,
) -> None:
- from dimos.manipulation.planning.kinematics import pink_ik
-
def fake_import_module(name: str) -> ModuleType:
if name == "pink":
raise ImportError("missing pink")
return ModuleType(name)
- monkeypatch.setattr(pink_ik.importlib, "import_module", fake_import_module)
+ mocker.patch.object(pink_ik.importlib, "import_module", side_effect=fake_import_module)
with pytest.raises(PinkIKDependencyError) as exc_info:
create_kinematics("pink")
@@ -240,36 +236,30 @@ def fake_import_module(name: str) -> ModuleType:
def test_create_kinematics_pink_unavailable_solver_mentions_manipulation_extra(
- monkeypatch: pytest.MonkeyPatch,
+ mocker: MockerFixture,
) -> None:
- from dimos.manipulation.planning.kinematics import pink_ik
-
def fake_import_module(name: str) -> ModuleType:
module = ModuleType(name)
if name == "qpsolvers":
module.available_solvers = [] # type: ignore[attr-defined]
return module
- monkeypatch.setattr(pink_ik.importlib, "import_module", fake_import_module)
+ mocker.patch.object(pink_ik.importlib, "import_module", side_effect=fake_import_module)
with pytest.raises(PinkIKDependencyError, match="--extra manipulation"):
create_kinematics("pink")
-def test_create_kinematics_pink_returns_backend(monkeypatch: pytest.MonkeyPatch) -> None:
- from dimos.manipulation.planning.kinematics import pink_ik
-
- monkeypatch.setattr(pink_ik, "_load_optional_dependencies", lambda solver: _fake_modules())
+def test_create_kinematics_pink_returns_backend(mocker: MockerFixture) -> None:
+ mocker.patch.object(pink_ik, "_load_optional_dependencies", return_value=_fake_modules())
assert isinstance(create_kinematics("pink"), PinkIK)
def test_create_kinematics_pink_config_passes_tuning(
- monkeypatch: pytest.MonkeyPatch,
+ mocker: MockerFixture,
) -> None:
- from dimos.manipulation.planning.kinematics import pink_ik
-
- monkeypatch.setattr(pink_ik, "_load_optional_dependencies", lambda solver: _fake_modules())
+ mocker.patch.object(pink_ik, "_load_optional_dependencies", return_value=_fake_modules())
ik = create_kinematics(config=PinkKinematicsConfig(max_iterations=7, dt=0.02, posture_cost=0.0))
@@ -279,10 +269,8 @@ def test_create_kinematics_pink_config_passes_tuning(
assert ik.config.posture_cost == 0.0
-def test_pink_ik_config_overrides_are_applied(monkeypatch: pytest.MonkeyPatch) -> None:
- from dimos.manipulation.planning.kinematics import pink_ik
-
- monkeypatch.setattr(pink_ik, "_load_optional_dependencies", lambda solver: _fake_modules())
+def test_pink_ik_config_overrides_are_applied(mocker: MockerFixture) -> None:
+ mocker.patch.object(pink_ik, "_load_optional_dependencies", return_value=_fake_modules())
ik = PinkIK(PinkIKConfig(solver="proxqp", dt=0.1), max_iterations=7, posture_cost=0.0)
@@ -310,8 +298,8 @@ def test_mapping_failure_for_missing_joint() -> None:
_build_joint_mapping(_FakeModel(), config)
-def test_solve_single_returns_successful_ik_result() -> None:
- ik = _pink_ik(converge=True)
+def test_solve_single_returns_successful_ik_result(mocker: MockerFixture) -> None:
+ ik = _pink_ik(mocker, converge=True)
target = np.eye(4)
target[:3, 3] = [0.1, 0.2, 0.3]
@@ -331,8 +319,8 @@ def test_solve_single_returns_successful_ik_result() -> None:
assert result.joint_state.position == pytest.approx([0.2, 0.1, 0.3])
-def test_solve_single_reports_non_convergence() -> None:
- ik = _pink_ik(converge=False)
+def test_solve_single_reports_non_convergence(mocker: MockerFixture) -> None:
+ ik = _pink_ik(mocker, converge=False)
target = np.eye(4)
target[:3, 3] = [0.1, 0.0, 0.0]
@@ -350,8 +338,8 @@ def test_solve_single_reports_non_convergence() -> None:
assert "did not converge" in result.message
-def test_solve_rejects_collision_candidate() -> None:
- ik = _pink_ik(converge=True)
+def test_solve_rejects_collision_candidate(mocker: MockerFixture) -> None:
+ ik = _pink_ik(mocker, converge=True)
context = _context()
ik._robot_contexts = {"robot": context}
@@ -370,8 +358,8 @@ def test_solve_rejects_collision_candidate() -> None:
assert result.joint_state is None
-def test_solve_retries_after_joint_limit_failure(monkeypatch: pytest.MonkeyPatch) -> None:
- ik = _pink_ik(converge=True)
+def test_solve_retries_after_joint_limit_failure(mocker: MockerFixture) -> None:
+ ik = _pink_ik(mocker, converge=True)
context = _context()
ik._robot_contexts = {"robot": context}
calls = 0
@@ -396,7 +384,7 @@ def fake_solve_single(**_: object) -> IKResult:
iterations=1,
)
- monkeypatch.setattr(ik, "_solve_single", fake_solve_single)
+ solve_single = mocker.patch.object(ik, "_solve_single", side_effect=fake_solve_single)
result = ik.solve(
world=cast("Any", _FakeWorld(collision_free=True)),
@@ -409,5 +397,5 @@ def fake_solve_single(**_: object) -> IKResult:
max_attempts=2,
)
- assert calls == 2
+ assert solve_single.call_count == 2
assert result.status == IKStatus.SUCCESS
diff --git a/dimos/manipulation/planning/monitor/test_world_monitor.py b/dimos/manipulation/planning/monitor/test_world_monitor.py
index d46eb73a6c..df75d23d03 100644
--- a/dimos/manipulation/planning/monitor/test_world_monitor.py
+++ b/dimos/manipulation/planning/monitor/test_world_monitor.py
@@ -196,7 +196,7 @@ def test_create_planning_specs_wraps_existing_world(monkeypatch) -> None:
"create_kinematics",
lambda *args, **kwargs: fake_kinematics,
)
- monkeypatch.setattr(planning_factory, "create_planner", lambda name: fake_planner)
+ monkeypatch.setattr(planning_factory, "create_planner", lambda **kwargs: fake_planner)
planning_specs = planning_factory.create_planning_specs(world=fake_world) # type: ignore[arg-type]
diff --git a/dimos/manipulation/planning/world/roboplan_world.py b/dimos/manipulation/planning/world/roboplan_world.py
new file mode 100644
index 0000000000..d252d3fdc9
--- /dev/null
+++ b/dimos/manipulation/planning/world/roboplan_world.py
@@ -0,0 +1,655 @@
+# Copyright 2025-2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""RoboPlan-backed manipulation world implementation.
+
+This adapter imports RoboPlan at module load time. The factory imports this module
+only when the RoboPlan backend is requested, so default planning paths do not need
+the optional dependency installed.
+"""
+
+from __future__ import annotations
+
+from contextlib import contextmanager
+from dataclasses import dataclass, field, replace
+from pathlib import Path
+import tempfile
+import time
+from typing import TYPE_CHECKING
+import xml.etree.ElementTree as ET
+from xml.sax.saxutils import escape
+
+import numpy as np
+
+try:
+ import roboplan.core as roboplan_core
+ import roboplan.rrt as roboplan_rrt
+except ImportError as exc:
+ raise ImportError(
+ "RoboPlanWorld requires the optional roboplan dependency. "
+ "Install the manipulation extra before selecting the roboplan backend."
+ ) from exc
+
+from dimos.manipulation.planning.spec.config import RobotModelConfig
+from dimos.manipulation.planning.spec.enums import ObstacleType, PlanningStatus
+from dimos.manipulation.planning.spec.models import Obstacle, PlanningResult, WorldRobotID
+from dimos.manipulation.planning.utils.mesh_utils import prepare_urdf_for_drake
+from dimos.manipulation.planning.utils.path_utils import compute_path_length
+from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped
+from dimos.msgs.sensor_msgs.JointState import JointState
+from dimos.utils.logging_config import setup_logger
+from dimos.utils.transform_utils import matrix_to_pose, pose_to_matrix
+
+if TYPE_CHECKING:
+ from collections.abc import Generator
+
+ from numpy.typing import NDArray
+
+ from dimos.manipulation.planning.spec.protocols import WorldSpec
+
+logger = setup_logger()
+
+_WORLD_FRAME = ""
+
+
+@dataclass
+class _RoboPlanRobotData:
+ robot_id: WorldRobotID
+ config: RobotModelConfig
+ lower_limits: NDArray[np.float64]
+ upper_limits: NDArray[np.float64]
+
+
+@dataclass
+class RoboPlanContext:
+ """DimOS context wrapper for RoboPlan world state."""
+
+ q_by_robot: dict[WorldRobotID, NDArray[np.float64]] = field(default_factory=dict)
+
+
+class RoboPlanWorld:
+ """WorldSpec implementation backed by RoboPlan scene and collision queries."""
+
+ def __init__(self, enable_viz: bool = False, **_: object) -> None:
+ self._scene: roboplan_core.Scene | None = None
+ self._enable_viz = enable_viz
+ if enable_viz:
+ logger.warning("RoboPlanWorld does not currently provide manipulation visualization")
+
+ self._robots: dict[WorldRobotID, _RoboPlanRobotData] = {}
+ self._obstacles: dict[str, Obstacle] = {}
+ self._robot_counter = 0
+ self._finalized = False
+ self._live_context = RoboPlanContext()
+ self._srdf_tempdirs: list[tempfile.TemporaryDirectory[str]] = []
+
+ # Robot Management
+
+ def add_robot(self, config: RobotModelConfig) -> WorldRobotID:
+ """Add a supported robot model to the RoboPlan scene."""
+ if self._finalized:
+ raise RuntimeError("Cannot add robot after world is finalized")
+ if self._robots:
+ raise ValueError("RoboPlanWorld currently supports one robot per Scene")
+ if not Path(config.model_path).exists():
+ raise FileNotFoundError(f"Robot model not found: {Path(config.model_path).resolve()}")
+
+ self._validate_robot_config(config)
+ self._robot_counter += 1
+ robot_id = f"robot_{self._robot_counter}"
+ self._scene = self._create_scene(config)
+ lower, upper = self._extract_joint_limits(config)
+ self._robots[robot_id] = _RoboPlanRobotData(
+ robot_id=robot_id,
+ config=config,
+ lower_limits=lower,
+ upper_limits=upper,
+ )
+ self._live_context.q_by_robot[robot_id] = np.zeros(
+ len(config.joint_names), dtype=np.float64
+ )
+ logger.info(f"Added RoboPlan robot '{robot_id}' ({config.name})")
+ return robot_id
+
+ def get_robot_ids(self) -> list[WorldRobotID]:
+ """Get all robot IDs in the world."""
+ return list(self._robots.keys())
+
+ def get_robot_config(self, robot_id: WorldRobotID) -> RobotModelConfig:
+ """Get robot configuration by ID."""
+ return self._get_robot(robot_id).config
+
+ def get_joint_limits(
+ self, robot_id: WorldRobotID
+ ) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
+ """Get joint limits in DimOS joint order."""
+ robot = self._get_robot(robot_id)
+ return robot.lower_limits.copy(), robot.upper_limits.copy()
+
+ # Obstacle Management
+
+ def add_obstacle(self, obstacle: Obstacle) -> str:
+ """Add a supported obstacle to the RoboPlan scene."""
+ obstacle_id = obstacle.name
+ if obstacle_id in self._obstacles:
+ return obstacle_id
+ self._add_obstacle_to_scene(obstacle, obstacle_id)
+ self._obstacles[obstacle_id] = obstacle
+ return obstacle_id
+
+ def remove_obstacle(self, obstacle_id: str) -> bool:
+ """Remove an obstacle from the RoboPlan scene."""
+ if obstacle_id not in self._obstacles:
+ return False
+ scene = self._require_scene()
+ scene.removeGeometry(obstacle_id)
+ del self._obstacles[obstacle_id]
+ return True
+
+ def update_obstacle_pose(self, obstacle_id: str, pose: PoseStamped) -> bool:
+ """Update an obstacle pose and invalidate collision scratch."""
+ if obstacle_id not in self._obstacles:
+ return False
+ scene = self._require_scene()
+ scene.updateGeometryPlacement(obstacle_id, _WORLD_FRAME, pose_to_matrix(pose))
+ self._obstacles[obstacle_id] = replace(self._obstacles[obstacle_id], pose=pose)
+ return True
+
+ def clear_obstacles(self) -> None:
+ """Remove all tracked obstacles."""
+ for obstacle_id in list(self._obstacles.keys()):
+ self.remove_obstacle(obstacle_id)
+
+ def get_obstacles(self) -> list[Obstacle]:
+ """Get all obstacles currently tracked by DimOS."""
+ return list(self._obstacles.values())
+
+ # Lifecycle
+
+ def finalize(self) -> None:
+ """Mark the RoboPlan scene ready for DimOS planning queries.
+
+ RoboPlan Python bindings construct a query-ready Scene directly; v0.4.0
+ exposes no Scene.finalize() lifecycle method.
+ """
+ self._require_scene()
+ self._finalized = True
+
+ @property
+ def is_finalized(self) -> bool:
+ """Check whether the scene is finalized."""
+ return self._finalized
+
+ # Context Management
+
+ def get_live_context(self) -> RoboPlanContext:
+ """Get the live context that mirrors robot state."""
+ self._require_finalized()
+ return self._live_context
+
+ @contextmanager
+ def scratch_context(self) -> Generator[RoboPlanContext, None, None]:
+ """Create a per-consumer context with independent collision scratch."""
+ self._require_finalized()
+ ctx = RoboPlanContext(
+ q_by_robot={robot_id: q.copy() for robot_id, q in self._live_context.q_by_robot.items()}
+ )
+ yield ctx
+
+ def sync_from_joint_state(self, robot_id: WorldRobotID, joint_state: JointState) -> None:
+ """Sync live context from a driver joint-state message."""
+ if not self._finalized:
+ return
+ self.set_joint_state(self._live_context, robot_id, joint_state)
+
+ # State Operations
+
+ def set_joint_state(
+ self, ctx: RoboPlanContext, robot_id: WorldRobotID, joint_state: JointState
+ ) -> None:
+ """Set robot joint state in a context."""
+ self._require_finalized()
+ ctx.q_by_robot[robot_id] = self._joint_state_to_q(robot_id, joint_state)
+
+ def get_joint_state(self, ctx: RoboPlanContext, robot_id: WorldRobotID) -> JointState:
+ """Get robot joint state from a context."""
+ robot = self._get_robot(robot_id)
+ q = ctx.q_by_robot.get(robot_id)
+ if q is None:
+ q = np.zeros(len(robot.config.joint_names), dtype=np.float64)
+ return JointState(name=robot.config.joint_names, position=q.astype(float).tolist())
+
+ # Collision Checking
+
+ def is_collision_free(self, ctx: RoboPlanContext, robot_id: WorldRobotID) -> bool:
+ """Check if the robot configuration in a context is collision-free."""
+ self._require_finalized()
+ q = ctx.q_by_robot.get(robot_id)
+ if q is None:
+ raise KeyError(f"Robot '{robot_id}' not found in context")
+ return not self._has_collisions(robot_id, q)
+
+ def get_min_distance(self, ctx: RoboPlanContext, robot_id: WorldRobotID) -> float:
+ """Get minimum signed distance.
+
+ RoboPlan signed-distance semantics are not verified yet, so do not return
+ a misleading approximation.
+ """
+ raise NotImplementedError("RoboPlanWorld.get_min_distance is not implemented")
+
+ def check_config_collision_free(self, robot_id: WorldRobotID, joint_state: JointState) -> bool:
+ """Check a joint state using a scratch collision context."""
+ with self.scratch_context() as ctx:
+ self.set_joint_state(ctx, robot_id, joint_state)
+ return self.is_collision_free(ctx, robot_id)
+
+ def check_edge_collision_free(
+ self,
+ robot_id: WorldRobotID,
+ start: JointState,
+ end: JointState,
+ step_size: float = 0.05,
+ ) -> bool:
+ """Check if an interpolated edge is collision-free."""
+ self._require_finalized()
+ q_start = self._joint_state_to_q(robot_id, start)
+ q_end = self._joint_state_to_q(robot_id, end)
+ return not self._call_path_collision_checker(robot_id, q_start, q_end, step_size)
+
+ # Forward Kinematics
+
+ def get_ee_pose(self, ctx: RoboPlanContext, robot_id: WorldRobotID) -> PoseStamped:
+ """Get end-effector pose if RoboPlan exposes FK."""
+ robot = self._get_robot(robot_id)
+ mat = self.get_link_pose(ctx, robot_id, robot.config.end_effector_link)
+ pose = matrix_to_pose(mat)
+ return PoseStamped(
+ frame_id="world",
+ position=[pose.position.x, pose.position.y, pose.position.z],
+ orientation=[
+ pose.orientation.x,
+ pose.orientation.y,
+ pose.orientation.z,
+ pose.orientation.w,
+ ],
+ )
+
+ def get_link_pose(
+ self, ctx: RoboPlanContext, robot_id: WorldRobotID, link_name: str
+ ) -> NDArray[np.float64]:
+ """Get link pose as a 4x4 homogeneous transform."""
+ q = ctx.q_by_robot.get(robot_id)
+ if q is None:
+ raise KeyError(f"Robot '{robot_id}' not found in context")
+ scene = self._require_scene()
+ result = scene.forwardKinematics(self._to_scene_q(robot_id, q), link_name, "")
+ return np.asarray(result, dtype=np.float64)
+
+ def get_jacobian(self, ctx: RoboPlanContext, robot_id: WorldRobotID) -> NDArray[np.float64]:
+ """Get end-effector Jacobian if RoboPlan exposes a compatible API."""
+ robot = self._get_robot(robot_id)
+ q = ctx.q_by_robot.get(robot_id)
+ if q is None:
+ raise KeyError(f"Robot '{robot_id}' not found in context")
+ scene = self._require_scene()
+ result = scene.computeFrameJacobian(
+ self._to_scene_q(robot_id, q), robot.config.end_effector_link, True
+ )
+ arr = np.asarray(result, dtype=np.float64)
+ if arr.shape[0] != 6:
+ raise ValueError(f"Unexpected RoboPlan Jacobian shape: {arr.shape}; expected 6 x n")
+ return arr
+
+ # PlannerSpec for native RoboPlan planning
+
+ def plan_joint_path(
+ self,
+ world: WorldSpec,
+ robot_id: WorldRobotID,
+ start: JointState,
+ goal: JointState,
+ timeout: float = 10.0,
+ ) -> PlanningResult:
+ """Plan a path using RoboPlan-native RRT when selected as planner."""
+ if world is not self:
+ return PlanningResult(
+ status=PlanningStatus.NO_SOLUTION,
+ message="RoboPlan-native planner requires its RoboPlanWorld instance",
+ )
+ start_time = time.time()
+ q_start = self._joint_state_to_q(robot_id, start)
+ q_goal = self._joint_state_to_q(robot_id, goal)
+ try:
+ path_arrays = self._run_native_rrt(robot_id, q_start, q_goal, timeout)
+ except ValueError as exc:
+ # _run_native_rrt raises ValueError for the known "no path" case; let any
+ # unexpected error propagate instead of swallowing its traceback.
+ return PlanningResult(
+ status=PlanningStatus.NO_SOLUTION,
+ planning_time=time.time() - start_time,
+ message=f"RoboPlan-native planning failed: {exc}",
+ )
+ if not path_arrays:
+ return PlanningResult(
+ status=PlanningStatus.NO_SOLUTION,
+ planning_time=time.time() - start_time,
+ message="RoboPlan-native planning failed: returned an empty path",
+ )
+ robot = self._get_robot(robot_id)
+ path = [
+ JointState(
+ name=list(robot.config.joint_names),
+ position=np.asarray(q).astype(float).tolist(),
+ )
+ for q in path_arrays
+ ]
+ return PlanningResult(
+ status=PlanningStatus.SUCCESS,
+ path=path,
+ planning_time=time.time() - start_time,
+ path_length=compute_path_length(path),
+ message="RoboPlan path found",
+ )
+
+ def get_name(self) -> str:
+ """Get planner name."""
+ return "RoboPlan"
+
+ # Internals
+
+ def _create_scene(self, config: RobotModelConfig) -> roboplan_core.Scene:
+ urdf_path = self._prepare_robot_urdf(config)
+ srdf_path = self._prepare_robot_srdf(config, urdf_path)
+ package_paths = [str(path) for path in config.package_paths.values()]
+ scene = roboplan_core.Scene(config.name, str(urdf_path), str(srdf_path), package_paths)
+ self._apply_collision_exclusions(scene, config, urdf_path)
+ return scene
+
+ def _validate_robot_config(self, config: RobotModelConfig) -> None:
+ if not config.joint_names:
+ raise ValueError("RoboPlanWorld requires explicit joint_names")
+ if not np.allclose(pose_to_matrix(config.base_pose), np.eye(4)):
+ raise ValueError("RoboPlanWorld does not yet support non-identity robot base_pose")
+
+ def _prepare_robot_urdf(self, config: RobotModelConfig) -> Path:
+ return Path(
+ prepare_urdf_for_drake(
+ config.model_path,
+ package_paths=config.package_paths,
+ xacro_args=config.xacro_args,
+ convert_meshes=config.auto_convert_meshes,
+ )
+ )
+
+ def _prepare_robot_srdf(self, config: RobotModelConfig, urdf_path: Path) -> Path:
+ srdf = self._generate_srdf(config, urdf_path)
+ srdf_tempdir = tempfile.TemporaryDirectory(prefix="dimos_roboplan_srdf_")
+ self._srdf_tempdirs.append(srdf_tempdir)
+ cache_dir = Path(srdf_tempdir.name)
+ srdf_path = cache_dir / f"{config.name}.srdf"
+ srdf_path.write_text(srdf)
+ return srdf_path
+
+ def _generate_srdf(self, config: RobotModelConfig, urdf_path: Path) -> str:
+ lines = [f'']
+ lines.append(f' ')
+ for joint_name in config.joint_names:
+ lines.append(f' ')
+ lines.append(" ")
+ for link1, link2 in self._collision_exclusion_pairs(config, urdf_path):
+ lines.append(
+ f' '
+ )
+ lines.append("")
+ return "\n".join(lines) + "\n"
+
+ def _collision_exclusion_pairs(
+ self, config: RobotModelConfig, urdf_path: Path
+ ) -> list[tuple[str, str]]:
+ pairs = set(config.collision_exclusion_pairs)
+ pairs.update(self._adjacent_link_pairs_from_urdf(urdf_path))
+ return sorted(pairs)
+
+ def _adjacent_link_pairs_from_urdf(self, urdf_path: Path) -> list[tuple[str, str]]:
+ try:
+ root = ET.parse(urdf_path).getroot()
+ except ET.ParseError as exc:
+ raise ValueError(
+ f"Unable to parse prepared URDF for SRDF generation: {urdf_path}"
+ ) from exc
+
+ pairs: list[tuple[str, str]] = []
+ for joint in root.findall("joint"):
+ parent = joint.find("parent")
+ child = joint.find("child")
+ parent_link = parent.get("link") if parent is not None else None
+ child_link = child.get("link") if child is not None else None
+ if parent_link and child_link:
+ pairs.append((parent_link, child_link))
+ return pairs
+
+ def _apply_collision_exclusions(
+ self, scene: roboplan_core.Scene, config: RobotModelConfig, urdf_path: Path
+ ) -> None:
+ for link1, link2 in self._collision_exclusion_pairs(config, urdf_path):
+ try:
+ scene.setCollisions(link1, link2, False)
+ except RuntimeError:
+ logger.warning(f"RoboPlan rejected collision exclusion pair: {link1} <-> {link2}")
+
+ def _extract_joint_limits(
+ self, config: RobotModelConfig
+ ) -> tuple[NDArray[np.float64], NDArray[np.float64]]:
+ if config.joint_limits_lower is not None and config.joint_limits_upper is not None:
+ lower = np.asarray(config.joint_limits_lower, dtype=np.float64)
+ upper = np.asarray(config.joint_limits_upper, dtype=np.float64)
+ else:
+ limits = self._query_scene_joint_limits(config)
+ if limits is None:
+ raise ValueError(
+ "RoboPlanWorld requires explicit joint_limits_lower/joint_limits_upper "
+ "when limits cannot be read from RoboPlan bindings"
+ )
+ lower, upper = limits
+ if len(lower) != len(config.joint_names) or len(upper) != len(config.joint_names):
+ raise ValueError("Joint limit length must match joint_names length")
+ if np.any(~np.isfinite(lower)) or np.any(~np.isfinite(upper)):
+ raise ValueError("RoboPlanWorld requires finite joint limits")
+ return lower, upper
+
+ def _query_scene_joint_limits(
+ self, config: RobotModelConfig
+ ) -> tuple[NDArray[np.float64], NDArray[np.float64]] | None:
+ scene = self._require_scene()
+ joint_order = self._query_scene_joint_order(scene, config)
+ if joint_order is None:
+ return None
+ lower, upper = scene.getPositionLimitVectors(config.name, False)
+ lower_array = np.asarray(lower, dtype=np.float64)
+ upper_array = np.asarray(upper, dtype=np.float64)
+ if len(joint_order) != len(lower_array) or len(joint_order) != len(upper_array):
+ raise ValueError(
+ "RoboPlan joint limit order length does not match returned limit vectors"
+ )
+ if len(set(joint_order)) != len(joint_order):
+ raise ValueError("RoboPlan returned duplicate joint names for joint limits")
+ if set(joint_order) != set(config.joint_names):
+ raise ValueError(
+ "RoboPlan joint limit names do not match configured joint_names: "
+ f"RoboPlan={joint_order}, configured={config.joint_names}"
+ )
+ order_indices = [joint_order.index(joint_name) for joint_name in config.joint_names]
+ return lower_array[order_indices], upper_array[order_indices]
+
+ def _query_scene_joint_order(
+ self, scene: roboplan_core.Scene, config: RobotModelConfig
+ ) -> list[str] | None:
+ try:
+ group_info = scene.getJointGroupInfo(config.name)
+ except AttributeError:
+ return None
+ return list(group_info.joint_names)
+
+ def _get_robot(self, robot_id: WorldRobotID) -> _RoboPlanRobotData:
+ if robot_id not in self._robots:
+ raise KeyError(f"Robot '{robot_id}' not found")
+ return self._robots[robot_id]
+
+ def _joint_state_to_q(
+ self, robot_id: WorldRobotID, joint_state: JointState
+ ) -> NDArray[np.float64]:
+ robot = self._get_robot(robot_id)
+ if len(joint_state.position) != len(robot.config.joint_names):
+ raise ValueError("JointState position length must match configured joint count")
+ if not joint_state.name:
+ return np.asarray(joint_state.position, dtype=np.float64)
+ name_to_pos = {
+ robot.config.get_urdf_joint_name(name): position
+ for name, position in zip(joint_state.name, joint_state.position, strict=True)
+ }
+ missing = [name for name in robot.config.joint_names if name not in name_to_pos]
+ if missing:
+ raise ValueError(f"JointState missing joints for RoboPlanWorld: {missing}")
+ return np.asarray(
+ [name_to_pos[name] for name in robot.config.joint_names], dtype=np.float64
+ )
+
+ def _require_finalized(self) -> None:
+ if not self._finalized:
+ raise RuntimeError("World must be finalized first")
+
+ def _require_scene(self) -> roboplan_core.Scene:
+ if self._scene is None:
+ raise RuntimeError("RoboPlan scene is not initialized; add a robot first")
+ return self._scene
+
+ def _to_scene_q(self, robot_id: WorldRobotID, q: NDArray[np.float64]) -> NDArray[np.float64]:
+ """Expand DimOS group positions to RoboPlan's full scene vector when available."""
+ scene = self._require_scene()
+ robot = self._get_robot(robot_id)
+ if len(q) != len(robot.config.joint_names):
+ return q
+ return np.asarray(scene.toFullJointPositions(robot.config.name, q), dtype=np.float64)
+
+ def _has_collisions(self, robot_id: WorldRobotID, q: NDArray[np.float64]) -> bool:
+ scene = self._require_scene()
+ scene_q = self._to_scene_q(robot_id, q)
+ return bool(scene.hasCollisions(scene_q))
+
+ def _call_path_collision_checker(
+ self,
+ robot_id: WorldRobotID,
+ q_start: NDArray[np.float64],
+ q_end: NDArray[np.float64],
+ step_size: float,
+ ) -> bool:
+ scene = self._require_scene()
+ scene_q_start = self._to_scene_q(robot_id, q_start)
+ scene_q_end = self._to_scene_q(robot_id, q_end)
+ return bool(
+ roboplan_core.hasCollisionsAlongPath(
+ scene,
+ scene_q_start,
+ scene_q_end,
+ step_size,
+ False,
+ True,
+ )
+ )
+
+ def _add_obstacle_to_scene(self, obstacle: Obstacle, obstacle_id: str) -> None:
+ scene = self._require_scene()
+ matrix = pose_to_matrix(obstacle.pose)
+ color = np.asarray(obstacle.color, dtype=np.float64)
+ if obstacle.obstacle_type == ObstacleType.BOX:
+ self._require_dimensions(obstacle, 3)
+ width, height, depth = obstacle.dimensions
+ scene.addBoxGeometry(
+ obstacle_id,
+ _WORLD_FRAME,
+ roboplan_core.Box(width, height, depth),
+ matrix,
+ color,
+ )
+ return
+ if obstacle.obstacle_type == ObstacleType.SPHERE:
+ self._require_dimensions(obstacle, 1)
+ (radius,) = obstacle.dimensions
+ scene.addSphereGeometry(
+ obstacle_id, _WORLD_FRAME, roboplan_core.Sphere(radius), matrix, color
+ )
+ return
+ if obstacle.obstacle_type == ObstacleType.CYLINDER:
+ self._require_dimensions(obstacle, 2)
+ radius, length = obstacle.dimensions
+ scene.addCylinderGeometry(
+ obstacle_id,
+ _WORLD_FRAME,
+ roboplan_core.Cylinder(radius, length),
+ matrix,
+ color,
+ )
+ return
+ if obstacle.obstacle_type == ObstacleType.MESH:
+ if not obstacle.mesh_path:
+ raise ValueError("MESH obstacle requires mesh_path")
+ scene.addMeshGeometry(
+ obstacle_id,
+ _WORLD_FRAME,
+ roboplan_core.Mesh(obstacle.mesh_path),
+ matrix,
+ color,
+ )
+ return
+ raise ValueError(f"Unsupported obstacle type: {obstacle.obstacle_type}")
+
+ def _require_dimensions(self, obstacle: Obstacle, n_dims: int) -> None:
+ if len(obstacle.dimensions) != n_dims:
+ raise ValueError(
+ f"{obstacle.obstacle_type.name} obstacle requires {n_dims} dimensions, "
+ f"got {len(obstacle.dimensions)}"
+ )
+
+ def _run_native_rrt(
+ self,
+ robot_id: WorldRobotID,
+ q_start: NDArray[np.float64],
+ q_goal: NDArray[np.float64],
+ timeout: float,
+ ) -> list[NDArray[np.float64]]:
+ scene = self._require_scene()
+ robot = self._get_robot(robot_id)
+ options = roboplan_rrt.RRTOptions()
+ options.group_name = robot.config.name
+ options.max_planning_time = timeout
+ options.collision_check_use_bisection = False
+ planner = roboplan_rrt.RRT(scene, options)
+ start_config = self._to_native_joint_configuration(robot_id, q_start)
+ goal_config = self._to_native_joint_configuration(robot_id, q_goal)
+ result = planner.plan(start_config, goal_config)
+ if result is None:
+ raise ValueError("RoboPlan RRT returned no path")
+ return self._extract_native_path(result)
+
+ def _to_native_joint_configuration(
+ self, robot_id: WorldRobotID, q: NDArray[np.float64]
+ ) -> roboplan_core.JointConfiguration:
+ robot = self._get_robot(robot_id)
+ return roboplan_core.JointConfiguration(
+ robot.config.joint_names, np.asarray(q, dtype=np.float64)
+ )
+
+ def _extract_native_path(self, result: roboplan_core.JointPath) -> list[NDArray[np.float64]]:
+ return [np.asarray(q, dtype=np.float64) for q in result.positions]
diff --git a/dimos/manipulation/test_manipulation_unit.py b/dimos/manipulation/test_manipulation_unit.py
index 4c83e9c3ad..bc12bfe994 100644
--- a/dimos/manipulation/test_manipulation_unit.py
+++ b/dimos/manipulation/test_manipulation_unit.py
@@ -260,6 +260,7 @@ def test_kinematics_config_is_passed_to_factory(
planning_initialization.mock_planning_specs.assert_called_once_with(
world=planning_initialization.mock_world,
+ world_backend="drake",
planner_name="rrt_connect",
kinematics_name=None,
kinematics=kinematics,
@@ -279,6 +280,7 @@ def test_legacy_kinematics_name_still_selects_backend(
planning_initialization.mock_planning_specs.assert_called_once_with(
world=planning_initialization.mock_world,
+ world_backend="drake",
planner_name="rrt_connect",
kinematics_name="pink",
kinematics=module.config.kinematics,
diff --git a/dimos/manipulation/test_planning_factory.py b/dimos/manipulation/test_planning_factory.py
new file mode 100644
index 0000000000..e695d9579f
--- /dev/null
+++ b/dimos/manipulation/test_planning_factory.py
@@ -0,0 +1,212 @@
+# Copyright 2025-2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Focused tests for manipulation planning wiring."""
+
+from __future__ import annotations
+
+from collections.abc import Callable, Generator
+from pathlib import Path
+import sys
+from typing import Any
+
+import pytest
+from pytest_mock import MockerFixture
+
+from dimos.manipulation.manipulation_module import ManipulationModule
+from dimos.manipulation.planning.factory import (
+ create_kinematics,
+ create_planner,
+ create_planning_stack,
+ create_world,
+ validate_backend_combination,
+)
+from dimos.manipulation.planning.kinematics.config import JacobianKinematicsConfig
+from dimos.manipulation.planning.kinematics.jacobian_ik import JacobianIK
+from dimos.manipulation.planning.planners.rrt_planner import RRTConnectPlanner
+from dimos.manipulation.planning.spec.config import RobotModelConfig
+from dimos.manipulation.planning.spec.protocols import PlannerSpec
+from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped
+from dimos.msgs.geometry_msgs.Quaternion import Quaternion
+from dimos.msgs.geometry_msgs.Vector3 import Vector3
+
+
+@pytest.fixture
+def make_module() -> Generator[Callable[..., ManipulationModule], None, None]:
+ """Build ManipulationModules and stop them on teardown, even on failure."""
+ modules: list[ManipulationModule] = []
+
+ def _make(**kwargs: Any) -> ManipulationModule:
+ module = ManipulationModule(**kwargs)
+ modules.append(module)
+ return module
+
+ yield _make
+ for module in modules:
+ module.stop()
+
+
+@pytest.fixture
+def robot_config() -> RobotModelConfig:
+ return RobotModelConfig(
+ name="arm",
+ model_path=Path("/path/to/robot.urdf"),
+ base_pose=PoseStamped(position=Vector3(), orientation=Quaternion()), # type: ignore[call-arg]
+ joint_names=["joint1", "joint2"],
+ end_effector_link="tcp",
+ coordinator_task_name="traj_arm",
+ )
+
+
+def test_create_world_unknown_backend() -> None:
+ with pytest.raises(
+ ValueError, match=r"Unknown backend: fake\. Available: \['drake', 'roboplan'\]"
+ ):
+ create_world(backend="fake")
+
+
+def test_factory_selects_expected_implementations() -> None:
+ assert isinstance(create_planner(name="rrt_connect"), RRTConnectPlanner)
+ assert isinstance(create_kinematics(name="jacobian"), JacobianIK)
+
+
+def test_default_planner_path_does_not_import_roboplan(monkeypatch: pytest.MonkeyPatch) -> None:
+ for module_name in list(sys.modules):
+ if module_name == "roboplan" or module_name.startswith("roboplan."):
+ monkeypatch.delitem(sys.modules, module_name, raising=False)
+
+ create_planner(name="rrt_connect")
+ validate_backend_combination()
+
+ assert "roboplan.core" not in sys.modules
+ assert "roboplan.rrt" not in sys.modules
+
+
+def test_validate_backend_combination_rejects_invalid_combinations() -> None:
+ with pytest.raises(
+ ValueError, match='planner_name="roboplan" requires world_backend="roboplan"'
+ ):
+ validate_backend_combination(world_backend="drake", planner_name="roboplan")
+
+ with pytest.raises(
+ ValueError, match='kinematics_name="drake_optimization" requires world_backend="drake"'
+ ):
+ validate_backend_combination(world_backend="roboplan", kinematics_name="drake_optimization")
+
+
+def test_create_planner_uses_roboplan_world_as_native_planner(mocker: MockerFixture) -> None:
+ world = mocker.MagicMock(spec=PlannerSpec)
+
+ assert create_planner(name="roboplan", world=world, world_backend="roboplan") is world
+
+
+def test_create_planner_rejects_roboplan_without_roboplan_world(mocker: MockerFixture) -> None:
+ with pytest.raises(
+ ValueError, match='planner_name="roboplan" requires world_backend="roboplan"'
+ ):
+ create_planner(name="roboplan", world=mocker.MagicMock(), world_backend="drake")
+
+
+def test_create_planning_stack_wires_selected_components(
+ mocker: MockerFixture, robot_config: RobotModelConfig
+) -> None:
+ world = mocker.MagicMock()
+ world.add_robot.return_value = "robot-id"
+
+ kinematics = mocker.MagicMock(name="kinematics")
+ planner = mocker.MagicMock(name="planner")
+
+ mock_world = mocker.patch(
+ "dimos.manipulation.planning.factory.create_world", return_value=world
+ )
+ mock_kinematics = mocker.patch(
+ "dimos.manipulation.planning.factory.create_kinematics",
+ return_value=kinematics,
+ )
+ mock_planner = mocker.patch(
+ "dimos.manipulation.planning.factory.create_planner",
+ return_value=planner,
+ )
+
+ result = create_planning_stack(
+ robot_config,
+ world_backend="drake",
+ planner_name="rrt_connect",
+ kinematics_name="jacobian",
+ )
+
+ assert result == (world, kinematics, planner, "robot-id")
+ mock_world.assert_called_once_with(backend="drake", visualization=None)
+ mock_kinematics.assert_called_once_with(config=JacobianKinematicsConfig())
+ mock_planner.assert_called_once_with(name="rrt_connect", world=world, world_backend="drake")
+ world.add_robot.assert_called_once_with(robot_config)
+ world.finalize.assert_called_once()
+
+
+def test_start_with_no_robots_skips_planning(
+ mocker: MockerFixture, make_module: Callable[..., ManipulationModule]
+) -> None:
+ module = make_module(robots=[])
+ create_world_mock = mocker.patch("dimos.manipulation.manipulation_module.create_world")
+ create_planning_specs_mock = mocker.patch(
+ "dimos.manipulation.manipulation_module.create_planning_specs"
+ )
+
+ module._initialize_planning()
+
+ assert module._robots == {}
+ assert module._world_monitor is None
+ create_world_mock.assert_not_called()
+ create_planning_specs_mock.assert_not_called()
+
+
+def test_start_uses_configured_planner_and_kinematics(
+ mocker: MockerFixture,
+ robot_config: RobotModelConfig,
+ make_module: Callable[..., ManipulationModule],
+) -> None:
+ module = make_module(robots=[robot_config], kinematics=JacobianKinematicsConfig())
+ world = mocker.MagicMock(name="world")
+ world_monitor = mocker.MagicMock()
+ world_monitor.add_robot.return_value = "robot-id"
+ planner = mocker.MagicMock(name="planner")
+ kinematics = mocker.MagicMock(name="kinematics")
+ planning_specs = mocker.MagicMock(
+ world_monitor=world_monitor,
+ planner=planner,
+ kinematics=kinematics,
+ )
+ create_world_mock = mocker.patch(
+ "dimos.manipulation.manipulation_module.create_world", return_value=world
+ )
+ create_planning_specs_mock = mocker.patch(
+ "dimos.manipulation.manipulation_module.create_planning_specs",
+ return_value=planning_specs,
+ )
+
+ module._initialize_planning()
+
+ create_world_mock.assert_called_once_with(
+ backend="drake", visualization=module.config.visualization
+ )
+ create_planning_specs_mock.assert_called_once_with(
+ world=world,
+ world_backend="drake",
+ planner_name="rrt_connect",
+ kinematics_name=None,
+ kinematics=module.config.kinematics,
+ )
+ assert module._planner is planner
+ assert module._kinematics is kinematics
+ assert module._robots["arm"][0] == "robot-id"
diff --git a/dimos/manipulation/test_roboplan_world.py b/dimos/manipulation/test_roboplan_world.py
new file mode 100644
index 0000000000..9457c4264c
--- /dev/null
+++ b/dimos/manipulation/test_roboplan_world.py
@@ -0,0 +1,549 @@
+# Copyright 2025-2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Pure-Python tests for the optional RoboPlan world adapter."""
+
+from __future__ import annotations
+
+import importlib
+from pathlib import Path
+import sys
+from types import ModuleType
+from typing import Any, ClassVar
+
+import numpy as np
+import pytest
+
+from dimos.manipulation.planning.spec.config import RobotModelConfig
+from dimos.manipulation.planning.spec.enums import ObstacleType, PlanningStatus
+from dimos.manipulation.planning.spec.models import Obstacle
+from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped
+from dimos.msgs.geometry_msgs.Quaternion import Quaternion
+from dimos.msgs.geometry_msgs.Vector3 import Vector3
+from dimos.msgs.sensor_msgs.JointState import JointState
+from dimos.utils.transform_utils import pose_to_matrix
+
+
+class FakeJointConfiguration:
+ def __init__(
+ self, joint_names: list[str] | None = None, positions: np.ndarray | None = None
+ ) -> None:
+ self.joint_names = joint_names or []
+ self.positions = np.asarray(positions if positions is not None else [], dtype=np.float64)
+
+
+class FakeJointPath:
+ def __init__(self, joint_names: list[str], positions: list[np.ndarray]) -> None:
+ self.joint_names = joint_names
+ self.positions = positions
+
+
+class FakeBox:
+ def __init__(self, x: float, y: float, z: float) -> None:
+ self.dimensions = (x, y, z)
+
+
+class FakeSphere:
+ def __init__(self, radius: float) -> None:
+ self.radius = radius
+
+
+class FakeCylinder:
+ def __init__(self, radius: float, length: float) -> None:
+ self.radius = radius
+ self.length = length
+
+
+class FakeMesh:
+ def __init__(self, filename: str) -> None:
+ self.filename = filename
+
+
+class FakeJointGroupInfo:
+ def __init__(self, joint_names: list[str]) -> None:
+ self.joint_names = joint_names
+
+
+class FakeScene:
+ joint_group_joint_names: ClassVar[list[str]] = ["joint1", "joint2"]
+ position_limits_lower: ClassVar[list[float]] = [-1.0, -2.0]
+ position_limits_upper: ClassVar[list[float]] = [1.0, 2.0]
+
+ def __init__(self, *args: Any) -> None:
+ self.constructor_args = args
+ self.models: list[tuple[str, str, dict[str, str]]] = []
+ self.geometry: dict[str, np.ndarray] = {}
+ self.collision_settings: dict[tuple[str, str], bool] = {}
+
+ def addRobotModel(self, path: str, name: str, package_paths: dict[str, str]) -> str:
+ self.models.append((path, name, package_paths))
+ return name
+
+ def hasCollisions(self, q: np.ndarray) -> bool:
+ return bool(np.any(np.asarray(q) > 0.9))
+
+ def getPositionLimitVectors(
+ self, group_name: str = "", collapsed: bool = False
+ ) -> tuple[np.ndarray, np.ndarray]:
+ _ = (group_name, collapsed)
+ return np.asarray(self.position_limits_lower), np.asarray(self.position_limits_upper)
+
+ def getJointGroupInfo(self, name: str) -> FakeJointGroupInfo:
+ _ = name
+ return FakeJointGroupInfo(self.joint_group_joint_names)
+
+ def toFullJointPositions(self, group_name: str, q: np.ndarray) -> np.ndarray:
+ _ = group_name
+ return q
+
+ def addBoxGeometry(
+ self,
+ obstacle_id: str,
+ parent_frame: str,
+ box: FakeBox,
+ matrix: np.ndarray,
+ color: np.ndarray,
+ ) -> None:
+ _ = (parent_frame, box, color)
+ self.geometry[obstacle_id] = matrix
+
+ def addSphereGeometry(
+ self,
+ obstacle_id: str,
+ parent_frame: str,
+ sphere: FakeSphere,
+ matrix: np.ndarray,
+ color: np.ndarray,
+ ) -> None:
+ _ = (parent_frame, sphere, color)
+ self.geometry[obstacle_id] = matrix
+
+ def addCylinderGeometry(
+ self,
+ obstacle_id: str,
+ parent_frame: str,
+ cylinder: FakeCylinder,
+ matrix: np.ndarray,
+ color: np.ndarray,
+ ) -> None:
+ _ = (parent_frame, cylinder, color)
+ self.geometry[obstacle_id] = matrix
+
+ def addMeshGeometry(
+ self,
+ obstacle_id: str,
+ parent_frame: str,
+ mesh: FakeMesh,
+ matrix: np.ndarray,
+ color: np.ndarray,
+ ) -> None:
+ _ = (parent_frame, mesh, color)
+ self.geometry[obstacle_id] = matrix
+
+ def updateGeometryPlacement(
+ self, obstacle_id: str, parent_frame: str, matrix: np.ndarray
+ ) -> None:
+ _ = parent_frame
+ self.geometry[obstacle_id] = matrix
+
+ def removeGeometry(self, obstacle_id: str) -> None:
+ del self.geometry[obstacle_id]
+
+ def setCollisions(self, body1: str, body2: str, enable: bool) -> None:
+ self.collision_settings[(body1, body2)] = enable
+
+ def forwardKinematics(self, q: np.ndarray, frame_name: str, base_frame: str = "") -> np.ndarray:
+ _ = (frame_name, base_frame)
+ mat = np.eye(4)
+ mat[0, 3] = float(np.sum(q))
+ return mat
+
+ def computeFrameJacobian(
+ self, q: np.ndarray, frame_name: str, local: bool = True
+ ) -> np.ndarray:
+ _ = (q, frame_name, local)
+ return np.ones((6, 2))
+
+
+class FakeRRTOptions:
+ def __init__(self) -> None:
+ self.timeout = 0.0
+ self.max_time = 0.0
+ self.collision_check_use_bisection = True
+
+
+class FakeRRT:
+ def __init__(self, scene: FakeScene, options: FakeRRTOptions) -> None:
+ self.scene = scene
+ self.options = options
+
+ def plan(
+ self, q_start: FakeJointConfiguration, q_goal: FakeJointConfiguration
+ ) -> FakeJointPath:
+ assert isinstance(q_start, FakeJointConfiguration)
+ assert isinstance(q_goal, FakeJointConfiguration)
+ midpoint = (np.asarray(q_start.positions) + np.asarray(q_goal.positions)) / 2.0
+ return FakeJointPath(
+ q_start.joint_names,
+ [np.asarray(q_start.positions), midpoint, np.asarray(q_goal.positions)],
+ )
+
+
+def _install_fake_roboplan(monkeypatch: pytest.MonkeyPatch) -> None:
+ roboplan_pkg = ModuleType("roboplan")
+ roboplan_pkg.__path__ = [] # type: ignore[attr-defined]
+ core = ModuleType("roboplan.core")
+ core.Scene = FakeScene # type: ignore[attr-defined]
+ core.JointConfiguration = FakeJointConfiguration # type: ignore[attr-defined]
+ core.JointPath = FakeJointPath # type: ignore[attr-defined]
+ core.Box = FakeBox # type: ignore[attr-defined]
+ core.Sphere = FakeSphere # type: ignore[attr-defined]
+ core.Cylinder = FakeCylinder # type: ignore[attr-defined]
+ core.Mesh = FakeMesh # type: ignore[attr-defined]
+
+ def has_collisions_along_path(
+ scene: FakeScene,
+ q_start: np.ndarray,
+ q_end: np.ndarray,
+ max_step_size: float,
+ bisection: bool = False,
+ check_endpoints: bool = True,
+ ) -> bool:
+ _ = (scene, max_step_size, bisection, check_endpoints)
+ for t in np.linspace(0.0, 1.0, 5):
+ if scene.hasCollisions(q_start + t * (q_end - q_start)):
+ return True
+ return False
+
+ core.hasCollisionsAlongPath = has_collisions_along_path # type: ignore[attr-defined]
+
+ rrt = ModuleType("roboplan.rrt")
+ rrt.RRTOptions = FakeRRTOptions # type: ignore[attr-defined]
+ rrt.RRT = FakeRRT # type: ignore[attr-defined]
+
+ monkeypatch.setitem(sys.modules, "roboplan", roboplan_pkg)
+ monkeypatch.setitem(sys.modules, "roboplan.core", core)
+ monkeypatch.setitem(sys.modules, "roboplan.rrt", rrt)
+
+
+@pytest.fixture
+def fake_roboplan(monkeypatch: pytest.MonkeyPatch) -> None:
+ _install_fake_roboplan(monkeypatch)
+
+
+@pytest.fixture
+def robot_config(tmp_path: Path) -> RobotModelConfig:
+ model_path = tmp_path / "robot.urdf"
+ model_path.write_text(
+ """
+
+
+
+
+
+
+
+
+
+ """
+ )
+ return RobotModelConfig(
+ name="arm",
+ model_path=model_path,
+ base_pose=PoseStamped(position=Vector3(), orientation=Quaternion()), # type: ignore[call-arg]
+ joint_names=["joint1", "joint2"],
+ end_effector_link="tcp",
+ joint_limits_lower=[-1.0, -2.0],
+ joint_limits_upper=[1.0, 2.0],
+ )
+
+
+def _make_world(fake_roboplan: None, robot_config: RobotModelConfig) -> tuple[Any, str]:
+ module = _import_roboplan_world(fake_roboplan)
+
+ world = module.RoboPlanWorld()
+ robot_id = world.add_robot(robot_config)
+ return world, robot_id
+
+
+def _import_roboplan_world(fake_roboplan: None) -> ModuleType:
+ _ = fake_roboplan
+ module_name = "dimos.manipulation.planning.world.roboplan_world"
+ if module_name in sys.modules:
+ return importlib.reload(sys.modules[module_name])
+ return importlib.import_module(module_name)
+
+
+def test_roboplan_bindings_are_imported_at_module_load(fake_roboplan: None) -> None:
+ module = _import_roboplan_world(fake_roboplan)
+
+ assert module.roboplan_core.Scene is FakeScene
+ assert module.roboplan_rrt.RRT is FakeRRT
+
+
+def test_robot_registration_finalization_and_joint_limits(
+ fake_roboplan: None, robot_config: RobotModelConfig
+) -> None:
+ world, robot_id = _make_world(fake_roboplan, robot_config)
+
+ assert world.get_robot_ids() == [robot_id]
+ assert world.get_robot_config(robot_id) is robot_config
+ assert world._scene.constructor_args[0] == "arm"
+ assert Path(world._scene.constructor_args[1]).suffix == ".urdf"
+ assert Path(world._scene.constructor_args[2]).suffix == ".srdf"
+ assert (
+ 'disable_collisions link1="base" link2="link1"'
+ in Path(world._scene.constructor_args[2]).read_text()
+ )
+ lower, upper = world.get_joint_limits(robot_id)
+ np.testing.assert_allclose(lower, [-1.0, -2.0])
+ np.testing.assert_allclose(upper, [1.0, 2.0])
+
+ world.finalize()
+ assert world.is_finalized
+
+
+def test_scene_joint_limits_are_reordered_to_configured_joint_order(
+ fake_roboplan: None, robot_config: RobotModelConfig, monkeypatch: pytest.MonkeyPatch
+) -> None:
+ config = robot_config.model_copy(
+ update={"joint_limits_lower": None, "joint_limits_upper": None}
+ )
+ monkeypatch.setattr(FakeScene, "joint_group_joint_names", ["joint2", "joint1"])
+ monkeypatch.setattr(FakeScene, "position_limits_lower", [-2.0, -1.0])
+ monkeypatch.setattr(FakeScene, "position_limits_upper", [2.0, 1.0])
+
+ world, robot_id = _make_world(fake_roboplan, config)
+
+ lower, upper = world.get_joint_limits(robot_id)
+ np.testing.assert_allclose(lower, [-1.0, -2.0])
+ np.testing.assert_allclose(upper, [1.0, 2.0])
+
+
+def test_scene_joint_limits_validate_joint_names(
+ fake_roboplan: None, robot_config: RobotModelConfig, monkeypatch: pytest.MonkeyPatch
+) -> None:
+ config = robot_config.model_copy(
+ update={"joint_limits_lower": None, "joint_limits_upper": None}
+ )
+ monkeypatch.setattr(FakeScene, "joint_group_joint_names", ["joint2", "extra_joint"])
+
+ with pytest.raises(ValueError, match="joint limit names do not match"):
+ _make_world(fake_roboplan, config)
+
+
+def test_context_cloning_and_joint_state_round_trip(
+ fake_roboplan: None, robot_config: RobotModelConfig
+) -> None:
+ world, robot_id = _make_world(fake_roboplan, robot_config)
+ world.finalize()
+
+ live_state = JointState(name=["joint1", "joint2"], position=[0.1, 0.2])
+ world.sync_from_joint_state(robot_id, live_state)
+
+ with world.scratch_context() as scratch:
+ scratch_state = world.get_joint_state(scratch, robot_id)
+ assert scratch_state.name == ["joint1", "joint2"]
+ assert scratch_state.position == [0.1, 0.2]
+ world.set_joint_state(
+ scratch, robot_id, JointState(name=["joint1", "joint2"], position=[0.3, 0.4])
+ )
+
+ live_round_trip = world.get_joint_state(world.get_live_context(), robot_id)
+ assert live_round_trip.position == [0.1, 0.2]
+
+
+def test_joint_name_mapping_is_applied_to_input_states(
+ fake_roboplan: None, robot_config: RobotModelConfig
+) -> None:
+ robot_config.joint_name_mapping = {"arm/j1": "joint1", "arm/j2": "joint2"}
+ world, robot_id = _make_world(fake_roboplan, robot_config)
+ world.finalize()
+
+ world.sync_from_joint_state(
+ robot_id, JointState(name=["arm/j1", "arm/j2"], position=[0.2, 0.3])
+ )
+
+ live_round_trip = world.get_joint_state(world.get_live_context(), robot_id)
+ assert live_round_trip.position == [0.2, 0.3]
+
+
+def test_obstacle_mutation_updates_scene_and_stored_pose(
+ fake_roboplan: None, robot_config: RobotModelConfig
+) -> None:
+ world, _ = _make_world(fake_roboplan, robot_config)
+ world.finalize()
+
+ obstacle = Obstacle(
+ name="box",
+ obstacle_type=ObstacleType.BOX,
+ pose=PoseStamped(position=Vector3(), orientation=Quaternion()), # type: ignore[call-arg]
+ dimensions=(0.1, 0.2, 0.3),
+ )
+ assert world.add_obstacle(obstacle) == "box"
+ assert "box" in world._scene.geometry
+ updated_pose = PoseStamped(position=Vector3(1, 0, 0), orientation=Quaternion()) # type: ignore[call-arg]
+ assert world.update_obstacle_pose(
+ "box",
+ updated_pose,
+ )
+ assert world.get_obstacles()[0].pose is updated_pose
+ np.testing.assert_allclose(world._scene.geometry["box"], pose_to_matrix(updated_pose))
+ assert world.remove_obstacle("box")
+ assert world.get_obstacles() == []
+
+
+def test_collision_config_and_edge_checks(
+ fake_roboplan: None, robot_config: RobotModelConfig
+) -> None:
+ world, robot_id = _make_world(fake_roboplan, robot_config)
+ world.finalize()
+
+ safe = JointState(name=["joint1", "joint2"], position=[0.1, 0.2])
+ colliding = JointState(name=["joint1", "joint2"], position=[0.95, 0.2])
+
+ assert world.check_config_collision_free(robot_id, safe)
+ assert not world.check_config_collision_free(robot_id, colliding)
+ assert not world.check_edge_collision_free(robot_id, safe, colliding, step_size=0.05)
+
+
+def test_collision_check_uses_scene_queries(
+ fake_roboplan: None, robot_config: RobotModelConfig
+) -> None:
+ world, robot_id = _make_world(fake_roboplan, robot_config)
+ world.finalize()
+
+ safe = JointState(name=["joint1", "joint2"], position=[0.1, 0.2])
+ colliding = JointState(name=["joint1", "joint2"], position=[0.95, 0.2])
+
+ assert world.check_config_collision_free(robot_id, safe)
+ assert not world.check_config_collision_free(robot_id, colliding)
+
+
+def test_generic_rrt_planner_uses_roboplan_world_collision_checks(
+ fake_roboplan: None, robot_config: RobotModelConfig
+) -> None:
+ from dimos.manipulation.planning.planners.rrt_planner import RRTConnectPlanner
+
+ world, robot_id = _make_world(fake_roboplan, robot_config)
+ world.finalize()
+ planner = RRTConnectPlanner(step_size=0.5, connect_step_size=0.5, goal_tolerance=10.0)
+
+ start = JointState(name=["joint1", "joint2"], position=[0.0, 0.0])
+ goal = JointState(name=["joint1", "joint2"], position=[0.2, 0.1])
+ result = planner.plan_joint_path(world, robot_id, start, goal, timeout=1.0, max_iterations=3)
+
+ assert result.status == PlanningStatus.SUCCESS
+ assert len(result.path) >= 2
+
+
+def test_fk_jacobian_and_explicit_min_distance_unsupported(
+ fake_roboplan: None, robot_config: RobotModelConfig
+) -> None:
+ world, robot_id = _make_world(fake_roboplan, robot_config)
+ world.finalize()
+ ctx = world.get_live_context()
+ world.set_joint_state(
+ ctx, robot_id, JointState(name=["joint1", "joint2"], position=[0.25, 0.5])
+ )
+
+ pose = world.get_ee_pose(ctx, robot_id)
+ assert pose.position.x == pytest.approx(0.75)
+ assert world.get_jacobian(ctx, robot_id).shape == (6, 2)
+ with pytest.raises(NotImplementedError, match="get_min_distance"):
+ world.get_min_distance(ctx, robot_id)
+
+
+def test_native_planner_converts_path(fake_roboplan: None, robot_config: RobotModelConfig) -> None:
+ world, robot_id = _make_world(fake_roboplan, robot_config)
+ world.finalize()
+
+ start = JointState(name=["joint1", "joint2"], position=[0.0, 0.0])
+ goal = JointState(name=["joint1", "joint2"], position=[0.4, 0.2])
+ result = world.plan_joint_path(world, robot_id, start, goal, timeout=1.0)
+
+ assert result.status == PlanningStatus.SUCCESS
+ assert [state.position for state in result.path] == [[0.0, 0.0], [0.2, 0.1], [0.4, 0.2]]
+ assert [state.name for state in result.path] == [["joint1", "joint2"]] * 3
+
+
+def test_native_planner_names_path_from_robot_config_when_start_is_unnamed(
+ fake_roboplan: None, robot_config: RobotModelConfig
+) -> None:
+ world, robot_id = _make_world(fake_roboplan, robot_config)
+ world.finalize()
+
+ start = JointState(name=[], position=[0.0, 0.0])
+ goal = JointState(name=["joint1", "joint2"], position=[0.4, 0.2])
+ result = world.plan_joint_path(world, robot_id, start, goal, timeout=1.0)
+
+ assert result.status == PlanningStatus.SUCCESS
+ assert [state.name for state in result.path] == [["joint1", "joint2"]] * 3
+
+
+def test_native_planner_rejects_empty_path(
+ fake_roboplan: None, robot_config: RobotModelConfig, monkeypatch: pytest.MonkeyPatch
+) -> None:
+ class EmptyPathRRT(FakeRRT):
+ def plan(
+ self, q_start: FakeJointConfiguration, q_goal: FakeJointConfiguration
+ ) -> FakeJointPath:
+ _ = (q_start, q_goal)
+ return FakeJointPath(["joint1", "joint2"], [])
+
+ monkeypatch.setattr(sys.modules["roboplan.rrt"], "RRT", EmptyPathRRT)
+ world, robot_id = _make_world(fake_roboplan, robot_config)
+ world.finalize()
+
+ start = JointState(name=["joint1", "joint2"], position=[0.0, 0.0])
+ goal = JointState(name=["joint1", "joint2"], position=[0.4, 0.2])
+ result = world.plan_joint_path(world, robot_id, start, goal, timeout=1.0)
+
+ assert result.status == PlanningStatus.NO_SOLUTION
+ assert result.path == []
+ assert "empty path" in result.message
+
+
+def test_collision_exclusion_pairs_are_written_to_generated_srdf(
+ fake_roboplan: None, robot_config: RobotModelConfig
+) -> None:
+ robot_config.collision_exclusion_pairs = [("a", "b")]
+ world, _ = _make_world(fake_roboplan, robot_config)
+
+ srdf_path = Path(world._scene.constructor_args[2])
+ assert 'disable_collisions link1="a" link2="b"' in srdf_path.read_text()
+
+
+def test_generated_srdf_uses_scoped_temp_directory(
+ fake_roboplan: None, robot_config: RobotModelConfig
+) -> None:
+ world, _ = _make_world(fake_roboplan, robot_config)
+
+ srdf_path = Path(world._scene.constructor_args[2])
+ assert srdf_path.parent.name.startswith("dimos_roboplan_srdf_")
+ assert srdf_path.exists()
+ assert world._srdf_tempdirs
+
+
+def test_unsupported_base_pose_fails_before_planning(
+ fake_roboplan: None, robot_config: RobotModelConfig
+) -> None:
+ from dimos.manipulation.planning.world.roboplan_world import RoboPlanWorld
+
+ robot_config.base_pose = PoseStamped( # type: ignore[call-arg]
+ position=Vector3(1, 0, 0), orientation=Quaternion()
+ )
+ world = RoboPlanWorld()
+ with pytest.raises(ValueError, match="base_pose"):
+ world.add_robot(robot_config)
diff --git a/dimos/manipulation/visualization/viser/config.py b/dimos/manipulation/visualization/viser/config.py
index c7e66e4de1..36ce7d06ea 100644
--- a/dimos/manipulation/visualization/viser/config.py
+++ b/dimos/manipulation/visualization/viser/config.py
@@ -54,7 +54,6 @@ class ViserVisualizationConfig(BaseModel):
default=0.02,
validation_alias=AliasChoices("current_match_tolerance", "viser_current_match_tolerance"),
)
- allow_plan_execute: bool = False
@property
def requires_world_visualization(self) -> bool:
diff --git a/dimos/manipulation/visualization/viser/gui.py b/dimos/manipulation/visualization/viser/gui.py
index 0434228583..2c0abac52d 100644
--- a/dimos/manipulation/visualization/viser/gui.py
+++ b/dimos/manipulation/visualization/viser/gui.py
@@ -531,10 +531,7 @@ def _update_control_state(self) -> None:
self._set_disabled("preview", not self.state.can_preview())
self._set_disabled(
"execute",
- not (
- self.config.allow_plan_execute
- and self.state.can_execute(self.config.current_match_tolerance)
- ),
+ not self.state.can_execute(self.config.current_match_tolerance),
)
self._set_disabled("cancel", not self.state.can_cancel())
self._update_target_visual_state()
@@ -627,11 +624,6 @@ def _submit_execute(self) -> None:
robot_name = self.state.selected_robot
if robot_name is None:
return
- if not self.config.allow_plan_execute:
- self._set_recoverable_error(
- "Panel execution disabled; set allow_plan_execute=True to enable"
- )
- return
if not self.state.can_execute(self.config.current_match_tolerance):
self._set_recoverable_error(
"Cannot execute: require feasible fresh plan and matching current joints"
diff --git a/dimos/manipulation/visualization/viser/test_operation_worker.py b/dimos/manipulation/visualization/viser/test_operation_worker.py
index ee1d5c29c0..7b7724ce2d 100644
--- a/dimos/manipulation/visualization/viser/test_operation_worker.py
+++ b/dimos/manipulation/visualization/viser/test_operation_worker.py
@@ -277,7 +277,10 @@ def test_gui_cancelled_planning_clears_active_plan_state(monkeypatch: pytest.Mon
[
("_submit_plan", "Cannot plan until target is feasible and manipulation is idle"),
("_submit_preview", "No fresh plan to preview"),
- ("_submit_execute", "Panel execution disabled; set allow_plan_execute=True to enable"),
+ (
+ "_submit_execute",
+ "Cannot execute: require feasible fresh plan and matching current joints",
+ ),
],
)
def test_gui_guard_errors_keep_action_idle(
diff --git a/dimos/manipulation/visualization/viser/test_viser_visualization.py b/dimos/manipulation/visualization/viser/test_viser_visualization.py
index 3be6162a67..2994158022 100644
--- a/dimos/manipulation/visualization/viser/test_viser_visualization.py
+++ b/dimos/manipulation/visualization/viser/test_viser_visualization.py
@@ -1157,7 +1157,7 @@ def test_gui_preset_dropdown_and_controls_include_init_home_current_and_callback
gui._apply_preset("Current")
assert [gui._joint_sliders[name].value for name in ("j1", "j2")] == [0.25, 0.5]
gui._submit_execute()
- assert gui.state.error == "Panel execution disabled; set allow_plan_execute=True to enable"
+ assert "Cannot execute" in gui.state.error
def test_gui_rebuilding_joint_sliders_removes_stale_viser_handles(
@@ -1205,7 +1205,7 @@ def test_gui_parses_numpy_transform_control_arrays() -> None:
assert list(pose.orientation) == [0.1, 0.2, 0.3, 0.5]
-def test_panel_execution_is_gated_by_default_and_refresh_updates_robot_controls(
+def test_panel_execution_requires_fresh_plan_and_refresh_updates_robot_controls(
make_panel: Callable[..., ViserPanelGui],
) -> None:
current = FakeJointState(["j1"], position=[1.2])
@@ -1234,7 +1234,7 @@ def test_panel_execution_is_gated_by_default_and_refresh_updates_robot_controls(
assert gui._joint_sliders["j1"].value == 0.5
gui._submit_execute()
- assert "Panel execution disabled" in gui.state.error
+ assert "Cannot execute" in gui.state.error
def test_gui_moves_joint_target_immediately_and_stores_evaluated_joint_solution(
@@ -1418,9 +1418,7 @@ def test_gui_safe_execute_requires_fresh_matching_plan_and_clear_resets_path(
gui = make_panel(
FakeGuiServer(),
adapter,
- ViserVisualizationConfig(
- panel_enabled=True, allow_plan_execute=True, current_match_tolerance=0.05
- ),
+ ViserVisualizationConfig(panel_enabled=True, current_match_tolerance=0.05),
)
gui._operation_worker.stop()
monkeypatch.setattr(
diff --git a/dimos/robot/cli/dimos.py b/dimos/robot/cli/dimos.py
index e5b1ce1bcf..74857f2556 100644
--- a/dimos/robot/cli/dimos.py
+++ b/dimos/robot/cli/dimos.py
@@ -26,7 +26,6 @@
import types
from typing import TYPE_CHECKING, Any, Literal, Union, cast, get_args, get_origin
-import click
from dotenv import load_dotenv
from pydantic import BaseModel
from pydantic_core import PydanticUndefined
@@ -453,28 +452,30 @@ def mcp_list_tools() -> None:
typer.echo(json.dumps(tools, indent=2))
-class _KeyValueType(click.ParamType):
- """Parse KEY=VALUE arguments, auto-converting JSON values."""
+def _parse_key_value_arg(value: str) -> tuple[str, Any]:
+ """Parse a KEY=VALUE argument, auto-converting JSON values."""
+ if "=" not in value:
+ raise ValueError(f"expected KEY=VALUE, got: {value}")
+ key, val = value.split("=", 1)
+ try:
+ return (key, json.loads(val))
+ except (json.JSONDecodeError, ValueError):
+ return (key, val)
- name = "KEY=VALUE"
- def convert(
- self, value: str, param: click.Parameter | None, ctx: click.Context | None
- ) -> tuple[str, Any]:
+def _validate_key_value_args(values: list[str]) -> list[str]:
+ """Validate KEY=VALUE arguments during CLI parsing."""
+ for value in values:
if "=" not in value:
- self.fail(f"expected KEY=VALUE, got: {value}", param, ctx)
- key, val = value.split("=", 1)
- try:
- return (key, json.loads(val))
- except (json.JSONDecodeError, ValueError):
- return (key, val)
+ raise typer.BadParameter(f"expected KEY=VALUE, got: {value}")
+ return values
@mcp_app.command("call")
def mcp_call_tool(
tool_name: str = typer.Argument(..., help="Tool name to call"),
args: list[str] = typer.Option(
- [], "--arg", "-a", click_type=_KeyValueType(), help="Arguments as key=value"
+ [], "--arg", "-a", callback=_validate_key_value_args, help="Arguments as key=value"
),
json_args: str = typer.Option("", "--json-args", "-j", help="Arguments as JSON string"),
) -> None:
@@ -487,8 +488,11 @@ def mcp_call_tool(
typer.echo(f"Error: invalid JSON in --json-args: {e}", err=True)
raise typer.Exit(1)
else:
- # _KeyValueType.convert() returns (key, val) tuples at runtime
- arguments = dict(args) # type: ignore[arg-type]
+ try:
+ arguments = dict(_parse_key_value_arg(arg) for arg in args)
+ except ValueError as e:
+ typer.echo(f"Error: invalid --arg: {e}", err=True)
+ raise typer.Exit(1)
try:
result = _get_adapter().call_tool(tool_name, arguments)
diff --git a/dimos/robot/manipulators/common/blueprints.py b/dimos/robot/manipulators/common/blueprints.py
index 1f85998a67..4b8552b1a3 100644
--- a/dimos/robot/manipulators/common/blueprints.py
+++ b/dimos/robot/manipulators/common/blueprints.py
@@ -114,12 +114,14 @@ def planner(
visualization: dict[str, Any] | None = None,
**kwargs: Any,
) -> Blueprint:
- return ManipulationModule.blueprint(
- robots=list(robots),
- planning_timeout=planning_timeout,
- visualization=visualization or {"backend": "meshcat"},
+ module_kwargs: dict[str, Any] = {
+ "robots": list(robots),
+ "planning_timeout": planning_timeout,
**kwargs,
- )
+ }
+ if visualization is not None:
+ module_kwargs["visualization"] = visualization
+ return ManipulationModule.blueprint(**module_kwargs)
def default_trajectory_task_name(hardware_id: str) -> str:
diff --git a/dimos/robot/manipulators/test_blueprints.py b/dimos/robot/manipulators/test_blueprints.py
new file mode 100644
index 0000000000..765926f8ac
--- /dev/null
+++ b/dimos/robot/manipulators/test_blueprints.py
@@ -0,0 +1,60 @@
+# Copyright 2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Any
+
+from dimos.core.coordination.blueprints import Blueprint
+from dimos.manipulation.manipulation_module import ManipulationModule, ManipulationModuleConfig
+from dimos.manipulation.visualization.config import NoManipulationVisualizationConfig
+from dimos.robot.manipulators.common.blueprints import planner
+from dimos.robot.manipulators.xarm.blueprints.basic import (
+ dual_xarm6_planner,
+ xarm6_planner_only,
+ xarm7_planner_coordinator,
+)
+from dimos.robot.manipulators.xarm.config import make_xarm7_model_config
+
+
+def _manipulation_kwargs(blueprint: Blueprint) -> dict[str, Any]:
+ return next(atom.kwargs for atom in blueprint.blueprints if atom.module is ManipulationModule)
+
+
+def _manipulation_config(blueprint: Blueprint) -> ManipulationModuleConfig:
+ return ManipulationModuleConfig(**_manipulation_kwargs(blueprint))
+
+
+def test_planner_helper_defaults_to_no_visualization() -> None:
+ blueprint = planner(robots=[make_xarm7_model_config(name="arm", add_gripper=True)])
+
+ kwargs = _manipulation_kwargs(blueprint)
+ config = ManipulationModuleConfig(**kwargs)
+
+ assert "visualization" not in kwargs
+ assert isinstance(config.visualization, NoManipulationVisualizationConfig)
+
+
+def test_planner_helper_preserves_explicit_visualization() -> None:
+ blueprint = planner(
+ robots=[make_xarm7_model_config(name="arm", add_gripper=True)],
+ visualization={"backend": "meshcat"},
+ )
+
+ assert _manipulation_kwargs(blueprint)["visualization"] == {"backend": "meshcat"}
+
+
+def test_xarm_planner_blueprints_default_to_no_visualization() -> None:
+ for blueprint in (xarm6_planner_only, dual_xarm6_planner, xarm7_planner_coordinator):
+ config = _manipulation_config(blueprint)
+
+ assert isinstance(config.visualization, NoManipulationVisualizationConfig)
diff --git a/dimos/robot/manipulators/xarm/blueprints/basic.py b/dimos/robot/manipulators/xarm/blueprints/basic.py
index 4f1660342e..f66d50079b 100644
--- a/dimos/robot/manipulators/xarm/blueprints/basic.py
+++ b/dimos/robot/manipulators/xarm/blueprints/basic.py
@@ -33,7 +33,6 @@
xarm6_planner_only = ManipulationModule.blueprint(
robots=[make_xarm6_model_config(name="arm")],
planning_timeout=10.0,
- visualization={"backend": "meshcat"},
)
dual_xarm6_planner = ManipulationModule.blueprint(
@@ -42,7 +41,6 @@
make_xarm6_model_config(name="right_arm", y_offset=-0.5),
],
planning_timeout=10.0,
- visualization={"backend": "meshcat"},
)
_xarm7_hw = xarm7_hardware("arm", gripper=True, mock_without_address=True)
diff --git a/docs/capabilities/manipulation/readme.md b/docs/capabilities/manipulation/readme.md
index e502947699..2d0aba6afb 100644
--- a/docs/capabilities/manipulation/readme.md
+++ b/docs/capabilities/manipulation/readme.md
@@ -1,6 +1,8 @@
# Manipulation
-Motion planning and teleoperation for robotic manipulators. Uses Drake for physics simulation and optional Meshcat or Viser planning visualization.
+Motion planning and teleoperation for robotic manipulators. Drake remains the default
+world backend, RoboPlan is available as an optional planning backend, and
+manipulation visualization supports Meshcat or Viser.
## Quick Start
@@ -76,6 +78,67 @@ preview() # Preview in Meshcat
execute() # Execute via coordinator
```
+### Planning backend selection
+
+Manipulation planning separates the world backend from the planner algorithm:
+
+- `world_backend` selects the robot/world/collision representation.
+- `planner_name` selects the path-planning algorithm.
+- `kinematics.backend` selects the IK backend. The legacy `kinematics_name`
+ field remains available as a compatibility shim.
+
+Drake remains the default:
+
+```bash
+dimos run xarm7-planner-coordinator
+```
+
+RoboPlan is available as an optional backend for evaluating a non-Drake world
+implementation. Select it explicitly with module options:
+
+```bash
+dimos run xarm7-planner-coordinator \
+ -o manipulationmodule.world_backend=roboplan \
+ -o manipulationmodule.planner_name=rrt_connect
+```
+
+Valid combinations:
+
+| `world_backend` | `planner_name` | `kinematics.backend` | Status |
+|-----------------|----------------|-------------------|--------|
+| `drake` | `rrt_connect` | `pink` | Default path |
+| `drake` | `rrt_connect` | `jacobian` | Legacy Jacobian IK |
+| `drake` | `rrt_connect` | `drake_optimization` | Drake-only IK |
+| `roboplan` | `rrt_connect` | `pink` or `jacobian` | Generic RRT over RoboPlan collision checks |
+| `roboplan` | `roboplan` | `pink` or `jacobian` | RoboPlan-native planner, using the RoboPlan world object |
+
+Invalid combinations fail during startup instead of waiting for the first plan
+request. For example, `planner_name=roboplan` requires
+`world_backend=roboplan`, and `kinematics.backend=drake_optimization` requires
+`world_backend=drake`.
+
+Install the manipulation dependencies:
+
+```bash
+uv sync --extra manipulation --inexact
+```
+
+The `manipulation` extra includes RoboPlan via `roboplan` from PyPI.
+The `--inexact` flag preserves other extras already installed in your current
+environment.
+
+Safety behavior for unsupported RoboPlan features:
+
+- Planning-critical unsupported inputs fail loudly before planning. Examples
+ include unsupported obstacle geometry, unavailable robot loading APIs, or
+ unavailable collision query APIs. RoboPlan worlds generate a minimal SRDF from
+ the DimOS robot config, including configured collision-exclusion pairs.
+- Unverified non-critical query methods raise explicit `NotImplementedError`.
+ In particular, signed minimum-distance semantics are not implemented for
+ RoboPlan until a safe equivalent is verified.
+- Embedded Meshcat visualization requires a world implementing `VisualizationSpec`;
+ use Viser or `none` with the RoboPlan backend.
+
### Planning Visualization
Manipulation visualization is configured on `ManipulationModuleConfig.visualization`.
@@ -93,8 +156,7 @@ CLI example:
```bash
uv run dimos run xarm7-planner-coordinator \
- -o manipulationmodule.visualization.backend=viser \
- -o manipulationmodule.visualization.allow_plan_execute=true
+ -o manipulationmodule.visualization.backend=viser
```
Blueprint example:
@@ -111,7 +173,6 @@ manipulation = ManipulationModule.blueprint(
"port": 8095,
"open_browser": True,
"panel_enabled": True, # default; set False for scene-only Viser
- "allow_plan_execute": False, # keep panel execution blocked by default
},
)
)
@@ -135,8 +196,8 @@ after the planning world has added its robots. This snapshot maps world robot ID
visuals without `WorldMonitor` depending on Viser-specific hooks. Embedded Meshcat visualization
does not need extra setup because it observes the Drake world directly.
-Panel execution is opt-in. Leave `allow_plan_execute=False` unless the operator intentionally
-wants the browser panel to call the existing manipulation execution path.
+When the Viser panel is enabled, it can call the existing manipulation execution path after a
+fresh feasible plan is available and the current robot joints still match the plan start.
### Perception + Agent
@@ -149,7 +210,7 @@ XARM7_IP= dimos run coordinator-xarm7 xarm-perception-agent
```
KeyboardTeleopModule ──→ ControlCoordinator ──→ ManipulationModule
- (pygame UI) (100Hz tick loop) (Drake + Meshcat)
+ (pygame UI) (100Hz tick loop) (WorldSpec backend)
│ │ │
PoseStamped CartesianIK task RRT planner
commands (Pinocchio IK) JacobianIK
@@ -159,7 +220,7 @@ KeyboardTeleopModule ──→ ControlCoordinator ──→ ManipulationModule
- **KeyboardTeleopModule** — Pygame UI publishing cartesian pose commands
- **ControlCoordinator** — 100Hz control loop with mock or real hardware adapters
-- **ManipulationModule** — Drake physics, Meshcat viz, RRT motion planning, obstacle management
+- **ManipulationModule** — world backend, optional visualization, RRT motion planning, obstacle management
Internally, planning code depends on `WorldSpec` for world, collision, and
kinematics behavior. Meshcat preview and publishing are exposed separately
diff --git a/pyproject.toml b/pyproject.toml
index 65970a739e..4d7a6143b9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -267,6 +267,7 @@ manipulation = [
# Other
"matplotlib>=3.7.1",
"pyyaml>=6.0",
+ "roboplan",
]
cpu = [
@@ -432,7 +433,7 @@ tests-self-hosted = [
required-version = ">=0.9.17"
default-groups = ["tests"]
exclude-newer = "7 days"
-exclude-newer-package = { dimos-viewer = false, pyrealsense2-extended = false, dimos-lcm = false, lcm-dimos-fork = false }
+exclude-newer-package = { dimos-viewer = false, pyrealsense2-extended = false, dimos-lcm = false, lcm-dimos-fork = false, roboplan = false }
override-dependencies = [
# moondream pins pillow<11 but we need >=12.2.0 for security fixes
# (CVE-2026-25990, CVE-2026-40192, CVE-2026-42311).
diff --git a/stubs/roboplan/__init__.pyi b/stubs/roboplan/__init__.pyi
new file mode 100644
index 0000000000..ec4dcbca79
--- /dev/null
+++ b/stubs/roboplan/__init__.pyi
@@ -0,0 +1 @@
+"""Partial RoboPlan package stubs for DimOS type checking."""
diff --git a/stubs/roboplan/core/__init__.pyi b/stubs/roboplan/core/__init__.pyi
new file mode 100644
index 0000000000..74ed0863c6
--- /dev/null
+++ b/stubs/roboplan/core/__init__.pyi
@@ -0,0 +1,120 @@
+"""Partial stubs matching RoboPlan's bundled pybind stubs."""
+
+from collections.abc import Sequence
+import os
+
+import numpy as np
+from numpy.typing import NDArray
+
+__version__: str
+
+class JointConfiguration:
+ def __init__(
+ self,
+ joint_names: Sequence[str] = ...,
+ positions: NDArray[np.float64] = ...,
+ ) -> None: ...
+ @property
+ def joint_names(self) -> list[str]: ...
+ @joint_names.setter
+ def joint_names(self, arg: Sequence[str], /) -> None: ...
+ @property
+ def positions(self) -> NDArray[np.float64]: ...
+ @positions.setter
+ def positions(self, arg: NDArray[np.float64], /) -> None: ...
+
+class JointGroupInfo:
+ @property
+ def joint_names(self) -> list[str]: ...
+
+class JointPath:
+ @property
+ def joint_names(self) -> list[str]: ...
+ @property
+ def positions(self) -> list[NDArray[np.float64]]: ...
+
+class Box:
+ def __init__(self, x: float, y: float, z: float) -> None: ...
+
+class Sphere:
+ def __init__(self, radius: float) -> None: ...
+
+class Cylinder:
+ def __init__(self, radius: float, length: float) -> None: ...
+
+class Mesh:
+ def __init__(
+ self,
+ filename: str | os.PathLike[str],
+ scale: NDArray[np.float64] = ...,
+ ) -> None: ...
+
+class Scene:
+ def __init__(
+ self,
+ name: str,
+ urdf_path: str | os.PathLike[str],
+ srdf_path: str | os.PathLike[str],
+ package_paths: Sequence[str | os.PathLike[str]] = ...,
+ yaml_config_path: str | os.PathLike[str] = ...,
+ ) -> None: ...
+ def hasCollisions(self, q: NDArray[np.float64], debug: bool = ...) -> bool: ...
+ def toFullJointPositions(
+ self, group_name: str, q: NDArray[np.float64]
+ ) -> NDArray[np.float64]: ...
+ def forwardKinematics(
+ self, q: NDArray[np.float64], frame_name: str, base_frame: str = ...
+ ) -> NDArray[np.float64]: ...
+ def computeFrameJacobian(
+ self, q: NDArray[np.float64], frame_name: str, local: bool = ...
+ ) -> NDArray[np.float64]: ...
+ def getJointGroupInfo(self, name: str) -> JointGroupInfo: ...
+ def getPositionLimitVectors(
+ self, group_name: str = ..., collapsed: bool = ...
+ ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: ...
+ def addBoxGeometry(
+ self,
+ name: str,
+ parent_frame: str,
+ box: Box,
+ tform: NDArray[np.float64],
+ color: NDArray[np.float64],
+ ) -> None: ...
+ def addSphereGeometry(
+ self,
+ name: str,
+ parent_frame: str,
+ sphere: Sphere,
+ tform: NDArray[np.float64],
+ color: NDArray[np.float64],
+ ) -> None: ...
+ def addCylinderGeometry(
+ self,
+ name: str,
+ parent_frame: str,
+ cylinder: Cylinder,
+ tform: NDArray[np.float64],
+ color: NDArray[np.float64],
+ ) -> None: ...
+ def addMeshGeometry(
+ self,
+ name: str,
+ parent_frame: str,
+ mesh: Mesh,
+ tform: NDArray[np.float64],
+ color: NDArray[np.float64],
+ ) -> None: ...
+ def updateGeometryPlacement(
+ self, name: str, parent_frame: str, tform: NDArray[np.float64]
+ ) -> None: ...
+ def removeGeometry(self, name: str) -> None: ...
+ def setCollisions(self, body1: str, body2: str, enable: bool) -> None: ...
+
+def hasCollisionsAlongPath(
+ scene: Scene,
+ q_start: NDArray[np.float64],
+ q_end: NDArray[np.float64],
+ max_step_size: float,
+ bisection: bool = ...,
+ check_endpoints: bool = ...,
+) -> bool: ...
diff --git a/stubs/roboplan/rrt/__init__.pyi b/stubs/roboplan/rrt/__init__.pyi
new file mode 100644
index 0000000000..1e8d35256e
--- /dev/null
+++ b/stubs/roboplan/rrt/__init__.pyi
@@ -0,0 +1,41 @@
+"""Partial stubs matching RoboPlan's bundled RRT pybind stubs."""
+
+import roboplan.core as roboplan_core
+
+__version__: str
+
+class RRTOptions:
+ def __init__(
+ self,
+ group_name: str = ...,
+ max_nodes: int = ...,
+ max_connection_distance: float = ...,
+ collision_check_step_size: float = ...,
+ collision_check_use_bisection: bool = ...,
+ goal_biasing_probability: float = ...,
+ max_planning_time: float = ...,
+ rrt_connect: bool = ...,
+ rrt_star: bool = ...,
+ rewire_distance: float = ...,
+ fast_return: bool = ...,
+ ) -> None: ...
+ @property
+ def group_name(self) -> str: ...
+ @group_name.setter
+ def group_name(self, arg: str, /) -> None: ...
+ @property
+ def max_planning_time(self) -> float: ...
+ @max_planning_time.setter
+ def max_planning_time(self, arg: float, /) -> None: ...
+ @property
+ def collision_check_use_bisection(self) -> bool: ...
+ @collision_check_use_bisection.setter
+ def collision_check_use_bisection(self, arg: bool, /) -> None: ...
+
+class RRT:
+ def __init__(self, scene: roboplan_core.Scene, options: RRTOptions) -> None: ...
+ def plan(
+ self,
+ start: roboplan_core.JointConfiguration,
+ goal: roboplan_core.JointConfiguration,
+ ) -> roboplan_core.JointPath | None: ...
diff --git a/uv.lock b/uv.lock
index 82bfadf7ed..f8f2e2b012 100644
--- a/uv.lock
+++ b/uv.lock
@@ -48,6 +48,7 @@ dimos-viewer = false
dimos-lcm = false
lcm-dimos-fork = false
pyrealsense2-extended = false
+roboplan = false
[manifest]
overrides = [
@@ -2060,6 +2061,7 @@ all = [
{ name = "qpsolvers", extra = ["proxqp"] },
{ name = "reportlab" },
{ name = "rerun-sdk" },
+ { name = "roboplan" },
{ name = "sounddevice" },
{ name = "soundfile" },
{ name = "sse-starlette" },
@@ -2137,6 +2139,7 @@ manipulation = [
{ name = "pyrealsense2-extended", marker = "sys_platform != 'darwin'" },
{ name = "pyyaml" },
{ name = "qpsolvers", extra = ["proxqp"] },
+ { name = "roboplan" },
{ name = "trimesh" },
{ name = "viser", extra = ["urdf"] },
{ name = "xacro" },
@@ -2488,6 +2491,7 @@ requires-dist = [
{ name = "reportlab", marker = "extra == 'apriltag'", specifier = ">=4.5.0" },
{ name = "rerun-sdk", specifier = "==0.32.0a1" },
{ name = "rerun-sdk", marker = "extra == 'visualization'", specifier = "==0.32.0a1" },
+ { name = "roboplan", marker = "extra == 'manipulation'" },
{ name = "scipy", specifier = ">=1.15.1" },
{ name = "sortedcontainers", specifier = "==2.4.0" },
{ name = "sounddevice", marker = "extra == 'agents'" },
@@ -9734,6 +9738,35 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ca/e5/d708d262b600a352abe01c2ae360d8ff75b0af819b78e9af293191d928e6/rich_click-1.9.7-py3-none-any.whl", hash = "sha256:2f99120fca78f536e07b114d3b60333bc4bb2a0969053b1250869bcdc1b5351b", size = 71491, upload-time = "2026-01-31T04:29:26.777Z" },
]
+[[package]]
+name = "roboplan"
+version = "0.0.100"
+source = { registry = "https://pypi.org/simple" }
+dependencies = [
+ { name = "matplotlib" },
+ { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" },
+ { name = "numpy", version = "2.3.5", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" },
+ { name = "pin" },
+]
+sdist = { url = "https://files.pythonhosted.org/packages/32/66/42ba4788f3516d956cd60c76e5e3e3b60947e663e801917ab09a26be48a2/roboplan-0.0.100.tar.gz", hash = "sha256:a2c7b129630c4a7c5599b435962ecf91c16ebf1374dfb8a0365565831ef5cd82", size = 48439846, upload-time = "2026-06-25T00:52:43.416Z" }
+wheels = [
+ { url = "https://files.pythonhosted.org/packages/8e/7e/b5d40766c2427112936786c00293cf72563f704d83ce98cf1f2c1063e874/roboplan-0.0.100-cp310-cp310-macosx_14_0_arm64.whl", hash = "sha256:062b18c7f01e52b0e9ef6a7cb61f57cfbc0faa607122e21e0c854b1a3d2a116f", size = 42196048, upload-time = "2026-06-25T00:51:32.744Z" },
+ { url = "https://files.pythonhosted.org/packages/a6/b6/57677ddd26a83031db37e291f13a3d00f76d63aa978992278ee633c69415/roboplan-0.0.100-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:79a760d9eedfa4bdb896316b43c58b1ecab2c1d4caea4f8a400d8344fbb977d8", size = 48026832, upload-time = "2026-06-25T00:51:37.987Z" },
+ { url = "https://files.pythonhosted.org/packages/dd/d2/b31265d468c3c2b70a412bd61022f44c5fd9e6c450020078dece3db4f9a4/roboplan-0.0.100-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:2a3ae288f664c45e1fb22ac23d6c6a12934059ef116411440eb64ff9ffaa7cff", size = 49297082, upload-time = "2026-06-25T00:51:42.872Z" },
+ { url = "https://files.pythonhosted.org/packages/8e/7c/44b09dfa98077152a5620a46668afd96e470d7a9865ca74e3efe10630725/roboplan-0.0.100-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:abdd9f841a27d3cb7cdb446cc76e3919481c0b5f638d0e9228fc6268edae00b3", size = 42194541, upload-time = "2026-06-25T00:51:47.397Z" },
+ { url = "https://files.pythonhosted.org/packages/27/cf/d63c42f0a48b496105e0f4b7cf5ec8f771e3c96a35603697a38a0df62be1/roboplan-0.0.100-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:2a8abfaadfb54dd5209c44992cf9f99b0e414aa5af048537871789f772f3f454", size = 48024767, upload-time = "2026-06-25T00:51:52.185Z" },
+ { url = "https://files.pythonhosted.org/packages/ed/80/0f1bca0a723e19ef3b0f0261de82f64d34d7206693f3b7cb065a821d4cba/roboplan-0.0.100-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:e7784c63b8a8a8708683221c66fa9d88898208909af904eb5126ba16f44f1fc7", size = 49295272, upload-time = "2026-06-25T00:51:56.916Z" },
+ { url = "https://files.pythonhosted.org/packages/d4/66/a7abbc1a595157a2312db46ab2aca25f58679eda2e03bbb857e7fa96bf4a/roboplan-0.0.100-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:9beb11befd10b152c604f0ecaff892afff01615ce4884ba4236ef2a4560fe03a", size = 42174552, upload-time = "2026-06-25T00:52:01.334Z" },
+ { url = "https://files.pythonhosted.org/packages/fc/94/84d6490d9a2bca205de5a5d7c7d51c041daf425c37f66e7693c021336abc/roboplan-0.0.100-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:3f908becbbcc8637f8ca1b1f18aeee88a8f04edea02afc2e37889773d740b960", size = 47993567, upload-time = "2026-06-25T00:52:06.173Z" },
+ { url = "https://files.pythonhosted.org/packages/1d/46/a4369b23887c2a9ab047c5da611fe0e47c58d54fdff3c94fa63aab7d800e/roboplan-0.0.100-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:b6c72ead7d40be4f39664d51e126cee2ef72794029edf2293ec095d2b7f1e0a7", size = 49260596, upload-time = "2026-06-25T00:52:11.209Z" },
+ { url = "https://files.pythonhosted.org/packages/72/3c/8bcbfaa5b7a7fc57a1ee9258359cfc959c97bd0581b546641a3eaae6ac61/roboplan-0.0.100-cp313-cp313-macosx_14_0_arm64.whl", hash = "sha256:512aaa06d5b06a6db00e88260af37e1c82652f85aae8a528b458257cb1776059", size = 42174557, upload-time = "2026-06-25T00:52:15.922Z" },
+ { url = "https://files.pythonhosted.org/packages/ac/a6/3b0267542595c50fc9a19e5ba50c60428a8a1bda4f28220d989030e6d21b/roboplan-0.0.100-cp313-cp313-manylinux_2_28_aarch64.whl", hash = "sha256:ffbff5cd0477764c0df27740271caab674ca75155106a2adc1e1919efe0dfad2", size = 47993582, upload-time = "2026-06-25T00:52:20.574Z" },
+ { url = "https://files.pythonhosted.org/packages/ca/63/a00380c470f9399acd4f8bd15c22cb7045f5d4e960457e95a8698330c5c1/roboplan-0.0.100-cp313-cp313-manylinux_2_28_x86_64.whl", hash = "sha256:280abb8bb2a715cae374144620384788ea4ace602e4ef768db90cd72f8f271f7", size = 49260622, upload-time = "2026-06-25T00:52:25.409Z" },
+ { url = "https://files.pythonhosted.org/packages/56/a6/86c053056f730e322d84dca3035108f1efc1a178ec87bf71c17a877ffd6e/roboplan-0.0.100-cp314-cp314-macosx_14_0_arm64.whl", hash = "sha256:1f986f5e907a3db09fbbd8f6407da0ce4ceb8300668132795fb53c4e07f0c943", size = 42174391, upload-time = "2026-06-25T00:52:29.639Z" },
+ { url = "https://files.pythonhosted.org/packages/a0/7c/620051e5370e0fb6217f299dea6bebcaf3a025f946ceb9e05317d3378a84/roboplan-0.0.100-cp314-cp314-manylinux_2_28_aarch64.whl", hash = "sha256:86eefdec4b3a555bc6c04014822a07ff269ba00d16497c0c2ea25b6365321bdf", size = 47993630, upload-time = "2026-06-25T00:52:34.178Z" },
+ { url = "https://files.pythonhosted.org/packages/12/c3/514c57883048853279d784333b9bf6abb6099f526a558872664ab0414ca6/roboplan-0.0.100-cp314-cp314-manylinux_2_28_x86_64.whl", hash = "sha256:9c20a3c94f1e6cc2b3531efb1b06db5042167596c143c281eb08f03e4e38c81e", size = 49260636, upload-time = "2026-06-25T00:52:38.771Z" },
+]
+
[[package]]
name = "rope"
version = "1.14.0"