From 6f6dd7d2270610c36138cafa95583e54476e4e72 Mon Sep 17 00:00:00 2001 From: Symious Date: Wed, 1 Oct 2025 15:20:20 +0800 Subject: [PATCH] RATIS-2340. Add ReadIndexAsync executor for readIndex stateMachine query --- .../java/org/apache/ratis/server/impl/RaftServerImpl.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 3c10e103b6..eafb8e7159 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -1054,6 +1054,9 @@ private CompletableFuture getReadIndex(RaftClientRequest request, LeaderSt return writeIndexCache.getWriteIndexFuture(request).thenCompose(leader::getReadIndex); } + private final ExecutorService readAsyncExecutor = ConcurrentUtils.newThreadPoolWithMax(false, + Math.max(2, Runtime.getRuntime().availableProcessors()), "ReadAsync"); + private CompletableFuture readAsync(RaftClientRequest request) { if (request.getType().getRead().getPreferNonLinearizable() || readOption == RaftServerConfigKeys.Read.Option.DEFAULT) { @@ -1087,7 +1090,8 @@ private CompletableFuture readAsync(RaftClientRequest request) return replyFuture .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex)) - .thenCompose(readIndex -> queryStateMachine(request)) + .thenComposeAsync(ignored -> stateMachine.query(request.getMessage()), readAsyncExecutor) + .thenCompose(reply -> processQueryFuture(CompletableFuture.completedFuture(reply), request)) .exceptionally(e -> readException2Reply(request, e)); } else { throw new IllegalStateException("Unexpected read option: " + readOption);