Skip to content

Commit 317cd48

Browse files
committed
fix: Separate mutation and notification locks in ObservableCache to prevent cross-cache deadlock
Splits _locker into _locker (mutation) and _notificationLocker (notification delivery) so that InvokeNext no longer holds the mutation lock during _changes.OnNext. Adds re-entrant notification queue to preserve Rx serialization contract when subscribers trigger nested writes.
1 parent 8db4806 commit 317cd48

File tree

2 files changed

+119
-39
lines changed

2 files changed

+119
-39
lines changed

src/DynamicData.Tests/Cache/SourceCacheFixture.cs

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
using System;
1+
using System;
22
using System.Linq;
33
using System.Reactive.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
46

57
using DynamicData.Tests.Domain;
68

@@ -188,4 +190,39 @@ public void StaticFilterRemove()
188190

189191
public record class SomeObject(int Id, int Value);
190192

193+
194+
[Fact]
195+
public void ConcurrentEditsShouldNotDeadlockWithSubscribersThatModifyOtherCaches()
196+
{
197+
using var cacheA = new SourceCache<string, int>(static x => x.GetHashCode());
198+
using var cacheB = new SourceCache<string, int>(static x => x.GetHashCode());
199+
using var destination = new SourceCache<string, int>(static x => x.GetHashCode());
200+
using var subA = cacheA.Connect().PopulateInto(destination);
201+
using var subB = cacheB.Connect().PopulateInto(destination);
202+
203+
var count = 0;
204+
using var feedback = destination.Connect().Subscribe(_ => Interlocked.Increment(ref count));
205+
206+
var completed = Task.WaitAll(
207+
[
208+
Task.Run(() =>
209+
{
210+
for (var i = 0; i < 100; i++)
211+
{
212+
cacheA.AddOrUpdate($"A-{i}");
213+
}
214+
}),
215+
Task.Run(() =>
216+
{
217+
for (var i = 0; i < 100; i++)
218+
{
219+
cacheB.AddOrUpdate($"B-{i}");
220+
}
221+
}),
222+
],
223+
TimeSpan.FromSeconds(10));
224+
225+
completed.Should().BeTrue("concurrent edits with cross-cache subscribers should not deadlock");
226+
count.Should().BeGreaterThan(0, "destination should have received changeset notifications");
227+
}
191228
}

src/DynamicData/Cache/ObservableCache.cs

Lines changed: 81 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,23 @@ internal sealed class ObservableCache<TObject, TKey> : IObservableCache<TObject,
3232

3333
#if NET9_0_OR_GREATER
3434
private readonly Lock _locker = new();
35+
private readonly Lock _notificationLocker = new();
3536
#else
3637
private readonly object _locker = new();
38+
private readonly object _notificationLocker = new();
3739
#endif
3840

3941
private readonly ReaderWriter<TObject, TKey> _readerWriter;
4042

43+
private readonly Queue<ChangeSet<TObject, TKey>> _pendingNotifications = new();
44+
4145
private int _editLevel; // The level of recursion in editing.
4246

