1- const { pack, unpack } = require ( 'msgpackr' )
2- const { mkdirSync } = require ( 'fs' )
3- const { Cursor, Env } = require ( 'node-gyp-build' ) ( __dirname )
1+ const { pack, unpack } = require ( 'msgpackr' )
2+ const { mkdirSync } = require ( 'fs' )
3+ const { Cursor, Env } = require ( 'node-gyp-build' ) ( __dirname )
4+ const { AsyncLocalStorage } = require ( 'async_hooks' )
45
56function asBinary ( buffer ) {
67 return {
@@ -69,138 +70,167 @@ class Iterator {
6970}
7071
7172/**
72- * This class enforces a very specific usage of lmdb. All keys are utf8 encoded
73- * buffers and all values are encoded with msgpack. Transaction handling is also
74- * intentionally simplified to just one current transaction, no nesting, with
75- * the assumption that there is only one active client . This is the pattern that
73+ * This class enforces a very specific usage of lmdb. All keys are
74+ * utf8 encoded buffers and all values are encoded with
75+ * msgpack. Transaction handling is also intentionally simplified to
76+ * just one current transaction, no nesting . This is the pattern that
7677 * adset-consumer uses in its current form.
7778 */
7879class Store {
7980 // TODO: add in support for a persistent readonly transaction for the lifetime
8081 // of the Store instance - Engines will need this
8182 // TODO: maybe refactor to split env/dbi params
8283 // TODO: support additional dbis
84+
85+ /**
86+ * @param {object } options
87+ * @param {boolean } options.create
88+ * @param {number } options.mapSize
89+ * @param {string } options.name
90+ * @param {boolean } options.noReadAhead
91+ * @param {string } options.path
92+ * @param {AsyncLocalStorage } context
93+ */
8394 constructor ( {
8495 create = false ,
8596 mapSize,
8697 name,
8798 noReadAhead = false ,
8899 path
89- } ) {
100+ } , context ) {
90101 this . env = new Env ( )
91102 this . env . open ( { path, mapSize, noReadAhead } )
92103 this . dbi = this . env . openDbi ( { name, create, keyIsBuffer : true } )
93- this . txn = null
104+
105+ if ( context ) {
106+ this . context = context
107+ } else {
108+ this . context = new AsyncLocalStorage ( )
109+ }
94110 }
95111
112+ /**
113+ *
114+ * @param {string } key
115+ * @param {object } value
116+ */
96117 put ( key , value ) {
97- this . transact ( ( ) => {
98- try {
99- const keyBuffer = Buffer . from ( key , 'utf8' )
100- let valueBuffer
101- if ( value && value [ '\x10binary-data\x02' ] )
102- valueBuffer = value [ '\x10binary-data\x02' ]
103- else
104- valueBuffer = pack ( value )
105- this . txn . putBinary ( this . dbi , keyBuffer , valueBuffer )
106- } catch ( error ) {
107- console . error ( 'Error storing value:' , error )
108- }
118+ this . transact ( ( txn ) => {
119+ const keyBuffer = Buffer . from ( key , 'utf8' )
120+ let valueBuffer
121+ if ( value && value [ '\x10binary-data\x02' ] )
122+ valueBuffer = value [ '\x10binary-data\x02' ]
123+ else
124+ valueBuffer = pack ( value )
125+ txn . putBinary ( this . dbi , keyBuffer , valueBuffer )
109126 } )
110127 }
111128
129+ /**
130+ *
131+ * @param {object } key
132+ * @returns
133+ */
112134 get ( key ) {
113- return this . transact ( ( ) => {
114- try {
115- const keyBuffer = Buffer . from ( key , 'utf8' )
116- const value = this . txn . getBinary ( this . dbi , keyBuffer )
117- return value == null ? null : unpack ( value )
118- } catch ( error ) {
119- console . error ( 'Error retrieving value:' , error )
120- return null
121- }
135+ return this . transact ( ( txn ) => {
136+ const keyBuffer = Buffer . from ( key , 'utf8' )
137+ const value = txn . getBinary ( this . dbi , keyBuffer )
138+ return value == null ? null : unpack ( value )
122139 } , true )
123140 }
124141
142+ /**
143+ *
144+ * @param {object } key
145+ * @returns
146+ */
125147 del ( key ) {
126- return this . transact ( ( ) => {
127- try {
128- const keyBuffer = Buffer . from ( key , 'utf8' )
129- if ( this . txn . getBinary ( this . dbi , keyBuffer ) != null ) {
130- this . txn . del ( this . dbi , keyBuffer )
131- return true
132- } else {
133- return false
134- }
135- } catch ( error ) {
136- console . error ( 'Error deleting value:' , error )
148+ return this . transact ( ( txn ) => {
149+ const keyBuffer = Buffer . from ( key , 'utf8' )
150+ if ( txn . getBinary ( this . dbi , keyBuffer ) != null ) {
151+ txn . del ( this . dbi , keyBuffer )
152+ return true
153+ } else {
154+ return false
137155 }
138156 } )
139157 }
140158
141159 /**
142- * Wrapper function for a transaction. This is only sensible in adset-consumer
143- * as there is only ever one thread of control working on the Store instance
144- * at a time.
145- * @param {* } f
160+ * Wrapper function for a transaction.
161+ * @param {* } f Function to execute within a txn
162+ * @param {boolean } [readOnly=false] Set to true if txn is readOnly
146163 */
147- transact ( f , readonly = false ) {
164+ transact ( f , readOnly = false ) {
148165 let ownTxn = false
149- if ( ! this . txn ) {
150- this . txn = this . env . beginTxn ( )
166+ let store = this . context . getStore ( )
167+ if ( ! store ) {
168+ store = { }
169+ this . context . enterWith ( store )
170+ }
171+ let txn = store . txn
172+ if ( ! txn ) {
173+ txn = this . env . beginTxn ( { readOnly } )
174+ store . txn = txn
151175 ownTxn = true
152176 }
153177
154178 try {
155- const result = f ( )
179+ const result = f ( txn )
156180 if ( ownTxn ) {
157- if ( readonly ) {
158- this . txn . abort ( )
159- }
160- else {
161- this . txn . commit ( )
162- }
181+ if ( readOnly )
182+ txn . abort ( )
183+ else
184+ txn . commit ( )
163185 }
164186 return result
165187 } catch ( error ) {
166- console . error ( 'Transaction aborted due to an error:' , error . message )
167- this . txn . abort ( )
188+ if ( ownTxn ) {
189+ console . error ( 'transaction aborted:' , error . message )
190+ txn . abort ( )
191+ }
168192 throw error
169193 } finally {
170194 if ( ownTxn )
171- this . txn = null
195+ store . txn = null
172196 }
173197 }
174198
175- async transactAsync ( f , readonly = false ) {
199+ async transactAsync ( f , readOnly = false ) {
176200 let ownTxn = false
177- if ( ! this . txn ) {
178- this . txn = this . env . beginTxn ( )
201+ let store = this . context . getStore ( )
202+ if ( ! store ) {
203+ store = { }
204+ this . context . enterWith ( store )
205+ }
206+ let txn = store . txn
207+ if ( ! txn ) {
208+ txn = this . env . beginTxn ( { readOnly } )
209+ store . txn = txn
179210 ownTxn = true
180211 }
181212
182213 try {
183- const result = await f ( )
214+ const result = await f ( txn )
184215 if ( ownTxn ) {
185- if ( readonly ) {
186- this . txn . abort ( )
187- }
188- else {
189- this . txn . commit ( )
190- }
216+ if ( readOnly )
217+ txn . abort ( )
218+ else
219+ txn . commit ( )
191220 }
192221 return result
193222 } catch ( error ) {
194- console . error ( 'Transaction aborted due to an error:' , error . message )
195- this . txn . abort ( )
223+ if ( ownTxn ) {
224+ console . error ( 'transaction aborted:' , error . message )
225+ txn . abort ( )
226+ }
196227 throw error
197228 } finally {
198229 if ( ownTxn )
199- this . txn = null
230+ store . txn = null
200231 }
201232 }
202233
203-
204234 iterate ( ) {
205235 return new Iterator ( this . env , this . dbi )
206236 }
@@ -216,10 +246,10 @@ class Store {
216246 getMany ( keys , callback ) {
217247 // TODO: optimise this: use zero-copy/unsafe buffers
218248 let results = new Array ( keys . length )
219- this . transact ( ( ) => {
249+ this . transact ( ( txn ) => {
220250 for ( let i = 0 , l = keys . length ; i < l ; i ++ ) {
221251 const keyBuffer = Buffer . from ( keys [ i ] , 'utf8' )
222- const valueBuffer = this . txn . getBinary ( this . dbi , keyBuffer )
252+ const valueBuffer = txn . getBinary ( this . dbi , keyBuffer )
223253 results [ i ] = ( valueBuffer != null ) ? unpack ( valueBuffer ) : null
224254 }
225255 } , true )
@@ -239,8 +269,8 @@ class Store {
239269 }
240270
241271 getCount ( ) {
242- return this . transact ( ( ) => {
243- return this . dbi . stat ( this . txn ) ?. entryCount
272+ return this . transact ( ( txn ) => {
273+ return this . dbi . stat ( txn ) ?. entryCount
244274 } )
245275 }
246276
0 commit comments