diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index edbfa0b43204e..d1ffdf6d2d763 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -637,6 +637,23 @@ Position findNewestMatching(FindPositionConstraint constraint, Predicate void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, FindEntryCallback callback, Object ctx); + /** + * Find the newest entry that matches the given predicate. + * + * @param constraint + * search only active entries or all entries + * @param condition + * predicate that reads an entry an applies a condition + * @param callback + * callback object returning the resultant position + * @param ctx + * opaque context + * @param isFindFromLedger + * find the newest entry from ledger + */ + void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + FindEntryCallback callback, Object ctx, boolean isFindFromLedger); + /** * reset the cursor to specified position to enable replay of messages. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index f30f9553e15e0..8ce3a322c09be 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1182,6 +1182,12 @@ public void findEntryFailed(ManagedLedgerException exception, Optional @Override public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, FindEntryCallback callback, Object ctx) { + asyncFindNewestMatching(constraint, condition, callback, ctx, false); + } + + @Override + public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + FindEntryCallback callback, Object ctx, boolean isFindFromLedger) { OpFindNewest op; PositionImpl startPosition = null; long max = 0; @@ -1203,7 +1209,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate Optional.empty(), ctx); return; } - op = new OpFindNewest(this, startPosition, condition, max, callback, ctx); + if (isFindFromLedger) { + op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx); + } else { + op = new OpFindNewest(this, startPosition, condition, max, callback, ctx); + } op.find(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 2c01b778caf6b..04d99d3bdf480 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -258,6 +258,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate AsyncCallbacks.FindEntryCallback callback, Object ctx) { } + @Override + public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) { + } + @Override public void asyncResetCursor(final Position position, boolean forceReset, AsyncCallbacks.ResetCursorCallback callback) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java index d2e6f6f5ff869..08273155e4cfa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java @@ -71,7 +71,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback entry.release(); } return false; - }, this, callback); + }, this, callback, true); } else { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running", topicName, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java index 499262c1e60b9..477290fc6837a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -276,6 +277,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, } + @Override + public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) { + } + @Override public void resetCursor(Position position) throws InterruptedException, ManagedLedgerException { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 48798f0020f01..e77fd07c6ef8b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -243,6 +243,39 @@ public void findEntryFailed(ManagedLedgerException exception, Optional factory.shutdown(); } + @Test + void testPersistentMessageFinderWhenLastMessageDelete() throws Exception { + final String ledgerAndCursorName = "testPersistentMessageFinderWhenLastMessageDelete"; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(10); + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(1, TimeUnit.HOURS); + ManagedLedger ledger = factory.open(ledgerAndCursorName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + + ledger.addEntry(createMessageWrittenToLedger("msg1")); + ledger.addEntry(createMessageWrittenToLedger("msg2")); + ledger.addEntry(createMessageWrittenToLedger("msg3")); + Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message")); + + long endTimestamp = System.currentTimeMillis() + 1000; + + Result result = new Result(); + // delete last position message + cursor.delete(lastPosition); + CompletableFuture future = findMessage(result, cursor, endTimestamp); + future.get(); + assertNull(result.exception); + assertNotEquals(result.position, null); + assertEquals(result.position, lastPosition); + + result.reset(); + cursor.close(); + ledger.close(); + factory.shutdown(); + } + @Test void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception {