11#ifndef CAPIOCOMMUNICATIONSERVICE_HPP
22#define CAPIOCOMMUNICATIONSERVICE_HPP
33
4- #include " BackendInterface.hpp"
5- #include " MTCL_backend .hpp"
4+ #include " data_plane/ BackendInterface.hpp"
5+ #include " control_plane/capio_control_plane .hpp"
66
7- #include < algorithm>
8- #include < filesystem>
9-
10- #include < arpa/inet.h>
11- #include < netinet/in.h>
12- #include < sys/socket.h>
13- #include < sys/types.h>
14- #include < time.h>
7+ #include " control_plane/fs_control_plane.hpp"
8+ #include " control_plane/multicast_control_plane.hpp"
9+ #include " data_plane/MTCL_backend.hpp"
1510
1611class CapioCommunicationService {
1712
1813 char ownHostname[HOST_NAME_MAX] = {0 };
19- bool *continue_execution = new bool ;
20- std::thread *thread_server_finder_fs, *thread_server_finder_multicast;
21-
22- std::vector<std::string> token_used_to_connect;
23- std::mutex *token_used_to_connect_mutex;
24-
25- void generate_aliveness_token (const int port) const {
26- START_LOG (gettid (), " call(port=d)" , port);
27-
28- std::string token_filename (ownHostname);
29- token_filename += " .alive_connection" ;
30-
31- LOG (" Creating alive token %s" , token_filename.c_str ());
32-
33- std::ofstream FilePort (token_filename);
34- FilePort << port;
35- FilePort.close ();
36-
37- LOG (" Saved self token info to FS" );
38- std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] "
39- << " Generated token at " << token_filename << std::endl;
40- }
41-
42- void delete_aliveness_token () const {
43- START_LOG (gettid (), " call()" );
44-
45- std::string token_filename (ownHostname);
46- token_filename += " .alive_connection" ;
47- if (!std::filesystem::exists (token_filename)) {
48- LOG (" Token does not exists. Skipping delettion" );
49- return ;
50- }
51-
52- LOG (" Removing alive token %s" , token_filename.c_str ());
53- std::filesystem::remove (token_filename);
54- LOG (" Removed token" );
55- }
56-
57- static void send_multicast_alive_token (int backend_port) {
58- char hostname[HOST_NAME_MAX];
59- gethostname (hostname, HOST_NAME_MAX);
60-
61- int transmission_socket = socket (AF_INET, SOCK_DGRAM, 0 );
62- if (transmission_socket < 0 ) {
63- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << hostname << " ] "
64- << " WARNING: unable to bind multicast socket: " << strerror (errno)
65- << std::endl;
66- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << hostname << " ] "
67- << " Execution will continue only with FS discovery support" << std::endl;
68- return ;
69- }
70-
71- sockaddr_in addr = {};
72- addr.sin_family = AF_INET;
73- addr.sin_addr .s_addr = inet_addr (MULTICAST_DISCOVERY_ADDR);
74- addr.sin_port = htons (MULTICAST_DISCOVERY_PORT);
75-
76- char message[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE];
77- sprintf (message, " %s:%d" , hostname, backend_port);
78-
79- if (sendto (transmission_socket, message, strlen (message), 0 ,
80- reinterpret_cast <sockaddr *>(&addr), sizeof (addr)) < 0 ) {
81- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << hostname << " ] "
82- << " WARNING: unable to send alive token(" << message
83- << " ) to multicast address!: " << strerror (errno) << std::endl;
84- }
85- close (transmission_socket);
86- }
87-
88- static void find_new_server_from_multicast_thread (
89- const bool *continue_execution, std::vector<std::string> *token_used_to_connect,
90- std::mutex *token_used_to_connect_mutex, int backend_port) {
91- char incomingMessage[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE];
92- char ownHostname[HOST_NAME_MAX];
93- gethostname (ownHostname, HOST_NAME_MAX);
94-
95- int discovery_socket = socket (AF_INET, SOCK_DGRAM, 0 );
96- if (discovery_socket < 0 ) {
97- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] "
98- << " WARNING: unable to open multicast socket: " << strerror (errno)
99- << std::endl;
100- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] "
101- << " Execution will continue only with FS discovery support" << std::endl;
102- return ;
103- }
104-
105- u_int multiple_socket_on_same_address = 1 ;
106- if (setsockopt (discovery_socket, SOL_SOCKET, SO_REUSEADDR,
107- (char *) &multiple_socket_on_same_address,
108- sizeof (multiple_socket_on_same_address)) < 0 ) {
109- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] "
110- << " WARNING: unable to assign multiple sockets to same address: "
111- << strerror (errno) << std::endl;
112- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] "
113- << " Execution will continue only with FS discovery support" << std::endl;
114- return ;
115- }
116-
117- struct sockaddr_in addr = {};
118- addr.sin_family = AF_INET;
119- addr.sin_addr .s_addr = htonl (INADDR_ANY);
120- addr.sin_port = htons (MULTICAST_DISCOVERY_PORT);
121- socklen_t addrlen = sizeof (addr);
122-
123- // bind to receive address
124- if (bind (discovery_socket, reinterpret_cast <struct sockaddr *>(&addr), sizeof (addr)) < 0 ) {
125-
126- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] "
127- << " WARNING: unable to bind multicast socket: " << strerror (errno)
128- << std::endl;
129- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] "
130- << " Execution will continue only with FS discovery support" << std::endl;
131- return ;
132- }
133-
134- struct ip_mreq mreq;
135- mreq.imr_multiaddr .s_addr = inet_addr (MULTICAST_DISCOVERY_ADDR);
136- mreq.imr_interface .s_addr = htonl (INADDR_ANY);
137- if (setsockopt (discovery_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof (mreq)) < 0 ) {
138- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] "
139- << " WARNING: unable to join multicast group: " << strerror (errno)
140- << std::endl;
141- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] "
142- << " Execution will continue only with FS discovery support" << std::endl;
143- return ;
144- }
145-
146- while (*continue_execution) {
147- bzero (incomingMessage, sizeof (incomingMessage));
148- send_multicast_alive_token (backend_port);
149- if (recvfrom (discovery_socket, incomingMessage, MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE, 0 ,
150- reinterpret_cast <sockaddr *>(&addr), &addrlen) < 0 ) {
151- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] "
152- << " WARNING: recvied < 0 bytes from multicast: " << strerror (errno)
153- << std::endl;
154- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] "
155- << " Execution will continue only with FS discovery support" << std::endl;
156- return ;
157- }
158-
159- if (std::string (incomingMessage) ==
160- std::string (ownHostname) + " :" + std::to_string (backend_port)) {
161- // skip myself
162- continue ;
163- }
164-
165- std::lock_guard lg (*token_used_to_connect_mutex);
166- if (std::find (token_used_to_connect->begin (), token_used_to_connect->end (),
167- incomingMessage) == token_used_to_connect->end ()) {
168- std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] "
169- << " Connecting to " << incomingMessage << " from multicast advert."
170- << std::endl;
171- token_used_to_connect->push_back (incomingMessage);
172- capio_backend->connect_to (incomingMessage);
173- }
174-
175- sleep (1 );
176- }
177- }
178-
179- /*
180- * Monitor the file system for the presence of tokens.
181- */
182- static void
183- find_new_server_from_fs_token_thread (const bool *continue_execution,
184- std::vector<std::string> *token_used_to_connect,
185- std::mutex *token_used_to_connect_mutex) {
186- START_LOG (gettid (), " call()" );
187-
188- if (!continue_execution) {
189- LOG (" Terminating execution" );
190- return ;
191- }
192-
193- auto dir_iterator = std::filesystem::directory_iterator (std::filesystem::current_path ());
194- for (const auto &entry : dir_iterator) {
195- const auto token_path = entry.path ();
196-
197- if (!entry.is_regular_file () || token_path.extension () != " .alive_connection" ) {
198- LOG (" Filename %s is not valid" , entry.path ().c_str ());
199- continue ;
200- }
201-
202- LOG (" Found token %s" , token_path.c_str ());
203-
204- std::ifstream MyReadFile (token_path.filename ());
205- std::string remoteHost = entry.path ().stem (), remotePort;
206- LOG (" Testing for file: %s (hostname: %s, port=%s)" , entry.path ().filename ().c_str (),
207- remoteHost.c_str (), remotePort.c_str ());
208-
209- getline (MyReadFile, remotePort);
210- MyReadFile.close ();
211-
212- const auto hostname_port = std::string (remoteHost) + " :" + remotePort;
213- std::lock_guard lock (*token_used_to_connect_mutex);
214- if (std::find (token_used_to_connect->begin (), token_used_to_connect->end (),
215- hostname_port) != token_used_to_connect->end ()) {
216- LOG (" Token already handled... skipping it!" );
217- continue ;
218- };
219-
220- // TODO: as of now we will not connect with servers
221- // TODO: that terminates and then comes back up online...
222- token_used_to_connect->push_back (hostname_port);
223- capio_backend->connect_to (std::string (remoteHost) + " :" + remotePort);
224- }
225- LOG (" Terminated loop. sleeping one second" );
226- sleep (1 );
227- }
22814
22915 public:
23016 ~CapioCommunicationService () {
231- *continue_execution = false ;
232-
233- pthread_cancel (thread_server_finder_multicast->native_handle ());
234- pthread_cancel (thread_server_finder_fs->native_handle ());
235- thread_server_finder_fs->join ();
236- thread_server_finder_multicast->join ();
237-
238- delete_aliveness_token ();
17+ delete capio_control_plane;
23918 delete capio_backend;
240- delete token_used_to_connect_mutex;
24119 };
24220
243- CapioCommunicationService (std::string &backend_name, const int port) {
21+ CapioCommunicationService (std::string &backend_name, const int port,
22+ const std::string &control_plane_backend = " multicast" ) {
24423 START_LOG (gettid (), " call(backend_name=%s)" , backend_name.c_str ());
245- *continue_execution = true ;
24624 gethostname (ownHostname, HOST_NAME_MAX);
24725 LOG (" My hostname is %s. Starting to listen on connection" , ownHostname);
24826
249- token_used_to_connect_mutex = new std::mutex ();
250-
25127 if (backend_name == " MQTT" || backend_name == " MPI" ) {
25228 std::cout
25329 << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] "
@@ -274,14 +50,16 @@ class CapioCommunicationService {
27450 << std::endl;
27551 ERR_EXIT (" No valid backend was provided" );
27652 }
277- generate_aliveness_token (port);
278- thread_server_finder_fs =
279- new std::thread (find_new_server_from_fs_token_thread, std::ref (continue_execution),
280- &token_used_to_connect, token_used_to_connect_mutex);
28153
282- thread_server_finder_multicast =
283- new std::thread (find_new_server_from_multicast_thread, std::ref (continue_execution),
284- &token_used_to_connect, token_used_to_connect_mutex, port);
54+ if (control_plane_backend == " multicast" ) {
55+ std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] "
56+ << " Starting multicast control plane" << std::endl;
57+ capio_control_plane = new MulticastControlPlane (port);
58+ } else {
59+ std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] "
60+ << " Starting file system control plane" << std::endl;
61+ capio_control_plane = new FSControlPlane (port);
62+ }
28563 }
28664};
28765
0 commit comments