Hi.
I enjoy the watermill library a lot. I ran into some challenges when using the NATS Jetstream PubSub implementation though.
I have created an Embedded NATS server in my Go project. And while the NATS server is seemingly running without any errors arising, I never managed to get an active connection using the documented Publisher and Subscriber initialization from Watermill docs.
NOTE: I have a background Daemon which instantiates Publisher and Subscriber in order to run background processing. My main.go essentially just parses a Publisher to create a CommandBus and delivers that to my Cobra CLI handler.
Steps to reproduce
Create Embedded NATS Server
// infra/nats_server.go
func StartEmbeddedNATS(host string, port int) (*server.Server, error) {
opts := &server.Options{
Host: host,
Port: port,
// This enables the "Persistence" layer (JetStream)
JetStream: true,
StoreDir: "/tmp/webdock-nats",
}
ns, err := server.NewServer(opts)
if err != nil {
return nil, err
}
// Start the server in a goroutine
go ns.Start()
slog.Info("NATS server starting...")
// 1. Wait for the TCP port to be open
if !ns.ReadyForConnections(10 * time.Second) {
return nil, fmt.Errorf("NATS server didn't start")
}
slog.Info("NATS server and JetStream are ready")
return ns, nil
}
Invoking NATS server and PubSubs
// daemon.go which utilizes Publisher and Subscriber
// 2. Start the Embedded NATS Server
ns, err := daemon.StartEmbeddedNATS(natsHost, natsPort)
if err != nil {
slog.Error("Failed to start embedded NATS", "err", err)
os.Exit(1)
}
defer ns.Shutdown()
// I create Publisher and Subscriber objects like so:
pub, sub, err := eventbus.NewNATSPubSub(natsUrl)
if err != nil {
slog.Error("Failed to init NATS pubsub", "err", err)
os.Exit(1)
}
defer pub.Close()
defer sub.Close()
New NATS PubSub func
I instantiate the Publisher and Subscribe objects according to the docs:
func NewNATSPubSub(url string) (message.Publisher, message.Subscriber, error) {
logger := newWatermillLogger()
// NATS uses a Marshaler to turn structs into wire data
marshaler := &nats.JSONMarshaler{}
pub, err := nats.NewPublisher(nats.PublisherConfig{
URL: url,
Marshaler: marshaler,
JetStream: nats.JetStreamConfig{
Disabled: false,
AutoProvision: true, // Automatically creates streams
},
}, logger)
if err != nil {
return nil, nil, err
}
sub, err := nats.NewSubscriber(nats.SubscriberConfig{
URL: url,
Unmarshaler: marshaler,
JetStream: nats.JetStreamConfig{
Disabled: false,
AutoProvision: true, // Automatically creates streams
},
}, logger)
return pub, sub, err
}
Expected behavior
What I expect is that the NATS server initializes and the Publisher and Subscriber objects both connect correctly to the server. Resulting in allowing for events to be sent and retrieved.
Actual behavior
Unfortunately, no connection is available.
I do a quick retry to check if the error object is nil, if it is then break out of the retry, otherwise reattempt to set the objects:
for i := 0; i < 5; i++ {
pub, sub, err = eventbus.NewNATSPubSub("nats://127.0.0.1:4222")
if err == nil {
break
}
slog.Info("Waiting for NATS server to be ready...", "attempt", i+1)
time.Sleep(1 * time.Second)
}
if err != nil {
slog.Error("Failed to connect to NATS after retries", "err", err)
os.Exit(1)
}
It keeps failing on me.
It exhausts the retries. The only error message I receive is: "cannot connect to nats: nats: no servers available for connection"
Possible solution
I have attempted to delay the initialization a bit with this:
// within the StartEmbeddedNATS func
// Wait specifically for JetStream to be initialized
start := time.Now()
for {
if ns.JetStreamEnabled() {
break
}
if time.Since(start) > 5*time.Second {
return nil, fmt.Errorf("NATS started, but JetStream failed to initialize")
}
time.Sleep(50 * time.Millisecond)
}
Effectively is checks for enabled JetStream before returning the NATS server instance.
That works for me.
I'm no expert at all, but maybe Watermill silently fails setting up PubSub when using the JetStream configuration due to some underlying delay. I am not sure.
Hi.
I enjoy the watermill library a lot. I ran into some challenges when using the NATS Jetstream PubSub implementation though.
I have created an Embedded NATS server in my Go project. And while the NATS server is seemingly running without any errors arising, I never managed to get an active connection using the documented Publisher and Subscriber initialization from Watermill docs.
NOTE: I have a background Daemon which instantiates Publisher and Subscriber in order to run background processing. My main.go essentially just parses a Publisher to create a CommandBus and delivers that to my Cobra CLI handler.
Steps to reproduce
Create Embedded NATS Server
Invoking NATS server and PubSubs
New NATS PubSub func
I instantiate the Publisher and Subscribe objects according to the docs:
Expected behavior
What I expect is that the NATS server initializes and the Publisher and Subscriber objects both connect correctly to the server. Resulting in allowing for events to be sent and retrieved.
Actual behavior
Unfortunately, no connection is available.
I do a quick retry to check if the error object is nil, if it is then break out of the retry, otherwise reattempt to set the objects:
It keeps failing on me.
It exhausts the retries. The only error message I receive is: "cannot connect to nats: nats: no servers available for connection"
Possible solution
I have attempted to delay the initialization a bit with this:
Effectively is checks for enabled JetStream before returning the NATS server instance.
That works for me.
I'm no expert at all, but maybe Watermill silently fails setting up PubSub when using the JetStream configuration due to some underlying delay. I am not sure.