Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions fastqc
Original file line number Diff line number Diff line change
Expand Up @@ -450,11 +450,9 @@ DESCRIPTION

--png Save the graphs in PNG format

-t --threads Specifies the number of files which can be processed
simultaneously. Each thread will be allocated 512MB of
memory so you shouldn't run more threads than your
available memory will cope with, and not more than
6 threads on a 32 bit machine
-t --threads Total number of CPU threads to use. Each file uses up to
4 threads. Defaults to 4 x number of files, capped at
the number of available CPUs.

-c Specifies a non-default file which contains the list of
--contaminants contaminants to screen overrepresented sequences against.
Expand Down
45 changes: 38 additions & 7 deletions uk/ac/babraham/FastQC/Analysis/AnalysisQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,51 @@ public class AnalysisQueue implements Runnable, AnalysisListener{

private AtomicInteger availableSlots = new AtomicInteger(1);
private AtomicInteger usedSlots = new AtomicInteger(0);

private volatile int processorsPerFile = 0;

// Per-file pipeline = 1 reader + up to MAX_PROCESSORS_PER_FILE processor
// threads. Past 3 processors gains plateau as the reader becomes the bottleneck.
private static final int MAX_PROCESSORS_PER_FILE = 3;
private static final int THREADS_PER_FILE = 1 + MAX_PROCESSORS_PER_FILE;

public static AnalysisQueue getInstance () {
return instance;
}

private AnalysisQueue () {

if (FastQCConfig.getInstance().threads != null) {
availableSlots.set(FastQCConfig.getInstance().threads);
}

// Sized for a single file; OfflineRunner overrides via configure().
configure(1);

Thread t = new Thread(this);
t.start();
}

/**
* Size the CPU budget. -t/-Dfastqc.threads is a total-thread budget
* split between outer concurrency (files in parallel) and inner
* concurrency (per-file pipeline). Unset defaults to
* <code>min(THREADS_PER_FILE * max(1, expectedFiles), availableProcessors)</code>.
* A budget of 1 makes AnalysisRunner take its single-threaded path.
* Call before submitting any AnalysisRunner.
*/
void configure (int expectedFiles) {
int totalThreads;
if (FastQCConfig.getInstance().threads != null) {
totalThreads = FastQCConfig.getInstance().threads;
}
else {
int filesForBudget = Math.max(1, expectedFiles);
long requested = (long) THREADS_PER_FILE * filesForBudget; // long to avoid overflow
totalThreads = (int) Math.min(requested, Runtime.getRuntime().availableProcessors());
}
processorsPerFile = totalThreads <= 1 ? 0 : Math.min(MAX_PROCESSORS_PER_FILE, totalThreads - 1);
int actualThreadsPerFile = processorsPerFile == 0 ? 1 : 1 + processorsPerFile;
availableSlots.set(Math.max(1, totalThreads / actualThreadsPerFile));
}

int getProcessorsPerFile() {
return processorsPerFile;
}

public void addToQueue (AnalysisRunner runner) {
queue.add(runner);
Expand Down
207 changes: 196 additions & 11 deletions uk/ac/babraham/FastQC/Analysis/AnalysisRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
package uk.ac.babraham.FastQC.Analysis;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;

import uk.ac.babraham.FastQC.Modules.BasicStats;
import uk.ac.babraham.FastQC.Modules.QCModule;
Expand All @@ -34,7 +37,7 @@ public class AnalysisRunner implements Runnable {
private SequenceFile file;
private QCModule [] modules;
private List<AnalysisListener> listeners = new ArrayList<AnalysisListener>();
private int percentComplete = 0;
private volatile int percentComplete = 0;

public AnalysisRunner (SequenceFile file) {
this.file = file;
Expand Down Expand Up @@ -63,12 +66,21 @@ public void startAnalysis (QCModule [] modules) {

public void run() {

int numProcessors = AnalysisQueue.getInstance().getProcessorsPerFile();
if (numProcessors == 0) {
runSequential();
return;
}
runParallel(numProcessors);
}

private void runSequential() {

Iterator<AnalysisListener> i = listeners.iterator();
while (i.hasNext()) {
i.next().analysisStarted(file);
}


int seqCount = 0;
while (file.hasNext()) {
++seqCount;
Expand All @@ -83,41 +95,214 @@ public void run() {
}
return;
}

for (int m=0;m<modules.length;m++) {
if (seq.isFiltered() && modules[m].ignoreFilteredSequences()) continue;
modules[m].processSequence(seq);
}

if (seqCount % 1000 == 0) {
if (file.getPercentComplete() >= percentComplete+5) {

percentComplete = (((int)file.getPercentComplete())/5)*5;

i = listeners.iterator();
while (i.hasNext()) {
i.next().analysisUpdated(file,seqCount,percentComplete);
}
try {
Thread.sleep(10);
}
}
catch (InterruptedException e) {}
}
}
}

// We need to account for their potentially being no sequences
// in the file. In this case the BasicStats module never gets
// in the file. In this case the BasicStats module never gets
// the file name so we need to explicitly pass it.

if (seqCount == 0) {
for (int m=0; m<modules.length; m++) {
if (modules[m] instanceof BasicStats) {
((BasicStats)modules[m]).setFileName(file.name());
}
}
}


i = listeners.iterator();
while (i.hasNext()) {
i.next().analysisComplete(file,modules);
}
}

private void runParallel(final int numProcessors) {

Iterator<AnalysisListener> i = listeners.iterator();
while (i.hasNext()) {
i.next().analysisStarted(file);
}

// Reader fills batches; numProcessors threads drain them, each running
// a disjoint subset of modules.
final int BATCH_SIZE = 1024;
final int QUEUE_CAPACITY = 32;

@SuppressWarnings("unchecked")
ArrayBlockingQueue<Sequence[]>[] procQueues = new ArrayBlockingQueue[numProcessors];
for (int p = 0; p < numProcessors; p++) {
procQueues[p] = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
}
final Sequence[] POISON = new Sequence[0];
final SequenceFormatException[] readerError = { null };
final Throwable[] processorError = { null };

QCModule[][] moduleSplits = new QCModule[numProcessors][];
int splitSize = (modules.length + numProcessors - 1) / numProcessors;
for (int p = 0; p < numProcessors; p++) {
int from = p * splitSize;
int to = Math.min(from + splitSize, modules.length);
moduleSplits[p] = Arrays.copyOfRange(modules, from, to);
}

// Same Sequence[] reference is shared across processor queues; safe
// because each Sequence is fully populated before put() and modules
// only read from it.
final int[] seqCountHolder = { 0 };

Thread reader = new Thread(() -> {
int readerSeqCount = 0;
try {
Sequence[] batch = new Sequence[BATCH_SIZE];
int idx = 0;
while (file.hasNext()) {
batch[idx++] = file.next();
++readerSeqCount;
if (idx == BATCH_SIZE) {
for (int p = 0; p < numProcessors; p++) {
procQueues[p].put(batch);
}
batch = new Sequence[BATCH_SIZE];
idx = 0;

// Fire analysisUpdated only when the file advances another 5%.
if (file.getPercentComplete() >= percentComplete + 5) {
percentComplete = (((int) file.getPercentComplete()) / 5) * 5;
for (int li = 0; li < listeners.size(); li++) {
listeners.get(li).analysisUpdated(file, readerSeqCount, percentComplete);
}
}
}
}
if (idx > 0) {
Sequence[] partial = Arrays.copyOf(batch, idx);
for (int p = 0; p < numProcessors; p++) {
procQueues[p].put(partial);
}
}
}
catch (SequenceFormatException e) {
readerError[0] = e;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
finally {
seqCountHolder[0] = readerSeqCount;
// POISON must always be delivered or processors block on take() forever.
for (int p = 0; p < numProcessors; p++) {
try { procQueues[p].put(POISON); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}, "fastqc-reader");
reader.setDaemon(true);
reader.start();

CountDownLatch processorsDone = new CountDownLatch(numProcessors);

for (int p = 0; p < numProcessors; p++) {
final QCModule[] myModules = moduleSplits[p];
final ArrayBlockingQueue<Sequence[]> myQueue = procQueues[p];

Thread processor = new Thread(() -> {
try {
while (true) {
Sequence[] batch = myQueue.take();
if (batch == POISON) break;

for (int b = 0; b < batch.length; b++) {
Sequence seq = batch[b];
for (int m = 0; m < myModules.length; m++) {
if (seq.isFiltered() && myModules[m].ignoreFilteredSequences()) continue;
myModules[m].processSequence(seq);
}
}
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (RuntimeException e) {
// Drain so the reader doesn't block on put(); surface the
// failure to listeners after join().
processorError[0] = e;
try {
while (myQueue.take() != POISON) { /* drain */ }
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
finally {
processorsDone.countDown();
}
}, "fastqc-proc-" + p);
processor.setDaemon(true);
processor.start();
}

try {
processorsDone.await();
reader.join();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

int seqCount = seqCountHolder[0];

if (readerError[0] != null) {
i = listeners.iterator();
while (i.hasNext()) {
i.next().analysisExceptionReceived(file, readerError[0]);
}
return;
}
if (processorError[0] != null) {
SequenceFormatException sfe = new SequenceFormatException(
"Module processing failed: " + processorError[0].getMessage());
sfe.initCause(processorError[0]);
i = listeners.iterator();
while (i.hasNext()) {
i.next().analysisExceptionReceived(file, sfe);
}
return;
}

// We need to account for their potentially being no sequences
// in the file. In this case the BasicStats module never gets
// the file name so we need to explicitly pass it.

if (seqCount == 0) {
for (int m=0; m<modules.length; m++) {
if (modules[m] instanceof BasicStats) {
((BasicStats)modules[m]).setFileName(file.name());
}
}
}

i = listeners.iterator();
while (i.hasNext()) {
i.next().analysisComplete(file,modules);
Expand Down
5 changes: 4 additions & 1 deletion uk/ac/babraham/FastQC/Analysis/OfflineRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ else if (FastQCConfig.getInstance().nano) {


filesRemaining = new AtomicInteger(fileGroups.length);


// Let the queue size its CPU budget to the actual workload.
AnalysisQueue.getInstance().configure(fileGroups.length);

for (int i=0;i<fileGroups.length;i++) {

try {
Expand Down
Loading