Skip to content

Commit 7088a9b

Browse files
PureWeen and Copilot authored
Orchestration relaunch resilience: persist dispatch state, resume workers, fix watchdog (#207)
## Summary

Makes multi-agent orchestration dispatch resilient to app relaunches. When the app is relaunched while workers are processing, the orchestrator can now automatically resume and collect results.

### Root Cause

When the app relaunches mid-dispatch, the in-memory `Task.WhenAll` awaiting worker TCS completions dies with the old process. Workers continue on the backend but their results are never collected for synthesis.

### Changes

**Relaunch Resilience** (`CopilotService.Organization.cs`)
- `PendingOrchestration` model persisted to `~/.polypilot/pending-orchestration.json` before dispatching workers
- `ResumeOrchestrationIfPendingAsync` detects pending orchestrations on restart and monitors workers
- `MonitorAndSynthesizeAsync` polls every 5s (15min timeout) until all workers are idle, then auto-synthesizes
- Dispatch state cleared in `finally` blocks to prevent leaked files on cancellation/error
- Worker results filtered by dispatch timestamp to avoid picking up stale pre-dispatch messages
- UTC→local time conversion for reliable timestamp comparison

**Watchdog Fix** (`CopilotService.cs`)
- `IsMultiAgentSession` carried forward on session reconnect — without this, the watchdog used the 120s inactivity timeout instead of 600s, killing long-running workers prematurely
- `[DISPATCH]` tag added to diagnostic log file filter for post-mortem analysis

**Tests** (`MultiAgentRegressionTests.cs`)
- 7 new tests: save/load/clear lifecycle, no-file resume, missing-group cleanup, dispatch tag guard, reconnect state guard, timestamp filter guard, finally block guard

### Multi-Model Review

Reviewed by 3 models (Opus 4.6, Sonnet 4.6, GPT-5.3-Codex). All 3 consensus findings fixed:
1. ✅ Worker result collection picks up pre-dispatch history → timestamp filter added
2. ✅ UTC/local time mismatch → converted on comparison
3. ✅ Pending file leaked on cancellation → try/finally added

### Testing

- 1268 tests total, 1267 pass (1 pre-existing flaky `TestIsolationGuard`)
- Manually tested orchestration resume after app relaunch

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 8e650f4 commit 7088a9b

9 files changed

Lines changed: 605 additions & 45 deletions
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
using Xunit;
2+
3+
namespace PolyPilot.Tests;
4+
5+
/// <summary>
/// Declares the "BaseDir" xUnit collection. Every test class that reads or
/// mutates CopilotService.BaseDir (via SetBaseDirForTesting) joins this
/// collection so those classes are serialized. Otherwise parallel test
/// classes race on the shared static _polyPilotBaseDir field, which caused
/// flaky failures in TestIsolationGuardTests.
/// </summary>
[CollectionDefinition("BaseDir")]
public class BaseDirCollection : ICollectionFixture<BaseDirCollectionFixture>
{
}

/// <summary>
/// Fixture type bound to <see cref="BaseDirCollection"/>. It declares no
/// members of its own.
/// </summary>
public class BaseDirCollectionFixture
{
}

PolyPilot.Tests/MultiAgentRegressionTests.cs

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ namespace PolyPilot.Tests;
1717
/// 5. Mode enum gaps: OrchestratorReflect missing from dropdowns and serialization
1818
/// 6. Reflection loop error handling: unhandled exceptions kill the async task silently
1919
/// </summary>
20+
[Collection("BaseDir")]
2021
public class MultiAgentRegressionTests
2122
{
2223
private readonly StubChatDatabase _chatDb = new();
@@ -1352,4 +1353,220 @@ public void ReconcileOrganization_PreInit_ZeroSessions_StillProtected()
13521353
}
13531354

13541355
#endregion
1356+
1357+
#region Orchestration Persistence (relaunch resilience)
1358+
1359+
/// <summary>
/// Saves a <c>PendingOrchestration</c>, loads it back, verifies every field
/// that was set survived the round-trip (including the dispatch timestamp),
/// then clears it and verifies that a subsequent load returns null.
/// </summary>
[Fact]
public void PendingOrchestration_SaveLoadClear_FullLifecycle()
{
    // Use a dedicated subdirectory to avoid races with parallel tests
    var testDir = Path.Combine(TestSetup.TestBaseDir, "pending-orch-test-" + Guid.NewGuid().ToString("N")[..8]);
    Directory.CreateDirectory(testDir);
    CopilotService.SetBaseDirForTesting(testDir);
    try
    {
        var svc = CreateService();

        // Save
        var pending = new PendingOrchestration
        {
            GroupId = "test-group-id",
            OrchestratorName = "test-orchestrator",
            WorkerNames = new List<string> { "worker-1", "worker-2", "worker-3" },
            OriginalPrompt = "Review the code",
            StartedAt = new DateTime(2026, 2, 24, 15, 0, 0, DateTimeKind.Utc),
            IsReflect = true,
            ReflectIteration = 2
        };
        svc.SavePendingOrchestration(pending);

        // Load and verify round-trip
        var loaded = CopilotService.LoadPendingOrchestrationForTest();
        Assert.NotNull(loaded);
        Assert.Equal("test-group-id", loaded.GroupId);
        Assert.Equal("test-orchestrator", loaded.OrchestratorName);
        // Compare the whole list so a dropped, duplicated or reordered worker is caught.
        Assert.Equal(pending.WorkerNames, loaded.WorkerNames);
        Assert.Equal("Review the code", loaded.OriginalPrompt);
        // StartedAt was previously set but never checked. Normalize to UTC before
        // comparing so the assertion holds whether the loader hands back a UTC or
        // a local-kind DateTime for the same instant.
        Assert.Equal(pending.StartedAt, loaded.StartedAt.ToUniversalTime());
        Assert.True(loaded.IsReflect);
        Assert.Equal(2, loaded.ReflectIteration);

        // Clear and verify deletion
        CopilotService.ClearPendingOrchestrationForTest();
        Assert.Null(CopilotService.LoadPendingOrchestrationForTest());
    }
    finally
    {
        // Restore shared test dir
        CopilotService.SetBaseDirForTesting(TestSetup.TestBaseDir);
    }
}
1404+
1405+
/// <summary>
/// With no pending-orchestration state on disk, resuming must complete
/// without throwing and must leave no pending state behind.
/// </summary>
[Fact]
public async Task ResumeOrchestration_NoFile_DoesNothing()
{
    // Pin the shared test base dir first, so the clear/load calls below act on a
    // known location rather than on whatever the static base dir currently holds.
    CopilotService.SetBaseDirForTesting(TestSetup.TestBaseDir);
    CopilotService.ClearPendingOrchestrationForTest();

    var svc = CreateService();
    // Should complete without error
    await svc.ResumeOrchestrationIfPendingAsync();

    // Resuming with nothing pending must not create pending state.
    Assert.Null(CopilotService.LoadPendingOrchestrationForTest());
}
1414+
1415+
/// <summary>
/// Persists a pending orchestration whose group id is "nonexistent-group",
/// runs the resume path, and expects the pending state to be gone afterwards.
/// </summary>
[Fact]
public async Task ResumeOrchestration_MissingGroup_ClearsState()
{
    var uniqueSuffix = Guid.NewGuid().ToString("N")[..8];
    var isolatedDir = Path.Combine(TestSetup.TestBaseDir, "pending-orch-resume-" + uniqueSuffix);
    Directory.CreateDirectory(isolatedDir);
    CopilotService.SetBaseDirForTesting(isolatedDir);
    try
    {
        var service = CreateService();
        var orphaned = new PendingOrchestration
        {
            GroupId = "nonexistent-group",
            OrchestratorName = "orch",
            WorkerNames = new() { "w1" },
            OriginalPrompt = "test",
            StartedAt = DateTime.UtcNow
        };
        service.SavePendingOrchestration(orphaned);

        await service.ResumeOrchestrationIfPendingAsync();

        // The group doesn't exist, so the pending file should have been cleared
        var remaining = CopilotService.LoadPendingOrchestrationForTest();
        Assert.Null(remaining);
    }
    finally
    {
        // Point the static base dir back at the shared test directory
        CopilotService.SetBaseDirForTesting(TestSetup.TestBaseDir);
    }
}
1443+
1444+
/// <summary>
/// Source-text guard: the Debug() method's file filter must include [DISPATCH]
/// so orchestration events are written to event-diagnostics.log for
/// post-mortem analysis. This was a bug: [DISPATCH] was written to Console
/// but not persisted.
/// </summary>
[Fact]
public void DiagnosticLogFilter_IncludesDispatchTag()
{
    var source = File.ReadAllText(Path.Combine(GetRepoRoot(), "PolyPilot", "Services", "CopilotService.cs"));

    // Locate the start of the filter. Fail with a readable message if the anchor
    // has moved or been renamed, instead of letting Substring(-1) throw an
    // ArgumentOutOfRangeException that says nothing about the cause.
    const string filterAnchor = "message.StartsWith(\"[EVT";
    var filterStart = source.IndexOf(filterAnchor, StringComparison.Ordinal);
    Assert.True(filterStart >= 0, $"Anchor '{filterAnchor}' not found in CopilotService.cs; update this guard test.");

    Assert.Contains("[DISPATCH", source.Substring(filterStart));
}
1453+
1454+
/// <summary>
/// Source-text guard: after reconnect in SendPromptAsync, the new SessionState
/// must carry forward IsMultiAgentSession from the old state. Without this, the
/// watchdog uses the 120s inactivity timeout instead of 600s, killing
/// long-running worker tasks.
/// </summary>
[Fact]
public void ReconnectState_ShouldCarryIsMultiAgentSession()
{
    var source = File.ReadAllText(Path.Combine(GetRepoRoot(), "PolyPilot", "Services", "CopilotService.cs"));

    // Find the reconnect block where HasUsedToolsThisTurn is carried forward
    const string reconnectAnchor = "newState.HasUsedToolsThisTurn = state.HasUsedToolsThisTurn";
    var anchorIndex = source.IndexOf(reconnectAnchor, StringComparison.Ordinal);
    Assert.True(anchorIndex >= 0, $"Anchor '{reconnectAnchor}' not found in CopilotService.cs; update this guard test.");

    // Inspect at most 200 characters from the anchor. Clamp the window so an
    // anchor near the end of the file cannot make Substring throw.
    var windowLength = Math.Min(200, source.Length - anchorIndex);
    var reconnectBlock = source.Substring(anchorIndex, windowLength);

    // IsMultiAgentSession must be carried forward in the same block
    Assert.Contains("newState.IsMultiAgentSession = state.IsMultiAgentSession", reconnectBlock);
}
1467+
1468+
/// <summary>
/// Source-text guard: MonitorAndSynthesizeAsync must filter worker results by
/// dispatch timestamp to avoid picking up stale pre-dispatch assistant messages
/// from prior conversations. This was a 3/3 consensus finding from multi-model
/// review.
/// </summary>
[Fact]
public void MonitorAndSynthesize_ShouldFilterByDispatchTimestamp()
{
    var source = File.ReadAllText(Path.Combine(GetRepoRoot(), "PolyPilot", "Services", "CopilotService.Organization.cs"));

    // Find the result collection section in MonitorAndSynthesizeAsync. Fail with a
    // readable message if the marker comment is gone, rather than via Substring(-1).
    const string collectAnchor = "Collect worker results from their chat history";
    var sectionStart = source.IndexOf(collectAnchor, StringComparison.Ordinal);
    Assert.True(sectionStart >= 0, $"Anchor '{collectAnchor}' not found in CopilotService.Organization.cs; update this guard test.");

    // Look at no more than the 1500 characters that follow the anchor.
    var sectionLength = Math.Min(1500, source.Length - sectionStart);
    var block = source.Substring(sectionStart, sectionLength);

    // Must convert StartedAt to local time for comparison with ChatMessage.Timestamp
    Assert.Contains("dispatchTimeLocal", block);
    // Must filter by timestamp
    Assert.Contains("Timestamp >= dispatchTimeLocal", block);
}
1485+
1486+
/// <summary>
/// Source-text guard: ClearPendingOrchestration must be in a finally block so
/// it's cleaned up even on cancellation/error. Otherwise stale pending files
/// cause spurious resume on next launch. Opus review finding.
/// </summary>
[Fact]
public void PendingOrchestration_ShouldClearInFinallyBlock()
{
    var source = File.ReadAllText(Path.Combine(GetRepoRoot(), "PolyPilot", "Services", "CopilotService.Organization.cs"));

    // Non-reflect path: must have finally { ClearPendingOrchestration }
    const string dispatchAnchor = "Phase 3: Dispatch tasks to workers";
    var dispatchStart = source.IndexOf(dispatchAnchor, StringComparison.Ordinal);
    Assert.True(dispatchStart >= 0, $"Anchor '{dispatchAnchor}' not found in CopilotService.Organization.cs; update this guard test.");
    var nonReflectDispatch = source.Substring(dispatchStart);

    // The inspected block ends where the next "private string Build…" method starts.
    // Without this check a missing end anchor would surface as Substring(0, -1).
    const string nextMethodAnchor = "private string Build";
    var nextMethod = nonReflectDispatch.IndexOf(nextMethodAnchor, StringComparison.Ordinal);
    Assert.True(nextMethod >= 0, $"End anchor '{nextMethodAnchor}' not found after '{dispatchAnchor}'; update this guard test.");

    var dispatchBlock = nonReflectDispatch.Substring(0, nextMethod);
    Assert.Contains("finally", dispatchBlock);
    Assert.Contains("ClearPendingOrchestration", dispatchBlock);
}
1501+
1502+
/// <summary>
/// Walks upward from <see cref="AppContext.BaseDirectory"/> and returns the
/// first directory that contains a "PolyPilot.slnx" file.
/// </summary>
/// <returns>The path of the directory holding PolyPilot.slnx.</returns>
/// <exception cref="InvalidOperationException">
/// No ancestor directory contains PolyPilot.slnx.
/// </exception>
private static string GetRepoRoot()
{
    for (var candidate = AppContext.BaseDirectory; candidate != null; candidate = Path.GetDirectoryName(candidate))
    {
        if (File.Exists(Path.Combine(candidate, "PolyPilot.slnx")))
        {
            return candidate;
        }
    }

    // Ran past the filesystem root without finding the solution file.
    throw new InvalidOperationException("Could not find repo root");
}
1509+
1510+
#endregion
1511+
1512+
#region GetOrchestratorGroupId
1513+
1514+
/// <summary>
/// Covers the fix for the queue-drain dispatch bypass bug: when the
/// orchestrator session was processing and a user sent a message, it was
/// queued; on dequeue it bypassed the multi-agent routing and went directly
/// to SendPromptAsync instead of SendToMultiAgentGroupAsync. Here the
/// orchestrator session's name must map back to its group's id.
/// </summary>
[Fact]
public void GetOrchestratorGroupId_ReturnsGroupId_ForOrchestratorSession()
{
    var service = CreateService();
    CopilotService.SetBaseDirForTesting(TestSetup.TestBaseDir);

    var dispatchGroup = service.CreateMultiAgentGroup("DispatchTest", MultiAgentMode.Orchestrator);
    var members = new[]
    {
        ("orch", MultiAgentRole.Orchestrator),
        ("worker", MultiAgentRole.Worker),
    };
    foreach (var (memberName, memberRole) in members)
    {
        service.Organization.Sessions.Add(new SessionMeta { SessionName = memberName, GroupId = dispatchGroup.Id, Role = memberRole });
    }

    Assert.Equal(dispatchGroup.Id, service.GetOrchestratorGroupId("orch"));
}
1531+
1532+
/// <summary>
/// A session registered with the Worker role in an Orchestrator-mode group
/// must not be reported as that group's orchestrator.
/// </summary>
[Fact]
public void GetOrchestratorGroupId_ReturnsNull_ForWorkerSession()
{
    var service = CreateService();
    CopilotService.SetBaseDirForTesting(TestSetup.TestBaseDir);

    var dispatchGroup = service.CreateMultiAgentGroup("DispatchTest2", MultiAgentMode.Orchestrator);
    var members = new[]
    {
        ("orch2", MultiAgentRole.Orchestrator),
        ("worker2", MultiAgentRole.Worker),
    };
    foreach (var (memberName, memberRole) in members)
    {
        service.Organization.Sessions.Add(new SessionMeta { SessionName = memberName, GroupId = dispatchGroup.Id, Role = memberRole });
    }

    Assert.Null(service.GetOrchestratorGroupId("worker2"));
}
1545+
1546+
/// <summary>
/// Looking up a session name that was never added to the organization
/// yields null.
/// </summary>
[Fact]
public void GetOrchestratorGroupId_ReturnsNull_ForNonGroupSession()
{
    var service = CreateService();
    CopilotService.SetBaseDirForTesting(TestSetup.TestBaseDir);

    Assert.Null(service.GetOrchestratorGroupId("nonexistent-session"));
}
1555+
1556+
/// <summary>
/// Broadcast mode has no orchestrator, so every member of a Broadcast group
/// must resolve to null.
/// </summary>
[Fact]
public void GetOrchestratorGroupId_ReturnsNull_ForBroadcastMode()
{
    var service = CreateService();
    CopilotService.SetBaseDirForTesting(TestSetup.TestBaseDir);

    var broadcastGroup = service.CreateMultiAgentGroup("BroadcastTest", MultiAgentMode.Broadcast);
    var memberNames = new[] { "b1", "b2" };
    foreach (var memberName in memberNames)
    {
        service.Organization.Sessions.Add(new SessionMeta { SessionName = memberName, GroupId = broadcastGroup.Id });
    }

    // Every member is checked only after all of them have been registered
    foreach (var memberName in memberNames)
    {
        Assert.Null(service.GetOrchestratorGroupId(memberName));
    }
}
1570+
1571+
#endregion
13551572
}

PolyPilot.Tests/TestIsolationGuardTests.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ namespace PolyPilot.Tests;
1616
/// settings. This has caused production data loss (squad groups destroyed)
1717
/// multiple times before the guard was added.
1818
/// </summary>
19+
[Collection("BaseDir")]
1920
public class TestIsolationGuardTests
2021
{
2122
[Fact]

PolyPilot.Tests/UsageStatsTests.cs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace PolyPilot.Tests;
66

7+
[Collection("BaseDir")]
78
public class UsageStatsTests : IDisposable
89
{
910
private readonly string _testDir;
@@ -25,10 +26,8 @@ private void ResetStaticFields()
2526
BindingFlags.NonPublic | BindingFlags.Static);
2627
statsPathField?.SetValue(null, null);
2728

28-
// Override CopilotService.BaseDir
29-
var baseDirField = typeof(CopilotService).GetField("_polyPilotBaseDir",
30-
BindingFlags.NonPublic | BindingFlags.Static);
31-
baseDirField?.SetValue(null, _testDir);
29+
// Override CopilotService.BaseDir via the proper API
30+
CopilotService.SetBaseDirForTesting(_testDir);
3231
}
3332

3433
private UsageStatsService CreateService()
@@ -58,9 +57,8 @@ public void Dispose()
5857
BindingFlags.NonPublic | BindingFlags.Static);
5958
statsPathField?.SetValue(null, null);
6059

