From 4cb35f470f67da780435082b4aed9d7edf7864d6 Mon Sep 17 00:00:00 2001
From: Fergal Gribben
Date: Sun, 9 Jun 2024 20:16:15 +0100
Subject: [PATCH] Add tests showing reorg handling

---
 .../evmregistry/v21/logprovider/provider.go   |   4 +
 .../v21/logprovider/provider_test.go          | 388 ++++++++++++++++++
 2 files changed, 392 insertions(+)

diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go
index 12f6f27bfed..163b9b11f77 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider.go
@@ -222,6 +222,10 @@ func (c *dequeueCoordinator) markReorg(block int64, blockRate uint32) {
 	startWindow, _ := getBlockWindow(block, int(blockRate))
 
 	c.dequeuedMinimum[startWindow] = false
+	// TODO instead of wiping the count for all upkeeps, should we wipe for upkeeps only impacted by the reorg?
+	for upkeepID := range c.dequeuedUpkeeps[startWindow] {
+		c.dequeuedUpkeeps[startWindow][upkeepID] = 0
+	}
 }
 
 func (c *dequeueCoordinator) updateBlockWindow(startWindow int64, logs, remaining, numberOfUpkeeps, logLimitLow int) {
diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go
index 3527f2cf6e5..8b1716d4239 100644
--- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go
+++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/provider_test.go
@@ -1908,6 +1908,394 @@ func TestLogEventProvider_GetLatestPayloads(t *testing.T) {
 		assert.Equal(t, 190, blockWindowCounts[4])
 	})
 
