Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions mini_bdx_runtime/mini_bdx_runtime/xbox_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from threading import Thread
from queue import Queue
import time
import os
import numpy as np
from mini_bdx_runtime.buttons import Buttons

Expand Down Expand Up @@ -40,13 +41,31 @@ def __init__(self, command_freq, only_head_control=False):
self.RB_pressed = False

self.buttons = Buttons()
self.is_connected = True

Thread(target=self.commands_worker, daemon=True).start()

def commands_worker(self):
while True:
self.cmd_queue.put(self.get_commands())
time.sleep(1 / self.command_freq)
# Find the joystick device path (Linux-specific)
self.joystick_device_path = None
for i in range(10):
path = f"/dev/input/js{i}"
if os.path.exists(path):
self.joystick_device_path = path
break

try:
while self.is_connected:
# Check if joystick device still exists (Linux)
if self.joystick_device_path and not os.path.exists(self.joystick_device_path):
print(f"Controller disconnected ({self.joystick_device_path} no longer exists)")
self.is_connected = False
break
self.cmd_queue.put(self.get_commands())
time.sleep(1 / self.command_freq)
except Exception as e:
print(f"Controller disconnected: {e}")
self.is_connected = False

def get_commands(self):
last_commands = self.last_commands
Expand Down Expand Up @@ -118,6 +137,23 @@ def get_commands(self):
last_commands[6] = head_roll

for event in pygame.event.get():
# Check for controller disconnection
if event.type == pygame.JOYDEVICEREMOVED:
print("Controller removed event detected")
self.is_connected = False
return (
np.around(last_commands, 3),
self.A_pressed,
self.B_pressed,
self.X_pressed,
self.Y_pressed,
self.LB_pressed,
self.RB_pressed,
left_trigger,
right_trigger,
0,
)

if event.type == pygame.JOYBUTTONDOWN:

if self.p1.get_button(0): # A button
Expand Down
67 changes: 63 additions & 4 deletions scripts/v2_rl_walk_mujoco.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pickle

import numpy as np
import pygame
from mini_bdx_runtime.rustypot_position_hwi import HWI
from mini_bdx_runtime.onnx_infer import OnnxInfer

Expand Down Expand Up @@ -92,11 +93,15 @@ def __init__(

self.last_commands = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

self.paused = self.duck_config.start_paused
self.user_paused = self.duck_config.start_paused # User's desired pause state (config or A button)
self.paused = self.user_paused # Actual pause state (can be forced by missing controller)

self.command_freq = 20 # hz
self.xbox_controller = None
self.last_controller_check = 0
self.controller_check_interval = 1.0 # Check for controller every second when disconnected
if self.commands:
self.xbox_controller = XBoxController(self.command_freq)
self._try_connect_controller()

# Reference motion, but we only really need the length of one phase
# TODO
Expand All @@ -120,6 +125,48 @@ def __init__(
if self.duck_config.antennas:
self.antennas = Antennas()

def _try_connect_controller(self, silent=False):
"""Try to connect to the Xbox controller. Returns True if successful."""
try:
# Initialize pygame if not already done
if not pygame.get_init():
pygame.init()

# Reinit joystick subsystem to detect newly connected controllers
pygame.joystick.quit()
pygame.joystick.init()

if pygame.joystick.get_count() == 0:
if not silent:
print("No controller detected. Waiting for controller...")
self.xbox_controller = None
self.paused = True
return False

self.xbox_controller = XBoxController(self.command_freq)
print("Controller connected!")
self.paused = self.user_paused # Restore user's desired pause state
if self.paused:
print("Droid is paused (start_paused=True). Press A to unpause.")
else:
print("Unpausing droid.")
return True
except Exception as e:
if not silent:
print(f"WARNING: Could not initialize Xbox controller: {e}")
print("Pausing droid. Waiting for controller...")
self.xbox_controller = None
self.paused = True
return False

def _handle_controller_disconnect(self):
"""Handle controller disconnection."""
print("Controller disconnected! Pausing droid...")
self.xbox_controller = None
self.user_paused = True # Require manual unpause after reconnection
self.paused = True
self.last_commands = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

def get_obs(self):

imu_data = self.imu.get_data()
Expand Down Expand Up @@ -206,7 +253,18 @@ def run(self):
right_trigger = 0
t = time.time()

if self.commands:
# Check for controller connection (only when paused and no controller)
if self.commands and self.paused and self.xbox_controller is None:
if t - self.last_controller_check >= self.controller_check_interval:
self.last_controller_check = t
self._try_connect_controller(silent=True)

# Check if controller is still connected
if self.commands and self.xbox_controller is not None:
if not self.xbox_controller.is_connected:
self._handle_controller_disconnect()

if self.commands and self.xbox_controller is not None:
self.last_commands, self.buttons, left_trigger, right_trigger = (
self.xbox_controller.get_last_command()
)
Expand Down Expand Up @@ -240,7 +298,8 @@ def run(self):
self.antennas.set_position_right(left_trigger)

if self.buttons.A.triggered:
self.paused = not self.paused
self.user_paused = not self.user_paused
self.paused = self.user_paused
if self.paused:
print("PAUSE")
else:
Expand Down