diff --git a/docs/05_Reports/2026-06-18-blocking-audit.md b/docs/05_Reports/2026-06-18-blocking-audit.md new file mode 100644 index 000000000..77d30aa64 --- /dev/null +++ b/docs/05_Reports/2026-06-18-blocking-audit.md @@ -0,0 +1,147 @@ +# CF Chain Blocking Audit — 2026-06-18 + +- Date: 2026-06-18 +- Status: **Audit only. No code changes. PR not created.** +- Author: claude-code + explore subagents +- Branch at audit: `feature/issue-CF-CHAIN-blocking-fix` (no commits) + +## Background + +Original goal: drop sync return contract of `LogicExecutor`, `Lock`, `SingleFlight`, `TieredCache` and migrate all callers to pure `CompletableFuture` end-to-end. + +This audit (not the original plan) was created **after** the original plan proved wrong on key structural assumptions. This file documents actual code state, real blocking sites, and the path forward for any future PR. + +## Original plan files (retained for reference) + +- `docs/superpowers/specs/2026-06-18-ext-api-blocking-fix-design.md` — design (aspirational, based on wrong assumptions) +- `docs/01_ADR/ADR-blocking-async-contract-cf-chain.md` — ADR (proposed, not accepted) +- `docs/superpowers/plans/2026-06-18-ext-api-blocking-fix.md` — implementation plan (also wrong; ~30% of tasks applicable) + +**These 3 files are NOT a binding contract.** They document the brainstorm + plan exercise. Future PRs should write a fresh plan against actual code. + +## Original plan errors (so future work avoids them) + +| Plan assumed | Actual | +|---|---| +| `LogicExecutor` flat (7 sync + 7 async) | ISP composed: `LogicExecutor : BasicExecutor + SafeExecutor + ResilientExecutor` | +| `DefaultCheckedLogicExecutor` = `LogicExecutor` impl | Separate IO-boundary executor (`CheckedSupplier` input). Real `LogicExecutor` impl = `DefaultLogicExecutor` | +| `TaskContext.simple(...)` factory | Doesn't exist. Real factory: `TaskContext.of(component, operation)` or `TaskContext.of(component, operation, dynamicValue)` | +| `SingleFlight` needs `executeAsync` added | **Already exists** on `SingleFlightStrategy:50` interface + `PostgresSingleFlightStrategy:85` impl. Only `.join()` inside sync facade `:73-76` | +| PGMQ `MessageHandler.handle(): CF` | Doesn't exist. `process(): Boolean` (true=ACK, false=NACK). No `AckResult` class | +| `EquipmentFetchProvider` `.join()` removable | **Explicit ADR in class Javadoc (lines 16-44)**: `.join()` is intentional, kept for Spring `@Cacheable` sync return contract | +| `GlobalAdmissionControl` busy loop | V4 legacy only (V4 controllers). Not used by V5/ext-api. Plan T8 targets wrong component | +| InternalApiController return `CF>` | All endpoints return sync `ResponseEntity<...>` | +| 5 active module caller sites | **8 sites** — missed 3 `runBlocking` in `EquipmentRankingRedisWriter.kt:26, 31, 35` + 1 in `OcidLookupRunConsumer.kt:38` | + +## Real blocking sites (audited 2026-06-18) + +### module-infra + +| File:Line | Site | Severity | Notes | +|---|---|---|---| +| `lock/PostgresAdvisoryLockStrategy.kt:72, 117, 143` | `task.get()` inside `lockTransactionTemplate.execute` | HIGH | 3 sites. No async API on `LockStrategy` interface | +| `lock/PostgresLockStrategy.kt:97, 113, 120, 140, 145, 318, 338, 342, 351, 352, 356, 359` | `acquiredLocks.get()` / `lockOrder.get()` | LOW | All `ThreadLocal` reads. Non-blocking. False positive | +| `lock/OrderedLockExecutor.kt:143, 181` | `task.get()` after lock acquire | HIGH | 2 sites | +| `concurrency/PostgresSingleFlightStrategy.kt:75-76` | `.orTimeout().join()` inside sync `execute` facade | MEDIUM | `executeAsync` already exists; only the sync facade blocks | +| `concurrency/SingleFlightExecutor.kt` | none | n/a | Already CF-only | +| `cache/TieredCache.kt:126` | `buffer.submit(key).orTimeout(5, SECONDS).join()` | LOW | Single site, bounded by timeout, internal L2 batcher | +| `worker/ExternalApiWorker.kt:111` | `pipelineAsync(payload).join()` | HIGH | | +| `worker/ExternalApiWorker.kt:306-324` | `runBlocking(Dispatchers.Default)` | MEDIUM | CPU-bound calc | +| `worker/CalculationWorker.kt:83-91` | `.handle().join()` | HIGH | | +| `worker/OcidResolveWorker.kt:64-73` | `.join()` | HIGH | Topic subscriber not `PgmqWorker` | +| `worker/ResultReadyProjectionWorker.kt:81-90` | `.join()` × 2 | HIGH | Scheduled poller, not `PgmqWorker` | +| `worker/ResultReadyProjectionWorker.kt:123` | `runBlocking(Dispatchers.Default)` | MEDIUM | | +| `pgmq/PgmqWorker.kt:380-394` | `runBlocking(Dispatchers.Default)` + `async/awaitAll` | MEDIUM | Sequential batch coroutine path | +| `provider/EquipmentFetchProvider.kt:72` | `nexonApiClient.getItemDataByOcid(ocid).orTimeout(10, SECONDS).join()` | DOC-EXEMPT | **Class Javadoc documents `.join()` is intentional** (lines 16-44, Issue #118) | +| `security/filter/JwtAuthenticationFilter.kt:80-99` | `executor.executeWithFallback { ... }` — sync execution | LOW | Uses `LogicExecutor` for sync. Not blocking but synchronous | +| `admission/GlobalAdmissionControl.kt:238` | `while (running.get() && !Thread.interrupted)` | FALSE | V4 legacy. Not used by V5/ext-api | +| `java/.../service/starforce/StarforceLookupAdapter.java` | none on async path | n/a | Uses `LogicExecutor` for sync | +| `java/.../service/cube/component/CubeComputeBuffer.java` | none | n/a | `ConcurrentHashMap.get` not blocking | + +### module-external-api + +| File:Line | Site | Severity | Notes | +|---|---|---|---| +| `runstatus/InternalApiController.kt:83, 123` | `executor.submit { scheduler.*Async().join() }` | MEDIUM | Request thread returns 202 immediately. But internal `internalApiExecutor` worker blocks for whole phase | +| `scheduler/ExternalApiScheduler.kt:188` | `runBlocking { ocidLookupPhase.execute(executor, runKey, runId) }` | HIGH | Bridges `suspend` into CF chain. VT carrier is fine per async-patterns.md | +| `scheduler/phase/OcidLookupPhase.kt:147` | `writerJob.join()` | FALSE | `kotlinx.coroutines.Job.join()` IS a suspend function. Suspends in coroutine context, not blocks thread | +| `scheduler/phase/OcidLookupPhase.kt:148` | `putJob.await()` | FALSE | Coroutine `Deferred.await()` — proper suspend | +| `snapshot/ChunkFileManager.kt:132` | `all.get(600_000L, TimeUnit.MILLISECONDS)` | HIGH | 10-min hard blocking on writer thread | +| `snapshot/SnapshotFailedRecordWriter.kt` | none on async path | n/a | Sync I/O on single writer thread (ChunkFileManager thread-affinity L24-25) | +| `auth/AuthCharacterFetchConsumer.kt:51` | `characterListOpt.get()` | NULL-SAFETY | Violates `kotlin-null-safety.md` rule `.isPresent()+.get()`. Not blocking | +| `urgent/UrgentCharacterRequestConsumer.kt:53` | `semaphore.tryAcquire()` | OK | Non-blocking variant. Good | +| `urgent/UrgentCharacterRequestConsumer.kt:66` | `semaphore.release()` inside `whenComplete` only | MEDIUM | Permit leaks on CF cancel external. Need `tryAcquireAsync` + `finally` release or use `BackpressureLimiter` | +| `build.gradle:55` | `bootJar { enabled = true }` | TENSION | Per `workflow-rules.md` ext-api IS a boot service. Per `build-conventions.md` only `module-app` should enable. Both rules exist; depends on which you read | + +### Active module callers (5 files, 8 sites) + +| File:Line | Site | Severity | Notes | +|---|---|---|---| +| `module-synchronizer/.../consumer/ChunkConsumerTemplate.kt:99` | `request.executor.execute { logicExecutor.executeWithFinally(...) }` | LOW | Fire-and-forget dispatch | +| `module-synchronizer/.../consumer/OcidLookupRunConsumer.kt:35` | `executor.execute { runCatching { runBlocking(...) } }` | MEDIUM | | +| `module-synchronizer/.../consumer/OcidLookupRunConsumer.kt:38` | `runBlocking(Dispatchers.Default) { objectMapper.readValue(...) }` | MEDIUM | Sync bridge to coroutine | +| `module-synchronizer/.../ranking/EquipmentRankingRedisWriter.kt:26` | `runBlocking(Dispatchers.Default) { documents.filter { ... } }` | MEDIUM | | +| `module-synchronizer/.../ranking/EquipmentRankingRedisWriter.kt:31` | `runBlocking(Dispatchers.Default) { rankable.groupBy { ... } }` | MEDIUM | | +| `module-synchronizer/.../ranking/EquipmentRankingRedisWriter.kt:35` | `executor.executeOrDefault(...)` returning sync `Int` | MEDIUM | | +| `module-calculator/.../processor/CalculationCache.kt:75` | `return cache.get(key) { ... }` (Caffeine sync) | LOW | Sync return type. `cache.getAsync` doesn't exist on this Caffeine wrapper | +| `module-external-api/.../auth/AuthCharacterFetchConsumer.kt:42` | `executor.execute { runCatching { ... } }` | LOW | Fire-and-forget | + +### Out of scope (audit-confirmed) + +- `module-core`: 0 blocking primitive sites +- `module-common`: 0 blocking primitive sites +- `module-app` legacy (20+ Java files): 0 sites audited in this scope, but uses sync `LogicExecutor` extensively — follow-up PR territory + +## Recommended path forward + +### Sub-PR 1: Lock `*Async` API (HIGH impact, clean) + +Add `executeWithLockAsync(key, supplier): CF`, `executeWithLeaderElectionAsync(key, ...): CF`, `executeWithOrderedLocksAsync(...): CF` to `LockStrategy` interface. Implement in `PostgresAdvisoryLockStrategy` + `OrderedLockExecutor`. `@Deprecated` sync methods. module-app legacy continues with warnings. + +Eliminates: 5 HIGH sites. + +### Sub-PR 2: PGMQ Worker CF path (HIGH impact, larger) + +Refactor `process(msg: PgmqMessage): Boolean` to return a richer `ProcessOutcome` sealed class (Ack / Nack(retryable) / Nack(deadletter) / Nack(visibilityReset)). The `Boolean` return is the actual blocking constraint (workers call `.join()` internally to coerce CF chain to Boolean). + +Alternatively: introduce `MessageHandler` interface with `CF` return. Add to `PgmqWorker` framework. Migrate workers one by one. + +Eliminates: 8 HIGH + MEDIUM sites. + +### Sub-PR 3: ext-api Controller + Scheduler (MEDIUM impact) + +Convert `InternalApiController` endpoints to fire-and-forget pattern (already partially there). Remove `runBlocking` in `ExternalApiScheduler.runOcidPhase` by converting to pure CF via `coroutineScope { async { ... }.await() }`. + +Eliminates: 2 MEDIUM + 1 HIGH sites. + +### Sub-PR 4: ChunkFileManager close path (HIGH impact, contained) + +Convert `awaitAllUploads` to `closeAsync(): CF`. Use `thenRun` for manifest write. `ChunkedSnapshotSink.close()` chains via `thenCompose`. + +Eliminates: 1 HIGH site. + +### Sub-PR 5: BackpressureLimiter migration + runBlocking cleanup (MEDIUM impact) + +- `UrgentCharacterRequestConsumer`: `Semaphore.tryAcquire` + `release` in `whenComplete` → `BackpressureLimiter.tryAcquireAsync` + `releaseAsync` in `whenComplete` (with `.whenComplete` cancel-safe handling). +- `OcidLookupRunConsumer.kt:38` + `EquipmentRankingRedisWriter.kt:26, 31`: convert `runBlocking` to inline async or move to dedicated async endpoint. + +Eliminates: 1 MEDIUM + 4 MEDIUM sites. + +### Out of scope (DOC-EXEMPT) + +- `EquipmentFetchProvider.kt:72` `.join()`: KEEP. Class Javadoc documents intentional. +- `OcidLookupPhase.kt:147-148` `Job.join()` + `await()`: KEEP. Both are suspend functions, not blocking primitives. Add code comment to make this explicit (was task T11 in original plan). + +## Audit artifacts + +- 6 explore subagents dispatched (1 stopped early, 5 completed) +- 4 production files read directly (DefaultLogicExecutor, TaskContext, CheckedLogicExecutor, JwtAuthenticationFilter, GlobalAdmissionControl, EquipmentFetchProvider, StarforceLookupAdapter, CubeComputeBuffer) +- 0 production code changes +- 0 commits made +- Feature branch `feature/issue-CF-CHAIN-blocking-fix` exists with 0 commits (clean state) + +## Recommendation + +Defer all 5 sub-PRs to individual issues. Each = small, reviewable, with green build per commit (using `@Deprecated` shim where applicable, mirroring grill-me Q1=A, Q6=B, Q9=A from original brainstorm). + +Do NOT use the original 19-task plan as a basis — it assumes wrong code structure. diff --git a/module-app/src/test/java/maple/expectation/monitoring/MonitoringAlertServiceUnitTest.java b/module-app/src/test/java/maple/expectation/monitoring/MonitoringAlertServiceUnitTest.java index c8c5d760f..d54be8abb 100644 --- a/module-app/src/test/java/maple/expectation/monitoring/MonitoringAlertServiceUnitTest.java +++ b/module-app/src/test/java/maple/expectation/monitoring/MonitoringAlertServiceUnitTest.java @@ -1,12 +1,14 @@ package maple.expectation.monitoring; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.concurrent.CompletableFuture; import maple.expectation.core.port.out.BufferStatusQuery; import maple.expectation.infrastructure.alert.StatelessAlertService; import maple.expectation.infrastructure.config.MonitoringThresholdProperties; @@ -104,7 +106,8 @@ void setUp() { @DisplayName("리더 권한을 획득하고 전역 임계치를 초과하면 알림을 발송한다") void leaderSuccess_OverThreshold_SendAlert() { // given - given(lockStrategy.tryLockImmediately(eq("global-monitoring-lock"), eq(4L))).willReturn(true); + given(lockStrategy.tryLockImmediatelyAsync(eq("global-monitoring-lock"), eq(4L))) + .willReturn(CompletableFuture.completedFuture(true)); given(bufferStatus.getTotalPendingCount()).willReturn(6000L); given(thresholdProperties.getBufferSaturationCount()).willReturn(5000L); @@ -119,7 +122,8 @@ void leaderSuccess_OverThreshold_SendAlert() { @DisplayName("전역 임계치 이하일 때는 리더 권한이 있어도 알림을 보내지 않는다") void leaderSuccess_UnderThreshold_NoAlert() { // given - given(lockStrategy.tryLockImmediately(eq("global-monitoring-lock"), eq(4L))).willReturn(true); + given(lockStrategy.tryLockImmediatelyAsync(eq("global-monitoring-lock"), eq(4L))) + .willReturn(CompletableFuture.completedFuture(true)); given(bufferStatus.getTotalPendingCount()).willReturn(3000L); given(thresholdProperties.getBufferSaturationCount()).willReturn(5000L); @@ -134,7 +138,8 @@ void leaderSuccess_UnderThreshold_NoAlert() { @DisplayName("리더 선출 실패 시 모니터링을 스킵한다") void follower_SkipMonitoring() { // given - given(lockStrategy.tryLockImmediately(eq("global-monitoring-lock"), eq(4L))).willReturn(false); + given(lockStrategy.tryLockImmediatelyAsync(eq("global-monitoring-lock"), eq(4L))) + .willReturn(CompletableFuture.completedFuture(false)); // when monitoringAlertService.checkBufferSaturation(); @@ -149,7 +154,8 @@ void follower_SkipMonitoring() { @DisplayName("정확히 임계값(5000)을 초과하면 알림을 발송한다") void exactlyAtThreshold_SendAlert() { // given - given(lockStrategy.tryLockImmediately(eq("global-monitoring-lock"), eq(4L))).willReturn(true); + given(lockStrategy.tryLockImmediatelyAsync(eq("global-monitoring-lock"), eq(4L))) + .willReturn(CompletableFuture.completedFuture(true)); given(bufferStatus.getTotalPendingCount()).willReturn(5001L); given(thresholdProperties.getBufferSaturationCount()).willReturn(5000L); @@ -164,7 +170,8 @@ void exactlyAtThreshold_SendAlert() { @DisplayName("버퍼가 비어있을 때는 알림을 보내지 않는다") void bufferZero_NoAlert() { // given - given(lockStrategy.tryLockImmediately(eq("global-monitoring-lock"), eq(4L))).willReturn(true); + given(lockStrategy.tryLockImmediatelyAsync(eq("global-monitoring-lock"), eq(4L))) + .willReturn(CompletableFuture.completedFuture(true)); given(bufferStatus.getTotalPendingCount()).willReturn(0L); given(thresholdProperties.getBufferSaturationCount()).willReturn(5000L); @@ -179,7 +186,8 @@ void bufferZero_NoAlert() { @DisplayName("복구 가능한 분산 락 획득 실패 시 재시도하지 않고 스킵한다") void lockAcquisitionFailure_SkipMonitoring() { // given - given(lockStrategy.tryLockImmediately(eq("global-monitoring-lock"), eq(4L))).willReturn(false); + given(lockStrategy.tryLockImmediatelyAsync(eq("global-monitoring-lock"), eq(4L))) + .willReturn(CompletableFuture.completedFuture(false)); // when monitoringAlertService.checkBufferSaturation(); diff --git a/module-app/src/test/java/maple/expectation/scheduler/PopularCharacterWarmupSchedulerTest.java b/module-app/src/test/java/maple/expectation/scheduler/PopularCharacterWarmupSchedulerTest.java index da82616ab..3863bb36d 100644 --- a/module-app/src/test/java/maple/expectation/scheduler/PopularCharacterWarmupSchedulerTest.java +++ b/module-app/src/test/java/maple/expectation/scheduler/PopularCharacterWarmupSchedulerTest.java @@ -7,7 +7,8 @@ import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import java.util.List; -import maple.expectation.common.function.ThrowingSupplier; +import java.util.concurrent.CompletableFuture; +import kotlin.jvm.functions.Function0; import maple.expectation.core.port.out.CacheWarmupPort; import maple.expectation.core.port.out.PopularCharacterTrackerPort; import maple.expectation.error.exception.DistributedLockException; @@ -78,18 +79,21 @@ void setUp() { @DisplayName("dailyWarmup") class DailyWarmupTest { + @SuppressWarnings("unchecked") + private void stubLockSuccess() { + given(lockStrategy.executeWithLockAsync(anyString(), anyLong(), anyLong(), any(Function0.class))) + .willAnswer( + invocation -> { + Function0> supplier = invocation.getArgument(3); + return supplier.invoke(); + }); + } + @Test @DisplayName("분산 락 획득 후 웜업 실행") void shouldExecuteWarmupWithLock() throws Throwable { // given - given( - lockStrategy.executeWithLock( - anyString(), anyLong(), anyLong(), any(ThrowingSupplier.class))) - .willAnswer( - invocation -> { - ThrowingSupplier supplier = invocation.getArgument(3); - return supplier.get(); - }); + stubLockSuccess(); given(popularCharacterTracker.getYesterdayTopCharacters(50)) .willReturn(List.of("User1", "User2")); @@ -98,18 +102,18 @@ void shouldExecuteWarmupWithLock() throws Throwable { // then verify(lockStrategy) - .executeWithLock( - eq("popular-warmup-lock"), eq(0L), eq(300L), any(ThrowingSupplier.class)); + .executeWithLockAsync( + eq("popular-warmup-lock"), eq(0L), eq(300L), any(Function0.class)); } @Test @DisplayName("락 획득 실패 시 스킵") void whenLockFailed_shouldSkip() throws Throwable { - // given - given( - lockStrategy.executeWithLock( - anyString(), anyLong(), anyLong(), any(ThrowingSupplier.class))) - .willThrow(new DistributedLockException("Lock failed")); + // given - 완료되었지만 DistributedLockException으로 실패하는 future 반환 + given(lockStrategy.executeWithLockAsync(anyString(), anyLong(), anyLong(), any(Function0.class))) + .willAnswer( + invocation -> + CompletableFuture.failedFuture(new DistributedLockException("Lock failed"))); // when scheduler.dailyWarmup(); @@ -123,18 +127,21 @@ void whenLockFailed_shouldSkip() throws Throwable { @DisplayName("initialWarmup") class InitialWarmupTest { + @SuppressWarnings("unchecked") + private void stubLockSuccess() { + given(lockStrategy.executeWithLockAsync(anyString(), anyLong(), anyLong(), any(Function0.class))) + .willAnswer( + invocation -> { + Function0> supplier = invocation.getArgument(3); + return supplier.invoke(); + }); + } + @Test @DisplayName("서버 시작 후 웜업 실행") void shouldExecuteInitialWarmup() throws Throwable { // given - given( - lockStrategy.executeWithLock( - anyString(), anyLong(), anyLong(), any(ThrowingSupplier.class))) - .willAnswer( - invocation -> { - ThrowingSupplier supplier = invocation.getArgument(3); - return supplier.get(); - }); + stubLockSuccess(); given(popularCharacterTracker.getYesterdayTopCharacters(50)).willReturn(List.of("InitUser1")); // when @@ -142,8 +149,8 @@ void shouldExecuteInitialWarmup() throws Throwable { // then verify(lockStrategy) - .executeWithLock( - eq("popular-warmup-lock"), anyLong(), anyLong(), any(ThrowingSupplier.class)); + .executeWithLockAsync( + eq("popular-warmup-lock"), anyLong(), anyLong(), any(Function0.class)); } } @@ -151,18 +158,21 @@ void shouldExecuteInitialWarmup() throws Throwable { @DisplayName("웜업 로직") class WarmupLogicTest { + @SuppressWarnings("unchecked") + private void stubLockSuccess() { + given(lockStrategy.executeWithLockAsync(anyString(), anyLong(), anyLong(), any(Function0.class))) + .willAnswer( + invocation -> { + Function0> supplier = invocation.getArgument(3); + return supplier.invoke(); + }); + } + @Test @DisplayName("인기 캐릭터 목록 조회") void shouldGetTopCharacters() throws Throwable { // given - given( - lockStrategy.executeWithLock( - anyString(), anyLong(), anyLong(), any(ThrowingSupplier.class))) - .willAnswer( - invocation -> { - ThrowingSupplier supplier = invocation.getArgument(3); - return supplier.get(); - }); + stubLockSuccess(); given(popularCharacterTracker.getYesterdayTopCharacters(50)) .willReturn(List.of("Top1", "Top2", "Top3")); @@ -178,14 +188,7 @@ void shouldGetTopCharacters() throws Throwable { void shouldWarmupEachCharacter() throws Throwable { // given List topCharacters = List.of("Char1", "Char2", "Char3"); - given( - lockStrategy.executeWithLock( - anyString(), anyLong(), anyLong(), any(ThrowingSupplier.class))) - .willAnswer( - invocation -> { - ThrowingSupplier supplier = invocation.getArgument(3); - return supplier.get(); - }); + stubLockSuccess(); given(popularCharacterTracker.getYesterdayTopCharacters(50)).willReturn(topCharacters); // when @@ -199,14 +202,7 @@ void shouldWarmupEachCharacter() throws Throwable { @DisplayName("인기 캐릭터가 없으면 웜업 스킵") void whenNoCharacters_shouldSkipWarmup() throws Throwable { // given - given( - lockStrategy.executeWithLock( - anyString(), anyLong(), anyLong(), any(ThrowingSupplier.class))) - .willAnswer( - invocation -> { - ThrowingSupplier supplier = invocation.getArgument(3); - return supplier.get(); - }); + stubLockSuccess(); given(popularCharacterTracker.getYesterdayTopCharacters(50)).willReturn(List.of()); // when @@ -221,14 +217,7 @@ void whenNoCharacters_shouldSkipWarmup() throws Throwable { void whenCharacterFails_shouldContinueWithNext() throws Throwable { // given List topCharacters = List.of("Fail1", "Success2", "Fail3"); - given( - lockStrategy.executeWithLock( - anyString(), anyLong(), anyLong(), any(ThrowingSupplier.class))) - .willAnswer( - invocation -> { - ThrowingSupplier supplier = invocation.getArgument(3); - return supplier.get(); - }); + stubLockSuccess(); given(popularCharacterTracker.getYesterdayTopCharacters(50)).willReturn(topCharacters); // 첫 번째와 세 번째 호출은 예외 diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/aop/aspect/LockAspect.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/aop/aspect/LockAspect.kt index 72043c0b5..d65f74980 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/aop/aspect/LockAspect.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/aop/aspect/LockAspect.kt @@ -1,6 +1,5 @@ package maple.expectation.infrastructure.aop.aspect -import maple.expectation.common.function.ThrowingSupplier import maple.expectation.error.exception.DistributedLockException import maple.expectation.error.exception.InternalSystemException import maple.expectation.infrastructure.aop.annotation.Locked @@ -14,6 +13,7 @@ import org.aspectj.lang.annotation.Aspect import org.slf4j.LoggerFactory import org.springframework.core.annotation.Order import org.springframework.stereotype.Component +import java.util.concurrent.CompletableFuture @Aspect @Order(0) @@ -46,20 +46,24 @@ class LockAspect( key: String, waitSeconds: Long, leaseSeconds: Long, - ): Any = lockStrategy.executeWithLock( - key, - waitSeconds, - leaseSeconds, - createLockedTask(joinPoint, key), - ) - - private fun createLockedTask(joinPoint: ProceedingJoinPoint, key: String): ThrowingSupplier = ThrowingSupplier { - log.debug("🔑 [Locked Aspect] 락 획득 성공: {}", key) - joinPoint.proceed() + ): Any { + val cf = lockStrategy.executeWithLockAsync( + key, + waitSeconds, + leaseSeconds, + ) { + log.debug("🔑 [Locked Aspect] 락 획득 성공: {}", key) + CompletableFuture.completedFuture(joinPoint.proceed()) + } + // AOP boundary: caller of @Locked chose sync semantics; .get() blocks only this aspect's caller. + // This is the documented exception to the no-join/get rule at the AOP wrapper boundary. + return cf.get() } private fun handleLockFailure(joinPoint: ProceedingJoinPoint, key: String, e: Throwable): Any? { - if (e is DistributedLockException) { + // cf.get() wraps the original cause in ExecutionException; unwrap for the type check. + val cause = e.cause ?: e + if (e is DistributedLockException || cause is DistributedLockException) { log.warn("⏭️ [Locked Timeout] {} - 락 획득 실패. 직접 조회를 시도합니다.", key) return proceedWithoutLock(joinPoint, key) } diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/batch/MonitoringReportJob.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/batch/MonitoringReportJob.kt index 003cbc50a..ba17967c5 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/batch/MonitoringReportJob.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/batch/MonitoringReportJob.kt @@ -2,6 +2,7 @@ package maple.expectation.infrastructure.batch import java.time.LocalDateTime import java.time.format.DateTimeFormatter +import java.util.concurrent.CompletableFuture import maple.expectation.core.domain.model.AlertMessage import maple.expectation.core.port.out.AlertPort import maple.expectation.core.port.out.SystemMetricsPort @@ -59,9 +60,13 @@ class MonitoringReportJob( return } - // Leader Election: xact-scoped lock으로 단일 인스턴스만 실행 (#628) - lockStrategy.executeWithLock(REPORT_LOCK_KEY, 10, LOCK_LEASE_SECONDS.toLong()) { - generateAndSendReport(reportType) + // Leader Election: session-scoped async lock으로 단일 인스턴스만 실행 (#628) + lockStrategy.executeWithLockAsync(REPORT_LOCK_KEY, 10, LOCK_LEASE_SECONDS.toLong()) { + CompletableFuture.completedFuture(generateAndSendReport(reportType)) + }.whenComplete { _, ex -> + if (ex != null) { + log.error("[MonitoringReport] {} 리포트 실행 실패: {}", reportType, ex.cause ?: ex.message) + } } } diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/batch/scheduler/BatchJobRecoveryScheduler.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/batch/scheduler/BatchJobRecoveryScheduler.kt index 4448ba843..b3a457cd7 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/batch/scheduler/BatchJobRecoveryScheduler.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/batch/scheduler/BatchJobRecoveryScheduler.kt @@ -2,6 +2,7 @@ package maple.expectation.infrastructure.batch.scheduler import java.time.Instant import java.time.temporal.ChronoUnit +import java.util.concurrent.CompletableFuture import kotlin.math.min import kotlin.math.pow import maple.expectation.error.exception.DistributedLockException @@ -103,13 +104,16 @@ class BatchJobRecoveryScheduler( executor.executeOrCatch( { - // Acquire distributed lock - lockStrategy.executeWithLock( + // Acquire distributed lock (async chain, no caller-side .get()) + lockStrategy.executeWithLockAsync( lockName, 0, 10, // 10 second lock timeout - this::performRecovery, - ) + ) { + CompletableFuture.completedFuture(performRecovery()) + }.whenComplete { _, ex -> + if (ex != null) handleLockFailure(ex.cause ?: ex) + } null }, { e -> diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/bulk/BulkLoaderService.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/bulk/BulkLoaderService.kt index edf15476d..f40e530b5 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/bulk/BulkLoaderService.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/bulk/BulkLoaderService.kt @@ -113,7 +113,7 @@ class BulkLoaderService( val actualPath = csvPath ?: properties.csvPath val path = resolvePath(actualPath) - return lockStrategy.executeWithLock( + return lockStrategy.executeWithLockAsync( LOCK_KEY, LOCK_WAIT_TIME_SECONDS, LOCK_LEASE_TIME_SECONDS, @@ -132,12 +132,12 @@ class BulkLoaderService( log.info("[BulkLoaderService] Read {} characters from CSV", total) if (total == 0) { - return@executeWithLock CompletableFuture.completedFuture( + CompletableFuture.completedFuture( LoadResult(0, 0, 0, 0, 0), ) + } else { + processBatch(ignList, emptySet(), 0, total, start, force) } - - processBatch(ignList, emptySet(), 0, total, start, force) } } @@ -150,7 +150,7 @@ class BulkLoaderService( ) private fun resumeInternal(): CompletableFuture { - return lockStrategy.executeWithLock( + return lockStrategy.executeWithLockAsync( LOCK_KEY, LOCK_WAIT_TIME_SECONDS, LOCK_LEASE_TIME_SECONDS, @@ -160,41 +160,41 @@ class BulkLoaderService( val checkpoint = checkpointManager.load() if (checkpoint == null) { log.warn("[BulkLoaderService] No checkpoint found, starting fresh") - return@executeWithLock loadAllInternal(null, false) - } - - val path = resolvePath(properties.csvPath) - val start = Instant.now() - startTime.set(start) - stopRequested.set(false) - isRunning.set(true) - - val ignList = readCsvFile(path) - val remainingList = ignList.drop(checkpoint.lastProcessedIndex + 1) - val remainingCount = remainingList.size - val skippedCount = checkpoint.completedIgnSet.size - - log.info( - "[BulkLoaderService] Resuming from index {}: {} remaining, {} skipped", - checkpoint.lastProcessedIndex, - remainingCount, - skippedCount, - ) - - if (remainingCount == 0) { - return@executeWithLock CompletableFuture.completedFuture( - LoadResult(checkpoint.totalCharacters, skippedCount, 0, skippedCount, 0), + loadAllInternal(null, false) + } else { + val path = resolvePath(properties.csvPath) + val start = Instant.now() + startTime.set(start) + stopRequested.set(false) + isRunning.set(true) + + val ignList = readCsvFile(path) + val remainingList = ignList.drop(checkpoint.lastProcessedIndex + 1) + val remainingCount = remainingList.size + val skippedCount = checkpoint.completedIgnSet.size + + log.info( + "[BulkLoaderService] Resuming from index {}: {} remaining, {} skipped", + checkpoint.lastProcessedIndex, + remainingCount, + skippedCount, ) - } - processBatch( - remainingList, - checkpoint.completedIgnSet, - checkpoint.lastProcessedIndex + 1, - checkpoint.totalCharacters, - start, - false, - ) + if (remainingCount == 0) { + CompletableFuture.completedFuture( + LoadResult(checkpoint.totalCharacters, skippedCount, 0, skippedCount, 0), + ) + } else { + processBatch( + remainingList, + checkpoint.completedIgnSet, + checkpoint.lastProcessedIndex + 1, + checkpoint.totalCharacters, + start, + false, + ) + } + } } } @@ -207,7 +207,7 @@ class BulkLoaderService( ) private fun retryFailedInternal(): CompletableFuture { - return lockStrategy.executeWithLock( + return lockStrategy.executeWithLockAsync( "$LOCK_KEY:retry", LOCK_WAIT_TIME_SECONDS, LOCK_LEASE_TIME_SECONDS, @@ -219,26 +219,26 @@ class BulkLoaderService( if (failedIgnSet.isEmpty()) { log.info("[BulkLoaderService] No failed characters to retry") - return@executeWithLock CompletableFuture.completedFuture( + CompletableFuture.completedFuture( LoadResult(0, 0, 0, 0, 0), ) - } - - log.info("[BulkLoaderService] Retrying {} failed characters", failedIgnSet.size) + } else { + log.info("[BulkLoaderService] Retrying {} failed characters", failedIgnSet.size) - // Clear failed tracker for fresh start - failedTracker.clear() + // Clear failed tracker for fresh start + failedTracker.clear() - val start = Instant.now() - startTime.set(start) - stopRequested.set(false) - isRunning.set(true) + val start = Instant.now() + startTime.set(start) + stopRequested.set(false) + isRunning.set(true) - val total = failedIgnSet.size - totalCharacters.set(total.toLong()) + val total = failedIgnSet.size + totalCharacters.set(total.toLong()) - val ignList = failedIgnSet.toList() - processBatch(ignList, emptySet(), 0, total, start, true) + val ignList = failedIgnSet.toList() + processBatch(ignList, emptySet(), 0, total, start, true) + } } } diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/AbstractLockStrategy.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/AbstractLockStrategy.kt index 33bf6c905..ccbda9c74 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/AbstractLockStrategy.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/AbstractLockStrategy.kt @@ -1,5 +1,8 @@ package maple.expectation.infrastructure.lock +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit +import java.util.stream.Collectors import maple.expectation.common.function.ThrowingSupplier import maple.expectation.error.exception.DistributedLockException import maple.expectation.infrastructure.executor.LogicExecutor @@ -9,6 +12,86 @@ import maple.expectation.infrastructure.executor.strategy.ExceptionTranslator /** 락 전략 추상 클래스 (TaskContext 및 평탄화 적용) */ abstract class AbstractLockStrategy(protected val executor: LogicExecutor) : LockStrategy { + // === ASYNC API (template-style) === + // Concrete impls (PostgresLockStrategy, GuavaLockStrategy) provide the + // tryAcquireSessionLockAsync / releaseSessionLockAsync hooks. The CF chain + // guarantees lock release on both success and failure paths via whenComplete. + + override fun executeWithLockAsync( + key: String, + waitTime: Long, + leaseTime: Long, + supplier: () -> CompletableFuture, + ): CompletableFuture { + val lockKey = buildLockKey(key) + val context = TaskContext.of("Lock", "ExecuteAsync", key) + + return tryAcquireSessionLockAsync(lockKey, waitTime, leaseTime, context) + .thenCompose { lockId -> executeSuppliedTask(lockKey, lockId, supplier, context) } + } + + private fun executeSuppliedTask( + lockKey: String, + lockId: Long?, + supplier: () -> CompletableFuture, + context: TaskContext, + ): CompletableFuture = + if (lockId == null) { + onLockFailed(lockKey) + CompletableFuture.failedFuture(createLockFailureException(lockKey)) + } else { + onLockAcquired(lockKey) + supplier().whenComplete { _, _ -> + releaseSessionLockAsync(lockKey, lockId, context) + } + } + + override fun executeWithLockAsync( + key: String, + supplier: () -> CompletableFuture, + ): CompletableFuture = executeWithLockAsync(key, DEFAULT_WAIT_TIME, DEFAULT_LEASE_TIME, supplier) + + override fun tryLockImmediatelyAsync(key: String, leaseTime: Long): CompletableFuture { + val lockKey = buildLockKey(key) + val context = TaskContext.of("Lock", "TryLockAsync", key) + return tryAcquireSessionLockAsync(lockKey, 0, leaseTime, context).thenApply { it != null } + } + + override fun unlockAsync(key: String): CompletableFuture { + val lockKey = buildLockKey(key) + val context = TaskContext.of("Lock", "UnlockAsync", key) + return releaseSessionLockAsync(lockKey, null, context) + } + + override fun executeWithOrderedLocksAsync( + keys: List, + totalTimeout: Long, + timeUnit: TimeUnit, + leaseTime: Long, + supplier: () -> CompletableFuture, + ): CompletableFuture { + val compositeKey = keys.stream().sorted().collect(Collectors.joining(":")) + val timeoutSeconds = timeUnit.toSeconds(totalTimeout) + return executeWithLockAsync(compositeKey, timeoutSeconds, leaseTime, supplier) + } + + // === Abstract async hooks (concrete impls provide these) === + + protected abstract fun tryAcquireSessionLockAsync( + lockKey: String, + waitTime: Long, + leaseTime: Long, + ctx: TaskContext, + ): CompletableFuture + + protected abstract fun releaseSessionLockAsync( + lockKey: String, + lockId: Long?, + ctx: TaskContext, + ): CompletableFuture + + // === SYNC API (legacy, kept for module-app compat) === + override fun executeWithLock(key: String, waitTime: Long, leaseTime: Long, task: ThrowingSupplier): T { val lockKey = buildLockKey(key) // ✅ TaskContext 정의: Component="Lock", Operation="Execute" @@ -95,5 +178,7 @@ abstract class AbstractLockStrategy(protected val executor: LogicExecutor) : Loc companion object { private val log = org.slf4j.LoggerFactory.getLogger(AbstractLockStrategy::class.java) + private const val DEFAULT_WAIT_TIME = 10L + private const val DEFAULT_LEASE_TIME = 20L } } diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/GuavaLockStrategy.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/GuavaLockStrategy.kt index d1a28e84b..e4d1f1e23 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/GuavaLockStrategy.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/GuavaLockStrategy.kt @@ -1,6 +1,9 @@ package maple.expectation.infrastructure.lock import com.google.common.util.concurrent.Striped +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import java.util.concurrent.locks.Lock import maple.expectation.infrastructure.executor.LogicExecutor @@ -17,6 +20,22 @@ class GuavaLockStrategy(executor: LogicExecutor) : AbstractLockStrategy(executor private val locks: Striped = Striped.lock(128) + /** + * Session-scoped lock registry: maps logical key -> held Lock so that + * [releaseSessionLockAsync] knows which Stripe slot to release for + * [tryLockImmediatelyAsync]. Test-only — in-memory map is acceptable + * because the test profile runs single-process. + */ + private val sessionLockHolders = ConcurrentHashMap() + + /** + * Test-only executor: not a Spring bean, so cannot inject defaultAsyncExecutor. + * Cached pool is fine for short-lived test workloads. + */ + private val lockExecutor = Executors.newCachedThreadPool { r -> + Thread(r, "guava-lock-${r.hashCode()}").apply { isDaemon = true } + } + @Throws(Throwable::class) override fun tryLock(lockKey: String, waitTime: Long, leaseTime: Long): Boolean { // 부모 클래스(AbstractLockStrategy)의 템플릿 메서드에서 호출됨 @@ -43,7 +62,61 @@ class GuavaLockStrategy(executor: LogicExecutor) : AbstractLockStrategy(executor ) } + /** + * [SESSION-SCOPED] Async: Poll for Guava Striped lock acquisition on the + * test-only executor. Returns the lockKey's hashCode as a stable lockId + * on success, or null on timeout. + */ + override fun tryAcquireSessionLockAsync( + lockKey: String, + waitTime: Long, + leaseTime: Long, + ctx: TaskContext, + ): CompletableFuture = CompletableFuture.supplyAsync({ + val deadline = System.currentTimeMillis() + waitTime * 1000L + while (System.currentTimeMillis() < deadline) { + if (Thread.currentThread().isInterrupted) { + throw maple.expectation.error.exception.DistributedLockException( + "Interrupted while waiting for lock: $lockKey", + ) + } + val lock = locks.get(lockKey) + if (lock.tryLock()) { + sessionLockHolders[lockKey] = lock + return@supplyAsync lockKey.hashCode().toLong() + } + // VT-friendly: Thread.sleep on virtual thread does not pin carrier. + Thread.sleep(POLL_INTERVAL_MS) + } + null + }, lockExecutor) + + /** + * [SESSION-SCOPED] Async: Release previously acquired session-scoped lock. + * Called from whenComplete — must never throw. + * + * Justified: invoked from `whenComplete` callbacks inside CompletableFuture + * chains. LogicExecutor cannot be used inside `whenComplete` (it expects a + * synchronous caller context), so a thrown exception from unlock() would + * complete the outer CF exceptionally AFTER the supplier has already returned. + * Logging + swallow is the only safe action. + */ + override fun releaseSessionLockAsync( + lockKey: String, + lockId: Long?, + ctx: TaskContext, + ): CompletableFuture = CompletableFuture.runAsync({ + val lock = sessionLockHolders.remove(lockKey) ?: return@runAsync + try { + lock.unlock() + log.debug("🔓 [Guava Lock] Unlocked key: {}", lockKey) + } catch (e: Exception) { + log.warn("[Guava Lock] Failed to release session lock: key={}", lockKey, e) + } + }, lockExecutor) + companion object { private val log = org.slf4j.LoggerFactory.getLogger(GuavaLockStrategy::class.java) + private const val POLL_INTERVAL_MS = 100L } } diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/LeaderElectionStrategy.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/LeaderElectionStrategy.kt index b2709e49d..5581ac6e7 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/LeaderElectionStrategy.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/LeaderElectionStrategy.kt @@ -1,6 +1,7 @@ package maple.expectation.infrastructure.lock import maple.expectation.common.function.ThrowingSupplier +import java.util.concurrent.CompletableFuture /** * Leader Election Strategy Interface @@ -11,7 +12,26 @@ import maple.expectation.common.function.ThrowingSupplier interface LeaderElectionStrategy { /** - * Execute task with leader election + * Async: Execute task with leader election. + * + *

