Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't use exceptions for flow control during sync #6425

Merged
merged 5 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Blockchain/BlockTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ private ChainLevelInfo UpdateOrCreateLevel(long number, Hash256 hash, BlockInfo
/// <returns></returns>
private bool ShouldCache(long number)
{
return number == 0L || Head is null || number <= Head.Number + 1;
return number == 0L || Head is null || number >= Head.Number - HeaderStore.CacheSize;
}

public ChainLevelInfo? FindLevel(long number)
Expand Down
4 changes: 2 additions & 2 deletions src/Nethermind/Nethermind.Blockchain/Headers/HeaderStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ namespace Nethermind.Blockchain.Headers;

public class HeaderStore : IHeaderStore
{
// SyncProgressResolver MaxLookupBack is 128, add 16 wiggle room
private const int CacheSize = 128 + 16;
// SyncProgressResolver MaxLookupBack is 256, add 16 wiggle room
public const int CacheSize = 256 + 16;

private readonly IDb _headerDb;
private readonly IDb _blockNumberDb;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ internal void DisplayProgressReport(int pendingRequestsCount, BranchProgress bra
}

if (logger.IsInfo) logger.Info(
$"State Sync {TimeSpan.FromSeconds(SecondsInSync):dd\\.hh\\:mm\\:ss} | {dataSizeInfo} | branches: {branchProgress.Progress:P2} | kB/s: {savedKBytesPerSecond,5:F0} | accounts {SavedAccounts} | nodes {SavedNodesCount} | diagnostics: {pendingRequestsCount}.{AverageTimeInHandler:f2}ms");
$"State Sync {TimeSpan.FromSeconds(SecondsInSync):dd\\.hh\\:mm\\:ss} | {dataSizeInfo} | branches: {branchProgress.Progress:P2} | kB/s: {savedKBytesPerSecond,5:F0} | accounts {SavedAccounts} | nodes {SavedNodesCount} | pending: {pendingRequestsCount,3} | ave: {AverageTimeInHandler:f2}ms");
if (logger.IsDebug && DateTime.UtcNow - LastReportTime.full > TimeSpan.FromSeconds(10))
{
long allChecks = CheckWasInDependencies + CheckWasCached + StateWasThere + StateWasNotThere;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ shorter than the request */

_data.DisplayProgressReport(_pendingRequests.Count, _branchProgress, _logger);

handleWatch.Stop();
long total = handleWatch.ElapsedMilliseconds + _networkWatch.ElapsedMilliseconds;
if (total != 0)
{
Expand All @@ -326,9 +327,7 @@ shorter than the request */

Interlocked.Add(ref _handleWatch, handleWatch.ElapsedMilliseconds);
_data.LastDbReads = _data.DbChecks;
_data.AverageTimeInHandler =
(_data.AverageTimeInHandler * (_data.ProcessedRequestsCount - 1) +
_handleWatch) / _data.ProcessedRequestsCount;
_data.AverageTimeInHandler = _handleWatch / (decimal)_data.ProcessedRequestsCount;

Interlocked.Add(ref _data.HandledNodesCount, nonEmptyResponses);
return result;
Expand Down
6 changes: 1 addition & 5 deletions src/Nethermind/Nethermind.Trie/PatriciaTree.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1077,11 +1077,7 @@ public void Accept(ITreeVisitor visitor, Hash256 rootHash, VisitingOptions? visi
if (!rootHash.Equals(Keccak.EmptyTreeHash))
{
rootRef = RootHash == rootHash ? RootRef : TrieStore.FindCachedOrUnknown(rootHash);
try
{
rootRef!.ResolveNode(TrieStore);
}
catch (TrieException)
if (!rootRef!.TryResolveNode(TrieStore))
{
visitor.VisitMissingNode(rootHash, trieVisitContext);
return;
Expand Down
7 changes: 7 additions & 0 deletions src/Nethermind/Nethermind.Trie/Pruning/ITrieNodeResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,12 @@ public interface ITrieNodeResolver
/// <param name="hash"></param>
/// <returns></returns>
byte[]? LoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None);

/// <summary>
/// Loads RLP of the node.
/// </summary>
/// <param name="hash"></param>
/// <returns></returns>
byte[]? TryLoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ private NullTrieNodeResolver() { }

public TrieNode FindCachedOrUnknown(Hash256 hash) => new(NodeType.Unknown, hash);
public byte[]? LoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None) => null;
public byte[]? TryLoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None) => null;
}
}
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Trie/Pruning/NullTrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public event EventHandler<ReorgBoundaryReached> ReorgBoundaryReached

public TrieNode FindCachedOrUnknown(Hash256 hash) => new(NodeType.Unknown, hash);

public byte[] TryLoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None) => null;

