From 075365ab7563db9ec2c104c7b568407d0f0ad152 Mon Sep 17 00:00:00 2001 From: Ben Adams Date: Thu, 28 Dec 2023 23:08:43 +0000 Subject: [PATCH] Reduce lock contention (#6417) * Reduce TxPool lock contention * Reduce lock contention with added spice * Use McsLock for LruCache * Whitespace * Calculate hashes in parallel * Name clash with extension and invalid round trip check * Add tests * Fix * Use monitor signalling to wake up sleeping threads * Additional comments * throttle db read/writes * Less contention on the priority lock * Whitespace * Boost forkchoice --- .../Processing/RecoverSignature.cs | 54 +++++- .../Nethermind.Core.Test/MCSLockTests.cs | 114 ++++++++++++ .../Nethermind.Core.Test/McsPriorityLock.cs | 171 ++++++++++++++++++ .../Nethermind.Core/Caching/LruCache.cs | 36 +++- .../Nethermind.Core/Caching/LruKeyCache.cs | 15 +- .../Nethermind.Core/Caching/SpanLruCache.cs | 48 +++-- .../Nethermind.Core/Threading/McsLock.cs | 170 +++++++++++++++++ .../Threading/McsPriorityLock.cs | 72 ++++++++ .../Threading/ThreadExtensions.cs | 33 ++++ src/Nethermind/Nethermind.Core/Transaction.cs | 47 +++-- .../Nethermind.Db.Rocks/DbOnTheRocks.cs | 26 ++- .../Handlers/ForkchoiceUpdatedHandler.cs | 4 + .../Collections/BlobTxDistinctSortedPool.cs | 2 +- .../Collections/DistinctValueSortedPool.cs | 2 +- .../Collections/SortedPool.cs | 68 ++++--- .../Collections/TxDistinctSortedPool.cs | 16 +- src/Nethermind/Nethermind.TxPool/TxPool.cs | 125 ++++++------- 17 files changed, 845 insertions(+), 158 deletions(-) create mode 100644 src/Nethermind/Nethermind.Core.Test/MCSLockTests.cs create mode 100644 src/Nethermind/Nethermind.Core.Test/McsPriorityLock.cs create mode 100644 src/Nethermind/Nethermind.Core/Threading/McsLock.cs create mode 100644 src/Nethermind/Nethermind.Core/Threading/McsPriorityLock.cs create mode 100644 src/Nethermind/Nethermind.Core/Threading/ThreadExtensions.cs diff --git a/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs b/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs index 48249fa7c6d..c9e6d56dcb6 100644 --- a/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs +++ b/src/Nethermind/Nethermind.Consensus/Processing/RecoverSignature.cs @@ -45,21 +45,55 @@ public void RecoverData(Block block) // so we assume the rest of txs in the block are already recovered return; - var releaseSpec = _specProvider.GetSpec(block.Header); - Parallel.ForEach( - block.Transactions.Where(tx => tx.IsSigned && tx.SenderAddress is null), + block.Transactions.Where(tx => !tx.IsHashCalculated), blockTransaction => { - _txPool.TryGetPendingTransaction(blockTransaction.Hash, out Transaction? transaction); + blockTransaction.CalculateHashInternal(); + }); - Address sender = transaction?.SenderAddress; - Address blockTransactionAddress = blockTransaction.SenderAddress; + var releaseSpec = _specProvider.GetSpec(block.Header); - blockTransaction.SenderAddress = - sender ?? _ecdsa.RecoverAddress(blockTransaction, !releaseSpec.ValidateChainId); - if (_logger.IsTrace) _logger.Trace($"Recovered {blockTransaction.SenderAddress} sender for {blockTransaction.Hash} (tx pool cached value: {sender}, block transaction address: {blockTransactionAddress})"); - }); + int recoverFromEcdsa = 0; + // Don't access txPool in Parallel loop as increases contention + foreach (Transaction blockTransaction in block.Transactions.Where(tx => tx.IsSigned && tx.SenderAddress is null)) + { + _txPool.TryGetPendingTransaction(blockTransaction.Hash, out Transaction? transaction); + + Address sender = transaction?.SenderAddress; + if (sender != null) + { + blockTransaction.SenderAddress = sender; + + if (_logger.IsTrace) _logger.Trace($"Recovered {blockTransaction.SenderAddress} sender for {blockTransaction.Hash} (tx pool cached value: {sender})"); + } + else + { + recoverFromEcdsa++; + } + } + + if (recoverFromEcdsa >= 4) + { + // Recover ecdsa in Parallel + Parallel.ForEach( + block.Transactions.Where(tx => tx.IsSigned && tx.SenderAddress is null), + blockTransaction => + { + blockTransaction.SenderAddress = _ecdsa.RecoverAddress(blockTransaction, !releaseSpec.ValidateChainId); + + if (_logger.IsTrace) _logger.Trace($"Recovered {blockTransaction.SenderAddress} sender for {blockTransaction.Hash}"); + }); + } + else if (recoverFromEcdsa > 0) + { + foreach (Transaction blockTransaction in block.Transactions.Where(tx => tx.IsSigned && tx.SenderAddress is null)) + { + blockTransaction.SenderAddress = _ecdsa.RecoverAddress(blockTransaction, !releaseSpec.ValidateChainId); + + if (_logger.IsTrace) _logger.Trace($"Recovered {blockTransaction.SenderAddress} sender for {blockTransaction.Hash}"); + } + } } } } diff --git a/src/Nethermind/Nethermind.Core.Test/MCSLockTests.cs b/src/Nethermind/Nethermind.Core.Test/MCSLockTests.cs new file mode 100644 index 00000000000..889643fce1b --- /dev/null +++ b/src/Nethermind/Nethermind.Core.Test/MCSLockTests.cs @@ -0,0 +1,114 @@ +// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using Nethermind.Core.Threading; +using NUnit.Framework; + +using System.Collections.Generic; +using System.Linq; +using System.Threading; + +namespace Nethermind.Core.Test; + +[TestFixture] +public class MCSLockTests +{ + private McsLock mcsLock; + + [SetUp] + public void Setup() + { + mcsLock = new McsLock(); + } + + [Test] + public void SingleThreadAcquireRelease() + { + using (var handle = mcsLock.Acquire()) + { + Thread.Sleep(10); + } + + Assert.Pass(); // Test passes if no deadlock or exception occurs. + } + + [Test] + public void MultipleThreads() + { + int counter = 0; + int numberOfThreads = 10; + var threads = new List(); + + for (int i = 0; i < numberOfThreads; i++) + { + var thread = new Thread(() => + { + using var handle = mcsLock.Acquire(); + + counter++; + }); + threads.Add(thread); + thread.Start(); + } + + foreach (Thread thread in threads) + { + thread.Join(); // Wait for all threads to complete. + } + + Assert.That(counter, Is.EqualTo(numberOfThreads)); // Counter should equal the number of threads. + } + + [Test] + public void LockFairnessTest() + { + int numberOfThreads = 10; + var executionOrder = new List(); + var threads = new List(); + + for (int i = 0; i < numberOfThreads; i++) + { + int threadId = i; + var thread = new Thread(() => + { + using var handle = mcsLock.Acquire(); + executionOrder.Add(threadId); + Thread.Sleep(15); // Ensure the order is maintained + }); + threads.Add(thread); + thread.Start(); + Thread.Sleep(1); // Ensure the order is maintained + } + + foreach (Thread thread in threads) + { + thread.Join(); + } + + var expectedOrder = Enumerable.Range(0, numberOfThreads).ToList(); + CollectionAssert.AreEqual(expectedOrder, executionOrder, "Threads did not acquire lock in the order they were started."); + } + + [Test] + public void NonReentrantTest() + { + bool reentrancyDetected = false; + var thread = new Thread(() => + { + using var handle = mcsLock.Acquire(); + try + { + using var innerHandle = mcsLock.Acquire(); // Attempt to re-lock + } + catch + { + reentrancyDetected = true; + } + }); + + thread.Start(); + thread.Join(); + + Assert.IsTrue(reentrancyDetected, "Reentrancy was not properly detected."); + } +} diff --git a/src/Nethermind/Nethermind.Core.Test/McsPriorityLock.cs b/src/Nethermind/Nethermind.Core.Test/McsPriorityLock.cs new file mode 100644 index 00000000000..0eabe4c1eb5 --- /dev/null +++ b/src/Nethermind/Nethermind.Core.Test/McsPriorityLock.cs @@ -0,0 +1,171 @@ +// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using Nethermind.Core.Threading; +using NUnit.Framework; +using NUnit.Framework.Internal; + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; + +namespace Nethermind.Core.Test; + +[TestFixture] +public class McsPriorityLockTests +{ + private McsPriorityLock mcsLock; + + [SetUp] + public void Setup() + { + mcsLock = new McsPriorityLock(); + } + + [Test] + public void SingleThreadAcquireRelease() + { + using (var handle = mcsLock.Acquire()) + { + Thread.Sleep(10); + } + + Assert.Pass(); // Test passes if no deadlock or exception occurs. + } + + [Test] + public void MultipleThreads() + { + int counter = 0; + int numberOfThreads = 10; + var threads = new List(); + + for (int i = 0; i < numberOfThreads; i++) + { + var thread = new Thread(() => + { + using var handle = mcsLock.Acquire(); + + counter++; + }); + threads.Add(thread); + thread.Start(); + } + + foreach (Thread thread in threads) + { + thread.Join(); // Wait for all threads to complete. + } + + Assert.That(counter, Is.EqualTo(numberOfThreads)); // Counter should equal the number of threads. + } + + [Test] + public void LockFairnessTest() + { + int numberOfThreads = 10; + var executionOrder = new List(); + var threads = new List(); + + for (int i = 0; i < numberOfThreads; i++) + { + int threadId = i; + var thread = new Thread(() => + { + using var handle = mcsLock.Acquire(); + executionOrder.Add(threadId); + Thread.Sleep(15); // Ensure the order is maintained + }); + threads.Add(thread); + thread.Start(); + Thread.Sleep(1); // Ensure the order is maintained + } + + foreach (Thread thread in threads) + { + thread.Join(); + } + + var expectedOrder = Enumerable.Range(0, numberOfThreads).ToList(); + CollectionAssert.AreEqual(expectedOrder, executionOrder, "Threads did not acquire lock in the order they were started."); + } + + + [Test] + public void PriorityQueueJumpingTest() + { + int numberOfThreads = 100; + var threads = new List(); + List executionOrder = new(); + Dictionary threadPriorities = new(); + + // Create threads with varying priorities. + for (int i = 0; i < numberOfThreads; i++) + { + ThreadPriority priority = i % 2 == 0 ? ThreadPriority.Highest : ThreadPriority.Normal; // Alternate priorities + var thread = new Thread(() => + { + using var handle = mcsLock.Acquire(); + executionOrder.Add(Thread.CurrentThread.ManagedThreadId); + Thread.Sleep(25); // Simulate work + }); + thread.Priority = priority; // Set thread priority + threads.Add(thread); + threadPriorities[thread] = priority; + } + + // Start threads. + foreach (var thread in threads) + { + thread.Start(); + } + + // Wait for all threads to complete. + foreach (var thread in threads) + { + thread.Join(); + } + + // Analyze execution order based on priority. + int lowPriorityFirst = 0; + for (int i = 0; i < executionOrder.Count - 1; i++) + { + int currentThreadId = executionOrder[i]; + int nextThreadId = executionOrder[i + 1]; + Thread currentThread = threads.First(t => t.ManagedThreadId == currentThreadId); + Thread nextThread = threads.First(t => t.ManagedThreadId == nextThreadId); + + if (threadPriorities[currentThread] < threadPriorities[nextThread]) + { + lowPriorityFirst++; + } + } + + // Some lower priority threads will acquire first; we are asserting that they mostly queue jump + Assert.That(lowPriorityFirst < (numberOfThreads / 8), Is.True, "High priority threads did not acquire the lock before lower priority ones."); + } + + [Test] + public void NonReentrantTest() + { + bool reentrancyDetected = false; + var thread = new Thread(() => + { + using var handle = mcsLock.Acquire(); + try + { + using var innerHandle = mcsLock.Acquire(); // Attempt to re-lock + } + catch + { + reentrancyDetected = true; + } + }); + + thread.Start(); + thread.Join(); + + Assert.IsTrue(reentrancyDetected, "Reentrancy was not properly detected."); + } +} diff --git a/src/Nethermind/Nethermind.Core/Caching/LruCache.cs b/src/Nethermind/Nethermind.Core/Caching/LruCache.cs index 6301cd72da5..d96a4bc8d4b 100644 --- a/src/Nethermind/Nethermind.Core/Caching/LruCache.cs +++ b/src/Nethermind/Nethermind.Core/Caching/LruCache.cs @@ -4,8 +4,8 @@ using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; -using System.Runtime.CompilerServices; using Nethermind.Core.Extensions; +using Nethermind.Core.Threading; namespace Nethermind.Core.Caching { @@ -13,6 +13,7 @@ public sealed class LruCache : ICache where TKey : n { private readonly int _maxCapacity; private readonly Dictionary> _cacheMap; + private readonly McsLock _lock = new(); private LinkedListNode? _leastRecentlyUsed; public LruCache(int maxCapacity, int startCapacity, string name) @@ -30,16 +31,18 @@ public LruCache(int maxCapacity, string name) { } - [MethodImpl(MethodImplOptions.Synchronized)] public void Clear() { + using var lockRelease = _lock.Acquire(); + _leastRecentlyUsed = null; _cacheMap.Clear(); } - [MethodImpl(MethodImplOptions.Synchronized)] public TValue Get(TKey key) { + using var lockRelease = _lock.Acquire(); + if (_cacheMap.TryGetValue(key, out LinkedListNode? node)) { TValue value = node.Value.Value; @@ -53,9 +56,10 @@ public TValue Get(TKey key) #pragma warning restore 8603 } - [MethodImpl(MethodImplOptions.Synchronized)] public bool TryGet(TKey key, out TValue value) { + using var lockRelease = _lock.Acquire(); + if (_cacheMap.TryGetValue(key, out LinkedListNode? node)) { value = node.Value.Value; @@ -70,12 +74,13 @@ public bool TryGet(TKey key, out TValue value) return false; } - [MethodImpl(MethodImplOptions.Synchronized)] public bool Set(TKey key, TValue val) { + using var lockRelease = _lock.Acquire(); + if (val is null) { - return Delete(key); + return DeleteNoLock(key); } if (_cacheMap.TryGetValue(key, out LinkedListNode? node)) @@ -101,8 +106,14 @@ public bool Set(TKey key, TValue val) } } - [MethodImpl(MethodImplOptions.Synchronized)] public bool Delete(TKey key) + { + using var lockRelease = _lock.Acquire(); + + return DeleteNoLock(key); + } + + private bool DeleteNoLock(TKey key) { if (_cacheMap.TryGetValue(key, out LinkedListNode? node)) { @@ -114,12 +125,17 @@ public bool Delete(TKey key) return false; } - [MethodImpl(MethodImplOptions.Synchronized)] - public bool Contains(TKey key) => _cacheMap.ContainsKey(key); + public bool Contains(TKey key) + { + using var lockRelease = _lock.Acquire(); + + return _cacheMap.ContainsKey(key); + } - [MethodImpl(MethodImplOptions.Synchronized)] public KeyValuePair[] ToArray() { + using var lockRelease = _lock.Acquire(); + int i = 0; KeyValuePair[] array = new KeyValuePair[_cacheMap.Count]; foreach (KeyValuePair> kvp in _cacheMap) diff --git a/src/Nethermind/Nethermind.Core/Caching/LruKeyCache.cs b/src/Nethermind/Nethermind.Core/Caching/LruKeyCache.cs index 4dd895c9680..cc58f807a16 100644 --- a/src/Nethermind/Nethermind.Core/Caching/LruKeyCache.cs +++ b/src/Nethermind/Nethermind.Core/Caching/LruKeyCache.cs @@ -4,8 +4,8 @@ using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; -using System.Runtime.CompilerServices; using Nethermind.Core.Extensions; +using Nethermind.Core.Threading; namespace Nethermind.Core.Caching { @@ -14,6 +14,7 @@ public sealed class LruKeyCache where TKey : notnull private readonly int _maxCapacity; private readonly string _name; private readonly Dictionary> _cacheMap; + private readonly McsLock _lock = new(); private LinkedListNode? _leastRecentlyUsed; public LruKeyCache(int maxCapacity, int startCapacity, string name) @@ -30,16 +31,18 @@ public LruKeyCache(int maxCapacity, string name) { } - [MethodImpl(MethodImplOptions.Synchronized)] public void Clear() { + using var lockRelease = _lock.Acquire(); + _leastRecentlyUsed = null; _cacheMap.Clear(); } - [MethodImpl(MethodImplOptions.Synchronized)] public bool Get(TKey key) { + using var lockRelease = _lock.Acquire(); + if (_cacheMap.TryGetValue(key, out LinkedListNode? node)) { LinkedListNode.MoveToMostRecent(ref _leastRecentlyUsed, node); @@ -49,9 +52,10 @@ public bool Get(TKey key) return false; } - [MethodImpl(MethodImplOptions.Synchronized)] public bool Set(TKey key) { + using var lockRelease = _lock.Acquire(); + if (_cacheMap.TryGetValue(key, out LinkedListNode? node)) { LinkedListNode.MoveToMostRecent(ref _leastRecentlyUsed, node); @@ -74,9 +78,10 @@ public bool Set(TKey key) } } - [MethodImpl(MethodImplOptions.Synchronized)] public void Delete(TKey key) { + using var lockRelease = _lock.Acquire(); + if (_cacheMap.TryGetValue(key, out LinkedListNode? node)) { LinkedListNode.Remove(ref _leastRecentlyUsed, node); diff --git a/src/Nethermind/Nethermind.Core/Caching/SpanLruCache.cs b/src/Nethermind/Nethermind.Core/Caching/SpanLruCache.cs index 11d36c676f4..4436df72a7d 100644 --- a/src/Nethermind/Nethermind.Core/Caching/SpanLruCache.cs +++ b/src/Nethermind/Nethermind.Core/Caching/SpanLruCache.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Runtime.CompilerServices; using Nethermind.Core.Collections; +using Nethermind.Core.Threading; namespace Nethermind.Core.Caching { @@ -19,6 +20,7 @@ public sealed class SpanLruCache : ISpanCache where { private readonly int _maxCapacity; private readonly SpanDictionary> _cacheMap; + private readonly McsLock _lock = new(); private LinkedListNode? _leastRecentlyUsed; public SpanLruCache(int maxCapacity, int startCapacity, string name, ISpanEqualityComparer comparer) @@ -29,16 +31,18 @@ public SpanLruCache(int maxCapacity, int startCapacity, string name, ISpanEquali _cacheMap = new SpanDictionary>(startCapacity, comparer); } - [MethodImpl(MethodImplOptions.Synchronized)] public void Clear() { + using var lockRelease = _lock.Acquire(); + _leastRecentlyUsed = null; _cacheMap.Clear(); } - [MethodImpl(MethodImplOptions.Synchronized)] public TValue Get(ReadOnlySpan key) { + using var lockRelease = _lock.Acquire(); + if (_cacheMap.TryGetValue(key, out LinkedListNode? node)) { TValue value = node.Value.Value; @@ -52,9 +56,10 @@ public TValue Get(ReadOnlySpan key) #pragma warning restore 8603 } - [MethodImpl(MethodImplOptions.Synchronized)] public bool TryGet(ReadOnlySpan key, out TValue value) { + using var lockRelease = _lock.Acquire(); + if (_cacheMap.TryGetValue(key, out LinkedListNode? node)) { value = node.Value.Value; @@ -69,12 +74,13 @@ public bool TryGet(ReadOnlySpan key, out TValue value) return false; } - [MethodImpl(MethodImplOptions.Synchronized)] public bool Set(ReadOnlySpan key, TValue val) { + using var lockRelease = _lock.Acquire(); + if (val is null) { - return Delete(key); + return DeleteNoLock(key); } if (_cacheMap.TryGetValue(key, out LinkedListNode? node)) @@ -101,8 +107,14 @@ public bool Set(ReadOnlySpan key, TValue val) } } - [MethodImpl(MethodImplOptions.Synchronized)] public bool Delete(ReadOnlySpan key) + { + using var lockRelease = _lock.Acquire(); + + return DeleteNoLock(key); + } + + private bool DeleteNoLock(ReadOnlySpan key) { if (_cacheMap.TryGetValue(key, out LinkedListNode? node)) { @@ -114,14 +126,26 @@ public bool Delete(ReadOnlySpan key) return false; } - [MethodImpl(MethodImplOptions.Synchronized)] - public bool Contains(ReadOnlySpan key) => _cacheMap.ContainsKey(key); + public bool Contains(ReadOnlySpan key) + { + using var lockRelease = _lock.Acquire(); - [MethodImpl(MethodImplOptions.Synchronized)] - public IDictionary Clone() => _cacheMap.ToDictionary(i => i.Key, i => i.Value.Value.Value); + return _cacheMap.ContainsKey(key); + } + + public IDictionary Clone() + { + using var lockRelease = _lock.Acquire(); + + return _cacheMap.ToDictionary(i => i.Key, i => i.Value.Value.Value); + } + + public KeyValuePair[] ToArray() + { + using var lockRelease = _lock.Acquire(); - [MethodImpl(MethodImplOptions.Synchronized)] - public KeyValuePair[] ToArray() => _cacheMap.Select(kv => new KeyValuePair(kv.Key, kv.Value.Value.Value)).ToArray(); + return _cacheMap.Select(kv => new KeyValuePair(kv.Key, kv.Value.Value.Value)).ToArray(); + } private void Replace(ReadOnlySpan key, TValue value) { diff --git a/src/Nethermind/Nethermind.Core/Threading/McsLock.cs b/src/Nethermind/Nethermind.Core/Threading/McsLock.cs new file mode 100644 index 00000000000..84f23f8f542 --- /dev/null +++ b/src/Nethermind/Nethermind.Core/Threading/McsLock.cs @@ -0,0 +1,170 @@ +// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Threading; + +namespace Nethermind.Core.Threading; + +/// +/// MCSLock (Mellor-Crummey and Scott Lock) provides a fair, scalable mutual exclusion lock. +/// This lock is particularly effective in systems with a high number of threads, as it reduces +/// the contention and spinning overhead typical of other spinlocks. It achieves this by forming +/// a queue of waiting threads, ensuring each thread gets the lock in the order it was requested. +/// +public class McsLock +{ + /// + /// Thread-local storage to ensure each thread has its own node instance. + /// + private readonly ThreadLocal _node = new(() => new ThreadNode()); + + /// + /// Points to the last node in the queue (tail). Used to manage the queue of waiting threads. + /// + private volatile ThreadNode? _tail; + + internal volatile Thread? currentLockHolder = null; + + /// + /// Acquires the lock. If the lock is already held, the calling thread is placed into a queue and + /// enters a busy-wait state until the lock becomes available. + /// + public Disposable Acquire() + { + // Check for reentrancy. + if (Thread.CurrentThread == currentLockHolder) + ThrowInvalidOperationException(); + + ThreadNode node = _node.Value!; + node.Locked = true; + + ThreadNode? predecessor = Interlocked.Exchange(ref _tail, node); + if (predecessor is not null) + { + // If there was a previous tail, it means the lock is already held by someone. + // Set this node as the next node of the predecessor. + predecessor.Next = node; + + // Busy-wait (spin) until our 'Locked' flag is set to false by the thread + // that is releasing the lock. + SpinWait sw = default; + // This lock is more scalable than regular locks as each thread + // spins on their own local flag rather than a shared flag for + // lower cpu cache thrashing. Drawback is it is a strict queue and + // the next thread in line may be sleeping when lock is released. + while (node.Locked) + { + if (sw.NextSpinWillYield) + { + // We use Monitor signalling to try to combat additional latency + // that may be introduced by the strict in-order thread queuing + // rather than letting the SpinWait sleep the thread. + lock (node) + { + if (node.Locked) + // Sleep till signal + Monitor.Wait(node); + else + { + // Acquired the lock + break; + } + } + } + else + { + sw.SpinOnce(); + } + } + } + + // Set current lock holder. + currentLockHolder = Thread.CurrentThread; + + return new Disposable(this); + + [DoesNotReturn] + [StackTraceHidden] + static void ThrowInvalidOperationException() + { + throw new InvalidOperationException("Lock is not reentrant"); + } + } + + /// + /// Used to releases the lock. If there are waiting threads in the queue, it passes the lock to the next + /// thread in line. + /// + public readonly struct Disposable : IDisposable + { + readonly McsLock _lock; + + internal Disposable(McsLock @lock) + { + _lock = @lock; + } + + /// + /// Releases the lock. If there are waiting threads in the queue, it passes the lock to the next + /// thread in line. + /// + public void Dispose() + { + ThreadNode node = _lock._node.Value!; + + // If there is no next node, it means this thread might be the last in the queue. + if (node.Next == null) + { + // Attempt to atomically set the tail to null, indicating no thread is waiting. + // If it is still 'node', then there are no other waiting threads. + if (Interlocked.CompareExchange(ref _lock._tail, null, node) == node) + { + // Clear current lock holder. + _lock.currentLockHolder = null; + return; + } + + // If another thread is in the process of enqueuing itself, + // wait until it finishes setting its node as the 'Next' node. + SpinWait sw = default; + while (node.Next == null) + { + sw.SpinOnce(); + } + } + + // Clear current lock holder. + _lock.currentLockHolder = null; + + // Pass the lock to the next thread by setting its 'Locked' flag to false. + node.Next.Locked = false; + + lock (node.Next) + { + // Wake up next node if sleeping + Monitor.Pulse(node.Next); + } + // Remove the reference to the next node + node.Next = null; + } + } + + /// + /// Node class to represent each thread in the MCS lock queue. + /// + private class ThreadNode + { + /// + /// Indicates whether the current thread is waiting for the lock. + /// + public volatile bool Locked = true; + + /// + /// Points to the next node in the queue. + /// + public ThreadNode? Next = null; + } +} diff --git a/src/Nethermind/Nethermind.Core/Threading/McsPriorityLock.cs b/src/Nethermind/Nethermind.Core/Threading/McsPriorityLock.cs new file mode 100644 index 00000000000..72407ed8efb --- /dev/null +++ b/src/Nethermind/Nethermind.Core/Threading/McsPriorityLock.cs @@ -0,0 +1,72 @@ +// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Diagnostics.CodeAnalysis; +using System.Diagnostics; +using System.Threading; + +namespace Nethermind.Core.Threading; + +/// +/// MCSLock (Mellor-Crummey and Scott Lock) provides a fair, scalable mutual exclusion lock. +/// The McsPriorityLock allows higher priority threads to queue jump on the lock queue. +/// This lock is particularly effective in systems with a high number of threads, as it reduces +/// the contention and spinning overhead typical of other spinlocks. It achieves this by forming +/// a queue of waiting threads, ensuring each thread gets the lock in the order it was requested. +/// +public class McsPriorityLock +{ + private readonly int HalfCores = Math.Max(Environment.ProcessorCount / 2, 1); + + private readonly McsLock _coreLock = new(); + private readonly McsLock[] _queuedLocks; + private uint _queueId; + + public McsPriorityLock() + { + var queue = new McsLock[HalfCores]; + for (var i = 0; i < queue.Length; i++) + { + queue[i] = new McsLock(); + } + + _queuedLocks = queue; + } + + /// + /// Acquires the lock. If the lock is already held, the calling thread is placed into a queue and + /// enters a busy-wait state until the lock becomes available. + /// + /// Higher priority threads will queue jump. + /// + public McsLock.Disposable Acquire() + { + // Check for reentrancy. + if (Thread.CurrentThread == _coreLock.currentLockHolder) + ThrowInvalidOperationException(); + + var isPriority = Thread.CurrentThread.Priority > ThreadPriority.Normal; + if (!isPriority) + // If not a priority thread max of half processors can being to acquire the lock (e.g. block processing) + return NonPriorityAcquire(); + + return _coreLock.Acquire(); + + [DoesNotReturn] + [StackTraceHidden] + static void ThrowInvalidOperationException() + { + throw new InvalidOperationException("Lock is not reentrant"); + } + } + + private McsLock.Disposable NonPriorityAcquire() + { + var queueId = Interlocked.Increment(ref _queueId) % (uint)_queuedLocks.Length; + + using var handle = _queuedLocks[queueId].Acquire(); + + return _coreLock.Acquire(); + } +} diff --git a/src/Nethermind/Nethermind.Core/Threading/ThreadExtensions.cs b/src/Nethermind/Nethermind.Core/Threading/ThreadExtensions.cs new file mode 100644 index 00000000000..31d892ba8b6 --- /dev/null +++ b/src/Nethermind/Nethermind.Core/Threading/ThreadExtensions.cs @@ -0,0 +1,33 @@ +// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited +// SPDX-License-Identifier: LGPL-3.0-only + +using System; +using System.Threading; + +namespace Nethermind.Core.Threading; + +public static class ThreadExtensions +{ + public readonly struct Disposable : IDisposable + { + private readonly Thread _thread; + private readonly ThreadPriority _previousPriority; + + internal Disposable(Thread thread) + { + _thread = thread; + _previousPriority = thread.Priority; + thread.Priority = ThreadPriority.AboveNormal; + } + + public void Dispose() + { + _thread.Priority = _previousPriority; + } + } + + public static Disposable BoostPriority(this Thread thread) + { + return new Disposable(thread); + } +} diff --git a/src/Nethermind/Nethermind.Core/Transaction.cs b/src/Nethermind/Nethermind.Core/Transaction.cs index a8c6956b9df..f83f0ea5d75 100644 --- a/src/Nethermind/Nethermind.Core/Transaction.cs +++ b/src/Nethermind/Nethermind.Core/Transaction.cs @@ -4,13 +4,16 @@ using System; using System.Buffers; using System.Diagnostics; +using System.Runtime.CompilerServices; using System.Text; +using System.Text.Json.Serialization; using Microsoft.Extensions.ObjectPool; using Nethermind.Core.Crypto; using Nethermind.Core.Eip2930; using Nethermind.Core.Extensions; using Nethermind.Int256; +[assembly: InternalsVisibleTo("Nethermind.Consensus")] namespace Nethermind.Core { [DebuggerDisplay("{Hash}, Value: {Value}, To: {To}, Gas: {GasLimit}")] @@ -53,29 +56,45 @@ public class Transaction public bool IsMessageCall => To is not null; private Hash256? _hash; - public Hash256? Hash + + [JsonIgnore] + internal bool IsHashCalculated => _hash is not null; + internal Hash256 CalculateHashInternal() { - get + Hash256? hash = _hash; + if (hash is not null) return hash; + + lock (this) { - if (_hash is not null) return _hash; + hash = _hash; + if (hash is not null) return hash; - lock (this) + if (_preHash.Length > 0) { - if (_hash is not null) return _hash; - - if (_preHash.Length > 0) - { - _hash = Keccak.Compute(_preHash.Span); - ClearPreHashInternal(); - } + _hash = hash = Keccak.Compute(_preHash.Span); + ClearPreHashInternal(); } + } - return _hash; + return hash!; + } + + public Hash256? Hash + { + get + { + Hash256? hash = _hash; + if (hash is not null) return hash; + + return CalculateHashInternal(); } set { - ClearPreHash(); - _hash = value; + lock (this) + { + ClearPreHash(); + _hash = value; + } } } diff --git a/src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs b/src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs index cf639221c54..a509f96e874 100644 --- a/src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs +++ b/src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs @@ -15,6 +15,7 @@ using Nethermind.Core.Crypto; using Nethermind.Core.Exceptions; using Nethermind.Core.Extensions; +using Nethermind.Core.Threading; using Nethermind.Db.Rocks.Config; using Nethermind.Db.Rocks.Statistics; using Nethermind.Logging; @@ -25,6 +26,9 @@ namespace Nethermind.Db.Rocks; public class DbOnTheRocks : IDb, ITunableDb { + private McsPriorityLock _readThrottle = new(); + private McsPriorityLock _writeThrottle = new(); + private ILogger _logger; private string? _fullPath; @@ -501,6 +505,7 @@ public byte[]? this[ReadOnlySpan key] } } + using var handle = _readThrottle.Acquire(); return _db.Get(key, cf); } catch (RocksDbSharpException e) @@ -523,6 +528,7 @@ internal void SetWithColumnFamily(ReadOnlySpan key, ColumnFamilyHandle? cf try { + using var handle = _writeThrottle.Acquire(); if (value.IsNull()) { _db.Remove(key, cf, WriteFlagsToWriteOptions(flags)); @@ -566,6 +572,7 @@ internal void SetWithColumnFamily(ReadOnlySpan key, ColumnFamilyHandle? cf { try { + using var handle = _readThrottle.Acquire(); return _db.MultiGet(keys); } catch (RocksDbSharpException e) @@ -589,7 +596,11 @@ internal Span GetSpanWithColumnFamily(ReadOnlySpan key, ColumnFamily try { - Span span = _db.GetSpan(key, cf); + Span span; + using (var handle = _readThrottle.Acquire()) + { + span = _db.GetSpan(key, cf); + } if (!span.IsNullOrEmpty()) { Interlocked.Increment(ref _allocatedSpan); @@ -625,6 +636,7 @@ public void Remove(ReadOnlySpan key) try { + using var handle = _writeThrottle.Acquire(); _db.Remove(key, null, WriteOptions); } catch (RocksDbSharpException e) @@ -812,6 +824,7 @@ public bool KeyExists(ReadOnlySpan key) try { + using var handle = _readThrottle.Acquire(); // seems it has no performance impact return _db.Get(key) is not null; // return _db.Get(key, 32, _keyExistsBuffer, 0, 0, null, null) != -1; @@ -890,7 +903,11 @@ public void Dispose() try { - _dbOnTheRocks._db.Write(_rocksBatch, _dbOnTheRocks.WriteFlagsToWriteOptions(_writeFlags)); + using (var handle = _dbOnTheRocks._writeThrottle.Acquire()) + { + _dbOnTheRocks._db.Write(_rocksBatch, _dbOnTheRocks.WriteFlagsToWriteOptions(_writeFlags)); + } + _dbOnTheRocks._currentBatches.TryRemove(this); ReturnWriteBatch(_rocksBatch); } @@ -943,7 +960,10 @@ private void FlushOnTooManyWrites() try { - _dbOnTheRocks._db.Write(currentBatch, _dbOnTheRocks.WriteFlagsToWriteOptions(_writeFlags)); + using (var handle = _dbOnTheRocks._writeThrottle.Acquire()) + { + _dbOnTheRocks._db.Write(currentBatch, _dbOnTheRocks.WriteFlagsToWriteOptions(_writeFlags)); + } ReturnWriteBatch(currentBatch); } catch (RocksDbSharpException e) diff --git a/src/Nethermind/Nethermind.Merge.Plugin/Handlers/ForkchoiceUpdatedHandler.cs b/src/Nethermind/Nethermind.Merge.Plugin/Handlers/ForkchoiceUpdatedHandler.cs index d8ec1a33370..2685c2981bb 100644 --- a/src/Nethermind/Nethermind.Merge.Plugin/Handlers/ForkchoiceUpdatedHandler.cs +++ b/src/Nethermind/Nethermind.Merge.Plugin/Handlers/ForkchoiceUpdatedHandler.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using Nethermind.Blockchain; using Nethermind.Blockchain.Find; @@ -12,6 +13,7 @@ using Nethermind.Core; using Nethermind.Core.Crypto; using Nethermind.Core.Specs; +using Nethermind.Core.Threading; using Nethermind.Crypto; using Nethermind.JsonRpc; using Nethermind.Logging; @@ -86,6 +88,8 @@ public Task> Handle(ForkchoiceStateV1 f private ResultWrapper? ApplyForkchoiceUpdate(Block? newHeadBlock, ForkchoiceStateV1 forkchoiceState, PayloadAttributes? payloadAttributes) { + using var handle = Thread.CurrentThread.BoostPriority(); + if (_invalidChainTracker.IsOnKnownInvalidChain(forkchoiceState.HeadBlockHash, out Hash256? lastValidHash)) { if (_logger.IsInfo) _logger.Info($"Received Invalid {forkchoiceState} {payloadAttributes} - {forkchoiceState.HeadBlockHash} is known to be a part of an invalid chain."); diff --git a/src/Nethermind/Nethermind.TxPool/Collections/BlobTxDistinctSortedPool.cs b/src/Nethermind/Nethermind.TxPool/Collections/BlobTxDistinctSortedPool.cs index d77c44aeeca..96332932782 100644 --- a/src/Nethermind/Nethermind.TxPool/Collections/BlobTxDistinctSortedPool.cs +++ b/src/Nethermind/Nethermind.TxPool/Collections/BlobTxDistinctSortedPool.cs @@ -24,7 +24,7 @@ protected override IComparer GetReplacementComparer(IComparer _poolCapacity && _logger.IsWarn) + if (_logger.IsWarn && Count > _poolCapacity) _logger.Warn($"Blob pool exceeds the config size {Count}/{_poolCapacity}"); } diff --git a/src/Nethermind/Nethermind.TxPool/Collections/DistinctValueSortedPool.cs b/src/Nethermind/Nethermind.TxPool/Collections/DistinctValueSortedPool.cs index e6071d4947f..948871ada72 100644 --- a/src/Nethermind/Nethermind.TxPool/Collections/DistinctValueSortedPool.cs +++ b/src/Nethermind/Nethermind.TxPool/Collections/DistinctValueSortedPool.cs @@ -49,7 +49,7 @@ protected override void InsertCore(TKey key, TValue value, TGroupKey groupKey) { if (_distinctDictionary.TryGetValue(value, out KeyValuePair oldKvp)) { - TryRemove(oldKvp.Key); + TryRemoveNonLocked(oldKvp.Key, evicted: false, out _, out _); } base.InsertCore(key, value, groupKey); diff --git a/src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs b/src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs index 578b29700b4..13243f06886 100644 --- a/src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs +++ b/src/Nethermind/Nethermind.TxPool/Collections/SortedPool.cs @@ -8,6 +8,7 @@ using System.Linq; using System.Runtime.CompilerServices; using Nethermind.Core.Collections; +using Nethermind.Core.Threading; namespace Nethermind.TxPool.Collections { @@ -21,6 +22,8 @@ public abstract partial class SortedPool where TKey : notnull where TGroupKey : notnull { + protected McsPriorityLock Lock { get; } = new(); + private readonly int _capacity; // comparer for a bucket @@ -82,9 +85,10 @@ protected SortedPool(int capacity, IComparer comparer) /// /// Gets all items in random order. /// - [MethodImpl(MethodImplOptions.Synchronized)] public TValue[] GetSnapshot() { + using var lockRelease = Lock.Acquire(); + TValue[]? snapshot = _snapshot; snapshot ??= _snapshot = _buckets.SelectMany(b => b.Value).ToArray(); @@ -94,9 +98,10 @@ public TValue[] GetSnapshot() /// /// Gets all items in groups in supplied comparer order in groups. /// - [MethodImpl(MethodImplOptions.Synchronized)] public IDictionary GetBucketSnapshot(Predicate? where = null) { + using var lockRelease = Lock.Acquire(); + IEnumerable>> buckets = _buckets; if (where is not null) { @@ -108,9 +113,10 @@ public IDictionary GetBucketSnapshot(Predicate? /// /// Gets all items of requested group. /// - [MethodImpl(MethodImplOptions.Synchronized)] public TValue[] GetBucketSnapshot(TGroupKey group) { + using var lockRelease = Lock.Acquire(); + if (group is null) throw new ArgumentNullException(nameof(group)); return _buckets.TryGetValue(group, out EnhancedSortedSet? bucket) ? bucket.ToArray() : Array.Empty(); } @@ -118,9 +124,10 @@ public TValue[] GetBucketSnapshot(TGroupKey group) /// /// Gets number of items in requested group. /// - [MethodImpl(MethodImplOptions.Synchronized)] public int GetBucketCount(TGroupKey group) { + using var lockRelease = Lock.Acquire(); + if (group is null) throw new ArgumentNullException(nameof(group)); return _buckets.TryGetValue(group, out EnhancedSortedSet? bucket) ? bucket.Count : 0; } @@ -128,7 +135,6 @@ public int GetBucketCount(TGroupKey group) /// /// Takes first element in supplied comparer order. /// - [MethodImpl(MethodImplOptions.Synchronized)] public bool TryTakeFirst(out TValue? first) { if (GetFirsts().Min is TValue min) @@ -140,9 +146,10 @@ public bool TryTakeFirst(out TValue? first) /// /// Returns best element of each bucket in supplied comparer order. /// - [MethodImpl(MethodImplOptions.Synchronized)] public EnhancedSortedSet GetFirsts() { + using var lockRelease = Lock.Acquire(); + EnhancedSortedSet sortedValues = new(_sortedComparer); foreach (KeyValuePair> bucket in _buckets) { @@ -172,11 +179,14 @@ protected void UpdateWorstValue() => /// Removed element or null. /// Bucket for same sender transactions. /// If element was removed. False if element was not present in pool. - [MethodImpl(MethodImplOptions.Synchronized)] - private bool TryRemove(TKey key, out TValue? value, [NotNullWhen(true)] out ICollection? bucket) => - TryRemove(key, false, out value, out bucket); + private bool TryRemove(TKey key, out TValue? value, [NotNullWhen(true)] out ICollection? bucket) + { + using var lockRelease = Lock.Acquire(); - private bool TryRemove(TKey key, bool evicted, [NotNullWhen(true)] out TValue? value, out ICollection? bucket) + return TryRemoveNonLocked(key, false, out value, out bucket); + } + + protected bool TryRemoveNonLocked(TKey key, bool evicted, [NotNullWhen(true)] out TValue? value, out ICollection? bucket) { if (_cacheMap.TryGetValue(key, out value) && value != null) { @@ -219,10 +229,8 @@ private bool TryRemove(TKey key, bool evicted, [NotNullWhen(true)] out TValue? v protected abstract TKey GetKey(TValue value); - [MethodImpl(MethodImplOptions.Synchronized)] public bool TryRemove(TKey key, [NotNullWhen(true)] out TValue? value) => TryRemove(key, out value, out _); - [MethodImpl(MethodImplOptions.Synchronized)] public bool TryRemove(TKey key) => TryRemove(key, out _, out _); /// @@ -231,9 +239,10 @@ private bool TryRemove(TKey key, bool evicted, [NotNullWhen(true)] out TValue? v /// Given GroupKey, which elements are checked. /// Predicated criteria. /// Elements matching predicated criteria. - [MethodImpl(MethodImplOptions.Synchronized)] public IEnumerable TakeWhile(TGroupKey groupKey, Predicate where) { + using var lockRelease = Lock.Acquire(); + if (_buckets.TryGetValue(groupKey, out EnhancedSortedSet? bucket)) { using EnhancedSortedSet.Enumerator enumerator = bucket!.GetEnumerator(); @@ -260,9 +269,10 @@ public IEnumerable TakeWhile(TGroupKey groupKey, Predicate where /// /// Key to check presence. /// True if element is present in pool. - [MethodImpl(MethodImplOptions.Synchronized)] public bool ContainsKey(TKey key) { + using var lockRelease = Lock.Acquire(); + return _cacheMap.ContainsKey(key); } @@ -272,9 +282,10 @@ public bool ContainsKey(TKey key) /// Key to be returned. /// Returned element or null. /// If element retrieval succeeded. True if element was present in pool. - [MethodImpl(MethodImplOptions.Synchronized)] public virtual bool TryGetValue(TKey key, [NotNullWhen(true)] out TValue? value) { + using var lockRelease = Lock.Acquire(); + return _cacheMap.TryGetValue(key, out value) && value != null; } @@ -285,9 +296,10 @@ public virtual bool TryGetValue(TKey key, [NotNullWhen(true)] out TValue? value) /// Element to insert. /// Element removed because of exceeding capacity /// If element was inserted. False if element was already present in pool. - [MethodImpl(MethodImplOptions.Synchronized)] public virtual bool TryInsert(TKey key, TValue value, out TValue? removed) { + using var lockRelease = Lock.Acquire(); + if (CanInsert(key, value)) { TGroupKey group = MapToGroup(value); @@ -311,7 +323,6 @@ public virtual bool TryInsert(TKey key, TValue value, out TValue? removed) return false; } - [MethodImpl(MethodImplOptions.Synchronized)] public bool TryInsert(TKey key, TValue value) => TryInsert(key, value, out _); private void RemoveLast(out TValue? removed) @@ -319,7 +330,7 @@ private void RemoveLast(out TValue? removed) TKey? key = _worstValue.GetValueOrDefault().Value; if (key is not null) { - TryRemove(key, true, out removed, out _); + TryRemoveNonLocked(key, true, out removed, out _); } else { @@ -401,13 +412,17 @@ protected virtual bool Remove(TKey key, TValue value) private void UpdateIsFull() => _isFull = _cacheMap.Count >= _capacity; - [MethodImpl(MethodImplOptions.Synchronized)] - public bool ContainsBucket(TGroupKey groupKey) => - _buckets.ContainsKey(groupKey); + public bool ContainsBucket(TGroupKey groupKey) + { + using var lockRelease = Lock.Acquire(); + + return _buckets.ContainsKey(groupKey); + } - [MethodImpl(MethodImplOptions.Synchronized)] public bool TryGetBucket(TGroupKey groupKey, out TValue[] items) { + using var lockRelease = Lock.Acquire(); + if (_buckets.TryGetValue(groupKey, out EnhancedSortedSet? bucket)) { items = bucket.ToArray(); @@ -418,9 +433,10 @@ public bool TryGetBucket(TGroupKey groupKey, out TValue[] items) return false; } - [MethodImpl(MethodImplOptions.Synchronized)] public bool TryGetBucketsWorstValue(TGroupKey groupKey, out TValue? item) { + using var lockRelease = Lock.Acquire(); + if (_buckets.TryGetValue(groupKey, out EnhancedSortedSet? bucket)) { item = bucket.Max; @@ -431,9 +447,10 @@ public bool TryGetBucketsWorstValue(TGroupKey groupKey, out TValue? item) return false; } - [MethodImpl(MethodImplOptions.Synchronized)] public void UpdatePool(Func, IEnumerable<(TValue Tx, Action? Change)>> changingElements) { + using var lockRelease = Lock.Acquire(); + foreach ((TGroupKey groupKey, EnhancedSortedSet bucket) in _buckets) { Debug.Assert(bucket.Count > 0); @@ -442,9 +459,10 @@ public void UpdatePool(Func, IEnumerable<( } } - [MethodImpl(MethodImplOptions.Synchronized)] public void UpdateGroup(TGroupKey groupKey, Func, IEnumerable<(TValue Tx, Action? Change)>> changingElements) { + using var lockRelease = Lock.Acquire(); + if (groupKey is null) throw new ArgumentNullException(nameof(groupKey)); if (_buckets.TryGetValue(groupKey, out EnhancedSortedSet? bucket)) { diff --git a/src/Nethermind/Nethermind.TxPool/Collections/TxDistinctSortedPool.cs b/src/Nethermind/Nethermind.TxPool/Collections/TxDistinctSortedPool.cs index a035dcd6c4d..abb9a8bd3b7 100644 --- a/src/Nethermind/Nethermind.TxPool/Collections/TxDistinctSortedPool.cs +++ b/src/Nethermind/Nethermind.TxPool/Collections/TxDistinctSortedPool.cs @@ -71,19 +71,20 @@ protected override void UpdateGroup(Address groupKey, EnhancedSortedSet, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements) { + using var lockRelease = Lock.Acquire(); + foreach ((Address address, EnhancedSortedSet bucket) in _buckets) { Debug.Assert(bucket.Count > 0); Account? account = accounts.GetAccount(address); - UpdateGroup(address, account, bucket, changingElements); + UpdateGroupNonLocked(address, account, bucket, changingElements); } } - private void UpdateGroup(Address groupKey, Account groupValue, EnhancedSortedSet bucket, Func, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements) + private void UpdateGroupNonLocked(Address groupKey, Account groupValue, EnhancedSortedSet bucket, Func, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements) { _transactionsToRemove.Clear(); Transaction? lastElement = bucket.Max; @@ -114,25 +115,26 @@ private void UpdateGroup(Address groupKey, Account groupValue, EnhancedSortedSet ReadOnlySpan txs = CollectionsMarshal.AsSpan(_transactionsToRemove); for (int i = 0; i < txs.Length; i++) { - TryRemove(txs[i].Hash!); + TryRemoveNonLocked(txs[i].Hash!, evicted: false, out _, out _); } } - [MethodImpl(MethodImplOptions.Synchronized)] public void UpdateGroup(Address groupKey, Account groupValue, Func, IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)>> changingElements) { + using var lockRelease = Lock.Acquire(); + ArgumentNullException.ThrowIfNull(groupKey); if (_buckets.TryGetValue(groupKey, out EnhancedSortedSet? bucket)) { Debug.Assert(bucket.Count > 0); - UpdateGroup(groupKey, groupValue, bucket, changingElements); + UpdateGroupNonLocked(groupKey, groupValue, bucket, changingElements); } } public virtual void VerifyCapacity() { - if (Count > _poolCapacity && _logger.IsWarn) + if (_logger.IsWarn && Count > _poolCapacity) _logger.Warn($"TxPool exceeds the config size {Count}/{_poolCapacity}"); } } diff --git a/src/Nethermind/Nethermind.TxPool/TxPool.cs b/src/Nethermind/Nethermind.TxPool/TxPool.cs index 235f149a88c..26f204f4103 100644 --- a/src/Nethermind/Nethermind.TxPool/TxPool.cs +++ b/src/Nethermind/Nethermind.TxPool/TxPool.cs @@ -31,8 +31,6 @@ namespace Nethermind.TxPool /// public class TxPool : ITxPool, IDisposable { - private readonly object _locker = new(); - private readonly IIncomingTxFilter[] _preHashFilters; private readonly IIncomingTxFilter[] _postHashFilters; @@ -430,54 +428,51 @@ private AcceptTxResult FilterTransactions(Transaction tx, TxHandlingOptions hand private AcceptTxResult AddCore(Transaction tx, TxFilteringState state, bool isPersistentBroadcast) { - lock (_locker) - { - bool eip1559Enabled = _specProvider.GetCurrentHeadSpec().IsEip1559Enabled; - UInt256 effectiveGasPrice = tx.CalculateEffectiveGasPrice(eip1559Enabled, _headInfo.CurrentBaseFee); - TxDistinctSortedPool relevantPool = (tx.SupportsBlobs ? _blobTransactions : _transactions); + bool eip1559Enabled = _specProvider.GetCurrentHeadSpec().IsEip1559Enabled; + UInt256 effectiveGasPrice = tx.CalculateEffectiveGasPrice(eip1559Enabled, _headInfo.CurrentBaseFee); + TxDistinctSortedPool relevantPool = (tx.SupportsBlobs ? _blobTransactions : _transactions); - relevantPool.TryGetBucketsWorstValue(tx.SenderAddress!, out Transaction? worstTx); - tx.GasBottleneck = (worstTx is null || effectiveGasPrice <= worstTx.GasBottleneck) - ? effectiveGasPrice - : worstTx.GasBottleneck; + relevantPool.TryGetBucketsWorstValue(tx.SenderAddress!, out Transaction? worstTx); + tx.GasBottleneck = (worstTx is null || effectiveGasPrice <= worstTx.GasBottleneck) + ? effectiveGasPrice + : worstTx.GasBottleneck; - bool inserted = relevantPool.TryInsert(tx.Hash!, tx, out Transaction? removed); + bool inserted = relevantPool.TryInsert(tx.Hash!, tx, out Transaction? removed); - if (!inserted) - { - // it means it failed on adding to the pool - it is possible when new tx has the same sender - // and nonce as already existent tx and is not good enough to replace it - Metrics.PendingTransactionsPassedFiltersButCannotReplace++; - return AcceptTxResult.ReplacementNotAllowed; - } + if (!inserted) + { + // it means it failed on adding to the pool - it is possible when new tx has the same sender + // and nonce as already existent tx and is not good enough to replace it + Metrics.PendingTransactionsPassedFiltersButCannotReplace++; + return AcceptTxResult.ReplacementNotAllowed; + } - if (tx.Hash == removed?.Hash) + if (tx.Hash == removed?.Hash) + { + // it means it was added and immediately evicted - pool was full of better txs + if (isPersistentBroadcast) { - // it means it was added and immediately evicted - pool was full of better txs - if (isPersistentBroadcast) - { - // we are adding only to persistent broadcast - not good enough for standard pool, - // but can be good enough for TxBroadcaster pool - for local txs only - _broadcaster.Broadcast(tx, isPersistentBroadcast); - } - Metrics.PendingTransactionsPassedFiltersButCannotCompeteOnFees++; - return AcceptTxResult.FeeTooLowToCompete; + // we are adding only to persistent broadcast - not good enough for standard pool, + // but can be good enough for TxBroadcaster pool - for local txs only + _broadcaster.Broadcast(tx, isPersistentBroadcast); } + Metrics.PendingTransactionsPassedFiltersButCannotCompeteOnFees++; + return AcceptTxResult.FeeTooLowToCompete; + } - relevantPool.UpdateGroup(tx.SenderAddress!, state.SenderAccount, UpdateBucketWithAddedTransaction); - Metrics.PendingTransactionsAdded++; - if (tx.Supports1559) { Metrics.Pending1559TransactionsAdded++; } - if (tx.SupportsBlobs) { Metrics.PendingBlobTransactionsAdded++; } + relevantPool.UpdateGroup(tx.SenderAddress!, state.SenderAccount, UpdateBucketWithAddedTransaction); + Metrics.PendingTransactionsAdded++; + if (tx.Supports1559) { Metrics.Pending1559TransactionsAdded++; } + if (tx.SupportsBlobs) { Metrics.PendingBlobTransactionsAdded++; } - if (removed is not null) - { - EvictedPending?.Invoke(this, new TxEventArgs(removed)); - // transaction which was on last position in sorted TxPool and was deleted to give - // a place for a newly added tx (with higher priority) is now removed from hashCache - // to give it opportunity to come back to TxPool in the future, when fees drops - _hashCache.DeleteFromLongTerm(removed.Hash!); - Metrics.PendingTransactionsEvicted++; - } + if (removed is not null) + { + EvictedPending?.Invoke(this, new TxEventArgs(removed)); + // transaction which was on last position in sorted TxPool and was deleted to give + // a place for a newly added tx (with higher priority) is now removed from hashCache + // to give it opportunity to come back to TxPool in the future, when fees drops + _hashCache.DeleteFromLongTerm(removed.Hash!); + Metrics.PendingTransactionsEvicted++; } _broadcaster.Broadcast(tx, isPersistentBroadcast); @@ -558,14 +553,11 @@ private AcceptTxResult AddCore(Transaction tx, TxFilteringState state, bool isPe private void UpdateBuckets() { - lock (_locker) - { - _transactions.VerifyCapacity(); - _transactions.UpdatePool(_accounts, _updateBucket); + _transactions.VerifyCapacity(); + _transactions.UpdatePool(_accounts, _updateBucket); - _blobTransactions.VerifyCapacity(); - _blobTransactions.UpdatePool(_accounts, _updateBucket); - } + _blobTransactions.VerifyCapacity(); + _blobTransactions.UpdatePool(_accounts, _updateBucket); } private IEnumerable<(Transaction Tx, UInt256? changedGasBottleneck)> UpdateBucket(Address address, Account account, EnhancedSortedSet transactions) @@ -628,22 +620,21 @@ public bool RemoveTransaction(Hash256? hash) return false; } - bool hasBeenRemoved; - lock (_locker) - { - hasBeenRemoved = _transactions.TryRemove(hash, out Transaction? transaction) + bool hasBeenRemoved = _transactions.TryRemove(hash, out Transaction? transaction) || _blobTransactions.TryRemove(hash, out transaction); - if (transaction is null || !hasBeenRemoved) - return false; - if (hasBeenRemoved) - { - RemovedPending?.Invoke(this, new TxEventArgs(transaction)); - } + if (transaction is null || !hasBeenRemoved) + { + return false; + } - _broadcaster.StopBroadcast(hash); + if (hasBeenRemoved) + { + RemovedPending?.Invoke(this, new TxEventArgs(transaction)); } + _broadcaster.StopBroadcast(hash); + if (_logger.IsTrace) _logger.Trace($"Removed a transaction: {hash}"); return hasBeenRemoved; @@ -655,20 +646,14 @@ public bool ContainsTx(Hash256 hash, TxType txType) => txType == TxType.Blob public bool TryGetPendingTransaction(Hash256 hash, out Transaction? transaction) { - lock (_locker) - { - return _transactions.TryGetValue(hash, out transaction) - || _blobTransactions.TryGetValue(hash, out transaction) - || _broadcaster.TryGetPersistentTx(hash, out transaction); - } + return _transactions.TryGetValue(hash, out transaction) + || _blobTransactions.TryGetValue(hash, out transaction) + || _broadcaster.TryGetPersistentTx(hash, out transaction); } public bool TryGetPendingBlobTransaction(Hash256 hash, [NotNullWhen(true)] out Transaction? blobTransaction) { - lock (_locker) - { - return _blobTransactions.TryGetValue(hash, out blobTransaction); - } + return _blobTransactions.TryGetValue(hash, out blobTransaction); } // only for tests - to test sorting