-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparser.py
More file actions
306 lines (269 loc) · 11.5 KB
/
parser.py
File metadata and controls
306 lines (269 loc) · 11.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
import re
from pathlib import Path
import tui
from peptide import Peptide
import util
import files
from exit_codes import EXIT_SUCCESS, EXIT_FAILURE
def read_experiments(context: dict):
    """Parse the loaded dataframe into a list of Peptide objects.

    Orchestrates one pipeline step: discovers the experiment (sample)
    columns, reads and cleans peptide sequences and modification strings,
    asks the user which modification to analyze, and stores the resulting
    list in context['peptides'] before advancing context['step'] to
    'select-fasta-files'.

    Returns EXIT_SUCCESS on completion, EXIT_FAILURE on any unrecoverable
    input problem (after reporting it via tui).
    """
    experiment_names = read_experiment_names(context)
    if not experiment_names:
        return EXIT_FAILURE
    # A single dataset is unusual enough to double-check with the user.
    if len(experiment_names) == 1:
        confirm = tui.confirm(f"Only one dataset was found in {Path(context['filename']).name}. Do you wish to continue?")
        if not confirm:
            tui.say_error("Aborting analysis. Please check your file for columns labeled '# PSMs' or '#Scan [sample]'.")
            return EXIT_FAILURE
    # At this point, we can be reasonably sure those pieces of data are acceptable
    context['experiment_count'] = len(experiment_names)
    context['experiment_names'] = experiment_names
    peptide_sequences = read_peptide_sequences(context)
    if not peptide_sequences:
        return EXIT_FAILURE
    modification_lists_raw = read_modification_lists(context)
    # Pull each experiment's per-peptide count column out of the dataframe.
    experiment_columns = []
    try:
        for name in experiment_names:
            experiment_columns.append(context['dataframe'][name].tolist())
    except KeyError:
        tui.say_error("Unexpected error getting experimental data!")
        return EXIT_FAILURE
    # With only one experiment the column header is uninformative; use the
    # input file's stem as the experiment name instead.
    if context['experiment_count'] == 1:
        experiment_names = [Path(context['filename']).stem]
        context['experiment_names'] = experiment_names
    target_mod = tui.choose_between("Which type of modification are you looking for?",
                                    ["Acetyl","Other"])
    if target_mod == "Other":
        target_mod = tui.answer_alpha_only("Which modification?")
        if not target_mod:
            tui.say_error(f"Expected a string modification, but received {target_mod}! Aborting.")
            return EXIT_FAILURE
    tui.say(f"Analyzing all '{target_mod}' modifications!")
    context['target_modification'] = target_mod
    peptide_list = []
    for i,peptide_sequence in enumerate(peptide_sequences):
        try:
            cleaned_modification_list = clean_modification_list(modification_lists_raw[i],target_mod)
        except RuntimeError as e:
            tui.say(f"Peptide [blue]{peptide_sequence}[/]:")
            tui.say_error(e)
            return EXIT_FAILURE
        # None means an ambiguous modification site (e.g. '[T/K/S]'); the
        # whole peptide hit is skipped rather than miscounted.
        if cleaned_modification_list is None:
            tui.say(f"Peptide [blue]{peptide_sequence}[/] contains modification string '{modification_lists_raw[i]}'.")
            tui.say_error("Due to uncertainty, this peptide is being discarded!!")
            continue
        # Map experiment name -> this peptide's count in that experiment.
        scan_counts = {}
        for j,name in enumerate(experiment_names):
            scan_counts[name] = experiment_columns[j][i]
        peptide_list.append(Peptide(peptide_sequence,cleaned_modification_list,scan_counts))
    context['peptides'] = peptide_list
    context['step'] = 'select-fasta-files'
    return EXIT_SUCCESS
