From 628ef2d6a30422de9abf351956becbb273745504 Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Fri, 18 Jul 2025 13:13:08 -0700 Subject: [PATCH 01/10] init --- packages/livekit-rtc/src/ffi_client.ts | 51 +++++++++++++++++++++ packages/livekit-rtc/src/room.ts | 61 ++++++++++++++++++++++++-- tsconfig.json | 2 +- 3 files changed, 110 insertions(+), 4 deletions(-) diff --git a/packages/livekit-rtc/src/ffi_client.ts b/packages/livekit-rtc/src/ffi_client.ts index 857b712b..c0a66b27 100644 --- a/packages/livekit-rtc/src/ffi_client.ts +++ b/packages/livekit-rtc/src/ffi_client.ts @@ -4,6 +4,7 @@ import type { PartialMessage } from '@bufbuild/protobuf'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import EventEmitter from 'events'; +import { log } from './log.js'; import { FfiHandle, livekitCopyBuffer, @@ -30,6 +31,49 @@ declare global { var _ffiClientInstance: FfiClient | undefined; } +export class FfiQueue { + private subscribers: Set> = new Set(); + + constructor() {} + + put(item: T): void { + const toRemove: Set> = new Set(); + + for (const controller of this.subscribers) { + try { + controller.enqueue(item); + } catch (error: unknown) { + log.error(error, 'Error enqueuing item to stream'); + toRemove.add(controller); + } + } + + for (const controller of toRemove) { + this.subscribers.delete(controller); + } + } + + subscribe(): ReadableStream { + let controller: ReadableStreamDefaultController; + + const stream = new ReadableStream({ + start: (ctrl) => { + controller = ctrl; + this.subscribers.add(controller); + }, + cancel: () => { + this.subscribers.delete(controller); + }, + }); + + return stream; + } + + unsubscribe(stream: ReadableStream): void { + stream.cancel(); + } +} + export class FfiClient extends (EventEmitter as new () => TypedEmitter) { /** @internal */ static get instance(): FfiClient { @@ -39,20 +83,27 @@ export class FfiClient extends (EventEmitter as new () => TypedEmitter = new FfiQueue(); constructor() { super(); this.setMaxListeners(0); + this._queue = new FfiQueue(); livekitInitialize( (event_data: Uint8Array) => { const event = FfiEvent.fromBinary(event_data); this.emit(FfiClientEvent.FfiEvent, event); + this._queue.put(event); }, true, SDK_VERSION, ); } + get queue(): FfiQueue { + return this._queue; + } + request(req: PartialMessage): T { const request = new FfiRequest(req); const req_data = request.toBinary(); diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 7d0e4c39..66302330 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -13,7 +13,7 @@ import type { } from './data_streams/types.js'; import type { E2EEOptions } from './e2ee.js'; import { E2EEManager, defaultE2EEOptions } from './e2ee.js'; -import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js'; +import { FfiClient, FfiHandle } from './ffi_client.js'; import { log } from './log.js'; import type { Participant } from './participant.js'; import { LocalParticipant, RemoteParticipant } from './participant.js'; @@ -76,6 +76,8 @@ export const defaultRoomOptions = new FfiRoomOptions({ export class Room extends (EventEmitter as new () => TypedEmitter) { private info?: RoomInfo; private ffiHandle?: FfiHandle; + private ffiQueue?: ReadableStream; + private listenTaskPromise?: Promise; private byteStreamControllers = new Map>(); private textStreamControllers = new Map>(); @@ -88,8 +90,19 @@ export class Room extends (EventEmitter as new () => TypedEmitter remoteParticipants: Map = new Map(); localParticipant?: LocalParticipant; + private static cleanupRegistry = new FinalizationRegistry((cleanup: () => void) => { + cleanup(); + }); constructor() { super(); + // Register a finalizer to disconnect the room when it's garbage collected + Room.cleanupRegistry.register(this, () => { + // Note: This is a synchronous cleanup, so we can't await disconnect() + // but the disconnect() method is designed to handle this case + this.disconnect().catch((error) => { + log.error(error, `Error during cleanup of Room:${this.name}`); + }); + }); } get name(): string | undefined { @@ -180,6 +193,9 @@ export class Room extends (EventEmitter as new () => TypedEmitter options, }); + // subscribe before connecting so we don't miss any events + this.ffiQueue = FfiClient.instance.queue.subscribe(); + const res = FfiClient.instance.request({ message: { case: 'connect', @@ -211,12 +227,14 @@ export class Room extends (EventEmitter as new () => TypedEmitter } } - FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onFfiEvent); break; case 'error': default: + FfiClient.instance.queue.unsubscribe(this.ffiQueue); throw new ConnectError(cb.message.value || ''); } + + this.listenTaskPromise = this.listenTask(); } /** @@ -237,10 +255,43 @@ export class Room extends (EventEmitter as new () => TypedEmitter }, }); - FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent); + // Wait for the listen task to complete before unsubscribing + if (this.listenTaskPromise) { + try { + await this.listenTaskPromise; + } catch (error) { + log.error(error, 'Error waiting for listen task to complete on disconnect.'); + } + } + + if (this.ffiQueue) { + FfiClient.instance.queue.unsubscribe(this.ffiQueue); + } + this.removeAllListeners(); } + private async listenTask() { + if (!this.ffiQueue) { + throw new Error('ffiQueue is not set'); + } + try { + for await (const event of this.ffiQueue) { + if ( + event.message.case == 'roomEvent' && + event.message.value?.roomHandle != this.ffiHandle!.handle && + event.message.value?.message.case == 'eos' + ) { + break; + } + this.onFfiEvent(event); + } + } catch (error) { + log.debug(error, 'Listen task ended'); + } finally { + this.listenTaskPromise = undefined; + } + } /** * Registers a handler for incoming text data streams on a specific topic. * Text streams are used for receiving structured text data from other participants. @@ -495,6 +546,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter participant.info = info; } } + } else if (ev.case === 'eos') { + // End of stream - this will cause the listen task to terminate + // The stream will be closed and the for-await loop will exit + return; } }; diff --git a/tsconfig.json b/tsconfig.json index 2ad4c3d2..f09ddfe1 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,6 +1,6 @@ { "compilerOptions": { - "lib": ["es2015"], + "lib": ["es2015", "ES2021.WeakRef"], "target": "es2015" /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', or 'ESNEXT'. */, "module": "node16" /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */, "declaration": true, From 80769e5a0d24c221aab2d7d182c3f670911b4ee0 Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Fri, 18 Jul 2025 13:15:50 -0700 Subject: [PATCH 02/10] remove unused code --- packages/livekit-rtc/src/room.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 66302330..b1ebc45b 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -546,10 +546,6 @@ export class Room extends (EventEmitter as new () => TypedEmitter participant.info = info; } } - } else if (ev.case === 'eos') { - // End of stream - this will cause the listen task to terminate - // The stream will be closed and the for-await loop will exit - return; } }; From 0c6fcb7e5756d2eb094ea4224410a67494a5ad7d Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Fri, 18 Jul 2025 13:21:21 -0700 Subject: [PATCH 03/10] update comments --- packages/livekit-rtc/src/ffi_client.ts | 5 +++++ packages/livekit-rtc/src/room.ts | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/packages/livekit-rtc/src/ffi_client.ts b/packages/livekit-rtc/src/ffi_client.ts index c0a66b27..600715ce 100644 --- a/packages/livekit-rtc/src/ffi_client.ts +++ b/packages/livekit-rtc/src/ffi_client.ts @@ -69,6 +69,11 @@ export class FfiQueue { return stream; } + /** + * @throws "TypeError: Invalid state: ReadableStream is locked" + * if the stream is locked, make sure the stream is not being read from + * before calling this method. + */ unsubscribe(stream: ReadableStream): void { stream.cancel(); } diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index b1ebc45b..ad1dfcff 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -255,7 +255,9 @@ export class Room extends (EventEmitter as new () => TypedEmitter }, }); - // Wait for the listen task to complete before unsubscribing + // Wait for the listen task to complete before unsubscribing. + // This makes sure the we release the lock on the ffi queue's ReadableStream + // before calling cancel() on the ffi queue. if (this.listenTaskPromise) { try { await this.listenTaskPromise; From 79bfa1a87c67f4be998f57a1fdd13ea5dd311c5d Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Fri, 18 Jul 2025 13:34:22 -0700 Subject: [PATCH 04/10] changeset --- .changeset/pretty-donkeys-report.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/pretty-donkeys-report.md diff --git a/.changeset/pretty-donkeys-report.md b/.changeset/pretty-donkeys-report.md new file mode 100644 index 00000000..5690d34e --- /dev/null +++ b/.changeset/pretty-donkeys-report.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Bugfix: Queue FFI events from rust and always process them in order From 8975a20d343441db0ebe22a33ada3fd9c07afb5f Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Sat, 19 Jul 2025 12:33:10 -0700 Subject: [PATCH 05/10] remove garbage collection hadle --- packages/livekit-rtc/src/room.ts | 11 ----------- tsconfig.json | 2 +- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index ad1dfcff..f36269af 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -90,19 +90,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter remoteParticipants: Map = new Map(); localParticipant?: LocalParticipant; - private static cleanupRegistry = new FinalizationRegistry((cleanup: () => void) => { - cleanup(); - }); constructor() { super(); - // Register a finalizer to disconnect the room when it's garbage collected - Room.cleanupRegistry.register(this, () => { - // Note: This is a synchronous cleanup, so we can't await disconnect() - // but the disconnect() method is designed to handle this case - this.disconnect().catch((error) => { - log.error(error, `Error during cleanup of Room:${this.name}`); - }); - }); } get name(): string | undefined { diff --git a/tsconfig.json b/tsconfig.json index f09ddfe1..2ad4c3d2 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -1,6 +1,6 @@ { "compilerOptions": { - "lib": ["es2015", "ES2021.WeakRef"], + "lib": ["es2015"], "target": "es2015" /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', or 'ESNEXT'. */, "module": "node16" /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */, "declaration": true, From fca0fe0d40516280b875d6c0aa702c03bdaae4af Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Mon, 21 Jul 2025 14:02:53 -0700 Subject: [PATCH 06/10] simpler solution --- packages/livekit-rtc/src/ffi_client.ts | 56 -------------------------- packages/livekit-rtc/src/room.ts | 48 ++-------------------- 2 files changed, 3 insertions(+), 101 deletions(-) diff --git a/packages/livekit-rtc/src/ffi_client.ts b/packages/livekit-rtc/src/ffi_client.ts index 600715ce..857b712b 100644 --- a/packages/livekit-rtc/src/ffi_client.ts +++ b/packages/livekit-rtc/src/ffi_client.ts @@ -4,7 +4,6 @@ import type { PartialMessage } from '@bufbuild/protobuf'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import EventEmitter from 'events'; -import { log } from './log.js'; import { FfiHandle, livekitCopyBuffer, @@ -31,54 +30,6 @@ declare global { var _ffiClientInstance: FfiClient | undefined; } -export class FfiQueue { - private subscribers: Set> = new Set(); - - constructor() {} - - put(item: T): void { - const toRemove: Set> = new Set(); - - for (const controller of this.subscribers) { - try { - controller.enqueue(item); - } catch (error: unknown) { - log.error(error, 'Error enqueuing item to stream'); - toRemove.add(controller); - } - } - - for (const controller of toRemove) { - this.subscribers.delete(controller); - } - } - - subscribe(): ReadableStream { - let controller: ReadableStreamDefaultController; - - const stream = new ReadableStream({ - start: (ctrl) => { - controller = ctrl; - this.subscribers.add(controller); - }, - cancel: () => { - this.subscribers.delete(controller); - }, - }); - - return stream; - } - - /** - * @throws "TypeError: Invalid state: ReadableStream is locked" - * if the stream is locked, make sure the stream is not being read from - * before calling this method. - */ - unsubscribe(stream: ReadableStream): void { - stream.cancel(); - } -} - export class FfiClient extends (EventEmitter as new () => TypedEmitter) { /** @internal */ static get instance(): FfiClient { @@ -88,27 +39,20 @@ export class FfiClient extends (EventEmitter as new () => TypedEmitter = new FfiQueue(); constructor() { super(); this.setMaxListeners(0); - this._queue = new FfiQueue(); livekitInitialize( (event_data: Uint8Array) => { const event = FfiEvent.fromBinary(event_data); this.emit(FfiClientEvent.FfiEvent, event); - this._queue.put(event); }, true, SDK_VERSION, ); } - get queue(): FfiQueue { - return this._queue; - } - request(req: PartialMessage): T { const request = new FfiRequest(req); const req_data = request.toBinary(); diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index f36269af..7d0e4c39 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -13,7 +13,7 @@ import type { } from './data_streams/types.js'; import type { E2EEOptions } from './e2ee.js'; import { E2EEManager, defaultE2EEOptions } from './e2ee.js'; -import { FfiClient, FfiHandle } from './ffi_client.js'; +import { FfiClient, FfiClientEvent, FfiHandle } from './ffi_client.js'; import { log } from './log.js'; import type { Participant } from './participant.js'; import { LocalParticipant, RemoteParticipant } from './participant.js'; @@ -76,8 +76,6 @@ export const defaultRoomOptions = new FfiRoomOptions({ export class Room extends (EventEmitter as new () => TypedEmitter) { private info?: RoomInfo; private ffiHandle?: FfiHandle; - private ffiQueue?: ReadableStream; - private listenTaskPromise?: Promise; private byteStreamControllers = new Map>(); private textStreamControllers = new Map>(); @@ -182,9 +180,6 @@ export class Room extends (EventEmitter as new () => TypedEmitter options, }); - // subscribe before connecting so we don't miss any events - this.ffiQueue = FfiClient.instance.queue.subscribe(); - const res = FfiClient.instance.request({ message: { case: 'connect', @@ -216,14 +211,12 @@ export class Room extends (EventEmitter as new () => TypedEmitter } } + FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onFfiEvent); break; case 'error': default: - FfiClient.instance.queue.unsubscribe(this.ffiQueue); throw new ConnectError(cb.message.value || ''); } - - this.listenTaskPromise = this.listenTask(); } /** @@ -244,45 +237,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter }, }); - // Wait for the listen task to complete before unsubscribing. - // This makes sure the we release the lock on the ffi queue's ReadableStream - // before calling cancel() on the ffi queue. - if (this.listenTaskPromise) { - try { - await this.listenTaskPromise; - } catch (error) { - log.error(error, 'Error waiting for listen task to complete on disconnect.'); - } - } - - if (this.ffiQueue) { - FfiClient.instance.queue.unsubscribe(this.ffiQueue); - } - + FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent); this.removeAllListeners(); } - private async listenTask() { - if (!this.ffiQueue) { - throw new Error('ffiQueue is not set'); - } - try { - for await (const event of this.ffiQueue) { - if ( - event.message.case == 'roomEvent' && - event.message.value?.roomHandle != this.ffiHandle!.handle && - event.message.value?.message.case == 'eos' - ) { - break; - } - this.onFfiEvent(event); - } - } catch (error) { - log.debug(error, 'Listen task ended'); - } finally { - this.listenTaskPromise = undefined; - } - } /** * Registers a handler for incoming text data streams on a specific topic. * Text streams are used for receiving structured text data from other participants. From 7389f7d98c231a28e78e56f2d0d939eb0869d6a4 Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Mon, 21 Jul 2025 14:03:12 -0700 Subject: [PATCH 07/10] init solution --- packages/livekit-rtc/src/room.ts | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 7d0e4c39..5556d54e 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -82,6 +82,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter private byteStreamHandlers = new Map(); private textStreamHandlers = new Map(); + private preConnectEvents: FfiEvent[] = []; + e2eeManager?: E2EEManager; connectionState: ConnectionState = ConnectionState.CONN_DISCONNECTED; @@ -180,6 +182,8 @@ export class Room extends (EventEmitter as new () => TypedEmitter options, }); + FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onFfiEvent); + const res = FfiClient.instance.request({ message: { case: 'connect', @@ -210,8 +214,11 @@ export class Room extends (EventEmitter as new () => TypedEmitter rp.trackPublications.set(publication.sid!, publication); } } - - FfiClient.instance.on(FfiClientEvent.FfiEvent, this.onFfiEvent); + // process preConnectEvents + for (const ev of this.preConnectEvents) { + this.onFfiEvent(ev); + } + this.preConnectEvents = []; break; case 'error': default: @@ -279,7 +286,12 @@ export class Room extends (EventEmitter as new () => TypedEmitter private onFfiEvent = (ffiEvent: FfiEvent) => { if (!this.localParticipant || !this.ffiHandle || !this.info) { - throw TypeError('cannot handle ffi events before connectCallback'); + this.logger.debug( + { ffiEvent: ffiEvent.message.case }, + 'received ffi event before connectCallback, storing in preConnectEvents', + ); + this.preConnectEvents.push(ffiEvent); + return; } if (ffiEvent.message.case == 'rpcMethodInvocation') { From c2b8b76ebc18630a434aeb047e676b34cb78f9fb Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Mon, 21 Jul 2025 14:04:20 -0700 Subject: [PATCH 08/10] clean up --- packages/livekit-rtc/src/room.ts | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 5556d54e..25e7f0f8 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -214,11 +214,6 @@ export class Room extends (EventEmitter as new () => TypedEmitter rp.trackPublications.set(publication.sid!, publication); } } - // process preConnectEvents - for (const ev of this.preConnectEvents) { - this.onFfiEvent(ev); - } - this.preConnectEvents = []; break; case 'error': default: @@ -286,14 +281,17 @@ export class Room extends (EventEmitter as new () => TypedEmitter private onFfiEvent = (ffiEvent: FfiEvent) => { if (!this.localParticipant || !this.ffiHandle || !this.info) { - this.logger.debug( - { ffiEvent: ffiEvent.message.case }, - 'received ffi event before connectCallback, storing in preConnectEvents', - ); + this.logger.debug({ ffiEvent: ffiEvent.message.case }, 'received ffi event before connect'); this.preConnectEvents.push(ffiEvent); return; } + // process preConnectEvents if we recieved the connectCallback after the events were queued + for (const ev of this.preConnectEvents) { + this.onFfiEvent(ev); + } + this.preConnectEvents = []; + if (ffiEvent.message.case == 'rpcMethodInvocation') { if ( ffiEvent.message.value.localParticipantHandle == this.localParticipant.ffi_handle.handle From 1e5a64ed130b169d80684f159b4c35d708b47097 Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Mon, 21 Jul 2025 14:07:14 -0700 Subject: [PATCH 09/10] clean up --- packages/livekit-rtc/src/room.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index 25e7f0f8..c2bdfb2c 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -286,12 +286,16 @@ export class Room extends (EventEmitter as new () => TypedEmitter return; } - // process preConnectEvents if we recieved the connectCallback after the events were queued + // process preConnectEvents if we received the connectCallback after the events were queued for (const ev of this.preConnectEvents) { - this.onFfiEvent(ev); + this.processFfiEvent(ev); } this.preConnectEvents = []; + this.processFfiEvent(ffiEvent); + }; + + private processFfiEvent = (ffiEvent: FfiEvent) => { if (ffiEvent.message.case == 'rpcMethodInvocation') { if ( ffiEvent.message.value.localParticipantHandle == this.localParticipant.ffi_handle.handle From 772570f7c760212c109503e1ac06c5d68b391fa4 Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Mon, 21 Jul 2025 14:13:17 -0700 Subject: [PATCH 10/10] fix builds --- packages/livekit-rtc/src/room.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index c2bdfb2c..d5484c83 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -281,7 +281,6 @@ export class Room extends (EventEmitter as new () => TypedEmitter private onFfiEvent = (ffiEvent: FfiEvent) => { if (!this.localParticipant || !this.ffiHandle || !this.info) { - this.logger.debug({ ffiEvent: ffiEvent.message.case }, 'received ffi event before connect'); this.preConnectEvents.push(ffiEvent); return; } @@ -296,6 +295,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter }; private processFfiEvent = (ffiEvent: FfiEvent) => { + if (!this.localParticipant || !this.ffiHandle || !this.info) { + throw new Error('processFfiEvent called before connect'); + } + if (ffiEvent.message.case == 'rpcMethodInvocation') { if ( ffiEvent.message.value.localParticipantHandle == this.localParticipant.ffi_handle.handle