diff --git a/go.mod b/go.mod index a208c92..f9c6384 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.24.10 require ( github.com/go-playground/validator/v10 v10.30.1 github.com/rabbitmq/amqp091-go v1.10.0 - github.com/uug-ai/models v1.2.24 + github.com/uug-ai/models v1.2.26 ) require ( diff --git a/go.sum b/go.sum index ae22238..c8b70e9 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzuk github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/uug-ai/models v1.2.24 h1:dQzajYI3WiePM6a3z0/sa+s1VDPSUGjy2pA5r/S0j9c= -github.com/uug-ai/models v1.2.24/go.mod h1:0EHI6EKF/f2J1iXmFuPFuZZ2yv9Q6kphqcS8wzHYGd8= +github.com/uug-ai/models v1.2.26 h1:gHqq/+HT7D9EXEUpgLJVWbfjC+CwYmRHBJoRsMZwJfI= +github.com/uug-ai/models v1.2.26/go.mod h1:0EHI6EKF/f2J1iXmFuPFuZZ2yv9Q6kphqcS8wzHYGd8= go.mongodb.org/mongo-driver v1.17.4 h1:jUorfmVzljjr0FLzYQsGP8cgN/qzzxlY9Vh0C9KFXVw= go.mongodb.org/mongo-driver v1.17.4/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= diff --git a/pkg/queue/mock.go b/pkg/queue/mock.go index 2e250e8..c03adcd 100644 --- a/pkg/queue/mock.go +++ b/pkg/queue/mock.go @@ -1,6 +1,10 @@ package queue import ( + "encoding/json" + "os" + "time" + "github.com/go-playground/validator/v10" "github.com/uug-ai/models/pkg/models" ) @@ -41,6 +45,7 @@ type MockQueue struct { running bool // Running flag for controlling the message loop ConnectCalled bool // Flag to indicate if Connect was called ConnectError error // Error to return on Connect + CloseCalled bool // Flag to indicate if Close was called } // NewMock creates a new MockQueue instance @@ -64,3 +69,173 @@ func (m *MockQueue) Connect() error { m.ConnectCalled = true return m.ConnectError } + +// Close stops the queue operations and cleans up resources +func (m *MockQueue) Close() error { + m.CloseCalled = true + + // Stop the message processing loop + m.running = false + + // Clear internal queues + m.messageQueue = make([]models.PipelineEvent, 0) + m.sentMessages = make([]string, 0) + return nil +} + +func (m *MockQueue) ReadMessages(handleMessage models.MessageHandler, handlePrometheus models.PrometheusHandler, args ...any) error { + + m.running = true + // Process messages from internal queue + for m.running { + if len(m.messageQueue) > 0 { + // Get the first message from the queue + pipelineEvent := m.messageQueue[0] + m.messageQueue = m.messageQueue[1:] // Remove processed message + + // Simulate processing time + startTime := time.Now() + + // Call the message handler + pipelineAction, pipelineEvent, _ := handleMessage(pipelineEvent, args...) + + switch pipelineAction { + case models.PipelineForward: + _ = pipelineEvent // Suppress unused variable warning + + case models.PipelineCancel: + _ = pipelineEvent + case models.PipelineRetry: + // Put the message back at the end of the queue + m.messageQueue = append(m.messageQueue, pipelineEvent) + _ = pipelineEvent + } + + // Simulate processing time event + endTime := time.Now() + processingTime := endTime.Sub(startTime) + e := models.PipelineMetrics{ + ProcessingTime: processingTime.Seconds(), + } + handlePrometheus(e) + } else { + // No messages, wait a bit + time.Sleep(100 * time.Millisecond) + } + } + + return nil +} + +// RouteMessages simulates routing messages to other queues +func (m *MockQueue) RouteMessages(handleMessage models.MessageHandler, handlePrometheus models.PrometheusHandler, args ...any) error { + + m.running = true + + for m.running { + if len(m.messageQueue) > 0 { + // Get the first message from the queue + pipelineEvent := m.messageQueue[0] + m.messageQueue = m.messageQueue[1:] // Remove processed message + + // Simulate processing time + startTime := time.Now() + + // Simulate forwarding logic from SQS implementation + if len(pipelineEvent.Stages) > 0 { + // In a real implementation, this would send to the next queue + // For mock, we just log the action + } + + // Simulate processing time event + endTime := time.Now() + processingTime := endTime.Sub(startTime) + e := models.PipelineMetrics{ + ProcessingTime: processingTime.Seconds(), + } + handlePrometheus(e) + } else { + // No messages, wait a bit + time.Sleep(100 * time.Millisecond) + } + } + + return nil +} + +// Publish sends a message immediately to the specified RabbitMQ queue +func (m *MockQueue) Publish(queueName string, payload []byte) error { + + // Parse the payload as a PipelineEvent + var pipelineEvent models.PipelineEvent + if err := json.Unmarshal([]byte(payload), &pipelineEvent); err != nil { + return err + } + + // Add the message to internal queue + m.messageQueue = append(m.messageQueue, pipelineEvent) + + // Store the message payload for testing purposes + m.sentMessages = append(m.sentMessages, string(payload)) + return nil +} + +// Helper methods for testing +// AddMessageToQueue adds a message directly to the internal queue for testing +func (m *MockQueue) AddMessageToQueue(event models.PipelineEvent) { + m.messageQueue = append(m.messageQueue, event) +} + +// GetQueueSize returns the current number of messages in the internal queue +func (m *MockQueue) GetQueueSize() int { + return len(m.messageQueue) +} + +// GetSentMessages returns a copy of all sent messages for testing verification +func (m *MockQueue) GetSentMessages() []string { + messages := make([]string, len(m.sentMessages)) + copy(messages, m.sentMessages) + return messages +} + +// ClearQueues clears both internal message queue and sent messages +func (m *MockQueue) ClearQueues() { + m.messageQueue = make([]models.PipelineEvent, 0) + m.sentMessages = make([]string, 0) +} + +// IsRunning returns the current running state of the queue +func (m *MockQueue) IsRunning() bool { + return m.running +} + +// Stop stops the queue operations without clearing the messages +func (m *MockQueue) Stop() { + m.running = false +} + +// LoadMessages loads pipeline events from a JSON file and sends them to the queue +func (m *MockQueue) LoadMessages(filename string) error { + // Read the mock.json which is an array with models.PipelineEvent + file, err := os.ReadFile(filename) + if err != nil { + return err + } + var events []models.PipelineEvent + err = json.Unmarshal(file, &events) + if err != nil { + return err + } + for _, event := range events { + // convert event to a string + eventBytes, err := json.Marshal(event) + if err != nil { + return err + } + err = m.Publish(m.options.QueueName, eventBytes) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/queue/mock_test.go b/pkg/queue/mock_test.go index efb227a..14d94b8 100644 --- a/pkg/queue/mock_test.go +++ b/pkg/queue/mock_test.go @@ -1,6 +1,7 @@ package queue import ( + "os" "testing" ) @@ -248,3 +249,244 @@ type MockError struct { func (e *MockError) Error() string { return e.Message } + +// TestLoadMessages tests the LoadMessages method of MockQueue +func TestLoadMessages(t *testing.T) { + t.Run("LoadMessagesFromValidFile", func(t *testing.T) { + // Create a temporary test file with valid JSON + testFile := t.TempDir() + "/test_events.json" + validJSON := `[ + { + "id": "event-1", + "stages": [] + }, + { + "id": "event-2", + "stages": [] + } + ]` + + if err := os.WriteFile(testFile, []byte(validJSON), 0644); err != nil { + t.Fatalf("failed to create test file: %v", err) + } + + opts := NewMockOptions().SetQueueName("test-queue").Build() + mock, err := NewMock(opts) + if err != nil { + t.Fatalf("failed to create MockQueue: %v", err) + } + + // Load messages from file + err = mock.LoadMessages(testFile) + if err != nil { + t.Errorf("expected no error loading messages, got: %v", err) + } + + // Verify messages were loaded + if mock.GetQueueSize() != 2 { + t.Errorf("expected 2 messages in queue, got %d", mock.GetQueueSize()) + } + + // Verify messages were also recorded as sent + sentMessages := mock.GetSentMessages() + if len(sentMessages) != 2 { + t.Errorf("expected 2 sent messages, got %d", len(sentMessages)) + } + }) + + t.Run("LoadMessagesFromEmptyArray", func(t *testing.T) { + // Create a test file with empty array + testFile := t.TempDir() + "/empty_events.json" + emptyJSON := `[]` + + if err := os.WriteFile(testFile, []byte(emptyJSON), 0644); err != nil { + t.Fatalf("failed to create test file: %v", err) + } + + opts := NewMockOptions().SetQueueName("test-queue").Build() + mock, err := NewMock(opts) + if err != nil { + t.Fatalf("failed to create MockQueue: %v", err) + } + + err = mock.LoadMessages(testFile) + if err != nil { + t.Errorf("expected no error loading empty array, got: %v", err) + } + + if mock.GetQueueSize() != 0 { + t.Errorf("expected 0 messages in queue, got %d", mock.GetQueueSize()) + } + }) + + t.Run("LoadMessagesFromNonExistentFile", func(t *testing.T) { + opts := NewMockOptions().SetQueueName("test-queue").Build() + mock, err := NewMock(opts) + if err != nil { + t.Fatalf("failed to create MockQueue: %v", err) + } + + // Attempt to load from non-existent file + err = mock.LoadMessages("/non/existent/file.json") + if err == nil { + t.Error("expected error loading from non-existent file, got nil") + } + + // Queue should remain empty + if mock.GetQueueSize() != 0 { + t.Errorf("expected 0 messages in queue after failed load, got %d", mock.GetQueueSize()) + } + }) + + t.Run("LoadMessagesFromInvalidJSON", func(t *testing.T) { + // Create a test file with invalid JSON + testFile := t.TempDir() + "/invalid_events.json" + invalidJSON := `{this is not valid json}` + + if err := os.WriteFile(testFile, []byte(invalidJSON), 0644); err != nil { + t.Fatalf("failed to create test file: %v", err) + } + + opts := NewMockOptions().SetQueueName("test-queue").Build() + mock, err := NewMock(opts) + if err != nil { + t.Fatalf("failed to create MockQueue: %v", err) + } + + err = mock.LoadMessages(testFile) + if err == nil { + t.Error("expected error loading invalid JSON, got nil") + } + + // Queue should remain empty + if mock.GetQueueSize() != 0 { + t.Errorf("expected 0 messages in queue after failed load, got %d", mock.GetQueueSize()) + } + }) + + t.Run("LoadMessagesFromWrongJSONStructure", func(t *testing.T) { + // Create a test file with wrong structure (object instead of array) + testFile := t.TempDir() + "/wrong_structure.json" + wrongJSON := `{"id": "event-1", "stages": []}` + + if err := os.WriteFile(testFile, []byte(wrongJSON), 0644); err != nil { + t.Fatalf("failed to create test file: %v", err) + } + + opts := NewMockOptions().SetQueueName("test-queue").Build() + mock, err := NewMock(opts) + if err != nil { + t.Fatalf("failed to create MockQueue: %v", err) + } + + err = mock.LoadMessages(testFile) + if err == nil { + t.Error("expected error loading wrong JSON structure, got nil") + } + }) + + t.Run("LoadMessagesMultipleTimes", func(t *testing.T) { + // Create test files + testFile1 := t.TempDir() + "/events1.json" + testFile2 := t.TempDir() + "/events2.json" + + json1 := `[{"id": "event-1", "stages": []}]` + json2 := `[{"id": "event-2", "stages": []}, {"id": "event-3", "stages": []}]` + + if err := os.WriteFile(testFile1, []byte(json1), 0644); err != nil { + t.Fatalf("failed to create test file 1: %v", err) + } + if err := os.WriteFile(testFile2, []byte(json2), 0644); err != nil { + t.Fatalf("failed to create test file 2: %v", err) + } + + opts := NewMockOptions().SetQueueName("test-queue").Build() + mock, err := NewMock(opts) + if err != nil { + t.Fatalf("failed to create MockQueue: %v", err) + } + + // Load first file + err = mock.LoadMessages(testFile1) + if err != nil { + t.Errorf("expected no error loading first file, got: %v", err) + } + + if mock.GetQueueSize() != 1 { + t.Errorf("expected 1 message after first load, got %d", mock.GetQueueSize()) + } + + // Load second file + err = mock.LoadMessages(testFile2) + if err != nil { + t.Errorf("expected no error loading second file, got: %v", err) + } + + // Should accumulate messages + if mock.GetQueueSize() != 3 { + t.Errorf("expected 3 messages after second load, got %d", mock.GetQueueSize()) + } + + // Verify sent messages count + sentMessages := mock.GetSentMessages() + if len(sentMessages) != 3 { + t.Errorf("expected 3 sent messages, got %d", len(sentMessages)) + } + }) + + t.Run("LoadMessagesWithComplexEvents", func(t *testing.T) { + // Create a test file with complex event structure + testFile := t.TempDir() + "/complex_events.json" + complexJSON := `[ + { + "id": "event-1", + "stages": [ + {"name": "stage1", "status": "pending"}, + {"name": "stage2", "status": "pending"} + ], + "metadata": { + "key": "value" + } + } + ]` + + if err := os.WriteFile(testFile, []byte(complexJSON), 0644); err != nil { + t.Fatalf("failed to create test file: %v", err) + } + + opts := NewMockOptions().SetQueueName("test-queue").Build() + mock, err := NewMock(opts) + if err != nil { + t.Fatalf("failed to create MockQueue: %v", err) + } + + err = mock.LoadMessages(testFile) + if err != nil { + t.Errorf("expected no error loading complex events, got: %v", err) + } + + if mock.GetQueueSize() != 1 { + t.Errorf("expected 1 message in queue, got %d", mock.GetQueueSize()) + } + }) + + t.Run("LoadMessagesWithEmptyFile", func(t *testing.T) { + // Create an empty file + testFile := t.TempDir() + "/empty.json" + + if err := os.WriteFile(testFile, []byte(""), 0644); err != nil { + t.Fatalf("failed to create test file: %v", err) + } + + opts := NewMockOptions().SetQueueName("test-queue").Build() + mock, err := NewMock(opts) + if err != nil { + t.Fatalf("failed to create MockQueue: %v", err) + } + + err = mock.LoadMessages(testFile) + if err == nil { + t.Error("expected error loading empty file, got nil") + } + }) +} diff --git a/pkg/queue/rabbitmq.go b/pkg/queue/rabbitmq.go index 204663b..d73d35a 100644 --- a/pkg/queue/rabbitmq.go +++ b/pkg/queue/rabbitmq.go @@ -1,22 +1,31 @@ package queue import ( + "context" + "encoding/json" + "fmt" "strings" "time" "github.com/go-playground/validator/v10" amqp "github.com/rabbitmq/amqp091-go" + "github.com/uug-ai/models/pkg/models" ) +// DisasterRecoveryHandler is a function type for handling messages that failed to publish +type DisasterRecoveryHandler func([]byte) error + // RabbitOptions holds the configuration for RabbitMQ type RabbitOptions struct { - QueueName string `validate:"required"` - Uri string - Host string `validate:"required"` - Username string `validate:"required"` - Password string `validate:"required"` - PrefetchCount int - Exchange string + ConsumerQueue string `validate:"required"` // Queue from which to consume messages, one consumer per queue + DeadletterQueue string `validate:"required"` // When something goes wrong, messages are sent here + RouterQueue string `validate:"required"` // Router queue for routing messages, the message will be send to this queue if Forward action reached. + Uri string + Host string `validate:"required"` + Username string `validate:"required"` + Password string `validate:"required"` + PrefetchCount int + Exchange string } // RabbitOptionsBuilder provides a fluent interface for building Rabbit options @@ -31,9 +40,21 @@ func NewRabbitOptions() *RabbitOptionsBuilder { } } -// SetQueueName sets the queue name -func (b *RabbitOptionsBuilder) SetQueueName(queueName string) *RabbitOptionsBuilder { - b.options.QueueName = queueName +// SetConsumerQueue sets the consumer queue name +func (b *RabbitOptionsBuilder) SetConsumerQueue(queueName string) *RabbitOptionsBuilder { + b.options.ConsumerQueue = queueName + return b +} + +// SetDeadletterQueue sets the deadletter queue name +func (b *RabbitOptionsBuilder) SetDeadletterQueue(queueName string) *RabbitOptionsBuilder { + b.options.DeadletterQueue = queueName + return b +} + +// SetRouterQueue sets the router queue name +func (b *RabbitOptionsBuilder) SetRouterQueue(queueName string) *RabbitOptionsBuilder { + b.options.RouterQueue = queueName return b } @@ -80,11 +101,12 @@ func (b *RabbitOptionsBuilder) Build() *RabbitOptions { // RabbitMQ wraps rabbitmq.Client to implement the Queue interface type RabbitMQ struct { - options *RabbitOptions - connectionString string // e.g., amqp://user:pass@host:port/ - Connection *amqp.Connection // The underlying RabbitMQ connection - Consumer *amqp.Channel // Channel for consuming messages - Producer *amqp.Channel // Channel for producing messages + options *RabbitOptions + connectionString string // e.g., amqp://user:pass@host:port/ + Connection *amqp.Connection // The underlying RabbitMQ connection + Consumer *amqp.Channel // Channel for consuming messages + Producer *amqp.Channel // Channel for producing messages + disasterRecoveryHandler DisasterRecoveryHandler // Optional handler for failed messages } // NewRabbitMQ creates a new RabbitMQ with the provided RabbitMQ settings @@ -111,6 +133,12 @@ func NewRabbitMQ(options *RabbitOptions) (*RabbitMQ, error) { }, nil } +// SetDisasterRecoveryHandler sets a custom disaster recovery handler for failed message publishes +func (r *RabbitMQ) SetDisasterRecoveryHandler(handler DisasterRecoveryHandler) { + r.disasterRecoveryHandler = handler +} + +// Connect establishes the RabbitMQ connection and channels func (r *RabbitMQ) Connect() error { prefetchCount := 5 @@ -154,15 +182,25 @@ func (r *RabbitMQ) Connect() error { return nil } +// Reconnect attempts to re-establish the RabbitMQ connection +// Basic implementation just calls Connect again +func (r *RabbitMQ) Reconnect() error { + err := r.Connect() + if err != nil { + return err + } + return nil +} + // declareQueue declares a quorum queue with the configured queue name func (r *RabbitMQ) declareQueue() error { // Declare quorum queue (idempotent - succeeds if queue exists with same parameters) _, err := r.Consumer.QueueDeclare( - r.options.QueueName, // name - true, // durable - false, // delete when unused - false, // exclusive - false, // no-wait + r.options.ConsumerQueue, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait amqp.Table{ "x-queue-type": "quorum", }, // arguments @@ -172,3 +210,311 @@ func (r *RabbitMQ) declareQueue() error { } return nil } + +// ReadMessages reads messages from the RabbitMQ queue, processes them using the provided handler, +// and reports metrics using the provided Prometheus handler. It will then take action based on the handler's result. +// Forwards, cancels, retries or sends to deadletter as needed. +// +// Parameters: +// - handleMessage: function to process each message +// - handlePrometheus: function to handle metrics reporting +// - args: additional arguments to pass to the message handler +func (r *RabbitMQ) ReadMessages(handleMessage models.MessageHandler, handlePrometheus models.PrometheusHandler, args ...any) error { + + // Subscribe to a queue + if r.Consumer == nil { + return fmt.Errorf("RabbitMQ channel is not initialized") + } + + msgs, err := r.Consumer.Consume( + r.options.ConsumerQueue, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, + ) + if err != nil { + return err + } + + for d := range msgs { + + // Chrono start, we will measure processing time (start to end) + startTime := time.Now() + + // Extract message + payload := d.Body + + // Unmarshal message body into PipelineEvent + var pipelineEvent models.PipelineEvent + err = json.Unmarshal(payload, &pipelineEvent) + if err != nil { + // Failed to unmarshal - send to deadletter and ack to remove from queue + if dlErr := r.AddToDeadletter(payload); dlErr != nil { + // Deadletter failed - use disaster recovery + r.DisasterRecovery(payload) + } + d.Ack(false) // Ack even on error to prevent infinite redelivery + continue // Continue processing next message + } + + // We will override payload with the new payload + // some consumers might provide additional information, that can be leveraged + // later (stateful messaging). + pipelineAction, pipelineEvent, _ := handleMessage(pipelineEvent, args...) + + // Depending on action, we either forward, cancel or retry + switch pipelineAction { + case models.PipelineForward: + // Bring event to the next stage + pipelineEvent.Stages = pipelineEvent.Stages[1:] + if len(pipelineEvent.Stages) == 0 { + // No more stages, nothing to do + break + } + // Marshal updated event + pipelineEventPayload, err := json.Marshal(pipelineEvent) + if err != nil { + // Marshal failed - send original payload to deadletter + if dlErr := r.AddToDeadletter(payload); dlErr != nil { + r.DisasterRecovery(payload) + } + d.Ack(false) + continue + } + topic := r.options.RouterQueue + err = r.Publish(topic, pipelineEventPayload) + if err != nil { + // Publish failed - send to deadletter + if dlErr := r.AddToDeadletter(payload); dlErr != nil { + r.DisasterRecovery(payload) + } + d.Ack(false) + continue + } + case models.PipelineError: + // Send to deadletter queue + pipelineEventPayload, err := json.Marshal(pipelineEvent) + if err != nil { + // Marshal failed - send original payload to deadletter + if dlErr := r.AddToDeadletter(payload); dlErr != nil { + r.DisasterRecovery(payload) + } + d.Ack(false) + continue + } + topic := r.options.DeadletterQueue + err = r.Publish(topic, pipelineEventPayload) + if err != nil { + // Publish to deadletter failed - try with original payload + if dlErr := r.AddToDeadletter(payload); dlErr != nil { + r.DisasterRecovery(payload) + } + d.Ack(false) + continue + } + case models.PipelineCancel: + // Nothing to do, just acknowledge, message will be removed from the queue. + case models.PipelineRetry: + // Re-publish the same message to the same queue for retry + backoff := 5 + r.PublishWithDelay(r.options.ConsumerQueue, payload, backoff) + } + + // Always acknowledge messages regardless of sync mode + err = d.Ack(false) + if err != nil { + // Ack failed - try disaster recovery but continue processing + // The message may be redelivered, but we shouldn't stop the consumer + r.DisasterRecovery(payload) + continue + } + + // Chrono end + endTime := time.Now() + processingTime := endTime.Sub(startTime) + e := models.PipelineMetrics{ + ProcessingTime: processingTime.Seconds(), + } + handlePrometheus(e) + } + + // Check if connection/channel was closed + if r.Consumer.IsClosed() || r.Connection.IsClosed() { + return fmt.Errorf("connection lost") + } + + r.Close() + return nil +} + +func (r *RabbitMQ) RouteMessages(handleMessage models.MessageHandler, handlePrometheus models.PrometheusHandler, args ...any) error { + + // Subscribe to a queue + if r.Consumer == nil { + return fmt.Errorf("RabbitMQ channel is not initialized") + } + + msgs, err := r.Consumer.Consume( + r.options.ConsumerQueue, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, + ) + if err != nil { + return err + } + + for d := range msgs { + + // Chrono start, we will measure processing time (start to end) + startTime := time.Now() + + // Extract message + payload := d.Body + + // Unmarshal message body into PipelineEvent + var pipelineEvent models.PipelineEvent + err = json.Unmarshal(payload, &pipelineEvent) + if err != nil { + // Failed to unmarshal - send to deadletter and ack to remove from queue + if dlErr := r.AddToDeadletter(payload); dlErr != nil { + // Deadletter failed - use disaster recovery + r.DisasterRecovery(payload) + } + d.Ack(false) // Ack even on error to prevent infinite redelivery + continue // Continue processing next message + } + + if len(pipelineEvent.Stages) > 0 { + nextQueue := pipelineEvent.Stages[0] + nextQueue = r.formatQueueName(nextQueue) // Apply legacy naming convention, we will remove this later + err = r.Publish(nextQueue, payload) + if err != nil { + // Publish failed - send to deadletter + if dlErr := r.AddToDeadletter(payload); dlErr != nil { + r.DisasterRecovery(payload) + } + d.Ack(false) + continue + } + } + + // Always acknowledge messages regardless of sync mode + err = d.Ack(false) + if err != nil { + // Ack failed - try disaster recovery but continue processing + // The message may be redelivered, but we shouldn't stop the consumer + r.DisasterRecovery(payload) + continue + } + + // Chrono end + endTime := time.Now() + processingTime := endTime.Sub(startTime) + e := models.PipelineMetrics{ + ProcessingTime: processingTime.Seconds(), + } + handlePrometheus(e) + } + r.Close() + return nil +} + +func (r *RabbitMQ) Close() { + if r.Consumer != nil { + r.Consumer.Close() + } + if r.Producer != nil { + r.Producer.Close() + } + if r.Connection != nil { + r.Connection.Close() + } +} + +// formatQueueName applies legacy naming convention to queue names +// TODO: Remove this once legacy naming convention is deprecated +func (r *RabbitMQ) formatQueueName(queueName string) string { + return "kcloud-" + queueName + "-queue" +} + +// Publish sends a message immediately to the specified RabbitMQ queue +func (r *RabbitMQ) Publish(queueName string, payload []byte) error { + if r.Producer == nil { + r.DisasterRecovery(payload) + return fmt.Errorf("RabbitMQ producer channel is not initialized") + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + err := r.Producer.PublishWithContext(ctx, + "", // exchange + queueName, // routing key (queue name) + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: payload, + }, + ) + if err != nil { + r.DisasterRecovery(payload) + } + return err +} + +// PublishWithDelay sends a message to the specified RabbitMQ queue after a delay +// This is non-blocking and runs in a goroutine. Errors are not returned to the caller. +// For production use, consider adding logging or an error channel. +func (r *RabbitMQ) PublishWithDelay(queueName string, payload []byte, backoff int) { + go func() { + time.Sleep(time.Duration(backoff) * time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Note: errors from delayed publishes are not returned to caller + // Consider adding logging or error channel if needed + err := r.Producer.PublishWithContext(ctx, + "", // exchange + queueName, // routing key (queue name) + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: payload, + }, + ) + if err != nil { + r.DisasterRecovery(payload) + } + }() +} + +// AddToDeadletter adds a message to the deadletter queue +func (r RabbitMQ) AddToDeadletter(payload []byte) error { + topic := r.options.DeadletterQueue + return r.Publish(topic, payload) +} + +// DisasterRecovery handles messages that failed to publish +// Uses the injected disaster recovery handler if provided, otherwise does nothing +func (r *RabbitMQ) DisasterRecovery(payload []byte) error { + if r.disasterRecoveryHandler != nil { + return r.disasterRecoveryHandler(payload) + } + // No handler configured - message will be lost + // Consider logging this in production + return nil +} + +func (r RabbitMQ) LoadMessages(filename string) error { + // This method is only meaningful for MockQueue + return nil +} diff --git a/pkg/queue/rabbitmq_test.go b/pkg/queue/rabbitmq_test.go index 4209c19..9a2b4d8 100644 --- a/pkg/queue/rabbitmq_test.go +++ b/pkg/queue/rabbitmq_test.go @@ -1,6 +1,7 @@ package queue import ( + "fmt" "os" "testing" "time" @@ -17,7 +18,9 @@ func TestRabbitOptionsValidation(t *testing.T) { name: "ValidOptionsComplete", buildOpts: func() *RabbitOptions { return NewRabbitOptions(). - SetQueueName("test-queue"). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). SetUri("amqp://user:pass@localhost:5672/"). SetHost("localhost:5672"). SetUsername("user"). @@ -31,7 +34,9 @@ func TestRabbitOptionsValidation(t *testing.T) { name: "ValidOptionsMinimal", buildOpts: func() *RabbitOptions { return NewRabbitOptions(). - SetQueueName("test-queue"). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). SetUri("amqp://localhost"). SetHost("localhost"). SetUsername("guest"). @@ -42,9 +47,11 @@ func TestRabbitOptionsValidation(t *testing.T) { expectError: false, }, { - name: "MissingQueueName", + name: "MissingConsumerQueue", buildOpts: func() *RabbitOptions { return NewRabbitOptions(). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). SetUri("amqp://localhost"). SetHost("localhost"). SetUsername("user"). @@ -58,7 +65,9 @@ func TestRabbitOptionsValidation(t *testing.T) { name: "MissingHost", buildOpts: func() *RabbitOptions { return NewRabbitOptions(). - SetQueueName("test-queue"). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). SetUri("amqp://localhost"). SetUsername("user"). SetPassword("pass"). @@ -71,7 +80,9 @@ func TestRabbitOptionsValidation(t *testing.T) { name: "MissingUsername", buildOpts: func() *RabbitOptions { return NewRabbitOptions(). - SetQueueName("test-queue"). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). SetUri("amqp://localhost"). SetHost("localhost"). SetPassword("pass"). @@ -84,7 +95,9 @@ func TestRabbitOptionsValidation(t *testing.T) { name: "MissingPassword", buildOpts: func() *RabbitOptions { return NewRabbitOptions(). - SetQueueName("test-queue"). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). SetUri("amqp://localhost"). SetHost("localhost"). SetUsername("user"). @@ -104,7 +117,9 @@ func TestRabbitOptionsValidation(t *testing.T) { name: "ValidOptionsWithAmqps", buildOpts: func() *RabbitOptions { return NewRabbitOptions(). - SetQueueName("secure-queue"). + SetConsumerQueue("secure-queue"). + SetDeadletterQueue("secure-queue-dlq"). + SetRouterQueue("secure-queue-router"). SetUri("amqps://user:pass@localhost:5671/"). SetHost("amqps://localhost:5671"). SetUsername("user"). @@ -136,7 +151,9 @@ func TestRabbitOptionsValidation(t *testing.T) { func TestRabbitOptionsBuilder(t *testing.T) { t.Run("BuilderSettersChaining", func(t *testing.T) { opts := NewRabbitOptions(). - SetQueueName("test-queue"). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). SetUri("amqp://testuser:testpass@localhost:5672/testvhost"). SetHost("localhost:5672"). SetUsername("testuser"). @@ -144,8 +161,14 @@ func TestRabbitOptionsBuilder(t *testing.T) { SetExchange("test-exchange"). Build() - if opts.QueueName != "test-queue" { - t.Errorf("expected QueueName to be 'test-queue', got '%s'", opts.QueueName) + if opts.ConsumerQueue != "test-queue" { + t.Errorf("expected ConsumerQueue to be 'test-queue', got '%s'", opts.ConsumerQueue) + } + if opts.DeadletterQueue != "test-queue-dlq" { + t.Errorf("expected DeadletterQueue to be 'test-queue-dlq', got '%s'", opts.DeadletterQueue) + } + if opts.RouterQueue != "test-queue-router" { + t.Errorf("expected RouterQueue to be 'test-queue-router', got '%s'", opts.RouterQueue) } if opts.Uri != "amqp://testuser:testpass@localhost:5672/testvhost" { t.Errorf("expected Uri to be 'amqp://testuser:testpass@localhost:5672/testvhost', got '%s'", opts.Uri) @@ -170,8 +193,14 @@ func TestRabbitOptionsBuilder(t *testing.T) { SetHost("localhost"). Build() - if opts.QueueName != "" { - t.Errorf("expected QueueName to be empty by default, got '%s'", opts.QueueName) + if opts.ConsumerQueue != "" { + t.Errorf("expected ConsumerQueue to be empty by default, got '%s'", opts.ConsumerQueue) + } + if opts.DeadletterQueue != "" { + t.Errorf("expected DeadletterQueue to be empty by default, got '%s'", opts.DeadletterQueue) + } + if opts.RouterQueue != "" { + t.Errorf("expected RouterQueue to be empty by default, got '%s'", opts.RouterQueue) } if opts.Uri != "amqp://localhost" { t.Errorf("expected Uri to be set, got '%s'", opts.Uri) @@ -193,8 +222,14 @@ func TestRabbitOptionsBuilder(t *testing.T) { t.Run("EmptyBuilder", func(t *testing.T) { opts := NewRabbitOptions().Build() - if opts.QueueName != "" { - t.Errorf("expected QueueName to be empty, got '%s'", opts.QueueName) + if opts.ConsumerQueue != "" { + t.Errorf("expected ConsumerQueue to be empty, got '%s'", opts.ConsumerQueue) + } + if opts.DeadletterQueue != "" { + t.Errorf("expected DeadletterQueue to be empty, got '%s'", opts.DeadletterQueue) + } + if opts.RouterQueue != "" { + t.Errorf("expected RouterQueue to be empty, got '%s'", opts.RouterQueue) } if opts.Uri != "" { t.Errorf("expected Uri to be empty, got '%s'", opts.Uri) @@ -225,7 +260,9 @@ func TestRabbitConnectionStringGeneration(t *testing.T) { name: "BasicAmqpProtocol", buildOpts: func() *RabbitOptions { return NewRabbitOptions(). - SetQueueName("test-queue"). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). SetUri("amqp://localhost"). SetHost("localhost:5672"). SetUsername("user"). @@ -239,7 +276,9 @@ func TestRabbitConnectionStringGeneration(t *testing.T) { name: "AmqpsProtocol", buildOpts: func() *RabbitOptions { return NewRabbitOptions(). - SetQueueName("test-queue"). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). SetUri("amqps://localhost"). SetHost("amqps://localhost:5671"). SetUsername("user"). @@ -253,7 +292,9 @@ func TestRabbitConnectionStringGeneration(t *testing.T) { name: "AmqpProtocol", buildOpts: func() *RabbitOptions { return NewRabbitOptions(). - SetQueueName("test-queue"). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). SetUri("amqp://localhost"). SetHost("amqp://localhost:5672"). SetUsername("user"). @@ -267,7 +308,9 @@ func TestRabbitConnectionStringGeneration(t *testing.T) { name: "NoProtocolInHost", buildOpts: func() *RabbitOptions { return NewRabbitOptions(). - SetQueueName("test-queue"). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). SetUri("amqp://localhost"). SetHost("localhost:5672"). SetUsername("guest"). @@ -305,6 +348,11 @@ func TestRabbitMQIntegration(t *testing.T) { queueName := os.Getenv("RABBITMQ_QUEUE_NAME") exchange := os.Getenv("RABBITMQ_EXCHANGE") + // Skip integration tests if required environment variables are not set + if host == "" || username == "" || password == "" { + t.Skip("Skipping integration tests: RABBITMQ_HOST, RABBITMQ_USERNAME, and RABBITMQ_PASSWORD must be set") + } + // Set defaults for optional values if queueName == "" { queueName = "test-integration-queue" @@ -313,7 +361,9 @@ func TestRabbitMQIntegration(t *testing.T) { t.Run("ConnectToRealRabbitMQ", func(t *testing.T) { // Build RabbitMQ options from environment variables opts := NewRabbitOptions(). - SetQueueName(queueName). + SetConsumerQueue(queueName). + SetDeadletterQueue(queueName + "-dlq"). + SetRouterQueue(queueName + "-router"). SetUri(uri). SetHost(host). SetUsername(username). @@ -362,7 +412,9 @@ func TestRabbitMQIntegration(t *testing.T) { t.Run("ConnectionHealthCheck", func(t *testing.T) { opts := NewRabbitOptions(). - SetQueueName(queueName). + SetConsumerQueue(queueName). + SetDeadletterQueue(queueName + "-dlq"). + SetRouterQueue(queueName + "-router"). SetUri(uri). SetHost(host). SetUsername(username). @@ -398,7 +450,9 @@ func TestRabbitMQIntegration(t *testing.T) { t.Run("MultipleConnections", func(t *testing.T) { opts := NewRabbitOptions(). - SetQueueName(queueName). + SetConsumerQueue(queueName). + SetDeadletterQueue(queueName + "-dlq"). + SetRouterQueue(queueName + "-router"). SetUri(uri). SetHost(host). SetUsername(username). @@ -435,3 +489,258 @@ func TestRabbitMQIntegration(t *testing.T) { t.Log("Successfully created and closed multiple connections") }) } + +// TestFormatQueueName tests the formatQueueName function that applies legacy naming convention +func TestFormatQueueName(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + { + name: "Simple queue name", + input: "test", + expected: "kcloud-test-queue", + }, + { + name: "Queue name with dashes", + input: "test-service", + expected: "kcloud-test-service-queue", + }, + { + name: "Empty string", + input: "", + expected: "kcloud--queue", + }, + { + name: "Queue name with underscores", + input: "test_service", + expected: "kcloud-test_service-queue", + }, + { + name: "Queue name with numbers", + input: "test123", + expected: "kcloud-test123-queue", + }, + } + + // Create a minimal RabbitMQ instance for testing + rabbit := &RabbitMQ{} + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := rabbit.formatQueueName(tt.input) + if result != tt.expected { + t.Errorf("formatQueueName(%q) = %q, want %q", tt.input, result, tt.expected) + } + }) + } +} + +// TestDisasterRecovery tests the DisasterRecovery function +func TestDisasterRecovery(t *testing.T) { + t.Run("WithHandler", func(t *testing.T) { + // Create a RabbitMQ instance using the builder pattern + opts := NewRabbitOptions(). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). + SetHost("localhost"). + SetUsername("guest"). + SetPassword("guest"). + Build() + + rabbit, err := NewRabbitMQ(opts) + if err != nil { + t.Fatalf("failed to create RabbitMQ instance: %v", err) + } + + // Track whether handler was called and with what payload + var handlerCalled bool + var receivedPayload []byte + var handlerError error + + // Set a custom disaster recovery handler + rabbit.SetDisasterRecoveryHandler(func(payload []byte) error { + handlerCalled = true + receivedPayload = payload + return handlerError + }) + + // Test payload + testPayload := []byte(`{"test": "data"}`) + + // Call DisasterRecovery + err = rabbit.DisasterRecovery(testPayload) + + // Verify handler was called + if !handlerCalled { + t.Error("expected disaster recovery handler to be called, but it wasn't") + } + + // Verify correct payload was passed + if string(receivedPayload) != string(testPayload) { + t.Errorf("expected payload %q, got %q", testPayload, receivedPayload) + } + + // Verify no error was returned + if err != nil { + t.Errorf("expected no error, got %v", err) + } + }) + + t.Run("WithHandlerReturningError", func(t *testing.T) { + // Create a RabbitMQ instance using the builder pattern + opts := NewRabbitOptions(). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). + SetHost("localhost"). + SetUsername("guest"). + SetPassword("guest"). + Build() + + rabbit, err := NewRabbitMQ(opts) + if err != nil { + t.Fatalf("failed to create RabbitMQ instance: %v", err) + } + + // Set a handler that returns an error + expectedErr := fmt.Errorf("handler error") + rabbit.SetDisasterRecoveryHandler(func(payload []byte) error { + return expectedErr + }) + + // Test payload + testPayload := []byte(`{"test": "data"}`) + + // Call DisasterRecovery + err = rabbit.DisasterRecovery(testPayload) + + // Verify error was returned + if err != expectedErr { + t.Errorf("expected error %v, got %v", expectedErr, err) + } + }) + + t.Run("WithoutHandler", func(t *testing.T) { + // Create a RabbitMQ instance without setting a handler + opts := NewRabbitOptions(). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). + SetHost("localhost"). + SetUsername("guest"). + SetPassword("guest"). + Build() + + rabbit, err := NewRabbitMQ(opts) + if err != nil { + t.Fatalf("failed to create RabbitMQ instance: %v", err) + } + + // Test payload + testPayload := []byte(`{"test": "data"}`) + + // Call DisasterRecovery + err = rabbit.DisasterRecovery(testPayload) + + // Verify no error was returned (default behavior) + if err != nil { + t.Errorf("expected no error when no handler is set, got %v", err) + } + }) + + t.Run("WithEmptyPayload", func(t *testing.T) { + // Create a RabbitMQ instance using the builder pattern + opts := NewRabbitOptions(). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). + SetHost("localhost"). + SetUsername("guest"). + SetPassword("guest"). + Build() + + rabbit, err := NewRabbitMQ(opts) + if err != nil { + t.Fatalf("failed to create RabbitMQ instance: %v", err) + } + + // Track handler call + var handlerCalled bool + var receivedPayload []byte + + rabbit.SetDisasterRecoveryHandler(func(payload []byte) error { + handlerCalled = true + receivedPayload = payload + return nil + }) + + // Empty payload + testPayload := []byte{} + + // Call DisasterRecovery + err = rabbit.DisasterRecovery(testPayload) + + // Verify handler was called + if !handlerCalled { + t.Error("expected disaster recovery handler to be called") + } + + // Verify empty payload was passed + if len(receivedPayload) != 0 { + t.Errorf("expected empty payload, got %v", receivedPayload) + } + + // Verify no error + if err != nil { + t.Errorf("expected no error, got %v", err) + } + }) + + t.Run("WithNilPayload", func(t *testing.T) { + // Create a RabbitMQ instance using the builder pattern + opts := NewRabbitOptions(). + SetConsumerQueue("test-queue"). + SetDeadletterQueue("test-queue-dlq"). + SetRouterQueue("test-queue-router"). + SetHost("localhost"). + SetUsername("guest"). + SetPassword("guest"). + Build() + + rabbit, err := NewRabbitMQ(opts) + if err != nil { + t.Fatalf("failed to create RabbitMQ instance: %v", err) + } + + // Track handler call + var handlerCalled bool + var receivedPayload []byte + + rabbit.SetDisasterRecoveryHandler(func(payload []byte) error { + handlerCalled = true + receivedPayload = payload + return nil + }) + + // Call DisasterRecovery with nil + err = rabbit.DisasterRecovery(nil) + + // Verify handler was called + if !handlerCalled { + t.Error("expected disaster recovery handler to be called") + } + + // Verify nil payload was passed + if receivedPayload != nil { + t.Errorf("expected nil payload, got %v", receivedPayload) + } + + // Verify no error + if err != nil { + t.Errorf("expected no error, got %v", err) + } + }) +}