From 415bce91ad3a1b57f5d15cdae0e0300285b2468b Mon Sep 17 00:00:00 2001 From: ailuckly Date: Mon, 13 Apr 2026 21:02:17 +0800 Subject: [PATCH] fix: replace Flux.concat with Flux.merge to fix unicast sink re-subscription bug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Using Flux.concat caused sttFlux.share() to be subscribed twice sequentially — after the STT subscription completed, concat would subscribe llmTtsEvents which triggered a second subscription to the unicast audioSink, causing the "sinks only allow a single Subscriber" error. Fix: wrap sttEvents and llmTtsEvents in Flux.merge so both subscribe simultaneously before any emissions, satisfying the unicast constraint. --- .../vocata/ai/pipeline/StreamingPipelineOrchestrator.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/vocata-server/src/main/java/com/vocata/ai/pipeline/StreamingPipelineOrchestrator.java b/vocata-server/src/main/java/com/vocata/ai/pipeline/StreamingPipelineOrchestrator.java index 118c14b..68176c7 100644 --- a/vocata-server/src/main/java/com/vocata/ai/pipeline/StreamingPipelineOrchestrator.java +++ b/vocata-server/src/main/java/com/vocata/ai/pipeline/StreamingPipelineOrchestrator.java @@ -159,9 +159,10 @@ public Flux processVoiceMessage(String conversationUuid, return Flux.concat( Flux.just(new PipelineEvent.StateChange(PipelineState.LISTENING)), - sttEvents.takeUntil(r -> ((PipelineEvent.SttResult) r).isFinal()), - Flux.just(new PipelineEvent.StateChange(PipelineState.PROCESSING)), - llmTtsEvents, + Flux.merge( + sttEvents.takeUntil(r -> ((PipelineEvent.SttResult) r).isFinal()), + llmTtsEvents + ), Flux.just(new PipelineEvent.Complete()) ); })