Dequeue upkeeps using the current dequeue iteration #12852

Closed · wants to merge 6 commits
4 changes: 4 additions & 0 deletions .changeset/modern-ghosts-hang.md
@@ -0,0 +1,4 @@
---
"chainlink": patch
---
Iterate over upkeeps using an upkeep selector #changed
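For context, a minimal standalone sketch (not taken from this PR; all values are illustrative) of how a modulus-based upkeep selector partitions upkeeps across dequeue iterations:

package main

import (
	"fmt"
	"math/big"
)

func main() {
	// assumed values: in the provider these come from ceil(numOfUpkeeps*logLimitLow/maxResults)
	// and a counter that advances after every dequeue round.
	iterations := 2
	currentIteration := 0

	// an upkeep is selected when its ID falls into the bucket for the current iteration
	selector := func(id *big.Int) bool {
		return id.Int64()%int64(iterations) == int64(currentIteration)
	}

	for _, id := range []*big.Int{big.NewInt(1), big.NewInt(2), big.NewInt(3), big.NewInt(4)} {
		fmt.Printf("iteration %d selects upkeep %s: %v\n", currentIteration, id, selector(id))
	}
}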
@@ -26,7 +26,7 @@ type LogBuffer interface {
// It also accepts a function to select upkeeps.
// Returns logs (associated with upkeeps) and the number of remaining
// logs in that window for the involved upkeeps.
Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int)
Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int)
// SetConfig sets the buffer size and the maximum number of logs to keep for each upkeep.
SetConfig(lookback, blockRate, logLimit uint32)
// NumOfUpkeeps returns the number of upkeeps that are being tracked by the buffer.
@@ -81,6 +81,7 @@ type logBuffer struct {
// map for the number of times we have enqueued logs for a block number
enqueuedBlocks map[int64]map[string]int
enqueuedBlockLock sync.RWMutex
queueIDs []string
}

func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogBuffer {
@@ -89,6 +90,7 @@ func NewLogBuffer(lggr logger.Logger, lookback, blockRate, logLimit uint32) LogB
opts: newLogBufferOptions(lookback, blockRate, logLimit),
lastBlockSeen: new(atomic.Int64),
enqueuedBlocks: map[int64]map[string]int{},
queueIDs: []string{},
queues: make(map[string]*upkeepLogQueue),
}
}
@@ -167,11 +169,10 @@ func (b *logBuffer) trackBlockNumbersForUpkeep(uid *big.Int, uniqueBlocks map[in

// Dequeue greedily pulls logs from the buffers.
// Returns logs and the number of remaining logs in the buffer.
func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
func (b *logBuffer) Dequeue(start, end int64, upkeepLimit, maxResults int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
b.lock.RLock()
defer b.lock.RUnlock()

start, end := getBlockWindow(block, blockRate)
return b.dequeue(start, end, upkeepLimit, maxResults, upkeepSelector)
}
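For illustration, a sketch (not part of this change) of how a caller can keep the previous single-block behaviour by deriving the window itself; it assumes the package's getBlockWindow helper and the LogBuffer interface shown above:

// illustrative helper only: compute the block window first, then delegate to the new Dequeue signature.
func dequeueWindow(b LogBuffer, block int64, blockRate, upkeepLimit, maxResults int) ([]BufferedLog, int) {
	start, end := getBlockWindow(block, blockRate)
	// select every upkeep; a real caller can pass an iteration-based selector instead
	return b.Dequeue(start, end, upkeepLimit, maxResults, func(id *big.Int) bool { return true })
}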

@@ -183,17 +184,24 @@ func (b *logBuffer) Dequeue(block int64, blockRate, upkeepLimit, maxResults int,
func (b *logBuffer) dequeue(start, end int64, upkeepLimit, capacity int, upkeepSelector func(id *big.Int) bool) ([]BufferedLog, int) {
var result []BufferedLog
var remainingLogs int
for _, q := range b.queues {
var selectedUpkeeps int
upkeepNotSelected := 0
for _, qid := range b.queueIDs {
q := b.queues[qid]
if !upkeepSelector(q.id) {
// if the upkeep is not selected, skip it
b.lggr.Debugw("skipping dequeue", "upkeepID", q.id.String())
upkeepNotSelected++
continue
}
selectedUpkeeps++
logsInRange := q.sizeOfRange(start, end)
if logsInRange == 0 {
b.lggr.Debugw("skipping dequeue, no logs in range", "upkeepID", q.id.String())
// if there are no logs in the range, skip the upkeep
continue
}
if capacity == 0 {
b.lggr.Debugw("skipping dequeue, no capacity", "upkeepID", q.id.String())
// if there is no more capacity for results, just count the remaining logs
remainingLogs += logsInRange
continue
@@ -209,6 +217,7 @@ }
}
remainingLogs += remaining
}
b.lggr.Debugw("dequeued logs for upkeeps", "selectedUpkeeps", selectedUpkeeps, "upkeepNotSelected", upkeepNotSelected, "totalUpkeeps", len(b.queueIDs), "numLogs", len(result))
return result, remainingLogs
}
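Ranging over the new queueIDs slice rather than the queues map matters because Go randomizes map iteration order, so the set of upkeeps drained before maxResults is exhausted could change from call to call; a small standalone sketch (illustrative data) of the difference:

package main

import "fmt"

func main() {
	queues := map[string]int{"1": 100, "2": 100, "3": 100}
	queueIDs := []string{"1", "2", "3"} // maintained alongside the map, in insertion order

	// map iteration: the runtime deliberately randomizes the order, so which queues are
	// visited first (and therefore drained before capacity runs out) can differ per call
	for id := range queues {
		fmt.Println("map order:", id)
	}

	// slice iteration: always the same insertion order, giving a deterministic dequeue sequence
	for _, id := range queueIDs {
		fmt.Println("slice order:", id, queues[id])
	}
}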

@@ -230,15 +239,20 @@ func (b *logBuffer) SyncFilters(filterStore UpkeepFilterStore) error {
b.lock.Lock()
defer b.lock.Unlock()

for upkeepID := range b.queues {
var newQueueIDs []string
for _, upkeepID := range b.queueIDs {
uid := new(big.Int)
_, ok := uid.SetString(upkeepID, 10)
if ok && !filterStore.Has(uid) {
// remove upkeep that is not in the filter store
delete(b.queues, upkeepID)
} else {
newQueueIDs = append(newQueueIDs, upkeepID)
}
}

b.queueIDs = newQueueIDs

return nil
}
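A standalone sketch (hypothetical names) of the prune pattern above: an entry removed from the map must also be dropped from the companion ID slice so the two structures stay consistent:

package main

import "fmt"

func main() {
	queues := map[string]int{"1": 1, "2": 2, "3": 3}
	queueIDs := []string{"1", "2", "3"}
	keep := func(id string) bool { return id != "2" } // stand-in for filterStore.Has

	var newQueueIDs []string
	for _, id := range queueIDs {
		if !keep(id) {
			delete(queues, id) // drop the queue itself
			continue
		}
		newQueueIDs = append(newQueueIDs, id) // retain the ID, preserving order
	}
	queueIDs = newQueueIDs

	fmt.Println(queueIDs, queues) // [1 3] map[1:1 3:3]
}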

@@ -254,6 +268,9 @@ func (b *logBuffer) setUpkeepQueue(uid *big.Int, buf *upkeepLogQueue) {
b.lock.Lock()
defer b.lock.Unlock()

if _, ok := b.queues[uid.String()]; !ok {
b.queueIDs = append(b.queueIDs, uid.String())
}
b.queues[uid.String()] = buf
}

@@ -1,6 +1,8 @@
package logprovider

import (
"fmt"
"math"
"math/big"
"testing"

@@ -232,13 +234,146 @@ func TestLogEventBufferV1_Dequeue(t *testing.T) {
added, dropped := buf.Enqueue(id, logs...)
require.Equal(t, len(logs), added+dropped)
}
results, remaining := buf.Dequeue(tc.args.block, tc.args.blockRate, tc.args.upkeepLimit, tc.args.maxResults, tc.args.upkeepSelector)
start, end := getBlockWindow(tc.args.block, tc.args.blockRate)
results, remaining := buf.Dequeue(start, end, tc.args.upkeepLimit, tc.args.maxResults, tc.args.upkeepSelector)
require.Equal(t, len(tc.results), len(results))
require.Equal(t, tc.remaining, remaining)
})
}
}

func TestLogEventBufferV1_Dequeue_highLoad(t *testing.T) {
t.Run("Dequeue from a deterministic order of upkeeps", func(t *testing.T) {
lookback := uint32(20)
blockRate := uint32(1)
logLimit := uint32(1)
buf := NewLogBuffer(logger.TestLogger(t), lookback, blockRate, logLimit)

upkeepIDs := []*big.Int{
big.NewInt(1),
big.NewInt(2),
big.NewInt(3),
big.NewInt(4),
big.NewInt(5),
}

numUpkeeps := len(upkeepIDs)

blockNumbers := []int64{
100, 101, 102, 103, 104, 105, 106, 107, 108, 109,
}

// for each upkeep, enqueue 10 logs per block, for 10 blocks
for _, upkeepID := range upkeepIDs {
for _, blockNumber := range blockNumbers {
for i := 0; i < 10; i++ {
log := logpoller.Log{
BlockNumber: blockNumber,
TxHash: common.HexToHash(fmt.Sprintf("0x%dff%dff%d", blockNumber, upkeepID.Int64(), i)),
}
buf.Enqueue(upkeepID, log)
}
}
}

bufV1 := buf.(*logBuffer)

assert.Equal(t, 5, len(bufV1.queues))

// each queue should have 100 logs
assert.Equal(t, 100, len(bufV1.queues["1"].logs))
assert.Equal(t, 100, len(bufV1.queues["2"].logs))
assert.Equal(t, 100, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 100, len(bufV1.queues["5"].logs))

maxResults := 5
iterations := int(math.Ceil(float64(numUpkeeps*5) / float64(maxResults)))

assert.Equal(t, 5, iterations)

upkeepSelectorFn := func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(0) // on this dequeue attempt, current iteration will be 0
}

logs, remaining := buf.Dequeue(100, 101, 5, maxResults, upkeepSelectorFn)

// we should dequeue 5 logs, and the block window should have 15 logs remaining for this selector
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 100, len(bufV1.queues["1"].logs))
assert.Equal(t, 100, len(bufV1.queues["2"].logs))
assert.Equal(t, 100, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))

upkeepSelectorFn = func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(1) // on this dequeue attempt, current iteration will be 1
}

logs, remaining = buf.Dequeue(100, 101, 5, maxResults, upkeepSelectorFn)

// we should dequeue 5 logs, and the block window should have 15 logs remaining for this selector
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 95, len(bufV1.queues["1"].logs))
assert.Equal(t, 100, len(bufV1.queues["2"].logs))
assert.Equal(t, 100, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))

upkeepSelectorFn = func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(2) // on this dequeue attempt, current iteration will be 2
}

logs, remaining = buf.Dequeue(100, 101, 5, maxResults, upkeepSelectorFn)

// we should dequeue 5 logs, and the block window should have 15 logs remaining for this selector
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 95, len(bufV1.queues["1"].logs))
assert.Equal(t, 95, len(bufV1.queues["2"].logs))
assert.Equal(t, 100, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))

upkeepSelectorFn = func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(3) // on this dequeue attempt, current iteration will be 3
}

logs, remaining = buf.Dequeue(100, 101, 5, maxResults, upkeepSelectorFn)

// we should dequeue 5 logs, and the block window should have 15 logs remaining for this selector
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 95, len(bufV1.queues["1"].logs))
assert.Equal(t, 95, len(bufV1.queues["2"].logs))
assert.Equal(t, 95, len(bufV1.queues["3"].logs))
assert.Equal(t, 100, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))

upkeepSelectorFn = func(id *big.Int) bool {
return id.Int64()%int64(iterations) == int64(4) // on this dequeue attempt, current iteration will be 4
}

logs, remaining = buf.Dequeue(100, 101, 5, maxResults, upkeepSelectorFn)

// we should dequeue 5 logs, and the block window should have 15 logs remaining for this selector
assert.Equal(t, 5, len(logs))
assert.Equal(t, 15, remaining)

assert.Equal(t, 95, len(bufV1.queues["1"].logs))
assert.Equal(t, 95, len(bufV1.queues["2"].logs))
assert.Equal(t, 95, len(bufV1.queues["3"].logs))
assert.Equal(t, 95, len(bufV1.queues["4"].logs))
assert.Equal(t, 95, len(bufV1.queues["5"].logs))
})
}
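Worked through, the numbers in the test above: each upkeep holds 20 logs in the [100, 101] window (10 per block), each Dequeue call is capped at 5 results, and ceil(5 upkeeps * 5 / 5) = 5 iterations, so exactly one upkeep is selected per iteration and 15 of its logs remain. A minimal sketch of that arithmetic (values copied from the test):

package main

import (
	"fmt"
	"math"
)

func main() {
	numUpkeeps, upkeepLimit, maxResults := 5, 5, 5
	iterations := int(math.Ceil(float64(numUpkeeps*upkeepLimit) / float64(maxResults))) // 5

	logsPerUpkeepInWindow := 2 * 10                 // blocks 100 and 101, 10 logs each
	remaining := logsPerUpkeepInWindow - maxResults // 15 left for the selected upkeep
	fmt.Println("iterations:", iterations, "remaining:", remaining)

	// which upkeep each iteration picks with the selector id % iterations == currentIteration
	for iteration := 0; iteration < iterations; iteration++ {
		for id := 1; id <= numUpkeeps; id++ {
			if id%iterations == iteration {
				fmt.Printf("iteration %d selects upkeep %d\n", iteration, id)
			}
		}
	}
}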

func TestLogEventBufferV1_Enqueue(t *testing.T) {
tests := []struct {
name string
@@ -7,6 +7,7 @@ import (
"fmt"
"hash"
"io"
"math"
"math/big"
"runtime"
"sync"
@@ -114,6 +115,9 @@ type logEventProvider struct {
currentPartitionIdx uint64

chainID *big.Int

currentIteration int
iterations int
}

func NewLogProvider(lggr logger.Logger, poller logpoller.LogPoller, chainID *big.Int, packer LogDataPacker, filterStore UpkeepFilterStore, opts LogTriggersOptions) *logEventProvider {
@@ -151,6 +155,7 @@ func (p *logEventProvider) SetConfig(cfg ocr2keepers.LogEventProviderConfig) {

switch p.opts.BufferVersion {
case BufferVersionV1:
p.lggr.With("where", "setConfig").Infow("setting buffer v1 config")
p.bufferV1.SetConfig(uint32(p.opts.LookbackBlocks), blockRate, logLimit)
default:
}
@@ -287,25 +292,75 @@ func (p *logEventProvider) getLogsFromBuffer(latestBlock int64) []ocr2keepers.Up
switch p.opts.BufferVersion {
case BufferVersionV1:
// in v1, we use a greedy approach - we keep dequeuing logs until we reach the max results or cover the entire range.
blockRate, logLimitLow, maxResults, _ := p.getBufferDequeueArgs()
blockRate, logLimitLow, maxResults, numOfUpkeeps := p.getBufferDequeueArgs()

p.lggr.With("where", "getLogsFromBuffer").Infow("getBufferDequeueArgs", "blockRate", blockRate, "logLimitLow", logLimitLow, "maxResults", maxResults, "numOfUpkeeps", numOfUpkeeps)
// when numOfUpkeeps exceeds maxResults, it isn't possible to dequeue a log for every upkeep in a single round,
// even if logLimitLow is set to 1. For this reason, we spread the dequeue process across multiple iterations,
// e.g. if we have 200 upkeeps and maxResults is 100, a single dequeue could only dequeue logs for half
// of the upkeeps, whereas a dequeue process of two iterations (two dequeue calls) can dequeue logs for all upkeeps.
if p.iterations == p.currentIteration {
p.lggr.With("where", "getLogsFromBuffer").Infow("recalculating iterations", "p.iterations", p.iterations, "p.currentIteration", p.currentIteration)

p.currentIteration = 0
p.iterations = int(math.Ceil(float64(numOfUpkeeps*logLimitLow) / float64(maxResults)))
if p.iterations == 0 {
p.iterations = 1
}

p.lggr.With("where", "getLogsFromBuffer").Infow("recalculated iterations", "p.iterations", p.iterations, "p.currentIteration", p.currentIteration)
}

// upkeepSelectorFn accepts an upkeep ID, performs a modulus against the number of
// iterations, and compares the result against the current iteration. When this comparison returns true, the
// upkeep is selected for dequeuing. This means that, for a given set of upkeeps, a different subset of
// upkeeps is dequeued on each iteration, and, across all iterations, every upkeep is dequeued exactly
// once.
upkeepSelectorFn := func(id *big.Int) bool {
willDequeue := id.Int64()%int64(p.iterations) == int64(p.currentIteration)
p.lggr.With("where", "getLogsFromBuffer").Infow("upkeepSelectorFn", "p.iterations", p.iterations, "p.currentIteration", p.currentIteration, "upkeepID", id.String(), "willDequeue", willDequeue)
return willDequeue
}
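// Illustrative numbers, not from this change: with numOfUpkeeps=200, logLimitLow=1 and maxResults=100,
// iterations = ceil(200*1/100) = 2, so upkeeps with even IDs are dequeued while currentIteration is 0
// and upkeeps with odd IDs while it is 1; once currentIteration reaches iterations, both counters are
// recalculated and the cycle starts over.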

p.lggr.With("where", "getLogsFromBuffer").Infow("dequeuing", "payloads", len(payloads), "maxResults", maxResults, "start", start, "latestBlock", latestBlock)

for len(payloads) < maxResults && start <= latestBlock {
logs, remaining := p.bufferV1.Dequeue(start, blockRate, logLimitLow, maxResults-len(payloads), DefaultUpkeepSelector)
startWindow, end := getBlockWindow(start, blockRate)

p.lggr.With("where", "getLogsFromBuffer").Infow("dequeuing loop", "startWindow", startWindow, "end", end)

logs, remaining := p.bufferV1.Dequeue(startWindow, end, logLimitLow, maxResults-len(payloads), upkeepSelectorFn)

p.lggr.With("where", "getLogsFromBuffer").Infow("dequeued loop", "logs", len(logs), "remaining", remaining)

if len(logs) > 0 {
p.lggr.Debugw("Dequeued logs", "start", start, "latestBlock", latestBlock, "logs", len(logs))
}

payloadsBuilt := 0
for _, l := range logs {
payload, err := p.createPayload(l.ID, l.Log)
if err == nil {
payloads = append(payloads, payload)
payloadsBuilt++
} else {
p.lggr.With("where", "getLogsFromBuffer").Infow("unable to build payload", "err", err.Error())
}
}

p.lggr.With("where", "getLogsFromBuffer").Infow("built payloads", "payloadsBuilt", payloadsBuilt)

if remaining > 0 {
p.lggr.Debugw("Remaining logs", "start", start, "latestBlock", latestBlock, "remaining", remaining)
// TODO: handle remaining logs in a better way than consuming the entire window, e.g. do not repeat more than x times
continue
}

start += int64(blockRate)
p.lggr.With("where", "getLogsFromBuffer").Infow("advancing window", "start", start)
}
p.currentIteration++
p.lggr.With("where", "getLogsFromBuffer").Infow("advanced iteration", "p.currentIteration", p.currentIteration)
default:
logs := p.buffer.dequeueRange(start, latestBlock, AllowedLogsPerUpkeep, MaxPayloads)
for _, l := range logs {