diff --git a/requirements.txt b/requirements.txt index 580be294..88d651fc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,4 +17,5 @@ python-can ultralytics onnx onnxruntime -odrive \ No newline at end of file +odrive +smbus2 \ No newline at end of file diff --git a/src/HW-Devices/servo_pkg/servo_pkg/pdb_controller.py b/src/HW-Devices/servo_pkg/servo_pkg/pdb_controller.py new file mode 100644 index 00000000..8dc434e1 --- /dev/null +++ b/src/HW-Devices/servo_pkg/servo_pkg/pdb_controller.py @@ -0,0 +1,97 @@ +import rclpy +from std_msgs.msg import Float32 +from smbus2 import SMBus + +from servo_pkg.parent_config import Parent_Config + +I2C_DEFAULT_ADDR = 0x21 + +HW_NUM_SERVOS = 4 + +REG_SERVO1_HIGH = 0x00 + + +def convert_from_radians(angle, servo_info): + total_range = servo_info.max - servo_info.min + return int(servo_info.min + (total_range * angle / servo_info.rom)) + + +def convert_to_radians(value, servo_info): + total_range = servo_info.max - servo_info.min + return servo_info.rom * (value - servo_info.min) / total_range + + +class PDB_Servo(Parent_Config): + def __init__(self): + super().__init__("pdb_servo") + + # I2C parameters + self.declare_parameter("i2c_bus", 1) # /dev/i2c-1 on most SBCs + self.declare_parameter("i2c_address", I2C_DEFAULT_ADDR) + + self.i2c_bus = self.get_parameter("i2c_bus").get_parameter_value().integer_value + self.i2c_addr = ( + self.get_parameter("i2c_address").get_parameter_value().integer_value + ) + + self.bus = SMBus(self.i2c_bus) + + self.subs = {} + for port in range(HW_NUM_SERVOS): + self.subs[port] = self.create_subscription( + Float32, + f"/{self.servo_info[port].motor_name}", + lambda msg, port=port: self.set_position(msg, port), + 3, + ) + self.get_logger().info(f"Topic: /{self.servo_info[port].motor_name}") + self._last_pwm_us = {p: None for p in range(HW_NUM_SERVOS)} + + def _write_servo_us(self, port: int, pwm_us: int): + base_reg = REG_SERVO1_HIGH + (2 * port) + high = (pwm_us >> 8) & 0xFF + low = pwm_us & 0xFF + self.bus.write_i2c_block_data(self.i2c_addr, base_reg, [high, low]) + + def set_position(self, msg, port): + self.check_valid_servo(port) + servo_info = self.servo_info[port] + + target_value_us = convert_from_radians(msg.data, servo_info) + current_position = ( + None + if self._last_pwm_us[port] is None + else convert_to_radians(self._last_pwm_us[port], servo_info) + ) + + if not (servo_info.min <= target_value_us <= servo_info.max): + self.get_logger().warning( + f"Servo {port} input out of range. " f"Last angle: {current_position}" + ) + return + + self._write_servo_us(port, target_value_us) + self._last_pwm_us[port] = target_value_us + + new_angle = convert_to_radians(target_value_us, servo_info) + self.get_logger().info( + f"Servo {port} -> angle: {new_angle} rad, PWM: {target_value_us} us" + ) + + def destroy_node(self): + try: + self.bus.close() + finally: + super().destroy_node() + + +def main(args=None): + rclpy.init(args=args) + node = PDB_Servo() + rclpy.spin(node) + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/src/HW-Devices/servo_pkg/setup.py b/src/HW-Devices/servo_pkg/setup.py index 7613d749..afba0e4a 100644 --- a/src/HW-Devices/servo_pkg/setup.py +++ b/src/HW-Devices/servo_pkg/setup.py @@ -32,6 +32,7 @@ "servo_client = servo_pkg.servo_client:main", "i2c_Servo = servo_pkg.i2c_Servo:main", "pi_Servo = servo_pkg.pi_Servo:main", + "pdb_controller = servo_pkg.pdb_controller:main", ], }, ) diff --git a/src/Nav/aruco_localizer/scripts/convert_mrcal_to_opencv_ros.py b/src/Nav/aruco_localizer/scripts/convert_mrcal_to_opencv_ros.py index cb51b660..8ab97fac 100644 --- a/src/Nav/aruco_localizer/scripts/convert_mrcal_to_opencv_ros.py +++ b/src/Nav/aruco_localizer/scripts/convert_mrcal_to_opencv_ros.py @@ -145,9 +145,11 @@ def main(): "distortion_coefficients": { "rows": 1, "cols": len(ros_dist_coeffs), - "data": ros_dist_coeffs.tolist() - if isinstance(ros_dist_coeffs, np.ndarray) - else list(ros_dist_coeffs), + "data": ( + ros_dist_coeffs.tolist() + if isinstance(ros_dist_coeffs, np.ndarray) + else list(ros_dist_coeffs) + ), }, "rectification_matrix": {"rows": 3, "cols": 3, "data": R.flatten().tolist()}, "projection_matrix": {"rows": 3, "cols": 4, "data": P.flatten().tolist()},