diff --git a/CMakeLists.txt b/CMakeLists.txt index ca95ebf7f..c805080e5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,9 +21,14 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS TRUE) set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Wall -pedantic -O0") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3") +# Silence warning from G++ +if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Wno-terminate") +endif () + # Silence warning from clang if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") - set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Wall -pedantic -O0 -Wno-gnu-zero-variadic-macro-arguments") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Wall -pedantic -O0 -Wno-gnu-zero-variadic-macro-arguments -Wno-exceptions") endif (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") ##################################### @@ -49,6 +54,14 @@ find_package(Threads REQUIRED) ##################################### add_compile_definitions(CAPIO_VERSION="${CMAKE_PROJECT_VERSION}") +# We check here and define only here the CAPIO_BUILD_TESTS macro +# as it is required by both src/posix and src/server targets +# Later on we check again to add the tests subproject, which cannot +# be added here for issues with include directories +IF (CAPIO_BUILD_TESTS) + add_compile_definitions(CAPIO_BUILD_TESTS) +ENDIF (CAPIO_BUILD_TESTS) + IF (CAPIO_LOG) IF (CMAKE_BUILD_TYPE STREQUAL "Debug") message(STATUS "Enabling CAPIO logger") @@ -97,5 +110,6 @@ install( IF (CAPIO_BUILD_TESTS) message(STATUS "Building CAPIO test suite") + add_compile_definitions(CAPIO_BUILD_TESTS) add_subdirectory(tests) -ENDIF (CAPIO_BUILD_TESTS) +ENDIF (CAPIO_BUILD_TESTS) \ No newline at end of file diff --git a/src/common/capio/constants.hpp b/src/common/capio/constants.hpp index debed8e5e..0f848ff59 100644 --- a/src/common/capio/constants.hpp +++ b/src/common/capio/constants.hpp @@ -11,9 +11,9 @@ typedef unsigned long long int capio_off64_t; // CAPIO files constants -constexpr size_t CAPIO_DEFAULT_DIR_INITIAL_SIZE = 1024L * 1024 * 1024; -constexpr off64_t CAPIO_DEFAULT_FILE_INITIAL_SIZE = 1024L * 1024 * 1024 * 4; -constexpr std::array CAPIO_DIR_FORBIDDEN_PATHS = { +constexpr size_t CAPIO_DEFAULT_DIR_INITIAL_SIZE = 1024L * 1024 * 1024; +constexpr off64_t CAPIO_DEFAULT_FILE_INITIAL_SIZE = 1024L * 1024 * 1024 * 4; +[[maybe_unused]] constexpr std::array CAPIO_DIR_FORBIDDEN_PATHS = { std::string_view{"/proc/"}, std::string_view{"/sys/"}, std::string_view{"/boot/"}, std::string_view{"/dev/"}, std::string_view{"/var/"}, std::string_view{"/run/"}, std::string_view("/spack/")}; diff --git a/src/common/capio/filesystem.hpp b/src/common/capio/filesystem.hpp index bc7e07c29..24814782b 100644 --- a/src/common/capio/filesystem.hpp +++ b/src/common/capio/filesystem.hpp @@ -105,7 +105,7 @@ inline bool is_capio_path(const std::filesystem::path &path_to_check) { * @param capio_path String to convert * @return std::regex compiled with the corresponding c++ regex */ -[[nodiscard]] static std::regex generateCapioRegex(const std::string &capio_path) { +[[maybe_unused]] [[nodiscard]] static std::regex generateCapioRegex(const std::string &capio_path) { START_LOG(gettid(), "call(capio_path=%s)", capio_path.c_str()); auto computed = replaceSymbol(capio_path, '.', "\\."); computed = replaceSymbol(computed, '/', "\\/"); @@ -115,52 +115,4 @@ inline bool is_capio_path(const std::filesystem::path &path_to_check) { return std::regex(computed); } -/** - * Resolve a possible symbolic link to the absolute path that it points to - * @param input_path - * @return - */ -[[nodiscard]] static std::string resolve_possible_symlink(const std::filesystem::path &input_path) { - START_LOG(capio_syscall(SYS_gettid), "call(path=%s)", input_path.c_str()); - - LOG("Absolute path = %s", input_path.c_str()); - -#ifdef __CAPIO_POSIX - syscall_no_intercept_flag = true; -#endif - - std::filesystem::path resolved, input_abs_path = std::filesystem::absolute(input_path); - - for (const auto &part : input_abs_path) { - resolved /= part; - - if (part == "." || part.empty()) { - continue; - } - if (part == "..") { - resolved = resolved.parent_path(); - continue; - } - if (std::filesystem::is_symlink(resolved)) { - char buf[PATH_MAX]{0}; - if (capio_syscall(SYS_readlink, resolved.c_str(), buf, sizeof(buf) - 1) == -1) { - LOG("File might not exist. path was %s and Error is %s", resolved.c_str(), - strerror(errno)); - continue; - } - - if (std::filesystem::path target(buf); target.is_relative()) { - resolved = resolved.parent_path() / target; - } else { - resolved = target; - } - } - } -#ifdef __CAPIO_POSIX - syscall_no_intercept_flag = false; -#endif - - return resolved; -} - #endif // CAPIO_COMMON_FILESYSTEM_HPP diff --git a/src/common/capio/logger.hpp b/src/common/capio/logger.hpp index 555827eba..c658e6a92 100644 --- a/src/common/capio/logger.hpp +++ b/src/common/capio/logger.hpp @@ -329,7 +329,7 @@ class Logger { gethostname(node_name, HOST_NAME_MAX); \ printf("%s [ %s ] %s\n", CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, node_name, tmp_buf); \ fflush(stdout); \ - exit(EXIT_FAILURE); \ + throw std::runtime_error(std::string(tmp_buf)); \ } #define LOG(message, ...) log.log(message, ##__VA_ARGS__) #define START_LOG(tid, message, ...) \ diff --git a/src/common/capio/semaphore.hpp b/src/common/capio/semaphore.hpp index f1f0f560e..a97ce3b03 100644 --- a/src/common/capio/semaphore.hpp +++ b/src/common/capio/semaphore.hpp @@ -94,7 +94,7 @@ class NamedSemaphore { START_LOG(capio_syscall(SYS_gettid), "call(name=%s)", _name.c_str()); if (sem_wait(_sem) == -1) { - ERR_EXIT(" unable to acquire %s", _name.c_str()); + ERR_EXIT("Unable to acquire %s", _name.c_str()); } } diff --git a/src/posix/handlers/copy_file_range.hpp b/src/posix/handlers/copy_file_range.hpp index a309cd36c..be056cf9c 100644 --- a/src/posix/handlers/copy_file_range.hpp +++ b/src/posix/handlers/copy_file_range.hpp @@ -2,11 +2,13 @@ #define CAPIO_COPY_FILE_RANGE_HPP int copy_file_range_handler(long arg0, long arg1, long arg2, long arg3, long arg4, long arg5, long *result) { - auto tid = static_cast(syscall_no_intercept(SYS_gettid)); - auto fd_in = static_cast(arg0); - auto fd_out = static_cast(arg2); - auto off_in = static_cast(arg1); - auto off_out = static_cast(arg3); + auto tid = static_cast(syscall_no_intercept(SYS_gettid)); + auto fd_in = static_cast(arg0); + auto off_in = static_cast(arg1); + + // auto fd_out = static_cast(arg2); + // auto off_out = static_cast(arg3); + START_LOG(tid, "call()"); // TODO: support in memory read / write diff --git a/src/posix/handlers/posix_readdir.hpp b/src/posix/handlers/posix_readdir.hpp index 9a852596e..b588f93ec 100644 --- a/src/posix/handlers/posix_readdir.hpp +++ b/src/posix/handlers/posix_readdir.hpp @@ -10,8 +10,8 @@ #include // Map &DIR -> -inline std::unordered_map> *opened_directory = - nullptr; +inline std::unordered_map> + *opened_directory = nullptr; inline std::unordered_map *> *directory_items; @@ -50,8 +50,9 @@ inline void init_posix_dirent() { real_readdir = (dirent * (*) (DIR *) ) dlsym(RTLD_NEXT, "readdir"); } - directory_items = new std::unordered_map *>(); - opened_directory = new std::unordered_map>(); + directory_items = new std::unordered_map *>(); + opened_directory = + new std::unordered_map>(); dirent_curr_dir = new dirent64(); dirent_parent_dir = new dirent64(); @@ -65,14 +66,14 @@ inline void init_posix_dirent() { syscall_no_intercept_flag = false; } -inline int load_files_from_directory(const char *path) { +inline unsigned long int load_files_from_directory(const char *path) { START_LOG(capio_syscall(SYS_gettid), "call(path=%s)", path); syscall_no_intercept_flag = true; dirent64 *entry; - DIR *dir = real_opendir(path); - int count = 0; + DIR *dir = real_opendir(path); + unsigned long int count = 0; if (directory_items->find(path) == directory_items->end()) { LOG("Directory vector not present. Adding it at path %s", path); diff --git a/src/posix/utils/cache/read_request_cache_fs.hpp b/src/posix/utils/cache/read_request_cache_fs.hpp index d8a6f07c2..3cb446dcf 100644 --- a/src/posix/utils/cache/read_request_cache_fs.hpp +++ b/src/posix/utils/cache/read_request_cache_fs.hpp @@ -59,7 +59,7 @@ class ReadRequestCacheFS { return; } - if (end_of_read > max_read) { + if (static_cast(end_of_read) > max_read) { LOG("[cache] end_of_read > max_read. Performing server request"); max_read = _read_request(current_path, end_of_read, tid, fd); LOG("[cache] Obtained value from server is %llu", max_read); diff --git a/src/posix/utils/cache/read_request_cache_mem.hpp b/src/posix/utils/cache/read_request_cache_mem.hpp index 16661c02a..bf529a8d5 100644 --- a/src/posix/utils/cache/read_request_cache_mem.hpp +++ b/src/posix/utils/cache/read_request_cache_mem.hpp @@ -85,7 +85,7 @@ class ReadRequestCacheMEM { _real_file_size_commmitted = -1; } - long read(const int fd, void *buffer, off64_t count) { + long read(const int fd, void *buffer, capio_off64_t count) { START_LOG(capio_syscall(SYS_gettid), "call(fd=%d, count=%ld)", fd, count); long actual_read_size = 0; diff --git a/src/posix/utils/cache/write_request_cache_fs.hpp b/src/posix/utils/cache/write_request_cache_fs.hpp index 096c5a941..a84dbcd06 100644 --- a/src/posix/utils/cache/write_request_cache_fs.hpp +++ b/src/posix/utils/cache/write_request_cache_fs.hpp @@ -2,8 +2,8 @@ #define WRITE_REQUEST_CACHE_FS_HPP class WriteRequestCacheFS { - int current_fd = -1; - long long current_size = 0; + int current_fd = -1; + capio_off64_t current_size = 0; const capio_off64_t _max_size; diff --git a/src/posix/utils/filesystem.hpp b/src/posix/utils/filesystem.hpp index c64d8c4e6..a58297566 100644 --- a/src/posix/utils/filesystem.hpp +++ b/src/posix/utils/filesystem.hpp @@ -300,4 +300,53 @@ inline void set_current_dir(const std::filesystem::path &cwd) { current_dir = std::make_unique(cwd); } +/** + * Resolve a possible symbolic link to the absolute path that it points to + * @param input_path + * @return + */ +[[maybe_unused]] [[nodiscard]] static std::string +resolve_possible_symlink(const std::filesystem::path &input_path) { + START_LOG(capio_syscall(SYS_gettid), "call(path=%s)", input_path.c_str()); + + LOG("Absolute path = %s", input_path.c_str()); + +#ifdef __CAPIO_POSIX + syscall_no_intercept_flag = true; +#endif + + std::filesystem::path resolved, input_abs_path = std::filesystem::absolute(input_path); + + for (const auto &part : input_abs_path) { + resolved /= part; + + if (part == "." || part.empty()) { + continue; + } + if (part == "..") { + resolved = resolved.parent_path(); + continue; + } + if (std::filesystem::is_symlink(resolved)) { + char buf[PATH_MAX]{0}; + if (capio_syscall(SYS_readlink, resolved.c_str(), buf, sizeof(buf) - 1) == -1) { + LOG("File might not exist. path was %s and Error is %s", resolved.c_str(), + strerror(errno)); + continue; + } + + if (std::filesystem::path target(buf); target.is_relative()) { + resolved = resolved.parent_path() / target; + } else { + resolved = target; + } + } + } +#ifdef __CAPIO_POSIX + syscall_no_intercept_flag = false; +#endif + + return resolved; +} + #endif // CAPIO_POSIX_UTILS_FILESYSTEM_HPP \ No newline at end of file diff --git a/src/posix/utils/requests.hpp b/src/posix/utils/requests.hpp index 2f7847b10..a7041d539 100644 --- a/src/posix/utils/requests.hpp +++ b/src/posix/utils/requests.hpp @@ -32,8 +32,7 @@ inline void init_client() { } /** - * Perform handshake. server returns the amount of paths that needs to be obtained from the server - * to know which files are going to be treated during write and read operations inside memory + * Perform handshake. * @param tid * @param pid * @param app_name @@ -51,9 +50,29 @@ inline void handshake_request(const long tid, const long pid, const std::string char req[CAPIO_REQ_MAX_SIZE]; sprintf(req, "%04d %ld %ld %s", CAPIO_REQUEST_HANDSHAKE, tid, pid, app_name.c_str()); buf_requests->write(req, CAPIO_REQ_MAX_SIZE); + +#ifndef CAPIO_BUILD_TESTS + LOG("Waiting for response from capio_server"); + /* + * The handshake request must be blocking ONLY when not building tests. This is because when + * starting unit tests, the binary is loaded with libcapio_posix.so underneath thus performing + * a handshake request. If the handshake is blocking, then the capio_server binary cannot be + * started as the whole process is waiting for a handshake. + */ + if (bufs_response->at(pid)->read() == 0) { + ERR_EXIT("Error: handshake request sent while capio_server is shutting down!"); + } +#endif + LOG("Sent handshake request"); } +/** + * File in memory requests: server returns the amount of paths that needs to be obtained from the + * server to know which files are going to be treated during write and read operations inside memory + * @param pid + * @return + */ inline std::vector *file_in_memory_request(const long pid) { START_LOG(capio_syscall(SYS_gettid), "call(pid=%ld)", pid); char req[CAPIO_REQ_MAX_SIZE]; @@ -64,7 +83,7 @@ inline std::vector *file_in_memory_request(const long pid) { capio_off64_t files_to_read_from_queue = bufs_response->at(pid)->read(); LOG("Need to read %llu files from data queues", files_to_read_from_queue); const auto regex_vector = new std::vector; - for (int i = 0; i < files_to_read_from_queue; i++) { + for (capio_off64_t i = 0; i < files_to_read_from_queue; i++) { LOG("Reading file number %d", i); auto file = new char[PATH_MAX]{}; stc_queue->read(file, PATH_MAX); diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 31d0fbf9b..ba3ff0e00 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -15,6 +15,9 @@ FetchContent_Declare( GIT_REPOSITORY https://github.com/Taywee/args.git GIT_TAG 6.4.7 ) +set(ARGS_BUILD_EXAMPLE OFF CACHE INTERNAL "") +set(ARGS_BUILD_UNITTESTS OFF CACHE INTERNAL "") + FetchContent_Declare( simdjson GIT_REPOSITORY https://github.com/simdjson/simdjson.git diff --git a/src/server/capio-cl-engine/capio_cl_engine.hpp b/src/server/capio-cl-engine/capio_cl_engine.hpp index 1247ce42a..22fc1efe4 100644 --- a/src/server/capio-cl-engine/capio_cl_engine.hpp +++ b/src/server/capio-cl-engine/capio_cl_engine.hpp @@ -28,7 +28,7 @@ class CapioCLEngine { bool>> // Store File on FS. true = memory [11] _locations; - static std::string truncateLastN(const std::string &str, const int n) { + static std::string truncateLastN(const std::string &str, const std::size_t n) { return str.length() > n ? "[..] " + str.substr(str.length() - n) : str; } @@ -41,13 +41,13 @@ class CapioCLEngine { "==========================================================|" << std::endl << "|" << std::setw(135) << "|" << std::endl - << "| Parsed configuration file for workflow: \e[1;36m" << workflow_name - << std::setw(94 - workflow_name.length()) << "\e[0m |" << std::endl + << "| Parsed configuration file for workflow: \033[1;36m" << workflow_name + << std::setw(94 - workflow_name.length()) << "\033[0m |" << std::endl << "|" << std::setw(135) << "|" << std::endl - << "| File color legend: \e[48;5;034m \e[0m File stored in memory" + << "| File color legend: \033[48;5;034m \033[0m File stored in memory" << std::setw(83) << "|" << std::endl << "| " - << "\e[48;5;172m \e[0m File stored on file system" << std::setw(78) << "|" + << "\033[48;5;172m \033[0m File stored on file system" << std::setw(78) << "|" << std::endl << "|============================================================================" "==========================================================|" @@ -63,8 +63,9 @@ class CapioCLEngine { << std::endl; for (auto itm : _locations) { - std::string color_preamble = std::get<11>(itm.second) ? "\e[38;5;034m" : "\e[38;5;172m"; - std::string color_post = "\e[0m"; + std::string color_preamble = + std::get<11>(itm.second) ? "\033[38;5;034m" : "\033[38;5;172m"; + std::string color_post = "\033[0m"; std::string name_trunc = truncateLastN(itm.first, 12); auto kind = std::get<6>(itm.second) ? "F" : "D"; @@ -83,7 +84,7 @@ class CapioCLEngine { n_files = "N.A."; } - for (int i = 0; i <= rowCount; i++) { + for (std::size_t i = 0; i <= rowCount; i++) { std::string prod, cons; if (i > 0) { std::cout << "| | | "; diff --git a/src/server/capio-cl-engine/json_parser.hpp b/src/server/capio-cl-engine/json_parser.hpp index 0810546a2..d2897ddbb 100644 --- a/src/server/capio-cl-engine/json_parser.hpp +++ b/src/server/capio-cl-engine/json_parser.hpp @@ -449,7 +449,7 @@ class JsonParser { } else { for (auto file : storage_memory) { std::string_view file_str; - file.get_string().get(file_str); + [[maybe_unused]] const auto error = file.get_string().get(file_str); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " << "Setting file " << file_str << " to be stored in memory" << std::endl; @@ -466,7 +466,7 @@ class JsonParser { } else { for (auto file : storage_fs) { std::string_view file_str; - file.get_string().get(file_str); + [[maybe_unused]] const auto error = file.get_string().get(file_str); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " << "Setting file " << file_str << " to be stored on file system" << std::endl; diff --git a/src/server/capio_server.cpp b/src/server/capio_server.cpp index d5fc87afc..d3036094d 100644 --- a/src/server/capio_server.cpp +++ b/src/server/capio_server.cpp @@ -24,6 +24,7 @@ * Variables required to be globally available * to all classes and subclasses. */ +bool termination_phase = false; std::string workflow_name; pid_t CAPIO_SERVER_MAIN_PID; inline bool StoreOnlyInMemory = false; @@ -75,5 +76,7 @@ int main(int argc, char **argv) { request_handlers_engine->start(); + sig_term_handler(SIGTERM, nullptr, nullptr); + return 0; } \ No newline at end of file diff --git a/src/server/client-manager/client_manager.hpp b/src/server/client-manager/client_manager.hpp index ac42d37bf..779948008 100644 --- a/src/server/client-manager/client_manager.hpp +++ b/src/server/client-manager/client_manager.hpp @@ -122,6 +122,8 @@ class ClientManager { } return CAPIO_DEFAULT_APP_NAME; } + + inline auto get_connected_posix_client() { return bufs_response->size(); } }; inline ClientManager *client_manager; diff --git a/src/server/client-manager/handlers/handshake.hpp b/src/server/client-manager/handlers/handshake.hpp index ee17a9498..a47693585 100644 --- a/src/server/client-manager/handlers/handshake.hpp +++ b/src/server/client-manager/handlers/handshake.hpp @@ -16,8 +16,27 @@ inline void handshake_handler(const char *const str) { char app_name[1024]; sscanf(str, "%d %d %s", &tid, &pid, app_name); START_LOG(gettid(), "call(tid=%ld, pid=%ld, app_name=%s)", tid, pid, app_name); + client_manager->register_client(app_name, tid); storage_service->register_client(app_name, tid); + /* + * The handshake request must be blocking ONLY when not building tests. This is because when + * starting unit tests, the binary is loaded with libcapio_posix.so underneath thus performing + * a handshake request. If the handshake is blocking, then the capio_server binary cannot be + * started as the whole process is waiting for a handshake. + */ +#ifndef CAPIO_BUILD_TESTS + // If not on termination phase, return 1. Otherwise, return 0 + // if - is returned posix application will terminate + if (!termination_phase) { + // Unlock client waiting to start + LOG("Allowing handshake to continue"); + client_manager->reply_to_client(tid, 1); + } else { + LOG("Termination phase is in progress. ignoring further handshakes."); + client_manager->reply_to_client(tid, 0); + } +#endif } #endif // HANDSHAKE_HPP diff --git a/src/server/client-manager/handlers/read.hpp b/src/server/client-manager/handlers/read.hpp index 7d2cc1204..e510d6611 100644 --- a/src/server/client-manager/handlers/read.hpp +++ b/src/server/client-manager/handlers/read.hpp @@ -49,7 +49,7 @@ inline void read_handler(const char *const str) { } inline void read_mem_handler(const char *const str) { - pid_t tid; + long int tid; capio_off64_t read_size, client_cache_line_size, read_begin_offset; int use_cache; char path[PATH_MAX]; diff --git a/src/server/client-manager/handlers/write.hpp b/src/server/client-manager/handlers/write.hpp index d02ea615c..2448c41b0 100644 --- a/src/server/client-manager/handlers/write.hpp +++ b/src/server/client-manager/handlers/write.hpp @@ -29,7 +29,7 @@ inline void write_handler(const char *const str) { } inline void write_mem_handler(const char *const str) { - pid_t tid; + long int tid; char path[PATH_MAX]; off64_t write_size; capio_off64_t offset; diff --git a/src/server/client-manager/request_handler_engine.hpp b/src/server/client-manager/request_handler_engine.hpp index 9f292e069..d1ec5fcb6 100644 --- a/src/server/client-manager/request_handler_engine.hpp +++ b/src/server/client-manager/request_handler_engine.hpp @@ -62,8 +62,9 @@ class RequestHandlerEngine { */ inline auto read_next_request(char *str) const { char req[CAPIO_REQ_MAX_SIZE]; + START_LOG(gettid(), "call()"); buf_requests->read(req); - START_LOG(gettid(), "call(req=%s)", req); + LOG("req=%s", req); int code = -1; auto [ptr, ec] = std::from_chars(req, req + 4, code); if (ec == std::errc()) { @@ -106,13 +107,31 @@ class RequestHandlerEngine { * the posix clients (aggregated) and handle the response * */ - [[noreturn]] void start() const { + void start() const { START_LOG(gettid(), "call()\n\n"); - auto str = std::unique_ptr(new char[CAPIO_REQ_MAX_SIZE]); - while (true) { + const auto str = std::unique_ptr(new char[CAPIO_REQ_MAX_SIZE]); + int code; + + /* When in termination_phase, we empty all requests while clients are connected. as soon + * as queues are empty and the server ha removed all requests, it calls the termination + * handler to stop the server execution + */ + while (!termination_phase || client_manager->get_connected_posix_client() > 0) { LOG(CAPIO_LOG_SERVER_REQUEST_START); - int code = read_next_request(str.get()); + try { + code = read_next_request(str.get()); + } catch (const std::exception &e) { + if (termination_phase) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + << "Termination phase is in progress... Ignoring Exception likely " + "thrown while receiving SIGUSR1" + << std::endl; + continue; + } + throw; + } + try { request_handlers[code](str.get()); } catch (const std::exception &exception) { @@ -134,11 +153,13 @@ class RequestHandlerEngine { << std::endl << std::endl; - ERR_EXIT("%s", exception.what()); + exit(EXIT_FAILURE); } LOG(CAPIO_LOG_SERVER_REQUEST_END); } + + LOG("Terminated handling of posix clients"); } }; diff --git a/src/server/file-manager/file_manager_impl.hpp b/src/server/file-manager/file_manager_impl.hpp index f92e43e5a..b0efa998b 100644 --- a/src/server/file-manager/file_manager_impl.hpp +++ b/src/server/file-manager/file_manager_impl.hpp @@ -173,7 +173,7 @@ inline void CapioFileManager::_unlockThreadAwaitingData( client_manager->reply_to_client(item->first, filesize); // remove thread from map LOG("Removing thread %ld from threads awaiting on data", item->first); - item = item = pids_awaiting.erase(item); + item = pids_awaiting.erase(item); } else if (isCommitted(path)) { @@ -181,7 +181,7 @@ inline void CapioFileManager::_unlockThreadAwaitingData( client_manager->reply_to_client(item->first, ULLONG_MAX); // remove thread from map LOG("Removing thread %ld from threads awaiting on data", item->first); - item = item = pids_awaiting.erase(item); + item = pids_awaiting.erase(item); } else { // DEFAULT: no condition to unlock has occurred, hence wait... @@ -446,7 +446,7 @@ inline void CapioFileManager::checkDirectoriesNFiles() const { long count = 0; if (std::filesystem::exists(path_config)) { auto iterator = std::filesystem::directory_iterator(path_config); - for (const auto &entry : iterator) { + for ([[maybe_unused]] const auto &entry : iterator) { ++count; } } diff --git a/src/server/storage-service/capio_storage_service.hpp b/src/server/storage-service/capio_storage_service.hpp index d5722617d..a0c0f8e91 100644 --- a/src/server/storage-service/capio_storage_service.hpp +++ b/src/server/storage-service/capio_storage_service.hpp @@ -107,7 +107,7 @@ class CapioStorageService { auto file_size = sizeOf(path); - for (const auto [offset, size, thread_id] : threads) { + for (auto &[offset, size, thread_id] : threads) { if (file_size >= offset + size) { reply_to_client(thread_id, path, offset, size); } diff --git a/src/server/utils/signals.hpp b/src/server/utils/signals.hpp index 1541b0042..e06479880 100644 --- a/src/server/utils/signals.hpp +++ b/src/server/utils/signals.hpp @@ -46,24 +46,35 @@ inline void sig_term_handler(int signum, siginfo_t *info, void *ptr) { exit(EXIT_SUCCESS); } +inline void sig_usr1_handler(int signum, siginfo_t *info, void *ptr) { + if (gettid() != CAPIO_SERVER_MAIN_PID) { + return; + } + START_LOG(gettid(), "call(signal=[%d] (%s) from process with pid=%ld)", signum, + strsignal(signum), info != nullptr ? info->si_pid : -1); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + << "Received request for graceful shutdown!" << std::endl; + termination_phase = true; +} + /** * @brief Set the up signal handlers * */ inline void setup_signal_handlers() { START_LOG(gettid(), "call()"); - static struct sigaction sigact; + static struct sigaction sigact, sigact_usr1; memset(&sigact, 0, sizeof(sigact)); - sigact.sa_sigaction = sig_term_handler; - sigact.sa_flags = SA_SIGINFO; - int res = sigaction(SIGTERM, &sigact, nullptr); - res = res | sigaction(SIGILL, &sigact, nullptr); - res = res | sigaction(SIGABRT, &sigact, nullptr); - res = res | sigaction(SIGFPE, &sigact, nullptr); - res = res | sigaction(SIGSEGV, &sigact, nullptr); - res = res | sigaction(SIGQUIT, &sigact, nullptr); - res = res | sigaction(SIGPIPE, &sigact, nullptr); - res = res | sigaction(SIGINT, &sigact, nullptr); + memset(&sigact, 0, sizeof(sigact_usr1)); + sigact.sa_sigaction = sig_term_handler; + sigact.sa_flags = SA_SIGINFO; + sigact_usr1.sa_sigaction = sig_usr1_handler; + sigact_usr1.sa_flags = SA_SIGINFO; + int res = sigaction(SIGTERM, &sigact, nullptr) | sigaction(SIGILL, &sigact, nullptr) | + sigaction(SIGABRT, &sigact, nullptr) | sigaction(SIGFPE, &sigact, nullptr) | + sigaction(SIGSEGV, &sigact, nullptr) | sigaction(SIGQUIT, &sigact, nullptr) | + sigaction(SIGPIPE, &sigact, nullptr) | sigaction(SIGINT, &sigact, nullptr) | + sigaction(SIGUSR1, &sigact_usr1, nullptr); if (res == -1) { ERR_EXIT("sigaction for SIGTERM"); } diff --git a/tests/multinode/integration/src/common.hpp b/tests/multinode/integration/src/common.hpp index f8d2a647b..dd6ea314d 100644 --- a/tests/multinode/integration/src/common.hpp +++ b/tests/multinode/integration/src/common.hpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -49,11 +50,12 @@ static const char *phrases[] = { static const int maxphraselen = 1024; static const int maxfilename = 32; static const int maxnumfiles = 10000; -static char fmtin[] = "%s/infile_%05d.dat"; // 5 is the number of digits of maxnumfiles -static char fmtout[] = "%s/outfile_%05d.dat"; static const int REALLOC_BATCH = 10485760; // 10MB static const int REDUCE_CHUNK = 10240; // 10K static const int IO_BUFFER = 1048576; // 1MB +static char fmtout[] = "%s/outfile_%05d.dat"; +[[maybe_unused]] static char fmtin[] = + "%s/infile_%05d.dat"; // 5 is the number of digits of maxnumfiles // reading file data into memory static inline char *readdata(FILE *fp, char *dataptr, size_t *datalen, size_t *datacapacity) { @@ -107,15 +109,15 @@ static inline double diffmsec(const struct timeval a, const struct timeval b) { return ((double) (sec * 1000) + ((double) usec) / 1000.0); } -static int writedata(char *dataptr, size_t datalen, float percent, char *destdir, ssize_t dstart, - ssize_t dfiles) { +[[maybe_unused]] static int writedata(char *dataptr, size_t datalen, float percent, char *destdir, + ssize_t dstart, ssize_t dfiles) { int error = 0; FILE **fp = (FILE **) calloc(sizeof(FILE *), dfiles); if (!fp) { perror("malloc"); return -1; } - char filepath[strlen(destdir) + maxfilename]; + char filepath[2 * PATH_MAX]{0}; // opening (truncating) all files for (int j = 0, i = 0 + dstart; i < (dfiles + dstart); ++i, ++j) { @@ -152,7 +154,7 @@ static int writedata(char *dataptr, size_t datalen, float percent, char *destdir return error; } -static char *getrandomphrase(char *buffer, size_t len) { +[[maybe_unused]] static char *getrandomphrase(char *buffer, size_t len) { static int phrases_entry = sizeof(phrases) / sizeof(phrases[0]); bzero(buffer, len); @@ -219,7 +221,7 @@ char **build_env(char **envp) { } char **cleaned_env = (char **) malloc((vars.size() + 2) * sizeof(uintptr_t)); - for (int i = 0; i < vars.size(); i++) { + for (long unsigned int i = 0; i < vars.size(); i++) { cleaned_env[i] = strdup(envp[i]); } cleaned_env[vars.size()] = strdup("LD_PRELOAD="); diff --git a/tests/multinode/integration/src/mapreduce.cpp b/tests/multinode/integration/src/mapreduce.cpp index 1fdaccb70..e2afd0356 100644 --- a/tests/multinode/integration/src/mapreduce.cpp +++ b/tests/multinode/integration/src/mapreduce.cpp @@ -22,13 +22,13 @@ int mapReduceFunction(char *sourcedirname, ssize_t sstart, ssize_t sfiles, char EXPECT_GT(percent, 0); EXPECT_LE(percent, 1); - char filepath[strlen(sourcedirname) + maxfilename]; + char filepath[2 * PATH_MAX]{0}; // concatenating all files in memory (dataptr) for (int i = 0 + sstart; i < (sfiles + sstart); ++i) { sprintf(filepath, fmtin, sourcedirname, i); FILE *fp = fopen(filepath, "r"); - EXPECT_TRUE(fp); + EXPECT_NE(fileno(fp), -1); char *ptr = readdata(fp, dataptr, &datalen, &datacapacity); EXPECT_NE(ptr, nullptr); diff --git a/tests/multinode/integration/src/merge.cpp b/tests/multinode/integration/src/merge.cpp index 60a496ca0..f76558418 100644 --- a/tests/multinode/integration/src/merge.cpp +++ b/tests/multinode/integration/src/merge.cpp @@ -1,5 +1,7 @@ #include "common.hpp" +#include + int mergeFunction(ssize_t nfiles, char *sourcedir, char *destdir) { struct timeval before, after; @@ -16,11 +18,11 @@ int mergeFunction(ssize_t nfiles, char *sourcedir, char *destdir) { EXPECT_NE(stat(destdir, &statbuf), -1); EXPECT_TRUE(S_ISDIR(statbuf.st_mode)); - char filepath[strlen(sourcedir) + maxfilename]; + char filepath[2 * PATH_MAX]{0}; for (int i = 0; i < nfiles; ++i) { sprintf(filepath, fmtout, sourcedir, i); FILE *fp = fopen(filepath, "r"); - EXPECT_TRUE(fp); + EXPECT_NE(fileno(fp), -1); char *ptr = readdata(fp, dataptr, &datalen, &datacapacity); EXPECT_NE(ptr, nullptr); @@ -29,7 +31,7 @@ int mergeFunction(ssize_t nfiles, char *sourcedir, char *destdir) { fclose(fp); } - char resultpath[strlen(destdir) + strlen("/result.dat")]; + char resultpath[2 * PATH_MAX]{0}; sprintf(resultpath, "%s/result.dat", destdir); FILE *fp = fopen(resultpath, "w"); EXPECT_TRUE(fp); diff --git a/tests/multinode/integration/src/split.cpp b/tests/multinode/integration/src/split.cpp index a1d82a797..a3d9adb96 100644 --- a/tests/multinode/integration/src/split.cpp +++ b/tests/multinode/integration/src/split.cpp @@ -20,7 +20,7 @@ int splitFunction(ssize_t nlines, ssize_t nfiles, char *dirname) { EXPECT_TRUE(buffer); int error = 0; - char filepath[strlen(dirname) + maxfilename]; + char filepath[2 * PATH_MAX]{0}; // opening (truncating) all files for (int i = 0; i < nfiles; ++i) { sprintf(filepath, fmtin, dirname, i); diff --git a/tests/unit/MemoryFiles/src/main.cpp b/tests/unit/MemoryFiles/src/main.cpp index 4c51b8cff..e37a74275 100644 --- a/tests/unit/MemoryFiles/src/main.cpp +++ b/tests/unit/MemoryFiles/src/main.cpp @@ -36,7 +36,7 @@ char **build_env(char **envp) { } char **cleaned_env = (char **) malloc((vars.size() + 2) * sizeof(uintptr_t)); - for (int i = 0; i < vars.size(); i++) { + for (size_t i = 0; i < vars.size(); i++) { cleaned_env[i] = strdup(envp[i]); } diff --git a/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp b/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp index 4e9b94843..569730884 100644 --- a/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp +++ b/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp @@ -124,7 +124,7 @@ TEST(CapioCacheSPSCQueue, TestWriteCacheSPSCQueueAndCapioFile) { CapioMemoryFile *testFile = new CapioMemoryFile(test_file_name); testFile->readFromQueue(*cts_queue, 0, long_test_length); char *readBuf = new char[readBufSize]{}; - for (auto offset = 0; offset < long_test_length; offset += readBufSize) { + for (unsigned long offset = 0; offset < long_test_length; offset += readBufSize) { testFile->readData(readBuf, offset, readBufSize); EXPECT_EQ(strncmp(readBuf, SOURCE_TEST_TEXT + offset, readBufSize), 0); } @@ -149,8 +149,8 @@ TEST(CapioCacheSPSCQueue, TestWriteCacheSPSCQueueAndCapioFileWithRequest) { std::thread server_thread([readBufSize] { char *req = new char[CAPIO_REQ_MAX_SIZE]; - int code, fd; - pid_t tid; + int code; + long int tid; char path[PATH_MAX]; off64_t write_size; capio_off64_t offset; @@ -202,7 +202,7 @@ TEST(CapioCacheSPSCQueue, TestWriteCacheSPSCQueueAndCapioFileWithRequestAndSeek) std::thread t3([readBufSize, long_test_length] { char *req = new char[CAPIO_REQ_MAX_SIZE]; int code; - pid_t tid; + long int tid; char path[PATH_MAX]; off64_t write_size; capio_off64_t offset, total_read_size = 0; @@ -274,9 +274,8 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueRead) { while (total_data_sent < long_test_length) { char req[CAPIO_REQ_MAX_SIZE]; - char file[1024]; int code; - pid_t tid; + long int tid; capio_off64_t read_size, client_cache_line_size, read_begin_offset; buf_requests->read(req, CAPIO_REQ_MAX_SIZE); @@ -285,7 +284,7 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueRead) { strcpy(req, ptr + 1); EXPECT_EQ(code, CAPIO_REQUEST_READ_MEM); sscanf(req, "%ld %llu %llu %llu", &tid, &read_begin_offset, &read_size, - &client_cache_line_size, file); + &client_cache_line_size); auto size_to_send = read_size < client_cache_line_size ? read_size : client_cache_line_size; @@ -325,9 +324,8 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueReadWithCapioFile) { while (total_data_sent < long_test_length) { char req[CAPIO_REQ_MAX_SIZE]; - char file[1024]; int code; - pid_t tid; + long int tid; capio_off64_t read_size, client_cache_line_size, read_begin_offset; buf_requests->read(req, CAPIO_REQ_MAX_SIZE); @@ -336,7 +334,7 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueReadWithCapioFile) { strcpy(req, ptr + 1); EXPECT_EQ(code, CAPIO_REQUEST_READ_MEM); sscanf(req, "%ld %llu %llu %llu", &tid, &read_begin_offset, &read_size, - &client_cache_line_size, file); + &client_cache_line_size); auto size_to_send = read_size < client_cache_line_size ? read_size : client_cache_line_size; @@ -380,7 +378,7 @@ TEST(CapioCacheSPSCQueue, TestReadCacheWithSpscQueueReadWithCapioFileAndSeek) { char req[CAPIO_REQ_MAX_SIZE]{0}; char file[1024]; int code, use_cache; - pid_t tid; + long int tid; capio_off64_t read_size, client_cache_line_size, read_begin_offset; buf_requests->read(req, CAPIO_REQ_MAX_SIZE); diff --git a/tests/unit/server/src/CapioFileTests.hpp b/tests/unit/server/src/CapioFileTests.hpp index d5a7ae40e..9380a5554 100644 --- a/tests/unit/server/src/CapioFileTests.hpp +++ b/tests/unit/server/src/CapioFileTests.hpp @@ -59,7 +59,7 @@ TEST(CapioMemoryFileTest, TestWriteAndRead) { // 8 MB buffer const auto buffer = new std::vector(); buffer->reserve(FILE_SIZE); - for (ssize_t i = 0; i < buffer->size(); i++) { + for (size_t i = 0; i < buffer->size(); i++) { buffer->push_back(static_cast('a' + i % 26)); } @@ -87,7 +87,7 @@ TEST(CapioMemoryFileTest, TestWriteAndReadDifferentPageStartOffset) { // 8 MB buffer const auto buffer = new std::vector(); buffer->reserve(FILE_SIZE); - for (ssize_t i = 0; i < buffer->size(); i++) { + for (size_t i = 0; i < buffer->size(); i++) { buffer->push_back(static_cast('a' + i % 26)); } @@ -117,7 +117,7 @@ TEST(CapioMemoryFileTest, TestThreadsSpscqueueAndCapioMemFile) { // 8 MB buffer auto buffer = new char[10 * FILE_SIZE]; - for (ssize_t i = 0; i < 10 * FILE_SIZE; i++) { + for (size_t i = 0; i < 10 * FILE_SIZE; i++) { buffer[i] = 'a' + i % 26; } diff --git a/tests/unit/syscall/src/dirent.cpp b/tests/unit/syscall/src/dirent.cpp index e8104e528..58e71be80 100644 --- a/tests/unit/syscall/src/dirent.cpp +++ b/tests/unit/syscall/src/dirent.cpp @@ -71,7 +71,7 @@ TEST(SystemCallTest, TestDirentsOnCapioDir) { break; } - for (size_t bpos = 0, i = 0; bpos < nread && i < 10; i++) { + for (long int bpos = 0, i = 0; bpos < nread && i < 10; i++) { auto d = (struct linux_dirent64 *) (buf + bpos); EXPECT_NE(std::find(expectedNames.begin(), expectedNames.end(), d->d_name), diff --git a/tests/unit/syscall/src/main.cpp b/tests/unit/syscall/src/main.cpp index c3d8572e1..338822761 100644 --- a/tests/unit/syscall/src/main.cpp +++ b/tests/unit/syscall/src/main.cpp @@ -29,7 +29,7 @@ char **build_env(char **envp) { } char **cleaned_env = (char **) malloc((vars.size() + 2) * sizeof(uintptr_t)); - for (int i = 0; i < vars.size(); i++) { + for (size_t i = 0; i < vars.size(); i++) { cleaned_env[i] = strdup(envp[i]); } cleaned_env[vars.size()] = strdup("LD_PRELOAD=");