Skip to content

Commit

Permalink
Perf/skip commit queue (#7571)
Browse files Browse the repository at this point in the history
Co-authored-by: lukasz.rozmej <lukasz.rozmej@gmail.com>
  • Loading branch information
asdacap and LukaszRozmej authored Oct 10, 2024
1 parent 522deba commit 8296752
Show file tree
Hide file tree
Showing 18 changed files with 500 additions and 515 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public CompositePublisher(params IPublisher[] publishers)

public async Task PublishAsync<T>(T data) where T : class
{
// TODO: .Net 9 stackalloc
Task[] tasks = new Task[_publishers.Length];
for (int i = 0; i < _publishers.Length; i++)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Network/CompositeNodeSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public async IAsyncEnumerable<Node> DiscoverNodes([EnumeratorCancellation] Cance
{
Channel<Node> ch = Channel.CreateBounded<Node>(1);

Task[] feedTasks = _nodeSources.Select(async (innerSource) =>
// TODO: .Net 9 stackalloc
Task[] feedTasks = _nodeSources.Select(async innerSource =>
{
await foreach (Node node in innerSource.DiscoverNodes(cancellationToken))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,11 @@ public void GetNodeData_returns_cached_trie_nodes()
Hash256 nodeKey = TestItem.KeccakA;
TrieNode node = new(NodeType.Leaf, nodeKey, TestItem.KeccakB.Bytes);
IScopedTrieStore scopedTrieStore = trieStore.GetTrieStore(null);
scopedTrieStore.CommitNode(1, new NodeCommitInfo(node, TreePath.Empty));
scopedTrieStore.FinishBlockCommit(TrieType.State, 1, node);
using (ICommitter committer = scopedTrieStore.BeginCommit(TrieType.State, 1, node))
{
TreePath path = TreePath.Empty;
committer.CommitNode(ref path, new NodeCommitInfo(node));
}

stateDb.KeyExists(nodeKey).Should().BeFalse();
ctx.SyncServer.GetNodeData(new[] { nodeKey }, CancellationToken.None, NodeDataType.All).Should().BeEquivalentTo(new[] { TestItem.KeccakB.BytesToArray() });
Expand Down
376 changes: 233 additions & 143 deletions src/Nethermind/Nethermind.Trie.Test/Pruning/TreeStoreTests.cs

Large diffs are not rendered by default.

11 changes: 8 additions & 3 deletions src/Nethermind/Nethermind.Trie.Test/TrieNodeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -933,15 +933,20 @@ public void Rlp_is_cloned_when_cloning()
TreePath emptyPath = TreePath.Empty;
leaf1.ResolveKey(trieStore, ref emptyPath, false);
leaf1.Seal();
trieStore.CommitNode(0, new NodeCommitInfo(leaf1, TreePath.Empty));

TrieNode leaf2 = new(NodeType.Leaf);
leaf2.Key = Bytes.FromHexString("abd");
leaf2.Value = new byte[222];
leaf2.ResolveKey(trieStore, ref emptyPath, false);
leaf2.Seal();
trieStore.CommitNode(0, new NodeCommitInfo(leaf2, TreePath.Empty));
trieStore.FinishBlockCommit(TrieType.State, 0, leaf2);

TreePath path = TreePath.Empty;

using (ICommitter? committer = trieStore.BeginCommit(TrieType.State, 0, leaf2))
{
committer.CommitNode(ref path, new NodeCommitInfo(leaf1));
committer.CommitNode(ref path, new NodeCommitInfo(leaf2));
}

TrieNode trieNode = new(NodeType.Branch);
trieNode.SetChild(1, leaf1);
Expand Down
5 changes: 3 additions & 2 deletions src/Nethermind/Nethermind.Trie/BatchedTrieVisitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,12 @@ public void Start(

try
{
// TODO: .Net 9 stackalloc
Task[]? tasks = Enumerable.Range(0, trieVisitContext.MaxDegreeOfParallelism)
.Select((_) => Task.Run(BatchedThread))
.Select(_ => Task.Run(BatchedThread))
.ToArray();

Task.WhenAll(tasks).Wait();
Task.WaitAll(tasks);
}
catch (Exception)
{
Expand Down
43 changes: 11 additions & 32 deletions src/Nethermind/Nethermind.Trie/CachedTrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,46 +20,25 @@ public class CachedTrieStore(IScopedTrieStore @base) : IScopedTrieStore
{
private readonly NonBlocking.ConcurrentDictionary<(TreePath path, Hash256 hash), TrieNode> _cachedNode = new();

public TrieNode FindCachedOrUnknown(in TreePath path, Hash256 hash)
{
return _cachedNode.GetOrAdd((path, hash), (key) => @base.FindCachedOrUnknown(key.path, key.hash));
}
public TrieNode FindCachedOrUnknown(in TreePath path, Hash256 hash) =>
_cachedNode.GetOrAdd((path, hash), (key) => @base.FindCachedOrUnknown(key.path, key.hash));

public byte[]? LoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None)
{
return @base.LoadRlp(in path, hash, flags);
}
public byte[]? LoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) =>
@base.LoadRlp(in path, hash, flags);

public byte[]? TryLoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None)
{
return @base.TryLoadRlp(in path, hash, flags);
}
public byte[]? TryLoadRlp(in TreePath path, Hash256 hash, ReadFlags flags = ReadFlags.None) =>
@base.TryLoadRlp(in path, hash, flags);

public ITrieNodeResolver GetStorageTrieNodeResolver(Hash256? address)
{
public ITrieNodeResolver GetStorageTrieNodeResolver(Hash256? address) =>
throw new InvalidOperationException("unsupported");
}

public INodeStorage.KeyScheme Scheme => @base.Scheme;

public void CommitNode(long blockNumber, NodeCommitInfo nodeCommitInfo, WriteFlags writeFlags = WriteFlags.None)
{
@base.CommitNode(blockNumber, nodeCommitInfo, writeFlags);
}
public ICommitter BeginCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None) =>
@base.BeginCommit(trieType, blockNumber, root, writeFlags);

public void FinishBlockCommit(TrieType trieType, long blockNumber, TrieNode? root, WriteFlags writeFlags = WriteFlags.None)
{
@base.FinishBlockCommit(trieType, blockNumber, root, writeFlags);
}
public bool IsPersisted(in TreePath path, in ValueHash256 keccak) => @base.IsPersisted(in path, in keccak);

public bool IsPersisted(in TreePath path, in ValueHash256 keccak)
{
return @base.IsPersisted(in path, in keccak);
}

public void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp)
{
@base.Set(in path, in keccak, rlp);
}
public void Set(in TreePath path, in ValueHash256 keccak, byte[] rlp) => @base.Set(in path, in keccak, rlp);
}

10 changes: 4 additions & 6 deletions src/Nethermind/Nethermind.Trie/NodeCommitInfo.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,35 @@
using System.Diagnostics.CodeAnalysis;

namespace Nethermind.Trie
{
public readonly struct NodeCommitInfo
{
public NodeCommitInfo(
TrieNode node,
in TreePath path
TrieNode node
)
{
ChildPositionAtParent = 0;
Node = node;
Path = path;
NodeParent = null;
}

public NodeCommitInfo(
TrieNode node,
TrieNode nodeParent,
in TreePath path,
int childPositionAtParent)
{
ChildPositionAtParent = childPositionAtParent;
Node = node;
Path = path;
NodeParent = nodeParent;
}

public TrieNode? Node { get; }
public readonly TreePath Path;

public TrieNode? NodeParent { get; }

public int ChildPositionAtParent { get; }

[MemberNotNullWhen(false, nameof(Node))]
public bool IsEmptyBlockMarker => Node is null;

public bool IsRoot => !IsEmptyBlockMarker && NodeParent is null;
Expand Down
Loading

0 comments on commit 8296752

Please sign in to comment.