NOTE(review): this patch was re-expanded from a whitespace-collapsed copy. Indentation, blank
context lines and the targets of angle-bracket #include directives were inferred, so apply it
with `git apply --ignore-whitespace` and double-check the context lines. The `index` blob ids
of files whose added lines were amended during review are stale.

Review changes folded into the added lines:
 - signals.hpp: client_manager / storage_manager were deleted twice; null guard on
   discovery_service.
 - DiscoveryService: `terminate` is now std::atomic<bool> (it is shared with three threads);
   stop() releases the std::thread objects instead of leaking them; start() can be called again;
   the destructor no longer calls a throwing std::filesystem::remove.
 - discovery threads: socket()/IP_ADD_MEMBERSHIP failures are reported, the socket is closed on
   bind failure, directory scanning cannot throw out of the thread, unreadable token files are
   retried instead of being cached with an empty token.
 - CapioShmCanary is no longer copyable (a copy would close/destroy the same object twice).

diff --git a/.gitignore b/.gitignore
index d29456e74..20776d91a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -48,4 +48,6 @@
 capio_logs
 debug
 build
+.capio_tokens
+
 cmake_test_discovery*.json
diff --git a/capio/common/constants.hpp b/capio/common/constants.hpp
index 3a230f4bc..1c5e8d66e 100644
--- a/capio/common/constants.hpp
+++ b/capio/common/constants.hpp
@@ -11,6 +11,12 @@
 constexpr size_t CAPIO_DEFAULT_DIR_INITIAL_SIZE = 1024L * 1024 * 1024;
 constexpr off64_t CAPIO_DEFAULT_FILE_INITIAL_SIZE = 1024L * 1024 * 1024 * 4;
 
+// CAPIO backend constants
+// NOTE(review): 224.0.0.2 is the IANA "All Routers" link-local group; an administratively
+// scoped group (239.0.0.0/8) may be a safer default -- confirm before release.
+constexpr char CAPIO_MCAST_ADV_DEFAULT_ADDR[] = "224.0.0.2";
+constexpr unsigned int CAPIO_MCAST_ADV_DEFAULT_PORT = 22334;
+
 // CAPIO default values for shared memory
 constexpr char CAPIO_DEFAULT_WORKFLOW_NAME[] = "CAPIO";
 constexpr char CAPIO_DEFAULT_APP_NAME[] = "default_app";
diff --git a/capio/common/shm.hpp b/capio/common/shm.hpp
index c425e080b..7f0c0515d 100644
--- a/capio/common/shm.hpp
+++ b/capio/common/shm.hpp
@@ -43,48 +43,6 @@
 
 #endif
 