def read_experiment_names(context: dict):
    """Return the dataframe column labels that hold per-sample hit counts.

    Tries the '# PSMs' columns first, then falls back to '#Scan' columns.
    Reports an error via tui and returns None when neither is present.
    """
    columns = context['dataframe'].columns
    for marker in ("# PSMs", "#Scan"):
        found = columns[columns.str.contains(marker, case=True)].tolist()
        if found:
            return found
    tui.say_error(f"Could not find peptide hit counts in {Path(context['filename']).name}!" +
                  "Expected '# PSMs' or '#Scan [sample]'")
    return None
def read_peptide_sequences(context: dict) -> list[str]:
    """Extract and clean every peptide sequence from the input dataframe.

    Reads the 'Annotated Sequence' column if present, otherwise the
    'Peptide' column. Returns None when neither column exists or any row
    fails to clean (errors are reported by the helpers via tui).
    """
    dataframe = context['dataframe']
    raw_rows = None
    for column in ('Annotated Sequence', 'Peptide'):
        if column in dataframe:
            raw_rows = dataframe[column]
            break
    if raw_rows is None:
        tui.say_error("Could not find 'Peptide' or 'Annotated Sequence' column! Please double check input.")
        return None
    cleaned = []
    for raw in raw_rows:
        sequence = clean_peptide_sequence(raw)
        if not sequence:
            return None
        cleaned.append(sequence)
    return cleaned
def read_modification_lists(context: dict):
    """Return the per-row modification strings from the input dataframe.

    Prefers the 'AScore' column, falling back to the 'Modifications'
    column. Returns an empty list when neither column is present (after
    reporting the problem via tui).
    """
    try:
        return context['dataframe']['AScore']
    except KeyError:
        pass
    try:
        return context['dataframe']['Modifications']
    except KeyError:
        # BUG FIX: this message was missing its f-string prefix, so the
        # literal text "{Path(context['filename']).name}" was shown to the
        # user instead of the actual filename.
        tui.say_error(f"Could not find 'AScore' or 'Modifications' column in '{Path(context['filename']).name}'!")
        return []
def clean_peptide_sequence(dirty: str) -> str:
    '''
    Strip PEAKS / Proteome Discover decorations from a peptide sequence.

    Accepts inputs of a few styles:
        Proteome Discover:
            [K].VSPFLNATYR.[K]
            [].MGKEPMSGFKAIDK.[]
            [K].LLSDDGDSVGYGEPLVAVLPSFHDINIQ.[-]
        PEAKS:
            D.SHVYPDYVVPPSYDSLLGKLIVWAPTRE.R
            E.C(+57.02)RINAEDPFK(+42.01)GFRPGPGRITSYLPSGGPFVRMD.S

    Returns the bare residue string, or "" (after reporting via tui) when
    an unexpected character survives cleaning.
    '''
    # Drop all square brackets.
    stripped = re.sub(r'[\[\]]', '', dirty)
    # Record where the flanking-residue dots sit *before* trimming either
    # side, so each trim decision is based on the same string.
    has_left_dot = stripped.find('.', 0, 2) >= 0
    has_right_dot = stripped.find('.', len(stripped) - 2) >= 0
    if has_left_dot:
        _, _, stripped = stripped.partition('.')
    if has_right_dot:
        stripped, _, _ = stripped.rpartition('.')
    # Remove inline mass annotations such as (+57.02).
    clean = re.sub(r'\([^)]*\)', '', stripped)
    # Only the 20 canonical residues (uppercase) may remain.
    if not set(clean) <= set('ACDEFGHIKLMNPQRSTVWY'):
        tui.say_error(f"An unexpected character was found in the 'cleaned' peptide sequence '{clean}'.")
        tui.say("Firstly, ensure all residues are capitalized. If the error persists and the sequence looks correct, contact the developer.")
        return ""
    return clean
def modification_uncertain(mod: str):
    """Return True when the modification string lists alternative sites
    separated by '/' (e.g. '[T/K/S]'), i.e. the site is ambiguous."""
    return mod.find('/') != -1
