From a210c94154acee15cab2f0186b6d70ec228714a7 Mon Sep 17 00:00:00 2001 From: turuslan Date: Mon, 16 Feb 2026 20:06:10 +0500 Subject: [PATCH 1/3] quic datagram --- include/libp2p/basic/poll.hpp | 20 +++++ .../libp2p/connection/capable_connection.hpp | 11 +-- include/libp2p/connection/on_datagram.hpp | 27 ++++++ include/libp2p/injector/host_injector.hpp | 8 ++ include/libp2p/transport/quic/connection.hpp | 3 +- include/libp2p/transport/quic/engine.hpp | 7 ++ include/libp2p/transport/quic/listener.hpp | 2 + include/libp2p/transport/quic/transport.hpp | 2 + .../libp2p/transport/transport_adaptor.hpp | 1 + src/transport/quic/connection.cpp | 7 +- src/transport/quic/engine.cpp | 87 +++++++++++++++++++ src/transport/quic/listener.cpp | 3 + src/transport/quic/transport.cpp | 11 ++- 13 files changed, 177 insertions(+), 12 deletions(-) create mode 100644 include/libp2p/basic/poll.hpp create mode 100644 include/libp2p/connection/on_datagram.hpp diff --git a/include/libp2p/basic/poll.hpp b/include/libp2p/basic/poll.hpp new file mode 100644 index 00000000..82643b36 --- /dev/null +++ b/include/libp2p/basic/poll.hpp @@ -0,0 +1,20 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +namespace libp2p { + using PollWaker = std::function; + + class PollFuture { + public: + virtual ~PollFuture() = default; + + virtual bool poll(PollWaker waker) = 0; + }; +} // namespace libp2p diff --git a/include/libp2p/connection/capable_connection.hpp b/include/libp2p/connection/capable_connection.hpp index 9c816400..5e22a9c4 100644 --- a/include/libp2p/connection/capable_connection.hpp +++ b/include/libp2p/connection/capable_connection.hpp @@ -8,6 +8,7 @@ #include +#include #include namespace libp2p::connection { @@ -59,13 +60,13 @@ namespace libp2p::connection { */ virtual void newStream(StreamHandlerFunc cb) = 0; + virtual CoroOutcome> acceptStream() = 0; + /** - * @brief Opens new stream in a coroutine manner - * @return Awaitable result of a new Stream or error + * Add unreliable datagram to send queue. + * Destroy to cancel. */ - virtual CoroOutcome> newStreamCoroutine() = 0; - - virtual CoroOutcome> acceptStream() = 0; + virtual std::shared_ptr sendDatagram(BytesIn message) = 0; }; } // namespace libp2p::connection diff --git a/include/libp2p/connection/on_datagram.hpp b/include/libp2p/connection/on_datagram.hpp new file mode 100644 index 00000000..a6de5a4c --- /dev/null +++ b/include/libp2p/connection/on_datagram.hpp @@ -0,0 +1,27 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include + +namespace libp2p::connection { + struct CapableConnection; +} // namespace libp2p::connection + +namespace libp2p { + using OnDatagram = std::function, qtils::ByteVec)>; + + struct OnDatagramConfig { + OnDatagramConfig() = default; + + bool enable_datagram = false; + OnDatagram on_datagram; + }; + using OnDatagramConfigPtr = std::shared_ptr; +} // namespace libp2p diff --git a/include/libp2p/injector/host_injector.hpp b/include/libp2p/injector/host_injector.hpp index 91f2b44f..924fcbe6 100644 --- a/include/libp2p/injector/host_injector.hpp +++ b/include/libp2p/injector/host_injector.hpp @@ -61,6 +61,14 @@ namespace libp2p::injector { std::move(config))[boost::di::override]; } + inline auto enableDatagram() { + OnDatagramConfig config; + config.enable_datagram = true; + return boost::di::bind().to( + std::make_shared( + std::move(config)))[boost::di::override]; + } + /** * @brief Main function that creates Host Injector. * This creates a complete BasicHost with all network dependencies. diff --git a/include/libp2p/transport/quic/connection.hpp b/include/libp2p/transport/quic/connection.hpp index 889c5d60..83d4a508 100644 --- a/include/libp2p/transport/quic/connection.hpp +++ b/include/libp2p/transport/quic/connection.hpp @@ -59,10 +59,9 @@ namespace libp2p::transport { void start() override; void stop() override; void newStream(StreamHandlerFunc cb) override; - CoroOutcome> newStreamCoroutine() - override; outcome::result> newStream() override; CoroOutcome> acceptStream() override; + std::shared_ptr sendDatagram(BytesIn message) override; void onClose(); diff --git a/include/libp2p/transport/quic/engine.hpp b/include/libp2p/transport/quic/engine.hpp index 4bc0d79b..e9125c3a 100644 --- a/include/libp2p/transport/quic/engine.hpp +++ b/include/libp2p/transport/quic/engine.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -48,6 +49,7 @@ namespace libp2p::transport { namespace libp2p::transport::lsquic { class Engine; + class SendDatagramFuture; struct ConnCtx; struct StreamCtx; @@ -70,6 +72,7 @@ namespace libp2p::transport::lsquic { std::optional connecting{}; std::optional> new_stream{}; std::weak_ptr conn{}; + std::deque> datagram_send_queue; }; /** @@ -95,6 +98,7 @@ namespace libp2p::transport::lsquic { Engine(std::shared_ptr io_context, std::shared_ptr ssl_context, const muxer::MuxedConnectionConfig &mux_config, + OnDatagramConfigPtr on_datagram_config, PeerId local_peer, std::shared_ptr key_codec, boost::asio::ip::udp::socket &&socket, @@ -115,6 +119,8 @@ namespace libp2p::transport::lsquic { const boost::asio::ip::udp::endpoint &remote, const PeerId &peer); outcome::result> newStream( ConnCtx *conn_ctx); + std::shared_ptr sendDatagram(ConnCtx *conn_ctx, + BytesIn message); ConnectionPtrCoroOutcome asyncAccept(); void wantProcess(); void wantFlush(StreamCtx *stream_ctx); @@ -126,6 +132,7 @@ namespace libp2p::transport::lsquic { std::shared_ptr io_context_; std::shared_ptr ssl_context_; + OnDatagramConfigPtr on_datagram_config_; PeerId local_peer_; std::shared_ptr key_codec_; boost::asio::ip::udp::socket socket_; diff --git a/include/libp2p/transport/quic/listener.hpp b/include/libp2p/transport/quic/listener.hpp index 5570411c..41df1bea 100644 --- a/include/libp2p/transport/quic/listener.hpp +++ b/include/libp2p/transport/quic/listener.hpp @@ -32,6 +32,7 @@ namespace libp2p::transport { QuicListener(std::shared_ptr io_context, std::shared_ptr ssl_context, const muxer::MuxedConnectionConfig &mux_config, + OnDatagramConfigPtr on_datagram_config, PeerId local_peer, std::shared_ptr key_codec); @@ -51,6 +52,7 @@ namespace libp2p::transport { std::shared_ptr io_context_; std::shared_ptr ssl_context_; muxer::MuxedConnectionConfig mux_config_; + OnDatagramConfigPtr on_datagram_config_; PeerId local_peer_; std::shared_ptr key_codec_; TransportListener::HandlerFunc handler_; diff --git a/include/libp2p/transport/quic/transport.hpp b/include/libp2p/transport/quic/transport.hpp index 1c793967..a64ccce8 100644 --- a/include/libp2p/transport/quic/transport.hpp +++ b/include/libp2p/transport/quic/transport.hpp @@ -39,6 +39,7 @@ namespace libp2p::transport { QuicTransport(std::shared_ptr io_context, const security::SslContext &ssl_context, const muxer::MuxedConnectionConfig &mux_config, + OnDatagramConfigPtr on_datagram_config, const peer::IdentityManager &id_mgr, std::shared_ptr key_codec); @@ -55,6 +56,7 @@ namespace libp2p::transport { std::shared_ptr io_context_; std::shared_ptr ssl_context_; muxer::MuxedConnectionConfig mux_config_; + OnDatagramConfigPtr on_datagram_config_; PeerId local_peer_; std::shared_ptr key_codec_; boost::asio::ip::udp::resolver resolver_; diff --git a/include/libp2p/transport/transport_adaptor.hpp b/include/libp2p/transport/transport_adaptor.hpp index 75abc603..18076c31 100644 --- a/include/libp2p/transport/transport_adaptor.hpp +++ b/include/libp2p/transport/transport_adaptor.hpp @@ -13,6 +13,7 @@ #include #include +#include #include #include #include diff --git a/src/transport/quic/connection.cpp b/src/transport/quic/connection.cpp index ef9a864c..9993a362 100644 --- a/src/transport/quic/connection.cpp +++ b/src/transport/quic/connection.cpp @@ -86,9 +86,6 @@ namespace libp2p::transport { cb(newStream()); } - CoroOutcome> - QuicConnection::newStreamCoroutine() {} - outcome::result> QuicConnection::newStream() { if (not conn_ctx_) { @@ -103,6 +100,10 @@ namespace libp2p::transport { co_return co_await stream_signal_.receive(); } + std::shared_ptr QuicConnection::sendDatagram(BytesIn message) { + return conn_ctx_->engine->sendDatagram(conn_ctx_, message); + } + void QuicConnection::onStream(std::shared_ptr stream) { stream_signal_.send(stream); } diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 3544001a..5acdac81 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -19,6 +19,34 @@ #include namespace libp2p::transport::lsquic { + inline std::shared_ptr nextDatagram(ConnCtx *conn_ctx) { + while (not conn_ctx->datagram_send_queue.empty()) { + auto &weak_future = conn_ctx->datagram_send_queue.front(); + if (auto future = weak_future.lock()) { + return future; + } + conn_ctx->datagram_send_queue.pop_front(); + } + return nullptr; + } + + class SendDatagramFuture : public PollFuture { + public: + SendDatagramFuture(qtils::ByteVec message) : message{std::move(message)} {} + + bool poll(PollWaker waker) override { + if (completed) { + return true; + } + last_waker.emplace(std::move(waker)); + return false; + } + + qtils::ByteVec message; + bool completed = false; + std::optional last_waker; + }; + inline boost::asio::ip::udp::endpoint endpointFrom(const sockaddr *raw) { boost::asio::ip::udp::endpoint endpoint; size_t size = 0; @@ -36,12 +64,14 @@ namespace libp2p::transport::lsquic { Engine::Engine(std::shared_ptr io_context, std::shared_ptr ssl_context, const muxer::MuxedConnectionConfig &mux_config, + OnDatagramConfigPtr on_datagram_config, PeerId local_peer, std::shared_ptr key_codec, boost::asio::ip::udp::socket &&socket, bool client) : io_context_{std::move(io_context)}, ssl_context_{std::move(ssl_context)}, + on_datagram_config_{std::move(on_datagram_config)}, local_peer_{std::move(local_peer)}, key_codec_{std::move(key_codec)}, socket_{std::move(socket)}, @@ -71,6 +101,7 @@ namespace libp2p::transport::lsquic { settings.es_idle_timeout = std::chrono::duration_cast( mux_config.no_streams_interval) .count(); + settings.es_datagrams = on_datagram_config_->enable_datagram ? 1 : 0; static lsquic_stream_if stream_if{}; stream_if.on_new_conn = +[](void *void_self, lsquic_conn_t *conn) { @@ -196,6 +227,51 @@ namespace libp2p::transport::lsquic { writing.value()(); } }; + stream_if.on_datagram = +[](lsquic_conn_t *conn, + const void *buffer, + size_t buffer_size) { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + auto conn_ctx = reinterpret_cast(lsquic_conn_get_ctx(conn)); + auto &on_datagram = conn_ctx->engine->on_datagram_config_->on_datagram; + if (not on_datagram) { + return; + } + auto connection = conn_ctx->conn.lock(); + if (not connection) { + return; + } + qtils::ByteVec message{ + std::span{static_cast(buffer), buffer_size}}; + boost::asio::post(*conn_ctx->engine->io_context_, + [on_datagram, + connection{std::move(connection)}, + message{std::move(message)}] { + on_datagram(std::move(connection), + std::move(message)); + }); + }; + stream_if.on_dg_write = +[](lsquic_conn_t *conn, + void *buffer, + size_t buffer_size) { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) + auto conn_ctx = reinterpret_cast(lsquic_conn_get_ctx(conn)); + ssize_t result = -1; + if (auto future = nextDatagram(conn_ctx)) { + conn_ctx->datagram_send_queue.pop_front(); + if (buffer_size < future->message.size()) { + throw std::logic_error{"stream_if.on_dg_write buffer is too small"}; + } + memcpy(buffer, future->message.data(), future->message.size()); + result = future->message.size(); + } + if (auto future = nextDatagram(conn_ctx)) { + lsquic_conn_set_min_datagram_size(conn_ctx->ls_conn, + future->message.size()); + } else { + lsquic_conn_want_datagram_write(conn_ctx->ls_conn, 0); + } + return result; + }; lsquic_engine_api api{}; api.ea_settings = &settings; @@ -309,6 +385,17 @@ namespace libp2p::transport::lsquic { return stream; } + std::shared_ptr Engine::sendDatagram(ConnCtx *conn_ctx, + BytesIn message) { + auto future = std::make_shared(message); + conn_ctx->datagram_send_queue.emplace_back(future); + if (nextDatagram(conn_ctx) == future) { + lsquic_conn_set_min_datagram_size(conn_ctx->ls_conn, message.size()); + lsquic_conn_want_datagram_write(conn_ctx->ls_conn, 1); + } + return future; + } + ConnectionPtrCoroOutcome Engine::asyncAccept() { co_return co_await conn_signal_.receive(); } diff --git a/src/transport/quic/listener.cpp b/src/transport/quic/listener.cpp index adf7b8d1..df00e332 100644 --- a/src/transport/quic/listener.cpp +++ b/src/transport/quic/listener.cpp @@ -16,11 +16,13 @@ namespace libp2p::transport { std::shared_ptr io_context, std::shared_ptr ssl_context, const muxer::MuxedConnectionConfig &mux_config, + OnDatagramConfigPtr on_datagram_config, PeerId local_peer, std::shared_ptr key_codec) : io_context_{std::move(io_context)}, ssl_context_{std::move(ssl_context)}, mux_config_{mux_config}, + on_datagram_config_{std::move(on_datagram_config)}, local_peer_{std::move(local_peer)}, key_codec_{std::move(key_codec)} {} @@ -39,6 +41,7 @@ namespace libp2p::transport { server_ = std::make_shared(io_context_, ssl_context_, mux_config_, + on_datagram_config_, local_peer_, key_codec_, std::move(socket), diff --git a/src/transport/quic/transport.cpp b/src/transport/quic/transport.cpp index 0369706c..cc9a00da 100644 --- a/src/transport/quic/transport.cpp +++ b/src/transport/quic/transport.cpp @@ -18,11 +18,13 @@ namespace libp2p::transport { std::shared_ptr io_context, const security::SslContext &ssl_context, const muxer::MuxedConnectionConfig &mux_config, + OnDatagramConfigPtr on_datagram_config, const peer::IdentityManager &id_mgr, std::shared_ptr key_codec) : io_context_{std::move(io_context)}, ssl_context_{ssl_context.quic}, mux_config_{mux_config}, + on_datagram_config_{std::move(on_datagram_config)}, local_peer_{id_mgr.getId()}, key_codec_{std::move(key_codec)}, resolver_{*io_context_} {} @@ -60,8 +62,12 @@ namespace libp2p::transport { } std::shared_ptr QuicTransport::createListener() { - return std::make_shared( - io_context_, ssl_context_, mux_config_, local_peer_, key_codec_); + return std::make_shared(io_context_, + ssl_context_, + mux_config_, + on_datagram_config_, + local_peer_, + key_codec_); } bool QuicTransport::canDial(const Multiaddress &ma) const { @@ -79,6 +85,7 @@ namespace libp2p::transport { return std::make_shared(io_context_, ssl_context_, mux_config_, + on_datagram_config_, local_peer_, key_codec_, std::move(socket), From 2763c3d3b9b9e9899473103a186e7de6b6cefc4b Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 17 Feb 2026 06:45:45 +0500 Subject: [PATCH 2/3] fix completed and waker --- src/transport/quic/engine.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 5acdac81..0b036f2a 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -263,6 +263,11 @@ namespace libp2p::transport::lsquic { } memcpy(buffer, future->message.data(), future->message.size()); result = future->message.size(); + future->completed = true; + if (future->last_waker) { + boost::asio::post(*conn_ctx->engine->io_context_, + *future->last_waker); + } } if (auto future = nextDatagram(conn_ctx)) { lsquic_conn_set_min_datagram_size(conn_ctx->ls_conn, From 846e26611b8796230e3eff23a8971898ce66c6d8 Mon Sep 17 00:00:00 2001 From: turuslan Date: Wed, 18 Feb 2026 06:02:35 +0500 Subject: [PATCH 3/3] consume waker --- src/transport/quic/engine.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/transport/quic/engine.cpp b/src/transport/quic/engine.cpp index 0b036f2a..8146fa88 100644 --- a/src/transport/quic/engine.cpp +++ b/src/transport/quic/engine.cpp @@ -264,9 +264,8 @@ namespace libp2p::transport::lsquic { memcpy(buffer, future->message.data(), future->message.size()); result = future->message.size(); future->completed = true; - if (future->last_waker) { - boost::asio::post(*conn_ctx->engine->io_context_, - *future->last_waker); + if (auto waker = qtils::optionTake(future->last_waker)) { + boost::asio::post(*conn_ctx->engine->io_context_, *waker); } } if (auto future = nextDatagram(conn_ctx)) {