Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.24.10
require (
github.com/go-playground/validator/v10 v10.30.1
github.com/rabbitmq/amqp091-go v1.10.0
github.com/uug-ai/models v1.2.24
github.com/uug-ai/models v1.2.26
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzuk
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/uug-ai/models v1.2.24 h1:dQzajYI3WiePM6a3z0/sa+s1VDPSUGjy2pA5r/S0j9c=
github.com/uug-ai/models v1.2.24/go.mod h1:0EHI6EKF/f2J1iXmFuPFuZZ2yv9Q6kphqcS8wzHYGd8=
github.com/uug-ai/models v1.2.26 h1:gHqq/+HT7D9EXEUpgLJVWbfjC+CwYmRHBJoRsMZwJfI=
github.com/uug-ai/models v1.2.26/go.mod h1:0EHI6EKF/f2J1iXmFuPFuZZ2yv9Q6kphqcS8wzHYGd8=
go.mongodb.org/mongo-driver v1.17.4 h1:jUorfmVzljjr0FLzYQsGP8cgN/qzzxlY9Vh0C9KFXVw=
go.mongodb.org/mongo-driver v1.17.4/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
Expand Down
175 changes: 175 additions & 0 deletions pkg/queue/mock.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package queue

import (
"encoding/json"
"os"
"time"

"github.com/go-playground/validator/v10"
"github.com/uug-ai/models/pkg/models"
)
Expand Down Expand Up @@ -41,6 +45,7 @@ type MockQueue struct {
running bool // Running flag for controlling the message loop
ConnectCalled bool // Flag to indicate if Connect was called
ConnectError error // Error to return on Connect
CloseCalled bool // Flag to indicate if Close was called
}

// NewMock creates a new MockQueue instance
Expand All @@ -64,3 +69,173 @@ func (m *MockQueue) Connect() error {
m.ConnectCalled = true
return m.ConnectError
}

// Close stops the queue operations and cleans up resources
func (m *MockQueue) Close() error {
m.CloseCalled = true

// Stop the message processing loop
m.running = false

// Clear internal queues
m.messageQueue = make([]models.PipelineEvent, 0)
m.sentMessages = make([]string, 0)
return nil
}

func (m *MockQueue) ReadMessages(handleMessage models.MessageHandler, handlePrometheus models.PrometheusHandler, args ...any) error {

m.running = true
// Process messages from internal queue
for m.running {
if len(m.messageQueue) > 0 {
// Get the first message from the queue
pipelineEvent := m.messageQueue[0]
m.messageQueue = m.messageQueue[1:] // Remove processed message

// Simulate processing time
startTime := time.Now()

// Call the message handler
pipelineAction, pipelineEvent, _ := handleMessage(pipelineEvent, args...)

switch pipelineAction {
case models.PipelineForward:
_ = pipelineEvent // Suppress unused variable warning

case models.PipelineCancel:
_ = pipelineEvent
case models.PipelineRetry:
// Put the message back at the end of the queue
m.messageQueue = append(m.messageQueue, pipelineEvent)
_ = pipelineEvent
}

// Simulate processing time event
endTime := time.Now()
processingTime := endTime.Sub(startTime)
e := models.PipelineMetrics{
ProcessingTime: processingTime.Seconds(),
}
handlePrometheus(e)
} else {
// No messages, wait a bit
time.Sleep(100 * time.Millisecond)
}
}

return nil
}

// RouteMessages simulates routing messages to other queues
func (m *MockQueue) RouteMessages(handleMessage models.MessageHandler, handlePrometheus models.PrometheusHandler, args ...any) error {

m.running = true

for m.running {
if len(m.messageQueue) > 0 {
// Get the first message from the queue
pipelineEvent := m.messageQueue[0]
m.messageQueue = m.messageQueue[1:] // Remove processed message

// Simulate processing time
startTime := time.Now()

// Simulate forwarding logic from SQS implementation
if len(pipelineEvent.Stages) > 0 {
// In a real implementation, this would send to the next queue
// For mock, we just log the action
}

// Simulate processing time event
endTime := time.Now()
processingTime := endTime.Sub(startTime)
e := models.PipelineMetrics{
ProcessingTime: processingTime.Seconds(),
}
handlePrometheus(e)
} else {
// No messages, wait a bit
time.Sleep(100 * time.Millisecond)
}
}

return nil
}

// Publish sends a message immediately to the specified RabbitMQ queue
func (m *MockQueue) Publish(queueName string, payload []byte) error {

// Parse the payload as a PipelineEvent
var pipelineEvent models.PipelineEvent
if err := json.Unmarshal([]byte(payload), &pipelineEvent); err != nil {
return err
}

// Add the message to internal queue
m.messageQueue = append(m.messageQueue, pipelineEvent)

// Store the message payload for testing purposes
m.sentMessages = append(m.sentMessages, string(payload))
return nil
}

// Helper methods for testing
// AddMessageToQueue adds a message directly to the internal queue for testing
func (m *MockQueue) AddMessageToQueue(event models.PipelineEvent) {
m.messageQueue = append(m.messageQueue, event)
}

// GetQueueSize returns the current number of messages in the internal queue
func (m *MockQueue) GetQueueSize() int {
return len(m.messageQueue)
}

// GetSentMessages returns a copy of all sent messages for testing verification
func (m *MockQueue) GetSentMessages() []string {
messages := make([]string, len(m.sentMessages))
copy(messages, m.sentMessages)
return messages
}

// ClearQueues clears both internal message queue and sent messages
func (m *MockQueue) ClearQueues() {
m.messageQueue = make([]models.PipelineEvent, 0)
m.sentMessages = make([]string, 0)
}

// IsRunning returns the current running state of the queue
func (m *MockQueue) IsRunning() bool {
return m.running
}

// Stop stops the queue operations without clearing the messages
func (m *MockQueue) Stop() {
m.running = false
}

// LoadMessages loads pipeline events from a JSON file and sends them to the queue
func (m *MockQueue) LoadMessages(filename string) error {
// Read the mock.json which is an array with models.PipelineEvent
file, err := os.ReadFile(filename)
if err != nil {
return err
}
var events []models.PipelineEvent
err = json.Unmarshal(file, &events)
if err != nil {
return err
}
for _, event := range events {
// convert event to a string
eventBytes, err := json.Marshal(event)
if err != nil {
return err
}
err = m.Publish(m.options.QueueName, eventBytes)
if err != nil {
return err
}
}
return nil
}
Loading
Loading