-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
245 lines (198 loc) · 8.73 KB
/
utils.py
File metadata and controls
245 lines (198 loc) · 8.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import ast
import glob
import math
import os
import json
import numpy as np
import pandas.errors
import pytz
import pandas as pd
import datetime
from typing import Sequence
from ast import literal_eval
import constants
def validate_dir_path(path):
    """Ensure a directory exists at *path*, creating it when missing.

    :param path: directory path to validate.
    :return: the same *path*, for convenient chaining.
    """
    if os.path.exists(path):
        return path
    os.makedirs(path)
    return path
def read_json(file):
    """Load and return the JSON document stored at *file* (UTF-8)."""
    with open(file, encoding='utf-8') as handle:
        return json.load(handle)
def import_preprocessed(path):
    """Read a preprocessed CSV and return it with a tz-aware datetime index.

    The first CSV column is parsed as UTC timestamps and then converted to
    the project time zone (``constants.TZ``).

    :param path: path to the preprocessed CSV file.
    :return: pd.DataFrame indexed by tz-aware timestamps.
    """
    frame = pd.read_csv(path, index_col=0)
    utc_index = pd.to_datetime(frame.index, utc=True)
    frame.index = utc_index.tz_convert(constants.TZ)
    return frame
def localize_datetime(d):
    """Attach the project time zone to a naive datetime.

    The result is both time-zone aware and DST aware (pytz ``localize``
    resolves the correct UTC offset for the given wall-clock time).

    :param d: naive datetime.datetime.
    :return: timezone-aware datetime.datetime in ``constants.TZ``.
    """
    project_tz = pytz.timezone(str(constants.TZ))
    return project_tz.localize(d)
def get_test_dates(test_name: str):
    """Look up the evaluation window for a named test period.

    test_name: Must be one of: 'w1', 'w2', 'w3', 'w4'
    return: tuple of 3 datetime.datetime (start_test, end_short_test, end_long_test)
    """
    window = constants.TEST_TIMES[test_name]
    start = window['start']
    # The short test always spans the first 24 hours of the window.
    return start, start + datetime.timedelta(hours=24), window['end']
def drop_other_dmas(data, y_label):
    """Drop every DMA column except the target *y_label* from *data*.

    :param data: DataFrame containing all DMA columns.
    :param y_label: the DMA column to keep.
    :return: DataFrame without the non-target DMA columns.
    """
    other_dmas = [name for name in set(constants.DMA_NAMES) - {y_label}]
    return data.drop(other_dmas, axis=1)
def calculate_zscore(column):
    """Compute Z-scores of a pandas Series, ignoring NaN values.

    Uses the population standard deviation (ddof=0); NaN entries in the
    input remain NaN in the output.

    :param column: Pandas Series with potential NaN values.
    :return: Pandas Series with the Z-scores.
    """
    center = column.mean(skipna=True)
    spread = column.std(ddof=0, skipna=True)
    return column.sub(center).div(spread)
def num_hours_between_timestamps(t1, t2):
    """Count whole hours between two timestamps (expects t2 > t1).

    param t1: datetime.datetime, period start
    param t2: datetime.datetime, period end
    return: int, number of hours
    """
    delta = t2 - t1
    # timedelta stores (days, seconds); combine both into whole hours.
    return int(delta.days * 24 + delta.seconds // 3600)
def record_results(dma: str, short_model_name: str, long_model_name: str, dates: dict, lags: dict, norm_method: str,
                   pred_type: str, cols_to_move_stat: list, cols_to_decompose: list, clusters_idx: int,
                   score: Sequence):
    """Append one experiment's configuration and scores to models_scores.csv.

    :param dma: DMA name the models were fitted for.
    :param short_model_name: model used for the short prediction horizon.
    :param long_model_name: model used for the long prediction horizon.
    :param dates: dict with 'start_train', 'start_test' and 'end_test' keys.
    :param lags: dict of feature lags (stored as a single cell).
    :param norm_method: normalization method name.
    :param pred_type: prediction type label.
    :param cols_to_move_stat: columns replaced by moving statistics (stored as one cell).
    :param cols_to_decompose: decomposed columns (stored as one cell).
    :param clusters_idx: index of the clustering configuration used.
    :param score: sequence with at least 3 values, written to i1-i3.
    """
    # WINDOW_SIZE IS ACCORDING TO PREDICTION HORIZON
    result = pd.DataFrame({
        'dma': dma,
        'short_model_name': short_model_name,
        'long_model_name': long_model_name,
        'start_train': dates['start_train'],
        'start_test': dates['start_test'],
        'end_test': dates['end_test'],
        'lags': [lags],
        'norm': norm_method,
        'pred_type': pred_type,
        'cols_to_move_stat': [cols_to_move_stat],
        'cols_to_decompose': [cols_to_decompose],
        'clusters_idx': clusters_idx,
        'i1': score[0],
        'i2': score[1],
        'i3': score[2],
    }, index=[datetime.datetime.now().strftime("%d-%m-%Y %H:%M:%S")])

    try:
        df = pd.read_csv("models_scores.csv", index_col=0)
    except (FileNotFoundError, pd.errors.EmptyDataError):
        # First run (or empty/missing scores file): start a fresh table
        # instead of crashing before the result can be saved.
        df = pd.DataFrame()
    df = pd.concat([df, result])
    df.to_csv("models_scores.csv", index=True)
def get_dfs_commons(df1, df2):
    """Align two DataFrames on their shared row and column labels.

    The previous implementation took df2's full index (``how='right'``
    merge), so any row present in df2 but not in df1 made the subsequent
    ``df1.loc`` raise a KeyError. A true index intersection keeps only the
    rows both frames share.

    :param df1: first DataFrame.
    :param df2: second DataFrame.
    :return: tuple (df1_aligned, df2_aligned) restricted to common labels.
    """
    common_cols = df1.columns.intersection(df2.columns).to_list()
    common_rows = df1.index.intersection(df2.index)
    df1 = df1.loc[common_rows, common_cols]
    df2 = df2.loc[common_rows, common_cols]
    return df1, df2
def decompose_dictionaries_column(df, column_name, prefix='_'):
    """Expand a column of dicts (or dict-literal strings) into flat columns.

    Each key of the dictionaries in *column_name* becomes its own column,
    prefixed with *prefix*; the original column is dropped. The previous
    implementation called ``reset_index(inplace=True)`` and thus mutated the
    caller's DataFrame as a hidden side effect; this version works on a copy.

    :param df: input DataFrame (left unmodified).
    :param column_name: column holding dicts or their string representation.
    :param prefix: string prepended to every generated column name.
    :return: new DataFrame with the index reset and the column expanded.
    """
    df = df.reset_index()
    try:
        # CSV round-trips store dicts as strings - parse them back.
        df[column_name] = df[column_name].apply(ast.literal_eval)
    except (SyntaxError, ValueError):
        # Values are already dict objects (literal_eval rejects them).
        pass
    decomposed = pd.json_normalize(df[column_name])
    decomposed.columns = [prefix + col for col in decomposed.columns]
    return pd.concat([df.drop([column_name], axis=1), decomposed], axis=1)
def collect_experiments(dir_path, p, dmas, horizon, dates_idx, models, abs_n=None):
    """Aggregate per-run experiment CSVs from *dir_path* and keep the best rows.

    File names are expected to contain five '--'-separated fields after a
    prefix: dma index, model name, dates index, horizon, and a slurm id,
    where each middle field stores its value after a single '-'.

    :param dir_path: directory containing the experiment CSV files.
    :param p: percentage of top-ranked rows to keep per (dma, model, dates) group.
    :param dmas: DMA indices to include.
    :param horizon: 'short' or 'long'; selects the score columns used for ranking.
    :param dates_idx: date-set indices to include.
    :param models: model names to include.
    :param abs_n: absolute number of rows to keep per group; overrides *p* when set.
    :return: DataFrame of the selected runs with the 'model_params' and 'lags'
        dict columns expanded into 'param_*' and 'lags_*' columns.
    """
    df = pd.DataFrame()
    # Short-horizon candidates are ranked on i1/i2, long-horizon on i3.
    ranking_cols = ['i1', 'i2'] if horizon == 'short' else ['i3']

    def get_param(raw_str):
        # Field format is '<key>-<value>'; keep the value part.
        return raw_str.split('-')[1]

    def rank(df, ranking_cols):
        # Min-max normalise each score column into w1-w3, then rank.
        # NOTE(review): 'rank' averages the RAW ranking columns; the
        # normalised w1-w3 columns are computed but not used here - confirm
        # whether ranking was meant to use them.
        df['w1'] = (df['i1'] - df['i1'].min()) / (df['i1'].max() - df['i1'].min())
        df['w2'] = (df['i2'] - df['i2'].min()) / (df['i2'].max() - df['i2'].min())
        df['w3'] = (df['i3'] - df['i3'].min()) / (df['i3'].max() - df['i3'].min())
        df['rank'] = df[ranking_cols].mean(axis=1)
        df = df.sort_values('rank')
        return df

    for i, fname in enumerate(glob.glob(dir_path + "/*.csv")):
        # Parse run metadata out of the file name and filter on it.
        _prefix, _dma_idx, _model_name, _dates_idx, _horizon, _slurm_id = fname.split('--')
        _dma_idx = int(get_param(_dma_idx))
        _model_name = get_param(_model_name)
        _dates_idx = int(get_param(_dates_idx))
        _horizon = get_param(_horizon)
        if (_dma_idx in dmas) and (_model_name in models) and (_dates_idx in dates_idx) and (_horizon == horizon):
            try:
                temp = pd.read_csv(fname, index_col=0, engine="python", on_bad_lines='skip')
                df = pd.concat([df, temp])
            except pd.errors.EmptyDataError:
                # Some runs produce empty files; skip them but leave a trace.
                print(f"Empty DataFrame: {fname}")

    def select_smallest_n(group, n_percent, ranking_cols):
        # Keep the n lowest-rank rows of a group: either n_percent of the
        # group (at least 50 rows) or the absolute abs_n when provided.
        group = rank(group, ranking_cols)
        if abs_n is None:
            n = max(50, math.ceil(len(group) * n_percent / 100))
        else:
            n = abs_n
        return group.nsmallest(n, 'rank')

    # Identical (dma, dates, model, scores) rows are duplicates of the same run.
    df = df.drop_duplicates(subset=['dma', 'dates_idx', 'model_name'] + ranking_cols)
    print(df.shape, df['run_time'].mean())
    df = df.groupby(['dma', 'model_name', 'dates_idx'], group_keys=False).apply(
        lambda x: select_smallest_n(x, n_percent=p, ranking_cols=ranking_cols))
    # Expand the stored dict columns into flat 'param_*' / 'lags_*' columns.
    df = decompose_dictionaries_column(df, column_name='model_params', prefix='param_')
    df = decompose_dictionaries_column(df, column_name='lags', prefix='lags_')
    return df
def experiment_to_json(csv_path, horizon, models, export_path, constant_lags=None):
    """
    Nested parsing of experiments csv files to construct final candidates
    The csv files are generated with the select_models function in graphs where the number of models in each category
    (dma, model_name, dates) need to be defined

    :param csv_path: path to the selected-experiments CSV (one row per candidate).
    :param horizon: key under which each DMA's candidate list is stored in the JSON.
    :param models: model names to include; rows with other names are skipped.
    :param export_path: destination path for the resulting JSON file.
    :param constant_lags: optional dict of exogenous-feature lags applied to
        every candidate instead of the per-row 'lags_*' values.
    """
    df = pd.read_csv(csv_path, index_col=0)
    # Stored lists come back from CSV as strings (or NaN when empty) - parse them.
    df["cols_to_move_stat"] = df["cols_to_move_stat"].apply(lambda x: [] if pd.isna(x) else literal_eval(x))
    df["cols_to_decompose"] = df["cols_to_decompose"].apply(lambda x: [] if pd.isna(x) else literal_eval(x))
    for col in df.columns:
        if col.startswith("lags_"):
            # A missing lag value means "no lag" for that feature.
            df[col] = df[col].fillna(0)

    candidates = {}
    for dma in constants.DMA_NAMES:
        temp = df.loc[df['dma'] == dma]
        dma_candidates = []
        model_idx = 0
        for i, row in temp.iterrows():
            if row["model_name"] not in models:
                continue
            else:
                # Collect hyper-parameters from 'param_*' columns, dropping NaNs
                # and casting whole-number floats back to int (the CSV
                # round-trip turns ints into floats).
                params = {col[6:]: row[col] for col in temp if col.startswith("param_") and not pd.isnull(row[col])}
                params = {name: int(value) if isinstance(value, (int, float)) and np.floor(value) == value else value
                          for name, value in params.items()}
                if constant_lags is None:
                    # Per-row lags for the four exogenous weather features.
                    lags = {"Rainfall depth (mm)": int(row["lags_Rainfall depth (mm)"]),
                            "Air temperature (°C)": int(row["lags_Air temperature (°C)"]),
                            "Windspeed (km/h)": int(row["lags_Windspeed (km/h)"]),
                            "Air humidity (%)": int(row["lags_Air humidity (%)"])
                            }
                else:
                    lags = constant_lags
                dma_candidates.append(
                    {
                        "model_idx": model_idx,
                        "model_name": row["model_name"],
                        "params": params,
                        "cols_to_move_stat": row["cols_to_move_stat"],
                        "cols_to_decompose": row["cols_to_decompose"],
                        # The target itself is decomposed only when it appears
                        # in the row's decomposition list.
                        "decompose_target": True if dma in row["cols_to_decompose"] else False,
                        "norm_method": row["norm"],
                        "lags": lags,
                        "lag_target": int(row[f"lags_{dma}"]),
                        "clusters_idx": int(row["clusters_idx"]) if not pd.isnull(row["clusters_idx"]) else None
                    }
                )
                model_idx += 1

        candidates[dma] = {horizon: dma_candidates}

    with open(export_path, 'w', encoding='utf8') as file:
        json.dump(candidates, file, indent=4, ensure_ascii=False)