diff --git a/backend/bridgecommon/bridgecommon.go b/backend/bridgecommon/bridgecommon.go index c99dbb9f87..cfe6cfe0eb 100644 --- a/backend/bridgecommon/bridgecommon.go +++ b/backend/bridgecommon/bridgecommon.go @@ -330,8 +330,9 @@ func Serve( globalHandlers = handlers.NewHandlers(globalBackend, handlers.NewConnectionData(-1, globalToken)) - events := globalHandlers.Events() + events, unsubscribe := globalHandlers.Events() go func() { + defer unsubscribe() for { select { case <-quitChan: @@ -340,7 +341,10 @@ func Serve( select { case <-quitChan: return - case event := <-events: + case event, ok := <-events: + if !ok { + return + } func() { mu.RLock() defer mu.RUnlock() diff --git a/backend/handlers/eventbroker.go b/backend/handlers/eventbroker.go new file mode 100644 index 0000000000..6d1352cf7b --- /dev/null +++ b/backend/handlers/eventbroker.go @@ -0,0 +1,82 @@ +// SPDX-License-Identifier: Apache-2.0 + +package handlers + +import "sync" + +const eventChannelBufferSize = 1000 + +// eventBroker fans out events to multiple subscribers. Each subscriber gets its own buffered +// channel. If a subscriber's channel is full, events are dropped for that subscriber to avoid +// blocking other subscribers or the publisher. +// +// Events published before any subscriber connects are buffered (up to eventChannelBufferSize) and +// replayed to the first subscriber that connects. This prevents a race where events (e.g. +// auth-result) are lost because the WebSocket client hasn't connected yet. +type eventBroker struct { + mu sync.Mutex + subscribers map[int]chan interface{} + nextID int + // earlyEvents buffers events published before the first subscriber connects. Once the first + // subscriber is added, this slice is drained into its channel and set to nil. + earlyEvents []interface{} +} + +func newEventBroker() *eventBroker { + return &eventBroker{ + subscribers: make(map[int]chan interface{}), + earlyEvents: make([]interface{}, 0), + } +} + +// subscribe registers a new subscriber and returns an ID (for unsubscribe) and a channel to +// receive events on. +func (b *eventBroker) subscribe() (int, <-chan interface{}) { + b.mu.Lock() + defer b.mu.Unlock() + id := b.nextID + b.nextID++ + ch := make(chan interface{}, eventChannelBufferSize) + // Replay early events to the first subscriber, then discard the buffer. + if b.earlyEvents != nil { + for _, event := range b.earlyEvents { + select { + case ch <- event: + default: + } + } + b.earlyEvents = nil + } + b.subscribers[id] = ch + return id, ch +} + +// unsubscribe removes a subscriber and closes its channel. +func (b *eventBroker) unsubscribe(id int) { + b.mu.Lock() + defer b.mu.Unlock() + if ch, ok := b.subscribers[id]; ok { + delete(b.subscribers, id) + close(ch) + } +} + +// publish sends an event to all subscribers. Non-blocking: if a subscriber's buffer is full, the +// event is dropped for that subscriber. +func (b *eventBroker) publish(event interface{}) { + b.mu.Lock() + defer b.mu.Unlock() + if len(b.subscribers) == 0 && b.earlyEvents != nil { + if len(b.earlyEvents) < eventChannelBufferSize { + b.earlyEvents = append(b.earlyEvents, event) + } + return + } + for _, ch := range b.subscribers { + select { + case ch <- event: + default: + // Drop event for slow subscriber to avoid blocking others. + } + } +} diff --git a/backend/handlers/handlers.go b/backend/handlers/handlers.go index 459ff9bcb2..a8e13d93f7 100644 --- a/backend/handlers/handlers.go +++ b/backend/handlers/handlers.go @@ -118,7 +118,7 @@ type Handlers struct { // backend to secure the API call. The data is fed into the static javascript app // that is served, so the client knows where and how to connect to. apiData *ConnectionData - backendEvents chan interface{} + eventBroker *eventBroker websocketUpgrader websocket.Upgrader log *logrus.Entry } @@ -152,10 +152,10 @@ func NewHandlers( log := logging.Get().WithGroup("handlers") router := mux.NewRouter() handlers := &Handlers{ - Router: router, - backend: backend, - apiData: connData, - backendEvents: make(chan interface{}, 1000), + Router: router, + backend: backend, + apiData: connData, + eventBroker: newEventBroker(), websocketUpgrader: websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, @@ -336,21 +336,24 @@ func NewHandlers( // The backend relays events in two ways: // a) old school through the channel returned by Start() // b) new school via observable. - // Merge both. + // Merge both and fan out to all subscribers via the event broker. events := backend.Start() go func() { - for { - handlers.backendEvents <- <-events + for event := range events { + handlers.eventBroker.publish(event) } }() - backend.Observe(func(event observable.Event) { handlers.backendEvents <- event }) + backend.Observe(func(event observable.Event) { handlers.eventBroker.publish(event) }) return handlers } -// Events returns the push notifications channel. -func (handlers *Handlers) Events() <-chan interface{} { - return handlers.backendEvents +// Events returns a push notifications channel and a cleanup function. Each call creates a new +// subscription, so every caller receives all events independently. The caller must call the +// returned function to unsubscribe when done. +func (handlers *Handlers) Events() (<-chan interface{}, func()) { + id, ch := handlers.eventBroker.subscribe() + return ch, func() { handlers.eventBroker.unsubscribe(id) } } func writeJSON(w io.Writer, value interface{}) { @@ -1046,8 +1049,10 @@ func (handlers *Handlers) eventsHandler(w http.ResponseWriter, r *http.Request) panic(err) } + events, unsubscribe := handlers.Events() sendChan, quitChan := runWebsocket(conn, handlers.apiData, handlers.log) go func() { + defer unsubscribe() for { select { case <-quitChan: @@ -1056,7 +1061,10 @@ func (handlers *Handlers) eventsHandler(w http.ResponseWriter, r *http.Request) select { case <-quitChan: return - case event := <-handlers.backendEvents: + case event, ok := <-events: + if !ok { + return + } sendChan <- jsonp.MustMarshal(event) } }