Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/bounded-bedrock-and-priced-calls.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
'@eks-agent/sdk': minor
'@eks-agent/pricing': minor
'@eks-agent/core': minor
---

Harden the Bedrock call path and make unpriced traffic observable.

- `@eks-agent/sdk`: every `BedrockAdapter.messages()` call now carries a bounded request deadline even when the caller passes no `AbortSignal`. New `requestTimeoutMs` option (default 60s); a caller-supplied signal is combined with the deadline rather than replacing it, and an expired deadline is classified as a retryable `Network` error.
- `@eks-agent/pricing`: new `priceModel()` returns `{ costUsd, priced }` so an unknown model id is observable instead of silently metering as `$0`. `estimateCost()` is unchanged and now delegates to `priceModel`.
- `@eks-agent/core`: `CallEvent` gains an optional `unpriced` flag, set by the SDK when a model id has no pricing entry, so cost dashboards can surface unmetered traffic.
6 changes: 3 additions & 3 deletions .github/workflows/security.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: aquasecurity/trivy-action@master
- uses: aquasecurity/trivy-action@ed142fd0673e97e23eac54620cfb913e5ce36c25 # v0.36.0
with:
scan-type: config
scan-ref: .
Expand Down Expand Up @@ -68,7 +68,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: aquasecurity/trivy-action@master
- uses: aquasecurity/trivy-action@ed142fd0673e97e23eac54620cfb913e5ce36c25 # v0.36.0
with:
scan-type: fs
scan-ref: .
Expand Down Expand Up @@ -105,7 +105,7 @@ jobs:
with:
go-version-file: operators/go.mod
cache-dependency-path: operators/go.sum
- uses: securego/gosec@master
- uses: securego/gosec@9e6a9843d7a4a6e3e9a8539b02612c8a4aa3f889 # v2.27.1
with:
args: -severity medium -fmt sarif -out gosec.sarif ./operators/...
- uses: github/codeql-action/upload-sarif@v3
Expand Down
2 changes: 1 addition & 1 deletion docs/adr/0002-bedrock-only-v1.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ v1 ships only `BedrockAdapter` with per-family submodules (Anthropic, Meta, Mist
## Consequences

- The SDK has a thin family of adapters that share `BedrockAdapter` as a base. Adding a model family means adding `buildRequestBody` + `parseResponseBody` for that family's wire shape, nothing more.
- Pricing tables live in `@eks-agent/pricing` and are Bedrock-only. Renovate refresh is weekly.
- Pricing tables live in `@eks-agent/pricing`, are Bedrock-only, and are hand-curated — Renovate bumps the package's deps, not the `PRICES` content. An automated refresh from the AWS Pricing API (`scripts/refresh-pricing.mjs`) is planned for Phase 2 and is currently a fail-loud scaffold. Until it lands, prices are updated by hand, and a model id missing from the table is reported as an explicitly unpriced `0` (`priced:false` via `priceModel`) rather than silently metered as a real `$0`.
- The error taxonomy in `@eks-agent/core` is provider-agnostic, so the day we add a non-Bedrock adapter, downstream code doesn't change.
15 changes: 14 additions & 1 deletion operators/internal/awsclients/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ package awsclients
import (
"context"
"fmt"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
awshttp "github.com/aws/aws-sdk-go-v2/aws/transport/http"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/athena"
"github.com/aws/aws-sdk-go-v2/service/bedrock"
Expand Down Expand Up @@ -38,11 +40,22 @@ type Clients struct {
Bedrock Bedrock
}

// awsHTTPTimeout bounds every AWS SDK request. controller-runtime does not
// decorate the reconcile context with a per-call deadline, and the SDK's
// default transport sets no overall request timeout — so without this a
// connection that establishes then stalls before responding would pin a
// bounded reconcile worker indefinitely, eventually starving the pool. 30s
// comfortably covers the slowest single control-plane call; the Athena poll
// path bounds its own multi-call loop separately (budget_reconcile.go).
//
// New applies this value to the SDK's HTTP client via WithTimeout, so it is
// shared by every client built from that configuration.
const awsHTTPTimeout = 30 * time.Second

// New builds a Clients backed by the default credential chain (IRSA via
// fromContainerCredentials → fromEnv → fromInstanceProfile). Region is
// resolved from the same chain unless explicitly passed.
func New(ctx context.Context, region string) (*Clients, error) {
opts := []func(*awsconfig.LoadOptions) error{}
opts := []func(*awsconfig.LoadOptions) error{
awsconfig.WithHTTPClient(awshttp.NewBuildableClient().WithTimeout(awsHTTPTimeout)),
}
if region != "" {
opts = append(opts, awsconfig.WithRegion(region))
}
Expand Down
77 changes: 77 additions & 0 deletions operators/internal/controller/budget_killswitch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
Copyright 2026 stxkxs.

Licensed under the Apache License, Version 2.0 (the "License");
*/

package controller

import (
"context"
"strings"
"testing"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/eventbridge"
ebtypes "github.com/aws/aws-sdk-go-v2/service/eventbridge/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

commonv1alpha1 "github.com/nanohype/eks-agent-platform/operators/api/common/v1alpha1"
governancev1alpha1 "github.com/nanohype/eks-agent-platform/operators/api/governance/v1alpha1"
)

// fakeEventBridge is an in-memory stand-in for awsclients.EventBridge used by
// the kill-switch tests. Every PutEvents call is recorded and answered with a
// preconfigured output, which lets a test drive the partial-failure branch.
type fakeEventBridge struct {
	// out is the canned response handed back from every PutEvents call.
	out *eventbridge.PutEventsOutput
	// calls holds a copy of each input received, in call order.
	calls []eventbridge.PutEventsInput
}

// PutEvents records a copy of params and returns the canned output with a nil
// error. The context and option functions are ignored.
func (f *fakeEventBridge) PutEvents(_ context.Context, params *eventbridge.PutEventsInput, _ ...func(*eventbridge.Options)) (*eventbridge.PutEventsOutput, error) {
	recorded := *params
	f.calls = append(f.calls, recorded)
	return f.out, nil
}

// newBudgetPolicy returns a fresh BudgetPolicy fixture: "acme-budget" in the
// "tenants-acme" namespace, referencing platform "acme" with a monthly budget
// of "100.00".
func newBudgetPolicy() *governancev1alpha1.BudgetPolicy {
	meta := metav1.ObjectMeta{Namespace: "tenants-acme", Name: "acme-budget"}
	spec := governancev1alpha1.BudgetPolicySpec{
		MonthlyUsd:  "100.00",
		PlatformRef: commonv1alpha1.LocalRef{Name: "acme"},
	}
	return &governancev1alpha1.BudgetPolicy{ObjectMeta: meta, Spec: spec}
}

// TestFireKillSwitch_SuccessReturnsNil checks that fireKillSwitch returns nil
// when PutEvents reports no failed entries, that it calls PutEvents exactly
// once, and that the first published entry targets the configured bus.
func TestFireKillSwitch_SuccessReturnsNil(t *testing.T) {
	eb := &fakeEventBridge{out: &eventbridge.PutEventsOutput{FailedEntryCount: 0}}
	r := &BudgetReconciler{EventBridge: eb, KillSwitchEventBusName: "killswitch-bus"}

	if err := r.fireKillSwitch(context.Background(), newBudgetPolicy(), "150.00", 150); err != nil {
		t.Fatalf("fireKillSwitch on a clean PutEvents must return nil, got %v", err)
	}
	if len(eb.calls) != 1 {
		t.Fatalf("want exactly 1 PutEvents call, got %d", len(eb.calls))
	}
	// Guard the index below: a call that carries no entries should fail the
	// test with a readable message rather than panic the test binary.
	entries := eb.calls[0].Entries
	if len(entries) == 0 {
		t.Fatal("PutEvents was called with no entries; want at least 1")
	}
	if got := aws.ToString(entries[0].EventBusName); got != "killswitch-bus" {
		t.Errorf("event bus = %q, want killswitch-bus", got)
	}
}

func TestFireKillSwitch_PartialFailureIsRetryableError(t *testing.T) {
eb := &fakeEventBridge{out: &eventbridge.PutEventsOutput{
FailedEntryCount: 1,
Entries: []ebtypes.PutEventsResultEntry{{
ErrorCode: aws.String("ThrottlingException"),
ErrorMessage: aws.String("rate exceeded"),
}},
}}
r := &BudgetReconciler{EventBridge: eb, KillSwitchEventBusName: "killswitch-bus"}

err := r.fireKillSwitch(context.Background(), newBudgetPolicy(), "150.00", 150)
if err == nil {
t.Fatal("FailedEntryCount>0 must surface as an error so the kill-switch is retried, got nil")
}
if !strings.Contains(err.Error(), "ThrottlingException") {
t.Errorf("error must carry the failed entry's ErrorCode, got %q", err.Error())
}
}
3 changes: 3 additions & 0 deletions operators/internal/controller/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ package controller
// prevent.
const labelPrefix = "agents.nanohype.dev"

// Exported label keys, one per kind of object the operator labels. Each is the
// single source of truth for both the object's metadata label and any selector
// that matches it.
const (
LabelPlatform = labelPrefix + "/platform"
LabelTenant = labelPrefix + "/tenant"
Expand Down
9 changes: 9 additions & 0 deletions operators/internal/controller/platform_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package controller
import (
"context"
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -52,6 +53,14 @@ type PlatformReconciler struct {
// AWSCfg carries the SSM-resolved values the KMS + S3 steps need:
// DataKMSKeyARN, ArtifactsBucketName, Environment.
AWSCfg PlatformAWSConfig

// bucketPolicyMu serializes the read-modify-write of the SHARED artifacts
// bucket policy across concurrent reconciles. That policy is one document
// holding a statement per tenant; with MaxConcurrentReconciles > 1 two
// Platform reconciles could otherwise interleave Get→mutate→Put and
// silently drop a peer tenant's statement. The operator runs as a single
// leader (leader election), so a process-local mutex is sufficient.
bucketPolicyMu sync.Mutex
}

// +kubebuilder:rbac:groups=platform.nanohype.dev,resources=platforms,verbs=get;list;watch;update;patch
Expand Down
10 changes: 10 additions & 0 deletions operators/internal/controller/platform_kms_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,12 @@ func (r *PlatformReconciler) ensureBucketPolicy(ctx context.Context, p *platform
},
}

// Serialize the shared-bucket-policy read-modify-write so concurrent
// reconciles can't interleave Get→mutate→Put and drop a peer tenant's
// statement (see PlatformReconciler.bucketPolicyMu).
r.bucketPolicyMu.Lock()
defer r.bucketPolicyMu.Unlock()

currentDoc, err := r.fetchBucketPolicy(ctx, bucket)
if err != nil {
return err
Expand Down Expand Up @@ -210,6 +216,10 @@ func (r *PlatformReconciler) removeBucketPolicyStatements(ctx context.Context, p
}
bucket := cfg.ArtifactsBucketName
sid := "TenantAccess-" + p.Name
// Same shared-document serialization as ensureBucketPolicy: a finalizer
// teardown must not interleave with a peer tenant's reconcile write.
r.bucketPolicyMu.Lock()
defer r.bucketPolicyMu.Unlock()
currentDoc, err := r.fetchBucketPolicy(ctx, bucket)
if err != nil {
return err
Expand Down
Loading
Loading