Skip to content
Merged
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ jobs:

- name: Build
run: |
./gradlew :plugin:build --no-daemon --stacktrace --no-configuration-cache
./gradlew :plugin:assemble --no-daemon --stacktrace --no-configuration-cache
2 changes: 1 addition & 1 deletion plugin/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ plugins {
}

group = "io.github.tiper"
version = "3.0.0"
version = "3.1.0"

kotlin {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import io.ktor.client.plugins.api.createClientPlugin
import io.ktor.client.request.HttpRequestBuilder
import io.ktor.http.HttpMethod
import io.ktor.http.HttpMethod.Companion.Get
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.isActive
Comment thread
tiper marked this conversation as resolved.
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

/**
* Configuration for the request deduplication plugin.
Expand Down Expand Up @@ -95,8 +95,8 @@ class RequestDeduplicationConfig {
*
* **Cancellation Behavior:**
* - If one caller cancels, other concurrent callers still receive the response
* - If all callers cancel, the in-flight request is cancelled to save resources
* - The first caller runs in a supervisor scope to prevent cascading cancellations
* - If the leader is cancelled, surviving waiters automatically retry as a new deduplicated group
* - If all callers cancel, the in-flight entry is removed and resources are freed
*
* **⚠️ CRITICAL: Plugin Installation Order**
*
Expand Down Expand Up @@ -173,51 +173,53 @@ val RequestDeduplication: ClientPlugin<RequestDeduplicationConfig> = createClien
val config = pluginConfig
val mutex = Mutex()
val inFlight = mutableMapOf<String, InFlightEntry>()
// SupervisorJob must be rightmost so it becomes the effective Job element
val scope = CoroutineScope(client.coroutineContext + SupervisorJob(client.coroutineContext[Job]))

val direct: suspend CoroutineScope.(Send.Sender, HttpRequestBuilder) -> HttpClientCall = { sender, request ->
sender.proceed(request).save()
suspend fun Send.Sender.execute(request: HttpRequestBuilder): HttpClientCall = if (config.minWindow < 1) {
proceed(request).save()
} else {
val deferred = async { proceed(request).save() }
delay(config.minWindow)
deferred.await()
}

val proceed = if (config.minWindow < 1) direct else { sender, request ->
async { direct(sender, request) }.also {
delay(config.minWindow)
}.await()
}

on(Send) { request ->
if (request.method !in config.deduplicateMethods) return@on proceed(request)

val cacheKey = request.buildCacheKey(config)

suspend fun Send.Sender.deduplicate(request: HttpRequestBuilder, cacheKey: String): HttpClientCall {
val (isFirst, entry) = mutex.withLock {
val existing = inFlight[cacheKey]
if (existing != null) false to existing.also { it.waiters += 1 }
else true to InFlightEntry().also { inFlight[cacheKey] = it }
}

if (isFirst) {
entry.job = scope.launch {
try {
proceed(this@on, request).also(entry.deferred::complete)
} catch (e: Throwable) {
throw e.also(entry.deferred::completeExceptionally)
} finally {
mutex.withLock { inFlight.remove(cacheKey) }
try {
return execute(request).also(entry.deferred::complete)
} catch (e: Throwable) {
throw e.also(entry.deferred::completeExceptionally)
} finally {
withContext(NonCancellable) {
mutex.withLock { if (inFlight[cacheKey] === entry) inFlight.remove(cacheKey) }
}
}
}

try {
entry.deferred.await()
return entry.deferred.await()
} catch (e: CancellationException) {
if (!isActive) throw e
return deduplicate(request, cacheKey)
} finally {
mutex.withLock {
entry.waiters -= 1
if (entry.waiters == 0 && !entry.deferred.isCompleted) entry.job?.cancel()
withContext(NonCancellable) {
mutex.withLock {
if (entry.waiters == 1 && inFlight[cacheKey] === entry) inFlight.remove(cacheKey)
else entry.waiters -= 1
}
}
}
}

on(Send) { request ->
if (request.method !in config.deduplicateMethods) return@on proceed(request)
deduplicate(request, request.buildCacheKey(config))
}
}

private fun HttpRequestBuilder.buildCacheKey(config: RequestDeduplicationConfig): String {
Expand All @@ -231,8 +233,7 @@ private fun HttpRequestBuilder.buildCacheKey(config: RequestDeduplicationConfig)
return "${method.value}:${url.buildString()}|h=$headerHash"
}

private data class InFlightEntry(
private class InFlightEntry(
val deferred: CompletableDeferred<HttpClientCall> = CompletableDeferred(),
var waiters: Int = 1,
var job: Job? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -818,8 +818,7 @@ class RequestDeduplicationTest {
val client = HttpClient(MockEngine) {
engine {
addHandler {
requestCount.incrementAndGet()
respond("response-${requestCount.value}")
respond("response-${requestCount.incrementAndGet()}")
}
}
install(RequestDeduplication) // Default config only deduplicates GET
Expand All @@ -834,9 +833,7 @@ class RequestDeduplicationTest {
}.awaitAll()

assertEquals(3, requestCount.value, "Expected 3 separate POST requests (POST not deduplicated by default)")
responses.forEachIndexed { index, response ->
assertEquals("response-${index + 1}", response.bodyAsText())
}
assertEquals(setOf("response-1", "response-2", "response-3"), responses.map { it.bodyAsText() }.toSet())
}

@Test
Expand All @@ -845,8 +842,7 @@ class RequestDeduplicationTest {
val client = HttpClient(MockEngine) {
engine {
addHandler {
requestCount.incrementAndGet()
respond("response-${requestCount.value}")
respond("response-${requestCount.incrementAndGet()}")
}
}
install(RequestDeduplication) {
Expand Down Expand Up @@ -885,4 +881,30 @@ class RequestDeduplicationTest {
batch1.forEach { assertEquals("response-1", it.bodyAsText()) }
batch2.forEach { assertEquals("response-2", it.bodyAsText()) }
}

@Test
fun leader_cancelled_waiters_retry_as_deduplicated_group() = runTest {
val requestCount = atomic(0)
val client = mockClient {
"response-${requestCount.incrementAndGet()}"
}

val jobs = List(5) { async { client.get("https://api.example.com/users") } }

// Advance to just before the response is ready, then cancel job0.
testScheduler.advanceTimeBy(50)

// Cancel the leader while all waiters are still blocked
jobs[0].cancel()

// Let everything complete. The 4 survivors must deduplicate into a single retry.
testScheduler.advanceUntilIdle()

val responses = jobs.drop(1).awaitAll()

// The cancelled leader never reached requestCount.incrementAndGet().
// The deduplicated retry should only be the real HTTP call.
assertEquals(1, requestCount.value, "Surviving waiters must retry as one deduplicated group, not individually")
responses.forEach { assertEquals("response-1", it.bodyAsText()) }
}
}
Loading