diff --git a/core/src/main/java/samba/network/history/HistoryNetwork.java b/core/src/main/java/samba/network/history/HistoryNetwork.java index 88661b47..3be8e39a 100644 --- a/core/src/main/java/samba/network/history/HistoryNetwork.java +++ b/core/src/main/java/samba/network/history/HistoryNetwork.java @@ -775,7 +775,7 @@ public Optional traceGetContent( @Override public Optional recursiveFindNodes( - final String nodeId, final int timeout) { + final String nodeId, Set excludedNodes, final int timeout) { Bytes nodeIdBytes = Bytes.fromHexString(nodeId); RecursiveLookupTaskFindNodes task = new RecursiveLookupTaskFindNodes( @@ -783,6 +783,7 @@ public Optional recursiveFindNodes( nodeIdBytes, this.discv5Client.getHomeNodeRecord().getNodeId(), this.routingTable.findClosestNodesToKey(nodeIdBytes, 10, false), + excludedNodes, timeout); CompletableFuture> future = task.execute(); try { diff --git a/core/src/main/java/samba/network/history/api/HistoryNetworkInternalAPI.java b/core/src/main/java/samba/network/history/api/HistoryNetworkInternalAPI.java index 685c94ce..ff165370 100644 --- a/core/src/main/java/samba/network/history/api/HistoryNetworkInternalAPI.java +++ b/core/src/main/java/samba/network/history/api/HistoryNetworkInternalAPI.java @@ -37,6 +37,8 @@ SafeFuture> findContent( Set getFoundNodes(ContentKey contentKey, int count, boolean inRadius); + Optional nodeRecordFromEnr(String enr); + void gossip(final Set nodes, final Bytes key, final Bytes content); int getMaxGossipCount(); @@ -52,7 +54,8 @@ SafeFuture> findContent( // TODO RENAME Optional getContent(ContentKey contentKey, int timeout); - Optional recursiveFindNodes(final String nodeId, final int timeout); + Optional recursiveFindNodes( + final String nodeId, Set excludedNodes, final int timeout); UInt256 getLocalNodeId(); diff --git a/core/src/main/java/samba/network/history/api/methods/PutContent.java b/core/src/main/java/samba/network/history/api/methods/PutContent.java index 3fce5a9e..0a00e03d 100644 --- a/core/src/main/java/samba/network/history/api/methods/PutContent.java +++ b/core/src/main/java/samba/network/history/api/methods/PutContent.java @@ -1,14 +1,19 @@ package samba.network.history.api.methods; import samba.api.jsonrpc.results.PutContentResult; +import samba.api.jsonrpc.results.RecursiveFindNodesResult; import samba.domain.content.ContentKey; import samba.domain.content.ContentUtil; import samba.network.history.api.HistoryNetworkInternalAPI; +import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.tuweni.bytes.Bytes; import org.ethereum.beacon.discovery.schema.NodeRecord; +import org.hyperledger.besu.crypto.Hash; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,6 +21,7 @@ public class PutContent { private static final Logger LOG = LoggerFactory.getLogger(PutContent.class); private final HistoryNetworkInternalAPI historyNetworkInternalAPI; + private static final int SEARCH_TIMEOUT = 15; public PutContent(HistoryNetworkInternalAPI historyNetworkInternalAPI) { this.historyNetworkInternalAPI = historyNetworkInternalAPI; @@ -28,6 +34,20 @@ private PutContentResult execute(final Bytes contentKeyInBytes, final Bytes cont Set nodes = this.historyNetworkInternalAPI.getFoundNodes( contentKey, this.historyNetworkInternalAPI.getMaxGossipCount(), true); + if (nodes.size() < this.historyNetworkInternalAPI.getMaxGossipCount()) { + Optional newNodes = + this.historyNetworkInternalAPI.recursiveFindNodes( + Hash.sha256(contentKeyInBytes).toHexString(), nodes, SEARCH_TIMEOUT); + if (newNodes.isPresent()) { + nodes.addAll( + Stream.of(newNodes.get()) + .flatMap(result -> result.getNodes().stream()) + .map(enr -> this.historyNetworkInternalAPI.nodeRecordFromEnr(enr)) + .flatMap(opt -> opt.map(Stream::of).orElseGet(Stream::empty)) + .limit(this.historyNetworkInternalAPI.getMaxGossipCount() - nodes.size()) + .collect(Collectors.toSet())); + } + } this.historyNetworkInternalAPI.gossip(nodes, contentKey.getSszBytes(), contentValue); return new PutContentResult(storedLocally, nodes.size()); } diff --git a/core/src/main/java/samba/network/history/api/methods/RecursiveFindNodes.java b/core/src/main/java/samba/network/history/api/methods/RecursiveFindNodes.java index eac0093a..d9e21ec8 100644 --- a/core/src/main/java/samba/network/history/api/methods/RecursiveFindNodes.java +++ b/core/src/main/java/samba/network/history/api/methods/RecursiveFindNodes.java @@ -4,6 +4,7 @@ import samba.network.history.api.HistoryNetworkInternalAPI; import java.util.Optional; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,7 +21,7 @@ public RecursiveFindNodes(HistoryNetworkInternalAPI historyNetworkInternalAPI) { private Optional execute(final String nodeId) { Optional result = - this.historyNetworkInternalAPI.recursiveFindNodes(nodeId, SEARCH_TIMEOUT); + this.historyNetworkInternalAPI.recursiveFindNodes(nodeId, Set.of(), SEARCH_TIMEOUT); return result; } diff --git a/core/src/main/java/samba/services/search/RecursiveLookupTaskFindNodes.java b/core/src/main/java/samba/services/search/RecursiveLookupTaskFindNodes.java index 769b0462..5719de6e 100644 --- a/core/src/main/java/samba/services/search/RecursiveLookupTaskFindNodes.java +++ b/core/src/main/java/samba/services/search/RecursiveLookupTaskFindNodes.java @@ -39,12 +39,14 @@ public class RecursiveLookupTaskFindNodes { private final Bytes targetNodeId; private final Set queriedNodeIds = new HashSet<>(); private final SortedSet foundNodes; + private final Set excludedNodes = new HashSet<>(); public RecursiveLookupTaskFindNodes( final HistoryNetwork historyNetwork, final Bytes targetNodeId, final Bytes homeNodeId, final Set foundNodes, + final Set excludedNodes, final int timeout) { this.historyNetwork = historyNetwork; this.targetNodeId = targetNodeId; @@ -54,6 +56,7 @@ public RecursiveLookupTaskFindNodes( Comparator.comparing( foundNode -> UInt256.fromBytes(foundNode.getNodeId().xor(targetNodeId)))); this.foundNodes.addAll(foundNodes); + this.excludedNodes.addAll(excludedNodes); this.timeout = timeout; } @@ -86,6 +89,7 @@ private synchronized void sendRequests() { final boolean closestCondition = foundNodes.stream() .limit(MAX_NODE_LIST_COUNT) + .filter(record -> !excludedNodes.contains(record)) .filter( record -> UInt256.fromBytes(record.getNodeId().xor(targetNodeId)) @@ -103,8 +107,9 @@ record -> : Optional.of( new RecursiveFindNodesResult( foundNodes.stream() + .filter(record -> !excludedNodes.contains(record)) .limit(MAX_NODE_LIST_COUNT) - .map(nodeRecord -> nodeRecord.getNodeId().toHexString()) + .map(nodeRecord -> nodeRecord.asEnr()) .toList()))); return; } @@ -121,7 +126,8 @@ private void queryPeers(final List nodesToQuery) { private void queryPeer(final NodeRecord peer) { Integer logDistance = UInt256.fromBytes(peer.getNodeId().xor(targetNodeId)).toBigInteger().bitLength() - 1; - Set logDistances = Set.of(logDistance); + Set logDistances = new HashSet<>(); + logDistances.add(logDistance); if (!peer.getNodeId().equals(targetNodeId)) { logDistances.add(logDistance + 1); logDistances.add(logDistance - 1); diff --git a/core/src/test/java/samba/services/jsonrpc/methods/history/PortalHistoryRecursiveFindNodesTest.java b/core/src/test/java/samba/services/jsonrpc/methods/history/PortalHistoryRecursiveFindNodesTest.java index b26abe7c..bbf88a87 100644 --- a/core/src/test/java/samba/services/jsonrpc/methods/history/PortalHistoryRecursiveFindNodesTest.java +++ b/core/src/test/java/samba/services/jsonrpc/methods/history/PortalHistoryRecursiveFindNodesTest.java @@ -53,7 +53,7 @@ public void shouldReturnEnrList() { })); RecursiveFindNodesResult foundData = new RecursiveFindNodesResult(List.of("0x0001", "0x0002", "0x0003", "0x0004", "0x0005")); - when(historyJsonRpc.recursiveFindNodes(any(String.class), anyInt())) + when(historyJsonRpc.recursiveFindNodes(any(String.class), any(), anyInt())) .thenReturn(Optional.of(foundData)); final JsonRpcResponse expected = new JsonRpcSuccessResponse(request.getRequest().getId(), foundData);