From b3c24c9e26492fa7cbe9ebe0070e93ee1c57d184 Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 15 Jul 2025 07:50:06 +0500 Subject: [PATCH 1/4] clang-format Signed-off-by: turuslan --- .../libp2p/connection/as_asio_read_write.hpp | 6 +- .../network/impl/listener_manager_impl.hpp | 6 +- src/basic/varint_reader.cpp | 37 +++++----- src/connection/loopback_stream.cpp | 12 +-- src/layer/websocket/ws_connection.cpp | 2 +- src/muxer/mplex/mplexed_connection.cpp | 6 +- src/muxer/yamux/yamuxed_connection.cpp | 6 +- src/protocol/echo/server_echo_session.cpp | 3 +- src/protocol/gossip/impl/stream.cpp | 19 ++--- .../plaintext/plaintext_connection.cpp | 6 +- src/security/tls/tls_connection.cpp | 6 +- src/transport/quic/connection.cpp | 6 +- src/transport/quic/stream.cpp | 6 +- test/acceptance/p2p/muxer.cpp | 63 ++++++++-------- test/libp2p/muxer/muxers_and_streams_test.cpp | 73 +++++++++---------- test/libp2p/protocol/identify_delta_test.cpp | 6 -- .../transport/tcp/tcp_integration_test.cpp | 27 ++++--- .../connection/capable_connection_mock.hpp | 6 +- .../connection/layer_connection_mock.hpp | 6 +- test/testutil/expect_read.hpp | 26 +++---- test/testutil/expect_write.hpp | 24 +++--- 21 files changed, 157 insertions(+), 195 deletions(-) diff --git a/include/libp2p/connection/as_asio_read_write.hpp b/include/libp2p/connection/as_asio_read_write.hpp index 3025e4afe..66d26de45 100644 --- a/include/libp2p/connection/as_asio_read_write.hpp +++ b/include/libp2p/connection/as_asio_read_write.hpp @@ -67,8 +67,7 @@ namespace libp2p { boost::asio::detail::buffer_sequence_adapter< boost::asio::mutable_buffer, MutableBufferSequence>::first(buffers)}; - impl->readSome( - asioBuffer(buffer), wrapCb(std::forward(cb))); + impl->readSome(asioBuffer(buffer), wrapCb(std::forward(cb))); } template @@ -77,8 +76,7 @@ namespace libp2p { boost::asio::detail::buffer_sequence_adapter< boost::asio::const_buffer, ConstBufferSequence>::first(buffers)}; - impl->writeSome( - asioBuffer(buffer), wrapCb(std::forward(cb))); + impl->writeSome(asioBuffer(buffer), wrapCb(std::forward(cb))); } std::shared_ptr io; diff --git a/include/libp2p/network/impl/listener_manager_impl.hpp b/include/libp2p/network/impl/listener_manager_impl.hpp index 08582ba73..fa9016868 100644 --- a/include/libp2p/network/impl/listener_manager_impl.hpp +++ b/include/libp2p/network/impl/listener_manager_impl.hpp @@ -52,9 +52,9 @@ namespace libp2p::network { private: bool started = false; - // clang-format off - std::unordered_map> listeners_; - // clang-format on + std::unordered_map> + listeners_; std::shared_ptr multiselect_; std::shared_ptr router_; diff --git a/src/basic/varint_reader.cpp b/src/basic/varint_reader.cpp index 55439aa20..9ea567d0a 100644 --- a/src/basic/varint_reader.cpp +++ b/src/basic/varint_reader.cpp @@ -4,8 +4,8 @@ * SPDX-License-Identifier: Apache-2.0 */ -#include #include +#include #include @@ -45,25 +45,24 @@ namespace libp2p::basic { } // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - readReturnSize( - conn, - std::span(varint_buf->data() + current_length, 1), - [c = conn, cb = std::move(cb), current_length, varint_buf]( - auto &&res) mutable { - if (not res.has_value()) { - return cb(res.error()); - } + readReturnSize(conn, + std::span(varint_buf->data() + current_length, 1), + [c = conn, cb = std::move(cb), current_length, varint_buf]( + auto &&res) mutable { + if (not res.has_value()) { + return cb(res.error()); + } - auto varint_opt = multi::UVarint::create( - std::span(varint_buf->data(), current_length + 1)); - if (varint_opt) { - return cb(*varint_opt); - } + auto varint_opt = multi::UVarint::create( + std::span(varint_buf->data(), current_length + 1)); + if (varint_opt) { + return cb(*varint_opt); + } - readVarint(std::move(c), - std::move(cb), - ++current_length, - std::move(varint_buf)); - }); + readVarint(std::move(c), + std::move(cb), + ++current_length, + std::move(varint_buf)); + }); } } // namespace libp2p::basic diff --git a/src/connection/loopback_stream.cpp b/src/connection/loopback_stream.cpp index 2b3d52b1c..6039c77d2 100644 --- a/src/connection/loopback_stream.cpp +++ b/src/connection/loopback_stream.cpp @@ -56,7 +56,7 @@ namespace libp2p::connection { }; void LoopbackStream::adjustWindowSize(uint32_t new_size, - VoidResultHandlerFunc cb){}; + VoidResultHandlerFunc cb) {}; outcome::result LoopbackStream::isInitiator() const { return outcome::success(false); @@ -88,8 +88,9 @@ namespace libp2p::connection { return deferWriteCallback(Error::STREAM_INVALID_ARGUMENT, std::move(cb)); } - if (boost::asio::buffer_copy(buffer_.prepare(in.size()), - boost::asio::const_buffer(in.data(), in.size())) + if (boost::asio::buffer_copy( + buffer_.prepare(in.size()), + boost::asio::const_buffer(in.data(), in.size())) != in.size()) { return deferWriteCallback(Error::STREAM_INTERNAL_ERROR, std::move(cb)); } @@ -121,9 +122,8 @@ namespace libp2p::connection { // this lambda checks, if there's enough data in our read buffer, and gives // it to the caller, if so - auto read_lambda = [self{shared_from_this()}, - cb{std::move(cb)}, - out](outcome::result res) mutable { + auto read_lambda = [self{shared_from_this()}, cb{std::move(cb)}, out]( + outcome::result res) mutable { if (!res) { self->data_notified_ = true; self->deferReadCallback(res.as_failure(), std::move(cb)); diff --git a/src/layer/websocket/ws_connection.cpp b/src/layer/websocket/ws_connection.cpp index 251893174..140b12e76 100644 --- a/src/layer/websocket/ws_connection.cpp +++ b/src/layer/websocket/ws_connection.cpp @@ -102,7 +102,7 @@ namespace libp2p::connection { ws_.async_read_some(asioBuffer(out), std::move(on_read)); } - void WsConnection::writeSome(BytesIn in, // + void WsConnection::writeSome(BytesIn in, libp2p::basic::Writer::WriteCallbackFunc cb) { SL_TRACE(log_, "write some upto {} bytes", in.size()); ws_.async_write_some(true, asioBuffer(in), toAsioCbSize(std::move(cb))); diff --git a/src/muxer/mplex/mplexed_connection.cpp b/src/muxer/mplex/mplexed_connection.cpp index 3d31b9b7b..06a69fc6e 100644 --- a/src/muxer/mplex/mplexed_connection.cpp +++ b/src/muxer/mplex/mplexed_connection.cpp @@ -125,13 +125,11 @@ namespace libp2p::connection { return !is_active_ || connection_->isClosed(); } - void MplexedConnection::readSome(BytesOut out, - ReadCallbackFunc cb) { + void MplexedConnection::readSome(BytesOut out, ReadCallbackFunc cb) { connection_->readSome(out, std::move(cb)); } - void MplexedConnection::writeSome(BytesIn in, - WriteCallbackFunc cb) { + void MplexedConnection::writeSome(BytesIn in, WriteCallbackFunc cb) { connection_->writeSome(in, std::move(cb)); } diff --git a/src/muxer/yamux/yamuxed_connection.cpp b/src/muxer/yamux/yamuxed_connection.cpp index 5177298ba..96b311c04 100644 --- a/src/muxer/yamux/yamuxed_connection.cpp +++ b/src/muxer/yamux/yamuxed_connection.cpp @@ -163,14 +163,12 @@ namespace libp2p::connection { return !started_ || connection_->isClosed(); } - void YamuxedConnection::readSome(BytesOut out, - ReadCallbackFunc cb) { + void YamuxedConnection::readSome(BytesOut out, ReadCallbackFunc cb) { log()->error("YamuxedConnection::readSome : invalid direct call"); deferReadCallback(Error::CONNECTION_DIRECT_IO_FORBIDDEN, std::move(cb)); } - void YamuxedConnection::writeSome(BytesIn in, - WriteCallbackFunc cb) { + void YamuxedConnection::writeSome(BytesIn in, WriteCallbackFunc cb) { log()->error("YamuxedConnection::writeSome : invalid direct call"); deferWriteCallback(Error::CONNECTION_DIRECT_IO_FORBIDDEN, std::move(cb)); } diff --git a/src/protocol/echo/server_echo_session.cpp b/src/protocol/echo/server_echo_session.cpp index 291bea4fe..518bb3e9b 100644 --- a/src/protocol/echo/server_echo_session.cpp +++ b/src/protocol/echo/server_echo_session.cpp @@ -47,8 +47,7 @@ namespace libp2p::protocol { } stream_->readSome( - buf_, - [self{shared_from_this()}](outcome::result rread) { + buf_, [self{shared_from_this()}](outcome::result rread) { self->onRead(rread); }); } diff --git a/src/protocol/gossip/impl/stream.cpp b/src/protocol/gossip/impl/stream.cpp index e24b230a5..6ee74e661 100644 --- a/src/protocol/gossip/impl/stream.cpp +++ b/src/protocol/gossip/impl/stream.cpp @@ -48,18 +48,15 @@ namespace libp2p::protocol::gossip { TRACE("reading length from {}:{}", peer_->str, stream_id_); - // clang-format off libp2p::basic::VarintReader::readVarint( stream_, - [self_wptr = weak_from_this(), this] - (outcome::result varint) { + [self_wptr = weak_from_this(), + this](outcome::result varint) { if (self_wptr.expired()) { return; } onLengthRead(std::move(varint)); - } - ); - // clang-format on + }); reading_ = true; } @@ -161,21 +158,17 @@ namespace libp2p::protocol::gossip { TRACE("writing {} bytes to {}:{}", writing_bytes_, peer_->str, stream_id_); - // clang-format off BytesIn span{*buffer}; writeReturnSize( stream_, span, - [self_wptr = weak_from_this(), this, buffer = std::move(buffer)] - (outcome::result result) - { + [self_wptr = weak_from_this(), this, buffer = std::move(buffer)]( + outcome::result result) { if (self_wptr.expired() || closed_) { return; } onMessageWritten(result); - } - ); - // clang-format on + }); if (timeout_ > std::chrono::milliseconds::zero()) { timeout_handle_ = scheduler_.scheduleWithHandle( diff --git a/src/security/plaintext/plaintext_connection.cpp b/src/security/plaintext/plaintext_connection.cpp index 17eadf700..3914dbbf4 100644 --- a/src/security/plaintext/plaintext_connection.cpp +++ b/src/security/plaintext/plaintext_connection.cpp @@ -58,13 +58,11 @@ namespace libp2p::connection { return original_connection_->remoteMultiaddr(); } - void PlaintextConnection::readSome(BytesOut in, - Reader::ReadCallbackFunc f) { + void PlaintextConnection::readSome(BytesOut in, Reader::ReadCallbackFunc f) { return original_connection_->readSome(in, std::move(f)); }; - void PlaintextConnection::writeSome(BytesIn in, - Writer::WriteCallbackFunc f) { + void PlaintextConnection::writeSome(BytesIn in, Writer::WriteCallbackFunc f) { return original_connection_->writeSome(in, std::move(f)); } diff --git a/src/security/tls/tls_connection.cpp b/src/security/tls/tls_connection.cpp index a26840699..bc6bce743 100644 --- a/src/security/tls/tls_connection.cpp +++ b/src/security/tls/tls_connection.cpp @@ -139,8 +139,7 @@ namespace libp2p::connection { }; } - void TlsConnection::readSome(BytesOut out, - Reader::ReadCallbackFunc cb) { + void TlsConnection::readSome(BytesOut out, Reader::ReadCallbackFunc cb) { SL_TRACE(log(), "reading some up to {} bytes", out.size()); socket_.async_read_some(asioBuffer(out), closeOnError(*this, std::move(cb))); @@ -151,8 +150,7 @@ namespace libp2p::connection { original_connection_->deferReadCallback(res, std::move(cb)); } - void TlsConnection::writeSome(BytesIn in, - Writer::WriteCallbackFunc cb) { + void TlsConnection::writeSome(BytesIn in, Writer::WriteCallbackFunc cb) { SL_TRACE(log(), "writing some up to {} bytes", in.size()); socket_.async_write_some(asioBuffer(in), closeOnError(*this, std::move(cb))); diff --git a/src/transport/quic/connection.cpp b/src/transport/quic/connection.cpp index c39f0ef50..68d579740 100644 --- a/src/transport/quic/connection.cpp +++ b/src/transport/quic/connection.cpp @@ -32,8 +32,7 @@ namespace libp2p::transport { std::ignore = close(); } - void QuicConnection::readSome(BytesOut out, - ReadCallbackFunc cb) { + void QuicConnection::readSome(BytesOut out, ReadCallbackFunc cb) { throw std::logic_error{"QuicConnection::readSome must not be called"}; } @@ -42,8 +41,7 @@ namespace libp2p::transport { post(*io_context_, [cb{std::move(cb)}, res] { cb(res); }); } - void QuicConnection::writeSome(BytesIn in, - WriteCallbackFunc cb) { + void QuicConnection::writeSome(BytesIn in, WriteCallbackFunc cb) { throw std::logic_error{"QuicConnection::writeSome must not be called"}; } diff --git a/src/transport/quic/stream.cpp b/src/transport/quic/stream.cpp index 33e0baa60..27435958f 100644 --- a/src/transport/quic/stream.cpp +++ b/src/transport/quic/stream.cpp @@ -25,8 +25,7 @@ namespace libp2p::connection { reset(); } - void QuicStream::readSome(BytesOut out, - basic::Reader::ReadCallbackFunc cb) { + void QuicStream::readSome(BytesOut out, basic::Reader::ReadCallbackFunc cb) { outcome::result r = QuicError::STREAM_CLOSED; if (not stream_ctx_) { return cb(r); @@ -51,8 +50,7 @@ namespace libp2p::connection { conn_->deferReadCallback(res, std::move(cb)); } - void QuicStream::writeSome(BytesIn in, - basic::Writer::WriteCallbackFunc cb) { + void QuicStream::writeSome(BytesIn in, basic::Writer::WriteCallbackFunc cb) { outcome::result r = QuicError::STREAM_CLOSED; if (not stream_ctx_) { return cb(r); diff --git a/test/acceptance/p2p/muxer.cpp b/test/acceptance/p2p/muxer.cpp index 67731bf9d..6782894e9 100644 --- a/test/acceptance/p2p/muxer.cpp +++ b/test/acceptance/p2p/muxer.cpp @@ -127,38 +127,37 @@ struct Server : public std::enable_shared_from_this { println("onStream executed"); - stream->readSome( - *buf, [buf, stream, this](outcome::result rread) { - if (!rread) { - if (rread.error() == RawConnection::Error::CONNECTION_CLOSED_BY_PEER - || rread.error() == Stream::Error::STREAM_RESET_BY_HOST) { - return; - } - this->println(fmt::format("readSome error: {}", rread.error())); - } - - ASSERT_OUTCOME_SUCCESS(read, rread); - - this->println("readSome ", read, " bytes"); - if (read == 0) { - return; - } - this->streamReads++; - - // 01-echo back read data - buf->resize(read); - writeReturnSize( - stream, - *buf, - [buf, read, stream, this](outcome::result rwrite) { - ASSERT_OUTCOME_SUCCESS(write, rwrite); - this->println("write ", write, " bytes"); - this->streamWrites++; - ASSERT_EQ(write, read); - - this->onStream(buf, stream); - }); - }); + stream->readSome(*buf, [buf, stream, this](outcome::result rread) { + if (!rread) { + if (rread.error() == RawConnection::Error::CONNECTION_CLOSED_BY_PEER + || rread.error() == Stream::Error::STREAM_RESET_BY_HOST) { + return; + } + this->println(fmt::format("readSome error: {}", rread.error())); + } + + ASSERT_OUTCOME_SUCCESS(read, rread); + + this->println("readSome ", read, " bytes"); + if (read == 0) { + return; + } + this->streamReads++; + + // 01-echo back read data + buf->resize(read); + writeReturnSize( + stream, + *buf, + [buf, read, stream, this](outcome::result rwrite) { + ASSERT_OUTCOME_SUCCESS(write, rwrite); + this->println("write ", write, " bytes"); + this->streamWrites++; + ASSERT_EQ(write, read); + + this->onStream(buf, stream); + }); + }); } void listen(const Multiaddress &ma) { diff --git a/test/libp2p/muxer/muxers_and_streams_test.cpp b/test/libp2p/muxer/muxers_and_streams_test.cpp index a622c7dd7..af2fe5fe3 100644 --- a/test/libp2p/muxer/muxers_and_streams_test.cpp +++ b/test/libp2p/muxer/muxers_and_streams_test.cpp @@ -141,16 +141,14 @@ namespace libp2p::regression { void connect(const peer::PeerInfo &connect_to) { start(); - // clang-format off - host_->newStream( - connect_to, - {getProtocolId()}, - [wptr = weak_from_this()] (auto rstream) { - auto self = wptr.lock(); - if (self) { self->onConnected(std::move(rstream)); } - } - ); - // clang-format on + host_->newStream(connect_to, + {getProtocolId()}, + [wptr = weak_from_this()](auto rstream) { + auto self = wptr.lock(); + if (self) { + self->onConnected(std::move(rstream)); + } + }); } void listen(const multi::Multiaddress &listen_to) { @@ -173,16 +171,14 @@ namespace libp2p::regression { stats_.put(Stats::FATAL_ERROR); return behavior_(*this); } - // clang-format off - readReturnSize( - stream, - *read_buf_, - [wptr = weak_from_this(), buf = read_buf_] (auto res) { - auto self = wptr.lock(); - if (self) { self->onRead(res); } - } - ); - //clang-format on + readReturnSize(stream, + *read_buf_, + [wptr = weak_from_this(), buf = read_buf_](auto res) { + auto self = wptr.lock(); + if (self) { + self->onRead(res); + } + }); } void write(WhatStream what_stream = ANY_STREAM) { @@ -192,31 +188,29 @@ namespace libp2p::regression { stats_.put(Stats::FATAL_ERROR); return behavior_(*this); } - // clang-format off - writeReturnSize( - stream, - *write_buf_, - [wptr = weak_from_this(), buf = write_buf_] (auto res) { - auto self = wptr.lock(); - if (self) { self->onWrite(res); } - } - ); - //clang-format on + writeReturnSize(stream, + *write_buf_, + [wptr = weak_from_this(), buf = write_buf_](auto res) { + auto self = wptr.lock(); + if (self) { + self->onWrite(res); + } + }); } void stop() { if (accepted_stream_) { auto p = accepted_stream_->remotePeerId(); if (p) { - host_->getNetwork().getConnectionManager() - .closeConnectionsToPeer(p.value()); + host_->getNetwork().getConnectionManager().closeConnectionsToPeer( + p.value()); } } if (connected_stream_) { auto p = connected_stream_->remotePeerId(); if (p) { - host_->getNetwork().getConnectionManager() - .closeConnectionsToPeer(p.value()); + host_->getNetwork().getConnectionManager().closeConnectionsToPeer( + p.value()); } } @@ -235,15 +229,14 @@ namespace libp2p::regression { void start() { if (!started_) { - // clang-format off host_->setProtocolHandler( {getProtocolId()}, - [wptr = weak_from_this()] (StreamAndProtocol stream) { + [wptr = weak_from_this()](StreamAndProtocol stream) { auto self = wptr.lock(); - if (self) { self->onAccepted(std::move(stream)); } - } - ); - // clang-format on + if (self) { + self->onAccepted(std::move(stream)); + } + }); host_->start(); started_ = true; diff --git a/test/libp2p/protocol/identify_delta_test.cpp b/test/libp2p/protocol/identify_delta_test.cpp index 677bfefd5..c7763c4b9 100644 --- a/test/libp2p/protocol/identify_delta_test.cpp +++ b/test/libp2p/protocol/identify_delta_test.cpp @@ -129,12 +129,6 @@ TEST_F(IdentifyDeltaTest, Send) { .WillOnce(InvokeArgument<2>( StreamAndProtocol{stream_, kIdentifyDeltaProtocol})); - auto if_added = [&](BytesIn actual) { - auto expected = BytesIn(msg_added_protos_bytes_); - return std::equal( - actual.begin(), actual.end(), expected.begin(), expected.end()); - }; - EXPECT_CALL_WRITE(*stream_).WILL_WRITE(msg_added_protos_bytes_); id_delta_->start(); diff --git a/test/libp2p/transport/tcp/tcp_integration_test.cpp b/test/libp2p/transport/tcp/tcp_integration_test.cpp index d9fe9edef..b824af075 100644 --- a/test/libp2p/transport/tcp/tcp_integration_test.cpp +++ b/test/libp2p/transport/tcp/tcp_integration_test.cpp @@ -146,20 +146,19 @@ TEST(TCP, SingleListenerCanAcceptManyClients) { EXPECT_FALSE(conn->isInitiator()); auto buf = std::make_shared>(kSize, 0); - conn->readSome( - *buf, [&counter, conn, buf, context](auto &&res) { - ASSERT_OUTCOME_SUCCESS(res); - - libp2p::writeReturnSize( - conn, *buf, [&counter, conn, buf, context](auto &&res) { - ASSERT_OUTCOME_SUCCESS(res); - EXPECT_EQ(res.value(), buf->size()); - counter++; - if (counter >= kClients) { - context->stop(); - } - }); - }); + conn->readSome(*buf, [&counter, conn, buf, context](auto &&res) { + ASSERT_OUTCOME_SUCCESS(res); + + libp2p::writeReturnSize( + conn, *buf, [&counter, conn, buf, context](auto &&res) { + ASSERT_OUTCOME_SUCCESS(res); + EXPECT_EQ(res.value(), buf->size()); + counter++; + if (counter >= kClients) { + context->stop(); + } + }); + }); }); ASSERT_TRUE(listener); diff --git a/test/mock/libp2p/connection/capable_connection_mock.hpp b/test/mock/libp2p/connection/capable_connection_mock.hpp index c5345a29d..8b305d26a 100644 --- a/test/mock/libp2p/connection/capable_connection_mock.hpp +++ b/test/mock/libp2p/connection/capable_connection_mock.hpp @@ -81,13 +81,11 @@ namespace libp2p::connection { return real_->remoteMultiaddr(); }; - void readSome(BytesOut in, - Reader::ReadCallbackFunc f) override { + void readSome(BytesOut in, Reader::ReadCallbackFunc f) override { return real_->readSome(in, f); }; - void writeSome(BytesIn in, - Writer::WriteCallbackFunc f) override { + void writeSome(BytesIn in, Writer::WriteCallbackFunc f) override { return real_->writeSome(in, f); } diff --git a/test/mock/libp2p/connection/layer_connection_mock.hpp b/test/mock/libp2p/connection/layer_connection_mock.hpp index 4473f7d59..741a9d89d 100644 --- a/test/mock/libp2p/connection/layer_connection_mock.hpp +++ b/test/mock/libp2p/connection/layer_connection_mock.hpp @@ -6,14 +6,16 @@ #pragma once -#include #include +#include #include namespace libp2p::connection { - class LayerConnectionMock : public std::enable_shared_from_this, public virtual LayerConnection { + class LayerConnectionMock + : public std::enable_shared_from_this, + public virtual LayerConnection { public: ~LayerConnectionMock() override = default; diff --git a/test/testutil/expect_read.hpp b/test/testutil/expect_read.hpp index ccf770a88..9a66fa3e5 100644 --- a/test/testutil/expect_read.hpp +++ b/test/testutil/expect_read.hpp @@ -11,22 +11,22 @@ #define EXPECT_CALL_READ(mock) \ EXPECT_CALL(mock, readSome(testing::_, testing::_)) -#define WILL_READ(_expected) \ - WillOnce([expected{qtils::asVec(_expected)}]( \ - libp2p::BytesOut out, \ - libp2p::basic::Reader::ReadCallbackFunc cb) { \ - ASSERT_GE(out.size(), expected.size()); \ - memcpy(out.data(), expected.data(), expected.size()); \ - cb(expected.size()); \ - }) +#define WILL_READ(_expected) \ + WillOnce( \ + [expected{qtils::asVec(_expected)}]( \ + libp2p::BytesOut out, libp2p::basic::Reader::ReadCallbackFunc cb) { \ + ASSERT_GE(out.size(), expected.size()); \ + memcpy(out.data(), expected.data(), expected.size()); \ + cb(expected.size()); \ + }) #define WILL_READ_SIZE(_expected) \ WillOnce([expected{_expected}](libp2p::BytesOut out, \ libp2p::basic::Reader::ReadCallbackFunc cb) { \ ASSERT_EQ(out.size(), expected); \ cb(expected); \ }) -#define WILL_READ_ERROR() \ - WillOnce([](libp2p::BytesOut out, \ - libp2p::basic::Reader::ReadCallbackFunc cb) { \ - cb(std::errc::io_error); \ - }) +#define WILL_READ_ERROR() \ + WillOnce( \ + [](libp2p::BytesOut out, libp2p::basic::Reader::ReadCallbackFunc cb) { \ + cb(std::errc::io_error); \ + }) diff --git a/test/testutil/expect_write.hpp b/test/testutil/expect_write.hpp index 9bd9a9860..cf4fe0233 100644 --- a/test/testutil/expect_write.hpp +++ b/test/testutil/expect_write.hpp @@ -11,13 +11,13 @@ #define EXPECT_CALL_WRITE(mock) \ EXPECT_CALL(mock, writeSome(testing::_, testing::_)) -#define WILL_WRITE(_expected) \ - WillOnce([expected{qtils::asVec(_expected)}]( \ - libp2p::BytesIn in, \ - libp2p::basic::Writer::WriteCallbackFunc cb) { \ - ASSERT_EQ(qtils::asVec(in), expected); \ - cb(in.size()); \ - }) +#define WILL_WRITE(_expected) \ + WillOnce( \ + [expected{qtils::asVec(_expected)}]( \ + libp2p::BytesIn in, libp2p::basic::Writer::WriteCallbackFunc cb) { \ + ASSERT_EQ(qtils::asVec(in), expected); \ + cb(in.size()); \ + }) #define WILL_WRITE_SIZE(_expected) \ WillOnce( \ [expected{_expected}](libp2p::BytesIn in, \ @@ -25,8 +25,8 @@ ASSERT_EQ(in.size(), expected); \ cb(expected); \ }) -#define WILL_WRITE_ERROR() \ - WillOnce([](libp2p::BytesIn in, \ - libp2p::basic::Writer::WriteCallbackFunc cb) { \ - cb(std::errc::io_error); \ - }) +#define WILL_WRITE_ERROR() \ + WillOnce( \ + [](libp2p::BytesIn in, libp2p::basic::Writer::WriteCallbackFunc cb) { \ + cb(std::errc::io_error); \ + }) From c1b3b4c6ce37abc2cd4dfd35096da102b1034578 Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 15 Jul 2025 11:03:40 +0500 Subject: [PATCH 2/4] remove writeReturnSize Signed-off-by: turuslan --- example/02-kademlia/rendezvous_chat.cpp | 10 ++--- include/libp2p/basic/write_return_size.hpp | 25 ----------- .../libp2p/muxer/mplex/mplexed_connection.hpp | 5 ++- .../protocol/ping/ping_client_session.hpp | 2 +- .../multiselect/multiselect_instance.hpp | 2 +- src/basic/message_read_writer_bigendian.cpp | 19 ++++---- src/basic/message_read_writer_uvarint.cpp | 22 +++++----- src/muxer/mplex/mplex_stream.cpp | 4 +- src/muxer/mplex/mplexed_connection.cpp | 17 +++---- src/protocol/echo/client_echo_session.cpp | 4 +- src/protocol/gossip/impl/stream.cpp | 27 +++++------- src/protocol/gossip/impl/stream.hpp | 2 +- src/protocol/ping/ping_client_session.cpp | 14 +++--- src/protocol/ping/ping_server_session.cpp | 4 +- .../multiselect/multiselect_instance.cpp | 25 +++++------ .../multiselect/simple_stream_negotiate.cpp | 23 ++++------ src/security/secio/secio.cpp | 44 +++++++++---------- src/security/secio/secio_connection.cpp | 2 +- .../p2p/host/protocol/client_test_session.cpp | 20 ++++----- test/acceptance/p2p/muxer.cpp | 27 +++++------- .../loopback_stream/loopback_stream_test.cpp | 11 ++--- .../plaintext_connection_test.cpp | 5 +-- test/libp2p/muxer/muxers_and_streams_test.cpp | 25 ++++++----- .../transport/tcp/tcp_integration_test.cpp | 42 +++++++++--------- 24 files changed, 167 insertions(+), 214 deletions(-) delete mode 100644 include/libp2p/basic/write_return_size.hpp diff --git a/example/02-kademlia/rendezvous_chat.cpp b/example/02-kademlia/rendezvous_chat.cpp index ae00d5f15..581d42e97 100644 --- a/example/02-kademlia/rendezvous_chat.cpp +++ b/example/02-kademlia/rendezvous_chat.cpp @@ -11,12 +11,13 @@ #include -#include +#include #include #include #include #include #include +#include using libp2p::common::operator""_unhex; @@ -69,10 +70,10 @@ class Session : public std::enable_shared_from_this { return false; } - libp2p::writeReturnSize( + libp2p::write( stream_, *buffer, - [self = shared_from_this(), buffer](outcome::result result) { + [self = shared_from_this(), buffer](outcome::result result) { if (not result) { self->close(); std::cout << self->stream_->remotePeerId().value().toBase58() @@ -80,8 +81,7 @@ class Session : public std::enable_shared_from_this { return; } std::cout << self->stream_->remotePeerId().value().toBase58() << " < " - << std::string(buffer->begin(), - buffer->begin() + result.value()); + << qtils::byte2str(*buffer); std::cout.flush(); }); return true; diff --git a/include/libp2p/basic/write_return_size.hpp b/include/libp2p/basic/write_return_size.hpp deleted file mode 100644 index b5b4f4030..000000000 --- a/include/libp2p/basic/write_return_size.hpp +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright Quadrivium LLC - * All Rights Reserved - * SPDX-License-Identifier: Apache-2.0 - */ - -#pragma once - -#include - -namespace libp2p { - /// Write exactly `in.size()` bytes - inline void writeReturnSize(const std::shared_ptr &writer, - BytesIn in, - basic::Writer::WriteCallbackFunc cb) { - write( - writer, in, [n{in.size()}, cb{std::move(cb)}](outcome::result r) { - if (r.has_error()) { - cb(r.error()); - } else { - cb(n); - } - }); - } -} // namespace libp2p diff --git a/include/libp2p/muxer/mplex/mplexed_connection.hpp b/include/libp2p/muxer/mplex/mplexed_connection.hpp index 166af8836..85a134f1e 100644 --- a/include/libp2p/muxer/mplex/mplexed_connection.hpp +++ b/include/libp2p/muxer/mplex/mplexed_connection.hpp @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -74,7 +75,7 @@ namespace libp2p::connection { private: struct WriteData { Bytes data; - WriteCallbackFunc cb; + CbOutcomeVoid cb; }; std::queue write_queue_; bool is_writing_ = false; @@ -92,7 +93,7 @@ namespace libp2p::connection { /** * Called, when write is complete */ - void onWriteCompleted(outcome::result write_res); + void onWriteCompleted(outcome::result write_res); /** * Read next frame from the connection diff --git a/include/libp2p/protocol/ping/ping_client_session.hpp b/include/libp2p/protocol/ping/ping_client_session.hpp index 00c9e5e46..89bc0f762 100644 --- a/include/libp2p/protocol/ping/ping_client_session.hpp +++ b/include/libp2p/protocol/ping/ping_client_session.hpp @@ -46,7 +46,7 @@ namespace libp2p::protocol { private: void write(); - void writeCompleted(outcome::result r); + void writeCompleted(outcome::result r); void read(); diff --git a/include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp b/include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp index 4133332d8..37bd76b49 100644 --- a/include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp +++ b/include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp @@ -60,7 +60,7 @@ namespace libp2p::protocol_muxer::multiselect { void send(Packet packet); /// Called when write operation completes - void onDataWritten(outcome::result res); + void onDataWritten(outcome::result res); /// Closes the negotiation session with result, returns instance to owner void close(outcome::result result); diff --git a/src/basic/message_read_writer_bigendian.cpp b/src/basic/message_read_writer_bigendian.cpp index a00d48205..f0ca9e39a 100644 --- a/src/basic/message_read_writer_bigendian.cpp +++ b/src/basic/message_read_writer_bigendian.cpp @@ -12,8 +12,9 @@ #include #include #include -#include +#include #include +#include namespace libp2p::basic { MessageReadWriterBigEndian::MessageReadWriterBigEndian( @@ -57,14 +58,12 @@ namespace libp2p::basic { raw_buf.reserve(kLenMarkerSize + buffer.size()); common::putUint32BE(raw_buf, buffer.size()); raw_buf.insert(raw_buf.end(), buffer.begin(), buffer.end()); - writeReturnSize( - conn_, - raw_buf, - [self{shared_from_this()}, cb{std::move(cb)}](auto &&result) { - if (not result) { - return cb(result.error()); - } - cb(result.value() - self->kLenMarkerSize); - }); + libp2p::write(conn_, + raw_buf, + [self{shared_from_this()}, + cb{std::move(cb)}](outcome::result result) { + IF_ERROR_CB_RETURN(result); + cb(outcome::success()); + }); } } // namespace libp2p::basic diff --git a/src/basic/message_read_writer_uvarint.cpp b/src/basic/message_read_writer_uvarint.cpp index 23d687cec..5810252be 100644 --- a/src/basic/message_read_writer_uvarint.cpp +++ b/src/basic/message_read_writer_uvarint.cpp @@ -13,7 +13,8 @@ #include #include #include -#include +#include +#include #include namespace libp2p::basic { @@ -61,16 +62,13 @@ namespace libp2p::basic { std::make_move_iterator(varint_len.toVector().end())); msg_bytes->insert(msg_bytes->end(), buffer.begin(), buffer.end()); - writeReturnSize(conn_, - *msg_bytes, - [cb = std::move(cb), - varint_size = varint_len.size(), - msg_bytes](auto &&res) { - if (!res) { - return cb(res.error()); - } - // hide a written varint from the user of the method - cb(res.value() - varint_size); - }); + libp2p::write(conn_, + *msg_bytes, + [cb = std::move(cb), + varint_size = varint_len.size(), + msg_bytes](outcome::result result) { + IF_ERROR_CB_RETURN(result); + cb(outcome::success()); + }); } } // namespace libp2p::basic diff --git a/src/muxer/mplex/mplex_stream.cpp b/src/muxer/mplex/mplex_stream.cpp index 099e13671..20ab1612b 100644 --- a/src/muxer/mplex/mplex_stream.cpp +++ b/src/muxer/mplex/mplex_stream.cpp @@ -10,7 +10,7 @@ #include #include -#include +#include #include #define TRY_GET_CONNECTION(tmp) \ @@ -124,7 +124,7 @@ namespace libp2p::connection { if (not self->write_queue_.empty()) { auto [in, cb] = self->write_queue_.front(); self->write_queue_.pop_front(); - writeReturnSize(self, in, cb); + self->writeSome(in, cb); } }); } diff --git a/src/muxer/mplex/mplexed_connection.cpp b/src/muxer/mplex/mplexed_connection.cpp index 06a69fc6e..ae8bef688 100644 --- a/src/muxer/mplex/mplexed_connection.cpp +++ b/src/muxer/mplex/mplexed_connection.cpp @@ -8,7 +8,7 @@ #include #include -#include +#include #include namespace libp2p::connection { @@ -163,19 +163,20 @@ namespace libp2p::connection { is_writing_ = true; const auto &write_data = write_queue_.front(); - return writeReturnSize( - connection_, write_data.data, [self{shared_from_this()}](auto &&res) { - self->onWriteCompleted(std::forward(res)); - }); + libp2p::write(connection_, + write_data.data, + [self{shared_from_this()}](outcome::result result) { + self->onWriteCompleted(result); + }); } - void MplexedConnection::onWriteCompleted(outcome::result write_res) { + void MplexedConnection::onWriteCompleted(outcome::result write_res) { if (!write_res) { log_->error("data write failed: {}", write_res.error()); } - - write_queue_.front().cb(std::forward(write_res)); + auto item = std::exchange(write_queue_.front(), {}); write_queue_.pop(); + item.cb(write_res); doWrite(); } diff --git a/src/protocol/echo/client_echo_session.cpp b/src/protocol/echo/client_echo_session.cpp index f1b36ab8e..07a0dc726 100644 --- a/src/protocol/echo/client_echo_session.cpp +++ b/src/protocol/echo/client_echo_session.cpp @@ -8,7 +8,7 @@ #include -#include +#include #include namespace libp2p::protocol { @@ -33,7 +33,7 @@ namespace libp2p::protocol { auto self{shared_from_this()}; - writeReturnSize(stream_, buf_, [self](outcome::result rw) { + write(stream_, buf_, [self](outcome::result rw) { if (!rw && !self->ec_) { self->ec_ = rw.error(); self->completed(); diff --git a/src/protocol/gossip/impl/stream.cpp b/src/protocol/gossip/impl/stream.cpp index 6ee74e661..4cc2d91ec 100644 --- a/src/protocol/gossip/impl/stream.cpp +++ b/src/protocol/gossip/impl/stream.cpp @@ -10,7 +10,7 @@ #include #include -#include +#include #include "message_parser.hpp" #include "peer_context.hpp" @@ -158,16 +158,15 @@ namespace libp2p::protocol::gossip { TRACE("writing {} bytes to {}:{}", writing_bytes_, peer_->str, stream_id_); - BytesIn span{*buffer}; - writeReturnSize( + libp2p::write( stream_, - span, - [self_wptr = weak_from_this(), this, buffer = std::move(buffer)]( - outcome::result result) { - if (self_wptr.expired() || closed_) { + *buffer, + [weak_self{weak_from_this()}, buffer](outcome::result result) { + auto self = weak_self.lock(); + if (not self) { return; } - onMessageWritten(result); + self->onMessageWritten(result); }); if (timeout_ > std::chrono::milliseconds::zero()) { @@ -182,7 +181,10 @@ namespace libp2p::protocol::gossip { } } - void Stream::onMessageWritten(outcome::result res) { + void Stream::onMessageWritten(outcome::result res) { + if (closed_) { + return; + } if (writing_bytes_ == 0) { return; } @@ -192,12 +194,7 @@ namespace libp2p::protocol::gossip { return; } - TRACE("written {} bytes to {}:{}", res.value(), peer_->str, stream_id_); - - if (writing_bytes_ != res.value()) { - feedback_(peer_, Error::MESSAGE_WRITE_ERROR); - return; - } + TRACE("written {} bytes to {}:{}", writing_bytes_, peer_->str, stream_id_); endWrite(); diff --git a/src/protocol/gossip/impl/stream.hpp b/src/protocol/gossip/impl/stream.hpp index 12477a061..070afaefd 100644 --- a/src/protocol/gossip/impl/stream.hpp +++ b/src/protocol/gossip/impl/stream.hpp @@ -52,7 +52,7 @@ namespace libp2p::protocol::gossip { void onLengthRead(outcome::result varint); void onMessageRead(outcome::result res); void beginWrite(SharedBuffer buffer); - void onMessageWritten(outcome::result res); + void onMessageWritten(outcome::result res); void endWrite(); void asyncPostError(Error error); diff --git a/src/protocol/ping/ping_client_session.cpp b/src/protocol/ping/ping_client_session.cpp index 58c6c40bc..fbe5687f8 100644 --- a/src/protocol/ping/ping_client_session.cpp +++ b/src/protocol/ping/ping_client_session.cpp @@ -8,7 +8,7 @@ #include #include -#include +#include #include namespace libp2p::protocol { @@ -55,14 +55,14 @@ namespace libp2p::protocol { config_.timeout); write_buffer_ = rand_gen_->randomBytes(config_.message_size); - writeReturnSize(stream_, - write_buffer_, - [self{shared_from_this()}](outcome::result r) { - self->writeCompleted(r); - }); + libp2p::write(stream_, + write_buffer_, + [self{shared_from_this()}](outcome::result r) { + self->writeCompleted(r); + }); } - void PingClientSession::writeCompleted(outcome::result r) { + void PingClientSession::writeCompleted(outcome::result r) { timer_.reset(); if (r.has_error()) { // timeout passed or error happened; in any case, we cannot ping it diff --git a/src/protocol/ping/ping_server_session.cpp b/src/protocol/ping/ping_server_session.cpp index d5de0231a..361462509 100644 --- a/src/protocol/ping/ping_server_session.cpp +++ b/src/protocol/ping/ping_server_session.cpp @@ -8,7 +8,7 @@ #include #include -#include +#include #include namespace libp2p::protocol { @@ -50,7 +50,7 @@ namespace libp2p::protocol { return; } - writeReturnSize( + libp2p::write( stream_, buffer_, [self{shared_from_this()}](auto &&write_res) { if (!write_res) { return; diff --git a/src/protocol_muxer/multiselect/multiselect_instance.cpp b/src/protocol_muxer/multiselect/multiselect_instance.cpp index 701f7c093..9fd1080cb 100644 --- a/src/protocol_muxer/multiselect/multiselect_instance.cpp +++ b/src/protocol_muxer/multiselect/multiselect_instance.cpp @@ -11,7 +11,7 @@ #include #include -#include +#include #include #include #include @@ -144,23 +144,20 @@ namespace libp2p::protocol_muxer::multiselect { return; } - auto span = BytesIn(*packet); - - writeReturnSize(connection_, - span, - [wptr = weak_from_this(), - round = current_round_, - packet = std::move(packet)](outcome::result res) { - auto self = wptr.lock(); - if (self && self->current_round_ == round) { - self->onDataWritten(res); - } - }); + write(connection_, + *packet, + [wptr = weak_from_this(), round = current_round_, packet]( + outcome::result res) { + auto self = wptr.lock(); + if (self && self->current_round_ == round) { + self->onDataWritten(res); + } + }); is_writing_ = true; } - void MultiselectInstance::onDataWritten(outcome::result res) { + void MultiselectInstance::onDataWritten(outcome::result res) { is_writing_ = false; if (!res) { diff --git a/src/protocol_muxer/multiselect/simple_stream_negotiate.cpp b/src/protocol_muxer/multiselect/simple_stream_negotiate.cpp index 0345d0931..6651f334e 100644 --- a/src/protocol_muxer/multiselect/simple_stream_negotiate.cpp +++ b/src/protocol_muxer/multiselect/simple_stream_negotiate.cpp @@ -7,7 +7,7 @@ #include #include -#include +#include #include #include #include @@ -89,15 +89,11 @@ namespace libp2p::protocol_muxer::multiselect { void onPacketWritten(StreamPtr stream, Callback cb, std::shared_ptr buffers, - outcome::result res) { + outcome::result res) { if (!res) { return failed(stream, cb, res.error()); } - if (res.value() != buffers->written.size()) { - return failed(stream, cb, ProtocolMuxer::Error::INTERNAL_ERROR); - } - BytesOut span(buffers->read); span = span.first(kMaxVarintSize); @@ -131,14 +127,13 @@ namespace libp2p::protocol_muxer::multiselect { BytesIn span(buffers->written); - writeReturnSize( - stream, - span, - [stream = stream, cb = std::move(cb), buffers = std::move(buffers)]( - outcome::result res) mutable { - onPacketWritten( - std::move(stream), std::move(cb), std::move(buffers), res); - }); + write(stream, + span, + [stream = stream, cb = std::move(cb), buffers = std::move(buffers)]( + outcome::result res) mutable { + onPacketWritten( + std::move(stream), std::move(cb), std::move(buffers), res); + }); } } // namespace libp2p::protocol_muxer::multiselect diff --git a/src/security/secio/secio.cpp b/src/security/secio/secio.cpp index bfc267ef0..c803b0b54 100644 --- a/src/security/secio/secio.cpp +++ b/src/security/secio/secio.cpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include #include @@ -256,29 +256,25 @@ namespace libp2p::security { local_stretched_key, remote_stretched_key); SECIO_OUTCOME_VOID_TRY(secio_conn->init(), conn, cb) - writeReturnSize( - secio_conn, - self->remote_peer_rand_, - [self, conn, cb, secio_conn](auto &&write_res) { - SECIO_OUTCOME_TRY(written_bytes, write_res, conn, cb) - if (written_bytes != self->remote_peer_rand_.size()) { - return cb(Error::INITIAL_PACKET_VERIFICATION_FAILED); - } - const auto kToRead{self->propose_message_.rand.size()}; - auto buffer = std::make_shared(kToRead); - readReturnSize( - secio_conn, - *buffer, - [self, cb, conn, secio_conn, buffer](auto &&read_res) { - SECIO_OUTCOME_TRY(read_bytes, read_res, conn, cb) - if (read_bytes != buffer->size() - or *buffer != self->propose_message_.rand) { - return cb(Error::INITIAL_PACKET_VERIFICATION_FAILED); - } - SL_TRACE(self->log_, "connection initialized"); - cb(secio_conn); - }); - }); + write(secio_conn, + self->remote_peer_rand_, + [self, conn, cb, secio_conn](outcome::result write_res) { + SECIO_OUTCOME_VOID_TRY(write_res, conn, cb); + const auto kToRead{self->propose_message_.rand.size()}; + auto buffer = std::make_shared(kToRead); + readReturnSize( + secio_conn, + *buffer, + [self, cb, conn, secio_conn, buffer](auto &&read_res) { + SECIO_OUTCOME_TRY(read_bytes, read_res, conn, cb) + if (read_bytes != buffer->size() + or *buffer != self->propose_message_.rand) { + return cb(Error::INITIAL_PACKET_VERIFICATION_FAILED); + } + SL_TRACE(self->log_, "connection initialized"); + cb(secio_conn); + }); + }); }); } diff --git a/src/security/secio/secio_connection.cpp b/src/security/secio/secio_connection.cpp index 78f6dd633..2454a4b72 100644 --- a/src/security/secio/secio_connection.cpp +++ b/src/security/secio/secio_connection.cpp @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include #include diff --git a/test/acceptance/p2p/host/protocol/client_test_session.cpp b/test/acceptance/p2p/host/protocol/client_test_session.cpp index cb0988401..11760ccd8 100644 --- a/test/acceptance/p2p/host/protocol/client_test_session.cpp +++ b/test/acceptance/p2p/host/protocol/client_test_session.cpp @@ -8,7 +8,8 @@ #include #include #include -#include +#include +#include #include namespace libp2p::protocol { @@ -37,16 +38,13 @@ namespace libp2p::protocol { write_buf_ = random_generator_->randomBytes(buffer_size_); - writeReturnSize(stream_, - write_buf_, - [self = shared_from_this(), - cb{std::move(cb)}](outcome::result rw) mutable { - if (!rw) { - return cb(rw.error()); - } - - self->read(cb); - }); + libp2p::write(stream_, + write_buf_, + [self = shared_from_this(), + cb{std::move(cb)}](outcome::result result) mutable { + IF_ERROR_CB_RETURN(result); + self->read(cb); + }); } void ClientTestSession::read(Callback cb) { diff --git a/test/acceptance/p2p/muxer.cpp b/test/acceptance/p2p/muxer.cpp index 6782894e9..938a355f1 100644 --- a/test/acceptance/p2p/muxer.cpp +++ b/test/acceptance/p2p/muxer.cpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include #include @@ -146,15 +146,11 @@ struct Server : public std::enable_shared_from_this { // 01-echo back read data buf->resize(read); - writeReturnSize( - stream, - *buf, - [buf, read, stream, this](outcome::result rwrite) { - ASSERT_OUTCOME_SUCCESS(write, rwrite); - this->println("write ", write, " bytes"); + libp2p::write( + stream, *buf, [buf, stream, this](outcome::result rwrite) { + ASSERT_OUTCOME_SUCCESS(rwrite); + this->println("write ", buf->size(), " bytes"); this->streamWrites++; - ASSERT_EQ(write, read); - this->onStream(buf, stream); }); }); @@ -244,27 +240,26 @@ struct Client : public std::enable_shared_from_this { } auto buf = randomBuffer(); - writeReturnSize( + libp2p::write( stream, *buf, - [round, streamId, buf, stream, this](outcome::result rwrite) { - ASSERT_OUTCOME_SUCCESS(write, rwrite); - this->println(streamId, " write ", write, " bytes"); + [round, streamId, buf, stream, this](outcome::result rwrite) { + ASSERT_OUTCOME_SUCCESS(rwrite); + this->println(streamId, " write ", buf->size(), " bytes"); this->streamWrites++; auto readbuf = std::make_shared>(); - readbuf->resize(write); + readbuf->resize(buf->size()); readReturnSize(stream, *readbuf, - [round, streamId, write, buf, readbuf, stream, this]( + [round, streamId, buf, readbuf, stream, this]( outcome::result rread) { ASSERT_OUTCOME_SUCCESS(read, rread); this->println( streamId, " readSome ", read, " bytes"); this->streamReads++; - ASSERT_EQ(write, read); ASSERT_EQ(*buf, *readbuf); this->onStream(streamId, round - 1, stream); diff --git a/test/libp2p/connection/loopback_stream/loopback_stream_test.cpp b/test/libp2p/connection/loopback_stream/loopback_stream_test.cpp index fa06a0509..00adc74f8 100644 --- a/test/libp2p/connection/loopback_stream/loopback_stream_test.cpp +++ b/test/libp2p/connection/loopback_stream/loopback_stream_test.cpp @@ -7,7 +7,7 @@ #include #include #include -#include +#include #include #include #include @@ -51,10 +51,11 @@ TEST_F(LoopbackStreamTest, Basic) { std::shared_ptr stream = std::make_shared(PeerInfo{peer_id, {}}, context); bool all_executed{false}; - libp2p::writeReturnSize( - stream, kBuffer, [stream, buf = kBuffer, &all_executed](auto result) { - ASSERT_OUTCOME_SUCCESS(bytes, result); - ASSERT_EQ(bytes, kBufferSize); + libp2p::write( + stream, + kBuffer, + [stream, buf = kBuffer, &all_executed](outcome::result result) { + ASSERT_OUTCOME_SUCCESS(result); auto read_buf = std::make_shared(kBufferSize, 0); ASSERT_EQ(read_buf->size(), kBufferSize); ASSERT_NE(*read_buf, buf); diff --git a/test/libp2p/connection/security_conn/plaintext_connection_test.cpp b/test/libp2p/connection/security_conn/plaintext_connection_test.cpp index eec964198..d54694e5e 100644 --- a/test/libp2p/connection/security_conn/plaintext_connection_test.cpp +++ b/test/libp2p/connection/security_conn/plaintext_connection_test.cpp @@ -6,7 +6,7 @@ #include #include -#include +#include #include #include #include @@ -140,9 +140,8 @@ TEST_F(PlaintextConnectionTest, Write) { const int size = 100; EXPECT_CALL_WRITE(*connection_).WILL_WRITE_SIZE(size); auto buf = std::make_shared>(size, 0); - libp2p::writeReturnSize(secure_connection_, *buf, [size, buf](auto &&res) { + libp2p::write(secure_connection_, *buf, [buf](outcome::result res) { ASSERT_OUTCOME_SUCCESS(res); - ASSERT_EQ(res.value(), size); }); } diff --git a/test/libp2p/muxer/muxers_and_streams_test.cpp b/test/libp2p/muxer/muxers_and_streams_test.cpp index af2fe5fe3..e51bd7d67 100644 --- a/test/libp2p/muxer/muxers_and_streams_test.cpp +++ b/test/libp2p/muxer/muxers_and_streams_test.cpp @@ -11,7 +11,7 @@ #include #include -#include +#include #include #define TRACE_ENABLED 1 @@ -188,14 +188,15 @@ namespace libp2p::regression { stats_.put(Stats::FATAL_ERROR); return behavior_(*this); } - writeReturnSize(stream, - *write_buf_, - [wptr = weak_from_this(), buf = write_buf_](auto res) { - auto self = wptr.lock(); - if (self) { - self->onWrite(res); - } - }); + libp2p::write(stream, + *write_buf_, + [wptr = weak_from_this(), + buf = write_buf_](outcome::result res) { + auto self = wptr.lock(); + if (self) { + self->onWrite(res); + } + }); } void stop() { @@ -278,12 +279,12 @@ namespace libp2p::regression { behavior_(*this); } - void onWrite(outcome::result res) { - if (!res || res.value() != write_buf_->size()) { + void onWrite(outcome::result res) { + if (not res.has_value()) { TRACE("({}): write error", stats_.node_id); stats_.put(Stats::WRITE_FAILURE); } else { - TRACE("({}): written {} bytes", stats_.node_id, res.value()); + TRACE("({}): written {} bytes", stats_.node_id, write_buf_->size()); stats_.put(Stats::WRITE); } behavior_(*this); diff --git a/test/libp2p/transport/tcp/tcp_integration_test.cpp b/test/libp2p/transport/tcp/tcp_integration_test.cpp index b824af075..c8fcbb69e 100644 --- a/test/libp2p/transport/tcp/tcp_integration_test.cpp +++ b/test/libp2p/transport/tcp/tcp_integration_test.cpp @@ -9,7 +9,7 @@ #include #include #include -#include +#include #include #include #include @@ -149,15 +149,15 @@ TEST(TCP, SingleListenerCanAcceptManyClients) { conn->readSome(*buf, [&counter, conn, buf, context](auto &&res) { ASSERT_OUTCOME_SUCCESS(res); - libp2p::writeReturnSize( - conn, *buf, [&counter, conn, buf, context](auto &&res) { - ASSERT_OUTCOME_SUCCESS(res); - EXPECT_EQ(res.value(), buf->size()); - counter++; - if (counter >= kClients) { - context->stop(); - } - }); + libp2p::write(conn, + *buf, + [&counter, conn, buf, context](outcome::result res) { + ASSERT_OUTCOME_SUCCESS(res); + counter++; + if (counter >= kClients) { + context->stop(); + } + }); }); }); @@ -182,10 +182,11 @@ TEST(TCP, SingleListenerCanAcceptManyClients) { EXPECT_TRUE(conn->isInitiator()); - libp2p::writeReturnSize( - conn, *buf, [conn, readback, buf, context](auto &&res) { + libp2p::write( + conn, + *buf, + [conn, readback, buf, context](outcome::result res) { ASSERT_OUTCOME_SUCCESS(res); - ASSERT_EQ(res.value(), buf->size()); libp2p::readReturnSize( conn, *readback, [conn, readback, buf, context](auto &&res) { context->stop(); @@ -314,11 +315,11 @@ TEST(TCP, OneTransportServerHandlesManyClients) { conn->readSome(*buf, [&counter, conn, buf](auto &&res) { ASSERT_OUTCOME_SUCCESS(res); - libp2p::writeReturnSize(conn, *buf, [&counter, buf, conn](auto &&res) { - ASSERT_OUTCOME_SUCCESS(res); - EXPECT_EQ(res.value(), buf->size()); - counter++; - }); + libp2p::write( + conn, *buf, [&counter, buf, conn](outcome::result res) { + ASSERT_OUTCOME_SUCCESS(res); + counter++; + }); }); }); @@ -340,10 +341,9 @@ TEST(TCP, OneTransportServerHandlesManyClients) { EXPECT_TRUE(conn->isInitiator()); - libp2p::writeReturnSize( - conn, *buf, [conn, kSize, readback, buf](auto &&res) { + libp2p::write( + conn, *buf, [conn, readback, buf](outcome::result res) { ASSERT_OUTCOME_SUCCESS(res); - ASSERT_EQ(res.value(), buf->size()); libp2p::readReturnSize( conn, *readback, [conn, readback, buf](auto &&res) { ASSERT_OUTCOME_SUCCESS(res); From effd5e20db16d4f849d36b4f9c81a9c9e2910e56 Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 15 Jul 2025 11:07:40 +0500 Subject: [PATCH 3/4] message writer returns void Signed-off-by: turuslan --- include/libp2p/basic/message_read_writer.hpp | 3 +- .../basic/message_read_writer_bigendian.hpp | 2 +- .../basic/message_read_writer_uvarint.hpp | 2 +- .../basic/protobuf_message_read_writer.hpp | 2 +- .../identify/identify_msg_processor.hpp | 2 +- include/libp2p/security/noise/handshake.hpp | 3 +- include/libp2p/security/noise/insecure_rw.hpp | 2 +- src/basic/message_read_writer_bigendian.cpp | 3 +- src/basic/message_read_writer_uvarint.cpp | 3 +- .../identify/identify_msg_processor.cpp | 6 +-- src/security/noise/handshake.cpp | 42 ++++++++++--------- src/security/noise/insecure_rw.cpp | 6 +-- src/security/noise/noise_connection.cpp | 2 +- .../libp2p/basic/message_read_writer_test.cpp | 3 +- 14 files changed, 41 insertions(+), 40 deletions(-) diff --git a/include/libp2p/basic/message_read_writer.hpp b/include/libp2p/basic/message_read_writer.hpp index d78061b5e..184c8c80d 100644 --- a/include/libp2p/basic/message_read_writer.hpp +++ b/include/libp2p/basic/message_read_writer.hpp @@ -8,6 +8,7 @@ #include +#include #include namespace libp2p::basic { @@ -36,6 +37,6 @@ namespace libp2p::basic { * @param cb is called when the message is written or an error happened. * Quantity of bytes written is passed as an argument in case of success */ - virtual void write(BytesIn buffer, Writer::WriteCallbackFunc cb) = 0; + virtual void write(BytesIn buffer, CbOutcomeVoid cb) = 0; }; } // namespace libp2p::basic diff --git a/include/libp2p/basic/message_read_writer_bigendian.hpp b/include/libp2p/basic/message_read_writer_bigendian.hpp index 24ff3fdb9..e0e76c170 100644 --- a/include/libp2p/basic/message_read_writer_bigendian.hpp +++ b/include/libp2p/basic/message_read_writer_bigendian.hpp @@ -39,7 +39,7 @@ namespace libp2p::basic { * @param buffer - the message to be written * @param cb, which is called, when the message is read or error happens */ - void write(BytesIn buffer, Writer::WriteCallbackFunc cb) override; + void write(BytesIn buffer, CbOutcomeVoid cb) override; private: std::shared_ptr conn_; diff --git a/include/libp2p/basic/message_read_writer_uvarint.hpp b/include/libp2p/basic/message_read_writer_uvarint.hpp index c0934878c..5abe5733b 100644 --- a/include/libp2p/basic/message_read_writer_uvarint.hpp +++ b/include/libp2p/basic/message_read_writer_uvarint.hpp @@ -35,7 +35,7 @@ namespace libp2p::basic { * @param buffer - the message to be written * @param cb, which is called, when the message is read or error happens */ - void write(BytesIn buffer, Writer::WriteCallbackFunc cb) override; + void write(BytesIn buffer, CbOutcomeVoid cb) override; private: std::shared_ptr conn_; diff --git a/include/libp2p/basic/protobuf_message_read_writer.hpp b/include/libp2p/basic/protobuf_message_read_writer.hpp index 4d73c5609..85bb8a3a8 100644 --- a/include/libp2p/basic/protobuf_message_read_writer.hpp +++ b/include/libp2p/basic/protobuf_message_read_writer.hpp @@ -72,7 +72,7 @@ namespace libp2p::basic { typename = std::enable_if_t< std::is_default_constructible::value>> void write(const ProtoMsgType &msg, - Writer::WriteCallbackFunc cb, + CbOutcomeVoid cb, const std::shared_ptr> &bytes = nullptr) { auto msg_bytes = std::make_shared>(msg.ByteSize(), 0); diff --git a/include/libp2p/protocol/identify/identify_msg_processor.hpp b/include/libp2p/protocol/identify/identify_msg_processor.hpp index 7318d9e46..c9689b936 100644 --- a/include/libp2p/protocol/identify/identify_msg_processor.hpp +++ b/include/libp2p/protocol/identify/identify_msg_processor.hpp @@ -80,7 +80,7 @@ namespace libp2p::protocol { * @param written_bytes - how much bytes were written * @param stream with the other side */ - void identifySent(outcome::result written_bytes, + void identifySent(outcome::result written_bytes, const StreamSPtr &stream); /** diff --git a/include/libp2p/security/noise/handshake.hpp b/include/libp2p/security/noise/handshake.hpp index 849b2b5f1..81b0bdd16 100644 --- a/include/libp2p/security/noise/handshake.hpp +++ b/include/libp2p/security/noise/handshake.hpp @@ -46,8 +46,7 @@ namespace libp2p::security::noise { outcome::result> generateHandshakePayload( const DHKey &keypair); - void sendHandshakeMessage(BytesIn payload, - basic::Writer::WriteCallbackFunc cb); + void sendHandshakeMessage(BytesIn payload, CbOutcomeVoid cb); void readHandshakeMessage(basic::MessageReadWriter::ReadCallbackFunc cb); diff --git a/include/libp2p/security/noise/insecure_rw.hpp b/include/libp2p/security/noise/insecure_rw.hpp index d12129f61..ae0bc7109 100644 --- a/include/libp2p/security/noise/insecure_rw.hpp +++ b/include/libp2p/security/noise/insecure_rw.hpp @@ -38,7 +38,7 @@ namespace libp2p::security::noise { void read(ReadCallbackFunc cb) override; /// write the given bytes to the network - void write(BytesIn buffer, basic::Writer::WriteCallbackFunc cb) override; + void write(BytesIn buffer, CbOutcomeVoid cb) override; private: std::shared_ptr connection_; diff --git a/src/basic/message_read_writer_bigendian.cpp b/src/basic/message_read_writer_bigendian.cpp index f0ca9e39a..5e7d3b998 100644 --- a/src/basic/message_read_writer_bigendian.cpp +++ b/src/basic/message_read_writer_bigendian.cpp @@ -47,8 +47,7 @@ namespace libp2p::basic { }); } - void MessageReadWriterBigEndian::write(BytesIn buffer, - Writer::WriteCallbackFunc cb) { + void MessageReadWriterBigEndian::write(BytesIn buffer, CbOutcomeVoid cb) { if (buffer.empty()) { // TODO(107): Reentrancy return cb(MessageReadWriterError::BUFFER_IS_EMPTY); diff --git a/src/basic/message_read_writer_uvarint.cpp b/src/basic/message_read_writer_uvarint.cpp index 5810252be..4ff522408 100644 --- a/src/basic/message_read_writer_uvarint.cpp +++ b/src/basic/message_read_writer_uvarint.cpp @@ -51,8 +51,7 @@ namespace libp2p::basic { }); } - void MessageReadWriterUvarint::write(BytesIn buffer, - Writer::WriteCallbackFunc cb) { + void MessageReadWriterUvarint::write(BytesIn buffer, CbOutcomeVoid cb) { auto varint_len = multi::UVarint{static_cast(buffer.size())}; auto msg_bytes = std::make_shared>(); diff --git a/src/protocol/identify/identify_msg_processor.cpp b/src/protocol/identify/identify_msg_processor.cpp index b79642022..64a141ab2 100644 --- a/src/protocol/identify/identify_msg_processor.cpp +++ b/src/protocol/identify/identify_msg_processor.cpp @@ -96,13 +96,13 @@ namespace libp2p::protocol { rw->write( msg, [self{shared_from_this()}, - stream = std::move(stream)](auto &&res) mutable { - self->identifySent(std::forward(res), stream); + stream = std::move(stream)](outcome::result result) mutable { + self->identifySent(result, stream); }); } void IdentifyMessageProcessor::identifySent( - outcome::result written_bytes, const StreamSPtr &stream) { + outcome::result written_bytes, const StreamSPtr &stream) { auto [peer_id, peer_addr] = detail::getPeerIdentity(stream); if (!written_bytes) { log_->error("cannot write identify message to stream to peer {}, {}: {}", diff --git a/src/security/noise/handshake.cpp b/src/security/noise/handshake.cpp index b7f7a90b8..110ece087 100644 --- a/src/security/noise/handshake.cpp +++ b/src/security/noise/handshake.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -18,6 +19,12 @@ #include #include +#define IF_ERROR_HSCB_RETURN(result) \ + ({ \ + auto cb = [&](std::error_code ec) { self->hscb(ec); }; \ + IF_ERROR_CB_RETURN(result); \ + }) + #ifndef UNIQUE_NAME #define UNIQUE_NAME(base) base##__LINE__ #endif // UNIQUE_NAME @@ -108,18 +115,17 @@ namespace libp2p::security::noise { return noise_marshaller_->marshal(payload); } - void Handshake::sendHandshakeMessage(BytesIn payload, - basic::Writer::WriteCallbackFunc cb) { + void Handshake::sendHandshakeMessage(BytesIn payload, CbOutcomeVoid cb) { IO_OUTCOME_TRY( write_result, handshake_state_->writeMessage({}, payload), cb); auto write_cb = [self{shared_from_this()}, cb{std::move(cb)}, - wr{write_result}](outcome::result result) { - IO_OUTCOME_TRY(bytes_written, result, cb); + wr{write_result}](outcome::result result) { + IF_ERROR_CB_RETURN(result); if (wr.cs1 and wr.cs2) { self->setCipherStates(wr.cs1, wr.cs2); } - cb(bytes_written); + cb(outcome::success()); }; rw_->write(write_result.data, write_cb); } @@ -194,11 +200,9 @@ namespace libp2p::security::noise { SL_TRACE(log_, "outgoing connection. stage 0"); sendHandshakeMessage( {}, - [self{shared_from_this()}, payload{std::move(payload)}](auto result) { - IO_OUTCOME_TRY(bytes_written, result, self->hscb); - if (0 == bytes_written) { - return self->hscb(std::errc::bad_message); - } + [self{shared_from_this()}, + payload{std::move(payload)}](outcome::result result) { + IF_ERROR_HSCB_RETURN(result); // // Outgoing connection. Stage 1 // @@ -214,12 +218,12 @@ namespace libp2p::security::noise { // Outgoing connection. Stage 2 // SL_TRACE(self->log_, "outgoing connection. stage 2"); - self->sendHandshakeMessage( - payload, [self, to_write(payload.size())](auto result) { - IO_OUTCOME_TRY(bytes_written, result, self->hscb); - unused(bytes_written); - self->hscb(true); - }); + self->sendHandshakeMessage(payload, + [self, to_write(payload.size())]( + outcome::result result) { + IF_ERROR_HSCB_RETURN(result); + self->hscb(true); + }); }); }); } else { @@ -240,9 +244,9 @@ namespace libp2p::security::noise { // SL_TRACE(self->log_, "incoming connection. stage 1"); self->sendHandshakeMessage( - payload, [self, to_write(payload.size())](auto result) { - IO_OUTCOME_TRY(bytes_written, result, self->hscb); - unused(bytes_written); + payload, + [self, to_write(payload.size())](outcome::result result) { + IF_ERROR_HSCB_RETURN(result); // // Incoming connection. Stage 2 // diff --git a/src/security/noise/insecure_rw.cpp b/src/security/noise/insecure_rw.cpp index a8d612f81..70bf3ed33 100644 --- a/src/security/noise/insecure_rw.cpp +++ b/src/security/noise/insecure_rw.cpp @@ -38,8 +38,7 @@ namespace libp2p::security::noise { libp2p::read(connection_, *buffer_, std::move(read_cb)); } - void InsecureReadWriter::write(BytesIn buffer, - basic::Writer::WriteCallbackFunc cb) { + void InsecureReadWriter::write(BytesIn buffer, CbOutcomeVoid cb) { if (buffer.size() > static_cast(kMaxMsgLen)) { return cb(std::errc::message_size); } @@ -49,8 +48,9 @@ namespace libp2p::security::noise { outbuf_.insert(outbuf_.end(), buffer.begin(), buffer.end()); auto write_cb = [self{shared_from_this()}, buffer, cb{std::move(cb)}]( outcome::result result) { + std::ignore = buffer; IF_ERROR_CB_RETURN(result); - cb(buffer.size()); + cb(outcome::success()); }; libp2p::write(connection_, outbuf_, std::move(write_cb)); } diff --git a/src/security/noise/noise_connection.cpp b/src/security/noise/noise_connection.cpp index 8a2003145..ac5108fbd 100644 --- a/src/security/noise/noise_connection.cpp +++ b/src/security/noise/noise_connection.cpp @@ -79,7 +79,7 @@ namespace libp2p::connection { // `InsecureReadWriter::write` doesn't leak `BytesIn` reference framer_->write( encrypted, - [in, cb{std::move(cb)}](outcome::result result) mutable { + [in, cb{std::move(cb)}](outcome::result result) mutable { IF_ERROR_CB_RETURN(result); cb(in.size()); }); diff --git a/test/libp2p/basic/message_read_writer_test.cpp b/test/libp2p/basic/message_read_writer_test.cpp index 0a4f49344..71fdfe47f 100644 --- a/test/libp2p/basic/message_read_writer_test.cpp +++ b/test/libp2p/basic/message_read_writer_test.cpp @@ -60,9 +60,8 @@ TEST_F(MessageReadWriterTest, Read) { TEST_F(MessageReadWriterTest, Write) { EXPECT_CALL_WRITE(*conn_mock_).WILL_WRITE(msg_with_varint_bytes_); - msg_rw_->write(msg_bytes_, [this](auto &&res) { + msg_rw_->write(msg_bytes_, [this](outcome::result res) { ASSERT_TRUE(res); - ASSERT_EQ(res.value(), msg_bytes_.size()); operation_completed_ = true; }); From 59dd0eee053b470538f583fa636502a24f72b46f Mon Sep 17 00:00:00 2001 From: turuslan Date: Tue, 15 Jul 2025 11:29:10 +0500 Subject: [PATCH 4/4] remove readReturnSize Signed-off-by: turuslan --- include/libp2p/basic/read_return_size.hpp | 26 ------------- .../protocol/ping/ping_client_session.hpp | 2 +- .../multiselect/multiselect_instance.hpp | 2 +- src/basic/message_read_writer_bigendian.cpp | 38 +++++++++---------- src/basic/message_read_writer_uvarint.cpp | 18 ++++----- src/basic/varint_reader.cpp | 36 +++++++++--------- src/connection/loopback_stream.cpp | 2 - src/layer/websocket/ssl_connection.cpp | 1 - src/layer/websocket/ws_connection.cpp | 1 - src/muxer/mplex/mplex_frame.cpp | 20 +++++----- src/muxer/mplex/mplex_stream.cpp | 1 - src/muxer/mplex/mplexed_connection.cpp | 1 - src/muxer/yamux/yamux_stream.cpp | 1 - src/protocol/gossip/impl/stream.cpp | 32 ++++++++-------- src/protocol/gossip/impl/stream.hpp | 2 +- src/protocol/ping/ping_client_session.cpp | 14 +++---- src/protocol/ping/ping_server_session.cpp | 17 +++++---- .../multiselect/multiselect_instance.cpp | 32 +++++++--------- .../multiselect/simple_stream_negotiate.cpp | 18 ++++----- src/security/noise/noise_connection.cpp | 1 - .../plaintext/plaintext_connection.cpp | 1 - src/security/secio/secio.cpp | 25 ++++++------ src/security/secio/secio_connection.cpp | 2 +- src/security/tls/tls_connection.cpp | 1 - src/transport/quic/stream.cpp | 1 - src/transport/tcp/tcp_connection.cpp | 1 - .../p2p/host/protocol/client_test_session.cpp | 22 +++++------ test/acceptance/p2p/muxer.cpp | 24 ++++++------ .../loopback_stream/loopback_stream_test.cpp | 19 +++++----- .../plaintext_connection_test.cpp | 5 +-- test/libp2p/muxer/muxers_and_streams_test.cpp | 25 ++++++------ .../transport/tcp/tcp_integration_test.cpp | 21 +++++----- .../connection/capable_connection_mock.hpp | 1 - .../connection/layer_connection_mock.hpp | 1 - test/mock/libp2p/connection/stream_mock.hpp | 1 - 35 files changed, 177 insertions(+), 238 deletions(-) delete mode 100644 include/libp2p/basic/read_return_size.hpp diff --git a/include/libp2p/basic/read_return_size.hpp b/include/libp2p/basic/read_return_size.hpp deleted file mode 100644 index 423fec02c..000000000 --- a/include/libp2p/basic/read_return_size.hpp +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Copyright Quadrivium LLC - * All Rights Reserved - * SPDX-License-Identifier: Apache-2.0 - */ - -#pragma once - -#include - -namespace libp2p { - /// Read exactly `out.size()` bytes - inline void readReturnSize(const std::shared_ptr &reader, - BytesOut out, - basic::Reader::ReadCallbackFunc cb) { - read(reader, - out, - [n{out.size()}, cb{std::move(cb)}](outcome::result r) { - if (r.has_error()) { - cb(r.error()); - } else { - cb(n); - } - }); - } -} // namespace libp2p diff --git a/include/libp2p/protocol/ping/ping_client_session.hpp b/include/libp2p/protocol/ping/ping_client_session.hpp index 89bc0f762..a70d22e91 100644 --- a/include/libp2p/protocol/ping/ping_client_session.hpp +++ b/include/libp2p/protocol/ping/ping_client_session.hpp @@ -50,7 +50,7 @@ namespace libp2p::protocol { void read(); - void readCompleted(outcome::result r); + void readCompleted(outcome::result r); void close(); diff --git a/include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp b/include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp index 37bd76b49..a6017fe3d 100644 --- a/include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp +++ b/include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp @@ -69,7 +69,7 @@ namespace libp2p::protocol_muxer::multiselect { void receive(); /// Called on read operations completion - void onDataRead(outcome::result res); + void onDataRead(outcome::result res); /// Processes parsed messages, called from onDataRead MaybeResult processMessages(); diff --git a/src/basic/message_read_writer_bigendian.cpp b/src/basic/message_read_writer_bigendian.cpp index 5e7d3b998..67c95c643 100644 --- a/src/basic/message_read_writer_bigendian.cpp +++ b/src/basic/message_read_writer_bigendian.cpp @@ -11,7 +11,7 @@ #include #include #include -#include +#include #include #include #include @@ -26,25 +26,23 @@ namespace libp2p::basic { void MessageReadWriterBigEndian::read(ReadCallbackFunc cb) { auto buffer = std::make_shared>(); buffer->resize(kLenMarkerSize); - readReturnSize( - conn_, - *buffer, - [self{shared_from_this()}, buffer, cb{std::move(cb)}](auto &&result) { - if (not result) { - return cb(result.error()); - } - uint32_t msg_len = ntohl( // NOLINT - common::convert(buffer->data())); - buffer->resize(msg_len); - std::fill(buffer->begin(), buffer->end(), 0u); - readReturnSize( - self->conn_, *buffer, [self, buffer, cb](auto &&result) { - if (not result) { - return cb(result.error()); - } - cb(buffer); - }); - }); + libp2p::read(conn_, + *buffer, + [self{shared_from_this()}, buffer, cb{std::move(cb)}]( + outcome::result result) { + IF_ERROR_CB_RETURN(result); + uint32_t msg_len = ntohl( // NOLINT + common::convert(buffer->data())); + buffer->resize(msg_len); + std::fill(buffer->begin(), buffer->end(), 0u); + libp2p::read( + self->conn_, + *buffer, + [self, buffer, cb](outcome::result result) { + IF_ERROR_CB_RETURN(result); + cb(buffer); + }); + }); } void MessageReadWriterBigEndian::write(BytesIn buffer, CbOutcomeVoid cb) { diff --git a/src/basic/message_read_writer_uvarint.cpp b/src/basic/message_read_writer_uvarint.cpp index 4ff522408..58b9fcce1 100644 --- a/src/basic/message_read_writer_uvarint.cpp +++ b/src/basic/message_read_writer_uvarint.cpp @@ -11,7 +11,7 @@ #include #include #include -#include +#include #include #include #include @@ -36,15 +36,13 @@ namespace libp2p::basic { auto msg_len = varint_res.value().toUInt64(); if (0 != msg_len) { auto buffer = std::make_shared>(msg_len, 0); - readReturnSize( - self->conn_, - *buffer, - [self, buffer, cb = std::move(cb)](auto &&res) mutable { - if (!res) { - return cb(res.error()); - } - cb(std::move(buffer)); - }); + libp2p::read(self->conn_, + *buffer, + [self, buffer, cb = std::move(cb)]( + outcome::result result) mutable { + IF_ERROR_CB_RETURN(result); + cb(std::move(buffer)); + }); } else { cb(ResultType{}); } diff --git a/src/basic/varint_reader.cpp b/src/basic/varint_reader.cpp index 9ea567d0a..94a08def0 100644 --- a/src/basic/varint_reader.cpp +++ b/src/basic/varint_reader.cpp @@ -4,8 +4,9 @@ * SPDX-License-Identifier: Apache-2.0 */ -#include +#include #include +#include #include @@ -44,25 +45,22 @@ namespace libp2p::basic { return cb(Error::NO_VARINT); } - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - readReturnSize(conn, - std::span(varint_buf->data() + current_length, 1), - [c = conn, cb = std::move(cb), current_length, varint_buf]( - auto &&res) mutable { - if (not res.has_value()) { - return cb(res.error()); - } + read(conn, + BytesOut{*varint_buf}.subspan(current_length, 1), + [c = conn, cb = std::move(cb), current_length, varint_buf]( + outcome::result result) mutable { + IF_ERROR_CB_RETURN(result); - auto varint_opt = multi::UVarint::create( - std::span(varint_buf->data(), current_length + 1)); - if (varint_opt) { - return cb(*varint_opt); - } + auto varint_opt = multi::UVarint::create( + BytesIn{*varint_buf}.first(current_length + 1)); + if (varint_opt) { + return cb(*varint_opt); + } - readVarint(std::move(c), - std::move(cb), - ++current_length, - std::move(varint_buf)); - }); + readVarint(std::move(c), + std::move(cb), + current_length + 1, + std::move(varint_buf)); + }); } } // namespace libp2p::basic diff --git a/src/connection/loopback_stream.cpp b/src/connection/loopback_stream.cpp index 6039c77d2..d1e065204 100644 --- a/src/connection/loopback_stream.cpp +++ b/src/connection/loopback_stream.cpp @@ -6,8 +6,6 @@ #include -#include - namespace libp2p::connection { namespace { diff --git a/src/layer/websocket/ssl_connection.cpp b/src/layer/websocket/ssl_connection.cpp index 4c3804160..ade57ac9d 100644 --- a/src/layer/websocket/ssl_connection.cpp +++ b/src/layer/websocket/ssl_connection.cpp @@ -6,7 +6,6 @@ #include -#include #include #include diff --git a/src/layer/websocket/ws_connection.cpp b/src/layer/websocket/ws_connection.cpp index 140b12e76..39cd3e4d2 100644 --- a/src/layer/websocket/ws_connection.cpp +++ b/src/layer/websocket/ws_connection.cpp @@ -6,7 +6,6 @@ #include -#include #include #include #include diff --git a/src/muxer/mplex/mplex_frame.cpp b/src/muxer/mplex/mplex_frame.cpp index 6272bfdb9..40c5ec601 100644 --- a/src/muxer/mplex/mplex_frame.cpp +++ b/src/muxer/mplex/mplex_frame.cpp @@ -6,8 +6,9 @@ #include -#include +#include #include +#include #include #include @@ -101,16 +102,13 @@ namespace libp2p::connection { // read data std::shared_ptr data = std::make_shared(length, 0); - readReturnSize(reader, - *data, - [id_flag, data, cb{std::move(cb)}]( - auto &&read_res) mutable { - if (!read_res) { - return cb(read_res.error()); - } - - cb(createFrame(id_flag, std::move(*data))); - }); + libp2p::read(reader, + *data, + [id_flag, data, cb{std::move(cb)}]( + outcome::result result) mutable { + IF_ERROR_CB_RETURN(result); + cb(createFrame(id_flag, std::move(*data))); + }); }); }); } diff --git a/src/muxer/mplex/mplex_stream.cpp b/src/muxer/mplex/mplex_stream.cpp index 20ab1612b..ecf06cdf4 100644 --- a/src/muxer/mplex/mplex_stream.cpp +++ b/src/muxer/mplex/mplex_stream.cpp @@ -9,7 +9,6 @@ #include #include -#include #include #include diff --git a/src/muxer/mplex/mplexed_connection.cpp b/src/muxer/mplex/mplexed_connection.cpp index ae8bef688..50d8d94a7 100644 --- a/src/muxer/mplex/mplexed_connection.cpp +++ b/src/muxer/mplex/mplexed_connection.cpp @@ -7,7 +7,6 @@ #include #include -#include #include #include diff --git a/src/muxer/yamux/yamux_stream.cpp b/src/muxer/yamux/yamux_stream.cpp index e16f2037f..173e94e58 100644 --- a/src/muxer/yamux/yamux_stream.cpp +++ b/src/muxer/yamux/yamux_stream.cpp @@ -8,7 +8,6 @@ #include -#include #include #include diff --git a/src/protocol/gossip/impl/stream.cpp b/src/protocol/gossip/impl/stream.cpp index 4cc2d91ec..2b18a9f47 100644 --- a/src/protocol/gossip/impl/stream.cpp +++ b/src/protocol/gossip/impl/stream.cpp @@ -8,7 +8,7 @@ #include -#include +#include #include #include @@ -81,18 +81,18 @@ namespace libp2p::protocol::gossip { read_buffer_->resize(msg_len); - readReturnSize(stream_, - std::span(read_buffer_->data(), msg_len), - [self_wptr = weak_from_this(), this, buffer = read_buffer_]( - auto &&res) { - if (self_wptr.expired()) { - return; - } - onMessageRead(std::forward(res)); - }); + libp2p::read(stream_, + std::span(read_buffer_->data(), msg_len), + [self_wptr = weak_from_this(), this, buffer = read_buffer_]( + outcome::result res) { + if (self_wptr.expired()) { + return; + } + onMessageRead(std::forward(res)); + }); } - void Stream::onMessageRead(outcome::result res) { + void Stream::onMessageRead(outcome::result res) { if (!reading_) { return; } @@ -104,12 +104,10 @@ namespace libp2p::protocol::gossip { return; } - TRACE("read {} bytes from {}:{}", res.value(), peer_->str, stream_id_); - - if (read_buffer_->size() != res.value()) { - feedback_(peer_, Error::MESSAGE_PARSE_ERROR); - return; - } + TRACE("read {} bytes from {}:{}", + read_buffer_->size(), + peer_->str, + stream_id_); MessageParser parser; if (!parser.parse(*read_buffer_)) { diff --git a/src/protocol/gossip/impl/stream.hpp b/src/protocol/gossip/impl/stream.hpp index 070afaefd..cc5b317be 100644 --- a/src/protocol/gossip/impl/stream.hpp +++ b/src/protocol/gossip/impl/stream.hpp @@ -50,7 +50,7 @@ namespace libp2p::protocol::gossip { private: void onLengthRead(outcome::result varint); - void onMessageRead(outcome::result res); + void onMessageRead(outcome::result res); void beginWrite(SharedBuffer buffer); void onMessageWritten(outcome::result res); void endWrite(); diff --git a/src/protocol/ping/ping_client_session.cpp b/src/protocol/ping/ping_client_session.cpp index fbe5687f8..abca88479 100644 --- a/src/protocol/ping/ping_client_session.cpp +++ b/src/protocol/ping/ping_client_session.cpp @@ -7,7 +7,7 @@ #include #include -#include +#include #include #include @@ -85,14 +85,14 @@ namespace libp2p::protocol { }, config_.timeout); - readReturnSize(stream_, - read_buffer_, - [self{shared_from_this()}](outcome::result r) { - self->readCompleted(r); - }); + libp2p::read(stream_, + read_buffer_, + [self{shared_from_this()}](outcome::result r) { + self->readCompleted(r); + }); } - void PingClientSession::readCompleted(outcome::result r) { + void PingClientSession::readCompleted(outcome::result r) { timer_.reset(); if (r.has_error() || write_buffer_ != read_buffer_) { // again, in case of any error we cannot continue to ping the peer and diff --git a/src/protocol/ping/ping_server_session.cpp b/src/protocol/ping/ping_server_session.cpp index 361462509..77be9204c 100644 --- a/src/protocol/ping/ping_server_session.cpp +++ b/src/protocol/ping/ping_server_session.cpp @@ -7,7 +7,7 @@ #include #include -#include +#include #include #include @@ -32,13 +32,14 @@ namespace libp2p::protocol { return; } - readReturnSize( - stream_, buffer_, [self{shared_from_this()}](auto &&read_res) { - if (!read_res) { - return; - } - self->readCompleted(); - }); + libp2p::read(stream_, + buffer_, + [self{shared_from_this()}](outcome::result read_res) { + if (!read_res) { + return; + } + self->readCompleted(); + }); } void PingServerSession::readCompleted() { diff --git a/src/protocol_muxer/multiselect/multiselect_instance.cpp b/src/protocol_muxer/multiselect/multiselect_instance.cpp index 9fd1080cb..56e3b7acb 100644 --- a/src/protocol_muxer/multiselect/multiselect_instance.cpp +++ b/src/protocol_muxer/multiselect/multiselect_instance.cpp @@ -9,7 +9,7 @@ #include #include -#include +#include #include #include #include @@ -211,31 +211,25 @@ namespace libp2p::protocol_muxer::multiselect { BytesOut span(*read_buffer_); span = span.first(static_cast(bytes_needed)); - readReturnSize(connection_, - span, - [wptr = weak_from_this(), - round = current_round_, - packet = read_buffer_](outcome::result res) { - auto self = wptr.lock(); - if (self && self->current_round_ == round) { - self->onDataRead(res); - } - }); + libp2p::read(connection_, + span, + [wptr = weak_from_this(), + round = current_round_, + packet = read_buffer_](outcome::result res) { + auto self = wptr.lock(); + if (self && self->current_round_ == round) { + self->onDataRead(res); + } + }); } - void MultiselectInstance::onDataRead(outcome::result res) { + void MultiselectInstance::onDataRead(outcome::result res) { if (!res) { return close(res.error()); } - auto bytes_read = res.value(); - if (bytes_read > read_buffer_->size()) { - log()->error("onDataRead(): invalid state"); - return close(ProtocolMuxer::Error::INTERNAL_ERROR); - } - BytesIn span(*read_buffer_); - span = span.first(static_cast(bytes_read)); + span = span.first(static_cast(parser_.bytesNeeded())); boost::optional> got_result; diff --git a/src/protocol_muxer/multiselect/simple_stream_negotiate.cpp b/src/protocol_muxer/multiselect/simple_stream_negotiate.cpp index 6651f334e..fc964406a 100644 --- a/src/protocol_muxer/multiselect/simple_stream_negotiate.cpp +++ b/src/protocol_muxer/multiselect/simple_stream_negotiate.cpp @@ -6,7 +6,7 @@ #include -#include +#include #include #include #include @@ -43,7 +43,7 @@ namespace libp2p::protocol_muxer::multiselect { void onLastBytesRead(StreamPtr stream, const Callback &cb, const Buffers &buffers, - outcome::result res) { + outcome::result res) { if (!res) { return failed(stream, cb, res.error()); } @@ -53,15 +53,11 @@ namespace libp2p::protocol_muxer::multiselect { void onFirstBytesRead(StreamPtr stream, Callback cb, std::shared_ptr buffers, - outcome::result res) { + outcome::result res) { if (!res) { return failed(stream, cb, res.error()); } - if (res.value() != kMaxVarintSize) { - return failed(stream, cb, ProtocolMuxer::Error::INTERNAL_ERROR); - } - auto total_sz = buffers->written.size(); if (total_sz == kMaxVarintSize) { // protocol_id consists of 1 byte, not standard but possible @@ -77,11 +73,11 @@ namespace libp2p::protocol_muxer::multiselect { // NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions) span = span.subspan(kMaxVarintSize, remaining_bytes); - readReturnSize( + libp2p::read( stream, span, [stream = stream, cb = std::move(cb), buffers = std::move(buffers)]( - outcome::result res) mutable { + outcome::result res) mutable { onLastBytesRead(std::move(stream), cb, *buffers, res); }); } @@ -97,11 +93,11 @@ namespace libp2p::protocol_muxer::multiselect { BytesOut span(buffers->read); span = span.first(kMaxVarintSize); - readReturnSize( + libp2p::read( stream, span, [stream = stream, cb = std::move(cb), buffers = std::move(buffers)]( - outcome::result res) mutable { + outcome::result res) mutable { onFirstBytesRead(stream, std::move(cb), std::move(buffers), res); }); } diff --git a/src/security/noise/noise_connection.cpp b/src/security/noise/noise_connection.cpp index ac5108fbd..3b740294c 100644 --- a/src/security/noise/noise_connection.cpp +++ b/src/security/noise/noise_connection.cpp @@ -6,7 +6,6 @@ #include -#include #include #include #include diff --git a/src/security/plaintext/plaintext_connection.cpp b/src/security/plaintext/plaintext_connection.cpp index 3914dbbf4..97cab696b 100644 --- a/src/security/plaintext/plaintext_connection.cpp +++ b/src/security/plaintext/plaintext_connection.cpp @@ -7,7 +7,6 @@ #include #include -#include #include namespace libp2p::connection { diff --git a/src/security/secio/secio.cpp b/src/security/secio/secio.cpp index c803b0b54..4fa75da7d 100644 --- a/src/security/secio/secio.cpp +++ b/src/security/secio/secio.cpp @@ -8,7 +8,7 @@ #include #include -#include +#include #include #include #include @@ -262,18 +262,17 @@ namespace libp2p::security { SECIO_OUTCOME_VOID_TRY(write_res, conn, cb); const auto kToRead{self->propose_message_.rand.size()}; auto buffer = std::make_shared(kToRead); - readReturnSize( - secio_conn, - *buffer, - [self, cb, conn, secio_conn, buffer](auto &&read_res) { - SECIO_OUTCOME_TRY(read_bytes, read_res, conn, cb) - if (read_bytes != buffer->size() - or *buffer != self->propose_message_.rand) { - return cb(Error::INITIAL_PACKET_VERIFICATION_FAILED); - } - SL_TRACE(self->log_, "connection initialized"); - cb(secio_conn); - }); + read(secio_conn, + *buffer, + [self, cb, conn, secio_conn, buffer]( + outcome::result read_res) { + SECIO_OUTCOME_VOID_TRY(read_res, conn, cb) + if (*buffer != self->propose_message_.rand) { + return cb(Error::INITIAL_PACKET_VERIFICATION_FAILED); + } + SL_TRACE(self->log_, "connection initialized"); + cb(secio_conn); + }); }); }); } diff --git a/src/security/secio/secio_connection.cpp b/src/security/secio/secio_connection.cpp index 2454a4b72..c4f09e558 100644 --- a/src/security/secio/secio_connection.cpp +++ b/src/security/secio/secio_connection.cpp @@ -9,7 +9,7 @@ #include #include -#include +#include #include #include #include diff --git a/src/security/tls/tls_connection.cpp b/src/security/tls/tls_connection.cpp index bc6bce743..b0f22d9da 100644 --- a/src/security/tls/tls_connection.cpp +++ b/src/security/tls/tls_connection.cpp @@ -6,7 +6,6 @@ #include "tls_connection.hpp" -#include #include #include diff --git a/src/transport/quic/stream.cpp b/src/transport/quic/stream.cpp index 27435958f..1adb3f527 100644 --- a/src/transport/quic/stream.cpp +++ b/src/transport/quic/stream.cpp @@ -5,7 +5,6 @@ */ #include -#include #include #include #include diff --git a/src/transport/tcp/tcp_connection.cpp b/src/transport/tcp/tcp_connection.cpp index af531a896..890c935f7 100644 --- a/src/transport/tcp/tcp_connection.cpp +++ b/src/transport/tcp/tcp_connection.cpp @@ -6,7 +6,6 @@ #include -#include #include #include #include diff --git a/test/acceptance/p2p/host/protocol/client_test_session.cpp b/test/acceptance/p2p/host/protocol/client_test_session.cpp index 11760ccd8..487c23846 100644 --- a/test/acceptance/p2p/host/protocol/client_test_session.cpp +++ b/test/acceptance/p2p/host/protocol/client_test_session.cpp @@ -7,7 +7,7 @@ #include #include -#include +#include #include #include #include @@ -52,16 +52,16 @@ namespace libp2p::protocol { read_buf_ = std::vector(buffer_size_); - readReturnSize(stream_, - read_buf_, - [self = shared_from_this(), - cb{std::move(cb)}](outcome::result rr) mutable { - if (!rr) { - return cb(rr.error()); - } - cb(std::move(self->read_buf_)); - return self->write(std::move(cb)); - }); + libp2p::read(stream_, + read_buf_, + [self = shared_from_this(), + cb{std::move(cb)}](outcome::result rr) mutable { + if (!rr) { + return cb(rr.error()); + } + cb(std::move(self->read_buf_)); + return self->write(std::move(cb)); + }); } } // namespace libp2p::protocol diff --git a/test/acceptance/p2p/muxer.cpp b/test/acceptance/p2p/muxer.cpp index 938a355f1..c5f2d59d7 100644 --- a/test/acceptance/p2p/muxer.cpp +++ b/test/acceptance/p2p/muxer.cpp @@ -6,7 +6,7 @@ #include #include -#include +#include #include #include #include @@ -251,19 +251,19 @@ struct Client : public std::enable_shared_from_this { auto readbuf = std::make_shared>(); readbuf->resize(buf->size()); - readReturnSize(stream, - *readbuf, - [round, streamId, buf, readbuf, stream, this]( - outcome::result rread) { - ASSERT_OUTCOME_SUCCESS(read, rread); - this->println( - streamId, " readSome ", read, " bytes"); - this->streamReads++; + libp2p::read(stream, + *readbuf, + [round, streamId, buf, readbuf, stream, this]( + outcome::result rread) { + ASSERT_OUTCOME_SUCCESS(rread); + this->println( + streamId, " readSome ", buf->size(), " bytes"); + this->streamReads++; - ASSERT_EQ(*buf, *readbuf); + ASSERT_EQ(*buf, *readbuf); - this->onStream(streamId, round - 1, stream); - }); + this->onStream(streamId, round - 1, stream); + }); }); } diff --git a/test/libp2p/connection/loopback_stream/loopback_stream_test.cpp b/test/libp2p/connection/loopback_stream/loopback_stream_test.cpp index 00adc74f8..2d560ebf7 100644 --- a/test/libp2p/connection/loopback_stream/loopback_stream_test.cpp +++ b/test/libp2p/connection/loopback_stream/loopback_stream_test.cpp @@ -6,7 +6,7 @@ #include #include -#include +#include #include #include #include @@ -59,15 +59,14 @@ TEST_F(LoopbackStreamTest, Basic) { auto read_buf = std::make_shared(kBufferSize, 0); ASSERT_EQ(read_buf->size(), kBufferSize); ASSERT_NE(*read_buf, buf); - libp2p::readReturnSize( - stream, - *read_buf, - [buf = read_buf, source_buf = buf, &all_executed](auto result) { - ASSERT_OUTCOME_SUCCESS(bytes, result); - ASSERT_EQ(bytes, kBufferSize); - ASSERT_EQ(*buf, source_buf); - all_executed = true; - }); + libp2p::read(stream, + *read_buf, + [buf = read_buf, source_buf = buf, &all_executed]( + outcome::result result) { + ASSERT_OUTCOME_SUCCESS(result); + ASSERT_EQ(*buf, source_buf); + all_executed = true; + }); }); context->run(); ASSERT_TRUE(all_executed); diff --git a/test/libp2p/connection/security_conn/plaintext_connection_test.cpp b/test/libp2p/connection/security_conn/plaintext_connection_test.cpp index d54694e5e..b90ec8ceb 100644 --- a/test/libp2p/connection/security_conn/plaintext_connection_test.cpp +++ b/test/libp2p/connection/security_conn/plaintext_connection_test.cpp @@ -5,7 +5,7 @@ */ #include -#include +#include #include #include #include @@ -125,9 +125,8 @@ TEST_F(PlaintextConnectionTest, Read) { const int size = 100; EXPECT_CALL_READ(*connection_).WILL_READ_SIZE(size); auto buf = std::make_shared>(size, 0); - libp2p::readReturnSize(secure_connection_, *buf, [size, buf](auto &&res) { + libp2p::read(secure_connection_, *buf, [buf](outcome::result res) { ASSERT_OUTCOME_SUCCESS(res); - ASSERT_EQ(res.value(), size); }); } diff --git a/test/libp2p/muxer/muxers_and_streams_test.cpp b/test/libp2p/muxer/muxers_and_streams_test.cpp index e51bd7d67..390ca3585 100644 --- a/test/libp2p/muxer/muxers_and_streams_test.cpp +++ b/test/libp2p/muxer/muxers_and_streams_test.cpp @@ -10,7 +10,7 @@ #include #include -#include +#include #include #include @@ -171,14 +171,15 @@ namespace libp2p::regression { stats_.put(Stats::FATAL_ERROR); return behavior_(*this); } - readReturnSize(stream, - *read_buf_, - [wptr = weak_from_this(), buf = read_buf_](auto res) { - auto self = wptr.lock(); - if (self) { - self->onRead(res); - } - }); + libp2p::read(stream, + *read_buf_, + [wptr = weak_from_this(), + buf = read_buf_](outcome::result res) { + auto self = wptr.lock(); + if (self) { + self->onRead(res); + } + }); } void write(WhatStream what_stream = ANY_STREAM) { @@ -268,12 +269,12 @@ namespace libp2p::regression { behavior_(*this); } - void onRead(outcome::result res) { - if (!res || res.value() != read_buf_->size()) { + void onRead(outcome::result res) { + if (not res.has_value()) { TRACE("({}): read error", stats_.node_id); stats_.put(Stats::READ_FAILURE); } else { - TRACE("({}): read {} bytes", stats_.node_id, res.value()); + TRACE("({}): read {} bytes", stats_.node_id, read_buf_->size()); stats_.put(Stats::READ); } behavior_(*this); diff --git a/test/libp2p/transport/tcp/tcp_integration_test.cpp b/test/libp2p/transport/tcp/tcp_integration_test.cpp index c8fcbb69e..87758ae93 100644 --- a/test/libp2p/transport/tcp/tcp_integration_test.cpp +++ b/test/libp2p/transport/tcp/tcp_integration_test.cpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include #include #include #include @@ -187,11 +187,12 @@ TEST(TCP, SingleListenerCanAcceptManyClients) { *buf, [conn, readback, buf, context](outcome::result res) { ASSERT_OUTCOME_SUCCESS(res); - libp2p::readReturnSize( - conn, *readback, [conn, readback, buf, context](auto &&res) { + libp2p::read( + conn, + *readback, + [conn, readback, buf, context](outcome::result res) { context->stop(); ASSERT_OUTCOME_SUCCESS(res); - ASSERT_EQ(res.value(), readback->size()); ASSERT_EQ(*buf, *readback); }); }); @@ -344,12 +345,12 @@ TEST(TCP, OneTransportServerHandlesManyClients) { libp2p::write( conn, *buf, [conn, readback, buf](outcome::result res) { ASSERT_OUTCOME_SUCCESS(res); - libp2p::readReturnSize( - conn, *readback, [conn, readback, buf](auto &&res) { - ASSERT_OUTCOME_SUCCESS(res); - ASSERT_EQ(res.value(), readback->size()); - ASSERT_EQ(*buf, *readback); - }); + libp2p::read(conn, + *readback, + [conn, readback, buf](outcome::result res) { + ASSERT_OUTCOME_SUCCESS(res); + ASSERT_EQ(*buf, *readback); + }); }); }); diff --git a/test/mock/libp2p/connection/capable_connection_mock.hpp b/test/mock/libp2p/connection/capable_connection_mock.hpp index 8b305d26a..bb25cd3a1 100644 --- a/test/mock/libp2p/connection/capable_connection_mock.hpp +++ b/test/mock/libp2p/connection/capable_connection_mock.hpp @@ -8,7 +8,6 @@ #include -#include #include namespace libp2p::connection { diff --git a/test/mock/libp2p/connection/layer_connection_mock.hpp b/test/mock/libp2p/connection/layer_connection_mock.hpp index 741a9d89d..6009ff46e 100644 --- a/test/mock/libp2p/connection/layer_connection_mock.hpp +++ b/test/mock/libp2p/connection/layer_connection_mock.hpp @@ -6,7 +6,6 @@ #pragma once -#include #include #include diff --git a/test/mock/libp2p/connection/stream_mock.hpp b/test/mock/libp2p/connection/stream_mock.hpp index 0dcd61308..50cb42941 100644 --- a/test/mock/libp2p/connection/stream_mock.hpp +++ b/test/mock/libp2p/connection/stream_mock.hpp @@ -6,7 +6,6 @@ #pragma once -#include #include #include