-
Notifications
You must be signed in to change notification settings - Fork 978
Add in-memory cache for getLastEntryInLedger to reduce RocksDB getFloor CPU cost #4732
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,7 @@ | |
| import org.apache.bookkeeper.conf.ServerConfiguration; | ||
| import org.apache.bookkeeper.stats.StatsLogger; | ||
| import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet; | ||
| import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
@@ -45,15 +46,25 @@ | |
| */ | ||
| public class EntryLocationIndex implements Closeable { | ||
|
|
||
| static final String LAST_ENTRY_CACHE_MAX_SIZE = "dbStorage_lastEntryCacheMaxSize"; | ||
| private static final long DEFAULT_LAST_ENTRY_CACHE_MAX_SIZE = 10_000; | ||
|
|
||
| private final KeyValueStorage locationsDb; | ||
| private final ConcurrentLongHashSet deletedLedgers = ConcurrentLongHashSet.newBuilder().build(); | ||
| private final ConcurrentLongLongHashMap lastEntryCache; | ||
| private final long lastEntryCacheMaxSize; | ||
| private final EntryLocationIndexStats stats; | ||
| private boolean isCompacting; | ||
|
|
||
| public EntryLocationIndex(ServerConfiguration conf, KeyValueStorageFactory storageFactory, String basePath, | ||
| StatsLogger stats) throws IOException { | ||
| locationsDb = storageFactory.newKeyValueStorage(basePath, "locations", DbConfigType.EntryLocation, conf); | ||
|
|
||
| this.lastEntryCacheMaxSize = conf.getLong(LAST_ENTRY_CACHE_MAX_SIZE, DEFAULT_LAST_ENTRY_CACHE_MAX_SIZE); | ||
| this.lastEntryCache = ConcurrentLongLongHashMap.newBuilder() | ||
| .expectedItems((int) Math.min(lastEntryCacheMaxSize, Integer.MAX_VALUE)) | ||
| .build(); | ||
|
|
||
| this.stats = new EntryLocationIndexStats( | ||
| stats, | ||
| () -> { | ||
|
|
@@ -115,6 +126,18 @@ public long getLastEntryInLedger(long ledgerId) throws IOException { | |
| } | ||
|
|
||
| private long getLastEntryInLedgerInternal(long ledgerId) throws IOException { | ||
| // Check in-memory cache first to avoid expensive getFloor() calls. | ||
| // ConcurrentLongLongHashMap.get() returns -1 if not found. | ||
| long cachedLastEntry = lastEntryCache.get(ledgerId); | ||
| if (cachedLastEntry >= 0) { | ||
| if (log.isDebugEnabled()) { | ||
| log.debug("Found last entry for ledger {} in cache: {}", ledgerId, cachedLastEntry); | ||
| } | ||
| stats.getGetLastEntryInLedgerStats() | ||
| .registerSuccessfulEvent(0, TimeUnit.NANOSECONDS); | ||
| return cachedLastEntry; | ||
|
Comment on lines +136 to +138
|
||
| } | ||
|
|
||
| LongPairWrapper maxEntryId = LongPairWrapper.get(ledgerId, Long.MAX_VALUE); | ||
|
|
||
| long startTimeNanos = MathUtils.nowInNano(); | ||
|
|
@@ -139,6 +162,11 @@ private long getLastEntryInLedgerInternal(long ledgerId) throws IOException { | |
| if (log.isDebugEnabled()) { | ||
| log.debug("Found last page in storage db for ledger {} - last entry: {}", ledgerId, lastEntryId); | ||
| } | ||
| // Populate cache for future lookups | ||
| if (lastEntryCache.size() >= lastEntryCacheMaxSize) { | ||
| lastEntryCache.clear(); | ||
| } | ||
| lastEntryCache.put(ledgerId, lastEntryId); | ||
|
Comment on lines +165 to +169
|
||
| return lastEntryId; | ||
| } else { | ||
| throw new Bookie.NoEntryException(ledgerId, -1); | ||
|
|
@@ -171,6 +199,18 @@ public void addLocation(Batch batch, long ledgerId, long entryId, long location) | |
| key.recycle(); | ||
| value.recycle(); | ||
| } | ||
|
|
||
| // Update the last entry cache if this entry is newer. | ||
| // ConcurrentLongLongHashMap.get() returns -1 if not found. | ||
| long cachedLastEntry = lastEntryCache.get(ledgerId); | ||
| if (cachedLastEntry < entryId) { | ||
| // Clear the cache if it exceeds the max size to bound memory usage. | ||
| // The cache will quickly repopulate with hot ledgers on subsequent reads. | ||
| if (cachedLastEntry < 0 && lastEntryCache.size() >= lastEntryCacheMaxSize) { | ||
| lastEntryCache.clear(); | ||
| } | ||
| lastEntryCache.put(ledgerId, entryId); | ||
| } | ||
|
Comment on lines +203 to +213
|
||
| } | ||
|
|
||
| public void updateLocations(Iterable<EntryLocation> newLocations) throws IOException { | ||
|
|
@@ -195,6 +235,7 @@ public void delete(long ledgerId) throws IOException { | |
| // We need to find all the LedgerIndexPage records belonging to one specific | ||
| // ledgers | ||
| deletedLedgers.add(ledgerId); | ||
| lastEntryCache.remove(ledgerId); | ||
| } | ||
|
|
||
| public String getEntryLocationDBPath() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,9 +22,11 @@ | |
|
|
||
| import static org.junit.Assert.assertEquals; | ||
| import static org.junit.Assert.assertTrue; | ||
| import static org.junit.Assert.fail; | ||
|
|
||
| import java.io.File; | ||
| import java.io.IOException; | ||
| import org.apache.bookkeeper.bookie.Bookie; | ||
| import org.apache.bookkeeper.conf.ServerConfiguration; | ||
| import org.apache.bookkeeper.stats.NullStatsLogger; | ||
| import org.apache.bookkeeper.test.TestStatsProvider; | ||
|
|
@@ -205,6 +207,52 @@ public void testDeleteSpecialEntry() throws IOException { | |
| assertEquals(0, idx.getLocation(40312, 10)); | ||
| } | ||
|
|
||
| @Test | ||
| public void testGetLastEntryInLedgerCache() throws Exception { | ||
| File tmpDir = File.createTempFile("bkTest", ".dir"); | ||
| tmpDir.delete(); | ||
| tmpDir.mkdir(); | ||
| tmpDir.deleteOnExit(); | ||
|
|
||
| EntryLocationIndex idx = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory, | ||
| tmpDir.getAbsolutePath(), NullStatsLogger.INSTANCE); | ||
|
|
||
| // Add entries for ledger 1 | ||
| idx.addLocation(1, 0, 100); | ||
| idx.addLocation(1, 1, 101); | ||
| idx.addLocation(1, 2, 102); | ||
|
|
||
| // Add entries for ledger 2 | ||
| idx.addLocation(2, 0, 200); | ||
| idx.addLocation(2, 5, 205); | ||
|
|
||
| // First call should hit RocksDB and populate cache | ||
| assertEquals(2, idx.getLastEntryInLedger(1)); | ||
| assertEquals(5, idx.getLastEntryInLedger(2)); | ||
|
|
||
| // Second call should hit cache and return same result | ||
| assertEquals(2, idx.getLastEntryInLedger(1)); | ||
| assertEquals(5, idx.getLastEntryInLedger(2)); | ||
|
|
||
| // Adding a newer entry should update cache | ||
| idx.addLocation(1, 10, 110); | ||
| assertEquals(10, idx.getLastEntryInLedger(1)); | ||
|
|
||
| // Delete should invalidate cache | ||
| idx.delete(1); | ||
| try { | ||
| idx.getLastEntryInLedger(1); | ||
| fail("Should have thrown NoEntryException"); | ||
| } catch (Bookie.NoEntryException e) { | ||
| // expected | ||
| } | ||
|
Comment on lines +241 to +248
|
||
|
|
||
| // Ledger 2 should still work | ||
| assertEquals(5, idx.getLastEntryInLedger(2)); | ||
|
|
||
| idx.close(); | ||
| } | ||
|
|
||
| @Test | ||
| public void testEntryIndexLookupLatencyStats() throws IOException { | ||
| File tmpDir = File.createTempFile("bkTest", ".dir"); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`lastEntryCacheMaxSize` is used directly as `expectedItems(...)` with the default `ConcurrentLongLongHashMap` concurrency level (16). `ConcurrentLongLongHashMap` enforces `expectedItems > 0` and `expectedItems >= concurrencyLevel`, so setting `dbStorage_lastEntryCacheMaxSize` to a small value (e.g., 1–15) or 0 will throw `IllegalArgumentException` during `EntryLocationIndex` construction. Please clamp/validate the config and/or explicitly set the cache map concurrency level (e.g. `concurrencyLevel(min(16, expectedItems))`), and treat `<= 0` as "cache disabled".