From b1db381dbadc1f50a3dbb23b4334c60da6d6937d Mon Sep 17 00:00:00 2001 From: jstrnbrg <34884458+jstrnbrg@users.noreply.github.com> Date: Tue, 17 Mar 2026 10:12:56 +0100 Subject: [PATCH] backend: fan out WebSocket events to multiple subscribers Replace the single backendEvents channel with an eventBroker that copies each event to all connected WebSocket clients. Previously, multiple clients would race to consume events from one channel, causing each to receive only a subset and rendering all but one client non-functional. This enables parallel development previews (e.g. browser + dev tool) without event loss. --- backend/bridgecommon/bridgecommon.go | 8 ++- backend/handlers/eventbroker.go | 82 ++++++++++++++++++++++++++++ backend/handlers/handlers.go | 34 +++++++----- 3 files changed, 109 insertions(+), 15 deletions(-) create mode 100644 backend/handlers/eventbroker.go diff --git a/backend/bridgecommon/bridgecommon.go b/backend/bridgecommon/bridgecommon.go index c99dbb9f87..cfe6cfe0eb 100644 --- a/backend/bridgecommon/bridgecommon.go +++ b/backend/bridgecommon/bridgecommon.go @@ -330,8 +330,9 @@ func Serve( globalHandlers = handlers.NewHandlers(globalBackend, handlers.NewConnectionData(-1, globalToken)) - events := globalHandlers.Events() + events, unsubscribe := globalHandlers.Events() go func() { + defer unsubscribe() for { select { case <-quitChan: @@ -340,7 +341,10 @@ func Serve( select { case <-quitChan: return - case event := <-events: + case event, ok := <-events: + if !ok { + return + } func() { mu.RLock() defer mu.RUnlock() diff --git a/backend/handlers/eventbroker.go b/backend/handlers/eventbroker.go new file mode 100644 index 0000000000..6d1352cf7b --- /dev/null +++ b/backend/handlers/eventbroker.go @@ -0,0 +1,82 @@ +// SPDX-License-Identifier: Apache-2.0 + +package handlers + +import "sync" + +const eventChannelBufferSize = 1000 + +// eventBroker fans out events to multiple subscribers. Each subscriber gets its own buffered +// channel. If a subscriber's channel is full, events are dropped for that subscriber to avoid +// blocking other subscribers or the publisher. +// +// Events published before any subscriber connects are buffered (up to eventChannelBufferSize) and +// replayed to the first subscriber that connects. This prevents a race where events (e.g. +// auth-result) are lost because the WebSocket client hasn't connected yet. +type eventBroker struct { + mu sync.Mutex + subscribers map[int]chan interface{} + nextID int + // earlyEvents buffers events published before the first subscriber connects. Once the first + // subscriber is added, this slice is drained into its channel and set to nil. + earlyEvents []interface{} +} + +func newEventBroker() *eventBroker { + return &eventBroker{ + subscribers: make(map[int]chan interface{}), + earlyEvents: make([]interface{}, 0), + } +} + +// subscribe registers a new subscriber and returns an ID (for unsubscribe) and a channel to +// receive events on. +func (b *eventBroker) subscribe() (int, <-chan interface{}) { + b.mu.Lock() + defer b.mu.Unlock() + id := b.nextID + b.nextID++ + ch := make(chan interface{}, eventChannelBufferSize) + // Replay early events to the first subscriber, then discard the buffer. + if b.earlyEvents != nil { + for _, event := range b.earlyEvents { + select { + case ch <- event: + default: + } + } + b.earlyEvents = nil + } + b.subscribers[id] = ch + return id, ch +} + +// unsubscribe removes a subscriber and closes its channel. +func (b *eventBroker) unsubscribe(id int) { + b.mu.Lock() + defer b.mu.Unlock() + if ch, ok := b.subscribers[id]; ok { + delete(b.subscribers, id) + close(ch) + } +} + +// publish sends an event to all subscribers. Non-blocking: if a subscriber's buffer is full, the +// event is dropped for that subscriber. +func (b *eventBroker) publish(event interface{}) { + b.mu.Lock() + defer b.mu.Unlock() + if len(b.subscribers) == 0 && b.earlyEvents != nil { + if len(b.earlyEvents) < eventChannelBufferSize { + b.earlyEvents = append(b.earlyEvents, event) + } + return + } + for _, ch := range b.subscribers { + select { + case ch <- event: + default: + // Drop event for slow subscriber to avoid blocking others. + } + } +} diff --git a/backend/handlers/handlers.go b/backend/handlers/handlers.go index 459ff9bcb2..a8e13d93f7 100644 --- a/backend/handlers/handlers.go +++ b/backend/handlers/handlers.go @@ -118,7 +118,7 @@ type Handlers struct { // backend to secure the API call. The data is fed into the static javascript app // that is served, so the client knows where and how to connect to. apiData *ConnectionData - backendEvents chan interface{} + eventBroker *eventBroker websocketUpgrader websocket.Upgrader log *logrus.Entry } @@ -152,10 +152,10 @@ func NewHandlers( log := logging.Get().WithGroup("handlers") router := mux.NewRouter() handlers := &Handlers{ - Router: router, - backend: backend, - apiData: connData, - backendEvents: make(chan interface{}, 1000), + Router: router, + backend: backend, + apiData: connData, + eventBroker: newEventBroker(), websocketUpgrader: websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, @@ -336,21 +336,24 @@ func NewHandlers( // The backend relays events in two ways: // a) old school through the channel returned by Start() // b) new school via observable. - // Merge both. + // Merge both and fan out to all subscribers via the event broker. events := backend.Start() go func() { - for { - handlers.backendEvents <- <-events + for event := range events { + handlers.eventBroker.publish(event) } }() - backend.Observe(func(event observable.Event) { handlers.backendEvents <- event }) + backend.Observe(func(event observable.Event) { handlers.eventBroker.publish(event) }) return handlers } -// Events returns the push notifications channel. -func (handlers *Handlers) Events() <-chan interface{} { - return handlers.backendEvents +// Events returns a push notifications channel and a cleanup function. Each call creates a new +// subscription, so every caller receives all events independently. The caller must call the +// returned function to unsubscribe when done. +func (handlers *Handlers) Events() (<-chan interface{}, func()) { + id, ch := handlers.eventBroker.subscribe() + return ch, func() { handlers.eventBroker.unsubscribe(id) } } func writeJSON(w io.Writer, value interface{}) { @@ -1046,8 +1049,10 @@ func (handlers *Handlers) eventsHandler(w http.ResponseWriter, r *http.Request) panic(err) } + events, unsubscribe := handlers.Events() sendChan, quitChan := runWebsocket(conn, handlers.apiData, handlers.log) go func() { + defer unsubscribe() for { select { case <-quitChan: @@ -1056,7 +1061,10 @@ func (handlers *Handlers) eventsHandler(w http.ResponseWriter, r *http.Request) select { case <-quitChan: return - case event := <-handlers.backendEvents: + case event, ok := <-events: + if !ok { + return + } sendChan <- jsonp.MustMarshal(event) } }