Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
3fca5ad
feat(pipeline): add stage-level timings and a benchmark harness
P4suta Jun 10, 2026
62c37bf
perf(extract): remux all-CCITT sources instead of decoding, finer chunks
P4suta Jun 10, 2026
86074bf
fix(concurrency): fail-fast page fan-out that keeps exception identity
P4suta Jun 10, 2026
b7c20be
test(despeckle): add an op-level micro-benchmark for the page cleaner
P4suta Jun 10, 2026
d311176
perf(despeckle): skip the metrics-only component-counting passes
P4suta Jun 10, 2026
5e58d41
perf(imaging): route morphology through Leptonica's DWA kernels, exactly
P4suta Jun 10, 2026
97b3db2
chore(lint): teach typos the PDF spec's DecodeParms, reword "mis-sizes"
P4suta Jun 10, 2026
fbe6417
Merge branch 'perf/ccitt-passthrough-extract' into concurrency/tasks-…
P4suta Jun 10, 2026
8601937
test(process): make the contiguous-progress test deterministic
P4suta Jun 10, 2026
6512ac0
Merge branch 'concurrency/tasks-failfast' into perf/despeckle-bench
P4suta Jun 10, 2026
a2d54a2
Merge branch 'perf/despeckle-bench' into perf/despeckle-skip-metrics
P4suta Jun 10, 2026
5301445
Merge branch 'perf/despeckle-skip-metrics' into perf/despeckle-dwa-mo…
P4suta Jun 10, 2026
efac0ad
style: rewrap the line the failing-task rename pushed past 100 columns
P4suta Jun 10, 2026
65bb8ca
Merge branch 'concurrency/tasks-failfast' into perf/despeckle-bench
P4suta Jun 10, 2026
cda1e87
Merge branch 'perf/despeckle-bench' into perf/despeckle-skip-metrics
P4suta Jun 10, 2026
0daa41b
Merge branch 'perf/despeckle-skip-metrics' into perf/despeckle-dwa-mo…
P4suta Jun 10, 2026
543a510
docs: align the generated baseline docs with main
P4suta Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions _typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,7 @@ Pagent = "Pagent"
Flavour = "Flavour"
flavours = "flavours"
initialise = "initialise"
# The PDF spec's own key name `DecodeParms` (and PDFBox's COSName.DECODE_PARMS
# constant mirroring it) — third-party/spec identifiers we must name verbatim.
Parms = "Parms"
PARMS = "PARMS"
4 changes: 4 additions & 0 deletions despeckle/application/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ dependencies {
// implementations are no longer duplicated here. Static calls returning java.base types only, so
// implementation (not api) is the right scope.
implementation(project(":shared:io"))
// Tasks.awaitAll(Workers...): the shared fail-fast page fan-out (batch-owned executor,
// sibling interruption, quiescence before the failure propagates) DespeckleService runs its
// per-page work on.
implementation(project(":shared:process"))
implementation(libs.slf4j.api)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,13 @@
import io.github.p4suta.shared.io.CorpusFiles;
import io.github.p4suta.shared.io.OutputDirs;
import io.github.p4suta.shared.kernel.PageProgressListener;
import io.github.p4suta.shared.process.Tasks;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -75,7 +70,8 @@ public record Config(
* Aggregate outcome of a run.
*
* @param pages number of pages processed
* @param componentsRemoved total components removed across all pages
* @param componentsRemoved total components removed across all pages; {@code 0} when no report
* consumed component stats (counting is skipped for speed without a report)
* @param overRemovalWarnings number of pages flagged for possible over-removal
*/
public record Summary(int pages, long componentsRemoved, int overRemovalWarnings) {}
Expand All @@ -88,8 +84,8 @@ public Summary run(Config config) throws IOException {
/**
* Execute a run, reporting each finished page to {@code progress}.
*
* @param progress called once per page as it completes (1-based count, total page count); may
* be invoked from the worker threads, so it must be thread-safe
* @param progress called once per page as it completes (1-based count, total page count),
* always on the calling thread and in order
*/
public Summary run(Config config, PageProgressListener progress) throws IOException {
OutputDirs.prepare(config.outputDir(), config.force());
Expand All @@ -101,39 +97,45 @@ public Summary run(Config config, PageProgressListener progress) throws IOExcept
}
LOG.info("despeckling {} page(s) with {} thread(s)", files.size(), config.jobs());

@Nullable Path reportDir = config.reportDir();
boolean reporting = reportDir != null;
Reporter report =
config.reportDir() == null
? Reporter.noOp()
: reporterFactory.create(config.reportDir(), config.flipbook());

AtomicInteger done = new AtomicInteger();
List<PageOutcome> outcomes;
ExecutorService pool = Executors.newFixedThreadPool(config.jobs());
try {
List<Callable<PageOutcome>> tasks = new ArrayList<>(files.size());
for (Path src : files) {
tasks.add(
() -> {
PageOutcome outcome = processOne(src, config, report);
int n = done.incrementAndGet();
progress.onPage(n, files.size());
if (n % PROGRESS_EVERY == 0 || n == files.size()) {
LOG.info("{}/{}", n, files.size());
reportDir != null
? reporterFactory.create(reportDir, config.flipbook())
: Reporter.noOp();
// The report is the only consumer of per-page component counts, and counting is a full
// connected-component labeling twice per page — skip it when no report will be written.
ProcessOptions options =
reporting ? config.options() : config.options().withoutComponentStats();

List<Callable<PageOutcome>> tasks = new ArrayList<>(files.size());
for (Path src : files) {
tasks.add(() -> processOne(src, config, options, report));
}
// Platform workers: each page is CPU-bound Leptonica work (FFM downcalls pin virtual
// threads' carriers). The fan-out fails fast and quiesces before throwing, so a failed
// run never leaves workers writing into the output directory. Progress and the human
// log arrive on this thread, ordered.
List<PageOutcome> outcomes =
Tasks.awaitAll(
Tasks.Workers.platform(config.jobs()),
tasks,
"despeckle",
(done, total) -> {
progress.onPage(done, total);
if (done % PROGRESS_EVERY == 0 || done == total) {
LOG.info("{}/{}", done, total);
}
return outcome;
});
}
outcomes = invokeAll(pool, tasks);
} finally {
pool.shutdown();
}

report.finish();

long totalRemoved = 0;
long blackRemoved = 0;
int warnings = 0;
for (PageOutcome outcome : outcomes) {
totalRemoved += outcome.result().componentsRemoved();
blackRemoved += outcome.result().blackPixelsRemoved();
if (outcome.result().isOverRemoval()) {
warnings++;
LOG.warn(
Expand All @@ -142,17 +144,28 @@ public Summary run(Config config, PageProgressListener progress) throws IOExcept
Math.round(outcome.result().removedBlackPixelRatio() * 100));
}
}
LOG.info(
"done: {} page(s), {} component(s) removed, {} over-removal warning(s)",
files.size(),
totalRemoved,
warnings);
if (reporting) {
LOG.info(
"done: {} page(s), {} component(s) removed, {} over-removal warning(s)",
files.size(),
totalRemoved,
warnings);
} else {
// Without a report nothing counted components (the counting passes are skipped for
// speed), so the summary speaks in the always-measured black-pixel terms.
LOG.info(
"done: {} page(s), {} black pixel(s) removed, {} over-removal warning(s)",
files.size(),
blackRemoved,
warnings);
}
return new Summary(files.size(), totalRemoved, warnings);
}

/**
 * Per-page result carrier: {@code processOne} returns one for each input file, and {@code run}
 * walks the collected outcomes to total the removals and flag over-removal warnings.
 *
 * @param source the input page that was processed
 * @param result what the page cleaner returned for that page
 */
private record PageOutcome(Path source, ProcessResult result) {}

private PageOutcome processOne(Path src, Config config, Reporter report) {
private PageOutcome processOne(Path src, Config config, ProcessOptions options, Reporter report)
throws IOException {
Path dest =
CorpusFiles.mirrorDestination(
src, config.inputDir(), config.outputDir(), config.format().extension());
Expand All @@ -161,35 +174,14 @@ private PageOutcome processOne(Path src, Config config, Reporter report) {
if (parent != null) {
Files.createDirectories(parent);
}
ProcessResult result = pageCleaner.clean(src, dest, config.format(), config.options());
ProcessResult result = pageCleaner.clean(src, dest, config.format(), options);
Path stem = config.inputDir().relativize(src);
report.addPage(stem, src, dest, result);
return new PageOutcome(src, result);
} catch (IOException e) {
throw new UncheckedIOException("failed to process " + src, e);
}
}

private static List<PageOutcome> invokeAll(
ExecutorService pool, List<Callable<PageOutcome>> tasks) throws IOException {
List<Future<PageOutcome>> futures;
try {
futures = pool.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("despeckle run interrupted", e);
}
List<PageOutcome> outcomes = new ArrayList<>(futures.size());
for (Future<PageOutcome> future : futures) {
try {
outcomes.add(future.get());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("despeckle run interrupted", e);
} catch (ExecutionException e) {
throw new IOException("page processing failed", e.getCause());
}
// Re-throw with the page named: Tasks surfaces a task's IOException unchanged, so
// this context reaches the user instead of being lost in a generic wrapper.
throw new IOException("failed to process " + src, e);
}
return outcomes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import java.nio.file.Path;
import java.util.Comparator;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -77,17 +75,15 @@ public void run(Config config) throws IOException {
}

Path jb2Dir = createWorkDir(config.outPdf());
ExecutorService pool = Executors.newFixedThreadPool(config.jobs());
try {
assembler.assemble(
config.imageDir(),
config.outPdf(),
config.source(),
config.dpi(),
pool,
config.jobs(),
jb2Dir);
} finally {
pool.shutdown();
deleteRecursively(jb2Dir);
}
linearizer.linearize(config.outPdf());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,23 @@ public Summary run(Config config) throws IOException {

if (config.reportParent() != null) {
batchReporter.write(config.reportParent(), books);
LOG.info(
"done: {} ok, {} skipped, {} failed, {} page(s), {} component(s) removed",
ok,
skipped,
failed,
totalPages,
totalComponentsRemoved);
} else {
// Without reports the runs skip component counting (an expensive labeling, twice per
// page), so a component total would always read 0 — leave it out of the line.
LOG.info(
"done: {} ok, {} skipped, {} failed, {} page(s)",
ok,
skipped,
failed,
totalPages);
}
LOG.info(
"done: {} ok, {} skipped, {} failed, {} page(s), {} component(s) removed",
ok,
skipped,
failed,
totalPages,
totalComponentsRemoved);
return new Summary(ok, skipped, failed, totalPages, totalComponentsRemoved);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import java.nio.file.Path;
import java.util.Comparator;
import java.util.OptionalInt;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
Expand Down Expand Up @@ -96,41 +94,37 @@ public DespeckleService.Summary run(Config config) throws IOException {
Path extracted = Files.createDirectories(work.resolve("in"));
Path cleaned = Files.createDirectories(work.resolve("clean"));
Path jbig2Dir = Files.createDirectories(work.resolve("jb2"));
ExecutorService pool = Executors.newFixedThreadPool(config.jobs());
DespeckleService.Summary summary;
try {
int dpi =
config.options().dpi().isPresent()
? config.options().dpi().getAsInt()
: extractor.dominantDpi(config.inputPdf());
LOG.info(
"pipeline: {} -> {} at {} dpi", config.inputPdf(), config.outputPdf(), dpi);
int dpi =
config.options().dpi().isPresent()
? config.options().dpi().getAsInt()
: extractor.dominantDpi(config.inputPdf());
LOG.info("pipeline: {} -> {} at {} dpi", config.inputPdf(), config.outputPdf(), dpi);

extractor.extract(config.inputPdf(), extracted, config.jobs(), pool);
// Each step fans out on its own batch-owned workers bounded by the same jobs budget,
// so the steps never hold idle threads for each other (the old shared outer pool sat
// idle through the whole despeckle step).
extractor.extract(config.inputPdf(), extracted, config.jobs());

DespeckleService.Config clean =
new DespeckleService.Config(
extracted,
cleaned,
OutputFormat.TIFF,
"*.tif",
config.jobs(),
true,
config.options().withDpi(dpi),
config.reportDir(),
config.flipbook());
summary = despeckleService.run(clean);
DespeckleService.Config clean =
new DespeckleService.Config(
extracted,
cleaned,
OutputFormat.TIFF,
"*.tif",
config.jobs(),
true,
config.options().withDpi(dpi),
config.reportDir(),
config.flipbook());
DespeckleService.Summary summary = despeckleService.run(clean);

assembler.assemble(
cleaned,
config.outputPdf(),
config.inputPdf(),
OptionalInt.of(dpi),
pool,
jbig2Dir);
} finally {
pool.shutdown();
}
assembler.assemble(
cleaned,
config.outputPdf(),
config.inputPdf(),
OptionalInt.of(dpi),
config.jobs(),
jbig2Dir);
linearizer.linearize(config.outputPdf());
LOG.info("wrote {}", config.outputPdf());
return summary;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.p4suta.despeckle.application;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -141,4 +142,30 @@ void drivesTheReporterWhenAReportDirIsGiven(@TempDir Path tmp) throws IOExceptio
assertEquals(2, factory.reporter.pages.get(), "every page is reported");
assertEquals(1, factory.reporter.finished.get(), "the report is finalized once");
}

@Test
void skipsComponentCountingWithoutAReport(@TempDir Path tmp) throws IOException {
    // Arrange: a single input page and a cleaner that records the options it is handed.
    Path inputDir = tmp.resolve("in");
    writeInputs(inputDir, 1);
    FakePageCleaner recordingCleaner = new FakePageCleaner(LIGHT);
    DespeckleService service =
            new DespeckleService(recordingCleaner, new RecordingReporterFactory());

    // Act: run with null as the last config argument, i.e. the no-report case this test names.
    service.run(config(inputDir, tmp.resolve("out"), false, null));

    // Assert: the options that reached the cleaner have component stats switched off.
    ProcessOptions observed = java.util.Objects.requireNonNull(recordingCleaner.lastOptions);
    assertFalse(observed.collectComponentStats(), "no report -> counting passes are skipped");
}

@Test
void keepsComponentCountingForTheReportPath(@TempDir Path tmp) throws IOException {
    // Arrange: a single input page and a cleaner that records the options it is handed.
    Path inputDir = tmp.resolve("in");
    writeInputs(inputDir, 1);
    FakePageCleaner recordingCleaner = new FakePageCleaner(LIGHT);
    DespeckleService service =
            new DespeckleService(recordingCleaner, new RecordingReporterFactory());

    // Act: this time the last config argument names a report location.
    service.run(config(inputDir, tmp.resolve("out"), false, tmp.resolve("report")));

    // Assert: component stats stay enabled in the options that reached the cleaner.
    ProcessOptions observed = java.util.Objects.requireNonNull(recordingCleaner.lastOptions);
    assertTrue(observed.collectComponentStats(), "the report consumes the counts");
}
}
Loading
Loading