diff --git a/src/Nethermind/Nethermind.Blockchain.Test/FullPruning/FullPruningDiskTest.cs b/src/Nethermind/Nethermind.Blockchain.Test/FullPruning/FullPruningDiskTest.cs
index 3f6e5e8b7f2..b61881b8319 100644
--- a/src/Nethermind/Nethermind.Blockchain.Test/FullPruning/FullPruningDiskTest.cs
+++ b/src/Nethermind/Nethermind.Blockchain.Test/FullPruning/FullPruningDiskTest.cs
@@ -132,7 +132,7 @@ protected override async Task RunFullPruning(CancellationToken cancellationToken
            }
        }

-       [Test, MaxTime(Timeout.MaxTestTime), Retry(5)]
+       [Test, MaxTime(Timeout.LongTestTime)]
        public async Task prune_on_disk_multiple_times()
        {
            using PruningTestBlockchain chain = await PruningTestBlockchain.Create(new PruningConfig { FullPruningMinimumDelayHours = 0 });
@@ -142,7 +142,7 @@ public async Task prune_on_disk_multiple_times()
            }
        }

-       [Test, MaxTime(Timeout.MaxTestTime), Retry(5)]
+       [Test, MaxTime(Timeout.LongTestTime)]
        public async Task prune_on_disk_only_once()
        {
            using PruningTestBlockchain chain = await PruningTestBlockchain.Create(new PruningConfig { FullPruningMinimumDelayHours = 10 });
diff --git a/src/Nethermind/Nethermind.Blockchain.Test/Timeout.cs b/src/Nethermind/Nethermind.Blockchain.Test/Timeout.cs
index 9b36657bb48..394b42f25b3 100644
--- a/src/Nethermind/Nethermind.Blockchain.Test/Timeout.cs
+++ b/src/Nethermind/Nethermind.Blockchain.Test/Timeout.cs
@@ -4,6 +4,7 @@ namespace Nethermind.Blockchain.Test;

 internal class Timeout
 {
+    public const int LongTestTime = 60_000;
     public const int MaxTestTime = 10_000;
     public const int MaxWaitTime = 1_000;
 }
diff --git a/src/Nethermind/Nethermind.Core.Test/TestMemColumnDb.cs b/src/Nethermind/Nethermind.Core.Test/TestMemColumnDb.cs
index bcb48306915..529df1fdfc1 100644
--- a/src/Nethermind/Nethermind.Core.Test/TestMemColumnDb.cs
+++ b/src/Nethermind/Nethermind.Core.Test/TestMemColumnDb.cs
@@ -31,4 +31,5 @@ public IColumnsWriteBatch StartWriteBatch()
         return new InMemoryColumnWriteBatch(this);
     }
     public void Dispose() { }
+    public void Flush(bool onlyWal = false) { }
 }
diff --git a/src/Nethermind/Nethermind.Core.Test/TestMemDb.cs b/src/Nethermind/Nethermind.Core.Test/TestMemDb.cs
index c2a1b5c5faa..169a166c58c 100644
--- a/src/Nethermind/Nethermind.Core.Test/TestMemDb.cs
+++ b/src/Nethermind/Nethermind.Core.Test/TestMemDb.cs
@@ -109,7 +109,7 @@ public override IWriteBatch StartWriteBatch()
         return new InMemoryWriteBatch(this);
     }

-    public override void Flush()
+    public override void Flush(bool onlyWal)
     {
         FlushCount++;
     }
diff --git a/src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs b/src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs
index c3ae97eb78a..b5451766045 100644
--- a/src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs
+++ b/src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs
@@ -128,9 +128,9 @@ public bool KeyExists(ReadOnlySpan<byte> key)
         return _mainDb.KeyExistsWithColumn(key, _columnFamily);
     }

-    public void Flush()
+    public void Flush(bool onlyWal)
     {
-        _mainDb.Flush();
+        _mainDb.Flush(onlyWal);
     }

     public void Compact()
diff --git a/src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs b/src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs
index 8427e7f2d98..66766e788bd 100644
--- a/src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs
+++ b/src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs
@@ -1321,11 +1321,11 @@ private void FlushOnTooManyWrites()
         }
     }

-    public void Flush()
+    public void Flush(bool onlyWal = false)
     {
         ObjectDisposedException.ThrowIf(_isDisposing, this);

-        InnerFlush();
+        InnerFlush(onlyWal);
     }

     public virtual void Compact()
@@ -1333,11 +1333,16 @@ public virtual void Compact()
         _db.CompactRange(Keccak.Zero.BytesToArray(), Keccak.MaxValue.BytesToArray());
     }

-    private void InnerFlush()
+    private void InnerFlush(bool onlyWal)
     {
         try
         {
-            _rocksDbNative.rocksdb_flush(_db.Handle, FlushOptions.DefaultFlushOptions.Handle);
+            _rocksDbNative.rocksdb_flush_wal(_db.Handle, true);
+
+            if (!onlyWal)
+            {
+                _rocksDbNative.rocksdb_flush(_db.Handle, FlushOptions.DefaultFlushOptions.Handle);
+            }
         }
         catch (RocksDbSharpException e)
         {
@@ -1439,7 +1444,7 @@ public void Dispose()
             dbMetricsUpdater.Dispose();
         }

-        InnerFlush();
+        InnerFlush(false);
         ReleaseUnmanagedResources();

         _dbsByPath.Remove(_fullPath!, out _);
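The DbOnTheRocks hunks above carry the core behavioural change: `Flush` now always syncs the write-ahead log via `rocksdb_flush_wal`, and only flushes memtables to SST files when `onlyWal` is false. A minimal caller-side sketch of what the flag means (illustrative code, not part of the diff):

```csharp
using Nethermind.Db;

// Illustrative only. Shows the two flush modes exposed by the new IDbMeta.Flush(bool onlyWal).
internal static class FlushUsageSketch
{
    // Cheap durability: fsync the write-ahead log so recent writes survive a process crash,
    // but leave them in the memtables. TrieStore uses this after persisting each snapshot.
    public static void FlushWalOnly(IDb db) => db.Flush(onlyWal: true);

    // Full flush: additionally force memtables down to SST files. Used where data must reach
    // its final on-disk form, e.g. when state sync saves the root node (TreeSync below).
    public static void FlushEverything(IDb db) => db.Flush(onlyWal: false);
}
```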
diff --git a/src/Nethermind/Nethermind.Db.Rpc/RpcColumnsDb.cs b/src/Nethermind/Nethermind.Db.Rpc/RpcColumnsDb.cs
index 8a458965589..8ddcde89941 100644
--- a/src/Nethermind/Nethermind.Db.Rpc/RpcColumnsDb.cs
+++ b/src/Nethermind/Nethermind.Db.Rpc/RpcColumnsDb.cs
@@ -46,5 +46,6 @@ public IColumnsWriteBatch StartWriteBatch()
             return new InMemoryColumnWriteBatch(this);
         }
         public void Dispose() { }
+        public void Flush(bool onlyWal = false) { }
     }
 }
diff --git a/src/Nethermind/Nethermind.Db.Rpc/RpcDb.cs b/src/Nethermind/Nethermind.Db.Rpc/RpcDb.cs
index d0d361d7df1..98279f33c4a 100644
--- a/src/Nethermind/Nethermind.Db.Rpc/RpcDb.cs
+++ b/src/Nethermind/Nethermind.Db.Rpc/RpcDb.cs
@@ -73,6 +73,8 @@ public bool KeyExists(ReadOnlySpan<byte> key)
         public IDb Innermost => this; // record db is just a helper DB here

         public void Flush() { }
+        public void Flush(bool onlyWal = false) { }
+
         public void Clear() { }

         public IEnumerable<KeyValuePair<byte[], byte[]?>> GetAll(bool ordered = false) => _recordDb.GetAll();
diff --git a/src/Nethermind/Nethermind.Db/CompressingDb.cs b/src/Nethermind/Nethermind.Db/CompressingDb.cs
index 95623cb6f5a..997d2c29877 100644
--- a/src/Nethermind/Nethermind.Db/CompressingDb.cs
+++ b/src/Nethermind/Nethermind.Db/CompressingDb.cs
@@ -137,7 +137,7 @@ public IEnumerable<byte[]> GetAllValues(bool ordered = false) =>

             public bool KeyExists(ReadOnlySpan<byte> key) => _wrapped.KeyExists(key);

-            public void Flush() => _wrapped.Flush();
+            public void Flush(bool onlyWal) => _wrapped.Flush(onlyWal);

             public void Clear() => _wrapped.Clear();

diff --git a/src/Nethermind/Nethermind.Db/FullPruning/FullPruningDb.cs b/src/Nethermind/Nethermind.Db/FullPruning/FullPruningDb.cs
index 78349d64266..bdf9e37088f 100755
--- a/src/Nethermind/Nethermind.Db/FullPruning/FullPruningDb.cs
+++ b/src/Nethermind/Nethermind.Db/FullPruning/FullPruningDb.cs
@@ -144,11 +144,11 @@ public void Remove(ReadOnlySpan<byte> key)
     public IDb Innermost => this;

     // we need to flush both DB's
-    public void Flush()
+    public void Flush(bool onlyWal)
     {
-        _currentDb.Flush();
+        _currentDb.Flush(onlyWal);
         IDb? cloningDb = _pruningContext?.CloningDb;
-        cloningDb?.Flush();
+        cloningDb?.Flush(onlyWal);
     }

     // we need to clear both DB's
diff --git a/src/Nethermind/Nethermind.Db/IDb.cs b/src/Nethermind/Nethermind.Db/IDb.cs
index 41c068db458..73f9e4ba171 100644
--- a/src/Nethermind/Nethermind.Db/IDb.cs
+++ b/src/Nethermind/Nethermind.Db/IDb.cs
@@ -23,7 +23,7 @@ public interface IDbMeta
 {
     DbMetric GatherMetric(bool includeSharedCache = false) => new DbMetric();

-    void Flush() { }
+    void Flush(bool onlyWal = false);
     void Clear() { }
     void Compact() { }

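Note the IDbMeta change just above: `Flush` is no longer a default interface method, so every implementation has to declare it, which is why the in-memory, RPC and read-only DBs in this diff all gain a one-line stub; the defaulted parameter keeps existing `db.Flush()` call sites compiling. A minimal sketch of an implementer, assuming the remaining IDbMeta members keep their default implementations (hypothetical class, not from the PR):

```csharp
using Nethermind.Db;

// Hypothetical implementer, for illustration only.
internal class NoopMetaDb : IDbMeta
{
    // Must be declared explicitly now; the default argument keeps plain Flush() calls valid
    // when the instance is used through the interface.
    public void Flush(bool onlyWal = false) { }
}
```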
diff --git a/src/Nethermind/Nethermind.Db/MemColumnsDb.cs b/src/Nethermind/Nethermind.Db/MemColumnsDb.cs
index 044fce49982..ddc3b88f1dc 100644
--- a/src/Nethermind/Nethermind.Db/MemColumnsDb.cs
+++ b/src/Nethermind/Nethermind.Db/MemColumnsDb.cs
@@ -35,5 +35,6 @@ public IColumnsWriteBatch StartWriteBatch()
             return new InMemoryColumnWriteBatch(this);
         }
         public void Dispose() { }
+        public void Flush(bool onlyWal = false) { }
     }
 }
diff --git a/src/Nethermind/Nethermind.Db/MemDb.cs b/src/Nethermind/Nethermind.Db/MemDb.cs
index b31d7ca92b0..5d78c8e68b5 100644
--- a/src/Nethermind/Nethermind.Db/MemDb.cs
+++ b/src/Nethermind/Nethermind.Db/MemDb.cs
@@ -77,9 +77,7 @@ public bool KeyExists(ReadOnlySpan<byte> key)

     public IDb Innermost => this;

-    public virtual void Flush()
-    {
-    }
+    public virtual void Flush(bool onlyWal = false) { }

     public void Clear()
     {
diff --git a/src/Nethermind/Nethermind.Db/NullDb.cs b/src/Nethermind/Nethermind.Db/NullDb.cs
index c53b4569772..923dc74f528 100644
--- a/src/Nethermind/Nethermind.Db/NullDb.cs
+++ b/src/Nethermind/Nethermind.Db/NullDb.cs
@@ -43,7 +43,8 @@ public bool KeyExists(ReadOnlySpan<byte> key)
         return false;
     }

-    public void Flush() { }
+    public void Flush(bool onlyWal = false) { }
+
     public void Clear() { }

     public IEnumerable<KeyValuePair<byte[], byte[]?>> GetAll(bool ordered = false) => Enumerable.Empty<KeyValuePair<byte[], byte[]?>>();
diff --git a/src/Nethermind/Nethermind.Db/ReadOnlyColumnsDb.cs b/src/Nethermind/Nethermind.Db/ReadOnlyColumnsDb.cs
index 0591877186f..262a6400582 100644
--- a/src/Nethermind/Nethermind.Db/ReadOnlyColumnsDb.cs
+++ b/src/Nethermind/Nethermind.Db/ReadOnlyColumnsDb.cs
@@ -44,5 +44,7 @@ public void Dispose()
                 readOnlyColumn.Value.Dispose();
             }
         }
+
+        public void Flush(bool onlyWal = false) { }
     }
 }
diff --git a/src/Nethermind/Nethermind.Db/ReadOnlyDb.cs b/src/Nethermind/Nethermind.Db/ReadOnlyDb.cs
index 495a51fbe5e..ff73f260a3a 100644
--- a/src/Nethermind/Nethermind.Db/ReadOnlyDb.cs
+++ b/src/Nethermind/Nethermind.Db/ReadOnlyDb.cs
@@ -67,11 +67,7 @@ public void Remove(ReadOnlySpan<byte> key) { }

     public bool KeyExists(ReadOnlySpan<byte> key) => _memDb.KeyExists(key) || wrappedDb.KeyExists(key);

-    public void Flush()
-    {
-        wrappedDb.Flush();
-        _memDb.Flush();
-    }
+    public void Flush(bool onlyWal) { }

     public void Clear() => throw new InvalidOperationException();

diff --git a/src/Nethermind/Nethermind.Db/SimpleFilePublicKeyDb.cs b/src/Nethermind/Nethermind.Db/SimpleFilePublicKeyDb.cs
index 473263508c5..da7fe692fe3 100644
--- a/src/Nethermind/Nethermind.Db/SimpleFilePublicKeyDb.cs
+++ b/src/Nethermind/Nethermind.Db/SimpleFilePublicKeyDb.cs
@@ -85,7 +85,8 @@ public bool KeyExists(ReadOnlySpan<byte> key)
         return _cache.ContainsKey(key);
     }

-    public void Flush() { }
+    public void Flush(bool onlyWal = false) { }
+
     public void Clear()
     {
         File.Delete(DbPath);
diff --git a/src/Nethermind/Nethermind.Init/InitializeStateDb.cs b/src/Nethermind/Nethermind.Init/InitializeStateDb.cs
index 2b5e642d17b..8f53c42a517 100644
--- a/src/Nethermind/Nethermind.Init/InitializeStateDb.cs
+++ b/src/Nethermind/Nethermind.Init/InitializeStateDb.cs
@@ -119,9 +119,9 @@ public Task Execute(CancellationToken cancellationToken)
             var minimumWriteBufferMb = 0.2 * pruningConfig.CacheMb;
             if (totalWriteBufferMb < minimumWriteBufferMb)
             {
-                int minimumWriteBufferNumber = (int)Math.Ceiling((minimumWriteBufferMb * 1.MB()) / dbConfig.StateDbWriteBufferSize);
+                long minimumWriteBufferSize = (int)Math.Ceiling((minimumWriteBufferMb * 1.MB()) / dbConfig.StateDbWriteBufferNumber);

-                if (_logger.IsWarn) _logger.Warn($"Detected {totalWriteBufferMb}MB of maximum write buffer size. Write buffer size should be at least 20% of pruning cache MB or memory pruning may slow down. Try setting `--Db.{nameof(dbConfig.WriteBufferNumber)} {minimumWriteBufferNumber}`.");
+                if (_logger.IsWarn) _logger.Warn($"Detected {totalWriteBufferMb}MB of maximum write buffer size. Write buffer size should be at least 20% of pruning cache MB or memory pruning may slow down. Try setting `--Db.{nameof(dbConfig.StateDbWriteBufferSize)} {minimumWriteBufferSize}`.");
             }

             pruningStrategy = Prune
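The InitializeStateDb hunk changes the warning so it recommends a value for `Db.StateDbWriteBufferSize`, derived from the configured `StateDbWriteBufferNumber`. A worked example with illustrative numbers (assuming `1.MB()` is the binary megabyte; check the extension if exactness matters):

```csharp
// Illustrative numbers, not Nethermind defaults.
double cacheMb = 1000;                        // Pruning.CacheMb
double minimumWriteBufferMb = 0.2 * cacheMb;  // buffers should cover at least 20% of the cache
int stateDbWriteBufferNumber = 4;             // Db.StateDbWriteBufferNumber

long suggestedBufferSize =
    (long)Math.Ceiling(minimumWriteBufferMb * 1024 * 1024 / stateDbWriteBufferNumber);

Console.WriteLine(suggestedBufferSize);       // 52428800 (50 MiB), i.e. `--Db.StateDbWriteBufferSize 52428800`
```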
diff --git a/src/Nethermind/Nethermind.Synchronization/FastSync/TreeSync.cs b/src/Nethermind/Nethermind.Synchronization/FastSync/TreeSync.cs
index 1abfe9643bb..12d2540e789 100644
--- a/src/Nethermind/Nethermind.Synchronization/FastSync/TreeSync.cs
+++ b/src/Nethermind/Nethermind.Synchronization/FastSync/TreeSync.cs
@@ -687,7 +687,7 @@ private void SaveNode(StateSyncItem syncItem, byte[] data)
                 {
                     if (_logger.IsInfo) _logger.Info($"Saving root {syncItem.Hash} of {_branchProgress.CurrentSyncBlock}");

-                    _nodeStorage.Flush();
+                    _nodeStorage.Flush(onlyWal: false);
                     _codeDb.Flush();

                     Interlocked.Exchange(ref _rootSaved, 1);
diff --git a/src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs b/src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs
index 3527f248c8b..480fba25516 100644
--- a/src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs
+++ b/src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs
@@ -78,6 +78,26 @@ public void Memory_with_one_node_is_288()
             trieNode.GetMemorySize(false) + ExpectedPerNodeKeyMemorySize);
     }

+    [Test]
+    public void Flush_ShouldBeCalledOnEachPersist()
+    {
+        TrieNode trieNode = new(NodeType.Leaf, Keccak.Zero);
+
+        TestMemDb testMemDb = new TestMemDb();
+        using TrieStore fullTrieStore = CreateTrieStore(persistenceStrategy: Archive.Instance, kvStore: testMemDb);
+        PatriciaTree pt = new PatriciaTree(fullTrieStore.GetTrieStore(null), LimboLogs.Instance);
+
+        for (int i = 0; i < 4; i++)
+        {
+            pt.Set(TestItem.KeccakA.BytesToArray(), TestItem.Keccaks[i].BytesToArray());
+            using (ICommitter? committer = fullTrieStore.BeginStateBlockCommit(i + 1, trieNode))
+            {
+                pt.Commit();
+            }
+        }
+
+        testMemDb.FlushCount.Should().Be(4);
+    }

     [Test]
     public void Pruning_off_cache_should_not_change_commit_node()
@@ -978,8 +998,9 @@ public async Task Will_Not_RemovePastKeys_OnSnapshot_DuringFullPruning()
             pruningStrategy: new TestPruningStrategy(true, true, 2, 100000),
             persistenceStrategy: isPruningPersistenceStrategy);

-        IScopedTrieStore trieStore = fullTrieStore.GetTrieStore(null);
         TreePath emptyPath = TreePath.Empty;
+        TaskCompletionSource tcs = new TaskCompletionSource();
+        fullTrieStore.OnMemoryPruneCompleted += (sender, args) => tcs.TrySetResult();

         for (int i = 0; i < 64; i++)
         {
@@ -990,7 +1011,8 @@ public async Task Will_Not_RemovePastKeys_OnSnapshot_DuringFullPruning()
            }

            // Pruning is done in background
-           await Task.Delay(TimeSpan.FromMilliseconds(10));
+           await tcs.Task;
+           tcs = new TaskCompletionSource();
         }

         memDb.Count.Should().Be(61);
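The test changes above replace a fixed 10 ms sleep with a wait on the new `OnMemoryPruneCompleted` event (added to `TrieStore` further down). If the pattern is needed in more tests, it could be wrapped in a helper along these lines (a sketch, not part of the PR):

```csharp
using System;
using System.Threading.Tasks;
using Nethermind.Trie.Pruning;

internal static class TrieStoreTestHelpers
{
    // Call before triggering the commit that may start a background prune, then await the task.
    public static Task WaitForNextMemoryPruneAsync(this TrieStore store)
    {
        TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
        store.OnMemoryPruneCompleted += Handler;
        return tcs.Task;

        void Handler(object? sender, EventArgs e)
        {
            store.OnMemoryPruneCompleted -= Handler;
            tcs.TrySetResult();
        }
    }
}
```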
diff --git a/src/Nethermind/Nethermind.Trie/INodeStorage.cs b/src/Nethermind/Nethermind.Trie/INodeStorage.cs
index 91f8abc08d6..70b63892736 100644
--- a/src/Nethermind/Nethermind.Trie/INodeStorage.cs
+++ b/src/Nethermind/Nethermind.Trie/INodeStorage.cs
@@ -30,7 +30,8 @@ public interface INodeStorage
     /// <summary>
     /// Used by StateSync to make sure values are flushed.
     /// </summary>
-    void Flush();
+    /// <param name="onlyWal">True if only WAL file should be flushed, not memtable.</param>
+    void Flush(bool onlyWal);
     void Compact();

     public enum KeyScheme
diff --git a/src/Nethermind/Nethermind.Trie/NodeStorage.cs b/src/Nethermind/Nethermind.Trie/NodeStorage.cs
index 066b5c19fb8..108da6a4a11 100644
--- a/src/Nethermind/Nethermind.Trie/NodeStorage.cs
+++ b/src/Nethermind/Nethermind.Trie/NodeStorage.cs
@@ -184,11 +184,11 @@ public void Set(Hash256? address, in TreePath path, in ValueHash256 keccak, Read
         _keyValueStore.PutSpan(GetExpectedPath(stackalloc byte[StoragePathLength], address, path, keccak), data, writeFlags);
     }

-    public void Flush()
+    public void Flush(bool onlyWal)
     {
         if (_keyValueStore is IDb db)
         {
-            db.Flush();
+            db.Flush(onlyWal);
         }
     }
diff --git a/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs b/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs
index b5c6bbcab66..10ee4260760 100644
--- a/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs
+++ b/src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs
@@ -360,6 +360,9 @@ private void FinishBlockCommit(BlockCommitSet set, TrieNode? root)

     public event EventHandler<ReorgBoundaryReached>? ReorgBoundaryReached;

+    // Used in testing to not have to wait for condition.
+    public event EventHandler OnMemoryPruneCompleted;
+
     public byte[]? TryLoadRlp(Hash256? address, in TreePath path, Hash256 keccak, INodeStorage? nodeStorage, ReadFlags readFlags = ReadFlags.None)
     {
         nodeStorage ??= _nodeStorage;
@@ -457,14 +460,15 @@ public void Prune()
             // otherwise, it may not fit the whole dirty cache.
             // Additionally, if (WriteBufferSize * (WriteBufferNumber - 1)) is already more than 20% of pruning
             // cache, it is likely that there are enough space for it on most time, except for syncing maybe.
-            _nodeStorage.Flush();
+            _nodeStorage.Flush(onlyWal: false);
             lock (_dirtyNodesLock)
             {
                 long start = Stopwatch.GetTimestamp();
                 if (_logger.IsDebug) _logger.Debug($"Locked {nameof(TrieStore)} for pruning.");

                 long memoryUsedByDirtyCache = MemoryUsedByDirtyCache;
-                if (!_pruningTaskCancellationTokenSource.IsCancellationRequested && _pruningStrategy.ShouldPrune(memoryUsedByDirtyCache))
+                if (!_pruningTaskCancellationTokenSource.IsCancellationRequested &&
+                    _pruningStrategy.ShouldPrune(memoryUsedByDirtyCache))
                 {
                     // Most of the time in memory pruning is on `PrunePersistedRecursively`. So its
                     // usually faster to just SaveSnapshot causing most of the entry to be persisted.
@@ -497,6 +501,11 @@ public void Prune()
                     if (_logger.IsError) _logger.Error("Pruning failed with exception.", e);
                 }
             });
+
+            _pruningTask.ContinueWith((_) =>
+            {
+                OnMemoryPruneCompleted?.Invoke(this, EventArgs.Empty);
+            });
         }
     }
@@ -673,9 +682,7 @@ public void WaitForPruning()

     private ConcurrentQueue<BlockCommitSet> CommitSetQueue =>
         (_commitSetQueue ?? CreateQueueAtomic(ref _commitSetQueue));

-#if DEBUG
     private BlockCommitSet? _lastCommitSet = null;
-#endif

     private long _memoryUsedByDirtyCache;
@@ -703,18 +710,20 @@ private BlockCommitSet CreateCommitSet(long blockNumber)
     {
         if (_logger.IsDebug) _logger.Debug($"Beginning new {nameof(BlockCommitSet)} - {blockNumber}");

-        // TODO: this throws on reorgs, does it not? let us recreate it in test
-#if DEBUG
-        Debug.Assert(_lastCommitSet == null || blockNumber == _lastCommitSet.BlockNumber + 1 || _lastCommitSet.BlockNumber == 0, $"Newly begun block is not a successor of the last one.");
-        Debug.Assert(_lastCommitSet == null || _lastCommitSet.IsSealed, "Not sealed when beginning new block");
-#endif
+        if (_lastCommitSet is not null)
+        {
+            Debug.Assert(_lastCommitSet.IsSealed, "Not sealed when beginning new block");
+
+            if (_lastCommitSet.BlockNumber != blockNumber - 1 && blockNumber != 0 && _lastCommitSet.BlockNumber != 0)
+            {
+                if (_logger.IsInfo) _logger.Info($"Non consecutive block commit. This is likely a reorg. Last block commit: {_lastCommitSet.BlockNumber}. New block commit: {blockNumber}.");
+            }
+        }

         BlockCommitSet commitSet = new(blockNumber);
         CommitSetQueue.Enqueue(commitSet);
-#if DEBUG
         _lastCommitSet = commitSet;
-#endif

         LatestCommittedBlockNumber = Math.Max(blockNumber, LatestCommittedBlockNumber);
         // Why are we announcing **before** committing next block??
@@ -757,7 +766,7 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path)
             }
         }

-        if (_logger.IsDebug) _logger.Debug($"Persisting from root {commitSet.Root} in {commitSet.BlockNumber}");
+        if (_logger.IsInfo) _logger.Info($"Persisting from root {commitSet.Root?.Keccak} in block {commitSet.BlockNumber}");

         long start = Stopwatch.GetTimestamp();
@@ -793,8 +802,9 @@ void TopLevelPersist(TrieNode tn, Hash256? address2, TreePath path)
         disposeQueue.CompleteAdding();
         Task.WaitAll(_disposeTasks);

-        // Dispose top level last in case something goes wrong, at least the root wont be stored
+        // Dispose top level last in case something goes wrong, at least the root won't be stored
         topLevelWriteBatch.Dispose();
+        _nodeStorage.Flush(onlyWal: true);

         long elapsedMilliseconds = (long)Stopwatch.GetElapsedTime(start).TotalMilliseconds;
         Metrics.SnapshotPersistenceTime = elapsedMilliseconds;
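Finally, the DEBUG-only assertions in `CreateCommitSet` become a runtime check that only logs when commits are not consecutive. The condition, extracted as a standalone predicate for illustration (the PR keeps it inline):

```csharp
// Same condition as in CreateCommitSet above, shown with a few sample inputs.
static bool IsNonConsecutiveCommit(long lastCommittedBlock, long newBlock) =>
    lastCommittedBlock != newBlock - 1 && newBlock != 0 && lastCommittedBlock != 0;

// IsNonConsecutiveCommit(99, 100)  -> false (normal progression)
// IsNonConsecutiveCommit(100, 100) -> true  (same height committed again, e.g. after a reorg)
// IsNonConsecutiveCommit(100, 95)  -> true  (deeper reorg)
// IsNonConsecutiveCommit(0, 42)    -> false (a last commit set at block 0 is always accepted)
```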