Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions _typos.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,7 @@ Pagent = "Pagent"
Flavour = "Flavour"
flavours = "flavours"
initialise = "initialise"
# The PDF spec's own key name `DecodeParms` (and PDFBox's COSName.DECODE_PARMS
# constant mirroring it) — third-party/spec identifiers we must name verbatim.
Parms = "Parms"
PARMS = "PARMS"
1 change: 1 addition & 0 deletions pipeline/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ never stops the rest; existing outputs are skipped unless `--force`).
| `--pdf-a` | off | emit PDF/A-2b conformance |
| `--force` | off | overwrite an existing output (batch: regenerate, don't skip) |
| `--progress-file <path>` | — | write machine-readable JSONL progress events (single input only) |
| `--timings` | off | print a per-stage wall-clock breakdown to stderr when each run ends |
| `-i, --interactive` | off | guided mode: prompt for the input, options and output |
| `-h, --help` | — | show help and exit |
| `-V, --version` | — | print version and exit |
Expand Down
47 changes: 47 additions & 0 deletions pipeline/app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ dependencies {
implementation(libs.commons.cli)
implementation(libs.slf4j.api)
runtimeOnly(libs.slf4j.simple)

// The benchmark fixture generator (test sources, never shipped — mirroring register's
// createSamplePdf) draws synthetic scan pages with PDFBox directly.
testImplementation(libs.pdfbox)
}

// The one place native access is granted to the launched app; run, test and JavaExec inherit it.
Expand Down Expand Up @@ -85,3 +89,46 @@ selfContainedApp {
// jbig2 (its register stage writes TIFF-G4; the spread pack embeds CCITT G4).
bundleQpdf(this, libs.versions.qpdf.get())
}

// ---- Stage-level benchmark (see pipeline/docs/perf-baseline.md) ---------------------------------

// Deterministic synthetic scan book for the benchmark: an existing output is reused, so the
// generation cost (a minute at 200 pages × 600 dpi) is paid once. Knob: -Ppages=N (default 200).
tasks.register<JavaExec>("createSampleScan") {
group = "verification"
description = "Generate the synthetic bitonal scan book the benchmark converts (cached)"
dependsOn(tasks.named("testClasses"))
classpath = sourceSets["test"].runtimeClasspath
mainClass = "io.github.p4suta.pipeline.tools.SampleScanGenerator"
val pages = providers.gradleProperty("pages").getOrElse("200")
args = listOf("build/test-data/sample-scan-${pages}p.pdf", pages, "600")
}

// Stage-level runtime + memory benchmark (the pdfbook counterpart of tate's benchRuntime): runs the
// installDist launcher in-container with --timings, parses the per-stage breakdown, samples peak
// RSS from /proc, and writes pipeline/docs/perf-baseline.md. Knobs: -Pruns=N (warm runs, default
// 3), -Pjobs=1,8 (comma-separated -j sweep; default auto = the launcher's CPU-count default),
// -Ppages=N (fixture size, default 200), -Pinputs="a.pdf b.pdf" (real books instead of the
// fixture; resolved against the repo root).
tasks.register<JavaExec>("benchPipeline") {
group = "verification"
description = "Benchmark pdfbook stage timings + peak memory; writes pipeline/docs/perf-baseline.md"
dependsOn(tasks.named("installDist"), tasks.named("createSampleScan"))
classpath = sourceSets["test"].runtimeClasspath
mainClass = "io.github.p4suta.pipeline.tools.PipelineBenchmark"
workingDir = rootDir
val runs = providers.gradleProperty("runs").getOrElse("3")
val jobs = providers.gradleProperty("jobs").getOrElse("auto")
val pages = providers.gradleProperty("pages").getOrElse("200")
val extraInputs =
providers
.gradleProperty("inputs")
.orNull
?.split(Regex("\\s+"))
?.filter { it.isNotBlank() }
?: emptyList()
val launcher = "pipeline/app/build/install/pdfbook/bin/pdfbook"
val inputs =
extraInputs.ifEmpty { listOf("pipeline/app/build/test-data/sample-scan-${pages}p.pdf") }
args = listOf(launcher, "qpdf", "pipeline/docs/perf-baseline.md", runs, jobs) + inputs
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ private static Options buildOptions() {
"Write machine-readable JSONL progress events to this file (single"
+ " input only); used by front ends to report progress.")
.get());
options.addOption(
Option.builder()
.longOpt("timings")
.desc(
"Print a per-stage wall-clock breakdown to stderr when each run"
+ " ends.")
.get());
CliDocs.options(options);
return options;
}
Expand Down Expand Up @@ -351,6 +358,7 @@ record Plan(Path input, Path output, Config config) {}
deskew,
scale,
pdfA,
false,
force);
return new Plan(input, output, config);
}
Expand Down Expand Up @@ -380,14 +388,32 @@ private static String defaultOutput(Path input) {
private static void runOne(Path input, Path output, Config config, @Nullable Path progressFile)
throws IOException {
if (progressFile == null) {
runWith(input, output, config, ProgressSink.NO_OP);
runWith(input, output, config, withTimings(config, ProgressSink.NO_OP));
} else {
try (JsonlFileProgressSink progress = new JsonlFileProgressSink(progressFile)) {
runWith(input, output, config, progress);
runWith(input, output, config, withTimings(config, progress));
}
}
}

/**
* Wraps {@code sink} with a fresh {@link StageTimingSink} when {@code --timings} is set, so
* each run (every book of a batch separately) prints its own per-stage breakdown to stderr.
*/
private static ProgressSink withTimings(Config config, ProgressSink sink) {
if (!config.timings()) {
return sink;
}
StageTimingSink timings = new StageTimingSink(System.err);
if (sink == ProgressSink.NO_OP) {
return timings;
}
return event -> {
sink.emit(event);
timings.emit(event);
};
}

// Resolves the progress sink first so the stages and sink report page-level PageProcessed
// events into the same sink PipelineRunner reports stage boundaries into. With no
// --progress-file the sink is NO_OP and every emit is a no-op.
Expand All @@ -401,9 +427,11 @@ private static void runWith(Path input, Path output, Config config, ProgressSink
stages.add(new RegisterStage(config.jobs(), config.deskew(), config.scale(), progress));
}
if (stages.isEmpty()) {
// --no-despeckle --no-register: the raw pdfimages TIFFs are not CCITT G4, which the
// spread sink's pass-through embedding requires; despeckle/register each re-encode G4
// themselves, so only the no-stage path needs this normalization.
// --no-despeckle --no-register: a non-CCITT source extracts as decoded TIFFs that are
// not the single-strip CCITT G4 the spread sink's pass-through embedding requires;
// despeckle/register each re-encode G4 themselves, so only the no-stage path needs
// this normalization (an all-CCITT source arrives already G4 — then this is a cheap
// lossless re-encode that keeps the path uniform).
stages.add(new G4EncodeStage(config.jobs(), progress));
}
Source source = new PdfExtractSource(input, config.jobs());
Expand Down Expand Up @@ -447,6 +475,7 @@ private static Config parseConfig(CommandLine cmd) throws ParseException {
!cmd.hasOption("no-deskew"),
!cmd.hasOption("no-scale"),
cmd.hasOption("pdf-a"),
cmd.hasOption("timings"),
cmd.hasOption("force"));
}

