Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion capio-common/capio/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ class Logger {
sprintf(tmp_buf, message, ##__VA_ARGS__); \
char node_name[HOST_NAME_MAX]{0}; \
gethostname(node_name, HOST_NAME_MAX); \
printf("%s [ %s ] %s\n", CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, node_name, tmp_buf); \
printf("%s %s] %s\n", CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, node_name, tmp_buf); \
fflush(stdout); \
throw std::runtime_error(std::string(tmp_buf)); \
}
Expand Down
9 changes: 8 additions & 1 deletion capio-server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ FetchContent_Declare(
GIT_TAG fbd8aa924b916c6578963af315fdb5f10c3969ad
)

FetchContent_Declare(
httplib
GIT_REPOSITORY https://github.com/yhirose/cpp-httplib.git
GIT_TAG v0.26.0
)


FetchContent_MakeAvailable(args simdjson mtcl)
FetchContent_MakeAvailable(args simdjson mtcl httplib)

#####################################
# Target definition
Expand All @@ -52,6 +58,7 @@ target_include_directories(${TARGET_NAME} PRIVATE
${args_SOURCE_DIR}
${simdjson_SOURCE_DIR}
${mtcl_SOURCE_DIR}/include
${httplib_SOURCE_DIR}
)

#####################################
Expand Down
2 changes: 2 additions & 0 deletions capio-server/capio_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "capio/env.hpp"
#include "capio/logger.hpp"
#include "capio/semaphore.hpp"
#include "include/api-server/api-server.hpp"

#include <include/capio-cl-engine/capio_cl_engine.hpp>
#include <include/capio-cl-engine/json_parser.hpp>
Expand Down Expand Up @@ -43,6 +44,7 @@ int main(int argc, char **argv) {

capio_cl_engine = JsonParser::parse(config_path, std::filesystem::path(resolve_prefix));
shm_canary = new CapioShmCanary(capio_global_configuration->workflow_name);
api_server = new CapioAPIServer(6666);
file_manager = new CapioFileManager();
fs_monitor = new FileSystemMonitor();
client_manager = new ClientManager();
Expand Down
59 changes: 59 additions & 0 deletions capio-server/include/api-server/api-server.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#ifndef CAPIO_API_SERVER_HPP
#define CAPIO_API_SERVER_HPP
#include <httplib.h>
#include <include/utils/configuration.hpp>
#include <string>
#include <thread>
#include <unordered_map>

inline std::unordered_map<std::string, std::string> api_server_routes_descriptions;

#define REGISTER_GET_ROUTE(route_name, route_description, callback) \
api_server_routes_descriptions[route_name] = route_description; \
svr->Get(route_name, callback);

class CapioAPIServer {
typedef std::unordered_map<std::string, std::string> ResponseMap;

std::thread *th;
httplib::Server httplib_server_instance;
static void api_server_main_func(int server_port, httplib::Server *svr);

static std::string
build_json_response(const std::unordered_map<std::string, std::string> &map) {

// TODO: implement a correct json mapping in c++

ResponseMap response_map = map;
response_map["identity"] = std::string("{ \"hostname\":\"") +
capio_global_configuration->node_name + "\",\"wf_name\":\"" +
capio_global_configuration->workflow_name + "\"},";

std::string json_response = "{";

for (auto &[key, value] : response_map) {
if (key == "identity") {
json_response += "\"" + key + "\":" + value;
} else {
json_response += "\"" + key + "\":\"";
json_response += value + "\",";
}
}

// Remove last comma to ensure json validity
if (json_response.back() == ',') {
json_response.pop_back();
}

json_response += "}\n";
return json_response;
}

public:
explicit CapioAPIServer(int server_port);
~CapioAPIServer();
};

inline CapioAPIServer *api_server;

#endif // CAPIO_API_SERVER_HPP
42 changes: 21 additions & 21 deletions capio-server/include/utils/signals.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@
extern "C" void __gcov_dump(void);
#endif

inline void sig_usr1_handler(int signum, siginfo_t *info, void *ptr) {
// Empty function used to Wake up sleeping threads when the API server has received a
// Termination request. This way the termination phase condition is re-evaluated and the server
// can shut down properly
}

/**
* @brief Generic handler for incoming signals
*
Expand Down Expand Up @@ -38,41 +44,35 @@ inline void sig_term_handler(int signum, siginfo_t *info, void *ptr) {
delete fs_monitor;
delete capio_communication_service;
delete shm_canary;
delete api_server;

server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Bye!");
exit(EXIT_SUCCESS);
}

inline void sig_usr1_handler(int signum, siginfo_t *info, void *ptr) {
if (gettid() != capio_global_configuration->CAPIO_SERVER_MAIN_PID) {
return;
}
START_LOG(gettid(), "call(signal=[%d] (%s) from process with pid=%ld)", signum,
strsignal(signum), info != nullptr ? info->si_pid : -1);

server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Received request for graceful shutdown!");
capio_global_configuration->termination_phase = true;
}

/**
* @brief Set the up signal handlers
* @brief Set the up signal handlers. Note: sigusr1 is only used to wake up from sleep threads
* waiting on queues
*
*/
inline void setup_signal_handlers() {
START_LOG(gettid(), "call()");
static struct sigaction sigact, sigact_usr1;

memset(&sigact, 0, sizeof(sigact));
memset(&sigact, 0, sizeof(sigact_usr1));
sigact.sa_sigaction = sig_term_handler;
sigact.sa_flags = SA_SIGINFO;
memset(&sigact_usr1, 0, sizeof(sigact));

sigact.sa_sigaction = sig_term_handler;
sigact.sa_flags = SA_SIGINFO;

sigact_usr1.sa_sigaction = sig_usr1_handler;
sigact_usr1.sa_flags = SA_SIGINFO;
int res = sigaction(SIGTERM, &sigact, nullptr) | sigaction(SIGILL, &sigact, nullptr) |
sigaction(SIGABRT, &sigact, nullptr) | sigaction(SIGFPE, &sigact, nullptr) |
sigaction(SIGSEGV, &sigact, nullptr) | sigaction(SIGQUIT, &sigact, nullptr) |
sigaction(SIGPIPE, &sigact, nullptr) | sigaction(SIGINT, &sigact, nullptr) |
sigaction(SIGUSR1, &sigact_usr1, nullptr);
if (res == -1) {

if ((sigaction(SIGTERM, &sigact, nullptr) | sigaction(SIGILL, &sigact, nullptr) |
sigaction(SIGABRT, &sigact, nullptr) | sigaction(SIGFPE, &sigact, nullptr) |
sigaction(SIGSEGV, &sigact, nullptr) | sigaction(SIGQUIT, &sigact, nullptr) |
sigaction(SIGPIPE, &sigact, nullptr) | sigaction(SIGINT, &sigact, nullptr) |
sigaction(SIGUSR1, &sigact_usr1, nullptr)) == -1) {
ERR_EXIT("sigaction for SIGTERM");
}
}
Expand Down
76 changes: 76 additions & 0 deletions capio-server/src/api-server/api-server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include <capio/constants.hpp>
#include <include/api-server/api-server.hpp>
#include <include/client-manager/client_manager.hpp>

CapioAPIServer::CapioAPIServer(int server_port) {
th = new std::thread(api_server_main_func, server_port, &httplib_server_instance);

// Register callback for unknown routes
httplib_server_instance.set_error_handler(
[](const httplib::Request &req, httplib::Response &res) {
ResponseMap map;
map["status"] = std::to_string(res.status);
map["message"] = "Error: Unknown request: " + req.path;
res.set_content(build_json_response(map).c_str(), "application/json");
});
}

CapioAPIServer::~CapioAPIServer() {
httplib_server_instance.stop();
th->join();
delete th;
server_println(CAPIO_SERVER_CLI_LOG_SERVER, "API server correctly terminated");
}

void CapioAPIServer::api_server_main_func(const int server_port, httplib::Server *svr) {

server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO,
"Started API server on port: " + std::to_string(server_port));

REGISTER_GET_ROUTE("/", "Get server instance information",
[](const httplib::Request &req, httplib::Response &res) {
ResponseMap map;
map["endpoints"] = "/routes";
res.set_content(build_json_response(map).c_str(), "application/json");
});

REGISTER_GET_ROUTE("/clients", "Get number connected POSIX clients",
[](const httplib::Request &req, httplib::Response &res) {
ResponseMap map;
map["connected_clients"] =
std::to_string(client_manager->get_connected_posix_client());

res.set_content(build_json_response(map).c_str(), "application/json");
});

REGISTER_GET_ROUTE("/terminate", "Terminate gracefully server instance",
[](const httplib::Request &req, httplib::Response &res) {
server_println(CAPIO_SERVER_CLI_LOG_SERVER_WARNING,
"Received shutdown request from API Server");
capio_global_configuration->termination_phase = true;
ResponseMap map;
map["status"] = "shutting-down";
res.set_content(build_json_response(map).c_str(), "application/json");
kill(capio_global_configuration->CAPIO_SERVER_MAIN_PID,
SIGUSR1); // Wake parent child and children
});

REGISTER_GET_ROUTE("/routes", "Get all available API-SERVER routes",
[](const httplib::Request &req, httplib::Response &res) {
res.set_content(
build_json_response(api_server_routes_descriptions).c_str(),
"application/json");
});

REGISTER_GET_ROUTE("/status", "Get current server status",
[](const httplib::Request &req, httplib::Response &res) {
ResponseMap map;
map["status"] = capio_global_configuration->termination_phase
? "shutting-down"
: "running";
res.set_content(build_json_response(map).c_str(), "application/json");
});

svr->listen("127.0.0.1", server_port);
server_println(CAPIO_SERVER_CLI_LOG_SERVER_ERROR, "API server terminated unexpectedly");
}
2 changes: 1 addition & 1 deletion capio-server/src/client-manager/request_handler_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void RequestHandlerEngine::start() const {
if (capio_global_configuration->termination_phase) {
server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING,
"Termination phase is in progress... "
"Ignoring Exception likely thrown while receiving SIGUSR1");
"Ignoring Exception likely thrown while waking up threads");
continue;
}
throw;
Expand Down
1 change: 1 addition & 0 deletions capio-tests/multinode/backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ add_executable(${TARGET_NAME} ${TARGET_SOURCES})
target_include_directories(${TARGET_NAME} PRIVATE
${TARGET_INCLUDE_FOLDER}
${mtcl_SOURCE_DIR}/include
${httplib_SOURCE_DIR}
"${CMAKE_SOURCE_DIR}/capio-server/"
)

Expand Down
8 changes: 4 additions & 4 deletions capio-tests/multinode/backend/src/MTCL.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
#include <include/communication-service/data-plane/backend_interface.hpp>
#include <thread>

const char *filename = "data.bin";
const size_t chunkSize = 1024;
const size_t totalSize = 2048;
const char *filename = "data.bin";
const int chunkSize = 1024;
const int totalSize = 2048;

inline int writer() {

Expand All @@ -27,7 +27,7 @@ inline int writer() {
}

fclose(fp);
printf("Wrote %zu bytes to %s\n", totalSize, filename);
printf("Wrote %d bytes to %s\n", totalSize, filename);
return 0;
}

Expand Down
Loading