-
Notifications
You must be signed in to change notification settings - Fork 53
add gcp storage to xgboost-operator #81
base: master
Are you sure you want to change the base?
Changes from all commits
2240810
ba00fb4
f3d8619
d904b9c
813e3cc
4056365
bae957b
b583c1b
758ec4e
4125851
9a0e655
a2a1702
05675a1
dc71d6b
cf309d8
ead8563
fcf83ec
ef7a7d0
9b5d214
309eee4
fb48969
af63ce3
ca9bed0
0c22468
1db400c
ca55228
2490274
eeb1049
fc7543f
8d6cf3c
341bd49
c8185e7
925e26f
a313d2a
cf82e5a
e57465a
06d2992
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,6 +15,8 @@ | |
import xgboost as xgb
import os
import tempfile
# Google Cloud Storage client; the package is `google.cloud`, not `googel.cloud`.
from google.cloud import storage
from oauth2client.service_account import ServiceAccountCredentials
import oss2
import json
import pandas as pd
|
|
@@ -59,7 +61,7 @@ def read_train_data(rank, num_workers, path): | |
| y = iris.target | ||
|
|
||
| start, end = get_range_data(len(x), rank, num_workers) | ||
| x = x[start:end, :] | ||
| x = x[start:end] | ||
| y = y[start:end] | ||
|
|
||
| x = pd.DataFrame(x) | ||
|
|
@@ -87,7 +89,7 @@ def read_predict_data(rank, num_workers, path): | |
| y = iris.target | ||
|
|
||
| start, end = get_range_data(len(x), rank, num_workers) | ||
| x = x[start:end, :] | ||
| x = x[start:end] | ||
| y = y[start:end] | ||
| x = pd.DataFrame(x) | ||
| y = pd.DataFrame(y) | ||
|
|
@@ -113,7 +115,7 @@ def get_range_data(num_row, rank, num_workers): | |
| x_start = rank * num_per_partition | ||
| x_end = (rank + 1) * num_per_partition | ||
|
|
||
| if x_end > num_row: | ||
| if x_end > num_row or (rank==num_workers-1 and x_end< num_row): | ||
| x_end = num_row | ||
|
|
||
| return x_start, x_end | ||
|
|
@@ -140,10 +142,18 @@ def dump_model(model, type, model_path, args): | |
| oss_param = parse_parameters(args.oss_param, ",", ":") | ||
| if oss_param is None: | ||
| raise Exception("Please config oss parameter to store model") | ||
|
|
||
| return False | ||
| oss_param['path'] = args.model_path | ||
| dump_model_to_oss(oss_param, model) | ||
| logging.info("Dump model into oss place %s", args.model_path) | ||
| elif type == 'gcp': | ||
| gcp_param = parse_parameters(args.gcp_param, ',',':') | ||
| if gcp_param is None: | ||
| raise Exception('Please config gcp parameter to store model') | ||
| return False | ||
| gcp_param['path'] = args.model_path | ||
| dump_model_to_gcp(gcp_param, model) | ||
| logging.info('Dump model into gcp place %s', args.model_path) | ||
|
|
||
| return True | ||
|
|
||
|
|
@@ -171,6 +181,14 @@ def read_model(type, model_path, args): | |
|
|
||
| model = read_model_from_oss(oss_param) | ||
| logging.info("read model from oss place %s", model_path) | ||
| elif type == 'gcp': | ||
| gcp_param = parse_parameters(args.gcp_param,',',':') | ||
| if gcp_param is None: | ||
| raise Exception('Please config gcp to read model') | ||
| return False | ||
| gcp_param['path'] = args.model_path | ||
| model = read_model_from_gcp(args.gcp_param) | ||
| logging.info('read model from gcp place %s', model_path) | ||
|
|
||
| return model | ||
|
|
||
|
|
@@ -189,7 +207,7 @@ def dump_model_to_oss(oss_parameters, booster): | |
| 'feature_importance.json') | ||
|
|
||
| oss_path = oss_parameters['path'] | ||
| logger.info('---- export model ----') | ||
| logger.info('---- export model to OSS----') | ||
| booster.save_model(model_fname) | ||
| booster.dump_model(text_model_fname) # format output model | ||
| fscore_dict = booster.get_fscore() | ||
|
|
@@ -208,6 +226,39 @@ def dump_model_to_oss(oss_parameters, booster): | |
| upload_oss(oss_parameters, model_fname, aux_path) | ||
| upload_oss(oss_parameters, text_model_fname, aux_path) | ||
| upload_oss(oss_parameters, feature_importance, aux_path) | ||
| logger.info('---- model uploaded to OSS successfully!----') | ||
| else: | ||
| raise Exception("fail to generate model") | ||
| return False | ||
|
|
||
| return True | ||
| def dump_model_to_gcp(gcp_parameters,booster): | ||
| model_fname = os.path.join(tempfile.mkdtemp(), 'model') | ||
| text_model_fname = os.path.join(tempfile.mkdtemp(), 'model.text') | ||
| feature_importance = os.path.join(tempfile.mkdtemp(), | ||
| 'feature_importance.json') | ||
|
|
||
| gcp_path = gcp_parameters['path'] | ||
| logger.info('---- export model to GCP----') | ||
| booster.save_model(model_fname) | ||
| booster.dump_model(text_model_fname) | ||
| fscore_dict = booster.get_fscore() | ||
| with open(feature_importance, 'w') as file: | ||
| file.write(json.dumps(fscore_dict)) | ||
| logger.info('---- chief dump model successfully!') | ||
|
|
||
| if os.path.exists(model_fname): | ||
| logger.info('---- Upload Model start...') | ||
|
|
||
| while gcp_path[-1] == '/': | ||
| gcp_path = gcp_path[:-1] | ||
|
|
||
| upload_gcp(gcp_parameters, model_fname, gcp_path) | ||
| aux_path = gcp_path + '_dir/' | ||
| upload_gcp(gcp_parameters, model_fname, aux_path) | ||
| upload_gcp(gcp_parameters, text_model_fname, aux_path) | ||
| upload_gcp(gcp_parameters, feature_importance, aux_path) | ||
| logger.info('---- model uploaded to GCP successfully!----') | ||
| else: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you add a log message saying that the model was uploaded successfully?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for sure |
||
| raise Exception("fail to generate model") | ||
| return False | ||
|
|
@@ -237,6 +288,25 @@ def upload_oss(kw, local_file, oss_path): | |
| except Exception(): | ||
| raise ValueError('upload %s to %s failed' % | ||
| (os.path.abspath(local_file), oss_path)) | ||
def upload_gcp(kw, local_file, gcp_path):
    """Upload a local file to a Google Cloud Storage bucket.

    Args:
        kw: dict of GCP parameters. It must provide the service-account
            fields 'client_id', 'client_email', 'private_key_id' and
            'private_key', plus the target bucket name under
            'access_bucket'.
        local_file: path of the local file to upload.
        gcp_path: destination object name. When it ends with '/', the base
            name of ``local_file`` is appended to it.

    Raises:
        KeyError: if a required entry is missing from ``kw``.
    """
    # A trailing slash means "put the file under this prefix", so keep the
    # local file name. ``endswith`` is also safe for an empty path.
    if gcp_path.endswith('/'):
        gcp_path = '%s%s' % (gcp_path, os.path.basename(local_file))

    credentials_dict = {
        'type': 'service_account',
        'client_id': kw['client_id'],
        'client_email': kw['client_email'],
        'private_key_id': kw['private_key_id'],
        'private_key': kw['private_key'],
    }
    credentials = ServiceAccountCredentials.from_json_keyfile_dict(
        credentials_dict
    )
    client = storage.Client(credentials=credentials)
    # get_bucket is a method of the client instance, not of the
    # ``storage`` module.
    bucket = client.get_bucket(kw['access_bucket'])
    blob = bucket.blob(gcp_path)
    blob.upload_from_filename(local_file)
