Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions include/libp2p/basic/cb.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <qtils/outcome.hpp>

namespace libp2p {
using CbOutcomeVoid = std::function<void(outcome::result<void>)>;
} // namespace libp2p
19 changes: 19 additions & 0 deletions include/libp2p/common/outcome_macro.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/**
* Copyright Quadrivium LLC
* All Rights Reserved
* SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <qtils/macro/common.hpp>

#define _IF_ERROR_CB_RETURN(tmp, r) \
Comment thread
turuslan marked this conversation as resolved.
({ \
auto &&_r = r; \
if (not _r.has_value()) { \
return cb(_r.error()); \
} \
_r.value(); \
})
#define IF_ERROR_CB_RETURN(r) _IF_ERROR_CB_RETURN(QTILS_UNIQUE_NAME(tmp), r)
4 changes: 1 addition & 3 deletions include/libp2p/muxer/mplex/mplex_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ namespace libp2p::connection {
private:
struct Reading {
BytesOut out;
size_t bytes;
ReadCallbackFunc cb;
};

Expand All @@ -101,8 +100,7 @@ namespace libp2p::connection {
boost::optional<Reading> reading_;

/// Queue of write requests that were received when stream was writing
std::deque<std::tuple<std::vector<uint8_t>, size_t, WriteCallbackFunc>>
write_queue_{};
std::deque<std::tuple<Bytes, WriteCallbackFunc>> write_queue_{};

mutable std::mutex write_queue_mutex_;

Expand Down
26 changes: 9 additions & 17 deletions include/libp2p/muxer/yamux/yamux_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,22 @@ namespace libp2p::connection {
void closedByConnection(std::error_code ec);

private:
struct Reading {
BytesOut out;
ReadCallbackFunc cb;
};

/// Performs close-related cleanup and notifications
void doClose(std::error_code ec, bool notify_read_side);
void doClose(std::error_code ec);

/// Called by read*() functions
void doRead(BytesOut out, size_t bytes, ReadCallbackFunc cb);

/// Completes the read operation if any, clears read state
[[nodiscard]] std::pair<ReadCallbackFunc, outcome::result<size_t>>
readCompleted();
void doRead(BytesOut out, ReadCallbackFunc cb);

/// Dequeues data from write queue and sends to the wire in async manner
void doWrite();

/// Called by write*() functions
void doWrite(BytesIn in, size_t bytes, WriteCallbackFunc cb);
void doWrite(BytesIn in, WriteCallbackFunc cb);

/// Clears close callback state
[[nodiscard]] std::pair<VoidResultHandlerFunc, outcome::result<void>>
Expand Down Expand Up @@ -169,16 +170,7 @@ namespace libp2p::connection {
basic::ReadBuffer internal_read_buffer_;

/// True if read operation is active
bool is_reading_ = false;

/// Read callback, it is non-zero during async data receive
ReadCallbackFunc read_cb_;

/// TODO: get rid of this. client's read buffer
BytesOut external_read_buffer_;

/// Size of message being read
size_t read_message_size_ = 0;
std::optional<Reading> reading_;

/// adjustWindowSize() callback, triggers when receive window size
/// becomes greater or equal then desired
Expand Down
2 changes: 1 addition & 1 deletion include/libp2p/muxer/yamux/yamuxed_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ namespace libp2p::connection {
void doWrite(WriteQueueItem packet);

/// Write callback
void onDataWritten(outcome::result<size_t> res, StreamId stream_id);
void onDataWritten(outcome::result<void> res, WriteQueueItem &&packet);

/// Creates new yamux stream
std::shared_ptr<Stream> createStream(StreamId stream_id);
Expand Down
4 changes: 2 additions & 2 deletions include/libp2p/protocol/echo/server_echo_session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ namespace libp2p::protocol {

void onRead(outcome::result<size_t> rread);

void doWrite(size_t size);
void doWrite();

void onWrite(outcome::result<size_t> rwrite);
void onWrite(outcome::result<void> rwrite);
};

} // namespace libp2p::protocol
21 changes: 0 additions & 21 deletions include/libp2p/security/noise/noise_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ namespace libp2p::connection {
class NoiseConnection : public SecureConnection,
public std::enable_shared_from_this<NoiseConnection> {
public:
using BufferList = std::list<Bytes>;

struct OperationContext {
size_t bytes_served; /// written or read bytes count
const size_t total_bytes; /// total size to process
BufferList::iterator write_buffer; /// temporary data storage
};

~NoiseConnection() override = default;

NoiseConnection(
Expand Down Expand Up @@ -70,18 +62,6 @@ namespace libp2p::connection {
outcome::result<crypto::PublicKey> remotePublicKey() const override;

private:
void readSome(BytesOut out,
size_t bytes,
OperationContext ctx,
ReadCallbackFunc cb);

void write(BytesIn in,
size_t bytes,
OperationContext ctx,
WriteCallbackFunc cb);

void eraseWriteBuffer(BufferList::iterator &iterator);

std::shared_ptr<LayerConnection> connection_;
crypto::PublicKey local_;
crypto::PublicKey remote_;
Expand All @@ -90,7 +70,6 @@ namespace libp2p::connection {
std::shared_ptr<security::noise::CipherState> decoder_cs_;
std::shared_ptr<Bytes> frame_buffer_;
std::shared_ptr<security::noise::InsecureReadWriter> framer_;
BufferList write_buffers_;
log::Logger log_ = log::createLogger("NoiseConnection");

public:
Expand Down
4 changes: 3 additions & 1 deletion include/libp2p/security/secio/secio_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <mutex>
#include <queue>

#include <libp2p/basic/cb.hpp>
#include <libp2p/common/metrics/instance_count.hpp>
#include <libp2p/connection/secure_connection.hpp>
#include <libp2p/crypto/common.hpp>
Expand Down Expand Up @@ -115,7 +116,7 @@ namespace libp2p::connection {
/**
* Retrieves the next available SECIO message from the network.
*/
void readNextMessage(ReadCallbackFunc cb);
void readNextMessage(CbOutcomeVoid cb);

