Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,19 @@ class CharacterBasicFetchPhase(

val dispatcher = workerExecutor.asCoroutineDispatcher()
return CoroutineScope(dispatcher).future {
try {
val (successCount, failCount) = batchSupport.processBatch(
rateLimiter,
entries,
batchSize,
ctx,
sink,
effectiveRunId,
start,
)
schedulerProgressLogger.logSummary("character-basic", entries.size, successCount, successCount, failCount, start)
} finally {
sink.close()
metrics.characterBasicTimer().record(Duration.between(start, Instant.now(clock)))
}
val (successCount, failCount) = batchSupport.processBatch(
rateLimiter,
entries,
batchSize,
ctx,
sink,
effectiveRunId,
start,
)
schedulerProgressLogger.logSummary("character-basic", entries.size, successCount, successCount, failCount, start)
}.thenCompose {
metrics.characterBasicTimer().record(Duration.between(start, Instant.now(clock)))
sink.closeAsync().thenApply { Unit }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,19 @@ class ItemEquipmentFetchPhase(

val dispatcher = workerExecutor.asCoroutineDispatcher()
return CoroutineScope(dispatcher).future {
try {
val (successCount, failCount) = batchSupport.processBatch(
rateLimiter,
entries,
batchSize,
ctx,
sink,
effectiveRunId,
start,
)
schedulerProgressLogger.logSummary("item-equipment", entries.size, successCount, successCount, failCount, start)
} finally {
sink.close()
metrics.itemEquipmentTimer().record(Duration.between(start, Instant.now(clock)))
}
val (successCount, failCount) = batchSupport.processBatch(
rateLimiter,
entries,
batchSize,
ctx,
sink,
effectiveRunId,
start,
)
schedulerProgressLogger.logSummary("item-equipment", entries.size, successCount, successCount, failCount, start)
}.thenCompose {
metrics.itemEquipmentTimer().record(Duration.between(start, Instant.now(clock)))
sink.closeAsync().thenApply { Unit }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ class RankingFetchPhase(
val start = Instant.now()

return processPages(workerExecutor, sink, rateLimiter, date, 1, fetched, failed)
.thenCompose { sink.closeAsync() }
.whenComplete { _, ex ->
sink.close()
if (ex != null) {
log.error("[RankingFetch] failed: runId={}, fetched={}, failed={}", runId, fetched.get(), failed.get(), ex)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class ChunkFileManager(
* uploads are logged but not rethrown — the sink's run-completed event
* is the place to surface them.
*/
fun awaitAllUploads(timeoutMs: Long = 600_000L): Boolean {
fun awaitAllUploads(timeoutMs: Long = DEFAULT_AWAIT_TIMEOUT_MS): Boolean {
if (inFlightUploads.isEmpty()) return true
val all = CompletableFuture.allOf(*inFlightUploads.toTypedArray())
return try {
Expand All @@ -147,6 +147,38 @@ class ChunkFileManager(
}
}

/**
 * Non-blocking counterpart of [awaitAllUploads].
 *
 * Takes a snapshot of the currently tracked upload futures and returns a
 * [CompletableFuture] that resolves to:
 * - `true` once every upload in the snapshot has completed successfully;
 * - `false` if the timeout elapses first or any upload in the snapshot fails.
 *
 * Both failure modes are logged here and mapped to `false`, so callers are
 * expected to branch on the boolean (e.g. inside `thenCompose`) rather than
 * handle an exceptional completion. The method only builds the future chain
 * and returns; it does not wait on the uploads itself.
 *
 * @param timeoutMs hard timeout in milliseconds; defaults to
 *   [DEFAULT_AWAIT_TIMEOUT_MS] (10 minutes, same as the sync variant)
 * @return future completing with `true` on full success, `false` otherwise
 */
fun awaitAllUploadsAsync(
    timeoutMs: Long = DEFAULT_AWAIT_TIMEOUT_MS,
): CompletableFuture<Boolean> {
    // Nothing tracked: hand back an already-completed future.
    if (inFlightUploads.isEmpty()) {
        return CompletableFuture.completedFuture(true)
    }

    val pending = inFlightUploads.toTypedArray()
    return CompletableFuture.allOf(*pending)
        .thenApply { true }
        .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
        .exceptionally { failure ->
            // The failure may arrive wrapped; fall back to the throwable itself
            // when it carries no cause (as with the timeout raised by orTimeout).
            val root = failure.cause ?: failure
            when (root) {
                is java.util.concurrent.TimeoutException -> log.error(
                    "[ChunkFileManager] awaitAllUploadsAsync timed out after {}ms (in-flight: {})",
                    timeoutMs,
                    inFlightUploads.size,
                )
                else -> log.error("[ChunkFileManager] awaitAllUploadsAsync failed: {}", root.message, root)
            }
            false
        }
}

/**
 * Returns the current size of the in-flight upload collection.
 *
 * NOTE(review): whether completed uploads are removed from that collection
 * is not visible here, so treat this as "tracked uploads" rather than
 * strictly "still running" until confirmed.
 */
fun inFlightUploadCount(): Int {
    return inFlightUploads.size
}

fun writeManifestAndSuccessMarker() {
Expand Down Expand Up @@ -194,4 +226,12 @@ class ChunkFileManager(
startedAt = stats.startedAt,
finishedAt = stats.finishedAt,
)

companion object {
    /**
     * Default hard timeout, in milliseconds, shared by [awaitAllUploads] and
     * [awaitAllUploadsAsync]: 10 minutes (600 000 ms). The value is meant to be
     * generous for 128MB × N chunks on a healthy MinIO.
     */
    const val DEFAULT_AWAIT_TIMEOUT_MS: Long = 10L * 60L * 1_000L
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package maple.externalapi.snapshot

import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
Expand Down Expand Up @@ -28,6 +29,16 @@ class ChunkedSnapshotSink(
runWriterLoop()
}

/**
 * Single-thread executor that hosts the blocking steps of [closeAsync]:
 * the timed enqueue of the close signal and the `awaitTermination` wait on
 * [writerExecutor].
 *
 * It is deliberately distinct from [writerExecutor]: once
 * `writerExecutor.shutdown()` has been called that executor rejects new
 * tasks, so the post-shutdown wait needs another executor that is still
 * accepting work. [closeAsync] shuts this executor down when its chain
 * completes.
 */
private val closeAsyncExecutor: ExecutorService = Executors.newSingleThreadExecutor { task ->
    // Platform thread named after the endpoint so it is identifiable in thread dumps.
    val builder = Thread.ofPlatform().name("snapshot-close-async-$endpoint")
    builder.unstarted(task)
}

fun submit(record: SnapshotChunkRecord) {
if (!accepting.get()) {
val err = writerError.get()
Expand Down Expand Up @@ -56,6 +67,7 @@ class ChunkedSnapshotSink(

fun queueDepth(): Int = queue.size

@Deprecated("Use closeAsync() for non-blocking shutdown; this sync variant holds the calling thread for up to ~10 minutes")
fun close() {
accepting.set(false)
try {
Expand Down Expand Up @@ -117,6 +129,99 @@ class ChunkedSnapshotSink(
}
}

/**
 * Non-blocking counterpart of [close]. Runs the same shutdown sequence —
 * stop accepting records, signal and stop the writer, close the current
 * chunk, wait for in-flight uploads, write the manifest, publish the run
 * event — as a [CompletableFuture] chain instead of blocking the caller.
 *
 * Audit reference: docs/05_Reports/2026-06-18-blocking-audit.md line 69.
 * Replaces the blocking `all.get(600_000L, TimeUnit.MILLISECONDS)` in the
 * previous sync path.
 *
 * Sequence:
 * 1. `accepting` is flipped to `false` synchronously.
 * 2. On [closeAsyncExecutor]: offer `CloseSignal` to the queue (30s limit).
 * 3. Shut down [writerExecutor], then on [closeAsyncExecutor] wait up to 60s
 *    for it to terminate, forcing `shutdownNow()` if it does not.
 * 4. If the writer recorded an error: clean up, publish run-failed, fail.
 * 5. Otherwise close the current chunk, await all uploads, and either
 *    publish run-failed (uploads incomplete) or write the manifest and
 *    success marker, delete the running marker, and publish run-completed.
 * 6. Always: force-stop the writer if still alive and shut down
 *    [closeAsyncExecutor].
 *
 * NOTE(review): the manifest is obtained before the current chunk is closed;
 * confirm `fileManager.manifest()` reflects the final chunk by the time it
 * is logged and published.
 *
 * @return future that completes normally after a successful close and
 *   exceptionally on failure (close-signal enqueue timeout, writer error,
 *   incomplete uploads, or an exception thrown by any step).
 */
fun closeAsync(): CompletableFuture<Void> {
    accepting.set(false)

    // The close signal is enqueued from closeAsyncExecutor, never from the
    // single-thread writerExecutor: the writer is the consumer waiting on the
    // queue, so asking it to enqueue its own signal would deadlock it.
    val signalEnqueued = CompletableFuture.runAsync(
        {
            val accepted = queue.offer(SnapshotChunkRecord.CloseSignal, 30, TimeUnit.SECONDS)
            check(accepted) { "failed to enqueue close signal after 30s" }
        },
        closeAsyncExecutor,
    )

    // After shutdown() the writer executor rejects new tasks, so the bounded
    // termination wait is hosted on closeAsyncExecutor as well.
    val writerStopped = signalEnqueued.thenCompose {
        writerExecutor.shutdown()
        CompletableFuture.runAsync(
            {
                val terminated = writerExecutor.awaitTermination(60, TimeUnit.SECONDS)
                if (!terminated) {
                    log.warn("[Sink] writer executor did not terminate within 60s, forcing shutdown")
                    writerExecutor.shutdownNow()
                }
            },
            closeAsyncExecutor,
        )
    }

    return writerStopped
        .thenCompose {
            val writerFailure = writerError.get()
            val manifest = fileManager.manifest()

            if (writerFailure != null) {
                // Writer died: discard partial output and surface the failure.
                fileManager.cleanupOnFailure()
                eventPublisher.publishRunFailed(manifest, endpoint, writerFailure.message ?: "unknown")
                CompletableFuture.failedFuture<Void>(
                    RuntimeException("writer thread failed: ${writerFailure.message}", writerFailure),
                )
            } else {
                // Close the chunk still being written and register its upload.
                fileManager.closeCurrentChunk()?.let { stats -> publishWhenUploaded(stats) }

                // The manifest must only be written after every chunk upload has
                // finished; otherwise it would reference chunks not yet in MinIO
                // and calculator/sync could read incomplete data.
                fileManager
                    .awaitAllUploadsAsync(ChunkFileManager.DEFAULT_AWAIT_TIMEOUT_MS)
                    .thenCompose { uploadsFinished ->
                        if (uploadsFinished) {
                            fileManager.writeManifestAndSuccessMarker()
                            fileManager.deleteRunningMarker()

                            log.info(
                                "[Sink] closed: endpoint={}, chunks={}, records={}, failed={}",
                                endpoint,
                                manifest.chunks.size,
                                manifest.totalRecords,
                                manifest.totalFailed,
                            )

                            eventPublisher.publishRunCompleted(manifest, endpoint)
                            CompletableFuture.completedFuture<Void>(null)
                        } else {
                            val msg = "chunk uploads did not complete in time (in-flight=${fileManager.inFlightUploadCount()})"
                            fileManager.cleanupOnFailure()
                            eventPublisher.publishRunFailed(manifest, endpoint, msg)
                            CompletableFuture.failedFuture<Void>(RuntimeException(msg))
                        }
                    }
            }
        }
        .whenComplete { _, _ ->
            // Runs on success and failure alike: make sure the writer is really
            // stopped, then release the helper executor.
            if (!writerExecutor.isTerminated) {
                writerExecutor.shutdownNow()
            }
            closeAsyncExecutor.shutdown()
        }
}

private fun runWriterLoop() {
try {
while (true) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package maple.externalapi.snapshot

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import com.fasterxml.jackson.module.kotlin.kotlinModule
import maple.expectation.common.storage.ObjectStorage
import maple.expectation.common.storage.PutResult
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.mockito.kotlin.any
import org.mockito.kotlin.mock
import org.mockito.kotlin.whenever
import java.nio.file.Files
import java.nio.file.Path
import java.time.Clock
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

/**
 * Contract tests for the non-blocking [ChunkFileManager.awaitAllUploadsAsync].
 *
 * Sub-PR 4 (audit reference: docs/05_Reports/2026-06-18-blocking-audit.md line 69)
 * replaces the blocking `.get(600_000L, TimeUnit.MILLISECONDS)` in
 * [ChunkFileManager.awaitAllUploads] with a variant that returns a
 * [CompletableFuture], so callers can chain via `thenCompose` instead of
 * parking a thread on a 10-minute timeout.
 *
 * Guarantees covered:
 * - With no in-flight uploads, the returned future is already completed with
 *   `true` at the moment it is returned.
 * - With an upload that never finishes, the call still returns promptly and
 *   the returned future is not yet done (the call site does not wait).
 */
class ChunkFileManagerAsyncTest {

    private val objectMapper = ObjectMapper()
        .registerModule(kotlinModule())
        .registerModule(JavaTimeModule())
        .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)

    @Test
    fun `awaitAllUploadsAsync returns completed CF true when no in-flight uploads`() {
        val manager = ChunkFileManager(
            runKey = "runs/test/empty",
            endpoint = "test",
            maxRecords = 100,
            maxUncompressedBytes = 1_000_000,
            objectMapper = objectMapper,
            clock = Clock.systemUTC(),
            objectStorage = mock<ObjectStorage>(),
        )

        val result: CompletableFuture<Boolean> = manager.awaitAllUploadsAsync()

        assertThat(result.isDone).isTrue()
        assertThat(result.get(1, TimeUnit.SECONDS)).isTrue()
        assertThat(manager.inFlightUploadCount()).isZero()
    }

    @Test
    fun `awaitAllUploadsAsync returns CF immediately without blocking on uploads`() {
        // Storage stub whose upload future is never completed.
        val pendingUpload = CompletableFuture<PutResult>()
        val storage = mock<ObjectStorage>()
        whenever(storage.putFileAsync(any<String>(), any<Path>())).thenReturn(pendingUpload)

        val manager = ChunkFileManager(
            runKey = "runs/test/inflight",
            endpoint = "test",
            maxRecords = 1,
            maxUncompressedBytes = 1_000_000,
            objectMapper = objectMapper,
            clock = Clock.systemUTC(),
            objectStorage = storage,
        )

        // With maxRecords=1 a single append forces a chunk rotation, which
        // registers the never-completing upload as in-flight.
        val payload = objectMapper.writeValueAsBytes(mapOf("k" to "v"))
        val record = SnapshotChunkRecord.Success(
            bodyBytes = payload,
            key = "k1",
            endpoint = "test",
            keyType = "TEST",
            httpStatus = 200,
            fetchedAt = Instant.parse("2026-06-18T00:00:00Z"),
        )
        manager.appendSuccess(record)
        assertThat(manager.inFlightUploadCount()).isEqualTo(1)

        // The call must come back well under 100ms and hand over a future
        // that is still pending — anything else means it waited on uploads.
        val startedAt = System.nanoTime()
        val future = manager.awaitAllUploadsAsync()
        val returnMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt)

        assertThat(returnMs)
            .withFailMessage("awaitAllUploadsAsync should return synchronously but took ${returnMs}ms")
            .isLessThan(100)
        assertThat(future.isDone).isFalse()
    }
}
Loading