Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.lightningkite.lightningserver.LsErrorException
import com.lightningkite.lightningserver.typed.BulkRequest
import com.lightningkite.lightningserver.typed.BulkResponse
import com.lightningkite.lightningserver.websocket.MultiplexMessage
import com.lightningkite.reactive.context.awaitOnce
import com.lightningkite.reactive.context.reactiveScope
import com.lightningkite.reactive.core.AppScope
import com.lightningkite.reactive.core.Reactive
Expand Down Expand Up @@ -36,6 +37,12 @@ class BulkFetcher(
override fun withHeaderCalculator(calculator: suspend () -> List<Pair<String, String>>): Fetcher =
BulkFetcher(httpBulk, wsMultiplex, json, pingTime, delay, log, calculator)

companion object {
val channelInitialBackoff: Duration = 100.milliseconds
val channelMaxBackoff: Duration = 30_000.milliseconds
val channelStableConnectionThreshold: Duration = 5_000.milliseconds
}

private var fetchQueue = HashMap<String, Pair<BulkRequest, CancellableContinuation<BulkResponse>>>()
private var scheduled = false
override suspend fun <I, O> invoke(
Expand Down Expand Up @@ -144,10 +151,18 @@ class BulkFetcher(
val channelOpen = Signal(false)
val channel = UUID.Companion.random().toString()

private var reconnectJob: Job? = null
private val backoff = ChannelBackoff(
initialBackoff = channelInitialBackoff,
maxBackoff = channelMaxBackoff,
stableConnectionThreshold = channelStableConnectionThreshold,
)

init {
remember.onMessage { message ->
if (message.channel == channel) {
if (message.start) {
backoff.onConnectionOpened()
channelOpen.value = true
onOpenList.forEach { it() }
}
Expand All @@ -156,6 +171,11 @@ class BulkFetcher(
}
if (message.end) {
channelOpen.value = false
val wasIntentional = shouldBeOn.value <= 0
backoff.onConnectionClosed(wasIntentional)
if (!wasIntentional) {
log?.log("Channel $channel closed unexpectedly, backoff now ${backoff.currentBackoffMs}ms")
}
onCloseList.forEach { it(-1) }
}
}
Expand All @@ -179,20 +199,39 @@ class BulkFetcher(
}
}

// Reactive scope pattern: whenever any signal changes, re-evaluate what to do.
// Only one reconnectJob runs at a time - we cancel before launching a new one.
// If signals change rapidly during backoff, we restart the backoff (intentional -
// we want to wait for stability before reconnecting).
val lifecycle = CoroutineScope(Job()).apply {
reactiveScope {
val shouldBeOn = shouldBeOn() > 0
val isOn = channelOpen()
val parentConnected = remember.connected()

// Always cancel pending reconnect when re-evaluating
reconnectJob?.cancel()
reconnectJob = null

if (shouldBeOn && parentConnected && !isOn) {
remember.send(
MultiplexMessage(
channel = channel,
path = path,
queryParams = params,
start = true
)
)
reconnectJob = launch {
val backoffWithJitter = backoff.getBackoffWithJitter()
if (backoffWithJitter > 0) {
log?.log("Channel $channel backing off for ${backoffWithJitter}ms before reconnect")
delay(backoffWithJitter)
}
// Re-verify conditions after delay (signals may have changed)
if (this@WebsocketChannel.shouldBeOn.value > 0 && remember.connected.awaitOnce() && !channelOpen.value) {
remember.send(
MultiplexMessage(
channel = channel,
path = path,
queryParams = params,
start = true
)
)
}
}
} else if (!shouldBeOn && parentConnected && isOn) {
remember.send(
MultiplexMessage(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.lightningkite.lightningserver.networking

import kotlin.random.Random
import kotlin.time.Duration

/**
* Manages exponential backoff for websocket channel reconnection.
*
* Backoff increases when connections fail quickly (< stableConnectionThreshold).
* Backoff resets when connections stay open long enough to be considered stable,
* or when the connection is closed intentionally.
*
* @param initialBackoff Starting backoff duration after first failure
* @param maxBackoff Maximum backoff duration
* @param stableConnectionThreshold How long a connection must stay open to be considered stable
* @param jitterFactor Random variation factor (0.2 = +/- 20%)
* @param timeSource Injectable time source for testing (returns epoch millis)
* @param randomSource Injectable random source for testing (returns 0.0 to 1.0)
*/
class ChannelBackoff(
private val initialBackoff: Duration,
private val maxBackoff: Duration,
private val stableConnectionThreshold: Duration,
private val jitterFactor: Double = 0.2,
private val timeSource: () -> Long = { com.lightningkite.now().toEpochMilliseconds() },
private val randomSource: () -> Double = { Random.nextDouble() },
) {
private var backoffMs: Long = 0L
private var connectionOpenedAt: Long = 0L

/** Call when a channel successfully connects */
fun onConnectionOpened() {
connectionOpenedAt = timeSource()
}

/**
* Call when a channel closes.
* @param wasIntentional true if the user closed the channel, false if server/network closed it
*/
fun onConnectionClosed(wasIntentional: Boolean) {
if (wasIntentional) {
backoffMs = 0L
return
}

val connectionDuration = timeSource() - connectionOpenedAt
if (connectionDuration < stableConnectionThreshold.inWholeMilliseconds) {
// Short-lived connection = failure, increase backoff
backoffMs = if (backoffMs == 0L) {
initialBackoff.inWholeMilliseconds
} else {
(backoffMs * 2).coerceAtMost(maxBackoff.inWholeMilliseconds)
}
} else {
// Connection was stable, reset backoff
backoffMs = 0L
}
}

/** Returns current backoff with random jitter applied, or 0 if no backoff needed */
fun getBackoffWithJitter(): Long {
if (backoffMs == 0L) return 0L
// Jitter: random value between -jitterFactor and +jitterFactor
val jitterMultiplier = 1.0 + (randomSource() * 2 - 1) * jitterFactor
return (backoffMs * jitterMultiplier).toLong().coerceAtLeast(1L)
}

/** Returns current base backoff in ms (without jitter), for logging */
val currentBackoffMs: Long get() = backoffMs
}
Loading
Loading