diff --git a/core/chains/evm/logpoller/orm_test.go b/core/chains/evm/logpoller/orm_test.go index 3352e04745..ba5b16d697 100644 --- a/core/chains/evm/logpoller/orm_test.go +++ b/core/chains/evm/logpoller/orm_test.go @@ -2,7 +2,6 @@ package logpoller_test import ( "bytes" - "context" "database/sql" "errors" "fmt" @@ -17,7 +16,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/ethereum/go-ethereum/common" - "github.com/jackc/pgx/v4" pkgerrors "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -548,16 +546,160 @@ func TestORM(t *testing.T) { assert.Zero(t, len(logs)) } -type PgxLogger struct { - lggr logger.Logger -} +func TestORM_SelectExcessLogs(t *testing.T) { + t.Parallel() + th := SetupTH(t, lpOpts) + o1 := th.ORM + o2 := th.ORM2 + ctx := testutils.Context(t) -func NewPgxLogger(lggr logger.Logger) PgxLogger { - return PgxLogger{lggr} -} + topic := common.HexToHash("0x1599") + topic2 := common.HexToHash("0x1600") + + blockHashes := []common.Hash{common.HexToHash("0x1234"), common.HexToHash("0x1235"), common.HexToHash("0x1236")} + + // Insert blocks for active chain + for i := int64(0); i < 3; i++ { + blockNumber := 10 + i + require.NoError(t, o1.InsertBlock(ctx, blockHashes[i], blockNumber, time.Now(), blockNumber)) + b1, err := o1.SelectBlockByHash(ctx, blockHashes[i]) + require.NoError(t, err) + require.Equal(t, blockNumber, b1.BlockNumber) + } + + // Insert block from a different chain + require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1234"), 17, time.Now(), 17)) + b, err := o2.SelectBlockByHash(ctx, common.HexToHash("0x1234")) + require.NoError(t, err) + require.Equal(t, int64(17), b.BlockNumber) + + for i := int64(0); i < 7; i++ { + require.NoError(t, o1.InsertLogs(ctx, []logpoller.Log{ + { + EvmChainId: ubig.New(th.ChainID), + LogIndex: i, + BlockHash: common.HexToHash("0x1234"), + BlockNumber: int64(10), + EventSig: topic, + Topics: [][]byte{topic[:]}, + Address: common.HexToAddress("0x1234"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello"), + BlockTimestamp: time.Now(), + }, + { + EvmChainId: ubig.New(th.ChainID), + LogIndex: i, + BlockHash: common.HexToHash("0x1234"), + BlockNumber: int64(11), + EventSig: topic, + Topics: [][]byte{topic[:]}, + Address: common.HexToAddress("0x1235"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello"), + BlockTimestamp: time.Now(), + }, + { + EvmChainId: ubig.New(th.ChainID), + LogIndex: i, + BlockHash: common.HexToHash("0x1234"), + BlockNumber: int64(12), + EventSig: topic2, + Topics: [][]byte{topic2[:]}, + Address: common.HexToAddress("0x1235"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("hello"), + BlockTimestamp: time.Now(), + }, + })) + } -func (l PgxLogger) Log(ctx context.Context, log pgx.LogLevel, msg string, data map[string]interface{}) { + logs, err := o1.SelectLogsByBlockRange(ctx, 1, 12) + require.NoError(t, err) + require.Len(t, logs, 21) + // Insert a log on a different chain, to make sure + // it's not affected by any operations on the chain LogPoller + // is managing. + require.NoError(t, o2.InsertLogs(ctx, []logpoller.Log{ + { + EvmChainId: ubig.New(th.ChainID2), + LogIndex: 8, + BlockHash: common.HexToHash("0x1238"), + BlockNumber: int64(17), + EventSig: topic2, + Topics: [][]byte{topic2[:]}, + Address: common.HexToAddress("0x1236"), + TxHash: common.HexToHash("0x1888"), + Data: []byte("same log on unrelated chain"), + BlockTimestamp: time.Now(), + }, + })) + + logs, err = o2.SelectLogsByBlockRange(ctx, 1, 17) + require.NoError(t, err) + require.Len(t, logs, 1) + + filter1 := logpoller.Filter{ + Name: "MaxLogsKept = 0 (addr 1234 topic1)", + Addresses: []common.Address{common.HexToAddress("0x1234")}, + EventSigs: types.HashArray{topic}, + MaxLogsKept: 0, + } + + filter12 := logpoller.Filter{ // retain both topic1 and topic2 on contract3 for at least 1ms + Name: "MaxLogsKept = 1 (addr 1235 topic1 & topic2)", + Addresses: []common.Address{common.HexToAddress("0x1235")}, + EventSigs: types.HashArray{topic, topic2}, + Retention: time.Millisecond, + MaxLogsKept: 1, + } + filter2 := logpoller.Filter{ // retain topic2 on contract3 for at least 1 hour + Name: "MaxLogsKept = 5 (addr 1235 topic2)", + Addresses: []common.Address{common.HexToAddress("0x1235")}, + EventSigs: types.HashArray{topic2}, + MaxLogsKept: 5, + } + + // Test inserting filters and reading them back + require.NoError(t, o1.InsertFilter(ctx, filter1)) + require.NoError(t, o1.InsertFilter(ctx, filter12)) + require.NoError(t, o1.InsertFilter(ctx, filter2)) + + filters, err := o1.LoadFilters(ctx) + require.NoError(t, err) + require.Len(t, filters, 3) + assert.Equal(t, filter1, filters["MaxLogsKept = 0 (addr 1234 topic1)"]) + assert.Equal(t, filter12, filters["MaxLogsKept = 1 (addr 1235 topic1 & topic2)"]) + assert.Equal(t, filter2, filters["MaxLogsKept = 5 (addr 1235 topic2)"]) + + ids, err := o1.SelectUnmatchedLogIDs(ctx, 0) + require.NoError(t, err) + require.Len(t, ids, 0) + + // Number of excess logs eligible for pruning: + // 2 of the 7 matching filter2 + 6 of the 7 matching filter12 but not filter2 = 8 total of 21 + + // Test SelectExcessLogIDs with limit less than # blocks + // ( should only consider blocks 10 & 11, returning 6 excess events from block 11 + // but ignoring the 2 in block 12 ) + ids, err = o1.SelectExcessLogIDs(ctx, 2) + require.NoError(t, err) + assert.Len(t, ids, 6) + + // Test SelectExcessLogIDs with limit greater than # blocks: + ids, err = o1.SelectExcessLogIDs(ctx, 4) + require.NoError(t, err) + assert.Len(t, ids, 8) + + // Test SelectExcessLogIDs with no limit + ids, err = o1.SelectExcessLogIDs(ctx, 10) + require.NoError(t, err) + assert.Len(t, ids, 8) + + deleted, err := o1.DeleteLogsByRowID(ctx, ids) + require.NoError(t, err) + assert.Equal(t, int64(8), deleted) } func TestLogPollerFilters(t *testing.T) {