Skip to content

Commit ab41353

Browse files
committed
Add read-only lock for DeliveryQueue and improve safety
Introduce ReadOnlyScopedAccess to DeliveryQueue<TItem> for safe, read-only access to queue state under lock. Update tests and ObservableCache<TObject, TKey> to use AcquireReadLock() for reading PendingCount, replacing direct property access and manual locking. Make PendingCount private and encapsulate lock release logic. Wrap _suspensionTracker disposal in a lock for thread safety. These changes improve thread safety and clarify access patterns for queue state.
1 parent 72ea32c commit ab41353

File tree

3 files changed

+104
-50
lines changed

3 files changed

+104
-50
lines changed

src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,9 @@ public void PendingCountPreservedOnException()
316316

317317
act.Should().Throw<InvalidOperationException>();
318318

319-
lock (_gate)
319+
using (var rl = queue.AcquireReadLock())
320320
{
321-
queue.PendingCount.Should().Be(1, "only the dequeued item should be decremented");
321+
rl.PendingCount.Should().Be(1, "only the dequeued item should be decremented");
322322
}
323323
}
324324

@@ -334,7 +334,10 @@ public void PendingCountClearedOnTermination()
334334
notifications.Enqueue("STOP");
335335
}
336336

337-
queue.PendingCount.Should().Be(0);
337+
using (var rl = queue.AcquireReadLock())
338+
{
339+
rl.PendingCount.Should().Be(0);
340+
}
338341
}
339342

340343
// Category 6: Stress / Thread Safety

src/DynamicData/Cache/ObservableCache.cs

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,10 @@ public ObservableCache(Func<TObject, TKey>? keySelector = null)
9292
Observable.Create<int>(
9393
observer =>
9494
{
95-
lock (_locker)
96-
{
97-
var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged();
98-
return source.SubscribeSafe(observer);
99-
}
95+
using var readLock = _notifications.AcquireReadLock();
96+
97+
var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged();
98+
return source.SubscribeSafe(observer);
10099
});
101100

102101
public IReadOnlyList<TObject> Items => _readerWriter.Items;
@@ -233,56 +232,54 @@ private IObservable<IChangeSet<TObject, TKey>> CreateConnectObservable(Func<TObj
233232
Observable.Create<IChangeSet<TObject, TKey>>(
234233
observer =>
235234
{
236-
lock (_locker)
237-
{
238-
// Skip pending notifications to avoid duplicating items already in the snapshot.
239-
var skipCount = _notifications.PendingCount;
235+
using var readLock = _notifications.AcquireReadLock();
240236

241-
var initial = InternalEx.Return(() => (IChangeSet<TObject, TKey>)GetInitialUpdates(predicate));
242-
var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes;
243-
var changes = initial.Concat(changesStream);
237+
// Skip pending notifications to avoid duplicating items already in the snapshot.
238+
var skipCount = readLock.PendingCount;
244239

245-
if (predicate != null)
246-
{
247-
changes = changes.Filter(predicate, suppressEmptyChangeSets);
248-
}
249-
else if (suppressEmptyChangeSets)
250-
{
251-
changes = changes.NotEmpty();
252-
}
240+
var initial = InternalEx.Return(() => (IChangeSet<TObject, TKey>)GetInitialUpdates(predicate));
241+
var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes;
242+
var changes = initial.Concat(changesStream);
253243

254-
return changes.SubscribeSafe(observer);
244+
if (predicate != null)
245+
{
246+
changes = changes.Filter(predicate, suppressEmptyChangeSets);
247+
}
248+
else if (suppressEmptyChangeSets)
249+
{
250+
changes = changes.NotEmpty();
255251
}
252+
253+
return changes.SubscribeSafe(observer);
256254
});
257255

258256
private IObservable<Change<TObject, TKey>> CreateWatchObservable(TKey key) =>
259257
Observable.Create<Change<TObject, TKey>>(
260258
observer =>
261259
{
262-
lock (_locker)
260+
using var readLock = _notifications.AcquireReadLock();
261+
262+
var skipCount = readLock.PendingCount;
263+
264+
var initial = _readerWriter.Lookup(key);
265+
if (initial.HasValue)
263266
{
264-
var skipCount = _notifications.PendingCount;
267+
observer.OnNext(new Change<TObject, TKey>(ChangeReason.Add, key, initial.Value));
268+
}
265269

266-
var initial = _readerWriter.Lookup(key);
267-
if (initial.HasValue)
270+
var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes;
271+
return changesStream.Finally(observer.OnCompleted).Subscribe(
272+
changes =>
268273
{
269-
observer.OnNext(new Change<TObject, TKey>(ChangeReason.Add, key, initial.Value));
270-
}
271-
272-
var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes;
273-
return changesStream.Finally(observer.OnCompleted).Subscribe(
274-
changes =>
274+
foreach (var change in changes.ToConcreteType())
275275
{
276-
foreach (var change in changes.ToConcreteType())
276+
var match = EqualityComparer<TKey>.Default.Equals(change.Key, key);
277+
if (match)
277278
{
278-
var match = EqualityComparer<TKey>.Default.Equals(change.Key, key);
279-
if (match)
280-
{
281-
observer.OnNext(change);
282-
}
279+
observer.OnNext(change);
283280
}
284-
});
285-
}
281+
}
282+
});
286283
});
287284

