Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,28 @@ class InMemoryIndex<T : Indexable>(
// Inserts [entry] while holding the write lock; the index mutation itself is delegated to insertSingleLocked.
override suspend fun insert(entry: T) = lock.write { insertSingleLocked(entry) }

/**
 * Remove every entry that belongs to [sourceId].
 *
 * Takes the write lock and delegates to [removeBySourceLocked], which owns the
 * `sourceMap` lookup. The source's key set must NOT be removed from `sourceMap`
 * here first: doing so makes the helper find nothing and return early, leaving
 * the entries behind in the primary and secondary indexes.
 */
override suspend fun removeBySource(sourceId: String) = lock.write {
    removeBySourceLocked(sourceId)
}

/**
 * Batch variant of [removeBySource]: drops the entries of every source listed
 * in [sourceIds].
 *
 * The write lock is taken a single time for the whole batch and each source is
 * removed while it is held, so concurrent readers and writers never observe a
 * state in which only part of the batch has been removed.
 */
override suspend fun removeBySources(sourceIds: Collection<String>) = lock.write {
    sourceIds.forEach { id -> removeBySourceLocked(id) }
}

/**
* Remove all entries for [sourceId] from the primary, source, and secondary
* indexes. Caller MUST already hold the write lock; this method does not lock.
*/
private fun removeBySourceLocked(sourceId: String) {
val keys = sourceMap.remove(sourceId) ?: return
for (key in keys) {
val entry = primaryMap.remove(key) ?: continue
removeFromSecondaryIndexes(entry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ class SQLiteIndex<T : Indexable>(
) : Index<T> {
companion object {
// Logger shared by all SQLiteIndex instances.
private val log = LoggerFactory.getLogger(SQLiteIndex::class.java)

/**
* Max number of `_source_id` placeholders per batched DELETE.
*
* Kept below 999, which is the default bound-parameter limit
* (`SQLITE_MAX_VARIABLE_NUMBER`) of SQLite builds older than 3.32.0; newer
* builds default to a much higher limit, so this chunk size is safe on both.
*/
private const val DELETE_CHUNK_SIZE = 900
}


Expand Down Expand Up @@ -190,6 +196,35 @@ class SQLiteIndex<T : Indexable>(
ifOpen { db.execSQL("DELETE FROM $tableName WHERE _source_id = ?", arrayOf(sourceId)) }
}

/**
 * Delete every row whose `_source_id` appears in [sourceIds].
 *
 * The work runs on [Dispatchers.IO] inside one database transaction. Ids are
 * processed in batches of at most [DELETE_CHUNK_SIZE], each batch becoming one
 * `DELETE ... IN (?, ?, ...)` statement with the ids passed as bound arguments,
 * so no statement exceeds the bound-parameter limit. The transaction is marked
 * successful only after every batch has executed; if any statement throws, the
 * transaction is ended without being marked successful.
 *
 * An empty [sourceIds] returns immediately without opening a transaction.
 *
 * @param sourceIds Source ids whose rows should be deleted.
 */
override suspend fun removeBySources(sourceIds: Collection<String>) =
    withContext(Dispatchers.IO) {
        if (sourceIds.isEmpty()) return@withContext
        ifOpen {
            db.beginTransaction()
            try {
                sourceIds.chunked(DELETE_CHUNK_SIZE).forEach { batch ->
                    // One "?" per id in this batch; values travel as bind args, never inlined.
                    val placeholders = batch.joinToString(separator = ",") { "?" }
                    val sql = "DELETE FROM $tableName WHERE _source_id IN ($placeholders)"
                    db.execSQL(sql, batch.toTypedArray())
                }
                db.setTransactionSuccessful()
            } finally {
                db.endTransaction()
            }
        }
    }

/**
 * Delete every row of this index's table. Runs on [Dispatchers.IO] and goes
 * through [ifOpen] like the other write operations.
 */
override suspend fun clear() = withContext(Dispatchers.IO) {
    val sql = "DELETE FROM $tableName"
    ifOpen { db.execSQL(sql) }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ interface WritableIndex<T : Indexable> {
*/
suspend fun removeBySource(sourceId: String)

/**
* Remove all entries belonging to any of the given sources as one batched operation.
*
* Equivalent in effect to calling [removeBySource] for each id in [sourceIds],
* but lets the implementation do the work as a single batch (for example one
* lock acquisition or one database transaction) instead of N sequential
* operations. SQL-backed implementations should chunk the ids so each
* generated statement stays within the engine's bound-parameter limit.
*
* An empty [sourceIds] removes nothing.
*
* @param sourceIds Ids of the sources whose entries should be removed.
*/
suspend fun removeBySources(sourceIds: Collection<String>)

/**
* Remove all entries.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,13 @@ internal class IndexWorker(
while (isActive) {
when (val cmd = queue.take()) {
is IndexCommand.RemoveFromIndex -> {
val filePath = cmd.path.pathString
fileIndex.remove(filePath)
sourceIndex.removeBySource(filePath)
applyRemovals(
first = cmd,
fileIndex = fileIndex,
sourceIndex = sourceIndex,
pollNext = { queue.pollIndexQueue() },
pushBack = { queue.pushBackIndexQueue(it) },
)
}

is IndexCommand.IndexSourceFile -> {
Expand Down Expand Up @@ -151,3 +155,51 @@ internal class IndexWorker(
}
}
}

/**
 * Apply [first] plus any consecutive, immediately-available [IndexCommand.RemoveFromIndex]
 * commands as a single batched removal.
 *
 * Symbol removals are collapsed into one [JvmSymbolIndex.removeBySources] call instead of
 * issuing one removal per file, which is the N+1 this fix targets (Sentry APPDEVFORALL-SE).
 *
 * [pollNext] returns the next already-queued index command without blocking, or `null`
 * when none is ready. A polled command that is *not* a removal is handed to [pushBack] so
 * it is processed (in order) on the next loop iteration rather than dropped; batching stops
 * at that point.
 *
 * @param first The removal command that triggered this batch.
 * @param fileIndex Per-file metadata index; entries are removed one path at a time.
 * @param sourceIndex Symbol index; removed via the batched [JvmSymbolIndex.removeBySources].
 * @param pollNext Non-blocking poll of the next queued index command.
 * @param pushBack Returns a non-removal command to the front of the queue.
 */
internal suspend fun applyRemovals(
    first: IndexCommand.RemoveFromIndex,
    fileIndex: KtFileMetadataIndex,
    sourceIndex: JvmSymbolIndex,
    pollNext: () -> IndexCommand?,
    pushBack: (IndexCommand) -> Unit,
) {
    val paths = mutableListOf(first.path.pathString)

    // Drain the run of removal commands that is already queued; stop at the first
    // empty poll or the first command of any other kind.
    while (true) {
        val next = pollNext() ?: break
        if (next is IndexCommand.RemoveFromIndex) {
            paths.add(next.path.pathString)
        } else {
            // Not batchable — return it so the main loop handles it next, in order.
            pushBack(next)
            break
        }
    }

    // Per-file metadata index has no batch API; remove individually.
    // TODO(review): KtFileMetadataIndex is a wrapper over an Index, so a similar
    // batch-removal API could be added to it and used here.
    for (path in paths) {
        fileIndex.remove(path)
    }

    // Collapse all symbol removals into a single batched call.
    sourceIndex.removeBySources(paths)
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,39 @@ internal class WorkerQueue<T> {
// Buffered channel fed by putEditQueue (capacity 20).
private val editChannel = Channel<T>(capacity = 20)
// Buffered channel fed by putIndexQueue (capacity 100); the only channel pollIndexQueue reads.
private val indexChannel = Channel<T>(capacity = 100)

// Single-slot pushback for an index-queue item that was polled (to coalesce
// removals) but turned out not to be batchable. It is returned ahead of the
// channels by the next [take], preserving command order.
// NOTE(review): plain unsynchronized field — presumably only the single consumer
// that calls take()/pushBackIndexQueue() touches it; confirm.
private var pushedBack: T? = null

/** Enqueue [item] on the scan channel via `send`. */
suspend fun putScanQueue(item: T) {
    scanChannel.send(item)
}
/** Enqueue [item] on the edit channel via `send`. */
suspend fun putEditQueue(item: T) {
    editChannel.send(item)
}
/** Enqueue [item] on the index channel via `send`. */
suspend fun putIndexQueue(item: T) {
    indexChannel.send(item)
}

/**
 * Try to take one item from the index channel without suspending.
 *
 * Yields the item when one is already buffered, otherwise `null`. Only the
 * index channel is read; the pushback slot is not consulted.
 *
 * Intended for coalescing a run of consecutive removal commands into one
 * batched index operation (see [IndexWorker]) rather than one transaction per
 * command. If the polled item turns out not to be batchable, the caller must
 * hand it back through [pushBackIndexQueue] so it is not lost.
 */
fun pollIndexQueue(): T? {
    val attempt = indexChannel.tryReceive()
    return attempt.getOrNull()
}

/**
 * Hand back an item that was taken with [pollIndexQueue] so that the next
 * [take] returns it before reading any channel.
 *
 * The pushback slot holds a single item: calling this while a previous item
 * is still waiting throws [IllegalStateException].
 */
fun pushBackIndexQueue(item: T) {
    if (pushedBack != null) {
        error("pushBack slot already occupied")
    }
    pushedBack = item
}

suspend fun take(): T {
pushedBack?.let { pushedBack = null; return it }

scanChannel.tryReceive().getOrNull()?.let { return it }
editChannel.tryReceive().getOrNull()?.let { return it }
indexChannel.tryReceive().getOrNull()?.let { return it }
Expand Down
Loading
Loading