diff --git a/dataexchange.go b/dataexchange.go index efe250c..00beb46 100644 --- a/dataexchange.go +++ b/dataexchange.go @@ -6,7 +6,9 @@ import ( "encoding/binary" "fmt" "io" + "os" "path/filepath" + "strconv" "strings" "unicode/utf8" ) @@ -20,6 +22,16 @@ const ( // TypeTrace wraps another frame type with nanosecond-precision timing. // Wire layout: [4-byte TypeTrace][4-byte len][4-byte inner_type][8-byte sent_at_ns][inner_payload] TypeTrace uint32 = 5 + // Type 6 is reserved (see Alex's reply-on-conn PR chain / TypeAutoAnswer). + // + // TypeFileStream is the chunked/ACK'd/resumable file-transfer protocol + // (see docs/PROPOSAL-reliable-file-transfer.md and filestream.go). Each + // TypeFileStream frame carries a small control header + at most one + // chunk of file data, so a multi-GiB transfer never collapses into a + // single giant frame the way TypeFile does. Backward compatible: a + // peer that does not understand TypeFileStream never sends INIT-ACK, so + // the sender falls back to TypeFile. + TypeFileStream uint32 = 7 ) // TraceFrame carries timing metadata around an inner message frame. @@ -56,10 +68,42 @@ func ReadTracePayload(f *Frame) (*TraceFrame, error) { // maxFilenameLen limits filename length to prevent abuse. const maxFilenameLen = 255 -// MaxFrameSize caps a single data-exchange frame at 256 MiB. Sized to fit -// the test fleet's 100 MiB file payloads with margin while still rejecting -// pathological 500 MiB+ frames that would dominate memory. -const MaxFrameSize = 1 << 28 +// DefaultMaxFrameSize caps a single data-exchange frame at 1 GiB. +// +// The wire format's length field is a uint32 (see Frame docstring below), +// so the absolute ceiling is ~4 GiB; this cap exists purely to bound the +// memory the receiver allocates for any single transfer. Raising it +// trades RAM for the ability to ship larger artefacts in one frame. +// +// History: was 256 MiB pre-2026-06-14 (sized for the test fleet's 100 MiB +// payloads). Raised after operators on multi-GiB-RAM hosts hit the cap on +// real workloads; the chunked streaming protocol described in +// docs/PROPOSAL-reliable-file-transfer.md (web4) will eventually remove +// the need for a per-frame cap entirely. +const DefaultMaxFrameSize uint32 = 1 << 30 + +// MaxFrameSize is the runtime-effective frame cap. Set at package init +// from the PILOT_DATAEXCHANGE_MAX_FRAME env var (in bytes) if present +// and within the safe range [64 KiB, 2 GiB); otherwise DefaultMaxFrameSize. +// +// Both ends of a transfer must agree on the cap — a sender that exceeds +// the receiver's cap will see the receiver return "frame too large" and +// drop the connection. The env var is honored at process start, so +// rolling out a higher cap means restarting daemons on both sides. +var MaxFrameSize uint32 = func() uint32 { + v := os.Getenv("PILOT_DATAEXCHANGE_MAX_FRAME") + if v == "" { + return DefaultMaxFrameSize + } + n, err := strconv.ParseUint(v, 10, 32) + if err != nil || n < 1<<16 || n >= 1<<31 { + // Silently ignore garbage / unsafe values rather than crash the + // daemon at startup — surface them via the daemon log if the + // operator opted in. + return DefaultMaxFrameSize + } + return uint32(n) +}() // Frame is a typed data unit exchanged between agents. // Wire format: [4-byte type][4-byte length][payload] @@ -152,6 +196,8 @@ func TypeName(t uint32) string { return "FILE" case TypeTrace: return "TRACE" + case TypeFileStream: + return "FILESTREAM" default: return fmt.Sprintf("UNKNOWN(%d)", t) } diff --git a/examples/main.go b/examples/main.go index dc49d67..02cf114 100644 --- a/examples/main.go +++ b/examples/main.go @@ -8,10 +8,10 @@ import ( "log" "net" - internaldx "github.com/pilot-protocol/dataexchange" "github.com/pilot-protocol/common/driver" "github.com/pilot-protocol/common/protocol" "github.com/pilot-protocol/dataexchange" + internaldx "github.com/pilot-protocol/dataexchange" ) func main() { diff --git a/filestream.go b/filestream.go new file mode 100644 index 0000000..c11091f --- /dev/null +++ b/filestream.go @@ -0,0 +1,637 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +package dataexchange + +// Chunked, ACK'd, resumable file transfer (TypeFileStream). +// +// Problem this solves: TypeFile ships a whole file as one frame and waits +// for a single ACK after the receiver has read every byte and flushed to +// disk. On any non-trivial path (relay, or a direct link that flips to +// relay under sustained one-way load) the transfer stalls — there is no +// reverse-path traffic to keep the tunnel's blackhole heuristic happy, no +// backpressure, and no progress. Transfers above ~64 KiB time out. +// +// TypeFileStream breaks the file into small chunks. Every chunk is ACK'd, +// so the reverse path always carries traffic, the receiver writes +// incrementally, and a dropped transfer resumes from the last contiguous +// byte. End-to-end integrity is verified with a SHA-256 over the whole +// file (the per-tunnel AEAD only protects individual datagrams). +// +// Wire format — every TypeFileStream frame's payload is: +// +// [1] kind +// [16] transfer_id (sha256(content)[:16] — stable across retries) +// ... kind-specific body +// +// transfer_id is derived from the content hash so a retry of the same +// file lands on the same receiver-side .partial and resumes automatically. + +import ( + "bytes" + "crypto/sha256" + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "io" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" +) + +// Stream control-frame kinds (the first byte of a TypeFileStream payload). +const ( + streamKindInit byte = 0x01 // sender→receiver: filename, size, full hash, chunk size + streamKindChunk byte = 0x02 // sender→receiver: offset + chunk bytes + streamKindAck byte = 0x03 // receiver→sender: highest contiguous offset received + streamKindDone byte = 0x04 // sender→receiver: end of stream; verify full hash + streamKindInitAck byte = 0x05 // receiver→sender: resume offset (presence ⇒ peer supports stream) + streamKindComplete byte = 0x06 // receiver→sender: final status after DONE + streamKindAbort byte = 0x07 // either direction: cancel + reason +) + +// Defaults. Chunk size is deliberately held below 64 KiB: on the Mac↔GCP-VM +// rig, single tunnel writes at/above ~256 KiB are silently swallowed by the +// reliable-stream layer (a 64 KiB TypeFile transfer succeeds byte-perfect; +// 256 KiB stalls), so a large chunk would reproduce the very failure this +// protocol exists to avoid. 48 KiB chunks each ride the known-good path, and +// the per-chunk ACK keeps the reverse direction busy so the tunnel's +// blackhole heuristic does not flip the link mid-transfer. Window bounds the +// in-flight (unacked) bytes; 16 × 48 KiB = 768 KiB. +const ( + StreamChunkSize = 48 * 1024 + streamWindow = 16 + streamNegTimeout = 5 * time.Second // wait for INIT-ACK before falling back to TypeFile + streamStepTimeout = 60 * time.Second // max wait for an ACK / the final COMPLETE + transferIDLen = 16 +) + +// ErrStreamUnsupported is returned by SendFileStream when the peer does not +// answer INIT with an INIT-ACK within the negotiation window — i.e. it is a +// pre-TypeFileStream receiver. The caller should fall back to SendFile on a +// fresh connection. +var ErrStreamUnsupported = errors.New("dataexchange: peer does not support TypeFileStream") + +// --- control-frame codec --------------------------------------------------- + +func encodeStreamFrame(kind byte, id [transferIDLen]byte, body []byte) *Frame { + p := make([]byte, 1+transferIDLen+len(body)) + p[0] = kind + copy(p[1:1+transferIDLen], id[:]) + copy(p[1+transferIDLen:], body) + return &Frame{Type: TypeFileStream, Payload: p} +} + +func decodeStreamFrame(f *Frame) (kind byte, id [transferIDLen]byte, body []byte, ok bool) { + if f == nil || f.Type != TypeFileStream || len(f.Payload) < 1+transferIDLen { + return 0, id, nil, false + } + kind = f.Payload[0] + copy(id[:], f.Payload[1:1+transferIDLen]) + body = f.Payload[1+transferIDLen:] + return kind, id, body, true +} + +func encodeInit(id [transferIDLen]byte, size uint64, hash [32]byte, chunkSize uint32, name string) *Frame { + nb := []byte(name) + if len(nb) > maxFilenameLen { + nb = nb[:maxFilenameLen] + } + body := make([]byte, 8+32+4+2+len(nb)) + binary.BigEndian.PutUint64(body[0:8], size) + copy(body[8:40], hash[:]) + binary.BigEndian.PutUint32(body[40:44], chunkSize) + binary.BigEndian.PutUint16(body[44:46], uint16(len(nb))) + copy(body[46:], nb) + return encodeStreamFrame(streamKindInit, id, body) +} + +func decodeInit(body []byte) (size uint64, hash [32]byte, chunkSize uint32, name string, ok bool) { + if len(body) < 46 { + return 0, hash, 0, "", false + } + size = binary.BigEndian.Uint64(body[0:8]) + copy(hash[:], body[8:40]) + chunkSize = binary.BigEndian.Uint32(body[40:44]) + nameLen := int(binary.BigEndian.Uint16(body[44:46])) + if 46+nameLen > len(body) { + return 0, hash, 0, "", false + } + name = string(body[46 : 46+nameLen]) + return size, hash, chunkSize, name, true +} + +func encodeOffset(kind byte, id [transferIDLen]byte, off uint64) *Frame { + var b [8]byte + binary.BigEndian.PutUint64(b[:], off) + return encodeStreamFrame(kind, id, b[:]) +} + +func decodeOffset(body []byte) (uint64, bool) { + if len(body) < 8 { + return 0, false + } + return binary.BigEndian.Uint64(body[0:8]), true +} + +func encodeChunk(id [transferIDLen]byte, off uint64, data []byte) *Frame { + body := make([]byte, 8+len(data)) + binary.BigEndian.PutUint64(body[0:8], off) + copy(body[8:], data) + return encodeStreamFrame(streamKindChunk, id, body) +} + +func decodeChunk(body []byte) (off uint64, data []byte, ok bool) { + if len(body) < 8 { + return 0, nil, false + } + return binary.BigEndian.Uint64(body[0:8]), body[8:], true +} + +func encodeComplete(id [transferIDLen]byte, ok bool, msg string) *Frame { + body := make([]byte, 1+len(msg)) + if !ok { + body[0] = 1 + } + copy(body[1:], msg) + return encodeStreamFrame(streamKindComplete, id, body) +} + +func decodeComplete(body []byte) (ok bool, msg string) { + if len(body) < 1 { + return false, "malformed complete" + } + return body[0] == 0, string(body[1:]) +} + +// --- sender ---------------------------------------------------------------- + +// StreamResult summarizes a completed (or failed) TypeFileStream transfer. +type StreamResult struct { + BytesSent int64 // chunk bytes actually written to the wire this run + BytesResumed int64 // bytes the receiver already had (skipped) + TotalBytes int64 // file size + Sha256 string // hex of the full-content hash declared in INIT + OK bool + Message string // receiver's COMPLETE message (empty on success) +} + +// frameRW is the minimal connection surface the stream sender needs. +// Both *driver.Conn (the production transport) and net.Pipe ends (tests) +// satisfy it. +type frameRW interface { + io.Reader + io.Writer + Close() error +} + +// SendFileStream transfers a file using the chunked TypeFileStream protocol +// with a sliding window, end-to-end SHA-256 verification, and automatic +// resume (the receiver reports how many contiguous bytes it already has). +// +// Returns ErrStreamUnsupported if the peer never answers INIT with an +// INIT-ACK within the negotiation window — the caller should fall back to +// SendFile on a fresh connection. stepTimeout bounds the wait for any single +// ACK and for the final COMPLETE (0 ⇒ default). +func (c *Client) SendFileStream(name string, r io.ReadSeeker, size int64, stepTimeout time.Duration) (*StreamResult, error) { + return streamSend(c.conn, name, r, size, stepTimeout) +} + +func streamSend(conn frameRW, name string, r io.ReadSeeker, size int64, stepTimeout time.Duration) (*StreamResult, error) { + if stepTimeout <= 0 { + stepTimeout = streamStepTimeout + } + + // Pass 1: hash the full content (the transfer_id is derived from it, so + // resume across retries is automatic). + h := sha256.New() + if _, err := io.Copy(h, r); err != nil { + return nil, fmt.Errorf("hash file: %w", err) + } + var fullHash [32]byte + copy(fullHash[:], h.Sum(nil)) + var id [transferIDLen]byte + copy(id[:], fullHash[:transferIDLen]) + if _, err := r.Seek(0, io.SeekStart); err != nil { + return nil, fmt.Errorf("seek file: %w", err) + } + + // INIT + negotiate. + if err := WriteFrame(conn, encodeInit(id, uint64(size), fullHash, uint32(StreamChunkSize), name)); err != nil { + return nil, fmt.Errorf("send INIT: %w", err) + } + initAck, err := recvFrameTimeout(conn, streamNegTimeout) + if err != nil { + return nil, ErrStreamUnsupported + } + kind, gotID, body, ok := decodeStreamFrame(initAck) + if !ok || kind != streamKindInitAck || gotID != id { + // A legacy receiver answers with a plain "ACK UNKNOWN(7)" TEXT + // frame, or nothing useful — treat as unsupported. + return nil, ErrStreamUnsupported + } + resumeOff, _ := decodeOffset(body) + if resumeOff > uint64(size) { + resumeOff = uint64(size) // defensive: receiver claims more than exists + } + + // Reader goroutine owns all reads from here on. It feeds ACK offsets to + // the window and delivers the terminal COMPLETE / error. + acked := &atomic.Uint64{} + acked.Store(resumeOff) + notify := make(chan struct{}, 1) + done := make(chan streamTerminal, 1) + go streamReadLoop(conn, id, acked, notify, done) + + // Send chunks from the resume offset with a bounded in-flight window. + if _, err := r.Seek(int64(resumeOff), io.SeekStart); err != nil { + return nil, fmt.Errorf("seek to resume offset: %w", err) + } + windowBytes := uint64(streamWindow * StreamChunkSize) + buf := make([]byte, StreamChunkSize) + offset := resumeOff + for offset < uint64(size) { + // Window gate: block until the receiver has acked enough that the + // in-flight (unacked) bytes stay under the window. + for offset-acked.Load() >= windowBytes { + select { + case <-notify: + case t := <-done: + return nil, t.errOrTimeout("waiting for window ACK") + case <-time.After(stepTimeout): + _ = conn.Close() + return nil, fmt.Errorf("timed out after %s waiting for ACK (sent %d, acked %d)", stepTimeout, offset, acked.Load()) + } + } + n, rerr := io.ReadFull(r, buf) + if n == 0 && rerr != nil { + if errors.Is(rerr, io.EOF) { + break + } + return nil, fmt.Errorf("read chunk at %d: %w", offset, rerr) + } + if werr := WriteFrame(conn, encodeChunk(id, offset, buf[:n])); werr != nil { + return nil, fmt.Errorf("send chunk at %d: %w", offset, werr) + } + offset += uint64(n) + if rerr != nil && !errors.Is(rerr, io.ErrUnexpectedEOF) && !errors.Is(rerr, io.EOF) { + return nil, fmt.Errorf("read chunk at %d: %w", offset, rerr) + } + } + + // DONE — receiver verifies the full hash and replies COMPLETE. + if err := WriteFrame(conn, encodeStreamFrame(streamKindDone, id, nil)); err != nil { + return nil, fmt.Errorf("send DONE: %w", err) + } + select { + case t := <-done: + if t.err != nil { + return nil, fmt.Errorf("transfer failed: %w", t.err) + } + return &StreamResult{ + BytesSent: int64(offset - resumeOff), + BytesResumed: int64(resumeOff), + TotalBytes: size, + Sha256: hex.EncodeToString(fullHash[:]), + OK: t.completeOK, + Message: t.completeMsg, + }, nil + case <-time.After(stepTimeout): + _ = conn.Close() + return nil, fmt.Errorf("timed out after %s waiting for receiver to verify and COMPLETE", stepTimeout) + } +} + +type streamTerminal struct { + err error // transport / protocol error + completeOK bool // receiver verified the file + completeMsg string // receiver's message + complete bool // a COMPLETE was received +} + +func (t streamTerminal) errOrTimeout(ctx string) error { + if t.err != nil { + return fmt.Errorf("%s: %w", ctx, t.err) + } + if t.complete && !t.completeOK { + return fmt.Errorf("%s: receiver aborted: %s", ctx, t.completeMsg) + } + return fmt.Errorf("%s: stream closed", ctx) +} + +// streamReadLoop consumes ACK and COMPLETE frames until a terminal event. +func streamReadLoop(conn frameRW, id [transferIDLen]byte, acked *atomic.Uint64, notify chan<- struct{}, done chan<- streamTerminal) { + for { + f, err := ReadFrame(conn) + if err != nil { + done <- streamTerminal{err: err} + return + } + kind, gotID, body, ok := decodeStreamFrame(f) + if !ok || gotID != id { + continue // stray frame; ignore + } + switch kind { + case streamKindAck: + if off, ok := decodeOffset(body); ok { + // Monotonic advance only. + for { + cur := acked.Load() + if off <= cur || acked.CompareAndSwap(cur, off) { + break + } + } + select { + case notify <- struct{}{}: + default: + } + } + case streamKindComplete: + cok, msg := decodeComplete(body) + done <- streamTerminal{completeOK: cok, completeMsg: msg, complete: true} + return + case streamKindAbort: + done <- streamTerminal{err: fmt.Errorf("receiver abort: %s", string(body)), complete: true} + return + } + } +} + +// recvFrameTimeout reads one frame with a deadline, racing ReadFrame against +// a timer (driver.Conn has no read deadline we can set here). On timeout the +// blocked goroutine unwinds when the caller closes the connection. +func recvFrameTimeout(conn frameRW, d time.Duration) (*Frame, error) { + type res struct { + f *Frame + err error + } + ch := make(chan res, 1) + go func() { + f, err := ReadFrame(conn) + ch <- res{f, err} + }() + select { + case r := <-ch: + return r.f, r.err + case <-time.After(d): + return nil, fmt.Errorf("read timed out after %s", d) + } +} + +// --- receiver -------------------------------------------------------------- + +// StreamReceiver handles the receive side of TypeFileStream for a single +// connection. It writes incoming chunks contiguously to a .partial file +// (so the on-disk size is always the resume offset), verifies the full +// SHA-256 on DONE, and atomically renames into place. Decoupled from the +// daemon Service so it can be unit-tested with just a directory. +type StreamReceiver struct { + receivedDir string + onSaved func(name, path string, size int64) + nameSuffix func(base string) string // produces the final unique filename + + mu sync.Mutex + transfers map[[transferIDLen]byte]*recvTransfer +} + +type recvTransfer struct { + file *os.File + partial string + name string + size uint64 + hash [32]byte + cursor uint64 // highest contiguous byte written + pending map[uint64][]byte // out-of-order chunks held until contiguous + pendBytes int +} + +// NewStreamReceiver builds a receiver writing into receivedDir. nameSuffix +// maps a base filename to a final unique name (nil ⇒ a timestamped default). +// onSaved (nil ok) fires after a verified file is renamed into place. +func NewStreamReceiver(receivedDir string, nameSuffix func(base string) string, onSaved func(name, path string, size int64)) *StreamReceiver { + if nameSuffix == nil { + nameSuffix = defaultStreamName + } + return &StreamReceiver{ + receivedDir: receivedDir, + onSaved: onSaved, + nameSuffix: nameSuffix, + transfers: make(map[[transferIDLen]byte]*recvTransfer), + } +} + +func defaultStreamName(base string) string { + ts := time.Now().Format("20060102-150405.000") + ext := filepath.Ext(base) + stem := base[:len(base)-len(ext)] + return fmt.Sprintf("%s-%s%s", stem, ts, ext) +} + +// maxPendingBytes bounds the out-of-order buffer per transfer. +const maxPendingBytes = streamWindow * StreamChunkSize + +// HandleFrame processes one TypeFileStream frame and returns the response +// frame to send back (nil ⇒ nothing to send). It never returns an error for +// protocol-level problems — those are reported to the peer via COMPLETE / +// ABORT frames so the connection loop stays simple. +func (sr *StreamReceiver) HandleFrame(f *Frame) *Frame { + kind, id, body, ok := decodeStreamFrame(f) + if !ok { + return nil + } + switch kind { + case streamKindInit: + return sr.handleInit(id, body) + case streamKindChunk: + return sr.handleChunk(id, body) + case streamKindDone: + return sr.handleDone(id) + case streamKindAbort: + sr.discard(id) + return nil + default: + return nil + } +} + +func (sr *StreamReceiver) handleInit(id [transferIDLen]byte, body []byte) *Frame { + size, hash, _, name, ok := decodeInit(body) + if !ok { + return encodeComplete(id, false, "malformed INIT") + } + partialDir := filepath.Join(sr.receivedDir, ".partial") + if err := os.MkdirAll(partialDir, 0700); err != nil { + return encodeComplete(id, false, "mkdir partial: "+err.Error()) + } + partial := filepath.Join(partialDir, hex.EncodeToString(id[:])) + + file, err := os.OpenFile(partial, os.O_RDWR|os.O_CREATE, 0600) + if err != nil { + return encodeComplete(id, false, "open partial: "+err.Error()) + } + // Resume from the contiguous bytes already on disk. Because we only + // ever write contiguously, the file size IS the resume offset. Guard + // against a stale/oversized .partial. + info, err := file.Stat() + var resume uint64 + if err == nil && info.Size() >= 0 && uint64(info.Size()) <= size { + resume = uint64(info.Size()) + } else { + _ = file.Truncate(0) + resume = 0 + } + + sr.mu.Lock() + // Replace any stale in-memory transfer for this id (e.g. a prior conn). + if old := sr.transfers[id]; old != nil && old.file != nil { + _ = old.file.Close() + } + sr.transfers[id] = &recvTransfer{ + file: file, + partial: partial, + name: sanitizeBase(name), + size: size, + hash: hash, + cursor: resume, + pending: make(map[uint64][]byte), + } + sr.mu.Unlock() + + return encodeOffset(streamKindInitAck, id, resume) +} + +func (sr *StreamReceiver) handleChunk(id [transferIDLen]byte, body []byte) *Frame { + off, data, ok := decodeChunk(body) + if !ok { + return encodeComplete(id, false, "malformed CHUNK") + } + sr.mu.Lock() + t := sr.transfers[id] + if t == nil { + sr.mu.Unlock() + return encodeStreamFrame(streamKindAbort, id, []byte("no active transfer (send INIT first)")) + } + switch { + case off == t.cursor: + if werr := sr.writeAt(t, off, data); werr != nil { + sr.mu.Unlock() + return encodeComplete(id, false, werr.Error()) + } + // Drain any buffered successors. + for { + next, has := t.pending[t.cursor] + if !has { + break + } + delete(t.pending, t.cursor) + t.pendBytes -= len(next) + if werr := sr.writeAt(t, t.cursor, next); werr != nil { + sr.mu.Unlock() + return encodeComplete(id, false, werr.Error()) + } + } + case off > t.cursor: + // Out of order (transient reorder). Buffer if room; otherwise drop + // and let the sender's window stall + retransmit re-drive it. + if _, dup := t.pending[off]; !dup && t.pendBytes+len(data) <= maxPendingBytes { + t.pending[off] = append([]byte(nil), data...) + t.pendBytes += len(data) + } + default: + // off < cursor: duplicate already-written bytes — ignore. + } + cursor := t.cursor + sr.mu.Unlock() + return encodeOffset(streamKindAck, id, cursor) +} + +// writeAt appends a contiguous chunk at off (== cursor) and advances cursor. +// Caller holds sr.mu. +func (sr *StreamReceiver) writeAt(t *recvTransfer, off uint64, data []byte) error { + if _, err := t.file.WriteAt(data, int64(off)); err != nil { + return fmt.Errorf("write at %d: %w", off, err) + } + t.cursor += uint64(len(data)) + return nil +} + +func (sr *StreamReceiver) handleDone(id [transferIDLen]byte) *Frame { + sr.mu.Lock() + t := sr.transfers[id] + sr.mu.Unlock() + if t == nil { + return encodeComplete(id, false, "no active transfer") + } + + if err := t.file.Sync(); err != nil { + return encodeComplete(id, false, "fsync: "+err.Error()) + } + if t.cursor != t.size { + return encodeComplete(id, false, fmt.Sprintf("incomplete: have %d of %d bytes", t.cursor, t.size)) + } + + // Verify the full content hash before accepting the file. + if _, err := t.file.Seek(0, io.SeekStart); err != nil { + return encodeComplete(id, false, "seek for verify: "+err.Error()) + } + h := sha256.New() + if _, err := io.Copy(h, t.file); err != nil { + return encodeComplete(id, false, "read for verify: "+err.Error()) + } + if !bytes.Equal(h.Sum(nil), t.hash[:]) { + // Keep .partial for inspection; drop the in-memory transfer. + sr.discard(id) + return encodeComplete(id, false, "sha256 mismatch — file corrupt, .partial retained") + } + + _ = t.file.Close() + finalName := sr.nameSuffix(t.name) + finalPath := filepath.Join(sr.receivedDir, finalName) + if err := os.Rename(t.partial, finalPath); err != nil { + return encodeComplete(id, false, "rename: "+err.Error()) + } + sr.forget(id) + if sr.onSaved != nil { + sr.onSaved(finalName, finalPath, int64(t.size)) + } + return encodeComplete(id, true, "") +} + +// discard closes and forgets a transfer but leaves the .partial on disk. +func (sr *StreamReceiver) discard(id [transferIDLen]byte) { + sr.mu.Lock() + if t := sr.transfers[id]; t != nil && t.file != nil { + _ = t.file.Close() + } + delete(sr.transfers, id) + sr.mu.Unlock() +} + +func (sr *StreamReceiver) forget(id [transferIDLen]byte) { + sr.mu.Lock() + delete(sr.transfers, id) + sr.mu.Unlock() +} + +// Close releases any open .partial handles (call on connection teardown). +// The .partial files themselves remain for resume. +func (sr *StreamReceiver) Close() { + sr.mu.Lock() + for _, t := range sr.transfers { + if t.file != nil { + _ = t.file.Close() + } + } + sr.transfers = make(map[[transferIDLen]byte]*recvTransfer) + sr.mu.Unlock() +} + +func sanitizeBase(name string) string { + b := filepath.Base(name) + if b == "." || b == "/" || b == "" { + return "received.bin" + } + return b +} diff --git a/service.go b/service.go index c4f59b8..b6b7599 100644 --- a/service.go +++ b/service.go @@ -127,6 +127,32 @@ func (s *Service) handleConn(ctx context.Context, conn coreapi.Stream) { // L11 panic boundary: tear down THIS conn only. defer coreapi.RecoverPlugin("dataexchange", "handleConn", s.deps.Events, nil) defer conn.Close() + + // Lazily-constructed TypeFileStream receiver, scoped to this connection. + // Final filenames and the file.received event match saveReceivedFile so + // the two transfer paths are indistinguishable to consumers. + var sr *StreamReceiver + defer func() { + if sr != nil { + sr.Close() + } + }() + streamNameSuffix := func(base string) string { + ts := time.Now().Format("20060102-150405.000") + seq := s.seq.Add(1) + ext := filepath.Ext(base) + stem := base[:len(base)-len(ext)] + return fmt.Sprintf("%s-%s-%06d%s", stem, ts, seq, ext) + } + streamOnSaved := func(name, path string, size int64) { + slog.Info("file saved (stream)", "path", path, "bytes", size) + if s.deps.Events != nil { + s.deps.Events.Publish("file.received", map[string]any{ + "filename": name, "size": int(size), "path": path, + }) + } + } + for { frame, err := ReadFrame(conn) // Capture right after the IO read so receiver-side timestamps are as @@ -143,6 +169,28 @@ func (s *Service) handleConn(ctx context.Context, conn coreapi.Stream) { var saveErr error var ackFrame *Frame switch frame.Type { + case TypeFileStream: + // Chunked/resumable transfer. The receiver emits its own + // control responses (INIT-ACK / ACK / COMPLETE), so skip the + // generic per-frame ACK below. + if sr == nil { + dir, derr := s.receivedDir() + if derr != nil { + _ = WriteFrame(conn, &Frame{Type: TypeText, Payload: []byte("ERR received dir: " + derr.Error())}) + return + } + if mderr := os.MkdirAll(dir, 0700); mderr != nil { + _ = WriteFrame(conn, &Frame{Type: TypeText, Payload: []byte("ERR mkdir: " + mderr.Error())}) + return + } + sr = NewStreamReceiver(dir, streamNameSuffix, streamOnSaved) + } + if resp := sr.HandleFrame(frame); resp != nil { + if werr := WriteFrame(conn, resp); werr != nil { + return + } + } + continue case TypeFile: if frame.Filename != "" { saveErr = s.saveReceivedFile(frame) @@ -281,10 +329,10 @@ func (s *Service) saveInboxMessage(frame *Frame, from protocol.Addr) error { "frame_bytes", len(frame.Payload)) if s.deps.Events != nil { s.deps.Events.Publish("inbox.full", map[string]any{ - "from": from.String(), - "type": TypeName(frame.Type), - "frame_bytes": len(frame.Payload), - "max_bytes": s.cfg.InboxMaxBytes, + "from": from.String(), + "type": TypeName(frame.Type), + "frame_bytes": len(frame.Payload), + "max_bytes": s.cfg.InboxMaxBytes, }) } return fmt.Errorf("inbox byte budget exceeded: %d + %d > %d", diff --git a/zz_filestream_test.go b/zz_filestream_test.go new file mode 100644 index 0000000..a7ce80f --- /dev/null +++ b/zz_filestream_test.go @@ -0,0 +1,222 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +package dataexchange + +import ( + "bytes" + "crypto/sha256" + "encoding/hex" + "io" + "net" + "os" + "path/filepath" + "testing" + "time" +) + +// runStreamReceiver drives a StreamReceiver over conn until conn closes. +func runStreamReceiver(conn net.Conn, dir string, done chan<- error) { + sr := NewStreamReceiver(dir, nil, nil) + defer sr.Close() + for { + f, err := ReadFrame(conn) + if err != nil { + done <- err + return + } + if f.Type != TypeFileStream { + continue + } + if resp := sr.HandleFrame(f); resp != nil { + if werr := WriteFrame(conn, resp); werr != nil { + done <- werr + return + } + } + } +} + +func makePayload(n int) []byte { + b := make([]byte, n) + for i := range b { + b[i] = byte((i*131 + 7) & 0xff) + } + return b +} + +func transferIDOf(data []byte) [transferIDLen]byte { + sum := sha256.Sum256(data) + var id [transferIDLen]byte + copy(id[:], sum[:transferIDLen]) + return id +} + +func TestFileStream_HappyPath(t *testing.T) { + dir := t.TempDir() + // Span several chunks plus a partial tail. + data := makePayload(3*StreamChunkSize + 12345) + wantSum := sha256.Sum256(data) + + cli, srv := net.Pipe() + done := make(chan error, 1) + go runStreamReceiver(srv, dir, done) + + res, err := streamSend(cli, "hello.bin", bytes.NewReader(data), int64(len(data)), 10*time.Second) + if err != nil { + t.Fatalf("streamSend: %v", err) + } + _ = cli.Close() + <-done + + if !res.OK { + t.Fatalf("transfer not OK: %s", res.Message) + } + if res.BytesResumed != 0 { + t.Errorf("BytesResumed = %d, want 0", res.BytesResumed) + } + if res.BytesSent != int64(len(data)) { + t.Errorf("BytesSent = %d, want %d", res.BytesSent, len(data)) + } + if res.Sha256 != hex.EncodeToString(wantSum[:]) { + t.Errorf("Sha256 = %s, want %s", res.Sha256, hex.EncodeToString(wantSum[:])) + } + + // Exactly one file in dir (besides the .partial dir), content matches. + assertSingleReceivedFile(t, dir, data) +} + +func TestFileStream_Resume(t *testing.T) { + dir := t.TempDir() + data := makePayload(2*StreamChunkSize + 555) + resumeAt := StreamChunkSize + 100 // mid-second-chunk + + // Pre-seed a .partial holding the first resumeAt bytes, as if a prior + // transfer was killed. transfer_id is content-derived, so the receiver + // will recognize it on INIT. + id := transferIDOf(data) + partialDir := filepath.Join(dir, ".partial") + if err := os.MkdirAll(partialDir, 0700); err != nil { + t.Fatal(err) + } + partial := filepath.Join(partialDir, hex.EncodeToString(id[:])) + if err := os.WriteFile(partial, data[:resumeAt], 0600); err != nil { + t.Fatal(err) + } + + cli, srv := net.Pipe() + done := make(chan error, 1) + go runStreamReceiver(srv, dir, done) + + res, err := streamSend(cli, "resumed.bin", bytes.NewReader(data), int64(len(data)), 10*time.Second) + if err != nil { + t.Fatalf("streamSend: %v", err) + } + _ = cli.Close() + <-done + + if !res.OK { + t.Fatalf("transfer not OK: %s", res.Message) + } + if res.BytesResumed != int64(resumeAt) { + t.Errorf("BytesResumed = %d, want %d", res.BytesResumed, resumeAt) + } + if res.BytesSent != int64(len(data)-resumeAt) { + t.Errorf("BytesSent = %d, want %d (should not re-send resumed bytes)", res.BytesSent, len(data)-resumeAt) + } + assertSingleReceivedFile(t, dir, data) +} + +func TestFileStream_CorruptResumeRejected(t *testing.T) { + // A .partial whose bytes do NOT match the declared content must be + // caught by the full-hash verification at DONE. + dir := t.TempDir() + data := makePayload(StreamChunkSize + 10) + resumeAt := 500 + id := transferIDOf(data) + partialDir := filepath.Join(dir, ".partial") + if err := os.MkdirAll(partialDir, 0700); err != nil { + t.Fatal(err) + } + corrupt := make([]byte, resumeAt) // zeros, not data[:resumeAt] + partial := filepath.Join(partialDir, hex.EncodeToString(id[:])) + if err := os.WriteFile(partial, corrupt, 0600); err != nil { + t.Fatal(err) + } + + cli, srv := net.Pipe() + done := make(chan error, 1) + go runStreamReceiver(srv, dir, done) + + res, err := streamSend(cli, "corrupt.bin", bytes.NewReader(data), int64(len(data)), 10*time.Second) + _ = cli.Close() + <-done + if err != nil { + // A protocol-level error is acceptable here too. + return + } + if res.OK { + t.Fatalf("expected corrupt resume to be rejected, but transfer reported OK") + } +} + +func TestFileStream_CodecRoundTrip(t *testing.T) { + id := transferIDOf([]byte("abc")) + var hash [32]byte + copy(hash[:], makePayload(32)) + + init := encodeInit(id, 1<<40, hash, StreamChunkSize, "a/b/c.bin") + k, gotID, body, ok := decodeStreamFrame(init) + if !ok || k != streamKindInit || gotID != id { + t.Fatalf("decode init header: ok=%v kind=%d", ok, k) + } + size, gotHash, cs, name, ok := decodeInit(body) + if !ok || size != 1<<40 || gotHash != hash || cs != StreamChunkSize || name != "a/b/c.bin" { + t.Fatalf("decodeInit mismatch: size=%d cs=%d name=%q", size, cs, name) + } + + ck := encodeChunk(id, 4096, []byte("payload")) + _, _, cbody, _ := decodeStreamFrame(ck) + off, cdata, ok := decodeChunk(cbody) + if !ok || off != 4096 || string(cdata) != "payload" { + t.Fatalf("decodeChunk mismatch: off=%d data=%q", off, cdata) + } + + cf := encodeComplete(id, false, "boom") + _, _, fbody, _ := decodeStreamFrame(cf) + cok, msg := decodeComplete(fbody) + if cok || msg != "boom" { + t.Fatalf("decodeComplete mismatch: ok=%v msg=%q", cok, msg) + } +} + +func assertSingleReceivedFile(t *testing.T, dir string, want []byte) { + t.Helper() + entries, err := os.ReadDir(dir) + if err != nil { + t.Fatal(err) + } + var files []string + for _, e := range entries { + if e.IsDir() { + continue // skip .partial + } + files = append(files, e.Name()) + } + if len(files) != 1 { + t.Fatalf("expected 1 received file, got %d: %v", len(files), files) + } + got, err := os.ReadFile(filepath.Join(dir, files[0])) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(got, want) { + t.Fatalf("received content mismatch: got %d bytes, want %d", len(got), len(want)) + } + // .partial must be cleaned up on success. + if _, err := os.Stat(filepath.Join(dir, ".partial")); err == nil { + if rem, _ := os.ReadDir(filepath.Join(dir, ".partial")); len(rem) != 0 { + t.Errorf(".partial not cleaned: %d files remain", len(rem)) + } + } + _ = io.Discard +} diff --git a/zz_max_frame_env_test.go b/zz_max_frame_env_test.go new file mode 100644 index 0000000..51ae235 --- /dev/null +++ b/zz_max_frame_env_test.go @@ -0,0 +1,29 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +package dataexchange + +import "testing" + +// TestMaxFrameSize_DefaultAndConstants pins the default cap so a future +// bump is forced to update tests + CHANGELOG alongside the code. +// +// The env-driven override (PILOT_DATAEXCHANGE_MAX_FRAME) is evaluated at +// package init; testing it after the fact would require restarting the +// process, which `go test` can't do cleanly. Instead we verify the +// init-time invariants here and trust the override path through code +// review. +func TestMaxFrameSize_DefaultAndConstants(t *testing.T) { + t.Parallel() + if DefaultMaxFrameSize != 1<<30 { + t.Errorf("DefaultMaxFrameSize = %d; want %d (1 GiB)", DefaultMaxFrameSize, 1<<30) + } + // When PILOT_DATAEXCHANGE_MAX_FRAME is unset (as it is in CI), + // MaxFrameSize must equal the default. A non-default value here + // means the test environment is overriding the cap and the rest of + // the suite may behave unexpectedly — fail loudly so the operator + // sees it. + if MaxFrameSize != DefaultMaxFrameSize { + t.Errorf("MaxFrameSize = %d; want %d (default) — is PILOT_DATAEXCHANGE_MAX_FRAME set?", + MaxFrameSize, DefaultMaxFrameSize) + } +} diff --git a/zz_service_test.go b/zz_service_test.go index 481c0b6..ad54989 100644 --- a/zz_service_test.go +++ b/zz_service_test.go @@ -20,10 +20,10 @@ import ( // fakeStream is a coreapi.Stream backed by io.Pipe pairs. type fakeStream struct { - r *io.PipeReader - w *io.PipeWriter - closed chan struct{} - closeOnce sync.Once + r *io.PipeReader + w *io.PipeWriter + closed chan struct{} + closeOnce sync.Once } func newFakeStream(r *io.PipeReader, w *io.PipeWriter) *fakeStream { @@ -40,11 +40,11 @@ func (s *fakeStream) Close() error { }) return nil } -func (s *fakeStream) LocalAddr() coreapi.Addr { return protocol.Addr{} } -func (s *fakeStream) LocalPort() uint16 { return 1001 } -func (s *fakeStream) RemoteAddr() coreapi.Addr { return protocol.Addr{Node: 0xCAFE} } -func (s *fakeStream) RemotePort() uint16 { return 33000 } -func (s *fakeStream) SetDeadline(time.Time) error { return nil } +func (s *fakeStream) LocalAddr() coreapi.Addr { return protocol.Addr{} } +func (s *fakeStream) LocalPort() uint16 { return 1001 } +func (s *fakeStream) RemoteAddr() coreapi.Addr { return protocol.Addr{Node: 0xCAFE} } +func (s *fakeStream) RemotePort() uint16 { return 33000 } +func (s *fakeStream) SetDeadline(time.Time) error { return nil } func (s *fakeStream) SetReadDeadline(time.Time) error { return nil } func (s *fakeStream) SetWriteDeadline(time.Time) error { return nil } @@ -84,7 +84,7 @@ func (l *fakeListener) Port() uint16 { return 1001 } // fakeStreams returns a programmable listener. type fakeStreams struct { - listener coreapi.Listener + listener coreapi.Listener listenErr error } diff --git a/zz_test.go b/zz_test.go index 10a01fb..e83caea 100644 --- a/zz_test.go +++ b/zz_test.go @@ -11,8 +11,9 @@ import ( // TestReadFrame_SizeCap verifies MaxFrameSize acceptance / rejection. // Replaces the core assertion behind test_size_file_500mb_reject.sh — -// a 500 MiB frame must be rejected cleanly, while frames up to MaxFrameSize -// (256 MiB) are accepted. +// a frame past the cap must be rejected cleanly without allocating the +// payload, while frames up to and including MaxFrameSize are accepted. +// Default cap was raised from 256 MiB to 1 GiB on 2026-06-14. func TestReadFrame_SizeCap(t *testing.T) { t.Parallel() cases := []struct { @@ -25,7 +26,6 @@ func TestReadFrame_SizeCap(t *testing.T) { {"just_under_cap", MaxFrameSize - 1, false}, {"at_cap", MaxFrameSize, false}, {"just_over_cap", MaxFrameSize + 1, true}, - {"way_over_cap", 1 << 30, true}, // 1 GiB } for _, tc := range cases { t.Run(tc.name, func(t *testing.T) {