Skip to content

Commit

Permalink
LogPoller CLI command to resolve reorg greater than finality depth (#…
Browse files Browse the repository at this point in the history
…12867)

* find lca and remove block after CLI

* fix sort.Find typo

* make RemoveBlocks local cmd

* tests

* added changeset

* added tags to the changeset

* fixed tests

* make cmds, vars cases consistent
  • Loading branch information
dhaidashenko authored Apr 26, 2024
1 parent f55d8be commit 27d9413
Show file tree
Hide file tree
Showing 21 changed files with 745 additions and 2 deletions.
7 changes: 7 additions & 0 deletions .changeset/brave-dots-breathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"chainlink": minor
---

Added a new CLI command, `blocks find-lca,` which finds the latest block that is available in both the database and on the chain for the specified chain.
Added a new CLI command, `node remove-blocks,` which removes all blocks and logs greater than or equal to the specified block number.
#nops #added
8 changes: 8 additions & 0 deletions core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,11 @@ func (d disabled) LatestBlockByEventSigsAddrsWithConfs(ctx context.Context, from
func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations) ([]Log, error) {
return nil, ErrDisabled
}

func (d disabled) FindLCA(ctx context.Context) (*LogPollerBlock, error) {
return nil, ErrDisabled
}

func (d disabled) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error {
return ErrDisabled
}
99 changes: 99 additions & 0 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ type LogPoller interface {
GetFilters() map[string]Filter
LatestBlock(ctx context.Context) (LogPollerBlock, error)
GetBlocksRange(ctx context.Context, numbers []uint64) ([]LogPollerBlock, error)
FindLCA(ctx context.Context) (*LogPollerBlock, error)
DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error

// General querying
Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error)
Expand Down Expand Up @@ -1422,6 +1424,103 @@ func (lp *logPoller) IndexedLogsWithSigsExcluding(ctx context.Context, address c
return lp.orm.SelectIndexedLogsWithSigsExcluding(ctx, eventSigA, eventSigB, topicIndex, address, fromBlock, toBlock, confs)
}

// DeleteLogsAndBlocksAfter - removes blocks and logs starting from the specified block
func (lp *logPoller) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error {
return lp.orm.DeleteLogsAndBlocksAfter(ctx, start)
}

func (lp *logPoller) FindLCA(ctx context.Context) (*LogPollerBlock, error) {
latest, err := lp.orm.SelectLatestBlock(ctx)
if err != nil {
return nil, fmt.Errorf("failed to select the latest block: %w", err)
}

oldest, err := lp.orm.SelectOldestBlock(ctx, 0)
if err != nil {
return nil, fmt.Errorf("failed to select the oldest block: %w", err)
}

if latest == nil || oldest == nil {
return nil, fmt.Errorf("expected at least one block to be present in DB")
}

lp.lggr.Debugf("Received request to find LCA. Searching in range [%d, %d]", oldest.BlockNumber, latest.BlockNumber)

// Find the largest block number for which block hash stored in the DB matches one that we get from the RPC.
// `sort.Find` expects slice of following format s = [1, 0, -1] and returns smallest index i for which s[i] = 0.
// To utilise `sort.Find` we represent range of blocks as slice [latestBlock, latestBlock-1, ..., olderBlock+1, oldestBlock]
// and return 1 if DB block was reorged or 0 if it's still present on chain.
lcaI, found := sort.Find(int(latest.BlockNumber-oldest.BlockNumber)+1, func(i int) int {
const notFound = 1
const found = 0
// if there is an error - stop the search
if err != nil {
return notFound
}

// canceled search
if ctx.Err() != nil {
err = fmt.Errorf("aborted, FindLCA request cancelled: %w", ctx.Err())
return notFound
}
iBlockNumber := latest.BlockNumber - int64(i)
var dbBlock *LogPollerBlock
// Block with specified block number might not exist in the database, to address that we check closest child
// of the iBlockNumber. If the child is present on chain, it's safe to assume that iBlockNumber is present too
dbBlock, err = lp.orm.SelectOldestBlock(ctx, iBlockNumber)
if err != nil {
err = fmt.Errorf("failed to select block %d by number: %w", iBlockNumber, err)
return notFound
}

if dbBlock == nil {
err = fmt.Errorf("expected block to exist with blockNumber >= %d as observed block with number %d", iBlockNumber, latest.BlockNumber)
return notFound
}

lp.lggr.Debugf("Looking for matching block on chain blockNumber: %d blockHash: %s",
dbBlock.BlockNumber, dbBlock.BlockHash)
var chainBlock *evmtypes.Head
chainBlock, err = lp.ec.HeadByHash(ctx, dbBlock.BlockHash)
// our block in DB does not exist on chain
if (chainBlock == nil && err == nil) || errors.Is(err, ethereum.NotFound) {
err = nil
return notFound
}
if err != nil {
err = fmt.Errorf("failed to get block %s from RPC: %w", dbBlock.BlockHash, err)
return notFound
}

if chainBlock.BlockNumber() != dbBlock.BlockNumber {
err = fmt.Errorf("expected block numbers to match (db: %d, chain: %d), if block hashes match "+
"(db: %s, chain: %s)", dbBlock.BlockNumber, chainBlock.BlockNumber(), dbBlock.BlockHash, chainBlock.Hash)
return notFound
}

return found
})
if err != nil {
return nil, fmt.Errorf("failed to find: %w", err)
}

if !found {
return nil, fmt.Errorf("failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured")
}

lcaBlockNumber := latest.BlockNumber - int64(lcaI)
lca, err := lp.orm.SelectBlockByNumber(ctx, lcaBlockNumber)
if err != nil {
return nil, fmt.Errorf("failed to select lca from db: %w", err)
}

if lca == nil {
return nil, fmt.Errorf("expected lca (blockNum: %d) to exist in DB", lcaBlockNumber)
}

return lca, nil
}

