Skip to content

Commit 6322048

Browse files
subt: add rospy+zmq based communication with cloudsim (#523)
general changes: * cloudsim2osgar.py: - use `rospy` to get data from ROS - serialize using `msgpack` - send over `zmq.PUSH` socket as multipart message (channel, data) * pull.py: - receive multipart (channel, data) message over `zmq.PULL` socket - republish on osgar channel advantages * no hand-parsing of ROS serialization format * logging or compression can be set per channel * allows multiple `zmq.PUSH` nodes using channel for identification specific changes: * cloudsim2osgar.py + ros_proxy_node.cc: move `rot`, `orientation` and `acc` from `cc` to `py`
1 parent 4924cf4 commit 6322048

7 files changed

Lines changed: 177 additions & 22 deletions

File tree

subt/cloudsim2osgar.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#!/usr/bin/env python2
2+
3+
import errno
4+
import math
5+
import socket
6+
import sys
7+
import threading
8+
import time
9+
10+
import rospy
11+
import zmq
12+
import msgpack
13+
14+
from sensor_msgs.msg import Imu
15+
16+
17+
def py3round(f):
18+
if abs(round(f) - f) == 0.5:
19+
return int(2.0 * round(f / 2.0))
20+
return int(round(f))
21+
22+
23+
def wait_for_master():
24+
"""Return when /use_sim_time is set in rosparams. If rospy.init_node is called before /use_sim_time
25+
is set, it uses real time."""
26+
print("Waiting for rosmaster")
27+
start = time.time()
28+
last_params = []
29+
while not rospy.is_shutdown():
30+
try:
31+
params = rospy.get_param_names()
32+
if params != last_params:
33+
print(time.time()-start, params)
34+
if '/use_sim_time' in params:
35+
return True
36+
last_params = params
37+
time.sleep(0.01)
38+
except socket.error as serr:
39+
if serr.errno != errno.ECONNREFUSED:
40+
raise serr # re-raise error if its not the one we want
41+
time.sleep(0.5)
42+
43+
class Bus:
44+
def __init__(self):
45+
self.lock = threading.Lock()
46+
context = zmq.Context.instance()
47+
self.push = context.socket(zmq.PUSH)
48+
self.push.setsockopt(zmq.LINGER, 100) # milliseconds
49+
self.push.bind('tcp://*:5565')
50+
51+
def register(self, *outputs):
52+
pass
53+
54+
def publish(self, channel, data):
55+
raw = msgpack.packb(data, use_bin_type=True)
56+
with self.lock:
57+
self.push.send_multipart([channel, raw])
58+
59+
60+
class main:
61+
def __init__(self):
62+
# get cloudsim ready
63+
robot_name = sys.argv[1]
64+
imu_name = '/'+robot_name+'/imu/data'
65+
wait_for_master()
66+
rospy.init_node('imu2osgar', log_level=rospy.DEBUG)
67+
rospy.loginfo("waiting for {}".format(imu_name))
68+
rospy.wait_for_message(imu_name, Imu)
69+
rospy.sleep(2)
70+
self.imu_count = 0
71+
72+
# start
73+
self.bus = Bus()
74+
self.bus.register('rot', 'acc', 'orientation')
75+
rospy.Subscriber(imu_name, Imu, self.imu)
76+
rospy.spin()
77+
78+
def imu(self, msg):
79+
self.imu_count += 1
80+
rospy.loginfo_throttle(10, "imu callback: {}".format(self.imu_count))
81+
acc = [msg.linear_acceleration.x, msg.linear_acceleration.y, msg.linear_acceleration.z]
82+
orientation = [msg.orientation.x, msg.orientation.y, msg.orientation.z, msg.orientation.w]
83+
84+
# copy & paste from rosmsg
85+
q0, q1, q2, q3 = orientation # quaternion
86+
x = math.atan2(2 * (q0 * q1 + q2 * q3), 1 - 2 * (q1 * q1 + q2 * q2))
87+
y = math.asin(2 * (q0 * q2 - q3 * q1))
88+
z = math.atan2(2 * (q0 * q3 + q1 * q2), 1 - 2 * (q2 * q2 + q3 * q3))
89+
rot = [x, y, z]
90+
# end of copy & paste from rosmsg
91+
92+
self.bus.publish('rot', [py3round(math.degrees(angle) * 100) for angle in rot])
93+
self.bus.publish('acc', [py3round(x * 1000) for x in acc])
94+
self.bus.publish('orientation', orientation)
95+
# preliminary suggestion for combined message
96+
#angular_velocity = [msg.angular_velocity.x, msg.angular_velocity.y, msg.angular_velocity.z]
97+
#data = [
98+
# msg.header.stamp.to_nsec()/1000000, # time in milliseconds
99+
# orientation,
100+
# angular_velocity,
101+
# acc,
102+
#]
103+
#self.bus.publish('imu', data)
104+
105+
106+
if __name__ == '__main__':
107+
if len(sys.argv) < 2:
108+
print("need robot name as argument")
109+
raise SystemExit(1)
110+
main()
111+

subt/docker/base/Dockerfile

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,10 @@ RUN ./env/bin/pip install --no-cache-dir \
9797

9898
ENV NVIDIA_VISIBLE_DEVICES ${NVIDIA_VISIBLE_DEVICES:-all}
9999
ENV NVIDIA_DRIVER_CAPABILITIES ${NVIDIA_DRIVER_CAPABILITIES:+$NVIDIA_DRIVER_CAPABILITIES,}graphics,compat32,utility
100+
101+
RUN apt-get update \
102+
&& DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends python-pip python-wheel \
103+
&& apt-get clean \
104+
&& pip2 install --no-cache-dir --user \
105+
"msgpack==1.0.0" \
106+
"pyzmq==19.0.0"

subt/docker/robotika/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM robotika/subt-base:2020-05-18
1+
FROM robotika/subt-base:2020-05-26
22

33
RUN sudo apt-get update && sudo apt install -y ros-melodic-teleop-twist-keyboard
44

subt/docker/robotika/run_solution.bash

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ echo "Starting ros<->osgar proxy"
4343
# http://wiki.ros.org/roslaunch/Commandline%20Tools#line-45
4444
roslaunch $LAUNCH_FILE --wait robot_name:=$ROBOT_NAME &
4545

46+
/osgar-ws/src/osgar/subt/cloudsim2osgar.py $ROBOT_NAME &
47+
4648
echo "Starting osgar"
4749
export OSGAR_LOGS=/osgar-ws/logs
4850
/osgar-ws/env/bin/python3 -m subt run /osgar-ws/src/osgar/subt/$CONFIG_FILE --side auto --walldist 0.8 --timeout 100 --speed 1.0 --note "run_solution.bash"

subt/pull.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import contextlib
2+
3+
from threading import Thread
4+
5+
import zmq
6+
7+
from osgar.bus import BusShutdownException
8+
import osgar.lib.serialize
9+
10+
class Pull:
11+
12+
def __init__(self, config, bus):
13+
bus.register(*config['outputs'])
14+
self.endpoint = config.get('endpoint', 'tcp://127.0.0.1:5565')
15+
self.timeout = config.get('timeout', 1) # default recv timeout 1s
16+
self.thread = Thread(target=self.run)
17+
self.thread.name = bus.name
18+
self.bus = bus
19+
20+
def start(self):
21+
self.thread.start()
22+
23+
def join(self, timeout=None):
24+
self.thread.join(timeout=timeout)
25+
26+
def run(self):
27+
context = zmq.Context.instance()
28+
socket = context.socket(zmq.PULL)
29+
# https://stackoverflow.com/questions/7538988/zeromq-how-to-prevent-infinite-wait
30+
socket.RCVTIMEO = int(self.timeout * 1000) # convert to milliseconds
31+
socket.connect(self.endpoint)
32+
33+
with contextlib.closing(socket):
34+
while self.bus.is_alive():
35+
try:
36+
channel, raw = socket.recv_multipart()
37+
message = osgar.lib.serialize.deserialize(raw)
38+
self.bus.publish(channel.decode('ascii'), message)
39+
except zmq.error.Again:
40+
pass
41+
42+
def request_stop(self):
43+
self.bus.shutdown()
44+
45+

subt/ros/proxy/ros_proxy_node.cc

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,14 @@
1818
// Goal:
1919
// - handle all artifacts, ROS communication
2020
// - send speed commands
21-
// - redirect IMU, Odom, Scan and Image messages to Python3 (via ZeroMQ?)
21+
// - redirect Odom, Scan and Image messages to Python3 (via ZeroMQ?)
2222

2323
#include <chrono>
2424
#include <thread>
2525
#include <fstream>
2626

2727
#include <geometry_msgs/Twist.h>
2828
#include <rosgraph_msgs/Clock.h>
29-
#include <sensor_msgs/Imu.h>
3029
#include <sensor_msgs/LaserScan.h>
3130
#include <sensor_msgs/CompressedImage.h>
3231
#include <nav_msgs/Odometry.h>
@@ -56,7 +55,6 @@
5655

5756

5857
int g_countClock = 0;
59-
int g_countImu = 0;
6058
int g_countScan = 0;
6159
int g_countImage = 0;
6260
int g_countOdom = 0;
@@ -88,15 +86,6 @@ void clockCallback(const rosgraph_msgs::Clock::ConstPtr& msg)
8886
g_countClock++;
8987
}
9088

