From fca52f9d67c3c9944750ab0ecec69b02a8a51e47 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 2 Oct 2024 20:05:04 -0700 Subject: [PATCH] Refactor ExecPagedQuery into method of generic type RangedQuery[T] --- core/chains/evm/logpoller/orm.go | 95 ++++++++++++++++----------- core/chains/evm/logpoller/orm_test.go | 16 +++-- 2 files changed, 64 insertions(+), 47 deletions(-) diff --git a/core/chains/evm/logpoller/orm.go b/core/chains/evm/logpoller/orm.go index d851670cdd..d6c4ebe327 100644 --- a/core/chains/evm/logpoller/orm.go +++ b/core/chains/evm/logpoller/orm.go @@ -295,20 +295,36 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig return &l, nil } +type RangeQueryer[T comparable] struct { + chainID *ubig.Big + ds sqlutil.DataSource + query func(ctx context.Context, r *RangeQueryer[T], lower, upper int64) (rowsAffected int64, err error) + acc []T +} + +func NewRangeQueryer[T comparable](evmChainID *big.Int, ds sqlutil.DataSource, query func(ctx context.Context, r *RangeQueryer[T], lower, upper int64) (rowsAffected int64, err error)) *RangeQueryer[T] { + return &RangeQueryer[T]{ + chainID: ubig.New(evmChainID), + ds: ds, + query: query, + } +} + // ExecPagedQuery runs a query accepting an upper limit block (end) in a fast paged way. limit is the maximum number // of results to be returned, but it is also used to break the query up into smaller queries restricted to limit # of blocks. // The first range of blocks will be from MIN(block_number) to MIN(block_number) + limit. The iterative process ends either once -// the limit on results is reached or block_number = end. The query will never be exeucted on blocks where -// block_number > end, and it will never be executed on block_number = B unless it has also been executed on all -// blocks with block_number < B -func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, query func(lower, upper int64) (int64, error)) (numResults int64, err error) { +// the limit on results is reached or block_number = end. The query will never be executed on blocks where block_number > end, and +// it will never be executed on block_number = B unless it has also been executed on all blocks with block_number < B +// r.AddResults(moreResults []T) should be called if this is a query returning results (ie, SELECT). These will be accumulated in +// r.acc and can be retrieved later with r.AllResults() +func (r *RangeQueryer[T]) ExecPagedQuery(ctx context.Context, limit, end int64) (rowsAffected int64, err error) { if limit == 0 { - return query(0, end) + return r.query(ctx, r, 0, end) } var start int64 - err = o.ds.GetContext(ctx, &start, `SELECT MIN(block_number) FROM evm.log_poller_blocks - WHERE evm_chain_id = $1`, ubig.New(o.chainID)) + err = r.ds.GetContext(ctx, &start, `SELECT MIN(block_number) FROM evm.log_poller_blocks + WHERE evm_chain_id = $1`, r.chainID) if err != nil { if errors.Is(err, sql.ErrNoRows) { return 0, nil @@ -318,36 +334,45 @@ func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, quer // Remove up to limit blocks at a time, until we've reached the limit or removed everything eligible for deletion var upper int64 - for lower := start; numResults < limit; lower = upper + 1 { + for lower := start; rowsAffected < limit; lower = upper + 1 { upper = lower + limit - 1 if upper > end { upper = end } - rows, err2 := query(lower, upper) + + rows, err2 := r.query(ctx, r, lower, upper) if err2 != nil { - return numResults, err + return rowsAffected, err2 } - - numResults += rows + rowsAffected += rows if upper == end { break } } - return numResults, nil + return rowsAffected, nil +} + +func (r *RangeQueryer[T]) AddResults(moreResults []T) { + r.acc = append(r.acc, moreResults...) +} + +func (r *RangeQueryer[T]) AllResults() []T { + return r.acc } // DeleteBlocksBefore delete blocks before and including end. When limit is set, it will delete at most limit blocks. // Otherwise, it will delete all blocks at once. func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) { - return o.ExecPagedQuery(ctx, limit, end, func(lower, upper int64) (int64, error) { - result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3`, - ubig.New(o.chainID), lower, upper) + q := NewRangeQueryer[uint64](o.chainID, o.ds, func(ctx context.Context, r *RangeQueryer[uint64], lower, upper int64) (int64, error) { + result, err := r.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3`, + r.chainID, lower, upper) if err != nil { return 0, err } return result.RowsAffected() }) + return q.ExecPagedQuery(ctx, limit, end) } func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error { @@ -411,21 +436,19 @@ func (o *DSORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []u return ids, err } - _, err = o.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber, func(lower, upper int64) (numResults int64, err2 error) { + r := NewRangeQueryer[uint64](o.chainID, o.ds, func(ctx context.Context, r *RangeQueryer[uint64], lower, upper int64) (int64, error) { var rowIDs []uint64 - err2 = o.ds.SelectContext(ctx, &rowIDs, query, ubig.New(o.chainID), lower, upper) - // dedupe rowIDs before appending them to results - m := make(map[uint64]bool) - for _, id := range rowIDs { - if _, val := m[id]; !val { - m[id] = true - ids = append(ids, id) - } + err2 := r.ds.SelectContext(ctx, &rowIDs, query, r.chainID, lower, upper) + if err2 != nil { + return 0, err2 } - return int64(len(rowIDs)), err2 + r.AddResults(rowIDs) + return int64(len(rowIDs)), nil }) - return ids, err + _, err = r.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber) + + return r.AllResults(), err } // SelectExcessLogIDs finds any logs old enough that MaxLogsKept has been exceeded for every filter they match. @@ -461,26 +484,18 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results [] return results, err } - _, err = o.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber, func(lower, upper int64) (int64, error) { + r := NewRangeQueryer[uint64](o.chainID, o.ds, func(ctx context.Context, r *RangeQueryer[uint64], lower, upper int64) (int64, error) { var rowIDs []uint64 - err = o.ds.SelectContext(ctx, &rowIDs, query, ubig.New(o.chainID), lower, upper) + err = r.ds.SelectContext(ctx, &rowIDs, query, r.chainID, lower, upper) if err != nil { return 0, err } - - // dedupe rowIDs before appending them to results - m := make(map[uint64]bool) - for _, id := range rowIDs { - if _, val := m[id]; !val { - m[id] = true - results = append(results, id) - } - } - + r.AddResults(rowIDs) return int64(len(rowIDs)), err }) + _, err = r.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber) - return results, err + return r.AllResults(), err } // DeleteExpiredLogs removes any logs which either: diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index ba5b16d697..6e618ba9ce 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -2,6 +2,7 @@ package logpoller_test import ( "bytes" + "context" "database/sql" "errors" "fmt" @@ -1315,12 +1316,12 @@ type mockQueryExecutor struct { mock.Mock } -func (m *mockQueryExecutor) Exec(lower, upper int64) (int64, error) { +func (m *mockQueryExecutor) Exec(ctx context.Context, r *logpoller.RangeQueryer[uint64], lower, upper int64) (int64, error) { res := m.Called(lower, upper) return int64(res.Int(0)), res.Error(1) } -func TestORM_ExecPagedQuery(t *testing.T) { +func Test_ExecPagedQuery(t *testing.T) { t.Parallel() ctx := testutils.Context(t) lggr := logger.Test(t) @@ -1334,18 +1335,19 @@ func TestORM_ExecPagedQuery(t *testing.T) { m.On("Exec", int64(0), int64(0)).Return(0, queryError).Once() // Should handle errors gracefully - _, err := o.ExecPagedQuery(ctx, 0, 0, m.Exec) + r := logpoller.NewRangeQueryer(chainID, db, m.Exec) + _, err := r.ExecPagedQuery(ctx, 0, 0) assert.ErrorIs(t, err, queryError) m.On("Exec", int64(0), int64(60)).Return(4, nil).Once() // Query should only get executed once with limitBlock=end if called with limit=0 - numResults, err := o.ExecPagedQuery(ctx, 0, 60, m.Exec) + numResults, err := r.ExecPagedQuery(ctx, 0, 60) require.NoError(t, err) assert.Equal(t, int64(4), numResults) // Should report actual db errors - _, err = o.ExecPagedQuery(ctx, 300, 1000, m.Exec) + _, err = r.ExecPagedQuery(ctx, 300, 1000) assert.Error(t, err) require.NoError(t, o.InsertBlock(ctx, common.HexToHash("0x1234"), 42, time.Now(), 0)) @@ -1353,7 +1355,7 @@ func TestORM_ExecPagedQuery(t *testing.T) { m.On("Exec", mock.Anything, mock.Anything).Return(3, nil) // Should get called with limitBlock = 342, 642, 942, 1000 - numResults, err = o.ExecPagedQuery(ctx, 300, 1000, m.Exec) + numResults, err = r.ExecPagedQuery(ctx, 300, 1000) require.NoError(t, err) assert.Equal(t, int64(12), numResults) // 3 results in each of 4 calls m.AssertNumberOfCalls(t, "Exec", 6) // 4 new calls, plus the prior 2 @@ -1364,7 +1366,7 @@ func TestORM_ExecPagedQuery(t *testing.T) { // Should not go all the way to 1000, but stop after ~ 13 results have // been returned - numResults, err = o.ExecPagedQuery(ctx, 15, 1000, m.Exec) + numResults, err = r.ExecPagedQuery(ctx, 15, 1000) require.NoError(t, err) assert.Equal(t, int64(15), numResults) m.AssertNumberOfCalls(t, "Exec", 11)