From 617b671720f845c9eab74da6b32503988390f3b9 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Wed, 18 Jun 2025 00:02:59 -0700 Subject: [PATCH 1/2] Handle room updates, move participant --- examples/agent-dispatch/index.ts | 3 + packages/livekit-rtc/rust-sdks | 2 +- .../livekit-rtc/src/proto/participant_pb.ts | 6 + packages/livekit-rtc/src/proto/room_pb.ts | 108 +++++++++++++++++- packages/livekit-rtc/src/room.ts | 79 +++++++++++++ .../livekit-server-sdk/src/AccessToken.ts | 4 +- packages/livekit-server-sdk/src/SipClient.ts | 6 +- 7 files changed, 203 insertions(+), 5 deletions(-) diff --git a/examples/agent-dispatch/index.ts b/examples/agent-dispatch/index.ts index 4f403c24..151ef074 100644 --- a/examples/agent-dispatch/index.ts +++ b/examples/agent-dispatch/index.ts @@ -35,6 +35,9 @@ async function createTokenWithAgentDispatch(): Promise { const at = new AccessToken(); at.identity = 'my-participant'; at.addGrant({ roomJoin: true, room: roomName }); + at.attributes = { + mykey: 'myvalue', + }; at.roomConfig = new RoomConfiguration({ agents: [ new RoomAgentDispatch({ diff --git a/packages/livekit-rtc/rust-sdks b/packages/livekit-rtc/rust-sdks index fd7866cc..89355fbb 160000 --- a/packages/livekit-rtc/rust-sdks +++ b/packages/livekit-rtc/rust-sdks @@ -1 +1 @@ -Subproject commit fd7866cc4823f31282d34f00231b5b527a2878e8 +Subproject commit 89355fbb351014e39a8df5d599532972382f1326 diff --git a/packages/livekit-rtc/src/proto/participant_pb.ts b/packages/livekit-rtc/src/proto/participant_pb.ts index 9d15ce3c..7d9b708c 100644 --- a/packages/livekit-rtc/src/proto/participant_pb.ts +++ b/packages/livekit-rtc/src/proto/participant_pb.ts @@ -163,6 +163,11 @@ export enum DisconnectReason { * @generated from enum value: CONNECTION_TIMEOUT = 14; */ CONNECTION_TIMEOUT = 14, + + /** + * @generated from enum value: MEDIA_FAILURE = 15; + */ + MEDIA_FAILURE = 15, } // Retrieve enum metadata with: proto2.getEnumType(DisconnectReason) proto2.util.setEnumType(DisconnectReason, "livekit.proto.DisconnectReason", [ @@ -181,6 +186,7 @@ proto2.util.setEnumType(DisconnectReason, "livekit.proto.DisconnectReason", [ { no: 12, name: "USER_REJECTED" }, { no: 13, name: "SIP_TRUNK_FAILURE" }, { no: 14, name: "CONNECTION_TIMEOUT" }, + { no: 15, name: "MEDIA_FAILURE" }, ]); /** diff --git a/packages/livekit-rtc/src/proto/room_pb.ts b/packages/livekit-rtc/src/proto/room_pb.ts index b6eba2a4..a65fcad7 100644 --- a/packages/livekit-rtc/src/proto/room_pb.ts +++ b/packages/livekit-rtc/src/proto/room_pb.ts @@ -19,7 +19,7 @@ import type { BinaryReadOptions, FieldList, JsonReadOptions, JsonValue, PartialMessage, PlainMessage } from "@bufbuild/protobuf"; import { Message, proto2 } from "@bufbuild/protobuf"; -import { DisconnectReason, OwnedParticipant } from "./participant_pb.js"; +import { DisconnectReason, OwnedParticipant, ParticipantInfo } from "./participant_pb.js"; import { OwnedTrack, OwnedTrackPublication, TrackSource } from "./track_pb.js"; import { RtcStats } from "./stats_pb.js"; import { VideoCodec } from "./video_frame_pb.js"; @@ -2791,6 +2791,30 @@ export class RoomEvent extends Message { */ value: TextStreamOpened; case: "textStreamOpened"; + } | { + /** + * Room info updated + * + * @generated from field: livekit.proto.RoomInfo room_updated = 36; + */ + value: RoomInfo; + case: "roomUpdated"; + } | { + /** + * Participant moved to new room + * + * @generated from field: livekit.proto.RoomInfo moved = 37; + */ + value: RoomInfo; + case: "moved"; + } | { + /** + * carry over all participant info updates, including sid + * + * @generated from field: livekit.proto.ParticipantsUpdated participants_updated = 38; + */ + value: ParticipantsUpdated; + case: "participantsUpdated"; } | { case: undefined; value?: undefined } = { case: undefined }; constructor(data?: PartialMessage) { @@ -2836,6 +2860,9 @@ export class RoomEvent extends Message { { no: 33, name: "data_channel_low_threshold_changed", kind: "message", T: DataChannelBufferedAmountLowThresholdChanged, oneof: "message" }, { no: 34, name: "byte_stream_opened", kind: "message", T: ByteStreamOpened, oneof: "message" }, { no: 35, name: "text_stream_opened", kind: "message", T: TextStreamOpened, oneof: "message" }, + { no: 36, name: "room_updated", kind: "message", T: RoomInfo, oneof: "message" }, + { no: 37, name: "moved", kind: "message", T: RoomInfo, oneof: "message" }, + { no: 38, name: "participants_updated", kind: "message", T: ParticipantsUpdated, oneof: "message" }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): RoomEvent { @@ -2884,6 +2911,41 @@ export class RoomInfo extends Message { */ reliableDcBufferedAmountLowThreshold?: bigint; + /** + * @generated from field: required uint32 empty_timeout = 6; + */ + emptyTimeout?: number; + + /** + * @generated from field: required uint32 departure_timeout = 7; + */ + departureTimeout?: number; + + /** + * @generated from field: required uint32 max_participants = 8; + */ + maxParticipants?: number; + + /** + * @generated from field: required int64 creation_time = 9; + */ + creationTime?: bigint; + + /** + * @generated from field: required uint32 num_participants = 10; + */ + numParticipants?: number; + + /** + * @generated from field: required uint32 num_publishers = 11; + */ + numPublishers?: number; + + /** + * @generated from field: required bool active_recording = 12; + */ + activeRecording?: boolean; + constructor(data?: PartialMessage) { super(); proto2.util.initPartial(data, this); @@ -2897,6 +2959,13 @@ export class RoomInfo extends Message { { no: 3, name: "metadata", kind: "scalar", T: 9 /* ScalarType.STRING */, req: true }, { no: 4, name: "lossy_dc_buffered_amount_low_threshold", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, { no: 5, name: "reliable_dc_buffered_amount_low_threshold", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true }, + { no: 6, name: "empty_timeout", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + { no: 7, name: "departure_timeout", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + { no: 8, name: "max_participants", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + { no: 9, name: "creation_time", kind: "scalar", T: 3 /* ScalarType.INT64 */, req: true }, + { no: 10, name: "num_participants", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + { no: 11, name: "num_publishers", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true }, + { no: 12, name: "active_recording", kind: "scalar", T: 8 /* ScalarType.BOOL */, req: true }, ]); static fromBinary(bytes: Uint8Array, options?: Partial): RoomInfo { @@ -2959,6 +3028,43 @@ export class OwnedRoom extends Message { } } +/** + * @generated from message livekit.proto.ParticipantsUpdated + */ +export class ParticipantsUpdated extends Message { + /** + * @generated from field: repeated livekit.proto.ParticipantInfo participants = 1; + */ + participants: ParticipantInfo[] = []; + + constructor(data?: PartialMessage) { + super(); + proto2.util.initPartial(data, this); + } + + static readonly runtime: typeof proto2 = proto2; + static readonly typeName = "livekit.proto.ParticipantsUpdated"; + static readonly fields: FieldList = proto2.util.newFieldList(() => [ + { no: 1, name: "participants", kind: "message", T: ParticipantInfo, repeated: true }, + ]); + + static fromBinary(bytes: Uint8Array, options?: Partial): ParticipantsUpdated { + return new ParticipantsUpdated().fromBinary(bytes, options); + } + + static fromJson(jsonValue: JsonValue, options?: Partial): ParticipantsUpdated { + return new ParticipantsUpdated().fromJson(jsonValue, options); + } + + static fromJsonString(jsonString: string, options?: Partial): ParticipantsUpdated { + return new ParticipantsUpdated().fromJsonString(jsonString, options); + } + + static equals(a: ParticipantsUpdated | PlainMessage | undefined, b: ParticipantsUpdated | PlainMessage | undefined): boolean { + return proto2.util.equals(ParticipantsUpdated, a, b); + } +} + /** * @generated from message livekit.proto.ParticipantConnected */ diff --git a/packages/livekit-rtc/src/room.ts b/packages/livekit-rtc/src/room.ts index df98fd99..7d0e4c39 100644 --- a/packages/livekit-rtc/src/room.ts +++ b/packages/livekit-rtc/src/room.ts @@ -104,6 +104,12 @@ export class Room extends (EventEmitter as new () => TypedEmitter return this.ffiHandle != undefined && this.connectionState != ConnectionState.CONN_DISCONNECTED; } + /** + * Gets the room's server ID. This ID is assigned by the LiveKit server + * and is unique for each room session. + * SID is assigned asynchronously after connection. + * @returns Promise that resolves to the room's server ID, or empty string if not connected + */ async getSid(): Promise { if (!this.isConnected) { return ''; @@ -126,6 +132,44 @@ export class Room extends (EventEmitter as new () => TypedEmitter }); } + get numParticipants(): number { + return this.info?.numParticipants ?? 0; + } + + get numPublishers(): number { + return this.info?.numPublishers ?? 0; + } + + get creationTime(): Date { + return new Date(Number(this.info?.creationTime ?? 0)); + } + + get isRecording(): boolean { + return this.info?.activeRecording ?? false; + } + + /** + * The time in seconds after which a room will be closed after the last + * participant has disconnected. + */ + get departureTimeout(): number { + return this.info?.departureTimeout ?? 0; + } + + /** + * The time in seconds after which an empty room will be automatically closed. + */ + get emptyTimeout(): number { + return this.info?.emptyTimeout ?? 0; + } + + /** + * Connects to a LiveKit room using the provided URL and access token. + * @param url The WebSocket URL of the LiveKit server + * @param token A valid LiveKit access token for authentication + * @param opts Optional room configuration options + * @throws ConnectError if connection fails + */ async connect(url: string, token: string, opts?: RoomOptions) { const options = { ...defaultRoomOptions, ...opts }; const e2eeOptions = { ...defaultE2EEOptions, ...options.e2ee }; @@ -175,6 +219,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter } } + /** + * Disconnects from the room and cleans up all resources. + * This will stop all tracks and close the connection. + */ async disconnect() { if (!this.isConnected) { return; @@ -193,6 +241,13 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.removeAllListeners(); } + /** + * Registers a handler for incoming text data streams on a specific topic. + * Text streams are used for receiving structured text data from other participants. + * @param topic The topic to listen for text streams on + * @param callback Function to handle incoming text stream data + * @throws Error if a handler for this topic is already registered + */ registerTextStreamHandler(topic: string, callback: TextStreamHandler) { if (this.textStreamHandlers.has(topic)) { throw new Error(`A text stream handler for topic "${topic}" has already been set.`); @@ -204,6 +259,13 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.textStreamHandlers.delete(topic); } + /** + * Registers a handler for incoming byte data streams on a specific topic. + * Byte streams are used for receiving binary data like files from other participants. + * @param topic The topic to listen for byte streams on + * @param callback Function to handle incoming byte stream data + * @throws Error if a handler for this topic is already registered + */ registerByteStreamHandler(topic: string, callback: ByteStreamHandler) { if (this.byteStreamHandlers.has(topic)) { throw new Error(`A byte stream handler for topic "${topic}" has already been set.`); @@ -420,6 +482,19 @@ export class Room extends (EventEmitter as new () => TypedEmitter this.handleStreamChunk(ev.value.chunk); } else if (ev.case === 'streamTrailerReceived' && ev.value.trailer) { this.handleStreamTrailer(ev.value.trailer); + } else if (ev.case === 'roomUpdated') { + this.info = ev.value; + this.emit(RoomEvent.RoomUpdated); + } else if (ev.case === 'moved') { + this.info = ev.value; + this.emit(RoomEvent.Moved); + } else if (ev.case === 'participantsUpdated') { + for (const info of ev.value.participants) { + const participant = this.retrieveParticipantByIdentity(info.identity!); + if (participant) { + participant.info = info; + } + } } }; @@ -631,6 +706,8 @@ export type RoomCallbacks = { reconnecting: () => void; reconnected: () => void; roomSidChanged: (sid: string) => void; + roomUpdated: () => void; + moved: () => void; }; export enum RoomEvent { @@ -662,4 +739,6 @@ export enum RoomEvent { Disconnected = 'disconnected', Reconnecting = 'reconnecting', Reconnected = 'reconnected', + RoomUpdated = 'roomUpdated', + Moved = 'moved', } diff --git a/packages/livekit-server-sdk/src/AccessToken.ts b/packages/livekit-server-sdk/src/AccessToken.ts index 76e45197..6ba8416e 100644 --- a/packages/livekit-server-sdk/src/AccessToken.ts +++ b/packages/livekit-server-sdk/src/AccessToken.ts @@ -30,12 +30,12 @@ export interface AccessTokenOptions { identity?: string; /** - * custom metadata to be passed to participants + * custom participant metadata */ metadata?: string; /** - * custom attributes to be passed to participants + * custom participant attributes */ attributes?: Record; } diff --git a/packages/livekit-server-sdk/src/SipClient.ts b/packages/livekit-server-sdk/src/SipClient.ts index 5f1d3a61..47c687ac 100644 --- a/packages/livekit-server-sdk/src/SipClient.ts +++ b/packages/livekit-server-sdk/src/SipClient.ts @@ -147,7 +147,7 @@ export interface CreateSipParticipantOptions { krispEnabled?: boolean; /** If `true`, this will wait until the call is answered before returning. */ waitUntilAnswered?: boolean; - /** Optional request timeout in seconds. */ + /** Optional request timeout in seconds. default 60 seconds if waitUntilAnswered is true, otherwise 10 seconds */ timeout?: number; } @@ -708,6 +708,10 @@ export class SipClient extends ServiceBase { opts = {}; } + if (opts.timeout === undefined) { + opts.timeout = opts.waitUntilAnswered ? 60 : 10; + } + const req = new CreateSIPParticipantRequest({ sipTrunkId: sipTrunkId, sipCallTo: number, From fbf4bfdd29c077b0cd20818070fd3aeb25b00db1 Mon Sep 17 00:00:00 2001 From: David Zhao Date: Wed, 18 Jun 2025 00:04:15 -0700 Subject: [PATCH 2/2] changeset --- .changeset/full-turkeys-fail.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/full-turkeys-fail.md diff --git a/.changeset/full-turkeys-fail.md b/.changeset/full-turkeys-fail.md new file mode 100644 index 00000000..373bb06f --- /dev/null +++ b/.changeset/full-turkeys-fail.md @@ -0,0 +1,5 @@ +--- +'@livekit/rtc-node': patch +--- + +Handle room updates, move participant