diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java index a353b7cf7ee..a93b1058472 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndex.java @@ -34,6 +34,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,8 +46,13 @@ */ public class EntryLocationIndex implements Closeable { + static final String LAST_ENTRY_CACHE_MAX_SIZE = "dbStorage_lastEntryCacheMaxSize"; + private static final long DEFAULT_LAST_ENTRY_CACHE_MAX_SIZE = 10_000; + private final KeyValueStorage locationsDb; private final ConcurrentLongHashSet deletedLedgers = ConcurrentLongHashSet.newBuilder().build(); + private final ConcurrentLongLongHashMap lastEntryCache; + private final long lastEntryCacheMaxSize; private final EntryLocationIndexStats stats; private boolean isCompacting; @@ -54,6 +60,11 @@ public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory stora StatsLogger stats) throws IOException { locationsDb = storageFactory.newKeyValueStorage(basePath, "locations", DbConfigType.EntryLocation, conf); + this.lastEntryCacheMaxSize = conf.getLong(LAST_ENTRY_CACHE_MAX_SIZE, DEFAULT_LAST_ENTRY_CACHE_MAX_SIZE); + this.lastEntryCache = ConcurrentLongLongHashMap.newBuilder() + .expectedItems((int) Math.min(lastEntryCacheMaxSize, Integer.MAX_VALUE)) + .build(); + this.stats = new EntryLocationIndexStats( stats, () -> { @@ -115,6 +126,18 @@ public long getLastEntryInLedger(long ledgerId) throws IOException { } private long getLastEntryInLedgerInternal(long ledgerId) throws IOException { + // Check in-memory cache first to avoid expensive getFloor() calls. + // ConcurrentLongLongHashMap.get() returns -1 if not found. + long cachedLastEntry = lastEntryCache.get(ledgerId); + if (cachedLastEntry >= 0) { + if (log.isDebugEnabled()) { + log.debug("Found last entry for ledger {} in cache: {}", ledgerId, cachedLastEntry); + } + stats.getGetLastEntryInLedgerStats() + .registerSuccessfulEvent(0, TimeUnit.NANOSECONDS); + return cachedLastEntry; + } + LongPairWrapper maxEntryId = LongPairWrapper.get(ledgerId, Long.MAX_VALUE); long startTimeNanos = MathUtils.nowInNano(); @@ -139,6 +162,11 @@ private long getLastEntryInLedgerInternal(long ledgerId) throws IOException { if (log.isDebugEnabled()) { log.debug("Found last page in storage db for ledger {} - last entry: {}", ledgerId, lastEntryId); } + // Populate cache for future lookups + if (lastEntryCache.size() >= lastEntryCacheMaxSize) { + lastEntryCache.clear(); + } + lastEntryCache.put(ledgerId, lastEntryId); return lastEntryId; } else { throw new Bookie.NoEntryException(ledgerId, -1); @@ -171,6 +199,18 @@ public void addLocation(Batch batch, long ledgerId, long entryId, long location) key.recycle(); value.recycle(); } + + // Update the last entry cache if this entry is newer. + // ConcurrentLongLongHashMap.get() returns -1 if not found. + long cachedLastEntry = lastEntryCache.get(ledgerId); + if (cachedLastEntry < entryId) { + // Clear the cache if it exceeds the max size to bound memory usage. + // The cache will quickly repopulate with hot ledgers on subsequent reads. + if (cachedLastEntry < 0 && lastEntryCache.size() >= lastEntryCacheMaxSize) { + lastEntryCache.clear(); + } + lastEntryCache.put(ledgerId, entryId); + } } public void updateLocations(Iterable newLocations) throws IOException { @@ -195,6 +235,7 @@ public void delete(long ledgerId) throws IOException { // We need to find all the LedgerIndexPage records belonging to one specific // ledgers deletedLedgers.add(ledgerId); + lastEntryCache.remove(ledgerId); } public String getEntryLocationDBPath() { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java index 80fdfc7bd0e..1b53821ffd9 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java @@ -22,9 +22,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; +import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.test.TestStatsProvider; @@ -205,6 +207,52 @@ public void testDeleteSpecialEntry() throws IOException { assertEquals(0, idx.getLocation(40312, 10)); } + @Test + public void testGetLastEntryInLedgerCache() throws Exception { + File tmpDir = File.createTempFile("bkTest", ".dir"); + tmpDir.delete(); + tmpDir.mkdir(); + tmpDir.deleteOnExit(); + + EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory, + tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE); + + // Add entries for ledger 1 + idx.addLocation(1, 0, 100); + idx.addLocation(1, 1, 101); + idx.addLocation(1, 2, 102); + + // Add entries for ledger 2 + idx.addLocation(2, 0, 200); + idx.addLocation(2, 5, 205); + + // First call should hit RocksDB and populate cache + assertEquals(2, idx.getLastEntryInLedger(1)); + assertEquals(5, idx.getLastEntryInLedger(2)); + + // Second call should hit cache and return same result + assertEquals(2, idx.getLastEntryInLedger(1)); + assertEquals(5, idx.getLastEntryInLedger(2)); + + // Adding a newer entry should update cache + idx.addLocation(1, 10, 110); + assertEquals(10, idx.getLastEntryInLedger(1)); + + // Delete should invalidate cache + idx.delete(1); + try { + idx.getLastEntryInLedger(1); + fail("Should have thrown NoEntryException"); + } catch (Bookie.NoEntryException e) { + // expected + } + + // Ledger 2 should still work + assertEquals(5, idx.getLastEntryInLedger(2)); + + idx.close(); + } + @Test public void testEntryIndexLookupLatencyStats() throws IOException { File tmpDir = File.createTempFile("bkTest", ".dir");