Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory use during Archive Sync #7407

Merged
merged 8 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ public void SetUp()
CreateStorage();
}

[TearDown]
public void TearDown()
{
_receiptsDb.Dispose();
}

private void CreateStorage()
{
_decoder = new ReceiptArrayStorageDecoder(_useCompactReceipts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime;
using System.Threading;
using System.Threading.Tasks;
using Nethermind.Blockchain;
using Nethermind.Blockchain.Find;
using Nethermind.Config;
using Nethermind.Core;
using Nethermind.Core.Attributes;
using Nethermind.Core.Crypto;
using Nethermind.Core.Extensions;
using Nethermind.Core.Memory;
using Nethermind.Evm.Tracing;
using Nethermind.Evm.Tracing.GethStyle;
using Nethermind.Evm.Tracing.ParityStyle;
Expand Down Expand Up @@ -295,12 +296,25 @@ private Task RunProcessing()

private void RunProcessingLoop()
{
const int BlocksBacklogTriggeringManualGC = 20;
const int MaxBlocksWithoutGC = 100;

if (_logger.IsDebug) _logger.Debug($"Starting block processor - {_blockQueue.Count} blocks waiting in the queue.");

FireProcessingQueueEmpty();

var fireGC = false;
var countToGC = 0;
foreach (BlockRef blockRef in _blockQueue.GetConsumingEnumerable(_loopCancellationSource.Token))
{
if (!fireGC && _blockQueue.Count > BlocksBacklogTriggeringManualGC)
{
// Long chains in archive sync don't force GC and don't call MallocTrim;
// so we trigger it manually
fireGC = true;
countToGC = MaxBlocksWithoutGC;
}

try
{
if (blockRef.IsInDb || blockRef.Block is null)
Expand Down Expand Up @@ -338,11 +352,29 @@ private void RunProcessingLoop()

if (_logger.IsTrace) _logger.Trace($"Now {_blockQueue.Count} blocks waiting in the queue.");
FireProcessingQueueEmpty();

if (fireGC)
{
countToGC--;
if (countToGC <= 0)
{
fireGC = false;
PerformFullGC();
}
}
}

if (_logger.IsInfo) _logger.Info("Block processor queue stopped.");
}

private void PerformFullGC()
{
if (_logger.IsDebug) _logger.Debug($"Performing Full GC");
GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
System.GC.Collect(2, GCCollectionMode.Aggressive, blocking: true, compacting: true);
MallocHelper.Instance.MallocTrim((uint)1.MiB());
}

private void FireProcessingQueueEmpty()
{
if (((IBlockProcessingQueue)this).IsEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,8 @@ public virtual void Dispose()
{
CodeDb?.Dispose();
StateDb?.Dispose();
DbProvider.BlobTransactionsDb?.Dispose();
DbProvider.ReceiptsDb?.Dispose();
}

_trieStoreWatcher?.Dispose();
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Core.Test/TestMemColumnDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ public IColumnsWriteBatch<TKey> StartWriteBatch()
{
return new InMemoryColumnWriteBatch<TKey>(this);
}
public void Dispose() { }
}
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Db.Rpc/RpcColumnsDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,6 @@ public IColumnsWriteBatch<T> StartWriteBatch()
{
return new InMemoryColumnWriteBatch<T>(this);
}
public void Dispose() { }
}
}
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Db/IColumnsDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace Nethermind.Db
{
public interface IColumnsDb<TKey> : IDbMeta
public interface IColumnsDb<TKey> : IDbMeta, IDisposable
{
IDb GetColumnDb(TKey key);
IEnumerable<TKey> ColumnKeys { get; }
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Db/MemColumnsDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ public IColumnsWriteBatch<TKey> StartWriteBatch()
{
return new InMemoryColumnWriteBatch<TKey>(this);
}
public void Dispose() { }
}
}
19 changes: 11 additions & 8 deletions src/Nethermind/Nethermind.State/PartialStorageProviderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal abstract class PartialStorageProviderBase
{
protected readonly Dictionary<StorageCell, StackList<int>> _intraBlockCache = new();
protected readonly ILogger _logger;
protected readonly List<Change?> _changes = new(Resettable.StartCapacity);
protected readonly List<Change> _changes = new(Resettable.StartCapacity);
private readonly List<Change> _keptInCache = new();

// stack of snapshot indexes on changes for start of each transaction
Expand Down Expand Up @@ -105,7 +105,7 @@ public void Restore(int snapshot)
}

_keptInCache.Add(change);
_changes[actualPosition] = null;
_changes[actualPosition] = default;
continue;
}
}
Expand All @@ -116,7 +116,7 @@ public void Restore(int snapshot)
throw new InvalidOperationException($"Expected checked value {forAssertion} to be equal to {currentPosition} - {i}");
}

