Skip to content

powersemmi/ruststream-nats

Repository files navigation

ruststream-nats

The NATS broker for the RustStream messaging framework: Core NATS and JetStream, request/reply, and an in-process test broker.

CI crates.io docs.rs MSRV 1.85 License Telegram news channel Telegram RU chat

Documentation


ruststream-nats implements the RustStream broker contract over async-nats. Handlers, routers, codecs, and middleware come from the framework; this crate supplies the transport - and nothing broker-specific leaks back into the framework.

Features

  • Core NATS and JetStream. Subscribe by subject, or describe a durable JetStream consumer with the SubscribeOptions builder (stream, durable name, queue group, filter subject, ack wait, max ack pending, deliver policy).
  • Lazy startup contract. NatsBroker::new(url) is synchronous and does no I/O; the runtime connects once at startup, so the broker composes with #[ruststream::app]. An existing connection plugs in via NatsBroker::from_client.
  • Acknowledgement that matches the transport. JetStream deliveries ack/nack natively; Core NATS reports AckError::Unsupported instead of pretending.
  • Request/reply. NatsPublisher implements the RequestReply capability over native NATS request semantics.
  • In-process test broker. The testing feature ships NatsTestBroker, a handler-stub transport that reproduces Core routing (no server, no JetStream simulation), implements ruststream::testing::TestableBroker, and passes the framework's conformance suite in process.

Install

[dependencies]
ruststream = { version = "0.5", features = ["macros", "json"] }
ruststream-nats = "0.5"
serde = { version = "1", features = ["derive"] }

[dev-dependencies]
ruststream-nats = { version = "0.5", features = ["testing"] }

Scaffold

Generate a ready-to-run service with cargo generate - nats for a Core NATS starter, nats-js for a durable JetStream consumer:

cargo generate --git https://github.com/powersemmi/ruststream-nats templates/nats --name my-service
cargo generate --git https://github.com/powersemmi/ruststream-nats templates/nats-js --name my-service

Write a service

use ruststream::runtime::{App, AppInfo, HandlerResult, RustStream};
use ruststream::subscriber;
use ruststream_nats::NatsBroker;
use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct Order {
    id: u64,
}

#[subscriber("orders.created")]
async fn handle(order: &Order) -> HandlerResult {
    println!("got order {}", order.id);
    HandlerResult::Ack
}

#[ruststream::app]
fn app() -> impl App {
    RustStream::new(AppInfo::new("orders", "0.1.0"))
        .with_broker(NatsBroker::new("nats://localhost:4222"), |b| b.include(handle))
}

JetStream

Bind the same handler to a durable JetStream consumer by overriding its source - either at the mount site or directly in the decorator:

use ruststream_nats::SubscribeOptions;

// at the mount site
b.include_on(
    SubscribeOptions::new("orders.*").jetstream("ORDERS").durable("orders-worker"),
    handle,
);

// or in the decorator
#[subscriber(SubscribeOptions::new("orders.*").jetstream("ORDERS").durable("orders-worker"))]
async fn handle(order: &Order) -> HandlerResult { /* ... */ }

Test it

The testing feature runs handlers against an in-process NATS stand-in - no server, same routing. Inject a message as an external producer would with TestableBroker::inject, then assert on what a handler published with the free expect_published:

use ruststream::OutgoingMessage;
use ruststream::testing::{TestableBroker, expect_published};
use ruststream_nats::testing::NatsTestBroker;

let broker = NatsTestBroker::new();
broker.inject(OutgoingMessage::new("orders.created", br#"{"id":1}"#));
let confirmations =
    expect_published(&broker, "confirmations", 1, std::time::Duration::from_secs(1)).await;

JetStream-specific behaviour (durable resume, redelivery timing) is covered by the env-gated integration suite instead: just test-brokers spins up nats:2-alpine with JetStream and runs the live tests plus the framework conformance suite against it.

Layout

ruststream-nats/
├── crates/
│   └── ruststream-nats/        the published crate
│       └── examples/           runnable nats_* examples (docs-site snippet sources)
├── docs/                       the documentation site (MkDocs Material)
├── templates/                  cargo-generate scaffolds (nats, nats-js)
├── mkdocs.yml                  docs site config
└── Cargo.toml                  workspace

The crate resolves ruststream against the crates.io version range (ruststream = ">=0.5.0, <0.6.0").

Documentation

The NATS broker docs live at powersemmi.github.io/ruststream-nats and are built from docs/ with MkDocs Material. The runnable nats_* examples under crates/ruststream-nats/examples/ are embedded into the docs as snippets, so they stay compiled and in sync. Framework concepts (subscribers, routing, codecs, middleware, the CLI) live in the RustStream docs.

Build the site locally:

pip install -r docs/requirements.txt
mkdocs serve

Contributing

just check          # fmt, clippy, feature checks
just test           # handler-stub tests, no server
just test-brokers   # live integration + conformance against nats:2-alpine

License

Licensed under the Apache-2.0 license.

About

implements the RustStream broker contract over async-nats

Topics

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages