Skip to content

Commit fe07d72

Browse files
committed
ensure disconnect events are still fired when disconnecting manually
1 parent c3931d1 commit fe07d72

2 files changed

Lines changed: 50 additions & 11 deletions

File tree

packages/livekit-rtc/src/room.ts

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import { Mutex } from '@livekit/mutex';
55
import { EncryptionState, type EncryptionType } from '@livekit/rtc-ffi-bindings';
66
import type { FfiEvent } from '@livekit/rtc-ffi-bindings';
7-
import type { DisconnectReason, OwnedParticipant } from '@livekit/rtc-ffi-bindings';
7+
import { DisconnectReason, type OwnedParticipant } from '@livekit/rtc-ffi-bindings';
88
import type { DataStream_Trailer, DisconnectCallback } from '@livekit/rtc-ffi-bindings';
99
import {
1010
type ConnectCallback,
@@ -100,6 +100,11 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
100100
// preventing them from leaking when the room goes away.
101101
private disconnectController = new AbortController();
102102

103+
// Guards cleanupOnDisconnect so the ConnectionStateChanged/Disconnected
104+
// events fire exactly once, no matter which path (explicit disconnect()
105+
// vs. FFI 'disconnected' event) wins the race.
106+
private hasCleanedUp = false;
107+
103108
private _token?: string;
104109
private _serverUrl?: string;
105110

@@ -261,6 +266,7 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
261266
// Reset the abort controller for this connection session so that
262267
// a previous disconnect doesn't immediately cancel new operations.
263268
this.disconnectController = new AbortController();
269+
this.hasCleanedUp = false;
264270
this.localParticipant = new LocalParticipant(
265271
cb.message.value.localParticipant!,
266272
this.ffiEventLock,
@@ -309,13 +315,19 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
309315
return ev.message.case == 'disconnect' && ev.message.value.asyncId == res.asyncId;
310316
});
311317

312-
this.cleanupOnDisconnect();
313-
318+
this.cleanupOnDisconnect(DisconnectReason.CLIENT_INITIATED);
314319
FfiClient.instance.removeListener(FfiClientEvent.FfiEvent, this.onFfiEvent);
320+
315321
this.removeAllListeners();
316322
}
317323

318-
private cleanupOnDisconnect() {
324+
// Runs at most once per connection session. The FFI layer and explicit
325+
// disconnect() both race to get here — whichever wins emits the events,
326+
// the other is a no-op. A reconnect via connect() clears hasCleanedUp.
327+
private cleanupOnDisconnect(reason: DisconnectReason = DisconnectReason.CLIENT_INITIATED) {
328+
if (this.hasCleanedUp) return;
329+
this.hasCleanedUp = true;
330+
319331
// Error all in-progress stream controllers to prevent FD leaks.
320332
// Streams that were receiving data but never got a trailer (e.g. the sender
321333
// disconnected mid-transfer) would otherwise keep their ReadableStream open
@@ -346,10 +358,14 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
346358
// This causes any in-flight operations (publishData, publishTrack, etc.)
347359
// to reject and clean up their event listeners.
348360
this.disconnectController.abort();
349-
// Flip state synchronously so isConnected reflects reality immediately.
350-
// The connectionStateChanged FFI event may arrive after we've removed
351-
// our listener (see disconnect()), so relying on it alone is racy.
352-
this.connectionState = ConnectionState.CONN_DISCONNECTED;
361+
362+
// Only emit ConnectionStateChanged if the FFI 'connectionStateChanged'
363+
// path didn't already flip us to DISCONNECTED.
364+
if (this.connectionState !== ConnectionState.CONN_DISCONNECTED) {
365+
this.connectionState = ConnectionState.CONN_DISCONNECTED;
366+
this.emit(RoomEvent.ConnectionStateChanged, this.connectionState);
367+
}
368+
this.emit(RoomEvent.Disconnected, reason);
353369
}
354370

355371
/**
@@ -662,13 +678,20 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
662678
this.emit(RoomEvent.EncryptionError, new Error('internal server error'));
663679
}
664680
} else if (ev.case == 'connectionStateChanged') {
665-
this.connectionState = ev.value.state!;
681+
const newState = ev.value.state!;
682+
// Skip redundant transitions — cleanupOnDisconnect may have already
683+
// flipped us to DISCONNECTED, and we don't want to emit the event twice.
684+
if (this.connectionState === newState) {
685+
return;
686+
}
687+
this.connectionState = newState;
666688
this.emit(RoomEvent.ConnectionStateChanged, this.connectionState);
667689
/*} else if (ev.case == 'connected') {
668690
this.emit(RoomEvent.Connected);*/
669691
} else if (ev.case == 'disconnected') {
670-
this.cleanupOnDisconnect();
671-
this.emit(RoomEvent.Disconnected, ev.value.reason!);
692+
// cleanupOnDisconnect emits RoomEvent.Disconnected itself (guarded by
693+
// hasCleanedUp so it fires exactly once across both disconnect paths).
694+
this.cleanupOnDisconnect(ev.value.reason!);
672695
} else if (ev.case == 'reconnecting') {
673696
this.emit(RoomEvent.Reconnecting);
674697
} else if (ev.case == 'reconnected') {

packages/livekit-rtc/src/tests/e2e.test.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,13 +636,29 @@ describeE2E('livekit-rtc e2e', () => {
636636
{ timeoutMs: 5000, debugName: 'all tracks visible' },
637637
);
638638

639+
// Register listeners before disconnecting so we can verify both
640+
// RoomEvent.Disconnected and RoomEvent.ConnectionStateChanged fire
641+
// for every room, even when disconnects race.
642+
const disconnectedEvents = rooms.map((r) =>
643+
waitForRoomEvent(r, RoomEvent.Disconnected, 3_000, (reason) => reason),
644+
);
645+
const connectionStateEvents = rooms.map((r) =>
646+
waitForRoomEvent(r, RoomEvent.ConnectionStateChanged, 3_000, (state) => state),
647+
);
648+
639649
// Disconnect all participants simultaneously
640650
await Promise.all([...rooms.map((r) => r.disconnect()), ...sources.map((s) => s.close())]);
641651

652+
await Promise.all(disconnectedEvents);
653+
const observedStates = await Promise.all(connectionStateEvents);
654+
642655
// Verify all rooms are disconnected and remote participant maps are empty
643656
for (const room of rooms) {
644657
expect(room.isConnected).toBe(false);
645658
}
659+
for (const state of observedStates) {
660+
expect(state).toBe(ConnectionState.CONN_DISCONNECTED);
661+
}
646662
},
647663
testTimeoutMs * 2,
648664
);

0 commit comments

Comments
 (0)