From 71153c5d6bbc2ad2291c198744b6861fe6c38dc4 Mon Sep 17 00:00:00 2001 From: hanmz Date: Thu, 1 Jun 2023 19:39:06 +0800 Subject: [PATCH 1/4] Fix return the earliest position when query position by timestamp. --- .../java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java index 900af9322c791..4a2a85264c071 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java @@ -142,7 +142,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { } public void find() { - if (cursor != null ? cursor.hasMoreEntries(searchPosition) : ledger.hasMoreEntries(searchPosition)) { + if (ledger.hasMoreEntries(searchPosition)) { ledger.asyncReadEntry(searchPosition, this, null); } else { callback.findEntryComplete(lastMatchedPosition, OpFindNewest.this.ctx); From eae4d5da25dd00377aadb1d4c677f7f65648c8fa Mon Sep 17 00:00:00 2001 From: hanmz Date: Mon, 26 Jun 2023 20:06:49 +0800 Subject: [PATCH 2/4] Fix return the earliest position when query position by timestamp. --- .../bookkeeper/mledger/ManagedCursor.java | 17 +++++++++ .../mledger/impl/ManagedCursorImpl.java | 12 +++++- .../bookkeeper/mledger/impl/OpFindNewest.java | 2 +- .../impl/ManagedCursorContainerTest.java | 5 +++ .../persistent/PersistentMessageFinder.java | 2 +- .../broker/delayed/MockManagedCursor.java | 6 +++ .../service/PersistentMessageFinderTest.java | 37 +++++++++++++++++++ 7 files changed, 78 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index edbfa0b43204e..d1ffdf6d2d763 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -637,6 +637,23 @@ Position findNewestMatching(FindPositionConstraint constraint, Predicate void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, FindEntryCallback callback, Object ctx); + /** + * Find the newest entry that matches the given predicate. + * + * @param constraint + * search only active entries or all entries + * @param condition + * predicate that reads an entry an applies a condition + * @param callback + * callback object returning the resultant position + * @param ctx + * opaque context + * @param isFindFromLedger + * find the newest entry from ledger + */ + void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + FindEntryCallback callback, Object ctx, boolean isFindFromLedger); + /** * reset the cursor to specified position to enable replay of messages. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 1ce0403a54762..e5d784d40d48e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1182,6 +1182,12 @@ public void findEntryFailed(ManagedLedgerException exception, Optional @Override public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, FindEntryCallback callback, Object ctx) { + asyncFindNewestMatching(constraint, condition, callback, ctx, false); + } + + @Override + public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + FindEntryCallback callback, Object ctx, boolean isFindFromLedger) { OpFindNewest op; PositionImpl startPosition = null; long max = 0; @@ -1203,7 +1209,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate Optional.empty(), ctx); return; } - op = new OpFindNewest(this, startPosition, condition, max, callback, ctx); + if (isFindFromLedger) { + op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx); + } else { + op = new OpFindNewest(this, startPosition, condition, max, callback, ctx); + } op.find(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java index 4a2a85264c071..900af9322c791 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java @@ -142,7 +142,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { } public void find() { - if (ledger.hasMoreEntries(searchPosition)) { + if (cursor != null ? cursor.hasMoreEntries(searchPosition) : ledger.hasMoreEntries(searchPosition)) { ledger.asyncReadEntry(searchPosition, this, null); } else { callback.findEntryComplete(lastMatchedPosition, OpFindNewest.this.ctx); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 2c01b778caf6b..04d99d3bdf480 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -258,6 +258,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate AsyncCallbacks.FindEntryCallback callback, Object ctx) { } + @Override + public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) { + } + @Override public void asyncResetCursor(final Position position, boolean forceReset, AsyncCallbacks.ResetCursorCallback callback) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java index d2e6f6f5ff869..08273155e4cfa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java @@ -71,7 +71,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback entry.release(); } return false; - }, this, callback); + }, this, callback, true); } else { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running", topicName, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java index 499262c1e60b9..477290fc6837a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -276,6 +277,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, } + @Override + public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) { + } + @Override public void resetCursor(Position position) throws InterruptedException, ManagedLedgerException { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 48798f0020f01..6f7d17f7c1356 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -243,6 +243,43 @@ public void findEntryFailed(ManagedLedgerException exception, Optional factory.shutdown(); } + @Test + void testPersistentMessageFinderWhenLastMessageDelete() throws Exception { + final String ledgerAndCursorName = "testPersistentMessageFinderWhenLastMessageDelete"; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(10); + config.setMaxEntriesPerLedger(10); + config.setRetentionTime(1, TimeUnit.HOURS); + ManagedLedger ledger = factory.open(ledgerAndCursorName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + + ledger.addEntry(createMessageWrittenToLedger("msg1")); + Thread.sleep(100); + ledger.addEntry(createMessageWrittenToLedger("msg2")); + Thread.sleep(100); + ledger.addEntry(createMessageWrittenToLedger("msg3")); + Thread.sleep(100); + Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message")); + + Thread.sleep(1000); + long endTimestamp = System.currentTimeMillis(); + + Result result = new Result(); + // delete last position message + cursor.delete(lastPosition); + CompletableFuture future = findMessage(result, cursor, endTimestamp); + future.get(); + assertNull(result.exception); + assertNotEquals(result.position, null); + assertEquals(result.position, lastPosition); + + result.reset(); + cursor.close(); + ledger.close(); + factory.shutdown(); + } + @Test void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception { From 0efd53375c9f764636527f97a4af51c2505d6261 Mon Sep 17 00:00:00 2001 From: hanmz Date: Wed, 28 Jun 2023 10:47:38 +0800 Subject: [PATCH 3/4] Remove unnecessary sleep --- .../pulsar/broker/service/PersistentMessageFinderTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 6f7d17f7c1356..eb6248fae5a14 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -255,11 +255,8 @@ void testPersistentMessageFinderWhenLastMessageDelete() throws Exception { ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); ledger.addEntry(createMessageWrittenToLedger("msg1")); - Thread.sleep(100); ledger.addEntry(createMessageWrittenToLedger("msg2")); - Thread.sleep(100); ledger.addEntry(createMessageWrittenToLedger("msg3")); - Thread.sleep(100); Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message")); Thread.sleep(1000); From f1958ffa1b65add0f2e2eb2409201fd481fb259a Mon Sep 17 00:00:00 2001 From: hanmz Date: Thu, 29 Jun 2023 15:26:38 +0800 Subject: [PATCH 4/4] Remove unnecessary sleep --- .../pulsar/broker/service/PersistentMessageFinderTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index eb6248fae5a14..e77fd07c6ef8b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -258,9 +258,8 @@ void testPersistentMessageFinderWhenLastMessageDelete() throws Exception { ledger.addEntry(createMessageWrittenToLedger("msg2")); ledger.addEntry(createMessageWrittenToLedger("msg3")); Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message")); - - Thread.sleep(1000); - long endTimestamp = System.currentTimeMillis(); + + long endTimestamp = System.currentTimeMillis() + 1000; Result result = new Result(); // delete last position message