diff --git a/example/02-kademlia/rendezvous_chat.cpp b/example/02-kademlia/rendezvous_chat.cpp index ae00d5f15..581d42e97 100644 --- a/example/02-kademlia/rendezvous_chat.cpp +++ b/example/02-kademlia/rendezvous_chat.cpp @@ -11,12 +11,13 @@ #include -#include +#include #include #include #include #include #include +#include using libp2p::common::operator""_unhex; @@ -69,10 +70,10 @@ class Session : public std::enable_shared_from_this { return false; } - libp2p::writeReturnSize( + libp2p::write( stream_, *buffer, - [self = shared_from_this(), buffer](outcome::result result) { + [self = shared_from_this(), buffer](outcome::result result) { if (not result) { self->close(); std::cout << self->stream_->remotePeerId().value().toBase58() @@ -80,8 +81,7 @@ class Session : public std::enable_shared_from_this { return; } std::cout << self->stream_->remotePeerId().value().toBase58() << " < " - << std::string(buffer->begin(), - buffer->begin() + result.value()); + << qtils::byte2str(*buffer); std::cout.flush(); }); return true; diff --git a/include/libp2p/basic/message_read_writer.hpp b/include/libp2p/basic/message_read_writer.hpp index d78061b5e..184c8c80d 100644 --- a/include/libp2p/basic/message_read_writer.hpp +++ b/include/libp2p/basic/message_read_writer.hpp @@ -8,6 +8,7 @@ #include +#include #include namespace libp2p::basic { @@ -36,6 +37,6 @@ namespace libp2p::basic { * @param cb is called when the message is written or an error happened. * Quantity of bytes written is passed as an argument in case of success */ - virtual void write(BytesIn buffer, Writer::WriteCallbackFunc cb) = 0; + virtual void write(BytesIn buffer, CbOutcomeVoid cb) = 0; }; } // namespace libp2p::basic diff --git a/include/libp2p/basic/message_read_writer_bigendian.hpp b/include/libp2p/basic/message_read_writer_bigendian.hpp index 24ff3fdb9..e0e76c170 100644 --- a/include/libp2p/basic/message_read_writer_bigendian.hpp +++ b/include/libp2p/basic/message_read_writer_bigendian.hpp @@ -39,7 +39,7 @@ namespace libp2p::basic { * @param buffer - the message to be written * @param cb, which is called, when the message is read or error happens */ - void write(BytesIn buffer, Writer::WriteCallbackFunc cb) override; + void write(BytesIn buffer, CbOutcomeVoid cb) override; private: std::shared_ptr conn_; diff --git a/include/libp2p/basic/message_read_writer_uvarint.hpp b/include/libp2p/basic/message_read_writer_uvarint.hpp index c0934878c..5abe5733b 100644 --- a/include/libp2p/basic/message_read_writer_uvarint.hpp +++ b/include/libp2p/basic/message_read_writer_uvarint.hpp @@ -35,7 +35,7 @@ namespace libp2p::basic { * @param buffer - the message to be written * @param cb, which is called, when the message is read or error happens */ - void write(BytesIn buffer, Writer::WriteCallbackFunc cb) override; + void write(BytesIn buffer, CbOutcomeVoid cb) override; private: std::shared_ptr conn_; diff --git a/include/libp2p/basic/protobuf_message_read_writer.hpp b/include/libp2p/basic/protobuf_message_read_writer.hpp index 4d73c5609..85bb8a3a8 100644 --- a/include/libp2p/basic/protobuf_message_read_writer.hpp +++ b/include/libp2p/basic/protobuf_message_read_writer.hpp @@ -72,7 +72,7 @@ namespace libp2p::basic { typename = std::enable_if_t< std::is_default_constructible::value>> void write(const ProtoMsgType &msg, - Writer::WriteCallbackFunc cb, + CbOutcomeVoid cb, const std::shared_ptr> &bytes = nullptr) { auto msg_bytes = std::make_shared>(msg.ByteSize(), 0); diff --git a/include/libp2p/basic/read_return_size.hpp b/include/libp2p/basic/read_return_size.hpp deleted file mode 100644 index 423fec02c..000000000 --- a/include/libp2p/basic/read_return_size.hpp +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Copyright Quadrivium LLC - * All Rights Reserved - * SPDX-License-Identifier: Apache-2.0 - */ - -#pragma once - -#include - -namespace libp2p { - /// Read exactly `out.size()` bytes - inline void readReturnSize(const std::shared_ptr &reader, - BytesOut out, - basic::Reader::ReadCallbackFunc cb) { - read(reader, - out, - [n{out.size()}, cb{std::move(cb)}](outcome::result r) { - if (r.has_error()) { - cb(r.error()); - } else { - cb(n); - } - }); - } -} // namespace libp2p diff --git a/include/libp2p/basic/write_return_size.hpp b/include/libp2p/basic/write_return_size.hpp deleted file mode 100644 index b5b4f4030..000000000 --- a/include/libp2p/basic/write_return_size.hpp +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright Quadrivium LLC - * All Rights Reserved - * SPDX-License-Identifier: Apache-2.0 - */ - -#pragma once - -#include - -namespace libp2p { - /// Write exactly `in.size()` bytes - inline void writeReturnSize(const std::shared_ptr &writer, - BytesIn in, - basic::Writer::WriteCallbackFunc cb) { - write( - writer, in, [n{in.size()}, cb{std::move(cb)}](outcome::result r) { - if (r.has_error()) { - cb(r.error()); - } else { - cb(n); - } - }); - } -} // namespace libp2p diff --git a/include/libp2p/connection/as_asio_read_write.hpp b/include/libp2p/connection/as_asio_read_write.hpp index 3025e4afe..66d26de45 100644 --- a/include/libp2p/connection/as_asio_read_write.hpp +++ b/include/libp2p/connection/as_asio_read_write.hpp @@ -67,8 +67,7 @@ namespace libp2p { boost::asio::detail::buffer_sequence_adapter< boost::asio::mutable_buffer, MutableBufferSequence>::first(buffers)}; - impl->readSome( - asioBuffer(buffer), wrapCb(std::forward(cb))); + impl->readSome(asioBuffer(buffer), wrapCb(std::forward(cb))); } template @@ -77,8 +76,7 @@ namespace libp2p { boost::asio::detail::buffer_sequence_adapter< boost::asio::const_buffer, ConstBufferSequence>::first(buffers)}; - impl->writeSome( - asioBuffer(buffer), wrapCb(std::forward(cb))); + impl->writeSome(asioBuffer(buffer), wrapCb(std::forward(cb))); } std::shared_ptr io; diff --git a/include/libp2p/muxer/mplex/mplexed_connection.hpp b/include/libp2p/muxer/mplex/mplexed_connection.hpp index 166af8836..85a134f1e 100644 --- a/include/libp2p/muxer/mplex/mplexed_connection.hpp +++ b/include/libp2p/muxer/mplex/mplexed_connection.hpp @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -74,7 +75,7 @@ namespace libp2p::connection { private: struct WriteData { Bytes data; - WriteCallbackFunc cb; + CbOutcomeVoid cb; }; std::queue write_queue_; bool is_writing_ = false; @@ -92,7 +93,7 @@ namespace libp2p::connection { /** * Called, when write is complete */ - void onWriteCompleted(outcome::result write_res); + void onWriteCompleted(outcome::result write_res); /** * Read next frame from the connection diff --git a/include/libp2p/network/impl/listener_manager_impl.hpp b/include/libp2p/network/impl/listener_manager_impl.hpp index 08582ba73..fa9016868 100644 --- a/include/libp2p/network/impl/listener_manager_impl.hpp +++ b/include/libp2p/network/impl/listener_manager_impl.hpp @@ -52,9 +52,9 @@ namespace libp2p::network { private: bool started = false; - // clang-format off - std::unordered_map> listeners_; - // clang-format on + std::unordered_map> + listeners_; std::shared_ptr multiselect_; std::shared_ptr router_; diff --git a/include/libp2p/protocol/identify/identify_msg_processor.hpp b/include/libp2p/protocol/identify/identify_msg_processor.hpp index 7318d9e46..c9689b936 100644 --- a/include/libp2p/protocol/identify/identify_msg_processor.hpp +++ b/include/libp2p/protocol/identify/identify_msg_processor.hpp @@ -80,7 +80,7 @@ namespace libp2p::protocol { * @param written_bytes - how much bytes were written * @param stream with the other side */ - void identifySent(outcome::result written_bytes, + void identifySent(outcome::result written_bytes, const StreamSPtr &stream); /** diff --git a/include/libp2p/protocol/ping/ping_client_session.hpp b/include/libp2p/protocol/ping/ping_client_session.hpp index 00c9e5e46..a70d22e91 100644 --- a/include/libp2p/protocol/ping/ping_client_session.hpp +++ b/include/libp2p/protocol/ping/ping_client_session.hpp @@ -46,11 +46,11 @@ namespace libp2p::protocol { private: void write(); - void writeCompleted(outcome::result r); + void writeCompleted(outcome::result r); void read(); - void readCompleted(outcome::result r); + void readCompleted(outcome::result r); void close(); diff --git a/include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp b/include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp index 4133332d8..a6017fe3d 100644 --- a/include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp +++ b/include/libp2p/protocol_muxer/multiselect/multiselect_instance.hpp @@ -60,7 +60,7 @@ namespace libp2p::protocol_muxer::multiselect { void send(Packet packet); /// Called when write operation completes - void onDataWritten(outcome::result res); + void onDataWritten(outcome::result res); /// Closes the negotiation session with result, returns instance to owner void close(outcome::result result); @@ -69,7 +69,7 @@ namespace libp2p::protocol_muxer::multiselect { void receive(); /// Called on read operations completion - void onDataRead(outcome::result res); + void onDataRead(outcome::result res); /// Processes parsed messages, called from onDataRead MaybeResult processMessages(); diff --git a/include/libp2p/security/noise/handshake.hpp b/include/libp2p/security/noise/handshake.hpp index 849b2b5f1..81b0bdd16 100644 --- a/include/libp2p/security/noise/handshake.hpp +++ b/include/libp2p/security/noise/handshake.hpp @@ -46,8 +46,7 @@ namespace libp2p::security::noise { outcome::result> generateHandshakePayload( const DHKey &keypair); - void sendHandshakeMessage(BytesIn payload, - basic::Writer::WriteCallbackFunc cb); + void sendHandshakeMessage(BytesIn payload, CbOutcomeVoid cb); void readHandshakeMessage(basic::MessageReadWriter::ReadCallbackFunc cb); diff --git a/include/libp2p/security/noise/insecure_rw.hpp b/include/libp2p/security/noise/insecure_rw.hpp index d12129f61..ae0bc7109 100644 --- a/include/libp2p/security/noise/insecure_rw.hpp +++ b/include/libp2p/security/noise/insecure_rw.hpp @@ -38,7 +38,7 @@ namespace libp2p::security::noise { void read(ReadCallbackFunc cb) override; /// write the given bytes to the network - void write(BytesIn buffer, basic::Writer::WriteCallbackFunc cb) override; + void write(BytesIn buffer, CbOutcomeVoid cb) override; private: std::shared_ptr connection_; diff --git a/src/basic/message_read_writer_bigendian.cpp b/src/basic/message_read_writer_bigendian.cpp index a00d48205..67c95c643 100644 --- a/src/basic/message_read_writer_bigendian.cpp +++ b/src/basic/message_read_writer_bigendian.cpp @@ -11,9 +11,10 @@ #include #include #include -#include -#include +#include +#include #include +#include namespace libp2p::basic { MessageReadWriterBigEndian::MessageReadWriterBigEndian( @@ -25,29 +26,26 @@ namespace libp2p::basic { void MessageReadWriterBigEndian::read(ReadCallbackFunc cb) { auto buffer = std::make_shared>(); buffer->resize(kLenMarkerSize); - readReturnSize( - conn_, - *buffer, - [self{shared_from_this()}, buffer, cb{std::move(cb)}](auto &&result) { - if (not result) { - return cb(result.error()); - } - uint32_t msg_len = ntohl( // NOLINT - common::convert(buffer->data())); - buffer->resize(msg_len); - std::fill(buffer->begin(), buffer->end(), 0u); - readReturnSize( - self->conn_, *buffer, [self, buffer, cb](auto &&result) { - if (not result) { - return cb(result.error()); - } - cb(buffer); - }); - }); + libp2p::read(conn_, + *buffer, + [self{shared_from_this()}, buffer, cb{std::move(cb)}]( + outcome::result result) { + IF_ERROR_CB_RETURN(result); + uint32_t msg_len = ntohl( // NOLINT + common::convert(buffer->data())); + buffer->resize(msg_len); + std::fill(buffer->begin(), buffer->end(), 0u); + libp2p::read( + self->conn_, + *buffer, + [self, buffer, cb](outcome::result result) { + IF_ERROR_CB_RETURN(result); + cb(buffer); + }); + }); } - void MessageReadWriterBigEndian::write(BytesIn buffer, - Writer::WriteCallbackFunc cb) { + void MessageReadWriterBigEndian::write(BytesIn buffer, CbOutcomeVoid cb) { if (buffer.empty()) { // TODO(107): Reentrancy return cb(MessageReadWriterError::BUFFER_IS_EMPTY); @@ -57,14 +55,12 @@ namespace libp2p::basic { raw_buf.reserve(kLenMarkerSize + buffer.size()); common::putUint32BE(raw_buf, buffer.size()); raw_buf.insert(raw_buf.end(), buffer.begin(), buffer.end()); - writeReturnSize( - conn_, - raw_buf, - [self{shared_from_this()}, cb{std::move(cb)}](auto &&result) { - if (not result) { - return cb(result.error()); - } - cb(result.value() - self->kLenMarkerSize); - }); + libp2p::write(conn_, + raw_buf, + [self{shared_from_this()}, + cb{std::move(cb)}](outcome::result result) { + IF_ERROR_CB_RETURN(result); + cb(outcome::success()); + }); } } // namespace libp2p::basic diff --git a/src/basic/message_read_writer_uvarint.cpp b/src/basic/message_read_writer_uvarint.cpp index 23d687cec..58b9fcce1 100644 --- a/src/basic/message_read_writer_uvarint.cpp +++ b/src/basic/message_read_writer_uvarint.cpp @@ -11,9 +11,10 @@ #include #include #include -#include +#include #include -#include +#include +#include #include namespace libp2p::basic { @@ -35,23 +36,20 @@ namespace libp2p::basic { auto msg_len = varint_res.value().toUInt64(); if (0 != msg_len) { auto buffer = std::make_shared>(msg_len, 0); - readReturnSize( - self->conn_, - *buffer, - [self, buffer, cb = std::move(cb)](auto &&res) mutable { - if (!res) { - return cb(res.error()); - } - cb(std::move(buffer)); - }); + libp2p::read(self->conn_, + *buffer, + [self, buffer, cb = std::move(cb)]( + outcome::result result) mutable { + IF_ERROR_CB_RETURN(result); + cb(std::move(buffer)); + }); } else { cb(ResultType{}); } }); } - void MessageReadWriterUvarint::write(BytesIn buffer, - Writer::WriteCallbackFunc cb) { + void MessageReadWriterUvarint::write(BytesIn buffer, CbOutcomeVoid cb) { auto varint_len = multi::UVarint{static_cast(buffer.size())}; auto msg_bytes = std::make_shared>(); @@ -61,16 +59,13 @@ namespace libp2p::basic { std::make_move_iterator(varint_len.toVector().end())); msg_bytes->insert(msg_bytes->end(), buffer.begin(), buffer.end()); - writeReturnSize(conn_, - *msg_bytes, - [cb = std::move(cb), - varint_size = varint_len.size(), - msg_bytes](auto &&res) { - if (!res) { - return cb(res.error()); - } - // hide a written varint from the user of the method - cb(res.value() - varint_size); - }); + libp2p::write(conn_, + *msg_bytes, + [cb = std::move(cb), + varint_size = varint_len.size(), + msg_bytes](outcome::result result) { + IF_ERROR_CB_RETURN(result); + cb(outcome::success()); + }); } } // namespace libp2p::basic diff --git a/src/basic/varint_reader.cpp b/src/basic/varint_reader.cpp index 55439aa20..94a08def0 100644 --- a/src/basic/varint_reader.cpp +++ b/src/basic/varint_reader.cpp @@ -4,8 +4,9 @@ * SPDX-License-Identifier: Apache-2.0 */ +#include #include -#include +#include #include @@ -44,26 +45,22 @@ namespace libp2p::basic { return cb(Error::NO_VARINT); } - // NOLINTNEXTLINE(cppcoreguidelines-pro-bounds-pointer-arithmetic) - readReturnSize( - conn, - std::span(varint_buf->data() + current_length, 1), - [c = conn, cb = std::move(cb), current_length, varint_buf]( - auto &&res) mutable { - if (not res.has_value()) { - return cb(res.error()); - } + read(conn, + BytesOut{*varint_buf}.subspan(current_length, 1), + [c = conn, cb = std::move(cb), current_length, varint_buf]( + outcome::result result) mutable { + IF_ERROR_CB_RETURN(result); - auto varint_opt = multi::UVarint::create( - std::span(varint_buf->data(), current_length + 1)); - if (varint_opt) { - return cb(*varint_opt); - } + auto varint_opt = multi::UVarint::create( + BytesIn{*varint_buf}.first(current_length + 1)); + if (varint_opt) { + return cb(*varint_opt); + } - readVarint(std::move(c), - std::move(cb), - ++current_length, - std::move(varint_buf)); - }); + readVarint(std::move(c), + std::move(cb), + current_length + 1, + std::move(varint_buf)); + }); } } // namespace libp2p::basic diff --git a/src/connection/loopback_stream.cpp b/src/connection/loopback_stream.cpp index 2b3d52b1c..d1e065204 100644 --- a/src/connection/loopback_stream.cpp +++ b/src/connection/loopback_stream.cpp @@ -6,8 +6,6 @@ #include -#include - namespace libp2p::connection { namespace { @@ -56,7 +54,7 @@ namespace libp2p::connection { }; void LoopbackStream::adjustWindowSize(uint32_t new_size, - VoidResultHandlerFunc cb){}; + VoidResultHandlerFunc cb) {}; outcome::result LoopbackStream::isInitiator() const { return outcome::success(false); @@ -88,8 +86,9 @@ namespace libp2p::connection { return deferWriteCallback(Error::STREAM_INVALID_ARGUMENT, std::move(cb)); } - if (boost::asio::buffer_copy(buffer_.prepare(in.size()), - boost::asio::const_buffer(in.data(), in.size())) + if (boost::asio::buffer_copy( + buffer_.prepare(in.size()), + boost::asio::const_buffer(in.data(), in.size())) != in.size()) { return deferWriteCallback(Error::STREAM_INTERNAL_ERROR, std::move(cb)); } @@ -121,9 +120,8 @@ namespace libp2p::connection { // this lambda checks, if there's enough data in our read buffer, and gives // it to the caller, if so - auto read_lambda = [self{shared_from_this()}, - cb{std::move(cb)}, - out](outcome::result res) mutable { + auto read_lambda = [self{shared_from_this()}, cb{std::move(cb)}, out]( + outcome::result res) mutable { if (!res) { self->data_notified_ = true; self->deferReadCallback(res.as_failure(), std::move(cb)); diff --git a/src/layer/websocket/ssl_connection.cpp b/src/layer/websocket/ssl_connection.cpp index 4c3804160..ade57ac9d 100644 --- a/src/layer/websocket/ssl_connection.cpp +++ b/src/layer/websocket/ssl_connection.cpp @@ -6,7 +6,6 @@ #include -#include #include #include diff --git a/src/layer/websocket/ws_connection.cpp b/src/layer/websocket/ws_connection.cpp index 251893174..39cd3e4d2 100644 --- a/src/layer/websocket/ws_connection.cpp +++ b/src/layer/websocket/ws_connection.cpp @@ -6,7 +6,6 @@ #include -#include #include #include #include @@ -102,7 +101,7 @@ namespace libp2p::connection { ws_.async_read_some(asioBuffer(out), std::move(on_read)); } - void WsConnection::writeSome(BytesIn in, // + void WsConnection::writeSome(BytesIn in, libp2p::basic::Writer::WriteCallbackFunc cb) { SL_TRACE(log_, "write some upto {} bytes", in.size()); ws_.async_write_some(true, asioBuffer(in), toAsioCbSize(std::move(cb))); diff --git a/src/muxer/mplex/mplex_frame.cpp b/src/muxer/mplex/mplex_frame.cpp index 6272bfdb9..40c5ec601 100644 --- a/src/muxer/mplex/mplex_frame.cpp +++ b/src/muxer/mplex/mplex_frame.cpp @@ -6,8 +6,9 @@ #include -#include +#include #include +#include #include #include @@ -101,16 +102,13 @@ namespace libp2p::connection { // read data std::shared_ptr data = std::make_shared(length, 0); - readReturnSize(reader, - *data, - [id_flag, data, cb{std::move(cb)}]( - auto &&read_res) mutable { - if (!read_res) { - return cb(read_res.error()); - } - - cb(createFrame(id_flag, std::move(*data))); - }); + libp2p::read(reader, + *data, + [id_flag, data, cb{std::move(cb)}]( + outcome::result result) mutable { + IF_ERROR_CB_RETURN(result); + cb(createFrame(id_flag, std::move(*data))); + }); }); }); } diff --git a/src/muxer/mplex/mplex_stream.cpp b/src/muxer/mplex/mplex_stream.cpp index 099e13671..ecf06cdf4 100644 --- a/src/muxer/mplex/mplex_stream.cpp +++ b/src/muxer/mplex/mplex_stream.cpp @@ -9,8 +9,7 @@ #include #include -#include -#include +#include #include #define TRY_GET_CONNECTION(tmp) \ @@ -124,7 +123,7 @@ namespace libp2p::connection { if (not self->write_queue_.empty()) { auto [in, cb] = self->write_queue_.front(); self->write_queue_.pop_front(); - writeReturnSize(self, in, cb); + self->writeSome(in, cb); } }); } diff --git a/src/muxer/mplex/mplexed_connection.cpp b/src/muxer/mplex/mplexed_connection.cpp index 3d31b9b7b..50d8d94a7 100644 --- a/src/muxer/mplex/mplexed_connection.cpp +++ b/src/muxer/mplex/mplexed_connection.cpp @@ -7,8 +7,7 @@ #include #include -#include -#include +#include #include namespace libp2p::connection { @@ -125,13 +124,11 @@ namespace libp2p::connection { return !is_active_ || connection_->isClosed(); } - void MplexedConnection::readSome(BytesOut out, - ReadCallbackFunc cb) { + void MplexedConnection::readSome(BytesOut out, ReadCallbackFunc cb) { connection_->readSome(out, std::move(cb)); } - void MplexedConnection::writeSome(BytesIn in, - WriteCallbackFunc cb) { + void MplexedConnection::writeSome(BytesIn in, WriteCallbackFunc cb) { connection_->writeSome(in, std::move(cb)); } @@ -165,19 +162,20 @@ namespace libp2p::connection { is_writing_ = true; const auto &write_data = write_queue_.front(); - return writeReturnSize( - connection_, write_data.data, [self{shared_from_this()}](auto &&res) { - self->onWriteCompleted(std::forward(res)); - }); + libp2p::write(connection_, + write_data.data, + [self{shared_from_this()}](outcome::result result) { + self->onWriteCompleted(result); + }); } - void MplexedConnection::onWriteCompleted(outcome::result write_res) { + void MplexedConnection::onWriteCompleted(outcome::result write_res) { if (!write_res) { log_->error("data write failed: {}", write_res.error()); } - - write_queue_.front().cb(std::forward(write_res)); + auto item = std::exchange(write_queue_.front(), {}); write_queue_.pop(); + item.cb(write_res); doWrite(); } diff --git a/src/muxer/yamux/yamux_stream.cpp b/src/muxer/yamux/yamux_stream.cpp index e16f2037f..173e94e58 100644 --- a/src/muxer/yamux/yamux_stream.cpp +++ b/src/muxer/yamux/yamux_stream.cpp @@ -8,7 +8,6 @@ #include -#include #include #include diff --git a/src/muxer/yamux/yamuxed_connection.cpp b/src/muxer/yamux/yamuxed_connection.cpp index 5177298ba..96b311c04 100644 --- a/src/muxer/yamux/yamuxed_connection.cpp +++ b/src/muxer/yamux/yamuxed_connection.cpp @@ -163,14 +163,12 @@ namespace libp2p::connection { return !started_ || connection_->isClosed(); } - void YamuxedConnection::readSome(BytesOut out, - ReadCallbackFunc cb) { + void YamuxedConnection::readSome(BytesOut out, ReadCallbackFunc cb) { log()->error("YamuxedConnection::readSome : invalid direct call"); deferReadCallback(Error::CONNECTION_DIRECT_IO_FORBIDDEN, std::move(cb)); } - void YamuxedConnection::writeSome(BytesIn in, - WriteCallbackFunc cb) { + void YamuxedConnection::writeSome(BytesIn in, WriteCallbackFunc cb) { log()->error("YamuxedConnection::writeSome : invalid direct call"); deferWriteCallback(Error::CONNECTION_DIRECT_IO_FORBIDDEN, std::move(cb)); } diff --git a/src/protocol/echo/client_echo_session.cpp b/src/protocol/echo/client_echo_session.cpp index f1b36ab8e..07a0dc726 100644 --- a/src/protocol/echo/client_echo_session.cpp +++ b/src/protocol/echo/client_echo_session.cpp @@ -8,7 +8,7 @@ #include -#include +#include #include namespace libp2p::protocol { @@ -33,7 +33,7 @@ namespace libp2p::protocol { auto self{shared_from_this()}; - writeReturnSize(stream_, buf_, [self](outcome::result rw) { + write(stream_, buf_, [self](outcome::result rw) { if (!rw && !self->ec_) { self->ec_ = rw.error(); self->completed(); diff --git a/src/protocol/echo/server_echo_session.cpp b/src/protocol/echo/server_echo_session.cpp index 291bea4fe..518bb3e9b 100644 --- a/src/protocol/echo/server_echo_session.cpp +++ b/src/protocol/echo/server_echo_session.cpp @@ -47,8 +47,7 @@ namespace libp2p::protocol { } stream_->readSome( - buf_, - [self{shared_from_this()}](outcome::result rread) { + buf_, [self{shared_from_this()}](outcome::result rread) { self->onRead(rread); }); } diff --git a/src/protocol/gossip/impl/stream.cpp b/src/protocol/gossip/impl/stream.cpp index e24b230a5..2b18a9f47 100644 --- a/src/protocol/gossip/impl/stream.cpp +++ b/src/protocol/gossip/impl/stream.cpp @@ -8,9 +8,9 @@ #include -#include +#include #include -#include +#include #include "message_parser.hpp" #include "peer_context.hpp" @@ -48,18 +48,15 @@ namespace libp2p::protocol::gossip { TRACE("reading length from {}:{}", peer_->str, stream_id_); - // clang-format off libp2p::basic::VarintReader::readVarint( stream_, - [self_wptr = weak_from_this(), this] - (outcome::result varint) { + [self_wptr = weak_from_this(), + this](outcome::result varint) { if (self_wptr.expired()) { return; } onLengthRead(std::move(varint)); - } - ); - // clang-format on + }); reading_ = true; } @@ -84,18 +81,18 @@ namespace libp2p::protocol::gossip { read_buffer_->resize(msg_len); - readReturnSize(stream_, - std::span(read_buffer_->data(), msg_len), - [self_wptr = weak_from_this(), this, buffer = read_buffer_]( - auto &&res) { - if (self_wptr.expired()) { - return; - } - onMessageRead(std::forward(res)); - }); + libp2p::read(stream_, + std::span(read_buffer_->data(), msg_len), + [self_wptr = weak_from_this(), this, buffer = read_buffer_]( + outcome::result res) { + if (self_wptr.expired()) { + return; + } + onMessageRead(std::forward(res)); + }); } - void Stream::onMessageRead(outcome::result res) { + void Stream::onMessageRead(outcome::result res) { if (!reading_) { return; } @@ -107,12 +104,10 @@ namespace libp2p::protocol::gossip { return; } - TRACE("read {} bytes from {}:{}", res.value(), peer_->str, stream_id_); - - if (read_buffer_->size() != res.value()) { - feedback_(peer_, Error::MESSAGE_PARSE_ERROR); - return; - } + TRACE("read {} bytes from {}:{}", + read_buffer_->size(), + peer_->str, + stream_id_); MessageParser parser; if (!parser.parse(*read_buffer_)) { @@ -161,21 +156,16 @@ namespace libp2p::protocol::gossip { TRACE("writing {} bytes to {}:{}", writing_bytes_, peer_->str, stream_id_); - // clang-format off - BytesIn span{*buffer}; - writeReturnSize( + libp2p::write( stream_, - span, - [self_wptr = weak_from_this(), this, buffer = std::move(buffer)] - (outcome::result result) - { - if (self_wptr.expired() || closed_) { + *buffer, + [weak_self{weak_from_this()}, buffer](outcome::result result) { + auto self = weak_self.lock(); + if (not self) { return; } - onMessageWritten(result); - } - ); - // clang-format on + self->onMessageWritten(result); + }); if (timeout_ > std::chrono::milliseconds::zero()) { timeout_handle_ = scheduler_.scheduleWithHandle( @@ -189,7 +179,10 @@ namespace libp2p::protocol::gossip { } } - void Stream::onMessageWritten(outcome::result res) { + void Stream::onMessageWritten(outcome::result res) { + if (closed_) { + return; + } if (writing_bytes_ == 0) { return; } @@ -199,12 +192,7 @@ namespace libp2p::protocol::gossip { return; } - TRACE("written {} bytes to {}:{}", res.value(), peer_->str, stream_id_); - - if (writing_bytes_ != res.value()) { - feedback_(peer_, Error::MESSAGE_WRITE_ERROR); - return; - } + TRACE("written {} bytes to {}:{}", writing_bytes_, peer_->str, stream_id_); endWrite(); diff --git a/src/protocol/gossip/impl/stream.hpp b/src/protocol/gossip/impl/stream.hpp index 12477a061..cc5b317be 100644 --- a/src/protocol/gossip/impl/stream.hpp +++ b/src/protocol/gossip/impl/stream.hpp @@ -50,9 +50,9 @@ namespace libp2p::protocol::gossip { private: void onLengthRead(outcome::result varint); - void onMessageRead(outcome::result res); + void onMessageRead(outcome::result res); void beginWrite(SharedBuffer buffer); - void onMessageWritten(outcome::result res); + void onMessageWritten(outcome::result res); void endWrite(); void asyncPostError(Error error); diff --git a/src/protocol/identify/identify_msg_processor.cpp b/src/protocol/identify/identify_msg_processor.cpp index b79642022..64a141ab2 100644 --- a/src/protocol/identify/identify_msg_processor.cpp +++ b/src/protocol/identify/identify_msg_processor.cpp @@ -96,13 +96,13 @@ namespace libp2p::protocol { rw->write( msg, [self{shared_from_this()}, - stream = std::move(stream)](auto &&res) mutable { - self->identifySent(std::forward(res), stream); + stream = std::move(stream)](outcome::result result) mutable { + self->identifySent(result, stream); }); } void IdentifyMessageProcessor::identifySent( - outcome::result written_bytes, const StreamSPtr &stream) { + outcome::result written_bytes, const StreamSPtr &stream) { auto [peer_id, peer_addr] = detail::getPeerIdentity(stream); if (!written_bytes) { log_->error("cannot write identify message to stream to peer {}, {}: {}", diff --git a/src/protocol/ping/ping_client_session.cpp b/src/protocol/ping/ping_client_session.cpp index 58c6c40bc..abca88479 100644 --- a/src/protocol/ping/ping_client_session.cpp +++ b/src/protocol/ping/ping_client_session.cpp @@ -7,8 +7,8 @@ #include #include -#include -#include +#include +#include #include namespace libp2p::protocol { @@ -55,14 +55,14 @@ namespace libp2p::protocol { config_.timeout); write_buffer_ = rand_gen_->randomBytes(config_.message_size); - writeReturnSize(stream_, - write_buffer_, - [self{shared_from_this()}](outcome::result r) { - self->writeCompleted(r); - }); + libp2p::write(stream_, + write_buffer_, + [self{shared_from_this()}](outcome::result r) { + self->writeCompleted(r); + }); } - void PingClientSession::writeCompleted(outcome::result r) { + void PingClientSession::writeCompleted(outcome::result r) { timer_.reset(); if (r.has_error()) { // timeout passed or error happened; in any case, we cannot ping it @@ -85,14 +85,14 @@ namespace libp2p::protocol { }, config_.timeout); - readReturnSize(stream_, - read_buffer_, - [self{shared_from_this()}](outcome::result r) { - self->readCompleted(r); - }); + libp2p::read(stream_, + read_buffer_, + [self{shared_from_this()}](outcome::result r) { + self->readCompleted(r); + }); } - void PingClientSession::readCompleted(outcome::result r) { + void PingClientSession::readCompleted(outcome::result r) { timer_.reset(); if (r.has_error() || write_buffer_ != read_buffer_) { // again, in case of any error we cannot continue to ping the peer and diff --git a/src/protocol/ping/ping_server_session.cpp b/src/protocol/ping/ping_server_session.cpp index d5de0231a..77be9204c 100644 --- a/src/protocol/ping/ping_server_session.cpp +++ b/src/protocol/ping/ping_server_session.cpp @@ -7,8 +7,8 @@ #include #include -#include -#include +#include +#include #include namespace libp2p::protocol { @@ -32,13 +32,14 @@ namespace libp2p::protocol { return; } - readReturnSize( - stream_, buffer_, [self{shared_from_this()}](auto &&read_res) { - if (!read_res) { - return; - } - self->readCompleted(); - }); + libp2p::read(stream_, + buffer_, + [self{shared_from_this()}](outcome::result read_res) { + if (!read_res) { + return; + } + self->readCompleted(); + }); } void PingServerSession::readCompleted() { @@ -50,7 +51,7 @@ namespace libp2p::protocol { return; } - writeReturnSize( + libp2p::write( stream_, buffer_, [self{shared_from_this()}](auto &&write_res) { if (!write_res) { return; diff --git a/src/protocol_muxer/multiselect/multiselect_instance.cpp b/src/protocol_muxer/multiselect/multiselect_instance.cpp index 701f7c093..56e3b7acb 100644 --- a/src/protocol_muxer/multiselect/multiselect_instance.cpp +++ b/src/protocol_muxer/multiselect/multiselect_instance.cpp @@ -9,9 +9,9 @@ #include #include -#include +#include #include -#include +#include #include #include #include @@ -144,23 +144,20 @@ namespace libp2p::protocol_muxer::multiselect { return; } - auto span = BytesIn(*packet); - - writeReturnSize(connection_, - span, - [wptr = weak_from_this(), - round = current_round_, - packet = std::move(packet)](outcome::result res) { - auto self = wptr.lock(); - if (self && self->current_round_ == round) { - self->onDataWritten(res); - } - }); + write(connection_, + *packet, + [wptr = weak_from_this(), round = current_round_, packet]( + outcome::result res) { + auto self = wptr.lock(); + if (self && self->current_round_ == round) { + self->onDataWritten(res); + } + }); is_writing_ = true; } - void MultiselectInstance::onDataWritten(outcome::result res) { + void MultiselectInstance::onDataWritten(outcome::result res) { is_writing_ = false; if (!res) { @@ -214,31 +211,25 @@ namespace libp2p::protocol_muxer::multiselect { BytesOut span(*read_buffer_); span = span.first(static_cast(bytes_needed)); - readReturnSize(connection_, - span, - [wptr = weak_from_this(), - round = current_round_, - packet = read_buffer_](outcome::result res) { - auto self = wptr.lock(); - if (self && self->current_round_ == round) { - self->onDataRead(res); - } - }); + libp2p::read(connection_, + span, + [wptr = weak_from_this(), + round = current_round_, + packet = read_buffer_](outcome::result res) { + auto self = wptr.lock(); + if (self && self->current_round_ == round) { + self->onDataRead(res); + } + }); } - void MultiselectInstance::onDataRead(outcome::result res) { + void MultiselectInstance::onDataRead(outcome::result res) { if (!res) { return close(res.error()); } - auto bytes_read = res.value(); - if (bytes_read > read_buffer_->size()) { - log()->error("onDataRead(): invalid state"); - return close(ProtocolMuxer::Error::INTERNAL_ERROR); - } - BytesIn span(*read_buffer_); - span = span.first(static_cast(bytes_read)); + span = span.first(static_cast(parser_.bytesNeeded())); boost::optional> got_result; diff --git a/src/protocol_muxer/multiselect/simple_stream_negotiate.cpp b/src/protocol_muxer/multiselect/simple_stream_negotiate.cpp index 0345d0931..fc964406a 100644 --- a/src/protocol_muxer/multiselect/simple_stream_negotiate.cpp +++ b/src/protocol_muxer/multiselect/simple_stream_negotiate.cpp @@ -6,8 +6,8 @@ #include -#include -#include +#include +#include #include #include #include @@ -43,7 +43,7 @@ namespace libp2p::protocol_muxer::multiselect { void onLastBytesRead(StreamPtr stream, const Callback &cb, const Buffers &buffers, - outcome::result res) { + outcome::result res) { if (!res) { return failed(stream, cb, res.error()); } @@ -53,15 +53,11 @@ namespace libp2p::protocol_muxer::multiselect { void onFirstBytesRead(StreamPtr stream, Callback cb, std::shared_ptr buffers, - outcome::result res) { + outcome::result res) { if (!res) { return failed(stream, cb, res.error()); } - if (res.value() != kMaxVarintSize) { - return failed(stream, cb, ProtocolMuxer::Error::INTERNAL_ERROR); - } - auto total_sz = buffers->written.size(); if (total_sz == kMaxVarintSize) { // protocol_id consists of 1 byte, not standard but possible @@ -77,11 +73,11 @@ namespace libp2p::protocol_muxer::multiselect { // NOLINTNEXTLINE(cppcoreguidelines-narrowing-conversions) span = span.subspan(kMaxVarintSize, remaining_bytes); - readReturnSize( + libp2p::read( stream, span, [stream = stream, cb = std::move(cb), buffers = std::move(buffers)]( - outcome::result res) mutable { + outcome::result res) mutable { onLastBytesRead(std::move(stream), cb, *buffers, res); }); } @@ -89,23 +85,19 @@ namespace libp2p::protocol_muxer::multiselect { void onPacketWritten(StreamPtr stream, Callback cb, std::shared_ptr buffers, - outcome::result res) { + outcome::result res) { if (!res) { return failed(stream, cb, res.error()); } - if (res.value() != buffers->written.size()) { - return failed(stream, cb, ProtocolMuxer::Error::INTERNAL_ERROR); - } - BytesOut span(buffers->read); span = span.first(kMaxVarintSize); - readReturnSize( + libp2p::read( stream, span, [stream = stream, cb = std::move(cb), buffers = std::move(buffers)]( - outcome::result res) mutable { + outcome::result res) mutable { onFirstBytesRead(stream, std::move(cb), std::move(buffers), res); }); } @@ -131,14 +123,13 @@ namespace libp2p::protocol_muxer::multiselect { BytesIn span(buffers->written); - writeReturnSize( - stream, - span, - [stream = stream, cb = std::move(cb), buffers = std::move(buffers)]( - outcome::result res) mutable { - onPacketWritten( - std::move(stream), std::move(cb), std::move(buffers), res); - }); + write(stream, + span, + [stream = stream, cb = std::move(cb), buffers = std::move(buffers)]( + outcome::result res) mutable { + onPacketWritten( + std::move(stream), std::move(cb), std::move(buffers), res); + }); } } // namespace libp2p::protocol_muxer::multiselect diff --git a/src/security/noise/handshake.cpp b/src/security/noise/handshake.cpp index b7f7a90b8..110ece087 100644 --- a/src/security/noise/handshake.cpp +++ b/src/security/noise/handshake.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -18,6 +19,12 @@ #include #include +#define IF_ERROR_HSCB_RETURN(result) \ + ({ \ + auto cb = [&](std::error_code ec) { self->hscb(ec); }; \ + IF_ERROR_CB_RETURN(result); \ + }) + #ifndef UNIQUE_NAME #define UNIQUE_NAME(base) base##__LINE__ #endif // UNIQUE_NAME @@ -108,18 +115,17 @@ namespace libp2p::security::noise { return noise_marshaller_->marshal(payload); } - void Handshake::sendHandshakeMessage(BytesIn payload, - basic::Writer::WriteCallbackFunc cb) { + void Handshake::sendHandshakeMessage(BytesIn payload, CbOutcomeVoid cb) { IO_OUTCOME_TRY( write_result, handshake_state_->writeMessage({}, payload), cb); auto write_cb = [self{shared_from_this()}, cb{std::move(cb)}, - wr{write_result}](outcome::result result) { - IO_OUTCOME_TRY(bytes_written, result, cb); + wr{write_result}](outcome::result result) { + IF_ERROR_CB_RETURN(result); if (wr.cs1 and wr.cs2) { self->setCipherStates(wr.cs1, wr.cs2); } - cb(bytes_written); + cb(outcome::success()); }; rw_->write(write_result.data, write_cb); } @@ -194,11 +200,9 @@ namespace libp2p::security::noise { SL_TRACE(log_, "outgoing connection. stage 0"); sendHandshakeMessage( {}, - [self{shared_from_this()}, payload{std::move(payload)}](auto result) { - IO_OUTCOME_TRY(bytes_written, result, self->hscb); - if (0 == bytes_written) { - return self->hscb(std::errc::bad_message); - } + [self{shared_from_this()}, + payload{std::move(payload)}](outcome::result result) { + IF_ERROR_HSCB_RETURN(result); // // Outgoing connection. Stage 1 // @@ -214,12 +218,12 @@ namespace libp2p::security::noise { // Outgoing connection. Stage 2 // SL_TRACE(self->log_, "outgoing connection. stage 2"); - self->sendHandshakeMessage( - payload, [self, to_write(payload.size())](auto result) { - IO_OUTCOME_TRY(bytes_written, result, self->hscb); - unused(bytes_written); - self->hscb(true); - }); + self->sendHandshakeMessage(payload, + [self, to_write(payload.size())]( + outcome::result result) { + IF_ERROR_HSCB_RETURN(result); + self->hscb(true); + }); }); }); } else { @@ -240,9 +244,9 @@ namespace libp2p::security::noise { // SL_TRACE(self->log_, "incoming connection. stage 1"); self->sendHandshakeMessage( - payload, [self, to_write(payload.size())](auto result) { - IO_OUTCOME_TRY(bytes_written, result, self->hscb); - unused(bytes_written); + payload, + [self, to_write(payload.size())](outcome::result result) { + IF_ERROR_HSCB_RETURN(result); // // Incoming connection. Stage 2 // diff --git a/src/security/noise/insecure_rw.cpp b/src/security/noise/insecure_rw.cpp index a8d612f81..70bf3ed33 100644 --- a/src/security/noise/insecure_rw.cpp +++ b/src/security/noise/insecure_rw.cpp @@ -38,8 +38,7 @@ namespace libp2p::security::noise { libp2p::read(connection_, *buffer_, std::move(read_cb)); } - void InsecureReadWriter::write(BytesIn buffer, - basic::Writer::WriteCallbackFunc cb) { + void InsecureReadWriter::write(BytesIn buffer, CbOutcomeVoid cb) { if (buffer.size() > static_cast(kMaxMsgLen)) { return cb(std::errc::message_size); } @@ -49,8 +48,9 @@ namespace libp2p::security::noise { outbuf_.insert(outbuf_.end(), buffer.begin(), buffer.end()); auto write_cb = [self{shared_from_this()}, buffer, cb{std::move(cb)}]( outcome::result result) { + std::ignore = buffer; IF_ERROR_CB_RETURN(result); - cb(buffer.size()); + cb(outcome::success()); }; libp2p::write(connection_, outbuf_, std::move(write_cb)); } diff --git a/src/security/noise/noise_connection.cpp b/src/security/noise/noise_connection.cpp index 8a2003145..3b740294c 100644 --- a/src/security/noise/noise_connection.cpp +++ b/src/security/noise/noise_connection.cpp @@ -6,7 +6,6 @@ #include -#include #include #include #include @@ -79,7 +78,7 @@ namespace libp2p::connection { // `InsecureReadWriter::write` doesn't leak `BytesIn` reference framer_->write( encrypted, - [in, cb{std::move(cb)}](outcome::result result) mutable { + [in, cb{std::move(cb)}](outcome::result result) mutable { IF_ERROR_CB_RETURN(result); cb(in.size()); }); diff --git a/src/security/plaintext/plaintext_connection.cpp b/src/security/plaintext/plaintext_connection.cpp index 17eadf700..97cab696b 100644 --- a/src/security/plaintext/plaintext_connection.cpp +++ b/src/security/plaintext/plaintext_connection.cpp @@ -7,7 +7,6 @@ #include #include -#include #include namespace libp2p::connection { @@ -58,13 +57,11 @@ namespace libp2p::connection { return original_connection_->remoteMultiaddr(); } - void PlaintextConnection::readSome(BytesOut in, - Reader::ReadCallbackFunc f) { + void PlaintextConnection::readSome(BytesOut in, Reader::ReadCallbackFunc f) { return original_connection_->readSome(in, std::move(f)); }; - void PlaintextConnection::writeSome(BytesIn in, - Writer::WriteCallbackFunc f) { + void PlaintextConnection::writeSome(BytesIn in, Writer::WriteCallbackFunc f) { return original_connection_->writeSome(in, std::move(f)); } diff --git a/src/security/secio/secio.cpp b/src/security/secio/secio.cpp index bfc267ef0..4fa75da7d 100644 --- a/src/security/secio/secio.cpp +++ b/src/security/secio/secio.cpp @@ -8,8 +8,8 @@ #include #include -#include -#include +#include +#include #include #include #include @@ -256,29 +256,24 @@ namespace libp2p::security { local_stretched_key, remote_stretched_key); SECIO_OUTCOME_VOID_TRY(secio_conn->init(), conn, cb) - writeReturnSize( - secio_conn, - self->remote_peer_rand_, - [self, conn, cb, secio_conn](auto &&write_res) { - SECIO_OUTCOME_TRY(written_bytes, write_res, conn, cb) - if (written_bytes != self->remote_peer_rand_.size()) { - return cb(Error::INITIAL_PACKET_VERIFICATION_FAILED); - } - const auto kToRead{self->propose_message_.rand.size()}; - auto buffer = std::make_shared(kToRead); - readReturnSize( - secio_conn, - *buffer, - [self, cb, conn, secio_conn, buffer](auto &&read_res) { - SECIO_OUTCOME_TRY(read_bytes, read_res, conn, cb) - if (read_bytes != buffer->size() - or *buffer != self->propose_message_.rand) { - return cb(Error::INITIAL_PACKET_VERIFICATION_FAILED); - } - SL_TRACE(self->log_, "connection initialized"); - cb(secio_conn); - }); - }); + write(secio_conn, + self->remote_peer_rand_, + [self, conn, cb, secio_conn](outcome::result write_res) { + SECIO_OUTCOME_VOID_TRY(write_res, conn, cb); + const auto kToRead{self->propose_message_.rand.size()}; + auto buffer = std::make_shared(kToRead); + read(secio_conn, + *buffer, + [self, cb, conn, secio_conn, buffer]( + outcome::result read_res) { + SECIO_OUTCOME_VOID_TRY(read_res, conn, cb) + if (*buffer != self->propose_message_.rand) { + return cb(Error::INITIAL_PACKET_VERIFICATION_FAILED); + } + SL_TRACE(self->log_, "connection initialized"); + cb(secio_conn); + }); + }); }); } diff --git a/src/security/secio/secio_connection.cpp b/src/security/secio/secio_connection.cpp index 78f6dd633..c4f09e558 100644 --- a/src/security/secio/secio_connection.cpp +++ b/src/security/secio/secio_connection.cpp @@ -9,8 +9,8 @@ #include #include -#include -#include +#include +#include #include #include #include diff --git a/src/security/tls/tls_connection.cpp b/src/security/tls/tls_connection.cpp index a26840699..b0f22d9da 100644 --- a/src/security/tls/tls_connection.cpp +++ b/src/security/tls/tls_connection.cpp @@ -6,7 +6,6 @@ #include "tls_connection.hpp" -#include #include #include @@ -139,8 +138,7 @@ namespace libp2p::connection { }; } - void TlsConnection::readSome(BytesOut out, - Reader::ReadCallbackFunc cb) { + void TlsConnection::readSome(BytesOut out, Reader::ReadCallbackFunc cb) { SL_TRACE(log(), "reading some up to {} bytes", out.size()); socket_.async_read_some(asioBuffer(out), closeOnError(*this, std::move(cb))); @@ -151,8 +149,7 @@ namespace libp2p::connection { original_connection_->deferReadCallback(res, std::move(cb)); } - void TlsConnection::writeSome(BytesIn in, - Writer::WriteCallbackFunc cb) { + void TlsConnection::writeSome(BytesIn in, Writer::WriteCallbackFunc cb) { SL_TRACE(log(), "writing some up to {} bytes", in.size()); socket_.async_write_some(asioBuffer(in), closeOnError(*this, std::move(cb))); diff --git a/src/transport/quic/connection.cpp b/src/transport/quic/connection.cpp index c39f0ef50..68d579740 100644 --- a/src/transport/quic/connection.cpp +++ b/src/transport/quic/connection.cpp @@ -32,8 +32,7 @@ namespace libp2p::transport { std::ignore = close(); } - void QuicConnection::readSome(BytesOut out, - ReadCallbackFunc cb) { + void QuicConnection::readSome(BytesOut out, ReadCallbackFunc cb) { throw std::logic_error{"QuicConnection::readSome must not be called"}; } @@ -42,8 +41,7 @@ namespace libp2p::transport { post(*io_context_, [cb{std::move(cb)}, res] { cb(res); }); } - void QuicConnection::writeSome(BytesIn in, - WriteCallbackFunc cb) { + void QuicConnection::writeSome(BytesIn in, WriteCallbackFunc cb) { throw std::logic_error{"QuicConnection::writeSome must not be called"}; } diff --git a/src/transport/quic/stream.cpp b/src/transport/quic/stream.cpp index 33e0baa60..1adb3f527 100644 --- a/src/transport/quic/stream.cpp +++ b/src/transport/quic/stream.cpp @@ -5,7 +5,6 @@ */ #include -#include #include #include #include @@ -25,8 +24,7 @@ namespace libp2p::connection { reset(); } - void QuicStream::readSome(BytesOut out, - basic::Reader::ReadCallbackFunc cb) { + void QuicStream::readSome(BytesOut out, basic::Reader::ReadCallbackFunc cb) { outcome::result r = QuicError::STREAM_CLOSED; if (not stream_ctx_) { return cb(r); @@ -51,8 +49,7 @@ namespace libp2p::connection { conn_->deferReadCallback(res, std::move(cb)); } - void QuicStream::writeSome(BytesIn in, - basic::Writer::WriteCallbackFunc cb) { + void QuicStream::writeSome(BytesIn in, basic::Writer::WriteCallbackFunc cb) { outcome::result r = QuicError::STREAM_CLOSED; if (not stream_ctx_) { return cb(r); diff --git a/src/transport/tcp/tcp_connection.cpp b/src/transport/tcp/tcp_connection.cpp index af531a896..890c935f7 100644 --- a/src/transport/tcp/tcp_connection.cpp +++ b/src/transport/tcp/tcp_connection.cpp @@ -6,7 +6,6 @@ #include -#include #include #include #include diff --git a/test/acceptance/p2p/host/protocol/client_test_session.cpp b/test/acceptance/p2p/host/protocol/client_test_session.cpp index cb0988401..487c23846 100644 --- a/test/acceptance/p2p/host/protocol/client_test_session.cpp +++ b/test/acceptance/p2p/host/protocol/client_test_session.cpp @@ -7,8 +7,9 @@ #include #include -#include -#include +#include +#include +#include #include namespace libp2p::protocol { @@ -37,16 +38,13 @@ namespace libp2p::protocol { write_buf_ = random_generator_->randomBytes(buffer_size_); - writeReturnSize(stream_, - write_buf_, - [self = shared_from_this(), - cb{std::move(cb)}](outcome::result rw) mutable { - if (!rw) { - return cb(rw.error()); - } - - self->read(cb); - }); + libp2p::write(stream_, + write_buf_, + [self = shared_from_this(), + cb{std::move(cb)}](outcome::result result) mutable { + IF_ERROR_CB_RETURN(result); + self->read(cb); + }); } void ClientTestSession::read(Callback cb) { @@ -54,16 +52,16 @@ namespace libp2p::protocol { read_buf_ = std::vector(buffer_size_); - readReturnSize(stream_, - read_buf_, - [self = shared_from_this(), - cb{std::move(cb)}](outcome::result rr) mutable { - if (!rr) { - return cb(rr.error()); - } - cb(std::move(self->read_buf_)); - return self->write(std::move(cb)); - }); + libp2p::read(stream_, + read_buf_, + [self = shared_from_this(), + cb{std::move(cb)}](outcome::result rr) mutable { + if (!rr) { + return cb(rr.error()); + } + cb(std::move(self->read_buf_)); + return self->write(std::move(cb)); + }); } } // namespace libp2p::protocol diff --git a/test/acceptance/p2p/muxer.cpp b/test/acceptance/p2p/muxer.cpp index 67731bf9d..c5f2d59d7 100644 --- a/test/acceptance/p2p/muxer.cpp +++ b/test/acceptance/p2p/muxer.cpp @@ -6,10 +6,10 @@ #include #include -#include +#include #include #include -#include +#include #include #include #include @@ -127,38 +127,33 @@ struct Server : public std::enable_shared_from_this { println("onStream executed"); - stream->readSome( - *buf, [buf, stream, this](outcome::result rread) { - if (!rread) { - if (rread.error() == RawConnection::Error::CONNECTION_CLOSED_BY_PEER - || rread.error() == Stream::Error::STREAM_RESET_BY_HOST) { - return; - } - this->println(fmt::format("readSome error: {}", rread.error())); - } - - ASSERT_OUTCOME_SUCCESS(read, rread); - - this->println("readSome ", read, " bytes"); - if (read == 0) { - return; - } - this->streamReads++; - - // 01-echo back read data - buf->resize(read); - writeReturnSize( - stream, - *buf, - [buf, read, stream, this](outcome::result rwrite) { - ASSERT_OUTCOME_SUCCESS(write, rwrite); - this->println("write ", write, " bytes"); - this->streamWrites++; - ASSERT_EQ(write, read); - - this->onStream(buf, stream); - }); - }); + stream->readSome(*buf, [buf, stream, this](outcome::result rread) { + if (!rread) { + if (rread.error() == RawConnection::Error::CONNECTION_CLOSED_BY_PEER + || rread.error() == Stream::Error::STREAM_RESET_BY_HOST) { + return; + } + this->println(fmt::format("readSome error: {}", rread.error())); + } + + ASSERT_OUTCOME_SUCCESS(read, rread); + + this->println("readSome ", read, " bytes"); + if (read == 0) { + return; + } + this->streamReads++; + + // 01-echo back read data + buf->resize(read); + libp2p::write( + stream, *buf, [buf, stream, this](outcome::result rwrite) { + ASSERT_OUTCOME_SUCCESS(rwrite); + this->println("write ", buf->size(), " bytes"); + this->streamWrites++; + this->onStream(buf, stream); + }); + }); } void listen(const Multiaddress &ma) { @@ -245,31 +240,30 @@ struct Client : public std::enable_shared_from_this { } auto buf = randomBuffer(); - writeReturnSize( + libp2p::write( stream, *buf, - [round, streamId, buf, stream, this](outcome::result rwrite) { - ASSERT_OUTCOME_SUCCESS(write, rwrite); - this->println(streamId, " write ", write, " bytes"); + [round, streamId, buf, stream, this](outcome::result rwrite) { + ASSERT_OUTCOME_SUCCESS(rwrite); + this->println(streamId, " write ", buf->size(), " bytes"); this->streamWrites++; auto readbuf = std::make_shared>(); - readbuf->resize(write); - - readReturnSize(stream, - *readbuf, - [round, streamId, write, buf, readbuf, stream, this]( - outcome::result rread) { - ASSERT_OUTCOME_SUCCESS(read, rread); - this->println( - streamId, " readSome ", read, " bytes"); - this->streamReads++; - - ASSERT_EQ(write, read); - ASSERT_EQ(*buf, *readbuf); - - this->onStream(streamId, round - 1, stream); - }); + readbuf->resize(buf->size()); + + libp2p::read(stream, + *readbuf, + [round, streamId, buf, readbuf, stream, this]( + outcome::result rread) { + ASSERT_OUTCOME_SUCCESS(rread); + this->println( + streamId, " readSome ", buf->size(), " bytes"); + this->streamReads++; + + ASSERT_EQ(*buf, *readbuf); + + this->onStream(streamId, round - 1, stream); + }); }); } diff --git a/test/libp2p/basic/message_read_writer_test.cpp b/test/libp2p/basic/message_read_writer_test.cpp index 0a4f49344..71fdfe47f 100644 --- a/test/libp2p/basic/message_read_writer_test.cpp +++ b/test/libp2p/basic/message_read_writer_test.cpp @@ -60,9 +60,8 @@ TEST_F(MessageReadWriterTest, Read) { TEST_F(MessageReadWriterTest, Write) { EXPECT_CALL_WRITE(*conn_mock_).WILL_WRITE(msg_with_varint_bytes_); - msg_rw_->write(msg_bytes_, [this](auto &&res) { + msg_rw_->write(msg_bytes_, [this](outcome::result res) { ASSERT_TRUE(res); - ASSERT_EQ(res.value(), msg_bytes_.size()); operation_completed_ = true; }); diff --git a/test/libp2p/connection/loopback_stream/loopback_stream_test.cpp b/test/libp2p/connection/loopback_stream/loopback_stream_test.cpp index fa06a0509..2d560ebf7 100644 --- a/test/libp2p/connection/loopback_stream/loopback_stream_test.cpp +++ b/test/libp2p/connection/loopback_stream/loopback_stream_test.cpp @@ -6,8 +6,8 @@ #include #include -#include -#include +#include +#include #include #include #include @@ -51,22 +51,22 @@ TEST_F(LoopbackStreamTest, Basic) { std::shared_ptr stream = std::make_shared(PeerInfo{peer_id, {}}, context); bool all_executed{false}; - libp2p::writeReturnSize( - stream, kBuffer, [stream, buf = kBuffer, &all_executed](auto result) { - ASSERT_OUTCOME_SUCCESS(bytes, result); - ASSERT_EQ(bytes, kBufferSize); + libp2p::write( + stream, + kBuffer, + [stream, buf = kBuffer, &all_executed](outcome::result result) { + ASSERT_OUTCOME_SUCCESS(result); auto read_buf = std::make_shared(kBufferSize, 0); ASSERT_EQ(read_buf->size(), kBufferSize); ASSERT_NE(*read_buf, buf); - libp2p::readReturnSize( - stream, - *read_buf, - [buf = read_buf, source_buf = buf, &all_executed](auto result) { - ASSERT_OUTCOME_SUCCESS(bytes, result); - ASSERT_EQ(bytes, kBufferSize); - ASSERT_EQ(*buf, source_buf); - all_executed = true; - }); + libp2p::read(stream, + *read_buf, + [buf = read_buf, source_buf = buf, &all_executed]( + outcome::result result) { + ASSERT_OUTCOME_SUCCESS(result); + ASSERT_EQ(*buf, source_buf); + all_executed = true; + }); }); context->run(); ASSERT_TRUE(all_executed); diff --git a/test/libp2p/connection/security_conn/plaintext_connection_test.cpp b/test/libp2p/connection/security_conn/plaintext_connection_test.cpp index eec964198..b90ec8ceb 100644 --- a/test/libp2p/connection/security_conn/plaintext_connection_test.cpp +++ b/test/libp2p/connection/security_conn/plaintext_connection_test.cpp @@ -5,8 +5,8 @@ */ #include -#include -#include +#include +#include #include #include #include @@ -125,9 +125,8 @@ TEST_F(PlaintextConnectionTest, Read) { const int size = 100; EXPECT_CALL_READ(*connection_).WILL_READ_SIZE(size); auto buf = std::make_shared>(size, 0); - libp2p::readReturnSize(secure_connection_, *buf, [size, buf](auto &&res) { + libp2p::read(secure_connection_, *buf, [buf](outcome::result res) { ASSERT_OUTCOME_SUCCESS(res); - ASSERT_EQ(res.value(), size); }); } @@ -140,9 +139,8 @@ TEST_F(PlaintextConnectionTest, Write) { const int size = 100; EXPECT_CALL_WRITE(*connection_).WILL_WRITE_SIZE(size); auto buf = std::make_shared>(size, 0); - libp2p::writeReturnSize(secure_connection_, *buf, [size, buf](auto &&res) { + libp2p::write(secure_connection_, *buf, [buf](outcome::result res) { ASSERT_OUTCOME_SUCCESS(res); - ASSERT_EQ(res.value(), size); }); } diff --git a/test/libp2p/muxer/muxers_and_streams_test.cpp b/test/libp2p/muxer/muxers_and_streams_test.cpp index a622c7dd7..390ca3585 100644 --- a/test/libp2p/muxer/muxers_and_streams_test.cpp +++ b/test/libp2p/muxer/muxers_and_streams_test.cpp @@ -10,8 +10,8 @@ #include #include -#include -#include +#include +#include #include #define TRACE_ENABLED 1 @@ -141,16 +141,14 @@ namespace libp2p::regression { void connect(const peer::PeerInfo &connect_to) { start(); - // clang-format off - host_->newStream( - connect_to, - {getProtocolId()}, - [wptr = weak_from_this()] (auto rstream) { - auto self = wptr.lock(); - if (self) { self->onConnected(std::move(rstream)); } - } - ); - // clang-format on + host_->newStream(connect_to, + {getProtocolId()}, + [wptr = weak_from_this()](auto rstream) { + auto self = wptr.lock(); + if (self) { + self->onConnected(std::move(rstream)); + } + }); } void listen(const multi::Multiaddress &listen_to) { @@ -173,16 +171,15 @@ namespace libp2p::regression { stats_.put(Stats::FATAL_ERROR); return behavior_(*this); } - // clang-format off - readReturnSize( - stream, - *read_buf_, - [wptr = weak_from_this(), buf = read_buf_] (auto res) { - auto self = wptr.lock(); - if (self) { self->onRead(res); } - } - ); - //clang-format on + libp2p::read(stream, + *read_buf_, + [wptr = weak_from_this(), + buf = read_buf_](outcome::result res) { + auto self = wptr.lock(); + if (self) { + self->onRead(res); + } + }); } void write(WhatStream what_stream = ANY_STREAM) { @@ -192,31 +189,30 @@ namespace libp2p::regression { stats_.put(Stats::FATAL_ERROR); return behavior_(*this); } - // clang-format off - writeReturnSize( - stream, - *write_buf_, - [wptr = weak_from_this(), buf = write_buf_] (auto res) { - auto self = wptr.lock(); - if (self) { self->onWrite(res); } - } - ); - //clang-format on + libp2p::write(stream, + *write_buf_, + [wptr = weak_from_this(), + buf = write_buf_](outcome::result res) { + auto self = wptr.lock(); + if (self) { + self->onWrite(res); + } + }); } void stop() { if (accepted_stream_) { auto p = accepted_stream_->remotePeerId(); if (p) { - host_->getNetwork().getConnectionManager() - .closeConnectionsToPeer(p.value()); + host_->getNetwork().getConnectionManager().closeConnectionsToPeer( + p.value()); } } if (connected_stream_) { auto p = connected_stream_->remotePeerId(); if (p) { - host_->getNetwork().getConnectionManager() - .closeConnectionsToPeer(p.value()); + host_->getNetwork().getConnectionManager().closeConnectionsToPeer( + p.value()); } } @@ -235,15 +231,14 @@ namespace libp2p::regression { void start() { if (!started_) { - // clang-format off host_->setProtocolHandler( {getProtocolId()}, - [wptr = weak_from_this()] (StreamAndProtocol stream) { + [wptr = weak_from_this()](StreamAndProtocol stream) { auto self = wptr.lock(); - if (self) { self->onAccepted(std::move(stream)); } - } - ); - // clang-format on + if (self) { + self->onAccepted(std::move(stream)); + } + }); host_->start(); started_ = true; @@ -274,23 +269,23 @@ namespace libp2p::regression { behavior_(*this); } - void onRead(outcome::result res) { - if (!res || res.value() != read_buf_->size()) { + void onRead(outcome::result res) { + if (not res.has_value()) { TRACE("({}): read error", stats_.node_id); stats_.put(Stats::READ_FAILURE); } else { - TRACE("({}): read {} bytes", stats_.node_id, res.value()); + TRACE("({}): read {} bytes", stats_.node_id, read_buf_->size()); stats_.put(Stats::READ); } behavior_(*this); } - void onWrite(outcome::result res) { - if (!res || res.value() != write_buf_->size()) { + void onWrite(outcome::result res) { + if (not res.has_value()) { TRACE("({}): write error", stats_.node_id); stats_.put(Stats::WRITE_FAILURE); } else { - TRACE("({}): written {} bytes", stats_.node_id, res.value()); + TRACE("({}): written {} bytes", stats_.node_id, write_buf_->size()); stats_.put(Stats::WRITE); } behavior_(*this); diff --git a/test/libp2p/protocol/identify_delta_test.cpp b/test/libp2p/protocol/identify_delta_test.cpp index 677bfefd5..c7763c4b9 100644 --- a/test/libp2p/protocol/identify_delta_test.cpp +++ b/test/libp2p/protocol/identify_delta_test.cpp @@ -129,12 +129,6 @@ TEST_F(IdentifyDeltaTest, Send) { .WillOnce(InvokeArgument<2>( StreamAndProtocol{stream_, kIdentifyDeltaProtocol})); - auto if_added = [&](BytesIn actual) { - auto expected = BytesIn(msg_added_protos_bytes_); - return std::equal( - actual.begin(), actual.end(), expected.begin(), expected.end()); - }; - EXPECT_CALL_WRITE(*stream_).WILL_WRITE(msg_added_protos_bytes_); id_delta_->start(); diff --git a/test/libp2p/transport/tcp/tcp_integration_test.cpp b/test/libp2p/transport/tcp/tcp_integration_test.cpp index d9fe9edef..87758ae93 100644 --- a/test/libp2p/transport/tcp/tcp_integration_test.cpp +++ b/test/libp2p/transport/tcp/tcp_integration_test.cpp @@ -8,8 +8,8 @@ #include #include #include -#include -#include +#include +#include #include #include #include @@ -146,20 +146,19 @@ TEST(TCP, SingleListenerCanAcceptManyClients) { EXPECT_FALSE(conn->isInitiator()); auto buf = std::make_shared>(kSize, 0); - conn->readSome( - *buf, [&counter, conn, buf, context](auto &&res) { - ASSERT_OUTCOME_SUCCESS(res); - - libp2p::writeReturnSize( - conn, *buf, [&counter, conn, buf, context](auto &&res) { - ASSERT_OUTCOME_SUCCESS(res); - EXPECT_EQ(res.value(), buf->size()); - counter++; - if (counter >= kClients) { - context->stop(); - } - }); - }); + conn->readSome(*buf, [&counter, conn, buf, context](auto &&res) { + ASSERT_OUTCOME_SUCCESS(res); + + libp2p::write(conn, + *buf, + [&counter, conn, buf, context](outcome::result res) { + ASSERT_OUTCOME_SUCCESS(res); + counter++; + if (counter >= kClients) { + context->stop(); + } + }); + }); }); ASSERT_TRUE(listener); @@ -183,15 +182,17 @@ TEST(TCP, SingleListenerCanAcceptManyClients) { EXPECT_TRUE(conn->isInitiator()); - libp2p::writeReturnSize( - conn, *buf, [conn, readback, buf, context](auto &&res) { + libp2p::write( + conn, + *buf, + [conn, readback, buf, context](outcome::result res) { ASSERT_OUTCOME_SUCCESS(res); - ASSERT_EQ(res.value(), buf->size()); - libp2p::readReturnSize( - conn, *readback, [conn, readback, buf, context](auto &&res) { + libp2p::read( + conn, + *readback, + [conn, readback, buf, context](outcome::result res) { context->stop(); ASSERT_OUTCOME_SUCCESS(res); - ASSERT_EQ(res.value(), readback->size()); ASSERT_EQ(*buf, *readback); }); }); @@ -315,11 +316,11 @@ TEST(TCP, OneTransportServerHandlesManyClients) { conn->readSome(*buf, [&counter, conn, buf](auto &&res) { ASSERT_OUTCOME_SUCCESS(res); - libp2p::writeReturnSize(conn, *buf, [&counter, buf, conn](auto &&res) { - ASSERT_OUTCOME_SUCCESS(res); - EXPECT_EQ(res.value(), buf->size()); - counter++; - }); + libp2p::write( + conn, *buf, [&counter, buf, conn](outcome::result res) { + ASSERT_OUTCOME_SUCCESS(res); + counter++; + }); }); }); @@ -341,16 +342,15 @@ TEST(TCP, OneTransportServerHandlesManyClients) { EXPECT_TRUE(conn->isInitiator()); - libp2p::writeReturnSize( - conn, *buf, [conn, kSize, readback, buf](auto &&res) { + libp2p::write( + conn, *buf, [conn, readback, buf](outcome::result res) { ASSERT_OUTCOME_SUCCESS(res); - ASSERT_EQ(res.value(), buf->size()); - libp2p::readReturnSize( - conn, *readback, [conn, readback, buf](auto &&res) { - ASSERT_OUTCOME_SUCCESS(res); - ASSERT_EQ(res.value(), readback->size()); - ASSERT_EQ(*buf, *readback); - }); + libp2p::read(conn, + *readback, + [conn, readback, buf](outcome::result res) { + ASSERT_OUTCOME_SUCCESS(res); + ASSERT_EQ(*buf, *readback); + }); }); }); diff --git a/test/mock/libp2p/connection/capable_connection_mock.hpp b/test/mock/libp2p/connection/capable_connection_mock.hpp index c5345a29d..bb25cd3a1 100644 --- a/test/mock/libp2p/connection/capable_connection_mock.hpp +++ b/test/mock/libp2p/connection/capable_connection_mock.hpp @@ -8,7 +8,6 @@ #include -#include #include namespace libp2p::connection { @@ -81,13 +80,11 @@ namespace libp2p::connection { return real_->remoteMultiaddr(); }; - void readSome(BytesOut in, - Reader::ReadCallbackFunc f) override { + void readSome(BytesOut in, Reader::ReadCallbackFunc f) override { return real_->readSome(in, f); }; - void writeSome(BytesIn in, - Writer::WriteCallbackFunc f) override { + void writeSome(BytesIn in, Writer::WriteCallbackFunc f) override { return real_->writeSome(in, f); } diff --git a/test/mock/libp2p/connection/layer_connection_mock.hpp b/test/mock/libp2p/connection/layer_connection_mock.hpp index 4473f7d59..6009ff46e 100644 --- a/test/mock/libp2p/connection/layer_connection_mock.hpp +++ b/test/mock/libp2p/connection/layer_connection_mock.hpp @@ -7,13 +7,14 @@ #pragma once #include -#include #include namespace libp2p::connection { - class LayerConnectionMock : public std::enable_shared_from_this, public virtual LayerConnection { + class LayerConnectionMock + : public std::enable_shared_from_this, + public virtual LayerConnection { public: ~LayerConnectionMock() override = default; diff --git a/test/mock/libp2p/connection/stream_mock.hpp b/test/mock/libp2p/connection/stream_mock.hpp index 0dcd61308..50cb42941 100644 --- a/test/mock/libp2p/connection/stream_mock.hpp +++ b/test/mock/libp2p/connection/stream_mock.hpp @@ -6,7 +6,6 @@ #pragma once -#include #include #include diff --git a/test/testutil/expect_read.hpp b/test/testutil/expect_read.hpp index ccf770a88..9a66fa3e5 100644 --- a/test/testutil/expect_read.hpp +++ b/test/testutil/expect_read.hpp @@ -11,22 +11,22 @@ #define EXPECT_CALL_READ(mock) \ EXPECT_CALL(mock, readSome(testing::_, testing::_)) -#define WILL_READ(_expected) \ - WillOnce([expected{qtils::asVec(_expected)}]( \ - libp2p::BytesOut out, \ - libp2p::basic::Reader::ReadCallbackFunc cb) { \ - ASSERT_GE(out.size(), expected.size()); \ - memcpy(out.data(), expected.data(), expected.size()); \ - cb(expected.size()); \ - }) +#define WILL_READ(_expected) \ + WillOnce( \ + [expected{qtils::asVec(_expected)}]( \ + libp2p::BytesOut out, libp2p::basic::Reader::ReadCallbackFunc cb) { \ + ASSERT_GE(out.size(), expected.size()); \ + memcpy(out.data(), expected.data(), expected.size()); \ + cb(expected.size()); \ + }) #define WILL_READ_SIZE(_expected) \ WillOnce([expected{_expected}](libp2p::BytesOut out, \ libp2p::basic::Reader::ReadCallbackFunc cb) { \ ASSERT_EQ(out.size(), expected); \ cb(expected); \ }) -#define WILL_READ_ERROR() \ - WillOnce([](libp2p::BytesOut out, \ - libp2p::basic::Reader::ReadCallbackFunc cb) { \ - cb(std::errc::io_error); \ - }) +#define WILL_READ_ERROR() \ + WillOnce( \ + [](libp2p::BytesOut out, libp2p::basic::Reader::ReadCallbackFunc cb) { \ + cb(std::errc::io_error); \ + }) diff --git a/test/testutil/expect_write.hpp b/test/testutil/expect_write.hpp index 9bd9a9860..cf4fe0233 100644 --- a/test/testutil/expect_write.hpp +++ b/test/testutil/expect_write.hpp @@ -11,13 +11,13 @@ #define EXPECT_CALL_WRITE(mock) \ EXPECT_CALL(mock, writeSome(testing::_, testing::_)) -#define WILL_WRITE(_expected) \ - WillOnce([expected{qtils::asVec(_expected)}]( \ - libp2p::BytesIn in, \ - libp2p::basic::Writer::WriteCallbackFunc cb) { \ - ASSERT_EQ(qtils::asVec(in), expected); \ - cb(in.size()); \ - }) +#define WILL_WRITE(_expected) \ + WillOnce( \ + [expected{qtils::asVec(_expected)}]( \ + libp2p::BytesIn in, libp2p::basic::Writer::WriteCallbackFunc cb) { \ + ASSERT_EQ(qtils::asVec(in), expected); \ + cb(in.size()); \ + }) #define WILL_WRITE_SIZE(_expected) \ WillOnce( \ [expected{_expected}](libp2p::BytesIn in, \ @@ -25,8 +25,8 @@ ASSERT_EQ(in.size(), expected); \ cb(expected); \ }) -#define WILL_WRITE_ERROR() \ - WillOnce([](libp2p::BytesIn in, \ - libp2p::basic::Writer::WriteCallbackFunc cb) { \ - cb(std::errc::io_error); \ - }) +#define WILL_WRITE_ERROR() \ + WillOnce( \ + [](libp2p::BytesIn in, libp2p::basic::Writer::WriteCallbackFunc cb) { \ + cb(std::errc::io_error); \ + })