diff --git a/module-external-api/src/main/kotlin/maple/externalapi/runstatus/InternalApiController.kt b/module-external-api/src/main/kotlin/maple/externalapi/runstatus/InternalApiController.kt index 8449cd6f7..45cba5a75 100644 --- a/module-external-api/src/main/kotlin/maple/externalapi/runstatus/InternalApiController.kt +++ b/module-external-api/src/main/kotlin/maple/externalapi/runstatus/InternalApiController.kt @@ -80,7 +80,7 @@ class InternalApiController( } val runId = airflowRunId ?: UUID.randomUUID().toString() - executor.submit { scheduler.triggerDailyRefresh(runId).join() } + executor.submit { scheduler.triggerDailyRefresh(runId) } return ResponseEntity.accepted().body(mapOf("status" to "STARTED", "runId" to runId)) } @@ -120,7 +120,7 @@ class InternalApiController( } val runId = airflowRunId ?: UUID.randomUUID().toString() - executor.submit { scheduler.triggerPhase(phase, runId, upstreamRunId).join() } + executor.submit { scheduler.triggerPhase(phase, runId, upstreamRunId) } return ResponseEntity.accepted().body(mapOf("status" to "STARTED", "runId" to runId)) } diff --git a/module-external-api/src/test/kotlin/maple/externalapi/runstatus/InternalApiControllerTest.kt b/module-external-api/src/test/kotlin/maple/externalapi/runstatus/InternalApiControllerTest.kt index ce8b5fe40..4ac319703 100644 --- a/module-external-api/src/test/kotlin/maple/externalapi/runstatus/InternalApiControllerTest.kt +++ b/module-external-api/src/test/kotlin/maple/externalapi/runstatus/InternalApiControllerTest.kt @@ -4,16 +4,20 @@ import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.kotlin.registerKotlinModule import java.time.Instant +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit import maple.externalapi.loop.PhaseLoopController import maple.externalapi.scheduler.ExternalApiScheduler import maple.externalapi.scheduler.PhaseStopSignal import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.Timeout import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertTrue import org.mockito.kotlin.any +import org.mockito.kotlin.anyOrNull import org.mockito.kotlin.mock import org.mockito.kotlin.verify import org.mockito.kotlin.whenever @@ -203,6 +207,124 @@ class InternalApiControllerTest { .andExpect(status().isAccepted) } + @Test + @Timeout(value = 5, unit = TimeUnit.SECONDS) + fun `POST trigger daily returns 202 immediately without blocking on phase completion`() { + // Capture the Future returned by submit() and assert it completes + // within a short window. With the .join() bug, the runnable blocks + // on a never-completing CF, so the Future never completes. + val capturedFutures = java.util.concurrent.ConcurrentLinkedQueue>() + val capturingExecutor = java.util.concurrent.Executors.newSingleThreadExecutor { r -> + Thread(r, "test-capturing").apply { isDaemon = true } + } + // Wrap submit via reflection-free override: use a delegating executor. + // Easiest path: subclass to override submit(Runnable). + val instrumentedExecutor = object : java.util.concurrent.AbstractExecutorService() { + override fun shutdown() = capturingExecutor.shutdown() + override fun shutdownNow(): MutableList = capturingExecutor.shutdownNow() + override fun isShutdown(): Boolean = capturingExecutor.isShutdown + override fun isTerminated(): Boolean = capturingExecutor.isTerminated + override fun awaitTermination(timeout: Long, unit: java.util.concurrent.TimeUnit): Boolean = + capturingExecutor.awaitTermination(timeout, unit) + override fun execute(command: Runnable) { + capturedFutures.add(capturingExecutor.submit(command)) + } + } + try { + val neverCompleting = CompletableFuture() + // triggerDailyRefresh takes 1 arg (airflowRunId: String?). any() matches non-null, + // so we don't need anyOrNull here — but the controller may pass null when no + // X-Airflow-Run-Id header is set, so we use anyOrNull for safety. + whenever(scheduler.triggerDailyRefresh(anyOrNull())).thenReturn(neverCompleting) + + val syncController = InternalApiController( + runStatusTracker, scheduler, instrumentedExecutor, phaseLoopController, + ) + val syncMockMvc = standaloneSetup(syncController) + .setMessageConverters(MappingJackson2HttpMessageConverter(objectMapper)) + .build() + + syncMockMvc.perform(post("/api/internal/trigger/daily")) + .andExpect(status().isAccepted) + .andExpect(jsonPath("$.status").value("STARTED")) + + val future = capturedFutures.peek() + ?: error("expected exactly one submitted task; queue was empty") + // Fire-and-forget: the runnable must return quickly. With .join() + // on a never-completing CF, the runnable blocks indefinitely. + val completed = try { + future.get(500, TimeUnit.MILLISECONDS) + true + } catch (ex: java.util.concurrent.TimeoutException) { + false + } + assertThat(completed) + .withFailMessage( + "executor.submit() future should be done immediately (fire-and-forget); " + + "it never completed, which means the controller still .join()s on the phase CF." + ) + .isTrue() + } finally { + capturingExecutor.shutdownNow() + } + } + + @Test + @Timeout(value = 5, unit = TimeUnit.SECONDS) + fun `POST trigger phase returns 202 immediately without blocking on phase completion`() { + val capturedFutures = java.util.concurrent.ConcurrentLinkedQueue>() + val capturingExecutor = java.util.concurrent.Executors.newSingleThreadExecutor { r -> + Thread(r, "test-capturing").apply { isDaemon = true } + } + val instrumentedExecutor = object : java.util.concurrent.AbstractExecutorService() { + override fun shutdown() = capturingExecutor.shutdown() + override fun shutdownNow(): MutableList = capturingExecutor.shutdownNow() + override fun isShutdown(): Boolean = capturingExecutor.isShutdown + override fun isTerminated(): Boolean = capturingExecutor.isTerminated + override fun awaitTermination(timeout: Long, unit: java.util.concurrent.TimeUnit): Boolean = + capturingExecutor.awaitTermination(timeout, unit) + override fun execute(command: Runnable) { + capturedFutures.add(capturingExecutor.submit(command)) + } + } + try { + val neverCompleting = CompletableFuture() + // Stub: any 4-arg call (with loopId defaulting to null) returns never-completing CF + whenever(scheduler.triggerPhase(any(), any(), any(), anyOrNull())).thenReturn(neverCompleting) + + val syncController = InternalApiController( + runStatusTracker, scheduler, instrumentedExecutor, phaseLoopController, + ) + val syncMockMvc = standaloneSetup(syncController) + .setMessageConverters(MappingJackson2HttpMessageConverter(objectMapper)) + .build() + + syncMockMvc.perform( + post("/api/internal/trigger/phase/RANKING_FETCH") + .header("X-Upstream-Run-Id", "u-1"), + ) + .andExpect(status().isAccepted) + .andExpect(jsonPath("$.status").value("STARTED")) + + val future = capturedFutures.peek() + ?: error("expected exactly one submitted task; queue was empty") + val completed = try { + future.get(500, TimeUnit.MILLISECONDS) + true + } catch (ex: java.util.concurrent.TimeoutException) { + false + } + assertThat(completed) + .withFailMessage( + "executor.submit() future should be done immediately (fire-and-forget); " + + "it never completed, which means the controller still .join()s on the phase CF." + ) + .isTrue() + } finally { + capturingExecutor.shutdownNow() + } + } + @Test fun `POST trigger phase returns 202 with runId when slot empty`() { val runStatusTracker = mock() diff --git a/module-external-api/src/test/kotlin/maple/externalapi/test/ExtApiBlockingPrimitiveGateTest.kt b/module-external-api/src/test/kotlin/maple/externalapi/test/ExtApiBlockingPrimitiveGateTest.kt new file mode 100644 index 000000000..8cf353774 --- /dev/null +++ b/module-external-api/src/test/kotlin/maple/externalapi/test/ExtApiBlockingPrimitiveGateTest.kt @@ -0,0 +1,76 @@ +package maple.externalapi.test + +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import java.io.File + +// CI gate: fails the build if blocking primitives reappear in +// module-external-api runstatus/ or scheduler/ main sources. +// +// Blocking primitives checked: +// - .join() on a CompletableFuture chain (sync coercion of async pipeline) +// - runBlocking { (Kotlin coroutine bridge that pins carrier thread) +// - Task.join() (JavaFX / Task join - out of scope here, but cheap to catch) +// - Thread.sleep( (sleeping inside async pipeline is always wrong) +// +// Allowlist rationale (each entry is a documented exception, not a free pass): +// - ExternalApiScheduler.kt runBlocking: documented acceptable VT-carrier +// bridge per async-patterns.md (see audit + ADR-blocking-async-contract-cf-chain). +// Path-level allowlist for runBlocking ONLY - must NOT introduce .join(), +// Thread.sleep, or Task.join(). +// - Coroutine Job.join() (e.g. writerJob.join()) inside the documented +// runBlocking block: legitimate Kotlin coroutine bridge. +// - Comments and Deprecated shim bodies: not new blocking code. +class ExtApiBlockingPrimitiveGateTest { + @Test + fun `no blocking primitives in module-external-api runstatus or scheduler main sources`() { + val srcRoots = listOf( + File("src/main/kotlin/maple/externalapi/runstatus"), + File("src/main/kotlin/maple/externalapi/scheduler"), + ) + + val violations = mutableListOf() + val patterns = listOf( + Regex("""\.join\(\)"""), + Regex("""runBlocking\s*\{"""), + Regex("""Task\.join\(\)"""), + Regex("""Thread\.sleep\s*\("""), + ) + + srcRoots.forEach { srcRoot -> + if (!srcRoot.exists()) return@forEach + srcRoot.walkTopDown() + .filter { it.extension in listOf("kt", "java") } + .forEach { file -> + file.readLines().forEachIndexed { i, line -> + val trimmed = line.trim() + if (trimmed.isEmpty()) return@forEachIndexed + if (patterns.any { it.containsMatchIn(trimmed) } && !isAllowlisted(file, i, trimmed)) { + violations.add("${file.path}:${i + 1}: $trimmed") + } + } + } + } + + assertThat(violations) + .withFailMessage("Blocking primitives found:\n${violations.joinToString("\n")}") + .isEmpty() + } + + private fun isAllowlisted(file: File, line: Int, text: String): Boolean { + val path = file.absolutePath + val isComment = text.startsWith("//") || + text.startsWith("*") || + text.startsWith("/*") + val isDeprecatedLine = text.contains("@Deprecated") + + // ExternalApiScheduler.kt runBlocking: documented acceptable VT-carrier bridge + val isLegacyRunBlocking = path.contains("ExternalApiScheduler") && + text.contains("runBlocking") + + // Coroutine Job.join() (e.g. writerJob.join()) inside the documented runBlocking block + val isCoroutineJobJoin = text.matches(Regex("""[a-zA-Z_][a-zA-Z0-9_]*Job\.join\(\).*""")) + + return isComment || isDeprecatedLine || isLegacyRunBlocking || isCoroutineJobJoin + } +}