Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion capio-server/include/capio-cl-engine/capio_cl_engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class CapioCLEngine {
void setStoreFileInFileSystem(const std::filesystem::path &path);
bool storeFileInMemory(const std::filesystem::path &path);
std::vector<std::string> getFileToStoreInMemory();
auto get_home_node(const std::string &path);
std::string get_home_node(const std::string &path);
};

inline CapioCLEngine *capio_cl_engine;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

class CapioControlPlane {
public:
typedef enum { CREATE, DELETE, WRITE } event_type;
typedef enum { CREATE, WRITE, CLOSE, COMMIT } event_type;

virtual ~CapioControlPlane() = default;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,85 @@
#ifndef BACKEND_INTERFACE_HPP
#define BACKEND_INTERFACE_HPP

#include "capio/constants.hpp"
#include <capio/constants.hpp>
#include <capio/logger.hpp>
#include <condition_variable>
#include <cstdint>
#include <filesystem>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <unistd.h>
#include <utility>
#include <vector>

class MessageQueue {
class ResponseToRequest {
public:
std::string original_request;
char *response;
capio_off64_t response_size;

ResponseToRequest(std::string request, char *buf, capio_off64_t buff_size)
: original_request(std::move(request)), response(buf), response_size(buff_size) {}
};

std::queue<std::string> request_queue;
std::queue<ResponseToRequest> response_queue;

std::mutex request_mutex;
std::mutex response_mutex;

std::condition_variable request_cv;
std::condition_variable response_cv;

public:
void push_request(const std::string &request) {
START_LOG(gettid(), "call(req=%s)", request.c_str());
{
LOG("Locking request_mutex");
std::lock_guard lg(request_mutex);
LOG("Obtained lock");
request_queue.emplace(request);
}
request_cv.notify_one();
}

std::optional<std::string> try_get_request() {
START_LOG(gettid(), "call()");
LOG("Locking request_mutex");
std::lock_guard lg(request_mutex);
LOG("Obtained lock");
if (request_queue.empty()) {
return std::nullopt;
}
std::string req = std::move(request_queue.front());
request_queue.pop();
return req;
}

void push_response(char *buffer, capio_off64_t buff_size, std::string origin) {
START_LOG(gettid(), "call(origin=%s, buff_size=%ld)", origin.c_str(), buff_size);
{
std::lock_guard lg(response_mutex);
LOG("Obtained lock");
response_queue.emplace(std::move(origin), buffer, buff_size);
}
response_cv.notify_one();
}

std::tuple<capio_off64_t, char *> get_response() {
START_LOG(gettid(), "call()");
std::unique_lock lk(response_mutex);
response_cv.wait(lk, [this] { return !response_queue.empty(); });
LOG("Obtained lock");
auto response = std::move(response_queue.front());
response_queue.pop();
return {response.response_size, response.response};
}
};

class NotImplementedBackendMethod : public std::exception {
public:
[[nodiscard]] const char *what() const noexcept override {
Expand All @@ -17,6 +90,9 @@ class NotImplementedBackendMethod : public std::exception {
};

class BackendInterface {
protected:
typedef enum { HAVE_FINISH_SEND_REQUEST, FETCH_FROM_REMOTE } BackendRequest_t;

public:
virtual ~BackendInterface() = default;

Expand All @@ -25,29 +101,18 @@ class BackendInterface {
*/
virtual void connect_to(std::string hostname_port) { throw NotImplementedBackendMethod(); };

/**
* @brief Send data to target
/** Fetch a chunk of CapioFile internal data from remote host
*
* @param target Hostname of remote target
* @param buf pointer to data to sent
* @param buf_size length of@param filepath
* @param start_offset
* @param buf
* @param hostname Hostname to request data from
* @param filepath Path of the file targeted by the request
* @param offset Offset relative to the beginning of the file from which to read from
* @param count Size of @param buffer and hence size of the fetch operation
* @return Tuple of size of buffer and pointer to char* with the actual buffer
*/
virtual void send(const std::string &target, char *buf, uint64_t buf_size,
const std::string &filepath, capio_off64_t start_offset) {
throw NotImplementedBackendMethod();
};

/**
* @brief receive data
*
* @param buf allocated data buffer
* @param buf_size size of @param buf
* @param start_offset
* @return std::string hostname of sender
*/
virtual std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) {
virtual std::tuple<size_t, char *> fetchFromRemoteHost(const std::string &hostname,
const std::filesystem::path &filepath,
capio_off64_t offset,
capio_off64_t count) {
throw NotImplementedBackendMethod();
};

Expand All @@ -65,16 +130,13 @@ class BackendInterface {
class NoBackend final : public BackendInterface {
public:
void connect_to(std::string hostname_port) override { return; };

void send(const std::string &target, char *buf, uint64_t buf_size, const std::string &filepath,
capio_off64_t start_offset) override {
return;
std::tuple<size_t, char *> fetchFromRemoteHost(const std::string &hostname,
const std::filesystem::path &filepath,
capio_off64_t offset,
capio_off64_t count) override {
return {-1, nullptr};
};

std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) override {
return {"no-backend"};
}

std::vector<std::string> get_open_connections() override { return {}; }
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,22 @@
#define MTCL_BACKEND_HPP

#include <include/communication-service/data-plane/backend_interface.hpp>
#include <include/communication-service/data-plane/transport_unit.hpp>
#include <queue>
#include <thread>
#include <unordered_map>

/**
* This avoid to include the MTCL librari here as it is a header only library.
* This avoids it to include the MTCL library here as it is a header-only library.
* this is equivalent to use extern in C but for class
*/
namespace MTCL {
class HandleUser;
}

class MTCLBackend : public BackendInterface {
typedef enum { FROM_REMOTE, TO_REMOTE } CONN_HANDLER_ORIGIN;

typedef std::tuple<std::queue<TransportUnit *> *, std::queue<TransportUnit *> *, std::mutex *>
TransportUnitInterface;
std::unordered_map<std::string, TransportUnitInterface> connected_hostnames_map;
std::string selfToken, connectedHostname, ownPort, usedProtocol;
std::unordered_map<std::string, MessageQueue *> open_connections;
char ownHostname[HOST_NAME_MAX] = {0};
int thread_sleep_times = 0;
bool *continue_execution = new bool;
Expand All @@ -29,36 +26,57 @@ class MTCLBackend : public BackendInterface {
std::vector<std::thread *> connection_threads;
bool *terminate;

static TransportUnit *receive_unit(MTCL::HandleUser *HandlerPointer);

static void send_unit(MTCL::HandleUser *HandlerPointer, const TransportUnit *unit);

/**
* This thread will handle connections towards a single target.
* This thread handles a single p2p connection with another capio_server instance
* @param HandlerPointer A MTCL valid HandlePointer
* @param remote_hostname The remote endpoint of the connection handled by this thread
* @param queue A pointer to a queue to communicate with other components. Queue has pointers to
* inbound and outbound sub-queues for inbound and outbound messages
* @param sleep_time How long to sleep between thread cycle
* @param terminate A reference to a boolean heap allocated variable that is controlled by the
* main thread that tells when to terminate the execution
*/
void static server_connection_handler(MTCL::HandleUser HandlerPointer,
const std::string remote_hostname, const int sleep_time,
TransportUnitInterface interface, const bool *terminate,
CONN_HANDLER_ORIGIN source);
void static serverConnectionHandler(MTCL::HandleUser HandlerPointer,
const std::string &remote_hostname, MessageQueue *queue,
int sleep_time, const bool *terminate);

void static incoming_connection_listener(
/**
* Waits for incoming new requests to connect to new server instances. When a new request
* arrives, it then handshakes with the remote servers, opening a new connection, and starting a
* new thread that will handle remote requests
*
* @param continue_execution
* @param sleep_time
* @param open_connections
* @param guard
* @param _connection_threads
* @param terminate
*/
void static incomingConnectionListener(
const bool *continue_execution, int sleep_time,
std::unordered_map<std::string, TransportUnitInterface> *open_connections,
std::mutex *guard, std::vector<std::thread *> *_connection_threads, bool *terminate);
std::unordered_map<std::string, MessageQueue *> *open_connections, std::mutex *guard,
std::vector<std::thread *> *_connection_threads, bool *terminate);

public:
void connect_to(std::string hostname_port) override;
/**
* Explode request into request code and request arguments
* @param req Original request string as received from the remote node
* @param args output string with parameters of the request
* @return request code
*/
static int read_next_request(char *req, char *args);

public:
explicit MTCLBackend(const std::string &proto, const std::string &port, int sleep_time);

~MTCLBackend() override;

std::string receive(char *buf, capio_off64_t *buf_size, capio_off64_t *start_offset) override;

void send(const std::string &target, char *buf, uint64_t buf_size, const std::string &filepath,
const capio_off64_t start_offset) override;
void connect_to(std::string hostname_port) override;

std::vector<std::string> get_open_connections() override;
std::tuple<size_t, char *> fetchFromRemoteHost(const std::string &hostname,
const std::filesystem::path &filepath,
capio_off64_t offset,
capio_off64_t count) override;
};

#endif // MTCL_BACKEND_HPP

This file was deleted.

18 changes: 15 additions & 3 deletions capio-server/include/storage-service/capio_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@

class CapioFile {
protected:
const std::string fileName;
const std::string fileName, homeNode;
std::size_t totalSize;

public:
explicit CapioFile(const std::string &filePath) : fileName(filePath), totalSize(0) {};
explicit CapioFile(const std::string &filePath,
const std::string &home_node = capio_global_configuration->node_name)
: fileName(filePath), homeNode(home_node), totalSize(0) {};
virtual ~CapioFile() = default;

virtual bool is_remote() { return false; }

[[nodiscard]] std::size_t getSize() const {
START_LOG(gettid(), "call()");
return totalSize;
Expand Down Expand Up @@ -60,6 +64,12 @@ class CapioFile {
*/
virtual std::size_t writeToQueue(SPSCQueue &queue, std::size_t offset,
std::size_t length) const = 0;

/**
*
* @return HomeNode of file
*/
virtual std::string getHomeNode() { return homeNode; };
};

class CapioMemoryFile : public CapioFile {
Expand Down Expand Up @@ -131,10 +141,12 @@ class CapioMemoryFile : public CapioFile {

class CapioRemoteFile : public CapioFile {
public:
explicit CapioRemoteFile(const std::string &filePath);
explicit CapioRemoteFile(const std::string &filePath, const std::string &home_node);

~CapioRemoteFile() override;

bool is_remote() override { return true; };

/**
* Write data to a file stored inside the memory
* @param buffer buffer to store inside memory (i.e. content of the file)
Expand Down
Loading