Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions despeckle/application/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ dependencies {
// implementations are no longer duplicated here. Static calls returning java.base types only, so
// implementation (not api) is the right scope.
implementation(project(":shared:io"))
// Tasks.awaitAll(Workers...): the shared fail-fast page fan-out (batch-owned executor,
// sibling interruption, quiescence before the failure propagates) DespeckleService runs its
// per-page work on.
implementation(project(":shared:process"))
implementation(libs.slf4j.api)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,13 @@
import io.github.p4suta.shared.io.CorpusFiles;
import io.github.p4suta.shared.io.OutputDirs;
import io.github.p4suta.shared.kernel.PageProgressListener;
import io.github.p4suta.shared.process.Tasks;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,8 +83,8 @@ public Summary run(Config config) throws IOException {
/**
* Execute a run, reporting each finished page to {@code progress}.
*
* @param progress called once per page as it completes (1-based count, total page count); may
* be invoked from the worker threads, so it must be thread-safe
* @param progress called once per page as it completes (1-based count, total page count),
* always on the calling thread and in order
*/
public Summary run(Config config, PageProgressListener progress) throws IOException {
OutputDirs.prepare(config.outputDir(), config.force());
Expand All @@ -106,27 +101,25 @@ public Summary run(Config config, PageProgressListener progress) throws IOExcept
? Reporter.noOp()
: reporterFactory.create(config.reportDir(), config.flipbook());

AtomicInteger done = new AtomicInteger();
List<PageOutcome> outcomes;
ExecutorService pool = Executors.newFixedThreadPool(config.jobs());
try {
List<Callable<PageOutcome>> tasks = new ArrayList<>(files.size());
for (Path src : files) {
tasks.add(
() -> {
PageOutcome outcome = processOne(src, config, report);
int n = done.incrementAndGet();
progress.onPage(n, files.size());
if (n % PROGRESS_EVERY == 0 || n == files.size()) {
LOG.info("{}/{}", n, files.size());
List<Callable<PageOutcome>> tasks = new ArrayList<>(files.size());
for (Path src : files) {
tasks.add(() -> processOne(src, config, report));
}
// Platform workers: each page is CPU-bound Leptonica work (FFM downcalls pin virtual
// threads' carriers). The fan-out fails fast and quiesces before throwing, so a failed
// run never leaves workers writing into the output directory. Progress and the human
// log arrive on this thread, ordered.
List<PageOutcome> outcomes =
Tasks.awaitAll(
Tasks.Workers.platform(config.jobs()),
tasks,
"despeckle",
(done, total) -> {
progress.onPage(done, total);
if (done % PROGRESS_EVERY == 0 || done == total) {
LOG.info("{}/{}", done, total);
}
return outcome;
});
}
outcomes = invokeAll(pool, tasks);
} finally {
pool.shutdown();
}

report.finish();

Expand All @@ -152,7 +145,7 @@ public Summary run(Config config, PageProgressListener progress) throws IOExcept

private record PageOutcome(Path source, ProcessResult result) {}

private PageOutcome processOne(Path src, Config config, Reporter report) {
private PageOutcome processOne(Path src, Config config, Reporter report) throws IOException {
Path dest =
CorpusFiles.mirrorDestination(
src, config.inputDir(), config.outputDir(), config.format().extension());
Expand All @@ -166,30 +159,9 @@ private PageOutcome processOne(Path src, Config config, Reporter report) {
report.addPage(stem, src, dest, result);
return new PageOutcome(src, result);
} catch (IOException e) {
throw new UncheckedIOException("failed to process " + src, e);
}
}

private static List<PageOutcome> invokeAll(
ExecutorService pool, List<Callable<PageOutcome>> tasks) throws IOException {
List<Future<PageOutcome>> futures;
try {
futures = pool.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("despeckle run interrupted", e);
}
List<PageOutcome> outcomes = new ArrayList<>(futures.size());
for (Future<PageOutcome> future : futures) {
try {
outcomes.add(future.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("despeckle run interrupted", e);
} catch (ExecutionException e) {
throw new IOException("page processing failed", e.getCause());
}
// Re-throw with the page named: Tasks surfaces a task's IOException unchanged, so
// this context reaches the user instead of being lost in a generic wrapper.
throw new IOException("failed to process " + src, e);
}
return outcomes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import java.nio.file.Path;
import java.util.Comparator;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,17 +75,15 @@ public void run(Config config) throws IOException {
}

Path jb2Dir = createWorkDir(config.outPdf());
ExecutorService pool = Executors.newFixedThreadPool(config.jobs());
try {
assembler.assemble(
config.imageDir(),
config.outPdf(),
config.source(),
config.dpi(),
pool,
config.jobs(),
jb2Dir);
} finally {
pool.shutdown();
deleteRecursively(jb2Dir);
}
linearizer.linearize(config.outPdf());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import java.nio.file.Path;
import java.util.Comparator;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -96,41 +94,37 @@ public DespeckleService.Summary run(Config config) throws IOException {
Path extracted = Files.createDirectories(work.resolve("in"));
Path cleaned = Files.createDirectories(work.resolve("clean"));
Path jbig2Dir = Files.createDirectories(work.resolve("jb2"));
ExecutorService pool = Executors.newFixedThreadPool(config.jobs());
DespeckleService.Summary summary;
try {
int dpi =
config.options().dpi().isPresent()
? config.options().dpi().getAsInt()
: extractor.dominantDpi(config.inputPdf());
LOG.info(
"pipeline: {} -> {} at {} dpi", config.inputPdf(), config.outputPdf(), dpi);
int dpi =
config.options().dpi().isPresent()
? config.options().dpi().getAsInt()
: extractor.dominantDpi(config.inputPdf());
LOG.info("pipeline: {} -> {} at {} dpi", config.inputPdf(), config.outputPdf(), dpi);

extractor.extract(config.inputPdf(), extracted, config.jobs(), pool);
// Each step fans out on its own batch-owned workers bounded by the same jobs budget,
// so the steps never hold idle threads for each other (the old shared outer pool sat
// idle through the whole despeckle step).
extractor.extract(config.inputPdf(), extracted, config.jobs());

DespeckleService.Config clean =
new DespeckleService.Config(
extracted,
cleaned,
OutputFormat.TIFF,
"*.tif",
config.jobs(),
true,
config.options().withDpi(dpi),
config.reportDir(),
config.flipbook());
summary = despeckleService.run(clean);
DespeckleService.Config clean =
new DespeckleService.Config(
extracted,
cleaned,
OutputFormat.TIFF,
"*.tif",
config.jobs(),
true,
config.options().withDpi(dpi),
config.reportDir(),
config.flipbook());
DespeckleService.Summary summary = despeckleService.run(clean);

assembler.assemble(
cleaned,
config.outputPdf(),
config.inputPdf(),
OptionalInt.of(dpi),
pool,
jbig2Dir);
} finally {
pool.shutdown();
}
assembler.assemble(
cleaned,
config.outputPdf(),
config.inputPdf(),
OptionalInt.of(dpi),
config.jobs(),
jbig2Dir);
linearizer.linearize(config.outputPdf());
LOG.info("wrote {}", config.outputPdf());
return summary;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.nio.file.Path;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.jspecify.annotations.Nullable;

Expand Down Expand Up @@ -111,8 +110,7 @@ public int dominantDpi(Path pdf) {
}

@Override
public void extract(Path pdf, Path outDir, int jobs, ExecutorService pool)
throws IOException {
public void extract(Path pdf, Path outDir, int jobs) throws IOException {
if (failOn != null && pdf.getFileName().toString().contains(failOn)) {
throw new IOException("fake extract failed for " + pdf);
}
Expand All @@ -135,7 +133,7 @@ public void assemble(
Path outPdf,
@Nullable Path source,
OptionalInt forcedDpi,
ExecutorService pool,
int jobs,
Path scratchDir)
throws IOException {
Files.writeString(outPdf, "%PDF-fake", StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import org.jspecify.annotations.Nullable;

/**
Expand Down Expand Up @@ -34,7 +33,7 @@ public PdfBoxJbig2Assembler() {}
* @param outPdf the lossless-JBIG2 PDF to write
* @param source a PDF whose Info dict, XMP and version are inherited, or {@code null} for none
* @param forcedDpi a single DPI to size every page with, or empty to read each image's own
* @param pool the worker pool the per-page {@code jbig2} encodes run on
* @param jobs how many {@code jbig2} children may encode at once
* @param scratchDir scratch directory for the intermediate per-page JBIG2 streams
* (caller-owned)
* @throws IOException if the directory is empty, a tool fails, or the write fails
Expand All @@ -45,9 +44,9 @@ public void assemble(
Path outPdf,
@Nullable Path source,
OptionalInt forcedDpi,
ExecutorService pool,
int jobs,
Path scratchDir)
throws IOException {
delegate.assemble(imageDir, outPdf, source, forcedDpi, pool, scratchDir);
delegate.assemble(imageDir, outPdf, source, forcedDpi, jobs, scratchDir);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.github.p4suta.despeckle.port.PdfImageExtractor;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;

/**
* Despeckle's {@link PdfImageExtractor} adapter — a wrapper over the shared {@code
Expand Down Expand Up @@ -34,10 +33,10 @@ public int dominantDpi(Path pdf) throws IOException {

/**
* Extract all pages of {@code pdf} into {@code outDir} as TIFFs, parallelized over page-range
* chunks. {@code jobs} bounds both the chunk count and the pool slots used.
* chunks. at most {@code jobs} chunks run at once.
*/
@Override
public void extract(Path pdf, Path outDir, int jobs, ExecutorService pool) throws IOException {
delegate.extract(pdf, outDir, jobs, pool);
public void extract(Path pdf, Path outDir, int jobs) throws IOException {
delegate.extract(pdf, outDir, jobs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -26,10 +21,10 @@
* <p>The launch helpers stay local rather than routing through {@code :shared:process}'s {@code
* ProcessRunner}/{@code Tasks}: {@link #capture} returns the raw {@code byte[]} a JBIG2 stream
* needs (the shared runner decodes to a UTF-8 {@code String}, which is lossy for binary output),
* and {@link #run}/{@link #awaitAll} raise/propagate the tagged {@link DespeckleException} kind
* (the shared {@code Tasks} flattens any non-{@code IOException} cause, losing the kind and its
* exit code). The {@code qpdf} call site, whose output is text and whose exit-3 tolerance the
* shared runner models directly, uses {@code ProcessRunner} instead.
* and {@link #run} raises/propagates the tagged {@link DespeckleException} kind (the shared {@code
* Tasks} flattens any non-{@code IOException} cause, losing the kind and its exit code). The {@code
* qpdf} call site, whose output is text and whose exit-3 tolerance the shared runner models
* directly, uses {@code ProcessRunner} instead.
*
* <p>Public so the sibling {@code infrastructure.pdf} adapters can call it; it never leaves the
* {@code :infrastructure} module (the application and CLI layers depend only on the {@code :port}
Expand Down Expand Up @@ -121,41 +116,4 @@ private static void awaitExit(Process process, List<String> command, long timeou
null);
}
}

/**
* Run every task on {@code pool}, in submission order, surfacing the first failure as
* IOException.
*/
public static <T> List<T> awaitAll(ExecutorService pool, List<Callable<T>> tasks)
throws IOException {
List<Future<T>> futures;
try {
futures = pool.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("pipeline interrupted", e);
}
List<T> results = new ArrayList<>(futures.size());
for (Future<T> future : futures) {
try {
results.add(future.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("pipeline interrupted", e);
} catch (ExecutionException e) {
Throwable cause = e.getCause();
// Preserve a tagged domain failure (e.g. NATIVE_TOOL_FAILED / IMAGE_UNREADABLE
// raised inside a worker) so its kind — and exit code — survives the pool boundary
// instead of being flattened to a generic IOException.
if (cause instanceof DespeckleException de) {
throw de;
}
if (cause instanceof IOException io) {
throw io;
}
throw new IOException("pipeline task failed", cause);
}
}
return results;
}
}
Loading
Loading