diff --git a/fastqc b/fastqc index 4efc9e5..60e463e 100755 --- a/fastqc +++ b/fastqc @@ -450,11 +450,9 @@ DESCRIPTION --png Save the graphs in PNG format - -t --threads Specifies the number of files which can be processed - simultaneously. Each thread will be allocated 512MB of - memory so you shouldn't run more threads than your - available memory will cope with, and not more than - 6 threads on a 32 bit machine + -t --threads Total number of CPU threads to use. Each file uses up to + 4 threads. Defaults to 4 x number of files, capped at + the number of available CPUs. -c Specifies a non-default file which contains the list of --contaminants contaminants to screen overrepresented sequences against. diff --git a/uk/ac/babraham/FastQC/Analysis/AnalysisQueue.java b/uk/ac/babraham/FastQC/Analysis/AnalysisQueue.java index 95dd6bb..5d109ae 100644 --- a/uk/ac/babraham/FastQC/Analysis/AnalysisQueue.java +++ b/uk/ac/babraham/FastQC/Analysis/AnalysisQueue.java @@ -34,20 +34,51 @@ public class AnalysisQueue implements Runnable, AnalysisListener{ private AtomicInteger availableSlots = new AtomicInteger(1); private AtomicInteger usedSlots = new AtomicInteger(0); - + private volatile int processorsPerFile = 0; + + // Per-file pipeline = 1 reader + up to MAX_PROCESSORS_PER_FILE processor + // threads. Past 3 processors gains plateau as the reader becomes the bottleneck. + private static final int MAX_PROCESSORS_PER_FILE = 3; + private static final int THREADS_PER_FILE = 1 + MAX_PROCESSORS_PER_FILE; + public static AnalysisQueue getInstance () { return instance; } - + private AnalysisQueue () { - - if (FastQCConfig.getInstance().threads != null) { - availableSlots.set(FastQCConfig.getInstance().threads); - } - + // Sized for a single file; OfflineRunner overrides via configure(). + configure(1); + Thread t = new Thread(this); t.start(); } + + /** + * Size the CPU budget. -t/-Dfastqc.threads is a total-thread budget + * split between outer concurrency (files in parallel) and inner + * concurrency (per-file pipeline). Unset defaults to + * min(THREADS_PER_FILE * max(1, expectedFiles), availableProcessors). + * A budget of 1 makes AnalysisRunner take its single-threaded path. + * Call before submitting any AnalysisRunner. + */ + void configure (int expectedFiles) { + int totalThreads; + if (FastQCConfig.getInstance().threads != null) { + totalThreads = FastQCConfig.getInstance().threads; + } + else { + int filesForBudget = Math.max(1, expectedFiles); + long requested = (long) THREADS_PER_FILE * filesForBudget; // long to avoid overflow + totalThreads = (int) Math.min(requested, Runtime.getRuntime().availableProcessors()); + } + processorsPerFile = totalThreads <= 1 ? 0 : Math.min(MAX_PROCESSORS_PER_FILE, totalThreads - 1); + int actualThreadsPerFile = processorsPerFile == 0 ? 1 : 1 + processorsPerFile; + availableSlots.set(Math.max(1, totalThreads / actualThreadsPerFile)); + } + + int getProcessorsPerFile() { + return processorsPerFile; + } public void addToQueue (AnalysisRunner runner) { queue.add(runner); diff --git a/uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java b/uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java index d1defb0..ed65b37 100644 --- a/uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java +++ b/uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java @@ -20,8 +20,11 @@ package uk.ac.babraham.FastQC.Analysis; import java.util.ArrayList; +import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CountDownLatch; import uk.ac.babraham.FastQC.Modules.BasicStats; import uk.ac.babraham.FastQC.Modules.QCModule; @@ -34,7 +37,7 @@ public class AnalysisRunner implements Runnable { private SequenceFile file; private QCModule [] modules; private List listeners = new ArrayList(); - private int percentComplete = 0; + private volatile int percentComplete = 0; public AnalysisRunner (SequenceFile file) { this.file = file; @@ -63,12 +66,21 @@ public void startAnalysis (QCModule [] modules) { public void run() { + int numProcessors = AnalysisQueue.getInstance().getProcessorsPerFile(); + if (numProcessors == 0) { + runSequential(); + return; + } + runParallel(numProcessors); + } + + private void runSequential() { + Iterator i = listeners.iterator(); while (i.hasNext()) { i.next().analysisStarted(file); } - int seqCount = 0; while (file.hasNext()) { ++seqCount; @@ -83,33 +95,33 @@ public void run() { } return; } - + for (int m=0;m= percentComplete+5) { - + percentComplete = (((int)file.getPercentComplete())/5)*5; - + i = listeners.iterator(); while (i.hasNext()) { i.next().analysisUpdated(file,seqCount,percentComplete); } try { Thread.sleep(10); - } + } catch (InterruptedException e) {} } } } - + // We need to account for their potentially being no sequences - // in the file. In this case the BasicStats module never gets + // in the file. In this case the BasicStats module never gets // the file name so we need to explicitly pass it. - + if (seqCount == 0) { for (int m=0; m i = listeners.iterator(); + while (i.hasNext()) { + i.next().analysisStarted(file); + } + + // Reader fills batches; numProcessors threads drain them, each running + // a disjoint subset of modules. + final int BATCH_SIZE = 1024; + final int QUEUE_CAPACITY = 32; + + @SuppressWarnings("unchecked") + ArrayBlockingQueue[] procQueues = new ArrayBlockingQueue[numProcessors]; + for (int p = 0; p < numProcessors; p++) { + procQueues[p] = new ArrayBlockingQueue<>(QUEUE_CAPACITY); + } + final Sequence[] POISON = new Sequence[0]; + final SequenceFormatException[] readerError = { null }; + final Throwable[] processorError = { null }; + + QCModule[][] moduleSplits = new QCModule[numProcessors][]; + int splitSize = (modules.length + numProcessors - 1) / numProcessors; + for (int p = 0; p < numProcessors; p++) { + int from = p * splitSize; + int to = Math.min(from + splitSize, modules.length); + moduleSplits[p] = Arrays.copyOfRange(modules, from, to); + } + + // Same Sequence[] reference is shared across processor queues; safe + // because each Sequence is fully populated before put() and modules + // only read from it. + final int[] seqCountHolder = { 0 }; + + Thread reader = new Thread(() -> { + int readerSeqCount = 0; + try { + Sequence[] batch = new Sequence[BATCH_SIZE]; + int idx = 0; + while (file.hasNext()) { + batch[idx++] = file.next(); + ++readerSeqCount; + if (idx == BATCH_SIZE) { + for (int p = 0; p < numProcessors; p++) { + procQueues[p].put(batch); + } + batch = new Sequence[BATCH_SIZE]; + idx = 0; + + // Fire analysisUpdated only when the file advances another 5%. + if (file.getPercentComplete() >= percentComplete + 5) { + percentComplete = (((int) file.getPercentComplete()) / 5) * 5; + for (int li = 0; li < listeners.size(); li++) { + listeners.get(li).analysisUpdated(file, readerSeqCount, percentComplete); + } + } + } + } + if (idx > 0) { + Sequence[] partial = Arrays.copyOf(batch, idx); + for (int p = 0; p < numProcessors; p++) { + procQueues[p].put(partial); + } + } + } + catch (SequenceFormatException e) { + readerError[0] = e; + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + finally { + seqCountHolder[0] = readerSeqCount; + // POISON must always be delivered or processors block on take() forever. + for (int p = 0; p < numProcessors; p++) { + try { procQueues[p].put(POISON); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + }, "fastqc-reader"); + reader.setDaemon(true); + reader.start(); + + CountDownLatch processorsDone = new CountDownLatch(numProcessors); + + for (int p = 0; p < numProcessors; p++) { + final QCModule[] myModules = moduleSplits[p]; + final ArrayBlockingQueue myQueue = procQueues[p]; + + Thread processor = new Thread(() -> { + try { + while (true) { + Sequence[] batch = myQueue.take(); + if (batch == POISON) break; + + for (int b = 0; b < batch.length; b++) { + Sequence seq = batch[b]; + for (int m = 0; m < myModules.length; m++) { + if (seq.isFiltered() && myModules[m].ignoreFilteredSequences()) continue; + myModules[m].processSequence(seq); + } + } + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + catch (RuntimeException e) { + // Drain so the reader doesn't block on put(); surface the + // failure to listeners after join(). + processorError[0] = e; + try { + while (myQueue.take() != POISON) { /* drain */ } + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + finally { + processorsDone.countDown(); + } + }, "fastqc-proc-" + p); + processor.setDaemon(true); + processor.start(); + } + + try { + processorsDone.await(); + reader.join(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + int seqCount = seqCountHolder[0]; + + if (readerError[0] != null) { + i = listeners.iterator(); + while (i.hasNext()) { + i.next().analysisExceptionReceived(file, readerError[0]); + } + return; + } + if (processorError[0] != null) { + SequenceFormatException sfe = new SequenceFormatException( + "Module processing failed: " + processorError[0].getMessage()); + sfe.initCause(processorError[0]); + i = listeners.iterator(); + while (i.hasNext()) { + i.next().analysisExceptionReceived(file, sfe); + } + return; + } + + // We need to account for their potentially being no sequences + // in the file. In this case the BasicStats module never gets + // the file name so we need to explicitly pass it. + + if (seqCount == 0) { + for (int m=0; m