Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ capio_logs
debug
build

.capio_tokens

cmake_test_discovery*.json
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ FetchContent_Declare(
GIT_TAG v1.4.0
)

FetchContent_Declare(
mtcl
GIT_REPOSITORY https://github.com/ParaGroup/MTCL
GIT_TAG e5f2bfeea0fc3d704554c7df02e7857f0a00bbba
)

#####################################
# Targets
#####################################
Expand Down
4 changes: 4 additions & 0 deletions capio/common/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
constexpr size_t CAPIO_DEFAULT_DIR_INITIAL_SIZE = 1024L * 1024 * 1024;
constexpr off64_t CAPIO_DEFAULT_FILE_INITIAL_SIZE = 1024L * 1024 * 1024 * 4;

// CAPIO backend constants
constexpr char CAPIO_MCAST_ADV_DEFAULT_ADDR[] = "224.0.0.2";
constexpr unsigned int CAPIO_MCAST_ADV_DEFAULT_PORT = 22334;

// CAPIO default values for shared memory
constexpr char CAPIO_DEFAULT_WORKFLOW_NAME[] = "CAPIO";
constexpr char CAPIO_DEFAULT_APP_NAME[] = "default_app";
Expand Down
2 changes: 2 additions & 0 deletions capio/common/requests.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ constexpr const int CAPIO_SERVER_REQUEST_STAT_REPLY = 3;

constexpr const int CAPIO_SERVER_NR_REQUEST = 4;

constexpr const int BACKEND_HAVE_FINISH_SEND_REQUEST = 4;

#endif // CAPIO_COMMON_REQUESTS_HPP
42 changes: 0 additions & 42 deletions capio/common/shm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,48 +43,6 @@

#endif

