Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf/skip commit queue #7571

Merged
merged 16 commits into from
Oct 10, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,11 @@ public void GetNodeData_returns_cached_trie_nodes()
Hash256 nodeKey = TestItem.KeccakA;
TrieNode node = new(NodeType.Leaf, nodeKey, TestItem.KeccakB.Bytes);
IScopedTrieStore scopedTrieStore = trieStore.GetTrieStore(null);
scopedTrieStore.CommitNode(1, new NodeCommitInfo(node, TreePath.Empty));
scopedTrieStore.FinishBlockCommit(TrieType.State, 1, node);
using (ICommitter committer = scopedTrieStore.BeginCommit(TrieType.State, 1, node))
{
TreePath path = TreePath.Empty;
committer.CommitNode(ref path, new NodeCommitInfo(node));
}

stateDb.KeyExists(nodeKey).Should().BeFalse();
ctx.SyncServer.GetNodeData(new[] { nodeKey }, CancellationToken.None, NodeDataType.All).Should().BeEquivalentTo(new[] { TestItem.KeccakB.BytesToArray() });
Expand Down
376 changes: 233 additions & 143 deletions src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions src/Nethermind/Nethermind.Trie.Test/TrieNodeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -933,15 +933,20 @@ public void Rlp_is_cloned_when_cloning()
TreePath emptyPath = TreePath.Empty;
leaf1.ResolveKey(trieStore, ref emptyPath, false);
leaf1.Seal();
trieStore.CommitNode(0, new NodeCommitInfo(leaf1, TreePath.Empty));

TrieNode leaf2 = new(NodeType.Leaf);
leaf2.Key = Bytes.FromHexString("abd");
leaf2.Value = new byte[222];
leaf2.ResolveKey(trieStore, ref emptyPath, false);
leaf2.Seal();
trieStore.CommitNode(0, new NodeCommitInfo(leaf2, TreePath.Empty));
trieStore.FinishBlockCommit(TrieType.State, 0, leaf2);

TreePath path = TreePath.Empty;

using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, 0, leaf2))
{
committer.CommitNode(ref path, new NodeCommitInfo(leaf1));
committer.CommitNode(ref path, new NodeCommitInfo(leaf2));
}

TrieNode trieNode = new(NodeType.Branch);
trieNode.SetChild(1, leaf1);
Expand Down
9 changes: 2 additions & 7 deletions src/Nethermind/Nethermind.Trie/CachedTrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,9 @@ public ITrieNodeResolver GetStorageTrieNodeResolver(Hash256? address)

public INodeStorage.KeyScheme Scheme => @base.Scheme;

public void CommitNode(long blockNumber, NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None)
public ICommitter BeginCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None)
{
@base.CommitNode(blockNumber, nodeCommitInfo, writeFlags);
}

public void FinishBlockCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None)
{
@base.FinishBlockCommit(trieType, blockNumber, root, writeFlags);
return @base.BeginCommit(trieType, blockNumber, root, writeFlags);
}

