Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 56 additions & 10 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ type Config struct {
// Token is the resolved bearer token compared (in constant time)
// against the Authorization header on every authenticated request.
Token string
// PeerTokens optionally maps a per-peer bearer token to the node
// name that presents it. When non-empty, requireBearer recovers the
// caller's authenticated node identity and the sync handlers bind
// each in-flight session to it (#110a); a token absent from the map
// still authenticates if it equals Token but carries no identity.
// Empty (the default) preserves the single-shared-token behaviour:
// every caller authenticates identically and no session binding is
// enforced.
PeerTokens map[string]string
// TLSCert and TLSKey are filesystem paths to a PEM-encoded certificate
// and matching private key. When both are empty the agent serves
// plain HTTP; when both are set it terminates TLS natively.
Expand Down Expand Up @@ -208,29 +217,66 @@ func (s *Server) buildHandler() http.Handler {

// requireBearer is the auth middleware. The Authorization header must
// parse as `<scheme> <token>` with scheme matching "Bearer" case-
// insensitively (per RFC 7235 §2.1) and token matching the configured
// value. We hash both sides to a fixed-length SHA-256 digest before
// subtle.ConstantTimeCompare so the comparison time is independent of
// the attacker-controlled token length (subtle.ConstantTimeCompare
// short-circuits on len mismatch, which would otherwise leak length).
// The configured token is non-empty (enforced by validateConfig).
// insensitively (per RFC 7235 §2.1). The token authenticates when it
// matches the shared token or any configured per-peer token; a per-peer
// match attaches that node's identity to the request context so the sync
// handlers can bind a session to its caller (#110a).
func (s *Server) requireBearer(next http.Handler) http.Handler {
expectedHash := sha256.Sum256([]byte(s.cfg.Token))
auth := newAuthenticator(s.cfg.Token, s.cfg.PeerTokens)
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
token, ok := extractBearerToken(r.Header.Get("Authorization"))
if !ok {
writeError(w, http.StatusUnauthorized, "missing bearer token")
return
}
gotHash := sha256.Sum256([]byte(token))
if subtle.ConstantTimeCompare(gotHash[:], expectedHash[:]) != 1 {
caller, ok := auth.authenticate(token)
if !ok {
writeError(w, http.StatusUnauthorized, "invalid bearer token")
return
}
next.ServeHTTP(w, r)
next.ServeHTTP(w, withCallerNode(r, caller))
})
}

// authenticator resolves a presented bearer token to an authenticated
// caller. The shared token and every per-peer token are pre-hashed to a
// fixed-length SHA-256 digest so the comparison time is independent of
// the attacker-controlled token length (subtle.ConstantTimeCompare
// short-circuits on a length mismatch, which would otherwise leak it).
// The per-peer set is keyed by digest rather than by the raw secret, so
// the map probe never compares attacker bytes against a stored secret
// directly.
type authenticator struct {
sharedHash [32]byte
peerNodes map[[32]byte]string
}

func newAuthenticator(sharedToken string, peerTokens map[string]string) authenticator {
a := authenticator{sharedHash: sha256.Sum256([]byte(sharedToken))}
if len(peerTokens) > 0 {
a.peerNodes = make(map[[32]byte]string, len(peerTokens))
for token, node := range peerTokens {
a.peerNodes[sha256.Sum256([]byte(token))] = node
}
}
return a
}

// authenticate returns the caller's authenticated node name and whether
// the token is valid. A per-peer token yields its node name; the shared
// token authenticates with an empty name (no recoverable identity, the
// single-token case #110d leaves unbound).
func (a authenticator) authenticate(token string) (string, bool) {
gotHash := sha256.Sum256([]byte(token))
if node, ok := a.peerNodes[gotHash]; ok {
return node, true
}
if subtle.ConstantTimeCompare(gotHash[:], a.sharedHash[:]) == 1 {
return "", true
}
return "", false
}

// extractBearerToken parses `<scheme> <token>` from an Authorization
// header value. The scheme match is case-insensitive; trailing
// whitespace after the scheme is consumed so `Bearer tok` (double
Expand Down
43 changes: 43 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,49 @@ func TestSyncBeginRejectsBearerWithLengthDifference(t *testing.T) {
}
}

