Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion include/libp2p/basic/write.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ namespace libp2p {
// write some bytes
writer->writeSome(
in,
in.size(),
[weak{std::weak_ptr{writer}}, in, cb{std::move(cb)}](
outcome::result<size_t> n_res) mutable {
if (n_res.has_error()) {
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/basic/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace libp2p::basic {
* pointer, or having buffer as part of some class/struct, and using
* enable_shared_from_this()
*/
virtual void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) = 0;
virtual void writeSome(BytesIn in, WriteCallbackFunc cb) = 0;

/**
* @brief Defers reporting error state to callback to avoid reentrancy
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/connection/as_asio_read_write.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ namespace libp2p {
boost::asio::const_buffer,
ConstBufferSequence>::first(buffers)};
impl->writeSome(
asioBuffer(buffer), buffer.size(), wrapCb(std::forward<Cb>(cb)));
asioBuffer(buffer), wrapCb(std::forward<Cb>(cb)));
}

std::shared_ptr<boost::asio::io_context> io;
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/connection/loopback_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace libp2p::connection {
protected:
void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;

void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/layer/websocket/ssl_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace libp2p::connection {
void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;

void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;
void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

private:
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/layer/websocket/ws_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ namespace libp2p::connection {
void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;

void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/muxer/mplex/mplex_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ namespace libp2p::connection {
void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;

void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/muxer/mplex/mplexed_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ namespace libp2p::connection {
/// usage of these four methods is highly not recommended or even forbidden:
/// use stream over this connection instead
void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/muxer/yamux/yamux_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ namespace libp2p::connection {
void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;

void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/muxer/yamux/yamuxed_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ namespace libp2p::connection {
/// usage of these four methods is highly not recommended or even forbidden:
/// use stream over this connection instead
void readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) override;
void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

/// Initiates async readSome on connection
void continueReading();
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/security/noise/noise_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace libp2p::connection {
void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;

void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/security/plaintext/plaintext_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace libp2p::connection {
void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;

void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/security/secio/secio_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ namespace libp2p::connection {
void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;

void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/transport/quic/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ namespace libp2p::transport {
ReadCallbackFunc cb) override;

// Writer
void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;
void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

// Closeable
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/transport/quic/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ namespace libp2p::connection {
ReadCallbackFunc cb) override;

// Writer
void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;
void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

// Stream
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/transport/tcp/tcp_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ namespace libp2p::transport {
void deferReadCallback(outcome::result<size_t> res,
ReadCallbackFunc cb) override;

void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override;

Expand Down
13 changes: 6 additions & 7 deletions src/connection/loopback_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,28 +78,27 @@ namespace libp2p::connection {
}

void LoopbackStream::writeSome(BytesIn in,
size_t bytes,
libp2p::basic::Writer::WriteCallbackFunc cb) {
if (is_reset_) {
return deferWriteCallback(Error::STREAM_RESET_BY_HOST, std::move(cb));
}
if (!is_writable_) {
return deferWriteCallback(Error::STREAM_NOT_WRITABLE, std::move(cb));
}
if (bytes == 0 || in.empty() || static_cast<size_t>(in.size()) < bytes) {
if (in.empty()) {
return deferWriteCallback(Error::STREAM_INVALID_ARGUMENT, std::move(cb));
}

if (boost::asio::buffer_copy(buffer_.prepare(bytes),
boost::asio::const_buffer(in.data(), bytes))
!= bytes) {
if (boost::asio::buffer_copy(buffer_.prepare(in.size()),
boost::asio::const_buffer(in.data(), in.size()))
!= in.size()) {
return deferWriteCallback(Error::STREAM_INTERNAL_ERROR, std::move(cb));
}

buffer_.commit(bytes);
buffer_.commit(in.size());

// intentionally used deferReadCallback, since it acquires bytes written
deferReadCallback(bytes, std::move(cb));
deferReadCallback(in.size(), std::move(cb));
/* The whole approach with such methods (deferReadCallback and
* deferWriteCallback) is going to be avoided in the near future, thus we do
* not remove from the source code the counting of bytes written */
Expand Down
2 changes: 0 additions & 2 deletions src/layer/websocket/ssl_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ namespace libp2p::connection {
}

void SslConnection::writeSome(BytesIn in,
size_t bytes,
libp2p::basic::Writer::WriteCallbackFunc cb) {
ambigousSize(in, bytes);
ssl_.async_write_some(asioBuffer(in), toAsioCbSize(std::move(cb)));
}

Expand Down
4 changes: 1 addition & 3 deletions src/layer/websocket/ws_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,8 @@ namespace libp2p::connection {
}

void WsConnection::writeSome(BytesIn in, //
size_t bytes, //
libp2p::basic::Writer::WriteCallbackFunc cb) {
ambigousSize(in, bytes);
SL_TRACE(log_, "write some upto {} bytes", bytes);
SL_TRACE(log_, "write some upto {} bytes", in.size());
ws_.async_write_some(true, asioBuffer(in), toAsioCbSize(std::move(cb)));
}

Expand Down
5 changes: 2 additions & 3 deletions src/muxer/mplex/mplex_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ namespace libp2p::connection {
}
}

void MplexStream::writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) {
ambigousSize(in, bytes);
void MplexStream::writeSome(BytesIn in, WriteCallbackFunc cb) {
// TODO(107): Reentrancy

if (is_reset_) {
Expand All @@ -111,7 +110,7 @@ namespace libp2p::connection {
connection_.lock()->streamWrite(
stream_id_,
in,
bytes,
in.size(),
[self{shared_from_this()}, cb{std::move(cb)}](auto &&write_res) {
self->is_writing_ = false;
if (!write_res) {
Expand Down
3 changes: 1 addition & 2 deletions src/muxer/mplex/mplexed_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,8 @@ namespace libp2p::connection {
}

void MplexedConnection::writeSome(BytesIn in,
size_t bytes,
WriteCallbackFunc cb) {
connection_->writeSome(in, bytes, std::move(cb));
connection_->writeSome(in, std::move(cb));
}

void MplexedConnection::deferReadCallback(outcome::result<size_t> res,
Expand Down
3 changes: 1 addition & 2 deletions src/muxer/yamux/yamux_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ namespace libp2p::connection {
feedback_.deferCall([res, cb{std::move(cb)}] { cb(res); });
}

void YamuxStream::writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) {
ambigousSize(in, bytes);
void YamuxStream::writeSome(BytesIn in, WriteCallbackFunc cb) {
doWrite(in, std::move(cb));
}

Expand Down
1 change: 0 additions & 1 deletion src/muxer/yamux/yamuxed_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ namespace libp2p::connection {
}

void YamuxedConnection::writeSome(BytesIn in,
size_t bytes,
WriteCallbackFunc cb) {
log()->error("YamuxedConnection::writeSome : invalid direct call");
deferWriteCallback(Error::CONNECTION_DIRECT_IO_FORBIDDEN, std::move(cb));
Expand Down
2 changes: 0 additions & 2 deletions src/security/noise/noise_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,7 @@ namespace libp2p::connection {
}

void NoiseConnection::writeSome(BytesIn in,
size_t bytes,
basic::Writer::WriteCallbackFunc cb) {
ambigousSize(in, bytes);
if (in.empty()) {
cb(in.size());
return;
Expand Down
3 changes: 1 addition & 2 deletions src/security/plaintext/plaintext_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,8 @@ namespace libp2p::connection {
};

void PlaintextConnection::writeSome(BytesIn in,
size_t bytes,
Writer::WriteCallbackFunc f) {
return original_connection_->writeSome(in, bytes, std::move(f));
return original_connection_->writeSome(in, std::move(f));
}

void PlaintextConnection::deferReadCallback(outcome::result<size_t> res,
Expand Down
2 changes: 0 additions & 2 deletions src/security/secio/secio_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,7 @@ namespace libp2p::connection {
}

void SecioConnection::writeSome(BytesIn in,
size_t bytes,
basic::Writer::WriteCallbackFunc cb) {
ambigousSize(in, bytes);
// TODO(107): Reentrancy

if (!isInitialized()) {
Expand Down
4 changes: 1 addition & 3 deletions src/security/tls/tls_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,8 @@ namespace libp2p::connection {
}

void TlsConnection::writeSome(BytesIn in,
size_t bytes,
Writer::WriteCallbackFunc cb) {
ambigousSize(in, bytes);
SL_TRACE(log(), "writing some up to {} bytes", bytes);
SL_TRACE(log(), "writing some up to {} bytes", in.size());
socket_.async_write_some(asioBuffer(in),
closeOnError(*this, std::move(cb)));
}
Expand Down
2 changes: 1 addition & 1 deletion src/security/tls/tls_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ namespace libp2p::connection {
ReadCallbackFunc cb) override;

/// Async writes up to the # of bytes given
void writeSome(BytesIn in, size_t bytes, WriteCallbackFunc cb) override;
void writeSome(BytesIn in, WriteCallbackFunc cb) override;

/// Defers error callback to avoid reentrancy in async calls
void deferWriteCallback(std::error_code ec, ReadCallbackFunc cb) override;
Expand Down
1 change: 0 additions & 1 deletion src/transport/quic/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ namespace libp2p::transport {
}

void QuicConnection::writeSome(BytesIn in,
size_t bytes,
WriteCallbackFunc cb) {
throw std::logic_error{"QuicConnection::writeSome must not be called"};
}
Expand Down
2 changes: 0 additions & 2 deletions src/transport/quic/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ namespace libp2p::connection {
}

void QuicStream::writeSome(BytesIn in,
size_t bytes,
basic::Writer::WriteCallbackFunc cb) {
ambigousSize(in, bytes);
outcome::result<size_t> r = QuicError::STREAM_CLOSED;
if (not stream_ctx_) {
return cb(r);
Expand Down
6 changes: 2 additions & 4 deletions src/transport/tcp/tcp_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,9 @@ namespace libp2p::transport {
}

void TcpConnection::writeSome(BytesIn in,
size_t bytes,
TcpConnection::WriteCallbackFunc cb) {
ByteCounter::getInstance().incrementBytesWritten(bytes);
ambigousSize(in, bytes);
TRACE("{} write some up to {}", debug_str_, bytes);
ByteCounter::getInstance().incrementBytesWritten(in.size());
TRACE("{} write some up to {}", debug_str_, in.size());
socket_.async_write_some(asioBuffer(in),
closeOnError(*this, std::move(cb)));
}
Expand Down
5 changes: 2 additions & 3 deletions test/libp2p/protocol/identify_delta_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "mock/libp2p/peer/peer_repository_mock.hpp"
#include "mock/libp2p/peer/protocol_repository_mock.hpp"
#include "testutil/expect_read.hpp"
#include "testutil/expect_write.hpp"
#include "testutil/gmock_actions.hpp"
#include "testutil/prepare_loggers.hpp"

Expand Down Expand Up @@ -134,9 +135,7 @@ TEST_F(IdentifyDeltaTest, Send) {
actual.begin(), actual.end(), expected.begin(), expected.end());
};

EXPECT_CALL(*stream_,
writeSome(Truly(if_added), msg_added_protos_bytes_.size(), _))
.WillOnce(InvokeArgument<2>(outcome::success()));
EXPECT_CALL_WRITE(*stream_).WILL_WRITE(msg_added_protos_bytes_);

id_delta_->start();
bus_.getChannel<event::network::ProtocolsAddedChannel>().publish(
Expand Down
2 changes: 1 addition & 1 deletion test/mock/libp2p/basic/read_writer_mock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ namespace libp2p::basic {
struct ReadWriterMock : public ReadWriter {
MOCK_METHOD3(readSome, void(BytesOut, size_t, Reader::ReadCallbackFunc));

MOCK_METHOD3(writeSome, void(BytesIn, size_t, Writer::WriteCallbackFunc));
MOCK_METHOD2(writeSome, void(BytesIn, Writer::WriteCallbackFunc));
};
} // namespace libp2p::basic
5 changes: 2 additions & 3 deletions test/mock/libp2p/connection/capable_connection_mock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace libp2p::connection {
MOCK_CONST_METHOD0(isClosed, bool(void));
MOCK_METHOD0(close, outcome::result<void>());
MOCK_METHOD3(readSome, void(BytesOut, size_t, Reader::ReadCallbackFunc));
MOCK_METHOD3(writeSome, void(BytesIn, size_t, Writer::WriteCallbackFunc));
MOCK_METHOD2(writeSome, void(BytesIn, Writer::WriteCallbackFunc));
MOCK_METHOD2(deferReadCallback,
void(outcome::result<size_t>, Reader::ReadCallbackFunc));
MOCK_METHOD2(deferWriteCallback,
Expand Down Expand Up @@ -88,9 +88,8 @@ namespace libp2p::connection {
};

void writeSome(BytesIn in,
size_t bytes,
Writer::WriteCallbackFunc f) override {
return real_->writeSome(in, bytes, f);
return real_->writeSome(in, f);
}

bool isClosed() const override {
Expand Down
2 changes: 1 addition & 1 deletion test/mock/libp2p/connection/layer_connection_mock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace libp2p::connection {
MOCK_METHOD0(close, outcome::result<void>());

MOCK_METHOD3(readSome, void(BytesOut, size_t, Reader::ReadCallbackFunc));
MOCK_METHOD3(writeSome, void(BytesIn, size_t, Writer::WriteCallbackFunc));
MOCK_METHOD2(writeSome, void(BytesIn, Writer::WriteCallbackFunc));
MOCK_METHOD2(deferReadCallback,
void(outcome::result<size_t>, Reader::ReadCallbackFunc));
MOCK_METHOD2(deferWriteCallback,
Expand Down
2 changes: 1 addition & 1 deletion test/mock/libp2p/connection/secure_connection_mock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ namespace libp2p::connection {
MOCK_METHOD0(close, outcome::result<void>(void));

MOCK_METHOD3(readSome, void(BytesOut, size_t, Reader::ReadCallbackFunc));
MOCK_METHOD3(writeSome, void(BytesIn, size_t, Writer::WriteCallbackFunc));
MOCK_METHOD2(writeSome, void(BytesIn, Writer::WriteCallbackFunc));

MOCK_METHOD2(deferReadCallback,
void(outcome::result<size_t>, Reader::ReadCallbackFunc));
Expand Down
Loading
Loading