diff --git a/core/src/main/java/samba/network/history/HistoryNetwork.java b/core/src/main/java/samba/network/history/HistoryNetwork.java index 7d802ce2..30cab06c 100644 --- a/core/src/main/java/samba/network/history/HistoryNetwork.java +++ b/core/src/main/java/samba/network/history/HistoryNetwork.java @@ -43,6 +43,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -449,17 +450,44 @@ private Optional getAssociatedBlockHeader( if (blockHeaderBytes.isPresent()) { return ContentUtil.createBlockHeaderfromSszBytes(blockHeaderBytes.get()); } - - Optional searchResult = getContent(blockHeaderKey, DEFAULT_TIMEOUT); - if (searchResult.isPresent()) { - Optional blockHeader = - ContentUtil.createBlockHeaderfromSszBytes( - Bytes.fromHexString(searchResult.get().getContent())); - this.store( - blockHeaderKeySsz, blockHeader.get().getSszBytes()); // Store newly located block header - return blockHeader; + int SEARCH_ATTEMPTS = 3; + Set excludedNodes = new HashSet<>(); + for (int i = 0; i < SEARCH_ATTEMPTS; i++) { + RecursiveLookupTaskFindContent task = + new RecursiveLookupTaskFindContent( + this, + blockHeaderKey.getSszBytes(), + this.discv5Client.getHomeNodeRecord().getNodeId(), + getFoundNodes(blockHeaderKey), + DEFAULT_TIMEOUT); + Optional searchResult = task.execute().join(); + + if (searchResult.isPresent()) { + Optional blockHeader = + ContentUtil.createBlockHeaderfromSszBytes( + Bytes.fromHexString(searchResult.get().getContent())); + if (this.store(blockHeaderKeySsz, blockHeader.get().getSszBytes())) { + LOG.debug( + "Found and stored block header for content key {}: {}", + contentKey, + blockHeaderKey.getSszBytes().toHexString()); + this.gossip( + task.getInterestedNodes(), + blockHeaderKey.getSszBytes(), + Bytes.fromHexString(searchResult.get().getContent())); // POKE Mechanism + return blockHeader; // Store newly located block header + } + LOG.debug( + "Found Header for content key {} invalid, trying other peers", + contentKey, + blockHeaderKey); + } + if (task.getRespondingNode().isPresent()) excludedNodes.add(task.getRespondingNode().get()); } - + LOG.debug( + "Could not find associated block header for content key {} after {} attempts", + contentKey, + SEARCH_ATTEMPTS); return Optional.empty(); } catch (Exception e) { LOG.debug("Error when retrieving associated block header for {}", contentKey, e); diff --git a/core/src/main/java/samba/services/search/RecursiveLookupTaskFindContent.java b/core/src/main/java/samba/services/search/RecursiveLookupTaskFindContent.java index 2d6a6a64..6864a603 100644 --- a/core/src/main/java/samba/services/search/RecursiveLookupTaskFindContent.java +++ b/core/src/main/java/samba/services/search/RecursiveLookupTaskFindContent.java @@ -8,6 +8,7 @@ import samba.domain.messages.requests.FindContent; import samba.network.history.HistoryNetwork; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Optional; @@ -36,6 +37,8 @@ public class RecursiveLookupTaskFindContent { private Optional content = Optional.empty(); private final int timeout; private final Set interestedNodes = new HashSet<>(); + private final Set excludedNodes = new HashSet<>(); + private Optional respondingNode; public RecursiveLookupTaskFindContent( final HistoryNetwork historyNetwork, @@ -43,11 +46,22 @@ public RecursiveLookupTaskFindContent( final Bytes homeNodeId, final Set foundNodes, final int timeout) { + this(historyNetwork, contentKey, homeNodeId, foundNodes, Set.of(), timeout); + } + + public RecursiveLookupTaskFindContent( + final HistoryNetwork historyNetwork, + final Bytes contentKey, + final Bytes homeNodeId, + final Set foundNodes, + final Set excludedNodes, + final int timeout) { this.historyNetwork = historyNetwork; this.contentKey = contentKey; this.queriedNodeIds.add(homeNodeId); this.foundNodes.addAll(foundNodes); this.timeout = timeout; + this.excludedNodes.addAll(excludedNodes); } public CompletableFuture> execute() { @@ -61,6 +75,7 @@ private synchronized void sendRequests() { return; } if (content.isPresent()) { + LOG.error("No nodes left to query"); future.complete(content); return; } @@ -72,7 +87,7 @@ private synchronized void sendRequests() { .collect(Collectors.toList()); if (nodesToQuery.isEmpty()) { - future.completeExceptionally(new RuntimeException("No nodes left to query.")); + future.complete(content); return; } @@ -99,14 +114,17 @@ private void queryPeer(final NodeRecord peer) { LOG.debug("Node {} returned empty result", peer.getNodeId()); } else { FindContentResult contentResult = result.get(); - if (contentResult.getContent() != null) { + if (contentResult.getContent() != null && !excludedNodes.contains(peer)) { + this.respondingNode = Optional.of(peer); content = Optional.of(contentResult); future.complete(content); return; } // Add nodes to foundNodes and continue searching foundNodes.addAll( - contentResult.getEnrs().stream() + Optional.ofNullable(contentResult.getEnrs()) + .orElseGet(Collections::emptyList) + .stream() .map(historyNetwork::nodeRecordFromEnr) .flatMap(Optional::stream) .filter(node -> !queriedNodeIds.contains(node.getNodeId())) @@ -133,4 +151,8 @@ private void queryPeer(final NodeRecord peer) { public Set getInterestedNodes() { return interestedNodes; } + + public Optional getRespondingNode() { + return respondingNode; + } }