Hi, I’m encountering a message duplication issue.
As per my requirement, I want the delivery policy set to DeliverAll, and I do not want the JetStream consumer to be deleted when the subscriber disconnects. However, since the JetStream consumer is being deleted and recreated, messages that were already acknowledged are getting redelivered.
Can someone help me resolve this issue?
package used:
"github.com/ThreeDotsLabs/watermill-nats/v2/pkg/nats"
Subscriber client code:
marshaler := &BytesMarshalerUnmarshaler{}
options := []nc.Option{
nc.RetryOnFailedConnect(true),
nc.Timeout(30 * time.Second),
nc.ReconnectWait(1 * time.Second),
}
subscribeOptions := []nc.SubOpt{
nc.DeliverAll(),
nc.AckExplicit(),
nc.MaxDeliver(1),
}
jsConfig := watermill_nats.JetStreamConfig{
Disabled: false,
AutoProvision: false,
ConnectOptions: nil,
SubscribeOptions: subscribeOptions,
PublishOptions: nil,
TrackMsgId: false,
AckAsync: false,
DurablePrefix: "ph-rtpcore",
}
subscriber, err := watermill_nats.NewSubscriber(
watermill_nats.SubscriberConfig{
URL: msgBroker.Addr,
AckWaitTimeout: 30 * time.Second,
CloseTimeout: 30 * time.Second,
NatsOptions: options,
Unmarshaler: marshaler,
JetStream: jsConfig,
QueueGroupPrefix: "ph-rtpcore",
},
wmLogger,
)
if err != nil {
return nil, nil, err
}
Closing subscriber:
subscriber.Close()
Hi, I’m encountering a message duplication issue.
As per my requirement, I want the delivery policy set to DeliverAll, and I do not want the JetStream consumer to be deleted when the subscriber disconnects. However, since the JetStream consumer is being deleted and recreated, messages that were already acknowledged are getting redelivered.
Can someone help me resolve this issue?
package used:
"github.com/ThreeDotsLabs/watermill-nats/v2/pkg/nats"
Subscriber client code:
marshaler := &BytesMarshalerUnmarshaler{}
options := []nc.Option{
nc.RetryOnFailedConnect(true),
nc.Timeout(30 * time.Second),
nc.ReconnectWait(1 * time.Second),
}
subscribeOptions := []nc.SubOpt{
nc.DeliverAll(),
nc.AckExplicit(),
nc.MaxDeliver(1),
}
Closing subscriber:
subscriber.Close()