A Kotlin/Spring Boot module for production-grade event-driven messaging with NATS JetStream, durable consumers with at-least-once delivery semantics, and real-time SSE polling via Kotlin Flow.

    flowchart LR
        subgraph Publishers
            P1[NatsEventPublisher]
        end
        subgraph NATS["NATS Server"]
            JS[JetStream Stream\nFLOWPULSE_TELEMETRY]
            CN[Core NATS\nfallback]
        end
        subgraph Consumers
            C1[NatsTelemetryConsumer\nJetStream pull, durable]
            C2[NatsEventConsumer\ncore NATS, exact-match routing]
        end
        subgraph Handlers
            H1[TelemetryEventHandler]
            H2[InboxEventHandler]
        end
        subgraph Streaming
            S1[TelemetryPollingSseService\nSSE polling via Flow]
            S2[TelemetryStreamController\nthin REST layer]
        end
        Client[HTTP Client / Browser]
        P1 -->|publish + Nats-Msg-Id| JS
        P1 -->|fallback| CN
        JS -->|pull batch\n100 msg / 500ms| C1
        CN --> C2
        C1 -->|HandleResult| H1
        C2 -->|HandleResult| H2
        S2 --> S1
        S1 -->|Flow of SsePayload| Client

Key design points:
- The `JetStream` bean is created centrally in `NatsConfig` and injected into both publisher and consumer.
- Publisher returns `List<PublishResult>` (Persisted / BestEffort / Failed) - caller knows exactly what guarantee each subject got.
- `Nats-Msg-Id` header on every JetStream publish enables server-side dedup.
- Handlers return `HandleResult` (Processed / Duplicate / Rejected / RetryLater) - consumer maps to ack/nak/term.
- JetStream pull consumer: durable, configurable AckWait/MaxDeliver/MaxAckPending, semaphore-based concurrency (64 in-flight).
- `NatsEventConsumer`: exact-match subject routing via Map, bounded concurrency, graceful drain on shutdown.
- SSE uses polling with change detection - emits a heartbeat when data is unchanged, with a structured `SsePayload<T>` envelope.
- Input validation: invalid `intervalSeconds` or empty `tagCodes` return 400, not silent coercion.
- Events use typed enums (`EventSeverity`, `DataSource`) with init validation on domain invariants.
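
As an illustration of the per-subject publish outcome described above, here is a minimal sketch of what a `PublishResult` hierarchy could look like. The field names (`subject`, `streamSeq`, `reason`) and the helpers `describe` and `allDurable` are assumptions made for this example, not the module's actual API.

```kotlin
// Hypothetical shape of the result type; the real PublishResult.kt may carry different fields.
sealed interface PublishResult {
    val subject: String

    // Acknowledged by JetStream and stored in the stream.
    data class Persisted(override val subject: String, val streamSeq: Long) : PublishResult

    // Sent over core NATS as a fallback: no persistence, no redelivery.
    data class BestEffort(override val subject: String) : PublishResult

    // Neither path succeeded.
    data class Failed(override val subject: String, val reason: String) : PublishResult
}

// Exhaustive `when`: the compiler forces callers to handle every guarantee level.
fun describe(result: PublishResult): String = when (result) {
    is PublishResult.Persisted -> "${result.subject}: persisted (seq ${result.streamSeq})"
    is PublishResult.BestEffort -> "${result.subject}: best effort via core NATS"
    is PublishResult.Failed -> "${result.subject}: not sent (${result.reason})"
}

// True only when every subject was stored by JetStream.
fun allDurable(results: List<PublishResult>): Boolean =
    results.all { it is PublishResult.Persisted }
```

A caller that needs durability can check `allDurable(results)` and decide whether to retry or surface the degraded subjects.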
| Layer | Technology |
|---|---|
| Runtime | Kotlin 1.9 / JVM 21 (Virtual Threads) |
| Framework | Spring Boot 3.2 + WebFlux |
| Messaging | NATS JetStream (jnats 2.17.6) |
| Async | Kotlin Coroutines + kotlinx-coroutines-reactor |
| SSE | `Flow<ServerSentEvent<SsePayload<T>>>` (Spring WebFlux native) |
| Serialization | Jackson + jackson-module-kotlin |
| Testing | JUnit 5 + MockK + Turbine |

    src/main/kotlin/com/flowpulse/
      FlowpulseApplication.kt            Spring Boot entry point
      config/
        NatsConfig.kt                    NATS connection + JetStream bean + stream bootstrap
      publisher/
        NatsEventPublisher.kt            Publish to JetStream with core NATS fallback;
                                         returns List<PublishResult>; Nats-Msg-Id headers
        PublishResult.kt                 Persisted / BestEffort / Failed sealed interface
      consumer/
        HandleResult.kt                  Processed / Duplicate / Rejected / RetryLater
        NatsTelemetryConsumer.kt         JetStream pull consumer (durable, configurable,
                                         semaphore-limited) + core NATS fallback
        TelemetryEventHandler.kt         Interface - returns HandleResult
        NatsEventConsumer.kt             Core NATS with exact-match subject routing,
                                         bounded concurrency, graceful shutdown drain
        InboxEventHandler.kt             Interface - returns HandleResult
      streaming/
        TelemetryStreamController.kt     Thin REST layer - delegates to polling service
        TelemetryPollingSseService.kt    SSE polling with change detection, validation,
                                         SsePayload envelope, heartbeat support
        TelemetryQueryService.kt         Interface - implement to provide telemetry data
      dto/
        TelemetryDtos.kt                 Response DTOs + SsePayload + SseStatus
      events/
        Events.kt                        Event data classes + EventSeverity + DataSource enums

    src/test/kotlin/com/flowpulse/
      publisher/NatsEventPublisherTest.kt          PublishResult contracts, fallback
      consumer/NatsTelemetryConsumerTest.kt        Subscription, deserialization, HandleResult
      consumer/NatsEventConsumerTest.kt            Exact-match routing, lifecycle
      streaming/TelemetryPollingSseServiceTest.kt  Polling, change detection, validation, heartbeat
      streaming/TelemetryStreamControllerTest.kt   Controller delegation

Requires JDK 21 and Gradle (wrapper included).

    ./gradlew build

Start a NATS server with JetStream enabled:

    docker run -p 4222:4222 nats:latest -js

Then run the application:

    ./gradlew bootRun

The application connects to `nats://localhost:4222` by default and creates the `FLOWPULSE_TELEMETRY` JetStream stream on startup.
The stream covers the following subject patterns:
| Pattern | Purpose |
|---|---|
| `flowpulse.telemetry.>` | Flat telemetry subjects (aggregate arrived, backfill, per-datasource) |
| `flowpulse.events.>` | Vessel event detection |
| `flowpulse.*.telemetry.>` | Org-partitioned telemetry (`flowpulse.{orgId}.telemetry.{vesselId}.*`) |
| `flowpulse.vessel.>` | Mode transitions (`flowpulse.vessel.mode.transition.*`) |
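
To make the wildcard patterns above concrete, the following sketch implements NATS subject matching in plain Kotlin: `*` matches exactly one token and `>` matches one or more trailing tokens. The function name `subjectMatches` is made up for this example; the NATS server performs this matching itself, so the code is only useful for reasoning about which subjects a pattern covers.

```kotlin
/**
 * Returns true when [subject] is covered by the NATS [pattern].
 * '*' matches exactly one token; '>' must be the last token and matches one or more tokens.
 */
fun subjectMatches(pattern: String, subject: String): Boolean {
    val p = pattern.split('.')
    val s = subject.split('.')
    for (i in p.indices) {
        when (p[i]) {
            // '>' is only valid as the final token and needs at least one remaining subject token.
            ">" -> return i == p.lastIndex && s.size > i
            // '*' consumes a single token, whatever its value.
            "*" -> if (i >= s.size) return false
            // Literal tokens must match exactly.
            else -> if (i >= s.size || p[i] != s[i]) return false
        }
    }
    // Without a trailing '>', token counts must be equal.
    return p.size == s.size
}
```

For instance, `flowpulse.telemetry.aggregate.arrived` is covered by `flowpulse.telemetry.>`, while `flowpulse.fleetintel.anomaly` is covered by none of the four stream patterns.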
| Endpoint | Description |
|---|---|
| `GET /api/v1/telemetry/stream?vesselId=X&tagCodes=SPEED,FUEL&intervalSeconds=5` | Per-vessel telemetry polling |
| `GET /api/v1/telemetry/stream/aggregated?tagCodes=SPEED&intervalSeconds=30` | Fleet-wide aggregated metrics polling |
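
The query parameters above are validated rather than silently coerced. The sketch below shows one way such a check could be expressed. The type names and the accepted range of 1 to 3600 seconds are assumptions for illustration, since the actual bounds are not stated here.

```kotlin
// Hypothetical request and result types, not the module's own.
data class StreamRequest(val tagCodes: List<String>, val intervalSeconds: Int)

sealed interface Validation {
    data class Ok(val request: StreamRequest) : Validation
    // Would be translated to an HTTP 400 response by the controller layer.
    data class BadRequest(val message: String) : Validation
}

fun validate(rawTagCodes: String?, intervalSeconds: Int): Validation {
    // Split the comma-separated list and drop blank entries such as "SPEED,,FUEL".
    val tags = rawTagCodes.orEmpty().split(',').map { it.trim() }.filter { it.isNotEmpty() }
    if (tags.isEmpty()) {
        return Validation.BadRequest("tagCodes must contain at least one tag")
    }
    // Assumed bounds: reject instead of clamping to a default.
    if (intervalSeconds !in 1..3600) {
        return Validation.BadRequest("intervalSeconds must be between 1 and 3600")
    }
    return Validation.Ok(StreamRequest(tags, intervalSeconds))
}
```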
SSE endpoints activate only when a TelemetryQueryService bean is present (@ConditionalOnBean).
Responses use SsePayload<T> envelope with explicit status: OK | ERROR - error events never contain fake business data.
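
The envelope and the change-detection rule can be sketched without any framework code. The field layout of `SsePayload` and the `ChangeDetector` helper below are assumptions for illustration; the real service wraps this kind of logic in a polling `Flow`.

```kotlin
enum class SseStatus { OK, ERROR }

// Hypothetical field layout; the real SsePayload in TelemetryDtos.kt may differ.
data class SsePayload<T>(
    val status: SseStatus,
    val data: T? = null,
    val error: String? = null,
    val heartbeat: Boolean = false,
)

/** Remembers the last emitted snapshot and turns each poll result into an envelope. */
class ChangeDetector<T : Any> {
    private var last: T? = null

    // Unchanged data yields a heartbeat; changed data is emitted and remembered.
    fun onData(current: T): SsePayload<T> =
        if (current == last) {
            SsePayload<T>(SseStatus.OK, heartbeat = true)
        } else {
            last = current
            SsePayload(SseStatus.OK, data = current)
        }

    // Errors carry a message only, never placeholder business data.
    fun onError(cause: Throwable): SsePayload<T> =
        SsePayload<T>(SseStatus.ERROR, error = cause.message ?: "unknown error")
}
```

Keeping `data` null on heartbeat and error events lets clients distinguish "nothing new" from "query failed" without inspecting the payload body.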

    nats:
      enabled: true
      url: nats://localhost:4222
      jetstream:
        enabled: true
        stream-name: FLOWPULSE_TELEMETRY
        retention-hours: 168
        consumer-name: flowpulse-telemetry-consumer
        replicas: 1
        auto-create-stream: true
        ack-wait-seconds: 30
        max-deliver: 5
        max-ack-pending: 1000
      subjects:
        aggregate-arrived: flowpulse.telemetry.aggregate.arrived
        vessel-event-detected: flowpulse.events.vessel.detected
        backfill-received: flowpulse.telemetry.backfill.received
        mode-transition: flowpulse.vessel.mode.transition
        telemetry-aggregate: flowpulse.telemetry.aggregate.arrived
        telemetry-partitioned: "flowpulse.*.telemetry.>"
        fleet-intel-anomaly: flowpulse.fleetintel.anomaly
        fleet-intel-insight: flowpulse.fleetintel.insight
        decision-recommendation: flowpulse.decision.recommendation
        alert-triggered: flowpulse.alert.triggered

Set `nats.enabled=false` to disable NATS entirely. Set `nats.jetstream.enabled=false` to use core NATS only. Set `nats.jetstream.auto-create-stream=false` in production to manage streams via ops tooling.
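
To show how the JetStream tuning values relate to each other, here is a small sketch of a settings holder with the defaults listed above. `JetStreamSettings` and its derived properties are hypothetical names, not the module's properties class, and the sketch only models a finite `max-deliver`.

```kotlin
import java.time.Duration

// Hypothetical holder mirroring some nats.jetstream.* keys; the module's own class may differ.
data class JetStreamSettings(
    val streamName: String = "FLOWPULSE_TELEMETRY",
    val retentionHours: Long = 168,
    val ackWaitSeconds: Long = 30,
    val maxDeliver: Int = 5,
    val maxAckPending: Int = 1000,
) {
    init {
        require(streamName.isNotBlank()) { "stream-name must not be blank" }
        require(retentionHours > 0) { "retention-hours must be positive" }
        require(ackWaitSeconds > 0) { "ack-wait-seconds must be positive" }
        // Assumption of this sketch: only a finite delivery limit is modelled.
        require(maxDeliver >= 1) { "max-deliver must be at least 1" }
        require(maxAckPending >= 1) { "max-ack-pending must be at least 1" }
    }

    val retention: Duration get() = Duration.ofHours(retentionHours)
    val ackWait: Duration get() = Duration.ofSeconds(ackWaitSeconds)

    /** Total ack-wait time if every delivery attempt times out without an ack or nak. */
    val worstCaseAckWait: Duration get() = ackWait.multipliedBy(maxDeliver.toLong())
}
```

With the defaults, retention is 7 days and a message that is never acknowledged stops being redelivered after roughly 150 seconds of ack timeouts.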
All handler interfaces are gated by @ConditionalOnBean - the module starts without implementations; relevant consumers simply don't activate.
- Provide a `@Component` implementing `TelemetryEventHandler` to activate `NatsTelemetryConsumer` and process inbound telemetry.
- Provide a `@Component` implementing `InboxEventHandler` to activate `NatsEventConsumer` and handle anomaly/insight/alert/recommendation events.
- Provide a `@Component` implementing `TelemetryQueryService` to activate SSE endpoints with real data.
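
A handler implementation might look roughly like the sketch below. Every type here is an illustrative stand-in: the real `TelemetryEventHandler` signature, the event class, and the `HandleResult` fields may differ, and the Spring `@Component` annotation is omitted so the example compiles without Spring on the classpath.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Stand-in types; the module's real signatures may differ.
sealed interface HandleResult {
    object Processed : HandleResult
    object Duplicate : HandleResult
    data class Rejected(val reason: String) : HandleResult
    data class RetryLater(val reason: String) : HandleResult
}

data class TelemetryEvent(val eventId: String, val vesselId: String, val tagCode: String, val value: Double)

fun interface TelemetryEventHandler {
    fun handle(event: TelemetryEvent): HandleResult
}

// In a real application this class would be annotated with @Component.
class DedupingTelemetryHandler(
    // Hypothetical persistence hook: returns false when the backing store is unavailable.
    private val store: (TelemetryEvent) -> Boolean,
) : TelemetryEventHandler {
    // In-memory dedup for illustration only; it is lost on restart.
    private val seenIds = ConcurrentHashMap.newKeySet<String>()

    override fun handle(event: TelemetryEvent): HandleResult = when {
        // Malformed input will never succeed, so do not ask for redelivery.
        event.vesselId.isBlank() || event.value.isNaN() ->
            HandleResult.Rejected("malformed event ${event.eventId}")
        event.eventId in seenIds -> HandleResult.Duplicate
        // Transient failure: ask the consumer to redeliver later.
        !store(event) -> HandleResult.RetryLater("store unavailable")
        else -> {
            seenIds.add(event.eventId)
            HandleResult.Processed
        }
    }
}
```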
Handler methods return HandleResult to control consumer acknowledgment behavior:
| Result | JetStream action | Core NATS action |
|---|---|---|
| `Processed` | ack | log debug |
| `Duplicate` | ack | log debug |
| `RetryLater` | nak (redelivery) | log warn |
| `Rejected` | term (no redelivery) | log warn |
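
The table above can be expressed as two exhaustive `when` mappings. The `HandleResult` shape, the two enums, and the function names are assumptions for this sketch. With the jnats client, the three JetStream actions correspond to `Message.ack()`, `Message.nak()` and `Message.term()`.

```kotlin
// Stand-in for the module's HandleResult; the real fields may differ.
sealed interface HandleResult {
    object Processed : HandleResult
    object Duplicate : HandleResult
    data class Rejected(val reason: String) : HandleResult
    data class RetryLater(val reason: String) : HandleResult
}

enum class JetStreamAction { ACK, NAK, TERM }
enum class LogLevel { DEBUG, WARN }

// JetStream column: duplicates are acked so the server stops redelivering them.
fun jetStreamAction(result: HandleResult): JetStreamAction = when (result) {
    is HandleResult.Processed, is HandleResult.Duplicate -> JetStreamAction.ACK
    is HandleResult.RetryLater -> JetStreamAction.NAK
    is HandleResult.Rejected -> JetStreamAction.TERM
}

// Core NATS column: there is no ack protocol, so the outcome is only logged.
fun coreNatsLogLevel(result: HandleResult): LogLevel = when (result) {
    is HandleResult.Processed, is HandleResult.Duplicate -> LogLevel.DEBUG
    is HandleResult.RetryLater, is HandleResult.Rejected -> LogLevel.WARN
}
```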
0.1.0 - API is stabilising but not yet frozen. Minor versions may include breaking changes until 1.0.0.
MIT - see LICENSE
