fix(concurrency): fail-fast page fan-out that keeps exception identity#30
Closed
P4suta wants to merge 8 commits into
Closed
fix(concurrency): fail-fast page fan-out that keeps exception identity#30P4suta wants to merge 8 commits into
P4suta wants to merge 8 commits into
Conversation
Nothing in the pipeline measured where a run's time went: ProgressEvents
carry no timestamps and the stage logs no durations, so optimization
work had no baseline to argue against. This adds the measurement layer:
- --timings: a StageTimingSink (composed in the CLI shell) prints a
stable, machine-parseable per-stage breakdown to stderr when a run
ends ("timing: <stage> = <seconds>s (<percent>%)"), including the
still-open stage on failure.
- PipelineRunner logs each stage directory's byte total, making the
intermediate I/O of every stage visible.
- benchPipeline: a Gradle task driving the installDist launcher with
--timings (PipelineBenchmark, test sources), measuring E2E wall, the
per-stage medians, peak RSS via /proc VmHWM, and output size, over a
-Pjobs sweep; writes pipeline/docs/perf-baseline.md.
- createSampleScan: a deterministic synthetic 600-dpi A5 scan book
(specks for despeckle, ±0.5° skew for deskew) so the benchmark needs
no copyrighted input and stays comparable across machines.
Baseline on the 200-page fixture (8 CPUs): conv 14.48s at -j8 —
despeckle 68%, register 22.6%, extract 7.9%, spread 1.5% — and a
3.44x scale-up from -j1, recorded in pipeline/docs/perf-baseline.md.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
pdfimages -tiff decodes every embedded G4 image into an uncompressed TIFF (~2.2 MB per 600-dpi page; ~434 MB of transient intermediates for a 200-page book) even though the typical self-scanned source is CCITT G4 end to end. (The originally planned `-tiffcompression g4` flag does not exist on pdfimages — it is a pdftoppm option.) The extractor now picks its mode from one pdfimages -list pass: when every embedded image is 1-bpp CCITT, each chunk dumps the raw G4 streams (-ccitt) and CcittTiffs wraps them verbatim into single-strip CCITT-G4 TIFFs — a pure remux: no decode/re-encode, intermediates drop ~60x, and the image's true ppi is stamped instead of pdfimages' default 72 dpi. Because PDF's EncodedByteAlign never reaches the dumped .params file, every wrapped page is decoded back once through Leptonica as verification; a chunk that deviates in any way (params shape, count, or a wrap that fails to decode) is re-extracted decoded, which is also the whole-run mode for any non-CCITT source. The photometric mapping (-B -> WhiteIsZero, -W -> BlackIsZero) is pinned empirically by a pixel-identical round trip test. Extraction chunks also shrink from total/jobs to ~12 pages (capped at 4*jobs): fast finishers free their pool slot early, and a future streaming source can consume pages chunk by chunk. Benchmark (200-page fixture, warm median of 3, vs the PR #28 baseline): extract 1.15s -> 0.46s at -j8 (4.57s -> 0.88s at -j1), conv 49.85s -> 45.98s (-7.8%) at -j1, intermediates ~434 MB -> ~7 MB. Output validated with qpdf --check (100 spreads, linearized, no errors). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The shared Tasks.awaitAll (and its two private copies in despeckle/ register) had three real defects: - No fail-fast: invokeAll waits for every page even after the first failure — a corrupt page at position 1 of 600 still ran the other 599. - Error kinds were destroyed at the pool boundary: a worker's domain exception (DespeckleException, RegisterException — RuntimeExceptions carrying their ErrorKind) was wrapped into a generic IOException, so ExceptionMapper saw INTERNAL instead of the real kind: wrong exit code, wrong RunFailed token in the webapp's SSE stream. - ProcessRunner leaked the child on interrupt: an InterruptedException from waitFor propagated without destroyForcibly, leaving the drainer close() hanging on a live child's pipes. Tasks is rewritten around a batch-owned executor (try-with-resources) with StructuredTaskScope.join() semantics on final-Java features: fail-fast with sibling interruption plus a gate that stops queued tasks a freed worker dequeues in the shutdownNow race window, quiescence before the failure propagates (so finally-deletes never race in-flight writers), and exception identity preserved (IOException/ RuntimeException unchanged, UncheckedIOException unwrapped). Two scheduling modes: Workers.platform(jobs) for CPU-bound Leptonica/FFM work (long downcalls pin virtual threads' carriers) and Workers.virtual(maxInFlight) for subprocess waits (pdfimages chunks, per-page jbig2), semaphore-bounded. ItemProgress now fires on the orchestrating thread in completion order, so per-page progress events are strictly ordered and the AtomicInteger counters at every call site are gone. All seven call sites migrate (G4EncodeStage, DespeckleService, RegistrationService both passes, PdfExtractSource, the shared extractor/assembler, Jbig2PackService, both PdfPipelineServices — whose redundant outer pools, idle through the whole middle step, are removed); the ExecutorService parameter disappears from the PdfImageExtractor/Jbig2Assembler ports; the dead NativeTools.awaitAll copies (zero callers) are deleted. pdfbook's Main also gains a shutdown hook that interrupts the run and waits (bounded 15s) for it to unwind, so Ctrl-C now kills the children, quiesces the workers, and deletes the p4suta-pipeline- work area instead of leaking it. Verified in-container: SIGINT to the process group aborts the run (exit 130), no work dir remains. Benchmark: no regression (conv 14.29s vs 14.23s at -j8 on the 200-page fixture, within noise). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The CI spell check flags PDFBox's COSName.DECODE_PARMS (the PDF spec's own key name, which the remux test must name verbatim) and a hyphenated coinage in the extractor's javadoc. Allowlist the spec identifier — the same precedent as the veraPDF en-GB names — and use plain words. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
The test released its gate from inside the ok tasks, leaving a window where the failing task's completion could overtake an ok task's still-being-enqueued completion (countDown happens inside call(), the queue add after it returns) — the recorded sequence then read [1] instead of [1, 2]. The gate now opens from the progress callback on the orchestrating thread, so both successes are provably consumed before the failure is thrown. Also renames `failer` to `failing` for the spell check. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Squash merges orphan the stack's ancestry, so the benchmark documents (regenerated on every bench run) collide as add/add between this branch and main. Align them to main's version; the round's closing PR commits the final regenerated baselines, so no information is lost from the final state. The measured numbers this PR contributed remain in its commit message and PR description. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Owner
Author
|
Re-filed as a fresh branch off main (squash merges orphaned the stacked ancestry, conflicting on files later PRs evolved); content is identical. |
This was referenced Jun 10, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
The defects (all verified in code)
Tasks.awaitAll(and its two private copies in despeckle/register) usedinvokeAll, which waits for every page even after the first failure — a corrupt page at position 1 of 600 still ran the other 599.DespeckleException/RegisterException—RuntimeExceptions carrying theirErrorKind) was wrapped into a genericIOException, soExceptionMappersawINTERNALinstead of the real kind: wrong exit code, wrongRunFailedtoken in the webapp's SSE stream. (despeckle's copy even wrappedIOExceptions, losing the page name.)ProcessRunnerleaked the child on interrupt:InterruptedExceptionfromwaitForpropagated withoutdestroyForcibly, leaving the drainerclose()hanging on a live child's pipes.finallyran on SIGINT, so the wholep4suta-pipeline-temp tree of intermediates remained.The new substrate (
shared/process Tasks)Batch-owned executor (try-with-resources) with
StructuredTaskScope.join()semantics on final-Java features (preview stays off; a future swap to JEP 505 is internal-only):shutdownNowrace window (caught by the test that pinned this race).close()joins every worker, so the callers'finally-deletes (PipelineRunner,RegistrationServicescratch) can never race in-flight writers.IOException/RuntimeExceptionunchanged,UncheckedIOExceptionunwrapped — kinds and exit codes are now correct on page failures.Workers.platform(jobs)for CPU-bound Leptonica/FFM work (long downcalls pin virtual threads' carriers — virtual would silently cap-jat core count),Workers.virtual(maxInFlight)for subprocess waits (pdfimages chunks, per-page jbig2), semaphore-bounded.ItemProgressfires on the orchestrating thread in completion order — per-page progress is now strictly ordered, and theAtomicIntegercounter at every call site is gone.Migration (all 7 sites)
G4EncodeStage,DespeckleService,RegistrationService(both passes),PdfExtractSource, shared extractor/assembler (ports lose theExecutorServiceparameter),Jbig2PackService, bothPdfPipelineServices — whose redundant outer pools that sat idle through the whole middle step are removed. The deadNativeTools.awaitAllcopies (zero callers) are deleted.Ctrl-C cleanup
Maininstalls a shutdown hook that interrupts the run and waits (bounded, 15s) for it to unwind; a latch makes the normal-exit path return immediately. Verified in-container: SIGINT to the process group aborts the run (exit 130,Error[INTERNAL]reported), children killed, no work dir remains. (Note for testers: signaling a non-job-control background launcher doesn't deliver SIGINT — the JVM honors an inherited SIG_IGN; a real terminal Ctrl-C delivers to the foreground process group.)Tests
TasksTest: 12 deterministic, latch-coordinated tests — submission order, orchestrator-thread ordered progress, exception identity (assertSame) for IO/Runtime/UncheckedIO, label-wrapped unknown checked, fail-fast + sibling interrupt + queued-skip, quiescence (in-flight counter is 0 when the throw reaches the caller), caller interruption (flag restored, workers joined), virtual-mode in-flight bound, limit validation.ProcessRunnerTest: interrupted waiter kills the child and unwinds promptly (bounded join is the proof; pre-fix this hung on the drainers)../gradlew checkgreen; benchmark shows no regression (conv 14.29s vs 14.23s at-j8).🤖 Generated with Claude Code