|
|
||
|
|
||
|
|
||
|
|
||
| def read_model_from_oss(kw): | ||
|
|
@@ -263,7 +333,29 @@ def read_model_from_oss(kw): | |
| bst.load_model(temp_model_fname) | ||
|
|
||
| return bst | ||
|
|
||
def read_model_from_gcp(kw):
    """Download a model file from Google Cloud Storage and load it.

    Args:
        kw: dict of GCP parameters. It must provide the service-account
            fields 'client_id', 'client_email', 'private_key_id' and
            'private_key', the bucket name under 'access_bucket', and the
            object name of the model under 'path'.

    Returns:
        An ``xgb.Booster`` loaded from the downloaded model file.

    Raises:
        KeyError: if a required entry is missing from ``kw``.
        Exception: if the model cannot be downloaded from the bucket.
    """
    credentials_dict = {
        'type': 'service_account',
        'client_id': kw['client_id'],
        'client_email': kw['client_email'],
        'private_key_id': kw['private_key_id'],
        'private_key': kw['private_key'],
    }
    credentials = ServiceAccountCredentials.from_json_keyfile_dict(
        credentials_dict
    )
    client = storage.Client(credentials=credentials)
    # get_bucket is a method of the client instance, not of the
    # ``storage`` module.
    bucket = client.get_bucket(kw['access_bucket'])
    gcp_path = kw["path"]
    blob = bucket.blob(gcp_path)
    temp_model_fname = os.path.join(tempfile.mkdtemp(), 'local_model')
    try:
        blob.download_to_filename(temp_model_fname)
        logger.info("success to load model from gcp %s", gcp_path)
    except Exception as e:
        # Use lazy %-formatting: concatenating str and an exception raises
        # TypeError and would mask the real failure.
        logger.error("fail to load model: %s", e)
        raise Exception("fail to load model from gcp %s" % gcp_path)

    # The caller assigns the result to ``model``, so load the downloaded
    # file into a Booster and return it.
    # NOTE(review): Booster is built with default parameters; confirm this
    # matches how read_model_from_oss constructs its Booster.
    bst = xgb.Booster()
    bst.load_model(temp_model_fname)
    return bst
