Skip to content

vinod-morya/fibersse

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

12 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

FiberSSE

Production-grade Server-Sent Events (SSE) for Fiber v3

CI Coverage Go Reference Go Report Card License Release npm


React SDK available: npm install fibersse-react — hooks for TanStack Query / SWR cache invalidation. GitHub

Blog: How We Eliminated 90% of API Calls by Replacing Polling with SSE

Stop polling. Start pushing. The only SSE library built natively for Fiber v3 — with built-in cache invalidation, event coalescing, and one-line domain event publishing.

Replace setInterval with one line of Go:

// Before: client polls every 30 seconds (wasteful)
// setInterval(() => fetch("/api/orders"), 30_000)

// After: server pushes when data ACTUALLY changes
hub.Invalidate("orders", order.ID, "created")  // → client refetches instantly

80-90% fewer API calls. Real-time UI. Zero polling.

Lineage: fibersse predates and informed Fiber's official middleware/sse, merged in PR #4239. Findings from our review (Stream.Context() disconnect semantics, c.Abandon() ordering, panic-to-OnClose conversion, slow-consumer heartbeat caveat) all landed in the core middleware.

When to use which:

  • Use Fiber's built-in middleware/sse if you only need a single-stream Handler API: one client, one stream, no fan-out / replay / multi-tenant routing.
  • Use fibersse (this library) when you need any of: a Hub broker, topic routing with NATS-style wildcards, three priority lanes (instant / batched / coalesced last-writer-wins), tenant scoping by metadata, Last-Event-ID replay, Redis/NATS bridges, adaptive per-client throttling, or graceful Kubernetes-style drain. fibersse composes around the same Fiber SendStreamWriter transport, so you can adopt it incrementally alongside or in place of the core middleware.

Why FiberSSE?

1. The Only SSE Library That Works on Fiber

Every Go SSE library (r3labs/sse, tmaxmax/go-sse) is built on net/http and breaks on Fiberfasthttp.RequestCtx.Done() only fires on server shutdown, not per-client disconnect. Zombie subscribers leak forever. fibersse uses Fiber's native SendStreamWriter with w.Flush() error detection.

2. Built to Kill Polling

Most SSE libraries just push events. fibersse has built-in patterns for replacing polling:

API What It Does Replaces
hub.Invalidate() Signal clients to refetch a resource setInterval polling
hub.InvalidateForTenant() Tenant-scoped invalidation (multi-tenant SaaS) Tenant polling
hub.InvalidateForTenantWithHint() Tenant-scoped + data hints in one call Polling + extra fetch
hub.DomainEvent() Structured event from any handler/worker Manual event wiring
hub.BatchDomainEvents() Multiple resource changes in one SSE frame Multiple polling loops
hub.Progress() Coalesced progress (5%→8% sends only 8%) 2s progress polling
hub.Complete() Operation done signal (instant delivery) Completion polling
hub.Signal() / SignalForTenant() Generic "something changed" refresh Dashboard polling

3. Every Feature a SaaS Needs

Feature r3labs/sse tmaxmax/go-sse fibersse
Fiber v3 native No No Yes
Disconnect detection Broken on Fiber Broken on Fiber Works (flush-based)
Event coalescing No No Yes (last-writer-wins)
Priority lanes No No Yes (P0 instant / P1 batched / P2 coalesced)
Topic wildcards No No Yes (NATS-style * and >)
Adaptive throttling No No Yes (buffer-depth AIMD)
Connection groups No No Yes (publish by metadata)
Backpressure Blocks sender Blocks sender Drops + reconnect hint
Built-in auth No No Yes (JWT + ticket helpers)
Prometheus metrics No No Yes
Graceful drain No No Yes (Kubernetes-style)
Event TTL No No Yes
Last-Event-ID replay Yes Yes Yes (pluggable)
Fan-out middleware No No Yes (Redis/NATS bridge)

fibersse vs Fiber SSE Recipe

Fiber's official SSE recipe is ~50 lines of raw SendStreamWriter code. It's a great starting point, but it's a recipe (copy-paste example), not a library. Here's what fibersse adds:

Feature Fiber Recipe fibersse
Hub pattern (managed connections)
Topic routing
NATS-style wildcard topics (*, >)
Event coalescing (P0/P1/P2 priorities)
Authentication (JWT + ticket)
Last-Event-ID replay
Heartbeat management ✅ (adaptive)
Connection tracking + groups
Prometheus metrics
Graceful Kubernetes-style drain
Cache invalidation helpers
Multi-tenant support
Domain event publishing
Progress tracking (coalesced)
Auto fan-out from Redis/NATS
Visibility hints (paused tabs)
Adaptive per-connection throttling
React SDK (fibersse-react)

The recipe is perfect if you need to push a single event to a single client. fibersse is for production apps that need topic routing, multi-tenancy, auth, coalescing, and monitoring.

Install

go get github.com/vinod-morya/fibersse@latest

Requirements: Go 1.23+ and Fiber v3.

Quick Start

package main

import (
    "time"
    "github.com/gofiber/fiber/v3"
    "github.com/vinod-morya/fibersse"
)

func main() {
    app := fiber.New()

    // Create the SSE hub
    hub := fibersse.New(fibersse.HubConfig{
        FlushInterval:     2 * time.Second,
        HeartbeatInterval: 30 * time.Second,
        OnConnect: func(c fiber.Ctx, conn *fibersse.Connection) error {
            // Authenticate and set topics
            conn.Topics = []string{"notifications", "live"}
            conn.Metadata["user_id"] = "user_123"
            return nil
        },
    })

    // Mount the SSE endpoint
    app.Get("/events", hub.Handler())

    // Publish events from anywhere in your app
    go func() {
        for i := 0; ; i++ {
            hub.Publish(fibersse.Event{
                Type:   "heartbeat",
                Data:   map[string]int{"count": i},
                Topics: []string{"live"},
            })
            time.Sleep(5 * time.Second)
        }
    }()

    app.Listen(":3000")
}

Client (browser):

const es = new EventSource('/events');
es.addEventListener('heartbeat', (e) => {
    console.log(JSON.parse(e.data)); // { count: 0 }
});
es.addEventListener('notification', (e) => {
    showToast(JSON.parse(e.data));
});

Kill Polling Guide

Step 1: Replace setInterval with Invalidation

Backend — publish when data changes:

// In your order handler
func (h *OrderHandler) Create(c fiber.Ctx) error {
    order, err := h.svc.Create(...)
    if err != nil { return err }

    // One line — replaces 30s polling for ALL connected clients
    hub.InvalidateForTenant(tenantID, "orders", order.ID, "created")
    return c.JSON(order)
}

Frontend — listen and refetch:

// With TanStack Query (React Query)
const es = new EventSource('/events?topics=orders');
es.addEventListener('invalidate', (e) => {
    const { resource } = JSON.parse(e.data);
    queryClient.invalidateQueries({ queryKey: [resource] });
});

// With SWR
es.addEventListener('invalidate', (e) => {
    const { resource } = JSON.parse(e.data);
    mutate(`/api/${resource}`);
});

Step 2: Track Progress Without Polling

// Backend — in your import worker
for i, row := range rows {
    processRow(row)
    hub.Progress("import", importID, tenantID, i+1, len(rows))
    // Fires 1000 times but client receives ~10 updates (coalesced!)
}
hub.Complete("import", importID, tenantID, true, nil)
// Frontend
es.addEventListener('progress', (e) => {
    const { pct } = JSON.parse(e.data);
    setProgressBar(pct); // Smooth updates, no polling
});
es.addEventListener('complete', (e) => {
    showToast("Import complete!");
    queryClient.invalidateQueries({ queryKey: ['products'] });
});

Step 3: Dashboard Signals (No Polling, Ever)

// Backend — after ANY mutation that affects the dashboard
hub.SignalForTenant(tenantID, "dashboard") // coalesced, won't flood

// Or with hints:
hub.InvalidateWithHint("orders", orderID, "created", map[string]any{
    "total": 149.99,
    "customer": "John Doe",
})

Impact

Metric Before (Polling) After (SSE)
API calls per user/minute ~12 (6 pages × 30s) ~0-2 (only when data changes)
Time to see new data 0-30 seconds < 200ms
Server load Constant (even idle users poll) Proportional to actual changes
Battery drain (mobile) High (constant network) Minimal (idle connection)

