diff --git a/despeckle/application/build.gradle.kts b/despeckle/application/build.gradle.kts index a6f1593..165103b 100644 --- a/despeckle/application/build.gradle.kts +++ b/despeckle/application/build.gradle.kts @@ -25,6 +25,10 @@ dependencies { // implementations are no longer duplicated here. Static calls returning java.base types only, so // implementation (not api) is the right scope. implementation(project(":shared:io")) + // Tasks.awaitAll(Workers...): the shared fail-fast page fan-out (batch-owned executor, + // sibling interruption, quiescence before the failure propagates) DespeckleService runs its + // per-page work on. + implementation(project(":shared:process")) implementation(libs.slf4j.api) } diff --git a/despeckle/application/src/main/java/io/github/p4suta/despeckle/application/DespeckleService.java b/despeckle/application/src/main/java/io/github/p4suta/despeckle/application/DespeckleService.java index 062e974..034f2e2 100644 --- a/despeckle/application/src/main/java/io/github/p4suta/despeckle/application/DespeckleService.java +++ b/despeckle/application/src/main/java/io/github/p4suta/despeckle/application/DespeckleService.java @@ -9,18 +9,13 @@ import io.github.p4suta.shared.io.CorpusFiles; import io.github.p4suta.shared.io.OutputDirs; import io.github.p4suta.shared.kernel.PageProgressListener; +import io.github.p4suta.shared.process.Tasks; import java.io.IOException; -import java.io.UncheckedIOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,8 +83,8 @@ public Summary run(Config config) throws IOException { /** * Execute a run, reporting each finished page to {@code progress}. * - * @param progress called once per page as it completes (1-based count, total page count); may - * be invoked from the worker threads, so it must be thread-safe + * @param progress called once per page as it completes (1-based count, total page count), + * always on the calling thread and in order */ public Summary run(Config config, PageProgressListener progress) throws IOException { OutputDirs.prepare(config.outputDir(), config.force()); @@ -106,27 +101,25 @@ public Summary run(Config config, PageProgressListener progress) throws IOExcept ? Reporter.noOp() : reporterFactory.create(config.reportDir(), config.flipbook()); - AtomicInteger done = new AtomicInteger(); - List outcomes; - ExecutorService pool = Executors.newFixedThreadPool(config.jobs()); - try { - List> tasks = new ArrayList<>(files.size()); - for (Path src : files) { - tasks.add( - () -> { - PageOutcome outcome = processOne(src, config, report); - int n = done.incrementAndGet(); - progress.onPage(n, files.size()); - if (n % PROGRESS_EVERY == 0 || n == files.size()) { - LOG.info("{}/{}", n, files.size()); + List> tasks = new ArrayList<>(files.size()); + for (Path src : files) { + tasks.add(() -> processOne(src, config, report)); + } + // Platform workers: each page is CPU-bound Leptonica work (FFM downcalls pin virtual + // threads' carriers). The fan-out fails fast and quiesces before throwing, so a failed + // run never leaves workers writing into the output directory. Progress and the human + // log arrive on this thread, ordered. + List outcomes = + Tasks.awaitAll( + Tasks.Workers.platform(config.jobs()), + tasks, + "despeckle", + (done, total) -> { + progress.onPage(done, total); + if (done % PROGRESS_EVERY == 0 || done == total) { + LOG.info("{}/{}", done, total); } - return outcome; }); - } - outcomes = invokeAll(pool, tasks); - } finally { - pool.shutdown(); - } report.finish(); @@ -152,7 +145,7 @@ public Summary run(Config config, PageProgressListener progress) throws IOExcept private record PageOutcome(Path source, ProcessResult result) {} - private PageOutcome processOne(Path src, Config config, Reporter report) { + private PageOutcome processOne(Path src, Config config, Reporter report) throws IOException { Path dest = CorpusFiles.mirrorDestination( src, config.inputDir(), config.outputDir(), config.format().extension()); @@ -166,30 +159,9 @@ private PageOutcome processOne(Path src, Config config, Reporter report) { report.addPage(stem, src, dest, result); return new PageOutcome(src, result); } catch (IOException e) { - throw new UncheckedIOException("failed to process " + src, e); - } - } - - private static List invokeAll( - ExecutorService pool, List> tasks) throws IOException { - List> futures; - try { - futures = pool.invokeAll(tasks); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("despeckle run interrupted", e); - } - List outcomes = new ArrayList<>(futures.size()); - for (Future future : futures) { - try { - outcomes.add(future.get()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("despeckle run interrupted", e); - } catch (ExecutionException e) { - throw new IOException("page processing failed", e.getCause()); - } + // Re-throw with the page named: Tasks surfaces a task's IOException unchanged, so + // this context reaches the user instead of being lost in a generic wrapper. + throw new IOException("failed to process " + src, e); } - return outcomes; } } diff --git a/despeckle/application/src/main/java/io/github/p4suta/despeckle/application/Jbig2PackService.java b/despeckle/application/src/main/java/io/github/p4suta/despeckle/application/Jbig2PackService.java index c82f4cb..665badd 100644 --- a/despeckle/application/src/main/java/io/github/p4suta/despeckle/application/Jbig2PackService.java +++ b/despeckle/application/src/main/java/io/github/p4suta/despeckle/application/Jbig2PackService.java @@ -9,8 +9,6 @@ import java.nio.file.Path; import java.util.Comparator; import java.util.OptionalInt; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Stream; import org.jspecify.annotations.Nullable; import org.slf4j.Logger; @@ -77,17 +75,15 @@ public void run(Config config) throws IOException { } Path jb2Dir = createWorkDir(config.outPdf()); - ExecutorService pool = Executors.newFixedThreadPool(config.jobs()); try { assembler.assemble( config.imageDir(), config.outPdf(), config.source(), config.dpi(), - pool, + config.jobs(), jb2Dir); } finally { - pool.shutdown(); deleteRecursively(jb2Dir); } linearizer.linearize(config.outPdf()); diff --git a/despeckle/application/src/main/java/io/github/p4suta/despeckle/application/PdfPipelineService.java b/despeckle/application/src/main/java/io/github/p4suta/despeckle/application/PdfPipelineService.java index e90a508..16357c6 100644 --- a/despeckle/application/src/main/java/io/github/p4suta/despeckle/application/PdfPipelineService.java +++ b/despeckle/application/src/main/java/io/github/p4suta/despeckle/application/PdfPipelineService.java @@ -12,8 +12,6 @@ import java.nio.file.Path; import java.util.Comparator; import java.util.OptionalInt; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Stream; import org.jspecify.annotations.Nullable; import org.slf4j.Logger; @@ -96,41 +94,37 @@ public DespeckleService.Summary run(Config config) throws IOException { Path extracted = Files.createDirectories(work.resolve("in")); Path cleaned = Files.createDirectories(work.resolve("clean")); Path jbig2Dir = Files.createDirectories(work.resolve("jb2")); - ExecutorService pool = Executors.newFixedThreadPool(config.jobs()); - DespeckleService.Summary summary; - try { - int dpi = - config.options().dpi().isPresent() - ? config.options().dpi().getAsInt() - : extractor.dominantDpi(config.inputPdf()); - LOG.info( - "pipeline: {} -> {} at {} dpi", config.inputPdf(), config.outputPdf(), dpi); + int dpi = + config.options().dpi().isPresent() + ? config.options().dpi().getAsInt() + : extractor.dominantDpi(config.inputPdf()); + LOG.info("pipeline: {} -> {} at {} dpi", config.inputPdf(), config.outputPdf(), dpi); - extractor.extract(config.inputPdf(), extracted, config.jobs(), pool); + // Each step fans out on its own batch-owned workers bounded by the same jobs budget, + // so the steps never hold idle threads for each other (the old shared outer pool sat + // idle through the whole despeckle step). + extractor.extract(config.inputPdf(), extracted, config.jobs()); - DespeckleService.Config clean = - new DespeckleService.Config( - extracted, - cleaned, - OutputFormat.TIFF, - "*.tif", - config.jobs(), - true, - config.options().withDpi(dpi), - config.reportDir(), - config.flipbook()); - summary = despeckleService.run(clean); + DespeckleService.Config clean = + new DespeckleService.Config( + extracted, + cleaned, + OutputFormat.TIFF, + "*.tif", + config.jobs(), + true, + config.options().withDpi(dpi), + config.reportDir(), + config.flipbook()); + DespeckleService.Summary summary = despeckleService.run(clean); - assembler.assemble( - cleaned, - config.outputPdf(), - config.inputPdf(), - OptionalInt.of(dpi), - pool, - jbig2Dir); - } finally { - pool.shutdown(); - } + assembler.assemble( + cleaned, + config.outputPdf(), + config.inputPdf(), + OptionalInt.of(dpi), + config.jobs(), + jbig2Dir); linearizer.linearize(config.outputPdf()); LOG.info("wrote {}", config.outputPdf()); return summary; diff --git a/despeckle/application/src/test/java/io/github/p4suta/despeckle/application/Fakes.java b/despeckle/application/src/test/java/io/github/p4suta/despeckle/application/Fakes.java index a970181..aaf9f49 100644 --- a/despeckle/application/src/test/java/io/github/p4suta/despeckle/application/Fakes.java +++ b/despeckle/application/src/test/java/io/github/p4suta/despeckle/application/Fakes.java @@ -17,7 +17,6 @@ import java.nio.file.Path; import java.util.List; import java.util.OptionalInt; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import org.jspecify.annotations.Nullable; @@ -111,8 +110,7 @@ public int dominantDpi(Path pdf) { } @Override - public void extract(Path pdf, Path outDir, int jobs, ExecutorService pool) - throws IOException { + public void extract(Path pdf, Path outDir, int jobs) throws IOException { if (failOn != null && pdf.getFileName().toString().contains(failOn)) { throw new IOException("fake extract failed for " + pdf); } @@ -135,7 +133,7 @@ public void assemble( Path outPdf, @Nullable Path source, OptionalInt forcedDpi, - ExecutorService pool, + int jobs, Path scratchDir) throws IOException { Files.writeString(outPdf, "%PDF-fake", StandardCharsets.UTF_8); diff --git a/despeckle/infrastructure/src/main/java/io/github/p4suta/despeckle/infrastructure/pdf/PdfBoxJbig2Assembler.java b/despeckle/infrastructure/src/main/java/io/github/p4suta/despeckle/infrastructure/pdf/PdfBoxJbig2Assembler.java index 907e663..a1a7a7b 100644 --- a/despeckle/infrastructure/src/main/java/io/github/p4suta/despeckle/infrastructure/pdf/PdfBoxJbig2Assembler.java +++ b/despeckle/infrastructure/src/main/java/io/github/p4suta/despeckle/infrastructure/pdf/PdfBoxJbig2Assembler.java @@ -4,7 +4,6 @@ import java.io.IOException; import java.nio.file.Path; import java.util.OptionalInt; -import java.util.concurrent.ExecutorService; import org.jspecify.annotations.Nullable; /** @@ -34,7 +33,7 @@ public PdfBoxJbig2Assembler() {} * @param outPdf the lossless-JBIG2 PDF to write * @param source a PDF whose Info dict, XMP and version are inherited, or {@code null} for none * @param forcedDpi a single DPI to size every page with, or empty to read each image's own - * @param pool the worker pool the per-page {@code jbig2} encodes run on + * @param jobs how many {@code jbig2} children may encode at once * @param scratchDir scratch directory for the intermediate per-page JBIG2 streams * (caller-owned) * @throws IOException if the directory is empty, a tool fails, or the write fails @@ -45,9 +44,9 @@ public void assemble( Path outPdf, @Nullable Path source, OptionalInt forcedDpi, - ExecutorService pool, + int jobs, Path scratchDir) throws IOException { - delegate.assemble(imageDir, outPdf, source, forcedDpi, pool, scratchDir); + delegate.assemble(imageDir, outPdf, source, forcedDpi, jobs, scratchDir); } } diff --git a/despeckle/infrastructure/src/main/java/io/github/p4suta/despeckle/infrastructure/pdf/PdfImagesCliExtractor.java b/despeckle/infrastructure/src/main/java/io/github/p4suta/despeckle/infrastructure/pdf/PdfImagesCliExtractor.java index 044a175..ab186b5 100644 --- a/despeckle/infrastructure/src/main/java/io/github/p4suta/despeckle/infrastructure/pdf/PdfImagesCliExtractor.java +++ b/despeckle/infrastructure/src/main/java/io/github/p4suta/despeckle/infrastructure/pdf/PdfImagesCliExtractor.java @@ -3,7 +3,6 @@ import io.github.p4suta.despeckle.port.PdfImageExtractor; import java.io.IOException; import java.nio.file.Path; -import java.util.concurrent.ExecutorService; /** * Despeckle's {@link PdfImageExtractor} adapter — a wrapper over the shared {@code @@ -34,10 +33,10 @@ public int dominantDpi(Path pdf) throws IOException { /** * Extract all pages of {@code pdf} into {@code outDir} as TIFFs, parallelized over page-range - * chunks. {@code jobs} bounds both the chunk count and the pool slots used. + * chunks. at most {@code jobs} chunks run at once. */ @Override - public void extract(Path pdf, Path outDir, int jobs, ExecutorService pool) throws IOException { - delegate.extract(pdf, outDir, jobs, pool); + public void extract(Path pdf, Path outDir, int jobs) throws IOException { + delegate.extract(pdf, outDir, jobs); } } diff --git a/despeckle/infrastructure/src/main/java/io/github/p4suta/despeckle/infrastructure/process/NativeTools.java b/despeckle/infrastructure/src/main/java/io/github/p4suta/despeckle/infrastructure/process/NativeTools.java index 5812478..dfdf3c5 100644 --- a/despeckle/infrastructure/src/main/java/io/github/p4suta/despeckle/infrastructure/process/NativeTools.java +++ b/despeckle/infrastructure/src/main/java/io/github/p4suta/despeckle/infrastructure/process/NativeTools.java @@ -6,12 +6,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Path; -import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** @@ -26,10 +21,10 @@ *

The launch helpers stay local rather than routing through {@code :shared:process}'s {@code * ProcessRunner}/{@code Tasks}: {@link #capture} returns the raw {@code byte[]} a JBIG2 stream * needs (the shared runner decodes to a UTF-8 {@code String}, which is lossy for binary output), - * and {@link #run}/{@link #awaitAll} raise/propagate the tagged {@link DespeckleException} kind - * (the shared {@code Tasks} flattens any non-{@code IOException} cause, losing the kind and its - * exit code). The {@code qpdf} call site, whose output is text and whose exit-3 tolerance the - * shared runner models directly, uses {@code ProcessRunner} instead. + * and {@link #run} raises/propagates the tagged {@link DespeckleException} kind (the shared {@code + * Tasks} flattens any non-{@code IOException} cause, losing the kind and its exit code). The {@code + * qpdf} call site, whose output is text and whose exit-3 tolerance the shared runner models + * directly, uses {@code ProcessRunner} instead. * *

Public so the sibling {@code infrastructure.pdf} adapters can call it; it never leaves the * {@code :infrastructure} module (the application and CLI layers depend only on the {@code :port} @@ -121,41 +116,4 @@ private static void awaitExit(Process process, List command, long timeou null); } } - - /** - * Run every task on {@code pool}, in submission order, surfacing the first failure as - * IOException. - */ - public static List awaitAll(ExecutorService pool, List> tasks) - throws IOException { - List> futures; - try { - futures = pool.invokeAll(tasks); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("pipeline interrupted", e); - } - List results = new ArrayList<>(futures.size()); - for (Future future : futures) { - try { - results.add(future.get()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("pipeline interrupted", e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - // Preserve a tagged domain failure (e.g. NATIVE_TOOL_FAILED / IMAGE_UNREADABLE - // raised inside a worker) so its kind — and exit code — survives the pool boundary - // instead of being flattened to a generic IOException. - if (cause instanceof DespeckleException de) { - throw de; - } - if (cause instanceof IOException io) { - throw io; - } - throw new IOException("pipeline task failed", cause); - } - } - return results; - } } diff --git a/despeckle/port/src/main/java/io/github/p4suta/despeckle/port/Jbig2Assembler.java b/despeckle/port/src/main/java/io/github/p4suta/despeckle/port/Jbig2Assembler.java index 7db4260..a7549e5 100644 --- a/despeckle/port/src/main/java/io/github/p4suta/despeckle/port/Jbig2Assembler.java +++ b/despeckle/port/src/main/java/io/github/p4suta/despeckle/port/Jbig2Assembler.java @@ -3,13 +3,12 @@ import java.io.IOException; import java.nio.file.Path; import java.util.OptionalInt; -import java.util.concurrent.ExecutorService; import org.jspecify.annotations.Nullable; /** * Packs a directory of cleaned bitonal pages into a lossless-JBIG2 PDF. The abstraction over the * {@code jbig2} encoder + PDFBox container; the implementation ({@code - * infrastructure.pdf.PdfBoxJbig2Assembler}) runs the per-page encodes on the supplied pool. + * infrastructure.pdf.PdfBoxJbig2Assembler}) runs at most {@code jobs} per-page encodes at once. */ public interface Jbig2Assembler { @@ -20,7 +19,7 @@ public interface Jbig2Assembler { * @param outPdf the lossless-JBIG2 PDF to write * @param source a PDF whose Info dict, XMP and version are inherited, or {@code null} for none * @param forcedDpi a single DPI to size every page with, or empty to read each image's own - * @param pool the worker pool the per-page encodes run on + * @param jobs how many per-page encodes may run at once * @param scratchDir scratch directory for the intermediate per-page JBIG2 streams * (caller-owned) * @throws IOException if the directory is empty, a tool fails, or the write fails @@ -30,7 +29,7 @@ void assemble( Path outPdf, @Nullable Path source, OptionalInt forcedDpi, - ExecutorService pool, + int jobs, Path scratchDir) throws IOException; } diff --git a/despeckle/port/src/main/java/io/github/p4suta/despeckle/port/PdfImageExtractor.java b/despeckle/port/src/main/java/io/github/p4suta/despeckle/port/PdfImageExtractor.java index 77a4969..f80348b 100644 --- a/despeckle/port/src/main/java/io/github/p4suta/despeckle/port/PdfImageExtractor.java +++ b/despeckle/port/src/main/java/io/github/p4suta/despeckle/port/PdfImageExtractor.java @@ -2,13 +2,12 @@ import java.io.IOException; import java.nio.file.Path; -import java.util.concurrent.ExecutorService; /** * Extracts a PDF's embedded bitonal images, and reports its dominant scan resolution. The * abstraction over the {@code pdfimages}/{@code pdfinfo} drivers; the implementation ({@code - * infrastructure.pdf.PdfImagesCliExtractor}) splits the page range across the supplied pool, one - * {@code pdfimages} invocation per chunk. + * infrastructure.pdf.PdfImagesCliExtractor}) splits the page range into bounded chunks, one {@code + * pdfimages} invocation per chunk. */ public interface PdfImageExtractor { @@ -17,7 +16,7 @@ public interface PdfImageExtractor { /** * Extract all pages of {@code pdf} into {@code outDir} as TIFFs, parallelized over page-range - * chunks. {@code jobs} bounds both the chunk count and the pool slots used. + * chunks. at most {@code jobs} chunks run at once. */ - void extract(Path pdf, Path outDir, int jobs, ExecutorService pool) throws IOException; + void extract(Path pdf, Path outDir, int jobs) throws IOException; } diff --git a/despeckle/port/src/main/java/io/github/p4suta/despeckle/port/package-info.java b/despeckle/port/src/main/java/io/github/p4suta/despeckle/port/package-info.java index 4c8bdf4..23f5510 100644 --- a/despeckle/port/src/main/java/io/github/p4suta/despeckle/port/package-info.java +++ b/despeckle/port/src/main/java/io/github/p4suta/despeckle/port/package-info.java @@ -2,9 +2,9 @@ * The despeckle ports: the narrow interfaces the application layer drives and the infrastructure * layer implements. Every signature is expressed purely in terms of {@code domain.model} value * types and JDK types ({@link java.nio.file.Path}, primitives, {@link java.util.OptionalInt}, - * {@link java.util.List}, {@link java.util.concurrent.ExecutorService}, {@link - * java.io.IOException}); no Leptonica {@code Pix}, PDFBox or AWT type ever crosses a port boundary, - * so the dependency is kept unidirectional and the adapters stay swappable. + * {@link java.util.List}, {@link java.io.IOException}); no Leptonica {@code Pix}, PDFBox or AWT + * type ever crosses a port boundary, so the dependency is kept unidirectional and the adapters stay + * swappable. */ @NullMarked package io.github.p4suta.despeckle.port; diff --git a/pipeline/app/src/main/java/io/github/p4suta/pipeline/Main.java b/pipeline/app/src/main/java/io/github/p4suta/pipeline/Main.java index 67b1db9..81ccc68 100644 --- a/pipeline/app/src/main/java/io/github/p4suta/pipeline/Main.java +++ b/pipeline/app/src/main/java/io/github/p4suta/pipeline/Main.java @@ -2,20 +2,62 @@ import io.github.p4suta.pipeline.cli.PipelineCommand; import io.github.p4suta.shared.observability.FatalUncaughtHandler; +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** Process entry point for the unified pipeline tool ({@code pdfbook}). */ public final class Main { + /** + * How long the shutdown hook waits for the interrupted run to unwind. Generous: the page + * fan-outs quiesce in at most one page's worth of native work per worker, and the extractor + * kills its children on interrupt. + */ + private static final Duration SHUTDOWN_GRACE = Duration.ofSeconds(15); + private Main() {} /** - * Installs the fatal uncaught-exception handler, runs the CLI, and translates the result into a - * process exit code. All argument parsing and terminal output live in {@link PipelineCommand}. + * Installs the fatal uncaught-exception handler and the Ctrl-C cleanup hook, runs the CLI, and + * translates the result into a process exit code. All argument parsing and terminal output live + * in {@link PipelineCommand}. + * + *

On Ctrl-C (or any normal-termination signal) the shutdown hook interrupts the main thread + * and waits — bounded by {@link #SHUTDOWN_GRACE} — for the run to unwind: the page fan-outs + * stop their workers and quiesce, subprocesses are killed, and the run's {@code finally} blocks + * delete the temp work area. A Ctrl-C therefore no longer leaks a {@code p4suta-pipeline-} + * directory full of intermediates. On a normal exit the latch is already down, so the hook + * returns immediately. * * @param args command-line arguments */ public static void main(String[] args) { Thread.setDefaultUncaughtExceptionHandler(new FatalUncaughtHandler()); - System.exit(new PipelineCommand().run(args)); + CountDownLatch unwound = new CountDownLatch(1); + Thread main = Thread.currentThread(); + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + main.interrupt(); + try { + boolean clean = + unwound.await( + SHUTDOWN_GRACE.toMillis(), + TimeUnit.MILLISECONDS); + if (!clean) { + System.err.println( + "pdfbook: shutdown grace expired; temp files" + + " may remain"); + } + } catch (InterruptedException ignored) { + // The JVM is exiting anyway; stop waiting. + } + }, + "pdfbook-shutdown")); + int code = new PipelineCommand().run(args); + unwound.countDown(); + System.exit(code); } } diff --git a/pipeline/infrastructure/src/main/java/io/github/p4suta/pipeline/infrastructure/G4EncodeStage.java b/pipeline/infrastructure/src/main/java/io/github/p4suta/pipeline/infrastructure/G4EncodeStage.java index bb3ad78..4fea0dc 100644 --- a/pipeline/infrastructure/src/main/java/io/github/p4suta/pipeline/infrastructure/G4EncodeStage.java +++ b/pipeline/infrastructure/src/main/java/io/github/p4suta/pipeline/infrastructure/G4EncodeStage.java @@ -12,9 +12,6 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; /** * The G4-normalization {@link Stage}: re-encodes each extracted page as single-strip CCITT G4 TIFF @@ -57,29 +54,26 @@ public String name() { @Override public Corpus apply(Corpus input, Path workDir) throws IOException { List pages = CorpusFiles.collect(input.dir(), input.glob()); - AtomicInteger done = new AtomicInteger(); - ExecutorService pool = Executors.newFixedThreadPool(jobs); - try { - List> tasks = new ArrayList<>(pages.size()); - for (Path src : pages) { - tasks.add( - () -> { - Path out = - CorpusFiles.mirrorDestination(src, input.dir(), workDir, "tif"); - try (Pix page = Pix.read(src)) { - page.setResolution(input.dpi()); - page.writeTiffG4(out); - } - progress.emit( - new ProgressEvent.PageProcessed( - name(), done.incrementAndGet(), pages.size())); - return null; - }); - } - Tasks.awaitAll(pool, tasks, "G4 encode interrupted", "G4 encode failed"); - } finally { - pool.shutdown(); + List> tasks = new ArrayList<>(pages.size()); + for (Path src : pages) { + tasks.add( + () -> { + Path out = CorpusFiles.mirrorDestination(src, input.dir(), workDir, "tif"); + try (Pix page = Pix.read(src)) { + page.setResolution(input.dpi()); + page.writeTiffG4(out); + } + return null; + }); } + // Platform workers: each task is a Leptonica decode + G4 encode (CPU-bound FFM downcalls, + // which would pin virtual threads' carriers). Progress arrives on this thread, ordered. + Tasks.awaitAll( + Tasks.Workers.platform(jobs), + tasks, + "G4 encode", + (done, total) -> + progress.emit(new ProgressEvent.PageProcessed(name(), done, total))); return input.movedTo(workDir, OUTPUT_GLOB); } } diff --git a/pipeline/infrastructure/src/main/java/io/github/p4suta/pipeline/infrastructure/PdfExtractSource.java b/pipeline/infrastructure/src/main/java/io/github/p4suta/pipeline/infrastructure/PdfExtractSource.java index 1a11456..c40694d 100644 --- a/pipeline/infrastructure/src/main/java/io/github/p4suta/pipeline/infrastructure/PdfExtractSource.java +++ b/pipeline/infrastructure/src/main/java/io/github/p4suta/pipeline/infrastructure/PdfExtractSource.java @@ -7,8 +7,6 @@ import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; /** * The pipeline {@link Source}: extracts a scan PDF's bitonal pages once, via the shared {@code @@ -45,12 +43,7 @@ public String name() { @Override public Corpus open(Path workDir) throws IOException { int dpi = extractor.dominantDpi(sourcePdf); - ExecutorService pool = Executors.newFixedThreadPool(jobs); - try { - extractor.extract(sourcePdf, workDir, jobs, pool); - } finally { - pool.shutdown(); - } + extractor.extract(sourcePdf, workDir, jobs); return new Corpus(workDir, EXTRACT_GLOB, dpi, count(workDir, EXTRACT_GLOB)); } diff --git a/pipeline/infrastructure/src/test/java/io/github/p4suta/pipeline/infrastructure/G4EncodeStageTest.java b/pipeline/infrastructure/src/test/java/io/github/p4suta/pipeline/infrastructure/G4EncodeStageTest.java index 4ee3222..16a149c 100644 --- a/pipeline/infrastructure/src/test/java/io/github/p4suta/pipeline/infrastructure/G4EncodeStageTest.java +++ b/pipeline/infrastructure/src/test/java/io/github/p4suta/pipeline/infrastructure/G4EncodeStageTest.java @@ -1,7 +1,7 @@ package io.github.p4suta.pipeline.infrastructure; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatIOException; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.github.p4suta.pipeline.domain.Corpus; import io.github.p4suta.shared.imaging.Pix; @@ -79,15 +79,17 @@ void reEncodesUncompressedTiffsAsCcittG4() throws Exception { } @Test - void surfacesAnUnreadablePageAsIoException() throws Exception { + void surfacesAnUnreadablePageWithItsOriginalException() throws Exception { Path inputDir = Files.createDirectories(tmp.resolve("in")); Path workDir = Files.createDirectories(tmp.resolve("out")); Files.writeString(inputDir.resolve("page-000.tif"), "not a tiff"); Corpus input = new Corpus(inputDir, "*.tif", DPI, 1); - assertThatIOException() - .isThrownBy(() -> new G4EncodeStage(2).apply(input, workDir)) - .withMessage("G4 encode failed"); + // The fan-out preserves the failure's identity (no generic IOException wrapper), so the + // shared exception mapper still sees the original type and the message names the page. + assertThatThrownBy(() -> new G4EncodeStage(2).apply(input, workDir)) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining("could not read image"); } /** diff --git a/register/application/build.gradle.kts b/register/application/build.gradle.kts index 631d833..11a5a77 100644 --- a/register/application/build.gradle.kts +++ b/register/application/build.gradle.kts @@ -25,6 +25,10 @@ dependencies { // caller-supplied extension). Register passes its own glob/OutputFormat.extension() so this layer // imports neither the matcher nor the OutputFormat enum. implementation(project(":shared:io")) + // Tasks.awaitAll(Workers...): the shared fail-fast page fan-out (batch-owned executor, + // sibling interruption, quiescence before the failure propagates) both registration passes + // run their per-page work on. + implementation(project(":shared:process")) implementation(libs.slf4j.api) } diff --git a/register/application/src/main/java/io/github/p4suta/register/application/PdfPipelineService.java b/register/application/src/main/java/io/github/p4suta/register/application/PdfPipelineService.java index 0a826b4..6672142 100644 --- a/register/application/src/main/java/io/github/p4suta/register/application/PdfPipelineService.java +++ b/register/application/src/main/java/io/github/p4suta/register/application/PdfPipelineService.java @@ -11,8 +11,6 @@ import java.nio.file.Path; import java.util.Comparator; import java.util.OptionalInt; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,40 +78,37 @@ public void run(Config config) throws IOException { Path extracted = Files.createDirectories(work.resolve("in")); Path registered = Files.createDirectories(work.resolve("reg")); Path jbig2Dir = Files.createDirectories(work.resolve("jb2")); - ExecutorService pool = Executors.newFixedThreadPool(config.jobs()); - try { - int dpi = - config.options().dpi().isPresent() - ? config.options().dpi().getAsInt() - : extractor.dominantDpi(config.inputPdf()); - LOG.info( - "pipeline: {} -> {} at {} dpi", config.inputPdf(), config.outputPdf(), dpi); + int dpi = + config.options().dpi().isPresent() + ? config.options().dpi().getAsInt() + : extractor.dominantDpi(config.inputPdf()); + LOG.info("pipeline: {} -> {} at {} dpi", config.inputPdf(), config.outputPdf(), dpi); - extractor.extract(config.inputPdf(), extracted, config.jobs(), pool); + // Each step fans out on its own batch-owned workers bounded by the same jobs budget, + // so the steps never hold idle threads for each other (the old shared outer pool sat + // idle through the whole registration step). + extractor.extract(config.inputPdf(), extracted, config.jobs()); - RegistrationService.Config registration = - new RegistrationService.Config( - extracted, - registered, - OutputFormat.TIFF, - "*.tif", - config.jobs(), - true, - withDpi(config.options(), dpi), - null, - false); - registrationService.run(registration); + RegistrationService.Config registration = + new RegistrationService.Config( + extracted, + registered, + OutputFormat.TIFF, + "*.tif", + config.jobs(), + true, + withDpi(config.options(), dpi), + null, + false); + registrationService.run(registration); - assembler.assemble( - registered, - config.outputPdf(), - config.inputPdf(), - OptionalInt.of(dpi), - pool, - jbig2Dir); - } finally { - pool.shutdown(); - } + assembler.assemble( + registered, + config.outputPdf(), + config.inputPdf(), + OptionalInt.of(dpi), + config.jobs(), + jbig2Dir); LOG.info("wrote {}", config.outputPdf()); } finally { deleteRecursively(work); diff --git a/register/application/src/main/java/io/github/p4suta/register/application/RegistrationService.java b/register/application/src/main/java/io/github/p4suta/register/application/RegistrationService.java index a6210c3..378a6a1 100644 --- a/register/application/src/main/java/io/github/p4suta/register/application/RegistrationService.java +++ b/register/application/src/main/java/io/github/p4suta/register/application/RegistrationService.java @@ -17,6 +17,7 @@ import io.github.p4suta.shared.io.OutputDirs; import io.github.p4suta.shared.kernel.Medians; import io.github.p4suta.shared.kernel.PageProgressListener; +import io.github.p4suta.shared.process.Tasks; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; @@ -26,11 +27,6 @@ import java.util.Locale; import java.util.OptionalInt; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import org.jspecify.annotations.Nullable; import org.slf4j.Logger; @@ -109,8 +105,8 @@ public Summary run(Config requested) throws IOException { * * @param requested run configuration (its empty {@code --dpi} is resolved from the inputs * before rendering) - * @param progress called once per finished page in each pass (1-based count over {@code 2N}); - * invoked from the worker threads, so it must be thread-safe + * @param progress called once per finished page in each pass (1-based count over {@code 2N}), + * always on the calling thread and in order * @throws IOException on filesystem failure */ public Summary run(Config requested, PageProgressListener progress) throws IOException { @@ -130,7 +126,6 @@ public Summary run(Config requested, PageProgressListener progress) throws IOExc // Per-page progress spans both passes (analyze 1..N, render N+1..2N), so each PageProcessed // the composing stage emits carries this 2N denominator. int progressTotal = files.size() * 2; - AtomicInteger pageProgress = new AtomicInteger(); Path diagDir = config.diagDir(); boolean recordDiagnostics = diagDir != null; @@ -140,21 +135,15 @@ public Summary run(Config requested, PageProgressListener progress) throws IOExc : Reporter.noOp(); // The analysis pass writes each deskewed page here; the render pass reads it back, so the - // costly deskew and detection run once per page, not again in pass two. Removed at the end. + // costly deskew and detection run once per page, not again in pass two. Removed at the + // end — safely: each pass's fan-out quiesces before returning or throwing, so no worker + // is still writing into the scratch when the finally-delete runs. Path scratchDir = createBeside(config.outputDir(), ".register-deskewed-"); - ExecutorService pool = Executors.newFixedThreadPool(config.jobs()); try { // Pass 1: deskew once, detect, cache the deskewed page for pass two List pages = analyzePass( - files, - scratchDir, - config, - recordDiagnostics, - pool, - progress, - pageProgress, - progressTotal); + files, scratchDir, config, recordDiagnostics, progress, progressTotal); List observations = toObservations(pages); int analyzed = observations.size(); @@ -189,9 +178,7 @@ public Summary run(Config requested, PageProgressListener progress) throws IOExc config, recordDiagnostics, reporter, - pool, progress, - pageProgress, progressTotal); if (recordDiagnostics) { @@ -221,7 +208,6 @@ public Summary run(Config requested, PageProgressListener progress) throws IOExc analyzed); return new Summary(files.size(), analyzed); } finally { - pool.shutdown(); deleteRecursively(scratchDir); } } @@ -235,9 +221,7 @@ private List analyzePass( Path scratchDir, Config config, boolean recordDiagnostics, - ExecutorService pool, PageProgressListener progress, - AtomicInteger pageProgress, int progressTotal) throws IOException { List> tasks = new ArrayList<>(files.size()); @@ -246,18 +230,23 @@ private List analyzePass( Path src = files.get(i); Path scratch = scratchDir.resolve(String.format(Locale.ROOT, "%06d.tif", index)); tasks.add( - () -> { - PageAnalysis analysis = - pageRegistrar.analyze( - src, scratch, config.options(), recordDiagnostics); - AnalyzedPage analyzed = - new AnalyzedPage(index, Parity.of(index), src, scratch, analysis); - // The analyze pass occupies the first N of the 2N progress units. - progress.onPage(pageProgress.incrementAndGet(), progressTotal); - return analyzed; - }); + () -> + new AnalyzedPage( + index, + Parity.of(index), + src, + scratch, + pageRegistrar.analyze( + src, scratch, config.options(), recordDiagnostics))); } - return awaitAll(pool, tasks); + // Platform workers: deskew + detection are CPU-bound Leptonica work (FFM downcalls pin + // virtual threads' carriers). The analyze pass occupies the first N of the 2N progress + // units; the callback arrives on this thread, ordered. + return Tasks.awaitAll( + Tasks.Workers.platform(config.jobs()), + tasks, + "register analyze", + (done, total) -> progress.onPage(done, progressTotal)); } /** The detected main-column boxes from pass 1, one per page that had a detectable column. */ @@ -286,36 +275,27 @@ private List renderPass( Config config, boolean recordDiagnostics, Reporter reporter, - ExecutorService pool, PageProgressListener progress, - AtomicInteger pageProgress, int progressTotal) throws IOException { - AtomicInteger done = new AtomicInteger(); - int total = pages.size(); - List> tasks = new ArrayList<>(total); + List> tasks = new ArrayList<>(pages.size()); for (AnalyzedPage page : pages) { tasks.add( - () -> { - Path dest = - renderOne( - page, - reference, - canvas, - config, - recordDiagnostics, - reporter); - int n = done.incrementAndGet(); - // The render pass occupies the second N of the 2N progress units; the local - // count drives the human log at its own N denominator. - progress.onPage(pageProgress.incrementAndGet(), progressTotal); - if (n % PROGRESS_EVERY == 0 || n == total) { - LOG.info("{}/{}", n, total); - } - return dest; - }); + () -> renderOne(page, reference, canvas, config, recordDiagnostics, reporter)); } - return awaitAll(pool, tasks); + // Platform workers: placement is CPU-bound Leptonica work. The render pass occupies the + // second N of the 2N progress units; the local count drives the human log at its own N + // denominator. Both arrive on this thread, ordered. + return Tasks.awaitAll( + Tasks.Workers.platform(config.jobs()), + tasks, + "register render", + (done, total) -> { + progress.onPage(pages.size() + done, progressTotal); + if (done % PROGRESS_EVERY == 0 || done == total) { + LOG.info("{}/{}", done, total); + } + }); } /** @@ -431,38 +411,6 @@ private Path renderOne( return dest; } - /** - * Run every task on {@code pool} in submission order, surfacing the first failure: a task's own - * {@link IOException} is re-thrown unchanged, any other failure is wrapped, and an interruption - * restores the interrupt flag. - */ - private static List awaitAll(ExecutorService pool, List> tasks) - throws IOException { - List> futures; - try { - futures = pool.invokeAll(tasks); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("register run interrupted", e); - } - List results = new ArrayList<>(futures.size()); - for (Future future : futures) { - try { - results.add(future.get()); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("register run interrupted", e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof IOException io) { - throw io; - } - throw new IOException("page processing failed", cause); - } - } - return results; - } - /** * Create a temporary directory beside {@code sibling} (under its parent), so the scratch space * shares a filesystem with the output it serves. Falls back to the working directory when diff --git a/register/application/src/test/java/io/github/p4suta/register/application/Fakes.java b/register/application/src/test/java/io/github/p4suta/register/application/Fakes.java index 28d7ad4..08f2a4e 100644 --- a/register/application/src/test/java/io/github/p4suta/register/application/Fakes.java +++ b/register/application/src/test/java/io/github/p4suta/register/application/Fakes.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.Optional; import java.util.OptionalInt; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicInteger; import org.jspecify.annotations.Nullable; @@ -194,8 +193,7 @@ public int dominantDpi(Path pdf) { } @Override - public void extract(Path pdf, Path outDir, int jobs, ExecutorService pool) - throws IOException { + public void extract(Path pdf, Path outDir, int jobs) throws IOException { extractCalls.incrementAndGet(); if (failOn != null && pdf.getFileName().toString().contains(failOn)) { throw new IOException("fake extract failed for " + pdf); @@ -220,7 +218,7 @@ public void assemble( Path outPdf, @Nullable Path source, OptionalInt forcedDpi, - ExecutorService pool, + int jobs, Path scratchDir) throws IOException { lastForcedDpi = forcedDpi; diff --git a/register/application/src/test/java/io/github/p4suta/register/application/RegistrationServiceTest.java b/register/application/src/test/java/io/github/p4suta/register/application/RegistrationServiceTest.java index cb028db..5b16165 100644 --- a/register/application/src/test/java/io/github/p4suta/register/application/RegistrationServiceTest.java +++ b/register/application/src/test/java/io/github/p4suta/register/application/RegistrationServiceTest.java @@ -181,11 +181,12 @@ void writesDiagnosticsWhenADiagDirIsGiven(@TempDir Path tmp) throws IOException } @Test - void propagatesAWorkerFailureAsAnIoException(@TempDir Path tmp) throws IOException { + void propagatesAWorkerFailureWithItsIdentity(@TempDir Path tmp) throws IOException { Path in = Files.createDirectories(tmp.resolve("in")); Path out = tmp.resolve("out"); writePages(in, 2); - // A registrar whose analyze pass throws: the service must surface it, not swallow it. + // A registrar whose analyze pass throws: the service must surface the original exception + // unchanged (the fan-out preserves identity, so domain kinds reach the exception mapper). PageRegistrar failing = new FakePageRegistrar(true, 600, 1000, 1500) { @Override @@ -201,7 +202,7 @@ public PageAnalysis analyze( new RegistrationService(failing, new RecordingReporterFactory()); assertThrows( - IOException.class, + IllegalStateException.class, () -> service.run( config( diff --git a/register/infrastructure/src/main/java/io/github/p4suta/register/infrastructure/pdf/PdfBoxJbig2Assembler.java b/register/infrastructure/src/main/java/io/github/p4suta/register/infrastructure/pdf/PdfBoxJbig2Assembler.java index a5a1411..85224b2 100644 --- a/register/infrastructure/src/main/java/io/github/p4suta/register/infrastructure/pdf/PdfBoxJbig2Assembler.java +++ b/register/infrastructure/src/main/java/io/github/p4suta/register/infrastructure/pdf/PdfBoxJbig2Assembler.java @@ -4,7 +4,6 @@ import java.io.IOException; import java.nio.file.Path; import java.util.OptionalInt; -import java.util.concurrent.ExecutorService; import org.jspecify.annotations.Nullable; /** @@ -40,9 +39,9 @@ public void assemble( Path outPdf, @Nullable Path source, OptionalInt forcedDpi, - ExecutorService pool, + int jobs, Path scratchDir) throws IOException { - delegate.assemble(imageDir, outPdf, source, forcedDpi, pool, scratchDir); + delegate.assemble(imageDir, outPdf, source, forcedDpi, jobs, scratchDir); } } diff --git a/register/infrastructure/src/main/java/io/github/p4suta/register/infrastructure/pdf/PdfImagesCliExtractor.java b/register/infrastructure/src/main/java/io/github/p4suta/register/infrastructure/pdf/PdfImagesCliExtractor.java index 231229a..07c55e5 100644 --- a/register/infrastructure/src/main/java/io/github/p4suta/register/infrastructure/pdf/PdfImagesCliExtractor.java +++ b/register/infrastructure/src/main/java/io/github/p4suta/register/infrastructure/pdf/PdfImagesCliExtractor.java @@ -3,7 +3,6 @@ import io.github.p4suta.register.port.PdfImageExtractor; import java.io.IOException; import java.nio.file.Path; -import java.util.concurrent.ExecutorService; /** * Register's {@link PdfImageExtractor} adapter: a thin binding onto the cross-app {@link @@ -43,10 +42,10 @@ public int dominantDpi(Path pdf) throws IOException { /** * Extract all pages of {@code pdf} into {@code outDir} as TIFFs, parallelized over page-range - * chunks. {@code jobs} bounds both the chunk count and the pool slots used. + * chunks. at most {@code jobs} chunks run at once. */ @Override - public void extract(Path pdf, Path outDir, int jobs, ExecutorService pool) throws IOException { - delegate.extract(pdf, outDir, jobs, pool); + public void extract(Path pdf, Path outDir, int jobs) throws IOException { + delegate.extract(pdf, outDir, jobs); } } diff --git a/register/infrastructure/src/main/java/io/github/p4suta/register/infrastructure/process/NativeTools.java b/register/infrastructure/src/main/java/io/github/p4suta/register/infrastructure/process/NativeTools.java index c254061..347a21b 100644 --- a/register/infrastructure/src/main/java/io/github/p4suta/register/infrastructure/process/NativeTools.java +++ b/register/infrastructure/src/main/java/io/github/p4suta/register/infrastructure/process/NativeTools.java @@ -3,7 +3,6 @@ import io.github.p4suta.register.domain.exception.RegisterErrorKind; import io.github.p4suta.register.domain.exception.RegisterException; import io.github.p4suta.shared.process.ProcessRunner; -import io.github.p4suta.shared.process.Tasks; import io.github.p4suta.shared.process.ToolPath; import java.io.IOException; import java.io.InputStream; @@ -11,8 +10,6 @@ import java.time.Duration; import java.util.List; import java.util.Objects; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -22,12 +19,11 @@ * io.github.p4suta.shared.process} plumbing: discovery is delegated to {@link * io.github.p4suta.shared.process.ToolPath} (passing register's per-tool {@code * -Dregister..path} override key — this is how the packaged app-image points at its bundled - * binaries, else the tool is looked up on {@code PATH}), the discard-output run is delegated to - * {@link io.github.p4suta.shared.process.ProcessRunner}, and the worker-pool fan-out to {@link - * io.github.p4suta.shared.process.Tasks}. Unlike the optional flip-book, a missing pipeline tool is - * fatal — the pipeline cannot proceed — so resolution throws, and the shared layer's neutral {@code - * IOException}/{@code TimeoutException} failures are re-mapped to register's {@code - * RegisterException} (NATIVE_TOOL_FAILED) so the pipeline keeps its own error wording. + * binaries, else the tool is looked up on {@code PATH}), and the discard-output run is delegated to + * {@link io.github.p4suta.shared.process.ProcessRunner}. Unlike the optional flip-book, a missing + * pipeline tool is fatal — the pipeline cannot proceed — so resolution throws, and the shared + * layer's neutral {@code IOException}/{@code TimeoutException} failures are re-mapped to register's + * {@code RegisterException} (NATIVE_TOOL_FAILED) so the pipeline keeps its own error wording. */ public final class NativeTools { @@ -128,15 +124,4 @@ private static void awaitExit(Process process, List command, long timeou null); } } - - /** - * Run every task on {@code pool}, in submission order, surfacing the first failure as an {@link - * IOException}. Thin wrapper over the shared {@link - * io.github.p4suta.shared.process.Tasks#awaitAll} that supplies the pipeline's messages, so the - * await semantics live in one place while the pipeline keeps its own error wording. - */ - public static List awaitAll(ExecutorService pool, List> tasks) - throws IOException { - return Tasks.awaitAll(pool, tasks, "pipeline interrupted", "pipeline task failed"); - } } diff --git a/register/port/src/main/java/io/github/p4suta/register/port/Jbig2Assembler.java b/register/port/src/main/java/io/github/p4suta/register/port/Jbig2Assembler.java index 466e95a..95902d0 100644 --- a/register/port/src/main/java/io/github/p4suta/register/port/Jbig2Assembler.java +++ b/register/port/src/main/java/io/github/p4suta/register/port/Jbig2Assembler.java @@ -3,7 +3,6 @@ import java.io.IOException; import java.nio.file.Path; import java.util.OptionalInt; -import java.util.concurrent.ExecutorService; import org.jspecify.annotations.Nullable; /** @@ -28,7 +27,7 @@ void assemble( Path outPdf, @Nullable Path source, OptionalInt forcedDpi, - ExecutorService pool, + int jobs, Path scratchDir) throws IOException; } diff --git a/register/port/src/main/java/io/github/p4suta/register/port/PdfImageExtractor.java b/register/port/src/main/java/io/github/p4suta/register/port/PdfImageExtractor.java index 7114285..23089b4 100644 --- a/register/port/src/main/java/io/github/p4suta/register/port/PdfImageExtractor.java +++ b/register/port/src/main/java/io/github/p4suta/register/port/PdfImageExtractor.java @@ -2,7 +2,6 @@ import java.io.IOException; import java.nio.file.Path; -import java.util.concurrent.ExecutorService; /** * Extracts a scanned PDF's embedded bitonal images as TIFFs. The abstraction over {@code pdfimages} @@ -20,10 +19,10 @@ public interface PdfImageExtractor { /** * Extract every page of {@code pdf} into {@code outDir} as TIFFs, parallelized over page-range - * chunks across the worker pool. + * chunks, at most {@code jobs} at once. * - * @param jobs the worker thread count (bounds the chunk count and pool slots used) + * @param jobs how many chunks may extract at once * @throws IOException if the external tool fails */ - void extract(Path pdf, Path outDir, int jobs, ExecutorService pool) throws IOException; + void extract(Path pdf, Path outDir, int jobs) throws IOException; } diff --git a/shared/pdf/src/main/java/io/github/p4suta/shared/pdf/PdfBoxJbig2Assembler.java b/shared/pdf/src/main/java/io/github/p4suta/shared/pdf/PdfBoxJbig2Assembler.java index 8fc2306..3113420 100644 --- a/shared/pdf/src/main/java/io/github/p4suta/shared/pdf/PdfBoxJbig2Assembler.java +++ b/shared/pdf/src/main/java/io/github/p4suta/shared/pdf/PdfBoxJbig2Assembler.java @@ -14,7 +14,6 @@ import java.util.Locale; import java.util.OptionalInt; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import org.apache.pdfbox.Loader; @@ -85,7 +84,7 @@ private record ImageInfo(int width, int height, int resolution) {} * @param outPdf the lossless-JBIG2 PDF to write * @param source a PDF whose Info dict, XMP and version are inherited, or {@code null} for none * @param forcedDpi a single DPI to size every page with, or empty to read each image's own - * @param pool the worker pool the per-page {@code jbig2} encodes run on + * @param jobs how many {@code jbig2} children may encode at once * @param scratchDir scratch directory for the intermediate per-page JBIG2 streams * (caller-owned) * @throws IOException if the directory is empty, a tool fails, or the write fails @@ -95,7 +94,7 @@ public void assemble( Path outPdf, @Nullable Path source, OptionalInt forcedDpi, - ExecutorService pool, + int jobs, Path scratchDir) throws IOException { List images = sortedImages(imageDir); @@ -109,8 +108,9 @@ public void assemble( int index = i; tasks.add(() -> encode(jbig2, image, scratchDir, index, forcedDpi)); } - List pages = - Tasks.awaitAll(pool, tasks, "jbig2 encode interrupted", "jbig2 encode failed"); + // Virtual workers: each task is one subprocess wait plus a brief Pix header read; the + // semaphore-bounded admission keeps at most `jobs` jbig2 children alive at once. + List pages = Tasks.awaitAll(Tasks.Workers.virtual(jobs), tasks, "jbig2 encode"); try (PDDocument doc = new PDDocument()) { for (Page page : pages) { diff --git a/shared/pdf/src/main/java/io/github/p4suta/shared/pdf/PdfImagesCliExtractor.java b/shared/pdf/src/main/java/io/github/p4suta/shared/pdf/PdfImagesCliExtractor.java index 77ed395..4ec4eeb 100644 --- a/shared/pdf/src/main/java/io/github/p4suta/shared/pdf/PdfImagesCliExtractor.java +++ b/shared/pdf/src/main/java/io/github/p4suta/shared/pdf/PdfImagesCliExtractor.java @@ -14,7 +14,6 @@ import java.util.List; import java.util.Locale; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import org.jspecify.annotations.Nullable; @@ -101,7 +100,8 @@ public int dominantDpi(Path pdf) throws IOException { /** * Extract all pages of {@code pdf} into {@code outDir} as TIFFs, parallelized over page-range - * chunks of about {@link #CHUNK_PAGES} pages on {@code pool} (at most {@code 4 * jobs} chunks). + * chunks of about {@link #CHUNK_PAGES} pages, at most {@code jobs} running at once (and at most + * {@code 4 * jobs} chunks). * *

One {@code pdfimages -list} pass picks the mode: when every embedded image is 1-bpp CCITT * (the usual self-scanned book), each chunk dumps the raw G4 streams ({@code -ccitt}) and wraps @@ -112,7 +112,7 @@ public int dominantDpi(Path pdf) throws IOException { * a chunk whose dump or wrap deviates in any way is re-extracted decoded ({@code -tiff}), which * is also the whole-run mode for any other source. */ - public void extract(Path pdf, Path outDir, int jobs, ExecutorService pool) throws IOException { + public void extract(Path pdf, Path outDir, int jobs) throws IOException { int total = pageCount(pdf); String pdfimages = resolve("pdfimages", pdfimagesPropertyKey); List rows = @@ -145,7 +145,9 @@ public void extract(Path pdf, Path outDir, int jobs, ExecutorService pool) throw }); chunk++; } - Tasks.awaitAll(pool, tasks, "pdfimages extract interrupted", "pdfimages extract failed"); + // Virtual workers: a chunk is one subprocess wait plus (in remux mode) a brief wrap; the + // semaphore-bounded admission keeps at most `jobs` pdfimages children alive at once. + Tasks.awaitAll(Tasks.Workers.virtual(jobs), tasks, "pdfimages extract"); } /** Pages per extraction chunk; see {@link #extract}. */ diff --git a/shared/pdf/src/test/java/io/github/p4suta/shared/pdf/PdfBoxJbig2AssemblerTest.java b/shared/pdf/src/test/java/io/github/p4suta/shared/pdf/PdfBoxJbig2AssemblerTest.java index 3bd7edf..2bae0ec 100644 --- a/shared/pdf/src/test/java/io/github/p4suta/shared/pdf/PdfBoxJbig2AssemblerTest.java +++ b/shared/pdf/src/test/java/io/github/p4suta/shared/pdf/PdfBoxJbig2AssemblerTest.java @@ -8,8 +8,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.OptionalInt; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import javax.imageio.ImageIO; import org.apache.pdfbox.Loader; import org.apache.pdfbox.cos.COSName; @@ -57,13 +55,10 @@ void assemblesABitonalDirectoryIntoAJbig2Pdf(@TempDir Path tmp) throws Exception writeBitonalPage(imageDir.resolve("page-000.png"), 64, 48); writeBitonalPage(imageDir.resolve("page-001.png"), 80, 60); Path outPdf = tmp.resolve("out.pdf"); - - ExecutorService pool = Executors.newFixedThreadPool(2); try { new PdfBoxJbig2Assembler(JBIG2_KEY) - .assemble(imageDir, outPdf, null, OptionalInt.of(300), pool, scratch); + .assemble(imageDir, outPdf, null, OptionalInt.of(300), 2, scratch); } finally { - pool.shutdownNow(); } assertThat(Files.exists(outPdf)).isTrue(); @@ -103,13 +98,10 @@ void inheritsInfoAndVersionFromTheSourcePdf(@TempDir Path tmp) throws Exception Path scratch = Files.createDirectory(tmp.resolve("scratch")); writeBitonalPage(imageDir.resolve("page-000.png"), 64, 48); Path outPdf = tmp.resolve("out.pdf"); - - ExecutorService pool = Executors.newSingleThreadExecutor(); try { new PdfBoxJbig2Assembler(JBIG2_KEY) - .assemble(imageDir, outPdf, source, OptionalInt.empty(), pool, scratch); + .assemble(imageDir, outPdf, source, OptionalInt.empty(), 2, scratch); } finally { - pool.shutdownNow(); } try (PDDocument doc = Loader.loadPDF(outPdf.toFile())) { @@ -125,7 +117,6 @@ void emptyDirectoryFailsBeforeTouchingTheToolchain(@TempDir Path tmp) throws Exc // runs before tool resolution). Works on any host. Path imageDir = Files.createDirectory(tmp.resolve("empty")); Path scratch = Files.createDirectory(tmp.resolve("scratch")); - ExecutorService pool = Executors.newSingleThreadExecutor(); try { PdfBoxJbig2Assembler assembler = new PdfBoxJbig2Assembler(JBIG2_KEY); assertThatThrownBy( @@ -135,12 +126,11 @@ void emptyDirectoryFailsBeforeTouchingTheToolchain(@TempDir Path tmp) throws Exc tmp.resolve("out.pdf"), null, OptionalInt.empty(), - pool, + 1, scratch)) .isInstanceOf(IOException.class) .hasMessageContaining("no cleaned images"); } finally { - pool.shutdownNow(); } } @@ -156,7 +146,6 @@ void missingJbig2BinaryFailsWithAClearMessage(@TempDir Path tmp) throws Exceptio String bogusKey = "shared.pdf.test.bogus.jbig2.path"; System.setProperty(bogusKey, tmp.resolve("definitely-not-jbig2").toString()); try { - ExecutorService pool = Executors.newSingleThreadExecutor(); try { PdfBoxJbig2Assembler assembler = new PdfBoxJbig2Assembler(bogusKey); assertThatThrownBy( @@ -166,11 +155,10 @@ void missingJbig2BinaryFailsWithAClearMessage(@TempDir Path tmp) throws Exceptio tmp.resolve("out.pdf"), null, OptionalInt.of(300), - pool, + 1, scratch)) .isInstanceOf(IOException.class); } finally { - pool.shutdownNow(); } } finally { System.clearProperty(bogusKey); diff --git a/shared/pdf/src/test/java/io/github/p4suta/shared/pdf/PdfImagesCliExtractorTest.java b/shared/pdf/src/test/java/io/github/p4suta/shared/pdf/PdfImagesCliExtractorTest.java index 9b7d1c5..4d17e4d 100644 --- a/shared/pdf/src/test/java/io/github/p4suta/shared/pdf/PdfImagesCliExtractorTest.java +++ b/shared/pdf/src/test/java/io/github/p4suta/shared/pdf/PdfImagesCliExtractorTest.java @@ -10,8 +10,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.stream.Stream; import javax.imageio.ImageIO; import org.apache.pdfbox.pdmodel.PDDocument; @@ -68,12 +66,7 @@ void extractsEmbeddedImagesAsTiffs(@TempDir Path tmp) throws Exception { writePdfWithImage(pdf, 120, 90); Path outDir = Files.createDirectory(tmp.resolve("out")); - ExecutorService pool = Executors.newFixedThreadPool(2); - try { - new PdfImagesCliExtractor(PDFIMAGES_KEY, PDFINFO_KEY).extract(pdf, outDir, 2, pool); - } finally { - pool.shutdownNow(); - } + new PdfImagesCliExtractor(PDFIMAGES_KEY, PDFINFO_KEY).extract(pdf, outDir, 2); try (Stream entries = Files.list(outDir)) { List tiffs = @@ -131,12 +124,7 @@ void remuxesAnAllCcittSourceIntoStampedG4Tiffs(@TempDir Path tmp) throws Excepti writeCcittPdf(pdf, 3, 240, 180); Path outDir = Files.createDirectory(tmp.resolve("out")); - ExecutorService pool = Executors.newFixedThreadPool(2); - try { - new PdfImagesCliExtractor(PDFIMAGES_KEY, PDFINFO_KEY).extract(pdf, outDir, 2, pool); - } finally { - pool.shutdownNow(); - } + new PdfImagesCliExtractor(PDFIMAGES_KEY, PDFINFO_KEY).extract(pdf, outDir, 2); try (Stream entries = Files.list(outDir)) { List files = entries.sorted().toList(); diff --git a/shared/process/build.gradle.kts b/shared/process/build.gradle.kts index 5ca5418..f34fc9a 100644 --- a/shared/process/build.gradle.kts +++ b/shared/process/build.gradle.kts @@ -20,8 +20,11 @@ plugins { // (exitCode + stdout + stderr + elapsed Duration; tate's typed Result is the shape donor) and // throwing TimeoutException on timeout. The caller passes the set of acceptable non-zero exits // (generalizing despeckle's hardcoded qpdf-exit-3 tolerance) — a non-acceptable exit throws. -// * Tasks — awaitAll: a parameterized parallel fan-out over a caller-owned pool (register's), -// collecting results in submission order and aggregating the first failure as an IOException. +// * Tasks — awaitAll: a parallel fan-out over a batch-owned executor (Workers.platform for +// CPU-bound work, Workers.virtual for subprocess waits), collecting results in submission +// order with fail-fast sibling interruption, quiescence before the failure propagates, and +// exception identity preserved (domain RuntimeExceptions keep their error kind). Mirrors +// StructuredTaskScope.join() semantics on final-Java features. // // No domain exceptions are reachable here (that is the point of generalizing): a launch failure or // an unacceptable exit surfaces as a plain IOException, a timeout as a TimeoutException — so the @@ -37,13 +40,11 @@ dependencies { // Coverage floor: the same realistic infra-like 0.75 line / 0.60 branch the despeckle // :infrastructure and :shared:imaging modules use — this is process/exec plumbing, not branch-rich -// domain logic, so the domain-grade 0.95/0.90 the kernel/observability use would be dishonest. The -// only branches a unit test cannot drive are the InterruptedException catches (you cannot -// deterministically interrupt the waiting thread mid-waitFor) and a stray launch-failure path; at -// CLASS granularity the imaging-style exclusion would throw away ProcessRunner's genuinely covered -// happy/timeout/exit paths too, so NO class is excluded — the lenient floor absorbs the few -// untestable catch arms. The same self-contained block the other shared modules carry, since the -// floor is not applied by any convention plugin. +// domain logic, so the domain-grade 0.95/0.90 the kernel/observability use would be dishonest. +// (The interrupt arms are now deterministically covered — TasksTest and ProcessRunnerTest drive +// them with latch-coordinated interrupts — but a stray launch-failure path remains; the lenient +// floor absorbs it.) The same self-contained block the other shared modules carry, since the floor +// is not applied by any convention plugin. tasks.named("jacocoTestCoverageVerification") { dependsOn(tasks.named("test")) violationRules { diff --git a/shared/process/src/main/java/io/github/p4suta/shared/process/ProcessRunner.java b/shared/process/src/main/java/io/github/p4suta/shared/process/ProcessRunner.java index f1349fa..8ecfefe 100644 --- a/shared/process/src/main/java/io/github/p4suta/shared/process/ProcessRunner.java +++ b/shared/process/src/main/java/io/github/p4suta/shared/process/ProcessRunner.java @@ -81,9 +81,17 @@ public static Result run( try (ExecutorService drainers = Executors.newVirtualThreadPerTaskExecutor()) { Future out = drainers.submit(() -> process.getInputStream().readAllBytes()); Future err = drainers.submit(() -> process.getErrorStream().readAllBytes()); - if (!process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + try { + if (!process.waitFor(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + process.destroyForcibly(); + throw new TimeoutException(command.get(0) + " timed out after " + timeout); + } + } catch (InterruptedException e) { + // Kill the child before propagating, exactly like the timeout path: the drainers + // then reach EOF, so the try-with-resources close() returns instead of waiting on + // a live child's pipes — and no orphaned process outlives the interrupted caller. process.destroyForcibly(); - throw new TimeoutException(command.get(0) + " timed out after " + timeout); + throw e; } // Decode leniently (replace malformed bytes), matching the long-standing behavior. String stdout = new String(await(out), StandardCharsets.UTF_8); diff --git a/shared/process/src/main/java/io/github/p4suta/shared/process/Tasks.java b/shared/process/src/main/java/io/github/p4suta/shared/process/Tasks.java index 725d3aa..7212886 100644 --- a/shared/process/src/main/java/io/github/p4suta/shared/process/Tasks.java +++ b/shared/process/src/main/java/io/github/p4suta/shared/process/Tasks.java @@ -1,65 +1,236 @@ package io.github.p4suta.shared.process; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import org.jspecify.annotations.Nullable; /** - * Runs a batch of {@link Callable}s on a caller-owned {@link ExecutorService} and collects their - * results in submission order, turning the executor's checked machinery into a single {@link - * IOException}. + * Runs a batch of {@link Callable}s on a batch-owned executor with fail-fast semantics: the first + * failure interrupts the remaining workers, the batch waits until every worker has actually stopped + * (quiescence), and only then does the failure propagate — so a caller's {@code finally} can safely + * delete the directories the workers were writing into. + * + *

The propagated failure keeps its identity: a task's own {@link IOException} or {@link + * RuntimeException} (the apps' domain exceptions are runtime exceptions) is re-thrown unchanged, so + * the shared exception mapper still sees the original kind and maps the right exit code and {@code + * RunFailed} token; an {@link UncheckedIOException} is unwrapped to its cause. This deliberately + * mirrors {@code StructuredTaskScope.join()}'s contract (fail-fast, sibling interruption, + * quiescence before propagation) on final-Java features, so a future swap to structured concurrency + * stays internal to this class. */ public final class Tasks { private Tasks() {} /** - * Submit every task, wait for all of them, and return their results in the order {@code tasks} - * was given. The first failure stops collection and surfaces as an {@link IOException}: a - * task's own {@link IOException} is re-thrown unchanged (so callers see the real cause), any - * other failure is wrapped with {@code failureMessage}, and an interruption restores the - * thread's interrupt flag and is reported with {@code interruptedMessage}. + * How one batch schedules its workers. * - * @param pool the executor to run the tasks on (the caller owns its lifecycle) + *

{@link #platform(int)} is for CPU-bound work (Leptonica/FFM pixel ops, TIFF G4 encoding): + * a fixed pool of platform threads, because a long native downcall pins a virtual thread's + * carrier, which would silently cap concurrency at the carrier count. {@link #virtual(int)} is + * for subprocess-bound work (per-page {@code jbig2}, {@code pdfimages} chunks): one virtual + * thread per task, with at most {@code maxInFlight} admitted past an internal semaphore so the + * spawned child processes stay bounded. + */ + public static final class Workers { + + private final int limit; + private final boolean virtualThreads; + + private Workers(int limit, boolean virtualThreads) { + if (limit < 1) { + throw new IllegalArgumentException("worker limit must be >= 1, got " + limit); + } + this.limit = limit; + this.virtualThreads = virtualThreads; + } + + /** {@code jobs} platform threads — CPU-bound work. */ + public static Workers platform(int jobs) { + return new Workers(jobs, false); + } + + /** + * A virtual thread per task, at most {@code maxInFlight} running at once — work that mostly + * waits on a subprocess. + */ + public static Workers virtual(int maxInFlight) { + return new Workers(maxInFlight, true); + } + + private ExecutorService newExecutor() { + return virtualThreads + ? Executors.newVirtualThreadPerTaskExecutor() + : Executors.newFixedThreadPool(limit); + } + + private @Nullable Semaphore newPermits() { + return virtualThreads ? new Semaphore(limit) : null; + } + } + + /** + * Per-item completion callback, invoked on the orchestrating thread in completion order: {@code + * done} runs {@code 1..total}, strictly increasing, successes only — so progress events built + * on it are ordered without any thread-safety burden on the listener. + */ + @FunctionalInterface + public interface ItemProgress { + + /** The no-op listener. */ + ItemProgress NONE = (done, total) -> {}; + + /** + * One more item finished. + * + * @param done how many items have finished so far (1-based, strictly increasing) + * @param total how many items the batch runs + */ + void onItem(int done, int total); + } + + /** {@link #awaitAll(Workers, List, String, ItemProgress)} without progress reporting. */ + public static List awaitAll(Workers workers, List> tasks, String label) + throws IOException { + return awaitAll(workers, tasks, label, ItemProgress.NONE); + } + + /** + * Run every task on a batch-owned executor and return their results in submission order. + * + *

On the first failure the remaining workers are interrupted ({@code shutdownNow}) and the + * batch waits for full quiescence before the failure propagates with its identity intact: + * {@link IOException} and {@link RuntimeException} unchanged, {@link UncheckedIOException} + * unwrapped to its cause, anything else wrapped as {@code IOException(label + " failed")}. + * Caller interruption likewise stops and joins the workers, restores the interrupt flag, and + * surfaces as {@code IOException(label + " interrupted")}. + * + * @param workers the scheduling mode (see {@link Workers}) * @param tasks the work to run, one result per task - * @param interruptedMessage the message for an {@link IOException} raised on interruption - * @param failureMessage the message wrapping a non-{@code IOException} task failure + * @param label the batch's name in failure/interruption messages (e.g. {@code "G4 encode"}) + * @param progress called on this thread after each successful item * @param the task result type * @return the task results, in submission order * @throws IOException if any task fails or the wait is interrupted */ public static List awaitAll( - ExecutorService pool, - List> tasks, - String interruptedMessage, - String failureMessage) + Workers workers, List> tasks, String label, ItemProgress progress) throws IOException { - List> futures; - try { - futures = pool.invokeAll(tasks); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(interruptedMessage, e); - } - List results = new ArrayList<>(futures.size()); - for (Future future : futures) { + int total = tasks.size(); + List<@Nullable T> results = new ArrayList<>(Collections.nCopies(total, null)); + try (ExecutorService pool = workers.newExecutor()) { + ExecutorCompletionService> completion = + new ExecutorCompletionService<>(pool); + @Nullable Semaphore permits = workers.newPermits(); + // The fail-fast gate: set by the first worker whose task throws, checked by every + // worker before it starts a task. shutdownNow() alone cannot stop a queued task a + // newly-freed worker dequeues in the instant before the orchestrator reacts; the gate + // closes that window (a gated task completes with the BatchCancelled marker instead + // of running user code). + AtomicBoolean failed = new AtomicBoolean(); + for (int i = 0; i < total; i++) { + int index = i; + Callable task = tasks.get(i); + completion.submit( + () -> { + if (failed.get()) { + throw new BatchCancelled(); + } + // Acquired inside the worker so submission never blocks and an + // interrupt during the wait cancels cleanly. + if (permits != null) { + permits.acquire(); + } + try { + return new IndexedResult<>(index, task.call()); + } catch (Throwable t) { + failed.set(true); + throw t; + } finally { + if (permits != null) { + permits.release(); + } + } + }); + } try { - results.add(future.get()); + int done = 0; + while (done < total) { + Future> future = completion.take(); + try { + IndexedResult result = future.get(); + results.set(result.index(), result.value()); + done++; + progress.onItem(done, total); + } catch (ExecutionException e) { + if (e.getCause() instanceof BatchCancelled) { + // A gated sibling, not the root cause — keep draining: the failure + // that set the gate already ran, so its completion is on its way. + continue; + } + pool.shutdownNow(); + throw failure(e.getCause(), label); + } + } } catch (InterruptedException e) { + pool.shutdownNow(); Thread.currentThread().interrupt(); - throw new IOException(interruptedMessage, e); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (cause instanceof IOException io) { - throw io; - } - throw new IOException(failureMessage, cause); + throw new IOException(label + " interrupted", e); } + // The try-with-resources close() awaits full termination on every path, so by the + // time a result or an exception reaches the caller, no worker is still running (or + // still writing into directories the caller is about to delete). } - return results; + return castNonNull(results); + } + + /** One task's result tagged with its submission index, for submission-order assembly. */ + private record IndexedResult(int index, @Nullable T value) {} + + /** The internal marker a gated (skipped-after-failure) task completes with; never user code. */ + private static final class BatchCancelled extends RuntimeException { + private static final long serialVersionUID = 1L; + + BatchCancelled() { + super("cancelled by an earlier failure in the batch", null, false, false); + } + } + + /** The failure to propagate for a task that threw {@code cause}; see {@link #awaitAll}. */ + private static IOException failure(@Nullable Throwable cause, String label) { + switch (cause) { + case IOException io -> { + return io; + } + case UncheckedIOException unchecked -> { + // UncheckedIOException's constructor requires a non-null cause. + return java.util.Objects.requireNonNull(unchecked.getCause()); + } + case RuntimeException runtime -> throw runtime; + case Error error -> throw error; + case null, default -> { + return new IOException(label + " failed", cause); + } + } + } + + /** + * {@return the results list with every slot filled} Every slot was set exactly once (one + * completion per submitted task), so the nullable build-up list is safe to expose as non-null. + */ + @SuppressWarnings({"unchecked", "NullAway"}) // each index 0..total-1 was set by its task + private static List castNonNull(List<@Nullable T> results) { + return (List) (List) results; } } diff --git a/shared/process/src/test/java/io/github/p4suta/shared/process/ProcessRunnerTest.java b/shared/process/src/test/java/io/github/p4suta/shared/process/ProcessRunnerTest.java index 5514e24..c826d9d 100644 --- a/shared/process/src/test/java/io/github/p4suta/shared/process/ProcessRunnerTest.java +++ b/shared/process/src/test/java/io/github/p4suta/shared/process/ProcessRunnerTest.java @@ -111,4 +111,38 @@ void missingBinaryFailsWithIoException() { Duration.ofSeconds(10))) .isInstanceOf(IOException.class); } + + @Test + @org.junit.jupiter.api.Timeout(15) + void interruptedWaiterKillsTheChildAndUnwindsPromptly() throws Exception { + // Regression: an interrupt during waitFor used to propagate WITHOUT killing the child, so + // the drainer close() hung on the live child's pipes (here: up to the sleep's 30s) and the + // child leaked. Now the child is killed first, so the call unwinds promptly — the @Timeout + // and the bounded join are the proof. + var thrown = new java.util.concurrent.atomic.AtomicReference(); + var flagRestoredPathTaken = new java.util.concurrent.atomic.AtomicBoolean(); + var started = new java.util.concurrent.CountDownLatch(1); + Thread caller = + new Thread( + () -> { + try { + started.countDown(); + ProcessRunner.run(List.of("sleep", "30"), Duration.ofMinutes(5)); + } catch (InterruptedException e) { + thrown.set(e); + flagRestoredPathTaken.set(true); + } catch (Exception e) { + thrown.set(e); + } + }); + caller.start(); + assertThat(started.await(10, java.util.concurrent.TimeUnit.SECONDS)).isTrue(); + // Give the child a beat to actually spawn before interrupting the waiter. + Thread.sleep(300); + caller.interrupt(); + caller.join(10_000); + assertThat(caller.isAlive()).as("the interrupted run must unwind promptly").isFalse(); + assertThat(thrown.get()).isInstanceOf(InterruptedException.class); + assertThat(flagRestoredPathTaken).isTrue(); + } } diff --git a/shared/process/src/test/java/io/github/p4suta/shared/process/TasksTest.java b/shared/process/src/test/java/io/github/p4suta/shared/process/TasksTest.java index 0b689d1..3356f83 100644 --- a/shared/process/src/test/java/io/github/p4suta/shared/process/TasksTest.java +++ b/shared/process/src/test/java/io/github/p4suta/shared/process/TasksTest.java @@ -1,74 +1,344 @@ package io.github.p4suta.shared.process; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +/** + * The batch contract of {@link Tasks#awaitAll(Tasks.Workers, List, String, Tasks.ItemProgress)}: + * submission-order results, fail-fast with sibling interruption, quiescence before the failure + * propagates, exception identity, orchestrator-thread progress, and the virtual mode's in-flight + * bound. All coordination is latch-based — no sleeps as logic. + */ +@Timeout(30) class TasksTest { + // ---- happy path ---- + @Test void returnsResultsInSubmissionOrder() throws IOException { - ExecutorService pool = Executors.newFixedThreadPool(4); - try { - List> tasks = new ArrayList<>(); - for (int i = 0; i < 16; i++) { - int value = i; - tasks.add(() -> value); - } - List results = Tasks.awaitAll(pool, tasks, "interrupted", "failed"); - for (int i = 0; i < 16; i++) { - assertEquals(i, results.get(i)); - } - } finally { - pool.shutdown(); + List> tasks = new ArrayList<>(); + for (int i = 0; i < 16; i++) { + int value = i; + tasks.add(() -> value); + } + List results = Tasks.awaitAll(Tasks.Workers.platform(4), tasks, "batch"); + for (int i = 0; i < 16; i++) { + assertEquals(i, results.get(i)); } } @Test - void surfacesATasksIOExceptionUnchanged() { - ExecutorService pool = Executors.newFixedThreadPool(2); - try { - IOException boom = new IOException("boom"); - Callable task = - () -> { - throw boom; - }; - IOException thrown = - assertThrows( - IOException.class, - () -> Tasks.awaitAll(pool, List.of(task), "interrupted", "failed")); - // The task's own IOException is re-thrown as-is, not wrapped with the failure message. - assertSame(boom, thrown); - } finally { - pool.shutdown(); + void progressRunsOnTheCallingThreadInCompletionOrder() throws IOException { + Thread caller = Thread.currentThread(); + List seen = new ArrayList<>(); // safe: appended on the calling thread only + AtomicBoolean wrongThread = new AtomicBoolean(); + List> tasks = new ArrayList<>(); + for (int i = 0; i < 8; i++) { + int value = i; + tasks.add(() -> value); } + Tasks.awaitAll( + Tasks.Workers.platform(4), + tasks, + "batch", + (done, total) -> { + if (Thread.currentThread() != caller) { + wrongThread.set(true); + } + seen.add(new int[] {done, total}); + }); + assertFalse(wrongThread.get(), "progress must run on the orchestrating thread"); + assertEquals(8, seen.size()); + for (int i = 0; i < 8; i++) { + assertEquals(i + 1, seen.get(i)[0], "done is strictly increasing from 1"); + assertEquals(8, seen.get(i)[1]); + } + } + + // ---- failure semantics ---- + + @Test + void surfacesATasksIOExceptionByIdentity() { + IOException boom = new IOException("boom"); + IOException thrown = + assertThrows( + IOException.class, + () -> + Tasks.awaitAll( + Tasks.Workers.platform(2), + List.of(failing(boom)), + "batch")); + assertSame(boom, thrown); + } + + @Test + void surfacesARuntimeExceptionByIdentitySoErrorKindsSurvive() { + IllegalStateException boom = new IllegalStateException("domain kind carrier"); + IllegalStateException thrown = + assertThrows( + IllegalStateException.class, + () -> + Tasks.awaitAll( + Tasks.Workers.platform(2), + List.of(failing(boom)), + "batch")); + assertSame(boom, thrown); + } + + @Test + void unwrapsAnUncheckedIOExceptionToItsCause() { + IOException cause = new IOException("disk"); + IOException thrown = + assertThrows( + IOException.class, + () -> + Tasks.awaitAll( + Tasks.Workers.platform(2), + List.of(failing(new UncheckedIOException(cause))), + "batch")); + assertSame(cause, thrown); + } + + @Test + void wrapsAnUnknownCheckedFailureWithTheLabel() { + Exception checked = new Exception("odd"); + IOException thrown = + assertThrows( + IOException.class, + () -> + Tasks.awaitAll( + Tasks.Workers.platform(2), + List.of(failing(checked)), + "G4 encode")); + assertEquals("G4 encode failed", thrown.getMessage()); + assertSame(checked, thrown.getCause()); + } + + // ---- fail-fast, sibling interruption, quiescence ---- + + @Test + void firstFailureInterruptsSiblingsAndSkipsQueuedWork() throws Exception { + CountDownLatch blockerStarted = new CountDownLatch(1); + AtomicBoolean blockerInterrupted = new AtomicBoolean(); + AtomicBoolean queuedRan = new AtomicBoolean(); + IllegalStateException boom = new IllegalStateException("first failure"); + + Callable blocker = + () -> { + blockerStarted.countDown(); + try { + new CountDownLatch(1).await(); // blocks until interrupted + } catch (InterruptedException e) { + blockerInterrupted.set(true); + } + return null; + }; + Callable failing = + () -> { + blockerStarted.await(); // fail only once the sibling is provably running + throw boom; + }; + Callable queued = + () -> { + queuedRan.set(true); + return null; + }; + + IllegalStateException thrown = + assertThrows( + IllegalStateException.class, + () -> + Tasks.awaitAll( + Tasks.Workers.platform(2), + List.of(blocker, failing, queued), + "batch")); + assertSame(boom, thrown); + assertTrue(blockerInterrupted.get(), "the running sibling must be interrupted"); + assertFalse(queuedRan.get(), "queued work must be discarded on failure"); } @Test - void wrapsANonIoFailureWithTheFailureMessage() { - ExecutorService pool = Executors.newFixedThreadPool(2); - try { - Callable task = + void quiescesBeforeTheFailurePropagates() { + AtomicInteger inFlight = new AtomicInteger(); + CountDownLatch blockerStarted = new CountDownLatch(1); + + Callable blocker = + () -> { + inFlight.incrementAndGet(); + try { + blockerStarted.countDown(); + new CountDownLatch(1).await(); // until interrupted + return null; + } catch (InterruptedException e) { + return null; + } finally { + inFlight.decrementAndGet(); + } + }; + Callable failing = + () -> { + blockerStarted.await(); + throw new IllegalStateException("boom"); + }; + + assertThrows( + IllegalStateException.class, + () -> + Tasks.awaitAll( + Tasks.Workers.platform(2), List.of(blocker, failing), "batch")); + // The contract callers' finally-blocks rely on: when awaitAll throws, no worker is still + // running (still writing into directories about to be deleted). + assertEquals(0, inFlight.get()); + } + + @Test + void progressCountsOnlySuccessesContiguously() { + // Released from the PROGRESS callback (the orchestrating thread), not from the ok tasks: + // a task-side latch would leave a window where the failing task's completion overtakes an + // ok task's still-being-enqueued completion, making the recorded sequence racy. + CountDownLatch twoConsumed = new CountDownLatch(2); + List dones = new ArrayList<>(); // appended on the calling thread only + Callable ok = () -> null; + Callable failing = + () -> { + twoConsumed.await(); // both successes are consumed first, provably + throw new IllegalStateException("boom"); + }; + assertThrows( + IllegalStateException.class, + () -> + Tasks.awaitAll( + Tasks.Workers.platform(3), + List.of(ok, ok, failing), + "batch", + (done, total) -> { + dones.add(done); + twoConsumed.countDown(); + })); + assertEquals(List.of(1, 2), dones); + } + + // ---- caller interruption ---- + + @Test + void callerInterruptionStopsWorkersAndRestoresTheFlag() throws Exception { + CountDownLatch workerStarted = new CountDownLatch(1); + AtomicBoolean workerInterrupted = new AtomicBoolean(); + AtomicInteger inFlight = new AtomicInteger(); + AtomicReference thrown = new AtomicReference<>(); + AtomicBoolean flagRestored = new AtomicBoolean(); + + Callable blocker = + () -> { + inFlight.incrementAndGet(); + try { + workerStarted.countDown(); + new CountDownLatch(1).await(); + return null; + } catch (InterruptedException e) { + workerInterrupted.set(true); + return null; + } finally { + inFlight.decrementAndGet(); + } + }; + + Thread caller = + new Thread( + () -> { + try { + Tasks.awaitAll( + Tasks.Workers.platform(1), List.of(blocker), "register"); + } catch (IOException e) { + thrown.set(e); + flagRestored.set(Thread.currentThread().isInterrupted()); + } + }); + caller.start(); + assertTrue(workerStarted.await(10, TimeUnit.SECONDS)); + caller.interrupt(); + caller.join(10_000); + assertFalse(caller.isAlive(), "the interrupted batch must unwind promptly"); + + assertNotNull(thrown.get()); + assertInstanceOf(IOException.class, thrown.get()); + assertEquals("register interrupted", thrown.get().getMessage()); + assertTrue(flagRestored.get(), "the interrupt flag must be restored"); + assertTrue(workerInterrupted.get(), "the worker must be stopped"); + assertEquals(0, inFlight.get(), "quiescent before the exception reached the caller"); + } + + // ---- virtual mode ---- + + @Test + void virtualModeBoundsInFlightWork() throws Exception { + int bound = 3; + AtomicInteger inFlight = new AtomicInteger(); + AtomicInteger highWater = new AtomicInteger(); + CountDownLatch gate = new CountDownLatch(1); + CountDownLatch boundRunning = new CountDownLatch(bound); + + List> tasks = new ArrayList<>(); + for (int i = 0; i < 12; i++) { + tasks.add( () -> { - throw new IllegalStateException("oops"); - }; - IOException thrown = - assertThrows( - IOException.class, - () -> Tasks.awaitAll(pool, List.of(task), "interrupted", "failed")); - assertEquals("failed", thrown.getMessage()); - assertInstanceOf(IllegalStateException.class, thrown.getCause()); - } finally { - pool.shutdown(); + int now = inFlight.incrementAndGet(); + highWater.accumulateAndGet(now, Math::max); + boundRunning.countDown(); + try { + gate.await(); + return null; + } finally { + inFlight.decrementAndGet(); + } + }); } + Thread opener = + new Thread( + () -> { + try { + // Open the gate only once `bound` workers are provably in flight, + // so the high-water assertion is meaningful, then let all pass. + boundRunning.await(); + gate.countDown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + opener.start(); + Tasks.awaitAll(Tasks.Workers.virtual(bound), tasks, "jbig2 encode"); + opener.join(10_000); + assertEquals(bound, highWater.get(), "no more than the bound may run at once"); + } + + @Test + void rejectsANonPositiveWorkerLimit() { + assertThrows(IllegalArgumentException.class, () -> Tasks.Workers.platform(0)); + assertThrows(IllegalArgumentException.class, () -> Tasks.Workers.virtual(-1)); + } + + /** A task that throws {@code failure} once called. */ + private static Callable failing(Exception failure) { + return () -> { + throw failure; + }; } }