Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ The plugin requires the gRPC target to be provided via environment variable:
export PLAYER_PRESENCE_GRPC_TARGET="dns:///service-player.api.svc.cluster.local:9000"
```

Optional heartbeat configuration:

```bash
export PLAYER_PRESENCE_HEARTBEAT_SECONDS="30"
export PLAYER_SESSIONS_TTL="90s"
```

`PLAYER_PRESENCE_HEARTBEAT_SECONDS` is automatically clamped to a safe value based on
`PLAYER_SESSIONS_TTL` (`max = ttl / 3`) to reduce false stale-session cleanup.

Messages are configured in `velocity/src/main/resources/messages.yml` (copied to the plugin data
directory on first run).

Expand Down
2 changes: 1 addition & 1 deletion common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ repositories {
}
}

dependencies { protobuf("gg.grounds:library-grpc-contracts-player:0.1.0") }
dependencies { protobuf("gg.grounds:library-grpc-contracts-player:feat-player-heartbeat-SNAPSHOT") }
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package gg.grounds.player.presence

import gg.grounds.grpc.player.PlayerHeartbeatBatchReply
import gg.grounds.grpc.player.PlayerHeartbeatBatchRequest
import gg.grounds.grpc.player.PlayerLoginRequest
import gg.grounds.grpc.player.PlayerLogoutReply
import gg.grounds.grpc.player.PlayerLogoutRequest
Expand Down Expand Up @@ -49,6 +51,22 @@ private constructor(
}
}

fun heartbeatBatch(playerIds: Collection<UUID>): PlayerHeartbeatBatchReply {
return try {
val request =
PlayerHeartbeatBatchRequest.newBuilder()
.addAllPlayerIds(playerIds.map { it.toString() })
.build()
stub
.withDeadlineAfter(DEFAULT_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.playerHeartbeatBatch(request)
} catch (e: StatusRuntimeException) {
errorHeartbeatBatchReply(e.status.toString())
} catch (e: RuntimeException) {
errorHeartbeatBatchReply(e.message ?: e::class.java.name)
}
}

override fun close() {
channel.shutdown()
try {
Expand All @@ -74,6 +92,14 @@ private constructor(
private fun errorLogoutReply(message: String): PlayerLogoutReply =
PlayerLogoutReply.newBuilder().setRemoved(false).setMessage(message).build()

private fun errorHeartbeatBatchReply(message: String): PlayerHeartbeatBatchReply =
PlayerHeartbeatBatchReply.newBuilder()
.setUpdated(0)
.setMissing(0)
.setSuccess(false)
.setMessage(message)
.build()

private fun isServiceUnavailable(status: Status.Code): Boolean =
status == Status.Code.UNAVAILABLE || status == Status.Code.DEADLINE_EXCEEDED

Expand Down
4 changes: 4 additions & 0 deletions velocity/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,8 @@ dependencies {
implementation("tools.jackson.dataformat:jackson-dataformat-yaml:3.0.3")
implementation("tools.jackson.module:jackson-module-kotlin:3.0.3")
implementation("io.grpc:grpc-netty-shaded:1.78.0")

testImplementation("org.junit.jupiter:junit-jupiter-api:5.13.4")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.13.4")
testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.13.4")
}
5 changes: 5 additions & 0 deletions velocity/src/main/kotlin/gg/grounds/GroundsPluginPlayer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.velocitypowered.api.plugin.annotation.DataDirectory
import com.velocitypowered.api.proxy.ProxyServer
import gg.grounds.config.MessagesConfigLoader
import gg.grounds.listener.PlayerConnectionListener
import gg.grounds.presence.PlayerHeartbeatScheduler
import gg.grounds.presence.PlayerPresenceService
import io.grpc.LoadBalancerRegistry
import io.grpc.NameResolverRegistry
Expand All @@ -33,6 +34,8 @@ constructor(
@param:DataDirectory private val dataDirectory: Path,
) {
private val playerPresenceService = PlayerPresenceService()
private val heartbeatScheduler =
PlayerHeartbeatScheduler(this, proxy, logger, playerPresenceService)

init {
logger.info("Initialized plugin (plugin=plugin-player, version={})", BuildInfo.VERSION)
Expand All @@ -55,11 +58,13 @@ constructor(
),
)

heartbeatScheduler.start()
logger.info("Configured player presence gRPC client (target={})", target)
}

@Subscribe
fun onShutdown(event: ProxyShutdownEvent) {
heartbeatScheduler.stop()
playerPresenceService.close()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package gg.grounds.presence

import com.velocitypowered.api.proxy.ProxyServer
import com.velocitypowered.api.scheduler.ScheduledTask
import java.util.concurrent.TimeUnit
import kotlin.math.max
import kotlin.math.min
import org.slf4j.Logger

class PlayerHeartbeatScheduler(
private val plugin: Any,
private val proxy: ProxyServer,
private val logger: Logger,
private val presenceService: PlayerPresenceService,
) {
private var heartbeatTask: ScheduledTask? = null

fun start() {
heartbeatTask?.cancel()
heartbeatTask = null
val intervalResolution =
resolveHeartbeatIntervalSeconds(
System.getenv(HEARTBEAT_INTERVAL_ENV),
System.getenv(SESSION_TTL_ENV),
)
heartbeatTask =
proxy.scheduler
.buildTask(plugin, Runnable { sendHeartbeats() })
.repeat(intervalResolution.effectiveIntervalSeconds, TimeUnit.SECONDS)
.schedule()
if (intervalResolution.wasClamped) {
logger.warn(
"Player heartbeat interval clamped (configuredIntervalSeconds={}, effectiveIntervalSeconds={}, sessionTtlSeconds={}, maxIntervalSeconds={})",
intervalResolution.configuredIntervalSeconds,
intervalResolution.effectiveIntervalSeconds,
intervalResolution.sessionTtlSeconds,
intervalResolution.maxIntervalSeconds,
)
}
logger.info(
"Configured player presence heartbeat task (intervalSeconds={}, sessionTtlSeconds={})",
intervalResolution.effectiveIntervalSeconds,
intervalResolution.sessionTtlSeconds,
)
}

fun stop() {
heartbeatTask?.cancel()
heartbeatTask = null
}

private fun sendHeartbeats() {
val playerIds = proxy.allPlayers.map { it.uniqueId }
if (playerIds.isEmpty()) {
return
}

val result = presenceService.heartbeatBatch(playerIds)
if (!result.success) {
logger.error(
"Player session heartbeat batch failed (playerCount={}, updated={}, missing={}, reason={})",
playerIds.size,
result.updated,
result.missing,
result.message,
)
} else if (result.missing > 0) {
logger.warn(
"Player session heartbeat batch completed with missing sessions (playerCount={}, updated={}, missing={})",
playerIds.size,
result.updated,
result.missing,
)
} else {
logger.debug(
"Player session heartbeat batch completed (playerCount={}, updated={}, missing={}, result=success)",
playerIds.size,
result.updated,
result.missing,
)
}
}

internal data class HeartbeatIntervalResolution(
val configuredIntervalSeconds: Long,
val effectiveIntervalSeconds: Long,
val sessionTtlSeconds: Long,
val maxIntervalSeconds: Long,
val wasClamped: Boolean,
)

companion object {
private const val DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 30L
private const val DEFAULT_SESSION_TTL_SECONDS = 90L
private const val HEARTBEAT_INTERVAL_ENV = "PLAYER_PRESENCE_HEARTBEAT_SECONDS"
private const val SESSION_TTL_ENV = "PLAYER_SESSIONS_TTL"

internal fun resolveHeartbeatIntervalSeconds(
heartbeatIntervalRaw: String?,
sessionTtlRaw: String?,
): HeartbeatIntervalResolution {
val configuredInterval =
heartbeatIntervalRaw?.trim()?.toLongOrNull()?.takeIf { it > 0 }
?: DEFAULT_HEARTBEAT_INTERVAL_SECONDS
val sessionTtlSeconds =
parseSessionTtlSeconds(sessionTtlRaw) ?: DEFAULT_SESSION_TTL_SECONDS
val maxIntervalSeconds = max(1L, sessionTtlSeconds / 3)
val effectiveInterval = min(configuredInterval, maxIntervalSeconds)
return HeartbeatIntervalResolution(
configuredIntervalSeconds = configuredInterval,
effectiveIntervalSeconds = effectiveInterval,
sessionTtlSeconds = sessionTtlSeconds,
maxIntervalSeconds = maxIntervalSeconds,
wasClamped = effectiveInterval != configuredInterval,
)
}

private fun parseSessionTtlSeconds(raw: String?): Long? {
val value = raw?.trim()?.lowercase() ?: return null
if (value.isEmpty()) {
return null
}
return when {
value.endsWith("s") -> value.removeSuffix("s").toLongOrNull()
value.endsWith("m") -> value.removeSuffix("m").toLongOrNull()?.times(60)
value.endsWith("h") -> value.removeSuffix("h").toLongOrNull()?.times(3600)
else -> null
}?.takeIf { it > 0 }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import java.util.UUID
class PlayerPresenceService : AutoCloseable {
private lateinit var client: GrpcPlayerPresenceClient

data class HeartbeatBatchResult(
val success: Boolean,
val message: String,
val updated: Int,
val missing: Int,
)

fun configure(target: String) {
close()
client = GrpcPlayerPresenceClient.create(target)
Expand All @@ -29,6 +36,20 @@ class PlayerPresenceService : AutoCloseable {
}
}

fun heartbeatBatch(playerIds: Collection<UUID>): HeartbeatBatchResult {
return try {
val reply = client.heartbeatBatch(playerIds)
HeartbeatBatchResult(reply.success, reply.message, reply.updated, reply.missing)
} catch (e: RuntimeException) {
HeartbeatBatchResult(
success = false,
message = e.message ?: e::class.java.name,
updated = 0,
missing = playerIds.size,
)
}
}

override fun close() {
if (this::client.isInitialized) {
client.close()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package gg.grounds.presence

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test

class PlayerHeartbeatSchedulerTest {
@Test
fun resolveHeartbeatIntervalUsesDefaultsWhenUnset() {
val result = PlayerHeartbeatScheduler.resolveHeartbeatIntervalSeconds(null, null)

assertEquals(30, result.configuredIntervalSeconds)
assertEquals(30, result.effectiveIntervalSeconds)
assertEquals(90, result.sessionTtlSeconds)
assertEquals(30, result.maxIntervalSeconds)
assertFalse(result.wasClamped)
}

@Test
fun resolveHeartbeatIntervalClampsWhenAboveSafeMaximum() {
val result = PlayerHeartbeatScheduler.resolveHeartbeatIntervalSeconds("60", "90s")

assertEquals(60, result.configuredIntervalSeconds)
assertEquals(30, result.effectiveIntervalSeconds)
assertEquals(90, result.sessionTtlSeconds)
assertEquals(30, result.maxIntervalSeconds)
assertTrue(result.wasClamped)
}

@Test
fun resolveHeartbeatIntervalDoesNotClampWhenWithinSafeMaximum() {
val result = PlayerHeartbeatScheduler.resolveHeartbeatIntervalSeconds("20", "90s")

assertEquals(20, result.configuredIntervalSeconds)
assertEquals(20, result.effectiveIntervalSeconds)
assertEquals(90, result.sessionTtlSeconds)
assertEquals(30, result.maxIntervalSeconds)
assertFalse(result.wasClamped)
}

@Test
fun resolveHeartbeatIntervalParsesMinuteTtl() {
val result = PlayerHeartbeatScheduler.resolveHeartbeatIntervalSeconds("90", "6m")

assertEquals(90, result.configuredIntervalSeconds)
assertEquals(90, result.effectiveIntervalSeconds)
assertEquals(360, result.sessionTtlSeconds)
assertEquals(120, result.maxIntervalSeconds)
assertFalse(result.wasClamped)
}

@Test
fun resolveHeartbeatIntervalFallsBackForInvalidValues() {
val result = PlayerHeartbeatScheduler.resolveHeartbeatIntervalSeconds("-5", "oops")

assertEquals(30, result.configuredIntervalSeconds)
assertEquals(30, result.effectiveIntervalSeconds)
assertEquals(90, result.sessionTtlSeconds)
assertEquals(30, result.maxIntervalSeconds)
assertFalse(result.wasClamped)
}
}