Skip to content

Commit

Permalink
Blob txs reorgs (#6254)
Browse files Browse the repository at this point in the history
* add column for processed txs

* add feature of returning keys only

* adjust BlobTxStorage

* cosmetics

* add logic for storing processed blob txs and removing finalized

* rename

* fix

* add tests

* cosmetics

* add sender recovery for reorganized blob txs

* requested change

* one more place

* postmerge fix

* fix saving of processed blob txs

* optimize saving processed txs

* readd blob support config

* fix whitespace

* fix test

* fix tests parallelism?

* add more logs

* fix blob txs metric

* refactor BlobTxStorage

* optimize collecting blob txs from processed block

* Create IBlobTxStorage and separate blob specific methods from ITxStorage

* use batch when cleaning processed txs db

* fix

* fix file encoding

* fix init when blob support or reorgs disabled

* fix signing test txs

* add regression test

* fix tests

* one more regression test

* post-merge fix

* add test for readding reorganized blob txs

* improve test

* one more check in test

* Remove unnecessary list

Sleep -> Task in test

* cosmetic

* compress 3 bool flags to 1 enum

* fix file encoding

* fix whitespaces

---------

Co-authored-by: lukasz.rozmej <lukasz.rozmej@gmail.com>
  • Loading branch information
marcindsobczak and LukaszRozmej authored Dec 27, 2023
1 parent f3ed968 commit 8fa3263
Show file tree
Hide file tree
Showing 30 changed files with 562 additions and 77 deletions.
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Api/IApiWithStores.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace Nethermind.Api
{
public interface IApiWithStores : IBasicApi
{
ITxStorage? BlobTxStorage { get; set; }
IBlobTxStorage? BlobTxStorage { get; set; }
IBlockTree? BlockTree { get; set; }
IBloomStorage? BloomStorage { get; set; }
IChainLevelInfoRepository? ChainLevelInfoRepository { get; set; }
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Api/NethermindApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public IBlockchainBridge CreateBlockchainBridge()
}

public IAbiEncoder AbiEncoder { get; } = Nethermind.Abi.AbiEncoder.Instance;
public ITxStorage? BlobTxStorage { get; set; }
public IBlobTxStorage? BlobTxStorage { get; set; }
public IBlockchainProcessor? BlockchainProcessor { get; set; }
public CompositeBlockPreprocessorStep BlockPreprocessor { get; } = new();
public IBlockProcessingQueue? BlockProcessingQueue { get; set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ protected virtual TxPool.TxPool CreateTxPool() =>
EthereumEcdsa,
new BlobTxStorage(),
new ChainHeadInfoProvider(new FixedForkActivationChainHeadSpecProvider(SpecProvider), BlockTree, ReadOnlyState),
new TxPoolConfig() { BlobSupportEnabled = true },
new TxPoolConfig() { BlobsSupport = BlobsSupportMode.InMemory },
new TxValidator(SpecProvider.ChainId),
LogManager,
TransactionComparerProvider.GetDefaultComparer());
Expand Down
6 changes: 6 additions & 0 deletions src/Nethermind/Nethermind.Db.Rocks/ColumnDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public void PutSpan(ReadOnlySpan<byte> key, ReadOnlySpan<byte> value, WriteFlags
return _mainDb.GetAllCore(iterator);
}

public IEnumerable<byte[]> GetAllKeys(bool ordered = false)
{
Iterator iterator = _mainDb.CreateIterator(ordered, _columnFamily);
return _mainDb.GetAllKeysCore(iterator);
}

public IEnumerable<byte[]> GetAllValues(bool ordered = false)
{
Iterator iterator = _mainDb.CreateIterator(ordered, _columnFamily);
Expand Down
53 changes: 53 additions & 0 deletions src/Nethermind/Nethermind.Db.Rocks/DbOnTheRocks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,17 @@ protected internal Iterator CreateIterator(bool ordered = false, ColumnFamilyHan
}
}

public IEnumerable<byte[]> GetAllKeys(bool ordered = false)
{
if (_isDisposing)
{
throw new ObjectDisposedException($"Attempted to read form a disposed database {Name}");
}

Iterator iterator = CreateIterator(ordered);
return GetAllKeysCore(iterator);
}

public IEnumerable<byte[]> GetAllValues(bool ordered = false)
{
ObjectDisposedException.ThrowIf(_isDisposing, this);
Expand Down Expand Up @@ -708,6 +719,48 @@ internal IEnumerable<byte[]> GetAllValuesCore(Iterator iterator)
}
}

internal IEnumerable<byte[]> GetAllKeysCore(Iterator iterator)
{
try
{
try
{
iterator.SeekToFirst();
}
catch (RocksDbSharpException e)
{
CreateMarkerIfCorrupt(e);
throw;
}

while (iterator.Valid())
{
yield return iterator.Key();
try
{
iterator.Next();
}
catch (RocksDbSharpException e)
{
CreateMarkerIfCorrupt(e);
throw;
}
}
}
finally
{
try
{
iterator.Dispose();
}
catch (RocksDbSharpException e)
{
CreateMarkerIfCorrupt(e);
throw;
}
}
}

public IEnumerable<KeyValuePair<byte[], byte[]?>> GetAllCore(Iterator iterator)
{
try
Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Db.Rpc/RpcDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ public void Clear() { }

public IEnumerable<KeyValuePair<byte[], byte[]>> GetAll(bool ordered = false) => _recordDb.GetAll();

public IEnumerable<byte[]> GetAllKeys(bool ordered = false) => _recordDb.GetAllKeys();

public IEnumerable<byte[]> GetAllValues(bool ordered = false) => _recordDb.GetAllValues();

public IWriteBatch StartWriteBatch()
Expand Down
3 changes: 2 additions & 1 deletion src/Nethermind/Nethermind.Db/BlobTxsColumns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ namespace Nethermind.Db;
public enum BlobTxsColumns
{
FullBlobTxs,
LightBlobTxs
LightBlobTxs,
ProcessedTxs
}
3 changes: 3 additions & 0 deletions src/Nethermind/Nethermind.Db/CompressingDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ private static ReadOnlySpan<byte> Compress(ReadOnlySpan<byte> bytes, Span<byte>
public IEnumerable<KeyValuePair<byte[], byte[]>> GetAll(bool ordered = false) => _wrapped.GetAll(ordered)
.Select(kvp => new KeyValuePair<byte[], byte[]>(kvp.Key, Decompress(kvp.Value)));

public IEnumerable<byte[]> GetAllKeys(bool ordered = false) =>
_wrapped.GetAllKeys(ordered).Select(Decompress);

public IEnumerable<byte[]> GetAllValues(bool ordered = false) =>
_wrapped.GetAllValues(ordered).Select(Decompress);

Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Db/FullPruning/FullPruningDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public void Dispose()

public IEnumerable<KeyValuePair<byte[], byte[]>> GetAll(bool ordered = false) => _currentDb.GetAll(ordered);

public IEnumerable<byte[]> GetAllKeys(bool ordered = false) => _currentDb.GetAllKeys(ordered);

public IEnumerable<byte[]> GetAllValues(bool ordered = false) => _currentDb.GetAllValues(ordered);

// we need to remove from both DB's
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Db/IDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public interface IDb : IKeyValueStoreWithBatching, IDbMeta, IDisposable
string Name { get; }
KeyValuePair<byte[], byte[]?>[] this[byte[][] keys] { get; }
IEnumerable<KeyValuePair<byte[], byte[]?>> GetAll(bool ordered = false);
IEnumerable<byte[]> GetAllKeys(bool ordered = false);
IEnumerable<byte[]> GetAllValues(bool ordered = false);

public IReadOnlyDb CreateReadOnly(bool createInMemWriteStore) => new ReadOnlyDb(this, createInMemWriteStore);
Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Db/MemDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ public void Clear()

public IEnumerable<KeyValuePair<byte[], byte[]?>> GetAll(bool ordered = false) => _db;

public IEnumerable<byte[]> GetAllKeys(bool ordered = false) => Keys;

public IEnumerable<byte[]> GetAllValues(bool ordered = false) => Values;

public virtual IWriteBatch StartWriteBatch()
Expand Down
1 change: 1 addition & 0 deletions src/Nethermind/Nethermind.Db/NullDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public void Clear() { }

public IEnumerable<KeyValuePair<byte[], byte[]>> GetAll(bool ordered = false) => Enumerable.Empty<KeyValuePair<byte[], byte[]>>();

public IEnumerable<byte[]> GetAllKeys(bool ordered = false) => Enumerable.Empty<byte[]>();
public IEnumerable<byte[]> GetAllValues(bool ordered = false) => Enumerable.Empty<byte[]>();

public IWriteBatch StartWriteBatch()
Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Db/ReadOnlyDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public KeyValuePair<byte[], byte[]>[] this[byte[][] keys]

public IEnumerable<KeyValuePair<byte[], byte[]>> GetAll(bool ordered = false) => _memDb.GetAll();

public IEnumerable<byte[]> GetAllKeys(bool ordered = false) => _memDb.GetAllKeys();

public IEnumerable<byte[]> GetAllValues(bool ordered = false) => _memDb.GetAllValues();

public IWriteBatch StartWriteBatch()
Expand Down
2 changes: 2 additions & 0 deletions src/Nethermind/Nethermind.Db/SimpleFilePublicKeyDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public void Clear()

public IEnumerable<KeyValuePair<byte[], byte[]>> GetAll(bool ordered = false) => _cache;

public IEnumerable<byte[]> GetAllKeys(bool ordered = false) => _cache.Keys;

public IEnumerable<byte[]> GetAllValues(bool ordered = false) => _cache.Values;

public IWriteBatch StartWriteBatch()
Expand Down
2 changes: 1 addition & 1 deletion src/Nethermind/Nethermind.Init/Steps/InitDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public virtual async Task Execute(CancellationToken _)
try
{
bool useReceiptsDb = initConfig.StoreReceipts || syncConfig.DownloadReceiptsInFastSync;
bool useBlobsDb = txPoolConfig is { BlobSupportEnabled: true, PersistentBlobStorageEnabled: true };
bool useBlobsDb = txPoolConfig.BlobsSupport.IsPersistentStorage();
InitDbApi(initConfig, dbConfig, initConfig.StoreReceipts || syncConfig.DownloadReceiptsInFastSync);
StandardDbInitializer dbInitializer = new(_api.DbProvider, _api.RocksDbFactory, _api.MemDbFactory, _api.FileSystem);
await dbInitializer.InitStandardDbsAsync(useReceiptsDb, useBlobsDb);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System.Threading.Tasks;
using FluentAssertions;
using Nethermind.Blockchain;
using Nethermind.Core;
using Nethermind.Core.Specs;
using Nethermind.Core.Test.Builders;
using Nethermind.Crypto;
using Nethermind.Db;
using Nethermind.Int256;
using Nethermind.Logging;
using Nethermind.Specs;
using Nethermind.TxPool;
using NSubstitute;
using NUnit.Framework;

namespace Nethermind.Merge.Plugin.Test;

[TestFixture]
[Parallelizable(ParallelScope.All)]
public class ProcessedTransactionsDbCleanerTests
{
private readonly ILogManager _logManager = LimboLogs.Instance;
private readonly ISpecProvider _specProvider = MainnetSpecProvider.Instance;

[Test]
public async Task should_remove_processed_txs_from_db_after_finalization([Values(0, 1, 42, 358)] long blockOfTxs, [Values(1, 42, 358)] long finalizedBlock)
{
Transaction GetTx(PrivateKey sender)
{
return Build.A.Transaction
.WithShardBlobTxTypeAndFields()
.WithMaxFeePerGas(UInt256.One)
.WithMaxPriorityFeePerGas(UInt256.One)
.WithNonce(UInt256.Zero)
.SignedAndResolved(new EthereumEcdsa(_specProvider.ChainId, _logManager), sender).TestObject;
}

IColumnsDb<BlobTxsColumns> columnsDb = new MemColumnsDb<BlobTxsColumns>(BlobTxsColumns.ProcessedTxs);
BlobTxStorage blobTxStorage = new(columnsDb);
Transaction[] txs = { GetTx(TestItem.PrivateKeyA), GetTx(TestItem.PrivateKeyB) };

blobTxStorage.AddBlobTransactionsFromBlock(blockOfTxs, txs);

blobTxStorage.TryGetBlobTransactionsFromBlock(blockOfTxs, out Transaction[]? returnedTxs).Should().BeTrue();
returnedTxs!.Length.Should().Be(2);

IBlockFinalizationManager finalizationManager = Substitute.For<IBlockFinalizationManager>();
ProcessedTransactionsDbCleaner dbCleaner = new(finalizationManager, columnsDb.GetColumnDb(BlobTxsColumns.ProcessedTxs), _logManager);

finalizationManager.BlocksFinalized += Raise.EventWith(
new FinalizeEventArgs(Build.A.BlockHeader.TestObject,
Build.A.BlockHeader.WithNumber(finalizedBlock).TestObject));

await dbCleaner.CleaningTask;

blobTxStorage.TryGetBlobTransactionsFromBlock(blockOfTxs, out returnedTxs).Should().Be(blockOfTxs > finalizedBlock);
}
}
12 changes: 8 additions & 4 deletions src/Nethermind/Nethermind.Merge.Plugin/MergePlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
Expand All @@ -16,7 +15,6 @@
using Nethermind.Consensus.Rewards;
using Nethermind.Consensus.Validators;
using Nethermind.Core;
using Nethermind.Core.Crypto;
using Nethermind.Core.Exceptions;
using Nethermind.Db;
using Nethermind.Facade.Proxy;
Expand All @@ -26,14 +24,13 @@
using Nethermind.Logging;
using Nethermind.Merge.Plugin.BlockProduction;
using Nethermind.Merge.Plugin.BlockProduction.Boost;
using Nethermind.Merge.Plugin.Data;
using Nethermind.Merge.Plugin.GC;
using Nethermind.Merge.Plugin.Handlers;
using Nethermind.Merge.Plugin.InvalidChainTracker;
using Nethermind.Merge.Plugin.Synchronization;
using Nethermind.Synchronization.ParallelSync;
using Nethermind.Synchronization.Reporting;
using Nethermind.Trie.Pruning;
using Nethermind.TxPool;

namespace Nethermind.Merge.Plugin;

Expand All @@ -44,6 +41,7 @@ public partial class MergePlugin : IConsensusWrapperPlugin, ISynchronizationPlug
protected IMergeConfig _mergeConfig = null!;
private ISyncConfig _syncConfig = null!;
protected IBlocksConfig _blocksConfig = null!;
protected ITxPoolConfig _txPoolConfig = null!;
protected IPoSSwitcher _poSSwitcher = NoPoS.Instance;
private IBeaconPivot? _beaconPivot;
private BeaconSync? _beaconSync;
Expand All @@ -70,6 +68,7 @@ public virtual Task Init(INethermindApi nethermindApi)
_mergeConfig = nethermindApi.Config<IMergeConfig>();
_syncConfig = nethermindApi.Config<ISyncConfig>();
_blocksConfig = nethermindApi.Config<IBlocksConfig>();
_txPoolConfig = nethermindApi.Config<ITxPoolConfig>();

MigrateSecondsPerSlot(_blocksConfig, _mergeConfig);

Expand Down Expand Up @@ -102,6 +101,11 @@ public virtual Task Init(INethermindApi nethermindApi)
_api.PoSSwitcher = _poSSwitcher;
_api.DisposeStack.Push(_invalidChainTracker);
_blockFinalizationManager = new ManualBlockFinalizationManager();
if (_txPoolConfig.BlobsSupport.SupportsReorgs())
{
ProcessedTransactionsDbCleaner processedTransactionsDbCleaner = new(_blockFinalizationManager, _api.DbProvider.BlobTransactionsDb.GetColumnDb(BlobTxsColumns.ProcessedTxs), _api.LogManager);
_api.DisposeStack.Push(processedTransactionsDbCleaner);
}

_api.RewardCalculatorSource = new MergeRewardCalculatorSource(
_api.RewardCalculatorSource ?? NoBlockRewards.Instance, _poSSwitcher);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// SPDX-FileCopyrightText: 2023 Demerzel Solutions Limited
// SPDX-License-Identifier: LGPL-3.0-only

using System;
using System.Threading.Tasks;
using Nethermind.Blockchain;
using Nethermind.Core;
using Nethermind.Core.Extensions;
using Nethermind.Db;
using Nethermind.Logging;

namespace Nethermind.Merge.Plugin;

public class ProcessedTransactionsDbCleaner : IDisposable
{
private readonly IBlockFinalizationManager _finalizationManager;
private readonly IDb _processedTxsDb;
private readonly ILogger _logger;
private long _lastFinalizedBlock = 0;
public Task CleaningTask { get; private set; } = Task.CompletedTask;

public ProcessedTransactionsDbCleaner(IBlockFinalizationManager finalizationManager, IDb processedTxsDb, ILogManager logManager)
{
_finalizationManager = finalizationManager ?? throw new ArgumentNullException(nameof(finalizationManager));
_processedTxsDb = processedTxsDb ?? throw new ArgumentNullException(nameof(processedTxsDb));
_logger = logManager.GetClassLogger() ?? throw new ArgumentNullException(nameof(logManager));

_finalizationManager.BlocksFinalized += OnBlocksFinalized;
}

private void OnBlocksFinalized(object? sender, FinalizeEventArgs e)
{
if (e.FinalizedBlocks.Count > 0 && e.FinalizedBlocks[0].Number > _lastFinalizedBlock)
{
CleaningTask = Task.Run(() => CleanProcessedTransactionsDb(e.FinalizedBlocks[0].Number));
}
}

private void CleanProcessedTransactionsDb(long newlyFinalizedBlockNumber)
{
try
{
using (IWriteBatch writeBatch = _processedTxsDb.StartWriteBatch())
{
foreach (byte[] key in _processedTxsDb.GetAllKeys())
{
long blockNumber = key.ToLongFromBigEndianByteArrayWithoutLeadingZeros();
if (newlyFinalizedBlockNumber >= blockNumber)
{
if (_logger.IsTrace) _logger.Trace($"Cleaning processed blob txs from block {blockNumber}");
writeBatch.Delete(blockNumber);
}
}
}

if (_logger.IsDebug) _logger.Debug($"Cleaned processed blob txs from block {_lastFinalizedBlock} to block {newlyFinalizedBlockNumber}");

_lastFinalizedBlock = newlyFinalizedBlockNumber;
}
catch (Exception exception)
{
if (_logger.IsError) _logger.Error($"Couldn't correctly clean db with processed transactions. Newly finalized block {newlyFinalizedBlockNumber}, last finalized block: {_lastFinalizedBlock}", exception);
}
}

public void Dispose()
{
_finalizationManager.BlocksFinalized -= OnBlocksFinalized;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void RequestTransactionsEth68(Action<V66.Messages.GetPooledTransactionsMe
packetSizeLeft = TransactionsMessage.MaxPacketSize;
}

if (_txPoolConfig.BlobSupportEnabled || txType != TxType.Blob)
if (_txPoolConfig.BlobsSupport.IsEnabled() || txType != TxType.Blob)
{
hashesToRequest.Add(discoveredTxHashesAndSizes[i].Hash);
packetSizeLeft -= txSize;
Expand Down
Loading

0 comments on commit 8fa3263

Please sign in to comment.