Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry picking some LogPoller changes from the main repo #282

Merged
merged 3 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/chains/evm/logpoller/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func (disabled) UnregisterFilter(name string, qopts ...pg.QOpt) error { return E

func (disabled) HasFilter(name string) bool { return false }

func (disabled) LatestBlock(qopts ...pg.QOpt) (int64, error) { return -1, ErrDisabled }
func (disabled) LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) {
return LogPollerBlock{}, ErrDisabled
}

func (disabled) GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error) {
return nil, ErrDisabled
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func SetupTH(t testing.TB, useFinalityTag bool, finalityDepth, backfillBatchSize
func (th *TestHarness) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 {
th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber)
latest, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(ctx))
return latest + 1
return latest.BlockNumber + 1
}

func (th *TestHarness) assertDontHave(t *testing.T, start, end int) {
Expand Down
47 changes: 11 additions & 36 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type LogPoller interface {
RegisterFilter(filter Filter, qopts ...pg.QOpt) error
UnregisterFilter(name string, qopts ...pg.QOpt) error
HasFilter(name string) bool
LatestBlock(qopts ...pg.QOpt) (int64, error)
LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error)
GetBlocksRange(ctx context.Context, numbers []uint64, qopts ...pg.QOpt) ([]LogPollerBlock, error)

// General querying
Expand Down Expand Up @@ -676,9 +676,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
}

