Fix Cross-Cache Deadlock with Notification Queue #1074
dwcullop wants to merge 17 commits into reactivemarbles:main
Conversation
There's a critical flaw here, which I discussed in the corresponding issue: there's a race condition that allows for corruption of the change stream. If two edits are occurring in parallel, it's possible for edit A to release _locker and then get suspended before it acquires _notificationLocker. Within this window, edit B can come in, make its changes, and then acquire _notificationLocker before edit A.
There's a mechanism in the codebase already for coordinating lock handoff like this, where you need to acquire a second lock before releasing the first. I don't recall exactly where it is or what it's called, but I believe I wrote it for one of the .Filter() operators. Lemme know if you have trouble finding it. I'm out of town, so it's tough for me to look just on my phone.
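The handoff idea the comment describes can be sketched in isolation: take the second lock *before* releasing the first, so no other writer can slip between the mutation phase and the notification phase. This is a minimal illustrative sketch (all names are hypothetical, not the actual DynamicData helper), shown single-threaded for determinism:

```csharp
using System.Collections.Generic;
using System.Threading;

object mutationLock = new();
object notificationLock = new();
var log = new List<string>();

void Edit(string name)
{
    Monitor.Enter(mutationLock);
    try
    {
        log.Add($"{name}: mutate");

        // Handoff: acquire the notification lock while still holding the
        // mutation lock. Another edit may now start mutating, but it cannot
        // overtake us into the notification phase.
        Monitor.Enter(notificationLock);
    }
    finally
    {
        Monitor.Exit(mutationLock);
    }

    try
    {
        log.Add($"{name}: notify");
    }
    finally
    {
        Monitor.Exit(notificationLock);
    }
}

Edit("A");
Edit("B");
```

Because the window between releasing the first lock and acquiring the second never exists, notification order always matches mutation order.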
317cd48 to 145156d
Perfect. Exactly what I needed to close the gap that two locks introduce.
Pull request overview
This PR addresses cross-cache deadlocks in ObservableCache by separating the mutation lock from the notification lock, so subscriber callbacks (OnNext) no longer run while the cache mutation lock is held.
Changes:
- Introduces a dedicated `_notificationLocker` and re-entrancy-safe notification draining in `ObservableCache`.
- Extends `SwappableLock` to support `Lock` on `NET9_0_OR_GREATER` for lock handoff patterns.
- Adds a regression test covering concurrent edits with cross-cache subscriber interactions, and hardens expiration removal logic.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/DynamicData/Internal/SwappableLock.cs | Adds Lock-based enter/swap/dispose support for NET9 to enable lock handoff without a gap. |
| src/DynamicData/Cache/ObservableCache.cs | Splits mutation vs notification locking and adds a re-entrancy queue to serialize notifications. |
| src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs | Adds safety checks to avoid removing items that no longer match the scheduled expiration conditions. |
| src/DynamicData.Tests/Cache/SourceCacheFixture.cs | Adds a concurrency regression test intended to catch the historical deadlock scenario. |
Comments suppressed due to low confidence (2)
src/DynamicData/Cache/ObservableCache.cs:357
- InvokeNext reads SuspensionTracker state (AreNotificationsSuspended / IsCountSuspended) while only holding _notificationLocker, but Suspend*/Resume* mutate those counters while holding _locker. This introduces a cross-thread data race and can allow notifications/count updates to slip through while suspension is being applied (or remain suppressed after resume). Consider synchronizing suspension state with _notificationLocker as well (e.g., take _notificationLocker inside Suspend*/Resume* in the fixed order _locker -> _notificationLocker), or make the suspension counters/flags use Volatile/Interlocked so InvokeNext has a happens-before relationship with Suspend*/Resume*.
```csharp
lock (_notificationLocker)
{
    // If Notifications are not suspended
    if (!_suspensionTracker.IsValueCreated || !_suspensionTracker.Value.AreNotificationsSuspended)
    {
        if (_isNotifying)
        {
            // Re-entrant call: queue for delivery after the current OnNext completes.
            _pendingNotifications.Enqueue(changes);
        }
        else
        {
            _isNotifying = true;
            try
            {
                _changes.OnNext(changes);

                // Drain any re-entrant notifications queued during OnNext.
                while (_pendingNotifications.Count > 0)
                {
                    _changes.OnNext(_pendingNotifications.Dequeue());
                }
            }
            finally
            {
                _isNotifying = false;
            }
        }
    }
    else
    {
        // Don't emit the changes, but add them to the list
        _suspensionTracker.Value.EnqueueChanges(changes);
    }

    // If CountChanges are not suspended
    if (!_suspensionTracker.IsValueCreated || !_suspensionTracker.Value.IsCountSuspended)
    {
        InvokeCountNext();
    }
}
```
src/DynamicData/Cache/ObservableCache.cs:65
- In the source-backed constructor, source.Synchronize(_locker) causes the subscription callback (and the subsequent InvokeNext call) to execute while holding _locker. That means OnNext delivery for these ObservableCache instances is still performed under the mutation lock, which can reintroduce the cross-cache lock-order deadlock this PR is trying to eliminate. Consider replacing Synchronize(_locker) with an explicit lock + SwappableLock handoff so the write happens under _locker, then notifications are delivered under _notificationLocker after releasing _locker.
```csharp
_suspensionTracker = new(() => new SuspensionTracker(InvokeNext, InvokeCountNext));
_readerWriter = new ReaderWriter<TObject, TKey>();

var loader = source.Synchronize(_locker).Finally(
    () =>
    {
        _changes.OnCompleted();
        _changesPreview.OnCompleted();
    }).Subscribe(
    changeSet =>
    {
        var previewHandler = _changesPreview.HasObservers ? (Action<ChangeSet<TObject, TKey>>)InvokePreview : null;
        var changes = _readerWriter.Write(changeSet, previewHandler, _changes.HasObservers);
        InvokeNext(changes);
    },
```
…nt cross-cache deadlock

The original code held _locker while calling _changes.OnNext(), so subscriber callbacks that propagated to other caches created ABBA deadlocks when concurrent writes were happening on those caches.

New design:
- Single _locker protects mutation and queue state
- Write paths: lock, mutate, enqueue changeset, release lock, then drain
- DrainOutsideLock delivers notifications with no lock held
- _isDraining flag ensures only one thread drains at a time, preserving the Rx serialization contract
- Re-entrant writes enqueue and return; the outer drain loop delivers them sequentially
- Connect/Watch/CountChanged use Skip(pendingCount) to avoid duplicating items already in the snapshot, with no delivery under lock
- Terminal events (OnCompleted/OnError) routed through drain queue
- Preview remains synchronous under _locker (required by ReaderWriter)
- Suspension state captured at enqueue time; re-checked at delivery
- try/catch resets _isDraining on exception
- volatile _isTerminated prevents post-dispose delivery
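The enqueue-then-drain shape this commit describes can be sketched minimally. This is an illustration only (names like `gate`, `pending`, and `isDraining` are stand-ins, and the real implementation also handles terminal events and suspension); shown with sequential calls so the outcome is deterministic:

```csharp
using System.Collections.Generic;

var gate = new object();
var pending = new Queue<int>();
var delivered = new List<int>();
bool isDraining = false;

void Write(int change)
{
    bool shouldDrain;
    lock (gate)
    {
        pending.Enqueue(change);      // mutation + queue state under the lock
        shouldDrain = !isDraining;    // only one thread drains at a time
        if (shouldDrain) isDraining = true;
    }

    // Re-entrant or concurrent writer: the active drainer delivers its item.
    if (!shouldDrain) return;

    while (true)
    {
        int next;
        lock (gate)
        {
            if (pending.Count == 0) { isDraining = false; return; }
            next = pending.Dequeue();
        }

        // Subscriber code runs here with no lock held, so a callback that
        // needs another cache's lock can no longer form an ABBA deadlock.
        delivered.Add(next);
    }
}

Write(1);
Write(2);
```

The drain loop re-checks the queue under the lock on every iteration, so items enqueued by re-entrant writes during delivery are picked up by the same drainer in FIFO order.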
6337b24 to 501c9f2
… contracts and thread-safety.
I had to completely change tactics to get all the corner cases closed, but this is working and the unit tests pass. I updated the bug with a real repro, but it is kind of contrived. It's just not easy to repro what I am seeing in production without contriving it.
Pull request overview
Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.
Introduce ReadOnlyScopedAccess to DeliveryQueue<TItem> for safe, read-only access to queue state under lock. Update tests and ObservableCache<TObject, TKey> to use AcquireReadLock() for reading PendingCount, replacing direct property access and manual locking. Make PendingCount private and encapsulate lock release logic. Wrap _suspensionTracker disposal in a lock for thread safety. These changes improve thread safety and clarify access patterns for queue state.
6a81e82 to ab41353
Refactored DirectCrossWriteDoesNotDeadlock to use Connect, Filter, Transform, and PopulateInto operators for bidirectional cache updates, replacing manual subscription logic. Increased test timeout and clarified assertion message. Prevented infinite feedback with key prefix filtering.
Refactored DeliveryQueue<TItem> to eliminate pending item tracking and PendingCount, removing related read-only lock APIs. ObservableCache<TObject, TKey> now ensures new subscribers do not receive in-flight notifications by connecting under the main lock, preventing duplicate deliveries without pending count logic. NotificationItem and delivery logic were simplified to check suspension state at delivery time. Updated tests: removed PendingCount tests and added a test to verify no duplicate notifications during delivery. Improved comments and code clarity.
Add conditional logic for .NET 9.0+ in SwappableLock to handle both _gate and _lockGate fields. SwapTo now checks both fields for initialization and releases the appropriate lock type, ensuring compatibility with new locking mechanisms while preserving legacy behavior.
Previously, haveExpirationsChanged was overwritten by each call to TrySetExpiration, potentially losing information about prior changes. Now, the |= operator is used to ensure haveExpirationsChanged remains true if any expiration update occurs, preserving the correct state across multiple updates.
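A minimal illustration of why `|=` matters here: with plain assignment, a later call that reports "no change" erases the fact that an earlier call did change something. The `results` array below simulates the outcomes of successive `TrySetExpiration`-style calls (hypothetical simplification of the real code path):

```csharp
// The first simulated update changes an expiration; the second does not.
bool haveExpirationsChanged = false;
bool[] results = { true, false };

foreach (var changed in results)
{
    // |= keeps the flag true once any update has occurred.
    // With '=' instead, the second iteration would reset it to false.
    haveExpirationsChanged |= changed;
}
```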
Moved _isDelivering reset from finally to catch block in DeliveryQueue<TItem>. Now, the flag is only reset when an exception occurs, and the exception is rethrown, making the error handling more explicit and preventing unnecessary state changes during normal execution.
The MultiThreadedStressTest asserts immediately after stress observables complete, but with drain-outside-lock delivery, Edit() returns after enqueueing while delivery may still be in-flight on another thread. Add a short delay before checking results to allow in-flight deliveries to complete. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Pull request overview
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
Comments suppressed due to low confidence (1)
src/DynamicData/Cache/ObservableCache.cs:241
- `CreateConnectObservable` relies on a comment about `Subject<T>` observer snapshots to claim snapshot + live stream cannot overlap, but with the new drain-outside-lock design a notification can be enqueued (or even already dequeued) before `Connect` takes its initial snapshot and still be delivered after the subscription is added. In that schedule the new subscriber can see the item in the initial snapshot and then receive the same pre-snapshot notification from `_changes`, producing duplicate Add/Update events. Consider reintroducing an explicit "already-enqueued / already-dequeued" boundary (e.g., a monotonically increasing sequence number captured with the snapshot and used to skip older notifications, or another mechanism that guarantees the subscriber only observes notifications that occur after the snapshot).
```csharp
private IObservable<IChangeSet<TObject, TKey>> CreateConnectObservable(Func<TObject, bool>? predicate, bool suppressEmptyChangeSets) =>
    Observable.Create<IChangeSet<TObject, TKey>>(
        observer =>
        {
            // Subject<T> snapshots its observer array before iterating OnNext, so a
            // subscriber added here will not receive any in-flight notification.
            lock (_locker)
            {
                var initial = InternalEx.Return(() => (IChangeSet<TObject, TKey>)GetInitialUpdates(predicate));
                var changes = initial.Concat(_changes);

                if (predicate != null)
                {
                    changes = changes.Filter(predicate, suppressEmptyChangeSets);
                }
                else if (suppressEmptyChangeSets)
                {
                    changes = changes.NotEmpty();
                }

                return changes.SubscribeSafe(observer);
            }
        });
```
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
- Publish and explicitly connect the merged observable in the test, and await completion of all notifications for robust result verification.
- Move _suspensionTracker disposal outside the lock in ObservableCache to prevent deadlocks and reentrancy issues.
- Add System.Reactive.Threading.Tasks import for ToTask() usage.
…ynamicData into bugfix/lock_inversion
Fix race where new subscribers could see duplicate Add notifications if they connect while in-flight changes are being delivered. Introduce a versioning mechanism in ObservableCache to track committed and delivered notifications, and skip already-delivered changes for new subscribers. Extend NotificationItem with a version field and add read-only lock support in DeliveryQueue. Update test to reliably reproduce and verify the fix.
Add comprehensive tests for nested and concurrent suspend/resume scenarios in SuspendNotificationsFixture. Emit resume signals under lock in ObservableCache to prevent race conditions and ensure consistent notification delivery. These changes enhance reliability and determinism of notification delivery under complex and concurrent usage patterns.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 7 comments.
- Strengthen test reliability and clarify test names/messages
- Rewrite DeliveryQueueFixture test for robust concurrency checks
- Enhance ObservableCache to avoid duplicate/applied notifications
- Refactor ResumeNotifications to prevent race conditions
- Improve comments and code clarity throughout
I was expecting this to use System.Threading.Channels for a modern queuing implementation, but then again I'm not familiar with the internals of DynamicData at all, so it might not fit in here.
Fix cross-cache deadlock in ObservableCache
Fixes the cross-cache deadlock described in #1073 by using a queue to avoid holding a lock while firing downstream events.
Problem
ObservableCache held its lock (`_locker`) while calling `_changes.OnNext()` to deliver notifications to subscribers. When a subscriber's callback propagated through a pipeline (`Transform`, `PopulateInto`, `MergeMany`) into a second cache, the callback needed that second cache's lock. If another thread was simultaneously writing to the second cache and its subscriber callback chained back to the first cache, both threads blocked permanently, each waiting for the other's lock.

This was confirmed through analysis of a production hang where 6 threads were deadlocked across multiple ObservableCache instances. The stacks showed the classic pattern: Thread A held `CacheA._locker` and was blocked on `CacheB._locker`, while Thread B held `CacheB._locker` and was blocked on `CacheA._locker`.
Solution: Queue-Based Drain Pattern
The fix separates mutation from notification delivery. Instead of calling `OnNext` while holding the lock, writes now enqueue a notification item under the lock, then deliver it after releasing the lock. No lock is ever held when subscriber code runs, which eliminates the cross-cache deadlock entirely.

The core of this approach is a new `DeliveryQueue<TItem>` class that encapsulates the pattern.

DeliveryQueue

`DeliveryQueue<TItem>` is a generic, reusable concurrency primitive in src/DynamicData/Internal/DeliveryQueue.cs. It manages a FIFO queue of items and a delivery token that ensures only one thread delivers at a time.

The public API is designed to make incorrect usage difficult:
- `AcquireLock()` returns a `ref struct ScopedAccess` that holds the caller's lock. Queue operations (`Enqueue`) go through this scoped handle. When it is disposed, the lock is released and pending items are delivered outside the lock. Because it is a `ref struct`, it cannot escape the calling method.
- `AcquireReadLock()` returns a `ref struct ReadOnlyScopedAccess` for inspecting queue state without mutation. It exposes a single property, `HasPending`, which reports whether items are queued or mid-delivery. Disposing releases the lock without triggering delivery. This is used by `Connect` and `Watch` to determine whether version filtering is needed.
- The delivery callback is a `Func<TItem, bool>` that returns `true` to continue or `false` to signal termination. When the callback returns `false`, the queue sets itself as terminated, clears remaining items, and stops. This gives the queue full ownership of its lifecycle.
- `IsTerminated` is a volatile flag safe to read from any thread.
- A `catch`/`throw` block resets the delivery token so the queue recovers on the next write.
- On .NET 9+ the queue uses `System.Threading.Lock` with `Lock.Enter()`/`Lock.Exit()`. Older TFMs use `Monitor.Enter()`/`Monitor.Exit()`. Both are defined under a single `#if` block for clarity.
How ObservableCache Uses It

Write paths (`UpdateFromSource`, `UpdateFromIntermediate`, source subscription `OnNext`) mutate and enqueue the resulting changeset under the lock, then deliver it after the lock is released.

Terminal events (source completion, source error, dispose) go through the same queue as sentinel items. The delivery callback returns `false` for these, and the queue self-terminates. This means all terminal cleanup (completing subjects, disposing the suspension tracker) happens through a single code path in `DeliverNotification`.

`Connect` and `Watch` use `DeliveryQueue.AcquireReadLock()` to atomically read queue state, take a snapshot, and subscribe (all under a single lock acquisition). When `ReadOnlyScopedAccess.HasPending` indicates notifications are queued or mid-delivery, the snapshot may contain data whose notification hasn't fired yet. To prevent duplicates, each enqueued change notification carries a monotonically increasing version (`_currentVersion`, incremented under lock). Before calling `_changes.OnNext()`, the delivery callback stamps the item's version into `_currentDeliveryVersion` via `Volatile.Write`. The subscriber's stream is wrapped with `.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion)`, which drops notifications already covered by the snapshot. Once the first post-snapshot notification arrives, Rx's `SkipWhile` nulls out its predicate (zero ongoing cost). When no notifications are pending, the subscriber gets the raw `_changes` Subject with no filter overhead.
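A write path built on the scoped-handle pattern described above might look roughly like this. This is a stand-in sketch: the real `ScopedAccess` is a `ref struct` that cannot escape the calling method, while a disposable class stands in here, and `DeliveryQueueSketch` is a hypothetical simplification of the real `DeliveryQueue<TItem>`:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

var q = new DeliveryQueueSketch();

using (var access = q.AcquireLock())
{
    // Mutation and enqueue happen while the handle holds the lock.
    access.Enqueue("change-1");
} // Dispose: release the lock, then deliver outside it.

class DeliveryQueueSketch
{
    private readonly object _gate = new();
    private readonly Queue<string> _pending = new();

    public List<string> Delivered { get; } = new();

    public Scoped AcquireLock()
    {
        Monitor.Enter(_gate);
        return new Scoped(this);
    }

    private void ReleaseAndDeliver()
    {
        // Move pending items out while still holding the lock...
        var batch = new List<string>(_pending);
        _pending.Clear();
        Monitor.Exit(_gate);

        // ...then deliver with no lock held (subscriber code runs here).
        Delivered.AddRange(batch);
    }

    public sealed class Scoped : IDisposable
    {
        private readonly DeliveryQueueSketch _owner;

        public Scoped(DeliveryQueueSketch owner) => _owner = owner;

        public void Enqueue(string item) => _owner._pending.Enqueue(item);

        public void Dispose() => _owner.ReleaseAndDeliver();
    }
}
```

Routing all enqueues through the handle means a caller physically cannot deliver while holding the lock: delivery only happens from `Dispose`, after the lock is released.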
Suspension

Suspension state is checked at delivery time via local helper methods `EmitOrSuspendChanges` and `EmitOrSuspendCount`. These check the live `AreNotificationsSuspended`/`IsCountSuspended` state under the lock at the moment of delivery, avoiding TOCTOU issues that would arise from capturing the state at enqueue time. A fast-path check on `_suspensionTracker.IsValueCreated` (a monotone volatile flag on `Lazy<T>`) skips the lock entirely when suspension has never been used.

`SuspensionTracker` no longer holds delegates into the queue. `ResumeNotifications` returns the accumulated changes; the caller enqueues them through `ScopedAccess`. The resume signal (`_areNotificationsSuspended.OnNext(false)`) is emitted after releasing the lock so that deferred `Connect`/`Watch` subscribers are activated outside the lock scope.

NotificationItem

`NotificationItem` uses a `NotificationKind` enum (`Changes`, `CountOnly`, `Completed`, `Error`) and all instances are created through factory methods (`CreateChanges`, `CreateCountOnly`, `CreateCompleted`, `CreateError`). The delivery callback uses a switch on the kind for dispatch.
Connect/Watch Snapshot-Duplicate Fix

The drain-outside-lock design introduced a race where `Connect()` could take a snapshot containing data whose notification was still queued or mid-delivery. When that notification later fired via `_changes.OnNext()`, the new subscriber saw duplicate Adds.

The fix uses version-tagged delivery: each enqueued change notification carries a monotonically increasing version (`_currentVersion`, incremented under lock). Before calling `_changes.OnNext()`, the delivery callback stamps the item's version into `_currentDeliveryVersion` via `Volatile.Write`. `CreateConnectObservable` and `CreateWatchObservable` capture `snapshotVersion = _currentVersion` under lock and, only when `DeliveryQueue.ReadOnlyScopedAccess.HasPending` is true, apply `.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion)` to the `_changes` stream. Once the first post-snapshot notification arrives, `SkipWhile` nulls out its predicate (zero ongoing cost).

`ReadOnlyScopedAccess` is a new `ref struct` on `DeliveryQueue` that acquires the gate for inspection only (it exposes `HasPending` but no mutation methods), and its `Dispose` releases the lock without triggering delivery.
Testing

All existing tests pass, including all multi-threaded stress tests. There are 21 new tests across 3 fixtures, plus 1 existing test updated for timing reliability.
DeliveryQueueFixture (14 tests)
Covers the new `DeliveryQueue<T>` concurrency primitive.

- `EnqueueAndDeliverDeliversItem`
- `DeliverDeliversItemsInFifoOrder`
- `DeliverWithEmptyQueueIsNoOp`
- `OnlyOneDelivererAtATime`
- `SecondWriterItemPickedUpByFirstDeliverer`
- `ReentrantEnqueueDoesNotRecurse`
- `ExceptionInDeliveryResetsDeliveryToken`
- `RemainingItemsDeliveredAfterExceptionRecovery`
- `TerminalCallbackStopsDelivery` (`return false` terminates the queue and clears remaining items)
- `EnqueueAfterTerminationIsIgnored`
- `IsTerminatedIsFalseInitially`
- `ConcurrentEnqueueAllItemsDelivered`
- `ConcurrentEnqueueNoDuplicates`
- `ConcurrentEnqueuePreservesPerThreadOrdering`
SourceCacheFixture (3 tests)

Covers the cross-cache deadlock fix and snapshot-duplicate prevention.

- `DirectCrossWriteDoesNotDeadlock` (🐛 on main)
- `MultiCacheFanInDoesNotDeadlock`
- `ConnectDuringDeliveryDoesNotDuplicate` 🐛
SuspendNotificationsFixture (4 tests)

Covers the resume signal atomicity fix and suspend/resume ordering.

- `ResumeThenReSuspendDeliversFirstBatchOnly`
- `ReSuspendThenResumeDeliversAllInSingleBatch`
- `ConcurrentSuspendDuringResumeDoesNotCorrupt`
- `ResumeSignalUnderLockPreventsStaleSnapshotFromReSuspend` 🐛
MergeManyChangeSetsCacheSourceCompareFixture (1 modified test)

Existing test updated for queue-based delivery timing.

- `MergeManyStress` now uses `Publish`/`Connect` and a `LastOrDefaultAsync` await to ensure all notifications are delivered before assertions.

Also Included

- `SwappableLock`: `Lock`-type overloads. Fixed `SwapTo(object)` to handle the case where the lock was initialized via `CreateAndEnter(Lock)`.
- `ExpireAfter`: changed `=` to `|=` for `haveExpirationsChanged` in the Update case to prevent silently dropping a prior change flag.