Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull
import org.slf4j.LoggerFactory
import java.io.Closeable
Expand Down Expand Up @@ -41,8 +42,10 @@ class IndexingServiceManager(
/**
* Register an [IndexingService].
*
* Must be called before [onProjectSynced]. Services are initialized
* in registration order.
* Must be called before [onProjectSynced]. Services are initialized in an
* unspecified order (they are held in a [ConcurrentHashMap]), so a service's
* [IndexingService.initialize] must not depend on another service having
* been initialized first.
*
* @throws IllegalStateException if called after initialization.
*/
Expand Down Expand Up @@ -123,51 +126,57 @@ class IndexingServiceManager(
override fun close() {
log.info("Shutting down indexing services")

// Close all services and the registry, and block until they finish.
// Callers (e.g. ProjectManagerImpl.destroy()) rely on shutdown being
// complete when close() returns -- the Closeable contract -- before they
// drop their reference to the manager.
//
// Services are closed concurrently on Dispatchers.Default (so no ordering
// is implied), and each close is bounded by SERVICE_CLOSE_TIMEOUT so a
// cooperatively-cancellable service cannot stall teardown indefinitely.
// Failures are isolated per service.
runBlocking {
val serviceJobs = services.values.map { service ->
launch(Dispatchers.Default) {
withTimeoutOrNull(SERVICE_CLOSE_TIMEOUT) {
try {
service.close()
log.debug("Closed service: {}", service.id)
} catch (e: Exception) {
if (e is CancellationException) throw e
log.error("Failed to close service: {}", service.id, e)
}
} ?: log.warn(
"Indexing service {} failed to close within timeout period: {}ms",
service.id, SERVICE_CLOSE_TIMEOUT.inWholeMilliseconds,
)
}
}

// Close services in reverse registration order
val cancellationJobs = services.values.reversed().map { service ->
scope.launch(Dispatchers.Default) {
val closeRegistryJob = launch(Dispatchers.Default) {
withTimeoutOrNull(SERVICE_CLOSE_TIMEOUT) {
try {
service.close()
log.debug("Closed service: {}", service.id)
registry.close()
} catch (e: Exception) {
if (e is CancellationException) throw e
log.error("Failed to close service: {}", service.id, e)
log.error("Failed to close index registry", e)
}
} ?: run {
log.warn("Indexing service {} failed to close within timeout period: {}ms", service.id, SERVICE_CLOSE_TIMEOUT.inWholeMilliseconds)
}
} ?: log.warn(
"Index registry failed to close within timeout: {}ms",
SERVICE_CLOSE_TIMEOUT.inWholeMilliseconds,
)
}
}

val closeRegistryJob = scope.launch(Dispatchers.Default) {
withTimeoutOrNull(SERVICE_CLOSE_TIMEOUT) {
try {
registry.close()
} catch (e: Exception) {
if (e is CancellationException) throw e
log.error("Failed to close index registry")
}
} ?: run {
log.warn("Index registry failed to close within timeout: {}ms", SERVICE_CLOSE_TIMEOUT.inWholeMilliseconds)
}
joinAll(*serviceJobs.toTypedArray(), closeRegistryJob)
}

scope.launch {
runCatching { joinAll(closeRegistryJob, *cancellationJobs.toTypedArray()) }
.onFailure { err ->
log.error("Failed to close indexing services", err)
}

// Cancel in-flight work
scope.coroutineContext.cancelChildren()
}
// Cancel any in-flight indexing work still running on the manager scope.
scope.coroutineContext.cancelChildren()

services.clear()
initialized = false

log.info("Indexing services shut down requested")
log.info("Indexing services shut down")
}

private suspend fun initializeServices() {
Expand Down
Loading