diff --git a/capio-common/capio/logger.hpp b/capio-common/capio/logger.hpp index 3de480fb7..d71705a01 100644 --- a/capio-common/capio/logger.hpp +++ b/capio-common/capio/logger.hpp @@ -327,7 +327,7 @@ class Logger { sprintf(tmp_buf, message, ##__VA_ARGS__); \ char node_name[HOST_NAME_MAX]{0}; \ gethostname(node_name, HOST_NAME_MAX); \ - printf("%s [ %s ] %s\n", CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, node_name, tmp_buf); \ + printf("%s %s] %s\n", CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, node_name, tmp_buf); \ fflush(stdout); \ throw std::runtime_error(std::string(tmp_buf)); \ } diff --git a/capio-server/CMakeLists.txt b/capio-server/CMakeLists.txt index ec4601cc1..7978f5370 100644 --- a/capio-server/CMakeLists.txt +++ b/capio-server/CMakeLists.txt @@ -30,8 +30,14 @@ FetchContent_Declare( GIT_TAG fbd8aa924b916c6578963af315fdb5f10c3969ad ) +FetchContent_Declare( + httplib + GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git + GIT_TAG v0.26.0 +) + -FetchContent_MakeAvailable(args simdjson mtcl) +FetchContent_MakeAvailable(args simdjson mtcl httplib) ##################################### # Target definition @@ -52,6 +58,7 @@ target_include_directories(${TARGET_NAME} PRIVATE ${args_SOURCE_DIR} ${simdjson_SOURCE_DIR} ${mtcl_SOURCE_DIR}/include + ${httplib_SOURCE_DIR} ) ##################################### diff --git a/capio-server/capio_server.cpp b/capio-server/capio_server.cpp index 7eefa17f2..96b264394 100644 --- a/capio-server/capio_server.cpp +++ b/capio-server/capio_server.cpp @@ -16,6 +16,7 @@ #include "capio/env.hpp" #include "capio/logger.hpp" #include "capio/semaphore.hpp" +#include "include/api-server/api-server.hpp" #include #include @@ -43,6 +44,7 @@ int main(int argc, char **argv) { capio_cl_engine = JsonParser::parse(config_path, std::filesystem::path(resolve_prefix)); shm_canary = new CapioShmCanary(capio_global_configuration->workflow_name); + api_server = new CapioAPIServer(6666); file_manager = new CapioFileManager(); fs_monitor = new FileSystemMonitor(); client_manager = new ClientManager(); diff --git a/capio-server/include/api-server/api-server.hpp b/capio-server/include/api-server/api-server.hpp new file mode 100644 index 000000000..16a554f75 --- /dev/null +++ b/capio-server/include/api-server/api-server.hpp @@ -0,0 +1,59 @@ +#ifndef CAPIO_API_SERVER_HPP +#define CAPIO_API_SERVER_HPP +#include +#include +#include +#include +#include + +inline std::unordered_map api_server_routes_descriptions; + +#define REGISTER_GET_ROUTE(route_name, route_description, callback) \ + api_server_routes_descriptions[route_name] = route_description; \ + svr->Get(route_name, callback); + +class CapioAPIServer { + typedef std::unordered_map ResponseMap; + + std::thread *th; + httplib::Server httplib_server_instance; + static void api_server_main_func(int server_port, httplib::Server *svr); + + static std::string + build_json_response(const std::unordered_map &map) { + + // TODO: implement a correct json mapping in c++ + + ResponseMap response_map = map; + response_map["identity"] = std::string("{ \"hostname\":\"") + + capio_global_configuration->node_name + "\",\"wf_name\":\"" + + capio_global_configuration->workflow_name + "\"},"; + + std::string json_response = "{"; + + for (auto &[key, value] : response_map) { + if (key == "identity") { + json_response += "\"" + key + "\":" + value; + } else { + json_response += "\"" + key + "\":\""; + json_response += value + "\","; + } + } + + // Remove last comma to ensure json validity + if (json_response.back() == ',') { + json_response.pop_back(); + } + + json_response += "}\n"; + return json_response; + } + + public: + explicit CapioAPIServer(int server_port); + ~CapioAPIServer(); +}; + +inline CapioAPIServer *api_server; + +#endif // CAPIO_API_SERVER_HPP diff --git a/capio-server/include/utils/signals.hpp b/capio-server/include/utils/signals.hpp index 892e0cfdb..8a0cc7e33 100644 --- a/capio-server/include/utils/signals.hpp +++ b/capio-server/include/utils/signals.hpp @@ -10,6 +10,12 @@ extern "C" void __gcov_dump(void); #endif +inline void sig_usr1_handler(int signum, siginfo_t *info, void *ptr) { + // Empty function used to Wake up sleeping threads when the API server has received a + // Termination request. This way the termination phase condition is re-evaluated and the server + // can shut down properly +} + /** * @brief Generic handler for incoming signals * @@ -38,41 +44,35 @@ inline void sig_term_handler(int signum, siginfo_t *info, void *ptr) { delete fs_monitor; delete capio_communication_service; delete shm_canary; + delete api_server; server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Bye!"); exit(EXIT_SUCCESS); } -inline void sig_usr1_handler(int signum, siginfo_t *info, void *ptr) { - if (gettid() != capio_global_configuration->CAPIO_SERVER_MAIN_PID) { - return; - } - START_LOG(gettid(), "call(signal=[%d] (%s) from process with pid=%ld)", signum, - strsignal(signum), info != nullptr ? info->si_pid : -1); - - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Received request for graceful shutdown!"); - capio_global_configuration->termination_phase = true; -} - /** - * @brief Set the up signal handlers + * @brief Set the up signal handlers. Note: sigusr1 is only used to wake up from sleep threads + * waiting on queues * */ inline void setup_signal_handlers() { START_LOG(gettid(), "call()"); static struct sigaction sigact, sigact_usr1; + memset(&sigact, 0, sizeof(sigact)); - memset(&sigact, 0, sizeof(sigact_usr1)); - sigact.sa_sigaction = sig_term_handler; - sigact.sa_flags = SA_SIGINFO; + memset(&sigact_usr1, 0, sizeof(sigact)); + + sigact.sa_sigaction = sig_term_handler; + sigact.sa_flags = SA_SIGINFO; + sigact_usr1.sa_sigaction = sig_usr1_handler; sigact_usr1.sa_flags = SA_SIGINFO; - int res = sigaction(SIGTERM, &sigact, nullptr) | sigaction(SIGILL, &sigact, nullptr) | - sigaction(SIGABRT, &sigact, nullptr) | sigaction(SIGFPE, &sigact, nullptr) | - sigaction(SIGSEGV, &sigact, nullptr) | sigaction(SIGQUIT, &sigact, nullptr) | - sigaction(SIGPIPE, &sigact, nullptr) | sigaction(SIGINT, &sigact, nullptr) | - sigaction(SIGUSR1, &sigact_usr1, nullptr); - if (res == -1) { + + if ((sigaction(SIGTERM, &sigact, nullptr) | sigaction(SIGILL, &sigact, nullptr) | + sigaction(SIGABRT, &sigact, nullptr) | sigaction(SIGFPE, &sigact, nullptr) | + sigaction(SIGSEGV, &sigact, nullptr) | sigaction(SIGQUIT, &sigact, nullptr) | + sigaction(SIGPIPE, &sigact, nullptr) | sigaction(SIGINT, &sigact, nullptr) | + sigaction(SIGUSR1, &sigact_usr1, nullptr)) == -1) { ERR_EXIT("sigaction for SIGTERM"); } } diff --git a/capio-server/src/api-server/api-server.cpp b/capio-server/src/api-server/api-server.cpp new file mode 100644 index 000000000..a7af981a5 --- /dev/null +++ b/capio-server/src/api-server/api-server.cpp @@ -0,0 +1,76 @@ +#include +#include +#include + +CapioAPIServer::CapioAPIServer(int server_port) { + th = new std::thread(api_server_main_func, server_port, &httplib_server_instance); + + // Register callback for unknown routes + httplib_server_instance.set_error_handler( + [](const httplib::Request &req, httplib::Response &res) { + ResponseMap map; + map["status"] = std::to_string(res.status); + map["message"] = "Error: Unknown request: " + req.path; + res.set_content(build_json_response(map).c_str(), "application/json"); + }); +} + +CapioAPIServer::~CapioAPIServer() { + httplib_server_instance.stop(); + th->join(); + delete th; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, "API server correctly terminated"); +} + +void CapioAPIServer::api_server_main_func(const int server_port, httplib::Server *svr) { + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, + "Started API server on port: " + std::to_string(server_port)); + + REGISTER_GET_ROUTE("/", "Get server instance information", + [](const httplib::Request &req, httplib::Response &res) { + ResponseMap map; + map["endpoints"] = "/routes"; + res.set_content(build_json_response(map).c_str(), "application/json"); + }); + + REGISTER_GET_ROUTE("/clients", "Get number connected POSIX clients", + [](const httplib::Request &req, httplib::Response &res) { + ResponseMap map; + map["connected_clients"] = + std::to_string(client_manager->get_connected_posix_client()); + + res.set_content(build_json_response(map).c_str(), "application/json"); + }); + + REGISTER_GET_ROUTE("/terminate", "Terminate gracefully server instance", + [](const httplib::Request &req, httplib::Response &res) { + server_println(CAPIO_SERVER_CLI_LOG_SERVER_WARNING, + "Received shutdown request from API Server"); + capio_global_configuration->termination_phase = true; + ResponseMap map; + map["status"] = "shutting-down"; + res.set_content(build_json_response(map).c_str(), "application/json"); + kill(capio_global_configuration->CAPIO_SERVER_MAIN_PID, + SIGUSR1); // Wake parent child and children + }); + + REGISTER_GET_ROUTE("/routes", "Get all available API-SERVER routes", + [](const httplib::Request &req, httplib::Response &res) { + res.set_content( + build_json_response(api_server_routes_descriptions).c_str(), + "application/json"); + }); + + REGISTER_GET_ROUTE("/status", "Get current server status", + [](const httplib::Request &req, httplib::Response &res) { + ResponseMap map; + map["status"] = capio_global_configuration->termination_phase + ? "shutting-down" + : "running"; + res.set_content(build_json_response(map).c_str(), "application/json"); + }); + + svr->listen("127.0.0.1", server_port); + server_println(CAPIO_SERVER_CLI_LOG_SERVER_ERROR, "API server terminated unexpectedly"); +} diff --git a/capio-server/src/client-manager/request_handler_engine.cpp b/capio-server/src/client-manager/request_handler_engine.cpp index 8f0fb70ee..4f42a09b0 100644 --- a/capio-server/src/client-manager/request_handler_engine.cpp +++ b/capio-server/src/client-manager/request_handler_engine.cpp @@ -79,7 +79,7 @@ void RequestHandlerEngine::start() const { if (capio_global_configuration->termination_phase) { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Termination phase is in progress... " - "Ignoring Exception likely thrown while receiving SIGUSR1"); + "Ignoring Exception likely thrown while waking up threads"); continue; } throw; diff --git a/capio-tests/multinode/backend/CMakeLists.txt b/capio-tests/multinode/backend/CMakeLists.txt index bf7ccb1a0..456a7f0b7 100644 --- a/capio-tests/multinode/backend/CMakeLists.txt +++ b/capio-tests/multinode/backend/CMakeLists.txt @@ -20,6 +20,7 @@ add_executable(${TARGET_NAME} ${TARGET_SOURCES}) target_include_directories(${TARGET_NAME} PRIVATE ${TARGET_INCLUDE_FOLDER} ${mtcl_SOURCE_DIR}/include + ${httplib_SOURCE_DIR} "${CMAKE_SOURCE_DIR}/capio-server/" ) diff --git a/capio-tests/multinode/backend/src/MTCL.hpp b/capio-tests/multinode/backend/src/MTCL.hpp index a131f40b6..5bfcf27e7 100644 --- a/capio-tests/multinode/backend/src/MTCL.hpp +++ b/capio-tests/multinode/backend/src/MTCL.hpp @@ -8,9 +8,9 @@ #include #include -const char *filename = "data.bin"; -const size_t chunkSize = 1024; -const size_t totalSize = 2048; +const char *filename = "data.bin"; +const int chunkSize = 1024; +const int totalSize = 2048; inline int writer() { @@ -27,7 +27,7 @@ inline int writer() { } fclose(fp); - printf("Wrote %zu bytes to %s\n", totalSize, filename); + printf("Wrote %d bytes to %s\n", totalSize, filename); return 0; }