From 867027df9678787be54d1c4d1d5f572ed8b51356 Mon Sep 17 00:00:00 2001 From: turuslan Date: Mon, 7 Jul 2025 10:00:01 +0500 Subject: [PATCH 1/9] noise read writer Signed-off-by: turuslan --- include/libp2p/common/outcome_macro.hpp | 19 +++++++++ src/security/noise/insecure_rw.cpp | 56 ++++++++----------------- 2 files changed, 36 insertions(+), 39 deletions(-) create mode 100644 include/libp2p/common/outcome_macro.hpp diff --git a/include/libp2p/common/outcome_macro.hpp b/include/libp2p/common/outcome_macro.hpp new file mode 100644 index 000000000..7de40dbf7 --- /dev/null +++ b/include/libp2p/common/outcome_macro.hpp @@ -0,0 +1,19 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#define _IF_ERROR_CB_RETURN(tmp, r) \ + ({ \ + auto &&_r = r; \ + if (not _r.has_value()) { \ + return cb(_r.error()); \ + } \ + _r.value(); \ + }) +#define IF_ERROR_CB_RETURN(r) _IF_ERROR_CB_RETURN(QTILS_UNIQUE_NAME(tmp), r) diff --git a/src/security/noise/insecure_rw.cpp b/src/security/noise/insecure_rw.cpp index 9c99cd80b..a8d612f81 100644 --- a/src/security/noise/insecure_rw.cpp +++ b/src/security/noise/insecure_rw.cpp @@ -8,25 +8,12 @@ #include -#include +#include +#include #include +#include #include -#ifndef UNIQUE_NAME -#define UNIQUE_NAME(base) base##__LINE__ -#endif // UNIQUE_NAME - -#define IO_OUTCOME_TRY_NAME(var, val, res, cb) \ - auto && (var) = (res); \ - if ((var).has_error()) { \ - cb((var).error()); \ - return; \ - } \ - auto && (val) = (var).value(); - -#define IO_OUTCOME_TRY(name, res, cb) \ - IO_OUTCOME_TRY_NAME(UNIQUE_NAME(name), name, res, cb) - namespace libp2p::security::noise { InsecureReadWriter::InsecureReadWriter( std::shared_ptr connection, @@ -34,27 +21,21 @@ namespace libp2p::security::noise { : connection_{std::move(connection)}, buffer_{std::move(buffer)} {} void InsecureReadWriter::read(basic::MessageReadWriter::ReadCallbackFunc cb) { - buffer_->resize(kMaxMsgLen); // ensure buffer capacity + buffer_->reserve(kMaxMsgLen); // ensure buffer capacity auto read_cb = [cb{std::move(cb)}, self{shared_from_this()}]( - outcome::result result) mutable { - IO_OUTCOME_TRY(read_bytes, result, cb); - if (kLengthPrefixSize != read_bytes) { - return cb(std::errc::broken_pipe); - } + outcome::result result) mutable { + IF_ERROR_CB_RETURN(result); uint16_t frame_len{ ntohs(common::convert(self->buffer_->data()))}; // NOLINT - auto read_cb = [cb = std::move(cb), self, frame_len]( - outcome::result result) { - IO_OUTCOME_TRY(read_bytes, result, cb); - if (frame_len != read_bytes) { - return cb(std::errc::broken_pipe); - } - self->buffer_->resize(read_bytes); + auto read_cb = [cb = std::move(cb), self](outcome::result result) { + IF_ERROR_CB_RETURN(result); cb(self->buffer_); }; - self->connection_->read(*self->buffer_, frame_len, std::move(read_cb)); + self->buffer_->resize(frame_len); + libp2p::read(self->connection_, *self->buffer_, std::move(read_cb)); }; - connection_->read(*buffer_, kLengthPrefixSize, std::move(read_cb)); + buffer_->resize(kLengthPrefixSize); + libp2p::read(connection_, *buffer_, std::move(read_cb)); } void InsecureReadWriter::write(BytesIn buffer, @@ -66,14 +47,11 @@ namespace libp2p::security::noise { outbuf_.reserve(kLengthPrefixSize + buffer.size()); common::putUint16BE(outbuf_, buffer.size()); outbuf_.insert(outbuf_.end(), buffer.begin(), buffer.end()); - auto write_cb = [self{shared_from_this()}, - cb{std::move(cb)}](outcome::result result) { - IO_OUTCOME_TRY(written_bytes, result, cb); - if (self->outbuf_.size() != written_bytes) { - return cb(std::errc::broken_pipe); - } - cb(written_bytes - kLengthPrefixSize); + auto write_cb = [self{shared_from_this()}, buffer, cb{std::move(cb)}]( + outcome::result result) { + IF_ERROR_CB_RETURN(result); + cb(buffer.size()); }; - writeReturnSize(connection_, outbuf_, std::move(write_cb)); + libp2p::write(connection_, outbuf_, std::move(write_cb)); } } // namespace libp2p::security::noise From 11c452066a6b29bb0198c12e2d87efe5c0e31f8e Mon Sep 17 00:00:00 2001 From: turuslan Date: Mon, 7 Jul 2025 10:26:33 +0500 Subject: [PATCH 2/9] secio connection Signed-off-by: turuslan --- include/libp2p/basic/cb.hpp | 13 ++ .../security/secio/secio_connection.hpp | 4 +- src/security/secio/secio_connection.cpp | 129 ++++++------------ 3 files changed, 59 insertions(+), 87 deletions(-) create mode 100644 include/libp2p/basic/cb.hpp diff --git a/include/libp2p/basic/cb.hpp b/include/libp2p/basic/cb.hpp new file mode 100644 index 000000000..48799a2a5 --- /dev/null +++ b/include/libp2p/basic/cb.hpp @@ -0,0 +1,13 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +namespace libp2p { + using CbOutcomeVoid = std::function)>; +} // namespace libp2p diff --git a/include/libp2p/security/secio/secio_connection.hpp b/include/libp2p/security/secio/secio_connection.hpp index 30ff46592..af6c7d70e 100644 --- a/include/libp2p/security/secio/secio_connection.hpp +++ b/include/libp2p/security/secio/secio_connection.hpp @@ -11,6 +11,7 @@ #include #include +#include #include #include #include @@ -115,7 +116,7 @@ namespace libp2p::connection { /** * Retrieves the next available SECIO message from the network. */ - void readNextMessage(ReadCallbackFunc cb); + void readNextMessage(CbOutcomeVoid cb); /** * Moves decrypted bytes from internal buffer to the output buffer. @@ -163,6 +164,7 @@ namespace libp2p::connection { std::queue user_data_buffer_; std::shared_ptr read_buffer_; + std::shared_ptr write_buffer_; log::Logger log_ = log::createLogger("SecIoConnection"); diff --git a/src/security/secio/secio_connection.cpp b/src/security/secio/secio_connection.cpp index 9a55b23fe..520ed1bcc 100644 --- a/src/security/secio/secio_connection.cpp +++ b/src/security/secio/secio_connection.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -45,21 +46,6 @@ OUTCOME_CPP_DEFINE_CATEGORY(libp2p::connection, SecioConnection::Error, e) { } } -#ifndef UNIQUE_NAME -#define UNIQUE_NAME(base) base##__LINE__ -#endif // UNIQUE_NAME - -#define IO_OUTCOME_TRY_NAME(var, val, res, cb) \ - auto && (var) = (res); \ - if ((var).has_error()) { \ - cb((var).error()); \ - return; \ - } \ - auto && (val) = (var).value(); - -#define IO_OUTCOME_TRY(name, res, cb) \ - IO_OUTCOME_TRY_NAME(UNIQUE_NAME(name), name, res, cb) - namespace { template outcome::result initAesSecret(const libp2p::Bytes &key, @@ -99,7 +85,8 @@ namespace libp2p::connection { local_stretched_key_{std::move(local_stretched_key)}, remote_stretched_key_{std::move(remote_stretched_key)}, aes128_secrets_{boost::none}, - aes256_secrets_{boost::none} { + aes256_secrets_{boost::none}, + write_buffer_{std::make_shared()} { BOOST_ASSERT(original_connection_); BOOST_ASSERT(hmac_provider_); BOOST_ASSERT(key_marshaller_); @@ -206,6 +193,7 @@ namespace libp2p::connection { void SecioConnection::readSome(BytesOut out, size_t bytes, basic::Reader::ReadCallbackFunc cb) { + ambigousSize(out, bytes); // TODO(107): Reentrancy if (!isInitialized()) { @@ -213,48 +201,29 @@ namespace libp2p::connection { return; } - // define bytes quantity to read of user-level data - size_t out_size{out.empty() ? 0 : static_cast(out.size())}; - size_t read_limit{out_size < bytes ? out_size : bytes}; - if (not user_data_buffer_.empty()) { - auto bytes_available{user_data_buffer_.size()}; - size_t to_read{bytes_available < read_limit ? bytes_available - : read_limit}; + size_t to_read{std::min(user_data_buffer_.size(), out.size())}; popUserData(out, to_read); SL_TRACE(log_, "Successfully read {} bytes", to_read); cb(to_read); return; } - ReadCallbackFunc cb_wrapper = - [self{shared_from_this()}, user_cb{cb}, out, bytes]( - outcome::result size_read_res) -> void { - if (not size_read_res) { - user_cb(size_read_res); - return; - } - self->readSome(out, bytes, user_cb); - }; - - readNextMessage(cb_wrapper); + readNextMessage([self{shared_from_this()}, out, cb{std::move(cb)}]( + outcome::result result) { + IF_ERROR_CB_RETURN(result); + self->readSome(out, out.size(), std::move(cb)); + }); } - void SecioConnection::readNextMessage(ReadCallbackFunc cb) { - original_connection_->read( + void SecioConnection::readNextMessage(CbOutcomeVoid cb) { + read_buffer_->resize(kLenMarkerSize); + libp2p::read( + original_connection_, *read_buffer_, - kLenMarkerSize, [self{shared_from_this()}, buffer = read_buffer_, cb{std::move(cb)}]( - outcome::result read_bytes_res) mutable { - IO_OUTCOME_TRY(len_marker_size, read_bytes_res, cb); - if (len_marker_size != kLenMarkerSize) { - self->log_->error( - "Cannot read frame header. Read {} bytes when {} expected", - len_marker_size, - kLenMarkerSize); - cb(Error::STREAM_IS_BROKEN); - return; - } + outcome::result result) mutable { + IF_ERROR_CB_RETURN(result); uint32_t frame_len{ ntohl(common::convert(buffer->data()))}; // NOLINT if (frame_len > kMaxFrameSize) { @@ -265,37 +234,28 @@ namespace libp2p::connection { return; } SL_TRACE(self->log_, "Expecting frame of size {}.", frame_len); - self->original_connection_->read( + buffer->resize(frame_len); + libp2p::read( + self->original_connection_, *buffer, - frame_len, - [self, buffer, frame_len, cb{cb}]( - outcome::result read_bytes) mutable { - IO_OUTCOME_TRY(read_frame_bytes, read_bytes, cb); - if (frame_len != read_frame_bytes) { - self->log_->error( - "Unable to read expected amount of bytes. Read {} when " - "{} expected", - read_frame_bytes, - frame_len); - cb(Error::STREAM_IS_BROKEN); - return; - } - SL_TRACE( - self->log_, "Received frame with len {}", read_frame_bytes); - IO_OUTCOME_TRY(mac_size, self->macSize(), cb); + [self, buffer, frame_len, cb{std::move(cb)}]( + outcome::result result) mutable { + IF_ERROR_CB_RETURN(result); + SL_TRACE(self->log_, "Received frame with len {}", frame_len); + auto mac_size = IF_ERROR_CB_RETURN(self->macSize()); const auto data_size{frame_len - mac_size}; auto data_span{std::span(buffer->data(), data_size)}; auto mac_span{std::span(*buffer).subspan(data_size, mac_size)}; - IO_OUTCOME_TRY(remote_mac, self->macRemote(data_span), cb); + auto remote_mac = + IF_ERROR_CB_RETURN(self->macRemote(data_span)); if (BytesIn(remote_mac) != BytesIn(mac_span)) { self->log_->error( "Signature does not validate for the received frame"); cb(Error::INVALID_MAC); return; } - IO_OUTCOME_TRY(decrypted_bytes, - (*self->remote_decryptor_)->crypt(data_span), - cb); + auto decrypted_bytes = IF_ERROR_CB_RETURN( + (*self->remote_decryptor_)->crypt(data_span)); size_t decrypted_bytes_len{decrypted_bytes.size()}; for (auto &&e : decrypted_bytes) { self->user_data_buffer_.emplace(std::forward(e)); @@ -304,7 +264,7 @@ namespace libp2p::connection { "Frame decrypted successfully {} -> {}", frame_len, decrypted_bytes_len); - cb(decrypted_bytes_len); + cb(outcome::success()); }); }); } @@ -312,38 +272,35 @@ namespace libp2p::connection { void SecioConnection::writeSome(BytesIn in, size_t bytes, basic::Writer::WriteCallbackFunc cb) { + ambigousSize(in, bytes); // TODO(107): Reentrancy if (!isInitialized()) { cb(Error::CONN_NOT_INITIALIZED); } - IO_OUTCOME_TRY(mac_size, macSize(), cb); - size_t frame_len{bytes + mac_size}; - Bytes frame_buffer; + auto mac_size = IF_ERROR_CB_RETURN(macSize()); + size_t frame_len{in.size() + mac_size}; + write_buffer_->resize(0); + auto &frame_buffer = *write_buffer_; constexpr size_t len_field_size{kLenMarkerSize}; frame_buffer.reserve(len_field_size + frame_len); common::putUint32BE(frame_buffer, frame_len); - IO_OUTCOME_TRY(encrypted_data, (*local_encryptor_)->crypt(in), cb); - IO_OUTCOME_TRY(mac_data, macLocal(encrypted_data), cb); + auto encrypted_data = IF_ERROR_CB_RETURN((*local_encryptor_)->crypt(in)); + auto mac_data = IF_ERROR_CB_RETURN(macLocal(encrypted_data)); frame_buffer.insert(frame_buffer.end(), std::make_move_iterator(encrypted_data.begin()), std::make_move_iterator(encrypted_data.end())); frame_buffer.insert(frame_buffer.end(), std::make_move_iterator(mac_data.begin()), std::make_move_iterator(mac_data.end())); - basic::Writer::WriteCallbackFunc cb_wrapper = - [user_cb{std::move(cb)}, bytes, raw_bytes{frame_buffer.size()}]( - auto &&res) { - if (not res) { - return user_cb(res); // pulling out the error occurred - } - if (res.value() != raw_bytes) { - return user_cb(Error::STREAM_IS_BROKEN); - } - user_cb(bytes); - }; - writeReturnSize(original_connection_, frame_buffer, cb_wrapper); + write(original_connection_, + frame_buffer, + [buffer{write_buffer_}, in, cb{std::move(cb)}]( + outcome::result result) { + IF_ERROR_CB_RETURN(result); + cb(in.size()); + }); } bool SecioConnection::isClosed() const { From ed44c6d0f58ef74528b1f876be27fd9921622126 Mon Sep 17 00:00:00 2001 From: turuslan Date: Mon, 7 Jul 2025 13:42:03 +0500 Subject: [PATCH 3/9] noise connection Signed-off-by: turuslan --- .../security/noise/noise_connection.hpp | 21 ---- src/security/noise/noise_connection.cpp | 105 +++++------------- 2 files changed, 26 insertions(+), 100 deletions(-) diff --git a/include/libp2p/security/noise/noise_connection.hpp b/include/libp2p/security/noise/noise_connection.hpp index fb92f26bc..fa77c56d1 100644 --- a/include/libp2p/security/noise/noise_connection.hpp +++ b/include/libp2p/security/noise/noise_connection.hpp @@ -24,14 +24,6 @@ namespace libp2p::connection { class NoiseConnection : public SecureConnection, public std::enable_shared_from_this { public: - using BufferList = std::list; - - struct OperationContext { - size_t bytes_served; /// written or read bytes count - const size_t total_bytes; /// total size to process - BufferList::iterator write_buffer; /// temporary data storage - }; - ~NoiseConnection() override = default; NoiseConnection( @@ -70,18 +62,6 @@ namespace libp2p::connection { outcome::result remotePublicKey() const override; private: - void readSome(BytesOut out, - size_t bytes, - OperationContext ctx, - ReadCallbackFunc cb); - - void write(BytesIn in, - size_t bytes, - OperationContext ctx, - WriteCallbackFunc cb); - - void eraseWriteBuffer(BufferList::iterator &iterator); - std::shared_ptr connection_; crypto::PublicKey local_; crypto::PublicKey remote_; @@ -90,7 +70,6 @@ namespace libp2p::connection { std::shared_ptr decoder_cs_; std::shared_ptr frame_buffer_; std::shared_ptr framer_; - BufferList write_buffers_; log::Logger log_ = log::createLogger("NoiseConnection"); public: diff --git a/src/security/noise/noise_connection.cpp b/src/security/noise/noise_connection.cpp index e836628e3..7092f0e1c 100644 --- a/src/security/noise/noise_connection.cpp +++ b/src/security/noise/noise_connection.cpp @@ -8,26 +8,10 @@ #include #include +#include #include #include -#ifndef UNIQUE_NAME -#define UNIQUE_NAME(base) base##__LINE__ -#endif // UNIQUE_NAME - -#define OUTCOME_CB_I(var, res) \ - auto && (var) = (res); \ - if ((var).has_error()) { \ - self->eraseWriteBuffer(ctx.write_buffer); \ - return cb((var).error()); \ - } - -#define OUTCOME_CB_NAME_I(var, val, res) \ - OUTCOME_CB_I(var, res) \ - auto && (val) = (var).value(); - -#define OUTCOME_CB(name, res) OUTCOME_CB_NAME_I(UNIQUE_NAME(name), name, res) - namespace libp2p::connection { NoiseConnection::NoiseConnection( std::shared_ptr original_connection, @@ -72,18 +56,9 @@ namespace libp2p::connection { void NoiseConnection::readSome(BytesOut out, size_t bytes, libp2p::basic::Reader::ReadCallbackFunc cb) { - OperationContext context{.bytes_served = 0, - .total_bytes = bytes, - .write_buffer = write_buffers_.end()}; - readSome(out, bytes, context, std::move(cb)); - } - - void NoiseConnection::readSome(BytesOut out, - size_t bytes, - OperationContext ctx, - ReadCallbackFunc cb) { + ambigousSize(out, bytes); if (not frame_buffer_->empty()) { - auto n{std::min(bytes, frame_buffer_->size())}; + auto n{std::min(out.size(), frame_buffer_->size())}; auto begin{frame_buffer_->begin()}; auto end{begin + static_cast(n)}; std::copy(begin, end, out.begin()); @@ -91,55 +66,35 @@ namespace libp2p::connection { return cb(n); } framer_->read( - [self{shared_from_this()}, out, bytes, cb{std::move(cb)}, ctx]( - auto _data) mutable { - OUTCOME_CB(data, _data); - OUTCOME_CB(decrypted, self->decoder_cs_->decrypt({}, *data, {})); + [self{shared_from_this()}, out, cb{std::move(cb)}]( + outcome::result> data_result) mutable { + auto data = IF_ERROR_CB_RETURN(data_result); + auto decrypted = + IF_ERROR_CB_RETURN(self->decoder_cs_->decrypt({}, *data, {})); self->frame_buffer_->assign(decrypted.begin(), decrypted.end()); - self->readSome(out, bytes, ctx, std::move(cb)); + self->readSome(out, out.size(), std::move(cb)); }); } - void NoiseConnection::write(BytesIn in, - size_t bytes, - NoiseConnection::OperationContext ctx, - basic::Writer::WriteCallbackFunc cb) { - auto *self{this}; // for OUTCOME_CB - if (0 == bytes) { - BOOST_ASSERT(ctx.bytes_served >= ctx.total_bytes); - eraseWriteBuffer(ctx.write_buffer); - return cb(ctx.total_bytes); - } - auto n{std::min(bytes, security::noise::kMaxPlainText)}; - OUTCOME_CB(encrypted, encoder_cs_->encrypt({}, in.subspan(0, n), {})); - if (write_buffers_.end() == ctx.write_buffer) { - constexpr auto dummy_size = 1; - constexpr auto dummy_value = 0x0; - ctx.write_buffer = - write_buffers_.emplace(write_buffers_.end(), dummy_size, dummy_value); - } - ctx.write_buffer->swap(encrypted); - framer_->write(*ctx.write_buffer, - [self{shared_from_this()}, - in{in.subspan(static_cast(n))}, - bytes{bytes - n}, - cb{std::move(cb)}, - ctx](auto _n) mutable { - OUTCOME_CB(n, _n); - ctx.bytes_served += n; - self->write(in, bytes, ctx, std::move(cb)); - }); - } - void NoiseConnection::writeSome(BytesIn in, size_t bytes, - libp2p::basic::Writer::WriteCallbackFunc cb) { - OperationContext context{ - .bytes_served = 0, - .total_bytes = bytes, - .write_buffer = write_buffers_.end(), - }; - write(in, bytes, context, std::move(cb)); + basic::Writer::WriteCallbackFunc cb) { + ambigousSize(in, bytes); + if (in.empty()) { + cb(in.size()); + return; + } + if (in.size() > security::noise::kMaxPlainText) { + in = in.first(security::noise::kMaxPlainText); + } + auto encrypted = IF_ERROR_CB_RETURN(encoder_cs_->encrypt({}, in, {})); + // `InsecureReadWriter::write` doesn't leak `BytesIn` reference + framer_->write( + encrypted, + [in, cb{std::move(cb)}](outcome::result result) mutable { + IF_ERROR_CB_RETURN(result); + cb(in.size()); + }); } void NoiseConnection::deferReadCallback(outcome::result res, @@ -180,12 +135,4 @@ namespace libp2p::connection { const { return remote_; } - - void NoiseConnection::eraseWriteBuffer(BufferList::iterator &iterator) { - if (write_buffers_.end() == iterator) { - return; - } - write_buffers_.erase(iterator); - iterator = write_buffers_.end(); - } } // namespace libp2p::connection From 92bd9ff38357db86ad01f88bcc7abbc7b07e9b54 Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 8 Jul 2025 14:06:47 +0500 Subject: [PATCH 4/9] mplex stream Signed-off-by: turuslan --- include/libp2p/muxer/mplex/mplex_stream.hpp | 4 +--- src/muxer/mplex/mplex_stream.cpp | 13 +++++++------ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/include/libp2p/muxer/mplex/mplex_stream.hpp b/include/libp2p/muxer/mplex/mplex_stream.hpp index 074c38948..bd696c2be 100644 --- a/include/libp2p/muxer/mplex/mplex_stream.hpp +++ b/include/libp2p/muxer/mplex/mplex_stream.hpp @@ -84,7 +84,6 @@ namespace libp2p::connection { private: struct Reading { BytesOut out; - size_t bytes; ReadCallbackFunc cb; }; @@ -101,8 +100,7 @@ namespace libp2p::connection { boost::optional reading_; /// Queue of write requests that were received when stream was writing - std::deque, size_t, WriteCallbackFunc>> - write_queue_{}; + std::deque> write_queue_{}; mutable std::mutex write_queue_mutex_; diff --git a/src/muxer/mplex/mplex_stream.cpp b/src/muxer/mplex/mplex_stream.cpp index 041d767db..abe4ec921 100644 --- a/src/muxer/mplex/mplex_stream.cpp +++ b/src/muxer/mplex/mplex_stream.cpp @@ -46,7 +46,7 @@ namespace libp2p::connection { } bool MplexStream::readTry() { - auto size{std::min(read_buffer_.size(), reading_->bytes)}; + auto size{std::min(read_buffer_.size(), reading_->out.size())}; if (size == 0) { return false; } @@ -65,20 +65,21 @@ namespace libp2p::connection { } void MplexStream::readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) { + ambigousSize(out, bytes); if (is_reset_) { return cb(Error::STREAM_RESET_BY_PEER); } if (!is_readable_) { return cb(Error::STREAM_NOT_READABLE); } - if (bytes == 0 || out.empty() || static_cast(out.size()) < bytes) { + if (out.empty()) { return cb(Error::STREAM_INVALID_ARGUMENT); } if (reading_.has_value()) { return cb(Error::STREAM_IS_READING); } - reading_.emplace(Reading{out, bytes, std::move(cb)}); + reading_.emplace(Reading{out, std::move(cb)}); if (readTry()) { return; } @@ -98,13 +99,13 @@ namespace libp2p::connection { if (!is_writable_) { return cb(Error::STREAM_NOT_WRITABLE); } - if (bytes == 0 || in.empty() || static_cast(in.size()) < bytes) { + if (in.empty()) { return cb(Error::STREAM_INVALID_ARGUMENT); } if (is_writing_) { std::vector in_vector(in.begin(), in.end()); std::lock_guard lock(write_queue_mutex_); - write_queue_.emplace_back(in_vector, bytes, cb); + write_queue_.emplace_back(in_vector, cb); return; } if (connection_.expired()) { @@ -129,7 +130,7 @@ namespace libp2p::connection { // check if new write messages were received while stream was writing // and propagate these messages if (not self->write_queue_.empty()) { - auto [in, bytes, cb] = self->write_queue_.front(); + auto [in, cb] = self->write_queue_.front(); self->write_queue_.pop_front(); writeReturnSize(self, in, cb); } From 9932f99b342b1f195244e71e98c6c264f1085506 Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 8 Jul 2025 14:12:38 +0500 Subject: [PATCH 5/9] server echo session Signed-off-by: turuslan --- .../protocol/echo/server_echo_session.hpp | 4 +- src/protocol/echo/server_echo_session.cpp | 38 ++++++++----------- 2 files changed, 17 insertions(+), 25 deletions(-) diff --git a/include/libp2p/protocol/echo/server_echo_session.hpp b/include/libp2p/protocol/echo/server_echo_session.hpp index 753d4d180..996b19f8b 100644 --- a/include/libp2p/protocol/echo/server_echo_session.hpp +++ b/include/libp2p/protocol/echo/server_echo_session.hpp @@ -41,9 +41,9 @@ namespace libp2p::protocol { void onRead(outcome::result rread); - void doWrite(size_t size); + void doWrite(); - void onWrite(outcome::result rwrite); + void onWrite(outcome::result rwrite); }; } // namespace libp2p::protocol diff --git a/src/protocol/echo/server_echo_session.cpp b/src/protocol/echo/server_echo_session.cpp index 494d4a437..c07b6780f 100644 --- a/src/protocol/echo/server_echo_session.cpp +++ b/src/protocol/echo/server_echo_session.cpp @@ -7,8 +7,9 @@ #include #include +#include -#include +#include namespace libp2p::protocol { @@ -70,40 +71,31 @@ namespace libp2p::protocol { } else { log_->debug("read {} bytes", rread.value()); } - doWrite(rread.value()); + buf_.resize(rread.value()); + doWrite(); } - void ServerEchoSession::doWrite(size_t size) { - if (stream_->isClosedForWrite() || size == 0) { + void ServerEchoSession::doWrite() { + if (stream_->isClosedForWrite() or buf_.empty()) { return stop(); } - - auto write_buf = std::vector( - buf_.begin(), - // NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions) - buf_.begin() + size); - BytesIn span = write_buf; - writeReturnSize( - stream_, - span, - [self{shared_from_this()}, write_buf{std::move(write_buf)}]( - outcome::result rwrite) { self->onWrite(rwrite); }); + write(stream_, + buf_, + [self{shared_from_this()}](outcome::result rwrite) { + self->onWrite(rwrite); + }); } - void ServerEchoSession::onWrite(outcome::result rwrite) { + void ServerEchoSession::onWrite(outcome::result rwrite) { if (!rwrite) { log_->error("error happened during write: {}", rwrite.error()); return stop(); } - if (rwrite.value() < 120) { - log_->info( - "written message: {}", - std::string{buf_.begin(), - // NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions) - buf_.begin() + rwrite.value()}); + if (buf_.size() < 120) { + log_->info("written message: {}", qtils::byte2str(buf_)); } else { - log_->info("written {} bytes", rwrite.value()); + log_->info("written {} bytes", buf_.size()); } if (!repeat_infinitely_) { From b7d8c047fc45fa86e88f402421b20ce0f7867891 Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 8 Jul 2025 14:39:42 +0500 Subject: [PATCH 6/9] yamux Signed-off-by: turuslan --- include/libp2p/muxer/yamux/yamux_stream.hpp | 26 ++-- .../libp2p/muxer/yamux/yamuxed_connection.hpp | 2 +- src/muxer/yamux/yamux_stream.cpp | 147 ++++++------------ src/muxer/yamux/yamuxed_connection.cpp | 25 ++- 4 files changed, 68 insertions(+), 132 deletions(-) diff --git a/include/libp2p/muxer/yamux/yamux_stream.hpp b/include/libp2p/muxer/yamux/yamux_stream.hpp index 7ed91e8ff..4d4ad0484 100644 --- a/include/libp2p/muxer/yamux/yamux_stream.hpp +++ b/include/libp2p/muxer/yamux/yamux_stream.hpp @@ -112,21 +112,22 @@ namespace libp2p::connection { void closedByConnection(std::error_code ec); private: + struct Reading { + BytesOut out; + ReadCallbackFunc cb; + }; + /// Performs close-related cleanup and notifications - void doClose(std::error_code ec, bool notify_read_side); + void doClose(std::error_code ec); /// Called by read*() functions - void doRead(BytesOut out, size_t bytes, ReadCallbackFunc cb); - - /// Completes the read operation if any, clears read state - [[nodiscard]] std::pair> - readCompleted(); + void doRead(BytesOut out, ReadCallbackFunc cb); /// Dequeues data from write queue and sends to the wire in async manner void doWrite(); /// Called by write*() functions - void doWrite(BytesIn in, size_t bytes, WriteCallbackFunc cb); + void doWrite(BytesIn in, WriteCallbackFunc cb); /// Clears close callback state [[nodiscard]] std::pair> @@ -169,16 +170,7 @@ namespace libp2p::connection { basic::ReadBuffer internal_read_buffer_; /// True if read operation is active - bool is_reading_ = false; - - /// Read callback, it is non-zero during async data receive - ReadCallbackFunc read_cb_; - - /// TODO: get rid of this. client's read buffer - BytesOut external_read_buffer_; - - /// Size of message being read - size_t read_message_size_ = 0; + std::optional reading_; /// adjustWindowSize() callback, triggers when receive window size /// becomes greater or equal then desired diff --git a/include/libp2p/muxer/yamux/yamuxed_connection.hpp b/include/libp2p/muxer/yamux/yamuxed_connection.hpp index 6ac7dd594..29e869375 100644 --- a/include/libp2p/muxer/yamux/yamuxed_connection.hpp +++ b/include/libp2p/muxer/yamux/yamuxed_connection.hpp @@ -159,7 +159,7 @@ namespace libp2p::connection { void doWrite(WriteQueueItem packet); /// Write callback - void onDataWritten(outcome::result res, StreamId stream_id); + void onDataWritten(outcome::result res, WriteQueueItem &&packet); /// Creates new yamux stream std::shared_ptr createStream(StreamId stream_id); diff --git a/src/muxer/yamux/yamux_stream.cpp b/src/muxer/yamux/yamux_stream.cpp index 3dbe594ea..42eef75ee 100644 --- a/src/muxer/yamux/yamux_stream.cpp +++ b/src/muxer/yamux/yamux_stream.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #define TRACE_ENABLED 0 #include @@ -24,6 +25,14 @@ namespace libp2p::connection { } } // namespace + struct FinallyReading { + ~FinallyReading() { + cb(r); + } + YamuxStream::ReadCallbackFunc cb; + outcome::result r; + }; + YamuxStream::YamuxStream( std::shared_ptr connection, YamuxStreamFeedback &feedback, @@ -50,7 +59,8 @@ namespace libp2p::connection { } void YamuxStream::readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) { - doRead(out, bytes, std::move(cb)); + ambigousSize(out, bytes); + doRead(out, std::move(cb)); } void YamuxStream::deferReadCallback(outcome::result res, @@ -59,7 +69,8 @@ namespace libp2p::connection { } void YamuxStream::writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) { - doWrite(in, bytes, std::move(cb)); + ambigousSize(in, bytes); + doWrite(in, std::move(cb)); } void YamuxStream::deferWriteCallback(std::error_code ec, @@ -116,7 +127,7 @@ namespace libp2p::connection { void YamuxStream::reset() { feedback_.resetStream(stream_id_); - doClose(Error::STREAM_RESET_BY_HOST, true); + doClose(Error::STREAM_RESET_BY_HOST); } void YamuxStream::adjustWindowSize(uint32_t new_size, @@ -189,38 +200,18 @@ namespace libp2p::connection { TRACE("stream {} read {} bytes", stream_id_, sz); bool overflow = false; - bool read_completed = false; size_t bytes_consumed = 0; - std::pair> read_cb_and_res{ - ReadCallbackFunc{}, 0}; + std::optional finally_reading; // First transfer bytes to client if available - if (is_reading_) { - [[maybe_unused]] auto bytes_needed = - static_cast(external_read_buffer_.size()); - - assert(bytes_needed > 0); + if (auto reading = qtils::optionTake(reading_)) { assert(internal_read_buffer_.empty()); // if sz > bytes_needed then internal buffer will be non empty after // this - bytes_consumed = - internal_read_buffer_.addAndConsume(bytes, external_read_buffer_); - + bytes_consumed = internal_read_buffer_.addAndConsume(bytes, reading->out); assert(bytes_consumed > 0); - - external_read_buffer_ = - external_read_buffer_.subspan(static_cast(bytes_consumed)); - - read_completed = external_read_buffer_.empty(); - read_message_size_ = bytes_consumed; - read_completed = true; - - if (read_completed) { - read_cb_and_res = readCompleted(); - } else { - assert(bytes_consumed < bytes_needed); - } + finally_reading.emplace(std::move(reading->cb), bytes_consumed); } else { internal_read_buffer_.add(bytes); } @@ -246,7 +237,7 @@ namespace libp2p::connection { } if (overflow) { - doClose(Error::STREAM_RECEIVE_OVERFLOW, false); + doClose(Error::STREAM_RECEIVE_OVERFLOW); } else if (bytes_consumed > 0) { feedback_.ackReceivedBytes(stream_id_, bytes_consumed); TRACE("stream {} receive window increased by {} to {}", @@ -255,13 +246,16 @@ namespace libp2p::connection { peers_window_size_ - internal_read_buffer_.size()); } - if (read_cb_and_res.first) { - read_cb_and_res.first(read_cb_and_res.second); - } return overflow ? kRemoveStreamAndSendRst : kKeepStream; } YamuxStream::DataFromConnectionResult YamuxStream::onFINReceived() { + std::optional finally_reading; + if (auto reading = qtils::optionTake(reading_)) { + finally_reading.emplace(std::move(reading->cb), + Error::STREAM_CLOSED_BY_HOST); + } + if (isClosed()) { // already closed, maybe error return kRemoveStreamAndSendRst; @@ -270,21 +264,12 @@ namespace libp2p::connection { is_readable_ = false; if (!is_writable_) { - doClose(Error::STREAM_CLOSED_BY_HOST, true); + doClose(Error::STREAM_CLOSED_BY_HOST); // connection will remove stream return kRemoveStream; } - if (is_reading_) { - // Half closed, client may still write and FIN - - auto cb_and_result = readCompleted(); - if (cb_and_result.first) { - cb_and_result.first(cb_and_result.second); - } - } - return kKeepStream; } @@ -294,7 +279,7 @@ namespace libp2p::connection { return; } - doClose(Error::STREAM_RESET_BY_PEER, true); + doClose(Error::STREAM_RESET_BY_PEER); } void YamuxStream::onDataWritten(size_t bytes) { @@ -302,7 +287,7 @@ namespace libp2p::connection { if (!result.data_consistent) { log()->error("write queue ack failed, stream {}", stream_id_); feedback_.resetStream(stream_id_); - doClose(Error::STREAM_INTERNAL_ERROR, true); + doClose(Error::STREAM_INTERNAL_ERROR); return; } @@ -312,12 +297,18 @@ namespace libp2p::connection { } void YamuxStream::closedByConnection(std::error_code ec) { - doClose(std::move(ec), true); + doClose(std::move(ec)); } - void YamuxStream::doClose(std::error_code ec, bool notify_read_side) { + void YamuxStream::doClose(std::error_code ec) { // ensure lifetime of this object during doClose auto self = shared_from_this(); + + std::optional finally_reading; + if (auto reading = qtils::optionTake(reading_)) { + finally_reading.emplace(std::move(reading->cb), ec); + } + if (close_reason_) { // already closed return; @@ -327,13 +318,6 @@ namespace libp2p::connection { is_readable_ = false; is_writable_ = false; - std::pair> read_cb_and_res{ - ReadCallbackFunc{}, 0}; - - if (notify_read_side && is_reading_) { - read_cb_and_res = readCompleted(); - } - internal_read_buffer_.clear(); auto write_callbacks = write_queue_.getAllCallbacks(); @@ -345,10 +329,6 @@ namespace libp2p::connection { VoidResultHandlerFunc window_size_cb; window_size_cb.swap(window_size_cb_); - if (read_cb_and_res.first) { - read_cb_and_res.first(read_cb_and_res.second); - } - for (const auto &cb : write_callbacks) { cb(ec); } @@ -362,18 +342,16 @@ namespace libp2p::connection { } } - void YamuxStream::doRead(BytesOut out, size_t bytes, ReadCallbackFunc cb) { + void YamuxStream::doRead(BytesOut out, ReadCallbackFunc cb) { assert(cb); - if (!cb || bytes == 0 || out.empty() - || static_cast(out.size()) < bytes) { + if (out.empty()) { return deferReadCallback(Error::STREAM_INVALID_ARGUMENT, std::move(cb)); } // If something is still in read buffer, the client can consume these bytes auto bytes_available_now = internal_read_buffer_.size(); if (bytes_available_now > 0) { - out = out.first(static_cast(bytes)); size_t consumed = internal_read_buffer_.consume(out); assert(consumed > 0); @@ -388,8 +366,8 @@ namespace libp2p::connection { return deferReadCallback(*close_reason_, std::move(cb)); } - if (is_reading_) { - return deferReadCallback(Error::STREAM_IS_READING, std::move(cb)); + if (reading_.has_value()) { + abort(); } if (!is_readable_) { @@ -397,40 +375,7 @@ namespace libp2p::connection { return deferReadCallback(Error::STREAM_NOT_READABLE, std::move(cb)); } - is_reading_ = true; - read_cb_ = std::move(cb); - external_read_buffer_ = out; - read_message_size_ = bytes; - external_read_buffer_ = - external_read_buffer_.first(static_cast(read_message_size_)); - - if (bytes_available_now > 0) { - internal_read_buffer_.consume(external_read_buffer_); - external_read_buffer_ = external_read_buffer_.subspan( - static_cast(bytes_available_now)); - } - } - - std::pair> - YamuxStream::readCompleted() { - using CB = basic::Reader::ReadCallbackFunc; - std::pair> r{CB{}, read_message_size_}; - if (is_reading_) { - is_reading_ = false; - read_message_size_ = 0; - if (read_cb_) { - r.first.swap(read_cb_); - if (!is_readable_) { - if (close_reason_) { - r.second = *close_reason_; - } else { - // FIN received, but not yet closed - r.second = Error::STREAM_CLOSED_BY_PEER; - } - } - } - } - return r; + reading_.emplace(out, std::move(cb)); } void YamuxStream::doWrite() { @@ -464,7 +409,7 @@ namespace libp2p::connection { } if (!is_readable_) { - doClose(Error::STREAM_CLOSED_BY_HOST, false); + doClose(Error::STREAM_CLOSED_BY_HOST); } else { // let bytes be consumed with peers FIN even if no reader (???) peers_window_size_ = maximum_window_size_; @@ -472,8 +417,8 @@ namespace libp2p::connection { } } - void YamuxStream::doWrite(BytesIn in, size_t bytes, WriteCallbackFunc cb) { - if (bytes == 0 || in.empty() || static_cast(in.size()) < bytes) { + void YamuxStream::doWrite(BytesIn in, WriteCallbackFunc cb) { + if (in.empty()) { return deferWriteCallback(Error::STREAM_INVALID_ARGUMENT, std::move(cb)); } @@ -488,11 +433,11 @@ namespace libp2p::connection { outcome::result) mutable { cb(std::move(res)); }); } - if (!write_queue_.canEnqueue(bytes)) { + if (!write_queue_.canEnqueue(in.size())) { return deferWriteCallback(Error::STREAM_WRITE_OVERFLOW, std::move(cb)); } - write_queue_.enqueue(in.first(bytes), std::move(cb)); + write_queue_.enqueue(in, std::move(cb)); doWrite(); } diff --git a/src/muxer/yamux/yamuxed_connection.cpp b/src/muxer/yamux/yamuxed_connection.cpp index 198cbd63f..6dc79b610 100644 --- a/src/muxer/yamux/yamuxed_connection.cpp +++ b/src/muxer/yamux/yamuxed_connection.cpp @@ -8,9 +8,7 @@ #include -#include -#include -#include +#include #include namespace libp2p::connection { @@ -625,18 +623,18 @@ namespace libp2p::connection { writing_buf_->assign(packet.packet.begin(), packet.packet.end()); auto cb = [wptr{weak_from_this()}, buf{writing_buf_}, - packet = std::move(packet)](outcome::result res) { + packet = std::move(packet)](outcome::result res) mutable { if (auto self = wptr.lock()) { - self->onDataWritten(res, packet.stream_id); + self->onDataWritten(res, std::move(packet)); } }; is_writing_ = true; - writeReturnSize(connection_, *writing_buf_, cb); + write(connection_, *writing_buf_, cb); } - void YamuxedConnection::onDataWritten(outcome::result res, - StreamId stream_id) { + void YamuxedConnection::onDataWritten(outcome::result res, + WriteQueueItem &&packet) { if (!res) { write_queue_.clear(); std::ignore = connection_->close(); @@ -648,10 +646,10 @@ namespace libp2p::connection { // this instance may be killed inside further callback auto self = shared_from_this(); - if (stream_id != 0) { + if (packet.stream_id != 0) { // pass write ack to stream about data size written except header size - auto sz = res.value(); + auto sz = packet.packet.size(); if (sz < YamuxFrame::kHeaderLength) { log()->error("onDataWritten : too small size arrived: {}", sz); sz = 0; @@ -660,10 +658,11 @@ namespace libp2p::connection { } if (sz > 0) { - auto it = streams_.find(stream_id); + auto it = streams_.find(packet.stream_id); if (it == streams_.end()) { - SL_DEBUG( - log(), "onDataWritten : stream {} no longer exists", stream_id); + SL_DEBUG(log(), + "onDataWritten : stream {} no longer exists", + packet.stream_id); } else { // stream can now call write callbacks it->second->onDataWritten(sz); From 528fb9b0ac5c4790dc4966ba6cb9f27cf5717610 Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 8 Jul 2025 15:12:09 +0500 Subject: [PATCH 7/9] clang-tidy Signed-off-by: turuslan --- src/muxer/yamux/yamux_stream.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/muxer/yamux/yamux_stream.cpp b/src/muxer/yamux/yamux_stream.cpp index 42eef75ee..df9fc3094 100644 --- a/src/muxer/yamux/yamux_stream.cpp +++ b/src/muxer/yamux/yamux_stream.cpp @@ -26,9 +26,18 @@ namespace libp2p::connection { } // namespace struct FinallyReading { + FinallyReading(YamuxStream::ReadCallbackFunc cb, outcome::result r) + : cb{std::move(cb)}, r{r} {} ~FinallyReading() { cb(r); } + + // clang-tidy cppcoreguidelines-special-member-functions + FinallyReading(const FinallyReading &) = delete; + void operator=(const FinallyReading &) = delete; + FinallyReading(FinallyReading &&) = delete; + void operator=(FinallyReading &&) = delete; + YamuxStream::ReadCallbackFunc cb; outcome::result r; }; From 47e2cf87573bb0bf8fc4d8f8fffbf1c204d72fc6 Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 8 Jul 2025 19:42:15 +0500 Subject: [PATCH 8/9] clang-tidy Signed-off-by: turuslan --- src/muxer/yamux/yamux_stream.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/muxer/yamux/yamux_stream.cpp b/src/muxer/yamux/yamux_stream.cpp index df9fc3094..446c42fd5 100644 --- a/src/muxer/yamux/yamux_stream.cpp +++ b/src/muxer/yamux/yamux_stream.cpp @@ -384,7 +384,7 @@ namespace libp2p::connection { return deferReadCallback(Error::STREAM_NOT_READABLE, std::move(cb)); } - reading_.emplace(out, std::move(cb)); + reading_.emplace(Reading{out, std::move(cb)}); } void YamuxStream::doWrite() { From 9506caa8b4b2f60649bc4e134559ad6b35b27673 Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 8 Jul 2025 19:42:55 +0500 Subject: [PATCH 9/9] comment Signed-off-by: turuslan --- src/muxer/yamux/yamux_stream.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/muxer/yamux/yamux_stream.cpp b/src/muxer/yamux/yamux_stream.cpp index 446c42fd5..f622b2f7e 100644 --- a/src/muxer/yamux/yamux_stream.cpp +++ b/src/muxer/yamux/yamux_stream.cpp @@ -25,6 +25,11 @@ namespace libp2p::connection { } } // namespace + /** + * Calls read callback on return. + * + * try { ... } finally { cb(); } + */ struct FinallyReading { FinallyReading(YamuxStream::ReadCallbackFunc cb, outcome::result r) : cb{std::move(cb)}, r{r} {}