lp.lggr.Debugw("Backfill found logs", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks)
err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
return lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithQueryer(tx))
})
err = lp.orm.InsertLogs(convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), pg.WithParentCtx(ctx))
if err != nil {
lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to)
return err
Expand Down Expand Up @@ -747,21 +745,7 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
// the canonical set per read. Typically, if an application took action on a log
// it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads.
// Its also nicely analogous to reading from the chain itself.
err2 = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
// These deletes are bounded by reorg depth, so they are
// fast and should not slow down the log readers.
err3 := lp.orm.DeleteBlocksAfter(blockAfterLCA.Number, pg.WithQueryer(tx))
if err3 != nil {
lp.lggr.Warnw("Unable to clear reorged blocks, retrying", "err", err3)
return err3
}
err3 = lp.orm.DeleteLogsAfter(blockAfterLCA.Number, pg.WithQueryer(tx))
if err3 != nil {
lp.lggr.Warnw("Unable to clear reorged logs, retrying", "err", err3)
return err3
}
return nil
})
err2 = lp.orm.DeleteLogsAndBlocksAfter(blockAfterLCA.Number, pg.WithParentCtx(ctx))
if err2 != nil {
// If we error on db commit, we can't know if the tx went through or not.
// We return an error here which will cause us to restart polling from lastBlockSaved + 1
Expand Down Expand Up @@ -846,20 +830,11 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
return
}
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp.Unix())
err = lp.orm.Q().WithOpts(pg.WithParentCtx(ctx)).Transaction(func(tx pg.Queryer) error {
if err2 := lp.orm.InsertBlock(h, currentBlockNumber, currentBlock.Timestamp, latestFinalizedBlockNumber, pg.WithQueryer(tx)); err2 != nil {
return err2
}
if len(logs) == 0 {
return nil
}
return lp.orm.InsertLogs(convertLogs(logs,
[]LogPollerBlock{{BlockNumber: currentBlockNumber,
BlockTimestamp: currentBlock.Timestamp}},
lp.lggr,
lp.ec.ConfiguredChainID(),
), pg.WithQueryer(tx))
})
block := NewLogPollerBlock(h, currentBlockNumber, currentBlock.Timestamp, latestFinalizedBlockNumber)
err = lp.orm.InsertLogsWithBlock(
convertLogs(logs, []LogPollerBlock{block}, lp.lggr, lp.ec.ConfiguredChainID()),
block,
)
if err != nil {
lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber)
return
Expand Down Expand Up @@ -1018,13 +993,13 @@ func (lp *logPoller) IndexedLogsTopicRange(eventSig common.Hash, address common.

// LatestBlock returns the latest block the log poller is on. It tracks blocks to be able
// to detect reorgs.
func (lp *logPoller) LatestBlock(qopts ...pg.QOpt) (int64, error) {
func (lp *logPoller) LatestBlock(qopts ...pg.QOpt) (LogPollerBlock, error) {
b, err := lp.orm.SelectLatestBlock(qopts...)
if err != nil {
return 0, err
return LogPollerBlock{}, err
}

return b.BlockNumber, nil
return *b, nil
}

func (lp *logPoller) BlockByNumber(n int64, qopts ...pg.QOpt) (*LogPollerBlock, error) {
Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func TestLogPoller_Replay(t *testing.T) {
lp.PollAndSaveLogs(tctx, 4)
latest, err := lp.LatestBlock()
require.NoError(t, err)
require.Equal(t, int64(4), latest)
require.Equal(t, int64(4), latest.BlockNumber)

t.Run("abort before replayStart received", func(t *testing.T) {
// Replay() should abort immediately if caller's context is cancelled before request signal is read
Expand Down
39 changes: 25 additions & 14 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ func Test_BackupLogPoller(t *testing.T) {
body.Transactions = types.Transactions{} // number of tx's must match # of logs for GetLogs() to succeed
rawdb.WriteBody(th.EthDB, h.Hash(), h.Number.Uint64(), body)

currentBlock := th.PollAndSaveLogs(ctx, 1)
assert.Equal(t, int64(35), currentBlock)
currentBlockNumber := th.PollAndSaveLogs(ctx, 1)
assert.Equal(t, int64(35), currentBlockNumber)

// simulate logs becoming available
rawdb.WriteReceipts(th.EthDB, h.Hash(), h.Number.Uint64(), receipts)
Expand Down Expand Up @@ -342,12 +342,12 @@ func Test_BackupLogPoller(t *testing.T) {
markBlockAsFinalized(t, th, 34)

// Run ordinary poller + backup poller at least once
currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
th.LogPoller.PollAndSaveLogs(ctx, currentBlock+1)
currentBlock, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
th.LogPoller.PollAndSaveLogs(ctx, currentBlock.BlockNumber+1)
th.LogPoller.BackupPollAndSaveLogs(ctx, 100)
currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))

require.Equal(t, int64(37), currentBlock+1)
require.Equal(t, int64(37), currentBlock.BlockNumber+1)

// logs still shouldn't show up, because we don't want to backfill the last finalized log
// to help with reorg detection
Expand All @@ -359,11 +359,11 @@ func Test_BackupLogPoller(t *testing.T) {
markBlockAsFinalized(t, th, 35)

// Run ordinary poller + backup poller at least once more
th.LogPoller.PollAndSaveLogs(ctx, currentBlock+1)
th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber+1)
th.LogPoller.BackupPollAndSaveLogs(ctx, 100)
currentBlock, _ = th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))

require.Equal(t, int64(38), currentBlock+1)
require.Equal(t, int64(38), currentBlock.BlockNumber+1)

// all 3 logs in block 34 should show up now, thanks to backup logger
logs, err = th.LogPoller.Logs(30, 37, EmitterABI.Events["Log1"].ID, th.EmitterAddress1,
Expand Down Expand Up @@ -471,6 +471,13 @@ func TestLogPoller_BackupPollAndSaveLogsWithDeepBlockDelay(t *testing.T) {
// 1 -> 2 -> ...
th.PollAndSaveLogs(ctx, 1)

// Check that latest block has the same properties as the head
latestBlock, err := th.LogPoller.LatestBlock()
require.NoError(t, err)
assert.Equal(t, latestBlock.BlockNumber, header.Number.Int64())
assert.Equal(t, latestBlock.FinalizedBlockNumber, header.Number.Int64())
assert.Equal(t, latestBlock.BlockHash, header.Hash())

// Register filter
err = th.LogPoller.RegisterFilter(logpoller.Filter{
Name: "Test Emitter",
Expand Down Expand Up @@ -619,7 +626,7 @@ func TestLogPoller_BlockTimestamps(t *testing.T) {
require.Len(t, gethLogs, 2)

lb, _ := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
th.PollAndSaveLogs(context.Background(), lb+1)
th.PollAndSaveLogs(context.Background(), lb.BlockNumber+1)
lg1, err := th.LogPoller.Logs(0, 20, EmitterABI.Events["Log1"].ID, th.EmitterAddress1,
pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
Expand Down Expand Up @@ -667,9 +674,9 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
for i := 0; i < finalityDepth; i++ { // Have enough blocks that we could reorg the full finalityDepth-1.
ec.Commit()
}
currentBlock := int64(1)
lp.PollAndSaveLogs(testutils.Context(t), currentBlock)
currentBlock, err = lp.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
currentBlockNumber := int64(1)
lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber)
currentBlock, err := lp.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
matchesGeth := func() bool {
// Check every block is identical
Expand Down Expand Up @@ -719,7 +726,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
require.NoError(t, err1)
t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash())
}
lp.PollAndSaveLogs(testutils.Context(t), currentBlock)
lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber)
currentBlock, err = lp.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
}
Expand Down Expand Up @@ -1245,7 +1252,7 @@ func TestGetReplayFromBlock(t *testing.T) {
require.NoError(t, err)
latest, err := th.LogPoller.LatestBlock(pg.WithParentCtx(testutils.Context(t)))
require.NoError(t, err)
assert.Equal(t, latest, fromBlock)
assert.Equal(t, latest.BlockNumber, fromBlock)

// Should take min(latest, requested) in this case requested.
requested = int64(7)
Expand Down Expand Up @@ -1551,6 +1558,10 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
th := SetupTH(t, tt.useFinalityTag, tt.finalityDepth, 3, 2, 1000)
// Should return error before the first poll and save
_, err := th.LogPoller.LatestBlock()
require.Error(t, err)

// Mark first block as finalized
h := th.Client.Blockchain().CurrentHeader()
th.Client.Blockchain().SetFinalized(h)
Expand All @@ -1562,7 +1573,7 @@ func Test_PollAndSavePersistsFinalityInBlocks(t *testing.T) {

th.PollAndSaveLogs(ctx, 1)

latestBlock, err := th.ORM.SelectLatestBlock()
latestBlock, err := th.LogPoller.LatestBlock()
require.NoError(t, err)
require.Equal(t, int64(numberOfBlocks), latestBlock.BlockNumber)
require.Equal(t, tt.expectedFinalizedBlock, latestBlock.FinalizedBlockNumber)
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions core/chains/evm/logpoller/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,12 @@ func (l *Log) ToGethLog() types.Log {
Index: uint(l.LogIndex),
}
}

func NewLogPollerBlock(blockHash common.Hash, blockNumber int64, timestamp time.Time, finalizedBlockNumber int64) LogPollerBlock {
return LogPollerBlock{
BlockHash: blockHash,
BlockNumber: blockNumber,
BlockTimestamp: timestamp,
FinalizedBlockNumber: finalizedBlockNumber,
}
}
22 changes: 6 additions & 16 deletions core/chains/evm/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,15 @@ func NewObservedORM(chainID *big.Int, db *sqlx.DB, lggr logger.Logger, cfg pg.QC
}
}

func (o *ObservedORM) Q() pg.Q {
return o.ORM.Q()
}

func (o *ObservedORM) InsertLogs(logs []Log, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertLogs", func() error {
return o.ORM.InsertLogs(logs, qopts...)
})
}

func (o *ObservedORM) InsertBlock(hash common.Hash, blockNumber int64, blockTimestamp time.Time, lastFinalizedBlock int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertBlock", func() error {
return o.ORM.InsertBlock(hash, blockNumber, blockTimestamp, lastFinalizedBlock, qopts...)
func (o *ObservedORM) InsertLogsWithBlock(logs []Log, block LogPollerBlock, qopts ...pg.QOpt) error {
return withObservedExec(o, "InsertLogsWithBlock", func() error {
return o.ORM.InsertLogsWithBlock(logs, block, qopts...)
})
}

Expand All @@ -102,21 +98,15 @@ func (o *ObservedORM) DeleteFilter(name string, qopts ...pg.QOpt) error {
})
}

func (o *ObservedORM) DeleteBlocksAfter(start int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteBlocksAfter", func() error {
return o.ORM.DeleteBlocksAfter(start, qopts...)
})
}

func (o *ObservedORM) DeleteBlocksBefore(end int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteBlocksBefore", func() error {
return o.ORM.DeleteBlocksBefore(end, qopts...)
})
}

