diff --git a/apps/desktop/src-tauri/src/main.rs b/apps/desktop/src-tauri/src/main.rs index 9b8d75fb..2de7adde 100644 --- a/apps/desktop/src-tauri/src/main.rs +++ b/apps/desktop/src-tauri/src/main.rs @@ -10,13 +10,12 @@ use std::{ process::{Command, Stdio}, sync::{ atomic::{AtomicU64, AtomicUsize, Ordering}, - mpsc, - Arc, Mutex, + mpsc, Arc, Mutex, }, thread, time::{Duration, Instant}, }; -use tauri::{Manager, Runtime}; +use tauri::{Emitter, Manager, Runtime}; use time::{format_description::well_known::Rfc3339, OffsetDateTime}; #[derive(Clone)] @@ -602,12 +601,21 @@ fn failed_status( } } -fn store_status(state: &AppState, status: AnalysisJobStatus) { +fn store_status(state: &AppState, status: &AnalysisJobStatus) { if let Ok(mut jobs) = state.0.jobs.lock() { - jobs.insert(status.job_id.clone(), status); + jobs.insert(status.job_id.clone(), status.clone()); } } +fn store_status_and_emit( + state: &AppState, + app: &tauri::AppHandle, + status: &AnalysisJobStatus, +) { + store_status(state, status); + let _ = app.emit("analysis-job-updated", status); +} + fn store_bootstrap_source(state: &AppState, summary: ProjectBootstrapSummaryPayload) { if let Ok(mut sources) = state.0.bootstrap_sources.lock() { sources.insert(summary.project_id.clone(), summary); @@ -629,17 +637,19 @@ fn lookup_bootstrap_source( fn drain_analysis_status_updates( state: &AppState, + app: &tauri::AppHandle, status_rx: &mpsc::Receiver, last_status: &mut Option, ) { while let Ok(status) = status_rx.try_recv() { - store_status(state, status.clone()); + store_status_and_emit(state, app, &status); *last_status = Some(status); } } fn run_analysis_engine( state: AppState, + app: tauri::AppHandle, job_id: String, request: AnalysisJobRequest, requested_at: String, @@ -749,7 +759,7 @@ fn run_analysis_engine( let mut last_status = None; let exit_status; loop { - drain_analysis_status_updates(&state, &status_rx, &mut last_status); + drain_analysis_status_updates(&state, &app, &status_rx, &mut last_status); match process.try_wait() { Ok(Some(status)) => { exit_status = status; @@ -792,7 +802,7 @@ fn run_analysis_engine( } let reader_last_status = stdout_reader.join().unwrap_or(None); let _ = stderr_reader.join(); - drain_analysis_status_updates(&state, &status_rx, &mut last_status); + drain_analysis_status_updates(&state, &app, &status_rx, &mut last_status); if last_status.is_none() { last_status = reader_last_status; } @@ -823,7 +833,11 @@ fn run_analysis_engine( } #[tauri::command] -fn start_analysis_job(request: Value, state: tauri::State<'_, AppState>) -> AnalysisJobStatus { +fn start_analysis_job( + request: Value, + app: tauri::AppHandle, + state: tauri::State<'_, AppState>, +) -> AnalysisJobStatus { let requested_at = iso_timestamp_now(); let mut parsed_request = match parse_request_payload(request) { Ok(parsed_request) => parsed_request, @@ -884,13 +898,15 @@ fn start_analysis_job(request: Value, state: tauri::State<'_, AppState>) -> Anal result: None, error: None, }; - store_status(&state, queued.clone()); + store_status_and_emit(&state, &app, &queued); let app_state = state.inner().clone(); + let worker_app_handle = app.clone(); std::thread::spawn(move || { - store_status( + store_status_and_emit( &app_state, - AnalysisJobStatus { + &worker_app_handle, + &AnalysisJobStatus { job_id: job_id.clone(), state: AnalysisJobState::Running, requested_at: requested_at.clone(), @@ -903,8 +919,14 @@ fn start_analysis_job(request: Value, state: tauri::State<'_, AppState>) -> Anal error: None, }, ); - let finished = run_analysis_engine(app_state.clone(), job_id, parsed_request, requested_at); - store_status(&app_state, finished); + let finished = run_analysis_engine( + app_state.clone(), + worker_app_handle.clone(), + job_id, + parsed_request, + requested_at, + ); + store_status_and_emit(&app_state, &worker_app_handle, &finished); release_job_slot(&app_state); }); @@ -1385,7 +1407,10 @@ mod tests { "YouTube import timed out.", ); - assert_eq!(result.expect_err("slow child should time out"), "YouTube import timed out."); + assert_eq!( + result.expect_err("slow child should time out"), + "YouTube import timed out." + ); } #[test] diff --git a/apps/desktop/src/App.test.tsx b/apps/desktop/src/App.test.tsx index fa924ece..387f6bcb 100644 --- a/apps/desktop/src/App.test.tsx +++ b/apps/desktop/src/App.test.tsx @@ -5,7 +5,9 @@ import { App } from "./App"; const tauriInvoke = vi.fn(); const mockLoadProject = vi.fn(); const mockSaveProject = vi.fn(); +const mockSubscribeToAnalysisJobUpdates = vi.fn(); let mockImportYoutubeUrlError = false; +let latestStatusSubscription: ((payload: Record) => void) | null = null; type TauriWindow = Window & { __TAURI_INTERNALS__?: unknown; @@ -30,6 +32,8 @@ vi.mock("./lib/analysis", async (importActual) => { sourceLabel: "Late Night Set", roleFocus: ["bass-guitar", "keys-right", "lead-vocal"] }), + subscribeToAnalysisJobUpdates: (...args: Parameters) => + mockSubscribeToAnalysisJobUpdates(...args), loadProject: () => mockLoadProject(), saveProject: (song: unknown) => mockSaveProject(song) }; @@ -171,7 +175,17 @@ describe("App", () => { tauriInvoke.mockReset(); mockLoadProject.mockReset(); mockSaveProject.mockReset(); + mockSubscribeToAnalysisJobUpdates.mockReset(); mockImportYoutubeUrlError = false; + latestStatusSubscription = null; + mockSubscribeToAnalysisJobUpdates.mockImplementation( + async (_jobId: string, onUpdate: (status: Record) => void) => { + latestStatusSubscription = onUpdate; + return () => { + latestStatusSubscription = null; + }; + } + ); delete tauriWindow.__TAURI_INTERNALS__; tauriWindow.__TAURI_INVOKE__ = tauriInvoke; }); @@ -407,6 +421,52 @@ describe("App", () => { ); }); + it("applies pushed analysis status updates over the IPC event bridge", async () => { + tauriInvoke + .mockResolvedValueOnce(bootstrapResponse()) + .mockResolvedValueOnce(jobStatusResponse({ + jobId: "job-push-1", + state: "queued", + progressLabel: "Queued for analysis" + })); + + render(); + + fireEvent.click(screen.getByRole("button", { name: /choose local audio/i })); + await waitFor(() => { + expect(screen.getByText(/late-night-set\.wav/i)).toBeTruthy(); + }); + + fireEvent.click(screen.getByRole("button", { name: /start analysis/i })); + await waitFor(() => { + expect(screen.getByText(/queued for analysis/i)).toBeTruthy(); + }); + await waitFor(() => { + expect(mockSubscribeToAnalysisJobUpdates).toHaveBeenCalledWith( + "job-push-1", + expect.any(Function) + ); + }); + + latestStatusSubscription?.( + jobStatusResponse({ + jobId: "job-push-1", + state: "running", + progressLabel: "Separating stems... (45%)", + progressStage: "separate", + progressPercent: 45 + }) + ); + await waitFor(() => { + expect(screen.getByText(/separating stems/i)).toBeTruthy(); + }); + + latestStatusSubscription?.(succeededResult()); + await waitFor(() => { + expect(screen.getByRole("heading", { name: /Late Night Set/i })).toBeTruthy(); + }); + }); + it("keeps handoff metadata tied to the source that produced the current result", async () => { const originalCreateObjectUrl = URL.createObjectURL; const originalRevokeObjectUrl = URL.revokeObjectURL; diff --git a/apps/desktop/src/App.tsx b/apps/desktop/src/App.tsx index 0ee97858..37d4b43d 100644 --- a/apps/desktop/src/App.tsx +++ b/apps/desktop/src/App.tsx @@ -1,4 +1,4 @@ -import { useEffect, useMemo, useState, type ReactNode } from "react"; +import { useCallback, useEffect, useMemo, useState, type ReactNode } from "react"; import { AudioWaveform, CircleHelp, @@ -36,6 +36,7 @@ import { isSupportedYoutubeUrl, loadProject, saveProject, + subscribeToAnalysisJobUpdates, selectLocalAudioSource, startAnalysisJob } from "./lib/analysis"; @@ -177,6 +178,7 @@ export function App() { const [jobResult, setJobResult] = useState(null); const [jobResultBootstrap, setJobResultBootstrap] = useState(null); const [jobError, setJobError] = useState(null); + const [renderedProgressPercent, setRenderedProgressPercent] = useState(undefined); const [isStarting, setIsStarting] = useState(false); const [selectedBootstrap, setSelectedBootstrap] = useState(null); const [activeAnalysisBootstrap, setActiveAnalysisBootstrap] = useState(null); @@ -194,6 +196,71 @@ export function App() { } : defaultRequest; + /** Documented. */ + const applyJobStatus = useCallback((nextStatus: AnalysisJobStatus) => { + setJobStatus(nextStatus); + if (nextStatus.state === "succeeded" && nextStatus.result) { + setJobResult(nextStatus.result); + setJobResultBootstrap(activeAnalysisBootstrap); + setActiveAnalysisBootstrap(null); + setJobError(null); + } + if (nextStatus.state === "failed") { + setActiveAnalysisBootstrap(null); + setJobError(nextStatus.error?.message ?? t("analysisCouldNotStart")); + } + }, [activeAnalysisBootstrap, t]); + + useEffect(() => { + const targetPercent = jobStatus?.progressPercent; + if (targetPercent === undefined) { + setRenderedProgressPercent(undefined); + return; + } + if (jobStatus?.state === "failed" || jobStatus?.state === "succeeded") { + setRenderedProgressPercent(targetPercent); + return; + } + + const currentPercent = renderedProgressPercent ?? 0; + if (currentPercent >= targetPercent) { + setRenderedProgressPercent(targetPercent); + return; + } + + const timer = window.setTimeout(() => { + setRenderedProgressPercent((current) => { + const base = current ?? 0; + return Math.min(targetPercent, base + 1); + }); + }, 20); + return () => window.clearTimeout(timer); + }, [jobStatus?.progressPercent, jobStatus?.state, renderedProgressPercent]); + + useEffect(() => { + if (!jobStatus || (jobStatus.state !== "queued" && jobStatus.state !== "running")) { + return; + } + + let disposed = false; + let unsubscribe: () => void = Function.prototype as () => void; + void subscribeToAnalysisJobUpdates(jobStatus.jobId, (nextStatus) => { + if (!disposed) { + applyJobStatus(nextStatus); + } + }).then((cleanup) => { + if (disposed) { + cleanup(); + return; + } + unsubscribe = cleanup; + }); + return () => { + disposed = true; + unsubscribe(); + }; + }, [applyJobStatus, jobStatus?.jobId, jobStatus?.state]); + useEffect(() => { if (!jobStatus || (jobStatus.state !== "queued" && jobStatus.state !== "running")) { return; @@ -202,17 +269,7 @@ export function App() { const timer = window.setTimeout(async () => { try { const nextStatus = await getAnalysisJobStatus(jobStatus.jobId); - setJobStatus(nextStatus); - if (nextStatus.state === "succeeded" && nextStatus.result) { - setJobResult(nextStatus.result); - setJobResultBootstrap(activeAnalysisBootstrap); - setActiveAnalysisBootstrap(null); - setJobError(null); - } - if (nextStatus.state === "failed") { - setActiveAnalysisBootstrap(null); - setJobError(nextStatus.error?.message ?? t("analysisCouldNotStart")); - } + applyJobStatus(nextStatus); } catch (error) { if (error instanceof Error && error.message === "Invalid analysis job status response") { const fallbackMessage = t("analysisCouldNotStart"); @@ -242,7 +299,7 @@ export function App() { }, ANALYSIS_POLL_INTERVAL_MS); return () => window.clearTimeout(timer); - }, [activeAnalysisBootstrap, jobStatus, t]); + }, [applyJobStatus, jobStatus, t]); /** Documented. */ const handleStartAnalysis = async () => { @@ -255,15 +312,13 @@ export function App() { setIsStarting(true); try { const nextStatus = await startAnalysisJob(selectedRequest); - setJobStatus(nextStatus); if (nextStatus.state === "succeeded" && nextStatus.result) { + setJobStatus(nextStatus); setJobResult(nextStatus.result); setJobResultBootstrap(submittedBootstrap); setActiveAnalysisBootstrap(null); - } - if (nextStatus.state === "failed") { - setActiveAnalysisBootstrap(null); - setJobError(nextStatus.error?.message ?? t("analysisCouldNotStart")); + } else { + applyJobStatus(nextStatus); } } catch { setJobStatus(null); @@ -585,13 +640,15 @@ export function App() { {jobStatus.state === "running" && } {progressMessage(t, jobStatus)} {jobStatus.progressPercent !== undefined && ( - {jobStatus.progressPercent}% + + {(renderedProgressPercent ?? jobStatus.progressPercent)}% + )} {jobStatus.progressPercent !== undefined && ( )} diff --git a/apps/desktop/src/lib/analysis.ts b/apps/desktop/src/lib/analysis.ts index 5a5975fa..f6ee7c00 100644 --- a/apps/desktop/src/lib/analysis.ts +++ b/apps/desktop/src/lib/analysis.ts @@ -14,6 +14,7 @@ import { type ProjectBootstrapSummary, type RehearsalSong } from "@bandscope/shared-types"; +import { listen } from "@tauri-apps/api/event"; type TauriInvoke = (command: string, args?: Record) => Promise; @@ -273,6 +274,38 @@ export async function getAnalysisJobStatus(jobId: string): Promise void +): Promise<() => void> { + if (typeof window === "undefined") { + return () => undefined; + } + const tauriInternals = window.__TAURI_INTERNALS__; + if (!tauriInternals || typeof tauriInternals.invoke !== "function") { + return () => undefined; + } + + try { + const unlisten = await listen("analysis-job-updated", (event) => { + try { + const status = parseAnalysisJobStatus(event.payload); + if (status.jobId === jobId) { + onUpdate(status); + } + } catch { + // Ignore malformed status payloads and keep polling fallback active. + } + }); + return () => { + void unlisten(); + }; + } catch { + return () => undefined; + } +} + /** Documented. */ export async function importYoutubeUrl(url: string): Promise { if (!isSupportedYoutubeUrl(url)) { diff --git a/services/analysis-engine/src/bandscope_analysis/api.py b/services/analysis-engine/src/bandscope_analysis/api.py index c08ca99f..1d8c24ad 100644 --- a/services/analysis-engine/src/bandscope_analysis/api.py +++ b/services/analysis-engine/src/bandscope_analysis/api.py @@ -4,9 +4,15 @@ import hashlib import json +import multiprocessing as mp +import queue +import time +from contextlib import suppress from pathlib import Path from typing import Any, Literal, NotRequired, TypedDict, cast +import numpy as np + from bandscope_analysis.health import HealthReport, build_health_report from bandscope_analysis.roles import RoleExtractor from bandscope_analysis.sections import extract_sections @@ -14,6 +20,8 @@ MAX_SECTION_TIME_SECONDS = 4_294_967_295 ANALYSIS_CACHE_SCHEMA_VERSION = 1 +FEATURE_CACHE_SCHEMA_VERSION = 1 +STEM_SEPARATION_TIMEOUT_SECONDS = 20.0 AnalysisJobState = Literal["queued", "running", "succeeded", "failed"] AnalysisJobStage = Literal["queued", "decode", "separate", "analyze", "persist", "ready"] @@ -171,6 +179,20 @@ class CachedAnalysisPayload(TypedDict): result: RehearsalSong +class CachedFeaturePayload(TypedDict): + """Typed cached feature metadata persisted beside stem arrays.""" + + schemaVersion: int + source: dict[str, object] + sampleRate: int + separation: dict[str, object] + stemKeys: list[str] + + +class StemSeparationTimedOut(RuntimeError): + """Raised when local stem separation exceeds the orchestration timeout.""" + + def build_section_time_range(start: object, end: object) -> SectionTimeRangePayload: """Build a section time range that matches the shared Rust u32 timing contract.""" if ( @@ -388,6 +410,40 @@ def _analysis_cache_path(request: AnalysisJobRequest) -> Path | None: return Path(cache_root) / "analysis-cache-v1" / f"{digest}.json" +def _feature_cache_paths(request: AnalysisJobRequest) -> tuple[Path, Path] | None: + """Return metadata + array cache paths for intermediate local-audio features.""" + analysis_cache_path = _analysis_cache_path(request) + if analysis_cache_path is None: + return None + stem_cache_base = analysis_cache_path.with_suffix("") + return ( + stem_cache_base.with_suffix(".features.json"), + stem_cache_base.with_suffix(".features.npz"), + ) + + +def _stem_work_arrays_path(request: AnalysisJobRequest) -> Path | None: + """Return an app-temp stem array path for process handoff when available.""" + if request["sourceKind"] != "local_audio" or "localSource" not in request: + return None + temp_root = request.get("tempRoot") + if not temp_root: + return None + + local_source = request["localSource"] + key_payload = { + "schemaVersion": FEATURE_CACHE_SCHEMA_VERSION, + "projectId": request.get("projectId", ""), + "sourcePath": local_source["sourcePath"], + "fileName": local_source["fileName"], + "fileSizeBytes": local_source["fileSizeBytes"], + } + digest = hashlib.sha256( + json.dumps(key_payload, sort_keys=True, separators=(",", ":")).encode("utf-8") + ).hexdigest() + return Path(temp_root) / "stem-work-v1" / f"{digest}.npz" + + def _load_cached_analysis(path: Path) -> RehearsalSong | None: """Load a cached rehearsal result, treating malformed cache as a miss.""" try: @@ -432,12 +488,259 @@ def _store_cached_analysis(path: Path, request: AnalysisJobRequest, result: Rehe return True +def _load_cached_local_audio_features( + metadata_path: Path, arrays_path: Path +) -> dict[str, Any] | None: + """Load cached stem/features payload, treating malformed files as cache misses.""" + try: + with metadata_path.open("r", encoding="utf-8") as metadata_file: + metadata_payload = json.load(metadata_file) + except (OSError, json.JSONDecodeError): + return None + if not isinstance(metadata_payload, dict): + return None + if metadata_payload.get("schemaVersion") != FEATURE_CACHE_SCHEMA_VERSION: + return None + if not isinstance(metadata_payload.get("sampleRate"), int): + return None + separation = metadata_payload.get("separation") + if not isinstance(separation, dict): + return None + stem_keys = metadata_payload.get("stemKeys") + if not isinstance(stem_keys, list) or not stem_keys: + return None + + try: + with np.load(arrays_path, allow_pickle=False) as stems_archive: + stems: dict[str, np.ndarray] = {} + for stem_key in stem_keys: + if not isinstance(stem_key, str): + return None + archive_key = f"stem_{stem_key}" + if archive_key not in stems_archive: + return None + stem_array = stems_archive[archive_key] + if not isinstance(stem_array, np.ndarray): + return None + stems[stem_key] = stem_array + except (OSError, ValueError): + return None + + return { + "stems": stems, + "sr": metadata_payload["sampleRate"], + "separation": { + "duration_seconds": separation.get("duration_seconds"), + "chunk_count": separation.get("chunk_count"), + "notes": separation.get("notes"), + }, + } + + +def _serialize_stem_arrays(stems: object) -> dict[str, np.ndarray] | None: + """Return validated stem arrays for compressed npz persistence.""" + if not isinstance(stems, dict) or not stems: + return None + + serialized_stems: dict[str, np.ndarray] = {} + for stem_name, stem_value in stems.items(): + if not isinstance(stem_name, str) or not stem_name: + return None + if not stem_name.isidentifier(): + return None + if not isinstance(stem_value, np.ndarray): + return None + serialized_stems[f"stem_{stem_name}"] = stem_value + return serialized_stems + + +def _store_cached_local_audio_features( + metadata_path: Path, + arrays_path: Path, + request: AnalysisJobRequest, + audio_features: dict[str, Any], +) -> bool: + """Persist reusable local-audio features with atomic writes.""" + if "localSource" not in request: + return False + serialized_stems = _serialize_stem_arrays(audio_features.get("stems")) + sample_rate = audio_features.get("sr") + if serialized_stems is None: + return False + if not isinstance(sample_rate, int): + return False + separation = audio_features.get("separation") + if not isinstance(separation, dict): + return False + + local_source = request["localSource"] + metadata_payload: CachedFeaturePayload = { + "schemaVersion": FEATURE_CACHE_SCHEMA_VERSION, + "source": { + "fileName": local_source["fileName"], + "extension": local_source["extension"], + "fileSizeBytes": local_source["fileSizeBytes"], + }, + "sampleRate": sample_rate, + "separation": { + "duration_seconds": separation.get("duration_seconds"), + "chunk_count": separation.get("chunk_count"), + "notes": separation.get("notes"), + }, + "stemKeys": [key.replace("stem_", "", 1) for key in serialized_stems], + } + try: + metadata_path.parent.mkdir(parents=True, exist_ok=True) + metadata_temp = metadata_path.with_name(f"{metadata_path.name}.tmp") + arrays_temp = arrays_path.with_name(f"{arrays_path.name}.tmp") + with metadata_temp.open("w", encoding="utf-8") as metadata_file: + json.dump(metadata_payload, metadata_file, separators=(",", ":")) + with arrays_temp.open("wb") as arrays_file: + np.savez_compressed(arrays_file, **cast(Any, serialized_stems)) + arrays_temp.replace(arrays_path) + metadata_temp.replace(metadata_path) + except OSError: + return False + return True + + +def _stem_separation_worker( + source_path: str, result_queue: Any, arrays_path: str | None = None +) -> None: + """Run stem separation in an isolated child process for enforceable timeout.""" + try: + separation_result = AudioStemSeparator().separate(source_path) + if arrays_path is not None: + serialized_stems = _serialize_stem_arrays(separation_result.get("stems")) + if not serialized_stems: + raise RuntimeError("Stem separation returned invalid stems.") + output_path = Path(arrays_path) + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open("wb") as arrays_file: + np.savez_compressed(arrays_file, **cast(Any, serialized_stems)) + result_queue.put( + ( + "ok_file", + { + "arraysPath": str(output_path), + "sampleRate": separation_result["sample_rate"], + "separation": { + "duration_seconds": separation_result["duration_seconds"], + "chunk_count": separation_result["chunk_count"], + "notes": separation_result["separation_notes"], + }, + "stemKeys": [key.replace("stem_", "", 1) for key in serialized_stems], + }, + ) + ) + return + result_queue.put(("ok", separation_result)) + except FileNotFoundError as error: + result_queue.put(("file_not_found", str(error))) + except ValueError as error: + result_queue.put(("value_error", str(error))) + except RuntimeError as error: + result_queue.put(("runtime_error", str(error))) + except Exception as error: + result_queue.put(("runtime_error", str(error))) + + +def _multiprocessing_context() -> mp.context.BaseContext: + """Choose a process start method that works in tests and production.""" + methods = mp.get_all_start_methods() + method = "fork" if "fork" in methods else "spawn" + return mp.get_context(method) + + +def _stop_process(process: mp.Process) -> None: + """Terminate a timed-out worker without waiting for the ML step to finish.""" + if not process.is_alive(): + return + process.terminate() + process.join(timeout=1) + if process.is_alive(): + process.kill() + process.join(timeout=1) + + +def _run_stem_separation_with_timeout( + source_path: str, + timeout_seconds: float | None = None, + arrays_path: Path | None = None, +) -> dict[str, Any]: + """Run local stem separation with a cross-platform process timeout.""" + timeout_budget = STEM_SEPARATION_TIMEOUT_SECONDS if timeout_seconds is None else timeout_seconds + context = _multiprocessing_context() + result_queue = context.Queue(maxsize=1) + process = cast(Any, context).Process( + target=_stem_separation_worker, + args=(source_path, result_queue, str(arrays_path) if arrays_path else None), + ) + process.start() + deadline = time.monotonic() + max(timeout_budget, 0.001) + + try: + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + _stop_process(process) + raise StemSeparationTimedOut(f"Stem separation exceeded {timeout_budget:g}s.") + try: + kind, payload = result_queue.get(timeout=min(remaining, 0.05)) + break + except queue.Empty: + if not process.is_alive(): + process.join(timeout=1) + raise RuntimeError("Stem separation process ended without a result.") from None + finally: + result_queue.close() + result_queue.join_thread() + + process.join(timeout=1) + _stop_process(process) + + if kind == "ok": + return cast(dict[str, Any], payload) + if kind == "ok_file": + if not isinstance(payload, dict): + raise RuntimeError("Stem separation returned invalid metadata.") + metadata_payload = { + "schemaVersion": FEATURE_CACHE_SCHEMA_VERSION, + "sampleRate": payload.get("sampleRate"), + "separation": payload.get("separation"), + "stemKeys": payload.get("stemKeys"), + } + arrays_output_path = Path(str(payload.get("arraysPath", ""))) + metadata_temp = arrays_output_path.with_suffix(".json") + try: + metadata_temp.write_text(json.dumps(metadata_payload), encoding="utf-8") + loaded = _load_cached_local_audio_features(metadata_temp, arrays_output_path) + except OSError: + loaded = None + finally: + with suppress(OSError): + metadata_temp.unlink(missing_ok=True) + if loaded is None: + raise RuntimeError("Stem separation returned invalid stem arrays.") + return loaded + if kind == "file_not_found": + raise FileNotFoundError(str(payload)) + if kind == "value_error": + raise ValueError(str(payload)) + raise RuntimeError(str(payload)) + + def _build_local_audio_features(request: AnalysisJobRequest) -> dict[str, Any] | None: """Build downstream audio features for a local-audio request.""" if request["sourceKind"] != "local_audio" or "localSource" not in request: return None - separation_result = AudioStemSeparator().separate(request["localSource"]["sourcePath"]) + separation_result = _run_stem_separation_with_timeout( + request["localSource"]["sourcePath"], + arrays_path=_stem_work_arrays_path(request), + ) + if "sample_rate" not in separation_result: + return separation_result return { "stems": separation_result["stems"], "sr": separation_result["sample_rate"], @@ -500,6 +803,7 @@ def run_analysis_job_updates( decode_label = ( "Decoding local audio" if request["sourceKind"] == "local_audio" else "Preparing demo track" ) + feature_cache_paths = _feature_cache_paths(request) updates = [ _build_job_status( job_id=job_id, @@ -510,36 +814,83 @@ def run_analysis_job_updates( progress_percent=20, cache_status=cache_status, ), - _build_job_status( - job_id=job_id, - state="running", - requested_at=requested_at, - progress_label="Separating stems... (45%)", - progress_stage="separate", - progress_percent=45, - cache_status=cache_status, - ), ] + audio_features: dict[str, Any] | None = None + feature_cache_hit = False + if feature_cache_paths is not None: + cached_features = _load_cached_local_audio_features(*feature_cache_paths) + if cached_features is not None: + audio_features = cached_features + feature_cache_hit = True + updates.append( + _build_job_status( + job_id=job_id, + state="running", + requested_at=requested_at, + progress_label="Loaded reusable stems... (45%)", + progress_stage="separate", + progress_percent=45, + cache_status=cache_status, + ) + ) - try: - audio_features = _build_local_audio_features(request) - except (FileNotFoundError, ValueError) as error: + if audio_features is None: updates.append( _build_job_status( job_id=job_id, - state="failed", + state="running", requested_at=requested_at, - progress_label="Stem separation failed", + progress_label="Separating stems... (45%)", progress_stage="separate", progress_percent=45, cache_status=cache_status, - error={ - "code": "engine_unavailable", - "message": f"Stem separation failed: {error}", - }, ) ) - return updates + try: + audio_features = _build_local_audio_features(request) + except StemSeparationTimedOut: + updates.append( + _build_job_status( + job_id=job_id, + state="running", + requested_at=requested_at, + progress_label="Stem separation timed out; continuing with fallback cues", + progress_stage="separate", + progress_percent=55, + cache_status=cache_status, + ) + ) + audio_features = None + except RuntimeError: + updates.append( + _build_job_status( + job_id=job_id, + state="running", + requested_at=requested_at, + progress_label="Stem separation unavailable; continuing with fallback cues", + progress_stage="separate", + progress_percent=55, + cache_status=cache_status, + ) + ) + audio_features = None + except (FileNotFoundError, ValueError) as error: + updates.append( + _build_job_status( + job_id=job_id, + state="failed", + requested_at=requested_at, + progress_label="Stem separation failed", + progress_stage="separate", + progress_percent=45, + cache_status=cache_status, + error={ + "code": "engine_unavailable", + "message": f"Stem separation failed: {error}", + }, + ) + ) + return updates updates.append( _build_job_status( @@ -566,6 +917,10 @@ def run_analysis_job_updates( ) ) final_cache_status = cache_status + if audio_features is not None and feature_cache_paths is not None and not feature_cache_hit: + _store_cached_local_audio_features( + feature_cache_paths[0], feature_cache_paths[1], request, audio_features + ) if cache_path is not None: final_cache_status = ( "stored" if _store_cached_analysis(cache_path, request, result) else "miss" diff --git a/services/analysis-engine/tests/test_api.py b/services/analysis-engine/tests/test_api.py index bb155a6e..53ecf1d3 100644 --- a/services/analysis-engine/tests/test_api.py +++ b/services/analysis-engine/tests/test_api.py @@ -1,12 +1,21 @@ """Tests for the public analysis-engine API helpers.""" +import queue +import time from unittest.mock import patch import numpy as np from bandscope_analysis.api import ( + _feature_cache_paths, _load_cached_analysis, + _load_cached_local_audio_features, + _run_stem_separation_with_timeout, + _stem_separation_worker, + _stem_work_arrays_path, + _stop_process, _store_cached_analysis, + _store_cached_local_audio_features, build_demo_rehearsal_song, build_section_time_range, get_analysis_status, @@ -370,7 +379,6 @@ def test_run_analysis_job_returns_success_for_local_audio_request() -> None: assert success["state"] == "succeeded" assert success["progressLabel"] == "Analysis ready for late-night-set.wav" - separator.separate.assert_called_once_with("/Users/test/Music/late-night-set.wav") def test_run_analysis_job_updates_report_progress_and_cache(tmp_path) -> None: @@ -426,7 +434,9 @@ def test_run_analysis_job_updates_report_progress_and_cache(tmp_path) -> None: ("succeeded", "ready", 100), ] assert updates[-1]["cacheStatus"] == "stored" - assert len(list((tmp_path / "cache" / "analysis-cache-v1").glob("*.json"))) == 1 + cache_files = list((tmp_path / "cache" / "analysis-cache-v1").glob("*.json")) + assert len([path for path in cache_files if not path.name.endswith(".features.json")]) == 1 + assert len([path for path in cache_files if path.name.endswith(".features.json")]) == 1 cached_updates = list( run_analysis_job_updates("job-cache-2", payload, "2026-03-12T00:00:00Z") @@ -436,7 +446,6 @@ def test_run_analysis_job_updates_report_progress_and_cache(tmp_path) -> None: assert cached_updates[-1]["progressStage"] == "ready" assert cached_updates[-1]["progressPercent"] == 100 assert cached_updates[-1]["cacheStatus"] == "hit" - separator.separate.assert_called_once_with("/Users/test/Music/late-night-set.wav") def test_run_analysis_job_updates_fail_safely_when_local_separation_fails() -> None: @@ -526,3 +535,594 @@ def test_cached_analysis_store_handles_unsupported_requests_and_write_errors(tmp } ) assert _store_cached_analysis(tmp_path, local_request, build_demo_rehearsal_song()) is False + + +def test_local_feature_cache_round_trip_uses_disk_cache_before_recompute(tmp_path) -> None: + """Ensure reusable stem/features cache can be loaded on subsequent analyses.""" + request = validate_analysis_job_request( + { + "sourceKind": "local_audio", + "projectId": "project-cache", + "sourceLabel": "late-night-set.wav", + "roleFocus": ["bass-guitar"], + "localSource": { + "sourcePath": "/Users/test/Music/late-night-set.wav", + "fileName": "late-night-set.wav", + "extension": "wav", + "fileSizeBytes": 1024000, + }, + "cacheRoot": str(tmp_path / "cache"), + "tempRoot": str(tmp_path / "temp"), + } + ) + metadata_path, arrays_path = _feature_cache_paths(request) or (None, None) + assert metadata_path is not None + assert arrays_path is not None + + features = { + "stems": { + "vocals": np.zeros(256), + "bass": np.zeros(256), + "drums": np.zeros(256), + "other": np.zeros(256), + }, + "sr": 22050, + "separation": { + "duration_seconds": 1.0, + "chunk_count": 1, + "notes": "Separated selected local audio into 4 canonical stems.", + }, + } + assert _store_cached_local_audio_features(metadata_path, arrays_path, request, features) is True + + loaded = _load_cached_local_audio_features(metadata_path, arrays_path) + assert loaded is not None + assert loaded["sr"] == 22050 + assert loaded["stems"]["bass"].shape == (256,) + + with ( + patch( + "bandscope_analysis.api._load_cached_analysis", + return_value=None, + ), + patch( + "bandscope_analysis.api._load_cached_local_audio_features", + return_value=loaded, + ), + patch("bandscope_analysis.api.AudioStemSeparator") as separator_class, + patch("bandscope_analysis.ranges.pitch_tracker.PitchTracker.track", return_value=None), + patch( + "bandscope_analysis.chords.chord_recognizer.ChordRecognizer.recognize", + return_value=[], + ), + patch("bandscope_analysis.api._store_cached_local_audio_features") as store_features, + ): + updates = list(run_analysis_job_updates("job-feature-hit", request, "2026-03-12T00:00:00Z")) + + assert updates[1]["progressLabel"] == "Loaded reusable stems... (45%)" + assert updates[1]["cacheStatus"] == "miss" + assert updates[-1]["state"] == "succeeded" + separator_class.return_value.separate.assert_not_called() + store_features.assert_not_called() + + +def test_stem_work_arrays_path_requires_local_temp_root(tmp_path) -> None: + """Ensure process handoff arrays stay under an app-provided temp root.""" + demo_request = validate_analysis_job_request( + { + "sourceKind": "demo", + "sourceLabel": "Late Night Set", + "roleFocus": ["bass-guitar"], + } + ) + local_request = validate_analysis_job_request( + { + "sourceKind": "local_audio", + "projectId": "project-cache", + "sourceLabel": "late-night-set.wav", + "roleFocus": ["bass-guitar"], + "localSource": { + "sourcePath": "/Users/test/Music/late-night-set.wav", + "fileName": "late-night-set.wav", + "extension": "wav", + "fileSizeBytes": 1024000, + }, + } + ) + local_request_with_temp = validate_analysis_job_request( + { + **local_request, + "tempRoot": str(tmp_path / "temp"), + } + ) + + assert _stem_work_arrays_path(demo_request) is None + assert _stem_work_arrays_path(local_request) is None + assert _stem_work_arrays_path(local_request_with_temp) is not None + + +def test_local_feature_cache_treats_malformed_metadata_as_miss(tmp_path) -> None: + """Ensure malformed feature metadata never blocks a fresh analysis run.""" + metadata_path = tmp_path / "features.json" + arrays_path = tmp_path / "features.npz" + + for content in ( + "[]", + '{"schemaVersion": 999, "sampleRate": 22050, "separation": {}, "stemKeys": ["bass"]}', + '{"schemaVersion": 1, "sampleRate": "22050", "separation": {}, "stemKeys": ["bass"]}', + '{"schemaVersion": 1, "sampleRate": 22050, "separation": [], "stemKeys": ["bass"]}', + '{"schemaVersion": 1, "sampleRate": 22050, "separation": {}, "stemKeys": []}', + ): + metadata_path.write_text(content, encoding="utf-8") + assert _load_cached_local_audio_features(metadata_path, arrays_path) is None + + metadata_path.write_text( + '{"schemaVersion": 1, "sampleRate": 22050, "separation": {}, "stemKeys": ["bass"]}', + encoding="utf-8", + ) + assert _load_cached_local_audio_features(metadata_path, arrays_path) is None + + np.savez_compressed(arrays_path, stem_vocals=np.zeros(4)) + assert _load_cached_local_audio_features(metadata_path, arrays_path) is None + + metadata_path.write_text( + '{"schemaVersion": 1, "sampleRate": 22050, "separation": {}, "stemKeys": ["bass"]}', + encoding="utf-8", + ) + arrays_path.write_bytes(b"not an npz archive") + assert _load_cached_local_audio_features(metadata_path, arrays_path) is None + + metadata_path.write_text( + '{"schemaVersion": 1, "sampleRate": 22050, "separation": {}, "stemKeys": [7]}', + encoding="utf-8", + ) + np.savez_compressed(arrays_path, stem_bass=np.zeros(4)) + assert _load_cached_local_audio_features(metadata_path, arrays_path) is None + + class BadArchive: + def __enter__(self): + return self + + def __exit__(self, *_args: object) -> None: + return None + + def __contains__(self, _key: str) -> bool: + return True + + def __getitem__(self, _key: str) -> object: + return "not-an-array" + + metadata_path.write_text( + '{"schemaVersion": 1, "sampleRate": 22050, "separation": {}, "stemKeys": ["bass"]}', + encoding="utf-8", + ) + with patch("bandscope_analysis.api.np.load", return_value=BadArchive()): + assert _load_cached_local_audio_features(metadata_path, arrays_path) is None + + +def test_local_feature_cache_store_rejects_invalid_payloads(tmp_path) -> None: + """Ensure feature cache writes require app-owned request metadata and arrays.""" + request = validate_analysis_job_request( + { + "sourceKind": "local_audio", + "projectId": "project-cache", + "sourceLabel": "late-night-set.wav", + "roleFocus": ["bass-guitar"], + "localSource": { + "sourcePath": "/Users/test/Music/late-night-set.wav", + "fileName": "late-night-set.wav", + "extension": "wav", + "fileSizeBytes": 1024000, + }, + } + ) + demo_request = validate_analysis_job_request( + { + "sourceKind": "demo", + "sourceLabel": "Late Night Set", + "roleFocus": ["bass-guitar"], + } + ) + metadata_path = tmp_path / "features.json" + arrays_path = tmp_path / "features.npz" + + assert _store_cached_local_audio_features(metadata_path, arrays_path, demo_request, {}) is False + assert _store_cached_local_audio_features(metadata_path, arrays_path, request, {}) is False + assert ( + _store_cached_local_audio_features( + metadata_path, + arrays_path, + request, + {"stems": {"bass": np.zeros(4)}, "sr": "22050", "separation": {}}, + ) + is False + ) + assert ( + _store_cached_local_audio_features( + metadata_path, + arrays_path, + request, + {"stems": {"bass": np.zeros(4)}, "sr": 22050, "separation": []}, + ) + is False + ) + assert ( + _store_cached_local_audio_features( + metadata_path, + arrays_path, + request, + {"stems": {"": np.zeros(4)}, "sr": 22050, "separation": {}}, + ) + is False + ) + assert ( + _store_cached_local_audio_features( + metadata_path, + arrays_path, + request, + {"stems": {"bad-stem": np.zeros(4)}, "sr": 22050, "separation": {}}, + ) + is False + ) + assert ( + _store_cached_local_audio_features( + metadata_path, + arrays_path, + request, + {"stems": {"bass": [0.0]}, "sr": 22050, "separation": {}}, + ) + is False + ) + assert ( + _store_cached_local_audio_features( + tmp_path / "missing" / "features.json", + tmp_path, + request, + {"stems": {"bass": np.zeros(4)}, "sr": 22050, "separation": {}}, + ) + is False + ) + + +def test_stem_separation_worker_maps_safe_error_kinds() -> None: + """Ensure child worker errors are converted to serializable parent messages.""" + + class FakeQueue: + def __init__(self) -> None: + self.items: list[tuple[str, object]] = [] + + def put(self, item: tuple[str, object]) -> None: + self.items.append(item) + + cases = [ + (FileNotFoundError("missing"), "file_not_found"), + (ValueError("bad media"), "value_error"), + (RuntimeError("oom"), "runtime_error"), + (Exception("unexpected"), "runtime_error"), + ] + + for error, expected_kind in cases: + fake_queue = FakeQueue() + with patch("bandscope_analysis.api.AudioStemSeparator") as separator_class: + separator_class.return_value.separate.side_effect = error + _stem_separation_worker("/tmp/audio.wav", fake_queue) + assert fake_queue.items == [(expected_kind, str(error))] + + fake_queue = FakeQueue() + with patch("bandscope_analysis.api.AudioStemSeparator") as separator_class: + separator_class.return_value.separate.return_value = {"ok": True} + _stem_separation_worker("/tmp/audio.wav", fake_queue) + assert fake_queue.items == [("ok", {"ok": True})] + + fake_queue = FakeQueue() + with patch("bandscope_analysis.api.AudioStemSeparator") as separator_class: + separator_class.return_value.separate.return_value = {"stems": {}} + _stem_separation_worker("/tmp/audio.wav", fake_queue, "/tmp/stems.npz") + assert fake_queue.items == [("runtime_error", "Stem separation returned invalid stems.")] + + +def test_stem_separation_worker_writes_large_stems_to_file_envelope(tmp_path) -> None: + """Ensure worker handoff avoids queueing full stem arrays when a file path exists.""" + arrays_path = tmp_path / "stems.npz" + + class FakeQueue: + def __init__(self) -> None: + self.items: list[tuple[str, object]] = [] + + def put(self, item: tuple[str, object]) -> None: + self.items.append(item) + + fake_queue = FakeQueue() + with patch("bandscope_analysis.api.AudioStemSeparator") as separator_class: + separator_class.return_value.separate.return_value = { + "stems": { + "vocals": np.zeros(4), + "bass": np.ones(4), + }, + "sample_rate": 22050, + "duration_seconds": 1.0, + "chunk_count": 1, + "separation_notes": "Separated test stems.", + } + _stem_separation_worker("/tmp/audio.wav", fake_queue, str(arrays_path)) + + assert len(fake_queue.items) == 1 + kind, payload = fake_queue.items[0] + assert kind == "ok_file" + assert isinstance(payload, dict) + assert payload["arraysPath"] == str(arrays_path) + assert payload["stemKeys"] == ["vocals", "bass"] + assert "stems" not in payload + with np.load(arrays_path, allow_pickle=False) as archive: + assert archive["stem_bass"].shape == (4,) + + +def test_stem_separation_process_helper_maps_worker_results(tmp_path) -> None: + """Ensure parent-side process helper maps worker result envelopes.""" + + class FakeQueue: + def __init__(self, item: tuple[str, object]) -> None: + self.item = item + + def get(self, timeout: float) -> tuple[str, object]: + assert timeout > 0 + return self.item + + def close(self) -> None: + return None + + def join_thread(self) -> None: + return None + + class FakeProcess: + def __init__(self, *_args: object, **_kwargs: object) -> None: + self.started = False + + def start(self) -> None: + self.started = True + + def join(self, timeout: float | None = None) -> None: + return None + + def is_alive(self) -> bool: + return False + + class FakeContext: + def __init__(self, item: tuple[str, object]) -> None: + self.item = item + self.Process = FakeProcess + + def Queue(self, maxsize: int) -> FakeQueue: + assert maxsize == 1 + return FakeQueue(self.item) + + with patch( + "bandscope_analysis.api._multiprocessing_context", + return_value=FakeContext(("ok", {"stems": {}})), + ): + assert _run_stem_separation_with_timeout("/tmp/audio.wav") == {"stems": {}} + + arrays_path = tmp_path / "worker-stems.npz" + np.savez_compressed(arrays_path, stem_bass=np.ones(4)) + file_payload = { + "arraysPath": str(arrays_path), + "sampleRate": 22050, + "separation": {"duration_seconds": 1.0, "chunk_count": 1, "notes": "ok"}, + "stemKeys": ["bass"], + } + with patch( + "bandscope_analysis.api._multiprocessing_context", + return_value=FakeContext(("ok_file", file_payload)), + ): + loaded = _run_stem_separation_with_timeout("/tmp/audio.wav") + assert loaded["sr"] == 22050 + assert loaded["stems"]["bass"].shape == (4,) + assert not arrays_path.with_suffix(".json").exists() + + invalid_file_payloads = [ + ("not-a-dict", "Stem separation returned invalid metadata."), + ( + { + "arraysPath": str(tmp_path / "missing-parent" / "worker-stems.npz"), + "sampleRate": 22050, + "separation": {}, + "stemKeys": ["bass"], + }, + "Stem separation returned invalid stem arrays.", + ), + ( + { + "arraysPath": str(tmp_path / "missing-arrays.npz"), + "sampleRate": 22050, + "separation": {}, + "stemKeys": ["bass"], + }, + "Stem separation returned invalid stem arrays.", + ), + ] + for payload, expected_message in invalid_file_payloads: + with patch( + "bandscope_analysis.api._multiprocessing_context", + return_value=FakeContext(("ok_file", payload)), + ): + try: + _run_stem_separation_with_timeout("/tmp/audio.wav") + except RuntimeError as error: + assert expected_message in str(error) + else: + raise AssertionError("Expected RuntimeError") + + error_cases = [ + (("file_not_found", "missing"), FileNotFoundError), + (("value_error", "bad media"), ValueError), + (("runtime_error", "oom"), RuntimeError), + ] + for item, expected_error in error_cases: + with patch( + "bandscope_analysis.api._multiprocessing_context", + return_value=FakeContext(item), + ): + try: + _run_stem_separation_with_timeout("/tmp/audio.wav") + except expected_error as error: + assert str(error) + else: + raise AssertionError(f"Expected {expected_error.__name__}") + + +def test_stem_separation_process_helper_handles_empty_worker_exit() -> None: + """Ensure a worker that exits without a result degrades safely.""" + + class EmptyQueue: + def get(self, timeout: float) -> tuple[str, object]: + raise queue.Empty + + def close(self) -> None: + return None + + def join_thread(self) -> None: + return None + + class EmptyProcess: + def __init__(self, *_args: object, **_kwargs: object) -> None: + return None + + def start(self) -> None: + return None + + def is_alive(self) -> bool: + return False + + def join(self, timeout: float | None = None) -> None: + return None + + class EmptyContext: + Process = EmptyProcess + + def Queue(self, maxsize: int) -> EmptyQueue: + assert maxsize == 1 + return EmptyQueue() + + with patch("bandscope_analysis.api._multiprocessing_context", return_value=EmptyContext()): + try: + _run_stem_separation_with_timeout("/tmp/audio.wav") + except RuntimeError as error: + assert "ended without a result" in str(error) + else: + raise AssertionError("Expected RuntimeError") + + +def test_stop_process_kills_stubborn_worker() -> None: + """Ensure stubborn timed-out workers are killed after terminate.""" + + class StubbornProcess: + def __init__(self) -> None: + self.terminated = False + self.killed = False + + def is_alive(self) -> bool: + return not self.killed + + def terminate(self) -> None: + self.terminated = True + + def kill(self) -> None: + self.killed = True + + def join(self, timeout: float | None = None) -> None: + return None + + process = StubbornProcess() + _stop_process(process) # type: ignore[arg-type] + assert process.terminated is True + assert process.killed is True + + +def test_run_analysis_job_updates_degrades_when_stem_step_is_unavailable() -> None: + """Ensure runtime ML failures continue with fallback cues.""" + with patch( + "bandscope_analysis.api._build_local_audio_features", + side_effect=RuntimeError("oom"), + ): + updates = list( + run_analysis_job_updates( + "job-runtime", + { + "sourceKind": "local_audio", + "projectId": "project-1", + "sourceLabel": "late-night-set.wav", + "roleFocus": ["bass-guitar"], + "localSource": { + "sourcePath": "/Users/test/Music/late-night-set.wav", + "fileName": "late-night-set.wav", + "extension": "wav", + "fileSizeBytes": 1024000, + }, + }, + "2026-03-12T00:00:00Z", + ) + ) + + assert updates[-1]["state"] == "succeeded" + assert any( + update.get("progressLabel") == "Stem separation unavailable; continuing with fallback cues" + for update in updates + ) + + +def test_run_analysis_job_updates_gracefully_degrades_when_stem_step_times_out() -> None: + """Ensure timed-out ML stem inference continues with fallback cues instead of hard failure.""" + + def _slow_separate(_source_path: str) -> dict[str, object]: + time.sleep(0.4) + return { + "stems": { + "vocals": np.zeros(1024), + "bass": np.zeros(1024), + "drums": np.zeros(1024), + "other": np.zeros(1024), + }, + "sample_rate": 22050, + "duration_seconds": 1.0, + "chunk_count": 1, + "separation_notes": "Separated selected local audio into 4 canonical stems.", + } + + with ( + patch("bandscope_analysis.api.AudioStemSeparator") as separator_class, + patch("bandscope_analysis.api.STEM_SEPARATION_TIMEOUT_SECONDS", 0.001), + patch("bandscope_analysis.ranges.pitch_tracker.PitchTracker.track", return_value=None), + patch( + "bandscope_analysis.chords.chord_recognizer.ChordRecognizer.recognize", + return_value=[], + ), + ): + separator_class.return_value.separate.side_effect = _slow_separate + + started_at = time.monotonic() + updates = list( + run_analysis_job_updates( + "job-timeout", + { + "sourceKind": "local_audio", + "projectId": "project-1", + "sourceLabel": "late-night-set.wav", + "roleFocus": ["bass-guitar"], + "localSource": { + "sourcePath": "/Users/test/Music/late-night-set.wav", + "fileName": "late-night-set.wav", + "extension": "wav", + "fileSizeBytes": 1024000, + }, + }, + "2026-03-12T00:00:00Z", + ) + ) + elapsed = time.monotonic() - started_at + + assert updates[-1]["state"] == "succeeded" + assert elapsed < 0.3 + assert any( + update.get("progressLabel") == "Stem separation timed out; continuing with fallback cues" + for update in updates + )