diff --git a/src/Playground/Benchmark/OldTests.cs b/src/Playground/Benchmark/OldTests.cs index faa25db..4a562b9 100644 --- a/src/Playground/Benchmark/OldTests.cs +++ b/src/Playground/Benchmark/OldTests.cs @@ -125,7 +125,7 @@ public static void ShowBottomSegments(WriteAheadLogMode mode, int count) var iterate = false; if (iterate) { - var random = new Random(); + var random = Random.Shared; Parallel.For(0, 750000, (i) => { var key = random.Next(0, 999_999_999); diff --git a/src/Playground/DeadlockFinder.cs b/src/Playground/DeadlockFinder.cs index dd9214f..fe77124 100644 --- a/src/Playground/DeadlockFinder.cs +++ b/src/Playground/DeadlockFinder.cs @@ -20,7 +20,7 @@ void TreeLoop(int taskNo) const int upsertLogFrequency = 1_000_000; const int iterationLogFrequency = 10_000_000; const int iterationYieldFrequency = 1000; - var rand = new Random(); + var rand = Random.Shared; void resetTree() { diff --git a/src/ZoneTree.UnitTests/AtomicUpdateTests.cs b/src/ZoneTree.UnitTests/AtomicUpdateTests.cs index 156f681..050f85d 100644 --- a/src/ZoneTree.UnitTests/AtomicUpdateTests.cs +++ b/src/ZoneTree.UnitTests/AtomicUpdateTests.cs @@ -29,7 +29,7 @@ public void IntIntAtomicIncrement(WriteAheadLogMode walMode) } data.Maintenance.MoveMutableSegmentForward(); data.Maintenance.StartMergeOperation().Join(); - var random = new Random(); + var random = Random.Shared; var off = -1; Parallel.For(0, 1001, (x) => { @@ -100,7 +100,7 @@ public void IntIntAtomicIncrementForBTree(WriteAheadLogMode walMode) { data.Upsert(i, i + i); } - var random = new Random(); + var random = Random.Shared; var off = -1; Parallel.For(0, 1001, (x) => { @@ -166,7 +166,7 @@ public void IntIntMutableSegmentOnlyAtomicIncrement(WriteAheadLogMode walMode) { data.Upsert(i, i + i); } - var random = new Random(); + var random = Random.Shared; var off = -1; Parallel.For(0, 1001, (x) => { @@ -228,7 +228,7 @@ public void IntIntMutableSegmentSeveralUpserts(WriteAheadLogMode walMode) .ConfigureWriteAheadLogOptions(x => x.WriteAheadLogMode = walMode) .OpenOrCreate(); var n = 1000; - var random = new Random(); + var random = Random.Shared; Parallel.For(0, 1000, (x) => { try diff --git a/src/ZoneTree.UnitTests/ExceptionlessTransactionTests.cs b/src/ZoneTree.UnitTests/ExceptionlessTransactionTests.cs index 133eeaa..54987c1 100644 --- a/src/ZoneTree.UnitTests/ExceptionlessTransactionTests.cs +++ b/src/ZoneTree.UnitTests/ExceptionlessTransactionTests.cs @@ -82,7 +82,7 @@ public async Task TransactionWithFluentAPI(int compactionThreshold) zoneTree.Maintenance.TransactionLog.CompactionThreshold = compactionThreshold; - var random = new Random(); + var random = Random.Shared; await Parallel.ForEachAsync(Enumerable.Range(0, 1000), async (x, cancel) => { using var transaction = diff --git a/src/ZoneTree.UnitTests/IteratorTests.cs b/src/ZoneTree.UnitTests/IteratorTests.cs index 1501bc6..889864d 100644 --- a/src/ZoneTree.UnitTests/IteratorTests.cs +++ b/src/ZoneTree.UnitTests/IteratorTests.cs @@ -240,7 +240,7 @@ public void IntIntIteratorParallelInserts() if (Directory.Exists(dataPath)) Directory.Delete(dataPath, true); - var random = new Random(); + var random = Random.Shared; var insertCount = 100000; var iteratorCount = 1000; @@ -289,7 +289,7 @@ public void IntIntReverseIteratorParallelInserts(bool reverse) if (Directory.Exists(dataPath)) Directory.Delete(dataPath, true); - var random = new Random(); + var random = Random.Shared; var insertCount = 1000000; var iteratorCount = 1000; @@ -352,7 +352,7 @@ public void IntIntSnapshotIteratorParallelInserts() if (Directory.Exists(dataPath)) Directory.Delete(dataPath, true); - var random = new Random(); + var random = Random.Shared; var insertCount = 100000; var iteratorCount = 1000; @@ -512,7 +512,7 @@ public void SeekIteratorsAfterMerge( if (merge) zoneTree.Maintenance.StartMergeOperation()?.Join(); - var random = new Random(); + var random = Random.Shared; DoPrefixSearch(zoneTree, list, random); zoneTree.Maintenance.DiskSegment.InitSparseArray(100); DoPrefixSearch(zoneTree, list, random); @@ -604,7 +604,7 @@ public void SeekIteratorsAfterMergeReload( x.MaximumRecordCount = maximumRecordCount; }) .Open(); - var random = new Random(); + var random = Random.Shared; DoPrefixSearch(zoneTree, list, random); zoneTree.Maintenance.DiskSegment.InitSparseArray(100); DoPrefixSearch(zoneTree, list, random); diff --git a/src/ZoneTree.UnitTests/ReplicatorTests.cs b/src/ZoneTree.UnitTests/ReplicatorTests.cs index 88d85ab..e791a3c 100644 --- a/src/ZoneTree.UnitTests/ReplicatorTests.cs +++ b/src/ZoneTree.UnitTests/ReplicatorTests.cs @@ -4,15 +4,6 @@ namespace Tenray.ZoneTree.UnitTests; public sealed class ReplicatorTests { - [Test] - public void TestReplicator2() - { - for (int i = 0; i < 100; i++) - { - TestReplicator(); - } - } - [Test] public void TestReplicator() { @@ -21,14 +12,17 @@ public void TestReplicator() Directory.Delete(dataPath, true); var recordCount = 50_000; var keyCount = 15_000; + var maxMemory = 2_000; void CreateData() { using var zoneTree = new ZoneTreeFactory() .SetDataDirectory(dataPath + "/source") + .SetMutableSegmentMaxItemCount(maxMemory) .OpenOrCreate(); using var replica = new ZoneTreeFactory() .SetDataDirectory(dataPath + "/replica") + .SetMutableSegmentMaxItemCount(maxMemory) .OpenOrCreate(); using var replicator = new Replicator(replica, dataPath + "/replica-op-index"); @@ -84,4 +78,83 @@ void TestEqual() CreateData(); TestEqual(); } -} + + [Test] + public void TestReplicator2() + { + for (int i = 0; i < 5; i++) + { + var dataPath = "data/TestReplicator"; + if (Directory.Exists(dataPath)) + Directory.Delete(dataPath, true); + + var recordCount = 50_000; + var keyCount = 15_000; + var maxMemory = 2_000; + + void CreateData() + { + using var zoneTree = new ZoneTreeFactory() + .SetDataDirectory(dataPath + "/source") + .SetMutableSegmentMaxItemCount(maxMemory) + .OpenOrCreate(); + + using var replica = new ZoneTreeFactory() + .SetDataDirectory(dataPath + "/replica") + .SetMutableSegmentMaxItemCount(maxMemory) + .OpenOrCreate(); + + using var replicator = new Replicator(replica, dataPath + "/replica-op-index"); + using var maintainer1 = zoneTree.CreateMaintainer(); + using var maintainer2 = replica.CreateMaintainer(); + int replicated = 0; + var k = 0; + Parallel.For(0, recordCount, (i) => + { + var key = i % keyCount; + var value = Interlocked.Increment(ref k); + var opIndex = zoneTree.Upsert(key, value); + Task.Run(() => + { + replicator.OnUpsert(key, value, opIndex); + Interlocked.Increment(ref replicated); + }); + }); + while (replicated < recordCount) Task.Delay(500).Wait(); + maintainer1.EvictToDisk(); + maintainer2.EvictToDisk(); + maintainer1.WaitForBackgroundThreads(); + maintainer2.WaitForBackgroundThreads(); + } + + void TestEqual() + { + using var zoneTree = new ZoneTreeFactory() + .SetDataDirectory(dataPath + "/source") + .Open(); + + using var replica = new ZoneTreeFactory() + .SetDataDirectory(dataPath + "/replica") + .Open(); + + using var iterator1 = zoneTree.CreateIterator(); + using var iterator2 = replica.CreateIterator(); + var i = 0; + while (true) + { + var n1 = iterator1.Next(); + var n2 = iterator2.Next(); + Assert.That(n2, Is.EqualTo(n1)); + if (!n1) break; + Assert.That(iterator2.Current, Is.EqualTo(iterator1.Current)); + ++i; + } + Assert.That(i, Is.EqualTo(keyCount)); + zoneTree.Maintenance.Drop(); + replica.Maintenance.Drop(); + } + CreateData(); + TestEqual(); + } + } +} \ No newline at end of file diff --git a/src/ZoneTree.UnitTests/SafeBplusTreeTests.cs b/src/ZoneTree.UnitTests/SafeBplusTreeTests.cs index 9054572..ad5c4a6 100644 --- a/src/ZoneTree.UnitTests/SafeBplusTreeTests.cs +++ b/src/ZoneTree.UnitTests/SafeBplusTreeTests.cs @@ -22,13 +22,13 @@ public void BTreeIteration(BTreeLockMode lockMode) tree.TryInsert(i, i + i, out _); var iterator = new BTreeSeekableIterator(tree); - var j = 0; + var j = 0; while (iterator.Next()) { Assert.That(iterator.CurrentKey, Is.EqualTo(j)); Assert.That(iterator.CurrentValue, Is.EqualTo(j + j)); ++j; - } + } iterator.SeekEnd(); j = n - 1; @@ -92,7 +92,7 @@ public void BTreeLowerOrEqualBound(BTreeLockMode lockMode) Assert.That(GetLastNodeSmallerOrEqual(iterator, 4), Is.EqualTo(3)); Assert.That(GetLastNodeSmallerOrEqual(iterator, 3), Is.EqualTo(3)); Assert.Throws( - () => GetLastNodeSmallerOrEqual(iterator ,- 1)); + () => GetLastNodeSmallerOrEqual(iterator, -1)); Assert.That(GetLastNodeSmallerOrEqual(iterator, 10), Is.EqualTo(9)); Assert.That(GetLastNodeSmallerOrEqual(iterator, 9), Is.EqualTo(9)); Assert.That(GetLastNodeSmallerOrEqual(iterator, 1), Is.EqualTo(1)); @@ -135,7 +135,7 @@ int GetFirstNodeGreaterOrEqual(BTreeSeekableIterator iterator, int key [TestCase(BTreeLockMode.NodeLevelMonitor)] public void BTreeIteratorParallelInserts(BTreeLockMode lockMode) { - var random = new Random(); + var random = Random.Shared; var insertCount = 100000; var iteratorCount = 1000; @@ -190,7 +190,7 @@ public void BTreeIteratorParallelInserts(BTreeLockMode lockMode) [TestCase(BTreeLockMode.NodeLevelMonitor)] public void BTreeReverseIteratorParallelInserts(BTreeLockMode lockMode) { - var random = new Random(); + var random = Random.Shared; var insertCount = 100000; var iteratorCount = 1550; @@ -203,7 +203,7 @@ public void BTreeReverseIteratorParallelInserts(BTreeLockMode lockMode) { var key = random.Next(); tree.AddOrUpdate(key, - AddOrUpdateResult (ref int x) => + AddOrUpdateResult (ref int x) => { x = key + key; return AddOrUpdateResult.ADDED; @@ -253,7 +253,7 @@ public void BTreeReverseIteratorParallelInserts(BTreeLockMode lockMode) [TestCase(BTreeLockMode.NodeLevelMonitor)] public void IntIntDuplicateRecords(BTreeLockMode lockMode) { - var random = new Random(); + var random = Random.Shared; var insertCount = 1000000; var iteratorCount = 1000; @@ -297,7 +297,7 @@ public void IntIntDuplicateRecords(BTreeLockMode lockMode) [TestCase(BTreeLockMode.NodeLevelMonitor)] public void IntIntDuplicateReverseRecords(BTreeLockMode lockMode) { - var random = new Random(); + var random = Random.Shared; var insertCount = 1000000; var iteratorCount = 1000; diff --git a/src/ZoneTree/Core/Replicator.cs b/src/ZoneTree/Core/Replicator.cs index 5750179..7c8fc7c 100644 --- a/src/ZoneTree/Core/Replicator.cs +++ b/src/ZoneTree/Core/Replicator.cs @@ -4,22 +4,61 @@ using System.Collections.Generic; using Tenray.ZoneTree.Collections; +/// +/// The class provides asynchronous replication for +/// instances, allowing efficient upsert operations +/// and background maintenance tasks. +/// +/// The type of the key used in the ZoneTree. +/// The type of the value used in the ZoneTree. public sealed class Replicator : IDisposable { - readonly IZoneTree Replica; + /// + /// The main instance that acts as the replica. + /// This holds the replicated data. + /// + public readonly IZoneTree Replica; - readonly IZoneTree LatestOpIndexes; + /// + /// An that tracks the latest operation indexes for each key. + /// This helps in ensuring that only the most recent updates are reflected in the . + /// + public readonly IZoneTree LatestOpIndexes; - readonly IMaintainer Maintainer; + /// + /// The responsible for managing background maintenance jobs, + /// such as cleaning inactive caches and evicting data to disk. + /// + public readonly IMaintainer Maintainer; + /// + /// A flag indicating whether data should be evicted to disk when the replicator is disposed. + /// + readonly bool EvictToDiskOnDispose; + + /// + /// A flag indicating whether the replicator has already been disposed to avoid multiple disposal operations. + /// bool isDisposed; + /// + /// Initializes a new instance of the class. + /// + /// The instance representing the replica. + /// The file path where data will be persisted to disk. + /// Indicates whether the data should be evicted to disk on dispose. + /// + /// An optional action to configure the instance used for + /// creating or opening the tree. + /// public Replicator( IZoneTree replica, string dataPath, + bool evictToDiskOnDispose = true, Action> configure = null) { this.Replica = replica; + EvictToDiskOnDispose = evictToDiskOnDispose; var factory = new ZoneTreeFactory() .SetDataDirectory(dataPath); if (configure != null) configure(factory); @@ -28,6 +67,18 @@ public Replicator( Maintainer.EnableJobForCleaningInactiveCaches = true; } + /// + /// Handles the upsert operation in the replicator, ensuring atomic updates + /// to both the and the . + /// + /// The key of the element to be upserted. + /// The value of the element to be upserted. + /// The operation index associated with this upsert operation. + /// + /// The upsert operation ensures that the is updated atomically. + /// If the new operation index () is greater than or equal to the existing index, + /// the key-value pair is upserted into the . + /// public void OnUpsert(TKey key, TValue value, long opIndex) { LatestOpIndexes.TryAtomicAddOrUpdate( @@ -51,11 +102,21 @@ public void OnUpsert(TKey key, TValue value, long opIndex) }); } + /// + /// Releases all resources used by the . + /// + /// + /// If is set to true, the data is + /// evicted to disk before disposal. The method ensures that all background + /// maintenance jobs are completed and disposes of the + /// and the . + /// public void Dispose() { if (isDisposed) return; isDisposed = true; - Maintainer.EvictToDisk(); + if (EvictToDiskOnDispose) + Maintainer.EvictToDisk(); Maintainer.WaitForBackgroundThreads(); Maintainer.Dispose(); LatestOpIndexes.Dispose(); diff --git a/src/ZoneTree/Core/ZoneTree.Merge.cs b/src/ZoneTree/Core/ZoneTree.Merge.cs index c730736..c9b3ed7 100644 --- a/src/ZoneTree/Core/ZoneTree.Merge.cs +++ b/src/ZoneTree/Core/ZoneTree.Merge.cs @@ -26,7 +26,7 @@ void MoveMutableSegmentForward(IMutableSegment mutableSegment) // move segment zero only if // the given mutable segment is the current mutable segment (not already moved) // and it is not frozen. - if (mutableSegment.IsFrozen || mutableSegment != MutableSegment) + if (mutableSegment.IsFrozen || !ReferenceEquals(mutableSegment, MutableSegment)) return; //Don't move empty mutable segment. @@ -34,11 +34,12 @@ void MoveMutableSegmentForward(IMutableSegment mutableSegment) if (c == 0) return; + MutableSegment = new FrozenMutableSegment(mutableSegment); mutableSegment.Freeze(); + while (!mutableSegment.IsFullyFrozen) Thread.Yield(); ReadOnlySegmentQueue.Enqueue(mutableSegment); MetaWal.EnqueueMaximumOpIndex(mutableSegment.MaximumOpIndex); MetaWal.EnqueueReadOnlySegment(mutableSegment.SegmentId); - MutableSegment = new MutableSegment( Options, IncrementalIdProvider.NextId(), mutableSegment.OpIndexProvider); diff --git a/src/ZoneTree/Core/ZoneTree.ReadWrite.cs b/src/ZoneTree/Core/ZoneTree.ReadWrite.cs index d3def6d..1db1dcd 100644 --- a/src/ZoneTree/Core/ZoneTree.ReadWrite.cs +++ b/src/ZoneTree/Core/ZoneTree.ReadWrite.cs @@ -240,6 +240,7 @@ public bool TryAtomicAddOrUpdate( switch (status) { case AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN: + Thread.Yield(); continue; case AddOrUpdateResult.RETRY_SEGMENT_IS_FULL: MoveMutableSegmentForward(mutableSegment); @@ -318,6 +319,7 @@ public bool TryAtomicAddOrUpdate( switch (status) { case AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN: + Thread.Yield(); continue; case AddOrUpdateResult.RETRY_SEGMENT_IS_FULL: MoveMutableSegmentForward(mutableSegment); @@ -349,6 +351,7 @@ public long Upsert(in TKey key, in TValue value) switch (status) { case AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN: + Thread.Yield(); continue; case AddOrUpdateResult.RETRY_SEGMENT_IS_FULL: MoveMutableSegmentForward(mutableSegment); @@ -370,6 +373,7 @@ public long Upsert(in TKey key, GetValueDelegate valueGetter) switch (status) { case AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN: + Thread.Yield(); continue; case AddOrUpdateResult.RETRY_SEGMENT_IS_FULL: MoveMutableSegmentForward(mutableSegment); @@ -405,6 +409,7 @@ public long ForceDelete(in TKey key) switch (status) { case AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN: + Thread.Yield(); continue; case AddOrUpdateResult.RETRY_SEGMENT_IS_FULL: MoveMutableSegmentForward(mutableSegment); diff --git a/src/ZoneTree/Segments/InMemory/FrozenMutableSegment.cs b/src/ZoneTree/Segments/InMemory/FrozenMutableSegment.cs new file mode 100644 index 0000000..9efb330 --- /dev/null +++ b/src/ZoneTree/Segments/InMemory/FrozenMutableSegment.cs @@ -0,0 +1,79 @@ +using Tenray.ZoneTree.Collections; +using Tenray.ZoneTree.Collections.BTree; +using Tenray.ZoneTree.Core; + +namespace Tenray.ZoneTree.Segments.InMemory; + +public sealed class FrozenMutableSegment : IMutableSegment +{ + private IMutableSegment mutableSegment; + + public FrozenMutableSegment(IMutableSegment mutableSegment) + { + this.mutableSegment = mutableSegment; + } + + public bool IsFrozen => true; + + public IIncrementalIdProvider OpIndexProvider => mutableSegment.OpIndexProvider; + + public long SegmentId => mutableSegment.SegmentId; + + public long Length => mutableSegment.Length; + + public long MaximumOpIndex => mutableSegment.MaximumOpIndex; + + public bool IsFullyFrozen => false; + + public bool ContainsKey(in TKey key) + { + return mutableSegment.ContainsKey(key); + } + + public AddOrUpdateResult Delete(in TKey key, out long opIndex) + { + opIndex = 0; + return AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN; + } + + public void Drop() + { + mutableSegment.Drop(); + } + + public void Freeze() + { + } + + public IIndexedReader GetIndexedReader() + { + throw new NotSupportedException("BTree Indexed Reader is not supported."); + } + + public ISeekableIterator GetSeekableIterator(bool contributeToTheBlockCache = false) + { + return mutableSegment.GetSeekableIterator(contributeToTheBlockCache); + } + + public void ReleaseResources() + { + mutableSegment.ReleaseResources(); + } + + public bool TryGet(in TKey key, out TValue value) + { + return mutableSegment.TryGet(key, out value); + } + + public AddOrUpdateResult Upsert(in TKey key, in TValue value, out long opIndex) + { + opIndex = 0; + return AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN; + } + + public AddOrUpdateResult Upsert(in TKey key, GetValueDelegate valueGetter, out long opIndex) + { + opIndex = 0; + return AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN; + } +} diff --git a/src/ZoneTree/Segments/InMemory/MutableSegment.cs b/src/ZoneTree/Segments/InMemory/MutableSegment.cs index fb134ef..1e80d9b 100644 --- a/src/ZoneTree/Segments/InMemory/MutableSegment.cs +++ b/src/ZoneTree/Segments/InMemory/MutableSegment.cs @@ -207,12 +207,18 @@ public AddOrUpdateResult Delete(in TKey key, out long opIndex) try { Interlocked.Increment(ref WritesInProgress); - opIndex = 0; + if (IsFrozenFlag) + { + opIndex = 0; return AddOrUpdateResult.RETRY_SEGMENT_IS_FROZEN; + } if (BTree.Length >= MutableSegmentMaxItemCount) + { + opIndex = 0; return AddOrUpdateResult.RETRY_SEGMENT_IS_FULL; + } TValue insertedValue = default; diff --git a/src/ZoneTree/Segments/MultiPart/MultiPartDiskSegmentCreator.cs b/src/ZoneTree/Segments/MultiPart/MultiPartDiskSegmentCreator.cs index 3f2d107..2511c8a 100644 --- a/src/ZoneTree/Segments/MultiPart/MultiPartDiskSegmentCreator.cs +++ b/src/ZoneTree/Segments/MultiPart/MultiPartDiskSegmentCreator.cs @@ -34,7 +34,7 @@ public sealed class MultiPartDiskSegmentCreator : IDiskSegmentCrea readonly List PartValues = new(); - readonly Random Random = new(); + readonly Random Random = Random.Shared; TKey LastAppendedKey;