From dfe14d7bd6b7092a735f3f8358a2fb65119d614e Mon Sep 17 00:00:00 2001 From: marcoSanti Date: Sun, 26 Apr 2026 13:20:01 +0200 Subject: [PATCH 1/4] Added Capio Discovery Service This commit adds the CAPIO discovery service which allows a CAPIO server instance to detect other running server instances. It also moves the CAPIO canary flag to be managed by the discovery service, and changes the CLI parsing logic accordingly to correctly manage the startup process. --- capio/common/shm.hpp | 1 + capio/server/capio_server.cpp | 6 +- capio/server/include/remote/backend.hpp | 6 + capio/server/include/remote/backend/mpi.hpp | 1 + capio/server/include/remote/backend/none.hpp | 1 + capio/server/include/remote/discovery.hpp | 60 ++++++++++ capio/server/include/utils/signals.hpp | 10 +- capio/server/src/discovery_service.cpp | 109 +++++++++++++++++++ capio/server/src/mpi_backend.cpp | 1 + capio/server/src/none_backend.cpp | 3 +- capio/server/src/shm_canary.cpp | 27 +++++ capio/tests/unit/server/src/capio_file.cpp | 1 + 12 files changed, 220 insertions(+), 6 deletions(-) create mode 100644 capio/server/include/remote/discovery.hpp create mode 100644 capio/server/src/discovery_service.cpp create mode 100644 capio/server/src/shm_canary.cpp diff --git a/capio/common/shm.hpp b/capio/common/shm.hpp index c425e080b..99c113289 100644 --- a/capio/common/shm.hpp +++ b/capio/common/shm.hpp @@ -5,6 +5,7 @@ #include #include +#include #include #include #include diff --git a/capio/server/capio_server.cpp b/capio/server/capio_server.cpp index 403ec8f49..5ee3f08cb 100644 --- a/capio/server/capio_server.cpp +++ b/capio/server/capio_server.cpp @@ -28,6 +28,7 @@ #include "common/requests.hpp" #include "common/semaphore.hpp" #include "remote/backend.hpp" +#include "remote/discovery.hpp" #include "storage/capio_file.hpp" #include "utils/common.hpp" #include "utils/env.hpp" @@ -36,6 +37,7 @@ ClientManager *client_manager; StorageManager *storage_manager; Backend *backend; +DiscoveryService 
*discovery_service; #include "handlers.hpp" #include "utils/cli_parser.hpp" @@ -140,13 +142,13 @@ int main(int argc, char **argv) { capio_cl_engine->print(); - backend = select_backend(configuration.backend_name, argc, argv); + discovery_service = new DiscoveryService(); + backend = select_backend(configuration.backend_name, argc, argv); START_LOG(gettid(), "call()"); open_files_location(); - shm_canary = new CapioShmCanary(capio_cl_engine->getWorkflowName()); storage_manager = new StorageManager(); client_manager = new ClientManager(); diff --git a/capio/server/include/remote/backend.hpp b/capio/server/include/remote/backend.hpp index f1c827821..50b336b73 100644 --- a/capio/server/include/remote/backend.hpp +++ b/capio/server/include/remote/backend.hpp @@ -87,6 +87,12 @@ class Backend { * @param target */ virtual void send_request(const char *message, int message_len, const std::string &target) = 0; + + /** + * Connect this server instance to a remote server instance + * @param target Remote server instance identification + */ + virtual void connect_to(const std::string &target) = 0; }; #endif // CAPIO_SERVER_REMOTE_BACKEND_HPP diff --git a/capio/server/include/remote/backend/mpi.hpp b/capio/server/include/remote/backend/mpi.hpp index fca12a752..be185f01e 100644 --- a/capio/server/include/remote/backend/mpi.hpp +++ b/capio/server/include/remote/backend/mpi.hpp @@ -27,6 +27,7 @@ class MPIBackend : public Backend { void send_file(char *shm, long int nbytes, const std::string &target) override; void send_request(const char *message, int message_len, const std::string &target) override; void recv_file(char *shm, const std::string &source, long int bytes_expected) override; + void connect_to(const std::string &target) override; }; class MPISYNCBackend final : public MPIBackend { diff --git a/capio/server/include/remote/backend/none.hpp b/capio/server/include/remote/backend/none.hpp index 3faae205f..c0cd2298a 100644 --- a/capio/server/include/remote/backend/none.hpp +++ 
b/capio/server/include/remote/backend/none.hpp @@ -11,5 +11,6 @@ class NoneBackend final : public Backend { void send_file(char *shm, long int nbytes, const std::string &target) override; void send_request(const char *message, int message_len, const std::string &target) override; void recv_file(char *shm, const std::string &source, long int bytes_expected) override; + void connect_to(const std::string &target) override; }; #endif // CAPIO_SERVER_REMOTE_BACKEND_NONE_HPP diff --git a/capio/server/include/remote/discovery.hpp b/capio/server/include/remote/discovery.hpp new file mode 100644 index 000000000..eefc95f90 --- /dev/null +++ b/capio/server/include/remote/discovery.hpp @@ -0,0 +1,60 @@ +#ifndef CAPIO_DISCOVERY_HPP +#define CAPIO_DISCOVERY_HPP + +#include "common/shm.hpp" +#include +#include + +class CapioShmCanary { + int _shm_id; + std::string _canary_name; + + public: + explicit CapioShmCanary(std::string capio_workflow_name); + ~CapioShmCanary(); +}; + +class DiscoveryService { + bool terminate = false; + + /// @brief Handle for thread listening for other server instances + std::thread *listener_thread = nullptr; + /// @brief Handle for thread advertising this server instance + std::thread *advertisement_thread = nullptr; + + /// @brief Token to be advertised by this server + std::string advertisement_token; + + /// @brief Canary variable to detect other server instances running locally that are logically + /// equivalent to the one starting up + CapioShmCanary *shm_canary; + + public: + DiscoveryService(); + ~DiscoveryService(); + + /** + * Set the token to be advertised so that other server instance may connect to this instance. 
+ * Token needs to be provided by an instance of a backend, according to backend specification + * for incoming connection + * @param token + */ + void setAdvertisementToken(const std::string &token); + + /** + * Start to advertise the token, and to scan for tokens from other servers + * @param adv_delay Delay between each advertisement. + */ + void start(unsigned int adv_delay); + + /** + * Stop current server instance from advertising itself and from receiving advertisements from + * other server instances. + * + * NOTE: this method does not destroy the CAPIO canary variable. for that the destruction of the + * class instance is required. + */ + void stop(); +}; + +#endif // CAPIO_DISCOVERY_HPP \ No newline at end of file diff --git a/capio/server/include/utils/signals.hpp b/capio/server/include/utils/signals.hpp index 94660c0a1..0f711611d 100644 --- a/capio/server/include/utils/signals.hpp +++ b/capio/server/include/utils/signals.hpp @@ -4,6 +4,7 @@ #include #include "remote/backend.hpp" +#include "remote/discovery.hpp" #include "server_println.hpp" #ifdef CAPIO_COVERAGE @@ -23,19 +24,22 @@ void sig_term_handler(int signum, siginfo_t *info, void *ptr) { } // free all the memory used - + discovery_service->stop(); delete client_manager; delete storage_manager; server_println("data_buffers cleanup completed", CapioCLEngine::get().getWorkflowName(), CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, __func__); + delete backend; + delete client_manager; + delete storage_manager; + #ifdef CAPIO_COVERAGE __gcov_dump(); #endif - delete backend; - delete shm_canary; + delete discovery_service; server_println("shutdown completed", CapioCLEngine::get().getWorkflowName(), CAPIO_LOG_SERVER_CLI_LEVEL_INFO, __func__); diff --git a/capio/server/src/discovery_service.cpp b/capio/server/src/discovery_service.cpp new file mode 100644 index 000000000..2a337819e --- /dev/null +++ b/capio/server/src/discovery_service.cpp @@ -0,0 +1,109 @@ +#include +#include + +#include "common/logger.hpp" 
+#include "remote/backend.hpp" +#include "remote/discovery.hpp" +#include "utils/capiocl_adapter.hpp" +#include "utils/common.hpp" + +extern Backend *backend; + +constexpr char CAPIO_MULTICAST_ADDRESS[] = "224.0.0.2"; +constexpr int CAPIO_MULTICAST_PORT = 22334; +int REUSE_MCAST_SOCKET = 1; + +void advertise(const bool *terminate, const unsigned int delay_ms, + const std::string &advertisement_token) { + const int advert_sock_fd = socket(AF_INET, SOCK_DGRAM, 0); + sockaddr_in advert_multicast_addr{}; + advert_multicast_addr.sin_family = AF_INET; + advert_multicast_addr.sin_port = htons(CAPIO_MULTICAST_PORT); + advert_multicast_addr.sin_addr.s_addr = inet_addr(CAPIO_MULTICAST_ADDRESS); + + while (!*terminate) { + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + sendto(advert_sock_fd, advertisement_token.data(), advertisement_token.size(), 0, + reinterpret_cast(&advert_multicast_addr), sizeof(advert_multicast_addr)); + } + + close(advert_sock_fd); +} + +void thread_discovery_service(const bool *terminate) { + START_LOG(gettid(), "call()"); + + int sockfd = socket(AF_INET, SOCK_DGRAM, 0); + + setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &REUSE_MCAST_SOCKET, sizeof(REUSE_MCAST_SOCKET)); + + timeval tv{}; + tv.tv_sec = 0; + tv.tv_usec = 100000; // 100,000 microseconds = 100ms + setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + + sockaddr_in local_addr{}; + local_addr.sin_family = AF_INET; + local_addr.sin_port = htons(CAPIO_MULTICAST_PORT); + local_addr.sin_addr.s_addr = htonl(INADDR_ANY); + bind(sockfd, reinterpret_cast(&local_addr), sizeof(local_addr)); + + ip_mreq mreq{}; + mreq.imr_multiaddr.s_addr = inet_addr(CAPIO_MULTICAST_ADDRESS); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + + char incoming_token[2 * HOST_NAME_MAX] = {0}; + + while (!*terminate) { + + bzero(incoming_token, 2 * HOST_NAME_MAX); + + if (recvfrom(sockfd, incoming_token, 
sizeof(incoming_token) - 1, 0, nullptr, nullptr) > 0) { + backend->connect_to(incoming_token); + } + } + close(sockfd); +} + +void DiscoveryService::start(unsigned int adv_delay) { + if (advertisement_token.empty()) { + throw std::runtime_error("Advertisement token is empty"); + } + + listener_thread = new std::thread(thread_discovery_service, &terminate); + advertisement_thread = + new std::thread(advertise, &terminate, adv_delay, std::ref(advertisement_token)); + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "DiscoveryService will advertise " + + advertisement_token + " every " + + std::to_string(adv_delay) + "ms."); +} +void DiscoveryService::stop() { + terminate = true; + + if (listener_thread != nullptr && listener_thread->joinable()) { + listener_thread->join(); + listener_thread = nullptr; + } + + if (listener_thread != nullptr && advertisement_thread->joinable()) { + advertisement_thread->join(); + advertisement_thread = nullptr; + } +} + +DiscoveryService::DiscoveryService() { + shm_canary = new CapioShmCanary(CapioCLEngine::get().getWorkflowName()); +} + +DiscoveryService::~DiscoveryService() { + if (!terminate) { + stop(); + } + delete shm_canary; +} + +void DiscoveryService::setAdvertisementToken(const std::string &token) { + this->advertisement_token = token; +} \ No newline at end of file diff --git a/capio/server/src/mpi_backend.cpp b/capio/server/src/mpi_backend.cpp index ead84a42c..00ebe0aec 100644 --- a/capio/server/src/mpi_backend.cpp +++ b/capio/server/src/mpi_backend.cpp @@ -119,6 +119,7 @@ void MPIBackend::recv_file(char *shm, const std::string &source, long int bytes_ LOG("Chunk size is %ld bytes", bytes_received); } } +void MPIBackend::connect_to(const std::string &target) { return; } MPISYNCBackend::MPISYNCBackend(int argc, char *argv[]) : MPIBackend(argc, argv) { START_LOG(gettid(), "call()"); diff --git a/capio/server/src/none_backend.cpp b/capio/server/src/none_backend.cpp index e0e8e3208..588d5d843 100644 --- 
a/capio/server/src/none_backend.cpp +++ b/capio/server/src/none_backend.cpp @@ -30,4 +30,5 @@ void NoneBackend::send_request(const char *message, const int message_len, void NoneBackend::recv_file(char *shm, const std::string &source, const long int bytes_expected) { START_LOG(gettid(), "call(shm=%ld, source=%s, bytes_expected=%ld)", shm, source.c_str(), bytes_expected); -} \ No newline at end of file +} +void NoneBackend::connect_to(const std::string &target) { return; } \ No newline at end of file diff --git a/capio/server/src/shm_canary.cpp b/capio/server/src/shm_canary.cpp new file mode 100644 index 000000000..1c9321880 --- /dev/null +++ b/capio/server/src/shm_canary.cpp @@ -0,0 +1,27 @@ +#include "common/env.hpp" +#include "common/logger.hpp" +#include "remote/discovery.hpp" +#include "utils/common.hpp" + +CapioShmCanary::CapioShmCanary(std::string capio_workflow_name) + : _canary_name(capio_workflow_name) { + START_LOG(capio_syscall(SYS_gettid), "call(capio_workflow_name: %s)", _canary_name.data()); + if (_canary_name.empty()) { + _canary_name = get_capio_workflow_name(); + } + _shm_id = shm_open(_canary_name.data(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); + if (_shm_id == -1) { + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Error: canary variable " + _canary_name + " already exists!"); + LOG(CAPIO_SHM_CANARY_ERROR, _canary_name.data()); + ERR_EXIT("ERR: shm canary flag already exists"); + } +} + +CapioShmCanary::~CapioShmCanary() { + START_LOG(capio_syscall(SYS_gettid), "call()"); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Removing shared memory canary flag"); + close(_shm_id); + SHM_DESTROY_CHECK(_canary_name.c_str()); +} diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index db7ead849..28095dfe9 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -338,6 +338,7 @@ class MockBackend : public Backend { RemoteRequest read_next_request() 
override { return {nullptr, ""}; } void send_file(char *shm, long int nbytes, const std::string &target) override {} void send_request(const char *message, int message_len, const std::string &target) override {} + void connect_to(const std::string &target) override {} }; class MockBackendTestFixture : public ::testing::Test { From 5f76798556caec6ac79a41442786befef9b2fe9b Mon Sep 17 00:00:00 2001 From: marcoSanti Date: Sun, 26 Apr 2026 14:08:14 +0200 Subject: [PATCH 2/4] Began work to add FS based service discovery --- .gitignore | 2 ++ capio/server/include/remote/discovery.hpp | 27 ++++++++++++++--------- capio/server/include/utils/shm_canary.hpp | 13 +++++++++++ capio/server/src/discovery_service.cpp | 15 +++++++++++++ capio/server/src/shm_canary.cpp | 5 +++-- 5 files changed, 49 insertions(+), 13 deletions(-) create mode 100644 capio/server/include/utils/shm_canary.hpp diff --git a/.gitignore b/.gitignore index d29456e74..20776d91a 100644 --- a/.gitignore +++ b/.gitignore @@ -48,4 +48,6 @@ capio_logs debug build +.capio_tokens + cmake_test_discovery*.json diff --git a/capio/server/include/remote/discovery.hpp b/capio/server/include/remote/discovery.hpp index eefc95f90..6b8630844 100644 --- a/capio/server/include/remote/discovery.hpp +++ b/capio/server/include/remote/discovery.hpp @@ -1,20 +1,14 @@ #ifndef CAPIO_DISCOVERY_HPP #define CAPIO_DISCOVERY_HPP -#include "common/shm.hpp" #include #include -class CapioShmCanary { - int _shm_id; - std::string _canary_name; - - public: - explicit CapioShmCanary(std::string capio_workflow_name); - ~CapioShmCanary(); -}; +#include "utils/shm_canary.hpp" class DiscoveryService { + + /// @brief Variable used to signal termination to child threads bool terminate = false; /// @brief Handle for thread listening for other server instances @@ -29,20 +23,31 @@ class DiscoveryService { /// equivalent to the one starting up CapioShmCanary *shm_canary; + std::filesystem::path token_directory_path = ".capio_tokens/"; + 
std::filesystem::path token_filename; + public: + /// @brief Default constructor DiscoveryService(); + + /// @brief Default destructor ~DiscoveryService(); /** * Set the token to be advertised so that other server instance may connect to this instance. * Token needs to be provided by an instance of a backend, according to backend specification - * for incoming connection + * for incoming connection. + * + * Once the token is set, a new hidden file with the current token is stored within a hidden + * directory. * @param token */ void setAdvertisementToken(const std::string &token); /** - * Start to advertise the token, and to scan for tokens from other servers + * Start to advertise the token, and to scan for tokens from other servers. Advertisement works + * by sending multicast traffic, and by scanning files contained within the hidden directory + * with aliveness tokens. * @param adv_delay Delay between each advertisement. */ void start(unsigned int adv_delay); diff --git a/capio/server/include/utils/shm_canary.hpp b/capio/server/include/utils/shm_canary.hpp new file mode 100644 index 000000000..765ab7c4c --- /dev/null +++ b/capio/server/include/utils/shm_canary.hpp @@ -0,0 +1,13 @@ +#ifndef CAPIO_SHM_CANARY_HPP +#define CAPIO_SHM_CANARY_HPP +#include + +class CapioShmCanary { + int _shm_id; + std::string _canary_name; + + public: + explicit CapioShmCanary(const std::string &capio_workflow_name); + ~CapioShmCanary(); +}; +#endif // CAPIO_SHM_CANARY_HPP diff --git a/capio/server/src/discovery_service.cpp b/capio/server/src/discovery_service.cpp index 2a337819e..d90dc6366 100644 --- a/capio/server/src/discovery_service.cpp +++ b/capio/server/src/discovery_service.cpp @@ -95,15 +95,30 @@ void DiscoveryService::stop() { DiscoveryService::DiscoveryService() { shm_canary = new CapioShmCanary(CapioCLEngine::get().getWorkflowName()); + + if (!std::filesystem::exists(token_directory_path)) { + std::filesystem::create_directory(token_directory_path); + } + + std::string 
node_name(HOST_NAME_MAX, '\0'); + gethostname(node_name.data(), node_name.size()); + node_name.resize(strlen(node_name.data())); + + token_filename = node_name + ".capio"; } DiscoveryService::~DiscoveryService() { if (!terminate) { stop(); } + std::filesystem::remove(token_directory_path / token_filename); delete shm_canary; } void DiscoveryService::setAdvertisementToken(const std::string &token) { this->advertisement_token = token; + + std::ofstream token_file(token_directory_path / token_filename); + token_file << token; + token_file.close(); } \ No newline at end of file diff --git a/capio/server/src/shm_canary.cpp b/capio/server/src/shm_canary.cpp index 1c9321880..4c0bdceab 100644 --- a/capio/server/src/shm_canary.cpp +++ b/capio/server/src/shm_canary.cpp @@ -1,9 +1,10 @@ +#include "utils/shm_canary.hpp" + #include "common/env.hpp" #include "common/logger.hpp" -#include "remote/discovery.hpp" #include "utils/common.hpp" -CapioShmCanary::CapioShmCanary(std::string capio_workflow_name) +CapioShmCanary::CapioShmCanary(const std::string &capio_workflow_name) : _canary_name(capio_workflow_name) { START_LOG(capio_syscall(SYS_gettid), "call(capio_workflow_name: %s)", _canary_name.data()); if (_canary_name.empty()) { From 58b3585de9ab76fe2e0240d85f9b00b752ab25f1 Mon Sep 17 00:00:00 2001 From: marcoSanti Date: Sun, 26 Apr 2026 14:27:06 +0200 Subject: [PATCH 3/4] Added FS based discovery service --- capio/server/include/remote/discovery.hpp | 8 +++-- capio/server/src/discovery_service.cpp | 38 +++++++++++++++++++---- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/capio/server/include/remote/discovery.hpp b/capio/server/include/remote/discovery.hpp index 6b8630844..9fe6b7d4e 100644 --- a/capio/server/include/remote/discovery.hpp +++ b/capio/server/include/remote/discovery.hpp @@ -11,10 +11,12 @@ class DiscoveryService { /// @brief Variable used to signal termination to child threads bool terminate = false; - /// @brief Handle for thread listening for other 
server instances - std::thread *listener_thread = nullptr; + /// @brief Handle for multicast based discovery thread + std::thread *mcast_listener_thread = nullptr; + /// @brief Handle for file system based discovery thread + std::thread *fs_listener_thread = nullptr; /// @brief Handle for thread advertising this server instance - std::thread *advertisement_thread = nullptr; + std::thread *advertisement_thread = nullptr; /// @brief Token to be advertised by this server std::string advertisement_token; diff --git a/capio/server/src/discovery_service.cpp b/capio/server/src/discovery_service.cpp index d90dc6366..753c35fd4 100644 --- a/capio/server/src/discovery_service.cpp +++ b/capio/server/src/discovery_service.cpp @@ -30,7 +30,7 @@ void advertise(const bool *terminate, const unsigned int delay_ms, close(advert_sock_fd); } -void thread_discovery_service(const bool *terminate) { +void mcast_thread_discovery_service(const bool *terminate) { START_LOG(gettid(), "call()"); int sockfd = socket(AF_INET, SOCK_DGRAM, 0); @@ -66,12 +66,33 @@ void thread_discovery_service(const bool *terminate) { close(sockfd); } +void fs_discovery_service(const bool *terminate, const std::filesystem::path &token_directory_path, + const unsigned int delay_ms) { + std::vector found_paths; + + while (!*terminate) { + for (auto &entry : std::filesystem::directory_iterator(token_directory_path)) { + if (std::find(found_paths.begin(), found_paths.end(), entry.path().string()) == + found_paths.end()) { + found_paths.push_back(entry.path().string()); + std::ifstream input(entry.path()); + std::string token; + input >> token; + backend->connect_to(token); + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + } +} + void DiscoveryService::start(unsigned int adv_delay) { if (advertisement_token.empty()) { throw std::runtime_error("Advertisement token is empty"); } - listener_thread = new std::thread(thread_discovery_service, &terminate); + mcast_listener_thread = new 
std::thread(mcast_thread_discovery_service, &terminate); + fs_listener_thread = + new std::thread(fs_discovery_service, &terminate, token_directory_path, adv_delay); advertisement_thread = new std::thread(advertise, &terminate, adv_delay, std::ref(advertisement_token)); @@ -82,12 +103,17 @@ void DiscoveryService::start(unsigned int adv_delay) { void DiscoveryService::stop() { terminate = true; - if (listener_thread != nullptr && listener_thread->joinable()) { - listener_thread->join(); - listener_thread = nullptr; + if (mcast_listener_thread != nullptr && mcast_listener_thread->joinable()) { + mcast_listener_thread->join(); + mcast_listener_thread = nullptr; + } + + if (fs_listener_thread != nullptr && fs_listener_thread->joinable()) { + fs_listener_thread->join(); + fs_listener_thread = nullptr; } - if (listener_thread != nullptr && advertisement_thread->joinable()) { + if (advertisement_thread != nullptr && advertisement_thread->joinable()) { advertisement_thread->join(); advertisement_thread = nullptr; } From 0f64ed8c0636b03d03ad9bfd6065061a4addcba4 Mon Sep 17 00:00:00 2001 From: marcoSanti Date: Mon, 27 Apr 2026 11:41:05 +0200 Subject: [PATCH 4/4] Cleanup and comments --- capio/common/constants.hpp | 4 + capio/common/shm.hpp | 43 --------- capio/server/include/remote/discovery.hpp | 57 ++++++++---- capio/server/src/discovery_service.cpp | 104 ++++++++++++++-------- 4 files changed, 109 insertions(+), 99 deletions(-) diff --git a/capio/common/constants.hpp b/capio/common/constants.hpp index 3a230f4bc..1c5e8d66e 100644 --- a/capio/common/constants.hpp +++ b/capio/common/constants.hpp @@ -11,6 +11,10 @@ constexpr size_t CAPIO_DEFAULT_DIR_INITIAL_SIZE = 1024L * 1024 * 1024; constexpr off64_t CAPIO_DEFAULT_FILE_INITIAL_SIZE = 1024L * 1024 * 1024 * 4; +// CAPIO backend constants +constexpr char CAPIO_MCAST_ADV_DEFAULT_ADDR[] = "224.0.0.2"; +constexpr unsigned int CAPIO_MCAST_ADV_DEFAULT_PORT = 22334; + // CAPIO default values for shared memory constexpr char 
CAPIO_DEFAULT_WORKFLOW_NAME[] = "CAPIO"; constexpr char CAPIO_DEFAULT_APP_NAME[] = "default_app"; diff --git a/capio/common/shm.hpp b/capio/common/shm.hpp index 99c113289..7f0c0515d 100644 --- a/capio/common/shm.hpp +++ b/capio/common/shm.hpp @@ -5,7 +5,6 @@ #include #include -#include #include #include #include @@ -44,48 +43,6 @@ #endif -class CapioShmCanary { - int _shm_id; - std::string _canary_name; - - public: - explicit CapioShmCanary(std::string capio_workflow_name) : _canary_name(capio_workflow_name) { - START_LOG(capio_syscall(SYS_gettid), "call(capio_workflow_name: %s)", _canary_name.data()); - if (_canary_name.empty()) { - _canary_name = get_capio_workflow_name(); - } - _shm_id = shm_open(_canary_name.data(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); - if (_shm_id == -1) { - LOG(CAPIO_SHM_CANARY_ERROR, _canary_name.data()); -#ifndef __CAPIO_POSIX - const auto message = new char[strlen(CAPIO_SHM_CANARY_ERROR)]; - sprintf(message, CAPIO_SHM_CANARY_ERROR, _canary_name.data()); - server_println(message, capio_workflow_name, CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, - "CapioShmCanary"); - delete[] message; -#endif - ERR_EXIT("ERR: shm canary flag already exists"); - } -#ifndef __CAPIO_POSIX - server_println("Created Capio SHM canary: " + _canary_name, capio_workflow_name, - CAPIO_LOG_SERVER_CLI_LEVEL_STATUS, "CapioShmCanary"); -#endif - }; - - ~CapioShmCanary() { - START_LOG(capio_syscall(SYS_gettid), "call()"); -#ifndef __CAPIO_POSIX - server_println("Removing shared memory canary flag", get_capio_workflow_name(), - CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "CapioShmCanary"); -#endif - close(_shm_id); - SHM_DESTROY_CHECK(_canary_name.c_str()); - } -}; - -// FIXME: Remove the inline specifier by using extern -inline CapioShmCanary *shm_canary; - inline void *create_shm(const std::string &shm_name, const long int size) { START_LOG(capio_syscall(SYS_gettid), "call(shm_name=%s, size=%ld)", shm_name.c_str(), size); diff --git a/capio/server/include/remote/discovery.hpp 
b/capio/server/include/remote/discovery.hpp index 9fe6b7d4e..4454a808b 100644 --- a/capio/server/include/remote/discovery.hpp +++ b/capio/server/include/remote/discovery.hpp @@ -6,6 +6,13 @@ #include "utils/shm_canary.hpp" +/** + * Discovery service. Responsible for: + * - Detect other server instances running in the same node with the same workflow name (and halts + * startup if it finds one) + * - Detect other remote running server instances of capio servers and issue commands to the backend + * to open a connection with them as soon as they are found. + */ class DiscoveryService { /// @brief Variable used to signal termination to child threads @@ -25,34 +32,48 @@ class DiscoveryService { /// equivalent to the one starting up CapioShmCanary *shm_canary; - std::filesystem::path token_directory_path = ".capio_tokens/"; + /// @brief Directory to look into for CAPIO tokens + std::filesystem::path token_directory_path; + /// @brief This server instance token filename std::filesystem::path token_filename; + /// @brief Multicast address + const std::string capio_multicast_adv_address; + + /// @brief multicast port + const unsigned int capio_multicast_adv_port; + public: - /// @brief Default constructor - DiscoveryService(); + /** + * Construct a new Discovery Service class + * @param mcast_addr Address to send and receive aliveness token from other servers + * @param mcast_port Port to send and receive aliveness token from other servers + */ + explicit DiscoveryService(const std::string &mcast_addr = CAPIO_MCAST_ADV_DEFAULT_ADDR, + unsigned int mcast_port = CAPIO_MCAST_ADV_DEFAULT_PORT); /// @brief Default destructor ~DiscoveryService(); /** - * Set the token to be advertised so that other server instance may connect to this instance. - * Token needs to be provided by an instance of a backend, according to backend specification - * for incoming connection. + * @brief Configures and starts the discovery service to advertise and scan for tokens. 
* - * Once the token is set, a new hidden file with the current token is stored within a hidden - * directory. - * @param token - */ - void setAdvertisementToken(const std::string &token); - - /** - * Start to advertise the token, and to scan for tokens from other servers. Advertisement works - * by sending multicast traffic, and by scanning files contained within the hidden directory - * with aliveness tokens. - * @param adv_delay Delay between each advertisement. + * Sets the advertisement token used by other server instances to establish a connection. + * The token must conform to the specific backend requirements for incoming connections. + * @note The token is not passed via the constructor because the Discovery Service + * must be instantiated before the Backend provides the token. + * + * Once called, this method: + * 1. Stores the current token in a hidden file within a designated directory. + * 2. Initiates multicast traffic to advertise the local token. + * 3. Scans the hidden directory for aliveness tokens from other servers. + * + * @param adv_delay The interval (in milliseconds) between advertisement broadcasts. + * @param token The authentication or identification string provided by the backend. 
+ * @param token_directory directory to store capio aliveness tokens */ - void start(unsigned int adv_delay); + void start(unsigned int adv_delay, const std::string &token, + const std::string &token_directory = ".capio_tokens/"); /** * Stop current server instance from advertising itself and from receiving advertisements from diff --git a/capio/server/src/discovery_service.cpp b/capio/server/src/discovery_service.cpp index 753c35fd4..5bf856f70 100644 --- a/capio/server/src/discovery_service.cpp +++ b/capio/server/src/discovery_service.cpp @@ -9,17 +9,17 @@ extern Backend *backend; -constexpr char CAPIO_MULTICAST_ADDRESS[] = "224.0.0.2"; -constexpr int CAPIO_MULTICAST_PORT = 22334; -int REUSE_MCAST_SOCKET = 1; +// constant required by setsockopt() +int REUSE_MCAST_SOCKET = 1; void advertise(const bool *terminate, const unsigned int delay_ms, - const std::string &advertisement_token) { + const std::string &advertisement_token, const std::string &adv_addr, + const unsigned int adv_port) { const int advert_sock_fd = socket(AF_INET, SOCK_DGRAM, 0); sockaddr_in advert_multicast_addr{}; advert_multicast_addr.sin_family = AF_INET; - advert_multicast_addr.sin_port = htons(CAPIO_MULTICAST_PORT); - advert_multicast_addr.sin_addr.s_addr = inet_addr(CAPIO_MULTICAST_ADDRESS); + advert_multicast_addr.sin_port = htons(adv_port); + advert_multicast_addr.sin_addr.s_addr = inet_addr(adv_addr.c_str()); while (!*terminate) { std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); @@ -30,7 +30,8 @@ void advertise(const bool *terminate, const unsigned int delay_ms, close(advert_sock_fd); } -void mcast_thread_discovery_service(const bool *terminate) { +void mcast_thread_discovery_service(const bool *terminate, const std::string &adv_addr, + const unsigned int adv_port) { START_LOG(gettid(), "call()"); int sockfd = socket(AF_INET, SOCK_DGRAM, 0); @@ -44,12 +45,18 @@ void mcast_thread_discovery_service(const bool *terminate) { sockaddr_in local_addr{}; local_addr.sin_family = 
AF_INET; - local_addr.sin_port = htons(CAPIO_MULTICAST_PORT); + local_addr.sin_port = htons(adv_port); local_addr.sin_addr.s_addr = htonl(INADDR_ANY); - bind(sockfd, reinterpret_cast(&local_addr), sizeof(local_addr)); + if (bind(sockfd, reinterpret_cast(&local_addr), sizeof(local_addr)) == -1) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Error: unable to bind to multicast socket. Error is: " + + std::string(std::strerror(errno))); + // halt execution and return + return; + } ip_mreq mreq{}; - mreq.imr_multiaddr.s_addr = inet_addr(CAPIO_MULTICAST_ADDRESS); + mreq.imr_multiaddr.s_addr = inet_addr(adv_addr.c_str()); mreq.imr_interface.s_addr = htonl(INADDR_ANY); setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); @@ -68,16 +75,26 @@ void mcast_thread_discovery_service(const bool *terminate) { void fs_discovery_service(const bool *terminate, const std::filesystem::path &token_directory_path, const unsigned int delay_ms) { - std::vector found_paths; + // local cache to not reload tokens already found + // TODO: relax this by storing also last modified date, and reload in case changes occurred + // after first read + + std::vector cache; while (!*terminate) { - for (auto &entry : std::filesystem::directory_iterator(token_directory_path)) { - if (std::find(found_paths.begin(), found_paths.end(), entry.path().string()) == - found_paths.end()) { - found_paths.push_back(entry.path().string()); + const auto iterator = std::filesystem::directory_iterator(token_directory_path); + for (auto &entry : iterator) { + if (std::find(cache.begin(), cache.end(), entry.path()) == cache.end()) { + cache.push_back(entry.path()); + + // Read connection token from FS std::ifstream input(entry.path()); std::string token; input >> token; + + // Send token to backend to issue a direct connection. 
+ // NOTE: backend will refuse to connect silently if connection is already + // established backend->connect_to(token); } } @@ -85,16 +102,40 @@ void fs_discovery_service(const bool *terminate, const std::filesystem::path &to } } -void DiscoveryService::start(unsigned int adv_delay) { - if (advertisement_token.empty()) { +void DiscoveryService::start(unsigned int adv_delay, const std::string &token, + const std::string &token_directory) { + + if (token.empty()) { throw std::runtime_error("Advertisement token is empty"); } - mcast_listener_thread = new std::thread(mcast_thread_discovery_service, &terminate); + if (token_directory.empty()) { + throw std::runtime_error("Provided token directory is empty"); + } + + if (!std::filesystem::exists(token_directory)) { + std::filesystem::create_directory(token_directory); + } + + std::string node_name(HOST_NAME_MAX, '\0'); + gethostname(node_name.data(), node_name.size()); + node_name.resize(strlen(node_name.data())); + + token_directory_path = token_directory; + token_filename = node_name + ".capio"; + advertisement_token = token; + + std::ofstream token_file(token_directory_path / token_filename); + token_file << advertisement_token; + token_file.close(); + + mcast_listener_thread = new std::thread(mcast_thread_discovery_service, &terminate, + capio_multicast_adv_address, capio_multicast_adv_port); fs_listener_thread = new std::thread(fs_discovery_service, &terminate, token_directory_path, adv_delay); advertisement_thread = - new std::thread(advertise, &terminate, adv_delay, std::ref(advertisement_token)); + new std::thread(advertise, &terminate, adv_delay, std::ref(advertisement_token), + capio_multicast_adv_address, capio_multicast_adv_port); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "DiscoveryService will advertise " + advertisement_token + " every " + @@ -119,32 +160,19 @@ void DiscoveryService::stop() { } } -DiscoveryService::DiscoveryService() { +DiscoveryService::DiscoveryService(const std::string 
&mcast_addr, const unsigned int mcast_port) + : capio_multicast_adv_address(mcast_addr), capio_multicast_adv_port(mcast_port) { shm_canary = new CapioShmCanary(CapioCLEngine::get().getWorkflowName()); - - if (!std::filesystem::exists(token_directory_path)) { - std::filesystem::create_directory(token_directory_path); - } - - std::string node_name(HOST_NAME_MAX, '\0'); - gethostname(node_name.data(), node_name.size()); - node_name.resize(strlen(node_name.data())); - - token_filename = node_name + ".capio"; } DiscoveryService::~DiscoveryService() { + // if destructor is called before stop(), then stop the service first. if (!terminate) { stop(); } + // delete aliveness token std::filesystem::remove(token_directory_path / token_filename); - delete shm_canary; -} - -void DiscoveryService::setAdvertisementToken(const std::string &token) { - this->advertisement_token = token; - std::ofstream token_file(token_directory_path / token_filename); - token_file << token; - token_file.close(); + // delete shm canary + delete shm_canary; } \ No newline at end of file