Add distributed notifications via PostgreSQL LISTEN/NOTIFY #27

Open
roeierez wants to merge 7 commits into main from claude/distributed-notification-queue-GQp2Z

Conversation

@roeierez (Member) commented Mar 1, 2026

Summary

This PR introduces a distributed notification system that enables multi-instance deployments to broadcast data sync changes across all instances using PostgreSQL's LISTEN/NOTIFY mechanism, while maintaining backward compatibility with single-instance SQLite deployments.

Key Changes

  • New notifier package with abstraction layer for change notifications:

    • ChangeNotifier interface supporting both local and distributed notifications
    • LocalNotifier for single-instance deployments (SQLite or local-only mode)
    • PGNotifier for multi-instance deployments using PostgreSQL LISTEN/NOTIFY
  • PostgreSQL LISTEN/NOTIFY implementation:

    • Uses a dedicated direct database connection for LISTEN (bypassing connection pool)
    • Sends notifications via the connection pool using pg_notify()
    • Implements exponential backoff reconnection (1s to 30s) for resilience
    • All instances receive notifications including the originator (unified model)
    • JSON payload encoding for pubkey and optional clientID
  • Configuration updates:

    • Added PgDirectUrl config option for direct PostgreSQL connections
    • Automatically selects PGNotifier when both DATABASE_URL and DATABASE_DIRECT_URL are configured
    • Falls back to LocalNotifier for SQLite or when direct URL is not set
  • Integration with SyncerServer:

    • Modified SetRecord to use notifier.Notify() instead of direct eventsManager.notifyChange()
    • Notifier lifecycle managed alongside server startup/shutdown
    • Graceful shutdown with context cancellation

Implementation Details

  • Notifications are JSON-encoded with compact field names (p for pubkey, c for clientID)
  • LISTEN uses a separate connection to avoid blocking the connection pool
  • Reconnection logic handles transient connection failures automatically
  • Comprehensive test coverage including cross-instance notifications, nil clientID handling, and graceful shutdown

https://claude.ai/code/session_01ETtJh2CpT5zn9Vab3yfRZ7

claude and others added 5 commits March 1, 2026 11:08
Route all change notifications through a ChangeNotifier interface to
enable horizontal scaling. For PostgreSQL, notifications are broadcast
via pg_notify and received via a dedicated LISTEN connection (using
DATABASE_DIRECT_URL to bypass connection poolers like Supavisor). For
SQLite, a LocalNotifier dispatches directly to the in-memory
eventsManager preserving current behavior.

https://claude.ai/code/session_01ETtJh2CpT5zn9Vab3yfRZ7
Replace TEST_PG_DATABASE_URL env var requirement with testcontainers-go.
Tests now spin up a disposable PostgreSQL container automatically via
Docker, making them fully self-contained and CI-friendly.

- Add testutil/pgcontainer.go shared helper using postgres:15-alpine
- Update store/postgres/pg_test.go with TestMain container lifecycle
- Update notifier/pg_test.go with TestMain container lifecycle
- Add testcontainers-go v0.31.0 dependency

https://claude.ai/code/session_01ETtJh2CpT5zn9Vab3yfRZ7
    log.Printf("pgNotifier: listen connection lost: %v, reconnecting in %v", err, delay)
    select {
    case <-time.After(delay):
        delay = min(delay*2, maxReconnectDelay)
Contributor:

Is there any potential that a client could miss an event during the reconnect? I'm wondering if we could trigger a change to the handler so the client pulls.

Member Author:

You mean send a notification to all clients on reconnect?

Contributor:

Unless there is a different way to handle it. I'm more interested in whether there's potential to miss a notify so that the client doesn't pull.

Member Author:

Yes, there is a potential to miss the notify if the connection is down. My concern with sending all clients a notification on reconnect is that it would create a spike on the service at that time.
If we think this is important I can explore more alternatives. One of them is to maintain a record per user with the last change, update it on every change, and clear it when notify is called. On reconnection we can go over that table and send the missing notifications.

Member Author:

Perhaps we can use the revision increment to detect changes. I will check.

Member Author:

Added recovery

syncer_server.go (outdated):
    storage = pgStorage

    if config.PgDirectUrl != "" {
        changeNotifier = notifier.NewPGNotifier(pgStorage, config.PgDirectUrl, em.notifyChange)
Contributor:

Is it worth validating/testing the PgDirectUrl and falling back to the local notifier? A failure would mean the server runs without a working notifier.

Member Author:

Perhaps panicking is better? Otherwise we won't know why clients don't get notifications.

Contributor:

Yeah, if we have good oversight on panicking instances.

Member Author:

Done

@dangeross (Contributor) left a review:

LGTM


3 participants