diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java index 3d188010d6..50cee4d515 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java +++ b/ratis-examples/src/test/java/org/apache/ratis/examples/counter/TestCounter.java @@ -18,6 +18,8 @@ package org.apache.ratis.examples.counter; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.client.RaftClient; import org.apache.ratis.examples.ParameterizedBaseTest; @@ -30,6 +32,8 @@ import java.io.IOException; import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutionException; public class TestCounter extends ParameterizedBaseTest { @@ -59,4 +63,40 @@ public void testSeveralCounter(MiniRaftCluster cluster) throws IOException, Inte Assertions.assertEquals(30, reply3.getMessage().getContent().asReadOnlyByteBuffer().getInt()); } } + + @ParameterizedTest + @MethodSource("data") + public void testSeveralCounterFromFollower(MiniRaftCluster cluster) throws IOException, InterruptedException { + setAndStart(cluster); + + List followers = cluster.getFollowers(); + Assertions.assertEquals(2, followers.size()); + final RaftPeerId f0 = followers.get(0).getId(); + final RaftServer.Division d1 = cluster.getDivision(f0); + + Message queryMessage = Message.valueOf("GET"); + + try (final RaftClient client = cluster.createClient()) { + for (int i = 0; i < 10; i++) { + client.io().send(Message.valueOf("INCREMENT")); + } + RaftClientReply reply1 = client.io().sendReadOnly(queryMessage); + Assertions.assertEquals(10, getCounterInt(reply1.getMessage())); + boolean isFollowerUptoDate = d1.okForLocalReadBounded(1000, 100); + // Clients can choose to query from local StateMachine or leader + if (isFollowerUptoDate) { + d1.getStateMachine().query(queryMessage).get(); + Assertions.assertTrue(10 >= getCounterInt(d1.getStateMachine().query(queryMessage).get())); + } else { + reply1 = client.io().sendReadOnly(queryMessage); + Assertions.assertEquals(10, getCounterInt(reply1.getMessage())); + } + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + private int getCounterInt(Message message) { + return message.getContent().asReadOnlyByteBuffer().getInt(); + } } diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java index 84e3a1ed30..53b4d7c875 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java @@ -122,6 +122,8 @@ default RaftGroup getGroup() { @Override void close(); + + public boolean okForLocalReadBounded(int maxLag, long leaseMs); } /** @return the server ID. */ diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LocalLease.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LocalLease.java new file mode 100644 index 0000000000..dd8b08d0bf --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LocalLease.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.protocol.RaftPeerId; + +import java.util.concurrent.atomic.AtomicLong; + +/** LocalLease can be used for followers to check if it's updated to Leader. + */ +public class LocalLease { + private final AtomicLong leaderCommit = new AtomicLong(-1); + private volatile long lastHbNanos = 0L; + private volatile long leaseTerm = -1L; + private volatile RaftPeerId leaseLeader = null; + + public LocalLease() { + + } + + void onAppend(long term, RaftPeerId leader, long commitIdx) { + if (leaseTerm != term || leaseLeader == null || !leaseLeader.equals(leader)) { + leaseTerm = term; + leaseLeader = leader; + leaderCommit.set(commitIdx); + } else { + long prev; + do { + prev = leaderCommit.get(); + } while (commitIdx > prev && !leaderCommit.compareAndSet(prev, commitIdx)); + } + lastHbNanos = System.nanoTime(); + } + + public long getLastHbNanos() { + return lastHbNanos; + } + + public AtomicLong getLeaderCommit() { + return leaderCommit; + } +} diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 3c10e103b6..aa509feb70 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -262,6 +262,8 @@ public long[] getFollowerMatchIndices() { private final AtomicReference> appendLogFuture; private final NavigableIndices appendLogTermIndices = new NavigableIndices(); + private final LocalLease localLease; + RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option) throws IOException { final RaftPeerId id = proxy.getId(); @@ -305,6 +307,8 @@ public long[] getFollowerMatchIndices() { RaftServerConfigKeys.ThreadPool.clientCached(properties), RaftServerConfigKeys.ThreadPool.clientSize(properties), id + "-client"); + + this.localLease = new LocalLease(); } private long getCommitIndex(RaftPeerId id) { @@ -1501,6 +1505,7 @@ public CompletableFuture appendEntriesAsync(AppendEntri } assertGroup(getMemberId(), leaderId, leaderGroupId); assertEntries(r, previous, state); + localLease.onAppend(r.getLeaderTerm(), leaderId, r.getLeaderCommit()); return appendEntriesAsync(leaderId, request.getCallId(), previous, r); } catch(Exception t) { @@ -1510,6 +1515,16 @@ public CompletableFuture appendEntriesAsync(AppendEntri } } + @Override + public boolean okForLocalReadBounded(int maxLag, long leaseMs) { + if (System.nanoTime() - localLease.getLastHbNanos() > leaseMs * 1_000_000L) { + return false; + } + long applied = stateMachine.getLastAppliedTermIndex().getIndex(); + long target = localLease.getLeaderCommit().get(); + return target <= 0 || applied + maxLag >= target; + } + @Override public CompletableFuture readIndexAsync(ReadIndexRequestProto request) throws IOException { assertLifeCycleState(LifeCycle.States.RUNNING);