Skip to content

Commit 501c9f2

Browse files
committed
fix: Replace lock-during-notification with queue-based drain to prevent cross-cache deadlock
The original code held _locker while calling _changes.OnNext(), so subscriber callbacks that propagated to other caches created ABBA deadlocks when concurrent writes were happening on those caches. New design: - Single _locker protects mutation and queue state - Write paths: lock, mutate, enqueue changeset, release lock, then drain - DrainOutsideLock delivers notifications with no lock held - _isDraining flag ensures only one thread drains at a time, preserving Rx serialization contract - Re-entrant writes enqueue and return; the outer drain loop delivers them sequentially - Connect/Watch/CountChanged use Skip(pendingCount) to avoid duplicating items already in the snapshot, with no delivery under lock - Terminal events (OnCompleted/OnError) routed through drain queue - Preview remains synchronous under _locker (required by ReaderWriter) - Suspension state captured at enqueue time; re-checked at delivery - try/catch resets _isDraining on exception - volatile _isTerminated prevents post-dispose delivery
1 parent c4b89af commit 501c9f2

File tree

2 files changed

+412
-64
lines changed

2 files changed

+412
-64
lines changed

src/DynamicData.Tests/Cache/SourceCacheFixture.cs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
using System;
1+
using System;
22
using System.Linq;
33
using System.Reactive.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
46

57
using DynamicData.Tests.Domain;
68

@@ -188,4 +190,43 @@ public void StaticFilterRemove()
188190

189191
public record class SomeObject(int Id, int Value);
190192

193+
194+
[Fact]
195+
public async Task ConcurrentEditsShouldNotDeadlockWithSubscribersThatModifyOtherCaches()
196+
{
197+
const int itemCount = 100;
198+
199+
using var cacheA = new SourceCache<TestItem, string>(static x => x.Key);
200+
using var cacheB = new SourceCache<TestItem, string>(static x => x.Key);
201+
using var destination = new SourceCache<TestItem, string>(static x => x.Key);
202+
using var subA = cacheA.Connect().PopulateInto(destination);
203+
using var subB = cacheB.Connect().PopulateInto(destination);
204+
using var results = destination.Connect().AsAggregator();
205+
206+
var taskA = Task.Run(() =>
207+
{
208+
for (var i = 0; i < itemCount; i++)
209+
{
210+
cacheA.AddOrUpdate(new TestItem($"a-{i}", $"ValueA-{i}"));
211+
}
212+
});
213+
214+
var taskB = Task.Run(() =>
215+
{
216+
for (var i = 0; i < itemCount; i++)
217+
{
218+
cacheB.AddOrUpdate(new TestItem($"b-{i}", $"ValueB-{i}"));
219+
}
220+
});
221+
222+
var completed = Task.WhenAll(taskA, taskB);
223+
var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(10)));
224+
225+
finished.Should().BeSameAs(completed, "concurrent edits with cross-cache subscribers should not deadlock");
226+
results.Error.Should().BeNull();
227+
results.Data.Count.Should().Be(itemCount * 2, "all items from both caches should arrive in the destination");
228+
results.Data.Items.Should().BeEquivalentTo([.. cacheA.Items, .. cacheB.Items], "all items should be in the destination");
229+
}
230+
231+
private sealed record TestItem(string Key, string Value);
191232
}

0 commit comments

Comments
 (0)