Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions example/02-kademlia/rendezvous_chat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@

#include <boost/beast.hpp>

#include <libp2p/basic/write_return_size.hpp>
#include <libp2p/basic/write.hpp>
#include <libp2p/common/literals.hpp>
#include <libp2p/injector/kademlia_injector.hpp>
#include <libp2p/log/configurator.hpp>
#include <libp2p/log/sublogger.hpp>
#include <libp2p/multi/content_identifier_codec.hpp>
#include <qtils/bytestr.hpp>

using libp2p::common::operator""_unhex;

Expand Down Expand Up @@ -69,19 +70,18 @@ class Session : public std::enable_shared_from_this<Session> {
return false;
}

libp2p::writeReturnSize(
libp2p::write(
stream_,
*buffer,
[self = shared_from_this(), buffer](outcome::result<size_t> result) {
[self = shared_from_this(), buffer](outcome::result<void> result) {
if (not result) {
self->close();
std::cout << self->stream_->remotePeerId().value().toBase58()
<< " - closed at writting" << std::endl;
return;
}
std::cout << self->stream_->remotePeerId().value().toBase58() << " < "
<< std::string(buffer->begin(),
buffer->begin() + result.value());
<< qtils::byte2str(*buffer);
std::cout.flush();
});
return true;
Expand Down
3 changes: 2 additions & 1 deletion include/libp2p/basic/message_read_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <memory>

#include <libp2p/basic/cb.hpp>
#include <libp2p/basic/readwriter.hpp>

namespace libp2p::basic {
Expand Down Expand Up @@ -36,6 +37,6 @@ namespace libp2p::basic {
* @param cb is called when the message is written or an error happened.
* Quantity of bytes written is passed as an argument in case of success
*/
virtual void write(BytesIn buffer, Writer::WriteCallbackFunc cb) = 0;
virtual void write(BytesIn buffer, CbOutcomeVoid cb) = 0;
};
} // namespace libp2p::basic
2 changes: 1 addition & 1 deletion include/libp2p/basic/message_read_writer_bigendian.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace libp2p::basic {
* @param buffer - the message to be written
* @param cb, which is called, when the message is read or error happens
*/
void write(BytesIn buffer, Writer::WriteCallbackFunc cb) override;
void write(BytesIn buffer, CbOutcomeVoid cb) override;

private:
std::shared_ptr<ReadWriter> conn_;
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/basic/message_read_writer_uvarint.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ namespace libp2p::basic {
* @param buffer - the message to be written
* @param cb, which is called, when the message is read or error happens
*/
void write(BytesIn buffer, Writer::WriteCallbackFunc cb) override;
void write(BytesIn buffer, CbOutcomeVoid cb) override;

private:
std::shared_ptr<ReadWriter> conn_;
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/basic/protobuf_message_read_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace libp2p::basic {
typename = std::enable_if_t<
std::is_default_constructible<ProtoMsgType>::value>>
void write(const ProtoMsgType &msg,
Writer::WriteCallbackFunc cb,
CbOutcomeVoid cb,
const std::shared_ptr<std::vector<uint8_t>> &bytes = nullptr) {
auto msg_bytes =
std::make_shared<std::vector<uint8_t>>(msg.ByteSize(), 0);
Expand Down
26 changes: 0 additions & 26 deletions include/libp2p/basic/read_return_size.hpp

This file was deleted.

25 changes: 0 additions & 25 deletions include/libp2p/basic/write_return_size.hpp

This file was deleted.

6 changes: 2 additions & 4 deletions include/libp2p/connection/as_asio_read_write.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ namespace libp2p {
boost::asio::detail::buffer_sequence_adapter<
boost::asio::mutable_buffer,
MutableBufferSequence>::first(buffers)};
impl->readSome(
asioBuffer(buffer), wrapCb(std::forward<Cb>(cb)));
impl->readSome(asioBuffer(buffer), wrapCb(std::forward<Cb>(cb)));
}

template <typename ConstBufferSequence, typename Cb>
Expand All @@ -77,8 +76,7 @@ namespace libp2p {
boost::asio::detail::buffer_sequence_adapter<
boost::asio::const_buffer,
ConstBufferSequence>::first(buffers)};
impl->writeSome(
asioBuffer(buffer), wrapCb(std::forward<Cb>(cb)));
impl->writeSome(asioBuffer(buffer), wrapCb(std::forward<Cb>(cb)));
}

std::shared_ptr<boost::asio::io_context> io;
Expand Down
5 changes: 3 additions & 2 deletions include/libp2p/muxer/mplex/mplexed_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <unordered_map>
#include <utility>

#include <libp2p/basic/cb.hpp>
#include <libp2p/connection/capable_connection.hpp>
#include <libp2p/log/logger.hpp>
#include <libp2p/muxer/mplex/mplex_stream.hpp>
Expand Down Expand Up @@ -74,7 +75,7 @@ namespace libp2p::connection {
private:
struct WriteData {
Bytes data;
WriteCallbackFunc cb;
CbOutcomeVoid cb;
};
std::queue<WriteData> write_queue_;
bool is_writing_ = false;
Expand All @@ -92,7 +93,7 @@ namespace libp2p::connection {
/**
* Called, when write is complete
*/
void onWriteCompleted(outcome::result<size_t> write_res);
void onWriteCompleted(outcome::result<void> write_res);

/**
* Read next frame from the connection
Expand Down
6 changes: 3 additions & 3 deletions include/libp2p/network/impl/listener_manager_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ namespace libp2p::network {
private:
bool started = false;

// clang-format off
std::unordered_map<multi::Multiaddress, std::shared_ptr<transport::TransportListener>> listeners_;
// clang-format on
std::unordered_map<multi::Multiaddress,
std::shared_ptr<transport::TransportListener>>
listeners_;

std::shared_ptr<protocol_muxer::ProtocolMuxer> multiselect_;
std::shared_ptr<network::Router> router_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ namespace libp2p::protocol {
* @param written_bytes - how much bytes were written
* @param stream with the other side
*/
void identifySent(outcome::result<size_t> written_bytes,
void identifySent(outcome::result<void> written_bytes,
const StreamSPtr &stream);

/**
Expand Down
4 changes: 2 additions & 2 deletions include/libp2p/protocol/ping/ping_client_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ namespace libp2p::protocol {
private:
void write();

void writeCompleted(outcome::result<size_t> r);
void writeCompleted(outcome::result<void> r);

void read();

void readCompleted(outcome::result<size_t> r);
void readCompleted(outcome::result<void> r);

void close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ namespace libp2p::protocol_muxer::multiselect {
void send(Packet packet);

/// Called when write operation completes
void onDataWritten(outcome::result<size_t> res);
void onDataWritten(outcome::result<void> res);

/// Closes the negotiation session with result, returns instance to owner
void close(outcome::result<std::string> result);
Expand All @@ -69,7 +69,7 @@ namespace libp2p::protocol_muxer::multiselect {
void receive();

/// Called on read operations completion
void onDataRead(outcome::result<size_t> res);
void onDataRead(outcome::result<void> res);

/// Processes parsed messages, called from onDataRead
MaybeResult processMessages();
Expand Down
3 changes: 1 addition & 2 deletions include/libp2p/security/noise/handshake.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ namespace libp2p::security::noise {
outcome::result<std::vector<uint8_t>> generateHandshakePayload(
const DHKey &keypair);

void sendHandshakeMessage(BytesIn payload,
basic::Writer::WriteCallbackFunc cb);
void sendHandshakeMessage(BytesIn payload, CbOutcomeVoid cb);

void readHandshakeMessage(basic::MessageReadWriter::ReadCallbackFunc cb);

Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/security/noise/insecure_rw.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ namespace libp2p::security::noise {
void read(ReadCallbackFunc cb) override;

/// write the given bytes to the network
void write(BytesIn buffer, basic::Writer::WriteCallbackFunc cb) override;
void write(BytesIn buffer, CbOutcomeVoid cb) override;

private:
std::shared_ptr<connection::LayerConnection> connection_;
Expand Down
60 changes: 28 additions & 32 deletions src/basic/message_read_writer_bigendian.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
#include <arpa/inet.h>
#include <boost/assert.hpp>
#include <libp2p/basic/message_read_writer_error.hpp>
#include <libp2p/basic/read_return_size.hpp>
#include <libp2p/basic/write_return_size.hpp>
#include <libp2p/basic/read.hpp>
#include <libp2p/basic/write.hpp>
#include <libp2p/common/byteutil.hpp>
#include <libp2p/common/outcome_macro.hpp>

namespace libp2p::basic {
MessageReadWriterBigEndian::MessageReadWriterBigEndian(
Expand All @@ -25,29 +26,26 @@ namespace libp2p::basic {
void MessageReadWriterBigEndian::read(ReadCallbackFunc cb) {
auto buffer = std::make_shared<std::vector<uint8_t>>();
buffer->resize(kLenMarkerSize);
readReturnSize(
conn_,
*buffer,
[self{shared_from_this()}, buffer, cb{std::move(cb)}](auto &&result) {
if (not result) {
return cb(result.error());
}
uint32_t msg_len = ntohl( // NOLINT
common::convert<uint32_t>(buffer->data()));
buffer->resize(msg_len);
std::fill(buffer->begin(), buffer->end(), 0u);
readReturnSize(
self->conn_, *buffer, [self, buffer, cb](auto &&result) {
if (not result) {
return cb(result.error());
}
cb(buffer);
});
});
libp2p::read(conn_,
*buffer,
[self{shared_from_this()}, buffer, cb{std::move(cb)}](
outcome::result<void> result) {
IF_ERROR_CB_RETURN(result);
uint32_t msg_len = ntohl( // NOLINT
common::convert<uint32_t>(buffer->data()));
buffer->resize(msg_len);
std::fill(buffer->begin(), buffer->end(), 0u);
libp2p::read(
self->conn_,
*buffer,
[self, buffer, cb](outcome::result<void> result) {
IF_ERROR_CB_RETURN(result);
cb(buffer);
});
});
}

void MessageReadWriterBigEndian::write(BytesIn buffer,
Writer::WriteCallbackFunc cb) {
void MessageReadWriterBigEndian::write(BytesIn buffer, CbOutcomeVoid cb) {
if (buffer.empty()) {
// TODO(107): Reentrancy
return cb(MessageReadWriterError::BUFFER_IS_EMPTY);
Expand All @@ -57,14 +55,12 @@ namespace libp2p::basic {
raw_buf.reserve(kLenMarkerSize + buffer.size());
common::putUint32BE(raw_buf, buffer.size());
raw_buf.insert(raw_buf.end(), buffer.begin(), buffer.end());
writeReturnSize(
conn_,
raw_buf,
[self{shared_from_this()}, cb{std::move(cb)}](auto &&result) {
if (not result) {
return cb(result.error());
}
cb(result.value() - self->kLenMarkerSize);
});
libp2p::write(conn_,
raw_buf,
[self{shared_from_this()},
cb{std::move(cb)}](outcome::result<void> result) {
IF_ERROR_CB_RETURN(result);
cb(outcome::success());
});
}
} // namespace libp2p::basic
Loading
Loading