Skip to content
Closed
4 changes: 4 additions & 0 deletions _typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,7 @@ Pagent = "Pagent"
Flavour = "Flavour"
flavours = "flavours"
initialise = "initialise"
# The PDF spec's own key name `DecodeParms` (and PDFBox's COSName.DECODE_PARMS
# constant mirroring it) — third-party/spec identifiers we must name verbatim.
Parms = "Parms"
PARMS = "PARMS"
4 changes: 4 additions & 0 deletions despeckle/application/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ dependencies {
// implementations are no longer duplicated here. Static calls returning java.base types only, so
// implementation (not api) is the right scope.
implementation(project(":shared:io"))
// Tasks.awaitAll(Workers...): the shared fail-fast page fan-out (batch-owned executor,
// sibling interruption, quiescence before the failure propagates) DespeckleService runs its
// per-page work on.
implementation(project(":shared:process"))
implementation(libs.slf4j.api)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,13 @@
import io.github.p4suta.shared.io.CorpusFiles;
import io.github.p4suta.shared.io.OutputDirs;
import io.github.p4suta.shared.kernel.PageProgressListener;
import io.github.p4suta.shared.process.Tasks;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,8 +83,8 @@ public Summary run(Config config) throws IOException {
/**
* Execute a run, reporting each finished page to {@code progress}.
*
* @param progress called once per page as it completes (1-based count, total page count); may
* be invoked from the worker threads, so it must be thread-safe
* @param progress called once per page as it completes (1-based count, total page count),
* always on the calling thread and in order
*/
public Summary run(Config config, PageProgressListener progress) throws IOException {
OutputDirs.prepare(config.outputDir(), config.force());
Expand All @@ -106,27 +101,25 @@ public Summary run(Config config, PageProgressListener progress) throws IOExcept
? Reporter.noOp()
: reporterFactory.create(config.reportDir(), config.flipbook());

AtomicInteger done = new AtomicInteger();
List<PageOutcome> outcomes;
ExecutorService pool = Executors.newFixedThreadPool(config.jobs());
try {
List<Callable<PageOutcome>> tasks = new ArrayList<>(files.size());
for (Path src : files) {
tasks.add(
() -> {
PageOutcome outcome = processOne(src, config, report);
int n = done.incrementAndGet();
progress.onPage(n, files.size());
if (n % PROGRESS_EVERY == 0 || n == files.size()) {
LOG.info("{}/{}", n, files.size());
List<Callable<PageOutcome>> tasks = new ArrayList<>(files.size());
for (Path src : files) {
tasks.add(() -> processOne(src, config, report));
}
// Platform workers: each page is CPU-bound Leptonica work (FFM downcalls pin virtual
// threads' carriers). The fan-out fails fast and quiesces before throwing, so a failed
// run never leaves workers writing into the output directory. Progress and the human
// log arrive on this thread, ordered.
List<PageOutcome> outcomes =
Tasks.awaitAll(
Tasks.Workers.platform(config.jobs()),
tasks,
"despeckle",
(done, total) -> {
progress.onPage(done, total);
if (done % PROGRESS_EVERY == 0 || done == total) {
LOG.info("{}/{}", done, total);
}
return outcome;
});
}
outcomes = invokeAll(pool, tasks);
} finally {
pool.shutdown();
}

report.finish();

Expand All @@ -152,7 +145,7 @@ public Summary run(Config config, PageProgressListener progress) throws IOExcept

private record PageOutcome(Path source, ProcessResult result) {}

private PageOutcome processOne(Path src, Config config, Reporter report) {
private PageOutcome processOne(Path src, Config config, Reporter report) throws IOException {
Path dest =
CorpusFiles.mirrorDestination(
src, config.inputDir(), config.outputDir(), config.format().extension());
Expand All @@ -166,30 +159,9 @@ private PageOutcome processOne(Path src, Config config, Reporter report) {
report.addPage(stem, src, dest, result);
return new PageOutcome(src, result);
} catch (IOException e) {
throw new UncheckedIOException("failed to process " + src, e);
}
}

private static List<PageOutcome> invokeAll(
ExecutorService pool, List<Callable<PageOutcome>> tasks) throws IOException {
List<Future<PageOutcome>> futures;
try {
futures = pool.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("despeckle run interrupted", e);
}
List<PageOutcome> outcomes = new ArrayList<>(futures.size());
for (Future<PageOutcome> future : futures) {
try {
outcomes.add(future.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("despeckle run interrupted", e);
} catch (ExecutionException e) {
throw new IOException("page processing failed", e.getCause());
}
// Re-throw with the page named: Tasks surfaces a task's IOException unchanged, so
// this context reaches the user instead of being lost in a generic wrapper.
throw new IOException("failed to process " + src, e);
}
return outcomes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import java.nio.file.Path;
import java.util.Comparator;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,17 +75,15 @@ public void run(Config config) throws IOException {
}

Path jb2Dir = createWorkDir(config.outPdf());
ExecutorService pool = Executors.newFixedThreadPool(config.jobs());
try {
assembler.assemble(
config.imageDir(),
config.outPdf(),
config.source(),
config.dpi(),
pool,
config.jobs(),
jb2Dir);
} finally {
pool.shutdown();
deleteRecursively(jb2Dir);
}
linearizer.linearize(config.outPdf());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import java.nio.file.Path;
import java.util.Comparator;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -96,41 +94,37 @@ public DespeckleService.Summary run(Config config) throws IOException {
Path extracted = Files.createDirectories(work.resolve("in"));
Path cleaned = Files.createDirectories(work.resolve("clean"));
Path jbig2Dir = Files.createDirectories(work.resolve("jb2"));
ExecutorService pool = Executors.newFixedThreadPool(config.jobs());
DespeckleService.Summary summary;
try {
int dpi =
config.options().dpi().isPresent()
? config.options().dpi().getAsInt()
: extractor.dominantDpi(config.inputPdf());
LOG.info(
"pipeline: {} -> {} at {} dpi", config.inputPdf(), config.outputPdf(), dpi);
int dpi =
config.options().dpi().isPresent()
? config.options().dpi().getAsInt()
: extractor.dominantDpi(config.inputPdf());
LOG.info("pipeline: {} -> {} at {} dpi", config.inputPdf(), config.outputPdf(), dpi);

extractor.extract(config.inputPdf(), extracted, config.jobs(), pool);
// Each step fans out on its own batch-owned workers bounded by the same jobs budget,
// so the steps never hold idle threads for each other (the old shared outer pool sat
// idle through the whole despeckle step).
extractor.extract(config.inputPdf(), extracted, config.jobs());

DespeckleService.Config clean =
new DespeckleService.Config(
extracted,
cleaned,
OutputFormat.TIFF,
"*.tif",
config.jobs(),
true,
config.options().withDpi(dpi),
config.reportDir(),
config.flipbook());
summary = despeckleService.run(clean);
DespeckleService.Config clean =
new DespeckleService.Config(
extracted,
cleaned,
OutputFormat.TIFF,
"*.tif",
config.jobs(),
true,
config.options().withDpi(dpi),
config.reportDir(),
config.flipbook());
DespeckleService.Summary summary = despeckleService.run(clean);

assembler.assemble(
cleaned,
config.outputPdf(),
config.inputPdf(),
OptionalInt.of(dpi),
pool,
jbig2Dir);
} finally {
pool.shutdown();
}
assembler.assemble(
cleaned,
config.outputPdf(),
config.inputPdf(),
OptionalInt.of(dpi),
config.jobs(),
jbig2Dir);
linearizer.linearize(config.outputPdf());
LOG.info("wrote {}", config.outputPdf());
return summary;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.nio.file.Path;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.jspecify.annotations.Nullable;

Expand Down Expand Up @@ -111,8 +110,7 @@ public int dominantDpi(Path pdf) {
}

@Override
public void extract(Path pdf, Path outDir, int jobs, ExecutorService pool)
throws IOException {
public void extract(Path pdf, Path outDir, int jobs) throws IOException {
if (failOn != null && pdf.getFileName().toString().contains(failOn)) {
throw new IOException("fake extract failed for " + pdf);
}
Expand All @@ -135,7 +133,7 @@ public void assemble(
Path outPdf,
@Nullable Path source,
OptionalInt forcedDpi,
ExecutorService pool,
int jobs,
Path scratchDir)
throws IOException {
Files.writeString(outPdf, "%PDF-fake", StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import org.jspecify.annotations.Nullable;

/**
Expand Down Expand Up @@ -34,7 +33,7 @@ public PdfBoxJbig2Assembler() {}
* @param outPdf the lossless-JBIG2 PDF to write
* @param source a PDF whose Info dict, XMP and version are inherited, or {@code null} for none
* @param forcedDpi a single DPI to size every page with, or empty to read each image's own
* @param pool the worker pool the per-page {@code jbig2} encodes run on
* @param jobs how many {@code jbig2} children may encode at once
* @param scratchDir scratch directory for the intermediate per-page JBIG2 streams
* (caller-owned)
* @throws IOException if the directory is empty, a tool fails, or the write fails
Expand All @@ -45,9 +44,9 @@ public void assemble(
Path outPdf,
@Nullable Path source,
OptionalInt forcedDpi,
ExecutorService pool,
int jobs,
Path scratchDir)
throws IOException {
delegate.assemble(imageDir, outPdf, source, forcedDpi, pool, scratchDir);
delegate.assemble(imageDir, outPdf, source, forcedDpi, jobs, scratchDir);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.github.p4suta.despeckle.port.PdfImageExtractor;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;

/**
* Despeckle's {@link PdfImageExtractor} adapter — a wrapper over the shared {@code
Expand Down Expand Up @@ -34,10 +33,10 @@ public int dominantDpi(Path pdf) throws IOException {

/**
* Extract all pages of {@code pdf} into {@code outDir} as TIFFs, parallelized over page-range
* chunks. {@code jobs} bounds both the chunk count and the pool slots used.
* chunks. At most {@code jobs} chunks run at once.
*/
@Override
public void extract(Path pdf, Path outDir, int jobs, ExecutorService pool) throws IOException {
delegate.extract(pdf, outDir, jobs, pool);
public void extract(Path pdf, Path outDir, int jobs) throws IOException {
delegate.extract(pdf, outDir, jobs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -26,10 +21,10 @@
* <p>The launch helpers stay local rather than routing through {@code :shared:process}'s {@code
* ProcessRunner}/{@code Tasks}: {@link #capture} returns the raw {@code byte[]} a JBIG2 stream
* needs (the shared runner decodes to a UTF-8 {@code String}, which is lossy for binary output),
* and {@link #run}/{@link #awaitAll} raise/propagate the tagged {@link DespeckleException} kind
* (the shared {@code Tasks} flattens any non-{@code IOException} cause, losing the kind and its
* exit code). The {@code qpdf} call site, whose output is text and whose exit-3 tolerance the
* shared runner models directly, uses {@code ProcessRunner} instead.
* and {@link #run} raises/propagates the tagged {@link DespeckleException} kind (the shared {@code
* Tasks} flattens any non-{@code IOException} cause, losing the kind and its exit code). The {@code
* qpdf} call site, whose output is text and whose exit-3 tolerance the shared runner models
* directly, uses {@code ProcessRunner} instead.
*
* <p>Public so the sibling {@code infrastructure.pdf} adapters can call it; it never leaves the
* {@code :infrastructure} module (the application and CLI layers depend only on the {@code :port}
Expand Down Expand Up @@ -121,41 +116,4 @@ private static void awaitExit(Process process, List<String> command, long timeou
null);
}
}

/**
 * Execute all {@code tasks} on {@code pool} and hand back their results in the order the tasks
 * were given. Results are inspected in that same order, so the first failed task in the list
 * decides which exception the caller sees.
 *
 * @param pool the executor the tasks are handed to
 * @param tasks the work to run
 * @return one result per task, in task order
 * @throws IOException if the calling thread is interrupted (the interrupt flag is restored
 *     first), if a task failed with an {@code IOException} (rethrown as-is), or if a task
 *     failed with any other cause that is not a {@link DespeckleException} (wrapped)
 */
public static <T> List<T> awaitAll(ExecutorService pool, List<Callable<T>> tasks)
        throws IOException {
    final List<Future<T>> submitted;
    try {
        submitted = pool.invokeAll(tasks);
    } catch (InterruptedException interrupted) {
        Thread.currentThread().interrupt();
        throw new IOException("pipeline interrupted", interrupted);
    }

    List<T> collected = new ArrayList<>(submitted.size());
    for (Future<T> pending : submitted) {
        T value;
        try {
            value = pending.get();
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new IOException("pipeline interrupted", interrupted);
        } catch (ExecutionException failed) {
            throw asIoFailure(failed);
        }
        collected.add(value);
    }
    return collected;
}

/**
 * Translate a worker failure into the exception {@link #awaitAll} should raise. A tagged
 * {@link DespeckleException} is thrown from here unchanged so its kind survives the pool
 * boundary; an {@code IOException} cause is returned as-is; anything else is wrapped.
 */
private static IOException asIoFailure(ExecutionException failed) {
    Throwable cause = failed.getCause();
    if (cause instanceof DespeckleException tagged) {
        throw tagged;
    }
    if (cause instanceof IOException io) {
        return io;
    }
    return new IOException("pipeline task failed", cause);
}
}
Loading
Loading