feat(api): Add lightweight HTTP API with SSE event streaming #2

@Fatih0234

Problem

The orchestrator emits a rich stream of events (types.OrchestratorEvent) that currently has only two consumers: the Bubble Tea TUI and headless stdout logging. When running headless on a remote machine or in CI, there is no way to inspect the live state of the orchestrator without tailing log files. There is also no programmatic way to query "what is running right now?" or "which issues are in review?"

A lightweight HTTP API would solve this immediately. It does not require a full React dashboard — just a JSON snapshot endpoint and a live SSE event stream.

Goals

  • Expose the orchestrator state as a JSON snapshot over HTTP
  • Stream live orchestrator events via Server-Sent Events (SSE)
  • Keep the server minimal — no external JS build, no embedded SPA
  • Integrate cleanly with the existing Orchestrator.Events channel and StateManager
  • Gate the feature behind a --port CLI flag (disabled by default)

Non-Goals

  • Full REST CRUD for issues (tracker owns that)
  • Embedded React dashboard (deferred; this API is the prerequisite)
  • Authentication / authz on the API (assume local/development use)
  • WebSocket transport (SSE is simpler and sufficient for one-way streaming)
  • Persistent API history (only live events; consumers can record if needed)

API Specification

GET /api/v1/state

Returns a full snapshot of the orchestrator state.

{
  "running": [
    {
      "issue_id": "CB-1",
      "title": "Fix login bug",
      "stage": "execute",
      "attempt": 1,
      "pid": 12345,
      "tokens_in": 1500,
      "tokens_out": 2300,
      "started_at": "2026-04-25T12:00:00Z",
      "last_event_at": "2026-04-25T12:05:00Z"
    }
  ],
  "backoff": [
    {
      "issue_id": "CB-2",
      "stage": "verify",
      "attempt": 2,
      "retry_at": "2026-04-25T12:10:00Z",
      "error": "verification failed: tests did not pass"
    }
  ],
  "review": [
    {
      "issue_id": "CB-3",
      "title": "Add OAuth flow",
      "branch": "opencode/CB-3",
      "workspace_path": "/path/to/workspace",
      "ready_at": "2026-04-25T11:30:00Z",
      "stages_completed": ["plan", "execute", "verify"]
    }
  ],
  "stats": {
    "running_count": 1,
    "max_concurrency": 3,
    "total_tokens_in": 1500,
    "total_tokens_out": 2300
  }
}

GET /api/v1/events

Server-Sent Events stream. Sends an initial snapshot, then streams every orchestrator event as it happens.

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

event: snapshot
data: {"running": [...], "backoff": [...], ...}

event: orchestrator
data: {"type": "stage.started", "issue_id": "CB-1", "timestamp": "...", "payload": {...}}

event: orchestrator
data: {"type": "tokens.updated", "issue_id": "CB-1", ...}

Event types on the wire:

SSE Event Name   When
snapshot         On client connect, send full state
orchestrator     Every types.OrchestratorEvent emitted by the orchestrator
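
As a rough illustration of the wire format above, the sketch below renders one SSE frame per event. The names formatSSE and writeSSE are hypothetical helpers, not part of the existing codebase, and the payload is assumed to be anything encoding/json can marshal.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// formatSSE (hypothetical helper) renders one SSE frame: an "event:" line,
// a single "data:" line with the JSON payload, and a blank line terminator.
func formatSSE(event string, payload any) (string, error) {
	// json.Marshal emits compact JSON with no raw newlines, so one
	// "data:" line is enough for each frame.
	data, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("event: %s\ndata: %s\n\n", event, data), nil
}

// writeSSE (hypothetical helper) writes one frame and flushes it when the
// writer supports http.Flusher, so the client sees the event immediately.
func writeSSE(w io.Writer, event string, payload any) error {
	frame, err := formatSSE(event, payload)
	if err != nil {
		return err
	}
	if _, err := io.WriteString(w, frame); err != nil {
		return err
	}
	if f, ok := w.(http.Flusher); ok {
		f.Flush()
	}
	return nil
}
```

A handler would call writeSSE(w, "snapshot", snap) once on connect and writeSSE(w, "orchestrator", ev) for each later event.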

POST /api/v1/refresh

Placeholder hook that returns 202 Accepted. Future use: trigger an immediate poll cycle.
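
A placeholder of this kind could be as small as the sketch below. Rejecting non-POST methods with 405 is an assumption, since the spec only defines the 202 response; refreshStatus is a hypothetical helper that exercises the handler through httptest.

```go
package main

import (
	"net/http"
	"net/http/httptest"
)

// refreshHandler acknowledges the request without doing any work yet.
func refreshHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		// Assumption: other methods are rejected; the spec does not say.
		w.Header().Set("Allow", http.MethodPost)
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	w.WriteHeader(http.StatusAccepted)
}

// refreshStatus (hypothetical helper) calls the handler in-process and
// returns the status code it produced.
func refreshStatus(method string) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(method, "/api/v1/refresh", nil)
	refreshHandler(rec, req)
	return rec.Code
}
```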

Data Model

Reuse existing types where possible. Define a thin JSON wrapper in a new package internal/web/.

// internal/web/server.go

// Snapshot mirrors the TUI model's derived view of the world.
type Snapshot struct {
    Running []RunningSnapshot `json:"running"`
    Backoff []BackoffSnapshot `json:"backoff"`
    Review  []ReviewSnapshot  `json:"review"`
    Stats   StatsSnapshot     `json:"stats"`
}

type RunningSnapshot struct {
    IssueID     string    `json:"issue_id"`
    Title       string    `json:"title"`
    Stage       string    `json:"stage"`
    Attempt     int       `json:"attempt"`
    PID         int       `json:"pid"`
    TokensIn    int64     `json:"tokens_in"`
    TokensOut   int64     `json:"tokens_out"`
    StartedAt   time.Time `json:"started_at"`
    LastEventAt time.Time `json:"last_event_at"`
}

// ...etc

Integration Points

CLI Flag

Add to cmd/contrabass/main.go:

port := flag.Int("port", 0, "HTTP API port (0 = disabled)")
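
One way to wire the flag so that a busy port is detected before the orchestrator starts is to open the listener up front. The sketch below uses a flag.FlagSet instead of the global flag.Int purely so it can be exercised in isolation; parsePort and listenAPI are hypothetical names, and binding to all interfaces is an assumption the issue does not settle.

```go
package main

import (
	"flag"
	"fmt"
	"io"
	"net"
)

// parsePort (hypothetical helper) reads --port from args; 0 means disabled.
func parsePort(args []string) (int, error) {
	fs := flag.NewFlagSet("contrabass", flag.ContinueOnError)
	fs.SetOutput(io.Discard) // keep usage text out of the caller's output
	port := fs.Int("port", 0, "HTTP API port (0 = disabled)")
	if err := fs.Parse(args); err != nil {
		return 0, err
	}
	if *port < 0 || *port > 65535 {
		return 0, fmt.Errorf("invalid --port %d", *port)
	}
	return *port, nil
}

// listenAPI (hypothetical helper) opens the TCP listener eagerly so that
// "address already in use" surfaces before any orchestrator work begins.
// It returns (nil, nil) when the API is disabled.
func listenAPI(port int) (net.Listener, error) {
	if port == 0 {
		return nil, nil
	}
	// Assumption: bind on all interfaces; a loopback-only bind may be
	// preferable given that the API has no authentication.
	return net.Listen("tcp", fmt.Sprintf(":%d", port))
}
```

In main, a non-nil listener would be handed to http.Serve in its own goroutine, and a non-nil error would be logged as fatal before the orchestrator is created.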

Orchestrator Hook

The web server needs:

  1. A reference to orchestrator.StateManager to build snapshots
  2. A subscription to orchestrator.Events to stream live events

Create a SnapshotProvider interface so the web server does not depend directly on orchestrator internals:

type SnapshotProvider interface {
    Snapshot() Snapshot
}

The StateManager can implement this, or a thin adapter can sit between them.
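
A handler built on this interface might look like the sketch below. Snapshot and RunningSnapshot are cut down to a couple of fields for brevity, and staticProvider and fetchState are hypothetical test helpers. The handler normalises a nil slice so an idle orchestrator yields [] instead of null.

```go
package main

import (
	"encoding/json"
	"net/http"
	"net/http/httptest"
)

// Reduced stand-ins for the types defined in the Data Model section.
type RunningSnapshot struct {
	IssueID string `json:"issue_id"`
	Stage   string `json:"stage"`
}

type Snapshot struct {
	Running []RunningSnapshot `json:"running"`
}

type SnapshotProvider interface {
	Snapshot() Snapshot
}

// stateHandler serves GET /api/v1/state from any SnapshotProvider.
func stateHandler(p SnapshotProvider) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		snap := p.Snapshot()
		if snap.Running == nil {
			snap.Running = []RunningSnapshot{} // encode as [] rather than null
		}
		w.Header().Set("Content-Type", "application/json")
		// The status line is already sent if encoding fails, so the error
		// can only be logged; it is ignored here to keep the sketch short.
		_ = json.NewEncoder(w).Encode(snap)
	})
}

// staticProvider (hypothetical) returns a fixed snapshot for tests.
type staticProvider struct{ snap Snapshot }

func (s staticProvider) Snapshot() Snapshot { return s.snap }

// fetchState (hypothetical) runs the handler in-process via httptest.
func fetchState(p SnapshotProvider) (int, string) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/api/v1/state", nil)
	stateHandler(p).ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}
```

Because the handler only sees the interface, the same test helper works whether the StateManager implements Snapshot() directly or through an adapter.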

Hub / Broadcast

Use a simple sync.RWMutex + map[chan WebEvent]struct{} pattern for SSE fan-out. The reference Contrabass has internal/hub/hub.go which is a generic broadcast hub — you can copy a minimal version or roll your own.
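
A minimal version of that pattern could look like the following. This is a sketch written from the description above, not a copy of the reference hub.go; the WebEvent fields are assumed from the wire examples, and the dropped-event count returned by Broadcast is an addition for observability.

```go
package main

import "sync"

// WebEvent is a reduced stand-in; the real type would mirror OrchestratorEvent.
type WebEvent struct {
	Type    string `json:"type"`
	IssueID string `json:"issue_id"`
}

// Hub fans events out to any number of subscriber channels.
type Hub struct {
	mu      sync.RWMutex
	clients map[chan WebEvent]struct{}
}

func NewHub() *Hub {
	return &Hub{clients: make(map[chan WebEvent]struct{})}
}

// Subscribe registers a new buffered channel and returns it.
func (h *Hub) Subscribe(buffer int) chan WebEvent {
	ch := make(chan WebEvent, buffer)
	h.mu.Lock()
	h.clients[ch] = struct{}{}
	h.mu.Unlock()
	return ch
}

// Unsubscribe removes and closes the channel; calling it twice is safe.
func (h *Hub) Unsubscribe(ch chan WebEvent) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if _, ok := h.clients[ch]; ok {
		delete(h.clients, ch)
		close(ch) // safe: Broadcast only sends while holding the read lock
	}
}

// Broadcast offers the event to every client without blocking and reports
// how many clients missed it because their buffer was full.
func (h *Hub) Broadcast(ev WebEvent) (dropped int) {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for ch := range h.clients {
		select {
		case ch <- ev:
		default:
			dropped++
		}
	}
	return dropped
}
```

A single goroutine would read from the orchestrator's event channel and call Broadcast, so a slow SSE client can only lose its own events and never stalls the orchestrator.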

Concurrency Considerations

  • SSE clients must not block the orchestrator event loop. Use a buffered per-client channel (e.g., 64 events) with non-blocking send; drop events if the client is slow.
  • Snapshot reads must be safe while the orchestrator mutates state. The existing StateManager uses a sync.RWMutex already — ensure snapshot building takes the read lock.
  • The HTTP server runs in its own goroutine, separate from the orchestrator loop.
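
The read-lock point can be illustrated with a stand-in state manager. The fields of the real StateManager are not shown in this issue, so the type below is hypothetical; the part that matters is that Snapshot holds the read lock and returns a copy, so HTTP handlers never alias state the orchestrator is mutating.

```go
package main

import (
	"sort"
	"sync"
)

type RunningSnapshot struct {
	IssueID string `json:"issue_id"`
	Stage   string `json:"stage"`
}

type Snapshot struct {
	Running []RunningSnapshot `json:"running"`
}

// stateManager is a hypothetical stand-in for orchestrator.StateManager.
type stateManager struct {
	mu      sync.RWMutex
	running map[string]RunningSnapshot
}

func newStateManager() *stateManager {
	return &stateManager{running: make(map[string]RunningSnapshot)}
}

// SetRunning is called from the orchestrator loop (write lock).
func (s *stateManager) SetRunning(r RunningSnapshot) {
	s.mu.Lock()
	s.running[r.IssueID] = r
	s.mu.Unlock()
}

// Snapshot is called from HTTP handlers (read lock). It copies the entries
// into a fresh slice and sorts them so the output order is stable.
func (s *stateManager) Snapshot() Snapshot {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := Snapshot{Running: make([]RunningSnapshot, 0, len(s.running))}
	for _, r := range s.running {
		out.Running = append(out.Running, r)
	}
	sort.Slice(out.Running, func(i, j int) bool {
		return out.Running[i].IssueID < out.Running[j].IssueID
	})
	return out
}
```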

Edge Cases & Error Handling

Scenario                        Behavior
Client disconnects mid-stream   Clean up the client channel, no panic
No events for a while           SSE heartbeat comment every 30s to keep connection alive
Orchestrator not yet started    Snapshot returns empty arrays, not an error
Port already in use             Log fatal error and exit before starting orchestrator
--port not set                  Do not start the HTTP server at all
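
The disconnect and heartbeat rows can be handled in one select loop, sketched below under a few assumptions: events arrive as pre-rendered SSE frames, subscribe is a hypothetical function returning a channel plus a cleanup callback, and the initial snapshot frame is omitted for brevity.

```go
package main

import (
	"io"
	"net/http"
	"time"
)

// streamLoop forwards frames until done fires or events is closed, and
// emits an SSE comment line as a heartbeat whenever the ticker fires.
func streamLoop(done <-chan struct{}, events <-chan string,
	heartbeat time.Duration, write func(frame string) error) error {
	ticker := time.NewTicker(heartbeat)
	defer ticker.Stop()
	for {
		select {
		case <-done: // client went away or server is shutting down
			return nil
		case frame, ok := <-events:
			if !ok {
				return nil
			}
			if err := write(frame); err != nil {
				return err
			}
		case <-ticker.C:
			// Lines starting with ':' are comments and ignored by clients.
			if err := write(": heartbeat\n\n"); err != nil {
				return err
			}
		}
	}
}

// eventsHandler wires the loop to an HTTP response. The deferred cancel
// runs on every exit path, so a disconnect cannot leak the subscription.
func eventsHandler(subscribe func() (<-chan string, func())) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		events, cancel := subscribe()
		defer cancel()
		_ = streamLoop(r.Context().Done(), events, 30*time.Second,
			func(frame string) error {
				_, err := io.WriteString(w, frame)
				flusher.Flush()
				return err
			})
	}
}
```

Passing the request context's Done channel into the loop is what turns a client disconnect into a normal return instead of a write error or a stuck goroutine.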

Existing Reference

See docs/references/contrabass/internal/web/server.go for the full Contrabass web server. It includes SnapshotProvider, BoardProvider, SSE streaming via a hub.Hub, and dashboard asset serving. Use it as a guide, but skip the dashboard FS embedding and board CRUD endpoints.

Files to Create / Modify

  • Create: internal/web/server.go - HTTP router + handlers
  • Create: internal/web/server_test.go - httptest-based tests for snapshot and SSE
  • Create: internal/web/sse.go - SSE response writer helper
  • Create: internal/web/events.go - WebEvent type and mapping from OrchestratorEvent
  • Modify: cmd/contrabass/main.go - add --port flag, conditionally start web server
  • Modify: internal/orchestrator/state.go - optionally implement Snapshot() or expose state safely

Acceptance Criteria

  • GET /api/v1/state returns valid JSON matching the snapshot schema
  • GET /api/v1/events streams an initial snapshot event, then live orchestrator events
  • Multiple SSE clients can connect simultaneously without blocking each other
  • Closing an SSE client does not leak goroutines or channels
  • The server does not start if --port is omitted or set to 0
  • Unit tests cover snapshot endpoint and SSE stream with mock events
  • Running curl -N http://localhost:8080/api/v1/events in a separate terminal shows live stage transitions
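
For the SSE unit tests, a small frame parser makes assertions on the stream easier to write. The sketch below is one possible helper (Frame, readFrames and parseFrames are hypothetical names). It assumes each frame carries a single data line, as in the examples in this issue, and it skips heartbeat comments.

```go
package main

import (
	"bufio"
	"io"
	"strings"
)

// Frame is one parsed SSE event.
type Frame struct {
	Event string
	Data  string
}

// readFrames parses frames until the reader is exhausted. bufio.Scanner's
// default 64 KiB line limit applies, so very large snapshots would need a
// bigger buffer via Scanner.Buffer.
func readFrames(r io.Reader) ([]Frame, error) {
	var frames []Frame
	var cur Frame
	seen := false // true once the current frame has at least one field
	sc := bufio.NewScanner(r)
	for sc.Scan() {
		line := sc.Text()
		switch {
		case line == "": // blank line terminates a frame
			if seen {
				frames = append(frames, cur)
			}
			cur, seen = Frame{}, false
		case strings.HasPrefix(line, ":"):
			// comment line (for example a heartbeat): ignore
		case strings.HasPrefix(line, "event:"):
			cur.Event = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
			seen = true
		case strings.HasPrefix(line, "data:"):
			cur.Data = strings.TrimSpace(strings.TrimPrefix(line, "data:"))
			seen = true
		}
	}
	return frames, sc.Err()
}

// parseFrames is a convenience wrapper for already captured output.
func parseFrames(stream string) ([]Frame, error) {
	return readFrames(strings.NewReader(stream))
}
```

In server_test.go this could be pointed at the body recorded by httptest once the handler returns, checking that the first frame is a snapshot and that later frames are orchestrator events.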