Features

Event Priority & Coalescing

Three priority lanes control how events reach clients:

// P0: INSTANT — bypasses all buffering, sent immediately
// Use for: notifications, errors, chat messages, auth revocations
hub.Publish(fibersse.Event{
    Type:     "notification",
    Data:     map[string]string{"title": "New order!"},
    Topics:   []string{"notifications"},
    Priority: fibersse.PriorityInstant,
})

// P1: BATCHED — collected in a time window, all sent together
// Use for: status updates, media processing
hub.Publish(fibersse.Event{
    Type:     "media_status",
    Data:     map[string]string{"id": "m_1", "status": "ready"},
    Topics:   []string{"media"},
    Priority: fibersse.PriorityBatched,
})

// P2: COALESCED — last-writer-wins per key
// If progress goes 5% → 6% → 7% → 8% in 2 seconds, client receives only 8%
hub.Publish(fibersse.Event{
    Type:        "progress",
    Data:        map[string]int{"pct": 8},
    Topics:      []string{"tasks"},
    Priority:    fibersse.PriorityCoalesced,
    CoalesceKey: "task:abc123",
})

Topic Wildcards (NATS-style)

Subscribe to topic patterns using * (one segment) and > (one or more trailing segments):

// Client subscribes to "analytics.*"
conn.Topics = []string{"analytics.*"}

// These events all match:
hub.Publish(fibersse.Event{Topics: []string{"analytics.live"}})      // matched by *
hub.Publish(fibersse.Event{Topics: []string{"analytics.revenue"}})   // matched by *

// Subscribe to everything under analytics:
conn.Topics = []string{"analytics.>"}

// Now these also match:
hub.Publish(fibersse.Event{Topics: []string{"analytics.live.visitors"}})   // matched by >
hub.Publish(fibersse.Event{Topics: []string{"analytics.funnel.checkout"}}) // matched by >

Connection Groups

Publish to connections by metadata instead of topics — perfect for multi-tenant SaaS:

// During OnConnect, set metadata:
conn.Metadata["tenant_id"] = "t_123"
conn.Metadata["plan"] = "pro"

// Publish to ALL connections for a specific tenant:
hub.Publish(fibersse.Event{
    Type:  "tenant_update",
    Data:  map[string]string{"message": "Plan upgraded"},
    Group: map[string]string{"tenant_id": "t_123"},
})

// Publish to all pro-plan users:
hub.Publish(fibersse.Event{
    Type:  "feature_announcement",
    Data:  "New feature available!",
    Group: map[string]string{"plan": "pro"},
})

Adaptive Throttling

The hub automatically adjusts flush intervals per connection based on buffer saturation:

Buffer Saturation Effective Interval Behavior
< 10% (healthy) FlushInterval / 4 Fast delivery
10-50% (normal) FlushInterval Default cadence
50-80% (warning) FlushInterval × 2 Slowing down
> 80% (critical) FlushInterval × 4 Backpressure relief

Mobile users on slow connections automatically get fewer updates. Desktop users on fast connections get near-real-time delivery. Zero configuration needed.

Client Visibility Hints

Pause non-critical events for hidden browser tabs:

// Server-side: pause/resume a connection
hub.SetPaused(connID, true)   // tab hidden → skip P1/P2 events
hub.SetPaused(connID, false)  // tab visible → resume all events

P0 (instant) events are always delivered regardless of pause state — critical messages like errors and auth revocations never get dropped.

Built-in Authentication

JWT Auth — validate Bearer tokens or query parameters:

hub := fibersse.New(fibersse.HubConfig{
    OnConnect: fibersse.JWTAuth(func(token string) (map[string]string, error) {
        claims, err := myJWTValidator(token)
        if err != nil {
            return nil, err
        }
        return map[string]string{
            "tenant_id": claims.TenantID,
            "user_id":   claims.UserID,
        }, nil
    }),
})