public byte[] LoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None) => Array.Empty<byte>();

public bool IsPersisted(in ValueHash256 keccak) => true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public ReadOnlyTrieStore(TrieStore trieStore, IKeyValueStore? readOnlyStore)
public TrieNode FindCachedOrUnknown(Hash256 hash) =>
_trieStore.FindCachedOrUnknown(hash, true);

public byte[]? TryLoadRlp(Hash256 hash, ReadFlags flags) => _trieStore.TryLoadRlp(hash, _readOnlyStore, flags);
public byte[] LoadRlp(Hash256 hash, ReadFlags flags) => _trieStore.LoadRlp(hash, _readOnlyStore, flags);

public bool IsPersisted(in ValueHash256 keccak) => _trieStore.IsPersisted(keccak);
Expand Down
16 changes: 13 additions & 3 deletions src/Nethermind/Nethermind.Trie/Pruning/TrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -322,22 +322,32 @@ public void FinishBlockCommit(TrieType trieType, long blockNumber, TrieNode? roo

public event EventHandler<ReorgBoundaryReached>? ReorgBoundaryReached;

public byte[] LoadRlp(Hash256 keccak, IKeyValueStore? keyValueStore, ReadFlags readFlags = ReadFlags.None)
public byte[]? TryLoadRlp(Hash256 keccak, IKeyValueStore? keyValueStore, ReadFlags readFlags = ReadFlags.None)
{
keyValueStore ??= _keyValueStore;
byte[]? rlp = keyValueStore.Get(keccak.Bytes, readFlags);

if (rlp is not null)
{
Metrics.LoadedFromDbNodesCount++;
}

return rlp;
}

public byte[] LoadRlp(Hash256 keccak, IKeyValueStore? keyValueStore, ReadFlags readFlags = ReadFlags.None)
{
byte[]? rlp = TryLoadRlp(keccak, keyValueStore, readFlags);
if (rlp is null)
{
throw new TrieNodeException($"Node {keccak} is missing from the DB", keccak);
}

Metrics.LoadedFromDbNodesCount++;

return rlp;
}

public virtual byte[] LoadRlp(Hash256 keccak, ReadFlags readFlags = ReadFlags.None) => LoadRlp(keccak, null, readFlags);
public virtual byte[]? TryLoadRlp(Hash256 keccak, ReadFlags readFlags = ReadFlags.None) => TryLoadRlp(keccak, null, readFlags);

public bool IsPersisted(in ValueHash256 keccak)
{
Expand Down
114 changes: 84 additions & 30 deletions src/Nethermind/Nethermind.Trie/TrieNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,52 +304,106 @@ public void ResolveNode(ITrieNodeResolver tree, ReadFlags readFlags = ReadFlags.
throw new InvalidAsynchronousStateException($"{nameof(_rlpStream)} is null when {nameof(NodeType)} is {NodeType}");
}

Metrics.TreeNodeRlpDecodings++;
_rlpStream.ReadSequenceLength();
if (!DecodeRlp(bufferPool, out int numberOfItems))
{
throw new TrieNodeException($"Unexpected number of items = {numberOfItems} when decoding a node from RLP ({FullRlp.AsSpan().ToHexString()})", Keccak ?? Nethermind.Core.Crypto.Keccak.Zero);
}
}
catch (RlpException rlpException)
{
throw new TrieNodeException($"Error when decoding node {Keccak}", Keccak ?? Nethermind.Core.Crypto.Keccak.Zero, rlpException);
}
}