-class CapioShmCanary {
-    int _shm_id;
-    std::string _canary_name;
-
-  public:
-    explicit CapioShmCanary(std::string capio_workflow_name) : _canary_name(capio_workflow_name) {
-        START_LOG(capio_syscall(SYS_gettid), "call(capio_workflow_name: %s)", _canary_name.data());
-        if (_canary_name.empty()) {
-            _canary_name = get_capio_workflow_name();
-        }
-        _shm_id = shm_open(_canary_name.data(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
-        if (_shm_id == -1) {
-            LOG(CAPIO_SHM_CANARY_ERROR, _canary_name.data());
-#ifndef __CAPIO_POSIX
-            const auto message = new char[strlen(CAPIO_SHM_CANARY_ERROR)];
-            sprintf(message, CAPIO_SHM_CANARY_ERROR, _canary_name.data());
-            server_println(message, capio_workflow_name, CAPIO_LOG_SERVER_CLI_LEVEL_ERROR,
-                           "CapioShmCanary");
-            delete[] message;
-#endif
-            ERR_EXIT("ERR: shm canary flag already exists");
-        }
-#ifndef __CAPIO_POSIX
-        server_println("Created Capio SHM canary: " + _canary_name, capio_workflow_name,
-                       CAPIO_LOG_SERVER_CLI_LEVEL_STATUS, "CapioShmCanary");
-#endif
-    };
-
-    ~CapioShmCanary() {
-        START_LOG(capio_syscall(SYS_gettid), "call()");
-#ifndef __CAPIO_POSIX
-        server_println("Removing shared memory canary flag", get_capio_workflow_name(),
-                       CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "CapioShmCanary");
-#endif
-        close(_shm_id);
-        SHM_DESTROY_CHECK(_canary_name.c_str());
-    }
-};
-
-// FIXME: Remove the inline specifier by using extern
-inline CapioShmCanary *shm_canary;
-
 inline void *create_shm(const std::string &shm_name, const long int size) {
     START_LOG(capio_syscall(SYS_gettid), "call(shm_name=%s, size=%ld)", shm_name.c_str(), size);
 
diff --git a/capio/server/capio_server.cpp b/capio/server/capio_server.cpp
index 403ec8f49..5ee3f08cb 100644
--- a/capio/server/capio_server.cpp
+++ b/capio/server/capio_server.cpp
@@ -28,6 +28,7 @@
 #include "common/requests.hpp"
 #include "common/semaphore.hpp"
 #include "remote/backend.hpp"
+#include "remote/discovery.hpp"
 #include "storage/capio_file.hpp"
 #include "utils/common.hpp"
 #include "utils/env.hpp"
@@ -36,6 +37,7 @@
 ClientManager *client_manager;
 StorageManager *storage_manager;
 Backend *backend;
+DiscoveryService *discovery_service;
 
 #include "handlers.hpp"
 #include "utils/cli_parser.hpp"
@@ -140,13 +142,13 @@ int main(int argc, char **argv) {
 
     capio_cl_engine->print();
 
-    backend = select_backend(configuration.backend_name, argc, argv);
+    discovery_service = new DiscoveryService();
+    backend           = select_backend(configuration.backend_name, argc, argv);
 
     START_LOG(gettid(), "call()");
 
     open_files_location();
 
-    shm_canary = new CapioShmCanary(capio_cl_engine->getWorkflowName());
     storage_manager = new StorageManager();
     client_manager = new ClientManager();
 
diff --git a/capio/server/include/remote/backend.hpp b/capio/server/include/remote/backend.hpp
index f1c827821..50b336b73 100644
--- a/capio/server/include/remote/backend.hpp
+++ b/capio/server/include/remote/backend.hpp
@@ -87,6 +87,12 @@ class Backend {
      * @param target
      */
     virtual void send_request(const char *message, int message_len, const std::string &target) = 0;
+
+    /**
+     * Connect this server instance to a remote server instance
+     * @param target Remote server instance identification
+     */
+    virtual void connect_to(const std::string &target) = 0;
 };
 
 #endif // CAPIO_SERVER_REMOTE_BACKEND_HPP
diff --git a/capio/server/include/remote/backend/mpi.hpp b/capio/server/include/remote/backend/mpi.hpp
index fca12a752..be185f01e 100644
--- a/capio/server/include/remote/backend/mpi.hpp
+++ b/capio/server/include/remote/backend/mpi.hpp
@@ -27,6 +27,7 @@ class MPIBackend : public Backend {
     void send_file(char *shm, long int nbytes, const std::string &target) override;
     void send_request(const char *message, int message_len, const std::string &target) override;
     void recv_file(char *shm, const std::string &source, long int bytes_expected) override;
+    void connect_to(const std::string &target) override;
 };
 
 class MPISYNCBackend final : public MPIBackend {
diff --git a/capio/server/include/remote/backend/none.hpp b/capio/server/include/remote/backend/none.hpp
index 3faae205f..c0cd2298a 100644
--- a/capio/server/include/remote/backend/none.hpp
+++ b/capio/server/include/remote/backend/none.hpp
@@ -11,5 +11,6 @@ class NoneBackend final : public Backend {
     void send_file(char *shm, long int nbytes, const std::string &target) override;
     void send_request(const char *message, int message_len, const std::string &target) override;
     void recv_file(char *shm, const std::string &source, long int bytes_expected) override;
+    void connect_to(const std::string &target) override;
 };
 #endif // CAPIO_SERVER_REMOTE_BACKEND_NONE_HPP
diff --git a/capio/server/include/remote/discovery.hpp b/capio/server/include/remote/discovery.hpp
new file mode 100644
index 000000000..4454a808b
--- /dev/null
+++ b/capio/server/include/remote/discovery.hpp
@@ -0,0 +1,99 @@
+#ifndef CAPIO_DISCOVERY_HPP
+#define CAPIO_DISCOVERY_HPP
+
+#include <atomic>
+#include <filesystem>
+#include <string>
+#include <thread>
+
+#include "utils/shm_canary.hpp"
+
+/**
+ * Discovery service. Responsible for:
+ *  - Detecting other server instances running on the same node with the same workflow name
+ *    (and halting startup if one is found)
+ *  - Detecting remote CAPIO server instances and asking the backend to open a connection with
+ *    them as soon as they are found.
+ */
+class DiscoveryService {
+
+    /// @brief Flag used to signal termination to the worker threads. It is atomic because
+    /// stop() writes it while the worker threads are polling it.
+    std::atomic<bool> terminate{false};
+
+    /// @brief Handle for multicast based discovery thread
+    std::thread *mcast_listener_thread = nullptr;
+    /// @brief Handle for file system based discovery thread
+    std::thread *fs_listener_thread = nullptr;
+    /// @brief Handle for thread advertising this server instance
+    std::thread *advertisement_thread = nullptr;
+
+    /// @brief Token to be advertised by this server
+    std::string advertisement_token;
+
+    /// @brief Canary variable to detect other server instances running locally that are logically
+    /// equivalent to the one starting up
+    CapioShmCanary *shm_canary = nullptr;
+
+    /// @brief Directory to look into for CAPIO tokens
+    std::filesystem::path token_directory_path;
+    /// @brief This server instance token filename
+    std::filesystem::path token_filename;
+
+    /// @brief Multicast address
+    const std::string capio_multicast_adv_address;
+
+    /// @brief Multicast port
+    const unsigned int capio_multicast_adv_port;
+
+  public:
+    /**
+     * Construct a new Discovery Service class
+     * @param mcast_addr Address to send and receive aliveness tokens from other servers
+     * @param mcast_port Port to send and receive aliveness tokens from other servers
+     */
+    explicit DiscoveryService(const std::string &mcast_addr = CAPIO_MCAST_ADV_DEFAULT_ADDR,
+                              unsigned int mcast_port = CAPIO_MCAST_ADV_DEFAULT_PORT);
+
+    /// @brief Not copyable: the instance owns its worker threads and the shm canary
+    DiscoveryService(const DiscoveryService &) = delete;
+    DiscoveryService &operator=(const DiscoveryService &) = delete;
+
+    /// @brief Stops the service (if still running), removes the aliveness token file and
+    /// deletes the shm canary
+    ~DiscoveryService();
+
+    /**
+     * @brief Configures and starts the discovery service to advertise and scan for tokens.
+     *
+     * Sets the advertisement token used by other server instances to establish a connection.
+     * The token must conform to the specific backend requirements for incoming connections.
+     *
+     * @note The token is not passed via the constructor because the Discovery Service
+     * must be instantiated before the Backend provides the token.
+     *
+     * Once called, this method:
+     * 1. Stores the current token in a hidden file within a designated directory.
+     * 2. Initiates multicast traffic to advertise the local token.
+     * 3. Scans the hidden directory for aliveness tokens from other servers.
+     *
+     * @param adv_delay The interval, in milliseconds, between advertisement broadcasts (also
+     * used as the pause between two scans of the token directory).
+     * @param token The authentication or identification string provided by the backend.
+     * @param token_directory Directory in which CAPIO aliveness tokens are stored
+     * @throws std::runtime_error if token or token_directory is empty
+     */
+    void start(unsigned int adv_delay, const std::string &token,
+               const std::string &token_directory = ".capio_tokens/");
+
+    /**
+     * Stop current server instance from advertising itself and from receiving advertisements from
+     * other server instances.
+     *
+     * NOTE: this method does not destroy the CAPIO canary variable; that only happens when the
+     * class instance is destroyed.
+     */
+    void stop();
+};
+
+#endif // CAPIO_DISCOVERY_HPP
diff --git a/capio/server/include/utils/shm_canary.hpp b/capio/server/include/utils/shm_canary.hpp
new file mode 100644
index 000000000..765ab7c4c
--- /dev/null
+++ b/capio/server/include/utils/shm_canary.hpp
@@ -0,0 +1,25 @@
+#ifndef CAPIO_SHM_CANARY_HPP
+#define CAPIO_SHM_CANARY_HPP
+#include <string>
+
+/**
+ * Canary backed by a shared memory object opened with O_CREAT | O_EXCL: when the object cannot
+ * be created (e.g. another server already created one with the same name) the constructor
+ * reports the error and invokes ERR_EXIT.
+ */
+class CapioShmCanary {
+    int _shm_id;
+    std::string _canary_name;
+
+  public:
+    /// @param capio_workflow_name Canary name; when empty, get_capio_workflow_name() is used
+    explicit CapioShmCanary(const std::string &capio_workflow_name);
+
+    /// @brief Not copyable: a copy would close the descriptor and destroy the object twice
+    CapioShmCanary(const CapioShmCanary &) = delete;
+    CapioShmCanary &operator=(const CapioShmCanary &) = delete;
+
+    /// @brief Closes the descriptor and runs SHM_DESTROY_CHECK on the canary name
+    ~CapioShmCanary();
+};
+#endif // CAPIO_SHM_CANARY_HPP
diff --git a/capio/server/include/utils/signals.hpp b/capio/server/include/utils/signals.hpp
index 94660c0a1..0f711611d 100644
--- a/capio/server/include/utils/signals.hpp
+++ b/capio/server/include/utils/signals.hpp
@@ -4,6 +4,7 @@
 #include <csignal>
 
 #include "remote/backend.hpp"
+#include "remote/discovery.hpp"
 #include "server_println.hpp"
 
 #ifdef CAPIO_COVERAGE
@@ -23,19 +24,22 @@ void sig_term_handler(int signum, siginfo_t *info, void *ptr) {
     }
 
     // free all the memory used
-
+    if (discovery_service != nullptr) {
+        discovery_service->stop();
+    }
     delete client_manager;
     delete storage_manager;
     server_println("data_buffers cleanup completed", CapioCLEngine::get().getWorkflowName(),
                    CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, __func__);
 
+    delete backend;
+
 #ifdef CAPIO_COVERAGE
     __gcov_dump();
 #endif
 
-    delete backend;
-    delete shm_canary;
+    delete discovery_service;
 
     server_println("shutdown completed", CapioCLEngine::get().getWorkflowName(),
                    CAPIO_LOG_SERVER_CLI_LEVEL_INFO, __func__);
 
diff --git a/capio/server/src/discovery_service.cpp b/capio/server/src/discovery_service.cpp
new file mode 100644
index 000000000..5bf856f70
--- /dev/null
+++ b/capio/server/src/discovery_service.cpp
@@ -0,0 +1,261 @@
+#include <algorithm>
+#include <arpa/inet.h>
+#include <atomic>
+#include <cerrno>
+#include <chrono>
+#include <climits>
+#include <cstring>
+#include <filesystem>
+#include <fstream>
+#include <functional>
+#include <stdexcept>
+#include <string>
+#include <system_error>
+#include <thread>
+#include <unistd.h>
+#include <vector>
+
+#include "common/logger.hpp"
+#include "remote/backend.hpp"
+#include "remote/discovery.hpp"
+#include "utils/capiocl_adapter.hpp"
+#include "utils/common.hpp"
+
+extern Backend *backend;
+
+// constant required by setsockopt()
+int REUSE_MCAST_SOCKET = 1;
+
+/**
+ * Periodically sends the advertisement token of this server to the multicast group.
+ * @param terminate Flag polled once per iteration; the loop ends when it becomes true
+ * @param delay_ms Pause, in milliseconds, before each advertisement
+ * @param advertisement_token Token to send
+ * @param adv_addr Multicast group address (dotted IPv4 notation)
+ * @param adv_port Multicast group UDP port
+ */
+void advertise(const std::atomic<bool> *terminate, const unsigned int delay_ms,
+               const std::string &advertisement_token, const std::string &adv_addr,
+               const unsigned int adv_port) {
+    const int advert_sock_fd = socket(AF_INET, SOCK_DGRAM, 0);
+    if (advert_sock_fd == -1) {
+        server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR,
+                       "Error: unable to create advertisement socket. Error is: " +
+                           std::string(std::strerror(errno)));
+        return;
+    }
+
+    sockaddr_in advert_multicast_addr{};
+    advert_multicast_addr.sin_family = AF_INET;
+    advert_multicast_addr.sin_port = htons(adv_port);
+    advert_multicast_addr.sin_addr.s_addr = inet_addr(adv_addr.c_str());
+
+    while (!terminate->load()) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+        sendto(advert_sock_fd, advertisement_token.data(), advertisement_token.size(), 0,
+               reinterpret_cast<sockaddr *>(&advert_multicast_addr), sizeof(advert_multicast_addr));
+    }
+
+    close(advert_sock_fd);
+}
+
+/**
+ * Listens on the multicast group for tokens advertised by other servers and forwards each
+ * received token to the backend.
+ * @param terminate Flag polled between receive attempts; the loop ends when it becomes true
+ * @param adv_addr Multicast group address to join
+ * @param adv_port UDP port to bind to
+ */
+void mcast_thread_discovery_service(const std::atomic<bool> *terminate,
+                                    const std::string &adv_addr, const unsigned int adv_port) {
+    START_LOG(gettid(), "call()");
+
+    const int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+    if (sockfd == -1) {
+        server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR,
+                       "Error: unable to create multicast socket. Error is: " +
+                           std::string(std::strerror(errno)));
+        return;
+    }
+
+    setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &REUSE_MCAST_SOCKET, sizeof(REUSE_MCAST_SOCKET));
+
+    // Receive timeout, so that the loop below can periodically re-check the terminate flag
+    timeval tv{};
+    tv.tv_sec = 0;
+    tv.tv_usec = 100000; // 100,000 microseconds = 100ms
+    setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv));
+
+    sockaddr_in local_addr{};
+    local_addr.sin_family = AF_INET;
+    local_addr.sin_port = htons(adv_port);
+    local_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+    if (bind(sockfd, reinterpret_cast<sockaddr *>(&local_addr), sizeof(local_addr)) == -1) {
+        server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR,
+                       "Error: unable to bind to multicast socket. Error is: " +
+                           std::string(std::strerror(errno)));
+        // release the socket before halting, otherwise the descriptor is leaked
+        close(sockfd);
+        return;
+    }
+
+    ip_mreq mreq{};
+    mreq.imr_multiaddr.s_addr = inet_addr(adv_addr.c_str());
+    mreq.imr_interface.s_addr = htonl(INADDR_ANY);
+    if (setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) == -1) {
+        server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR,
+                       "Error: unable to join multicast group. Error is: " +
+                           std::string(std::strerror(errno)));
+        close(sockfd);
+        return;
+    }
+
+    char incoming_token[2 * HOST_NAME_MAX] = {0};
+
+    while (!terminate->load()) {
+        std::memset(incoming_token, 0, sizeof(incoming_token));
+
+        // the last byte is never written, so the buffer is always NUL terminated
+        if (recvfrom(sockfd, incoming_token, sizeof(incoming_token) - 1, 0, nullptr, nullptr) > 0) {
+            // NOTE(review): the token is taken from the network as-is; the backend is expected
+            // to validate it -- confirm.
+            backend->connect_to(incoming_token);
+        }
+    }
+    close(sockfd);
+}
+
+/**
+ * Polls the token directory and forwards to the backend every token found in a file that was
+ * not already processed.
+ * @param terminate Flag polled once per scan; the loop ends when it becomes true
+ * @param token_directory_path Directory to scan for token files
+ * @param delay_ms Pause, in milliseconds, between two scans
+ */
+void fs_discovery_service(const std::atomic<bool> *terminate,
+                          const std::filesystem::path &token_directory_path,
+                          const unsigned int delay_ms) {
+    // local cache to not reload tokens already found
+    // TODO: relax this by storing also last modified date, and reload in case changes occurred
+    // after first read
+    std::vector<std::filesystem::path> cache;
+
+    while (!terminate->load()) {
+        // error_code overloads: an exception escaping a thread entry point calls std::terminate
+        std::error_code ec;
+        std::filesystem::directory_iterator entry(token_directory_path, ec);
+        const std::filesystem::directory_iterator end;
+        for (; !ec && entry != end; entry.increment(ec)) {
+            const std::filesystem::path &token_path = entry->path();
+            if (std::find(cache.begin(), cache.end(), token_path) != cache.end()) {
+                continue;
+            }
+
+            // Read connection token from FS
+            std::ifstream input(token_path);
+            std::string token;
+            if (!(input >> token)) {
+                // file not (yet) readable or still empty: do not cache it, retry at next scan
+                continue;
+            }
+            cache.push_back(token_path);
+
+            // Send token to backend to issue a direct connection.
+            // NOTE: backend will refuse to connect silently if connection is already
+            // established
+            backend->connect_to(token);
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms));
+    }
+}
+
+/// @brief Joins the thread behind handle (if any), deletes it and resets the handle
+static void join_and_release(std::thread *&handle) {
+    if (handle == nullptr) {
+        return;
+    }
+    if (handle->joinable()) {
+        handle->join();
+    }
+    delete handle;
+    handle = nullptr;
+}
+
+void DiscoveryService::start(unsigned int adv_delay, const std::string &token,
+                             const std::string &token_directory) {
+
+    if (token.empty()) {
+        throw std::runtime_error("Advertisement token is empty");
+    }
+
+    if (token_directory.empty()) {
+        throw std::runtime_error("Provided token directory is empty");
+    }
+
+    // A previous start() may still be active: shut its threads down before replacing them, and
+    // re-arm the termination flag so that the new threads do not exit immediately.
+    stop();
+    terminate = false;
+
+    if (!std::filesystem::exists(token_directory)) {
+        // create_directories() also creates missing parent directories
+        std::filesystem::create_directories(token_directory);
+    }
+
+    std::string node_name(HOST_NAME_MAX, '\0');
+    gethostname(node_name.data(), node_name.size());
+    node_name.resize(std::strlen(node_name.data()));
+
+    token_directory_path = token_directory;
+    token_filename = node_name + ".capio";
+    advertisement_token = token;
+
+    std::ofstream token_file(token_directory_path / token_filename);
+    token_file << advertisement_token;
+    token_file.close();
+    if (!token_file) {
+        server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR,
+                       "Error: unable to write aliveness token to " +
+                           (token_directory_path / token_filename).string());
+    }
+
+    mcast_listener_thread = new std::thread(mcast_thread_discovery_service, &terminate,
+                                            capio_multicast_adv_address, capio_multicast_adv_port);
+    fs_listener_thread =
+        new std::thread(fs_discovery_service, &terminate, token_directory_path, adv_delay);
+    advertisement_thread =
+        new std::thread(advertise, &terminate, adv_delay, std::ref(advertisement_token),
+                        capio_multicast_adv_address, capio_multicast_adv_port);
+
+    server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "DiscoveryService will advertise " +
+                                                        advertisement_token + " every " +
+                                                        std::to_string(adv_delay) + "ms.");
+}
+
+void DiscoveryService::stop() {
+    terminate = true;
+
+    join_and_release(mcast_listener_thread);
+    join_and_release(fs_listener_thread);
+    join_and_release(advertisement_thread);
+}
+
+DiscoveryService::DiscoveryService(const std::string &mcast_addr, const unsigned int mcast_port)
+    : capio_multicast_adv_address(mcast_addr), capio_multicast_adv_port(mcast_port) {
+    shm_canary = new CapioShmCanary(CapioCLEngine::get().getWorkflowName());
+}
+
+DiscoveryService::~DiscoveryService() {
+    // stop() is idempotent: make sure no worker thread outlives the members it references
+    stop();
+
+    // delete aliveness token (only if start() created one); the error_code overload is used
+    // because a destructor must not throw
+    if (!token_filename.empty()) {
+        std::error_code ec;
+        std::filesystem::remove(token_directory_path / token_filename, ec);
+    }
+
+    // delete shm canary
+    delete shm_canary;
+}
diff --git a/capio/server/src/mpi_backend.cpp b/capio/server/src/mpi_backend.cpp
index ead84a42c..00ebe0aec 100644
--- a/capio/server/src/mpi_backend.cpp
+++ b/capio/server/src/mpi_backend.cpp
@@ -119,6 +119,7 @@ void MPIBackend::recv_file(char *shm, const std::string &source, long int bytes_
         LOG("Chunk size is %ld bytes", bytes_received);
     }
 }
