From b8c304087247f9d8cf4f87a0d0954a96628e0544 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Fri, 29 Aug 2025 12:12:38 +0200 Subject: [PATCH 1/5] Added backend control plane implementation --- Dockerfile | 16 +- README.md | 6 +- docker-development/build-and-startup.sh | 7 + docker-development/docker-compose.yml | 34 +++ src/common/capio/constants.hpp | 95 ++++---- src/common/capio/logger.hpp | 2 +- .../capio-cl-engine/capio_cl_engine.hpp | 2 +- src/server/client-manager/handlers/create.hpp | 4 +- .../CapioCommunicationService.hpp | 11 +- .../control_plane/capio_control_plane.hpp | 25 +- .../control_plane/fs_control_plane.hpp | 9 +- .../control_plane/multicast_control_plane.hpp | 221 +++++++++++++----- .../data_plane/MTCL_backend.hpp | 5 +- src/server/utils/configuration.hpp | 23 +- src/server/utils/parser.hpp | 50 ++-- src/server/utils/signals.hpp | 2 +- tests/multinode/backend/src/MTCL.hpp | 2 +- 17 files changed, 343 insertions(+), 171 deletions(-) create mode 100755 docker-development/build-and-startup.sh create mode 100644 docker-development/docker-compose.yml diff --git a/Dockerfile b/Dockerfile index 2e71b2dc2..ab6078535 100644 --- a/Dockerfile +++ b/Dockerfile @@ -89,15 +89,15 @@ COPY --from=builder \ # Binaries COPY --from=builder \ - "/usr/local/bin/capio_posix_unit_test[s]" \ + "/usr/local/bin/capio_posix_unit_test[s]*" \ "/usr/local/bin/capio_server" \ - "/usr/local/bin/capio_server_unit_test[s]" \ - "/usr/local/bin/capio_syscall_unit_test[s]" \ - "/usr/local/bin/capio_integration_test[s]" \ - "/usr/local/bin/capio_backend_unit_tests" \ - "/usr/local/bin/capio_integration_test_map" \ - "/usr/local/bin/capio_integration_test_merge" \ - "/usr/local/bin/capio_integration_test_split" \ + "/usr/local/bin/capio_server_unit_test[s]*" \ + "/usr/local/bin/capio_syscall_unit_test[s]*" \ + "/usr/local/bin/capio_integration_test[s]*" \ + "/usr/local/bin/capio_backend_unit_tests*" \ + "/usr/local/bin/capio_integration_test_map*" \ + "/usr/local/bin/capio_integration_test_merge*" \ + "/usr/local/bin/capio_integration_test_split*" \ "/opt/capio/capiorun/capiorun" \ /usr/local/bin/ diff --git a/README.md b/README.md index 2bd80b98d..746481162 100644 --- a/README.md +++ b/README.md @@ -12,9 +12,9 @@ Bash. > oy just want to coordinate IO operations between workflow steps! Compatible on: -- ![Architecture](https://img.shields.io/badge/Architecture-x86_64-blue.svg) -- ![Architecture](https://img.shields.io/badge/Architecture-risc--v-green.svg) -- ![Architecture](https://img.shields.io/badge/Architecture-arm64-red.svg) coming soon! +- ![Architecture](https://img.shields.io/badge/Architecture-x86__64_/_amd64-50C878.svg) +- ![Architecture](https://img.shields.io/badge/Architecture-RISC--V_(riscv64)-50C878.svg) +- ![Architecture](https://img.shields.io/badge/Architecture-ARM64_coming_soon-red.svg) --- ## Automatic install with SPACK diff --git a/docker-development/build-and-startup.sh b/docker-development/build-and-startup.sh new file mode 100755 index 000000000..4a4d77351 --- /dev/null +++ b/docker-development/build-and-startup.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +cd .. +docker build -t alphaunito/capio --build-arg CAPIO_LOG=ON --build-arg CMAKE_BUILD_TYPE=Debug . +cd docker-development + +docker compose up diff --git a/docker-development/docker-compose.yml b/docker-development/docker-compose.yml new file mode 100644 index 000000000..5a4ef82cc --- /dev/null +++ b/docker-development/docker-compose.yml @@ -0,0 +1,34 @@ +services: + node1: + image: alphaunito/capio:latest + tty: true + working_dir: /shared + volumes: + - shared_data:/shared + networks: + capio_net: + aliases: + - node1 + environment: + - CAPIO_LOG_LEVEL=-1 + + node2: + image: alphaunito/capio:latest + tty: true + working_dir: /shared + volumes: + - shared_data:/shared + networks: + capio_net: + aliases: + - node2 + environment: + - CAPIO_LOG_LEVEL=-1 + + +volumes: + shared_data: + +networks: + capio_net: + driver: bridge \ No newline at end of file diff --git a/src/common/capio/constants.hpp b/src/common/capio/constants.hpp index 5235e16b6..6c86ccc5b 100644 --- a/src/common/capio/constants.hpp +++ b/src/common/capio/constants.hpp @@ -11,16 +11,16 @@ typedef unsigned long long int capio_off64_t; // CAPIO files constants -constexpr size_t CAPIO_DEFAULT_DIR_INITIAL_SIZE = 1024L * 1024 * 1024; -constexpr off64_t CAPIO_DEFAULT_FILE_INITIAL_SIZE = 1024L * 1024 * 1024 * 4; +constexpr size_t CAPIO_DEFAULT_DIR_INITIAL_SIZE = 1024L * 1024 * 1024; +constexpr off64_t CAPIO_DEFAULT_FILE_INITIAL_SIZE = 1024L * 1024 * 1024 * 4; [[maybe_unused]] constexpr std::array CAPIO_DIR_FORBIDDEN_PATHS = { std::string_view{"/proc/"}, std::string_view{"/sys/"}, std::string_view{"/boot/"}, - std::string_view{"/dev/"}, std::string_view{"/var/"}, std::string_view{"/run/"}, + std::string_view{"/dev/"}, std::string_view{"/var/"}, std::string_view{"/run/"}, std::string_view("/spack/")}; // CAPIO default values for shared memory constexpr char CAPIO_DEFAULT_WORKFLOW_NAME[] = "CAPIO"; -constexpr char CAPIO_DEFAULT_APP_NAME[] = "default_app"; +constexpr char CAPIO_DEFAULT_APP_NAME[] = "default_app"; constexpr char CAPIO_SHM_CANARY_ERROR[] = "FATAL ERROR: Shared memories for workflow %s already " "exists. One of two (or both) reasons are to blame: \n " @@ -29,52 +29,52 @@ constexpr char CAPIO_SHM_CANARY_ERROR[] = "is already running. Clean shared memory and then retry"; // CAPIO communication constants -constexpr int CAPIO_REQ_BUFF_CNT = 512; // Max number of elements inside buffers -constexpr int CAPIO_CACHE_LINES_DEFAULT = 10; -constexpr int CAPIO_CACHE_LINE_SIZE_DEFAULT = 32768; // 32K of default size for cache lines +constexpr int CAPIO_REQ_BUFF_CNT = 512; // Max number of elements inside buffers +constexpr int CAPIO_CACHE_LINES_DEFAULT = 10; +constexpr int CAPIO_CACHE_LINE_SIZE_DEFAULT = 32768; // 32K of default size for cache lines // TODO: use that in communication only uses the file descriptor instead of the path to save on the // PATH_MAX -constexpr size_t CAPIO_REQ_MAX_SIZE = (PATH_MAX + 256) * sizeof(char); -constexpr char CAPIO_SERVER_CLI_LOG_SERVER[] = "[\033[1;32mSERVER\033[0m"; +constexpr size_t CAPIO_REQ_MAX_SIZE = (PATH_MAX + 256) * sizeof(char); +constexpr char CAPIO_SERVER_CLI_LOG_SERVER[] = "[\033[1;32mSERVER\033[0m"; constexpr char CAPIO_SERVER_CLI_LOG_SERVER_WARNING[] = "[\033[1;33mSERVER\033[0m"; -constexpr char CAPIO_SERVER_CLI_LOG_SERVER_ERROR[] = "[\033[1;31mSERVER\033[0m"; -constexpr char LOG_CAPIO_START_REQUEST[] = "\n+++++++++++ SYSCALL %s (%d) +++++++++++"; -constexpr char LOG_CAPIO_END_REQUEST[] = "----------- END SYSCALL ----------\n"; +constexpr char CAPIO_SERVER_CLI_LOG_SERVER_ERROR[] = "[\033[1;31mSERVER\033[0m"; +constexpr char LOG_CAPIO_START_REQUEST[] = "\n+++++++++++ SYSCALL %s (%d) +++++++++++"; +constexpr char LOG_CAPIO_END_REQUEST[] = "----------- END SYSCALL ----------\n"; constexpr char CAPIO_SERVER_LOG_START_REQUEST_MSG[] = "\n+++++++++++++++++REQUEST+++++++++++++++++"; -constexpr char CAPIO_SERVER_LOG_END_REQUEST_MSG[] = "~~~~~~~~~~~~~~~END REQUEST~~~~~~~~~~~~~~~"; -constexpr int CAPIO_LOG_MAX_MSG_LEN = 4096; -constexpr int CAPIO_MAX_SPSQUEUE_ELEMS = 10; -constexpr int CAPIO_MAX_SPSCQUEUE_ELEM_SIZE = 1024 * 256; +constexpr char CAPIO_SERVER_LOG_END_REQUEST_MSG[] = "~~~~~~~~~~~~~~~END REQUEST~~~~~~~~~~~~~~~"; +constexpr int CAPIO_LOG_MAX_MSG_LEN = 4096; +constexpr int CAPIO_MAX_SPSQUEUE_ELEMS = 10; +constexpr int CAPIO_MAX_SPSCQUEUE_ELEM_SIZE = 1024 * 256; // CAPIO streaming semantics -constexpr char CAPIO_FILE_MODE_NO_UPDATE[] = "no_update"; -constexpr char CAPIO_FILE_MODE_UPDATE[] = "update"; -constexpr char CAPIO_FILE_COMMITTED_ON_CLOSE[] = "on_close"; -constexpr char CAPIO_FILE_COMMITTED_ON_FILE[] = "on_file"; -constexpr char CAPIO_FILE_COMMITTED_N_FILES[] = "n_files"; +constexpr char CAPIO_FILE_MODE_NO_UPDATE[] = "no_update"; +constexpr char CAPIO_FILE_MODE_UPDATE[] = "update"; +constexpr char CAPIO_FILE_COMMITTED_ON_CLOSE[] = "on_close"; +constexpr char CAPIO_FILE_COMMITTED_ON_FILE[] = "on_file"; +constexpr char CAPIO_FILE_COMMITTED_N_FILES[] = "n_files"; constexpr char CAPIO_FILE_COMMITTED_ON_TERMINATION[] = "on_termination"; // CAPIO POSIX return codes -constexpr int CAPIO_POSIX_SYSCALL_ERRNO = -1; +constexpr int CAPIO_POSIX_SYSCALL_ERRNO = -1; constexpr int CAPIO_POSIX_SYSCALL_REQUEST_SKIP = -2; -constexpr int CAPIO_POSIX_SYSCALL_SKIP = 1; -constexpr int CAPIO_POSIX_SYSCALL_SUCCESS = 0; +constexpr int CAPIO_POSIX_SYSCALL_SKIP = 1; +constexpr int CAPIO_POSIX_SYSCALL_SUCCESS = 0; // CAPIO logger - common -constexpr char CAPIO_LOG_PRE_MSG[] = "at[%.15llu][%.40s]: "; +constexpr char CAPIO_LOG_PRE_MSG[] = "at[%.15llu][%.40s]: "; constexpr char CAPIO_DEFAULT_LOG_FOLDER[] = "capio_logs\0"; // CAPIO common - shared memory constant names -constexpr char SHM_FIRST_ELEM[] = "_first_elem_"; -constexpr char SHM_LAST_ELEM[] = "_last_elem_"; -constexpr char SHM_MUTEX_PREFIX[] = "_mutex_"; -constexpr char SHM_SEM_ELEMS[] = "_sem_num_elems_"; -constexpr char SHM_SEM_EMPTY[] = "_sem_num_empty_"; +constexpr char SHM_FIRST_ELEM[] = "_first_elem_"; +constexpr char SHM_LAST_ELEM[] = "_last_elem_"; +constexpr char SHM_MUTEX_PREFIX[] = "_mutex_"; +constexpr char SHM_SEM_ELEMS[] = "_sem_num_elems_"; +constexpr char SHM_SEM_EMPTY[] = "_sem_num_empty_"; constexpr char SHM_SPSC_PREFIX_WRITE[] = "capio_write_tid_"; -constexpr char SHM_SPSC_PREFIX_READ[] = "capio_read_tid_"; +constexpr char SHM_SPSC_PREFIX_READ[] = "capio_read_tid_"; // CAPIO common - shared channel by client and server -constexpr char SHM_COMM_CHAN_NAME[] = "request_buffer"; +constexpr char SHM_COMM_CHAN_NAME[] = "request_buffer"; constexpr char SHM_COMM_CHAN_NAME_RESP[] = "response_buffer_"; // CAPIO logger - shm errors @@ -84,8 +84,8 @@ constexpr char CAPIO_SHM_OPEN_ERROR[] = // CAPIO logger - POSIX constexpr char CAPIO_LOG_POSIX_DEFAULT_LOG_FILE_PREFIX[] = "posix_thread_\0"; -constexpr char CAPIO_LOG_POSIX_SYSCALL_START[] = "\n+++++++++ SYSCALL %s (%d) +++++++++"; -constexpr char CAPIO_LOG_POSIX_SYSCALL_END[] = "~~~~~~~~~ END SYSCALL ~~~~~~~~~\n"; +constexpr char CAPIO_LOG_POSIX_SYSCALL_START[] = "\n+++++++++ SYSCALL %s (%d) +++++++++"; +constexpr char CAPIO_LOG_POSIX_SYSCALL_END[] = "~~~~~~~~~ END SYSCALL ~~~~~~~~~\n"; // CAPIO logger - server constexpr char CAPIO_SERVER_DEFAULT_LOG_FILE_PREFIX[] = "server_thread_\0"; @@ -103,10 +103,10 @@ constexpr char CAPIO_LOG_SERVER_BANNER[] = "\\______/\n\n" "\033[0m CAPIO - Cross Application Programmable IO \n" " V. " CAPIO_VERSION "\n\n"; -constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_INFO[] = "[\033[1;32mSERVER\033[0m"; +constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_INFO[] = "[\033[1;32mSERVER\033[0m"; constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_WARNING[] = "[\033[1;33mSERVER\033[0m"; -constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_ERROR[] = "[\033[1;31mSERVER\033[0m"; -constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_JSON[] = "[\033[1;34mSERVER\033[0m"; +constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_ERROR[] = "[\033[1;31mSERVER\033[0m"; +constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_JSON[] = "[\033[1;34mSERVER\033[0m"; constexpr char CAPIO_LOG_SERVER_CLI_LOGGING_ENABLED_WARNING[] = "[\033[1;33mSERVER\033[0m] " "|==================================================================|\n" @@ -121,7 +121,7 @@ constexpr char CAPIO_LOG_SERVER_CLI_LOGGING_ENABLED_WARNING[] = constexpr char CAPIO_LOG_SERVER_CLI_LOGGING_NOT_AVAILABLE[] = "CAPIO_LOG set but log support was not compiled into CAPIO!"; constexpr char CAPIO_LOG_SERVER_REQUEST_START[] = "+++++++++++ REQUEST +++++++++++"; -constexpr char CAPIO_LOG_SERVER_REQUEST_END[] = "~~~~~~~~~ END REQUEST ~~~~~~~~~\n"; +constexpr char CAPIO_LOG_SERVER_REQUEST_END[] = "~~~~~~~~~ END REQUEST ~~~~~~~~~\n"; // CAPIO server argument parser constexpr char CAPIO_SERVER_ARG_PARSER_PRE[] = @@ -185,10 +185,17 @@ constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_BACKEND_HELP[] = // CAPIO backend constant values -constexpr int DEFAULT_CAPIO_BACKEND_PORT = 2222; +constexpr int DEFAULT_CAPIO_BACKEND_PORT = 2222; constexpr int CAPIO_BACKEND_DEFAULT_SLEEP_TIME = 300; -constexpr char MULTICAST_DISCOVERY_ADDR[] = "234.234.234.1"; -constexpr int MULTICAST_DISCOVERY_PORT = 2223; -constexpr int MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE = - HOST_NAME_MAX + 10; // hostname + : + sizeof(port) -#endif // CAPIO_COMMON_CONSTANTS_HPP +constexpr char MULTICAST_DISCOVERY_ADDR[] = "234.234.234.1"; +constexpr char MULTICAST_CONTROLPL_ADDR[] = "234.234.234.2"; +constexpr int MULTICAST_DISCOVERY_PORT = 2223; +constexpr int MULTICAST_CONTROLPL_PORT = 2224; + + +// hostname + : + sizeof(port) +constexpr int MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE = HOST_NAME_MAX + 10; +constexpr int MULTICAST_CONTROLPL_MESSAGE_SIZE = HOST_NAME_MAX + PATH_MAX + 10; + + +#endif // CAPIO_COMMON_CONSTANTS_HPP \ No newline at end of file diff --git a/src/common/capio/logger.hpp b/src/common/capio/logger.hpp index c658e6a92..5be99f07c 100644 --- a/src/common/capio/logger.hpp +++ b/src/common/capio/logger.hpp @@ -372,7 +372,7 @@ inline bool syscall_no_intercept_flag = false; #define ERR_EXIT(fmt, ...) \ if (!continue_on_error) { \ syscall_no_intercept_flag = true; \ - char tmp_buf[1024]; \ + char tmp_buf[5120]; \ sprintf(tmp_buf, fmt, ##__VA_ARGS__); \ char node_name[HOST_NAME_MAX]{0}; \ gethostname(node_name, HOST_NAME_MAX); \ diff --git a/src/server/capio-cl-engine/capio_cl_engine.hpp b/src/server/capio-cl-engine/capio_cl_engine.hpp index fd1c04515..adab55430 100644 --- a/src/server/capio-cl-engine/capio_cl_engine.hpp +++ b/src/server/capio-cl-engine/capio_cl_engine.hpp @@ -34,7 +34,7 @@ class CapioCLEngine { public: void print() const { // First message - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, ""); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, "Composition of expected CAPIO FS: "); // Table header lines diff --git a/src/server/client-manager/handlers/create.hpp b/src/server/client-manager/handlers/create.hpp index 7736f2c1d..930a5bb02 100644 --- a/src/server/client-manager/handlers/create.hpp +++ b/src/server/client-manager/handlers/create.hpp @@ -1,5 +1,6 @@ #ifndef CAPIO_CREATE_HPP #define CAPIO_CREATE_HPP +#include "communication-service/control_plane/capio_control_plane.hpp" #include "storage-service/capio_storage_service.hpp" /** @@ -24,6 +25,7 @@ inline void create_handler(const char *const str) { capio_cl_engine->addProducer(path, name); client_manager->register_produced_file(tid, path_str); storage_service->createMemoryFile(path); + capio_control_plane->notify_all(CapioControlPlane::CREATE, path_str); } -#endif // CAPIO_CREATE_HPP +#endif // CAPIO_CREATE_HPP \ No newline at end of file diff --git a/src/server/communication-service/CapioCommunicationService.hpp b/src/server/communication-service/CapioCommunicationService.hpp index 84348c7bb..0a6568aee 100644 --- a/src/server/communication-service/CapioCommunicationService.hpp +++ b/src/server/communication-service/CapioCommunicationService.hpp @@ -16,8 +16,7 @@ class CapioCommunicationService { delete capio_backend; }; - CapioCommunicationService(std::string &backend_name, const int port, - const std::string &control_plane_backend = "multicast") { + CapioCommunicationService(std::string &backend_name, const int port) { START_LOG(gettid(), "call(backend_name=%s)", backend_name.c_str()); LOG("My hostname is %s. Starting to listen on connection", @@ -47,14 +46,6 @@ class CapioCommunicationService { ERR_EXIT("No valid backend was provided"); } - if (control_plane_backend == "multicast") { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting multicast control plane"); - capio_control_plane = new MulticastControlPlane(port); - } else { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting file system control plane"); - capio_control_plane = new FSControlPlane(port); - } - server_println(CAPIO_SERVER_CLI_LOG_SERVER, "CapioCommunicationService initialization completed."); } diff --git a/src/server/communication-service/control_plane/capio_control_plane.hpp b/src/server/communication-service/control_plane/capio_control_plane.hpp index 4fbf45bbd..cdd5b7ec1 100644 --- a/src/server/communication-service/control_plane/capio_control_plane.hpp +++ b/src/server/communication-service/control_plane/capio_control_plane.hpp @@ -1,11 +1,32 @@ #ifndef CAPIO_CONTROL_PLANE_HPP #define CAPIO_CONTROL_PLANE_HPP +#include + class CapioControlPlane { - public: +public: + typedef enum { CREATE, DELETE, WRITE } event_type; + virtual ~CapioControlPlane() = default; + + /** + * Notify a single host of the occurrence of an event + * @param event + * @param path + * @param hostname_target + */ + void notify(event_type event, const std::filesystem::path &path, + const std::string &hostname_target) { + } + + /** + * Notify all nodes of the occurence of an event + * @param event + * @param path + */ + virtual void notify_all(event_type event, const std::filesystem::path &path) = 0; }; inline CapioControlPlane *capio_control_plane; -#endif // CAPIO_CONTROL_PLANE_HPP +#endif // CAPIO_CONTROL_PLANE_HPP \ No newline at end of file diff --git a/src/server/communication-service/control_plane/fs_control_plane.hpp b/src/server/communication-service/control_plane/fs_control_plane.hpp index 716f0c970..f4dacc3b0 100644 --- a/src/server/communication-service/control_plane/fs_control_plane.hpp +++ b/src/server/communication-service/control_plane/fs_control_plane.hpp @@ -93,11 +93,11 @@ class FSControlPlane : public CapioControlPlane { sleep(1); } - public: +public: explicit FSControlPlane(int backend_port) : _backend_port(backend_port) { gethostname(ownHostname, HOST_NAME_MAX); generate_aliveness_token(backend_port); - continue_execution = new bool(true); + continue_execution = new bool(true); token_used_to_connect_mutex = new std::mutex(); thread = new std::thread(fs_server_aliveness_detector_thread, std::ref(continue_execution), &token_used_to_connect, token_used_to_connect_mutex); @@ -115,6 +115,9 @@ class FSControlPlane : public CapioControlPlane { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "FSControlPlane cleanup completed."); } + + void notify_all(event_type event, const std::filesystem::path &path) override { + } }; -#endif // FS_CONTROL_PLANE_HPP +#endif // FS_CONTROL_PLANE_HPP \ No newline at end of file diff --git a/src/server/communication-service/control_plane/multicast_control_plane.hpp b/src/server/communication-service/control_plane/multicast_control_plane.hpp index fee8f6ed6..a3ae992ff 100644 --- a/src/server/communication-service/control_plane/multicast_control_plane.hpp +++ b/src/server/communication-service/control_plane/multicast_control_plane.hpp @@ -8,130 +8,156 @@ #include class MulticastControlPlane : public CapioControlPlane { - int _backend_port; + char _discovery_multicast_address[16] = {0}; bool *continue_execution; - std::thread *thread; + std::thread *discovery_thread, *controlpl_incoming; std::vector token_used_to_connect; std::mutex *token_used_to_connect_mutex; char ownHostname[HOST_NAME_MAX] = {0}; - static void send_multicast_alive_token(const int data_plane_backend_port) { - START_LOG(gettid(), "call(data_plane_backend_port=%d)", data_plane_backend_port); + static int open_outgoing_multicast_socket(const char *address, const int port, + sockaddr_in *addr) { int transmission_socket = socket(AF_INET, SOCK_DGRAM, 0); if (transmission_socket < 0) { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, std::string("WARNING: unable to bind multicast socket: ") + - strerror(errno)); + strerror(errno)); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Execution will continue only with FS discovery support"); - return; + return -1; } - sockaddr_in addr = {}; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); - addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); + addr->sin_family = AF_INET; + addr->sin_addr.s_addr = inet_addr(address); + addr->sin_port = htons(port); + return transmission_socket; + }; + + static void send_multicast_alive_token(const int data_plane_backend_port) { + START_LOG(gettid(), "call(data_plane_backend_port=%d)", data_plane_backend_port); + + sockaddr_in addr = {}; + const auto socket = open_outgoing_multicast_socket(MULTICAST_DISCOVERY_ADDR, + MULTICAST_DISCOVERY_PORT, &addr); char message[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; sprintf(message, "%s:%d", capio_global_configuration->node_name, data_plane_backend_port); - if (sendto(transmission_socket, message, strlen(message), 0, + LOG("Sending token: %s", message); + + if (sendto(socket, message, strlen(message), 0, reinterpret_cast(&addr), sizeof(addr)) < 0) { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, "WARNING: unable to send alive token(" + std::string(message) + - ") to multicast address!: " + strerror(errno)); + ") to multicast address!: " + strerror(errno)); } LOG("Sent multicast token"); - close(transmission_socket); + close(socket); } - static void multicast_server_aliveness_thread(const bool *continue_execution, - std::vector *token_used_to_connect, - std::mutex *token_used_to_connect_mutex, - const int data_plane_backend_port) { - - START_LOG(gettid(), "call(data_plane_backend_port=%d)", data_plane_backend_port); - - char incomingMessage[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; - - int loopback = 0; // disable receive loopback messages + static int open_outgoing_socket(const char *address_ip, const int port, + sockaddr_in &addr, socklen_t &addrlen) { + START_LOG(gettid(), "call(address=%s, port=%d)", address_ip, port); + int loopback = 0; // disable receive loopback messages u_int multiple_socket_on_same_address = 1; // enable multiple sockets on same address - const std::string SELF_TOKEN = std::string(capio_global_configuration->node_name) + ":" + - std::to_string(data_plane_backend_port); - - int discovery_socket = socket(AF_INET, SOCK_DGRAM, 0); - if (discovery_socket < 0) { + int outgoing_socket = socket(AF_INET, SOCK_DGRAM, 0); + if (outgoing_socket < 0) { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, std::string("WARNING: unable to open multicast socket: ") + - strerror(errno)); + strerror(errno)); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Execution will continue only with FS discovery support"); - return; + return -1; } LOG("Created socket"); - if (setsockopt(discovery_socket, SOL_SOCKET, SO_REUSEADDR, + if (setsockopt(outgoing_socket, SOL_SOCKET, SO_REUSEADDR, (char *) &multiple_socket_on_same_address, sizeof(multiple_socket_on_same_address)) < 0) { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, std::string("WARNING: unable to multiple sockets to same address: ") + - strerror(errno)); + strerror(errno)); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Execution will continue only with FS discovery support"); - return; + return -1; } LOG("Set IP address to accept multiple sockets on same address"); - if (setsockopt(discovery_socket, IPPROTO_IP, IP_MULTICAST_LOOP, &loopback, + if (setsockopt(outgoing_socket, IPPROTO_IP, IP_MULTICAST_LOOP, &loopback, sizeof(loopback)) < 0) { server_println( CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, std::string("WARNING: unable to filter out loopback incoming messages: ") + - strerror(errno)); + strerror(errno)); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Execution will continue only with FS discovery support"); - return; + return -1; } LOG("Disabled reception of loopback messages from socket"); - sockaddr_in addr = {}; - addr.sin_family = AF_INET; + addr = {}; + addr.sin_family = AF_INET; addr.sin_addr.s_addr = htonl(INADDR_ANY); - addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); - socklen_t addrlen = sizeof(addr); - LOG("Set socket on IP: %s - PORT: %d", MULTICAST_DISCOVERY_ADDR, MULTICAST_DISCOVERY_PORT); + addr.sin_port = htons(port); + addrlen = sizeof(addr); + LOG("Set socket on IP: %s - PORT: %d", address_ip, port); // bind to receive address - if (bind(discovery_socket, reinterpret_cast(&addr), sizeof(addr)) < 0) { + if (bind(outgoing_socket, reinterpret_cast(&addr), sizeof(addr)) < 0) { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, std::string("WARNING: unable to bind multicast socket: ") + - strerror(errno)); + strerror(errno)); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Execution will continue only with FS discovery support"); - return; + return -1; } LOG("Binded socket"); ip_mreq mreq{}; - mreq.imr_multiaddr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); + mreq.imr_multiaddr.s_addr = inet_addr(address_ip); mreq.imr_interface.s_addr = htonl(INADDR_ANY); - if (setsockopt(discovery_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { + if (setsockopt(outgoing_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, std::string("WARNING: unable to join multicast group: ") + - strerror(errno)); + strerror(errno)); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Execution will continue only with FS discovery support"); - return; + return -1; } LOG("Successfully joined multicast group"); + return outgoing_socket; + } + + static void multicast_server_aliveness_thread(const bool *continue_execution, + std::vector *token_used_to_connect, + std::mutex *token_used_to_connect_mutex, + int dataplane_backend_port) { + + START_LOG(gettid(), "call(data_plane_backend_port=%d)", dataplane_backend_port); + + char incomingMessage[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; + + const std::string SELF_TOKEN = std::string(capio_global_configuration->node_name) + ":" + + std::to_string(dataplane_backend_port); + + sockaddr_in addr = {}; + socklen_t addrlen = {}; + const auto discovery_socket = open_outgoing_socket(MULTICAST_DISCOVERY_ADDR, + MULTICAST_DISCOVERY_PORT, + addr, addrlen); + + server_println(CAPIO_SERVER_CLI_LOG_SERVER, std::string("Multicast discovery service @ ") + + MULTICAST_DISCOVERY_ADDR + ":" + + std::to_string(MULTICAST_DISCOVERY_PORT)); while (*continue_execution) { bzero(incomingMessage, sizeof(incomingMessage)); - send_multicast_alive_token(data_plane_backend_port); + // Send port of local data plane backend + send_multicast_alive_token(dataplane_backend_port); LOG("Waiting for incoming token..."); do { @@ -164,29 +190,102 @@ class MulticastControlPlane : public CapioControlPlane { } } - public: - explicit MulticastControlPlane(int backend_port) : _backend_port(backend_port) { + + static void multicast_control_plane_incoming_thread(const bool *continue_execution) { + START_LOG(gettid(), "Call(multicast_control_plane_incoming_thread)"); + char incoming_msg[MULTICAST_CONTROLPL_MESSAGE_SIZE] = {0}; + sockaddr_in addr = {}; + socklen_t addrlen = {}; + const auto discovery_socket = open_outgoing_socket(MULTICAST_CONTROLPL_ADDR, + MULTICAST_CONTROLPL_PORT, + addr, addrlen); + + server_println(CAPIO_SERVER_CLI_LOG_SERVER, std::string("Multicast control plane @ ") + + MULTICAST_CONTROLPL_ADDR + ":" + + std::to_string(MULTICAST_CONTROLPL_PORT)); + + while (*continue_execution) { + bzero(incoming_msg, sizeof(incoming_msg)); + const auto recv_sice = + recvfrom(discovery_socket, incoming_msg, MULTICAST_CONTROLPL_MESSAGE_SIZE, + 0, reinterpret_cast(&addr), &addrlen); + LOG("Received multicast data of size %ld and content %s", recv_sice, + incoming_msg); + if (recv_sice < 0) { + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: received 0 bytes from multicast socket: ")); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); + continue; + } + + MulticastControlPlane::event_type event; + char source_hostname[HOST_NAME_MAX]; + char source_path[PATH_MAX]; + + sscanf(incoming_msg, "%d %s %s", &event, source_hostname, source_path); + + if (strcmp(capio_global_configuration->node_name, source_hostname) == 0) { + continue; + } + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, + "Recived control message: " + std::string(incoming_msg)); + } + + close(discovery_socket); + } + +public: + explicit MulticastControlPlane(int dataplane_backend_port) { + START_LOG(gettid(), "call(dataplane_backend_port=%d)", dataplane_backend_port); gethostname(ownHostname, HOST_NAME_MAX); - continue_execution = new bool(true); + continue_execution = new bool(true); token_used_to_connect_mutex = new std::mutex(); - thread = new std::thread(multicast_server_aliveness_thread, std::ref(continue_execution), - &token_used_to_connect, token_used_to_connect_mutex, backend_port); + discovery_thread = new std::thread(multicast_server_aliveness_thread, + continue_execution, + &token_used_to_connect, token_used_to_connect_mutex, + dataplane_backend_port); + + controlpl_incoming = new std::thread(multicast_control_plane_incoming_thread, + continue_execution); - server_println(CAPIO_SERVER_CLI_LOG_SERVER, std::string("Multicast discovery service @ ") + - MULTICAST_DISCOVERY_ADDR + ":" + - std::to_string(MULTICAST_DISCOVERY_PORT)); } ~MulticastControlPlane() override { *continue_execution = false; - pthread_cancel(thread->native_handle()); - thread->join(); + pthread_cancel(discovery_thread->native_handle()); + discovery_thread->join(); + pthread_cancel(controlpl_incoming->native_handle()); + controlpl_incoming->join(); delete token_used_to_connect_mutex; - delete thread; + delete discovery_thread; delete continue_execution; server_println(CAPIO_SERVER_CLI_LOG_SERVER, "MulticastControlPlane correctly terminated"); } + + void notify_all(const event_type event, const std::filesystem::path &path) override { + START_LOG(gettid(), "call(event=%s, path=%s)", event, path.string().c_str()); + sockaddr_in addr = {}; + const auto socket = open_outgoing_multicast_socket(MULTICAST_CONTROLPL_ADDR, + MULTICAST_CONTROLPL_PORT, &addr); + + char message[MULTICAST_CONTROLPL_MESSAGE_SIZE]; + sprintf(message, "%03d %s %s", event, ownHostname, path.string().c_str()); + + LOG("Sending message: %s", message); + if (sendto(socket, message, strlen(message), 0, + reinterpret_cast(&addr), sizeof(addr)) < 0) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "WARNING: unable to send message(" + std::string(message) + + ") to multicast address!: " + strerror(errno)); + } + LOG("Sent message"); + + close(socket); + } }; -#endif // MULTICAST_CONTROL_PLANE_HPP +#endif // MULTICAST_CONTROL_PLANE_HPP \ No newline at end of file diff --git a/src/server/communication-service/data_plane/MTCL_backend.hpp b/src/server/communication-service/data_plane/MTCL_backend.hpp index 2a8da8f55..f2339f7b2 100644 --- a/src/server/communication-service/data_plane/MTCL_backend.hpp +++ b/src/server/communication-service/data_plane/MTCL_backend.hpp @@ -238,10 +238,9 @@ class MTCL_backend : public BackendInterface { server_connection_handler, std::move(UserManager), remoteHost.c_str(), thread_sleep_times, connection_tuple, terminate, TO_REMOTE)); } else { - server_println(CAPIO_SERVER_CLI_LOG_SERVER_WARNING, - "Warning: found token " + std::string(remoteHost) + - ".alive_token, but connection is not valid"); + "Warning: tried to connect to " + std::string(remoteHost) + + " but connection is not valid"); } } diff --git a/src/server/utils/configuration.hpp b/src/server/utils/configuration.hpp index 5eb6f720f..be2c51294 100644 --- a/src/server/utils/configuration.hpp +++ b/src/server/utils/configuration.hpp @@ -11,7 +11,7 @@ * to all classes and subclasses. */ class CapioGlobalConfiguration { - public: +public: bool termination_phase, StoreOnlyInMemory; std::string workflow_name; pid_t CAPIO_SERVER_MAIN_PID = -1; @@ -19,19 +19,24 @@ class CapioGlobalConfiguration { CapioGlobalConfiguration() { gethostname(node_name, HOST_NAME_MAX); - termination_phase = false; - StoreOnlyInMemory = false; + termination_phase = false; + StoreOnlyInMemory = false; CAPIO_SERVER_MAIN_PID = gettid(); - workflow_name = CAPIO_DEFAULT_WORKFLOW_NAME; + workflow_name = CAPIO_DEFAULT_WORKFLOW_NAME; } }; inline auto capio_global_configuration = new CapioGlobalConfiguration(); -inline void server_println(const std::string &message_type, const std::string &message_line) { - std::cout << message_type << " " << capio_global_configuration->node_name << "] " - << message_line << std::endl - << std::flush; +inline void server_println(const std::string &message_type = "", + const std::string &message_line = "") { + if (message_type.empty()) { + std::cout << std::endl; + } else { + std::cout << message_type << " " << capio_global_configuration->node_name << "] " + << message_line << std::endl + << std::flush; + } } -#endif // CAPIO_CONFIGURATION_HPP +#endif // CAPIO_CONFIGURATION_HPP \ No newline at end of file diff --git a/src/server/utils/parser.hpp b/src/server/utils/parser.hpp index d3ba27da2..7732765ae 100644 --- a/src/server/utils/parser.hpp +++ b/src/server/utils/parser.hpp @@ -101,7 +101,7 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { } #ifdef CAPIO_LOG auto logname = open_server_logfile(); - log = new Logger(__func__, __FILE__, __LINE__, gettid(), "Created new log file"); + log = new Logger(__func__, __FILE__, __LINE__, gettid(), "Created new log file"); server_println(CAPIO_SERVER_CLI_LOG_SERVER, "started logging to logfile " + logname.string()); #endif @@ -114,7 +114,7 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "skipping config file parsing."); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Obtained from environment variable current workflow name: " + - capio_global_configuration->workflow_name); + capio_global_configuration->workflow_name); } else { START_LOG(gettid(), "call()"); @@ -138,37 +138,41 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { } #endif + // Port used for communication backend + int port = DEFAULT_CAPIO_BACKEND_PORT; + if (backend_port) { + port = args::get(backend_port); + } + + if (backend) { std::string backend_name = args::get(backend); std::transform(backend_name.begin(), backend_name.end(), backend_name.begin(), ::toupper); - int port = DEFAULT_CAPIO_BACKEND_PORT; - if (backend_port) { - port = args::get(backend_port); - } - - std::string control_backend_name = "multicast"; - if (controlPlaneBackend) { - auto tmp = args::get(controlPlaneBackend); - if (tmp != "multicast" && tmp != "fs") { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, - "Unknown control plane backend " + tmp); - } else { - control_backend_name = tmp; - } - } - - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, - "Using control plane backend: " + control_backend_name); - capio_communication_service = - new CapioCommunicationService(backend_name, port, control_backend_name); + new CapioCommunicationService(backend_name, port); } else { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Selected backend is File System"); capio_backend = new NoBackend(); } + std::string control_backend_name = "multicast"; + + if (controlPlaneBackend) { + auto tmp = args::get(controlPlaneBackend); + if (tmp == "fs") { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting FS control plane"); + capio_control_plane = new FSControlPlane(port); + } else { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting multicast control plane"); + capio_control_plane = new MulticastControlPlane(port); + } + } else { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting multicast control plane"); + capio_control_plane = new MulticastControlPlane(port); + } + if (capio_cl_resolve_path) { auto path = args::get(capio_cl_resolve_path); memcpy(resolve_prefix, path.c_str(), PATH_MAX); @@ -184,4 +188,4 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { return ""; } -#endif // PARSER_HPP +#endif // PARSER_HPP \ No newline at end of file diff --git a/src/server/utils/signals.hpp b/src/server/utils/signals.hpp index b5e9f2c4c..35f0ea5d6 100644 --- a/src/server/utils/signals.hpp +++ b/src/server/utils/signals.hpp @@ -23,7 +23,7 @@ inline void sig_term_handler(int signum, siginfo_t *info, void *ptr) { } START_LOG(gettid(), "call(signal=[%d] (%s) from process with pid=%ld)", signum, strsignal(signum), info != nullptr ? info->si_pid : -1); - + server_println(); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "shutting down server"); if (signum == SIGSEGV) { diff --git a/tests/multinode/backend/src/MTCL.hpp b/tests/multinode/backend/src/MTCL.hpp index f74c3ccc0..f94dce251 100644 --- a/tests/multinode/backend/src/MTCL.hpp +++ b/tests/multinode/backend/src/MTCL.hpp @@ -13,7 +13,7 @@ TEST(CapioCommServiceTest, TestPingPong) { START_LOG(gettid(), "INFO: TestPingPong"); const int port = 1234; std::string proto = "TCP"; - auto communication_service = new CapioCommunicationService(proto, port, "multicast"); + auto communication_service = new CapioCommunicationService(proto, port); capio_off64_t size_revc, offset; std::vector connections; From b8a08a7ea16c286786580115129d959f8be0b222 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Fri, 29 Aug 2025 13:05:12 +0200 Subject: [PATCH 2/5] Cleanup --- src/common/capio/constants.hpp | 92 +++++---- src/common/capio/env.hpp | 4 +- src/common/capio/filesystem.hpp | 2 +- src/common/capio/logger.hpp | 2 +- src/common/capio/requests.hpp | 2 +- src/common/capio/response_queue.hpp | 3 +- src/common/capio/shm.hpp | 2 +- src/common/capio/syscall.hpp | 2 +- src/posix/handlers.hpp | 2 +- src/posix/handlers/access.hpp | 2 +- src/posix/handlers/chdir.hpp | 2 +- src/posix/handlers/close.hpp | 2 +- src/posix/handlers/copy_file_range.hpp | 3 +- src/posix/handlers/dup.hpp | 2 +- src/posix/handlers/execve.hpp | 2 +- src/posix/handlers/fchmod.hpp | 2 +- src/posix/handlers/fchown.hpp | 2 +- src/posix/handlers/fcntl.hpp | 2 +- src/posix/handlers/fgetxattr.hpp | 2 +- src/posix/handlers/fork.hpp | 5 +- src/posix/handlers/ioctl.hpp | 2 +- src/posix/handlers/lseek.hpp | 2 +- src/posix/handlers/mkdir.hpp | 3 +- src/posix/handlers/posix_readdir.hpp | 5 +- src/posix/handlers/rename.hpp | 5 +- src/posix/handlers/stat.hpp | 5 +- src/posix/handlers/statfs.hpp | 2 +- src/posix/handlers/statx.hpp | 2 +- src/posix/handlers/unlink.hpp | 2 +- src/posix/libcapio_posix.cpp | 8 +- src/posix/syscall_intercept/CMakeLists.txt | 2 +- .../utils/cache/consent_request_cache.hpp | 4 +- .../utils/cache/read_request_cache_fs.hpp | 3 +- .../utils/cache/read_request_cache_mem.hpp | 1 - .../utils/cache/write_request_cache_fs.hpp | 4 +- .../utils/cache/write_request_cache_mem.hpp | 2 +- src/posix/utils/clone.hpp | 2 +- src/posix/utils/common.hpp | 2 +- src/posix/utils/env.hpp | 2 +- src/posix/utils/snapshot.hpp | 2 +- src/posix/utils/types.hpp | 2 +- src/server/capio-cl-engine/json_parser.hpp | 7 +- src/server/capio_server.cpp | 1 - src/server/client-manager/client_manager.hpp | 3 +- src/server/client-manager/handlers/close.hpp | 3 +- .../client-manager/handlers/consent.hpp | 2 +- src/server/client-manager/handlers/exit.hpp | 2 +- .../handlers/files_in_memory.hpp | 2 +- .../client-manager/handlers/handshake.hpp | 1 - src/server/client-manager/handlers/open.hpp | 2 +- .../client-manager/handlers/posix_readdir.hpp | 3 +- src/server/client-manager/handlers/rename.hpp | 2 +- src/server/client-manager/handlers/write.hpp | 2 +- .../client-manager/request_handler_engine.hpp | 2 +- .../control_plane/capio_control_plane.hpp | 5 +- .../control_plane/fs_control_plane.hpp | 7 +- .../control_plane/multicast_control_plane.hpp | 182 +++--------------- .../control_plane/multicast_utils.hpp | 114 +++++++++++ .../data_plane/BackendInterface.hpp | 2 +- .../data_plane/MTCL_backend.hpp | 7 +- src/server/file-manager/file_manager.hpp | 3 +- src/server/file-manager/file_manager_impl.hpp | 5 - src/server/file-manager/fs_monitor.hpp | 3 +- .../CapioFile/CapioMemoryFile.hpp | 3 - .../CapioFile/CapioRemoteFile.hpp | 3 +- .../storage-service/capio_storage_service.hpp | 1 - src/server/utils/configuration.hpp | 12 +- src/server/utils/distributed_semaphore.hpp | 2 +- src/server/utils/parser.hpp | 8 +- src/server/utils/types.hpp | 2 +- 70 files changed, 281 insertions(+), 309 deletions(-) create mode 100644 src/server/communication-service/control_plane/multicast_utils.hpp diff --git a/src/common/capio/constants.hpp b/src/common/capio/constants.hpp index 6c86ccc5b..decf5a190 100644 --- a/src/common/capio/constants.hpp +++ b/src/common/capio/constants.hpp @@ -11,16 +11,16 @@ typedef unsigned long long int capio_off64_t; // CAPIO files constants -constexpr size_t CAPIO_DEFAULT_DIR_INITIAL_SIZE = 1024L * 1024 * 1024; -constexpr off64_t CAPIO_DEFAULT_FILE_INITIAL_SIZE = 1024L * 1024 * 1024 * 4; +constexpr size_t CAPIO_DEFAULT_DIR_INITIAL_SIZE = 1024L * 1024 * 1024; +constexpr off64_t CAPIO_DEFAULT_FILE_INITIAL_SIZE = 1024L * 1024 * 1024 * 4; [[maybe_unused]] constexpr std::array CAPIO_DIR_FORBIDDEN_PATHS = { std::string_view{"/proc/"}, std::string_view{"/sys/"}, std::string_view{"/boot/"}, - std::string_view{"/dev/"}, std::string_view{"/var/"}, std::string_view{"/run/"}, + std::string_view{"/dev/"}, std::string_view{"/var/"}, std::string_view{"/run/"}, std::string_view("/spack/")}; // CAPIO default values for shared memory constexpr char CAPIO_DEFAULT_WORKFLOW_NAME[] = "CAPIO"; -constexpr char CAPIO_DEFAULT_APP_NAME[] = "default_app"; +constexpr char CAPIO_DEFAULT_APP_NAME[] = "default_app"; constexpr char CAPIO_SHM_CANARY_ERROR[] = "FATAL ERROR: Shared memories for workflow %s already " "exists. One of two (or both) reasons are to blame: \n " @@ -29,52 +29,52 @@ constexpr char CAPIO_SHM_CANARY_ERROR[] = "is already running. Clean shared memory and then retry"; // CAPIO communication constants -constexpr int CAPIO_REQ_BUFF_CNT = 512; // Max number of elements inside buffers -constexpr int CAPIO_CACHE_LINES_DEFAULT = 10; -constexpr int CAPIO_CACHE_LINE_SIZE_DEFAULT = 32768; // 32K of default size for cache lines +constexpr int CAPIO_REQ_BUFF_CNT = 512; // Max number of elements inside buffers +constexpr int CAPIO_CACHE_LINES_DEFAULT = 10; +constexpr int CAPIO_CACHE_LINE_SIZE_DEFAULT = 32768; // 32K of default size for cache lines // TODO: use that in communication only uses the file descriptor instead of the path to save on the // PATH_MAX -constexpr size_t CAPIO_REQ_MAX_SIZE = (PATH_MAX + 256) * sizeof(char); -constexpr char CAPIO_SERVER_CLI_LOG_SERVER[] = "[\033[1;32mSERVER\033[0m"; +constexpr size_t CAPIO_REQ_MAX_SIZE = (PATH_MAX + 256) * sizeof(char); +constexpr char CAPIO_SERVER_CLI_LOG_SERVER[] = "[\033[1;32mSERVER\033[0m"; constexpr char CAPIO_SERVER_CLI_LOG_SERVER_WARNING[] = "[\033[1;33mSERVER\033[0m"; -constexpr char CAPIO_SERVER_CLI_LOG_SERVER_ERROR[] = "[\033[1;31mSERVER\033[0m"; -constexpr char LOG_CAPIO_START_REQUEST[] = "\n+++++++++++ SYSCALL %s (%d) +++++++++++"; -constexpr char LOG_CAPIO_END_REQUEST[] = "----------- END SYSCALL ----------\n"; +constexpr char CAPIO_SERVER_CLI_LOG_SERVER_ERROR[] = "[\033[1;31mSERVER\033[0m"; +constexpr char LOG_CAPIO_START_REQUEST[] = "\n+++++++++++ SYSCALL %s (%d) +++++++++++"; +constexpr char LOG_CAPIO_END_REQUEST[] = "----------- END SYSCALL ----------\n"; constexpr char CAPIO_SERVER_LOG_START_REQUEST_MSG[] = "\n+++++++++++++++++REQUEST+++++++++++++++++"; -constexpr char CAPIO_SERVER_LOG_END_REQUEST_MSG[] = "~~~~~~~~~~~~~~~END REQUEST~~~~~~~~~~~~~~~"; -constexpr int CAPIO_LOG_MAX_MSG_LEN = 4096; -constexpr int CAPIO_MAX_SPSQUEUE_ELEMS = 10; -constexpr int CAPIO_MAX_SPSCQUEUE_ELEM_SIZE = 1024 * 256; +constexpr char CAPIO_SERVER_LOG_END_REQUEST_MSG[] = "~~~~~~~~~~~~~~~END REQUEST~~~~~~~~~~~~~~~"; +constexpr int CAPIO_LOG_MAX_MSG_LEN = 4096; +constexpr int CAPIO_MAX_SPSQUEUE_ELEMS = 10; +constexpr int CAPIO_MAX_SPSCQUEUE_ELEM_SIZE = 1024 * 256; // CAPIO streaming semantics -constexpr char CAPIO_FILE_MODE_NO_UPDATE[] = "no_update"; -constexpr char CAPIO_FILE_MODE_UPDATE[] = "update"; -constexpr char CAPIO_FILE_COMMITTED_ON_CLOSE[] = "on_close"; -constexpr char CAPIO_FILE_COMMITTED_ON_FILE[] = "on_file"; -constexpr char CAPIO_FILE_COMMITTED_N_FILES[] = "n_files"; +constexpr char CAPIO_FILE_MODE_NO_UPDATE[] = "no_update"; +constexpr char CAPIO_FILE_MODE_UPDATE[] = "update"; +constexpr char CAPIO_FILE_COMMITTED_ON_CLOSE[] = "on_close"; +constexpr char CAPIO_FILE_COMMITTED_ON_FILE[] = "on_file"; +constexpr char CAPIO_FILE_COMMITTED_N_FILES[] = "n_files"; constexpr char CAPIO_FILE_COMMITTED_ON_TERMINATION[] = "on_termination"; // CAPIO POSIX return codes -constexpr int CAPIO_POSIX_SYSCALL_ERRNO = -1; +constexpr int CAPIO_POSIX_SYSCALL_ERRNO = -1; constexpr int CAPIO_POSIX_SYSCALL_REQUEST_SKIP = -2; -constexpr int CAPIO_POSIX_SYSCALL_SKIP = 1; -constexpr int CAPIO_POSIX_SYSCALL_SUCCESS = 0; +constexpr int CAPIO_POSIX_SYSCALL_SKIP = 1; +constexpr int CAPIO_POSIX_SYSCALL_SUCCESS = 0; // CAPIO logger - common -constexpr char CAPIO_LOG_PRE_MSG[] = "at[%.15llu][%.40s]: "; +constexpr char CAPIO_LOG_PRE_MSG[] = "at[%.15llu][%.40s]: "; constexpr char CAPIO_DEFAULT_LOG_FOLDER[] = "capio_logs\0"; // CAPIO common - shared memory constant names -constexpr char SHM_FIRST_ELEM[] = "_first_elem_"; -constexpr char SHM_LAST_ELEM[] = "_last_elem_"; -constexpr char SHM_MUTEX_PREFIX[] = "_mutex_"; -constexpr char SHM_SEM_ELEMS[] = "_sem_num_elems_"; -constexpr char SHM_SEM_EMPTY[] = "_sem_num_empty_"; +constexpr char SHM_FIRST_ELEM[] = "_first_elem_"; +constexpr char SHM_LAST_ELEM[] = "_last_elem_"; +constexpr char SHM_MUTEX_PREFIX[] = "_mutex_"; +constexpr char SHM_SEM_ELEMS[] = "_sem_num_elems_"; +constexpr char SHM_SEM_EMPTY[] = "_sem_num_empty_"; constexpr char SHM_SPSC_PREFIX_WRITE[] = "capio_write_tid_"; -constexpr char SHM_SPSC_PREFIX_READ[] = "capio_read_tid_"; +constexpr char SHM_SPSC_PREFIX_READ[] = "capio_read_tid_"; // CAPIO common - shared channel by client and server -constexpr char SHM_COMM_CHAN_NAME[] = "request_buffer"; +constexpr char SHM_COMM_CHAN_NAME[] = "request_buffer"; constexpr char SHM_COMM_CHAN_NAME_RESP[] = "response_buffer_"; // CAPIO logger - shm errors @@ -84,8 +84,8 @@ constexpr char CAPIO_SHM_OPEN_ERROR[] = // CAPIO logger - POSIX constexpr char CAPIO_LOG_POSIX_DEFAULT_LOG_FILE_PREFIX[] = "posix_thread_\0"; -constexpr char CAPIO_LOG_POSIX_SYSCALL_START[] = "\n+++++++++ SYSCALL %s (%d) +++++++++"; -constexpr char CAPIO_LOG_POSIX_SYSCALL_END[] = "~~~~~~~~~ END SYSCALL ~~~~~~~~~\n"; +constexpr char CAPIO_LOG_POSIX_SYSCALL_START[] = "\n+++++++++ SYSCALL %s (%d) +++++++++"; +constexpr char CAPIO_LOG_POSIX_SYSCALL_END[] = "~~~~~~~~~ END SYSCALL ~~~~~~~~~\n"; // CAPIO logger - server constexpr char CAPIO_SERVER_DEFAULT_LOG_FILE_PREFIX[] = "server_thread_\0"; @@ -103,10 +103,10 @@ constexpr char CAPIO_LOG_SERVER_BANNER[] = "\\______/\n\n" "\033[0m CAPIO - Cross Application Programmable IO \n" " V. " CAPIO_VERSION "\n\n"; -constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_INFO[] = "[\033[1;32mSERVER\033[0m"; +constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_INFO[] = "[\033[1;32mSERVER\033[0m"; constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_WARNING[] = "[\033[1;33mSERVER\033[0m"; -constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_ERROR[] = "[\033[1;31mSERVER\033[0m"; -constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_JSON[] = "[\033[1;34mSERVER\033[0m"; +constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_ERROR[] = "[\033[1;31mSERVER\033[0m"; +constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_JSON[] = "[\033[1;34mSERVER\033[0m"; constexpr char CAPIO_LOG_SERVER_CLI_LOGGING_ENABLED_WARNING[] = "[\033[1;33mSERVER\033[0m] " "|==================================================================|\n" @@ -121,7 +121,7 @@ constexpr char CAPIO_LOG_SERVER_CLI_LOGGING_ENABLED_WARNING[] = constexpr char CAPIO_LOG_SERVER_CLI_LOGGING_NOT_AVAILABLE[] = "CAPIO_LOG set but log support was not compiled into CAPIO!"; constexpr char CAPIO_LOG_SERVER_REQUEST_START[] = "+++++++++++ REQUEST +++++++++++"; -constexpr char CAPIO_LOG_SERVER_REQUEST_END[] = "~~~~~~~~~ END REQUEST ~~~~~~~~~\n"; +constexpr char CAPIO_LOG_SERVER_REQUEST_END[] = "~~~~~~~~~ END REQUEST ~~~~~~~~~\n"; // CAPIO server argument parser constexpr char CAPIO_SERVER_ARG_PARSER_PRE[] = @@ -185,17 +185,15 @@ constexpr char CAPIO_SERVER_ARG_PARSER_CONFIG_BACKEND_HELP[] = // CAPIO backend constant values -constexpr int DEFAULT_CAPIO_BACKEND_PORT = 2222; +constexpr int DEFAULT_CAPIO_BACKEND_PORT = 2222; constexpr int CAPIO_BACKEND_DEFAULT_SLEEP_TIME = 300; -constexpr char MULTICAST_DISCOVERY_ADDR[] = "234.234.234.1"; -constexpr char MULTICAST_CONTROLPL_ADDR[] = "234.234.234.2"; -constexpr int MULTICAST_DISCOVERY_PORT = 2223; -constexpr int MULTICAST_CONTROLPL_PORT = 2224; - +constexpr char MULTICAST_DISCOVERY_ADDR[] = "234.234.234.1"; +constexpr char MULTICAST_CONTROLPL_ADDR[] = "234.234.234.2"; +constexpr int MULTICAST_DISCOVERY_PORT = 2223; +constexpr int MULTICAST_CONTROLPL_PORT = 2224; // hostname + : + sizeof(port) constexpr int MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE = HOST_NAME_MAX + 10; -constexpr int MULTICAST_CONTROLPL_MESSAGE_SIZE = HOST_NAME_MAX + PATH_MAX + 10; - +constexpr int MULTICAST_CONTROLPL_MESSAGE_SIZE = HOST_NAME_MAX + PATH_MAX + 10; -#endif // CAPIO_COMMON_CONSTANTS_HPP \ No newline at end of file +#endif // CAPIO_COMMON_CONSTANTS_HPP \ No newline at end of file diff --git a/src/common/capio/env.hpp b/src/common/capio/env.hpp index ae28b7e73..f04d3c759 100644 --- a/src/common/capio/env.hpp +++ b/src/common/capio/env.hpp @@ -23,9 +23,7 @@ inline const std::filesystem::path &get_capio_dir() { if (val == nullptr) { ERR_EXIT("Fatal: CAPIO_DIR not provided!"); - } else { - const char *realpath_res = capio_realpath(val, buf.get()); if (realpath_res == nullptr) { ERR_EXIT("error CAPIO_DIR: directory %s does not " @@ -111,4 +109,4 @@ inline std::string get_capio_workflow_name() { return name; } -#endif // CAPIO_COMMON_ENV_HPP +#endif // CAPIO_COMMON_ENV_HPP \ No newline at end of file diff --git a/src/common/capio/filesystem.hpp b/src/common/capio/filesystem.hpp index 24814782b..3269b21dc 100644 --- a/src/common/capio/filesystem.hpp +++ b/src/common/capio/filesystem.hpp @@ -115,4 +115,4 @@ inline bool is_capio_path(const std::filesystem::path &path_to_check) { return std::regex(computed); } -#endif // CAPIO_COMMON_FILESYSTEM_HPP +#endif // CAPIO_COMMON_FILESYSTEM_HPP \ No newline at end of file diff --git a/src/common/capio/logger.hpp b/src/common/capio/logger.hpp index 5be99f07c..c9fa8539f 100644 --- a/src/common/capio/logger.hpp +++ b/src/common/capio/logger.hpp @@ -33,7 +33,7 @@ inline thread_local char logfile_path[PATH_MAX]{'\0'}; inline thread_local int current_log_level = 0; inline thread_local bool logging_syscall = false; // this variable tells the logger that syscall logging - // has started and we are not in setup phase +// has started and we are not in setup phase #ifndef CAPIO_MAX_LOG_LEVEL // capio max log level. defaults to -1, where everything is logged #define CAPIO_MAX_LOG_LEVEL -1 diff --git a/src/common/capio/requests.hpp b/src/common/capio/requests.hpp index 627cdff92..25ec7e11d 100644 --- a/src/common/capio/requests.hpp +++ b/src/common/capio/requests.hpp @@ -18,4 +18,4 @@ constexpr const int CAPIO_REQUEST_POSIX_DIR_COMMITTED = 13; constexpr const int CAPIO_NR_REQUESTS = 14; -#endif // CAPIO_COMMON_REQUESTS_HPP +#endif // CAPIO_COMMON_REQUESTS_HPP \ No newline at end of file diff --git a/src/common/capio/response_queue.hpp b/src/common/capio/response_queue.hpp index 0f73458b5..9df2b57b1 100644 --- a/src/common/capio/response_queue.hpp +++ b/src/common/capio/response_queue.hpp @@ -43,6 +43,7 @@ class ResponseQueue { ResponseQueue(const ResponseQueue &) = delete; ResponseQueue &operator=(const ResponseQueue &) = delete; + ~ResponseQueue() { START_LOG(capio_syscall(SYS_gettid), "call(_shm_name=%s)", _shm_name.c_str()); if (require_cleanup) { @@ -67,4 +68,4 @@ class ResponseQueue { _shared_mutex.unlock(); } }; -#endif // CAPIO_RESPONSE_QUEUE_HPP +#endif // CAPIO_RESPONSE_QUEUE_HPP \ No newline at end of file diff --git a/src/common/capio/shm.hpp b/src/common/capio/shm.hpp index e2bd992f4..bef292451 100644 --- a/src/common/capio/shm.hpp +++ b/src/common/capio/shm.hpp @@ -198,4 +198,4 @@ void *get_shm_if_exist(const std::string &shm_name) { return p; } -#endif // CAPIO_COMMON_SHM_HPP +#endif // CAPIO_COMMON_SHM_HPP \ No newline at end of file diff --git a/src/common/capio/syscall.hpp b/src/common/capio/syscall.hpp index a4c182dc6..d282ccad4 100644 --- a/src/common/capio/syscall.hpp +++ b/src/common/capio/syscall.hpp @@ -30,4 +30,4 @@ inline char *syscall_no_intercept_realpath(const char *path, char *resolved) { #define gettid() capio_syscall(SYS_gettid) #endif -#endif // CAPIO_COMMON_SYSCALL_HPP +#endif // CAPIO_COMMON_SYSCALL_HPP \ No newline at end of file diff --git a/src/posix/handlers.hpp b/src/posix/handlers.hpp index fd1aeed3e..4fe0f53eb 100644 --- a/src/posix/handlers.hpp +++ b/src/posix/handlers.hpp @@ -36,4 +36,4 @@ #include "handlers/posix_readdir.hpp" -#endif // CAPIO_POSIX_HANDLERS_HPP +#endif // CAPIO_POSIX_HANDLERS_HPP \ No newline at end of file diff --git a/src/posix/handlers/access.hpp b/src/posix/handlers/access.hpp index 3990e883d..164ba6b31 100644 --- a/src/posix/handlers/access.hpp +++ b/src/posix/handlers/access.hpp @@ -63,4 +63,4 @@ int faccessat_handler(long arg0, long arg1, long arg2, long arg3, long arg4, lon } #endif // SYS_faccessat -#endif // CAPIO_POSIX_HANDLERS_ACCESS_HPP +#endif // CAPIO_POSIX_HANDLERS_ACCESS_HPP \ No newline at end of file diff --git a/src/posix/handlers/chdir.hpp b/src/posix/handlers/chdir.hpp index 30cd69a71..cec94e812 100644 --- a/src/posix/handlers/chdir.hpp +++ b/src/posix/handlers/chdir.hpp @@ -32,4 +32,4 @@ int chdir_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long ar } #endif // SYS_chdir -#endif // CAPIO_POSIX_HANDLERS_CHDIR_HPP +#endif // CAPIO_POSIX_HANDLERS_CHDIR_HPP \ No newline at end of file diff --git a/src/posix/handlers/close.hpp b/src/posix/handlers/close.hpp index 1107e47fc..d04608ecc 100644 --- a/src/posix/handlers/close.hpp +++ b/src/posix/handlers/close.hpp @@ -20,4 +20,4 @@ int close_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long ar } #endif // SYS_close -#endif // CAPIO_POSIX_HANDLERS_CLOSE_HPP +#endif // CAPIO_POSIX_HANDLERS_CLOSE_HPP \ No newline at end of file diff --git a/src/posix/handlers/copy_file_range.hpp b/src/posix/handlers/copy_file_range.hpp index be056cf9c..01e4d4fce 100644 --- a/src/posix/handlers/copy_file_range.hpp +++ b/src/posix/handlers/copy_file_range.hpp @@ -1,5 +1,6 @@ #ifndef CAPIO_COPY_FILE_RANGE_HPP #define CAPIO_COPY_FILE_RANGE_HPP + int copy_file_range_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg5, long *result) { auto tid = static_cast(syscall_no_intercept(SYS_gettid)); @@ -20,4 +21,4 @@ int copy_file_range_handler(long arg0, long arg1, long arg2, long arg3, long arg return CAPIO_POSIX_SYSCALL_SKIP; } -#endif // CAPIO_COPY_FILE_RANGE_HPP +#endif // CAPIO_COPY_FILE_RANGE_HPP \ No newline at end of file diff --git a/src/posix/handlers/dup.hpp b/src/posix/handlers/dup.hpp index 9fcd66cd4..8ee1e1a45 100644 --- a/src/posix/handlers/dup.hpp +++ b/src/posix/handlers/dup.hpp @@ -82,4 +82,4 @@ int dup3_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg } #endif // SYS_dup3 -#endif // CAPIO_POSIX_HANDLERS_DUP_HPP +#endif // CAPIO_POSIX_HANDLERS_DUP_HPP \ No newline at end of file diff --git a/src/posix/handlers/execve.hpp b/src/posix/handlers/execve.hpp index b26d97224..47c526a70 100644 --- a/src/posix/handlers/execve.hpp +++ b/src/posix/handlers/execve.hpp @@ -15,4 +15,4 @@ int execve_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long a } #endif // SYS_execve -#endif // CAPIO_POSIX_HANDLERS_EXECVE_HPP +#endif // CAPIO_POSIX_HANDLERS_EXECVE_HPP \ No newline at end of file diff --git a/src/posix/handlers/fchmod.hpp b/src/posix/handlers/fchmod.hpp index 424905b8b..cc9e926f5 100644 --- a/src/posix/handlers/fchmod.hpp +++ b/src/posix/handlers/fchmod.hpp @@ -19,4 +19,4 @@ int fchmod_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long a } #endif // SYS_chmod -#endif // CAPIO_POSIX_HANDLERS_FCHMOD_HPP +#endif // CAPIO_POSIX_HANDLERS_FCHMOD_HPP \ No newline at end of file diff --git a/src/posix/handlers/fchown.hpp b/src/posix/handlers/fchown.hpp index 84b08f6c1..b40398c0b 100644 --- a/src/posix/handlers/fchown.hpp +++ b/src/posix/handlers/fchown.hpp @@ -20,4 +20,4 @@ int fchown_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long a } #endif // SYS_chown -#endif // CAPIO_POSIX_HANDLERS_FCHOWN_HPP +#endif // CAPIO_POSIX_HANDLERS_FCHOWN_HPP \ No newline at end of file diff --git a/src/posix/handlers/fcntl.hpp b/src/posix/handlers/fcntl.hpp index 05ff50cfc..2780f8856 100644 --- a/src/posix/handlers/fcntl.hpp +++ b/src/posix/handlers/fcntl.hpp @@ -22,4 +22,4 @@ int fcntl_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long ar #endif #endif // SYS_fcntl -#endif // CAPIO_POSIX_HANDLERS_FCNTL_HPP +#endif // CAPIO_POSIX_HANDLERS_FCNTL_HPP \ No newline at end of file diff --git a/src/posix/handlers/fgetxattr.hpp b/src/posix/handlers/fgetxattr.hpp index 31c8becaa..3554d799e 100644 --- a/src/posix/handlers/fgetxattr.hpp +++ b/src/posix/handlers/fgetxattr.hpp @@ -19,4 +19,4 @@ int fgetxattr_handler(long arg0, long arg1, long arg2, long arg3, long arg4, lon } #endif // SYS_fgetxattr -#endif // CAPIO_POSIX_HANDLERS_FGETXATTR_HPP +#endif // CAPIO_POSIX_HANDLERS_FGETXATTR_HPP \ No newline at end of file diff --git a/src/posix/handlers/fork.hpp b/src/posix/handlers/fork.hpp index 6ba163c7b..0fbf1dbbf 100644 --- a/src/posix/handlers/fork.hpp +++ b/src/posix/handlers/fork.hpp @@ -12,7 +12,8 @@ int fork_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg START_LOG(parent_tid, "call(pid=%ld)", pid); - if (pid == 0) { // child + if (pid == 0) { + // child auto child_tid = static_cast(syscall_no_intercept(SYS_gettid)); init_process(child_tid); *result = 0; @@ -24,4 +25,4 @@ int fork_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg } #endif // SYS_fork -#endif // CAPIO_POSIX_HANDLERS_FORK_HPP +#endif // CAPIO_POSIX_HANDLERS_FORK_HPP \ No newline at end of file diff --git a/src/posix/handlers/ioctl.hpp b/src/posix/handlers/ioctl.hpp index dc019ba12..86d192f7d 100644 --- a/src/posix/handlers/ioctl.hpp +++ b/src/posix/handlers/ioctl.hpp @@ -16,4 +16,4 @@ int ioctl_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long ar } #endif // SYS_ioctl -#endif // CAPIO_POSIX_HANDLERS_IOCTL_HPP +#endif // CAPIO_POSIX_HANDLERS_IOCTL_HPP \ No newline at end of file diff --git a/src/posix/handlers/lseek.hpp b/src/posix/handlers/lseek.hpp index 07f9ef8b5..814dd6b4d 100644 --- a/src/posix/handlers/lseek.hpp +++ b/src/posix/handlers/lseek.hpp @@ -30,4 +30,4 @@ int lseek_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long ar } #endif // SYS_lseek || SYS_llseek -#endif // CAPIO_POSIX_HANDLERS_LSEEK_HPP +#endif // CAPIO_POSIX_HANDLERS_LSEEK_HPP \ No newline at end of file diff --git a/src/posix/handlers/mkdir.hpp b/src/posix/handlers/mkdir.hpp index 4ec036422..bd78c15c2 100644 --- a/src/posix/handlers/mkdir.hpp +++ b/src/posix/handlers/mkdir.hpp @@ -32,7 +32,6 @@ inline off64_t capio_mkdirat(int dirfd, const std::string_view &pathname, mode_t } if (is_capio_path(path)) { - create_request(-1, path, tid); } return CAPIO_POSIX_SYSCALL_REQUEST_SKIP; @@ -75,4 +74,4 @@ int rmdir_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long ar } #endif // SYS_rmdir -#endif // CAPIO_POSIX_HANDLERS_MKDIR_HPP +#endif // CAPIO_POSIX_HANDLERS_MKDIR_HPP \ No newline at end of file diff --git a/src/posix/handlers/posix_readdir.hpp b/src/posix/handlers/posix_readdir.hpp index b588f93ec..8620b6d63 100644 --- a/src/posix/handlers/posix_readdir.hpp +++ b/src/posix/handlers/posix_readdir.hpp @@ -67,7 +67,6 @@ inline void init_posix_dirent() { } inline unsigned long int load_files_from_directory(const char *path) { - START_LOG(capio_syscall(SYS_gettid), "call(path=%s)", path); syscall_no_intercept_flag = true; @@ -178,7 +177,6 @@ inline struct dirent64 *capio_internal_readdir(DIR *dirp, long pid) { } DIR *opendir(const char *name) { - START_LOG(capio_syscall(SYS_gettid), "call(path=%s)", name); if (is_forbidden_path(name)) { @@ -380,7 +378,6 @@ void seekdir(DIR *dirp, long int loc) { } int readdir_r(DIR *dirp, struct dirent *entry, struct dirent **result) { - /* * WARN: I have not yet clear the usage of this function, as such bugs are surely presents * TODO: implement the correct handling logic for this method @@ -412,4 +409,4 @@ int readdir_r(DIR *dirp, struct dirent *entry, struct dirent **result) { return 0; } -#endif // POSIX_READDIR_HPP +#endif // POSIX_READDIR_HPP \ No newline at end of file diff --git a/src/posix/handlers/rename.hpp b/src/posix/handlers/rename.hpp index 1cc7b0d8a..4c61e8a5d 100644 --- a/src/posix/handlers/rename.hpp +++ b/src/posix/handlers/rename.hpp @@ -16,7 +16,8 @@ int rename_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long a auto newpath_abs = capio_absolute(newpath); LOG("newpath absolute: %s", newpath_abs.c_str()); - if (is_prefix(oldpath_abs, newpath_abs)) { // TODO: The check is more complex + if (is_prefix(oldpath_abs, newpath_abs)) { + // TODO: The check is more complex errno = EINVAL; *result = -errno; return CAPIO_POSIX_SYSCALL_SUCCESS; @@ -37,4 +38,4 @@ int rename_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long a } #endif // SYS_rename -#endif // CAPIO_POSIX_HANDLERS_RENAME_HPP +#endif // CAPIO_POSIX_HANDLERS_RENAME_HPP \ No newline at end of file diff --git a/src/posix/handlers/stat.hpp b/src/posix/handlers/stat.hpp index 76e8ffd73..43264fb9e 100644 --- a/src/posix/handlers/stat.hpp +++ b/src/posix/handlers/stat.hpp @@ -61,7 +61,8 @@ inline int capio_fstatat(int dirfd, const std::string_view &pathname, struct sta std::filesystem::path path(pathname); if (path.empty() && (flags & AT_EMPTY_PATH) == AT_EMPTY_PATH) { - if (dirfd == AT_FDCWD) { // operate on currdir + if (dirfd == AT_FDCWD) { + // operate on currdir return capio_lstat(get_current_dir().native(), statbuf, tid); } // operate on dirfd. in this case dirfd can refer to any type of file @@ -129,4 +130,4 @@ int stat_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg } #endif // SYS_stat || SYS_stat64 -#endif // CAPIO_POSIX_HANDLERS_STAT_HPP +#endif // CAPIO_POSIX_HANDLERS_STAT_HPP \ No newline at end of file diff --git a/src/posix/handlers/statfs.hpp b/src/posix/handlers/statfs.hpp index 73f47a6a1..7c4488ff3 100644 --- a/src/posix/handlers/statfs.hpp +++ b/src/posix/handlers/statfs.hpp @@ -17,4 +17,4 @@ int fstatfs_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long } #endif // SYS_fstatfs || SYS_fstatfs64 -#endif // CAPIO_POSIX_HANDLERS_STATFS_HPP +#endif // CAPIO_POSIX_HANDLERS_STATFS_HPP \ No newline at end of file diff --git a/src/posix/handlers/statx.hpp b/src/posix/handlers/statx.hpp index 86abbb9f6..e931ef2db 100644 --- a/src/posix/handlers/statx.hpp +++ b/src/posix/handlers/statx.hpp @@ -35,4 +35,4 @@ int statx_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long ar } #endif // SYS_statx -#endif // CAPIO_POSIX_HANDLERS_STATX_HPP +#endif // CAPIO_POSIX_HANDLERS_STATX_HPP \ No newline at end of file diff --git a/src/posix/handlers/unlink.hpp b/src/posix/handlers/unlink.hpp index 741d3fac1..58e5cbba1 100644 --- a/src/posix/handlers/unlink.hpp +++ b/src/posix/handlers/unlink.hpp @@ -36,4 +36,4 @@ int unlinkat_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long } #endif // SYS_unlinkat -#endif // CAPIO_POSIX_HANDLERS_UNLINK_HPP +#endif // CAPIO_POSIX_HANDLERS_UNLINK_HPP \ No newline at end of file diff --git a/src/posix/libcapio_posix.cpp b/src/posix/libcapio_posix.cpp index 131cb631b..57ad15ce2 100644 --- a/src/posix/libcapio_posix.cpp +++ b/src/posix/libcapio_posix.cpp @@ -434,7 +434,11 @@ static int hook(long syscall_number, long arg0, long arg1, long arg2, long arg3, return 1; } -static __attribute__((constructor)) void init() { +static + __attribute__((constructor)) + + void + init() { init_client(); init_filesystem(); init_threading_support(); @@ -457,4 +461,4 @@ static __attribute__((constructor)) void init() { intercept_hook_point_clone_parent = hook_clone_parent; intercept_hook_point = hook; START_SYSCALL_LOGGING(); -} +} \ No newline at end of file diff --git a/src/posix/syscall_intercept/CMakeLists.txt b/src/posix/syscall_intercept/CMakeLists.txt index df762ade0..6215c0dde 100644 --- a/src/posix/syscall_intercept/CMakeLists.txt +++ b/src/posix/syscall_intercept/CMakeLists.txt @@ -18,7 +18,7 @@ ExternalProject_Add(syscall_intercept GIT_TAG b05ff8037de2eb7b44dfb7fa372cfe08d565ba84 PREFIX ${CMAKE_CURRENT_BINARY_DIR} CMAKE_ARGS - -DSTATIC_CAPSTONE=ON + -DSTATIC_CAPSTONE=ON -DBUILD_TESTS=OFF -DBUILD_EXAMPLES=OFF -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} diff --git a/src/posix/utils/cache/consent_request_cache.hpp b/src/posix/utils/cache/consent_request_cache.hpp index 753e5ac11..0b2e4058c 100644 --- a/src/posix/utils/cache/consent_request_cache.hpp +++ b/src/posix/utils/cache/consent_request_cache.hpp @@ -1,7 +1,7 @@ #ifndef CONSENT_REQUEST_CACHE_HPP #define CONSENT_REQUEST_CACHE_HPP -class ConsentRequestCache { +class ConsentRequestCache { std::unordered_map *available_consent; // Block until server allows for proceeding to a generic request @@ -49,4 +49,4 @@ class ConsentRequestCache { } }; -#endif // CONSENT_REQUEST_CACHE_HPP +#endif // CONSENT_REQUEST_CACHE_HPP \ No newline at end of file diff --git a/src/posix/utils/cache/read_request_cache_fs.hpp b/src/posix/utils/cache/read_request_cache_fs.hpp index 3cb446dcf..a82130a40 100644 --- a/src/posix/utils/cache/read_request_cache_fs.hpp +++ b/src/posix/utils/cache/read_request_cache_fs.hpp @@ -1,5 +1,6 @@ #ifndef READ_REQUEST_CACHE_FS_HPP #define READ_REQUEST_CACHE_FS_HPP + class ReadRequestCacheFS { int current_fd = -1; capio_off64_t max_read = 0; @@ -74,4 +75,4 @@ class ReadRequestCacheFS { }; }; -#endif // READ_REQUEST_CACHE_FS_HPP +#endif // READ_REQUEST_CACHE_FS_HPP \ No newline at end of file diff --git a/src/posix/utils/cache/read_request_cache_mem.hpp b/src/posix/utils/cache/read_request_cache_mem.hpp index bf529a8d5..5fbc8da8d 100644 --- a/src/posix/utils/cache/read_request_cache_mem.hpp +++ b/src/posix/utils/cache/read_request_cache_mem.hpp @@ -141,7 +141,6 @@ class ReadRequestCacheMEM { actual_read_size = count; _last_read_end = get_capio_fd_offset(_fd) + count; set_capio_fd_offset(fd, _last_read_end); - } else { // There could be some data available already on the cache. Copy that first and then // proceed to request the other missing data diff --git a/src/posix/utils/cache/write_request_cache_fs.hpp b/src/posix/utils/cache/write_request_cache_fs.hpp index a84dbcd06..a2da60615 100644 --- a/src/posix/utils/cache/write_request_cache_fs.hpp +++ b/src/posix/utils/cache/write_request_cache_fs.hpp @@ -1,7 +1,7 @@ #ifndef WRITE_REQUEST_CACHE_FS_HPP #define WRITE_REQUEST_CACHE_FS_HPP -class WriteRequestCacheFS { +class WriteRequestCacheFS { int current_fd = -1; capio_off64_t current_size = 0; @@ -55,4 +55,4 @@ class WriteRequestCacheFS { } }; -#endif // WRITE_REQUEST_CACHE_FS_HPP +#endif // WRITE_REQUEST_CACHE_FS_HPP \ No newline at end of file diff --git a/src/posix/utils/cache/write_request_cache_mem.hpp b/src/posix/utils/cache/write_request_cache_mem.hpp index 34c570460..fbe7b85a5 100644 --- a/src/posix/utils/cache/write_request_cache_mem.hpp +++ b/src/posix/utils/cache/write_request_cache_mem.hpp @@ -97,4 +97,4 @@ class WriteRequestCacheMEM { } }; -#endif // WRITE_REQUEST_CACHE_MEM_HPP +#endif // WRITE_REQUEST_CACHE_MEM_HPP \ No newline at end of file diff --git a/src/posix/utils/clone.hpp b/src/posix/utils/clone.hpp index a84f5c66c..650ddc317 100644 --- a/src/posix/utils/clone.hpp +++ b/src/posix/utils/clone.hpp @@ -114,4 +114,4 @@ inline void hook_clone_parent(long child_tid) { clone_cv.notify_all(); } -#endif // CAPIO_POSIX_UTILS_CLONE_HPP +#endif // CAPIO_POSIX_UTILS_CLONE_HPP \ No newline at end of file diff --git a/src/posix/utils/common.hpp b/src/posix/utils/common.hpp index 8f97bc5ec..5f6be981a 100644 --- a/src/posix/utils/common.hpp +++ b/src/posix/utils/common.hpp @@ -24,4 +24,4 @@ inline bool is_file_to_store_in_memory(std::filesystem::path &path, const long p [&path](const std::regex ®ex) { return std::regex_match(path.string(), regex); }); } -#endif // CAPIO_FUNCTIONS_H +#endif // CAPIO_FUNCTIONS_H \ No newline at end of file diff --git a/src/posix/utils/env.hpp b/src/posix/utils/env.hpp index 65b912bd4..5bfa2891e 100644 --- a/src/posix/utils/env.hpp +++ b/src/posix/utils/env.hpp @@ -96,4 +96,4 @@ inline bool store_in_memory_only() { return *store_in_memory; } -#endif // CAPIO_POSIX_UTILS_ENV_HPP +#endif // CAPIO_POSIX_UTILS_ENV_HPP \ No newline at end of file diff --git a/src/posix/utils/snapshot.hpp b/src/posix/utils/snapshot.hpp index f213b053d..4399ab0f4 100644 --- a/src/posix/utils/snapshot.hpp +++ b/src/posix/utils/snapshot.hpp @@ -113,4 +113,4 @@ inline void create_snapshot(long tid) { #endif } -#endif // CAPIO_POSIX_UTILS_SNAPSHOT_HPP +#endif // CAPIO_POSIX_UTILS_SNAPSHOT_HPP \ No newline at end of file diff --git a/src/posix/utils/types.hpp b/src/posix/utils/types.hpp index fbd6ee0b4..ba0e6e3a6 100644 --- a/src/posix/utils/types.hpp +++ b/src/posix/utils/types.hpp @@ -15,4 +15,4 @@ typedef std::unordered_map> CPFilesPaths_t; typedef int (*CPHandler_t)(long, long, long, long, long, long, long *); -#endif // CAPIO_POSIX_UTILS_TYPES_HPP +#endif // CAPIO_POSIX_UTILS_TYPES_HPP \ No newline at end of file diff --git a/src/server/capio-cl-engine/json_parser.hpp b/src/server/capio-cl-engine/json_parser.hpp index 761a10515..f343f5871 100644 --- a/src/server/capio-cl-engine/json_parser.hpp +++ b/src/server/capio-cl-engine/json_parser.hpp @@ -9,7 +9,6 @@ * */ class JsonParser { - /** * @brief Check if a string is a representation of a integer number * @@ -131,7 +130,6 @@ class JsonParser { "Found file " + std::string(file)); if (file.is_relative()) { - server_println( CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Path : " + std::string(file) + @@ -155,7 +153,6 @@ class JsonParser { } if (app["output_stream"].get_array().get(output_stream)) { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, "No output_stream section found for app " + std::string(app_name)); ERR_EXIT("No output_stream section found for app %s", @@ -258,7 +255,6 @@ class JsonParser { "Invalid commit rule: " + std::string(commit_rule)); ERR_EXIT("error commit rule: %s", std::string(commit_rule).c_str()); } - } else { commit_rule = committed; } @@ -332,7 +328,6 @@ class JsonParser { // TODO: check for globs std::string commit(commit_rule), firerule(mode); if (n_files != -1) { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, "Setting path: " + std::string(path) + " n_files to " + std::to_string(n_files)); @@ -487,4 +482,4 @@ class JsonParser { } }; -#endif // JSON_PARSER_HPP +#endif // JSON_PARSER_HPP \ No newline at end of file diff --git a/src/server/capio_server.cpp b/src/server/capio_server.cpp index 7ae6adca4..3162239dd 100644 --- a/src/server/capio_server.cpp +++ b/src/server/capio_server.cpp @@ -40,7 +40,6 @@ #include int main(int argc, char **argv) { - std::cout << CAPIO_LOG_SERVER_BANNER; server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, diff --git a/src/server/client-manager/client_manager.hpp b/src/server/client-manager/client_manager.hpp index 27298e098..10dac6a22 100644 --- a/src/server/client-manager/client_manager.hpp +++ b/src/server/client-manager/client_manager.hpp @@ -92,6 +92,7 @@ class ClientManager { } LOG("Error: tis is not present in files_created_by_producers map"); } + /** * @brief Get the files that a given pid is waiting to be produced * @@ -126,4 +127,4 @@ class ClientManager { inline ClientManager *client_manager; -#endif // CLIENT_MANAGER_HPP +#endif // CLIENT_MANAGER_HPP \ No newline at end of file diff --git a/src/server/client-manager/handlers/close.hpp b/src/server/client-manager/handlers/close.hpp index 86fb6f731..34132a76a 100644 --- a/src/server/client-manager/handlers/close.hpp +++ b/src/server/client-manager/handlers/close.hpp @@ -22,7 +22,6 @@ inline void close_handler(const char *const str) { // producer if (capio_cl_engine->getCommitRule(filename) == CAPIO_FILE_COMMITTED_ON_CLOSE && capio_cl_engine->isProducer(filename, tid)) { - CapioFileManager::setCommitted(path); /** * The increase close count is called only on explicit close() sc, as defined by the @@ -38,4 +37,4 @@ inline void close_handler(const char *const str) { } } -#endif // CAPIO_CLOSE_HPP +#endif // CAPIO_CLOSE_HPP \ No newline at end of file diff --git a/src/server/client-manager/handlers/consent.hpp b/src/server/client-manager/handlers/consent.hpp index e306a7fdc..b8cc17d4c 100644 --- a/src/server/client-manager/handlers/consent.hpp +++ b/src/server/client-manager/handlers/consent.hpp @@ -50,4 +50,4 @@ inline void consent_to_proceed_handler(const char *const str) { file_manager->addThreadAwaitingData(path, tid, ULLONG_MAX); } -#endif // CONSENT_HPP +#endif // CONSENT_HPP \ No newline at end of file diff --git a/src/server/client-manager/handlers/exit.hpp b/src/server/client-manager/handlers/exit.hpp index d882d3c84..8bb55d11b 100644 --- a/src/server/client-manager/handlers/exit.hpp +++ b/src/server/client-manager/handlers/exit.hpp @@ -25,4 +25,4 @@ inline void exit_handler(const char *const str) { client_manager->remove_client(tid); } -#endif // CAPIO_EXIT_HPP +#endif // CAPIO_EXIT_HPP \ No newline at end of file diff --git a/src/server/client-manager/handlers/files_in_memory.hpp b/src/server/client-manager/handlers/files_in_memory.hpp index e0eed75b9..802dc84d4 100644 --- a/src/server/client-manager/handlers/files_in_memory.hpp +++ b/src/server/client-manager/handlers/files_in_memory.hpp @@ -19,4 +19,4 @@ inline void files_to_store_in_memory_handler(const char *const str) { client_manager->reply_to_client(tid, count); } -#endif // FILES_IN_MEMORY_HPP +#endif // FILES_IN_MEMORY_HPP \ No newline at end of file diff --git a/src/server/client-manager/handlers/handshake.hpp b/src/server/client-manager/handlers/handshake.hpp index 37facceae..ddbb4c0a6 100644 --- a/src/server/client-manager/handlers/handshake.hpp +++ b/src/server/client-manager/handlers/handshake.hpp @@ -26,7 +26,6 @@ inline void handshake_handler(const char *const str) { // Unlock client waiting to start LOG("Allowing handshake to continue"); client_manager->reply_to_client(tid, 1); - } else { LOG("Termination phase is in progress. ignoring further handshakes."); client_manager->reply_to_client(tid, 0); diff --git a/src/server/client-manager/handlers/open.hpp b/src/server/client-manager/handlers/open.hpp index 1fb6bc35a..8c9a742cb 100644 --- a/src/server/client-manager/handlers/open.hpp +++ b/src/server/client-manager/handlers/open.hpp @@ -36,4 +36,4 @@ inline void open_handler(const char *const str) { LOG("File does not yet exists. halting operation and adding it to queue"); file_manager->addThreadAwaitingCreation(path, tid); } -#endif // OPEN_HPP +#endif // OPEN_HPP \ No newline at end of file diff --git a/src/server/client-manager/handlers/posix_readdir.hpp b/src/server/client-manager/handlers/posix_readdir.hpp index 3e9377d0d..99ec35f03 100644 --- a/src/server/client-manager/handlers/posix_readdir.hpp +++ b/src/server/client-manager/handlers/posix_readdir.hpp @@ -1,4 +1,3 @@ - #ifndef POSIX_READDIR_HPP #define POSIX_READDIR_HPP @@ -15,4 +14,4 @@ inline void posix_readdir_handler(const char *const str) { storage_service->reply_to_client_raw(pid, metadata_token.c_str(), metadata_token.length()); } -#endif // POSIX_READDIR_HPP +#endif // POSIX_READDIR_HPP \ No newline at end of file diff --git a/src/server/client-manager/handlers/rename.hpp b/src/server/client-manager/handlers/rename.hpp index 3731dabce..da7b58579 100644 --- a/src/server/client-manager/handlers/rename.hpp +++ b/src/server/client-manager/handlers/rename.hpp @@ -20,4 +20,4 @@ inline void rename_handler(const char *const str) { // TODO: check what happen when old or new is to be handled in memory } -#endif // CAPIO_RENAME_HPP +#endif // CAPIO_RENAME_HPP \ No newline at end of file diff --git a/src/server/client-manager/handlers/write.hpp b/src/server/client-manager/handlers/write.hpp index 2448c41b0..608a0fdf8 100644 --- a/src/server/client-manager/handlers/write.hpp +++ b/src/server/client-manager/handlers/write.hpp @@ -40,4 +40,4 @@ inline void write_mem_handler(const char *const str) { storage_service->recive_from_client(tid, path, offset, write_size); } -#endif // WRITE_HPP +#endif // WRITE_HPP \ No newline at end of file diff --git a/src/server/client-manager/request_handler_engine.hpp b/src/server/client-manager/request_handler_engine.hpp index 456a2bfc3..e1cacbffe 100644 --- a/src/server/client-manager/request_handler_engine.hpp +++ b/src/server/client-manager/request_handler_engine.hpp @@ -163,4 +163,4 @@ class RequestHandlerEngine { inline RequestHandlerEngine *request_handlers_engine; -#endif // CAPIO_CL_ENGINE_MAIN_HPP +#endif // CAPIO_CL_ENGINE_MAIN_HPP \ No newline at end of file diff --git a/src/server/communication-service/control_plane/capio_control_plane.hpp b/src/server/communication-service/control_plane/capio_control_plane.hpp index cdd5b7ec1..fbc95b5c0 100644 --- a/src/server/communication-service/control_plane/capio_control_plane.hpp +++ b/src/server/communication-service/control_plane/capio_control_plane.hpp @@ -4,7 +4,7 @@ #include class CapioControlPlane { -public: + public: typedef enum { CREATE, DELETE, WRITE } event_type; virtual ~CapioControlPlane() = default; @@ -16,8 +16,7 @@ class CapioControlPlane { * @param hostname_target */ void notify(event_type event, const std::filesystem::path &path, - const std::string &hostname_target) { - } + const std::string &hostname_target) {} /** * Notify all nodes of the occurence of an event diff --git a/src/server/communication-service/control_plane/fs_control_plane.hpp b/src/server/communication-service/control_plane/fs_control_plane.hpp index f4dacc3b0..2988d84a7 100644 --- a/src/server/communication-service/control_plane/fs_control_plane.hpp +++ b/src/server/communication-service/control_plane/fs_control_plane.hpp @@ -93,11 +93,11 @@ class FSControlPlane : public CapioControlPlane { sleep(1); } -public: + public: explicit FSControlPlane(int backend_port) : _backend_port(backend_port) { gethostname(ownHostname, HOST_NAME_MAX); generate_aliveness_token(backend_port); - continue_execution = new bool(true); + continue_execution = new bool(true); token_used_to_connect_mutex = new std::mutex(); thread = new std::thread(fs_server_aliveness_detector_thread, std::ref(continue_execution), &token_used_to_connect, token_used_to_connect_mutex); @@ -116,8 +116,7 @@ class FSControlPlane : public CapioControlPlane { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "FSControlPlane cleanup completed."); } - void notify_all(event_type event, const std::filesystem::path &path) override { - } + void notify_all(event_type event, const std::filesystem::path &path) override {} }; #endif // FS_CONTROL_PLANE_HPP \ No newline at end of file diff --git a/src/server/communication-service/control_plane/multicast_control_plane.hpp b/src/server/communication-service/control_plane/multicast_control_plane.hpp index a3ae992ff..85444c1a4 100644 --- a/src/server/communication-service/control_plane/multicast_control_plane.hpp +++ b/src/server/communication-service/control_plane/multicast_control_plane.hpp @@ -7,131 +7,15 @@ #include #include +#include "multicast_utils.hpp" + class MulticastControlPlane : public CapioControlPlane { - char _discovery_multicast_address[16] = {0}; bool *continue_execution; std::thread *discovery_thread, *controlpl_incoming; std::vector token_used_to_connect; std::mutex *token_used_to_connect_mutex; char ownHostname[HOST_NAME_MAX] = {0}; - - static int open_outgoing_multicast_socket(const char *address, const int port, - sockaddr_in *addr) { - int transmission_socket = socket(AF_INET, SOCK_DGRAM, 0); - if (transmission_socket < 0) { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, - std::string("WARNING: unable to bind multicast socket: ") + - strerror(errno)); - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, - "Execution will continue only with FS discovery support"); - - return -1; - } - - addr->sin_family = AF_INET; - addr->sin_addr.s_addr = inet_addr(address); - addr->sin_port = htons(port); - return transmission_socket; - }; - - static void send_multicast_alive_token(const int data_plane_backend_port) { - START_LOG(gettid(), "call(data_plane_backend_port=%d)", data_plane_backend_port); - - sockaddr_in addr = {}; - const auto socket = open_outgoing_multicast_socket(MULTICAST_DISCOVERY_ADDR, - MULTICAST_DISCOVERY_PORT, &addr); - - char message[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; - sprintf(message, "%s:%d", capio_global_configuration->node_name, data_plane_backend_port); - - LOG("Sending token: %s", message); - - if (sendto(socket, message, strlen(message), 0, - reinterpret_cast(&addr), sizeof(addr)) < 0) { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, - "WARNING: unable to send alive token(" + std::string(message) + - ") to multicast address!: " + strerror(errno)); - } - LOG("Sent multicast token"); - close(socket); - } - - static int open_outgoing_socket(const char *address_ip, const int port, - sockaddr_in &addr, socklen_t &addrlen) { - START_LOG(gettid(), "call(address=%s, port=%d)", address_ip, port); - int loopback = 0; // disable receive loopback messages - u_int multiple_socket_on_same_address = 1; // enable multiple sockets on same address - - int outgoing_socket = socket(AF_INET, SOCK_DGRAM, 0); - if (outgoing_socket < 0) { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, - std::string("WARNING: unable to open multicast socket: ") + - strerror(errno)); - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, - "Execution will continue only with FS discovery support"); - return -1; - } - LOG("Created socket"); - - if (setsockopt(outgoing_socket, SOL_SOCKET, SO_REUSEADDR, - (char *) &multiple_socket_on_same_address, - sizeof(multiple_socket_on_same_address)) < 0) { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, - std::string("WARNING: unable to multiple sockets to same address: ") + - strerror(errno)); - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, - "Execution will continue only with FS discovery support"); - return -1; - } - LOG("Set IP address to accept multiple sockets on same address"); - - if (setsockopt(outgoing_socket, IPPROTO_IP, IP_MULTICAST_LOOP, &loopback, - sizeof(loopback)) < 0) { - server_println( - CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, - std::string("WARNING: unable to filter out loopback incoming messages: ") + - strerror(errno)); - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, - "Execution will continue only with FS discovery support"); - return -1; - } - LOG("Disabled reception of loopback messages from socket"); - - addr = {}; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl(INADDR_ANY); - addr.sin_port = htons(port); - addrlen = sizeof(addr); - LOG("Set socket on IP: %s - PORT: %d", address_ip, port); - - // bind to receive address - if (bind(outgoing_socket, reinterpret_cast(&addr), sizeof(addr)) < 0) { - - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, - std::string("WARNING: unable to bind multicast socket: ") + - strerror(errno)); - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, - "Execution will continue only with FS discovery support"); - return -1; - } - LOG("Binded socket"); - - ip_mreq mreq{}; - mreq.imr_multiaddr.s_addr = inet_addr(address_ip); - mreq.imr_interface.s_addr = htonl(INADDR_ANY); - if (setsockopt(outgoing_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, - std::string("WARNING: unable to join multicast group: ") + - strerror(errno)); - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, - "Execution will continue only with FS discovery support"); - return -1; - } - LOG("Successfully joined multicast group"); - return outgoing_socket; - } - static void multicast_server_aliveness_thread(const bool *continue_execution, std::vector *token_used_to_connect, std::mutex *token_used_to_connect_mutex, @@ -144,15 +28,14 @@ class MulticastControlPlane : public CapioControlPlane { const std::string SELF_TOKEN = std::string(capio_global_configuration->node_name) + ":" + std::to_string(dataplane_backend_port); - sockaddr_in addr = {}; + sockaddr_in addr = {}; socklen_t addrlen = {}; - const auto discovery_socket = open_outgoing_socket(MULTICAST_DISCOVERY_ADDR, - MULTICAST_DISCOVERY_PORT, - addr, addrlen); + const auto discovery_socket = + open_outgoing_socket(MULTICAST_DISCOVERY_ADDR, MULTICAST_DISCOVERY_PORT, addr, addrlen); server_println(CAPIO_SERVER_CLI_LOG_SERVER, std::string("Multicast discovery service @ ") + - MULTICAST_DISCOVERY_ADDR + ":" + - std::to_string(MULTICAST_DISCOVERY_PORT)); + MULTICAST_DISCOVERY_ADDR + ":" + + std::to_string(MULTICAST_DISCOVERY_PORT)); while (*continue_execution) { bzero(incomingMessage, sizeof(incomingMessage)); @@ -190,41 +73,38 @@ class MulticastControlPlane : public CapioControlPlane { } } - static void multicast_control_plane_incoming_thread(const bool *continue_execution) { START_LOG(gettid(), "Call(multicast_control_plane_incoming_thread)"); char incoming_msg[MULTICAST_CONTROLPL_MESSAGE_SIZE] = {0}; - sockaddr_in addr = {}; - socklen_t addrlen = {}; - const auto discovery_socket = open_outgoing_socket(MULTICAST_CONTROLPL_ADDR, - MULTICAST_CONTROLPL_PORT, - addr, addrlen); + sockaddr_in addr = {}; + socklen_t addrlen = {}; + const auto discovery_socket = + open_outgoing_socket(MULTICAST_CONTROLPL_ADDR, MULTICAST_CONTROLPL_PORT, addr, addrlen); server_println(CAPIO_SERVER_CLI_LOG_SERVER, std::string("Multicast control plane @ ") + - MULTICAST_CONTROLPL_ADDR + ":" + - std::to_string(MULTICAST_CONTROLPL_PORT)); + MULTICAST_CONTROLPL_ADDR + ":" + + std::to_string(MULTICAST_CONTROLPL_PORT)); while (*continue_execution) { bzero(incoming_msg, sizeof(incoming_msg)); const auto recv_sice = - recvfrom(discovery_socket, incoming_msg, MULTICAST_CONTROLPL_MESSAGE_SIZE, - 0, reinterpret_cast(&addr), &addrlen); - LOG("Received multicast data of size %ld and content %s", recv_sice, - incoming_msg); + recvfrom(discovery_socket, incoming_msg, MULTICAST_CONTROLPL_MESSAGE_SIZE, 0, + reinterpret_cast(&addr), &addrlen); + LOG("Received multicast data of size %ld and content %s", recv_sice, incoming_msg); if (recv_sice < 0) { - server_println( - CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, - std::string("WARNING: received 0 bytes from multicast socket: ")); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: received 0 bytes from multicast socket: ")); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Execution will continue only with FS discovery support"); continue; } - MulticastControlPlane::event_type event; + event_type event; char source_hostname[HOST_NAME_MAX]; char source_path[PATH_MAX]; - sscanf(incoming_msg, "%d %s %s", &event, source_hostname, source_path); + sscanf(incoming_msg, "%d %s %s", reinterpret_cast(&event), source_hostname, + source_path); if (strcmp(capio_global_configuration->node_name, source_hostname) == 0) { continue; @@ -237,21 +117,19 @@ class MulticastControlPlane : public CapioControlPlane { close(discovery_socket); } -public: + public: explicit MulticastControlPlane(int dataplane_backend_port) { START_LOG(gettid(), "call(dataplane_backend_port=%d)", dataplane_backend_port); gethostname(ownHostname, HOST_NAME_MAX); - continue_execution = new bool(true); + continue_execution = new bool(true); token_used_to_connect_mutex = new std::mutex(); - discovery_thread = new std::thread(multicast_server_aliveness_thread, - continue_execution, + discovery_thread = new std::thread(multicast_server_aliveness_thread, continue_execution, &token_used_to_connect, token_used_to_connect_mutex, dataplane_backend_port); - controlpl_incoming = new std::thread(multicast_control_plane_incoming_thread, - continue_execution); - + controlpl_incoming = + new std::thread(multicast_control_plane_incoming_thread, continue_execution); } ~MulticastControlPlane() override { @@ -268,7 +146,7 @@ class MulticastControlPlane : public CapioControlPlane { void notify_all(const event_type event, const std::filesystem::path &path) override { START_LOG(gettid(), "call(event=%s, path=%s)", event, path.string().c_str()); - sockaddr_in addr = {}; + sockaddr_in addr = {}; const auto socket = open_outgoing_multicast_socket(MULTICAST_CONTROLPL_ADDR, MULTICAST_CONTROLPL_PORT, &addr); @@ -276,11 +154,11 @@ class MulticastControlPlane : public CapioControlPlane { sprintf(message, "%03d %s %s", event, ownHostname, path.string().c_str()); LOG("Sending message: %s", message); - if (sendto(socket, message, strlen(message), 0, - reinterpret_cast(&addr), sizeof(addr)) < 0) { + if (sendto(socket, message, strlen(message), 0, reinterpret_cast(&addr), + sizeof(addr)) < 0) { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, "WARNING: unable to send message(" + std::string(message) + - ") to multicast address!: " + strerror(errno)); + ") to multicast address!: " + strerror(errno)); } LOG("Sent message"); diff --git a/src/server/communication-service/control_plane/multicast_utils.hpp b/src/server/communication-service/control_plane/multicast_utils.hpp new file mode 100644 index 000000000..1b42b50c3 --- /dev/null +++ b/src/server/communication-service/control_plane/multicast_utils.hpp @@ -0,0 +1,114 @@ +#ifndef CAPIO_MULTICAST_UTILS_HPP +#define CAPIO_MULTICAST_UTILS_HPP + +static int open_outgoing_multicast_socket(const char *address, const int port, sockaddr_in *addr) { + int transmission_socket = socket(AF_INET, SOCK_DGRAM, 0); + if (transmission_socket < 0) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: unable to bind multicast socket: ") + strerror(errno)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); + + return -1; + } + + addr->sin_family = AF_INET; + addr->sin_addr.s_addr = inet_addr(address); + addr->sin_port = htons(port); + return transmission_socket; +}; + +static void send_multicast_alive_token(const int data_plane_backend_port) { + START_LOG(gettid(), "call(data_plane_backend_port=%d)", data_plane_backend_port); + + sockaddr_in addr = {}; + const auto socket = + open_outgoing_multicast_socket(MULTICAST_DISCOVERY_ADDR, MULTICAST_DISCOVERY_PORT, &addr); + + char message[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; + sprintf(message, "%s:%d", capio_global_configuration->node_name, data_plane_backend_port); + + LOG("Sending token: %s", message); + + if (sendto(socket, message, strlen(message), 0, reinterpret_cast(&addr), + sizeof(addr)) < 0) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "WARNING: unable to send alive token(" + std::string(message) + + ") to multicast address!: " + strerror(errno)); + } + LOG("Sent multicast token"); + close(socket); +} + +static int open_outgoing_socket(const char *address_ip, const int port, sockaddr_in &addr, + socklen_t &addrlen) { + START_LOG(gettid(), "call(address=%s, port=%d)", address_ip, port); + int loopback = 0; // disable receive loopback messages + u_int multiple_socket_on_same_address = 1; // enable multiple sockets on same address + + int outgoing_socket = socket(AF_INET, SOCK_DGRAM, 0); + if (outgoing_socket < 0) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: unable to open multicast socket: ") + strerror(errno)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); + return -1; + } + LOG("Created socket"); + + if (setsockopt(outgoing_socket, SOL_SOCKET, SO_REUSEADDR, + (char *) &multiple_socket_on_same_address, + sizeof(multiple_socket_on_same_address)) < 0) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: unable to multiple sockets to same address: ") + + strerror(errno)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); + return -1; + } + LOG("Set IP address to accept multiple sockets on same address"); + + if (setsockopt(outgoing_socket, IPPROTO_IP, IP_MULTICAST_LOOP, &loopback, sizeof(loopback)) < + 0) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: unable to filter out loopback incoming messages: ") + + strerror(errno)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); + return -1; + } + LOG("Disabled reception of loopback messages from socket"); + + addr = {}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + addr.sin_port = htons(port); + addrlen = sizeof(addr); + LOG("Set socket on IP: %s - PORT: %d", address_ip, port); + + // bind to receive address + if (bind(outgoing_socket, reinterpret_cast(&addr), sizeof(addr)) < 0) { + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: unable to bind multicast socket: ") + strerror(errno)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); + return -1; + } + LOG("Binded socket"); + + ip_mreq mreq{}; + mreq.imr_multiaddr.s_addr = inet_addr(address_ip); + mreq.imr_interface.s_addr = htonl(INADDR_ANY); + if (setsockopt(outgoing_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: unable to join multicast group: ") + strerror(errno)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); + return -1; + } + LOG("Successfully joined multicast group"); + return outgoing_socket; +} + +#endif // CAPIO_MULTICAST_UTILS_HPP \ No newline at end of file diff --git a/src/server/communication-service/data_plane/BackendInterface.hpp b/src/server/communication-service/data_plane/BackendInterface.hpp index 20a001f5a..c0676ca7f 100644 --- a/src/server/communication-service/data_plane/BackendInterface.hpp +++ b/src/server/communication-service/data_plane/BackendInterface.hpp @@ -76,4 +76,4 @@ class NoBackend final : public BackendInterface { inline BackendInterface *capio_backend; -#endif // CAPIOBACKEND_HPP +#endif // CAPIOBACKEND_HPP \ No newline at end of file diff --git a/src/server/communication-service/data_plane/MTCL_backend.hpp b/src/server/communication-service/data_plane/MTCL_backend.hpp index f2339f7b2..a92887abf 100644 --- a/src/server/communication-service/data_plane/MTCL_backend.hpp +++ b/src/server/communication-service/data_plane/MTCL_backend.hpp @@ -32,6 +32,7 @@ class TransportUnit { class MTCL_backend : public BackendInterface { typedef enum { FROM_REMOTE, TO_REMOTE } CONN_HANDLER_ORIGIN; + typedef std::tuple *, std::queue *, std::mutex *> TransportUnitInterface; std::unordered_map connected_hostnames_map; @@ -238,9 +239,9 @@ class MTCL_backend : public BackendInterface { server_connection_handler, std::move(UserManager), remoteHost.c_str(), thread_sleep_times, connection_tuple, terminate, TO_REMOTE)); } else { - server_println(CAPIO_SERVER_CLI_LOG_SERVER_WARNING, - "Warning: tried to connect to " + std::string(remoteHost) + - " but connection is not valid"); + server_println(CAPIO_SERVER_CLI_LOG_SERVER_WARNING, "Warning: tried to connect to " + + std::string(remoteHost) + + " but connection is not valid"); } } diff --git a/src/server/file-manager/file_manager.hpp b/src/server/file-manager/file_manager.hpp index 6d664e9b0..70bc2016c 100644 --- a/src/server/file-manager/file_manager.hpp +++ b/src/server/file-manager/file_manager.hpp @@ -27,6 +27,7 @@ class CapioFileManager { START_LOG(gettid(), "call()"); server_println(CAPIO_SERVER_CLI_LOG_SERVER, "CapioFileManager initialization completed."); } + ~CapioFileManager() { START_LOG(gettid(), "call()"); server_println(CAPIO_SERVER_CLI_LOG_SERVER, "CapioFileManager cleanup completed."); @@ -51,4 +52,4 @@ inline CapioFileManager *file_manager; #include "file_manager_impl.hpp" -#endif // FILE_MANAGER_HEADER_HPP +#endif // FILE_MANAGER_HEADER_HPP \ No newline at end of file diff --git a/src/server/file-manager/file_manager_impl.hpp b/src/server/file-manager/file_manager_impl.hpp index b0efa998b..1d3257448 100644 --- a/src/server/file-manager/file_manager_impl.hpp +++ b/src/server/file-manager/file_manager_impl.hpp @@ -162,7 +162,6 @@ inline void CapioFileManager::_unlockThreadAwaitingData( // remove thread from map LOG("Removing thread %ld from threads awaiting on data", item->first); item = pids_awaiting.erase(item); - } else if (capio_cl_engine->isFirable(path) && filesize >= item->second) { /** * if is Fire No Update and there is enough data @@ -174,16 +173,13 @@ inline void CapioFileManager::_unlockThreadAwaitingData( // remove thread from map LOG("Removing thread %ld from threads awaiting on data", item->first); item = pids_awaiting.erase(item); - } else if (isCommitted(path)) { - LOG("Thread %ld can be unlocked as file is committed", item->first); client_manager->reply_to_client(item->first, ULLONG_MAX); // remove thread from map LOG("Removing thread %ld from threads awaiting on data", item->first); item = pids_awaiting.erase(item); } else { - // DEFAULT: no condition to unlock has occurred, hence wait... LOG("Waiting threads cannot yet be unlocked"); ++item; @@ -429,7 +425,6 @@ inline void CapioFileManager::checkDirectoriesNFiles() const { static const auto loc = capio_cl_engine->getLocations(); for (const auto &[path_config, config] : *loc) { - if (std::get<6>(config)) { /* * In this case we are trying to check for a file. diff --git a/src/server/file-manager/fs_monitor.hpp b/src/server/file-manager/fs_monitor.hpp index f879a91c3..c2c523c30 100644 --- a/src/server/file-manager/fs_monitor.hpp +++ b/src/server/file-manager/fs_monitor.hpp @@ -44,7 +44,6 @@ class FileSystemMonitor { * @param continue_execution */ static void _main(const bool *continue_execution) { - START_LOG(gettid(), "INFO: instance of FileSystemMonitor"); timespec sleep{}; @@ -96,4 +95,4 @@ class FileSystemMonitor { inline FileSystemMonitor *fs_monitor; -#endif // CAPIO_FS_FILE_SYSTEM_MONITOR_HPP +#endif // CAPIO_FS_FILE_SYSTEM_MONITOR_HPP \ No newline at end of file diff --git a/src/server/storage-service/CapioFile/CapioMemoryFile.hpp b/src/server/storage-service/CapioFile/CapioMemoryFile.hpp index 5fd98bd8d..c6eabc0f8 100644 --- a/src/server/storage-service/CapioFile/CapioMemoryFile.hpp +++ b/src/server/storage-service/CapioFile/CapioMemoryFile.hpp @@ -28,7 +28,6 @@ class CapioMemoryFile : public CapioFile { * @return tuple */ static auto compute_offsets(const std::size_t offset, std::size_t length) { - START_LOG(gettid(), "call(offset=%llu, length=%llu)", offset, length); // Compute the offset of the memoryBlocks component. const auto map_offset = offset / _pageSizeBytes; @@ -74,7 +73,6 @@ class CapioMemoryFile : public CapioFile { */ std::size_t writeData(const char *buffer, const std::size_t file_offset, std::size_t buffer_length) override { - const auto &[map_offset, write_offset, first_write_size] = compute_offsets(file_offset, buffer_length); @@ -151,7 +149,6 @@ class CapioMemoryFile : public CapioFile { * @param length */ void readFromQueue(SPSCQueue &queue, std::size_t offset, std::size_t length) override { - const auto &[map_offset, write_offset, first_write_size] = compute_offsets(offset, length); auto remaining_bytes = length; diff --git a/src/server/storage-service/CapioFile/CapioRemoteFile.hpp b/src/server/storage-service/CapioFile/CapioRemoteFile.hpp index 2e7966d64..5181e2bae 100644 --- a/src/server/storage-service/CapioFile/CapioRemoteFile.hpp +++ b/src/server/storage-service/CapioFile/CapioRemoteFile.hpp @@ -6,7 +6,6 @@ #include "CapioFile.hpp" class CapioRemoteFile : public CapioFile { - public: explicit CapioRemoteFile(const std::string &filePath) : CapioFile(filePath) {} @@ -56,4 +55,4 @@ class CapioRemoteFile : public CapioFile { } }; -#endif // CAPIOMEMORYFILE_HPP +#endif // CAPIOMEMORYFILE_HPP \ No newline at end of file diff --git a/src/server/storage-service/capio_storage_service.hpp b/src/server/storage-service/capio_storage_service.hpp index 2481d921d..0f6c3e211 100644 --- a/src/server/storage-service/capio_storage_service.hpp +++ b/src/server/storage-service/capio_storage_service.hpp @@ -7,7 +7,6 @@ #include "CapioFile/CapioRemoteFile.hpp" class CapioStorageService { - // TODO: put all of this conde on a different thread std::unordered_map *_client_to_server_queue; diff --git a/src/server/utils/configuration.hpp b/src/server/utils/configuration.hpp index be2c51294..f26e15045 100644 --- a/src/server/utils/configuration.hpp +++ b/src/server/utils/configuration.hpp @@ -11,7 +11,7 @@ * to all classes and subclasses. */ class CapioGlobalConfiguration { -public: + public: bool termination_phase, StoreOnlyInMemory; std::string workflow_name; pid_t CAPIO_SERVER_MAIN_PID = -1; @@ -19,10 +19,10 @@ class CapioGlobalConfiguration { CapioGlobalConfiguration() { gethostname(node_name, HOST_NAME_MAX); - termination_phase = false; - StoreOnlyInMemory = false; + termination_phase = false; + StoreOnlyInMemory = false; CAPIO_SERVER_MAIN_PID = gettid(); - workflow_name = CAPIO_DEFAULT_WORKFLOW_NAME; + workflow_name = CAPIO_DEFAULT_WORKFLOW_NAME; } }; @@ -34,8 +34,8 @@ inline void server_println(const std::string &message_type = "", std::cout << std::endl; } else { std::cout << message_type << " " << capio_global_configuration->node_name << "] " - << message_line << std::endl - << std::flush; + << message_line << std::endl + << std::flush; } } diff --git a/src/server/utils/distributed_semaphore.hpp b/src/server/utils/distributed_semaphore.hpp index 0e07fb0fc..e7ce1d569 100644 --- a/src/server/utils/distributed_semaphore.hpp +++ b/src/server/utils/distributed_semaphore.hpp @@ -66,4 +66,4 @@ class DistributedSemaphore { } } }; -#endif // DISTRIBUTEDSEMAPHORE_HPP +#endif // DISTRIBUTEDSEMAPHORE_HPP \ No newline at end of file diff --git a/src/server/utils/parser.hpp b/src/server/utils/parser.hpp index 7732765ae..e6e123abb 100644 --- a/src/server/utils/parser.hpp +++ b/src/server/utils/parser.hpp @@ -101,7 +101,7 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { } #ifdef CAPIO_LOG auto logname = open_server_logfile(); - log = new Logger(__func__, __FILE__, __LINE__, gettid(), "Created new log file"); + log = new Logger(__func__, __FILE__, __LINE__, gettid(), "Created new log file"); server_println(CAPIO_SERVER_CLI_LOG_SERVER, "started logging to logfile " + logname.string()); #endif @@ -114,7 +114,7 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "skipping config file parsing."); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Obtained from environment variable current workflow name: " + - capio_global_configuration->workflow_name); + capio_global_configuration->workflow_name); } else { START_LOG(gettid(), "call()"); @@ -144,13 +144,11 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { port = args::get(backend_port); } - if (backend) { std::string backend_name = args::get(backend); std::transform(backend_name.begin(), backend_name.end(), backend_name.begin(), ::toupper); - capio_communication_service = - new CapioCommunicationService(backend_name, port); + capio_communication_service = new CapioCommunicationService(backend_name, port); } else { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Selected backend is File System"); diff --git a/src/server/utils/types.hpp b/src/server/utils/types.hpp index 81ee86068..05fbb3431 100644 --- a/src/server/utils/types.hpp +++ b/src/server/utils/types.hpp @@ -27,4 +27,4 @@ typedef CircularBuffer CSBufRequest_t; */ typedef void (*CSHandler_t)(const char *const); -#endif // CAPIO_SERVER_UTILS_TYPES_HPP +#endif // CAPIO_SERVER_UTILS_TYPES_HPP \ No newline at end of file From 1a4f29e9c1797127cc6316a54f8b7d75707a15d7 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Fri, 29 Aug 2025 13:34:14 +0200 Subject: [PATCH 3/5] Fixed automated startup of control plane --- .../CapioCommunicationService.hpp | 13 ++++++- .../control_plane/capio_control_plane.hpp | 2 +- .../control_plane/multicast_control_plane.hpp | 2 +- src/server/utils/parser.hpp | 37 +++++++------------ tests/multinode/backend/src/MTCL.hpp | 2 +- 5 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/server/communication-service/CapioCommunicationService.hpp b/src/server/communication-service/CapioCommunicationService.hpp index 0a6568aee..e1e43483b 100644 --- a/src/server/communication-service/CapioCommunicationService.hpp +++ b/src/server/communication-service/CapioCommunicationService.hpp @@ -16,7 +16,8 @@ class CapioCommunicationService { delete capio_backend; }; - CapioCommunicationService(std::string &backend_name, const int port) { + CapioCommunicationService(std::string &backend_name, const int port, + const std::string &control_backend_name) { START_LOG(gettid(), "call(backend_name=%s)", backend_name.c_str()); LOG("My hostname is %s. Starting to listen on connection", @@ -48,6 +49,16 @@ class CapioCommunicationService { server_println(CAPIO_SERVER_CLI_LOG_SERVER, "CapioCommunicationService initialization completed."); + + if (control_backend_name == "fs") { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting FS control plane"); + capio_control_plane = new FSControlPlane(port); + } else if (control_backend_name == "multicast") { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting multicast control plane"); + capio_control_plane = new MulticastControlPlane(port); + }else { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Error: unknown control plane backend: " + control_backend_name); + } } }; diff --git a/src/server/communication-service/control_plane/capio_control_plane.hpp b/src/server/communication-service/control_plane/capio_control_plane.hpp index fbc95b5c0..6375593e5 100644 --- a/src/server/communication-service/control_plane/capio_control_plane.hpp +++ b/src/server/communication-service/control_plane/capio_control_plane.hpp @@ -19,7 +19,7 @@ class CapioControlPlane { const std::string &hostname_target) {} /** - * Notify all nodes of the occurence of an event + * Notify all nodes of the occurrence of an event * @param event * @param path */ diff --git a/src/server/communication-service/control_plane/multicast_control_plane.hpp b/src/server/communication-service/control_plane/multicast_control_plane.hpp index 85444c1a4..526734abe 100644 --- a/src/server/communication-service/control_plane/multicast_control_plane.hpp +++ b/src/server/communication-service/control_plane/multicast_control_plane.hpp @@ -111,7 +111,7 @@ class MulticastControlPlane : public CapioControlPlane { } server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, - "Recived control message: " + std::string(incoming_msg)); + "Received control message: " + std::string(incoming_msg)); } close(discovery_socket); diff --git a/src/server/utils/parser.hpp b/src/server/utils/parser.hpp index e6e123abb..0faa5211f 100644 --- a/src/server/utils/parser.hpp +++ b/src/server/utils/parser.hpp @@ -139,38 +139,27 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { #endif // Port used for communication backend - int port = DEFAULT_CAPIO_BACKEND_PORT; + int port = DEFAULT_CAPIO_BACKEND_PORT; + std::string control_backend_name = "multicast"; if (backend_port) { port = args::get(backend_port); } - if (backend) { - std::string backend_name = args::get(backend); - std::transform(backend_name.begin(), backend_name.end(), backend_name.begin(), ::toupper); - - capio_communication_service = new CapioCommunicationService(backend_name, port); - - } else { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Selected backend is File System"); - capio_backend = new NoBackend(); + if (controlPlaneBackend) { + control_backend_name = std::string(args::get(controlPlaneBackend)); } - std::string control_backend_name = "multicast"; - - if (controlPlaneBackend) { - auto tmp = args::get(controlPlaneBackend); - if (tmp == "fs") { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting FS control plane"); - capio_control_plane = new FSControlPlane(port); - } else { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting multicast control plane"); - capio_control_plane = new MulticastControlPlane(port); - } - } else { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting multicast control plane"); - capio_control_plane = new MulticastControlPlane(port); + std::string backend_name = "none"; + if (backend) { + std::string tmp = args::get(backend); + std::ranges::transform(tmp, tmp.begin(), ::toupper); + backend_name = tmp; } + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Selected backend is: " + backend_name); + capio_communication_service = + new CapioCommunicationService(backend_name, port, control_backend_name); + if (capio_cl_resolve_path) { auto path = args::get(capio_cl_resolve_path); memcpy(resolve_prefix, path.c_str(), PATH_MAX); diff --git a/tests/multinode/backend/src/MTCL.hpp b/tests/multinode/backend/src/MTCL.hpp index f94dce251..f74c3ccc0 100644 --- a/tests/multinode/backend/src/MTCL.hpp +++ b/tests/multinode/backend/src/MTCL.hpp @@ -13,7 +13,7 @@ TEST(CapioCommServiceTest, TestPingPong) { START_LOG(gettid(), "INFO: TestPingPong"); const int port = 1234; std::string proto = "TCP"; - auto communication_service = new CapioCommunicationService(proto, port); + auto communication_service = new CapioCommunicationService(proto, port, "multicast"); capio_off64_t size_revc, offset; std::vector connections; From a662541b24eee04d2241b6704a10d69b0f3feb98 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Fri, 29 Aug 2025 13:40:07 +0200 Subject: [PATCH 4/5] fixes --- .../communication-service/CapioCommunicationService.hpp | 5 +++-- src/server/utils/parser.hpp | 2 ++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/server/communication-service/CapioCommunicationService.hpp b/src/server/communication-service/CapioCommunicationService.hpp index e1e43483b..d6e7429b8 100644 --- a/src/server/communication-service/CapioCommunicationService.hpp +++ b/src/server/communication-service/CapioCommunicationService.hpp @@ -56,8 +56,9 @@ class CapioCommunicationService { } else if (control_backend_name == "multicast") { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting multicast control plane"); capio_control_plane = new MulticastControlPlane(port); - }else { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Error: unknown control plane backend: " + control_backend_name); + } else { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, + "Error: unknown control plane backend: " + control_backend_name); } } }; diff --git a/src/server/utils/parser.hpp b/src/server/utils/parser.hpp index 0faa5211f..8b1126fa4 100644 --- a/src/server/utils/parser.hpp +++ b/src/server/utils/parser.hpp @@ -1,6 +1,8 @@ #ifndef PARSER_HPP #define PARSER_HPP +#include + std::string parseCLI(int argc, char **argv, char *resolve_prefix) { Logger *log; From 55a2803e552106f5927da5f8e10ccbd249dd6663 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Fri, 29 Aug 2025 13:44:10 +0200 Subject: [PATCH 5/5] fixes --- .../communication-service/CapioCommunicationService.hpp | 3 +++ src/server/utils/parser.hpp | 4 +--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/server/communication-service/CapioCommunicationService.hpp b/src/server/communication-service/CapioCommunicationService.hpp index d6e7429b8..8839c38c9 100644 --- a/src/server/communication-service/CapioCommunicationService.hpp +++ b/src/server/communication-service/CapioCommunicationService.hpp @@ -40,6 +40,9 @@ class CapioCommunicationService { } else if (backend_name == "FS") { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Selected backend is File System"); capio_backend = new NoBackend(); + } else if (backend_name == "none") { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, + "Skipping communication backend startup"); } else { START_LOG(gettid(), "call()"); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, diff --git a/src/server/utils/parser.hpp b/src/server/utils/parser.hpp index 8b1126fa4..9fba79cfc 100644 --- a/src/server/utils/parser.hpp +++ b/src/server/utils/parser.hpp @@ -1,8 +1,6 @@ #ifndef PARSER_HPP #define PARSER_HPP -#include - std::string parseCLI(int argc, char **argv, char *resolve_prefix) { Logger *log; @@ -154,7 +152,7 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { std::string backend_name = "none"; if (backend) { std::string tmp = args::get(backend); - std::ranges::transform(tmp, tmp.begin(), ::toupper); + std::transform(tmp.begin(), tmp.end(), tmp.begin(), ::toupper); backend_name = tmp; }