91-
void imuCallback(const sensor_msgs::Imu::ConstPtr& msg)
92-
{
93-
ros::SerializedMessage sm = ros::serialization::serializeMessage(*msg);
94-
zmq_send(g_responder, sm.buf.get(), sm.num_bytes, 0);
95-
if(g_countImu % 100 == 0)
96-
ROS_INFO("received Imu %d ", g_countImu);
97-
g_countImu++;
98-
}
99-
10089
void scanCallback(const sensor_msgs::LaserScan::ConstPtr& msg)
10190
{
10291
ros::SerializedMessage sm = ros::serialization::serializeMessage(*msg);
@@ -219,7 +208,6 @@ class Controller
219208
subt_msgs::PoseFromArtifact originSrv;
220209

221210
ros::Subscriber subClock;
222-
ros::Subscriber subImu;
223211
ros::Subscriber subScan;
224212
ros::Subscriber subImage;
225213
ros::Subscriber subImage2; // workaround for two identical topics with different name
@@ -283,9 +271,6 @@ Controller::Controller(const std::string &_name)
283271
ros::service::waitForService("/subt/pose_from_artifact_origin", -1);
284272

285273
// Waiting from some message related to robot so that we know the robot is already in the simulation
286-
ROS_INFO_STREAM("Waiting for " << this->name << "/imu/data");
287-
ros::topic::waitForMessage<sensor_msgs::Imu>(this->name + "/imu/data", this->n);
288-
289274
ROS_INFO_STREAM("Waiting for " << this->name << "/front/depth");
290275
ros::topic::waitForMessage<sensor_msgs::Image>(this->name + "/front/depth", this->n);
291276

@@ -354,7 +339,6 @@ void Controller::Update(const ros::TimerEvent&)
354339
"/robot_data", 1);
355340