// TestAuthenticatorResolvesCaller (#110a/#110d): a per-peer token
// resolves to its node name; the shared token authenticates with an
// empty identity; an unknown token is rejected. The empty-identity case
// is what keeps the session binding a no-op for shared-token callers.
func TestAuthenticatorResolvesCaller(t *testing.T) {
auth := newAuthenticator("shared", map[string]string{
"laptop-token": "laptop",
"nas-token": "nas",
})
cases := []struct {
token string
wantNode string
wantOK bool
}{
{"laptop-token", "laptop", true},
{"nas-token", "nas", true},
{"shared", "", true},
{"unknown", "", false},
}
for _, c := range cases {
t.Run(c.token, func(t *testing.T) {
node, ok := auth.authenticate(c.token)
if node != c.wantNode || ok != c.wantOK {
t.Fatalf("authenticate(%q) = (%q, %v), want (%q, %v)", c.token, node, ok, c.wantNode, c.wantOK)
}
})
}
}

// TestAuthenticatorNoPeerTokensSharedOnly: with no per-peer tokens the
// authenticator behaves exactly as the single-shared-token agent did —
// the shared token authenticates (empty identity), everything else is
// rejected.
func TestAuthenticatorNoPeerTokensSharedOnly(t *testing.T) {
auth := newAuthenticator("only-shared", nil)
if node, ok := auth.authenticate("only-shared"); !ok || node != "" {
t.Fatalf("shared token = (%q, %v), want (\"\", true)", node, ok)
}
if _, ok := auth.authenticate("nope"); ok {
t.Fatalf("unknown token authenticated, want rejection")
}
}