/// <summary>
/// Highly optimized
/// </summary>
public bool TryResolveNode(ITrieNodeResolver tree, ReadFlags readFlags = ReadFlags.None, ICappedArrayPool? bufferPool = null)
{
try
{
if (NodeType == NodeType.Unknown)
{
if (FullRlp.IsNull)
{
if (Keccak is null)
{
return false;
}

// micro optimization to prevent searches beyond 3 items for branches (search up to three)
int numberOfItems = _rlpStream.PeekNumberOfItemsRemaining(null, 3);
FullRlp = tree.TryLoadRlp(Keccak, readFlags);
IsPersisted = true;

if (numberOfItems > 2)
if (FullRlp.IsNull)
{
return false;
}
}
}
else
{
NodeType = NodeType.Branch;
return true;
}
else if (numberOfItems == 2)

_rlpStream = FullRlp.AsRlpStream();
if (_rlpStream is null)
{
(byte[] key, bool isLeaf) = HexPrefix.FromBytes(_rlpStream.DecodeByteArraySpan());
throw new InvalidAsynchronousStateException($"{nameof(_rlpStream)} is null when {nameof(NodeType)} is {NodeType}");
}

// a hack to set internally and still verify attempts from the outside
// after the code is ready we should just add proper access control for methods from the outside and inside
bool isDirtyActual = IsDirty;
IsDirty = true;
return DecodeRlp(bufferPool, out _);
}
catch (RlpException)
{
return false;
}
}

if (isLeaf)
{
NodeType = NodeType.Leaf;
Key = key;
private bool DecodeRlp(ICappedArrayPool bufferPool, out int itemsCount)
{
Metrics.TreeNodeRlpDecodings++;
_rlpStream.ReadSequenceLength();

ReadOnlySpan<byte> valueSpan = _rlpStream.DecodeByteArraySpan();
CappedArray<byte> buffer = bufferPool.SafeRentBuffer(valueSpan.Length);
valueSpan.CopyTo(buffer.AsSpan());
Value = buffer;
}
else
{
NodeType = NodeType.Extension;
Key = key;
}
// micro optimization to prevent searches beyond 3 items for branches (search up to three)
int numberOfItems = itemsCount = _rlpStream.PeekNumberOfItemsRemaining(null, 3);

if (numberOfItems > 2)
{
NodeType = NodeType.Branch;
}
else if (numberOfItems == 2)
{
(byte[] key, bool isLeaf) = HexPrefix.FromBytes(_rlpStream.DecodeByteArraySpan());

// a hack to set internally and still verify attempts from the outside
// after the code is ready we should just add proper access control for methods from the outside and inside
bool isDirtyActual = IsDirty;
IsDirty = true;

if (isLeaf)
{
NodeType = NodeType.Leaf;
Key = key;

IsDirty = isDirtyActual;
ReadOnlySpan<byte> valueSpan = _rlpStream.DecodeByteArraySpan();
CappedArray<byte> buffer = bufferPool.SafeRentBuffer(valueSpan.Length);
valueSpan.CopyTo(buffer.AsSpan());
Value = buffer;
}
else
{
throw new TrieNodeException($"Unexpected number of items = {numberOfItems} when decoding a node from RLP ({FullRlp.AsSpan().ToHexString()})", Keccak ?? Nethermind.Core.Crypto.Keccak.Zero);
NodeType = NodeType.Extension;
Key = key;
}

IsDirty = isDirtyActual;
}
catch (RlpException rlpException)
else
{
throw new TrieNodeException($"Error when decoding node {Keccak}", Keccak ?? Nethermind.Core.Crypto.Keccak.Zero, rlpException);
return false;
}

return true;
}

public void ResolveKey(ITrieNodeResolver tree, bool isRoot, ICappedArrayPool? bufferPool = null)
Expand Down
10 changes: 10 additions & 0 deletions src/Nethermind/Nethermind.Trie/TrieNodeResolverWithReadFlags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ public TrieNode FindCachedOrUnknown(Hash256 hash)
return _baseResolver.FindCachedOrUnknown(hash);
}

public byte[]? TryLoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None)
{
if (flags != ReadFlags.None)
{
return _baseResolver.TryLoadRlp(hash, flags | _defaultFlags);
}

return _baseResolver.TryLoadRlp(hash, _defaultFlags);
}

public byte[]? LoadRlp(Hash256 hash, ReadFlags flags = ReadFlags.None)
{
if (flags != ReadFlags.None)
Expand Down