From 31874ba5a4abbc2dca7b985f04019485a339a71c Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com> Date: Mon, 23 Sep 2024 13:07:00 +0200 Subject: [PATCH] BCI-3668 Optimise HeadTracker's memory usage (#14130) * base benchmarks * Reduce allocs noise from mocks in Backfill Benchmark * Optimize HeadTracker's memory usage * avoid .Parent race in confirmer * optimise block history estimator heads usage * update telemetry test to use new Parent and IsFinalized * avoid redundant allocations * Revert "avoid redundant allocations" This reverts commit 76343e573980ac81df6d6e3b3409a3d164aeb579. * lint fixes * remove redundant files * drop cycle detection logic from head methods. Heads cycle prevention should be enough * drop unused test * nits --- .changeset/short-shoes-crash.md | 5 + common/txmgr/confirmer.go | 5 +- .../evm/client/simulated_backend_client.go | 10 +- .../chains/evm/gas/block_history_estimator.go | 3 +- .../evm/gas/block_history_estimator_test.go | 6 +- core/chains/evm/headtracker/head_saver.go | 12 +- core/chains/evm/headtracker/head_tracker.go | 3 +- .../evm/headtracker/head_tracker_test.go | 299 +++++++----------- core/chains/evm/headtracker/heads.go | 163 ++++++---- core/chains/evm/headtracker/heads_test.go | 125 ++++++-- core/chains/evm/headtracker/heap.go | 35 ++ core/chains/evm/headtracker/types/types.go | 4 + core/chains/evm/log/broadcaster.go | 4 +- core/chains/evm/log/registrations.go | 5 +- core/chains/evm/logpoller/log_poller.go | 6 +- .../evm/logpoller/log_poller_internal_test.go | 3 +- core/chains/evm/txmgr/confirmer_test.go | 115 +++---- core/chains/evm/txmgr/evm_tx_store_test.go | 42 +-- core/chains/evm/txmgr/finalizer_test.go | 35 +- core/chains/evm/txmgr/txmgr_test.go | 15 +- core/chains/evm/types/head_test.go | 51 ++- core/chains/evm/types/models.go | 97 ++---- core/chains/evm/types/models_test.go | 97 +++--- core/internal/cltest/cltest.go | 6 +- core/internal/cltest/factories.go | 5 +- .../headreporter/telemetry_reporter_test.go | 35 +- .../evmregistry/v21/block_subscriber.go | 2 +- .../evmregistry/v21/block_subscriber_test.go | 17 +- .../arbitrum_block_translator_test.go | 53 ++-- .../relay/evm/mercury/v1/data_source_test.go | 7 +- 30 files changed, 681 insertions(+), 584 deletions(-) create mode 100644 .changeset/short-shoes-crash.md create mode 100644 core/chains/evm/headtracker/heap.go diff --git a/.changeset/short-shoes-crash.md b/.changeset/short-shoes-crash.md new file mode 100644 index 00000000000..30431916241 --- /dev/null +++ b/.changeset/short-shoes-crash.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Optimize HeadTracker's memory usage #internal diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index bbf1d3b27b7..4eaa6739d58 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -1148,10 +1148,11 @@ func hasReceiptInLongestChain[ } } } - if head.GetParent() == nil { + + head = head.GetParent() + if head == nil { return false } - head = head.GetParent() } } diff --git a/core/chains/evm/client/simulated_backend_client.go b/core/chains/evm/client/simulated_backend_client.go index 6c569b16b85..11828e58710 100644 --- a/core/chains/evm/client/simulated_backend_client.go +++ b/core/chains/evm/client/simulated_backend_client.go @@ -320,7 +320,15 @@ func (c *SimulatedBackendClient) SubscribeNewHead( case h := <-ch: var head *evmtypes.Head if h != nil { - head = &evmtypes.Head{Difficulty: h.Difficulty, Timestamp: time.Unix(int64(h.Time), 0), Number: h.Number.Int64(), Hash: h.Hash(), ParentHash: h.ParentHash, Parent: lastHead, EVMChainID: ubig.New(c.chainId)} + head = &evmtypes.Head{ + Difficulty: h.Difficulty, + Timestamp: time.Unix(int64(h.Time), 0), //nolint:gosec + Number: h.Number.Int64(), + Hash: h.Hash(), + ParentHash: h.ParentHash, + EVMChainID: ubig.New(c.chainId), + } + head.Parent.Store(lastHead) lastHead = head } select { diff --git a/core/chains/evm/gas/block_history_estimator.go b/core/chains/evm/gas/block_history_estimator.go index b933ea23825..0386d92a0d6 100644 --- a/core/chains/evm/gas/block_history_estimator.go +++ b/core/chains/evm/gas/block_history_estimator.go @@ -655,12 +655,13 @@ func (b *BlockHistoryEstimator) FetchBlocks(ctx context.Context, head *evmtypes. } blocks := make(map[int64]evmtypes.Block) + earliestInChain := head.EarliestInChain() for _, block := range b.getBlocks() { // Make a best-effort to be re-org resistant using the head // chain, refetch blocks that got re-org'd out. // NOTE: Any blocks in the history that are older than the oldest block // in the provided chain will be assumed final. - if block.Number < head.EarliestInChain().BlockNumber() { + if block.Number < earliestInChain.BlockNumber() { blocks[block.Number] = block } else if head.IsInChain(block.Hash) { blocks[block.Number] = block diff --git a/core/chains/evm/gas/block_history_estimator_test.go b/core/chains/evm/gas/block_history_estimator_test.go index c2f4a2219cb..d84137cb7cf 100644 --- a/core/chains/evm/gas/block_history_estimator_test.go +++ b/core/chains/evm/gas/block_history_estimator_test.go @@ -515,7 +515,7 @@ func TestBlockHistoryEstimator_FetchBlocks(t *testing.T) { head2 := evmtypes.NewHead(big.NewInt(2), b2.Hash, b1.Hash, uint64(time.Now().Unix()), ubig.New(testutils.FixtureChainID)) head3 := evmtypes.NewHead(big.NewInt(3), b3.Hash, b2.Hash, uint64(time.Now().Unix()), ubig.New(testutils.FixtureChainID)) - head3.Parent = &head2 + head3.Parent.Store(&head2) err := bhe.FetchBlocks(tests.Context(t), &head3) require.NoError(t, err) @@ -570,7 +570,7 @@ func TestBlockHistoryEstimator_FetchBlocks(t *testing.T) { // RE-ORG, head2 and head3 have different hash than saved b2 and b3 head2 := evmtypes.NewHead(big.NewInt(2), utils.NewHash(), b1.Hash, uint64(time.Now().Unix()), ubig.New(testutils.FixtureChainID)) head3 := evmtypes.NewHead(big.NewInt(3), utils.NewHash(), head2.Hash, uint64(time.Now().Unix()), ubig.New(testutils.FixtureChainID)) - head3.Parent = &head2 + head3.Parent.Store(&head2) ethClient.On("BatchCallContext", mock.Anything, mock.MatchedBy(func(b []rpc.BatchElem) bool { return len(b) == 2 && @@ -643,7 +643,7 @@ func TestBlockHistoryEstimator_FetchBlocks(t *testing.T) { // head2 and head3 have identical hash to saved blocks head2 := evmtypes.NewHead(big.NewInt(2), b2.Hash, b1.Hash, uint64(time.Now().Unix()), ubig.New(testutils.FixtureChainID)) head3 := evmtypes.NewHead(big.NewInt(3), b3.Hash, head2.Hash, uint64(time.Now().Unix()), ubig.New(testutils.FixtureChainID)) - head3.Parent = &head2 + head3.Parent.Store(&head2) err := bhe.FetchBlocks(tests.Context(t), &head3) require.NoError(t, err) diff --git a/core/chains/evm/headtracker/head_saver.go b/core/chains/evm/headtracker/head_saver.go index 320e88a19bc..f2613334c49 100644 --- a/core/chains/evm/headtracker/head_saver.go +++ b/core/chains/evm/headtracker/head_saver.go @@ -35,13 +35,12 @@ func NewHeadSaver(lggr logger.Logger, orm ORM, config commontypes.Config, htConf } func (hs *headSaver) Save(ctx context.Context, head *evmtypes.Head) error { - if err := hs.orm.IdempotentInsertHead(ctx, head); err != nil { + // adding new head might form a cycle, so it's better to validate cached chain before persisting it + if err := hs.heads.AddHeads(head); err != nil { return err } - hs.heads.AddHeads(head) - - return nil + return hs.orm.IdempotentInsertHead(ctx, head) } func (hs *headSaver) Load(ctx context.Context, latestFinalized int64) (chain *evmtypes.Head, err error) { @@ -51,7 +50,10 @@ func (hs *headSaver) Load(ctx context.Context, latestFinalized int64) (chain *ev return nil, err } - hs.heads.AddHeads(heads...) + err = hs.heads.AddHeads(heads...) + if err != nil { + return nil, fmt.Errorf("failed to populate cache with loaded heads: %w", err) + } return hs.heads.LatestHead(), nil } diff --git a/core/chains/evm/headtracker/head_tracker.go b/core/chains/evm/headtracker/head_tracker.go index f7607189f7e..bb39b3b5c79 100644 --- a/core/chains/evm/headtracker/head_tracker.go +++ b/core/chains/evm/headtracker/head_tracker.go @@ -11,14 +11,13 @@ import ( "github.com/smartcontractkit/chainlink/v2/common/headtracker" commontypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types" - evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) func NewHeadTracker( lggr logger.Logger, - ethClient evmclient.Client, + ethClient httypes.Client, config commontypes.Config, htConfig commontypes.HeadTrackerConfig, headBroadcaster httypes.HeadBroadcaster, diff --git a/core/chains/evm/headtracker/head_tracker_test.go b/core/chains/evm/headtracker/head_tracker_test.go index 21ff1b1a929..de54f12ff96 100644 --- a/core/chains/evm/headtracker/head_tracker_test.go +++ b/core/chains/evm/headtracker/head_tracker_test.go @@ -12,7 +12,6 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" - gethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/onsi/gomega" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -21,21 +20,19 @@ import ( "go.uber.org/zap/zaptest/observer" "golang.org/x/exp/maps" + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox/mailboxtest" "github.com/jmoiron/sqlx" - commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" - "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" htmocks "github.com/smartcontractkit/chainlink/v2/common/headtracker/mocks" commontypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" - - evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker" @@ -45,11 +42,13 @@ import ( evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" ) -func firstHead(t *testing.T, db *sqlx.DB) (h evmtypes.Head) { - if err := db.Get(&h, `SELECT * FROM evm.heads ORDER BY number ASC LIMIT 1`); err != nil { +func firstHead(t *testing.T, db *sqlx.DB) *evmtypes.Head { + h := new(evmtypes.Head) + if err := db.Get(h, `SELECT * FROM evm.heads ORDER BY number ASC LIMIT 1`); err != nil { t.Fatal(err) } return h @@ -578,27 +577,10 @@ func TestHeadTracker_SwitchesToLongestChainWithHeadSamplingEnabled(t *testing.T) checker.On("OnNewLongestChain", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { h := args.Get(1).(*evmtypes.Head) + // This is the new longest chain [0, 5], check that it came with its parents + assert.Equal(t, uint32(6), h.ChainLength()) + assertChainWithParents(t, blocksForked, 5, 1, h) - assert.Equal(t, int64(5), h.Number) - assert.Equal(t, blocksForked.Head(5).Hash, h.Hash) - - // This is the new longest chain, check that it came with its parents - if !assert.NotNil(t, h.Parent) { - return - } - assert.Equal(t, h.Parent.Hash, blocksForked.Head(4).Hash) - if !assert.NotNil(t, h.Parent.Parent) { - return - } - assert.Equal(t, h.Parent.Parent.Hash, blocksForked.Head(3).Hash) - if !assert.NotNil(t, h.Parent.Parent.Parent) { - return - } - assert.Equal(t, h.Parent.Parent.Parent.Hash, blocksForked.Head(2).Hash) - if !assert.NotNil(t, h.Parent.Parent.Parent.Parent) { - return - } - assert.Equal(t, h.Parent.Parent.Parent.Parent.Hash, blocksForked.Head(1).Hash) lastLongestChainAwaiter.ItHappened() }).Return().Once() @@ -639,6 +621,16 @@ func TestHeadTracker_SwitchesToLongestChainWithHeadSamplingEnabled(t *testing.T) } } +func assertChainWithParents(t testing.TB, blocks *blocks, startBN, endBN uint64, h *evmtypes.Head) { + for blockNumber := startBN; blockNumber >= endBN; blockNumber-- { + assert.NotNil(t, h) + assert.Equal(t, blockNumber, uint64(h.Number)) + assert.Equal(t, blocks.Head(blockNumber).Hash, h.Hash) + // move to parent + h = h.Parent.Load() + } +} + func TestHeadTracker_SwitchesToLongestChainWithHeadSamplingDisabled(t *testing.T) { t.Parallel() @@ -725,34 +717,13 @@ func TestHeadTracker_SwitchesToLongestChainWithHeadSamplingDisabled(t *testing.T checker.On("OnNewLongestChain", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { h := args.Get(1).(*evmtypes.Head) - require.Equal(t, int64(4), h.Number) - require.Equal(t, blocks.Head(4).Hash, h.Hash) - - // Check that the block came with its parents - require.NotNil(t, h.Parent) - require.Equal(t, h.Parent.Hash, blocks.Head(3).Hash) - require.NotNil(t, h.Parent.Parent.Hash) - require.Equal(t, h.Parent.Parent.Hash, blocks.Head(2).Hash) - require.NotNil(t, h.Parent.Parent.Parent) - require.Equal(t, h.Parent.Parent.Parent.Hash, blocks.Head(1).Hash) + assertChainWithParents(t, blocks, 4, 1, h) }).Return().Once() checker.On("OnNewLongestChain", mock.Anything, mock.Anything). Run(func(args mock.Arguments) { h := args.Get(1).(*evmtypes.Head) - - require.Equal(t, int64(5), h.Number) - require.Equal(t, blocksForked.Head(5).Hash, h.Hash) - - // This is the new longest chain, check that it came with its parents - require.NotNil(t, h.Parent) - require.Equal(t, h.Parent.Hash, blocksForked.Head(4).Hash) - require.NotNil(t, h.Parent.Parent) - require.Equal(t, h.Parent.Parent.Hash, blocksForked.Head(3).Hash) - require.NotNil(t, h.Parent.Parent.Parent) - require.Equal(t, h.Parent.Parent.Parent.Hash, blocksForked.Head(2).Hash) - require.NotNil(t, h.Parent.Parent.Parent.Parent) - require.Equal(t, h.Parent.Parent.Parent.Parent.Hash, blocksForked.Head(1).Hash) + assertChainWithParents(t, blocksForked, 5, 1, h) lastLongestChainAwaiter.ItHappened() }).Return().Once() @@ -811,52 +782,37 @@ func TestHeadTracker_Backfill(t *testing.T) { now := uint64(time.Now().UTC().Unix()) - gethHead0 := &gethTypes.Header{ - Number: big.NewInt(0), - ParentHash: common.BigToHash(big.NewInt(0)), - Time: now, - } - head0 := evmtypes.NewHead(gethHead0.Number, utils.NewHash(), gethHead0.ParentHash, gethHead0.Time, ubig.New(testutils.FixtureChainID)) + head0 := evmtypes.NewHead(big.NewInt(0), utils.NewHash(), common.BigToHash(big.NewInt(0)), now, ubig.New(testutils.FixtureChainID)) - h1 := *testutils.Head(1) + h1 := testutils.Head(1) h1.ParentHash = head0.Hash - gethHead8 := &gethTypes.Header{ - Number: big.NewInt(8), - ParentHash: utils.NewHash(), - Time: now, - } - head8 := evmtypes.NewHead(gethHead8.Number, utils.NewHash(), gethHead8.ParentHash, gethHead8.Time, ubig.New(testutils.FixtureChainID)) + head8 := evmtypes.NewHead(big.NewInt(8), utils.NewHash(), utils.NewHash(), now, ubig.New(testutils.FixtureChainID)) - h9 := *testutils.Head(9) + h9 := testutils.Head(9) h9.ParentHash = head8.Hash - gethHead10 := &gethTypes.Header{ - Number: big.NewInt(10), - ParentHash: h9.Hash, - Time: now, - } - head10 := evmtypes.NewHead(gethHead10.Number, utils.NewHash(), gethHead10.ParentHash, gethHead10.Time, ubig.New(testutils.FixtureChainID)) + head10 := evmtypes.NewHead(big.NewInt(10), utils.NewHash(), h9.Hash, now, ubig.New(testutils.FixtureChainID)) - h11 := *testutils.Head(11) + h11 := testutils.Head(11) h11.ParentHash = head10.Hash - h12 := *testutils.Head(12) + h12 := testutils.Head(12) h12.ParentHash = h11.Hash - h13 := *testutils.Head(13) + h13 := testutils.Head(13) h13.ParentHash = h12.Hash - h14Orphaned := *testutils.Head(14) + h14Orphaned := testutils.Head(14) h14Orphaned.ParentHash = h13.Hash - h14 := *testutils.Head(14) + h14 := testutils.Head(14) h14.ParentHash = h13.Hash - h15 := *testutils.Head(15) + h15 := testutils.Head(15) h15.ParentHash = h14.Hash - heads := []evmtypes.Head{ + heads := []*evmtypes.Head{ h9, h11, h12, @@ -869,7 +825,7 @@ func TestHeadTracker_Backfill(t *testing.T) { ctx := tests.Context(t) type opts struct { - Heads []evmtypes.Head + Heads []*evmtypes.Head FinalityTagEnabled bool FinalizedBlockOffset uint32 FinalityDepth uint32 @@ -889,7 +845,7 @@ func TestHeadTracker_Backfill(t *testing.T) { db := pgtest.NewSqlxDB(t) orm := headtracker.NewORM(*testutils.FixtureChainID, db) for i := range opts.Heads { - require.NoError(t, orm.IdempotentInsertHead(tests.Context(t), &opts.Heads[i])) + require.NoError(t, orm.IdempotentInsertHead(tests.Context(t), opts.Heads[i])) } ethClient := testutils.NewEthClientMock(t) ethClient.On("ConfiguredChainID", mock.Anything).Return(evmcfg.EVM().ChainID(), nil) @@ -904,72 +860,70 @@ func TestHeadTracker_Backfill(t *testing.T) { const expectedError = "failed to fetch latest finalized block" htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(nil, errors.New(expectedError)).Once() - err := htu.headTracker.Backfill(ctx, &h12) + err := htu.headTracker.Backfill(ctx, h12) require.ErrorContains(t, err, expectedError) }) t.Run("returns error if latestFinalized is not valid", func(t *testing.T) { htu := newHeadTrackerUniverse(t, opts{FinalityTagEnabled: true}) htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(nil, nil).Once() - err := htu.headTracker.Backfill(ctx, &h12) + err := htu.headTracker.Backfill(ctx, h12) require.EqualError(t, err, "failed to calculate finalized block: failed to get valid latest finalized block") }) t.Run("Returns error if finality gap is too big", func(t *testing.T) { htu := newHeadTrackerUniverse(t, opts{FinalityTagEnabled: true, MaxAllowedFinalityDepth: 2}) - htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&h9, nil).Once() + htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(h9, nil).Once() - err := htu.headTracker.Backfill(ctx, &h12) + err := htu.headTracker.Backfill(ctx, h12) require.EqualError(t, err, "gap between latest finalized block (9) and current head (12) is too large (> 2)") }) t.Run("Returns error if finalized head is ahead of canonical", func(t *testing.T) { htu := newHeadTrackerUniverse(t, opts{FinalityTagEnabled: true}) - htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&h14Orphaned, nil).Once() + htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(h14Orphaned, nil).Once() - err := htu.headTracker.Backfill(ctx, &h12) + err := htu.headTracker.Backfill(ctx, h12) require.EqualError(t, err, "invariant violation: expected head of canonical chain to be ahead of the latestFinalized") }) t.Run("Returns error if finalizedHead is not present in the canonical chain", func(t *testing.T) { htu := newHeadTrackerUniverse(t, opts{Heads: heads, FinalityTagEnabled: true}) - htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&h14Orphaned, nil).Once() + htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(h14Orphaned, nil).Once() - err := htu.headTracker.Backfill(ctx, &h15) + err := htu.headTracker.Backfill(ctx, h15) require.EqualError(t, err, "expected finalized block to be present in canonical chain") }) t.Run("Marks all blocks in chain that are older than finalized", func(t *testing.T) { htu := newHeadTrackerUniverse(t, opts{Heads: heads, FinalityTagEnabled: true}) - assertFinalized := func(expectedFinalized bool, msg string, heads ...evmtypes.Head) { + assertFinalized := func(expectedFinalized bool, msg string, heads ...*evmtypes.Head) { for _, h := range heads { storedHead := htu.headSaver.Chain(h.Hash) - assert.Equal(t, expectedFinalized, storedHead != nil && storedHead.IsFinalized, msg, "block_number", h.Number) + assert.Equal(t, expectedFinalized, storedHead != nil && storedHead.IsFinalized.Load(), msg, "block_number", h.Number) } } - htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&h14, nil).Once() - err := htu.headTracker.Backfill(ctx, &h15) + htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(h14, nil).Once() + err := htu.headTracker.Backfill(ctx, h15) require.NoError(t, err) assertFinalized(true, "expected heads to be marked as finalized after backfill", h14, h13, h12, h11) - assertFinalized(false, "expected heads to remain unfinalized", h15, head10) + assertFinalized(false, "expected heads to remain unfinalized", h15, &head10) }) t.Run("fetches a missing head", func(t *testing.T) { htu := newHeadTrackerUniverse(t, opts{Heads: heads, FinalityTagEnabled: true}) - htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&h9, nil).Once() + htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(h9, nil).Once() htu.ethClient.On("HeadByHash", mock.Anything, head10.Hash). Return(&head10, nil) - err := htu.headTracker.Backfill(ctx, &h12) + err := htu.headTracker.Backfill(ctx, h12) require.NoError(t, err) h := htu.headSaver.Chain(h12.Hash) - assert.Equal(t, int64(12), h.Number) - require.NotNil(t, h.Parent) - assert.Equal(t, int64(11), h.Parent.Number) - require.NotNil(t, h.Parent.Parent) - assert.Equal(t, int64(10), h.Parent.Parent.Number) - require.NotNil(t, h.Parent.Parent.Parent) - assert.Equal(t, int64(9), h.Parent.Parent.Parent.Number) + for expectedBlockNumber := int64(12); expectedBlockNumber >= 9; expectedBlockNumber-- { + require.NotNil(t, h) + assert.Equal(t, expectedBlockNumber, h.Number) + h = h.Parent.Load() + } writtenHead, err := htu.orm.HeadByHash(tests.Context(t), head10.Hash) require.NoError(t, err) @@ -984,7 +938,7 @@ func TestHeadTracker_Backfill(t *testing.T) { htu.ethClient.On("HeadByHash", mock.Anything, head8.Hash). Return(&head8, nil) - err := htu.headTracker.Backfill(ctx, &h15) + err := htu.headTracker.Backfill(ctx, h15) require.NoError(t, err) h := htu.headSaver.Chain(h15.Hash) @@ -1005,7 +959,7 @@ func TestHeadTracker_Backfill(t *testing.T) { Return(nil, ethereum.NotFound). Once() - err := htu.headTracker.Backfill(ctx, &h12) + err := htu.headTracker.Backfill(ctx, h12) require.Error(t, err) require.ErrorContains(t, err, "fetchAndSaveHead failed: not found") @@ -1027,7 +981,7 @@ func TestHeadTracker_Backfill(t *testing.T) { cancel() }) - err := htu.headTracker.Backfill(lctx, &h12) + err := htu.headTracker.Backfill(lctx, h12) require.Error(t, err) require.ErrorContains(t, err, "fetchAndSaveHead failed: context canceled") @@ -1039,12 +993,12 @@ func TestHeadTracker_Backfill(t *testing.T) { }) t.Run("abandons backfill and returns error when fetching a block by hash fails, indicating a reorg", func(t *testing.T) { htu := newHeadTrackerUniverse(t, opts{FinalityTagEnabled: true}) - htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&h11, nil).Once() - htu.ethClient.On("HeadByHash", mock.Anything, h14.Hash).Return(&h14, nil).Once() - htu.ethClient.On("HeadByHash", mock.Anything, h13.Hash).Return(&h13, nil).Once() + htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(h11, nil).Once() + htu.ethClient.On("HeadByHash", mock.Anything, h14.Hash).Return(h14, nil).Once() + htu.ethClient.On("HeadByHash", mock.Anything, h13.Hash).Return(h13, nil).Once() htu.ethClient.On("HeadByHash", mock.Anything, h12.Hash).Return(nil, errors.New("not found")).Once() - err := htu.headTracker.Backfill(ctx, &h15) + err := htu.headTracker.Backfill(ctx, h15) require.Error(t, err) require.ErrorContains(t, err, "fetchAndSaveHead failed: not found") @@ -1056,83 +1010,83 @@ func TestHeadTracker_Backfill(t *testing.T) { assert.Equal(t, int64(13), h.EarliestInChain().BlockNumber()) }) t.Run("marks head as finalized, if latestHead = finalizedHead (0 finality depth)", func(t *testing.T) { - htu := newHeadTrackerUniverse(t, opts{Heads: []evmtypes.Head{h15}, FinalityTagEnabled: true}) + htu := newHeadTrackerUniverse(t, opts{Heads: []*evmtypes.Head{h15}, FinalityTagEnabled: true}) finalizedH15 := h15 // copy h15 to have different addresses - htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&finalizedH15, nil).Once() - err := htu.headTracker.Backfill(ctx, &h15) + htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(finalizedH15, nil).Once() + err := htu.headTracker.Backfill(ctx, h15) require.NoError(t, err) h := htu.headSaver.LatestChain() // Should contain 14, 13 (15 was never added). When trying to get the parent of h13 by hash, a reorg happened and backfill exited. assert.Equal(t, 1, int(h.ChainLength())) - assert.True(t, h.IsFinalized) + assert.True(t, h.IsFinalized.Load()) assert.Equal(t, h15.BlockNumber(), h.BlockNumber()) assert.Equal(t, h15.Hash, h.Hash) }) t.Run("marks block as finalized according to FinalizedBlockOffset (finality tag)", func(t *testing.T) { - htu := newHeadTrackerUniverse(t, opts{Heads: []evmtypes.Head{h15}, FinalityTagEnabled: true, FinalizedBlockOffset: 2}) - htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&h14, nil).Once() + htu := newHeadTrackerUniverse(t, opts{Heads: []*evmtypes.Head{h15}, FinalityTagEnabled: true, FinalizedBlockOffset: 2}) + htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(h14, nil).Once() // calculateLatestFinalizedBlock fetches blocks at LatestFinalized - FinalizedBlockOffset - htu.ethClient.On("HeadByNumber", mock.Anything, big.NewInt(h12.Number)).Return(&h12, nil).Once() + htu.ethClient.On("HeadByNumber", mock.Anything, big.NewInt(h12.Number)).Return(h12, nil).Once() // backfill from 15 to 12 - htu.ethClient.On("HeadByHash", mock.Anything, h12.Hash).Return(&h12, nil).Once() - htu.ethClient.On("HeadByHash", mock.Anything, h13.Hash).Return(&h13, nil).Once() - htu.ethClient.On("HeadByHash", mock.Anything, h14.Hash).Return(&h14, nil).Once() - err := htu.headTracker.Backfill(ctx, &h15) + htu.ethClient.On("HeadByHash", mock.Anything, h12.Hash).Return(h12, nil).Once() + htu.ethClient.On("HeadByHash", mock.Anything, h13.Hash).Return(h13, nil).Once() + htu.ethClient.On("HeadByHash", mock.Anything, h14.Hash).Return(h14, nil).Once() + err := htu.headTracker.Backfill(ctx, h15) require.NoError(t, err) h := htu.headSaver.LatestChain() // h - must contain 15, 14, 13, 12 and only 12 is finalized assert.Equal(t, 4, int(h.ChainLength())) - for ; h.Hash != h12.Hash; h = h.Parent { - assert.False(t, h.IsFinalized) + for ; h.Hash != h12.Hash; h = h.Parent.Load() { + assert.False(t, h.IsFinalized.Load()) } - assert.True(t, h.IsFinalized) + assert.True(t, h.IsFinalized.Load()) assert.Equal(t, h12.BlockNumber(), h.BlockNumber()) assert.Equal(t, h12.Hash, h.Hash) }) t.Run("marks block as finalized according to FinalizedBlockOffset (finality depth)", func(t *testing.T) { - htu := newHeadTrackerUniverse(t, opts{Heads: []evmtypes.Head{h15}, FinalityDepth: 1, FinalizedBlockOffset: 2}) - htu.ethClient.On("HeadByNumber", mock.Anything, big.NewInt(12)).Return(&h12, nil).Once() + htu := newHeadTrackerUniverse(t, opts{Heads: []*evmtypes.Head{h15}, FinalityDepth: 1, FinalizedBlockOffset: 2}) + htu.ethClient.On("HeadByNumber", mock.Anything, big.NewInt(12)).Return(h12, nil).Once() // backfill from 15 to 12 - htu.ethClient.On("HeadByHash", mock.Anything, h14.Hash).Return(&h14, nil).Once() - htu.ethClient.On("HeadByHash", mock.Anything, h13.Hash).Return(&h13, nil).Once() - htu.ethClient.On("HeadByHash", mock.Anything, h12.Hash).Return(&h12, nil).Once() - err := htu.headTracker.Backfill(ctx, &h15) + htu.ethClient.On("HeadByHash", mock.Anything, h14.Hash).Return(h14, nil).Once() + htu.ethClient.On("HeadByHash", mock.Anything, h13.Hash).Return(h13, nil).Once() + htu.ethClient.On("HeadByHash", mock.Anything, h12.Hash).Return(h12, nil).Once() + err := htu.headTracker.Backfill(ctx, h15) require.NoError(t, err) h := htu.headSaver.LatestChain() // h - must contain 15, 14, 13, 12 and only 12 is finalized assert.Equal(t, 4, int(h.ChainLength())) - for ; h.Hash != h12.Hash; h = h.Parent { - assert.False(t, h.IsFinalized) + for ; h.Hash != h12.Hash; h = h.Parent.Load() { + assert.False(t, h.IsFinalized.Load()) } - assert.True(t, h.IsFinalized) + assert.True(t, h.IsFinalized.Load()) assert.Equal(t, h12.BlockNumber(), h.BlockNumber()) assert.Equal(t, h12.Hash, h.Hash) }) t.Run("marks block as finalized according to FinalizedBlockOffset even with instant finality", func(t *testing.T) { - htu := newHeadTrackerUniverse(t, opts{Heads: []evmtypes.Head{h15}, FinalityDepth: 0, FinalizedBlockOffset: 2}) - htu.ethClient.On("HeadByNumber", mock.Anything, big.NewInt(13)).Return(&h13, nil).Once() + htu := newHeadTrackerUniverse(t, opts{Heads: []*evmtypes.Head{h15}, FinalityDepth: 0, FinalizedBlockOffset: 2}) + htu.ethClient.On("HeadByNumber", mock.Anything, big.NewInt(13)).Return(h13, nil).Once() // backfill from 15 to 13 - htu.ethClient.On("HeadByHash", mock.Anything, h14.Hash).Return(&h14, nil).Once() - htu.ethClient.On("HeadByHash", mock.Anything, h13.Hash).Return(&h13, nil).Once() - err := htu.headTracker.Backfill(ctx, &h15) + htu.ethClient.On("HeadByHash", mock.Anything, h14.Hash).Return(h14, nil).Once() + htu.ethClient.On("HeadByHash", mock.Anything, h13.Hash).Return(h13, nil).Once() + err := htu.headTracker.Backfill(ctx, h15) require.NoError(t, err) h := htu.headSaver.LatestChain() // h - must contain 15, 14, 13, only 13 is finalized assert.Equal(t, 3, int(h.ChainLength())) - for ; h.Hash != h13.Hash; h = h.Parent { - assert.False(t, h.IsFinalized) + for ; h.Hash != h13.Hash; h = h.Parent.Load() { + assert.False(t, h.IsFinalized.Load()) } - assert.True(t, h.IsFinalized) + assert.True(t, h.IsFinalized.Load()) assert.Equal(t, h13.BlockNumber(), h.BlockNumber()) assert.Equal(t, h13.Hash, h.Hash) }) @@ -1153,7 +1107,7 @@ func TestHeadTracker_LatestAndFinalizedBlock(t *testing.T) { h13.ParentHash = h12.Hash type opts struct { - Heads []evmtypes.Head + Heads []*evmtypes.Head FinalityTagEnabled bool FinalizedBlockOffset uint32 FinalityDepth uint32 @@ -1169,7 +1123,7 @@ func TestHeadTracker_LatestAndFinalizedBlock(t *testing.T) { db := pgtest.NewSqlxDB(t) orm := headtracker.NewORM(*testutils.FixtureChainID, db) for i := range opts.Heads { - require.NoError(t, orm.IdempotentInsertHead(tests.Context(t), &opts.Heads[i])) + require.NoError(t, orm.IdempotentInsertHead(tests.Context(t), opts.Heads[i])) } ethClient := evmtest.NewEthClientMock(t) ethClient.On("ConfiguredChainID", mock.Anything).Return(testutils.FixtureChainID, nil) @@ -1221,7 +1175,7 @@ func TestHeadTracker_LatestAndFinalizedBlock(t *testing.T) { assert.Equal(t, actualLF, h11) }) t.Run("returns latest finalized block with offset from cache (finality tag)", func(t *testing.T) { - htu := newHeadTrackerUniverse(t, opts{FinalityTagEnabled: true, FinalizedBlockOffset: 1, Heads: []evmtypes.Head{*h13, *h12, *h11}}) + htu := newHeadTrackerUniverse(t, opts{FinalityTagEnabled: true, FinalizedBlockOffset: 1, Heads: []*evmtypes.Head{h13, h12, h11}}) htu.ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(h13, nil).Once() htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(h12, nil).Once() @@ -1231,7 +1185,7 @@ func TestHeadTracker_LatestAndFinalizedBlock(t *testing.T) { assert.Equal(t, actualLF.Number, h11.Number) }) t.Run("returns latest finalized block with offset from RPC (finality tag)", func(t *testing.T) { - htu := newHeadTrackerUniverse(t, opts{FinalityTagEnabled: true, FinalizedBlockOffset: 2, Heads: []evmtypes.Head{*h13, *h12, *h11}}) + htu := newHeadTrackerUniverse(t, opts{FinalityTagEnabled: true, FinalizedBlockOffset: 2, Heads: []*evmtypes.Head{h13, h12, h11}}) htu.ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(h13, nil).Once() htu.ethClient.On("LatestFinalizedBlock", mock.Anything).Return(h12, nil).Once() h10 := testutils.Head(10) @@ -1252,7 +1206,7 @@ func TestHeadTracker_LatestAndFinalizedBlock(t *testing.T) { assert.Equal(t, actualLF.Number, h13.Number) }) t.Run("returns latest finalized block with offset from cache (finality depth)", func(t *testing.T) { - htu := newHeadTrackerUniverse(t, opts{FinalityDepth: 1, FinalizedBlockOffset: 1, Heads: []evmtypes.Head{*h13, *h12, *h11}}) + htu := newHeadTrackerUniverse(t, opts{FinalityDepth: 1, FinalizedBlockOffset: 1, Heads: []*evmtypes.Head{h13, h12, h11}}) htu.ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(h13, nil).Once() actualL, actualLF, err := htu.headTracker.LatestAndFinalizedBlock(ctx) @@ -1261,7 +1215,7 @@ func TestHeadTracker_LatestAndFinalizedBlock(t *testing.T) { assert.Equal(t, actualLF.Number, h11.Number) }) t.Run("returns latest finalized block with offset from RPC (finality depth)", func(t *testing.T) { - htu := newHeadTrackerUniverse(t, opts{FinalityDepth: 1, FinalizedBlockOffset: 2, Heads: []evmtypes.Head{*h13, *h12, *h11}}) + htu := newHeadTrackerUniverse(t, opts{FinalityDepth: 1, FinalizedBlockOffset: 2, Heads: []*evmtypes.Head{h13, h12, h11}}) htu.ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(h13, nil).Once() h10 := testutils.Head(10) htu.ethClient.On("HeadByNumber", mock.Anything, big.NewInt(10)).Return(h10, nil).Once() @@ -1273,47 +1227,6 @@ func TestHeadTracker_LatestAndFinalizedBlock(t *testing.T) { }) } -// BenchmarkHeadTracker_Backfill - benchmarks HeadTracker's Backfill with focus on efficiency after initial -// backfill on start up -func BenchmarkHeadTracker_Backfill(b *testing.B) { - evmcfg := testutils.NewTestChainScopedConfig(b, func(c *toml.EVMConfig) { - c.FinalityTagEnabled = ptr(true) - }) - db := pgtest.NewSqlxDB(b) - chainID := big.NewInt(evmclient.NullClientChainID) - orm := headtracker.NewORM(*chainID, db) - ethClient := evmclimocks.NewClient(b) - ethClient.On("ConfiguredChainID").Return(chainID) - ht := createHeadTracker(b, ethClient, evmcfg.EVM(), evmcfg.EVM().HeadTracker(), orm) - ctx := tests.Context(b) - makeHash := func(n int64) common.Hash { - return common.BigToHash(big.NewInt(n)) - } - const finalityDepth = 12000 // observed value on Arbitrum - makeBlock := func(n int64) *evmtypes.Head { - return &evmtypes.Head{Number: n, Hash: makeHash(n), ParentHash: makeHash(n - 1)} - } - latest := makeBlock(finalityDepth) - finalized := makeBlock(1) - ethClient.On("HeadByHash", mock.Anything, mock.Anything).Return(func(_ context.Context, hash common.Hash) (*evmtypes.Head, error) { - number := hash.Big().Int64() - return makeBlock(number), nil - }) - ethClient.On("LatestFinalizedBlock", mock.Anything).Return(finalized, nil).Once() - // run initial backfill to populate the database - err := ht.headTracker.Backfill(ctx, latest) - require.NoError(b, err) - b.ResetTimer() - // focus benchmark on processing of a new latest block - for i := 0; i < b.N; i++ { - latest = makeBlock(int64(finalityDepth + i)) - finalized = makeBlock(int64(i + 1)) - ethClient.On("LatestFinalizedBlock", mock.Anything).Return(finalized, nil).Once() - err := ht.headTracker.Backfill(ctx, latest) - require.NoError(b, err) - } -} - func createHeadTracker(t testing.TB, ethClient *evmclimocks.Client, config commontypes.Config, htConfig commontypes.HeadTrackerConfig, orm headtracker.ORM) *headTrackerUniverse { lggr, ob := logger.TestObserved(t, zap.DebugLevel) hb := headtracker.NewHeadBroadcaster(lggr) @@ -1417,15 +1330,15 @@ func (hb *headBuffer) Append(head *evmtypes.Head) { Number: head.Number, Hash: head.Hash, ParentHash: head.ParentHash, - Parent: head.Parent, Timestamp: time.Unix(int64(len(hb.Heads)), 0), EVMChainID: head.EVMChainID, } + cloned.Parent.Store(head.Parent.Load()) hb.Heads = append(hb.Heads, cloned) } type blocks struct { - t *testing.T + t testing.TB Hashes []common.Hash mHashes map[int64]common.Hash Heads map[int64]*evmtypes.Head @@ -1435,7 +1348,7 @@ func (b *blocks) Head(number uint64) *evmtypes.Head { return b.Heads[int64(number)] } -func NewBlocks(t *testing.T, numHashes int) *blocks { +func NewBlocks(t testing.TB, numHashes int) *blocks { hashes := make([]common.Hash, 0) heads := make(map[int64]*evmtypes.Head) for i := int64(0); i < int64(numHashes); i++ { @@ -1445,7 +1358,7 @@ func NewBlocks(t *testing.T, numHashes int) *blocks { heads[i] = &evmtypes.Head{Hash: hash, Number: i, Timestamp: time.Unix(i, 0), EVMChainID: ubig.New(testutils.FixtureChainID)} if i > 0 { parent := heads[i-1] - heads[i].Parent = parent + heads[i].Parent.Store(parent) heads[i].ParentHash = parent.Hash } } @@ -1474,7 +1387,7 @@ func (b *blocks) ForkAt(t *testing.T, blockNum int64, numHashes int) *blocks { } forked.Heads[blockNum].ParentHash = b.Heads[blockNum].ParentHash - forked.Heads[blockNum].Parent = b.Heads[blockNum].Parent + forked.Heads[blockNum].Parent.Store(b.Heads[blockNum].Parent.Load()) return forked } @@ -1488,9 +1401,9 @@ func (b *blocks) NewHead(number uint64) *evmtypes.Head { Number: parent.Number + 1, Hash: testutils.NewHash(), ParentHash: parent.Hash, - Parent: parent, Timestamp: time.Unix(parent.Number+1, 0), EVMChainID: ubig.New(testutils.FixtureChainID), } + head.Parent.Store(parent) return head } diff --git a/core/chains/evm/headtracker/heads.go b/core/chains/evm/headtracker/heads.go index a61e55dcd28..c3492f9a595 100644 --- a/core/chains/evm/headtracker/heads.go +++ b/core/chains/evm/headtracker/heads.go @@ -1,7 +1,8 @@ package headtracker import ( - "sort" + "container/heap" + "fmt" "sync" "github.com/ethereum/go-ethereum/common" @@ -17,7 +18,7 @@ type Heads interface { HeadByHash(hash common.Hash) *evmtypes.Head // AddHeads adds newHeads to the collection, eliminates duplicates, // sorts by head number, fixes parents and cuts off old heads (historyDepth). - AddHeads(newHeads ...*evmtypes.Head) + AddHeads(newHeads ...*evmtypes.Head) error // Count returns number of heads in the collection. Count() int // MarkFinalized - finds `finalized` in the LatestHead and marks it and all direct ancestors as finalized. @@ -26,114 +27,158 @@ type Heads interface { } type heads struct { - heads []*evmtypes.Head - headsMap map[common.Hash]*evmtypes.Head - mu sync.RWMutex + highest *evmtypes.Head + headsAsc *headsHeap + headsByHash map[common.Hash]*evmtypes.Head + headsByParent map[common.Hash]map[common.Hash]*evmtypes.Head + mu sync.RWMutex } func NewHeads() Heads { - return &heads{} + return &heads{ + headsAsc: &headsHeap{}, + headsByHash: make(map[common.Hash]*evmtypes.Head), + headsByParent: map[common.Hash]map[common.Hash]*evmtypes.Head{}, + } } func (h *heads) LatestHead() *evmtypes.Head { h.mu.RLock() defer h.mu.RUnlock() - if len(h.heads) == 0 { - return nil - } - return h.heads[0] + return h.highest } func (h *heads) HeadByHash(hash common.Hash) *evmtypes.Head { h.mu.RLock() defer h.mu.RUnlock() - if h.headsMap == nil { + if h.headsByHash == nil { return nil } - return h.headsMap[hash] + return h.headsByHash[hash] } func (h *heads) Count() int { h.mu.RLock() defer h.mu.RUnlock() - return len(h.heads) + return h.headsAsc.Len() } -// MarkFinalized - marks block with has equal to finalized and all it's direct ancestors as finalized. +// MarkFinalized - marks block with hash equal to finalized and all it's direct ancestors as finalized. // Trims old blocks whose height is smaller than minBlockToKeep func (h *heads) MarkFinalized(finalized common.Hash, minBlockToKeep int64) bool { h.mu.Lock() defer h.mu.Unlock() - if len(h.heads) == 0 { + if len(h.headsByHash) == 0 { return false } - // deep copy to avoid race on head.Parent - h.heads, h.headsMap = deepCopy(h.heads, minBlockToKeep) - - finalizedHead, ok := h.headsMap[finalized] + finalizedHead, ok := h.headsByHash[finalized] if !ok { return false } - for finalizedHead != nil { - finalizedHead.IsFinalized = true - finalizedHead = finalizedHead.Parent + + markFinalized(finalizedHead) + + // remove all blocks that are older than minBlockToKeep + for h.headsAsc.Len() > 0 && h.headsAsc.Peek().Number < minBlockToKeep { + oldBlock := heap.Pop(h.headsAsc).(*evmtypes.Head) + delete(h.headsByHash, oldBlock.Hash) + // clear .Parent in oldBlock's children + for _, oldBlockChildren := range h.headsByParent[oldBlock.Hash] { + oldBlockChildren.Parent.Store(nil) + } + // headsByParent are expected to be of the same height, so we can remove them all at once + delete(h.headsByParent, oldBlock.ParentHash) + } + + if h.highest.Number < minBlockToKeep { + h.highest = nil } return true } -func deepCopy(oldHeads []*evmtypes.Head, minBlockToKeep int64) ([]*evmtypes.Head, map[common.Hash]*evmtypes.Head) { - headsMap := make(map[common.Hash]*evmtypes.Head, len(oldHeads)) - heads := make([]*evmtypes.Head, 0, len(headsMap)) - for _, head := range oldHeads { - if head.Hash == head.ParentHash { - // shouldn't happen but it is untrusted input - continue - } - if head.BlockNumber() < minBlockToKeep { - // trim redundant blocks - continue - } - // copy all head objects to avoid races when a previous head chain is used - // elsewhere (since we mutate Parent here) - headCopy := *head - headCopy.Parent = nil // always build it from scratch in case it points to a head too old to be included - // map eliminates duplicates - // prefer head that was already in heads as it might have been marked as finalized on previous run - if _, ok := headsMap[head.Hash]; !ok { - headsMap[head.Hash] = &headCopy - heads = append(heads, &headCopy) +func markFinalized(head *evmtypes.Head) { + // we can assume that if a head was previously marked as finalized all its ancestors were marked as finalized + for head != nil && !head.IsFinalized.Load() { + head.IsFinalized.Store(true) + head = head.Parent.Load() + } +} + +func (h *heads) ensureNoCycles(newHead *evmtypes.Head) error { + if newHead.ParentHash == newHead.Hash { + return fmt.Errorf("cycle detected: newHeads reference itself newHead(%s)", newHead.String()) + } + if parent, ok := h.headsByHash[newHead.ParentHash]; ok { + if parent.Number >= newHead.Number { + return fmt.Errorf("potential cycle detected while adding newHead as child: %w", newPotentialCycleError(parent, newHead)) } } - // sort the heads as original slice might be out of order - sort.SliceStable(heads, func(i, j int) bool { - // sorting from the highest number to lowest - return heads[i].Number > heads[j].Number - }) - - // assign parents - for i := 0; i < len(heads); i++ { - head := heads[i] - parent, exists := headsMap[head.ParentHash] - if exists { - head.Parent = parent + for _, child := range h.headsByParent[newHead.Hash] { + if newHead.Number >= child.Number { + return fmt.Errorf("potential cycle detected while adding newHead as parent: %w", newPotentialCycleError(newHead, child)) } } - return heads, headsMap + return nil } -func (h *heads) AddHeads(newHeads ...*evmtypes.Head) { +func (h *heads) AddHeads(newHeads ...*evmtypes.Head) error { h.mu.Lock() defer h.mu.Unlock() - // deep copy to avoid race on head.Parent - h.heads, h.headsMap = deepCopy(append(h.heads, newHeads...), 0) + for _, newHead := range newHeads { + // skip blocks that were previously added + if _, ok := h.headsByHash[newHead.Hash]; ok { + continue + } + + if err := h.ensureNoCycles(newHead); err != nil { + return err + } + + // heads now owns the newHead - reset values that are populated by heads + newHead.IsFinalized.Store(false) + newHead.Parent.Store(nil) + + // prefer newer head to set as highest + if h.highest == nil || h.highest.Number <= newHead.Number { + h.highest = newHead + } + + heap.Push(h.headsAsc, newHead) + h.headsByHash[newHead.Hash] = newHead + siblings, ok := h.headsByParent[newHead.ParentHash] + if !ok { + siblings = make(map[common.Hash]*evmtypes.Head) + h.headsByParent[newHead.ParentHash] = siblings + } + siblings[newHead.Hash] = newHead + // populate reference to parent + if parent, ok := h.headsByHash[newHead.ParentHash]; ok { + newHead.Parent.Store(parent) + } + for _, child := range h.headsByParent[newHead.Hash] { + // ensure all children have reference to newHead + child.Parent.Store(newHead) + if child.IsFinalized.Load() { + // mark newHead as finalized if any of its children is finalized + markFinalized(newHead) + } + } + } + + return nil +} + +func newPotentialCycleError(parent, child *evmtypes.Head) error { + return fmt.Errorf("expected head number to strictly decrease in 'child -> parent' relation: "+ + "child(%s), parent(%s)", child.String(), parent.String()) } diff --git a/core/chains/evm/headtracker/heads_test.go b/core/chains/evm/headtracker/heads_test.go index 6c02c528ba2..92e4015d8c3 100644 --- a/core/chains/evm/headtracker/heads_test.go +++ b/core/chains/evm/headtracker/heads_test.go @@ -20,21 +20,29 @@ func TestHeads_LatestHead(t *testing.T) { t.Parallel() heads := headtracker.NewHeads() - heads.AddHeads(testutils.Head(100), testutils.Head(200), testutils.Head(300)) + assert.NoError(t, heads.AddHeads(testutils.Head(100), testutils.Head(200), testutils.Head(300))) latest := heads.LatestHead() require.NotNil(t, latest) require.Equal(t, int64(300), latest.Number) - heads.AddHeads(testutils.Head(250)) + assert.NoError(t, heads.AddHeads(testutils.Head(250))) latest = heads.LatestHead() require.NotNil(t, latest) require.Equal(t, int64(300), latest.Number) - heads.AddHeads(testutils.Head(400)) + assert.NoError(t, heads.AddHeads(testutils.Head(400))) latest = heads.LatestHead() require.NotNil(t, latest) require.Equal(t, int64(400), latest.Number) + + // if heads have the same height, LatestHead prefers most recent + newerH400 := testutils.Head(400) + assert.NoError(t, heads.AddHeads(newerH400)) + latest = heads.LatestHead() + require.NotNil(t, latest) + require.Equal(t, int64(400), latest.Number) + require.Equal(t, newerH400.Hash, latest.Hash) } func TestHeads_HeadByHash(t *testing.T) { @@ -46,7 +54,7 @@ func TestHeads_HeadByHash(t *testing.T) { testutils.Head(300), } heads := headtracker.NewHeads() - heads.AddHeads(testHeads...) + assert.NoError(t, heads.AddHeads(testHeads...)) head := heads.HeadByHash(testHeads[1].Hash) require.NotNil(t, head) @@ -62,10 +70,10 @@ func TestHeads_Count(t *testing.T) { heads := headtracker.NewHeads() require.Zero(t, heads.Count()) - heads.AddHeads(testutils.Head(100), testutils.Head(200), testutils.Head(300)) + assert.NoError(t, heads.AddHeads(testutils.Head(100), testutils.Head(200), testutils.Head(300))) require.Equal(t, 3, heads.Count()) - heads.AddHeads(testutils.Head(400)) + assert.NoError(t, heads.AddHeads(testutils.Head(400))) require.Equal(t, 4, heads.Count()) } @@ -77,11 +85,11 @@ func TestHeads_AddHeads(t *testing.T) { var testHeads []*evmtypes.Head var parentHash common.Hash - for i := 0; i < 5; i++ { - hash := utils.NewHash() + for i := 1; i < 6; i++ { + hash := common.BigToHash(big.NewInt(int64(i))) h := evmtypes.NewHead(big.NewInt(int64(i)), hash, parentHash, uint64(time.Now().Unix()), ubig.NewI(0)) testHeads = append(testHeads, &h) - if i == 2 { + if i == 3 { // uncled block h := evmtypes.NewHead(big.NewInt(int64(i)), uncleHash, parentHash, uint64(time.Now().Unix()), ubig.NewI(0)) testHeads = append(testHeads, &h) @@ -89,10 +97,10 @@ func TestHeads_AddHeads(t *testing.T) { parentHash = hash } - heads.AddHeads(testHeads...) + assert.NoError(t, heads.AddHeads(testHeads...)) require.Equal(t, 6, heads.Count()) // Add duplicates (should be ignored) - heads.AddHeads(testHeads[2:5]...) + assert.NoError(t, heads.AddHeads(testHeads[2:5]...)) require.Equal(t, 6, heads.Count()) head := heads.LatestHead() @@ -102,6 +110,26 @@ func TestHeads_AddHeads(t *testing.T) { head = heads.HeadByHash(uncleHash) require.NotNil(t, head) require.Equal(t, 3, int(head.ChainLength())) + // returns an error, if newHead creates cycle + t.Run("Returns an error, if newHead create cycle", func(t *testing.T) { + cycleHead := &evmtypes.Head{ + Hash: heads.LatestHead().EarliestInChain().ParentHash, + ParentHash: heads.LatestHead().Hash, + } + // 1. try adding in front + cycleHead.Number = heads.LatestHead().Number + 1 + assert.EqualError(t, heads.AddHeads(cycleHead), "potential cycle detected while adding newHead as parent: expected head number to strictly decrease in 'child -> parent' relation: child(Head{Number: 1, Hash: 0x0000000000000000000000000000000000000000000000000000000000000001, ParentHash: 0x0000000000000000000000000000000000000000000000000000000000000000}), parent(Head{Number: 6, Hash: 0x0000000000000000000000000000000000000000000000000000000000000000, ParentHash: 0x0000000000000000000000000000000000000000000000000000000000000005})") + // 2. try adding to back + cycleHead.Number = heads.LatestHead().EarliestInChain().Number - 1 + assert.EqualError(t, heads.AddHeads(cycleHead), "potential cycle detected while adding newHead as child: expected head number to strictly decrease in 'child -> parent' relation: child(Head{Number: 0, Hash: 0x0000000000000000000000000000000000000000000000000000000000000000, ParentHash: 0x0000000000000000000000000000000000000000000000000000000000000005}), parent(Head{Number: 5, Hash: 0x0000000000000000000000000000000000000000000000000000000000000005, ParentHash: 0x0000000000000000000000000000000000000000000000000000000000000004})") + // 3. try adding to back with reference to self + cycleHead = &evmtypes.Head{ + Number: 1000, + Hash: common.BigToHash(big.NewInt(1000)), + ParentHash: common.BigToHash(big.NewInt(1000)), + } + assert.EqualError(t, heads.AddHeads(cycleHead), "cycle detected: newHeads reference itself newHead(Head{Number: 1000, Hash: 0x00000000000000000000000000000000000000000000000000000000000003e8, ParentHash: 0x00000000000000000000000000000000000000000000000000000000000003e8})") + }) } func TestHeads_MarkFinalized(t *testing.T) { @@ -110,7 +138,7 @@ func TestHeads_MarkFinalized(t *testing.T) { heads := headtracker.NewHeads() // create chain - // H0 <- H1 <- H2 <- H3 <- H4 <- H5 + // H0 <- H1 <- H2 <- H3 <- H4 <- H5 - Canonical // \ \ // H1Uncle H2Uncle // @@ -127,35 +155,80 @@ func TestHeads_MarkFinalized(t *testing.T) { h5 := newHead(5, h4.Hash) h2Uncle := newHead(2, h1.Hash) - allHeads := []*evmtypes.Head{h0, h1, h1Uncle, h2, h2Uncle, h3, h4, h5} - heads.AddHeads(allHeads...) + assert.NoError(t, heads.AddHeads(h0, h1, h1Uncle, h2, h2Uncle, h3, h4, h5)) // mark h3 and all ancestors as finalized require.True(t, heads.MarkFinalized(h3.Hash, h1.BlockNumber()), "expected MarkFinalized succeed") - // original heads remain unchanged - for _, h := range allHeads { - assert.False(t, h.IsFinalized, "expected original heads to remain unfinalized") - } - // h0 is too old. It should not be available directly or through its children assert.Nil(t, heads.HeadByHash(h0.Hash)) - assert.Nil(t, heads.HeadByHash(h1.Hash).Parent) - assert.Nil(t, heads.HeadByHash(h1Uncle.Hash).Parent) - assert.Nil(t, heads.HeadByHash(h2Uncle.Hash).Parent.Parent) + assert.Nil(t, heads.HeadByHash(h1.Hash).Parent.Load()) + assert.Nil(t, heads.HeadByHash(h1Uncle.Hash).Parent.Load()) + assert.Nil(t, heads.HeadByHash(h2Uncle.Hash).Parent.Load().Parent.Load()) require.False(t, heads.MarkFinalized(utils.NewHash(), 0), "expected false if finalized hash was not found in existing LatestHead chain") ensureProperFinalization := func(t *testing.T) { t.Helper() for _, head := range []*evmtypes.Head{h5, h4} { - require.False(t, heads.HeadByHash(head.Hash).IsFinalized, "expected h4-h5 not to be finalized", head.BlockNumber()) + require.False(t, heads.HeadByHash(head.Hash).IsFinalized.Load(), "expected h4-h5 not to be finalized", head.BlockNumber()) } for _, head := range []*evmtypes.Head{h3, h2, h1} { - require.True(t, heads.HeadByHash(head.Hash).IsFinalized, "expected h3 and all ancestors to be finalized", head.BlockNumber()) + require.True(t, heads.HeadByHash(head.Hash).IsFinalized.Load(), "expected h3 and all ancestors to be finalized", head.BlockNumber()) } - require.False(t, heads.HeadByHash(h2Uncle.Hash).IsFinalized, "expected uncle block not to be marked as finalized") + require.False(t, heads.HeadByHash(h2Uncle.Hash).IsFinalized.Load(), "expected uncle block not to be marked as finalized") } t.Run("blocks were correctly marked as finalized", ensureProperFinalization) - heads.AddHeads(h0, h1, h2, h2Uncle, h3, h4, h5) + assert.NoError(t, heads.AddHeads(h0, h1, h2, h2Uncle, h3, h4, h5)) t.Run("blocks remain finalized after re adding them to the Heads", ensureProperFinalization) + + // ensure that IsFinalized is propagated, when older blocks are added + // 1. remove all blocks older than 3 + heads.MarkFinalized(h3.Hash, 3) + // 2. ensure that h2 and h1 are no longer present + assert.Nil(t, heads.HeadByHash(h2.Hash)) + assert.Nil(t, heads.HeadByHash(h1.Hash)) + // 3. add blocks back, starting from older + assert.NoError(t, heads.AddHeads(h1)) + assert.False(t, heads.HeadByHash(h1.Hash).IsFinalized.Load(), "expected h1 to not be finalized as it was not explicitly marked and there no path to h3") + assert.NoError(t, heads.AddHeads(h2)) + // 4. now h2 and h1 must be marked as finalized + assert.True(t, heads.HeadByHash(h1.Hash).IsFinalized.Load()) + assert.True(t, heads.HeadByHash(h2.Hash).IsFinalized.Load()) +} + +func BenchmarkEarliestHeadInChain(b *testing.B) { + const latestBlockNum = 200_000 + blocks := NewBlocks(b, latestBlockNum+1) + b.ResetTimer() + for i := 0; i < b.N; i++ { + latest := blocks.Head(latestBlockNum) + earliest := latest.EarliestHeadInChain() + // perform sanity check + assert.NotEqual(b, latest.BlockNumber(), earliest.BlockNumber()) + assert.NotEqual(b, latest.BlockHash(), earliest.BlockHash()) + } +} + +// BenchmarkSimulated_Backfill - benchmarks AddHeads & MarkFinalized as if it was performed by HeadTracker's backfill +func BenchmarkHeads_SimulatedBackfill(b *testing.B) { + makeHash := func(n int64) common.Hash { + return common.BigToHash(big.NewInt(n)) + } + makeHead := func(n int64) *evmtypes.Head { + return &evmtypes.Head{Number: n, Hash: makeHash(n), ParentHash: makeHash(n - 1)} + } + + const finalityDepth = 16_000 // observed value on Arbitrum + // populate with initial values + heads := headtracker.NewHeads() + for i := int64(1); i <= finalityDepth; i++ { + assert.NoError(b, heads.AddHeads(makeHead(i))) + } + heads.MarkFinalized(makeHash(1), 1) + // focus benchmark on processing of a new latest block + b.ResetTimer() + for i := int64(1); i <= int64(b.N); i++ { + assert.NoError(b, heads.AddHeads(makeHead(finalityDepth+i))) + heads.MarkFinalized(makeHash(i), i) + } } diff --git a/core/chains/evm/headtracker/heap.go b/core/chains/evm/headtracker/heap.go new file mode 100644 index 00000000000..572ed541dfa --- /dev/null +++ b/core/chains/evm/headtracker/heap.go @@ -0,0 +1,35 @@ +package headtracker + +import evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" + +type headsHeap struct { + values []*evmtypes.Head +} + +func (h *headsHeap) Len() int { + return len(h.values) +} + +func (h *headsHeap) Swap(i, j int) { + h.values[i], h.values[j] = h.values[j], h.values[i] +} + +func (h *headsHeap) Less(i, j int) bool { + return h.values[i].Number < h.values[j].Number +} + +func (h *headsHeap) Pop() any { + n := len(h.values) - 1 + old := h.values[n] + h.values[n] = nil + h.values = h.values[:n] + return old +} + +func (h *headsHeap) Push(v any) { + h.values = append(h.values, v.(*evmtypes.Head)) +} + +func (h *headsHeap) Peek() *evmtypes.Head { + return h.values[0] +} diff --git a/core/chains/evm/headtracker/types/types.go b/core/chains/evm/headtracker/types/types.go index 1a03f3cec6f..ca5a79fc68d 100644 --- a/core/chains/evm/headtracker/types/types.go +++ b/core/chains/evm/headtracker/types/types.go @@ -2,10 +2,13 @@ package types import ( "context" + "math/big" + "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/smartcontractkit/chainlink/v2/common/headtracker" + htrktypes "github.com/smartcontractkit/chainlink/v2/common/headtracker/types" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" ) @@ -22,4 +25,5 @@ type ( HeadTrackable = headtracker.HeadTrackable[*evmtypes.Head, common.Hash] HeadListener = headtracker.HeadListener[*evmtypes.Head, common.Hash] HeadBroadcaster = headtracker.HeadBroadcaster[*evmtypes.Head, common.Hash] + Client = htrktypes.Client[*evmtypes.Head, ethereum.Subscription, *big.Int, common.Hash] ) diff --git a/core/chains/evm/log/broadcaster.go b/core/chains/evm/log/broadcaster.go index e7f02d1199c..3e37678bee3 100644 --- a/core/chains/evm/log/broadcaster.go +++ b/core/chains/evm/log/broadcaster.go @@ -590,7 +590,7 @@ func (b *broadcaster) onNewHeads() { b.logger.Errorf("Failed to query for log broadcasts, %v", err) return } - b.registrations.sendLogs(ctx, logs, *latestHead, broadcasts, b.orm) + b.registrations.sendLogs(ctx, logs, latestHead, broadcasts, b.orm) if err := b.orm.SetPendingMinBlock(ctx, nil); err != nil { b.logger.Errorw("Failed to set pending broadcasts number null", "err", err) } @@ -605,7 +605,7 @@ func (b *broadcaster) onNewHeads() { return } - b.registrations.sendLogs(ctx, logs, *latestHead, broadcasts, b.orm) + b.registrations.sendLogs(ctx, logs, latestHead, broadcasts, b.orm) } newMin := b.logPool.deleteOlderLogs(keptDepth) if err := b.orm.SetPendingMinBlock(ctx, newMin); err != nil { diff --git a/core/chains/evm/log/registrations.go b/core/chains/evm/log/registrations.go index 68dd93b9d88..01104349a6f 100644 --- a/core/chains/evm/log/registrations.go +++ b/core/chains/evm/log/registrations.go @@ -11,6 +11,7 @@ import ( pkgerrors "github.com/pkg/errors" "github.com/smartcontractkit/chainlink-common/pkg/logger" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/generated" @@ -215,7 +216,7 @@ func (r *registrations) isAddressRegistered(address common.Address) bool { return false } -func (r *registrations) sendLogs(ctx context.Context, logsToSend []logsOnBlock, latestHead evmtypes.Head, broadcasts []LogBroadcast, bc broadcastCreator) { +func (r *registrations) sendLogs(ctx context.Context, logsToSend []logsOnBlock, latestHead *evmtypes.Head, broadcasts []LogBroadcast, bc broadcastCreator) { broadcastsExisting := make(map[LogBroadcastAsKey]bool) for _, b := range broadcasts { broadcastsExisting[b.AsKey()] = b.Consumed @@ -387,7 +388,7 @@ type broadcastCreator interface { CreateBroadcast(ctx context.Context, blockHash common.Hash, blockNumber uint64, logIndex uint, jobID int32) error } -func (r *handler) sendLog(ctx context.Context, log types.Log, latestHead evmtypes.Head, +func (r *handler) sendLog(ctx context.Context, log types.Log, latestHead *evmtypes.Head, broadcasts map[LogBroadcastAsKey]bool, bc broadcastCreator, logger logger.Logger) { diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 360511951ee..dd7e0c5242b 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -1037,7 +1037,7 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He if err != nil { return nil, err } - blockAfterLCA := *current + blockAfterLCA := current // We expect reorgs up to the block after latestFinalizedBlock // We loop via parent instead of current so current always holds the LCA+1. // If the parent block number becomes < the first finalized block our reorg is too deep. @@ -1049,10 +1049,10 @@ func (lp *logPoller) findBlockAfterLCA(ctx context.Context, current *evmtypes.He } if parent.Hash == ourParentBlockHash.BlockHash { // If we do have the blockhash, return blockAfterLCA - return &blockAfterLCA, nil + return blockAfterLCA, nil } // Otherwise get a new parent and update blockAfterLCA. - blockAfterLCA = *parent + blockAfterLCA = parent parent, err = lp.ec.HeadByHash(ctx, parent.ParentHash) if err != nil { return nil, err diff --git a/core/chains/evm/logpoller/log_poller_internal_test.go b/core/chains/evm/logpoller/log_poller_internal_test.go index ca1bd72dd6c..620bbf14f41 100644 --- a/core/chains/evm/logpoller/log_poller_internal_test.go +++ b/core/chains/evm/logpoller/log_poller_internal_test.go @@ -569,7 +569,8 @@ func Test_latestBlockAndFinalityDepth(t *testing.T) { }) t.Run("headTracker returns valid chain", func(t *testing.T) { headTracker := htMocks.NewHeadTracker[*evmtypes.Head, common.Hash](t) - finalizedBlock := &evmtypes.Head{Number: 2, IsFinalized: true} + finalizedBlock := &evmtypes.Head{Number: 2} + finalizedBlock.IsFinalized.Store(true) head := &evmtypes.Head{Number: 10} headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalizedBlock, nil) diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index dc8e30f5f4b..d63f0cf1de0 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -148,27 +148,28 @@ func TestEthConfirmer_Lifecycle(t *testing.T) { err = ec.Start(ctx) require.Error(t, err) - latestFinalizedHead := evmtypes.Head{ - Number: 8, - Hash: testutils.NewHash(), - Parent: nil, - IsFinalized: true, // We are guaranteed to receive a latestFinalizedHead. + latestFinalizedHead := &evmtypes.Head{ + Number: 8, + Hash: testutils.NewHash(), } + // We are guaranteed to receive a latestFinalizedHead. + latestFinalizedHead.IsFinalized.Store(true) - head := evmtypes.Head{ + h9 := &evmtypes.Head{ + Hash: testutils.NewHash(), + Number: 9, + } + h9.Parent.Store(latestFinalizedHead) + head := &evmtypes.Head{ Hash: testutils.NewHash(), Number: 10, - Parent: &evmtypes.Head{ - Hash: testutils.NewHash(), - Number: 9, - Parent: &latestFinalizedHead, - }, } + head.Parent.Store(h9) - ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(&head, nil).Once() - ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&latestFinalizedHead, nil).Once() + ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(head, nil).Once() + ethClient.On("LatestFinalizedBlock", mock.Anything).Return(latestFinalizedHead, nil).Once() - err = ec.ProcessHead(ctx, &head) + err = ec.ProcessHead(ctx, head) require.NoError(t, err) // Can successfully close once err = ec.Close() @@ -2742,34 +2743,33 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { ec := newEthConfirmer(t, txStore, ethClient, gconfig, config, ethKeyStore, nil) latestFinalizedHead := evmtypes.Head{ - Number: 8, - Hash: testutils.NewHash(), - Parent: nil, - IsFinalized: false, // We are guaranteed to receive a latestFinalizedHead. + Number: 8, + Hash: testutils.NewHash(), } - head := evmtypes.Head{ + h8 := &evmtypes.Head{ + Number: 8, + Hash: testutils.NewHash(), + } + h9 := &evmtypes.Head{ + Hash: testutils.NewHash(), + Number: 9, + } + h9.Parent.Store(h8) + head := &evmtypes.Head{ Hash: testutils.NewHash(), Number: 10, - Parent: &evmtypes.Head{ - Hash: testutils.NewHash(), - Number: 9, - Parent: &evmtypes.Head{ - Number: 8, - Hash: testutils.NewHash(), - Parent: nil, - }, - }, } + head.Parent.Store(h9) t.Run("does nothing if there aren't any transactions", func(t *testing.T) { - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), head, latestFinalizedHead.BlockNumber())) }) t.Run("does nothing to unconfirmed transactions", func(t *testing.T) { etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, 0, fromAddress) // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -2781,7 +2781,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { mustInsertEthReceipt(t, txStore, head.Number, head.Hash, etx.TxAttempts[0].Hash) // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -2791,10 +2791,10 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { t.Run("does nothing to confirmed transactions that only have receipts older than the start of the chain", func(t *testing.T) { etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 3, 1, fromAddress) // Add receipt that is older than the lowest block of the chain - mustInsertEthReceipt(t, txStore, head.Parent.Parent.Number-1, testutils.NewHash(), etx.TxAttempts[0].Hash) + mustInsertEthReceipt(t, txStore, h8.Number-1, testutils.NewHash(), etx.TxAttempts[0].Hash) // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -2805,7 +2805,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { etx := cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 4, 1, fromAddress) attempt := etx.TxAttempts[0] // Include one within head height but a different block hash - mustInsertEthReceipt(t, txStore, head.Parent.Number, testutils.NewHash(), attempt.Hash) + mustInsertEthReceipt(t, txStore, head.Parent.Load().Number, testutils.NewHash(), attempt.Hash) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { atx, err := txmgr.GetGethSignedTx(attempt.SignedRawTx) @@ -2815,7 +2815,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { }), fromAddress).Return(commonclient.Successful, nil).Once() // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -2830,15 +2830,15 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { attempt := etx.TxAttempts[0] attemptHash := attempt.Hash // Add receipt that is older than the lowest block of the chain - mustInsertEthReceipt(t, txStore, head.Parent.Parent.Number-1, testutils.NewHash(), attemptHash) + mustInsertEthReceipt(t, txStore, h8.Number-1, testutils.NewHash(), attemptHash) // Include one within head height but a different block hash - mustInsertEthReceipt(t, txStore, head.Parent.Number, testutils.NewHash(), attemptHash) + mustInsertEthReceipt(t, txStore, head.Parent.Load().Number, testutils.NewHash(), attemptHash) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.Anything, fromAddress).Return( commonclient.Successful, nil).Once() // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -2862,9 +2862,9 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { require.NoError(t, txStore.InsertTxAttempt(ctx, &attempt3)) // Receipt is within head height but a different block hash - mustInsertEthReceipt(t, txStore, head.Parent.Number, testutils.NewHash(), attempt2.Hash) + mustInsertEthReceipt(t, txStore, head.Parent.Load().Number, testutils.NewHash(), attempt2.Hash) // Receipt is within head height but a different block hash - mustInsertEthReceipt(t, txStore, head.Parent.Number, testutils.NewHash(), attempt3.Hash) + mustInsertEthReceipt(t, txStore, head.Parent.Load().Number, testutils.NewHash(), attempt3.Hash) ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { s, err := txmgr.GetGethSignedTx(attempt3.SignedRawTx) @@ -2873,7 +2873,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { }), fromAddress).Return(commonclient.Successful, nil).Once() // Do the thing - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -2893,7 +2893,7 @@ func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { // Add receipt that is higher than head mustInsertEthReceipt(t, txStore, head.Number+1, testutils.NewHash(), attempt.Hash) - require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), &head, latestFinalizedHead.BlockNumber())) + require.NoError(t, ec.EnsureConfirmedTransactionsInLongestChain(tests.Context(t), head, latestFinalizedHead.BlockNumber())) etx, err := txStore.FindTxWithAttempts(ctx, etx.ID) require.NoError(t, err) @@ -3020,19 +3020,20 @@ func TestEthConfirmer_ResumePendingRuns(t *testing.T) { evmcfg := evmtest.NewChainScopedConfig(t, config) + h8 := &evmtypes.Head{ + Number: 8, + Hash: testutils.NewHash(), + } + h9 := &evmtypes.Head{ + Hash: testutils.NewHash(), + Number: 9, + } + h9.Parent.Store(h8) head := evmtypes.Head{ Hash: testutils.NewHash(), Number: 10, - Parent: &evmtypes.Head{ - Hash: testutils.NewHash(), - Number: 9, - Parent: &evmtypes.Head{ - Number: 8, - Hash: testutils.NewHash(), - Parent: nil, - }, - }, } + head.Parent.Store(h9) minConfirmations := int64(2) @@ -3254,10 +3255,10 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) { // Update tx to signal callback once it is identified as terminally stuck pgtest.MustExec(t, db, `UPDATE evm.txes SET pipeline_task_run_id = $1, signal_callback = TRUE WHERE id = $2`, uuid.New(), tx.ID) head := evmtypes.Head{ - Hash: testutils.NewHash(), - Number: blockNum, - IsFinalized: true, + Hash: testutils.NewHash(), + Number: blockNum, } + head.IsFinalized.Store(true) ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(&head, nil).Once() ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&head, nil).Once() @@ -3282,10 +3283,10 @@ func TestEthConfirmer_ProcessStuckTransactions(t *testing.T) { require.Equal(t, bumpedFee.Legacy, latestAttempt.TxFee.Legacy) head = evmtypes.Head{ - Hash: testutils.NewHash(), - Number: blockNum + 1, - IsFinalized: true, + Hash: testutils.NewHash(), + Number: blockNum + 1, } + head.IsFinalized.Store(true) ethClient.On("HeadByNumber", mock.Anything, (*big.Int)(nil)).Return(&head, nil).Once() ethClient.On("LatestFinalizedBlock", mock.Anything).Return(&head, nil).Once() ethClient.On("SequenceAt", mock.Anything, mock.Anything, mock.Anything).Return(evmtypes.Nonce(1), nil) diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index c711c2788e8..e47387fb8d3 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -628,19 +628,20 @@ func TestORM_FindTxesPendingCallback(t *testing.T) { pgtest.MustExec(t, db, `SET CONSTRAINTS fk_pipeline_runs_pruning_key DEFERRED`) pgtest.MustExec(t, db, `SET CONSTRAINTS pipeline_runs_pipeline_spec_id_fkey DEFERRED`) + h8 := &evmtypes.Head{ + Number: 8, + Hash: testutils.NewHash(), + } + h9 := &evmtypes.Head{ + Hash: testutils.NewHash(), + Number: 9, + } + h9.Parent.Store(h8) head := evmtypes.Head{ - Hash: utils.NewHash(), + Hash: testutils.NewHash(), Number: 10, - Parent: &evmtypes.Head{ - Hash: utils.NewHash(), - Number: 9, - Parent: &evmtypes.Head{ - Number: 8, - Hash: utils.NewHash(), - Parent: nil, - }, - }, } + head.Parent.Store(h9) minConfirmations := int64(2) @@ -792,19 +793,20 @@ func TestORM_FindTransactionsConfirmedInBlockRange(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + h8 := &evmtypes.Head{ + Number: 8, + Hash: testutils.NewHash(), + } + h9 := &evmtypes.Head{ + Hash: testutils.NewHash(), + Number: 9, + } + h9.Parent.Store(h8) head := evmtypes.Head{ - Hash: utils.NewHash(), + Hash: testutils.NewHash(), Number: 10, - Parent: &evmtypes.Head{ - Hash: utils.NewHash(), - Number: 9, - Parent: &evmtypes.Head{ - Number: 8, - Hash: utils.NewHash(), - Parent: nil, - }, - }, } + head.Parent.Store(h9) t.Run("find all transactions confirmed in range", func(t *testing.T) { etx_8 := mustInsertConfirmedEthTxWithReceipt(t, txStore, fromAddress, 700, 8) diff --git a/core/chains/evm/txmgr/finalizer_test.go b/core/chains/evm/txmgr/finalizer_test.go index f83a53bf499..b91121d773f 100644 --- a/core/chains/evm/txmgr/finalizer_test.go +++ b/core/chains/evm/txmgr/finalizer_test.go @@ -39,15 +39,16 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { rpcBatchSize := uint32(1) ht := headtracker.NewSimulatedHeadTracker(ethClient, true, 0) + h99 := &evmtypes.Head{ + Hash: utils.NewHash(), + Number: 99, + } + h99.IsFinalized.Store(true) head := &evmtypes.Head{ Hash: utils.NewHash(), Number: 100, - Parent: &evmtypes.Head{ - Hash: utils.NewHash(), - Number: 99, - IsFinalized: true, - }, } + head.Parent.Store(h99) t.Run("returns not finalized for tx with receipt newer than finalized block", func(t *testing.T) { finalizer := txmgr.NewEvmFinalizer(logger.Test(t), testutils.FixtureChainID, rpcBatchSize, txStore, ethClient, ht) @@ -71,7 +72,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { // Insert receipt for unfinalized block num mustInsertEthReceipt(t, txStore, head.Number, head.Hash, attemptHash) ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once() - ethClient.On("LatestFinalizedBlock", mock.Anything).Return(head.Parent, nil).Once() + ethClient.On("LatestFinalizedBlock", mock.Anything).Return(head.Parent.Load(), nil).Once() err := finalizer.ProcessHead(ctx, head) require.NoError(t, err) tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID) @@ -99,9 +100,9 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { } attemptHash := insertTxAndAttemptWithIdempotencyKey(t, txStore, tx, idempotencyKey) // Insert receipt for finalized block num - mustInsertEthReceipt(t, txStore, head.Parent.Number, utils.NewHash(), attemptHash) + mustInsertEthReceipt(t, txStore, head.Parent.Load().Number, utils.NewHash(), attemptHash) ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once() - ethClient.On("LatestFinalizedBlock", mock.Anything).Return(head.Parent, nil).Once() + ethClient.On("LatestFinalizedBlock", mock.Anything).Return(head.Parent.Load(), nil).Once() err := finalizer.ProcessHead(ctx, head) require.NoError(t, err) tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID) @@ -129,9 +130,9 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { } attemptHash := insertTxAndAttemptWithIdempotencyKey(t, txStore, tx, idempotencyKey) // Insert receipt for finalized block num - mustInsertEthReceipt(t, txStore, head.Parent.Number, head.Parent.Hash, attemptHash) + mustInsertEthReceipt(t, txStore, head.Parent.Load().Number, head.Parent.Load().Hash, attemptHash) ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once() - ethClient.On("LatestFinalizedBlock", mock.Anything).Return(head.Parent, nil).Once() + ethClient.On("LatestFinalizedBlock", mock.Anything).Return(head.Parent.Load(), nil).Once() err := finalizer.ProcessHead(ctx, head) require.NoError(t, err) tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID) @@ -160,7 +161,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { attemptHash := insertTxAndAttemptWithIdempotencyKey(t, txStore, tx, idempotencyKey) // Insert receipt for finalized block num receiptBlockHash1 := utils.NewHash() - mustInsertEthReceipt(t, txStore, head.Parent.Number-2, receiptBlockHash1, attemptHash) + mustInsertEthReceipt(t, txStore, head.Parent.Load().Number-2, receiptBlockHash1, attemptHash) idempotencyKey = uuid.New().String() nonce = evmtypes.Nonce(1) tx = &txmgr.Tx{ @@ -176,7 +177,7 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { attemptHash = insertTxAndAttemptWithIdempotencyKey(t, txStore, tx, idempotencyKey) // Insert receipt for finalized block num receiptBlockHash2 := utils.NewHash() - mustInsertEthReceipt(t, txStore, head.Parent.Number-1, receiptBlockHash2, attemptHash) + mustInsertEthReceipt(t, txStore, head.Parent.Load().Number-1, receiptBlockHash2, attemptHash) // Separate batch calls will be made for each tx due to RPC batch size set to 1 when finalizer initialized above ethClient.On("BatchCallContext", mock.Anything, mock.IsType([]rpc.BatchElem{})).Run(func(args mock.Arguments) { rpcElements := args.Get(1).([]rpc.BatchElem) @@ -186,20 +187,20 @@ func TestFinalizer_MarkTxFinalized(t *testing.T) { require.Equal(t, false, rpcElements[0].Args[1]) reqBlockNum := rpcElements[0].Args[0].(string) - req1BlockNum := hexutil.EncodeBig(big.NewInt(head.Parent.Number - 2)) - req2BlockNum := hexutil.EncodeBig(big.NewInt(head.Parent.Number - 1)) + req1BlockNum := hexutil.EncodeBig(big.NewInt(head.Parent.Load().Number - 2)) + req2BlockNum := hexutil.EncodeBig(big.NewInt(head.Parent.Load().Number - 1)) var headResult evmtypes.Head if req1BlockNum == reqBlockNum { - headResult = evmtypes.Head{Number: head.Parent.Number - 2, Hash: receiptBlockHash1} + headResult = evmtypes.Head{Number: head.Parent.Load().Number - 2, Hash: receiptBlockHash1} } else if req2BlockNum == reqBlockNum { - headResult = evmtypes.Head{Number: head.Parent.Number - 1, Hash: receiptBlockHash2} + headResult = evmtypes.Head{Number: head.Parent.Load().Number - 1, Hash: receiptBlockHash2} } else { require.Fail(t, "unrecognized block hash") } rpcElements[0].Result = &headResult }).Return(nil).Twice() ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once() - ethClient.On("LatestFinalizedBlock", mock.Anything).Return(head.Parent, nil).Once() + ethClient.On("LatestFinalizedBlock", mock.Anything).Return(head.Parent.Load(), nil).Once() err := finalizer.ProcessHead(ctx, head) require.NoError(t, err) tx, err = txStore.FindTxWithIdempotencyKey(ctx, idempotencyKey, testutils.FixtureChainID) diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index d4bfbffd12f..e9437960312 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -613,20 +613,21 @@ func TestTxm_GetTransactionStatus(t *testing.T) { gcfg := configtest.NewTestGeneralConfig(t) cfg := evmtest.NewChainScopedConfig(t, gcfg) + h99 := &evmtypes.Head{ + Hash: utils.NewHash(), + Number: 99, + } + h99.IsFinalized.Store(true) head := &evmtypes.Head{ Hash: utils.NewHash(), Number: 100, - Parent: &evmtypes.Head{ - Hash: utils.NewHash(), - Number: 99, - IsFinalized: true, - }, } + head.Parent.Store(h99) ethClient := evmtest.NewEthClientMockWithDefaultChain(t) ethClient.On("PendingNonceAt", mock.Anything, mock.Anything).Return(uint64(0), nil).Maybe() ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil).Once() - ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head.Parent, nil).Once() + ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head.Parent.Load(), nil).Once() ethClient.On("HeadByNumber", mock.Anything, mock.Anything).Return(head, nil) feeEstimator := gasmocks.NewEvmFeeEstimator(t) feeEstimator.On("Start", mock.Anything).Return(nil).Once() @@ -755,7 +756,7 @@ func TestTxm_GetTransactionStatus(t *testing.T) { err = txStore.InsertTxAttempt(ctx, &attempt) require.NoError(t, err) // Insert receipt for finalized block num - mustInsertEthReceipt(t, txStore, head.Parent.Number, head.Parent.Hash, attempt.Hash) + mustInsertEthReceipt(t, txStore, head.Parent.Load().Number, head.Parent.Load().Hash, attempt.Hash) state, err := txm.GetTransactionStatus(ctx, idempotencyKey) require.NoError(t, err) require.Equal(t, commontypes.Finalized, state) diff --git a/core/chains/evm/types/head_test.go b/core/chains/evm/types/head_test.go index 97c536a3444..5d887c43c82 100644 --- a/core/chains/evm/types/head_test.go +++ b/core/chains/evm/types/head_test.go @@ -9,6 +9,11 @@ import ( func TestHead_LatestFinalizedHead(t *testing.T) { t.Parallel() + newFinalizedHead := func(num int64) *Head { + result := &Head{Number: num} + result.IsFinalized.Store(true) + return result + } cases := []struct { Name string Head *Head @@ -21,17 +26,17 @@ func TestHead_LatestFinalizedHead(t *testing.T) { }, { Name: "Chain without finalized returns nil", - Head: &Head{Parent: &Head{Parent: &Head{}}}, + Head: sliceToChain(&Head{}, &Head{}, &Head{}), Finalized: nil, }, { Name: "Returns head if it's finalized", - Head: &Head{Number: 2, IsFinalized: true, Parent: &Head{Number: 1, IsFinalized: true}}, + Head: sliceToChain(newFinalizedHead(2), newFinalizedHead(1)), Finalized: &Head{Number: 2}, }, { Name: "Returns first block in chain if it's finalized", - Head: &Head{Number: 3, IsFinalized: false, Parent: &Head{Number: 2, IsFinalized: true, Parent: &Head{Number: 1, IsFinalized: true}}}, + Head: sliceToChain(&Head{Number: 3}, newFinalizedHead(2), newFinalizedHead(1)), Finalized: &Head{Number: 2}, }, } @@ -48,3 +53,43 @@ func TestHead_LatestFinalizedHead(t *testing.T) { }) } } + +func TestHead_ChainString(t *testing.T) { + cases := []struct { + Name string + Chain *Head + ExpectedResult string + }{ + { + Name: "Empty chain", + ExpectedResult: "->nil", + }, + { + Name: "Single head", + Chain: &Head{Number: 1}, + ExpectedResult: "Head{Number: 1, Hash: 0x0000000000000000000000000000000000000000000000000000000000000000, ParentHash: 0x0000000000000000000000000000000000000000000000000000000000000000}->nil", + }, + { + Name: "Multiple heads", + Chain: sliceToChain(&Head{Number: 1}, &Head{Number: 2}, &Head{Number: 3}), + ExpectedResult: "Head{Number: 1, Hash: 0x0000000000000000000000000000000000000000000000000000000000000000, ParentHash: 0x0000000000000000000000000000000000000000000000000000000000000000}->Head{Number: 2, Hash: 0x0000000000000000000000000000000000000000000000000000000000000000, ParentHash: 0x0000000000000000000000000000000000000000000000000000000000000000}->Head{Number: 3, Hash: 0x0000000000000000000000000000000000000000000000000000000000000000, ParentHash: 0x0000000000000000000000000000000000000000000000000000000000000000}->nil", + }, + } + for _, testCase := range cases { + t.Run(testCase.Name, func(t *testing.T) { + assert.Equal(t, testCase.ExpectedResult, testCase.Chain.ChainString()) + }) + } +} + +func sliceToChain(heads ...*Head) *Head { + if len(heads) == 0 { + return nil + } + + for i := 1; i < len(heads); i++ { + heads[i-1].Parent.Store(heads[i]) + } + + return heads[0] +} diff --git a/core/chains/evm/types/models.go b/core/chains/evm/types/models.go index a9e5cd5841b..1da8754cec4 100644 --- a/core/chains/evm/types/models.go +++ b/core/chains/evm/types/models.go @@ -9,6 +9,7 @@ import ( "math/big" "regexp" "strings" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -34,7 +35,7 @@ type Head struct { Number int64 L1BlockNumber sql.NullInt64 ParentHash common.Hash - Parent *Head + Parent atomic.Pointer[Head] EVMChainID *ubig.Big Timestamp time.Time CreatedAt time.Time @@ -44,7 +45,7 @@ type Head struct { StateRoot common.Hash Difficulty *big.Int TotalDifficulty *big.Int - IsFinalized bool + IsFinalized atomic.Bool } var _ commontypes.Head[common.Hash] = &Head{} @@ -74,10 +75,11 @@ func (h *Head) GetParentHash() common.Hash { } func (h *Head) GetParent() commontypes.Head[common.Hash] { - if h.Parent == nil { - return nil + if parent := h.Parent.Load(); parent != nil { + return parent } - return h.Parent + // explicitly return nil to avoid *Head(nil) + return nil } func (h *Head) GetTimestamp() time.Time { @@ -90,10 +92,11 @@ func (h *Head) BlockDifficulty() *big.Int { // EarliestInChain recurses through parents until it finds the earliest one func (h *Head) EarliestInChain() *Head { - for h.Parent != nil { - h = h.Parent + var earliestInChain *Head + for cur := h; cur != nil; cur = cur.Parent.Load() { + earliestInChain = cur } - return h + return earliestInChain } // EarliestHeadInChain recurses through parents until it finds the earliest one @@ -103,14 +106,10 @@ func (h *Head) EarliestHeadInChain() commontypes.Head[common.Hash] { // IsInChain returns true if the given hash matches the hash of a head in the chain func (h *Head) IsInChain(blockHash common.Hash) bool { - for { - if h.Hash == blockHash { + for cur := h; cur != nil; cur = cur.Parent.Load() { + if cur.Hash == blockHash { return true } - if h.Parent == nil { - break - } - h = h.Parent } return false } @@ -127,32 +126,19 @@ func (h *Head) HashAtHeight(blockNum int64) common.Hash { } func (h *Head) HeadAtHeight(blockNum int64) (commontypes.Head[common.Hash], error) { - for h != nil { - if h.Number == blockNum { - return h, nil + for cur := h; cur != nil; cur = cur.Parent.Load() { + if cur.Number == blockNum { + return cur, nil } - - h = h.Parent } return nil, fmt.Errorf("failed to find head at height %d", blockNum) } // ChainLength returns the length of the chain followed by recursively looking up parents func (h *Head) ChainLength() uint32 { - if h == nil { - return 0 - } - l := uint32(1) - - for { - if h.Parent == nil { - break - } + l := uint32(0) + for cur := h; cur != nil; cur = cur.Parent.Load() { l++ - if h == h.Parent { - panic("circular reference detected") - } - h = h.Parent } return l } @@ -160,29 +146,19 @@ func (h *Head) ChainLength() uint32 { // ChainHashes returns an array of block hashes by recursively looking up parents func (h *Head) ChainHashes() []common.Hash { var hashes []common.Hash - - for { - hashes = append(hashes, h.Hash) - if h.Parent == nil { - break - } - if h == h.Parent { - panic("circular reference detected") - } - h = h.Parent + for cur := h; cur != nil; cur = cur.Parent.Load() { + hashes = append(hashes, cur.Hash) } + return hashes } func (h *Head) LatestFinalizedHead() commontypes.Head[common.Hash] { - for h != nil { - if h.IsFinalized { - return h + for cur := h; cur != nil; cur = cur.Parent.Load() { + if cur.IsFinalized.Load() { + return cur } - - h = h.Parent } - return nil } @@ -200,18 +176,13 @@ func (h *Head) IsValid() bool { func (h *Head) ChainString() string { var sb strings.Builder - - for { - sb.WriteString(h.String()) - if h.Parent == nil { - break - } - if h == h.Parent { - panic("circular reference detected") + for cur := h; cur != nil; cur = cur.Parent.Load() { + if sb.Len() > 0 { + sb.WriteString("->") } - sb.WriteString("->") - h = h.Parent + sb.WriteString(cur.String()) } + sb.WriteString("->nil") return sb.String() } @@ -255,11 +226,11 @@ func (h *Head) AsSlice(k int) (heads []*Head) { if k < 1 || h == nil { return } - heads = make([]*Head, 1) - heads[0] = h - for len(heads) < k && h.Parent != nil { - h = h.Parent - heads = append(heads, h) + heads = make([]*Head, 0, k) + for cur := h; cur != nil; cur = cur.Parent.Load() { + if len(heads) < k { + heads = append(heads, cur) + } } return } diff --git a/core/chains/evm/types/models_test.go b/core/chains/evm/types/models_test.go index 6018d68f962..a54f1f58f5b 100644 --- a/core/chains/evm/types/models_test.go +++ b/core/chains/evm/types/models_test.go @@ -116,11 +116,9 @@ func TestEthTxAttempt_GetSignedTx(t *testing.T) { } func TestHead_ChainLength(t *testing.T) { - head := evmtypes.Head{ - Parent: &evmtypes.Head{ - Parent: &evmtypes.Head{}, - }, - } + head := evmtypes.Head{} + head.Parent.Store(&evmtypes.Head{}) + head.Parent.Load().Parent.Store(&evmtypes.Head{}) assert.Equal(t, uint32(3), head.ChainLength()) @@ -134,12 +132,12 @@ func TestHead_AsSlice(t *testing.T) { } h2 := &evmtypes.Head{ Number: 2, - Parent: h1, } + h2.Parent.Store(h1) h3 := &evmtypes.Head{ Number: 3, - Parent: h2, } + h3.Parent.Store(h2) assert.Len(t, (*evmtypes.Head)(nil).AsSlice(0), 0) assert.Len(t, (*evmtypes.Head)(nil).AsSlice(1), 0) @@ -234,36 +232,35 @@ func TestSafeByteSlice_Error(t *testing.T) { } func TestHead_EarliestInChain(t *testing.T) { - head := evmtypes.Head{ + h3 := evmtypes.Head{ Number: 3, - Parent: &evmtypes.Head{ - Number: 2, - Parent: &evmtypes.Head{ - Number: 1, - }, - }, } + h2 := &evmtypes.Head{Number: 2} + h3.Parent.Store(h2) + h1 := &evmtypes.Head{Number: 1} + h2.Parent.Store(h1) - assert.Equal(t, int64(1), head.EarliestInChain().BlockNumber()) + assert.Equal(t, int64(1), h3.EarliestInChain().BlockNumber()) } func TestHead_HeadAtHeight(t *testing.T) { - expectedResult := &evmtypes.Head{ + h1 := &evmtypes.Head{ + Number: 1, + } + h2 := &evmtypes.Head{ Hash: common.BigToHash(big.NewInt(10)), Number: 2, - Parent: &evmtypes.Head{ - Number: 1, - }, } - head := evmtypes.Head{ + h2.Parent.Store(h1) + h3 := evmtypes.Head{ Number: 3, - Parent: expectedResult, } + h3.Parent.Store(h2) - headAtHeight, err := head.HeadAtHeight(2) + headAtHeight, err := h3.HeadAtHeight(2) require.NoError(t, err) - assert.Equal(t, expectedResult, headAtHeight) - _, err = head.HeadAtHeight(0) + assert.Equal(t, h2, headAtHeight) + _, err = h3.HeadAtHeight(0) assert.Error(t, err, "expected to get an error if head is not in the chain") } @@ -271,25 +268,27 @@ func TestHead_IsInChain(t *testing.T) { hash1 := utils.NewHash() hash2 := utils.NewHash() hash3 := utils.NewHash() - - head := evmtypes.Head{ - Number: 3, + h1 := &evmtypes.Head{ + Number: 1, + Hash: hash1, + } + h2 := &evmtypes.Head{ + Hash: hash2, + ParentHash: hash1, + Number: 2, + } + h2.Parent.Store(h1) + h3 := evmtypes.Head{ Hash: hash3, - Parent: &evmtypes.Head{ - Hash: hash2, - Number: 2, - Parent: &evmtypes.Head{ - Hash: hash1, - Number: 1, - }, - }, + Number: 3, } + h3.Parent.Store(h2) - assert.True(t, head.IsInChain(hash1)) - assert.True(t, head.IsInChain(hash2)) - assert.True(t, head.IsInChain(hash3)) - assert.False(t, head.IsInChain(utils.NewHash())) - assert.False(t, head.IsInChain(common.Hash{})) + assert.True(t, h3.IsInChain(hash1)) + assert.True(t, h3.IsInChain(hash2)) + assert.True(t, h3.IsInChain(hash3)) + assert.False(t, h3.IsInChain(utils.NewHash())) + assert.False(t, h3.IsInChain(common.Hash{})) } func TestTxReceipt_ReceiptIndicatesRunLogFulfillment(t *testing.T) { @@ -316,11 +315,11 @@ func TestHead_UnmarshalJSON(t *testing.T) { tests := []struct { name string json string - expected evmtypes.Head + expected *evmtypes.Head }{ {"geth", `{"difficulty":"0xf3a00","extraData":"0xd883010503846765746887676f312e372e318664617277696e","gasLimit":"0xffc001","gasUsed":"0x0","hash":"0x41800b5c3f1717687d85fc9018faac0a6e90b39deaa0b99e7fe4fe796ddeb26a","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0xd1aeb42885a43b72b518182ef893125814811048","mixHash":"0x0f98b15f1a4901a7e9204f3c500a7bd527b3fb2c3340e12176a44b83e414a69e","nonce":"0x0ece08ea8c49dfd9","number":"0x100","parentHash":"0x41941023680923e0fe4d74a34bdac8141f2540e3ae90623718e47d66d1ca4a2d","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x218","stateRoot":"0xc7b01007a10da045eacb90385887dd0c38fcb5db7393006bdde24b93873c334b","timestamp":"0x58318da2","totalDifficulty":"0x1f3a00","transactions":[],"transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","uncles":[]}`, - evmtypes.Head{ + &evmtypes.Head{ Hash: common.HexToHash("0x41800b5c3f1717687d85fc9018faac0a6e90b39deaa0b99e7fe4fe796ddeb26a"), Number: 0x100, ParentHash: common.HexToHash("0x41941023680923e0fe4d74a34bdac8141f2540e3ae90623718e47d66d1ca4a2d"), @@ -332,7 +331,7 @@ func TestHead_UnmarshalJSON(t *testing.T) { }, {"parity", `{"author":"0xd1aeb42885a43b72b518182ef893125814811048","difficulty":"0xf3a00","extraData":"0xd883010503846765746887676f312e372e318664617277696e","gasLimit":"0xffc001","gasUsed":"0x0","hash":"0x41800b5c3f1717687d85fc9018faac0a6e90b39deaa0b99e7fe4fe796ddeb26a","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0xd1aeb42885a43b72b518182ef893125814811048","mixHash":"0x0f98b15f1a4901a7e9204f3c500a7bd527b3fb2c3340e12176a44b83e414a69e","nonce":"0x0ece08ea8c49dfd9","number":"0x100","parentHash":"0x41941023680923e0fe4d74a34bdac8141f2540e3ae90623718e47d66d1ca4a2d","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":["0xa00f98b15f1a4901a7e9204f3c500a7bd527b3fb2c3340e12176a44b83e414a69e","0x880ece08ea8c49dfd9"],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x218","stateRoot":"0xc7b01007a10da045eacb90385887dd0c38fcb5db7393006bdde24b93873c334b","timestamp":"0x58318da2","totalDifficulty":"0x1f3a00","transactions":[],"transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","uncles":[]}`, - evmtypes.Head{ + &evmtypes.Head{ Hash: common.HexToHash("0x41800b5c3f1717687d85fc9018faac0a6e90b39deaa0b99e7fe4fe796ddeb26a"), Number: 0x100, ParentHash: common.HexToHash("0x41941023680923e0fe4d74a34bdac8141f2540e3ae90623718e47d66d1ca4a2d"), @@ -344,7 +343,7 @@ func TestHead_UnmarshalJSON(t *testing.T) { }, {"arbitrum", `{"number":"0x15156","hash":"0x752dab43f7a2482db39227d46cd307623b26167841e2207e93e7566ab7ab7871","parentHash":"0x923ad1e27c1d43cb2d2fb09e26d2502ca4b4914a2e0599161d279c6c06117d34","mixHash":"0x0000000000000000000000000000000000000000000000000000000000000000","nonce":"0x0000000000000000","sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","transactionsRoot":"0x71448077f5ce420a8e24db62d4d58e8d8e6ad2c7e76318868e089d41f7e0faf3","stateRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","receiptsRoot":"0x2c292672b8fc9d223647a2569e19721f0757c96a1421753a93e141f8e56cf504","miner":"0x0000000000000000000000000000000000000000","difficulty":"0x0","totalDifficulty":"0x0","extraData":"0x","size":"0x0","gasLimit":"0x11278208","gasUsed":"0x3d1fe9","timestamp":"0x60d0952d","transactions":["0xa1ea93556b93ed3b45cb24f21c8deb584e6a9049c35209242651bf3533c23b98","0xfc6593c45ba92351d17173aa1381e84734d252ab0169887783039212c4a41024","0x85ee9d04fd0ebb5f62191eeb53cb45d9c0945d43eba444c3548de2ac8421682f","0x50d120936473e5b75f6e04829ad4eeca7a1df7d3c5026ebb5d34af936a39b29c"],"uncles":[],"l1BlockNumber":"0x8652f9"}`, - evmtypes.Head{ + &evmtypes.Head{ Hash: common.HexToHash("0x752dab43f7a2482db39227d46cd307623b26167841e2207e93e7566ab7ab7871"), Number: 0x15156, ParentHash: common.HexToHash("0x923ad1e27c1d43cb2d2fb09e26d2502ca4b4914a2e0599161d279c6c06117d34"), @@ -357,7 +356,7 @@ func TestHead_UnmarshalJSON(t *testing.T) { }, {"arbitrum_empty_l1BlockNumber", `{"number":"0x15156","hash":"0x752dab43f7a2482db39227d46cd307623b26167841e2207e93e7566ab7ab7871","parentHash":"0x923ad1e27c1d43cb2d2fb09e26d2502ca4b4914a2e0599161d279c6c06117d34","mixHash":"0x0000000000000000000000000000000000000000000000000000000000000000","nonce":"0x0000000000000000","sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","transactionsRoot":"0x71448077f5ce420a8e24db62d4d58e8d8e6ad2c7e76318868e089d41f7e0faf3","stateRoot":"0x0000000000000000000000000000000000000000000000000000000000000000","receiptsRoot":"0x2c292672b8fc9d223647a2569e19721f0757c96a1421753a93e141f8e56cf504","miner":"0x0000000000000000000000000000000000000000","difficulty":"0x0","totalDifficulty":"0x0","extraData":"0x","size":"0x0","gasLimit":"0x11278208","gasUsed":"0x3d1fe9","timestamp":"0x60d0952d","transactions":["0xa1ea93556b93ed3b45cb24f21c8deb584e6a9049c35209242651bf3533c23b98","0xfc6593c45ba92351d17173aa1381e84734d252ab0169887783039212c4a41024","0x85ee9d04fd0ebb5f62191eeb53cb45d9c0945d43eba444c3548de2ac8421682f","0x50d120936473e5b75f6e04829ad4eeca7a1df7d3c5026ebb5d34af936a39b29c"],"uncles":[]}`, - evmtypes.Head{ + &evmtypes.Head{ Hash: common.HexToHash("0x752dab43f7a2482db39227d46cd307623b26167841e2207e93e7566ab7ab7871"), Number: 0x15156, ParentHash: common.HexToHash("0x923ad1e27c1d43cb2d2fb09e26d2502ca4b4914a2e0599161d279c6c06117d34"), @@ -370,7 +369,7 @@ func TestHead_UnmarshalJSON(t *testing.T) { }, {"not found", `null`, - evmtypes.Head{}, + &evmtypes.Head{}, }, } @@ -395,11 +394,11 @@ func TestHead_UnmarshalJSON(t *testing.T) { func TestHead_MarshalJSON(t *testing.T) { tests := []struct { name string - head evmtypes.Head + head *evmtypes.Head expected string }{ {"happy", - evmtypes.Head{ + &evmtypes.Head{ Hash: common.HexToHash("0x41800b5c3f1717687d85fc9018faac0a6e90b39deaa0b99e7fe4fe796ddeb26a"), Number: 0x100, ParentHash: common.HexToHash("0x41941023680923e0fe4d74a34bdac8141f2540e3ae90623718e47d66d1ca4a2d"), @@ -411,7 +410,7 @@ func TestHead_MarshalJSON(t *testing.T) { `{"hash":"0x41800b5c3f1717687d85fc9018faac0a6e90b39deaa0b99e7fe4fe796ddeb26a","number":"0x100","parentHash":"0x41941023680923e0fe4d74a34bdac8141f2540e3ae90623718e47d66d1ca4a2d","timestamp":"0x58318da2","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","stateRoot":"0xc7b01007a10da045eacb90385887dd0c38fcb5db7393006bdde24b93873c334b"}`, }, {"empty", - evmtypes.Head{}, + &evmtypes.Head{}, `{"number":"0x0"}`, }, } diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 7d333d94018..b60dd8d73ce 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -1389,7 +1389,7 @@ func (b *Blocks) ForkAt(t *testing.T, blockNum int64, numHashes int) *Blocks { } forked.Heads[blockNum].ParentHash = b.Heads[blockNum].ParentHash - forked.Heads[blockNum].Parent = b.Heads[blockNum].Parent + forked.Heads[blockNum].Parent.Store(b.Heads[blockNum].Parent.Load()) return forked } @@ -1403,10 +1403,10 @@ func (b *Blocks) NewHead(number uint64) *evmtypes.Head { Number: parent.Number + 1, Hash: evmutils.NewHash(), ParentHash: parent.Hash, - Parent: parent, Timestamp: time.Unix(parent.Number+1, 0), EVMChainID: ubig.New(&FixtureChainID), } + head.Parent.Store(parent) return head } @@ -1447,7 +1447,7 @@ func NewBlocks(t *testing.T, numHashes int) *Blocks { heads[i] = &evmtypes.Head{Hash: hash, Number: i, Timestamp: time.Unix(i, 0), EVMChainID: ubig.New(&FixtureChainID)} if i > 0 { parent := heads[i-1] - heads[i].Parent = parent + heads[i].Parent.Store(parent) heads[i].ParentHash = parent.Hash } } diff --git a/core/internal/cltest/factories.go b/core/internal/cltest/factories.go index c488dca94a9..8f4b2260a02 100644 --- a/core/internal/cltest/factories.go +++ b/core/internal/cltest/factories.go @@ -25,6 +25,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable" + txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/core/auth" @@ -318,13 +319,13 @@ func MustGenerateRandomKeyState(_ testing.TB) ethkey.State { return ethkey.State{Address: NewEIP55Address()} } -func MustInsertHead(t *testing.T, ds sqlutil.DataSource, number int64) evmtypes.Head { +func MustInsertHead(t *testing.T, ds sqlutil.DataSource, number int64) *evmtypes.Head { h := evmtypes.NewHead(big.NewInt(number), evmutils.NewHash(), evmutils.NewHash(), 0, ubig.New(&FixtureChainID)) horm := headtracker.NewORM(FixtureChainID, ds) err := horm.IdempotentInsertHead(testutils.Context(t), &h) require.NoError(t, err) - return h + return &h } func MustInsertV2JobSpec(t *testing.T, db *sqlx.DB, transmitterAddress common.Address) job.Job { diff --git a/core/services/headreporter/telemetry_reporter_test.go b/core/services/headreporter/telemetry_reporter_test.go index 85bfea5866a..6d7f4e3ddef 100644 --- a/core/services/headreporter/telemetry_reporter_test.go +++ b/core/services/headreporter/telemetry_reporter_test.go @@ -22,18 +22,18 @@ import ( func Test_TelemetryReporter_NewHead(t *testing.T) { head := evmtypes.Head{ - Number: 42, - EVMChainID: ubig.NewI(100), - Hash: common.HexToHash("0x1010"), - Timestamp: time.UnixMilli(1000), - IsFinalized: false, - Parent: &evmtypes.Head{ - Number: 41, - Hash: common.HexToHash("0x1009"), - Timestamp: time.UnixMilli(999), - IsFinalized: true, - }, + Number: 42, + EVMChainID: ubig.NewI(100), + Hash: common.HexToHash("0x1010"), + Timestamp: time.UnixMilli(1000), + } + h41 := &evmtypes.Head{ + Number: 41, + Hash: common.HexToHash("0x1009"), + Timestamp: time.UnixMilli(999), } + h41.IsFinalized.Store(true) + head.Parent.Store(h41) requestBytes, err := proto.Marshal(&telem.HeadReportRequest{ ChainID: "100", Latest: &telem.Block{ @@ -42,9 +42,9 @@ func Test_TelemetryReporter_NewHead(t *testing.T) { Hash: head.Hash.Hex(), }, Finalized: &telem.Block{ - Timestamp: uint64(head.Parent.Timestamp.UTC().Unix()), + Timestamp: uint64(head.Parent.Load().Timestamp.UTC().Unix()), Number: 41, - Hash: head.Parent.Hash.Hex(), + Hash: head.Parent.Load().Hash.Hex(), }, }) assert.NoError(t, err) @@ -64,11 +64,10 @@ func Test_TelemetryReporter_NewHead(t *testing.T) { func Test_TelemetryReporter_NewHeadMissingFinalized(t *testing.T) { head := evmtypes.Head{ - Number: 42, - EVMChainID: ubig.NewI(100), - Hash: common.HexToHash("0x1010"), - Timestamp: time.UnixMilli(1000), - IsFinalized: false, + Number: 42, + EVMChainID: ubig.NewI(100), + Hash: common.HexToHash("0x1010"), + Timestamp: time.UnixMilli(1000), } requestBytes, err := proto.Marshal(&telem.HeadReportRequest{ ChainID: "100", diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/block_subscriber.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/block_subscriber.go index a5a00542179..21adc12d30e 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/block_subscriber.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/block_subscriber.go @@ -234,7 +234,7 @@ func (bs *BlockSubscriber) processHead(h *evmtypes.Head) { // head parent is a linked list with EVM finality depth // when re-org happens, new heads will have pointers to the new blocks i := int64(0) - for cp := h; cp != nil; cp = cp.Parent { + for cp := h; cp != nil; cp = cp.Parent.Load() { // we don't stop when a matching (block number/hash) entry is seen in the map because parent linked list may be // cut short during a re-org if head broadcaster backfill is not complete. This can cause some re-orged blocks // left in the map. for example, re-org happens for block 98, 99, 100. next head 101 from broadcaster has parent list diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/block_subscriber_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/block_subscriber_test.go index fefbda77cd7..bdcc37dc6bb 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/block_subscriber_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/block_subscriber_test.go @@ -310,23 +310,22 @@ func TestBlockSubscriber_Start(t *testing.T) { h97 := evmtypes.Head{ Number: 97, Hash: common.HexToHash("0xda2f9d1359eadd7b93338703adc07d942021a78195564038321ef53f23f87333"), - Parent: nil, } h98 := evmtypes.Head{ Number: 98, Hash: common.HexToHash("0xc20c7b47466c081a44a3b168994e89affe85cb894547845d938f923b67c633c0"), - Parent: &h97, } + h98.Parent.Store(&h97) h99 := evmtypes.Head{ Number: 99, Hash: common.HexToHash("0x9bc2b51e147f9cad05f1614b7f1d8181cb24c544cbcf841f3155e54e752a3b44"), - Parent: &h98, } + h99.Parent.Store(&h98) h100 := evmtypes.Head{ Number: 100, Hash: common.HexToHash("0x5e7fadfc14e1cfa9c05a91128c16a20c6cbc3be38b4723c3d482d44bf9c0e07b"), - Parent: &h99, } + h100.Parent.Store(&h99) // no subscribers yet bs.headC <- &h100 @@ -353,8 +352,8 @@ func TestBlockSubscriber_Start(t *testing.T) { h101 := &evmtypes.Head{ Number: 101, Hash: common.HexToHash("0xc20c7b47466c081a44a3b168994e89affe85cb894547845d938f923b67c633c0"), - Parent: &h100, } + h101.Parent.Store(&h100) bs.headC <- h101 time.Sleep(100 * time.Millisecond) @@ -387,24 +386,24 @@ func TestBlockSubscriber_Start(t *testing.T) { new99 := &evmtypes.Head{ Number: 99, Hash: common.HexToHash("0x70c03acc4ddbfb253ba41a25dc13fb21b25da8b63bcd1aa7fb55713d33a36c71"), - Parent: &h98, } + new99.Parent.Store(&h98) new100 := &evmtypes.Head{ Number: 100, Hash: common.HexToHash("0x8a876b62d252e63e16cf3487db3486c0a7c0a8e06bc3792a3b116c5ca480503f"), - Parent: new99, } + new100.Parent.Store(new99) new101 := &evmtypes.Head{ Number: 101, Hash: common.HexToHash("0x41b5842b8847dcf834e39556d2ac51cc7d960a7de9471ec504673d0038fd6c8e"), - Parent: new100, } + new101.Parent.Store(new100) new102 := &evmtypes.Head{ Number: 102, Hash: common.HexToHash("0x9ac1ebc307554cf1bcfcc2a49462278e89d6878d613a33df38a64d0aeac971b5"), - Parent: new101, } + new102.Parent.Store(new101) bs.headC <- new102 diff --git a/core/services/ocrcommon/arbitrum_block_translator_test.go b/core/services/ocrcommon/arbitrum_block_translator_test.go index fa6875fb798..6b9abc93bf7 100644 --- a/core/services/ocrcommon/arbitrum_block_translator_test.go +++ b/core/services/ocrcommon/arbitrum_block_translator_test.go @@ -1,6 +1,7 @@ package ocrcommon_test import ( + "context" "database/sql" "math/big" mrand "math/rand" @@ -34,7 +35,7 @@ func TestArbitrumBlockTranslator_BinarySearch(t *testing.T) { var changedInL1Block int64 = 5541 latestBlock := blocks[1000] - client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(&latestBlock, nil).Once() + client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(latestBlock, nil).Once() from, to, err := abt.BinarySearch(ctx, changedInL1Block) require.NoError(t, err) @@ -51,11 +52,10 @@ func TestArbitrumBlockTranslator_BinarySearch(t *testing.T) { var changedInL1Block int64 = 42 latestBlock := blocks[1000] - client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(&latestBlock, nil).Once() + client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(latestBlock, nil).Once() - tmp := new(evmtypes.Head) - client.On("HeadByNumber", ctx, mock.AnythingOfType("*big.Int")).Return(tmp, nil).Run(func(args mock.Arguments) { - *tmp = blocks[args[1].(*big.Int).Int64()] + client.On("HeadByNumber", ctx, mock.AnythingOfType("*big.Int")).Return(func(_ context.Context, num *big.Int) (*evmtypes.Head, error) { + return blocks[num.Int64()], nil }) _, _, err := abt.BinarySearch(ctx, changedInL1Block) @@ -71,11 +71,10 @@ func TestArbitrumBlockTranslator_BinarySearch(t *testing.T) { var changedInL1Block int64 = 5043 latestBlock := blocks[1000] - client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(&latestBlock, nil).Once() + client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(latestBlock, nil).Once() - tmp := new(evmtypes.Head) - client.On("HeadByNumber", ctx, mock.AnythingOfType("*big.Int")).Return(tmp, nil).Run(func(args mock.Arguments) { - *tmp = blocks[args[1].(*big.Int).Int64()] + client.On("HeadByNumber", ctx, mock.AnythingOfType("*big.Int")).Return(func(_ context.Context, num *big.Int) (*evmtypes.Head, error) { + return blocks[num.Int64()], nil }) _, _, err := abt.BinarySearch(ctx, changedInL1Block) @@ -91,12 +90,10 @@ func TestArbitrumBlockTranslator_BinarySearch(t *testing.T) { var changedInL1Block int64 = 5042 latestBlock := blocks[1000] - client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(&latestBlock, nil).Once() + client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(latestBlock, nil).Once() - tmp := new(evmtypes.Head) - client.On("HeadByNumber", ctx, mock.AnythingOfType("*big.Int")).Return(tmp, nil).Run(func(args mock.Arguments) { - h := blocks[args[1].(*big.Int).Int64()] - *tmp = h + client.On("HeadByNumber", ctx, mock.AnythingOfType("*big.Int")).Return(func(_ context.Context, num *big.Int) (*evmtypes.Head, error) { + return blocks[num.Int64()], nil }) from, to, err := abt.BinarySearch(ctx, changedInL1Block) @@ -114,12 +111,10 @@ func TestArbitrumBlockTranslator_BinarySearch(t *testing.T) { var changedInL1Block int64 = 5000 latestBlock := blocks[1000] - client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(&latestBlock, nil).Once() + client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(latestBlock, nil).Once() - tmp := new(evmtypes.Head) - client.On("HeadByNumber", ctx, mock.AnythingOfType("*big.Int")).Return(tmp, nil).Run(func(args mock.Arguments) { - h := blocks[args[1].(*big.Int).Int64()] - *tmp = h + client.On("HeadByNumber", ctx, mock.AnythingOfType("*big.Int")).Return(func(_ context.Context, num *big.Int) (*evmtypes.Head, error) { + return blocks[num.Int64()], nil }) from, to, err := abt.BinarySearch(ctx, changedInL1Block) @@ -137,12 +132,10 @@ func TestArbitrumBlockTranslator_BinarySearch(t *testing.T) { var changedInL1Block int64 = 5540 latestBlock := blocks[1000] - client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(&latestBlock, nil).Once() + client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(latestBlock, nil).Once() - tmp := new(evmtypes.Head) - client.On("HeadByNumber", ctx, mock.AnythingOfType("*big.Int")).Return(tmp, nil).Run(func(args mock.Arguments) { - h := blocks[args[1].(*big.Int).Int64()] - *tmp = h + client.On("HeadByNumber", ctx, mock.AnythingOfType("*big.Int")).Return(func(_ context.Context, num *big.Int) (*evmtypes.Head, error) { + return blocks[num.Int64()], nil }) from, to, err := abt.BinarySearch(ctx, changedInL1Block) @@ -161,12 +154,10 @@ func TestArbitrumBlockTranslator_BinarySearch(t *testing.T) { latestBlock := blocks[1000] // Latest is never cached - client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(&latestBlock, nil).Once() + client.On("HeadByNumber", ctx, (*big.Int)(nil)).Return(latestBlock, nil).Once() - tmp := new(evmtypes.Head) - client.On("HeadByNumber", ctx, mock.AnythingOfType("*big.Int")).Times(20+18+14).Return(tmp, nil).Run(func(args mock.Arguments) { - h := blocks[args[1].(*big.Int).Int64()] - *tmp = h + client.On("HeadByNumber", ctx, mock.AnythingOfType("*big.Int")).Return(func(_ context.Context, num *big.Int) (*evmtypes.Head, error) { + return blocks[num.Int64()], nil }) // First search, nothing cached (total 21 - bsearch 20) @@ -230,14 +221,14 @@ func TestArbitrumBlockTranslator_NumberToQueryRange(t *testing.T) { }) } -func generateDeterministicL2Blocks() (heads []evmtypes.Head) { +func generateDeterministicL2Blocks() (heads []*evmtypes.Head) { source := mrand.NewSource(0) deterministicRand := mrand.New(source) l2max := 1000 var l1BlockNumber int64 = 5000 var parentHash common.Hash for i := 0; i <= l2max; i++ { - head := evmtypes.Head{ + head := &evmtypes.Head{ Number: int64(i), L1BlockNumber: sql.NullInt64{Int64: l1BlockNumber, Valid: true}, Hash: utils.NewHash(), diff --git a/core/services/relay/evm/mercury/v1/data_source_test.go b/core/services/relay/evm/mercury/v1/data_source_test.go index 197d802a3b3..7f5117a0aa8 100644 --- a/core/services/relay/evm/mercury/v1/data_source_test.go +++ b/core/services/relay/evm/mercury/v1/data_source_test.go @@ -332,16 +332,15 @@ func TestMercury_Observe(t *testing.T) { t.Run("when chain is too short", func(t *testing.T) { h4 := &evmtypes.Head{ Number: 4, - Parent: nil, } h5 := &evmtypes.Head{ Number: 5, - Parent: h4, } + h5.Parent.Store(h4) h6 := &evmtypes.Head{ Number: 6, - Parent: h5, } + h6.Parent.Store(h5) ht2 := htmocks.NewHeadTracker[*evmtypes.Head, common.Hash](t) ht2.On("LatestChain").Return(h6) @@ -362,7 +361,7 @@ func TestMercury_Observe(t *testing.T) { for i := range heads { heads[i] = &evmtypes.Head{Number: int64(i)} if i > 0 { - heads[i].Parent = heads[i-1] + heads[i].Parent.Store(heads[i-1]) } }