-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathSnakefile.blob
More file actions
248 lines (219 loc) · 10.3 KB
/
Snakefile.blob
File metadata and controls
248 lines (219 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
## BLOB plotter rules ##
# vim: ft=python
from smrtino import load_yaml, dump_yaml
# I'll copy the logic from Snakefile.common_denovo here
# Default is that there will be 10000 sequences subsampled
# and they will be blasted in 100 chunks.
# To override the things below set EXTRA_SNAKE_CONFIG
# eg. EXTRA_SNAKE_CONFIG="blob_subsample=1234"
BLOB_LEVELS = config.get('blob_levels', "phylum order species".split())
BLAST_SCRIPT = config.get('blast_script', "blast_nt")
# BLAST S sequences in C chunks. But now the numbers are data-dependent.
def get_blob_size(cell, all_cells=SC['cells']):
try:
bc_num = len(all_cells[cell]['barcodes'])
except KeyError:
bc_num = 1 # assume no barcodes
blob_subsample = 10000
blob_chunks = 100
if bc_num > 5:
# reduced BLAST scan
blob_subsample = blob_subsample // 10
blob_chunks = blob_chunks // 10
res = { 'BLOB_SUBSAMPLE': int(config.get('blob_subsample', blob_subsample)),
'BLOB_CHUNKS': int(config.get('blob_chunks', blob_chunks)) }
res['BLOB_CHUNKSIZE'] = res['BLOB_SUBSAMPLE'] // res['BLOB_CHUNKS']
return res
# This is how I want to pass my plots into compile_cell_info.py
# Serves as the driver by depending on the 6 blob plots and thumbnails for
# each, and arranges the plots into two rows of three columns as we wish to
# display them.
#
# Within the blob directory I'm not having {cell}/{barcode} subdirs - just
# putting everything in one place.
#
localrules: list_blob_plots, get_blob_species
rule list_blob_plots:
output: "blob/{cell}.{part}.{bc_and_mas}.plots.yaml"
input:
png = lambda wc: expand( "blob/{cell}.{part}.{bc_and_mas}.{taxlevel}.{extn}{thumb}.png",
cell = [wc.cell],
part = [wc.part], # hifi_reads
bc_and_mas = [wc.bc_and_mas],
taxlevel = BLOB_LEVELS,
extn = "cov0 read_cov.cov0".split(),
thumb = ['.__thumb', ''] ),
subs = lambda wc: "subsampled_fasta/{wc.cell}.{wc.part}.{wc.bc_and_mas}+sub{sub}.fasta".format(
wc = wc,
sub = get_blob_size(wc.cell)['BLOB_SUBSAMPLE']),
run:
# Un-silence sys.stderr in sub-jobs:
logger.quiet.discard('all')
wc = wildcards
# We want to know how big the subsample actually was, so check the FASTA
with open(str(input.subs)) as fh:
count_from_fastq = sum(1 for l in fh if l.startswith(">"))
# I need to emit the plots in order in pairs. Unfortunately expand() won't quite
# cut it here but I can make a nested list comprehension.
plot_files = [ expand( "{cell}.{part}.{bc_and_mas}.{taxlevel}.{extn}.png",
cell = [wc.cell],
part = [wc.part],
bc_and_mas = [wc.bc_and_mas],
taxlevel = BLOB_LEVELS,
extn = [extn] )
for extn in "read_cov.cov0 cov0".split() ]
plots = dict(title = f"Taxonomy for {wc.part} {wc.bc_and_mas} ({count_from_fastq} sequences)"
f" by {', '.join(BLOB_LEVELS)}",
files = plot_files )
dump_yaml([plots], filename=str(output))
rule get_blob_species:
output: "blob/{cell}.{part}.{bc_and_mas}.species.txt"
input:
txt = expand("blob/{{cell}}.{{part}}.{{bc_and_mas}}.{bl}.blobplot.stats.txt", bl=BLOB_LEVELS[::-1])
shell:
"blobplot_stats_to_species.py {input.txt} > {output}"
# Makes a .complexity file for our FASTA file
# {foo} is blob/{cell}.subreads or blob/{cell}.scraps
rule fasta_to_complexity:
output: "blob/{foo}.complexity"
input: "subsampled_fasta/{foo}.fasta"
params:
level = 10
shell:
r"""{TOOLBOX} dustmasker -level {params.level} -in {input} -outfmt fasta 2>/dev/null | \
count_dust.py > {output}
"""
# Combine all the 100 (or however many) blast reports into one
# I'm filtering out repeated rows to reduce the size of the BLOB DB - there can
# be a _lot_ of repeats so this may be worth running on the cluster.
localrules: merge_blast_reports
def i_merge_blast_reports(wildcards):
"""Return a list of BLAST reports to be merged based upon how many chunks
were outputted by split_fasta_in_chunks.
"""
chunks_list_file = checkpoints.split_fasta_in_chunks.get(**wildcards).output.list
with open(chunks_list_file) as fh:
fasta_chunks = [ l.rstrip('\n') for l in fh ]
# Munge the list of FASTA chunks to get the list of required BLAST chunks
return dict( bparts =
[ re.sub(r'\.fasta_parts/', '.blast_parts/',
re.sub(r'\.fasta$', '.bpart', c))
for c in fasta_chunks ] )
rule merge_blast_reports:
output: "blob/{cell}.{foo}.blast"
input: unpack(i_merge_blast_reports)
shell:
'LC_ALL=C ; ( for i in {input.bparts} ; do sort -u -k1,2 "$i" ; done ) > {output}'
# BLAST a chunk. Note the 'blast_nt' wrapper determines the database to search.
rule blast_chunk:
output: temp("blob/{cell}.{foo}.blast_parts/{chunk}.bpart")
input: "blob/{cell}.{foo}.fasta_parts/{chunk}.fasta"
threads: 4
resources:
mem_mb = 24000,
n_cpus = 6,
params:
evalue = '1e-50',
outfmt = '6 qseqid staxid bitscore'
shadow: 'minimal'
shell:
"""{TOOLBOX} {BLAST_SCRIPT} -query {input} -outfmt '{params.outfmt}' \
-evalue {params.evalue} -max_target_seqs 1 -out {output} -num_threads {threads}
"""
# Split the FASTA into (at most) BLOB_CHUNKS chunks. The number may be less than BLOB_CHUNKS so this
# is a checkpoint rule, and merge_blast_reports then responds to the variable number of outputs. Note
# this will even 'split' a completely empty file if you ask it to, and make zero output files plus
# an empty output.parts.
checkpoint split_fasta_in_chunks:
output:
list = "blob/{cell}.{foo}.fasta_parts_list",
parts = temp(directory("blob/{cell}.{foo}.fasta_parts")),
input: "subsampled_fasta/{cell}.{foo}.fasta"
params:
chunksize = lambda wc: get_blob_size(wc.cell)['BLOB_CHUNKSIZE']
shell:
"""mkdir {output.parts}
touch {output.parts}/list
awk 'BEGIN {{n_seq=0;n_file=0;}} \
/^>/ {{if(n_seq%{params.chunksize}==0){{ \
file=sprintf("{output.parts}/part_%04d.fasta", n_file); n_file++; \
print file >> "{output.parts}/list"; \
}} \
print >> file; n_seq++; next; \
}} \
{{ print >> file; }}' {input}
mv {output.parts}/list {output.list}
"""
# Makes a blob db per FASTA using the complexity file as a COV file.
def i_blob_db(wildcards):
blob_spec = get_blob_size(wildcards.cell)
common_prefix = "{wc.cell}.{wc.foo}+sub{sub}".format( wc = wildcards,
sub = blob_spec['BLOB_SUBSAMPLE'] )
return dict( reads_sample = "subsampled_fasta/{c}.fasta".format(c=common_prefix),
blast_results = "blob/{c}.blast".format(c=common_prefix),
cov = "blob/{c}.complexity".format(c=common_prefix) )
rule blob_db:
output:
json = "blob/{cell}.{foo}.blobDB.json",
input: unpack(i_blob_db)
params:
tmp_prefix = "./{cell}.{foo}"
shadow: 'minimal'
resources:
mem_mb = 30000,
n_cpus = 6,
shell:
r"""if [ ! -s {input.reads_sample} ] ; then touch {output.json} ; exit 0 ; fi
{TOOLBOX} blobtools create -i {input.reads_sample} -o {params.tmp_prefix} \
-t {input.blast_results} -c {input.cov}
ls -l >&2
mv {params.tmp_prefix}.blobDB.json {output.json}
"""
# Run the blob plotting command once per set per tax level. Produce a single
# stats file and a pair of PNG files. Note I gave up on hacking BLOBTools to make sensible
# sized images I just downsample them with GM.
rule blob_plot_png:
output:
plotc = ["blob/{foo}.{taxlevel}.cov0.png", "blob/{foo}.{taxlevel}.cov0.__thumb.png"],
plotr = ["blob/{foo}.{taxlevel}.read_cov.cov0.png", "blob/{foo}.{taxlevel}.read_cov.cov0.__thumb.png"],
stats = "blob/{foo}.{taxlevel}.blobplot.stats.txt"
input:
json = "blob/{foo}.blobDB.json"
params:
maxsize = "1750x1750",
thumbsize = "320x320"
shadow: 'minimal'
resources:
mem_mb = 30000,
n_cpus = 6,
shell:
r""" if [ -s {input.json} ] ; then
export BLOB_COVERAGE_LABEL=Non-Dustiness
{TOOLBOX} blobtools plot -i {input.json} -o ./ --sort_first no-hit,other,undef -r {wildcards.taxlevel}
ls -l >&2
mv {wildcards.foo}.*.stats.txt {output.stats}
{TOOLBOX} convert {wildcards.foo}.*.{wildcards.taxlevel}.*.blobplot.cov0.png \
-resize {params.maxsize}'>' {output.plotc[0]}
{TOOLBOX} convert {wildcards.foo}.*.{wildcards.taxlevel}.*.blobplot.read_cov.cov0.png \
-resize {params.maxsize}'>' {output.plotr[0]}
else
echo "No data" > {output.stats}
{TOOLBOX} gm_label.sh {params.thumbsize} "No data to plot" {output.plotc[0]}
{TOOLBOX} gm_label.sh {params.thumbsize} "No data to plot" {output.plotr[0]}
fi
{TOOLBOX} convert {output.plotc[0]} -resize {params.thumbsize}'>' {output.plotc[1]}
{TOOLBOX} convert {output.plotr[0]} -resize {params.thumbsize}'>' {output.plotr[1]}
"""
# The blob/*.fasta_parts directories are not getting removed. I think this is because they are outputs of
# a checkpoint rule. Remove them here.
onsuccess:
logger.quiet.discard('all')
if config.get('cleanup'):
try:
for cell in SC['cells']:
shell("rm -vf ./blob/{cell}.*.fasta_parts_list")
shell("rm -rvf ./blob/{cell}.*.fasta_parts")
except (NameError, KeyError) as e:
# This should only happen in testing
logger.warning("cleanup failed to read SC['cells']")
## End of BLOB plotter rules ##