From d97f78b696174343237aec58349d977c9b4f8519 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Mon, 1 Sep 2025 10:03:04 +0200 Subject: [PATCH 01/16] Create event in communication service --- .../control-plane/capio_control_plane.hpp | 2 +- .../include/storage-service/capio_file.hpp | 8 +++--- .../storage-service/capio_storage_service.hpp | 11 ++++---- .../src/client-manager/handlers/open.cpp | 2 +- .../control-plane/multicast_control_plane.cpp | 25 ++++++++++++++----- .../src/file-manager/file_manager.cpp | 2 +- .../src/storage-service/capio_remote_file.cpp | 2 +- .../storage-service/capio_storage_service.cpp | 5 ++-- capio-tests/multinode/backend/CMakeLists.txt | 16 +++++++++++- 9 files changed, 52 insertions(+), 21 deletions(-) diff --git a/capio-server/include/communication-service/control-plane/capio_control_plane.hpp b/capio-server/include/communication-service/control-plane/capio_control_plane.hpp index ed369d71b..11c83d9ce 100644 --- a/capio-server/include/communication-service/control-plane/capio_control_plane.hpp +++ b/capio-server/include/communication-service/control-plane/capio_control_plane.hpp @@ -4,7 +4,7 @@ class CapioControlPlane { public: - typedef enum { CREATE, DELETE, WRITE } event_type; + typedef enum { CREATE, WRITE, CLOSE, COMMIT } event_type; virtual ~CapioControlPlane() = default; diff --git a/capio-server/include/storage-service/capio_file.hpp b/capio-server/include/storage-service/capio_file.hpp index 080fa9efb..3e66ba4b1 100644 --- a/capio-server/include/storage-service/capio_file.hpp +++ b/capio-server/include/storage-service/capio_file.hpp @@ -10,11 +10,13 @@ class CapioFile { protected: - const std::string fileName; + const std::string fileName, homeNode; std::size_t totalSize; public: - explicit CapioFile(const std::string &filePath) : fileName(filePath), totalSize(0) {}; + explicit CapioFile(const std::string &filePath, + const std::string &home_node = capio_global_configuration->node_name) + : fileName(filePath), homeNode(home_node), totalSize(0) {}; virtual ~CapioFile() = default; [[nodiscard]] std::size_t getSize() const { @@ -131,7 +133,7 @@ class CapioMemoryFile : public CapioFile { class CapioRemoteFile : public CapioFile { public: - explicit CapioRemoteFile(const std::string &filePath); + explicit CapioRemoteFile(const std::string &filePath, const std::string &home_node); ~CapioRemoteFile() override; diff --git a/capio-server/include/storage-service/capio_storage_service.hpp b/capio-server/include/storage-service/capio_storage_service.hpp index 4dd234b86..c0ed835e2 100644 --- a/capio-server/include/storage-service/capio_storage_service.hpp +++ b/capio-server/include/storage-service/capio_storage_service.hpp @@ -33,8 +33,9 @@ class CapioStorageService { * Create a CapioRemoteFile, after checking that an instance of CapioMemoryFile (meaning a local * file) is not present * @param file_name file path + * @param home_node */ - void createRemoteFile(const std::string &file_name) const; + void createRemoteFile(const std::string &file_name, const std::string &home_node) const; void deleteFile(const std::string &file_name) const; @@ -69,7 +70,7 @@ class CapioStorageService { * @param app_name * @param pid */ - void register_client(const std::string &app_name, const pid_t pid) const; + void register_client(const std::string &app_name, pid_t pid) const; /** * Send the file content to a client application @@ -87,7 +88,7 @@ class CapioStorageService { * @param data * @param len */ - void reply_to_client_raw(pid_t pid, const char *data, const capio_off64_t len) const; + void reply_to_client_raw(pid_t pid, const char *data, capio_off64_t len) const; /** * Receive the file content from the client application @@ -105,9 +106,9 @@ class CapioStorageService { * @param pid * @return */ - [[nodiscard]] size_t sendFilesToStoreInMemory(const long pid) const; + [[nodiscard]] size_t sendFilesToStoreInMemory(long pid) const; - void remove_client(const pid_t pid) const; + void remove_client(pid_t pid) const; }; inline CapioStorageService *storage_service; diff --git a/capio-server/src/client-manager/handlers/open.cpp b/capio-server/src/client-manager/handlers/open.cpp index c8993eba2..23f3f1eeb 100644 --- a/capio-server/src/client-manager/handlers/open.cpp +++ b/capio-server/src/client-manager/handlers/open.cpp @@ -27,7 +27,7 @@ void open_handler(const char *const str) { * At this point, the file that needs to be created more likely than not is not local to the * machine. As such, we call the creation of a new CapioRemoteFile */ - storage_service->createRemoteFile(path); + storage_service->createRemoteFile(path, {}); return; } diff --git a/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp b/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp index 493d287b8..4fd22f5c2 100644 --- a/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp +++ b/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp @@ -1,4 +1,6 @@ +#include "include/storage-service/capio_storage_service.hpp" #include "multicast_utils.hpp" + #include #include #include @@ -82,10 +84,9 @@ void MulticastControlPlane::multicast_control_plane_incoming_thread( reinterpret_cast(&addr), &addrlen); LOG("Received multicast data of size %ld and content %s", recv_sice, incoming_msg); if (recv_sice < 0) { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, - std::string("WARNING: received 0 bytes from multicast socket: ")); - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, - "Execution will continue only with FS discovery support"); + LOG("WARNING: received size less than zero. An error might have occurred: %s", + strerror(errno)); + LOG("Skipping iteration and returning to listening for incoming paxkets"); continue; } @@ -96,12 +97,24 @@ void MulticastControlPlane::multicast_control_plane_incoming_thread( sscanf(incoming_msg, "%d %s %s", reinterpret_cast(&event), source_hostname, source_path); + LOG("event=%d, source:%s, path=%s", event, source_path, incoming_msg); + if (strcmp(capio_global_configuration->node_name, source_hostname) == 0) { continue; } - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, - "Received control message: " + std::string(incoming_msg)); + switch (event) { + case CREATE: + LOG("Handling remote CREATE event"); + storage_service->createRemoteFile(source_path, source_hostname); + break; + + default: + LOG("WARNING: unknown / unhandled event: %s", incoming_msg); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Unknown/Unhandled message recived: " + std::string(incoming_msg)); + } + LOG("Completed handling of event"); } close(discovery_socket); diff --git a/capio-server/src/file-manager/file_manager.cpp b/capio-server/src/file-manager/file_manager.cpp index 6439b4f52..04ee4d1e1 100644 --- a/capio-server/src/file-manager/file_manager.cpp +++ b/capio-server/src/file-manager/file_manager.cpp @@ -70,7 +70,7 @@ void CapioFileManager::_unlockThreadAwaitingCreation(const std::string &path, * Here we need to create a new remote file, as it might be that the file is not * produced by this node but by another remote one */ - storage_service->createRemoteFile(path); + storage_service->createRemoteFile(path, {}); } } diff --git a/capio-server/src/storage-service/capio_remote_file.cpp b/capio-server/src/storage-service/capio_remote_file.cpp index c635b039e..d2e7e50f2 100644 --- a/capio-server/src/storage-service/capio_remote_file.cpp +++ b/capio-server/src/storage-service/capio_remote_file.cpp @@ -1,7 +1,7 @@ #include -CapioRemoteFile::CapioRemoteFile(const std::string &filePath) : CapioFile(filePath) {} +CapioRemoteFile::CapioRemoteFile(const std::string &filePath, const std::string& home_node) : CapioFile(filePath) {} CapioRemoteFile::~CapioRemoteFile() {} diff --git a/capio-server/src/storage-service/capio_storage_service.cpp b/capio-server/src/storage-service/capio_storage_service.cpp index f55ac94b7..80256338c 100644 --- a/capio-server/src/storage-service/capio_storage_service.cpp +++ b/capio-server/src/storage-service/capio_storage_service.cpp @@ -37,7 +37,8 @@ void CapioStorageService::createMemoryFile(const std::string &file_name) const { _stored_files->emplace(file_name, new CapioMemoryFile(file_name)); } -void CapioStorageService::createRemoteFile(const std::string &file_name) const { +void CapioStorageService::createRemoteFile(const std::string &file_name, + const std::string &home_node) const { /* * First we check that the file associate does not yet exists, as it might be produced * by another app running under the same server instance. if it is not found, we create @@ -46,7 +47,7 @@ void CapioStorageService::createRemoteFile(const std::string &file_name) const { START_LOG(gettid(), "call(file_name=%s)", file_name.c_str()); if (_stored_files->find(file_name) == _stored_files->end()) { LOG("File not found. Creating a new remote file"); - _stored_files->emplace(file_name, new CapioRemoteFile(file_name)); + _stored_files->emplace(file_name, new CapioRemoteFile(file_name, home_node)); } } diff --git a/capio-tests/multinode/backend/CMakeLists.txt b/capio-tests/multinode/backend/CMakeLists.txt index 636a35359..bf7ccb1a0 100644 --- a/capio-tests/multinode/backend/CMakeLists.txt +++ b/capio-tests/multinode/backend/CMakeLists.txt @@ -24,11 +24,25 @@ target_include_directories(${TARGET_NAME} PRIVATE ) file(GLOB_RECURSE CAPIO_SERVER_SOURCES - "${CMAKE_SOURCE_DIR}/capio-server/src/communication-service/*.cpp" + "${CMAKE_SOURCE_DIR}/capio-server/*.cpp" ) + file(GLOB_RECURSE CAPIO_SERVER_HEADERS "${TARGET_INCLUDE_FOLDER}/*.hpp") +list(FILTER CAPIO_SERVER_SOURCES EXCLUDE REGEX + ".*/json_parser\\.cpp$" +) + +list(FILTER CAPIO_SERVER_SOURCES EXCLUDE REGEX + ".*/capio_server\\.cpp$" +) + +list(FILTER CAPIO_SERVER_SOURCES EXCLUDE REGEX + ".*/command_line_parser\\.cpp$" +) + + target_sources(${TARGET_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src/main.cpp ${CAPIO_SERVER_SOURCES} From fccc53aecacb49888bd85cf5d05eb3e089c85023 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Tue, 2 Sep 2025 11:04:27 +0200 Subject: [PATCH 02/16] Changed read mem handler pending backend complete rewrite --- .../storage-service/capio_storage_service.hpp | 5 +-- .../src/client-manager/handlers/read.cpp | 35 +++++-------------- .../src/storage-service/capio_remote_file.cpp | 6 +++- .../storage-service/capio_storage_service.cpp | 4 +-- 4 files changed, 18 insertions(+), 32 deletions(-) diff --git a/capio-server/include/storage-service/capio_storage_service.hpp b/capio-server/include/storage-service/capio_storage_service.hpp index c0ed835e2..1e024e35f 100644 --- a/capio-server/include/storage-service/capio_storage_service.hpp +++ b/capio-server/include/storage-service/capio_storage_service.hpp @@ -78,9 +78,10 @@ class CapioStorageService { * @param file * @param offset * @param size + * @return size sent to client */ - void reply_to_client(pid_t pid, const std::string &file, capio_off64_t offset, - capio_off64_t size) const; + size_t reply_to_client(pid_t pid, const std::string &file, capio_off64_t offset, + capio_off64_t size) const; /** * Send raw data to client without fetching from the storage manager itself diff --git a/capio-server/src/client-manager/handlers/read.cpp b/capio-server/src/client-manager/handlers/read.cpp index 6dfe34b57..7616645a3 100644 --- a/capio-server/src/client-manager/handlers/read.cpp +++ b/capio-server/src/client-manager/handlers/read.cpp @@ -58,35 +58,16 @@ void read_mem_handler(const char *const str) { tid, read_begin_offset, read_size, client_cache_line_size, use_cache ? "true" : "false", path); - if (storage_service->sizeOf(path) < read_begin_offset + read_size && - !file_manager->isCommitted(path)) { - LOG("File is not yet ready to be consumed as there is not enough data, and is not " - "committed"); - storage_service->addThreadWaitingForData(tid, path, read_begin_offset, read_size); - return; - } - - capio_off64_t size_to_send = storage_service->sizeOf(path); - if (use_cache) { - LOG("Computing size of data to send: minimum between:"); - LOG("client_cache_line_size: %llu", client_cache_line_size); - LOG("file_size:%llu - read_begin_offset=%llu = %llu", size_to_send, read_begin_offset, - size_to_send - read_begin_offset); - size_to_send = std::min({client_cache_line_size, (size_to_send - read_begin_offset)}); - } + capio_off64_t size_to_send = std::min({client_cache_line_size, read_size}); + LOG("Will try to send up to %ld bytes", size_to_send); + auto size_sent = storage_service->reply_to_client(tid, path, read_begin_offset, size_to_send); LOG("Sending to posix app the offset up to which read."); - if (file_manager->isCommitted(path) && - read_begin_offset + size_to_send >= storage_service->sizeOf(path)) { - LOG("File is committed, and end of read >= than file size." - " signaling it to posix application by setting offset MSB to 1"); - LOG("Sending offset: %llu", 0x8000000000000000 | size_to_send); - client_manager->reply_to_client(tid, 0x8000000000000000 | size_to_send); - } else { - LOG("File is not committed. Sending offset: %llu", size_to_send); - client_manager->reply_to_client(tid, size_to_send); + if (file_manager->isCommitted(path)) { + LOG("File is committed, setting MSB to 1"); + size_sent |= 0x8000000000000000; } - LOG("Need to sent to client %llu bytes, asking storage service to send data", size_to_send); - storage_service->reply_to_client(tid, path, read_begin_offset, size_to_send); + LOG("Telling client to read %ld bytes", size_sent); + client_manager->reply_to_client(tid, size_sent); } diff --git a/capio-server/src/storage-service/capio_remote_file.cpp b/capio-server/src/storage-service/capio_remote_file.cpp index d2e7e50f2..558816b4a 100644 --- a/capio-server/src/storage-service/capio_remote_file.cpp +++ b/capio-server/src/storage-service/capio_remote_file.cpp @@ -1,7 +1,11 @@ +#include "include/communication-service/data-plane/backend_interface.hpp" +#include "include/storage-service/capio_storage_service.hpp" + #include -CapioRemoteFile::CapioRemoteFile(const std::string &filePath, const std::string& home_node) : CapioFile(filePath) {} +CapioRemoteFile::CapioRemoteFile(const std::string &filePath, const std::string &home_node) + : CapioFile(filePath) {} CapioRemoteFile::~CapioRemoteFile() {} diff --git a/capio-server/src/storage-service/capio_storage_service.cpp b/capio-server/src/storage-service/capio_storage_service.cpp index 80256338c..c17ef7df3 100644 --- a/capio-server/src/storage-service/capio_storage_service.cpp +++ b/capio-server/src/storage-service/capio_storage_service.cpp @@ -101,12 +101,12 @@ void CapioStorageService::register_client(const std::string &app_name, const pid LOG("Created communication queues"); } -void CapioStorageService::reply_to_client(pid_t pid, const std::string &file, capio_off64_t offset, +size_t CapioStorageService::reply_to_client(pid_t pid, const std::string &file, capio_off64_t offset, capio_off64_t size) const { START_LOG(gettid(), "call(pid=%llu, file=%s, offset=%llu, size=%llu)", pid, file.c_str(), offset, size); - getFile(file)->writeToQueue(*_server_to_client_queue->at(pid), offset, size); + return getFile(file)->writeToQueue(*_server_to_client_queue->at(pid), offset, size); } void CapioStorageService::reply_to_client_raw(pid_t pid, const char *data, From 0025d1ce3d95a262604dc01e366ec0b9eafdf508 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Tue, 2 Sep 2025 14:06:21 +0200 Subject: [PATCH 03/16] Began rewrite of capio_server backend --- .../data-plane/backend_interface.hpp | 46 ++-- .../data-plane/mtcl_backend.hpp | 54 +++-- .../data-plane/transport_unit.hpp | 22 -- .../data-plane/mtcl_backend.cpp | 212 ++++-------------- 4 files changed, 94 insertions(+), 240 deletions(-) delete mode 100644 capio-server/include/communication-service/data-plane/transport_unit.hpp diff --git a/capio-server/include/communication-service/data-plane/backend_interface.hpp b/capio-server/include/communication-service/data-plane/backend_interface.hpp index df0e26031..0d16f9775 100644 --- a/capio-server/include/communication-service/data-plane/backend_interface.hpp +++ b/capio-server/include/communication-service/data-plane/backend_interface.hpp @@ -3,6 +3,7 @@ #include "capio/constants.hpp" #include +#include #include #include #include @@ -17,6 +18,9 @@ class NotImplementedBackendMethod : public std::exception { }; class BackendInterface { + protected: + typedef enum { FETCH_FROM_REMOTE } BackendRequest_t; + public: virtual ~BackendInterface() = default; @@ -25,29 +29,18 @@ class BackendInterface { */ virtual void connect_to(std::string hostname_port) { throw NotImplementedBackendMethod(); }; - /** - * @brief Send data to target + /** Fetch a chunk of CapioFile internal data from remote host * - * @param target Hostname of remote target - * @param buf pointer to data to sent - * @param buf_size length of@param filepath - * @param start_offset - * @param buf + * @param hostname Hostname to request data from + * @param filepath Path of the file targeted by the request + * @param buffer Buffer in which data will be available + * @param offset Offset relative to the beginning of the file from which to read from + * @param count Size of @param buffer and hence size of the fetch operation + * @return Amount of data returned from the remote host */ - virtual void send(const std::string &target, char *buf, uint64_t buf_size, - const std::string &filepath, capio_off64_t start_offset) { - throw NotImplementedBackendMethod(); - }; - - /** - * @brief receive data - * - * @param buf allocated data buffer - * @param buf_size size of @param buf - * @param start_offset - * @return std::string hostname of sender - */ - virtual std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) { + virtual size_t fetchFromRemoteHost(const std::string &hostname, + const std::filesystem::path &filepath, char *buffer, + capio_off64_t offset, capio_off64_t count) { throw NotImplementedBackendMethod(); }; @@ -65,16 +58,11 @@ class BackendInterface { class NoBackend final : public BackendInterface { public: void connect_to(std::string hostname_port) override { return; }; - - void send(const std::string &target, char *buf, uint64_t buf_size, const std::string &filepath, - capio_off64_t start_offset) override { - return; + size_t fetchFromRemoteHost(const std::string &hostname, const std::filesystem::path &filepath, + char *buffer, capio_off64_t offset, capio_off64_t count) override { + return -1; }; - std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) override { - return {"no-backend"}; - } - std::vector get_open_connections() override { return {}; } }; diff --git a/capio-server/include/communication-service/data-plane/mtcl_backend.hpp b/capio-server/include/communication-service/data-plane/mtcl_backend.hpp index 96ef6eba8..efb76d8ea 100644 --- a/capio-server/include/communication-service/data-plane/mtcl_backend.hpp +++ b/capio-server/include/communication-service/data-plane/mtcl_backend.hpp @@ -2,12 +2,12 @@ #define MTCL_BACKEND_HPP #include -#include #include #include +#include /** - * This avoid to include the MTCL librari here as it is a header only library. + * This avoids to include the MTCL library here as it is a header only library. * this is equivalent to use extern in C but for class */ namespace MTCL { @@ -17,10 +17,8 @@ class HandleUser; class MTCLBackend : public BackendInterface { typedef enum { FROM_REMOTE, TO_REMOTE } CONN_HANDLER_ORIGIN; - typedef std::tuple *, std::queue *, std::mutex *> - TransportUnitInterface; - std::unordered_map connected_hostnames_map; std::string selfToken, connectedHostname, ownPort, usedProtocol; + std::unordered_map> open_connections; char ownHostname[HOST_NAME_MAX] = {0}; int thread_sleep_times = 0; bool *continue_execution = new bool; @@ -29,36 +27,48 @@ class MTCLBackend : public BackendInterface { std::vector connection_threads; bool *terminate; - static TransportUnit *receive_unit(MTCL::HandleUser *HandlerPointer); - - static void send_unit(MTCL::HandleUser *HandlerPointer, const TransportUnit *unit); - /** - * This thread will handle connections towards a single target. + * This thread handles a single p2p connection with another capio_server instance + * @param HandlerPointer + * @param remote_hostname + * @param outbound_messages + * @param sleep_time + * @param terminate + * @param source */ - void static server_connection_handler(MTCL::HandleUser HandlerPointer, - const std::string remote_hostname, const int sleep_time, - TransportUnitInterface interface, const bool *terminate, - CONN_HANDLER_ORIGIN source); + void static serverConnectionHandler(MTCL::HandleUser HandlerPointer, + const std::string &remote_hostname, + std::queue *outbound_messages, int sleep_time, + const bool *terminate, CONN_HANDLER_ORIGIN source); - void static incoming_connection_listener( + /** + * Waits for incoming new requests to connect to new server instances. When a new request + * arrives it then handshakes with the remote servers, opening a new connection, and starting a + * new thread that will handle remote requests + * + * @param continue_execution + * @param sleep_time + * @param open_connections + * @param guard + * @param _connection_threads + * @param terminate + */ + void static incomingConnectionListener( const bool *continue_execution, int sleep_time, - std::unordered_map *open_connections, + + std::unordered_map> *open_connections, std::mutex *guard, std::vector *_connection_threads, bool *terminate); public: - void connect_to(std::string hostname_port) override; - explicit MTCLBackend(const std::string &proto, const std::string &port, int sleep_time); ~MTCLBackend() override; - std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) override; - - void send(const std::string &target, char *buf, uint64_t buf_size, const std::string &filepath, - const capio_off64_t start_offset) override; + void connect_to(std::string hostname_port) override; std::vector get_open_connections() override; + size_t fetchFromRemoteHost(const std::string &hostname, const std::filesystem::path &filepath, + char *buffer, capio_off64_t offset, capio_off64_t count) override; }; #endif // MTCL_BACKEND_HPP diff --git a/capio-server/include/communication-service/data-plane/transport_unit.hpp b/capio-server/include/communication-service/data-plane/transport_unit.hpp deleted file mode 100644 index 62ae5abf9..000000000 --- a/capio-server/include/communication-service/data-plane/transport_unit.hpp +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef TRANSPORT_UNIT_HPP -#define TRANSPORT_UNIT_HPP - -#include -#include - -class TransportUnit { - protected: - std::string _filepath; - char *_bytes{}; - capio_off64_t _buffer_size{}; - capio_off64_t _start_write_offset{}; - - public: - TransportUnit() = default; - - ~TransportUnit() { delete[] _bytes; } - - friend class MTCLBackend; -}; - -#endif // TRANSPORT_UNIT_HPP diff --git a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp index 9fd31b6f2..d8f257708 100644 --- a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp +++ b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp @@ -1,118 +1,42 @@ +#include +#include #include +#include #include -TransportUnit *MTCLBackend::receive_unit(MTCL::HandleUser *HandlerPointer) { - START_LOG(gettid(), "call()"); - size_t filepath_len; - const auto unit = new TransportUnit(); - HandlerPointer->receive(&filepath_len, sizeof(size_t)); - LOG("Incoming path of length %ld", filepath_len); - unit->_filepath.reserve(filepath_len + 1); - HandlerPointer->receive(unit->_filepath.data(), filepath_len); - LOG("Received message! Path : %s", unit->_filepath.c_str()); - HandlerPointer->receive(&unit->_buffer_size, sizeof(capio_off64_t)); - LOG("Buffer size for incoming data is %ld", unit->_buffer_size); - unit->_bytes = new char[unit->_buffer_size]; - LOG("Allocated space for incoming data"); - HandlerPointer->receive(unit->_bytes, unit->_buffer_size); - LOG("Received file buffer data"); - HandlerPointer->receive(&unit->_start_write_offset, sizeof(capio_off64_t)); - LOG("Received chunk of data should be stored on offset %ld of file %s", - unit->_start_write_offset, unit->_filepath.c_str()); - return unit; -} - -void MTCLBackend::send_unit(MTCL::HandleUser *HandlerPointer, const TransportUnit *unit) { - START_LOG(gettid(), "call()"); - LOG("[send] buffer=%s", unit->_bytes); - /** - * step0: send file path - * step1: send receive buffer size - * step1: send offset of write - * step2: send data - */ - const size_t file_path_length = unit->_filepath.length(); - - HandlerPointer->send(&file_path_length, sizeof(size_t)); - LOG("Size of path that is being sent: %ld", file_path_length); - - HandlerPointer->send(unit->_filepath.c_str(), file_path_length); - LOG("Sent file path: %s", unit->_filepath.c_str()); - - HandlerPointer->send(&unit->_buffer_size, sizeof(capio_off64_t)); - LOG("Size of file buffer to be sent: %ld", unit->_buffer_size); - - HandlerPointer->send(unit->_bytes, unit->_buffer_size); - LOG("Sent %ld bytes of data chunk", unit->_buffer_size); - - HandlerPointer->send(&unit->_start_write_offset, sizeof(capio_off64_t)); - LOG("Sent start write offset : %ld", unit->_start_write_offset); - - // DO NOT DELETE unit: here just afterward, the unit experiences a pop() which - // effectively calls delete on the container. If I delete it here, a double delete is - // raised -} - /** * This thread will handle connections towards a single target. */ -void MTCLBackend::server_connection_handler(MTCL::HandleUser HandlerPointer, - const std::string remote_hostname, const int sleep_time, - TransportUnitInterface interface, const bool *terminate, - CONN_HANDLER_ORIGIN source) { +void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, + const std::string &remote_hostname, + std::queue *outbound_messages, + const int sleep_time, const bool *terminate, + const CONN_HANDLER_ORIGIN source) { START_LOG(gettid(), "call(remote_hostname=%s, kind=%s)", remote_hostname.c_str(), source == FROM_REMOTE ? "from remote server" : "to remote server"); - // out = data to sent to others - // in = data from others - auto [in, out, mutex] = interface; - while (HandlerPointer.isValid()) { // execute up to N operation of send &/or receive, to avoid starvation due to // semaphores. constexpr int max_net_op = 10; // Send phase - for (int completed_io_operations = 0; completed_io_operations < max_net_op && !out->empty(); + for (int completed_io_operations = 0; completed_io_operations < max_net_op; ++completed_io_operations) { - LOG("[send] Starting send section"); - const auto unit = out->front(); - - LOG("[send] Sending %ld bytes of file %s to %s", unit->_buffer_size, - unit->_filepath.c_str(), remote_hostname.c_str()); - send_unit(&HandlerPointer, unit); - LOG("[send] Message sent"); - - const std::lock_guard lg(*mutex); - LOG("[send] Locked guard"); - out->pop(); + // TODO: send incoming request and then retrive result... } // Receive phase size_t receive_size = 0, completed_io_operations = 0; HandlerPointer.probe(receive_size, false); while (completed_io_operations < max_net_op && receive_size > 0) { - LOG("[recv] Receiving data"); - auto unit = receive_unit(&HandlerPointer); - LOG("[recv] Lock guard"); - const std::lock_guard lg(*mutex); - in->push(unit); - LOG("[recv] Pushed %ld bytes to be stored on file %s", unit->_buffer_size, - unit->_filepath.c_str()); - - ++completed_io_operations; - receive_size = 0; - HandlerPointer.probe(receive_size, false); + completed_io_operations++; } // terminate phase if (*terminate) { - const std::lock_guard lg(*mutex); + LOG("[TERM PHASE] Locked access send and receive queues"); - while (!out->empty()) { - const auto unit = out->front(); - send_unit(&HandlerPointer, unit); - out->pop(); - } + LOG("[TERM PHASE] Emptied queues. Closing connection"); HandlerPointer.close(); LOG("[TERM PHASE] Terminating thread server_connection_handler"); @@ -123,9 +47,9 @@ void MTCLBackend::server_connection_handler(MTCL::HandleUser HandlerPointer, } } -void MTCLBackend::incoming_connection_listener( +void MTCLBackend::incomingConnectionListener( const bool *continue_execution, int sleep_time, - std::unordered_map *open_connections, std::mutex *guard, + std::unordered_map> *open_connections, std::mutex *guard, std::vector *_connection_threads, bool *terminate) { char ownHostname[HOST_NAME_MAX] = {0}; @@ -150,14 +74,11 @@ void MTCLBackend::incoming_connection_listener( const std::lock_guard lock(*guard); - open_connections->insert( - {connected_hostname, - std::make_tuple(new std::queue(), new std::queue(), - new std::mutex())}); + open_connections->insert({connected_hostname, {}}); _connection_threads->push_back(new std::thread( - server_connection_handler, std::move(UserManager), connected_hostname, sleep_time, - open_connections->at(connected_hostname), terminate, FROM_REMOTE)); + serverConnectionHandler, std::move(UserManager), connected_hostname, + &open_connections->at(connected_hostname), sleep_time, terminate, FROM_REMOTE)); } } @@ -173,7 +94,7 @@ void MTCLBackend::connect_to(std::string hostname_port) { return; } - if (connected_hostnames_map.find(remoteToken) != connected_hostnames_map.end()) { + if (open_connections.contains(remoteHost)) { LOG("Remote host %s is already connected", remoteHost.c_str()); return; } @@ -185,13 +106,11 @@ void MTCLBackend::connect_to(std::string hostname_port) { UserManager.send(ownHostname, HOST_NAME_MAX); const std::lock_guard lg(*_guard); - auto connection_tuple = std::make_tuple( - new std::queue(), new std::queue(), new std::mutex()); - connected_hostnames_map.insert({remoteHost, connection_tuple}); + open_connections.insert({remoteHost, {}}); - connection_threads.push_back( - new std::thread(server_connection_handler, std::move(UserManager), remoteHost.c_str(), - thread_sleep_times, connection_tuple, terminate, TO_REMOTE)); + connection_threads.push_back(new std::thread( + serverConnectionHandler, std::move(UserManager), remoteHost.c_str(), + &open_connections.at(remoteHost), thread_sleep_times, terminate, TO_REMOTE)); } else { server_println(CAPIO_SERVER_CLI_LOG_SERVER_WARNING, "Warning: tried to connect to " + std::string(remoteHost) + @@ -220,8 +139,8 @@ MTCLBackend::MTCLBackend(const std::string &proto, const std::string &port, int MTCL::Manager::listen(selfToken); - th = new std::thread(incoming_connection_listener, std::ref(continue_execution), sleep_time, - &connected_hostnames_map, _guard, &connection_threads, terminate); + th = new std::thread(incomingConnectionListener, std::ref(continue_execution), sleep_time, + &open_connections, _guard, &connection_threads, terminate); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "MTCL_backend initialization completed."); } @@ -248,71 +167,30 @@ MTCLBackend::~MTCLBackend() { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "MTCL_backend cleanup completed."); } -std::string MTCLBackend::receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) { - START_LOG(gettid(), "call()"); +std::vector MTCLBackend::get_open_connections() { + std::vector keys; + keys.reserve(open_connections.size()); // avoid reallocations - std::queue *inQueue = nullptr; - TransportUnitInterface interface; - bool found = false; - while (!found) { - for (auto [hostname, data] : connected_hostnames_map) { - inQueue = std::get<0>(data); - interface = data; - found = !inQueue->empty(); - LOG("Hostname %s, %s incoming data", hostname.c_str(), found ? "has" : "has not"); - } - if (!found) { - LOG("No incoming messages. Putting thread to sleep"); - std::this_thread::sleep_for(std::chrono::milliseconds(thread_sleep_times)); - } + for (const auto &pair : open_connections) { + keys.push_back(pair.first); } - LOG("Found incoming message"); - const std::lock_guard lg(*std::get<2>(interface)); - auto inputUnit = inQueue->front(); - *buf_size = inputUnit->_buffer_size; - *start_offset = inputUnit->_start_write_offset; - memcpy(buf, inputUnit->_bytes, *buf_size); - LOG("Received buffer: %s", inputUnit->_bytes); - inQueue->pop(); - - std::string filename(inputUnit->_filepath); - - return filename; -} -void MTCLBackend::send(const std::string &target, char *buf, uint64_t buf_size, - const std::string &filepath, const capio_off64_t start_offset) { - START_LOG(gettid(), "call(target=%s, buf_size=%ld, file_path=%s, start_offset=%ld, buf=%s)", - target.c_str(), buf_size, filepath.c_str(), start_offset, buf); - - if (const auto element = connected_hostnames_map.find(target); - element != connected_hostnames_map.end()) { - LOG("Found alive connection for given target"); - - const auto interface = element->second; - auto *out = std::get<1>(interface); - - const auto outputUnit = new TransportUnit(); - outputUnit->_buffer_size = buf_size; - outputUnit->_filepath = filepath; - outputUnit->_start_write_offset = start_offset; - outputUnit->_bytes = new char[buf_size]; - memcpy(outputUnit->_bytes, buf, buf_size); - LOG("Copied buffer: %s", outputUnit->_bytes); - - const std::lock_guard lg(*std::get<2>(interface)); - LOG("Pushing Transport unit to out queue"); - out->push(outputUnit); - } else { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, "can't find target"); - } + return keys; } -std::vector MTCLBackend::get_open_connections() { - std::vector connections; +size_t MTCLBackend::fetchFromRemoteHost(const std::string &hostname, + const std::filesystem::path &filepath, char *buffer, + capio_off64_t offset, capio_off64_t count) { + START_LOG(gettid(), "call(hostname=%s, path=%s, offset=%ld, count=%ld)", hostname.c_str(), + filepath.c_str(), offset, count); - for (const auto &[hostname, _] : connected_hostnames_map) { - connections.push_back(hostname); - } - return connections; + char REQUEST[CAPIO_REQ_MAX_SIZE]; + + sprintf(REQUEST, "%d %s %llu %llu", FETCH_FROM_REMOTE, filepath.c_str(), offset, count); + open_connections.at(hostname).emplace(REQUEST); + + + + + return 0; } From 7bfbc7029dc2f828558924334da62f7e19cfa632 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 3 Sep 2025 09:51:52 +0000 Subject: [PATCH 04/16] Work in progress on backend fetcFromRemote --- .../data-plane/backend_interface.hpp | 60 +++++++++++++++++++ .../data-plane/mtcl_backend.hpp | 15 ++--- .../storage-service/capio_storage_service.hpp | 4 ++ .../data-plane/mtcl_backend.cpp | 25 +++++--- .../storage-service/capio_storage_service.cpp | 11 +++- 5 files changed, 98 insertions(+), 17 deletions(-) diff --git a/capio-server/include/communication-service/data-plane/backend_interface.hpp b/capio-server/include/communication-service/data-plane/backend_interface.hpp index 0d16f9775..2a492f928 100644 --- a/capio-server/include/communication-service/data-plane/backend_interface.hpp +++ b/capio-server/include/communication-service/data-plane/backend_interface.hpp @@ -4,10 +4,70 @@ #include "capio/constants.hpp" #include #include +#include +#include #include #include +#include #include +class MessageQueue { + + class ResponseToRequest { + public: + std::string original_request; + char *response; + capio_off64_t response_size; + + ResponseToRequest(std::string request, char *buf, capio_off64_t buff_size) + : original_request(std::move(request)), response(buf), response_size(buff_size) {} + }; + + std::queue request_queue; + std::queue response_queue; + std::mutex request_queue_mutex, response_queue_mutex; + + public: + void push_request(const std::string &request) { + std::lock_guard lg(request_queue_mutex); + request_queue.emplace(request); + } + + void push_response(char *buffer, capio_off64_t buff_size, std::string origin) { + std::lock_guard lg(response_queue_mutex); + response_queue.emplace(std::move(origin), buffer, buff_size); + } + + std::string get_request() { + std::lock_guard lg(request_queue_mutex); + std::string req = std::move(request_queue.front()); + request_queue.pop(); + return req; + } + + std::tuple get_response() { + timespec sleep{.tv_sec = 0, .tv_nsec = 300}; + while (!this->has_response()) { + nanosleep(&sleep, nullptr); + } + + std::lock_guard lg(response_queue_mutex); + auto response = std::move(response_queue.front()); + response_queue.pop(); + return std::make_tuple(response.response_size, response.response); + } + + bool has_requests() { + std::lock_guard lg(request_queue_mutex); + return !request_queue.empty(); + } + + bool has_response() { + std::lock_guard lg(response_queue_mutex); + return !response_queue.empty(); + } +}; + class NotImplementedBackendMethod : public std::exception { public: [[nodiscard]] const char *what() const noexcept override { diff --git a/capio-server/include/communication-service/data-plane/mtcl_backend.hpp b/capio-server/include/communication-service/data-plane/mtcl_backend.hpp index efb76d8ea..f14e8c0b0 100644 --- a/capio-server/include/communication-service/data-plane/mtcl_backend.hpp +++ b/capio-server/include/communication-service/data-plane/mtcl_backend.hpp @@ -18,7 +18,7 @@ class MTCLBackend : public BackendInterface { typedef enum { FROM_REMOTE, TO_REMOTE } CONN_HANDLER_ORIGIN; std::string selfToken, connectedHostname, ownPort, usedProtocol; - std::unordered_map> open_connections; + std::unordered_map open_connections; char ownHostname[HOST_NAME_MAX] = {0}; int thread_sleep_times = 0; bool *continue_execution = new bool; @@ -32,14 +32,16 @@ class MTCLBackend : public BackendInterface { * @param HandlerPointer * @param remote_hostname * @param outbound_messages + * @param inbound_messages queue of inbound results for outbound message, with pair of buffer + * ond buffer size * @param sleep_time * @param terminate * @param source */ void static serverConnectionHandler(MTCL::HandleUser HandlerPointer, - const std::string &remote_hostname, - std::queue *outbound_messages, int sleep_time, - const bool *terminate, CONN_HANDLER_ORIGIN source); + const std::string &remote_hostname, MessageQueue *queue, + int sleep_time, const bool *terminate, + CONN_HANDLER_ORIGIN source); /** * Waits for incoming new requests to connect to new server instances. When a new request @@ -55,9 +57,8 @@ class MTCLBackend : public BackendInterface { */ void static incomingConnectionListener( const bool *continue_execution, int sleep_time, - - std::unordered_map> *open_connections, - std::mutex *guard, std::vector *_connection_threads, bool *terminate); + std::unordered_map *open_connections, std::mutex *guard, + std::vector *_connection_threads, bool *terminate); public: explicit MTCLBackend(const std::string &proto, const std::string &port, int sleep_time); diff --git a/capio-server/include/storage-service/capio_storage_service.hpp b/capio-server/include/storage-service/capio_storage_service.hpp index 1e024e35f..17bffa0fb 100644 --- a/capio-server/include/storage-service/capio_storage_service.hpp +++ b/capio-server/include/storage-service/capio_storage_service.hpp @@ -110,6 +110,10 @@ class CapioStorageService { [[nodiscard]] size_t sendFilesToStoreInMemory(long pid) const; void remove_client(pid_t pid) const; + + + void storeData(const std::filesystem::path &path, capio_off64_t offset, capio_off64_t buff_size, + const char *buffer) const; }; inline CapioStorageService *storage_service; diff --git a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp index d8f257708..b61629db2 100644 --- a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp +++ b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp @@ -1,3 +1,5 @@ +#include "include/storage-service/capio_storage_service.hpp" + #include #include #include @@ -8,8 +10,7 @@ * This thread will handle connections towards a single target. */ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, - const std::string &remote_hostname, - std::queue *outbound_messages, + const std::string &remote_hostname, MessageQueue *queue, const int sleep_time, const bool *terminate, const CONN_HANDLER_ORIGIN source) { START_LOG(gettid(), "call(remote_hostname=%s, kind=%s)", remote_hostname.c_str(), @@ -19,6 +20,8 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, // semaphores. constexpr int max_net_op = 10; + // TODO: implement send and recive! + // Send phase for (int completed_io_operations = 0; completed_io_operations < max_net_op; ++completed_io_operations) { @@ -49,7 +52,7 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, void MTCLBackend::incomingConnectionListener( const bool *continue_execution, int sleep_time, - std::unordered_map> *open_connections, std::mutex *guard, + std::unordered_map *open_connections, std::mutex *guard, std::vector *_connection_threads, bool *terminate) { char ownHostname[HOST_NAME_MAX] = {0}; @@ -78,7 +81,7 @@ void MTCLBackend::incomingConnectionListener( _connection_threads->push_back(new std::thread( serverConnectionHandler, std::move(UserManager), connected_hostname, - &open_connections->at(connected_hostname), sleep_time, terminate, FROM_REMOTE)); + open_connections->at(connected_hostname), sleep_time, terminate, FROM_REMOTE)); } } @@ -108,9 +111,11 @@ void MTCLBackend::connect_to(std::string hostname_port) { open_connections.insert({remoteHost, {}}); - connection_threads.push_back(new std::thread( - serverConnectionHandler, std::move(UserManager), remoteHost.c_str(), - &open_connections.at(remoteHost), thread_sleep_times, terminate, TO_REMOTE)); + auto queue_elem = open_connections.at(remoteHost); + + connection_threads.push_back(new std::thread(serverConnectionHandler, + std::move(UserManager), remoteHost, queue_elem, + thread_sleep_times, terminate, TO_REMOTE)); } else { server_println(CAPIO_SERVER_CLI_LOG_SERVER_WARNING, "Warning: tried to connect to " + std::string(remoteHost) + @@ -181,16 +186,20 @@ std::vector MTCLBackend::get_open_connections() { size_t MTCLBackend::fetchFromRemoteHost(const std::string &hostname, const std::filesystem::path &filepath, char *buffer, capio_off64_t offset, capio_off64_t count) { + START_LOG(gettid(), "call(hostname=%s, path=%s, offset=%ld, count=%ld)", hostname.c_str(), filepath.c_str(), offset, count); char REQUEST[CAPIO_REQ_MAX_SIZE]; sprintf(REQUEST, "%d %s %llu %llu", FETCH_FROM_REMOTE, filepath.c_str(), offset, count); - open_connections.at(hostname).emplace(REQUEST); + auto queues = open_connections.at(hostname); + queues->push_request(REQUEST); + auto [buff_size, response_buffer] = queues->get_response(); + storage_service->storeData(filepath, offset, buff_size, response_buffer); return 0; } diff --git a/capio-server/src/storage-service/capio_storage_service.cpp b/capio-server/src/storage-service/capio_storage_service.cpp index c17ef7df3..1e623a5d2 100644 --- a/capio-server/src/storage-service/capio_storage_service.cpp +++ b/capio-server/src/storage-service/capio_storage_service.cpp @@ -101,8 +101,8 @@ void CapioStorageService::register_client(const std::string &app_name, const pid LOG("Created communication queues"); } -size_t CapioStorageService::reply_to_client(pid_t pid, const std::string &file, capio_off64_t offset, - capio_off64_t size) const { +size_t CapioStorageService::reply_to_client(pid_t pid, const std::string &file, + capio_off64_t offset, capio_off64_t size) const { START_LOG(gettid(), "call(pid=%llu, file=%s, offset=%llu, size=%llu)", pid, file.c_str(), offset, size); @@ -151,3 +151,10 @@ size_t CapioStorageService::sendFilesToStoreInMemory(const long pid) const { LOG("Return value=%llu", files_to_store_in_mem.size()); return files_to_store_in_mem.size(); } + +void CapioStorageService::storeData(const std::filesystem::path &path, const capio_off64_t offset, + const capio_off64_t buff_size, const char *buffer) const { + const auto file = getFile(path); + + file->writeData(buffer, offset, buff_size); +} From 5190e24de59e6b67f645a63fe1c6839255265ff9 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 3 Sep 2025 16:32:06 +0200 Subject: [PATCH 05/16] Began work on handling requests --- .../data-plane/mtcl_backend.cpp | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp index b61629db2..09410a11a 100644 --- a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp +++ b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp @@ -1,4 +1,6 @@ +#include "../../../../cmake-build-release/_deps/mtcl-src/include/protocols/shm.hpp" #include "include/storage-service/capio_storage_service.hpp" +#include "protocols/mpip2p.hpp" #include #include @@ -20,24 +22,42 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, // semaphores. constexpr int max_net_op = 10; - // TODO: implement send and recive! + /* + * TODO: this code works for this reason: most of the time, it happens that data flows in + * one way. as such it is very unlikely that request between two nodes handled by this + * function to arrive at the same time. when this happens, we would need to reorder + * messages, but as data flows in one direction we probably could avoid to fix this + * issue as of right now. + */ // Send phase - for (int completed_io_operations = 0; completed_io_operations < max_net_op; - ++completed_io_operations) { - // TODO: send incoming request and then retrive result... + if (queue->has_requests()) { + // Send of request + auto request = queue->get_request(); + HandlerPointer.send(request.c_str(), request.length()); + + // Retrive size of response + capio_off64_t response_size; + HandlerPointer.receive(&response_size, sizeof(response_size)); + char *response_buffer = new char[response_size]; + HandlerPointer.receive(response_buffer, response_size); + + // push response back to the source + queue->push_response(response_buffer, response_size, request); } // Receive phase size_t receive_size = 0, completed_io_operations = 0; HandlerPointer.probe(receive_size, false); - while (completed_io_operations < max_net_op && receive_size > 0) { - completed_io_operations++; + while (receive_size > 0) { + char incoming_request[CAPIO_REQ_MAX_SIZE]; + HandlerPointer.receive(incoming_request, receive_size); + + //TODO: handle request } // terminate phase if (*terminate) { - LOG("[TERM PHASE] Locked access send and receive queues"); LOG("[TERM PHASE] Emptied queues. Closing connection"); From d6f5f52952f1b91d5eb88ea391bdda931baba523 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 11 Sep 2025 11:22:31 +0200 Subject: [PATCH 06/16] Bagain implementation of communication protocol --- .../data-plane/backend_interface.hpp | 2 +- .../data-plane/mtcl_backend.cpp | 116 +++++++++++------- 2 files changed, 76 insertions(+), 42 deletions(-) diff --git a/capio-server/include/communication-service/data-plane/backend_interface.hpp b/capio-server/include/communication-service/data-plane/backend_interface.hpp index 2a492f928..c7a2f36b8 100644 --- a/capio-server/include/communication-service/data-plane/backend_interface.hpp +++ b/capio-server/include/communication-service/data-plane/backend_interface.hpp @@ -79,7 +79,7 @@ class NotImplementedBackendMethod : public std::exception { class BackendInterface { protected: - typedef enum { FETCH_FROM_REMOTE } BackendRequest_t; + typedef enum { HAVE_FINISH_SEND_REQUEST, FETCH_FROM_REMOTE } BackendRequest_t; public: virtual ~BackendInterface() = default; diff --git a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp index 09410a11a..beb1ef83c 100644 --- a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp +++ b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp @@ -1,6 +1,4 @@ -#include "../../../../cmake-build-release/_deps/mtcl-src/include/protocols/shm.hpp" #include "include/storage-service/capio_storage_service.hpp" -#include "protocols/mpip2p.hpp" #include #include @@ -15,57 +13,93 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, const std::string &remote_hostname, MessageQueue *queue, const int sleep_time, const bool *terminate, const CONN_HANDLER_ORIGIN source) { + constexpr int max_net_op = 10; + /* + * Algorithm works in this way. At the beginning, the role of the node that starts to send is + * chosen as the smaller lexicographically between the two hostnames involved in the + * communication. Then two phases of sending and receiving up to max_net_op are performed. + * The two nodes switches phases after a final synchronization with the special request + * HAVE_FINISH_SEND_REQUEST. + * When the sender sends this request, either because it has reached the max_net_op or because + * there are no more messages to be sent, the two nodes switches roles. This continues until the + * remote handle pointer is valid + */ + + char ownHostname[HOST_NAME_MAX]; + gethostname(ownHostname, HOST_NAME_MAX); + bool my_turn_to_send = ownHostname > remote_hostname; + + char request_has_finished_to_send[CAPIO_REQ_MAX_SIZE]{0}; + sprintf(request_has_finished_to_send, "%03d", HAVE_FINISH_SEND_REQUEST); + START_LOG(gettid(), "call(remote_hostname=%s, kind=%s)", remote_hostname.c_str(), source == FROM_REMOTE ? "from remote server" : "to remote server"); - while (HandlerPointer.isValid()) { - // execute up to N operation of send &/or receive, to avoid starvation due to - // semaphores. - constexpr int max_net_op = 10; - - /* - * TODO: this code works for this reason: most of the time, it happens that data flows in - * one way. as such it is very unlikely that request between two nodes handled by this - * function to arrive at the same time. when this happens, we would need to reorder - * messages, but as data flows in one direction we probably could avoid to fix this - * issue as of right now. - */ - - // Send phase - if (queue->has_requests()) { - // Send of request - auto request = queue->get_request(); - HandlerPointer.send(request.c_str(), request.length()); - - // Retrive size of response - capio_off64_t response_size; - HandlerPointer.receive(&response_size, sizeof(response_size)); - char *response_buffer = new char[response_size]; - HandlerPointer.receive(response_buffer, response_size); - - // push response back to the source - queue->push_response(response_buffer, response_size, request); - } - // Receive phase - size_t receive_size = 0, completed_io_operations = 0; - HandlerPointer.probe(receive_size, false); - while (receive_size > 0) { - char incoming_request[CAPIO_REQ_MAX_SIZE]; - HandlerPointer.receive(incoming_request, receive_size); + LOG("Will begin execution with %s phase", my_turn_to_send ? "sending" : "receiving"); - //TODO: handle request + while (HandlerPointer.isValid()) { + // execute up to N operation of send &/or receive, to avoid starvation + + if (my_turn_to_send) { + for (int i = 0; i < max_net_op && queue->has_requests(); i++) { + // Send of request + auto request = queue->get_request(); + HandlerPointer.send(request.c_str(), request.length()); + + // Retrive size of response + capio_off64_t response_size; + HandlerPointer.receive(&response_size, sizeof(response_size)); + char *response_buffer = new char[response_size]; + HandlerPointer.receive(response_buffer, response_size); + + // push response back to the source + queue->push_response(response_buffer, response_size, request); + } + + // Send message I have finished + HandlerPointer.send(request_has_finished_to_send, sizeof(request_has_finished_to_send)); + + } else { + for (int i = 0; i < max_net_op; i++) { + // Receive phase + size_t receive_size = 0; + HandlerPointer.probe(receive_size, false); + while (receive_size > 0) { + BackendRequest_t requestCode; + char incoming_request[CAPIO_REQ_MAX_SIZE]; + HandlerPointer.receive(incoming_request, receive_size); + sscanf(incoming_request, "%d", &requestCode); + + switch (requestCode) { + case HAVE_FINISH_SEND_REQUEST: { + // Finished sending data. Set i to be greater than max_net_op to go to next + // phase + i = max_net_op; + break; + } + + case FETCH_FROM_REMOTE: { + // Scan request fetch from remote + + break; + } + + default: + break; + } + } + } } // terminate phase if (*terminate) { - LOG("[TERM PHASE] Locked access send and receive queues"); - - LOG("[TERM PHASE] Emptied queues. Closing connection"); + LOG("[TERM PHASE] Closing connection"); HandlerPointer.close(); LOG("[TERM PHASE] Terminating thread server_connection_handler"); return; } + my_turn_to_send = !my_turn_to_send; std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); } } @@ -212,7 +246,7 @@ size_t MTCLBackend::fetchFromRemoteHost(const std::string &hostname, char REQUEST[CAPIO_REQ_MAX_SIZE]; - sprintf(REQUEST, "%d %s %llu %llu", FETCH_FROM_REMOTE, filepath.c_str(), offset, count); + sprintf(REQUEST, "%03d %s %llu %llu", FETCH_FROM_REMOTE, filepath.c_str(), offset, count); auto queues = open_connections.at(hostname); queues->push_request(REQUEST); From 52c0b7e2933bd94584520adaa9b0bc30fa3d8cab Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 11 Sep 2025 13:03:17 +0200 Subject: [PATCH 07/16] First version of memory backend --- .../storage-service/capio_storage_service.hpp | 12 +++++++++++- .../data-plane/mtcl_backend.cpp | 13 ++++++++++++- .../src/storage-service/capio_storage_service.cpp | 8 ++++++++ 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/capio-server/include/storage-service/capio_storage_service.hpp b/capio-server/include/storage-service/capio_storage_service.hpp index 17bffa0fb..5ded4916c 100644 --- a/capio-server/include/storage-service/capio_storage_service.hpp +++ b/capio-server/include/storage-service/capio_storage_service.hpp @@ -111,9 +111,19 @@ class CapioStorageService { void remove_client(pid_t pid) const; - void storeData(const std::filesystem::path &path, capio_off64_t offset, capio_off64_t buff_size, const char *buffer) const; + + /** + * Read data from file and store it inside buffer + * @param filepath filepath + * @param offset starting read offset + * @param buffer targeted read buffer + * @param count Requested read size + * @return Actual size of read + */ + size_t readFromFileToBuffer(const std::filesystem::path &filepath, capio_off64_t offset, + char *buffer, capio_off64_t count) const; }; inline CapioStorageService *storage_service; diff --git a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp index beb1ef83c..b2463afc1 100644 --- a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp +++ b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp @@ -1,3 +1,4 @@ +#include "include/file-manager/file_manager.hpp" #include "include/storage-service/capio_storage_service.hpp" #include @@ -65,7 +66,7 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, size_t receive_size = 0; HandlerPointer.probe(receive_size, false); while (receive_size > 0) { - BackendRequest_t requestCode; + int requestCode; char incoming_request[CAPIO_REQ_MAX_SIZE]; HandlerPointer.receive(incoming_request, receive_size); sscanf(incoming_request, "%d", &requestCode); @@ -80,7 +81,17 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, case FETCH_FROM_REMOTE: { // Scan request fetch from remote + char filepath[PATH_MAX]; + capio_off64_t offset, count; + sscanf(incoming_request, "%s %llu %llu", filepath, &offset, &count); + + auto buffer = new char[count]; + auto read_size = + storage_service->readFromFileToBuffer(filepath, offset, buffer, count); + HandlerPointer.send(&read_size, sizeof(read_size)); + HandlerPointer.send(buffer, read_size); + delete[] buffer; break; } diff --git a/capio-server/src/storage-service/capio_storage_service.cpp b/capio-server/src/storage-service/capio_storage_service.cpp index 1e623a5d2..531d8a928 100644 --- a/capio-server/src/storage-service/capio_storage_service.cpp +++ b/capio-server/src/storage-service/capio_storage_service.cpp @@ -158,3 +158,11 @@ void CapioStorageService::storeData(const std::filesystem::path &path, const cap file->writeData(buffer, offset, buff_size); } + +size_t CapioStorageService::readFromFileToBuffer(const std::filesystem::path &filepath, + capio_off64_t offset, char *buffer, + capio_off64_t count) const { + const auto file = this->getFile(filepath); + + return file->readData(buffer, offset, count); +} From 18c419051019371d73288d224cf512d3082ed592 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 11 Sep 2025 13:36:18 +0200 Subject: [PATCH 08/16] Implemented one function in CapioRemoteFile. other functions throws exceptions --- .../src/storage-service/capio_remote_file.cpp | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/capio-server/src/storage-service/capio_remote_file.cpp b/capio-server/src/storage-service/capio_remote_file.cpp index 558816b4a..2cdd14f7f 100644 --- a/capio-server/src/storage-service/capio_remote_file.cpp +++ b/capio-server/src/storage-service/capio_remote_file.cpp @@ -11,17 +11,28 @@ CapioRemoteFile::~CapioRemoteFile() {} std::size_t CapioRemoteFile::writeData(const char *buffer, const std::size_t file_offset, std::size_t buffer_length) { - return 0; + throw std::runtime_error("Not implemented: writeData"); } std::size_t CapioRemoteFile::readData(char *buffer, std::size_t file_offset, std::size_t buffer_size) { - return 0; + throw std::runtime_error("Not implemented: readData"); } -void CapioRemoteFile::readFromQueue(SPSCQueue &queue, std::size_t offset, std::size_t length) {} +void CapioRemoteFile::readFromQueue(SPSCQueue &queue, std::size_t offset, std::size_t length) { + throw std::runtime_error("Not implemented: readFromQueue"); +} std::size_t CapioRemoteFile::writeToQueue(SPSCQueue &queue, std::size_t offset, std::size_t length) const { - return 0; + auto buffer = new char[length]; + + auto buffer_size = + capio_backend->fetchFromRemoteHost(this->fileName, this->homeNode, buffer, offset, length); + + queue.write(buffer, buffer_size); + + delete[] buffer; + + return buffer_size; } From 402caa9cee3e33861c62469065e25cbe9a65da0c Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 17 Sep 2025 15:43:40 +0200 Subject: [PATCH 09/16] Logs --- .../include/capio-cl-engine/capio_cl_engine.hpp | 2 +- .../data-plane/backend_interface.hpp | 4 ++++ capio-server/include/storage-service/capio_file.hpp | 4 ++++ capio-server/src/capio-cl-engine/capio_cl_engine.cpp | 2 +- capio-server/src/client-manager/handlers/read.cpp | 2 +- .../src/storage-service/capio_remote_file.cpp | 2 ++ .../src/storage-service/capio_storage_service.cpp | 11 +++++++++++ 7 files changed, 24 insertions(+), 3 deletions(-) diff --git a/capio-server/include/capio-cl-engine/capio_cl_engine.hpp b/capio-server/include/capio-cl-engine/capio_cl_engine.hpp index 6337c5dde..6a49e798d 100644 --- a/capio-server/include/capio-cl-engine/capio_cl_engine.hpp +++ b/capio-server/include/capio-cl-engine/capio_cl_engine.hpp @@ -84,7 +84,7 @@ class CapioCLEngine { void setStoreFileInFileSystem(const std::filesystem::path &path); bool storeFileInMemory(const std::filesystem::path &path); std::vector getFileToStoreInMemory(); - auto get_home_node(const std::string &path); + std::string get_home_node(const std::string &path); }; inline CapioCLEngine *capio_cl_engine; diff --git a/capio-server/include/communication-service/data-plane/backend_interface.hpp b/capio-server/include/communication-service/data-plane/backend_interface.hpp index c7a2f36b8..66826cd99 100644 --- a/capio-server/include/communication-service/data-plane/backend_interface.hpp +++ b/capio-server/include/communication-service/data-plane/backend_interface.hpp @@ -46,13 +46,17 @@ class MessageQueue { } std::tuple get_response() { + START_LOG(gettid(), "call()"); timespec sleep{.tv_sec = 0, .tv_nsec = 300}; while (!this->has_response()) { nanosleep(&sleep, nullptr); } + LOG("Got a response!"); std::lock_guard lg(response_queue_mutex); auto response = std::move(response_queue.front()); + LOG("Response to query %s has size %ld", response.original_request.c_str(), + response.response_size); response_queue.pop(); return std::make_tuple(response.response_size, response.response); } diff --git a/capio-server/include/storage-service/capio_file.hpp b/capio-server/include/storage-service/capio_file.hpp index 3e66ba4b1..0d4a9ca51 100644 --- a/capio-server/include/storage-service/capio_file.hpp +++ b/capio-server/include/storage-service/capio_file.hpp @@ -19,6 +19,8 @@ class CapioFile { : fileName(filePath), homeNode(home_node), totalSize(0) {}; virtual ~CapioFile() = default; + virtual bool is_remote() { return false; } + [[nodiscard]] std::size_t getSize() const { START_LOG(gettid(), "call()"); return totalSize; @@ -137,6 +139,8 @@ class CapioRemoteFile : public CapioFile { ~CapioRemoteFile() override; + bool is_remote() override { return true; }; + /** * Write data to a file stored inside the memory * @param buffer buffer to store inside memory (i.e. content of the file) diff --git a/capio-server/src/capio-cl-engine/capio_cl_engine.cpp b/capio-server/src/capio-cl-engine/capio_cl_engine.cpp index 4cb60859e..011318439 100644 --- a/capio-server/src/capio-cl-engine/capio_cl_engine.cpp +++ b/capio-server/src/capio-cl-engine/capio_cl_engine.cpp @@ -420,7 +420,7 @@ std::vector CapioCLEngine::getFileToStoreInMemory() { return files; } -auto CapioCLEngine::get_home_node(const std::string &path) { +std::string CapioCLEngine::get_home_node(const std::string &path) { // TODO: understand here how to get the home node policy. START_LOG(gettid(), "call(path=%s)", path.c_str()); if (const auto location = _locations.find(path); location == _locations.end()) { diff --git a/capio-server/src/client-manager/handlers/read.cpp b/capio-server/src/client-manager/handlers/read.cpp index 7616645a3..d38f4ffb5 100644 --- a/capio-server/src/client-manager/handlers/read.cpp +++ b/capio-server/src/client-manager/handlers/read.cpp @@ -59,7 +59,7 @@ void read_mem_handler(const char *const str) { use_cache ? "true" : "false", path); capio_off64_t size_to_send = std::min({client_cache_line_size, read_size}); - LOG("Will try to send up to %ld bytes", size_to_send); + LOG("Will try to send to client up to %ld bytes", size_to_send); auto size_sent = storage_service->reply_to_client(tid, path, read_begin_offset, size_to_send); LOG("Sending to posix app the offset up to which read."); diff --git a/capio-server/src/storage-service/capio_remote_file.cpp b/capio-server/src/storage-service/capio_remote_file.cpp index 2cdd14f7f..b775488cc 100644 --- a/capio-server/src/storage-service/capio_remote_file.cpp +++ b/capio-server/src/storage-service/capio_remote_file.cpp @@ -25,6 +25,8 @@ void CapioRemoteFile::readFromQueue(SPSCQueue &queue, std::size_t offset, std::s std::size_t CapioRemoteFile::writeToQueue(SPSCQueue &queue, std::size_t offset, std::size_t length) const { + + START_LOG(gettid(), "call(offset=%ld,count=%ld,path=%s)", offset, length, fileName.c_str()); auto buffer = new char[length]; auto buffer_size = diff --git a/capio-server/src/storage-service/capio_storage_service.cpp b/capio-server/src/storage-service/capio_storage_service.cpp index 531d8a928..55d4c520f 100644 --- a/capio-server/src/storage-service/capio_storage_service.cpp +++ b/capio-server/src/storage-service/capio_storage_service.cpp @@ -7,9 +7,20 @@ #include auto CapioStorageService::getFile(const std::string &file_name) const { + START_LOG(gettid(), "getFile(file_name=%s)", file_name.c_str()); if (_stored_files->find(file_name) == _stored_files->end()) { + LOG("File not found. Creating file!"); + DBG(gettid(), [&]() { + for (auto f : *_stored_files) { + LOG("Found path in storage service %s", f.first.c_str()); + } + }); + createMemoryFile(file_name); } + auto file = _stored_files->at(file_name); + LOG("Returning %s instance with path %s", + file->is_remote() ? "CapioRemotFile" : "CapioMemoryFile", file->getFileName().c_str()); return _stored_files->at(file_name); } From 69dfb0ad03b80237285feedb03b9f2d087756be5 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 17 Sep 2025 15:55:26 +0200 Subject: [PATCH 10/16] log --- .../communication-service/data-plane/backend_interface.hpp | 3 ++- .../src/communication-service/data-plane/mtcl_backend.cpp | 6 ++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/capio-server/include/communication-service/data-plane/backend_interface.hpp b/capio-server/include/communication-service/data-plane/backend_interface.hpp index 66826cd99..9dd5df336 100644 --- a/capio-server/include/communication-service/data-plane/backend_interface.hpp +++ b/capio-server/include/communication-service/data-plane/backend_interface.hpp @@ -1,7 +1,8 @@ #ifndef BACKEND_INTERFACE_HPP #define BACKEND_INTERFACE_HPP -#include "capio/constants.hpp" +#include +#include #include #include #include diff --git a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp index b2463afc1..894c74527 100644 --- a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp +++ b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp @@ -258,12 +258,14 @@ size_t MTCLBackend::fetchFromRemoteHost(const std::string &hostname, char REQUEST[CAPIO_REQ_MAX_SIZE]; sprintf(REQUEST, "%03d %s %llu %llu", FETCH_FROM_REMOTE, filepath.c_str(), offset, count); + LOG("Sending request %s", REQUEST); auto queues = open_connections.at(hostname); + LOG("obtained access to queue"); queues->push_request(REQUEST); - + LOG("Request pushed to output queue"); auto [buff_size, response_buffer] = queues->get_response(); - + LOG("Obtained response. Buffer size of response is $ld", buff_size); storage_service->storeData(filepath, offset, buff_size, response_buffer); return 0; From 8a1227d25cb2b3aa0dbc635e4bac40ddc8a09159 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Wed, 17 Sep 2025 18:04:44 +0200 Subject: [PATCH 11/16] Bugfixes and logs --- .../include/storage-service/capio_file.hpp | 6 ++++++ .../control-plane/multicast_control_plane.cpp | 7 +++---- .../data-plane/mtcl_backend.cpp | 17 +++++++++++++---- .../src/storage-service/capio_remote_file.cpp | 4 ++-- .../storage-service/capio_storage_service.cpp | 7 +++++-- scripts/docker-development/build-and-startup.sh | 4 ++-- 6 files changed, 31 insertions(+), 14 deletions(-) diff --git a/capio-server/include/storage-service/capio_file.hpp b/capio-server/include/storage-service/capio_file.hpp index 0d4a9ca51..60521d7d0 100644 --- a/capio-server/include/storage-service/capio_file.hpp +++ b/capio-server/include/storage-service/capio_file.hpp @@ -64,6 +64,12 @@ class CapioFile { */ virtual std::size_t writeToQueue(SPSCQueue &queue, std::size_t offset, std::size_t length) const = 0; + + /** + * + * @return HomeNode of file + */ + virtual std::string getHomeNode() { return homeNode; }; }; class CapioMemoryFile : public CapioFile { diff --git a/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp b/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp index 4fd22f5c2..f9197c2f0 100644 --- a/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp +++ b/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp @@ -90,14 +90,13 @@ void MulticastControlPlane::multicast_control_plane_incoming_thread( continue; } - event_type event; + int event; char source_hostname[HOST_NAME_MAX]; char source_path[PATH_MAX]; - sscanf(incoming_msg, "%d %s %s", reinterpret_cast(&event), source_hostname, - source_path); + sscanf(incoming_msg, "%d %s %s", &event, source_hostname, source_path); - LOG("event=%d, source:%s, path=%s", event, source_path, incoming_msg); + LOG("event=%d, source=%s, path=%s", event, source_hostname, source_path); if (strcmp(capio_global_configuration->node_name, source_hostname) == 0) { continue; diff --git a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp index 894c74527..abe7fa721 100644 --- a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp +++ b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp @@ -42,19 +42,23 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, // execute up to N operation of send &/or receive, to avoid starvation if (my_turn_to_send) { + LOG("Send PHASE"); for (int i = 0; i < max_net_op && queue->has_requests(); i++) { // Send of request auto request = queue->get_request(); + LOG("Request to be sent = %s", request.c_str()); HandlerPointer.send(request.c_str(), request.length()); // Retrive size of response capio_off64_t response_size; HandlerPointer.receive(&response_size, sizeof(response_size)); + LOG("Response will have size %ld", response_size); char *response_buffer = new char[response_size]; HandlerPointer.receive(response_buffer, response_size); - + LOG("Received response"); // push response back to the source queue->push_response(response_buffer, response_size, request); + LOG("Pushed response back!"); } // Send message I have finished @@ -63,29 +67,34 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, } else { for (int i = 0; i < max_net_op; i++) { // Receive phase + LOG("Receive PHASE"); size_t receive_size = 0; HandlerPointer.probe(receive_size, false); while (receive_size > 0) { + LOG("A request is incoming"); int requestCode; char incoming_request[CAPIO_REQ_MAX_SIZE]; HandlerPointer.receive(incoming_request, receive_size); + LOG("Received request = %s", incoming_request); sscanf(incoming_request, "%d", &requestCode); - + LOG("Request code is %d", requestCode); switch (requestCode) { case HAVE_FINISH_SEND_REQUEST: { // Finished sending data. Set i to be greater than max_net_op to go to next // phase + LOG("Other has finished sending phase. swithing me from receive to send"); i = max_net_op; break; } case FETCH_FROM_REMOTE: { + LOG("Received request for data from remote server"); // Scan request fetch from remote char filepath[PATH_MAX]; capio_off64_t offset, count; sscanf(incoming_request, "%s %llu %llu", filepath, &offset, &count); - + LOG("filepath=%s, offset=%ld, count=%ld", filepath, offset, count); auto buffer = new char[count]; auto read_size = storage_service->readFromFileToBuffer(filepath, offset, buffer, count); @@ -258,7 +267,7 @@ size_t MTCLBackend::fetchFromRemoteHost(const std::string &hostname, char REQUEST[CAPIO_REQ_MAX_SIZE]; sprintf(REQUEST, "%03d %s %llu %llu", FETCH_FROM_REMOTE, filepath.c_str(), offset, count); - LOG("Sending request %s", REQUEST); + LOG("Sending request %s. Fetching queue to hostname %s", REQUEST, hostname.c_str()); auto queues = open_connections.at(hostname); LOG("obtained access to queue"); diff --git a/capio-server/src/storage-service/capio_remote_file.cpp b/capio-server/src/storage-service/capio_remote_file.cpp index b775488cc..d896d9b97 100644 --- a/capio-server/src/storage-service/capio_remote_file.cpp +++ b/capio-server/src/storage-service/capio_remote_file.cpp @@ -5,7 +5,7 @@ #include CapioRemoteFile::CapioRemoteFile(const std::string &filePath, const std::string &home_node) - : CapioFile(filePath) {} + : CapioFile(filePath, home_node) {} CapioRemoteFile::~CapioRemoteFile() {} @@ -30,7 +30,7 @@ std::size_t CapioRemoteFile::writeToQueue(SPSCQueue &queue, std::size_t offset, auto buffer = new char[length]; auto buffer_size = - capio_backend->fetchFromRemoteHost(this->fileName, this->homeNode, buffer, offset, length); + capio_backend->fetchFromRemoteHost(this->homeNode, this->fileName, buffer, offset, length); queue.write(buffer, buffer_size); diff --git a/capio-server/src/storage-service/capio_storage_service.cpp b/capio-server/src/storage-service/capio_storage_service.cpp index 55d4c520f..d7c3a1596 100644 --- a/capio-server/src/storage-service/capio_storage_service.cpp +++ b/capio-server/src/storage-service/capio_storage_service.cpp @@ -55,11 +55,14 @@ void CapioStorageService::createRemoteFile(const std::string &file_name, * by another app running under the same server instance. if it is not found, we create * the file */ - START_LOG(gettid(), "call(file_name=%s)", file_name.c_str()); - if (_stored_files->find(file_name) == _stored_files->end()) { + START_LOG(gettid(), "call(file_name=%s, home_node=%s)", file_name.c_str(), home_node.c_str()); + if (!_stored_files->contains(file_name)) { LOG("File not found. Creating a new remote file"); _stored_files->emplace(file_name, new CapioRemoteFile(file_name, home_node)); } + LOG("Created remote file at path %s with home_node %s", + _stored_files->at(file_name)->getFileName().c_str(), + _stored_files->at(file_name)->getHomeNode().c_str()); } void CapioStorageService::deleteFile(const std::string &file_name) const { diff --git a/scripts/docker-development/build-and-startup.sh b/scripts/docker-development/build-and-startup.sh index df223d16f..7c48205b9 100755 --- a/scripts/docker-development/build-and-startup.sh +++ b/scripts/docker-development/build-and-startup.sh @@ -3,5 +3,5 @@ cd ../.. docker build -t alphaunito/capio --build-arg CAPIO_LOG=ON --build-arg CMAKE_BUILD_TYPE=Debug . cd scripts/docker-development - -docker compose up +( sleep 3; docker ps )& +docker compose up \ No newline at end of file From dd24c93bebbbccaddbed07057107d66a4a5119c2 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Thu, 18 Sep 2025 13:44:45 +0200 Subject: [PATCH 12/16] logs and bugfixes --- .../data-plane/backend_interface.hpp | 68 ++++++++++--------- .../data-plane/mtcl_backend.cpp | 63 +++++++++-------- 2 files changed, 72 insertions(+), 59 deletions(-) diff --git a/capio-server/include/communication-service/data-plane/backend_interface.hpp b/capio-server/include/communication-service/data-plane/backend_interface.hpp index 9dd5df336..c9cb386d8 100644 --- a/capio-server/include/communication-service/data-plane/backend_interface.hpp +++ b/capio-server/include/communication-service/data-plane/backend_interface.hpp @@ -3,9 +3,11 @@ #include #include +#include #include #include #include +#include #include #include #include @@ -13,7 +15,6 @@ #include class MessageQueue { - class ResponseToRequest { public: std::string original_request; @@ -26,50 +27,55 @@ class MessageQueue { std::queue request_queue; std::queue response_queue; - std::mutex request_queue_mutex, response_queue_mutex; + + std::mutex request_mutex; + std::mutex response_mutex; + + std::condition_variable request_cv; + std::condition_variable response_cv; public: - void push_request(const std::string &request) { - std::lock_guard lg(request_queue_mutex); - request_queue.emplace(request); - } - void push_response(char *buffer, capio_off64_t buff_size, std::string origin) { - std::lock_guard lg(response_queue_mutex); - response_queue.emplace(std::move(origin), buffer, buff_size); + void push_request(const std::string &request) { + START_LOG(gettid(), "call(req=%s)", request.c_str()); + { + std::lock_guard lg(request_mutex); + LOG("Obtained lock"); + request_queue.emplace(request); + } + request_cv.notify_one(); } - std::string get_request() { - std::lock_guard lg(request_queue_mutex); + std::optional try_get_request() { + START_LOG(gettid(), "call()"); + std::lock_guard lg(request_mutex); + LOG("Obtained lock"); + if (request_queue.empty()) { + return std::nullopt; + } std::string req = std::move(request_queue.front()); request_queue.pop(); return req; } - std::tuple get_response() { - START_LOG(gettid(), "call()"); - timespec sleep{.tv_sec = 0, .tv_nsec = 300}; - while (!this->has_response()) { - nanosleep(&sleep, nullptr); + void push_response(char *buffer, capio_off64_t buff_size, std::string origin) { + START_LOG(gettid(), "call(origin=%s, buff_size=%ld)", origin.c_str(), buff_size); + { + std::lock_guard lg(response_mutex); + LOG("Obtained lock"); + response_queue.emplace(std::move(origin), buffer, buff_size); } + response_cv.notify_one(); + } - LOG("Got a response!"); - std::lock_guard lg(response_queue_mutex); + std::tuple get_response() { + START_LOG(gettid(), "call()"); + std::unique_lock lk(response_mutex); + response_cv.wait(lk, [this] { return !response_queue.empty(); }); + LOG("Obtained lock"); auto response = std::move(response_queue.front()); - LOG("Response to query %s has size %ld", response.original_request.c_str(), - response.response_size); response_queue.pop(); - return std::make_tuple(response.response_size, response.response); - } - - bool has_requests() { - std::lock_guard lg(request_queue_mutex); - return !request_queue.empty(); - } - - bool has_response() { - std::lock_guard lg(response_queue_mutex); - return !response_queue.empty(); + return {response.response_size, response.response}; } }; diff --git a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp index abe7fa721..a9c2da005 100644 --- a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp +++ b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp @@ -43,34 +43,40 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, if (my_turn_to_send) { LOG("Send PHASE"); - for (int i = 0; i < max_net_op && queue->has_requests(); i++) { + for (int i = 0; i < max_net_op; i++) { // Send of request - auto request = queue->get_request(); - LOG("Request to be sent = %s", request.c_str()); - HandlerPointer.send(request.c_str(), request.length()); - - // Retrive size of response - capio_off64_t response_size; - HandlerPointer.receive(&response_size, sizeof(response_size)); - LOG("Response will have size %ld", response_size); - char *response_buffer = new char[response_size]; - HandlerPointer.receive(response_buffer, response_size); - LOG("Received response"); - // push response back to the source - queue->push_response(response_buffer, response_size, request); - LOG("Pushed response back!"); + LOG("PIPPO"); + const auto request_opt = queue->try_get_request(); + if (request_opt.has_value()) { + const auto &request = request_opt.value(); + LOG("Request to be sent = %s", request.c_str()); + HandlerPointer.send(request.c_str(), request.length()); + + // Retrieve size of response + capio_off64_t response_size; + HandlerPointer.receive(&response_size, sizeof(response_size)); + LOG("Response will have size %ld", response_size); + char *response_buffer = new char[response_size]; + HandlerPointer.receive(response_buffer, response_size); + LOG("Received response"); + // push response back to the source + queue->push_response(response_buffer, response_size, request); + LOG("Pushed response back!"); + } } - + LOG("Completed SEND PHASE"); // Send message I have finished HandlerPointer.send(request_has_finished_to_send, sizeof(request_has_finished_to_send)); } else { - for (int i = 0; i < max_net_op; i++) { + + bool continue_receive_phase = true; + size_t receive_size = 0; + LOG("Receive PHASE"); + while (continue_receive_phase) { // Receive phase - LOG("Receive PHASE"); - size_t receive_size = 0; HandlerPointer.probe(receive_size, false); - while (receive_size > 0) { + if (receive_size > 0) { LOG("A request is incoming"); int requestCode; char incoming_request[CAPIO_REQ_MAX_SIZE]; @@ -80,10 +86,10 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, LOG("Request code is %d", requestCode); switch (requestCode) { case HAVE_FINISH_SEND_REQUEST: { - // Finished sending data. Set i to be greater than max_net_op to go to next - // phase - LOG("Other has finished sending phase. swithing me from receive to send"); - i = max_net_op; + // Finished sending data. Set continue_receive_phase to + // false to go to next phase + LOG("Other has finished sending phase. Switching me from receive to send"); + continue_receive_phase = false; break; } @@ -95,7 +101,7 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, sscanf(incoming_request, "%s %llu %llu", filepath, &offset, &count); LOG("filepath=%s, offset=%ld, count=%ld", filepath, offset, count); - auto buffer = new char[count]; + const auto buffer = new char[count]; auto read_size = storage_service->readFromFileToBuffer(filepath, offset, buffer, count); HandlerPointer.send(&read_size, sizeof(read_size)); @@ -145,7 +151,7 @@ void MTCLBackend::incomingConnectionListener( UserManager.receive(connected_hostname, HOST_NAME_MAX); server_println(CAPIO_SERVER_CLI_LOG_SERVER, - std::string("Connected from ") + connected_hostname); + std::string("Accepted connection with ") + connected_hostname); LOG("Received connection hostname: %s", connected_hostname); @@ -178,8 +184,9 @@ void MTCLBackend::connect_to(std::string hostname_port) { LOG("Trying to connect on remote: %s", remoteToken.c_str()); if (auto UserManager = MTCL::Manager::connect(remoteToken); UserManager.isValid()) { - server_println(CAPIO_SERVER_CLI_LOG_SERVER, std::string("Connected to ") + remoteToken); - LOG("Connected to: %s", remoteToken.c_str()); + server_println(CAPIO_SERVER_CLI_LOG_SERVER, + std::string("Opened connection with ") + remoteToken); + LOG("Opened connection with: %s", remoteToken.c_str()); UserManager.send(ownHostname, HOST_NAME_MAX); const std::lock_guard lg(*_guard); From 0c86a3efb77b558027a7172c640e8baa8beadbcb Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Mon, 22 Sep 2025 08:52:17 +0200 Subject: [PATCH 13/16] Bugfix in handshake phase --- .../control-plane/multicast_control_plane.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp b/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp index f9197c2f0..0d9044c9c 100644 --- a/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp +++ b/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp @@ -31,12 +31,11 @@ void MulticastControlPlane::multicast_server_aliveness_thread( std::to_string(MULTICAST_DISCOVERY_PORT)); while (*continue_execution) { - bzero(incomingMessage, sizeof(incomingMessage)); - // Send port of local data plane backend send_multicast_alive_token(dataplane_backend_port); LOG("Waiting for incoming token..."); do { + bzero(incomingMessage, sizeof(incomingMessage)); const auto recv_sice = recvfrom(discovery_socket, incomingMessage, MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE, 0, reinterpret_cast(&addr), &addrlen); From d9418ee7bfb68917db5bbf239b641edb822d884f Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Mon, 22 Sep 2025 11:24:26 +0200 Subject: [PATCH 14/16] Fixed memory backend --- .../data-plane/backend_interface.hpp | 21 +++--- .../data-plane/mtcl_backend.hpp | 14 +++- .../client-manager/request_handler_engine.cpp | 4 +- .../data-plane/mtcl_backend.cpp | 74 +++++++++++-------- .../src/storage-service/capio_remote_file.cpp | 5 +- capio-tests/multinode/backend/src/MTCL.hpp | 8 +- scripts/docker-development/docker-compose.yml | 12 +++ 7 files changed, 89 insertions(+), 49 deletions(-) diff --git a/capio-server/include/communication-service/data-plane/backend_interface.hpp b/capio-server/include/communication-service/data-plane/backend_interface.hpp index c9cb386d8..d9ff1d2c6 100644 --- a/capio-server/include/communication-service/data-plane/backend_interface.hpp +++ b/capio-server/include/communication-service/data-plane/backend_interface.hpp @@ -35,10 +35,10 @@ class MessageQueue { std::condition_variable response_cv; public: - void push_request(const std::string &request) { START_LOG(gettid(), "call(req=%s)", request.c_str()); { + LOG("Locking request_mutex"); std::lock_guard lg(request_mutex); LOG("Obtained lock"); request_queue.emplace(request); @@ -48,6 +48,7 @@ class MessageQueue { std::optional try_get_request() { START_LOG(gettid(), "call()"); + LOG("Locking request_mutex"); std::lock_guard lg(request_mutex); LOG("Obtained lock"); if (request_queue.empty()) { @@ -104,14 +105,14 @@ class BackendInterface { * * @param hostname Hostname to request data from * @param filepath Path of the file targeted by the request - * @param buffer Buffer in which data will be available * @param offset Offset relative to the beginning of the file from which to read from * @param count Size of @param buffer and hence size of the fetch operation - * @return Amount of data returned from the remote host + * @return Tuple of size of buffer and pointer to char* with the actual buffer */ - virtual size_t fetchFromRemoteHost(const std::string &hostname, - const std::filesystem::path &filepath, char *buffer, - capio_off64_t offset, capio_off64_t count) { + virtual std::tuple fetchFromRemoteHost(const std::string &hostname, + const std::filesystem::path &filepath, + capio_off64_t offset, + capio_off64_t count) { throw NotImplementedBackendMethod(); }; @@ -129,9 +130,11 @@ class BackendInterface { class NoBackend final : public BackendInterface { public: void connect_to(std::string hostname_port) override { return; }; - size_t fetchFromRemoteHost(const std::string &hostname, const std::filesystem::path &filepath, - char *buffer, capio_off64_t offset, capio_off64_t count) override { - return -1; + std::tuple fetchFromRemoteHost(const std::string &hostname, + const std::filesystem::path &filepath, + capio_off64_t offset, + capio_off64_t count) override { + return {-1, nullptr}; }; std::vector get_open_connections() override { return {}; } diff --git a/capio-server/include/communication-service/data-plane/mtcl_backend.hpp b/capio-server/include/communication-service/data-plane/mtcl_backend.hpp index f14e8c0b0..7f9ecba4f 100644 --- a/capio-server/include/communication-service/data-plane/mtcl_backend.hpp +++ b/capio-server/include/communication-service/data-plane/mtcl_backend.hpp @@ -60,6 +60,14 @@ class MTCLBackend : public BackendInterface { std::unordered_map *open_connections, std::mutex *guard, std::vector *_connection_threads, bool *terminate); + /** + * Explode request into request code and request arguments + * @param req Original request string as received from remote node + * @param args output string with parameters of the request + * @return request code + */ + static int read_next_request(char *req, char *args); + public: explicit MTCLBackend(const std::string &proto, const std::string &port, int sleep_time); @@ -68,8 +76,10 @@ class MTCLBackend : public BackendInterface { void connect_to(std::string hostname_port) override; std::vector get_open_connections() override; - size_t fetchFromRemoteHost(const std::string &hostname, const std::filesystem::path &filepath, - char *buffer, capio_off64_t offset, capio_off64_t count) override; + std::tuple fetchFromRemoteHost(const std::string &hostname, + const std::filesystem::path &filepath, + capio_off64_t offset, + capio_off64_t count) override; }; #endif // MTCL_BACKEND_HPP diff --git a/capio-server/src/client-manager/request_handler_engine.cpp b/capio-server/src/client-manager/request_handler_engine.cpp index 42faccefb..8f0fb70ee 100644 --- a/capio-server/src/client-manager/request_handler_engine.cpp +++ b/capio-server/src/client-manager/request_handler_engine.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -104,7 +105,8 @@ void RequestHandlerEngine::start() const { << std::endl << std::endl; - exit(EXIT_FAILURE); + capio_global_configuration->termination_phase = true; + kill(getpid(), SIGUSR1); } LOG(CAPIO_LOG_SERVER_REQUEST_END); diff --git a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp index a9c2da005..b1b801b11 100644 --- a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp +++ b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp @@ -7,6 +7,23 @@ #include #include +int MTCLBackend::read_next_request(char *req, char *args) { + + START_LOG(gettid(), "call(req=%s)", req); + int code = -1; + auto [ptr, ec] = std::from_chars(req, req + 4, code); + if (ec == std::errc()) { + strcpy(args, ptr + 1); + } else { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Received invalid code: " + std::to_string(code)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Offending request: " + std::string(ptr) + " / " + req); + ERR_EXIT("Invalid request %d:%s", code, ptr); + } + return code; +} + /** * This thread will handle connections towards a single target. */ @@ -45,9 +62,7 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, LOG("Send PHASE"); for (int i = 0; i < max_net_op; i++) { // Send of request - LOG("PIPPO"); - const auto request_opt = queue->try_get_request(); - if (request_opt.has_value()) { + if (const auto request_opt = queue->try_get_request(); request_opt.has_value()) { const auto &request = request_opt.value(); LOG("Request to be sent = %s", request.c_str()); HandlerPointer.send(request.c_str(), request.length()); @@ -78,11 +93,11 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, HandlerPointer.probe(receive_size, false); if (receive_size > 0) { LOG("A request is incoming"); - int requestCode; - char incoming_request[CAPIO_REQ_MAX_SIZE]; - HandlerPointer.receive(incoming_request, receive_size); + + char incoming_request[CAPIO_REQ_MAX_SIZE], request_args[CAPIO_REQ_MAX_SIZE]; + HandlerPointer.receive(incoming_request, CAPIO_REQ_MAX_SIZE); LOG("Received request = %s", incoming_request); - sscanf(incoming_request, "%d", &requestCode); + int requestCode = read_next_request(incoming_request, request_args); LOG("Request code is %d", requestCode); switch (requestCode) { case HAVE_FINISH_SEND_REQUEST: { @@ -98,8 +113,7 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, // Scan request fetch from remote char filepath[PATH_MAX]; capio_off64_t offset, count; - - sscanf(incoming_request, "%s %llu %llu", filepath, &offset, &count); + sscanf(request_args, "%s %llu %llu", filepath, &offset, &count); LOG("filepath=%s, offset=%ld, count=%ld", filepath, offset, count); const auto buffer = new char[count]; auto read_size = @@ -155,13 +169,14 @@ void MTCLBackend::incomingConnectionListener( LOG("Received connection hostname: %s", connected_hostname); - const std::lock_guard lock(*guard); - - open_connections->insert({connected_hostname, {}}); - - _connection_threads->push_back(new std::thread( - serverConnectionHandler, std::move(UserManager), connected_hostname, - open_connections->at(connected_hostname), sleep_time, terminate, FROM_REMOTE)); + auto *queue = new MessageQueue(); + { + const std::lock_guard lock(*guard); + open_connections->insert({connected_hostname, queue}); + } + _connection_threads->push_back(new std::thread(serverConnectionHandler, + std::move(UserManager), connected_hostname, + queue, sleep_time, terminate, FROM_REMOTE)); } } @@ -188,14 +203,14 @@ void MTCLBackend::connect_to(std::string hostname_port) { std::string("Opened connection with ") + remoteToken); LOG("Opened connection with: %s", remoteToken.c_str()); UserManager.send(ownHostname, HOST_NAME_MAX); - const std::lock_guard lg(*_guard); - - open_connections.insert({remoteHost, {}}); - - auto queue_elem = open_connections.at(remoteHost); + auto *queue = new MessageQueue(); + { + const std::lock_guard lg(*_guard); + open_connections.insert({remoteHost, queue}); + } connection_threads.push_back(new std::thread(serverConnectionHandler, - std::move(UserManager), remoteHost, queue_elem, + std::move(UserManager), remoteHost, queue, thread_sleep_times, terminate, TO_REMOTE)); } else { server_println(CAPIO_SERVER_CLI_LOG_SERVER_WARNING, "Warning: tried to connect to " + @@ -264,16 +279,17 @@ std::vector MTCLBackend::get_open_connections() { return keys; } -size_t MTCLBackend::fetchFromRemoteHost(const std::string &hostname, - const std::filesystem::path &filepath, char *buffer, - capio_off64_t offset, capio_off64_t count) { +std::tuple MTCLBackend::fetchFromRemoteHost(const std::string &hostname, + const std::filesystem::path &filepath, + capio_off64_t offset, + capio_off64_t count) { START_LOG(gettid(), "call(hostname=%s, path=%s, offset=%ld, count=%ld)", hostname.c_str(), filepath.c_str(), offset, count); char REQUEST[CAPIO_REQ_MAX_SIZE]; - sprintf(REQUEST, "%03d %s %llu %llu", FETCH_FROM_REMOTE, filepath.c_str(), offset, count); + sprintf(REQUEST, "%04d %s %llu %llu", FETCH_FROM_REMOTE, filepath.c_str(), offset, count); LOG("Sending request %s. Fetching queue to hostname %s", REQUEST, hostname.c_str()); auto queues = open_connections.at(hostname); LOG("obtained access to queue"); @@ -281,8 +297,6 @@ size_t MTCLBackend::fetchFromRemoteHost(const std::string &hostname, queues->push_request(REQUEST); LOG("Request pushed to output queue"); auto [buff_size, response_buffer] = queues->get_response(); - LOG("Obtained response. Buffer size of response is $ld", buff_size); - storage_service->storeData(filepath, offset, buff_size, response_buffer); - - return 0; + LOG("Obtained response. Buffer size of response is %ld", buff_size); + return std::make_tuple(buff_size, response_buffer); } diff --git a/capio-server/src/storage-service/capio_remote_file.cpp b/capio-server/src/storage-service/capio_remote_file.cpp index d896d9b97..05cbc1b0d 100644 --- a/capio-server/src/storage-service/capio_remote_file.cpp +++ b/capio-server/src/storage-service/capio_remote_file.cpp @@ -27,10 +27,9 @@ std::size_t CapioRemoteFile::writeToQueue(SPSCQueue &queue, std::size_t offset, std::size_t length) const { START_LOG(gettid(), "call(offset=%ld,count=%ld,path=%s)", offset, length, fileName.c_str()); - auto buffer = new char[length]; - auto buffer_size = - capio_backend->fetchFromRemoteHost(this->homeNode, this->fileName, buffer, offset, length); + auto [buffer_size, buffer] = + capio_backend->fetchFromRemoteHost(this->homeNode, this->fileName, offset, length); queue.write(buffer, buffer_size); diff --git a/capio-tests/multinode/backend/src/MTCL.hpp b/capio-tests/multinode/backend/src/MTCL.hpp index 19bf9192c..0f9e82e39 100644 --- a/capio-tests/multinode/backend/src/MTCL.hpp +++ b/capio-tests/multinode/backend/src/MTCL.hpp @@ -33,10 +33,10 @@ TEST(CapioCommServiceTest, TestPingPong) { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Sending ping to: " << i << std::endl; char buff[BUFFER_SIZES]{0}, buff1[BUFFER_SIZES]{0}; memcpy(buff, TEST_MESSAGE, strlen(TEST_MESSAGE)); - capio_backend->send(i, buff, BUFFER_SIZES, "./test", 0); + // capio_backend->send(i, buff, BUFFER_SIZES, "./test", 0); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "sent ping to: " << i << ". Waiting for response" << std::endl; - capio_backend->receive(buff1, &size_revc, &offset); + // capio_backend->receive(buff1, &size_revc, &offset); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Received ping response from : " << i << std::endl; EXPECT_EQ(strcmp(buff, buff1), 0); @@ -46,10 +46,10 @@ TEST(CapioCommServiceTest, TestPingPong) { std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Listening for ping from: " << i << std::endl; char recvBuff[BUFFER_SIZES]; - capio_backend->receive(recvBuff, &size_revc, &offset); + // capio_backend->receive(recvBuff, &size_revc, &offset); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Received ping from: " << i << std::endl; EXPECT_EQ(strcmp(recvBuff, TEST_MESSAGE), 0); - capio_backend->send(i, recvBuff, size_revc, "./test", 0); + // capio_backend->send(i, recvBuff, size_revc, "./test", 0); std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Sent ping response to: " << i << std::endl; delete communication_service; return; diff --git a/scripts/docker-development/docker-compose.yml b/scripts/docker-development/docker-compose.yml index 5a4ef82cc..cfcdc57fe 100644 --- a/scripts/docker-development/docker-compose.yml +++ b/scripts/docker-development/docker-compose.yml @@ -11,6 +11,12 @@ services: - node1 environment: - CAPIO_LOG_LEVEL=-1 + cap_add: + - SYS_PTRACE + security_opt: + - seccomp=unconfined + ports: + - "1111:1111" # Use 1111 for gdbserver node2: image: alphaunito/capio:latest @@ -24,6 +30,12 @@ services: - node2 environment: - CAPIO_LOG_LEVEL=-1 + cap_add: + - SYS_PTRACE + security_opt: + - seccomp=unconfined + ports: + - "1112:1112" # Use 1111 for gdbserver volumes: From b9985b6eb4347e71260780f9a4006e45818ecc10 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Mon, 22 Sep 2025 15:32:52 +0200 Subject: [PATCH 15/16] Typos and comments --- .../data-plane/mtcl_backend.hpp | 25 ++++++------- .../control-plane/multicast_control_plane.cpp | 2 +- .../data-plane/mtcl_backend.cpp | 35 ++++++++----------- 3 files changed, 27 insertions(+), 35 deletions(-) diff --git a/capio-server/include/communication-service/data-plane/mtcl_backend.hpp b/capio-server/include/communication-service/data-plane/mtcl_backend.hpp index 7f9ecba4f..8d2330fdb 100644 --- a/capio-server/include/communication-service/data-plane/mtcl_backend.hpp +++ b/capio-server/include/communication-service/data-plane/mtcl_backend.hpp @@ -7,7 +7,7 @@ #include /** - * This avoids to include the MTCL library here as it is a header only library. + * This avoids it to include the MTCL library here as it is a header-only library. * this is equivalent to use extern in C but for class */ namespace MTCL { @@ -15,7 +15,6 @@ class HandleUser; } class MTCLBackend : public BackendInterface { - typedef enum { FROM_REMOTE, TO_REMOTE } CONN_HANDLER_ORIGIN; std::string selfToken, connectedHostname, ownPort, usedProtocol; std::unordered_map open_connections; @@ -29,23 +28,21 @@ class MTCLBackend : public BackendInterface { /** * This thread handles a single p2p connection with another capio_server instance - * @param HandlerPointer - * @param remote_hostname - * @param outbound_messages - * @param inbound_messages queue of inbound results for outbound message, with pair of buffer - * ond buffer size - * @param sleep_time - * @param terminate - * @param source + * @param HandlerPointer A MTCL valid HandlePointer + * @param remote_hostname The remote endpoint of the connection handled by this thread + * @param queue A pointer to a queue to communicate with other components. Queue has pointers to + * inbound and outbound sub-queues for inbound and outbound messages + * @param sleep_time How long to sleep between thread cycle + * @param terminate A reference to a boolean heap allocated variable that is controlled by the + * main thread that tells when to terminate the execution */ void static serverConnectionHandler(MTCL::HandleUser HandlerPointer, const std::string &remote_hostname, MessageQueue *queue, - int sleep_time, const bool *terminate, - CONN_HANDLER_ORIGIN source); + int sleep_time, const bool *terminate); /** * Waits for incoming new requests to connect to new server instances. When a new request - * arrives it then handshakes with the remote servers, opening a new connection, and starting a + * arrives, it then handshakes with the remote servers, opening a new connection, and starting a * new thread that will handle remote requests * * @param continue_execution @@ -62,7 +59,7 @@ class MTCLBackend : public BackendInterface { /** * Explode request into request code and request arguments - * @param req Original request string as received from remote node + * @param req Original request string as received from the remote node * @param args output string with parameters of the request * @return request code */ diff --git a/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp b/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp index 0d9044c9c..378d75147 100644 --- a/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp +++ b/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp @@ -110,7 +110,7 @@ void MulticastControlPlane::multicast_control_plane_incoming_thread( default: LOG("WARNING: unknown / unhandled event: %s", incoming_msg); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, - "Unknown/Unhandled message recived: " + std::string(incoming_msg)); + "Unknown/Unhandled message received: " + std::string(incoming_msg)); } LOG("Completed handling of event"); } diff --git a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp index b1b801b11..2e97f1398 100644 --- a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp +++ b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp @@ -1,9 +1,8 @@ -#include "include/file-manager/file_manager.hpp" -#include "include/storage-service/capio_storage_service.hpp" - #include #include #include +#include +#include #include #include @@ -26,22 +25,18 @@ int MTCLBackend::read_next_request(char *req, char *args) { /** * This thread will handle connections towards a single target. + * Algorithm works in this way. In the beginning, the role of the node that starts to send is + * chosen as the smaller lexicographically between the two hostnames involved in the + * communication. Then two phases of sending and receiving up to max_net_op are performed. + * The two nodes switch phases after a final synchronization with the special request + * HAVE_FINISH_SEND_REQUEST. + * When the sender sends this request, either because it has reached the max_net_op or because + * there are no more messages to be sent, the two nodes switch roles. This continues until the + * remote handle pointer is valid */ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, const std::string &remote_hostname, MessageQueue *queue, - const int sleep_time, const bool *terminate, - const CONN_HANDLER_ORIGIN source) { - constexpr int max_net_op = 10; - /* - * Algorithm works in this way. At the beginning, the role of the node that starts to send is - * chosen as the smaller lexicographically between the two hostnames involved in the - * communication. Then two phases of sending and receiving up to max_net_op are performed. - * The two nodes switches phases after a final synchronization with the special request - * HAVE_FINISH_SEND_REQUEST. - * When the sender sends this request, either because it has reached the max_net_op or because - * there are no more messages to be sent, the two nodes switches roles. This continues until the - * remote handle pointer is valid - */ + const int sleep_time, const bool *terminate) { char ownHostname[HOST_NAME_MAX]; gethostname(ownHostname, HOST_NAME_MAX); @@ -50,8 +45,7 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, char request_has_finished_to_send[CAPIO_REQ_MAX_SIZE]{0}; sprintf(request_has_finished_to_send, "%03d", HAVE_FINISH_SEND_REQUEST); - START_LOG(gettid(), "call(remote_hostname=%s, kind=%s)", remote_hostname.c_str(), - source == FROM_REMOTE ? "from remote server" : "to remote server"); + START_LOG(gettid(), "call(remote_hostname=%s)", remote_hostname.c_str()); LOG("Will begin execution with %s phase", my_turn_to_send ? "sending" : "receiving"); @@ -59,6 +53,7 @@ void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, // execute up to N operation of send &/or receive, to avoid starvation if (my_turn_to_send) { + constexpr int max_net_op = 10; LOG("Send PHASE"); for (int i = 0; i < max_net_op; i++) { // Send of request @@ -176,7 +171,7 @@ void MTCLBackend::incomingConnectionListener( } _connection_threads->push_back(new std::thread(serverConnectionHandler, std::move(UserManager), connected_hostname, - queue, sleep_time, terminate, FROM_REMOTE)); + queue, sleep_time, terminate)); } } @@ -211,7 +206,7 @@ void MTCLBackend::connect_to(std::string hostname_port) { } connection_threads.push_back(new std::thread(serverConnectionHandler, std::move(UserManager), remoteHost, queue, - thread_sleep_times, terminate, TO_REMOTE)); + thread_sleep_times, terminate)); } else { server_println(CAPIO_SERVER_CLI_LOG_SERVER_WARNING, "Warning: tried to connect to " + std::string(remoteHost) + From d4c54d304a926c56d03eb61c304304d4651779d2 Mon Sep 17 00:00:00 2001 From: Marco Edoardo Santimaria Date: Mon, 22 Sep 2025 16:06:26 +0200 Subject: [PATCH 16/16] Added tests for backend --- .../multinode/backend/docker-compose.yml | 12 ++- capio-tests/multinode/backend/src/MTCL.hpp | 95 ++++++++++--------- 2 files changed, 60 insertions(+), 47 deletions(-) diff --git a/capio-tests/multinode/backend/docker-compose.yml b/capio-tests/multinode/backend/docker-compose.yml index ec80cbff3..7a4cfc8a4 100644 --- a/capio-tests/multinode/backend/docker-compose.yml +++ b/capio-tests/multinode/backend/docker-compose.yml @@ -11,7 +11,11 @@ services: - node1 environment: - CAPIO_LOG_LEVEL=-1 - command: capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1 + - APP_TYPE=writer + - CAPIO_DIR=. + command: | + bash -c " capio_server -b tcp --mem-only --no-config & \ + LD_PRELOAD=libcapio_posix.so capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1" node2: image: alphaunito/capio:latest @@ -25,7 +29,11 @@ services: - node2 environment: - CAPIO_LOG_LEVEL=-1 - command: bash -c "sleep 5 && capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1" + - APP_TYPE=reader + - CAPIO_DIR=. + command: | + sleep 5 && bash -c " capio_server -b tcp --mem-only --no-config & \ + LD_PRELOAD=libcapio_posix.so capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1" volumes: shared_data: diff --git a/capio-tests/multinode/backend/src/MTCL.hpp b/capio-tests/multinode/backend/src/MTCL.hpp index 0f9e82e39..a131f40b6 100644 --- a/capio-tests/multinode/backend/src/MTCL.hpp +++ b/capio-tests/multinode/backend/src/MTCL.hpp @@ -8,51 +8,56 @@ #include #include -constexpr char TEST_MESSAGE[] = "hello world how is it going?"; -constexpr capio_off64_t BUFFER_SIZES = 1024; - -TEST(CapioCommServiceTest, TestPingPong) { - START_LOG(gettid(), "INFO: TestPingPong"); - const int port = 1234; - std::string proto = "TCP"; - auto communication_service = new CapioCommunicationService(proto, port, "multicast"); - capio_off64_t size_revc, offset; - - std::vector connections; - - do { - connections = capio_backend->get_open_connections(); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - } while (connections.empty()); - - char ownHostname[HOST_NAME_MAX] = {0}; - gethostname(ownHostname, HOST_NAME_MAX); - - for (const auto &i : connections) { - if (i.compare(ownHostname) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Sending ping to: " << i << std::endl; - char buff[BUFFER_SIZES]{0}, buff1[BUFFER_SIZES]{0}; - memcpy(buff, TEST_MESSAGE, strlen(TEST_MESSAGE)); - // capio_backend->send(i, buff, BUFFER_SIZES, "./test", 0); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "sent ping to: " << i - << ". Waiting for response" << std::endl; - // capio_backend->receive(buff1, &size_revc, &offset); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Received ping response from : " << i - << std::endl; - EXPECT_EQ(strcmp(buff, buff1), 0); - delete communication_service; - return; - } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Listening for ping from: " << i - << std::endl; - char recvBuff[BUFFER_SIZES]; - // capio_backend->receive(recvBuff, &size_revc, &offset); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Received ping from: " << i << std::endl; - EXPECT_EQ(strcmp(recvBuff, TEST_MESSAGE), 0); - // capio_backend->send(i, recvBuff, size_revc, "./test", 0); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Sent ping response to: " << i << std::endl; - delete communication_service; - return; +const char *filename = "data.bin"; +const size_t chunkSize = 1024; +const size_t totalSize = 2048; + +inline int writer() { + + char buffer[chunkSize]; + for (int i = 0; i < chunkSize; i++) { + buffer[i] = i % 26 + 'A'; + } + + FILE *fp = fopen(filename, "wb"); + EXPECT_NE(fp, nullptr); + + for (int i = 0; i < totalSize / chunkSize; i++) { + EXPECT_EQ(fwrite(buffer, 1, chunkSize, fp), chunkSize); + } + + fclose(fp); + printf("Wrote %zu bytes to %s\n", totalSize, filename); + return 0; +} + +inline int reader() { + + char buffer[chunkSize]; + size_t totalRead = 0; + + FILE *fp = fopen(filename, "rb"); + EXPECT_NE(fp, nullptr); + + // Read in 1024-byte chunks until EOF + size_t bytesRead; + while ((bytesRead = fread(buffer, 1, chunkSize, fp)) > 0) { + totalRead += bytesRead; + } + + EXPECT_EQ(totalRead, totalSize); + + fclose(fp); + printf("Total read: %zu bytes\n", totalRead); + return 0; +} + +TEST(CapioCommServiceTest, TestCapioMemoryNetworkBackend) { + + if (const auto program = std::getenv("APP_TYPE"); std::string(program) == "writer") { + EXPECT_EQ(writer(), 0); + } else { + EXPECT_EQ(reader(), 0); } }