From 73b0ac7d3a3f3edeb1a12b9d1468c7a20e129d09 Mon Sep 17 00:00:00 2001 From: hi117 Date: Mon, 21 Jul 2025 14:11:22 -0400 Subject: [PATCH 1/4] Remove unused type --- pgxlisten_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pgxlisten_test.go b/pgxlisten_test.go index 78bd8e3..62f7631 100644 --- a/pgxlisten_test.go +++ b/pgxlisten_test.go @@ -178,10 +178,6 @@ create table pgxlisten_test (id int primary key generated by default as identity // No way to know when Listener is ready so wait a little. time.Sleep(2 * time.Second) - type notificationTest struct { - payload string - } - notificationMsgs := []string{"d", "e"} // Send all notifications. From 01424de5a7bebdbd316dd7edba36eaf2a9ed50ac Mon Sep 17 00:00:00 2001 From: hi117 Date: Mon, 21 Jul 2025 14:12:14 -0400 Subject: [PATCH 2/4] Add a unhandle function --- pgxlisten.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pgxlisten.go b/pgxlisten.go index 5b8d1f0..6f66e6d 100644 --- a/pgxlisten.go +++ b/pgxlisten.go @@ -39,6 +39,19 @@ func (l *Listener) Handle(channel string, handler Handler) { l.handlers[channel] = handler } +// Unhandle removes the handler for notifications sent to channel. +func (l *Listener) Unhandle(channel string) bool { + if l.handlers == nil { + return false + } + _, ok := l.handlers[channel] + if !ok { + return false + } + delete(l.handlers, channel) + return true +} + // Listen listens for and handles notifications. It will only return when ctx is cancelled or a fatal error occurs. // Because Listen is intended to continue running even when there is a network or database outage most errors are not // considered fatal. For example, if connecting to the database fails it will wait a while and try to reconnect. From 6af96c12ac8a00164cb9ea84b5ddaa6086b018d2 Mon Sep 17 00:00:00 2001 From: hi117 Date: Mon, 21 Jul 2025 14:46:41 -0400 Subject: [PATCH 3/4] Implement listening after start --- pgxlisten.go | 85 +++++++++++++++++++++++++++++++++++------------ pgxlisten_test.go | 6 ++-- 2 files changed, 66 insertions(+), 25 deletions(-) diff --git a/pgxlisten.go b/pgxlisten.go index 6f66e6d..e47710f 100644 --- a/pgxlisten.go +++ b/pgxlisten.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/jackc/pgx/v5" @@ -27,29 +28,55 @@ type Listener struct { // is lost. If set to 0, the default of 1 minute is used. A negative value disables the timeout entirely. ReconnectDelay time.Duration - handlers map[string]Handler + handlers map[string]Handler + mux sync.RWMutex + hasStarted bool + conn *pgx.Conn } // Handle sets the handler for notifications sent to channel. -func (l *Listener) Handle(channel string, handler Handler) { +func (l *Listener) Handle(ctx context.Context, channel string, handler Handler) error { + l.mux.Lock() + defer l.mux.Unlock() if l.handlers == nil { l.handlers = make(map[string]Handler) } + _, ok := l.handlers[channel] l.handlers[channel] = handler + if l.hasStarted { + var err error + if ok { + // We are changing handlers, don't need to read backlog + // TODO: Maybe just error in this case? + _, err = l.conn.Exec(ctx, "listen "+pgx.Identifier{channel}.Sanitize()) + } else { + err = l.listenandbacklog(ctx, channel, handler) + } + return err + } + return nil } // Unhandle removes the handler for notifications sent to channel. -func (l *Listener) Unhandle(channel string) bool { +func (l *Listener) Unhandle(ctx context.Context, channel string) (bool, error) { + l.mux.Lock() + defer l.mux.Unlock() if l.handlers == nil { - return false + return false, nil } _, ok := l.handlers[channel] if !ok { - return false + return false, nil } delete(l.handlers, channel) - return true + if l.hasStarted { + _, err := l.conn.Exec(ctx, "unlisten "+pgx.Identifier{channel}.Sanitize()) + if err != nil { + return true, err + } + } + return true, nil } // Listen listens for and handles notifications. It will only return when ctx is cancelled or a fatal error occurs. @@ -93,24 +120,34 @@ func (l *Listener) Listen(ctx context.Context) error { } } +func (l *Listener) listenandbacklog(ctx context.Context, channel string, handler Handler) error { + _, err := l.conn.Exec(ctx, "listen "+pgx.Identifier{channel}.Sanitize()) + if err != nil { + return fmt.Errorf("listen %q: %w", channel, err) + } + + if backlogHandler, ok := handler.(BacklogHandler); ok { + err := backlogHandler.HandleBacklog(ctx, channel, l.conn) + if err != nil { + l.logError(ctx, fmt.Errorf("handle backlog %q: %w", channel, err)) + } + } + return nil +} + func (l *Listener) listen(ctx context.Context) error { conn, err := l.Connect(ctx) if err != nil { return fmt.Errorf("connect: %w", err) } defer conn.Close(ctx) + l.hasStarted = true + l.conn = conn for channel, handler := range l.handlers { - _, err := conn.Exec(ctx, "listen "+pgx.Identifier{channel}.Sanitize()) + err = l.listenandbacklog(ctx, channel, handler) if err != nil { - return fmt.Errorf("listen %q: %w", channel, err) - } - - if backlogHandler, ok := handler.(BacklogHandler); ok { - err := backlogHandler.HandleBacklog(ctx, channel, conn) - if err != nil { - l.logError(ctx, fmt.Errorf("handle backlog %q: %w", channel, err)) - } + return err } } @@ -120,14 +157,18 @@ func (l *Listener) listen(ctx context.Context) error { return fmt.Errorf("waiting for notification: %w", err) } - if handler, ok := l.handlers[notification.Channel]; ok { - err := handler.HandleNotification(ctx, notification, conn) - if err != nil { - l.logError(ctx, fmt.Errorf("handle %s notification: %w", notification.Channel, err)) + func() { + l.mux.RLock() + defer l.mux.RUnlock() + if handler, ok := l.handlers[notification.Channel]; ok { + err := handler.HandleNotification(ctx, notification, conn) + if err != nil { + l.logError(ctx, fmt.Errorf("handle %s notification: %w", notification.Channel, err)) + } + } else { + l.logError(ctx, fmt.Errorf("missing handler: %s", notification.Channel)) } - } else { - l.logError(ctx, fmt.Errorf("missing handler: %s", notification.Channel)) - } + }() } } diff --git a/pgxlisten_test.go b/pgxlisten_test.go index 62f7631..e418a73 100644 --- a/pgxlisten_test.go +++ b/pgxlisten_test.go @@ -29,7 +29,7 @@ func TestListenerListenDispatchesNotifications(t *testing.T) { fooChan := make(chan *pgconn.Notification) barChan := make(chan *pgconn.Notification) - listener.Handle("foo", pgxlisten.HandlerFunc(func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error { + listener.Handle(ctx, "foo", pgxlisten.HandlerFunc(func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error { select { case fooChan <- notification: case <-ctx.Done(): @@ -37,7 +37,7 @@ func TestListenerListenDispatchesNotifications(t *testing.T) { return nil })) - listener.Handle("bar", pgxlisten.HandlerFunc(func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error { + listener.Handle(ctx, "bar", pgxlisten.HandlerFunc(func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error { select { case barChan <- notification: case <-ctx.Done(): @@ -164,7 +164,7 @@ create table pgxlisten_test (id int primary key generated by default as identity ch: fooChan, } - listener.Handle("foo", handler) + listener.Handle(ctx, "foo", handler) listenerCtx, listenerCtxCancel := context.WithCancel(ctx) defer listenerCtxCancel() From e0637f62d4853b8f5ec5e72a83d4a7589c229bfb Mon Sep 17 00:00:00 2001 From: hi117 Date: Mon, 21 Jul 2025 18:53:11 -0400 Subject: [PATCH 4/4] Fix bug, we don't need to listen when just changing handlers --- pgxlisten.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/pgxlisten.go b/pgxlisten.go index e47710f..16b9f00 100644 --- a/pgxlisten.go +++ b/pgxlisten.go @@ -44,15 +44,8 @@ func (l *Listener) Handle(ctx context.Context, channel string, handler Handler) _, ok := l.handlers[channel] l.handlers[channel] = handler - if l.hasStarted { - var err error - if ok { - // We are changing handlers, don't need to read backlog - // TODO: Maybe just error in this case? - _, err = l.conn.Exec(ctx, "listen "+pgx.Identifier{channel}.Sanitize()) - } else { - err = l.listenandbacklog(ctx, channel, handler) - } + if l.hasStarted && !ok { + err := l.listenandbacklog(ctx, channel, handler) return err } return nil