Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 69 additions & 50 deletions src/parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export default class Parser extends Emitter<ParserEvents> {
#messageSize = 0
#handshakeComplete = false
#isWaitingForData = true
#destroyed = false

constructor(socket: TLSSocket) {
super()
Expand All @@ -45,6 +46,7 @@ export default class Parser extends Emitter<ParserEvents> {
}

destroy(): void {
this.#destroyed = true
this.#isWaitingForData = false
this.#socket.removeListener('data', this.#handleData)
}
Expand All @@ -63,56 +65,67 @@ export default class Parser extends Emitter<ParserEvents> {
}
}

// Loop instead of recurse: each iteration checks the current state, ensures
// enough bytes are buffered, and dispatches to a handler. Handlers update
// #state / #data and return; they no longer call #waitForData() directly.
// The loop exits when the parser is destroyed or a handler sets
// #isWaitingForData = true (i.e. needs more bytes from the socket).
// The previous version recursed through #waitForData -> #handleGotMessageSize
// / #handleGotMessageBytes -> #getNextMessage -> #waitForData, which could
// blow the call stack on a burst of buffered messages or on a TCP segment
// that split a varint size header.
#waitForData() {
Logger.debug(`waitForData state: ${this.#state}`)
while (!this.#destroyed && !this.#isWaitingForData) {
Logger.debug(`waitForData state: ${this.#state}`)

let minBytesNeeded = 0
let minBytesNeeded = 0

switch (this.#state) {
case ProcessingState.MCS_VERSION_TAG_AND_SIZE:
minBytesNeeded += Variables.kVersionPacketLen
// eslint-disable-next-line no-fallthrough
case ProcessingState.MCS_TAG_AND_SIZE:
minBytesNeeded += Variables.kTagPacketLen
switch (this.#state) {
case ProcessingState.MCS_VERSION_TAG_AND_SIZE:
minBytesNeeded += Variables.kVersionPacketLen
// eslint-disable-next-line no-fallthrough
case ProcessingState.MCS_SIZE:
minBytesNeeded += Variables.kSizePacketLenMin
break
case ProcessingState.MCS_PROTO_BYTES:
minBytesNeeded = this.#messageSize
break
default:
this.#emitError(new Error(`Unexpected state: ${this.#state}`))
return
}

if (this.#data.length < minBytesNeeded) {
Logger.debug(`Waiting for ${minBytesNeeded - this.#data.length} more bytes. Got ${this.#data.length}`)
this.#isWaitingForData = true
return
}
case ProcessingState.MCS_TAG_AND_SIZE:
minBytesNeeded += Variables.kTagPacketLen
// eslint-disable-next-line no-fallthrough
case ProcessingState.MCS_SIZE:
minBytesNeeded += Variables.kSizePacketLenMin
break
case ProcessingState.MCS_PROTO_BYTES:
minBytesNeeded = this.#messageSize
break
default:
this.#emitError(new Error(`Unexpected state: ${this.#state}`))
return
}

Logger.debug(`Processing MCS data: state == ${this.#state}`)

switch (this.#state) {
case ProcessingState.MCS_VERSION_TAG_AND_SIZE:
this.#handleGotVersion()
this.#handleGotMessageTag()
this.#handleGotMessageSize()
break
case ProcessingState.MCS_TAG_AND_SIZE:
this.#handleGotMessageTag()
this.#handleGotMessageSize()
break
case ProcessingState.MCS_SIZE:
this.#handleGotMessageSize()
break
case ProcessingState.MCS_PROTO_BYTES:
this.#handleGotMessageBytes()
break
default:
this.#emitError(new Error(`Unexpected state: ${this.#state}`))
if (this.#data.length < minBytesNeeded) {
Logger.debug(`Waiting for ${minBytesNeeded - this.#data.length} more bytes. Got ${this.#data.length}`)
this.#isWaitingForData = true
return
}

Logger.debug(`Processing MCS data: state == ${this.#state}`)

switch (this.#state) {
case ProcessingState.MCS_VERSION_TAG_AND_SIZE:
this.#handleGotVersion()
this.#handleGotMessageTag()
this.#handleGotMessageSize()
break
case ProcessingState.MCS_TAG_AND_SIZE:
this.#handleGotMessageTag()
this.#handleGotMessageSize()
break
case ProcessingState.MCS_SIZE:
this.#handleGotMessageSize()
break
case ProcessingState.MCS_PROTO_BYTES:
this.#handleGotMessageBytes()
Comment on lines +111 to +123
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot proposed change sounds a bit excessive, but it has a point. Can you check it out for the PR please? @zzorba

break
default:
this.#emitError(new Error(`Unexpected state: ${this.#state}`))
return
}
}
}

Expand Down Expand Up @@ -154,8 +167,13 @@ export default class Parser extends Emitter<ParserEvents> {
// NOTE(ibash) I could only test this case by manually cutting the buffer
// above to be mid-packet like: new ProtobufJS.BufferReader(this.#data.slice(0, 1))
if (incompleteSizePacket) {
// The varint extends past what's currently buffered; we need more
// bytes from the socket. Mark that we're waiting and bail -- do
// NOT re-enter #waitForData here, the length check would still
// pass (#data.length >= kSizePacketLenMin) and the recursive call
// would loop until it blew the stack across many TCP reads.
this.#state = ProcessingState.MCS_SIZE
this.#waitForData()
this.#isWaitingForData = true
return
}

Expand All @@ -165,10 +183,10 @@ export default class Parser extends Emitter<ParserEvents> {

if (this.#messageSize > 0) {
this.#state = ProcessingState.MCS_PROTO_BYTES
this.#waitForData()
} else {
this.#handleGotMessageBytes()
return
}

this.#handleGotMessageBytes()
}

#handleGotMessageBytes() {
Expand All @@ -190,7 +208,7 @@ export default class Parser extends Emitter<ParserEvents> {
// Continue reading data.
Logger.debug(`Continuing data read. Buffer size is ${this.#data.length}, expecting ${this.#messageSize}`)
this.#state = ProcessingState.MCS_PROTO_BYTES
this.#waitForData()
this.#isWaitingForData = true
return
}

Expand Down Expand Up @@ -223,7 +241,8 @@ export default class Parser extends Emitter<ParserEvents> {
this.#messageTag = 0
this.#messageSize = 0
this.#state = ProcessingState.MCS_TAG_AND_SIZE
this.#waitForData()
// No recursion: #waitForData's loop will pick up the new state on the
// next iteration.
}

#buildProtobufFromTag(tag) {
Expand Down