diff --git a/.gitignore b/.gitignore index 68bc17f..a3b73ff 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ share/python-wheels/ .installed.cfg *.egg MANIFEST +environment.yml # PyInstaller # Usually these files are written by a python script from a template diff --git a/Untitled.ipynb b/Untitled.ipynb new file mode 100644 index 0000000..058bf42 --- /dev/null +++ b/Untitled.ipynb @@ -0,0 +1,211 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "be766ec9-1f94-49fb-a583-06f443bdf9db", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "pybullet build time: May 17 2025 21:04:57\n" + ] + } + ], + "source": [ + "from inverse_geometry import computeqgrasppose" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "68227652-0c9e-4af6-9d0b-87e217e03f0b", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Wrapper tries to connect to server \n", + "You can open the visualizer by visiting the following URL:\n", + "http://127.0.0.1:7001/static/\n" + ] + } + ], + "source": [ + "from tools import setupwithmeshcat\n", + "robot, cube, viz = setupwithmeshcat(url=\"tcp://127.0.0.1:6000\")" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "7e829a6a-1b19-4fcf-8f6e-68e944f5c495", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "tcp://127.0.0.1:6000\n" + ] + } + ], + "source": [ + "from config import MESHCAT_URL\n", + "print(MESHCAT_URL)" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "948cc2a3-c520-442e-be4b-99fa75569f09", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "
\n", + " \n", + "
\n", + " " + ], + "text/plain": [ + "" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "hasattr(viz.viewer, 'jupyter_cell') and viz.viewer.jupyter_cell()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "6683d3cc-1a3a-4ad8-a655-edd273432293", + "metadata": {}, + "outputs": [], + "source": [ + "from config import CUBE_PLACEMENT\n", + "cubetarget = CUBE_PLACEMENT\n", + "qcurrent = robot.q0" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "ae8201e6-2be1-414d-8f39-7e72e0c695a2", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "(array([-3.01785725e-01, 1.33283923e-15, -2.85817151e-16, -9.28742483e-01,\n", + " -3.29665591e-02, -2.05698204e-01, 4.47493515e-18, 2.38664763e-01,\n", + " -3.40271791e-01, -2.63214437e-01, -2.64229802e-01, 5.15769935e-02,\n", + " 8.84695564e-18, 2.12652808e-01, 2.13738547e+00]),\n", + " True)" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "computeqgrasppose(robot, qcurrent, cube, cubetarget, viz)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "4d05cbce-2ece-4adc-8d70-b434a72a806c", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from tools import collision\n", + "\n", + "collision(robot, robot.q0)" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "c3e5bc01-7f2c-42f9-b768-9487d24408d0", + "metadata": {}, + "outputs": [], + "source": [ + "import pinocchio as pin" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "f256b348-1a37-4594-b018-0199beb2f807", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "SE3(array([[-9.99998732e-01, 1.59265292e-03, 8.26883241e-18, 3.99483973e-01],[-1.59265292e-03, -9.99998732e-01, 1.35248866e-17, 5.97082849e-02],[ 4.34329287e-17, 3.45763012e-17, 1.00000000e+00, 9.29934278e-01],[ 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.00000000e+00]]))" + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "pin.computeJointJacobians(robot.model,robot.data,robot.q0)\n", + "robot.data.oMf[-1]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "58eaf60e-ccb0-42c3-ae34-b5eabde6f37d", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.7" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/bezier.py b/bezier.py index 62375bc..2e392c1 100644 --- a/bezier.py +++ b/bezier.py @@ -7,7 +7,7 @@ def __init__(self, pointlist, t_min=0.0, t_max=1.0, mult_t=1.0): self.T_min_ = t_min self.T_max_ = t_max self.mult_T_ = mult_t - self.size_ = len(pointlist)- 1 + self.size_ = len(pointlist) self.degree_ = self.size_ - 1 self.control_points_ = pointlist if (self.size_ < 1 or self.T_max_ <= self.T_min_): diff --git a/config.py b/config.py index b3e51ee..c9f1d54 100644 --- a/config.py +++ b/config.py @@ -28,7 +28,7 @@ LEFT_HOOK = "LARM_HOOK" RIGHT_HOOK = "RARM_HOOK" - + #scene placements ROBOT_PLACEMENT= pin.XYZQUATToSE3(np.array([0.,0.,0.85,0.,0.,0.,1.])) TABLE_PLACEMENT= pin.SE3(rotate('z',-np.pi/2),np.array([0.8,0.,0.])) @@ -37,13 +37,13 @@ CUBE_PLACEMENT_TARGET= pin.SE3(rotate('z', 0),np.array([0.4, 0.11, 0.93])) #do not edit this part unless you know what you are doing -MODELS_PATH = join(dirname(str(abspath(__file__))), "models") -MESH_DIR = MODELS_PATH +MODELS_PATH = join(dirname(str(abspath(__file__))), "models") +MESH_DIR = MODELS_PATH NEXTAGE_URDF_PATH = MODELS_PATH + '/nextagea_description/urdf/' NEXTAGE_URDF = NEXTAGE_URDF_PATH + 'NextageaOpen.urdf' NEXTAGE_SRDF = NEXTAGE_URDF_PATH + 'NextageAOpen.srdf' TABLE_URDF = MODELS_PATH + '/table/table_tallerscaled.urdf' -TABLE_MESH = MODELS_PATH + "/table/" +TABLE_MESH = MODELS_PATH + "/table/" OBSTACLE_URDF = MODELS_PATH + '/cubes/obstacle.urdf' OBSTACLE_MESH = MODELS_PATH + '/cubes/' CUBE_URDF = MODELS_PATH + '/cubes/cube_small.urdf' diff --git a/constants.py b/constants.py new file mode 100644 index 0000000..c59eab8 --- /dev/null +++ b/constants.py @@ -0,0 +1,59 @@ +"""Project-wide numeric constants and defaults. + +Place all tunable "magic numbers" here so they are easy to discover and tweak. +""" +from typing import Tuple + +# Sampling (table / cube placement) +DEFAULT_TABLE_Z_RANGE: Tuple[float, float] = (0.93, 2) +DEFAULT_CHECK_COLLISIONS: bool = True + +# RRT / planning +DEFAULT_NUM_ITER: int = 500 +DEFAULT_DISCRETISATION_STEPS: int = 100 +DEFAULT_CHECK_EDGE_STEPS: int = 1000 +EDGE_DISTANCE_TOL: float = 1e-3 + +# Visualization / interactive refresh defaults +DEFAULT_VIZ_DELAY: float = 0.01 # seconds to sleep after updating viz +SAMPLE_DISPLAY_EVERY: int = 1 # display every N samples when visualizing sampling +# Fraction of the table extents to use when creating a restricted sampling region +DEFAULT_SAMPLER_SHRINK: float = 0.3 +# When sampling on the vertical plane, allow a small half-width (meters) perpendicular to the plane +DEFAULT_PLANE_HALF_WIDTH: float = 0.3 + +# Planner-specific defaults (tunable) +DEFAULT_REPAIR_ATTEMPTS: int = 5 +DEFAULT_GOAL_BIAS: float = 0.03 +DEFAULT_MAX_IK_ATTEMPTS: int = 3 +DEFAULT_MAX_TIME_S: float = 300 + +MIN_DIST_TO_OBS: float = 0.02 + +# Preset bundles for convenience: 'fast', 'default', 'robust' +PRESETS = { + 'fast': { + 'num_iter': 200, + 'discretisation': 8, + 'repair_attempts': 3, + 'goal_bias': 0.02, + 'max_ik_attempts': 3, + 'max_time_s': 15.0, + }, + 'default': { + 'num_iter': 500, + 'discretisation': 8, + 'repair_attempts': 5, + 'goal_bias': 0.03, + 'max_ik_attempts': 3, + 'max_time_s': 30.0, + }, + 'robust': { + 'num_iter': 1000, + 'discretisation': 10, + 'repair_attempts': 8, + 'goal_bias': 0.05, + 'max_ik_attempts': 5, + 'max_time_s': 60.0, + } +} diff --git a/control.py b/control.py index 8ad6f1a..003df0b 100644 --- a/control.py +++ b/control.py @@ -6,61 +6,190 @@ @author: stonneau """ +import pickle import numpy as np +from zmq.constants import THREAD_AFFINITY_CPU_REMOVE from bezier import Bezier - +from typing import List +import pinocchio as pin +from cube_trajectory_wrapper import GlobalMinJerkLinearJTraj +from tools import setcubeplacement +from config import LEFT_HAND, RIGHT_HAND +from qp_utils import get_bezier_control_points +import pybullet as pyb + # in my solution these gains were good enough for all joints but you might want to tune this. -Kp = 300. # proportional gain (P of PD) -Kv = 2 * np.sqrt(Kp) # derivative gain (D of PD) +Kp = 1000 # proportional gain (P of PD) +Kv = 30 * np.sqrt(Kp) # derivative gain (D of PD) + +Kx_lin = 2000 # Very stiff position control +Dx_lin = 5 * np.sqrt(Kx_lin) + +Kx_rot = 1000 +Dx_rot = 10 + +# Whether to use Bezier curve for trajectory generation +USE_BEZIER = False + +if USE_BEZIER: + Kp = 1000 # proportional gain (P of PD) + Kv = 30 * np.sqrt(Kp) # derivative gain (D of PD) + -def controllaw(sim, robot, trajs, tcurrent, cube): +def controllaw(sim, robot, trajs, tcurrent): + """Joint trajectory tracking with end-effector force correction""" q, vq = sim.getpybulletstate() - #TODO - torques = [0.0 for _ in sim.bulletCtrlJointsInPinOrder] - sim.step(torques) + + q_target = trajs[0](tcurrent) + vq_target = trajs[1](tcurrent) + vvq_target = trajs[2](tcurrent) + + vvq_des = vvq_target + Kp * (q_target - q) + Kv * (vq_target - vq) + M = robot.mass(q) + h = robot.nle(q, vq) + tau_base = M @ vvq_des + h + + cube_pos, cube_quat = pyb.getBasePositionAndOrientation(sim.cubeId) + R = np.array(pyb.getMatrixFromQuaternion(cube_quat)).reshape(3, 3) + cube_current_SE3 = pin.SE3(R, np.array(cube_pos)) + + # cube_target_SE3 = pin.SE3(rotate('z', 0.), cube_trajs(tcurrent)) + setcubeplacement(robot, cube, cube_current_SE3) + + # Forward kinematics + model, data = robot.model, robot.data + left_id = model.getFrameId(LEFT_HAND) + right_id = model.getFrameId(RIGHT_HAND) + + pin.forwardKinematics(model, data, trajs[0](0)) + pin.updateFramePlacements(model, data) + + oMLhand_desired = data.oMf[left_id] # From joint trajectory + oMRhand_desired = data.oMf[right_id] + + + pin.forwardKinematics(model, data, q, vq) + pin.updateFramePlacements(model, data) + pin.computeJointJacobians(model, data, q) + + # Jacobians + J_l = pin.computeFrameJacobian(model, data, q, left_id, + pin.ReferenceFrame.LOCAL_WORLD_ALIGNED) + J_r = pin.computeFrameJacobian(model, data, q, right_id, + pin.ReferenceFrame.LOCAL_WORLD_ALIGNED) + + oMLhand_actual = data.oMf[left_id] + oMRhand_actual = data.oMf[right_id] + + error_pos_hands = oMLhand_actual.translation - oMRhand_actual.translation + + # Orientation errors + R_err_l = oMLhand_desired.rotation @ oMLhand_actual.rotation.T + err_rot_l = pin.log3(R_err_l) + + R_err_r = oMRhand_desired.rotation @ oMRhand_actual.rotation.T + err_rot_r = pin.log3(R_err_r) + + # Velocities + v_l = pin.getFrameVelocity(model, data, left_id, + pin.ReferenceFrame.LOCAL_WORLD_ALIGNED) + vel_l = np.hstack([v_l.linear, v_l.angular]) + v_r = pin.getFrameVelocity(model, data, right_id, + pin.ReferenceFrame.LOCAL_WORLD_ALIGNED) + vel_r = np.hstack([v_r.linear, v_r.angular]) + + # Compute correction forces + F_l = np.hstack([ + -Kx_lin * error_pos_hands - Dx_lin * vel_l[:3], + Kx_rot * err_rot_l - Dx_rot * vel_l[3:] + ]) + F_r = np.hstack([ + Kx_lin * error_pos_hands - Dx_lin * vel_r[:3], + Kx_rot * err_rot_r - Dx_rot * vel_r[3:] + ]) + + # Map to joint torques + tau_l = J_l[:, :].T @ F_l + tau_r = J_r[:, :].T @ F_r + + tau_l = np.clip(tau_l, -200.0, 200.0) + tau_r = np.clip(tau_r, -200.0, 200.0) + + # Combine: base tracking + end-effector correction + tau_total = tau_base + tau_r + tau_l + sim.step(tau_total) if __name__ == "__main__": - + from tools import setupwithpybullet, setupwithpybulletandmeshcat, rununtil from config import DT - - robot, sim, cube = setupwithpybullet() - - - from config import CUBE_PLACEMENT, CUBE_PLACEMENT_TARGET + from config import CUBE_PLACEMENT, CUBE_PLACEMENT_TARGET from inverse_geometry import computeqgrasppose from path import computepath - - q0,successinit = computeqgrasppose(robot, robot.q0, cube, CUBE_PLACEMENT, None) - qe,successend = computeqgrasppose(robot, robot.q0, cube, CUBE_PLACEMENT_TARGET, None) - path = computepath(q0,qe,CUBE_PLACEMENT, CUBE_PLACEMENT_TARGET) - - #setting initial configuration + robot, sim, cube = setupwithpybullet() + + q0, successinit = computeqgrasppose(robot, robot.q0, cube, CUBE_PLACEMENT, None) + qe, successend = computeqgrasppose(robot, robot.q0, cube, CUBE_PLACEMENT_TARGET, None) + + + if not successinit or not successend: + raise RuntimeError('Failed to successfully compute grasp pose!') + + path, cube_placements = computepath( + q0, + qe, + CUBE_PLACEMENT, + CUBE_PLACEMENT_TARGET, + robot=robot, + cube=cube, + computeqgrasppose=computeqgrasppose) + sim.setqsim(q0) - - - #TODO this is just an example, you are free to do as you please. - #In any case this trajectory does not follow the path - #0 init and end velocities - def maketraj(q0,q1,T): #TODO compute a real trajectory ! - q_of_t = Bezier([q0,q0,q1,q1],t_max=T) + + def maketraj(points, T): + q_of_t = Bezier(points,t_max=T) vq_of_t = q_of_t.derivative(1) vvq_of_t = vq_of_t.derivative(1) return q_of_t, vq_of_t, vvq_of_t - - - #TODO this is just a random trajectory, you need to do this yourself - total_time=4. - trajs = maketraj(q0, qe, total_time) - + + + def maketraj_with_joint_space_linear( + waypoints: List[np.array], + T: float): + joint_space_trajectory = GlobalMinJerkLinearJTraj( + waypoints=waypoints, + T_max=T, + ) + + return ( + joint_space_trajectory, + joint_space_trajectory.derivative(1), + joint_space_trajectory.derivative(2) + ) + + total_time=10.0 + + if USE_BEZIER: + control_points = get_bezier_control_points(path) + if control_points is None: + raise RuntimeError('Failed to successfully compute Bezier control points!') + trajs = maketraj(control_points, total_time) + else: + trajs = maketraj_with_joint_space_linear( + waypoints=path, + T=total_time + ) + tcur = 0. - - + while tcur < total_time: - rununtil(controllaw, DT, sim, robot, trajs, tcur, cube) + rununtil( + controllaw, + DT, + sim, + robot, + trajs, + tcur) tcur += DT - - - \ No newline at end of file diff --git a/cube_trajectory_wrapper.py b/cube_trajectory_wrapper.py new file mode 100644 index 0000000..30d70ec --- /dev/null +++ b/cube_trajectory_wrapper.py @@ -0,0 +1,492 @@ +import numpy as np +from config import DT +import pinocchio as pin +from pinocchio.utils import rotate +from inverse_geometry import computeqgrasppose +from bezier import Bezier + +class RobotTrajectoryWrapper: + def __init__( + self, + q_init, + q_goal, + T_max, + T_min=0.0, + dt: float = DT + ): + self.q_init = q_init + self.q_goal = q_goal + self.T_max = T_max + self.T_min = T_min + self.dt = dt + + self.q_t = { + T_min: q_init, + T_max: q_goal + } + + def __call__(self, t): + return self.get_closest_q(t) + + def set_q_t(self, q, t): + self.q_t[t] = q + + def get_closest_q(self, t): + if t in self.q_t: + return self.q_t[t] + else: + return self.q_t[min(self.q_t.keys(), key=lambda x: abs(x-t))] + + def derivative(self, order, t): + + q_t = self.get_closest_q(t) + + # choose neighbors with clamping + tp = min(t + self.dt, self.T_max) + tm = max(t - self.dt, self.T_min) + q_tp = self.get_closest_q(tp) + q_tm = self.get_closest_q(tm) + + if order == 1: + if tp == t and tm == t: + return np.zeros_like(q_t) + if tp > t and tm < t: # central diff + return (q_tp - q_tm) / (tp - tm) + elif tp > t: # forward one-sided + return (q_tp - q_t) / (tp - t) + else: # backward one-sided + return (q_t - q_tm) / (t - tm) + + if order == 2: + # use symmetric step h when possible + h_p = max(tp - t, 0.0) + h_m = max(t - tm, 0.0) + if h_p > 0 and h_m > 0 and abs(h_p - h_m) < 1e-12: + h = h_p # equal steps + return (q_tp - 2.0*q_t + q_tm) / (h*h) + # fallback: nonuniform 3-point formula (approx) + denom = 0.5*(h_p*h_m*(h_p + h_m)) + 1e-12 + a = 2.0 / denom + return a * ( (q_tp * h_m) - (q_t * (h_p + h_m)) + (q_tm * h_p) ) + raise ValueError("Only derivative orders 1 or 2 supported.") + + +class CubeBezierTrajectory: + def __init__(self, + cube_placements, + robot, + cube, + q_init, + q_goal, + T_max, + T_min=0.0, + dt: float = DT): + + self.cube_placements = cube_placements + self.cube_placements_pos = [p.translation for p in cube_placements] + + self.q_of_t_cube = Bezier(self.cube_placements_pos, T_min, T_max) + + self.robot = robot + self.cube = cube + + self.dt = dt + self.T_min = T_min + self.T_max = T_max + + self.robot_q_trajectory = RobotTrajectoryWrapper( + q_init=q_init, + q_goal=q_goal, + T_max=T_max, + T_min=T_min, + dt=dt + ) + + def _register_robot_q_t(self, q, t): + self.robot_q_trajectory.set_q_t(q, t) + + def __call__(self, t): + cube_pos = self.q_of_t_cube(t) + cube_placement = pin.SE3(rotate('z', 0.), cube_pos) + + q, success = computeqgrasppose( + robot=self.robot, + qcurrent=self.robot_q_trajectory.get_closest_q(t - self.dt), + cube=self.cube, + cubetarget=cube_placement) + if not success: + print(t) + print(cube_placement) + print(self.robot_q_trajectory.get_closest_q(t - self.dt * 2)) + print(q) + raise ValueError("Failed to compute grasp pose") + # print(f"t={t:.3f}, success={success}") + self.robot_q_trajectory.set_q_t(q, t) + return q + + def get_q_derivative(self, t, order): + if t not in self.robot_q_trajectory.q_t: + self.__call__(t) + prev_t = max(self.T_min, t - self.dt) + next_t = min(self.T_max, t + self.dt) + if prev_t not in self.robot_q_trajectory.q_t: + self.__call__(prev_t) + if next_t not in self.robot_q_trajectory.q_t: + self.__call__(next_t) + return self.robot_q_trajectory.derivative(order, t) + + def derivative(self, order): + return lambda t: self.get_q_derivative(t, order) + + +class CubeLinearTrajectory: + def __init__(self, + cube_placements, + robot, + cube, + q_init, + q_goal, + T_max, + T_min=0.0, + dt: float = DT): + + self.cube_placements = cube_placements + self.cube_placements_pos = [p.translation for p in cube_placements] + + self.robot = robot + self.cube = cube + + self.T_max = T_max + self.T_min = T_min + + self.dt = dt + + self.robot_q_trajectory = RobotTrajectoryWrapper( + q_init=q_init, + q_goal=q_goal, + T_max=T_max, + T_min=T_min, + dt=dt + ) + + segments = np.diff(self.cube_placements_pos, axis=0) + self.lengths = np.linalg.norm(segments, axis=1) + self.cumulative_lengths = np.concatenate([[0], np.cumsum(self.lengths)]) + self.total_length = self.cumulative_lengths[-1] + + # constant speed + self.v = self.total_length / T_max + + def _register_robot_q_t(self, q, t): + self.robot_q_trajectory.set_q_t(q, t) + + def _get_cube_pos(self, t): + # current arc length + s = t * self.v + s = np.clip(s, 0, self.total_length) + + # segment index of the current arc length + segment_idx = np.searchsorted(self.cumulative_lengths, s, side='right') - 1 + segment_idx = min(segment_idx, len(self.lengths) - 1) + + # Local parameter within segment + s_local = s - self.cumulative_lengths[segment_idx] + alpha = s_local / self.lengths[segment_idx] if self.lengths[segment_idx] > 0 else 0 + + # Interpolate + p0 = self.cube_placements_pos[segment_idx] + p1 = self.cube_placements_pos[segment_idx + 1] + return (1 - alpha) * p0 + alpha * p1 + + def __call__(self, t): + cube_pos = self._get_cube_pos(t) + cube_placement = pin.SE3(rotate('z', 0.), cube_pos) + + q, success = computeqgrasppose( + robot=self.robot, + qcurrent=self.robot_q_trajectory.get_closest_q(t-self.dt * 10), + cube=self.cube, + cubetarget=cube_placement) + + if not success: + print(f'Failed to compute q for t={t}, cube_pos={cube_pos}, ') + self.robot_q_trajectory.set_q_t(q, t) + return q + + def get_q_derivative(self, t, order): + if t not in self.robot_q_trajectory.q_t: + self.__call__(t) + prev_t = max(self.T_min, t - self.dt) + next_t = min(self.T_max, t + self.dt) + if prev_t not in self.robot_q_trajectory.q_t: + self.__call__(prev_t) + if next_t not in self.robot_q_trajectory.q_t: + self.__call__(next_t) + return self.robot_q_trajectory.derivative(order, t) + + def derivative(self, order): + return lambda t: self.get_q_derivative(t, order) + + + +class LinearJointTrajectory: + """ + Piecewise-linear joint-space trajectory through given waypoints. + Time is distributed proportional to joint-space segment length, + with optional per-joint vmax / amax limiting. + """ + def __init__(self, waypoints, T_max, qdot_max=None, amax=None, eps=1e-9): + self.W = np.asarray(waypoints, dtype=float) # (K, nq) + # assert self.W.ndim == 2 and self.W.shape[0] >= 2, "Need at least 2 waypoints" + self.nq = self.W.shape[1] + self.T_max = float(T_max) + self.eps = eps + + # Per-joint limits + if qdot_max is None: + qdot_max = np.full(self.nq, 2.0) # rad/s default + if amax is None: + amax = np.full(self.nq, 5.0) # rad/s^2 default + self.qdot_max = np.asarray(qdot_max, dtype=float) + self.amax = np.asarray(amax, dtype=float) + + # Segment lengths in joint space (∞-norm per joint -> max joint move dominates) + dQ = np.diff(self.W, axis=0) # (K-1, nq) + seg_norms = np.max(np.abs(dQ), axis=1) # (K-1,) + seg_norms = np.maximum(seg_norms, self.eps) + total = float(np.sum(seg_norms)) + + # First pass: proportional time split + self.seg_times = (seg_norms / total) * self.T_max # (K-1,) + + # Enforce vmax per joint: T_i >= max_j |dq_ij| / vmax_j + Tmin_v = np.max(np.abs(dQ) / self.qdot_max, axis=1) + self.seg_times = np.maximum(self.seg_times, Tmin_v + 1e-6) + + # Optional: very light amax check (not a full trapezoid here) + # You can tighten with a true trapezoidal time-scaler if needed. + + # Prefix sums -> piecewise timing + self.t_knots = np.concatenate(([0.0], np.cumsum(self.seg_times))) + # Guard for round-off + self.t_knots[-1] = self.T_max + + # Precompute segment slopes (constant velocity per segment) + self.seg_slopes = np.zeros_like(dQ) + for i, dt in enumerate(self.seg_times): + self.seg_slopes[i] = dQ[i] / max(dt, self.eps) + + def _find_seg(self, t): + if t <= 0.0: return 0, 0.0 + if t >= self.T_max: return len(self.seg_times) - 1, self.seg_times[-1] + # locate segment index i s.t. t in [t_i, t_{i+1}] + i = np.searchsorted(self.t_knots, t, side='right') - 1 + i = min(max(i, 0), len(self.seg_times) - 1) + tau = t - self.t_knots[i] + return i, tau + + def __call__(self, t): + """q(t)""" + t = float(np.clip(t, 0.0, self.T_max)) + i, tau = self._find_seg(t) + return self.W[i] + self.seg_slopes[i] * tau + + def derivative(self, order): + if order == 1: + return lambda t: self.velocity(t) + elif order == 2: + return lambda t: self.acceleration(t) + else: + return lambda t: np.zeros(self.nq) + + def velocity(self, t): + """qdot(t): piecewise constant""" + t = float(np.clip(t, 0.0, self.T_max)) + i, _ = self._find_seg(t) + return self.seg_slopes[i] + + def acceleration(self, t): + """qddot(t): zero inside segments (impulses at knots not modeled)""" + return np.zeros(self.nq) + + + +class GlobalMinJerkLinearJTraj: + """ + Follow a joint-space polyline with a GLOBAL min-jerk time law s(t) in [0, L]. + - q(t) continuous everywhere + - |q̇(t)| ramps from ~0 at start to peak mid-way, back to 0 at end + - q̇ direction still changes at vertices (polyline corners) + """ + def __init__(self, waypoints, T_max, eps=1e-9): + W = np.asarray(waypoints, float) + # assert W.ndim == 2 and W.shape[0] >= 2 + print(W) + self.W = W + self.nq = W.shape[1] + self.T = float(T_max) + self.eps = eps + + # Segment "lengths" in joint space (use ∞-norm so one joint dominates timing) + dQ = np.diff(W, axis=0) # (S, nq) + seg_L = np.max(np.abs(dQ), axis=1) # (S,) + seg_L = np.maximum(seg_L, eps) + self.dQ = dQ + self.seg_L = seg_L + self.cum_L = np.concatenate(([0.0], np.cumsum(seg_L))) + self.Ltot = float(self.cum_L[-1]) + + # Precompute segment directions (unit in ∞-norm sense) + self.seg_dir = np.divide(dQ, seg_L[:, None], where=seg_L[:, None] > 0) + + # ---- Min-jerk scalar profile s(t) (and derivatives) on [0,T] ---- + # τ in [0,1]; s = L*(10τ^3 - 15τ^4 + 6τ^5) + def _s(self, t): + tau = np.clip(t / self.T, 0.0, 1.0) + tau2, tau3, tau4, tau5 = tau*tau, tau*tau*tau, None, None + tau4 = tau2*tau2 + tau5 = tau3*tau2 + s = self.Ltot * (10*tau3 - 15*tau4 + 6*tau5) + sd = self.Ltot * (30*tau*tau - 60*tau2*tau + 30*tau4) / self.T + sdd = self.Ltot * (60*tau - 180*tau2 + 120*tau3) / (self.T**2) + return s, sd, sdd + + def _locate_segment(self, s): + # find i s.t. s in [cum_L[i], cum_L[i+1]] + i = np.searchsorted(self.cum_L, s, side='right') - 1 + i = int(np.clip(i, 0, len(self.seg_L)-1)) + s_local = s - self.cum_L[i] + alpha = s_local / max(self.seg_L[i], self.eps) # in [0,1] + return i, alpha + + def __call__(self, t): + s, sd, sdd = self._s(t) + # Clamp end + if s >= self.Ltot - 1e-12: + return self.W[-1] + i, alpha = self._locate_segment(s) + return self.W[i] + alpha * self.dQ[i] + + def derivative(self, order): + if order == 1: + return lambda t: self.velocity(t) + if order == 2: + return lambda t: self.acceleration(t) + return lambda t: np.zeros(self.nq) + + def velocity(self, t): + s, sd, sdd = self._s(t) + if s >= self.Ltot - 1e-12 or sd <= 0: + return np.zeros(self.nq) + i, _ = self._locate_segment(s) + # q̇ = (dq/ds) * ṡ ; along a linear segment, dq/ds = seg_dir[i] + return self.seg_dir[i] * sd + + def acceleration(self, t): + s, sd, sdd = self._s(t) + if s >= self.Ltot - 1e-12: + return np.zeros(self.nq) + i, _ = self._locate_segment(s) + # q̈ = (dq/ds) * s̈ + (d/ds seg_dir)*ṡ^2 ; second term is zero inside segment + # (impulses at corners not modeled explicitly) + return self.seg_dir[i] * sdd + + + +class LinearPathWithTrapezoidalVel: + """ + Piecewise linear path with smooth velocity profile that slows at corners + """ + def __init__(self, waypoints, T_max, corner_slowdown=0.1): + W = np.asarray(waypoints, float) + self.W = W + self.nq = W.shape[1] + self.T = T_max + + # Segment lengths + dQ = np.diff(W, axis=0) + seg_L = np.linalg.norm(dQ, axis=1) + self.seg_L = seg_L + self.cum_L = np.concatenate(([0.0], np.cumsum(seg_L))) + self.Ltot = self.cum_L[-1] + + # Direction vectors + self.seg_dir = dQ / (seg_L[:, None] + 1e-9) + + # Define "corner zones" - last X% of each segment + first X% of next + self.corner_slowdown = corner_slowdown + self.slow_distance = corner_slowdown * np.min(seg_L[seg_L > 0]) if len(seg_L) > 0 else 0.1 + + def _s(self, t): + """Min-jerk profile on [0, T] but modified near corners""" + tau = np.clip(t / self.T, 0.0, 1.0) + tau2, tau3 = tau*tau, tau*tau*tau + tau4, tau5 = tau2*tau2, tau3*tau2 + + # Base min-jerk + s_base = self.Ltot * (10*tau3 - 15*tau4 + 6*tau5) + sd_base = self.Ltot * (30*tau2 - 60*tau3 + 30*tau4) / self.T + sdd_base = self.Ltot * (60*tau - 180*tau2 + 120*tau3) / (self.T**2) + + # Slow down factor based on proximity to corners + # slowdown = self._corner_slowdown_factor(s_base) + slowdown = 1.0 + + return s_base, sd_base * slowdown, sdd_base * slowdown + + def _corner_slowdown_factor(self, s): + """Returns value in (0, 1] that reduces speed near corners""" + # Find which segment we're in + i = np.searchsorted(self.cum_L, s, side='right') - 1 + i = int(np.clip(i, 0, len(self.seg_L)-1)) + + # Distance from start and end of segment + s_in_seg = s - self.cum_L[i] + dist_to_end = self.seg_L[i] - s_in_seg + + # Slow down if near end of segment (approaching corner) + if dist_to_end < self.slow_distance: + factor = dist_to_end / self.slow_distance + return 0.2 + 0.8 * factor # Reduce to 20% speed at corner + + # Slow down if near start of segment (just left corner) + if s_in_seg < self.slow_distance and i > 0: + factor = s_in_seg / self.slow_distance + return 0.2 + 0.8 * factor + + return 1.0 # Full speed in middle of segments + + def _locate_segment(self, s): + i = np.searchsorted(self.cum_L, s, side='right') - 1 + i = int(np.clip(i, 0, len(self.seg_L)-1)) + s_local = s - self.cum_L[i] + alpha = s_local / max(self.seg_L[i], 1e-9) + return i, alpha + + def __call__(self, t): + s, _, _ = self._s(t) + if s >= self.Ltot - 1e-12: + return self.W[-1] + i, alpha = self._locate_segment(s) + return self.W[i] + alpha * (self.W[i+1] - self.W[i]) + + def velocity(self, t): + s, sd, _ = self._s(t) + if s >= self.Ltot - 1e-12 or sd <= 0: + return np.zeros(self.nq) + i, _ = self._locate_segment(s) + return self.seg_dir[i] * sd + + def acceleration(self, t): + s, sd, sdd = self._s(t) + if s >= self.Ltot - 1e-12: + return np.zeros(self.nq) + i, _ = self._locate_segment(s) + return self.seg_dir[i] * sdd + + def derivative(self, order): + if order == 1: + return self.velocity + elif order == 2: + return self.acceleration + return lambda t: np.zeros(self.nq) diff --git a/inverse_geometry.py b/inverse_geometry.py index 8f03044..0cbcb55 100644 --- a/inverse_geometry.py +++ b/inverse_geometry.py @@ -6,33 +6,98 @@ @author: stonneau """ -import pinocchio as pin +import pinocchio as pin import numpy as np from numpy.linalg import pinv,inv,norm,svd,eig -from tools import collision, getcubeplacement, setcubeplacement, projecttojointlimits +from tools import collision, getcubeplacement, setcubeplacement, jointlimitsviolated +import time + from config import LEFT_HOOK, RIGHT_HOOK, LEFT_HAND, RIGHT_HAND, EPSILON from config import CUBE_PLACEMENT, CUBE_PLACEMENT_TARGET from tools import setcubeplacement -def computeqgrasppose(robot, qcurrent, cube, cubetarget, viz=None): - '''Return a collision free configuration grasping a cube at a specific location and a success flag''' +def computeqgrasppose(robot, qcurrent, cube, cubetarget, viz=None, max_attempts: int = 1000, viz_sleep: bool = True): + '''Return a collision free configuration grasping a cube at a specific location and a success flag. + + Args: + robot: RobotWrapper + qcurrent: initial joint configuration to seed the IK + cube: cube body + cubetarget: desired cube SE3 placement + viz: optional visualizer + max_attempts: maximum number of IK iteration steps to attempt (default 1000) + ''' + q = qcurrent.copy() + # Ensure the cube is placed at the requested target before solving IK setcubeplacement(robot, cube, cubetarget) - #TODO implement - print ("TODO: implement me") - return robot.q0, False - + + lhand_id = robot.model.getFrameId(LEFT_HAND) + rhand_id = robot.model.getFrameId(RIGHT_HAND) + + oMlhook = getcubeplacement(cube, LEFT_HOOK) + oMrhook = getcubeplacement(cube, RIGHT_HOOK) + + success = False + + for i in range(max_attempts): + pin.framesForwardKinematics(robot.model, robot.data, q) + pin.computeJointJacobians(robot.model, robot.data, q) + + oMlhand = robot.data.oMf[lhand_id] + oMrhand = robot.data.oMf[rhand_id] + + lhandMlhook = oMlhand.inverse()*oMlhook + rhandMrhook = oMrhand.inverse()*oMrhook + + nu_L = pin.log(lhandMlhook).vector + nu_R = pin.log(rhandMrhook).vector + + J_L = pin.computeFrameJacobian(robot.model, robot.data, q, lhand_id, pin.LOCAL) + J_R = pin.computeFrameJacobian(robot.model, robot.data, q, rhand_id, pin.LOCAL) + + JL_p = pinv(J_L) + v1 = JL_p @ nu_L + N1 = np.eye(robot.nv) - JL_p @ J_L + + JRN = J_R @ N1 + JRN_p = pinv(JRN) + + v2 = JRN_p @ (nu_R - J_R @ v1) + vq = v1 + N1 @ v2 + + N2 = N1 - JRN_p @ JRN @ N1 + v3 = pin.difference(robot.model, q, robot.q0) + + vq += N2 @ v3 + + q = pin.integrate(robot.model,q, vq) + + lerror = norm(oMlhand.translation - oMlhook.translation) + norm(oMlhand.rotation - oMlhook.rotation) + rerror = norm(oMrhand.translation - oMrhook.translation) + norm(oMrhand.rotation - oMrhook.rotation) + + if viz: + # allow callers to request that we display without sleeping so the + # planner can validate IK quickly without incurring a sleep cost. + viz.display(q) + if viz_sleep: + time.sleep(0.1) + + if lerror < EPSILON and rerror < EPSILON and not collision(robot, q) and not jointlimitsviolated(robot, q): + success = True + break + + return q, success + + if __name__ == "__main__": from tools import setupwithmeshcat from setup_meshcat import updatevisuals robot, cube, viz = setupwithmeshcat() - + q = robot.q0.copy() - + q0,successinit = computeqgrasppose(robot, q, cube, CUBE_PLACEMENT, viz) qe,successend = computeqgrasppose(robot, q, cube, CUBE_PLACEMENT_TARGET, viz) - + updatevisuals(viz, robot, cube, q0) - - - diff --git a/path.py b/path.py index 03d3b90..1dc7dda 100644 --- a/path.py +++ b/path.py @@ -1,48 +1,275 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -""" -Created on Thu Sep 21 11:44:32 2023 +"""Compatibility wrapper module. -@author: stonneau +This file used to contain the RRT and sampling implementation. To keep +backwards compatibility for imports that expect `path.py`, it re-exports the +cleaner implementations from `sampling.py` and `rrt.py`. """ +from rrt import Vertex, RRT +from constants import ( + DEFAULT_NUM_ITER, + DEFAULT_DISCRETISATION_STEPS, + DEFAULT_VIZ_DELAY, + DEFAULT_SAMPLER_SHRINK, + DEFAULT_PLANE_HALF_WIDTH, + DEFAULT_TABLE_Z_RANGE, + PRESETS, + DEFAULT_REPAIR_ATTEMPTS, + DEFAULT_GOAL_BIAS, + DEFAULT_MAX_IK_ATTEMPTS, + DEFAULT_MAX_TIME_S, +) +from tools import setcubeplacement +import time +import logging +from typing import List import pinocchio as pin import numpy as np -from numpy.linalg import pinv -from config import LEFT_HAND, RIGHT_HAND -import time +logger = logging.getLogger(__name__) + +from typing import Callable, Tuple, List +from pinocchio.utils import rotate +from tools import setcubeplacement, distanceToObstacle + +def computepath(qinit, + qgoal, + cubeplacementq0, + cubeplacementqgoal, + robot=None, + cube=None, + computeqgrasppose=None, + num_iter: int = DEFAULT_NUM_ITER, + discretisation_steps: int = DEFAULT_DISCRETISATION_STEPS, + max_ik_attempts: int = None, + max_time_s: float = DEFAULT_MAX_TIME_S, + post_path_wait: float = 5.0, + random_sampler=None, + goal_bias: float = 0.02, + viz=None, + viz_delay: float = DEFAULT_VIZ_DELAY): + """ Returns a collision free path from qinit to qgoal under grasping constraints. + Args: + qinit: initial configuration + qgoal: goal configuration + cubeplacementq0: initial cube placement + cubeplacementqgoal: goal cube placement + Returns: + path: list of configurations + """ + robot = robot or globals().get("robot") + cube = cube or globals().get("cube") + computeqgrasppose = computeqgrasppose or globals().get("computeqgrasppose") + + + if random_sampler is None: + p0 = np.array(cubeplacementq0.translation) + p1 = np.array(cubeplacementqgoal.translation) + + p0_xy = p0[:2] + p1_xy = p1[:2] + seg = p1_xy - p0_xy + L = np.linalg.norm(seg) + if L < 1e-6: + u = np.array([1.0, 0.0]) + else: + u = seg / L + perp = np.array([-u[1], u[0]]) + + half_width = DEFAULT_PLANE_HALF_WIDTH + + # z range uses the default table z range + zmin, zmax = DEFAULT_TABLE_Z_RANGE + + def plane_sampler(): + while True: + s = np.random.uniform(-1.0, 2.0) + t = np.random.uniform(-half_width, half_width) + xy = (1.0 - s) * p0_xy + s * p1_xy + perp * t + z = np.random.uniform(zmin, zmax) + placement = pin.SE3(np.eye(3), np.array([xy[0], xy[1], z])) + setcubeplacement(robot, cube, placement) + has_collision = pin.computeCollisions(cube.collision_model, cube.collision_data, False) + if not has_collision: + return placement + + RANDOM_SAMPLER = plane_sampler + + # sampling box for visualization: a thin box centered between p0 and p1 + center_xy = (p0_xy + p1_xy) / 2.0 + center = np.array([center_xy[0], center_xy[1], (zmin + zmax) / 2.0]) + dims_u = max(L, 1e-6) + dims_perp = 2.0 * half_width + dims_z = zmax - zmin + + # build axis-aligned x/y range for visualization convenience + corners_xy = [p0_xy + perp * (-half_width), p0_xy + perp * (half_width), + p1_xy + perp * (-half_width), p1_xy + perp * (half_width)] + xs = [c[0] for c in corners_xy] + ys = [c[1] for c in corners_xy] + x_range = (min(xs), max(xs)) + y_range = (min(ys), max(ys)) + z_range = (zmin, zmax) + sampling_box_ranges = (x_range, y_range, z_range) + else: + RANDOM_SAMPLER = random_sampler + MAX_DELTA_Q = None + GET_GRASPPING_POSEQ = computeqgrasppose + + print('Searching for a valid path...(this may take some time)') + # (no sampling-box visualization) + # Instantiate RRT with desired runtime parameters then run it. This uses + # the newer instance API so callers can inspect the tree if desired. + rrt = RRT(root_q=cubeplacementq0, + root_grasping_q=qinit, + robot=robot, + cube=cube, + get_grasping_poseq=GET_GRASPPING_POSEQ, + viz=viz, + viz_delay=viz_delay, + discretisation_steps=discretisation_steps, + max_ik_attempts=max_ik_attempts, + num_iter=num_iter, + random_sampler=RANDOM_SAMPLER, + max_delta_q=MAX_DELTA_Q, + cubeplacementqgoal=cubeplacementqgoal, + q_goal=qgoal, + max_time_s=max_time_s) + + rrt.run() + + if not rrt: + print( + """Valid Path was not found! + Try to increase discretisation steps or try with different start and end configurations.""") + + return [], [] -#returns a collision free path from qinit to qgoal under grasping constraints -#the path is expressed as a list of configurations -def computepath(qinit,qgoal,cubeplacementq0, cubeplacementqgoal): - #TODO - return [qinit, qgoal] - pass + print('Path found! Retrieving shortcut path from RRT...') + path_vertices = rrt.get_shortcut_from_rrt_path() -def displaypath(robot,path,dt,viz): - for q in path: + path_configurations = [vertex.grasping_q for vertex in path_vertices] + cube_placements = [vertex.q for vertex in path_vertices] + + print('Path is ready!') + + # visualise the cube placements by stepping the robot so user sees the grasping poses + if viz: + # pause briefly so user sees the final planning state before playback + time.sleep(post_path_wait) + for v in path_vertices: + setcubeplacement(robot, cube, v.q) + viz.display(v.grasping_q) + time.sleep(max(0.01, viz_delay)) + + # log metrics if rrt stored them + if hasattr(rrt, 'metrics'): + logger.info('Path metrics: %s', rrt.metrics) + + return path_configurations, cube_placements + + +def displaypath(robot, cube, path, cube_placements, dt, viz): + for q, cube_placement in zip(path, cube_placements): + setcubeplacement(robot, cube, cube_placement) viz.display(q) time.sleep(dt) if __name__ == "__main__": + import argparse from tools import setupwithmeshcat from config import CUBE_PLACEMENT, CUBE_PLACEMENT_TARGET from inverse_geometry import computeqgrasppose - + import logging + + parser = argparse.ArgumentParser(description='Run RRT planner for cube placements') + parser.add_argument('--no-viz', action='store_true', help='Disable visualization during planning') + parser.add_argument('--discretisation', type=int, default=DEFAULT_DISCRETISATION_STEPS, help='Discretisation steps for interpolation') + parser.add_argument('--num-iter', type=int, default=DEFAULT_NUM_ITER, help='Maximum RRT iterations/samples') + parser.add_argument('--max-time', type=float, default=None, help='Maximum wall time (s) for planner (abort early)') + parser.add_argument('--max-ik-attempts', type=int, default=None, help='Maximum IK iterations per interpolation point') + parser.add_argument('--max-delta-q', type=float, default=None, help='Maximum delta (translation) per extension (meters)') + parser.add_argument('--viz-delay', type=float, default=DEFAULT_VIZ_DELAY, help='Visualization delay in seconds') + parser.add_argument('--progress-every', type=int, default=50, help='How many iterations between progress logs') + parser.add_argument('--goal-bias', type=float, default=0.02, help='Probability to sample the goal directly on each iteration (0.0-1.0)') + parser.add_argument('--playback', action='store_true', help='Show the final path even if planning ran with --no-viz') + parser.add_argument('--preset', choices=['fast', 'default', 'robust'], default=None, help='Use a preset bundle of planner defaults') + args = parser.parse_args() + + logging.basicConfig(level=logging.INFO) + + # `setupwithmeshcat` returns (robot, cube, viz) in the current helpers. + # Unpack accordingly and avoid depending on any `table` object. robot, cube, viz = setupwithmeshcat() - - + q = robot.q0.copy() - q0,successinit = computeqgrasppose(robot, q, cube, CUBE_PLACEMENT, viz) - qe,successend = computeqgrasppose(robot, q, cube, CUBE_PLACEMENT_TARGET, viz) - - if not(successinit and successend): - print ("error: invalid initial or end configuration") - - path = computepath(q0,qe,CUBE_PLACEMENT, CUBE_PLACEMENT_TARGET) - - displaypath(robot,path,dt=0.5,viz=viz) #you ll probably want to lower dt - + q0, successinit = computeqgrasppose(robot, q, cube, CUBE_PLACEMENT, viz=None) + qe, successend = computeqgrasppose(robot, q, cube, CUBE_PLACEMENT_TARGET, viz=None) + + if not (successinit and successend): + print("error: invalid initial or end configuration") + raise SystemExit(1) + + viz_obj = None if args.no_viz else viz + # Resolve preset values (CLI args override preset). We detect explicit + # CLI overrides by checking sys.argv for the flag name. + import sys + preset_values = PRESETS.get(args.preset) if args.preset is not None else {} + def use_or_preset(flag_name, current_value): + # Return CLI-provided value if flag present, otherwise preset or current_value + if f'--{flag_name}' in sys.argv: + return current_value + if preset_values and flag_name in preset_values: + return preset_values[flag_name] + return current_value + + resolved_num_iter = use_or_preset('num-iter', args.num_iter) + resolved_discretisation = use_or_preset('discretisation', args.discretisation) + resolved_goal_bias = use_or_preset('goal-bias', args.goal_bias) + resolved_max_ik_attempts = use_or_preset('max-ik-attempts', args.max_ik_attempts if args.max_ik_attempts is not None else DEFAULT_MAX_IK_ATTEMPTS) + resolved_max_time = use_or_preset('max-time', args.max_time if args.max_time is not None else DEFAULT_MAX_TIME_S) + # choose sampler: either let computepath build the plane sampler, or build a smart sampler here + + path, cube_placements = computepath( + q0, qe, CUBE_PLACEMENT, CUBE_PLACEMENT_TARGET, + robot=robot, + cube=cube, + computeqgrasppose=computeqgrasppose, + num_iter=resolved_num_iter, + discretisation_steps=resolved_discretisation, + goal_bias=resolved_goal_bias, + viz=viz_obj, + viz_delay=args.viz_delay, + max_ik_attempts=resolved_max_ik_attempts, + # forward wall-time + # Note: computepath forwards this into construct_rrt/RRT.run + max_time_s=resolved_max_time, + ) + + # optionally playback the final path. If --playback is set we show the + # final trajectory even when planning was run headless with --no-viz. + if path and (args.playback or not args.no_viz): + # ensure visualiser is ready (short pause) then play back slowly + try: + time.sleep(0.5) + displaypath(robot, cube, path, cube_placements, dt=0.5, viz=viz) + except Exception: + # Best-effort: if viz isn't available or playback fails, continue. + logger.exception('Playback failed') + + if hasattr(rrt := __import__('rrt'), 'rrt'): + # noop — ensure rrt module imported so metrics are available in namespace + pass + + # if rrt metrics exist log them + try: + import rrt as _rrt + if hasattr(_rrt, 'RRT') and hasattr(_rrt, 'construct_rrt'): + # rrt.metrics is attached to the returned rrt instance; cannot access it here unless we returned it + pass + except Exception: + pass + + print('Done') diff --git a/qp_utils.py b/qp_utils.py new file mode 100644 index 0000000..7098b7f --- /dev/null +++ b/qp_utils.py @@ -0,0 +1,188 @@ +import numpy as np +import quadprog +from typing import List, Dict, Tuple + +def assign_time_to_intermediate_points(path: List[np.array]) -> Dict[int, float]: + # assign time to each point in the path + # so we can use it in QP cost function + # compute time proportional to distance between points + + total_dist = 0 + point_idx_time = {} + + for i in range(1, len(path)-1): + dist = np.linalg.norm(path[i] - path[i-1]) + total_dist += dist + point_idx_time[i] = dist + + total_dist += np.linalg.norm(path[-1] - path[-2]) + norm_dist = np.cumsum(list(point_idx_time.values()) / total_dist) + for point_idx, time in point_idx_time.items(): + point_idx_time[point_idx] = norm_dist[point_idx-1] + + return point_idx_time + + +def quadprog_solve_qp(H, q, G=None, h=None, C=None, d=None, verbose=False): + """ + min (1/2)x' P x + q' x + subject to G x <= h + subject to C x = d + """ + qp_G = 0.5 * (H + H.T) # make sure P is symmetric + qp_a = -q + qp_C = None + qp_b = None + meq = 0 + if C is not None: + if G is not None: + qp_C = -np.vstack([C, G]).T + qp_b = -np.hstack([d, h]) + else: + qp_C = -C.transpose() + qp_b = -d + meq = C.shape[0] + elif G is not None: # no equality constraint + qp_C = -G.T + qp_b = -h + res = quadprog.solve_qp(qp_G, qp_a, qp_C, qp_b, meq) + if verbose: + return res + return res[0] + + +def bernstein_deg4(t: float) -> np.array: + """Quartic Bernstein basis [B0..B4] at scalar t.""" + # as the equation looks like this + # p(t) = (1-t)^4*p0 + 4(1-t)^3*t*p1 + 6(1-t)^2*t^2*p2 + 4(1-t)*t^3*p3 + t^4*p4 + # so we need to compute the coefficients for each term + # B0 = (1-t)^4 + # B1 = 4*(1-t)^3*t + # B2 = 6*(1-t)^2*t^2 + # B3 = 4*(1-t)*t^3 + # B4 = t^4 + tt = t + omt = 1.0 - tt + B0 = omt**4 + B1 = 4 * omt**3 * tt + B2 = 6 * omt**2 * tt**2 + B3 = 4 * omt * tt**3 + B4 = tt**4 + return np.array([B0, B1, B2, B3, B4], dtype=np.float64) + +def build_H_q_quartic_bezier( + q0: np.array, + qe: np.array, + t_samples: np.array, + g_samples: np.array, + l2_reg: float = 0.0 +) -> Tuple[np.array, np.array]: + + + """ + t_samples: (N,) in [0,1] + g_samples: (N, 15) targets + Returns H (45x45), q (45,) + """ + + ## p(t)=P0​B0​ + (P1​B1 ​+ P2​B2 ​+ P3​B3 ​) + P4​B4 + ## need to be p(t) = Ax + b + ## where b = P0​B0​ + P4​B4 + + ## variables are [P1_dimensions, P2_dimensions, P3_dimensions] + ## and A = [ + ## [B1, 0, 0, 0, ...., B2, 0, 0, 0, ...., B3, 0, 0, 0,0...], + ## [0, B1, 0, 0, ...., 0, B2, 0, 0, ...., 0, B3, 0, 0, ...., 0], + ## [0, 0, B1, 0, ...., 0, 0, B2, 0, ...., 0, 0, B3, 0, ...., 0], + ## ..... + ## [0, 0, 0, B1, ...., 0, 0, 0, B2, ...., 0, 0, 0, B3, ...., 0], + ## [0, 0, 0, 0, ...., B3, 0, 0, 0, ...., 0, 0, 0, 0, ...., B3] + ## ] + + # we need to compute the coefficients for each term + t_samples = np.asarray(t_samples, dtype=np.float64).ravel() + g_samples = np.asarray(g_samples, dtype=np.float64) + assert g_samples.shape == (t_samples.size, 15) + + d = 15 + n_ctrl = 3 + nvars = d * n_ctrl + + H = np.zeros((nvars, nvars), dtype=np.float64) + q = np.zeros(nvars, dtype=np.float64) + + # samples in the computed trajectory we want to fit + # Should look like this + # ||p(t) - g(t)||^2 + # ||A x + b - g(t)||^2 + # ||A x - (g(t) - b)||^2 + # 1/2 x.T A.T A x + (-A.T (g(t) - b)) x + # 1/2 x.T A.T A x + (A.T (b - g(t))) x + + for ti, gi in zip(t_samples, g_samples): + B = bernstein_deg4(ti) + b = B[0] * q0 + B[-1] * qe + A_i = np.concatenate([np.eye(d) * b for b in B[1:-1]], axis=1)# (15, 45): per-dim applies same B across control blocks + H += A_i.T @ A_i # accumulate quadratic term + q += A_i.T @ (b - gi) # accumulate linear term + + if l2_reg > 0.0: + # Optional small ridge for PD Hessian (helps quadprog) + H += 2.0 * l2_reg * np.eye(nvars, dtype=np.float64) + + return H, q + + +def accel_equalities_known_endpoints( + q0: np.array, + qe: np.array +) -> Tuple[np.array, np.array]: + d = 15 + C_start = np.concatenate([np.eye(d) * i for i in [-2., 1., 0.]], axis=1) # P2 - 2P1 = -P0 + C_end = np.concatenate([np.eye(d) * i for i in [ 0., 1., -2.]], axis=1) # P2 - 2P3 = -P4 + C = np.vstack([C_start, C_end]) # (30,45) + dvec = np.concatenate([-q0, -qe]) # (30,) + return C, dvec + + +def construct_collision_constraints( + discretization_steps, + distance_to_obstacle_function, + distance_to_obstacle_threshold): + + ## we need to construct inequality constraints for the collision avoidance + ## form of inequality constraint is + ## distance_to_obstacle_function(x) <= distance_to_obstacle_threshold + ## distance_function(p(t)) <= distance_to_obstacle_threshold + ## distance_function(Ax+b) <= distance_to_obstacle_threshold + + pass + + +def get_bezier_control_points( + path: List[np.array]) -> List[np.array]: + + # assign specific times to intermediate points + # this is based on the distance between points in the path + point_times = assign_time_to_intermediate_points(path) + + q0 = path[0] + qe = path[-1] + + t_samples = np.array(list(point_times.values())) + g_samples = path[1:-1] + + l2_reg = 1e-6 + + H, q = build_H_q_quartic_bezier(q0, qe, t_samples, g_samples, l2_reg=l2_reg) + C, d = accel_equalities_known_endpoints(q0, qe) + res = quadprog_solve_qp(H, q, C=C, d=d) + + if res is None or res.size == 0: + return + + control_points = [] + for i in range(0, len(res), 15): + control_points.append(res[i:i+15]) + + return [q0] + control_points + [qe] diff --git a/rrt.py b/rrt.py new file mode 100644 index 0000000..58f2a3b --- /dev/null +++ b/rrt.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python3 +"""Standard RRT implementation for cube placement planning. + +This module contains a focused RRT implementation that works with cube +placements (pin.SE3) and optional grasping-pose callbacks. The implementation +is intentionally minimal and readable. +""" +from typing import Callable, Tuple, List, Optional + +import logging +from inverse_geometry import computeqgrasppose +import numpy as np +import pinocchio as pin +from pinocchio.utils import rotate + +from constants import EDGE_DISTANCE_TOL, DEFAULT_DISCRETISATION_STEPS, MIN_DIST_TO_OBS +from tools import setcubeplacement, distanceToObstacle +from constants import DEFAULT_VIZ_DELAY +import time + +logger = logging.getLogger(__name__) + + +from dataclasses import dataclass + + +def distance_to_obstacle_only(robot, cube, q): + '''Return the shortest distance between robot and the obstacle. ''' + geomidobs = robot.collision_model.getGeometryId('baseLink_0') + pairs = [i for i, pair in enumerate(robot.collision_model.collisionPairs) if pair.second == geomidobs] + pin.framesForwardKinematics(robot.model,robot.data,q) + pin.updateGeometryPlacements(robot.model,robot.data,robot.collision_model,robot.collision_data,q) + dists = [pin.computeDistance(robot.collision_model, robot.collision_data, idx).min_distance for idx in pairs] + + return min(dists) + + +@dataclass +class Vertex: + q: pin.SE3 + grasping_q: np.ndarray + parent: Optional['Vertex'] = None + + +class RRT: + """A small, explicit RRT tree that stores vertices of cube placements. + + Attributes: + nodes: list of Vertex objects, root at index 0. + """ + + def __init__(self, + root_q: pin.SE3, + root_grasping_q: Optional[np.ndarray], + robot: pin.robot_wrapper.RobotWrapper, + cube: pin.robot_wrapper.RobotWrapper, + get_grasping_poseq: Optional[Callable] = None, + viz=None, + viz_delay: float = DEFAULT_VIZ_DELAY, + discretisation_steps: int = DEFAULT_DISCRETISATION_STEPS, + max_ik_attempts: Optional[int] = None, + # Run-time parameters (moved into the instance so run() needs no args) + num_iter: int = 1000, + random_sampler: Optional[Callable] = None, + max_delta_q: Optional[float] = None, + cubeplacementqgoal: Optional[pin.SE3] = None, + q_goal: Optional[np.ndarray] = None, + max_time_s: Optional[float] = None): + + self.robot = robot + self.cube = cube + self.get_grasping_poseq = get_grasping_poseq + + self.nodes: List[Vertex] = [Vertex(q=root_q, grasping_q=root_grasping_q, parent=None)] + # store common runtime parameters on the RRT instance so methods don't + # need to be passed them repeatedly + self.viz = viz + self.viz_delay = viz_delay + self.discretisation_steps = discretisation_steps + self.max_ik_attempts = max_ik_attempts + # store run-time configuration so run() requires no arguments + self.num_iter = num_iter + self.random_sampler = random_sampler + self.max_delta_q = max_delta_q + self.cubeplacementqgoal = cubeplacementqgoal + self.q_goal = q_goal + self.max_time_s = max_time_s + # profiling counters for IK, viz and edge checks + + @staticmethod + def calc_distance(q1, q2) -> float: + """Euclidean distance between two translations or SE3s.""" + if hasattr(q1, 'translation'): + t1 = np.array(q1.translation) + else: + t1 = np.array(q1) + + if hasattr(q2, 'translation'): + t2 = np.array(q2.translation) + else: + t2 = np.array(q2) + + return np.linalg.norm(t2 - t1) + + @staticmethod + def lerp(q0, q1, t: float): + """Linear interpolation of translations and return a pin.SE3 placement.""" + if hasattr(q0, 'translation'): + a = np.array(q0.translation) + else: + a = np.array(q0) + + if hasattr(q1, 'translation'): + b = np.array(q1.translation) + else: + b = np.array(q1) + + trans = (1 - t) * a + t * b + return pin.SE3(rotate('z', 0.0), trans) + + def add_edge(self, parent_idx: int, new_q: pin.SE3, new_grasping_q: Optional[np.ndarray]) -> None: + self.nodes.append(Vertex(q=new_q, grasping_q=new_grasping_q, parent=self.nodes[parent_idx])) + + def get_nearest_vertex(self, q: pin.SE3) -> Tuple[int, Vertex]: + nearest_idx = 0 + smallest = float('inf') + for i, v in enumerate(self.nodes): + dist = self.calc_distance(v.q, q) + if dist < smallest: + smallest = dist + nearest_idx = i + return nearest_idx, self.nodes[nearest_idx] + + def get_q_new(self, + q_nearest_vertex: Vertex, + q_rand: pin.SE3, + max_delta_q: Optional[float] = None) -> Tuple[pin.SE3, Optional[np.ndarray], bool]: + + q_new_found = True + q_end = q_rand + + dist = self.calc_distance(q_nearest_vertex.q, q_rand) + if max_delta_q is not None and dist > max_delta_q: + q_end = self.lerp(q_nearest_vertex.q, q_rand, max_delta_q / dist) + + grasping_q = None + if self.get_grasping_poseq is not None: + # allow caller to override discretisation for this call; otherwise use instance default + dt = 1.0 / self.discretisation_steps + # seed IK with the nearest vertex grasping pose and then update the seed + seed_grasping_q = q_nearest_vertex.grasping_q + for i in range(1, self.discretisation_steps): + q = self.lerp(q_nearest_vertex.q, q_end, dt * i) + # optionally update visualization so user sees the robot attempt + if self.viz is not None: + t_v = time.perf_counter() + setcubeplacement(self.robot, self.cube, q) + q_disp = getattr(self.robot, 'q', None) + if q_disp is not None: + self.viz.display(q_disp) + else: + if hasattr(self.viz, 'render'): + self.viz.render() + dt_v = time.perf_counter() - t_v + time.sleep(self.viz_delay) + + # Use the IK wrapper to call the IK callback in a + # backwards-compatible way and time the call. We set + # viz_sleep=False so display calls (if any) don't sleep. + grasping_q, found = computeqgrasppose( + robot=self.robot, + qcurrent=seed_grasping_q, + cube=self.cube, + cubetarget=q, + viz=(self.viz if self.viz is not None else None), + max_attempts=(self.max_ik_attempts if self.max_ik_attempts is not None else 1000), + viz_sleep=False, + ) + if found and grasping_q is not None: + seed_grasping_q = grasping_q + if not found or distance_to_obstacle_only(self.robot, self.cube, grasping_q) < MIN_DIST_TO_OBS: + if i - 1 == 0: + q_new_found = False + return q_nearest_vertex.q, q_nearest_vertex.grasping_q, False + return self.lerp(q_nearest_vertex.q, q_end, dt * (i - 1)), grasping_q, True + return q_end, grasping_q, q_new_found + + def check_edge( + self, + q_latest_new: pin.SE3, + q_latest_new_grasping_pose: Optional[np.ndarray], + q_goal: pin.SE3 + ) -> bool: + dummy_vertex = Vertex(q=q_latest_new, grasping_q=q_latest_new_grasping_pose, parent=None) + q_new, q_new_grasping_q, q_new_found = self.get_q_new( + q_nearest_vertex=dummy_vertex, + q_rand=q_goal, + max_delta_q=None, + ) + if not q_new_found: + return False + distance_to_target = self.calc_distance(q_new, q_goal) + return distance_to_target < EDGE_DISTANCE_TOL + + def get_path_from_rrt(self) -> List[Vertex]: + path: List[Vertex] = [] + last = self.nodes[-1] + while last is not None: + path.insert(0, last) + last = last.parent + return path + + def get_shortcut_from_rrt_path(self) -> List[Vertex]: + path_vertices = self.get_path_from_rrt() + if len(path_vertices) <= 2: + return path_vertices + + short_path: List[Vertex] = [] + start_idx = 0 + while start_idx < len(path_vertices) - 1: + current_vertex = path_vertices[start_idx] + found_idx = None + for next_idx in range(len(path_vertices) - 1, start_idx, -1): + if self.check_edge(current_vertex.q, current_vertex.grasping_q, path_vertices[next_idx].q): + found_idx = next_idx + break + if found_idx is None: + # no shortcut found; step by one + short_path.append(current_vertex) + start_idx += 1 + else: + short_path.append(current_vertex) + start_idx = found_idx + + # always append the final vertex + if path_vertices: + short_path.append(path_vertices[-1]) + return short_path + + def run(self) -> Optional['RRT']: + """Grow the RRT using the same algorithm formerly in construct_rrt. + + This is an instance method so callers can instantiate RRT(...) with + desired runtime parameters and then run the search. Returns self on + success, or None on timeout/failure. + """ + # Metrics for logging + total_samples = 0 + successful_extensions = 0 + start_time = time.perf_counter() + + for i in range(self.num_iter): + # Abort the run if we've exceeded a user-specified wall-time budget + if self.max_time_s is not None: + elapsed_now = time.perf_counter() - start_time + if elapsed_now > self.max_time_s: + print("Stopped because of time") + return None + + total_samples += 1 + random_cp_q = self.random_sampler() + + nearest_idx, nearest_vertex = self.get_nearest_vertex(random_cp_q) + + cp_q_new, cp_q_new_grasping_pose, cp_q_new_found = self.get_q_new( + q_nearest_vertex=nearest_vertex, + q_rand=random_cp_q, + max_delta_q=self.max_delta_q, + ) + + if cp_q_new_found: + self.add_edge(parent_idx=nearest_idx, new_q=cp_q_new, new_grasping_q=cp_q_new_grasping_pose) + + if self.check_edge(cp_q_new, cp_q_new_grasping_pose, self.cubeplacementqgoal): + self.add_edge(parent_idx=len(self.nodes) - 1, new_q=self.cubeplacementqgoal, new_grasping_q=self.q_goal) + return self + + return None diff --git a/tools.py b/tools.py index 9ad021c..6c94e79 100644 --- a/tools.py +++ b/tools.py @@ -26,14 +26,14 @@ def collision(robot, q): '''Return true if in collision, false otherwise.''' pin.updateGeometryPlacements(robot.model,robot.data,robot.collision_model,robot.collision_data,q) # if pin.computeCollisions(robot.collision_model,robot.collision_data,False): - # for k in range(len(robot.collision_model.collisionPairs)): + # for k in range(len(robot.collision_model.collisionPairs)): # cr = robot.collision_data.collisionResults[k] # cp = robot.collision_model.collisionPairs[k] # if cr.isCollision(): # print("collision pair:",robot.collision_model.geometryObjects[cp.first].name,",",robot.collision_model.geometryObjects[cp.second].name,"- collision:","Yes" if cr.isCollision() else "No") - + return pin.computeCollisions(robot.collision_model,robot.collision_data,False) - + def distanceToObstacle(robot, q): '''Return the shortest distance between robot and the obstacle. ''' geomidobs = robot.collision_model.getGeometryId('obstaclebase_0') @@ -41,8 +41,8 @@ def distanceToObstacle(robot, q): pairs = [i for i, pair in enumerate(robot.collision_model.collisionPairs) if pair.second == geomidobs or pair.second == geomidtable] pin.framesForwardKinematics(robot.model,robot.data,q) pin.updateGeometryPlacements(robot.model,robot.data,robot.collision_model,robot.collision_data,q) - dists = [pin.computeDistance(robot.collision_model, robot.collision_data, idx).min_distance for idx in pairs] - + dists = [pin.computeDistance(robot.collision_model, robot.collision_data, idx).min_distance for idx in pairs] + # pairsId = [pair.first for i, pair in enumerate(robot.collision_model.collisionPairs) if pair.second == geomidobs or pair.second == geomidtable] # names = [robot.collision_model.geometryObjects[idx].name for idx in pairsId ] # for name, dist in zip(names,dists): @@ -50,27 +50,27 @@ def distanceToObstacle(robot, q): # print(min (dists)) return min(dists) - + def getcubeplacement(cube, hookname = None): oMf = cube.collision_model.geometryObjects[0].placement if hookname is not None: frameid = cube.model.getFrameId(hookname) - oMf *= cube.data.oMf[frameid] + oMf *= cube.data.oMf[frameid] return oMf - + def setcubeplacement(robot, cube, oMf): q = cube.q0 robot.visual_model.geometryObjects[-1].placement = oMf robot.collision_model.geometryObjects[-1].placement = oMf cube.visual_model.geometryObjects[-1].placement = oMf - cube.collision_model.geometryObjects[0].placement = oMf + cube.collision_model.geometryObjects[0].placement = oMf pin.updateGeometryPlacements(cube.model,cube.data,cube.collision_model,cube.collision_data,q) - - + + from setup_pinocchio import setuppinocchio -from setup_meshcat import setupmeshcat +from setup_meshcat import setupmeshcat from config import MESHCAT_URL def setupwithmeshcat(url=MESHCAT_URL): @@ -78,27 +78,27 @@ def setupwithmeshcat(url=MESHCAT_URL): robot, table, obstacle, cube = setuppinocchio() viz = setupmeshcat(robot, url) return robot, cube, viz - + from setup_pybullet import setuppybullet def setupwithpybullet(): '''setups everything to work with the robot and pybullet''' - robot, table, obstacle, cube = setuppinocchio() + robot, table, obstacle, cube = setuppinocchio() sim = setuppybullet(robot) sim.setTorqueControlMode() return robot, sim, cube - - + + def setupwithpybulletandmeshcat(url=MESHCAT_URL): '''setups everything to work with the robot, pybullet AND meshcat''' - robot, table, obstacle, cube = setuppinocchio() + robot, table, obstacle, cube = setuppinocchio() viz = setupmeshcat(robot) sim = setuppybullet(robot) sim.setTorqueControlMode() return robot, sim, cube, viz - + import time - + def rununtil(f, t, *args, **kwargs): '''starts a timer, runs a function f then waits until t seconds have passed since timer started''' t = time.perf_counter() @@ -107,4 +107,4 @@ def rununtil(f, t, *args, **kwargs): t+=0.001 while(time.perf_counter()-t<0.001): time.sleep(0.0001) # Weird loop for parallel execution (should not be needed in this project) - return result \ No newline at end of file + return result