-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclassifier.hivecote.py
More file actions
119 lines (102 loc) · 4.17 KB
/
classifier.hivecote.py
File metadata and controls
119 lines (102 loc) · 4.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report
from sktime.classification.hybrid import HIVECOTEV2
from sktime.classification.kernel_based import RocketClassifier
from sklearn.metrics import multilabel_confusion_matrix
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
# Number of consecutive CSV rows that make up one sample window; it is used as
# the window stride in split_file and as the length of the last axis of the
# sample arrays built in split_file and load_data.
SAMPLELEN = 900
def get_data_paths(folder_path):
    """Return the per-sensor, per-split data directories under ``folder_path``.

    The result is a dict keyed by sensor name ("Accel", "Gyro"). Each value
    maps "Training", "Validation" and "Test" to the string
    ``<folder_path>/<sensor>/<split>/`` (note the trailing slash).
    """
    sensors = ("Accel", "Gyro")
    splits = ("Training", "Validation", "Test")
    return {
        sensor: {split: f"{folder_path}/{sensor}/{split}/" for split in splits}
        for sensor in sensors
    }
def load_data_from_file(file_path):
    """Read the CSV at ``file_path`` and return it as a pandas DataFrame."""
    return pd.read_csv(file_path)
def split_file(file_path, file, typeOfData, device):
    """Cut one sensor CSV into fixed-length windows that share a single label.

    Parameters
    ----------
    file_path : str
        Root data folder, passed to ``get_data_paths``.
    file : str
        Name of the CSV file inside the selected sensor/split directory.
    typeOfData : str
        Split key understood by ``get_data_paths`` ("Training", "Validation"
        or "Test").
    device : str
        Sensor key understood by ``get_data_paths`` ("Accel" or "Gyro").

    Returns
    -------
    X_train : numpy.ndarray
        Array of shape ``(n_windows, 3, SAMPLELEN)`` holding the ``x``, ``y``
        and ``z`` columns of every accepted window.
    Y_train : numpy.ndarray
        The "Activity Label" value of each accepted window.

    Notes
    -----
    The file is walked in non-overlapping steps of ``SAMPLELEN`` rows. A
    window is kept only if it has exactly ``SAMPLELEN`` rows and every row
    carries the same "Activity Label" as the window's first row; any other
    window is dropped. The CSV must contain the columns "Activity Label",
    "Timestamp", "x", "y" and "z".
    """
    dataPaths = get_data_paths(file_path)
    dataFromFile = load_data_from_file(
        os.path.abspath(os.path.join(dataPaths[device][typeOfData], file))
    )
    # Selecting all five columns up front fails early if one is missing.
    features = dataFromFile[["Activity Label", "Timestamp", "x", "y", "z"]]

    # Pull the columns out once as arrays. Indexing a row Series with integer
    # keys (``features.iloc[r][0]``) relies on deprecated positional fallback
    # in pandas and is very slow when done cell by cell.
    labels = features["Activity Label"].to_numpy()
    axes = features[["x", "y", "z"]].to_numpy()

    windows = []
    window_labels = []
    # Only starts that leave room for a full window are visited; a truncated
    # trailing window can never be accepted, so it is not worth examining.
    for start in range(0, len(labels) - SAMPLELEN + 1, SAMPLELEN):
        stop = start + SAMPLELEN
        first_label = labels[start]
        # Reject the window if the activity label changes anywhere inside it.
        if np.all(labels[start:stop] == first_label):
            # Transpose (SAMPLELEN, 3) -> (3, SAMPLELEN): one row per axis.
            windows.append(axes[start:stop].T)
            window_labels.append(first_label)

    Y_train = np.array(window_labels)
    # The reshape keeps the (n, 3, SAMPLELEN) layout even when no window was
    # accepted and the array is empty.
    X_train = np.array(windows).reshape(-1, 3, SAMPLELEN)
    return X_train, Y_train
