Skip to content

Commit

Permalink
Iterate over upkeeps based on iterations derived from min log and max results
Browse files Browse the repository at this point in the history

Add logging
Add tests
Clean up linting
  • Loading branch information
ferglor committed Jun 4, 2024
1 parent d2cd916 commit 58c374e
Show file tree
Hide file tree
Showing 6 changed files with 662 additions and 17 deletions.
4 changes: 4 additions & 0 deletions .changeset/modern-ghosts-hang.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
"chainlink": patch
---
Iterate over upkeeps using an upkeep selector #changed
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type LogBuffer interface {
// It also accepts a function to select upkeeps.
// Returns logs (associated to upkeeps) and the number of remaining
// logs in that window for the involved upkeeps.
Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int)
Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int)
// SetConfig sets the buffer size and the maximum number of logs to keep for each upkeep.
SetConfig(lookback, blockRate, logLimit uint32)
// NumOfUpkeeps returns the number of upkeeps that are being tracked by the buffer.
Expand Down Expand Up @@ -81,6 +81,7 @@ type logBuffer struct {
// map for the number of times we have enqueued logs for a block number
enqueuedBlocks map[int64]map[string]int
enqueuedBlockLock sync.RWMutex
queueIDs []string
}

func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
Expand All @@ -89,6 +90,7 @@ func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogB
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queueIDs: []string{},
queues: make(map[string]*upkeepLogQueue),
}
}
Expand Down Expand Up @@ -167,11 +169,10 @@ func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[in

// Dequeue greedily pulls logs from the buffers.
// Returns logs and the number of remaining logs in the buffer.
func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
func (b *logBuffer) Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
b.lock.RLock()
defer b.lock.RUnlock()

start, end := getBlockWindow(block, blockRate)
return b.dequeue(start, end, upkeepLimit, maxResults, upkeepSelector)
}

Expand All @@ -183,11 +184,14 @@ func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int,
func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
var result []BufferedLog
var remainingLogs int
for _, q := range b.queues {
var selectedUpkeeps int
numLogs := 0
for _, qid := range b.queueIDs {
q := b.queues[qid]
if !upkeepSelector(q.id) {
// if the upkeep is not selected, skip it
continue
}
selectedUpkeeps++
logsInRange := q.sizeOfRange(start, end)
if logsInRange == 0 {
// if there are no logs in the range, skip the upkeep
Expand All @@ -207,8 +211,10 @@ func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepS
result = append(result, BufferedLog{ID: q.id, Log: l})
capacity--
}
numLogs += len(logs)
remainingLogs += remaining
}
b.lggr.Debugw("dequeued logs for upkeeps", "selectedUpkeeps", selectedUpkeeps, "numLogs", numLogs)
return result, remainingLogs
}

Expand All @@ -230,12 +236,18 @@ func (b *logBuffer) SyncFilters(filterStore UpkeepFilterStore) error {
b.lock.Lock()
defer b.lock.Unlock()

for upkeepID := range b.queues {
for _, upkeepID := range b.queueIDs {
uid := new(big.Int)
_, ok := uid.SetString(upkeepID, 10)
if ok && !filterStore.Has(uid) {
// remove upkeep that is not in the filter store
delete(b.queues, upkeepID)
for i, v := range b.queueIDs {
if v == upkeepID {
b.queueIDs = append(b.queueIDs[:i], b.queueIDs[i+1:]...)
break
}
}
}
}

Expand All @@ -254,6 +266,16 @@ func (b *logBuffer) setUpkeepQueue(uid *big.Int, buf *upkeepLogQueue) {
b.lock.Lock()
defer b.lock.Unlock()

found := false
for _, id := range b.queueIDs {
if id == uid.String() {
found = true
break
}
}
if !found {
b.queueIDs = append(b.queueIDs, uid.String())
}
b.queues[uid.String()] = buf
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package logprovider

import (
"fmt"
"math"
"math/big"
"testing"

Expand Down Expand Up @@ -232,13 +234,146 @@ func TestLogEventBufferV1_Dequeue(t *testing.T) {
added, dropped := buf.Enqueue(id, logs...)
require.Equal(t, len(logs), added+dropped)
}
results, remaining := buf.Dequeue(tc.args.block, tc.args.blockRate, tc.args.upkeepLimit, tc.args.maxResults, tc.args.upkeepSelector)
start, end := getBlockWindow(tc.args.block, tc.args.blockRate)
results, remaining := buf.Dequeue(start, end, tc.args.upkeepLimit, tc.args.maxResults, tc.args.upkeepSelector)
require.Equal(t, len(tc.results), len(results))
require.Equal(t, tc.remaining, remaining)
})
}
}

