@@ -15,6 +15,9 @@ let currentBatch = [];
1515let existingColumnsSet = new Set ( ) ;
1616let pendingColumns = [ ] ;
1717let parser = null ;
18+ let parserWriter = null ;
19+ let parserReader = null ;
20+ let parserReadPromise = null ;
1821let JSONParser = null ;
1922
2023/**
@@ -25,6 +28,10 @@ let JSONParser = null;
2528 * 2. Classic workers (needed for importScripts) don't support ES modules natively
2629 * 3. The library is loaded from esm.sh CDN which properly handles ESM module resolution
2730 *
31+ * The @streamparser/json-whatwg library uses the WHATWG Streams API:
32+ * - parser.writable is a WritableStream
33+ * - Chunks are written through a writer obtained from parser.writable.getWriter()
34+ *
2835 * For production deployments with strict security requirements, consider:
2936 * - Hosting the library locally
3037 * - Using Subresource Integrity (SRI) hashes
@@ -41,18 +48,34 @@ async function initParser() {
4148 // keepStack: false keeps memory usage low
4249 parser = new JSONParser ( { paths : [ '$[*]' ] , keepStack : false } ) ;
4350
44- parser . onValue = ( { value } ) => {
45- // This fires exactly once per complete object in your array
46- processObject ( value ) ;
47- } ;
51+ // Get a writer from the parser's writable stream
52+ parserWriter = parser . writable . getWriter ( ) ;
53+
54+ // Set up the readable stream to consume parsed values
55+ parserReader = parser . readable . getReader ( ) ;
56+
57+ // Read parsed values in the background
58+ // Store the promise so we can wait for it to complete
59+ parserReadPromise = ( async ( ) => {
60+ try {
61+ while ( true ) {
62+ const { done, value } = await parserReader . read ( ) ;
63+ if ( done ) break ;
64+
65+ // Process each parsed object
66+ if ( value ) {
67+ processObject ( value ) ;
68+ }
69+ }
70+ } catch ( err ) {
71+ console . error ( '[DB Worker] JSON Parse Error:' , err ) ;
72+ postMessage ( {
73+ type : 'error' ,
74+ data : { message : `JSON Parse Error: ${ err . message } ` }
75+ } ) ;
76+ }
77+ } ) ( ) ;
4878
49- parser . onError = ( err ) => {
50- console . error ( '[DB Worker] JSON Parse Error:' , err ) ;
51- postMessage ( {
52- type : 'error' ,
53- data : { message : `JSON Parse Error: ${ err . message } ` }
54- } ) ;
55- } ;
5679 } catch ( error ) {
5780 console . error ( '[DB Worker] Failed to initialize parser:' , error ) ;
5881 postMessage ( {
@@ -114,11 +137,11 @@ self.onmessage = async function(event) {
114137 break ;
115138
116139 case 'chunk' :
117- // THIS IS THE FIX: We just feed the raw string to the library.
118- // It handles the partial brackets, nested objects, and arrays automatically.
119- if ( parser ) {
140+ // Write the chunk to the parser using the WHATWG Streams API
141+ // The parser's writable stream will handle partial JSON and buffer management
142+ if ( parserWriter ) {
120143 try {
121- parser . write ( data ) ;
144+ await parserWriter . write ( data ) ;
122145 } catch ( e ) {
123146 console . error ( '[DB Worker] Parser Write Error:' , e ) ;
124147 postMessage ( {
@@ -132,9 +155,21 @@ self.onmessage = async function(event) {
132155 break ;
133156
134157 case 'end' :
135- if ( parser ) {
136- console . log ( '[DB Worker] End of stream received, finalizing...' ) ;
137- await finalizeProcessing ( ) ;
158+ if ( parserWriter ) {
159+ console . log ( '[DB Worker] End of stream received, closing parser...' ) ;
160+ try {
161+ await parserWriter . close ( ) ;
162+ console . log ( '[DB Worker] Parser closed, waiting for all values to be processed...' ) ;
163+ // Wait for the reader to finish processing all parsed values
164+ if ( parserReadPromise ) {
165+ await parserReadPromise ;
166+ }
167+ console . log ( '[DB Worker] All values processed, finalizing...' ) ;
168+ await finalizeProcessing ( ) ;
169+ } catch ( e ) {
170+ console . error ( '[DB Worker] Error closing parser:' , e ) ;
171+ await finalizeProcessing ( ) ;
172+ }
138173 }
139174 break ;
140175
0 commit comments