Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 125 additions & 13 deletions api/cmd/shared-syncer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"os"
Expand All @@ -37,6 +38,17 @@ const (
publishSuppressAfterApply = 2 * time.Second
)

var (
initRetryWindow = 2 * time.Minute
initRetryBackoff = 2 * time.Second
initLatestRequestTTL = 15 * time.Second
initApplyRequestTTL = 60 * time.Second
sharedMountDialTimeout = 5 * time.Second
sharedMountKeepAlive = 30 * time.Second
sharedMountHeaderTTL = 30 * time.Second
sharedMountIdleConnTTL = 90 * time.Second
)

type sharedMountClient struct {
baseURL string
token string
Expand Down Expand Up @@ -71,7 +83,19 @@ func main() {
token: token,
// Long-polling calls can legitimately hold the connection open.
// Prefer per-request timeouts (via context) over a tight global client timeout.
client: &http.Client{Timeout: 5 * time.Minute},
client: &http.Client{
Timeout: 5 * time.Minute,
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{Timeout: sharedMountDialTimeout, KeepAlive: sharedMountKeepAlive}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: sharedMountIdleConnTTL,
TLSHandshakeTimeout: sharedMountDialTimeout,
ExpectContinueTimeout: 1 * time.Second,
ResponseHeaderTimeout: sharedMountHeaderTTL,
},
},
}

state := make([]*sharedMountState, 0, len(mounts))
Expand Down Expand Up @@ -134,23 +158,99 @@ func runInit(ctx context.Context, logger *log.Logger, client *sharedMountClient,
if err := ensureMountPath(state.spec.MountPath); err != nil {
return err
}
manifest, found, err := client.latest(ctx, ownerID, state.spec.Name)
if err != nil {
if err := runInitMount(ctx, logger, client, ownerID, state); err != nil {
return err
}
if !found {
continue
}
logger.Print("init complete")
return nil
}

func runInitMount(ctx context.Context, logger *log.Logger, client *sharedMountClient, ownerID string, state *sharedMountState) error {
deadline := time.Now().Add(initRetryWindow)
attempt := 0
for {
attempt++
err := runInitMountAttempt(ctx, client, ownerID, state)
if err == nil {
return nil
}
if err := applyRevision(ctx, client, ownerID, state.spec, manifest.Revision); err != nil {
if !isRetryableInitError(err) || time.Now().After(deadline) {
return err
}
state.currentRevision = manifest.Revision
state.currentChecksum = manifest.Checksum
logger.Printf("init retry for %s attempt=%d after error: %v", state.spec.Name, attempt, err)
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(initRetryBackoff):
}
}
logger.Print("init complete")
}

func runInitMountAttempt(ctx context.Context, client *sharedMountClient, ownerID string, state *sharedMountState) error {
latestCtx, cancelLatest := context.WithTimeout(ctx, initLatestRequestTTL)
defer cancelLatest()

manifest, found, err := client.latest(latestCtx, ownerID, state.spec.Name)
if err != nil {
return err
}
if !found {
return nil
}

applyCtx, cancelApply := context.WithTimeout(ctx, initApplyRequestTTL)
defer cancelApply()

if err := applyRevision(applyCtx, client, ownerID, state.spec, manifest.Revision); err != nil {
return err
}
state.currentRevision = manifest.Revision
state.currentChecksum = manifest.Checksum
return nil
}

type remoteHTTPError struct {
StatusCode int
Message string
}

func (e *remoteHTTPError) Error() string {
return e.Message
}

func isRetryableInitError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.DeadlineExceeded) {
return true
}
var netErr net.Error
if errors.As(err, &netErr) && netErr.Timeout() {
return true
}
var urlErr *url.Error
if errors.As(err, &urlErr) {
if errors.Is(urlErr.Err, context.DeadlineExceeded) {
return true
}
if errors.As(urlErr.Err, &netErr) && netErr.Timeout() {
return true
}
}
var httpErr *remoteHTTPError
if errors.As(err, &httpErr) {
return httpErr.StatusCode == http.StatusTooManyRequests || httpErr.StatusCode >= http.StatusInternalServerError
}
message := strings.ToLower(err.Error())
return strings.Contains(message, "i/o timeout") ||
strings.Contains(message, "connection reset by peer") ||
strings.Contains(message, "connection refused") ||
strings.Contains(message, "no route to host") ||
strings.Contains(message, "unexpected eof")
}