class CapioShmCanary {
int _shm_id;
std::string _canary_name;

public:
explicit CapioShmCanary(std::string capio_workflow_name) : _canary_name(capio_workflow_name) {
START_LOG(capio_syscall(SYS_gettid), "call(capio_workflow_name: %s)", _canary_name.data());
if (_canary_name.empty()) {
_canary_name = get_capio_workflow_name();
}
_shm_id = shm_open(_canary_name.data(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
if (_shm_id == -1) {
LOG(CAPIO_SHM_CANARY_ERROR, _canary_name.data());
#ifndef __CAPIO_POSIX
const auto message = new char[strlen(CAPIO_SHM_CANARY_ERROR)];
sprintf(message, CAPIO_SHM_CANARY_ERROR, _canary_name.data());
server_println(message, capio_workflow_name, CAPIO_LOG_SERVER_CLI_LEVEL_ERROR,
"CapioShmCanary");
delete[] message;
#endif
ERR_EXIT("ERR: shm canary flag already exists");
}
#ifndef __CAPIO_POSIX
server_println("Created Capio SHM canary: " + _canary_name, capio_workflow_name,
CAPIO_LOG_SERVER_CLI_LEVEL_STATUS, "CapioShmCanary");
#endif
};

~CapioShmCanary() {
START_LOG(capio_syscall(SYS_gettid), "call()");
#ifndef __CAPIO_POSIX
server_println("Removing shared memory canary flag", get_capio_workflow_name(),
CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "CapioShmCanary");
#endif
close(_shm_id);
SHM_DESTROY_CHECK(_canary_name.c_str());
}
};

// FIXME: Remove the inline specifier by using extern
inline CapioShmCanary *shm_canary;

inline void *create_shm(const std::string &shm_name, const long int size) {
START_LOG(capio_syscall(SYS_gettid), "call(shm_name=%s, size=%ld)", shm_name.c_str(), size);

Expand Down
3 changes: 2 additions & 1 deletion capio/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ FetchContent_Declare(
set(ARGS_BUILD_EXAMPLE OFF CACHE INTERNAL "")
set(ARGS_BUILD_UNITTESTS OFF CACHE INTERNAL "")

FetchContent_MakeAvailable(args capio_cl)
FetchContent_MakeAvailable(args capio_cl mtcl)

#####################################
# Target definition
Expand All @@ -38,6 +38,7 @@ target_include_directories(${TARGET_NAME} PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/include
${MPI_INCLUDE_PATH}
${capio_cl_SOURCE_DIR}
${mtcl_SOURCE_DIR}/include
)

#####################################
Expand Down
7 changes: 4 additions & 3 deletions capio/server/capio_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "common/requests.hpp"
#include "common/semaphore.hpp"
#include "remote/backend.hpp"
#include "remote/discovery.hpp"
#include "storage/capio_file.hpp"
#include "utils/common.hpp"
#include "utils/env.hpp"
Expand All @@ -36,6 +37,7 @@
ClientManager *client_manager;
StorageManager *storage_manager;
Backend *backend;
DiscoveryService *discovery_service;

#include "handlers.hpp"
#include "utils/cli_parser.hpp"
Expand Down Expand Up @@ -140,13 +142,13 @@ int main(int argc, char **argv) {

capio_cl_engine->print();

backend = select_backend(configuration.backend_name, argc, argv);
discovery_service = new DiscoveryService();
backend = select_backend(configuration.backend_name, argc, argv);

START_LOG(gettid(), "call()");

open_files_location();

shm_canary = new CapioShmCanary(capio_cl_engine->getWorkflowName());
storage_manager = new StorageManager();
client_manager = new ClientManager();

Expand All @@ -159,6 +161,5 @@ int main(int argc, char **argv) {
server_thread.join();
remote_listener_thread.join();

delete backend;
return 0;
}
72 changes: 72 additions & 0 deletions capio/server/include/remote/atomic_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@

#ifndef CAPIO_BACKEND_ATOMIC_QUEUE_HPP
#define CAPIO_BACKEND_ATOMIC_QUEUE_HPP

#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <string>

template <typename T> struct AtomicQueueElement {

AtomicQueueElement(T message, size_t message_size, const std::string &origin) {
this->object = message;
this->object_size = message_size;
this->target_or_source = origin;
}

T object;
size_t object_size = 0;
std::string target_or_source;
};

template <typename T> class AtomicQueue {
std::queue<AtomicQueueElement<T>> _queue;
std::mutex _mutex;
std::condition_variable _lock_cond;

bool _shutdown = false;

public:
~AtomicQueue() {
{
std::lock_guard lg(_mutex);
_shutdown = true;
}
_lock_cond.notify_all();
}

void push(T message, size_t message_size, const std::string &origin) {
{
std::lock_guard lg(_mutex);
if (_shutdown) {
return;
}
_queue.emplace(message, message_size, origin);
}
_lock_cond.notify_all();
}

AtomicQueueElement<T> pop() {
std::unique_lock lock(_mutex);
_lock_cond.wait(lock, [this] { return !_queue.empty() || _shutdown; });
auto s = std::move(_queue.front());
_queue.pop();

return s;
}

std::optional<AtomicQueueElement<T>> try_pop() {
std::lock_guard lg(_mutex);
if (_queue.empty() || _shutdown) {
return std::nullopt;
}

auto s = std::move(_queue.front());
_queue.pop();
return s;
}
};

#endif // CAPIO_BACKEND_ATOMIC_QUEUE_HPP
6 changes: 6 additions & 0 deletions capio/server/include/remote/backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ class Backend {
* @param target
*/
virtual void send_request(const char *message, int message_len, const std::string &target) = 0;

/**
* Connect this server instance to a remote server instance
* @param target Remote server instance identification
*/
virtual void connect_to(const std::string &target) = 0;
};

#endif // CAPIO_SERVER_REMOTE_BACKEND_HPP
2 changes: 2 additions & 0 deletions capio/server/include/remote/backend/include.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@
*/

#include "mpi.hpp"
#include "mtcl.hpp"
#include "none.hpp"

#endif // CAPIO_SERVER_REMOTE_BACKEND_INCLUDE_HPP
1 change: 1 addition & 0 deletions capio/server/include/remote/backend/mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class MPIBackend : public Backend {
void send_file(char *shm, long int nbytes, const std::string &target) override;
void send_request(const char *message, int message_len, const std::string &target) override;
void recv_file(char *shm, const std::string &source, long int bytes_expected) override;
void connect_to(const std::string &target) override;
};

class MPISYNCBackend final : public MPIBackend {
Expand Down
92 changes: 92 additions & 0 deletions capio/server/include/remote/backend/mtcl.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#ifndef MTCL_BACKEND_HPP
#define MTCL_BACKEND_HPP

#include <condition_variable>
#include <filesystem>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <thread>
#include <unistd.h>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/constants.hpp"
#include "common/logger.hpp"
#include "remote/atomic_queue.hpp"
#include "remote/backend.hpp"

#include <shared_mutex>

typedef unsigned long long int capio_off64_t;

/**
* This avoids it to include the MTCL library here as it is a header-only library.
* this is equivalent to use extern in C but for class
*/
namespace MTCL {
class HandleUser;
}

// TODO: extend backend class
class MTCLBackend : public Backend {

int thread_sleep_times = 0;
bool continue_execution = true;

const std::string selfToken, ownPort, usedProtocol;

std::shared_mutex open_connections_lock;
std::unordered_map<std::string, AtomicQueue<const char *> *> open_connections;

std::thread *incoming_connection_thread = nullptr;
std::vector<std::thread *> connection_threads;

AtomicQueue<std::string> incoming_request_queue;

/**
* Waits for incoming new requests to connect to new server instances. When a new request
* arrives, it then handshakes with the remote servers, opening a new connection, and starting a
* new thread that will handle remote requests. If no request arrives within the sleep_time
* parameter, then the method will issue an advertisement on UDP multicast of its alive state
* so that other servers may instantiate a new connection with me.
*
* @param ownPort
* @param usedProtocol
* @param continue_execution
* @param sleep_time
* @param open_connections
* @param open_connection_guard
* @param _connection_threads
* @param incoming_request_queue
*/
void static incomingMTCLConnectionListener(
const std::string &ownPort, const std::string &usedProtocol, const bool *continue_execution,
int sleep_time,
std::unordered_map<std::string, AtomicQueue<const char *> *> *open_connections,
std::shared_mutex *open_connection_guard, std::vector<std::thread *> *_connection_threads,
AtomicQueue<std::string> *incoming_request_queue);

public:
explicit MTCLBackend(const std::string &proto, const std::string &port, int sleep_time);

~MTCLBackend() override;

RemoteRequest read_next_request() override;

void handshake_servers() override;

const std::set<std::string> get_nodes() override;

void send_request(const char *message, int message_len, const std::string &target) override;

void send_file(char *shm, long int nbytes, const std::string &target) override;

void recv_file(char *shm, const std::string &source, long int bytes_expected) override;

void connect_to(const std::string &target_token) override;
};

#endif // MTCL_BACKEND_HPP
1 change: 1 addition & 0 deletions capio/server/include/remote/backend/none.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ class NoneBackend final : public Backend {
void send_file(char *shm, long int nbytes, const std::string &target) override;
void send_request(const char *message, int message_len, const std::string &target) override;
void recv_file(char *shm, const std::string &source, long int bytes_expected) override;
void connect_to(const std::string &target) override;
};
#endif // CAPIO_SERVER_REMOTE_BACKEND_NONE_HPP
Loading
Loading