diff --git a/dht.go b/dht.go index 72aa660..37aac50 100644 --- a/dht.go +++ b/dht.go @@ -51,6 +51,12 @@ type DHT struct { pendingRequests map[string]chan *DHTMessage pendingMu sync.RWMutex + // Inbound connection tracking (for outbound-only detection) + startTime time.Time + hasReceivedInbound bool + inboundMu sync.RWMutex + lastInboundWarning time.Time + // Shutdown coordination shutdown chan struct{} wg sync.WaitGroup @@ -64,6 +70,7 @@ func NewDHT(localSystem *System, storage *Storage, listenAddr string) *DHT { listenAddr: listenAddr, pendingRequests: make(map[string]chan *DHTMessage), shutdown: make(chan struct{}), + startTime: time.Now(), httpClient: &http.Client{ Timeout: RequestTimeout, }, @@ -106,6 +113,43 @@ func (dht *DHT) Stop() { log.Printf("DHT stopped") } +// markInboundReceived records that we've received an inbound connection +func (dht *DHT) markInboundReceived() { + dht.inboundMu.Lock() + dht.hasReceivedInbound = true + dht.inboundMu.Unlock() +} + +// checkInboundStatus logs a warning if no inbound connections after startup period +func (dht *DHT) checkInboundStatus() { + dht.inboundMu.RLock() + hasInbound := dht.hasReceivedInbound + lastWarning := dht.lastInboundWarning + dht.inboundMu.RUnlock() + + if hasInbound { + return + } + + // Only warn after 10 minutes of uptime + if time.Since(dht.startTime) < 10*time.Minute { + return + } + + // Repeat warning every 6 hours + if !lastWarning.IsZero() && time.Since(lastWarning) < 6*time.Hour { + return + } + + dht.inboundMu.Lock() + dht.lastInboundWarning = time.Now() + dht.inboundMu.Unlock() + + log.Printf("WARNING: No inbound connections received after 10 minutes.") + log.Printf(" Your node may be in outbound-only mode (can see network but others can't reach you).") + log.Printf(" Check that port %s is open and forwarded correctly, as UPnP may have failed.", dht.listenAddr) +} + // updateRoutingTable adds a node to the peer cache // Simplified from Kademlia - we just cache all peers we hear about func (dht *DHT) updateRoutingTable(sys *System) { @@ -218,6 +262,9 @@ func (dht *DHT) handleDHTMessage(w http.ResponseWriter, r *http.Request) { return } + // Mark that we've received an inbound request (not a response) + dht.markInboundReceived() + switch msg.Type { case MessageTypePing: response, err = dht.handlePing(&msg) diff --git a/dht_maintenance.go b/dht_maintenance.go index ab57ab9..c971a68 100644 --- a/dht_maintenance.go +++ b/dht_maintenance.go @@ -54,6 +54,7 @@ func (dht *DHT) peerLivenessLoop() { return case <-ticker.C: dht.checkPeerLiveness() + dht.checkInboundStatus() } } } @@ -208,7 +209,7 @@ func (dht *DHT) validateGossipSystems() { } // Try to ping the system directly - _, err := dht.Ping(sys.PeerAddress) + respSystem, err := dht.Ping(sys.PeerAddress) if err != nil { // Failed to contact - check if this is the expected system // The Ping function already handles UUID mismatches @@ -223,6 +224,16 @@ func (dht *DHT) validateGossipSystems() { dht.routingTable.RemoveFromCache(sys.ID) removed++ } + } else if respSystem != nil && respSystem.ID != sys.ID { + // UUID mismatch - a different system now lives at this address + // The gossip entry is stale, remove it + log.Printf(" %s (%s): UUID mismatch - address now belongs to %s (%s), removing stale entry", + sys.Name, sys.ID.String()[:8], respSystem.Name, respSystem.ID.String()[:8]) + dht.routingTable.RemoveFromCache(sys.ID) + if err := dht.storage.DeletePeerSystem(sys.ID); err != nil { + log.Printf("Warning: failed to delete stale peer system %s: %v", sys.ID.String()[:8], err) + } + removed++ } else { verified++ log.Printf(" %s: verified", sys.Name)