Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 38 additions & 248 deletions README.md

Large diffs are not rendered by default.

1,269 changes: 1,269 additions & 0 deletions Showcases.ipynb

Large diffs are not rendered by default.

110 changes: 100 additions & 10 deletions bio_files_processor.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
from __future__ import annotations
import os
from dataclasses import dataclass


def convert_multiline_fasta_to_oneline(input_fasta: str, output_fasta: str = None) -> None:
"""
Converts a multiline FASTA file to a oneline FASTA file.

Converts a multiline FASTA file to an oneline FASTA file.
Args:
- input_fasta (str): path to the input FASTA file.
- output_fasta (str): path to the output oneline FASTA file. If not provided, it will be generated
using the input file name.

Returns:
- None: The function doesn't return a value but writes the oneline FASTA to the output file.
"""
if output_fasta is None:
output_fasta = 'oneline_' + os.path.basename(input_fasta)

with open(input_fasta, mode='r') as infile:
multiline = infile.readlines()

output_str = []
name = ''
record = []

for line in multiline:
line = line.strip()
if line.startswith('>') and not name:
Expand All @@ -35,12 +35,12 @@ def convert_multiline_fasta_to_oneline(input_fasta: str, output_fasta: str = Non
output_str.append(sequence)
name = line
record = []

if name and record:
sequence = ''.join(record)
output_str.append(name)
output_str.append(sequence)

with open(output_fasta, mode='w') as outfile:
outfile.write('\n'.join(output_str))
return None
Expand All @@ -65,7 +65,7 @@ def select_genes_from_gbk_to_fasta(input_gbk: str, genes_to_find: list, n_before
if output_fasta is None:
name = os.path.basename(input_gbk).split('.')[0]
output_fasta = name + '.fasta'

with open(input_gbk, mode='r') as file:
gbk = file.readlines()
gene_protein_list = []
Expand Down Expand Up @@ -101,7 +101,7 @@ def select_genes_from_gbk_to_fasta(input_gbk: str, genes_to_find: list, n_before
flanks.append(idx - i)
for i in range(1, n_after + 1):
flanks.append(idx + i)

selected_records = [gene_protein_list[i] for i in flanks]
with open(output_fasta, mode='w') as outfile:
for gene, protein in selected_records:
Expand Down Expand Up @@ -130,6 +130,7 @@ def change_fasta_start_pos(input_fasta: str, shift: int, output_fasta: str = Non

output_str = []
name = ''
shifted_sequence = ''
sequence = []

for line in input_data:
Expand Down Expand Up @@ -195,3 +196,92 @@ def parse_blast_output(input_file: str, output_file: str = None) -> None:
outfile.write('\n'.join(significant_alignments))

return None


@dataclass
class FastaRecord:
    """
    Represents a FASTA record.

    Attributes:
        id (str): The identifier of the record.
        description (str): The description or additional information about the record.
        sequence (str): The sequence data of the record.
    """
    id: str
    description: str
    sequence: str

    def __repr__(self) -> str:
        """
        Return a compact representation showing at most the first 10 sequence characters.

        Returns:
        - str: ``FastaRecord('<preview>')``; the preview ends with ``...`` only when the
          sequence is longer than 10 characters and was therefore truncated.
        """
        preview_length = 10
        # Use <= so a sequence of exactly `preview_length` characters is shown in full
        # instead of being followed by a misleading ellipsis.
        if len(self.sequence) <= preview_length:
            repr_seq = self.sequence
        else:
            repr_seq = f"{self.sequence[:preview_length]}..."
        return f"FastaRecord('{repr_seq}')"


class OpenFasta:
    """
    Provides a context manager and iterator for reading FASTA files record by record.

    Attributes:
        file (str): The path to the FASTA file.
        mode (str): The mode in which the file should be opened (default is "r").
        handler: The open file object, or None until the context is entered.
        current_line (str or None): Header line of the next record that was already
            consumed from the file while finishing the previous record.
    """

    def __init__(self, file, mode="r"):
        self.file = file
        self.mode = mode
        self.handler = None
        self.current_line = None

    def __enter__(self) -> OpenFasta:
        """Open the file and return this reader."""
        self.handler = open(self.file, mode=self.mode)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the file if it was opened; exceptions are not suppressed."""
        if self.handler:
            self.handler.close()

    def __iter__(self) -> OpenFasta:
        return self

    def __next__(self) -> FastaRecord:
        """
        Read the next record from the file.

        Returns:
        - FastaRecord: the record built from the next header line and the sequence
          lines that follow it (joined without separators).

        Raises:
        - StopIteration: when no further header line is available.
        """
        # A header may already have been read while finishing the previous record.
        header = self.current_line
        self.current_line = None
        sequence_parts = []

        while True:
            raw_line = self.handler.readline()
            # readline() returns an empty value only at end of file; a blank line
            # still carries its newline, so the check must happen before strip().
            if not raw_line:
                break
            line = raw_line.strip()
            if not line:
                # Blank lines inside the file are skipped, not treated as EOF.
                continue
            if line.startswith(">"):
                if header is not None:
                    # This header belongs to the next record: keep it for the
                    # following call instead of dropping it.
                    self.current_line = line
                    break
                header = line
            elif header is not None:
                sequence_parts.append(line)
            # Lines that appear before the first header cannot belong to any
            # record and are ignored.

        if header is None:
            raise StopIteration

        # Text after '>' up to the first space is the id; the rest is the description.
        record_id, _, record_description = header[1:].partition(" ")
        return FastaRecord(
            id=record_id,
            description=record_description,
            sequence="".join(sequence_parts),
        )

    def read_record(self) -> FastaRecord:
        """Return the next record; raises StopIteration when the file is exhausted."""
        return next(self)

    def read_records(self) -> list:
        """Return all remaining records as a list."""
        return list(self)
143 changes: 143 additions & 0 deletions custom_random_forest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
from __future__ import annotations
from typing import Optional
import numpy as np
from sklearn.base import BaseEstimator
from sklearn.tree import DecisionTreeClassifier
from multiprocessing import Pool


class RandomForestClassifierCustom(BaseEstimator):
    """
    A custom implementation of Random Forest classifier.

    Parameters:
        n_estimators (int, default=10): The number of trees in the forest.
        max_depth (int or None, default=None): The maximum depth of each tree
            (passed to DecisionTreeClassifier).
        max_features (int or None, default=None): The number of features randomly
            selected for each tree. If None, every tree uses all features.
        random_state (int or None, default=None): Controls the randomness of the
            bootstrapping and the individual trees. If None, results are not reproducible.

    Attributes:
        classes_ (list): The sorted class labels seen during fit.
        trees (list): The list of fitted DecisionTreeClassifier models.
        feat_ids_by_tree (list): The list of feature indices used by each tree.
    """

    def __init__(
        self, n_estimators: int = 10, max_depth: Optional[int] = None,
        max_features: Optional[int] = None, random_state: Optional[int] = None
    ) -> None:
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.max_features = max_features
        self.random_state = random_state

        self.trees = []
        self.feat_ids_by_tree = []

    @staticmethod
    def _run_jobs(func, tasks: list, n_jobs: int) -> list:
        """
        Apply func to every task and return the results in task order.

        Args:
            func (callable): A picklable callable taking a single task argument.
            tasks (list): The task arguments.
            n_jobs (int): 1 runs in the current process; -1 uses a pool with the
                default number of worker processes; any other value is passed to Pool.

        Returns:
            list: One result per task.
        """
        if n_jobs == 1 or not tasks:
            # No need to pay for process start-up and pickling for serial work.
            return [func(task) for task in tasks]
        # Pool(None) sizes the pool from the CPU count; Pool(-1) would raise ValueError.
        processes = None if n_jobs == -1 else n_jobs
        with Pool(processes) as pool:
            return pool.map(func, tasks)

    def _fit_single_tree(self, args) -> tuple:
        """
        Fit a single decision tree on a bootstrap sample and a random feature subset.

        Args:
            args (tuple): A tuple containing the following elements:
                X (numpy.ndarray): The input features.
                y (numpy.ndarray): The target labels.
                max_depth (int or None): The maximum depth of the tree.
                max_features (int or None): The number of features selected for the tree.
                random_state (int or None): The base random seed.
                i (int): The index of the tree, added to the seed so trees differ.

        Returns:
            tuple: The trained decision tree and the indices of the selected features.
        """
        X, y, max_depth, max_features, random_state, i = args
        # With random_state=None there is nothing to offset; let NumPy/sklearn pick entropy.
        seed = None if random_state is None else random_state + i
        # A local generator yields the same stream as seeding the global one,
        # without clobbering the caller's global NumPy random state.
        rng = np.random.RandomState(seed)
        n_samples, n_features = X.shape

        n_selected = n_features if max_features is None else max_features
        feature_idx = rng.choice(n_features, size=n_selected, replace=False)
        bootstrap_idx = rng.choice(n_samples, size=n_samples, replace=True)
        X_bootstrap = X[bootstrap_idx][:, feature_idx]
        y_bootstrap = y[bootstrap_idx]

        tree = DecisionTreeClassifier(
            max_depth=max_depth,
            max_features=max_features,
            random_state=seed
        )
        tree.fit(X_bootstrap, y_bootstrap)

        # The tree is only returned: this may run in a worker process where
        # mutating self would be lost, so fit() collects the results.
        return tree, feature_idx

    def fit(self, X: np.ndarray, y: np.ndarray, n_jobs: int = 1) -> RandomForestClassifierCustom:
        """
        Fit the Random Forest classifier to the training data.

        Parameters:
            X (array-like of shape (n_samples, n_features)): The input samples.
            y (array-like of shape (n_samples,)): The target values.
            n_jobs (int, default=1): The number of processes to use for parallel training.
                If n_jobs=-1, all available processes will be used.

        Returns:
            self (RandomForestClassifierCustom): The fitted estimator.
        """
        X = np.asarray(X)
        y = np.asarray(y)

        # Reset before dispatching so stale trees are not pickled to the workers.
        self.trees = []
        self.feat_ids_by_tree = []
        self.classes_ = sorted(np.unique(y))

        tasks = [
            (X, y, self.max_depth, self.max_features, self.random_state, i)
            for i in range(self.n_estimators)
        ]
        results = self._run_jobs(self._fit_single_tree, tasks, n_jobs)

        self.trees = [tree for tree, _ in results]
        self.feat_ids_by_tree = [feature_idx for _, feature_idx in results]

        return self

    def _predict_proba_single_tree(self, args) -> np.ndarray:
        """
        Predict class probabilities for X using a single decision tree.

        Parameters:
            args (tuple): A tuple containing the following elements:
                tree (DecisionTreeClassifier): The decision tree model.
                feature_idx (numpy.ndarray): The feature indices used by the tree.
                X (numpy.ndarray): The input samples.

        Returns:
            probas (numpy.ndarray): Array of shape (n_samples, len(self.classes_)) with
                the tree's class probabilities placed in the forest's class order.
        """
        tree, feature_idx, X = args
        single_tree_probas = tree.predict_proba(X[:, feature_idx])

        # A bootstrap sample may miss some classes, so the tree can report fewer
        # columns than the forest has classes; align columns via the tree's own
        # classes_ so all per-tree matrices can be averaged.
        aligned_probas = np.zeros((X.shape[0], len(self.classes_)))
        columns = np.searchsorted(self.classes_, tree.classes_)
        aligned_probas[:, columns] = single_tree_probas
        return aligned_probas

    def predict_proba(self, X: np.ndarray, n_jobs: int = 1) -> np.ndarray:
        """
        Predict class probabilities for X.

        Parameters:
            X (array-like of shape (n_samples, n_features)): The input samples.
            n_jobs (int, default=1): The number of processes to use for parallel prediction.
                If n_jobs=-1, all available processes will be used.

        Returns:
            probas (array-like of shape (n_samples, n_classes)): The class probabilities
                of the input samples, averaged over the fitted trees.

        Raises:
            ValueError: If the estimator has no fitted trees.
        """
        if not self.trees:
            raise ValueError("This RandomForestClassifierCustom instance is not fitted yet; call fit() first.")

        X = np.asarray(X)
        tasks = [(tree, feature_idx, X) for tree, feature_idx in zip(self.trees, self.feat_ids_by_tree)]
        probas = self._run_jobs(self._predict_proba_single_tree, tasks, n_jobs)

        # Average over the trees that were actually fitted rather than the current
        # n_estimators parameter, which may have been changed after fit().
        return np.mean(probas, axis=0)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predict class labels for X.

        Parameters:
            X (array-like of shape (n_samples, n_features)): The input samples.

        Returns:
            predictions (array-like of shape (n_samples,)): The predicted class labels.
        """
        probas = self.predict_proba(X)
        # argmax yields column indices; translate them back to the original labels.
        predictions = np.asarray(self.classes_)[np.argmax(probas, axis=1)]

        return predictions
Loading