Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ python-can
ultralytics
onnx
onnxruntime
odrive
odrive
smbus2
97 changes: 97 additions & 0 deletions src/HW-Devices/servo_pkg/servo_pkg/pdb_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import rclpy
from std_msgs.msg import Float32
from smbus2 import SMBus

from servo_pkg.parent_config import Parent_Config

I2C_DEFAULT_ADDR = 0x21

HW_NUM_SERVOS = 4

REG_SERVO1_HIGH = 0x00


def convert_from_radians(angle, servo_info):
total_range = servo_info.max - servo_info.min
return int(servo_info.min + (total_range * angle / servo_info.rom))


def convert_to_radians(value, servo_info):
total_range = servo_info.max - servo_info.min
return servo_info.rom * (value - servo_info.min) / total_range

Comment on lines +14 to +22
Copy link

Copilot AI Feb 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The radians<->PWM conversion helpers are duplicated from servo_pkg/USB_Servo.py (same formulas). Consider moving these conversions into a shared utility module (e.g., servo_pkg/conversions.py) and importing from both controllers to avoid future drift if the mapping ever changes.

Copilot uses AI. Check for mistakes.

class PDB_Servo(Parent_Config):
def __init__(self):
super().__init__("pdb_servo")

# I2C parameters
self.declare_parameter("i2c_bus", 1) # /dev/i2c-1 on most SBCs
self.declare_parameter("i2c_address", I2C_DEFAULT_ADDR)

self.i2c_bus = self.get_parameter("i2c_bus").get_parameter_value().integer_value
self.i2c_addr = (
self.get_parameter("i2c_address").get_parameter_value().integer_value
)

self.bus = SMBus(self.i2c_bus)

self.subs = {}
for port in range(HW_NUM_SERVOS):
self.subs[port] = self.create_subscription(
Float32,
f"/{self.servo_info[port].motor_name}",
lambda msg, port=port: self.set_position(msg, port),
3,
)
self.get_logger().info(f"Topic: /{self.servo_info[port].motor_name}")
self._last_pwm_us = {p: None for p in range(HW_NUM_SERVOS)}

def _write_servo_us(self, port: int, pwm_us: int):
base_reg = REG_SERVO1_HIGH + (2 * port)
high = (pwm_us >> 8) & 0xFF
low = pwm_us & 0xFF
self.bus.write_i2c_block_data(self.i2c_addr, base_reg, [high, low])

def set_position(self, msg, port):
self.check_valid_servo(port)
servo_info = self.servo_info[port]

target_value_us = convert_from_radians(msg.data, servo_info)
current_position = (
None
if self._last_pwm_us[port] is None
else convert_to_radians(self._last_pwm_us[port], servo_info)
)

if not (servo_info.min <= target_value_us <= servo_info.max):
self.get_logger().warning(
f"Servo {port} input out of range. " f"Last angle: {current_position}"
Copy link

Copilot AI Feb 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The out-of-range warning logs Last angle: {current_position} rather than the requested angle (msg.data) that was actually out of range. This makes the warning misleading when debugging; include the requested angle (and optionally the valid range) in the message.

Suggested change
f"Servo {port} input out of range. " f"Last angle: {current_position}"
f"Servo {port} input out of range. "
f"Requested angle: {msg.data} rad, "
f"valid PWM range: [{servo_info.min}, {servo_info.max}] us, "
f"last angle: {current_position}"

Copilot uses AI. Check for mistakes.
)
return

self._write_servo_us(port, target_value_us)
self._last_pwm_us[port] = target_value_us

new_angle = convert_to_radians(target_value_us, servo_info)
self.get_logger().info(
f"Servo {port} -> angle: {new_angle} rad, PWM: {target_value_us} us"
)

def destroy_node(self):
try:
self.bus.close()
finally:
super().destroy_node()


def main(args=None):
rclpy.init(args=args)
node = PDB_Servo()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
Comment on lines +91 to +93
Copy link

Copilot AI Feb 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

main() spins the node without a try/finally, so a KeyboardInterrupt or exception can skip destroy_node() and leave the I2C bus handle open. Wrap rclpy.spin(node) in try: ... finally: to always close the SMBus and call rclpy.shutdown().

Suggested change
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()

Copilot uses AI. Check for mistakes.


if __name__ == "__main__":
main()
1 change: 1 addition & 0 deletions src/HW-Devices/servo_pkg/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"servo_client = servo_pkg.servo_client:main",
"i2c_Servo = servo_pkg.i2c_Servo:main",
"pi_Servo = servo_pkg.pi_Servo:main",
"pdb_controller = servo_pkg.pdb_controller:main",
],
},
)
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,11 @@ def main():
"distortion_coefficients": {
"rows": 1,
"cols": len(ros_dist_coeffs),
"data": ros_dist_coeffs.tolist()
if isinstance(ros_dist_coeffs, np.ndarray)
else list(ros_dist_coeffs),
"data": (
ros_dist_coeffs.tolist()
if isinstance(ros_dist_coeffs, np.ndarray)
else list(ros_dist_coeffs)
),
},
"rectification_matrix": {"rows": 3, "cols": 3, "data": R.flatten().tolist()},
"projection_matrix": {"rows": 3, "cols": 4, "data": P.flatten().tolist()},
Expand Down
Loading