Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ capio_logs
debug
build

.capio_tokens

cmake_test_discovery*.json
4 changes: 4 additions & 0 deletions capio/common/constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
constexpr size_t CAPIO_DEFAULT_DIR_INITIAL_SIZE = 1024L * 1024 * 1024;
constexpr off64_t CAPIO_DEFAULT_FILE_INITIAL_SIZE = 1024L * 1024 * 1024 * 4;

// CAPIO backend constants
constexpr char CAPIO_MCAST_ADV_DEFAULT_ADDR[] = "224.0.0.2";
constexpr unsigned int CAPIO_MCAST_ADV_DEFAULT_PORT = 22334;

// CAPIO default values for shared memory
constexpr char CAPIO_DEFAULT_WORKFLOW_NAME[] = "CAPIO";
constexpr char CAPIO_DEFAULT_APP_NAME[] = "default_app";
Expand Down
42 changes: 0 additions & 42 deletions capio/common/shm.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,48 +43,6 @@

#endif

class CapioShmCanary {
int _shm_id;
std::string _canary_name;

public:
explicit CapioShmCanary(std::string capio_workflow_name) : _canary_name(capio_workflow_name) {
START_LOG(capio_syscall(SYS_gettid), "call(capio_workflow_name: %s)", _canary_name.data());
if (_canary_name.empty()) {
_canary_name = get_capio_workflow_name();
}
_shm_id = shm_open(_canary_name.data(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR);
if (_shm_id == -1) {
LOG(CAPIO_SHM_CANARY_ERROR, _canary_name.data());
#ifndef __CAPIO_POSIX
const auto message = new char[strlen(CAPIO_SHM_CANARY_ERROR)];
sprintf(message, CAPIO_SHM_CANARY_ERROR, _canary_name.data());
server_println(message, capio_workflow_name, CAPIO_LOG_SERVER_CLI_LEVEL_ERROR,
"CapioShmCanary");
delete[] message;
#endif
ERR_EXIT("ERR: shm canary flag already exists");
}
#ifndef __CAPIO_POSIX
server_println("Created Capio SHM canary: " + _canary_name, capio_workflow_name,
CAPIO_LOG_SERVER_CLI_LEVEL_STATUS, "CapioShmCanary");
#endif
};

~CapioShmCanary() {
START_LOG(capio_syscall(SYS_gettid), "call()");
#ifndef __CAPIO_POSIX
server_println("Removing shared memory canary flag", get_capio_workflow_name(),
CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, "CapioShmCanary");
#endif
close(_shm_id);
SHM_DESTROY_CHECK(_canary_name.c_str());
}
};

// FIXME: Remove the inline specifier by using extern
inline CapioShmCanary *shm_canary;

inline void *create_shm(const std::string &shm_name, const long int size) {
START_LOG(capio_syscall(SYS_gettid), "call(shm_name=%s, size=%ld)", shm_name.c_str(), size);

Expand Down
6 changes: 4 additions & 2 deletions capio/server/capio_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "common/requests.hpp"
#include "common/semaphore.hpp"
#include "remote/backend.hpp"
#include "remote/discovery.hpp"
#include "storage/capio_file.hpp"
#include "utils/common.hpp"
#include "utils/env.hpp"
Expand All @@ -36,6 +37,7 @@
ClientManager *client_manager;
StorageManager *storage_manager;
Backend *backend;
DiscoveryService *discovery_service;

#include "handlers.hpp"
#include "utils/cli_parser.hpp"
Expand Down Expand Up @@ -140,13 +142,13 @@ int main(int argc, char **argv) {

capio_cl_engine->print();

backend = select_backend(configuration.backend_name, argc, argv);
discovery_service = new DiscoveryService();
backend = select_backend(configuration.backend_name, argc, argv);

START_LOG(gettid(), "call()");

open_files_location();

shm_canary = new CapioShmCanary(capio_cl_engine->getWorkflowName());
storage_manager = new StorageManager();
client_manager = new ClientManager();

Expand Down
6 changes: 6 additions & 0 deletions capio/server/include/remote/backend.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ class Backend {
* @param target
*/
virtual void send_request(const char *message, int message_len, const std::string &target) = 0;

/**
* Connect this server instance to a remote server instance
* @param target Remote server instance identification
*/
virtual void connect_to(const std::string &target) = 0;
};

#endif // CAPIO_SERVER_REMOTE_BACKEND_HPP
1 change: 1 addition & 0 deletions capio/server/include/remote/backend/mpi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class MPIBackend : public Backend {
void send_file(char *shm, long int nbytes, const std::string &target) override;
void send_request(const char *message, int message_len, const std::string &target) override;
void recv_file(char *shm, const std::string &source, long int bytes_expected) override;
void connect_to(const std::string &target) override;
};

class MPISYNCBackend final : public MPIBackend {
Expand Down
1 change: 1 addition & 0 deletions capio/server/include/remote/backend/none.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ class NoneBackend final : public Backend {
void send_file(char *shm, long int nbytes, const std::string &target) override;
void send_request(const char *message, int message_len, const std::string &target) override;
void recv_file(char *shm, const std::string &source, long int bytes_expected) override;
void connect_to(const std::string &target) override;
};
#endif // CAPIO_SERVER_REMOTE_BACKEND_NONE_HPP
88 changes: 88 additions & 0 deletions capio/server/include/remote/discovery.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#ifndef CAPIO_DISCOVERY_HPP
#define CAPIO_DISCOVERY_HPP

#include <string>
#include <thread>

#include "utils/shm_canary.hpp"

/**
* Discovery service. Responsible for:
* - Detect other server instances running in the same node with the same workflow name (and halts
* startup if it finds one)
* - Detect other remote running server instances of capio servers and issue commands to the backend
* to open a connection with them as soon as they are found.
*/
class DiscoveryService {

/// @brief Variable used to signal termination to child threads
bool terminate = false;

/// @brief Handle for multicast based discovery thread
std::thread *mcast_listener_thread = nullptr;
/// @brief Handle for file system based discovery thread
std::thread *fs_listener_thread = nullptr;
/// @brief Handle for thread advertising this server instance
std::thread *advertisement_thread = nullptr;

/// @brief Token to be advertised by this server
std::string advertisement_token;

/// @brief Canary variable to detect other server instances running locally that are logically
/// equivalent to the one starting up
CapioShmCanary *shm_canary;

/// @brief Directory to look into for CAPIO tokens
std::filesystem::path token_directory_path;
/// @brief This server instance token filename
std::filesystem::path token_filename;

/// @brief Multicast address
const std::string capio_multicast_adv_address;

/// @brief multicast port
const unsigned int capio_multicast_adv_port;

public:
/**
* Construct a new Discovery Service class
* @param mcast_addr Address to send and receive aliveness token from other servers
* @param mcast_port Port to send and receive aliveness token from other servers
*/
explicit DiscoveryService(const std::string &mcast_addr = CAPIO_MCAST_ADV_DEFAULT_ADDR,
unsigned int mcast_port = CAPIO_MCAST_ADV_DEFAULT_PORT);

/// @brief Default destructor
~DiscoveryService();

/**
* @brief Configures and starts the discovery service to advertise and scan for tokens.
*
* Sets the advertisement token used by other server instances to establish a connection.
* The token must conform to the specific backend requirements for incoming connections.
* * @note The token is not passed via the constructor because the Discovery Service
* must be instantiated before the Backend provides the token.
*
* Once called, this method:
* 1. Stores the current token in a hidden file within a designated directory.
* 2. Initiates multicast traffic to advertise the local token.
* 3. Scans the hidden directory for aliveness tokens from other servers.
*
* @param adv_delay The interval (in milliseconds/seconds) between advertisement broadcasts.
* @param token The authentication or identification string provided by the backend.
* @param token_directory directory to store capio aliveness tokens
*/
void start(unsigned int adv_delay, const std::string &token,
const std::string &token_directory = ".capio_tokens/");

/**
* Stop current server instance from advertising itself and from receiving advertisements from
* other server instances.
*
* NOTE: this method does not destroy the CAPIO canary variable. for that the destruction of the
* class instance is required.
*/
void stop();
};

#endif // CAPIO_DISCOVERY_HPP
13 changes: 13 additions & 0 deletions capio/server/include/utils/shm_canary.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#ifndef CAPIO_SHM_CANARY_HPP
#define CAPIO_SHM_CANARY_HPP
#include <string>

class CapioShmCanary {
int _shm_id;
std::string _canary_name;

public:
explicit CapioShmCanary(const std::string &capio_workflow_name);
~CapioShmCanary();
};
#endif // CAPIO_SHM_CANARY_HPP
10 changes: 7 additions & 3 deletions capio/server/include/utils/signals.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <csignal>

#include "remote/backend.hpp"
#include "remote/discovery.hpp"
#include "server_println.hpp"

#ifdef CAPIO_COVERAGE
Expand All @@ -23,19 +24,22 @@ void sig_term_handler(int signum, siginfo_t *info, void *ptr) {
}

// free all the memory used

discovery_service->stop();
delete client_manager;
delete storage_manager;

server_println("data_buffers cleanup completed", CapioCLEngine::get().getWorkflowName(),
CAPIO_LOG_SERVER_CLI_LEVEL_WARNING, __func__);

delete backend;
delete client_manager;
delete storage_manager;

#ifdef CAPIO_COVERAGE
__gcov_dump();
#endif

delete backend;
delete shm_canary;
delete discovery_service;

server_println("shutdown completed", CapioCLEngine::get().getWorkflowName(),
CAPIO_LOG_SERVER_CLI_LEVEL_INFO, __func__);
Expand Down
Loading