Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,40 @@
package queue

import (
"fmt"
)

type QueueInterface interface {
Connect() error
}

type QueueOptions interface {
Validate() error
}

// Queue represents a queue client instance
type Queue struct {
Options QueueOptions
Client QueueInterface
}

func New(opts QueueOptions, client ...QueueInterface) (*Queue, error) {
// If no client provided, create default production client
var err error
var q QueueInterface
if len(client) == 0 {
// Type assert to RabbitOptions for creating RabbitMQ client
if rabbitOpts, ok := opts.(*RabbitOptions); ok {
q, err = NewRabbitMQ(rabbitOpts)
} else {
return nil, fmt.Errorf("unsupported queue options type")
}
} else {
q, err = client[0], nil
}

return &Queue{
Options: opts,
Client: q,
}, err
}
11 changes: 8 additions & 3 deletions pkg/queue/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ type RabbitOptions struct {
Exchange string
}

// Validate validates the RabbitOptions configuration
func (r *RabbitOptions) Validate() error {
validate := validator.New()
return validate.Struct(r)
}

// RabbitOptionsBuilder provides a fluent interface for building Rabbit options
type RabbitOptionsBuilder struct {
options *RabbitOptions
Expand Down Expand Up @@ -111,10 +117,9 @@ type RabbitMQ struct {

// NewRabbitMQ creates a new RabbitMQ with the provided RabbitMQ settings
func NewRabbitMQ(options *RabbitOptions) (*RabbitMQ, error) {

// Validate RabbitMQ configuration
validate := validator.New()
err := validate.Struct(options)
if err != nil {
if err := options.Validate(); err != nil {
return nil, err
}
// Extract protocol from host if present, otherwise default to amqp://
Expand Down
41 changes: 31 additions & 10 deletions pkg/queue/rabbitmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,12 +325,13 @@ func TestRabbitConnectionStringGeneration(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
opts := tt.buildOpts()
rabbit, err := NewRabbitMQ(opts)

queueClient, err := New(opts)
if err != nil {
t.Fatalf("failed to create RabbitMQ instance: %v", err)
}

rabbit := queueClient.Client.(*RabbitMQ)

if rabbit.connectionString != tt.expectedConnStr {
t.Errorf("expected connection string '%s', got '%s'", tt.expectedConnStr, rabbit.connectionString)
}
Expand Down Expand Up @@ -373,11 +374,17 @@ func TestRabbitMQIntegration(t *testing.T) {
Build()

// Create RabbitMQ instance
rabbit, err := NewRabbitMQ(opts)
queueClient, err := New(opts)
if err != nil {
t.Fatalf("Failed to create RabbitMQ instance: %v", err)
}

// Get the underlying RabbitMQ client
rabbit, ok := queueClient.Client.(*RabbitMQ)
if !ok {
t.Fatal("Failed to assert RabbitMQ client type")
}

// Test connection
err = rabbit.Connect()
if err != nil {
Expand Down Expand Up @@ -422,11 +429,13 @@ func TestRabbitMQIntegration(t *testing.T) {
SetExchange(exchange).
Build()

rabbit, err := NewRabbitMQ(opts)
queueClient, err := New(opts)
if err != nil {
t.Fatalf("Failed to create RabbitMQ instance: %v", err)
}

rabbit := queueClient.Client.(*RabbitMQ)

err = rabbit.Connect()
if err != nil {
t.Fatalf("Failed to connect to RabbitMQ: %v", err)
Expand Down Expand Up @@ -463,11 +472,13 @@ func TestRabbitMQIntegration(t *testing.T) {
// Create multiple connections
connections := make([]*RabbitMQ, 3)
for i := 0; i < 3; i++ {
rabbit, err := NewRabbitMQ(opts)
queueClient, err := New(opts)
if err != nil {
t.Fatalf("Failed to create RabbitMQ instance %d: %v", i, err)
}

rabbit := queueClient.Client.(*RabbitMQ)

err = rabbit.Connect()
if err != nil {
t.Fatalf("Failed to connect RabbitMQ instance %d: %v", i, err)
Expand Down Expand Up @@ -550,11 +561,13 @@ func TestDisasterRecovery(t *testing.T) {
SetPassword("guest").
Build()

rabbit, err := NewRabbitMQ(opts)
queueClient, err := New(opts)
if err != nil {
t.Fatalf("failed to create RabbitMQ instance: %v", err)
}

rabbit := queueClient.Client.(*RabbitMQ)

// Track whether handler was called and with what payload
var handlerCalled bool
var receivedPayload []byte
Expand Down Expand Up @@ -600,11 +613,13 @@ func TestDisasterRecovery(t *testing.T) {
SetPassword("guest").
Build()

rabbit, err := NewRabbitMQ(opts)
queueClient, err := New(opts)
if err != nil {
t.Fatalf("failed to create RabbitMQ instance: %v", err)
}

rabbit := queueClient.Client.(*RabbitMQ)

// Set a handler that returns an error
expectedErr := fmt.Errorf("handler error")
rabbit.SetDisasterRecoveryHandler(func(payload []byte) error {
Expand Down Expand Up @@ -634,11 +649,13 @@ func TestDisasterRecovery(t *testing.T) {
SetPassword("guest").
Build()

rabbit, err := NewRabbitMQ(opts)
queueClient, err := New(opts)
if err != nil {
t.Fatalf("failed to create RabbitMQ instance: %v", err)
}

rabbit := queueClient.Client.(*RabbitMQ)

// Test payload
testPayload := []byte(`{"test": "data"}`)

Expand All @@ -662,11 +679,13 @@ func TestDisasterRecovery(t *testing.T) {
SetPassword("guest").
Build()

rabbit, err := NewRabbitMQ(opts)
queueClient, err := New(opts)
if err != nil {
t.Fatalf("failed to create RabbitMQ instance: %v", err)
}

rabbit := queueClient.Client.(*RabbitMQ)

// Track handler call
var handlerCalled bool
var receivedPayload []byte
Expand Down Expand Up @@ -710,11 +729,13 @@ func TestDisasterRecovery(t *testing.T) {
SetPassword("guest").
Build()

rabbit, err := NewRabbitMQ(opts)
queueClient, err := New(opts)
if err != nil {
t.Fatalf("failed to create RabbitMQ instance: %v", err)
}

rabbit := queueClient.Client.(*RabbitMQ)

// Track handler call
var handlerCalled bool
var receivedPayload []byte
Expand Down
Loading