Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 101 additions & 13 deletions src/reachy_mini/daemon/backend/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import typing
from abc import abstractmethod
from pathlib import Path
from typing import Annotated, Any, Callable, Dict, Optional
from typing import Annotated, Any, Awaitable, Callable, Dict, Optional

import numpy as np
from numpy.typing import NDArray
Expand All @@ -40,6 +40,7 @@
MujocoBackendStatus,
PlaySoundCmd,
RecordedDataMsg,
RestartDaemonCmd,
RobotBackendStatus,
SetAntennasCmd,
SetAutomaticBodyYawCmd,
Expand Down Expand Up @@ -219,6 +220,13 @@ def __init__(
# GLib thread) can schedule cancellation correctly.
self._log_loop: Optional["asyncio.AbstractEventLoop"] = None

# Daemon-level lifecycle hook used by the `restart_daemon`
# DataChannel command. Wired by `Daemon.start()` after the
# backend is constructed (see `set_daemon_restart_handler`).
# Without it, `restart_daemon` reports an error to the caller
# rather than silently no-op'ing.
self._daemon_restart_handler: Optional[Callable[..., Awaitable[Any]]] = None

# Life cycle methods
def wrapped_run(self) -> None:
"""Run the backend in a try-except block to store errors."""
Expand Down Expand Up @@ -738,7 +746,9 @@ def stop_sound(self) -> None:
1.0032234352772091,
]

