Conversation

nkemnitz commented Feb 8, 2026

No description provided.

nkemnitz requested a review from ranlu on February 8, 2026 11:03
nkemnitz (Author) commented Feb 8, 2026

The diff in pipeline_commands looks a bit confusing due to the try-finally indentation.
The PR adds a check for whether we are running a batch (len(json_obj) > 1) and sets the new batch_keep_cluster variable based on is_last_job of the inf_run. If it is set, we skip the entire ramp-down plus the 60s sleep. And if the cluster is already at or above our target size during ramp-up, we can skip the 60s sleep there as well.
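In sketch form (hypothetical code, not the PR's actual implementation; set_variable stands in for the Airflow variable helper used in this repo):

```python
# Minimal sketch of the batch check described above.
def plan_batch(json_obj, set_variable):
    for i, job in enumerate(json_obj):
        is_last_job = i == len(json_obj) - 1
        if len(json_obj) > 1:
            # Keep the cluster alive between batch jobs; only allow the
            # ramp down (and its 60s sleep) after the final job.
            set_variable("batch_keep_cluster",
                         "false" if is_last_job else "true")
```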

The other commit is a fix to allow loading images from Google Artifact Registry.

ranlu commented Feb 8, 2026

@gemini-cli /review

github-actions bot commented Feb 8, 2026

🤖 Hi @ranlu, I've received your request, and I'm working on it now! You can track my progress in the logs for more details.

github-actions bot commented Feb 8, 2026

🤖 I'm sorry @ranlu, but I was unable to process your request. Please see the logs for more details.

ranlu commented Feb 8, 2026

@gemini-cli /review

github-actions bot commented Feb 8, 2026

🤖 Hi @ranlu, I've received your request, and I'm working on it now! You can track my progress in the logs for more details.

github-actions bot left a comment

## 📋 Review Summary

This pull request aims to optimize batch processing by skipping cluster ramp-up/down between jobs. The changes in cloud/google/common.py and dags/google_api_helper.py seem correct and beneficial.

However, the refactoring in slackbot/pipeline_commands.py introduces a critical bug that will cause batch jobs to fail. I've left a comment with a detailed explanation and a suggested fix.

### 🔍 General Feedback

  • The optimization to avoid sleeping during cluster ramp-up if the target size is already met is a good addition.
  • Please address the critical issue in slackbot/pipeline_commands.py to ensure the batch processing works as expected.

Comment on lines +224 to +245
```python
if i > 0:
    if 'NAME' in param:
        del param['NAME']
    for k in p:
        param[k] = p[k]
    supply_default_param(param)
    replyto(msg, "*Sanity check: batch job {} out of {}*".format(i+1, len(json_obj)))
    state = "unknown"
    current_task = guess_run_type(param)
    if current_task == "seg_run":
        set_variable('param', param, serialize_json=True)
        state = run_dag("sanity_check", wait_for_completion=True).state
    elif current_task == "inf_run":
        set_variable('inference_param', param, serialize_json=True)
        state = run_dag("chunkflow_generator", wait_for_completion=True).state
    elif current_task == "syn_run":
        set_variable("synaptor_param.json", param, serialize_json=True)
        state = run_dag("synaptor_sanity_check", wait_for_completion=True).state

    if state != "success":
        replyto(msg, "*Sanity check failed, abort!*")
        break
```

🔴 The refactoring in this block has introduced a bug. The call to `guess_run_type(param)` and the subsequent sanity check are now inside the `if i > 0:` block. This means for the first job of the batch (`i=0`), `current_task` will not be defined, leading to an `UnboundLocalError` later, and the sanity check will be skipped.

The logic for guessing the task type and running the sanity check should be executed for all batch jobs.
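For reference, a minimal standalone reproduction of that failure mode (hypothetical code, not from this PR):

```python
def demo():
    for i in range(2):
        if i > 0:
            current_task = "seg_run"
        # current_task is a local that is only assigned when i > 0, so
        # reading it on the first iteration raises UnboundLocalError.
        print(current_task)

demo()
```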

Suggested change

```diff
 if i > 0:
     if 'NAME' in param:
         del param['NAME']
     for k in p:
         param[k] = p[k]
     supply_default_param(param)
-    replyto(msg, "*Sanity check: batch job {} out of {}*".format(i+1, len(json_obj)))
-    state = "unknown"
-    current_task = guess_run_type(param)
-    if current_task == "seg_run":
-        set_variable('param', param, serialize_json=True)
-        state = run_dag("sanity_check", wait_for_completion=True).state
-    elif current_task == "inf_run":
-        set_variable('inference_param', param, serialize_json=True)
-        state = run_dag("chunkflow_generator", wait_for_completion=True).state
-    elif current_task == "syn_run":
-        set_variable("synaptor_param.json", param, serialize_json=True)
-        state = run_dag("synaptor_sanity_check", wait_for_completion=True).state
-
-    if state != "success":
-        replyto(msg, "*Sanity check failed, abort!*")
-        break
+replyto(msg, "*Sanity check: batch job {} out of {}*".format(i+1, len(json_obj)))
+state = "unknown"
+current_task = guess_run_type(param)
+if current_task == "seg_run":
+    set_variable('param', param, serialize_json=True)
+    state = run_dag("sanity_check", wait_for_completion=True).state
+elif current_task == "inf_run":
+    set_variable('inference_param', param, serialize_json=True)
+    state = run_dag("chunkflow_generator", wait_for_completion=True).state
+elif current_task == "syn_run":
+    set_variable("synaptor_param.json", param, serialize_json=True)
+    state = run_dag("synaptor_sanity_check", wait_for_completion=True).state
+if state != "success":
+    replyto(msg, "*Sanity check failed, abort!*")
+    break
```

nkemnitz (Author) replied

No. current_task still gets initialized on line 194, and guess_run_type(param) was also called inside the if i > 0 block on the main branch (line 230)...

set_variable("batch_keep_cluster", "true")
else:
set_variable("batch_keep_cluster", "false")
state = run_dag("chunkflow_worker", wait_for_completion=True).state

🟢 Typo: "Bach" should be "Batch".

```suggestion
replyto(msg, f"*Batch job failed, abort!* ({state})")
```

ranlu commented Feb 8, 2026

@nkemnitz, I feel it is probably more natural to implement this as an extension of the easyseg command: for example, adding an easyscan command to allow scanning through a predefined set of parameters. I think the main use case for this is experimenting on small volumes; easyseg was introduced for that, so the tracers can verify their work.
It limits the number of workers to 1, runs all task types on it, and automatically shuts down the instance when it is idle. This will require some design of the interface, though; an interface to update parameters and run the pipeline is too generic, and I want something more specific. One option is to read a Python script that returns the name of the parameter to be scanned and a function to generate the values.
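For example, such a scan script might look like this (the file name and contract are purely illustrative, not an agreed design):

```python
# scan_params.py: hypothetical input script for the proposed easyscan
# command, following the idea above. easyscan would import this module,
# read the parameter name, and call the generator to get the values.

def scan_parameter():
    return "MIP"  # name of the parameter to scan (illustrative)

def scan_values():
    return [0, 1, 2]  # values to try for that parameter (illustrative)
```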

ranlu commented Feb 8, 2026

@nkemnitz, can you make a separate PR for the Google Artifact Registry fix? Thanks. In the future, I think you could also split the try-finally change into a separate commit, making the commits easier to review.

nkemnitz force-pushed the nkem/batch-keep-cluster branch from 31e48af to 5901a1f on February 12, 2026 09:12