Skip to content

feat(message): add Stoppable interface for graceful subscriber shutdown#678

Draft
SinuheTellez wants to merge 1 commit intoThreeDotsLabs:masterfrom
SinuheTellez:add-stoppable-interface
Draft

feat(message): add Stoppable interface for graceful subscriber shutdown#678
SinuheTellez wants to merge 1 commit intoThreeDotsLabs:masterfrom
SinuheTellez:add-stoppable-interface

Conversation

@SinuheTellez
Copy link
Copy Markdown

@SinuheTellez SinuheTellez commented May 4, 2026

Background/Description

During graceful shutdown the router calls subscriber.Close() immediately, which cancels in-flight message processing — messages that were mid-handler lose their chance to be acked or nacked. When the subscriber re-connects, those messages get redelivered and double-handled.

This adds a Stoppable interface (Stop() error) that subscribers can optionally implement. Stop() tells the subscriber to stop delivering new messages while keeping the connection alive so in-flight messages can still be acked or nacked.

The router's handleClose now checks if the subscriber implements Stoppable. If it does: call Stop(), wait for running handlers to finish (runningHandlersWg.Wait()), then call Close(). Non-stoppable subscribers behave exactly as before.

The originalSubscriber field was added to the handler struct because decorateHandlerSubscriber wraps the subscriber with MessageTransformSubscriberDecorator, which doesn't implement Stoppable. The type assertion checks the unwrapped subscriber.

Relates to #446

Type of Change

  • bug fix (non breaking)
  • New feature (non breaking)
  • Breaking change (fix or feature that may break current functionality)
  • Requires documentation change

Relates to ThreeDotsLabs#446

During graceful shutdown, the router now checks if a subscriber implements
the Stoppable interface. If so, it calls Stop() first to halt new message
delivery, waits for in-flight handlers to finish, then calls Close(). This
prevents double-handling caused by in-flight messages losing their chance
to be acked or nacked when the subscriber is closed immediately.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant