From f95c5eefc4583a8a70df6c370f8ef16d02d3fa5e Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Sat, 23 Aug 2025 09:25:06 +0000 Subject: [PATCH 1/2] Moved global variables into a dedicated class --- README.md | 2 +- src/common/capio/env.hpp | 2 +- src/common/capio/shm.hpp | 12 +- .../capio-cl-engine/capio_cl_engine.hpp | 13 +- src/server/capio-cl-engine/json_parser.hpp | 181 +++++++++++------- src/server/capio_server.cpp | 24 +-- src/server/client-manager/client_manager.hpp | 6 +- .../client-manager/request_handler_engine.hpp | 28 ++- .../CapioCommunicationService.hpp | 31 +-- .../control_plane/multicast_control_plane.hpp | 59 +++--- src/server/file-manager/file_manager.hpp | 2 +- src/server/file-manager/fs_monitor.hpp | 6 +- .../storage-service/capio_storage_service.hpp | 15 +- src/server/utils/configuration.hpp | 28 +++ src/server/utils/distributed_semaphore.hpp | 7 +- src/server/utils/parser.hpp | 45 +++-- src/server/utils/signals.hpp | 18 +- tests/multinode/backend/src/MTCL.hpp | 1 - tests/multinode/backend/src/main.cpp | 3 +- .../server/src/CapioCacheSPSCQueueTests.hpp | 2 + 20 files changed, 303 insertions(+), 182 deletions(-) create mode 100644 src/server/utils/configuration.hpp diff --git a/README.md b/README.md index bb652de30..4dede7032 100644 --- a/README.md +++ b/README.md @@ -101,7 +101,7 @@ killall -USR1 capio_server > CAPIO-CL configuration file), CAPIO will not be able to operate correctly! > [!tip] -> To gracefully shhut down the capio server instance, just send the SIGUSR1 signal. +> To gracefully shut down the capio server instance, just send the SIGUSR1 signal. > the capio_server process will then automatically clean up and terminate itself! --- diff --git a/src/common/capio/env.hpp b/src/common/capio/env.hpp index e22a6ff81..ae28b7e73 100644 --- a/src/common/capio/env.hpp +++ b/src/common/capio/env.hpp @@ -100,7 +100,7 @@ inline std::string get_capio_workflow_name() { } #else LOG("fetching name from workflow_name"); - name = workflow_name; + name = capio_global_configuration->workflow_name; if (name.size() == 0) { LOG("Falling back to default workflow name"); name = CAPIO_DEFAULT_WORKFLOW_NAME; diff --git a/src/common/capio/shm.hpp b/src/common/capio/shm.hpp index cbb76e1bf..804686732 100644 --- a/src/common/capio/shm.hpp +++ b/src/common/capio/shm.hpp @@ -32,7 +32,8 @@ #define SHM_DESTROY_CHECK(source_name) \ if (shm_unlink(source_name) == -1) { \ - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " \ + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " \ + << capio_global_configuration->node_name << " ] " \ << "Unable to destroy shared mem: '" << source_name << "' (" << strerror(errno) \ << ")" << std::endl; \ }; @@ -40,7 +41,8 @@ #define SHM_CREATE_CHECK(condition, source) \ if (condition) { \ LOG("error while creating %s", source); \ - std::cout << CAPIO_SERVER_CLI_LOG_SERVER_ERROR << " [ " << node_name << " ] " \ + std::cout << CAPIO_SERVER_CLI_LOG_SERVER_ERROR << " [ " \ + << capio_global_configuration->node_name << " ] " \ << "Unable to create shm: " << source << std::endl; \ ERR_EXIT("Unable to open shm %s: %s", source, strerror(errno)); \ }; @@ -75,7 +77,8 @@ class CapioShmCanary { ~CapioShmCanary() { START_LOG(capio_syscall(SYS_gettid), "call()"); #ifndef __CAPIO_POSIX - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Removing shared memory canary flag" << std::endl; #endif @@ -89,7 +92,8 @@ class CapioShmCanary { #endif #ifndef __CAPIO_POSIX - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " + << capio_global_configuration->node_name << " ] " << "shutdown completed" << std::endl; #endif } diff --git a/src/server/capio-cl-engine/capio_cl_engine.hpp b/src/server/capio-cl-engine/capio_cl_engine.hpp index 22fc1efe4..152b1dbaf 100644 --- a/src/server/capio-cl-engine/capio_cl_engine.hpp +++ b/src/server/capio-cl-engine/capio_cl_engine.hpp @@ -34,15 +34,18 @@ class CapioCLEngine { public: void print() const { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Composition of expected CAPIO FS: " << std::endl << std::endl << "|============================================================================" "==========================================================|" << std::endl << "|" << std::setw(135) << "|" << std::endl - << "| Parsed configuration file for workflow: \033[1;36m" << workflow_name - << std::setw(94 - workflow_name.length()) << "\033[0m |" << std::endl + << "| Parsed configuration file for workflow: \033[1;36m" + << capio_global_configuration->workflow_name + << std::setw(94 - capio_global_configuration->workflow_name.length()) + << "\033[0m |" << std::endl << "|" << std::setw(135) << "|" << std::endl << "| File color legend: \033[48;5;034m \033[0m File stored in memory" << std::setw(83) << "|" << std::endl @@ -440,11 +443,11 @@ class CapioCLEngine { START_LOG(gettid(), "call(path=%s)", path.c_str()); if (const auto location = _locations.find(path); location == _locations.end()) { LOG("No rule for home node. Returning create home node"); - return node_name; + return capio_global_configuration->node_name; } else { LOG("Found location entry"); } - return node_name; + return capio_global_configuration->node_name; } protected: diff --git a/src/server/capio-cl-engine/json_parser.hpp b/src/server/capio-cl-engine/json_parser.hpp index d2897ddbb..c511d3ce1 100644 --- a/src/server/capio-cl-engine/json_parser.hpp +++ b/src/server/capio-cl-engine/json_parser.hpp @@ -63,7 +63,7 @@ class JsonParser { */ locations->newFile("*"); locations->setDirectory("*"); - if (StoreOnlyInMemory) { + if (capio_global_configuration->StoreOnlyInMemory) { locations->setStoreFileInMemory("*"); } @@ -82,7 +82,8 @@ class JsonParser { try { json = simdjson::padded_string::load(source.c_str()); } catch (const simdjson::simdjson_error &e) { - std::cerr << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " + std::cerr << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "Exception thrown while opening config file: " << e.what() << std::endl; LOG("Exception thrown while opening config file: %s", e.what()); ERR_EXIT("Exception thrown while opening config file: %s", e.what()); @@ -94,12 +95,16 @@ class JsonParser { if (error) { ERR_EXIT("Error: workflow name is mandatory"); } - workflow_name = std::string(wf_name); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " - << "Parsing configuration for workflow: " << workflow_name << std::endl; - LOG("Parsing configuration for workflow: %s", std::string(workflow_name).c_str()); + capio_global_configuration->workflow_name = std::string(wf_name); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " + << "Parsing configuration for workflow: " + << capio_global_configuration->workflow_name << std::endl; + LOG("Parsing configuration for workflow: %s", + std::string(capio_global_configuration->workflow_name).c_str()); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << std::endl; auto io_graph = entries["IO_Graph"]; @@ -110,42 +115,47 @@ class JsonParser { ERR_EXIT("Error: app name is mandatory"); } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Parsing config for app " << app_name << std::endl; LOG("Parsing config for app %s", std::string(app_name).c_str()); if (app["input_stream"].get_array().get(input_stream)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "No input_stream section found for app " << app_name << std::endl; ERR_EXIT("No input_stream section found for app %s", std::string(app_name).c_str()); } else { for (auto itm : input_stream) { std::filesystem::path file(itm.get_string().take_value()); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Found file : " << file << std::endl; if (file.is_relative()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name - << " ] " - << "File : " << file + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] File : " << file << " IS RELATIVE! using cwd() of server to compute abs path." << std::endl; file = resolve_prexix / file; } std::string appname(app_name); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "File : " << file << " added to app: " << app_name << std::endl; locations->newFile(file.c_str()); locations->addConsumer(file, appname); } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Completed input_stream parsing for app: " << app_name << std::endl; LOG("Completed input_stream parsing for app: %s", std::string(app_name).c_str()); } if (app["output_stream"].get_array().get(output_stream)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "No output_stream section found for app " << app_name << std::endl; ERR_EXIT("No output_stream section found for app %s", std::string(app_name).c_str()); @@ -154,8 +164,8 @@ class JsonParser { std::filesystem::path file(itm.get_string().take_value()); if (file.is_relative()) { if (file.is_relative()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name - << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "File : " << file << " IS RELATIVE! using cwd() of server to compute abs path." << std::endl; @@ -163,20 +173,23 @@ class JsonParser { } } std::string appname(app_name); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Adding file: " << file << " to app: " << app_name << std::endl; locations->newFile(file); locations->addProducer(file, appname); } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Completed output_stream parsing for app: " << app_name << std::endl; LOG("Completed output_stream parsing for app: %s", std::string(app_name).c_str()); } // PARSING STREAMING FILES if (app["streaming"].get_array().get(streaming)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "No streaming section found for app: " << app_name << std::endl; LOG("No streaming section found for app: %s", std::string(app_name).c_str()); } else { @@ -195,7 +208,8 @@ class JsonParser { error = file["dirname"].get_array().get(name); if (error || name.is_empty()) { std::cout - << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " + << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "error: either name or dirname in streaming section is required" << std::endl; ERR_EXIT( @@ -209,8 +223,8 @@ class JsonParser { LOG("Found name: %s", std::string(elem).c_str()); std::filesystem::path file_fs(elem); if (file_fs.is_relative()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name - << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "File : " << file_fs << " IS RELATIVE! using cwd() of server to compute abs path." << std::endl; @@ -223,7 +237,8 @@ class JsonParser { // PARSING COMMITTED error = file["committed"].get_string().get(committed); if (error) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "commit rule is mandatory in streaming section" << std::endl; ERR_EXIT("error commit rule is mandatory in streaming section"); } else { @@ -232,8 +247,8 @@ class JsonParser { commit_rule = committed.substr(0, pos); std::string count_str(committed.substr(pos + 1, committed.length())); if (!is_int(count_str)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name - << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "commit rule on_close/n_files invalid number" << std::endl; ERR_EXIT("error commit rule on_close invalid number: !is_int()"); @@ -246,8 +261,8 @@ class JsonParser { // TODO: use internally n_files. for now, we use on_close as default commit_rule = CAPIO_FILE_COMMITTED_ON_CLOSE; } else { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name - << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "commit rule " << commit_rule << std::endl; ERR_EXIT("error commit rule: %s", std::string(commit_rule).c_str()); } @@ -263,8 +278,8 @@ class JsonParser { error = file["file_deps"].get_array().get(file_deps_tmp); if (error) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name - << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "commit rule is on_file but no file_deps section found" << std::endl; ERR_EXIT("commit rule is on_file but no file_deps section found"); @@ -276,15 +291,15 @@ class JsonParser { std::filesystem::path computed_path(name_tmp); if (computed_path.is_relative()) { std::cout - << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name - << " ] " + << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "File : " << computed_path << " IS RELATIVE! using cwd() of server to compute abs path." << std::endl; computed_path = resolve_prexix / computed_path; } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name - << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Adding file: " << computed_path << " to file dependencies: " << std::endl; file_deps.emplace_back(computed_path); @@ -314,13 +329,14 @@ class JsonParser { } LOG("batch_size: %d", batch_size); for (auto path : streaming_names) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " - << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " + << " [ " << capio_global_configuration->node_name << " ] " << "Updating metadata for path: " << path << std::endl; if (path.is_relative()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name - << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Path : " << path << " IS RELATIVE! using cwd() of server to compute abs path." << std::endl; @@ -331,8 +347,8 @@ class JsonParser { // TODO: check for globs std::string commit(commit_rule), firerule(mode); if (n_files != -1) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name - << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Setting path: " << path << " n_files to " << n_files << std::endl; locations->setDirectoryFileCount(path, n_files); @@ -346,25 +362,30 @@ class JsonParser { } } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "completed parsing of streaming section for app: " << app_name << std::endl; LOG("completed parsing of streaming section for app: %s", std::string(app_name).c_str()); } // END PARSING STREAMING FILES - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " - << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << std::endl; } // END OF APP MAIN LOOPS LOG("Completed parsing of io_graph app main loops"); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Completed parsing of io_graph" << std::endl; LOG("Completed parsing of io_graph"); if (entries["permanent"].get_array().get(permanent_files)) { // PARSING PERMANENT FILES - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " - << "No permanent section found for workflow: " << workflow_name << std::endl; - LOG("No permanent section found for workflow: %s", std::string(workflow_name).c_str()); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " + << "No permanent section found for workflow: " + << capio_global_configuration->workflow_name << std::endl; + LOG("No permanent section found for workflow: %s", + std::string(capio_global_configuration->workflow_name).c_str()); } else { for (auto file : permanent_files) { std::string_view name; @@ -377,7 +398,8 @@ class JsonParser { std::filesystem::path path(name); if (path.is_relative()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Path : " << path << " IS RELATIVE! using cwd() of server to compute abs path." << std::endl; @@ -388,18 +410,23 @@ class JsonParser { // TODO: improve this locations->setPermanent(name.data(), true); } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Completed parsing of permanent files" << std::endl; LOG("Completed parsing of permanent files"); } // END PARSING PERMANENT FILES - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << std::endl; if (entries["exclude"].get_array().get(exclude_files)) { // PARSING PERMANENT FILES - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " - << "No exclude section found for workflow: " << workflow_name << std::endl; - LOG("No exclude section found for workflow: %s", std::string(workflow_name).c_str()); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " + << "No exclude section found for workflow: " + << capio_global_configuration->workflow_name << std::endl; + LOG("No exclude section found for workflow: %s", + std::string(capio_global_configuration->workflow_name).c_str()); } else { for (auto file : exclude_files) { std::string_view name; @@ -412,7 +439,8 @@ class JsonParser { std::filesystem::path path(name); if (path.is_relative()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Path : " << path << " IS RELATIVE! using cwd() of server to compute abs path." << std::endl; @@ -421,36 +449,44 @@ class JsonParser { // TODO: check for globs locations->setExclude(path, true); } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Completed parsing of exclude files" << std::endl; LOG("Completed parsing of exclude files"); } // END PARSING PERMANENT FILES - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << std::endl; auto home_node_policies = entries["home_node_policy"].error(); if (!home_node_policies) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Warning: capio does not support home node policies yet! skipping section " << std::endl; } if (entries["storage"].get_object().get(storage_section)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " - << "No storage section found for workflow: " << workflow_name << std::endl; - LOG("No storage section found for workflow: %s", std::string(workflow_name).c_str()); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " + << "No storage section found for workflow: " + << capio_global_configuration->workflow_name << std::endl; + LOG("No storage section found for workflow: %s", + std::string(capio_global_configuration->workflow_name).c_str()); } else { if (storage_section["memory"].get_array().get(storage_memory)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "No files listed in memory storage section for workflow: " - << workflow_name << std::endl; + << capio_global_configuration->workflow_name << std::endl; LOG("No files listed in memory storage section for workflow: %s", - std::string(workflow_name).c_str()); + std::string(capio_global_configuration->workflow_name).c_str()); } else { for (auto file : storage_memory) { std::string_view file_str; [[maybe_unused]] const auto error = file.get_string().get(file_str); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Setting file " << file_str << " to be stored in memory" << std::endl; locations->setStoreFileInMemory(file_str); @@ -458,22 +494,25 @@ class JsonParser { } if (storage_section["fs"].get_array().get(storage_fs)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " - << "No files listed in fs storage section for workflow: " << workflow_name - << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " + << "No files listed in fs storage section for workflow: " + << capio_global_configuration->workflow_name << std::endl; LOG("No files listed in fs storage section for workflow: %s", - std::string(workflow_name).c_str()); + std::string(capio_global_configuration->workflow_name).c_str()); } else { for (auto file : storage_fs) { std::string_view file_str; [[maybe_unused]] const auto error = file.get_string().get(file_str); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Setting file " << file_str << " to be stored on file system" << std::endl; locations->setStoreFileInFileSystem(file_str); } } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " + << capio_global_configuration->node_name << " ] " << "Completed parsing of memory storage directives" << std::endl; } diff --git a/src/server/capio_server.cpp b/src/server/capio_server.cpp index d3036094d..43e35346e 100644 --- a/src/server/capio_server.cpp +++ b/src/server/capio_server.cpp @@ -20,15 +20,7 @@ #include #include -/* - * Variables required to be globally available - * to all classes and subclasses. - */ -bool termination_phase = false; -std::string workflow_name; -pid_t CAPIO_SERVER_MAIN_PID; -inline bool StoreOnlyInMemory = false; -char node_name[HOST_NAME_MAX]; +#include "utils/configuration.hpp" #include "utils/types.hpp" @@ -50,10 +42,10 @@ char node_name[HOST_NAME_MAX]; int main(int argc, char **argv) { std::cout << CAPIO_LOG_SERVER_BANNER; - gethostname(node_name, HOST_NAME_MAX); - CAPIO_SERVER_MAIN_PID = gettid(); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "Started server with PID: " << CAPIO_SERVER_MAIN_PID << std::endl; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << capio_global_configuration->node_name + << " ] " + << "Started server with PID: " << capio_global_configuration->CAPIO_SERVER_MAIN_PID + << std::endl; char resolve_prefix[PATH_MAX]{0}; const std::string config_path = parseCLI(argc, argv, resolve_prefix); @@ -62,7 +54,7 @@ int main(int argc, char **argv) { setup_signal_handlers(); capio_cl_engine = JsonParser::parse(config_path, std::filesystem::path(resolve_prefix)); - shm_canary = new CapioShmCanary(workflow_name); + shm_canary = new CapioShmCanary(capio_global_configuration->workflow_name); file_manager = new CapioFileManager(); fs_monitor = new FileSystemMonitor(); request_handlers_engine = new RequestHandlerEngine(); @@ -70,8 +62,8 @@ int main(int argc, char **argv) { capio_cl_engine->print(); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " - << "server initialization completed!" << std::endl + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << capio_global_configuration->node_name + << " ] server initialization completed!" << std::endl << std::flush; request_handlers_engine->start(); diff --git a/src/server/client-manager/client_manager.hpp b/src/server/client-manager/client_manager.hpp index 779948008..4671b5072 100644 --- a/src/server/client-manager/client_manager.hpp +++ b/src/server/client-manager/client_manager.hpp @@ -20,7 +20,8 @@ class ClientManager { bufs_response = new std::unordered_map(); app_names = new std::unordered_map; files_created_by_producer = new std::unordered_map *>; - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name + << " ] " << "ClientManager initialization completed." << std::endl; } @@ -29,7 +30,8 @@ class ClientManager { delete bufs_response; delete app_names; delete files_created_by_producer; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "buf_response cleanup completed" << std::endl; } diff --git a/src/server/client-manager/request_handler_engine.hpp b/src/server/client-manager/request_handler_engine.hpp index d1ec5fcb6..e4744d33d 100644 --- a/src/server/client-manager/request_handler_engine.hpp +++ b/src/server/client-manager/request_handler_engine.hpp @@ -70,9 +70,11 @@ class RequestHandlerEngine { if (ec == std::errc()) { strcpy(str, ptr + 1); } else { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "Received invalid code: " << code << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "Offending request: " << ptr << " / " << req << std::endl; ERR_EXIT("Invalid request %d:%s", code, ptr); } @@ -85,10 +87,12 @@ class RequestHandlerEngine { client_manager = new ClientManager(); request_handlers = build_request_handlers_table(); - buf_requests = new CSBufRequest_t(SHM_COMM_CHAN_NAME, CAPIO_REQ_BUFF_CNT, - CAPIO_REQ_MAX_SIZE, workflow_name); + buf_requests = + new CSBufRequest_t(SHM_COMM_CHAN_NAME, CAPIO_REQ_BUFF_CNT, CAPIO_REQ_MAX_SIZE, + capio_global_configuration->workflow_name); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name + << " ] " << "RequestHandlerEngine initialization completed." << std::endl; } @@ -96,9 +100,11 @@ class RequestHandlerEngine { START_LOG(gettid(), "call()"); delete buf_requests; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "buf_requests cleanup completed" << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "request_handlers_engine cleanup completed" << std::endl; } @@ -117,13 +123,15 @@ class RequestHandlerEngine { * as queues are empty and the server ha removed all requests, it calls the termination * handler to stop the server execution */ - while (!termination_phase || client_manager->get_connected_posix_client() > 0) { + while (!capio_global_configuration->termination_phase || + client_manager->get_connected_posix_client() > 0) { LOG(CAPIO_LOG_SERVER_REQUEST_START); try { code = read_next_request(str.get()); } catch (const std::exception &e) { - if (termination_phase) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + if (capio_global_configuration->termination_phase) { + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Termination phase is in progress... Ignoring Exception likely " "thrown while receiving SIGUSR1" << std::endl; diff --git a/src/server/communication-service/CapioCommunicationService.hpp b/src/server/communication-service/CapioCommunicationService.hpp index ce6109535..a0c9a9d4f 100644 --- a/src/server/communication-service/CapioCommunicationService.hpp +++ b/src/server/communication-service/CapioCommunicationService.hpp @@ -10,8 +10,6 @@ class CapioCommunicationService { - char ownHostname[HOST_NAME_MAX] = {0}; - public: ~CapioCommunicationService() { delete capio_control_plane; @@ -21,47 +19,56 @@ class CapioCommunicationService { CapioCommunicationService(std::string &backend_name, const int port, const std::string &control_plane_backend = "multicast") { START_LOG(gettid(), "call(backend_name=%s)", backend_name.c_str()); - gethostname(ownHostname, HOST_NAME_MAX); - LOG("My hostname is %s. Starting to listen on connection", ownHostname); + + LOG("My hostname is %s. Starting to listen on connection", + capio_global_configuration->node_name); if (backend_name == "MQTT" || backend_name == "MPI") { std::cout - << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Warn: selected backend is not yet officially supported. Setting backend to TCP" << std::endl; backend_name = "TCP"; } if (backend_name == "TCP" || backend_name == "UCX") { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " + << capio_global_configuration->node_name << " ] " << "Selected backend is: " << backend_name << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " + << capio_global_configuration->node_name << " ] " << "Selected backend port is: " << port << std::endl; capio_backend = new MTCL_backend(backend_name, std::to_string(port), CAPIO_BACKEND_DEFAULT_SLEEP_TIME); } else if (backend_name == "FS") { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " + << capio_global_configuration->node_name << " ] " << "Selected backend is File System" << std::endl; capio_backend = new NoBackend(); } else { START_LOG(gettid(), "call()"); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "Provided communication backend " << backend_name << " is invalid" << std::endl; ERR_EXIT("No valid backend was provided"); } if (control_plane_backend == "multicast") { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " + << capio_global_configuration->node_name << " ] " << "Starting multicast control plane" << std::endl; capio_control_plane = new MulticastControlPlane(port); } else { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " + << capio_global_configuration->node_name << " ] " << "Starting file system control plane" << std::endl; capio_control_plane = new FSControlPlane(port); } - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name + << " ] " << "CapioCommunicationService initialization completed." << std::endl; } }; diff --git a/src/server/communication-service/control_plane/multicast_control_plane.hpp b/src/server/communication-service/control_plane/multicast_control_plane.hpp index 385e6e2e4..72d1c7236 100644 --- a/src/server/communication-service/control_plane/multicast_control_plane.hpp +++ b/src/server/communication-service/control_plane/multicast_control_plane.hpp @@ -17,15 +17,15 @@ class MulticastControlPlane : public CapioControlPlane { static void send_multicast_alive_token(const int data_plane_backend_port) { START_LOG(gettid(), "call(data_plane_backend_port=%d)", data_plane_backend_port); - char hostname[HOST_NAME_MAX]; - gethostname(hostname, HOST_NAME_MAX); int transmission_socket = socket(AF_INET, SOCK_DGRAM, 0); if (transmission_socket < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << hostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "WARNING: unable to bind multicast socket: " << strerror(errno) << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << hostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Execution will continue only with FS discovery support" << std::endl; return; } @@ -36,11 +36,12 @@ class MulticastControlPlane : public CapioControlPlane { addr.sin_port = htons(MULTICAST_DISCOVERY_PORT); char message[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; - sprintf(message, "%s:%d", hostname, data_plane_backend_port); + sprintf(message, "%s:%d", capio_global_configuration->node_name, data_plane_backend_port); if (sendto(transmission_socket, message, strlen(message), 0, reinterpret_cast(&addr), sizeof(addr)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << hostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "WARNING: unable to send alive token(" << message << ") to multicast address!: " << strerror(errno) << std::endl; } @@ -55,21 +56,22 @@ class MulticastControlPlane : public CapioControlPlane { START_LOG(gettid(), "call(data_plane_backend_port=%d)", data_plane_backend_port); - char incomingMessage[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE], ownHostname[HOST_NAME_MAX]; - gethostname(ownHostname, HOST_NAME_MAX); + char incomingMessage[MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE]; int loopback = 0; // disable receive loopback messages u_int multiple_socket_on_same_address = 1; // enable multiple sockets on same address - const std::string SELF_TOKEN = - std::string(ownHostname) + ":" + std::to_string(data_plane_backend_port); + const std::string SELF_TOKEN = std::string(capio_global_configuration->node_name) + ":" + + std::to_string(data_plane_backend_port); int discovery_socket = socket(AF_INET, SOCK_DGRAM, 0); if (discovery_socket < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "WARNING: unable to open multicast socket: " << strerror(errno) << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Execution will continue only with FS discovery support" << std::endl; return; } @@ -78,10 +80,12 @@ class MulticastControlPlane : public CapioControlPlane { if (setsockopt(discovery_socket, SOL_SOCKET, SO_REUSEADDR, (char *) &multiple_socket_on_same_address, sizeof(multiple_socket_on_same_address)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "WARNING: unable to assign multiple sockets to same address: " << strerror(errno) << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Execution will continue only with FS discovery support" << std::endl; return; } @@ -89,10 +93,12 @@ class MulticastControlPlane : public CapioControlPlane { if (setsockopt(discovery_socket, IPPROTO_IP, IP_MULTICAST_LOOP, &loopback, sizeof(loopback)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "WARNING: unable to filter out loopback incoming messages: " << strerror(errno) << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Execution will continue only with FS discovery support" << std::endl; return; } @@ -108,10 +114,12 @@ class MulticastControlPlane : public CapioControlPlane { // bind to receive address if (bind(discovery_socket, reinterpret_cast(&addr), sizeof(addr)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "WARNING: unable to bind multicast socket: " << strerror(errno) << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Execution will continue only with FS discovery support" << std::endl; return; } @@ -121,10 +129,12 @@ class MulticastControlPlane : public CapioControlPlane { mreq.imr_multiaddr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(discovery_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "WARNING: unable to join multicast group: " << strerror(errno) << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Execution will continue only with FS discovery support" << std::endl; return; } @@ -142,10 +152,12 @@ class MulticastControlPlane : public CapioControlPlane { LOG("Received multicast data of size %ld and content %s", recv_sice, incomingMessage); if (recv_sice < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "WARNING: received < 0 bytes from multicast: " << strerror(errno) << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Execution will continue only with FS discovery support" << std::endl; return; @@ -155,7 +167,8 @@ class MulticastControlPlane : public CapioControlPlane { std::lock_guard lg(*token_used_to_connect_mutex); if (std::find(token_used_to_connect->begin(), token_used_to_connect->end(), incomingMessage) == token_used_to_connect->end()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << ownHostname << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " + << capio_global_configuration->node_name << " ] " << "Multicast adv: " << incomingMessage << std::endl; LOG("Received message: %s", incomingMessage); token_used_to_connect->push_back(incomingMessage); diff --git a/src/server/file-manager/file_manager.hpp b/src/server/file-manager/file_manager.hpp index 1fab3e965..58177da8a 100644 --- a/src/server/file-manager/file_manager.hpp +++ b/src/server/file-manager/file_manager.hpp @@ -25,7 +25,7 @@ class CapioFileManager { public: CapioFileManager() { START_LOG(gettid(), "call()"); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name << " ] " << "CapioFileManager initialization completed." << std::endl; } ~CapioFileManager() { START_LOG(gettid(), "call()"); } diff --git a/src/server/file-manager/fs_monitor.hpp b/src/server/file-manager/fs_monitor.hpp index 6908cc4a9..e80aa0f2c 100644 --- a/src/server/file-manager/fs_monitor.hpp +++ b/src/server/file-manager/fs_monitor.hpp @@ -75,7 +75,8 @@ class FileSystemMonitor { START_LOG(gettid(), "call()"); *continue_execution = true; th = new std::thread(_main, std::ref(continue_execution)); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name + << " ] " << "CapioFileSystemMonitor initialization completed." << std::endl; } @@ -90,7 +91,8 @@ class FileSystemMonitor { delete th; delete continue_execution; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "fs_monitor cleanup completed" << std::endl; } }; diff --git a/src/server/storage-service/capio_storage_service.hpp b/src/server/storage-service/capio_storage_service.hpp index a0c0f8e91..6a7017a9c 100644 --- a/src/server/storage-service/capio_storage_service.hpp +++ b/src/server/storage-service/capio_storage_service.hpp @@ -38,7 +38,8 @@ class CapioStorageService { _threads_waiting_for_memory_data = new std::unordered_map>>; - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name + << " ] " << "CapioStorageService initialization completed." << std::endl; } @@ -132,11 +133,13 @@ class CapioStorageService { void register_client(const std::string &app_name, const pid_t pid) const { START_LOG(gettid(), "call(app_name=%s)", app_name.c_str()); _client_to_server_queue->emplace( - pid, new SPSCQueue("queue-" + std::to_string(pid) + +".cts", get_cache_lines(), - get_cache_line_size(), workflow_name, false)); + pid, + new SPSCQueue("queue-" + std::to_string(pid) + +".cts", get_cache_lines(), + get_cache_line_size(), capio_global_configuration->workflow_name, false)); _server_to_client_queue->emplace( - pid, new SPSCQueue("queue-" + std::to_string(pid) + +".stc", get_cache_lines(), - get_cache_line_size(), workflow_name, false)); + pid, + new SPSCQueue("queue-" + std::to_string(pid) + +".stc", get_cache_lines(), + get_cache_line_size(), capio_global_configuration->workflow_name, false)); LOG("Created communication queues"); } @@ -195,7 +198,7 @@ class CapioStorageService { [[nodiscard]] size_t sendFilesToStoreInMemory(const long pid) const { START_LOG(gettid(), "call(pid=%d)", pid); - if (StoreOnlyInMemory) { + if (capio_global_configuration->StoreOnlyInMemory) { LOG("All files should be handled in memory. sending * wildcard"); char f[PATH_MAX + 1]{0}; f[0] = '*'; diff --git a/src/server/utils/configuration.hpp b/src/server/utils/configuration.hpp new file mode 100644 index 000000000..2a4ed991c --- /dev/null +++ b/src/server/utils/configuration.hpp @@ -0,0 +1,28 @@ +#ifndef CAPIO_CONFIGURATION_HPP +#define CAPIO_CONFIGURATION_HPP + +#include +#include + +/* + * Variables required to be globally available + * to all classes and subclasses. + */ +class CapioGlobalConfiguration { + public: + bool termination_phase, StoreOnlyInMemory; + std::string workflow_name; + pid_t CAPIO_SERVER_MAIN_PID = -1; + char node_name[HOST_NAME_MAX]{0}; + + CapioGlobalConfiguration() { + termination_phase = false; + StoreOnlyInMemory = false; + gethostname(node_name, HOST_NAME_MAX); + CAPIO_SERVER_MAIN_PID = gettid(); + } +}; + +inline auto capio_global_configuration = new CapioGlobalConfiguration(); + +#endif // CAPIO_CONFIGURATION_HPP diff --git a/src/server/utils/distributed_semaphore.hpp b/src/server/utils/distributed_semaphore.hpp index 526ec8ef5..0e07fb0fc 100644 --- a/src/server/utils/distributed_semaphore.hpp +++ b/src/server/utils/distributed_semaphore.hpp @@ -19,9 +19,10 @@ class DistributedSemaphore { fp = open(name.c_str(), O_EXCL | O_CREAT | O_WRONLY, 0777); } LOG("Locked %s", name.c_str()); - if (write(fp, node_name, strlen(node_name)) == -1) { - ERR_EXIT("Unable to insert lock holder %s on lock file %s", node_name, - name.c_str()); + if (write(fp, capio_global_configuration->node_name, + strlen(capio_global_configuration->node_name)) == -1) { + ERR_EXIT("Unable to insert lock holder %s on lock file %s", + capio_global_configuration->node_name, name.c_str()); } } LOG("Completed spinlock on lock file %s", name.c_str()); diff --git a/src/server/utils/parser.hpp b/src/server/utils/parser.hpp index 52e617a02..f455a0d2f 100644 --- a/src/server/utils/parser.hpp +++ b/src/server/utils/parser.hpp @@ -71,8 +71,9 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { } if (memStorageOnly) { - StoreOnlyInMemory = true; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + capio_global_configuration->StoreOnlyInMemory = true; + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " + << capio_global_configuration->node_name << " ] " << "All files will be stored in memory whenever possible." << std::endl; } @@ -104,26 +105,31 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { #ifdef CAPIO_LOG auto logname = open_server_logfile(); log = new Logger(__func__, __FILE__, __LINE__, gettid(), "Created new log file"); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << node_name << " ] " + std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name + << " ] " << "started logging to logfile " << logname << std::endl; #endif if (config) { std::string token = args::get(config); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " + << capio_global_configuration->node_name << " ] " << "parsing config file: " << token << std::endl; // TODO: pass config file path } else if (noConfigFile) { - workflow_name = std::string_view(get_capio_workflow_name()); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + capio_global_configuration->workflow_name = std::string_view(get_capio_workflow_name()); + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "skipping config file parsing." << std::endl - << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Obtained from environment variable current workflow name: " - << workflow_name.data() << std::endl; + << capio_global_configuration->workflow_name.data() << std::endl; } else { START_LOG(gettid(), "call()"); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "Error: no config file provided. To skip config file use --no-config option!" << std::endl; ERR_EXIT("no config file provided, and --no-config not provided"); @@ -131,14 +137,16 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { #ifdef CAPIO_LOG CAPIO_LOG_LEVEL = get_capio_log_level(); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << capio_global_configuration->node_name + << " ] " << "LOG_LEVEL set to: " << CAPIO_LOG_LEVEL << std::endl; std::cout << CAPIO_LOG_SERVER_CLI_LOGGING_ENABLED_WARNING; log->log("LOG_LEVEL set to: %d", CAPIO_LOG_LEVEL); delete log; #else if (std::getenv("CAPIO_LOG_LEVEL") != nullptr) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << CAPIO_LOG_SERVER_CLI_LOGGING_NOT_AVAILABLE << std::endl; } #endif @@ -156,21 +164,24 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { if (controlPlaneBackend) { auto tmp = args::get(controlPlaneBackend); if (tmp != "multicast" && tmp != "fs") { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Unknown control plane backend " << tmp << std::endl; } else { constrol_backend_name = tmp; } } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " + << capio_global_configuration->node_name << " ] " << "Using control plane backend: " << constrol_backend_name << std::endl; capio_communication_service = new CapioCommunicationService(backend_name, port, constrol_backend_name); } else { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " + << capio_global_configuration->node_name << " ] " << "Selected backend is File System" << std::endl; capio_backend = new NoBackend(); } @@ -178,10 +189,12 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { if (capio_cl_resolve_path) { auto path = args::get(capio_cl_resolve_path); memcpy(resolve_prefix, path.c_str(), PATH_MAX); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " + << capio_global_configuration->node_name << " ] " << "CAPIO-CL relative file prefix: " << resolve_prefix << std::endl; } else { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "No CAPIO-CL resolve file prefix provided" << std::endl; } diff --git a/src/server/utils/signals.hpp b/src/server/utils/signals.hpp index e06479880..6f5696ab5 100644 --- a/src/server/utils/signals.hpp +++ b/src/server/utils/signals.hpp @@ -18,18 +18,20 @@ extern "C" void __gcov_dump(void); * @param ptr */ inline void sig_term_handler(int signum, siginfo_t *info, void *ptr) { - if (gettid() != CAPIO_SERVER_MAIN_PID) { + if (gettid() != capio_global_configuration->CAPIO_SERVER_MAIN_PID) { return; } START_LOG(gettid(), "call(signal=[%d] (%s) from process with pid=%ld)", signum, strsignal(signum), info != nullptr ? info->si_pid : -1); std::cout << std::endl - << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "shutting down server" << std::endl; if (signum == SIGSEGV) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " + << capio_global_configuration->node_name << " ] " << "Segfault detected!" << std::endl; } @@ -41,20 +43,22 @@ inline void sig_term_handler(int signum, siginfo_t *info, void *ptr) { delete fs_monitor; delete capio_communication_service; delete shm_canary; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << capio_global_configuration->node_name + << " ] " << "Bye!" << std::endl; exit(EXIT_SUCCESS); } inline void sig_usr1_handler(int signum, siginfo_t *info, void *ptr) { - if (gettid() != CAPIO_SERVER_MAIN_PID) { + if (gettid() != capio_global_configuration->CAPIO_SERVER_MAIN_PID) { return; } START_LOG(gettid(), "call(signal=[%d] (%s) from process with pid=%ld)", signum, strsignal(signum), info != nullptr ? info->si_pid : -1); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << node_name << " ] " + std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " + << capio_global_configuration->node_name << " ] " << "Received request for graceful shutdown!" << std::endl; - termination_phase = true; + capio_global_configuration->termination_phase = true; } /** diff --git a/tests/multinode/backend/src/MTCL.hpp b/tests/multinode/backend/src/MTCL.hpp index e3b04ba92..f74c3ccc0 100644 --- a/tests/multinode/backend/src/MTCL.hpp +++ b/tests/multinode/backend/src/MTCL.hpp @@ -11,7 +11,6 @@ constexpr capio_off64_t BUFFER_SIZES = 1024; TEST(CapioCommServiceTest, TestPingPong) { START_LOG(gettid(), "INFO: TestPingPong"); - gethostname(node_name.data(), HOST_NAME_MAX); const int port = 1234; std::string proto = "TCP"; auto communication_service = new CapioCommunicationService(proto, port, "multicast"); diff --git a/tests/multinode/backend/src/main.cpp b/tests/multinode/backend/src/main.cpp index 83d55f326..768ea9b5f 100644 --- a/tests/multinode/backend/src/main.cpp +++ b/tests/multinode/backend/src/main.cpp @@ -2,7 +2,8 @@ #include #include -std::string node_name; +#include "../server/utils/configuration.hpp" + #include "MTCL.hpp" #include diff --git a/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp b/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp index 569730884..35bfc0f5a 100644 --- a/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp +++ b/tests/unit/server/src/CapioCacheSPSCQueueTests.hpp @@ -3,6 +3,8 @@ #include +#include "../server/utils/configuration.hpp" + #include "../common/capio/response_queue.hpp" #include "../posix/utils/env.hpp" #include "../posix/utils/filesystem.hpp" From 7015945da8a1a2af58c7006ab25d6ec87bd3027f Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Sat, 23 Aug 2025 12:43:08 +0000 Subject: [PATCH 2/2] Code cleanup moving from using std::cout to server_println --- src/common/capio/constants.hpp | 30 +- src/common/capio/shm.hpp | 22 +- .../capio-cl-engine/capio_cl_engine.hpp | 151 ++++++---- src/server/capio-cl-engine/json_parser.hpp | 275 ++++++++---------- src/server/capio_server.cpp | 12 +- src/server/client-manager/client_manager.hpp | 8 +- .../client-manager/handlers/handshake.hpp | 28 +- .../client-manager/request_handler_engine.hpp | 30 +- .../CapioCommunicationService.hpp | 41 +-- .../control_plane/fs_control_plane.hpp | 8 +- .../control_plane/multicast_control_plane.hpp | 109 +++---- .../data_plane/MTCL_backend.hpp | 22 +- src/server/file-manager/file_manager.hpp | 8 +- src/server/file-manager/fs_monitor.hpp | 9 +- .../storage-service/capio_storage_service.hpp | 6 +- src/server/utils/configuration.hpp | 13 +- src/server/utils/parser.hpp | 88 +++--- src/server/utils/signals.hpp | 19 +- 18 files changed, 401 insertions(+), 478 deletions(-) diff --git a/src/common/capio/constants.hpp b/src/common/capio/constants.hpp index 0f848ff59..5235e16b6 100644 --- a/src/common/capio/constants.hpp +++ b/src/common/capio/constants.hpp @@ -35,9 +35,9 @@ constexpr int CAPIO_CACHE_LINE_SIZE_DEFAULT = 32768; // 32K of default // TODO: use that in communication only uses the file descriptor instead of the path to save on the // PATH_MAX constexpr size_t CAPIO_REQ_MAX_SIZE = (PATH_MAX + 256) * sizeof(char); -constexpr char CAPIO_SERVER_CLI_LOG_SERVER[] = "[ \033[1;32m SERVER \033[0m ] "; -constexpr char CAPIO_SERVER_CLI_LOG_SERVER_WARNING[] = "[ \033[1;33m SERVER \033[0m ] "; -constexpr char CAPIO_SERVER_CLI_LOG_SERVER_ERROR[] = "[ \033[1;31m SERVER \033[0m ] "; +constexpr char CAPIO_SERVER_CLI_LOG_SERVER[] = "[\033[1;32mSERVER\033[0m"; +constexpr char CAPIO_SERVER_CLI_LOG_SERVER_WARNING[] = "[\033[1;33mSERVER\033[0m"; +constexpr char CAPIO_SERVER_CLI_LOG_SERVER_ERROR[] = "[\033[1;31mSERVER\033[0m"; constexpr char LOG_CAPIO_START_REQUEST[] = "\n+++++++++++ SYSCALL %s (%d) +++++++++++"; constexpr char LOG_CAPIO_END_REQUEST[] = "----------- END SYSCALL ----------\n"; constexpr char CAPIO_SERVER_LOG_START_REQUEST_MSG[] = "\n+++++++++++++++++REQUEST+++++++++++++++++"; @@ -103,22 +103,20 @@ constexpr char CAPIO_LOG_SERVER_BANNER[] = "\\______/\n\n" "\033[0m CAPIO - Cross Application Programmable IO \n" " V. " CAPIO_VERSION "\n\n"; -constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_INFO[] = "[ \033[1;32m SERVER \033[0m ] "; -constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_WARNING[] = "[ \033[1;33m SERVER \033[0m ] "; -constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_ERROR[] = "[ \033[1;31m SERVER \033[0m ] "; -constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_JSON[] = "[ \033[1;34m SERVER \033[0m ] "; +constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_INFO[] = "[\033[1;32mSERVER\033[0m"; +constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_WARNING[] = "[\033[1;33mSERVER\033[0m"; +constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_ERROR[] = "[\033[1;31mSERVER\033[0m"; +constexpr char CAPIO_LOG_SERVER_CLI_LEVEL_JSON[] = "[\033[1;34mSERVER\033[0m"; constexpr char CAPIO_LOG_SERVER_CLI_LOGGING_ENABLED_WARNING[] = - "[ \033[1;33m SERVER \033[0m ] " + "[\033[1;33mSERVER\033[0m] " "|==================================================================|\n" - "[ \033[1;33m SERVER \033[0m ] | you are running a build of CAPIO with " + "[\033[1;33mSERVER\033[0m] | you are running a build of CAPIO with " "logging enabled. |\n" - "[ \033[1;33m SERVER \033[0m ] | this will have impact on performance. " - "you " - "should recompile CAPIO |\n" - "[ \033[1;33m SERVER \033[0m ] | with -DCAPIO_LOG=FALSE " - " " - " |\n" - "[ \033[1;33m SERVER \033[0m ] " + "[\033[1;33mSERVER\033[0m] | this will have impact on performance. " + "you should recompile CAPIO |\n" + "[\033[1;33mSERVER\033[0m] | with -DCAPIO_LOG=FALSE " + " |\n" + "[\033[1;33mSERVER\033[0m] " "|==================================================================|\n"; constexpr char CAPIO_LOG_SERVER_CLI_LOGGING_NOT_AVAILABLE[] = "CAPIO_LOG set but log support was not compiled into CAPIO!"; diff --git a/src/common/capio/shm.hpp b/src/common/capio/shm.hpp index 804686732..e2bd992f4 100644 --- a/src/common/capio/shm.hpp +++ b/src/common/capio/shm.hpp @@ -32,18 +32,16 @@ #define SHM_DESTROY_CHECK(source_name) \ if (shm_unlink(source_name) == -1) { \ - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " \ - << capio_global_configuration->node_name << " ] " \ - << "Unable to destroy shared mem: '" << source_name << "' (" << strerror(errno) \ - << ")" << std::endl; \ + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, "Unable to destroy shared mem: '" + \ + std::string(source_name) + "' (" + \ + strerror(errno) + ")"); \ }; #define SHM_CREATE_CHECK(condition, source) \ if (condition) { \ - LOG("error while creating %s", source); \ - std::cout << CAPIO_SERVER_CLI_LOG_SERVER_ERROR << " [ " \ - << capio_global_configuration->node_name << " ] " \ - << "Unable to create shm: " << source << std::endl; \ + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, "Unable to create shared mem: '" + \ + std::string(source) + "' (" + \ + strerror(errno) + ")"); \ ERR_EXIT("Unable to open shm %s: %s", source, strerror(errno)); \ }; @@ -77,9 +75,7 @@ class CapioShmCanary { ~CapioShmCanary() { START_LOG(capio_syscall(SYS_gettid), "call()"); #ifndef __CAPIO_POSIX - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Removing shared memory canary flag" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Removing shared memory canary flag"); #endif #ifdef __CAPIO_POSIX @@ -92,9 +88,7 @@ class CapioShmCanary { #endif #ifndef __CAPIO_POSIX - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " - << capio_global_configuration->node_name << " ] " - << "shutdown completed" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Shutdown completed"); #endif } }; diff --git a/src/server/capio-cl-engine/capio_cl_engine.hpp b/src/server/capio-cl-engine/capio_cl_engine.hpp index 152b1dbaf..fd1c04515 100644 --- a/src/server/capio-cl-engine/capio_cl_engine.hpp +++ b/src/server/capio-cl-engine/capio_cl_engine.hpp @@ -12,7 +12,6 @@ class CapioCLEngine { friend class CapioFileManager; - private: std::unordered_map, // Vector for producers [0] std::vector, // Vector for consumers [1] @@ -34,79 +33,103 @@ class CapioCLEngine { public: void print() const { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Composition of expected CAPIO FS: " << std::endl - << std::endl - << "|============================================================================" - "==========================================================|" - << std::endl - << "|" << std::setw(135) << "|" << std::endl - << "| Parsed configuration file for workflow: \033[1;36m" - << capio_global_configuration->workflow_name - << std::setw(94 - capio_global_configuration->workflow_name.length()) - << "\033[0m |" << std::endl - << "|" << std::setw(135) << "|" << std::endl - << "| File color legend: \033[48;5;034m \033[0m File stored in memory" - << std::setw(83) << "|" << std::endl - << "| " - << "\033[48;5;172m \033[0m File stored on file system" << std::setw(78) << "|" - << std::endl - << "|============================================================================" - "==========================================================|" - << std::endl - << "|======|===================|===================|====================|========" - "============|============|===========|=========|==========|" - << std::endl - << "| Kind | Filename | Producer step | Consumer step | " - "Commit Rule | Fire Rule | Permanent | Exclude | n_files |" - << std::endl - << "|======|===================|===================|====================|========" - "============|============|===========|=========|==========|" - << std::endl; - - for (auto itm : _locations) { + // First message + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, ""); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, "Composition of expected CAPIO FS: "); + + // Table header lines + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "|============================================================================" + "==========================================================|"); + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, "|" + std::string(134, ' ') + "|"); + + { + std::ostringstream oss; + oss << "| Parsed configuration file for workflow: \033[1;36m" + << capio_global_configuration->workflow_name + << std::setw(94 - capio_global_configuration->workflow_name.length()) + << "\033[0m |"; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, oss.str()); + } + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, "|" + std::string(134, ' ') + "|"); + + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "| File color legend: \033[48;5;034m \033[0m File stored in memory" + + std::string(82, ' ') + "|"); + + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "| \033[48;5;172m \033[0m File stored on file system" + + std::string(77, ' ') + "|"); + + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "|============================================================================" + "==========================================================|"); + + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "|======|===================|===================|====================|========" + "============|============|===========|=========|==========|"); + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "| Kind | Filename | Producer step | Consumer step | " + "Commit Rule | Fire Rule | Permanent | Exclude | n_files |"); + + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "|======|===================|===================|====================|========" + "============|============|===========|=========|==========|"); + + // Iterate over _locations + for (auto &itm : _locations) { std::string color_preamble = std::get<11>(itm.second) ? "\033[38;5;034m" : "\033[38;5;172m"; std::string color_post = "\033[0m"; std::string name_trunc = truncateLastN(itm.first, 12); auto kind = std::get<6>(itm.second) ? "F" : "D"; - std::cout << "| " << color_preamble << kind << color_post << " | " << color_preamble + + std::ostringstream base_line; + base_line << "| " << color_preamble << kind << color_post << " | " << color_preamble << name_trunc << color_post << std::setfill(' ') << std::setw(20 - name_trunc.length()) << "| "; auto producers = std::get<0>(itm.second); auto consumers = std::get<1>(itm.second); - auto rowCount = - producers.size() > consumers.size() ? producers.size() : consumers.size(); + auto rowCount = std::max(producers.size(), consumers.size()); - // Add logic to handle the n_files column std::string n_files = std::to_string(std::get<8>(itm.second)); if (std::get<8>(itm.second) < 1) { n_files = "N.A."; } for (std::size_t i = 0; i <= rowCount; i++) { - std::string prod, cons; - if (i > 0) { - std::cout << "| | | "; + std::ostringstream line; + + if (i == 0) { + line << base_line.str(); + } else { + line << "| | | "; } if (i < producers.size()) { auto prod1 = truncateLastN(producers.at(i), 12); - std::cout << prod1 << std::setfill(' ') << std::setw(20 - prod1.length()) - << " | "; + line << prod1 << std::setfill(' ') << std::setw(20 - prod1.length()) << " | "; } else { - std::cout << std::setfill(' ') << std::setw(20) << " | "; + line << std::setfill(' ') << std::setw(20) << " | "; } if (i < consumers.size()) { auto cons1 = truncateLastN(consumers.at(i), 12); - std::cout << " " << cons1 << std::setfill(' ') << std::setw(20 - cons1.length()) - << " | "; + line << " " << cons1 << std::setfill(' ') << std::setw(20 - cons1.length()) + << " | "; } else { - std::cout << std::setfill(' ') << std::setw(21) << " | "; + line << std::setfill(' ') << std::setw(21) << " | "; } if (i == 0) { @@ -114,26 +137,30 @@ class CapioCLEngine { fire_rule = std::get<3>(itm.second); bool exclude = std::get<4>(itm.second), permanent = std::get<5>(itm.second); - std::cout << " " << commit_rule << std::setfill(' ') - << std::setw(20 - commit_rule.length()) << " | " << fire_rule - << std::setfill(' ') << std::setw(13 - fire_rule.length()) << " | " - << " " << (permanent ? "YES" : "NO ") << " | " - << (exclude ? "YES" : "NO ") << " | " << n_files - << std::setw(11 - n_files.length()) << " | " << std::endl; + line << " " << commit_rule << std::setfill(' ') + << std::setw(20 - commit_rule.length()) << " | " << fire_rule + << std::setfill(' ') << std::setw(13 - fire_rule.length()) << " | " + << " " << (permanent ? "YES" : "NO ") << " | " + << (exclude ? "YES" : "NO ") << " | " << n_files + << std::setw(10 - n_files.length()) << " |"; } else { - std::cout << std::setfill(' ') << std::setw(20) << "|" << std::setfill(' ') - << std::setw(13) << "|" << std::setfill(' ') << std::setw(12) << "|" - << std::setfill(' ') << std::setw(10) << "|" << std::setw(11) << "|" - << std::endl; + line << std::setfill(' ') << std::setw(20) << "|" << std::setfill(' ') + << std::setw(13) << "|" << std::setfill(' ') << std::setw(12) << "|" + << std::setfill(' ') << std::setw(10) << "|" << std::setw(10) << "|"; } + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, line.str()); } - std::cout << "*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" - "~~~~~~~~~~~~~~~~~~" - "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*" - << std::endl; + + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~" + "~~~~~~~~~~~~~~~~~~" + "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*"); } - std::cout << std::endl; - }; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, ""); + } /** * Check whether the file is contained inside the location, either by direct name or by glob diff --git a/src/server/capio-cl-engine/json_parser.hpp b/src/server/capio-cl-engine/json_parser.hpp index c511d3ce1..761a10515 100644 --- a/src/server/capio-cl-engine/json_parser.hpp +++ b/src/server/capio-cl-engine/json_parser.hpp @@ -96,15 +96,15 @@ class JsonParser { ERR_EXIT("Error: workflow name is mandatory"); } capio_global_configuration->workflow_name = std::string(wf_name); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Parsing configuration for workflow: " - << capio_global_configuration->workflow_name << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "Parsing configuration for workflow: " + + capio_global_configuration->workflow_name); + LOG("Parsing configuration for workflow: %s", std::string(capio_global_configuration->workflow_name).c_str()); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, ""); auto io_graph = entries["IO_Graph"]; @@ -115,48 +115,49 @@ class JsonParser { ERR_EXIT("Error: app name is mandatory"); } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Parsing config for app " << app_name << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "Parsing config for app " + std::string(app_name)); LOG("Parsing config for app %s", std::string(app_name).c_str()); if (app["input_stream"].get_array().get(input_stream)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "No input_stream section found for app " << app_name << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "No input_stream section found for app " + std::string(app_name)); ERR_EXIT("No input_stream section found for app %s", std::string(app_name).c_str()); } else { for (auto itm : input_stream) { std::filesystem::path file(itm.get_string().take_value()); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Found file : " << file << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "Found file " + std::string(file)); + if (file.is_relative()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] File : " << file - << " IS RELATIVE! using cwd() of server to compute abs path." - << std::endl; + + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Path : " + std::string(file) + + " IS RELATIVE! using cwd() of server to compute abs path."); file = resolve_prexix / file; } std::string appname(app_name); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "File : " << file << " added to app: " << app_name << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "Path : " + std::string(file) + + " added to app: " + std::string(app_name)); locations->newFile(file.c_str()); locations->addConsumer(file, appname); } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Completed input_stream parsing for app: " << app_name << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "Completed input_stream parsing for app: " + std::string(app_name)); + LOG("Completed input_stream parsing for app: %s", std::string(app_name).c_str()); } if (app["output_stream"].get_array().get(output_stream)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "No output_stream section found for app " << app_name << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "No output_stream section found for app " + std::string(app_name)); ERR_EXIT("No output_stream section found for app %s", std::string(app_name).c_str()); } else { @@ -164,33 +165,31 @@ class JsonParser { std::filesystem::path file(itm.get_string().take_value()); if (file.is_relative()) { if (file.is_relative()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "File : " << file - << " IS RELATIVE! using cwd() of server to compute abs path." - << std::endl; + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Path : " + std::string(file) + + " IS RELATIVE! using cwd() of server to compute abs path."); file = resolve_prexix / file; } } std::string appname(app_name); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Adding file: " << file << " to app: " << app_name << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "Adding file: " + std::string(file) + " to app: " + appname); locations->newFile(file); locations->addProducer(file, appname); } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Completed output_stream parsing for app: " << app_name << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "Completed output_stream parsing for app: " + std::string(app_name)); LOG("Completed output_stream parsing for app: %s", std::string(app_name).c_str()); } // PARSING STREAMING FILES if (app["streaming"].get_array().get(streaming)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "No streaming section found for app: " << app_name << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "No Streaming section found for app " + std::string(app_name)); LOG("No streaming section found for app: %s", std::string(app_name).c_str()); } else { LOG("Began parsing streaming section for app %s", std::string(app_name).c_str()); @@ -207,11 +206,9 @@ class JsonParser { if (error || name.is_empty()) { error = file["dirname"].get_array().get(name); if (error || name.is_empty()) { - std::cout - << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "error: either name or dirname in streaming section is required" - << std::endl; + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "error: either name or dirname in streaming section is required"); ERR_EXIT( "error: either name or dirname in streaming section is required"); } @@ -223,11 +220,10 @@ class JsonParser { LOG("Found name: %s", std::string(elem).c_str()); std::filesystem::path file_fs(elem); if (file_fs.is_relative()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "File : " << file_fs - << " IS RELATIVE! using cwd() of server to compute abs path." - << std::endl; + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Path : " + std::string(file_fs) + + " IS RELATIVE! using cwd() of server to compute abs path."); file_fs = resolve_prexix / file_fs; } LOG("Saving file %s to locations", std::string(elem).c_str()); @@ -237,9 +233,8 @@ class JsonParser { // PARSING COMMITTED error = file["committed"].get_string().get(committed); if (error) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "commit rule is mandatory in streaming section" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "commit rule is mandatory in streaming section"); ERR_EXIT("error commit rule is mandatory in streaming section"); } else { auto pos = committed.find(':'); @@ -247,10 +242,8 @@ class JsonParser { commit_rule = committed.substr(0, pos); std::string count_str(committed.substr(pos + 1, committed.length())); if (!is_int(count_str)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "commit rule on_close/n_files invalid number" - << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "commit rule on_close/n_files invalid number"); ERR_EXIT("error commit rule on_close invalid number: !is_int()"); } @@ -261,9 +254,8 @@ class JsonParser { // TODO: use internally n_files. for now, we use on_close as default commit_rule = CAPIO_FILE_COMMITTED_ON_CLOSE; } else { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "commit rule " << commit_rule << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Invalid commit rule: " + std::string(commit_rule)); ERR_EXIT("error commit rule: %s", std::string(commit_rule).c_str()); } @@ -278,10 +270,8 @@ class JsonParser { error = file["file_deps"].get_array().get(file_deps_tmp); if (error) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "commit rule is on_file but no file_deps section found" - << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "commit rule is on_file but no file_deps section found"); ERR_EXIT("commit rule is on_file but no file_deps section found"); } @@ -290,18 +280,16 @@ class JsonParser { name_tmp = itm.get_string().value(); std::filesystem::path computed_path(name_tmp); if (computed_path.is_relative()) { - std::cout - << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "File : " << computed_path - << " IS RELATIVE! using cwd() of server to compute abs path." - << std::endl; + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Path : " + std::string(computed_path) + + " IS RELATIVE! using cwd() of server to compute abs path."); computed_path = resolve_prexix / computed_path; } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Adding file: " << computed_path - << " to file dependencies: " << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "Adding file: " + std::string(computed_path) + + " to file dependencies: "); + file_deps.emplace_back(computed_path); } } @@ -329,17 +317,14 @@ class JsonParser { } LOG("batch_size: %d", batch_size); for (auto path : streaming_names) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << " [ " << capio_global_configuration->node_name << " ] " - << "Updating metadata for path: " << path << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "Updating metadata for path: " + std::string(path)); if (path.is_relative()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Path : " << path - << " IS RELATIVE! using cwd() of server to compute abs path." - << std::endl; + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Path : " + std::string(path) + + " IS RELATIVE! using cwd() of server to compute abs path."); path = resolve_prexix / path; } LOG("path: %s", path.c_str()); @@ -347,10 +332,10 @@ class JsonParser { // TODO: check for globs std::string commit(commit_rule), firerule(mode); if (n_files != -1) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Setting path: " << path << " n_files to " << n_files - << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "Setting path: " + std::string(path) + " n_files to " + + std::to_string(n_files)); locations->setDirectoryFileCount(path, n_files); } @@ -362,30 +347,26 @@ class JsonParser { } } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "completed parsing of streaming section for app: " << app_name - << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "completed parsing of streaming section for app: " + + std::string(app_name)); LOG("completed parsing of streaming section for app: %s", std::string(app_name).c_str()); } // END PARSING STREAMING FILES - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, ""); } // END OF APP MAIN LOOPS LOG("Completed parsing of io_graph app main loops"); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Completed parsing of io_graph" << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, "Completed parsing of io_graph"); LOG("Completed parsing of io_graph"); if (entries["permanent"].get_array().get(permanent_files)) { // PARSING PERMANENT FILES - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "No permanent section found for workflow: " - << capio_global_configuration->workflow_name << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "No permanent section found for workflow: " + + capio_global_configuration->workflow_name); LOG("No permanent section found for workflow: %s", - std::string(capio_global_configuration->workflow_name).c_str()); + capio_global_configuration->workflow_name.c_str()); } else { for (auto file : permanent_files) { std::string_view name; @@ -398,11 +379,9 @@ class JsonParser { std::filesystem::path path(name); if (path.is_relative()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Path : " << path - << " IS RELATIVE! using cwd() of server to compute abs path." - << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Path : " + std::string(path) + + " IS RELATIVE! using cwd() of server to compute abs path."); path = resolve_prexix / path; } @@ -410,21 +389,17 @@ class JsonParser { // TODO: improve this locations->setPermanent(name.data(), true); } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Completed parsing of permanent files" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, "Completed parsing of permanent files"); LOG("Completed parsing of permanent files"); } // END PARSING PERMANENT FILES - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, ""); if (entries["exclude"].get_array().get(exclude_files)) { // PARSING PERMANENT FILES - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "No exclude section found for workflow: " - << capio_global_configuration->workflow_name << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "No exclude section found for workflow: " + + capio_global_configuration->workflow_name); LOG("No exclude section found for workflow: %s", std::string(capio_global_configuration->workflow_name).c_str()); } else { @@ -439,84 +414,74 @@ class JsonParser { std::filesystem::path path(name); if (path.is_relative()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Path : " << path - << " IS RELATIVE! using cwd() of server to compute abs path." - << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Path : " + std::string(path) + + " IS RELATIVE! using cwd() of server to compute abs path."); path = resolve_prexix / path; } // TODO: check for globs locations->setExclude(path, true); } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Completed parsing of exclude files" << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, "Completed parsing of exclude files"); LOG("Completed parsing of exclude files"); } // END PARSING PERMANENT FILES - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, ""); auto home_node_policies = entries["home_node_policy"].error(); if (!home_node_policies) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Warning: capio does not support home node policies yet! skipping section " - << std::endl; + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Warning: capio does not support home node policies yet! skipping section "); } if (entries["storage"].get_object().get(storage_section)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "No storage section found for workflow: " - << capio_global_configuration->workflow_name << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "No storage section found for workflow: " + + capio_global_configuration->workflow_name); LOG("No storage section found for workflow: %s", std::string(capio_global_configuration->workflow_name).c_str()); } else { if (storage_section["memory"].get_array().get(storage_memory)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "No files listed in memory storage section for workflow: " - << capio_global_configuration->workflow_name << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "No files listed in memory storage section for workflow: " + + capio_global_configuration->workflow_name); LOG("No files listed in memory storage section for workflow: %s", std::string(capio_global_configuration->workflow_name).c_str()); } else { for (auto file : storage_memory) { std::string_view file_str; [[maybe_unused]] const auto error = file.get_string().get(file_str); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Setting file " << file_str << " to be stored in memory" - << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, "Setting file " + + std::string(file_str) + + " to be stored in memory"); locations->setStoreFileInMemory(file_str); } } if (storage_section["fs"].get_array().get(storage_fs)) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "No files listed in fs storage section for workflow: " - << capio_global_configuration->workflow_name << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "No files listed in fs storage section for workflow: " + + capio_global_configuration->workflow_name); LOG("No files listed in fs storage section for workflow: %s", std::string(capio_global_configuration->workflow_name).c_str()); } else { for (auto file : storage_fs) { std::string_view file_str; [[maybe_unused]] const auto error = file.get_string().get(file_str); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Setting file " << file_str << " to be stored on file system" - << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "Setting file " + std::string(file_str) + + " to be stored on file system"); locations->setStoreFileInFileSystem(file_str); } } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << " [ " - << capio_global_configuration->node_name << " ] " - << "Completed parsing of memory storage directives" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, + "Completed parsing of memory storage directives"); } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_JSON << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_JSON, ""); return locations; } diff --git a/src/server/capio_server.cpp b/src/server/capio_server.cpp index 43e35346e..7ae6adca4 100644 --- a/src/server/capio_server.cpp +++ b/src/server/capio_server.cpp @@ -42,10 +42,10 @@ int main(int argc, char **argv) { std::cout << CAPIO_LOG_SERVER_BANNER; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << capio_global_configuration->node_name - << " ] " - << "Started server with PID: " << capio_global_configuration->CAPIO_SERVER_MAIN_PID - << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, + "Started server with PID: " + + std::to_string(capio_global_configuration->CAPIO_SERVER_MAIN_PID)); char resolve_prefix[PATH_MAX]{0}; const std::string config_path = parseCLI(argc, argv, resolve_prefix); @@ -62,9 +62,7 @@ int main(int argc, char **argv) { capio_cl_engine->print(); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << capio_global_configuration->node_name - << " ] server initialization completed!" << std::endl - << std::flush; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "server initialization completed!"); request_handlers_engine->start(); diff --git a/src/server/client-manager/client_manager.hpp b/src/server/client-manager/client_manager.hpp index 4671b5072..27298e098 100644 --- a/src/server/client-manager/client_manager.hpp +++ b/src/server/client-manager/client_manager.hpp @@ -20,9 +20,7 @@ class ClientManager { bufs_response = new std::unordered_map(); app_names = new std::unordered_map; files_created_by_producer = new std::unordered_map *>; - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name - << " ] " - << "ClientManager initialization completed." << std::endl; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, "ClientManager initialization completed."); } ~ClientManager() { @@ -30,9 +28,7 @@ class ClientManager { delete bufs_response; delete app_names; delete files_created_by_producer; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "buf_response cleanup completed" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "ClientManager cleanup completed."); } /** diff --git a/src/server/client-manager/handlers/handshake.hpp b/src/server/client-manager/handlers/handshake.hpp index a47693585..3c01d8bb5 100644 --- a/src/server/client-manager/handlers/handshake.hpp +++ b/src/server/client-manager/handlers/handshake.hpp @@ -17,26 +17,30 @@ inline void handshake_handler(const char *const str) { sscanf(str, "%d %d %s", &tid, &pid, app_name); START_LOG(gettid(), "call(tid=%ld, pid=%ld, app_name=%s)", tid, pid, app_name); - client_manager->register_client(app_name, tid); - storage_service->register_client(app_name, tid); - /* - * The handshake request must be blocking ONLY when not building tests. This is because when - * starting unit tests, the binary is loaded with libcapio_posix.so underneath thus performing - * a handshake request. If the handshake is blocking, then the capio_server binary cannot be - * started as the whole process is waiting for a handshake. - */ + if (!capio_global_configuration->termination_phase) { + client_manager->register_client(app_name, tid); + storage_service->register_client(app_name, tid); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, + "Registered new app: " + std::string(app_name)); + /* + * The handshake request must be blocking ONLY when not building tests. This is because when + * starting unit tests, the binary is loaded with libcapio_posix.so underneath thus + * performing a handshake request. If the handshake is blocking, then the capio_server + * binary cannot be started as the whole process is waiting for a handshake. + */ #ifndef CAPIO_BUILD_TESTS - // If not on termination phase, return 1. Otherwise, return 0 - // if - is returned posix application will terminate - if (!termination_phase) { // Unlock client waiting to start LOG("Allowing handshake to continue"); client_manager->reply_to_client(tid, 1); +#endif } else { +#ifndef CAPIO_BUILD_TESTS LOG("Termination phase is in progress. ignoring further handshakes."); client_manager->reply_to_client(tid, 0); - } + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Termination phase is in progress. ignoring further handshakes."); #endif + } } #endif // HANDSHAKE_HPP diff --git a/src/server/client-manager/request_handler_engine.hpp b/src/server/client-manager/request_handler_engine.hpp index e4744d33d..456a2bfc3 100644 --- a/src/server/client-manager/request_handler_engine.hpp +++ b/src/server/client-manager/request_handler_engine.hpp @@ -70,12 +70,10 @@ class RequestHandlerEngine { if (ec == std::errc()) { strcpy(str, ptr + 1); } else { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "Received invalid code: " << code << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "Offending request: " << ptr << " / " << req << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Received invalid code: " + std::to_string(code)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Offending request: " + std::string(ptr) + " / " + req); ERR_EXIT("Invalid request %d:%s", code, ptr); } return code; @@ -91,21 +89,15 @@ class RequestHandlerEngine { new CSBufRequest_t(SHM_COMM_CHAN_NAME, CAPIO_REQ_BUFF_CNT, CAPIO_REQ_MAX_SIZE, capio_global_configuration->workflow_name); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name - << " ] " - << "RequestHandlerEngine initialization completed." << std::endl; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, + "RequestHandlerEngine initialization completed."); } ~RequestHandlerEngine() { START_LOG(gettid(), "call()"); delete buf_requests; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "buf_requests cleanup completed" << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "request_handlers_engine cleanup completed" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "RequestHandlerEngine cleanup completed."); } /** @@ -130,11 +122,9 @@ class RequestHandlerEngine { code = read_next_request(str.get()); } catch (const std::exception &e) { if (capio_global_configuration->termination_phase) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Termination phase is in progress... Ignoring Exception likely " - "thrown while receiving SIGUSR1" - << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Termination phase is in progress... " + "Ignoring Exception likely thrown while receiving SIGUSR1"); continue; } throw; diff --git a/src/server/communication-service/CapioCommunicationService.hpp b/src/server/communication-service/CapioCommunicationService.hpp index a0c9a9d4f..84348c7bb 100644 --- a/src/server/communication-service/CapioCommunicationService.hpp +++ b/src/server/communication-service/CapioCommunicationService.hpp @@ -24,52 +24,39 @@ class CapioCommunicationService { capio_global_configuration->node_name); if (backend_name == "MQTT" || backend_name == "MPI") { - std::cout - << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Warn: selected backend is not yet officially supported. Setting backend to TCP" - << std::endl; + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Warn: selected backend is not yet officially supported. Setting backend to TCP"); backend_name = "TCP"; } if (backend_name == "TCP" || backend_name == "UCX") { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " - << capio_global_configuration->node_name << " ] " - << "Selected backend is: " << backend_name << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " - << capio_global_configuration->node_name << " ] " - << "Selected backend port is: " << port << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Selected backend is " + backend_name); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, + "Selected backend port is " + std::to_string(port)); capio_backend = new MTCL_backend(backend_name, std::to_string(port), CAPIO_BACKEND_DEFAULT_SLEEP_TIME); } else if (backend_name == "FS") { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " - << capio_global_configuration->node_name << " ] " - << "Selected backend is File System" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Selected backend is File System"); capio_backend = new NoBackend(); } else { START_LOG(gettid(), "call()"); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "Provided communication backend " << backend_name << " is invalid" - << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Provided communication backend " + backend_name + " is invalid"); ERR_EXIT("No valid backend was provided"); } if (control_plane_backend == "multicast") { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " - << capio_global_configuration->node_name << " ] " - << "Starting multicast control plane" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting multicast control plane"); capio_control_plane = new MulticastControlPlane(port); } else { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " - << capio_global_configuration->node_name << " ] " - << "Starting file system control plane" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Starting file system control plane"); capio_control_plane = new FSControlPlane(port); } - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name - << " ] " - << "CapioCommunicationService initialization completed." << std::endl; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, + "CapioCommunicationService initialization completed."); } }; diff --git a/src/server/communication-service/control_plane/fs_control_plane.hpp b/src/server/communication-service/control_plane/fs_control_plane.hpp index 591288aae..716f0c970 100644 --- a/src/server/communication-service/control_plane/fs_control_plane.hpp +++ b/src/server/communication-service/control_plane/fs_control_plane.hpp @@ -26,8 +26,7 @@ class FSControlPlane : public CapioControlPlane { FilePort.close(); LOG("Saved self token info to FS"); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " - << "Generated token at " << token_filename << std::endl; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, "Generated token at " + token_filename); } void delete_aliveness_token() const { @@ -102,6 +101,8 @@ class FSControlPlane : public CapioControlPlane { token_used_to_connect_mutex = new std::mutex(); thread = new std::thread(fs_server_aliveness_detector_thread, std::ref(continue_execution), &token_used_to_connect, token_used_to_connect_mutex); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "FSControlPlane initialization completed."); }; ~FSControlPlane() override { @@ -112,8 +113,7 @@ class FSControlPlane : public CapioControlPlane { delete continue_execution; delete token_used_to_connect_mutex; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " - << "FSControlPlane correctly terminated" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "FSControlPlane cleanup completed."); } }; diff --git a/src/server/communication-service/control_plane/multicast_control_plane.hpp b/src/server/communication-service/control_plane/multicast_control_plane.hpp index 72d1c7236..fee8f6ed6 100644 --- a/src/server/communication-service/control_plane/multicast_control_plane.hpp +++ b/src/server/communication-service/control_plane/multicast_control_plane.hpp @@ -20,13 +20,12 @@ class MulticastControlPlane : public CapioControlPlane { int transmission_socket = socket(AF_INET, SOCK_DGRAM, 0); if (transmission_socket < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "WARNING: unable to bind multicast socket: " << strerror(errno) - << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Execution will continue only with FS discovery support" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: unable to bind multicast socket: ") + + strerror(errno)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); + return; } @@ -40,10 +39,9 @@ class MulticastControlPlane : public CapioControlPlane { if (sendto(transmission_socket, message, strlen(message), 0, reinterpret_cast(&addr), sizeof(addr)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "WARNING: unable to send alive token(" << message - << ") to multicast address!: " << strerror(errno) << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "WARNING: unable to send alive token(" + std::string(message) + + ") to multicast address!: " + strerror(errno)); } LOG("Sent multicast token"); close(transmission_socket); @@ -66,13 +64,11 @@ class MulticastControlPlane : public CapioControlPlane { int discovery_socket = socket(AF_INET, SOCK_DGRAM, 0); if (discovery_socket < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "WARNING: unable to open multicast socket: " << strerror(errno) - << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Execution will continue only with FS discovery support" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: unable to open multicast socket: ") + + strerror(errno)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); return; } LOG("Created socket"); @@ -80,26 +76,23 @@ class MulticastControlPlane : public CapioControlPlane { if (setsockopt(discovery_socket, SOL_SOCKET, SO_REUSEADDR, (char *) &multiple_socket_on_same_address, sizeof(multiple_socket_on_same_address)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "WARNING: unable to assign multiple sockets to same address: " - << strerror(errno) << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Execution will continue only with FS discovery support" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: unable to multiple sockets to same address: ") + + strerror(errno)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); return; } LOG("Set IP address to accept multiple sockets on same address"); if (setsockopt(discovery_socket, IPPROTO_IP, IP_MULTICAST_LOOP, &loopback, sizeof(loopback)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "WARNING: unable to filter out loopback incoming messages: " - << strerror(errno) << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Execution will continue only with FS discovery support" << std::endl; + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: unable to filter out loopback incoming messages: ") + + strerror(errno)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); return; } LOG("Disabled reception of loopback messages from socket"); @@ -114,13 +107,11 @@ class MulticastControlPlane : public CapioControlPlane { // bind to receive address if (bind(discovery_socket, reinterpret_cast(&addr), sizeof(addr)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "WARNING: unable to bind multicast socket: " << strerror(errno) - << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Execution will continue only with FS discovery support" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: unable to bind multicast socket: ") + + strerror(errno)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); return; } LOG("Binded socket"); @@ -129,13 +120,11 @@ class MulticastControlPlane : public CapioControlPlane { mreq.imr_multiaddr.s_addr = inet_addr(MULTICAST_DISCOVERY_ADDR); mreq.imr_interface.s_addr = htonl(INADDR_ANY); if (setsockopt(discovery_socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "WARNING: unable to join multicast group: " << strerror(errno) - << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Execution will continue only with FS discovery support" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: unable to join multicast group: ") + + strerror(errno)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); return; } LOG("Successfully joined multicast group"); @@ -152,14 +141,11 @@ class MulticastControlPlane : public CapioControlPlane { LOG("Received multicast data of size %ld and content %s", recv_sice, incomingMessage); if (recv_sice < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "WARNING: received < 0 bytes from multicast: " << strerror(errno) - << std::endl; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Execution will continue only with FS discovery support" - << std::endl; + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + std::string("WARNING: received 0 bytes from multicast socket: ")); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Execution will continue only with FS discovery support"); return; } } while (std::string(incomingMessage) == SELF_TOKEN); @@ -167,9 +153,8 @@ class MulticastControlPlane : public CapioControlPlane { std::lock_guard lg(*token_used_to_connect_mutex); if (std::find(token_used_to_connect->begin(), token_used_to_connect->end(), incomingMessage) == token_used_to_connect->end()) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " - << capio_global_configuration->node_name << " ] " - << "Multicast adv: " << incomingMessage << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, + "Multicast adv: " + std::string(incomingMessage)); LOG("Received message: %s", incomingMessage); token_used_to_connect->push_back(incomingMessage); capio_backend->connect_to(incomingMessage); @@ -187,9 +172,10 @@ class MulticastControlPlane : public CapioControlPlane { thread = new std::thread(multicast_server_aliveness_thread, std::ref(continue_execution), &token_used_to_connect, token_used_to_connect_mutex, backend_port); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " - << "Multicast discovery service @ " << MULTICAST_DISCOVERY_ADDR << ":" - << MULTICAST_DISCOVERY_PORT << std::endl; + + server_println(CAPIO_SERVER_CLI_LOG_SERVER, std::string("Multicast discovery service @ ") + + MULTICAST_DISCOVERY_ADDR + ":" + + std::to_string(MULTICAST_DISCOVERY_PORT)); } ~MulticastControlPlane() override { @@ -199,8 +185,7 @@ class MulticastControlPlane : public CapioControlPlane { delete token_used_to_connect_mutex; delete thread; delete continue_execution; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " - << "MulticastControlPlane correctly terminated" << std::endl; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, "MulticastControlPlane correctly terminated"); } }; diff --git a/src/server/communication-service/data_plane/MTCL_backend.hpp b/src/server/communication-service/data_plane/MTCL_backend.hpp index 05e4f2296..2a8da8f55 100644 --- a/src/server/communication-service/data_plane/MTCL_backend.hpp +++ b/src/server/communication-service/data_plane/MTCL_backend.hpp @@ -186,8 +186,8 @@ class MTCL_backend : public BackendInterface { char connected_hostname[HOST_NAME_MAX] = {0}; UserManager.receive(connected_hostname, HOST_NAME_MAX); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " - << "Connected from " << connected_hostname << std::endl; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, + std::string("Connected from ") + connected_hostname); LOG("Received connection hostname: %s", connected_hostname); @@ -224,8 +224,7 @@ class MTCL_backend : public BackendInterface { LOG("Trying to connect on remote: %s", remoteToken.c_str()); if (auto UserManager = MTCL::Manager::connect(remoteToken); UserManager.isValid()) { - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " - << "Connected to " << remoteToken << std::endl; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, std::string("Connected to ") + remoteToken); LOG("Connected to: %s", remoteToken.c_str()); UserManager.send(ownHostname, HOST_NAME_MAX); const std::lock_guard lg(*_guard); @@ -239,9 +238,10 @@ class MTCL_backend : public BackendInterface { server_connection_handler, std::move(UserManager), remoteHost.c_str(), thread_sleep_times, connection_tuple, terminate, TO_REMOTE)); } else { - std::cout << CAPIO_SERVER_CLI_LOG_SERVER_WARNING << " [ " << ownHostname << " ] " - << "Warning: found token " << remoteHost << ".alive_token" - << ", but connection is not valid" << std::endl; + + server_println(CAPIO_SERVER_CLI_LOG_SERVER_WARNING, + "Warning: found token " + std::string(remoteHost) + + ".alive_token, but connection is not valid"); } } @@ -269,8 +269,7 @@ class MTCL_backend : public BackendInterface { th = new std::thread(incoming_connection_listener, std::ref(continue_execution), sleep_time, &connected_hostnames_map, _guard, &connection_threads, terminate); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << ownHostname << " ] " - << "MTCL data plane initialization completed." << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "MTCL_backend initialization completed."); } ~MTCL_backend() override { @@ -293,8 +292,7 @@ class MTCL_backend : public BackendInterface { MTCL::Manager::finalize(); LOG("Finalizing MTCL backend"); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " << ownHostname << " ] " - << "MTCL backend correctly terminated" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "MTCL_backend cleanup completed."); } std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) override { @@ -353,7 +351,7 @@ class MTCL_backend : public BackendInterface { LOG("Pushing Transport unit to out queue"); out->push(outputUnit); } else { - std::cout << "can't find target" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, "can't find target"); } } diff --git a/src/server/file-manager/file_manager.hpp b/src/server/file-manager/file_manager.hpp index 58177da8a..6d664e9b0 100644 --- a/src/server/file-manager/file_manager.hpp +++ b/src/server/file-manager/file_manager.hpp @@ -25,10 +25,12 @@ class CapioFileManager { public: CapioFileManager() { START_LOG(gettid(), "call()"); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name << " ] " - << "CapioFileManager initialization completed." << std::endl; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, "CapioFileManager initialization completed."); + } + ~CapioFileManager() { + START_LOG(gettid(), "call()"); + server_println(CAPIO_SERVER_CLI_LOG_SERVER, "CapioFileManager cleanup completed."); } - ~CapioFileManager() { START_LOG(gettid(), "call()"); } static uintmax_t get_file_size_if_exists(const std::filesystem::path &path); static std::string getMetadataPath(const std::string &path); diff --git a/src/server/file-manager/fs_monitor.hpp b/src/server/file-manager/fs_monitor.hpp index e80aa0f2c..f879a91c3 100644 --- a/src/server/file-manager/fs_monitor.hpp +++ b/src/server/file-manager/fs_monitor.hpp @@ -75,9 +75,8 @@ class FileSystemMonitor { START_LOG(gettid(), "call()"); *continue_execution = true; th = new std::thread(_main, std::ref(continue_execution)); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name - << " ] " - << "CapioFileSystemMonitor initialization completed." << std::endl; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, + "CapioFileSystemMonitor initialization completed."); } ~FileSystemMonitor() { @@ -91,9 +90,7 @@ class FileSystemMonitor { delete th; delete continue_execution; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "fs_monitor cleanup completed" << std::endl; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, "CapioFileSystemMonitor cleanup completed."); } }; diff --git a/src/server/storage-service/capio_storage_service.hpp b/src/server/storage-service/capio_storage_service.hpp index 6a7017a9c..2481d921d 100644 --- a/src/server/storage-service/capio_storage_service.hpp +++ b/src/server/storage-service/capio_storage_service.hpp @@ -38,9 +38,8 @@ class CapioStorageService { _threads_waiting_for_memory_data = new std::unordered_map>>; - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name - << " ] " - << "CapioStorageService initialization completed." << std::endl; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, + "CapioStorageService initialization completed."); } ~CapioStorageService() { @@ -49,6 +48,7 @@ class CapioStorageService { delete _client_to_server_queue; delete _server_to_client_queue; delete _threads_waiting_for_memory_data; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, "CapioStorageService cleanup completed."); } void createMemoryFile(const std::string &file_name) const { diff --git a/src/server/utils/configuration.hpp b/src/server/utils/configuration.hpp index 2a4ed991c..5eb6f720f 100644 --- a/src/server/utils/configuration.hpp +++ b/src/server/utils/configuration.hpp @@ -1,6 +1,8 @@ #ifndef CAPIO_CONFIGURATION_HPP #define CAPIO_CONFIGURATION_HPP +#include "capio/constants.hpp" + #include #include @@ -16,13 +18,20 @@ class CapioGlobalConfiguration { char node_name[HOST_NAME_MAX]{0}; CapioGlobalConfiguration() { - termination_phase = false; - StoreOnlyInMemory = false; gethostname(node_name, HOST_NAME_MAX); + termination_phase = false; + StoreOnlyInMemory = false; CAPIO_SERVER_MAIN_PID = gettid(); + workflow_name = CAPIO_DEFAULT_WORKFLOW_NAME; } }; inline auto capio_global_configuration = new CapioGlobalConfiguration(); +inline void server_println(const std::string &message_type, const std::string &message_line) { + std::cout << message_type << " " << capio_global_configuration->node_name << "] " + << message_line << std::endl + << std::flush; +} + #endif // CAPIO_CONFIGURATION_HPP diff --git a/src/server/utils/parser.hpp b/src/server/utils/parser.hpp index f455a0d2f..d3ba27da2 100644 --- a/src/server/utils/parser.hpp +++ b/src/server/utils/parser.hpp @@ -63,27 +63,25 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { continue_on_error = true; std::cout << CAPIO_LOG_SERVER_CLI_CONT_ON_ERR_WARNING << std::endl; #else - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING - << "--continue-on-error flag given, but logger is not compiled into CAPIO. Flag " - "is ignored." - << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "--continue-on-error flag given, but logger is not compiled into CAPIO. " + "Flag is ignored."); #endif } if (memStorageOnly) { capio_global_configuration->StoreOnlyInMemory = true; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " - << capio_global_configuration->node_name << " ] " - << "All files will be stored in memory whenever possible." << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, + "All files will be stored in memory whenever possible."); } if (logfile_folder) { #ifdef CAPIO_LOG log_master_dir_name = args::get(logfile_folder); #else - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING - << "Capio logfile folder, but logging capabilities not compiled into capio!" - << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Capio logfile folder, but logging capabilities not compiled into capio!"); #endif } @@ -97,57 +95,46 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { } logfile_prefix = token; #else - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING - << "Capio logfile provided, but logging capabilities not compiled into capio!" - << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Capio logfile provided, but logging capabilities not compiled into capio!"); #endif } #ifdef CAPIO_LOG auto logname = open_server_logfile(); log = new Logger(__func__, __FILE__, __LINE__, gettid(), "Created new log file"); - std::cout << CAPIO_SERVER_CLI_LOG_SERVER << " [ " << capio_global_configuration->node_name - << " ] " - << "started logging to logfile " << logname << std::endl; + server_println(CAPIO_SERVER_CLI_LOG_SERVER, "started logging to logfile " + logname.string()); #endif if (config) { std::string token = args::get(config); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " - << capio_global_configuration->node_name << " ] " - << "parsing config file: " << token << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "parsing config file: " + token); // TODO: pass config file path } else if (noConfigFile) { capio_global_configuration->workflow_name = std::string_view(get_capio_workflow_name()); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "skipping config file parsing." << std::endl - << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Obtained from environment variable current workflow name: " - << capio_global_configuration->workflow_name.data() << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "skipping config file parsing."); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Obtained from environment variable current workflow name: " + + capio_global_configuration->workflow_name); } else { START_LOG(gettid(), "call()"); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "Error: no config file provided. To skip config file use --no-config option!" - << std::endl; + server_println( + CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Error: no config file provided. To skip config file use --no-config option!"); ERR_EXIT("no config file provided, and --no-config not provided"); } #ifdef CAPIO_LOG CAPIO_LOG_LEVEL = get_capio_log_level(); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << capio_global_configuration->node_name - << " ] " - << "LOG_LEVEL set to: " << CAPIO_LOG_LEVEL << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, + "LOG_LEVEL set to: " + std::to_string(CAPIO_LOG_LEVEL)); std::cout << CAPIO_LOG_SERVER_CLI_LOGGING_ENABLED_WARNING; log->log("LOG_LEVEL set to: %d", CAPIO_LOG_LEVEL); delete log; #else if (std::getenv("CAPIO_LOG_LEVEL") != nullptr) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << CAPIO_LOG_SERVER_CLI_LOGGING_NOT_AVAILABLE << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + CAPIO_LOG_SERVER_CLI_LOGGING_NOT_AVAILABLE); } #endif @@ -160,42 +147,35 @@ std::string parseCLI(int argc, char **argv, char *resolve_prefix) { port = args::get(backend_port); } - std::string constrol_backend_name = "multicast"; + std::string control_backend_name = "multicast"; if (controlPlaneBackend) { auto tmp = args::get(controlPlaneBackend); if (tmp != "multicast" && tmp != "fs") { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Unknown control plane backend " << tmp << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Unknown control plane backend " + tmp); } else { - constrol_backend_name = tmp; + control_backend_name = tmp; } } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " - << capio_global_configuration->node_name << " ] " - << "Using control plane backend: " << constrol_backend_name << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, + "Using control plane backend: " + control_backend_name); capio_communication_service = - new CapioCommunicationService(backend_name, port, constrol_backend_name); + new CapioCommunicationService(backend_name, port, control_backend_name); } else { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " - << capio_global_configuration->node_name << " ] " - << "Selected backend is File System" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Selected backend is File System"); capio_backend = new NoBackend(); } if (capio_cl_resolve_path) { auto path = args::get(capio_cl_resolve_path); memcpy(resolve_prefix, path.c_str(), PATH_MAX); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " - << capio_global_configuration->node_name << " ] " - << "CAPIO-CL relative file prefix: " << resolve_prefix << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "CAPIO-CL relative file prefix: " + path); } else { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "No CAPIO-CL resolve file prefix provided" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "No CAPIO-CL resolve file prefix provided"); } if (config) { diff --git a/src/server/utils/signals.hpp b/src/server/utils/signals.hpp index 6f5696ab5..b5e9f2c4c 100644 --- a/src/server/utils/signals.hpp +++ b/src/server/utils/signals.hpp @@ -24,15 +24,10 @@ inline void sig_term_handler(int signum, siginfo_t *info, void *ptr) { START_LOG(gettid(), "call(signal=[%d] (%s) from process with pid=%ld)", signum, strsignal(signum), info != nullptr ? info->si_pid : -1); - std::cout << std::endl - << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "shutting down server" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "shutting down server"); if (signum == SIGSEGV) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_ERROR << " [ " - << capio_global_configuration->node_name << " ] " - << "Segfault detected!" << std::endl; + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Segfault detected!"); } #ifdef CAPIO_COVERAGE @@ -43,9 +38,8 @@ inline void sig_term_handler(int signum, siginfo_t *info, void *ptr) { delete fs_monitor; delete capio_communication_service; delete shm_canary; - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << " [ " << capio_global_configuration->node_name - << " ] " - << "Bye!" << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "Bye!"); exit(EXIT_SUCCESS); } @@ -55,9 +49,8 @@ inline void sig_usr1_handler(int signum, siginfo_t *info, void *ptr) { } START_LOG(gettid(), "call(signal=[%d] (%s) from process with pid=%ld)", signum, strsignal(signum), info != nullptr ? info->si_pid : -1); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_WARNING << " [ " - << capio_global_configuration->node_name << " ] " - << "Received request for graceful shutdown!" << std::endl; + + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "Received request for graceful shutdown!"); capio_global_configuration->termination_phase = true; }