diff --git a/module-external-api/src/main/kotlin/maple/externalapi/scheduler/phase/CharacterBasicFetchPhase.kt b/module-external-api/src/main/kotlin/maple/externalapi/scheduler/phase/CharacterBasicFetchPhase.kt index 3ce667440..d79689637 100644 --- a/module-external-api/src/main/kotlin/maple/externalapi/scheduler/phase/CharacterBasicFetchPhase.kt +++ b/module-external-api/src/main/kotlin/maple/externalapi/scheduler/phase/CharacterBasicFetchPhase.kt @@ -88,21 +88,19 @@ class CharacterBasicFetchPhase( val dispatcher = workerExecutor.asCoroutineDispatcher() return CoroutineScope(dispatcher).future { - try { - val (successCount, failCount) = batchSupport.processBatch( - rateLimiter, - entries, - batchSize, - ctx, - sink, - effectiveRunId, - start, - ) - schedulerProgressLogger.logSummary("character-basic", entries.size, successCount, successCount, failCount, start) - } finally { - sink.close() - metrics.characterBasicTimer().record(Duration.between(start, Instant.now(clock))) - } + val (successCount, failCount) = batchSupport.processBatch( + rateLimiter, + entries, + batchSize, + ctx, + sink, + effectiveRunId, + start, + ) + schedulerProgressLogger.logSummary("character-basic", entries.size, successCount, successCount, failCount, start) + }.thenCompose { + metrics.characterBasicTimer().record(Duration.between(start, Instant.now(clock))) + sink.closeAsync().thenApply { Unit } } } } diff --git a/module-external-api/src/main/kotlin/maple/externalapi/scheduler/phase/ItemEquipmentFetchPhase.kt b/module-external-api/src/main/kotlin/maple/externalapi/scheduler/phase/ItemEquipmentFetchPhase.kt index 4c1f1a595..3dae3d2ef 100644 --- a/module-external-api/src/main/kotlin/maple/externalapi/scheduler/phase/ItemEquipmentFetchPhase.kt +++ b/module-external-api/src/main/kotlin/maple/externalapi/scheduler/phase/ItemEquipmentFetchPhase.kt @@ -81,21 +81,19 @@ class ItemEquipmentFetchPhase( val dispatcher = workerExecutor.asCoroutineDispatcher() return CoroutineScope(dispatcher).future { - try { - val (successCount, failCount) = batchSupport.processBatch( - rateLimiter, - entries, - batchSize, - ctx, - sink, - effectiveRunId, - start, - ) - schedulerProgressLogger.logSummary("item-equipment", entries.size, successCount, successCount, failCount, start) - } finally { - sink.close() - metrics.itemEquipmentTimer().record(Duration.between(start, Instant.now(clock))) - } + val (successCount, failCount) = batchSupport.processBatch( + rateLimiter, + entries, + batchSize, + ctx, + sink, + effectiveRunId, + start, + ) + schedulerProgressLogger.logSummary("item-equipment", entries.size, successCount, successCount, failCount, start) + }.thenCompose { + metrics.itemEquipmentTimer().record(Duration.between(start, Instant.now(clock))) + sink.closeAsync().thenApply { Unit } } } } diff --git a/module-external-api/src/main/kotlin/maple/externalapi/scheduler/phase/RankingFetchPhase.kt b/module-external-api/src/main/kotlin/maple/externalapi/scheduler/phase/RankingFetchPhase.kt index f294f353e..9cd8e223a 100644 --- a/module-external-api/src/main/kotlin/maple/externalapi/scheduler/phase/RankingFetchPhase.kt +++ b/module-external-api/src/main/kotlin/maple/externalapi/scheduler/phase/RankingFetchPhase.kt @@ -89,8 +89,8 @@ class RankingFetchPhase( val start = Instant.now() return processPages(workerExecutor, sink, rateLimiter, date, 1, fetched, failed) + .thenCompose { sink.closeAsync() } .whenComplete { _, ex -> - sink.close() if (ex != null) { log.error("[RankingFetch] failed: runId={}, fetched={}, failed={}", runId, fetched.get(), failed.get(), ex) } else { diff --git a/module-external-api/src/main/kotlin/maple/externalapi/snapshot/ChunkFileManager.kt b/module-external-api/src/main/kotlin/maple/externalapi/snapshot/ChunkFileManager.kt index 2bdaf7f0d..a376605a0 100644 --- a/module-external-api/src/main/kotlin/maple/externalapi/snapshot/ChunkFileManager.kt +++ b/module-external-api/src/main/kotlin/maple/externalapi/snapshot/ChunkFileManager.kt @@ -125,7 +125,7 @@ class ChunkFileManager( * uploads are logged but not rethrown — the sink's run-completed event * is the place to surface them. */ - fun awaitAllUploads(timeoutMs: Long = 600_000L): Boolean { + fun awaitAllUploads(timeoutMs: Long = DEFAULT_AWAIT_TIMEOUT_MS): Boolean { if (inFlightUploads.isEmpty()) return true val all = CompletableFuture.allOf(*inFlightUploads.toTypedArray()) return try { @@ -147,6 +147,38 @@ class ChunkFileManager( } } + /** + * Async variant of [awaitAllUploads]. Returns a [CompletableFuture] that + * completes with `true` when all in-flight uploads succeed and `false` on + * timeout or individual upload failure. Never blocks the calling thread + * — callers chain via `thenCompose` / `whenComplete` to keep the writer + * thread (or the scheduler's CF chain) free. + * + * @param timeoutMs hard timeout (default 10 minutes, matches sync variant) + */ + fun awaitAllUploadsAsync( + timeoutMs: Long = DEFAULT_AWAIT_TIMEOUT_MS, + ): CompletableFuture { + if (inFlightUploads.isEmpty()) return CompletableFuture.completedFuture(true) + val all = CompletableFuture.allOf(*inFlightUploads.toTypedArray()) + return all + .thenApply { true } + .orTimeout(timeoutMs, TimeUnit.MILLISECONDS) + .exceptionally { ex -> + val cause = ex.cause ?: ex + if (cause is java.util.concurrent.TimeoutException) { + log.error( + "[ChunkFileManager] awaitAllUploadsAsync timed out after {}ms (in-flight: {})", + timeoutMs, + inFlightUploads.size, + ) + } else { + log.error("[ChunkFileManager] awaitAllUploadsAsync failed: {}", cause.message, cause) + } + false + } + } + fun inFlightUploadCount(): Int = inFlightUploads.size fun writeManifestAndSuccessMarker() { @@ -194,4 +226,12 @@ class ChunkFileManager( startedAt = stats.startedAt, finishedAt = stats.finishedAt, ) + + companion object { + /** + * Default hard timeout for [awaitAllUploads] / [awaitAllUploadsAsync]. + * 10 minutes is generous for 128MB × N chunks on a healthy MinIO. + */ + const val DEFAULT_AWAIT_TIMEOUT_MS: Long = 600_000L + } } diff --git a/module-external-api/src/main/kotlin/maple/externalapi/snapshot/ChunkedSnapshotSink.kt b/module-external-api/src/main/kotlin/maple/externalapi/snapshot/ChunkedSnapshotSink.kt index c7fa7c5c9..a62824dc0 100644 --- a/module-external-api/src/main/kotlin/maple/externalapi/snapshot/ChunkedSnapshotSink.kt +++ b/module-external-api/src/main/kotlin/maple/externalapi/snapshot/ChunkedSnapshotSink.kt @@ -1,6 +1,7 @@ package maple.externalapi.snapshot import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.CompletableFuture import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.Future @@ -28,6 +29,16 @@ class ChunkedSnapshotSink( runWriterLoop() } + /** + * Dedicated executor for async-orchestration waits in [closeAsync] (e.g. + * `awaitTermination`). Kept separate from [writerExecutor] because once + * `writerExecutor.shutdown()` is called it rejects new tasks, and we need + * a still-live executor to host the post-shutdown wait task. + */ + private val closeAsyncExecutor: ExecutorService = Executors.newSingleThreadExecutor { runnable -> + Thread.ofPlatform().name("snapshot-close-async-$endpoint").unstarted(runnable) + } + fun submit(record: SnapshotChunkRecord) { if (!accepting.get()) { val err = writerError.get() @@ -56,6 +67,7 @@ class ChunkedSnapshotSink( fun queueDepth(): Int = queue.size + @Deprecated("Use closeAsync() for non-blocking shutdown; this sync variant holds the calling thread for up to ~10 minutes") fun close() { accepting.set(false) try { @@ -117,6 +129,99 @@ class ChunkedSnapshotSink( } } + /** + * Async variant of [close]. Performs the same work — shutdown writer, close + * current chunk, await in-flight uploads, write manifest, publish run event — + * but chains via [CompletableFuture] so the calling thread is never blocked. + * + * Audit reference: docs/05_Reports/2026-06-18-blocking-audit.md line 69. + * Replaces the blocking `all.get(600_000L, TimeUnit.MILLISECONDS)` in the + * previous sync path. + * + * @return CF that completes normally on successful close, exceptionally on + * any failure (writer error, upload timeout, manifest write error). + */ + fun closeAsync(): CompletableFuture { + accepting.set(false) + // Enqueue CloseSignal from a non-writer thread. The writer thread is + // already blocked in queue.take() waiting for the signal — using the + // single-thread writerExecutor to enqueue it would deadlock the writer + // against itself. closeAsyncExecutor hosts the offer and the post-shutdown + // awaitTermination wait. + val closeSignalFuture = CompletableFuture.runAsync( + { + if (!queue.offer(SnapshotChunkRecord.CloseSignal, 30, TimeUnit.SECONDS)) { + throw IllegalStateException("failed to enqueue close signal after 30s") + } + }, + closeAsyncExecutor, + ) + + val writerDoneFuture = closeSignalFuture.thenCompose { + writerExecutor.shutdown() + CompletableFuture.runAsync( + { + if (!writerExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + log.warn("[Sink] writer executor did not terminate within 60s, forcing shutdown") + writerExecutor.shutdownNow() + } + }, + closeAsyncExecutor, + ) + } + + return writerDoneFuture + .thenCompose { + val err = writerError.get() + val manifest = fileManager.manifest() + if (err != null) { + fileManager.cleanupOnFailure() + eventPublisher.publishRunFailed(manifest, endpoint, err.message ?: "unknown") + val failed: CompletableFuture = CompletableFuture() + failed.completeExceptionally(RuntimeException("writer thread failed: ${err.message}", err)) + return@thenCompose failed + } + + // Close current chunk and register its upload. + fileManager.closeCurrentChunk()?.let { stats -> publishWhenUploaded(stats) } + + // Wait for all fire-and-forget chunk uploads to complete + // BEFORE writing the manifest (otherwise manifest references + // chunks not yet in MinIO — calculator/sync could read incomplete data). + fileManager.awaitAllUploadsAsync(ChunkFileManager.DEFAULT_AWAIT_TIMEOUT_MS).thenCompose { allUploaded -> + if (!allUploaded) { + val msg = "chunk uploads did not complete in time (in-flight=${fileManager.inFlightUploadCount()})" + fileManager.cleanupOnFailure() + eventPublisher.publishRunFailed(manifest, endpoint, msg) + val failed: CompletableFuture = CompletableFuture() + failed.completeExceptionally(RuntimeException(msg)) + return@thenCompose failed + } + + fileManager.writeManifestAndSuccessMarker() + fileManager.deleteRunningMarker() + + log.info( + "[Sink] closed: endpoint={}, chunks={}, records={}, failed={}", + endpoint, + manifest.chunks.size, + manifest.totalRecords, + manifest.totalFailed, + ) + + eventPublisher.publishRunCompleted(manifest, endpoint) + CompletableFuture.completedFuture(null) + } + } + .whenComplete { _, _ -> + // Ensure writerExecutor is fully terminated even if any step throws. + if (!writerExecutor.isTerminated) { + writerExecutor.shutdownNow() + } + closeAsyncExecutor.shutdown() + } + } + private fun runWriterLoop() { try { while (true) { diff --git a/module-external-api/src/test/kotlin/maple/externalapi/snapshot/ChunkFileManagerAsyncTest.kt b/module-external-api/src/test/kotlin/maple/externalapi/snapshot/ChunkFileManagerAsyncTest.kt new file mode 100644 index 000000000..595b34f88 --- /dev/null +++ b/module-external-api/src/test/kotlin/maple/externalapi/snapshot/ChunkFileManagerAsyncTest.kt @@ -0,0 +1,105 @@ +package maple.externalapi.snapshot + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.SerializationFeature +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule +import com.fasterxml.jackson.module.kotlin.kotlinModule +import maple.expectation.common.storage.ObjectStorage +import maple.expectation.common.storage.PutResult +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Test +import org.mockito.kotlin.any +import org.mockito.kotlin.mock +import org.mockito.kotlin.whenever +import java.nio.file.Files +import java.nio.file.Path +import java.time.Clock +import java.time.Instant +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit + +/** + * Async contract tests for [ChunkFileManager.awaitAllUploadsAsync]. + * + * Sub-PR 4 (audit reference: docs/05_Reports/2026-06-18-blocking-audit.md line 69) + * replaces the blocking `.get(600_000L, TimeUnit.MILLISECONDS)` in + * [ChunkFileManager.awaitAllUploads] with a CF-returning variant so callers can + * chain via `thenCompose` without holding a thread hostage on a 10-minute timeout. + * + * Two guarantees under test: + * - Empty in-flight list returns a CF that is already completed with `true` + * synchronously (no scheduling latency, no thread hop). + * - Non-empty in-flight list returns a CF that has NOT yet completed + * (i.e. the call site does not block waiting for uploads). + */ +class ChunkFileManagerAsyncTest { + + private val objectMapper = ObjectMapper() + .registerModule(kotlinModule()) + .registerModule(JavaTimeModule()) + .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS) + + @Test + fun `awaitAllUploadsAsync returns completed CF true when no in-flight uploads`() { + val storage = mock() + + val manager = ChunkFileManager( + runKey = "runs/test/empty", + endpoint = "test", + maxRecords = 100, + maxUncompressedBytes = 1_000_000, + objectMapper = objectMapper, + clock = Clock.systemUTC(), + objectStorage = storage, + ) + + val future: CompletableFuture = manager.awaitAllUploadsAsync() + + assertThat(future.isDone).isEqualTo(true) + assertThat(future.get(1, TimeUnit.SECONDS)).isEqualTo(true) + assertThat(manager.inFlightUploadCount()).isEqualTo(0) + } + + @Test + fun `awaitAllUploadsAsync returns CF immediately without blocking on uploads`() { + val storage = mock() + val neverCompletes = CompletableFuture() + whenever(storage.putFileAsync(any(), any())).thenReturn(neverCompletes) + + val manager = ChunkFileManager( + runKey = "runs/test/inflight", + endpoint = "test", + maxRecords = 1, + maxUncompressedBytes = 1_000_000, + objectMapper = objectMapper, + clock = Clock.systemUTC(), + objectStorage = storage, + ) + + // Trigger one rotation: appendSuccess with maxRecords=1 forces rotation, + // which registers the never-completing upload future into inFlightUploads. + manager.appendSuccess( + SnapshotChunkRecord.Success( + bodyBytes = objectMapper.writeValueAsBytes(mapOf("k" to "v")), + key = "k1", + endpoint = "test", + keyType = "TEST", + httpStatus = 200, + fetchedAt = Instant.parse("2026-06-18T00:00:00Z"), + ), + ) + assertThat(manager.inFlightUploadCount()).isEqualTo(1) + + // The CF must be returned synchronously (well under 100ms) and + // must NOT be done yet — otherwise the call would block on uploads. + val start = System.nanoTime() + val future = manager.awaitAllUploadsAsync() + val returnMs = (System.nanoTime() - start) / 1_000_000 + + assertThat(returnMs) + .withFailMessage("awaitAllUploadsAsync should return synchronously but took ${returnMs}ms") + .isLessThan(100) + val isDone: Boolean = future.isDone + assertThat(isDone).isFalse() + } +} diff --git a/module-external-api/src/test/kotlin/maple/externalapi/test/ExtApiBlockingPrimitiveGateTest.kt b/module-external-api/src/test/kotlin/maple/externalapi/test/ExtApiBlockingPrimitiveGateTest.kt index 8cf353774..9bca2079e 100644 --- a/module-external-api/src/test/kotlin/maple/externalapi/test/ExtApiBlockingPrimitiveGateTest.kt +++ b/module-external-api/src/test/kotlin/maple/externalapi/test/ExtApiBlockingPrimitiveGateTest.kt @@ -5,7 +5,7 @@ import org.junit.jupiter.api.Test import java.io.File // CI gate: fails the build if blocking primitives reappear in -// module-external-api runstatus/ or scheduler/ main sources. +// module-external-api runstatus/, scheduler/, or snapshot/ main sources. // // Blocking primitives checked: // - .join() on a CompletableFuture chain (sync coercion of async pipeline) @@ -23,10 +23,11 @@ import java.io.File // - Comments and Deprecated shim bodies: not new blocking code. class ExtApiBlockingPrimitiveGateTest { @Test - fun `no blocking primitives in module-external-api runstatus or scheduler main sources`() { + fun `no blocking primitives in module-external-api runstatus scheduler or snapshot main sources`() { val srcRoots = listOf( File("src/main/kotlin/maple/externalapi/runstatus"), File("src/main/kotlin/maple/externalapi/scheduler"), + File("src/main/kotlin/maple/externalapi/snapshot"), ) val violations = mutableListOf()