Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCIP-1230 Exposing entire LogPollerBlock from LatestBlock in LogPoller #11105

Merged
merged 3 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (disabled) UnregisterFilter(name string, qopts ...pg.QOpt) error { return E

func (disabled) HasFilter(name string) bool { return false }

func (disabled) LatestBlock(qopts ...pg.QOpt) (int64, error) { return -1, ErrDisabled }
func (disabled) LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) {
return LogPollerBlock{}, ErrDisabled
}

func (disabled) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) {
return nil, ErrDisabled
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize
func (th *TestHarness) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 {
th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber)
latest, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(ctx))
return latest + 1
return latest.BlockNumber + 1
}

func (th *TestHarness) assertDontHave(t *testing.T, start, end int) {
Expand Down
8 changes: 4 additions & 4 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type LogPoller interface {
RegisterFilter(filter Filter, qopts ...pg.QOpt) error
UnregisterFilter(name string, qopts ...pg.QOpt) error
HasFilter(name string) bool
LatestBlock(qopts ...pg.QOpt) (int64, error)
LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error)
GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error)

// General querying
Expand Down Expand Up @@ -1018,13 +1018,13 @@ func (lp *logPoller) IndexedLogsTopicRange(eventSig common.Hash, address common.

// LatestBlock returns the latest block the log poller is on. It tracks blocks to be able
// to detect reorgs.
func (lp *logPoller) LatestBlock(qopts ...pg.QOpt) (int64, error) {
func (lp *logPoller) LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) {
b, err := lp.orm.SelectLatestBlock(qopts...)
if err != nil {
return 0, err
return LogPollerBlock{}, err
}

return b.BlockNumber, nil
return *b, nil
}

func (lp *logPoller) BlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, error) {
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func TestLogPoller_Replay(t *testing.T) {
lp.PollAndSaveLogs(tctx, 4)
latest, err := lp.LatestBlock()
require.NoError(t, err)
require.Equal(t, int64(4), latest)
require.Equal(t, int64(4), latest.BlockNumber)

t.Run("abort before replayStart received", func(t *testing.T) {
// Replay() should abort immediately if caller's context is cancelled before request signal is read
Expand Down
39 changes: 25 additions & 14 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ func Test_BackupLogPoller(t *testing.T) {
body.Transactions = types.Transactions{} // number of tx's must match # of logs for GetLogs() to succeed
rawdb.WriteBody(th.EthDB, h.Hash(), h.Number.Uint64(), body)

currentBlock := th.PollAndSaveLogs(ctx, 1)
assert.Equal(t, int64(35), currentBlock)
currentBlockNumber := th.PollAndSaveLogs(ctx, 1)
assert.Equal(t, int64(35), currentBlockNumber)

// simulate logs becoming available
rawdb.WriteReceipts(th.EthDB, h.Hash(), h.Number.Uint64(), receipts)
Expand Down Expand Up @@ -342,12 +342,12 @@ func Test_BackupLogPoller(t *testing.T) {
markBlockAsFinalized(t, th, 34)

// Run ordinary poller + backup poller at least once
currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
th.LogPoller.PollAndSaveLogs(ctx, currentBlock+1)
currentBlock, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
th.LogPoller.PollAndSaveLogs(ctx, currentBlock.BlockNumber+1)
th.LogPoller.BackupPollAndSaveLogs(ctx, 100)
currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))

require.Equal(t, int64(37), currentBlock+1)
require.Equal(t, int64(37), currentBlock.BlockNumber+1)

// logs still shouldn't show up, because we don't want to backfill the last finalized log
// to help with reorg detection
Expand All @@ -359,11 +359,11 @@ func Test_BackupLogPoller(t *testing.T) {
markBlockAsFinalized(t, th, 35)

// Run ordinary poller + backup poller at least once more
th.LogPoller.PollAndSaveLogs(ctx, currentBlock+1)
th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber+1)
th.LogPoller.BackupPollAndSaveLogs(ctx, 100)
currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))

require.Equal(t, int64(38), currentBlock+1)
require.Equal(t, int64(38), currentBlock.BlockNumber+1)

// all 3 logs in block 34 should show up now, thanks to backup logger
logs, err = th.LogPoller.Logs(30, 37, EmitterABI.Events["Log1"].ID, th.EmitterAddress1,
Expand Down Expand Up @@ -471,6 +471,13 @@ func TestLogPoller_BackupPollAndSaveLogsWithDeepBlockDelay(t *testing.T) {
// 1 -> 2 -> ...
th.PollAndSaveLogs(ctx, 1)

// Check that latest block has the same properties as the head
latestBlock, err := th.LogPoller.LatestBlock()
require.NoError(t, err)
assert.Equal(t, latestBlock.BlockNumber, header.Number.Int64())
assert.Equal(t, latestBlock.FinalizedBlockNumber, header.Number.Int64())
assert.Equal(t, latestBlock.BlockHash, header.Hash())

// Register filter
err = th.LogPoller.RegisterFilter(logpoller.Filter{
Name: "Test Emitter",
Expand Down Expand Up @@ -619,7 +626,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
require.Len(t, gethLogs, 2)

lb, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
th.PollAndSaveLogs(context.Background(), lb+1)
th.PollAndSaveLogs(context.Background(), lb.BlockNumber+1)
lg1, err := th.LogPoller.Logs(0, 20, EmitterABI.Events["Log1"].ID, th.EmitterAddress1,
pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
Expand Down Expand Up @@ -667,9 +674,9 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1.
ec.Commit()
}
currentBlock := int64(1)
lp.PollAndSaveLogs(testutils.Context(t), currentBlock)
currentBlock, err = lp.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
currentBlockNumber := int64(1)
lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber)
currentBlock, err := lp.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
matchesGeth := func() bool {
// Check every block is identical
Expand Down Expand Up @@ -719,7 +726,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
require.NoError(t, err1)
t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash())
}
lp.PollAndSaveLogs(testutils.Context(t), currentBlock)
lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber)
currentBlock, err = lp.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
}
Expand Down Expand Up @@ -1245,7 +1252,7 @@ func TestGetReplayFromBlock(t *testing.T) {
require.NoError(t, err)
latest, err := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
assert.Equal(t, latest, fromBlock)
assert.Equal(t, latest.BlockNumber, fromBlock)

// Should take min(latest, requested) in this case requested.
requested = int64(7)
Expand Down Expand Up @@ -1551,6 +1558,10 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, tt.useFinalityTag, tt.finalityDepth, 3, 2, 1000)
// Should return error before the first poll and save
_, err := th.LogPoller.LatestBlock()
require.Error(t, err)

// Mark first block as finalized
h := th.Client.Blockchain().CurrentHeader()
th.Client.Blockchain().SetFinalized(h)
Expand All @@ -1562,7 +1573,7 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) {

th.PollAndSaveLogs(ctx, 1)

latestBlock, err := th.ORM.SelectLatestBlock()
latestBlock, err := th.LogPoller.LatestBlock()
require.NoError(t, err)
require.Equal(t, int64(numberOfBlocks), latestBlock.BlockNumber)
require.Equal(t, tt.expectedFinalizedBlock, latestBlock.FinalizedBlockNumber)
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions core/services/blockhashstore/coordinators.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (v *V1Coordinator) Fulfillments(ctx context.Context, fromBlock uint64) ([]E

logs, err := v.lp.LogsWithSigs(
int64(fromBlock),
int64(toBlock),
toBlock.BlockNumber,
[]common.Hash{
v1.VRFCoordinatorRandomnessRequestFulfilled{}.Topic(),
},
Expand Down Expand Up @@ -219,7 +219,7 @@ func (v *V2Coordinator) Fulfillments(ctx context.Context, fromBlock uint64) ([]E

logs, err := v.lp.LogsWithSigs(
int64(fromBlock),
int64(toBlock),
toBlock.BlockNumber,
[]common.Hash{
v2.VRFCoordinatorV2RandomWordsFulfilled{}.Topic(),
},
Expand Down Expand Up @@ -310,7 +310,7 @@ func (v *V2PlusCoordinator) Fulfillments(ctx context.Context, fromBlock uint64)

logs, err := v.lp.LogsWithSigs(
int64(fromBlock),
int64(toBlock),
toBlock.BlockNumber,
[]common.Hash{
v2plus.IVRFCoordinatorV2PlusInternalRandomWordsFulfilled{}.Topic(),
},
Expand Down
2 changes: 1 addition & 1 deletion core/services/blockhashstore/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
if err != nil {
return 0, errors.Wrap(err, "getting chain head")
}
return uint64(head), nil
return uint64(head.BlockNumber), nil
})

return []job.ServiceCtx{&service{
Expand Down
3 changes: 2 additions & 1 deletion core/services/blockhashstore/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/smartcontractkit/chainlink/v2/core/chains/evm"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
mocklp "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
Expand Down Expand Up @@ -58,7 +59,7 @@ func createTestDelegate(t *testing.T) (*blockhashstore.Delegate, *testData) {
sendingKey, _ := cltest.MustInsertRandomKey(t, kst)
lp := &mocklp.LogPoller{}
lp.On("RegisterFilter", mock.Anything).Return(nil)
lp.On("LatestBlock", mock.Anything, mock.Anything).Return(int64(0), nil)
lp.On("LatestBlock", mock.Anything, mock.Anything).Return(logpoller.LogPollerBlock{}, nil)

relayExtenders := evmtest.NewChainRelayExtenders(
t,
Expand Down
6 changes: 3 additions & 3 deletions core/services/blockhashstore/feeder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ func (test testCase) testFeederWithLogPollerVRFv1(t *testing.T) {

// Mock log poller.
lp.On("LatestBlock", mock.Anything).
Return(latest, nil)
Return(logpoller.LogPollerBlock{BlockNumber: latest}, nil)
lp.On(
"LogsWithSigs",
fromBlock,
Expand Down Expand Up @@ -543,7 +543,7 @@ func (test testCase) testFeederWithLogPollerVRFv2(t *testing.T) {

// Mock log poller.
lp.On("LatestBlock", mock.Anything).
Return(latest, nil)
Return(logpoller.LogPollerBlock{BlockNumber: latest}, nil)
lp.On(
"LogsWithSigs",
fromBlock,
Expand Down Expand Up @@ -641,7 +641,7 @@ func (test testCase) testFeederWithLogPollerVRFv2Plus(t *testing.T) {

// Mock log poller.
lp.On("LatestBlock", mock.Anything).
Return(latest, nil)
Return(logpoller.LogPollerBlock{BlockNumber: latest}, nil)
lp.On(
"LogsWithSigs",
fromBlock,
Expand Down
24 changes: 12 additions & 12 deletions core/services/ocr2/plugins/ocr2keeper/evm20/log_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ func (c *LogProvider) PerformLogs(ctx context.Context) ([]ocr2keepers.PerformLog
// always check the last lookback number of blocks and rebroadcast
// this allows the plugin to make decisions based on event confirmations
logs, err := c.logPoller.LogsWithSigs(
end-c.lookbackBlocks,
end,
end.BlockNumber-c.lookbackBlocks,
end.BlockNumber,
[]common.Hash{
registry.KeeperRegistryUpkeepPerformed{}.Topic(),
},
Expand All @@ -174,7 +174,7 @@ func (c *LogProvider) PerformLogs(ctx context.Context) ([]ocr2keepers.PerformLog
Key: UpkeepKeyHelper[uint32]{}.MakeUpkeepKey(p.CheckBlockNumber, p.Id),
TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(p.BlockNumber),
TransactionHash: p.TxHash.Hex(),
Confirmations: end - p.BlockNumber,
Confirmations: end.BlockNumber - p.BlockNumber,
}
vals = append(vals, l)
}
Expand All @@ -193,8 +193,8 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR

// ReorgedUpkeepReportLogs
logs, err := c.logPoller.LogsWithSigs(
end-c.lookbackBlocks,
end,
end.BlockNumber-c.lookbackBlocks,
end.BlockNumber,
[]common.Hash{
registry.KeeperRegistryReorgedUpkeepReport{}.Topic(),
},
Expand All @@ -211,8 +211,8 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR

// StaleUpkeepReportLogs
logs, err = c.logPoller.LogsWithSigs(
end-c.lookbackBlocks,
end,
end.BlockNumber-c.lookbackBlocks,
end.BlockNumber,
[]common.Hash{
registry.KeeperRegistryStaleUpkeepReport{}.Topic(),
},
Expand All @@ -229,8 +229,8 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR

// InsufficientFundsUpkeepReportLogs
logs, err = c.logPoller.LogsWithSigs(
end-c.lookbackBlocks,
end,
end.BlockNumber-c.lookbackBlocks,
end.BlockNumber,
[]common.Hash{
registry.KeeperRegistryInsufficientFundsUpkeepReport{}.Topic(),
},
Expand All @@ -257,7 +257,7 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR
Key: encoding.BasicEncoder{}.MakeUpkeepKey(checkBlockNumber, upkeepId),
TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(r.BlockNumber),
TransactionHash: r.TxHash.Hex(),
Confirmations: end - r.BlockNumber,
Confirmations: end.BlockNumber - r.BlockNumber,
}
vals = append(vals, l)
}
Expand All @@ -272,7 +272,7 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR
Key: encoding.BasicEncoder{}.MakeUpkeepKey(checkBlockNumber, upkeepId),
TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(r.BlockNumber),
TransactionHash: r.TxHash.Hex(),
Confirmations: end - r.BlockNumber,
Confirmations: end.BlockNumber - r.BlockNumber,
}
vals = append(vals, l)
}
Expand All @@ -287,7 +287,7 @@ func (c *LogProvider) StaleReportLogs(ctx context.Context) ([]ocr2keepers.StaleR
Key: encoding.BasicEncoder{}.MakeUpkeepKey(checkBlockNumber, upkeepId),
TransmitBlock: BlockKeyHelper[int64]{}.MakeBlockKey(r.BlockNumber),
TransactionHash: r.TxHash.Hex(),
Confirmations: end - r.BlockNumber,
Confirmations: end.BlockNumber - r.BlockNumber,
}
vals = append(vals, l)
}
Expand Down
10 changes: 5 additions & 5 deletions core/services/ocr2/plugins/ocr2keeper/evm20/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ func (r *EvmRegistry) initialize() error {

func (r *EvmRegistry) pollLogs() error {
var latest int64
var end int64
var end logpoller.LogPollerBlock
var err error

if end, err = r.poller.LatestBlock(pg.WithParentCtx(r.ctx)); err != nil {
Expand All @@ -355,20 +355,20 @@ func (r *EvmRegistry) pollLogs() error {

r.mu.Lock()
latest = r.lastPollBlock
r.lastPollBlock = end
r.lastPollBlock = end.BlockNumber
r.mu.Unlock()

// if start and end are the same, no polling needs to be done
if latest == 0 || latest == end {
if latest == 0 || latest == end.BlockNumber {
return nil
}

{
var logs []logpoller.Log

if logs, err = r.poller.LogsWithSigs(
end-logEventLookback,
end,
end.BlockNumber-logEventLookback,
end.BlockNumber,
upkeepStateEvents,
r.addr,
pg.WithParentCtx(r.ctx),
Expand Down
Loading
Loading