func EvmWord(i uint64) common.Hash {
var b = make([]byte, 8)
binary.BigEndian.PutUint64(b, i)
Expand Down
116 changes: 116 additions & 0 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1921,3 +1921,119 @@ func markBlockAsFinalizedByHash(t *testing.T, th TestHarness, blockHash common.H
require.NoError(t, err)
th.Client.Blockchain().SetFinalized(b.Header())
}

func TestFindLCA(t *testing.T) {
ctx := testutils.Context(t)
ec := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.Test(t)
chainID := testutils.NewRandomEVMChainID()
db := pgtest.NewSqlxDB(t)

orm := logpoller.NewORM(chainID, db, lggr)

lpOpts := logpoller.Opts{
PollPeriod: time.Hour,
FinalityDepth: 2,
BackfillBatchSize: 20,
RpcBatchSize: 10,
KeepFinalizedBlocksDepth: 1000,
}

lp := logpoller.NewLogPoller(orm, ec, lggr, lpOpts)
t.Run("Fails, if failed to select oldest block", func(t *testing.T) {
_, err := lp.FindLCA(ctx)
require.ErrorContains(t, err, "failed to select the latest block")
})
// oldest
require.NoError(t, orm.InsertBlock(ctx, common.HexToHash("0x123"), 10, time.Now(), 0))
// latest
latestBlockHash := common.HexToHash("0x124")
require.NoError(t, orm.InsertBlock(ctx, latestBlockHash, 16, time.Now(), 0))
t.Run("Fails, if caller's context canceled", func(t *testing.T) {
lCtx, cancel := context.WithCancel(ctx)
ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(nil, nil).Run(func(_ mock.Arguments) {
cancel()
}).Once()
_, err := lp.FindLCA(lCtx)
require.ErrorContains(t, err, "aborted, FindLCA request cancelled")

})
t.Run("Fails, if RPC returns an error", func(t *testing.T) {
expectedError := fmt.Errorf("failed to call RPC")
ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(nil, expectedError).Once()
_, err := lp.FindLCA(ctx)
require.ErrorContains(t, err, expectedError.Error())
})
t.Run("Fails, if block numbers do not match", func(t *testing.T) {
ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(&evmtypes.Head{
Number: 123,
}, nil).Once()
_, err := lp.FindLCA(ctx)
require.ErrorContains(t, err, "expected block numbers to match")
})
t.Run("Fails, if none of the blocks in db matches on chain", func(t *testing.T) {
ec.On("HeadByHash", mock.Anything, mock.Anything).Return(nil, nil).Times(3)
_, err := lp.FindLCA(ctx)
require.ErrorContains(t, err, "failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured")
})

type block struct {
BN int
Exists bool
}
testCases := []struct {
Name string
Blocks []block
ExpectedBlockNumber int
ExpectedError error
}{
{
Name: "All of the blocks are present on chain - returns the latest",
Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: true}, {BN: 3, Exists: true}, {BN: 4, Exists: true}},
ExpectedBlockNumber: 4,
},
{
Name: "None of the blocks exists on chain - returns an erro",
Blocks: []block{{BN: 1, Exists: false}, {BN: 2, Exists: false}, {BN: 3, Exists: false}, {BN: 4, Exists: false}},
ExpectedBlockNumber: 0,
ExpectedError: fmt.Errorf("failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured"),
},
{
Name: "Only latest block does not exist",
Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: true}, {BN: 3, Exists: true}, {BN: 4, Exists: false}},
ExpectedBlockNumber: 3,
},
{
Name: "Only oldest block exists on chain",
Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: false}, {BN: 3, Exists: false}, {BN: 4, Exists: false}},
ExpectedBlockNumber: 1,
},
}

