Skip to content

feat(infra): PGMQ workers — processAsync CF<ProcessOutcome>, no .join#1305

Merged
zbnerd merged 13 commits into
developfrom
feature/pgmq-process-async
Jun 18, 2026
Merged

feat(infra): PGMQ workers — processAsync CF<ProcessOutcome>, no .join#1305
zbnerd merged 13 commits into
developfrom
feature/pgmq-process-async

Conversation

@zbnerd

@zbnerd zbnerd commented Jun 18, 2026

Copy link
Copy Markdown
Owner

Summary

Adds async-returning processAsync(): CF<ProcessOutcome> to PGMQ workers. Eliminates 8 blocking sites (.join() × 4, runBlocking × 3, Task.join() × 1) across 6 PGMQ workers + ResultReadyProjectionWorker + PgmqWorker.processSequentialBatch. Sync process(): Boolean kept @Deprecated for module-app legacy.

Spec / Audit

  • Audit: docs/05_Reports/2026-06-18-blocking-audit.md
  • Plan: docs/superpowers/plans/2026-06-18-pgmq-process-async.md

Decisions (grill-me Q1-Q8)

  • Q1=A: 6 PGMQ workers (ExternalApiWorker, CalculationWorker, CalculationRequestedWorker, CalculationCompletedWorker, DonationWorker, NexonFanOutWorker) + ResultReadyProjectionWorker
  • Q2=A: sealed class ProcessOutcome (Ack | Nack(retryable, visibilityReset) | DeadLetter)
  • Q3=A: add processAsync() + sync @Deprecated — revised in execution: processAsync is open with default impl (not abstract) to keep build green; sync process() carries @Deprecated shim
  • Q4=A: @scheduled poll loop sync, async chain internal via .whenComplete
  • Q5=A: parallel via CompletableFuture.allOf bounded by workerPool/cpuExecutor
  • Q6=A: ResultReadyProjectionWorker in scope (2 .join() + 1 runBlocking)
  • Q7=B: topic subscribers (OcidResolveWorker, NexonApiWorker) out of scope — different framework (MQTopicGroup)
  • Q8=A: Nack.visibilityReset: Duration? enables per-cancel redelivery timing

Changes (12 commits)

module-infra/pgmq/

  • ProcessOutcome.kt: new sealed class — Ack | Nack(retryable, visibilityReset) | DeadLetter
  • PgmqWorker.kt: added open fun processAsync() with default impl; marked process() @Deprecated; migrated processSequentialBatch from runBlocking to CompletableFuture.allOf + per-message supplyAsync

module-infra/worker/

  • ExternalApiWorker: 3 sites eliminated. processAsync chains pipeline.processAsync().thenCompose(publisher).thenApply(Ack).exceptionally(Nack). CPU section migrated from runBlocking(Dispatchers.Default) to CompletableFuture.supplyAsync(expectationComputeCpuExecutor).
  • CalculationWorker: 1 site eliminated. processAsync chains expectationPort.calculateExpectationAsync().handle { value, ex -> Ack/Nack/DeadLetter }.
  • CalculationRequestedWorker: processAsync wraps process() in supplyAsync(cpuExecutor). Pure sync — no internal blocks.
  • CalculationCompletedWorker: same pattern.
  • DonationWorker: same pattern.
  • NexonFanOutWorker: same pattern. onProcessingFailed kept UNCHANGED — still needed for legacy process() path until base class orchestrator is updated (follow-up PR).
  • ResultReadyProjectionWorker: 3 sites eliminated. projectPgmqBatch returns CompletableFuture<Void>; .join() replaced with .thenCombine; runBlocking(Dispatchers.Default) replaced with CompletableFuture.allOf + per-message supplyAsync.

Test infra

  • 7 new test files: *AsyncTest.kt for each migrated worker + PgmqWorkerProcessAsyncTest + ProcessOutcomeTest
  • All tests use AssertJ (NOT kotlin.test — module-infra's build.gradle doesn't declare kotlin-test)
  • All *Async tests use the callProcessAsync/callProjectPgmqBatch internal bridge pattern to access protected methods
  • All tests use inline anonymous LogicExecutor impl to avoid NPE from mock<LogicExecutor>() (which would return null and NPE on Boolean unbox inside the CompletableFuture.supplyAsync lambda)

CI gate

New test PgmqBlockingPrimitiveGateTest greps module-infra/{pgmq,worker}/ for .join(), runBlocking, Task.join(), Thread.sleep. Allowlist for @Deprecated sync compat (line-level). ExternalApiWorker.kt + OcidResolveWorker.kt .join() allowlisted (documented keep-sites per audit + inline Javadoc ADRs).

Verification

  • ./gradlew compileKotlin compileJava --continue — clean
  • ./gradlew test (excluding module-cleanup which has a pre-existing compile error on develop unrelated to this PR) — 1216 PASSED, 0 FAILED
  • CI grep gate: 0 violations (PgmqBlockingPrimitiveGateTest PASSED)
  • (Load test SKIPPED per user direction; Sub-PR 1 followed the same pattern)

Out of scope (follow-up PR)

  • worker/OcidResolveWorker.kt and NexonApiWorker.kt (topic subscribers, different framework — Q7=B)
  • module-app legacy process(): Boolean users
  • Base class orchestrator update: PgmqWorker.processSingleMessage still calls process(): Boolean and onProcessingFailed. The full migration to honor ProcessOutcome.Nack.visibilityReset requires updating the base class to use processAsync(). Tracked as follow-up.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 00a746f888

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +156 to +157
handleFailure(jobId, cause)
ProcessOutcome.Nack(retryable = true)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve successful retry scheduling as an ack

When an external API failure reaches this branch and handleFailure() successfully schedules a replacement retry message via retryExternalApiJob() (or marks the job failed), the previous process() path returned that Boolean so the current PGMQ message was archived. This now discards the result and always returns Nack, so while message.readCount is still retryable the base worker leaves the original message to reappear even though a replacement was already enqueued, causing duplicate external-api work for the same job on transient failures.

Useful? React with 👍 / 👎.

@zbnerd zbnerd merged commit e0f156c into develop Jun 18, 2026
@zbnerd zbnerd deleted the feature/pgmq-process-async branch June 18, 2026 21:18
@zbnerd

zbnerd commented Jun 19, 2026

Copy link
Copy Markdown
Owner Author

@codex review

@chatgpt-codex-connector

Copy link
Copy Markdown

You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard.
To continue using code reviews, you can upgrade your account or add credits to your account and enable them for code reviews in your settings.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant