-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhelper_functions.py
More file actions
171 lines (149 loc) · 7.06 KB
/
helper_functions.py
File metadata and controls
171 lines (149 loc) · 7.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#################### Imports ####################
import functools
import gzip
import pickle
import sys
from typing import List, Callable, Optional

from aligner import Read
from kmers import Reference, KmerCollection
#################### Constants ####################
# The only characters accepted in a sequence line by the FASTA/FASTQ readers.
NUCLEO_LETTERS = set('ACGTN')
#################### Error Handling ####################
class AppErr(Exception):
    """Application-level error for anticipated failures.

    Raised for problems the program knows how to report to the user,
    such as missing files or denied permissions.
    """
def handle_f_read(operation_desc: str) -> Callable:
    """
    This decorator is used to handle errors of file read operations.
    It is used throughout the program with different files that need opening.
    Every exception raised by the decorated function is converted to an
    AppErr whose message mentions the operation; an AppErr raised by the
    function itself is passed through unchanged.
    :param operation_desc: for example, "loading reference databases", etc.
    :return: the decorated function with proper file reading errors' handling.
    """
    def decorator(func):
        @functools.wraps(func)  # maintain function attrs after wrapping
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except AppErr:
                # Already a user-facing error (e.g. a wrong file extension);
                # wrapping it again would yield "Error while ...: Error: ...".
                raise
            except FileNotFoundError as e:
                raise AppErr(
                    f"Error: File not found while {operation_desc}.") from e
            except PermissionError as e:
                raise AppErr(f"Error: Permission denied to "
                             f"file while {operation_desc}.") from e
            except Exception as e:
                # "from e" keeps the original traceback attached as the cause
                raise AppErr(
                    f"Error while {operation_desc}: {str(e)}") from e
        return wrapper
    return decorator
def handle_f_write(operation_desc: str) -> Callable:
    """
    This decorator is used to handle errors of file write operations.
    Every exception raised by the decorated function is converted to an
    AppErr whose message mentions the operation; an AppErr raised by the
    function itself is passed through unchanged.
    :param operation_desc: for example, "writing alignment results", etc.
    :return: the decorated function with proper file writing errors' handling.
    """
    def decorator(func):
        @functools.wraps(func)  # maintain function attrs after wrapping
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except AppErr:
                # Already a user-facing error (e.g. a wrong file extension);
                # wrapping it again would yield "Error while ...: Error: ...".
                raise
            except PermissionError as e:
                # must precede OSError: PermissionError is a subclass of it
                raise AppErr(f"Error: Permission "
                             f"denied to file while {operation_desc}.") from e
            except OSError as e:
                raise AppErr(
                    f"Error: OS error occurred while {operation_desc}."
                ) from e
            except Exception as e:
                # "from e" keeps the original traceback attached as the cause
                raise AppErr(
                    f"Error while {operation_desc}: {str(e)}") from e
        return wrapper
    return decorator
#################### File Reading ####################
def open_gz_file(filename: str, mode='r'):
    """Open a gzip-compressed or a regular file and return the file object.

    :param filename: path of the file; a name ending with '.gz' is opened
        through gzip, anything else with the built-in open().
    :param mode: open mode such as 'r', 'w', 'rt' or 'rb'.
    :return: the opened file object; the caller is expected to close it
        (for example by using it in a ``with`` statement).
    """
    if filename.endswith('.gz'):
        # gzip.open() defaults to binary, so request text mode - but only
        # when the caller did not already pick text ('t') or binary ('b');
        # appending 't' to e.g. 'rb' would produce an invalid mode.
        if 't' not in mode and 'b' not in mode:
            mode += 't'
        return gzip.open(filename, mode)
    return open(filename, mode)
@handle_f_read("loading FASTA file")
def import_fasta(filename) -> List[Reference]:
    """Read every usable record of a FASTA file into Reference objects.

    A record starts at a line beginning with '>' (the rest of that line,
    stripped, is its id) and collects the following lines. Lines holding a
    character outside NUCLEO_LETTERS are skipped; a record is emitted only
    when both its id and its collected sequence are non-empty.

    :param filename: path ending with .fa / .fasta, optionally followed
        by .gz.
    :return: list of Reference instances, in file order.
    :raises AppErr: when the file name has none of the accepted suffixes.
    """
    fasta_suffixes = ('.fa', '.fasta', '.fa.gz', '.fasta.gz')
    if not filename.endswith(fasta_suffixes):
        raise AppErr(f"Error: {filename} is not a FASTA-formatted file.")

    def emit(record_id, chunks):
        """Yield one Reference if the id and the joined chunks are set."""
        joined = ''.join(chunks)
        if record_id and joined:
            yield Reference(record_id, joined)

    def parse_records():
        record_id = ''
        chunks = []
        with open_gz_file(filename, "r") as handle:
            for raw_line in handle:
                text = raw_line.strip()
                if not text.startswith(">"):
                    # sequence line: keep it only if every letter is valid
                    if all(base in NUCLEO_LETTERS for base in text):
                        chunks.append(text)
                    continue
                # header line: close the previous record, start a new one
                yield from emit(record_id, chunks)
                record_id = text[1:].strip()
                chunks = []
            # the last record has no following header to close it
            yield from emit(record_id, chunks)

    return list(parse_records())