|
|
||
|
|
||
| def parse_parameters(input, splitter_between, splitter_in): | ||
| """ | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| apiVersion: "xgboostjob.kubeflow.org/v1alpha1" | ||
| kind: "XGBoostJob" | ||
| metadata: | ||
| name: "xgboost-dist-iris-test-predict-gcp" | ||
| spec: | ||
| xgbReplicaSpecs: | ||
| Master: | ||
| replicas: 1 | ||
| restartPolicy: Never | ||
| template: | ||
| apiVersion: v1 | ||
| kind: Pod | ||
| spec: | ||
| containers: | ||
| - name: xgboostjob | ||
| image: docker.io/xfate123/xgboost-dist-iris:1.1 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use a local image |
||
| ports: | ||
| - containerPort: 9991 | ||
| name: xgboostjob-port | ||
| imagePullPolicy: Always | ||
| args: | ||
| - --job_type=Predict | ||
| - --model_path=autoAI/xgb-opt/2 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we simplify the model path? |
||
| - --model_storage_type=gcp | ||
| - --gcp_param=unknown | ||
| Worker: | ||
| replicas: 2 | ||
| restartPolicy: ExitCode | ||
| template: | ||
| apiVersion: v1 | ||
| kind: Pod | ||
| spec: | ||
| containers: | ||
| - name: xgboostjob | ||
| image: docker.io/xfate123/xgboost-dist-iris:1.1 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here |
||
| ports: | ||
| - containerPort: 9991 | ||
| name: xgboostjob-port | ||
| imagePullPolicy: Always | ||
| args: | ||
| - --job_type=Predict | ||
| - --model_path=autoAI/xgb-opt/2 | ||
| - --model_storage_type=gcp | ||
| - --gcp_param=unknown | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why unknown here? |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,7 @@ spec: | |
| claimName: xgboostlocal | ||
| containers: | ||
| - name: xgboostjob | ||
| image: docker.io/merlintang/xgboost-dist-iris:1.1 | ||
| image: docker.io/xfate123/xgboost-dist-iris:1.1 | ||
| volumeMounts: | ||
| - name: task-pv-storage | ||
| mountPath: /tmp/xgboost_model | ||
|
|
@@ -42,7 +42,7 @@ spec: | |
| claimName: xgboostlocal | ||
| containers: | ||
| - name: xgboostjob | ||
| image: docker.io/merlintang/xgboost-dist-iris:1.1 | ||
| image: docker.io/xfate123/xgboost-dist-iris:1.1 | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have a Dockerfile for this image in this repo?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We only have the image in this repo |
||
| volumeMounts: | ||
| - name: task-pv-storage | ||
| mountPath: /tmp/xgboost_model | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dump model to local ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I modeled it on the dump-to-OSS function. I think the logic is to dump the model to a local file first, and then upload it from local storage to the cloud.