Ticket Auth — one-time tickets for EventSource (which can't send headers):

store := fibersse.NewMemoryTicketStore() // or implement TicketStore with Redis

// Issue ticket (in your authenticated POST endpoint):
ticket, _ := fibersse.IssueTicket(store, `{"tenant":"t1","topics":"notifications,live"}`, 30*time.Second)

// Use ticket auth in hub:
hub := fibersse.New(fibersse.HubConfig{
    OnConnect: fibersse.TicketAuth(store, func(value string) (map[string]string, []string, error) {
        var data struct{ Tenant, Topics string }
        json.Unmarshal([]byte(value), &data)
        return map[string]string{"tenant_id": data.Tenant},
               strings.Split(data.Topics, ","), nil
    }),
})

Auto Fan-Out (Redis/NATS Bridge)

Bridge external pub/sub to SSE with one line:

// Redis pub/sub → SSE (implement PubSubSubscriber interface)
cancel := hub.FanOut(fibersse.FanOutConfig{
    Subscriber: myRedisSubscriber,
    Channel:    "notifications:tenant_123",
    Topic:      "notifications",
    EventType:  "notification",
    Priority:   fibersse.PriorityInstant,
})
defer cancel()

// Multiple channels at once:
cancel := hub.FanOutMulti(
    fibersse.FanOutConfig{Subscriber: redis, Channel: "notifications:*", Topic: "notifications", EventType: "notification", Priority: fibersse.PriorityInstant},
    fibersse.FanOutConfig{Subscriber: redis, Channel: "media:*", Topic: "media", EventType: "media_status", Priority: fibersse.PriorityBatched},
    fibersse.FanOutConfig{Subscriber: redis, Channel: "import:*", Topic: "import", EventType: "progress", Priority: fibersse.PriorityCoalesced},
)
defer cancel()

Implement the PubSubSubscriber interface for your broker:

type PubSubSubscriber interface {
    Subscribe(ctx context.Context, channel string, onMessage func(payload string)) error
}

Event TTL

Drop stale events instead of delivering outdated data:

hub.Publish(fibersse.Event{
    Type:   "live_count",
    Data:   map[string]int{"visitors": 42},
    Topics: []string{"live"},
    TTL:    5 * time.Second, // useless after 5 seconds
})

Prometheus Metrics

Built-in monitoring endpoints:

// JSON metrics (for dashboards)
app.Get("/admin/sse/metrics", hub.MetricsHandler())

// Prometheus format (for Grafana/Datadog)
app.Get("/metrics/sse", hub.PrometheusHandler())

Exposed metrics:

  • fibersse_connections_active — current open connections
  • fibersse_connections_paused — hidden-tab connections
  • fibersse_events_published_total — lifetime events published
  • fibersse_events_dropped_total — events dropped (backpressure/TTL)
  • fibersse_pending_events — events buffered in coalescers
  • fibersse_buffer_saturation_avg — average send buffer usage
  • fibersse_buffer_saturation_max — worst-case buffer usage
  • fibersse_connections_by_topic{topic="..."} — per-topic breakdown
  • fibersse_events_by_type_total{type="..."} — per-event-type breakdown (invalidate, progress, signal, batch, etc.)

Last-Event-ID Replay

Pluggable replay for reconnecting clients:

hub := fibersse.New(fibersse.HubConfig{
    Replayer: fibersse.NewMemoryReplayer(fibersse.MemoryReplayerConfig{
        MaxEvents: 1000,
        TTL:       5 * time.Minute,
    }),
})

Implement the Replayer interface for Redis Streams or any durable store:

type Replayer interface {
    Store(event MarshaledEvent, topics []string) error
    Replay(lastEventID string, topics []string) ([]MarshaledEvent, error)
}

Graceful Drain (Kubernetes-style)

On shutdown, the hub:

  1. Enters drain mode (rejects new connections with 503 + Retry-After: 5)
  2. Sends server-shutdown event to all connected clients
  3. Waits for context deadline to let clients reconnect elsewhere
  4. Closes all connections and stops the run loop
// In your shutdown handler:
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
hub.Shutdown(ctx)

Backpressure

Each connection has a bounded send buffer (default: 256 events). If a client can't keep up:

  • New events are dropped (not queued infinitely)
  • MessagesDropped counter increments
  • Monitor via hub.Metrics() to identify slow clients
  • The client's EventSource auto-reconnects and gets current state

Benchmarks

Run on Apple M4 Max, Go 1.25, -benchmem:

Operation ns/op B/op allocs/op
Publish (1 conn) 477 72 2
Publish (1,000 conns) 81,976 101,572 22
Coalesce same key 21 0 0
Topic match (exact) 8 0 0
Topic match (wildcard *) 51 64 2
Topic match (wildcard >) 60 96 2
Marshal event (string) 3 0 0
Marshal event (struct) 89 96 2
Connection send 14 0 0
Backpressure drop 2 0 0
Throttle decision 19 0 0
Group match (single key) 27 0 0
Replayer store 140 687 4

Key takeaway: Publishing to 1,000 connections takes 82μs. Zero-alloc on all hot paths (topic match, send, backpressure, throttle).

go test -bench=. -benchmem ./...

Configuration

fibersse.HubConfig{
    FlushInterval:     2 * time.Second,   // P1/P2 coalescing window
    SendBufferSize:    256,               // per-connection buffer capacity
    HeartbeatInterval: 30 * time.Second,  // keepalive for disconnect detection
    MaxLifetime:       30 * time.Minute,  // max connection duration (0 = unlimited)
    RetryMS:           3000,              // client reconnection hint (ms)
    Replayer:          nil,               // Last-Event-ID replay (nil = disabled)
    Logger:            slog.Default(),    // structured logging (nil = disabled)
    OnConnect:         nil,               // auth + topic selection callback
    OnDisconnect:      nil,               // cleanup callback
    OnPause:           nil,               // called when client tab goes hidden
    OnResume:          nil,               // called when client tab becomes visible
}

Architecture

                Publish()
                   │
                   ▼
  ┌────────────────────────────────────────┐
  │  Hub Run Loop (single goroutine)       │
  │                                        │
  │  register   ◄── new connections        │
  │  unregister ◄── disconnects            │
  │  events     ◄── published events       │
  │                                        │
  │  For each event:                       │
  │    1. Match topics (exact + wildcard)  │
  │    2. Match groups (metadata k-v)      │
  │    3. Skip paused connections (P1/P2)  │
  │    4. Route by priority:               │
  │       P0 → send channel (immediate)    │
  │       P1 → batch buffer               │
  │       P2 → coalesce buffer (LWW)      │
  │                                        │
  │  Flush ticker (every FlushInterval):   │
  │    Adaptive throttle per connection    │
  │    Drain batch + coalesce → send chan  │
  │                                        │
  │  Heartbeat ticker:                     │
  │    Send comment to idle connections    │
  └────────────────────────────────────────┘
                   │
                   ▼ (per-connection send channel)
  ┌────────────────────────────────────────┐
  │  Connection Writer (in SendStreamWriter)│
  │                                        │
  │  for event := range sendChan:          │
  │    write SSE format → bufio.Writer     │
  │    w.Flush() → detect disconnect       │
  └────────────────────────────────────────┘

File Structure

fibersse/
├── hub.go             Core hub — New(), Publish(), Handler(), Shutdown()
├── invalidation.go    Kill polling — Invalidate(), Signal(), InvalidateForTenant()
├── domain_event.go    One-line publish — DomainEvent(), Progress(), Complete()
├── event.go           Event struct, Priority constants, SSE wire format
├── connection.go      Per-client connection, write loop, backpressure
├── coalescer.go       Batch + last-writer-wins buffers
├── topic.go           NATS-style wildcard topic matching (* and >)
├── throttle.go        Adaptive per-connection flush interval (AIMD)
├── auth.go            JWTAuth, TicketAuth, TicketStore helpers
├── fanout.go          PubSubSubscriber, FanOut(), FanOutMulti()
├── replayer.go        Last-Event-ID replay (pluggable MemoryReplayer)
├── metrics.go         PrometheusHandler, MetricsHandler
├── stats.go           HubStats struct
├── CLAUDE.md            Instructions for AI agents (Claude, Codex, Copilot)
├── hub_test.go          29 unit tests
├── integration_test.go  11 integration tests (real Fiber HTTP server)
└── benchmark_test.go    42 benchmarks (publish, coalesce, topic match, etc.)

Integration with TanStack Query / SWR

The canonical pattern for bridging fibersse events to your React data layer:

TanStack Query (React Query)

import { useQueryClient } from '@tanstack/react-query';
import { useEffect } from 'react';

function useSSEInvalidation(topics: string[]) {
  const queryClient = useQueryClient();

  useEffect(() => {
    const es = new EventSource(`/events?topics=${topics.join(',')}`);

    // Single resource invalidation
    es.addEventListener('invalidate', (e) => {
      const { resource, resource_id, action, hint } = JSON.parse(e.data);

      // Invalidate the collection
      queryClient.invalidateQueries({ queryKey: [resource] });

      // Invalidate the specific item
      if (resource_id) {
        queryClient.invalidateQueries({ queryKey: [resource, resource_id] });
      }

      // Optional: update cache directly from hint (skip refetch)
      if (hint && resource_id) {
        queryClient.setQueryData([resource, resource_id], (old) =>
          old ? { ...old, ...hint } : old
        );
      }
    });

    // Batch invalidation (multiple resources in one event)
    es.addEventListener('batch', (e) => {
      const events = JSON.parse(e.data);
      const resources = new Set(events.map(e => e.resource));
      resources.forEach(resource => {
        queryClient.invalidateQueries({ queryKey: [resource] });
      });
    });

    // Progress tracking
    es.addEventListener('progress', (e) => {
      const { resource_id, pct } = JSON.parse(e.data);
      // Update local state for progress bars
    });

    // Completion
    es.addEventListener('complete', (e) => {
      const { resource_id, status } = JSON.parse(e.data);
      if (status === 'completed') {
        queryClient.invalidateQueries(); // refetch everything
      }
    });

    return () => es.close();
  }, [topics, queryClient]);
}

// Usage in any page:
function OrdersPage() {
  useSSEInvalidation(['orders', 'dashboard']);
  const { data } = useQuery({ queryKey: ['orders'], queryFn: fetchOrders });
  // ↑ Automatically refetches when server publishes hub.Invalidate("orders", ...)
}

SWR

import { useSWRConfig } from 'swr';

function useSSEInvalidation(topics: string[]) {
  const { mutate } = useSWRConfig();

  useEffect(() => {
    const es = new EventSource(`/events?topics=${topics.join(',')}`);

    es.addEventListener('invalidate', (e) => {
      const { resource, resource_id } = JSON.parse(e.data);
      mutate(`/api/${resource}`);
      if (resource_id) mutate(`/api/${resource}/${resource_id}`);
    });

    return () => es.close();
  }, [topics, mutate]);
}

Versioning

This project follows Semantic Versioning:

  • v0.x.y — Pre-1.0 development. API may change between minor versions.
  • v1.0.0 — Stable API. Breaking changes only in major versions.

Current: v0.5.0.

Roadmap

  • Redis Streams Replayer (durable replay across server restarts)
  • React SDK (fibersse-react) — useSSE() and useSSEInvalidation() hooks
  • Admin Dashboard (web UI for live connection monitoring)
  • WebSocket fallback transport
  • Load testing CLI (fibersse-bench)
  • OpenTelemetry tracing integration
  • TanStack Query integration example

Examples

Runnable examples in the examples/ directory:

Example What it demonstrates Run
basic Minimal hub setup, periodic publisher, browser client cd examples/basic && go run main.go
chat Multi-room chat with topic wildcards and metadata cd examples/chat && go run main.go
polling-replacement Side-by-side polling vs SSE comparison cd examples/polling-replacement && go run main.go

Contributing

Contributions are welcome! See CONTRIBUTING.md for development workflow, code style, and PR process.

License

MIT - Vinod Morya

Author

Vinod Morya@vinod-morya

Built at PersonaCart — the creator commerce platform. fibersse powers all real-time features in PersonaCart: notifications, live analytics, media processing, curriculum generation progress, and more.

About

Production-grade Server-Sent Events (SSE) for Fiber v3. Event coalescing, priority lanes, topic wildcards, adaptive throttling, built-in auth, Prometheus metrics, graceful drain.

Topics

Resources

License

Contributing

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages