Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions maas/maas-service/service/rabbit_service/helper/mock/helper.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
CreateQueuesAndShovelsForExportedExchange(ctx context.Context, vhostAndVersion []model.VhostAndVersion, exchange model.Exchange, existingShovelsSet map[string]struct{}) ([]string, error)
DeleteShovelByName(ctx context.Context, shovelName string) error
GetVhostShovels(ctx context.Context) ([]model.Shovel, error)
GetAllInstanceShovels(ctx context.Context) ([]model.Shovel, error)
}

const shovelQueue = "-sq"
Expand Down Expand Up @@ -425,10 +426,30 @@
log.ErrorC(ctx, "queue entity '%v' doesn't have name field", queue)
return nil, err
}
h.deleteAssociatedShovels(ctx, queueName)
url := fmt.Sprintf("queues/%s/%s", h.vhost.Vhost, queueName)
return h.DeleteEntity(ctx, queue, url)
}

// deleteAssociatedShovels deletes all shovels across all vhosts that source from queueName in this vhost.
// Errors are logged as warnings and do not block queue deletion (e.g. when shovel plugin is disabled).
func (h RabbitHelperImpl) deleteAssociatedShovels(ctx context.Context, queueName string) {
allShovels, err := h.GetAllInstanceShovels(ctx)
if err != nil || allShovels == nil {
log.WarnC(ctx, "Could not retrieve instance shovels (shovel plugin may be disabled), skipping shovel cleanup for queue '%s': %v", queueName, err)
return
}
for _, shovel := range allShovels {
if shovel.Value.SrcQueue == queueName && strings.Contains(shovel.Value.SrcUri, h.vhost.Vhost) {
log.InfoC(ctx, "Deleting shovel '%s' in vhost '%s' before deleting queue '%s'", shovel.Name, shovel.Vhost, queueName)
url := fmt.Sprintf("parameters/shovel/%s/%s", shovel.Vhost, shovel.Name)

Check failure on line 445 in maas/maas-service/service/rabbit_service/helper/rabbit_helper.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "parameters/shovel/%s/%s" 4 times.

See more on https://sonarcloud.io/project/issues?id=Netcracker_qubership-maas&issues=AZ5GOrGHIQnTQqJgqc8F&open=AZ5GOrGHIQnTQqJgqc8F&pullRequest=202
if _, delErr := h.DeleteAdminEntity(ctx, nil, url); delErr != nil {
log.WarnC(ctx, "Failed to delete shovel '%s' in vhost '%s': %v", shovel.Name, shovel.Vhost, delErr)
}
}
}
}

func (h RabbitHelperImpl) GetBinding(ctx context.Context, binding interface{}) (interface{}, error) {
log.InfoC(ctx, "Get binding: %v", binding)
source, destination, err := utils.ExtractSourceAndDestination(binding)
Expand Down Expand Up @@ -864,7 +885,21 @@
url := fmt.Sprintf("parameters/shovel/%s", h.vhost.Vhost)
shovels, err := h.GetAdminEntity(ctx, url, []model.Shovel{})
if err != nil {
return nil, utils.LogError(log, ctx, "error during DeleteAdminEntity: %w", err)
return nil, utils.LogError(log, ctx, "error during GetAdminEntity for vhost shovels: %w", err)
}

if shovels == nil {
return nil, nil
}
shovelsConverted := shovels.(*[]model.Shovel)

return *shovelsConverted, nil
}

