-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathcreateMlTable.py
More file actions
109 lines (84 loc) · 3.01 KB
/
createMlTable.py
File metadata and controls
109 lines (84 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import argparse
from pathlib import Path
import mltable
from workflowhelperfunc.workflowhelper import setup_logger, log_event
# TODO: Review and update this logic; deleting an existing file is probably not the right approach.
def create_directories(save_dir, logger):
    """
    Prepare the output location for the ML table.

    Ensures ``save_dir`` exists, then ensures an ``MLTable`` directory exists
    inside it. If a regular file named ``MLTable`` is already present there,
    that is logged and the file is removed so the directory can be created.

    Args:
        save_dir (str): The directory where the processed table will be saved.
        logger (Logger): Logger object passed through to ``log_event``.

    Returns:
        Path: Path of the ``MLTable`` directory inside ``save_dir``.
    """
    root = Path(save_dir)
    root.mkdir(parents=True, exist_ok=True)

    target = root / "MLTable"

    # A regular file occupying the target name would make mkdir() raise,
    # so it is removed before the directory is created.
    if target.is_file():
        message = (
            f"A file with the same name '{target}' already exists. "
            "Deleting it."
        )
        log_event(logger, 'info', message)
        target.unlink()

    target.mkdir(parents=True, exist_ok=True)
    return target
def load_data():
    """
    Build the ML table from the hard-coded parquet locations.

    One glob pattern is generated for every combination of color
    ("green", "yellow") and year (2015 through 2019), colors varying
    slowest, and the full list is handed to ``mltable.from_parquet_files``.

    Returns:
        The table object returned by ``mltable.from_parquet_files``.
    """
    colors = ["green", "yellow"]
    years = range(2015, 2020)

    sources = []
    for color in colors:
        for year in years:
            # Adjacent literals are concatenated at compile time, so the
            # pattern stays a single unbroken URL.
            pattern = (
                "wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/"
                f"{color}/puYear={year}/puMonth=*/**/*.parquet"
            )
            sources.append({"pattern": pattern})

    return mltable.from_parquet_files(sources)
def preprocess_data(tbl):
    """
    Apply the fixed preprocessing steps to a table.

    In order: take a random sample (probability 0.001, seed 735), keep rows
    where ``tripDistance`` is greater than 0, drop the ``puLocationId`` and
    ``doLocationId`` columns, and extract columns from the
    ``/puYear={year}/puMonth={month}`` partition format. The resulting
    table's ``name`` attribute is then set to ``"NYC_taxi"``.

    Args:
        tbl: The table to preprocess; each step is called on the object
            returned by the previous step.

    Returns:
        The table returned by the last step, with ``name`` set.
    """
    processed = (
        tbl.take_random_sample(probability=0.001, seed=735)
        .filter("col('tripDistance') > 0")
        .drop_columns(["puLocationId", "doLocationId"])
        .extract_columns_from_partition_format("/puYear={year}/puMonth={month}")
    )

    # Label the final table.
    processed.name = "NYC_taxi"
    return processed
def save_data(tbl, mltable_dir):
    """
    Write the table to the given directory.

    Args:
        tbl: The preprocessed table; its ``save`` method is called.
        mltable_dir (Path): Directory to save the table into. It is
            converted to ``str`` before being passed to ``save``.
    """
    destination = str(mltable_dir)
    tbl.save(destination)
def main():
    """
    Command-line entry point: build the ML table and save it.

    Reads one positional argument, ``save_directory``, then creates the
    output directories, loads and preprocesses the data, and saves the
    resulting ML table.

    Raises:
        SystemExit: With status 1 when any step raises, after the error has
            been logged, so the calling process sees a non-zero exit status
            instead of a silent success. ``argparse`` may also raise
            ``SystemExit`` on its own for invalid arguments or ``--help``.
    """
    logger = setup_logger(__name__)

    # Argument parsing stays outside the try block: argparse reports its own
    # errors and exits via SystemExit, which `except Exception` never caught.
    parser = argparse.ArgumentParser(description='Create and save ML table.')
    parser.add_argument('save_directory', type=str, help='Directory to save data')
    args = parser.parse_args()

    try:
        mltable_dir = create_directories(args.save_directory, logger)
        tbl = load_data()
        tbl = preprocess_data(tbl)
        save_data(tbl, mltable_dir)
    except Exception as error:
        log_event(logger, 'error', f"An error occurred: {error}")
        # Previously the error was only logged and the script still exited
        # with status 0; signal the failure to the caller.
        raise SystemExit(1) from error
    else:
        log_event(logger, 'info', "ML table created and saved successfully.")
# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()