From 8e63ad7656da29ea8f711ece75c8056e66c3bf34 Mon Sep 17 00:00:00 2001 From: Onur Solmaz Date: Thu, 12 Mar 2026 20:34:37 +0100 Subject: [PATCH] fix(shared-mounts): harden init fetch --- api/cmd/shared-syncer/main.go | 138 ++++++++++++++++++++-- api/cmd/shared-syncer/main_test.go | 56 +++++++++ helm/spritz/templates/api-deployment.yaml | 10 +- helm/spritz/templates/api-pdb.yaml | 12 ++ helm/spritz/values.yaml | 6 + scripts/verify-helm.sh | 4 + 6 files changed, 212 insertions(+), 14 deletions(-) create mode 100644 helm/spritz/templates/api-pdb.yaml diff --git a/api/cmd/shared-syncer/main.go b/api/cmd/shared-syncer/main.go index 52378b7..e688b7f 100644 --- a/api/cmd/shared-syncer/main.go +++ b/api/cmd/shared-syncer/main.go @@ -12,6 +12,7 @@ import ( "fmt" "io" "log" + "net" "net/http" "net/url" "os" @@ -37,6 +38,17 @@ const ( publishSuppressAfterApply = 2 * time.Second ) +var ( + initRetryWindow = 2 * time.Minute + initRetryBackoff = 2 * time.Second + initLatestRequestTTL = 15 * time.Second + initApplyRequestTTL = 60 * time.Second + sharedMountDialTimeout = 5 * time.Second + sharedMountKeepAlive = 30 * time.Second + sharedMountHeaderTTL = 30 * time.Second + sharedMountIdleConnTTL = 90 * time.Second +) + type sharedMountClient struct { baseURL string token string @@ -71,7 +83,19 @@ func main() { token: token, // Long-polling calls can legitimately hold the connection open. // Prefer per-request timeouts (via context) over a tight global client timeout. - client: &http.Client{Timeout: 5 * time.Minute}, + client: &http.Client{ + Timeout: 5 * time.Minute, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{Timeout: sharedMountDialTimeout, KeepAlive: sharedMountKeepAlive}).DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 100, + IdleConnTimeout: sharedMountIdleConnTTL, + TLSHandshakeTimeout: sharedMountDialTimeout, + ExpectContinueTimeout: 1 * time.Second, + ResponseHeaderTimeout: sharedMountHeaderTTL, + }, + }, } state := make([]*sharedMountState, 0, len(mounts)) @@ -134,23 +158,99 @@ func runInit(ctx context.Context, logger *log.Logger, client *sharedMountClient, if err := ensureMountPath(state.spec.MountPath); err != nil { return err } - manifest, found, err := client.latest(ctx, ownerID, state.spec.Name) - if err != nil { + if err := runInitMount(ctx, logger, client, ownerID, state); err != nil { return err } - if !found { - continue + } + logger.Print("init complete") + return nil +} + +func runInitMount(ctx context.Context, logger *log.Logger, client *sharedMountClient, ownerID string, state *sharedMountState) error { + deadline := time.Now().Add(initRetryWindow) + attempt := 0 + for { + attempt++ + err := runInitMountAttempt(ctx, client, ownerID, state) + if err == nil { + return nil } - if err := applyRevision(ctx, client, ownerID, state.spec, manifest.Revision); err != nil { + if !isRetryableInitError(err) || time.Now().After(deadline) { return err } - state.currentRevision = manifest.Revision - state.currentChecksum = manifest.Checksum + logger.Printf("init retry for %s attempt=%d after error: %v", state.spec.Name, attempt, err) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(initRetryBackoff): + } } - logger.Print("init complete") +} + +func runInitMountAttempt(ctx context.Context, client *sharedMountClient, ownerID string, state *sharedMountState) error { + latestCtx, cancelLatest := context.WithTimeout(ctx, initLatestRequestTTL) + defer cancelLatest() + + manifest, found, err := client.latest(latestCtx, ownerID, state.spec.Name) + if err != nil { + return err + } + if !found { + return nil + } + + applyCtx, cancelApply := context.WithTimeout(ctx, initApplyRequestTTL) + defer cancelApply() + + if err := applyRevision(applyCtx, client, ownerID, state.spec, manifest.Revision); err != nil { + return err + } + state.currentRevision = manifest.Revision + state.currentChecksum = manifest.Checksum return nil } +type remoteHTTPError struct { + StatusCode int + Message string +} + +func (e *remoteHTTPError) Error() string { + return e.Message +} + +func isRetryableInitError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) { + return true + } + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return true + } + var urlErr *url.Error + if errors.As(err, &urlErr) { + if errors.Is(urlErr.Err, context.DeadlineExceeded) { + return true + } + if errors.As(urlErr.Err, &netErr) && netErr.Timeout() { + return true + } + } + var httpErr *remoteHTTPError + if errors.As(err, &httpErr) { + return httpErr.StatusCode == http.StatusTooManyRequests || httpErr.StatusCode >= http.StatusInternalServerError + } + message := strings.ToLower(err.Error()) + return strings.Contains(message, "i/o timeout") || + strings.Contains(message, "connection reset by peer") || + strings.Contains(message, "connection refused") || + strings.Contains(message, "no route to host") || + strings.Contains(message, "unexpected eof") +} + func runSidecar(ctx context.Context, logger *log.Logger, client *sharedMountClient, ownerID string, mounts []*sharedMountState) { for _, state := range mounts { state := state @@ -876,7 +976,10 @@ func (c *sharedMountClient) latest(ctx context.Context, ownerID, mount string) ( } if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) - return sharedmounts.LatestManifest{}, false, fmt.Errorf("latest fetch failed: %s", strings.TrimSpace(string(body))) + return sharedmounts.LatestManifest{}, false, &remoteHTTPError{ + StatusCode: resp.StatusCode, + Message: fmt.Sprintf("latest fetch failed (%d): %s", resp.StatusCode, strings.TrimSpace(string(body))), + } } body, err := io.ReadAll(resp.Body) if err != nil { @@ -985,7 +1088,10 @@ func (c *sharedMountClient) downloadRevision(ctx context.Context, ownerID, mount defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("revision fetch failed: %s", strings.TrimSpace(string(body))) + return &remoteHTTPError{ + StatusCode: resp.StatusCode, + Message: fmt.Sprintf("revision fetch failed (%d): %s", resp.StatusCode, strings.TrimSpace(string(body))), + } } _, err = io.Copy(dest, resp.Body) return err @@ -1016,7 +1122,10 @@ func (c *sharedMountClient) uploadRevision(ctx context.Context, ownerID, mount, defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("revision upload failed: %s", strings.TrimSpace(string(body))) + return &remoteHTTPError{ + StatusCode: resp.StatusCode, + Message: fmt.Sprintf("revision upload failed (%d): %s", resp.StatusCode, strings.TrimSpace(string(body))), + } } return nil } @@ -1046,7 +1155,10 @@ func (c *sharedMountClient) updateLatest(ctx context.Context, ownerID, mount str } if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("latest update failed: %s", strings.TrimSpace(string(body))) + return &remoteHTTPError{ + StatusCode: resp.StatusCode, + Message: fmt.Sprintf("latest update failed (%d): %s", resp.StatusCode, strings.TrimSpace(string(body))), + } } return nil } diff --git a/api/cmd/shared-syncer/main_test.go b/api/cmd/shared-syncer/main_test.go index 971366e..2257f33 100644 --- a/api/cmd/shared-syncer/main_test.go +++ b/api/cmd/shared-syncer/main_test.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "io" + "log" "net/http" "net/http/httptest" "os" @@ -15,6 +16,8 @@ import ( "strings" "testing" "time" + + "spritz.sh/operator/sharedmounts" ) func TestUploadRevisionSetsContentLength(t *testing.T) { @@ -154,6 +157,59 @@ func TestLatestRejectsInvalidPayload(t *testing.T) { } } +func TestRunInitRetriesTransientLatestTimeout(t *testing.T) { + originalRetryWindow := initRetryWindow + originalRetryBackoff := initRetryBackoff + originalLatestTTL := initLatestRequestTTL + originalApplyTTL := initApplyRequestTTL + t.Cleanup(func() { + initRetryWindow = originalRetryWindow + initRetryBackoff = originalRetryBackoff + initLatestRequestTTL = originalLatestTTL + initApplyRequestTTL = originalApplyTTL + }) + + initRetryWindow = 200 * time.Millisecond + initRetryBackoff = 5 * time.Millisecond + initLatestRequestTTL = 20 * time.Millisecond + initApplyRequestTTL = 50 * time.Millisecond + + attempts := 0 + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if !strings.HasSuffix(r.URL.Path, "/latest") { + t.Fatalf("unexpected path: %s", r.URL.Path) + } + attempts++ + if attempts == 1 { + time.Sleep(60 * time.Millisecond) + return + } + w.WriteHeader(http.StatusNotFound) + })) + defer srv.Close() + + client := &sharedMountClient{ + baseURL: srv.URL, + token: "token", + client: srv.Client(), + } + + state := []*sharedMountState{{ + spec: sharedmounts.MountSpec{ + Name: "config", + Scope: sharedmounts.ScopeOwner, + MountPath: t.TempDir(), + }, + }} + + if err := runInit(context.Background(), log.New(io.Discard, "", 0), client, "owner", state); err != nil { + t.Fatalf("runInit failed: %v", err) + } + if attempts < 2 { + t.Fatalf("expected init retry after transient timeout, got %d attempt(s)", attempts) + } +} + func TestEnsureEmptyLiveCreatesWritableCurrent(t *testing.T) { mountPath := filepath.Join(t.TempDir(), "mount") if err := ensureMountPath(mountPath); err != nil { diff --git a/helm/spritz/templates/api-deployment.yaml b/helm/spritz/templates/api-deployment.yaml index f0fa7f2..394177d 100644 --- a/helm/spritz/templates/api-deployment.yaml +++ b/helm/spritz/templates/api-deployment.yaml @@ -8,7 +8,7 @@ metadata: name: spritz-api namespace: {{ .Values.api.namespace }} spec: - replicas: 1 + replicas: {{ .Values.api.replicaCount }} selector: matchLabels: app.kubernetes.io/name: spritz-api @@ -27,6 +27,14 @@ spec: {{- end }} spec: serviceAccountName: {{ .Values.api.serviceAccountName }} + {{- with .Values.api.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.api.topologySpreadConstraints }} + topologySpreadConstraints: + {{- toYaml . | nindent 8 }} + {{- end }} containers: - name: api image: {{ .Values.api.image }} diff --git a/helm/spritz/templates/api-pdb.yaml b/helm/spritz/templates/api-pdb.yaml new file mode 100644 index 0000000..ca78a50 --- /dev/null +++ b/helm/spritz/templates/api-pdb.yaml @@ -0,0 +1,12 @@ +{{- if and .Values.api.podDisruptionBudget.enabled (gt (int .Values.api.replicaCount) 1) }} +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: spritz-api + namespace: {{ .Values.api.namespace }} +spec: + minAvailable: {{ .Values.api.podDisruptionBudget.minAvailable }} + selector: + matchLabels: + app.kubernetes.io/name: spritz-api +{{- end }} diff --git a/helm/spritz/values.yaml b/helm/spritz/values.yaml index 2bc69b7..39cfb42 100644 --- a/helm/spritz/values.yaml +++ b/helm/spritz/values.yaml @@ -116,6 +116,7 @@ operator: - ALL api: + replicaCount: 1 image: spritz-api:latest imagePullPolicy: IfNotPresent rolloutAt: "" @@ -123,6 +124,11 @@ api: serviceAccountName: spritz-api defaultAnnotations: "" podAnnotations: {} + affinity: {} + topologySpreadConstraints: [] + podDisruptionBudget: + enabled: false + minAvailable: 1 service: port: 8080 auth: diff --git a/scripts/verify-helm.sh b/scripts/verify-helm.sh index 761f99c..93ca4cb 100755 --- a/scripts/verify-helm.sh +++ b/scripts/verify-helm.sh @@ -64,12 +64,14 @@ default_render="${tmp_dir}/default.yaml" auth_render="${tmp_dir}/auth.yaml" auth_annotations_render="${tmp_dir}/auth-annotations.yaml" acp_network_policy_render="${tmp_dir}/acp-network-policy.yaml" +api_ha_render="${tmp_dir}/api-ha.yaml" helm lint "${chart_dir}" helm template spritz "${chart_dir}" >"${default_render}" helm template spritz "${chart_dir}" -f "${example_values}" >"${auth_render}" helm template spritz "${chart_dir}" -f "${example_values}" --set authGateway.ingress.annotations.authonly=enabled >"${auth_annotations_render}" helm template spritz "${chart_dir}" --set acp.networkPolicy.enabled=true >"${acp_network_policy_render}" +helm template spritz "${chart_dir}" --set api.replicaCount=2 --set api.podDisruptionBudget.enabled=true >"${api_ha_render}" expect_contains "${default_render}" "name: spritz-web" "spritz-web ingress in default render" expect_not_contains "${default_render}" "name: spritz-auth" "spritz-auth ingress when auth gateway is disabled" @@ -82,6 +84,8 @@ expect_contains "${auth_render}" "nginx.ingress.kubernetes.io/configuration-snip expect_contains "${auth_annotations_render}" "authonly: enabled" "auth ingress custom annotations in auth render" expect_contains "${acp_network_policy_render}" "kind: NetworkPolicy" "ACP network policy when enabled" expect_contains "${acp_network_policy_render}" "name: spritz-acp" "ACP network policy name when enabled" +expect_contains "${api_ha_render}" "kind: PodDisruptionBudget" "API pod disruption budget when enabled" +expect_contains "${api_ha_render}" "name: spritz-api" "API pod disruption budget name when enabled" expect_contains "${default_render}" 'resources: ["spritzes/status", "spritzconversations/status"]' "status RBAC for spritz conversations" expect_contains "${default_render}" "name: SPRITZ_AUTH_HEADER_TYPE" "principal type auth header wiring" expect_contains "${default_render}" "name: SPRITZ_AUTH_BEARER_SCOPES_PATHS" "bearer scope path wiring"