61-
var baseDirField = typeof(CopilotService).GetField("_polyPilotBaseDir",
62-
BindingFlags.NonPublic | BindingFlags.Static);
63-
baseDirField?.SetValue(null, null);
60+
// Restore to the shared test base dir (never null it — causes races)
61+
CopilotService.SetBaseDirForTesting(TestSetup.TestBaseDir);
6462
}
6563

6664
[Fact]

PolyPilot/Components/DiffView.razor

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,5 @@ else
6969
{
7070
Files = DiffParser.Parse(RawDiff);
7171
}
72+
7273
}

PolyPilot/Components/Pages/Dashboard.razor

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,6 +1209,7 @@
12091209
var session = sessions.FirstOrDefault(s => s.Name == sessionName);
12101210
if (session?.IsProcessing == true)
12111211
{
1212+
CopilotService.LogDispatchRoute(sessionName, true, "QUEUED", null, null, null, false);
12121213
List<string>? queueImagePaths = null;
12131214
if (hasImages)
12141215
{
@@ -1250,19 +1251,24 @@
12501251
var group = sessionMeta?.GroupId != null
12511252
? CopilotService.Organization.Groups.FirstOrDefault(g => g.Id == sessionMeta.GroupId)
12521253
: null;
1254+
var orchSession = group != null ? CopilotService.GetOrchestratorSession(group.Id) : null;
12531255
var isOrchestrator = group is { IsMultiAgent: true }
12541256
&& (group.OrchestratorMode == MultiAgentMode.Orchestrator || group.OrchestratorMode == MultiAgentMode.OrchestratorReflect)
1255-
&& CopilotService.GetOrchestratorSession(group.Id) == sessionName;
1257+
&& orchSession == sessionName;
1258+
1259+
// Diagnostic: log every send attempt so we can trace routing failures
1260+
CopilotService.LogDispatchRoute(sessionName, sessionMeta != null, group?.Name, group?.IsMultiAgent, group?.OrchestratorMode, orchSession, isOrchestrator);
12561261

12571262
if (isOrchestrator && (imagePaths == null || imagePaths.Count == 0))
12581263
{
12591264
_ = CopilotService.SendToMultiAgentGroupAsync(group!.Id, finalPrompt).ContinueWith(t =>
12601265
{
12611266
if (t.IsFaulted)
12621267
{
1268+
var msg = t.Exception?.InnerException?.Message ?? t.Exception?.Message ?? "unknown";
12631269
InvokeAsync(() =>
12641270
{
1265-
Console.WriteLine($"Error sending to multi-agent group: {t.Exception?.InnerException?.Message}");
1271+
Console.WriteLine($"[DISPATCH] Error sending to multi-agent group: {msg}");
12661272
});
12671273
}
12681274
});

PolyPilot/Services/CopilotService.Events.cs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,10 @@ private void CompleteResponse(SessionState state, long? expectedGeneration = nul
824824
var skipHistory = state.Info.ReflectionCycle is { IsActive: true } &&
825825
ReflectionCycle.IsReflectionFollowUpPrompt(nextPrompt);
826826

827+
// Check if the dequeued message is for an orchestrator session — if so,
828+
// route through the multi-agent dispatch pipeline instead of direct send.
829+
var orchGroupId = GetOrchestratorGroupId(state.Info.Name);
830+
827831
// Use Task.Run to dispatch on a clean stack frame, avoiding reentrancy
828832
// issues where CompleteResponse hasn't fully unwound yet.
829833
_ = Task.Run(async () =>
@@ -839,7 +843,15 @@ private void CompleteResponse(SessionState state, long? expectedGeneration = nul
839843
{
840844
try
841845
{
842-
await SendPromptAsync(state.Info.Name, nextPrompt, imagePaths: nextImagePaths, skipHistoryMessage: skipHistory, agentMode: nextAgentMode);
846+
if (orchGroupId != null && nextImagePaths is null or { Count: 0 })
847+
{
848+
Debug($"[DISPATCH] Queue drain routing to multi-agent pipeline: session='{state.Info.Name}', group='{orchGroupId}'");
849+
await SendToMultiAgentGroupAsync(orchGroupId, nextPrompt);
850+
}
851+
else
852+
{
853+
await SendPromptAsync(state.Info.Name, nextPrompt, imagePaths: nextImagePaths, skipHistoryMessage: skipHistory, agentMode: nextAgentMode);
854+
}
843855
tcs.TrySetResult();
844856
}
845857
catch (Exception ex)
@@ -851,7 +863,15 @@ private void CompleteResponse(SessionState state, long? expectedGeneration = nul
851863
}
852864
else
853865
{
854-
await SendPromptAsync(state.Info.Name, nextPrompt, imagePaths: nextImagePaths, skipHistoryMessage: skipHistory, agentMode: nextAgentMode);
866+
if (orchGroupId != null && nextImagePaths is null or { Count: 0 })
867+
{
868+
Debug($"[DISPATCH] Queue drain routing to multi-agent pipeline: session='{state.Info.Name}', group='{orchGroupId}'");
869+
await SendToMultiAgentGroupAsync(orchGroupId, nextPrompt);
870+
}
871+
else
872+
{
873+
await SendPromptAsync(state.Info.Name, nextPrompt, imagePaths: nextImagePaths, skipHistoryMessage: skipHistory, agentMode: nextAgentMode);
874+
}
855875
}
856876
}
857877
catch (Exception ex)

0 commit comments

Comments
 (0)