Expand Down Expand Up @@ -475,5 +504,6 @@ record Config(
boolean deskew,
boolean scale,
boolean pdfA,
boolean timings,
boolean force) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.github.p4suta.pipeline.cli;

import io.github.p4suta.shared.progress.ProgressEvent;
import io.github.p4suta.shared.progress.ProgressSink;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.jspecify.annotations.Nullable;

/**
* Measures each stage's wall clock from its {@link ProgressEvent.StageStarted}/{@link
* ProgressEvent.StageCompleted} boundaries and prints a per-stage breakdown when the run ends — the
* {@code --timings} flag's implementation. One line per stage in completion order, then the
* run-wide total:
*
* <pre>{@code
* timing: extract = 4.21s (18.3%)
* timing: despeckle = 9.87s (42.9%)
* timing: total = 23.01s
* }</pre>
*
* <p>The {@code timing: <stage> = <seconds>s} shape is a stable contract the {@code benchPipeline}
* harness parses; keep it machine-readable. A stage still open when the run fails is reported with
* its elapsed-so-far, so a failed run still shows where the time went. Thread-safe like every
* {@link ProgressSink}: events are handled under one lock.
*/
final class StageTimingSink implements ProgressSink {

private final PrintStream out;
private final Object lock = new Object();
private final List<String> stages = new ArrayList<>();
private final List<Long> stageNanos = new ArrayList<>();
private @Nullable String openStage;
private long openedAtNanos;
private long runStartedAtNanos;
private boolean runStarted;

StageTimingSink(PrintStream out) {
this.out = out;
}

@Override
public void emit(ProgressEvent event) {
synchronized (lock) {
switch (event) {
case ProgressEvent.RunStarted ignored -> markRunStarted();
case ProgressEvent.StageStarted s -> {
// Defensive: a sink wired mid-run still measures from the first boundary.
markRunStarted();
openStage = s.stage();
openedAtNanos = System.nanoTime();
}
case ProgressEvent.StageCompleted ignored -> closeOpenStage();
case ProgressEvent.PageProcessed ignored -> {
// Stage boundaries carry all the timing information.
}
case ProgressEvent.RunCompleted ignored -> report();
case ProgressEvent.RunFailed ignored -> report();
}
}
}

private void markRunStarted() {
if (!runStarted) {
runStartedAtNanos = System.nanoTime();
runStarted = true;
}
}

private void closeOpenStage() {
@Nullable String stage = openStage;
if (stage != null) {
stages.add(stage);
stageNanos.add(System.nanoTime() - openedAtNanos);
openStage = null;
}
}

private void report() {
closeOpenStage();
long totalNanos = runStarted ? System.nanoTime() - runStartedAtNanos : 0;
for (int i = 0; i < stages.size(); i++) {
out.printf(
Locale.ROOT,
"timing: %s = %.2fs (%.1f%%)%n",
stages.get(i),
stageNanos.get(i) / 1e9,
totalNanos > 0 ? stageNanos.get(i) * 100.0 / totalNanos : 0.0);
}
out.printf(Locale.ROOT, "timing: total = %.2fs%n", totalNanos / 1e9);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.github.p4suta.pipeline.cli;

import static org.assertj.core.api.Assertions.assertThat;

import io.github.p4suta.shared.progress.ProgressEvent;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;

/**
* Pins the {@code --timings} report: one machine-parseable {@code timing: <stage> = <seconds>s}
* line per completed stage (in completion order, percentages attached) plus a {@code timing: total}
* line, printed only when the run ends — and on failure, the still-open stage is reported with its
* elapsed-so-far. The line shape is the contract the {@code benchPipeline} harness parses.
*/
final class StageTimingSinkTest {

private final ByteArrayOutputStream buf = new ByteArrayOutputStream();
private final StageTimingSink sink =
new StageTimingSink(new PrintStream(buf, true, StandardCharsets.UTF_8));

private String output() {
return buf.toString(StandardCharsets.UTF_8);
}

@Test
void completedRunReportsEachStageInOrderAndATotal() {
sink.emit(new ProgressEvent.RunStarted(2));
sink.emit(new ProgressEvent.StageStarted("extract", 0, 2));
sink.emit(new ProgressEvent.PageProcessed("extract", 1, 2));
sink.emit(new ProgressEvent.StageCompleted("extract"));
sink.emit(new ProgressEvent.StageStarted("spread", 1, 2));
sink.emit(new ProgressEvent.StageCompleted("spread"));
sink.emit(new ProgressEvent.RunCompleted());

assertThat(output().lines())
.hasSize(3)
.satisfiesExactly(
extract ->
assertThat(extract)
.matches(
"timing: extract = \\d+\\.\\d{2}s"
+ " \\(\\d+\\.\\d%\\)"),
spread ->
assertThat(spread)
.matches(
"timing: spread = \\d+\\.\\d{2}s"
+ " \\(\\d+\\.\\d%\\)"),
total -> assertThat(total).matches("timing: total = \\d+\\.\\d{2}s"));
}

@Test
void nothingIsPrintedBeforeTheRunEnds() {
sink.emit(new ProgressEvent.RunStarted(1));
sink.emit(new ProgressEvent.StageStarted("extract", 0, 1));
sink.emit(new ProgressEvent.StageCompleted("extract"));

assertThat(output()).isEmpty();
}

@Test
void failedRunReportsTheStillOpenStage() {
sink.emit(new ProgressEvent.RunStarted(2));
sink.emit(new ProgressEvent.StageStarted("extract", 0, 2));
sink.emit(new ProgressEvent.StageCompleted("extract"));
sink.emit(new ProgressEvent.StageStarted("register", 1, 2));
sink.emit(new ProgressEvent.RunFailed("INTERNAL", "boom"));

assertThat(output())
.contains("timing: extract = ")
.contains("timing: register = ")
.contains("timing: total = ");
}
}
Loading
Loading