diff --git a/include/libp2p/protocol/gossip/gossip.hpp b/include/libp2p/protocol/gossip/gossip.hpp index 36e7e8f9..fa05d8e8 100644 --- a/include/libp2p/protocol/gossip/gossip.hpp +++ b/include/libp2p/protocol/gossip/gossip.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -348,6 +349,7 @@ namespace libp2p::protocol::gossip { message_cache_; GossipPromises gossip_promises_; Score score_; + log::Logger logger_ = log::createLogger("Gossip"); size_t heartbeat_ticks_ = 0; std::unordered_map count_received_ihave_; std::unordered_map count_sent_iwant_; diff --git a/src/protocol/gossip/gossip.cpp b/src/protocol/gossip/gossip.cpp index 78c6514f..eac58175 100644 --- a/src/protocol/gossip/gossip.cpp +++ b/src/protocol/gossip/gossip.cpp @@ -267,6 +267,12 @@ namespace libp2p::protocol::gossip { // Protocol IDs to advertise for stream negotiation. StreamProtocols Gossip::getProtocolIds() const { + SL_TRACE(logger_, + "getProtocolIds called, returning {} protocols:", + protocols_.size()); + for (auto &p : protocols_) { + SL_TRACE(logger_, " - {}", p); + } return protocols_; } @@ -274,31 +280,71 @@ namespace libp2p::protocol::gossip { // loop. void Gossip::handle(std::shared_ptr stream) { auto peer_id = stream->remotePeerId(); + SL_TRACE(logger_, + "HANDLE called for peer={} protocol={}", + peer_id.toBase58(), + stream->protocol()); auto peer_it = peers_.find(peer_id); if (peer_it == peers_.end()) { - stream->reset(); - return; + // Peer not found - this can happen if the inbound stream arrives before + // the on_peer_connected event fires (race condition). Create the peer + // entry now to avoid rejecting the stream. + SL_TRACE(logger_, + "HANDLE creating missing peer entry for {}", + peer_id.toBase58()); + peer_it = + peers_.emplace(peer_id, std::make_shared(peer_id, false)).first; + score_.connect(peer_id); + } else { + SL_TRACE(logger_, "HANDLE peer exists for {}", peer_id.toBase58()); } auto peer = peer_it->second; updatePeerKind(peer, stream->protocol()); peer->streams_in_.emplace(stream); - coroSpawn(*io_context_, [WEAK_SELF, stream, peer]() -> Coro { - Bytes encoded; - while (true) { - auto r = co_await readVarintMessage(stream, encoded); - if (not r.has_value()) { - break; - } - auto self = weak_self.lock(); - if (not self) { - break; - } - if (not self->onMessage(peer, encoded)) { - break; - } - } - peer->streams_in_.erase(stream); - }); + SL_TRACE( + logger_, "HANDLE starting read loop for peer={}", peer_id.toBase58()); + coroSpawn( + *io_context_, + [WEAK_SELF, stream, peer, peer_id, logger = logger_]() -> Coro { + Bytes encoded; + int msg_count = 0; + while (true) { + auto r = co_await readVarintMessage(stream, encoded); + if (not r.has_value()) { + SL_TRACE( + logger, + "HANDLE read failed for peer={} after {} messages, error={}", + peer_id.toBase58(), + msg_count, + r.error().message()); + break; + } + msg_count++; + SL_TRACE(logger, + "HANDLE read message #{} from peer={} size={}", + msg_count, + peer_id.toBase58(), + encoded.size()); + auto self = weak_self.lock(); + if (not self) { + SL_TRACE(logger, + "HANDLE self expired for peer={}", + peer_id.toBase58()); + break; + } + if (not self->onMessage(peer, encoded)) { + SL_TRACE(logger, + "HANDLE onMessage returned false for peer={}", + peer_id.toBase58()); + break; + } + } + SL_TRACE(logger, + "HANDLE read loop ended for peer={} protocol={}", + peer_id.toBase58(), + stream->protocol()); + peer->streams_in_.erase(stream); + }); } // Start event listeners, timers, and open outbound streams on new @@ -312,8 +358,15 @@ namespace libp2p::protocol::gossip { WEAK_LOCK(self); auto peer_id = connection->remotePeer(); auto out = connection->isInitiator(); + SL_TRACE(self->logger_, + "ON_PEER_CONNECTED peer={} initiator={}", + peer_id.toBase58(), + (out ? "true" : "false")); auto peer_it = self->peers_.find(peer_id); if (peer_it == self->peers_.end()) { + SL_TRACE(self->logger_, + "ON_PEER_CONNECTED creating new peer entry for {}", + peer_id.toBase58()); peer_it = self->peers_ .emplace(peer_id, std::make_shared(peer_id, out)) @@ -323,23 +376,41 @@ namespace libp2p::protocol::gossip { auto peer = peer_it->second; // Avoid creating multiple streams concurrently if (not peer->stream_out_.has_value() and not peer->is_connecting_) { + SL_TRACE(self->logger_, + "ON_PEER_CONNECTED creating outbound stream for {}", + peer_id.toBase58()); peer->is_connecting_ = true; coroSpawn( - *self->io_context_, [self, connection, peer]() -> Coro { + *self->io_context_, + [self, connection, peer, peer_id, logger = self->logger_]() + -> Coro { auto stream_result = (co_await self->host_->newStream( connection, self->protocols_)); peer->is_connecting_ = false; if (not stream_result.has_value()) { - // TODO: can't open out stream? + SL_TRACE(logger, + "ON_PEER_CONNECTED failed to open out stream for " + "{} error={}", + peer_id.toBase58(), + stream_result.error().message()); co_return; } if (auto stream = qtils::optionTake(peer->stream_out_)) { (**stream).reset(); } auto &stream = stream_result.value(); + SL_TRACE( + logger, + "ON_PEER_CONNECTED opened out stream for {} protocol={}", + peer_id.toBase58(), + stream->protocol()); self->updatePeerKind(peer, stream->protocol()); peer->stream_out_ = stream; if (not self->topics_.empty()) { + SL_TRACE( + logger, + "ON_PEER_CONNECTED sending initial SUBSCRIBE to {}", + peer_id.toBase58()); auto &message = self->getBatch(peer); for (auto &[topic_hash, topic] : self->topics_) { if (topic->publish_only_) { @@ -503,11 +574,21 @@ namespace libp2p::protocol::gossip { // Graylist gate: ignore peer below threshold. if (score_.below(peer->peer_id_, config_.score.graylist_threshold)) { + SL_TRACE(logger_, + "GRAYLISTED peer={} score={} threshold={}", + peer->peer_id_.toBase58(), + score_.score(peer->peer_id_), + config_.score.graylist_threshold); return true; } // Handle SUBSCRIBE/UNSUBSCRIBE and opportunistic GRAFT on subscribe. for (auto &pb_subscribe : pb_message.subscriptions()) { + SL_TRACE(logger_, + "RX SUBSCRIBE/UNSUBSCRIBE topic={} sub={} from={}", + pb_subscribe.topic_id(), + pb_subscribe.subscribe(), + peer->peer_id_.toBase58()); auto topic_hash = qtils::ByteVec(qtils::str2byte(pb_subscribe.topic_id())); auto topic_it = topics_.find(topic_hash); @@ -539,6 +620,11 @@ namespace libp2p::protocol::gossip { // Handle PUBLISH: verify signature (if strict mode), dedupe, deliver // locally, and relay. for (auto &pb_publish : pb_message.publish()) { + SL_TRACE(logger_, + "RX PUBLISH topic={} size={} from={}", + pb_publish.topic(), + pb_publish.data().size(), + peer->peer_id_.toBase58()); auto message = std::make_shared(); switch (config_.validation_mode) { @@ -613,6 +699,10 @@ namespace libp2p::protocol::gossip { // Handle GRAFT: accept (add to mesh) or PRUNE with backoff. for (auto &pb_graft : pb_message.control().graft()) { + SL_TRACE(logger_, + "RX GRAFT topic={} from={}", + pb_graft.topic_id(), + peer->peer_id_.toBase58()); if (not peer->isGossipsub()) { return false; } @@ -647,20 +737,36 @@ namespace libp2p::protocol::gossip { return false; } if (score_.below(peer->peer_id_, config_.score.zero)) { + SL_TRACE(logger_, + "GRAFT_REJECT_LOW_SCORE peer={} score={}", + peer->peer_id_.toBase58(), + score_.score(peer->peer_id_)); return false; } return true; }(); if (accept) { + SL_TRACE(logger_, + "GRAFT_ACCEPTED peer={} topic={}", + peer->peer_id_.toBase58(), + pb_graft.topic_id()); topic->mesh_peers_.emplace(peer); score_.graft(peer->peer_id_, topic_hash); } else { + SL_TRACE(logger_, + "GRAFT_PRUNED peer={} topic={}", + peer->peer_id_.toBase58(), + pb_graft.topic_id()); make_prune(*topic, peer); } } // Handle PRUNE: remove from mesh and update backoff (v1.1+ honors backoff). for (auto &pb_prune : pb_message.control().prune()) { + SL_TRACE(logger_, + "RX PRUNE topic={} from={}", + pb_prune.topic_id(), + peer->peer_id_.toBase58()); if (not peer->isGossipsub()) { return false; } @@ -673,6 +779,12 @@ namespace libp2p::protocol::gossip { } // Handle IHAVE: select a capped subset of unknown IDs and enqueue IWANTs. + if (pb_message.control().ihave().size() > 0) { + SL_TRACE(logger_, + "RX IHAVE count={} from={}", + pb_message.control().ihave().size(), + peer->peer_id_.toBase58()); + } if (not handle_ihave(peer, pb_message)) { return false; } @@ -881,18 +993,27 @@ namespace libp2p::protocol::gossip { return; } peer->writing_ = true; - coroSpawn(*io_context_, [peer]() -> Coro { + auto peer_id = peer->peer_id_; + coroSpawn(*io_context_, [peer, peer_id, logger = logger_]() -> Coro { co_await coroYield(); assert(peer->writing_); assert(peer->stream_out_.has_value()); while (auto message = qtils::optionTake(peer->batch_)) { auto pb_messages = splitBatch(*message); + SL_TRACE(logger, + "CHECKWRITE sending {} PB messages to {}", + pb_messages.size(), + peer_id.toBase58()); for (auto &encoded : pb_messages) { assert(not encoded.empty()); auto r = co_await writeVarintMessage(peer->stream_out_.value(), encoded); if (not r.has_value()) { + SL_TRACE(logger, + "CHECKWRITE send failed for peer={} error={}", + peer_id.toBase58(), + r.error().message()); peer->stream_out_.reset(); break; } @@ -957,6 +1078,33 @@ namespace libp2p::protocol::gossip { // emit gossip, expire history/cache, and clear expired dont_send_ marks. void Gossip::heartbeat() { ++heartbeat_ticks_; + + // Log peer scores and mesh status during heartbeat + SL_TRACE(logger_, "=== HEARTBEAT #{} ===", heartbeat_ticks_); + SL_TRACE( + logger_, "Peer count={} Topic count={}", peers_.size(), topics_.size()); + for (auto &[peer_id, peer] : peers_) { + auto peer_score = score_.score(peer_id); + SL_TRACE(logger_, + "PEER_SCORE peer={} score={} out={} topics={}", + peer_id.toBase58(), + peer_score, + peer->out_, + peer->topics_.size()); + } + for (auto &[topic_hash, topic] : topics_) { + SL_TRACE(logger_, + "TOPIC topic={} mesh_size={} peers={} publish_only={}", + qtils::byte2str(topic_hash), + topic->mesh_peers_.size(), + topic->peers_.size(), + topic->publish_only_); + for (auto &peer : topic->mesh_peers_) { + SL_TRACE(logger_, " MESH_PEER peer={}", peer->peer_id_.toBase58()); + } + } + SL_TRACE(logger_, "=== END HEARTBEAT ==="); + for (auto &topic : topics_ | std::views::values) { topic->backoff_.shift(); }