+	t.Run("min dequeue followed by best effort followed by reorg followed by best effort", func(t *testing.T) {
+		oldMaxPayloads := MaxPayloads
+		MaxPayloads = 10
+		defer func() {
+			MaxPayloads = oldMaxPayloads
+		}()
+
+		upkeepIDs := []*big.Int{
+			big.NewInt(1),
+			big.NewInt(2),
+			big.NewInt(3),
+			big.NewInt(4),
+			big.NewInt(5),
+		}
+
+		filterStore := NewUpkeepFilterStore()
+
+		logGenerator := func(start, end int64) []logpoller.Log {
+			var res []logpoller.Log
+			for i := start; i < end; i++ {
+				for j := 0; j < 10; j++ {
+					res = append(res, logpoller.Log{
+						LogIndex:    int64(j),
+						BlockHash:   common.HexToHash(fmt.Sprintf("%d", i+1)),
+						BlockNumber: i + 1,
+					})
+				}
+			}
+			return res
+		}
+
+		// use a log poller that will create logs for the queried block range
+		logPoller := &mockLogPoller{
+			LatestBlockFn: func(ctx context.Context) (int64, error) {
+				return 100, nil
+			},
+			LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) {
+				return logGenerator(start, end), nil
+			},
+		}
+
+		// prepare the filter store with upkeeps
+		for _, upkeepID := range upkeepIDs {
+			filterStore.AddActiveUpkeeps(
+				upkeepFilter{
+					addr:     []byte(upkeepID.String()),
+					upkeepID: upkeepID,
+					topics: []common.Hash{
+						common.HexToHash(upkeepID.String()),
+					},
+				},
+			)
+		}
+
+		opts := NewOptions(200, big.NewInt(1))
+		opts.BufferVersion = "v1"
+
+		provider := NewLogProvider(logger.TestLogger(t), logPoller, big.NewInt(1), &mockedPacker{}, filterStore, opts)
+
+		ctx := context.Background()
+
+		err := provider.ReadLogs(ctx, upkeepIDs...)
+		assert.NoError(t, err)
+
+		assert.Equal(t, 5, provider.bufferV1.NumOfUpkeeps())
+
+		bufV1 := provider.bufferV1.(*logBuffer)
+
+		// each upkeep should have 10 logs * 100 blocks = 1000 logs
+		assert.Equal(t, 1000, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 1000, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 1000, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 1000, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 1000, countLogs(bufV1.queues["5"].logs))
+
+		for i := 0; i < 100; i++ {
+			payloads, err := provider.GetLatestPayloads(ctx)
+			assert.NoError(t, err)
+
+			// we dequeue a maximum of 10 logs
+			assert.Equal(t, 10, len(payloads))
+		}
+
+		// the dequeue is evenly distributed across the 5 upkeeps
+		assert.Equal(t, 800, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 800, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 800, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 800, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 800, countLogs(bufV1.queues["5"].logs))
+
+		blockWindowCounts := map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		// min dequeue should have happened for all block windows
+		assert.Equal(t, 40, blockWindowCounts[1])
+		assert.Equal(t, 40, blockWindowCounts[100])
+
+		payloads, err := provider.GetLatestPayloads(ctx)
+		assert.NoError(t, err)
+
+		// we dequeue a maximum of 10 logs
+		assert.Equal(t, 10, len(payloads))
+
+		// the dequeue is evenly distributed across the 5 upkeeps
+		assert.Equal(t, 798, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 798, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 798, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 798, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 798, countLogs(bufV1.queues["5"].logs))
+
+		blockWindowCounts = map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		// best effort dequeues first block window
+		assert.Equal(t, 30, blockWindowCounts[1])
+		assert.Equal(t, 40, blockWindowCounts[2])
+
+		payloads, err = provider.GetLatestPayloads(ctx)
+		assert.NoError(t, err)
+
+		// we dequeue a maximum of 10 logs
+		assert.Equal(t, 10, len(payloads))
+
+		// the dequeue is evenly distributed across the 5 upkeeps
+		assert.Equal(t, 796, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 796, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 796, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 796, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 796, countLogs(bufV1.queues["5"].logs))
+
+		blockWindowCounts = map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		// best effort dequeues first block window
+		assert.Equal(t, 20, blockWindowCounts[1])
+		assert.Equal(t, 40, blockWindowCounts[2])
+
+		// reorg happens
+		logGenerator = func(start, end int64) []logpoller.Log {
+			var res []logpoller.Log
+			for i := start; i < end; i++ {
+				if i == 97 {
+					for j := 0; j < 10; j++ {
+						res = append(res, logpoller.Log{
+							LogIndex:    int64(j),
+							BlockHash:   common.HexToHash(fmt.Sprintf("%de", i+1)),
+							BlockNumber: i + 1,
+						})
+					}
+				} else {
+					for j := 0; j < 10; j++ {
+						res = append(res, logpoller.Log{
+							LogIndex:    int64(j),
+							BlockHash:   common.HexToHash(fmt.Sprintf("%d", i+1)),
+							BlockNumber: i + 1,
+						})
+					}
+				}
+			}
+			return res
+		}
+		// use a log poller that will create logs for the queried block range
+		provider.poller = &mockLogPoller{
+			LatestBlockFn: func(ctx context.Context) (int64, error) {
+				return 102, nil
+			},
+			LogsWithSigsFn: func(ctx context.Context, start, end int64, eventSigs []common.Hash, address common.Address) ([]logpoller.Log, error) {
+				return logGenerator(start, end), nil
+			},
+		}
+
+		blockWindowCounts = map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		assert.Equal(t, 20, blockWindowCounts[1])
+		assert.Equal(t, 40, blockWindowCounts[98])
+
+		err = provider.ReadLogs(ctx, upkeepIDs...)
+		assert.NoError(t, err)
+
+		blockWindowCounts = map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		assert.Equal(t, 20, blockWindowCounts[1])
+		assert.Equal(t, 40, blockWindowCounts[97])
+		assert.Equal(t, 50, blockWindowCounts[98]) // reorg block window has had new logs added after reorg
+		assert.Equal(t, 40, blockWindowCounts[99])
+
+		assert.Equal(t, 818, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 818, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 818, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 818, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 818, countLogs(bufV1.queues["5"].logs))
+
+		assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[1])
+		assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[97])
+		assert.Equal(t, false, provider.dequeueCoordinator.dequeuedMinimum[98]) // this window has no min commitment met due to reorg
+		assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[99])
+
+		payloads, err = provider.GetLatestPayloads(ctx)
+		assert.NoError(t, err)
+
+		// we dequeue a maximum of 10 logs
+		assert.Equal(t, 10, len(payloads))
+
+		assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[1])
+		assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[97])
+		assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[98]) // this window has had min commitment met following reorg
+		assert.Equal(t, true, provider.dequeueCoordinator.dequeuedMinimum[99])
+
+		// the dequeue is evenly distributed across the 5 upkeeps
+		assert.Equal(t, 816, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 816, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 816, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 816, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 816, countLogs(bufV1.queues["5"].logs))
+
+		blockWindowCounts = map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		assert.Equal(t, 20, blockWindowCounts[1])
+		assert.Equal(t, 40, blockWindowCounts[98]) // this block window has had its min dequeue met following a reorg
+		assert.Equal(t, 50, blockWindowCounts[101])
+		assert.Equal(t, 50, blockWindowCounts[102])
+
+		payloads, err = provider.GetLatestPayloads(ctx)
+		assert.NoError(t, err)
+
+		// we dequeue a maximum of 10 logs
+		assert.Equal(t, 10, len(payloads))
+
+		// the dequeue is evenly distributed across the 5 upkeeps
+		assert.Equal(t, 814, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 814, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 814, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 814, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 814, countLogs(bufV1.queues["5"].logs))
+
+		blockWindowCounts = map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		// best effort dequeues first block window
+		assert.Equal(t, 20, blockWindowCounts[1])
+		assert.Equal(t, 40, blockWindowCounts[98])
+		assert.Equal(t, 40, blockWindowCounts[101])
+		assert.Equal(t, 50, blockWindowCounts[102])
+
+		payloads, err = provider.GetLatestPayloads(ctx)
+		assert.NoError(t, err)
+
+		// we dequeue a maximum of 10 logs
+		assert.Equal(t, 10, len(payloads))
+
+		// the dequeue is evenly distributed across the 5 upkeeps
+		assert.Equal(t, 812, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 812, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 812, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 812, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 812, countLogs(bufV1.queues["5"].logs))
+
+		blockWindowCounts = map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		assert.Equal(t, 20, blockWindowCounts[1])
+		assert.Equal(t, 40, blockWindowCounts[98])
+		assert.Equal(t, 40, blockWindowCounts[101])
+		assert.Equal(t, 40, blockWindowCounts[102]) // latest block window has now had min dequeue met
+
+		payloads, err = provider.GetLatestPayloads(ctx)
+		assert.NoError(t, err)
+
+		// we dequeue a maximum of 10 logs
+		assert.Equal(t, 10, len(payloads))
+
+		// the dequeue is evenly distributed across the 5 upkeeps
+		assert.Equal(t, 810, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 810, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 810, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 810, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 810, countLogs(bufV1.queues["5"].logs))
+
+		blockWindowCounts = map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		assert.Equal(t, 10, blockWindowCounts[1]) // best effort resumes on the first block window
+		assert.Equal(t, 40, blockWindowCounts[98])
+		assert.Equal(t, 40, blockWindowCounts[101])
+		assert.Equal(t, 40, blockWindowCounts[102])
+
+		payloads, err = provider.GetLatestPayloads(ctx)
+		assert.NoError(t, err)
+
+		// we dequeue a maximum of 10 logs
+		assert.Equal(t, 10, len(payloads))
+
+		// the dequeue is evenly distributed across the 5 upkeeps
+		assert.Equal(t, 808, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 808, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 808, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 808, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 808, countLogs(bufV1.queues["5"].logs))
+
+		blockWindowCounts = map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		assert.Equal(t, 0, blockWindowCounts[1]) // best effort completes on the first block window
+		assert.Equal(t, 40, blockWindowCounts[2])
+		assert.Equal(t, 40, blockWindowCounts[98])
+		assert.Equal(t, 40, blockWindowCounts[101])
+		assert.Equal(t, 40, blockWindowCounts[102])
+
+		payloads, err = provider.GetLatestPayloads(ctx)
+		assert.NoError(t, err)
+
+		// we dequeue a maximum of 10 logs
+		assert.Equal(t, 10, len(payloads))
+
+		// the dequeue is evenly distributed across the 5 upkeeps
+		assert.Equal(t, 806, countLogs(bufV1.queues["1"].logs))
+		assert.Equal(t, 806, countLogs(bufV1.queues["2"].logs))
+		assert.Equal(t, 806, countLogs(bufV1.queues["3"].logs))
+		assert.Equal(t, 806, countLogs(bufV1.queues["4"].logs))
+		assert.Equal(t, 806, countLogs(bufV1.queues["5"].logs))
+
+		blockWindowCounts = map[int64]int{}
+
+		for _, q := range bufV1.queues {
+			for blockNumber, logs := range q.logs {
+				blockWindowCounts[blockNumber] += len(logs)
+			}
+		}
+
+		assert.Equal(t, 0, blockWindowCounts[1])
+		assert.Equal(t, 30, blockWindowCounts[2]) // best effort continues on the second block window
+		assert.Equal(t, 40, blockWindowCounts[98])
+		assert.Equal(t, 40, blockWindowCounts[101])
+		assert.Equal(t, 40, blockWindowCounts[102])
+
+	})
 }
 
 type mockedPacker struct {