diff --git a/.changes/typed-disconnect-error b/.changes/typed-disconnect-error new file mode 100644 index 000000000..a29bde27f --- /dev/null +++ b/.changes/typed-disconnect-error @@ -0,0 +1 @@ +patch type="fixed" "Report transport-level disconnects as LiveKitError(.network) instead of LiveKitError(.cancelled) so consumers can distinguish network failures from user-initiated cancellation" diff --git a/Sources/LiveKit/Core/DataChannelPair.swift b/Sources/LiveKit/Core/DataChannelPair.swift index e0d04a72b..662461d4e 100644 --- a/Sources/LiveKit/Core/DataChannelPair.swift +++ b/Sources/LiveKit/Core/DataChannelPair.swift @@ -299,7 +299,7 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable { } } - func reset() { + func reset(throwing error: Error? = nil) { let (lossy, reliable) = _state.mutate { let result = ($0.lossy, $0.reliable) $0.reliable = nil @@ -312,7 +312,7 @@ class DataChannelPair: NSObject, @unchecked Sendable, Loggable { lossy?.close() reliable?.close() - openCompleter.reset() + openCompleter.reset(throwing: error) } // MARK: - Send diff --git a/Sources/LiveKit/Core/Room+Engine.swift b/Sources/LiveKit/Core/Room+Engine.swift index ac4adf609..073cd27a4 100644 --- a/Sources/LiveKit/Core/Room+Engine.swift +++ b/Sources/LiveKit/Core/Room+Engine.swift @@ -39,10 +39,10 @@ extension Room { } // Resets state of transports - func cleanUpRTC() async { + func cleanUpRTC(withError disconnectError: Error? = nil) async { // Close data channels - publisherDataChannel.reset() - subscriberDataChannel.reset() + publisherDataChannel.reset(throwing: disconnectError) + subscriberDataChannel.reset(throwing: disconnectError) await _state.transport?.close() diff --git a/Sources/LiveKit/Core/Room+TransportDelegate.swift b/Sources/LiveKit/Core/Room+TransportDelegate.swift index a8f6a11c1..5a754f9da 100644 --- a/Sources/LiveKit/Core/Room+TransportDelegate.swift +++ b/Sources/LiveKit/Core/Room+TransportDelegate.swift @@ -32,12 +32,16 @@ extension Room: TransportDelegate { func transport(_ transport: Transport, didUpdateState pcState: LKRTCPeerConnectionState) { log("target: \(transport.target), connectionState: \(pcState.description)") + let pcError: LiveKitError? = _state.connectionState.isTearingDown ? nil : LiveKitError( + .network, message: "Transport \(transport.target) state changed to \(pcState.description)" + ) + // primary connected if transport.isPrimary { if pcState.isConnected { primaryTransportConnectedCompleter.resume(returning: ()) } else if pcState.isDisconnected { - primaryTransportConnectedCompleter.reset() + primaryTransportConnectedCompleter.reset(throwing: pcError) } } @@ -46,7 +50,7 @@ extension Room: TransportDelegate { if pcState.isConnected { publisherTransportConnectedCompleter.resume(returning: ()) } else if pcState.isDisconnected { - publisherTransportConnectedCompleter.reset() + publisherTransportConnectedCompleter.reset(throwing: pcError) } } diff --git a/Sources/LiveKit/Core/Room.swift b/Sources/LiveKit/Core/Room.swift index 1ec3773bf..4c6906c19 100644 --- a/Sources/LiveKit/Core/Room.swift +++ b/Sources/LiveKit/Core/Room.swift @@ -545,15 +545,16 @@ extension Room { log("withError: \(String(describing: disconnectError)), isFullReconnect: \(isFullReconnect)") // Reset completers - _sidCompleter.reset() - primaryTransportConnectedCompleter.reset() - publisherTransportConnectedCompleter.reset() + _sidCompleter.reset(throwing: disconnectError) + primaryTransportConnectedCompleter.reset(throwing: disconnectError) + publisherTransportConnectedCompleter.reset(throwing: disconnectError) + await activeParticipantCompleters.reset(throwing: disconnectError) await signalClient.cleanUp(withError: disconnectError) // Cancel all track stats timers before closing transports to prevent // stats collection from accessing destroyed WebRTC channels. cancelTimers() - await cleanUpRTC() + await cleanUpRTC(withError: disconnectError) await cleanUpParticipants(isFullReconnect: isFullReconnect) // Cleanup for E2EE diff --git a/Sources/LiveKit/Core/SignalClient.swift b/Sources/LiveKit/Core/SignalClient.swift index 3e0ea7baa..89b021d7b 100644 --- a/Sources/LiveKit/Core/SignalClient.swift +++ b/Sources/LiveKit/Core/SignalClient.swift @@ -253,9 +253,9 @@ actor SignalClient: Loggable { $0.lastJoinResponse = nil } - _connectResponseCompleter.reset() + _connectResponseCompleter.reset(throwing: disconnectError) - await _addTrackCompleters.reset() + await _addTrackCompleters.reset(throwing: disconnectError) await _requestQueue.clear() await _responseQueue.clear() diff --git a/Sources/LiveKit/Support/Async/AsyncCompleter.swift b/Sources/LiveKit/Support/Async/AsyncCompleter.swift index a00a0a297..a661fd4a7 100644 --- a/Sources/LiveKit/Support/Async/AsyncCompleter.swift +++ b/Sources/LiveKit/Support/Async/AsyncCompleter.swift @@ -49,14 +49,14 @@ actor CompleterMapActor { } func resume(throwing error: any Error, for key: String) { - let completer = completer(for: key) + guard let completer = _completerMap[key] else { return } completer.resume(throwing: error) } - func reset() { + func reset(throwing error: Error? = nil) { // Reset call completers... for (_, value) in _completerMap { - value.reset() + value.reset(throwing: error) } // Clear all completers... _completerMap.removeAll() @@ -69,8 +69,8 @@ final class AsyncCompleter: @unchecked Sendable, Loggable { let continuation: CheckedContinuation let timeoutBlock: DispatchWorkItem - func cancel() { - continuation.resume(throwing: LiveKitError(.cancelled)) + func cancel(throwing error: LiveKitError? = nil) { + continuation.resume(throwing: error ?? LiveKitError(.cancelled)) timeoutBlock.cancel() } @@ -96,6 +96,10 @@ final class AsyncCompleter: @unchecked Sendable, Loggable { private let _lock: some Lock = createLock() + var waiterCount: Int { + _lock.sync { _entries.count } + } + init(label: String, defaultTimeout: TimeInterval) { self.label = label _defaultTimeout = defaultTimeout.toDispatchTimeInterval @@ -111,10 +115,10 @@ final class AsyncCompleter: @unchecked Sendable, Loggable { } } - func reset() { + func reset(throwing error: Error? = nil) { _lock.sync { for entry in _entries.values { - entry.cancel() + entry.cancel(throwing: LiveKitError.from(error: error)) } _entries.removeAll() _result = nil diff --git a/Sources/LiveKit/Types/ConnectionState.swift b/Sources/LiveKit/Types/ConnectionState.swift index e2cf4f884..e41ca5da8 100644 --- a/Sources/LiveKit/Types/ConnectionState.swift +++ b/Sources/LiveKit/Types/ConnectionState.swift @@ -43,3 +43,9 @@ extension ConnectionState: Identifiable { rawValue } } + +extension ConnectionState { + var isTearingDown: Bool { + self == .disconnecting || self == .disconnected + } +} diff --git a/Tests/LiveKitAudioTests/PublishDeviceOptimization.swift b/Tests/LiveKitAudioTests/PublishDeviceOptimization.swift index dcc0ede8a..84c619889 100644 --- a/Tests/LiveKitAudioTests/PublishDeviceOptimization.swift +++ b/Tests/LiveKitAudioTests/PublishDeviceOptimization.swift @@ -27,20 +27,20 @@ import LiveKitTestSupport // Default publish flow @Test func defaultMicPublish() async throws { - var sw = Stopwatch(label: "Test: Normal publish sequence") + let span = Span(label: "Test: Normal publish sequence") let room1Opts = RoomTestingOptions(url: url, token: token, canPublish: true) try await TestEnvironment.withRooms([room1Opts]) { rooms in - sw.split(label: "Connected to room") + span.record("Connected to room") // Alias to Rooms let room1 = rooms[0] try await room1.localParticipant.setMicrophone(enabled: true) - sw.split(label: "Did publish mic") + span.record("Did publish mic") } - sw.split(label: "Sequence complete") - print(sw) + span.record("Sequence complete") + print(span) - print("Total time: \(sw.total())") + print("Total time: \(span.total())") } // No-VP publish flow @@ -48,39 +48,39 @@ import LiveKitTestSupport // Turn off Apple's VP try AudioManager.shared.setVoiceProcessingEnabled(false) - var sw = Stopwatch(label: "Test: No-VP publish sequence") + let span = Span(label: "Test: No-VP publish sequence") let room1Opts = RoomTestingOptions(url: url, token: token, canPublish: true) try await TestEnvironment.withRooms([room1Opts]) { rooms in - sw.split(label: "Connected to room") + span.record("Connected to room") // Alias to Rooms let room1 = rooms[0] try await room1.localParticipant.setMicrophone(enabled: true) - sw.split(label: "Did publish mic") + span.record("Did publish mic") } - sw.split(label: "Sequence complete") - print(sw) + span.record("Sequence complete") + print(span) - print("Total time: \(sw.total())") + print("Total time: \(span.total())") } // Concurrent device acquisition publish flow @Test func concurrentMicPublish() async throws { - var sw = Stopwatch(label: "Test: Normal publish sequence") + let span = Span(label: "Test: Normal publish sequence") let room1Opts = RoomTestingOptions(url: url, token: token, enableMicrophone: true, canPublish: true) try await TestEnvironment.withRooms([room1Opts]) { rooms in - sw.split(label: "Connected to room") + span.record("Connected to room") // Alias to Rooms let room1 = rooms[0] // Mic should be already enabled at this point let isMicEnabled = room1.localParticipant.isMicrophoneEnabled() #expect(isMicEnabled, "Mic should be enabled at this point") - sw.split(label: "Did publish mic") + span.record("Did publish mic") } - sw.split(label: "Sequence complete") - print(sw) + span.record("Sequence complete") + print(span) - print("Total time: \(sw.total())") + print("Total time: \(span.total())") } } diff --git a/Tests/LiveKitCoreTests/CompleterTests.swift b/Tests/LiveKitCoreTests/CompleterTests.swift index 177d9abd6..6c63b96aa 100644 --- a/Tests/LiveKitCoreTests/CompleterTests.swift +++ b/Tests/LiveKitCoreTests/CompleterTests.swift @@ -108,4 +108,125 @@ struct CompleterTests { print("Unknown error: \(error)") } } + + @Test func resetThrowingPropagatesTypedError() async { + let completer = AsyncCompleter(label: "reset-throwing", defaultTimeout: 30) + let task = Task { try await completer.wait() } + await waitForRegistration(of: completer) + + completer.reset(throwing: LiveKitError(.network, message: "transport failed")) + + await expectLiveKitError(.network, from: task) + } + + @Test func taskCancellationStillProducesCancelled() async { + let completer = AsyncCompleter(label: "task-cancel", defaultTimeout: 30) + let task = Task { try await completer.wait() } + await waitForRegistration(of: completer) + + task.cancel() + + await expectLiveKitError(.cancelled, from: task) + } + + @Test func resetClearsResultForReuse() async throws { + let completer = AsyncCompleter(label: "reuse-after-throw", defaultTimeout: 30) + + let firstTask = Task { try await completer.wait() } + await waitForRegistration(of: completer) + completer.reset(throwing: LiveKitError(.network)) + _ = await firstTask.result + + let secondTask = Task { try await completer.wait() } + await waitForRegistration(of: completer) + completer.resume(returning: ()) + try await secondTask.value + } +} + +private func waitForRegistration(of completer: AsyncCompleter) async { + while completer.waiterCount == 0 { + await Task.yield() + } +} + +private func expectLiveKitError(_ expected: LiveKitErrorType, from task: Task) async { + do { + _ = try await task.value + Issue.record("Expected LiveKitError(.\(expected)) to be thrown") + } catch let error as LiveKitError { + #expect(error.type == expected) + } catch { + Issue.record("Expected LiveKitError, got \(error)") + } +} + +@Suite(.tags(.concurrency)) +struct CompleterMapActorTests { + @Test func resetThrowingFanOutsTypedErrorToAllCompleters() async { + let map = CompleterMapActor(label: "map-test", defaultTimeout: 30) + + let completerA = await map.completer(for: "a") + let completerB = await map.completer(for: "b") + + let taskA = Task { try await completerA.wait() } + let taskB = Task { try await completerB.wait() } + + await waitForRegistration(of: completerA) + await waitForRegistration(of: completerB) + + await map.reset(throwing: LiveKitError(.network, message: "fan-out")) + + await expectLiveKitError(.network, from: taskA) + await expectLiveKitError(.network, from: taskB) + } + + @Test func resetWithoutErrorDefaultsToCancelled() async { + let map = CompleterMapActor(label: "map-test", defaultTimeout: 30) + let completer = await map.completer(for: "a") + let task = Task { try await completer.wait() } + + await waitForRegistration(of: completer) + + await map.reset() + + await expectLiveKitError(.cancelled, from: task) + } + + @Test func resumeThrowingForMissingKeyIsNoOp() async throws { + let map = CompleterMapActor(label: "no-op-test", defaultTimeout: 30) + + // No completer for the key yet — resume(throwing:) must not auto-create. + await map.resume(throwing: LiveKitError(.participantRemoved), for: "absent") + + // Subsequent wait on the same key must NOT see a stale "remembered" failure. + let completer = await map.completer(for: "absent") + let task = Task { try await completer.wait() } + await waitForRegistration(of: completer) + completer.resume(returning: ()) + try await task.value + } + + @Test func resumeReturningForMissingKeyRemembersSuccess() async throws { + let map = CompleterMapActor(label: "remember-success", defaultTimeout: 30) + + // resume(returning:) on a missing key creates and remembers the value. + await map.resume(returning: (), for: "key") + + // A later wait must see the success immediately. + let completer = await map.completer(for: "key") + try await completer.wait() + } + + @Test func resumeThrowingReachesExistingWaiter() async { + let map = CompleterMapActor(label: "existing-waiter", defaultTimeout: 30) + + let completer = await map.completer(for: "key") + let task = Task { try await completer.wait() } + await waitForRegistration(of: completer) + + await map.resume(throwing: LiveKitError(.network), for: "key") + + await expectLiveKitError(.network, from: task) + } } diff --git a/Tests/LiveKitCoreTests/Room/RoomTests.swift b/Tests/LiveKitCoreTests/Room/RoomTests.swift index 66ad13352..7f82e81f4 100644 --- a/Tests/LiveKitCoreTests/Room/RoomTests.swift +++ b/Tests/LiveKitCoreTests/Room/RoomTests.swift @@ -152,8 +152,7 @@ private struct WeakRoomRefs: @unchecked Sendable { localParticipant = room.localParticipant for remoteParticipant in room.remoteParticipants.values { - weak var weakRP: RemoteParticipant? = remoteParticipant - remoteParticipantChecks.append { weakRP == nil } + remoteParticipantChecks.append { [weak remoteParticipant] in remoteParticipant == nil } } state = room._state