-
Notifications
You must be signed in to change notification settings - Fork 1
Meeting: harden realtime fallback and async jobs completion #39
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
d7e1645
e22c98b
b35623a
4a42ed3
2f3a496
d2bd3cc
3b012d8
874a5cf
a715a74
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,7 +6,7 @@ final class AsyncTranscriptionJobService { | |
| SettingsStorage.shared.proxyBaseURL.trimmingCharacters(in: CharacterSet(charactersIn: "/")) | ||
| } | ||
|
|
||
| private let maxRetries = 3 | ||
| private let maxJobWaitSeconds: TimeInterval = 7200 | ||
| private let maxAudioBytesForSpeechPrecheck = 25 * 1024 * 1024 | ||
| private let longRunningSessionBodyThresholdBytes = 10 * 1024 * 1024 | ||
| private let strictSpeechPrecheck = false | ||
|
|
@@ -167,18 +167,19 @@ final class AsyncTranscriptionJobService { | |
| try Task.checkCancellation() | ||
| let submission = try await submitJob(audioData: audioData, config: config) | ||
|
|
||
| var retries = 0 | ||
| var sseFailures = 0 | ||
| let deadline = Date().addingTimeInterval(maxJobWaitSeconds) | ||
|
|
||
| while retries < self.maxRetries { | ||
| while Date() < deadline { | ||
| try Task.checkCancellation() | ||
| do { | ||
| let result = try await streamJobResult(jobId: submission.jobId, onUpdate: onUpdate) | ||
| return result.text | ||
| } catch is CancellationError { | ||
| throw CancellationError() | ||
| } catch { | ||
| retries += 1 | ||
| Log.transcription.warning("SSE stream failed (attempt \(retries)/\(self.maxRetries)): \(error)") | ||
| sseFailures += 1 | ||
| Log.transcription.warning("SSE stream failed (attempt \(sseFailures)): \(error)") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use This changed line uses As per coding guidelines, " 🤖 Prompt for AI AgentsSource: Coding guidelines |
||
|
|
||
| // Check if job finished while disconnected | ||
| let status = try await getJobStatus(jobId: submission.jobId) | ||
|
|
@@ -188,14 +189,19 @@ final class AsyncTranscriptionJobService { | |
| if status.status == "error" { | ||
| throw TranscriptionError.apiError(status.error ?? "Transcription failed") | ||
| } | ||
| if let parsed = JobStatus(rawValue: status.status) { | ||
| onUpdate(parsed) | ||
| } | ||
|
|
||
| // Still in progress — backoff and retry SSE | ||
| // Still in progress. SSE is best-effort; keep polling/retrying until | ||
| // the server-side job reaches a terminal state or the long job timeout. | ||
| try Task.checkCancellation() | ||
| try await Task.sleep(nanoseconds: UInt64(retries) * 2_000_000_000) | ||
| let delaySeconds = min(Double(max(sseFailures, 1)) * 2, 30) | ||
| try await Task.sleep(nanoseconds: UInt64(delaySeconds * 1_000_000_000)) | ||
| } | ||
| } | ||
|
|
||
| throw TranscriptionError.apiError("Failed to get transcription result after \(self.maxRetries) retries") | ||
| throw TranscriptionError.apiError("Timed out waiting for transcription result") | ||
| } | ||
|
|
||
| // MARK: - Upload Preparation | ||
|
|
@@ -405,8 +411,13 @@ final class AsyncTranscriptionJobService { | |
| guard let jsonData = data.data(using: .utf8) else { | ||
| throw TranscriptionError.invalidResponse | ||
| } | ||
| let result = try JSONDecoder().decode(JobTranscriptionResult.self, from: jsonData) | ||
| return JobResult(text: result.text) | ||
| if let wrapped = try? JSONDecoder().decode(JobStatusResponse.self, from: jsonData), | ||
| let result = wrapped.result | ||
| { | ||
| return JobResult(text: result.text) | ||
| } | ||
| let direct = try JSONDecoder().decode(JobTranscriptionResult.self, from: jsonData) | ||
| return JobResult(text: direct.text) | ||
| } | ||
|
|
||
| private func parseErrorMessage(_ data: String) -> String { | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -150,7 +150,17 @@ final class CloudRealtimeService: NSObject, @unchecked Sendable { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let configString = String(data: configData, encoding: .utf8) ?? "{}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NSLog("[Cloud RT] Sending config: %@", configString) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try await task.send(.string(configString)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| do { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try await task.send(.string(configString)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // A refused upgrade (e.g. HTTP 402 usage limit) surfaces as the first | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // send/receive throwing. Map 402 to a typed usage error so the caller | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // shows "limit reached" instead of a generic connection failure. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if let usageError = await usageLimitUpgradeError() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw usageError | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw error | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| NSLog("[Cloud RT] Config sent successfully, WebSocket connected") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isConnected = true | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -417,6 +427,21 @@ final class CloudRealtimeService: NSObject, @unchecked Sendable { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // MARK: - Reconnect | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// If the last WS upgrade was refused with HTTP 402, map it to a typed usage | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// error (using the best usage numbers we have) and kick off a refresh so the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// UI shows accurate figures shortly. Returns nil for any other status. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private func usageLimitUpgradeError() async -> RealtimeTranscriptionError? { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| guard (webSocketTask?.response as? HTTPURLResponse)?.statusCode == 402 else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let usage = await UsageService.shared.cachedUsage | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await UsageService.shared.refresh() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return .usageLimitExceeded( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| usedHours: usage?.usedHours ?? 0, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| limitHours: usage?.limitHours ?? 5 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+433
to
+442
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Build the usage-limit error from the refreshed cache. Both 402 paths snapshot Also applies to: 465-470 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// Called when the receive loop exits due to an error or a server-initiated close. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// ADR-0004 edge cases handled here: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -428,6 +453,26 @@ final class CloudRealtimeService: NSObject, @unchecked Sendable { | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| guard isConnected else { return } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| isConnected = false | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // A refused WS upgrade (HTTP 402 usage limit) lands here via the receive | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // loop with no close code. Reconnecting is futile — the server will keep | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // refusing — and would surface a generic "Connection lost" instead of the | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // real reason. Detect it synchronously to stop the reconnect, then surface | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // the typed usage error with the best numbers we have. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (webSocketTask?.response as? HTTPURLResponse)?.statusCode == 402 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Log.transcription.warning("Cloud RT: WS upgrade returned 402 — usage limit, not reconnecting") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Task { [weak self] in | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| guard let self else { return } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let usage = await UsageService.shared.cachedUsage | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await UsageService.shared.refresh() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.onError?(RealtimeTranscriptionError.usageLimitExceeded( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| usedHours: usage?.usedHours ?? 0, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| limitHours: usage?.limitHours ?? 5 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| )) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.onConnectionStatusChanged?(.failed("Cloud usage limit reached")) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+456
to
+474
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tear down the failed socket before returning on 402. This branch returns before either cleanup path runs. Since the ping loop is already started earlier, the dead Suggested fix if (webSocketTask?.response as? HTTPURLResponse)?.statusCode == 402 {
+ pingTask?.cancel()
+ pingTask = nil
+ receiveTask?.cancel()
+ receiveTask = nil
+ webSocketTask?.cancel(with: .normalClosure, reason: nil)
+ webSocketTask = nil
+ urlSession?.invalidateAndCancel()
+ urlSession = nil
Log.transcription.warning("Cloud RT: WS upgrade returned 402 — usage limit, not reconnecting")
Task { [weak self] in
guard let self else { return }
let usage = await UsageService.shared.cachedUsage
await UsageService.shared.refresh()📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // 1001 Going Away — proxy-initiated graceful close (8h cap or rolling restart). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Per ADR-0004: save partial transcript, show non-error UI, do NOT reconnect. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if closeCode?.rawValue == 1001 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,4 @@ | ||
| import AppKit | ||
| import XCTest | ||
| @testable import Diduny | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Keep the sleep-flush path within the coordinator’s timing budget.
SleepFlushCoordinator.flushCurrentChunkis documented to finish on the power-management thread within ~250 ms, but this path can now block for up to 2 seconds on actor/file I/O. That can stall system sleep and still leaves you without a confirmed manifest write if the timeout fires. Please move this persistence onto a truly bounded synchronous path, or at minimum cap the wait to the coordinator budget and treat timeout as a failed flush.Also applies to: 87-90
🤖 Prompt for AI Agents