1 change: 1 addition & 0 deletions cloud/google/common.py
@@ -14,6 +14,7 @@
 mkdir -p /etc/docker
 systemctl restart docker
 gcloud auth --quiet configure-docker
+gcloud auth --quiet configure-docker us-docker.pkg.dev,us-east1-docker.pkg.dev,us-central1-docker.pkg.dev,us-west1-docker.pkg.dev
 '''

 INSTALL_NVIDIA_DOCKER_CMD = '''
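The extra `configure-docker` invocation registers the regional Artifact Registry domains with Docker's credential helper. As a rough sanity check (a sketch only, assuming the command ran as the current user and that the default `~/.docker/config.json` is the active Docker config), each domain from the startup script should now map to the `gcloud` helper:

```python
import json
import os

# Hedged verification sketch: configure-docker writes "credHelpers" entries
# mapping each registry domain to the "gcloud" credential helper.
with open(os.path.expanduser("~/.docker/config.json")) as f:
    cred_helpers = json.load(f).get("credHelpers", {})

for registry in ("us-docker.pkg.dev", "us-east1-docker.pkg.dev",
                 "us-central1-docker.pkg.dev", "us-west1-docker.pkg.dev"):
    assert cred_helpers.get(registry) == "gcloud", f"{registry} not configured"
```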
8 changes: 7 additions & 1 deletion dags/google_api_helper.py
@@ -223,21 +223,27 @@ def ramp_up_cluster(key, initial_size, total_size):
     run_metadata = Variable.get("run_metadata", deserialize_json=True, default_var={})
     if not run_metadata.get("manage_clusters", True):
         return
+    already_at_size = False
     try:
         target_sizes = Variable.get("cluster_target_size", deserialize_json=True)
+        already_at_size = target_sizes.get(key, 0) >= total_size
         target_sizes[key] = total_size
         Variable.set("cluster_target_size", target_sizes, serialize_json=True)
         slack_message(":information_source: ramping up cluster {} to {} instances, starting from {} instances".format(key, total_size, min(initial_size, total_size)))
         increase_instance_group_size(key, min(initial_size, total_size))
     except:
         increase_instance_group_size(key, total_size)
-    sleep(60)
+    if not already_at_size:
+        sleep(60)
     Variable.set("cluster_target_size", target_sizes, serialize_json=True)
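The new `already_at_size` flag skips the fixed 60-second wait when the recorded target size already covers the request. A stubbed sketch of the guard with hypothetical sizes (the GCE and Airflow calls are stand-ins, not the real helpers):

```python
from time import sleep

# When the recorded target already covers the request, the instance group has
# nothing to grow into, so the fixed settling wait is skipped.
def ramp_up(target_sizes, key, total_size):
    already_at_size = target_sizes.get(key, 0) >= total_size
    target_sizes[key] = total_size
    # ... increase_instance_group_size(...) would be called here ...
    if not already_at_size:
        sleep(60)  # give fresh instances time to boot before work is queued

ramp_up({"gpu": 16}, "gpu", total_size=16)  # returns immediately
ramp_up({"gpu": 0}, "gpu", total_size=16)   # waits the full 60 s
```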

 def ramp_down_cluster(key, total_size):
     run_metadata = Variable.get("run_metadata", deserialize_json=True, default_var={})
     if not run_metadata.get("manage_clusters", True):
         return
+    if Variable.get("batch_keep_cluster", default_var="false") == "true":
+        slack_message(f":recycle: Batch mode: keeping {key} cluster alive for next job")
+        return
     try:
         target_sizes = Variable.get("cluster_target_size", deserialize_json=True)
         target_sizes[key] = total_size
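`ramp_down_cluster` now consults the `batch_keep_cluster` Variable that `handle_batch` (below) sets before each worker run. A simplified sketch of the handshake, using a plain dict as a stand-in for Airflow's Variable store:

```python
# A dict stands in for Airflow Variables; the function body is trimmed to the
# new guard only.
variables = {"batch_keep_cluster": "false"}

def ramp_down_cluster(key):
    if variables.get("batch_keep_cluster", "false") == "true":
        print(f":recycle: keeping {key} cluster alive for next job")
        return
    print(f"ramping down {key} cluster")

variables["batch_keep_cluster"] = "true"   # set by handle_batch mid-batch
ramp_down_cluster("gpu")                   # -> kept alive
variables["batch_keep_cluster"] = "false"  # last job, or the finally-block reset
ramp_down_cluster("gpu")                   # -> ramped down
```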
91 changes: 50 additions & 41 deletions slackbot/pipeline_commands.py
@@ -207,53 +207,62 @@ def handle_batch(task, msg):
    replyto(msg, "Batch jobs will reuse the parameters from the first job unless new parameters are specified, *including those with default values*")

    default_param = json_obj[0]
    for i, p in enumerate(json_obj):
        if visible_messages(broker_url, "seuronbot_cmd") != 0:
            cmd = get_message(broker_url, "seuronbot_cmd")
            if cmd == "cancel":
                replyto(msg, "Cancel batch process")
                break

        if p.get("INHERIT_PARAMETERS", True):
            param = deepcopy(default_param)
        else:
            param = {}

        if i > 0:
            if 'NAME' in param:
                del param['NAME']
            for k in p:
                param[k] = p[k]
            supply_default_param(param)
            replyto(msg, "*Sanity check: batch job {} out of {}*".format(i+1, len(json_obj)))
    is_batch = len(json_obj) > 1
    try:
        for i, p in enumerate(json_obj):
            if visible_messages(broker_url, "seuronbot_cmd") != 0:
                cmd = get_message(broker_url, "seuronbot_cmd")
                if cmd == "cancel":
                    replyto(msg, "Cancel batch process")
                    break

            if p.get("INHERIT_PARAMETERS", True):
                param = deepcopy(default_param)
            else:
                param = {}

            if i > 0:
                if 'NAME' in param:
                    del param['NAME']
                for k in p:
                    param[k] = p[k]
                supply_default_param(param)
                replyto(msg, "*Sanity check: batch job {} out of {}*".format(i+1, len(json_obj)))
                state = "unknown"
                current_task = guess_run_type(param)
                if current_task == "seg_run":
                    set_variable('param', param, serialize_json=True)
                    state = run_dag("sanity_check", wait_for_completion=True).state
                elif current_task == "inf_run":
                    set_variable('inference_param', param, serialize_json=True)
                    state = run_dag("chunkflow_generator", wait_for_completion=True).state
                elif current_task == "syn_run":
                    set_variable("synaptor_param.json", param, serialize_json=True)
                    state = run_dag("synaptor_sanity_check", wait_for_completion=True).state

                if state != "success":
                    replyto(msg, "*Sanity check failed, abort!*")
                    break
Comment on lines +224 to +245
🔴 The refactoring in this block has introduced a bug. The call to `guess_run_type(param)` and the subsequent sanity check are now inside the `if i > 0:` block. This means for the first job of the batch (`i=0`), `current_task` will not be defined, leading to an `UnboundLocalError` later, and the sanity check will be skipped.

The logic for guessing the task type and running the sanity check should be executed for all batch jobs.

Suggested change
-            if i > 0:
-                if 'NAME' in param:
-                    del param['NAME']
-                for k in p:
-                    param[k] = p[k]
-                supply_default_param(param)
-                replyto(msg, "*Sanity check: batch job {} out of {}*".format(i+1, len(json_obj)))
-                state = "unknown"
-                current_task = guess_run_type(param)
-                if current_task == "seg_run":
-                    set_variable('param', param, serialize_json=True)
-                    state = run_dag("sanity_check", wait_for_completion=True).state
-                elif current_task == "inf_run":
-                    set_variable('inference_param', param, serialize_json=True)
-                    state = run_dag("chunkflow_generator", wait_for_completion=True).state
-                elif current_task == "syn_run":
-                    set_variable("synaptor_param.json", param, serialize_json=True)
-                    state = run_dag("synaptor_sanity_check", wait_for_completion=True).state
-                if state != "success":
-                    replyto(msg, "*Sanity check failed, abort!*")
-                    break
+            if i > 0:
+                if 'NAME' in param:
+                    del param['NAME']
+                for k in p:
+                    param[k] = p[k]
+            supply_default_param(param)
+            replyto(msg, "*Sanity check: batch job {} out of {}*".format(i+1, len(json_obj)))
+            state = "unknown"
+            current_task = guess_run_type(param)
+            if current_task == "seg_run":
+                set_variable('param', param, serialize_json=True)
+                state = run_dag("sanity_check", wait_for_completion=True).state
+            elif current_task == "inf_run":
+                set_variable('inference_param', param, serialize_json=True)
+                state = run_dag("chunkflow_generator", wait_for_completion=True).state
+            elif current_task == "syn_run":
+                set_variable("synaptor_param.json", param, serialize_json=True)
+                state = run_dag("synaptor_sanity_check", wait_for_completion=True).state
+            if state != "success":
+                replyto(msg, "*Sanity check failed, abort!*")
+                break

Author
No. current_task still gets initialized on line 194, and guess_run_type(param) was also called inside the if i > 0 block on the main branch (line 230)...
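For illustration, a minimal repro (hypothetical code, outside the real codebase) of the behavior the thread is debating: a name bound only under `if i > 0:` raises `UnboundLocalError` on the first iteration unless, as the author notes, it is re-bound unconditionally before use:

```python
def batch(jobs):
    for i, _ in enumerate(jobs):
        if i > 0:
            current_task = "from_sanity_check"  # bound only for later jobs
        current_task = "from_run_section"       # unconditional binding saves i == 0
        print(i, current_task)                  # drop the line above: UnboundLocalError at i == 0

batch(["job_a", "job_b"])
```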


            is_last_job = (i == len(json_obj) - 1)
            state = "unknown"
            current_task = guess_run_type(param)
            replyto(msg, "*Starting batch job {} out of {}*".format(i+1, len(json_obj)), broadcast=True)

            if current_task == "seg_run":
                set_variable('param', param, serialize_json=True)
                state = run_dag("sanity_check", wait_for_completion=True).state
                state = run_dag('segmentation', wait_for_completion=True).state
            elif current_task == "inf_run":
                set_variable('inference_param', param, serialize_json=True)
                state = run_dag("chunkflow_generator", wait_for_completion=True).state
                if is_batch and not is_last_job:
                    set_variable("batch_keep_cluster", "true")
                else:
                    set_variable("batch_keep_cluster", "false")
                state = run_dag("chunkflow_worker", wait_for_completion=True).state
Comment (on the "Bach job failed" line below):

🟢 Typo: "Bach" should be "Batch".

Suggested change
+                replyto(msg, f"*Batch job failed, abort!* ({state})")
            elif current_task == "syn_run":
                set_variable("synaptor_param.json", param, serialize_json=True)
                state = run_dag("synaptor_sanity_check", wait_for_completion=True).state
                state = run_dag("synaptor", wait_for_completion=True).state

            if state != "success":
                replyto(msg, "*Sanity check failed, abort!*")
                replyto(msg, f"*Bach job failed, abort!* ({state})")
                break

state = "unknown"
replyto(msg, "*Starting batch job {} out of {}*".format(i+1, len(json_obj)), broadcast=True)

if current_task == "seg_run":
state = run_dag('segmentation', wait_for_completion=True).state
elif current_task == "inf_run":
state = run_dag("chunkflow_worker", wait_for_completion=True).state
elif current_task == "syn_run":
state = run_dag("synaptor", wait_for_completion=True).state

if state != "success":
replyto(msg, f"*Bach job failed, abort!* ({state})")
break
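The `is_batch`/`is_last_job` pair in the new code above decides when the keep-alive flag is raised: only for a multi-job batch with more jobs queued behind the current one. A tiny sketch with a hypothetical three-job batch:

```python
# Hypothetical job list; prints the flag value each iteration would set.
json_obj = ["job_a", "job_b", "job_c"]
is_batch = len(json_obj) > 1
for i, _ in enumerate(json_obj):
    is_last_job = (i == len(json_obj) - 1)
    flag = "true" if is_batch and not is_last_job else "false"
    print(i, flag)  # 0 -> true, 1 -> true, 2 -> false
```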
    finally:
        set_variable("batch_keep_cluster", "false")

    replyto(msg, "*Batch process finished*")
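The `try`/`finally` wrapper guarantees the keep-alive flag is cleared even when a job fails and breaks out of the loop, or when an exception escapes entirely, so no cluster outlives its batch. A stubbed sketch of that guarantee:

```python
# The Airflow calls are stand-ins; only the control flow is illustrated.
def run_batch(jobs):
    try:
        for job in jobs:
            if job == "bad":
                raise RuntimeError("job failed")
    finally:
        print('set_variable("batch_keep_cluster", "false")  # always runs')

try:
    run_batch(["ok", "bad"])
except RuntimeError as err:
    print(err)  # the flag reset above happened before the error propagated
```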