diff --git a/src/parser.ts b/src/parser.ts index c7e4346..f31b2c3 100644 --- a/src/parser.ts +++ b/src/parser.ts @@ -36,6 +36,7 @@ export default class Parser extends Emitter { #messageSize = 0 #handshakeComplete = false #isWaitingForData = true + #destroyed = false constructor(socket: TLSSocket) { super() @@ -45,6 +46,7 @@ export default class Parser extends Emitter { } destroy(): void { + this.#destroyed = true this.#isWaitingForData = false this.#socket.removeListener('data', this.#handleData) } @@ -63,56 +65,67 @@ export default class Parser extends Emitter { } } + // Loop instead of recurse: each iteration checks the current state, ensures + // enough bytes are buffered, and dispatches to a handler. Handlers update + // #state / #data and return; they no longer call #waitForData() directly. + // The loop exits when the parser is destroyed or a handler sets + // #isWaitingForData = true (i.e. needs more bytes from the socket). + // The previous version recursed through #waitForData -> #handleGotMessageSize + // / #handleGotMessageBytes -> #getNextMessage -> #waitForData, which could + // blow the call stack on a burst of buffered messages or on a TCP segment + // that split a varint size header. #waitForData() { - Logger.debug(`waitForData state: ${this.#state}`) + while (!this.#destroyed && !this.#isWaitingForData) { + Logger.debug(`waitForData state: ${this.#state}`) - let minBytesNeeded = 0 + let minBytesNeeded = 0 - switch (this.#state) { - case ProcessingState.MCS_VERSION_TAG_AND_SIZE: - minBytesNeeded += Variables.kVersionPacketLen - // eslint-disable-next-line no-fallthrough - case ProcessingState.MCS_TAG_AND_SIZE: - minBytesNeeded += Variables.kTagPacketLen + switch (this.#state) { + case ProcessingState.MCS_VERSION_TAG_AND_SIZE: + minBytesNeeded += Variables.kVersionPacketLen // eslint-disable-next-line no-fallthrough - case ProcessingState.MCS_SIZE: - minBytesNeeded += Variables.kSizePacketLenMin - break - case ProcessingState.MCS_PROTO_BYTES: - minBytesNeeded = this.#messageSize - break - default: - this.#emitError(new Error(`Unexpected state: ${this.#state}`)) - return - } - - if (this.#data.length < minBytesNeeded) { - Logger.debug(`Waiting for ${minBytesNeeded - this.#data.length} more bytes. Got ${this.#data.length}`) - this.#isWaitingForData = true - return - } + case ProcessingState.MCS_TAG_AND_SIZE: + minBytesNeeded += Variables.kTagPacketLen + // eslint-disable-next-line no-fallthrough + case ProcessingState.MCS_SIZE: + minBytesNeeded += Variables.kSizePacketLenMin + break + case ProcessingState.MCS_PROTO_BYTES: + minBytesNeeded = this.#messageSize + break + default: + this.#emitError(new Error(`Unexpected state: ${this.#state}`)) + return + } - Logger.debug(`Processing MCS data: state == ${this.#state}`) - - switch (this.#state) { - case ProcessingState.MCS_VERSION_TAG_AND_SIZE: - this.#handleGotVersion() - this.#handleGotMessageTag() - this.#handleGotMessageSize() - break - case ProcessingState.MCS_TAG_AND_SIZE: - this.#handleGotMessageTag() - this.#handleGotMessageSize() - break - case ProcessingState.MCS_SIZE: - this.#handleGotMessageSize() - break - case ProcessingState.MCS_PROTO_BYTES: - this.#handleGotMessageBytes() - break - default: - this.#emitError(new Error(`Unexpected state: ${this.#state}`)) + if (this.#data.length < minBytesNeeded) { + Logger.debug(`Waiting for ${minBytesNeeded - this.#data.length} more bytes. Got ${this.#data.length}`) + this.#isWaitingForData = true return + } + + Logger.debug(`Processing MCS data: state == ${this.#state}`) + + switch (this.#state) { + case ProcessingState.MCS_VERSION_TAG_AND_SIZE: + this.#handleGotVersion() + this.#handleGotMessageTag() + this.#handleGotMessageSize() + break + case ProcessingState.MCS_TAG_AND_SIZE: + this.#handleGotMessageTag() + this.#handleGotMessageSize() + break + case ProcessingState.MCS_SIZE: + this.#handleGotMessageSize() + break + case ProcessingState.MCS_PROTO_BYTES: + this.#handleGotMessageBytes() + break + default: + this.#emitError(new Error(`Unexpected state: ${this.#state}`)) + return + } } } @@ -154,8 +167,13 @@ export default class Parser extends Emitter { // NOTE(ibash) I could only test this case by manually cutting the buffer // above to be mid-packet like: new ProtobufJS.BufferReader(this.#data.slice(0, 1)) if (incompleteSizePacket) { + // The varint extends past what's currently buffered; we need more + // bytes from the socket. Mark that we're waiting and bail -- do + // NOT re-enter #waitForData here, the length check would still + // pass (#data.length >= kSizePacketLenMin) and the recursive call + // would loop until it blew the stack across many TCP reads. this.#state = ProcessingState.MCS_SIZE - this.#waitForData() + this.#isWaitingForData = true return } @@ -165,10 +183,10 @@ export default class Parser extends Emitter { if (this.#messageSize > 0) { this.#state = ProcessingState.MCS_PROTO_BYTES - this.#waitForData() - } else { - this.#handleGotMessageBytes() + return } + + this.#handleGotMessageBytes() } #handleGotMessageBytes() { @@ -190,7 +208,7 @@ export default class Parser extends Emitter { // Continue reading data. Logger.debug(`Continuing data read. Buffer size is ${this.#data.length}, expecting ${this.#messageSize}`) this.#state = ProcessingState.MCS_PROTO_BYTES - this.#waitForData() + this.#isWaitingForData = true return } @@ -223,7 +241,8 @@ export default class Parser extends Emitter { this.#messageTag = 0 this.#messageSize = 0 this.#state = ProcessingState.MCS_TAG_AND_SIZE - this.#waitForData() + // No recursion: #waitForData's loop will pick up the new state on the + // next iteration. } #buildProtobufFromTag(tag) {