func TestServePlainHTTP(t *testing.T) {
srv := newTestServer(t, Config{})
ln, err := net.Listen("tcp", "127.0.0.1:0")
Expand Down
58 changes: 37 additions & 21 deletions agent/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ type peerSession struct {
volumeID int64
peerNodeID int64
// initiatorNodeName is the caller identity declared at /begin,
// recorded so a phase call can be bound back to the node that
// opened the session. Under the single shared agent token there is
// no per-request authenticated identity to compare it against yet
// (see #110d); lookupSession is the chokepoint where that
// comparison lands once per-peer tokens make a caller identity
// recoverable.
// recorded so a later phase call is bound back to the node that
// opened the session. lookupSession compares it against the caller's
// authenticated identity when per-peer tokens make one recoverable
// (#110a); a shared-token caller carries no identity (#110d) and the
// comparison is skipped.
initiatorNodeName string
correlatedRunID int64
// dedupStrategy is the initiator-supplied preference applied by
Expand Down Expand Up @@ -228,13 +227,12 @@ var errSessionCallerMismatch = errors.New("caller node does not own this session

// lookupSession resolves the session for receiverRunID and binds it to
// the caller. A non-empty callerNode must equal the initiator name
// recorded at /begin, so a second node holding the shared token cannot
// drive another node's in-flight session. callerNode is empty today
// because the single shared agent token carries no per-request identity
// (#110d): the comparison is a no-op until per-peer tokens make a caller
// identity recoverable, at which point this is the single place it is
// enforced. ok is false when no session exists; err is non-nil only on a
// caller mismatch.
// recorded at /begin, so a node authenticated as one identity cannot
// drive another node's in-flight session (#110a). callerNode is "" for a
// shared-token caller (no recoverable identity, #110d); the comparison
// then yields verbatim, leaving the session unbound for that caller. This
// is the single place the binding is enforced. ok is false when no
// session exists; err is non-nil only on a caller mismatch.
func (r *peerSyncRouter) lookupSession(receiverRunID int64, callerNode string) (sess *peerSession, ok bool, err error) {
r.mu.Lock()
defer r.mu.Unlock()
Expand All @@ -248,12 +246,27 @@ func (r *peerSyncRouter) lookupSession(receiverRunID int64, callerNode string) (
return sess, true, nil
}

// callerNodeName returns the authenticated initiator identity for a
// phase request, or "" when none is recoverable. The single shared
// agent token authenticates every peer identically, so no per-request
// identity exists yet (#110d); this returns "" until per-peer tokens
// land, keeping the lookupSession binding point in one spot.
func callerNodeName(*http.Request) string { return "" }
// callerNodeContextKey types the request-context slot requireBearer
// stamps with the authenticated caller's node name.
type callerNodeContextKey struct{}

// withCallerNode returns req carrying node as its authenticated caller
// identity. An empty node (the shared-token case, which carries no
// recoverable identity) is stored verbatim so callerNodeName reads it
// back as "" and the session binding stays a no-op for that caller.
func withCallerNode(req *http.Request, node string) *http.Request {
return req.WithContext(context.WithValue(req.Context(), callerNodeContextKey{}, node))
}

// callerNodeName returns the authenticated initiator identity requireBearer
// stamped on the request, or "" when none is recoverable. A per-peer token
// resolves to its node name; the single shared token authenticates without
// an identity (#110d), so this returns "" and lookupSession leaves the
// session unbound for that caller.
func callerNodeName(req *http.Request) string {
node, _ := req.Context().Value(callerNodeContextKey{}).(string)
return node
}

// handleBegin implements POST /v1/sync/begin. The handler is the
// thin HTTP shell over beginSession, which carries the actual flow.
Expand All @@ -263,7 +276,7 @@ func (r *peerSyncRouter) handleBegin(w http.ResponseWriter, req *http.Request) {
writeError(w, http.StatusBadRequest, err.Error())
return
}
resp, status, err := r.beginSession(req.Context(), body)
resp, status, err := r.beginSession(req.Context(), body, callerNodeName(req))
if err != nil {
writeError(w, status, err.Error())
return
Expand All @@ -277,10 +290,13 @@ func (r *peerSyncRouter) handleBegin(w http.ResponseWriter, req *http.Request) {
// The lock is acquired before any DB row insertion that would need
// rollback on a later failure; releasing it lives in the per-phase
// guard.
func (r *peerSyncRouter) beginSession(ctx context.Context, body syncproto.BeginRequest) (syncproto.BeginResponse, int, error) {
func (r *peerSyncRouter) beginSession(ctx context.Context, body syncproto.BeginRequest, callerNode string) (syncproto.BeginResponse, int, error) {
if body.Volume == "" || body.InitiatorNodeName == "" || body.InitiatorRunID == 0 {
return syncproto.BeginResponse{}, http.StatusBadRequest, errors.New("volume, initiator_node_name, and initiator_run_id are required")
}
if callerNode != "" && callerNode != body.InitiatorNodeName {
return syncproto.BeginResponse{}, http.StatusForbidden, fmt.Errorf("authenticated node %q may not open a session as %q", callerNode, body.InitiatorNodeName)
}
strategy, err := normalizeDedupStrategy(body.DedupStrategy)
if err != nil {
return syncproto.BeginResponse{}, http.StatusBadRequest, err
Expand Down
80 changes: 76 additions & 4 deletions agent/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,10 +612,10 @@ func TestValidateRelPathRejectsAllReservedDirs(t *testing.T) {

// TestSessionBoundToCaller (#110a): a phase call presenting a caller
// identity that differs from the node that opened the session is refused.
// The single shared agent token carries no per-request identity yet
// (#110d), so the production phase handlers pass "" (no binding) — this
// exercises the binding directly to prove the chokepoint is correct for
// when per-peer tokens make a caller identity recoverable.
// This exercises the lookup binding directly; the empty-caller case is
// the shared-token path (#110d), which carries no identity and so reaches
// the session unbound. The end-to-end binding over per-peer tokens is
// covered by TestPeerTokenSessionBinding.
func TestSessionBoundToCaller(t *testing.T) {
f := newPreStageFixture(t)
r := f.router
Expand Down Expand Up @@ -705,6 +705,78 @@ func TestPeerEndpointIgnoresWireEndpoint(t *testing.T) {
}
}

// TestPeerTokenSessionBinding (#110a + safe #110d increment): with
// per-peer tokens configured, a session opened by one peer's token is
// bound to that peer's identity. A phase call carrying the same
// receiver_run_id but a different peer's token is refused with 403, so a
// second token-holder can't hijack (or /close-abort) the session. A
// shared-token caller still authenticates but, carrying no identity,
// reaches the session unbound (the pre-#110d behaviour).
func TestPeerTokenSessionBinding(t *testing.T) {
vol := &config.Volume{Name: "pics", Path: t.TempDir()}
srv := newTestServer(t, Config{
Token: "shared",
Volumes: map[string]*config.Volume{vol.Name: vol},
PeerTokens: map[string]string{"owner-token": "owner", "intruder-token": "intruder"},
})

var begin syncproto.BeginResponse
if code := postJSON(t, srv, "/v1/sync/begin", "owner-token", syncproto.BeginRequest{
Volume: vol.Name, InitiatorNodeName: "owner", InitiatorRunID: 1,
}, &begin); code != http.StatusOK {
t.Fatalf("begin as owner: status = %d, want 200", code)
}

verify := syncproto.VerifyRequest{ReceiverRunID: begin.ReceiverRunID}
if code := postJSON(t, srv, "/v1/sync/verify", "intruder-token", verify, nil); code != http.StatusForbidden {
t.Fatalf("verify as intruder: status = %d, want 403", code)
}
if code := postJSON(t, srv, "/v1/sync/verify", "owner-token", verify, nil); code != http.StatusOK {
t.Fatalf("verify as owner: status = %d, want 200", code)
}
}

// TestBeginRejectsImpersonatedNodeName (safe #110d increment): a caller
// authenticated as one node may not open a session declaring a different
// initiator_node_name, so the declared identity is bound to the
// credential rather than self-asserted.
func TestBeginRejectsImpersonatedNodeName(t *testing.T) {
vol := &config.Volume{Name: "pics", Path: t.TempDir()}
srv := newTestServer(t, Config{
Token: "shared",
Volumes: map[string]*config.Volume{vol.Name: vol},
PeerTokens: map[string]string{"owner-token": "owner"},
})
code := postJSON(t, srv, "/v1/sync/begin", "owner-token", syncproto.BeginRequest{
Volume: vol.Name, InitiatorNodeName: "someone-else", InitiatorRunID: 1,
}, nil)
if code != http.StatusForbidden {
t.Fatalf("begin impersonating someone-else: status = %d, want 403", code)
}
}

// postJSON marshals body, POSTs it to urlPath with the given bearer
// token, decodes a 200 response into out (when non-nil), and returns the
// status so a test can assert auth/binding rejections.
func postJSON(t *testing.T, srv *Server, urlPath, token string, body, out any) int {
t.Helper()
encoded, err := json.Marshal(body)
if err != nil {
t.Fatalf("marshal: %v", err)
}
req := httptest.NewRequest(http.MethodPost, urlPath, bytes.NewReader(encoded))
req.Header.Set("Authorization", "Bearer "+token)
req.Header.Set("Content-Type", "application/json")
rec := httptest.NewRecorder()
srv.Handler().ServeHTTP(rec, req)
if rec.Code == http.StatusOK && out != nil {
if err := json.Unmarshal(rec.Body.Bytes(), out); err != nil {
t.Fatalf("decode response: %v", err)
}
}
return rec.Code
}

// postRaw POSTs body verbatim to urlPath with the test bearer token and
// returns the HTTP status, so a malformed or oversized body can be driven
// without the typed marshal helpers rejecting it first.
Expand Down
1 change: 1 addition & 0 deletions cmd/squirrel/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func runAgent(cmd *cobra.Command) error {
srv, err := agent.New(agent.Config{
Listen: cfg.Agent.Listen,
Token: cfg.Agent.Token,
PeerTokens: cfg.Agent.PeerTokens,
TLSCert: cfg.Agent.TLSCert,
TLSKey: cfg.Agent.TLSKey,
Version: agentVersion,
Expand Down
Loading
Loading