public bool IsPersisted(in TreePath path, in ValueHash256 keccak)
Expand Down
7 changes: 1 addition & 6 deletions src/Nethermind/Nethermind.Trie/NodeCommitInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,25 @@ namespace Nethermind.Trie
public readonly struct NodeCommitInfo
{
public NodeCommitInfo(
TrieNode node,
in TreePath path
TrieNode node
)
{
ChildPositionAtParent = 0;
Node = node;
Path = path;
NodeParent = null;
}

public NodeCommitInfo(
TrieNode node,
TrieNode nodeParent,
in TreePath path,
int childPositionAtParent)
{
ChildPositionAtParent = childPositionAtParent;
Node = node;
Path = path;
NodeParent = nodeParent;
}

public TrieNode? Node { get; }
public readonly TreePath Path;

public TrieNode? NodeParent { get; }

Expand Down
160 changes: 64 additions & 96 deletions src/Nethermind/Nethermind.Trie/PatriciaTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Buffers;
using Nethermind.Core.Cpu;
using Nethermind.Core.Collections;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Logging;
Expand All @@ -39,10 +40,6 @@ public class PatriciaTree
public TrieType TrieType { get; init; }

private Stack<StackedNode>? _nodeStack;

private ConcurrentQueue<Exception>? _commitExceptions;
private ConcurrentQueue<NodeCommitInfo>? _currentCommit;

public IScopedTrieStore TrieStore { get; }
public ICappedArrayPool? _bufferPool;

Expand All @@ -56,6 +53,9 @@ public class PatriciaTree

public TrieNode? RootRef { get; set; }

// Used to estimate if parallelization is needed during commit
private long _writeBeforeCommit = 0;

/// <summary>
/// Only used in EthereumTests
/// </summary>
Expand Down Expand Up @@ -138,64 +138,60 @@ public void Commit(long blockNumber, bool skipRoot = false, WriteFlags writeFlag
ThrowReadOnlyTrieException();
}

if (RootRef is not null && RootRef.IsDirty)
int maxLevelForConcurrentCommit = -1;
_writeBeforeCommit /= 64;
if (_writeBeforeCommit > 0)
{
Commit(new NodeCommitInfo(RootRef, TreePath.Empty), skipSelf: skipRoot);
while (TryDequeueCommit(out NodeCommitInfo node))
maxLevelForConcurrentCommit++; // Ok, we separate at top level
if (_writeBeforeCommit / 16 > 0)
{
if (_logger.IsTrace) Trace(blockNumber, node);
TrieStore.CommitNode(blockNumber, node, writeFlags: writeFlags);
maxLevelForConcurrentCommit++; // Another level
}

// reset objects
TreePath path = TreePath.Empty;
RootRef!.ResolveKey(TrieStore, ref path, true, bufferPool: _bufferPool);
SetRootHash(RootRef.Keccak!, true);
}
LukaszRozmej marked this conversation as resolved.
Show resolved Hide resolved
_writeBeforeCommit = 0;

TrieStore.FinishBlockCommit(TrieType, blockNumber, RootRef, writeFlags);

if (_logger.IsDebug) Debug(blockNumber);

bool TryDequeueCommit(out NodeCommitInfo value)
using (ICommitter committer = TrieStore.BeginCommit(TrieType, blockNumber, RootRef, writeFlags))
{
Unsafe.SkipInit(out value);
return _currentCommit?.TryDequeue(out value) ?? false;
}
if (RootRef is not null && RootRef.IsDirty)
{
TreePath path = TreePath.Empty;
Commit(committer, ref path, new NodeCommitInfo(RootRef), skipSelf: skipRoot, maxLevelForConcurrentCommit: maxLevelForConcurrentCommit);

[MethodImpl(MethodImplOptions.NoInlining)]
void Trace(long blockNumber, in NodeCommitInfo node)
{
_logger.Trace($"Committing {node} in {blockNumber}");
// reset objects
RootRef!.ResolveKey(TrieStore, ref path, true, bufferPool: _bufferPool);
SetRootHash(RootRef.Keccak!, true);
}
}

if (_logger.IsDebug) Debug(blockNumber);

[MethodImpl(MethodImplOptions.NoInlining)]
void Debug(long blockNumber)
{
_logger.Debug($"Finished committing {RootRef?.Keccak} in block {blockNumber}");
}
}

