diff --git a/core/src/main/java/samba/network/history/HistoryNetwork.java b/core/src/main/java/samba/network/history/HistoryNetwork.java index c532e31f..bd437663 100644 --- a/core/src/main/java/samba/network/history/HistoryNetwork.java +++ b/core/src/main/java/samba/network/history/HistoryNetwork.java @@ -333,12 +333,12 @@ public SafeFuture> offer( .map(Util::addUnsignedLeb128SizeToData) .toList(); - Optional.ofNullable(contentToOffer) - .filter(list -> !list.isEmpty()) - .ifPresent( - list -> - utpManager.offerWrite( - nodeRecord, accept.getConnectionId(), Bytes.concatenate(list))); + if (contentToOffer != null && !contentToOffer.isEmpty()) { + return utpManager + .offerWrite( + nodeRecord, accept.getConnectionId(), Bytes.concatenate(contentToOffer)) + .thenApply(__ -> Optional.of(accept.getContentKeys())); + } return SafeFuture.completedFuture(Optional.of(accept.getContentKeys())); }) diff --git a/core/src/main/java/samba/services/utp/UTPManager.java b/core/src/main/java/samba/services/utp/UTPManager.java index caa5f500..8e0e159a 100644 --- a/core/src/main/java/samba/services/utp/UTPManager.java +++ b/core/src/main/java/samba/services/utp/UTPManager.java @@ -6,6 +6,7 @@ import java.io.IOException; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -68,19 +69,19 @@ public int acceptRead(NodeRecord nodeRecord, Consumer onContentReceived) return connectionId; } - public void offerWrite(final NodeRecord nodeRecord, final int connectionId, Bytes content) { - this.runAsyncUTP( + public SafeFuture offerWrite(NodeRecord nodeRecord, int connectionId, Bytes content) { + return runAsyncUTPWithFuture( () -> { - UTPClient utpClient = this.registerClient(nodeRecord, connectionId); + UTPClient utpClient = registerClient(nodeRecord, connectionId); utpClient .connect(connectionId, new UTPAddress(nodeRecord)) - .thenCompose(__ -> utpClient.write(content, this.utpExecutor)) - .get(); + .thenCompose(__ -> utpClient.write(content, utpExecutor)) + .join(); }, "offerWrite", nodeRecord, connectionId, - this.utpExecutor); + utpExecutor); } public int foundContentWrite(NodeRecord nodeRecord, Bytes content) { @@ -195,6 +196,21 @@ private void runAsyncUTP( .exceptionally(defaultUTPErrorLog(operationName, nodeRecord, connectionId)); } + private SafeFuture runAsyncUTPWithFuture( + RunnableUTP task, + String operationName, + NodeRecord nodeRecord, + int connectionId, + Executor executor) { + + CompletableFuture future = + SafeFuture.runAsync( + () -> executeWithHandling(task, operationName, nodeRecord, connectionId), executor) + .exceptionally(defaultUTPErrorLog(operationName, nodeRecord, connectionId)); + + return SafeFuture.of(future); + } + private void executeWithHandling( RunnableUTP task, String operationName, NodeRecord nodeRecord, int connectionId) { try { diff --git a/core/src/test/java/samba/network/history/OfferMessageTests.java b/core/src/test/java/samba/network/history/OfferMessageTests.java index 1491b3ce..abd8c19d 100644 --- a/core/src/test/java/samba/network/history/OfferMessageTests.java +++ b/core/src/test/java/samba/network/history/OfferMessageTests.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; @@ -251,7 +252,8 @@ public void sendOkOfferMessageWithEmptyContentAndGetAcceptedMessageAndAnOkRespon when(discv5Client.sendDiscv5Message(any(NodeRecord.class), any(Bytes.class), any(Bytes.class))) .thenReturn(createAcceptResponse(555, Bytes.of(1), 0)); when(historyDB.get(any(ContentKey.class))).thenReturn(Optional.of(Bytes.of(0))); - + when(utpManager.offerWrite(any(NodeRecord.class), anyInt(), any(Bytes.class))) + .thenReturn(SafeFuture.completedFuture(null)); Offer offer = new Offer(List.of(DefaultContent.key3)); Optional contentKeysBitList = @@ -277,7 +279,8 @@ public void sendOkOfferMessageWithEmptyContentAndGetAcceptedMessageAndAnOkRespon any(NodeRecord.class), any(Bytes.class), any(Bytes.class))) .thenReturn(createAcceptResponse(777, Bytes.of(0), 1)); when(historyDB.get(any(ContentKey.class))).thenReturn(Optional.of(Bytes.of(0))); - + when(utpManager.offerWrite(any(NodeRecord.class), anyInt(), any(Bytes.class))) + .thenReturn(SafeFuture.completedFuture(null)); Offer offer = new Offer(List.of(DefaultContent.key3)); Optional contentKeysByteList = this.historyNetwork.offer(nodeRecord, List.of(DefaultContent.value3), offer).get(); diff --git a/core/src/test/java/samba/util/DefaultContent.java b/core/src/test/java/samba/util/DefaultContent.java index 2a4b1698..6e3785ce 100644 --- a/core/src/test/java/samba/util/DefaultContent.java +++ b/core/src/test/java/samba/util/DefaultContent.java @@ -37,7 +37,7 @@ public final class DefaultContent { key3 = Bytes.fromHexString("0x0288e96d4537bea4d9c05d12549907b32561d3bf31f45aae734cdc119f13406cb6"); - value3 = Bytes.fromHexString("0x00"); + value3 = Bytes.fromHexString("0x"); preMergeBlockHeader = new ContentBlockHeader(