Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions include/libp2p/basic/poll.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <functional>

namespace libp2p {
using PollWaker = std::function<void()>;

class PollFuture {
public:
virtual ~PollFuture() = default;

virtual bool poll(PollWaker waker) = 0;
};
} // namespace libp2p
11 changes: 6 additions & 5 deletions include/libp2p/connection/capable_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <functional>

#include <libp2p/basic/poll.hpp>
#include <libp2p/connection/secure_connection.hpp>

namespace libp2p::connection {
Expand Down Expand Up @@ -59,13 +60,13 @@ namespace libp2p::connection {
*/
virtual void newStream(StreamHandlerFunc cb) = 0;

virtual CoroOutcome<std::shared_ptr<connection::Stream>> acceptStream() = 0;

/**
* @brief Opens new stream in a coroutine manner
* @return Awaitable result of a new Stream or error
* Add unreliable datagram to send queue.
* Destroy to cancel.
*/
virtual CoroOutcome<std::shared_ptr<Stream>> newStreamCoroutine() = 0;

virtual CoroOutcome<std::shared_ptr<connection::Stream>> acceptStream() = 0;
virtual std::shared_ptr<PollFuture> sendDatagram(BytesIn message) = 0;
};

} // namespace libp2p::connection
27 changes: 27 additions & 0 deletions include/libp2p/connection/on_datagram.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <functional>
#include <qtils/byte_vec.hpp>

namespace libp2p::connection {
struct CapableConnection;
} // namespace libp2p::connection

namespace libp2p {
using OnDatagram = std::function<void(
std::shared_ptr<connection::CapableConnection>, qtils::ByteVec)>;

struct OnDatagramConfig {
OnDatagramConfig() = default;

bool enable_datagram = false;
OnDatagram on_datagram;
};
using OnDatagramConfigPtr = std::shared_ptr<OnDatagramConfig>;
} // namespace libp2p
8 changes: 8 additions & 0 deletions include/libp2p/injector/host_injector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ namespace libp2p::injector {
std::move(config))[boost::di::override];
}

inline auto enableDatagram() {
OnDatagramConfig config;
config.enable_datagram = true;
return boost::di::bind<OnDatagramConfig>().to(
std::make_shared<OnDatagramConfig>(
std::move(config)))[boost::di::override];
}