@handle_f_read("loading reference databases")
def load_kdb_file(filename: str) -> KmerCollection:
    """
    This function loads a previously saved kmer collection from a
    gzip-compressed pickle file, and returns it as a KmerCollection instance
    if not corrupt.
    :param filename: path of the file; it has to end with '.kdb'.
    :return: the unpickled KmerCollection.
    :raises AppErr: if the name does not end with '.kdb' or the file holds
        something other than a KmerCollection.
    """
    if not filename.endswith('.kdb'):
        raise AppErr(f"Error: {filename} is not a KDB file.")
    with gzip.open(filename, "rb") as f:
        # SECURITY: unpickling can execute arbitrary code - only load
        # .kdb files that come from a trusted source.
        kmer_collection = pickle.load(f)
    # A readable pickle may still hold an unrelated object; reject it here
    # rather than failing later with an AttributeError far from the cause.
    if not isinstance(kmer_collection, KmerCollection):
        raise AppErr(f"Error: {filename} does not contain "
                     f"a kmer collection.")
    return kmer_collection
@handle_f_read("loading FASTQ file")
def import_fastq(filename) -> List[Read]:
    """This function imports sequences from a FASTQ-formatted file.
    This format is usually used for NGS-sequenced genomes.

    The file is consumed in 4-line records (header, sequence, '+' line,
    quality). Records that are malformed are skipped; blank lines found
    where a header is expected are ignored.

    :param filename: path ending with .fq / .fastq, optionally followed
        by .gz.
    :return: list of Read instances built from the valid records.
    :raises AppErr: when the file name has none of the accepted suffixes.
    """
    if not filename.endswith(('.fq', '.fastq', '.fq.gz', '.fastq.gz')):
        raise AppErr(f"Error: {filename} is not a FASTQ-formatted file.")

    def read_fastq_chunks():
        """This helper function iterates over 4-line chunks
        from the FASTQ file, yielding one Read per valid chunk."""
        with open_gz_file(filename, "r") as f:
            while True:
                raw_header = f.readline()
                if not raw_header:
                    break  # readline() returns '' only at end of file
                header = raw_header.strip()
                if not header:
                    # a blank line is not the end of the file; stopping
                    # here would silently drop every following read
                    continue
                sequence = f.readline().strip()
                plus = f.readline().strip()
                quality = f.readline().strip()
                if not header.startswith("@") or not plus.startswith("+"):
                    continue  # invalid format
                if not all(char in NUCLEO_LETTERS for char in sequence):
                    continue  # invalid sequence
                # converting using a Phred33 format
                quality_scores = [ord(char) - 33 for char in quality]
                if len(sequence) != len(quality_scores):
                    continue  # quality str and seq don't match in length
                yield Read(header[1:], sequence, quality_scores)

    return list(read_fastq_chunks())
#################### File Writing ####################
@handle_f_write("saving kmer collection")
def save_kmer_collection(kmer_collection: KmerCollection,
                         output_file: str) -> None:
    """Pickle the kmer collection into a gzip-compressed '.kdb' file.

    :param kmer_collection: the collection object to serialize.
    :param output_file: destination path; it has to end with '.kdb'.
    :raises AppErr: when the destination does not end with '.kdb'.
    """
    has_kdb_suffix = output_file.endswith('.kdb')
    if not has_kdb_suffix:
        raise AppErr(f"Error: {output_file} is not a KDB file.")
    # "wb" gives the binary stream that pickle writes its bytes into
    with gzip.open(output_file, "wb") as compressed_out:
        pickle.dump(kmer_collection, compressed_out)  # type: ignore
#################### Other ####################
def is_pos_int(var) -> bool:
    """This function checks if a variable is a positive integer.

    :param var: any object.
    :return: True only for an int greater than zero; booleans, floats,
        strings and other types give False.
    """
    # bool is a subclass of int, so without the explicit exclusion
    # True would be accepted as the positive integer 1.
    return isinstance(var, int) and not isinstance(var, bool) and var > 0
def get_kmer_size_from_collection(
        kmer_collection: KmerCollection) -> Optional[int]:
    """This function extracts the kmer size from the collection.

    The size is taken as the length of the ``sequence`` attribute of the
    first kmer returned by ``get_all_kmers()``.

    :param kmer_collection: object providing ``get_all_kmers()``.
    :return: the length of the first kmer's sequence, or None (after
        printing a message) when the collection holds no kmers.
    """
    all_kmers = kmer_collection.get_all_kmers()  # fetch once, reuse below
    if len(all_kmers) == 0:
        print("Collection does not contain any kmers.")
        return None  # explicit: callers get None for an empty collection
    return len(next(iter(all_kmers)).sequence)