Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prewarm tx addresses in parallel #7423

Merged
merged 7 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,49 @@ public class BeaconBlockRootHandler(ITransactionProcessor processor) : IBeaconBl
{
private const long GasLimit = 30_000_000L;

public void StoreBeaconRoot(Block block, IReleaseSpec spec)
public (Address? toAddress, AccessList? accessList) BeaconRootsAccessList(Block block, IReleaseSpec spec, bool includeStorageCells = true)
{
BlockHeader? header = block.Header;
var canInsertBeaconRoot = spec.IsBeaconBlockRootAvailable
bool canInsertBeaconRoot = spec.IsBeaconBlockRootAvailable
&& !header.IsGenesis
&& header.ParentBeaconBlockRoot is not null;

if (canInsertBeaconRoot)
Address? eip4788ContractAddress = canInsertBeaconRoot ?
spec.Eip4788ContractAddress ?? Eip4788Constants.BeaconRootsAddress :
null;

if (eip4788ContractAddress is null)
{
return (null, null);
}

var builder = new AccessList.Builder()
.AddAddress(eip4788ContractAddress);

if (includeStorageCells)
{
builder.AddStorage(block.Timestamp % 8191);
}

return (eip4788ContractAddress, builder.Build());
}

public void StoreBeaconRoot(Block block, IReleaseSpec spec)
{
(Address? toAddress, AccessList? accessList) = BeaconRootsAccessList(block, spec, includeStorageCells: false);

if (toAddress is not null)
{
Address beaconRootsAddress = spec.Eip4788ContractAddress ?? Eip4788Constants.BeaconRootsAddress;
BlockHeader? header = block.Header;
Transaction transaction = new()
{
Value = UInt256.Zero,
Data = header.ParentBeaconBlockRoot.Bytes.ToArray(),
To = beaconRootsAddress,
To = toAddress,
SenderAddress = Address.SystemUser,
GasLimit = GasLimit,
GasPrice = UInt256.Zero,
AccessList = new AccessList.Builder().AddAddress(beaconRootsAddress).Build()
AccessList = accessList
benaadams marked this conversation as resolved.
Show resolved Hide resolved
};

transaction.Hash = transaction.CalculateHash();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
// SPDX-License-Identifier: LGPL-3.0-only

using Nethermind.Core;
using Nethermind.Core.Eip2930;
using Nethermind.Core.Specs;

namespace Nethermind.Blockchain.BeaconBlockRoot;
public interface IBeaconBlockRootHandler
{
(Address? toAddress, AccessList? accessList) BeaconRootsAccessList(Block block, IReleaseSpec spec, bool includeStorageCells = true);
void StoreBeaconRoot(Block block, IReleaseSpec spec);
}
191 changes: 128 additions & 63 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockCachePreWarmer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Nethermind.Evm.TransactionProcessing;
using Nethermind.Logging;
using Nethermind.State;
using Nethermind.Core.Eip2930;

namespace Nethermind.Consensus.Processing;

Expand All @@ -24,7 +25,7 @@ public class BlockCachePreWarmer(ReadOnlyTxProcessingEnvFactory envFactory, ISpe
private readonly ObjectPool<SystemTransaction> _systemTransactionPool = new DefaultObjectPool<SystemTransaction>(new DefaultPooledObjectPolicy<SystemTransaction>(), Environment.ProcessorCount);
private readonly ILogger _logger = logManager.GetClassLogger<BlockCachePreWarmer>();

public Task PreWarmCaches(Block suggestedBlock, Hash256? parentStateRoot, CancellationToken cancellationToken = default)
public Task PreWarmCaches(Block suggestedBlock, Hash256? parentStateRoot, AccessList? systemTxAccessList, CancellationToken cancellationToken = default)
{
if (targetWorldState is not null)
{
Expand All @@ -33,40 +34,38 @@ public Task PreWarmCaches(Block suggestedBlock, Hash256? parentStateRoot, Cancel
if (_logger.IsWarn) _logger.Warn("Caches are not empty. Clearing them.");
}

if (!IsGenesisBlock(parentStateRoot) && Environment.ProcessorCount > 2 && !cancellationToken.IsCancellationRequested)
var physicalCoreCount = RuntimeInformation.PhysicalCoreCount;
if (!IsGenesisBlock(parentStateRoot) && physicalCoreCount > 2 && !cancellationToken.IsCancellationRequested)
{
ParallelOptions parallelOptions = new() { MaxDegreeOfParallelism = physicalCoreCount - 1, CancellationToken = cancellationToken };

// Run address warmer ahead of transactions warmer, but queue to ThreadPool so it doesn't block the txs
ThreadPool.UnsafeQueueUserWorkItem(
new AddressWarmer(parallelOptions, suggestedBlock, parentStateRoot, systemTxAccessList, this), preferLocal: false);
// Do not pass cancellation token to the task, we don't want exceptions to be thrown in main processing thread
return Task.Run(() => PreWarmCachesParallel(suggestedBlock, parentStateRoot, cancellationToken));
return Task.Run(() => PreWarmCachesParallel(suggestedBlock, parentStateRoot, parallelOptions, cancellationToken));
}
}

return Task.CompletedTask;
}

// Parent state root is null for genesis block
private bool IsGenesisBlock(Hash256? parentStateRoot) => parentStateRoot is null;
private static bool IsGenesisBlock(Hash256? parentStateRoot) => parentStateRoot is null;

public void ClearCaches() => targetWorldState?.ClearCache();

public Task ClearCachesInBackground() => targetWorldState?.ClearCachesInBackground() ?? Task.CompletedTask;

private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot, CancellationToken cancellationToken)
private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot, ParallelOptions parallelOptions, CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;

try
{
var physicalCoreCount = RuntimeInformation.PhysicalCoreCount;
if (physicalCoreCount < 2)
{
if (_logger.IsDebug) _logger.Debug("Physical core count is less than 2. Skipping pre-warming.");
return;
}
if (_logger.IsDebug) _logger.Debug($"Started pre-warming caches for block {suggestedBlock.Number}.");

ParallelOptions parallelOptions = new() { MaxDegreeOfParallelism = physicalCoreCount - 1, CancellationToken = cancellationToken };
IReleaseSpec spec = specProvider.GetSpec(suggestedBlock.Header);

WarmupTransactions(parallelOptions, spec, suggestedBlock, parentStateRoot);
WarmupWithdrawals(parallelOptions, spec, suggestedBlock, parentStateRoot);

Expand All @@ -76,75 +75,141 @@ private void PreWarmCachesParallel(Block suggestedBlock, Hash256 parentStateRoot
{
if (_logger.IsDebug) _logger.Debug($"Pre-warming caches cancelled for block {suggestedBlock.Number}.");
}
}

void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot)
private void WarmupWithdrawals(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;
if (spec.WithdrawalsEnabled && block.Withdrawals is not null)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;
if (spec.WithdrawalsEnabled && block.Withdrawals is not null)
{
int progress = 0;
Parallel.For(0, block.Withdrawals.Length, parallelOptions,
_ =>
int progress = 0;
Parallel.For(0, block.Withdrawals.Length, parallelOptions,
_ =>
{
IReadOnlyTxProcessorSource env = _envPool.Get();
int i = 0;
try
{
IReadOnlyTxProcessorSource env = _envPool.Get();
int i = 0;
try
{
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
// Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
scope.WorldState.WarmUp(block.Withdrawals[i].Address);
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming withdrawal {i}", ex);
}
finally
{
_envPool.Return(env);
}
});
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
// Process withdrawals in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
i = Interlocked.Increment(ref progress) - 1;
scope.WorldState.WarmUp(block.Withdrawals[i].Address);
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming withdrawal {i}", ex);
}
finally
{
_envPool.Return(env);
}
});
}
}

private void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;

int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions, _ =>
{
using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority();
IReadOnlyTxProcessorSource env = _envPool.Get();
SystemTransaction systemTransaction = _systemTransactionPool.Get();
Transaction? tx = null;
try
{
// Process transactions in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
int i = Interlocked.Increment(ref progress) - 1;
// If the transaction has already been processed or being processed, exit early
if (block.TransactionProcessed > i) return;

tx = block.Transactions[i];
tx.CopyTo(systemTransaction);
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
if (spec.UseTxAccessLists)
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
}
TransactionResult result = scope.TransactionProcessor.Trace(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance);
if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
}
finally
{
_systemTransactionPool.Return(systemTransaction);
_envPool.Return(env);
}
});
}

