From 702c86255098d0b1b44341148be2b34a691a6352 Mon Sep 17 00:00:00 2001 From: "Peter Z." Date: Sat, 30 May 2026 22:47:11 +0200 Subject: [PATCH 1/6] add datagram handshake message fragmenter Splits a serialized CH-KEM handshake message into datagram HANDSHAKE frames within the MTU, the inverse of the Reassembler. Each fragment carries a distinct increasing sequence and the same total length, so the peer reassembles in any order. Rejects empty and oversize messages. Round-trip tests reassemble out of order and confirm each frame stays under the MTU. The retransmission state machine and responder flight cache build on this. --- pkg/tunnel/dgram_handshake.go | 61 +++++++++++++++++++++++ pkg/tunnel/dgram_handshake_test.go | 77 ++++++++++++++++++++++++++++++ 2 files changed, 138 insertions(+) create mode 100644 pkg/tunnel/dgram_handshake.go create mode 100644 pkg/tunnel/dgram_handshake_test.go diff --git a/pkg/tunnel/dgram_handshake.go b/pkg/tunnel/dgram_handshake.go new file mode 100644 index 0000000..f7b322b --- /dev/null +++ b/pkg/tunnel/dgram_handshake.go @@ -0,0 +1,61 @@ +// Package tunnel - dgram_handshake.go drives the CH-KEM handshake over the +// connectionless datagram transport. Unlike the stream handshake it tolerates +// loss, reordering, and duplication, and it splits the large post-quantum +// handshake messages (the ~1.6 KB Hellos) across multiple datagrams. +// +// This file provides the send-side fragmenter, the inverse of the Reassembler. +// The retransmission state machine and the responder flight cache build on it. +package tunnel + +import ( + "github.com/sara-star-quant/quantum-go/internal/constants" + qerrors "github.com/sara-star-quant/quantum-go/internal/errors" + "github.com/sara-star-quant/quantum-go/pkg/protocol" +) + +// maxHandshakeFragmentPayload is the largest handshake message slice that fits +// in one datagram after the common header and the handshake extension (and any +// cookie). It derives from the wire-format sizes in pkg/protocol so it stays +// correct if those change. +func maxHandshakeFragmentPayload(cookieLen int) int { + return constants.DatagramMTU - protocol.DatagramHeaderSize - protocol.DatagramHandshakeExtSize - cookieLen +} + +// fragmentHandshake splits a serialized handshake message into one or more +// datagram HANDSHAKE frames, each within the datagram MTU. base supplies the +// per-frame header fields (Epoch, RecvIndex, SenderIndex, MsgType, and an +// optional Cookie); fragmentHandshake fills FragOffset, FragLength, and +// TotalLength per fragment and assigns each frame a distinct, increasing +// sequence number starting at base.Seq. The frames reassemble in any order on +// the peer. +func fragmentHandshake(base protocol.DatagramHandshakeHeader, msg []byte) ([][]byte, error) { + total := len(msg) + if total == 0 || total > constants.DatagramMaxHandshakeMessageSize { + return nil, qerrors.ErrMessageTooLarge + } + maxFrag := maxHandshakeFragmentPayload(len(base.Cookie)) + if maxFrag <= 0 { + return nil, qerrors.ErrInvalidMessage + } + + frames := make([][]byte, 0, (total+maxFrag-1)/maxFrag) + seq := base.Seq + for off := 0; off < total; off += maxFrag { + end := off + maxFrag + if end > total { + end = total + } + h := base + h.Seq = seq + h.FragOffset = uint16(off) + h.FragLength = uint16(end - off) + h.TotalLength = uint16(total) + frame, err := protocol.EncodeDatagramHandshake(h, msg[off:end]) + if err != nil { + return nil, err + } + frames = append(frames, frame) + seq++ + } + return frames, nil +} diff --git a/pkg/tunnel/dgram_handshake_test.go b/pkg/tunnel/dgram_handshake_test.go new file mode 100644 index 0000000..f2f00b2 --- /dev/null +++ b/pkg/tunnel/dgram_handshake_test.go @@ -0,0 +1,77 @@ +package tunnel + +import ( + "bytes" + "testing" + "time" + + "github.com/sara-star-quant/quantum-go/internal/constants" + "github.com/sara-star-quant/quantum-go/pkg/protocol" +) + +func TestFragmentHandshake_RoundTrip(t *testing.T) { + // A message larger than one datagram must split and reassemble exactly. + msg := make([]byte, 1644) // ~ ClientHello size, exceeds the 1200 MTU + for i := range msg { + msg[i] = byte(i) + } + base := protocol.DatagramHandshakeHeader{ + DatagramHeader: protocol.DatagramHeader{RecvIndex: 7, Seq: 100}, + SenderIndex: 42, + MsgType: protocol.MessageTypeClientHello, + } + + frames, err := fragmentHandshake(base, msg) + if err != nil { + t.Fatal(err) + } + if len(frames) < 2 { + t.Fatalf("expected multiple fragments for %d bytes, got %d", len(msg), len(frames)) + } + + // Reassemble in reverse order to exercise reordering tolerance. + r := NewReassembler(4, 8192, time.Second) + var out []byte + var done bool + for i := len(frames) - 1; i >= 0; i-- { + if len(frames[i]) > constants.DatagramMTU { + t.Fatalf("frame %d is %d bytes, over MTU %d", i, len(frames[i]), constants.DatagramMTU) + } + h, frag, perr := protocol.ParseDatagramHandshake(frames[i]) + if perr != nil { + t.Fatalf("frame %d parse: %v", i, perr) + } + out, done, err = r.Add("peer", h, frag) + if err != nil { + t.Fatalf("frame %d reassemble: %v", i, err) + } + } + if !done { + t.Fatal("reassembly did not complete") + } + if !bytes.Equal(out, msg) { + t.Fatal("reassembled message differs from the original") + } +} + +func TestFragmentHandshake_SingleFragment(t *testing.T) { + base := protocol.DatagramHandshakeHeader{MsgType: protocol.MessageTypeClientHello} + frames, err := fragmentHandshake(base, []byte("short message")) + if err != nil { + t.Fatal(err) + } + if len(frames) != 1 { + t.Fatalf("a small message should be one frame, got %d", len(frames)) + } +} + +func TestFragmentHandshake_RejectsEmptyAndOversize(t *testing.T) { + base := protocol.DatagramHandshakeHeader{MsgType: protocol.MessageTypeClientHello} + if _, err := fragmentHandshake(base, nil); err == nil { + t.Fatal("an empty message must be rejected") + } + big := make([]byte, constants.DatagramMaxHandshakeMessageSize+1) + if _, err := fragmentHandshake(base, big); err == nil { + t.Fatal("an oversize message must be rejected") + } +} From 780cfb76e0f87d9c5581330bec6e00d146328896 Mon Sep 17 00:00:00 2001 From: "Peter Z." Date: Sun, 31 May 2026 00:08:14 +0200 Subject: [PATCH 2/6] add transport-agnostic datagram handshake state machine --- pkg/tunnel/dgram_handshake_fsm.go | 148 ++++++++++++++++++++++ pkg/tunnel/dgram_handshake_fsm_test.go | 169 +++++++++++++++++++++++++ 2 files changed, 317 insertions(+) create mode 100644 pkg/tunnel/dgram_handshake_fsm.go create mode 100644 pkg/tunnel/dgram_handshake_fsm_test.go diff --git a/pkg/tunnel/dgram_handshake_fsm.go b/pkg/tunnel/dgram_handshake_fsm.go new file mode 100644 index 0000000..135e53c --- /dev/null +++ b/pkg/tunnel/dgram_handshake_fsm.go @@ -0,0 +1,148 @@ +// Package tunnel - dgram_handshake_fsm.go is the transport-agnostic handshake +// state machine for the datagram path. It drives the existing CH-KEM flight +// builders (handshake.go) over an unreliable transport: it classifies each +// inbound handshake message as advance / replay / drop and caches its last +// outbound message so a peer retransmit is answered from bytes, never by +// re-invoking a builder (the builders zeroize their secrets via cleanup() on +// completion, so re-invocation is neither correct nor possible). +// +// This file holds no sockets, timers, fragmentation, or sequence numbers; those +// belong to the driver (dgram_handshake_driver.go) and the endpoint wiring +// (datagram.go). Each transition produces exactly one handshake message. +package tunnel + +import "github.com/sara-star-quant/quantum-go/pkg/protocol" + +// hsMessage is one serialized handshake message plus its type. The bytes are the +// verbatim output of a flight builder (a plaintext codec frame for the Hellos, or +// self-contained ciphertext for the Finished messages). +type hsMessage struct { + typ protocol.MessageType + body []byte +} + +// dgramHandshake is the per-handshake state machine. It wraps a *Handshake (the +// crypto building blocks) and tracks, for the datagram transport, which inbound +// message advances the handshake, which one is a retransmit to replay, and the +// last outbound message to replay. +type dgramHandshake struct { + hs *Handshake + role Role + + // expecting is the inbound message type that advances the handshake from the + // current state. It is zero (not a valid MessageType) once complete. + expecting protocol.MessageType + // prev is the most recently processed inbound type; a repeat of it is a + // retransmit and replays cached. + prev protocol.MessageType + // cached is the last outbound message produced, replayed on a retransmit. + cached *hsMessage +} + +// newDgramHandshake builds an FSM for session. The initiator must call start() to +// emit the ClientHello; the responder waits for one. +func newDgramHandshake(session *Session) *dgramHandshake { + d := &dgramHandshake{ + hs: NewHandshake(session), + role: session.Role, + } + if d.role == RoleResponder { + d.expecting = protocol.MessageTypeClientHello + } + return d +} + +// start emits the initiator's ClientHello and arms the FSM to expect a +// ServerHello. It is a no-op error for the responder. +func (d *dgramHandshake) start() (*hsMessage, error) { + body, err := d.hs.CreateClientHello() + if err != nil { + return nil, err + } + d.cached = &hsMessage{typ: protocol.MessageTypeClientHello, body: body} + d.expecting = protocol.MessageTypeServerHello + return d.cached, nil +} + +// onMessage feeds one reassembled inbound handshake message to the FSM. It returns +// the outbound message to send (nil if none) and whether the handshake is now +// complete. A single inbound never fails the handshake: an unexpected type, or a +// builder error on the expected type (malformed/forged/decap failure), is dropped +// (nil, false) so the genuine retransmit can still complete it. A repeat of the +// last processed message replays the cached outbound. +func (d *dgramHandshake) onMessage(typ protocol.MessageType, body []byte) (out *hsMessage, complete bool) { + switch { + case typ != 0 && typ == d.expecting: + return d.advance(typ, body) + case typ != 0 && typ == d.prev && d.cached != nil: + return d.cached, false // retransmit: replay last flight, no re-invocation + default: + return nil, false // stray / out-of-order / post-complete: drop + } +} + +// advance runs the builder pair for the expected inbound type. It commits the new +// outbound, prev, and expecting only when every builder call succeeds; on any +// builder error it leaves all FSM state untouched and drops, so a later genuine +// retransmit retries the same transition. +func (d *dgramHandshake) advance(typ protocol.MessageType, body []byte) (out *hsMessage, complete bool) { + switch { + case d.role == RoleInitiator && typ == protocol.MessageTypeServerHello: + if err := d.hs.ProcessServerHello(body); err != nil { + return nil, false + } + reply, err := d.hs.CreateClientFinished() + if err != nil { + return nil, false + } + d.commit(typ, &hsMessage{typ: protocol.MessageTypeClientFinished, body: reply}, protocol.MessageTypeServerFinished) + return d.cached, false + + case d.role == RoleInitiator && typ == protocol.MessageTypeServerFinished: + if err := d.hs.ProcessServerFinished(body); err != nil { + return nil, false + } + d.prev = typ + d.expecting = 0 // no further inbound + return nil, true + + case d.role == RoleResponder && typ == protocol.MessageTypeClientHello: + if err := d.hs.ProcessClientHello(body); err != nil { + return nil, false + } + reply, err := d.hs.CreateServerHello() + if err != nil { + return nil, false + } + d.commit(typ, &hsMessage{typ: protocol.MessageTypeServerHello, body: reply}, protocol.MessageTypeClientFinished) + return d.cached, false + + case d.role == RoleResponder && typ == protocol.MessageTypeClientFinished: + if err := d.hs.ProcessClientFinished(body); err != nil { + return nil, false + } + reply, err := d.hs.CreateServerFinished() + if err != nil { + return nil, false + } + d.commit(typ, &hsMessage{typ: protocol.MessageTypeServerFinished, body: reply}, 0) + return d.cached, true + + default: + return nil, false + } +} + +// commit records a successful transition: the inbound just processed (prev), the +// new outbound to cache and replay, and the next inbound to expect. +func (d *dgramHandshake) commit(processed protocol.MessageType, out *hsMessage, next protocol.MessageType) { + d.prev = processed + d.cached = out + d.expecting = next +} + +// state reports the wrapped handshake's state. +func (d *dgramHandshake) state() HandshakeState { return d.hs.State() } + +// isComplete reports whether the handshake completed successfully. +func (d *dgramHandshake) isComplete() bool { return d.hs.IsComplete() } diff --git a/pkg/tunnel/dgram_handshake_fsm_test.go b/pkg/tunnel/dgram_handshake_fsm_test.go new file mode 100644 index 0000000..4542b7f --- /dev/null +++ b/pkg/tunnel/dgram_handshake_fsm_test.go @@ -0,0 +1,169 @@ +package tunnel + +import ( + "bytes" + "testing" + + "github.com/sara-star-quant/quantum-go/pkg/protocol" +) + +// newFSMPair builds an initiator/responder FSM pair over fresh sessions. +func newFSMPair(t *testing.T) (initiator, responder *dgramHandshake) { + t.Helper() + ci, err := NewSession(RoleInitiator) + if err != nil { + t.Fatalf("initiator session: %v", err) + } + ri, err := NewSession(RoleResponder) + if err != nil { + t.Fatalf("responder session: %v", err) + } + return newDgramHandshake(ci), newDgramHandshake(ri) +} + +// TestDgramHandshakeFSMRoundTrip drives a full handshake by passing each FSM's +// output straight into the peer, no transport. Reaching Complete on the initiator +// cryptographically proves key agreement: ProcessServerFinished verifies a MAC +// computed over the shared secret and transcript. +func TestDgramHandshakeFSMRoundTrip(t *testing.T) { + initiator, responder := newFSMPair(t) + + ch, err := initiator.start() + if err != nil { + t.Fatalf("start: %v", err) + } + if ch.typ != protocol.MessageTypeClientHello { + t.Fatalf("start produced %v, want ClientHello", ch.typ) + } + + sh, done := responder.onMessage(ch.typ, ch.body) + if sh == nil || sh.typ != protocol.MessageTypeServerHello || done { + t.Fatalf("responder(ClientHello) = %v, done=%v; want ServerHello, false", sh, done) + } + + cf, done := initiator.onMessage(sh.typ, sh.body) + if cf == nil || cf.typ != protocol.MessageTypeClientFinished || done { + t.Fatalf("initiator(ServerHello) = %v, done=%v; want ClientFinished, false", cf, done) + } + + sf, done := responder.onMessage(cf.typ, cf.body) + if sf == nil || sf.typ != protocol.MessageTypeServerFinished || !done { + t.Fatalf("responder(ClientFinished) = %v, done=%v; want ServerFinished, true", sf, done) + } + if !responder.isComplete() { + t.Fatal("responder not complete after sending ServerFinished") + } + + out, done := initiator.onMessage(sf.typ, sf.body) + if out != nil || !done { + t.Fatalf("initiator(ServerFinished) = %v, done=%v; want nil, true", out, done) + } + if !initiator.isComplete() { + t.Fatal("initiator not complete after receiving ServerFinished") + } + + if got := initiator.hs.session.State(); got != SessionStateEstablished { + t.Fatalf("initiator session state = %v, want Established", got) + } + if got := responder.hs.session.State(); got != SessionStateEstablished { + t.Fatalf("responder session state = %v, want Established", got) + } +} + +// TestDgramHandshakeFSMDuplicateReplay verifies a retransmitted ClientHello, after +// the responder already answered, replays the byte-identical cached ServerHello +// without re-running the KEM (re-encapsulation would derive a different secret). +func TestDgramHandshakeFSMDuplicateReplay(t *testing.T) { + initiator, responder := newFSMPair(t) + + ch, _ := initiator.start() + sh1, _ := responder.onMessage(ch.typ, ch.body) + if sh1 == nil { + t.Fatal("no ServerHello on first ClientHello") + } + stateAfter := responder.state() + + sh2, done := responder.onMessage(ch.typ, ch.body) // duplicate ClientHello + if done { + t.Fatal("duplicate ClientHello reported complete") + } + if sh2 == nil || !bytes.Equal(sh1.body, sh2.body) { + t.Fatal("duplicate ClientHello did not replay the identical ServerHello") + } + if responder.state() != stateAfter { + t.Fatal("duplicate ClientHello advanced responder state") + } +} + +// TestDgramHandshakeFSMBadMessageDropped verifies a tampered ServerFinished is +// dropped, not fatal: the initiator stays put and a genuine retransmit still +// completes the handshake. +func TestDgramHandshakeFSMBadMessageDropped(t *testing.T) { + initiator, responder := newFSMPair(t) + + ch, _ := initiator.start() + sh, _ := responder.onMessage(ch.typ, ch.body) + cf, _ := initiator.onMessage(sh.typ, sh.body) + sf, done := responder.onMessage(cf.typ, cf.body) + if !done { + t.Fatal("responder not complete") + } + + tampered := append([]byte(nil), sf.body...) + tampered[len(tampered)-1] ^= 0xFF + out, done := initiator.onMessage(sf.typ, tampered) + if out != nil || done { + t.Fatalf("tampered ServerFinished = %v, done=%v; want nil, false (dropped)", out, done) + } + if initiator.isComplete() { + t.Fatal("tampered ServerFinished wrongly completed the handshake") + } + + if out, done := initiator.onMessage(sf.typ, sf.body); out != nil || !done { + t.Fatalf("genuine ServerFinished after tamper = %v, done=%v; want nil, true", out, done) + } + if !initiator.isComplete() { + t.Fatal("genuine ServerFinished did not complete the handshake") + } +} + +// TestDgramHandshakeFSMReplayAfterComplete verifies the responder, after +// completing, still replays its cached ServerFinished for a retransmitted +// ClientFinished (recovers a lost final flight) and stays complete. +func TestDgramHandshakeFSMReplayAfterComplete(t *testing.T) { + initiator, responder := newFSMPair(t) + + ch, _ := initiator.start() + sh, _ := responder.onMessage(ch.typ, ch.body) + cf, _ := initiator.onMessage(sh.typ, sh.body) + sf1, done := responder.onMessage(cf.typ, cf.body) + if !done || sf1 == nil { + t.Fatal("responder did not complete with a ServerFinished") + } + + sf2, done := responder.onMessage(cf.typ, cf.body) // duplicate ClientFinished + if done { + t.Fatal("duplicate ClientFinished re-reported completion transition") + } + if sf2 == nil || !bytes.Equal(sf1.body, sf2.body) { + t.Fatal("duplicate ClientFinished did not replay the identical ServerFinished") + } + if !responder.isComplete() { + t.Fatal("responder lost completion after replay") + } +} + +// TestDgramHandshakeFSMUnexpectedDropped verifies an out-of-order message (a +// ServerHello arriving at a fresh responder) is dropped with no state change. +func TestDgramHandshakeFSMUnexpectedDropped(t *testing.T) { + _, responder := newFSMPair(t) + before := responder.state() + + out, done := responder.onMessage(protocol.MessageTypeServerHello, []byte("garbage")) + if out != nil || done { + t.Fatalf("unexpected ServerHello = %v, done=%v; want nil, false", out, done) + } + if responder.state() != before { + t.Fatal("unexpected message changed responder state") + } +} From 2bed79b5e95dd25dbb862cf1aa3e8fa7b38dffa4 Mon Sep 17 00:00:00 2001 From: "Peter Z." Date: Sun, 31 May 2026 00:12:46 +0200 Subject: [PATCH 3/6] add datagram handshake reliability driver with retransmission and linger --- internal/constants/constants.go | 5 + pkg/tunnel/dgram_handshake_driver.go | 194 ++++++++++++++++++++++ pkg/tunnel/dgram_handshake_driver_test.go | 191 +++++++++++++++++++++ 3 files changed, 390 insertions(+) create mode 100644 pkg/tunnel/dgram_handshake_driver.go create mode 100644 pkg/tunnel/dgram_handshake_driver_test.go diff --git a/internal/constants/constants.go b/internal/constants/constants.go index f8502d3..5cbe153 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -196,6 +196,11 @@ const ( // it aborts the handshake. DatagramHandshakeMaxRetries = 8 + // DatagramHandshakeLingerReplays bounds how many times a completed responder + // will replay its cached ServerFinished for a retransmitted ClientFinished. + // It caps post-handshake reflection from a spoofed final flight. + DatagramHandshakeLingerReplays = 8 + // DatagramIdleTimeoutSeconds is how long a session may be idle before it is // reaped (there is no FIN over UDP; close is best-effort). DatagramIdleTimeoutSeconds = 120 diff --git a/pkg/tunnel/dgram_handshake_driver.go b/pkg/tunnel/dgram_handshake_driver.go new file mode 100644 index 0000000..ae653af --- /dev/null +++ b/pkg/tunnel/dgram_handshake_driver.go @@ -0,0 +1,194 @@ +// Package tunnel - dgram_handshake_driver.go adds reliability around the +// transport-agnostic FSM (dgram_handshake_fsm.go): retransmission with +// exponential backoff, a retry ceiling that bounds an un-established handshake's +// lifetime, and role-specific completion (the responder lingers briefly to answer +// a retransmitted ClientFinished with its cached ServerFinished, recovering a lost +// final flight). +// +// The driver is event-driven and clock-injected so it is testable without sockets +// or real timers: feed it inbound messages and timer ticks, and it returns the +// messages to send. The endpoint wiring (datagram.go) owns the goroutine, the +// real clock, fragmentation, and the socket. +package tunnel + +import ( + "time" + + "github.com/sara-star-quant/quantum-go/internal/constants" + "github.com/sara-star-quant/quantum-go/pkg/protocol" +) + +type driverStatus int + +const ( + driverRunning driverStatus = iota + driverEstablished + driverFailed +) + +// dgramDriver wraps a dgramHandshake with time. All methods are called by the one +// goroutine that owns this handshake, so the driver needs no locking. +type dgramDriver struct { + fsm *dgramHandshake + now func() time.Time + + rto time.Duration // current retransmit timeout + retries int + + // retransmitAt is when to resend the cached flight (zero = disarmed). + retransmitAt time.Time + // lingerAt is when a completed responder stops answering retransmits + // (zero = not lingering). + lingerAt time.Time + replaysLeft int + + status driverStatus +} + +func initialRTO() time.Duration { + return time.Duration(constants.DatagramHandshakeInitialTimeoutMillis) * time.Millisecond +} + +func maxRTO() time.Duration { + return time.Duration(constants.DatagramHandshakeMaxTimeoutMillis) * time.Millisecond +} + +// newDgramDriver builds a driver for session. now may be nil for the real clock. +func newDgramDriver(session *Session, now func() time.Time) *dgramDriver { + if now == nil { + now = time.Now + } + return &dgramDriver{ + fsm: newDgramHandshake(session), + now: now, + rto: initialRTO(), + status: driverRunning, + } +} + +// start emits the initiator's ClientHello and arms its retransmit timer. +func (d *dgramDriver) start() ([]hsMessage, error) { + msg, err := d.fsm.start() + if err != nil { + return nil, err + } + d.armRetransmit() + return []hsMessage{*msg}, nil +} + +// onInbound feeds one reassembled handshake message and returns the messages to +// send in response (a freshly advanced flight, or a replayed cached flight). It +// distinguishes advance from replay by whether the FSM state moved, so it knows +// when to reset the backoff and when to charge the linger replay cap. +func (d *dgramDriver) onInbound(typ protocol.MessageType, body []byte) []hsMessage { + if d.status == driverFailed { + return nil + } + if d.status == driverEstablished && !d.lingering() { + return nil // initiator (or expired responder) is done; ignore further input + } + + before := d.fsm.state() + out, complete := d.fsm.onMessage(typ, body) + if out == nil && !complete { + return nil // dropped + } + advanced := d.fsm.state() != before + + if complete { + return d.onComplete(out) + } + if advanced { + d.rto = initialRTO() + d.retries = 0 + d.armRetransmit() + return []hsMessage{*out} + } + // Replay: peer retransmitted; resend cached without touching our backoff or + // ceiling. During linger this is the recovery path, bounded by the cap. + if d.lingering() { + if d.replaysLeft <= 0 { + return nil + } + d.replaysLeft-- + } + return []hsMessage{*out} +} + +// onComplete records establishment and, for the responder, opens the bounded +// linger window during which a retransmitted ClientFinished is answered with the +// cached ServerFinished. final is the responder's ServerFinished (nil for the +// initiator, which sends nothing on completion). +func (d *dgramDriver) onComplete(final *hsMessage) []hsMessage { + d.status = driverEstablished + d.retransmitAt = time.Time{} + if d.fsm.role == RoleResponder { + d.lingerAt = d.now().Add(maxRTO()) + d.replaysLeft = constants.DatagramHandshakeLingerReplays + } + if final != nil { + return []hsMessage{*final} + } + return nil +} + +// onTimeout handles an elapsed timer: it ends an expired linger, or retransmits +// the cached flight (growing the backoff) until the retry ceiling, past which the +// handshake fails. It returns the messages to resend, if any. +func (d *dgramDriver) onTimeout() []hsMessage { + now := d.now() + + if d.lingering() && !now.Before(d.lingerAt) { + d.lingerAt = time.Time{} // linger over; driver is now fully done + return nil + } + + if d.status == driverRunning && !d.retransmitAt.IsZero() && !now.Before(d.retransmitAt) { + d.retries++ + if d.retries > constants.DatagramHandshakeMaxRetries { + d.status = driverFailed + d.retransmitAt = time.Time{} + return nil + } + d.rto = min(d.rto*2, maxRTO()) + d.armRetransmit() + if d.fsm.cached != nil { + return []hsMessage{*d.fsm.cached} + } + } + return nil +} + +// nextWake reports the earliest time onTimeout must be called, or the zero time if +// no timer is armed (the driver is terminal). +func (d *dgramDriver) nextWake() time.Time { + switch { + case d.status == driverRunning: + return d.retransmitAt + case d.lingering(): + return d.lingerAt + default: + return time.Time{} + } +} + +func (d *dgramDriver) armRetransmit() { d.retransmitAt = d.now().Add(d.rto) } + +func (d *dgramDriver) lingering() bool { + return d.status == driverEstablished && d.fsm.role == RoleResponder && !d.lingerAt.IsZero() +} + +// established reports a successful handshake (the session is keyed). +func (d *dgramDriver) established() bool { return d.status == driverEstablished } + +// failed reports the handshake aborted (retry ceiling exceeded). +func (d *dgramDriver) failed() bool { return d.status == driverFailed } + +// done reports the owner can stop driving: failed, or established with no linger +// left to serve. +func (d *dgramDriver) done() bool { + return d.status == driverFailed || (d.status == driverEstablished && !d.lingering()) +} + +// session returns the underlying session (established once established() is true). +func (d *dgramDriver) session() *Session { return d.fsm.hs.session } diff --git a/pkg/tunnel/dgram_handshake_driver_test.go b/pkg/tunnel/dgram_handshake_driver_test.go new file mode 100644 index 0000000..0ab3c4c --- /dev/null +++ b/pkg/tunnel/dgram_handshake_driver_test.go @@ -0,0 +1,191 @@ +package tunnel + +import ( + "testing" + "time" + + "github.com/sara-star-quant/quantum-go/internal/constants" + "github.com/sara-star-quant/quantum-go/pkg/protocol" +) + +// fakeClock is a manually advanced clock for deterministic timer tests. +type fakeClock struct{ t time.Time } + +func (c *fakeClock) now() time.Time { return c.t } +func (c *fakeClock) set(t time.Time) { c.t = t } + +// epoch is a fixed base time so tests never read the wall clock. +var epoch = time.Unix(1_700_000_000, 0) + +func feed(d *dgramDriver, msgs []hsMessage) []hsMessage { + var out []hsMessage + for _, m := range msgs { + out = append(out, d.onInbound(m.typ, m.body)...) + } + return out +} + +func newDriverPair(t *testing.T) (initiator, responder *dgramDriver) { + t.Helper() + ci, err := NewSession(RoleInitiator) + if err != nil { + t.Fatalf("initiator session: %v", err) + } + ri, err := NewSession(RoleResponder) + if err != nil { + t.Fatalf("responder session: %v", err) + } + ic := &fakeClock{t: epoch} + rc := &fakeClock{t: epoch} + return newDgramDriver(ci, ic.now), newDgramDriver(ri, rc.now) +} + +// TestDgramHandshakeDriverRoundTrip runs a clean handshake through both drivers +// and checks both reach an established session. +func TestDgramHandshakeDriverRoundTrip(t *testing.T) { + initiator, responder := newDriverPair(t) + + ch, err := initiator.start() + if err != nil { + t.Fatalf("start: %v", err) + } + sh := feed(responder, ch) + cf := feed(initiator, sh) + sf := feed(responder, cf) + if out := feed(initiator, sf); len(out) != 0 { + t.Fatalf("initiator emitted %d messages on completion, want 0", len(out)) + } + + if !initiator.established() || !responder.established() { + t.Fatalf("not both established: initiator=%v responder=%v", + initiator.established(), responder.established()) + } + if initiator.session().State() != SessionStateEstablished || + responder.session().State() != SessionStateEstablished { + t.Fatal("sessions not established") + } + if !initiator.done() { + t.Fatal("initiator should be done immediately after establishment") + } +} + +// TestDgramHandshakeDriverBackoff verifies the retransmit schedule +// (500/1000/2000/4000/8000 capped) and that the handshake fails once the retry +// ceiling is exceeded. +func TestDgramHandshakeDriverBackoff(t *testing.T) { + ci, err := NewSession(RoleInitiator) + if err != nil { + t.Fatalf("session: %v", err) + } + clk := &fakeClock{t: epoch} + d := newDgramDriver(ci, clk.now) + + if _, err := d.start(); err != nil { + t.Fatalf("start: %v", err) + } + // First arm is at +500ms (the initial RTO). + if want := epoch.Add(500 * time.Millisecond); !d.nextWake().Equal(want) { + t.Fatalf("first wake = %v, want %v", d.nextWake(), want) + } + + wantRTOs := []time.Duration{1000, 2000, 4000, 8000, 8000, 8000, 8000, 8000} + var resends int + for i := range constants.DatagramHandshakeMaxRetries { + clk.set(d.nextWake()) + out := d.onTimeout() + if len(out) != 1 || out[0].typ != protocol.MessageTypeClientHello { + t.Fatalf("retransmit %d emitted %v, want one ClientHello", i, out) + } + resends++ + gotRTO := d.nextWake().Sub(clk.now()) + if gotRTO != wantRTOs[i]*time.Millisecond { + t.Fatalf("after retransmit %d, RTO = %v, want %v", i, gotRTO, wantRTOs[i]*time.Millisecond) + } + } + if resends != constants.DatagramHandshakeMaxRetries { + t.Fatalf("resends = %d, want %d", resends, constants.DatagramHandshakeMaxRetries) + } + + // One more timeout exceeds the ceiling and fails the handshake. + clk.set(d.nextWake()) + if out := d.onTimeout(); out != nil { + t.Fatalf("post-ceiling timeout emitted %v, want nil", out) + } + if !d.failed() { + t.Fatal("driver should be failed after exceeding the retry ceiling") + } + if !d.nextWake().IsZero() { + t.Fatal("failed driver should have no armed timer") + } +} + +// TestDgramHandshakeDriverResponderLinger verifies a completed responder replays +// its cached ServerFinished for a retransmitted ClientFinished, bounded by the +// replay cap, and stops once the linger window expires. +func TestDgramHandshakeDriverResponderLinger(t *testing.T) { + ci, _ := NewSession(RoleInitiator) + ri, _ := NewSession(RoleResponder) + ic := &fakeClock{t: epoch} + rc := &fakeClock{t: epoch} + initiator := newDgramDriver(ci, ic.now) + responder := newDgramDriver(ri, rc.now) + + ch, _ := initiator.start() + sh := feed(responder, ch) + cf := feed(initiator, sh) + if len(cf) != 1 || cf[0].typ != protocol.MessageTypeClientFinished { + t.Fatalf("expected one ClientFinished, got %v", cf) + } + sf := feed(responder, cf) + if len(sf) != 1 || sf[0].typ != protocol.MessageTypeServerFinished { + t.Fatalf("expected one ServerFinished, got %v", sf) + } + if !responder.established() || responder.done() { + t.Fatal("responder should be established and lingering") + } + + // Retransmitted ClientFinished replays ServerFinished, up to the cap. + for i := range constants.DatagramHandshakeLingerReplays { + out := responder.onInbound(cf[0].typ, cf[0].body) + if len(out) != 1 || out[0].typ != protocol.MessageTypeServerFinished { + t.Fatalf("linger replay %d = %v, want one ServerFinished", i, out) + } + } + // Cap exhausted: no more replays. + if out := responder.onInbound(cf[0].typ, cf[0].body); out != nil { + t.Fatalf("replay past cap = %v, want nil", out) + } + + // Linger expiry ends the driver. + rc.set(responder.nextWake()) + if out := responder.onTimeout(); out != nil { + t.Fatalf("linger-expiry timeout = %v, want nil", out) + } + if !responder.done() { + t.Fatal("responder should be done after linger expiry") + } + if out := responder.onInbound(cf[0].typ, cf[0].body); out != nil { + t.Fatalf("post-linger replay = %v, want nil", out) + } +} + +// TestDgramHandshakeDriverDuplicateNoBackoffReset verifies a duplicate ClientHello +// replays the cached ServerHello without disturbing the responder's own +// retransmit schedule. +func TestDgramHandshakeDriverDuplicateNoBackoffReset(t *testing.T) { + initiator, responder := newDriverPair(t) + + ch, _ := initiator.start() + if out := feed(responder, ch); len(out) != 1 || out[0].typ != protocol.MessageTypeServerHello { + t.Fatalf("first ClientHello -> %v, want ServerHello", out) + } + wakeBefore := responder.nextWake() + + dup := responder.onInbound(ch[0].typ, ch[0].body) + if len(dup) != 1 || dup[0].typ != protocol.MessageTypeServerHello { + t.Fatalf("duplicate ClientHello -> %v, want replayed ServerHello", dup) + } + if !responder.nextWake().Equal(wakeBefore) { + t.Fatal("duplicate ClientHello reset the responder retransmit timer") + } +} From 119f6c1a7d79470b86822dcaf248a3f7cf61e131 Mon Sep 17 00:00:00 2001 From: "Peter Z." Date: Sun, 31 May 2026 00:34:28 +0200 Subject: [PATCH 4/6] wire datagram handshake into the endpoint with retransmission and accept Adds the receive loop, index/source demux, outbound fragmentation, the per-handshake goroutine, DialDatagram, and responder accept. Drivers take configurable backoff so tests run fast; fixes the advance path resetting the RTO to the default constant instead of the configured value, and lengthens the responder linger to cover the initiator's full ClientFinished retransmit window. Adds a seeded loss/reorder/dup end-to-end test. --- internal/constants/constants.go | 5 +- pkg/tunnel/datagram.go | 80 ++++++-- pkg/tunnel/dgram_handshake_driver.go | 34 +++- pkg/tunnel/dgram_handshake_e2e_test.go | 225 ++++++++++++++++++++++ pkg/tunnel/dgram_handshake_wire.go | 251 +++++++++++++++++++++++++ 5 files changed, 567 insertions(+), 28 deletions(-) create mode 100644 pkg/tunnel/dgram_handshake_e2e_test.go create mode 100644 pkg/tunnel/dgram_handshake_wire.go diff --git a/internal/constants/constants.go b/internal/constants/constants.go index 5cbe153..17664ce 100644 --- a/internal/constants/constants.go +++ b/internal/constants/constants.go @@ -198,8 +198,9 @@ const ( // DatagramHandshakeLingerReplays bounds how many times a completed responder // will replay its cached ServerFinished for a retransmitted ClientFinished. - // It caps post-handshake reflection from a spoofed final flight. - DatagramHandshakeLingerReplays = 8 + // It caps post-handshake reflection from a spoofed final flight while staying + // above the initiator's retransmit count (with room for duplicates). + DatagramHandshakeLingerReplays = 16 // DatagramIdleTimeoutSeconds is how long a session may be idle before it is // reaped (there is no FIN over UDP; close is best-effort). diff --git a/pkg/tunnel/datagram.go b/pkg/tunnel/datagram.go index d0f5139..4f57566 100644 --- a/pkg/tunnel/datagram.go +++ b/pkg/tunnel/datagram.go @@ -18,6 +18,7 @@ import ( "errors" "net" "sync" + "time" "github.com/sara-star-quant/quantum-go/internal/constants" "github.com/sara-star-quant/quantum-go/pkg/crypto" @@ -33,10 +34,14 @@ var errConnIndexExhausted = errors.New("tunnel: could not allocate a free connec const maxIndexAllocAttempts = 100 // datagramSession is the per-session routing state held by a DatagramEndpoint, -// keyed in the registry by the connection index. The handshake and data paths -// attach the peer address and the underlying *Session as those layers are built. +// keyed in the registry by the connection index. index and inbox are immutable +// after creation (the receive loop reads only those), so the owning handshake +// goroutine and the receive loop need no shared lock; session is written once by +// the handshake goroutine and published via acceptCh / the dial return. type datagramSession struct { - index uint32 // the local connection index we assigned (peer echoes it) + index uint32 // the local connection index we assigned (peer echoes it) + inbox chan inboundMsg // reassembled handshake messages for this session's goroutine + session *Session // set once the handshake establishes } // connRegistry maps the random connection indices we assign to their sessions @@ -48,12 +53,14 @@ type datagramSession struct { type connRegistry struct { mu sync.Mutex byIndex map[uint32]*datagramSession - halfOpen map[string]int // source identity -> count of un-established sessions + bySource map[string]*datagramSession // in-progress responder handshakes, keyed by source + halfOpen map[string]int // source identity -> count of un-established sessions } func newConnRegistry() *connRegistry { return &connRegistry{ byIndex: make(map[uint32]*datagramSession), + bySource: make(map[string]*datagramSession), halfOpen: make(map[string]int), } } @@ -138,6 +145,29 @@ func (r *connRegistry) releaseHalfOpen(source string) { } } +// addSource registers an in-progress responder handshake under its source so a +// retransmitted ClientHello (which still carries RecvIndex 0) routes to it. +func (r *connRegistry) addSource(source string, s *datagramSession) { + r.mu.Lock() + defer r.mu.Unlock() + r.bySource[source] = s +} + +// lookupSource returns the in-progress responder handshake for a source, or nil. +func (r *connRegistry) lookupSource(source string) *datagramSession { + r.mu.Lock() + defer r.mu.Unlock() + return r.bySource[source] +} + +// removeSource drops the source mapping (on establishment or teardown). It is +// idempotent. +func (r *connRegistry) removeSource(source string) { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.bySource, source) +} + // DatagramEndpoint is a connectionless UDP transport over a single PacketConn. // It demultiplexes incoming datagrams to per-session state by connection index // and surfaces newly-established inbound sessions on an accept channel. @@ -147,6 +177,11 @@ type DatagramEndpoint struct { reasm *Reassembler acceptCh chan *datagramSession + // rtoInitial/rtoMax bound handshake retransmission backoff; defaulted from + // constants, overridable (in-package) for fast tests. + rtoInitial time.Duration + rtoMax time.Duration + closeOnce sync.Once done chan struct{} } @@ -156,11 +191,13 @@ type DatagramEndpoint struct { // start it explicitly so tests can drive routing deterministically. func NewDatagramEndpoint(conn net.PacketConn) *DatagramEndpoint { return &DatagramEndpoint{ - conn: conn, - registry: newConnRegistry(), - reasm: NewReassembler(0, 0, 0), - acceptCh: make(chan *datagramSession, 16), - done: make(chan struct{}), + conn: conn, + registry: newConnRegistry(), + reasm: NewReassembler(0, 0, 0), + acceptCh: make(chan *datagramSession, 16), + rtoInitial: time.Duration(constants.DatagramHandshakeInitialTimeoutMillis) * time.Millisecond, + rtoMax: time.Duration(constants.DatagramHandshakeMaxTimeoutMillis) * time.Millisecond, + done: make(chan struct{}), } } @@ -175,6 +212,22 @@ func (e *DatagramEndpoint) Close() error { return err } +// Serve runs the receive loop: it reads datagrams and dispatches each through +// routeDatagram until the endpoint is closed. Callers run it in their own +// goroutine (go ep.Serve()). It returns when the underlying conn is closed. +func (e *DatagramEndpoint) Serve() { + buf := make([]byte, constants.DatagramMTU+512) + for { + n, src, err := e.conn.ReadFrom(buf) + if err != nil { + return + } + data := make([]byte, n) + copy(data, buf[:n]) + _ = e.routeDatagram(src, data) + } +} + // routeDatagram parses one received datagram and dispatches it. It is the demux // seam: handshake frames go to the reassembler + handshake FSM, data frames are // looked up by connection index and handed to the session's datagram recv path, @@ -194,17 +247,12 @@ func (e *DatagramEndpoint) routeDatagram(src net.Addr, data []byte) error { if perr != nil { return perr } - _, complete, aerr := e.reasm.Add(src.String(), h, fragment) + msg, complete, aerr := e.reasm.Add(src.String(), h, fragment) if aerr != nil { return aerr } if complete { - // TODO: gate new half-open state through registry.tryAddHalfOpen( - // src.String()) (releasing on establishment or teardown) before - // advancing, then feed the reassembled message into the bilateral - // handshake FSM (dgram_handshake.go); on completion register the session - // and surface it on acceptCh. - _ = h + e.deliverHandshake(src, h, msg) } return nil case protocol.DatagramFrameData: diff --git a/pkg/tunnel/dgram_handshake_driver.go b/pkg/tunnel/dgram_handshake_driver.go index ae653af..6282f8a 100644 --- a/pkg/tunnel/dgram_handshake_driver.go +++ b/pkg/tunnel/dgram_handshake_driver.go @@ -32,8 +32,10 @@ type dgramDriver struct { fsm *dgramHandshake now func() time.Time - rto time.Duration // current retransmit timeout - retries int + rtoInitial time.Duration // first retransmit timeout + rtoMax time.Duration // backoff cap + rto time.Duration // current retransmit timeout + retries int // retransmitAt is when to resend the cached flight (zero = disarmed). retransmitAt time.Time @@ -53,16 +55,25 @@ func maxRTO() time.Duration { return time.Duration(constants.DatagramHandshakeMaxTimeoutMillis) * time.Millisecond } -// newDgramDriver builds a driver for session. now may be nil for the real clock. +// newDgramDriver builds a driver for session with the default backoff. now may be +// nil for the real clock. func newDgramDriver(session *Session, now func() time.Time) *dgramDriver { + return newDgramDriverWithRTO(session, now, initialRTO(), maxRTO()) +} + +// newDgramDriverWithRTO builds a driver with explicit backoff bounds (tests pass +// short values for fast, deterministic runs; production uses the defaults). +func newDgramDriverWithRTO(session *Session, now func() time.Time, rtoInitial, rtoMax time.Duration) *dgramDriver { if now == nil { now = time.Now } return &dgramDriver{ - fsm: newDgramHandshake(session), - now: now, - rto: initialRTO(), - status: driverRunning, + fsm: newDgramHandshake(session), + now: now, + rtoInitial: rtoInitial, + rtoMax: rtoMax, + rto: rtoInitial, + status: driverRunning, } } @@ -99,7 +110,7 @@ func (d *dgramDriver) onInbound(typ protocol.MessageType, body []byte) []hsMessa return d.onComplete(out) } if advanced { - d.rto = initialRTO() + d.rto = d.rtoInitial d.retries = 0 d.armRetransmit() return []hsMessage{*out} @@ -123,7 +134,10 @@ func (d *dgramDriver) onComplete(final *hsMessage) []hsMessage { d.status = driverEstablished d.retransmitAt = time.Time{} if d.fsm.role == RoleResponder { - d.lingerAt = d.now().Add(maxRTO()) + // Linger long enough to cover the initiator's whole ClientFinished + // retransmit window (it backs off up to rtoMax for MaxRetries tries), so a + // lost ServerFinished is recovered by replay rather than timing out. + d.lingerAt = d.now().Add(constants.DatagramHandshakeMaxRetries * d.rtoMax) d.replaysLeft = constants.DatagramHandshakeLingerReplays } if final != nil { @@ -150,7 +164,7 @@ func (d *dgramDriver) onTimeout() []hsMessage { d.retransmitAt = time.Time{} return nil } - d.rto = min(d.rto*2, maxRTO()) + d.rto = min(d.rto*2, d.rtoMax) d.armRetransmit() if d.fsm.cached != nil { return []hsMessage{*d.fsm.cached} diff --git a/pkg/tunnel/dgram_handshake_e2e_test.go b/pkg/tunnel/dgram_handshake_e2e_test.go new file mode 100644 index 0000000..5946d07 --- /dev/null +++ b/pkg/tunnel/dgram_handshake_e2e_test.go @@ -0,0 +1,225 @@ +package tunnel + +import ( + "bytes" + "fmt" + "math/rand/v2" + "net" + "sync" + "testing" + "time" +) + +// memAddr is a stable in-memory packet address. +type memAddr struct{ name string } + +func (a memAddr) Network() string { return "mempipe" } +func (a memAddr) String() string { return a.name } + +// memPacket is one datagram in flight. +type memPacket struct { + data []byte + src net.Addr +} + +// faultModel deterministically drops, duplicates, and reorders datagrams on the +// send path using a seeded PRNG, so a given seed reproduces a run exactly. +type faultModel struct { + mu sync.Mutex + rng *rand.Rand + dropP, dupP, reorderP float64 + held []byte // a single packet held back to reorder with the next +} + +func newFaultModel(seed uint64, drop, dup, reorder float64) *faultModel { + return &faultModel{ + rng: rand.New(rand.NewPCG(seed, seed+0x9e3779b9)), + dropP: drop, + dupP: dup, + reorderP: reorder, + } +} + +func (f *faultModel) emit(peer *memPacketConn, data []byte, src net.Addr) { + f.mu.Lock() + defer f.mu.Unlock() + + if f.rng.Float64() < f.dropP { + return // lost + } + if f.held == nil && f.rng.Float64() < f.reorderP { + f.held = data // hold it back; the next packet overtakes it + return + } + if f.held != nil { + held := f.held + f.held = nil + f.deliver(peer, data, src) // newer packet first + f.deliver(peer, held, src) // then the held (reordered) one + return + } + f.deliver(peer, data, src) +} + +func (f *faultModel) deliver(peer *memPacketConn, data []byte, src net.Addr) { + n := 1 + if f.rng.Float64() < f.dupP { + n = 2 + } + for range n { + peer.enqueue(memPacket{data: data, src: src}) + } +} + +// memPacketConn is one end of an in-memory net.PacketConn pair. +type memPacketConn struct { + addr memAddr + peer *memPacketConn + fault *faultModel + in chan memPacket + closed chan struct{} + closeOnce sync.Once +} + +func newMemPacketConn(name string, fault *faultModel) *memPacketConn { + return &memPacketConn{ + addr: memAddr{name: name}, + fault: fault, + in: make(chan memPacket, 256), + closed: make(chan struct{}), + } +} + +func (c *memPacketConn) enqueue(p memPacket) { + select { + case c.in <- p: + case <-c.closed: + default: // full: drop (an additional, rare loss) + } +} + +func (c *memPacketConn) ReadFrom(p []byte) (int, net.Addr, error) { + select { + case pkt := <-c.in: + return copy(p, pkt.data), pkt.src, nil + case <-c.closed: + return 0, nil, net.ErrClosed + } +} + +func (c *memPacketConn) WriteTo(p []byte, _ net.Addr) (int, error) { + select { + case <-c.closed: + return 0, net.ErrClosed + default: + } + c.fault.emit(c.peer, append([]byte(nil), p...), c.addr) + return len(p), nil +} + +func (c *memPacketConn) Close() error { + c.closeOnce.Do(func() { close(c.closed) }) + return nil +} + +func (c *memPacketConn) LocalAddr() net.Addr { return c.addr } +func (c *memPacketConn) SetDeadline(time.Time) error { return nil } +func (c *memPacketConn) SetReadDeadline(time.Time) error { return nil } +func (c *memPacketConn) SetWriteDeadline(time.Time) error { return nil } + +// memPipe wires two endpoints together with independent per-direction faults. +func memPipe(seed uint64, drop, dup, reorder float64) (a, b *memPacketConn) { + a = newMemPacketConn("A", newFaultModel(seed, drop, dup, reorder)) + b = newMemPacketConn("B", newFaultModel(seed+1, drop, dup, reorder)) + a.peer, b.peer = b, a + return a, b +} + +// TestDgramHandshakeE2E runs the full datagram handshake over an in-memory +// transport that drops, duplicates, and reorders datagrams, across many seeds. It +// asserts the handshake completes, the responder surfaces the session on its +// accept channel, and the two sides derive matching directional keys (proven by a +// bidirectional encrypt/decrypt round-trip). +func TestDgramHandshakeE2E(t *testing.T) { + const ( + drop = 0.2 + dup = 0.1 + reorder = 0.15 + ) + for seed := uint64(1); seed <= 24; seed++ { + t.Run(fmt.Sprintf("seed-%d", seed), func(t *testing.T) { + runE2E(t, seed, drop, dup, reorder) + }) + } +} + +func runE2E(t *testing.T, seed uint64, drop, dup, reorder float64) { + t.Helper() + connA, connB := memPipe(seed, drop, dup, reorder) + epA := NewDatagramEndpoint(connA) + epB := NewDatagramEndpoint(connB) + for _, ep := range []*DatagramEndpoint{epA, epB} { + ep.rtoInitial = 2 * time.Millisecond + ep.rtoMax = 20 * time.Millisecond + } + go epA.Serve() + go epB.Serve() + defer func() { _ = epA.Close() }() + defer func() { _ = epB.Close() }() + + type dialResult struct { + s *Session + err error + } + dialCh := make(chan dialResult, 1) + go func() { + s, err := DialDatagram(epA, connB.addr) + dialCh <- dialResult{s, err} + }() + + var server *Session + select { + case ds := <-epB.acceptCh: + server = ds.session + case <-time.After(5 * time.Second): + t.Fatal("responder did not surface a session") + } + + var client *Session + select { + case r := <-dialCh: + if r.err != nil { + t.Fatalf("dial: %v", r.err) + } + client = r.s + case <-time.After(5 * time.Second): + t.Fatal("dial did not complete") + } + + if client == nil || server == nil { + t.Fatal("nil session(s) after handshake") + } + if client.State() != SessionStateEstablished || server.State() != SessionStateEstablished { + t.Fatalf("not established: client=%v server=%v", client.State(), server.State()) + } + assertDataRoundTrip(t, client, server) + assertDataRoundTrip(t, server, client) +} + +// assertDataRoundTrip encrypts on from and decrypts on to, proving their +// directional keys agree. +func assertDataRoundTrip(t *testing.T, from, to *Session) { + t.Helper() + plaintext := []byte("quantum-go datagram payload") + ct, seq, err := from.Encrypt(plaintext) + if err != nil { + t.Fatalf("encrypt: %v", err) + } + got, err := to.Decrypt(ct, seq) + if err != nil { + t.Fatalf("decrypt: %v", err) + } + if !bytes.Equal(got, plaintext) { + t.Fatalf("round-trip mismatch: got %q want %q", got, plaintext) + } +} diff --git a/pkg/tunnel/dgram_handshake_wire.go b/pkg/tunnel/dgram_handshake_wire.go new file mode 100644 index 0000000..efe85ca --- /dev/null +++ b/pkg/tunnel/dgram_handshake_wire.go @@ -0,0 +1,251 @@ +// Package tunnel - dgram_handshake_wire.go connects the reliability driver +// (dgram_handshake_driver.go) to a real DatagramEndpoint: it owns one goroutine +// per in-progress handshake, fragments outbound flights onto the PacketConn, and +// demultiplexes reassembled inbound messages from the receive loop to the right +// handshake by connection index (with a source-keyed bootstrap for the first +// ClientHello, which still carries RecvIndex 0). +package tunnel + +import ( + "errors" + "net" + "time" + + "github.com/sara-star-quant/quantum-go/pkg/protocol" +) + +// inboxCap buffers a handful of reassembled messages per handshake; the receive +// loop forwards non-blocking, so overflow just drops and the peer retransmits. +const inboxCap = 8 + +var ( + errHandshakeFailed = errors.New("tunnel: datagram handshake failed") + errEndpointClosed = errors.New("tunnel: datagram endpoint closed") +) + +// inboundMsg is one reassembled handshake message handed from the receive loop to +// a handshake goroutine. sender is the peer's SenderIndex, which the initiator +// uses to learn the responder's connection index. +type inboundMsg struct { + typ protocol.MessageType + body []byte + sender uint32 +} + +// trySend forwards without blocking; a full inbox drops (the sender retransmits). +func trySend(ch chan inboundMsg, m inboundMsg) { + select { + case ch <- m: + default: + } +} + +// deliverHandshake routes one fully reassembled handshake message. A frame that +// names our connection index goes to that handshake's inbox; a ClientHello with +// RecvIndex 0 goes to the in-progress responder for its source, or starts one +// (gated by the half-open cap) - the gate precedes any decapsulation, so we never +// do CH-KEM work or emit a ServerHello for an unvalidated source until its full +// ClientHello has arrived. +func (e *DatagramEndpoint) deliverHandshake(src net.Addr, h protocol.DatagramHandshakeHeader, msg []byte) { + in := inboundMsg{typ: h.MsgType, body: msg, sender: h.SenderIndex} + + if h.RecvIndex != 0 { + if ds := e.registry.lookup(h.RecvIndex); ds != nil && ds.inbox != nil { + trySend(ds.inbox, in) + } + return + } + + // RecvIndex 0: only a ClientHello bootstraps a responder. + if h.MsgType != protocol.MessageTypeClientHello { + return + } + if ds := e.registry.lookupSource(src.String()); ds != nil { + trySend(ds.inbox, in) + return + } + if !e.registry.tryAddHalfOpen(src.String()) { + return + } + e.startResponder(src, in) +} + +// startResponder allocates the responder's connection index, registers it (in +// byIndex so the ClientFinished routes, and in bySource so ClientHello +// retransmits route), and runs the handshake in its own goroutine. +func (e *DatagramEndpoint) startResponder(src net.Addr, first inboundMsg) { + session, err := NewSession(RoleResponder) + if err != nil { + e.registry.releaseHalfOpen(src.String()) + return + } + ds := &datagramSession{inbox: make(chan inboundMsg, inboxCap)} + idx, err := e.registry.add(ds) + if err != nil { + e.registry.releaseHalfOpen(src.String()) + return + } + e.registry.addSource(src.String(), ds) + + l := &handshakeLoop{ + ep: e, + ds: ds, + src: src, + driver: newDgramDriverWithRTO(session, nil, e.rtoInitial, e.rtoMax), + localIndex: idx, + peerIndex: first.sender, // the initiator's index, echoed as RecvIndex in our replies + established: func() { e.surface(ds) }, + } + trySend(ds.inbox, first) + go func() { _, _ = l.run() }() +} + +// DialDatagram performs the initiator handshake to dst over ep (whose Serve loop +// must be running) and returns the established session. +func DialDatagram(ep *DatagramEndpoint, dst net.Addr) (*Session, error) { + session, err := NewSession(RoleInitiator) + if err != nil { + return nil, err + } + ds := &datagramSession{inbox: make(chan inboundMsg, inboxCap)} + idx, err := ep.registry.add(ds) + if err != nil { + return nil, err + } + l := &handshakeLoop{ + ep: ep, + ds: ds, + src: dst, + driver: newDgramDriverWithRTO(session, nil, ep.rtoInitial, ep.rtoMax), + localIndex: idx, + } + s, err := l.run() + if err != nil { + ep.registry.remove(idx) + return nil, err + } + return s, nil +} + +// surface offers an established inbound session on the accept channel without +// blocking the handshake goroutine. +func (e *DatagramEndpoint) surface(ds *datagramSession) { + select { + case e.acceptCh <- ds: + default: + } +} + +// handshakeLoop owns one in-progress handshake: its driver, connection indices, +// the outbound sequence counter, and the socket sends. Exactly one goroutine runs +// it, so its fields need no locking. +type handshakeLoop struct { + ep *DatagramEndpoint + ds *datagramSession + src net.Addr + driver *dgramDriver + localIndex uint32 + peerIndex uint32 + seq uint64 + + // established, if set, is called once when the session is keyed (responders + // surface it on acceptCh). Initiators leave it nil and take the session from + // run's return value. + established func() + surfaced bool +} + +// run drives the handshake to completion or failure. The initiator sends the +// first ClientHello; both roles then loop on {inbound, retransmit timer, close}, +// retransmitting on silence and (responder) lingering after completion to answer a +// retransmitted ClientFinished. +func (l *handshakeLoop) run() (*Session, error) { + if l.driver.fsm.role == RoleInitiator { + msgs, err := l.driver.start() + if err != nil { + return nil, err + } + l.send(msgs) + } + + timer := time.NewTimer(time.Hour) + defer timer.Stop() + + for !l.driver.done() { + armTimer(timer, l.driver.nextWake().Sub(l.driver.now())) + select { + case m := <-l.ds.inbox: + if m.sender != 0 { + l.peerIndex = m.sender + } + l.send(l.driver.onInbound(m.typ, m.body)) + case <-timer.C: + l.send(l.driver.onTimeout()) + case <-l.ep.done: + return nil, errEndpointClosed + } + l.maybeSurface() + } + + if l.driver.failed() { + l.ep.registry.removeSource(l.src.String()) + l.ep.registry.releaseHalfOpen(l.src.String()) + l.ep.registry.remove(l.localIndex) + return nil, errHandshakeFailed + } + return l.ds.session, nil +} + +// maybeSurface publishes the session the instant it is established (so a responder +// surfaces it on acceptCh without waiting out its linger window) and frees the +// half-open slot exactly once. +func (l *handshakeLoop) maybeSurface() { + if l.surfaced || !l.driver.established() { + return + } + l.surfaced = true + l.ds.session = l.driver.session() + l.ep.registry.removeSource(l.src.String()) + l.ep.registry.releaseHalfOpen(l.src.String()) + if l.established != nil { + l.established() + } +} + +// send fragments each outbound message onto the PacketConn, advancing the global +// sequence counter across all frames. +func (l *handshakeLoop) send(msgs []hsMessage) { + for _, m := range msgs { + base := protocol.DatagramHandshakeHeader{ + DatagramHeader: protocol.DatagramHeader{ + Type: protocol.DatagramFrameHandshake, + RecvIndex: l.peerIndex, + Seq: l.seq, + }, + SenderIndex: l.localIndex, + MsgType: m.typ, + } + frames, err := fragmentHandshake(base, m.body) + if err != nil { + return + } + for _, f := range frames { + _, _ = l.ep.conn.WriteTo(f, l.src) + } + l.seq += uint64(len(frames)) + } +} + +// armTimer resets t to fire after d (clamped to >= 0), draining any stale tick. +func armTimer(t *time.Timer, d time.Duration) { + if !t.Stop() { + select { + case <-t.C: + default: + } + } + if d < 0 { + d = 0 + } + t.Reset(d) +} From 4ce95392accda5ee867f2d5697188e86989886cc Mon Sep 17 00:00:00 2001 From: "Peter Z." Date: Sun, 31 May 2026 01:01:46 +0200 Subject: [PATCH 5/6] add regression tests for the datagram handshake rto and linger bugs Pins the two bugs the fault-injection sweep surfaced: the advance path must re-arm at the configured rtoInitial, and the responder linger must span the initiator's full retransmit window. Adds a third test that the endpoint hands its configured backoff to every driver, and folds the duplicated driver construction into a newDriver helper so the wiring has one tested seam. --- pkg/tunnel/dgram_handshake_driver_test.go | 109 ++++++++++++++++++++++ pkg/tunnel/dgram_handshake_wire.go | 11 ++- 2 files changed, 118 insertions(+), 2 deletions(-) diff --git a/pkg/tunnel/dgram_handshake_driver_test.go b/pkg/tunnel/dgram_handshake_driver_test.go index 0ab3c4c..850e165 100644 --- a/pkg/tunnel/dgram_handshake_driver_test.go +++ b/pkg/tunnel/dgram_handshake_driver_test.go @@ -189,3 +189,112 @@ func TestDgramHandshakeDriverDuplicateNoBackoffReset(t *testing.T) { t.Fatal("duplicate ClientHello reset the responder retransmit timer") } } + +// newDriverPairRTO pairs an initiator and responder driver with non-default +// backoff. The non-default values are the point: they differ from the package +// constants, so a path that reads a constant where the configured value belongs +// shows up. Both clocks start at epoch and are advanced explicitly. +func newDriverPairRTO(t *testing.T, rtoInitial, rtoMax time.Duration) (ini, res *dgramDriver, iclk, rclk *fakeClock) { + t.Helper() + ci, err := NewSession(RoleInitiator) + if err != nil { + t.Fatalf("initiator session: %v", err) + } + ri, err := NewSession(RoleResponder) + if err != nil { + t.Fatalf("responder session: %v", err) + } + iclk = &fakeClock{t: epoch} + rclk = &fakeClock{t: epoch} + return newDgramDriverWithRTO(ci, iclk.now, rtoInitial, rtoMax), + newDgramDriverWithRTO(ri, rclk.now, rtoInitial, rtoMax), iclk, rclk +} + +// TestDgramHandshakeDriverAdvanceKeepsConfiguredRTO is a regression test for the +// RTO reset bug: advancing to the next flight must re-arm the retransmit timer at +// the configured rtoInitial, not the default package constant. The existing +// backoff test missed this because it uses the default driver, where the two are +// equal. +func TestDgramHandshakeDriverAdvanceKeepsConfiguredRTO(t *testing.T) { + const rtoInitial = 5 * time.Millisecond + const rtoMax = 50 * time.Millisecond + ini, res, iclk, _ := newDriverPairRTO(t, rtoInitial, rtoMax) + + ch, err := ini.start() + if err != nil { + t.Fatalf("start: %v", err) + } + sh := feed(res, ch) + if cf := feed(ini, sh); len(cf) != 1 || cf[0].typ != protocol.MessageTypeClientFinished { + t.Fatalf("initiator did not advance to ClientFinished: %v", cf) + } + + got := ini.nextWake().Sub(iclk.now()) + dflt := time.Duration(constants.DatagramHandshakeInitialTimeoutMillis) * time.Millisecond + if got != rtoInitial { + t.Fatalf("post-advance RTO = %v, want configured %v (a value of %v means it reverted to the default constant)", + got, rtoInitial, dflt) + } +} + +// TestDgramHandshakeDriverLingerCoversRetransmitWindow is a regression test for +// the short-linger bug: a completed responder must keep answering ClientFinished +// retransmits for the initiator's whole retransmit window, not one RTO, so a lost +// final ServerFinished is recovered by replay. +func TestDgramHandshakeDriverLingerCoversRetransmitWindow(t *testing.T) { + const rtoInitial = 5 * time.Millisecond + const rtoMax = 50 * time.Millisecond + ini, res, _, rclk := newDriverPairRTO(t, rtoInitial, rtoMax) + + ch, _ := ini.start() + sh := feed(res, ch) + cf := feed(ini, sh) + if sf := feed(res, cf); len(sf) != 1 || sf[0].typ != protocol.MessageTypeServerFinished { + t.Fatalf("responder did not complete: %v", sf) + } + if !res.established() { + t.Fatal("responder not established") + } + + // Linger must span the initiator's full ClientFinished retransmit window + // (MaxRetries backoffs capped at rtoMax), well beyond the one-RTO old value. + got := res.nextWake().Sub(rclk.now()) + want := constants.DatagramHandshakeMaxRetries * rtoMax + if got != want { + t.Fatalf("linger window = %v, want MaxRetries*rtoMax = %v (one rtoMax %v was the buggy value)", got, want, rtoMax) + } + + // A ClientFinished arriving late but within the window still replays SF. + rclk.set(res.nextWake().Add(-time.Millisecond)) + if out := res.onInbound(cf[0].typ, cf[0].body); len(out) != 1 || out[0].typ != protocol.MessageTypeServerFinished { + t.Fatalf("late-but-in-window ClientFinished did not replay ServerFinished: %v", out) + } + + // At the window edge the linger ends. + rclk.set(res.nextWake()) + _ = res.onTimeout() + if !res.done() { + t.Fatal("responder still lingering after the window expired") + } +} + +// TestDatagramEndpointPropagatesRTO guards the wiring class of the RTO bug: the +// endpoint must build drivers carrying its configured backoff. Without this, a +// change that built drivers with the defaults would silently run at 500ms RTOs, +// and neither the driver tests nor the e2e (which would just pass, slowly) would +// catch it. +func TestDatagramEndpointPropagatesRTO(t *testing.T) { + ep := NewDatagramEndpoint(nil) + ep.rtoInitial = 7 * time.Millisecond + ep.rtoMax = 70 * time.Millisecond + + s, err := NewSession(RoleInitiator) + if err != nil { + t.Fatalf("session: %v", err) + } + d := ep.newDriver(s) + if d.rtoInitial != ep.rtoInitial || d.rtoMax != ep.rtoMax { + t.Fatalf("driver RTO = (%v, %v), want endpoint config (%v, %v)", + d.rtoInitial, d.rtoMax, ep.rtoInitial, ep.rtoMax) + } +} diff --git a/pkg/tunnel/dgram_handshake_wire.go b/pkg/tunnel/dgram_handshake_wire.go index efe85ca..a03f202 100644 --- a/pkg/tunnel/dgram_handshake_wire.go +++ b/pkg/tunnel/dgram_handshake_wire.go @@ -91,7 +91,7 @@ func (e *DatagramEndpoint) startResponder(src net.Addr, first inboundMsg) { ep: e, ds: ds, src: src, - driver: newDgramDriverWithRTO(session, nil, e.rtoInitial, e.rtoMax), + driver: e.newDriver(session), localIndex: idx, peerIndex: first.sender, // the initiator's index, echoed as RecvIndex in our replies established: func() { e.surface(ds) }, @@ -116,7 +116,7 @@ func DialDatagram(ep *DatagramEndpoint, dst net.Addr) (*Session, error) { ep: ep, ds: ds, src: dst, - driver: newDgramDriverWithRTO(session, nil, ep.rtoInitial, ep.rtoMax), + driver: ep.newDriver(session), localIndex: idx, } s, err := l.run() @@ -127,6 +127,13 @@ func DialDatagram(ep *DatagramEndpoint, dst net.Addr) (*Session, error) { return s, nil } +// newDriver builds a handshake driver carrying this endpoint's configured +// retransmission backoff. Both the dial and responder paths go through here so the +// configured RTO cannot be dropped on the way to the driver. +func (e *DatagramEndpoint) newDriver(session *Session) *dgramDriver { + return newDgramDriverWithRTO(session, nil, e.rtoInitial, e.rtoMax) +} + // surface offers an established inbound session on the accept channel without // blocking the handshake goroutine. func (e *DatagramEndpoint) surface(ds *datagramSession) { From 2bbfc21a1c0fa1c85182a4e748af9984162b7260 Mon Sep 17 00:00:00 2001 From: "Peter Z." Date: Sun, 31 May 2026 01:15:27 +0200 Subject: [PATCH 6/6] document the udp datagram handshake and add a datagram handshake benchmark Updates README, CHANGELOG, the datagram design doc, and the CLI reference to record that the datagram handshake now works while the encrypted data path does not yet. Adds quantum-vpn bench --datagram-handshakes N, which measures the datagram handshake rate over loopback UDP: around 1,300/sec on an M1 Pro versus 1,450/sec for the stream path, and computes the benchmark banner padding so the box aligns. --- CHANGELOG.md | 8 + README.md | 20 +- cmd/quantum-vpn/bench.go | 82 +++++++- cmd/quantum-vpn/main.go | 10 +- docs/datagram-transport.md | 44 ++-- docs/udp-datagram-transport-plan.md | 316 ---------------------------- docs/usage/CLI.md | 17 +- 7 files changed, 135 insertions(+), 362 deletions(-) delete mode 100644 docs/udp-datagram-transport-plan.md diff --git a/CHANGELOG.md b/CHANGELOG.md index fccf547..8509bab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased][] +### Added +- **UDP/datagram transport (handshake)**: a connectionless transport alongside the TCP/stream one, demultiplexed by a random per-session connection index rather than source address (survives NAT rebind and roaming). It carries the full CH-KEM handshake over a lossy link: the large post-quantum Hellos are fragmented across datagrams and reassembled (`pkg/tunnel/reassembly.go`), and a transport-agnostic state machine plus reliability driver (`pkg/tunnel/dgram_handshake_{fsm,driver,wire}.go`) add retransmission with exponential backoff, a retry ceiling, duplicate/replay handling, and a responder linger that recovers a lost final flight. A bad or forged datagram drops rather than failing the handshake. The responder runs no decapsulation and sends no ServerHello until a full ClientHello arrives and a per-source half-open slot is granted, so it never sends more than it received from an unvalidated source. `DialDatagram` performs the initiator handshake and returns an established session. +- **Datagram handshake benchmark**: `quantum-vpn bench --datagram-handshakes N` measures the datagram handshake rate over loopback UDP (~1,300/sec, ~760 µs each on an M1 Pro, vs ~1,450/sec for the stream path). + +### Not yet implemented (datagram) +- The encrypted data path (epoch-keyed AEAD over DATA frames) and authenticated CLOSE, so there is no datagram throughput number yet. +- The stateless cookie/RETRY anti-amplification exchange and connection roaming. + ### Planned: v0.0.11 - Security Hardening (carryover) - Handshake timeout on server Accept - Module integrity verification (fix always-true check) diff --git a/README.md b/README.md index a80cca4..4229104 100644 --- a/README.md +++ b/README.md @@ -90,19 +90,23 @@ See [Quick Start Guide](docs/usage/QUICKSTART.md) for detailed examples. Hardware-accelerated where available (ARMv8 Crypto Extensions on Apple Silicon; AES-NI / AVX2 / hardware SHA-3 on x86-64). Go 1.26.3 (Green Tea GC). -The transport is **TCP/stream only** (length-prefixed framing); UDP is not currently -supported. Two distinct numbers matter: the raw AEAD cipher rate, and the rate actually -achieved end-to-end through a single tunnel (lower, currently allocation-bound; zero-copy -data-plane work is tracked on the [roadmap](docs/ROADMAP.md)). +The stream transport is **TCP** (length-prefixed framing). A connectionless +**UDP/datagram** transport is in progress: its handshake is implemented (fragmented PQ +Hellos, retransmission with backoff, replay of cached flights), with the encrypted data +path not yet landed, so only the datagram handshake rate appears below. For the stream +path, two distinct numbers matter: the raw AEAD cipher rate, and the rate actually achieved +end-to-end through a single tunnel (lower, currently allocation-bound; zero-copy data-plane +work is tracked on the [roadmap](docs/ROADMAP.md)). -**Measured (Apple M1 Pro, Go 1.26.3, loopback TCP):** +**Measured (Apple M1 Pro, Go 1.26.3, loopback):** | Metric | Result | |--------|--------| | AES-256-GCM cipher (raw AEAD, single core) | ~2.5 GB/s | | ChaCha20-Poly1305 cipher (raw AEAD) | ~0.7 GB/s | -| Handshakes/sec (full CH-KEM, sequential) | ~1,450 (~670 µs each) | -| Single-tunnel throughput (AES-GCM, end-to-end) | ~690 MB/s (5.5 Gb/s), sustained across rekeys | +| Handshakes/sec (stream/TCP, full CH-KEM, sequential) | ~1,450 (~670 µs each) | +| Handshakes/sec (datagram/UDP, full CH-KEM, sequential) | ~1,300 (~760 µs each) | +| Single-tunnel throughput (stream/TCP, AES-GCM, end-to-end) | ~690 MB/s (5.5 Gb/s), sustained across rekeys | **Estimated on other hardware** (extrapolated from cipher throughput; not yet independently measured; run the benchmark to verify): @@ -113,7 +117,7 @@ independently measured; run the benchmark to verify): | Mid-range server (Xeon Silver) | 4-7 GB/s | | Enterprise (Xeon Platinum / EPYC) | 8-12 GB/s | -Run `quantum-vpn bench --handshakes N --throughput` on your target hardware. See [CLI Reference](docs/usage/CLI.md#benchmark-mode). +Run `quantum-vpn bench --handshakes N --datagram-handshakes N --throughput` on your target hardware. See [CLI Reference](docs/usage/CLI.md#benchmark-mode). ## Contributing diff --git a/cmd/quantum-vpn/bench.go b/cmd/quantum-vpn/bench.go index 69a62b5..16fd484 100644 --- a/cmd/quantum-vpn/bench.go +++ b/cmd/quantum-vpn/bench.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "net" "os" "strings" "sync" @@ -11,15 +12,16 @@ import ( "github.com/sara-star-quant/quantum-go/pkg/tunnel" ) -func runBench(handshakes int, throughputTest bool, sizeStr, durationStr, cipherSuite string) { - fmt.Println("╔═══════════════════════════════════════════════════════════╗") - fmt.Println("║ Quantum-Resistant VPN Benchmark ║") - fmt.Println("║ CH-KEM: ML-KEM-1024 + X25519 ║") - fmt.Println("╚═══════════════════════════════════════════════════════════╝") +func runBench(handshakes, datagramHandshakes int, throughputTest bool, sizeStr, durationStr, cipherSuite string) { + const bannerWidth = 59 + fmt.Println("╔" + strings.Repeat("═", bannerWidth) + "╗") + fmt.Printf("║%-*s║\n", bannerWidth, " Quantum-Resistant VPN Benchmark") + fmt.Printf("║%-*s║\n", bannerWidth, " CH-KEM: ML-KEM-1024 + X25519") + fmt.Println("╚" + strings.Repeat("═", bannerWidth) + "╝") fmt.Println() - if handshakes == 0 && !throughputTest { - fmt.Println("No benchmarks specified. Use --handshakes or --throughput") + if handshakes == 0 && datagramHandshakes == 0 && !throughputTest { + fmt.Println("No benchmarks specified. Use --handshakes, --datagram-handshakes, or --throughput") fmt.Println("Run 'quantum-vpn bench --help' for usage") os.Exit(1) } @@ -29,6 +31,11 @@ func runBench(handshakes int, throughputTest bool, sizeStr, durationStr, cipherS fmt.Println() } + if datagramHandshakes > 0 { + benchDatagramHandshakes(datagramHandshakes) + fmt.Println() + } + if throughputTest { size := parseSize(sizeStr) duration := parseDuration(durationStr) @@ -102,6 +109,67 @@ func benchHandshakes(count int) { printHandshakeResults(count, successCount, errors, totalTime, durations) } +// benchDatagramHandshakes measures full CH-KEM handshakes over the connectionless +// UDP transport on loopback: it dials sequentially through DialDatagram, which +// fragments the post-quantum Hellos and drives retransmission, so the result is +// directly comparable to the stream handshake number. The encrypted datagram data +// path is not implemented yet, so there is no datagram throughput benchmark. +func benchDatagramHandshakes(count int) { + fmt.Printf("Benchmarking Datagram Handshakes (%d iterations)\n", count) + fmt.Println(strings.Repeat("─", 60)) + + responderConn, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + fmt.Fprintf(os.Stderr, "Error: Failed to open responder socket: %v\n", err) + os.Exit(1) + } + responder := tunnel.NewDatagramEndpoint(responderConn) + go responder.Serve() + defer func() { _ = responder.Close() }() + + initiatorConn, err := net.ListenPacket("udp", "127.0.0.1:0") + if err != nil { + fmt.Fprintf(os.Stderr, "Error: Failed to open initiator socket: %v\n", err) + os.Exit(1) + } + initiator := tunnel.NewDatagramEndpoint(initiatorConn) + go initiator.Serve() + defer func() { _ = initiator.Close() }() + + dst := responderConn.LocalAddr() + fmt.Printf("Test setup: %s -> %s\n\n", initiatorConn.LocalAddr(), dst) + + durations := make([]time.Duration, count) + errors := 0 + + startTime := time.Now() + for i := 0; i < count; i++ { + handshakeStart := time.Now() + + session, err := tunnel.DialDatagram(initiator, dst) + if err != nil { + errors++ + durations[i] = 0 + continue + } + durations[i] = time.Since(handshakeStart) + _ = session + + step := count / 10 + if step == 0 { + step = 1 + } + if (i+1)%step == 0 || i == count-1 { + fmt.Printf("Progress: %d/%d (%.0f%%)\r", i+1, count, float64(i+1)/float64(count)*100) + } + } + fmt.Println() + + totalTime := time.Since(startTime) + successCount := count - errors + printHandshakeResults(count, successCount, errors, totalTime, durations) +} + func printHandshakeResults(total, successful, failed int, totalTime time.Duration, durations []time.Duration) { if failed == total { fmt.Fprintf(os.Stderr, "All handshakes failed\n") diff --git a/cmd/quantum-vpn/main.go b/cmd/quantum-vpn/main.go index 60578e4..8826689 100644 --- a/cmd/quantum-vpn/main.go +++ b/cmd/quantum-vpn/main.go @@ -130,7 +130,8 @@ EXAMPLES: func benchCommand() { fs := flag.NewFlagSet("bench", flag.ExitOnError) - handshakes := fs.Int("handshakes", 0, "Number of handshakes to benchmark (0 = skip)") + handshakes := fs.Int("handshakes", 0, "Number of stream (TCP) handshakes to benchmark (0 = skip)") + datagramHandshakes := fs.Int("datagram-handshakes", 0, "Number of datagram (UDP) handshakes to benchmark (0 = skip)") throughput := fs.Bool("throughput", false, "Run throughput benchmark") size := fs.String("size", "100MB", "Data size for throughput test (e.g., 100MB, 1GB)") duration := fs.String("duration", "10s", "Duration for throughput test (e.g., 10s, 1m)") @@ -145,9 +146,12 @@ OPTIONS:`) fs.PrintDefaults() fmt.Println(` EXAMPLES: - # Benchmark 100 handshakes + # Benchmark 100 stream (TCP) handshakes quantum-vpn bench --handshakes 100 + # Benchmark 100 datagram (UDP) handshakes + quantum-vpn bench --datagram-handshakes 100 + # Benchmark throughput for 30 seconds quantum-vpn bench --throughput --duration 30s @@ -160,7 +164,7 @@ EXAMPLES: _ = fs.Parse(os.Args[2:]) - runBench(*handshakes, *throughput, *size, *duration, *cipherSuite) + runBench(*handshakes, *datagramHandshakes, *throughput, *size, *duration, *cipherSuite) } func exampleCommand() { diff --git a/docs/datagram-transport.md b/docs/datagram-transport.md index 2ad7d01..99bee0e 100644 --- a/docs/datagram-transport.md +++ b/docs/datagram-transport.md @@ -5,22 +5,22 @@ the existing TCP/stream transport. The two share the crypto core (`pkg/chkem`, `pkg/crypto`, and the `Session` key/rekey secret derivation) but have **separate wire formats**. There is no TCP↔UDP interop, by design. -The transport is being built as a phased epic. This document tracks the design -and is updated as phases land. +The transport is being built incrementally. This document tracks the design and +is updated as pieces land. ## Status | Component | File | Status | |-----------|------|--------| -| Datagram wire codec | `pkg/protocol/datagram_codec.go` | Phase 1a — implemented | -| Multi-word replay window | `pkg/tunnel/replay.go` | Phase 1a — implemented | -| Bounded handshake reassembler | `pkg/tunnel/reassembly.go` | Phase 1a — implemented | -| Datagram constants | `internal/constants/constants.go` | Phase 1a — implemented | -| Endpoint + demux | `pkg/tunnel/datagram.go` | Phase 1a — pending | -| Reliable handshake FSM | `pkg/tunnel/dgram_handshake.go` | Phase 1a — pending | -| Epoch cipher selection (recv) | `pkg/tunnel/session.go` (datagram path) | Phase 1a — pending | -| Zero-alloc / batched I/O | — | Phase 1b | -| Stateless cookie / anti-amplification / roaming | — | Phase 2 | +| Datagram wire codec | `pkg/protocol/datagram_codec.go` | implemented | +| Multi-word replay window | `pkg/tunnel/replay.go` | implemented | +| Bounded handshake reassembler | `pkg/tunnel/reassembly.go` | implemented | +| Datagram constants | `internal/constants/constants.go` | implemented | +| Endpoint + demux + dial/accept | `pkg/tunnel/datagram.go` | implemented | +| Reliable handshake FSM + driver + wiring | `pkg/tunnel/dgram_handshake_{fsm,driver,wire}.go` | implemented | +| Epoch cipher selection (recv) | `pkg/tunnel/session.go` (datagram path) | pending | +| Zero-alloc / batched I/O | - | future | +| Stateless cookie / anti-amplification / roaming | - | future | ## Wire format @@ -33,7 +33,7 @@ Common 14-byte header (all frame types): [FrameType:1][Epoch:1][RecvIndex:4 BE][Seq:8 BE] ``` -- **FrameType** — DATA, HANDSHAKE, CLOSE, or RETRY (RETRY reserved for Phase 2). +- **FrameType** — DATA, HANDSHAKE, CLOSE, or RETRY (RETRY reserved for a future stateless-retry exchange). - **Epoch** — selects the receive cipher for DATA frames (see *Rekey*). Carried in the clear but authenticated: the AEAD AAD is the entire 14-byte header, so a flipped epoch is rejected, not merely mis-routed. @@ -51,8 +51,8 @@ HANDSHAKE frame appends an extension before the (possibly fragmented) message: [SenderIndex:4][MsgType:1][FragOffset:2][FragLen:2][TotalLen:2][CookieLen:1][Cookie:CookieLen] ``` -`CookieLen` is `0` in Phase 1; the field exists so the Phase 2 stateless-retry -hardening needs no wire change. +`CookieLen` is `0` today; the field exists so a future stateless-retry hardening +needs no wire change. ## Key design decisions (improve, do not inherit) @@ -65,8 +65,8 @@ hardening needs no wire change. - **Demux by random connection index, not source address.** A session survives NAT rebind/roaming, one address can host many sessions, and indices resist off-path guessing (CSPRNG, regenerated on the rare active-table collision). The - source address is only a hint, updated after an AEAD-valid packet (the - authenticated-rebinding enforcement lands in Phase 2). + source address is only a hint, updated after an AEAD-valid packet + (authenticated-rebinding enforcement is future work). - **Epoch-based rekey (reorder-safe).** The stream transport promotes the new receive cipher on the first new-key packet and discards the old one — correct @@ -93,16 +93,16 @@ hardening needs no wire change. best-effort datagram, never relied upon. `Send` emits one datagram per call and rejects payloads larger than `DatagramMaxDataPayload` (no PMTU discovery). -## Phase 1 DoS posture +## DoS posture Amplification is inherently low: the ~1.7 KB ClientHello must be fully received and reassembled before the comparable ServerHello is sent (response:request ≈ -1:1, not an amplifier). Phase 1 additionally reuses the existing rate limiters -and caps concurrent half-open handshakes per source and overall. The Phase 2 -stateless cookie closes the residual spoofed-source state-exhaustion gap and -enforces a strict anti-amplification bound. +1:1, not an amplifier). The current implementation additionally reuses the +existing rate limiters and caps concurrent half-open handshakes per source and +overall. A future stateless cookie will close the residual spoofed-source +state-exhaustion gap and enforce a strict anti-amplification bound. ## Out of scope (future) GSO/GRO offload, PMTU discovery, multipath, and a parallel per-datagram crypto -pipeline (revisited after the Phase 1 baseline is measured). +pipeline (revisited after the current baseline is measured). diff --git a/docs/udp-datagram-transport-plan.md b/docs/udp-datagram-transport-plan.md deleted file mode 100644 index 23114d9..0000000 --- a/docs/udp-datagram-transport-plan.md +++ /dev/null @@ -1,316 +0,0 @@ -# UDP / Datagram Transport — phased design (refined) - -> **Planning-environment caveat (read first).** During this refinement the -> sandbox stopped returning tool output after the first call. I confirmed the -> package layout (one successful `find`/`ls`) but could **not** re-open the -> source files to re-verify line numbers or internal structures. Items that -> depend on unverified internals are tagged **[VERIFY]** below — confirm each -> against the code before/while implementing. The one structural correction I -> *could* confirm from the directory listing is called out in §0. - ---- - -## Context - -The transport is TCP/stream-only. The draft's exploration found stream -assumptions throughout: blocking `io.ReadFull` handshake reads, length-prefixed -framing, a single-writer `writeMu`, a 64-entry replay window, and a rekey that -*assumes in-order delivery* (promote-on-first-new-key, discard-old). None of -that survives datagram reordering, loss, duplication, or the ~1644 B Hellos that -exceed a 1500 B MTU. UDP needs a purpose-built datagram transport, not a port of -the stream code. - -**Goal:** a clean-sheet, connectionless datagram transport that reuses the -crypto core unchanged and *improves on* (does not inherit) the TCP stack's -constrained choices. Delivered as a phased epic with an explicit Datagram API -(not `Dial`/`Listen("udp")`), DoS/amplification hardening deferred to Phase 2 but -with its wire hooks reserved in Phase 1. - -**Reused unchanged (no fork):** `pkg/chkem`, `pkg/crypto` (AEAD, -`DeriveTrafficKeys`, `DeriveRekeySecret`, KDF), and the `Session` key/rekey -*secret-derivation* primitives. **New:** the transport/framing/handshake-delivery -layers. The TCP wire format is untouched; UDP is a separate transport with its -own framing (no TCP↔UDP interop, documented). - ---- - -## §0. Corrections to the draft (confirmed structural facts) - -- **There is no `pkg/tunnel/replay.go`.** The `pkg/tunnel` listing contains - `session.go`, `handshake.go`, `transport.go`, `limiter.go`, `ticket.go`, - pool/observer files, and tests — but no `replay.go`. The draft lists - `replay.go` under *Extended* and as the home of the multi-word window. The - replay window today lives inside another file (almost certainly `session.go`). - **Action:** treat the replay window as **New** code — create - `pkg/tunnel/replay.go` housing the multi-word window type, and have the - datagram recv path use it. Decide during implementation whether to also - migrate the TCP window into it or leave TCP's in place (prefer leaving TCP - untouched to avoid regressing the stream path). **[VERIFY]** the current - window's exact location/shape. - -- Everywhere the draft says "extend `pkg/tunnel/replay.go`", read "add - `pkg/tunnel/replay.go` (new)". - ---- - -## Architecture at a glance - -``` - ┌─────────────────────────── DatagramEndpoint ──────────────────────────┐ - *net.UDPConn ──recv──▶│ parse frame header (type,epoch,recvIndex,seq) │ - │ │ │ - │ ├─ recvIndex ─▶ session table (random CSPRNG index, NOT src addr) │ - │ │ │ │ - data datagram ────────┼───┤ ┌───────┴────────┐ │ - [type|epoch|idx|seq|ct] │ │ found session │ ── epoch ─▶ select recv cipher │ - │ │ │ │ (cur | prev-retained) │ - │ │ │ │ ── seq ─▶ global replay window │ - │ │ │ │ (multi-word, never reset) │ - │ │ │ │ ── AEAD Open (AAD = idx||epoch) │ - │ │ │ │ ── on success: maybe rebind src addr │ - │ │ └────────────────┘ (Phase 2 enforcement) │ - │ │ │ - handshake datagram ───┼───┴─▶ reassembler (HANDSHAKE only, bounded) ─▶ dgram_handshake FSM │ - [type|...|msgID|fragOff| (per-src cap, per-buf cap, timeout) (retransmit/dedup/ │ - fragLen|totalLen|...] flight cache) │ - │ │ - new session ─────────┼────────────────────────────────────────────────▶ accept channel │ - └─────────────────────────────────────────────────────────────────────────┘ -``` - -The crypto core (`chkem`, `crypto`, `Session` secret derivation) sits unchanged -underneath; only header parsing, demux, reassembly, the handshake FSM, epoch -cipher selection, and the replay window are new. - ---- - -## Phase 1 — Functional UDP datagram transport (PR 1) - -### Wire format (`pkg/protocol/datagram_codec.go`, new) - -Self-contained per-datagram frame, **no cross-datagram length prefix, no -transmitted nonce**: - -``` -data: [type:1][epoch:1][recvIndex:4][seq:8] || ciphertext+tag - └────────── AAD includes recvIndex + epoch ──────────┘ -handshake: [type:1][epoch:1][recvIndex:4][seq:8][senderIndex:4] - [msgID:?][fragOffset:?][fragLen:?][totalLen:?] || ct+tag -``` - -- **Nonce is derived, not sent:** `nonce = sessionNoncePrefix(4B) || seq(8B)` - (12 B AEAD nonce). `seq` is the **single** replay counter *and* AEAD nonce - counter, **globally monotonic, never reset across rekey**. This (a) keeps - per-`(key,nonce)` uniqueness trivially — key changes by epoch before any - realistic 2⁶⁴ wrap, and within an epoch `seq` is unique; (b) gives one replay - window for the whole session; (c) delivers ROADMAP #2 session-bound nonces for - free; (d) saves 12 B/packet vs the TCP path that prepends the nonce. -- `recvIndex` + `epoch` go in the **AEAD AAD** (authenticated — a flipped epoch - is *rejected*, not merely mis-routed). -- **Overhead budget (state it for the Send error):** data header = 1+1+4+8 = - **14 B** + **16 B** tag = **30 B**/datagram. With a conservative 1200 B - datagram budget, max inner payload ≈ **1170 B**. Define the exact constant in - `internal/constants`. -- Confirm field widths for `msgID`/`fragOffset`/`fragLen`/`totalLen` against the - ~1644 B Hello reassembly need (totalLen must hold ≥ ~1700; offsets/lengths fit - in 2 B at a 1200 B MTU). **[VERIFY]** existing message sizes via - `EncodeClientHello`/`EncodeServerHello`. **[VERIFY]** that these encode - functions exist with those names in `pkg/protocol`. - -### Endpoint + demux (`pkg/tunnel/datagram.go`, new) - -- `DatagramEndpoint` over a single `*net.UDPConn`. Demux **by random per-session - connection index** carried in every frame (WireGuard receiver-index model), - **not by source address** → a session survives NAT rebind/roaming, one address - hosts many sessions, indices resist off-path guessing (CSPRNG via - `pkg/crypto/random`, not sequential; regenerate on the rare active-table - collision). **[VERIFY]** the CSPRNG helper name in `pkg/crypto/random.go`. -- Source address is a *hint*, updated only after an AEAD-valid packet (secure - roaming; enforcement lands in Phase 2, hook present in Phase 1). -- New sessions surface on an **accept channel** (connectionless) — no - `net.Listener.Accept` shape. - -### Phase 1a — correctness (land + benchmark before optimizing) - -**Reassembly (`pkg/tunnel/reassembly.go`, new) — HANDSHAKE messages only.** -Data frames are capped to one datagram (a VPN inner payload ≤ MTU), so the -reassembler never sees data traffic — only the bounded, few-message handshake. -Bound it hard (it runs pre-auth): per-source cap on concurrent buffers, -per-buffer size cap, and a timeout; reject/evict on breach. - -**Reliable handshake (`pkg/tunnel/dgram_handshake.go`, new), bilateral.** -- Both sides retransmit their last flight on timeout or on receiving a duplicate - of the peer's prior flight; the **initiator** owns the abort timeout + - exponential backoff + bounded retries; per-flight dedup. -- The **responder caches its generated ServerHello + derived secret** (keyed by - connection index) and replays that flight *verbatim* on a duplicate - ClientHello — it must **never** re-run the randomized `Encapsulate`, which would - derive a different secret and desync the sides. **[VERIFY]** the `Encapsulate` - entry point in `pkg/chkem`. -- Reuse existing message *contents* - (`EncodeClientHello`/`ServerHello`/`Finished`), wrapped in datagram+fragment - framing. **[VERIFY]** these names. - -**Lifecycle (no TCP FIN over UDP).** Reap by idle timeout; close is a -best-effort datagram, never relied on. **Send contract changes:** one datagram -per `Send`; payload capped to the datagram MTU (overhead-adjusted); `Send` -returns a clear, typed error on oversize (PMTU discovery out of scope). -**Resumption** reuses the existing ticket `Resume`/`ExportTicket` flow within the -new framing. **[VERIFY]** ticket API names in `pkg/tunnel/ticket.go`. - -**Rekey + replay reworked for reordering (NOT inherited).** -- **Epoch-based rekey:** 1-byte `epoch` per data frame, in the AAD. Receiver - selects cipher by epoch and retains the *previous* epoch's recv cipher for a - bounded window (seq-distance **and** time), then retires it. Define wrap - handling: epoch is mod 256, only adjacent epochs are ever live, so wrap is - unambiguous. Replaces the unsafe in-order promote/discard trial-decrypt. - *Alternative considered:* trial-decrypt current-then-previous with no epoch - byte — simpler, no wire field, but pays a second `Open` during the window; - epoch chosen since the UDP wire is new anyway. -- **Implementation guidance (avoid TCP regression):** add the epoch-keyed cipher - selection as a **datagram-specific recv path / new methods on `Session`**, - reusing `DeriveRekeySecret` and the pending-cipher derivation — **do not mutate - the TCP recv path's promote/discard logic.** **[VERIFY]** how `Session` - currently stores current/pending ciphers so the datagram path can hold - `{epoch → cipher}` for the two live epochs. -- **Single global replay window** (`pkg/tunnel/replay.go`, new): multi-word - bitmap (≥1024, ROADMAP #5) over the monotonic `seq`, **never reset across - rekey** — epoch picks the cipher, the window tracks seq independently. This - carries the boundary-replay lesson from the TCP fix (resetting the window on - key change re-opens a one-packet replay). - -**Reserve DoS hooks now (no Phase-2 wire change):** include an optional cookie -field and a `RetryRequest` message type in the handshake framing. - -**Phase-1 DoS posture (not wide-open despite no cookie yet):** -- Reuse `HandshakeLimiter` (global rate); adapt `IPRateLimiter` to a - per-source-addr half-open handshake cap; bound concurrent half-open sessions. - **[VERIFY]** `HandshakeLimiter` / `IPRateLimiter` signatures in - `pkg/tunnel/limiter.go`. -- Amplification is inherently low: the ~1644 B PQ ClientHello must be fully - received/reassembled before the ~1642 B ServerHello is sent, so - response:request ≈ 1:1 (not an amplifier). Document this; the Phase-2 cookie - closes the residual spoofed-source state-exhaustion gap. - -### Phase 1b — performance (after a measured 1a baseline) - -Design 1a to *allow* these (header headroom in the frame layout, no forced -copies); land and tune them after a correctness baseline + benchmark, mirroring -the TCP "measure then optimize" approach: -- **Zero-alloc steady-state path:** encrypt in place into a reusable - per-endpoint datagram buffer with header headroom; `SealAppend`/`OpenInto`-style - AEAD; stack `[8]byte`/`[12]byte` nonce+AAD scratch. Target 0 allocs/op on send - and on the opt-in receive path. **[VERIFY]** which append/in-place AEAD entry - points `pkg/crypto/aead.go` exposes (reuse, don't add). -- **Batched syscalls:** `recvmmsg`/`sendmmsg` via `golang.org/x/net/ipv4|ipv6` - `PacketConn` `ReadBatch`/`WriteBatch`. Adds a direct `golang.org/x/net` dep - (already indirect) — **confirm before promoting to a direct dep.** -- **Drop per-packet overhead:** atomic `LastActivity` (no per-packet - `mu.Lock()`), coarse deadlines, no per-packet `time.Now()` in the hot path. - ---- - -## Phase 2 — DoS / amplification hardening + assurance (PR 2) - -- **Stateless retry/cookie:** responder holds no state until the client echoes a - MAC'd cookie bound to its source address (HelloRetryRequest / WireGuard-cookie - / Rosenpass-biscuit style). Enforce **anti-amplification** (never send >~1× - received bytes to an unverified source). Uses the Phase-1 reserved cookie - field — no wire change. -- **Authenticated address rebinding (roaming):** migrate a session's peer - address only after an AEAD-valid packet from the new address (prevents off-path - hijack/redirect via spoofed source). UDP-specific property, no TCP analog. -- **Assurance:** threat-model doc in `docs/`; fuzz the datagram parser + - reassembler + cookie path; negative tests for spoofed-source and - amplification. -- *Optional:* fixed-size datagram padding for traffic-analysis resistance. - ---- - -## UDP-native improvements (explicit, not inherited) - -| Area | TCP today | UDP design | -|------|-----------|------------| -| Nonce | `[0000‖counter]` prepended (12 B/pkt) | derived from `seq`+session prefix, **not transmitted** (−12 B/pkt; ROADMAP #2) | -| Demux / roaming | one TCP conn per peer | random per-frame connection index; address rebind only after an authenticated packet | -| Rekey activation | in-order trial-decrypt, promote+discard | explicit 1-byte epoch (in AAD) per frame; reorder-safe | -| Replay window | 64 entries | ≥1024 multi-word, single global, reorder-tolerant, never reset | -| Data-path allocs | per packet | zero-alloc in-place encrypt into reused buffer | -| Syscalls | one write/read per packet | batched recvmmsg/sendmmsg | -| Per-packet bookkeeping | `mu.Lock()`+`time.Now()` | atomic timestamp, coarse deadlines | -| Source spoofing/DoS | n/a (TCP) | stateless cookie + anti-amplification (Phase 2) | -| HoL blocking | inherent | none; opens door to parallel per-datagram crypto (future) | - ---- - -## Critical files - -- **New:** `pkg/tunnel/datagram.go` (endpoint + index demux + accept channel), - `pkg/tunnel/dgram_handshake.go` (retransmit/timeout/flight-cache/dedup), - `pkg/tunnel/reassembly.go` (bounded handshake reassembly), - `pkg/tunnel/replay.go` (multi-word global window — **new, not extended**), - `pkg/protocol/datagram_codec.go` (frame + fragment format). -- **Extended (additively, no TCP-path regression):** `pkg/tunnel/session.go` - (datagram recv path with epoch→cipher selection; reuse `DeriveRekeySecret` / - pending-cipher derivation), `internal/constants/constants.go` (UDP MTU, - payload cap, fragment limits, window size, epoch width), - `pkg/protocol/messages.go` (cookie/retry + fragment headers), - `pkg/tunnel/limiter.go` (per-src half-open cap; reuse existing limiters). -- **Reused unchanged:** `pkg/chkem`, `pkg/crypto` (aead, kdf, random), `Session` - secret/rekey derivation. - ---- - -## Implementation order - -1. `datagram_codec.go` frame/fragment encode+decode + unit tests (pure, no I/O). -2. `replay.go` multi-word window + table-driven reorder/dup tests. -3. `reassembly.go` bounded reassembler + bounds/timeout tests. -4. `datagram.go` endpoint skeleton: UDPConn, index table, accept channel, demux. -5. `dgram_handshake.go` bilateral retransmit FSM + responder flight cache; - wire in `chkem` Encapsulate/Decapsulate and existing Hello/Finished encoders. -6. Epoch cipher selection on the datagram recv path in `session.go`; close the - data plane (`Send`/recv) with the new framing + replay window. -7. Limiter integration + reserved cookie/retry fields (no enforcement yet). -8. Phase-1a verification suite + loopback baseline. **Then** Phase 1b. **Then** - Phase 2 in a separate PR. - ---- - -## Verification - -- **Phase 1a (correctness):** a deterministic, seeded fault-injection - `net.PacketConn` (configurable drop/reorder/duplicate rates) drives tests for - fragmentation+reassembly, handshake completion under drop/reorder/dup, rekey - under reorder (epoch selection + previous-epoch retention + retirement), replay - window under reorder, reassembly memory bounds, half-open caps. `go test ./... - -race`. Establish a loopback-UDP throughput + handshakes/sec baseline. -- **Phase 1b (performance):** `testing.AllocsPerRun == 0` on the send path; - loopback-UDP throughput with/without batched I/O vs the 1a baseline - (benchstat); confirm no correctness regression under `-race`. -- **Phase 2:** spoofed-source/amplification tests (unverified source can neither - create session state nor elicit a larger-than-received response), cookie - round-trip, address-rebind only after an AEAD-valid packet; fuzz parser + - reassembler + cookie path. - -## Out of scope (future) - -GSO/GRO offload, PMTU discovery, multipath, parallel per-datagram crypto pipeline -(revisit after the Phase-1 baseline is measured). - ---- - -## Open items to confirm during implementation (the **[VERIFY]** list) - -1. Current location/shape of the 64-entry replay window (no `replay.go` exists). -2. How `Session` stores current/pending ciphers + the rekey state machine entry - points (`DeriveRekeySecret`, pending-cipher promotion) — to add an - epoch-keyed selection without touching the TCP path. -3. Exact `pkg/protocol` encoder names (`EncodeClientHello`/`ServerHello`/ - `Finished`) and current Hello sizes (drives fragment header widths). -4. `pkg/chkem` `Encapsulate`/`Decapsulate` entry points (responder must cache, - not re-run, Encapsulate). -5. `pkg/crypto/aead.go` in-place/append AEAD entry points and - `pkg/crypto/random.go` CSPRNG helper (reuse for zero-alloc + index gen). -6. `HandshakeLimiter` / `IPRateLimiter` signatures in `limiter.go`. -7. Ticket `Resume`/`ExportTicket` API in `ticket.go`. diff --git a/docs/usage/CLI.md b/docs/usage/CLI.md index 6e2b5ee..6b92d04 100644 --- a/docs/usage/CLI.md +++ b/docs/usage/CLI.md @@ -73,9 +73,12 @@ go build -tags otel -o quantum-vpn ./cmd/quantum-vpn Test performance on your hardware: ```bash -# Benchmark 100 handshakes +# Benchmark 100 stream (TCP) handshakes quantum-vpn bench --handshakes 100 +# Benchmark 100 datagram (UDP) handshakes +quantum-vpn bench --datagram-handshakes 100 + # Benchmark throughput for 30 seconds quantum-vpn bench --throughput --duration 30s @@ -86,13 +89,15 @@ quantum-vpn bench --throughput --size 1GB --cipher chacha20 quantum-vpn bench --handshakes 100 --throughput --size 500MB ``` -### Verified Performance (Apple M1 Pro, Go 1.26.3, loopback TCP) -- **Handshakes**: ~1,450/sec (~670us each, full CH-KEM, sequential) +### Verified Performance (Apple M1 Pro, Go 1.26.3, loopback) +- **Handshakes (stream/TCP)**: ~1,450/sec (~670us each, full CH-KEM, sequential) +- **Handshakes (datagram/UDP)**: ~1,300/sec (~760us each, full CH-KEM, sequential) - **Cipher throughput**: ~2.5 GB/s AES-256-GCM raw AEAD (ARMv8 Crypto Extensions); ~0.7 GB/s ChaCha20-Poly1305 -- **Single-tunnel throughput**: ~690 MB/s (5.5 Gb/s) end-to-end over TCP, sustained across automatic rekeys +- **Single-tunnel throughput (stream/TCP)**: ~690 MB/s (5.5 Gb/s) end-to-end over TCP, sustained across automatic rekeys -> Transport is TCP/stream only; UDP is not supported. End-to-end tunnel throughput is -> currently allocation-bound and below the raw cipher rate. +> The datagram (UDP) transport currently implements the handshake only; its encrypted data +> path is not yet landed, so there is no datagram throughput number. End-to-end stream +> throughput is currently allocation-bound and below the raw cipher rate. ## Example Mode