Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class InternalApiController(
}

val runId = airflowRunId ?: UUID.randomUUID().toString()
executor.submit { scheduler.triggerDailyRefresh(runId).join() }
executor.submit { scheduler.triggerDailyRefresh(runId) }
return ResponseEntity.accepted().body(mapOf("status" to "STARTED", "runId" to runId))
}

Expand Down Expand Up @@ -120,7 +120,7 @@ class InternalApiController(
}

val runId = airflowRunId ?: UUID.randomUUID().toString()
executor.submit { scheduler.triggerPhase(phase, runId, upstreamRunId).join() }
executor.submit { scheduler.triggerPhase(phase, runId, upstreamRunId) }
return ResponseEntity.accepted().body(mapOf("status" to "STARTED", "runId" to runId))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,20 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.registerKotlinModule
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import maple.externalapi.loop.PhaseLoopController
import maple.externalapi.scheduler.ExternalApiScheduler
import maple.externalapi.scheduler.PhaseStopSignal
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.mockito.kotlin.any
import org.mockito.kotlin.anyOrNull
import org.mockito.kotlin.mock
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
Expand Down Expand Up @@ -203,6 +207,124 @@ class InternalApiControllerTest {
.andExpect(status().isAccepted)
}

@Test
@Timeout(value = 5, unit = TimeUnit.SECONDS)
fun `POST trigger daily returns 202 immediately without blocking on phase completion`() {
// Capture the Future returned by submit() and assert it completes
// within a short window. With the .join() bug, the runnable blocks
// on a never-completing CF, so the Future never completes.
val capturedFutures = java.util.concurrent.ConcurrentLinkedQueue<java.util.concurrent.Future<*>>()
val capturingExecutor = java.util.concurrent.Executors.newSingleThreadExecutor { r ->
Thread(r, "test-capturing").apply { isDaemon = true }
}
// Wrap submit via reflection-free override: use a delegating executor.
// Easiest path: subclass to override submit(Runnable).
val instrumentedExecutor = object : java.util.concurrent.AbstractExecutorService() {
override fun shutdown() = capturingExecutor.shutdown()
override fun shutdownNow(): MutableList<Runnable> = capturingExecutor.shutdownNow()
override fun isShutdown(): Boolean = capturingExecutor.isShutdown
override fun isTerminated(): Boolean = capturingExecutor.isTerminated
override fun awaitTermination(timeout: Long, unit: java.util.concurrent.TimeUnit): Boolean =
capturingExecutor.awaitTermination(timeout, unit)
override fun execute(command: Runnable) {
capturedFutures.add(capturingExecutor.submit(command))
}
}
try {
val neverCompleting = CompletableFuture<Void>()
// triggerDailyRefresh takes 1 arg (airflowRunId: String?). any() matches non-null,
// so we don't need anyOrNull here — but the controller may pass null when no
// X-Airflow-Run-Id header is set, so we use anyOrNull for safety.
whenever(scheduler.triggerDailyRefresh(anyOrNull())).thenReturn(neverCompleting)

val syncController = InternalApiController(
runStatusTracker, scheduler, instrumentedExecutor, phaseLoopController,
)
val syncMockMvc = standaloneSetup(syncController)
.setMessageConverters(MappingJackson2HttpMessageConverter(objectMapper))
.build()

syncMockMvc.perform(post("/api/internal/trigger/daily"))
.andExpect(status().isAccepted)
.andExpect(jsonPath("$.status").value("STARTED"))

val future = capturedFutures.peek()
?: error("expected exactly one submitted task; queue was empty")
// Fire-and-forget: the runnable must return quickly. With .join()
// on a never-completing CF, the runnable blocks indefinitely.
val completed = try {
future.get(500, TimeUnit.MILLISECONDS)
true
} catch (ex: java.util.concurrent.TimeoutException) {
false
}
assertThat(completed)
.withFailMessage(
"executor.submit() future should be done immediately (fire-and-forget); " +
"it never completed, which means the controller still .join()s on the phase CF."
)
.isTrue()
} finally {
capturingExecutor.shutdownNow()
}
}