288285
/// <summary>
@@ -325,7 +322,10 @@ private bool DeliverNotification(NotificationItem item)
325322

326323
if (_suspensionTracker.IsValueCreated)
327324
{
328-
_suspensionTracker.Value.Dispose();
325+
lock (_locker)
326+
{
327+
_suspensionTracker.Value.Dispose();
328+
}
329329
}
330330
return false;
331331

@@ -340,7 +340,10 @@ private bool DeliverNotification(NotificationItem item)
340340

341341
if (_suspensionTracker.IsValueCreated)
342342
{
343-
_suspensionTracker.Value.Dispose();
343+
lock (_locker)
344+
{
345+
_suspensionTracker.Value.Dispose();
346+
}
344347
}
345348
return false;
346349

src/DynamicData/Internal/DeliveryQueue.cs

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public DeliveryQueue(object gate, Func<TItem, bool> deliver)
5151
/// Gets the number of pending items enqueued with <c>countAsPending: true</c>.
5252
/// Must be read while the caller holds the gate.
5353
/// </summary>
54-
public int PendingCount => _pendingCount;
54+
private int PendingCount => _pendingCount;
5555

5656
/// <summary>
5757
/// Acquires the gate and returns a scoped ScopedAccess for enqueueing items and
@@ -61,6 +61,13 @@ public DeliveryQueue(object gate, Func<TItem, bool> deliver)
6161
/// </summary>
6262
public ScopedAccess AcquireLock() => new(this);
6363

64+
/// <summary>
65+
/// Acquires the gate for read-only access and returns a scoped handle.
66+
/// Provides access to queue state (e.g., <see cref="PendingCount"/>) but
67+
/// cannot enqueue items and does not trigger delivery on dispose.
68+
/// </summary>
69+
public ReadOnlyScopedAccess AcquireReadLock() => new(this);
70+
6471
private void EnterLock()
6572
{
6673
#if NET9_0_OR_GREATER
@@ -70,6 +77,15 @@ private void EnterLock()
7077
#endif
7178
}
7279

80+
private void ExitLock()
81+
{
82+
#if NET9_0_OR_GREATER
83+
_gate.Exit();
84+
#else
85+
Monitor.Exit(_gate);
86+
#endif
87+
}
88+
7389
private void EnqueueItem(TItem item, bool countAsPending)
7490
{
7591
if (_isTerminated)
@@ -91,11 +107,7 @@ private void ExitLockAndDeliver()
91107
var shouldDeliver = TryStartDelivery();
92108

93109
// Now release the lock. We do this before delivering to allow other threads to enqueue items while delivery is in progress.
94-
#if NET9_0_OR_GREATER
95-
_gate.Exit();
96-
#else
97-
Monitor.Exit(_gate);
98-
#endif
110+
ExitLock();
99111

100112
// If this thread has been chosen to deliver, do it now that the lock is released.
101113
// If not, another thread is already delivering or there are no items to deliver.
@@ -219,4 +231,40 @@ public void Dispose()
219231
owner.ExitLockAndDeliver();
220232
}
221233
}
234+
235+
/// <summary>
236+
/// A read-only scoped handle for reading queue state under the gate lock.
237+
/// Cannot enqueue items and does not trigger delivery on dispose.
238+
/// </summary>
239+
public ref struct ReadOnlyScopedAccess
240+
{
241+
private DeliveryQueue<TItem>? _owner;
242+
243+
internal ReadOnlyScopedAccess(DeliveryQueue<TItem> owner)
244+
{
245+
_owner = owner;
246+
owner.EnterLock();
247+
}
248+
249+
/// <summary>
250+
/// Gets the number of pending items that were enqueued with
251+
/// <c>countAsPending: true</c> and have not yet been dequeued for delivery.
252+
/// </summary>
253+
public readonly int PendingCount => _owner?._pendingCount ?? 0;
254+
255+
/// <summary>
256+
/// Releases the gate lock. Does not trigger delivery.
257+
/// </summary>
258+
public void Dispose()
259+
{
260+
var owner = _owner;
261+
if (owner is null)
262+
{
263+
return;
264+
}
265+
266+
_owner = null;
267+
owner.ExitLock();
268+
}
269+
}
222270
}

0 commit comments

Comments
 (0)