Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions Motely.Tests/MotelySearchReliabilityTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
using Motely.Filters;
using Xunit;

namespace Motely.Tests;

/// <summary>
/// Reliability regressions for <see cref="MotelySearch{TBaseFilter}"/>.
/// Both tests covered HIGH-severity items in ISSUES.md:
/// 1. Single-thread <see cref="MotelySearch{TBaseFilter}.RunSearchAsync"/> used to run the
/// worker body synchronously on the caller, so awaiting it on the same context deadlocked.
/// 2. Multi-thread workers used to swallow exceptions silently — the completion source
/// stayed unset and callers hung forever.
/// </summary>
public sealed class MotelySearchReliabilityTests
{
/// <summary>Always-pass base filter so the worker actually runs for the seed list.</summary>
private readonly struct PassFilterDesc : IMotelySeedFilterDesc<PassFilterDesc.PassFilter>
{
public PassFilter CreateFilter(ref MotelyFilterCreationContext ctx) => new();

public readonly struct PassFilter : IMotelySeedFilter
{
public readonly VectorMask Filter(ref MotelyVectorSearchContext _) =>
VectorMask.AllBitsSet;
}
}

/// <summary>Base filter that throws inside the SIMD loop.</summary>
private readonly struct ThrowingFilterDesc : IMotelySeedFilterDesc<ThrowingFilterDesc.ThrowingFilter>
{
public ThrowingFilter CreateFilter(ref MotelyFilterCreationContext ctx) => new();

public readonly struct ThrowingFilter : IMotelySeedFilter
{
public readonly VectorMask Filter(ref MotelyVectorSearchContext _) =>
throw new InvalidOperationException("boom from worker");
}
}

[Fact]
public async Task RunSearchAsync_SingleThread_DoesNotDeadlockAwaitOnSameContext()
{
var settings = new MotelySearchSettings<PassFilterDesc.PassFilter>(new PassFilterDesc())
.WithListSearch(["AAAAAAAA"], 1)
.WithThreadCount(1)
.WithQuietMode(true);

using var search = settings.CreateSearch();

// The bug under fix: when totalWorkers == 1 the worker body used to run
// synchronously inside Start(), meaning RunSearchAsync()'s returned Task
// only completed after the search had already drained on the caller's
// thread — fatal for `await search.RunSearchAsync()` on a sync ctx.
// A bounded wait proves the Task yields and completes off-thread.
var run = search.RunSearchAsync();
var winner = await Task.WhenAny(run, Task.Delay(TimeSpan.FromSeconds(5)));
Assert.Same(run, winner);
await run; // no exception → completed cleanly
Assert.True(search.IsCompleted);
}

[Fact]
public async Task RunSearchAsync_SingleThread_SurfacesWorkerException()
{
var settings = new MotelySearchSettings<ThrowingFilterDesc.ThrowingFilter>(new ThrowingFilterDesc())
.WithListSearch(["AAAAAAAA"], 1)
.WithThreadCount(1)
.WithQuietMode(true);

using var search = settings.CreateSearch();
var ex = await Assert.ThrowsAnyAsync<Exception>(() => search.RunSearchAsync());
Assert.Contains("boom from worker", FlattenMessages(ex));
}

[Fact]
public async Task RunSearchAsync_MultiThread_SurfacesWorkerException()
{
// Many lanes / multiple workers so the throw lands on a worker thread,
// not the caller. Previously this would set _completionSource to nothing
// and the await would hang forever.
var seeds = Enumerable.Range(0, 1024).Select(i => $"S{i:D7}").ToArray();
var settings = new MotelySearchSettings<ThrowingFilterDesc.ThrowingFilter>(new ThrowingFilterDesc())
.WithListSearch(seeds, seeds.Length)
.WithThreadCount(Math.Max(2, Environment.ProcessorCount))
.WithQuietMode(true);

using var search = settings.CreateSearch();
var run = search.RunSearchAsync();
var winner = await Task.WhenAny(run, Task.Delay(TimeSpan.FromSeconds(5)));
Assert.Same(run, winner); // didn't hang
var ex = await Assert.ThrowsAnyAsync<Exception>(() => run);
Assert.Contains("boom from worker", FlattenMessages(ex));
}

[Fact]
public void RunSearchUntilCompletion_MultiThread_SurfacesWorkerException()
{
var seeds = Enumerable.Range(0, 1024).Select(i => $"S{i:D7}").ToArray();
var settings = new MotelySearchSettings<ThrowingFilterDesc.ThrowingFilter>(new ThrowingFilterDesc())
.WithListSearch(seeds, seeds.Length)
.WithThreadCount(Math.Max(2, Environment.ProcessorCount))
.WithQuietMode(true);

using var search = settings.CreateSearch();
// Sync surface: previously the throw was lost and SignalSearchCompleted()
// marked the search "clean". Now the first error rethrows.
var ex = Assert.ThrowsAny<Exception>(() => search.RunSearchUntilCompletion());
Assert.Contains("boom from worker", FlattenMessages(ex));
}

private static string FlattenMessages(Exception ex)
{
var msgs = new List<string>();
for (var current = ex; current is not null; current = current.InnerException)
msgs.Add(current.Message);
if (ex is AggregateException agg)
foreach (var inner in agg.Flatten().InnerExceptions)
msgs.Add(inner.Message);
return string.Join(" | ", msgs);
}
}
126 changes: 95 additions & 31 deletions Motely/MotelySearch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1095,10 +1095,28 @@ public void RunSearchUntilCompletion()
throw new InvalidOperationException("Search has already been started.");
_elapsedTime.Start();

