From 20d5e01844a7f177cbbaad253cd6ccd7caee95f6 Mon Sep 17 00:00:00 2001 From: Tyler Gannon Date: Mon, 23 Feb 2026 18:08:51 +0000 Subject: [PATCH 1/2] fix: return error frames instead of dropping connection on store errors GetBlob, GetHead, and PutBlob handlers used ? on store operations, which propagated errors out of handle_client() and dropped the TCP connection. This violated the protocol spec (docs/protocol.md s7) which requires a 404 ERROR frame for blob not found. Changed store operation errors to flow into the response match so the error-frame builder (map_error + encode_error) handles them. Go client additions: - GetBlob implementation (Client + ReconnectingClient) - ErrBlobNotFound sentinel error - Unit tests with mock TCP server (fs_test.go) - cxdb-blob-verify CLI tool for live server validation - get_blob protocol fixture --- clients/go/cmd/cxdb-blob-verify/main.go | 296 +++++++++++++++++++ clients/go/cmd/cxdb-fixtures/main.go | 7 + clients/go/errors.go | 3 + clients/go/fs.go | 23 ++ clients/go/fs_test.go | 359 ++++++++++++++++++++++++ clients/go/reconnect.go | 11 + fixtures/protocol/get_blob.json | 6 + server/src/main.rs | 38 ++- 8 files changed, 728 insertions(+), 15 deletions(-) create mode 100644 clients/go/cmd/cxdb-blob-verify/main.go create mode 100644 clients/go/fs_test.go create mode 100644 fixtures/protocol/get_blob.json diff --git a/clients/go/cmd/cxdb-blob-verify/main.go b/clients/go/cmd/cxdb-blob-verify/main.go new file mode 100644 index 0000000..80e1be1 --- /dev/null +++ b/clients/go/cmd/cxdb-blob-verify/main.go @@ -0,0 +1,296 @@ +// Copyright 2025 StrongDM Inc +// SPDX-License-Identifier: Apache-2.0 + +// cxdb-blob-verify is a one-off verification tool that exercises PutBlob and +// GetBlob against a live CXDB server. It verifies end-to-end correctness of +// the blob CAS round-trip, including content integrity via BLAKE3 hashing. +// +// Usage: +// +// cxdb-blob-verify -addr 127.0.0.1:9009 +// CXDB_PORT=9009 cxdb-blob-verify +package main + +import ( + "bytes" + "context" + "encoding/hex" + "errors" + "flag" + "fmt" + "os" + "time" + + cxdb "github.com/strongdm/ai-cxdb/clients/go" + "github.com/zeebo/blake3" +) + +var addr string + +func dial() *cxdb.Client { + client, err := cxdb.Dial(addr, cxdb.WithClientTag("blob-verify")) + if err != nil { + fmt.Fprintf(os.Stderr, " dial failed: %v\n", err) + os.Exit(2) + } + return client +} + +func main() { + flag.StringVar(&addr, "addr", "", "CXDB server address (host:port). Falls back to 127.0.0.1:$CXDB_PORT") + flag.Parse() + + if addr == "" { + port := os.Getenv("CXDB_PORT") + if port == "" { + fmt.Fprintln(os.Stderr, "error: no -addr flag and CXDB_PORT not set") + os.Exit(1) + } + addr = "127.0.0.1:" + port + } + + fmt.Printf("server: %s\n\n", addr) + + passed := 0 + failed := 0 + + run := func(name string, fn func() error) { + fmt.Printf("=== %s ===\n", name) + if err := fn(); err != nil { + fmt.Fprintf(os.Stderr, " FAIL: %v\n", err) + failed++ + } else { + fmt.Println(" PASS") + passed++ + } + fmt.Println() + } + + // --- Test 1: PutBlob + GetBlob round-trip --- + run("PutBlob + GetBlob round-trip", func() error { + client := dial() + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + testData := []byte(fmt.Sprintf("cxdb-blob-verify test payload %d", time.Now().UnixNano())) + expectedHash := blake3.Sum256(testData) + + result, err := client.PutBlob(ctx, &cxdb.PutBlobRequest{Data: testData}) + if err != nil { + return fmt.Errorf("PutBlob: %w", err) + } + fmt.Printf(" PutBlob: hash=%s was_new=%v\n", hex.EncodeToString(result.Hash[:]), result.WasNew) + if result.Hash != expectedHash { + return fmt.Errorf("hash mismatch: got %x, want %x", result.Hash, expectedHash) + } + + retrieved, err := client.GetBlob(ctx, expectedHash) + if err != nil { + return fmt.Errorf("GetBlob: %w", err) + } + if !bytes.Equal(retrieved, testData) { + return fmt.Errorf("content mismatch: got %d bytes, want %d bytes", len(retrieved), len(testData)) + } + fmt.Printf(" GetBlob: %d bytes, content matches\n", len(retrieved)) + return nil + }) + + // --- Test 2: GetBlob nonexistent hash returns ErrBlobNotFound --- + run("GetBlob nonexistent hash (expect ErrBlobNotFound)", func() error { + client := dial() + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + var bogusHash [32]byte + for i := range bogusHash { + bogusHash[i] = 0xDE + } + _, err := client.GetBlob(ctx, bogusHash) + if err == nil { + return fmt.Errorf("expected error for nonexistent blob, got nil") + } + if !errors.Is(err, cxdb.ErrBlobNotFound) { + return fmt.Errorf("expected ErrBlobNotFound, got: %v (type %T)", err, err) + } + fmt.Printf(" got expected ErrBlobNotFound: %v\n", err) + return nil + }) + + // --- Test 3: PutBlobIfAbsent deduplication --- + run("PutBlobIfAbsent deduplication", func() error { + client := dial() + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + dedupData := []byte("dedup-test-payload-" + fmt.Sprint(time.Now().UnixNano())) + + hash1, wasNew1, err := client.PutBlobIfAbsent(ctx, dedupData) + if err != nil { + return fmt.Errorf("first PutBlobIfAbsent: %w", err) + } + fmt.Printf(" first put: hash=%s was_new=%v\n", hex.EncodeToString(hash1[:]), wasNew1) + if !wasNew1 { + return fmt.Errorf("first put should be new") + } + + hash2, wasNew2, err := client.PutBlobIfAbsent(ctx, dedupData) + if err != nil { + return fmt.Errorf("second PutBlobIfAbsent: %w", err) + } + fmt.Printf(" second put: hash=%s was_new=%v\n", hex.EncodeToString(hash2[:]), wasNew2) + if wasNew2 { + return fmt.Errorf("second put should NOT be new (dedup)") + } + if hash1 != hash2 { + return fmt.Errorf("hashes differ: %x vs %x", hash1, hash2) + } + + // Verify we can get it back + retrieved, err := client.GetBlob(ctx, hash1) + if err != nil { + return fmt.Errorf("GetBlob after dedup: %w", err) + } + if !bytes.Equal(retrieved, dedupData) { + return fmt.Errorf("dedup content mismatch") + } + fmt.Printf(" GetBlob after dedup: %d bytes, content matches\n", len(retrieved)) + return nil + }) + + // --- Test 4: Large blob (1 MiB) --- + run("Large blob (1 MiB) round-trip", func() error { + client := dial() + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + largeData := make([]byte, 1024*1024) + for i := range largeData { + largeData[i] = byte(i % 251) + } + + largeHash, _, err := client.PutBlobIfAbsent(ctx, largeData) + if err != nil { + return fmt.Errorf("PutBlob large: %w", err) + } + fmt.Printf(" PutBlob: hash=%s\n", hex.EncodeToString(largeHash[:])) + + retrieved, err := client.GetBlob(ctx, largeHash) + if err != nil { + return fmt.Errorf("GetBlob large: %w", err) + } + if !bytes.Equal(retrieved, largeData) { + return fmt.Errorf("large blob content mismatch: got %d bytes, want %d", len(retrieved), len(largeData)) + } + fmt.Printf(" GetBlob: %d bytes, content verified\n", len(retrieved)) + return nil + }) + + // --- Test 5: Connection survives a not-found --- + run("Connection survives not-found", func() error { + client := dial() + defer client.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Step 1: PutBlob proves the connection works + data1 := []byte(fmt.Sprintf("survive-test-1-%d", time.Now().UnixNano())) + _, err := client.PutBlob(ctx, &cxdb.PutBlobRequest{Data: data1}) + if err != nil { + return fmt.Errorf("step 1 PutBlob: %w", err) + } + fmt.Println(" step 1: PutBlob succeeded") + + // Step 2: GetBlob for nonexistent hash (used to kill the connection) + var bogusHash [32]byte + for i := range bogusHash { + bogusHash[i] = 0xAB + } + _, err = client.GetBlob(ctx, bogusHash) + if !errors.Is(err, cxdb.ErrBlobNotFound) { + return fmt.Errorf("step 2: expected ErrBlobNotFound, got: %v", err) + } + fmt.Println(" step 2: GetBlob not-found returned proper error") + + // Step 3: PutBlob + GetBlob for real data (proves connection is STILL ALIVE) + data2 := []byte(fmt.Sprintf("survive-test-2-%d", time.Now().UnixNano())) + hash2 := blake3.Sum256(data2) + _, err = client.PutBlob(ctx, &cxdb.PutBlobRequest{Data: data2}) + if err != nil { + return fmt.Errorf("step 3 PutBlob: %w (connection died after not-found)", err) + } + retrieved, err := client.GetBlob(ctx, hash2) + if err != nil { + return fmt.Errorf("step 3 GetBlob: %w (connection died after not-found)", err) + } + if !bytes.Equal(retrieved, data2) { + return fmt.Errorf("step 3: content mismatch") + } + fmt.Println(" step 3: PutBlob + GetBlob succeeded — connection survived!") + return nil + }) + + // --- Test 6: ReconnectingClient handles not-found without reconnecting --- + run("ReconnectingClient not-found without reconnect", func() error { + rc, err := cxdb.DialReconnecting(addr, nil, cxdb.WithClientTag("blob-verify-reconnect")) + if err != nil { + return fmt.Errorf("DialReconnecting: %w", err) + } + defer rc.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + sessionBefore := rc.SessionID() + fmt.Printf(" session before: %d\n", sessionBefore) + + // GetBlob for nonexistent hash should return ErrBlobNotFound + var bogusHash [32]byte + for i := range bogusHash { + bogusHash[i] = 0xCD + } + _, err = rc.GetBlob(ctx, bogusHash) + if !errors.Is(err, cxdb.ErrBlobNotFound) { + return fmt.Errorf("expected ErrBlobNotFound, got: %v", err) + } + fmt.Printf(" GetBlob not-found returned proper error\n") + + sessionAfter := rc.SessionID() + fmt.Printf(" session after: %d\n", sessionAfter) + if sessionBefore != sessionAfter { + return fmt.Errorf("session ID changed from %d to %d — reconnection happened (should not for 404)", sessionBefore, sessionAfter) + } + fmt.Println(" session ID unchanged — no reconnection") + + // Prove client still works with a real PutBlob + GetBlob + realData := []byte(fmt.Sprintf("reconnect-test-%d", time.Now().UnixNano())) + realHash := blake3.Sum256(realData) + _, err = rc.PutBlob(ctx, &cxdb.PutBlobRequest{Data: realData}) + if err != nil { + return fmt.Errorf("PutBlob after not-found: %w", err) + } + retrieved, err := rc.GetBlob(ctx, realHash) + if err != nil { + return fmt.Errorf("GetBlob after not-found: %w", err) + } + if !bytes.Equal(retrieved, realData) { + return fmt.Errorf("content mismatch after not-found") + } + fmt.Println(" PutBlob + GetBlob after not-found succeeded") + return nil + }) + + // --- Summary --- + fmt.Printf("=== Results: %d passed, %d failed ===\n", passed, failed) + if failed > 0 { + os.Exit(1) + } +} diff --git a/clients/go/cmd/cxdb-fixtures/main.go b/clients/go/cmd/cxdb-fixtures/main.go index ed5b834..7b13cb6 100644 --- a/clients/go/cmd/cxdb-fixtures/main.go +++ b/clients/go/cmd/cxdb-fixtures/main.go @@ -40,6 +40,7 @@ func main() { getLastFixture("get_last_payload", 1, 5, true), attachFsFixture("attach_fs", 99, testHash(0xAA)), putBlobFixture("put_blob", []byte("hello blob")), + getBlobFixture("get_blob", testHash(0xCC)), appendWithFsFixture("append_with_fs", 1, 0, "cxdb.ConversationItem", 3, []byte{0x91, 0x04}, "", testHash(0xBB)), } @@ -131,6 +132,12 @@ func getLastFixture(name string, contextID uint64, limit uint32, includePayload return Fixture{Name: name, MsgType: 6, Flags: 0, PayloadHex: hex.EncodeToString(payload)} } +func getBlobFixture(name string, hash [32]byte) Fixture { + payload := make([]byte, 0, 32) + payload = append(payload, hash[:]...) + return Fixture{Name: name, MsgType: 9, Flags: 0, PayloadHex: hex.EncodeToString(payload)} +} + func attachFsFixture(name string, turnID uint64, fsHash [32]byte) Fixture { payload := make([]byte, 0, 40) payload = appendU64(payload, turnID) diff --git a/clients/go/errors.go b/clients/go/errors.go index 6a59155..10127d8 100644 --- a/clients/go/errors.go +++ b/clients/go/errors.go @@ -21,6 +21,9 @@ var ( // ErrInvalidResponse is returned when the server response is malformed. ErrInvalidResponse = errors.New("cxdb: invalid response") + + // ErrBlobNotFound is returned when a blob hash doesn't exist in the store. + ErrBlobNotFound = errors.New("cxdb: blob not found") ) // ServerError represents an error returned by the CXDB server. diff --git a/clients/go/fs.go b/clients/go/fs.go index 5ba201c..093932a 100644 --- a/clients/go/fs.go +++ b/clients/go/fs.go @@ -58,6 +58,29 @@ func (c *Client) AttachFs(ctx context.Context, req *AttachFsRequest) (*AttachFsR return result, nil } +// GetBlob retrieves a blob by its BLAKE3-256 hash from the content-addressed store. +// Returns ErrBlobNotFound (wrapped in a ServerError) if the hash doesn't exist. +func (c *Client) GetBlob(ctx context.Context, hash [32]byte) ([]byte, error) { + resp, err := c.sendRequest(ctx, msgGetBlob, hash[:]) + if err != nil { + if IsServerError(err, 404) { + return nil, fmt.Errorf("%w: %s", ErrBlobNotFound, err) + } + return nil, fmt.Errorf("get blob: %w", err) + } + + if len(resp.payload) < 4 { + return nil, fmt.Errorf("%w: get blob response too short (%d bytes)", ErrInvalidResponse, len(resp.payload)) + } + + rawLen := binary.LittleEndian.Uint32(resp.payload[0:4]) + if uint32(len(resp.payload)-4) < rawLen { + return nil, fmt.Errorf("%w: get blob payload truncated (expected %d, got %d)", ErrInvalidResponse, rawLen, len(resp.payload)-4) + } + + return resp.payload[4 : 4+rawLen], nil +} + // PutBlobRequest contains parameters for storing a blob. type PutBlobRequest struct { // Data is the raw blob content. diff --git a/clients/go/fs_test.go b/clients/go/fs_test.go new file mode 100644 index 0000000..b354c8e --- /dev/null +++ b/clients/go/fs_test.go @@ -0,0 +1,359 @@ +// Copyright 2025 StrongDM Inc +// SPDX-License-Identifier: Apache-2.0 + +package cxdb + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "io" + "net" + "sync" + "testing" +) + +// ============================================================================= +// mockServer — reusable TCP test helper for the CXDB binary protocol +// ============================================================================= + +// mockHandler is called for each frame after the HELLO handshake. +// It receives the message type, request ID, and payload from the client. +// It returns the response message type, flags, payload, and an optional error. +// If err is non-nil, an error frame is sent instead of the normal response. +type mockHandler func(msgType uint16, reqID uint64, payload []byte) (respMsgType uint16, flags uint16, respPayload []byte, err error) + +// mockServer starts a TCP listener on 127.0.0.1:0, handles the HELLO handshake, +// then dispatches subsequent frames to the given handler. +// It returns the listener address and a cleanup function. +func mockServer(t *testing.T, handler mockHandler) (addr string, cleanup func()) { + t.Helper() + + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("mockServer: listen: %v", err) + } + + var wg sync.WaitGroup + wg.Add(1) + + go func() { + defer wg.Done() + + conn, err := ln.Accept() + if err != nil { + return // listener closed + } + defer conn.Close() + + // --- HELLO handshake --- + helloFrame, err := mockReadFrame(conn) + if err != nil { + t.Errorf("mockServer: read hello: %v", err) + return + } + if helloFrame.msgType != msgHello { + t.Errorf("mockServer: expected HELLO (1), got %d", helloFrame.msgType) + return + } + + // Respond with session_id=1, protocol_version=1 + helloResp := &bytes.Buffer{} + _ = binary.Write(helloResp, binary.LittleEndian, uint64(1)) // session_id + _ = binary.Write(helloResp, binary.LittleEndian, uint16(1)) // protocol_version + if err := mockWriteFrame(conn, msgHello, 0, helloFrame.reqID, helloResp.Bytes()); err != nil { + t.Errorf("mockServer: write hello response: %v", err) + return + } + + // --- Dispatch loop --- + for { + f, err := mockReadFrame(conn) + if err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) { + return + } + // Connection closed by client — normal during cleanup + return + } + + respMsgType, flags, respPayload, handlerErr := handler(f.msgType, f.reqID, f.payload) + if handlerErr != nil { + // Send error frame. Parse code from the error if it's a *ServerError, + // otherwise use code 500. + var se *ServerError + code := uint32(500) + detail := handlerErr.Error() + if errors.As(handlerErr, &se) { + code = se.Code + detail = se.Detail + } + errPayload := &bytes.Buffer{} + _ = binary.Write(errPayload, binary.LittleEndian, code) + _ = binary.Write(errPayload, binary.LittleEndian, uint32(len(detail))) + errPayload.WriteString(detail) + if err := mockWriteFrame(conn, msgError, 0, f.reqID, errPayload.Bytes()); err != nil { + return + } + continue + } + + if err := mockWriteFrame(conn, respMsgType, flags, f.reqID, respPayload); err != nil { + return + } + } + }() + + cleanup = func() { + ln.Close() + wg.Wait() + } + t.Cleanup(cleanup) + + return ln.Addr().String(), cleanup +} + +// mockReadFrame reads one binary protocol frame from conn. +func mockReadFrame(conn net.Conn) (*frame, error) { + header := make([]byte, 16) + if _, err := io.ReadFull(conn, header); err != nil { + return nil, err + } + length := binary.LittleEndian.Uint32(header[0:4]) + msgType := binary.LittleEndian.Uint16(header[4:6]) + // flags at header[6:8] — ignored for reading + reqID := binary.LittleEndian.Uint64(header[8:16]) + + payload := make([]byte, length) + if _, err := io.ReadFull(conn, payload); err != nil { + return nil, err + } + return &frame{msgType: msgType, reqID: reqID, payload: payload}, nil +} + +// mockWriteFrame writes one binary protocol frame to conn. +func mockWriteFrame(conn net.Conn, msgType uint16, flags uint16, reqID uint64, payload []byte) error { + header := &bytes.Buffer{} + _ = binary.Write(header, binary.LittleEndian, uint32(len(payload))) + _ = binary.Write(header, binary.LittleEndian, msgType) + _ = binary.Write(header, binary.LittleEndian, flags) + _ = binary.Write(header, binary.LittleEndian, reqID) + + _, err := conn.Write(append(header.Bytes(), payload...)) + return err +} + +// ============================================================================= +// Tests +// ============================================================================= + +func TestGetBlob_Success(t *testing.T) { + expectedData := []byte("hello, blob world!") + var requestHash [32]byte + for i := range requestHash { + requestHash[i] = byte(i) + } + + addr, _ := mockServer(t, func(msgType uint16, reqID uint64, payload []byte) (uint16, uint16, []byte, error) { + if msgType != msgGetBlob { + return 0, 0, nil, fmt.Errorf("unexpected msg type: %d", msgType) + } + if len(payload) != 32 { + return 0, 0, nil, fmt.Errorf("expected 32-byte hash payload, got %d", len(payload)) + } + var got [32]byte + copy(got[:], payload) + if got != requestHash { + return 0, 0, nil, fmt.Errorf("hash mismatch") + } + + // Build response: u32(len) + data + resp := &bytes.Buffer{} + _ = binary.Write(resp, binary.LittleEndian, uint32(len(expectedData))) + resp.Write(expectedData) + return msgGetBlob, 0, resp.Bytes(), nil + }) + + client, err := Dial(addr) + if err != nil { + t.Fatalf("Dial: %v", err) + } + defer client.Close() + + data, err := client.GetBlob(context.Background(), requestHash) + if err != nil { + t.Fatalf("GetBlob: %v", err) + } + if !bytes.Equal(data, expectedData) { + t.Errorf("got %q, want %q", data, expectedData) + } +} + +func TestGetBlob_NotFound(t *testing.T) { + addr, _ := mockServer(t, func(msgType uint16, reqID uint64, payload []byte) (uint16, uint16, []byte, error) { + return 0, 0, nil, &ServerError{Code: 404, Detail: "blob not found"} + }) + + client, err := Dial(addr) + if err != nil { + t.Fatalf("Dial: %v", err) + } + defer client.Close() + + var hash [32]byte + _, err = client.GetBlob(context.Background(), hash) + if err == nil { + t.Fatal("expected error, got nil") + } + if !errors.Is(err, ErrBlobNotFound) { + t.Errorf("expected error wrapping ErrBlobNotFound, got: %v", err) + } +} + +func TestGetBlob_ResponseTooShort(t *testing.T) { + addr, _ := mockServer(t, func(msgType uint16, reqID uint64, payload []byte) (uint16, uint16, []byte, error) { + // Return a payload shorter than 4 bytes + return msgGetBlob, 0, []byte{0x01, 0x02}, nil + }) + + client, err := Dial(addr) + if err != nil { + t.Fatalf("Dial: %v", err) + } + defer client.Close() + + var hash [32]byte + _, err = client.GetBlob(context.Background(), hash) + if err == nil { + t.Fatal("expected error, got nil") + } + if !errors.Is(err, ErrInvalidResponse) { + t.Errorf("expected error wrapping ErrInvalidResponse, got: %v", err) + } +} + +func TestGetBlob_PayloadTruncated(t *testing.T) { + addr, _ := mockServer(t, func(msgType uint16, reqID uint64, payload []byte) (uint16, uint16, []byte, error) { + // Claim 100 bytes but only provide 10 + resp := &bytes.Buffer{} + _ = binary.Write(resp, binary.LittleEndian, uint32(100)) + resp.Write(make([]byte, 10)) + return msgGetBlob, 0, resp.Bytes(), nil + }) + + client, err := Dial(addr) + if err != nil { + t.Fatalf("Dial: %v", err) + } + defer client.Close() + + var hash [32]byte + _, err = client.GetBlob(context.Background(), hash) + if err == nil { + t.Fatal("expected error, got nil") + } + if !errors.Is(err, ErrInvalidResponse) { + t.Errorf("expected error wrapping ErrInvalidResponse, got: %v", err) + } +} + +func TestPutBlobThenGetBlob_Roundtrip(t *testing.T) { + store := make(map[[32]byte][]byte) + var mu sync.Mutex + + addr, _ := mockServer(t, func(msgType uint16, reqID uint64, payload []byte) (uint16, uint16, []byte, error) { + switch msgType { + case msgPutBlob: + // Payload: hash(32) + len(u32) + data + if len(payload) < 36 { + return 0, 0, nil, fmt.Errorf("putblob payload too short: %d", len(payload)) + } + var hash [32]byte + copy(hash[:], payload[0:32]) + dataLen := binary.LittleEndian.Uint32(payload[32:36]) + if uint32(len(payload)-36) < dataLen { + return 0, 0, nil, fmt.Errorf("putblob data truncated") + } + data := make([]byte, dataLen) + copy(data, payload[36:36+dataLen]) + + mu.Lock() + _, existed := store[hash] + store[hash] = data + mu.Unlock() + + // Response: hash(32) + was_new(1) + resp := &bytes.Buffer{} + resp.Write(hash[:]) + if existed { + resp.WriteByte(0) + } else { + resp.WriteByte(1) + } + return msgPutBlob, 0, resp.Bytes(), nil + + case msgGetBlob: + if len(payload) != 32 { + return 0, 0, nil, fmt.Errorf("getblob expected 32-byte hash, got %d", len(payload)) + } + var hash [32]byte + copy(hash[:], payload) + + mu.Lock() + data, ok := store[hash] + mu.Unlock() + + if !ok { + return 0, 0, nil, &ServerError{Code: 404, Detail: "blob not found"} + } + + resp := &bytes.Buffer{} + _ = binary.Write(resp, binary.LittleEndian, uint32(len(data))) + resp.Write(data) + return msgGetBlob, 0, resp.Bytes(), nil + + default: + return 0, 0, nil, fmt.Errorf("unexpected msg type: %d", msgType) + } + }) + + client, err := Dial(addr) + if err != nil { + t.Fatalf("Dial: %v", err) + } + defer client.Close() + + blobData := []byte("the quick brown fox jumps over the lazy dog") + + // PutBlob + putResult, err := client.PutBlob(context.Background(), &PutBlobRequest{Data: blobData}) + if err != nil { + t.Fatalf("PutBlob: %v", err) + } + if !putResult.WasNew { + t.Error("expected WasNew=true for first put") + } + + // GetBlob with the returned hash + got, err := client.GetBlob(context.Background(), putResult.Hash) + if err != nil { + t.Fatalf("GetBlob: %v", err) + } + if !bytes.Equal(got, blobData) { + t.Errorf("roundtrip mismatch: got %q, want %q", got, blobData) + } + + // Put again — should not be new + putResult2, err := client.PutBlob(context.Background(), &PutBlobRequest{Data: blobData}) + if err != nil { + t.Fatalf("PutBlob (second): %v", err) + } + if putResult2.WasNew { + t.Error("expected WasNew=false for duplicate put") + } + if putResult2.Hash != putResult.Hash { + t.Error("hash mismatch between first and second put") + } +} diff --git a/clients/go/reconnect.go b/clients/go/reconnect.go index fc7bc11..04a2217 100644 --- a/clients/go/reconnect.go +++ b/clients/go/reconnect.go @@ -467,6 +467,17 @@ func (rc *ReconnectingClient) AttachFs(ctx context.Context, req *AttachFsRequest return result, err } +// GetBlob retrieves a blob by its BLAKE3-256 hash. +func (rc *ReconnectingClient) GetBlob(ctx context.Context, hash [32]byte) ([]byte, error) { + var result []byte + err := rc.enqueue(ctx, "GetBlob", func(c *Client) error { + var opErr error + result, opErr = c.GetBlob(ctx, hash) + return opErr + }) + return result, err +} + // PutBlob stores a blob and returns its hash. func (rc *ReconnectingClient) PutBlob(ctx context.Context, req *PutBlobRequest) (*PutBlobResult, error) { var result *PutBlobResult diff --git a/fixtures/protocol/get_blob.json b/fixtures/protocol/get_blob.json new file mode 100644 index 0000000..6a0428a --- /dev/null +++ b/fixtures/protocol/get_blob.json @@ -0,0 +1,6 @@ +{ + "name": "get_blob", + "msg_type": 9, + "flags": 0, + "payload_hex": "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" +} \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index ed290e6..beacb55 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -319,10 +319,13 @@ fn handle_client( x if x == MsgType::GetHead as u16 => { let context_id = parse_get_head(&payload)?; let store = store.read().unwrap(); - let head = store.get_head(context_id)?; - let resp = - encode_ctx_create_resp(head.context_id, head.head_turn_id, head.head_depth)?; - Ok((MsgType::GetHead as u16, resp)) + match store.get_head(context_id) { + Ok(head) => { + let resp = encode_ctx_create_resp(head.context_id, head.head_turn_id, head.head_depth)?; + Ok((MsgType::GetHead as u16, resp)) + } + Err(e) => Err(e), + } } x if x == MsgType::AppendTurn as u16 => { let req = parse_append_turn(&payload, header.flags)?; @@ -399,12 +402,13 @@ fn handle_client( // Verify hash matches let actual_hash = blake3::hash(&req.data); if actual_hash.as_bytes() != &req.hash { - return Err(StoreError::InvalidInput("blob hash mismatch".into())); + Err(StoreError::InvalidInput("blob hash mismatch".into())) + } else { + let was_new = !store.blob_store.contains(&req.hash); + store.blob_store.put_if_absent(req.hash, &req.data)?; + let resp = encode_put_blob_resp(&req.hash, was_new)?; + Ok((MsgType::PutBlob as u16, resp)) } - let was_new = !store.blob_store.contains(&req.hash); - store.blob_store.put_if_absent(req.hash, &req.data)?; - let resp = encode_put_blob_resp(&req.hash, was_new)?; - Ok((MsgType::PutBlob as u16, resp)) } x if x == MsgType::GetLast as u16 => { let req = parse_get_last(&payload)?; @@ -447,12 +451,16 @@ fn handle_client( x if x == MsgType::GetBlob as u16 => { let hash = parse_get_blob(&payload)?; let store = store.read().unwrap(); - let bytes = store.get_blob(&hash)?; - metrics.record_get_blob(op_start.elapsed()); - let mut resp = Vec::new(); - resp.write_u32::(bytes.len() as u32)?; - resp.extend_from_slice(&bytes); - Ok((MsgType::GetBlob as u16, resp)) + match store.get_blob(&hash) { + Ok(bytes) => { + metrics.record_get_blob(op_start.elapsed()); + let mut resp = Vec::new(); + resp.write_u32::(bytes.len() as u32)?; + resp.extend_from_slice(&bytes); + Ok((MsgType::GetBlob as u16, resp)) + } + Err(e) => Err(e), + } } _ => Err(StoreError::InvalidInput("unknown msg_type".into())), }; From f314bfb7ab9ae56048410e38d76d0455b89d10c8 Mon Sep 17 00:00:00 2001 From: Tyler Gannon Date: Mon, 23 Feb 2026 18:51:57 +0000 Subject: [PATCH 2/2] feat(types): add Synthetic field to UserInput Adds a boolean Synthetic field to UserInput for marking programmatically injected messages (e.g., from navigators or orchestrators) vs direct human input. UIs may render synthetic inputs differently (dimmed, different icon). This is a simpler alternative to a separate SteeringMessage type - all steering/injection use cases can use UserInput{Synthetic: true}. --- clients/go/types/conversation.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/clients/go/types/conversation.go b/clients/go/types/conversation.go index 29b8fa3..ba90a29 100644 --- a/clients/go/types/conversation.go +++ b/clients/go/types/conversation.go @@ -178,6 +178,9 @@ type UserInput struct { // Files lists file paths included with the input. Files []string `msgpack:"2" json:"files,omitempty"` + + // Synthetic means programmatically injected. + Synthetic bool `msgpack:"3" json:"synthetic,omitempty"` } // =============================================================================