Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/api/status_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ const (
AdapterConditionUnknown AdapterConditionStatus = "Unknown"
)

// Condition type constants
const (
ConditionTypeAvailable = "Available"
ConditionTypeApplied = "Applied"
ConditionTypeHealth = "Health"
ConditionTypeReady = "Ready"
)

// ResourceCondition represents a condition of a resource
// Domain equivalent of openapi.ResourceCondition
// JSON tags match database JSONB structure
Expand Down
57 changes: 25 additions & 32 deletions pkg/services/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
stderrors "errors"
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -237,9 +238,10 @@ func (s *sqlClusterService) UpdateClusterStatusFromAdapters(
}

// ProcessAdapterStatus handles the business logic for adapter status:
// - If Available is "Unknown" and a status already exists: returns (nil, nil) as no-op
// - If Available is "Unknown" and no status exists (first report): upserts the status
// - Otherwise: upserts the status and triggers aggregation
// - Validates that all mandatory conditions (Available, Applied, Health) are present
// - Rejects updates where any mandatory condition is missing or has status "Unknown"
// - Uses complete replacement semantics: each update replaces all conditions for this adapter
// - Returns (nil, nil) for discarded updates, which results in HTTP 204 No Content
func (s *sqlClusterService) ProcessAdapterStatus(
ctx context.Context, clusterID string, adapterStatus *api.AdapterStatus,
) (*api.AdapterStatus, *errors.ServiceError) {
Expand All @@ -264,42 +266,33 @@ func (s *sqlClusterService) ProcessAdapterStatus(
}
}

// Find the "Available" condition
hasAvailableCondition := false
for _, cond := range conditions {
if cond.Type != "Available" {
continue
}

hasAvailableCondition = true
if cond.Status == api.AdapterConditionUnknown {
if existingStatus != nil {
// Available condition is "Unknown" and a status already exists, return nil to indicate no-op
return nil, nil
}
// First report from this adapter: allow storing even with Available=Unknown
// but skip aggregation since Unknown should not affect cluster-level conditions
hasAvailableCondition = false
}
// Validate that all mandatory conditions are present and not Unknown
if missingCondition, unknownCondition := ValidateMandatoryConditions(conditions); missingCondition != "" {
// Missing mandatory condition - discard the update
ctx = logger.WithClusterID(ctx, clusterID)
logger.Info(ctx, fmt.Sprintf("Discarding adapter status update from %s: missing mandatory condition %s",
adapterStatus.Adapter, missingCondition))
return nil, nil
} else if unknownCondition != "" {
// Mandatory condition has Unknown status - discard the update
ctx = logger.WithClusterID(ctx, clusterID)
logger.Info(ctx, fmt.Sprintf("Discarding adapter status update from %s: mandatory condition %s has Unknown status",
adapterStatus.Adapter, unknownCondition))
return nil, nil
}

// Upsert the adapter status
// All validations passed - upsert the adapter status (complete replacement)
upsertedStatus, err := s.adapterStatusDao.Upsert(ctx, adapterStatus)
if err != nil {
return nil, handleCreateError("AdapterStatus", err)
}

// Only trigger aggregation when the adapter reported an Available condition.
// If the adapter status doesn't include Available (e.g. it only reports Ready/Progressing),
// saving it should not overwrite the cluster's synthetic Available/Ready conditions.
if hasAvailableCondition {
if _, aggregateErr := s.UpdateClusterStatusFromAdapters(
ctx, clusterID,
); aggregateErr != nil {
// Log error but don't fail the request - the status will be computed on next update
ctx = logger.WithClusterID(ctx, clusterID)
logger.WithError(ctx, aggregateErr).Warn("Failed to aggregate cluster status")
}
// Trigger aggregation to update cluster-level conditions
if _, aggregateErr := s.UpdateClusterStatusFromAdapters(
ctx, clusterID,
); aggregateErr != nil {
ctx = logger.WithClusterID(ctx, clusterID)
logger.WithError(ctx, aggregateErr).Warn("Failed to aggregate cluster status")
}

return upsertedStatus, nil
Expand Down
Loading