feat(go2): RPP Trajectory controller + decoupled pub/sub benchmark & scoring#2605
feat(go2): RPP Trajectory controller + decoupled pub/sub benchmark & scoring#2605mustafab0 wants to merge 1 commit into
Conversation
❌ 2 Tests Failed:
View the top 2 failed test(s) by shortest run time
To view more test analytics, go to the Test Analytics Dashboard |
Greptile SummaryThis PR adds a regulated-pure-pursuit (RPP) path-follower task and a decoupled benchmark/scoring pipeline for the Unitree Go2. The controller self-calibrates from a vendored tuning artifact on first use, and the benchmark talks to it purely over LCM (
Confidence Score: 4/5Safe to merge; all findings are quality-oriented and none affect core runtime correctness for the intended use case. The architecture is clean: the benchmarker and controller are genuinely decoupled over LCM, artifact loading is lazy and well-guarded, and the scoring pipeline correctly separates acquisition from analysis. The
Important Files Changed
Reviews (1): Last reviewed commit: "feat(go2): RPP controller + decoupled pu..." | Re-trigger Greptile |
| if self._config.forward_only: | ||
| assert abs(vy) < 1e-6, f"PathFollowerTask forward_only: vy={vy} must be 0" | ||
| vx = max(0.0, vx) |
There was a problem hiding this comment.
Using
assert inside the control-loop compute() method is fragile: Python's -O (optimize) flag strips all assert statements at runtime, so this check silently disappears in an optimized deployment. If vy were ever non-zero (e.g., due to a future PController change), the robot would receive an unexpected lateral velocity without any error. An explicit guard with an early return is safer.
| if self._config.forward_only: | |
| assert abs(vy) < 1e-6, f"PathFollowerTask forward_only: vy={vy} must be 0" | |
| vx = max(0.0, vx) | |
| if self._config.forward_only: | |
| if abs(vy) > 1e-6: | |
| logger.warning( | |
| f"PathFollowerTask '{self._name}' forward_only: unexpected vy={vy:.6f}; clamping to 0" | |
| ) | |
| vy = 0.0 | |
| vx = max(0.0, vx) |
| yaws = [p.orientation.euler[2] for p in poses] | ||
| if max(yaws) - min(yaws) > _HEADING_DEGENERATE_EPS: | ||
| return path # planner provided real per-pose headings; leave them. |
There was a problem hiding this comment.
The heading-degenerate check uses
max(yaws) - min(yaws), which fails when yaws straddle the ±π boundary. For example, a path with poses at yaw ≈ π and a few at yaw ≈ −π (same physical direction) would show a spread of ~2π and be incorrectly classified as having valid per-pose headings, suppressing tangent-heading synthesis. Using pairwise angle differences is more robust.
| yaws = [p.orientation.euler[2] for p in poses] | |
| if max(yaws) - min(yaws) > _HEADING_DEGENERATE_EPS: | |
| return path # planner provided real per-pose headings; leave them. | |
| yaws = [p.orientation.euler[2] for p in poses] | |
| # Use pairwise angle differences to handle ±π wraparound correctly. | |
| total_spread = sum( | |
| abs(((b - a + math.pi) % (2 * math.pi)) - math.pi) for a, b in zip(yaws, yaws[1:]) | |
| ) | |
| if total_spread > _HEADING_DEGENERATE_EPS: | |
| return path # planner provided real per-pose headings; leave them. |
| logger.info( | ||
| f"Benchmarker: {cfg.robot} speeds={speeds} over {len(path_set())} paths " | ||
| f"(gate_source={cfg.gate_source}, out={out_root})" | ||
| ) | ||
|
|
||
| idx = 0 | ||
| for path_name, path in path_set().items(): |
There was a problem hiding this comment.
path_set() is called twice — once for the log message and once to drive the loop. Capturing it once avoids double-constructing all paths.
| logger.info( | |
| f"Benchmarker: {cfg.robot} speeds={speeds} over {len(path_set())} paths " | |
| f"(gate_source={cfg.gate_source}, out={out_root})" | |
| ) | |
| idx = 0 | |
| for path_name, path in path_set().items(): | |
| battery = path_set() | |
| logger.info( | |
| f"Benchmarker: {cfg.robot} speeds={speeds} over {len(battery)} paths " | |
| f"(gate_source={cfg.gate_source}, out={out_root})" | |
| ) | |
| idx = 0 | |
| for path_name, path in battery.items(): |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| def FopdtChannelParamsLike(dc: FopdtChannelDC): | ||
| """Lightweight adapter: derive_config wants a TwistBasePlantParams | ||
| (made of FopdtChannelParams), but the artifact stores them as | ||
| FopdtChannelDC. Return a duck-typed object with .K, .tau, .L.""" | ||
| from dimos.utils.benchmarking.plant import FopdtChannelParams | ||
|
|
||
| return FopdtChannelParams(K=dc.K, tau=dc.tau, L=dc.L) |
There was a problem hiding this comment.
FopdtChannelParamsLike is a plain function but named with PascalCase (reserved for classes in PEP 8), making every call site look like a constructor. Consider renaming to _fopdt_channel_as_params.
| def FopdtChannelParamsLike(dc: FopdtChannelDC): | |
| """Lightweight adapter: derive_config wants a TwistBasePlantParams | |
| (made of FopdtChannelParams), but the artifact stores them as | |
| FopdtChannelDC. Return a duck-typed object with .K, .tau, .L.""" | |
| from dimos.utils.benchmarking.plant import FopdtChannelParams | |
| return FopdtChannelParams(K=dc.K, tau=dc.tau, L=dc.L) | |
| def _fopdt_channel_as_params(dc: FopdtChannelDC): | |
| """Lightweight adapter: derive_config wants a TwistBasePlantParams | |
| (made of FopdtChannelParams), but the artifact stores them as | |
| FopdtChannelDC. Return a duck-typed object with .K, .tau, .L.""" | |
| from dimos.utils.benchmarking.plant import FopdtChannelParams | |
| return FopdtChannelParams(K=dc.K, tau=dc.tau, L=dc.L) |
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
| def _on_path(self, msg: Path) -> None: | ||
| """Forward a planned path + a fresh odom snapshot to any task exposing | ||
| ``set_path(path, odom)``. Single-arg ``set_path(path)`` callers still | ||
| work (TypeError fallback).""" | ||
| odom = self._read_base_odom() | ||
| with self._task_lock: | ||
| for task in self._tasks.values(): | ||
| set_path = getattr(task, "set_path", None) | ||
| if set_path is None: | ||
| continue | ||
| try: | ||
| set_path(msg, odom) | ||
| except TypeError: | ||
| set_path(msg) # backwards compat with single-arg set_path |
There was a problem hiding this comment.
set_path called under _task_lock — blocks tick loop during path setup
_on_path holds _task_lock for the entire duration of the set_path call chain, which resolves to start_path → PathDistancer.__init__ (numpy cumulative-distance array) + VelocityProfiler.precompute_profile (forward/backward accel passes). For the benchmark's 100–200 waypoint paths this is sub-millisecond and harmless, but it becomes more visible with longer nav-stack paths. Consider copying task references before releasing the lock and doing the setup work outside it.
| def re_derive_config( | ||
| artifact: TuningConfig, | ||
| *, | ||
| vx_max: float = GO2_VX_MAX, | ||
| wz_max: float = GO2_WZ_MAX, | ||
| floor_probe_amplitudes: dict[str, list[float]] | None = None, | ||
| min_speed_floor: float = 0.0, | ||
| sag_threshold: float = 0.15, | ||
| ) -> TuningConfig: | ||
| """Post-hoc apply the current envelope + DERIVE logic to an existing | ||
| artifact. Uses the stored ``dynamics_by_amplitude`` + | ||
| ``floor_probe_results`` — no re-run on hardware needed. | ||
|
|
||
| Useful after a methodology bugfix (the K-sag ceiling was too | ||
| conservative on noisy fits; switched to ``max(K·amp)`` for | ||
| operational use): pass the artifact through here and you get a | ||
| corrected JSON without re-collecting data. | ||
|
|
||
| Plant, FF (the canonical FOPDT) and provenance are passed through | ||
| unchanged — this only recomputes envelope + velocity_profile + | ||
| caveats.""" | ||
| K_linear = { | ||
| "vx": artifact.plant.vx.K, | ||
| "vy": artifact.plant.vy.K, | ||
| "wz": artifact.plant.wz.K, | ||
| } | ||
| env = compute_envelope( | ||
| artifact.floor_probe_results, | ||
| artifact.dynamics_by_amplitude, | ||
| vx_cap=vx_max, | ||
| wz_cap=wz_max, | ||
| floor_probe_amplitudes=floor_probe_amplitudes, | ||
| K_linear=K_linear, | ||
| sag_threshold=sag_threshold, | ||
| ) | ||
| plant = TwistBasePlantParams( | ||
| vx=FopdtChannelParamsLike(artifact.plant.vx), | ||
| vy=FopdtChannelParamsLike(artifact.plant.vy), | ||
| wz=FopdtChannelParamsLike(artifact.plant.wz), | ||
| ) | ||
| return derive_config( | ||
| plant, | ||
| artifact.provenance, | ||
| vx_max=vx_max, | ||
| wz_max=wz_max, | ||
| velocity_envelope=env, | ||
| dynamics_by_amplitude=artifact.dynamics_by_amplitude, | ||
| floor_probe_results=artifact.floor_probe_results, | ||
| min_speed_floor=min_speed_floor, | ||
| ) |
There was a problem hiding this comment.
re_derive_config not exported in __all__
re_derive_config is the primary entry point for the "post-hoc re-derive from stored data" workflow documented in its docstring, but it is absent from __all__. Adding it (and compute_envelope) would align with the other public helpers already listed there.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!
A clean Go2 regulated-pure-pursuit controller and a decoupled benchmark off main.
The controller and benchmark are separate Modules that talk only over LCM.
The controller owns its calibration (loads the tuning artifact itself); the benchmark and scorer know nothing about it. Scoring is a separate offline step.
Interface:
/pathnav_msgs/Path/speedstd_msgs/Float32/go2/odomgeometry_msgs/PoseStamped/cmd_velgeometry_msgs/Twist/benchmark/gatestd_msgs/Int8Needs a 5x5 m open space
Run the benchmark (one terminal):
Focus the pygame window: WASD to position the robot, ENTER to start each run (K = skip, Backspace = quit). Completion is detected from odom automatically.
Score the recorded runs (offline):
Swap in a different controller: make a Module that subscribes to
/path+/speedand publishes/go2/odom+/cmd_vel, then compose it withBenchmarkerin a blueprint in place ofunitree_go2_rpp_controller. Drive a controller standalone (paths from any source) with: