Skip to content

Commit e97ae45

Browse files
authored
Improved termination phase for thread teardown (#56)
This commit fixes issues with the termination phase in the `MulticastMonitor` class. The workflow is currently broken due to external actions and will be fixed in later commits.
1 parent d1b134c commit e97ae45

3 files changed

Lines changed: 65 additions & 15 deletions

File tree

capiocl/monitor.h

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,12 @@ class MulticastMonitor final : public MonitorInterface {
146146

147147
std::string MULTICAST_HOME_NODE_ADDR;
148148

149+
/// @brief Flag set to true by the destructor to ask the listener threads to terminate
150+
bool terminate = false;
151+
152+
/// @brief Timeout, in milliseconds, passed to poll() by the multicast listener threads
153+
static constexpr int MULTICAST_THREAD_POLL_INTERVAL = 250;
154+
149155
/**
150156
* @brief Multicast port number.
151157
*/
@@ -183,10 +189,10 @@ class MulticastMonitor final : public MonitorInterface {
183189
* @param lock Mutex protecting shared access to committed_files.
184190
* @param ip_addr Multicast commit listen address.
185191
* @param ip_port Multicast commit listen port.
192+
* @param terminate Pointer to a boolean flag; the listener closes its socket and returns when the flag is set after a poll timeout
186193
*/
187-
[[noreturn]] static void commit_listener(std::vector<std::string> &committed_files,
188-
std::mutex &lock, const std::string &ip_addr,
189-
int ip_port);
194+
static void commit_listener(std::vector<std::string> &committed_files, std::mutex &lock,
195+
const std::string &ip_addr, int ip_port, const bool *terminate);
190196

191197
/**
192198
* @brief Background thread function to listen for commit messages.
@@ -200,9 +206,9 @@ class MulticastMonitor final : public MonitorInterface {
200206
* @param ip_addr Multicast home node listen address.
201207
* @param ip_port Multicast home node listen port.
202208
*/
203-
[[noreturn]] static void
204-
home_node_listener(std::unordered_map<std::string, std::string> &home_nodes, std::mutex &lock,
205-
const std::string &ip_addr, int ip_port);
209+
static void home_node_listener(std::unordered_map<std::string, std::string> &home_nodes,
210+
std::mutex &lock, const std::string &ip_addr, int ip_port,
211+
const bool *terminate);
206212

207213
public:
208214
/**

src/monitors/Multicast.cpp

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include <arpa/inet.h>
22
#include <netinet/in.h>
3+
#include <poll.h>
34
#include <sys/socket.h>
45

56
#include "capiocl.hpp"
@@ -74,19 +75,37 @@ static int incoming_socket_multicast(const std::string &address_ip, const int po
7475
return _socket;
7576
}
7677

77-
[[noreturn]] void
78-
capiocl::monitor::MulticastMonitor::commit_listener(std::vector<std::string> &committed_files,
79-
std::mutex &lock, const std::string &ip_addr,
80-
const int ip_port) {
78+
void capiocl::monitor::MulticastMonitor::commit_listener(std::vector<std::string> &committed_files,
79+
std::mutex &lock,
80+
const std::string &ip_addr,
81+
const int ip_port, const bool *terminate) {
82+
pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
8183
sockaddr_in addr_in = {};
8284
socklen_t addr_len = {};
8385
const auto socket = incoming_socket_multicast(ip_addr, ip_port, addr_in, addr_len);
8486
const auto addr = reinterpret_cast<sockaddr *>(&addr_in);
8587
char incoming_message[MESSAGE_SIZE] = {0};
8688

89+
// Poll the socket with a timeout so the loop can check the terminate flag instead of blocking in recvfrom
90+
pollfd pfd = {};
91+
pfd.fd = socket;
92+
pfd.events = POLLIN | POLLPRI;
93+
8794
do {
8895
bzero(incoming_message, sizeof(incoming_message));
8996

97+
// TODO: migrate to epoll for linux and kqueue on MacOS
98+
if (poll(&pfd, 1, MULTICAST_THREAD_POLL_INTERVAL) == 0) {
99+
// No data from incoming socket. Continue, awaking thread ensuring pthread_cancel points
100+
// can be reached
101+
if (*terminate) {
102+
close(socket);
103+
return;
104+
}
105+
106+
continue;
107+
}
108+
90109
// LCOV_EXCL_START
91110
if (recvfrom(socket, incoming_message, MESSAGE_SIZE, 0, addr, &addr_len) < 0) {
92111
continue;
@@ -113,9 +132,11 @@ capiocl::monitor::MulticastMonitor::commit_listener(std::vector<std::string> &co
113132
} while (true);
114133
}
115134

116-
[[noreturn]] void capiocl::monitor::MulticastMonitor::home_node_listener(
135+
void capiocl::monitor::MulticastMonitor::home_node_listener(
117136
std::unordered_map<std::string, std::string> &home_nodes, std::mutex &lock,
118-
const std::string &ip_addr, int ip_port) {
137+
const std::string &ip_addr, int ip_port, const bool *terminate) {
138+
pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
139+
119140
char this_hostname[HOST_NAME_MAX] = {};
120141
gethostname(this_hostname, HOST_NAME_MAX);
121142

@@ -129,6 +150,23 @@ capiocl::monitor::MulticastMonitor::commit_listener(std::vector<std::string> &co
129150
do {
130151
bzero(incoming_message, sizeof(incoming_message));
131152

153+
// Poll the socket with a timeout so the loop can check the terminate flag instead of blocking in recvfrom
154+
pollfd pfd = {};
155+
pfd.fd = socket;
156+
pfd.events = POLLIN | POLLPRI;
157+
158+
// TODO: migrate to epoll for linux and kqueue on MacOS
159+
if (poll(&pfd, 1, MULTICAST_THREAD_POLL_INTERVAL) == 0) {
160+
// No data from incoming socket. Continue, awaking thread ensuring pthread_cancel points
161+
// can be reached
162+
if (*terminate) {
163+
close(socket);
164+
return;
165+
}
166+
167+
continue;
168+
}
169+
132170
// LCOV_EXCL_START
133171
if (recvfrom(socket, incoming_message, MESSAGE_SIZE, 0, addr, &addr_len) < 0) {
134172
continue;
@@ -193,19 +231,23 @@ capiocl::monitor::MulticastMonitor::MulticastMonitor(
193231

194232
commit_thread =
195233
std::thread(&commit_listener, std::ref(_committed_files), std::ref(committed_lock),
196-
MULTICAST_COMMIT_ADDR, MULTICAST_COMMIT_PORT);
234+
MULTICAST_COMMIT_ADDR, MULTICAST_COMMIT_PORT, &this->terminate);
197235

198236
home_node_thread =
199237
std::thread(&home_node_listener, std::ref(_home_nodes), std::ref(home_node_lock),
200-
MULTICAST_HOME_NODE_ADDR, MULTICAST_HOME_NODE_PORT);
238+
MULTICAST_HOME_NODE_ADDR, MULTICAST_HOME_NODE_PORT, &this->terminate);
201239

202240
gethostname(_hostname, HOST_NAME_MAX);
203241
}
204242

205243
capiocl::monitor::MulticastMonitor::~MulticastMonitor() {
244+
245+
terminate = true;
246+
206247
pthread_cancel(commit_thread.native_handle());
207-
pthread_cancel(home_node_thread.native_handle());
208248
commit_thread.join();
249+
250+
pthread_cancel(home_node_thread.native_handle());
209251
home_node_thread.join();
210252
}
211253

src/webapi.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ void process_get_request(const Req &req, Res &res, Fn &&handler) {
5050
/// @brief Main WebServer thread function
5151
void server(const std::string &address, const int port, capiocl::engine::Engine *engine) {
5252

53+
pthread_setcancelstate(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
54+
5355
capiocl::printer::print(capiocl::printer::CLI_LEVEL_INFO,
5456
"Starting API server @ " + address + ":" + std::to_string(port));
5557

0 commit comments

Comments
 (0)