func (h RabbitHelperImpl) GetAllInstanceShovels(ctx context.Context) ([]model.Shovel, error) {
shovels, err := h.GetAdminEntity(ctx, "parameters/shovel", []model.Shovel{})
if err != nil {
return nil, utils.LogError(log, ctx, "error during GetAdminEntity for all instance shovels: %w", err)
}

if shovels == nil {
Expand Down
179 changes: 144 additions & 35 deletions maas/maas-service/service/rabbit_service/helper/rabbit_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,47 +118,156 @@ func TestRabbitVhostHelperImpl_CreateQueueInequivArg(t *testing.T) {
_, err = buf.Write(queueBytes)
assert.NoError(err)

httpHelper.EXPECT().
DoRequest(gomock.Any(), "PUT", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, &RabbitHttpError{
Code: http.StatusBadRequest,
ExpectedCodes: []int{http.StatusOK},
Message: "error during creating queue",
Response: []byte("{\"reason\": \"inequivalent arg\"}"),
}).
Times(1)
httpHelper.EXPECT().
DoRequest(gomock.Any(), "DELETE", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&resty.Response{
RawResponse: &http.Response{
StatusCode: http.StatusNoContent,
Body: buf,
},
}, nil).
Times(1)
httpHelper.EXPECT().
DoRequest(gomock.Any(), "PUT", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&resty.Response{
Request: &resty.Request{URL: "url"},
RawResponse: &http.Response{
StatusCode: http.StatusOK,
}}, nil).
Times(1)
httpHelper.EXPECT().
DoRequest(gomock.Any(), "GET", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&resty.Response{
RawResponse: &http.Response{
StatusCode: http.StatusOK,
Body: buf,
},
}, nil).
Times(1)
getShovelsBuf := gbytes.NewBuffer()
_, err = getShovelsBuf.Write([]byte("[]"))
assert.NoError(err)

gomock.InOrder(
httpHelper.EXPECT().
DoRequest(gomock.Any(), "PUT", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, &RabbitHttpError{
Code: http.StatusBadRequest,
ExpectedCodes: []int{http.StatusOK},
Message: "error during creating queue",
Response: []byte("{\"reason\": \"inequivalent arg\"}"),
}),
// deleteAssociatedShovels calls GET parameters/shovel before deleting the queue
httpHelper.EXPECT().
DoRequest(gomock.Any(), "GET", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&resty.Response{
RawResponse: &http.Response{
StatusCode: http.StatusOK,
Body: getShovelsBuf,
},
}, nil),
httpHelper.EXPECT().
DoRequest(gomock.Any(), "DELETE", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&resty.Response{
RawResponse: &http.Response{
StatusCode: http.StatusNoContent,
Body: buf,
},
}, nil),
httpHelper.EXPECT().
DoRequest(gomock.Any(), "PUT", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&resty.Response{
Request: &resty.Request{URL: "url"},
RawResponse: &http.Response{
StatusCode: http.StatusOK,
}}, nil),
httpHelper.EXPECT().
DoRequest(gomock.Any(), "GET", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&resty.Response{
RawResponse: &http.Response{
StatusCode: http.StatusOK,
Body: buf,
},
}, nil),
)

resp, _, err := rabbitHelper.CreateQueue(context.Background(), queue)
assert.NoError(err)
assert.Equal("test-queue", (*resp.(*map[string]interface{}))["name"])
}

// TestRabbitVhostHelperImpl_CreateQueueInequivArg_ShovelDeletedFirst verifies that when a queue is
// inequivalent and must be deleted+recreated, any shovel sourcing from that queue is deleted
// BEFORE the queue itself is deleted, so there is no gap where the queue is absent but the shovel exists.
func TestRabbitVhostHelperImpl_CreateQueueInequivArg_ShovelDeletedFirst(t *testing.T) {
assert := assert.New(t)
mockCtrl := gomock.NewController(t)
httpHelper := mock_helper.NewMockHttpHelper(mockCtrl)

const (
srcVhost = "source-vhost"
exportedVhost = "exported-vhost"
queueName = "my-queue"
shovelName = "my-queue-ns-exported"
)

rabbitHelper := NewRabbitHelperWithHttpHelper(
model.RabbitInstance{},
model.VHostRegistration{Vhost: srcVhost},
httpHelper,
)

queue := map[string]interface{}{"name": queueName}
queueBytes, err := json.Marshal(queue)
assert.NoError(err)

// shovel in exported vhost sourcing from the queue in source-vhost
shovels := []model.Shovel{{
Name: shovelName,
Vhost: exportedVhost,
Value: model.ShovelValue{
SrcProtocol: "amqp091",
SrcUri: "amqp://user:pass@rabbitmq/" + srcVhost,
SrcQueue: queueName,
DestProtocol: "amqp091",
DestUri: "amqp://user:pass@rabbitmq/" + exportedVhost,
DestQueue: queueName,
},
}}
shovelsBytes, err := json.Marshal(shovels)
assert.NoError(err)

shovelsBuf := gbytes.NewBuffer()
_, err = shovelsBuf.Write(shovelsBytes)
assert.NoError(err)

queueBuf := gbytes.NewBuffer()
_, err = queueBuf.Write(queueBytes)
assert.NoError(err)

// InOrder guarantees: shovel DELETE happens before queue DELETE
gomock.InOrder(
// 1. initial PUT → inequiv 400
httpHelper.EXPECT().
DoRequest(gomock.Any(), "PUT", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil, &RabbitHttpError{
Code: http.StatusBadRequest,
ExpectedCodes: []int{http.StatusOK},
Message: "inequivalent arg",
Response: []byte(`{"reason": "inequivalent arg"}`),
}),
// 2. GET parameters/shovel → returns the shovel referencing our queue
httpHelper.EXPECT().
DoRequest(gomock.Any(), "GET", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&resty.Response{
RawResponse: &http.Response{StatusCode: http.StatusOK, Body: shovelsBuf},
}, nil),
// 3. DELETE parameters/shovel/exported-vhost/shovel-name — BEFORE queue deletion
httpHelper.EXPECT().
DoRequest(gomock.Any(), "DELETE", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&resty.Response{
RawResponse: &http.Response{StatusCode: http.StatusNoContent, Body: http.NoBody},
}, nil),
// 4. DELETE queue
httpHelper.EXPECT().
DoRequest(gomock.Any(), "DELETE", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&resty.Response{
RawResponse: &http.Response{StatusCode: http.StatusNoContent, Body: http.NoBody},
}, nil),
// 5. PUT queue recreate
httpHelper.EXPECT().
DoRequest(gomock.Any(), "PUT", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&resty.Response{
Request: &resty.Request{URL: "url"},
RawResponse: &http.Response{StatusCode: http.StatusOK},
}, nil),
// 6. GET queue (post-create verification)
httpHelper.EXPECT().
DoRequest(gomock.Any(), "GET", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&resty.Response{
RawResponse: &http.Response{StatusCode: http.StatusOK, Body: queueBuf},
}, nil),
)

resp, _, err := rabbitHelper.CreateQueue(context.Background(), queue)
assert.NoError(err)
assert.Equal(queueName, (*resp.(*map[string]interface{}))["name"])
}

func TestRabbitVhostHelperImpl_CreateQueueBadRequest(t *testing.T) {
assert := assert.New(t)
mockCtrl := gomock.NewController(t)
Expand Down
Loading