356341
this->subClock = n.subscribe("/clock", 1000, clockCallback);
357-
this->subImu = n.subscribe(this->name + "/imu/data", 1000, imuCallback);
358342
this->subScan = n.subscribe(this->name + "/front_scan", 1000, scanCallback);
359343
this->subImage = n.subscribe(this->name + "/front/image_raw/compressed", 1000, imageCallback);
360344
this->subImage2 = n.subscribe(this->name + "/image_raw/compressed", 1000, imageCallback);

subt/zmq-subt-x2.json

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@
5959
"in": ["depth", "scan"],
6060
"out": ["scan"],
6161
"init": {}
62+
},
63+
"rospy": {
64+
"driver": "subt.pull:Pull",
65+
"init": {
66+
"outputs": ["rot", "acc", "orientation"]
67+
}
6268
}
6369
},
6470
"links": [["app.desired_speed", "rosmsg.desired_speed"],
@@ -69,10 +75,10 @@
6975
["receiver.raw", "rosmsg.raw"],
7076
["rosmsg.cmd", "transmitter.raw"],
7177

72-
["rosmsg.rot", "app.rot"],
73-
["rosmsg.rot", "depth2scan.rot"],
74-
["rosmsg.acc", "app.acc"],
75-
["rosmsg.orientation", "app.orientation"],
78+
["rospy.rot", "app.rot"],
79+
["rospy.rot", "depth2scan.rot"],
80+
["rospy.acc", "app.acc"],
81+
["rospy.orientation", "app.orientation"],
7682

7783
["rosmsg.scan", "detector.scan"],
7884
["rosmsg.scan", "depth2scan.scan"],

0 commit comments

Comments
 (0)