Skip to content

Commit

Permalink
Refactor ExecPagedQuery into method of generic type RangedQuery[T]
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Oct 8, 2024
1 parent 45d98f3 commit fca52f9
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 47 deletions.
95 changes: 55 additions & 40 deletions core/chains/evm/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,20 +295,36 @@ func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig
return &l, nil
}

type RangeQueryer[T comparable] struct {
chainID *ubig.Big
ds sqlutil.DataSource
query func(ctx context.Context, r *RangeQueryer[T], lower, upper int64) (rowsAffected int64, err error)
acc []T
}

func NewRangeQueryer[T comparable](evmChainID *big.Int, ds sqlutil.DataSource, query func(ctx context.Context, r *RangeQueryer[T], lower, upper int64) (rowsAffected int64, err error)) *RangeQueryer[T] {
return &RangeQueryer[T]{
chainID: ubig.New(evmChainID),
ds: ds,
query: query,
}
}

// ExecPagedQuery runs a query accepting an upper limit block (end) in a fast paged way. limit is the maximum number
// of results to be returned, but it is also used to break the query up into smaller queries restricted to limit # of blocks.
// The first range of blocks will be from MIN(block_number) to MIN(block_number) + limit. The iterative process ends either once
// the limit on results is reached or block_number = end. The query will never be exeucted on blocks where
// block_number > end, and it will never be executed on block_number = B unless it has also been executed on all
// blocks with block_number < B
func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, query func(lower, upper int64) (int64, error)) (numResults int64, err error) {
// the limit on results is reached or block_number = end. The query will never be executed on blocks where block_number > end, and
// it will never be executed on block_number = B unless it has also been executed on all blocks with block_number < B
// r.AddResults(moreResults []T) should be called if this is a query returning results (ie, SELECT). These will be accumulated in
// r.acc and can be retrieved later with r.AllResults()
func (r *RangeQueryer[T]) ExecPagedQuery(ctx context.Context, limit, end int64) (rowsAffected int64, err error) {
if limit == 0 {
return query(0, end)
return r.query(ctx, r, 0, end)
}

var start int64
err = o.ds.GetContext(ctx, &start, `SELECT MIN(block_number) FROM evm.log_poller_blocks
WHERE evm_chain_id = $1`, ubig.New(o.chainID))
err = r.ds.GetContext(ctx, &start, `SELECT MIN(block_number) FROM evm.log_poller_blocks
WHERE evm_chain_id = $1`, r.chainID)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
Expand All @@ -318,36 +334,45 @@ func (o *DSORM) ExecPagedQuery(ctx context.Context, limit int64, end int64, quer

// Remove up to limit blocks at a time, until we've reached the limit or removed everything eligible for deletion
var upper int64
for lower := start; numResults < limit; lower = upper + 1 {
for lower := start; rowsAffected < limit; lower = upper + 1 {
upper = lower + limit - 1
if upper > end {
upper = end
}
rows, err2 := query(lower, upper)

rows, err2 := r.query(ctx, r, lower, upper)
if err2 != nil {
return numResults, err
return rowsAffected, err2
}

numResults += rows
rowsAffected += rows

if upper == end {
break
}
}
return numResults, nil
return rowsAffected, nil
}

func (r *RangeQueryer[T]) AddResults(moreResults []T) {
r.acc = append(r.acc, moreResults...)
}

func (r *RangeQueryer[T]) AllResults() []T {
return r.acc
}

// DeleteBlocksBefore delete blocks before and including end. When limit is set, it will delete at most limit blocks.
// Otherwise, it will delete all blocks at once.
func (o *DSORM) DeleteBlocksBefore(ctx context.Context, end int64, limit int64) (int64, error) {
return o.ExecPagedQuery(ctx, limit, end, func(lower, upper int64) (int64, error) {
result, err := o.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3`,
ubig.New(o.chainID), lower, upper)
q := NewRangeQueryer[uint64](o.chainID, o.ds, func(ctx context.Context, r *RangeQueryer[uint64], lower, upper int64) (int64, error) {
result, err := r.ds.ExecContext(ctx, `DELETE FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 AND block_number <= $3`,
r.chainID, lower, upper)
if err != nil {
return 0, err
}
return result.RowsAffected()
})
return q.ExecPagedQuery(ctx, limit, end)
}

func (o *DSORM) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error {
Expand Down Expand Up @@ -411,21 +436,19 @@ func (o *DSORM) SelectUnmatchedLogIDs(ctx context.Context, limit int64) (ids []u
return ids, err
}

_, err = o.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber, func(lower, upper int64) (numResults int64, err2 error) {
r := NewRangeQueryer[uint64](o.chainID, o.ds, func(ctx context.Context, r *RangeQueryer[uint64], lower, upper int64) (int64, error) {
var rowIDs []uint64
err2 = o.ds.SelectContext(ctx, &rowIDs, query, ubig.New(o.chainID), lower, upper)
// dedupe rowIDs before appending them to results
m := make(map[uint64]bool)
for _, id := range rowIDs {
if _, val := m[id]; !val {
m[id] = true
ids = append(ids, id)
}
err2 := r.ds.SelectContext(ctx, &rowIDs, query, r.chainID, lower, upper)
if err2 != nil {
return 0, err2
}
return int64(len(rowIDs)), err2
r.AddResults(rowIDs)
return int64(len(rowIDs)), nil
})

return ids, err
_, err = r.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber)

return r.AllResults(), err
}

// SelectExcessLogIDs finds any logs old enough that MaxLogsKept has been exceeded for every filter they match.
Expand Down Expand Up @@ -461,26 +484,18 @@ func (o *DSORM) SelectExcessLogIDs(ctx context.Context, limit int64) (results []
return results, err
}

_, err = o.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber, func(lower, upper int64) (int64, error) {
r := NewRangeQueryer[uint64](o.chainID, o.ds, func(ctx context.Context, r *RangeQueryer[uint64], lower, upper int64) (int64, error) {
var rowIDs []uint64
err = o.ds.SelectContext(ctx, &rowIDs, query, ubig.New(o.chainID), lower, upper)
err = r.ds.SelectContext(ctx, &rowIDs, query, r.chainID, lower, upper)
if err != nil {
return 0, err
}

// dedupe rowIDs before appending them to results
m := make(map[uint64]bool)
for _, id := range rowIDs {
if _, val := m[id]; !val {
m[id] = true
results = append(results, id)
}
}

r.AddResults(rowIDs)
return int64(len(rowIDs)), err
})
_, err = r.ExecPagedQuery(ctx, limit, latestBlock.FinalizedBlockNumber)

return results, err
return r.AllResults(), err
}

// DeleteExpiredLogs removes any logs which either:
Expand Down
16 changes: 9 additions & 7 deletions core/chains/evm/logpoller/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logpoller_test

import (
"bytes"
"context"
"database/sql"
"errors"
"fmt"
Expand Down Expand Up @@ -1315,12 +1316,12 @@ type mockQueryExecutor struct {
mock.Mock
}

func (m *mockQueryExecutor) Exec(lower, upper int64) (int64, error) {
func (m *mockQueryExecutor) Exec(ctx context.Context, r *logpoller.RangeQueryer[uint64], lower, upper int64) (int64, error) {
res := m.Called(lower, upper)
return int64(res.Int(0)), res.Error(1)
}

func TestORM_ExecPagedQuery(t *testing.T) {
func Test_ExecPagedQuery(t *testing.T) {
t.Parallel()
ctx := testutils.Context(t)
lggr := logger.Test(t)
Expand All @@ -1334,26 +1335,27 @@ func TestORM_ExecPagedQuery(t *testing.T) {
m.On("Exec", int64(0), int64(0)).Return(0, queryError).Once()

// Should handle errors gracefully
_, err := o.ExecPagedQuery(ctx, 0, 0, m.Exec)
r := logpoller.NewRangeQueryer(chainID, db, m.Exec)
_, err := r.ExecPagedQuery(ctx, 0, 0)
assert.ErrorIs(t, err, queryError)

m.On("Exec", int64(0), int64(60)).Return(4, nil).Once()

// Query should only get executed once with limitBlock=end if called with limit=0
numResults, err := o.ExecPagedQuery(ctx, 0, 60, m.Exec)
numResults, err := r.ExecPagedQuery(ctx, 0, 60)
require.NoError(t, err)
assert.Equal(t, int64(4), numResults)

// Should report actual db errors
_, err = o.ExecPagedQuery(ctx, 300, 1000, m.Exec)
_, err = r.ExecPagedQuery(ctx, 300, 1000)
assert.Error(t, err)

require.NoError(t, o.InsertBlock(ctx, common.HexToHash("0x1234"), 42, time.Now(), 0))

m.On("Exec", mock.Anything, mock.Anything).Return(3, nil)

// Should get called with limitBlock = 342, 642, 942, 1000
numResults, err = o.ExecPagedQuery(ctx, 300, 1000, m.Exec)
numResults, err = r.ExecPagedQuery(ctx, 300, 1000)
require.NoError(t, err)
assert.Equal(t, int64(12), numResults) // 3 results in each of 4 calls
m.AssertNumberOfCalls(t, "Exec", 6) // 4 new calls, plus the prior 2
Expand All @@ -1364,7 +1366,7 @@ func TestORM_ExecPagedQuery(t *testing.T) {

// Should not go all the way to 1000, but stop after ~ 13 results have
// been returned
numResults, err = o.ExecPagedQuery(ctx, 15, 1000, m.Exec)
numResults, err = r.ExecPagedQuery(ctx, 15, 1000)
require.NoError(t, err)
assert.Equal(t, int64(15), numResults)
m.AssertNumberOfCalls(t, "Exec", 11)
Expand Down

0 comments on commit fca52f9

Please sign in to comment.