func (o *ObservedORM) DeleteLogsAfter(start int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteLogsAfter", func() error {
return o.ORM.DeleteLogsAfter(start, qopts...)
func (o *ObservedORM) DeleteLogsAndBlocksAfter(start int64, qopts ...pg.QOpt) error {
return withObservedExec(o, "DeleteLogsAndBlocksAfter", func() error {
return o.ORM.DeleteLogsAndBlocksAfter(start, qopts...)
})
}

Expand Down
2 changes: 1 addition & 1 deletion core/chains/evm/logpoller/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestMultipleMetricsArePublished(t *testing.T) {
_, _ = orm.SelectLatestLogEventSigsAddrsWithConfs(0, []common.Address{{}}, []common.Hash{{}}, 1, pg.WithParentCtx(ctx))
_, _ = orm.SelectIndexedLogsCreatedAfter(common.Address{}, common.Hash{}, 1, []common.Hash{}, time.Now(), 0, pg.WithParentCtx(ctx))
_ = orm.InsertLogs([]Log{}, pg.WithParentCtx(ctx))
_ = orm.InsertBlock(common.Hash{}, 1, time.Now(), 0, pg.WithParentCtx(ctx))
_ = orm.InsertLogsWithBlock([]Log{}, NewLogPollerBlock(common.Hash{}, 1, time.Now(), 0), pg.WithParentCtx(ctx))

require.Equal(t, 13, testutil.CollectAndCount(orm.queryDuration))
require.Equal(t, 10, testutil.CollectAndCount(orm.datasetSize))
Expand Down
Loading
Loading