diff --git a/.gitignore b/.gitignore index d29456e74..20776d91a 100644 --- a/.gitignore +++ b/.gitignore @@ -48,4 +48,6 @@ capio_logs debug build +.capio_tokens + cmake_test_discovery*.json diff --git a/CMakeLists.txt b/CMakeLists.txt index 6a930e879..d7dd6ce33 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -81,6 +81,12 @@ FetchContent_Declare( GIT_TAG v1.4.0 ) +FetchContent_Declare( + mtcl + GIT_REPOSITORY https://github.com/ParaGroup/MTCL + GIT_TAG e5f2bfeea0fc3d704554c7df02e7857f0a00bbba +) + ##################################### # Targets ##################################### diff --git a/capio/common/constants.hpp b/capio/common/constants.hpp index 3a230f4bc..1c5e8d66e 100644 --- a/capio/common/constants.hpp +++ b/capio/common/constants.hpp @@ -11,6 +11,10 @@ constexpr size_t CAPIO_DEFAULT_DIR_INITIAL_SIZE = 1024L * 1024 * 1024; constexpr off64_t CAPIO_DEFAULT_FILE_INITIAL_SIZE = 1024L * 1024 * 1024 * 4; +// CAPIO backend constants +constexpr char CAPIO_MCAST_ADV_DEFAULT_ADDR[] = "224.0.0.2"; +constexpr unsigned int CAPIO_MCAST_ADV_DEFAULT_PORT = 22334; + // CAPIO default values for shared memory constexpr char CAPIO_DEFAULT_WORKFLOW_NAME[] = "CAPIO"; constexpr char CAPIO_DEFAULT_APP_NAME[] = "default_app"; diff --git a/capio/common/requests.hpp b/capio/common/requests.hpp index 2ae70083a..e73f8a8c6 100644 --- a/capio/common/requests.hpp +++ b/capio/common/requests.hpp @@ -37,4 +37,6 @@ constexpr const int CAPIO_SERVER_REQUEST_STAT_REPLY = 3; constexpr const int CAPIO_SERVER_NR_REQUEST = 4; +constexpr const int BACKEND_HAVE_FINISH_SEND_REQUEST = 4; + #endif // CAPIO_COMMON_REQUESTS_HPP diff --git a/capio/common/shm.hpp b/capio/common/shm.hpp index c425e080b..7f0c0515d 100644 --- a/capio/common/shm.hpp +++ b/capio/common/shm.hpp @@ -43,48 +43,6 @@ #endif -class CapioShmCanary { - int _shm_id; - std::string _canary_name; - - public: - explicit CapioShmCanary(std::string capio_workflow_name) : _canary_name(capio_workflow_name) { - 
START_LOG(capio_syscall(SYS_gettid), "call(capio_workflow_name: %s)", _canary_name.data()); - if (_canary_name.empty()) { - _canary_name = get_capio_workflow_name(); - } - _shm_id = shm_open(_canary_name.data(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR); - if (_shm_id == -1) { - LOG(CAPIO_SHM_CANARY_ERROR, _canary_name.data()); -#ifndef __CAPIO_POSIX - const auto message = new char[strlen(CAPIO_SHM_CANARY_ERROR)]; - sprintf(message, CAPIO_SHM_CANARY_ERROR, _canary_name.data()); - server_println(message, capio_workflow_name, CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, - "CapioShmCanary"); - delete[] message; -#endif - ERR_EXIT("ERR: shm canary flag already exists"); - } -#ifndef __CAPIO_POSIX - server_println("Created Capio SHM canary: " + _canary_name, capio_workflow_name, - CAPIO_LOG_SERVER_CLI_LEVEL_STATUS, "CapioShmCanary"); -#endif - }; - - ~CapioShmCanary() { - START_LOG(capio_syscall(SYS_gettid), "call()"); -#ifndef __CAPIO_POSIX - server_println("Removing shared memory canary flag", get_capio_workflow_name(), - CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "CapioShmCanary"); -#endif - close(_shm_id); - SHM_DESTROY_CHECK(_canary_name.c_str()); - } -}; - -// FIXME: Remove the inline specifier by using extern -inline CapioShmCanary *shm_canary; - inline void *create_shm(const std::string &shm_name, const long int size) { START_LOG(capio_syscall(SYS_gettid), "call(shm_name=%s, size=%ld)", shm_name.c_str(), size); diff --git a/capio/server/CMakeLists.txt b/capio/server/CMakeLists.txt index 701789550..acf5a732e 100644 --- a/capio/server/CMakeLists.txt +++ b/capio/server/CMakeLists.txt @@ -19,7 +19,7 @@ FetchContent_Declare( set(ARGS_BUILD_EXAMPLE OFF CACHE INTERNAL "") set(ARGS_BUILD_UNITTESTS OFF CACHE INTERNAL "") -FetchContent_MakeAvailable(args capio_cl) +FetchContent_MakeAvailable(args capio_cl mtcl) ##################################### # Target definition @@ -38,6 +38,7 @@ target_include_directories(${TARGET_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include ${MPI_INCLUDE_PATH} 
${capio_cl_SOURCE_DIR} + ${mtcl_SOURCE_DIR}/include ) ##################################### diff --git a/capio/server/capio_server.cpp b/capio/server/capio_server.cpp index 403ec8f49..782697582 100644 --- a/capio/server/capio_server.cpp +++ b/capio/server/capio_server.cpp @@ -28,6 +28,7 @@ #include "common/requests.hpp" #include "common/semaphore.hpp" #include "remote/backend.hpp" +#include "remote/discovery.hpp" #include "storage/capio_file.hpp" #include "utils/common.hpp" #include "utils/env.hpp" @@ -36,6 +37,7 @@ ClientManager *client_manager; StorageManager *storage_manager; Backend *backend; +DiscoveryService *discovery_service; #include "handlers.hpp" #include "utils/cli_parser.hpp" @@ -140,13 +142,13 @@ int main(int argc, char **argv) { capio_cl_engine->print(); - backend = select_backend(configuration.backend_name, argc, argv); + discovery_service = new DiscoveryService(); + backend = select_backend(configuration.backend_name, argc, argv); START_LOG(gettid(), "call()"); open_files_location(); - shm_canary = new CapioShmCanary(capio_cl_engine->getWorkflowName()); storage_manager = new StorageManager(); client_manager = new ClientManager(); @@ -159,6 +161,5 @@ int main(int argc, char **argv) { server_thread.join(); remote_listener_thread.join(); - delete backend; return 0; } \ No newline at end of file diff --git a/capio/server/include/remote/atomic_queue.hpp b/capio/server/include/remote/atomic_queue.hpp new file mode 100644 index 000000000..258dd1dc2 --- /dev/null +++ b/capio/server/include/remote/atomic_queue.hpp @@ -0,0 +1,72 @@ + +#ifndef CAPIO_BACKEND_ATOMIC_QUEUE_HPP +#define CAPIO_BACKEND_ATOMIC_QUEUE_HPP + +#include +#include +#include +#include +#include + +template struct AtomicQueueElement { + + AtomicQueueElement(T message, size_t message_size, const std::string &origin) { + this->object = message; + this->object_size = message_size; + this->target_or_source = origin; + } + + T object; + size_t object_size = 0; + std::string target_or_source; +}; 
+ +template class AtomicQueue { + std::queue> _queue; + std::mutex _mutex; + std::condition_variable _lock_cond; + + bool _shutdown = false; + + public: + ~AtomicQueue() { + { + std::lock_guard lg(_mutex); + _shutdown = true; + } + _lock_cond.notify_all(); + } + + void push(T message, size_t message_size, const std::string &origin) { + { + std::lock_guard lg(_mutex); + if (_shutdown) { + return; + } + _queue.emplace(message, message_size, origin); + } + _lock_cond.notify_all(); + } + + AtomicQueueElement pop() { + std::unique_lock lock(_mutex); + _lock_cond.wait(lock, [this] { return !_queue.empty() || _shutdown; }); + auto s = std::move(_queue.front()); + _queue.pop(); + + return s; + } + + std::optional> try_pop() { + std::lock_guard lg(_mutex); + if (_queue.empty() || _shutdown) { + return std::nullopt; + } + + auto s = std::move(_queue.front()); + _queue.pop(); + return s; + } +}; + +#endif // CAPIO_BACKEND_ATOMIC_QUEUE_HPP diff --git a/capio/server/include/remote/backend.hpp b/capio/server/include/remote/backend.hpp index f1c827821..50b336b73 100644 --- a/capio/server/include/remote/backend.hpp +++ b/capio/server/include/remote/backend.hpp @@ -87,6 +87,12 @@ class Backend { * @param target */ virtual void send_request(const char *message, int message_len, const std::string &target) = 0; + + /** + * Connect this server instance to a remote server instance + * @param target Remote server instance identification + */ + virtual void connect_to(const std::string &target) = 0; }; #endif // CAPIO_SERVER_REMOTE_BACKEND_HPP diff --git a/capio/server/include/remote/backend/include.hpp b/capio/server/include/remote/backend/include.hpp index 5518624cf..7f04735c4 100644 --- a/capio/server/include/remote/backend/include.hpp +++ b/capio/server/include/remote/backend/include.hpp @@ -5,5 +5,7 @@ */ #include "mpi.hpp" +#include "mtcl.hpp" #include "none.hpp" + #endif // CAPIO_SERVER_REMOTE_BACKEND_INCLUDE_HPP diff --git a/capio/server/include/remote/backend/mpi.hpp 
b/capio/server/include/remote/backend/mpi.hpp index fca12a752..be185f01e 100644 --- a/capio/server/include/remote/backend/mpi.hpp +++ b/capio/server/include/remote/backend/mpi.hpp @@ -27,6 +27,7 @@ class MPIBackend : public Backend { void send_file(char *shm, long int nbytes, const std::string &target) override; void send_request(const char *message, int message_len, const std::string &target) override; void recv_file(char *shm, const std::string &source, long int bytes_expected) override; + void connect_to(const std::string &target) override; }; class MPISYNCBackend final : public MPIBackend { diff --git a/capio/server/include/remote/backend/mtcl.hpp b/capio/server/include/remote/backend/mtcl.hpp new file mode 100644 index 000000000..c90da12ba --- /dev/null +++ b/capio/server/include/remote/backend/mtcl.hpp @@ -0,0 +1,92 @@ +#ifndef MTCL_BACKEND_HPP +#define MTCL_BACKEND_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/constants.hpp" +#include "common/logger.hpp" +#include "remote/atomic_queue.hpp" +#include "remote/backend.hpp" + +#include + +typedef unsigned long long int capio_off64_t; + +/** + * This avoids it to include the MTCL library here as it is a header-only library. + * this is equivalent to use extern in C but for class + */ +namespace MTCL { +class HandleUser; +} + +// TODO: extend backend class +class MTCLBackend : public Backend { + + int thread_sleep_times = 0; + bool continue_execution = true; + + const std::string selfToken, ownPort, usedProtocol; + + std::shared_mutex open_connections_lock; + std::unordered_map *> open_connections; + + std::thread *incoming_connection_thread = nullptr; + std::vector connection_threads; + + AtomicQueue incoming_request_queue; + + /** + * Waits for incoming new requests to connect to new server instances. 
When a new request + * arrives, it then handshakes with the remote servers, opening a new connection, and starting a + * new thread that will handle remote requests. If no request arrives within the sleep_time + * parameter, then the method will issue an advertisement on UDP multicast of its alive state + * so that other servers may instantiate a new connection with me. + * + * @param ownPort + * @param usedProtocol + * @param continue_execution + * @param sleep_time + * @param open_connections + * @param open_connection_guard + * @param _connection_threads + * @param incoming_request_queue + */ + void static incomingMTCLConnectionListener( + const std::string &ownPort, const std::string &usedProtocol, const bool *continue_execution, + int sleep_time, + std::unordered_map *> *open_connections, + std::shared_mutex *open_connection_guard, std::vector *_connection_threads, + AtomicQueue *incoming_request_queue); + + public: + explicit MTCLBackend(const std::string &proto, const std::string &port, int sleep_time); + + ~MTCLBackend() override; + + RemoteRequest read_next_request() override; + + void handshake_servers() override; + + const std::set get_nodes() override; + + void send_request(const char *message, int message_len, const std::string &target) override; + + void send_file(char *shm, long int nbytes, const std::string &target) override; + + void recv_file(char *shm, const std::string &source, long int bytes_expected) override; + + void connect_to(const std::string &target_token) override; +}; + +#endif // MTCL_BACKEND_HPP \ No newline at end of file diff --git a/capio/server/include/remote/backend/none.hpp b/capio/server/include/remote/backend/none.hpp index 3faae205f..c0cd2298a 100644 --- a/capio/server/include/remote/backend/none.hpp +++ b/capio/server/include/remote/backend/none.hpp @@ -11,5 +11,6 @@ class NoneBackend final : public Backend { void send_file(char *shm, long int nbytes, const std::string &target) override; void send_request(const char 
*message, int message_len, const std::string &target) override; void recv_file(char *shm, const std::string &source, long int bytes_expected) override; + void connect_to(const std::string &target) override; }; #endif // CAPIO_SERVER_REMOTE_BACKEND_NONE_HPP diff --git a/capio/server/include/remote/discovery.hpp b/capio/server/include/remote/discovery.hpp new file mode 100644 index 000000000..4454a808b --- /dev/null +++ b/capio/server/include/remote/discovery.hpp @@ -0,0 +1,88 @@ +#ifndef CAPIO_DISCOVERY_HPP +#define CAPIO_DISCOVERY_HPP + +#include +#include + +#include "utils/shm_canary.hpp" + +/** + * Discovery service. Responsible for: + * - Detect other server instances running in the same node with the same workflow name (and halts + * startup if it finds one) + * - Detect other remote running server instances of capio servers and issue commands to the backend + * to open a connection with them as soon as they are found. + */ +class DiscoveryService { + + /// @brief Variable used to signal termination to child threads + bool terminate = false; + + /// @brief Handle for multicast based discovery thread + std::thread *mcast_listener_thread = nullptr; + /// @brief Handle for file system based discovery thread + std::thread *fs_listener_thread = nullptr; + /// @brief Handle for thread advertising this server instance + std::thread *advertisement_thread = nullptr; + + /// @brief Token to be advertised by this server + std::string advertisement_token; + + /// @brief Canary variable to detect other server instances running locally that are logically + /// equivalent to the one starting up + CapioShmCanary *shm_canary; + + /// @brief Directory to look into for CAPIO tokens + std::filesystem::path token_directory_path; + /// @brief This server instance token filename + std::filesystem::path token_filename; + + /// @brief Multicast address + const std::string capio_multicast_adv_address; + + /// @brief multicast port + const unsigned int capio_multicast_adv_port; + + 
public: + /** + * Construct a new Discovery Service class + * @param mcast_addr Address to send and receive aliveness token from other servers + * @param mcast_port Port to send and receive aliveness token from other servers + */ + explicit DiscoveryService(const std::string &mcast_addr = CAPIO_MCAST_ADV_DEFAULT_ADDR, + unsigned int mcast_port = CAPIO_MCAST_ADV_DEFAULT_PORT); + + /// @brief Default destructor + ~DiscoveryService(); + + /** + * @brief Configures and starts the discovery service to advertise and scan for tokens. + * + * Sets the advertisement token used by other server instances to establish a connection. + * The token must conform to the specific backend requirements for incoming connections. + * * @note The token is not passed via the constructor because the Discovery Service + * must be instantiated before the Backend provides the token. + * + * Once called, this method: + * 1. Stores the current token in a hidden file within a designated directory. + * 2. Initiates multicast traffic to advertise the local token. + * 3. Scans the hidden directory for aliveness tokens from other servers. + * + * @param adv_delay The interval (in milliseconds/seconds) between advertisement broadcasts. + * @param token The authentication or identification string provided by the backend. + * @param token_directory directory to store capio aliveness tokens + */ + void start(unsigned int adv_delay, const std::string &token, + const std::string &token_directory = ".capio_tokens/"); + + /** + * Stop current server instance from advertising itself and from receiving advertisements from + * other server instances. + * + * NOTE: this method does not destroy the CAPIO canary variable. for that the destruction of the + * class instance is required. 
+ */ + void stop(); +}; + +#endif // CAPIO_DISCOVERY_HPP \ No newline at end of file diff --git a/capio/server/include/remote/listener.hpp b/capio/server/include/remote/listener.hpp index 747c20efc..bf9a3c597 100644 --- a/capio/server/include/remote/listener.hpp +++ b/capio/server/include/remote/listener.hpp @@ -41,6 +41,15 @@ inline Backend *select_backend(const std::string &backend_name, int argc, char * return new MPIBackend(argc, argv); } + if (backend_name == "mtcl") { + LOG("backend selected: MTCL"); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Starting CAPIO with MTCL backend" + << std::endl; + char hostname[HOST_NAME_MAX]{0}; + gethostname(hostname, HOST_NAME_MAX); + return new MTCLBackend("TCP", "1234", 1000000); + } + if (backend_name == "mpisync") { LOG("backend selected: mpisync"); server_println("Starting CAPIO with MPI (SYNC) backend", diff --git a/capio/server/include/utils/shm_canary.hpp b/capio/server/include/utils/shm_canary.hpp new file mode 100644 index 000000000..765ab7c4c --- /dev/null +++ b/capio/server/include/utils/shm_canary.hpp @@ -0,0 +1,13 @@ +#ifndef CAPIO_SHM_CANARY_HPP +#define CAPIO_SHM_CANARY_HPP +#include + +class CapioShmCanary { + int _shm_id; + std::string _canary_name; + + public: + explicit CapioShmCanary(const std::string &capio_workflow_name); + ~CapioShmCanary(); +}; +#endif // CAPIO_SHM_CANARY_HPP diff --git a/capio/server/include/utils/signals.hpp b/capio/server/include/utils/signals.hpp index 94660c0a1..0f711611d 100644 --- a/capio/server/include/utils/signals.hpp +++ b/capio/server/include/utils/signals.hpp @@ -4,6 +4,7 @@ #include #include "remote/backend.hpp" +#include "remote/discovery.hpp" #include "server_println.hpp" #ifdef CAPIO_COVERAGE @@ -23,19 +24,22 @@ void sig_term_handler(int signum, siginfo_t *info, void *ptr) { } // free all the memory used - + discovery_service->stop(); delete client_manager; delete storage_manager; server_println("data_buffers cleanup completed", 
CapioCLEngine::get().getWorkflowName(), CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, __func__); + delete backend; + delete client_manager; + delete storage_manager; + #ifdef CAPIO_COVERAGE __gcov_dump(); #endif - delete backend; - delete shm_canary; + delete discovery_service; server_println("shutdown completed", CapioCLEngine::get().getWorkflowName(), CAPIO_LOG_SERVER_CLI_LEVEL_INFO, __func__); diff --git a/capio/server/src/discovery_service.cpp b/capio/server/src/discovery_service.cpp new file mode 100644 index 000000000..5bf856f70 --- /dev/null +++ b/capio/server/src/discovery_service.cpp @@ -0,0 +1,178 @@ +#include +#include + +#include "common/logger.hpp" +#include "remote/backend.hpp" +#include "remote/discovery.hpp" +#include "utils/capiocl_adapter.hpp" +#include "utils/common.hpp" + +extern Backend *backend; + +// constant required by setsockopt() +int REUSE_MCAST_SOCKET = 1; + +void advertise(const bool *terminate, const unsigned int delay_ms, + const std::string &advertisement_token, const std::string &adv_addr, + const unsigned int adv_port) { + const int advert_sock_fd = socket(AF_INET, SOCK_DGRAM, 0); + sockaddr_in advert_multicast_addr{}; + advert_multicast_addr.sin_family = AF_INET; + advert_multicast_addr.sin_port = htons(adv_port); + advert_multicast_addr.sin_addr.s_addr = inet_addr(adv_addr.c_str()); + + while (!*terminate) { + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + sendto(advert_sock_fd, advertisement_token.data(), advertisement_token.size(), 0, + reinterpret_cast(&advert_multicast_addr), sizeof(advert_multicast_addr)); + } + + close(advert_sock_fd); +} + +void mcast_thread_discovery_service(const bool *terminate, const std::string &adv_addr, + const unsigned int adv_port) { + START_LOG(gettid(), "call()"); + + int sockfd = socket(AF_INET, SOCK_DGRAM, 0); + + setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &REUSE_MCAST_SOCKET, sizeof(REUSE_MCAST_SOCKET)); + + timeval tv{}; + tv.tv_sec = 0; + tv.tv_usec = 100000; // 100,000 
microseconds = 100ms + setsockopt(sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + + sockaddr_in local_addr{}; + local_addr.sin_family = AF_INET; + local_addr.sin_port = htons(adv_port); + local_addr.sin_addr.s_addr = htonl(INADDR_ANY); + if (bind(sockfd, reinterpret_cast(&local_addr), sizeof(local_addr)) == -1) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Error: unable to bind to multicast socket. Error is: " + + std::string(std::strerror(errno))); + // halt execution and return + return; + } + + ip_mreq mreq{}; + mreq.imr_multiaddr.s_addr = inet_addr(adv_addr.c_str()); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + setsockopt(sockfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)); + + char incoming_token[2 * HOST_NAME_MAX] = {0}; + + while (!*terminate) { + + bzero(incoming_token, 2 * HOST_NAME_MAX); + + if (recvfrom(sockfd, incoming_token, sizeof(incoming_token) - 1, 0, nullptr, nullptr) > 0) { + backend->connect_to(incoming_token); + } + } + close(sockfd); +} + +void fs_discovery_service(const bool *terminate, const std::filesystem::path &token_directory_path, + const unsigned int delay_ms) { + // local cache to not reload tokens already found + // TODO: relax this by storing also last modified date, and reload in case changes occurred + // after first read + + std::vector cache; + + while (!*terminate) { + const auto iterator = std::filesystem::directory_iterator(token_directory_path); + for (auto &entry : iterator) { + if (std::find(cache.begin(), cache.end(), entry.path()) == cache.end()) { + cache.push_back(entry.path()); + + // Read connection token from FS + std::ifstream input(entry.path()); + std::string token; + input >> token; + + // Send token to backend to issue a direct connection. 
+ // NOTE: backend will refuse to connect silently if connection is already + // established + backend->connect_to(token); + } + } + std::this_thread::sleep_for(std::chrono::milliseconds(delay_ms)); + } +} + +void DiscoveryService::start(unsigned int adv_delay, const std::string &token, + const std::string &token_directory) { + + if (token.empty()) { + throw std::runtime_error("Advertisement token is empty"); + } + + if (token_directory.empty()) { + throw std::runtime_error("Provided token directory is empty"); + } + + if (!std::filesystem::exists(token_directory)) { + std::filesystem::create_directory(token_directory); + } + + std::string node_name(HOST_NAME_MAX, '\0'); + gethostname(node_name.data(), node_name.size()); + node_name.resize(strlen(node_name.data())); + + token_directory_path = token_directory; + token_filename = node_name + ".capio"; + advertisement_token = token; + + std::ofstream token_file(token_directory_path / token_filename); + token_file << advertisement_token; + token_file.close(); + + mcast_listener_thread = new std::thread(mcast_thread_discovery_service, &terminate, + capio_multicast_adv_address, capio_multicast_adv_port); + fs_listener_thread = + new std::thread(fs_discovery_service, &terminate, token_directory_path, adv_delay); + advertisement_thread = + new std::thread(advertise, &terminate, adv_delay, std::ref(advertisement_token), + capio_multicast_adv_address, capio_multicast_adv_port); + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "DiscoveryService will advertise " + + advertisement_token + " every " + + std::to_string(adv_delay) + "ms."); +} +void DiscoveryService::stop() { + terminate = true; + + if (mcast_listener_thread != nullptr && mcast_listener_thread->joinable()) { + mcast_listener_thread->join(); + mcast_listener_thread = nullptr; + } + + if (fs_listener_thread != nullptr && fs_listener_thread->joinable()) { + fs_listener_thread->join(); + fs_listener_thread = nullptr; + } + + if (advertisement_thread != nullptr && 
advertisement_thread->joinable()) { + advertisement_thread->join(); + advertisement_thread = nullptr; + } +} + +DiscoveryService::DiscoveryService(const std::string &mcast_addr, const unsigned int mcast_port) + : capio_multicast_adv_address(mcast_addr), capio_multicast_adv_port(mcast_port) { + shm_canary = new CapioShmCanary(CapioCLEngine::get().getWorkflowName()); +} + +DiscoveryService::~DiscoveryService() { + // if destructor is called before stop(), then stop the the service first. + if (!terminate) { + stop(); + } + // delete aliveness token + std::filesystem::remove(token_directory_path / token_filename); + + // delete shm canary + delete shm_canary; +} \ No newline at end of file diff --git a/capio/server/src/mpi_backend.cpp b/capio/server/src/mpi_backend.cpp index ead84a42c..e62e2013f 100644 --- a/capio/server/src/mpi_backend.cpp +++ b/capio/server/src/mpi_backend.cpp @@ -120,6 +120,11 @@ void MPIBackend::recv_file(char *shm, const std::string &source, long int bytes_ } } +void MPIBackend::connect_to(const std::string &target) { + START_LOG(gettid(), "call(target=%s)", target.c_str()); + LOG("connect_to called on backend that is not dynamic. ignoring call"); +} + MPISYNCBackend::MPISYNCBackend(int argc, char *argv[]) : MPIBackend(argc, argv) { START_LOG(gettid(), "call()"); LOG("Wrapped MPI backend with MPISYC backend"); diff --git a/capio/server/src/mtcl_backend.cpp b/capio/server/src/mtcl_backend.cpp new file mode 100644 index 000000000..95bb88489 --- /dev/null +++ b/capio/server/src/mtcl_backend.cpp @@ -0,0 +1,315 @@ +#include "common/logger.hpp" +#include "common/requests.hpp" +#include "remote/backend.hpp" +#include "remote/backend/mtcl.hpp" +#include "remote/discovery.hpp" +#include "storage/manager.hpp" +#include "utils/common.hpp" + +#include +#include + +// TODO: THERE IS A MASSIVE MEMORY LEAK WHEN SENDING AND RECEIVING CONST CHAR*. 
FIX IT BEFORE MERGE + +// TODO: CLI args (with defaults) instead of hardcoded values + +constexpr int max_net_op = 10; + +extern Backend *backend; +extern DiscoveryService *discovery_service; +extern StorageManager *storage_service; + +RemoteRequest MTCLBackend::read_next_request() { + START_LOG(gettid(), "call()"); + + auto optional_request = incoming_request_queue.try_pop(); + while (!optional_request.has_value()) { + std::this_thread::sleep_for(std::chrono::milliseconds(thread_sleep_times)); + optional_request = incoming_request_queue.try_pop(); + } + + auto [req, req_size, source] = optional_request.value(); + LOG("Received %s from %d", req.c_str(), source.c_str()); + return {req.data(), source}; +} + +/** + * @brief Manages a dedicated P2P connection to a single remote capio_server instance. + * * The communication logic follows a deterministic role-assignment algorithm: + * 1. **Initial Role Assignment:** The initial sender is determined by the lexicographical + * comparison of the two participating hostnames (the smaller hostname starts as sender). + * 2. **Operational Phases:** The thread executes alternating phases of sending and receiving, + * processing up to `max_net_op` operations per phase. + * 3. **Role Switching:** Nodes synchronize a role swap using a `HAVE_FINISH_SEND_REQUEST` + * signal. This occurs when the current sender either exhausts its message queue or reaches + * the `max_net_op` threshold. + * 4. **Termination:** The loop persists as long as the remote handle remains valid and the + * `terminate` flag is false. + * @param HandlerPointer A valid MTCL HandlePointer for the connection. + * @param remote_hostname The hostname of the remote endpoint. + * @param queue Pointer to the communication queue containing inbound and outbound sub-queues. + * @param sleep_time Microseconds to sleep between thread cycles to prevent CPU pinning. 
+ * @param continue_execution Reference to a boolean flag to know when to stop execution + * to signal execution shutdown. + * @param incoming_request_queue + */ +void serverConnectionHandler(MTCL::HandleUser HandlerPointer, const std::string &remote_hostname, + AtomicQueue *queue, const int sleep_time, + const bool *continue_execution, + AtomicQueue *incoming_request_queue) { + + char ownHostname[HOST_NAME_MAX]; + gethostname(ownHostname, HOST_NAME_MAX); + bool my_turn_to_send = ownHostname > remote_hostname; + + char request_has_finished_to_send[CAPIO_REQ_MAX_SIZE]{0}; + sprintf(request_has_finished_to_send, "%03d", BACKEND_HAVE_FINISH_SEND_REQUEST); + + START_LOG(gettid(), "call(remote_hostname=%s)", remote_hostname.c_str()); + + LOG("Will begin execution with %s phase", my_turn_to_send ? "sending" : "receiving"); + + while (HandlerPointer.isValid()) { + // execute up to N operation of send &/or receive, to avoid starvation + + if (my_turn_to_send) { + LOG("Send PHASE"); + for (int i = 0; i < max_net_op; i++) { + if (const auto request_opt = queue->try_pop(); request_opt.has_value()) { + const auto &[request, request_size, target] = request_opt.value(); + LOG("Request to be sent = %s to %s", request, target.c_str()); + + HandlerPointer.send(&request_size, sizeof(request_size)); + HandlerPointer.send(request, request_size); + } + } + LOG("Completed SEND PHASE"); + // Send message I have finished the max number of allowed consecutive io operations + HandlerPointer.send(request_has_finished_to_send, sizeof(request_has_finished_to_send)); + + } else { + + bool continue_receive_phase = true; + size_t receive_size = 0; + LOG("Receive PHASE"); + while (continue_receive_phase) { + // Receive phase + HandlerPointer.probe(receive_size, false); + if (receive_size > 0) { + LOG("A request is incoming"); + + ssize_t incoming_request_size = 0; + HandlerPointer.receive(&incoming_request_size, sizeof(incoming_request_size)); + + const auto incoming_request = new 
char[incoming_request_size]; + const auto resp_size = + HandlerPointer.receive(incoming_request, incoming_request_size); + LOG("Received request with size = %ld", incoming_request_size); + + if (const auto code = + RemoteRequest{incoming_request, remote_hostname}.get_code(); + code == BACKEND_HAVE_FINISH_SEND_REQUEST) { + // Finished sending data. Set continue_receive_phase = false to go to next + // phase + LOG("CTRL MSG received: Other has finished sending phase. Switching me " + "from receive to send"); + continue_receive_phase = false; + } else { + incoming_request_queue->push(incoming_request, resp_size, remote_hostname); + } + } + } + } + + // terminate phase + if (!*continue_execution) { + LOG("[TERM PHASE] Closing connection"); + HandlerPointer.close(); + LOG("[TERM PHASE] Terminating thread server_connection_handler"); + return; + } + + my_turn_to_send = !my_turn_to_send; + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); + } +} + +void MTCLBackend::incomingMTCLConnectionListener( + const std::string &ownPort, const std::string &usedProtocol, const bool *continue_execution, + int sleep_time, std::unordered_map *> *open_connections, + std::shared_mutex *open_connection_guard, std::vector *_connection_threads, + AtomicQueue *incoming_request_queue) { + + START_LOG(gettid(), "call(sleep_time=%d)", sleep_time); + + while (*continue_execution) { + + if (auto UserManager = MTCL::Manager::getNext(std::chrono::microseconds(sleep_time)); + UserManager.isValid()) { + // received MTCL handle + LOG("Handle user is valid"); + size_t remoteHostnameSize = -1; + if (UserManager.receive(&remoteHostnameSize, sizeof(remoteHostnameSize)) <= 0 || + remoteHostnameSize == 0 || remoteHostnameSize > HOST_NAME_MAX) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Remote hostname size received is zero or negative"); + UserManager.close(); + continue; + } + + std::string remote_hostname(remoteHostnameSize, '\0'); + 
UserManager.receive(remote_hostname.data(), remoteHostnameSize); + LOG("Received connection hostname: %s", remote_hostname.c_str()); + + auto *queue = new AtomicQueue(); + { + const std::unique_lock lock(*open_connection_guard); + (*open_connections)[remote_hostname] = queue; + } + _connection_threads->push_back( + new std::thread(serverConnectionHandler, std::move(UserManager), remote_hostname, + queue, sleep_time, continue_execution, incoming_request_queue)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Connected to " + usedProtocol + ":" + + remote_hostname + ":" + ownPort + + " (incoming)"); + } + } +} + +MTCLBackend::MTCLBackend(const std::string &proto, const std::string &port, const int sleep_time) + : Backend(HOST_NAME_MAX), thread_sleep_times(sleep_time), selfToken(proto + ":0.0.0.0:" + port), + ownPort(port), usedProtocol(proto) { + START_LOG(gettid(), "INFO: instance of MTCLBackend"); + + LOG("My hostname is %s. Starting to listen on connection %s", node_name.c_str(), + selfToken.c_str()); + + std::string hostname_id("server-"); + hostname_id += node_name; + MTCL::Manager::init(hostname_id); + + MTCL::Manager::listen(selfToken); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "MTCL_backend initialization completed."); +} + +MTCLBackend::~MTCLBackend() { + START_LOG(gettid(), "call()"); + continue_execution = false; + + incoming_connection_thread->join(); + + for (const auto t : connection_threads) { + t->join(); + } + LOG("Terminated connection threads"); + + delete incoming_connection_thread; + + LOG("Handler closed."); + + MTCL::Manager::finalize(); + LOG("Finalizing MTCL backend"); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "MTCL backend cleanup completed."); +} + +void MTCLBackend::handshake_servers() { + incoming_connection_thread = + new std::thread(incomingMTCLConnectionListener, ownPort, usedProtocol, &continue_execution, + thread_sleep_times, &open_connections, &open_connections_lock, + &connection_threads, 
&incoming_request_queue); +}
+
+/**
+ * Returns the hostnames that currently have an entry in open_connections.
+ * The map is read under a shared lock.
+ */
+const std::set MTCLBackend::get_nodes() {
+    std::set keys;
+    shared_lock_guard slg(open_connections_lock);
+    for (const auto &[hostname, _handle] : open_connections) {
+        keys.insert(hostname);
+    }
+
+    return keys;
+}
+
+/**
+ * Pushes a request on the queue registered for `target`.
+ *
+ * @param message     request buffer (logged with %s)
+ * @param message_len length handed to the queue together with the buffer
+ * @param target      hostname key in open_connections; at() throws std::out_of_range when the
+ *                    host is not connected
+ */
+void MTCLBackend::send_request(const char *message, const int message_len,
+                               const std::string &target) {
+
+    // message_len is an int, so it is logged with %d.
+    START_LOG(gettid(), "call(target=%s, message=%s, message_len=%d)", target.c_str(), message,
+              message_len);
+
+    shared_lock_guard slg(open_connections_lock);
+    const auto queue = open_connections.at(target);
+    LOG("obtained access to queue");
+
+    queue->push(message, message_len, target);
+    LOG("Request pushed to output queue");
+}
+
+/**
+ * Pushes `nbytes` bytes starting at `shm` on the queue registered for `target`.
+ * at() throws std::out_of_range when the host is not connected.
+ */
+void MTCLBackend::send_file(char *shm, long int nbytes, const std::string &target) {
+    START_LOG(gettid(), "call(target=%s, nbytes=%ld)", target.c_str(), nbytes);
+
+    shared_lock_guard slg(open_connections_lock);
+    const auto queue = open_connections.at(target);
+    queue->push(shm, nbytes, target);
+}
+
+/**
+ * Pops one element from the queue registered for `source` and copies `bytes_expected` bytes
+ * of its payload into `shm`. at() throws std::out_of_range when the host is not connected.
+ *
+ * NOTE(review): this pops from the same per-host queue that send_request()/send_file() push
+ * to; confirm that this is the intended source of incoming file data.
+ */
+void MTCLBackend::recv_file(char *shm, const std::string &source, long int bytes_expected) {
+    START_LOG(gettid(), "call(source=%s, bytes_expected=%ld)", source.c_str(), bytes_expected);
+
+    // The lock is only needed for the map lookup. Releasing it before pop() keeps writers
+    // (new connections need the exclusive lock) from waiting on this call; presumably pop()
+    // can block until an element is available.
+    const auto queue = [&] {
+        shared_lock_guard slg(open_connections_lock);
+        return open_connections.at(source);
+    }();
+    const auto data = queue->pop();
+    // NOTE(review): bytes_expected is not checked against the size of the popped payload;
+    // a shorter payload would be over-read. Confirm the element exposes its size.
+    memcpy(shm, data.object, bytes_expected);
+}
+
+void MTCLBackend::connect_to(const std::string &target_token) {
+    START_LOG(gettid(), "call(target=%s)", target_token.c_str());
+
+    if (target_token == selfToken) {
+        LOG("Skipping to connect to self");
+        return;
+    }
+
+    std::string remoteHostname =
+        target_token.substr(target_token.find(':') + 1, // Drop proto
+                            target_token.find_last_of(':') - target_token.find(':') - 1 // drop port
+        );
+
+    /*
+     * Connect to remote only if its hostname is lexically greater than self hostname
+     * If current server hostname is equal to remoteHostname, avoid connection
+     * TODO: extend this to support also different workflows on same nodes.
(NB: right now we expect
+     different MCAST groups )
+     */
+    if (node_name >= remoteHostname) {
+        return;
+    }
+
+    {
+        shared_lock_guard slg(open_connections_lock);
+        if (open_connections.find(remoteHostname) != open_connections.end()) {
+            LOG("Remote host %s is already connected", remoteHostname.c_str());
+            return;
+        }
+    }
+
+    if (auto UserManager = MTCL::Manager::connect(target_token); UserManager.isValid()) {
+        LOG("Opened connection with: %s", target_token.c_str());
+
+        // send my hostname: first its length, then the bytes. The peer cannot identify this
+        // node without them, so a failed send must not leave a registered connection behind.
+        const size_t ownHostnameLen = node_name.size();
+        if (UserManager.send(&ownHostnameLen, sizeof(ownHostnameLen)) < 0 ||
+            UserManager.send(node_name.c_str(), ownHostnameLen) < 0) {
+            server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING,
+                           "Warning: unable to send own hostname to " + remoteHostname +
+                               ": closing connection");
+            UserManager.close();
+            return;
+        }
+
+        auto *queue = new AtomicQueue();
+        {
+            const std::lock_guard lg(open_connections_lock);
+            open_connections[remoteHostname] = queue;
+        }
+        connection_threads.push_back(
+            new std::thread(serverConnectionHandler, std::move(UserManager), remoteHostname, queue,
+                            thread_sleep_times, &continue_execution, &incoming_request_queue));
+        server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO,
+                       "Connected to " + target_token + " (outgoing)");
+    } else {
+        server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Warning: tried to connect to " +
+                                                               remoteHostname +
+                                                               " but connection is not valid");
+    }
+}
diff --git a/capio/server/src/none_backend.cpp b/capio/server/src/none_backend.cpp index e0e8e3208..588d5d843 100644 --- a/capio/server/src/none_backend.cpp +++ b/capio/server/src/none_backend.cpp @@ -30,4 +30,5 @@ void NoneBackend::send_request(const char *message, const int message_len, void NoneBackend::recv_file(char *shm, const std::string &source, const long int bytes_expected) { START_LOG(gettid(), "call(shm=%ld, source=%s, bytes_expected=%ld)", shm, source.c_str(), bytes_expected); -} \ No newline at end of file +} +void NoneBackend::connect_to(const std::string &target) { return; } \ No newline at end of file diff --git a/capio/server/src/shm_canary.cpp b/capio/server/src/shm_canary.cpp new file mode 100644 index
000000000..4c0bdceab --- /dev/null +++ b/capio/server/src/shm_canary.cpp @@ -0,0 +1,47 @@
+#include "utils/shm_canary.hpp"
+
+#include <cerrno>
+#include <cstring>
+
+#include "common/env.hpp"
+#include "common/logger.hpp"
+#include "utils/common.hpp"
+
+/**
+ * Creates the shared memory canary object for a workflow.
+ * An empty capio_workflow_name falls back to get_capio_workflow_name().
+ * The object is opened with O_CREAT | O_EXCL, so creation fails when an object with the
+ * same name already exists. Every failure is reported and ends in ERR_EXIT.
+ */
+CapioShmCanary::CapioShmCanary(const std::string &capio_workflow_name)
+    : _canary_name(capio_workflow_name) {
+    START_LOG(capio_syscall(SYS_gettid), "call(capio_workflow_name: %s)", _canary_name.data());
+    if (_canary_name.empty()) {
+        _canary_name = get_capio_workflow_name();
+    }
+    _shm_id = shm_open(_canary_name.data(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
+    if (_shm_id == -1) {
+        // Save errno first: the reporting calls below may overwrite it.
+        const int shm_open_errno = errno;
+        if (shm_open_errno == EEXIST) {
+            server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR,
+                           "Error: canary variable " + _canary_name + " already exists!");
+            LOG(CAPIO_SHM_CANARY_ERROR, _canary_name.data());
+            ERR_EXIT("ERR: shm canary flag already exists");
+        } else {
+            // Not a name clash (e.g. permissions or an invalid name): report the real cause.
+            server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR,
+                           "Error: unable to create canary variable " + _canary_name + ": " +
+                               std::strerror(shm_open_errno));
+            ERR_EXIT("ERR: unable to create shm canary flag");
+        }
+    }
+}
+
+/** Closes the canary descriptor and passes the canary name to SHM_DESTROY_CHECK. */
+CapioShmCanary::~CapioShmCanary() {
+    START_LOG(capio_syscall(SYS_gettid), "call()");
+    server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Removing shared memory canary flag");
+    close(_shm_id);
+    SHM_DESTROY_CHECK(_canary_name.c_str());
+}
diff --git a/capio/tests/unit/server/CMakeLists.txt b/capio/tests/unit/server/CMakeLists.txt index ca41e31bc..a11ccdb27 100644 --- a/capio/tests/unit/server/CMakeLists.txt +++ b/capio/tests/unit/server/CMakeLists.txt @@ -4,7 +4,7 @@ set(TARGET_NAME capio_server_unit_tests) find_package(MPI REQUIRED) -FetchContent_MakeAvailable(capio_cl) +FetchContent_MakeAvailable(capio_cl mtcl) set(TARGET_INCLUDE_FOLDER "${PROJECT_SOURCE_DIR}/capio/server") @@ -32,6 +32,7 @@ target_sources(${TARGET_NAME} PRIVATE target_include_directories(${TARGET_NAME} PRIVATE "${TARGET_INCLUDE_FOLDER}/include" ${capio_cl_SOURCE_DIR} + ${mtcl_SOURCE_DIR}/include ) ##################################### diff --git a/capio/tests/unit/server/src/capio_file.cpp b/capio/tests/unit/server/src/capio_file.cpp index db7ead849..28095dfe9 100644 --- a/capio/tests/unit/server/src/capio_file.cpp +++ b/capio/tests/unit/server/src/capio_file.cpp @@ -338,6
+338,7 @@ class MockBackend : public Backend { RemoteRequest read_next_request() override { return {nullptr, ""}; } void send_file(char *shm, long int nbytes, const std::string &target) override {} void send_request(const char *message, int message_len, const std::string &target) override {} + void connect_to(const std::string &target) override {} }; class MockBackendTestFixture : public ::testing::Test { diff --git a/capio/tests/unit/server/src/main.cpp b/capio/tests/unit/server/src/main.cpp index 28cb277e4..83be6583c 100644 --- a/capio/tests/unit/server/src/main.cpp +++ b/capio/tests/unit/server/src/main.cpp @@ -3,6 +3,7 @@ #include "capiocl.hpp" #include "capiocl/engine.h" #include "client-manager/client_manager.hpp" +#include "remote/discovery.hpp" #include "storage/manager.hpp" #include "utils/capiocl_adapter.hpp" #include "utils/location.hpp" @@ -11,6 +12,7 @@ capiocl::engine::Engine *capio_cl_engine = nullptr; StorageManager *storage_manager = nullptr; ClientManager *client_manager = nullptr; Backend *backend = nullptr; +DiscoveryService *discovery_service = nullptr; const capiocl::engine::Engine &CapioCLEngine::get() { return *capio_cl_engine; } @@ -19,15 +21,17 @@ class ServerUnitTestEnvironment : public testing::Environment { explicit ServerUnitTestEnvironment() = default; void SetUp() override { - capio_cl_engine = new capiocl::engine::Engine(false); - client_manager = new ClientManager(); - storage_manager = new StorageManager(); + capio_cl_engine = new capiocl::engine::Engine(false); + client_manager = new ClientManager(); + storage_manager = new StorageManager(); + discovery_service = new DiscoveryService(); } void TearDown() override { delete storage_manager; delete client_manager; delete capio_cl_engine; + delete discovery_service; } }; diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 000000000..327ab3e44 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,49 @@ +services: + node1: + image: hpio/capio:latest + 
container_name: node1 + hostname: "node1" + tty: true + working_dir: /shared + volumes: + - shared_data:/shared + networks: + capio_private_net: + ipv4_address: 10.10.0.10 + environment: + - CAPIO_LOG_LEVEL=-1 + - APP_TYPE=writer + - CAPIO_DIR=. + command: | + capio_server -b mtcl --no-config + + node2: + image: hpio/capio:latest + container_name: node2 + hostname: "node2" + tty: true + working_dir: /shared + volumes: + - shared_data:/shared + networks: + capio_private_net: + ipv4_address: 10.10.0.11 + environment: + - CAPIO_LOG_LEVEL=-1 + - APP_TYPE=reader + - CAPIO_DIR=. + command: | + capio_server -b mtcl --no-config + +volumes: + shared_data: + +networks: + capio_private_net: + driver: macvlan + driver_opts: + parent: dummy0 + ipam: + config: + - subnet: 10.10.0.0/24 + gateway: 10.10.0.1 \ No newline at end of file