diff --git a/capio-server/include/capio-cl-engine/capio_cl_engine.hpp b/capio-server/include/capio-cl-engine/capio_cl_engine.hpp index 6337c5dde..6a49e798d 100644 --- a/capio-server/include/capio-cl-engine/capio_cl_engine.hpp +++ b/capio-server/include/capio-cl-engine/capio_cl_engine.hpp @@ -84,7 +84,7 @@ class CapioCLEngine { void setStoreFileInFileSystem(const std::filesystem::path &path); bool storeFileInMemory(const std::filesystem::path &path); std::vector getFileToStoreInMemory(); - auto get_home_node(const std::string &path); + std::string get_home_node(const std::string &path); }; inline CapioCLEngine *capio_cl_engine; diff --git a/capio-server/include/communication-service/control-plane/capio_control_plane.hpp b/capio-server/include/communication-service/control-plane/capio_control_plane.hpp index ed369d71b..11c83d9ce 100644 --- a/capio-server/include/communication-service/control-plane/capio_control_plane.hpp +++ b/capio-server/include/communication-service/control-plane/capio_control_plane.hpp @@ -4,7 +4,7 @@ class CapioControlPlane { public: - typedef enum { CREATE, DELETE, WRITE } event_type; + typedef enum { CREATE, WRITE, CLOSE, COMMIT } event_type; virtual ~CapioControlPlane() = default; diff --git a/capio-server/include/communication-service/data-plane/backend_interface.hpp b/capio-server/include/communication-service/data-plane/backend_interface.hpp index df0e26031..d9ff1d2c6 100644 --- a/capio-server/include/communication-service/data-plane/backend_interface.hpp +++ b/capio-server/include/communication-service/data-plane/backend_interface.hpp @@ -1,12 +1,85 @@ #ifndef BACKEND_INTERFACE_HPP #define BACKEND_INTERFACE_HPP -#include "capio/constants.hpp" +#include +#include +#include #include +#include +#include +#include +#include #include #include +#include #include +class MessageQueue { + class ResponseToRequest { + public: + std::string original_request; + char *response; + capio_off64_t response_size; + + ResponseToRequest(std::string request, char *buf, capio_off64_t buff_size) + : original_request(std::move(request)), response(buf), response_size(buff_size) {} + }; + + std::queue request_queue; + std::queue response_queue; + + std::mutex request_mutex; + std::mutex response_mutex; + + std::condition_variable request_cv; + std::condition_variable response_cv; + + public: + void push_request(const std::string &request) { + START_LOG(gettid(), "call(req=%s)", request.c_str()); + { + LOG("Locking request_mutex"); + std::lock_guard lg(request_mutex); + LOG("Obtained lock"); + request_queue.emplace(request); + } + request_cv.notify_one(); + } + + std::optional try_get_request() { + START_LOG(gettid(), "call()"); + LOG("Locking request_mutex"); + std::lock_guard lg(request_mutex); + LOG("Obtained lock"); + if (request_queue.empty()) { + return std::nullopt; + } + std::string req = std::move(request_queue.front()); + request_queue.pop(); + return req; + } + + void push_response(char *buffer, capio_off64_t buff_size, std::string origin) { + START_LOG(gettid(), "call(origin=%s, buff_size=%ld)", origin.c_str(), buff_size); + { + std::lock_guard lg(response_mutex); + LOG("Obtained lock"); + response_queue.emplace(std::move(origin), buffer, buff_size); + } + response_cv.notify_one(); + } + + std::tuple get_response() { + START_LOG(gettid(), "call()"); + std::unique_lock lk(response_mutex); + response_cv.wait(lk, [this] { return !response_queue.empty(); }); + LOG("Obtained lock"); + auto response = std::move(response_queue.front()); + response_queue.pop(); + return {response.response_size, response.response}; + } +}; + class NotImplementedBackendMethod : public std::exception { public: [[nodiscard]] const char *what() const noexcept override { @@ -17,6 +90,9 @@ class NotImplementedBackendMethod : public std::exception { }; class BackendInterface { + protected: + typedef enum { HAVE_FINISH_SEND_REQUEST, FETCH_FROM_REMOTE } BackendRequest_t; + public: virtual ~BackendInterface() = default; @@ -25,29 +101,18 @@ class BackendInterface { */ virtual void connect_to(std::string hostname_port) { throw NotImplementedBackendMethod(); }; - /** - * @brief Send data to target + /** Fetch a chunk of CapioFile internal data from remote host * - * @param target Hostname of remote target - * @param buf pointer to data to sent - * @param buf_size length of@param filepath - * @param start_offset - * @param buf + * @param hostname Hostname to request data from + * @param filepath Path of the file targeted by the request + * @param offset Offset relative to the beginning of the file from which to read from + * @param count Size of @param buffer and hence size of the fetch operation + * @return Tuple of size of buffer and pointer to char* with the actual buffer */ - virtual void send(const std::string &target, char *buf, uint64_t buf_size, - const std::string &filepath, capio_off64_t start_offset) { - throw NotImplementedBackendMethod(); - }; - - /** - * @brief receive data - * - * @param buf allocated data buffer - * @param buf_size size of @param buf - * @param start_offset - * @return std::string hostname of sender - */ - virtual std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) { + virtual std::tuple fetchFromRemoteHost(const std::string &hostname, + const std::filesystem::path &filepath, + capio_off64_t offset, + capio_off64_t count) { throw NotImplementedBackendMethod(); }; @@ -65,16 +130,13 @@ class BackendInterface { class NoBackend final : public BackendInterface { public: void connect_to(std::string hostname_port) override { return; }; - - void send(const std::string &target, char *buf, uint64_t buf_size, const std::string &filepath, - capio_off64_t start_offset) override { - return; + std::tuple fetchFromRemoteHost(const std::string &hostname, + const std::filesystem::path &filepath, + capio_off64_t offset, + capio_off64_t count) override { + return {-1, nullptr}; }; - std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) override { - return {"no-backend"}; - } - std::vector get_open_connections() override { return {}; } }; diff --git a/capio-server/include/communication-service/data-plane/mtcl_backend.hpp b/capio-server/include/communication-service/data-plane/mtcl_backend.hpp index 96ef6eba8..8d2330fdb 100644 --- a/capio-server/include/communication-service/data-plane/mtcl_backend.hpp +++ b/capio-server/include/communication-service/data-plane/mtcl_backend.hpp @@ -2,12 +2,12 @@ #define MTCL_BACKEND_HPP #include -#include #include #include +#include /** - * This avoid to include the MTCL librari here as it is a header only library. + * This avoids it to include the MTCL library here as it is a header-only library. * this is equivalent to use extern in C but for class */ namespace MTCL { @@ -15,12 +15,9 @@ class HandleUser; } class MTCLBackend : public BackendInterface { - typedef enum { FROM_REMOTE, TO_REMOTE } CONN_HANDLER_ORIGIN; - typedef std::tuple *, std::queue *, std::mutex *> - TransportUnitInterface; - std::unordered_map connected_hostnames_map; std::string selfToken, connectedHostname, ownPort, usedProtocol; + std::unordered_map open_connections; char ownHostname[HOST_NAME_MAX] = {0}; int thread_sleep_times = 0; bool *continue_execution = new bool; @@ -29,36 +26,57 @@ class MTCLBackend : public BackendInterface { std::vector connection_threads; bool *terminate; - static TransportUnit *receive_unit(MTCL::HandleUser *HandlerPointer); - - static void send_unit(MTCL::HandleUser *HandlerPointer, const TransportUnit *unit); - /** - * This thread will handle connections towards a single target. + * This thread handles a single p2p connection with another capio_server instance + * @param HandlerPointer A MTCL valid HandlePointer + * @param remote_hostname The remote endpoint of the connection handled by this thread + * @param queue A pointer to a queue to communicate with other components. Queue has pointers to + * inbound and outbound sub-queues for inbound and outbound messages + * @param sleep_time How long to sleep between thread cycle + * @param terminate A reference to a boolean heap allocated variable that is controlled by the + * main thread that tells when to terminate the execution */ - void static server_connection_handler(MTCL::HandleUser HandlerPointer, - const std::string remote_hostname, const int sleep_time, - TransportUnitInterface interface, const bool *terminate, - CONN_HANDLER_ORIGIN source); + void static serverConnectionHandler(MTCL::HandleUser HandlerPointer, + const std::string &remote_hostname, MessageQueue *queue, + int sleep_time, const bool *terminate); - void static incoming_connection_listener( + /** + * Waits for incoming new requests to connect to new server instances. When a new request + * arrives, it then handshakes with the remote servers, opening a new connection, and starting a + * new thread that will handle remote requests + * + * @param continue_execution + * @param sleep_time + * @param open_connections + * @param guard + * @param _connection_threads + * @param terminate + */ + void static incomingConnectionListener( const bool *continue_execution, int sleep_time, - std::unordered_map *open_connections, - std::mutex *guard, std::vector *_connection_threads, bool *terminate); + std::unordered_map *open_connections, std::mutex *guard, + std::vector *_connection_threads, bool *terminate); - public: - void connect_to(std::string hostname_port) override; + /** + * Explode request into request code and request arguments + * @param req Original request string as received from the remote node + * @param args output string with parameters of the request + * @return request code + */ + static int read_next_request(char *req, char *args); + public: explicit MTCLBackend(const std::string &proto, const std::string &port, int sleep_time); ~MTCLBackend() override; - std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) override; - - void send(const std::string &target, char *buf, uint64_t buf_size, const std::string &filepath, - const capio_off64_t start_offset) override; + void connect_to(std::string hostname_port) override; std::vector get_open_connections() override; + std::tuple fetchFromRemoteHost(const std::string &hostname, + const std::filesystem::path &filepath, + capio_off64_t offset, + capio_off64_t count) override; }; #endif // MTCL_BACKEND_HPP diff --git a/capio-server/include/communication-service/data-plane/transport_unit.hpp b/capio-server/include/communication-service/data-plane/transport_unit.hpp deleted file mode 100644 index 62ae5abf9..000000000 --- a/capio-server/include/communication-service/data-plane/transport_unit.hpp +++ /dev/null @@ -1,22 +0,0 @@ -#ifndef TRANSPORT_UNIT_HPP -#define TRANSPORT_UNIT_HPP - -#include -#include - -class TransportUnit { - protected: - std::string _filepath; - char *_bytes{}; - capio_off64_t _buffer_size{}; - capio_off64_t _start_write_offset{}; - - public: - TransportUnit() = default; - - ~TransportUnit() { delete[] _bytes; } - - friend class MTCLBackend; -}; - -#endif // TRANSPORT_UNIT_HPP diff --git a/capio-server/include/storage-service/capio_file.hpp b/capio-server/include/storage-service/capio_file.hpp index 080fa9efb..60521d7d0 100644 --- a/capio-server/include/storage-service/capio_file.hpp +++ b/capio-server/include/storage-service/capio_file.hpp @@ -10,13 +10,17 @@ class CapioFile { protected: - const std::string fileName; + const std::string fileName, homeNode; std::size_t totalSize; public: - explicit CapioFile(const std::string &filePath) : fileName(filePath), totalSize(0) {}; + explicit CapioFile(const std::string &filePath, + const std::string &home_node = capio_global_configuration->node_name) + : fileName(filePath), homeNode(home_node), totalSize(0) {}; virtual ~CapioFile() = default; + virtual bool is_remote() { return false; } + [[nodiscard]] std::size_t getSize() const { START_LOG(gettid(), "call()"); return totalSize; @@ -60,6 +64,12 @@ class CapioFile { */ virtual std::size_t writeToQueue(SPSCQueue &queue, std::size_t offset, std::size_t length) const = 0; + + /** + * + * @return HomeNode of file + */ + virtual std::string getHomeNode() { return homeNode; }; }; class CapioMemoryFile : public CapioFile { @@ -131,10 +141,12 @@ class CapioMemoryFile : public CapioFile { class CapioRemoteFile : public CapioFile { public: - explicit CapioRemoteFile(const std::string &filePath); + explicit CapioRemoteFile(const std::string &filePath, const std::string &home_node); ~CapioRemoteFile() override; + bool is_remote() override { return true; }; + /** * Write data to a file stored inside the memory * @param buffer buffer to store inside memory (i.e. content of the file) diff --git a/capio-server/include/storage-service/capio_storage_service.hpp b/capio-server/include/storage-service/capio_storage_service.hpp index 4dd234b86..5ded4916c 100644 --- a/capio-server/include/storage-service/capio_storage_service.hpp +++ b/capio-server/include/storage-service/capio_storage_service.hpp @@ -33,8 +33,9 @@ class CapioStorageService { * Create a CapioRemoteFile, after checking that an instance of CapioMemoryFile (meaning a local * file) is not present * @param file_name file path + * @param home_node */ - void createRemoteFile(const std::string &file_name) const; + void createRemoteFile(const std::string &file_name, const std::string &home_node) const; void deleteFile(const std::string &file_name) const; @@ -69,7 +70,7 @@ class CapioStorageService { * @param app_name * @param pid */ - void register_client(const std::string &app_name, const pid_t pid) const; + void register_client(const std::string &app_name, pid_t pid) const; /** * Send the file content to a client application @@ -77,9 +78,10 @@ class CapioStorageService { * @param file * @param offset * @param size + * @return size sent to client */ - void reply_to_client(pid_t pid, const std::string &file, capio_off64_t offset, - capio_off64_t size) const; + size_t reply_to_client(pid_t pid, const std::string &file, capio_off64_t offset, + capio_off64_t size) const; /** * Send raw data to client without fetching from the storage manager itself @@ -87,7 +89,7 @@ class CapioStorageService { * @param data * @param len */ - void reply_to_client_raw(pid_t pid, const char *data, const capio_off64_t len) const; + void reply_to_client_raw(pid_t pid, const char *data, capio_off64_t len) const; /** * Receive the file content from the client application @@ -105,9 +107,23 @@ class CapioStorageService { * @param pid * @return */ - [[nodiscard]] size_t sendFilesToStoreInMemory(const long pid) const; + [[nodiscard]] size_t sendFilesToStoreInMemory(long pid) const; - void remove_client(const pid_t pid) const; + void remove_client(pid_t pid) const; + + void storeData(const std::filesystem::path &path, capio_off64_t offset, capio_off64_t buff_size, + const char *buffer) const; + + /** + * Read data from file and store it inside buffer + * @param filepath filepath + * @param offset starting read offset + * @param buffer targeted read buffer + * @param count Requested read size + * @return Actual size of read + */ + size_t readFromFileToBuffer(const std::filesystem::path &filepath, capio_off64_t offset, + char *buffer, capio_off64_t count) const; }; inline CapioStorageService *storage_service; diff --git a/capio-server/src/capio-cl-engine/capio_cl_engine.cpp b/capio-server/src/capio-cl-engine/capio_cl_engine.cpp index 4cb60859e..011318439 100644 --- a/capio-server/src/capio-cl-engine/capio_cl_engine.cpp +++ b/capio-server/src/capio-cl-engine/capio_cl_engine.cpp @@ -420,7 +420,7 @@ std::vector CapioCLEngine::getFileToStoreInMemory() { return files; } -auto CapioCLEngine::get_home_node(const std::string &path) { +std::string CapioCLEngine::get_home_node(const std::string &path) { // TODO: understand here how to get the home node policy. START_LOG(gettid(), "call(path=%s)", path.c_str()); if (const auto location = _locations.find(path); location == _locations.end()) { diff --git a/capio-server/src/client-manager/handlers/open.cpp b/capio-server/src/client-manager/handlers/open.cpp index c8993eba2..23f3f1eeb 100644 --- a/capio-server/src/client-manager/handlers/open.cpp +++ b/capio-server/src/client-manager/handlers/open.cpp @@ -27,7 +27,7 @@ void open_handler(const char *const str) { * At this point, the file that needs to be created more likely than not is not local to the * machine. As such, we call the creation of a new CapioRemoteFile */ - storage_service->createRemoteFile(path); + storage_service->createRemoteFile(path, {}); return; } diff --git a/capio-server/src/client-manager/handlers/read.cpp b/capio-server/src/client-manager/handlers/read.cpp index 6dfe34b57..d38f4ffb5 100644 --- a/capio-server/src/client-manager/handlers/read.cpp +++ b/capio-server/src/client-manager/handlers/read.cpp @@ -58,35 +58,16 @@ void read_mem_handler(const char *const str) { tid, read_begin_offset, read_size, client_cache_line_size, use_cache ? "true" : "false", path); - if (storage_service->sizeOf(path) < read_begin_offset + read_size && - !file_manager->isCommitted(path)) { - LOG("File is not yet ready to be consumed as there is not enough data, and is not " - "committed"); - storage_service->addThreadWaitingForData(tid, path, read_begin_offset, read_size); - return; - } - - capio_off64_t size_to_send = storage_service->sizeOf(path); - if (use_cache) { - LOG("Computing size of data to send: minimum between:"); - LOG("client_cache_line_size: %llu", client_cache_line_size); - LOG("file_size:%llu - read_begin_offset=%llu = %llu", size_to_send, read_begin_offset, - size_to_send - read_begin_offset); - size_to_send = std::min({client_cache_line_size, (size_to_send - read_begin_offset)}); - } + capio_off64_t size_to_send = std::min({client_cache_line_size, read_size}); + LOG("Will try to send to client up to %ld bytes", size_to_send); + auto size_sent = storage_service->reply_to_client(tid, path, read_begin_offset, size_to_send); LOG("Sending to posix app the offset up to which read."); - if (file_manager->isCommitted(path) && - read_begin_offset + size_to_send >= storage_service->sizeOf(path)) { - LOG("File is committed, and end of read >= than file size." - " signaling it to posix application by setting offset MSB to 1"); - LOG("Sending offset: %llu", 0x8000000000000000 | size_to_send); - client_manager->reply_to_client(tid, 0x8000000000000000 | size_to_send); - } else { - LOG("File is not committed. Sending offset: %llu", size_to_send); - client_manager->reply_to_client(tid, size_to_send); + if (file_manager->isCommitted(path)) { + LOG("File is committed, setting MSB to 1"); + size_sent |= 0x8000000000000000; } - LOG("Need to sent to client %llu bytes, asking storage service to send data", size_to_send); - storage_service->reply_to_client(tid, path, read_begin_offset, size_to_send); + LOG("Telling client to read %ld bytes", size_sent); + client_manager->reply_to_client(tid, size_sent); } diff --git a/capio-server/src/client-manager/request_handler_engine.cpp b/capio-server/src/client-manager/request_handler_engine.cpp index 42faccefb..8f0fb70ee 100644 --- a/capio-server/src/client-manager/request_handler_engine.cpp +++ b/capio-server/src/client-manager/request_handler_engine.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -104,7 +105,8 @@ void RequestHandlerEngine::start() const { << std::endl << std::endl; - exit(EXIT_FAILURE); + capio_global_configuration->termination_phase = true; + kill(getpid(), SIGUSR1); } LOG(CAPIO_LOG_SERVER_REQUEST_END); diff --git a/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp b/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp index 493d287b8..378d75147 100644 --- a/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp +++ b/capio-server/src/communication-service/control-plane/multicast_control_plane.cpp @@ -1,4 +1,6 @@ +#include "include/storage-service/capio_storage_service.hpp" #include "multicast_utils.hpp" + #include #include #include @@ -29,12 +31,11 @@ void MulticastControlPlane::multicast_server_aliveness_thread( std::to_string(MULTICAST_DISCOVERY_PORT)); while (*continue_execution) { - bzero(incomingMessage, sizeof(incomingMessage)); - // Send port of local data plane backend send_multicast_alive_token(dataplane_backend_port); LOG("Waiting for incoming token..."); do { + bzero(incomingMessage, sizeof(incomingMessage)); const auto recv_sice = recvfrom(discovery_socket, incomingMessage, MULTICAST_ALIVE_TOKEN_MESSAGE_SIZE, 0, reinterpret_cast(&addr), &addrlen); @@ -82,26 +83,36 @@ void MulticastControlPlane::multicast_control_plane_incoming_thread( reinterpret_cast(&addr), &addrlen); LOG("Received multicast data of size %ld and content %s", recv_sice, incoming_msg); if (recv_sice < 0) { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, - std::string("WARNING: received 0 bytes from multicast socket: ")); - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, - "Execution will continue only with FS discovery support"); + LOG("WARNING: received size less than zero. An error might have occurred: %s", + strerror(errno)); + LOG("Skipping iteration and returning to listening for incoming paxkets"); continue; } - event_type event; + int event; char source_hostname[HOST_NAME_MAX]; char source_path[PATH_MAX]; - sscanf(incoming_msg, "%d %s %s", reinterpret_cast(&event), source_hostname, - source_path); + sscanf(incoming_msg, "%d %s %s", &event, source_hostname, source_path); + + LOG("event=%d, source=%s, path=%s", event, source_hostname, source_path); if (strcmp(capio_global_configuration->node_name, source_hostname) == 0) { continue; } - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, - "Received control message: " + std::string(incoming_msg)); + switch (event) { + case CREATE: + LOG("Handling remote CREATE event"); + storage_service->createRemoteFile(source_path, source_hostname); + break; + + default: + LOG("WARNING: unknown / unhandled event: %s", incoming_msg); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, + "Unknown/Unhandled message received: " + std::string(incoming_msg)); + } + LOG("Completed handling of event"); } close(discovery_socket); diff --git a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp index 9fd31b6f2..2e97f1398 100644 --- a/capio-server/src/communication-service/data-plane/mtcl_backend.cpp +++ b/capio-server/src/communication-service/data-plane/mtcl_backend.cpp @@ -1,131 +1,147 @@ +#include +#include #include +#include +#include +#include #include -TransportUnit *MTCLBackend::receive_unit(MTCL::HandleUser *HandlerPointer) { - START_LOG(gettid(), "call()"); - size_t filepath_len; - const auto unit = new TransportUnit(); - HandlerPointer->receive(&filepath_len, sizeof(size_t)); - LOG("Incoming path of length %ld", filepath_len); - unit->_filepath.reserve(filepath_len + 1); - HandlerPointer->receive(unit->_filepath.data(), filepath_len); - LOG("Received message! Path : %s", unit->_filepath.c_str()); - HandlerPointer->receive(&unit->_buffer_size, sizeof(capio_off64_t)); - LOG("Buffer size for incoming data is %ld", unit->_buffer_size); - unit->_bytes = new char[unit->_buffer_size]; - LOG("Allocated space for incoming data"); - HandlerPointer->receive(unit->_bytes, unit->_buffer_size); - LOG("Received file buffer data"); - HandlerPointer->receive(&unit->_start_write_offset, sizeof(capio_off64_t)); - LOG("Received chunk of data should be stored on offset %ld of file %s", - unit->_start_write_offset, unit->_filepath.c_str()); - return unit; -} - -void MTCLBackend::send_unit(MTCL::HandleUser *HandlerPointer, const TransportUnit *unit) { - START_LOG(gettid(), "call()"); - LOG("[send] buffer=%s", unit->_bytes); - /** - * step0: send file path - * step1: send receive buffer size - * step1: send offset of write - * step2: send data - */ - const size_t file_path_length = unit->_filepath.length(); - - HandlerPointer->send(&file_path_length, sizeof(size_t)); - LOG("Size of path that is being sent: %ld", file_path_length); +int MTCLBackend::read_next_request(char *req, char *args) { - HandlerPointer->send(unit->_filepath.c_str(), file_path_length); - LOG("Sent file path: %s", unit->_filepath.c_str()); - - HandlerPointer->send(&unit->_buffer_size, sizeof(capio_off64_t)); - LOG("Size of file buffer to be sent: %ld", unit->_buffer_size); - - HandlerPointer->send(unit->_bytes, unit->_buffer_size); - LOG("Sent %ld bytes of data chunk", unit->_buffer_size); - - HandlerPointer->send(&unit->_start_write_offset, sizeof(capio_off64_t)); - LOG("Sent start write offset : %ld", unit->_start_write_offset); - - // DO NOT DELETE unit: here just afterward, the unit experiences a pop() which - // effectively calls delete on the container. If I delete it here, a double delete is - // raised + START_LOG(gettid(), "call(req=%s)", req); + int code = -1; + auto [ptr, ec] = std::from_chars(req, req + 4, code); + if (ec == std::errc()) { + strcpy(args, ptr + 1); + } else { + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Received invalid code: " + std::to_string(code)); + server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, + "Offending request: " + std::string(ptr) + " / " + req); + ERR_EXIT("Invalid request %d:%s", code, ptr); + } + return code; } /** * This thread will handle connections towards a single target. + * Algorithm works in this way. In the beginning, the role of the node that starts to send is + * chosen as the smaller lexicographically between the two hostnames involved in the + * communication. Then two phases of sending and receiving up to max_net_op are performed. + * The two nodes switch phases after a final synchronization with the special request + * HAVE_FINISH_SEND_REQUEST. + * When the sender sends this request, either because it has reached the max_net_op or because + * there are no more messages to be sent, the two nodes switch roles. This continues until the + * remote handle pointer is valid */ -void MTCLBackend::server_connection_handler(MTCL::HandleUser HandlerPointer, - const std::string remote_hostname, const int sleep_time, - TransportUnitInterface interface, const bool *terminate, - CONN_HANDLER_ORIGIN source) { - START_LOG(gettid(), "call(remote_hostname=%s, kind=%s)", remote_hostname.c_str(), - source == FROM_REMOTE ? "from remote server" : "to remote server"); - // out = data to sent to others - // in = data from others - auto [in, out, mutex] = interface; +void MTCLBackend::serverConnectionHandler(MTCL::HandleUser HandlerPointer, + const std::string &remote_hostname, MessageQueue *queue, + const int sleep_time, const bool *terminate) { - while (HandlerPointer.isValid()) { - // execute up to N operation of send &/or receive, to avoid starvation due to - // semaphores. - constexpr int max_net_op = 10; - - // Send phase - for (int completed_io_operations = 0; completed_io_operations < max_net_op && !out->empty(); - ++completed_io_operations) { - LOG("[send] Starting send section"); - const auto unit = out->front(); - - LOG("[send] Sending %ld bytes of file %s to %s", unit->_buffer_size, - unit->_filepath.c_str(), remote_hostname.c_str()); - send_unit(&HandlerPointer, unit); - LOG("[send] Message sent"); - - const std::lock_guard lg(*mutex); - LOG("[send] Locked guard"); - out->pop(); - } + char ownHostname[HOST_NAME_MAX]; + gethostname(ownHostname, HOST_NAME_MAX); + bool my_turn_to_send = ownHostname > remote_hostname; + + char request_has_finished_to_send[CAPIO_REQ_MAX_SIZE]{0}; + sprintf(request_has_finished_to_send, "%03d", HAVE_FINISH_SEND_REQUEST); - // Receive phase - size_t receive_size = 0, completed_io_operations = 0; - HandlerPointer.probe(receive_size, false); - while (completed_io_operations < max_net_op && receive_size > 0) { - LOG("[recv] Receiving data"); - auto unit = receive_unit(&HandlerPointer); - LOG("[recv] Lock guard"); - const std::lock_guard lg(*mutex); - in->push(unit); - LOG("[recv] Pushed %ld bytes to be stored on file %s", unit->_buffer_size, - unit->_filepath.c_str()); - - ++completed_io_operations; - receive_size = 0; - HandlerPointer.probe(receive_size, false); + START_LOG(gettid(), "call(remote_hostname=%s)", remote_hostname.c_str()); + + LOG("Will begin execution with %s phase", my_turn_to_send ? "sending" : "receiving"); + + while (HandlerPointer.isValid()) { + // execute up to N operation of send &/or receive, to avoid starvation + + if (my_turn_to_send) { + constexpr int max_net_op = 10; + LOG("Send PHASE"); + for (int i = 0; i < max_net_op; i++) { + // Send of request + if (const auto request_opt = queue->try_get_request(); request_opt.has_value()) { + const auto &request = request_opt.value(); + LOG("Request to be sent = %s", request.c_str()); + HandlerPointer.send(request.c_str(), request.length()); + + // Retrieve size of response + capio_off64_t response_size; + HandlerPointer.receive(&response_size, sizeof(response_size)); + LOG("Response will have size %ld", response_size); + char *response_buffer = new char[response_size]; + HandlerPointer.receive(response_buffer, response_size); + LOG("Received response"); + // push response back to the source + queue->push_response(response_buffer, response_size, request); + LOG("Pushed response back!"); + } + } + LOG("Completed SEND PHASE"); + // Send message I have finished + HandlerPointer.send(request_has_finished_to_send, sizeof(request_has_finished_to_send)); + + } else { + + bool continue_receive_phase = true; + size_t receive_size = 0; + LOG("Receive PHASE"); + while (continue_receive_phase) { + // Receive phase + HandlerPointer.probe(receive_size, false); + if (receive_size > 0) { + LOG("A request is incoming"); + + char incoming_request[CAPIO_REQ_MAX_SIZE], request_args[CAPIO_REQ_MAX_SIZE]; + HandlerPointer.receive(incoming_request, CAPIO_REQ_MAX_SIZE); + LOG("Received request = %s", incoming_request); + int requestCode = read_next_request(incoming_request, request_args); + LOG("Request code is %d", requestCode); + switch (requestCode) { + case HAVE_FINISH_SEND_REQUEST: { + // Finished sending data. Set continue_receive_phase to + // false to go to next phase + LOG("Other has finished sending phase. Switching me from receive to send"); + continue_receive_phase = false; + break; + } + + case FETCH_FROM_REMOTE: { + LOG("Received request for data from remote server"); + // Scan request fetch from remote + char filepath[PATH_MAX]; + capio_off64_t offset, count; + sscanf(request_args, "%s %llu %llu", filepath, &offset, &count); + LOG("filepath=%s, offset=%ld, count=%ld", filepath, offset, count); + const auto buffer = new char[count]; + auto read_size = + storage_service->readFromFileToBuffer(filepath, offset, buffer, count); + HandlerPointer.send(&read_size, sizeof(read_size)); + HandlerPointer.send(buffer, read_size); + delete[] buffer; + break; + } + + default: + break; + } + } + } } // terminate phase if (*terminate) { - const std::lock_guard lg(*mutex); - LOG("[TERM PHASE] Locked access send and receive queues"); - while (!out->empty()) { - const auto unit = out->front(); - send_unit(&HandlerPointer, unit); - out->pop(); - } - LOG("[TERM PHASE] Emptied queues. Closing connection"); + LOG("[TERM PHASE] Closing connection"); HandlerPointer.close(); LOG("[TERM PHASE] Terminating thread server_connection_handler"); return; } + my_turn_to_send = !my_turn_to_send; std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); } } -void MTCLBackend::incoming_connection_listener( +void MTCLBackend::incomingConnectionListener( const bool *continue_execution, int sleep_time, - std::unordered_map *open_connections, std::mutex *guard, + std::unordered_map *open_connections, std::mutex *guard, std::vector *_connection_threads, bool *terminate) { char ownHostname[HOST_NAME_MAX] = {0}; @@ -144,20 +160,18 @@ void MTCLBackend::incoming_connection_listener( UserManager.receive(connected_hostname, HOST_NAME_MAX); server_println(CAPIO_SERVER_CLI_LOG_SERVER, - std::string("Connected from ") + connected_hostname); + std::string("Accepted connection with ") + connected_hostname); LOG("Received connection hostname: %s", connected_hostname); - const std::lock_guard lock(*guard); - - open_connections->insert( - {connected_hostname, - std::make_tuple(new std::queue(), new std::queue(), - new std::mutex())}); - - _connection_threads->push_back(new std::thread( - server_connection_handler, std::move(UserManager), connected_hostname, sleep_time, - open_connections->at(connected_hostname), terminate, FROM_REMOTE)); + auto *queue = new MessageQueue(); + { + const std::lock_guard lock(*guard); + open_connections->insert({connected_hostname, queue}); + } + _connection_threads->push_back(new std::thread(serverConnectionHandler, + std::move(UserManager), connected_hostname, + queue, sleep_time, terminate)); } } @@ -173,25 +187,26 @@ void MTCLBackend::connect_to(std::string hostname_port) { return; } - if (connected_hostnames_map.find(remoteToken) != connected_hostnames_map.end()) { + if (open_connections.contains(remoteHost)) { LOG("Remote host %s is already connected", remoteHost.c_str()); return; } LOG("Trying to connect on remote: %s", remoteToken.c_str()); if (auto UserManager = MTCL::Manager::connect(remoteToken); UserManager.isValid()) { - server_println(CAPIO_SERVER_CLI_LOG_SERVER, std::string("Connected to ") + remoteToken); - LOG("Connected to: %s", remoteToken.c_str()); + server_println(CAPIO_SERVER_CLI_LOG_SERVER, + std::string("Opened connection with ") + remoteToken); + LOG("Opened connection with: %s", remoteToken.c_str()); UserManager.send(ownHostname, HOST_NAME_MAX); - const std::lock_guard lg(*_guard); - auto connection_tuple = std::make_tuple( - new std::queue(), new std::queue(), new std::mutex()); - connected_hostnames_map.insert({remoteHost, connection_tuple}); - - connection_threads.push_back( - new std::thread(server_connection_handler, std::move(UserManager), remoteHost.c_str(), - thread_sleep_times, connection_tuple, terminate, TO_REMOTE)); + auto *queue = new MessageQueue(); + { + const std::lock_guard lg(*_guard); + open_connections.insert({remoteHost, queue}); + } + connection_threads.push_back(new std::thread(serverConnectionHandler, + std::move(UserManager), remoteHost, queue, + thread_sleep_times, terminate)); } else { server_println(CAPIO_SERVER_CLI_LOG_SERVER_WARNING, "Warning: tried to connect to " + std::string(remoteHost) + @@ -220,8 +235,8 @@ MTCLBackend::MTCLBackend(const std::string &proto, const std::string &port, int MTCL::Manager::listen(selfToken); - th = new std::thread(incoming_connection_listener, std::ref(continue_execution), sleep_time, - &connected_hostnames_map, _guard, &connection_threads, terminate); + th = new std::thread(incomingConnectionListener, std::ref(continue_execution), sleep_time, + &open_connections, _guard, &connection_threads, terminate); server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "MTCL_backend initialization completed."); } @@ -248,71 +263,35 @@ MTCLBackend::~MTCLBackend() { server_println(CAPIO_LOG_SERVER_CLI_LEVEL_INFO, "MTCL_backend cleanup completed."); } -std::string MTCLBackend::receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) { - START_LOG(gettid(), "call()"); +std::vector MTCLBackend::get_open_connections() { + std::vector keys; + keys.reserve(open_connections.size()); // avoid reallocations - std::queue *inQueue = nullptr; - TransportUnitInterface interface; - bool found = false; - while (!found) { - for (auto [hostname, data] : connected_hostnames_map) { - inQueue = std::get<0>(data); - interface = data; - found = !inQueue->empty(); - LOG("Hostname %s, %s incoming data", hostname.c_str(), found ? "has" : "has not"); - } - if (!found) { - LOG("No incoming messages. Putting thread to sleep"); - std::this_thread::sleep_for(std::chrono::milliseconds(thread_sleep_times)); - } + for (const auto &pair : open_connections) { + keys.push_back(pair.first); } - LOG("Found incoming message"); - const std::lock_guard lg(*std::get<2>(interface)); - auto inputUnit = inQueue->front(); - *buf_size = inputUnit->_buffer_size; - *start_offset = inputUnit->_start_write_offset; - memcpy(buf, inputUnit->_bytes, *buf_size); - LOG("Received buffer: %s", inputUnit->_bytes); - inQueue->pop(); - - std::string filename(inputUnit->_filepath); - - return filename; -} -void MTCLBackend::send(const std::string &target, char *buf, uint64_t buf_size, - const std::string &filepath, const capio_off64_t start_offset) { - START_LOG(gettid(), "call(target=%s, buf_size=%ld, file_path=%s, start_offset=%ld, buf=%s)", - target.c_str(), buf_size, filepath.c_str(), start_offset, buf); - - if (const auto element = connected_hostnames_map.find(target); - element != connected_hostnames_map.end()) { - LOG("Found alive connection for given target"); - - const auto interface = element->second; - auto *out = std::get<1>(interface); - - const auto outputUnit = new TransportUnit(); - outputUnit->_buffer_size = buf_size; - outputUnit->_filepath = filepath; - outputUnit->_start_write_offset = start_offset; - outputUnit->_bytes = new char[buf_size]; - memcpy(outputUnit->_bytes, buf, buf_size); - LOG("Copied buffer: %s", outputUnit->_bytes); - - const std::lock_guard lg(*std::get<2>(interface)); - LOG("Pushing Transport unit to out queue"); - out->push(outputUnit); - } else { - server_println(CAPIO_LOG_SERVER_CLI_LEVEL_ERROR, "can't find target"); - } + return keys; } -std::vector MTCLBackend::get_open_connections() { - std::vector connections; +std::tuple MTCLBackend::fetchFromRemoteHost(const std::string &hostname, + const std::filesystem::path &filepath, + capio_off64_t offset, + capio_off64_t count) { - for (const auto &[hostname, _] : connected_hostnames_map) { - connections.push_back(hostname); - } - return connections; + START_LOG(gettid(), "call(hostname=%s, path=%s, offset=%ld, count=%ld)", hostname.c_str(), + filepath.c_str(), offset, count); + + char REQUEST[CAPIO_REQ_MAX_SIZE]; + + sprintf(REQUEST, "%04d %s %llu %llu", FETCH_FROM_REMOTE, filepath.c_str(), offset, count); + LOG("Sending request %s. Fetching queue to hostname %s", REQUEST, hostname.c_str()); + auto queues = open_connections.at(hostname); + LOG("obtained access to queue"); + + queues->push_request(REQUEST); + LOG("Request pushed to output queue"); + auto [buff_size, response_buffer] = queues->get_response(); + LOG("Obtained response. Buffer size of response is %ld", buff_size); + return std::make_tuple(buff_size, response_buffer); } diff --git a/capio-server/src/file-manager/file_manager.cpp b/capio-server/src/file-manager/file_manager.cpp index 6439b4f52..04ee4d1e1 100644 --- a/capio-server/src/file-manager/file_manager.cpp +++ b/capio-server/src/file-manager/file_manager.cpp @@ -70,7 +70,7 @@ void CapioFileManager::_unlockThreadAwaitingCreation(const std::string &path, * Here we need to create a new remote file, as it might be that the file is not * produced by this node but by another remote one */ - storage_service->createRemoteFile(path); + storage_service->createRemoteFile(path, {}); } } diff --git a/capio-server/src/storage-service/capio_remote_file.cpp b/capio-server/src/storage-service/capio_remote_file.cpp index c635b039e..05cbc1b0d 100644 --- a/capio-server/src/storage-service/capio_remote_file.cpp +++ b/capio-server/src/storage-service/capio_remote_file.cpp @@ -1,23 +1,39 @@ +#include "include/communication-service/data-plane/backend_interface.hpp" +#include "include/storage-service/capio_storage_service.hpp" + #include -CapioRemoteFile::CapioRemoteFile(const std::string &filePath) : CapioFile(filePath) {} +CapioRemoteFile::CapioRemoteFile(const std::string &filePath, const std::string &home_node) + : CapioFile(filePath, home_node) {} CapioRemoteFile::~CapioRemoteFile() {} std::size_t CapioRemoteFile::writeData(const char *buffer, const std::size_t file_offset, std::size_t buffer_length) { - return 0; + throw std::runtime_error("Not implemented: writeData"); } std::size_t CapioRemoteFile::readData(char *buffer, std::size_t file_offset, std::size_t buffer_size) { - return 0; + throw std::runtime_error("Not implemented: readData"); } -void CapioRemoteFile::readFromQueue(SPSCQueue &queue, std::size_t offset, std::size_t length) {} +void CapioRemoteFile::readFromQueue(SPSCQueue &queue, std::size_t offset, std::size_t length) { + throw std::runtime_error("Not implemented: readFromQueue"); +} std::size_t CapioRemoteFile::writeToQueue(SPSCQueue &queue, std::size_t offset, std::size_t length) const { - return 0; + + START_LOG(gettid(), "call(offset=%ld,count=%ld,path=%s)", offset, length, fileName.c_str()); + + auto [buffer_size, buffer] = + capio_backend->fetchFromRemoteHost(this->homeNode, this->fileName, offset, length); + + queue.write(buffer, buffer_size); + + delete[] buffer; + + return buffer_size; } diff --git a/capio-server/src/storage-service/capio_storage_service.cpp b/capio-server/src/storage-service/capio_storage_service.cpp index f55ac94b7..d7c3a1596 100644 --- a/capio-server/src/storage-service/capio_storage_service.cpp +++ b/capio-server/src/storage-service/capio_storage_service.cpp @@ -7,9 +7,20 @@ #include auto CapioStorageService::getFile(const std::string &file_name) const { + START_LOG(gettid(), "getFile(file_name=%s)", file_name.c_str()); if (_stored_files->find(file_name) == _stored_files->end()) { + LOG("File not found. Creating file!"); + DBG(gettid(), [&]() { + for (auto f : *_stored_files) { + LOG("Found path in storage service %s", f.first.c_str()); + } + }); + createMemoryFile(file_name); } + auto file = _stored_files->at(file_name); + LOG("Returning %s instance with path %s", + file->is_remote() ? "CapioRemotFile" : "CapioMemoryFile", file->getFileName().c_str()); return _stored_files->at(file_name); } @@ -37,17 +48,21 @@ void CapioStorageService::createMemoryFile(const std::string &file_name) const { _stored_files->emplace(file_name, new CapioMemoryFile(file_name)); } -void CapioStorageService::createRemoteFile(const std::string &file_name) const { +void CapioStorageService::createRemoteFile(const std::string &file_name, + const std::string &home_node) const { /* * First we check that the file associate does not yet exists, as it might be produced * by another app running under the same server instance. if it is not found, we create * the file */ - START_LOG(gettid(), "call(file_name=%s)", file_name.c_str()); - if (_stored_files->find(file_name) == _stored_files->end()) { + START_LOG(gettid(), "call(file_name=%s, home_node=%s)", file_name.c_str(), home_node.c_str()); + if (!_stored_files->contains(file_name)) { LOG("File not found. Creating a new remote file"); - _stored_files->emplace(file_name, new CapioRemoteFile(file_name)); + _stored_files->emplace(file_name, new CapioRemoteFile(file_name, home_node)); } + LOG("Created remote file at path %s with home_node %s", + _stored_files->at(file_name)->getFileName().c_str(), + _stored_files->at(file_name)->getHomeNode().c_str()); } void CapioStorageService::deleteFile(const std::string &file_name) const { @@ -100,12 +115,12 @@ void CapioStorageService::register_client(const std::string &app_name, const pid LOG("Created communication queues"); } -void CapioStorageService::reply_to_client(pid_t pid, const std::string &file, capio_off64_t offset, - capio_off64_t size) const { +size_t CapioStorageService::reply_to_client(pid_t pid, const std::string &file, + capio_off64_t offset, capio_off64_t size) const { START_LOG(gettid(), "call(pid=%llu, file=%s, offset=%llu, size=%llu)", pid, file.c_str(), offset, size); - getFile(file)->writeToQueue(*_server_to_client_queue->at(pid), offset, size); + return getFile(file)->writeToQueue(*_server_to_client_queue->at(pid), offset, size); } void CapioStorageService::reply_to_client_raw(pid_t pid, const char *data, @@ -150,3 +165,18 @@ size_t CapioStorageService::sendFilesToStoreInMemory(const long pid) const { LOG("Return value=%llu", files_to_store_in_mem.size()); return files_to_store_in_mem.size(); } + +void CapioStorageService::storeData(const std::filesystem::path &path, const capio_off64_t offset, + const capio_off64_t buff_size, const char *buffer) const { + const auto file = getFile(path); + + file->writeData(buffer, offset, buff_size); +} + +size_t CapioStorageService::readFromFileToBuffer(const std::filesystem::path &filepath, + capio_off64_t offset, char *buffer, + capio_off64_t count) const { + const auto file = this->getFile(filepath); + + return file->readData(buffer, offset, count); +} diff --git a/capio-tests/multinode/backend/CMakeLists.txt b/capio-tests/multinode/backend/CMakeLists.txt index 636a35359..bf7ccb1a0 100644 --- a/capio-tests/multinode/backend/CMakeLists.txt +++ b/capio-tests/multinode/backend/CMakeLists.txt @@ -24,11 +24,25 @@ target_include_directories(${TARGET_NAME} PRIVATE ) file(GLOB_RECURSE CAPIO_SERVER_SOURCES - "${CMAKE_SOURCE_DIR}/capio-server/src/communication-service/*.cpp" + "${CMAKE_SOURCE_DIR}/capio-server/*.cpp" ) + file(GLOB_RECURSE CAPIO_SERVER_HEADERS "${TARGET_INCLUDE_FOLDER}/*.hpp") +list(FILTER CAPIO_SERVER_SOURCES EXCLUDE REGEX + ".*/json_parser\\.cpp$" +) + +list(FILTER CAPIO_SERVER_SOURCES EXCLUDE REGEX + ".*/capio_server\\.cpp$" +) + +list(FILTER CAPIO_SERVER_SOURCES EXCLUDE REGEX + ".*/command_line_parser\\.cpp$" +) + + target_sources(${TARGET_NAME} PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/src/main.cpp ${CAPIO_SERVER_SOURCES} diff --git a/capio-tests/multinode/backend/docker-compose.yml b/capio-tests/multinode/backend/docker-compose.yml index ec80cbff3..7a4cfc8a4 100644 --- a/capio-tests/multinode/backend/docker-compose.yml +++ b/capio-tests/multinode/backend/docker-compose.yml @@ -11,7 +11,11 @@ services: - node1 environment: - CAPIO_LOG_LEVEL=-1 - command: capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1 + - APP_TYPE=writer + - CAPIO_DIR=. + command: | + bash -c " capio_server -b tcp --mem-only --no-config & \ + LD_PRELOAD=libcapio_posix.so capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1" node2: image: alphaunito/capio:latest @@ -25,7 +29,11 @@ services: - node2 environment: - CAPIO_LOG_LEVEL=-1 - command: bash -c "sleep 5 && capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1" + - APP_TYPE=reader + - CAPIO_DIR=. + command: | + sleep 5 && bash -c " capio_server -b tcp --mem-only --no-config & \ + LD_PRELOAD=libcapio_posix.so capio_backend_unit_tests --gtest_break_on_failure --gtest_print_time=1" volumes: shared_data: diff --git a/capio-tests/multinode/backend/src/MTCL.hpp b/capio-tests/multinode/backend/src/MTCL.hpp index 19bf9192c..a131f40b6 100644 --- a/capio-tests/multinode/backend/src/MTCL.hpp +++ b/capio-tests/multinode/backend/src/MTCL.hpp @@ -8,51 +8,56 @@ #include #include -constexpr char TEST_MESSAGE[] = "hello world how is it going?"; -constexpr capio_off64_t BUFFER_SIZES = 1024; - -TEST(CapioCommServiceTest, TestPingPong) { - START_LOG(gettid(), "INFO: TestPingPong"); - const int port = 1234; - std::string proto = "TCP"; - auto communication_service = new CapioCommunicationService(proto, port, "multicast"); - capio_off64_t size_revc, offset; - - std::vector connections; - - do { - connections = capio_backend->get_open_connections(); - std::this_thread::sleep_for(std::chrono::milliseconds(300)); - } while (connections.empty()); - - char ownHostname[HOST_NAME_MAX] = {0}; - gethostname(ownHostname, HOST_NAME_MAX); - - for (const auto &i : connections) { - if (i.compare(ownHostname) < 0) { - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Sending ping to: " << i << std::endl; - char buff[BUFFER_SIZES]{0}, buff1[BUFFER_SIZES]{0}; - memcpy(buff, TEST_MESSAGE, strlen(TEST_MESSAGE)); - capio_backend->send(i, buff, BUFFER_SIZES, "./test", 0); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "sent ping to: " << i - << ". Waiting for response" << std::endl; - capio_backend->receive(buff1, &size_revc, &offset); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Received ping response from : " << i - << std::endl; - EXPECT_EQ(strcmp(buff, buff1), 0); - delete communication_service; - return; - } - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Listening for ping from: " << i - << std::endl; - char recvBuff[BUFFER_SIZES]; - capio_backend->receive(recvBuff, &size_revc, &offset); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Received ping from: " << i << std::endl; - EXPECT_EQ(strcmp(recvBuff, TEST_MESSAGE), 0); - capio_backend->send(i, recvBuff, size_revc, "./test", 0); - std::cout << CAPIO_LOG_SERVER_CLI_LEVEL_INFO << "Sent ping response to: " << i << std::endl; - delete communication_service; - return; +const char *filename = "data.bin"; +const size_t chunkSize = 1024; +const size_t totalSize = 2048; + +inline int writer() { + + char buffer[chunkSize]; + for (int i = 0; i < chunkSize; i++) { + buffer[i] = i % 26 + 'A'; + } + + FILE *fp = fopen(filename, "wb"); + EXPECT_NE(fp, nullptr); + + for (int i = 0; i < totalSize / chunkSize; i++) { + EXPECT_EQ(fwrite(buffer, 1, chunkSize, fp), chunkSize); + } + + fclose(fp); + printf("Wrote %zu bytes to %s\n", totalSize, filename); + return 0; +} + +inline int reader() { + + char buffer[chunkSize]; + size_t totalRead = 0; + + FILE *fp = fopen(filename, "rb"); + EXPECT_NE(fp, nullptr); + + // Read in 1024-byte chunks until EOF + size_t bytesRead; + while ((bytesRead = fread(buffer, 1, chunkSize, fp)) > 0) { + totalRead += bytesRead; + } + + EXPECT_EQ(totalRead, totalSize); + + fclose(fp); + printf("Total read: %zu bytes\n", totalRead); + return 0; +} + +TEST(CapioCommServiceTest, TestCapioMemoryNetworkBackend) { + + if (const auto program = std::getenv("APP_TYPE"); std::string(program) == "writer") { + EXPECT_EQ(writer(), 0); + } else { + EXPECT_EQ(reader(), 0); } } diff --git a/scripts/docker-development/build-and-startup.sh b/scripts/docker-development/build-and-startup.sh index df223d16f..7c48205b9 100755 --- a/scripts/docker-development/build-and-startup.sh +++ b/scripts/docker-development/build-and-startup.sh @@ -3,5 +3,5 @@ cd ../.. docker build -t alphaunito/capio --build-arg CAPIO_LOG=ON --build-arg CMAKE_BUILD_TYPE=Debug . cd scripts/docker-development - -docker compose up +( sleep 3; docker ps )& +docker compose up \ No newline at end of file diff --git a/scripts/docker-development/docker-compose.yml b/scripts/docker-development/docker-compose.yml index 5a4ef82cc..cfcdc57fe 100644 --- a/scripts/docker-development/docker-compose.yml +++ b/scripts/docker-development/docker-compose.yml @@ -11,6 +11,12 @@ services: - node1 environment: - CAPIO_LOG_LEVEL=-1 + cap_add: + - SYS_PTRACE + security_opt: + - seccomp=unconfined + ports: + - "1111:1111" # Use 1111 for gdbserver node2: image: alphaunito/capio:latest @@ -24,6 +30,12 @@ services: - node2 environment: - CAPIO_LOG_LEVEL=-1 + cap_add: + - SYS_PTRACE + security_opt: + - seccomp=unconfined + ports: + - "1112:1112" # Use 1111 for gdbserver volumes: