monitor.py
from typing import List
import pandas as pd
from pyspark.sql import SparkSession
from moc_monitors import DriftDetector
from moc_schema_infer import set_detector_parameters


# modelop.init
def begin():
    print("Begin function...")

    # Create (or reuse) the Spark session shared by the rest of the monitor
    global SPARK
    SPARK = SparkSession.builder.appName("DriftTest").getOrCreate()

    # Read the input schema and index it by field name
    global SCHEMA
    SCHEMA = pd.read_json("df_sample_scored_input_schema.avsc", orient="records")
    SCHEMA.set_index("name", inplace=True)
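

# Illustrative (assumed) shape of the schema records loaded in begin() above. The
# only field the code relies on directly is "name", which becomes the index; any
# other fields shown here are hypothetical placeholders for whatever typing/role
# information set_detector_parameters() reads when splitting columns:
#
#   [
#       {"name": "age", "type": "int"},
#       {"name": "gender", "type": "string"},
#   ]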


# modelop.metrics
def metrics(external_inputs: List, external_outputs: List, external_model_assets: List):
    print("Metrics function...")

    comparator_path, baseline_path, output_path = parse_assets(
        external_inputs, external_outputs, external_model_assets
    )

    # Load the assets as Pandas dataframes
    baseline_df = SPARK.read.option("header", "true").csv(baseline_path).toPandas()
    comparator_df = SPARK.read.option("header", "true").csv(comparator_path).toPandas()

    print("Detecting drift...")
    print("Baseline df:")
    print(baseline_df)
    print("Comparator df:")
    print(comparator_df)

    # Run the monitoring to detect drift
    detector_parameters = set_detector_parameters(SCHEMA)
    drift_detector = DriftDetector(
        df_baseline=baseline_df,
        df_sample=comparator_df,
        categorical_columns=detector_parameters["categorical_columns"],
        numerical_columns=detector_parameters["numerical_columns"],
        score_column=detector_parameters["score_column"][0],
        label_column=detector_parameters["label_column"][0],
    )
    output = drift_detector.calculate_drift(
        pre_defined_metric="jensen-shannon", user_defined_metric=None
    )
    print("Monitor output:")
    print(output)

    # Spark doesn't like numpy float values, so convert the output to
    # pandas before casting to a Spark DataFrame.
    #
    # But pandas needs the dict to have list values, so first we need
    # to wrap each value in a list.
    print("Converting drift output to Spark...")
    output_as_lists = {k: [v] for k, v in output.items()}
    output_pandas_df = pd.DataFrame.from_dict(output_as_lists)

    # Cast to Spark dataframe
    output_df = SPARK.createDataFrame(output_pandas_df)
    print("Spark metrics output:")
    output_df.show()

    print("Writing output to", output_path)
    # Use coalesce() so that the output is a single file for easy reading
    output_df.coalesce(1).write.mode("overwrite").option("header", "true").format(
        "json"
    ).save(output_path)

    SPARK.stop()


def parse_assets(
    external_inputs: List, external_outputs: List, external_model_assets: List
):
    """
    Returns a tuple of (comparator_path, baseline_path, output_path) for paths to HDFS
    assets.
    """
    print("Parsing assets...")

    ### Input assets
    # Grab input assets from arguments
    comparator_assets = [
        a for a in external_inputs if a["assetRole"] == "COMPARATOR_DATA"
    ]
    if len(comparator_assets) != 1:
        raise ValueError(
            "There must be one item in input assets with COMPARATOR_DATA role, found {}".format(
                len(comparator_assets)
            )
        )
    comparator_asset = comparator_assets[0]

    baseline_assets = [a for a in external_inputs if a["assetRole"] == "TRAINING_DATA"]
    if len(baseline_assets) != 1:
        raise ValueError(
            "There must be one item in input assets with TRAINING_DATA role, found {}".format(
                len(baseline_assets)
            )
        )
    baseline_asset = baseline_assets[0]

    # If either asset is a JSON, error out because Spark doesn't like JSON
    if ("fileFormat" in comparator_asset) and (
        comparator_asset["fileFormat"] == "JSON"
    ):
        raise ValueError("Comparator data file format is set as JSON but must be CSV")
    if ("fileFormat" in baseline_asset) and (baseline_asset["fileFormat"] == "JSON"):
        raise ValueError("Baseline data file format is set as JSON but must be CSV")

    # Pull the HDFS file paths for the assets
    comparator_path = comparator_asset["fileUrl"]
    baseline_path = baseline_asset["fileUrl"]

    ### Output asset
    # Grab the output asset
    if len(external_outputs) != 1:
        raise ValueError(
            "There must be one output asset, found {}".format(len(external_outputs))
        )
    output_path = external_outputs[0]["fileUrl"]

    return (comparator_path, baseline_path, output_path)
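

# A minimal local smoke-test sketch, not part of the ModelOp runtime contract:
# in real runs the platform supplies the asset lists. The dicts below are
# hypothetical examples built only from the keys parse_assets() reads
# ("assetRole", "fileUrl", "fileFormat"); the paths are placeholders and would
# need to point at real CSV data for the drift calculation to succeed.
if __name__ == "__main__":
    begin()
    metrics(
        external_inputs=[
            {
                "assetRole": "TRAINING_DATA",
                "fileUrl": "hdfs:///tmp/baseline.csv",  # placeholder path
                "fileFormat": "CSV",
            },
            {
                "assetRole": "COMPARATOR_DATA",
                "fileUrl": "hdfs:///tmp/comparator.csv",  # placeholder path
                "fileFormat": "CSV",
            },
        ],
        external_outputs=[{"fileUrl": "hdfs:///tmp/drift_output"}],  # placeholder path
        external_model_assets=[],
    )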