Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/java/samba/network/history/HistoryNetwork.java
Original file line number Diff line number Diff line change
Expand Up @@ -775,14 +775,15 @@ public Optional<TraceGetContentResult> traceGetContent(

@Override
public Optional<RecursiveFindNodesResult> recursiveFindNodes(
final String nodeId, final int timeout) {
final String nodeId, Set<NodeRecord> excludedNodes, final int timeout) {
Bytes nodeIdBytes = Bytes.fromHexString(nodeId);
RecursiveLookupTaskFindNodes task =
new RecursiveLookupTaskFindNodes(
this,
nodeIdBytes,
this.discv5Client.getHomeNodeRecord().getNodeId(),
this.routingTable.findClosestNodesToKey(nodeIdBytes, 10, false),
excludedNodes,
timeout);
CompletableFuture<Optional<RecursiveFindNodesResult>> future = task.execute();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ SafeFuture<Optional<FindContentResult>> findContent(

Set<NodeRecord> getFoundNodes(ContentKey contentKey, int count, boolean inRadius);

Optional<NodeRecord> nodeRecordFromEnr(String enr);

void gossip(final Set<NodeRecord> nodes, final Bytes key, final Bytes content);

int getMaxGossipCount();
Expand All @@ -52,7 +54,8 @@ SafeFuture<Optional<FindContentResult>> findContent(
// TODO RENAME
Optional<FindContentResult> getContent(ContentKey contentKey, int timeout);

Optional<RecursiveFindNodesResult> recursiveFindNodes(final String nodeId, final int timeout);
Optional<RecursiveFindNodesResult> recursiveFindNodes(
final String nodeId, Set<NodeRecord> excludedNodes, final int timeout);

UInt256 getLocalNodeId();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package samba.network.history.api.methods;

import samba.api.jsonrpc.results.PutContentResult;
import samba.api.jsonrpc.results.RecursiveFindNodesResult;
import samba.domain.content.ContentKey;
import samba.domain.content.ContentUtil;
import samba.network.history.api.HistoryNetworkInternalAPI;

import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.tuweni.bytes.Bytes;
import org.ethereum.beacon.discovery.schema.NodeRecord;
import org.hyperledger.besu.crypto.Hash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PutContent {

private static final Logger LOG = LoggerFactory.getLogger(PutContent.class);
private final HistoryNetworkInternalAPI historyNetworkInternalAPI;
private static final int SEARCH_TIMEOUT = 15;

public PutContent(HistoryNetworkInternalAPI historyNetworkInternalAPI) {
this.historyNetworkInternalAPI = historyNetworkInternalAPI;
Expand All @@ -28,6 +34,20 @@ private PutContentResult execute(final Bytes contentKeyInBytes, final Bytes cont
Set<NodeRecord> nodes =
this.historyNetworkInternalAPI.getFoundNodes(
contentKey, this.historyNetworkInternalAPI.getMaxGossipCount(), true);
if (nodes.size() < this.historyNetworkInternalAPI.getMaxGossipCount()) {
Optional<RecursiveFindNodesResult> newNodes =
this.historyNetworkInternalAPI.recursiveFindNodes(
Hash.sha256(contentKeyInBytes).toHexString(), nodes, SEARCH_TIMEOUT);
if (newNodes.isPresent()) {
nodes.addAll(
Stream.of(newNodes.get())
.flatMap(result -> result.getNodes().stream())
.map(enr -> this.historyNetworkInternalAPI.nodeRecordFromEnr(enr))
.flatMap(opt -> opt.map(Stream::of).orElseGet(Stream::empty))
.limit(this.historyNetworkInternalAPI.getMaxGossipCount() - nodes.size())
.collect(Collectors.toSet()));
}
}
this.historyNetworkInternalAPI.gossip(nodes, contentKey.getSszBytes(), contentValue);
return new PutContentResult(storedLocally, nodes.size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import samba.network.history.api.HistoryNetworkInternalAPI;

import java.util.Optional;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,7 +21,7 @@ public RecursiveFindNodes(HistoryNetworkInternalAPI historyNetworkInternalAPI) {

private Optional<RecursiveFindNodesResult> execute(final String nodeId) {
Optional<RecursiveFindNodesResult> result =
this.historyNetworkInternalAPI.recursiveFindNodes(nodeId, SEARCH_TIMEOUT);
this.historyNetworkInternalAPI.recursiveFindNodes(nodeId, Set.of(), SEARCH_TIMEOUT);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,14 @@ public class RecursiveLookupTaskFindNodes {
private final Bytes targetNodeId;
private final Set<Bytes> queriedNodeIds = new HashSet<>();
private final SortedSet<NodeRecord> foundNodes;
private final Set<NodeRecord> excludedNodes = new HashSet<>();

public RecursiveLookupTaskFindNodes(
final HistoryNetwork historyNetwork,
final Bytes targetNodeId,
final Bytes homeNodeId,
final Set<NodeRecord> foundNodes,
final Set<NodeRecord> excludedNodes,
final int timeout) {
this.historyNetwork = historyNetwork;
this.targetNodeId = targetNodeId;
Expand All @@ -54,6 +56,7 @@ public RecursiveLookupTaskFindNodes(
Comparator.comparing(
foundNode -> UInt256.fromBytes(foundNode.getNodeId().xor(targetNodeId))));
this.foundNodes.addAll(foundNodes);
this.excludedNodes.addAll(excludedNodes);
this.timeout = timeout;
}

Expand Down Expand Up @@ -86,6 +89,7 @@ private synchronized void sendRequests() {
final boolean closestCondition =
foundNodes.stream()
.limit(MAX_NODE_LIST_COUNT)
.filter(record -> !excludedNodes.contains(record))
.filter(
record ->
UInt256.fromBytes(record.getNodeId().xor(targetNodeId))
Expand All @@ -103,8 +107,9 @@ record ->
: Optional.of(
new RecursiveFindNodesResult(
foundNodes.stream()
.filter(record -> !excludedNodes.contains(record))
.limit(MAX_NODE_LIST_COUNT)
.map(nodeRecord -> nodeRecord.getNodeId().toHexString())
.map(nodeRecord -> nodeRecord.asEnr())
.toList())));
return;
}
Expand All @@ -121,7 +126,8 @@ private void queryPeers(final List<NodeRecord> nodesToQuery) {
private void queryPeer(final NodeRecord peer) {
Integer logDistance =
UInt256.fromBytes(peer.getNodeId().xor(targetNodeId)).toBigInteger().bitLength() - 1;
Set<Integer> logDistances = Set.of(logDistance);
Set<Integer> logDistances = new HashSet<>();
logDistances.add(logDistance);
if (!peer.getNodeId().equals(targetNodeId)) {
logDistances.add(logDistance + 1);
logDistances.add(logDistance - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void shouldReturnEnrList() {
}));
RecursiveFindNodesResult foundData =
new RecursiveFindNodesResult(List.of("0x0001", "0x0002", "0x0003", "0x0004", "0x0005"));
when(historyJsonRpc.recursiveFindNodes(any(String.class), anyInt()))
when(historyJsonRpc.recursiveFindNodes(any(String.class), any(), anyInt()))
.thenReturn(Optional.of(foundData));
final JsonRpcResponse expected =
new JsonRpcSuccessResponse(request.getRequest().getId(), foundData);
Expand Down
Loading