Skip to content

Commit da8b69f

Browse files
committed
Fix return the earliest position when query position by timestamp. #20457
1 parent e789a71 commit da8b69f

File tree

5 files changed

+68
-3
lines changed

5 files changed

+68
-3
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,23 @@ Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry>
550550
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
551551
FindEntryCallback callback, Object ctx);
552552

553+
/**
554+
* Find the newest entry that matches the given predicate.
555+
*
556+
* @param constraint
557+
* search only active entries or all entries
558+
* @param condition
559+
* predicate that reads an entry an applies a condition
560+
* @param callback
561+
* callback object returning the resultant position
562+
* @param ctx
563+
* opaque context
564+
* @param isFindFromLedger
565+
* find the newest entry from ledger
566+
*/
567+
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
568+
FindEntryCallback callback, Object ctx, boolean isFindFromLedger);
569+
553570
/**
554571
* reset the cursor to specified position to enable replay of messages.
555572
*

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,7 +1101,13 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
11011101

11021102
@Override
11031103
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
1104-
FindEntryCallback callback, Object ctx) {
1104+
FindEntryCallback callback, Object ctx) {
1105+
asyncFindNewestMatching(constraint, condition, callback, ctx, false);
1106+
}
1107+
1108+
@Override
1109+
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
1110+
FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
11051111
OpFindNewest op;
11061112
PositionImpl startPosition = null;
11071113
long max = 0;
@@ -1123,7 +1129,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
11231129
Optional.empty(), ctx);
11241130
return;
11251131
}
1126-
op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
1132+
if (isFindFromLedger) {
1133+
op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx);
1134+
} else {
1135+
op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
1136+
}
11271137
op.find();
11281138
}
11291139

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
243243
AsyncCallbacks.FindEntryCallback callback, Object ctx) {
244244
}
245245

246+
@Override
247+
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
248+
AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
249+
}
250+
246251
@Override
247252
public void asyncResetCursor(final Position position, boolean forceReset,
248253
AsyncCallbacks.ResetCursorCallback callback) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
7171
entry.release();
7272
}
7373
return false;
74-
}, this, callback);
74+
}, this, callback, true);
7575
} else {
7676
if (log.isDebugEnabled()) {
7777
log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running", topicName,

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,39 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
243243
factory.shutdown();
244244
}
245245

246+
@Test
247+
void testPersistentMessageFinderWhenLastMessageDelete() throws Exception {
248+
final String ledgerAndCursorName = "testPersistentMessageFinderWhenLastMessageDelete";
249+
250+
ManagedLedgerConfig config = new ManagedLedgerConfig();
251+
config.setRetentionSizeInMB(10);
252+
config.setMaxEntriesPerLedger(10);
253+
config.setRetentionTime(1, TimeUnit.HOURS);
254+
ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
255+
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);
256+
257+
ledger.addEntry(createMessageWrittenToLedger("msg1"));
258+
ledger.addEntry(createMessageWrittenToLedger("msg2"));
259+
ledger.addEntry(createMessageWrittenToLedger("msg3"));
260+
Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message"));
261+
262+
long endTimestamp = System.currentTimeMillis() + 1000;
263+
264+
Result result = new Result();
265+
// delete last position message
266+
cursor.delete(lastPosition);
267+
CompletableFuture<Void> future = findMessage(result, cursor, endTimestamp);
268+
future.get();
269+
assertNull(result.exception);
270+
assertNotEquals(result.position, null);
271+
assertEquals(result.position, lastPosition);
272+
273+
result.reset();
274+
cursor.close();
275+
ledger.close();
276+
factory.shutdown();
277+
}
278+
246279
@Test
247280
void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception {
248281

0 commit comments

Comments
 (0)