Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions _typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,7 @@ Pagent = "Pagent"
Flavour = "Flavour"
flavours = "flavours"
initialise = "initialise"
# The PDF spec's own key name `DecodeParms` (and PDFBox's COSName.DECODE_PARMS
# constant mirroring it) — third-party/spec identifiers we must name verbatim.
Parms = "Parms"
PARMS = "PARMS"
4 changes: 4 additions & 0 deletions despeckle/application/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ dependencies {
// implementations are no longer duplicated here. Static calls returning java.base types only, so
// implementation (not api) is the right scope.
implementation(project(":shared:io"))
// Tasks.awaitAll(Workers...): the shared fail-fast page fan-out (batch-owned executor,
// sibling interruption, quiescence before the failure propagates) DespeckleService runs its
// per-page work on.
implementation(project(":shared:process"))
implementation(libs.slf4j.api)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,13 @@
import io.github.p4suta.shared.io.CorpusFiles;
import io.github.p4suta.shared.io.OutputDirs;
import io.github.p4suta.shared.kernel.PageProgressListener;
import io.github.p4suta.shared.process.Tasks;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -88,8 +83,8 @@ public Summary run(Config config) throws IOException {
/**
* Execute a run, reporting each finished page to {@code progress}.
*
* @param progress called once per page as it completes (1-based count, total page count); may
* be invoked from the worker threads, so it must be thread-safe
* @param progress called once per page as it completes (1-based count, total page count),
* always on the calling thread and in order
*/
public Summary run(Config config, PageProgressListener progress) throws IOException {
OutputDirs.prepare(config.outputDir(), config.force());
Expand All @@ -106,27 +101,25 @@ public Summary run(Config config, PageProgressListener progress) throws IOExcept
? Reporter.noOp()
: reporterFactory.create(config.reportDir(), config.flipbook());

AtomicInteger done = new AtomicInteger();
List<PageOutcome> outcomes;
ExecutorService pool = Executors.newFixedThreadPool(config.jobs());
try {
List<Callable<PageOutcome>> tasks = new ArrayList<>(files.size());
for (Path src : files) {
tasks.add(
() -> {
PageOutcome outcome = processOne(src, config, report);
int n = done.incrementAndGet();
progress.onPage(n, files.size());
if (n % PROGRESS_EVERY == 0 || n == files.size()) {
LOG.info("{}/{}", n, files.size());
List<Callable<PageOutcome>> tasks = new ArrayList<>(files.size());
for (Path src : files) {
tasks.add(() -> processOne(src, config, report));
}
// Platform workers: each page is CPU-bound Leptonica work (FFM downcalls pin virtual
// threads' carriers). The fan-out fails fast and quiesces before throwing, so a failed
// run never leaves workers writing into the output directory. Progress and the human
// log arrive on this thread, ordered.
List<PageOutcome> outcomes =
Tasks.awaitAll(
Tasks.Workers.platform(config.jobs()),
tasks,
"despeckle",
(done, total) -> {
progress.onPage(done, total);
if (done % PROGRESS_EVERY == 0 || done == total) {
LOG.info("{}/{}", done, total);
}
return outcome;
});
}
outcomes = invokeAll(pool, tasks);
} finally {
pool.shutdown();
}

report.finish();

Expand All @@ -152,7 +145,7 @@ public Summary run(Config config, PageProgressListener progress) throws IOExcept

private record PageOutcome(Path source, ProcessResult result) {}

private PageOutcome processOne(Path src, Config config, Reporter report) {
private PageOutcome processOne(Path src, Config config, Reporter report) throws IOException {
Path dest =
CorpusFiles.mirrorDestination(
src, config.inputDir(), config.outputDir(), config.format().extension());
Expand All @@ -166,30 +159,9 @@ private PageOutcome processOne(Path src, Config config, Reporter report) {
report.addPage(stem, src, dest, result);
return new PageOutcome(src, result);
} catch (IOException e) {
throw new UncheckedIOException("failed to process " + src, e);
}
}

private static List<PageOutcome> invokeAll(
ExecutorService pool, List<Callable<PageOutcome>> tasks) throws IOException {
List<Future<PageOutcome>> futures;
try {
futures = pool.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("despeckle run interrupted", e);
}
List<PageOutcome> outcomes = new ArrayList<>(futures.size());
for (Future<PageOutcome> future : futures) {
try {
outcomes.add(future.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("despeckle run interrupted", e);
} catch (ExecutionException e) {
throw new IOException("page processing failed", e.getCause());
}
// Re-throw with the page named: Tasks surfaces a task's IOException unchanged, so
// this context reaches the user instead of being lost in a generic wrapper.
throw new IOException("failed to process " + src, e);
}
return outcomes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import java.nio.file.Path;
import java.util.Comparator;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,17 +75,15 @@ public void run(Config config) throws IOException {
}

Path jb2Dir = createWorkDir(config.outPdf());
ExecutorService pool = Executors.newFixedThreadPool(config.jobs());
try {
assembler.assemble(
config.imageDir(),
config.outPdf(),
config.source(),
config.dpi(),
pool,
config.jobs(),
jb2Dir);
} finally {
pool.shutdown();
deleteRecursively(jb2Dir);
}
linearizer.linearize(config.outPdf());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import java.nio.file.Path;
import java.util.Comparator;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -96,41 +94,37 @@ public DespeckleService.Summary run(Config config) throws IOException {
Path extracted = Files.createDirectories(work.resolve("in"));
Path cleaned = Files.createDirectories(work.resolve("clean"));
Path jbig2Dir = Files.createDirectories(work.resolve("jb2"));
ExecutorService pool = Executors.newFixedThreadPool(config.jobs());
DespeckleService.Summary summary;
try {
int dpi =
config.options().dpi().isPresent()
? config.options().dpi().getAsInt()
: extractor.dominantDpi(config.inputPdf());
LOG.info(
"pipeline: {} -> {} at {} dpi", config.inputPdf(), config.outputPdf(), dpi);
int dpi =
config.options().dpi().isPresent()
? config.options().dpi().getAsInt()
: extractor.dominantDpi(config.inputPdf());
LOG.info("pipeline: {} -> {} at {} dpi", config.inputPdf(), config.outputPdf(), dpi);

extractor.extract(config.inputPdf(), extracted, config.jobs(), pool);
// Each step fans out on its own batch-owned workers bounded by the same jobs budget,
// so the steps never hold idle threads for each other (the old shared outer pool sat
// idle through the whole despeckle step).
extractor.extract(config.inputPdf(), extracted, config.jobs());

DespeckleService.Config clean =
new DespeckleService.Config(
extracted,
cleaned,
OutputFormat.TIFF,
"*.tif",
config.jobs(),
true,
config.options().withDpi(dpi),
config.reportDir(),
config.flipbook());
summary = despeckleService.run(clean);
DespeckleService.Config clean =
new DespeckleService.Config(
extracted,
cleaned,
OutputFormat.TIFF,
"*.tif",
config.jobs(),
true,
config.options().withDpi(dpi),
config.reportDir(),
config.flipbook());
DespeckleService.Summary summary = despeckleService.run(clean);

assembler.assemble(
cleaned,
config.outputPdf(),
config.inputPdf(),
OptionalInt.of(dpi),
pool,
jbig2Dir);
} finally {
pool.shutdown();
}
assembler.assemble(
cleaned,
config.outputPdf(),
config.inputPdf(),
OptionalInt.of(dpi),
config.jobs(),
jbig2Dir);
linearizer.linearize(config.outputPdf());
LOG.info("wrote {}", config.outputPdf());
return summary;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.nio.file.Path;
import java.util.List;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.jspecify.annotations.Nullable;

Expand Down Expand Up @@ -111,8 +110,7 @@ public int dominantDpi(Path pdf) {
}

@Override
public void extract(Path pdf, Path outDir, int jobs, ExecutorService pool)
throws IOException {
public void extract(Path pdf, Path outDir, int jobs) throws IOException {
if (failOn != null && pdf.getFileName().toString().contains(failOn)) {
throw new IOException("fake extract failed for " + pdf);
}
Expand All @@ -135,7 +133,7 @@ public void assemble(
Path outPdf,
@Nullable Path source,
OptionalInt forcedDpi,
ExecutorService pool,
int jobs,
Path scratchDir)
throws IOException {
Files.writeString(outPdf, "%PDF-fake", StandardCharsets.UTF_8);
Expand Down
31 changes: 31 additions & 0 deletions despeckle/docs/cleaner-baseline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# despeckle cleaner op-level baseline

Generated by `CleanerBenchmark` (`./gradlew :despeckle:infrastructure:benchCleaner`, dev container).
Times each Leptonica primitive the page cleaner composes on a synthetic
600-dpi A5 page (3496x4961 px, fixed seed). Re-run after any change to the
cleaner or the imaging bindings and compare before merging.

- Date (UTC): 2026-06-10 06:25:05
- Host: Linux amd64, 8 CPUs
- Samples: median of 10 reps after 2 warmups; single-threaded.

| op | median (ms) | min (ms) | calls/clean() | est. share of clean() |
|---|---:|---:|---:|---:|
| read TIFF-G4 | 2.59 | 2.52 | 1 | 1.5% |
| selectBySize k=6 (page) | 15.22 | 14.98 | 1 | 8.7% |
| selectBySize 15 (page) | 15.19 | 14.80 | 1 | 8.7% |
| selectBySize k=6 (inverted) | 22.16 | 21.89 | 2 | 25.3% |
| dilate 43x43 (text mask) | 37.52 | 37.01 | 1 | 21.5% |
| open 7x7 (page) | 12.18 | 12.04 | 1 | 7.0% |
| invert | 0.26 | 0.25 | 2 | 0.3% |
| subtract | 0.38 | 0.36 | 5 | 1.1% |
| and | 0.40 | 0.38 | 1 | 0.2% |
| or | 0.33 | 0.32 | 3 | 0.6% |
| countConnComp | 11.82 | 11.46 | 2 | 13.5% |
| countPixels | 0.41 | 0.41 | 2 | 0.5% |
| write TIFF-G4 | 6.44 | 6.29 | 1 | 3.7% |
| **Σ(median × calls)** | 161.74 | | | 92.5% |
| **clean() end-to-end** | 174.91 | 173.28 | 1 | 100% |

The Σ row landing near 100% means the table accounts for clean()'s real cost;
a large gap points at untimed work (allocation churn, codec internals).
18 changes: 18 additions & 0 deletions despeckle/infrastructure/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,21 @@ tasks.named<JacocoCoverageVerification>("jacocoTestCoverageVerification") {
}

tasks.named("check") { dependsOn("jacocoTestCoverageVerification") }

// ---- Cleaner op-level micro-benchmark (see despeckle/docs/cleaner-baseline.md) ------------------
// Times each Leptonica primitive the page cleaner composes, plus end-to-end clean(), on a
// deterministic synthetic 600-dpi A5 page — the measurement every cleaner/imaging optimization is
// judged against. Knob: -Preps=N (default 10). Dev-container only (needs the native Leptonica).
tasks.register<JavaExec>("benchCleaner") {
group = "verification"
description = "Micro-benchmark cleaner ops + end-to-end clean(); writes despeckle/docs/cleaner-baseline.md"
dependsOn(tasks.named("testClasses"))
classpath = sourceSets["test"].runtimeClasspath
mainClass = "io.github.p4suta.despeckle.infrastructure.tools.CleanerBenchmark"
workingDir = rootDir
args =
listOf(
"despeckle/docs/cleaner-baseline.md",
providers.gradleProperty("reps").getOrElse("10"),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import org.jspecify.annotations.Nullable;

/**
Expand Down Expand Up @@ -34,7 +33,7 @@ public PdfBoxJbig2Assembler() {}
* @param outPdf the lossless-JBIG2 PDF to write
* @param source a PDF whose Info dict, XMP and version are inherited, or {@code null} for none
* @param forcedDpi a single DPI to size every page with, or empty to read each image's own
* @param pool the worker pool the per-page {@code jbig2} encodes run on
* @param jobs how many {@code jbig2} children may encode at once
* @param scratchDir scratch directory for the intermediate per-page JBIG2 streams
* (caller-owned)
* @throws IOException if the directory is empty, a tool fails, or the write fails
Expand All @@ -45,9 +44,9 @@ public void assemble(
Path outPdf,
@Nullable Path source,
OptionalInt forcedDpi,
ExecutorService pool,
int jobs,
Path scratchDir)
throws IOException {
delegate.assemble(imageDir, outPdf, source, forcedDpi, pool, scratchDir);
delegate.assemble(imageDir, outPdf, source, forcedDpi, jobs, scratchDir);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.github.p4suta.despeckle.port.PdfImageExtractor;
import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;

/**
* Despeckle's {@link PdfImageExtractor} adapter — a wrapper over the shared {@code
Expand Down Expand Up @@ -34,10 +33,10 @@ public int dominantDpi(Path pdf) throws IOException {

/**
* Extract all pages of {@code pdf} into {@code outDir} as TIFFs, parallelized over page-range
* chunks. {@code jobs} bounds both the chunk count and the pool slots used.
* chunks. at most {@code jobs} chunks run at once.
*/
@Override
public void extract(Path pdf, Path outDir, int jobs, ExecutorService pool) throws IOException {
delegate.extract(pdf, outDir, jobs, pool);
public void extract(Path pdf, Path outDir, int jobs) throws IOException {
delegate.extract(pdf, outDir, jobs);
}
}
Loading
Loading