def _load_sensor_windows(file_path, typeOfData, device):
    """Window every data file of one sensor/split, in sorted file-name order.

    Returns the stacked windows with shape ``(n, 3, SAMPLELEN)`` and a plain
    list with one label per window.
    """
    folder = get_data_paths(file_path)[device][typeOfData]
    # Seed with an empty block so the result keeps its 3-D shape (and float
    # dtype) even when the folder yields no windows at all.
    chunks = [np.empty((0, 3, SAMPLELEN))]
    labels = []
    # os.listdir returns entries in arbitrary order; sort so the sample order
    # is reproducible between runs and machines.
    for filename in sorted(os.listdir(folder)):
        if filename.startswith(".~"):
            # Skip files whose name starts with ".~".
            continue
        windows, window_labels = split_file(file_path, filename, typeOfData, device)
        chunks.append(windows)
        labels.extend(window_labels)
    # Concatenate once at the end instead of re-copying the array per file.
    return np.concatenate(chunks, axis=0), labels


def load_data(file_path, typeOfData):
    """Load one data split and combine accelerometer and gyroscope windows.

    Parameters
    ----------
    file_path : str
        Root data folder, passed to ``get_data_paths``.
    typeOfData : str
        Split to load ("Training", "Validation" or "Test").

    Returns
    -------
    XTrain_Combined : numpy.ndarray
        Array of shape ``(n, 6, SAMPLELEN)``: the three "Accel" channels
        followed by the three "Gyro" channels of each window.
    YTrain_Combined : numpy.ndarray
        One label per window, taken from the "Accel" files.

    Notes
    -----
    Accel and Gyro windows are paired purely by position, and both sets are
    truncated to the shorter of the two. Labels from the Gyro files are not
    used.
    NOTE(review): this assumes the i-th Accel window and the i-th Gyro window
    describe the same recording segment -- confirm that the two folders hold
    matching files whose names sort in the same order.
    """
    accel_windows, accel_labels = _load_sensor_windows(file_path, typeOfData, "Accel")
    gyro_windows, _ = _load_sensor_windows(file_path, typeOfData, "Gyro")

    # Keep only as many windows as both sensors can provide.
    min_length = min(accel_windows.shape[0], gyro_windows.shape[0])
    XTrain_Combined = np.concatenate(
        (accel_windows[:min_length], gyro_windows[:min_length]), axis=1
    )
    YTrain_Combined = np.array(accel_labels[:min_length])
    return XTrain_Combined, YTrain_Combined
# Script entry point: load the Test and Training splits, fit HIVE-COTE v2,
# then print predictions and a classification report and plot the confusion
# matrix. Guarded so that importing this file does not start loading data and
# training; when run as a script the names below are still module-level.
if __name__ == "__main__":
    print("Loading data..\n")
    XTest, YTest = load_data("ProcessedData", "Test")
    XTrain, YTrain = load_data("ProcessedData", "Training")

    classifier = HIVECOTEV2(
        drcif_params={"n_estimators": 200},
        time_limit_in_minutes=2,
    )
    print("Fitting Classifier..\n")
    classifier.fit(XTrain, YTrain)

    print("Running Prediction..\n")
    y_pred = classifier.predict(XTest)
    y_predproba = classifier.predict_proba(XTest)
    print(f"guesses: \n {y_pred}")
    print(f"Probabilities from guess: \n {y_predproba}")
    print(f"Actual: \n {YTest}")

    # The report is printed exactly once.
    report = classification_report(YTest, y_pred)
    print("Classification Report:\n", report)

    cm = confusion_matrix(YTest, y_pred)
    # NOTE(review): display_labels is hard-coded to three names. It must match
    # the number and order of classes that confusion_matrix derives from
    # YTest / y_pred -- confirm against the labels present in the data.
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["B", "C", "M"])
    disp.plot()
    plt.title("My confusion matrix")
    plt.show()