Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions include/libp2p/protocol_muxer/multiselect.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

#include "protocol_muxer.hpp"

namespace libp2p::basic {
class Scheduler;
}

namespace libp2p::protocol_muxer::multiselect {

class MultiselectInstance;
Expand All @@ -20,6 +24,8 @@ namespace libp2p::protocol_muxer::multiselect {
public:
using Instance = std::shared_ptr<MultiselectInstance>;

explicit Multiselect(std::shared_ptr<basic::Scheduler> scheduler);

~Multiselect() override = default;

/// Implements ProtocolMuxer API
Expand All @@ -45,6 +51,9 @@ namespace libp2p::protocol_muxer::multiselect {
/// Returns instance either from cache or creates a new one
Instance getInstance();

/// Scheduler for timeout management
std::shared_ptr<basic::Scheduler> scheduler_;

/// Active instances, keep them here to hold shared ptrs alive
std::unordered_set<Instance> active_instances_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@

#pragma once

#include <libp2p/basic/scheduler.hpp>
#include <libp2p/protocol_muxer/multiselect.hpp>
#include "parser.hpp"

namespace soralog {
class Logger;
}

namespace libp2p::basic {
class Scheduler;
}

namespace libp2p::protocol_muxer::multiselect {

class Multiselect;
Expand All @@ -21,7 +26,8 @@ namespace libp2p::protocol_muxer::multiselect {
class MultiselectInstance
: public std::enable_shared_from_this<MultiselectInstance> {
public:
explicit MultiselectInstance(Multiselect &owner);
explicit MultiselectInstance(Multiselect &owner,
std::shared_ptr<basic::Scheduler> scheduler);

/// Implements ProtocolMuxer API
void selectOneOf(std::span<const peer::ProtocolName> protocols,
Expand Down Expand Up @@ -74,6 +80,9 @@ namespace libp2p::protocol_muxer::multiselect {
/// Handles "na" reply, client-specific
MaybeResult handleNA();

/// Handles timeout when negotiation takes too long
void onTimeout();

/// Owner of this object, needed for reuse of instances
Multiselect &owner_;

Expand Down Expand Up @@ -125,6 +134,12 @@ namespace libp2p::protocol_muxer::multiselect {

/// Cache: serialized NA response
boost::optional<Packet> na_response_;

/// Scheduler for timeout handling
std::shared_ptr<basic::Scheduler> scheduler_;

/// Timeout handle for negotiation timeout
basic::Scheduler::Handle timeout_handle_;
};

} // namespace libp2p::protocol_muxer::multiselect
6 changes: 5 additions & 1 deletion src/protocol_muxer/multiselect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

#include <libp2p/basic/scheduler.hpp>
#include <libp2p/log/logger.hpp>
#include <libp2p/protocol_muxer/multiselect/multiselect_instance.hpp>
#include <libp2p/protocol_muxer/multiselect/simple_stream_negotiate.hpp>
Expand All @@ -21,6 +22,9 @@ namespace libp2p::protocol_muxer::multiselect {
constexpr size_t kMaxCacheSize = 8;
} // namespace

Multiselect::Multiselect(std::shared_ptr<basic::Scheduler> scheduler)
: scheduler_(std::move(scheduler)) {}

void Multiselect::selectOneOf(std::span<const peer::ProtocolName> protocols,
std::shared_ptr<basic::ReadWriter> connection,
bool is_initiator,
Expand Down Expand Up @@ -62,7 +66,7 @@ namespace libp2p::protocol_muxer::multiselect {
Multiselect::Instance Multiselect::getInstance() {
Instance instance;
if (cache_.empty()) {
instance = std::make_shared<MultiselectInstance>(*this);
instance = std::make_shared<MultiselectInstance>(*this, scheduler_);
} else {
SL_TRACE(log(),
"cache: {}->{}, active {}->{}",
Expand Down
30 changes: 28 additions & 2 deletions src/protocol_muxer/multiselect/multiselect_instance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
#include <libp2p/protocol_muxer/multiselect/multiselect_instance.hpp>

#include <cctype>
#include <span>

#include <libp2p/basic/scheduler.hpp>
#include <libp2p/basic/write_return_size.hpp>
#include <libp2p/common/trace.hpp>
#include <libp2p/protocol_muxer/multiselect/serializing.hpp>
Expand All @@ -20,10 +22,14 @@ namespace libp2p::protocol_muxer::multiselect {
static log::Logger logger = log::createLogger("Multiselect");
return logger;
}

/// Timeout for protocol negotiation (5 seconds)
constexpr std::chrono::milliseconds kNegotiationTimeout{5000};
} // namespace

MultiselectInstance::MultiselectInstance(Multiselect &owner)
: owner_(owner) {}
MultiselectInstance::MultiselectInstance(
Multiselect &owner, std::shared_ptr<basic::Scheduler> scheduler)
: owner_(owner), scheduler_(std::move(scheduler)) {}

void MultiselectInstance::selectOneOf(
std::span<const peer::ProtocolName> protocols,
Expand Down Expand Up @@ -62,6 +68,15 @@ namespace libp2p::protocol_muxer::multiselect {
write_queue_.clear();
is_writing_ = false;

// Schedule timeout for negotiation
timeout_handle_ = scheduler_->scheduleWithHandle(
[wptr = std::weak_ptr<MultiselectInstance>(shared_from_this())]() {
if (auto self = wptr.lock()) {
self->onTimeout();
}
},
kNegotiationTimeout);

if (is_initiator_) {
std::ignore = sendProposal();
} else if (negotiate_multiselect) {
Expand Down Expand Up @@ -166,6 +181,10 @@ namespace libp2p::protocol_muxer::multiselect {

void MultiselectInstance::close(outcome::result<std::string> result) {
closed_ = true;

// Cancel timeout if it's still active
timeout_handle_.reset();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we reset() connection/stream, to cancel pending read/write?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it will be automatically reset, once we erase this active instance:

void Multiselect::instanceClosed(Instance instance,
const ProtocolHandlerFunc &cb,
outcome::result<peer::ProtocolName> result) {
active_instances_.erase(instance);
if (cache_.size() < kMaxCacheSize) {
cache_.emplace_back(std::move(instance));
}
cb(std::move(result));
}

It might live a little longer in the cache, but eventually a new connection will replace the one from the instance


++current_round_;
write_queue_.clear();
Multiselect::ProtocolHandlerFunc callback;
Expand Down Expand Up @@ -338,4 +357,11 @@ namespace libp2p::protocol_muxer::multiselect {
return MaybeResult(ProtocolMuxer::Error::PROTOCOL_VIOLATION);
}

void MultiselectInstance::onTimeout() {
SL_DEBUG(log(),
"Protocol negotiation timeout after {}ms",
kNegotiationTimeout.count());
close(ProtocolMuxer::Error::NEGOTIATION_FAILED);
}

} // namespace libp2p::protocol_muxer::multiselect
2 changes: 1 addition & 1 deletion test/acceptance/p2p/host/peer/test_peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ Peer::sptr<host::BasicHost> Peer::makeHost(const crypto::KeyPair &keyPair) {
std::make_shared<peer::IdentityManagerImpl>(keyPair, key_marshaller);

auto multiselect =
std::make_shared<protocol_muxer::multiselect::Multiselect>();
std::make_shared<protocol_muxer::multiselect::Multiselect>(scheduler_);

auto router = std::make_shared<network::RouterImpl>();

Expand Down
Loading