blockHashI := int64(0)
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
// reset the database
require.NoError(t, orm.DeleteLogsAndBlocksAfter(ctx, 0))
for _, b := range tc.Blocks {
blockHashI++
hash := common.BigToHash(big.NewInt(blockHashI))
require.NoError(t, orm.InsertBlock(ctx, hash, int64(b.BN), time.Now(), 0))
// Hashes are unique for all test cases
var onChainBlock *evmtypes.Head
if b.Exists {
onChainBlock = &evmtypes.Head{Number: int64(b.BN)}
}
ec.On("HeadByHash", mock.Anything, hash).Return(onChainBlock, nil).Maybe()
}

result, err := lp.FindLCA(ctx)
if tc.ExpectedError != nil {
require.ErrorContains(t, err, tc.ExpectedError.Error())
} else {
require.NotNil(t, result)
require.Equal(t, result.BlockNumber, int64(tc.ExpectedBlockNumber), "expected block numbers to match")
}
})
}
}
48 changes: 48 additions & 0 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ func (o *ObservedORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, e
})
}

func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) {
return withObservedQuery(o, "SelectOldestBlock", func() (*LogPollerBlock, error) {
return o.ORM.SelectOldestBlock(ctx, minAllowedBlockNumber)
})
}

func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs Confirmations) (*Log, error) {
return withObservedQuery(o, "SelectLatestLogByEventSigWithConfs", func() (*Log, error) {
return o.ORM.SelectLatestLogByEventSigWithConfs(ctx, eventSig, address, confs)
Expand Down
9 changes: 9 additions & 0 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ORM interface {
SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error)
SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPollerBlock, error)
SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error)
SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error)

SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error)
SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, eventSigs []common.Hash) ([]Log, error)
Expand Down Expand Up @@ -202,6 +203,14 @@ func (o *DSORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error)
return &b, nil
}

func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) {
var b LogPollerBlock
if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 ORDER BY block_number ASC LIMIT 1`, ubig.New(o.chainID), minAllowedBlockNumber); err != nil {
return nil, err
}
return &b, nil
}

func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs Confirmations) (*Log, error) {
args, err := newQueryArgsForEvent(o.chainID, address, eventSig).
withConfs(confs).
Expand Down
30 changes: 30 additions & 0 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1759,3 +1759,33 @@ func Benchmark_DeleteExpiredLogs(b *testing.B) {
assert.NoError(b, err1)
}
}

func TestSelectOldestBlock(t *testing.T) {
th := SetupTH(t, lpOpts)
o1 := th.ORM
o2 := th.ORM2
ctx := testutils.Context(t)
t.Run("Selects oldest within given chain", func(t *testing.T) {
// insert blocks
require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1231"), 11, time.Now(), 0))
require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1232"), 12, time.Now(), 0))
// insert newer block from different chain
require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 13, time.Now(), 0))
require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1231"), 14, time.Now(), 0))
block, err := o1.SelectOldestBlock(ctx, 0)
require.NoError(t, err)
require.NotNil(t, block)
require.Equal(t, block.BlockNumber, int64(13))
require.Equal(t, block.BlockHash, common.HexToHash("0x1233"))
})
t.Run("Does not select blocks older than specified limit", func(t *testing.T) {
require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1232"), 11, time.Now(), 0))
require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 13, time.Now(), 0))
require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1234"), 15, time.Now(), 0))
block, err := o1.SelectOldestBlock(ctx, 12)
require.NoError(t, err)
require.NotNil(t, block)
require.Equal(t, block.BlockNumber, int64(13))
require.Equal(t, block.BlockHash, common.HexToHash("0x1233"))
})
}
Loading

0 comments on commit 27d9413

Please sign in to comment.