private class AddressWarmer(ParallelOptions parallelOptions, Block block, Hash256 stateRoot, AccessList? systemTxAccessList, BlockCachePreWarmer preWarmer)
: IThreadPoolWorkItem
{
private readonly ParallelOptions ParallelOptions = parallelOptions;
private readonly Block Block = block;
private readonly Hash256 StateRoot = stateRoot;
private readonly BlockCachePreWarmer PreWarmer = preWarmer;
private readonly AccessList? SystemTxAccessList = systemTxAccessList;

void IThreadPoolWorkItem.Execute()
{
IReadOnlyTxProcessorSource env = PreWarmer._envPool.Get();
try
{
using IReadOnlyTxProcessingScope scope = env.Build(StateRoot);
WarmupAddresses(ParallelOptions, Block, scope);
}
catch (Exception ex)
{
if (PreWarmer._logger.IsDebug) PreWarmer._logger.Error($"Error pre-warming addresses", ex);
}
finally
{
PreWarmer._envPool.Return(env);
}
}

void WarmupTransactions(ParallelOptions parallelOptions, IReleaseSpec spec, Block block, Hash256 stateRoot)
private void WarmupAddresses(ParallelOptions parallelOptions, Block block, IReadOnlyTxProcessingScope scope)
{
if (parallelOptions.CancellationToken.IsCancellationRequested) return;

if (SystemTxAccessList is not null)
{
scope.WorldState.WarmUp(SystemTxAccessList);
}

int progress = 0;
Parallel.For(0, block.Transactions.Length, parallelOptions, _ =>
Parallel.For(0, block.Transactions.Length, parallelOptions,
_ =>
{
using ThreadExtensions.Disposable handle = Thread.CurrentThread.BoostPriority();
IReadOnlyTxProcessorSource env = _envPool.Get();
SystemTransaction systemTransaction = _systemTransactionPool.Get();
Transaction? tx = null;
int i = 0;
try
{
// Process transactions in sequential order, rather than partitioning scheme from Parallel.For
// Process addresses in sequential order, rather than partitioning scheme from Parallel.For
// Interlocked.Increment returns the incremented value, so subtract 1 to start at 0
int i = Interlocked.Increment(ref progress) - 1;
// If the transaction has already been processed or being processed, exit early
if (block.TransactionProcessed > i) return;

tx = block.Transactions[i];
tx.CopyTo(systemTransaction);
using IReadOnlyTxProcessingScope scope = env.Build(stateRoot);
if (spec.UseTxAccessLists)
i = Interlocked.Increment(ref progress) - 1;
Transaction tx = block.Transactions[i];
Address? sender = tx.SenderAddress;
if (sender is not null)
{
scope.WorldState.WarmUp(sender);
}
Address to = tx.To;
if (to is not null)
{
scope.WorldState.WarmUp(tx.AccessList); // eip-2930
scope.WorldState.WarmUp(to);
}
TransactionResult result = scope.TransactionProcessor.Trace(systemTransaction, new BlockExecutionContext(block.Header.Clone()), NullTxTracer.Instance);
if (_logger.IsTrace) _logger.Trace($"Finished pre-warming cache for tx[{i}] {tx.Hash} with {result}");
}
catch (Exception ex)
{
if (_logger.IsDebug) _logger.Error($"Error pre-warming cache {tx?.Hash}", ex);
}
finally
{
_systemTransactionPool.Return(systemTransaction);
_envPool.Return(env);
if (PreWarmer._logger.IsDebug) PreWarmer._logger.Error($"Error pre-warming addresses {i}", ex);
}
});
}
Expand Down
27 changes: 19 additions & 8 deletions src/Nethermind/Nethermind.Consensus/Processing/BlockProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Nethermind.Consensus.Withdrawals;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Eip2930;
using Nethermind.Core.Specs;
using Nethermind.Crypto;
using Nethermind.Evm;
Expand Down Expand Up @@ -107,17 +108,27 @@ the previous head state.*/
BlockProcessing?.Invoke(this, new BlockEventArgs(suggestedBlock));
}

using CancellationTokenSource cancellationTokenSource = new();
Task? preWarmTask = suggestedBlock.Transactions.Length < 3
? null
: preWarmer?.PreWarmCaches(suggestedBlock, preBlockStateRoot!, cancellationTokenSource.Token);
(Block processedBlock, TxReceipt[] receipts) = ProcessOne(suggestedBlock, options, blockTracer);
// Block is processed, we can cancel the prewarm task
if (preWarmTask is not null)
Block processedBlock;
TxReceipt[] receipts;

Task? preWarmTask = null;
bool skipPrewarming = preWarmer is null || suggestedBlock.Transactions.Length < 3;
if (!skipPrewarming)
{
using CancellationTokenSource cancellationTokenSource = new();
(_, AccessList? accessList) = _beaconBlockRootHandler.BeaconRootsAccessList(suggestedBlock, _specProvider.GetSpec(suggestedBlock.Header));
preWarmTask = preWarmer.PreWarmCaches(suggestedBlock, preBlockStateRoot, accessList, cancellationTokenSource.Token);

(processedBlock, receipts) = ProcessOne(suggestedBlock, options, blockTracer);
// Block is processed, we can cancel the prewarm task
preWarmTask = preWarmTask.ContinueWith(_clearCaches).Unwrap();
cancellationTokenSource.Cancel();
}
cancellationTokenSource.Cancel();
else
{
(processedBlock, receipts) = ProcessOne(suggestedBlock, options, blockTracer);
}

processedBlocks[i] = processedBlock;

// be cautious here as AuRa depends on processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
using System.Threading.Tasks;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Eip2930;

namespace Nethermind.Consensus.Processing;

public interface IBlockCachePreWarmer
{
Task PreWarmCaches(Block suggestedBlock, Hash256 parentStateRoot, CancellationToken cancellationToken = default);
Task PreWarmCaches(Block suggestedBlock, Hash256 parentStateRoot, AccessList? systemTxAccessList, CancellationToken cancellationToken = default);
void ClearCaches();
Task ClearCachesInBackground();
}