def clean_modification_list(dirty, target_mod: str):
    """Reduce a raw modification string to a list of 'Xn' position tokens.

    Filters the (possibly NaN) modification string down to entries that
    mention *target_mod*, then extracts the residue+position token
    (e.g. 'K107') from each.

    Returns:
        A list of position tokens (possibly empty), or None when any
        matching modification has an ambiguous site — such entries make
        the whole peptide hit unusable, so the caller must discard it.

    Raises:
        RuntimeError: when a matching modification has neither a
        recognizable position nor an N/C-terminal marker.
    """
    # pandas yields NaN for empty cells; treat that as "no modifications".
    dirty = '' if str(dirty) == 'nan' else str(dirty)
    dirty_split = split_modification_str_to_list(dirty)
    clean_list = [mod for mod in dirty_split if target_mod in mod]
    # At this point, each entry looks like one of:
    #   1: "Kn:Acetylation (K):####.##"
    #   2: "Sn:Acetylation (STY):##.##"
    #   3: "1xAcetyl [Kn(##)]"
    #   4: "1xAcetyl [T/K/S]"
    #   5: "1xAcetyl [S]"
    # Cases 4 and 5 carry no usable position, so the entire peptide hit is
    # DISCARDED :( (not counted toward either acetylated/unacetylated).
    if any('/' in mod for mod in clean_list):
        return None
    out_list = []
    # (was `enumerate`; the index was never used)
    for mod in clean_list:
        # We can further simplify our list by grabbing only the "Xn"
        # string (e.g., K107).
        matched = re.search(r'[A-Z]\d+', mod)
        uncertain = re.search(r'\[[A-Z]\]', mod)
        term = re.search(r'\b[NC]-Term\b', mod)
        if uncertain:
            # Case 5: a bare residue with no position — ambiguous site.
            return None
        if not matched and not term:
            raise RuntimeError(f"Unexpected modification string '{dirty}'!\nDEBUG:\nTerm: {term}\nMatched: {matched}\n{clean_list}")
        if matched:
            out_list.append(matched.group())
    return out_list
def split_modification_str_to_list(mods: str) -> list:
    """Split a raw modification-column value into one string per modification.

    Handles both input formats:
        "AScore" (PEAKS), e.g.:
            "C1:Carbamidomethylation:1000.00;M32:Oxidation (M):1000.00"
            "T1:Acetylation (STY):99.62;M2:Oxidation (M):1000.00;K3:Acetylation (K):1000.00"
        "Modifications" (Proteome Discover), e.g.:
            "1xCarbamidomethyl [C5]; 1xAcetyl [K14(100)]"
            "1xAcetyl [T/K/S]"
            "2xAcetyl [T14; S17]; 1xAcetyl [N-Term]; 1xCarbamidomethyl [C5]"

    Grouped Proteome Discover entries such as "2xAcetyl [T14; S17]" are
    expanded into one entry per site ("2xAcetyl [T14]", "2xAcetyl [S17]").
    """
    if ';' not in mods:
        # Only a single modification; wrap it in a one-element list.
        return [mods]
    if '[' not in mods:
        # PEAKS format never uses brackets: a plain semicolon split suffices.
        return mods.split(';')
    # Only Proteome Discover possibilities remain. Inside-bracket examples:
    #   [K5(100)]  [T/K/S]  [T14; S17]  [N-Term]  [K5]  [S]
    # Split on semicolons that are NOT inside brackets: the negative
    # lookahead rejects a ';' that is followed by a ']' before any '['.
    mods_split = re.split(r";(?![^\[]*\])", mods)
    if not any(';' in mod for mod in mods_split):
        # No semicolon inside brackets; a plain split is equivalent.
        return mods.split(';')
    # Expand grouped entries: "NxMod [X1; Y2]" -> "NxMod [X1]", "NxMod [Y2]".
    # (The old write-only `sc_count` local has been removed.)
    mods_updated = []
    for mod in mods_split:
        if ';' not in mod:
            mods_updated.append(mod)
        else:
            bracket_string = re.search(r"\[.*\]", mod).group()
            mod_prefix = mod.split('[')[0]
            inner_list = bracket_string.strip('[]').split(';')
            mods_updated += [f'{mod_prefix}[{item.strip()}]' for item in inner_list]
    return mods_updated