+void MPIBackend::connect_to(const std::string &target) { return; }
 
 MPISYNCBackend::MPISYNCBackend(int argc, char *argv[]) : MPIBackend(argc, argv) {
     START_LOG(gettid(), "call()");
diff --git a/capio/server/src/none_backend.cpp b/capio/server/src/none_backend.cpp
index e0e8e3208..588d5d843 100644
--- a/capio/server/src/none_backend.cpp
+++ b/capio/server/src/none_backend.cpp
@@ -30,4 +30,5 @@ void NoneBackend::send_request(const char *message, const int message_len,
 void NoneBackend::recv_file(char *shm, const std::string &source, const long int bytes_expected) {
     START_LOG(gettid(), "call(shm=%ld, source=%s, bytes_expected=%ld)", shm, source.c_str(),
               bytes_expected);
-}
\ No newline at end of file
+}
+void NoneBackend::connect_to(const std::string &target) { return; }
diff --git a/capio/server/src/shm_canary.cpp b/capio/server/src/shm_canary.cpp
new file mode 100644
index 000000000..4c0bdceab
--- /dev/null
+++ b/capio/server/src/shm_canary.cpp
@@ -0,0 +1,28 @@
+#include "utils/shm_canary.hpp"
+
+#include "common/env.hpp"
+#include "common/logger.hpp"
+#include "utils/common.hpp"
+
+CapioShmCanary::CapioShmCanary(const std::string &capio_workflow_name)
+    : _canary_name(capio_workflow_name) {
+    START_LOG(capio_syscall(SYS_gettid), "call(capio_workflow_name: %s)", _canary_name.data());
+    if (_canary_name.empty()) {
+        _canary_name = get_capio_workflow_name();
+    }
+    _shm_id = shm_open(_canary_name.data(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
+    if (_shm_id == -1) {
+
+        server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR,
+                       "Error: canary variable " + _canary_name + " already exists!");
+        LOG(CAPIO_SHM_CANARY_ERROR, _canary_name.data());
+        ERR_EXIT("ERR: shm canary flag already exists");
+    }
+}
+
+CapioShmCanary::~CapioShmCanary() {
+    START_LOG(capio_syscall(SYS_gettid), "call()");
+    server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Removing shared memory canary flag");
+    close(_shm_id);
+    SHM_DESTROY_CHECK(_canary_name.c_str());
+}
diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp
index db7ead849..28095dfe9 100644
--- a/capio/tests/unit/server/src/capio_file.cpp
+++ b/capio/tests/unit/server/src/capio_file.cpp
@@ -338,6 +338,7 @@ class MockBackend : public Backend {
     RemoteRequest read_next_request() override { return {nullptr, ""}; }
     void send_file(char *shm, long int nbytes, const std::string &target) override {}
     void send_request(const char *message, int message_len, const std::string &target) override {}
+    void connect_to(const std::string &target) override {}
 };
 
 class MockBackendTestFixture : public ::testing::Test {