-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathSnakefile.process_cells
More file actions
executable file
·450 lines (383 loc) · 17 KB
/
Snakefile.process_cells
File metadata and controls
executable file
·450 lines (383 loc) · 17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
#!/bin/bash
# vim: ft=python
# Contents >>>
# + Embedded BASH script to bootstrap the workflow
# + Initialisation and configuration
# + Helper functions
# + The rules specific to this workflow
# + More generic rules
# Note this workflow normally expects to run on gseg-login0 while viewing files
# in RUNDIR on /fluidfs, which cannot be seen by the worker nodes.
"""true" ### Begin shell script part
set -u
source "`dirname $0`"/shell_helper_functions.sh
# The TOOLBOX setting gets passed to jobs that run on SLURM. The PATH setting
# does not, as SLURM resets that env var.
export TOOLBOX="$(find_toolbox)"
export PATH="${PATH}:$(dirname "$0")"
snakerun_drmaa "$0" "$@"
"exit""" ### End of shell script part
#!/usr/bin/env snakemake
from snakemake.io import Wildcards
from glob import glob
from pprint import pprint, pformat
from smrtino import load_yaml, dump_yaml
""" This is a recipe to process raw SMRT cells from the PacBio Revio,
and start to calculate some stats.
The script wants to be run in an output dir under $TO_LOCATION
It will process all SMRT cells specified by config['cells']
If should (maybe?) be possible to run multiple instances at once, so long as the
cells are not conflicting.
"""
TOOLBOX = 'env PATH="{}:$PATH"'.format(os.environ['TOOLBOX'])
# Other than that, ensure that scripts in the directory with this Snakefile are
# in the PATH (we have to do it here as $PATH does not get preserved on cluster jobs).
if ( not os.path.dirname(workflow.snakefile) in os.environ['PATH'].split(':') and
not os.path.dirname(os.path.abspath(workflow.snakefile)) in os.environ['PATH'].split(':') ):
os.environ['PATH'] += ':' + os.path.dirname(workflow.snakefile)
# Should not be necessary to give the run dir as filenames in sc_data.yaml should all
# contain the full path to the inputs.
#RUNDIR = config.get('rundir', 'pbpipeline/from')
# Load the info prepared by scan_cells.py
try:
SC = load_yaml(config.get("sc_data", "sc_data.yaml"))
SC['unfiltered_cells'] = SC['cells']
if 'cells' in config:
SC['cells'] = { k: v for k, v in SC['unfiltered_cells'].items()
if v['slot'] in config['cells'].split() }
except Exception as e:
logger.warning(f"{e} - will continue with empty sc_data")
# Allow for cases where we just want to run one rule and don't care about the sc_data
SC = dict(cells={}, unfiltered_cells=[None])
# Load the .kinnex_scan.yaml files and merge into SC['cells']
for cell, cellinfo in SC['cells'].items():
cellinfo['bc_and_unass'] = list(cellinfo['barcodes'])
if cellinfo.get('unassigned'):
cellinfo['bc_and_unass'].append('unassigned')
for bc in cellinfo['bc_and_unass']:
kinnex_yaml = load_yaml(f"kinnex_scan/{cell}.hifi_reads.{bc}.kinnex_scan.yaml")
cellinfo.setdefault('kinnex_scan', {})[bc] = kinnex_yaml
logger.info(f"SC loaded with cells: {list(SC['unfiltered_cells'])}")
logger.info(f"Of these, we process: {list(SC['cells'])}")
def find_source_file(**args):
""" Shim to get me a source file by looking into SC.
This returns a function which can be used as an input function.
"""
def _fsf(wildcards):
# combine args and wildcards. This is an undocumented use of the snakemake.io.Wildcards
# constructor to combine args with wildcards.
wc = Wildcards(wildcards, args)
try:
if wc.barcode == "unassigned":
# Special case when "unassigned" is used as a barcode name.
fn = SC['cells'][wc.cell]['unassigned'][wc.part][wc.fmt]
else:
fn = SC['cells'][wc.cell]['barcodes'][wc.barcode][wc.part][wc.fmt]
except AttributeError:
# No barcode. Must be the metadata or reports.zip or lima_counts.txt file.
fn = SC['cells'][wc.cell][wc.fmt]
return fn
return _fsf
def get_primers_masN(n):
"""The primers file might be mas(8|12|16)_primers.fasta
which should be in the primers/ subdirectory relative to this
Snakefile
"""
prog_dir = os.path.dirname(os.path.abspath(workflow.snakefile))
return f"{prog_dir}/primers/mas{n}_primers.fasta"
wildcard_constraints:
n = r"\d+",
cell = r"m\w+",
barcode = r"[^/.]+",
_mas = r"(\.mas\d{1,2})?",
bc_and_mas = r"[^/.]+(\.mas\d{1,2})?",
part = r"hifi_reads|fail_reads",
# Main target is one yaml file (of metadata) per cell. A little bit like statfrombam.yml in the
# project QC pipelines.
localrules: main, one_cell_info, one_barcode_info
localrules: copy_meta, get_bam_head, copy_reports_zip, copy_lima_counts, count_fastq
rule main:
input:
yaml = [ f"{c}.info.yaml" for c in SC['cells'] ]
def i_one_cell_info(wc):
"""The summary for a cell is just the summaries for all barcodes, plus unassigned,
combined with the run info from sc_data.yaml
The info we need to make this list is in SC.
"""
cell = wc.cell
cell_barcodes = sorted(SC['cells'][cell]['barcodes'])
res = dict( sc_data = config.get("sc_data", "sc_data.yaml") )
if 'unassigned' in SC['cells'][cell]:
res['unass'] = f"{cell}/unassigned/{cell}.info.unassigned.yaml"
res['bc'] = [ f"{cell}/{bc}/{cell}.info.{bc}.yaml"
for bc in cell_barcodes ]
# Various reports to unpack from the .reports.zip file, but we can do that
# within make_report.py, so just copy the file.
# Note that there was no reports.zip prior to SMRTLink 13
# raw_data adapter ccs control loading
res["reports_zip"] = f"{cell}.reports.zip"
# Also the metadata.xml file (which is also included in one_barcode_info)
res['metaxml'] = f"{cell}.metadata.xml"
# And the sts.xml file, which is not
res['stsxml'] = f"{cell}.sts.xml"
# Lima counts is optional
if SC['cells'][cell].get('lima_counts'):
res['lima_counts'] = f"{cell}.hifi_reads.lima_counts.txt"
return res
# This rule connects the .info.yaml to all the bits of data we need and also generates
# the .info.yaml contents for a barcode.
def i_one_barcode_info(wc):
"""See what we need to generate for a single barcode. This used to be for a cell but now
accounts for barcodes by processing each barcode singly.
"""
cell = wc.cell
barcode = wc.bc
# barcode could be "unassigned"
kinnex_scan = SC['cells'][cell]['kinnex_scan'][barcode]
if kinnex_scan['mas']:
bc_and_mas = f"{barcode}.{kinnex_scan['mas']}"
else:
bc_and_mas = barcode
# See if we want a quick run (ie. a pre-run, no QC) and blobs
quick = str(config.get('quick', '0')) != '0'
blobs = str(config.get('blobs', '1')) != '0'
res = dict( md5 = [],
cstats = [],
bamhead = [],
cou = [],
xml = [] )
# We need md5sums and contig stats for hifi_reads and fail_reads
# May as well run cstats for all cases as we need them to make .count files
for part in "hifi_reads fail_reads".split():
res['md5'].append(f"md5sums/{cell}/{barcode}/{cell}.{part}.{bc_and_mas}.bam.md5")
res['cstats'].append(f"{cell}/{barcode}/{cell}.{part}.{bc_and_mas}.cstats.yaml")
# Also the consensusreadset.xml and bam.head, but only for hifi_reads
res['xml'].append(f"{cell}/{barcode}/{cell}.hifi_reads.{bc_and_mas}.consensusreadset.xml")
res['bamhead'].append(f"{cell}/{barcode}/{cell}.hifi_reads.{bc_and_mas}.bam.head")
# Also the metadata.xml file (which is per cell not per barcode)
res['metaxml'] = f"{cell}.metadata.xml"
# For the HiFi reads we want the files in FASTQ format as well as BAM, and we want
# a .count file and md5sum for that fastq.
res['cou'].append(f"{cell}/{barcode}/{cell}.hifi_reads.{bc_and_mas}.fastq.count")
res['md5'].append(f"md5sums/{cell}/{barcode}/{cell}.hifi_reads.{bc_and_mas}.fastq.gz.md5")
if not quick:
# Add in the rRNA scan
res['plots'] = [f"rRNA_scan/{cell}.hifi_reads.{bc_and_mas}.results.yaml"]
# Blobs or no? Only on the HiFi reads in any case.
# Only if we do the blobs do we get the taxon guess.
if blobs:
res['plots'].append(f"blob/{cell}.hifi_reads.{bc_and_mas}.plots.yaml")
res['taxon'] = f"blob/{cell}.hifi_reads.{bc_and_mas}.species.txt"
# And the result of the kinnex scan is always needed.
res['kinnex'] = f"kinnex_scan/{cell}.hifi_reads.{barcode}.kinnex_scan.yaml"
return res
rule one_cell_info:
output: "{cell}.info.yaml"
input: unpack(i_one_cell_info)
run:
# Un-silence sys.stderr in sub-jobs:
logger.quiet.discard('all')
# The output here is going to be a YAML file linking to all the other
# per-barcode YAML files - there is no point in copying the actual data over.
# However, the info from reports.zip will be juiced to get the parts we care
# about, reformatting as needed.
# Add all the reports, and unassigned if we have it
optional_bits = ""
for n in input._names:
if n in ['unass', 'reports_zip', 'lima_counts', 'metaxml', 'stsxml']:
optional_bits += f"--{n} {getattr(input, n)} "
shell("""compile_cell_info.py \
--sc_data {input.sc_data} \
{optional_bits} \
{input.bc} > {output}
""")
rule one_barcode_info:
output: "{cell}/{bc}/{cell}.info.{bc}.yaml"
input: unpack(i_one_barcode_info)
run:
# Un-silence sys.stderr in sub-jobs:
logger.quiet.discard('all')
optional_bits = ""
for n in input._names:
if n in ['cstats', 'taxon', 'kinnex', 'plots']:
optional_bits += f"--{n} {getattr(input, n)} "
# What needs to go into the YML? Stuff from the XML and also some stuff from the
# stats, maybe? At present the script will discover extra files automagically.
# input.xml[0] is the xml for the hifi reads - we don't need to read the XML for
# the failed reads.
shell("""compile_bc_info.py \
--metaxml {input.metaxml} \
--binning <( binned_or_not.py <{input.bamhead} ) \
{optional_bits} \
{input.xml[0]} > {output}
""")
# On GSEG this had to be on the login node as the worker nodes can't see FluidFS.
# Still a useful option to have.
# TODO - if we can get the inputs on a different FS again then revert to copying
# on the head node
if os.environ.get('FILTER_LOCALLY', '1') != '0':
localrules: copy_reads, copy_xml, copy_xml_segged
# These rules copy/link the files for a given barcode, then fix up the XML.
# part may be "hifi_reads" (ie. pass) or "fail_reads".
# Note there is no longer XML for the fail_reads, after SMRTLink 13.1
# Also, I did make this use a shadow directory but then it can look like nothing
# is happening while the file copies, so I've switched back to making a partial file.
rule copy_reads:
output:
bam = "{cell}/{barcode}/{cell}.{part}.{barcode}.bam",
pbi = "{cell}/{barcode}/{cell}.{part}.{barcode}.bam.pbi",
input:
bam = find_source_file(fmt="bam"),
pbi = find_source_file(fmt="pbi"),
resources:
nfscopy=1
shell:
"""cp -f --no-preserve=all -Lv {input.bam} {output.bam}.part
mv {output.bam}.part {output.bam}
cp -f --no-preserve=all -Lv {input.pbi} {output.pbi}.part
mv {output.pbi}.part {output.pbi}
"""
# For Kinnex reads we segment rather than copying the original BAM.
# If we go back to having the pbpipeline/from data only on the head node
# then I will probably need to go back to copying the originals too.
rule segment_reads:
output:
bam = "{cell}/{barcode}/{cell}.{part}.{barcode}.mas{n}.bam",
pbi = "{cell}/{barcode}/{cell}.{part}.{barcode}.mas{n}.bam.pbi",
ligs = "{cell}/{barcode}/{cell}.{part}.{barcode}.mas{n}.ligations.csv",
json = "{cell}/{barcode}/{cell}.{part}.{barcode}.mas{n}.summary.json",
np = "{cell}/{barcode}/{cell}.{part}.{barcode}.mas{n}.non_passing.bam",
nppbi = "{cell}/{barcode}/{cell}.{part}.{barcode}.mas{n}.non_passing.bam.pbi",
input:
bam = find_source_file(fmt="bam"),
pbi = find_source_file(fmt="pbi"),
primers = lambda wc: ancient(get_primers_masN(wc.n)),
resources:
mem_mb = 128000,
n_cpus = 18,
threads: 18
shadow: 'minimal'
shell:
"""{TOOLBOX} smrt skera split -j {threads} {input.bam} {input.primers} {output.bam}
"""
rule copy_xml:
output:
xml = "{cell}/{barcode}/{cell}.{part}.{barcode}.consensusreadset.xml",
input:
bam = "{cell}/{barcode}/{cell}.{part}.{barcode}.bam",
pbi = "{cell}/{barcode}/{cell}.{part}.{barcode}.bam.pbi",
xml = find_source_file(fmt="xml"),
shadow: 'minimal'
shell:
"""cp --no-preserve=all -Lv {input.xml} {output.xml}.orig
strip_readset_resources.py {output.xml}.orig > {output.xml}
{TOOLBOX} smrt dataset --skipCounts --log-level INFO relativize {output.xml}
"""
# Use of "shadow: 'minimal' results in a load of warnings for unresolved resources
# in --metadata but the result is fine.
rule copy_xml_segged:
output:
xml = "{cell}/{barcode}/{cell}.{part}.{barcode}.mas{n}.consensusreadset.xml",
input:
bam = "{cell}/{barcode}/{cell}.{part}.{barcode}.mas{n}.bam",
pbi = "{cell}/{barcode}/{cell}.{part}.{barcode}.mas{n}.bam.pbi",
xml = find_source_file(fmt="xml"),
shadow: 'minimal'
shell:
"""{TOOLBOX} smrt dataset --skipCounts --log-level INFO \
create --type ConsensusReadSet --relative \
--metadata {input.xml} \
{output.xml} \
{input.bam}
"""
# XML files get copied, but pretty-printed as we go
rule copy_meta:
output:
meta = "{cell}.{fmt,metadata|sts}.xml"
input:
meta = find_source_file()
shadow: 'minimal'
shell:
"xml_pp.py < {input.meta} > {output.meta}"
# This rule copies the reports.zip file. No real need to unpack it.
rule copy_reports_zip:
output: "{cell}.reports.zip"
input:
zip = find_source_file(fmt="reports_zip")
shadow: 'minimal'
shell:
"cp --no-preserve=all -Lv {input.zip} {output}"
# This rule copies the lima.counts file. For cells with no barcodes this
# should not even be triggered.
rule copy_lima_counts:
output: "{cell}.hifi_reads.lima_counts.txt"
input:
counts = find_source_file(fmt="lima_counts")
shadow: 'minimal'
shell:
"cp --no-preserve=all -Lv {input.counts} {output}"
# This script produces some headline stats as well as (with the -H option) a histogram we could use
# (but currently we don't)
rule get_cstats_yaml:
output:
yaml = "{bam}.cstats.yaml",
input: "{bam}.bam"
threads: 6
shadow: 'minimal'
resources:
mem_mb = 36000,
n_cpus = 6,
shell:
"fasta_stats.py <({TOOLBOX} samtools fasta -@ {threads} {input}) > {output.yaml}"
rule get_bam_head:
output: "{bam}.bam.head"
input: "{bam}.bam"
shell:
"{TOOLBOX} samtools view -H {input} > {output}"
# Export the HiFi reads (could also work with fail reads) as FASTQ.
# My logic on using $(( {threads} / 2 }} for both compression and decompression is that
# decompression is faster but the FASTQ (which has no kinetics) is much smaller. But I've
# not really tested if this is optimal.
rule bam_to_fastq:
output: "{cell}/{barcode}/{foo}.fastq.gz"
input: "{cell}/{barcode}/{foo}.bam"
threads: 16
resources:
mem_mb = 108000,
n_cpus = 16,
shadow: 'minimal'
shell:
r"""sam_threads=$(( ({threads} > 4) ? ({threads} / 4) : 1 ))
pigz_threads=$(( ({threads} > 2) ? (3 * {threads} / 4) : 1 ))
{TOOLBOX} samtools fastq -@ $sam_threads {input} | pigz -c -n -p $pigz_threads > {output}
"""
# Convert to FASTA and subsample and munge the headers
rule bam_to_subsampled_fasta:
output: "subsampled_fasta/{cell}.{part}.{barcode}{_mas}+sub{n}.fasta"
input: "{cell}/{barcode}/{cell}.{part}.{barcode}{_mas}.bam"
threads: 4
resources:
mem_mb = 44000,
n_cpus = 8,
shell:
r"""{TOOLBOX} samtools fasta -@ {threads} {input} | \
{TOOLBOX} seqtk sample - {wildcards.n} | \
sed 's,/,_,g' > {output}
"""
# Make a .count file for the FASTQ file
# Rather than re-scanning the file this should be able to re-use the info from fasta_stats.py,
# so use that as an input.
rule count_fastq:
output: "{foo}.fastq.count"
input:
fastq = "{foo}.fastq.gz",
cstats = "{foo}.cstats.yaml",
shell: "fq_base_counter.py --cstats {input.cstats} {input.fastq} > {output}"
# md5summer that keeps the file path out of the .md5 file
rule md5sum_file:
output: "md5sums/{foo}.md5"
input: "{foo}"
shell: '( cd "$(dirname {input:q})" && md5sum -- "$(basename {input:q})" ) > {output:q}'
## BLOB plotter and rRNA scanner rules ##
include: "Snakefile.blob"
include: "Snakefile.rrnascan"