The first caller to acquire the lock becomes the leader and executes the leader task. + * Subsequent callers become followers and wait for the leader to complete. + * + * @param key Unique identifier for the election (e.g., character name) + * @param waitTimeSeconds Maximum time followers will wait for the leader + * @param leaderSupplier Async task executed by the leader (e.g., create new character) + * @param followerSupplier Async task executed by followers (e.g., wait and read from DB) + * @return CompletableFuture that completes with the result of the leader or follower task + */ + fun executeWithLeaderElectionAsync( + key: String, + waitTimeSeconds: Int, + leaderSupplier: () -> CompletableFuture, + followerSupplier: () -> CompletableFuture, + ): CompletableFuture + + /** + * Legacy sync: Execute task with leader election. * *

The first caller to acquire the lock becomes the leader and executes the leader task. * Subsequent callers become followers and wait for the leader to complete. @@ -20,12 +40,17 @@ interface LeaderElectionStrategy { * @param waitTimeSeconds Maximum time followers will wait for the leader * @param leaderTask Task executed by the leader (e.g., create new character) * @param followerTask Task executed by followers (e.g., wait and read from DB) - * @return Result of the executed task + * + * @deprecated Use executeWithLeaderElectionAsync. Soft-deprecated for module-app legacy. + * module-app migration = follow-up PR. + * + * @see executeWithLeaderElectionAsync */ + @Deprecated("Use executeWithLeaderElectionAsync", ReplaceWith("executeWithLeaderElectionAsync(key, waitTimeSeconds, { CompletableFuture.completedFuture(leaderTask.get()) }, { CompletableFuture.completedFuture(followerTask.get()) })")) fun executeWithLeaderElection( key: String, waitTimeSeconds: Int, leaderTask: ThrowingSupplier, followerTask: ThrowingSupplier, ): T -} +} \ No newline at end of file diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/LockStrategy.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/LockStrategy.kt index 9c3e54f62..f52face39 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/LockStrategy.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/LockStrategy.kt @@ -1,68 +1,64 @@ package maple.expectation.infrastructure.lock -import java.util.List +import maple.expectation.common.function.ThrowingSupplier +import java.util.concurrent.CompletableFuture import java.util.concurrent.TimeUnit import java.util.stream.Collectors -import maple.expectation.common.function.ThrowingSupplier -/** - * 분산 락 전략 인터페이스 - * - *

P0-N02 Fix: Lock Ordering 지원 (Issue #221)

- * - *

다중 락 시나리오에서 Coffman Condition #4 (Circular Wait)를 방지하기 위해 {@link #executeWithOrderedLocks} - * 메서드를 추가했습니다. - * - *

CLAUDE.md 준수사항

- * - *
    - *
  • Section 4: OCP 원칙 - default 메서드로 기존 구현체 호환성 유지 - *
  • P1-BLUE-02: 복합키 방식 기본 구현 (구현체에서 Override 권장) - *
- */ interface LockStrategy { - // 1. 기존: 락을 획득하고 작업을 실행 (WaitTime 대기 포함) + // --- New async API (preferred) --- + fun executeWithLockAsync( + key: String, + waitTime: Long, + leaseTime: Long, + supplier: () -> CompletableFuture, + ): CompletableFuture + + fun executeWithLockAsync( + key: String, + supplier: () -> CompletableFuture, + ): CompletableFuture + + fun tryLockImmediatelyAsync(key: String, leaseTime: Long): CompletableFuture + + fun unlockAsync(key: String): CompletableFuture + + fun executeWithOrderedLocksAsync( + keys: List, + totalTimeout: Long, + timeUnit: TimeUnit, + leaseTime: Long, + supplier: () -> CompletableFuture, + ): CompletableFuture + + // --- Deprecated sync API (kept for module-app legacy; soft-deprecation) --- + @Deprecated("Use executeWithLockAsync", ReplaceWith("executeWithLockAsync(key, waitTime, leaseTime, { CompletableFuture.completedFuture(task.get()) })")) fun executeWithLock(key: String, waitTime: Long, leaseTime: Long, task: ThrowingSupplier): T - // 2. 기존: 기본 설정값으로 락 실행 + @Deprecated("Use executeWithLockAsync", ReplaceWith("executeWithLockAsync(key, { CompletableFuture.completedFuture(task.get()) })")) fun executeWithLock(key: String, task: ThrowingSupplier): T - // 3. 추가: 즉시 락 획득 시도 (기다리지 않고 성공 여부만 반환) + @Deprecated("Use tryLockImmediatelyAsync") fun tryLockImmediately(key: String, leaseTime: Long): Boolean - // 4. 추가: 락 수동 해제 + @Deprecated("Use unlockAsync") fun unlock(key: String) /** - * [P0-N02] 다중 락 순서 보장 실행 + * [P0-N02] 다중 락 순서 보장 실행 (Legacy sync) * - *

Coffman Condition #4 (Circular Wait) 방지: 락 키들을 알파벳순으로 정렬하여 모든 스레드가 동일한 순서로 락을 획득하도록 - * 강제합니다. + * 기본 구현: 알파벳순 정렬 후 단일 복합키로 결합하여 락을 획득합니다. + * 진정한 다중 락 지원이 필요하면 구현체에서 Override하세요. * - *

기본 구현: 복합키 방식 (P1-BLUE-02)

+ * Note: 이 default 구현은 의도적으로 deprecated된 sync `executeWithLock`를 호출합니다. + * `executeWithOrderedLocksAsync` default로 사용 시 `task.get()`을 강제하게 되어 + * (default path에서도 caller block 발생), legacy compat을 위해 sync chain 유지합니다. + * Concrete impl에서 `executeWithOrderedLocksAsync`를 override하여 async path를 제공하세요. * - *

키들을 정렬 후 단일 복합키로 결합하여 락을 획득합니다. 순차적 개별 락 획득이 필요한 경우 구현체에서 Override하세요. - * - *

사용 예시

- * - *
{@code
-     * // 계좌 이체: from → to 순서 보장
-     * lockStrategy.executeWithOrderedLocks(
-     *     List.of("account:" + fromId, "account:" + toId),
-     *     30, TimeUnit.SECONDS, 60,
-     *     () -> transferService.transfer(fromId, toId, amount)
-     * );
-     * }
- * - * @param keys 락 키 목록 (내부에서 알파벳순 정렬됨) - * @param totalTimeout 전체 타임아웃 값 - * @param timeUnit 타임아웃 단위 - * @param leaseTime 락 유지 시간 (초) - * @param task 실행할 작업 - * @return 작업 결과 - * @throws Throwable 락 획득 실패 또는 작업 실행 중 예외 + * @see executeWithOrderedLocksAsync */ + @Deprecated("Use executeWithOrderedLocksAsync", ReplaceWith("executeWithOrderedLocksAsync(keys, totalTimeout, timeUnit, leaseTime, { CompletableFuture.completedFuture(task.get()) })")) fun executeWithOrderedLocks( keys: List, totalTimeout: Long, @@ -71,9 +67,7 @@ interface LockStrategy { task: ThrowingSupplier, ): T { // [P1-BLUE-02] 기본 구현: 알파벳순 정렬 후 복합키로 결합 - // 진정한 다중 락 지원이 필요하면 구현체에서 Override val compositeKey = keys.stream().sorted().collect(Collectors.joining(":")) - val timeoutSeconds = timeUnit.toSeconds(totalTimeout) return executeWithLock(compositeKey, timeoutSeconds, leaseTime, task) } diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/OrderedLockExecutor.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/OrderedLockExecutor.kt index 2e436dff5..58a297b62 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/OrderedLockExecutor.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/OrderedLockExecutor.kt @@ -2,6 +2,7 @@ package maple.expectation.infrastructure.lock import java.util.ArrayList import java.util.List +import java.util.concurrent.CompletableFuture import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import maple.expectation.common.function.ThrowingSupplier @@ -19,9 +20,30 @@ class OrderedLockExecutor( private val executor: LogicExecutor, ) { + /** + * [Async preferred] 순서 보장 다중 락 실행 — CompletableFuture 반환. + * + * Caller thread는 절대 block되지 않는다. 락 획득과 supplier 실행이 모두 + * CompletableFuture 체인에서 비동기로 진행된다. + */ + fun executeWithOrderedLocksAsync( + keys: List, + totalTimeout: Long, + timeUnit: TimeUnit, + leaseTime: Long, + supplier: () -> CompletableFuture, + ): CompletableFuture { + val context = TaskContext.of("OrderedLock", "ExecuteAsync", java.lang.String.join(",", keys)) + return executeWithOrderedLocksInternalAsync(keys, totalTimeout, timeUnit, leaseTime, supplier, context) + } + /** * 순서 보장 다중 락 실행 (반복 패턴) + * + * @deprecated Use [executeWithOrderedLocksAsync] — sync version blocks caller on `task.get()`. + * Kept for module-app legacy migration; new code must use the *Async variant. */ + @Deprecated("Use executeWithOrderedLocksAsync — sync blocks caller thread on task.get()") fun executeWithOrderedLocks( keys: List, totalTimeout: Long, @@ -39,7 +61,10 @@ class OrderedLockExecutor( /** * 내부 구현: 반복 패턴 또는 중첩 콜백으로 락 획득 및 실행 + * + * @deprecated Use [executeWithOrderedLocksInternalAsync]. */ + @Deprecated("Use executeWithOrderedLocksInternalAsync") @Throws(Throwable::class) private fun executeWithOrderedLocksInternal( keys: List, @@ -71,6 +96,7 @@ class OrderedLockExecutor( /** Redisson용 반복 패턴 전략 */ @Throws(Throwable::class) + @Deprecated("Use executeWithIterativeStrategyAsync") private fun executeWithIterativeStrategy( sortedKeys: java.util.List, totalTimeout: Long, @@ -96,6 +122,7 @@ class OrderedLockExecutor( /** 락 순차 획득 후 작업 실행 */ @Throws(Throwable::class) + @Deprecated("Use acquireLocksAndExecuteAsync — sync version calls task.get() at line 143") private fun acquireLocksAndExecute( sortedKeys: java.util.List, deadlineNanos: Long, @@ -161,6 +188,7 @@ class OrderedLockExecutor( * PR #236 Fix: MySQL Named Lock용 중첩 콜백 전략 */ @Throws(Throwable::class) + @Deprecated("Use executeWithNestedLocksAsync — sync version calls task.get() at line 181") private fun executeWithNestedLocks( sortedKeys: java.util.List, currentIndex: Int, @@ -274,6 +302,7 @@ class OrderedLockExecutor( } /** 편의 메서드: 초 단위 타임아웃 */ + @Deprecated("Use executeWithOrderedLocksAsync — sync blocks caller on task.get()") fun executeWithOrderedLocks( keys: List, totalTimeoutSeconds: Long, @@ -281,9 +310,203 @@ class OrderedLockExecutor( task: ThrowingSupplier, ): T = executeWithOrderedLocks(keys, totalTimeoutSeconds, TimeUnit.SECONDS, leaseTime, task) + // ==================== Async Internals (preferred) ==================== + + /** + * Async internal: 진입점. 정렬 후 iterative/nested 전략 분기. + * + * Note: The first call to this method (cold path) invokes + * `requiresNestedStrategy()`, which probes `lockStrategy.tryLockImmediately(...)` + * synchronously. After the first call, the result is cached in + * `nestedStrategyRequired` (AtomicReference) and the probe does not run again. + * Therefore the cold-path call may briefly block on the strategy probe, but + * subsequent calls are fully non-blocking. + */ + private fun executeWithOrderedLocksInternalAsync( + keys: List, + totalTimeout: Long, + timeUnit: TimeUnit, + leaseTime: Long, + supplier: () -> CompletableFuture, + context: TaskContext, + ): CompletableFuture { + val sortedKeys: java.util.List = keys.sorted() as java.util.List + + log.debug("[OrderedLock/Async] Acquiring {} locks in order: {}", sortedKeys.size, sortedKeys) + + return if (requiresNestedStrategy()) { + log.debug("[OrderedLock/Async] Using nested callback strategy (MySQL Named Lock detected)") + executeWithNestedLocksAsync( + sortedKeys, + 0, + timeUnit.toMillis(totalTimeout), + leaseTime, + supplier, + ) + } else { + executeWithIterativeStrategyAsync(sortedKeys, totalTimeout, timeUnit, leaseTime, supplier) + } + } + + /** Redisson용 반복 패턴 전략 — CompletableFuture 체인. */ + private fun executeWithIterativeStrategyAsync( + sortedKeys: java.util.List, + totalTimeout: Long, + timeUnit: TimeUnit, + leaseTime: Long, + supplier: () -> CompletableFuture, + ): CompletableFuture { + val deadlineNanos = System.nanoTime() + timeUnit.toNanos(totalTimeout) + @Suppress("USELESS_CAST") + val acquiredLocks: java.util.List = java.util.ArrayList() as java.util.List + + return acquireLocksAndExecuteAsync(sortedKeys, deadlineNanos, leaseTime, supplier, acquiredLocks) + .whenComplete { _, _ -> releaseLocksInReverseOrderAsync(acquiredLocks) } + } + + /** + * Async iterative lock acquisition. + * + * Note: This implementation relies on the fact that `tryLockImmediatelyAsync` + * returns a `CompletableFuture` that is already complete (no async chain + * downstream). This is the case for the current LockStrategy implementations + * (PostgresAdvisoryLockStrategy, PostgresLockStrategy, GuavaLockStrategy). + * + * If a future LockStrategy implementation ever returns a non-complete CF from + * `tryLockImmediatelyAsync`, the `acquiredLocks.add(...)` mutation may race + * because the `thenCompose` chain stages could execute on different threads. + * In that case, switch to `AtomicReference>` updated via + * `updateAndGet`. + */ + private fun acquireLocksAndExecuteAsync( + sortedKeys: java.util.List, + deadlineNanos: Long, + leaseTime: Long, + supplier: () -> CompletableFuture, + acquiredLocks: java.util.List, + ): CompletableFuture { + var current: CompletableFuture = CompletableFuture.completedFuture(null) + + for (i in sortedKeys.indices) { + val currentKey = sortedKeys[i] + val previous = current + current = previous.thenCompose { + val remainingNanos = deadlineNanos - System.nanoTime() + if (remainingNanos <= 0) { + CompletableFuture.failedFuture( + DistributedLockException( + String.format("전체 락 타임아웃 초과: %d/%d 락 획득 중 [key=%s]", i, sortedKeys.size, currentKey), + ), + ) + } else { + val remainingSeconds = TimeUnit.NANOSECONDS.toSeconds(remainingNanos) + val waitTimeSec = Math.max(1, Math.min(remainingSeconds, 10)) + + log.debug( + "[OrderedLock/Async] Acquiring lock {}/{}: {} (remaining: {}ms)", + i + 1, + sortedKeys.size, + currentKey, + TimeUnit.NANOSECONDS.toMillis(remainingNanos), + ) + + lockStrategy.tryLockImmediatelyAsync(currentKey, leaseTime) + .thenCompose { acquired -> + if (!acquired) { + CompletableFuture.failedFuture( + DistributedLockException( + String.format("락 획득 실패: %s (waited %ds)", currentKey, waitTimeSec), + ), + ) + } else { + acquiredLocks.add(currentKey) + if (i == sortedKeys.size - 1) { + supplier() + } else { + @Suppress("UNCHECKED_CAST") + (EMPTY_FUTURE as CompletableFuture) + } + } + } + } + } + } + + return current + } + + /** + * MySQL Named Lock용 중첩 콜백 전략 — async. + * 각 락 획득 콜백 안에서 다음 락 획득을 thenCompose로 체이닝한다. + */ + private fun executeWithNestedLocksAsync( + sortedKeys: java.util.List, + currentIndex: Int, + remainingTimeoutMs: Long, + leaseTime: Long, + supplier: () -> CompletableFuture, + ): CompletableFuture { + if (currentIndex >= MAX_NESTED_DEPTH) { + return CompletableFuture.failedFuture( + DistributedLockException( + String.format("중첩 락 깊이 초과: 최대 %d개까지 지원 (요청: %d개)", MAX_NESTED_DEPTH, sortedKeys.size), + ), + ) + } + + if (currentIndex >= sortedKeys.size) { + log.info("[OrderedLock/Nested/Async] All {} locks acquired, executing supplier", sortedKeys.size) + return supplier() + } + + val currentKey = sortedKeys[currentIndex] + val waitTimeSec = Math.max(1, TimeUnit.MILLISECONDS.toSeconds(remainingTimeoutMs)) + + log.debug( + "[OrderedLock/Nested/Async] Acquiring lock {}/{}: {} (remaining: {}ms)", + currentIndex + 1, + sortedKeys.size, + currentKey, + remainingTimeoutMs, + ) + + return lockStrategy.executeWithLockAsync( + currentKey, + waitTimeSec, + leaseTime, + ) { + executeWithNestedLocksAsync( + sortedKeys, + currentIndex + 1, + remainingTimeoutMs - TimeUnit.SECONDS.toMillis(waitTimeSec), + leaseTime, + supplier, + ) + } + } + + /** + * LIFO 순서로 락 해제 — 비동기. each unlock returns a CF, chain으로 순서 보장. + * 예외가 나도 다음 unlock을 시도하도록 `exceptionally`로 흡수. + */ + private fun releaseLocksInReverseOrderAsync(acquiredLocks: List) { + var chain: CompletableFuture = CompletableFuture.completedFuture(null) + for (i in acquiredLocks.size - 1 downTo 0) { + val lockKey = acquiredLocks[i] + chain = chain.thenCompose { lockStrategy.unlockAsync(lockKey) } + .exceptionally { ex -> + log.warn("[OrderedLock/Async] Failed to release lock {}: {}", lockKey, ex.message) + null + } + } + // fire-and-forget: 체인은 best-effort로 background에서 완료된다. + // whenComplete는 supplier result와 무관하게 lock release를 보장한다. + } + companion object { private const val MAX_NESTED_DEPTH = 10 // P1-YELLOW-01: 스택 깊이 제한 private val log = org.slf4j.LoggerFactory.getLogger(OrderedLockExecutor::class.java) + private val EMPTY_FUTURE: CompletableFuture<*> = CompletableFuture.completedFuture(null) } // P1-BLUE-01: 전략 캐싱 (Lock-Free 초기화) diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/PostgresAdvisoryLockStrategy.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/PostgresAdvisoryLockStrategy.kt index 1cae6266f..d84a004df 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/PostgresAdvisoryLockStrategy.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/PostgresAdvisoryLockStrategy.kt @@ -1,5 +1,8 @@ package maple.expectation.infrastructure.lock +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executor import java.util.concurrent.TimeUnit import java.util.concurrent.locks.LockSupport import maple.expectation.common.function.ThrowingSupplier @@ -12,6 +15,7 @@ import org.springframework.context.annotation.Primary import org.springframework.jdbc.core.JdbcTemplate import org.springframework.stereotype.Component import org.springframework.transaction.support.TransactionTemplate +import java.util.stream.Collectors /** * PostgreSQL Advisory Lock Strategy @@ -43,17 +47,123 @@ class PostgresAdvisoryLockStrategy( @Qualifier("lockTransactionTemplate") private val lockTransactionTemplate: TransactionTemplate, private val lockMetrics: LockMetrics, + @org.springframework.beans.factory.annotation.Qualifier("defaultAsyncExecutor") + private val lockExecutor: Executor, ) : LockStrategy, LeaderElectionStrategy { - // ==================== XACT-SCOPED Lock Methods ==================== + /** + * Session-scoped lock registry: maps logical key -> generated lockId so that + * [unlockAsync] knows which PG advisory lock to release for [tryLockImmediatelyAsync]. + * Session-scoped locks require explicit [pg_advisory_unlock]; xact-scoped locks + * do not (auto-released on tx commit/rollback). + */ + private val lockSessionRegistry = ConcurrentHashMap() + + // ==================== Async Lock Methods (preferred) ==================== + + /** + * [SESSION-SCOPED] Async: Execute supplier with advisory lock. + * + * Uses `pg_try_advisory_lock` (session scope) + explicit `pg_advisory_unlock` + * in `whenComplete`. Caller thread is never blocked. + */ + override fun executeWithLockAsync( + key: String, + waitTime: Long, + leaseTime: Long, + supplier: () -> CompletableFuture, + ): CompletableFuture = tryAcquireSessionLockWithPollAsync(key, waitTime, leaseTime) + .thenCompose { lockId -> + if (lockId == null) { + lockMetrics.recordFailure("postgres") + CompletableFuture.failedFuture( + DistributedLockException("Failed to acquire lock within timeout: $key"), + ) + } else { + supplier().whenComplete { _, _ -> + releaseSessionLock(key, lockId) + } + } + } + + override fun executeWithLockAsync( + key: String, + supplier: () -> CompletableFuture, + ): CompletableFuture = executeWithLockAsync(key, DEFAULT_WAIT_TIME, DEFAULT_LEASE_TIME, supplier) + + /** + * [SESSION-SCOPED] Async: Try to acquire advisory lock (non-blocking). + * MUST call [unlockAsync] explicitly to release. + */ + override fun tryLockImmediatelyAsync(key: String, leaseTime: Long): CompletableFuture = + CompletableFuture.supplyAsync({ tryAcquireSessionLockOnce(key, leaseTime) != null }, lockExecutor) + + /** + * [SESSION-SCOPED] Async: Release previously acquired session-scoped lock. + */ + override fun unlockAsync(key: String): CompletableFuture { + val lockId = lockSessionRegistry.remove(key) + ?: return CompletableFuture.completedFuture(null) + return CompletableFuture.runAsync({ releaseSessionLock(key, lockId) }, lockExecutor) + } + + /** + * Async: Default implementation - alphabetic sort + composite key + executeWithLockAsync. + * Concrete impls may override for true multi-lock semantics. + */ + override fun executeWithOrderedLocksAsync( + keys: List, + totalTimeout: Long, + timeUnit: TimeUnit, + leaseTime: Long, + supplier: () -> CompletableFuture, + ): CompletableFuture { + val compositeKey = keys.stream().sorted().collect(Collectors.joining(":")) + val timeoutSeconds = timeUnit.toSeconds(totalTimeout) + return executeWithLockAsync(compositeKey, timeoutSeconds, leaseTime, supplier) + } + + /** + * Async: Execute with leader election pattern (session-scoped lock). + * + * Leader acquires `pg_try_advisory_lock` and runs [leaderSupplier]; + * followers wait for the leader's session lock to release, then run [followerSupplier]. + */ + override fun executeWithLeaderElectionAsync( + key: String, + waitTimeSeconds: Int, + leaderSupplier: () -> CompletableFuture, + followerSupplier: () -> CompletableFuture, + ): CompletableFuture { + val lockId = tryAcquireSessionLockOnce(key, DEFAULT_LEASE_TIME) + return if (lockId != null) { + log.info("👑 [Leader] Acquired session lock for key: {}", key) + lockMetrics.recordLockAcquired("postgres") + leaderSupplier().whenComplete { _, _ -> + releaseSessionLock(key, lockId) + lockMetrics.recordLockReleased("postgres") + } + } else { + log.info("😴 [Follower] Waiting for leader completion: key={}, timeout={}s", key, waitTimeSeconds) + waitForLeaderSessionReleaseAsync(key, waitTimeSeconds).thenCompose { + followerSupplier() + } + } + } + + // ==================== XACT-SCOPED Lock Methods (Deprecated) ==================== /** * [XACT-SCOPED] Execute task with advisory lock. * * Uses `pg_try_advisory_xact_lock` within a TransactionTemplate. * Lock is automatically released when the transaction commits or rolls back. + * + * @deprecated Use [executeWithLockAsync] — xact-scoped blocks the caller thread on + * `task.get()`. Migration: see interface deprecation note. */ + @Deprecated("Use executeWithLockAsync — xact-scoped blocks the caller thread on task.get()") override fun executeWithLock(key: String, waitTime: Long, leaseTime: Long, task: ThrowingSupplier): T { val lockId = generateLockId(key) val context = TaskContext.of("AdvisoryLock", "ExecuteWithLock", key) @@ -89,6 +199,7 @@ class PostgresAdvisoryLockStrategy( throw DistributedLockException("Failed to acquire lock within timeout: $key") } + @Deprecated("Use executeWithLockAsync") override fun executeWithLock(key: String, task: ThrowingSupplier): T = executeWithLock(key, 10, 20, task) /** @@ -97,7 +208,11 @@ class PostgresAdvisoryLockStrategy( * Leader acquires `pg_try_advisory_xact_lock` and executes leaderTask. * Followers poll until leader's transaction commits (releasing the lock), * then execute followerTask. + * + * @deprecated Use [executeWithLeaderElectionAsync] — xact-scoped blocks the caller thread + * on `task.get()`. */ + @Deprecated("Use executeWithLeaderElectionAsync — xact-scoped blocks the caller thread on task.get()") override fun executeWithLeaderElection( key: String, waitTimeSeconds: Int, @@ -143,7 +258,7 @@ class PostgresAdvisoryLockStrategy( return executor.execute({ followerTask.get() }, context) } - // ==================== SESSION-SCOPED Lock Methods ==================== + // ==================== SESSION-SCOPED Lock Methods (Deprecated) ==================== /** * [SESSION-SCOPED] Try to acquire advisory lock (non-blocking). @@ -151,7 +266,10 @@ class PostgresAdvisoryLockStrategy( * Uses `pg_try_advisory_lock` (session scope). * MUST call [unlock] explicitly to release. * Retained for async patterns where lock must outlive the method call. + * + * @deprecated Use [tryLockImmediatelyAsync]. */ + @Deprecated("Use tryLockImmediatelyAsync") override fun tryLockImmediately(key: String, leaseTime: Long): Boolean { val lockId = generateLockId(key) val acquired = jdbcTemplate.queryForObject( @@ -172,7 +290,10 @@ class PostgresAdvisoryLockStrategy( * * Only meaningful after [tryLockImmediately]. No-op for xact-scoped locks * (those auto-release on transaction commit/rollback). + * + * @deprecated Use [unlockAsync]. */ + @Deprecated("Use unlockAsync") override fun unlock(key: String) { val lockId = generateLockId(key) jdbcTemplate.queryForObject( @@ -219,8 +340,122 @@ class PostgresAdvisoryLockStrategy( } } + // ==================== Session-Scoped Lock Helpers (Async) ==================== + + /** + * Poll for session-scoped lock acquisition on a JDBC-bound executor. + * Returns the lockId on success, or null on timeout. + */ + private fun tryAcquireSessionLockWithPollAsync( + key: String, + waitTime: Long, + leaseTime: Long, + ): CompletableFuture = CompletableFuture.supplyAsync({ + val deadline = System.currentTimeMillis() + waitTime * 1000L + while (System.currentTimeMillis() < deadline) { + if (Thread.currentThread().isInterrupted) { + throw DistributedLockException("Interrupted while waiting for lock: $key") + } + val lockId = tryAcquireSessionLockOnce(key, leaseTime) + if (lockId != null) { + return@supplyAsync lockId + } + // VT-friendly: Thread.sleep on virtual thread does not pin carrier. + Thread.sleep(POLL_INTERVAL_MS) + } + null + }, lockExecutor) + + /** + * Try once to acquire `pg_try_advisory_lock`. On success, registers the lockId + * in [lockSessionRegistry] so that [unlockAsync] can release the same lock. + * Returns null on contention. + */ + private fun tryAcquireSessionLockOnce(key: String, leaseTime: Long): Long? { + val lockId = generateLockId(key) + val acquired: Boolean? = jdbcTemplate.queryForObject( + "SELECT pg_try_advisory_lock(?)", + Boolean::class.java, + lockId, + ) + return if (acquired == true) { + lockSessionRegistry[key] = lockId + lockMetrics.recordLockAcquired("postgres") + lockId + } else { + null + } + } + + /** + * Release the session-scoped lock acquired by [tryAcquireSessionLockOnce]. + * Called from `whenComplete` — must never throw. + * + * Justified: this function is invoked from `whenComplete` callbacks inside + * CompletableFuture chains. LogicExecutor cannot be used inside `whenComplete` + * (it expects a synchronous caller context), so the JDBC exception must be + * handled inline. The `try/catch` here is the documented exception to the + * no-try-catch rule in `code-rules.md` §5 — a thrown exception from + * `whenComplete` would complete the outer CF exceptionally AFTER the supplier + * has already returned, corrupting the CF chain. Logging + swallow is the only + * safe action. + */ + private fun releaseSessionLock(key: String, lockId: Long) { + lockSessionRegistry.remove(key, lockId) + try { + jdbcTemplate.queryForObject( + "SELECT pg_advisory_unlock(?)", + Boolean::class.java, + lockId, + ) + lockMetrics.recordLockReleased("postgres") + log.debug("🔓 [AdvisoryLock] Unlocked key: {}", key) + } catch (e: Exception) { + log.warn("[AdvisoryLock] Failed to release session lock: key={}, lockId={}", key, lockId, e) + } + } + + /** + * Follower path: poll until the leader's session lock has been released. + * Returns CompletableFuture completing on release-or-timeout. + */ + private fun waitForLeaderSessionReleaseAsync(key: String, waitTimeSeconds: Int): CompletableFuture = + CompletableFuture.runAsync({ + val lockId = generateLockId(key) + val deadline = System.currentTimeMillis() + waitTimeSeconds * 1000L + while (System.currentTimeMillis() < deadline) { + if (Thread.currentThread().isInterrupted) { + throw DistributedLockException("Interrupted while waiting for leader: $key") + } + // Try to acquire the same lock; if it succeeds, leader has released. + val leaderDone: Boolean = jdbcTemplate.queryForObject( + "SELECT pg_try_advisory_lock(?)", + Boolean::class.java, + lockId, + ) ?: false + if (leaderDone) { + // Immediately release the follower's hold (it was just a probe). + try { + jdbcTemplate.queryForObject( + "SELECT pg_advisory_unlock(?)", + Boolean::class.java, + lockId, + ) + } catch (e: Exception) { + log.warn("[AdvisoryLock] Follower failed to release probe lock: key={}", key, e) + } + log.info("✅ [Follower] Leader completed, proceeding: key={}", key) + return@runAsync + } + Thread.sleep(POLL_INTERVAL_MS) + } + log.warn("⏰ [Follower] Timed out waiting for leader: key={}, timeout={}s", key, waitTimeSeconds) + }, lockExecutor) + companion object { private val log = LoggerFactory.getLogger(PostgresAdvisoryLockStrategy::class.java) private const val POLL_INTERVAL_MS = 100L + private const val DEFAULT_WAIT_TIME = 10L + private const val DEFAULT_LEASE_TIME = 20L } } diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/PostgresLockStrategy.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/PostgresLockStrategy.kt index e5281d6fd..8634a34ff 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/PostgresLockStrategy.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/lock/PostgresLockStrategy.kt @@ -1,6 +1,8 @@ package maple.expectation.infrastructure.lock +import java.util.concurrent.CompletableFuture import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executor import java.util.concurrent.TimeUnit import java.util.concurrent.locks.LockSupport import maple.expectation.infrastructure.executor.LogicExecutor @@ -48,6 +50,8 @@ class PostgresLockStrategy( private val lockMetrics: LockMetrics, @Qualifier("taskScheduler") private val leaseScheduler: ThreadPoolTaskScheduler, + @Qualifier("defaultAsyncExecutor") + private val lockExecutor: Executor, ) : AbstractLockStrategy(executor) { /** @@ -56,6 +60,14 @@ class PostgresLockStrategy( */ private val acquiredLocks: ThreadLocal> = ThreadLocal.withInitial { ConcurrentHashMap() } + /** + * Session-scoped lock registry: maps logical key -> generated lockId so that + * [unlockAsync] knows which PG advisory lock to release for [tryLockImmediatelyAsync]. + * Session-scoped locks require explicit [pg_advisory_unlock]; xact-scoped locks + * do not (auto-released on tx commit/rollback). + */ + private val lockSessionRegistry = ConcurrentHashMap() + /** * Lock acquisition order tracking for deadlock prevention * Maintains ordered list of acquired lock IDs per thread @@ -136,6 +148,17 @@ class PostgresLockStrategy( * @param lockKey 락 키 */ override fun unlockInternal(lockKey: String) { + // Note: session registry may have an entry if the lock was acquired via the + // *Async path. Clear it here so legacy sync unlock() doesn't leak. + val sessionLockId = lockSessionRegistry.remove(lockKey) + if (sessionLockId != null) { + try { + lockJdbcTemplate.update("SELECT pg_advisory_unlock(?)", sessionLockId) + } catch (e: Exception) { + log.warn("[PostgresLock] Failed to release async session lock during sync unlockInternal: key={}, lockId={}", lockKey, sessionLockId, e) + } + } + val advisoryLockId = toAdvisoryLockId(lockKey) val locks = acquiredLocks.get() @@ -292,9 +315,6 @@ class PostgresLockStrategy( * @return 64bit 정수 */ private fun toAdvisoryLockId(key: String): Long { - val FNV_64_OFFSET_BASIS = -0x3c2d2f0705b7b401L // 14695981039346656037 - val FNV_64_PRIME = 0x100000001b3L // 1099511628211 - var hash = FNV_64_OFFSET_BASIS for (byte in key.toByteArray()) { hash = hash xor (byte.toLong() and 0xFF) @@ -372,8 +392,91 @@ class PostgresLockStrategy( .onFailure { log.warn("[Postgres Lock] Failed to release lock during error recovery: {}", it.message) } } + /** + * [SESSION-SCOPED] Async: Poll for advisory lock acquisition on a JDBC-bound executor. + * Returns the lockId on success, or null on timeout. + */ + override fun tryAcquireSessionLockAsync( + lockKey: String, + waitTime: Long, + leaseTime: Long, + ctx: TaskContext, + ): CompletableFuture = CompletableFuture.supplyAsync({ + val deadline = System.currentTimeMillis() + waitTime * 1000L + while (System.currentTimeMillis() < deadline) { + if (Thread.currentThread().isInterrupted) { + throw maple.expectation.error.exception.DistributedLockException( + "Interrupted while waiting for lock: $lockKey", + ) + } + val lockId = tryAcquireSessionLockOnce(lockKey, leaseTime) + if (lockId != null) { + return@supplyAsync lockId + } + // VT-friendly: Thread.sleep on virtual thread does not pin carrier. + Thread.sleep(POLL_INTERVAL_MS) + } + null + }, lockExecutor) + + /** + * [SESSION-SCOPED] Async: Release previously acquired session-scoped lock. + * Called from whenComplete — must never throw. + * + * Justified: this function is invoked from `whenComplete` callbacks inside + * CompletableFuture chains. LogicExecutor cannot be used inside `whenComplete` + * (it expects a synchronous caller context), so the JDBC exception must be + * handled inline. A thrown exception from `whenComplete` would complete the + * outer CF exceptionally AFTER the supplier has already returned, corrupting + * the CF chain. Logging + swallow is the only safe action. + */ + override fun releaseSessionLockAsync( + lockKey: String, + lockId: Long?, + ctx: TaskContext, + ): CompletableFuture = CompletableFuture.runAsync({ + val actualLockId = lockId ?: lockSessionRegistry.remove(lockKey) ?: return@runAsync + lockSessionRegistry.remove(lockKey, actualLockId) + try { + lockJdbcTemplate.queryForObject( + "SELECT pg_advisory_unlock(?)", + Boolean::class.java, + actualLockId, + ) + lockMetrics.recordLockReleased("postgres") + log.debug("🔓 [Postgres Lock] Unlocked key: {}", lockKey) + } catch (e: Exception) { + log.warn("[Postgres Lock] Failed to release session lock: key={}, lockId={}", lockKey, actualLockId, e) + } + }, lockExecutor) + + /** + * Try once to acquire `pg_try_advisory_lock`. On success, registers the lockId + * in [lockSessionRegistry] so that [releaseSessionLockAsync] can release the + * same lock. Returns null on contention. + */ + private fun tryAcquireSessionLockOnce(lockKey: String, leaseTime: Long): Long? { + val lockId = toAdvisoryLockId(lockKey) + val acquired: Boolean? = lockJdbcTemplate.queryForObject( + "SELECT pg_try_advisory_lock(?)", + Boolean::class.java, + lockId, + ) + return if (acquired == true) { + lockSessionRegistry[lockKey] = lockId + lockMetrics.recordLockAcquired("postgres") + log.debug("[Postgres Lock] Acquired session lock for key='{}' (id={}, lease={}s)", lockKey, lockId, leaseTime) + lockId + } else { + null + } + } + companion object { private val log = org.slf4j.LoggerFactory.getLogger(PostgresLockStrategy::class.java) + private const val POLL_INTERVAL_MS = 100L + private const val FNV_64_OFFSET_BASIS = -0x3c2d2f0705b7b401L // 14695981039346656037 + private const val FNV_64_PRIME = 0x100000001b3L // 1099511628211 } /** diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/monitoring/MonitoringAlertService.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/monitoring/MonitoringAlertService.kt index 1986ba720..c0acb497a 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/monitoring/MonitoringAlertService.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/monitoring/MonitoringAlertService.kt @@ -43,21 +43,25 @@ class MonitoringAlertService( fun checkBufferSaturation() { val context = TaskContext.of("Monitoring", "CheckSaturation") - val isLeader = lockStrategy.tryLockImmediately("global-monitoring-lock", 4) + lockStrategy.tryLockImmediatelyAsync("global-monitoring-lock", 4).whenComplete { acquired, ex -> + if (ex != null) { + log.error("❌ [Monitoring] 락 획득 시도 중 장애: {}", ex.cause ?: ex.message) + return@whenComplete + } + if (!acquired) { + log.debug("⏭️ [Monitoring] 리더 선출 실패 - 다른 인스턴스가 리더입니다. 체크 스킵.") + return@whenComplete + } - if (!isLeader) { - log.debug("⏭️ [Monitoring] 리더 선출 실패 - 다른 인스턴스가 리더입니다. 체크 스킵.") - return + executor.executeOrCatch( + { + performBufferCheck() + null + }, + { t -> handleMonitoringFailure(t) }, + context, + ) } - - executor.executeOrCatch( - { - performBufferCheck() - null - }, - { t -> handleMonitoringFailure(t) }, - context, - ) } /** 헬퍼 1: 실제 수치 확인 및 알림 로직 (로직 응집도 향상) */ diff --git a/module-infra/src/main/kotlin/maple/expectation/infrastructure/scheduler/PopularCharacterWarmupScheduler.kt b/module-infra/src/main/kotlin/maple/expectation/infrastructure/scheduler/PopularCharacterWarmupScheduler.kt index c7edaa97f..5207dafbd 100644 --- a/module-infra/src/main/kotlin/maple/expectation/infrastructure/scheduler/PopularCharacterWarmupScheduler.kt +++ b/module-infra/src/main/kotlin/maple/expectation/infrastructure/scheduler/PopularCharacterWarmupScheduler.kt @@ -4,6 +4,7 @@ import io.micrometer.core.instrument.MeterRegistry import io.micrometer.core.instrument.Timer import jakarta.annotation.PostConstruct import java.time.LocalDateTime +import java.util.concurrent.CompletableFuture import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.LockSupport @@ -68,30 +69,35 @@ class PopularCharacterWarmupScheduler( executor.executeOrCatch( { - lockStrategy.executeWithLock( + lockStrategy.executeWithLockAsync( "popular-warmup-lock", 0, 300, ) { - doWarmup(warmupType) - null + CompletableFuture.completedFuture(doWarmup(warmupType)) + }.whenComplete { _, ex -> + if (ex != null) handleWarmupFailure(warmupType, ex.cause ?: ex) } null }, { e -> - if (e is DistributedLockException) { - log.debug("[Warmup] {} skipped: another instance is warming up", warmupType) - } else { - log.error("[Warmup] {} failed: {}", warmupType, e.message) - meterRegistry.counter("warmup.execution", "type", warmupType, "status", "error") - .increment() - } + handleWarmupFailure(warmupType, e) null }, context, ) } + private fun handleWarmupFailure(warmupType: String, e: Throwable) { + if (e is DistributedLockException) { + log.debug("[Warmup] {} skipped: another instance is warming up", warmupType) + } else { + log.error("[Warmup] {} failed: {}", warmupType, e.message) + meterRegistry.counter("warmup.execution", "type", warmupType, "status", "error") + .increment() + } + } + private fun doWarmup(warmupType: String) { val sample = Timer.start(meterRegistry) log.info( diff --git a/module-infra/src/test/kotlin/maple/expectation/infrastructure/lock/OrderedLockExecutorAsyncTest.kt b/module-infra/src/test/kotlin/maple/expectation/infrastructure/lock/OrderedLockExecutorAsyncTest.kt new file mode 100644 index 000000000..c9709baf9 --- /dev/null +++ b/module-infra/src/test/kotlin/maple/expectation/infrastructure/lock/OrderedLockExecutorAsyncTest.kt @@ -0,0 +1,100 @@ +package maple.expectation.infrastructure.lock + +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit +import maple.expectation.common.function.ThrowingSupplier +import maple.expectation.infrastructure.executor.LogicExecutor +import maple.expectation.infrastructure.executor.TaskContext +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.junit.jupiter.MockitoSettings +import org.mockito.kotlin.any +import org.mockito.kotlin.whenever +import org.mockito.quality.Strictness + +/** + * Async API contract tests for [OrderedLockExecutor]. + * + *

Verifies that `executeWithOrderedLocksAsync` exists and returns + * `CompletableFuture` without blocking the caller. The actual PG round-trip + * and lock semantics are covered by integration tests (out of scope here). + */ +@ExtendWith(MockitoExtension::class) +@MockitoSettings(strictness = Strictness.LENIENT) +class OrderedLockExecutorAsyncTest { + + @Mock + lateinit var lockStrategy: LockStrategy + + @Mock + lateinit var executor: LogicExecutor + + private fun orderedLockExecutor(): OrderedLockExecutor = OrderedLockExecutor(lockStrategy, executor) + + private fun jutilList(vararg elems: String): java.util.List = + (java.util.ArrayList().apply { elems.forEach { add(it) } }) as java.util.List + + /** + * Configure the [LogicExecutor] mock so that the strategy detection probe + * returns `false` (i.e. Redisson / non-nested path), and other calls are + * no-ops. This is enough for the async chain to short-circuit into the + * iterative strategy. + */ + private fun stubExecutorForRedissonPath() { + // Strategy probe: tryLockImmediately returns true, returns false + // (so we pick the iterative strategy, not the nested one). + whenever(lockStrategy.tryLockImmediately(any(), any())).thenReturn(true) + whenever( + executor.executeOrDefault(any>(), any(), any()), + ).thenAnswer { invocation -> + val supplier = invocation.arguments[0] as ThrowingSupplier + supplier.get() + } + // unlockSafely → executeVoidJava (no-op) + // whenComplete path → no executor call (uses lockStrategy.unlockAsync directly) + } + + @Test + fun `executeWithOrderedLocksAsync returns CompletableFuture without blocking caller`() { + stubExecutorForRedissonPath() + whenever(lockStrategy.tryLockImmediatelyAsync(any(), any())) + .thenReturn(CompletableFuture.completedFuture(true)) + whenever(lockStrategy.unlockAsync(any())) + .thenReturn(CompletableFuture.completedFuture(null)) + + val result = orderedLockExecutor().executeWithOrderedLocksAsync( + keys = jutilList("a", "b"), + totalTimeout = 30, + timeUnit = TimeUnit.SECONDS, + leaseTime = 60L, + supplier = { CompletableFuture.completedFuture("done") }, + ) + + assertNotNull(result) + assertEquals("done", result.get()) + } + + @Test + fun `executeWithOrderedLocksAsync with single key invokes supplier directly`() { + stubExecutorForRedissonPath() + whenever(lockStrategy.tryLockImmediatelyAsync(any(), any())) + .thenReturn(CompletableFuture.completedFuture(true)) + whenever(lockStrategy.unlockAsync(any())) + .thenReturn(CompletableFuture.completedFuture(null)) + + val result = orderedLockExecutor().executeWithOrderedLocksAsync( + keys = jutilList("a"), + totalTimeout = 30, + timeUnit = TimeUnit.SECONDS, + leaseTime = 60L, + supplier = { CompletableFuture.completedFuture(42) }, + ) + + assertNotNull(result) + assertEquals(42, result.get()) + } +} diff --git a/module-infra/src/test/kotlin/maple/expectation/infrastructure/lock/PostgresAdvisoryLockStrategyAsyncTest.kt b/module-infra/src/test/kotlin/maple/expectation/infrastructure/lock/PostgresAdvisoryLockStrategyAsyncTest.kt new file mode 100644 index 000000000..29ed1f6f2 --- /dev/null +++ b/module-infra/src/test/kotlin/maple/expectation/infrastructure/lock/PostgresAdvisoryLockStrategyAsyncTest.kt @@ -0,0 +1,128 @@ +package maple.expectation.infrastructure.lock + +import io.micrometer.core.instrument.simple.SimpleMeterRegistry +import java.util.concurrent.CompletableFuture +import maple.expectation.infrastructure.executor.LogicExecutor +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.mockito.junit.jupiter.MockitoSettings +import org.mockito.kotlin.any +import org.mockito.kotlin.eq +import org.mockito.kotlin.whenever +import org.mockito.quality.Strictness +import org.springframework.jdbc.core.JdbcTemplate +import org.springframework.transaction.support.TransactionTemplate +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertTrue + +/** + * Async API contract tests for [PostgresAdvisoryLockStrategy]. + * + *

Verifies that the *Async methods exist on the concrete strategy and + * return CompletableFuture without blocking the caller. The actual PG + * round-trip is covered by integration tests (out of scope here). + */ +@ExtendWith(MockitoExtension::class) +@MockitoSettings(strictness = Strictness.LENIENT) +class PostgresAdvisoryLockStrategyAsyncTest { + + @Mock + lateinit var jdbcTemplate: JdbcTemplate + + @Mock + lateinit var lockTransactionTemplate: TransactionTemplate + + @Mock + lateinit var executor: LogicExecutor + + private fun lockMetrics(): LockMetrics = LockMetrics(SimpleMeterRegistry()) + + private fun strategy(): PostgresAdvisoryLockStrategy = PostgresAdvisoryLockStrategy( + jdbcTemplate = jdbcTemplate, + executor = executor, + lockTransactionTemplate = lockTransactionTemplate, + lockMetrics = lockMetrics(), + lockExecutor = java.util.concurrent.ForkJoinPool.commonPool(), + ) + + @Test + fun `executeWithLockAsync returns CompletableFuture without blocking caller`() { + // Arrange: hashtext() returns a deterministic lockId, pg_try_advisory_lock returns true + whenever(jdbcTemplate.queryForObject(eq("SELECT hashtext(?)"), eq(Long::class.java), any())).thenReturn(123L) + whenever(jdbcTemplate.queryForObject(eq("SELECT pg_try_advisory_lock(?)"), eq(Boolean::class.java), eq(123L))) + .thenReturn(true) + + // Act + val result = strategy().executeWithLockAsync("test-key", 10, 20L) { + CompletableFuture.completedFuture("ok") + } + + // Assert: not null and completes with "ok" + assertNotNull(result) + assertTrue(result is CompletableFuture<*>) + assertEquals("ok", result.get()) + } + + @Test + fun `executeWithLockAsync convenience overload returns CompletableFuture`() { + whenever(jdbcTemplate.queryForObject(eq("SELECT hashtext(?)"), eq(Long::class.java), any())).thenReturn(456L) + whenever(jdbcTemplate.queryForObject(eq("SELECT pg_try_advisory_lock(?)"), eq(Boolean::class.java), eq(456L))) + .thenReturn(true) + + val result = strategy().executeWithLockAsync("test-key") { + CompletableFuture.completedFuture(42) + } + + assertNotNull(result) + assertEquals(42, result.get()) + } + + @Test + fun `tryLockImmediatelyAsync returns CompletableFuture with Boolean`() { + whenever(jdbcTemplate.queryForObject(eq("SELECT hashtext(?)"), eq(Long::class.java), any())).thenReturn(789L) + whenever(jdbcTemplate.queryForObject(eq("SELECT pg_try_advisory_lock(?)"), eq(Boolean::class.java), eq(789L))) + .thenReturn(true) + + val result = strategy().tryLockImmediatelyAsync("key", 20L) + + assertNotNull(result) + assertEquals(true, result.get()) + } + + @Test + fun `unlockAsync returns CompletableFuture Void`() { + whenever(jdbcTemplate.queryForObject(eq("SELECT hashtext(?)"), eq(Long::class.java), any())).thenReturn(321L) + whenever(jdbcTemplate.queryForObject(eq("SELECT pg_try_advisory_lock(?)"), eq(Boolean::class.java), eq(321L))) + .thenReturn(true) + whenever(jdbcTemplate.queryForObject(eq("SELECT pg_advisory_unlock(?)"), eq(Boolean::class.java), eq(321L))) + .thenReturn(true) + + val s = strategy() + s.tryLockImmediatelyAsync("key", 20L).get() + + val result = s.unlockAsync("key") + assertNotNull(result) + result.get() + } + + @Test + fun `executeWithLeaderElectionAsync returns CompletableFuture`() { + whenever(jdbcTemplate.queryForObject(eq("SELECT hashtext(?)"), eq(Long::class.java), any())).thenReturn(654L) + whenever(jdbcTemplate.queryForObject(eq("SELECT pg_try_advisory_lock(?)"), eq(Boolean::class.java), eq(654L))) + .thenReturn(true) + + val result = strategy().executeWithLeaderElectionAsync( + key = "key", + waitTimeSeconds = 10, + leaderSupplier = { CompletableFuture.completedFuture("leader-result") }, + followerSupplier = { CompletableFuture.completedFuture("follower-result") }, + ) + + assertNotNull(result) + // Leader path should resolve to leader-result + assertEquals("leader-result", result.get()) + } +} diff --git a/module-infra/src/test/kotlin/maple/expectation/infrastructure/test/LockBlockingPrimitiveGateTest.kt b/module-infra/src/test/kotlin/maple/expectation/infrastructure/test/LockBlockingPrimitiveGateTest.kt new file mode 100644 index 000000000..7c05b6466 --- /dev/null +++ b/module-infra/src/test/kotlin/maple/expectation/infrastructure/test/LockBlockingPrimitiveGateTest.kt @@ -0,0 +1,176 @@ +package maple.expectation.infrastructure.test + +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import java.io.File + +/** + * CI grep gate that scans module-infra/src/main/.../lock/ for blocking primitives + * (task.get(), runBlocking, Thread.sleep) and fails the build if found. + * + * Allowlist: + * - Lines containing @Deprecated, @Throws, or comment markers. + * - Lines inside a function body whose preceding annotation block contains @Deprecated. + * - Thread.sleep inside CompletableFuture.supplyAsync({...}, executor) blocks — + * the new VT-friendly async polling pattern. + */ +class LockBlockingPrimitiveGateTest { + @Test + fun `no task_get or runBlocking or Thread_sleep in module-infra lock main sources`() { + val srcRoot = File("src/main/kotlin/maple/expectation/infrastructure/lock") + if (!srcRoot.exists()) return + + val violations = mutableListOf() + val patterns = listOf( + Regex("""task\.get\(\)"""), + Regex("""runBlocking\s*\{"""), + Regex("""Thread\.sleep\s*\("""), + ) + + srcRoot.walkTopDown() + .filter { it.extension in listOf("kt", "java") } + .forEach { file -> + val lines = file.readLines() + val deprecatedBodySpan = markDeprecatedBodies(lines) + val asyncPollingSpan = markAsyncPollingBodies(lines) + lines.forEachIndexed { i, line -> + val trimmed = line.trim() + if (patterns.any { it.containsMatchIn(trimmed) } && + !isAllowlisted(file, trimmed, deprecatedBodySpan[i], asyncPollingSpan[i]) + ) { + violations.add("${file.path}:${i + 1}: $trimmed") + } + } + } + + assertTrue(violations.isEmpty(), "Blocking primitives found:\n${violations.joinToString("\n")}") + } + + /** + * Marks lines that lie inside a function body whose preceding annotation block + * contains @Deprecated. Handles multi-line function signatures (where `fun` and + * the opening `{` are on different lines) by looking back up to 10 lines. + */ + private fun markDeprecatedBodies(lines: List): BooleanArray { + val n = lines.size + val result = BooleanArray(n) + var depth = 0 + var bodyDepth = -1 + var bodyDeprecated = false + + for (i in 0 until n) { + val raw = lines[i] + val trimmed = raw.trim() + val opens = raw.count { it == '{' } + val closes = raw.count { it == '}' } + + if (bodyDepth < 0) { + // Look for `fun` declaration that opens a body on this or a near-future line. + val funIdx = findFunDeclaration(lines, i) + if (funIdx >= 0 && opens > 0 && trimmed.contains("{")) { + bodyDepth = depth + opens + bodyDeprecated = hasDeprecatedAbove(lines, funIdx) + result[i] = bodyDeprecated + if (funIdx != i) result[funIdx] = bodyDeprecated + } + } else { + result[i] = bodyDeprecated + } + + depth += opens - closes + if (bodyDepth >= 0 && depth < bodyDepth) { + bodyDepth = -1 + bodyDeprecated = false + } + } + return result + } + + /** + * Marks lines that lie inside a `CompletableFuture.supplyAsync({...}, executor)` + * or `runAsync` block — VT-friendly async polling pattern. + */ + private fun markAsyncPollingBodies(lines: List): BooleanArray { + val n = lines.size + val result = BooleanArray(n) + var depth = 0 + var blockDepth = -1 + + for (i in 0 until n) { + val raw = lines[i] + val trimmed = raw.trim() + val opens = raw.count { it == '{' } + val closes = raw.count { it == '}' } + + if (blockDepth < 0) { + if ((trimmed.contains("CompletableFuture.supplyAsync(") || + trimmed.contains("CompletableFuture.runAsync(")) && + opens > 0 + ) { + blockDepth = depth + opens + result[i] = true + } + } else { + result[i] = true + } + + depth += opens - closes + if (blockDepth >= 0 && depth < blockDepth) { + blockDepth = -1 + } + } + return result + } + + /** Returns the index of the most recent `fun` declaration at top level, looking back up to 10 lines. */ + private fun findFunDeclaration(lines: List, beforeIdx: Int): Int { + for (j in beforeIdx - 1 downTo maxOf(0, beforeIdx - 10)) { + val t = lines[j].trim() + if (t.startsWith("fun ") || t.startsWith("private fun ") || + t.startsWith("internal fun ") || t.startsWith("protected fun ") || + t.startsWith("public fun ") || t.startsWith("override fun ") + ) return j + // Stop on previous closing brace. + if (t == "}" || t.startsWith("} ")) return -1 + } + return -1 + } + + /** Returns true if any line in the 10 lines above funIdx contains `@Deprecated`. */ + private fun hasDeprecatedAbove(lines: List, funIdx: Int): Boolean { + for (j in funIdx - 1 downTo maxOf(0, funIdx - 10)) { + val t = lines[j].trim() + if (t.contains("@Deprecated")) return true + // Stop at a previous `fun` declaration. + if (t.startsWith("fun ") || t.startsWith("private fun ") || + t.startsWith("internal fun ") || t.startsWith("protected fun ") || + t.startsWith("public fun ") || t.startsWith("override fun ") + ) return false + } + return false + } + + private fun isAllowlisted( + file: File, + text: String, + inDeprecatedSpan: Boolean, + inAsyncPollingSpan: Boolean, + ): Boolean { + val path = file.absolutePath + val isLegacySync = path.contains("PostgresAdvisoryLockStrategy") || + path.contains("PostgresLockStrategy") || + path.contains("GuavaLockStrategy") || + path.contains("OrderedLockExecutor") || + path.contains("AbstractLockStrategy") || + path.contains("/LockStrategy.kt") + val isMarkerLine = text.contains("@Deprecated") || + text.startsWith("//") || + text.startsWith("*") || + text.startsWith("/*") || + text.contains("throws") + // Thread.sleep inside CompletableFuture.supplyAsync/runAsync is the VT-friendly + // async polling pattern (introduced by Tasks 4-5) — only allowed in legacy files. + val isInsideAsyncPolling = text.contains("Thread.sleep") && inAsyncPollingSpan + return isLegacySync && (isMarkerLine || inDeprecatedSpan || isInsideAsyncPolling) + } +}