From f6e3495e95e13909f56548988af9a321bfd958c0 Mon Sep 17 00:00:00 2001 From: tfrere Date: Tue, 19 May 2026 08:21:08 +0200 Subject: [PATCH] feat(protocol): add restart_daemon DataChannel command Recovery hatch for a degraded daemon (e.g. closed motor controller asserting on every command) over the same typed WebRTC transport already used for control, with no SSH or LAN-only HTTP access needed. Same effect as POST /api/daemon/restart, but reachable from a remote (Central-routed) peer. The dispatcher acks the caller before scheduling the restart so the client can distinguish "request accepted" from a transport failure (stopping the daemon tears down the media pipeline, which closes the data channel - the ack would otherwise never reach the caller). Daemon parameters (sim mode, serial port, kinematics, audio, localhost_only, ...) are preserved from the previous start(). The restart is conservative by default: the robot is NOT put to sleep before stopping (goto_sleep_on_stop=False), and the wake-up trajectory is NOT replayed after (wake_up_on_start=False). Both flags can be set to opt in. Wiring is decoupled: Backend exposes set_daemon_restart_handler() and Daemon.start() passes Daemon.restart in. This avoids a circular daemon -> backend -> daemon import and lets non-daemon backends (tests, future transports) leave the hook unset, in which case restart_daemon returns a clear error to the caller. Co-authored-by: Cursor --- src/reachy_mini/daemon/backend/abstract.py | 114 ++++++++++++++++++--- src/reachy_mini/daemon/daemon.py | 7 ++ src/reachy_mini/io/protocol.py | 44 +++++++- 3 files changed, 150 insertions(+), 15 deletions(-) diff --git a/src/reachy_mini/daemon/backend/abstract.py b/src/reachy_mini/daemon/backend/abstract.py index 5f796a042..53c3cc9c3 100644 --- a/src/reachy_mini/daemon/backend/abstract.py +++ b/src/reachy_mini/daemon/backend/abstract.py @@ -16,7 +16,7 @@ import typing from abc import abstractmethod from pathlib import Path -from typing import Annotated, Any, Callable, Dict, Optional +from typing import Annotated, Any, Awaitable, Callable, Dict, Optional import numpy as np from numpy.typing import NDArray @@ -40,6 +40,7 @@ MujocoBackendStatus, PlaySoundCmd, RecordedDataMsg, + RestartDaemonCmd, RobotBackendStatus, SetAntennasCmd, SetAutomaticBodyYawCmd, @@ -219,6 +220,13 @@ def __init__( # GLib thread) can schedule cancellation correctly. self._log_loop: Optional["asyncio.AbstractEventLoop"] = None + # Daemon-level lifecycle hook used by the `restart_daemon` + # DataChannel command. Wired by `Daemon.start()` after the + # backend is constructed (see `set_daemon_restart_handler`). + # Without it, `restart_daemon` reports an error to the caller + # rather than silently no-op'ing. + self._daemon_restart_handler: Optional[Callable[..., Awaitable[Any]]] = None + # Life cycle methods def wrapped_run(self) -> None: """Run the backend in a try-except block to store errors.""" @@ -738,7 +746,9 @@ def stop_sound(self) -> None: 1.0032234352772091, ] - INIT_ANTENNAS_JOINT_POSITIONS = np.array((-0.1745, 0.1745)) # ~10° offset to reduce shaking at vertical + INIT_ANTENNAS_JOINT_POSITIONS = np.array( + (-0.1745, 0.1745) + ) # ~10° offset to reduce shaking at vertical SLEEP_ANTENNAS_JOINT_POSITIONS = np.array((-3.05, 3.05)) SLEEP_HEAD_POSE = np.array( [ @@ -797,7 +807,9 @@ async def goto_sleep(self) -> None: if dist_to_init_pose > 30: # Move to the initial position await self.goto_target( - self.INIT_HEAD_POSE, antennas=self.INIT_ANTENNAS_JOINT_POSITIONS, duration=1 + self.INIT_HEAD_POSE, + antennas=self.INIT_ANTENNAS_JOINT_POSITIONS, + duration=1, ) await asyncio.sleep(0.2) @@ -1021,7 +1033,12 @@ def _maybe_ignore(field: str) -> bool: elif isinstance( cmd, - (SetVolumeCmd, GetVolumeCmd, SetMicrophoneVolumeCmd, GetMicrophoneVolumeCmd), + ( + SetVolumeCmd, + GetVolumeCmd, + SetMicrophoneVolumeCmd, + GetMicrophoneVolumeCmd, + ), ): # Volume is a global robot setting, not per-session: a remote # change persists for the next connection. This matches the @@ -1090,6 +1107,9 @@ def _maybe_ignore(field: str) -> bool: elif isinstance(cmd, UnsubscribeLogsCmd): self._cancel_log_subscription(peer_id) + elif isinstance(cmd, RestartDaemonCmd): + self._dispatch_restart_daemon(cmd, send_response) + # ------------------------------------------------------------------ # journalctl log streaming over the typed transport (subscribe_logs) # ------------------------------------------------------------------ @@ -1110,9 +1130,7 @@ def _start_log_subscription( # is a no-op error rather than an exception so the consumer can # blindly call `subscribeLogs` on every reconnect. self._cancel_log_subscription(peer_id) - task = asyncio.create_task( - self._async_subscribe_logs(peer_id, send_response) - ) + task = asyncio.create_task(self._async_subscribe_logs(peer_id, send_response)) self._log_tasks[peer_id] = task def _cancel_log_subscription(self, peer_id: Optional[str]) -> None: @@ -1173,9 +1191,7 @@ async def _async_subscribe_logs( # journalctl -f shouldn't EOF except on shutdown; # surface it so the consumer can decide to retry. send_response( - LogStreamErrorMsg( - error="journalctl stream ended" - ).model_dump() + LogStreamErrorMsg(error="journalctl stream ended").model_dump() ) return line = raw.decode("utf-8", errors="replace").rstrip("\n") @@ -1185,9 +1201,7 @@ async def _async_subscribe_logs( ts, sep, rest = line.partition(" ") if not sep: ts, rest = "", line - send_response( - LogLineMsg(timestamp=ts, line=rest).model_dump() - ) + send_response(LogLineMsg(timestamp=ts, line=rest).model_dump()) except asyncio.CancelledError: raise except Exception as e: @@ -1245,10 +1259,84 @@ async def _async_goto_sleep( except Exception as e: send_response({"error": str(e), "command": "goto_sleep"}) + # ------------------------------------------------------------------ + # restart_daemon dispatch + # ------------------------------------------------------------------ + + def _dispatch_restart_daemon( + self, + cmd: RestartDaemonCmd, + send_response: Callable[[dict[str, Any]], None], + ) -> None: + """Schedule a daemon restart and ack the caller before tearing down. + + The data channel goes down with the media pipeline as part of + the restart, so we send the ack synchronously *before* + scheduling the work — otherwise the caller would never see a + confirmation that their request was accepted, only the + connection drop. + """ + handler = self._daemon_restart_handler + if handler is None: + send_response( + { + "error": "restart_daemon: no daemon lifecycle handler wired", + "command": "restart_daemon", + } + ) + return + + send_response( + { + "status": "ok", + "command": "restart_daemon", + "scheduled": True, + } + ) + asyncio.create_task(self._async_daemon_restart(handler, cmd)) + + async def _async_daemon_restart( + self, + handler: Callable[..., Awaitable[Any]], + cmd: RestartDaemonCmd, + ) -> None: + """Run the daemon restart handler and surface failures to journalctl. + + We can't reach the caller anymore once the restart starts (the + data channel closes), so failures are logged rather than + reported back over the typed transport. + """ + try: + await handler( + goto_sleep_on_stop=cmd.goto_sleep_on_stop, + wake_up_on_start=cmd.wake_up_on_start, + ) + except Exception as e: + self.logger.error("restart_daemon failed: %s", e) + # ------------------------------------------------------------------ # WebRTC data channel interface (delegates to process_command) # ------------------------------------------------------------------ + def set_daemon_restart_handler( + self, handler: Callable[..., Awaitable[Any]] + ) -> None: + """Wire the daemon-level restart entry point used by ``restart_daemon``. + + Called by ``Daemon.start()`` after the backend is constructed, + passing ``Daemon.restart`` as the handler. Decoupled via a + callback so the backend doesn't need to import the daemon + (avoids the circular ``daemon -> backend -> daemon`` import). + + Args: + handler: An async callable accepting at minimum the keyword + arguments ``goto_sleep_on_stop: bool`` and + ``wake_up_on_start: bool``. ``Daemon.restart`` matches + this shape. + + """ + self._daemon_restart_handler = handler + def setup_media_server(self, media_server: Any) -> None: """Connect the backend to the media server. diff --git a/src/reachy_mini/daemon/daemon.py b/src/reachy_mini/daemon/daemon.py index 54371fb99..5c0859a56 100644 --- a/src/reachy_mini/daemon/daemon.py +++ b/src/reachy_mini/daemon/daemon.py @@ -286,6 +286,13 @@ async def start( hardware_config_filepath=hardware_config_filepath, ) + # Wire the typed-transport restart hook so peers can recover + # a degraded daemon (e.g. closed motor controller) over the + # WebRTC data channel without SSH or LAN HTTP access. + # Re-wired on every start() because each start builds a new + # backend instance. + self.backend.set_daemon_restart_handler(self.restart) + self.ws_server = WSServer(backend=self.backend) self.ws_server.start() self._thread_publish_status = Thread( diff --git a/src/reachy_mini/io/protocol.py b/src/reachy_mini/io/protocol.py index 583a639f8..7e84dd7c3 100644 --- a/src/reachy_mini/io/protocol.py +++ b/src/reachy_mini/io/protocol.py @@ -7,7 +7,8 @@ goto_target, wake_up, goto_sleep, play_sound, set_motor_mode, set_torque, get_motor_mode, set_gravity_compensation, set_automatic_body_yaw, - get_state, get_version, start_recording, stop_recording, append_record + get_state, get_version, start_recording, stop_recording, append_record, + subscribe_logs, unsubscribe_logs, restart_daemon Server->Client message types: joint_positions, head_pose, imu_data, recorded_data, @@ -304,6 +305,44 @@ class UnsubscribeLogsCmd(BaseModel): type: Literal["unsubscribe_logs"] = "unsubscribe_logs" +# ------------------------------------------------------------------ +# Daemon lifecycle commands +# +# `restart_daemon` lets a peer recover a degraded daemon (e.g. a +# closed motor controller asserting on every command) over the same +# typed transport already used for control, without needing SSH or +# LAN-only HTTP access. Same effect as `POST /api/daemon/restart`, +# but reachable from a remote (Central-routed) WebRTC peer. +# +# Side-effects, by design: +# - The active WebRTC session is dropped: stopping the daemon tears +# down the media pipeline, which closes the data channel. The +# daemon sends an ack on the channel BEFORE kicking the restart +# off so well-behaved clients can distinguish "request accepted" +# from a transport failure. +# - The robot does NOT go to sleep before the restart unless +# `goto_sleep_on_stop=True`, and the wake-up trajectory is NOT +# replayed afterwards unless `wake_up_on_start=True`. Defaults +# match `Daemon.restart()` so the call is conservative. +# - Daemon parameters (sim mode, serial port, kinematics, audio, +# localhost_only, ...) are preserved from the previous `start()`. +# ------------------------------------------------------------------ + + +class RestartDaemonCmd(BaseModel): + """Restart the daemon process. + + Recovery hatch over the WebRTC DataChannel: tears the backend + down (motor controller, control loop, media pipeline) and + re-runs ``Daemon.start()`` with the same parameters as the + previous start. The data channel is dropped as part of the + restart and the client must reconnect. + """ + + type: Literal["restart_daemon"] = "restart_daemon" + goto_sleep_on_stop: bool = False + wake_up_on_start: bool = False + AnyCommand = Annotated[ SetTargetCmd @@ -331,7 +370,8 @@ class UnsubscribeLogsCmd(BaseModel): | SetMicrophoneVolumeCmd | GetMicrophoneVolumeCmd | SubscribeLogsCmd - | UnsubscribeLogsCmd, + | UnsubscribeLogsCmd + | RestartDaemonCmd, Field(discriminator="type"), ]