From b9a7d2920344b2d5b86769a20050a3b9f1e91468 Mon Sep 17 00:00:00 2001 From: Phil Ewels Date: Thu, 21 May 2026 03:10:54 +0200 Subject: [PATCH 1/3] Three-stage parallel pipeline for module processing AnalysisRunner now runs as one reader thread plus three processor threads. The reader batches Sequences (1024 per batch) and pushes each batch reference onto N ArrayBlockingQueues. Each processor drains its own queue and runs an evenly split subset of the QCModule array, so modules stay single-threaded per processor and no in-module locking is needed. Progress callbacks (analysisUpdated) are fired from the reader thread at the same cadence as the previous single-threaded version (every batch boundary, gated on a 5% file-position advance). Co-Authored-By: Paolo Di Tommaso --- .../FastQC/Analysis/AnalysisRunner.java | 182 +++++++++++++++--- 1 file changed, 150 insertions(+), 32 deletions(-) diff --git a/uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java b/uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java index d1defb0..035d8f4 100644 --- a/uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java +++ b/uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java @@ -20,8 +20,11 @@ package uk.ac.babraham.FastQC.Analysis; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CountDownLatch; import uk.ac.babraham.FastQC.Modules.BasicStats; import uk.ac.babraham.FastQC.Modules.QCModule; @@ -34,7 +37,7 @@ public class AnalysisRunner implements Runnable { private SequenceFile file; private QCModule [] modules; private List listeners = new ArrayList(); - private int percentComplete = 0; + private volatile int percentComplete = 0; public AnalysisRunner (SequenceFile file) { this.file = file; @@ -68,48 +71,163 @@ public void run() { i.next().analysisStarted(file); } - - int seqCount = 0; - while (file.hasNext()) { - ++seqCount; - Sequence seq; + // Reader thread reads sequences in batches; NUM_PROCESSORS threads + // drain those batches and each runs a disjoint subset of modules. + final int BATCH_SIZE = 1024; + final int QUEUE_CAPACITY = 32; + final int NUM_PROCESSORS = 3; + + @SuppressWarnings("unchecked") + ArrayBlockingQueue[] procQueues = new ArrayBlockingQueue[NUM_PROCESSORS]; + for (int p = 0; p < NUM_PROCESSORS; p++) { + procQueues[p] = new ArrayBlockingQueue<>(QUEUE_CAPACITY); + } + final Sequence[] POISON = new Sequence[0]; + final SequenceFormatException[] readerError = { null }; + final Throwable[] processorError = { null }; + + QCModule[][] moduleSplits = new QCModule[NUM_PROCESSORS][]; + int splitSize = (modules.length + NUM_PROCESSORS - 1) / NUM_PROCESSORS; + for (int p = 0; p < NUM_PROCESSORS; p++) { + int from = p * splitSize; + int to = Math.min(from + splitSize, modules.length); + moduleSplits[p] = Arrays.copyOfRange(modules, from, to); + } + + // The same Sequence[] reference is published to every processor + // queue. This is safe because Sequence is populated by file.next() + // before put() and modules only read from it. + final int[] seqCountHolder = { 0 }; + + Thread reader = new Thread(() -> { + int readerSeqCount = 0; try { - seq = file.next(); + Sequence[] batch = new Sequence[BATCH_SIZE]; + int idx = 0; + while (file.hasNext()) { + batch[idx++] = file.next(); + ++readerSeqCount; + if (idx == BATCH_SIZE) { + for (int p = 0; p < NUM_PROCESSORS; p++) { + procQueues[p].put(batch); + } + batch = new Sequence[BATCH_SIZE]; + idx = 0; + + // Same cadence as the single-threaded version: every + // 1000 reads (rounded up to 1024 by BATCH_SIZE), fire + // analysisUpdated only when we've crossed another 5%. + if (file.getPercentComplete() >= percentComplete + 5) { + percentComplete = (((int) file.getPercentComplete()) / 5) * 5; + for (int li = 0; li < listeners.size(); li++) { + listeners.get(li).analysisUpdated(file, readerSeqCount, percentComplete); + } + } + } + } + if (idx > 0) { + Sequence[] partial = Arrays.copyOf(batch, idx); + for (int p = 0; p < NUM_PROCESSORS; p++) { + procQueues[p].put(partial); + } + } } catch (SequenceFormatException e) { - i = listeners.iterator(); - while (i.hasNext()) { - i.next().analysisExceptionReceived(file,e); - } - return; + readerError[0] = e; } - - for (int m=0;m= percentComplete+5) { - - percentComplete = (((int)file.getPercentComplete())/5)*5; - - i = listeners.iterator(); - while (i.hasNext()) { - i.next().analysisUpdated(file,seqCount,percentComplete); + finally { + seqCountHolder[0] = readerSeqCount; + // POISON must always be delivered, otherwise processor threads + // block forever on take() and the run never completes. + for (int p = 0; p < NUM_PROCESSORS; p++) { + try { procQueues[p].put(POISON); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } + } + } + }, "fastqc-reader"); + reader.setDaemon(true); + reader.start(); + + CountDownLatch processorsDone = new CountDownLatch(NUM_PROCESSORS); + + for (int p = 0; p < NUM_PROCESSORS; p++) { + final QCModule[] myModules = moduleSplits[p]; + final ArrayBlockingQueue myQueue = procQueues[p]; + + Thread processor = new Thread(() -> { + try { + while (true) { + Sequence[] batch = myQueue.take(); + if (batch == POISON) break; + + for (int b = 0; b < batch.length; b++) { + Sequence seq = batch[b]; + for (int m = 0; m < myModules.length; m++) { + if (seq.isFiltered() && myModules[m].ignoreFilteredSequences()) continue; + myModules[m].processSequence(seq); + } + } + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + catch (RuntimeException e) { + // Don't let a misbehaving module deadlock the pipeline: + // drain remaining batches so the reader doesn't block on + // put(), and surface the failure after the join. + processorError[0] = e; try { - Thread.sleep(10); - } - catch (InterruptedException e) {} + while (myQueue.take() != POISON) { /* drain */ } + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + finally { + processorsDone.countDown(); + } + }, "fastqc-proc-" + p); + processor.setDaemon(true); + processor.start(); + } + + try { + processorsDone.await(); + reader.join(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + int seqCount = seqCountHolder[0]; + + if (readerError[0] != null) { + i = listeners.iterator(); + while (i.hasNext()) { + i.next().analysisExceptionReceived(file, readerError[0]); } + return; + } + if (processorError[0] != null) { + SequenceFormatException sfe = new SequenceFormatException( + "Module processing failed: " + processorError[0].getMessage()); + sfe.initCause(processorError[0]); + i = listeners.iterator(); + while (i.hasNext()) { + i.next().analysisExceptionReceived(file, sfe); } + return; } - + // We need to account for their potentially being no sequences - // in the file. In this case the BasicStats module never gets + // in the file. In this case the BasicStats module never gets // the file name so we need to explicitly pass it. - + if (seqCount == 0) { for (int m=0; m Date: Thu, 21 May 2026 08:55:15 +0200 Subject: [PATCH 2/3] Scale the parallel pipeline to the -t/-Dfastqc.threads budget AnalysisQueue treats -t as a total-thread budget and splits it between outer concurrency (files in parallel) and inner concurrency (per-file reader + processor pipeline): processorsPerFile = min(MAX_PROCESSORS_PER_FILE, totalThreads - 1) outerSlots = max(1, totalThreads / (1 + processorsPerFile)) When -t is unset, OfflineRunner now tells AnalysisQueue how many files the run has via configure(); the default becomes min(THREADS_PER_FILE * max(1, expectedFiles), availableProcessors), so a single file gets the full per-file pipeline and many files scale up to the host's CPU count without the user needing to set -t. A budget of one CPU makes AnalysisRunner take its single-threaded path so -t 1 produces byte-identical behaviour to the unbatched runner. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../FastQC/Analysis/AnalysisQueue.java | 45 +++++-- .../FastQC/Analysis/AnalysisRunner.java | 115 ++++++++++++++---- .../FastQC/Analysis/OfflineRunner.java | 5 +- 3 files changed, 133 insertions(+), 32 deletions(-) diff --git a/uk/ac/babraham/FastQC/Analysis/AnalysisQueue.java b/uk/ac/babraham/FastQC/Analysis/AnalysisQueue.java index 95dd6bb..5d109ae 100644 --- a/uk/ac/babraham/FastQC/Analysis/AnalysisQueue.java +++ b/uk/ac/babraham/FastQC/Analysis/AnalysisQueue.java @@ -34,20 +34,51 @@ public class AnalysisQueue implements Runnable, AnalysisListener{ private AtomicInteger availableSlots = new AtomicInteger(1); private AtomicInteger usedSlots = new AtomicInteger(0); - + private volatile int processorsPerFile = 0; + + // Per-file pipeline = 1 reader + up to MAX_PROCESSORS_PER_FILE processor + // threads. Past 3 processors gains plateau as the reader becomes the bottleneck. + private static final int MAX_PROCESSORS_PER_FILE = 3; + private static final int THREADS_PER_FILE = 1 + MAX_PROCESSORS_PER_FILE; + public static AnalysisQueue getInstance () { return instance; } - + private AnalysisQueue () { - - if (FastQCConfig.getInstance().threads != null) { - availableSlots.set(FastQCConfig.getInstance().threads); - } - + // Sized for a single file; OfflineRunner overrides via configure(). + configure(1); + Thread t = new Thread(this); t.start(); } + + /** + * Size the CPU budget. -t/-Dfastqc.threads is a total-thread budget + * split between outer concurrency (files in parallel) and inner + * concurrency (per-file pipeline). Unset defaults to + * min(THREADS_PER_FILE * max(1, expectedFiles), availableProcessors). + * A budget of 1 makes AnalysisRunner take its single-threaded path. + * Call before submitting any AnalysisRunner. + */ + void configure (int expectedFiles) { + int totalThreads; + if (FastQCConfig.getInstance().threads != null) { + totalThreads = FastQCConfig.getInstance().threads; + } + else { + int filesForBudget = Math.max(1, expectedFiles); + long requested = (long) THREADS_PER_FILE * filesForBudget; // long to avoid overflow + totalThreads = (int) Math.min(requested, Runtime.getRuntime().availableProcessors()); + } + processorsPerFile = totalThreads <= 1 ? 0 : Math.min(MAX_PROCESSORS_PER_FILE, totalThreads - 1); + int actualThreadsPerFile = processorsPerFile == 0 ? 1 : 1 + processorsPerFile; + availableSlots.set(Math.max(1, totalThreads / actualThreadsPerFile)); + } + + int getProcessorsPerFile() { + return processorsPerFile; + } public void addToQueue (AnalysisRunner runner) { queue.add(runner); diff --git a/uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java b/uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java index 035d8f4..ed65b37 100644 --- a/uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java +++ b/uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java @@ -66,37 +66,108 @@ public void startAnalysis (QCModule [] modules) { public void run() { + int numProcessors = AnalysisQueue.getInstance().getProcessorsPerFile(); + if (numProcessors == 0) { + runSequential(); + return; + } + runParallel(numProcessors); + } + + private void runSequential() { + + Iterator i = listeners.iterator(); + while (i.hasNext()) { + i.next().analysisStarted(file); + } + + int seqCount = 0; + while (file.hasNext()) { + ++seqCount; + Sequence seq; + try { + seq = file.next(); + } + catch (SequenceFormatException e) { + i = listeners.iterator(); + while (i.hasNext()) { + i.next().analysisExceptionReceived(file,e); + } + return; + } + + for (int m=0;m= percentComplete+5) { + + percentComplete = (((int)file.getPercentComplete())/5)*5; + + i = listeners.iterator(); + while (i.hasNext()) { + i.next().analysisUpdated(file,seqCount,percentComplete); + } + try { + Thread.sleep(10); + } + catch (InterruptedException e) {} + } + } + } + + // We need to account for their potentially being no sequences + // in the file. In this case the BasicStats module never gets + // the file name so we need to explicitly pass it. + + if (seqCount == 0) { + for (int m=0; m i = listeners.iterator(); while (i.hasNext()) { i.next().analysisStarted(file); } - // Reader thread reads sequences in batches; NUM_PROCESSORS threads - // drain those batches and each runs a disjoint subset of modules. + // Reader fills batches; numProcessors threads drain them, each running + // a disjoint subset of modules. final int BATCH_SIZE = 1024; final int QUEUE_CAPACITY = 32; - final int NUM_PROCESSORS = 3; @SuppressWarnings("unchecked") - ArrayBlockingQueue[] procQueues = new ArrayBlockingQueue[NUM_PROCESSORS]; - for (int p = 0; p < NUM_PROCESSORS; p++) { + ArrayBlockingQueue[] procQueues = new ArrayBlockingQueue[numProcessors]; + for (int p = 0; p < numProcessors; p++) { procQueues[p] = new ArrayBlockingQueue<>(QUEUE_CAPACITY); } final Sequence[] POISON = new Sequence[0]; final SequenceFormatException[] readerError = { null }; final Throwable[] processorError = { null }; - QCModule[][] moduleSplits = new QCModule[NUM_PROCESSORS][]; - int splitSize = (modules.length + NUM_PROCESSORS - 1) / NUM_PROCESSORS; - for (int p = 0; p < NUM_PROCESSORS; p++) { + QCModule[][] moduleSplits = new QCModule[numProcessors][]; + int splitSize = (modules.length + numProcessors - 1) / numProcessors; + for (int p = 0; p < numProcessors; p++) { int from = p * splitSize; int to = Math.min(from + splitSize, modules.length); moduleSplits[p] = Arrays.copyOfRange(modules, from, to); } - // The same Sequence[] reference is published to every processor - // queue. This is safe because Sequence is populated by file.next() - // before put() and modules only read from it. + // Same Sequence[] reference is shared across processor queues; safe + // because each Sequence is fully populated before put() and modules + // only read from it. final int[] seqCountHolder = { 0 }; Thread reader = new Thread(() -> { @@ -108,15 +179,13 @@ public void run() { batch[idx++] = file.next(); ++readerSeqCount; if (idx == BATCH_SIZE) { - for (int p = 0; p < NUM_PROCESSORS; p++) { + for (int p = 0; p < numProcessors; p++) { procQueues[p].put(batch); } batch = new Sequence[BATCH_SIZE]; idx = 0; - // Same cadence as the single-threaded version: every - // 1000 reads (rounded up to 1024 by BATCH_SIZE), fire - // analysisUpdated only when we've crossed another 5%. + // Fire analysisUpdated only when the file advances another 5%. if (file.getPercentComplete() >= percentComplete + 5) { percentComplete = (((int) file.getPercentComplete()) / 5) * 5; for (int li = 0; li < listeners.size(); li++) { @@ -127,7 +196,7 @@ public void run() { } if (idx > 0) { Sequence[] partial = Arrays.copyOf(batch, idx); - for (int p = 0; p < NUM_PROCESSORS; p++) { + for (int p = 0; p < numProcessors; p++) { procQueues[p].put(partial); } } @@ -140,9 +209,8 @@ public void run() { } finally { seqCountHolder[0] = readerSeqCount; - // POISON must always be delivered, otherwise processor threads - // block forever on take() and the run never completes. - for (int p = 0; p < NUM_PROCESSORS; p++) { + // POISON must always be delivered or processors block on take() forever. + for (int p = 0; p < numProcessors; p++) { try { procQueues[p].put(POISON); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -152,9 +220,9 @@ public void run() { reader.setDaemon(true); reader.start(); - CountDownLatch processorsDone = new CountDownLatch(NUM_PROCESSORS); + CountDownLatch processorsDone = new CountDownLatch(numProcessors); - for (int p = 0; p < NUM_PROCESSORS; p++) { + for (int p = 0; p < numProcessors; p++) { final QCModule[] myModules = moduleSplits[p]; final ArrayBlockingQueue myQueue = procQueues[p]; @@ -177,9 +245,8 @@ public void run() { Thread.currentThread().interrupt(); } catch (RuntimeException e) { - // Don't let a misbehaving module deadlock the pipeline: - // drain remaining batches so the reader doesn't block on - // put(), and surface the failure after the join. + // Drain so the reader doesn't block on put(); surface the + // failure to listeners after join(). processorError[0] = e; try { while (myQueue.take() != POISON) { /* drain */ } diff --git a/uk/ac/babraham/FastQC/Analysis/OfflineRunner.java b/uk/ac/babraham/FastQC/Analysis/OfflineRunner.java index e2d763f..6671d5f 100644 --- a/uk/ac/babraham/FastQC/Analysis/OfflineRunner.java +++ b/uk/ac/babraham/FastQC/Analysis/OfflineRunner.java @@ -118,7 +118,10 @@ else if (FastQCConfig.getInstance().nano) { filesRemaining = new AtomicInteger(fileGroups.length); - + + // Let the queue size its CPU budget to the actual workload. + AnalysisQueue.getInstance().configure(fileGroups.length); + for (int i=0;i Date: Thu, 21 May 2026 09:02:33 +0200 Subject: [PATCH 3/3] Update CLI help text --- fastqc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/fastqc b/fastqc index 4efc9e5..60e463e 100755 --- a/fastqc +++ b/fastqc @@ -450,11 +450,9 @@ DESCRIPTION --png Save the graphs in PNG format - -t --threads Specifies the number of files which can be processed - simultaneously. Each thread will be allocated 512MB of - memory so you shouldn't run more threads than your - available memory will cope with, and not more than - 6 threads on a 32 bit machine + -t --threads Total number of CPU threads to use. Each file uses up to + 4 threads. Defaults to 4 x number of files, capped at + the number of available CPUs. -c Specifies a non-default file which contains the list of --contaminants contaminants to screen overrepresented sequences against.