def remove_fasta_comments(fasta_lines: list[str]) -> list:
    """Remove '>' descriptor/comment lines from *fasta_lines*, in place.

    The list is mutated via slice assignment (preserving the original's
    in-place semantics) and also returned for convenience.

    BUG FIX: the original indexed `line[0]`, which raised IndexError on a
    blank line; startswith() tolerates empty strings (blank lines are kept,
    and they join to '' harmlessly in the callers).
    """
    fasta_lines[:] = [line for line in fasta_lines if not line.startswith('>')]
    return fasta_lines
def read_multi_fasta_sequence(fasta_filename: str, fasta_lines: list[str]) -> list[str]:
    """Parse a multi-fasta file's lines into a list of sequence strings.

    Each '>' line starts a new record (consecutive '>' lines are folded
    into one descriptor); all other lines are concatenated into the
    current sequence. Descriptors are collected only to track record
    boundaries — the return value contains the sequences alone.

    BUG FIX: the original iterated `for i,line in fasta_lines:`, which
    raises ValueError on any real line (it must be `enumerate(...)`).
    With that fixed, the precomputed `desc_indices` list and the
    write-only `fasta_count` counter are unnecessary and have been
    removed; `startswith('>')` also avoids the `line[0]` IndexError the
    index precomputation had on blank lines.
    """
    multi_sequence = []
    current_desc = ""
    current_fasta = ""
    for line in fasta_lines:
        if line.startswith('>'):
            # New fasta record starts here.
            if current_fasta:
                # Close out the record in progress.
                multi_sequence.append(current_fasta)
                current_fasta = ""
                current_desc = ""
            if not current_desc:
                current_desc = line[1:].strip()
            else:
                # Consecutive '>' lines extend the same descriptor.
                current_desc += ' ' + line[1:].strip()
        else:
            # Continuation of the current sequence.
            current_fasta += line.strip()
    # After the loop:
    if current_desc and not current_fasta:
        # Trailing descriptor/comment. Warn user, but ignore.
        tui.say(f"Warning: '{fasta_filename}' ends with a '>' line (are you missing a sequence?)")
    elif current_desc and current_fasta:
        multi_sequence.append(current_fasta)
    return multi_sequence
def read_fasta_sequences(context: dict):
    """Load every referenced fasta file into context['fasta_sequences'].

    For each unique fasta file referenced by context['experiment_fasta_files'],
    stores either a single joined sequence string or — when the file holds
    multiple records and the user confirms it — a list of sequences (via
    read_multi_fasta_sequence). Advances context['step'] to
    'compute-results' and returns EXIT_SUCCESS.

    BUG FIX: descriptor lines are now detected with startswith('>')
    instead of `line[0] == '>'`, which raised IndexError whenever a fasta
    file contained a blank line. The two identical single-sequence
    branches are also merged into one.
    """
    fasta_sequences = {}
    all_fasta_files = util.unique_flattened_nested_str_list(list(context['experiment_fasta_files'].values()))
    for fasta_filename in all_fasta_files:
        fasta_lines = files.read_text(fasta_filename).splitlines()
        # More than one '>' line suggests a "multi-fasta" file; double
        # check with the user before treating it as one.
        descriptor_count = sum(1 for line in fasta_lines if line.startswith('>'))
        is_multi = (descriptor_count > 1 and
                    tui.confirm(f"It looks like {Path(fasta_filename).name} contains multiple sequences. Does it?"))
        if is_multi:
            # Multiple sequences: the dictionary value becomes a list.
            fasta_sequences[fasta_filename] = read_multi_fasta_sequence(fasta_filename, fasta_lines)
        else:
            # Single sequence (or user declined): store one joined string.
            fasta_sequences[fasta_filename] = ''.join(remove_fasta_comments(fasta_lines))
    context['fasta_sequences'] = fasta_sequences
    context['step'] = 'compute-results'
    return EXIT_SUCCESS