From d93000eb0005dcf24ed941f78dfaf3c4579b1923 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 5 May 2026 13:22:27 +0200 Subject: [PATCH 1/9] perf(connect): skip 20ms negotiate debounce on initial connect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The publisher transport's `negotiate()` always runs through a 20ms debouncer to coalesce rapid renegotiations during track publishing. At connect time there is nothing to coalesce, and the delay sits directly in the critical path — most visible in single-PC mode where the client must initiate the SDP offer. Two changes: 1. `Transport.negotiate(force:)` — when `force=true`, cancel any pending debounced action and call `createAndSendOffer()` directly. 2. Move the eager-negotiate call out of `configureTransports` and into `fullConnectSequence` *after* `signalClient.resumeQueues()`, calling with `force=true`. This is required because offers are not queueable (`Livekit_SignalRequest.canBeQueued()` returns false for `.offer`), so a forced send while the request queue is still suspended would silently drop the offer. Cloud benchmark (BM-CONN-003 SinglePC) p50 wall time: ~406ms → ~338ms, closing the gap with BM-CONN-001 DualPC. Co-Authored-By: Claude Opus 4.7 (1M context) --- Sources/LiveKit/Core/Room+Engine.swift | 17 +++++++++++------ Sources/LiveKit/Core/Transport.swift | 12 +++++++++--- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index 66a5c8c6c..e99d9698a 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -53,11 +53,11 @@ extension Room { } } - func publisherShouldNegotiate() async throws { + func publisherShouldNegotiate(force: Bool = false) async throws { log() let publisher = try requirePublisher() - await publisher.negotiate() + try await publisher.negotiate(force: force) _state.mutate { $0.hasPublished = true } } @@ -196,10 +196,6 @@ extension Room { _state.mutate { $0.transport = transport } log("[Connect] Fast publish enabled: \(joinResponse.fastPublish ? "true" : "false")") - if isSinglePC || !isSubscriberPrimary || joinResponse.fastPublish { - // In single PC mode or when publisher is primary, negotiate immediately - try await publisherShouldNegotiate() - } } else if case let .reconnect(reconnectResponse) = connectResponse { log("[Connect] Configuring transports with RECONNECT response...") @@ -287,6 +283,15 @@ extension Room { // Resume after configuring transports... await signalClient.resumeQueues() + // Eager publisher negotiation must run after `resumeQueues()` — + // offers are not queueable, so sending while suspended drops them. + if case let .join(joinResponse) = connectResponse { + let isSubscriberPrimary = singlePC ? false : joinResponse.subscriberPrimary + if singlePC || !isSubscriberPrimary || joinResponse.fastPublish { + try await publisherShouldNegotiate(force: true) + } + } + // Wait for transport... try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout) try Task.checkCancellation() diff --git a/Sources/LiveKit/Core/Transport.swift b/Sources/LiveKit/Core/Transport.swift index e5ecff708..7d23f4aaf 100644 --- a/Sources/LiveKit/Core/Transport.swift +++ b/Sources/LiveKit/Core/Transport.swift @@ -98,9 +98,15 @@ actor Transport: NSObject, Loggable { _delegate.add(delegate: delegate) } - func negotiate() async { - await _debounce.schedule { - try await self.createAndSendOffer() + func negotiate(force: Bool = false) async throws { + if force { + // Cancel any pending debounced negotiation; this call supersedes it. + await _debounce.cancel() + try await createAndSendOffer() + } else { + await _debounce.schedule { + try await self.createAndSendOffer() + } } } From cee9c73690e3335cf0405d1f4ecc7e1efdd7afc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 5 May 2026 13:22:52 +0200 Subject: [PATCH 2/9] refactor(tracing): unify connect-time dc_open and add offer_sent split MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related observability changes for the BM-CONN connect-time spans: 1. `dc_open` is now recorded in a single place — `Room.dataChannelDidOpen` — for both single-PC (publisher pair) and dual-PC (subscriber pair). `DataChannelPair` notifies via the new `DataChannelDelegate.dataChannelDidOpen` when the pair transitions to open. `TransportMode.connectDataChannelPair(...)` selects which pair is canonical for the active mode. The previous inline record in `Room+TransportDelegate` is removed; in single-PC mode it never fired (subscriber transport is nil), so `D_DC_MS` was missing from BM-CONN-003 entirely. 2. `offer_sent` is recorded in the publisher's `onOffer` block, mirroring the existing `answer_sent` split for server-initiated offers. The benchmark reads either label as `sdpDispatched` so `D_ICE_DTLS_MS` is recorded for both client-initiated (single PC, publisher-primary) and server-initiated (subscriber-primary) flows. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../LiveKitBenchmark/ConnectionBenchmark.swift | 14 ++++++++------ Sources/LiveKit/Core/DataChannelPair.swift | 12 +++++++++--- Sources/LiveKit/Core/Room+Engine.swift | 1 + Sources/LiveKit/Core/Room+TransportDelegate.swift | 4 ---- Sources/LiveKit/Core/Room.swift | 10 ++++++++++ Sources/LiveKit/Core/TransportMode.swift | 10 ++++++++++ 6 files changed, 38 insertions(+), 13 deletions(-) diff --git a/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift b/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift index d21c3d369..280909ef9 100644 --- a/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift +++ b/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift @@ -67,7 +67,9 @@ let connectionBenchmarks: @Sendable () -> Void = { let s = span.splitMilliseconds let wsOpen = s["ws_open"] ?? 0 let joinRecv = s["signal"] ?? s["join_recv"] ?? 0 - let answerSent = s["answer_sent"] + // Either side may initiate SDP — answer_sent in dual PC subscriber-primary + // (server-initiated offer), offer_sent in single PC / publisher-primary. + let sdpDispatched = s["answer_sent"] ?? s["offer_sent"] let pcConnected = s["pc_connected"] ?? 0 let dcOpen = s["dc_open"] @@ -75,8 +77,8 @@ let connectionBenchmarks: @Sendable () -> Void = { benchmark.measurement(dSignal, Int(joinRecv - wsOpen)) benchmark.measurement(dTransport, Int(pcConnected - joinRecv)) - if let answerSent { - benchmark.measurement(dIceDtls, Int(pcConnected - answerSent)) + if let sdpDispatched { + benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched)) } if let dcOpen { @@ -120,7 +122,7 @@ let connectionBenchmarks: @Sendable () -> Void = { let s = span.splitMilliseconds let wsOpen = s["ws_open"] ?? 0 let joinRecv = s["signal"] ?? s["join_recv"] ?? 0 - let answerSent = s["answer_sent"] + let sdpDispatched = s["answer_sent"] ?? s["offer_sent"] let pcConnected = s["pc_connected"] ?? 0 let dcOpen = s["dc_open"] @@ -128,8 +130,8 @@ let connectionBenchmarks: @Sendable () -> Void = { benchmark.measurement(dSignal, Int(joinRecv - wsOpen)) benchmark.measurement(dTransport, Int(pcConnected - joinRecv)) - if let answerSent { - benchmark.measurement(dIceDtls, Int(pcConnected - answerSent)) + if let sdpDispatched { + benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched)) } if let dcOpen { diff --git a/Sources/LiveKit/Core/DataChannelPair.swift b/Sources/LiveKit/Core/DataChannelPair.swift index e0d04a72b..79d4bb6bd 100644 --- a/Sources/LiveKit/Core/DataChannelPair.swift +++ b/Sources/LiveKit/Core/DataChannelPair.swift @@ -25,6 +25,7 @@ internal import LiveKitWebRTC protocol DataChannelDelegate: AnyObject, Sendable { func dataChannel(_ dataChannelPair: DataChannelPair, didReceiveDataPacket dataPacket: Livekit_DataPacket) func dataChannel(_ dataChannelPair: DataChannelPair, didFailToDecryptDataPacket dataPacket: Livekit_DataPacket, error: LiveKitError) + func dataChannelDidOpen(_ dataChannelPair: DataChannelPair) } // swiftlint:disable:next type_body_length @@ -282,7 +283,7 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable { channel?.delegate = self if isOpen { - openCompleter.resume(returning: ()) + handleDidOpen() } } @@ -295,10 +296,15 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable { channel?.delegate = self if isOpen { - openCompleter.resume(returning: ()) + handleDidOpen() } } + private func handleDidOpen() { + openCompleter.resume(returning: ()) + delegates.notify { $0.dataChannelDidOpen(self) } + } + func reset() { let (lossy, reliable) = _state.mutate { let result = ($0.lossy, $0.reliable) @@ -417,7 +423,7 @@ extension DataChannelPair: LKRTCDataChannelDelegate { func dataChannelDidChangeState(_: LKRTCDataChannel) { if isOpen { - openCompleter.resume(returning: ()) + handleDidOpen() } } diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index e99d9698a..a638761c0 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -165,6 +165,7 @@ extension Room { guard let self else { return } log("Publisher onOffer with offerId: \(offerId), sdp: \(offer.sdp)") try await signalClient.send(offer: offer, offerId: offerId) + connectSpan?.record("offer_sent") } // data over pub channel for backwards compatibility diff --git a/Sources/LiveKit/Core/Room+TransportDelegate.swift b/Sources/LiveKit/Core/Room+TransportDelegate.swift index de299cb59..a8f6a11c1 100644 --- a/Sources/LiveKit/Core/Room+TransportDelegate.swift +++ b/Sources/LiveKit/Core/Room+TransportDelegate.swift @@ -113,10 +113,6 @@ extension Room: TransportDelegate { case LKRTCDataChannel.Labels.lossy: subscriberDataChannel.set(lossy: dataChannel) default: log("Unknown data channel label \(dataChannel.label)", .warning) } - - if subscriberDataChannel.isOpen { - connectSpan?.record("dc_open") - } } func transportShouldNegotiate(_: Transport) {} diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 1ec3773bf..5a2b5d220 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -740,4 +740,14 @@ extension Room: DataChannelDelegate { $0.room?(self, didFailToDecryptDataWithEror: error) } } + + func dataChannelDidOpen(_ pair: DataChannelPair) { + let connectPair = _state.transport?.connectDataChannelPair( + publisher: publisherDataChannel, + subscriber: subscriberDataChannel + ) + if pair === connectPair { + connectSpan?.record("dc_open") + } + } } diff --git a/Sources/LiveKit/Core/TransportMode.swift b/Sources/LiveKit/Core/TransportMode.swift index da42529ba..c2def8eb4 100644 --- a/Sources/LiveKit/Core/TransportMode.swift +++ b/Sources/LiveKit/Core/TransportMode.swift @@ -50,6 +50,16 @@ extension TransportMode { } } + /// The data-channel pair whose open event marks "data channels ready" during + /// the connect handshake: the publisher pair (client-created) in single PC, + /// the subscriber pair (server-pushed) in dual PC. + func connectDataChannelPair(publisher: DataChannelPair, subscriber: DataChannelPair) -> DataChannelPair { + switch self { + case .publisherOnly: publisher + case .subscriberPrimary, .publisherPrimary: subscriber + } + } + /// All distinct transports (one in single PC, two in dual PC). var allTransports: [Transport] { switch self { From ec1aba4f9d06bd1a5673591cc3454ad9e9d1ad7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 5 May 2026 13:28:56 +0200 Subject: [PATCH 3/9] chore: add changeset for single-PC connect speedup Co-Authored-By: Claude Opus 4.7 (1M context) --- .changes/single-pc-connect-perf | 1 + 1 file changed, 1 insertion(+) create mode 100644 .changes/single-pc-connect-perf diff --git a/.changes/single-pc-connect-perf b/.changes/single-pc-connect-perf new file mode 100644 index 000000000..d4a74a5c0 --- /dev/null +++ b/.changes/single-pc-connect-perf @@ -0,0 +1 @@ +patch type="fixed" "Faster initial connect in single peer connection mode by skipping an unnecessary 20ms negotiate debounce" From 60a50980b3fb8a4b1f90d0696e82275930c90aff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 5 May 2026 13:34:40 +0200 Subject: [PATCH 4/9] chore(connect): wait for connect-time data channels before returning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Awaits the canonical `DataChannelPair.openCompleter` after `pc_connected` so the `dc_open` tracing split is reliably captured before `connect()` returns. Best-effort (`try?`) so a data-channel-open timeout doesn't fail connect — the primary transport is already up at this point. In healthy connects the data channels share the SCTP transport and open within ~1-2ms of DTLS, so this adds negligible blocking. Recovers `D_DC_MS` as a reliable benchmark measurement (was 1/25 samples, now 25/25). Co-Authored-By: Claude Opus 4.7 (1M context) --- Sources/LiveKit/Core/Room+Engine.swift | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index a638761c0..986ec80a9 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -299,6 +299,16 @@ extension Room { connectSpan?.record("engine") connectSpan?.record("pc_connected") + + // Wait for the data channels carrying connect-time signaling so the + // dc_open split is recorded before `connect()` returns. Best-effort: + // a timeout here doesn't fail connect (the primary transport is already up). + if let connectPair = _state.transport?.connectDataChannelPair( + publisher: publisherDataChannel, + subscriber: subscriberDataChannel + ) { + try? await connectPair.openCompleter.wait() + } } // swiftlint:disable:next cyclomatic_complexity function_body_length From 90e9ce164950fb8c0833f01218ec2308d6ac270d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 5 May 2026 13:39:59 +0200 Subject: [PATCH 5/9] refactor(tracing): drop dataChannelDidOpen delegate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Now that the connect sequence awaits the canonical pair's openCompleter and records dc_open inline, the delegate hop is dead weight. Drops the protocol method, the Room implementation, and the handleDidOpen helper — the open-state notification falls back to direct openCompleter.resume(). Co-Authored-By: Claude Opus 4.7 (1M context) --- Sources/LiveKit/Core/DataChannelPair.swift | 12 +++--------- Sources/LiveKit/Core/Room+Engine.swift | 1 + Sources/LiveKit/Core/Room.swift | 10 ---------- 3 files changed, 4 insertions(+), 19 deletions(-) diff --git a/Sources/LiveKit/Core/DataChannelPair.swift b/Sources/LiveKit/Core/DataChannelPair.swift index 79d4bb6bd..e0d04a72b 100644 --- a/Sources/LiveKit/Core/DataChannelPair.swift +++ b/Sources/LiveKit/Core/DataChannelPair.swift @@ -25,7 +25,6 @@ internal import LiveKitWebRTC protocol DataChannelDelegate: AnyObject, Sendable { func dataChannel(_ dataChannelPair: DataChannelPair, didReceiveDataPacket dataPacket: Livekit_DataPacket) func dataChannel(_ dataChannelPair: DataChannelPair, didFailToDecryptDataPacket dataPacket: Livekit_DataPacket, error: LiveKitError) - func dataChannelDidOpen(_ dataChannelPair: DataChannelPair) } // swiftlint:disable:next type_body_length @@ -283,7 +282,7 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable { channel?.delegate = self if isOpen { - handleDidOpen() + openCompleter.resume(returning: ()) } } @@ -296,15 +295,10 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable { channel?.delegate = self if isOpen { - handleDidOpen() + openCompleter.resume(returning: ()) } } - private func handleDidOpen() { - openCompleter.resume(returning: ()) - delegates.notify { $0.dataChannelDidOpen(self) } - } - func reset() { let (lossy, reliable) = _state.mutate { let result = ($0.lossy, $0.reliable) @@ -423,7 +417,7 @@ extension DataChannelPair: LKRTCDataChannelDelegate { func dataChannelDidChangeState(_: LKRTCDataChannel) { if isOpen { - handleDidOpen() + openCompleter.resume(returning: ()) } } diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index 986ec80a9..d3d7f7a89 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -308,6 +308,7 @@ extension Room { subscriber: subscriberDataChannel ) { try? await connectPair.openCompleter.wait() + connectSpan?.record("dc_open") } } diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 5a2b5d220..1ec3773bf 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -740,14 +740,4 @@ extension Room: DataChannelDelegate { $0.room?(self, didFailToDecryptDataWithEror: error) } } - - func dataChannelDidOpen(_ pair: DataChannelPair) { - let connectPair = _state.transport?.connectDataChannelPair( - publisher: publisherDataChannel, - subscriber: subscriberDataChannel - ) - if pair === connectPair { - connectSpan?.record("dc_open") - } - } } From 13998ba7c3874603361fcce2231953ddabcbf245 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 5 May 2026 13:50:54 +0200 Subject: [PATCH 6/9] revert(tracing): drop dc_open instrumentation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Recording dc_open without blocking connect on it produces a measurement inconsistent with the spec — `T_ROOM_CONNECTED` would arrive before `T_DC_OPEN`. Blocking on it (60a50980) added ~70-80ms to cloud connect latency for an internal benchmark concern. Drops: - The async dc_open recording in `fullConnectSequence` - `TransportMode.connectDataChannelPair(...)` (no remaining callers) - `D_DC_MS` from the benchmark output (kept commented for future use) Net effect: connect returns at `pc_connected` as before, no `D_DC_MS` column. Other splits (`D_WS_MS`, `D_SIGNAL_MS`, `D_TRANSPORT_MS`, `D_ICE_DTLS_MS`) are unaffected. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ConnectionBenchmark.swift | 27 +++++++++---------- Sources/LiveKit/Core/Room+Engine.swift | 11 -------- Sources/LiveKit/Core/TransportMode.swift | 10 ------- 3 files changed, 12 insertions(+), 36 deletions(-) diff --git a/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift b/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift index 280909ef9..e531b3ae7 100644 --- a/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift +++ b/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift @@ -32,16 +32,19 @@ private let dWs: BenchmarkMetric = .custom("D_WS_MS", polarity: .prefersSmaller, private let dSignal: BenchmarkMetric = .custom("D_SIGNAL_MS", polarity: .prefersSmaller, useScalingFactor: false) private let dTransport: BenchmarkMetric = .custom("D_TRANSPORT_MS", polarity: .prefersSmaller, useScalingFactor: false) private let dIceDtls: BenchmarkMetric = .custom("D_ICE_DTLS_MS", polarity: .prefersSmaller, useScalingFactor: false) -private let dDc: BenchmarkMetric = .custom("D_DC_MS", polarity: .prefersSmaller, useScalingFactor: false) +// `D_DC_MS` is not collected: the SDK does not block `connect()` on data channels +// opening, so a `dc_open` split would race with `splitMilliseconds` being read +// here. See spec/01-connection-time.md for the spec-defined `T_DC_OPEN`. +// private let dDc: BenchmarkMetric = .custom("D_DC_MS", polarity: .prefersSmaller, useScalingFactor: false) let connectionBenchmarks: @Sendable () -> Void = { // BM-CONN-001: Dual PeerConnection, subscriber-primary (default) Benchmark( "BM-CONN-001-DualPC-SubscriberPrimary", configuration: .init( - metrics: .default + [dWs, dSignal, dTransport, dIceDtls, dDc], + metrics: .default + [dWs, dSignal, dTransport, dIceDtls], timeUnits: .milliseconds, - units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count, dDc: .count], + units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count], warmupIterations: 5, scalingFactor: .one, maxDuration: .seconds(300), @@ -71,7 +74,7 @@ let connectionBenchmarks: @Sendable () -> Void = { // (server-initiated offer), offer_sent in single PC / publisher-primary. let sdpDispatched = s["answer_sent"] ?? s["offer_sent"] let pcConnected = s["pc_connected"] ?? 0 - let dcOpen = s["dc_open"] + // let dcOpen = s["dc_open"] // see note on `dDc` above benchmark.measurement(dWs, Int(wsOpen)) benchmark.measurement(dSignal, Int(joinRecv - wsOpen)) @@ -80,10 +83,7 @@ let connectionBenchmarks: @Sendable () -> Void = { if let sdpDispatched { benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched)) } - - if let dcOpen { - benchmark.measurement(dDc, Int(dcOpen - pcConnected)) - } + // if let dcOpen { benchmark.measurement(dDc, Int(dcOpen - pcConnected)) } } await room.disconnect() @@ -95,9 +95,9 @@ let connectionBenchmarks: @Sendable () -> Void = { Benchmark( "BM-CONN-003-SinglePC", configuration: .init( - metrics: .default + [dWs, dSignal, dTransport, dIceDtls, dDc], + metrics: .default + [dWs, dSignal, dTransport, dIceDtls], timeUnits: .milliseconds, - units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count, dDc: .count], + units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count], warmupIterations: 5, scalingFactor: .one, maxDuration: .seconds(300), @@ -124,7 +124,7 @@ let connectionBenchmarks: @Sendable () -> Void = { let joinRecv = s["signal"] ?? s["join_recv"] ?? 0 let sdpDispatched = s["answer_sent"] ?? s["offer_sent"] let pcConnected = s["pc_connected"] ?? 0 - let dcOpen = s["dc_open"] + // let dcOpen = s["dc_open"] // see note on `dDc` above benchmark.measurement(dWs, Int(wsOpen)) benchmark.measurement(dSignal, Int(joinRecv - wsOpen)) @@ -133,10 +133,7 @@ let connectionBenchmarks: @Sendable () -> Void = { if let sdpDispatched { benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched)) } - - if let dcOpen { - benchmark.measurement(dDc, Int(dcOpen - pcConnected)) - } + // if let dcOpen { benchmark.measurement(dDc, Int(dcOpen - pcConnected)) } } await room.disconnect() diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index d3d7f7a89..a638761c0 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -299,17 +299,6 @@ extension Room { connectSpan?.record("engine") connectSpan?.record("pc_connected") - - // Wait for the data channels carrying connect-time signaling so the - // dc_open split is recorded before `connect()` returns. Best-effort: - // a timeout here doesn't fail connect (the primary transport is already up). - if let connectPair = _state.transport?.connectDataChannelPair( - publisher: publisherDataChannel, - subscriber: subscriberDataChannel - ) { - try? await connectPair.openCompleter.wait() - connectSpan?.record("dc_open") - } } // swiftlint:disable:next cyclomatic_complexity function_body_length diff --git a/Sources/LiveKit/Core/TransportMode.swift b/Sources/LiveKit/Core/TransportMode.swift index c2def8eb4..da42529ba 100644 --- a/Sources/LiveKit/Core/TransportMode.swift +++ b/Sources/LiveKit/Core/TransportMode.swift @@ -50,16 +50,6 @@ extension TransportMode { } } - /// The data-channel pair whose open event marks "data channels ready" during - /// the connect handshake: the publisher pair (client-created) in single PC, - /// the subscriber pair (server-pushed) in dual PC. - func connectDataChannelPair(publisher: DataChannelPair, subscriber: DataChannelPair) -> DataChannelPair { - switch self { - case .publisherOnly: publisher - case .subscriberPrimary, .publisherPrimary: subscriber - } - } - /// All distinct transports (one in single PC, two in dual PC). var allTransports: [Transport] { switch self { From 769386a9b9edc4a6ecd2499690b8b19b9d1fda92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 5 May 2026 14:11:48 +0200 Subject: [PATCH 7/9] feat(connect): add Room.waitUntilDataChannelsOpen() Provides an opt-in await for the data channels created during the connect handshake. `connect()` returns at primary peer connection DTLS completion; data channels share the same SCTP transport and open shortly after but the open event fires asynchronously. Awaiting this method observes full handshake completion and records `dc_open` on the connect span. Mirrors the `RemoteParticipant.waitUntilActive(timeout:)` shape (default timeout, throwing, returns Self for chaining). The `BM-CONN` benchmarks call this between `connect()` and `stopMeasurement()` so wall-clock matches the spec's `T_ROOM_CONNECTED` and `D_DC_MS` is captured reliably. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../ConnectionBenchmark.swift | 27 ++++++----- Sources/LiveKit/Core/Room+DataChannels.swift | 46 +++++++++++++++++++ Sources/LiveKit/Core/TransportMode.swift | 10 ++++ 3 files changed, 71 insertions(+), 12 deletions(-) create mode 100644 Sources/LiveKit/Core/Room+DataChannels.swift diff --git a/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift b/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift index e531b3ae7..da5fd66b6 100644 --- a/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift +++ b/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift @@ -32,19 +32,16 @@ private let dWs: BenchmarkMetric = .custom("D_WS_MS", polarity: .prefersSmaller, private let dSignal: BenchmarkMetric = .custom("D_SIGNAL_MS", polarity: .prefersSmaller, useScalingFactor: false) private let dTransport: BenchmarkMetric = .custom("D_TRANSPORT_MS", polarity: .prefersSmaller, useScalingFactor: false) private let dIceDtls: BenchmarkMetric = .custom("D_ICE_DTLS_MS", polarity: .prefersSmaller, useScalingFactor: false) -// `D_DC_MS` is not collected: the SDK does not block `connect()` on data channels -// opening, so a `dc_open` split would race with `splitMilliseconds` being read -// here. See spec/01-connection-time.md for the spec-defined `T_DC_OPEN`. -// private let dDc: BenchmarkMetric = .custom("D_DC_MS", polarity: .prefersSmaller, useScalingFactor: false) +private let dDc: BenchmarkMetric = .custom("D_DC_MS", polarity: .prefersSmaller, useScalingFactor: false) let connectionBenchmarks: @Sendable () -> Void = { // BM-CONN-001: Dual PeerConnection, subscriber-primary (default) Benchmark( "BM-CONN-001-DualPC-SubscriberPrimary", configuration: .init( - metrics: .default + [dWs, dSignal, dTransport, dIceDtls], + metrics: .default + [dWs, dSignal, dTransport, dIceDtls, dDc], timeUnits: .milliseconds, - units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count], + units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count, dDc: .count], warmupIterations: 5, scalingFactor: .one, maxDuration: .seconds(300), @@ -63,6 +60,7 @@ let connectionBenchmarks: @Sendable () -> Void = { benchmark.startMeasurement() try await room.connect(url: config.url, token: token) + try? await room.waitUntilDataChannelsOpen() benchmark.stopMeasurement() // Extract fine-grained timestamps from the completed connect span @@ -74,7 +72,7 @@ let connectionBenchmarks: @Sendable () -> Void = { // (server-initiated offer), offer_sent in single PC / publisher-primary. let sdpDispatched = s["answer_sent"] ?? s["offer_sent"] let pcConnected = s["pc_connected"] ?? 0 - // let dcOpen = s["dc_open"] // see note on `dDc` above + let dcOpen = s["dc_open"] benchmark.measurement(dWs, Int(wsOpen)) benchmark.measurement(dSignal, Int(joinRecv - wsOpen)) @@ -83,7 +81,9 @@ let connectionBenchmarks: @Sendable () -> Void = { if let sdpDispatched { benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched)) } - // if let dcOpen { benchmark.measurement(dDc, Int(dcOpen - pcConnected)) } + if let dcOpen { + benchmark.measurement(dDc, Int(dcOpen - pcConnected)) + } } await room.disconnect() @@ -95,9 +95,9 @@ let connectionBenchmarks: @Sendable () -> Void = { Benchmark( "BM-CONN-003-SinglePC", configuration: .init( - metrics: .default + [dWs, dSignal, dTransport, dIceDtls], + metrics: .default + [dWs, dSignal, dTransport, dIceDtls, dDc], timeUnits: .milliseconds, - units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count], + units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count, dDc: .count], warmupIterations: 5, scalingFactor: .one, maxDuration: .seconds(300), @@ -116,6 +116,7 @@ let connectionBenchmarks: @Sendable () -> Void = { benchmark.startMeasurement() try await room.connect(url: config.url, token: token) + try? await room.waitUntilDataChannelsOpen() benchmark.stopMeasurement() if let span = benchmarkTracer.completedSpan("connect") { @@ -124,7 +125,7 @@ let connectionBenchmarks: @Sendable () -> Void = { let joinRecv = s["signal"] ?? s["join_recv"] ?? 0 let sdpDispatched = s["answer_sent"] ?? s["offer_sent"] let pcConnected = s["pc_connected"] ?? 0 - // let dcOpen = s["dc_open"] // see note on `dDc` above + let dcOpen = s["dc_open"] benchmark.measurement(dWs, Int(wsOpen)) benchmark.measurement(dSignal, Int(joinRecv - wsOpen)) @@ -133,7 +134,9 @@ let connectionBenchmarks: @Sendable () -> Void = { if let sdpDispatched { benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched)) } - // if let dcOpen { benchmark.measurement(dDc, Int(dcOpen - pcConnected)) } + if let dcOpen { + benchmark.measurement(dDc, Int(dcOpen - pcConnected)) + } } await room.disconnect() diff --git a/Sources/LiveKit/Core/Room+DataChannels.swift b/Sources/LiveKit/Core/Room+DataChannels.swift new file mode 100644 index 000000000..d7e8df806 --- /dev/null +++ b/Sources/LiveKit/Core/Room+DataChannels.swift @@ -0,0 +1,46 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import Foundation + +public extension Room { + /// Waits until the connect-time data channels have opened. + /// + /// ``connect(url:token:connectOptions:roomOptions:)`` returns once the + /// primary peer connection reaches DTLS completion. Data channels share + /// the same SCTP transport and typically open within a few milliseconds + /// after, but the open event is observed asynchronously. Await this + /// method when full handshake completion (readiness to send and receive + /// data) must be observed before proceeding. + /// + /// Records a `dc_open` event on ``connectSpan`` when the data channels open. + /// + /// - Parameters: + /// - timeout: The timeout for the operation. + /// - Throws: `LiveKitError` if data channels do not open within the timeout. + @discardableResult + func waitUntilDataChannelsOpen(timeout: TimeInterval = .defaultPublisherDataChannelOpen) async throws -> Self { + guard let pair = _state.transport?.connectDataChannelPair( + publisher: publisherDataChannel, + subscriber: subscriberDataChannel + ) else { + return self + } + try await pair.openCompleter.wait(timeout: timeout) + connectSpan?.record("dc_open") + return self + } +} diff --git a/Sources/LiveKit/Core/TransportMode.swift b/Sources/LiveKit/Core/TransportMode.swift index da42529ba..db768c4e8 100644 --- a/Sources/LiveKit/Core/TransportMode.swift +++ b/Sources/LiveKit/Core/TransportMode.swift @@ -50,6 +50,16 @@ extension TransportMode { } } + /// The data-channel pair that opens during the connect handshake: + /// the publisher pair (client-created) in single PC, the subscriber pair + /// (server-pushed) in dual PC. + func connectDataChannelPair(publisher: DataChannelPair, subscriber: DataChannelPair) -> DataChannelPair { + switch self { + case .publisherOnly: publisher + case .subscriberPrimary, .publisherPrimary: subscriber + } + } + /// All distinct transports (one in single PC, two in dual PC). var allTransports: [Transport] { switch self { From e531799484c869ee6a8816130bfc94c3fa904ecf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Tue, 5 May 2026 14:22:01 +0200 Subject: [PATCH 8/9] chore(connect): cancellation check after resumeQueues Matches the existing pattern of checking cancellation after each await in fullConnectSequence. Without it, a cancellation between resumeQueues and publisherShouldNegotiate could let an offer go out before the wait on primaryTransportConnectedCompleter fails. Co-Authored-By: Claude Opus 4.7 (1M context) --- Sources/LiveKit/Core/Room+Engine.swift | 1 + 1 file changed, 1 insertion(+) diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index a638761c0..ac4adf609 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -283,6 +283,7 @@ extension Room { // Resume after configuring transports... await signalClient.resumeQueues() + try Task.checkCancellation() // Eager publisher negotiation must run after `resumeQueues()` — // offers are not queueable, so sending while suspended drops them. From b37099b26e5518e85166ba424e15de39c79e5d1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?B=C5=82az=CC=87ej=20Pankowski?= <86720177+pblazej@users.noreply.github.com> Date: Fri, 8 May 2026 11:14:52 +0200 Subject: [PATCH 9/9] Revert "feat(connect): add Room.waitUntilDataChannelsOpen()" This reverts commit 769386a9b9edc4a6ecd2499690b8b19b9d1fda92. --- .../ConnectionBenchmark.swift | 27 +++++------ Sources/LiveKit/Core/Room+DataChannels.swift | 46 ------------------- Sources/LiveKit/Core/TransportMode.swift | 10 ---- 3 files changed, 12 insertions(+), 71 deletions(-) delete mode 100644 Sources/LiveKit/Core/Room+DataChannels.swift diff --git a/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift b/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift index da5fd66b6..e531b3ae7 100644 --- a/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift +++ b/Benchmarks/LiveKitBenchmark/ConnectionBenchmark.swift @@ -32,16 +32,19 @@ private let dWs: BenchmarkMetric = .custom("D_WS_MS", polarity: .prefersSmaller, private let dSignal: BenchmarkMetric = .custom("D_SIGNAL_MS", polarity: .prefersSmaller, useScalingFactor: false) private let dTransport: BenchmarkMetric = .custom("D_TRANSPORT_MS", polarity: .prefersSmaller, useScalingFactor: false) private let dIceDtls: BenchmarkMetric = .custom("D_ICE_DTLS_MS", polarity: .prefersSmaller, useScalingFactor: false) -private let dDc: BenchmarkMetric = .custom("D_DC_MS", polarity: .prefersSmaller, useScalingFactor: false) +// `D_DC_MS` is not collected: the SDK does not block `connect()` on data channels +// opening, so a `dc_open` split would race with `splitMilliseconds` being read +// here. See spec/01-connection-time.md for the spec-defined `T_DC_OPEN`. +// private let dDc: BenchmarkMetric = .custom("D_DC_MS", polarity: .prefersSmaller, useScalingFactor: false) let connectionBenchmarks: @Sendable () -> Void = { // BM-CONN-001: Dual PeerConnection, subscriber-primary (default) Benchmark( "BM-CONN-001-DualPC-SubscriberPrimary", configuration: .init( - metrics: .default + [dWs, dSignal, dTransport, dIceDtls, dDc], + metrics: .default + [dWs, dSignal, dTransport, dIceDtls], timeUnits: .milliseconds, - units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count, dDc: .count], + units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count], warmupIterations: 5, scalingFactor: .one, maxDuration: .seconds(300), @@ -60,7 +63,6 @@ let connectionBenchmarks: @Sendable () -> Void = { benchmark.startMeasurement() try await room.connect(url: config.url, token: token) - try? await room.waitUntilDataChannelsOpen() benchmark.stopMeasurement() // Extract fine-grained timestamps from the completed connect span @@ -72,7 +74,7 @@ let connectionBenchmarks: @Sendable () -> Void = { // (server-initiated offer), offer_sent in single PC / publisher-primary. let sdpDispatched = s["answer_sent"] ?? s["offer_sent"] let pcConnected = s["pc_connected"] ?? 0 - let dcOpen = s["dc_open"] + // let dcOpen = s["dc_open"] // see note on `dDc` above benchmark.measurement(dWs, Int(wsOpen)) benchmark.measurement(dSignal, Int(joinRecv - wsOpen)) @@ -81,9 +83,7 @@ let connectionBenchmarks: @Sendable () -> Void = { if let sdpDispatched { benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched)) } - if let dcOpen { - benchmark.measurement(dDc, Int(dcOpen - pcConnected)) - } + // if let dcOpen { benchmark.measurement(dDc, Int(dcOpen - pcConnected)) } } await room.disconnect() @@ -95,9 +95,9 @@ let connectionBenchmarks: @Sendable () -> Void = { Benchmark( "BM-CONN-003-SinglePC", configuration: .init( - metrics: .default + [dWs, dSignal, dTransport, dIceDtls, dDc], + metrics: .default + [dWs, dSignal, dTransport, dIceDtls], timeUnits: .milliseconds, - units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count, dDc: .count], + units: [dWs: .count, dSignal: .count, dTransport: .count, dIceDtls: .count], warmupIterations: 5, scalingFactor: .one, maxDuration: .seconds(300), @@ -116,7 +116,6 @@ let connectionBenchmarks: @Sendable () -> Void = { benchmark.startMeasurement() try await room.connect(url: config.url, token: token) - try? await room.waitUntilDataChannelsOpen() benchmark.stopMeasurement() if let span = benchmarkTracer.completedSpan("connect") { @@ -125,7 +124,7 @@ let connectionBenchmarks: @Sendable () -> Void = { let joinRecv = s["signal"] ?? s["join_recv"] ?? 0 let sdpDispatched = s["answer_sent"] ?? s["offer_sent"] let pcConnected = s["pc_connected"] ?? 0 - let dcOpen = s["dc_open"] + // let dcOpen = s["dc_open"] // see note on `dDc` above benchmark.measurement(dWs, Int(wsOpen)) benchmark.measurement(dSignal, Int(joinRecv - wsOpen)) @@ -134,9 +133,7 @@ let connectionBenchmarks: @Sendable () -> Void = { if let sdpDispatched { benchmark.measurement(dIceDtls, Int(pcConnected - sdpDispatched)) } - if let dcOpen { - benchmark.measurement(dDc, Int(dcOpen - pcConnected)) - } + // if let dcOpen { benchmark.measurement(dDc, Int(dcOpen - pcConnected)) } } await room.disconnect() diff --git a/Sources/LiveKit/Core/Room+DataChannels.swift b/Sources/LiveKit/Core/Room+DataChannels.swift deleted file mode 100644 index d7e8df806..000000000 --- a/Sources/LiveKit/Core/Room+DataChannels.swift +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2026 LiveKit - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import Foundation - -public extension Room { - /// Waits until the connect-time data channels have opened. - /// - /// ``connect(url:token:connectOptions:roomOptions:)`` returns once the - /// primary peer connection reaches DTLS completion. Data channels share - /// the same SCTP transport and typically open within a few milliseconds - /// after, but the open event is observed asynchronously. Await this - /// method when full handshake completion (readiness to send and receive - /// data) must be observed before proceeding. - /// - /// Records a `dc_open` event on ``connectSpan`` when the data channels open. - /// - /// - Parameters: - /// - timeout: The timeout for the operation. - /// - Throws: `LiveKitError` if data channels do not open within the timeout. - @discardableResult - func waitUntilDataChannelsOpen(timeout: TimeInterval = .defaultPublisherDataChannelOpen) async throws -> Self { - guard let pair = _state.transport?.connectDataChannelPair( - publisher: publisherDataChannel, - subscriber: subscriberDataChannel - ) else { - return self - } - try await pair.openCompleter.wait(timeout: timeout) - connectSpan?.record("dc_open") - return self - } -} diff --git a/Sources/LiveKit/Core/TransportMode.swift b/Sources/LiveKit/Core/TransportMode.swift index db768c4e8..da42529ba 100644 --- a/Sources/LiveKit/Core/TransportMode.swift +++ b/Sources/LiveKit/Core/TransportMode.swift @@ -50,16 +50,6 @@ extension TransportMode { } } - /// The data-channel pair that opens during the connect handshake: - /// the publisher pair (client-created) in single PC, the subscriber pair - /// (server-pushed) in dual PC. - func connectDataChannelPair(publisher: DataChannelPair, subscriber: DataChannelPair) -> DataChannelPair { - switch self { - case .publisherOnly: publisher - case .subscriberPrimary, .publisherPrimary: subscriber - } - } - /// All distinct transports (one in single PC, two in dual PC). var allTransports: [Transport] { switch self {