Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions backend/bridgecommon/bridgecommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,9 @@ func Serve(
globalHandlers = handlers.NewHandlers(globalBackend,
handlers.NewConnectionData(-1, globalToken))

events := globalHandlers.Events()
events, unsubscribe := globalHandlers.Events()
go func() {
defer unsubscribe()
for {
select {
case <-quitChan:
Expand All @@ -340,7 +341,10 @@ func Serve(
select {
case <-quitChan:
return
case event := <-events:
case event, ok := <-events:
if !ok {
return
}
func() {
mu.RLock()
defer mu.RUnlock()
Expand Down
82 changes: 82 additions & 0 deletions backend/handlers/eventbroker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// SPDX-License-Identifier: Apache-2.0

package handlers

import "sync"

const eventChannelBufferSize = 1000

// eventBroker fans out events to multiple subscribers. Each subscriber gets its own buffered
// channel. If a subscriber's channel is full, events are dropped for that subscriber to avoid
// blocking other subscribers or the publisher.
Comment on lines +10 to +11
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't check in detail, but dropping events does not sound good. 1000 is an arbitrary cut-off, can't guarantee events won't be dropped in the released app.

//
// Events published before any subscriber connects are buffered (up to eventChannelBufferSize) and
// replayed to the first subscriber that connects. This prevents a race where events (e.g.
// auth-result) are lost because the WebSocket client hasn't connected yet.
type eventBroker struct {
mu sync.Mutex
subscribers map[int]chan interface{}
nextID int
// earlyEvents buffers events published before the first subscriber connects. Once the first
// subscriber is added, this slice is drained into its channel and set to nil.
earlyEvents []interface{}
}

func newEventBroker() *eventBroker {
return &eventBroker{
subscribers: make(map[int]chan interface{}),
earlyEvents: make([]interface{}, 0),
}
}

// subscribe registers a new subscriber and returns an ID (for unsubscribe) and a channel to
// receive events on.
func (b *eventBroker) subscribe() (int, <-chan interface{}) {
b.mu.Lock()
defer b.mu.Unlock()
id := b.nextID
b.nextID++
ch := make(chan interface{}, eventChannelBufferSize)
// Replay early events to the first subscriber, then discard the buffer.
if b.earlyEvents != nil {
for _, event := range b.earlyEvents {
select {
case ch <- event:
default:
}
}
b.earlyEvents = nil
}
b.subscribers[id] = ch
return id, ch
}

// unsubscribe removes a subscriber and closes its channel.
func (b *eventBroker) unsubscribe(id int) {
b.mu.Lock()
defer b.mu.Unlock()
if ch, ok := b.subscribers[id]; ok {
delete(b.subscribers, id)
close(ch)
}
}

// publish sends an event to all subscribers. Non-blocking: if a subscriber's buffer is full, the
// event is dropped for that subscriber.
func (b *eventBroker) publish(event interface{}) {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.subscribers) == 0 && b.earlyEvents != nil {
if len(b.earlyEvents) < eventChannelBufferSize {
b.earlyEvents = append(b.earlyEvents, event)
}
return
}
for _, ch := range b.subscribers {
select {
case ch <- event:
default:
// Drop event for slow subscriber to avoid blocking others.
}
}
}
34 changes: 21 additions & 13 deletions backend/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ type Handlers struct {
// backend to secure the API call. The data is fed into the static javascript app
// that is served, so the client knows where and how to connect to.
apiData *ConnectionData
backendEvents chan interface{}
eventBroker *eventBroker
websocketUpgrader websocket.Upgrader
log *logrus.Entry
}
Expand Down Expand Up @@ -152,10 +152,10 @@ func NewHandlers(
log := logging.Get().WithGroup("handlers")
router := mux.NewRouter()
handlers := &Handlers{
Router: router,
backend: backend,
apiData: connData,
backendEvents: make(chan interface{}, 1000),
Router: router,
backend: backend,
apiData: connData,
eventBroker: newEventBroker(),
websocketUpgrader: websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
Expand Down Expand Up @@ -336,21 +336,24 @@ func NewHandlers(
// The backend relays events in two ways:
// a) old school through the channel returned by Start()
// b) new school via observable.
// Merge both.
// Merge both and fan out to all subscribers via the event broker.
events := backend.Start()
go func() {
for {
handlers.backendEvents <- <-events
for event := range events {
handlers.eventBroker.publish(event)
}
}()
backend.Observe(func(event observable.Event) { handlers.backendEvents <- event })
backend.Observe(func(event observable.Event) { handlers.eventBroker.publish(event) })

return handlers
}

// Events returns the push notifications channel.
func (handlers *Handlers) Events() <-chan interface{} {
return handlers.backendEvents
// Events returns a push notifications channel and a cleanup function. Each call creates a new
// subscription, so every caller receives all events independently. The caller must call the
// returned function to unsubscribe when done.
func (handlers *Handlers) Events() (<-chan interface{}, func()) {
id, ch := handlers.eventBroker.subscribe()
return ch, func() { handlers.eventBroker.unsubscribe(id) }
}

func writeJSON(w io.Writer, value interface{}) {
Expand Down Expand Up @@ -1046,8 +1049,10 @@ func (handlers *Handlers) eventsHandler(w http.ResponseWriter, r *http.Request)
panic(err)
}

events, unsubscribe := handlers.Events()
sendChan, quitChan := runWebsocket(conn, handlers.apiData, handlers.log)
go func() {
defer unsubscribe()
for {
select {
case <-quitChan:
Expand All @@ -1056,7 +1061,10 @@ func (handlers *Handlers) eventsHandler(w http.ResponseWriter, r *http.Request)
select {
case <-quitChan:
return
case event := <-handlers.backendEvents:
case event, ok := <-events:
if !ok {
return
}
sendChan <- jsonp.MustMarshal(event)
}
}
Expand Down
Loading