// TestLogEventBufferV1_Dequeue_highLoad verifies that when many upkeeps hold
// logs for the same block window, a per-iteration upkeep selector dequeues a
// deterministic, round-robin slice of upkeeps: each Dequeue call drains
// maxResults logs from exactly one selected upkeep while leaving the other
// queues untouched.
func TestLogEventBufferV1_Dequeue_highLoad(t *testing.T) {
	t.Run("Dequeue from a deterministic order of upkeeps", func(t *testing.T) {
		lookback := uint32(20)
		blockRate := uint32(1)
		logLimit := uint32(1)
		buf := NewLogBuffer(logger.TestLogger(t), lookback, blockRate, logLimit)

		upkeepIDs := []*big.Int{
			big.NewInt(1),
			big.NewInt(2),
			big.NewInt(3),
			big.NewInt(4),
			big.NewInt(5),
		}

		numUpkeeps := len(upkeepIDs)

		blockNumbers := []int64{
			100, 101, 102, 103, 104, 105, 106, 107, 108, 109,
		}

		// for each upkeep, enqueue 10 logs per block, for 10 blocks
		for _, upkeepID := range upkeepIDs {
			for _, blockNumber := range blockNumbers {
				for i := 0; i < 10; i++ {
					log := logpoller.Log{
						BlockNumber: blockNumber,
						TxHash:      common.HexToHash(fmt.Sprintf("0x%dff%dff%d", blockNumber, upkeepID.Int64(), i)),
					}
					buf.Enqueue(upkeepID, log)
				}
			}
		}

		bufV1 := buf.(*logBuffer)

		assert.Equal(t, 5, len(bufV1.queues))

		// each queue starts with 100 logs (10 blocks * 10 logs per block);
		// expectedLogs tracks the anticipated size of every queue as the
		// iterations below drain them one upkeep at a time
		expectedLogs := map[string]int{
			"1": 100, "2": 100, "3": 100, "4": 100, "5": 100,
		}
		for qid, want := range expectedLogs {
			assert.Equal(t, want, len(bufV1.queues[qid].logs))
		}

		maxResults := 5
		iterations := int(math.Ceil(float64(numUpkeeps*5) / float64(maxResults)))

		assert.Equal(t, 5, iterations)

		// On iteration k the selector admits exactly one upkeep: the one whose
		// ID satisfies id % iterations == k. With IDs 1..5 and 5 iterations,
		// iteration 0 selects upkeep 5, and iteration k (k >= 1) selects
		// upkeep k.
		for iteration := 0; iteration < iterations; iteration++ {
			currentIteration := int64(iteration) // capture for the closure
			upkeepSelectorFn := func(id *big.Int) bool {
				return id.Int64()%int64(iterations) == currentIteration
			}

			logs, remaining := buf.Dequeue(100, 101, 5, maxResults, upkeepSelectorFn)

			// the selected upkeep has 20 logs in blocks 100-101; we dequeue 5,
			// so the window has 15 logs remaining for this selector
			assert.Equal(t, 5, len(logs))
			assert.Equal(t, 15, remaining)

			// only the selected upkeep's queue shrinks, by the 5 dequeued logs
			selectedID := iteration
			if selectedID == 0 {
				selectedID = 5
			}
			expectedLogs[fmt.Sprintf("%d", selectedID)] -= 5
			for qid, want := range expectedLogs {
				assert.Equal(t, want, len(bufV1.queues[qid].logs), "queue %s after iteration %d", qid, iteration)
			}
		}
	})
}

func TestLogEventBufferV1_Enqueue(t *testing.T) {
tests := []struct {
name string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"hash"
"io"
"math"
"math/big"
"runtime"
"sync"
Expand Down Expand Up @@ -114,19 +115,24 @@ type logEventProvider struct {
currentPartitionIdx uint64

chainID *big.Int

currentIteration int
calculateIterations bool
iterations int
}

func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, chainID *big.Int, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider {
return &logEventProvider{
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh),
bufferV1: NewLogBuffer(lggr, uint32(opts.LookbackBlocks), opts.BlockRate, opts.LogLimit),
poller: poller,
opts: opts,
filterStore: filterStore,
chainID: chainID,
threadCtrl: utils.NewThreadControl(),
lggr: lggr.Named("KeepersRegistry.LogEventProvider"),
packer: packer,
buffer: newLogEventBuffer(lggr, int(opts.LookbackBlocks), defaultNumOfLogUpkeeps, defaultFastExecLogsHigh),
bufferV1: NewLogBuffer(lggr, uint32(opts.LookbackBlocks), opts.BlockRate, opts.LogLimit),
poller: poller,
opts: opts,
filterStore: filterStore,
chainID: chainID,
calculateIterations: true,
}
}

Expand All @@ -151,6 +157,7 @@ func (p *logEventProvider) SetConfig(cfg ocr2keepers.LogEventProviderConfig) {

switch p.opts.BufferVersion {
case BufferVersionV1:
p.lggr.With("where", "setConfig").Infow("setting buffer v1 config")
p.bufferV1.SetConfig(uint32(p.opts.LookbackBlocks), blockRate, logLimit)
default:
}
Expand Down Expand Up @@ -288,8 +295,28 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up
case BufferVersionV1:
// in v1, we use a greedy approach - we keep dequeuing logs until we reach the max results or cover the entire range.
blockRate, logLimitLow, maxResults, _ := p.getBufferDequeueArgs()

if p.iterations == p.currentIteration {
p.calculateIterations = true
}

if p.calculateIterations {
p.calculateIterations = false
p.currentIteration = 0
p.iterations = int(math.Ceil(float64(p.bufferV1.NumOfUpkeeps()*logLimitLow) / float64(maxResults)))
if p.iterations == 0 {
p.iterations = 1
}
}

upkeepSelectorFn := func(id *big.Int) bool {
return id.Int64()%int64(p.iterations) == int64(p.currentIteration)
}

for len(payloads) < maxResults && start <= latestBlock {
logs, remaining := p.bufferV1.Dequeue(start, blockRate, logLimitLow, maxResults-len(payloads), DefaultUpkeepSelector)
startWindow, end := getBlockWindow(start, blockRate)

logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), upkeepSelectorFn)
if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs))
}
Expand All @@ -299,13 +326,16 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up
payloads = append(payloads, payload)
}
}

if remaining > 0 {
p.lggr.Debugw("Remaining logs", "start", start, "latestBlock", latestBlock, "remaining", remaining)
// TODO: handle remaining logs in a better way than consuming the entire window, e.g. do not repeat more than x times
continue
}

start += int64(blockRate)
}
p.currentIteration++
default:
logs := p.buffer.dequeueRange(start, latestBlock, AllowedLogsPerUpkeep, MaxPayloads)
for _, l := range logs {
Expand Down
Loading

0 comments on commit 58c374e

Please sign in to comment.