diff --git a/dimos/control/composition.py b/dimos/control/composition.py
new file mode 100644
index 0000000000..3fecd5a9bd
--- /dev/null
+++ b/dimos/control/composition.py
@@ -0,0 +1,131 @@
+# Copyright 2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Internal composition helpers for coordinator-facing control tasks."""
+
+from __future__ import annotations
+
+from collections.abc import Sequence
+from typing import TYPE_CHECKING
+
+from dimos.control.task import (
+ BaseControlTask,
+ ControlTask,
+ CoordinatorState,
+ JointCommandOutput,
+ ReferenceTransformTask,
+ ResourceClaim,
+)
+
+if TYPE_CHECKING:
+ from dimos.control.components import JointName
+
+
+class ComposedControlTask(BaseControlTask):
+ """A single coordinator-facing task backed by a linear internal pipeline.
+
+ The coordinator still sees one task with one claim and one final
+ ``JointCommandOutput``. Internally, ``source.compute()`` produces the
+ nominal reference and each transform consumes the previous output.
+ """
+
+ def __init__(
+ self,
+ name: str,
+ source: ControlTask,
+ transforms: Sequence[ReferenceTransformTask],
+ ) -> None:
+ if not name:
+ raise ValueError("ComposedControlTask requires a non-empty name")
+ if not transforms:
+ raise ValueError(f"ComposedControlTask '{name}' requires at least one transform")
+
+ self._name = name
+ self._source = source
+ self._transforms = list(transforms)
+ self._claim = self._validate_pipeline(name, source, self._transforms)
+
+ @property
+ def name(self) -> str:
+ """Unique task identifier exposed to the coordinator."""
+ return self._name
+
+ @property
+ def source(self) -> ControlTask:
+ """Source task used by the internal pipeline."""
+ return self._source
+
+ @property
+ def transforms(self) -> tuple[ReferenceTransformTask, ...]:
+ """Ordered transforms used by the internal pipeline."""
+ return tuple(self._transforms)
+
+ def claim(self) -> ResourceClaim:
+ """Declare the single external claim owned by this composed task.
+
+ v1 composition assumes source/transform claims are static after
+ construction; the validated claim is cached to keep arbitration cheap.
+ """
+ return self._claim
+
+ def is_active(self) -> bool:
+ """A composed pipeline is active when its source is active."""
+ return self._source.is_active()
+
+ def compute(self, state: CoordinatorState) -> JointCommandOutput | None:
+ """Run source then transforms in order."""
+ output = self._source.compute(state)
+ if output is None:
+ return None
+
+ for transform in self._transforms:
+ output = transform.compute_from_reference(state, output)
+ if output is None:
+ return None
+ return output
+
+ def on_preempted(self, by_task: str, joints: frozenset[JointName]) -> None:
+ """Forward preemption and reset transform state when available."""
+ self._source.on_preempted(by_task, joints)
+ for transform in self._transforms:
+ transform.on_preempted(by_task, joints)
+ reset = getattr(transform, "reset", None)
+ if callable(reset):
+ reset()
+
+ @staticmethod
+ def _validate_pipeline(
+ name: str,
+ source: ControlTask,
+ transforms: Sequence[ReferenceTransformTask],
+ ) -> ResourceClaim:
+ source_claim = source.claim()
+ if not source_claim.joints:
+ raise ValueError(f"ComposedControlTask '{name}' source must claim at least one joint")
+
+ for transform in transforms:
+ transform_claim = transform.claim()
+ if transform_claim.joints != source_claim.joints:
+ raise ValueError(
+ f"ComposedControlTask '{name}' transform '{transform.name}' joints "
+ f"{sorted(transform_claim.joints)} do not match source joints "
+ f"{sorted(source_claim.joints)}"
+ )
+ if transform_claim.mode != source_claim.mode:
+ raise ValueError(
+ f"ComposedControlTask '{name}' transform '{transform.name}' mode "
+ f"{transform_claim.mode} does not match source mode {source_claim.mode}"
+ )
+
+ return source_claim
diff --git a/dimos/control/task.py b/dimos/control/task.py
index 6e22e48246..d1d5e08d92 100644
--- a/dimos/control/task.py
+++ b/dimos/control/task.py
@@ -304,6 +304,33 @@ def set_velocities_by_name(self, velocities: dict[str, float], t_now: float) ->
...
+@runtime_checkable
+class ReferenceTransformTask(ControlTask, Protocol):
+ """Control task that can transform an upstream joint command reference.
+
+ A reference transform can still be registered as a normal coordinator-facing
+ ``ControlTask``. When used inside a composed task, the outer task owns the
+ coordinator claim and calls ``compute_from_reference()`` synchronously after
+ a source task has produced a reference command.
+ """
+
+ def compute_from_reference(
+ self,
+ state: CoordinatorState,
+ reference: JointCommandOutput,
+ ) -> JointCommandOutput | None:
+ """Transform an upstream command reference into a new command.
+
+ Args:
+ state: Current coordinator state for this tick.
+ reference: Upstream command from the source task or prior transform.
+
+ Returns:
+ Transformed command, or None to suppress output for this tick.
+ """
+ ...
+
+
class BaseControlTask(ControlTask):
"""Base class with no-op defaults for optional listener methods.
diff --git a/dimos/control/tasks/compliance_task/__registry__.py b/dimos/control/tasks/compliance_task/__registry__.py
new file mode 100644
index 0000000000..1418052bf3
--- /dev/null
+++ b/dimos/control/tasks/compliance_task/__registry__.py
@@ -0,0 +1,20 @@
+# Copyright 2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Lazy registry manifest for joint compliance tasks."""
+
+TASK_FACTORIES = {
+ "joint_compliance": "dimos.control.tasks.compliance_task.compliance_task:create_task",
+ "compliant_trajectory": "dimos.control.tasks.compliance_task.compliant_trajectory_task:create_task",
+}
diff --git a/dimos/control/tasks/compliance_task/compliance_task.py b/dimos/control/tasks/compliance_task/compliance_task.py
new file mode 100644
index 0000000000..e560a49f70
--- /dev/null
+++ b/dimos/control/tasks/compliance_task/compliance_task.py
@@ -0,0 +1,385 @@
+# Copyright 2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Joint-space compliance transform for position-servo references."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+import math
+from typing import Any
+
+from dimos.control.components import JointName
+from dimos.control.task import (
+ BaseControlTask,
+ ControlMode,
+ CoordinatorState,
+ JointCommandOutput,
+ ResourceClaim,
+)
+from dimos.protocol.service.spec import BaseConfig
+from dimos.utils.logging_config import setup_logger
+
+logger = setup_logger()
+
+
+@dataclass
+class JointComplianceDiagnostics:
+ """Inspectable compliance state for tests and runtime diagnostics."""
+
+ offsets: dict[JointName, float] = field(default_factory=dict)
+ offset_velocities: dict[JointName, float] = field(default_factory=dict)
+ saturated: dict[JointName, bool] = field(default_factory=dict)
+ feedback_mode: str = "position_error"
+ last_safe_action: str = "none"
+
+
+@dataclass
+class JointComplianceTaskConfig:
+ """Configuration for a joint-space compliance task.
+
+ Scalar gains apply to all joints. Dict gains override individual joints.
+ Effort feedback is opt-in because several adapters expose placeholder zeros.
+ """
+
+ joint_names: list[JointName]
+ priority: int = 10
+ mass: float | dict[JointName, float] = 1.0
+ damping: float | dict[JointName, float] = 8.0
+ stiffness: float | dict[JointName, float] = 30.0
+ max_offset: float | dict[JointName, float] = 0.15
+ max_offset_velocity: float | dict[JointName, float] = 0.5
+ deadband: float | dict[JointName, float] = 0.0
+ use_effort_feedback: bool = False
+ effort_gain: float | dict[JointName, float] = 1.0
+
+
+class JointComplianceTask(BaseControlTask):
+ """Transform nominal joint references into bounded compliant commands."""
+
+ def __init__(self, name: str, config: JointComplianceTaskConfig) -> None:
+ if not config.joint_names:
+ raise ValueError(f"JointComplianceTask '{name}' requires at least one joint")
+ self._name = name
+ self._config = config
+ self._joint_names = frozenset(config.joint_names)
+ self._joint_names_list = list(config.joint_names)
+ self._active = False
+ self._offset = {joint: 0.0 for joint in self._joint_names_list}
+ self._offset_velocity = {joint: 0.0 for joint in self._joint_names_list}
+ self._diagnostics = JointComplianceDiagnostics(
+ offsets=dict(self._offset),
+ offset_velocities=dict(self._offset_velocity),
+ saturated={joint: False for joint in self._joint_names_list},
+ feedback_mode="effort" if config.use_effort_feedback else "position_error",
+ )
+
+ self._mass = self._resolve_positive_values(config.mass, "mass")
+ self._damping = self._resolve_nonnegative_values(config.damping, "damping")
+ self._stiffness = self._resolve_nonnegative_values(config.stiffness, "stiffness")
+ self._max_offset = self._resolve_nonnegative_values(config.max_offset, "max_offset")
+ self._max_offset_velocity = self._resolve_nonnegative_values(
+ config.max_offset_velocity,
+ "max_offset_velocity",
+ )
+ self._deadband = self._resolve_nonnegative_values(config.deadband, "deadband")
+ self._effort_gain = self._resolve_values(config.effort_gain, "effort_gain")
+
+ logger.info(f"JointComplianceTask {name} initialized for joints: {config.joint_names}")
+
+ @property
+ def name(self) -> str:
+ """Unique task identifier."""
+ return self._name
+
+ def claim(self) -> ResourceClaim:
+ """Declare position-servo resources for standalone or composed use."""
+ return ResourceClaim(
+ joints=self._joint_names,
+ priority=self._config.priority,
+ mode=ControlMode.SERVO_POSITION,
+ )
+
+ def is_active(self) -> bool:
+ """Standalone compliance does not emit commands until explicitly started."""
+ return self._active
+
+ def compute(self, state: CoordinatorState) -> JointCommandOutput | None:
+ """Standalone mode holds current joint positions with compliance reset."""
+ if not self._active:
+ return None
+ positions: list[float] = []
+ for joint in self._joint_names_list:
+ position = state.joints.get_position(joint)
+ if position is None:
+ self._set_safe_action("missing_state")
+ return None
+ positions.append(position)
+ reference = JointCommandOutput(
+ joint_names=self._joint_names_list,
+ positions=positions,
+ mode=ControlMode.SERVO_POSITION,
+ )
+ return self.compute_from_reference(state, reference)
+
+ def compute_from_reference(
+ self,
+ state: CoordinatorState,
+ reference: JointCommandOutput,
+ ) -> JointCommandOutput | None:
+ """Apply bounded admittance-style shaping to a position reference."""
+ if reference.mode != ControlMode.SERVO_POSITION or reference.positions is None:
+ self._set_safe_action("incompatible_reference")
+ return None
+ if reference.joint_names != self._joint_names_list:
+ self._set_safe_action("incompatible_joints")
+ return None
+ if not math.isfinite(state.dt) or state.dt <= 0.0:
+ self._set_safe_action("invalid_dt")
+ return reference
+ if any(not math.isfinite(q_ref) for q_ref in reference.positions):
+ self._set_safe_action("invalid_reference")
+ return None
+
+ q_cmd: list[float] = []
+ next_offset: dict[JointName, float] = {}
+ next_offset_velocity: dict[JointName, float] = {}
+ saturated: dict[JointName, bool] = {}
+ for joint, q_ref in zip(self._joint_names_list, reference.positions, strict=True):
+ q_actual = state.joints.get_position(joint)
+ if q_actual is None:
+ self._set_safe_action("missing_state")
+ return None
+ if not math.isfinite(q_actual):
+ self._set_safe_action("invalid_state")
+ return None
+ signal = self._feedback_signal(joint, q_actual, q_ref, state)
+ if signal is None:
+ return None
+
+ velocity = self._offset_velocity[joint]
+ offset = self._offset[joint]
+ if not math.isfinite(velocity) or not math.isfinite(offset):
+ self.reset()
+ self._set_safe_action("invalid_compliance_state")
+ return None
+ acceleration = (
+ signal / self._mass[joint]
+ - self._damping[joint] * velocity
+ - self._stiffness[joint] * offset
+ )
+ if not math.isfinite(acceleration):
+ self._set_safe_action("invalid_acceleration")
+ return None
+
+ velocity = self._clamp(
+ velocity + acceleration * state.dt,
+ -self._max_offset_velocity[joint],
+ self._max_offset_velocity[joint],
+ )
+ offset = offset + velocity * state.dt
+ if not math.isfinite(velocity) or not math.isfinite(offset):
+ self._set_safe_action("invalid_offset")
+ return None
+ clamped_offset = self._clamp(
+ offset,
+ -self._max_offset[joint],
+ self._max_offset[joint],
+ )
+ is_saturated = (
+ clamped_offset != offset or abs(velocity) >= self._max_offset_velocity[joint]
+ )
+
+ next_offset[joint] = clamped_offset
+ next_offset_velocity[joint] = velocity
+ saturated[joint] = is_saturated
+ command = q_ref + clamped_offset
+ if not math.isfinite(command):
+ self._set_safe_action("invalid_output")
+ return None
+ q_cmd.append(command)
+
+ self._offset.update(next_offset)
+ self._offset_velocity.update(next_offset_velocity)
+ self._diagnostics = JointComplianceDiagnostics(
+ offsets=dict(self._offset),
+ offset_velocities=dict(self._offset_velocity),
+ saturated=saturated,
+ feedback_mode="effort" if self._config.use_effort_feedback else "position_error",
+ last_safe_action="none",
+ )
+ return JointCommandOutput(
+ joint_names=list(reference.joint_names),
+ positions=q_cmd,
+ mode=ControlMode.SERVO_POSITION,
+ )
+
+ def on_preempted(self, by_task: str, joints: frozenset[JointName]) -> None:
+ """Reset compliance state if a higher-priority task takes any joint."""
+ if joints & self._joint_names:
+ logger.warning(
+ f"JointComplianceTask {self._name} preempted by {by_task} on joints {joints}"
+ )
+ self.reset()
+
+ def start(self) -> None:
+ """Activate standalone hold behavior."""
+ self._active = True
+
+ def stop(self) -> None:
+ """Deactivate standalone behavior and reset offsets."""
+ self._active = False
+ self.reset()
+
+ def reset(self) -> None:
+ """Reset all offsets, velocities, and saturation diagnostics."""
+ for joint in self._joint_names_list:
+ self._offset[joint] = 0.0
+ self._offset_velocity[joint] = 0.0
+ self._diagnostics = JointComplianceDiagnostics(
+ offsets=dict(self._offset),
+ offset_velocities=dict(self._offset_velocity),
+ saturated={joint: False for joint in self._joint_names_list},
+ feedback_mode="effort" if self._config.use_effort_feedback else "position_error",
+ last_safe_action="reset",
+ )
+
+ def get_diagnostics(self) -> JointComplianceDiagnostics:
+ """Return a copy of the latest diagnostics."""
+ return JointComplianceDiagnostics(
+ offsets=dict(self._diagnostics.offsets),
+ offset_velocities=dict(self._diagnostics.offset_velocities),
+ saturated=dict(self._diagnostics.saturated),
+ feedback_mode=self._diagnostics.feedback_mode,
+ last_safe_action=self._diagnostics.last_safe_action,
+ )
+
+ def _feedback_signal(
+ self,
+ joint: JointName,
+ q_actual: float,
+ q_ref: float,
+ state: CoordinatorState,
+ ) -> float | None:
+ if self._config.use_effort_feedback:
+ effort = state.joints.get_effort(joint)
+ if effort is None:
+ return 0.0
+ if not math.isfinite(effort):
+ self._set_safe_action("invalid_effort")
+ return None
+ signal = self._effort_gain[joint] * effort
+ else:
+ signal = q_actual - q_ref
+ if not math.isfinite(signal):
+ self._set_safe_action("invalid_feedback")
+ return None
+
+ deadband = self._deadband[joint]
+ if abs(signal) <= deadband:
+ return 0.0
+ return signal - math.copysign(deadband, signal)
+
+ def _set_safe_action(self, action: str) -> None:
+ self._diagnostics = JointComplianceDiagnostics(
+ offsets=dict(self._offset),
+ offset_velocities=dict(self._offset_velocity),
+ saturated=dict(self._diagnostics.saturated),
+ feedback_mode="effort" if self._config.use_effort_feedback else "position_error",
+ last_safe_action=action,
+ )
+
+ def _resolve_positive_values(
+ self,
+ values: float | dict[JointName, float],
+ label: str,
+ ) -> dict[JointName, float]:
+ resolved = self._resolve_values(values, label)
+ for joint, value in resolved.items():
+ if value <= 0.0:
+ raise ValueError(
+ f"JointComplianceTask '{self._name}' {label} for {joint} must be > 0"
+ )
+ return resolved
+
+ def _resolve_nonnegative_values(
+ self,
+ values: float | dict[JointName, float],
+ label: str,
+ ) -> dict[JointName, float]:
+ resolved = self._resolve_values(values, label)
+ for joint, value in resolved.items():
+ if value < 0.0:
+ raise ValueError(
+ f"JointComplianceTask '{self._name}' {label} for {joint} must be >= 0"
+ )
+ return resolved
+
+ def _resolve_values(
+ self,
+ values: float | dict[JointName, float],
+ label: str,
+ ) -> dict[JointName, float]:
+ if isinstance(values, dict):
+ missing = [joint for joint in self._joint_names_list if joint not in values]
+ if missing:
+ raise ValueError(
+ f"JointComplianceTask '{self._name}' {label} missing joints: {missing}"
+ )
+ resolved = {joint: float(values[joint]) for joint in self._joint_names_list}
+ else:
+ resolved = {joint: float(values) for joint in self._joint_names_list}
+ for joint, value in resolved.items():
+ if not math.isfinite(value):
+ raise ValueError(
+ f"JointComplianceTask '{self._name}' {label} for {joint} must be finite"
+ )
+ return resolved
+
+ @staticmethod
+ def _clamp(value: float, lower: float, upper: float) -> float:
+ return min(max(value, lower), upper)
+
+
+class JointComplianceTaskParams(BaseConfig):
+ mass: float | dict[str, float] | None = None
+ damping: float | dict[str, float] | None = None
+ stiffness: float | dict[str, float] | None = None
+ max_offset: float | dict[str, float] | None = None
+ max_offset_velocity: float | dict[str, float] | None = None
+ deadband: float | dict[str, float] | None = None
+ use_effort_feedback: bool = False
+ effort_gain: float | dict[str, float] | None = None
+
+
+def create_task(cfg: Any, hardware: Any) -> JointComplianceTask:
+ params = JointComplianceTaskParams.model_validate(cfg.params)
+ kwargs: dict[str, object] = {
+ "joint_names": cfg.joint_names,
+ "priority": cfg.priority,
+ "use_effort_feedback": params.use_effort_feedback,
+ }
+ for key in (
+ "mass",
+ "damping",
+ "stiffness",
+ "max_offset",
+ "max_offset_velocity",
+ "deadband",
+ "effort_gain",
+ ):
+ value = getattr(params, key)
+ if value is not None:
+ kwargs[key] = value
+ return JointComplianceTask(cfg.name, JointComplianceTaskConfig(**kwargs)) # type: ignore[arg-type]
diff --git a/dimos/control/tasks/compliance_task/compliant_trajectory_task.py b/dimos/control/tasks/compliance_task/compliant_trajectory_task.py
new file mode 100644
index 0000000000..8c9e29f5fb
--- /dev/null
+++ b/dimos/control/tasks/compliance_task/compliant_trajectory_task.py
@@ -0,0 +1,136 @@
+# Copyright 2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Composed compliant joint trajectory task."""
+
+from __future__ import annotations
+
+from typing import Any
+
+from dimos.control.composition import ComposedControlTask
+from dimos.control.task import CoordinatorState, JointCommandOutput
+from dimos.control.tasks.compliance_task.compliance_task import (
+ JointComplianceDiagnostics,
+ JointComplianceTask,
+ JointComplianceTaskConfig,
+ JointComplianceTaskParams,
+)
+from dimos.control.tasks.trajectory_task.trajectory_task import (
+ JointTrajectoryTask,
+ JointTrajectoryTaskConfig,
+)
+from dimos.msgs.trajectory_msgs.JointTrajectory import JointTrajectory
+from dimos.msgs.trajectory_msgs.TrajectoryStatus import TrajectoryState
+
+
+class CompliantJointTrajectoryTask(ComposedControlTask):
+ """Joint trajectory source followed by a joint compliance transform."""
+
+ def __init__(
+ self,
+ name: str,
+ trajectory: JointTrajectoryTask,
+ compliance: JointComplianceTask,
+ ) -> None:
+ super().__init__(name=name, source=trajectory, transforms=[compliance])
+ self._trajectory = trajectory
+ self._compliance = compliance
+
+ def compute(self, state: CoordinatorState) -> JointCommandOutput | None:
+ reference = self._trajectory.compute(state)
+ if reference is None:
+ return None
+ if not self._trajectory.is_active():
+ self._compliance.reset()
+ return reference
+ return self._compliance.compute_from_reference(state, reference)
+
+ def execute(self, trajectory: JointTrajectory) -> bool:
+ """Start executing a trajectory and reset prior compliance offsets."""
+ accepted = self._trajectory.execute(trajectory)
+ if accepted:
+ self._compliance.reset()
+ return accepted
+
+ def cancel(self) -> bool:
+ """Cancel trajectory execution and reset compliance offsets."""
+ cancelled = self._trajectory.cancel()
+ if cancelled:
+ self._compliance.reset()
+ return cancelled
+
+ def reset(self) -> bool:
+ """Reset trajectory and compliance state."""
+ reset = self._trajectory.reset()
+ if reset:
+ self._compliance.reset()
+ return reset
+
+ def get_state(self) -> TrajectoryState:
+ """Get the underlying trajectory state."""
+ return self._trajectory.get_state()
+
+ def get_progress(self, t_now: float) -> float:
+ """Get trajectory execution progress."""
+ return self._trajectory.get_progress(t_now)
+
+ def get_compliance_diagnostics(self) -> JointComplianceDiagnostics:
+ """Get current compliance diagnostics."""
+ return self._compliance.get_diagnostics()
+
+
+def create_compliant_joint_trajectory_task(
+ name: str,
+ joint_names: list[str],
+ priority: int = 10,
+ compliance_config: JointComplianceTaskConfig | None = None,
+) -> CompliantJointTrajectoryTask:
+ """Create a compliant trajectory task from joint names and optional gains."""
+ trajectory = JointTrajectoryTask(
+ name=f"{name}.trajectory",
+ config=JointTrajectoryTaskConfig(joint_names=joint_names, priority=priority),
+ )
+ compliance = JointComplianceTask(
+ name=f"{name}.compliance",
+ config=compliance_config
+ or JointComplianceTaskConfig(joint_names=joint_names, priority=priority),
+ )
+ return CompliantJointTrajectoryTask(name=name, trajectory=trajectory, compliance=compliance)
+
+
+def create_task(cfg: Any, hardware: Any) -> CompliantJointTrajectoryTask:
+ params = JointComplianceTaskParams.model_validate(cfg.params)
+ kwargs: dict[str, object] = {
+ "joint_names": cfg.joint_names,
+ "priority": cfg.priority,
+ "use_effort_feedback": params.use_effort_feedback,
+ }
+ for key in (
+ "mass",
+ "damping",
+ "stiffness",
+ "max_offset",
+ "max_offset_velocity",
+ "deadband",
+ "effort_gain",
+ ):
+ value = getattr(params, key)
+ if value is not None:
+ kwargs[key] = value
+ return create_compliant_joint_trajectory_task(
+ name=cfg.name,
+ joint_names=cfg.joint_names,
+ priority=cfg.priority,
+ compliance_config=JointComplianceTaskConfig(**kwargs), # type: ignore[arg-type]
+ )
diff --git a/dimos/control/tasks/compliance_task/test_compliance_task.py b/dimos/control/tasks/compliance_task/test_compliance_task.py
new file mode 100644
index 0000000000..6d097fcbb7
--- /dev/null
+++ b/dimos/control/tasks/compliance_task/test_compliance_task.py
@@ -0,0 +1,276 @@
+# Copyright 2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from dimos.control.task import ControlMode, CoordinatorState, JointCommandOutput, JointStateSnapshot
+from dimos.control.tasks.compliance_task.compliance_task import (
+ JointComplianceTask,
+ JointComplianceTaskConfig,
+)
+from dimos.control.tasks.compliance_task.compliant_trajectory_task import (
+ create_compliant_joint_trajectory_task,
+)
+from dimos.msgs.trajectory_msgs.JointTrajectory import JointTrajectory
+from dimos.msgs.trajectory_msgs.TrajectoryPoint import TrajectoryPoint
+from dimos.msgs.trajectory_msgs.TrajectoryStatus import TrajectoryState
+
+JOINTS = ["arm/j1", "arm/j2"]
+
+
+def make_state(
+ positions: dict[str, float] | None = None,
+ efforts: dict[str, float] | None = None,
+ dt: float = 0.01,
+ t_now: float = 1.0,
+) -> CoordinatorState:
+ return CoordinatorState(
+ joints=JointStateSnapshot(
+ joint_positions=positions or {"arm/j1": 0.0, "arm/j2": 0.0},
+ joint_velocities={"arm/j1": 0.0, "arm/j2": 0.0},
+ joint_efforts=efforts or {},
+ ),
+ t_now=t_now,
+ dt=dt,
+ )
+
+
+def make_reference(positions: list[float] | None = None) -> JointCommandOutput:
+ return JointCommandOutput(
+ joint_names=JOINTS,
+ positions=positions or [0.0, 0.0],
+ mode=ControlMode.SERVO_POSITION,
+ )
+
+
+def make_trajectory() -> JointTrajectory:
+ return JointTrajectory(
+ joint_names=JOINTS,
+ points=[
+ TrajectoryPoint(time_from_start=0.0, positions=[0.0, 0.0]),
+ TrajectoryPoint(time_from_start=1.0, positions=[1.0, 2.0]),
+ ],
+ )
+
+
+def make_invalid_trajectory() -> JointTrajectory:
+ return JointTrajectory(joint_names=JOINTS, points=[])
+
+
+def test_compliance_passes_through_free_space_with_near_zero_offsets() -> None:
+ task = JointComplianceTask("compliance", JointComplianceTaskConfig(joint_names=JOINTS))
+
+ output = task.compute_from_reference(make_state(), make_reference([0.2, -0.1]))
+
+ assert output is not None
+ assert output.positions == pytest.approx([0.2, -0.1], abs=1e-4)
+ diagnostics = task.get_diagnostics()
+ assert max(abs(offset) for offset in diagnostics.offsets.values()) < 1e-4
+ assert max(abs(velocity) for velocity in diagnostics.offset_velocities.values()) < 1e-2
+
+
+def test_compliance_bounds_offset_and_velocity_under_tracking_resistance() -> None:
+ task = JointComplianceTask(
+ "compliance",
+ JointComplianceTaskConfig(
+ joint_names=JOINTS,
+ mass=0.1,
+ damping=0.0,
+ stiffness=0.0,
+ max_offset=0.02,
+ max_offset_velocity=0.03,
+ ),
+ )
+
+ output: JointCommandOutput | None = None
+ for step in range(10):
+ output = task.compute_from_reference(
+ make_state(positions={"arm/j1": 1.0, "arm/j2": -1.0}, dt=0.1, t_now=float(step)),
+ make_reference([0.0, 0.0]),
+ )
+
+ assert output is not None
+ assert output.positions == pytest.approx([0.02, -0.02])
+ diagnostics = task.get_diagnostics()
+ assert diagnostics.offset_velocities == pytest.approx({"arm/j1": 0.03, "arm/j2": -0.03})
+ assert diagnostics.saturated == {"arm/j1": True, "arm/j2": True}
+
+
+def test_effort_feedback_is_opt_in_and_zero_placeholder_effort_is_ignored_by_default() -> None:
+ default_task = JointComplianceTask(
+ "default",
+ JointComplianceTaskConfig(joint_names=JOINTS, mass=1.0, damping=0.0, stiffness=0.0),
+ )
+ effort_task = JointComplianceTask(
+ "effort",
+ JointComplianceTaskConfig(
+ joint_names=JOINTS, mass=1.0, damping=0.0, stiffness=0.0, use_effort_feedback=True
+ ),
+ )
+
+ state = make_state(
+ positions={"arm/j1": 1.0, "arm/j2": 0.0}, efforts={"arm/j1": 0.0, "arm/j2": 0.0}, dt=0.1
+ )
+
+ default_output = default_task.compute_from_reference(state, make_reference([0.0, 0.0]))
+ effort_output = effort_task.compute_from_reference(state, make_reference([0.0, 0.0]))
+
+ assert default_output is not None
+ assert effort_output is not None
+ assert default_output.positions == pytest.approx([0.01, 0.0])
+ assert effort_output.positions == pytest.approx([0.0, 0.0])
+ assert default_task.get_diagnostics().feedback_mode == "position_error"
+ assert effort_task.get_diagnostics().feedback_mode == "effort"
+
+
+def test_compliance_invalid_inputs_record_safe_action() -> None:
+ task = JointComplianceTask("compliance", JointComplianceTaskConfig(joint_names=JOINTS))
+
+ output = task.compute_from_reference(make_state(dt=0.0), make_reference([0.1, 0.2]))
+ assert output is not None
+ assert output.positions == [0.1, 0.2]
+ assert task.get_diagnostics().last_safe_action == "invalid_dt"
+
+ assert (
+ task.compute_from_reference(make_state(positions={"arm/j1": 0.0}), make_reference()) is None
+ )
+ assert task.get_diagnostics().last_safe_action == "missing_state"
+
+ incompatible = JointCommandOutput(
+ joint_names=JOINTS, velocities=[0.0, 0.0], mode=ControlMode.VELOCITY
+ )
+ assert task.compute_from_reference(make_state(), incompatible) is None
+ assert task.get_diagnostics().last_safe_action == "incompatible_reference"
+
+
+def test_compliance_rejects_non_finite_config_and_runtime_values() -> None:
+ with pytest.raises(ValueError, match="must be finite"):
+ JointComplianceTask("bad", JointComplianceTaskConfig(joint_names=JOINTS, mass=float("nan")))
+
+ task = JointComplianceTask("compliance", JointComplianceTaskConfig(joint_names=JOINTS))
+
+ assert task.compute_from_reference(make_state(), make_reference([float("nan"), 0.0])) is None
+ assert task.get_diagnostics().last_safe_action == "invalid_reference"
+
+ assert (
+ task.compute_from_reference(
+ make_state(positions={"arm/j1": float("inf"), "arm/j2": 0.0}), make_reference()
+ )
+ is None
+ )
+ assert task.get_diagnostics().last_safe_action == "invalid_state"
+
+ effort_task = JointComplianceTask(
+ "effort",
+ JointComplianceTaskConfig(joint_names=JOINTS, use_effort_feedback=True),
+ )
+ assert (
+ effort_task.compute_from_reference(
+ make_state(efforts={"arm/j1": float("nan"), "arm/j2": 0.0}), make_reference()
+ )
+ is None
+ )
+ assert effort_task.get_diagnostics().last_safe_action == "invalid_effort"
+
+
+def test_compliant_trajectory_samples_and_shapes_output() -> None:
+ task = create_compliant_joint_trajectory_task(
+ "compliant",
+ JOINTS,
+ compliance_config=JointComplianceTaskConfig(
+ joint_names=JOINTS, mass=1.0, damping=0.0, stiffness=0.0
+ ),
+ )
+ assert task.execute(make_trajectory()) is True
+
+ first = task.compute(make_state(t_now=10.0, dt=0.1))
+ second = task.compute(make_state(positions={"arm/j1": 1.0, "arm/j2": 0.0}, t_now=10.5, dt=0.1))
+
+ assert first is not None
+ assert second is not None
+ assert first.positions == pytest.approx([0.0, 0.0])
+ assert second.positions == pytest.approx([0.505, 0.99])
+
+
+def test_compliant_trajectory_completion_cancel_and_preemption_match_rigid_task() -> None:
+ task = create_compliant_joint_trajectory_task("compliant", JOINTS)
+ assert task.execute(make_trajectory()) is True
+ completed = task.compute(make_state(t_now=1.0, dt=0.01))
+ assert completed is not None
+ assert task.compute(make_state(t_now=2.1, dt=0.01)) is not None
+ assert task.get_state() == TrajectoryState.COMPLETED
+ assert task.cancel() is False
+ assert task.reset() is True
+ assert task.get_state() == TrajectoryState.IDLE
+
+ assert task.execute(make_trajectory()) is True
+ assert task.cancel() is True
+ assert task.get_state() == TrajectoryState.ABORTED
+
+ assert task.reset() is True
+ assert task.execute(make_trajectory()) is True
+ task.on_preempted("higher", frozenset({"arm/j1"}))
+ assert task.get_state() == TrajectoryState.ABORTED
+ assert task.get_compliance_diagnostics().last_safe_action == "reset"
+
+
+def test_compliant_trajectory_completion_returns_nominal_final_and_resets_compliance() -> None:
+ task = create_compliant_joint_trajectory_task(
+ "compliant",
+ JOINTS,
+ compliance_config=JointComplianceTaskConfig(
+ joint_names=JOINTS, mass=0.1, damping=0.0, stiffness=0.0
+ ),
+ )
+ assert task.execute(make_trajectory()) is True
+
+ shaped = task.compute(make_state(positions={"arm/j1": 1.0, "arm/j2": -1.0}, t_now=0.0, dt=0.1))
+ assert shaped is not None
+ assert any(abs(position) > 0.0 for position in shaped.positions or [])
+
+ completed = task.compute(
+ make_state(positions={"arm/j1": 1.0, "arm/j2": -1.0}, t_now=1.1, dt=0.1)
+ )
+
+ assert completed is not None
+ assert completed.positions == pytest.approx([1.0, 2.0])
+ assert task.get_state() == TrajectoryState.COMPLETED
+ diagnostics = task.get_compliance_diagnostics()
+ assert diagnostics.offsets == pytest.approx({"arm/j1": 0.0, "arm/j2": 0.0})
+ assert diagnostics.last_safe_action == "reset"
+
+
+def test_rejected_execute_and_forbidden_reset_preserve_compliance_state() -> None:
+ task = create_compliant_joint_trajectory_task(
+ "compliant",
+ JOINTS,
+ compliance_config=JointComplianceTaskConfig(
+ joint_names=JOINTS, mass=0.1, damping=0.0, stiffness=0.0
+ ),
+ )
+ assert task.execute(make_trajectory()) is True
+ output = task.compute(make_state(positions={"arm/j1": 1.0, "arm/j2": -1.0}, t_now=0.0, dt=0.1))
+ assert output is not None
+ before = task.get_compliance_diagnostics()
+ assert any(abs(offset) > 0.0 for offset in before.offsets.values())
+
+ assert task.execute(make_invalid_trajectory()) is False
+ after_rejected_execute = task.get_compliance_diagnostics()
+ assert after_rejected_execute.offsets == pytest.approx(before.offsets)
+ assert task.get_state() == TrajectoryState.EXECUTING
+
+ assert task.reset() is False
+ after_rejected_reset = task.get_compliance_diagnostics()
+ assert after_rejected_reset.offsets == pytest.approx(before.offsets)
+ assert task.get_state() == TrajectoryState.EXECUTING
diff --git a/dimos/control/test_composition.py b/dimos/control/test_composition.py
new file mode 100644
index 0000000000..a5a9a6198a
--- /dev/null
+++ b/dimos/control/test_composition.py
@@ -0,0 +1,147 @@
+# Copyright 2026 Dimensional Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dimos.control.composition import ComposedControlTask
+from dimos.control.task import (
+ BaseControlTask,
+ ControlMode,
+ CoordinatorState,
+ JointCommandOutput,
+ JointStateSnapshot,
+ ResourceClaim,
+)
+
+JOINTS = ["arm/j1", "arm/j2"]
+
+
+class FakeSourceTask(BaseControlTask):
+ def __init__(self) -> None:
+ self.preemptions: list[tuple[str, frozenset[str]]] = []
+ self.active = True
+ self.output: JointCommandOutput | None = JointCommandOutput(
+ joint_names=JOINTS,
+ positions=[1.0, 2.0],
+ mode=ControlMode.SERVO_POSITION,
+ )
+
+ @property
+ def name(self) -> str:
+ return "source"
+
+ def claim(self) -> ResourceClaim:
+ return ResourceClaim(frozenset(JOINTS), priority=7, mode=ControlMode.SERVO_POSITION)
+
+ def is_active(self) -> bool:
+ return self.active
+
+ def compute(self, state: CoordinatorState) -> JointCommandOutput | None:
+ return self.output
+
+ def on_preempted(self, by_task: str, joints: frozenset[str]) -> None:
+ self.preemptions.append((by_task, joints))
+
+
+class AddTransform(BaseControlTask):
+ def __init__(self, name: str, delta: float, joints: list[str] | None = None) -> None:
+ self._name = name
+ self._delta = delta
+ self._joints = joints or JOINTS
+ self.preemptions: list[tuple[str, frozenset[str]]] = []
+ self.reset_count = 0
+
+ @property
+ def name(self) -> str:
+ return self._name
+
+ def claim(self) -> ResourceClaim:
+ return ResourceClaim(frozenset(self._joints), priority=7, mode=ControlMode.SERVO_POSITION)
+
+ def is_active(self) -> bool:
+ return True
+
+ def compute(self, state: CoordinatorState) -> JointCommandOutput | None:
+ return None
+
+ def compute_from_reference(
+ self,
+ state: CoordinatorState,
+ reference: JointCommandOutput,
+ ) -> JointCommandOutput:
+ assert reference.positions is not None
+ return JointCommandOutput(
+ joint_names=reference.joint_names,
+ positions=[value + self._delta for value in reference.positions],
+ mode=reference.mode,
+ )
+
+ def on_preempted(self, by_task: str, joints: frozenset[str]) -> None:
+ self.preemptions.append((by_task, joints))
+
+ def reset(self) -> None:
+ self.reset_count += 1
+
+
+def make_state() -> CoordinatorState:
+ return CoordinatorState(joints=JointStateSnapshot(), t_now=1.0, dt=0.01)
+
+
+def test_composed_task_forwards_output_through_transforms_in_order() -> None:
+ task = ComposedControlTask(
+ "composed", FakeSourceTask(), [AddTransform("a", 1.0), AddTransform("b", 10.0)]
+ )
+
+ output = task.compute(make_state())
+
+ assert output is not None
+ assert output.positions == [12.0, 13.0]
+
+
+def test_composed_task_active_state_follows_source_and_suppresses_none() -> None:
+ source = FakeSourceTask()
+ task = ComposedControlTask("composed", source, [AddTransform("a", 1.0)])
+
+ source.active = False
+ assert task.is_active() is False
+ source.output = None
+ assert task.compute(make_state()) is None
+
+
+def test_composed_task_validation_rejects_empty_and_mismatched_pipeline() -> None:
+ source = FakeSourceTask()
+
+ try:
+ ComposedControlTask("composed", source, [])
+ except ValueError as exc:
+ assert "requires at least one transform" in str(exc)
+ else:
+ raise AssertionError("empty transforms should fail")
+
+ try:
+ ComposedControlTask("composed", source, [AddTransform("bad", 0.0, joints=["arm/j1"])])
+ except ValueError as exc:
+ assert "do not match source joints" in str(exc)
+ else:
+ raise AssertionError("mismatched transform joints should fail")
+
+
+def test_composed_task_forwards_preemption_and_resets_transforms() -> None:
+ source = FakeSourceTask()
+ transform = AddTransform("a", 1.0)
+ task = ComposedControlTask("composed", source, [transform])
+
+ task.on_preempted("higher", frozenset({"arm/j1"}))
+
+ assert source.preemptions == [("higher", frozenset({"arm/j1"}))]
+ assert transform.preemptions == [("higher", frozenset({"arm/j1"}))]
+ assert transform.reset_count == 1
diff --git a/dimos/robot/all_blueprints.py b/dimos/robot/all_blueprints.py
index 22bf7d27e1..99bbc85021 100644
--- a/dimos/robot/all_blueprints.py
+++ b/dimos/robot/all_blueprints.py
@@ -129,6 +129,7 @@
"xarm-perception-sim": "dimos.robot.manipulators.xarm.blueprints.simulation:xarm_perception_sim",
"xarm-perception-sim-agent": "dimos.robot.manipulators.xarm.blueprints.agentic:xarm_perception_sim_agent",
"xarm6-planner-only": "dimos.robot.manipulators.xarm.blueprints.basic:xarm6_planner_only",
+ "xarm7-compliant-obstacle-sim": "dimos.robot.manipulators.xarm.blueprints.simulation:xarm7_compliant_obstacle_sim",
"xarm7-planner-coordinator": "dimos.robot.manipulators.xarm.blueprints.basic:xarm7_planner_coordinator",
"xarm7-planner-coordinator-agent": "dimos.robot.manipulators.xarm.blueprints.agentic:xarm7_planner_coordinator_agent",
}
diff --git a/dimos/robot/manipulators/common/blueprints.py b/dimos/robot/manipulators/common/blueprints.py
index 1f85998a67..388de8c9fe 100644
--- a/dimos/robot/manipulators/common/blueprints.py
+++ b/dimos/robot/manipulators/common/blueprints.py
@@ -47,6 +47,23 @@ def trajectory_task(
)
+def compliant_trajectory_task(
+ hardware: HardwareComponent,
+ *,
+ name: str | None = None,
+ priority: int = 10,
+ params: dict[str, Any] | None = None,
+) -> TaskConfig:
+ """Create a compliant joint trajectory task config for a manipulator."""
+ return TaskConfig(
+ name=name or trajectory_task_name(hardware.hardware_id),
+ type="compliant_trajectory",
+ joint_names=hardware.joints,
+ priority=priority,
+ params=params or {},
+ )
+
+
def cartesian_ik_task(
hardware: HardwareComponent,
*,
diff --git a/dimos/robot/manipulators/xarm/blueprints/simulation.py b/dimos/robot/manipulators/xarm/blueprints/simulation.py
index d4c397f48b..d49128cd0a 100644
--- a/dimos/robot/manipulators/xarm/blueprints/simulation.py
+++ b/dimos/robot/manipulators/xarm/blueprints/simulation.py
@@ -21,11 +21,16 @@
from dimos.core.coordination.blueprints import autoconnect
from dimos.manipulation.pick_and_place_module import PickAndPlaceModule
from dimos.perception.object_scene_registration import ObjectSceneRegistrationModule
-from dimos.robot.manipulators.common.blueprints import coordinator, trajectory_task
+from dimos.robot.manipulators.common.blueprints import (
+ compliant_trajectory_task,
+ coordinator,
+ trajectory_task,
+)
from dimos.robot.manipulators.xarm.config import (
XARM7_SIM_PATH,
make_xarm7_model_config,
make_xarm_hardware,
+ xarm7_compliance_obstacle_scene_path,
)
from dimos.simulation.engines.mujoco_sim_module import MujocoSimModule
from dimos.visualization.rerun.bridge import RerunBridgeModule
@@ -70,3 +75,42 @@
),
RerunBridgeModule.blueprint(),
)
+
+_xarm7_compliance_obstacle_scene = xarm7_compliance_obstacle_scene_path()
+
+_xarm7_compliance_obstacle_hw = make_xarm_hardware(
+ "arm",
+ 7,
+ adapter_type="sim_mujoco",
+ address=str(_xarm7_compliance_obstacle_scene),
+ gripper=False,
+ home_joints=XARM7_SIM_HOME,
+)
+
+xarm7_compliant_obstacle_sim = autoconnect(
+ MujocoSimModule.blueprint(
+ address=str(_xarm7_compliance_obstacle_scene),
+ headless=False,
+ dof=7,
+ camera_name="wrist_camera",
+ base_frame_id="link7",
+ ),
+ coordinator(
+ hardware=[_xarm7_compliance_obstacle_hw],
+ tasks=[
+ compliant_trajectory_task(
+ _xarm7_compliance_obstacle_hw,
+ params={
+ "mass": 0.7,
+ "damping": 6.0,
+ "stiffness": 18.0,
+ "max_offset": 0.12,
+ "max_offset_velocity": 0.35,
+ "deadband": 0.005,
+ "use_effort_feedback": False,
+ },
+ )
+ ],
+ ),
+ RerunBridgeModule.blueprint(),
+)
diff --git a/dimos/robot/manipulators/xarm/config.py b/dimos/robot/manipulators/xarm/config.py
index d918ee94a8..a33baec36b 100644
--- a/dimos/robot/manipulators/xarm/config.py
+++ b/dimos/robot/manipulators/xarm/config.py
@@ -19,6 +19,7 @@
from pathlib import Path
from typing import Any
+from dimos.constants import DIMOS_PROJECT_ROOT
from dimos.control.components import HardwareComponent, HardwareType, make_joints
from dimos.core.global_config import global_config
from dimos.manipulation.planning.spec.config import RobotModelConfig
@@ -54,6 +55,24 @@
XARM7_FK_MODEL = LfsPath("xarm_description/urdf/xarm7/xarm7.urdf")
XARM6_SIM_PATH = LfsPath("xarm6/scene.xml")
XARM7_SIM_PATH = LfsPath("xarm7/scene.xml")
+XARM7_COMPLIANCE_OBSTACLE_SIM_PATH = DIMOS_PROJECT_ROOT / "data/xarm7/compliance_obstacle_scene.xml"
+XARM7_COMPLIANCE_OBSTACLE_TEMPLATE_PATH = (
+ DIMOS_PROJECT_ROOT / "dimos/robot/manipulators/xarm/mujoco/compliance_obstacle_scene.xml"
+)
+
+
+def xarm7_compliance_obstacle_scene_path() -> Path:
+ """Return the local xArm7 obstacle scene, ensuring included LFS assets exist."""
+ # The obstacle scene includes ``xarm7.xml`` from the xArm7 LFS asset. Resolve the base scene
+ # first so a fresh checkout downloads/extracts the xArm7 LFS asset directory.
+ Path(str(XARM7_SIM_PATH)).exists()
+ scene_xml = XARM7_COMPLIANCE_OBSTACLE_TEMPLATE_PATH.read_text()
+ if (
+ not XARM7_COMPLIANCE_OBSTACLE_SIM_PATH.exists()
+ or XARM7_COMPLIANCE_OBSTACLE_SIM_PATH.read_text() != scene_xml
+ ):
+ XARM7_COMPLIANCE_OBSTACLE_SIM_PATH.write_text(scene_xml)
+ return XARM7_COMPLIANCE_OBSTACLE_SIM_PATH
def _adapter_kwargs(home_joints: list[float] | None = None) -> dict[str, object]:
diff --git a/dimos/robot/manipulators/xarm/mujoco/compliance_obstacle_scene.xml b/dimos/robot/manipulators/xarm/mujoco/compliance_obstacle_scene.xml
new file mode 100644
index 0000000000..1c7e0461ba
--- /dev/null
+++ b/dimos/robot/manipulators/xarm/mujoco/compliance_obstacle_scene.xml
@@ -0,0 +1,56 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/docs/capabilities/manipulation/readme.md b/docs/capabilities/manipulation/readme.md
index e502947699..bd27da213b 100644
--- a/docs/capabilities/manipulation/readme.md
+++ b/docs/capabilities/manipulation/readme.md
@@ -76,6 +76,84 @@ preview() # Preview in Meshcat
execute() # Execute via coordinator
```
+### Rigid vs compliant joint trajectories
+
+Manipulator coordinator stacks can execute either rigid joint trajectories or
+compliant joint trajectories:
+
+- `trajectory` keeps the existing rigid behavior: the trajectory sampler emits
+ the final `SERVO_POSITION` command directly.
+- `compliant_trajectory` keeps the same nominal trajectory interface, then runs
+ an internal joint-compliance stage before the final coordinator command.
+
+The compliant task is still one coordinator-facing task. It does not create a
+coordinator-level task graph, and the trajectory and compliance stages do not
+compete for the same joints during arbitration.
+
+Version 1 is position-servo only. It reads joint position feedback and uses
+effort feedback only when explicitly enabled in task params; joint velocity
+feedback is not part of the v1 compliance law. This is deliberate: some arm
+adapters expose placeholder zero effort values, so zero torque readings are not
+treated as reliable contact data by default. The generic manipulator route also
+does not require torque output.
+
+Compliance behavior is bounded by per-joint virtual mass, damping, stiffness,
+maximum offset, and maximum offset velocity. If required joint position state is
+missing, the transform suppresses output instead of inventing offsets. If `dt`
+is invalid, it safely passes the nominal reference through for that tick. Offset
+and saturation diagnostics are available from the compliant task for tests and
+debugging.
+
+Blueprint helpers can select the behavior explicitly:
+
+```python skip
+from dimos.robot.manipulators.common.blueprints import (
+ compliant_trajectory_task,
+ trajectory_task,
+)
+
+rigid = trajectory_task(arm_hw)
+
+compliant = compliant_trajectory_task(
+ arm_hw,
+ params={
+ "stiffness": 20.0,
+ "damping": 6.0,
+ "max_offset": 0.1,
+ "max_offset_velocity": 0.4,
+ "use_effort_feedback": False,
+ },
+)
+```
+
+#### MuJoCo verification
+
+Use the dedicated xArm7 obstacle scene as the canonical manipulator simulation
+entry point for compliant trajectory checks:
+
+```bash
+uv run dimos --simulation run xarm7-compliant-obstacle-sim
+```
+
+The scene places a fixed wall near the robot's forward workspace and registers a
+`compliant_trajectory` task named `traj_arm`. For a free-space smoke check, run a
+trajectory that stays in front of the wall; for contact tuning, run a trajectory
+whose nominal target reaches behind the translucent marker. Expected pass/fail
+observations:
+
+- Free space: compliant offsets stay near zero and the final joint error remains
+ comparable to the rigid trajectory.
+- Rigid contact or obstruction: compliant offset grows only within configured
+ limits, saturation is observable in diagnostics, and the command does not keep
+ integrating through the bound.
+- Placeholder effort feedback: leave `use_effort_feedback=False` unless the
+ adapter is known to expose reliable effort readings for the scenario.
+
+Rigid table/obstacle and soft-contact demos should compare joint-space metrics
+such as peak effort, tracking error, offset, and saturation time. These demos are
+useful for tuning, but deterministic unit tests remain the source of truth for
+safe invalid-`dt`, missing-state, and saturation behavior.
+
### Planning Visualization
Manipulation visualization is configured on `ManipulationModuleConfig.visualization`.
diff --git a/docs/development/conventions.md b/docs/development/conventions.md
index 53259c74da..a24736fe15 100644
--- a/docs/development/conventions.md
+++ b/docs/development/conventions.md
@@ -10,3 +10,4 @@ This mostly to track when conventions change (with regard to codebase updates) b
- Similar to the `rerun_config` the `rrb` (rerun blueprint) is defined at a blueprint level right now, but ideally would be a per-module contribution with only a per-blueprint override of the layout.
- No `__init__.py` files
- Helper blueprints (like `_with_vis`) that should not be used on their own need to start with an underscore to avoid being picked up by the all_blueprints.py code generation step
+- Control tasks exposed to `ControlCoordinator` should keep the simple contract `claim()`, `is_active()`, and `compute() -> JointCommandOutput | None`. If a controller needs internal stages, use a composed task that still emits one final command; do not add coordinator-level task graphs unless multiple runtime-composable controllers justify the extra scheduling and safety semantics.
diff --git a/openspec/changes/add-compliant-joint-trajectory-task/.openspec.yaml b/openspec/changes/add-compliant-joint-trajectory-task/.openspec.yaml
new file mode 100644
index 0000000000..b2bdba5c52
--- /dev/null
+++ b/openspec/changes/add-compliant-joint-trajectory-task/.openspec.yaml
@@ -0,0 +1,2 @@
+schema: dimos-capability
+created: 2026-06-23
diff --git a/openspec/changes/add-compliant-joint-trajectory-task/design.md b/openspec/changes/add-compliant-joint-trajectory-task/design.md
new file mode 100644
index 0000000000..139161ea4f
--- /dev/null
+++ b/openspec/changes/add-compliant-joint-trajectory-task/design.md
@@ -0,0 +1,114 @@
+## Context
+
+DimOS control tasks currently expose a simple coordinator contract: each active task claims joints, computes a final `JointCommandOutput`, and the coordinator arbitrates one command per joint before routing to hardware. The existing passive joint trajectory task samples a nominal trajectory each tick and emits `SERVO_POSITION` commands directly. If a separate compliance task were run at the same time on the same joints, the two tasks would compete in arbitration instead of composing.
+
+Manipulator hardware support is also uneven. The common adapter path provides joint positions and may provide velocities/efforts, but torque command output is not available through the generic coordinator-to-manipulator route. xArm, Piper, OpenArm, A750, mock, and MuJoCo-backed adapters all support position commands; velocity and effort feedback quality varies by backend. The first compliant trajectory capability therefore needs to be position-servo based and capability-adaptive.
+
+MuJoCo simulation already supports manipulator scenes, SHM joint state exchange, position/velocity command targets, and effort feedback. It can be used to verify the behavior implemented here, but not as proof of a future torque impedance backend.
+
+## Goals / Non-Goals
+
+**Goals:**
+
+- Preserve the coordinator's existing final-command arbitration model.
+- Add a narrow composition pattern where one source control task can be followed by one or more reference-transforming control tasks.
+- Add compliant joint trajectory execution by composing trajectory sampling with bounded joint-space compliance.
+- Keep rigid trajectory execution unchanged and selectable.
+- Support operation without reliable effort feedback by using position/velocity feedback as the baseline signal.
+- Provide unit, task-composition, and MuJoCo verification coverage.
+
+**Non-Goals:**
+
+- No coordinator-level task graph, DAG scheduler, or ROS2-style exported reference interface system.
+- No torque impedance control in the first implementation.
+- No automatic inference that zero-valued effort feedback is reliable.
+- No runtime controller rewiring UI or CLI.
+- No broad refactor of teleop, whole-body control, navigation, or Cartesian controllers.
+
+## DimOS Architecture
+
+The change should add a small composition layer in the control package, either near `dimos/control/task.py` or in a focused composition module. The coordinator-facing composed task remains a normal `ControlTask`: it exposes one name, one claim set, one active lifecycle, and one final `JointCommandOutput`. Internally, it calls a source task and then ordered transform-capable tasks.
+
+The reference transform interface should be typed around existing coordinator data structures:
+
+```text
+compute_from_reference(CoordinatorState, JointCommandOutput) -> JointCommandOutput | None
+```
+
+Standalone control tasks continue to implement `compute(CoordinatorState)`. A transform-capable task can implement both standalone behavior and reference-transform behavior. In the compliant trajectory use case, the source is the existing joint trajectory task, and the transform is a joint compliance task that consumes `SERVO_POSITION` positions and emits adjusted `SERVO_POSITION` positions.
+
+The compliant task should use current joint state from `CoordinatorState`:
+
+- joint position is required for each controlled joint;
+- velocity is optional but used when available;
+- effort feedback is opt-in/configured and must not be assumed merely because an adapter returns a list of zeros.
+
+The compliance state should maintain per-joint offset and offset velocity. Each tick, it updates offset using an admittance-style virtual mass/damping/stiffness model, clamps offset and offset velocity, and emits `q_cmd = q_ref + offset`. The output mode remains `SERVO_POSITION`, so existing manipulator hardware routing continues to call position-command methods.
+
+Blueprint and CLI exposure should be minimal. Existing rigid trajectory tasks remain the default unless a blueprint/configuration explicitly selects compliant trajectory behavior. No skill/MCP tool exposure is required.
+
+If implementation adds or renames blueprint variables, the generated registry must be regenerated with `pytest dimos/robot/test_all_blueprints_generation.py`. If no blueprint registry surface changes, no generated registry update is needed.
+
+## Decisions
+
+1. **Use internal task composition, not coordinator-level chainability.**
+ - Rationale: the coordinator currently arbitrates final joint commands. Adding a graph scheduler would mix arbitration semantics with reference transformation semantics.
+ - Alternative considered: ROS2-style chainable controllers with exported reference interfaces. This was rejected for v1 because DimOS lacks controller-manager ordering and broad intermediate reference contracts.
+
+2. **Use position-servo compliance first.**
+ - Rationale: all target manipulator paths can accept position commands, while torque output is not generally exposed.
+ - Alternative considered: torque impedance. This is deferred until adapter protocols and hardware routes support explicit torque command output.
+
+3. **Make effort feedback explicit, not auto-detected.**
+ - Rationale: some adapters return zero efforts as a placeholder. Treating zeros as reliable torque data would create misleading contact behavior.
+ - Alternative considered: infer effort availability from nonempty effort arrays. This is unsafe with current adapters.
+
+4. **Keep composition narrow and linear.**
+ - Rationale: the initial use case is trajectory source plus compliance transform. A generic graph DSL would add complexity before multiple use cases prove the need.
+
+5. **Verify with joint-space metrics first, contact demos second.**
+ - Rationale: controller math and safety bounds can be tested deterministically without MuJoCo; contact scenes are useful integration evidence but depend on simulation asset tuning.
+
+## Safety / Simulation / Replay
+
+Safety constraints should be explicit in configuration and tests:
+
+- maximum offset per joint;
+- maximum offset velocity per joint;
+- optional maximum sustained tracking error or effort threshold;
+- behavior for missing joint state;
+- behavior for invalid or nonpositive `dt`;
+- reset behavior on start, completion, abort, and preemption.
+
+Under missing required joint position state, the compliant transform should fail safe by returning no command or a documented hold/pass-through behavior rather than producing an arbitrary offset. Under clamp saturation, diagnostics should make saturation visible to tests and operators.
+
+MuJoCo verification should use existing manipulator simulation as much as possible. The first smoke test should run without contact and confirm that compliant output tracks similarly to rigid trajectory output when no disturbance exists. A second integration/demo scene should drive an end effector into a rigid table or obstacle and compare rigid trajectory behavior against compliant trajectory behavior using peak effort, tracking error, compliance offset, and saturation metrics. A soft cushion/contact scene can follow after the rigid contact path is stable.
+
+Replay support is not a primary target because this is an active control behavior. Any replay-based tests should be limited to deterministic state/command calculations rather than claiming physical contact validation.
+
+## Risks / Trade-offs
+
+- **Tuning risk:** mass/damping/stiffness defaults may be unstable or ineffective across robots. Mitigation: conservative defaults, per-joint clamps, and tests that verify bounded behavior.
+- **Feedback quality risk:** adapters differ in velocity and effort fidelity. Mitigation: require positions, treat velocities/efforts as optional, and make effort use opt-in.
+- **Simulation realism risk:** MuJoCo contact behavior depends on scene geometry and contact parameters. Mitigation: use MuJoCo as integration evidence, not the only acceptance criterion.
+- **Abstraction risk:** a reusable composition pattern can grow into a graph framework prematurely. Mitigation: keep v1 linear and scoped to one source plus transforms.
+- **Safety responsibility risk:** safety may be split across transform, source task, coordinator, and hardware adapter. Mitigation: transform-level clamps are mandatory, while coordinator and hardware routing remain unchanged.
+
+## Migration / Rollout
+
+Roll out in layers:
+
+1. Add composition abstractions and unit tests without changing existing trajectory behavior.
+2. Add joint compliance transform with deterministic tests.
+3. Add compliant trajectory wrapper/factory and integration tests.
+4. Add optional blueprint/configuration exposure for simulation or manipulator stacks.
+5. Add documentation for compliant trajectory semantics, safety bounds, and verification approach.
+
+Existing users can keep using rigid trajectory tasks. Rollback is straightforward if the new task is not selected by default: remove the new task/configuration and keep existing trajectory behavior.
+
+## Open Questions
+
+- Should the first implementation pass through the reference or return no command when joint state for a controlled joint is missing?
+- Which robot/sim blueprint should be the canonical MuJoCo verification entry point: xArm7 first, Piper second, or a new minimal manipulator verification blueprint?
+- Should sustained offset saturation abort the task in v1, or only report diagnostics and continue clamped?
+- What configuration format should expose compliance gains and clamps to blueprints without overcommitting to a public API too early?
diff --git a/openspec/changes/add-compliant-joint-trajectory-task/docs.md b/openspec/changes/add-compliant-joint-trajectory-task/docs.md
new file mode 100644
index 0000000000..1ab5da1643
--- /dev/null
+++ b/openspec/changes/add-compliant-joint-trajectory-task/docs.md
@@ -0,0 +1,31 @@
+## User-Facing Docs
+
+- Add or update a user-facing control/manipulation guide under `docs/usage/` or `docs/capabilities/` explaining compliant joint trajectory execution:
+ - rigid trajectory vs compliant trajectory behavior;
+ - supported command mode for v1: position-servo output;
+ - required feedback: joint positions; optional feedback: velocities and configured effort;
+ - safety clamps and saturation behavior;
+ - recommended use cases such as gentle contact, guarded motion, and manipulation near uncertain obstacles.
+- If a simulation blueprint or example is added, document how to run the MuJoCo smoke/contact verification scenario.
+
+## Contributor Docs
+
+- Update control architecture contributor docs if they exist, or add a short development note in `docs/development/` covering:
+ - coordinator-facing tasks still emit final commands;
+ - composed tasks are internal linear pipelines, not coordinator-level task graphs;
+ - torque impedance remains future work until the adapter/hardware path supports torque output.
+
+## Coding-Agent Docs
+
+- Update `docs/coding-agents/` only if implementation introduces new coding-agent workflow guidance for control tasks.
+- No `AGENTS.md` update is required unless new safety rules or generated-file requirements become broadly applicable to coding agents.
+
+## Doc Validation
+
+- Run link/documentation validation commands used by the repository for any changed docs, such as `doclinks` if available.
+- For executable Markdown snippets, run `md-babel-py run ` where applicable.
+- If diagrams are added or regenerated, run `bin/gen-diagrams` if applicable.
+
+## No Docs Needed
+
+Documentation is needed because the change introduces user/developer-visible control behavior and safety semantics. Even if the first implementation is only selectable by blueprint/configuration, users need to understand that v1 is position-servo compliance rather than torque impedance.
diff --git a/openspec/changes/add-compliant-joint-trajectory-task/proposal.md b/openspec/changes/add-compliant-joint-trajectory-task/proposal.md
new file mode 100644
index 0000000000..703ef0e11d
--- /dev/null
+++ b/openspec/changes/add-compliant-joint-trajectory-task/proposal.md
@@ -0,0 +1,36 @@
+## Why
+
+DimOS currently has a passive `JointTrajectoryTask` that samples nominal joint-space trajectories and emits final `SERVO_POSITION` commands directly to the coordinator. This is appropriate for rigid motion, but it gives no reusable way to soften trajectory tracking under contact or unexpected resistance. A separate compliance task cannot safely run beside the trajectory task today because the coordinator arbitrates final joint ownership rather than composing command transforms.
+
+This change introduces a narrow compliant joint trajectory capability: trajectory generation remains the nominal reference source, while a compliance transform shapes the final servo-position command using joint feedback. The first implementation targets position-servo compliance that works across existing manipulator adapters and MuJoCo simulation without requiring torque output support.
+
+## What Changes
+
+- Add a coordinator-facing composed control task pattern for one source task plus ordered reference transforms.
+- Add a compliant joint trajectory behavior that composes joint trajectory sampling with joint-space compliance output shaping.
+- Add a reusable joint compliance task that can consume an upstream joint position reference and produce a bounded `SERVO_POSITION` command.
+- Keep existing rigid trajectory behavior available and unchanged.
+- Add verification coverage for unit-level compliance math, composed task behavior, and MuJoCo simulation/contact scenarios.
+- No **BREAKING** public CLI, skill/MCP, or hardware-safety behavior changes are intended.
+
+## Affected DimOS Surfaces
+
+- Modules/streams: control task abstractions, trajectory task integration, coordinator task outputs, joint command/task tests.
+- Blueprints/CLI: manipulator coordinator task configuration may gain a compliant trajectory task option; existing blueprint behavior remains unchanged unless the new task is selected.
+- Skills/MCP: none directly.
+- Hardware/simulation/replay: manipulator adapters remain position/velocity-command based; MuJoCo simulation is used for smoke/contact verification through existing xArm/Piper-style simulated hardware paths.
+- Docs/generated registries: control/task documentation and capability docs may be updated; generated blueprint registry changes are only needed if a new blueprint is added.
+
+## Capabilities
+
+### New Capabilities
+
+- `compliant-joint-trajectory-control`: Joint-space trajectory execution with bounded position-servo compliance and verification expectations.
+
+### Modified Capabilities
+
+- None.
+
+## Impact
+
+Developers gain a supported path for compliant trajectory execution without introducing coordinator-level task graphs or torque-mode requirements. Existing users of `JointTrajectoryTask` should see no behavioral change. Main risks are control tuning, safety-bound defaults, and simulation realism; the implementation should therefore include conservative clamps, explicit effort-feedback configuration, and tests that separate unit-level behavior from MuJoCo contact demos.
diff --git a/openspec/changes/add-compliant-joint-trajectory-task/specs/compliant-joint-trajectory-control/spec.md b/openspec/changes/add-compliant-joint-trajectory-task/specs/compliant-joint-trajectory-control/spec.md
new file mode 100644
index 0000000000..ae26d53505
--- /dev/null
+++ b/openspec/changes/add-compliant-joint-trajectory-task/specs/compliant-joint-trajectory-control/spec.md
@@ -0,0 +1,114 @@
+## ADDED Requirements
+
+### Requirement: Compliant joint trajectory execution
+
+DimOS SHALL provide a joint trajectory execution mode that follows a nominal joint trajectory while allowing bounded compliant deviation from the reference under tracking resistance or contact.
+
+#### Scenario: Free-space trajectory behaves like nominal tracking
+- **GIVEN** a compliant joint trajectory is active for a set of manipulator joints
+- **AND** the measured joint positions track the nominal trajectory within the configured deadband or tolerance
+- **WHEN** the controller computes the next joint command
+- **THEN** the commanded joint positions SHALL remain close to the nominal trajectory reference
+- **AND** the compliant offset SHALL remain near zero within configured tolerance.
+
+#### Scenario: Contact or resistance produces bounded backoff
+- **GIVEN** a compliant joint trajectory is active for a set of manipulator joints
+- **AND** measured joint feedback indicates sustained resistance to the nominal motion
+- **WHEN** the controller computes subsequent joint commands
+- **THEN** the commanded joint positions SHALL include a compliant offset that reduces continued fighting against the resistance
+- **AND** the compliant offset and offset velocity SHALL remain within configured bounds.
+
+#### Scenario: Existing rigid trajectory behavior remains available
+- **GIVEN** a robot stack or test selects the existing rigid joint trajectory behavior
+- **WHEN** a joint trajectory is executed
+- **THEN** DimOS SHALL execute the trajectory without applying compliant offset behavior
+- **AND** existing rigid trajectory users SHALL NOT be required to change their configuration.
+
+### Requirement: Position-servo compatibility
+
+The compliant joint trajectory capability SHALL produce position-servo joint commands in its initial supported mode and MUST NOT require torque command output from the manipulator hardware path.
+
+#### Scenario: Hardware supports position commands but not torque commands
+- **GIVEN** manipulator hardware can read joint positions and accept position-servo commands
+- **AND** the hardware does not expose torque command output through the generic manipulator command path
+- **WHEN** a compliant joint trajectory is selected
+- **THEN** the controller SHALL be able to operate using position-servo commands
+- **AND** it SHALL NOT require torque-mode hardware routing.
+
+#### Scenario: Effort feedback is unavailable or disabled
+- **GIVEN** manipulator joint positions are available
+- **AND** joint effort feedback is unavailable, placeholder-valued, or not configured for use
+- **WHEN** a compliant joint trajectory is executed
+- **THEN** DimOS SHALL still compute compliance from configured non-effort feedback behavior
+- **AND** it SHALL NOT infer reliable effort feedback solely from zero-valued effort readings.
+
+#### Scenario: Effort feedback is explicitly enabled
+- **GIVEN** a hardware or simulation backend provides reliable joint effort feedback
+- **AND** effort feedback use is explicitly enabled in the compliant trajectory configuration
+- **WHEN** a compliant joint trajectory is executed
+- **THEN** DimOS MAY use effort feedback as part of its resistance estimate
+- **AND** safety bounds SHALL still limit the resulting commanded offsets.
+
+### Requirement: Safe bounded behavior
+
+DimOS SHALL expose and enforce safety bounds for compliant trajectory execution, including maximum compliant offset and maximum offset velocity per controlled joint.
+
+#### Scenario: Offset reaches configured limit
+- **GIVEN** a compliant joint trajectory is active
+- **AND** the computed compliant offset exceeds a configured joint offset limit
+- **WHEN** the command is emitted
+- **THEN** DimOS SHALL clamp the commanded offset to the configured limit
+- **AND** the saturation condition SHALL be observable to tests or diagnostics.
+
+#### Scenario: Invalid timing input
+- **GIVEN** a compliant joint trajectory is active
+- **AND** the controller receives a zero or negative control timestep
+- **WHEN** the controller computes the next command
+- **THEN** DimOS SHALL avoid integrating an unstable offset update
+- **AND** it SHALL produce a documented safe outcome such as pass-through, hold, or no command.
+
+#### Scenario: Required joint state is missing
+- **GIVEN** a compliant joint trajectory is active for one or more joints
+- **AND** required joint position state is missing for a controlled joint
+- **WHEN** the controller computes the next command
+- **THEN** DimOS SHALL avoid producing arbitrary compliant offsets for that joint
+- **AND** it SHALL use a documented safe behavior such as pass-through, hold, fault, or no command.
+
+### Requirement: Single owner of final joint command
+
+DimOS SHALL expose compliant trajectory execution to the coordinator as a single owner of the controlled joints, rather than requiring separate trajectory and compliance controllers to compete for the same joints.
+
+#### Scenario: Coordinator arbitrates compliant trajectory joints
+- **GIVEN** a compliant joint trajectory is active for a set of joints
+- **WHEN** the coordinator arbitrates active joint commands
+- **THEN** it SHALL see one final command stream for those joints from the compliant trajectory behavior
+- **AND** it SHALL NOT need to schedule separate competing trajectory and compliance tasks for the same joints.
+
+#### Scenario: Higher-priority safety behavior preempts compliant trajectory
+- **GIVEN** a compliant joint trajectory is active
+- **AND** a higher-priority safety or control behavior claims one or more of the same joints
+- **WHEN** the coordinator preempts the compliant trajectory behavior for those joints
+- **THEN** the compliant trajectory behavior SHALL stop or transition to a documented preempted state
+- **AND** internal compliant offset state SHALL be reset or made safe before later reuse.
+
+### Requirement: MuJoCo verification support
+
+DimOS SHALL support verification of compliant joint trajectory behavior in simulation using joint-space metrics, with optional contact-scene integration tests or demos.
+
+#### Scenario: Free-space simulation smoke test
+- **GIVEN** a simulated manipulator can execute joint trajectories in MuJoCo
+- **WHEN** a compliant joint trajectory runs in free space
+- **THEN** the trajectory SHALL complete without large compliant offsets
+- **AND** the observed joint positions and efforts SHALL remain within configured verification thresholds.
+
+#### Scenario: Rigid contact comparison
+- **GIVEN** a MuJoCo scene includes a table or rigid obstacle in the manipulator workspace
+- **WHEN** an equivalent rigid trajectory and compliant trajectory are compared while attempting motion into contact
+- **THEN** the compliant trajectory run SHOULD show reduced fighting behavior using metrics such as peak effort, tracking error, commanded offset, or saturation time
+- **AND** the compliant trajectory run SHALL remain within configured safety bounds.
+
+#### Scenario: Soft contact comparison
+- **GIVEN** a MuJoCo scene includes a soft or cushion-like contact object with tuned contact parameters
+- **WHEN** a compliant trajectory makes contact with the object
+- **THEN** DimOS SHOULD be able to demonstrate a bounded compliant equilibrium or safe saturation
+- **AND** failure to tune soft contact parameters SHALL NOT invalidate deterministic unit-level compliance tests.
diff --git a/openspec/changes/add-compliant-joint-trajectory-task/tasks.md b/openspec/changes/add-compliant-joint-trajectory-task/tasks.md
new file mode 100644
index 0000000000..5710a9dfe3
--- /dev/null
+++ b/openspec/changes/add-compliant-joint-trajectory-task/tasks.md
@@ -0,0 +1,54 @@
+## 1. Composition Infrastructure
+
+- [x] 1.1 Add a narrow reference-transform interface for control tasks that can consume an upstream `JointCommandOutput` and return a transformed `JointCommandOutput | None`.
+- [x] 1.2 Add a composed control task wrapper that exposes normal coordinator-facing task semantics while internally running one source task followed by ordered transforms.
+- [x] 1.3 Validate composed task construction for v1 constraints: one source, linear transforms, compatible joints, and compatible command modes.
+- [x] 1.4 Forward lifecycle/preemption behavior from the composed task to the source and transforms, including safe reset of transform state on preemption.
+
+## 2. Joint Compliance Behavior
+
+- [x] 2.1 Add a joint compliance task/configuration with per-joint mass, damping, stiffness, max offset, max offset velocity, deadband/tolerance, and effort-feedback options.
+- [x] 2.2 Implement reference-transform behavior for `SERVO_POSITION` joint position references.
+- [x] 2.3 Implement bounded admittance-style offset integration using coordinator joint position state, optional velocity state, and explicitly enabled effort feedback.
+- [x] 2.4 Define and implement safe behavior for invalid `dt`, missing required joint state, incompatible reference modes, and clamp saturation.
+- [x] 2.5 Expose diagnostics or inspectable state for offset, offset velocity, saturation, and feedback mode so tests can verify behavior.
+
+## 3. Compliant Trajectory Integration
+
+- [x] 3.1 Add a compliant joint trajectory task/factory that composes the existing joint trajectory source with the joint compliance transform.
+- [x] 3.2 Keep existing rigid joint trajectory behavior unchanged and selectable.
+- [x] 3.3 Add optional manipulator task configuration or blueprint helper support for selecting compliant trajectory behavior where appropriate.
+- [x] 3.4 If any blueprint variables are added or renamed, regenerate and verify the blueprint registry with `pytest dimos/robot/test_all_blueprints_generation.py`.
+
+## 4. Tests
+
+- [x] 4.1 Add unit tests for composed task output forwarding, transform ordering, validation failures, active-state behavior, and preemption forwarding.
+- [x] 4.2 Add unit tests for compliance pass-through behavior in free space.
+- [x] 4.3 Add unit tests for bounded offset and offset velocity under tracking resistance.
+- [x] 4.4 Add unit tests proving effort feedback is opt-in and zero-valued placeholder effort is not treated as reliable by default.
+- [x] 4.5 Add unit tests for invalid `dt`, missing joint state, incompatible reference modes, and saturation diagnostics.
+- [x] 4.6 Add integration tests for compliant trajectory sampling followed by compliant output shaping, including completion/cancel/preemption parity with rigid trajectory execution.
+
+## 5. MuJoCo Verification
+
+- [x] 5.1 Identify the canonical MuJoCo manipulator verification entry point, preferably existing xArm7 simulation unless a smaller dedicated scene is warranted.
+- [x] 5.2 Add a free-space MuJoCo smoke test or demo that compares rigid and compliant trajectory execution with near-zero compliant offset.
+- [x] 5.3 Add or document a rigid table/obstacle contact scenario that compares rigid and compliant trajectory behavior using joint-space metrics such as peak effort, tracking error, offset, and saturation time.
+- [x] 5.4 Optionally add a soft cushion/contact demo after rigid contact behavior is stable; do not make soft-contact tuning the only proof of correctness.
+- [x] 5.5 Document manual simulation QA commands and expected pass/fail observations.
+
+## 6. Documentation
+
+- [x] 6.1 Add or update user-facing documentation explaining rigid vs compliant trajectory behavior, v1 position-servo semantics, feedback requirements, and safety bounds.
+- [x] 6.2 Add contributor documentation or development notes explaining that composed tasks are internal linear pipelines, not coordinator-level task graphs.
+- [x] 6.3 Document MuJoCo smoke/contact verification if a simulation blueprint, scene, or demo is added.
+- [x] 6.4 Confirm whether `docs/coding-agents/` or `AGENTS.md` need updates; update only if new coding-agent guidance is introduced.
+
+## 7. Validation
+
+- [x] 7.1 Run `openspec validate add-compliant-joint-trajectory-task`.
+- [x] 7.2 Run focused pytest targets for changed control task and compliance task code.
+- [x] 7.3 Run focused MuJoCo tests or demos for simulation verification when available; use `bin/pytest-mujoco` if the test is marked for MuJoCo.
+- [x] 7.4 Run `pytest dimos/robot/test_all_blueprints_generation.py` if blueprint registry inputs change.
+- [x] 7.5 Run relevant docs validation commands for changed docs, such as `doclinks`, `md-babel-py run `, or `bin/gen-diagrams` when applicable.
+- [x] 7.6 Run type/lint checks required by the touched area, such as `uv run mypy dimos/` or focused lint/test commands if full checks are too expensive.
diff --git a/openspec/config.yaml b/openspec/config.yaml
new file mode 100644
index 0000000000..62a72bba63
--- /dev/null
+++ b/openspec/config.yaml
@@ -0,0 +1,45 @@
+schema: dimos-capability
+
+context: |
+ DimOS is a robotics operating system for generalist robots. Modules communicate
+ through typed streams (`In[T]`, `Out[T]`) over LCM, SHM, ROS, DDS, or other
+ transports. Blueprints compose modules into runnable robot stacks. Skills are
+ `@skill`-annotated RPC methods exposed to agents and MCP clients.
+
+ Terminology boundary:
+ - "OpenSpec spec" means a behavior specification under `openspec/specs/`.
+ - "DimOS Spec" means a Python Protocol/RPC contract in `*_spec.py` files,
+ usually inheriting `dimos.spec.utils.Spec` and `typing.Protocol`.
+ Keep these separate. OpenSpec specs describe observable behavior; DimOS Specs
+ describe code-level module interfaces.
+
+ OpenSpec specs should capture current behavior, user/developer-visible
+ outcomes, public CLI/API/tool surfaces, robot safety constraints, and testable
+ scenarios. Put implementation choices, class names, module wiring, generated
+ registry updates, and rollout details in `design.md` or `tasks.md`.
+
+ Documentation lives in:
+ - `docs/usage/` for user-facing concepts and APIs.
+ - `docs/capabilities/` for capability and platform guides.
+ - `docs/development/` for contributor process.
+ - `docs/coding-agents/` and `AGENTS.md` for coding-agent guidance.
+
+rules:
+ proposal:
+ - "Identify affected DimOS surfaces: modules, streams, blueprints, CLI, skills/MCP, docs, hardware, simulation, replay, or generated registries."
+ - Use capability names that match behavior domains, not Python class names.
+ - Mark hardware safety or public API/CLI changes explicitly.
+ specs:
+ - Write behavior-first requirements; avoid implementation detail unless it is externally observable.
+ - Every requirement must include at least one `#### Scenario:` block with concrete observable outcomes.
+ - Use "OpenSpec capability spec" when prose might otherwise be confused with DimOS Python `Spec` Protocols.
+ design:
+ - Call out DimOS `Spec` Protocols, adapter Protocols, blueprint composition, stream names/types, and skill/MCP exposure when relevant.
+ - Mention generated files and required regeneration commands, especially `pytest dimos/robot/test_all_blueprints_generation.py` for blueprint registry changes.
+ - Include hardware/simulation/replay assumptions and safety constraints for robot-facing work.
+ docs:
+ - List user-facing docs, contributor docs, coding-agent docs, and AGENTS.md updates required by the change.
+ - Include documentation validation commands for changed docs, such as `doclinks` and `md-babel-py run ` where applicable.
+ tasks:
+ - Include verification tasks for OpenSpec validation, relevant pytest targets, type checks when needed, and manual QA through the user-facing surface.
+ - Add registry generation tasks when blueprint names, module classes, or generated registry inputs change.
diff --git a/openspec/schemas/dimos-capability/schema.yaml b/openspec/schemas/dimos-capability/schema.yaml
new file mode 100644
index 0000000000..fedb7964ee
--- /dev/null
+++ b/openspec/schemas/dimos-capability/schema.yaml
@@ -0,0 +1,128 @@
+name: dimos-capability
+version: 1
+description: DimOS capability workflow - proposal → specs/design/docs → tasks
+artifacts:
+ - id: proposal
+ generates: proposal.md
+ description: DimOS change proposal covering intent, scope, capability impact, and affected robot/software surfaces
+ template: proposal.md
+ instruction: |
+ Create the proposal document that establishes WHY this change is needed and what DimOS behavior it affects.
+
+ Sections:
+ - **Why**: 1-2 concise paragraphs on the problem or opportunity. Explain why the change matters now.
+ - **What Changes**: Bullet list of added, modified, or removed behavior. Mark public API/CLI or hardware-safety breaking changes with **BREAKING**.
+ - **Affected DimOS Surfaces**: Identify modules, streams, blueprints, CLI commands, skills/MCP tools, docs, hardware, simulation, replay, generated registries, or external protocols touched by the change.
+ - **Capabilities**: Identify which OpenSpec capability specs will be created or modified:
+ - **New Capabilities**: List behavior domains introduced by the change. Each becomes `specs//spec.md`. Use kebab-case names (for example, `agent-skills-mcp`, `blueprint-composition`, `manipulation-stack`).
+ - **Modified Capabilities**: List existing `openspec/specs//` entries whose requirements change. Only include spec-level behavior changes, not implementation-only refactors.
+ - **Impact**: Summarize user/developer impact, compatibility risks, dependency changes, documentation updates, and test/QA scope.
+
+ Keep proposals concise. Do not include line-by-line implementation details; put architecture and rollout decisions in `design.md`.
+ requires: []
+ - id: specs
+ generates: specs/**/*.md
+ description: Behavior-first OpenSpec capability delta specifications
+ template: spec.md
+ instruction: |
+ Create OpenSpec capability specs that define WHAT DimOS should do, not how it is implemented.
+
+ Create one delta spec file per capability listed in proposal.md:
+ - New capabilities: use `specs//spec.md` with the exact kebab-case name from the proposal.
+ - Modified capabilities: use the existing folder from `openspec/specs//`.
+
+ Use these delta sections as `##` headers:
+ - **ADDED Requirements**: New externally observable behavior.
+ - **MODIFIED Requirements**: Changed behavior. Include the full updated requirement block, not a partial patch.
+ - **REMOVED Requirements**: Deprecated behavior. Include **Reason** and **Migration**.
+ - **RENAMED Requirements**: Name-only changes. Use FROM:/TO: format.
+
+ Requirement format:
+ - Use `### Requirement: `.
+ - Use SHALL/MUST for normative requirements.
+ - Include at least one `#### Scenario: ` per requirement. Scenario headings MUST use exactly four `#` characters.
+ - Prefer `- **GIVEN**`, `- **WHEN**`, `- **THEN**`, and `- **AND**` bullets.
+ - Cover happy path plus meaningful edge/error/safety cases.
+
+ DimOS-specific guidance:
+ - Specify user/developer-visible behavior, robot outcomes, CLI behavior, skill/MCP tool behavior, stream contracts, safety constraints, and compatibility expectations.
+ - Avoid Python class names, private module internals, transport implementation choices, and generated-file details unless those details are observable API contracts.
+ - Use "OpenSpec capability spec" in prose when needed to avoid confusion with DimOS Python `Spec` Protocols.
+ - If the behavior only changes implementation and not observable requirements, do not create a spec delta.
+ requires:
+ - proposal
+ - id: design
+ generates: design.md
+ description: DimOS technical design and architecture decisions
+ template: design.md
+ instruction: |
+ Create the design document that explains HOW the change should be implemented in DimOS.
+
+ Include design.md for cross-module changes, new robot/hardware integration, new public interfaces, new dependencies, safety-sensitive behavior, generated registry changes, or unclear architecture.
+
+ Sections:
+ - **Context**: Current state, relevant modules/blueprints/docs, and constraints.
+ - **Goals / Non-Goals**: What the design achieves and explicitly excludes.
+ - **DimOS Architecture**: Modules, streams, transports, blueprints, RPC/module refs, DimOS `Spec` Protocols, adapter Protocols, skills/MCP exposure, CLI entry points, and generated registries involved.
+ - **Decisions**: Key choices with rationale and alternatives considered.
+ - **Safety / Simulation / Replay**: Hardware assumptions, sim/replay behavior, safety constraints, and manual QA surface.
+ - **Risks / Trade-offs**: Known risks and mitigations.
+ - **Migration / Rollout**: Compatibility, generated files, docs, and deployment steps.
+ - **Open Questions**: Outstanding decisions or unknowns.
+
+ Reference proposal.md for intent and specs for behavior. Keep line-by-line work in tasks.md.
+ requires:
+ - proposal
+ - id: docs
+ generates: docs.md
+ description: Documentation impact plan for user, contributor, and coding-agent docs
+ template: docs.md
+ instruction: |
+ Create the documentation impact plan for the change.
+
+ Sections:
+ - **User-Facing Docs**: Updates under `docs/usage/`, `docs/capabilities/`, `docs/platforms/`, or README files.
+ - **Contributor Docs**: Updates under `docs/development/`.
+ - **Coding-Agent Docs**: Updates under `docs/coding-agents/` or `AGENTS.md`.
+ - **Doc Validation**: Commands needed for changed docs, such as `doclinks`, `md-babel-py run `, and `bin/gen-diagrams`.
+ - **No Docs Needed**: If no docs are needed, explain why.
+
+ Match `docs/development/writing_docs.md`: contributor-only docs belong in `docs/development`; user-facing behavior belongs in `docs/usage` or `docs/capabilities`.
+ requires:
+ - proposal
+ - id: tasks
+ generates: tasks.md
+ description: Implementation, validation, docs, and manual-QA checklist
+ template: tasks.md
+ instruction: |
+ Create the implementation checklist. The apply phase parses checkbox format, so every actionable task MUST use `- [ ]`.
+
+ Guidelines:
+ - Group tasks under numbered `##` headings.
+ - Each task must be `- [ ] X.Y Task description`.
+ - Keep tasks small enough to complete in one focused session.
+ - Order tasks by dependency.
+ - Include docs and validation tasks from docs.md.
+ - Include generated registry tasks when blueprints or module registry inputs change.
+ - Include manual QA through the actual user surface: CLI, TUI, HTTP API, MCP tool, simulation/replay blueprint, hardware procedure, or library driver.
+
+ Typical DimOS validation tasks:
+ - Run `openspec validate `.
+ - Run focused pytest targets for changed modules.
+ - Run `pytest dimos/robot/test_all_blueprints_generation.py` when blueprint registry output may change.
+ - Run docs validation commands for changed docs.
+ - Run lints/types when the touched area requires them.
+
+ Reference specs for WHAT, design for HOW, and docs.md for documentation work.
+ requires:
+ - specs
+ - design
+ - docs
+apply:
+ requires:
+ - tasks
+ tracks: tasks.md
+ instruction: |
+ Read proposal.md, specs, design.md, docs.md, and tasks.md before editing code.
+ Work through pending tasks, mark checkboxes complete as they finish, and keep artifacts current when implementation changes the plan.
+ Verify with OpenSpec validation, focused tests, docs checks, and manual QA through the relevant DimOS surface.
diff --git a/openspec/schemas/dimos-capability/templates/design.md b/openspec/schemas/dimos-capability/templates/design.md
new file mode 100644
index 0000000000..25031ceb8b
--- /dev/null
+++ b/openspec/schemas/dimos-capability/templates/design.md
@@ -0,0 +1,35 @@
+## Context
+
+
+
+## Goals / Non-Goals
+
+**Goals:**
+
+
+**Non-Goals:**
+
+
+## DimOS Architecture
+
+
+
+## Decisions
+
+
+
+## Safety / Simulation / Replay
+
+
+
+## Risks / Trade-offs
+
+
+
+## Migration / Rollout
+
+
+
+## Open Questions
+
+
diff --git a/openspec/schemas/dimos-capability/templates/docs.md b/openspec/schemas/dimos-capability/templates/docs.md
new file mode 100644
index 0000000000..d274aed653
--- /dev/null
+++ b/openspec/schemas/dimos-capability/templates/docs.md
@@ -0,0 +1,19 @@
+## User-Facing Docs
+
+
+
+## Contributor Docs
+
+
+
+## Coding-Agent Docs
+
+
+
+## Doc Validation
+
+
+
+## No Docs Needed
+
+
diff --git a/openspec/schemas/dimos-capability/templates/proposal.md b/openspec/schemas/dimos-capability/templates/proposal.md
new file mode 100644
index 0000000000..98d409e8de
--- /dev/null
+++ b/openspec/schemas/dimos-capability/templates/proposal.md
@@ -0,0 +1,32 @@
+## Why
+
+
+
+## What Changes
+
+
+
+## Affected DimOS Surfaces
+
+
+- Modules/streams:
+- Blueprints/CLI:
+- Skills/MCP:
+- Hardware/simulation/replay:
+- Docs/generated registries:
+
+## Capabilities
+
+### New Capabilities
+
+- ``:
+
+### Modified Capabilities
+
+- ``:
+
+## Impact
+
+
diff --git a/openspec/schemas/dimos-capability/templates/spec.md b/openspec/schemas/dimos-capability/templates/spec.md
new file mode 100644
index 0000000000..afc0c1ff58
--- /dev/null
+++ b/openspec/schemas/dimos-capability/templates/spec.md
@@ -0,0 +1,16 @@
+## ADDED Requirements
+
+### Requirement:
+
+
+#### Scenario:
+- **GIVEN**
+- **WHEN**
+- **THEN**
+- **AND**
+
+
diff --git a/openspec/schemas/dimos-capability/templates/tasks.md b/openspec/schemas/dimos-capability/templates/tasks.md
new file mode 100644
index 0000000000..b38fcdfabb
--- /dev/null
+++ b/openspec/schemas/dimos-capability/templates/tasks.md
@@ -0,0 +1,15 @@
+## 1. Implementation
+
+- [ ] 1.1
+- [ ] 1.2
+
+## 2. Documentation
+
+- [ ] 2.1
+
+## 3. Verification
+
+- [ ] 3.1 Run `openspec validate `
+- [ ] 3.2 Run focused tests for changed code
+- [ ] 3.3 Run docs validation commands for changed docs
+- [ ] 3.4 Manually QA through the relevant DimOS surface (CLI, MCP, simulation/replay, hardware procedure, HTTP API, or library driver)