/**
* @brief Main function that creates Host Injector.
* This creates a complete BasicHost with all network dependencies.
Expand Down
3 changes: 1 addition & 2 deletions include/libp2p/transport/quic/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ namespace libp2p::transport {
void start() override;
void stop() override;
void newStream(StreamHandlerFunc cb) override;
CoroOutcome<std::shared_ptr<connection::Stream>> newStreamCoroutine()
override;
outcome::result<std::shared_ptr<connection::Stream>> newStream() override;
CoroOutcome<std::shared_ptr<connection::Stream>> acceptStream() override;
std::shared_ptr<PollFuture> sendDatagram(BytesIn message) override;

void onClose();

Expand Down
7 changes: 7 additions & 0 deletions include/libp2p/transport/quic/engine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/steady_timer.hpp>
#include <deque>
#include <libp2p/connection/on_datagram.hpp>
#include <libp2p/coro/channel.hpp>
#include <libp2p/coro/coro.hpp>
#include <libp2p/coro/handler.hpp>
Expand Down Expand Up @@ -48,6 +49,7 @@ namespace libp2p::transport {

namespace libp2p::transport::lsquic {
class Engine;
class SendDatagramFuture;
struct ConnCtx;
struct StreamCtx;

Expand All @@ -70,6 +72,7 @@ namespace libp2p::transport::lsquic {
std::optional<Connecting> connecting{};
std::optional<std::shared_ptr<connection::QuicStream>> new_stream{};
std::weak_ptr<QuicConnection> conn{};
std::deque<std::weak_ptr<SendDatagramFuture>> datagram_send_queue;
};

/**
Expand All @@ -95,6 +98,7 @@ namespace libp2p::transport::lsquic {
Engine(std::shared_ptr<boost::asio::io_context> io_context,
std::shared_ptr<boost::asio::ssl::context> ssl_context,
const muxer::MuxedConnectionConfig &mux_config,
OnDatagramConfigPtr on_datagram_config,
PeerId local_peer,
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_codec,
boost::asio::ip::udp::socket &&socket,
Expand All @@ -115,6 +119,8 @@ namespace libp2p::transport::lsquic {
const boost::asio::ip::udp::endpoint &remote, const PeerId &peer);
outcome::result<std::shared_ptr<connection::QuicStream>> newStream(
ConnCtx *conn_ctx);
std::shared_ptr<PollFuture> sendDatagram(ConnCtx *conn_ctx,
BytesIn message);
ConnectionPtrCoroOutcome asyncAccept();
void wantProcess();
void wantFlush(StreamCtx *stream_ctx);
Expand All @@ -126,6 +132,7 @@ namespace libp2p::transport::lsquic {

std::shared_ptr<boost::asio::io_context> io_context_;
std::shared_ptr<boost::asio::ssl::context> ssl_context_;
OnDatagramConfigPtr on_datagram_config_;
PeerId local_peer_;
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_codec_;
boost::asio::ip::udp::socket socket_;
Expand Down
2 changes: 2 additions & 0 deletions include/libp2p/transport/quic/listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace libp2p::transport {
QuicListener(std::shared_ptr<boost::asio::io_context> io_context,
std::shared_ptr<boost::asio::ssl::context> ssl_context,
const muxer::MuxedConnectionConfig &mux_config,
OnDatagramConfigPtr on_datagram_config,
PeerId local_peer,
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_codec);

Expand All @@ -51,6 +52,7 @@ namespace libp2p::transport {
std::shared_ptr<boost::asio::io_context> io_context_;
std::shared_ptr<boost::asio::ssl::context> ssl_context_;
muxer::MuxedConnectionConfig mux_config_;
OnDatagramConfigPtr on_datagram_config_;
PeerId local_peer_;
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_codec_;
TransportListener::HandlerFunc handler_;
Expand Down
2 changes: 2 additions & 0 deletions include/libp2p/transport/quic/transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ namespace libp2p::transport {
QuicTransport(std::shared_ptr<boost::asio::io_context> io_context,
const security::SslContext &ssl_context,
const muxer::MuxedConnectionConfig &mux_config,
OnDatagramConfigPtr on_datagram_config,
const peer::IdentityManager &id_mgr,
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_codec);

Expand All @@ -55,6 +56,7 @@ namespace libp2p::transport {
std::shared_ptr<boost::asio::io_context> io_context_;
std::shared_ptr<boost::asio::ssl::context> ssl_context_;
muxer::MuxedConnectionConfig mux_config_;
OnDatagramConfigPtr on_datagram_config_;
PeerId local_peer_;
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_codec_;
boost::asio::ip::udp::resolver resolver_;
Expand Down
1 change: 1 addition & 0 deletions include/libp2p/transport/transport_adaptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <libp2p/coro/coro.hpp>

#include <libp2p/connection/capable_connection.hpp>
#include <libp2p/connection/on_datagram.hpp>
#include <libp2p/multi/multiaddress.hpp>
#include <libp2p/peer/peer_id.hpp>
#include <libp2p/transport/transport_listener.hpp>
Expand Down
7 changes: 4 additions & 3 deletions src/transport/quic/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,6 @@ namespace libp2p::transport {
cb(newStream());
}

CoroOutcome<std::shared_ptr<connection::Stream>>
QuicConnection::newStreamCoroutine() {}

outcome::result<std::shared_ptr<libp2p::connection::Stream>>
QuicConnection::newStream() {
if (not conn_ctx_) {
Expand All @@ -103,6 +100,10 @@ namespace libp2p::transport {
co_return co_await stream_signal_.receive();
}

std::shared_ptr<PollFuture> QuicConnection::sendDatagram(BytesIn message) {
return conn_ctx_->engine->sendDatagram(conn_ctx_, message);
}

void QuicConnection::onStream(std::shared_ptr<connection::Stream> stream) {
stream_signal_.send(stream);
}
Expand Down
91 changes: 91 additions & 0 deletions src/transport/quic/engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,34 @@
#include <qtils/option_take.hpp>

namespace libp2p::transport::lsquic {
inline std::shared_ptr<SendDatagramFuture> nextDatagram(ConnCtx *conn_ctx) {
while (not conn_ctx->datagram_send_queue.empty()) {
auto &weak_future = conn_ctx->datagram_send_queue.front();
if (auto future = weak_future.lock()) {
return future;
}
conn_ctx->datagram_send_queue.pop_front();
}
return nullptr;
}

class SendDatagramFuture : public PollFuture {
public:
SendDatagramFuture(qtils::ByteVec message) : message{std::move(message)} {}

bool poll(PollWaker waker) override {
if (completed) {
return true;
}
last_waker.emplace(std::move(waker));
return false;
}

qtils::ByteVec message;
bool completed = false;
std::optional<PollWaker> last_waker;
};

inline boost::asio::ip::udp::endpoint endpointFrom(const sockaddr *raw) {
boost::asio::ip::udp::endpoint endpoint;
size_t size = 0;
Expand All @@ -36,12 +64,14 @@ namespace libp2p::transport::lsquic {
Engine::Engine(std::shared_ptr<boost::asio::io_context> io_context,
std::shared_ptr<boost::asio::ssl::context> ssl_context,
const muxer::MuxedConnectionConfig &mux_config,
OnDatagramConfigPtr on_datagram_config,
PeerId local_peer,
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_codec,
boost::asio::ip::udp::socket &&socket,
bool client)
: io_context_{std::move(io_context)},
ssl_context_{std::move(ssl_context)},
on_datagram_config_{std::move(on_datagram_config)},
local_peer_{std::move(local_peer)},
key_codec_{std::move(key_codec)},
socket_{std::move(socket)},
Expand Down Expand Up @@ -71,6 +101,7 @@ namespace libp2p::transport::lsquic {
settings.es_idle_timeout = std::chrono::duration_cast<std::chrono::seconds>(
mux_config.no_streams_interval)
.count();
settings.es_datagrams = on_datagram_config_->enable_datagram ? 1 : 0;

static lsquic_stream_if stream_if{};
stream_if.on_new_conn = +[](void *void_self, lsquic_conn_t *conn) {
Expand Down Expand Up @@ -196,6 +227,55 @@ namespace libp2p::transport::lsquic {
writing.value()();
}
};
stream_if.on_datagram = +[](lsquic_conn_t *conn,
const void *buffer,
size_t buffer_size) {
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto conn_ctx = reinterpret_cast<ConnCtx *>(lsquic_conn_get_ctx(conn));
auto &on_datagram = conn_ctx->engine->on_datagram_config_->on_datagram;
if (not on_datagram) {
return;
}
auto connection = conn_ctx->conn.lock();
if (not connection) {
return;
}
qtils::ByteVec message{
std::span{static_cast<const uint8_t *>(buffer), buffer_size}};
boost::asio::post(*conn_ctx->engine->io_context_,
[on_datagram,
connection{std::move(connection)},
message{std::move(message)}] {
on_datagram(std::move(connection),
std::move(message));
});
};
stream_if.on_dg_write = +[](lsquic_conn_t *conn,
void *buffer,
size_t buffer_size) {
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
auto conn_ctx = reinterpret_cast<ConnCtx *>(lsquic_conn_get_ctx(conn));
ssize_t result = -1;
if (auto future = nextDatagram(conn_ctx)) {
conn_ctx->datagram_send_queue.pop_front();
if (buffer_size < future->message.size()) {
throw std::logic_error{"stream_if.on_dg_write buffer is too small"};
}
memcpy(buffer, future->message.data(), future->message.size());
result = future->message.size();
future->completed = true;
if (auto waker = qtils::optionTake(future->last_waker)) {
boost::asio::post(*conn_ctx->engine->io_context_, *waker);
}
}
if (auto future = nextDatagram(conn_ctx)) {
lsquic_conn_set_min_datagram_size(conn_ctx->ls_conn,
future->message.size());
} else {
lsquic_conn_want_datagram_write(conn_ctx->ls_conn, 0);
}
return result;
};

lsquic_engine_api api{};
api.ea_settings = &settings;
Expand Down Expand Up @@ -309,6 +389,17 @@ namespace libp2p::transport::lsquic {
return stream;
}

std::shared_ptr<PollFuture> Engine::sendDatagram(ConnCtx *conn_ctx,
BytesIn message) {
auto future = std::make_shared<SendDatagramFuture>(message);
conn_ctx->datagram_send_queue.emplace_back(future);
if (nextDatagram(conn_ctx) == future) {
lsquic_conn_set_min_datagram_size(conn_ctx->ls_conn, message.size());
lsquic_conn_want_datagram_write(conn_ctx->ls_conn, 1);
}
return future;
}

ConnectionPtrCoroOutcome Engine::asyncAccept() {
co_return co_await conn_signal_.receive();
}
Expand Down
3 changes: 3 additions & 0 deletions src/transport/quic/listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ namespace libp2p::transport {
std::shared_ptr<boost::asio::io_context> io_context,
std::shared_ptr<boost::asio::ssl::context> ssl_context,
const muxer::MuxedConnectionConfig &mux_config,
OnDatagramConfigPtr on_datagram_config,
PeerId local_peer,
std::shared_ptr<crypto::marshaller::KeyMarshaller> key_codec)
: io_context_{std::move(io_context)},
ssl_context_{std::move(ssl_context)},
mux_config_{mux_config},
on_datagram_config_{std::move(on_datagram_config)},
local_peer_{std::move(local_peer)},
key_codec_{std::move(key_codec)} {}

Expand All @@ -39,6 +41,7 @@ namespace libp2p::transport {
server_ = std::make_shared<lsquic::Engine>(io_context_,
ssl_context_,
mux_config_,
on_datagram_config_,
local_peer_,
key_codec_,
std::move(socket),
Expand Down
Loading
Loading