From 1ee3f1bef64c1ccf04043efb740df777f2cdddf8 Mon Sep 17 00:00:00 2001 From: Caelum Forder Date: Fri, 5 Dec 2025 22:47:23 -0800 Subject: [PATCH 1/2] Add agent sleeping --- .../coralserver/agent/graph/GraphAgent.kt | 12 +++ .../agent/graph/GraphAgentRequest.kt | 40 +++++++++ .../coralprotocol/coralserver/mcp/McpTool.kt | 1 + .../mcp/resources/AgentResource.kt | 3 +- .../mcp/resources/InstructionResource.kt | 7 +- .../mcp/resources/MessageResource.kt | 3 +- .../coralserver/server/CoralServer.kt | 1 + .../coralserver/session/LocalSession.kt | 84 ++++++++++++++++++- .../session/models/SessionAgent.kt | 1 + 9 files changed, 146 insertions(+), 6 deletions(-) diff --git a/src/main/kotlin/org/coralprotocol/coralserver/agent/graph/GraphAgent.kt b/src/main/kotlin/org/coralprotocol/coralserver/agent/graph/GraphAgent.kt index e5438671..3ca0b371 100644 --- a/src/main/kotlin/org/coralprotocol/coralserver/agent/graph/GraphAgent.kt +++ b/src/main/kotlin/org/coralprotocol/coralserver/agent/graph/GraphAgent.kt @@ -86,6 +86,18 @@ data class GraphAgent( */ val x402Budgets: List, + /** + * A set of events that should cause this agent to enter a sleeping state + * @see GraphAgentRequest.sleepEvents + */ + val sleepEvents: Set = emptySet(), + + /** + * A set of events that should cause this agent to wake from a sleeping state + * @see GraphAgentRequest.sleepEvents + */ + val wakeEvents: Set = emptySet(), + /** * Runtime secret ID. This is given to agents as an environment variable so that they may identify themselves to * the server securely. This is useful for example, when consuming x402 budgets, we do not want to let agent A diff --git a/src/main/kotlin/org/coralprotocol/coralserver/agent/graph/GraphAgentRequest.kt b/src/main/kotlin/org/coralprotocol/coralserver/agent/graph/GraphAgentRequest.kt index 4585ce6f..6c8178db 100644 --- a/src/main/kotlin/org/coralprotocol/coralserver/agent/graph/GraphAgentRequest.kt +++ b/src/main/kotlin/org/coralprotocol/coralserver/agent/graph/GraphAgentRequest.kt @@ -48,6 +48,14 @@ data class GraphAgentRequest( @Description("An optional list of resources and an accompanied budget that this agent may spend on services that accept x402 payments") val x402Budgets: List = listOf(), + + @Description("Events that should cause this agent to go to sleep") + @SerialName("sleepEvents") + val sleepEvents: Set = emptySet(), + + @Description("Events that should cause this agent to wake up") + @SerialName("wakeEvents") + val wakeEvents: Set = emptySet(), ) { /** * Given a reference to the agent registry [AgentRegistry], this function will attempt to convert this request into @@ -124,6 +132,38 @@ data class GraphAgentRequest( plugins = plugins, provider = provider, x402Budgets = x402Budgets, + sleepEvents = sleepEvents, + wakeEvents = wakeEvents, ) } } + +/** + * Events that will put the agent into a sleeping state. + */ +@Serializable +enum class SleepEvent { + @SerialName("agent_started") + AGENT_STARTED, + + @SerialName("removed_from_last_thread") + REMOVED_FROM_LAST_THREAD, + + @SerialName("last_thread_closed") + LAST_THREAD_CLOSED, +} + +/** + * Events that will wake the agent from a sleeping state. + */ +@Serializable +enum class WakeEvent { + @SerialName("added_to_any_thread") + ADDED_TO_ANY_THREAD, + + @SerialName("added_to_thread_first_time") + ADDED_TO_THREAD_FIRST_TIME, + + @SerialName("mentioned") + MENTIONED, +} diff --git a/src/main/kotlin/org/coralprotocol/coralserver/mcp/McpTool.kt b/src/main/kotlin/org/coralprotocol/coralserver/mcp/McpTool.kt index edad4537..6228ea8a 100644 --- a/src/main/kotlin/org/coralprotocol/coralserver/mcp/McpTool.kt +++ b/src/main/kotlin/org/coralprotocol/coralserver/mcp/McpTool.kt @@ -23,6 +23,7 @@ abstract class McpTool() { internal abstract suspend fun execute(mcpServer: CoralAgentIndividualMcp, arguments: T): McpToolResult internal suspend fun executeRaw(mcpServer: CoralAgentIndividualMcp, request: CallToolRequest): CallToolResult { + mcpServer.localSession.awaitAwake(mcpServer.connectedAgentId) val arguments = try { apiJsonConfig.decodeFromString(argumentsSerializer, request.arguments.toString()) diff --git a/src/main/kotlin/org/coralprotocol/coralserver/mcp/resources/AgentResource.kt b/src/main/kotlin/org/coralprotocol/coralserver/mcp/resources/AgentResource.kt index fd439da3..2c3bd626 100644 --- a/src/main/kotlin/org/coralprotocol/coralserver/mcp/resources/AgentResource.kt +++ b/src/main/kotlin/org/coralprotocol/coralserver/mcp/resources/AgentResource.kt @@ -6,7 +6,8 @@ import io.modelcontextprotocol.kotlin.sdk.TextResourceContents import org.coralprotocol.coralserver.mcp.McpResources.AGENT_RESOURCE_URI import org.coralprotocol.coralserver.server.CoralAgentIndividualMcp -private fun CoralAgentIndividualMcp.handler(request: ReadResourceRequest): ReadResourceResult { +private suspend fun CoralAgentIndividualMcp.handler(request: ReadResourceRequest): ReadResourceResult { + localSession.awaitAwake(connectedAgentId) val otherAgents = localSession .agents .filter { (name, _) -> name != connectedAgentId } diff --git a/src/main/kotlin/org/coralprotocol/coralserver/mcp/resources/InstructionResource.kt b/src/main/kotlin/org/coralprotocol/coralserver/mcp/resources/InstructionResource.kt index cd21b2dd..207065f9 100644 --- a/src/main/kotlin/org/coralprotocol/coralserver/mcp/resources/InstructionResource.kt +++ b/src/main/kotlin/org/coralprotocol/coralserver/mcp/resources/InstructionResource.kt @@ -21,7 +21,8 @@ and expect or require a response from another agent, use the $WAIT_FOR_MENTIONS In most cases assistant message output will not reach the user. Use tooling where possible to communicate with the user instead. """ -private fun handle(request: ReadResourceRequest): ReadResourceResult { +private suspend fun CoralAgentIndividualMcp.handle(request: ReadResourceRequest): ReadResourceResult { + localSession.awaitAwake(connectedAgentId) return ReadResourceResult( contents = listOf( TextResourceContents( @@ -39,7 +40,7 @@ fun CoralAgentIndividualMcp.addInstructionResource() { description = "Coral instructions resource", uri = INSTRUCTION_RESOURCE_URI.toString(), mimeType = "text/markdown", - readHandler = ::handle, + readHandler = { request -> this.handle(request) }, ) // deprecated addResource( @@ -47,6 +48,6 @@ fun CoralAgentIndividualMcp.addInstructionResource() { description = "Coral instructions resource", uri = "Instruction.resource", mimeType = "text/markdown", - readHandler = ::handle, + readHandler = { request -> this.handle(request) }, ) } diff --git a/src/main/kotlin/org/coralprotocol/coralserver/mcp/resources/MessageResource.kt b/src/main/kotlin/org/coralprotocol/coralserver/mcp/resources/MessageResource.kt index 9023c9a3..9c5c60b7 100644 --- a/src/main/kotlin/org/coralprotocol/coralserver/mcp/resources/MessageResource.kt +++ b/src/main/kotlin/org/coralprotocol/coralserver/mcp/resources/MessageResource.kt @@ -10,7 +10,8 @@ import org.coralprotocol.coralserver.models.ResolvedThread import org.coralprotocol.coralserver.models.resolve import org.coralprotocol.coralserver.server.CoralAgentIndividualMcp -private fun CoralAgentIndividualMcp.handler(request: ReadResourceRequest): ReadResourceResult { +private suspend fun CoralAgentIndividualMcp.handler(request: ReadResourceRequest): ReadResourceResult { + localSession.awaitAwake(connectedAgentId) val threadsAgentPrivyIn: List = this.localSession.getAllThreadsAgentParticipatesIn(this.connectedAgentId).map { it -> it.resolve() } val renderedThreads: String = XML.encodeToString(threadsAgentPrivyIn, rootName = QName("threads")) return ReadResourceResult( diff --git a/src/main/kotlin/org/coralprotocol/coralserver/server/CoralServer.kt b/src/main/kotlin/org/coralprotocol/coralserver/server/CoralServer.kt index c85a586f..ea6df211 100644 --- a/src/main/kotlin/org/coralprotocol/coralserver/server/CoralServer.kt +++ b/src/main/kotlin/org/coralprotocol/coralserver/server/CoralServer.kt @@ -200,6 +200,7 @@ class CoralServer( debugApiRoutes(localSessionManager) sessionApiRoutes(registry, localSessionManager, devmode) messageApiRoutes(mcpServersByTransportId, localSessionManager, remoteSessionManager) + sleepingRoutes(localSessionManager) telemetryApiRoutes(localSessionManager) documentationApiRoutes() agentApiRoutes(registry, blockchainService, remoteSessionManager, jupiterService, config.paymentConfig) diff --git a/src/main/kotlin/org/coralprotocol/coralserver/session/LocalSession.kt b/src/main/kotlin/org/coralprotocol/coralserver/session/LocalSession.kt index 189a4245..201a06ce 100644 --- a/src/main/kotlin/org/coralprotocol/coralserver/session/LocalSession.kt +++ b/src/main/kotlin/org/coralprotocol/coralserver/session/LocalSession.kt @@ -3,8 +3,12 @@ package org.coralprotocol.coralserver.session import io.github.oshai.kotlinlogging.KotlinLogging import io.ktor.util.collections.* import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.currentCoroutineContext import org.coralprotocol.coralserver.EventBus import org.coralprotocol.coralserver.agent.graph.AgentGraph +import org.coralprotocol.coralserver.agent.graph.SleepEvent +import org.coralprotocol.coralserver.agent.graph.WakeEvent import org.coralprotocol.coralserver.models.Message import org.coralprotocol.coralserver.models.Thread import org.coralprotocol.coralserver.models.resolve @@ -45,6 +49,10 @@ class LocalSession( private val eventBus = EventBus() val events get() = eventBus.events + private val firstThreadAdded = ConcurrentSet() + + private val sleepingWaiters = ConcurrentHashMap>() + init { agentGraph?.run { for (id in agents.keys) { @@ -128,6 +136,13 @@ class LocalSession( ) agents[sessionAgent.id] = sessionAgent + graphAgent?.sleepEvents?.let { events -> + if (events.contains(SleepEvent.AGENT_STARTED)) { + setAgentSleeping(agentId, true) + } + } + // TODO: eventBus.emit(SessionEvent.AgentRegistered(sessionAgent)) ? + return sessionAgent } @@ -202,6 +217,18 @@ class LocalSession( if (!thread.participants.contains(participantId)) { thread.participants.add(participantId) lastReadMessageIndex[Pair(participantId, threadId)] = thread.messages.size + + val graphAgent = agentGraph?.agents[participantId] + val isFirstTime = !firstThreadAdded.contains(participantId) + if (graphAgent != null) { // else devmode + if (graphAgent.wakeEvents.contains(WakeEvent.ADDED_TO_ANY_THREAD)) { + setAgentSleeping(participantId, false) + } + if (isFirstTime && graphAgent.wakeEvents.contains(WakeEvent.ADDED_TO_THREAD_FIRST_TIME)) { + setAgentSleeping(participantId, false) + } + } + if (isFirstTime) firstThreadAdded.add(participantId) } return true } @@ -211,7 +238,15 @@ class LocalSession( if (thread.isClosed) return false - return thread.participants.remove(participantId) + val wasActuallyRemoved = thread.participants.remove(participantId) + if (wasActuallyRemoved) { + val hasRemainingOpenThreads = threads.values.any { it.participants.contains(participantId) && !it.isClosed } + val graphAgent = agentGraph?.agents[participantId] + if (!hasRemainingOpenThreads && graphAgent?.sleepEvents?.contains(SleepEvent.REMOVED_FROM_LAST_THREAD) == true) { + setAgentSleeping(participantId, true) + } + } + return wasActuallyRemoved } fun closeThread(threadId: String, summary: String): Boolean { @@ -220,6 +255,16 @@ class LocalSession( thread.isClosed = true thread.summary = summary + // Sleep agents that now have no open threads if configured + val participantsSnapshot = thread.participants.toList() + participantsSnapshot.forEach { participantId -> + val graphAgent = agentGraph?.agents[participantId] + val hasAnyOpenThreads = threads.values.any { it.participants.contains(participantId) && !it.isClosed } + if (!hasAnyOpenThreads && graphAgent?.sleepEvents?.contains(SleepEvent.LAST_THREAD_CLOSED) == true) { + setAgentSleeping(participantId, true) + } + } + return true } @@ -266,6 +311,11 @@ class LocalSession( if (deferred != null && !deferred.isCompleted) { deferred.complete(listOf(message)) } + // Wake on mention if configured + val graphAgent = agentGraph?.agents[mentionId] + if (graphAgent?.wakeEvents?.contains(WakeEvent.MENTIONED) == true) { + setAgentSleeping(mentionId, false) + } } } @@ -332,4 +382,36 @@ class LocalSession( super.destroy(sessionCloseMode) clearAll() } + + fun isAgentSleeping(agentId: String): Boolean { + return agents[agentId]?.sleeping ?: false + } + + fun setAgentSleeping(agentId: String, sleeping: Boolean): Boolean { + val agent = agents[agentId] ?: return false + agent.sleeping = sleeping + if (!sleeping) { + sleepingWaiters.remove(agentId)?.complete(Unit) + } else { + sleepingWaiters.remove(agentId) + } + return true + } + + suspend fun awaitAwake(agentId: String) { + val agent = agents[agentId] ?: return + if (!agent.sleeping) return + var waiter = sleepingWaiters[agentId] + if (waiter == null) { + val newWaiter = CompletableDeferred() + val prev = sleepingWaiters.putIfAbsent(agentId, newWaiter) + waiter = prev ?: newWaiter + } + if (!agent.sleeping) { + sleepingWaiters.remove(agentId)?.complete(Unit) + return + } + currentCoroutineContext().ensureActive() + waiter.await() + } } diff --git a/src/main/kotlin/org/coralprotocol/coralserver/session/models/SessionAgent.kt b/src/main/kotlin/org/coralprotocol/coralserver/session/models/SessionAgent.kt index 1bc2ae2e..92179b47 100644 --- a/src/main/kotlin/org/coralprotocol/coralserver/session/models/SessionAgent.kt +++ b/src/main/kotlin/org/coralprotocol/coralserver/session/models/SessionAgent.kt @@ -13,6 +13,7 @@ data class SessionAgent( val id: String, var description: String = "", var state: SessionAgentState = SessionAgentState.Disconnected, + var sleeping: Boolean = false, var mcpUrl: String?, val extraTools: Set = setOf(), val coralPlugins: Set = setOf(), From 6f3dedbb0186d5ad33b07f04b6d1e14e77762760 Mon Sep 17 00:00:00 2001 From: Caelum Forder Date: Fri, 5 Dec 2025 22:52:07 -0800 Subject: [PATCH 2/2] Add agent sleeping routes --- .../routes/api/v1/SleepingRoutes.kt | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 src/main/kotlin/org/coralprotocol/coralserver/routes/api/v1/SleepingRoutes.kt diff --git a/src/main/kotlin/org/coralprotocol/coralserver/routes/api/v1/SleepingRoutes.kt b/src/main/kotlin/org/coralprotocol/coralserver/routes/api/v1/SleepingRoutes.kt new file mode 100644 index 00000000..a04de948 --- /dev/null +++ b/src/main/kotlin/org/coralprotocol/coralserver/routes/api/v1/SleepingRoutes.kt @@ -0,0 +1,110 @@ +package org.coralprotocol.coralserver.routes.api.v1 + +import io.github.smiley4.ktoropenapi.resources.get +import io.github.smiley4.ktoropenapi.resources.post +import io.github.smiley4.ktoropenapi.resources.put +import io.ktor.http.* +import io.ktor.resources.* +import io.ktor.server.request.* +import io.ktor.server.response.* +import io.ktor.server.routing.* +import kotlinx.serialization.Serializable +import org.coralprotocol.coralserver.server.RouteException +import org.coralprotocol.coralserver.session.LocalSessionManager + +@Resource("/api/v1/sessions/{sessionId}/{agentId}/sleeping") +class Sleeping(val sessionId: String, val agentId: String) + +@Serializable +data class SleepState(val sleeping: Boolean) + + +fun Routing.sleepingRoutes(localSessionManager: LocalSessionManager) { + + get({ + summary = "Get agent sleeping state" + description = "Returns whether a given agent instance in a session is sleeping" + operationId = "getAgentSleeping" + request { + pathParameter("sessionId") { description = "The session ID" } + pathParameter("agentId") { description = "The agent ID within the session" } + } + response { + HttpStatusCode.OK to { + description = "Success" + body { description = "Current sleeping state" } + } + HttpStatusCode.NotFound to { + description = "Session or agent not found" + body { description = "Error details" } + } + } + }) { + val session = localSessionManager.getSession(it.sessionId) ?: throw RouteException(HttpStatusCode.NotFound, "Session not found") + val agent = session.getAgent(it.agentId) ?: throw RouteException(HttpStatusCode.NotFound, "Agent not found in session") + call.respond(HttpStatusCode.OK, SleepState(agent.sleeping)) + } + + put({ + summary = "Set agent sleeping state" + description = "Sets whether a given agent instance in a session is sleeping" + operationId = "setAgentSleeping" + request { + body { description = "Desired sleeping state" } + } + response { + HttpStatusCode.OK to { + description = "Success" + body { description = "Updated sleeping state" } + } + HttpStatusCode.NotFound to { + description = "Session or agent not found" + body { description = "Error details" } + } + } + }) { sleeping -> + val body = call.receive() + val result = setSleepingState(localSessionManager, sleeping.sessionId, sleeping.agentId, body.sleeping) + call.respond(HttpStatusCode.OK, result) + } + + + // TODO: This POST endpoint duplicates the PUT behavior. Not strictly REST-compliant, + // but provided because custom tools can only issue POST requests. + post({ + summary = "Set agent sleeping state (POST)" + description = "Same behavior as PUT: sets whether a given agent instance in a session is sleeping" + operationId = "setAgentSleepingPost" + request { + body { description = "Desired sleeping state" } + } + response { + HttpStatusCode.OK to { + description = "Success" + body { description = "Updated sleeping state" } + } + HttpStatusCode.NotFound to { + description = "Session or agent not found" + body { description = "Error details" } + } + } + }) { sleeping -> + val body = call.receive() + val result = setSleepingState(localSessionManager, sleeping.sessionId, sleeping.agentId, body.sleeping) + call.respond(HttpStatusCode.OK, result) + } +} + +private fun setSleepingState( + localSessionManager: LocalSessionManager, + sessionId: String, + agentId: String, + desiredSleeping: Boolean +): SleepState { + val session = localSessionManager.getSession(sessionId) + ?: throw RouteException(HttpStatusCode.NotFound, "Session not found") + if (!session.setAgentSleeping(agentId, desiredSleeping)) { + throw RouteException(HttpStatusCode.NotFound, "Agent not found in session") + } + return SleepState(session.isAgentSleeping(agentId)) +}