Shutdown refactor #2368

Open
yuandrew wants to merge 18 commits into main from shutdown-refactor

Conversation

@yuandrew yuandrew commented May 26, 2026

Non-fork version of #2261, so that cloud tests can run.

What was changed

Refactored worker shutdown to use a two-stage approach: pollers shut down first, then the task dispatcher drains any remaining tasks before exiting. This ensures tasks polled during shutdown are processed rather than silently dropped.

Key changes:

  • Added pollerWG to baseWorker to track poller goroutines separately from the global stopWG
  • A closer goroutine waits for all pollers to finish, then closes taskQueueCh
  • runTaskDispatcher now ranges over taskQueueCh
  • pollTask always sends to taskQueueCh
  • Removed the 5s timeout hack in doPoll that was added in #2253 (Fix test flakes); it is no longer needed
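
The key changes above can be sketched roughly as follows. This is a simplified model, not the SDK's code: only the names pollerWG, stopWG, taskQueueCh and stopCh come from this PR; the task and worker types, the poll callback, and the processed slice are hypothetical stand-ins for illustration.

```go
package main

import "sync"

// task is a hypothetical stand-in for a polled workflow or activity task.
type task struct{ id int }

// worker is a simplified model of baseWorker. Only pollerWG, stopWG,
// taskQueueCh and stopCh are names taken from the PR description.
type worker struct {
	pollerWG    sync.WaitGroup // poller goroutines only
	stopWG      sync.WaitGroup // everything stop() must wait for
	taskQueueCh chan task
	stopCh      chan struct{}
	processed   []int // written only by the dispatcher goroutine
}

func newWorker() *worker {
	return &worker{taskQueueCh: make(chan task), stopCh: make(chan struct{})}
}

// start launches n pollers, the closer goroutine, and the dispatcher.
// poll is a hypothetical blocking call that returns the next task.
func (w *worker) start(n int, poll func(poller int) task) {
	for i := 0; i < n; i++ {
		w.pollerWG.Add(1)
		go func(i int) {
			defer w.pollerWG.Done()
			for {
				select {
				case <-w.stopCh:
					return // stage 1: stop polling once shutdown has begun
				default:
				}
				// Always hand the task over, even if shutdown began mid-poll.
				w.taskQueueCh <- poll(i)
			}
		}(i)
	}

	// Closer: once every poller has exited, no further sends can happen.
	go func() {
		w.pollerWG.Wait()
		close(w.taskQueueCh)
	}()

	// Dispatcher (stage 2): drains until the channel is closed.
	w.stopWG.Add(1)
	go func() {
		defer w.stopWG.Done()
		for t := range w.taskQueueCh {
			w.processed = append(w.processed, t.id)
		}
	}()
}

// stop begins shutdown and returns the IDs of every task that was processed.
func (w *worker) stop() []int {
	close(w.stopCh)
	w.stopWG.Wait()
	return w.processed
}
```

In this model, a poll that returns after stop() has closed stopCh still reaches the dispatcher, because the channel is closed only after the last poller exits.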

Why?

PR #2199 changed shutdown to let the server complete in-flight polls instead of cancelling them. This exposed a pre-existing race: when a poller received a task during shutdown, the Go SDK would silently drop it. The dispatcher had the same issue: it could exit on stopCh before reading pending tasks from the channel.
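
To make the race concrete, here is a minimal sketch of the two hand-off styles. It assumes the old hand-off was a select over the task channel and stopCh; the function names sendOrDrop and sendAlways are hypothetical and are not taken from the SDK.

```go
package main

// sendOrDrop models the assumed pre-refactor hand-off: the poller races the
// send against stopCh. If the dispatcher is not ready and stopCh is closed,
// the task is dropped and false is returned.
func sendOrDrop(taskQueueCh chan<- int, stopCh <-chan struct{}, t int) bool {
	select {
	case taskQueueCh <- t:
		return true
	case <-stopCh:
		return false
	}
}

// sendAlways models the drain-mode hand-off: the poller blocks until the
// dispatcher takes the task, regardless of stopCh.
func sendAlways(taskQueueCh chan<- int, t int) {
	taskQueueCh <- t
}
```

The blocking send is only safe because the dispatcher keeps receiving until the channel is closed; without that guarantee it could block forever.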

This aligns the Go SDK's shutdown with how Core SDK handles it:

  1. Set a flag so pollers stop polling after their current attempt
  2. Close channels from pollers → task processing
  3. Wait for all in-flight tasks to complete & channels to be empty

Checklist

  1. Closes #1197 (Drain polled tasks on shutdown)

  2. How was this tested:

    • New unit test TestTaskNotDroppedDuringShutdown — verifies a task polled during shutdown is processed, not dropped
    • Existing TestDoPollGracefulShutdown — validates both graceful and legacy poll completion
  3. Any docs updates needed?
    No


Note

Medium Risk
Changes core worker stop/poll/dispatch paths and ShutdownWorker behavior for session queues; mitigated by extensive unit/integration tests and preserved legacy shutdown when the capability is disabled.

Overview
Refactors graceful worker shutdown so tasks polled during stop are drained and processed instead of dropped when the server supports worker_poll_complete_on_shutdown.

baseWorker now shuts down in two stages: pollers exit first (tracked by pollerWG), a goroutine then closes taskQueueCh, and the task dispatcher ranges over the channel in drain mode. Draining still honors dispatch rate limits via a separate taskLimiterContext, with a path to release unprocessed tasks if the limiter is canceled. Pollers block-send tasks to the dispatcher during drain (no drop on stopCh). Legacy shutdown (capability off or nil) keeps immediate poll cancellation and early dispatcher exit.
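
A rough sketch of rate-limited draining with a separate limiter context is shown below. Everything here is an assumption made for illustration: intervalLimiter is a hypothetical stand-in for the real dispatch limiter, and drain and drainDemo are not SDK functions. The point is only that a cancelled limiter context releases the remaining tasks instead of blocking the drain.

```go
package main

import (
	"context"
	"time"
)

// intervalLimiter is a hypothetical, minimal rate limiter: each Wait takes
// one interval unless ctx is cancelled first.
type intervalLimiter struct{ interval time.Duration }

func (l intervalLimiter) Wait(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err // already cancelled: do not wait at all
	}
	timer := time.NewTimer(l.interval)
	defer timer.Stop()
	select {
	case <-timer.C:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// drain ranges over tasks until the channel is closed. Tasks that clear the
// limiter are processed; tasks seen after limiterCtx is cancelled are released.
func drain(limiterCtx context.Context, l intervalLimiter, tasks <-chan int) (processed, released []int) {
	for t := range tasks {
		if err := l.Wait(limiterCtx); err != nil {
			released = append(released, t)
			continue
		}
		processed = append(processed, t)
	}
	return processed, released
}

// drainDemo feeds three tasks through drain and reports how many were
// processed and how many were released.
func drainDemo(cancelLimiter bool) (processed, released int) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelLimiter {
		cancel()
	}
	tasks := make(chan int, 3)
	for i := 1; i <= 3; i++ {
		tasks <- i
	}
	close(tasks)
	p, r := drain(ctx, intervalLimiter{interval: time.Millisecond}, tasks)
	return len(p), len(r)
}
```

Keeping the limiter context separate from the worker's stop signal lets the dispatcher keep pacing tasks after stop has begun, while still giving shutdown a way to cut the drain short.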

Pollers (doPoll, ProcessTask on workflow/activity/nexus) use shouldDrainOnShutdown() to finish in-flight polls without the old 5s wait hack and to keep processing tasks after stop when draining.

AggregatedWorker sends ShutdownWorker for session creation/activity queues, sets noRepoll on session workers, and removes forcing session workers onto legacy-only shutdown.

Tests/CI: broad unit coverage (drain vs legacy, rate limits, stop timeout); integration tests for active timer/activity shutdown and session poll-complete; cloud CI runs TestShutdownDuringActiveTimerActivityWorkflows; local dev server enables frontend.enableCancelWorkerPollsOnShutdown.

Reviewed by Cursor Bugbot for commit 428608a. Bugbot is set up for automated code reviews on this repo.

@yuandrew yuandrew requested a review from a team as a code owner May 26, 2026 20:32

aw.sendShutdownWorkerRPCForTaskQueue(
	grpcCtx,
	aw.sessionWorker.creationWorker.executionParameters.TaskQueue,

nit: Consider having the session worker report its task queues via a func so aggregate worker doesn't have to know what the structure of the session worker is.
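
One possible shape for this suggestion is sketched below. The types are simplified stand-ins; only creationWorker, executionParameters and TaskQueue appear in the diff. The activityWorker field, the taskQueues method and shutdownSessionQueues are hypothetical names chosen for illustration.

```go
package main

// Simplified, hypothetical stand-ins for the SDK's internal types.
type executionParameters struct{ TaskQueue string }

type queueWorker struct{ executionParameters executionParameters }

type sessionWorker struct {
	creationWorker *queueWorker
	activityWorker *queueWorker // assumed field name, not shown in the diff
}

// taskQueues is a hypothetical accessor that hides the session worker's
// internal layout from its callers.
func (sw *sessionWorker) taskQueues() []string {
	var queues []string
	for _, w := range []*queueWorker{sw.creationWorker, sw.activityWorker} {
		if w != nil {
			queues = append(queues, w.executionParameters.TaskQueue)
		}
	}
	return queues
}

// shutdownSessionQueues shows the caller's side: sendShutdown stands in for
// the per-task-queue ShutdownWorker RPC helper.
func shutdownSessionQueues(sw *sessionWorker, sendShutdown func(taskQueue string)) {
	for _, q := range sw.taskQueues() {
		sendShutdown(q)
	}
}
```

With an accessor like this, the aggregate worker would loop over whatever queues the session worker reports rather than reaching into its fields.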

	aw.nexusWorker.worker.noRepoll.Store(true)
}
if !util.IsInterfaceNil(aw.sessionWorker) {
	aw.sessionWorker.creationWorker.worker.noRepoll.Store(true)

nit: Add a func to session worker to stop polling so that aggregate worker doesn't have to know about the details of the session worker.
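
A minimal sketch of what such a method might look like follows. The noRepoll flag and the creationWorker.worker path mirror the diff; the type names, the activityWorker field and the stopPolling method are hypothetical.

```go
package main

import "sync/atomic"

// baseWorker carries the flag that tells pollers not to start another poll.
type baseWorker struct{ noRepoll atomic.Bool }

// wrappedWorker mirrors the creationWorker.worker nesting seen in the diff.
type wrappedWorker struct{ worker *baseWorker }

type sessionWorker struct {
	creationWorker *wrappedWorker
	activityWorker *wrappedWorker // assumed field name, not shown in the diff
}

// stopPolling is a hypothetical method that sets noRepoll on every inner
// worker, so callers need not know how the session worker is composed.
func (sw *sessionWorker) stopPolling() {
	for _, w := range []*wrappedWorker{sw.creationWorker, sw.activityWorker} {
		if w != nil && w.worker != nil {
			w.worker.noRepoll.Store(true)
		}
	}
}
```

The aggregate worker's call site would then reduce to a single sw.stopPolling() inside the existing nil check.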

if aw.client.eagerDispatcher != nil {
	aw.client.eagerDispatcher.deregisterWorker(aw.workflowWorker)
}
aw.workflowWorker.Stop()

With more work happening in the stops for each worker, consider parallelizing these calls on each of the workers.
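
As a sketch of the parallel variant, assuming each worker's Stop is independent of the others: the stopper interface, recordingWorker and stopAll below are hypothetical names, not SDK APIs.

```go
package main

import "sync"

// stopper is a hypothetical interface covering anything with a Stop method.
type stopper interface{ Stop() }

// recordingWorker is a toy worker used to show that every Stop call ran.
type recordingWorker struct{ stopped bool }

func (w *recordingWorker) Stop() { w.stopped = true }

// stopAll stops every worker concurrently and returns once all have finished.
// It skips nil interface values only; a typed nil pointer would still need a
// check such as the util.IsInterfaceNil call used in the PR.
func stopAll(workers ...stopper) {
	var wg sync.WaitGroup
	for _, w := range workers {
		if w == nil {
			continue
		}
		wg.Add(1)
		go func(w stopper) {
			defer wg.Done()
			w.Stop()
		}(w)
	}
	wg.Wait()
}
```

Any step that must precede a particular Stop, such as deregistering the workflow worker from the eager dispatcher, would still have to run before that worker is handed to stopAll.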


service.EXPECT().ShutdownWorker(gomock.Any(), gomock.Any(), gomock.Any()).
-	Return(&workflowservice.ShutdownWorkerResponse{}, nil).Times(1)
+	Return(&workflowservice.ShutdownWorkerResponse{}, nil).Times(3)

This and the below Times(3) feel like magic constants. Maybe move them to a constant and explain why this value was chosen or (if we can) calculate what it should be.
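
One way to act on this is to derive the count from named parts, as sketched below. The breakdown is an assumption, not something the PR states: it supposes one ShutdownWorker call for the main task queue plus one each for the session creation and session activity queues. All names are hypothetical.

```go
package main

const (
	// mainTaskQueueShutdowns is the single ShutdownWorker call for the
	// worker's own task queue.
	mainTaskQueueShutdowns = 1
	// sessionTaskQueueShutdowns assumes one call each for the session
	// creation queue and the session activity queue.
	sessionTaskQueueShutdowns = 2
)

// expectedShutdownWorkerCalls returns how many ShutdownWorker RPCs a test
// should expect, depending on whether session workers are enabled.
func expectedShutdownWorkerCalls(sessionsEnabled bool) int {
	n := mainTaskQueueShutdowns
	if sessionsEnabled {
		n += sessionTaskQueueShutdowns
	}
	return n
}
```

The mock expectation could then read Times(expectedShutdownWorkerCalls(true)), which documents where the number comes from if the assumed breakdown is right.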

Development

Successfully merging this pull request may close these issues.

Drain polled tasks on shutdown

2 participants