Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion example/02-kademlia/rendezvous_chat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class Session : public std::enable_shared_from_this<Session> {

stream_->readSome(
*incoming_,
incoming_->size(),
[self = shared_from_this()](outcome::result<size_t> result) {
if (not result) {
self->close();
Expand Down
1 change: 0 additions & 1 deletion include/libp2p/basic/read.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ namespace libp2p {
// read some bytes
reader->readSome(
out,
out.size(),
[weak{std::weak_ptr{reader}}, out, cb{std::move(cb)}](
outcome::result<size_t> n_res) mutable {
if (n_res.has_error()) {
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/basic/reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace libp2p::basic {
* pointer, or having buffer as part of some class/struct, and using
* enable_shared_from_this()
*/
virtual void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) = 0;
virtual void readSome(BytesOut out, ReadCallbackFunc cb) = 0;

/**
* @brief Defers reporting result or error to callback to avoid reentrancy
Expand Down
20 changes: 0 additions & 20 deletions include/libp2p/common/ambigous_size.hpp

This file was deleted.

2 changes: 1 addition & 1 deletion include/libp2p/connection/as_asio_read_write.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ namespace libp2p {
boost::asio::mutable_buffer,
MutableBufferSequence>::first(buffers)};
impl->readSome(
asioBuffer(buffer), buffer.size(), wrapCb(std::forward<Cb>(cb)));
asioBuffer(buffer), wrapCb(std::forward<Cb>(cb)));
}

template <typename ConstBufferSequence, typename Cb>
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/connection/loopback_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace libp2p::connection {
const override;

protected:
void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;

void writeSome(BytesIn in, WriteCallbackFunc cb) override;

Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/layer/websocket/ssl_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace libp2p::connection {
outcome::result<void> close() override;
bool isClosed() const override;

void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;
void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;

Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/layer/websocket/ws_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ namespace libp2p::connection {

bool isClosed() const override;

void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;

void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/muxer/mplex/mplex_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ namespace libp2p::connection {

~MplexStream() override = default;

void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;

void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/muxer/mplex/mplexed_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ namespace libp2p::connection {

/// usage of these four methods is highly not recommended or even forbidden:
/// use stream over this connection instead
void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

void deferReadCallback(outcome::result<size_t> res,
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/muxer/yamux/yamux_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ namespace libp2p::connection {
size_t maximum_window_size,
size_t write_queue_limit);

void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;

void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/muxer/yamux/yamuxed_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ namespace libp2p::connection {

/// usage of these four methods is highly not recommended or even forbidden:
/// use stream over this connection instead
void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

/// Initiates async readSome on connection
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/security/noise/noise_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace libp2p::connection {

outcome::result<void> close() override;

void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;

void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/security/plaintext/plaintext_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace libp2p::connection {

outcome::result<multi::Multiaddress> remoteMultiaddr() override;

void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;

void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/security/secio/secio_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ namespace libp2p::connection {

outcome::result<multi::Multiaddress> remoteMultiaddr() override;

void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;

void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/transport/quic/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace libp2p::transport {
void operator=(QuicConnection &&) = delete;

// Reader
void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;
void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;

Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/transport/quic/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace libp2p::connection {
void operator=(QuicStream &&) = delete;

// Reader
void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;
void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;

Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/transport/tcp/tcp_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ namespace libp2p::transport {
ConnectCallbackFunc cb,
std::chrono::milliseconds timeout);

void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void readSome(BytesOut out, ReadCallbackFunc cb) override;

void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;
Expand Down
9 changes: 3 additions & 6 deletions src/connection/loopback_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <libp2p/connection/loopback_stream.hpp>

#include <libp2p/basic/read_return_size.hpp>
#include <libp2p/common/ambigous_size.hpp>

namespace libp2p::connection {

Expand Down Expand Up @@ -109,32 +108,30 @@ namespace libp2p::connection {
}

void LoopbackStream::readSome(BytesOut out,
size_t bytes,
libp2p::basic::Reader::ReadCallbackFunc cb) {
if (is_reset_) {
return deferReadCallback(Error::STREAM_RESET_BY_HOST, std::move(cb));
}
if (!is_readable_) {
return deferReadCallback(Error::STREAM_NOT_READABLE, std::move(cb));
}
if (bytes == 0 || out.empty() || static_cast<size_t>(out.size()) < bytes) {
if (out.empty()) {
return cb(Error::STREAM_INVALID_ARGUMENT);
}

// this lambda checks, if there's enough data in our read buffer, and gives
// it to the caller, if so
auto read_lambda = [self{shared_from_this()},
cb{std::move(cb)},
out,
bytes](outcome::result<size_t> res) mutable {
out](outcome::result<size_t> res) mutable {
if (!res) {
self->data_notified_ = true;
self->deferReadCallback(res.as_failure(), std::move(cb));
return;
}

if (self->buffer_.size() > 0) {
auto to_read = std::min(self->buffer_.size(), bytes);
auto to_read = std::min(self->buffer_.size(), out.size());

if (boost::asio::buffer_copy(boost::asio::buffer(out.data(), to_read),
self->buffer_.data(),
Expand Down
3 changes: 0 additions & 3 deletions src/layer/websocket/ssl_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <libp2p/layer/websocket/ssl_connection.hpp>

#include <libp2p/basic/read_return_size.hpp>
#include <libp2p/common/ambigous_size.hpp>
#include <libp2p/common/asio_buffer.hpp>
#include <libp2p/common/asio_cb.hpp>

Expand Down Expand Up @@ -44,9 +43,7 @@ namespace libp2p::connection {
}

void SslConnection::readSome(BytesOut out,
size_t bytes,
libp2p::basic::Reader::ReadCallbackFunc cb) {
ambigousSize(out, bytes);
ssl_.async_read_some(asioBuffer(out), toAsioCbSize(std::move(cb)));
}

Expand Down
7 changes: 2 additions & 5 deletions src/layer/websocket/ws_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <libp2p/layer/websocket/ws_connection.hpp>

#include <libp2p/basic/read_return_size.hpp>
#include <libp2p/common/ambigous_size.hpp>
#include <libp2p/common/asio_buffer.hpp>
#include <libp2p/common/asio_cb.hpp>
#include <libp2p/common/bytestr.hpp>
Expand Down Expand Up @@ -86,18 +85,16 @@ namespace libp2p::connection {
}

void WsConnection::readSome(BytesOut out,
size_t bytes,
libp2p::basic::Reader::ReadCallbackFunc cb) {
ambigousSize(out, bytes);
SL_TRACE(log_, "read some upto {} bytes", bytes);
SL_TRACE(log_, "read some upto {} bytes", out.size());
auto on_read = [weak{weak_from_this()}, out, cb{std::move(cb)}](
boost::system::error_code ec, size_t n) mutable {
if (ec) {
cb(ec);
} else if (n != 0) {
cb(n);
} else if (auto self = weak.lock()) {
self->readSome(out, out.size(), std::move(cb));
self->readSome(out, std::move(cb));
} else {
cb(boost::system::errc::broken_pipe);
}
Expand Down
4 changes: 1 addition & 3 deletions src/muxer/mplex/mplex_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#include <boost/container_hash/hash.hpp>
#include <libp2p/basic/read_return_size.hpp>
#include <libp2p/basic/write_return_size.hpp>
#include <libp2p/common/ambigous_size.hpp>
#include <libp2p/muxer/mplex/mplexed_connection.hpp>

#define TRY_GET_CONNECTION(tmp) \
Expand Down Expand Up @@ -59,8 +58,7 @@ namespace libp2p::connection {
return true;
}

void MplexStream::readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) {
ambigousSize(out, bytes);
void MplexStream::readSome(BytesOut out, ReadCallbackFunc cb) {
if (is_reset_) {
return cb(Error::STREAM_RESET_BY_PEER);
}
Expand Down
3 changes: 1 addition & 2 deletions src/muxer/mplex/mplexed_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,8 @@ namespace libp2p::connection {
}

void MplexedConnection::readSome(BytesOut out,
size_t bytes,
ReadCallbackFunc cb) {
connection_->readSome(out, bytes, std::move(cb));
connection_->readSome(out, std::move(cb));
}

void MplexedConnection::writeSome(BytesIn in,
Expand Down
4 changes: 1 addition & 3 deletions src/muxer/yamux/yamux_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <cassert>

#include <libp2p/basic/read_return_size.hpp>
#include <libp2p/common/ambigous_size.hpp>
#include <libp2p/muxer/yamux/yamux_frame.hpp>
#include <qtils/option_take.hpp>

Expand Down Expand Up @@ -67,8 +66,7 @@ namespace libp2p::connection {
assert(write_queue_limit >= maximum_window_size_);
}

void YamuxStream::readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) {
ambigousSize(out, bytes);
void YamuxStream::readSome(BytesOut out, ReadCallbackFunc cb) {
doRead(out, std::move(cb));
}

Expand Down
2 changes: 0 additions & 2 deletions src/muxer/yamux/yamuxed_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ namespace libp2p::connection {
}

void YamuxedConnection::readSome(BytesOut out,
size_t bytes,
ReadCallbackFunc cb) {
log()->error("YamuxedConnection::readSome : invalid direct call");
deferReadCallback(Error::CONNECTION_DIRECT_IO_FORBIDDEN, std::move(cb));
Expand All @@ -189,7 +188,6 @@ namespace libp2p::connection {
void YamuxedConnection::continueReading() {
SL_TRACE(log(), "YamuxedConnection::continueReading");
connection_->readSome(*raw_read_buffer_,
raw_read_buffer_->size(),
[wptr = weak_from_this(), buffer = raw_read_buffer_](
outcome::result<size_t> res) {
auto self = wptr.lock();
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/echo/client_echo_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace libp2p::protocol {
completed();
}

stream_->readSome(span, span.size(), [self](outcome::result<size_t> rr) {
stream_->readSome(span, [self](outcome::result<size_t> rr) {
if (!rr && !self->ec_) {
self->ec_ = rr.error();
return self->completed();
Expand Down
1 change: 0 additions & 1 deletion src/protocol/echo/server_echo_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ namespace libp2p::protocol {

stream_->readSome(
buf_,
buf_.size(),
[self{shared_from_this()}](outcome::result<size_t> rread) {
self->onRead(rread);
});
Expand Down
5 changes: 1 addition & 4 deletions src/security/noise/noise_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#include <libp2p/security/noise/noise_connection.hpp>

#include <libp2p/basic/read_return_size.hpp>
#include <libp2p/common/ambigous_size.hpp>
#include <libp2p/common/outcome_macro.hpp>
#include <libp2p/crypto/x25519_provider/x25519_provider_impl.hpp>
#include <libp2p/security/noise/crypto/interfaces.hpp>
Expand Down Expand Up @@ -47,9 +46,7 @@ namespace libp2p::connection {
}

void NoiseConnection::readSome(BytesOut out,
size_t bytes,
libp2p::basic::Reader::ReadCallbackFunc cb) {
ambigousSize(out, bytes);
if (not frame_buffer_->empty()) {
auto n{std::min(out.size(), frame_buffer_->size())};
auto begin{frame_buffer_->begin()};
Expand All @@ -65,7 +62,7 @@ namespace libp2p::connection {
auto decrypted =
IF_ERROR_CB_RETURN(self->decoder_cs_->decrypt({}, *data, {}));
self->frame_buffer_->assign(decrypted.begin(), decrypted.end());
self->readSome(out, out.size(), std::move(cb));
self->readSome(out, std::move(cb));
});
}

Expand Down
3 changes: 1 addition & 2 deletions src/security/plaintext/plaintext_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ namespace libp2p::connection {
}

void PlaintextConnection::readSome(BytesOut in,
size_t bytes,
Reader::ReadCallbackFunc f) {
return original_connection_->readSome(in, bytes, std::move(f));
return original_connection_->readSome(in, std::move(f));
};

void PlaintextConnection::writeSome(BytesIn in,
Expand Down
Loading
Loading