func runSidecar(ctx context.Context, logger *log.Logger, client *sharedMountClient, ownerID string, mounts []*sharedMountState) {
for _, state := range mounts {
state := state
Expand Down Expand Up @@ -876,7 +976,10 @@ func (c *sharedMountClient) latest(ctx context.Context, ownerID, mount string) (
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return sharedmounts.LatestManifest{}, false, fmt.Errorf("latest fetch failed: %s", strings.TrimSpace(string(body)))
return sharedmounts.LatestManifest{}, false, &remoteHTTPError{
StatusCode: resp.StatusCode,
Message: fmt.Sprintf("latest fetch failed (%d): %s", resp.StatusCode, strings.TrimSpace(string(body))),
}
}
body, err := io.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -985,7 +1088,10 @@ func (c *sharedMountClient) downloadRevision(ctx context.Context, ownerID, mount
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("revision fetch failed: %s", strings.TrimSpace(string(body)))
return &remoteHTTPError{
StatusCode: resp.StatusCode,
Message: fmt.Sprintf("revision fetch failed (%d): %s", resp.StatusCode, strings.TrimSpace(string(body))),
}
}
_, err = io.Copy(dest, resp.Body)
return err
Expand Down Expand Up @@ -1016,7 +1122,10 @@ func (c *sharedMountClient) uploadRevision(ctx context.Context, ownerID, mount,
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("revision upload failed: %s", strings.TrimSpace(string(body)))
return &remoteHTTPError{
StatusCode: resp.StatusCode,
Message: fmt.Sprintf("revision upload failed (%d): %s", resp.StatusCode, strings.TrimSpace(string(body))),
}
}
return nil
}
Expand Down Expand Up @@ -1046,7 +1155,10 @@ func (c *sharedMountClient) updateLatest(ctx context.Context, ownerID, mount str
}
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("latest update failed: %s", strings.TrimSpace(string(body)))
return &remoteHTTPError{
StatusCode: resp.StatusCode,
Message: fmt.Sprintf("latest update failed (%d): %s", resp.StatusCode, strings.TrimSpace(string(body))),
}
}
return nil
}
Expand Down
56 changes: 56 additions & 0 deletions api/cmd/shared-syncer/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"encoding/json"
"io"
"log"
"net/http"
"net/http/httptest"
"os"
Expand All @@ -15,6 +16,8 @@ import (
"strings"
"testing"
"time"

"spritz.sh/operator/sharedmounts"
)

func TestUploadRevisionSetsContentLength(t *testing.T) {
Expand Down Expand Up @@ -154,6 +157,59 @@ func TestLatestRejectsInvalidPayload(t *testing.T) {
}
}

func TestRunInitRetriesTransientLatestTimeout(t *testing.T) {
originalRetryWindow := initRetryWindow
originalRetryBackoff := initRetryBackoff
originalLatestTTL := initLatestRequestTTL
originalApplyTTL := initApplyRequestTTL
t.Cleanup(func() {
initRetryWindow = originalRetryWindow
initRetryBackoff = originalRetryBackoff
initLatestRequestTTL = originalLatestTTL
initApplyRequestTTL = originalApplyTTL
})

initRetryWindow = 200 * time.Millisecond
initRetryBackoff = 5 * time.Millisecond
initLatestRequestTTL = 20 * time.Millisecond
initApplyRequestTTL = 50 * time.Millisecond

attempts := 0
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !strings.HasSuffix(r.URL.Path, "/latest") {
t.Fatalf("unexpected path: %s", r.URL.Path)
}
attempts++
if attempts == 1 {
time.Sleep(60 * time.Millisecond)
return
}
w.WriteHeader(http.StatusNotFound)
}))
defer srv.Close()

client := &sharedMountClient{
baseURL: srv.URL,
token: "token",
client: srv.Client(),
}

state := []*sharedMountState{{
spec: sharedmounts.MountSpec{
Name: "config",
Scope: sharedmounts.ScopeOwner,
MountPath: t.TempDir(),
},
}}

if err := runInit(context.Background(), log.New(io.Discard, "", 0), client, "owner", state); err != nil {
t.Fatalf("runInit failed: %v", err)
}
if attempts < 2 {
t.Fatalf("expected init retry after transient timeout, got %d attempt(s)", attempts)
}
}