Exception? firstError = null;

void RunWorkerSafe(int idx)
{
try
{
RunWorkerBody(_plans[idx]);
}
catch (OperationCanceledException) when (_cancellationToken.IsCancellationRequested)
{
// cooperative cancellation
}
catch (Exception ex)
{
Interlocked.CompareExchange(ref firstError, ex, null);
}
}

if (_threadCount == 1)
{
// Single-threaded: run directly on this thread
RunWorkerBody(_plans[0]);
RunWorkerSafe(0);
}
else
{
Expand All @@ -1107,7 +1125,7 @@ public void RunSearchUntilCompletion()
for (int i = 0; i < _threadCount; i++)
{
int threadIdx = i;
threads[i] = new Thread(() => RunWorkerBody(_plans[threadIdx]))
threads[i] = new Thread(() => RunWorkerSafe(threadIdx))
{
Name = $"Motely Search Thread {threadIdx}",
IsBackground = true
Expand All @@ -1121,6 +1139,15 @@ public void RunSearchUntilCompletion()
}
}

if (firstError is not null)
{
// Tell awaiters about the failure, then rethrow on the caller for the
// sync surface. Without this, a failed worker used to look like a clean
// completion to anyone waiting on _completionSource.
_completionSource.TrySetException(firstError);
throw firstError;
}

SignalSearchCompleted();
}

Expand Down Expand Up @@ -1195,48 +1222,85 @@ private void BeginSearch(CancellationToken cancellationToken)
/// Starts search threads without blocking the caller.
/// Completion is signaled via <see cref="_completionSource"/>.
/// </summary>
/// <remarks>
/// Both paths off-thread the worker bodies. The single-thread case used to run
/// synchronously on the caller, which broke <see cref="RunSearchAsync"/>: the Task
/// only returned after the search had already completed, so any code that did
/// <c>await search.RunSearchAsync()</c> on the same thread deadlocked. The
/// multi-thread case used to swallow worker exceptions silently — if a worker
/// threw, <see cref="_completionSource"/> never moved and the caller hung forever.
/// Every worker is now wrapped so the first exception is captured and surfaced
/// through <see cref="_completionSource"/> once the last worker drops out.
/// </remarks>
private void StartSearchThreads()
{
// what the fuck - pifreak
// //ObjectDisposedException.ThrowIf(Volatile.Read(ref _isDisposed) != 0, this);

_elapsedTime.Start();

if (_threadCount == 1)
int totalWorkers = _threadCount;
WorkerCoordinator coordinator = new(this, totalWorkers);

for (int i = 0; i < totalWorkers; i++)
{
int threadIdx = i;
var thread = new Thread(() => coordinator.RunWorker(threadIdx))
{
Name = totalWorkers == 1
? "Motely Search Thread"
: $"Motely Search Thread {threadIdx}",
IsBackground = true,
};
thread.Start();
}
}

try
{
RunWorkerBody(_plans[0]);
SignalSearchCompleted();
}
catch (Exception ex)
{
_completionSource.TrySetException(ex);
}
/// <summary>
/// Tracks the running worker count and routes the first thrown exception back
/// through <see cref="_completionSource"/>. One instance per search.
/// </summary>
private sealed class WorkerCoordinator
{
private readonly MotelySearch<TBaseFilter> _owner;
private int _remaining;
private Exception? _firstError;

public WorkerCoordinator(MotelySearch<TBaseFilter> owner, int totalWorkers)
{
_owner = owner;
_remaining = totalWorkers;
}
else

public void RunWorker(int idx)
{
int remaining = _threadCount;
for (int i = 0; i < _threadCount; i++)
try
{
int threadIdx = i;
var thread = new Thread(() =>
_owner.RunWorkerBody(_owner._plans[idx]);
}
catch (OperationCanceledException) when (_owner._cancellationToken.IsCancellationRequested)
{
// honour cooperative cancellation — no error
}
catch (Exception ex)
{
Interlocked.CompareExchange(ref _firstError, ex, null);
}
finally
{
if (Interlocked.Decrement(ref _remaining) == 0)
{
RunWorkerBody(_plans[threadIdx]);
if (Interlocked.Decrement(ref remaining) == 0)
Thread.MemoryBarrier();
var err = Volatile.Read(ref _firstError);
if (err is not null)
{
_owner._completionSource.TrySetException(err);
}
else
{
Thread.MemoryBarrier();
bool completed =
Volatile.Read(ref _isDisposed) == 0 && !_cancellationToken.IsCancellationRequested;
_completionSource.TrySetResult(completed);
Volatile.Read(ref _owner._isDisposed) == 0
&& !_owner._cancellationToken.IsCancellationRequested;
_owner._completionSource.TrySetResult(completed);
}
})
{
Name = $"Motely Search Thread {threadIdx}",
IsBackground = true
};
thread.Start();
}
}
}
}
Expand Down