-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.ts
More file actions
154 lines (134 loc) · 4.25 KB
/
index.ts
File metadata and controls
154 lines (134 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
/** Anything `fetch` accepts as its first argument (URL string, `URL`, `Request`). */
export type FetchStreamerInput = Parameters<typeof fetch>[0];

/** Details handed to the `onData` callback together with each chunk. */
export interface FetchStreamerChunkInfo {
  /** Chunk size that was being targeted while this chunk was assembled. */
  targetSize: number;
  /** `true` when the chunk holds exactly `targetSize` bytes; `false` for a short final chunk. */
  fulfilled: boolean;
}

/** Totals handed to the `onFinish` callback once the response body is exhausted. */
export interface FetchStreamerStats {
  /** Number of bytes read from the response body. */
  bytesReceived: number;
  /** Number of bytes that were assembled into chunks and passed to `onData`. */
  bytesProcessed: number;
  /** Milliseconds between the call to `start()` and the end of the stream. */
  elapsed: number;
}

/**
 * Chunk consumer. `data` is a `Uint8Array`, or a `string` when a text decoder
 * was requested in the constructor. Return a positive number to change the
 * chunk size used from the next chunk on; any other return value is ignored.
 */
export type FetchStreamerDataCallback = (
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- the chunk type depends on a runtime constructor option; `any` keeps already-annotated callbacks compiling
  data: any,
  info: FetchStreamerChunkInfo,
) => unknown;

/** Invoked once when the response body has been fully read. */
export type FetchStreamerFinishCallback = (stats: FetchStreamerStats) => unknown;

/** Invoked when reading from the response body fails. */
export type FetchStreamerErrorCallback = (error: unknown) => unknown;

/**
 * FetchStreamer, process data while downloading it.
 *
 * The response body is re-sliced into chunks of a fixed target size, and each
 * chunk is handed to the `onData` callback as soon as it is complete. Whatever
 * is left when the stream ends is delivered as one final, shorter chunk.
 */
export default class FetchStreamer {
  private readonly url: FetchStreamerInput;
  private readonly textDecoder: TextDecoder | null;
  private targetChunkSize: number;
  private chunkBuffer: Uint8Array;
  /** Number of valid bytes currently held at the start of `chunkBuffer`. */
  private writtenBufferSize = 0;
  private reader: ReadableStreamDefaultReader<Uint8Array> | null = null;
  private callbackOnFinish: FetchStreamerFinishCallback = () => {};
  private callbackOnData: FetchStreamerDataCallback = () => {};
  /** Rethrows by default so a failed read still surfaces as an unhandled rejection. */
  private callbackOnError: FetchStreamerErrorCallback = (error) => {
    throw error;
  };
  private startedAt = 0;
  private canReadNextChunk = true;
  /** `true` while a `reader.read()` promise is outstanding. */
  private readPending = false;
  /** `true` once the stream has ended or failed; no further reads are issued. */
  private terminated = false;
  private receivedBytes = 0;
  private processedBytes = 0;

  /**
   * @param url Reading URL
   * @param targetChunkSize Initial minimum processing unit size, in bytes (positive integer)
   * @param useDefaultTextDecoder Encoding label used to decode each chunk to text
   *   (or `true` for UTF-8). Leave `null` to receive raw `Uint8Array` chunks.
   *   See also https://developer.mozilla.org/en-US/docs/Web/API/TextDecoder
   * @throws RangeError when `targetChunkSize` is not a positive integer
   */
  constructor(
    url: FetchStreamerInput,
    targetChunkSize: number,
    useDefaultTextDecoder: string | boolean | null = null,
  ) {
    // A size of 0 would make process() loop forever, and a fractional size can
    // never be matched by an integer byte count.
    if (!Number.isInteger(targetChunkSize) || targetChunkSize <= 0) {
      throw new RangeError(
        `FetchStreamer: targetChunkSize must be a positive integer, got ${String(targetChunkSize)}`,
      );
    }
    this.url = url;
    this.targetChunkSize = targetChunkSize;
    this.chunkBuffer = new Uint8Array(targetChunkSize);
    if (typeof useDefaultTextDecoder === 'string' && useDefaultTextDecoder) {
      this.textDecoder = new TextDecoder(useDefaultTextDecoder);
    } else if (useDefaultTextDecoder === true) {
      this.textDecoder = new TextDecoder();
    } else {
      this.textDecoder = null;
    }
  }

  /**
   * Set onFinish callback
   * @param callback Callback function
   */
  onFinish(callback: FetchStreamerFinishCallback): void {
    this.callbackOnFinish = callback;
  }

  /**
   * Set onData callback
   * @param callback Callback function
   */
  onData(callback: FetchStreamerDataCallback): void {
    this.callbackOnData = callback;
  }

  /**
   * Set onError callback, invoked when reading the response body fails.
   * Without one, the error is rethrown from the read promise.
   * @param callback Callback function
   */
  onError(callback: FetchStreamerErrorCallback): void {
    this.callbackOnError = callback;
  }

  /** Switches to a new chunk size and allocates a matching empty buffer. */
  private resetBuffer(nextChunkSize: number): void {
    this.targetChunkSize = nextChunkSize;
    this.chunkBuffer = new Uint8Array(nextChunkSize);
  }

  /**
   * Delivers the buffered bytes to the onData callback.
   * @param ended `true` for the final flush, where the buffer may be only partly filled
   */
  private chunked(ended = false): void {
    const size = this.writtenBufferSize;
    this.processedBytes += size;
    const bytes = ended ? this.chunkBuffer.slice(0, size) : this.chunkBuffer;
    // Streaming mode keeps a multi-byte character that straddles a chunk
    // boundary inside the decoder until its remaining bytes arrive; the final
    // call (stream: false) flushes whatever is still pending.
    const data: Uint8Array | string = this.textDecoder
      ? this.textDecoder.decode(bytes, { stream: !ended })
      : bytes;
    if (ended && data.length === 0) return;

    const targetSize = this.targetChunkSize;
    // `this` inside the callback is the callback itself, as it has always been.
    const answer: unknown = this.callbackOnData.call(this.callbackOnData, data, {
      targetSize,
      fulfilled: size === targetSize,
    });
    this.writtenBufferSize = 0;

    const requested = Math.floor(Number(answer));
    if (requested >= 1) {
      this.resetBuffer(requested);
    } else if (!ended && !this.textDecoder) {
      // The consumer now owns the Uint8Array it was given; continue with a
      // fresh buffer so chunks it keeps are never overwritten. In text mode the
      // consumer only saw a string, so the buffer is safely reused.
      this.chunkBuffer = new Uint8Array(this.targetChunkSize);
    }
  }

  /** Reports the final totals to the onFinish callback. */
  private ended(): void {
    this.callbackOnFinish.call(this.callbackOnFinish, {
      bytesReceived: this.receivedBytes,
      bytesProcessed: this.processedBytes,
      elapsed: Date.now() - this.startedAt,
    });
  }

  /** Copies `view` into the chunk buffer, emitting a chunk each time the buffer fills up. */
  private process(view: Uint8Array): void {
    let offset = 0;
    while (offset < view.length) {
      // Re-read the target on every pass: onData may have changed it.
      const room = this.targetChunkSize - this.writtenBufferSize;
      const take = Math.min(room, view.length - offset);
      // subarray() is a view, so the bytes are copied once (by set) rather than twice.
      this.chunkBuffer.set(view.subarray(offset, offset + take), this.writtenBufferSize);
      this.writtenBufferSize += take;
      offset += take;
      if (this.writtenBufferSize === this.targetChunkSize) this.chunked();
    }
  }

  /**
   * Pause processing. No further reads are requested from the response stream;
   * a read that is already in flight still completes and is processed. The
   * underlying request is not cancelled.
   */
  pause(): void {
    this.canReadNextChunk = false;
  }

  /**
   * Resume processing. Safe to call at any time: it is a no-op before
   * `start()` has obtained a reader, while a read is already in flight, and
   * after the stream has ended.
   */
  resume(): void {
    this.canReadNextChunk = true;
    this.next();
  }

  /** Requests the next piece of the body unless paused, already reading, or done. */
  private next(): void {
    if (!this.canReadNextChunk || this.readPending || this.terminated || !this.reader) {
      return;
    }
    this.readPending = true;
    this.reader.read().then(this.readerCallback, this.handleReadError);
  }

  /** Handles one read result: processes the bytes, or flushes and finishes when done. */
  private readonly readerCallback = ({
    value,
    done,
  }: {
    done: boolean;
    value?: Uint8Array;
  }): void => {
    this.readPending = false;
    if (done) {
      this.terminated = true;
      this.chunked(true);
      this.ended();
      return;
    }
    if (value) {
      this.receivedBytes += value.length;
      this.process(value);
    }
    this.next();
  };

  /** Stops the read loop and forwards a read failure to the onError callback. */
  private readonly handleReadError = (error: unknown): void => {
    this.readPending = false;
    this.terminated = true;
    this.callbackOnError.call(this.callbackOnError, error);
  };

  /**
   * Start reading and processing
   * @returns A promise that resolves once the response has arrived and the
   *   first read has been requested (not when the whole body has been processed)
   * @throws TypeError when the response has no readable body
   */
  async start(): Promise<void> {
    this.startedAt = Date.now();
    const response = await fetch(this.url);
    if (!response.body) {
      throw new TypeError('FetchStreamer: response has no readable body');
    }
    this.reader = response.body.getReader();
    this.next();
  }
}