func TestEnsureEmptyLiveCreatesWritableCurrent(t *testing.T) {
mountPath := filepath.Join(t.TempDir(), "mount")
if err := ensureMountPath(mountPath); err != nil {
Expand Down
10 changes: 9 additions & 1 deletion helm/spritz/templates/api-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ metadata:
name: spritz-api
namespace: {{ .Values.api.namespace }}
spec:
replicas: 1
replicas: {{ .Values.api.replicaCount }}
selector:
matchLabels:
app.kubernetes.io/name: spritz-api
Expand All @@ -27,6 +27,14 @@ spec:
{{- end }}
spec:
serviceAccountName: {{ .Values.api.serviceAccountName }}
{{- with .Values.api.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.api.topologySpreadConstraints }}
topologySpreadConstraints:
{{- toYaml . | nindent 8 }}
{{- end }}
containers:
- name: api
image: {{ .Values.api.image }}
Expand Down
12 changes: 12 additions & 0 deletions helm/spritz/templates/api-pdb.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{{- if and .Values.api.podDisruptionBudget.enabled (gt (int .Values.api.replicaCount) 1) }}
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: spritz-api
namespace: {{ .Values.api.namespace }}
spec:
minAvailable: {{ .Values.api.podDisruptionBudget.minAvailable }}
selector:
matchLabels:
app.kubernetes.io/name: spritz-api
{{- end }}
6 changes: 6 additions & 0 deletions helm/spritz/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,19 @@ operator:
- ALL

api:
replicaCount: 1
image: spritz-api:latest
imagePullPolicy: IfNotPresent
rolloutAt: ""
namespace: spritz-system
serviceAccountName: spritz-api
defaultAnnotations: ""
podAnnotations: {}
affinity: {}
topologySpreadConstraints: []
podDisruptionBudget:
enabled: false
minAvailable: 1
service:
port: 8080
auth:
Expand Down
4 changes: 4 additions & 0 deletions scripts/verify-helm.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ default_render="${tmp_dir}/default.yaml"
auth_render="${tmp_dir}/auth.yaml"
auth_annotations_render="${tmp_dir}/auth-annotations.yaml"
acp_network_policy_render="${tmp_dir}/acp-network-policy.yaml"
api_ha_render="${tmp_dir}/api-ha.yaml"

helm lint "${chart_dir}"
helm template spritz "${chart_dir}" >"${default_render}"
helm template spritz "${chart_dir}" -f "${example_values}" >"${auth_render}"
helm template spritz "${chart_dir}" -f "${example_values}" --set authGateway.ingress.annotations.authonly=enabled >"${auth_annotations_render}"
helm template spritz "${chart_dir}" --set acp.networkPolicy.enabled=true >"${acp_network_policy_render}"
helm template spritz "${chart_dir}" --set api.replicaCount=2 --set api.podDisruptionBudget.enabled=true >"${api_ha_render}"

expect_contains "${default_render}" "name: spritz-web" "spritz-web ingress in default render"
expect_not_contains "${default_render}" "name: spritz-auth" "spritz-auth ingress when auth gateway is disabled"
Expand All @@ -82,6 +84,8 @@ expect_contains "${auth_render}" "nginx.ingress.kubernetes.io/configuration-snip
expect_contains "${auth_annotations_render}" "authonly: enabled" "auth ingress custom annotations in auth render"
expect_contains "${acp_network_policy_render}" "kind: NetworkPolicy" "ACP network policy when enabled"
expect_contains "${acp_network_policy_render}" "name: spritz-acp" "ACP network policy name when enabled"
expect_contains "${api_ha_render}" "kind: PodDisruptionBudget" "API pod disruption budget when enabled"
expect_contains "${api_ha_render}" "name: spritz-api" "API pod disruption budget name when enabled"
expect_contains "${default_render}" 'resources: ["spritzes/status", "spritzconversations/status"]' "status RBAC for spritz conversations"
expect_contains "${default_render}" "name: SPRITZ_AUTH_HEADER_TYPE" "principal type auth header wiring"
expect_contains "${default_render}" "name: SPRITZ_AUTH_BEARER_SCOPES_PATHS" "bearer scope path wiring"
Expand Down
Loading