/**
* Moves decrypted bytes from internal buffer to the output buffer.
Expand Down Expand Up @@ -163,6 +164,7 @@ namespace libp2p::connection {
std::queue<uint8_t> user_data_buffer_;

std::shared_ptr<Bytes> read_buffer_;
std::shared_ptr<Bytes> write_buffer_;

log::Logger log_ = log::createLogger("SecIoConnection");

Expand Down
13 changes: 7 additions & 6 deletions src/muxer/mplex/mplex_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace libp2p::connection {
}

bool MplexStream::readTry() {
auto size{std::min(read_buffer_.size(), reading_->bytes)};
auto size{std::min(read_buffer_.size(), reading_->out.size())};
if (size == 0) {
return false;
}
Expand All @@ -65,20 +65,21 @@ namespace libp2p::connection {
}

void MplexStream::readSome(BytesOut out, size_t bytes, ReadCallbackFunc cb) {
ambigousSize(out, bytes);
if (is_reset_) {
return cb(Error::STREAM_RESET_BY_PEER);
}
if (!is_readable_) {
return cb(Error::STREAM_NOT_READABLE);
}
if (bytes == 0 || out.empty() || static_cast<size_t>(out.size()) < bytes) {
if (out.empty()) {
return cb(Error::STREAM_INVALID_ARGUMENT);
}
if (reading_.has_value()) {
return cb(Error::STREAM_IS_READING);
}

reading_.emplace(Reading{out, bytes, std::move(cb)});
reading_.emplace(Reading{out, std::move(cb)});
if (readTry()) {
return;
}
Expand All @@ -98,13 +99,13 @@ namespace libp2p::connection {
if (!is_writable_) {
return cb(Error::STREAM_NOT_WRITABLE);
}
if (bytes == 0 || in.empty() || static_cast<size_t>(in.size()) < bytes) {
if (in.empty()) {
return cb(Error::STREAM_INVALID_ARGUMENT);
}
if (is_writing_) {
std::vector<uint8_t> in_vector(in.begin(), in.end());
std::lock_guard<std::mutex> lock(write_queue_mutex_);
write_queue_.emplace_back(in_vector, bytes, cb);
write_queue_.emplace_back(in_vector, cb);
return;
}
if (connection_.expired()) {
Expand All @@ -129,7 +130,7 @@ namespace libp2p::connection {
// check if new write messages were received while stream was writing
// and propagate these messages
if (not self->write_queue_.empty()) {
auto [in, bytes, cb] = self->write_queue_.front();
auto [in, cb] = self->write_queue_.front();
self->write_queue_.pop_front();
writeReturnSize(self, in, cb);
}
Expand Down
Loading
Loading