Skip to content

Commit fa8bef9

Browse files
hanmznicoloboschi
authored andcommitted
[fix][broker] Fix return the earliest position when query position by timestamp. (apache#20457)
(cherry picked from commit 0e60340) (cherry picked from commit 6a62499)
1 parent 42302aa commit fa8bef9

6 files changed

Lines changed: 67 additions & 2 deletions

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,23 @@ Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry>
551551
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
552552
FindEntryCallback callback, Object ctx);
553553

554+
/**
555+
* Find the newest entry that matches the given predicate.
556+
*
557+
* @param constraint
558+
* search only active entries or all entries
559+
* @param condition
560+
* predicate that reads an entry an applies a condition
561+
* @param callback
562+
* callback object returning the resultant position
563+
* @param ctx
564+
* opaque context
565+
* @param isFindFromLedger
566+
* find the newest entry from ledger
567+
*/
568+
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
569+
FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
570+
554571
/**
555572
* reset the cursor to specified position to enable replay of messages.
556573
*

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,12 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
11041104
@Override
11051105
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
11061106
FindEntryCallback callback, Object ctx) {
1107+
asyncFindNewestMatching(constraint, condition, callback, ctx, false);
1108+
}
1109+
1110+
@Override
1111+
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
1112+
FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
11071113
OpFindNewest op;
11081114
PositionImpl startPosition = null;
11091115
long max = 0;
@@ -1125,7 +1131,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
11251131
Optional.empty(), ctx);
11261132
return;
11271133
}
1128-
op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
1134+
if (isFindFromLedger) {
1135+
op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx);
1136+
} else {
1137+
op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
1138+
}
11291139
op.find();
11301140
}
11311141

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
241241
AsyncCallbacks.FindEntryCallback callback, Object ctx) {
242242
}
243243

244+
@Override
245+
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
246+
AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
247+
}
248+
244249
@Override
245250
public void asyncResetCursor(final Position position, boolean forceReset,
246251
AsyncCallbacks.ResetCursorCallback callback) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
7171
entry.release();
7272
}
7373
return false;
74-
}, this, callback);
74+
}, this, callback, true);
7575
} else {
7676
if (log.isDebugEnabled()) {
7777
log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running", topicName,

pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/MockManagedCursor.java

Whitespace-only changes.

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,39 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
243243
factory.shutdown();
244244
}
245245

246+
@Test
247+
void testPersistentMessageFinderWhenLastMessageDelete() throws Exception {
248+
final String ledgerAndCursorName = "testPersistentMessageFinderWhenLastMessageDelete";
249+
250+
ManagedLedgerConfig config = new ManagedLedgerConfig();
251+
config.setRetentionSizeInMB(10);
252+
config.setMaxEntriesPerLedger(10);
253+
config.setRetentionTime(1, TimeUnit.HOURS);
254+
ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
255+
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
256+
257+
ledger.addEntry(createMessageWrittenToLedger("msg1"));
258+
ledger.addEntry(createMessageWrittenToLedger("msg2"));
259+
ledger.addEntry(createMessageWrittenToLedger("msg3"));
260+
Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message"));
261+
262+
long endTimestamp = System.currentTimeMillis() + 1000;
263+
264+
Result result = new Result();
265+
// delete last position message
266+
cursor.delete(lastPosition);
267+
CompletableFuture<Void> future = findMessage(result, cursor, endTimestamp);
268+
future.get();
269+
assertNull(result.exception);
270+
assertNotEquals(result.position, null);
271+
assertEquals(result.position, lastPosition);
272+
273+
result.reset();
274+
cursor.close();
275+
ledger.close();
276+
factory.shutdown();
277+
}
278+
246279
@Test
247280
void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception {
248281

0 commit comments

Comments
 (0)