From d2a8dc45ea5700102de6b634fe83eb576621a835 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Sun, 5 Apr 2026 22:49:49 -0700 Subject: [PATCH 01/16] Refactor SwappableLock to add NET9+ Lock overloads --- src/DynamicData/Internal/SwappableLock.cs | 39 +++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/src/DynamicData/Internal/SwappableLock.cs b/src/DynamicData/Internal/SwappableLock.cs index 267607e9b..0176505ee 100644 --- a/src/DynamicData/Internal/SwappableLock.cs +++ b/src/DynamicData/Internal/SwappableLock.cs @@ -18,6 +18,14 @@ public static SwappableLock CreateAndEnter(object gate) return result; } +#if NET9_0_OR_GREATER + public static SwappableLock CreateAndEnter(Lock gate) + { + gate.Enter(); + return new SwappableLock() { _lockGate = gate }; + } +#endif + public void SwapTo(object gate) { if (_gate is null) @@ -33,8 +41,35 @@ public void SwapTo(object gate) _gate = gate; } +#if NET9_0_OR_GREATER + public void SwapTo(Lock gate) + { + if (_lockGate is null && _gate is null) + throw new InvalidOperationException("Lock is not initialized"); + + gate.Enter(); + + if (_lockGate is not null) + _lockGate.Exit(); + else if (_hasLock) + Monitor.Exit(_gate!); + + _lockGate = gate; + _hasLock = false; + _gate = null; + } +#endif + public void Dispose() { +#if NET9_0_OR_GREATER + if (_lockGate is not null) + { + _lockGate.Exit(); + _lockGate = null; + } + else +#endif if (_hasLock && (_gate is not null)) { Monitor.Exit(_gate); @@ -45,4 +80,8 @@ public void Dispose() private bool _hasLock; private object? _gate; + +#if NET9_0_OR_GREATER + private Lock? _lockGate; +#endif } From c4b89af529cc7f08f2741156adcc65a565aa0ff0 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Sun, 5 Apr 2026 22:49:49 -0700 Subject: [PATCH 02/16] Fix race in ExpireAfter when item is removed or updated before expiration fires --- .../Cache/Internal/ExpireAfter.ForSource.cs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs b/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs index cf3e317ec..e97b9861f 100644 --- a/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs +++ b/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs @@ -161,11 +161,18 @@ private void OnEditingSource(ISourceUpdater updater) { _expirationDueTimesByKey.Remove(proposedExpiration.Key); - _removedItemsBuffer.Add(new( - key: proposedExpiration.Key, - value: updater.Lookup(proposedExpiration.Key).Value)); - - updater.RemoveKey(proposedExpiration.Key); + // The item may have been removed or updated by another thread between when + // this expiration was scheduled and when it fired. Check that the item is + // still present and still has an expiration before removing it. + var lookup = updater.Lookup(proposedExpiration.Key); + if (lookup.HasValue && _timeSelector.Invoke(lookup.Value) is not null) + { + _removedItemsBuffer.Add(new( + key: proposedExpiration.Key, + value: lookup.Value)); + + updater.RemoveKey(proposedExpiration.Key); + } } } _proposedExpirationsQueue.RemoveRange(0, proposedExpirationIndex); From 501c9f26ffb7a86fcfb56eb5a0189a54209908d6 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Sun, 5 Apr 2026 22:49:49 -0700 Subject: [PATCH 03/16] fix: Replace lock-during-notification with queue-based drain to prevent cross-cache deadlock The original code held _locker while calling _changes.OnNext(), so subscriber callbacks that propagated to other caches created ABBA deadlocks when concurrent writes were happening on those caches. New design: - Single _locker protects mutation and queue state - Write paths: lock, mutate, enqueue changeset, release lock, then drain - DrainOutsideLock delivers notifications with no lock held - _isDraining flag ensures only one thread drains at a time, preserving Rx serialization contract - Re-entrant writes enqueue and return; the outer drain loop delivers them sequentially - Connect/Watch/CountChanged use Skip(pendingCount) to avoid duplicating items already in the snapshot, with no delivery under lock - Terminal events (OnCompleted/OnError) routed through drain queue - Preview remains synchronous under _locker (required by ReaderWriter) - Suspension state captured at enqueue time; re-checked at delivery - try/catch resets _isDraining on exception - volatile _isTerminated prevents post-dispose delivery --- .../Cache/SourceCacheFixture.cs | 43 +- src/DynamicData/Cache/ObservableCache.cs | 433 +++++++++++++++--- 2 files changed, 412 insertions(+), 64 deletions(-) diff --git a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs index a1380137b..6b8666339 100644 --- a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs @@ -1,6 +1,8 @@ -using System; +using System; using System.Linq; using System.Reactive.Linq; +using System.Threading; +using System.Threading.Tasks; using DynamicData.Tests.Domain; @@ -188,4 +190,43 @@ public void StaticFilterRemove() public record class SomeObject(int Id, int Value); + + [Fact] + public async Task ConcurrentEditsShouldNotDeadlockWithSubscribersThatModifyOtherCaches() + { + const int itemCount = 100; + + using var cacheA = new SourceCache(static x => x.Key); + using var cacheB = new SourceCache(static x => x.Key); + using var destination = new SourceCache(static x => x.Key); + using var subA = cacheA.Connect().PopulateInto(destination); + using var subB = cacheB.Connect().PopulateInto(destination); + using var results = destination.Connect().AsAggregator(); + + var taskA = Task.Run(() => + { + for (var i = 0; i < itemCount; i++) + { + cacheA.AddOrUpdate(new TestItem($"a-{i}", $"ValueA-{i}")); + } + }); + + var taskB = Task.Run(() => + { + for (var i = 0; i < itemCount; i++) + { + cacheB.AddOrUpdate(new TestItem($"b-{i}", $"ValueB-{i}")); + } + }); + + var completed = Task.WhenAll(taskA, taskB); + var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(10))); + + finished.Should().BeSameAs(completed, "concurrent edits with cross-cache subscribers should not deadlock"); + results.Error.Should().BeNull(); + results.Data.Count.Should().Be(itemCount * 2, "all items from both caches should arrive in the destination"); + results.Data.Items.Should().BeEquivalentTo([.. cacheA.Items, .. cacheB.Items], "all items should be in the destination"); + } + + private sealed record TestItem(string Key, string Value); } diff --git a/src/DynamicData/Cache/ObservableCache.cs b/src/DynamicData/Cache/ObservableCache.cs index e56bab14f..5f1df9a64 100644 --- a/src/DynamicData/Cache/ObservableCache.cs +++ b/src/DynamicData/Cache/ObservableCache.cs @@ -38,67 +38,132 @@ internal sealed class ObservableCache : IObservableCache _readerWriter; + private readonly Queue _notificationQueue = new(); + private int _editLevel; // The level of recursion in editing. + private bool _isDraining; + + // Set under _locker when terminal events are delivered or Dispose runs. + // Checked by DeliverNotification to skip delivery after termination. + // Volatile because it's read outside _locker in DrainOutsideLock's delivery path. + private volatile bool _isTerminated; + + // Tracks how many items currently in the queue will produce _changes.OnNext. + // Excludes suspended, count-only, and terminal items. Incremented at enqueue, + // decremented at dequeue (both under _locker). Used by Connect/Watch for + // precise Skip(N) that avoids both duplicates and missed notifications. + private int _pendingChangesOnNextCount; + public ObservableCache(IObservable> source) { - _suspensionTracker = new(() => new SuspensionTracker(_changes.OnNext, InvokeCountNext)); _readerWriter = new ReaderWriter(); + _suspensionTracker = new(() => new SuspensionTracker(EnqueueChanges, EnqueueCount)); - var loader = source.Synchronize(_locker).Finally( - () => - { - _changes.OnCompleted(); - _changesPreview.OnCompleted(); - }).Subscribe( + var loader = source.Subscribe( changeSet => { - var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; - var changes = _readerWriter.Write(changeSet, previewHandler, _changes.HasObservers); - InvokeNext(changes); + bool shouldDrain; + lock (_locker) + { + var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; + var changes = _readerWriter.Write(changeSet, previewHandler, _changes.HasObservers); + + if (changes is null) + { + return; + } + + EnqueueUnderLock(changes); + shouldDrain = TryStartDrain(); + } + + if (shouldDrain) + { + DrainOutsideLock(); + } }, ex => { - _changesPreview.OnError(ex); - _changes.OnError(ex); + bool shouldDrain; + lock (_locker) + { + _notificationQueue.Enqueue(NotificationItem.CreateError(ex)); + shouldDrain = TryStartDrain(); + } + + if (shouldDrain) + { + DrainOutsideLock(); + } + }, + () => + { + bool shouldDrain; + lock (_locker) + { + _notificationQueue.Enqueue(NotificationItem.CreateCompleted()); + shouldDrain = TryStartDrain(); + } + + if (shouldDrain) + { + DrainOutsideLock(); + } }); _cleanUp = Disposable.Create( () => { loader.Dispose(); - _changes.OnCompleted(); - _changesPreview.OnCompleted(); - if (_suspensionTracker.IsValueCreated) - { - _suspensionTracker.Value.Dispose(); - } - if (_countChanged.IsValueCreated) + lock (_locker) { - _countChanged.Value.OnCompleted(); + // Dispose is a teardown path. Clear pending items and terminate directly. + _isTerminated = true; + _pendingChangesOnNextCount = 0; + _notificationQueue.Clear(); + _changes.OnCompleted(); + _changesPreview.OnCompleted(); + + if (_countChanged.IsValueCreated) + { + _countChanged.Value.OnCompleted(); + } + + if (_suspensionTracker.IsValueCreated) + { + _suspensionTracker.Value.Dispose(); + } } }); } public ObservableCache(Func? keySelector = null) { - _suspensionTracker = new(() => new SuspensionTracker(_changes.OnNext, InvokeCountNext)); _readerWriter = new ReaderWriter(keySelector); + _suspensionTracker = new(() => new SuspensionTracker(EnqueueChanges, EnqueueCount)); _cleanUp = Disposable.Create( () => { - _changes.OnCompleted(); - _changesPreview.OnCompleted(); - if (_suspensionTracker.IsValueCreated) + lock (_locker) { - _suspensionTracker.Value.Dispose(); - } + _isTerminated = true; + _pendingChangesOnNextCount = 0; + _notificationQueue.Clear(); + _changes.OnCompleted(); + _changesPreview.OnCompleted(); - if (_countChanged.IsValueCreated) - { - _countChanged.Value.OnCompleted(); + if (_countChanged.IsValueCreated) + { + _countChanged.Value.OnCompleted(); + } + + if (_suspensionTracker.IsValueCreated) + { + _suspensionTracker.Value.Dispose(); + } } }); } @@ -111,7 +176,9 @@ public ObservableCache(Func? keySelector = null) { lock (_locker) { - var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); + var skipCount = _notificationQueue.Count; + var countStream = skipCount > 0 ? _countChanged.Value.Skip(skipCount) : _countChanged.Value; + var source = countStream.StartWith(_readerWriter.Count).DistinctUntilChanged(); return source.SubscribeSafe(observer); } }); @@ -188,6 +255,7 @@ internal void UpdateFromIntermediate(Action> update { updateAction.ThrowArgumentNullExceptionIfNull(nameof(updateAction)); + bool shouldDrain; lock (_locker) { ChangeSet? changes = null; @@ -207,8 +275,15 @@ internal void UpdateFromIntermediate(Action> update if (changes is not null && _editLevel == 0) { - InvokeNext(changes); + EnqueueUnderLock(changes); } + + shouldDrain = TryStartDrain(); + } + + if (shouldDrain) + { + DrainOutsideLock(); } } @@ -216,6 +291,7 @@ internal void UpdateFromSource(Action> updateActio { updateAction.ThrowArgumentNullExceptionIfNull(nameof(updateAction)); + bool shouldDrain; lock (_locker) { ChangeSet? changes = null; @@ -235,8 +311,15 @@ internal void UpdateFromSource(Action> updateActio if (changes is not null && _editLevel == 0) { - InvokeNext(changes); + EnqueueUnderLock(changes); } + + shouldDrain = TryStartDrain(); + } + + if (shouldDrain) + { + DrainOutsideLock(); } } @@ -246,8 +329,14 @@ private IObservable> CreateConnectObservable(Func (IChangeSet)GetInitialUpdates(predicate)); - var changes = initial.Concat(_changes); + var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes; + var changes = initial.Concat(changesStream); if (predicate != null) { @@ -268,13 +357,16 @@ private IObservable> CreateWatchObservable(TKey key) => { lock (_locker) { + var skipCount = _pendingChangesOnNextCount; + var initial = _readerWriter.Lookup(key); if (initial.HasValue) { observer.OnNext(new Change(ChangeReason.Add, key, initial.Value)); } - return _changes.Finally(observer.OnCompleted).Subscribe( + var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes; + return changesStream.Finally(observer.OnCompleted).Subscribe( changes => { foreach (var change in changes.ToConcreteType()) @@ -289,68 +381,277 @@ private IObservable> CreateWatchObservable(TKey key) => } }); - private void InvokeNext(ChangeSet changes) + /// + /// Delivers a preview notification synchronously under _locker. Preview is + /// called by ReaderWriter during a write, between two data swaps, so it MUST + /// fire under the lock with the pre-write state visible to subscribers. + /// + private void InvokePreview(ChangeSet changes) { - lock (_locker) + if (changes.Count != 0) + { + _changesPreview.OnNext(changes); + } + } + + /// + /// Enqueues a changeset (plus associated count) for delivery outside the lock. + /// Must be called while _locker is held. + /// + private void EnqueueUnderLock(ChangeSet changes) + { + // Check suspension state under lock to avoid TOCTOU race. + var isSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.AreNotificationsSuspended; + var isCountSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.IsCountSuspended; + + _notificationQueue.Enqueue(new NotificationItem(changes, _readerWriter.Count, isSuspended, isCountSuspended)); + + if (!isSuspended) + { + _pendingChangesOnNextCount++; + } + } + + /// + /// Attempts to claim the drain token. Returns true if this thread should drain. + /// Must be called while _locker is held. + /// + private bool TryStartDrain() + { + if (_isDraining || _notificationQueue.Count == 0) + { + return false; + } + + _isDraining = true; + return true; + } + + /// + /// Delivers all pending notifications outside the lock. Only the thread that + /// successfully called TryStartDrain may call this. Serializes all OnNext + /// calls for this cache instance, preserving the Rx contract. + /// + private void DrainOutsideLock() + { + try + { + while (true) + { + NotificationItem item; + lock (_locker) + { + if (_notificationQueue.Count == 0) + { + _isDraining = false; + return; + } + + item = _notificationQueue.Dequeue(); + + // Decrement the per-subject counter for items that will emit _changes.OnNext. + if (!item.IsSuspended && !item.IsCountOnly && !item.IsCompleted && !item.IsError) + { + _pendingChangesOnNextCount--; + } + } + + DeliverNotification(item); + } + } + catch { - // If Notifications are not suspended - if (!_suspensionTracker.IsValueCreated || !_suspensionTracker.Value.AreNotificationsSuspended) + lock (_locker) { - // Emit the changes - _changes.OnNext(changes); + _isDraining = false; + _pendingChangesOnNextCount = 0; } - else + + throw; + } + } + + private void DeliverNotification(NotificationItem item) + { + // After Dispose or a terminal event has been delivered, skip all delivery. + // Subject.OnNext after OnCompleted is a no-op, but this avoids wasted work + // and prevents subtle ordering issues. + if (_isTerminated) + { + return; + } + + if (item.IsCompleted) + { + _isTerminated = true; + _changes.OnCompleted(); + _changesPreview.OnCompleted(); + + if (_countChanged.IsValueCreated) { - // Don't emit the changes, but add them to the list - _suspensionTracker.Value.EnqueueChanges(changes); + _countChanged.Value.OnCompleted(); } - // If CountChanges are not suspended - if (!_suspensionTracker.IsValueCreated || !_suspensionTracker.Value.IsCountSuspended) + return; + } + + if (item.IsError) + { + _isTerminated = true; + _changesPreview.OnError(item.Error!); + _changes.OnError(item.Error!); + return; + } + + if (item.IsCountOnly) + { + if (_countChanged.IsValueCreated) { - InvokeCountNext(); + _countChanged.Value.OnNext(item.Count); } + + return; } - } - private void InvokePreview(ChangeSet changes) - { - lock (_locker) + // Suspension state was captured at enqueue time (under lock) to avoid TOCTOU. + // For unsuspended items, deliver directly. For suspended items, re-check the + // live state under lock — ResumeNotifications may have run between dequeue and + // delivery, in which case we deliver directly instead of orphaning in _pendingChanges. + if (!item.IsSuspended) + { + _changes.OnNext(item.Changes); + } + else { - if (changes.Count != 0) + bool deliverNow; + lock (_locker) + { + if (_suspensionTracker.Value.AreNotificationsSuspended) + { + _suspensionTracker.Value.EnqueueChanges(item.Changes); + deliverNow = false; + } + else + { + deliverNow = true; + } + } + + if (deliverNow) { - _changesPreview.OnNext(changes); + _changes.OnNext(item.Changes); } } - } - private void InvokeCountNext() - { - lock (_locker) + if (!item.IsCountSuspended) { if (_countChanged.IsValueCreated) { - _countChanged.Value.OnNext(_readerWriter.Count); + _countChanged.Value.OnNext(item.Count); } } } + /// + /// Called by SuspensionTracker.ResumeNotifications to deliver accumulated + /// changes. This enqueues under _locker; the caller's TryStartDrain + + /// DrainOutsideLock handles delivery outside the lock. + /// + private void EnqueueChanges(ChangeSet changes) + { + _notificationQueue.Enqueue(new NotificationItem(changes, _readerWriter.Count, isSuspended: false, isCountSuspended: false)); + _pendingChangesOnNextCount++; + } + + /// + /// Called by SuspensionTracker.ResumeCount to deliver the current count. + /// + private void EnqueueCount() + { + if (_countChanged.IsValueCreated) + { + _notificationQueue.Enqueue(NotificationItem.CreateCountOnly(_readerWriter.Count)); + } + } + private void ResumeCount() { + bool shouldDrain; lock (_locker) { Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Count without Suspend Count instance"); _suspensionTracker.Value.ResumeCount(); + shouldDrain = TryStartDrain(); + } + + if (shouldDrain) + { + DrainOutsideLock(); } } private void ResumeNotifications() { + bool shouldDrain; lock (_locker) { - Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Notifications without Suspend Count instance"); + Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Notifications without Suspend Notifications instance"); _suspensionTracker.Value.ResumeNotifications(); + shouldDrain = TryStartDrain(); + } + + if (shouldDrain) + { + DrainOutsideLock(); + } + } + + private readonly record struct NotificationItem + { + public ChangeSet Changes { get; } + + public int Count { get; } + + public bool IsCountOnly { get; } + + public bool IsSuspended { get; } + + public bool IsCountSuspended { get; } + + public bool IsCompleted { get; } + + public bool IsError { get; } + + public Exception? Error { get; } + + public NotificationItem(ChangeSet changes, int count, bool isSuspended, bool isCountSuspended) + { + Changes = changes; + Count = count; + IsSuspended = isSuspended; + IsCountSuspended = isCountSuspended; } + + private NotificationItem(int count, bool isCountOnly) + { + Changes = []; + Count = count; + IsCountOnly = isCountOnly; + } + + private NotificationItem(bool isCompleted, Exception? error) + { + Changes = []; + IsCompleted = isCompleted; + IsError = error is not null; + Error = error; + } + + public static NotificationItem CreateCountOnly(int count) => new(count, isCountOnly: true); + + public static NotificationItem CreateCompleted() => new(isCompleted: true, error: null); + + public static NotificationItem CreateError(Exception error) => new(isCompleted: false, error: error); } private sealed class SuspensionTracker(Action> onResumeNotifications, Action onResumeCount) : IDisposable @@ -396,15 +697,21 @@ public void ResumeNotifications() { if (--_notifySuspendCount == 0 && !_areNotificationsSuspended.IsDisposed) { - // Fire pending changes to existing subscribers + // Swap out pending changes before the callback to handle re-entrant + // suspend/resume correctly. If a subscriber re-suspends during the + // callback, new changes go into the fresh list, not the one being delivered. if (_pendingChanges.Count > 0) { - _onResumeNotifications(new ChangeSet(_pendingChanges)); - _pendingChanges.Clear(); + var changesToDeliver = _pendingChanges; + _pendingChanges = []; + _onResumeNotifications(new ChangeSet(changesToDeliver)); } - // Tell deferred subscribers they can continue - _areNotificationsSuspended.OnNext(false); + // Re-check: a subscriber callback may have re-suspended during delivery. + if (_notifySuspendCount == 0) + { + _areNotificationsSuspended.OnNext(false); + } } } From 72ea32c91b628dfce12c11c4927ddb5d83072980 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 6 Apr 2026 18:21:39 -0700 Subject: [PATCH 04/16] Refactor to use one lock and a serialized delivery queue to ensure Rx contracts and thread-safety. --- .../Cache/SourceCacheFixture.cs | 61 +- .../Internal/DeliveryQueueFixture.cs | 412 +++++++++++++ src/DynamicData/Cache/ObservableCache.cs | 547 ++++++------------ src/DynamicData/Internal/DeliveryQueue.cs | 222 +++++++ 4 files changed, 868 insertions(+), 374 deletions(-) create mode 100644 src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs create mode 100644 src/DynamicData/Internal/DeliveryQueue.cs diff --git a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs index 6b8666339..c3c039455 100644 --- a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Linq; using System.Reactive.Linq; using System.Threading; @@ -228,5 +228,64 @@ public async Task ConcurrentEditsShouldNotDeadlockWithSubscribersThatModifyOther results.Data.Items.Should().BeEquivalentTo([.. cacheA.Items, .. cacheB.Items], "all items should be in the destination"); } + + [Fact] + public async Task DirectCrossWriteDoesNotDeadlock() + { + const int iterations = 100; + + for (var iter = 0; iter < iterations; iter++) + { + using var cacheA = new SourceCache(static x => x.Key); + using var cacheB = new SourceCache(static x => x.Key); + + using var subA = cacheA.Connect().Subscribe(changes => + { + foreach (var c in changes) + { + if (c.Reason == ChangeReason.Add && !c.Current.Key.StartsWith("x")) + { + cacheB.AddOrUpdate(new TestItem("x" + c.Current.Key, c.Current.Value)); + } + } + }); + + using var subB = cacheB.Connect().Subscribe(changes => + { + foreach (var c in changes) + { + if (c.Reason == ChangeReason.Add && !c.Current.Key.StartsWith("x")) + { + cacheA.AddOrUpdate(new TestItem("x" + c.Current.Key, c.Current.Value)); + } + } + }); + + var barrier = new Barrier(2); + + var taskA = Task.Run(() => + { + barrier.SignalAndWait(); + for (var i = 0; i < 1000; i++) + { + cacheA.AddOrUpdate(new TestItem("a" + i, "V" + i)); + } + }); + + var taskB = Task.Run(() => + { + barrier.SignalAndWait(); + for (var i = 0; i < 1000; i++) + { + cacheB.AddOrUpdate(new TestItem("b" + i, "V" + i)); + } + }); + + var completed = Task.WhenAll(taskA, taskB); + var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(5))); + + finished.Should().BeSameAs(completed, $"iteration {iter}: direct cross-cache writes should not deadlock"); + } + } private sealed record TestItem(string Key, string Value); } diff --git a/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs b/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs new file mode 100644 index 000000000..ad111dc41 --- /dev/null +++ b/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs @@ -0,0 +1,412 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +using DynamicData.Internal; + +using FluentAssertions; + +using Xunit; + +namespace DynamicData.Tests.Internal; + +public class DeliveryQueueFixture +{ +#if NET9_0_OR_GREATER + private readonly Lock _gate = new(); +#else + private readonly object _gate = new(); +#endif + + private static void EnqueueAndDeliver(DeliveryQueue queue, T item) + { + using var notifications = queue.AcquireLock(); + notifications.Enqueue(item); + } + + private static void TriggerDelivery(DeliveryQueue queue) + { + using var notifications = queue.AcquireLock(); + } + + // Category 1: Basic Behavior + + [Fact] + public void EnqueueAndDeliverDeliversItem() + { + var delivered = new List(); + var queue = new DeliveryQueue(_gate, item => { delivered.Add(item); return true; }); + + EnqueueAndDeliver(queue, "A"); + + delivered.Should().Equal("A"); + } + + [Fact] + public void DeliverDeliversItemsInFifoOrder() + { + var delivered = new List(); + var queue = new DeliveryQueue(_gate, item => { delivered.Add(item); return true; }); + + using (var notifications = queue.AcquireLock()) + { + notifications.Enqueue("A"); + notifications.Enqueue("B"); + notifications.Enqueue("C"); + } + + delivered.Should().Equal("A", "B", "C"); + } + + [Fact] + public void DeliverWithEmptyQueueIsNoOp() + { + var delivered = new List(); + var queue = new DeliveryQueue(_gate, item => { delivered.Add(item); return true; }); + + TriggerDelivery(queue); + + delivered.Should().BeEmpty(); + } + + // Category 2: Delivery Token Serialization + + [Fact] + public async Task OnlyOneDelivererAtATime() + { + var concurrentCount = 0; + var maxConcurrent = 0; + var queue = new DeliveryQueue(_gate, _ => + { + var current = Interlocked.Increment(ref concurrentCount); + if (current > maxConcurrent) + { + Interlocked.Exchange(ref maxConcurrent, current); + } + + Thread.SpinWait(1000); + Interlocked.Decrement(ref concurrentCount); + return true; + }); + + using (var notifications = queue.AcquireLock()) + { + for (var i = 0; i < 100; i++) + { + notifications.Enqueue(i); + } + } + + var tasks = Enumerable.Range(0, 4).Select(_ => Task.Run(() => TriggerDelivery(queue))).ToArray(); + await Task.WhenAll(tasks); + + maxConcurrent.Should().Be(1, "only one thread should be delivering at a time"); + } + + [Fact] + public void SecondWriterItemPickedUpByFirstDeliverer() + { + var delivered = new List(); + var deliveryCount = 0; + DeliveryQueue? q = null; + + var queue = new DeliveryQueue(_gate, item => + { + delivered.Add(item); + if (Interlocked.Increment(ref deliveryCount) == 1) + { + using var notifications = q!.AcquireLock(); + notifications.Enqueue("B"); + } + + return true; + }); + q = queue; + + EnqueueAndDeliver(queue, "A"); + + delivered.Should().Equal("A", "B"); + } + + [Fact] + public void ReentrantEnqueueDoesNotRecurse() + { + var callDepth = 0; + var maxDepth = 0; + var delivered = new List(); + DeliveryQueue? q = null; + + var queue = new DeliveryQueue(_gate, item => + { + callDepth++; + if (callDepth > maxDepth) + { + maxDepth = callDepth; + } + + delivered.Add(item); + + if (item == "A") + { + using var notifications = q!.AcquireLock(); + notifications.Enqueue("B"); + } + + callDepth--; + return true; + }); + q = queue; + + EnqueueAndDeliver(queue, "A"); + + delivered.Should().Equal("A", "B"); + maxDepth.Should().Be(1, "delivery callback should not recurse"); + } + + // Category 3: Exception Safety + + [Fact] + public void ExceptionInDeliveryResetsDeliveryToken() + { + var callCount = 0; + var queue = new DeliveryQueue(_gate, item => + { + callCount++; + if (callCount == 1) + { + throw new InvalidOperationException("boom"); + } + + return true; + }); + + var act = () => EnqueueAndDeliver(queue, "A"); + act.Should().Throw(); + + EnqueueAndDeliver(queue, "B"); + + callCount.Should().Be(2, "delivery should work after exception recovery"); + } + + [Fact] + public void RemainingItemsDeliveredAfterExceptionRecovery() + { + var delivered = new List(); + var shouldThrow = true; + var queue = new DeliveryQueue(_gate, item => + { + if (shouldThrow && item == "A") + { + throw new InvalidOperationException("boom"); + } + + delivered.Add(item); + return true; + }); + + var act = () => + { + using var notifications = queue.AcquireLock(); + notifications.Enqueue("A"); + notifications.Enqueue("B"); + }; + + act.Should().Throw(); + + shouldThrow = false; + TriggerDelivery(queue); + + delivered.Should().Equal("B"); + } + + // Category 4: Termination + + [Fact] + public void TerminalCallbackStopsDelivery() + { + var delivered = new List(); + var queue = new DeliveryQueue(_gate, item => + { + delivered.Add(item); + return item != "STOP"; + }); + + using (var notifications = queue.AcquireLock()) + { + notifications.Enqueue("A"); + notifications.Enqueue("STOP"); + notifications.Enqueue("B"); + } + + delivered.Should().Equal("A", "STOP"); + queue.IsTerminated.Should().BeTrue(); + } + + [Fact] + public void EnqueueAfterTerminationIsIgnored() + { + var delivered = new List(); + var queue = new DeliveryQueue(_gate, item => + { + delivered.Add(item); + return item != "STOP"; + }); + + EnqueueAndDeliver(queue, "STOP"); + + EnqueueAndDeliver(queue, "AFTER"); + + delivered.Should().Equal("STOP"); + } + + [Fact] + public void IsTerminatedIsFalseInitially() + { + var queue = new DeliveryQueue(_gate, _ => true); + queue.IsTerminated.Should().BeFalse(); + } + + // Category 5: PendingCount + + [Fact] + public void PendingCountTracksAutomatically() + { + var queue = new DeliveryQueue(_gate, _ => true); + + using (var notifications = queue.AcquireLock()) + { + notifications.PendingCount.Should().Be(0); + + notifications.Enqueue("A", countAsPending: true); + notifications.Enqueue("B", countAsPending: true); + notifications.Enqueue("C"); + + notifications.PendingCount.Should().Be(2); + } + + using (var notifications = queue.AcquireLock()) + { + notifications.PendingCount.Should().Be(0, "pending count should auto-decrement on delivery"); + } + } + + [Fact] + public void PendingCountPreservedOnException() + { + var callCount = 0; + var queue = new DeliveryQueue(_gate, _ => + { + if (++callCount == 1) + { + throw new InvalidOperationException("boom"); + } + + return true; + }); + + var act = () => + { + using var notifications = queue.AcquireLock(); + notifications.Enqueue("A", countAsPending: true); + notifications.Enqueue("B", countAsPending: true); + }; + + act.Should().Throw(); + + lock (_gate) + { + queue.PendingCount.Should().Be(1, "only the dequeued item should be decremented"); + } + } + + [Fact] + public void PendingCountClearedOnTermination() + { + var queue = new DeliveryQueue(_gate, item => item != "STOP"); + + using (var notifications = queue.AcquireLock()) + { + notifications.Enqueue("A", countAsPending: true); + notifications.Enqueue("B", countAsPending: true); + notifications.Enqueue("STOP"); + } + + queue.PendingCount.Should().Be(0); + } + + // Category 6: Stress / Thread Safety + + [Fact] + public async Task ConcurrentEnqueueAllItemsDelivered() + { + const int threadCount = 8; + const int itemsPerThread = 500; + var delivered = new ConcurrentBag(); + var queue = new DeliveryQueue(_gate, item => { delivered.Add(item); return true; }); + + var tasks = Enumerable.Range(0, threadCount).Select(t => Task.Run(() => + { + for (var i = 0; i < itemsPerThread; i++) + { + EnqueueAndDeliver(queue, (t * itemsPerThread) + i); + } + })).ToArray(); + + await Task.WhenAll(tasks); + TriggerDelivery(queue); + + delivered.Count.Should().Be(threadCount * itemsPerThread); + } + + [Fact] + public async Task ConcurrentEnqueueNoDuplicates() + { + const int threadCount = 8; + const int itemsPerThread = 500; + var delivered = new ConcurrentBag(); + var queue = new DeliveryQueue(_gate, item => { delivered.Add(item); return true; }); + + var tasks = Enumerable.Range(0, threadCount).Select(t => Task.Run(() => + { + for (var i = 0; i < itemsPerThread; i++) + { + EnqueueAndDeliver(queue, (t * itemsPerThread) + i); + } + })).ToArray(); + + await Task.WhenAll(tasks); + TriggerDelivery(queue); + + delivered.Distinct().Count().Should().Be(threadCount * itemsPerThread, "each item should be delivered exactly once"); + } + + [Fact] + public async Task ConcurrentEnqueuePreservesPerThreadOrdering() + { + const int threadCount = 4; + const int itemsPerThread = 200; + var delivered = new ConcurrentQueue<(int Thread, int Seq)>(); + var queue = new DeliveryQueue<(int Thread, int Seq)>(_gate, item => { delivered.Enqueue(item); return true; }); + + var tasks = Enumerable.Range(0, threadCount).Select(t => Task.Run(() => + { + for (var i = 0; i < itemsPerThread; i++) + { + EnqueueAndDeliver(queue, (t, i)); + } + })).ToArray(); + + await Task.WhenAll(tasks); + TriggerDelivery(queue); + + var itemsByThread = delivered.ToArray().GroupBy(x => x.Thread).ToDictionary(g => g.Key, g => g.Select(x => x.Seq).ToList()); + + foreach (var (thread, sequences) in itemsByThread) + { + sequences.Should().BeInAscendingOrder($"items from thread {thread} should preserve enqueue order"); + } + } +} \ No newline at end of file diff --git a/src/DynamicData/Cache/ObservableCache.cs b/src/DynamicData/Cache/ObservableCache.cs index 5f1df9a64..c7d54d994 100644 --- a/src/DynamicData/Cache/ObservableCache.cs +++ b/src/DynamicData/Cache/ObservableCache.cs @@ -1,4 +1,4 @@ -// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. // Roland Pheasant licenses this file to you under the MIT license. // See the LICENSE file in the project root for full license information. @@ -9,6 +9,7 @@ using DynamicData.Binding; using DynamicData.Cache; using DynamicData.Cache.Internal; +using DynamicData.Internal; // ReSharper disable once CheckNamespace namespace DynamicData; @@ -38,134 +39,51 @@ internal sealed class ObservableCache : IObservableCache _readerWriter; - private readonly Queue _notificationQueue = new(); + private readonly DeliveryQueue _notifications; private int _editLevel; // The level of recursion in editing. - private bool _isDraining; - - // Set under _locker when terminal events are delivered or Dispose runs. - // Checked by DeliverNotification to skip delivery after termination. - // Volatile because it's read outside _locker in DrainOutsideLock's delivery path. - private volatile bool _isTerminated; - - // Tracks how many items currently in the queue will produce _changes.OnNext. - // Excludes suspended, count-only, and terminal items. Incremented at enqueue, - // decremented at dequeue (both under _locker). Used by Connect/Watch for - // precise Skip(N) that avoids both duplicates and missed notifications. - private int _pendingChangesOnNextCount; - public ObservableCache(IObservable> source) { _readerWriter = new ReaderWriter(); - _suspensionTracker = new(() => new SuspensionTracker(EnqueueChanges, EnqueueCount)); + _notifications = new DeliveryQueue(_locker, DeliverNotification); + _suspensionTracker = new(() => new SuspensionTracker()); var loader = source.Subscribe( changeSet => { - bool shouldDrain; - lock (_locker) - { - var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; - var changes = _readerWriter.Write(changeSet, previewHandler, _changes.HasObservers); + using var notifications = _notifications.AcquireLock(); - if (changes is null) - { - return; - } - - EnqueueUnderLock(changes); - shouldDrain = TryStartDrain(); - } - - if (shouldDrain) - { - DrainOutsideLock(); - } - }, - ex => - { - bool shouldDrain; - lock (_locker) - { - _notificationQueue.Enqueue(NotificationItem.CreateError(ex)); - shouldDrain = TryStartDrain(); - } + var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; + var changes = _readerWriter.Write(changeSet, previewHandler, _changes.HasObservers); - if (shouldDrain) + if (changes is not null) { - DrainOutsideLock(); + var isSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.AreNotificationsSuspended; + var isCountSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.IsCountSuspended; + notifications.Enqueue( + NotificationItem.CreateChanges(changes, _readerWriter.Count, isSuspended, isCountSuspended), + countAsPending: !isSuspended); } }, - () => - { - bool shouldDrain; - lock (_locker) - { - _notificationQueue.Enqueue(NotificationItem.CreateCompleted()); - shouldDrain = TryStartDrain(); - } - - if (shouldDrain) - { - DrainOutsideLock(); - } - }); + NotifyError, + NotifyCompleted); _cleanUp = Disposable.Create( () => { loader.Dispose(); - - lock (_locker) - { - // Dispose is a teardown path. Clear pending items and terminate directly. - _isTerminated = true; - _pendingChangesOnNextCount = 0; - _notificationQueue.Clear(); - _changes.OnCompleted(); - _changesPreview.OnCompleted(); - - if (_countChanged.IsValueCreated) - { - _countChanged.Value.OnCompleted(); - } - - if (_suspensionTracker.IsValueCreated) - { - _suspensionTracker.Value.Dispose(); - } - } + NotifyCompleted(); }); } public ObservableCache(Func? keySelector = null) { _readerWriter = new ReaderWriter(keySelector); - _suspensionTracker = new(() => new SuspensionTracker(EnqueueChanges, EnqueueCount)); + _notifications = new DeliveryQueue(_locker, DeliverNotification); + _suspensionTracker = new(() => new SuspensionTracker()); - _cleanUp = Disposable.Create( - () => - { - lock (_locker) - { - _isTerminated = true; - _pendingChangesOnNextCount = 0; - _notificationQueue.Clear(); - _changes.OnCompleted(); - _changesPreview.OnCompleted(); - - if (_countChanged.IsValueCreated) - { - _countChanged.Value.OnCompleted(); - } - - if (_suspensionTracker.IsValueCreated) - { - _suspensionTracker.Value.Dispose(); - } - } - }); + _cleanUp = Disposable.Create(NotifyCompleted); } public int Count => _readerWriter.Count; @@ -176,9 +94,7 @@ public ObservableCache(Func? keySelector = null) { lock (_locker) { - var skipCount = _notificationQueue.Count; - var countStream = skipCount > 0 ? _countChanged.Value.Skip(skipCount) : _countChanged.Value; - var source = countStream.StartWith(_readerWriter.Count).DistinctUntilChanged(); + var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); return source.SubscribeSafe(observer); } }); @@ -255,35 +171,30 @@ internal void UpdateFromIntermediate(Action> update { updateAction.ThrowArgumentNullExceptionIfNull(nameof(updateAction)); - bool shouldDrain; - lock (_locker) - { - ChangeSet? changes = null; + using var notifications = _notifications.AcquireLock(); - _editLevel++; - if (_editLevel == 1) - { - var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; - changes = _readerWriter.Write(updateAction, previewHandler, _changes.HasObservers); - } - else - { - _readerWriter.WriteNested(updateAction); - } - - _editLevel--; - - if (changes is not null && _editLevel == 0) - { - EnqueueUnderLock(changes); - } + ChangeSet? changes = null; - shouldDrain = TryStartDrain(); + _editLevel++; + if (_editLevel == 1) + { + var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; + changes = _readerWriter.Write(updateAction, previewHandler, _changes.HasObservers); } + else + { + _readerWriter.WriteNested(updateAction); + } + + _editLevel--; - if (shouldDrain) + if (changes is not null && _editLevel == 0) { - DrainOutsideLock(); + var isSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.AreNotificationsSuspended; + var isCountSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.IsCountSuspended; + notifications.Enqueue( + NotificationItem.CreateChanges(changes, _readerWriter.Count, isSuspended, isCountSuspended), + countAsPending: !isSuspended); } } @@ -291,35 +202,30 @@ internal void UpdateFromSource(Action> updateActio { updateAction.ThrowArgumentNullExceptionIfNull(nameof(updateAction)); - bool shouldDrain; - lock (_locker) - { - ChangeSet? changes = null; + using var notifications = _notifications.AcquireLock(); - _editLevel++; - if (_editLevel == 1) - { - var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; - changes = _readerWriter.Write(updateAction, previewHandler, _changes.HasObservers); - } - else - { - _readerWriter.WriteNested(updateAction); - } - - _editLevel--; + ChangeSet? changes = null; - if (changes is not null && _editLevel == 0) - { - EnqueueUnderLock(changes); - } - - shouldDrain = TryStartDrain(); + _editLevel++; + if (_editLevel == 1) + { + var previewHandler = _changesPreview.HasObservers ? (Action>)InvokePreview : null; + changes = _readerWriter.Write(updateAction, previewHandler, _changes.HasObservers); } + else + { + _readerWriter.WriteNested(updateAction); + } + + _editLevel--; - if (shouldDrain) + if (changes is not null && _editLevel == 0) { - DrainOutsideLock(); + var isSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.AreNotificationsSuspended; + var isCountSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.IsCountSuspended; + notifications.Enqueue( + NotificationItem.CreateChanges(changes, _readerWriter.Count, isSuspended, isCountSuspended), + countAsPending: !isSuspended); } } @@ -329,10 +235,8 @@ private IObservable> CreateConnectObservable(Func (IChangeSet)GetInitialUpdates(predicate)); var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes; @@ -357,7 +261,7 @@ private IObservable> CreateWatchObservable(TKey key) => { lock (_locker) { - var skipCount = _pendingChangesOnNextCount; + var skipCount = _notifications.PendingCount; var initial = _readerWriter.Lookup(key); if (initial.HasValue) @@ -388,280 +292,184 @@ private IObservable> CreateWatchObservable(TKey key) => /// private void InvokePreview(ChangeSet changes) { - if (changes.Count != 0) + if (changes.Count != 0 && !_notifications.IsTerminated) { _changesPreview.OnNext(changes); } } - /// - /// Enqueues a changeset (plus associated count) for delivery outside the lock. - /// Must be called while _locker is held. - /// - private void EnqueueUnderLock(ChangeSet changes) + private void NotifyCompleted() { - // Check suspension state under lock to avoid TOCTOU race. - var isSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.AreNotificationsSuspended; - var isCountSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.IsCountSuspended; - - _notificationQueue.Enqueue(new NotificationItem(changes, _readerWriter.Count, isSuspended, isCountSuspended)); - - if (!isSuspended) - { - _pendingChangesOnNextCount++; - } + using var notifications = _notifications.AcquireLock(); + notifications.Enqueue(NotificationItem.CreateCompleted()); } - /// - /// Attempts to claim the drain token. Returns true if this thread should drain. - /// Must be called while _locker is held. - /// - private bool TryStartDrain() + private void NotifyError(Exception ex) { - if (_isDraining || _notificationQueue.Count == 0) - { - return false; - } - - _isDraining = true; - return true; + using var notifications = _notifications.AcquireLock(); + notifications.Enqueue(NotificationItem.CreateError(ex)); } - /// - /// Delivers all pending notifications outside the lock. Only the thread that - /// successfully called TryStartDrain may call this. Serializes all OnNext - /// calls for this cache instance, preserving the Rx contract. - /// - private void DrainOutsideLock() + private bool DeliverNotification(NotificationItem item) { - try + switch (item.Kind) { - while (true) - { - NotificationItem item; - lock (_locker) - { - if (_notificationQueue.Count == 0) - { - _isDraining = false; - return; - } + case NotificationKind.Completed: + _changes.OnCompleted(); + _changesPreview.OnCompleted(); - item = _notificationQueue.Dequeue(); - - // Decrement the per-subject counter for items that will emit _changes.OnNext. - if (!item.IsSuspended && !item.IsCountOnly && !item.IsCompleted && !item.IsError) - { - _pendingChangesOnNextCount--; - } + if (_countChanged.IsValueCreated) + { + _countChanged.Value.OnCompleted(); } - DeliverNotification(item); - } - } - catch - { - lock (_locker) - { - _isDraining = false; - _pendingChangesOnNextCount = 0; - } - - throw; - } - } - - private void DeliverNotification(NotificationItem item) - { - // After Dispose or a terminal event has been delivered, skip all delivery. - // Subject.OnNext after OnCompleted is a no-op, but this avoids wasted work - // and prevents subtle ordering issues. - if (_isTerminated) - { - return; - } - - if (item.IsCompleted) - { - _isTerminated = true; - _changes.OnCompleted(); - _changesPreview.OnCompleted(); + if (_suspensionTracker.IsValueCreated) + { + _suspensionTracker.Value.Dispose(); + } + return false; - if (_countChanged.IsValueCreated) - { - _countChanged.Value.OnCompleted(); - } + case NotificationKind.Error: + _changesPreview.OnError(item.Error!); + _changes.OnError(item.Error!); - return; - } + if (_countChanged.IsValueCreated) + { + _countChanged.Value.OnError(item.Error!); + } - if (item.IsError) - { - _isTerminated = true; - _changesPreview.OnError(item.Error!); - _changes.OnError(item.Error!); - return; - } + if (_suspensionTracker.IsValueCreated) + { + _suspensionTracker.Value.Dispose(); + } + return false; - if (item.IsCountOnly) - { - if (_countChanged.IsValueCreated) - { - _countChanged.Value.OnNext(item.Count); - } + case NotificationKind.CountOnly: + if (_countChanged.IsValueCreated) + { + _countChanged.Value.OnNext(item.Count); + } - return; - } + return true; - // Suspension state was captured at enqueue time (under lock) to avoid TOCTOU. - // For unsuspended items, deliver directly. For suspended items, re-check the - // live state under lock — ResumeNotifications may have run between dequeue and - // delivery, in which case we deliver directly instead of orphaning in _pendingChanges. - if (!item.IsSuspended) - { - _changes.OnNext(item.Changes); - } - else - { - bool deliverNow; - lock (_locker) - { - if (_suspensionTracker.Value.AreNotificationsSuspended) + default: + if (!item.IsSuspended) { - _suspensionTracker.Value.EnqueueChanges(item.Changes); - deliverNow = false; + _changes.OnNext(item.Changes); } else { - deliverNow = true; + bool deliverNow; + lock (_locker) + { + if (_suspensionTracker.Value.AreNotificationsSuspended) + { + _suspensionTracker.Value.EnqueueChanges(item.Changes); + deliverNow = false; + } + else + { + deliverNow = true; + } + } + + if (deliverNow) + { + _changes.OnNext(item.Changes); + } } - } - if (deliverNow) - { - _changes.OnNext(item.Changes); - } - } + if (!item.IsCountSuspended) + { + if (_countChanged.IsValueCreated) + { + _countChanged.Value.OnNext(item.Count); + } + } - if (!item.IsCountSuspended) - { - if (_countChanged.IsValueCreated) - { - _countChanged.Value.OnNext(item.Count); - } + return true; } } - /// - /// Called by SuspensionTracker.ResumeNotifications to deliver accumulated - /// changes. This enqueues under _locker; the caller's TryStartDrain + - /// DrainOutsideLock handles delivery outside the lock. - /// - private void EnqueueChanges(ChangeSet changes) + private void ResumeCount() { - _notificationQueue.Enqueue(new NotificationItem(changes, _readerWriter.Count, isSuspended: false, isCountSuspended: false)); - _pendingChangesOnNextCount++; - } + using var notifications = _notifications.AcquireLock(); + Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Count without Suspend Count instance"); - /// - /// Called by SuspensionTracker.ResumeCount to deliver the current count. - /// - private void EnqueueCount() - { - if (_countChanged.IsValueCreated) + if (_suspensionTracker.Value.ResumeCount() && _countChanged.IsValueCreated) { - _notificationQueue.Enqueue(NotificationItem.CreateCountOnly(_readerWriter.Count)); + notifications.Enqueue(NotificationItem.CreateCountOnly(_readerWriter.Count)); } } - private void ResumeCount() + private void ResumeNotifications() { - bool shouldDrain; - lock (_locker) + using var notifications = _notifications.AcquireLock(); + Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Notifications without Suspend Notifications instance"); + + var (resumedChanges, emitResume) = _suspensionTracker.Value.ResumeNotifications(); + if (resumedChanges is not null) { - Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Count without Suspend Count instance"); - _suspensionTracker.Value.ResumeCount(); - shouldDrain = TryStartDrain(); + notifications.Enqueue( + NotificationItem.CreateChanges(resumedChanges, _readerWriter.Count, isSuspended: false, isCountSuspended: false), + countAsPending: true); } - if (shouldDrain) + if (emitResume) { - DrainOutsideLock(); + _suspensionTracker.Value.EmitResumeNotification(); } } - private void ResumeNotifications() + private enum NotificationKind { - bool shouldDrain; - lock (_locker) - { - Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Notifications without Suspend Notifications instance"); - _suspensionTracker.Value.ResumeNotifications(); - shouldDrain = TryStartDrain(); - } - - if (shouldDrain) - { - DrainOutsideLock(); - } + Changes, + CountOnly, + Completed, + Error, } private readonly record struct NotificationItem { + public NotificationKind Kind { get; } + public ChangeSet Changes { get; } public int Count { get; } - public bool IsCountOnly { get; } - public bool IsSuspended { get; } public bool IsCountSuspended { get; } - public bool IsCompleted { get; } - - public bool IsError { get; } - public Exception? Error { get; } - public NotificationItem(ChangeSet changes, int count, bool isSuspended, bool isCountSuspended) + private NotificationItem(NotificationKind kind, ChangeSet changes, int count = 0, bool isSuspended = false, bool isCountSuspended = false, Exception? error = null) { + Kind = kind; Changes = changes; Count = count; IsSuspended = isSuspended; IsCountSuspended = isCountSuspended; - } - - private NotificationItem(int count, bool isCountOnly) - { - Changes = []; - Count = count; - IsCountOnly = isCountOnly; - } - - private NotificationItem(bool isCompleted, Exception? error) - { - Changes = []; - IsCompleted = isCompleted; - IsError = error is not null; Error = error; } - public static NotificationItem CreateCountOnly(int count) => new(count, isCountOnly: true); + public static NotificationItem CreateChanges(ChangeSet changes, int count, bool isSuspended, bool isCountSuspended) => + new(NotificationKind.Changes, changes, count, isSuspended, isCountSuspended); - public static NotificationItem CreateCompleted() => new(isCompleted: true, error: null); + public static NotificationItem CreateCountOnly(int count) => + new(NotificationKind.CountOnly, [], count: count); - public static NotificationItem CreateError(Exception error) => new(isCompleted: false, error: error); + public static NotificationItem CreateCompleted() => + new(NotificationKind.Completed, []); + + public static NotificationItem CreateError(Exception error) => + new(NotificationKind.Error, [], error: error); } - private sealed class SuspensionTracker(Action> onResumeNotifications, Action onResumeCount) : IDisposable + private sealed class SuspensionTracker : IDisposable { private readonly BehaviorSubject _areNotificationsSuspended = new(false); - private readonly Action> _onResumeNotifications = onResumeNotifications; - - private readonly Action _onResumeCount = onResumeCount; - private List> _pendingChanges = []; private int _countSuspendCount; @@ -693,36 +501,29 @@ public void SuspendNotifications() public void SuspendCount() => ++_countSuspendCount; - public void ResumeNotifications() + public bool ResumeCount() => --_countSuspendCount == 0; + + public (ChangeSet? Changes, bool EmitResume) ResumeNotifications() { if (--_notifySuspendCount == 0 && !_areNotificationsSuspended.IsDisposed) { - // Swap out pending changes before the callback to handle re-entrant - // suspend/resume correctly. If a subscriber re-suspends during the - // callback, new changes go into the fresh list, not the one being delivered. + ChangeSet? changes = null; + if (_pendingChanges.Count > 0) { var changesToDeliver = _pendingChanges; _pendingChanges = []; - _onResumeNotifications(new ChangeSet(changesToDeliver)); + changes = new ChangeSet(changesToDeliver); } - // Re-check: a subscriber callback may have re-suspended during delivery. - if (_notifySuspendCount == 0) - { - _areNotificationsSuspended.OnNext(false); - } + return (changes, _notifySuspendCount == 0); } - } - public void ResumeCount() - { - if (--_countSuspendCount == 0) - { - _onResumeCount(); - } + return (null, false); } + public void EmitResumeNotification() => _areNotificationsSuspended.OnNext(false); + public void Dispose() { _areNotificationsSuspended.OnCompleted(); diff --git a/src/DynamicData/Internal/DeliveryQueue.cs b/src/DynamicData/Internal/DeliveryQueue.cs new file mode 100644 index 000000000..9d21ca39e --- /dev/null +++ b/src/DynamicData/Internal/DeliveryQueue.cs @@ -0,0 +1,222 @@ +// Copyright (c) 2011-2025 Roland Pheasant. All rights reserved. +// Roland Pheasant licenses this file to you under the MIT license. +// See the LICENSE file in the project root for full license information. + +namespace DynamicData.Internal; + +/// +/// A queue that serializes item delivery outside a caller-owned lock. +/// Use to obtain a scoped ScopedAccess for enqueueing items +/// and reading queue state. When the ScopedAccess is disposed, the lock is released +/// and pending items are delivered. Only one thread delivers at a time. +/// +/// The item type. +internal sealed class DeliveryQueue +{ + private readonly Queue<(TItem Item, bool CountAsPending)> _queue = new(); + private readonly Func _deliver; + +#if NET9_0_OR_GREATER + private readonly Lock _gate; +#else + private readonly object _gate; +#endif + + private bool _isDelivering; + private volatile bool _isTerminated; + private int _pendingCount; + + /// + /// Initializes a new instance of the class. + /// + /// The lock shared with the caller. The queue acquires this + /// lock during and during the dequeue step of delivery. + /// Callback invoked for each item, outside the lock. Returns false if the item was terminal, which stops further delivery. +#if NET9_0_OR_GREATER + public DeliveryQueue(Lock gate, Func deliver) +#else + public DeliveryQueue(object gate, Func deliver) +#endif + { + _gate = gate; + _deliver = deliver; + } + + /// + /// Gets whether this queue has been terminated. Safe to read from any thread. + /// + public bool IsTerminated => _isTerminated; + + /// + /// Gets the number of pending items enqueued with countAsPending: true. + /// Must be read while the caller holds the gate. + /// + public int PendingCount => _pendingCount; + + /// + /// Acquires the gate and returns a scoped ScopedAccess for enqueueing items and + /// reading queue state. When the ScopedAccess is disposed, the gate is released + /// and delivery runs if needed. The ScopedAccess is a ref struct and cannot + /// escape the calling method. + /// + public ScopedAccess AcquireLock() => new(this); + + private void EnterLock() + { +#if NET9_0_OR_GREATER + _gate.Enter(); +#else + Monitor.Enter(_gate); +#endif + } + + private void EnqueueItem(TItem item, bool countAsPending) + { + if (_isTerminated) + { + return; + } + + _queue.Enqueue((item, countAsPending)); + + if (countAsPending) + { + _pendingCount++; + } + } + + private void ExitLockAndDeliver() + { + // Before releasing the lock, check if we should start delivery. Only one thread can succeed + var shouldDeliver = TryStartDelivery(); + + // Now release the lock. We do this before delivering to allow other threads to enqueue items while delivery is in progress. +#if NET9_0_OR_GREATER + _gate.Exit(); +#else + Monitor.Exit(_gate); +#endif + + // If this thread has been chosen to deliver, do it now that the lock is released. + // If not, another thread is already delivering or there are no items to deliver. + if (shouldDeliver) + { + DeliverAll(); + } + + bool TryStartDelivery() + { + // Bail if something is already delivering or there's nothing to do + if (_isDelivering || _queue.Count == 0) + { + return false; + } + + // Mark that we're doing the delivering + _isDelivering = true; + return true; + } + + void DeliverAll() + { + try + { + while (true) + { + TItem item; + + // Inside of the lock, see if there is work and get the next item to deliver. + // If there is no work, mark that we're done delivering and exit. + lock (_gate) + { + if (_queue.Count == 0) + { + _isDelivering = false; + return; + } + + var entry = _queue.Dequeue(); + item = entry.Item; + + if (entry.CountAsPending) + { + _pendingCount--; + } + } + + // Now the lock is release, we can deliver the item + // If delivery returns false, it means the item was terminal and we should stop delivering and clear the queue. + if (!_deliver(item)) + { + lock (_gate) + { + _isTerminated = true; + _isDelivering = false; + _pendingCount = 0; + _queue.Clear(); + } + + return; + } + } + } + catch + { + // If anything bad happens, we must release the flag so that deliveries aren't stuck + lock (_gate) + { + _isDelivering = false; + } + + throw; + } + } + } + + /// + /// A scoped ScopedAccess for working under the gate lock. All queue mutation and + /// state reads go through this ScopedAccess, ensuring the lock is held. Disposing + /// releases the lock and triggers delivery if needed. + /// + public ref struct ScopedAccess + { + private DeliveryQueue? _owner; + + internal ScopedAccess(DeliveryQueue owner) + { + _owner = owner; + owner.EnterLock(); + } + + /// + /// Gets the number of pending items that were enqueued with + /// countAsPending: true and have not yet been dequeued for delivery. + /// + public readonly int PendingCount => _owner?._pendingCount ?? 0; + + /// + /// Adds an item to the queue. Ignored if the queue has been terminated. + /// + /// The item to enqueue. + /// True if this item should be tracked by + /// . The count is automatically decremented + /// when the item is dequeued for delivery. + public readonly void Enqueue(TItem item, bool countAsPending = false) => _owner?.EnqueueItem(item, countAsPending); + + /// + /// Releases the gate lock and delivers pending items if this thread + /// holds the delivery token. + /// + public void Dispose() + { + var owner = _owner; + if (owner is null) + { + return; + } + + _owner = null; + owner.ExitLockAndDeliver(); + } + } +} From ab41353330455cbeedd1893bcfdf1a1d938f3be6 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 6 Apr 2026 23:08:31 -0700 Subject: [PATCH 05/16] Add read-only lock for DeliveryQueue and improve safety Introduce ReadOnlyScopedAccess to DeliveryQueue for safe, read-only access to queue state under lock. Update tests and ObservableCache to use AcquireReadLock() for reading PendingCount, replacing direct property access and manual locking. Make PendingCount private and encapsulate lock release logic. Wrap _suspensionTracker disposal in a lock for thread safety. These changes improve thread safety and clarify access patterns for queue state. --- .../Internal/DeliveryQueueFixture.cs | 9 +- src/DynamicData/Cache/ObservableCache.cs | 85 ++++++++++--------- src/DynamicData/Internal/DeliveryQueue.cs | 60 +++++++++++-- 3 files changed, 104 insertions(+), 50 deletions(-) diff --git a/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs b/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs index ad111dc41..909cd9cf9 100644 --- a/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs +++ b/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs @@ -316,9 +316,9 @@ public void PendingCountPreservedOnException() act.Should().Throw(); - lock (_gate) + using (var rl = queue.AcquireReadLock()) { - queue.PendingCount.Should().Be(1, "only the dequeued item should be decremented"); + rl.PendingCount.Should().Be(1, "only the dequeued item should be decremented"); } } @@ -334,7 +334,10 @@ public void PendingCountClearedOnTermination() notifications.Enqueue("STOP"); } - queue.PendingCount.Should().Be(0); + using (var rl = queue.AcquireReadLock()) + { + rl.PendingCount.Should().Be(0); + } } // Category 6: Stress / Thread Safety diff --git a/src/DynamicData/Cache/ObservableCache.cs b/src/DynamicData/Cache/ObservableCache.cs index c7d54d994..9fcc7dfdc 100644 --- a/src/DynamicData/Cache/ObservableCache.cs +++ b/src/DynamicData/Cache/ObservableCache.cs @@ -92,11 +92,10 @@ public ObservableCache(Func? keySelector = null) Observable.Create( observer => { - lock (_locker) - { - var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); - return source.SubscribeSafe(observer); - } + using var readLock = _notifications.AcquireReadLock(); + + var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); + return source.SubscribeSafe(observer); }); public IReadOnlyList Items => _readerWriter.Items; @@ -233,56 +232,54 @@ private IObservable> CreateConnectObservable(Func>( observer => { - lock (_locker) - { - // Skip pending notifications to avoid duplicating items already in the snapshot. - var skipCount = _notifications.PendingCount; + using var readLock = _notifications.AcquireReadLock(); - var initial = InternalEx.Return(() => (IChangeSet)GetInitialUpdates(predicate)); - var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes; - var changes = initial.Concat(changesStream); + // Skip pending notifications to avoid duplicating items already in the snapshot. + var skipCount = readLock.PendingCount; - if (predicate != null) - { - changes = changes.Filter(predicate, suppressEmptyChangeSets); - } - else if (suppressEmptyChangeSets) - { - changes = changes.NotEmpty(); - } + var initial = InternalEx.Return(() => (IChangeSet)GetInitialUpdates(predicate)); + var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes; + var changes = initial.Concat(changesStream); - return changes.SubscribeSafe(observer); + if (predicate != null) + { + changes = changes.Filter(predicate, suppressEmptyChangeSets); + } + else if (suppressEmptyChangeSets) + { + changes = changes.NotEmpty(); } + + return changes.SubscribeSafe(observer); }); private IObservable> CreateWatchObservable(TKey key) => Observable.Create>( observer => { - lock (_locker) + using var readLock = _notifications.AcquireReadLock(); + + var skipCount = readLock.PendingCount; + + var initial = _readerWriter.Lookup(key); + if (initial.HasValue) { - var skipCount = _notifications.PendingCount; + observer.OnNext(new Change(ChangeReason.Add, key, initial.Value)); + } - var initial = _readerWriter.Lookup(key); - if (initial.HasValue) + var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes; + return changesStream.Finally(observer.OnCompleted).Subscribe( + changes => { - observer.OnNext(new Change(ChangeReason.Add, key, initial.Value)); - } - - var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes; - return changesStream.Finally(observer.OnCompleted).Subscribe( - changes => + foreach (var change in changes.ToConcreteType()) { - foreach (var change in changes.ToConcreteType()) + var match = EqualityComparer.Default.Equals(change.Key, key); + if (match) { - var match = EqualityComparer.Default.Equals(change.Key, key); - if (match) - { - observer.OnNext(change); - } + observer.OnNext(change); } - }); - } + } + }); }); /// @@ -325,7 +322,10 @@ private bool DeliverNotification(NotificationItem item) if (_suspensionTracker.IsValueCreated) { - _suspensionTracker.Value.Dispose(); + lock (_locker) + { + _suspensionTracker.Value.Dispose(); + } } return false; @@ -340,7 +340,10 @@ private bool DeliverNotification(NotificationItem item) if (_suspensionTracker.IsValueCreated) { - _suspensionTracker.Value.Dispose(); + lock (_locker) + { + _suspensionTracker.Value.Dispose(); + } } return false; diff --git a/src/DynamicData/Internal/DeliveryQueue.cs b/src/DynamicData/Internal/DeliveryQueue.cs index 9d21ca39e..cca5e30a5 100644 --- a/src/DynamicData/Internal/DeliveryQueue.cs +++ b/src/DynamicData/Internal/DeliveryQueue.cs @@ -51,7 +51,7 @@ public DeliveryQueue(object gate, Func deliver) /// Gets the number of pending items enqueued with countAsPending: true. /// Must be read while the caller holds the gate. /// - public int PendingCount => _pendingCount; + private int PendingCount => _pendingCount; /// /// Acquires the gate and returns a scoped ScopedAccess for enqueueing items and @@ -61,6 +61,13 @@ public DeliveryQueue(object gate, Func deliver) /// public ScopedAccess AcquireLock() => new(this); + /// + /// Acquires the gate for read-only access and returns a scoped handle. + /// Provides access to queue state (e.g., ) but + /// cannot enqueue items and does not trigger delivery on dispose. + /// + public ReadOnlyScopedAccess AcquireReadLock() => new(this); + private void EnterLock() { #if NET9_0_OR_GREATER @@ -70,6 +77,15 @@ private void EnterLock() #endif } + private void ExitLock() + { +#if NET9_0_OR_GREATER + _gate.Exit(); +#else + Monitor.Exit(_gate); +#endif + } + private void EnqueueItem(TItem item, bool countAsPending) { if (_isTerminated) @@ -91,11 +107,7 @@ private void ExitLockAndDeliver() var shouldDeliver = TryStartDelivery(); // Now release the lock. We do this before delivering to allow other threads to enqueue items while delivery is in progress. -#if NET9_0_OR_GREATER - _gate.Exit(); -#else - Monitor.Exit(_gate); -#endif + ExitLock(); // If this thread has been chosen to deliver, do it now that the lock is released. // If not, another thread is already delivering or there are no items to deliver. @@ -219,4 +231,40 @@ public void Dispose() owner.ExitLockAndDeliver(); } } + + /// + /// A read-only scoped handle for reading queue state under the gate lock. + /// Cannot enqueue items and does not trigger delivery on dispose. + /// + public ref struct ReadOnlyScopedAccess + { + private DeliveryQueue? _owner; + + internal ReadOnlyScopedAccess(DeliveryQueue owner) + { + _owner = owner; + owner.EnterLock(); + } + + /// + /// Gets the number of pending items that were enqueued with + /// countAsPending: true and have not yet been dequeued for delivery. + /// + public readonly int PendingCount => _owner?._pendingCount ?? 0; + + /// + /// Releases the gate lock. Does not trigger delivery. + /// + public void Dispose() + { + var owner = _owner; + if (owner is null) + { + return; + } + + _owner = null; + owner.ExitLock(); + } + } } From c4593654ff278acb4fde237fcc20e3f643e35c98 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Mon, 6 Apr 2026 23:22:23 -0700 Subject: [PATCH 06/16] Refactor cross-cache deadlock test to use operators Refactored DirectCrossWriteDoesNotDeadlock to use Connect, Filter, Transform, and PopulateInto operators for bidirectional cache updates, replacing manual subscription logic. Increased test timeout and clarified assertion message. Prevented infinite feedback with key prefix filtering. --- .../Cache/SourceCacheFixture.cs | 35 +++++++------------ 1 file changed, 12 insertions(+), 23 deletions(-) diff --git a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs index c3c039455..62ca8be86 100644 --- a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs @@ -228,7 +228,6 @@ public async Task ConcurrentEditsShouldNotDeadlockWithSubscribersThatModifyOther results.Data.Items.Should().BeEquivalentTo([.. cacheA.Items, .. cacheB.Items], "all items should be in the destination"); } - [Fact] public async Task DirectCrossWriteDoesNotDeadlock() { @@ -239,27 +238,17 @@ public async Task DirectCrossWriteDoesNotDeadlock() using var cacheA = new SourceCache(static x => x.Key); using var cacheB = new SourceCache(static x => x.Key); - using var subA = cacheA.Connect().Subscribe(changes => - { - foreach (var c in changes) - { - if (c.Reason == ChangeReason.Add && !c.Current.Key.StartsWith("x")) - { - cacheB.AddOrUpdate(new TestItem("x" + c.Current.Key, c.Current.Value)); - } - } - }); + // Bidirectional: A items flow into B, B items flow into A. + // Filter by prefix prevents infinite feedback. + using var aToB = cacheA.Connect() + .Filter(static x => x.Key.StartsWith('a')) + .Transform(static (item, _) => new TestItem("from-a-" + item.Key, item.Value)) + .PopulateInto(cacheB); - using var subB = cacheB.Connect().Subscribe(changes => - { - foreach (var c in changes) - { - if (c.Reason == ChangeReason.Add && !c.Current.Key.StartsWith("x")) - { - cacheA.AddOrUpdate(new TestItem("x" + c.Current.Key, c.Current.Value)); - } - } - }); + using var bToA = cacheB.Connect() + .Filter(static x => x.Key.StartsWith('b')) + .Transform(static (item, _) => new TestItem("from-b-" + item.Key, item.Value)) + .PopulateInto(cacheA); var barrier = new Barrier(2); @@ -282,9 +271,9 @@ public async Task DirectCrossWriteDoesNotDeadlock() }); var completed = Task.WhenAll(taskA, taskB); - var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(5))); + var finished = await Task.WhenAny(completed, Task.Delay(TimeSpan.FromSeconds(30))); - finished.Should().BeSameAs(completed, $"iteration {iter}: direct cross-cache writes should not deadlock"); + finished.Should().BeSameAs(completed, $"iteration {iter}: bidirectional cross-cache writes should not deadlock"); } } private sealed record TestItem(string Key, string Value); From 8eb5ffb3a091232b65a73733a44788250eef1ebe Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Tue, 7 Apr 2026 11:54:22 -0700 Subject: [PATCH 07/16] Simplify delivery queue; remove pending count logic Refactored DeliveryQueue to eliminate pending item tracking and PendingCount, removing related read-only lock APIs. ObservableCache now ensures new subscribers do not receive in-flight notifications by connecting under the main lock, preventing duplicate deliveries without pending count logic. NotificationItem and delivery logic were simplified to check suspension state at delivery time. Updated tests: removed PendingCount tests and added a test to verify no duplicate notifications during delivery. Improved comments and code clarity. --- .../Cache/SourceCacheFixture.cs | 49 ++++ .../Internal/DeliveryQueueFixture.cs | 85 +------ src/DynamicData/Cache/ObservableCache.cs | 228 ++++++++---------- src/DynamicData/Internal/DeliveryQueue.cs | 123 ++-------- 4 files changed, 177 insertions(+), 308 deletions(-) diff --git a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs index 62ca8be86..ede51d114 100644 --- a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs @@ -1,4 +1,5 @@ using System; +using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; using System.Threading; @@ -276,5 +277,53 @@ public async Task DirectCrossWriteDoesNotDeadlock() finished.Should().BeSameAs(completed, $"iteration {iter}: bidirectional cross-cache writes should not deadlock"); } } + + [Fact] + public void ConnectDuringDeliveryDoesNotDuplicate() + { + // Proves the dequeue-to-delivery gap. A slow subscriber holds delivery + // while another thread connects. The new subscriber's snapshot is taken + // under the lock, so it will not see a duplicate Add. + using var cache = new SourceCache(static x => x.Key); + + var delivering = new ManualResetEventSlim(false); + var connectDone = new ManualResetEventSlim(false); + + // First subscriber: signals when delivery starts, then waits + using var slowSub = cache.Connect().Subscribe(_ => + { + delivering.Set(); + connectDone.Wait(TimeSpan.FromSeconds(5)); + }); + + // Write one item -- delivery starts, slow subscriber blocks + var writeTask = Task.Run(() => cache.AddOrUpdate(new TestItem("k1", "v1"))); + + // Wait for delivery to be in progress + delivering.Wait(TimeSpan.FromSeconds(5)); + + // Now Connect on the main thread while delivery is in progress. + // The item is already committed to ReaderWriter and dequeued from + // the delivery queue, but OnNext hasn't finished iterating observers. + var duplicateKeys = new List(); + using var newSub = cache.Connect().Subscribe(changes => + { + foreach (var c in changes) + { + if (c.Reason == ChangeReason.Add) + { + duplicateKeys.Add(c.Current.Key); + } + } + }); + + // Let delivery finish + connectDone.Set(); + writeTask.Wait(TimeSpan.FromSeconds(5)); + + // Check: k1 should appear exactly once (either in snapshot or stream, not both) + var k1Count = duplicateKeys.Count(k => k == "k1"); + k1Count.Should().Be(1, "k1 should appear exactly once via Connect, not duplicated from snapshot + in-flight delivery"); + } private sealed record TestItem(string Key, string Value); } diff --git a/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs b/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs index 909cd9cf9..276d74526 100644 --- a/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs +++ b/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs @@ -6,9 +6,7 @@ using System.Threading.Tasks; using DynamicData.Internal; - using FluentAssertions; - using Xunit; namespace DynamicData.Tests.Internal; @@ -32,8 +30,6 @@ private static void TriggerDelivery(DeliveryQueue queue) using var notifications = queue.AcquireLock(); } - // Category 1: Basic Behavior - [Fact] public void EnqueueAndDeliverDeliversItem() { @@ -72,8 +68,6 @@ public void DeliverWithEmptyQueueIsNoOp() delivered.Should().BeEmpty(); } - // Category 2: Delivery Token Serialization - [Fact] public async Task OnlyOneDelivererAtATime() { @@ -166,8 +160,6 @@ public void ReentrantEnqueueDoesNotRecurse() maxDepth.Should().Be(1, "delivery callback should not recurse"); } - // Category 3: Exception Safety - [Fact] public void ExceptionInDeliveryResetsDeliveryToken() { @@ -222,8 +214,6 @@ public void RemainingItemsDeliveredAfterExceptionRecovery() delivered.Should().Equal("B"); } - // Category 4: Termination - [Fact] public void TerminalCallbackStopsDelivery() { @@ -269,79 +259,6 @@ public void IsTerminatedIsFalseInitially() queue.IsTerminated.Should().BeFalse(); } - // Category 5: PendingCount - - [Fact] - public void PendingCountTracksAutomatically() - { - var queue = new DeliveryQueue(_gate, _ => true); - - using (var notifications = queue.AcquireLock()) - { - notifications.PendingCount.Should().Be(0); - - notifications.Enqueue("A", countAsPending: true); - notifications.Enqueue("B", countAsPending: true); - notifications.Enqueue("C"); - - notifications.PendingCount.Should().Be(2); - } - - using (var notifications = queue.AcquireLock()) - { - notifications.PendingCount.Should().Be(0, "pending count should auto-decrement on delivery"); - } - } - - [Fact] - public void PendingCountPreservedOnException() - { - var callCount = 0; - var queue = new DeliveryQueue(_gate, _ => - { - if (++callCount == 1) - { - throw new InvalidOperationException("boom"); - } - - return true; - }); - - var act = () => - { - using var notifications = queue.AcquireLock(); - notifications.Enqueue("A", countAsPending: true); - notifications.Enqueue("B", countAsPending: true); - }; - - act.Should().Throw(); - - using (var rl = queue.AcquireReadLock()) - { - rl.PendingCount.Should().Be(1, "only the dequeued item should be decremented"); - } - } - - [Fact] - public void PendingCountClearedOnTermination() - { - var queue = new DeliveryQueue(_gate, item => item != "STOP"); - - using (var notifications = queue.AcquireLock()) - { - notifications.Enqueue("A", countAsPending: true); - notifications.Enqueue("B", countAsPending: true); - notifications.Enqueue("STOP"); - } - - using (var rl = queue.AcquireReadLock()) - { - rl.PendingCount.Should().Be(0); - } - } - - // Category 6: Stress / Thread Safety - [Fact] public async Task ConcurrentEnqueueAllItemsDelivered() { @@ -412,4 +329,4 @@ public async Task ConcurrentEnqueuePreservesPerThreadOrdering() sequences.Should().BeInAscendingOrder($"items from thread {thread} should preserve enqueue order"); } } -} \ No newline at end of file +} diff --git a/src/DynamicData/Cache/ObservableCache.cs b/src/DynamicData/Cache/ObservableCache.cs index 9fcc7dfdc..6568efab1 100644 --- a/src/DynamicData/Cache/ObservableCache.cs +++ b/src/DynamicData/Cache/ObservableCache.cs @@ -59,11 +59,7 @@ public ObservableCache(IObservable> source) if (changes is not null) { - var isSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.AreNotificationsSuspended; - var isCountSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.IsCountSuspended; - notifications.Enqueue( - NotificationItem.CreateChanges(changes, _readerWriter.Count, isSuspended, isCountSuspended), - countAsPending: !isSuspended); + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count)); } }, NotifyError, @@ -92,10 +88,11 @@ public ObservableCache(Func? keySelector = null) Observable.Create( observer => { - using var readLock = _notifications.AcquireReadLock(); - - var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); - return source.SubscribeSafe(observer); + lock (_locker) + { + var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); + return source.SubscribeSafe(observer); + } }); public IReadOnlyList Items => _readerWriter.Items; @@ -189,11 +186,7 @@ internal void UpdateFromIntermediate(Action> update if (changes is not null && _editLevel == 0) { - var isSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.AreNotificationsSuspended; - var isCountSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.IsCountSuspended; - notifications.Enqueue( - NotificationItem.CreateChanges(changes, _readerWriter.Count, isSuspended, isCountSuspended), - countAsPending: !isSuspended); + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count)); } } @@ -220,11 +213,7 @@ internal void UpdateFromSource(Action> updateActio if (changes is not null && _editLevel == 0) { - var isSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.AreNotificationsSuspended; - var isCountSuspended = _suspensionTracker.IsValueCreated && _suspensionTracker.Value.IsCountSuspended; - notifications.Enqueue( - NotificationItem.CreateChanges(changes, _readerWriter.Count, isSuspended, isCountSuspended), - countAsPending: !isSuspended); + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count)); } } @@ -232,54 +221,53 @@ private IObservable> CreateConnectObservable(Func>( observer => { - using var readLock = _notifications.AcquireReadLock(); - - // Skip pending notifications to avoid duplicating items already in the snapshot. - var skipCount = readLock.PendingCount; + // Subject snapshots its observer array before iterating OnNext, so a + // subscriber added here will not receive any in-flight notification. + lock (_locker) + { + var initial = InternalEx.Return(() => (IChangeSet)GetInitialUpdates(predicate)); + var changes = initial.Concat(_changes); - var initial = InternalEx.Return(() => (IChangeSet)GetInitialUpdates(predicate)); - var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes; - var changes = initial.Concat(changesStream); + if (predicate != null) + { + changes = changes.Filter(predicate, suppressEmptyChangeSets); + } + else if (suppressEmptyChangeSets) + { + changes = changes.NotEmpty(); + } - if (predicate != null) - { - changes = changes.Filter(predicate, suppressEmptyChangeSets); - } - else if (suppressEmptyChangeSets) - { - changes = changes.NotEmpty(); + return changes.SubscribeSafe(observer); } - - return changes.SubscribeSafe(observer); }); private IObservable> CreateWatchObservable(TKey key) => Observable.Create>( observer => { - using var readLock = _notifications.AcquireReadLock(); - - var skipCount = readLock.PendingCount; - - var initial = _readerWriter.Lookup(key); - if (initial.HasValue) + // Subject snapshots its observer array before iterating OnNext, so a + // subscriber added here will not receive any in-flight notification. + lock (_locker) { - observer.OnNext(new Change(ChangeReason.Add, key, initial.Value)); - } - - var changesStream = skipCount > 0 ? _changes.Skip(skipCount) : _changes; - return changesStream.Finally(observer.OnCompleted).Subscribe( - changes => + var initial = _readerWriter.Lookup(key); + if (initial.HasValue) { - foreach (var change in changes.ToConcreteType()) + observer.OnNext(new Change(ChangeReason.Add, key, initial.Value)); + } + + return _changes.Finally(observer.OnCompleted).Subscribe( + changes => { - var match = EqualityComparer.Default.Equals(change.Key, key); - if (match) + foreach (var change in changes.ToConcreteType()) { - observer.OnNext(change); + var match = EqualityComparer.Default.Equals(change.Key, key); + if (match) + { + observer.OnNext(change); + } } - } - }); + }); + } }); /// @@ -307,6 +295,16 @@ private void NotifyError(Exception ex) notifications.Enqueue(NotificationItem.CreateError(ex)); } + /// + /// Delivers a single notification to subscribers. This method is the delivery + /// callback for and must never be called directly. + /// It is invoked by the after releasing the + /// lock, which guarantees that no lock is held when subscriber code runs. The + /// queue's single-deliverer token ensures this method is never called concurrently, + /// preserving the Rx serialization contract across all subjects. + /// Returns true to continue delivery, or false for terminal items (OnCompleted/OnError) + /// which causes the queue to self-terminate. + /// private bool DeliverNotification(NotificationItem item) { switch (item.Kind) @@ -327,6 +325,7 @@ private bool DeliverNotification(NotificationItem item) _suspensionTracker.Value.Dispose(); } } + return false; case NotificationKind.Error: @@ -345,52 +344,53 @@ private bool DeliverNotification(NotificationItem item) _suspensionTracker.Value.Dispose(); } } + return false; case NotificationKind.CountOnly: - if (_countChanged.IsValueCreated) - { - _countChanged.Value.OnNext(item.Count); - } - + EmitCount(item.Count); return true; default: - if (!item.IsSuspended) - { - _changes.OnNext(item.Changes); - } - else - { - bool deliverNow; - lock (_locker) - { - if (_suspensionTracker.Value.AreNotificationsSuspended) - { - _suspensionTracker.Value.EnqueueChanges(item.Changes); - deliverNow = false; - } - else - { - deliverNow = true; - } - } + EmitChanges(item.Changes); + EmitCount(item.Count); + return true; + } - if (deliverNow) + void EmitChanges(ChangeSet changes) + { + if (_suspensionTracker.IsValueCreated) + { + lock (_locker) + { + if (_suspensionTracker.Value.AreNotificationsSuspended) { - _changes.OnNext(item.Changes); + _suspensionTracker.Value.EnqueueChanges(changes); + return; } } + } + + _changes.OnNext(changes); + } - if (!item.IsCountSuspended) + void EmitCount(int count) + { + if (_suspensionTracker.IsValueCreated) + { + lock (_locker) { - if (_countChanged.IsValueCreated) + if (_suspensionTracker.Value.IsCountSuspended) { - _countChanged.Value.OnNext(item.Count); + return; } } + } - return true; + if (_countChanged.IsValueCreated) + { + _countChanged.Value.OnNext(count); + } } } @@ -407,21 +407,25 @@ private void ResumeCount() private void ResumeNotifications() { - using var notifications = _notifications.AcquireLock(); - Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Notifications without Suspend Notifications instance"); - - var (resumedChanges, emitResume) = _suspensionTracker.Value.ResumeNotifications(); - if (resumedChanges is not null) + using (var notifications = _notifications.AcquireLock()) { - notifications.Enqueue( - NotificationItem.CreateChanges(resumedChanges, _readerWriter.Count, isSuspended: false, isCountSuspended: false), - countAsPending: true); - } + Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Notifications without Suspend Notifications instance"); - if (emitResume) - { - _suspensionTracker.Value.EmitResumeNotification(); + var (changes, emitResume) = _suspensionTracker.Value.ResumeNotifications(); + if (changes is not null) + { + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count)); + } + + if (!emitResume) + { + return; + } } + + // Emit the resume signal after releasing the lock so that deferred + // Connect/Watch subscribers are activated outside the lock scope. + _suspensionTracker.Value.EmitResumeNotification(); } private enum NotificationKind @@ -432,41 +436,19 @@ private enum NotificationKind Error, } - private readonly record struct NotificationItem + private readonly record struct NotificationItem(NotificationKind Kind, ChangeSet Changes, int Count = 0, Exception? Error = null) { - public NotificationKind Kind { get; } - - public ChangeSet Changes { get; } - - public int Count { get; } - - public bool IsSuspended { get; } - - public bool IsCountSuspended { get; } - - public Exception? Error { get; } - - private NotificationItem(NotificationKind kind, ChangeSet changes, int count = 0, bool isSuspended = false, bool isCountSuspended = false, Exception? error = null) - { - Kind = kind; - Changes = changes; - Count = count; - IsSuspended = isSuspended; - IsCountSuspended = isCountSuspended; - Error = error; - } - - public static NotificationItem CreateChanges(ChangeSet changes, int count, bool isSuspended, bool isCountSuspended) => - new(NotificationKind.Changes, changes, count, isSuspended, isCountSuspended); + public static NotificationItem CreateChanges(ChangeSet changes, int count) => + new(NotificationKind.Changes, changes, count); public static NotificationItem CreateCountOnly(int count) => - new(NotificationKind.CountOnly, [], count: count); + new(NotificationKind.CountOnly, [], count); public static NotificationItem CreateCompleted() => new(NotificationKind.Completed, []); public static NotificationItem CreateError(Exception error) => - new(NotificationKind.Error, [], error: error); + new(NotificationKind.Error, [], Error: error); } private sealed class SuspensionTracker : IDisposable @@ -519,7 +501,7 @@ public void SuspendNotifications() changes = new ChangeSet(changesToDeliver); } - return (changes, _notifySuspendCount == 0); + return (changes, true); } return (null, false); diff --git a/src/DynamicData/Internal/DeliveryQueue.cs b/src/DynamicData/Internal/DeliveryQueue.cs index cca5e30a5..6aa65421c 100644 --- a/src/DynamicData/Internal/DeliveryQueue.cs +++ b/src/DynamicData/Internal/DeliveryQueue.cs @@ -6,14 +6,14 @@ namespace DynamicData.Internal; /// /// A queue that serializes item delivery outside a caller-owned lock. -/// Use to obtain a scoped ScopedAccess for enqueueing items -/// and reading queue state. When the ScopedAccess is disposed, the lock is released +/// Use to obtain a scoped ScopedAccess for enqueueing items. +/// When the ScopedAccess is disposed, the lock is released /// and pending items are delivered. Only one thread delivers at a time. /// /// The item type. internal sealed class DeliveryQueue { - private readonly Queue<(TItem Item, bool CountAsPending)> _queue = new(); + private readonly Queue _queue = new(); private readonly Func _deliver; #if NET9_0_OR_GREATER @@ -24,7 +24,6 @@ internal sealed class DeliveryQueue private bool _isDelivering; private volatile bool _isTerminated; - private int _pendingCount; /// /// Initializes a new instance of the class. @@ -48,57 +47,31 @@ public DeliveryQueue(object gate, Func deliver) public bool IsTerminated => _isTerminated; /// - /// Gets the number of pending items enqueued with countAsPending: true. - /// Must be read while the caller holds the gate. - /// - private int PendingCount => _pendingCount; - - /// - /// Acquires the gate and returns a scoped ScopedAccess for enqueueing items and - /// reading queue state. When the ScopedAccess is disposed, the gate is released + /// Acquires the gate and returns a scoped ScopedAccess for enqueueing items. + /// When the ScopedAccess is disposed, the gate is released /// and delivery runs if needed. The ScopedAccess is a ref struct and cannot /// escape the calling method. /// public ScopedAccess AcquireLock() => new(this); - /// - /// Acquires the gate for read-only access and returns a scoped handle. - /// Provides access to queue state (e.g., ) but - /// cannot enqueue items and does not trigger delivery on dispose. - /// - public ReadOnlyScopedAccess AcquireReadLock() => new(this); - - private void EnterLock() - { #if NET9_0_OR_GREATER - _gate.Enter(); -#else - Monitor.Enter(_gate); -#endif - } + private void EnterLock() => _gate.Enter(); - private void ExitLock() - { -#if NET9_0_OR_GREATER - _gate.Exit(); + private void ExitLock() => _gate.Exit(); #else - Monitor.Exit(_gate); + private void EnterLock() => Monitor.Enter(_gate); + + private void ExitLock() => Monitor.Exit(_gate); #endif - } - private void EnqueueItem(TItem item, bool countAsPending) + private void EnqueueItem(TItem item) { if (_isTerminated) { return; } - _queue.Enqueue((item, countAsPending)); - - if (countAsPending) - { - _pendingCount++; - } + _queue.Enqueue(item); } private void ExitLockAndDeliver() @@ -147,24 +120,18 @@ void DeliverAll() return; } - var entry = _queue.Dequeue(); - item = entry.Item; - - if (entry.CountAsPending) - { - _pendingCount--; - } + item = _queue.Dequeue(); } - // Now the lock is release, we can deliver the item - // If delivery returns false, it means the item was terminal and we should stop delivering and clear the queue. + // Outside of the lock, invoke the callback to deliver the item. + // If delivery returns false, it means the item was terminal + // and we should stop delivering and clear the queue. if (!_deliver(item)) { lock (_gate) { _isTerminated = true; _isDelivering = false; - _pendingCount = 0; _queue.Clear(); } @@ -172,22 +139,21 @@ void DeliverAll() } } } - catch + finally { - // If anything bad happens, we must release the flag so that deliveries aren't stuck + // Safety net: if an exception bypassed the normal exit paths, + // ensure _isDelivering is reset so the queue doesn't get stuck. lock (_gate) { _isDelivering = false; } - - throw; } } } /// - /// A scoped ScopedAccess for working under the gate lock. All queue mutation and - /// state reads go through this ScopedAccess, ensuring the lock is held. Disposing + /// A scoped ScopedAccess for working under the gate lock. All queue mutation + /// goes through this ScopedAccess, ensuring the lock is held. Disposing /// releases the lock and triggers delivery if needed. /// public ref struct ScopedAccess @@ -200,20 +166,11 @@ internal ScopedAccess(DeliveryQueue owner) owner.EnterLock(); } - /// - /// Gets the number of pending items that were enqueued with - /// countAsPending: true and have not yet been dequeued for delivery. - /// - public readonly int PendingCount => _owner?._pendingCount ?? 0; - /// /// Adds an item to the queue. Ignored if the queue has been terminated. /// /// The item to enqueue. - /// True if this item should be tracked by - /// . The count is automatically decremented - /// when the item is dequeued for delivery. - public readonly void Enqueue(TItem item, bool countAsPending = false) => _owner?.EnqueueItem(item, countAsPending); + public readonly void Enqueue(TItem item) => _owner?.EnqueueItem(item); /// /// Releases the gate lock and delivers pending items if this thread @@ -231,40 +188,4 @@ public void Dispose() owner.ExitLockAndDeliver(); } } - - /// - /// A read-only scoped handle for reading queue state under the gate lock. - /// Cannot enqueue items and does not trigger delivery on dispose. - /// - public ref struct ReadOnlyScopedAccess - { - private DeliveryQueue? _owner; - - internal ReadOnlyScopedAccess(DeliveryQueue owner) - { - _owner = owner; - owner.EnterLock(); - } - - /// - /// Gets the number of pending items that were enqueued with - /// countAsPending: true and have not yet been dequeued for delivery. - /// - public readonly int PendingCount => _owner?._pendingCount ?? 0; - - /// - /// Releases the gate lock. Does not trigger delivery. - /// - public void Dispose() - { - var owner = _owner; - if (owner is null) - { - return; - } - - _owner = null; - owner.ExitLock(); - } - } } From 97b721af583d8ebfe640debe55c40c739ae1b7fa Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Tue, 7 Apr 2026 12:06:21 -0700 Subject: [PATCH 08/16] Support .NET 9+ locking in SwappableLock's SwapTo method Add conditional logic for .NET 9.0+ in SwappableLock to handle both _gate and _lockGate fields. SwapTo now checks both fields for initialization and releases the appropriate lock type, ensuring compatibility with new locking mechanisms while preserving legacy behavior. --- src/DynamicData/Internal/SwappableLock.cs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/src/DynamicData/Internal/SwappableLock.cs b/src/DynamicData/Internal/SwappableLock.cs index 0176505ee..5f1f47599 100644 --- a/src/DynamicData/Internal/SwappableLock.cs +++ b/src/DynamicData/Internal/SwappableLock.cs @@ -28,14 +28,29 @@ public static SwappableLock CreateAndEnter(Lock gate) public void SwapTo(object gate) { +#if NET9_0_OR_GREATER + if (_gate is null && _lockGate is null) + throw new InvalidOperationException("Lock is not initialized"); +#else if (_gate is null) throw new InvalidOperationException("Lock is not initialized"); +#endif var hasNewLock = false; Monitor.Enter(gate, ref hasNewLock); +#if NET9_0_OR_GREATER + if (_lockGate is not null) + { + _lockGate.Exit(); + _lockGate = null; + } + else +#endif if (_hasLock) - Monitor.Exit(_gate); + { + Monitor.Exit(_gate!); + } _hasLock = hasNewLock; _gate = gate; From 7625ac26d642c3b44a4bee68f1810d7948637e3f Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Tue, 7 Apr 2026 12:29:40 -0700 Subject: [PATCH 09/16] Use |= to accumulate expiration changes correctly Previously, haveExpirationsChanged was overwritten by each call to TrySetExpiration, potentially losing information about prior changes. Now, the |= operator is used to ensure haveExpirationsChanged remains true if any expiration update occurs, preserving the correct state across multiple updates. --- src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs b/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs index e97b9861f..60bcd9de2 100644 --- a/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs +++ b/src/DynamicData/Cache/Internal/ExpireAfter.ForSource.cs @@ -280,7 +280,7 @@ private void OnSourceNext(IChangeSet changes) { if (_timeSelector.Invoke(change.Current) is { } expireAfter) { - haveExpirationsChanged = TrySetExpiration( + haveExpirationsChanged |= TrySetExpiration( key: change.Key, dueTime: now + expireAfter); } From 5e4ad0e132aa0a6572e72a66e022af610a14f94b Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Tue, 7 Apr 2026 12:29:58 -0700 Subject: [PATCH 10/16] Refactor DeliveryQueue exception handling logic Moved _isDelivering reset from finally to catch block in DeliveryQueue. Now, the flag is only reset when an exception occurs, and the exception is rethrown, making the error handling more explicit and preventing unnecessary state changes during normal execution. --- src/DynamicData/Internal/DeliveryQueue.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/DynamicData/Internal/DeliveryQueue.cs b/src/DynamicData/Internal/DeliveryQueue.cs index 6aa65421c..3110cdc6e 100644 --- a/src/DynamicData/Internal/DeliveryQueue.cs +++ b/src/DynamicData/Internal/DeliveryQueue.cs @@ -139,7 +139,7 @@ void DeliverAll() } } } - finally + catch { // Safety net: if an exception bypassed the normal exit paths, // ensure _isDelivering is reset so the queue doesn't get stuck. @@ -147,6 +147,8 @@ void DeliverAll() { _isDelivering = false; } + + throw; } } } From a92b59636be65b82a6a7afb41a3f570c1622843b Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Tue, 7 Apr 2026 15:44:23 -0700 Subject: [PATCH 11/16] Fix MergeMany stress test timing for queue-based delivery The MultiThreadedStressTest asserts immediately after stress observables complete, but with drain-outside-lock delivery, Edit() returns after enqueueing while delivery may still be in-flight on another thread. Add a short delay before checking results to allow in-flight deliveries to complete. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs index 33db06f05..6001f076a 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs @@ -118,6 +118,11 @@ IObservable AddRemovePrices(Market market, int priceCount, int para } while (adding); + // Allow any in-flight notification deliveries to complete before checking results. + // With the queue-based drain pattern, Edit() returns after enqueueing but delivery + // may still be in progress on another thread. Give the drain a moment to finish. + await Task.Delay(250).ConfigureAwait(false); + // Verify the results CheckResultContents(_marketCacheResults, priceResults, Market.RatingCompare); } From 92f1afe9f8c3a6a37f062dc52bc0e128cb5c7a54 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Tue, 7 Apr 2026 23:09:45 -0700 Subject: [PATCH 12/16] Update src/DynamicData.Tests/Cache/SourceCacheFixture.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/DynamicData.Tests/Cache/SourceCacheFixture.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs index ede51d114..84c703ca8 100644 --- a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs @@ -251,7 +251,7 @@ public async Task DirectCrossWriteDoesNotDeadlock() .Transform(static (item, _) => new TestItem("from-b-" + item.Key, item.Value)) .PopulateInto(cacheA); - var barrier = new Barrier(2); + using var barrier = new Barrier(2); var taskA = Task.Run(() => { From 7544f8c13398fe23cc08f63598a5bd2f64d8a6a5 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Tue, 7 Apr 2026 23:49:35 -0700 Subject: [PATCH 13/16] Improve test reliability and ObservableCache disposal safety - Publish and explicitly connect merged observable in test, and await completion of all notifications for robust result verification. - Move _suspensionTracker disposal outside lock in ObservableCache to prevent deadlocks and reentrancy issues. - Add System.Reactive.Threading.Tasks import for ToTask() usage. --- .../MergeManyChangeSetsCacheSourceCompareFixture.cs | 11 ++++++----- src/DynamicData/Cache/ObservableCache.cs | 13 +++++-------- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs index 6001f076a..ce6cc89fd 100644 --- a/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs +++ b/src/DynamicData.Tests/Cache/MergeManyChangeSetsCacheSourceCompareFixture.cs @@ -5,6 +5,7 @@ using System.Reactive.Disposables; using System.Reactive; using System.Reactive.Linq; +using System.Reactive.Threading.Tasks; using System.Threading.Tasks; using Bogus; using DynamicData.Kernel; @@ -90,9 +91,11 @@ IObservable AddRemovePrices(Market market, int priceCount, int para .Parallelize(priceCount, parallel, obs => obs.StressAddRemove(market.PricesCache, _ => GetRemoveTime(), scheduler)) .Finally(market.PricesCache.Dispose); - var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare, resortOnSourceRefresh: true); + var merged = _marketCache.Connect().MergeManyChangeSets(market => market.LatestPrices, Market.RatingCompare, resortOnSourceRefresh: true).Publish(); var adding = true; + var cacheCompleted = merged.LastOrDefaultAsync().ToTask(); using var priceResults = merged.AsAggregator(); + using var connect = merged.Connect(); // Start asynchrononously modifying the parent list and the child lists using var addingSub = AddRemoveStress(marketCount, priceCount, Environment.ProcessorCount, TaskPoolScheduler.Default) @@ -118,10 +121,8 @@ IObservable AddRemovePrices(Market market, int priceCount, int para } while (adding); - // Allow any in-flight notification deliveries to complete before checking results. - // With the queue-based drain pattern, Edit() returns after enqueueing but delivery - // may still be in progress on another thread. Give the drain a moment to finish. - await Task.Delay(250).ConfigureAwait(false); + // Wait for the source cache to finish delivering all notifications. + await cacheCompleted; // Verify the results CheckResultContents(_marketCacheResults, priceResults, Market.RatingCompare); diff --git a/src/DynamicData/Cache/ObservableCache.cs b/src/DynamicData/Cache/ObservableCache.cs index 6568efab1..c1b48658c 100644 --- a/src/DynamicData/Cache/ObservableCache.cs +++ b/src/DynamicData/Cache/ObservableCache.cs @@ -318,12 +318,11 @@ private bool DeliverNotification(NotificationItem item) _countChanged.Value.OnCompleted(); } + // Dispose outside lock — BehaviorSubject.OnCompleted runs observers + // synchronously which could execute subscriber code under the lock. if (_suspensionTracker.IsValueCreated) { - lock (_locker) - { - _suspensionTracker.Value.Dispose(); - } + _suspensionTracker.Value.Dispose(); } return false; @@ -337,12 +336,10 @@ private bool DeliverNotification(NotificationItem item) _countChanged.Value.OnError(item.Error!); } + // Dispose outside lock — same reasoning as Completed path above. if (_suspensionTracker.IsValueCreated) { - lock (_locker) - { - _suspensionTracker.Value.Dispose(); - } + _suspensionTracker.Value.Dispose(); } return false; From 836d8f3206ca9002d37e1f8faecacf2f503f3929 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Wed, 8 Apr 2026 10:13:42 -0700 Subject: [PATCH 14/16] Prevent duplicate notifications on Connect during delivery Fix race where new subscribers could see duplicate Add notifications if they connect while in-flight changes are being delivered. Introduce a versioning mechanism in ObservableCache to track committed and delivered notifications, and skip already-delivered changes for new subscribers. Extend NotificationItem with a version field and add read-only lock support in DeliveryQueue. Update test to reliably reproduce and verify the fix. --- .../Cache/SourceCacheFixture.cs | 71 ++++++++---- src/DynamicData/Cache/ObservableCache.cs | 102 ++++++++++-------- src/DynamicData/Internal/DeliveryQueue.cs | 45 ++++++++ 3 files changed, 154 insertions(+), 64 deletions(-) diff --git a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs index 84c703ca8..43bd48378 100644 --- a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs @@ -281,49 +281,78 @@ public async Task DirectCrossWriteDoesNotDeadlock() [Fact] public void ConnectDuringDeliveryDoesNotDuplicate() { - // Proves the dequeue-to-delivery gap. A slow subscriber holds delivery - // while another thread connects. The new subscriber's snapshot is taken - // under the lock, so it will not see a duplicate Add. + // Exploits the dequeue-to-OnNext window. Thread A writes two items in + // separate batches. The first delivery is held by a slow subscriber. + // While item1 delivery is blocked, item2 is committed to ReaderWriter + // and sitting in the queue. Thread B calls Connect(), takes a snapshot + // (sees both items), subscribes to _changes, then item2 is delivered + // via OnNext — producing a duplicate if not guarded by a generation counter. using var cache = new SourceCache(static x => x.Key); - var delivering = new ManualResetEventSlim(false); - var connectDone = new ManualResetEventSlim(false); + using var delivering = new ManualResetEventSlim(false); + using var item2Written = new ManualResetEventSlim(false); + using var connectDone = new ManualResetEventSlim(false); - // First subscriber: signals when delivery starts, then waits + var firstDelivery = true; + + // First subscriber: blocks on the first delivery to create the window using var slowSub = cache.Connect().Subscribe(_ => { - delivering.Set(); - connectDone.Wait(TimeSpan.FromSeconds(5)); + if (firstDelivery) + { + firstDelivery = false; + delivering.Set(); + + // Wait until item2 has been written and the Connect has subscribed + connectDone.Wait(TimeSpan.FromSeconds(5)); + } + }); + + // Write item1 on a background thread — delivery starts, slow subscriber blocks + var writeTask = Task.Run(() => + { + cache.AddOrUpdate(new TestItem("k1", "v1")); }); - // Write one item -- delivery starts, slow subscriber blocks - var writeTask = Task.Run(() => cache.AddOrUpdate(new TestItem("k1", "v1"))); + // Wait for delivery of item1 to be in progress (slow sub is blocking) + delivering.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("delivery should have started"); - // Wait for delivery to be in progress - delivering.Wait(TimeSpan.FromSeconds(5)); + // Now write item2 on another thread. It will acquire the lock, commit to + // ReaderWriter, enqueue a notification, and return. The notification sits + // in the queue because the deliverer (Thread A) is blocked by the slow sub. + var writeTask2 = Task.Run(() => + { + cache.AddOrUpdate(new TestItem("k2", "v2")); + item2Written.Set(); + }); + item2Written.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("item2 should have been written"); - // Now Connect on the main thread while delivery is in progress. - // The item is already committed to ReaderWriter and dequeued from - // the delivery queue, but OnNext hasn't finished iterating observers. - var duplicateKeys = new List(); + // Now Connect on the main thread. The snapshot from ReaderWriter includes + // BOTH k1 and k2. The subscription to _changes is added. When the slow + // subscriber unblocks, item2's notification will be delivered via OnNext + // and the new subscriber will see k2 again — a duplicate Add. + var addCounts = new Dictionary(); using var newSub = cache.Connect().Subscribe(changes => { foreach (var c in changes) { if (c.Reason == ChangeReason.Add) { - duplicateKeys.Add(c.Current.Key); + var key = c.Current.Key; + addCounts[key] = addCounts.GetValueOrDefault(key) + 1; } } }); - // Let delivery finish + // Unblock the slow subscriber — delivery resumes, item2 delivered connectDone.Set(); writeTask.Wait(TimeSpan.FromSeconds(5)); + writeTask2.Wait(TimeSpan.FromSeconds(5)); - // Check: k1 should appear exactly once (either in snapshot or stream, not both) - var k1Count = duplicateKeys.Count(k => k == "k1"); - k1Count.Should().Be(1, "k1 should appear exactly once via Connect, not duplicated from snapshot + in-flight delivery"); + // Each key should appear exactly once in the new subscriber's view + addCounts.GetValueOrDefault("k1").Should().Be(1, "k1 should appear once (snapshot only)"); + addCounts.GetValueOrDefault("k2").Should().Be(1, "k2 should appear once, not duplicated from snapshot + queued delivery"); } + private sealed record TestItem(string Key, string Value); } diff --git a/src/DynamicData/Cache/ObservableCache.cs b/src/DynamicData/Cache/ObservableCache.cs index c1b48658c..669c07dbf 100644 --- a/src/DynamicData/Cache/ObservableCache.cs +++ b/src/DynamicData/Cache/ObservableCache.cs @@ -6,6 +6,7 @@ using System.Reactive.Disposables; using System.Reactive.Linq; using System.Reactive.Subjects; +using System.Threading; using DynamicData.Binding; using DynamicData.Cache; using DynamicData.Cache.Internal; @@ -43,6 +44,10 @@ internal sealed class ObservableCache : IObservableCache> source) { _readerWriter = new ReaderWriter(); @@ -59,7 +64,7 @@ public ObservableCache(IObservable> source) if (changes is not null) { - notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count)); + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count, ++_currentVersion)); } }, NotifyError, @@ -186,7 +191,7 @@ internal void UpdateFromIntermediate(Action> update if (changes is not null && _editLevel == 0) { - notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count)); + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count, ++_currentVersion)); } } @@ -213,7 +218,7 @@ internal void UpdateFromSource(Action> updateActio if (changes is not null && _editLevel == 0) { - notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count)); + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count, ++_currentVersion)); } } @@ -221,53 +226,64 @@ private IObservable> CreateConnectObservable(Func>( observer => { - // Subject snapshots its observer array before iterating OnNext, so a - // subscriber added here will not receive any in-flight notification. - lock (_locker) - { - var initial = InternalEx.Return(() => (IChangeSet)GetInitialUpdates(predicate)); - var changes = initial.Concat(_changes); + using var readLock = _notifications.AcquireReadLock(); - if (predicate != null) - { - changes = changes.Filter(predicate, suppressEmptyChangeSets); - } - else if (suppressEmptyChangeSets) - { - changes = changes.NotEmpty(); - } + var initial = InternalEx.Return(() => (IChangeSet)GetInitialUpdates(predicate)); + + // The current snapshot may contain changes that have been made but the notifications + // have yet to be delivered. We need to filter those out to avoid delivering an update + // that has already been applied (but detect this possiblity and skip filtering unless absolutely needed) + var snapshotVersion = _currentVersion; + var changes = readLock.HasPending + ? _changes.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion) + : (IObservable>)_changes; - return changes.SubscribeSafe(observer); + changes = initial.Concat(changes); + + if (predicate != null) + { + changes = changes.Filter(predicate, suppressEmptyChangeSets); } + else if (suppressEmptyChangeSets) + { + changes = changes.NotEmpty(); + } + + return changes.SubscribeSafe(observer); }); private IObservable> CreateWatchObservable(TKey key) => Observable.Create>( observer => { - // Subject snapshots its observer array before iterating OnNext, so a - // subscriber added here will not receive any in-flight notification. - lock (_locker) + using var readLock = _notifications.AcquireReadLock(); + + var initial = _readerWriter.Lookup(key); + if (initial.HasValue) { - var initial = _readerWriter.Lookup(key); - if (initial.HasValue) - { - observer.OnNext(new Change(ChangeReason.Add, key, initial.Value)); - } + observer.OnNext(new Change(ChangeReason.Add, key, initial.Value)); + } + + // The current snapshot may contain changes that have been made but the notifications + // have yet to be delivered. We need to filter those out to avoid delivering an update + // that has already been applied (but detect this possiblity and skip filtering unless absolutely needed) + var snapshotVersion = _currentVersion; + var changes = readLock.HasPending + ? _changes.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion) + : _changes; - return _changes.Finally(observer.OnCompleted).Subscribe( - changes => + return changes.Finally(observer.OnCompleted).Subscribe( + changes => + { + foreach (var change in changes) { - foreach (var change in changes.ToConcreteType()) + var match = EqualityComparer.Default.Equals(change.Key, key); + if (match) { - var match = EqualityComparer.Default.Equals(change.Key, key); - if (match) - { - observer.OnNext(change); - } + observer.OnNext(change); } - }); - } + } + }); }); /// @@ -318,8 +334,7 @@ private bool DeliverNotification(NotificationItem item) _countChanged.Value.OnCompleted(); } - // Dispose outside lock — BehaviorSubject.OnCompleted runs observers - // synchronously which could execute subscriber code under the lock. + // Dispose outside lock because it fires OnCompleted if (_suspensionTracker.IsValueCreated) { _suspensionTracker.Value.Dispose(); @@ -336,7 +351,7 @@ private bool DeliverNotification(NotificationItem item) _countChanged.Value.OnError(item.Error!); } - // Dispose outside lock — same reasoning as Completed path above. + // Dispose outside lock because it fires OnCompleted if (_suspensionTracker.IsValueCreated) { _suspensionTracker.Value.Dispose(); @@ -349,6 +364,7 @@ private bool DeliverNotification(NotificationItem item) return true; default: + Volatile.Write(ref _currentDeliveryVersion, item.Version); EmitChanges(item.Changes); EmitCount(item.Count); return true; @@ -411,7 +427,7 @@ private void ResumeNotifications() var (changes, emitResume) = _suspensionTracker.Value.ResumeNotifications(); if (changes is not null) { - notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count)); + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count, ++_currentVersion)); } if (!emitResume) @@ -433,10 +449,10 @@ private enum NotificationKind Error, } - private readonly record struct NotificationItem(NotificationKind Kind, ChangeSet Changes, int Count = 0, Exception? Error = null) + private readonly record struct NotificationItem(NotificationKind Kind, ChangeSet Changes, int Count = 0, long Version = 0, Exception? Error = null) { - public static NotificationItem CreateChanges(ChangeSet changes, int count) => - new(NotificationKind.Changes, changes, count); + public static NotificationItem CreateChanges(ChangeSet changes, int count, long version) => + new(NotificationKind.Changes, changes, count, version); public static NotificationItem CreateCountOnly(int count) => new(NotificationKind.CountOnly, [], count); diff --git a/src/DynamicData/Internal/DeliveryQueue.cs b/src/DynamicData/Internal/DeliveryQueue.cs index 3110cdc6e..284e6c576 100644 --- a/src/DynamicData/Internal/DeliveryQueue.cs +++ b/src/DynamicData/Internal/DeliveryQueue.cs @@ -54,6 +54,13 @@ public DeliveryQueue(object gate, Func deliver) /// public ScopedAccess AcquireLock() => new(this); + /// + /// Acquires the gate and returns a read-only scoped access for inspecting + /// queue state. No mutation is possible and disposing does not trigger + /// delivery — the lock is simply released. + /// + public ReadOnlyScopedAccess AcquireReadLock() => new(this); + #if NET9_0_OR_GREATER private void EnterLock() => _gate.Enter(); @@ -190,4 +197,42 @@ public void Dispose() owner.ExitLockAndDeliver(); } } + + /// + /// A read-only scoped access for inspecting queue state under the gate lock. + /// No mutation is possible. Disposing releases the lock without triggering + /// delivery. + /// + public ref struct ReadOnlyScopedAccess + { + private DeliveryQueue? _owner; + + internal ReadOnlyScopedAccess(DeliveryQueue owner) + { + _owner = owner; + owner.EnterLock(); + } + + /// + /// Gets whether there are notifications pending delivery (queued or + /// currently being delivered outside the lock). + /// + public readonly bool HasPending => + _owner is not null && (_owner._queue.Count > 0 || _owner._isDelivering); + + /// + /// Releases the gate lock. Does not trigger delivery. + /// + public void Dispose() + { + var owner = _owner; + if (owner is null) + { + return; + } + + _owner = null; + owner.ExitLock(); + } + } } From b5c7862f04443e05a08293098d0fe1cad79c1f5a Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Wed, 8 Apr 2026 12:40:50 -0700 Subject: [PATCH 15/16] Improve suspend/resume notification handling and tests Add comprehensive tests for nested and concurrent suspend/resume scenarios in SuspendNotificationsFixture. Emit resume signals under lock in ObservableCache to prevent race conditions and ensure consistent notification delivery. These changes enhance reliability and determinism of notification delivery under complex and concurrent usage patterns. --- .../Cache/SuspendNotificationsFixture.cs | 229 ++++++++++++++++++ src/DynamicData/Cache/ObservableCache.cs | 27 +-- 2 files changed, 242 insertions(+), 14 deletions(-) diff --git a/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs b/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs index e73de850e..0911cbaa4 100644 --- a/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs +++ b/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Linq; using System.Reactive.Linq; +using System.Threading; using System.Threading.Tasks; using DynamicData.Kernel; using FluentAssertions; @@ -352,6 +353,234 @@ public async Task SuspensionsAreThreadSafe() _results.Messages[0].Adds.Should().Be(100, "Should have 100 adds"); } + [Fact] + public void ResumeThenReSuspendDeliversFirstBatchOnly() + { + // Forces the ordering: resume completes before re-suspend. + // The deferred subscriber activates with the first batch snapshot, + // then re-suspend holds the second batch until final resume. + using var cache = new SourceCache(static x => x); + var dataSet1 = Enumerable.Range(0, 100).ToList(); + var dataSet2 = Enumerable.Range(1000, 100).ToList(); + var allData = dataSet1.Concat(dataSet2).ToList(); + + var suspend1 = cache.SuspendNotifications(); + cache.AddOrUpdate(dataSet1); + + using var results = cache.Connect().AsAggregator(); + results.Messages.Count.Should().Be(0, "no messages during suspension"); + + // Resume first — subscriber activates + suspend1.Dispose(); + + results.Messages.Count.Should().Be(1, "exactly one message after resume"); + results.Messages[0].Adds.Should().Be(dataSet1.Count, $"snapshot should have {dataSet1.Count} adds"); + results.Messages[0].Removes.Should().Be(0, "no removes"); + results.Messages[0].Updates.Should().Be(0, "no updates"); + results.Messages[0].Select(x => x.Key).Should().Equal(dataSet1, "snapshot should contain first batch keys"); + + // Re-suspend, write second batch + var suspend2 = cache.SuspendNotifications(); + cache.AddOrUpdate(dataSet2); + + results.Messages.Count.Should().Be(1, "still one message — second batch held by suspension"); + results.Summary.Overall.Adds.Should().Be(dataSet1.Count, $"still {dataSet1.Count} adds total"); + + // Final resume + suspend2.Dispose(); + + results.Messages.Count.Should().Be(2, "two messages total"); + results.Messages[1].Adds.Should().Be(dataSet2.Count, $"second message has {dataSet2.Count} adds"); + results.Messages[1].Removes.Should().Be(0, "no removes in second message"); + results.Messages[1].Updates.Should().Be(0, "no updates in second message"); + results.Messages[1].Select(x => x.Key).Should().Equal(dataSet2, "snapshot should contain first batch keys"); + + results.Summary.Overall.Adds.Should().Be(allData.Count, $"exactly {allData.Count} adds total"); + results.Summary.Overall.Removes.Should().Be(0, "no removes"); + results.Data.Count.Should().Be(allData.Count, $"{allData.Count} items in final state"); + results.Error.Should().BeNull(); + results.IsCompleted.Should().BeFalse(); + } + + [Fact] + public void ReSuspendThenResumeDeliversAllInSingleBatch() + { + // Forces the ordering: re-suspend before resume. + // Suspend count goes 1→2→1, no resume signal fires. + // Both batches accumulate and arrive as a single changeset on final resume. + using var cache = new SourceCache(static x => x); + var dataSet1 = Enumerable.Range(0, 100).ToList(); + var dataSet2 = Enumerable.Range(1000, 100).ToList(); + var allData = dataSet1.Concat(dataSet2).ToList(); + + var suspend1 = cache.SuspendNotifications(); + cache.AddOrUpdate(dataSet1); + + using var results = cache.Connect().AsAggregator(); + results.Messages.Count.Should().Be(0, "no messages during suspension"); + + // Re-suspend first — count goes 1→2 + var suspend2 = cache.SuspendNotifications(); + + // Resume first suspend — count goes 2→1, still suspended + suspend1.Dispose(); + + results.Messages.Count.Should().Be(0, "no messages — still suspended (count=1)"); + results.Summary.Overall.Adds.Should().Be(0, "no adds — still suspended"); + + // Write second batch while still suspended + cache.AddOrUpdate(dataSet2); + + results.Messages.Count.Should().Be(0, "still no messages"); + + // Final resume — count goes 1→0 + suspend2.Dispose(); + + results.Messages.Count.Should().Be(1, "single message with all data"); + results.Messages[0].Adds.Should().Be(allData.Count, $"all {allData.Count} items in one changeset"); + results.Messages[0].Removes.Should().Be(0, "no removes"); + results.Messages[0].Updates.Should().Be(0, "no updates"); + results.Messages[0].Select(c => c.Key).OrderBy(k => k).Should().Equal(allData, "should contain both batches in order"); + + results.Summary.Overall.Adds.Should().Be(allData.Count, $"exactly {allData.Count} adds total"); + results.Summary.Overall.Removes.Should().Be(0, "no removes"); + results.Summary.Overall.Updates.Should().Be(0, "no updates"); + results.Data.Count.Should().Be(allData.Count, $"{allData.Count} items in final state"); + results.Error.Should().BeNull(); + results.IsCompleted.Should().BeFalse(); + } + + [Fact] + public async Task ConcurrentSuspendDuringResumeDoesNotCorrupt() + { + // Stress test: races resume against re-suspend on two threads. + // Both orderings are correct (tested deterministically above). + // This test verifies no corruption, deadlocks, or data loss under contention. + const int iterations = 200; + var dataSet1 = Enumerable.Range(0, 100).ToList(); + var dataSet2 = Enumerable.Range(1000, 100).ToList(); + var allData = dataSet1.Concat(dataSet2).ToList(); + + for (var iter = 0; iter < iterations; iter++) + { + using var cache = new SourceCache(static x => x); + + var suspend1 = cache.SuspendNotifications(); + cache.AddOrUpdate(dataSet1); + using var results = cache.Connect().AsAggregator(); + + using var barrier = new Barrier(2); + var resumeTask = Task.Run(() => + { + barrier.SignalAndWait(); + suspend1.Dispose(); + }); + + var reSuspendTask = Task.Run(() => + { + barrier.SignalAndWait(); + return cache.SuspendNotifications(); + }); + + await Task.WhenAll(resumeTask, reSuspendTask); + var suspend2 = await reSuspendTask; + + cache.AddOrUpdate(dataSet2); + suspend2.Dispose(); + + results.Summary.Overall.Adds.Should().Be(allData.Count, $"iteration {iter}: exactly {allData.Count} adds"); + results.Summary.Overall.Removes.Should().Be(0, $"iteration {iter}: no removes"); + results.Summary.Overall.Updates.Should().Be(0, $"iteration {iter}: no updates because keys don't overlap"); + results.Data.Count.Should().Be(allData.Count, $"iteration {iter}: {allData.Count} items in final state"); + results.Data.Keys.OrderBy(k => k).Should().Equal(allData, $"iteration {iter}: all keys present in order"); + results.Error.Should().BeNull($"iteration {iter}: no errors"); + results.IsCompleted.Should().BeFalse($"iteration {iter}: not completed"); + } + } + + [Fact] + public async Task ResumeSignalUnderLockPreventsStaleSnapshotFromReSuspend() + { + // Verifies that a deferred Connect subscriber never sees data written during + // a re-suspension. The resume signal fires under the lock (reentrant), so the + // deferred subscriber activates and takes its snapshot before any other thread + // can re-suspend or write new data. + // + // A slow first subscriber blocks delivery of accumulated changes, creating a + // window where the main thread re-suspends and writes a second batch. The + // deferred subscriber's snapshot must contain only the first batch. + using var cache = new SourceCache(static x => x); + var dataSet1 = Enumerable.Range(0, 100).ToList(); + var dataSet2 = Enumerable.Range(1000, 100).ToList(); + var allData = dataSet1.Concat(dataSet2).ToList(); + + using var delivering = new SemaphoreSlim(0, 1); + using var proceedWithResuspend = new SemaphoreSlim(0, 1); + + var suspend1 = cache.SuspendNotifications(); + cache.AddOrUpdate(dataSet1); + + // First subscriber blocks on delivery to hold the delivery thread + var firstDelivery = true; + using var slowSub = cache.Connect().Subscribe(_ => + { + if (firstDelivery) + { + firstDelivery = false; + delivering.Release(); + proceedWithResuspend.Wait(TimeSpan.FromSeconds(5)); + } + }); + + // Deferred subscriber — will activate when resume signal fires + using var results = cache.Connect().AsAggregator(); + results.Messages.Count.Should().Be(0, "no messages during suspension"); + + // Resume on background thread — delivery blocks on slow subscriber + var resumeTask = Task.Run(() => suspend1.Dispose()); + (await delivering.WaitAsync(TimeSpan.FromSeconds(5))).Should().BeTrue("delivery should have started"); + + // Re-suspend and write second batch while delivery is blocked + var suspend2 = cache.SuspendNotifications(); + cache.AddOrUpdate(dataSet2); + + // dataSet2 must not appear in any message received so far + foreach (var msg in results.Messages) + { + foreach (var change in msg) + { + change.Key.Should().BeInRange(0, 99, + "deferred subscriber should only have first-batch keys before second resume"); + } + } + + // Unblock delivery + proceedWithResuspend.Release(); + await resumeTask; + + // Only dataSet1 should have been delivered — dataSet2 is held by second suspension + results.Summary.Overall.Adds.Should().Be(dataSet1.Count, + $"exactly {dataSet1.Count} adds before second resume — dataSet2 must be held by suspension"); + results.Messages.Should().HaveCount(1, "exactly one message (snapshot of dataSet1)"); + results.Messages[0].Adds.Should().Be(dataSet1.Count); + results.Messages[0].Select(c => c.Key).Should().Equal(dataSet1, + "snapshot should contain exactly first-batch keys in order"); + + // Resume second suspension — dataSet2 arrives now + suspend2.Dispose(); + + results.Summary.Overall.Adds.Should().Be(allData.Count, $"exactly {allData.Count} adds total"); + results.Summary.Overall.Removes.Should().Be(0, "no removes"); + results.Messages.Should().HaveCount(2, "two messages: snapshot + second batch"); + results.Messages[1].Adds.Should().Be(dataSet2.Count); + results.Messages[1].Select(c => c.Key).Should().Equal(dataSet2, + "second message should contain exactly second-batch keys in order"); + results.Data.Count.Should().Be(allData.Count); + results.Data.Keys.OrderBy(k => k).Should().Equal(allData); + results.Error.Should().BeNull(); + results.IsCompleted.Should().BeFalse(); + } + public void Dispose() { _source.Dispose(); diff --git a/src/DynamicData/Cache/ObservableCache.cs b/src/DynamicData/Cache/ObservableCache.cs index 669c07dbf..47421cd40 100644 --- a/src/DynamicData/Cache/ObservableCache.cs +++ b/src/DynamicData/Cache/ObservableCache.cs @@ -420,24 +420,23 @@ private void ResumeCount() private void ResumeNotifications() { - using (var notifications = _notifications.AcquireLock()) - { - Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Notifications without Suspend Notifications instance"); + using var notifications = _notifications.AcquireLock(); - var (changes, emitResume) = _suspensionTracker.Value.ResumeNotifications(); - if (changes is not null) - { - notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count, ++_currentVersion)); - } + Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Notifications without Suspend Notifications instance"); - if (!emitResume) - { - return; - } + var (changes, emitResume) = _suspensionTracker.Value.ResumeNotifications(); + if (changes is not null) + { + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count, ++_currentVersion)); + } + + if (!emitResume) + { + return; } - // Emit the resume signal after releasing the lock so that deferred - // Connect/Watch subscribers are activated outside the lock scope. + // Emit the resume signal under the lock to eliminate the race where a concurrent + // SuspendNotifications could produce overlapping emissions. _suspensionTracker.Value.EmitResumeNotification(); } From b8e03b6b87ac3c9633d1ec3d0297121703b28f59 Mon Sep 17 00:00:00 2001 From: "Darrin W. Cullop" Date: Wed, 8 Apr 2026 13:53:40 -0700 Subject: [PATCH 16/16] Improve thread safety, tests, and notification delivery - Strengthen test reliability and clarify test names/messages - Rewrite DeliveryQueueFixture test for robust concurrency checks - Enhance ObservableCache to avoid duplicate/applied notifications - Refactor ResumeNotifications to prevent race conditions - Improve comments and code clarity throughout --- .../Cache/SourceCacheFixture.cs | 6 +- .../Cache/SuspendNotificationsFixture.cs | 2 +- .../Internal/DeliveryQueueFixture.cs | 57 +++++++++++++++---- src/DynamicData/Cache/ObservableCache.cs | 44 +++++++------- 4 files changed, 75 insertions(+), 34 deletions(-) diff --git a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs index 43bd48378..ca46b77f5 100644 --- a/src/DynamicData.Tests/Cache/SourceCacheFixture.cs +++ b/src/DynamicData.Tests/Cache/SourceCacheFixture.cs @@ -193,7 +193,7 @@ public record class SomeObject(int Id, int Value); [Fact] - public async Task ConcurrentEditsShouldNotDeadlockWithSubscribersThatModifyOtherCaches() + public async Task MultiCacheFanInDoesNotDeadlock() { const int itemCount = 100; @@ -346,8 +346,8 @@ public void ConnectDuringDeliveryDoesNotDuplicate() // Unblock the slow subscriber — delivery resumes, item2 delivered connectDone.Set(); - writeTask.Wait(TimeSpan.FromSeconds(5)); - writeTask2.Wait(TimeSpan.FromSeconds(5)); + writeTask.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("writeTask should complete"); + writeTask2.Wait(TimeSpan.FromSeconds(5)).Should().BeTrue("writeTask2 should complete"); // Each key should appear exactly once in the new subscriber's view addCounts.GetValueOrDefault("k1").Should().Be(1, "k1 should appear once (snapshot only)"); diff --git a/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs b/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs index 0911cbaa4..db39604a5 100644 --- a/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs +++ b/src/DynamicData.Tests/Cache/SuspendNotificationsFixture.cs @@ -393,7 +393,7 @@ public void ResumeThenReSuspendDeliversFirstBatchOnly() results.Messages[1].Adds.Should().Be(dataSet2.Count, $"second message has {dataSet2.Count} adds"); results.Messages[1].Removes.Should().Be(0, "no removes in second message"); results.Messages[1].Updates.Should().Be(0, "no updates in second message"); - results.Messages[1].Select(x => x.Key).Should().Equal(dataSet2, "snapshot should contain first batch keys"); + results.Messages[1].Select(x => x.Key).Should().Equal(dataSet2, "second message should contain second batch keys"); results.Summary.Overall.Adds.Should().Be(allData.Count, $"exactly {allData.Count} adds total"); results.Summary.Overall.Removes.Should().Be(0, "no removes"); diff --git a/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs b/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs index 276d74526..9aac32055 100644 --- a/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs +++ b/src/DynamicData.Tests/Internal/DeliveryQueueFixture.cs @@ -73,12 +73,32 @@ public async Task OnlyOneDelivererAtATime() { var concurrentCount = 0; var maxConcurrent = 0; - var queue = new DeliveryQueue(_gate, _ => + var deliveryCount = 0; + var delivered = new ConcurrentBag(); + using var firstDeliveryStarted = new ManualResetEventSlim(false); + using var allowFirstDeliveryToContinue = new ManualResetEventSlim(false); + using var startContenders = new ManualResetEventSlim(false); + + var queue = new DeliveryQueue(_gate, item => { var current = Interlocked.Increment(ref concurrentCount); - if (current > maxConcurrent) + int snapshot; + do + { + snapshot = maxConcurrent; + if (current <= snapshot) + { + break; + } + } + while (Interlocked.CompareExchange(ref maxConcurrent, current, snapshot) != snapshot); + + delivered.Add(item); + + if (Interlocked.Increment(ref deliveryCount) == 1) { - Interlocked.Exchange(ref maxConcurrent, current); + firstDeliveryStarted.Set(); + allowFirstDeliveryToContinue.Wait(); } Thread.SpinWait(1000); @@ -86,18 +106,33 @@ public async Task OnlyOneDelivererAtATime() return true; }); - using (var notifications = queue.AcquireLock()) - { - for (var i = 0; i < 100; i++) + // Start delivering the first item — it will block in the callback + var firstDelivery = Task.Run(() => EnqueueAndDeliver(queue, -1)); + firstDeliveryStarted.Wait(); + + // While first delivery is blocked, enqueue 100 items from concurrent threads + var enqueueTasks = Enumerable.Range(0, 100) + .Select(i => Task.Run(() => { - notifications.Enqueue(i); - } - } + startContenders.Wait(); + EnqueueAndDeliver(queue, i); + })); - var tasks = Enumerable.Range(0, 4).Select(_ => Task.Run(() => TriggerDelivery(queue))).ToArray(); - await Task.WhenAll(tasks); + var triggerTasks = Enumerable.Range(0, 4) + .Select(_ => Task.Run(() => + { + startContenders.Wait(); + TriggerDelivery(queue); + })); + + var tasks = enqueueTasks.Concat(triggerTasks).ToArray(); + startContenders.Set(); + allowFirstDeliveryToContinue.Set(); + + await Task.WhenAll(tasks.Append(firstDelivery)); maxConcurrent.Should().Be(1, "only one thread should be delivering at a time"); + delivered.Should().HaveCount(101); } [Fact] diff --git a/src/DynamicData/Cache/ObservableCache.cs b/src/DynamicData/Cache/ObservableCache.cs index 47421cd40..60bcbf07e 100644 --- a/src/DynamicData/Cache/ObservableCache.cs +++ b/src/DynamicData/Cache/ObservableCache.cs @@ -93,11 +93,15 @@ public ObservableCache(Func? keySelector = null) Observable.Create( observer => { - lock (_locker) - { - var source = _countChanged.Value.StartWith(_readerWriter.Count).DistinctUntilChanged(); - return source.SubscribeSafe(observer); - } + using var readLock = _notifications.AcquireReadLock(); + + var snapshotVersion = _currentVersion; + var countChanged = readLock.HasPending + ? _countChanged.Value.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion) + : _countChanged.Value; + + var source = countChanged.StartWith(_readerWriter.Count).DistinctUntilChanged(); + return source.SubscribeSafe(observer); }); public IReadOnlyList Items => _readerWriter.Items; @@ -232,7 +236,7 @@ private IObservable> CreateConnectObservable(Func Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion) @@ -266,7 +270,7 @@ private IObservable> CreateWatchObservable(TKey key) => // The current snapshot may contain changes that have been made but the notifications // have yet to be delivered. We need to filter those out to avoid delivering an update - // that has already been applied (but detect this possiblity and skip filtering unless absolutely needed) + // that has already been applied (but detect this possibility and skip filtering unless absolutely needed) var snapshotVersion = _currentVersion; var changes = readLock.HasPending ? _changes.SkipWhile(_ => Volatile.Read(ref _currentDeliveryVersion) <= snapshotVersion) @@ -420,24 +424,26 @@ private void ResumeCount() private void ResumeNotifications() { - using var notifications = _notifications.AcquireLock(); - - Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Notifications without Suspend Notifications instance"); + bool emitResume; - var (changes, emitResume) = _suspensionTracker.Value.ResumeNotifications(); - if (changes is not null) + using (var notifications = _notifications.AcquireLock()) { - notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count, ++_currentVersion)); + Debug.Assert(_suspensionTracker.IsValueCreated, "Should not be Resuming Notifications without Suspend Notifications instance"); + + (var changes, emitResume) = _suspensionTracker.Value.ResumeNotifications(); + if (changes is not null) + { + notifications.Enqueue(NotificationItem.CreateChanges(changes, _readerWriter.Count, ++_currentVersion)); + } } - if (!emitResume) + // Emit the resume signal after releasing the delivery scope so that + // accumulated changes are delivered first + if (emitResume) { - return; + using var readLock = _notifications.AcquireReadLock(); + _suspensionTracker.Value.EmitResumeNotification(); } - - // Emit the resume signal under the lock to eliminate the race where a concurrent - // SuspendNotifications could produce overlapping emissions. - _suspensionTracker.Value.EmitResumeNotification(); } private enum NotificationKind