@Test
@Timeout(value = 5, unit = TimeUnit.SECONDS)
fun `POST trigger phase returns 202 immediately without blocking on phase completion`() {
val capturedFutures = java.util.concurrent.ConcurrentLinkedQueue<java.util.concurrent.Future<*>>()
val capturingExecutor = java.util.concurrent.Executors.newSingleThreadExecutor { r ->
Thread(r, "test-capturing").apply { isDaemon = true }
}
val instrumentedExecutor = object : java.util.concurrent.AbstractExecutorService() {
override fun shutdown() = capturingExecutor.shutdown()
override fun shutdownNow(): MutableList<Runnable> = capturingExecutor.shutdownNow()
override fun isShutdown(): Boolean = capturingExecutor.isShutdown
override fun isTerminated(): Boolean = capturingExecutor.isTerminated
override fun awaitTermination(timeout: Long, unit: java.util.concurrent.TimeUnit): Boolean =
capturingExecutor.awaitTermination(timeout, unit)
override fun execute(command: Runnable) {
capturedFutures.add(capturingExecutor.submit(command))
}
}
try {
val neverCompleting = CompletableFuture<Void>()
// Stub: any 4-arg call (with loopId defaulting to null) returns never-completing CF
whenever(scheduler.triggerPhase(any(), any(), any(), anyOrNull())).thenReturn(neverCompleting)

val syncController = InternalApiController(
runStatusTracker, scheduler, instrumentedExecutor, phaseLoopController,
)
val syncMockMvc = standaloneSetup(syncController)
.setMessageConverters(MappingJackson2HttpMessageConverter(objectMapper))
.build()

syncMockMvc.perform(
post("/api/internal/trigger/phase/RANKING_FETCH")
.header("X-Upstream-Run-Id", "u-1"),
)
.andExpect(status().isAccepted)
.andExpect(jsonPath("$.status").value("STARTED"))

val future = capturedFutures.peek()
?: error("expected exactly one submitted task; queue was empty")
val completed = try {
future.get(500, TimeUnit.MILLISECONDS)
true
} catch (ex: java.util.concurrent.TimeoutException) {
false
}
assertThat(completed)
.withFailMessage(
"executor.submit() future should be done immediately (fire-and-forget); " +
"it never completed, which means the controller still .join()s on the phase CF."
)
.isTrue()
} finally {
capturingExecutor.shutdownNow()
}
}

@Test
fun `POST trigger phase returns 202 with runId when slot empty`() {
val runStatusTracker = mock<RunStatusTracker>()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package maple.externalapi.test

import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import java.io.File

// CI gate: fails the build if blocking primitives reappear in
// module-external-api runstatus/ or scheduler/ main sources.
//
// Blocking primitives checked:
// - .join() on a CompletableFuture chain (sync coercion of async pipeline)
// - runBlocking { (Kotlin coroutine bridge that pins carrier thread)
// - Task.join() (JavaFX / Task join - out of scope here, but cheap to catch)
// - Thread.sleep( (sleeping inside async pipeline is always wrong)
//
// Allowlist rationale (each entry is a documented exception, not a free pass):
// - ExternalApiScheduler.kt runBlocking: documented acceptable VT-carrier
// bridge per async-patterns.md (see audit + ADR-blocking-async-contract-cf-chain).
// Path-level allowlist for runBlocking ONLY - must NOT introduce .join(),
// Thread.sleep, or Task.join().
// - Coroutine Job.join() (e.g. writerJob.join()) inside the documented
// runBlocking block: legitimate Kotlin coroutine bridge.
// - Comments and Deprecated shim bodies: not new blocking code.
class ExtApiBlockingPrimitiveGateTest {
@Test
fun `no blocking primitives in module-external-api runstatus or scheduler main sources`() {
val srcRoots = listOf(
File("src/main/kotlin/maple/externalapi/runstatus"),
File("src/main/kotlin/maple/externalapi/scheduler"),
)

val violations = mutableListOf<String>()
val patterns = listOf(
Regex("""\.join\(\)"""),
Regex("""runBlocking\s*\{"""),
Regex("""Task\.join\(\)"""),
Regex("""Thread\.sleep\s*\("""),
)

srcRoots.forEach { srcRoot ->
if (!srcRoot.exists()) return@forEach
srcRoot.walkTopDown()
.filter { it.extension in listOf("kt", "java") }
.forEach { file ->
file.readLines().forEachIndexed { i, line ->
val trimmed = line.trim()
if (trimmed.isEmpty()) return@forEachIndexed
if (patterns.any { it.containsMatchIn(trimmed) } && !isAllowlisted(file, i, trimmed)) {
violations.add("${file.path}:${i + 1}: $trimmed")
}
}
}
}

assertThat(violations)
.withFailMessage("Blocking primitives found:\n${violations.joinToString("\n")}")
.isEmpty()
}

private fun isAllowlisted(file: File, line: Int, text: String): Boolean {
val path = file.absolutePath
val isComment = text.startsWith("//") ||
text.startsWith("*") ||
text.startsWith("/*")
val isDeprecatedLine = text.contains("@Deprecated")

// ExternalApiScheduler.kt runBlocking: documented acceptable VT-carrier bridge
val isLegacyRunBlocking = path.contains("ExternalApiScheduler") &&
text.contains("runBlocking")

// Coroutine Job.join() (e.g. writerJob.join()) inside the documented runBlocking block
val isCoroutineJobJoin = text.matches(Regex("""[a-zA-Z_][a-zA-Z0-9_]*Job\.join\(\).*"""))

return isComment || isDeprecatedLine || isLegacyRunBlocking || isCoroutineJobJoin
}
}