-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream_test.go
More file actions
135 lines (110 loc) · 3.07 KB
/
Copy pathstream_test.go
File metadata and controls
135 lines (110 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package netpipe
import (
"bytes"
"net"
"testing"
)
// TestStreamRoundTrip checks that a multi-chunk payload written with
// writeStream can be read back frame by frame and reassembled by a stream
// buffer into bytes identical to the original.
func TestStreamRoundTrip(t *testing.T) {
	server, client := net.Pipe()
	defer server.Close()
	defer client.Close()

	// 256 KB payload; presumably 4 chunks if DefaultChunkSize is 64 KB.
	msg := make([]byte, 256*1024)
	for i := range msg {
		msg[i] = byte(i % 256)
	}

	// The writer hands its result back over a buffered channel rather than
	// calling t.Errorf itself. That keeps every t.* call on the test
	// goroutine, so a writer that fails late (for example because the reader
	// bailed out and the pipe was closed) can never log after the test ended.
	writeErr := make(chan error, 1)
	go func() {
		// Closing the write side once writing ends turns a stream that never
		// completes into an EOF on the reader instead of a hung test.
		defer client.Close()
		writeErr <- writeStream(client, msg, DefaultChunkSize)
	}()

	buf := newStreamBuffer()
	for {
		flags, body, err := readFrameFull(server)
		if err != nil {
			// Surface a writer failure first if one is already available; it
			// is usually the root cause of the read error.
			select {
			case werr := <-writeErr:
				if werr != nil {
					t.Errorf("writeStream: %v", werr)
				}
			default:
			}
			t.Fatalf("readFrameFull: %v", err)
		}
		if flags&flagStream == 0 {
			t.Fatal("expected flagStream to be set")
		}

		streamID, index, total, chunk, err := parseStreamBody(body)
		if err != nil {
			t.Fatalf("parseStreamBody: %v", err)
		}

		assembled, complete := buf.addChunk(streamID, index, total, chunk)
		if !complete {
			continue
		}
		if !bytes.Equal(assembled, msg) {
			t.Fatalf("reassembled data mismatch (got %d bytes, want %d)", len(assembled), len(msg))
		}
		// Join the writer so its error is always observed before the test ends.
		if err := <-writeErr; err != nil {
			t.Errorf("writeStream: %v", err)
		}
		return // success
	}
}
// TestStreamSmallPayload checks that a payload smaller than one chunk is sent
// as a single stream frame (total == 1) and reassembles to the original bytes.
func TestStreamSmallPayload(t *testing.T) {
	server, client := net.Pipe()
	defer server.Close()
	defer client.Close()

	msg := []byte("tiny payload")

	// Collect the writer's result over a buffered channel so the error is
	// checked on the test goroutine instead of being silently dropped.
	writeErr := make(chan error, 1)
	go func() {
		// Closing the write side lets the reader see EOF rather than block
		// forever if writeStream fails before sending a frame.
		defer client.Close()
		writeErr <- writeStream(client, msg, DefaultChunkSize)
	}()

	flags, body, err := readFrameFull(server)
	if err != nil {
		// Surface a writer failure first if one is already available; it is
		// usually the root cause of the read error.
		select {
		case werr := <-writeErr:
			if werr != nil {
				t.Errorf("writeStream: %v", werr)
			}
		default:
		}
		t.Fatalf("readFrameFull: %v", err)
	}
	if flags&flagStream == 0 {
		t.Fatal("expected flagStream")
	}

	streamID, index, total, chunk, err := parseStreamBody(body)
	if err != nil {
		t.Fatalf("parseStreamBody: %v", err)
	}
	if total != 1 {
		t.Fatalf("expected 1 total chunk, got %d", total)
	}

	assembled, complete := newStreamBuffer().addChunk(streamID, index, total, chunk)
	if !complete {
		t.Fatal("expected complete after single chunk")
	}
	if !bytes.Equal(assembled, msg) {
		t.Fatalf("got %q, want %q", assembled, msg)
	}

	// Join the writer so a failure in writeStream is reported.
	if err := <-writeErr; err != nil {
		t.Fatalf("writeStream: %v", err)
	}
}
// TestStreamDuplicateChunkIgnored checks that re-adding an already-received
// chunk neither completes the stream early nor corrupts the reassembled data.
func TestStreamDuplicateChunkIgnored(t *testing.T) {
	buf := newStreamBuffer()

	chunk0 := []byte("aaaa")
	chunk1 := []byte("bbbb")

	// First chunk of two: the stream must still be incomplete.
	if _, complete := buf.addChunk("stream-1", 0, 2, chunk0); complete {
		t.Fatal("should not be complete yet")
	}

	// Same index again: must not count towards completion.
	if _, complete := buf.addChunk("stream-1", 0, 2, chunk0); complete {
		t.Fatal("duplicate should not trigger completion")
	}

	// Second (final) chunk: the stream should now be complete.
	assembled, complete := buf.addChunk("stream-1", 1, 2, chunk1)
	if !complete {
		t.Fatal("should be complete now")
	}

	// Build the expectation in a fresh slice. Appending directly to chunk0
	// could write into spare capacity of a backing array that was already
	// handed to the buffer.
	expected := make([]byte, 0, len(chunk0)+len(chunk1))
	expected = append(expected, chunk0...)
	expected = append(expected, chunk1...)

	if !bytes.Equal(assembled, expected) {
		t.Fatalf("got %q, want %q", assembled, expected)
	}
}
// TestStreamBufferCleanup checks that once a stream completes, its entry is no
// longer present in the buffer's streams map.
func TestStreamBufferCleanup(t *testing.T) {
	buf := newStreamBuffer()

	// Assert completion first so a failure here is not misreported as a
	// cleanup problem below.
	if _, complete := buf.addChunk("s1", 0, 1, []byte("data")); !complete {
		t.Fatal("expected single-chunk stream to complete")
	}

	// Inspect the map while holding buf.mu, as the original test did.
	buf.mu.Lock()
	_, exists := buf.streams["s1"]
	buf.mu.Unlock()

	if exists {
		t.Fatal("completed stream should be cleaned up from buffer")
	}
}