From 6e742e7280f59ca02e824d98468447f63585c336 Mon Sep 17 00:00:00 2001 From: "vybhav@gmail.com" Date: Sat, 13 Dec 2025 19:19:21 +0530 Subject: [PATCH 1/9] add position and pose command managers --- .../managers/command/pose_command.py | 245 ++++++++++++++++++ .../managers/command/position_command.py | 234 +++++++++++++++++ .../managers/command/velocity_command.py | 2 +- 3 files changed, 480 insertions(+), 1 deletion(-) create mode 100644 genesis_forge/managers/command/pose_command.py create mode 100644 genesis_forge/managers/command/position_command.py diff --git a/genesis_forge/managers/command/pose_command.py b/genesis_forge/managers/command/pose_command.py new file mode 100644 index 0000000..6af5357 --- /dev/null +++ b/genesis_forge/managers/command/pose_command.py @@ -0,0 +1,245 @@ +from typing import Tuple, TypedDict + +import os +import torch +import genesis as gs +from genesis.utils.geom import euler_to_R +import numpy as np +from genesis_forge.genesis_env import GenesisEnv +from genesis_forge.gamepads import Gamepad + +from .command_manager import CommandManager, CommandRangeValue + + +THIS_DIR = os.path.dirname(os.path.abspath(__file__)) + + +class PoseCommandRange(TypedDict): + pos_x: CommandRangeValue + pos_y: CommandRangeValue + pos_z: CommandRangeValue + + +class PoseDebugVisualizerConfig(TypedDict): + """Defines the configuration for the debug visualizer.""" + + envs_idx: list[int] + """The indices of the environments to visualize. If None, all environments will be visualized.""" + + arrow_offset: float + """The vertical offset of the debug arrows from the top of the robot""" + + arrow_radius: float + """The radius of the shaft of the debug arrows""" + + commanded_color: Tuple[float, float, float, float] + """The color of the commanded velocity arrow""" + + + +DEFAULT_VISUALIZER_CONFIG: PoseDebugVisualizerConfig = { + "envs_idx": None, + "sphere_offset": 0.03, + "sphere_radius": 0.02, + "commanded_color": (0.0, 0.5, 0.0, 1.0), +} + + +class PoseCommandManager(CommandManager): + """ + Generates a position command from uniform distribution. + The command comprises of a linear velocity in x and y direction and an angular velocity around the z-axis. + + IMPORTANT: The position commands are interpreted as world-relative coordinates: + - X-axis: x coordinate of the target position + - Y-axis: y coordinate of the target position + - Z-axis: z coordinate of the target position + + :::{admonition} Debug Visualization + + If you set `debug_visualizer` to True, target sphere will be rendered above the target pos + + Arrow meanings: + + - GREEN: Commanded position for the robot in the world frame + + Args: + env: The environment to control + range: The ranges of linear & angular velocities + resample_time_sec: The time interval between changing the command + debug_visualizer: Enable the debug arrow visualization + debug_visualizer_cfg: The configuration for the debug visualizer + + Example:: + + class MyEnv(GenesisEnv): + def config(self): + # Create a velocity command manager + self.command_manager = PoseCommandManager( + self, + visualize=True, + range = { + "lin_vel_x_range": (-1.0, 1.0), + "lin_vel_y_range": (-1.0, 1.0), + "ang_vel_z_range": (-0.5, 0.5), + } + ) + + RewardManager( + self, + logging_enabled=True, + cfg={ + "tracking_lin_vel": { + "weight": 1.0, + "fn": rewards.command_tracking_lin_vel, + "params": { + "vel_cmd_manager": self.velocity_command, + }, + }, + "tracking_ang_vel": { + "weight": 1.0, + "fn": rewards.command_tracking_ang_vel, + "params": { + "vel_cmd_manager": self.velocity_command, + }, + }, + # ... other rewards ... + }, + ) + + # Observations + ObservationManager( + self, + cfg={ + "velocity_cmd": {"fn": self.velocity_command.observation}, + # ... other observations ... + }, + ) + """ + + def __init__( + self, + env: GenesisEnv, + range: PoseCommandRange, + resample_time_sec: float = 5.0, + debug_visualizer: bool = False, + debug_visualizer_cfg: PoseDebugVisualizerConfig = DEFAULT_VISUALIZER_CONFIG, + ): + super().__init__(env, range=range, resample_time_sec=resample_time_sec) + self._sphere_nodes: list = [] + self.debug_visualizer = debug_visualizer + self.visualizer_cfg = {**DEFAULT_VISUALIZER_CONFIG, **debug_visualizer_cfg} + self.debug_envs_idx = None + + self._is_standing_env = torch.zeros( + env.num_envs, dtype=torch.bool, device=gs.device + ) + + """ + Lifecycle Operations + """ + + def resample_command(self, env_ids: list[int]): + """ + Overwrites commands for environments that should be standing still. + """ + super().resample_command(env_ids) + if not self.enabled: + return + + def build(self): + """Build the position command manager""" + super().build() + + # If debug envs_idx is not set, attempt to use the vis_options rendered_envs_idx + if not self.debug_visualizer or self.visualizer_cfg is None or self.env.scene is None: + return + self.debug_envs_idx = self.visualizer_cfg.get("envs_idx", None) + if self.debug_envs_idx is None and self.env.scene.vis_options is not None: + self.debug_envs_idx = self.env.scene.vis_options.rendered_envs_idx + if self.debug_envs_idx is None: + self.debug_envs_idx = list[int](range(self.env.num_envs)) + + def step(self): + """Render the command arrows""" + if not self.enabled: + return + super().step() + self._render_arrow() + + def use_gamepad( + self, + gamepad: Gamepad, + pos_x_axis: int = 0, + pos_y_axis: int = 1, + pos_z_axis: int = 2, + euler_x_axis: int = 3, + euler_y_axis: int = 4, + euler_z_axis: int = 5, + ): + """ + Use a connected gamepad to control the command. + + Args: + gamepad: The gamepad to use. + pos_x_axis: Map this gamepad axis index to the position in the x-direction. + pos_y_axis: Map this gamepad axis index to the position in the y-direction. + pos_z_axis: Map this gamepad axis index to the position in the z-direction. + euler_x_axis: Map this gamepad axis index to the euler in the x-direction. + euler_y_axis: Map this gamepad axis index to the euler in the y-direction. + euler_z_axis: Map this gamepad axis index to the euler in the z-direction. + """ + super().use_gamepad( + gamepad, + range_axis={ + "pos_x": pos_x_axis, + "pos_y": pos_y_axis, + "pos_z": pos_z_axis, + "euler_x": euler_x_axis, + "euler_y": euler_y_axis, + "euler_z": euler_z_axis, + }, + ) + + """ + Internal Implementation + """ + + def _render_arrow(self): + """ + Render the command sphere showing position commands. + + The commanded position sphere (green) shows the position in the world frame + """ + if not self.debug_visualizer: + return + + # Remove existing arrows + for arrow in self._arrow_nodes: + self.env.scene.clear_debug_object(arrow) + self._arrow_nodes = [] + + for i in self.debug_envs_idx: + # Target arrow (robot-relative command transformed to world coordinates for visualization) + self._draw_arrow( + pos=self.command[i], + color=self.visualizer_cfg["commanded_color"], + ) + + def _draw_arrow( + self, + pos: torch.Tensor, + euler: torch.Tensor, + color: list[float], + ): + try: + node = self.env.scene.draw_debug_arrow( + pos=pos.cpu().numpy(), + vec=np.tile([0,0,1], (pos.shape[0], 1))@euler_to_R(euler), + color=color, + radius=self.visualizer_cfg["arrow_radius"], + ) + if node: + self._sphere_nodes.append(node) + except Exception as e: + print(f"Error adding debug visualizing in PoseCommandManager: {e}") diff --git a/genesis_forge/managers/command/position_command.py b/genesis_forge/managers/command/position_command.py new file mode 100644 index 0000000..23e275b --- /dev/null +++ b/genesis_forge/managers/command/position_command.py @@ -0,0 +1,234 @@ +from typing import Tuple, TypedDict + +import os +import torch +import genesis as gs + +from genesis_forge.genesis_env import GenesisEnv +from genesis_forge.gamepads import Gamepad + +from .command_manager import CommandManager, CommandRangeValue + + +THIS_DIR = os.path.dirname(os.path.abspath(__file__)) + + +class PositionCommandRange(TypedDict): + pos_x: CommandRangeValue + pos_y: CommandRangeValue + pos_z: CommandRangeValue + + +class PositionDebugVisualizerConfig(TypedDict): + """Defines the configuration for the debug visualizer.""" + + envs_idx: list[int] + """The indices of the environments to visualize. If None, all environments will be visualized.""" + + sphere_offset: float + """The vertical offset of the debug arrows from the top of the robot""" + + sphere_radius: float + """The radius of the shaft of the debug arrows""" + + commanded_color: Tuple[float, float, float, float] + """The color of the commanded velocity arrow""" + + + +DEFAULT_VISUALIZER_CONFIG: PositionDebugVisualizerConfig = { + "envs_idx": None, + "sphere_offset": 0.03, + "sphere_radius": 0.02, + "commanded_color": (0.0, 0.5, 0.0, 1.0), +} + + +class PositionCommandManager(CommandManager): + """ + Generates a position command from uniform distribution. + The command comprises of a linear velocity in x and y direction and an angular velocity around the z-axis. + + IMPORTANT: The position commands are interpreted as world-relative coordinates: + - X-axis: x coordinate of the target position + - Y-axis: y coordinate of the target position + - Z-axis: z coordinate of the target position + + :::{admonition} Debug Visualization + + If you set `debug_visualizer` to True, target sphere will be rendered above the target pos + + Sphere meanings: + + - GREEN: Commanded position for the robot in the world frame + + Args: + env: The environment to control + range: The ranges of linear & angular velocities + resample_time_sec: The time interval between changing the command + debug_visualizer: Enable the debug arrow visualization + debug_visualizer_cfg: The configuration for the debug visualizer + + Example:: + + class MyEnv(GenesisEnv): + def config(self): + # Create a velocity command manager + self.command_manager = PositionCommandManager( + self, + visualize=True, + range = { + "lin_vel_x_range": (-1.0, 1.0), + "lin_vel_y_range": (-1.0, 1.0), + "ang_vel_z_range": (-0.5, 0.5), + } + ) + + RewardManager( + self, + logging_enabled=True, + cfg={ + "tracking_lin_vel": { + "weight": 1.0, + "fn": rewards.command_tracking_lin_vel, + "params": { + "vel_cmd_manager": self.velocity_command, + }, + }, + "tracking_ang_vel": { + "weight": 1.0, + "fn": rewards.command_tracking_ang_vel, + "params": { + "vel_cmd_manager": self.velocity_command, + }, + }, + # ... other rewards ... + }, + ) + + # Observations + ObservationManager( + self, + cfg={ + "velocity_cmd": {"fn": self.velocity_command.observation}, + # ... other observations ... + }, + ) + """ + + def __init__( + self, + env: GenesisEnv, + range: PositionCommandRange, + resample_time_sec: float = 5.0, + debug_visualizer: bool = False, + debug_visualizer_cfg: PositionDebugVisualizerConfig = DEFAULT_VISUALIZER_CONFIG, + ): + super().__init__(env, range=range, resample_time_sec=resample_time_sec) + self._sphere_nodes: list = [] + self.debug_visualizer = debug_visualizer + self.visualizer_cfg = {**DEFAULT_VISUALIZER_CONFIG, **debug_visualizer_cfg} + self.debug_envs_idx = None + + self._is_standing_env = torch.zeros( + env.num_envs, dtype=torch.bool, device=gs.device + ) + + """ + Lifecycle Operations + """ + + def resample_command(self, env_ids: list[int]): + """ + Overwrites commands for environments that should be standing still. + """ + super().resample_command(env_ids) + if not self.enabled: + return + + def build(self): + """Build the position command manager""" + super().build() + + # If debug envs_idx is not set, attempt to use the vis_options rendered_envs_idx + if not self.debug_visualizer or self.visualizer_cfg is None or self.env.scene is None: + return + self.debug_envs_idx = self.visualizer_cfg.get("envs_idx", None) + if self.debug_envs_idx is None and self.env.scene.vis_options is not None: + self.debug_envs_idx = self.env.scene.vis_options.rendered_envs_idx + if self.debug_envs_idx is None: + self.debug_envs_idx = list[int](range(self.env.num_envs)) + + def step(self): + """Render the command arrows""" + if not self.enabled: + return + super().step() + self._render_sphere() + + def use_gamepad( + self, + gamepad: Gamepad, + pos_x_axis: int = 0, + pos_y_axis: int = 1, + pos_z_axis: int = 2, + ): + """ + Use a connected gamepad to control the command. + + Args: + gamepad: The gamepad to use. + pos_x_axis: Map this gamepad axis index to the position in the x-direction. + pos_y_axis: Map this gamepad axis index to the position in the y-direction. + pos_z_axis: Map this gamepad axis index to the position in the z-direction. + """ + super().use_gamepad( + gamepad, + range_axis={ + "pos_x": pos_x_axis, + "pos_y": pos_y_axis, + "pos_z": pos_z_axis, + }, + ) + + """ + Internal Implementation + """ + + def _render_sphere(self): + """ + Render the command sphere showing position commands. + + The commanded position sphere (green) shows the position in the world frame + """ + if not self.debug_visualizer: + return + + # Remove existing arrows + for sphere in self._sphere_nodes: + self.env.scene.clear_debug_object(sphere) + self._sphere_nodes = [] + + for i in self.debug_envs_idx: + # Target arrow (robot-relative command transformed to world coordinates for visualization) + self._draw_sphere( + pos=self.command[i], + color=self.visualizer_cfg["commanded_color"], + ) + + def _draw_sphere( + self, + pos: torch.Tensor, + color: list[float], + ): + try: + node = self.env.scene.draw_debug_sphere( + pos=pos.cpu().numpy(), + color=color, + radius=self.visualizer_cfg["sphere_radius"], + ) + if node: + self._sphere_nodes.append(node) + except Exception as e: + print(f"Error adding debug visualizing in PositionCommandManager: {e}") + diff --git a/genesis_forge/managers/command/velocity_command.py b/genesis_forge/managers/command/velocity_command.py index 942183c..0f34964 100644 --- a/genesis_forge/managers/command/velocity_command.py +++ b/genesis_forge/managers/command/velocity_command.py @@ -175,7 +175,7 @@ def build(self): def build_debug(self): """Build the debug components of the velocity command manager""" - if not self.debug_visualizer or self.visualizer_cfg is None: + if not self.debug_visualizer or self.visualizer_cfg is None or self.env.scene is None: return # Pre-allocate buffers From 01b0ffbaf76a6a807d14282c0d8dc1feae14fb60 Mon Sep 17 00:00:00 2001 From: "vybhav@gmail.com" Date: Sat, 13 Dec 2025 20:04:54 +0530 Subject: [PATCH 2/9] add pose and position rewards --- genesis_forge/mdp/rewards.py | 126 +++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) diff --git a/genesis_forge/mdp/rewards.py b/genesis_forge/mdp/rewards.py index eae526b..26928ad 100644 --- a/genesis_forge/mdp/rewards.py +++ b/genesis_forge/mdp/rewards.py @@ -7,6 +7,7 @@ import torch import genesis as gs +from genesis.utils.geom import quat_to_xyz from genesis_forge.genesis_env import GenesisEnv from genesis_forge.managers import ( ActuatorManager, @@ -282,6 +283,131 @@ def action_rate_l2(env: GenesisEnv) -> torch.Tensor: return torch.sum(torch.square(last_actions - actions), dim=1) +""" +Position Command Rewards +""" + +def commonad_tracking_base_position( + env: GenesisEnv, + command: torch.Tensor = None, + position_cmd_manager: PositionCommandManager = None, + sensitivity: float = 0.25, + entity_attr: str = "robot", + link_name: str = None, + entity_manager: EntityManager = None, +) -> torch.Tensor: + """ + Penalize base pose away from target. + + Args: + env: The Genesis environment containing the robot + command: The commanded XYZ position the the world frame, its shape is(num_envs, 3) + position_cmd_manager: The velocity command manager + sensitivity: A lower value means the reward is more sensitive to the error + entity_attr: The attribute name of the entity in the environment. + entity_manager: The entity manager for the entity. + + Returns: + torch.Tensor: Penalty for base position away from target + """ + assert ( + command is not None or position_cmd_manager is not None + ), "Either command or position_cmd_manager must be provided to commonad_tracking_base_position" + + if entity_manager is not None: + entity=entity_manager.entity + else: + entity = getattr(env, entity_attr) + + if link_name is None or link_name == "": + link= entity.links[0] + else: + try: + link=entity.get_link(link_name) + except Exception as e: + raise ValueError(f"Link name '{link_name}' not found in entity '{entity_attr}'.") + + pos = link.get_pos() + if position_cmd_manager is not None: + command = position_cmd_manager.command[:, :2] + else: + command = command[:, :2] + + base_pos_error= torch.sum(torch.square(command - pos), dim=1) + return torch.square(-base_pos_error/sensitivity) + +""" +Pose Command Rewards +""" + +def command_tracking_base_pose( + env: GenesisEnv, + command: torch.Tensor = None, + pose_cmd_manager: PoseCommandManager = None, + pos_sensitivity: float = 0.25, + euler_sensitivity: float = 0.25, + entity_attr: str = "robot", + link_name: str = None, + entity_manager: 'EntityManager' = None, +) -> torch.Tensor: + """ + Penalize base pose away from target (both position and Euler angles) with separate sensitivities. + + Args: + env: The Genesis environment containing the robot + command: The commanded XYZ position and Euler angles (in world frame), shape (num_envs, 6) where first 3 are position and last 3 are Euler angles. + position_cmd_manager: The velocity command manager + pos_sensitivity: A lower value means the reward is more sensitive to position error + euler_sensitivity: A lower value means the reward is more sensitive to Euler angle error + entity_attr: The attribute name of the entity in the environment. + entity_manager: The entity manager for the entity. + + Returns: + torch.Tensor: Penalty for base position and Euler angles away from target + """ + assert ( + command is not None or pose_cmd_manager is not None + ), "Either command or position_cmd_manager must be provided to command_tracking_base_pose" + + if entity_manager is not None: + entity=entity_manager.entity + else: + entity = getattr(env, entity_attr) + + if link_name is None or link_name == "": + link= entity.links[0] + else: + try: + link=entity.get_link(link_name) + except Exception as e: + raise ValueError(f"Link name '{link_name}' not found in entity '{entity_attr}'.") + + link_pos = link.get_pos() + link_euler = quat_to_xyz(link.get_quat()) + + if pose_cmd_manager is not None: + command_pos = pose_cmd_manager.command[:, :3] + command_euler = pose_cmd_manager.command[:, 3:6] + else: + command_pos = command[:, :3] + command_euler = command[:, 3:6] + + pos_error = torch.sum(torch.square(command_pos - link_pos), dim=1) + + euler_error = torch.sum(torch.square( + torch.min( + torch.abs(command_euler - link_euler), + 2 * torch.pi - torch.abs(command_euler - link_euler) + ) + ), dim=1) + + pos_penalty = torch.square(-pos_error / pos_sensitivity) + euler_penalty = torch.square(-euler_error / euler_sensitivity) + + total_penalty = pos_penalty + euler_penalty + + return total_penalty + """ Velocity Command Rewards """ From 96adb9782a1390da3c401cdf95ae1eb1fa4e29ac Mon Sep 17 00:00:00 2001 From: "vybhav@gmail.com" Date: Sat, 13 Dec 2025 20:12:40 +0530 Subject: [PATCH 3/9] update inits and imports --- genesis_forge/managers/__init__.py | 9 ++++++++- genesis_forge/managers/command/__init__.py | 4 ++++ genesis_forge/mdp/rewards.py | 5 +++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/genesis_forge/managers/__init__.py b/genesis_forge/managers/__init__.py index db47035..7625213 100644 --- a/genesis_forge/managers/__init__.py +++ b/genesis_forge/managers/__init__.py @@ -3,7 +3,12 @@ from .termination_manager import TerminationManager from .action.position_action_manager import PositionActionManager from .action.position_within_limits import PositionWithinLimitsActionManager -from .command import CommandManager, VelocityCommandManager +from .command import ( + CommandManager, + PositionCommandManager, + PoseCommandManager, + VelocityCommandManager +) from .contact import ContactManager from .terrain_manager import TerrainManager from .entity_manager import EntityManager @@ -19,6 +24,8 @@ "RewardManager", "TerminationManager", "CommandManager", + "PositionCommandManager", + "PoseCommandManager", "VelocityCommandManager", "PositionActionManager", "PositionWithinLimitsActionManager", diff --git a/genesis_forge/managers/command/__init__.py b/genesis_forge/managers/command/__init__.py index 6a8040c..31f3ad0 100644 --- a/genesis_forge/managers/command/__init__.py +++ b/genesis_forge/managers/command/__init__.py @@ -1,7 +1,11 @@ from .command_manager import CommandManager +from .position_command import PositionCommandManager +from .pose_command import PoseCommandManager from .velocity_command import VelocityCommandManager __all__ = [ "CommandManager", + "PositionCommandManager", + "PoseCommandManager", "VelocityCommandManager", ] diff --git a/genesis_forge/mdp/rewards.py b/genesis_forge/mdp/rewards.py index 26928ad..9c0abc5 100644 --- a/genesis_forge/mdp/rewards.py +++ b/genesis_forge/mdp/rewards.py @@ -12,6 +12,8 @@ from genesis_forge.managers import ( ActuatorManager, CommandManager, + PositionCommandManager, + PoseCommandManager, VelocityCommandManager, PositionActionManager, ContactManager, @@ -287,6 +289,7 @@ def action_rate_l2(env: GenesisEnv) -> torch.Tensor: Position Command Rewards """ + def commonad_tracking_base_position( env: GenesisEnv, command: torch.Tensor = None, @@ -336,10 +339,12 @@ def commonad_tracking_base_position( base_pos_error= torch.sum(torch.square(command - pos), dim=1) return torch.square(-base_pos_error/sensitivity) + """ Pose Command Rewards """ + def command_tracking_base_pose( env: GenesisEnv, command: torch.Tensor = None, From b15dc763646887d4518a2089d5b776d960c3f1f0 Mon Sep 17 00:00:00 2001 From: "vybhav@gmail.com" Date: Mon, 15 Dec 2025 19:57:55 +0530 Subject: [PATCH 4/9] fix docstrings and formating --- genesis_forge/managers/__init__.py | 4 +- .../managers/command/pose_command.py | 76 ++++++++++--------- .../managers/command/position_command.py | 35 ++++----- .../managers/command/velocity_command.py | 6 +- genesis_forge/mdp/rewards.py | 8 +- 5 files changed, 65 insertions(+), 64 deletions(-) diff --git a/genesis_forge/managers/__init__.py b/genesis_forge/managers/__init__.py index 7625213..16040ca 100644 --- a/genesis_forge/managers/__init__.py +++ b/genesis_forge/managers/__init__.py @@ -4,10 +4,10 @@ from .action.position_action_manager import PositionActionManager from .action.position_within_limits import PositionWithinLimitsActionManager from .command import ( - CommandManager, + CommandManager, PositionCommandManager, PoseCommandManager, - VelocityCommandManager + VelocityCommandManager, ) from .contact import ContactManager from .terrain_manager import TerrainManager diff --git a/genesis_forge/managers/command/pose_command.py b/genesis_forge/managers/command/pose_command.py index 6af5357..02c5b45 100644 --- a/genesis_forge/managers/command/pose_command.py +++ b/genesis_forge/managers/command/pose_command.py @@ -18,6 +18,9 @@ class PoseCommandRange(TypedDict): pos_x: CommandRangeValue pos_y: CommandRangeValue pos_z: CommandRangeValue + euler_x: CommandRangeValue + euler_y: CommandRangeValue + euler_z: CommandRangeValue class PoseDebugVisualizerConfig(TypedDict): @@ -33,39 +36,41 @@ class PoseDebugVisualizerConfig(TypedDict): """The radius of the shaft of the debug arrows""" commanded_color: Tuple[float, float, float, float] - """The color of the commanded velocity arrow""" - + """The color of the pose arrow""" DEFAULT_VISUALIZER_CONFIG: PoseDebugVisualizerConfig = { "envs_idx": None, - "sphere_offset": 0.03, - "sphere_radius": 0.02, + "arrow_offset": 0.03, + "arrow_radius": 0.02, "commanded_color": (0.0, 0.5, 0.0, 1.0), } class PoseCommandManager(CommandManager): """ - Generates a position command from uniform distribution. - The command comprises of a linear velocity in x and y direction and an angular velocity around the z-axis. + Generates a pose command from uniform distribution. + The command comprises of a (x,y,z) position and (x,y,z) euler angles IMPORTANT: The position commands are interpreted as world-relative coordinates: - - X-axis: x coordinate of the target position - - Y-axis: y coordinate of the target position - - Z-axis: z coordinate of the target position + - pos-X-axis: x coordinate of the target position + - pos-Y-axis: y coordinate of the target position + - pos-Z-axis: z coordinate of the target position + - euler-X-axis: x coordinate of the target orientation + - euler-Y-axis: y coordinate of the target orientation + - euler-Z-axis: z coordinate of the target orientation :::{admonition} Debug Visualization - If you set `debug_visualizer` to True, target sphere will be rendered above the target pos + If you set `debug_visualizer` to True, target arrow will be rendered above the target pose Arrow meanings: - - GREEN: Commanded position for the robot in the world frame + - GREEN: Commanded pose for the robot in the world frame Args: env: The environment to control - range: The ranges of linear & angular velocities + range: The ranges of positions and orientation resample_time_sec: The time interval between changing the command debug_visualizer: Enable the debug arrow visualization debug_visualizer_cfg: The configuration for the debug visualizer @@ -75,13 +80,13 @@ class PoseCommandManager(CommandManager): class MyEnv(GenesisEnv): def config(self): # Create a velocity command manager - self.command_manager = PoseCommandManager( + self.pose_command_manager = PoseCommandManager( self, visualize=True, range = { - "lin_vel_x_range": (-1.0, 1.0), - "lin_vel_y_range": (-1.0, 1.0), - "ang_vel_z_range": (-0.5, 0.5), + "pos_x_range": (-5.0, 5.0), + "pos_y_range": (-5.0, 5.0), + "euler_z_range": (-1.57, 1.57), } ) @@ -89,18 +94,11 @@ def config(self): self, logging_enabled=True, cfg={ - "tracking_lin_vel": { + "tracking_pose": { "weight": 1.0, - "fn": rewards.command_tracking_lin_vel, + "fn": rewards.command_tracking_pose, "params": { - "vel_cmd_manager": self.velocity_command, - }, - }, - "tracking_ang_vel": { - "weight": 1.0, - "fn": rewards.command_tracking_ang_vel, - "params": { - "vel_cmd_manager": self.velocity_command, + "pose_cmd_manager": self.pose_command_manager, }, }, # ... other rewards ... @@ -111,7 +109,7 @@ def config(self): ObservationManager( self, cfg={ - "velocity_cmd": {"fn": self.velocity_command.observation}, + "pose_cmd": {"fn": self.pose_command_manager.observation}, # ... other observations ... }, ) @@ -148,12 +146,16 @@ def resample_command(self, env_ids: list[int]): return def build(self): - """Build the position command manager""" + """Build the pose command manager""" super().build() # If debug envs_idx is not set, attempt to use the vis_options rendered_envs_idx - if not self.debug_visualizer or self.visualizer_cfg is None or self.env.scene is None: - return + if ( + not self.debug_visualizer + or self.visualizer_cfg is None + or self.env.scene is None + ): + return self.debug_envs_idx = self.visualizer_cfg.get("envs_idx", None) if self.debug_envs_idx is None and self.env.scene.vis_options is not None: self.debug_envs_idx = self.env.scene.vis_options.rendered_envs_idx @@ -166,7 +168,7 @@ def step(self): return super().step() self._render_arrow() - + def use_gamepad( self, gamepad: Gamepad, @@ -185,9 +187,9 @@ def use_gamepad( pos_x_axis: Map this gamepad axis index to the position in the x-direction. pos_y_axis: Map this gamepad axis index to the position in the y-direction. pos_z_axis: Map this gamepad axis index to the position in the z-direction. - euler_x_axis: Map this gamepad axis index to the euler in the x-direction. - euler_y_axis: Map this gamepad axis index to the euler in the y-direction. - euler_z_axis: Map this gamepad axis index to the euler in the z-direction. + euler_x_axis: Map this gamepad axis index to the orientation in the x-direction. + euler_y_axis: Map this gamepad axis index to the orientation in the y-direction. + euler_z_axis: Map this gamepad axis index to the orientation in the z-direction. """ super().use_gamepad( gamepad, @@ -207,9 +209,9 @@ def use_gamepad( def _render_arrow(self): """ - Render the command sphere showing position commands. + Render the command arrow showing pose commands. - The commanded position sphere (green) shows the position in the world frame + The commanded pose arrow (green) shows the pose in the world frame """ if not self.debug_visualizer: return @@ -235,7 +237,7 @@ def _draw_arrow( try: node = self.env.scene.draw_debug_arrow( pos=pos.cpu().numpy(), - vec=np.tile([0,0,1], (pos.shape[0], 1))@euler_to_R(euler), + vec=np.tile([0, 0, 1], (pos.shape[0], 1)) @ euler_to_R(euler), color=color, radius=self.visualizer_cfg["arrow_radius"], ) diff --git a/genesis_forge/managers/command/position_command.py b/genesis_forge/managers/command/position_command.py index 23e275b..0d1bb71 100644 --- a/genesis_forge/managers/command/position_command.py +++ b/genesis_forge/managers/command/position_command.py @@ -35,7 +35,6 @@ class PositionDebugVisualizerConfig(TypedDict): """The color of the commanded velocity arrow""" - DEFAULT_VISUALIZER_CONFIG: PositionDebugVisualizerConfig = { "envs_idx": None, "sphere_offset": 0.03, @@ -74,13 +73,13 @@ class PositionCommandManager(CommandManager): class MyEnv(GenesisEnv): def config(self): # Create a velocity command manager - self.command_manager = PositionCommandManager( + self.position_command_manager = PositionCommandManager( self, visualize=True, range = { - "lin_vel_x_range": (-1.0, 1.0), - "lin_vel_y_range": (-1.0, 1.0), - "ang_vel_z_range": (-0.5, 0.5), + "pos_x_range": (-5.0, 5.0), + "pos_y_range": (-5.0, 5.0), + "pos_z_range": (0.29, 0.31), } ) @@ -88,18 +87,11 @@ def config(self): self, logging_enabled=True, cfg={ - "tracking_lin_vel": { - "weight": 1.0, - "fn": rewards.command_tracking_lin_vel, - "params": { - "vel_cmd_manager": self.velocity_command, - }, - }, - "tracking_ang_vel": { + "tracking_position": { "weight": 1.0, - "fn": rewards.command_tracking_ang_vel, + "fn": rewards.command_tracking_pos, "params": { - "vel_cmd_manager": self.velocity_command, + "position_cmd_manager": self.position_command_manager, }, }, # ... other rewards ... @@ -110,7 +102,7 @@ def config(self): ObservationManager( self, cfg={ - "velocity_cmd": {"fn": self.velocity_command.observation}, + "position_cmd": {"fn": self.position_command_manager.observation}, # ... other observations ... }, ) @@ -151,7 +143,11 @@ def build(self): super().build() # If debug envs_idx is not set, attempt to use the vis_options rendered_envs_idx - if not self.debug_visualizer or self.visualizer_cfg is None or self.env.scene is None: + if ( + not self.debug_visualizer + or self.visualizer_cfg is None + or self.env.scene is None + ): return self.debug_envs_idx = self.visualizer_cfg.get("envs_idx", None) if self.debug_envs_idx is None and self.env.scene.vis_options is not None: @@ -160,7 +156,7 @@ def build(self): self.debug_envs_idx = list[int](range(self.env.num_envs)) def step(self): - """Render the command arrows""" + """Render the command spheres""" if not self.enabled: return super().step() @@ -199,7 +195,7 @@ def _render_sphere(self): """ Render the command sphere showing position commands. - The commanded position sphere (green) shows the position in the world frame + The commanded position sphere (green) shows the position in the world frame """ if not self.debug_visualizer: return @@ -231,4 +227,3 @@ def _draw_sphere( self._sphere_nodes.append(node) except Exception as e: print(f"Error adding debug visualizing in PositionCommandManager: {e}") - diff --git a/genesis_forge/managers/command/velocity_command.py b/genesis_forge/managers/command/velocity_command.py index 0f34964..67be6da 100644 --- a/genesis_forge/managers/command/velocity_command.py +++ b/genesis_forge/managers/command/velocity_command.py @@ -175,7 +175,11 @@ def build(self): def build_debug(self): """Build the debug components of the velocity command manager""" - if not self.debug_visualizer or self.visualizer_cfg is None or self.env.scene is None: + if ( + not self.debug_visualizer + or self.visualizer_cfg is None + or self.env.scene is None + ): return # Pre-allocate buffers diff --git a/genesis_forge/mdp/rewards.py b/genesis_forge/mdp/rewards.py index 9c0abc5..160e83a 100644 --- a/genesis_forge/mdp/rewards.py +++ b/genesis_forge/mdp/rewards.py @@ -290,7 +290,7 @@ def action_rate_l2(env: GenesisEnv) -> torch.Tensor: """ -def commonad_tracking_base_position( +def commonad_tracking_position( env: GenesisEnv, command: torch.Tensor = None, position_cmd_manager: PositionCommandManager = None, @@ -300,7 +300,7 @@ def commonad_tracking_base_position( entity_manager: EntityManager = None, ) -> torch.Tensor: """ - Penalize base pose away from target. + Penalize base pos away from target. Args: env: The Genesis environment containing the robot @@ -345,7 +345,7 @@ def commonad_tracking_base_position( """ -def command_tracking_base_pose( +def command_tracking_pose( env: GenesisEnv, command: torch.Tensor = None, pose_cmd_manager: PoseCommandManager = None, @@ -356,7 +356,7 @@ def command_tracking_base_pose( entity_manager: 'EntityManager' = None, ) -> torch.Tensor: """ - Penalize base pose away from target (both position and Euler angles) with separate sensitivities. + Penalize base pose away from target (both position and orientation) with separate sensitivities. Args: env: The Genesis environment containing the robot From 850c5863aefc30730bbed25f34a841ce31a803aa Mon Sep 17 00:00:00 2001 From: "vybhav@gmail.com" Date: Mon, 15 Dec 2025 20:01:32 +0530 Subject: [PATCH 5/9] fix formatting --- genesis_forge/mdp/rewards.py | 70 ++++++++++++++++++++---------------- 1 file changed, 39 insertions(+), 31 deletions(-) diff --git a/genesis_forge/mdp/rewards.py b/genesis_forge/mdp/rewards.py index 160e83a..8ad94d0 100644 --- a/genesis_forge/mdp/rewards.py +++ b/genesis_forge/mdp/rewards.py @@ -316,28 +316,30 @@ def commonad_tracking_position( assert ( command is not None or position_cmd_manager is not None ), "Either command or position_cmd_manager must be provided to commonad_tracking_base_position" - + if entity_manager is not None: - entity=entity_manager.entity + entity = entity_manager.entity else: entity = getattr(env, entity_attr) - + if link_name is None or link_name == "": - link= entity.links[0] + link = entity.links[0] else: try: - link=entity.get_link(link_name) + link = entity.get_link(link_name) except Exception as e: - raise ValueError(f"Link name '{link_name}' not found in entity '{entity_attr}'.") - + raise ValueError( + f"Link name '{link_name}' not found in entity '{entity_attr}'." + ) + pos = link.get_pos() if position_cmd_manager is not None: command = position_cmd_manager.command[:, :2] else: command = command[:, :2] - - base_pos_error= torch.sum(torch.square(command - pos), dim=1) - return torch.square(-base_pos_error/sensitivity) + + base_pos_error = torch.sum(torch.square(command - pos), dim=1) + return torch.square(-base_pos_error / sensitivity) """ @@ -349,11 +351,11 @@ def command_tracking_pose( env: GenesisEnv, command: torch.Tensor = None, pose_cmd_manager: PoseCommandManager = None, - pos_sensitivity: float = 0.25, - euler_sensitivity: float = 0.25, + pos_sensitivity: float = 0.25, + euler_sensitivity: float = 0.25, entity_attr: str = "robot", link_name: str = None, - entity_manager: 'EntityManager' = None, + entity_manager: "EntityManager" = None, ) -> torch.Tensor: """ Penalize base pose away from target (both position and orientation) with separate sensitivities. @@ -373,38 +375,43 @@ def command_tracking_pose( assert ( command is not None or pose_cmd_manager is not None ), "Either command or position_cmd_manager must be provided to command_tracking_base_pose" - + if entity_manager is not None: - entity=entity_manager.entity + entity = entity_manager.entity else: entity = getattr(env, entity_attr) - + if link_name is None or link_name == "": - link= entity.links[0] + link = entity.links[0] else: try: - link=entity.get_link(link_name) + link = entity.get_link(link_name) except Exception as e: - raise ValueError(f"Link name '{link_name}' not found in entity '{entity_attr}'.") - + raise ValueError( + f"Link name '{link_name}' not found in entity '{entity_attr}'." + ) + link_pos = link.get_pos() - link_euler = quat_to_xyz(link.get_quat()) + link_euler = quat_to_xyz(link.get_quat()) if pose_cmd_manager is not None: - command_pos = pose_cmd_manager.command[:, :3] - command_euler = pose_cmd_manager.command[:, 3:6] + command_pos = pose_cmd_manager.command[:, :3] + command_euler = pose_cmd_manager.command[:, 3:6] else: - command_pos = command[:, :3] - command_euler = command[:, 3:6] + command_pos = command[:, :3] + command_euler = command[:, 3:6] pos_error = torch.sum(torch.square(command_pos - link_pos), dim=1) - euler_error = torch.sum(torch.square( - torch.min( - torch.abs(command_euler - link_euler), - 2 * torch.pi - torch.abs(command_euler - link_euler) - ) - ), dim=1) + euler_error = torch.sum( + torch.square( + torch.min( + torch.abs(command_euler - link_euler), + 2 * torch.pi - torch.abs(command_euler - link_euler), + ) + ), + dim=1, + ) pos_penalty = torch.square(-pos_error / pos_sensitivity) euler_penalty = torch.square(-euler_error / euler_sensitivity) @@ -413,6 +420,7 @@ def command_tracking_pose( return total_penalty + """ Velocity Command Rewards """ From 7b083eba74bc28113e36b08e603edd30d62e605d Mon Sep 17 00:00:00 2001 From: "vybhav@gmail.com" Date: Tue, 23 Dec 2025 03:36:13 +0530 Subject: [PATCH 6/9] address comments, fix variable names, remove unused code --- .../managers/command/pose_command.py | 71 ++++++++----------- .../managers/command/position_command.py | 59 ++++++--------- genesis_forge/mdp/rewards.py | 24 +++---- 3 files changed, 60 insertions(+), 94 deletions(-) diff --git a/genesis_forge/managers/command/pose_command.py b/genesis_forge/managers/command/pose_command.py index 02c5b45..bc3ea02 100644 --- a/genesis_forge/managers/command/pose_command.py +++ b/genesis_forge/managers/command/pose_command.py @@ -11,9 +11,6 @@ from .command_manager import CommandManager, CommandRangeValue -THIS_DIR = os.path.dirname(os.path.abspath(__file__)) - - class PoseCommandRange(TypedDict): pos_x: CommandRangeValue pos_y: CommandRangeValue @@ -53,12 +50,12 @@ class PoseCommandManager(CommandManager): The command comprises of a (x,y,z) position and (x,y,z) euler angles IMPORTANT: The position commands are interpreted as world-relative coordinates: - - pos-X-axis: x coordinate of the target position - - pos-Y-axis: y coordinate of the target position - - pos-Z-axis: z coordinate of the target position - - euler-X-axis: x coordinate of the target orientation - - euler-Y-axis: y coordinate of the target orientation - - euler-Z-axis: z coordinate of the target orientation + - pos-X: x coordinate of the target position + - pos-Y: y coordinate of the target position + - pos-Z: z coordinate of the target position + - euler-X: x coordinate of the target orientation + - euler-Y: y coordinate of the target orientation + - euler-Z: z coordinate of the target orientation :::{admonition} Debug Visualization @@ -82,11 +79,11 @@ def config(self): # Create a velocity command manager self.pose_command_manager = PoseCommandManager( self, - visualize=True, + debug_visualizer=True, range = { - "pos_x_range": (-5.0, 5.0), - "pos_y_range": (-5.0, 5.0), - "euler_z_range": (-1.57, 1.57), + "pos_x": (-5.0, 5.0), + "pos_y": (-5.0, 5.0), + "euler_z": (-1.57, 1.57), } ) @@ -129,22 +126,10 @@ def __init__( self.visualizer_cfg = {**DEFAULT_VISUALIZER_CONFIG, **debug_visualizer_cfg} self.debug_envs_idx = None - self._is_standing_env = torch.zeros( - env.num_envs, dtype=torch.bool, device=gs.device - ) - """ Lifecycle Operations """ - def resample_command(self, env_ids: list[int]): - """ - Overwrites commands for environments that should be standing still. - """ - super().resample_command(env_ids) - if not self.enabled: - return - def build(self): """Build the pose command manager""" super().build() @@ -172,34 +157,34 @@ def step(self): def use_gamepad( self, gamepad: Gamepad, - pos_x_axis: int = 0, - pos_y_axis: int = 1, - pos_z_axis: int = 2, - euler_x_axis: int = 3, - euler_y_axis: int = 4, - euler_z_axis: int = 5, + pos_x: int = 0, + pos_y: int = 1, + pos_z: int = 2, + euler_x: int = 3, + euler_y: int = 4, + euler_z: int = 5, ): """ Use a connected gamepad to control the command. Args: gamepad: The gamepad to use. - pos_x_axis: Map this gamepad axis index to the position in the x-direction. - pos_y_axis: Map this gamepad axis index to the position in the y-direction. - pos_z_axis: Map this gamepad axis index to the position in the z-direction. - euler_x_axis: Map this gamepad axis index to the orientation in the x-direction. - euler_y_axis: Map this gamepad axis index to the orientation in the y-direction. - euler_z_axis: Map this gamepad axis index to the orientation in the z-direction. + pos_x: Map this gamepad axis index to the position in the x-direction. + pos_y: Map this gamepad axis index to the position in the y-direction. + pos_z: Map this gamepad axis index to the position in the z-direction. + euler_x: Map this gamepad axis index to the orientation in the x-direction. + euler_y: Map this gamepad axis index to the orientation in the y-direction. + euler_z: Map this gamepad axis index to the orientation in the z-direction. """ super().use_gamepad( gamepad, range_axis={ - "pos_x": pos_x_axis, - "pos_y": pos_y_axis, - "pos_z": pos_z_axis, - "euler_x": euler_x_axis, - "euler_y": euler_y_axis, - "euler_z": euler_z_axis, + "pos_x": pos_x, + "pos_y": pos_y, + "pos_z": pos_z, + "euler_x": euler_x, + "euler_y": euler_y, + "euler_z": euler_z, }, ) diff --git a/genesis_forge/managers/command/position_command.py b/genesis_forge/managers/command/position_command.py index 0d1bb71..8baba1d 100644 --- a/genesis_forge/managers/command/position_command.py +++ b/genesis_forge/managers/command/position_command.py @@ -10,13 +10,10 @@ from .command_manager import CommandManager, CommandRangeValue -THIS_DIR = os.path.dirname(os.path.abspath(__file__)) - - class PositionCommandRange(TypedDict): - pos_x: CommandRangeValue - pos_y: CommandRangeValue - pos_z: CommandRangeValue + x: CommandRangeValue + y: CommandRangeValue + z: CommandRangeValue class PositionDebugVisualizerConfig(TypedDict): @@ -26,13 +23,13 @@ class PositionDebugVisualizerConfig(TypedDict): """The indices of the environments to visualize. If None, all environments will be visualized.""" sphere_offset: float - """The vertical offset of the debug arrows from the top of the robot""" + """The vertical offset of the debug sphere from the top of the robot""" sphere_radius: float - """The radius of the shaft of the debug arrows""" + """The radius of the shaft of the debug sphere""" commanded_color: Tuple[float, float, float, float] - """The color of the commanded velocity arrow""" + """The color of the commanded position sphere""" DEFAULT_VISUALIZER_CONFIG: PositionDebugVisualizerConfig = { @@ -46,7 +43,7 @@ class PositionDebugVisualizerConfig(TypedDict): class PositionCommandManager(CommandManager): """ Generates a position command from uniform distribution. - The command comprises of a linear velocity in x and y direction and an angular velocity around the z-axis. + The command comprises of a position in the x, y, and z axes. IMPORTANT: The position commands are interpreted as world-relative coordinates: - X-axis: x coordinate of the target position @@ -75,11 +72,11 @@ def config(self): # Create a velocity command manager self.position_command_manager = PositionCommandManager( self, - visualize=True, + debug_visualizer=True, range = { - "pos_x_range": (-5.0, 5.0), - "pos_y_range": (-5.0, 5.0), - "pos_z_range": (0.29, 0.31), + "x": (-5.0, 5.0), + "y": (-5.0, 5.0), + "z": (0.29, 0.31), } ) @@ -122,22 +119,10 @@ def __init__( self.visualizer_cfg = {**DEFAULT_VISUALIZER_CONFIG, **debug_visualizer_cfg} self.debug_envs_idx = None - self._is_standing_env = torch.zeros( - env.num_envs, dtype=torch.bool, device=gs.device - ) - """ Lifecycle Operations """ - def resample_command(self, env_ids: list[int]): - """ - Overwrites commands for environments that should be standing still. - """ - super().resample_command(env_ids) - if not self.enabled: - return - def build(self): """Build the position command manager""" super().build() @@ -165,25 +150,25 @@ def step(self): def use_gamepad( self, gamepad: Gamepad, - pos_x_axis: int = 0, - pos_y_axis: int = 1, - pos_z_axis: int = 2, + x: int = 0, + y: int = 1, + z: int = 2, ): """ Use a connected gamepad to control the command. Args: gamepad: The gamepad to use. - pos_x_axis: Map this gamepad axis index to the position in the x-direction. - pos_y_axis: Map this gamepad axis index to the position in the y-direction. - pos_z_axis: Map this gamepad axis index to the position in the z-direction. + x: Map this gamepad axis index to the position in the x-direction. + y: Map this gamepad axis index to the position in the y-direction. + z: Map this gamepad axis index to the position in the z-direction. """ super().use_gamepad( gamepad, range_axis={ - "pos_x": pos_x_axis, - "pos_y": pos_y_axis, - "pos_z": pos_z_axis, + "x": x, + "y": y, + "z": z, }, ) @@ -200,13 +185,13 @@ def _render_sphere(self): if not self.debug_visualizer: return - # Remove existing arrows + # Remove existing spheres for sphere in self._sphere_nodes: self.env.scene.clear_debug_object(sphere) self._sphere_nodes = [] for i in self.debug_envs_idx: - # Target arrow (robot-relative command transformed to world coordinates for visualization) + # Target sphere in the world frame for visualization) self._draw_sphere( pos=self.command[i], color=self.visualizer_cfg["commanded_color"], diff --git a/genesis_forge/mdp/rewards.py b/genesis_forge/mdp/rewards.py index 8ad94d0..5b45f75 100644 --- a/genesis_forge/mdp/rewards.py +++ b/genesis_forge/mdp/rewards.py @@ -290,7 +290,7 @@ def action_rate_l2(env: GenesisEnv) -> torch.Tensor: """ -def commonad_tracking_position( +def command_tracking_position( env: GenesisEnv, command: torch.Tensor = None, position_cmd_manager: PositionCommandManager = None, @@ -305,7 +305,7 @@ def commonad_tracking_position( Args: env: The Genesis environment containing the robot command: The commanded XYZ position the the world frame, its shape is(num_envs, 3) - position_cmd_manager: The velocity command manager + position_cmd_manager: The Position command manager sensitivity: A lower value means the reward is more sensitive to the error entity_attr: The attribute name of the entity in the environment. entity_manager: The entity manager for the entity. @@ -323,7 +323,7 @@ def commonad_tracking_position( entity = getattr(env, entity_attr) if link_name is None or link_name == "": - link = entity.links[0] + link = entity.base_link else: try: link = entity.get_link(link_name) @@ -334,9 +334,7 @@ def commonad_tracking_position( pos = link.get_pos() if position_cmd_manager is not None: - command = position_cmd_manager.command[:, :2] - else: - command = command[:, :2] + command = position_cmd_manager.command base_pos_error = torch.sum(torch.square(command - pos), dim=1) return torch.square(-base_pos_error / sensitivity) @@ -363,7 +361,7 @@ def command_tracking_pose( Args: env: The Genesis environment containing the robot command: The commanded XYZ position and Euler angles (in world frame), shape (num_envs, 6) where first 3 are position and last 3 are Euler angles. - position_cmd_manager: The velocity command manager + pose_cmd_manager: The Pose command manager pos_sensitivity: A lower value means the reward is more sensitive to position error euler_sensitivity: A lower value means the reward is more sensitive to Euler angle error entity_attr: The attribute name of the entity in the environment. @@ -374,7 +372,7 @@ def command_tracking_pose( """ assert ( command is not None or pose_cmd_manager is not None - ), "Either command or position_cmd_manager must be provided to command_tracking_base_pose" + ), "Either command or pose_cmd_manager must be provided to command_tracking_pose" if entity_manager is not None: entity = entity_manager.entity @@ -382,7 +380,7 @@ def command_tracking_pose( entity = getattr(env, entity_attr) if link_name is None or link_name == "": - link = entity.links[0] + link = entity.base_link else: try: link = entity.get_link(link_name) @@ -395,11 +393,9 @@ def command_tracking_pose( link_euler = quat_to_xyz(link.get_quat()) if pose_cmd_manager is not None: - command_pos = pose_cmd_manager.command[:, :3] - command_euler = pose_cmd_manager.command[:, 3:6] - else: - command_pos = command[:, :3] - command_euler = command[:, 3:6] + command = pose_cmd_manager.command + command_pos = command[:, :3] + command_euler = command[:, 3:6] pos_error = torch.sum(torch.square(command_pos - link_pos), dim=1) From 172f961ae19f38d60bba2b3d5823dbaa239d5128 Mon Sep 17 00:00:00 2001 From: "vybhav@gmail.com" Date: Tue, 23 Dec 2025 03:38:31 +0530 Subject: [PATCH 7/9] add pos command example --- examples/go2_pos_command/README.md | 38 ++++ examples/go2_pos_command/environment.py | 237 ++++++++++++++++++++++++ examples/go2_pos_command/eval.py | 89 +++++++++ examples/go2_pos_command/train.py | 134 ++++++++++++++ 4 files changed, 498 insertions(+) create mode 100644 examples/go2_pos_command/README.md create mode 100644 examples/go2_pos_command/environment.py create mode 100644 examples/go2_pos_command/eval.py create mode 100644 examples/go2_pos_command/train.py diff --git a/examples/go2_pos_command/README.md b/examples/go2_pos_command/README.md new file mode 100644 index 0000000..347f799 --- /dev/null +++ b/examples/go2_pos_command/README.md @@ -0,0 +1,38 @@ +# Go2 Simple Locomotion Example + +A simple program that teaches the Go2 robot to go to a target position in the world frame. + +This example uses the Genesis Forge managed environment setup, which let's the environment be dedicated more to the scene setup +and reward shaping, than logic to handle domain randomization and logging. + +## Training + +This will be trained using the [rsl_rl](https://github.com/leggedrobotics/rsl_rl) training library. So first, we need to install that and tensorboard: + +```bash +pip install tensorboard rsl-rl-lib>=2.2.4 +``` + +Now you can run the training with: + +```bash +python ./train.py +``` + + +You can view the training progress with: + +```bash +tensorboard --logdir ./logs/ +``` + +The Genesis Forge training environment will also save videos while training that can be viewed in `./logs/go2-pos-command/videos`. + + +## Evaluation + +Now you can view the trained policy: + +```bash +python ./eval.py ./logs/go2-walking/ +``` diff --git a/examples/go2_pos_command/environment.py b/examples/go2_pos_command/environment.py new file mode 100644 index 0000000..9575ed2 --- /dev/null +++ b/examples/go2_pos_command/environment.py @@ -0,0 +1,237 @@ +""" +Simplified Go2 Locomotion Environment using managers to handle everything. +""" + +import torch +import genesis as gs + +from genesis_forge import ManagedEnvironment +from genesis_forge.managers import ( + RewardManager, + TerminationManager, + EntityManager, + ObservationManager, + ActuatorManager, + PositionActionManager, + PositionCommandManager +) +from genesis_forge.mdp import reset, rewards, terminations + + +INITIAL_BODY_POSITION = [0.0, 0.0, 0.4] +INITIAL_QUAT = [1.0, 0.0, 0.0, 0.0] +TARGET_X_VELOCITY = 0.5 + + +class Go2PosEnv(ManagedEnvironment): + """ + Example training environment for the Go2 robot. + """ + + def __init__( + self, + num_envs: int = 1, + dt: float = 1 / 50, # control frequency on real robot is 50hz + max_episode_length_s: int | None = 20, + headless: bool = True, + ): + super().__init__( + num_envs=num_envs, + dt=dt, + max_episode_length_sec=max_episode_length_s, + max_episode_random_scaling=0.1, + ) + + # Construct the scene + self.scene = gs.Scene( + show_viewer=not headless, + sim_options=gs.options.SimOptions(dt=self.dt, substeps=2), + viewer_options=gs.options.ViewerOptions( + max_FPS=int(0.5 / self.dt), + camera_pos=(2.0, 0.0, 2.5), + camera_lookat=(0.0, 0.0, 0.5), + camera_fov=40, + ), + vis_options=gs.options.VisOptions(rendered_envs_idx=list(range(1))), + rigid_options=gs.options.RigidOptions( + dt=self.dt, + constraint_solver=gs.constraint_solver.Newton, + enable_collision=True, + enable_joint_limit=True, + # for this locomotion policy there are usually no more than 30 collision pairs + # set a low value can save memory + max_collision_pairs=30, + ), + ) + + # Create terrain + self.terrain = self.scene.add_entity(gs.morphs.Plane()) + + # Robot + self.robot = self.scene.add_entity( + gs.morphs.URDF( + file="urdf/go2/urdf/go2.urdf", + pos=INITIAL_BODY_POSITION, + quat=INITIAL_QUAT, + ), + ) + + # Camera, for headless video recording + self.camera = self.scene.add_camera( + pos=(-2.5, -1.5, 1.0), + lookat=(0.0, 0.0, 0.0), + res=(1280, 720), + fov=40, + env_idx=0, + debug=True, + ) + + def config(self): + """ + Configure the environment managers + """ + ## + # Robot manager + # i.e. what to do with the robot when it is reset + self.robot_manager = EntityManager( + self, + entity_attr="robot", + on_reset={ + # Reset the robot's initial position + "position": { + "fn": reset.position, + "params": { + "position": INITIAL_BODY_POSITION, + "quat": INITIAL_QUAT, + "zero_velocity": True, + }, + }, + }, + ) + self.position_command_manager = PositionCommandManager( + self, + debug_visualizer=True, + range = { + "x": (-10.0, 10.0), + "y": (-10.0, 10.0), + "z": (0.29, 0.31), + } + ) + ## + # Joint Actions + self.actuator_manager = ActuatorManager( + self, + joint_names=[ + "FL_.*_joint", + "FR_.*_joint", + "RL_.*_joint", + "RR_.*_joint", + ], + default_pos={ + ".*_hip_joint": 0.0, + "FL_thigh_joint": 0.8, + "FR_thigh_joint": 0.8, + "RL_thigh_joint": 1.0, + "RR_thigh_joint": 1.0, + ".*_calf_joint": -1.5, + }, + kp=20, + kv=0.5, + ) + self.action_manager = PositionActionManager( + self, + scale=0.25, + clip=(-100.0, 100.0), + use_default_offset=True, + actuator_manager=self.actuator_manager, + ) + + ## + # Rewards + RewardManager( + self, + logging_enabled=True, + cfg={ + "tracking_lin_vel": { + "weight": 1.0, + "fn": rewards.command_tracking_position, + "params": { + "position_cmd_manager": self.position_command_manager, + "entity_manager": self.robot_manager, + }, + }, + "lin_vel_z": { + "weight": -1.0, + "fn": rewards.lin_vel_z_l2, + "params": { + "entity_manager": self.robot_manager, + }, + }, + "action_rate": { + "weight": -0.005, + "fn": rewards.action_rate_l2, + }, + "similar_to_default": { + "weight": -0.1, + "fn": rewards.dof_similar_to_default, + "params": { + "action_manager": self.action_manager, + }, + }, + }, + ) + + ## + # Termination conditions + self.termination_manager = TerminationManager( + self, + logging_enabled=True, + term_cfg={ + # The episode ended + "timeout": { + "fn": terminations.timeout, + "time_out": True, + }, + # Terminate if the robot's pitch and yaw angles are too large + "fall_over": { + "fn": terminations.bad_orientation, + "params": { + "limit_angle": 10.0, + "entity_manager": self.robot_manager, + }, + }, + }, + ) + + ## + # Observations + ObservationManager( + self, + cfg={ + "angle_velocity": { + "fn": lambda env: self.robot_manager.get_angular_velocity(), + "scale": 0.25, + }, + "linear_velocity": { + "fn": lambda env: self.robot_manager.get_linear_velocity(), + "scale": 2.0, + }, + "projected_gravity": { + "fn": lambda env: self.robot_manager.get_projected_gravity(), + }, + "dof_position": { + "fn": lambda env: self.action_manager.get_dofs_position(), + }, + "dof_velocity": { + "fn": lambda env: self.action_manager.get_dofs_velocity(), + "scale": 0.05, + }, + "actions": { + "fn": lambda env: self.action_manager.get_actions(), + }, + }, + ) + + def build(self): + super().build() + self.camera.follow_entity(self.robot) diff --git a/examples/go2_pos_command/eval.py b/examples/go2_pos_command/eval.py new file mode 100644 index 0000000..28ad7bc --- /dev/null +++ b/examples/go2_pos_command/eval.py @@ -0,0 +1,89 @@ +import os +import glob +import torch +import pickle +import argparse +from importlib import metadata +import genesis as gs + +from genesis_forge.wrappers import RslRlWrapper +from environment import Go2PosEnv + +try: + try: + if metadata.version("rsl-rl"): + raise ImportError + except metadata.PackageNotFoundError: + if metadata.version("rsl-rl-lib").startswith("1."): + raise ImportError +except (metadata.PackageNotFoundError, ImportError) as e: + raise ImportError("Please install install 'rsl-rl-lib>=2.2.4'.") from e +from rsl_rl.runners import OnPolicyRunner + +EXPERIMENT_NAME = "go2-pos-command" + +parser = argparse.ArgumentParser(add_help=True) +parser.add_argument("-d", "--device", type=str, default="gpu") +parser.add_argument("-e", "--exp_name", type=str, default=EXPERIMENT_NAME) +args = parser.parse_args() + + +def get_latest_model(log_dir: str) -> str: + """ + Get the last model from the log directory + """ + model_checkpoints = glob.glob(os.path.join(log_dir, "model_*.pt")) + if len(model_checkpoints) == 0: + print( + f"Warning: No model files found at '{log_dir}' (you might need to train more)." + ) + exit(1) + # Sort by the file with the highest number + sorted_models = sorted( + model_checkpoints, + key=lambda x: int(os.path.basename(x).split("_")[1].split(".")[0]), + ) + return sorted_models[-1] + + +def main(): + # Processor backend (GPU or CPU) + backend = gs.gpu + if args.device == "cpu": + backend = gs.cpu + torch.set_default_device("cpu") + gs.init(logging_level="warning", backend=backend) + + # Load training configuration + log_path = f"./logs/{args.exp_name}" + [cfg] = pickle.load(open(f"{log_path}/cfgs.pkl", "rb")) + model = get_latest_model(log_path) + + # Setup environment + env = Go2PosEnv(num_envs=1, headless=False) + env = RslRlWrapper(env) + env.build() + + # Eval + print("🎬 Loading last model...") + runner = OnPolicyRunner(env, cfg, log_path, device=gs.device) + runner.load(model) + policy = runner.get_inference_policy(device=gs.device) + + try: + obs, _ = env.reset() + with torch.no_grad(): + while True: + actions = policy(obs) + obs, _rews, _dones, _infos = env.step(actions) + except KeyboardInterrupt: + pass + except gs.GenesisException as e: + if e.message != "Viewer closed.": + raise e + except Exception as e: + raise e + + +if __name__ == "__main__": + main() diff --git a/examples/go2_pos_command/train.py b/examples/go2_pos_command/train.py new file mode 100644 index 0000000..48064b5 --- /dev/null +++ b/examples/go2_pos_command/train.py @@ -0,0 +1,134 @@ +import os +import copy +import torch +import shutil +import pickle +import argparse +from importlib import metadata +import genesis as gs + +from genesis_forge.wrappers import ( + VideoWrapper, + RslRlWrapper, +) +from environment import Go2PosEnv + +try: + try: + if metadata.version("rsl-rl"): + raise ImportError + except metadata.PackageNotFoundError: + if metadata.version("rsl-rl-lib").startswith("1."): + raise ImportError +except (metadata.PackageNotFoundError, ImportError) as e: + raise ImportError("Please install install 'rsl-rl-lib>=2.2.4'.") from e +from rsl_rl.runners import OnPolicyRunner + +EXPERIMENT_NAME = "go2-pos-command" + +parser = argparse.ArgumentParser(add_help=True) +parser.add_argument("-n", "--num_envs", type=int, default=4096) +parser.add_argument("--max_iterations", type=int, default=101) +parser.add_argument("-d", "--device", type=str, default="gpu") +parser.add_argument("-e", "--exp_name", type=str, default=EXPERIMENT_NAME) +args = parser.parse_args() + + +def training_cfg(exp_name: str, max_iterations: int): + return { + "algorithm": { + "class_name": "PPO", + "clip_param": 0.2, + "desired_kl": 0.01, + "entropy_coef": 0.01, + "gamma": 0.99, + "lam": 0.95, + "learning_rate": 0.001, + "max_grad_norm": 1.0, + "num_learning_epochs": 5, + "num_mini_batches": 4, + "schedule": "adaptive", + "use_clipped_value_loss": True, + "value_loss_coef": 1.0, + }, + "init_member_classes": {}, + "policy": { + "activation": "elu", + "actor_hidden_dims": [512, 256, 128], + "critic_hidden_dims": [512, 256, 128], + "init_noise_std": 1.0, + "class_name": "ActorCritic", + }, + "runner": { + "checkpoint": -1, + "experiment_name": exp_name, + "load_run": -1, + "log_interval": 1, + "max_iterations": max_iterations, + "record_interval": -1, + "resume": False, + "resume_path": None, + "run_name": "", + }, + "runner_class_name": "OnPolicyRunner", + "seed": 1, + "num_steps_per_env": 24, + "save_interval": 100, + "empirical_normalization": None, + "obs_groups": {"policy": ["policy"], "critic": ["policy"]}, + } + + +def main(): + # Initialize Genesis + # Processor backend (GPU or CPU) + backend = gs.gpu + if args.device == "cpu": + backend = gs.cpu + torch.set_default_device("cpu") + gs.init(logging_level="warning", backend=backend) + + # Logging directory + log_base_dir = "./logs" + experiment_name = args.exp_name + log_path = os.path.join(log_base_dir, experiment_name) + if os.path.exists(log_path): + shutil.rmtree(log_path) + os.makedirs(log_path, exist_ok=True) + print(f"Logging to: {log_path}") + + # Load training configuration and save snapshot of training configs + cfg = training_cfg(experiment_name, args.max_iterations) + pickle.dump( + [cfg], + open(os.path.join(log_path, "cfgs.pkl"), "wb"), + ) + + # Create environment + env = Go2PosEnv(num_envs=args.num_envs, headless=True) + + # Record videos in regular intervals + env = VideoWrapper( + env, + video_length_sec=12, + out_dir=os.path.join(log_path, "videos"), + episode_trigger=lambda episode_id: episode_id % 5 == 0, + ) + + # Build the environment + env = RslRlWrapper(env) + env.build() + env.reset() + + # Train + print("💪 Training model...") + runner = OnPolicyRunner(env, copy.deepcopy(cfg), log_path, device=gs.device) + runner.git_status_repos = ["."] + runner.learn( + num_learning_iterations=args.max_iterations, init_at_random_ep_len=False + ) + env.close() + + +if __name__ == "__main__": + main() From b88103a9a54d58594869c097fc42611253607f51 Mon Sep 17 00:00:00 2001 From: "vybhav@gmail.com" Date: Mon, 29 Dec 2025 13:03:47 +0530 Subject: [PATCH 8/9] modify example name, modify comments, modify reward functions --- examples/position_command/README.md | 38 +++ examples/position_command/environment.py | 244 ++++++++++++++++++ examples/position_command/eval.py | 89 +++++++ examples/position_command/train.py | 134 ++++++++++ .../managers/command/pose_command.py | 4 +- .../managers/command/position_command.py | 6 +- genesis_forge/mdp/rewards.py | 8 +- 7 files changed, 514 insertions(+), 9 deletions(-) create mode 100644 examples/position_command/README.md create mode 100644 examples/position_command/environment.py create mode 100644 examples/position_command/eval.py create mode 100644 examples/position_command/train.py diff --git a/examples/position_command/README.md b/examples/position_command/README.md new file mode 100644 index 0000000..347f799 --- /dev/null +++ b/examples/position_command/README.md @@ -0,0 +1,38 @@ +# Go2 Simple Locomotion Example + +A simple program that teaches the Go2 robot to go to a target position in the world frame. + +This example uses the Genesis Forge managed environment setup, which let's the environment be dedicated more to the scene setup +and reward shaping, than logic to handle domain randomization and logging. + +## Training + +This will be trained using the [rsl_rl](https://github.com/leggedrobotics/rsl_rl) training library. So first, we need to install that and tensorboard: + +```bash +pip install tensorboard rsl-rl-lib>=2.2.4 +``` + +Now you can run the training with: + +```bash +python ./train.py +``` + + +You can view the training progress with: + +```bash +tensorboard --logdir ./logs/ +``` + +The Genesis Forge training environment will also save videos while training that can be viewed in `./logs/go2-pos-command/videos`. + + +## Evaluation + +Now you can view the trained policy: + +```bash +python ./eval.py ./logs/go2-walking/ +``` diff --git a/examples/position_command/environment.py b/examples/position_command/environment.py new file mode 100644 index 0000000..3e8af7b --- /dev/null +++ b/examples/position_command/environment.py @@ -0,0 +1,244 @@ +""" +Simplified Go2 Locomotion Environment using managers to handle everything. +""" + +import torch +import genesis as gs + +from genesis_forge import ManagedEnvironment +from genesis_forge.managers import ( + RewardManager, + TerminationManager, + EntityManager, + ObservationManager, + ActuatorManager, + PositionActionManager, + PositionCommandManager +) +from genesis_forge.mdp import reset, rewards, terminations + + +INITIAL_BODY_POSITION = [0.0, 0.0, 0.4] +INITIAL_QUAT = [1.0, 0.0, 0.0, 0.0] +TARGET_X_VELOCITY = 0.5 + + +class Go2PosEnv(ManagedEnvironment): + """ + Example training environment for the Go2 robot. + """ + + def __init__( + self, + num_envs: int = 1, + dt: float = 1 / 50, # control frequency on real robot is 50hz + max_episode_length_s: int | None = 200, + headless: bool = True, + ): + super().__init__( + num_envs=num_envs, + dt=dt, + max_episode_length_sec=max_episode_length_s, + max_episode_random_scaling=0.1, + ) + + # Construct the scene + self.scene = gs.Scene( + show_viewer=not headless, + sim_options=gs.options.SimOptions(dt=self.dt, substeps=2), + viewer_options=gs.options.ViewerOptions( + max_FPS=int(0.5 / self.dt), + camera_pos=(2.0, 0.0, 2.5), + camera_lookat=(0.0, 0.0, 0.5), + camera_fov=40, + ), + vis_options=gs.options.VisOptions(rendered_envs_idx=list(range(1))), + rigid_options=gs.options.RigidOptions( + dt=self.dt, + constraint_solver=gs.constraint_solver.Newton, + enable_collision=True, + enable_joint_limit=True, + # for this locomotion policy there are usually no more than 30 collision pairs + # set a low value can save memory + max_collision_pairs=30, + ), + ) + + # Create terrain + self.terrain = self.scene.add_entity(gs.morphs.Plane()) + + # Robot + self.robot = self.scene.add_entity( + gs.morphs.URDF( + file="urdf/go2/urdf/go2.urdf", + pos=INITIAL_BODY_POSITION, + quat=INITIAL_QUAT, + ), + ) + + # Camera, for headless video recording + self.camera = self.scene.add_camera( + pos=(-2.5, -1.5, 1.0), + lookat=(0.0, 0.0, 0.0), + res=(1280, 720), + fov=40, + env_idx=0, + debug=True, + ) + + def config(self): + """ + Configure the environment managers + """ + ## + # Robot manager + # i.e. what to do with the robot when it is reset + self.robot_manager = EntityManager( + self, + entity_attr="robot", + on_reset={ + # Reset the robot's initial position + "position": { + "fn": reset.position, + "params": { + "position": INITIAL_BODY_POSITION, + "quat": INITIAL_QUAT, + "zero_velocity": True, + }, + }, + }, + ) + self.position_command_manager = PositionCommandManager( + self, + debug_visualizer=True, + range={ + "x": (-3.0, 3.0), + "y": (-3.0, 3.0), + "z": (0.29, 0.31), + }, + resample_time_sec=5.0, + ) + ## + # Joint Actions + self.actuator_manager = ActuatorManager( + self, + joint_names=[ + "FL_.*_joint", + "FR_.*_joint", + "RL_.*_joint", + "RR_.*_joint", + ], + default_pos={ + ".*_hip_joint": 0.0, + "FL_thigh_joint": 0.8, + "FR_thigh_joint": 0.8, + "RL_thigh_joint": 1.0, + "RR_thigh_joint": 1.0, + ".*_calf_joint": -1.5, + }, + kp=20, + kv=0.5, + ) + self.action_manager = PositionActionManager( + self, + scale=0.25, + clip=(-100.0, 100.0), + use_default_offset=True, + actuator_manager=self.actuator_manager, + ) + + ## + # Rewards + RewardManager( + self, + logging_enabled=True, + cfg={ + "tracking_pos": { + "weight": 5.0, + "fn": rewards.command_tracking_position, + "params": { + "position_cmd_manager": self.position_command_manager, + "entity_manager": self.robot_manager, + "sensitivity": 0.25, + }, + }, + "lin_vel_z": { + "weight": -1.0, + "fn": rewards.lin_vel_z_l2, + "params": { + "entity_manager": self.robot_manager, + }, + }, + "action_rate": { + "weight": -0.005, + "fn": rewards.action_rate_l2, + }, + "similar_to_default": { + "weight": -0.1, + "fn": rewards.dof_similar_to_default, + "params": { + "action_manager": self.action_manager, + }, + }, + }, + ) + + ## + # Termination conditions + self.termination_manager = TerminationManager( + self, + logging_enabled=True, + term_cfg={ + # The episode ended + "timeout": { + "fn": terminations.timeout, + "time_out": True, + }, + # Terminate if the robot's pitch and yaw angles are too large + "fall_over": { + "fn": terminations.bad_orientation, + "params": { + "limit_angle": 25.0, + "entity_manager": self.robot_manager, + }, + }, + }, + ) + + ## + # Observations + ObservationManager( + self, + cfg={ + "command": { + "fn": self.position_command_manager.observation, + "scale": 1.0, + }, + "position": { + "fn": lambda env: self.robot_manager.base_pos, + "scale": 1.0, + }, + "linear_velocity": { + "fn": lambda env: self.robot_manager.get_linear_velocity(), + "scale": 0.5, + }, + "projected_gravity": { + "fn": lambda env: self.robot_manager.get_projected_gravity(), + }, + "dof_position": { + "fn": lambda env: self.action_manager.get_dofs_position(), + }, + "dof_velocity": { + "fn": lambda env: self.action_manager.get_dofs_velocity(), + "scale": 0.05, + }, + "actions": { + "fn": lambda env: self.action_manager.get_actions(), + }, + }, + ) + + def build(self): + super().build() + self.camera.follow_entity(self.robot) + diff --git a/examples/position_command/eval.py b/examples/position_command/eval.py new file mode 100644 index 0000000..28ad7bc --- /dev/null +++ b/examples/position_command/eval.py @@ -0,0 +1,89 @@ +import os +import glob +import torch +import pickle +import argparse +from importlib import metadata +import genesis as gs + +from genesis_forge.wrappers import RslRlWrapper +from environment import Go2PosEnv + +try: + try: + if metadata.version("rsl-rl"): + raise ImportError + except metadata.PackageNotFoundError: + if metadata.version("rsl-rl-lib").startswith("1."): + raise ImportError +except (metadata.PackageNotFoundError, ImportError) as e: + raise ImportError("Please install install 'rsl-rl-lib>=2.2.4'.") from e +from rsl_rl.runners import OnPolicyRunner + +EXPERIMENT_NAME = "go2-pos-command" + +parser = argparse.ArgumentParser(add_help=True) +parser.add_argument("-d", "--device", type=str, default="gpu") +parser.add_argument("-e", "--exp_name", type=str, default=EXPERIMENT_NAME) +args = parser.parse_args() + + +def get_latest_model(log_dir: str) -> str: + """ + Get the last model from the log directory + """ + model_checkpoints = glob.glob(os.path.join(log_dir, "model_*.pt")) + if len(model_checkpoints) == 0: + print( + f"Warning: No model files found at '{log_dir}' (you might need to train more)." + ) + exit(1) + # Sort by the file with the highest number + sorted_models = sorted( + model_checkpoints, + key=lambda x: int(os.path.basename(x).split("_")[1].split(".")[0]), + ) + return sorted_models[-1] + + +def main(): + # Processor backend (GPU or CPU) + backend = gs.gpu + if args.device == "cpu": + backend = gs.cpu + torch.set_default_device("cpu") + gs.init(logging_level="warning", backend=backend) + + # Load training configuration + log_path = f"./logs/{args.exp_name}" + [cfg] = pickle.load(open(f"{log_path}/cfgs.pkl", "rb")) + model = get_latest_model(log_path) + + # Setup environment + env = Go2PosEnv(num_envs=1, headless=False) + env = RslRlWrapper(env) + env.build() + + # Eval + print("🎬 Loading last model...") + runner = OnPolicyRunner(env, cfg, log_path, device=gs.device) + runner.load(model) + policy = runner.get_inference_policy(device=gs.device) + + try: + obs, _ = env.reset() + with torch.no_grad(): + while True: + actions = policy(obs) + obs, _rews, _dones, _infos = env.step(actions) + except KeyboardInterrupt: + pass + except gs.GenesisException as e: + if e.message != "Viewer closed.": + raise e + except Exception as e: + raise e + + +if __name__ == "__main__": + main() diff --git a/examples/position_command/train.py b/examples/position_command/train.py new file mode 100644 index 0000000..6590d1a --- /dev/null +++ b/examples/position_command/train.py @@ -0,0 +1,134 @@ +import os +import copy +import torch +import shutil +import pickle +import argparse +from importlib import metadata +import genesis as gs + +from genesis_forge.wrappers import ( + VideoWrapper, + RslRlWrapper, +) +from environment import Go2PosEnv + +try: + try: + if metadata.version("rsl-rl"): + raise ImportError + except metadata.PackageNotFoundError: + if metadata.version("rsl-rl-lib").startswith("1."): + raise ImportError +except (metadata.PackageNotFoundError, ImportError) as e: + raise ImportError("Please install install 'rsl-rl-lib>=2.2.4'.") from e +from rsl_rl.runners import OnPolicyRunner + +EXPERIMENT_NAME = "go2-pos-command" + +parser = argparse.ArgumentParser(add_help=True) +parser.add_argument("-n", "--num_envs", type=int, default=4096) +parser.add_argument("--max_iterations", type=int, default=400) +parser.add_argument("-d", "--device", type=str, default="gpu") +parser.add_argument("-e", "--exp_name", type=str, default=EXPERIMENT_NAME) +args = parser.parse_args() + + +def training_cfg(exp_name: str, max_iterations: int): + return { + "algorithm": { + "class_name": "PPO", + "clip_param": 0.2, + "desired_kl": 0.01, + "entropy_coef": 0.01, + "gamma": 0.99, + "lam": 0.95, + "learning_rate": 0.001, + "max_grad_norm": 1.0, + "num_learning_epochs": 5, + "num_mini_batches": 4, + "schedule": "adaptive", + "use_clipped_value_loss": True, + "value_loss_coef": 1.0, + }, + "init_member_classes": {}, + "policy": { + "activation": "elu", + "actor_hidden_dims": [512, 256, 128], + "critic_hidden_dims": [512, 256, 128], + "init_noise_std": 1.0, + "class_name": "ActorCritic", + }, + "runner": { + "checkpoint": -1, + "experiment_name": exp_name, + "load_run": -1, + "log_interval": 1, + "max_iterations": max_iterations, + "record_interval": -1, + "resume": False, + "resume_path": None, + "run_name": "", + }, + "runner_class_name": "OnPolicyRunner", + "seed": 1, + "num_steps_per_env": 24, + "save_interval": 100, + "empirical_normalization": None, + "obs_groups": {"policy": ["policy"], "critic": ["policy"]}, + } + + +def main(): + # Initialize Genesis + # Processor backend (GPU or CPU) + backend = gs.gpu + if args.device == "cpu": + backend = gs.cpu + torch.set_default_device("cpu") + gs.init(logging_level="warning", backend=backend) + + # Logging directory + log_base_dir = "./logs" + experiment_name = args.exp_name + log_path = os.path.join(log_base_dir, experiment_name) + if os.path.exists(log_path): + shutil.rmtree(log_path) + os.makedirs(log_path, exist_ok=True) + print(f"Logging to: {log_path}") + + # Load training configuration and save snapshot of training configs + cfg = training_cfg(experiment_name, args.max_iterations) + pickle.dump( + [cfg], + open(os.path.join(log_path, "cfgs.pkl"), "wb"), + ) + + # Create environment + env = Go2PosEnv(num_envs=args.num_envs, headless=True) + + # Record videos in regular intervals + env = VideoWrapper( + env, + video_length_sec=12, + out_dir=os.path.join(log_path, "videos"), + episode_trigger=lambda episode_id: episode_id % 5 == 0, + ) + + # Build the environment + env = RslRlWrapper(env) + env.build() + env.reset() + + # Train + print("💪 Training model...") + runner = OnPolicyRunner(env, copy.deepcopy(cfg), log_path, device=gs.device) + runner.git_status_repos = ["."] + runner.learn( + num_learning_iterations=args.max_iterations, init_at_random_ep_len=False + ) + env.close() + + +if __name__ == "__main__": + main() diff --git a/genesis_forge/managers/command/pose_command.py b/genesis_forge/managers/command/pose_command.py index bc3ea02..2cd0e9a 100644 --- a/genesis_forge/managers/command/pose_command.py +++ b/genesis_forge/managers/command/pose_command.py @@ -49,7 +49,7 @@ class PoseCommandManager(CommandManager): Generates a pose command from uniform distribution. The command comprises of a (x,y,z) position and (x,y,z) euler angles - IMPORTANT: The position commands are interpreted as world-relative coordinates: + IMPORTANT: The pose commands are interpreted as world-relative coordinates: - pos-X: x coordinate of the target position - pos-Y: y coordinate of the target position - pos-Z: z coordinate of the target position @@ -76,7 +76,7 @@ class PoseCommandManager(CommandManager): class MyEnv(GenesisEnv): def config(self): - # Create a velocity command manager + # Create a pose command manager self.pose_command_manager = PoseCommandManager( self, debug_visualizer=True, diff --git a/genesis_forge/managers/command/position_command.py b/genesis_forge/managers/command/position_command.py index 8baba1d..c2bb900 100644 --- a/genesis_forge/managers/command/position_command.py +++ b/genesis_forge/managers/command/position_command.py @@ -34,7 +34,7 @@ class PositionDebugVisualizerConfig(TypedDict): DEFAULT_VISUALIZER_CONFIG: PositionDebugVisualizerConfig = { "envs_idx": None, - "sphere_offset": 0.03, + "sphere_offset": 0.1, "sphere_radius": 0.02, "commanded_color": (0.0, 0.5, 0.0, 1.0), } @@ -62,14 +62,14 @@ class PositionCommandManager(CommandManager): env: The environment to control range: The ranges of linear & angular velocities resample_time_sec: The time interval between changing the command - debug_visualizer: Enable the debug arrow visualization + debug_visualizer: Enable the debug sphere visualization debug_visualizer_cfg: The configuration for the debug visualizer Example:: class MyEnv(GenesisEnv): def config(self): - # Create a velocity command manager + # Create a position command manager self.position_command_manager = PositionCommandManager( self, debug_visualizer=True, diff --git a/genesis_forge/mdp/rewards.py b/genesis_forge/mdp/rewards.py index 5b45f75..83e3307 100644 --- a/genesis_forge/mdp/rewards.py +++ b/genesis_forge/mdp/rewards.py @@ -336,8 +336,8 @@ def command_tracking_position( if position_cmd_manager is not None: command = position_cmd_manager.command - base_pos_error = torch.sum(torch.square(command - pos), dim=1) - return torch.square(-base_pos_error / sensitivity) + pos_error = torch.sum(torch.square(command - pos), dim=1) + return torch.exp(-pos_error / sensitivity) """ @@ -409,8 +409,8 @@ def command_tracking_pose( dim=1, ) - pos_penalty = torch.square(-pos_error / pos_sensitivity) - euler_penalty = torch.square(-euler_error / euler_sensitivity) + pos_penalty = torch.exp(-pos_error / pos_sensitivity) + euler_penalty = torch.exp(-euler_error / euler_sensitivity) total_penalty = pos_penalty + euler_penalty From b034db8f0f4b88681075ba55d633ab847ebe3db9 Mon Sep 17 00:00:00 2001 From: "vybhav@gmail.com" Date: Mon, 29 Dec 2025 16:51:12 +0530 Subject: [PATCH 9/9] add all spheres together in position_command_manager --- .../managers/command/pose_command.py | 2 +- .../managers/command/position_command.py | 31 +++++++++---------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/genesis_forge/managers/command/pose_command.py b/genesis_forge/managers/command/pose_command.py index 2cd0e9a..044453a 100644 --- a/genesis_forge/managers/command/pose_command.py +++ b/genesis_forge/managers/command/pose_command.py @@ -121,7 +121,7 @@ def __init__( debug_visualizer_cfg: PoseDebugVisualizerConfig = DEFAULT_VISUALIZER_CONFIG, ): super().__init__(env, range=range, resample_time_sec=resample_time_sec) - self._sphere_nodes: list = [] + self._arrow_nodes: list = [] self.debug_visualizer = debug_visualizer self.visualizer_cfg = {**DEFAULT_VISUALIZER_CONFIG, **debug_visualizer_cfg} self.debug_envs_idx = None diff --git a/genesis_forge/managers/command/position_command.py b/genesis_forge/managers/command/position_command.py index c2bb900..fd9242e 100644 --- a/genesis_forge/managers/command/position_command.py +++ b/genesis_forge/managers/command/position_command.py @@ -114,7 +114,7 @@ def __init__( debug_visualizer_cfg: PositionDebugVisualizerConfig = DEFAULT_VISUALIZER_CONFIG, ): super().__init__(env, range=range, resample_time_sec=resample_time_sec) - self._sphere_nodes: list = [] + self._sphere_nodes = None self.debug_visualizer = debug_visualizer self.visualizer_cfg = {**DEFAULT_VISUALIZER_CONFIG, **debug_visualizer_cfg} self.debug_envs_idx = None @@ -186,29 +186,28 @@ def _render_sphere(self): return # Remove existing spheres - for sphere in self._sphere_nodes: - self.env.scene.clear_debug_object(sphere) - self._sphere_nodes = [] - - for i in self.debug_envs_idx: - # Target sphere in the world frame for visualization) - self._draw_sphere( - pos=self.command[i], - color=self.visualizer_cfg["commanded_color"], - ) + if self._sphere_nodes is not None: + self.env.scene.clear_debug_object(self._sphere_nodes) + self._sphere_nodes = None + + # Target sphere in the world frame for visualization) + self._draw_spheres( + pos=self.command[self.debug_envs_idx], + color=self.visualizer_cfg["commanded_color"], + ) - def _draw_sphere( + def _draw_spheres( self, pos: torch.Tensor, color: list[float], ): try: - node = self.env.scene.draw_debug_sphere( - pos=pos.cpu().numpy(), + nodes = self.env.scene.draw_debug_spheres( + poss=pos.cpu().numpy(), color=color, radius=self.visualizer_cfg["sphere_radius"], ) - if node: - self._sphere_nodes.append(node) + if nodes: + self._sphere_nodes=nodes except Exception as e: print(f"Error adding debug visualizing in PositionCommandManager: {e}")