diff --git a/maas/maas-service/service/rabbit_service/helper/mock/helper.go b/maas/maas-service/service/rabbit_service/helper/mock/helper.go index 55947bbe..696258ea 100644 --- a/maas/maas-service/service/rabbit_service/helper/mock/helper.go +++ b/maas/maas-service/service/rabbit_service/helper/mock/helper.go @@ -500,6 +500,21 @@ func (mr *MockRabbitHelperMockRecorder) GetVhostShovels(ctx interface{}) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVhostShovels", reflect.TypeOf((*MockRabbitHelper)(nil).GetVhostShovels), ctx) } +// GetAllInstanceShovels mocks base method. +func (m *MockRabbitHelper) GetAllInstanceShovels(ctx context.Context) ([]model.Shovel, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetAllInstanceShovels", ctx) + ret0, _ := ret[0].([]model.Shovel) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetAllInstanceShovels indicates an expected call of GetAllInstanceShovels. +func (mr *MockRabbitHelperMockRecorder) GetAllInstanceShovels(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllInstanceShovels", reflect.TypeOf((*MockRabbitHelper)(nil).GetAllInstanceShovels), ctx) +} + // IsInstanceAvailable mocks base method. func (m *MockRabbitHelper) IsInstanceAvailable() error { m.ctrl.T.Helper() diff --git a/maas/maas-service/service/rabbit_service/helper/rabbit_helper.go b/maas/maas-service/service/rabbit_service/helper/rabbit_helper.go index 20f6ab91..ab4ac26d 100644 --- a/maas/maas-service/service/rabbit_service/helper/rabbit_helper.go +++ b/maas/maas-service/service/rabbit_service/helper/rabbit_helper.go @@ -60,6 +60,7 @@ type RabbitHelper interface { CreateQueuesAndShovelsForExportedExchange(ctx context.Context, vhostAndVersion []model.VhostAndVersion, exchange model.Exchange, existingShovelsSet map[string]struct{}) ([]string, error) DeleteShovelByName(ctx context.Context, shovelName string) error GetVhostShovels(ctx context.Context) ([]model.Shovel, error) + GetAllInstanceShovels(ctx context.Context) ([]model.Shovel, error) } const shovelQueue = "-sq" @@ -425,10 +426,30 @@ func (h RabbitHelperImpl) DeleteQueue(ctx context.Context, queue interface{}) (i log.ErrorC(ctx, "queue entity '%v' doesn't have name field", queue) return nil, err } + h.deleteAssociatedShovels(ctx, queueName) url := fmt.Sprintf("queues/%s/%s", h.vhost.Vhost, queueName) return h.DeleteEntity(ctx, queue, url) } +// deleteAssociatedShovels deletes all shovels across all vhosts that source from queueName in this vhost. +// Errors are logged as warnings and do not block queue deletion (e.g. when shovel plugin is disabled). +func (h RabbitHelperImpl) deleteAssociatedShovels(ctx context.Context, queueName string) { + allShovels, err := h.GetAllInstanceShovels(ctx) + if err != nil || allShovels == nil { + log.WarnC(ctx, "Could not retrieve instance shovels (shovel plugin may be disabled), skipping shovel cleanup for queue '%s': %v", queueName, err) + return + } + for _, shovel := range allShovels { + if shovel.Value.SrcQueue == queueName && strings.Contains(shovel.Value.SrcUri, h.vhost.Vhost) { + log.InfoC(ctx, "Deleting shovel '%s' in vhost '%s' before deleting queue '%s'", shovel.Name, shovel.Vhost, queueName) + url := fmt.Sprintf("parameters/shovel/%s/%s", shovel.Vhost, shovel.Name) + if _, delErr := h.DeleteAdminEntity(ctx, nil, url); delErr != nil { + log.WarnC(ctx, "Failed to delete shovel '%s' in vhost '%s': %v", shovel.Name, shovel.Vhost, delErr) + } + } + } +} + func (h RabbitHelperImpl) GetBinding(ctx context.Context, binding interface{}) (interface{}, error) { log.InfoC(ctx, "Get binding: %v", binding) source, destination, err := utils.ExtractSourceAndDestination(binding) @@ -864,7 +885,21 @@ func (h RabbitHelperImpl) GetVhostShovels(ctx context.Context) ([]model.Shovel, url := fmt.Sprintf("parameters/shovel/%s", h.vhost.Vhost) shovels, err := h.GetAdminEntity(ctx, url, []model.Shovel{}) if err != nil { - return nil, utils.LogError(log, ctx, "error during DeleteAdminEntity: %w", err) + return nil, utils.LogError(log, ctx, "error during GetAdminEntity for vhost shovels: %w", err) + } + + if shovels == nil { + return nil, nil + } + shovelsConverted := shovels.(*[]model.Shovel) + + return *shovelsConverted, nil +} + +func (h RabbitHelperImpl) GetAllInstanceShovels(ctx context.Context) ([]model.Shovel, error) { + shovels, err := h.GetAdminEntity(ctx, "parameters/shovel", []model.Shovel{}) + if err != nil { + return nil, utils.LogError(log, ctx, "error during GetAdminEntity for all instance shovels: %w", err) } if shovels == nil { diff --git a/maas/maas-service/service/rabbit_service/helper/rabbit_helper_test.go b/maas/maas-service/service/rabbit_service/helper/rabbit_helper_test.go index df48f309..37a14462 100644 --- a/maas/maas-service/service/rabbit_service/helper/rabbit_helper_test.go +++ b/maas/maas-service/service/rabbit_service/helper/rabbit_helper_test.go @@ -118,47 +118,156 @@ func TestRabbitVhostHelperImpl_CreateQueueInequivArg(t *testing.T) { _, err = buf.Write(queueBytes) assert.NoError(err) - httpHelper.EXPECT(). - DoRequest(gomock.Any(), "PUT", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(nil, &RabbitHttpError{ - Code: http.StatusBadRequest, - ExpectedCodes: []int{http.StatusOK}, - Message: "error during creating queue", - Response: []byte("{\"reason\": \"inequivalent arg\"}"), - }). - Times(1) - httpHelper.EXPECT(). - DoRequest(gomock.Any(), "DELETE", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(&resty.Response{ - RawResponse: &http.Response{ - StatusCode: http.StatusNoContent, - Body: buf, - }, - }, nil). - Times(1) - httpHelper.EXPECT(). - DoRequest(gomock.Any(), "PUT", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(&resty.Response{ - Request: &resty.Request{URL: "url"}, - RawResponse: &http.Response{ - StatusCode: http.StatusOK, - }}, nil). - Times(1) - httpHelper.EXPECT(). - DoRequest(gomock.Any(), "GET", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Return(&resty.Response{ - RawResponse: &http.Response{ - StatusCode: http.StatusOK, - Body: buf, - }, - }, nil). - Times(1) + getShovelsBuf := gbytes.NewBuffer() + _, err = getShovelsBuf.Write([]byte("[]")) + assert.NoError(err) + + gomock.InOrder( + httpHelper.EXPECT(). + DoRequest(gomock.Any(), "PUT", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, &RabbitHttpError{ + Code: http.StatusBadRequest, + ExpectedCodes: []int{http.StatusOK}, + Message: "error during creating queue", + Response: []byte("{\"reason\": \"inequivalent arg\"}"), + }), + // deleteAssociatedShovels calls GET parameters/shovel before deleting the queue + httpHelper.EXPECT(). + DoRequest(gomock.Any(), "GET", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&resty.Response{ + RawResponse: &http.Response{ + StatusCode: http.StatusOK, + Body: getShovelsBuf, + }, + }, nil), + httpHelper.EXPECT(). + DoRequest(gomock.Any(), "DELETE", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&resty.Response{ + RawResponse: &http.Response{ + StatusCode: http.StatusNoContent, + Body: buf, + }, + }, nil), + httpHelper.EXPECT(). + DoRequest(gomock.Any(), "PUT", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&resty.Response{ + Request: &resty.Request{URL: "url"}, + RawResponse: &http.Response{ + StatusCode: http.StatusOK, + }}, nil), + httpHelper.EXPECT(). + DoRequest(gomock.Any(), "GET", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&resty.Response{ + RawResponse: &http.Response{ + StatusCode: http.StatusOK, + Body: buf, + }, + }, nil), + ) resp, _, err := rabbitHelper.CreateQueue(context.Background(), queue) assert.NoError(err) assert.Equal("test-queue", (*resp.(*map[string]interface{}))["name"]) } +// TestRabbitVhostHelperImpl_CreateQueueInequivArg_ShovelDeletedFirst verifies that when a queue is +// inequivalent and must be deleted+recreated, any shovel sourcing from that queue is deleted +// BEFORE the queue itself is deleted, so there is no gap where the queue is absent but the shovel exists. +func TestRabbitVhostHelperImpl_CreateQueueInequivArg_ShovelDeletedFirst(t *testing.T) { + assert := assert.New(t) + mockCtrl := gomock.NewController(t) + httpHelper := mock_helper.NewMockHttpHelper(mockCtrl) + + const ( + srcVhost = "source-vhost" + exportedVhost = "exported-vhost" + queueName = "my-queue" + shovelName = "my-queue-ns-exported" + ) + + rabbitHelper := NewRabbitHelperWithHttpHelper( + model.RabbitInstance{}, + model.VHostRegistration{Vhost: srcVhost}, + httpHelper, + ) + + queue := map[string]interface{}{"name": queueName} + queueBytes, err := json.Marshal(queue) + assert.NoError(err) + + // shovel in exported vhost sourcing from the queue in source-vhost + shovels := []model.Shovel{{ + Name: shovelName, + Vhost: exportedVhost, + Value: model.ShovelValue{ + SrcProtocol: "amqp091", + SrcUri: "amqp://user:pass@rabbitmq/" + srcVhost, + SrcQueue: queueName, + DestProtocol: "amqp091", + DestUri: "amqp://user:pass@rabbitmq/" + exportedVhost, + DestQueue: queueName, + }, + }} + shovelsBytes, err := json.Marshal(shovels) + assert.NoError(err) + + shovelsBuf := gbytes.NewBuffer() + _, err = shovelsBuf.Write(shovelsBytes) + assert.NoError(err) + + queueBuf := gbytes.NewBuffer() + _, err = queueBuf.Write(queueBytes) + assert.NoError(err) + + // InOrder guarantees: shovel DELETE happens before queue DELETE + gomock.InOrder( + // 1. initial PUT → inequiv 400 + httpHelper.EXPECT(). + DoRequest(gomock.Any(), "PUT", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(nil, &RabbitHttpError{ + Code: http.StatusBadRequest, + ExpectedCodes: []int{http.StatusOK}, + Message: "inequivalent arg", + Response: []byte(`{"reason": "inequivalent arg"}`), + }), + // 2. GET parameters/shovel → returns the shovel referencing our queue + httpHelper.EXPECT(). + DoRequest(gomock.Any(), "GET", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&resty.Response{ + RawResponse: &http.Response{StatusCode: http.StatusOK, Body: shovelsBuf}, + }, nil), + // 3. DELETE parameters/shovel/exported-vhost/shovel-name — BEFORE queue deletion + httpHelper.EXPECT(). + DoRequest(gomock.Any(), "DELETE", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&resty.Response{ + RawResponse: &http.Response{StatusCode: http.StatusNoContent, Body: http.NoBody}, + }, nil), + // 4. DELETE queue + httpHelper.EXPECT(). + DoRequest(gomock.Any(), "DELETE", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&resty.Response{ + RawResponse: &http.Response{StatusCode: http.StatusNoContent, Body: http.NoBody}, + }, nil), + // 5. PUT queue recreate + httpHelper.EXPECT(). + DoRequest(gomock.Any(), "PUT", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&resty.Response{ + Request: &resty.Request{URL: "url"}, + RawResponse: &http.Response{StatusCode: http.StatusOK}, + }, nil), + // 6. GET queue (post-create verification) + httpHelper.EXPECT(). + DoRequest(gomock.Any(), "GET", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Return(&resty.Response{ + RawResponse: &http.Response{StatusCode: http.StatusOK, Body: queueBuf}, + }, nil), + ) + + resp, _, err := rabbitHelper.CreateQueue(context.Background(), queue) + assert.NoError(err) + assert.Equal(queueName, (*resp.(*map[string]interface{}))["name"]) +} + func TestRabbitVhostHelperImpl_CreateQueueBadRequest(t *testing.T) { assert := assert.New(t) mockCtrl := gomock.NewController(t)