47+
private bool _isNotifying;
48+
4349
public ObservableCache(IObservable<IChangeSet<TObject, TKey>> source)
4450
{
45-
_suspensionTracker = new(() => new SuspensionTracker(_changes.OnNext, InvokeCountNext));
51+
_suspensionTracker = new(() => new SuspensionTracker(InvokeNext, InvokeCountNext));
4652
_readerWriter = new ReaderWriter<TObject, TKey>();
4753

4854
var loader = source.Synchronize(_locker).Finally(
@@ -83,7 +89,7 @@ public ObservableCache(IObservable<IChangeSet<TObject, TKey>> source)
8389

8490
public ObservableCache(Func<TObject, TKey>? keySelector = null)
8591
{
86-
_suspensionTracker = new(() => new SuspensionTracker(_changes.OnNext, InvokeCountNext));
92+
_suspensionTracker = new(() => new SuspensionTracker(InvokeNext, InvokeCountNext));
8793
_readerWriter = new ReaderWriter<TObject, TKey>(keySelector);
8894

8995
_cleanUp = Disposable.Create(
@@ -188,10 +194,10 @@ internal void UpdateFromIntermediate(Action<ICacheUpdater<TObject, TKey>> update
188194
{
189195
updateAction.ThrowArgumentNullExceptionIfNull(nameof(updateAction));
190196

197+
ChangeSet<TObject, TKey>? changes = null;
198+
191199
lock (_locker)
192200
{
193-
ChangeSet<TObject, TKey>? changes = null;
194-
195201
_editLevel++;
196202
if (_editLevel == 1)
197203
{
@@ -205,21 +211,26 @@ internal void UpdateFromIntermediate(Action<ICacheUpdater<TObject, TKey>> update
205211

206212
_editLevel--;
207213

208-
if (changes is not null && _editLevel == 0)
214+
if (_editLevel != 0)
209215
{
210-
InvokeNext(changes);
216+
changes = null;
211217
}
212218
}
219+
220+
if (changes is not null)
221+
{
222+
InvokeNext(changes);
223+
}
213224
}
214225

215226
internal void UpdateFromSource(Action<ISourceUpdater<TObject, TKey>> updateAction)
216227
{
217228
updateAction.ThrowArgumentNullExceptionIfNull(nameof(updateAction));
218229

230+
ChangeSet<TObject, TKey>? changes = null;
231+
219232
lock (_locker)
220233
{
221-
ChangeSet<TObject, TKey>? changes = null;
222-
223234
_editLevel++;
224235
if (_editLevel == 1)
225236
{
@@ -233,11 +244,16 @@ internal void UpdateFromSource(Action<ISourceUpdater<TObject, TKey>> updateActio
233244

234245
_editLevel--;
235246

236-
if (changes is not null && _editLevel == 0)
247+
if (_editLevel != 0)
237248
{
238-
InvokeNext(changes);
249+
changes = null;
239250
}
240251
}
252+
253+
if (changes is not null)
254+
{
255+
InvokeNext(changes);
256+
}
241257
}
242258

243259
private IObservable<IChangeSet<TObject, TKey>> CreateConnectObservable(Func<TObject, bool>? predicate, bool suppressEmptyChangeSets) =>
@@ -246,19 +262,22 @@ private IObservable<IChangeSet<TObject, TKey>> CreateConnectObservable(Func<TObj
246262
{
247263
lock (_locker)
248264
{
249-
var initial = InternalEx.Return(() => (IChangeSet<TObject, TKey>)GetInitialUpdates(predicate));
250-
var changes = initial.Concat(_changes);
251-
252-
if (predicate != null)
265+
lock (_notificationLocker)
253266
{
254-
changes = changes.Filter(predicate, suppressEmptyChangeSets);
255-
}
256-
else if (suppressEmptyChangeSets)
257-
{
258-
changes = changes.NotEmpty();
259-
}
267+
var initial = InternalEx.Return(() => (IChangeSet<TObject, TKey>)GetInitialUpdates(predicate));
268+
var changes = initial.Concat(_changes);
269+
270+
if (predicate != null)
271+
{
272+
changes = changes.Filter(predicate, suppressEmptyChangeSets);
273+
}
274+
else if (suppressEmptyChangeSets)
275+
{
276+
changes = changes.NotEmpty();
277+
}
260278

261-
return changes.SubscribeSafe(observer);
279+
return changes.SubscribeSafe(observer);
280+
}
262281
}
263282
});
264283

@@ -268,36 +287,60 @@ private IObservable<Change<TObject, TKey>> CreateWatchObservable(TKey key) =>
268287
{
269288
lock (_locker)
270289
{
271-
var initial = _readerWriter.Lookup(key);
272-
if (initial.HasValue)
290+
lock (_notificationLocker)
273291
{
274-
observer.OnNext(new Change<TObject, TKey>(ChangeReason.Add, key, initial.Value));
275-
}
276-
277-
return _changes.Finally(observer.OnCompleted).Subscribe(
278-
changes =>
292+
var initial = _readerWriter.Lookup(key);
293+
if (initial.HasValue)
279294
{
280-
foreach (var change in changes.ToConcreteType())
295+
observer.OnNext(new Change<TObject, TKey>(ChangeReason.Add, key, initial.Value));
296+
}
297+
298+
return _changes.Finally(observer.OnCompleted).Subscribe(
299+
changes =>
281300
{
282-
var match = EqualityComparer<TKey>.Default.Equals(change.Key, key);
283-
if (match)
301+
foreach (var change in changes.ToConcreteType())
284302
{
285-
observer.OnNext(change);
303+
var match = EqualityComparer<TKey>.Default.Equals(change.Key, key);
304+
if (match)
305+
{
306+
observer.OnNext(change);
307+
}
286308
}
287-
}
288-
});
309+
});
310+
}
289311
}
290312
});
291313

292314
private void InvokeNext(ChangeSet<TObject, TKey> changes)
293315
{
294-
lock (_locker)
316+
lock (_notificationLocker)
295317
{
296318
// If Notifications are not suspended
297319
if (!_suspensionTracker.IsValueCreated || !_suspensionTracker.Value.AreNotificationsSuspended)
298320
{
299-
// Emit the changes
300-
_changes.OnNext(changes);
321+
if (_isNotifying)
322+
{
323+
// Re-entrant call: queue for delivery after the current OnNext completes.
324+
_pendingNotifications.Enqueue(changes);
325+
}
326+
else
327+
{
328+
_isNotifying = true;
329+
try
330+
{
331+
_changes.OnNext(changes);
332+
333+
// Drain any re-entrant notifications queued during OnNext.
334+
while (_pendingNotifications.Count > 0)
335+
{
336+
_changes.OnNext(_pendingNotifications.Dequeue());
337+
}
338+
}
339+
finally
340+
{
341+
_isNotifying = false;
342+
}
343+
}
301344
}
302345
else
303346
{
@@ -326,7 +369,7 @@ private void InvokePreview(ChangeSet<TObject, TKey> changes)
326369

327370
private void InvokeCountNext()
328371
{
329-
lock (_locker)
372+
lock (_notificationLocker)
330373
{
331374
if (_countChanged.IsValueCreated)
332375
{

0 commit comments

Comments
 (0)