Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/libp2p/protocol/gossip/gossip.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <libp2p/coro/channel.hpp>
#include <libp2p/coro/coro.hpp>
#include <libp2p/event/bus.hpp>
#include <libp2p/log/logger.hpp>
#include <libp2p/protocol/base_protocol.hpp>
#include <libp2p/protocol/gossip/config.hpp>
#include <libp2p/protocol/gossip/score.hpp>
Expand Down Expand Up @@ -348,6 +349,7 @@ namespace libp2p::protocol::gossip {
message_cache_;
GossipPromises<PeerPtr> gossip_promises_;
Score score_;
log::Logger logger_ = log::createLogger("Gossip");
size_t heartbeat_ticks_ = 0;
std::unordered_map<PeerPtr, size_t> count_received_ihave_;
std::unordered_map<PeerPtr, size_t> count_sent_iwant_;
Expand Down
192 changes: 170 additions & 22 deletions src/protocol/gossip/gossip.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -267,38 +267,84 @@ namespace libp2p::protocol::gossip {

// Protocol IDs to advertise for stream negotiation.
//
// Emits one trace entry with the number of protocols, followed by one trace
// entry per protocol, and then returns a copy of the configured list.
StreamProtocols Gossip::getProtocolIds() const {
  const auto &advertised = protocols_;
  SL_TRACE(logger_,
           "getProtocolIds called, returning {} protocols:",
           advertised.size());
  for (const auto &protocol_id : advertised) {
    SL_TRACE(logger_, " - {}", protocol_id);
  }
  return advertised;
}

// Handle inbound stream: set peer kind, track stream, and read RPCs in a
// loop.
//
// Steps:
//  1. Look up the remote peer; if it is unknown, create its entry and
//     register it with the scoring subsystem instead of rejecting the stream.
//  2. Update the peer kind from the negotiated protocol and remember the
//     stream in the peer's set of inbound streams.
//  3. Spawn exactly one coroutine that reads varint-framed messages from the
//     stream and passes each one to onMessage() until reading fails, this
//     object has been destroyed, or onMessage() returns false. The stream is
//     then removed from the peer's inbound set.
void Gossip::handle(std::shared_ptr<Stream> stream) {
  auto peer_id = stream->remotePeerId();
  SL_TRACE(logger_,
           "HANDLE called for peer={} protocol={}",
           peer_id.toBase58(),
           stream->protocol());

  auto peer_it = peers_.find(peer_id);
  if (peer_it == peers_.end()) {
    // Peer not found - this can happen if the inbound stream arrives before
    // the on_peer_connected event fires (race condition). Create the peer
    // entry now to avoid rejecting the stream.
    SL_TRACE(logger_,
             "HANDLE creating missing peer entry for {}",
             peer_id.toBase58());
    peer_it =
        peers_.emplace(peer_id, std::make_shared<Peer>(peer_id, false)).first;
    score_.connect(peer_id);
  } else {
    SL_TRACE(logger_, "HANDLE peer exists for {}", peer_id.toBase58());
  }

  auto peer = peer_it->second;
  updatePeerKind(peer, stream->protocol());
  peer->streams_in_.emplace(stream);

  SL_TRACE(
      logger_, "HANDLE starting read loop for peer={}", peer_id.toBase58());
  // Only one reader may exist per stream: a second concurrent reader would
  // interleave reads and break the varint message framing.
  // The logger is captured by value so the coroutine can still log after
  // `weak_self` has expired.
  coroSpawn(
      *io_context_,
      [WEAK_SELF, stream, peer, peer_id, logger = logger_]() -> Coro<void> {
        Bytes encoded;
        std::size_t msg_count = 0;
        while (true) {
          auto r = co_await readVarintMessage(stream, encoded);
          if (not r.has_value()) {
            SL_TRACE(
                logger,
                "HANDLE read failed for peer={} after {} messages, error={}",
                peer_id.toBase58(),
                msg_count,
                r.error().message());
            break;
          }
          ++msg_count;
          SL_TRACE(logger,
                   "HANDLE read message #{} from peer={} size={}",
                   msg_count,
                   peer_id.toBase58(),
                   encoded.size());
          // Re-acquire a strong reference for every message so that the
          // pending read does not keep this object alive.
          auto self = weak_self.lock();
          if (not self) {
            SL_TRACE(logger,
                     "HANDLE self expired for peer={}",
                     peer_id.toBase58());
            break;
          }
          if (not self->onMessage(peer, encoded)) {
            SL_TRACE(logger,
                     "HANDLE onMessage returned false for peer={}",
                     peer_id.toBase58());
            break;
          }
        }
        SL_TRACE(logger,
                 "HANDLE read loop ended for peer={} protocol={}",
                 peer_id.toBase58(),
                 stream->protocol());
        peer->streams_in_.erase(stream);
      });
}

// Start event listeners, timers, and open outbound streams on new
Expand All @@ -312,8 +358,15 @@ namespace libp2p::protocol::gossip {
WEAK_LOCK(self);
auto peer_id = connection->remotePeer();
auto out = connection->isInitiator();
SL_TRACE(self->logger_,
"ON_PEER_CONNECTED peer={} initiator={}",
peer_id.toBase58(),
(out ? "true" : "false"));
auto peer_it = self->peers_.find(peer_id);
if (peer_it == self->peers_.end()) {
SL_TRACE(self->logger_,
"ON_PEER_CONNECTED creating new peer entry for {}",
peer_id.toBase58());
peer_it =
self->peers_
.emplace(peer_id, std::make_shared<Peer>(peer_id, out))
Expand All @@ -323,23 +376,41 @@ namespace libp2p::protocol::gossip {
auto peer = peer_it->second;
// Avoid creating multiple streams concurrently
if (not peer->stream_out_.has_value() and not peer->is_connecting_) {
SL_TRACE(self->logger_,
"ON_PEER_CONNECTED creating outbound stream for {}",
peer_id.toBase58());
peer->is_connecting_ = true;
coroSpawn(
*self->io_context_, [self, connection, peer]() -> Coro<void> {
*self->io_context_,
[self, connection, peer, peer_id, logger = self->logger_]()
-> Coro<void> {
auto stream_result = (co_await self->host_->newStream(
connection, self->protocols_));
peer->is_connecting_ = false;
if (not stream_result.has_value()) {
// TODO: can't open out stream?
SL_TRACE(logger,
"ON_PEER_CONNECTED failed to open out stream for "
"{} error={}",
peer_id.toBase58(),
stream_result.error().message());
co_return;
}
if (auto stream = qtils::optionTake(peer->stream_out_)) {
(**stream).reset();
}
auto &stream = stream_result.value();
SL_TRACE(
logger,
"ON_PEER_CONNECTED opened out stream for {} protocol={}",
peer_id.toBase58(),
stream->protocol());
self->updatePeerKind(peer, stream->protocol());
peer->stream_out_ = stream;
if (not self->topics_.empty()) {
SL_TRACE(
logger,
"ON_PEER_CONNECTED sending initial SUBSCRIBE to {}",
peer_id.toBase58());
auto &message = self->getBatch(peer);
for (auto &[topic_hash, topic] : self->topics_) {
if (topic->publish_only_) {
Expand Down Expand Up @@ -503,11 +574,21 @@ namespace libp2p::protocol::gossip {

// Graylist gate: ignore peer below threshold.
if (score_.below(peer->peer_id_, config_.score.graylist_threshold)) {
SL_TRACE(logger_,
"GRAYLISTED peer={} score={} threshold={}",
peer->peer_id_.toBase58(),
score_.score(peer->peer_id_),
config_.score.graylist_threshold);
return true;
}

// Handle SUBSCRIBE/UNSUBSCRIBE and opportunistic GRAFT on subscribe.
for (auto &pb_subscribe : pb_message.subscriptions()) {
SL_TRACE(logger_,
"RX SUBSCRIBE/UNSUBSCRIBE topic={} sub={} from={}",
pb_subscribe.topic_id(),
pb_subscribe.subscribe(),
peer->peer_id_.toBase58());
auto topic_hash =
qtils::ByteVec(qtils::str2byte(pb_subscribe.topic_id()));
auto topic_it = topics_.find(topic_hash);
Expand Down Expand Up @@ -539,6 +620,11 @@ namespace libp2p::protocol::gossip {
// Handle PUBLISH: verify signature (if strict mode), dedupe, deliver
// locally, and relay.
for (auto &pb_publish : pb_message.publish()) {
SL_TRACE(logger_,
"RX PUBLISH topic={} size={} from={}",
pb_publish.topic(),
pb_publish.data().size(),
peer->peer_id_.toBase58());
auto message = std::make_shared<Message>();

switch (config_.validation_mode) {
Expand Down Expand Up @@ -613,6 +699,10 @@ namespace libp2p::protocol::gossip {

// Handle GRAFT: accept (add to mesh) or PRUNE with backoff.
for (auto &pb_graft : pb_message.control().graft()) {
SL_TRACE(logger_,
"RX GRAFT topic={} from={}",
pb_graft.topic_id(),
peer->peer_id_.toBase58());
if (not peer->isGossipsub()) {
return false;
}
Expand Down Expand Up @@ -647,20 +737,36 @@ namespace libp2p::protocol::gossip {
return false;
}
if (score_.below(peer->peer_id_, config_.score.zero)) {
SL_TRACE(logger_,
"GRAFT_REJECT_LOW_SCORE peer={} score={}",
peer->peer_id_.toBase58(),
score_.score(peer->peer_id_));
return false;
}
return true;
}();
if (accept) {
SL_TRACE(logger_,
"GRAFT_ACCEPTED peer={} topic={}",
peer->peer_id_.toBase58(),
pb_graft.topic_id());
topic->mesh_peers_.emplace(peer);
score_.graft(peer->peer_id_, topic_hash);
} else {
SL_TRACE(logger_,
"GRAFT_PRUNED peer={} topic={}",
peer->peer_id_.toBase58(),
pb_graft.topic_id());
make_prune(*topic, peer);
}
}

// Handle PRUNE: remove from mesh and update backoff (v1.1+ honors backoff).
for (auto &pb_prune : pb_message.control().prune()) {
SL_TRACE(logger_,
"RX PRUNE topic={} from={}",
pb_prune.topic_id(),
peer->peer_id_.toBase58());
if (not peer->isGossipsub()) {
return false;
}
Expand All @@ -673,6 +779,12 @@ namespace libp2p::protocol::gossip {
}

// Handle IHAVE: select a capped subset of unknown IDs and enqueue IWANTs.
if (pb_message.control().ihave().size() > 0) {
SL_TRACE(logger_,
"RX IHAVE count={} from={}",
pb_message.control().ihave().size(),
peer->peer_id_.toBase58());
}
if (not handle_ihave(peer, pb_message)) {
return false;
}
Expand Down Expand Up @@ -881,18 +993,27 @@ namespace libp2p::protocol::gossip {
return;
}
peer->writing_ = true;
coroSpawn(*io_context_, [peer]() -> Coro<void> {
auto peer_id = peer->peer_id_;
coroSpawn(*io_context_, [peer, peer_id, logger = logger_]() -> Coro<void> {
co_await coroYield();
assert(peer->writing_);
assert(peer->stream_out_.has_value());
while (auto message = qtils::optionTake(peer->batch_)) {
auto pb_messages = splitBatch(*message);
SL_TRACE(logger,
"CHECKWRITE sending {} PB messages to {}",
pb_messages.size(),
peer_id.toBase58());

for (auto &encoded : pb_messages) {
assert(not encoded.empty());
auto r =
co_await writeVarintMessage(peer->stream_out_.value(), encoded);
if (not r.has_value()) {
SL_TRACE(logger,
"CHECKWRITE send failed for peer={} error={}",
peer_id.toBase58(),
r.error().message());
peer->stream_out_.reset();
break;
}
Expand Down Expand Up @@ -957,6 +1078,33 @@ namespace libp2p::protocol::gossip {
// emit gossip, expire history/cache, and clear expired dont_send_ marks.
void Gossip::heartbeat() {
++heartbeat_ticks_;

// Log peer scores and mesh status during heartbeat
SL_TRACE(logger_, "=== HEARTBEAT #{} ===", heartbeat_ticks_);
SL_TRACE(
logger_, "Peer count={} Topic count={}", peers_.size(), topics_.size());
for (auto &[peer_id, peer] : peers_) {
auto peer_score = score_.score(peer_id);
SL_TRACE(logger_,
"PEER_SCORE peer={} score={} out={} topics={}",
peer_id.toBase58(),
peer_score,
peer->out_,
peer->topics_.size());
}
for (auto &[topic_hash, topic] : topics_) {
SL_TRACE(logger_,
"TOPIC topic={} mesh_size={} peers={} publish_only={}",
qtils::byte2str(topic_hash),
topic->mesh_peers_.size(),
topic->peers_.size(),
topic->publish_only_);
for (auto &peer : topic->mesh_peers_) {
SL_TRACE(logger_, " MESH_PEER peer={}", peer->peer_id_.toBase58());
}
}
SL_TRACE(logger_, "=== END HEARTBEAT ===");

for (auto &topic : topics_ | std::views::values) {
topic->backoff_.shift();
}
Expand Down
Loading