Skip to content

Commit e7fa66e

Browse files
committed
Fixed PipeableSSRExecutorManager streaming
1 parent 9e1a944 commit e7fa66e

File tree

2 files changed

+44
-48
lines changed

2 files changed

+44
-48
lines changed
Lines changed: 33 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,44 @@
1-
import { Writable } from 'stream';
2-
import { SSRExecutorManager, SSRExecutorManagerOptions } from '../SSRExecutorManager.js';
1+
import { SSRExecutorManager } from '../SSRExecutorManager.js';
2+
import { AbortError } from '../../utils.js';
33

44
/**
55
* Streaming executor manager for NodeJS environment.
66
*/
77
export class PipeableSSRExecutorManager extends SSRExecutorManager {
8-
/**
9-
* The stream that includes both React rendering chunks and executor hydration chunks.
10-
*/
11-
readonly stream: NodeJS.WritableStream;
8+
private _isPiped = false;
9+
private _pendingChunk = '';
1210

1311
/**
14-
* Creates a new {@link PipeableSSRExecutorManager} instance.
12+
* Outputs executor hydration chunks into the provided
13+
* [Writable Node.js Stream](https://nodejs.org/api/stream.html#writable-streams).
1514
*
16-
* @param stream The output stream to which both React chunks and executor hydration chunks are written.
17-
* @param options Additional options.
15+
* @param stream The stream to write to.
16+
* @template Stream The stream to write to.
1817
*/
19-
constructor(stream: NodeJS.WritableStream, options?: SSRExecutorManagerOptions) {
20-
super(options);
21-
22-
this.stream = new Writable({
23-
write: (chunk, encoding, callback) => {
24-
stream.write(chunk, encoding, error => {
25-
if (error) {
26-
callback(error);
27-
return;
28-
}
29-
30-
if (!chunk.toString().endsWith('</script>')) {
31-
callback();
32-
return;
33-
}
34-
35-
const hydrationChunk = this.nextHydrationChunk();
36-
37-
if (hydrationChunk !== '') {
38-
stream.write(hydrationChunk, callback);
39-
return;
40-
}
41-
42-
callback();
43-
});
44-
},
45-
46-
final: callback => {
47-
stream.end(callback);
48-
},
49-
});
18+
pipe<Stream extends NodeJS.WritableStream>(stream: Stream): Stream {
19+
if (this._isPiped) {
20+
throw new Error('React Executor currently only supports piping to one stream');
21+
}
22+
23+
this._isPiped = true;
24+
25+
const writeChunk = () => {
26+
if ((this._pendingChunk ||= this.nextHydrationChunk()) && stream.write(this._pendingChunk)) {
27+
console.log('EXECUTOR ' + this._pendingChunk + '\n\n');
28+
this._pendingChunk = '';
29+
}
30+
};
31+
32+
this.subscribe(writeChunk);
33+
34+
stream.on('drain', writeChunk);
35+
36+
stream.on('error', () => this.abort(AbortError('The stream errored while writing data')));
37+
38+
stream.on('cancel', () => this.abort(AbortError('The stream closed early')));
39+
40+
writeChunk();
41+
42+
return stream;
5043
}
5144
}

src/test/ssr/node/PipeableSSRExecutorManager.test.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,20 @@ Date.now = () => 50;
88
test('sends hydration chunk after the content chunk', async () => {
99
const writeMock = vi.fn();
1010

11-
const outputStream = new Writable({
11+
const writable = new Writable({
1212
write(chunk, _encoding, callback) {
1313
writeMock(chunk.toString());
1414
callback();
1515
},
1616
});
1717

18-
const manager = new PipeableSSRExecutorManager(outputStream);
18+
const manager = new PipeableSSRExecutorManager();
1919

2020
manager.getOrCreate('xxx', 111);
2121

22-
manager.stream.write('aaa</script>');
22+
writable.write('aaa</script>');
23+
24+
manager.pipe(writable);
2325

2426
await delay(200);
2527

@@ -34,20 +36,21 @@ test('sends hydration chunk after the content chunk', async () => {
3436
test('does not send hydration chunk if nothing has changed', async () => {
3537
const writeMock = vi.fn();
3638

37-
const outputStream = new Writable({
39+
const writable = new Writable({
3840
write(chunk, _encoding, callback) {
3941
writeMock(chunk.toString());
4042
callback();
4143
},
4244
});
4345

44-
const manager = new PipeableSSRExecutorManager(outputStream);
46+
const manager = new PipeableSSRExecutorManager();
4547

4648
manager.getOrCreate('xxx', 111);
4749

48-
manager.stream.write('aaa</script>');
49-
manager.stream.write('bbb');
50-
manager.stream.write('ccc</script>');
50+
writable.write('aaa</script>');
51+
manager.pipe(writable);
52+
writable.write('bbb');
53+
writable.write('ccc</script>');
5154

5255
await delay(200);
5356

0 commit comments

Comments
 (0)