Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,23 @@ Position findNewestMatching(FindPositionConstraint constraint, Predicate<Entry>
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
FindEntryCallback callback, Object ctx);

/**
* Find the newest entry that matches the given predicate.
*
* @param constraint
* search only active entries or all entries
* @param condition
* predicate that reads an entry an applies a condition
* @param callback
* callback object returning the resultant position
* @param ctx
* opaque context
* @param isFindFromLedger
* find the newest entry from ledger
*/
void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
FindEntryCallback callback, Object ctx, boolean isFindFromLedger);

/**
* reset the cursor to specified position to enable replay of messages.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1182,6 +1182,12 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
FindEntryCallback callback, Object ctx) {
asyncFindNewestMatching(constraint, condition, callback, ctx, false);
}

@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
OpFindNewest op;
PositionImpl startPosition = null;
long max = 0;
Expand All @@ -1203,7 +1209,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
Optional.empty(), ctx);
return;
}
op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
if (isFindFromLedger) {
op = new OpFindNewest(this.ledger, startPosition, condition, max, callback, ctx);
} else {
op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
}
op.find();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
AsyncCallbacks.FindEntryCallback callback, Object ctx) {
}

@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
}

@Override
public void asyncResetCursor(final Position position, boolean forceReset,
AsyncCallbacks.ResetCursorCallback callback) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
entry.release();
}
return false;
}, this, callback);
}, this, callback, true);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignore message position find scheduled task, last find is still running", topicName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -276,6 +277,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint,

}

@Override
public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate<Entry> condition,
AsyncCallbacks.FindEntryCallback callback, Object ctx, boolean isFindFromLedger) {
}

@Override
public void resetCursor(Position position) throws InterruptedException, ManagedLedgerException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,39 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
factory.shutdown();
}

@Test
void testPersistentMessageFinderWhenLastMessageDelete() throws Exception {
final String ledgerAndCursorName = "testPersistentMessageFinderWhenLastMessageDelete";

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(10);
config.setMaxEntriesPerLedger(10);
config.setRetentionTime(1, TimeUnit.HOURS);
ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);

ledger.addEntry(createMessageWrittenToLedger("msg1"));
ledger.addEntry(createMessageWrittenToLedger("msg2"));
ledger.addEntry(createMessageWrittenToLedger("msg3"));
Position lastPosition = ledger.addEntry(createMessageWrittenToLedger("last-message"));

long endTimestamp = System.currentTimeMillis() + 1000;

Result result = new Result();
// delete last position message
cursor.delete(lastPosition);
CompletableFuture<Void> future = findMessage(result, cursor, endTimestamp);
future.get();
assertNull(result.exception);
assertNotEquals(result.position, null);
assertEquals(result.position, lastPosition);

result.reset();
cursor.close();
ledger.close();
factory.shutdown();
}

@Test
void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception {

Expand Down