INIT_ANTENNAS_JOINT_POSITIONS = np.array((-0.1745, 0.1745)) # ~10° offset to reduce shaking at vertical
INIT_ANTENNAS_JOINT_POSITIONS = np.array(
(-0.1745, 0.1745)
) # ~10° offset to reduce shaking at vertical
SLEEP_ANTENNAS_JOINT_POSITIONS = np.array((-3.05, 3.05))
SLEEP_HEAD_POSE = np.array(
[
Expand Down Expand Up @@ -797,7 +807,9 @@ async def goto_sleep(self) -> None:
if dist_to_init_pose > 30:
# Move to the initial position
await self.goto_target(
self.INIT_HEAD_POSE, antennas=self.INIT_ANTENNAS_JOINT_POSITIONS, duration=1
self.INIT_HEAD_POSE,
antennas=self.INIT_ANTENNAS_JOINT_POSITIONS,
duration=1,
)
await asyncio.sleep(0.2)

Expand Down Expand Up @@ -1021,7 +1033,12 @@ def _maybe_ignore(field: str) -> bool:

elif isinstance(
cmd,
(SetVolumeCmd, GetVolumeCmd, SetMicrophoneVolumeCmd, GetMicrophoneVolumeCmd),
(
SetVolumeCmd,
GetVolumeCmd,
SetMicrophoneVolumeCmd,
GetMicrophoneVolumeCmd,
),
):
# Volume is a global robot setting, not per-session: a remote
# change persists for the next connection. This matches the
Expand Down Expand Up @@ -1090,6 +1107,9 @@ def _maybe_ignore(field: str) -> bool:
elif isinstance(cmd, UnsubscribeLogsCmd):
self._cancel_log_subscription(peer_id)

elif isinstance(cmd, RestartDaemonCmd):
self._dispatch_restart_daemon(cmd, send_response)

# ------------------------------------------------------------------
# journalctl log streaming over the typed transport (subscribe_logs)
# ------------------------------------------------------------------
Expand All @@ -1110,9 +1130,7 @@ def _start_log_subscription(
# is a no-op error rather than an exception so the consumer can
# blindly call `subscribeLogs` on every reconnect.
self._cancel_log_subscription(peer_id)
task = asyncio.create_task(
self._async_subscribe_logs(peer_id, send_response)
)
task = asyncio.create_task(self._async_subscribe_logs(peer_id, send_response))
self._log_tasks[peer_id] = task

def _cancel_log_subscription(self, peer_id: Optional[str]) -> None:
Expand Down Expand Up @@ -1173,9 +1191,7 @@ async def _async_subscribe_logs(
# journalctl -f shouldn't EOF except on shutdown;
# surface it so the consumer can decide to retry.
send_response(
LogStreamErrorMsg(
error="journalctl stream ended"
).model_dump()
LogStreamErrorMsg(error="journalctl stream ended").model_dump()
)
return
line = raw.decode("utf-8", errors="replace").rstrip("\n")
Expand All @@ -1185,9 +1201,7 @@ async def _async_subscribe_logs(
ts, sep, rest = line.partition(" ")
if not sep:
ts, rest = "", line
send_response(
LogLineMsg(timestamp=ts, line=rest).model_dump()
)
send_response(LogLineMsg(timestamp=ts, line=rest).model_dump())
except asyncio.CancelledError:
raise
except Exception as e:
Expand Down Expand Up @@ -1245,10 +1259,84 @@ async def _async_goto_sleep(
except Exception as e:
send_response({"error": str(e), "command": "goto_sleep"})

# ------------------------------------------------------------------
# restart_daemon dispatch
# ------------------------------------------------------------------

def _dispatch_restart_daemon(
self,
cmd: RestartDaemonCmd,
send_response: Callable[[dict[str, Any]], None],
) -> None:
"""Schedule a daemon restart and ack the caller before tearing down.

The data channel goes down with the media pipeline as part of
the restart, so we send the ack synchronously *before*
scheduling the work — otherwise the caller would never see a
confirmation that their request was accepted, only the
connection drop.
"""
handler = self._daemon_restart_handler
if handler is None:
send_response(
{
"error": "restart_daemon: no daemon lifecycle handler wired",
"command": "restart_daemon",
}
)
return

send_response(
{
"status": "ok",
"command": "restart_daemon",
"scheduled": True,
}
)
asyncio.create_task(self._async_daemon_restart(handler, cmd))

async def _async_daemon_restart(
self,
handler: Callable[..., Awaitable[Any]],
cmd: RestartDaemonCmd,
) -> None:
"""Run the daemon restart handler and surface failures to journalctl.

We can't reach the caller anymore once the restart starts (the
data channel closes), so failures are logged rather than
reported back over the typed transport.
"""
try:
await handler(
goto_sleep_on_stop=cmd.goto_sleep_on_stop,
wake_up_on_start=cmd.wake_up_on_start,
)
except Exception as e:
self.logger.error("restart_daemon failed: %s", e)

# ------------------------------------------------------------------
# WebRTC data channel interface (delegates to process_command)
# ------------------------------------------------------------------

def set_daemon_restart_handler(
self, handler: Callable[..., Awaitable[Any]]
) -> None:
"""Wire the daemon-level restart entry point used by ``restart_daemon``.

Called by ``Daemon.start()`` after the backend is constructed,
passing ``Daemon.restart`` as the handler. Decoupled via a
callback so the backend doesn't need to import the daemon
(avoids the circular ``daemon -> backend -> daemon`` import).

Args:
handler: An async callable accepting at minimum the keyword
arguments ``goto_sleep_on_stop: bool`` and
``wake_up_on_start: bool``. ``Daemon.restart`` matches
this shape.

"""
self._daemon_restart_handler = handler

def setup_media_server(self, media_server: Any) -> None:
"""Connect the backend to the media server.

Expand Down
7 changes: 7 additions & 0 deletions src/reachy_mini/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,13 @@ async def start(
hardware_config_filepath=hardware_config_filepath,
)

# Wire the typed-transport restart hook so peers can recover
# a degraded daemon (e.g. closed motor controller) over the
# WebRTC data channel without SSH or LAN HTTP access.
# Re-wired on every start() because each start builds a new
# backend instance.
self.backend.set_daemon_restart_handler(self.restart)

self.ws_server = WSServer(backend=self.backend)
self.ws_server.start()
self._thread_publish_status = Thread(
Expand Down
44 changes: 42 additions & 2 deletions src/reachy_mini/io/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
goto_target, wake_up, goto_sleep, play_sound,
set_motor_mode, set_torque, get_motor_mode,
set_gravity_compensation, set_automatic_body_yaw,
get_state, get_version, start_recording, stop_recording, append_record
get_state, get_version, start_recording, stop_recording, append_record,
subscribe_logs, unsubscribe_logs, restart_daemon

Server->Client message types:
joint_positions, head_pose, imu_data, recorded_data,
Expand Down Expand Up @@ -304,6 +305,44 @@ class UnsubscribeLogsCmd(BaseModel):
type: Literal["unsubscribe_logs"] = "unsubscribe_logs"


# ------------------------------------------------------------------
# Daemon lifecycle commands
#
# `restart_daemon` lets a peer recover a degraded daemon (e.g. a
# closed motor controller asserting on every command) over the same
# typed transport already used for control, without needing SSH or
# LAN-only HTTP access. Same effect as `POST /api/daemon/restart`,
# but reachable from a remote (Central-routed) WebRTC peer.
#
# Side-effects, by design:
# - The active WebRTC session is dropped: stopping the daemon tears
# down the media pipeline, which closes the data channel. The
# daemon sends an ack on the channel BEFORE kicking the restart
# off so well-behaved clients can distinguish "request accepted"
# from a transport failure.
# - The robot does NOT go to sleep before the restart unless
# `goto_sleep_on_stop=True`, and the wake-up trajectory is NOT
# replayed afterwards unless `wake_up_on_start=True`. Defaults
# match `Daemon.restart()` so the call is conservative.
# - Daemon parameters (sim mode, serial port, kinematics, audio,
# localhost_only, ...) are preserved from the previous `start()`.
# ------------------------------------------------------------------


class RestartDaemonCmd(BaseModel):
"""Restart the daemon process.

Recovery hatch over the WebRTC DataChannel: tears the backend
down (motor controller, control loop, media pipeline) and
re-runs ``Daemon.start()`` with the same parameters as the
previous start. The data channel is dropped as part of the
restart and the client must reconnect.
"""

type: Literal["restart_daemon"] = "restart_daemon"
goto_sleep_on_stop: bool = False
wake_up_on_start: bool = False


AnyCommand = Annotated[
SetTargetCmd
Expand Down Expand Up @@ -331,7 +370,8 @@ class UnsubscribeLogsCmd(BaseModel):
| SetMicrophoneVolumeCmd
| GetMicrophoneVolumeCmd
| SubscribeLogsCmd
| UnsubscribeLogsCmd,
| UnsubscribeLogsCmd
| RestartDaemonCmd,
Field(discriminator="type"),
]

Expand Down
Loading