Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/full-turkeys-fail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/rtc-node': patch
---

Handle room updates, move participant
3 changes: 3 additions & 0 deletions examples/agent-dispatch/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ async function createTokenWithAgentDispatch(): Promise<string> {
const at = new AccessToken();
at.identity = 'my-participant';
at.addGrant({ roomJoin: true, room: roomName });
at.attributes = {
mykey: 'myvalue',
};
at.roomConfig = new RoomConfiguration({
agents: [
new RoomAgentDispatch({
Expand Down
6 changes: 6 additions & 0 deletions packages/livekit-rtc/src/proto/participant_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ export enum DisconnectReason {
* @generated from enum value: CONNECTION_TIMEOUT = 14;
*/
CONNECTION_TIMEOUT = 14,

/**
* @generated from enum value: MEDIA_FAILURE = 15;
*/
MEDIA_FAILURE = 15,
}
// Retrieve enum metadata with: proto2.getEnumType(DisconnectReason)
proto2.util.setEnumType(DisconnectReason, "livekit.proto.DisconnectReason", [
Expand All @@ -181,6 +186,7 @@ proto2.util.setEnumType(DisconnectReason, "livekit.proto.DisconnectReason", [
{ no: 12, name: "USER_REJECTED" },
{ no: 13, name: "SIP_TRUNK_FAILURE" },
{ no: 14, name: "CONNECTION_TIMEOUT" },
{ no: 15, name: "MEDIA_FAILURE" },
]);

/**
Expand Down
108 changes: 107 additions & 1 deletion packages/livekit-rtc/src/proto/room_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import type { BinaryReadOptions, FieldList, JsonReadOptions, JsonValue, PartialMessage, PlainMessage } from "@bufbuild/protobuf";
import { Message, proto2 } from "@bufbuild/protobuf";
import { DisconnectReason, OwnedParticipant } from "./participant_pb.js";
import { DisconnectReason, OwnedParticipant, ParticipantInfo } from "./participant_pb.js";
import { OwnedTrack, OwnedTrackPublication, TrackSource } from "./track_pb.js";
import { RtcStats } from "./stats_pb.js";
import { VideoCodec } from "./video_frame_pb.js";
Expand Down Expand Up @@ -2791,6 +2791,30 @@ export class RoomEvent extends Message<RoomEvent> {
*/
value: TextStreamOpened;
case: "textStreamOpened";
} | {
/**
* Room info updated
*
* @generated from field: livekit.proto.RoomInfo room_updated = 36;
*/
value: RoomInfo;
case: "roomUpdated";
} | {
/**
* Participant moved to new room
*
* @generated from field: livekit.proto.RoomInfo moved = 37;
*/
value: RoomInfo;
case: "moved";
} | {
/**
* carry over all participant info updates, including sid
*
* @generated from field: livekit.proto.ParticipantsUpdated participants_updated = 38;
*/
value: ParticipantsUpdated;
case: "participantsUpdated";
} | { case: undefined; value?: undefined } = { case: undefined };

constructor(data?: PartialMessage<RoomEvent>) {
Expand Down Expand Up @@ -2836,6 +2860,9 @@ export class RoomEvent extends Message<RoomEvent> {
{ no: 33, name: "data_channel_low_threshold_changed", kind: "message", T: DataChannelBufferedAmountLowThresholdChanged, oneof: "message" },
{ no: 34, name: "byte_stream_opened", kind: "message", T: ByteStreamOpened, oneof: "message" },
{ no: 35, name: "text_stream_opened", kind: "message", T: TextStreamOpened, oneof: "message" },
{ no: 36, name: "room_updated", kind: "message", T: RoomInfo, oneof: "message" },
{ no: 37, name: "moved", kind: "message", T: RoomInfo, oneof: "message" },
{ no: 38, name: "participants_updated", kind: "message", T: ParticipantsUpdated, oneof: "message" },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): RoomEvent {
Expand Down Expand Up @@ -2884,6 +2911,41 @@ export class RoomInfo extends Message<RoomInfo> {
*/
reliableDcBufferedAmountLowThreshold?: bigint;

/**
* @generated from field: required uint32 empty_timeout = 6;
*/
emptyTimeout?: number;

/**
* @generated from field: required uint32 departure_timeout = 7;
*/
departureTimeout?: number;

/**
* @generated from field: required uint32 max_participants = 8;
*/
maxParticipants?: number;

/**
* @generated from field: required int64 creation_time = 9;
*/
creationTime?: bigint;

/**
* @generated from field: required uint32 num_participants = 10;
*/
numParticipants?: number;

/**
* @generated from field: required uint32 num_publishers = 11;
*/
numPublishers?: number;

/**
* @generated from field: required bool active_recording = 12;
*/
activeRecording?: boolean;

constructor(data?: PartialMessage<RoomInfo>) {
super();
proto2.util.initPartial(data, this);
Expand All @@ -2897,6 +2959,13 @@ export class RoomInfo extends Message<RoomInfo> {
{ no: 3, name: "metadata", kind: "scalar", T: 9 /* ScalarType.STRING */, req: true },
{ no: 4, name: "lossy_dc_buffered_amount_low_threshold", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true },
{ no: 5, name: "reliable_dc_buffered_amount_low_threshold", kind: "scalar", T: 4 /* ScalarType.UINT64 */, req: true },
{ no: 6, name: "empty_timeout", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true },
{ no: 7, name: "departure_timeout", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true },
{ no: 8, name: "max_participants", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true },
{ no: 9, name: "creation_time", kind: "scalar", T: 3 /* ScalarType.INT64 */, req: true },
{ no: 10, name: "num_participants", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true },
{ no: 11, name: "num_publishers", kind: "scalar", T: 13 /* ScalarType.UINT32 */, req: true },
{ no: 12, name: "active_recording", kind: "scalar", T: 8 /* ScalarType.BOOL */, req: true },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): RoomInfo {
Expand Down Expand Up @@ -2959,6 +3028,43 @@ export class OwnedRoom extends Message<OwnedRoom> {
}
}

/**
* @generated from message livekit.proto.ParticipantsUpdated
*/
export class ParticipantsUpdated extends Message<ParticipantsUpdated> {
/**
* @generated from field: repeated livekit.proto.ParticipantInfo participants = 1;
*/
participants: ParticipantInfo[] = [];

constructor(data?: PartialMessage<ParticipantsUpdated>) {
super();
proto2.util.initPartial(data, this);
}

static readonly runtime: typeof proto2 = proto2;
static readonly typeName = "livekit.proto.ParticipantsUpdated";
static readonly fields: FieldList = proto2.util.newFieldList(() => [
{ no: 1, name: "participants", kind: "message", T: ParticipantInfo, repeated: true },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): ParticipantsUpdated {
return new ParticipantsUpdated().fromBinary(bytes, options);
}

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): ParticipantsUpdated {
return new ParticipantsUpdated().fromJson(jsonValue, options);
}

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): ParticipantsUpdated {
return new ParticipantsUpdated().fromJsonString(jsonString, options);
}

static equals(a: ParticipantsUpdated | PlainMessage<ParticipantsUpdated> | undefined, b: ParticipantsUpdated | PlainMessage<ParticipantsUpdated> | undefined): boolean {
return proto2.util.equals(ParticipantsUpdated, a, b);
}
}

/**
* @generated from message livekit.proto.ParticipantConnected
*/
Expand Down
79 changes: 79 additions & 0 deletions packages/livekit-rtc/src/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
return this.ffiHandle != undefined && this.connectionState != ConnectionState.CONN_DISCONNECTED;
}

/**
* Gets the room's server ID. This ID is assigned by the LiveKit server
* and is unique for each room session.
* SID is assigned asynchronously after connection.
* @returns Promise that resolves to the room's server ID, or empty string if not connected
*/
async getSid(): Promise<string> {
if (!this.isConnected) {
return '';
Expand All @@ -126,6 +132,44 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
});
}

get numParticipants(): number {
return this.info?.numParticipants ?? 0;
}

get numPublishers(): number {
return this.info?.numPublishers ?? 0;
}

get creationTime(): Date {
return new Date(Number(this.info?.creationTime ?? 0));
}

get isRecording(): boolean {
return this.info?.activeRecording ?? false;
}

/**
* The time in seconds after which a room will be closed after the last
* participant has disconnected.
*/
get departureTimeout(): number {
return this.info?.departureTimeout ?? 0;
}

/**
* The time in seconds after which an empty room will be automatically closed.
*/
get emptyTimeout(): number {
return this.info?.emptyTimeout ?? 0;
}

/**
* Connects to a LiveKit room using the provided URL and access token.
* @param url The WebSocket URL of the LiveKit server
* @param token A valid LiveKit access token for authentication
* @param opts Optional room configuration options
* @throws ConnectError if connection fails
*/
async connect(url: string, token: string, opts?: RoomOptions) {
const options = { ...defaultRoomOptions, ...opts };
const e2eeOptions = { ...defaultE2EEOptions, ...options.e2ee };
Expand Down Expand Up @@ -175,6 +219,10 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
}
}

/**
* Disconnects from the room and cleans up all resources.
* This will stop all tracks and close the connection.
*/
async disconnect() {
if (!this.isConnected) {
return;
Expand All @@ -193,6 +241,13 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
this.removeAllListeners();
}

/**
* Registers a handler for incoming text data streams on a specific topic.
* Text streams are used for receiving structured text data from other participants.
* @param topic The topic to listen for text streams on
* @param callback Function to handle incoming text stream data
* @throws Error if a handler for this topic is already registered
*/
registerTextStreamHandler(topic: string, callback: TextStreamHandler) {
if (this.textStreamHandlers.has(topic)) {
throw new Error(`A text stream handler for topic "${topic}" has already been set.`);
Expand All @@ -204,6 +259,13 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
this.textStreamHandlers.delete(topic);
}

/**
* Registers a handler for incoming byte data streams on a specific topic.
* Byte streams are used for receiving binary data like files from other participants.
* @param topic The topic to listen for byte streams on
* @param callback Function to handle incoming byte stream data
* @throws Error if a handler for this topic is already registered
*/
registerByteStreamHandler(topic: string, callback: ByteStreamHandler) {
if (this.byteStreamHandlers.has(topic)) {
throw new Error(`A byte stream handler for topic "${topic}" has already been set.`);
Expand Down Expand Up @@ -420,6 +482,19 @@ export class Room extends (EventEmitter as new () => TypedEmitter<RoomCallbacks>
this.handleStreamChunk(ev.value.chunk);
} else if (ev.case === 'streamTrailerReceived' && ev.value.trailer) {
this.handleStreamTrailer(ev.value.trailer);
} else if (ev.case === 'roomUpdated') {
this.info = ev.value;
this.emit(RoomEvent.RoomUpdated);
} else if (ev.case === 'moved') {
this.info = ev.value;
this.emit(RoomEvent.Moved);
} else if (ev.case === 'participantsUpdated') {
for (const info of ev.value.participants) {
const participant = this.retrieveParticipantByIdentity(info.identity!);
if (participant) {
participant.info = info;
}
}
}
};

Expand Down Expand Up @@ -631,6 +706,8 @@ export type RoomCallbacks = {
reconnecting: () => void;
reconnected: () => void;
roomSidChanged: (sid: string) => void;
roomUpdated: () => void;
moved: () => void;
};

export enum RoomEvent {
Expand Down Expand Up @@ -662,4 +739,6 @@ export enum RoomEvent {
Disconnected = 'disconnected',
Reconnecting = 'reconnecting',
Reconnected = 'reconnected',
RoomUpdated = 'roomUpdated',
Moved = 'moved',
}
4 changes: 2 additions & 2 deletions packages/livekit-server-sdk/src/AccessToken.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ export interface AccessTokenOptions {
identity?: string;

/**
* custom metadata to be passed to participants
* custom participant metadata
*/
metadata?: string;

/**
* custom attributes to be passed to participants
* custom participant attributes
*/
attributes?: Record<string, string>;
}
Expand Down
6 changes: 5 additions & 1 deletion packages/livekit-server-sdk/src/SipClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ export interface CreateSipParticipantOptions {
krispEnabled?: boolean;
/** If `true`, this will wait until the call is answered before returning. */
waitUntilAnswered?: boolean;
/** Optional request timeout in seconds. */
/** Optional request timeout in seconds. default 60 seconds if waitUntilAnswered is true, otherwise 10 seconds */
timeout?: number;
}

Expand Down Expand Up @@ -708,6 +708,10 @@ export class SipClient extends ServiceBase {
opts = {};
}

if (opts.timeout === undefined) {
opts.timeout = opts.waitUntilAnswered ? 60 : 10;
}

const req = new CreateSIPParticipantRequest({
sipTrunkId: sipTrunkId,
sipCallTo: number,
Expand Down
Loading