Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ type DHT struct {
pendingRequests map[string]chan *DHTMessage
pendingMu sync.RWMutex

// Inbound connection tracking (for outbound-only detection)
startTime time.Time
hasReceivedInbound bool
inboundMu sync.RWMutex
lastInboundWarning time.Time

// Shutdown coordination
shutdown chan struct{}
wg sync.WaitGroup
Expand All @@ -64,6 +70,7 @@ func NewDHT(localSystem *System, storage *Storage, listenAddr string) *DHT {
listenAddr: listenAddr,
pendingRequests: make(map[string]chan *DHTMessage),
shutdown: make(chan struct{}),
startTime: time.Now(),
httpClient: &http.Client{
Timeout: RequestTimeout,
},
Expand Down Expand Up @@ -106,6 +113,43 @@ func (dht *DHT) Stop() {
log.Printf("DHT stopped")
}

// markInboundReceived records that we've received an inbound connection
func (dht *DHT) markInboundReceived() {
dht.inboundMu.Lock()
dht.hasReceivedInbound = true
dht.inboundMu.Unlock()
}

// checkInboundStatus logs a warning if no inbound connections after startup period
func (dht *DHT) checkInboundStatus() {
dht.inboundMu.RLock()
hasInbound := dht.hasReceivedInbound
lastWarning := dht.lastInboundWarning
dht.inboundMu.RUnlock()

if hasInbound {
return
}

// Only warn after 10 minutes of uptime
if time.Since(dht.startTime) < 10*time.Minute {
return
}

// Repeat warning every 6 hours
if !lastWarning.IsZero() && time.Since(lastWarning) < 6*time.Hour {
return
}

dht.inboundMu.Lock()
dht.lastInboundWarning = time.Now()
dht.inboundMu.Unlock()

log.Printf("WARNING: No inbound connections received after 10 minutes.")
log.Printf(" Your node may be in outbound-only mode (can see network but others can't reach you).")
log.Printf(" Check that port %s is open and forwarded correctly, as UPnP may have failed.", dht.listenAddr)
}

// updateRoutingTable adds a node to the peer cache
// Simplified from Kademlia - we just cache all peers we hear about
func (dht *DHT) updateRoutingTable(sys *System) {
Expand Down Expand Up @@ -218,6 +262,9 @@ func (dht *DHT) handleDHTMessage(w http.ResponseWriter, r *http.Request) {
return
}

// Mark that we've received an inbound request (not a response)
dht.markInboundReceived()

switch msg.Type {
case MessageTypePing:
response, err = dht.handlePing(&msg)
Expand Down
13 changes: 12 additions & 1 deletion dht_maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (dht *DHT) peerLivenessLoop() {
return
case <-ticker.C:
dht.checkPeerLiveness()
dht.checkInboundStatus()
}
}
}
Expand Down Expand Up @@ -208,7 +209,7 @@ func (dht *DHT) validateGossipSystems() {
}

// Try to ping the system directly
_, err := dht.Ping(sys.PeerAddress)
respSystem, err := dht.Ping(sys.PeerAddress)
if err != nil {
// Failed to contact - check if this is the expected system
// The Ping function already handles UUID mismatches
Expand All @@ -223,6 +224,16 @@ func (dht *DHT) validateGossipSystems() {
dht.routingTable.RemoveFromCache(sys.ID)
removed++
}
} else if respSystem != nil && respSystem.ID != sys.ID {
// UUID mismatch - a different system now lives at this address
// The gossip entry is stale, remove it
log.Printf(" %s (%s): UUID mismatch - address now belongs to %s (%s), removing stale entry",
sys.Name, sys.ID.String()[:8], respSystem.Name, respSystem.ID.String()[:8])
dht.routingTable.RemoveFromCache(sys.ID)
if err := dht.storage.DeletePeerSystem(sys.ID); err != nil {
log.Printf("Warning: failed to delete stale peer system %s: %v", sys.ID.String()[:8], err)
}
removed++
} else {
verified++
log.Printf(" %s: verified", sys.Name)
Expand Down