private void Commit(NodeCommitInfo nodeCommitInfo, bool skipSelf = false)
private void Commit(ICommitter committer, ref TreePath path, NodeCommitInfo nodeCommitInfo, int maxLevelForConcurrentCommit, bool skipSelf = false)
{
if (!_allowCommits)
{
ThrowReadOnlyTrieException();
}

TrieNode node = nodeCommitInfo.Node;
TreePath path = nodeCommitInfo.Path;
if (node!.IsBranch)
{
// idea from EthereumJ - testing parallel branches
if (!_parallelBranches || !nodeCommitInfo.IsRoot)
if (path.Length > maxLevelForConcurrentCommit)
LukaszRozmej marked this conversation as resolved.
Show resolved Hide resolved
{
for (int i = 0; i < 16; i++)
{
if (node.IsChildDirty(i))
{
TreePath childPath = node.GetChildPath(nodeCommitInfo.Path, i);
Commit(new NodeCommitInfo(node.GetChildWithChildPath(TrieStore, ref childPath, i)!, node, childPath, i));
path.AppendMut(i);
TrieNode childNode = node.GetChildWithChildPath(TrieStore, ref path, i);
Commit(committer, ref path, new NodeCommitInfo(childNode!, node, i), maxLevelForConcurrentCommit);
path.TruncateOne();
}
else
{
Expand All @@ -208,13 +204,35 @@ private void Commit(NodeCommitInfo nodeCommitInfo, bool skipSelf = false)
}
else
{
List<NodeCommitInfo> nodesToCommit = new(16);
Task CreateTaskForPath(TreePath childPath, TrieNode childNode, int idx)
{
return Task.Run(() =>
{
Commit(committer, ref childPath, new NodeCommitInfo(childNode!, node, idx), maxLevelForConcurrentCommit);
committer.ReturnConcurrencyQuota();
});
}

ArrayPoolList<Task>? childTasks = null;

LukaszRozmej marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < 16; i++)
{
if (node.IsChildDirty(i))
{
TreePath childPath = node.GetChildPath(nodeCommitInfo.Path, i);
nodesToCommit.Add(new NodeCommitInfo(node.GetChildWithChildPath(TrieStore, ref childPath, i)!, node, childPath, i));
if (i < 15 && committer.CanSpawnTask())
{
childTasks ??= new ArrayPoolList<Task>(15);
TreePath childPath = path.Append(i);
TrieNode childNode = node.GetChildWithChildPath(TrieStore, ref childPath, i);
childTasks.Add(CreateTaskForPath(childPath, childNode, i));
}
else
{
path.AppendMut(i);
TrieNode childNode = node.GetChildWithChildPath(TrieStore, ref path, i);
Commit(committer, ref path, new NodeCommitInfo(childNode!, node, i), maxLevelForConcurrentCommit);
path.TruncateOne();
}
}
else
{
Expand All @@ -225,52 +243,31 @@ private void Commit(NodeCommitInfo nodeCommitInfo, bool skipSelf = false)
}
}

if (nodesToCommit.Count >= 4)
if (childTasks != null)
{
ClearExceptions();
Parallel.For(0, nodesToCommit.Count, RuntimeInformation.ParallelOptionsLogicalCores, i =>
{
try
{
Commit(nodesToCommit[i]);
}
catch (Exception e)
{
AddException(e);
}
});

if (WereExceptions())
{
ThrowAggregateExceptions();
}
}
else
{
for (int i = 0; i < nodesToCommit.Count; i++)
{
Commit(nodesToCommit[i]);
}
Task.WaitAll(childTasks.ToArray());
childTasks.Dispose();
}
}
}
else if (node.NodeType == NodeType.Extension)
{
TreePath childPath = node.GetChildPath(nodeCommitInfo.Path, 0);
TrieNode extensionChild = node.GetChildWithChildPath(TrieStore, ref childPath, 0);
int previousPathLength = node.AppendChildPath(ref path, 0);
TrieNode extensionChild = node.GetChildWithChildPath(TrieStore, ref path, 0);
if (extensionChild is null)
{
ThrowInvalidExtension();
}

if (extensionChild.IsDirty)
{
Commit(new NodeCommitInfo(extensionChild, node, childPath, 0));
Commit(committer, ref path, new NodeCommitInfo(extensionChild, node, 0), maxLevelForConcurrentCommit);
}
else
{
if (_logger.IsTrace) TraceExtensionSkip(extensionChild);
}
path.TruncateMut(previousPathLength);
}

node.ResolveKey(TrieStore, ref path, nodeCommitInfo.IsRoot, bufferPool: _bufferPool);
Expand All @@ -280,45 +277,14 @@ private void Commit(NodeCommitInfo nodeCommitInfo, bool skipSelf = false)
{
if (!skipSelf)
{
EnqueueCommit(nodeCommitInfo);
committer.CommitNode(ref path, nodeCommitInfo);
}
}
else
{
if (_logger.IsTrace) TraceSkipInlineNode(node);
}

void EnqueueCommit(in NodeCommitInfo value)
{
ConcurrentQueue<NodeCommitInfo> queue = Volatile.Read(ref _currentCommit);
// Allocate queue if first commit made
queue ??= CreateQueue(ref _currentCommit);
queue.Enqueue(value);
}

void ClearExceptions() => _commitExceptions?.Clear();
bool WereExceptions() => _commitExceptions?.IsEmpty == false;

void AddException(Exception value)
{
ConcurrentQueue<Exception> queue = Volatile.Read(ref _commitExceptions);
// Allocate queue if first exception thrown
queue ??= CreateQueue(ref _commitExceptions);
queue.Enqueue(value);
}

[MethodImpl(MethodImplOptions.NoInlining)]
ConcurrentQueue<T> CreateQueue<T>(ref ConcurrentQueue<T> queueRef)
{
ConcurrentQueue<T> queue = new();
ConcurrentQueue<T> current = Interlocked.CompareExchange(ref queueRef, queue, null);
return (current is null) ? queue : current;
}

[DoesNotReturn]
[StackTraceHidden]
void ThrowAggregateExceptions() => throw new AggregateException(_commitExceptions);

[DoesNotReturn]
[StackTraceHidden]
static void ThrowInvalidExtension() => throw new InvalidOperationException("An attempt to store an extension without a child.");
Expand Down Expand Up @@ -493,6 +459,8 @@ public virtual void Set(ReadOnlySpan<byte> rawKey, in CappedArray<byte> value)
ThrowNonConcurrentWrites();
}

_writeBeforeCommit++;

try
{
int nibblesCount = 2 * rawKey.Length;
Expand Down
9 changes: 2 additions & 7 deletions src/Nethermind/Nethermind.Trie/PreCachedTrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,9 @@ public void Dispose()
_inner.Dispose();
}

public void CommitNode(long blockNumber, Hash256? address, in NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None)
public ICommitter BeginCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags writeFlags)
{
_inner.CommitNode(blockNumber, address, in nodeCommitInfo, writeFlags);
}

public void FinishBlockCommit(TrieType trieType, long blockNumber, Hash256? address, TrieNode? root, WriteFlags writeFlags = WriteFlags.None)
{
_inner.FinishBlockCommit(trieType, blockNumber, address, root, writeFlags);
return _inner.BeginCommit(trieType, blockNumber, address, root, writeFlags);
}

public bool IsPersisted(Hash256? address, in TreePath path, in ValueHash256 keccak)
Expand Down
Loading