_changes[currentPosition - i] = null;
_changes[currentPosition - i] = default;

if (stack.Count == 0)
{
Expand Down Expand Up @@ -230,7 +230,7 @@ protected bool TryGetCachedValue(in StorageCell storageCell, out byte[]? bytes)
{
int lastChangeIndex = stack.Peek();
{
bytes = _changes[lastChangeIndex]!.Value;
bytes = _changes[lastChangeIndex].Value;
return true;
}
}
Expand Down Expand Up @@ -293,7 +293,7 @@ public virtual void ClearStorage(Address address)
/// <summary>
/// Used for tracking each change to storage
/// </summary>
protected class Change
protected readonly struct Change
{
public Change(ChangeType changeType, StorageCell storageCell, byte[] value)
{
Expand All @@ -302,16 +302,19 @@ public Change(ChangeType changeType, StorageCell storageCell, byte[] value)
ChangeType = changeType;
}

public ChangeType ChangeType { get; }
public StorageCell StorageCell { get; }
public byte[] Value { get; }
public readonly ChangeType ChangeType;
public readonly StorageCell StorageCell;
public readonly byte[] Value;

public bool IsNull => ChangeType == ChangeType.Null;
}

/// <summary>
/// Type of change to track
/// </summary>
protected enum ChangeType
{
Null = 0,
JustCache,
Update,
Destroy,
Expand Down
13 changes: 7 additions & 6 deletions src/Nethermind/Nethermind.State/PersistentStorageProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ public byte[] GetOriginal(in StorageCell storageCell)
{
if (stack.TryGetSearchedItem(snapshot, out int lastChangeIndexBeforeOriginalSnapshot))
{
return _changes[lastChangeIndexBeforeOriginalSnapshot]!.Value;
return _changes[lastChangeIndexBeforeOriginalSnapshot].Value;
}
}
}

return value;
}


private HashSet<AddressAsKey>? _tempToUpdateRoots;
/// <summary>
/// Called by Commit
/// Used for persistent storage specific logic
Expand All @@ -127,12 +127,12 @@ protected override void CommitCore(IStorageTracer tracer)
{
return;
}
if (_changes[currentPosition] is null)
if (_changes[currentPosition].IsNull)
{
throw new InvalidOperationException($"Change at current position {currentPosition} was null when committing {nameof(PartialStorageProviderBase)}");
}

HashSet<Address> toUpdateRoots = new();
HashSet<AddressAsKey> toUpdateRoots = (_tempToUpdateRoots ??= new());

bool isTracing = tracer.IsTracingStorage;
Dictionary<StorageCell, ChangeTrace>? trace = null;
Expand Down Expand Up @@ -202,7 +202,7 @@ protected override void CommitCore(IStorageTracer tracer)
}
}

foreach (Address address in toUpdateRoots)
foreach (AddressAsKey address in toUpdateRoots)
{
// since the accounts could be empty accounts that are removing (EIP-158)
if (_stateProvider.AccountExists(address))
Expand All @@ -215,6 +215,7 @@ protected override void CommitCore(IStorageTracer tracer)
_storages.Remove(address);
}
}
toUpdateRoots.Clear();

base.CommitCore(tracer);
_originalValues.Clear();
Expand Down Expand Up @@ -288,7 +289,7 @@ void UpdateRootHashesMultiThread()
}
}

private void SaveToTree(HashSet<Address> toUpdateRoots, Change change)
private void SaveToTree(HashSet<AddressAsKey> toUpdateRoots, Change change)
{
if (_originalValues.TryGetValue(change.StorageCell, out byte[] initialValue) &&
initialValue.AsSpan().SequenceEqual(change.Value))
Expand Down
23 changes: 13 additions & 10 deletions src/Nethermind/Nethermind.State/StateProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ internal class StateProvider
private readonly ILogger _logger;
private readonly IKeyValueStore _codeDb;

private List<Change?> _changes = new(Resettable.StartCapacity);
private List<Change> _changes = new(Resettable.StartCapacity);
internal readonly StateTree _tree;
private readonly Func<AddressAsKey, Account> _getStateFromTrie;

Expand Down Expand Up @@ -186,7 +186,7 @@ Account GetThroughCacheCheckExists()
{
// this also works like this in Geth (they don't follow the spec ¯\_(*~*)_/¯)
// however we don't do it because of a consensus issue with Geth, just to avoid
// hitting non-existing account when substractin Zero-value from the sender
// hitting non-existing account when subtracting Zero-value from the sender
if (releaseSpec.IsEip158Enabled && !isSubtracting)
{
Account touched = GetThroughCacheCheckExists();
Expand Down Expand Up @@ -348,12 +348,12 @@ public void Restore(int snapshot)
}

_keptInCache.Add(change);
_changes[actualPosition] = null;
_changes[actualPosition] = default;
continue;
}
}

_changes[currentPosition - i] = null; // TODO: temp, ???
_changes[currentPosition - i] = default; // TODO: temp, ???
int forChecking = stack.Pop();
if (forChecking != currentPosition - i)
{
Expand Down Expand Up @@ -439,7 +439,7 @@ public void Commit(IReleaseSpec releaseSpec, IWorldStateTracer stateTracer, bool
}

if (_logger.IsTrace) _logger.Trace($"Committing state changes (at {currentPosition})");
if (_changes[currentPosition] is null)
if (_changes[currentPosition].IsNull)
{
throw new InvalidOperationException($"Change at current position {currentPosition} was null when committing {nameof(StateProvider)}");
}
Expand Down Expand Up @@ -735,7 +735,7 @@ private void SetState(Address address, Account? account)
{
if (_intraTxCache.TryGetValue(address, out Stack<int> value))
{
return _changes[value.Peek()]!.Account;
return _changes[value.Peek()].Account;
}

Account account = GetAndAddToCache(address);
Expand Down Expand Up @@ -796,14 +796,15 @@ private Stack<int> SetupCache(Address address)

private enum ChangeType
{
Null = 0,
JustCache,
Touch,
Update,
New,
Delete
}

private class Change
private readonly struct Change
{
public Change(ChangeType type, Address address, Account? account)
{
Expand All @@ -812,9 +813,11 @@ public Change(ChangeType type, Address address, Account? account)
Account = account;
}

public ChangeType ChangeType { get; }
public Address Address { get; }
public Account? Account { get; }
public readonly ChangeType ChangeType;
public readonly Address Address;
public readonly Account? Account;

public bool IsNull => ChangeType == ChangeType.Null;
}

public ArrayPoolList<AddressAsKey>? ChangedAddresses()
Expand Down
6 changes: 3 additions & 3 deletions src/Nethermind/Nethermind.Trie/PreCachedTrieStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void Set(Hash256? address, in TreePath path, in ValueHash256 keccak, byte
public INodeStorage.KeyScheme Scheme => _inner.Scheme;
}

public class NodeKey : IEquatable<NodeKey>
public readonly struct NodeKey : IEquatable<NodeKey>
{
public readonly Hash256? Address;
public readonly TreePath Path;
Expand All @@ -108,8 +108,8 @@ public NodeKey(Hash256? address, in TreePath path, Hash256 hash)
Hash = hash;
}

public bool Equals(NodeKey? other) =>
other is not null && Address == other.Address && Path.Equals(in other.Path) && Hash.Equals(other.Hash);
public bool Equals(NodeKey other) =>
Address == other.Address && Path.Equals(in other.Path) && Hash.Equals(other.Hash);

public override bool Equals(object? obj) => obj is NodeKey key && Equals(key);

Expand Down
Loading