diff --git a/README.md b/README.md index c23b04b..ccc458b 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,16 @@ The plugin requires the gRPC target to be provided via environment variable: export PLAYER_PRESENCE_GRPC_TARGET="dns:///service-player.api.svc.cluster.local:9000" ``` +Optional heartbeat configuration: + +```bash +export PLAYER_PRESENCE_HEARTBEAT_SECONDS="30" +export PLAYER_SESSIONS_TTL="90s" +``` + +`PLAYER_PRESENCE_HEARTBEAT_SECONDS` is automatically clamped to a safe value based on +`PLAYER_SESSIONS_TTL` (`max = ttl / 3`) to reduce false stale-session cleanup. + Messages are configured in `velocity/src/main/resources/messages.yml` (copied to the plugin data directory on first run). diff --git a/common/build.gradle.kts b/common/build.gradle.kts index cce2133..1bc3fdd 100644 --- a/common/build.gradle.kts +++ b/common/build.gradle.kts @@ -10,4 +10,4 @@ repositories { } } -dependencies { protobuf("gg.grounds:library-grpc-contracts-player:0.1.0") } +dependencies { protobuf("gg.grounds:library-grpc-contracts-player:feat-player-heartbeat-SNAPSHOT") } diff --git a/common/src/main/kotlin/gg/grounds/player/presence/GrpcPlayerPresenceClient.kt b/common/src/main/kotlin/gg/grounds/player/presence/GrpcPlayerPresenceClient.kt index 5a34d1e..680006f 100644 --- a/common/src/main/kotlin/gg/grounds/player/presence/GrpcPlayerPresenceClient.kt +++ b/common/src/main/kotlin/gg/grounds/player/presence/GrpcPlayerPresenceClient.kt @@ -1,5 +1,7 @@ package gg.grounds.player.presence +import gg.grounds.grpc.player.PlayerHeartbeatBatchReply +import gg.grounds.grpc.player.PlayerHeartbeatBatchRequest import gg.grounds.grpc.player.PlayerLoginRequest import gg.grounds.grpc.player.PlayerLogoutReply import gg.grounds.grpc.player.PlayerLogoutRequest @@ -49,6 +51,22 @@ private constructor( } } + fun heartbeatBatch(playerIds: Collection): PlayerHeartbeatBatchReply { + return try { + val request = + PlayerHeartbeatBatchRequest.newBuilder() + .addAllPlayerIds(playerIds.map { it.toString() }) + .build() + stub + .withDeadlineAfter(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS) + .playerHeartbeatBatch(request) + } catch (e: StatusRuntimeException) { + errorHeartbeatBatchReply(e.status.toString()) + } catch (e: RuntimeException) { + errorHeartbeatBatchReply(e.message ?: e::class.java.name) + } + } + override fun close() { channel.shutdown() try { @@ -74,6 +92,14 @@ private constructor( private fun errorLogoutReply(message: String): PlayerLogoutReply = PlayerLogoutReply.newBuilder().setRemoved(false).setMessage(message).build() + private fun errorHeartbeatBatchReply(message: String): PlayerHeartbeatBatchReply = + PlayerHeartbeatBatchReply.newBuilder() + .setUpdated(0) + .setMissing(0) + .setSuccess(false) + .setMessage(message) + .build() + private fun isServiceUnavailable(status: Status.Code): Boolean = status == Status.Code.UNAVAILABLE || status == Status.Code.DEADLINE_EXCEEDED diff --git a/velocity/build.gradle.kts b/velocity/build.gradle.kts index e835e17..9da7e1e 100644 --- a/velocity/build.gradle.kts +++ b/velocity/build.gradle.kts @@ -5,4 +5,8 @@ dependencies { implementation("tools.jackson.dataformat:jackson-dataformat-yaml:3.0.3") implementation("tools.jackson.module:jackson-module-kotlin:3.0.3") implementation("io.grpc:grpc-netty-shaded:1.78.0") + + testImplementation("org.junit.jupiter:junit-jupiter-api:5.13.4") + testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.13.4") + testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.13.4") } diff --git a/velocity/src/main/kotlin/gg/grounds/GroundsPluginPlayer.kt b/velocity/src/main/kotlin/gg/grounds/GroundsPluginPlayer.kt index e5f3bab..c56e123 100644 --- a/velocity/src/main/kotlin/gg/grounds/GroundsPluginPlayer.kt +++ b/velocity/src/main/kotlin/gg/grounds/GroundsPluginPlayer.kt @@ -9,6 +9,7 @@ import com.velocitypowered.api.plugin.annotation.DataDirectory import com.velocitypowered.api.proxy.ProxyServer import gg.grounds.config.MessagesConfigLoader import gg.grounds.listener.PlayerConnectionListener +import gg.grounds.presence.PlayerHeartbeatScheduler import gg.grounds.presence.PlayerPresenceService import io.grpc.LoadBalancerRegistry import io.grpc.NameResolverRegistry @@ -33,6 +34,8 @@ constructor( @param:DataDirectory private val dataDirectory: Path, ) { private val playerPresenceService = PlayerPresenceService() + private val heartbeatScheduler = + PlayerHeartbeatScheduler(this, proxy, logger, playerPresenceService) init { logger.info("Initialized plugin (plugin=plugin-player, version={})", BuildInfo.VERSION) @@ -55,11 +58,13 @@ constructor( ), ) + heartbeatScheduler.start() logger.info("Configured player presence gRPC client (target={})", target) } @Subscribe fun onShutdown(event: ProxyShutdownEvent) { + heartbeatScheduler.stop() playerPresenceService.close() } diff --git a/velocity/src/main/kotlin/gg/grounds/presence/PlayerHeartbeatScheduler.kt b/velocity/src/main/kotlin/gg/grounds/presence/PlayerHeartbeatScheduler.kt new file mode 100644 index 0000000..02f8915 --- /dev/null +++ b/velocity/src/main/kotlin/gg/grounds/presence/PlayerHeartbeatScheduler.kt @@ -0,0 +1,131 @@ +package gg.grounds.presence + +import com.velocitypowered.api.proxy.ProxyServer +import com.velocitypowered.api.scheduler.ScheduledTask +import java.util.concurrent.TimeUnit +import kotlin.math.max +import kotlin.math.min +import org.slf4j.Logger + +class PlayerHeartbeatScheduler( + private val plugin: Any, + private val proxy: ProxyServer, + private val logger: Logger, + private val presenceService: PlayerPresenceService, +) { + private var heartbeatTask: ScheduledTask? = null + + fun start() { + heartbeatTask?.cancel() + heartbeatTask = null + val intervalResolution = + resolveHeartbeatIntervalSeconds( + System.getenv(HEARTBEAT_INTERVAL_ENV), + System.getenv(SESSION_TTL_ENV), + ) + heartbeatTask = + proxy.scheduler + .buildTask(plugin, Runnable { sendHeartbeats() }) + .repeat(intervalResolution.effectiveIntervalSeconds, TimeUnit.SECONDS) + .schedule() + if (intervalResolution.wasClamped) { + logger.warn( + "Player heartbeat interval clamped (configuredIntervalSeconds={}, effectiveIntervalSeconds={}, sessionTtlSeconds={}, maxIntervalSeconds={})", + intervalResolution.configuredIntervalSeconds, + intervalResolution.effectiveIntervalSeconds, + intervalResolution.sessionTtlSeconds, + intervalResolution.maxIntervalSeconds, + ) + } + logger.info( + "Configured player presence heartbeat task (intervalSeconds={}, sessionTtlSeconds={})", + intervalResolution.effectiveIntervalSeconds, + intervalResolution.sessionTtlSeconds, + ) + } + + fun stop() { + heartbeatTask?.cancel() + heartbeatTask = null + } + + private fun sendHeartbeats() { + val playerIds = proxy.allPlayers.map { it.uniqueId } + if (playerIds.isEmpty()) { + return + } + + val result = presenceService.heartbeatBatch(playerIds) + if (!result.success) { + logger.error( + "Player session heartbeat batch failed (playerCount={}, updated={}, missing={}, reason={})", + playerIds.size, + result.updated, + result.missing, + result.message, + ) + } else if (result.missing > 0) { + logger.warn( + "Player session heartbeat batch completed with missing sessions (playerCount={}, updated={}, missing={})", + playerIds.size, + result.updated, + result.missing, + ) + } else { + logger.debug( + "Player session heartbeat batch completed (playerCount={}, updated={}, missing={}, result=success)", + playerIds.size, + result.updated, + result.missing, + ) + } + } + + internal data class HeartbeatIntervalResolution( + val configuredIntervalSeconds: Long, + val effectiveIntervalSeconds: Long, + val sessionTtlSeconds: Long, + val maxIntervalSeconds: Long, + val wasClamped: Boolean, + ) + + companion object { + private const val DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 30L + private const val DEFAULT_SESSION_TTL_SECONDS = 90L + private const val HEARTBEAT_INTERVAL_ENV = "PLAYER_PRESENCE_HEARTBEAT_SECONDS" + private const val SESSION_TTL_ENV = "PLAYER_SESSIONS_TTL" + + internal fun resolveHeartbeatIntervalSeconds( + heartbeatIntervalRaw: String?, + sessionTtlRaw: String?, + ): HeartbeatIntervalResolution { + val configuredInterval = + heartbeatIntervalRaw?.trim()?.toLongOrNull()?.takeIf { it > 0 } + ?: DEFAULT_HEARTBEAT_INTERVAL_SECONDS + val sessionTtlSeconds = + parseSessionTtlSeconds(sessionTtlRaw) ?: DEFAULT_SESSION_TTL_SECONDS + val maxIntervalSeconds = max(1L, sessionTtlSeconds / 3) + val effectiveInterval = min(configuredInterval, maxIntervalSeconds) + return HeartbeatIntervalResolution( + configuredIntervalSeconds = configuredInterval, + effectiveIntervalSeconds = effectiveInterval, + sessionTtlSeconds = sessionTtlSeconds, + maxIntervalSeconds = maxIntervalSeconds, + wasClamped = effectiveInterval != configuredInterval, + ) + } + + private fun parseSessionTtlSeconds(raw: String?): Long? { + val value = raw?.trim()?.lowercase() ?: return null + if (value.isEmpty()) { + return null + } + return when { + value.endsWith("s") -> value.removeSuffix("s").toLongOrNull() + value.endsWith("m") -> value.removeSuffix("m").toLongOrNull()?.times(60) + value.endsWith("h") -> value.removeSuffix("h").toLongOrNull()?.times(3600) + else -> null + }?.takeIf { it > 0 } + } + } +} diff --git a/velocity/src/main/kotlin/gg/grounds/presence/PlayerPresenceService.kt b/velocity/src/main/kotlin/gg/grounds/presence/PlayerPresenceService.kt index ba6fa37..2ccd3c1 100644 --- a/velocity/src/main/kotlin/gg/grounds/presence/PlayerPresenceService.kt +++ b/velocity/src/main/kotlin/gg/grounds/presence/PlayerPresenceService.kt @@ -8,6 +8,13 @@ import java.util.UUID class PlayerPresenceService : AutoCloseable { private lateinit var client: GrpcPlayerPresenceClient + data class HeartbeatBatchResult( + val success: Boolean, + val message: String, + val updated: Int, + val missing: Int, + ) + fun configure(target: String) { close() client = GrpcPlayerPresenceClient.create(target) @@ -29,6 +36,20 @@ class PlayerPresenceService : AutoCloseable { } } + fun heartbeatBatch(playerIds: Collection): HeartbeatBatchResult { + return try { + val reply = client.heartbeatBatch(playerIds) + HeartbeatBatchResult(reply.success, reply.message, reply.updated, reply.missing) + } catch (e: RuntimeException) { + HeartbeatBatchResult( + success = false, + message = e.message ?: e::class.java.name, + updated = 0, + missing = playerIds.size, + ) + } + } + override fun close() { if (this::client.isInitialized) { client.close() diff --git a/velocity/src/test/kotlin/gg/grounds/presence/PlayerHeartbeatSchedulerTest.kt b/velocity/src/test/kotlin/gg/grounds/presence/PlayerHeartbeatSchedulerTest.kt new file mode 100644 index 0000000..ebb0529 --- /dev/null +++ b/velocity/src/test/kotlin/gg/grounds/presence/PlayerHeartbeatSchedulerTest.kt @@ -0,0 +1,63 @@ +package gg.grounds.presence + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test + +class PlayerHeartbeatSchedulerTest { + @Test + fun resolveHeartbeatIntervalUsesDefaultsWhenUnset() { + val result = PlayerHeartbeatScheduler.resolveHeartbeatIntervalSeconds(null, null) + + assertEquals(30, result.configuredIntervalSeconds) + assertEquals(30, result.effectiveIntervalSeconds) + assertEquals(90, result.sessionTtlSeconds) + assertEquals(30, result.maxIntervalSeconds) + assertFalse(result.wasClamped) + } + + @Test + fun resolveHeartbeatIntervalClampsWhenAboveSafeMaximum() { + val result = PlayerHeartbeatScheduler.resolveHeartbeatIntervalSeconds("60", "90s") + + assertEquals(60, result.configuredIntervalSeconds) + assertEquals(30, result.effectiveIntervalSeconds) + assertEquals(90, result.sessionTtlSeconds) + assertEquals(30, result.maxIntervalSeconds) + assertTrue(result.wasClamped) + } + + @Test + fun resolveHeartbeatIntervalDoesNotClampWhenWithinSafeMaximum() { + val result = PlayerHeartbeatScheduler.resolveHeartbeatIntervalSeconds("20", "90s") + + assertEquals(20, result.configuredIntervalSeconds) + assertEquals(20, result.effectiveIntervalSeconds) + assertEquals(90, result.sessionTtlSeconds) + assertEquals(30, result.maxIntervalSeconds) + assertFalse(result.wasClamped) + } + + @Test + fun resolveHeartbeatIntervalParsesMinuteTtl() { + val result = PlayerHeartbeatScheduler.resolveHeartbeatIntervalSeconds("90", "6m") + + assertEquals(90, result.configuredIntervalSeconds) + assertEquals(90, result.effectiveIntervalSeconds) + assertEquals(360, result.sessionTtlSeconds) + assertEquals(120, result.maxIntervalSeconds) + assertFalse(result.wasClamped) + } + + @Test + fun resolveHeartbeatIntervalFallsBackForInvalidValues() { + val result = PlayerHeartbeatScheduler.resolveHeartbeatIntervalSeconds("-5", "oops") + + assertEquals(30, result.configuredIntervalSeconds) + assertEquals(30, result.effectiveIntervalSeconds) + assertEquals(90, result.sessionTtlSeconds) + assertEquals(30, result.maxIntervalSeconds) + assertFalse(result.wasClamped) + } +}