diff --git a/core/scripts/functions/templates/oracle.toml b/core/scripts/functions/templates/oracle.toml index 4739252d68e..d21fe4a5e87 100644 --- a/core/scripts/functions/templates/oracle.toml +++ b/core/scripts/functions/templates/oracle.toml @@ -36,6 +36,7 @@ requestTimeoutSec = 300 maxRequestSizesList = [30_720, 51_200, 102_400, 204_800, 512_000, 1_048_576, 2_097_152, 3_145_728, 5_242_880, 10_485_760] maxSecretsSizesList = [10_240, 20_480, 51_200, 102_400, 307_200, 512_000, 1_048_576, 2_097_152] minimumSubscriptionBalance = "2 link" +pastBlocksToPoll = 25 [pluginConfig.OnchainAllowlist] diff --git a/core/services/ocr2/plugins/functions/config/config.go b/core/services/ocr2/plugins/functions/config/config.go index 3f35d1dba9b..0978500deb5 100644 --- a/core/services/ocr2/plugins/functions/config/config.go +++ b/core/services/ocr2/plugins/functions/config/config.go @@ -23,7 +23,11 @@ type PluginConfig struct { EnableRequestSignatureCheck bool `json:"enableRequestSignatureCheck"` DONID string `json:"donID"` ContractVersion uint32 `json:"contractVersion"` + MinRequestConfirmations uint32 `json:"minRequestConfirmations"` + MinResponseConfirmations uint32 `json:"minResponseConfirmations"` MinIncomingConfirmations uint32 `json:"minIncomingConfirmations"` + PastBlocksToPoll uint32 `json:"pastBlocksToPoll"` + LogPollerCacheDurationSec uint32 `json:"logPollerCacheDurationSec"` // Duration to cache previously detected request or response logs such that they can be filtered when calling logpoller_wrapper.LatestEvents() RequestTimeoutSec uint32 `json:"requestTimeoutSec"` RequestTimeoutCheckFrequencySec uint32 `json:"requestTimeoutCheckFrequencySec"` RequestTimeoutBatchLookupSize uint32 `json:"requestTimeoutBatchLookupSize"` diff --git a/core/services/relay/evm/functions/logpoller_wrapper.go b/core/services/relay/evm/functions/logpoller_wrapper.go index d355bd6569b..6193f4ba862 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper.go +++ b/core/services/relay/evm/functions/logpoller_wrapper.go @@ -24,21 +24,39 @@ import ( type logPollerWrapper struct { services.StateMachine - routerContract *functions_router.FunctionsRouter - pluginConfig config.PluginConfig - client client.Client - logPoller logpoller.LogPoller - subscribers map[string]evmRelayTypes.RouteUpdateSubscriber - activeCoordinator common.Address - proposedCoordinator common.Address - blockOffset int64 - nextBlock int64 - mu sync.Mutex - closeWait sync.WaitGroup - stopCh utils.StopChan - lggr logger.Logger + routerContract *functions_router.FunctionsRouter + pluginConfig config.PluginConfig + client client.Client + logPoller logpoller.LogPoller + subscribers map[string]evmRelayTypes.RouteUpdateSubscriber + activeCoordinator common.Address + proposedCoordinator common.Address + requestBlockOffset int64 + responseBlockOffset int64 + pastBlocksToPoll int64 + logPollerCacheDurationSec int64 + detectedRequests detectedEvents + detectedResponses detectedEvents + mu sync.Mutex + closeWait sync.WaitGroup + stopCh utils.StopChan + lggr logger.Logger } +type detectedEvent struct { + requestId [32]byte + timeDetected time.Time +} + +type detectedEvents struct { + isPreviouslyDetected map[[32]byte]struct{} + detectedEventsOrdered []detectedEvent +} + +const logPollerCacheDurationSecDefault = 300 +const pastBlocksToPollDefault = 50 +const maxLogsToProcess = 1000 + var _ evmRelayTypes.LogPollerWrapper = &logPollerWrapper{} func NewLogPollerWrapper(routerContractAddress common.Address, pluginConfig config.PluginConfig, client client.Client, logPoller logpoller.LogPoller, lggr logger.Logger) (evmRelayTypes.LogPollerWrapper, error) { @@ -48,18 +66,48 @@ func NewLogPollerWrapper(routerContractAddress common.Address, pluginConfig conf } blockOffset := int64(pluginConfig.MinIncomingConfirmations) - 1 if blockOffset < 0 { + lggr.Warnw("invalid minIncomingConfirmations, using 1 instead", "minIncomingConfirmations", pluginConfig.MinIncomingConfirmations) blockOffset = 0 } + requestBlockOffset := int64(pluginConfig.MinRequestConfirmations) - 1 + if requestBlockOffset < 0 { + lggr.Warnw("invalid minRequestConfirmations, using minIncomingConfirmations instead", "minRequestConfirmations", pluginConfig.MinRequestConfirmations) + requestBlockOffset = blockOffset + } + responseBlockOffset := int64(pluginConfig.MinResponseConfirmations) - 1 + if responseBlockOffset < 0 { + lggr.Warnw("invalid minResponseConfirmations, using minIncomingConfirmations instead", "minResponseConfirmations", pluginConfig.MinResponseConfirmations) + responseBlockOffset = blockOffset + } + logPollerCacheDurationSec := int64(pluginConfig.LogPollerCacheDurationSec) + if logPollerCacheDurationSec <= 0 { + lggr.Warnw("invalid logPollerCacheDuration, using 300 instead", "logPollerCacheDurationSec", logPollerCacheDurationSec) + logPollerCacheDurationSec = logPollerCacheDurationSecDefault + } + pastBlocksToPoll := int64(pluginConfig.PastBlocksToPoll) + if pastBlocksToPoll <= 0 { + lggr.Warnw("invalid pastBlocksToPoll, using 50 instead", "pastBlocksToPoll", pastBlocksToPoll) + pastBlocksToPoll = pastBlocksToPollDefault + } + if blockOffset >= pastBlocksToPoll || requestBlockOffset >= pastBlocksToPoll || responseBlockOffset >= pastBlocksToPoll { + lggr.Errorw("invalid config: number of required confirmation blocks >= pastBlocksToPoll", "pastBlocksToPoll", pastBlocksToPoll, "minIncomingConfirmations", pluginConfig.MinIncomingConfirmations, "minRequestConfirmations", pluginConfig.MinRequestConfirmations, "minResponseConfirmations", pluginConfig.MinResponseConfirmations) + return nil, errors.Errorf("invalid config: number of required confirmation blocks >= pastBlocksToPoll") + } return &logPollerWrapper{ - routerContract: routerContract, - pluginConfig: pluginConfig, - blockOffset: blockOffset, - logPoller: logPoller, - client: client, - subscribers: make(map[string]evmRelayTypes.RouteUpdateSubscriber), - stopCh: make(utils.StopChan), - lggr: lggr, + routerContract: routerContract, + pluginConfig: pluginConfig, + requestBlockOffset: requestBlockOffset, + responseBlockOffset: responseBlockOffset, + pastBlocksToPoll: pastBlocksToPoll, + logPollerCacheDurationSec: logPollerCacheDurationSec, + detectedRequests: detectedEvents{isPreviouslyDetected: make(map[[32]byte]struct{})}, + detectedResponses: detectedEvents{isPreviouslyDetected: make(map[[32]byte]struct{})}, + logPoller: logPoller, + client: client, + subscribers: make(map[string]evmRelayTypes.RouteUpdateSubscriber), + stopCh: make(utils.StopChan), + lggr: lggr, }, nil } @@ -68,20 +116,11 @@ func (l *logPollerWrapper) Start(context.Context) error { l.lggr.Infow("starting LogPollerWrapper", "routerContract", l.routerContract.Address().Hex(), "contractVersion", l.pluginConfig.ContractVersion) l.mu.Lock() defer l.mu.Unlock() - if l.pluginConfig.ContractVersion == 0 { - l.activeCoordinator = l.routerContract.Address() - l.proposedCoordinator = l.routerContract.Address() - } else if l.pluginConfig.ContractVersion == 1 { - nextBlock, err := l.logPoller.LatestBlock() - if err != nil { - l.lggr.Errorw("LogPollerWrapper: LatestBlock() failed, starting from 0", "error", err) - } else { - l.lggr.Debugw("LogPollerWrapper: LatestBlock() got starting block", "block", nextBlock) - l.nextBlock = nextBlock.BlockNumber - l.blockOffset - } - l.closeWait.Add(1) - go l.checkForRouteUpdates() + if l.pluginConfig.ContractVersion != 1 { + return errors.New("only contract version 1 is supported") } + l.closeWait.Add(1) + go l.checkForRouteUpdates() return nil }) } @@ -117,16 +156,15 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR if l.proposedCoordinator != (common.Address{}) && l.activeCoordinator != l.proposedCoordinator { coordinators = append(coordinators, l.proposedCoordinator) } - nextBlock := l.nextBlock latest, err := l.logPoller.LatestBlock() if err != nil { l.mu.Unlock() return nil, nil, err } - latestBlockNumber := latest.BlockNumber - latestBlockNumber -= l.blockOffset - if latestBlockNumber >= nextBlock { - l.nextBlock = latestBlockNumber + 1 + latestBlockNum := latest.BlockNumber + startBlockNum := latestBlockNum - l.pastBlocksToPoll + if startBlockNum < 0 { + startBlockNum = 0 } l.mu.Unlock() @@ -137,22 +175,24 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR l.lggr.Debug("LatestEvents: no non-zero coordinators to check") return resultsReq, resultsResp, errors.New("no non-zero coordinators to check") } - if latestBlockNumber < nextBlock { - l.lggr.Debugw("LatestEvents: no new blocks to check", "latest", latest, "nextBlock", nextBlock) - return resultsReq, resultsResp, nil - } for _, coordinator := range coordinators { - requestLogs, err := l.logPoller.Logs(nextBlock, latestBlockNumber, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), coordinator) + requestEndBlock := latestBlockNum - l.requestBlockOffset + requestLogs, err := l.logPoller.Logs(startBlockNum, requestEndBlock, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), coordinator) if err != nil { - l.lggr.Errorw("LatestEvents: fetching request logs from LogPoller failed", "latest", latest, "nextBlock", nextBlock) + l.lggr.Errorw("LatestEvents: fetching request logs from LogPoller failed", "startBlock", startBlockNum, "endBlock", requestEndBlock) return nil, nil, err } - responseLogs, err := l.logPoller.Logs(nextBlock, latestBlockNumber, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), coordinator) + l.lggr.Debugw("LatestEvents: fetched request logs", "nRequestLogs", len(requestLogs), "latestBlock", latest, "startBlock", startBlockNum, "endBlock", requestEndBlock) + requestLogs = l.filterPreviouslyDetectedEvents(requestLogs, &l.detectedRequests, "requests") + responseEndBlock := latestBlockNum - l.responseBlockOffset + responseLogs, err := l.logPoller.Logs(startBlockNum, responseEndBlock, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), coordinator) if err != nil { - l.lggr.Errorw("LatestEvents: fetching response logs from LogPoller failed", "latest", latest, "nextBlock", nextBlock) + l.lggr.Errorw("LatestEvents: fetching response logs from LogPoller failed", "startBlock", startBlockNum, "endBlock", responseEndBlock) return nil, nil, err } + l.lggr.Debugw("LatestEvents: fetched request logs", "nResponseLogs", len(responseLogs), "latestBlock", latest, "startBlock", startBlockNum, "endBlock", responseEndBlock) + responseLogs = l.filterPreviouslyDetectedEvents(responseLogs, &l.detectedResponses, "responses") parsingContract, err := functions_coordinator.NewFunctionsCoordinator(coordinator, l.client) if err != nil { @@ -165,7 +205,7 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR gethLog := log.ToGethLog() oracleRequest, err := parsingContract.ParseOracleRequest(gethLog) if err != nil { - l.lggr.Errorw("LatestEvents: failed to parse a request log, skipping") + l.lggr.Errorw("LatestEvents: failed to parse a request log, skipping", "err", err) continue } @@ -241,10 +281,46 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR } } - l.lggr.Debugw("LatestEvents: done", "nRequestLogs", len(resultsReq), "nResponseLogs", len(resultsResp), "nextBlock", nextBlock, "latest", latest) + l.lggr.Debugw("LatestEvents: done", "nRequestLogs", len(resultsReq), "nResponseLogs", len(resultsResp), "startBlock", startBlockNum, "endBlock", latestBlockNum) return resultsReq, resultsResp, nil } +func (l *logPollerWrapper) filterPreviouslyDetectedEvents(logs []logpoller.Log, detectedEvents *detectedEvents, filterType string) []logpoller.Log { + if len(logs) > maxLogsToProcess { + l.lggr.Errorw("filterPreviouslyDetectedEvents: too many logs to process, only processing latest maxLogsToProcess logs", "filterType", filterType, "nLogs", len(logs), "maxLogsToProcess", maxLogsToProcess) + logs = logs[len(logs)-maxLogsToProcess:] + } + l.mu.Lock() + defer l.mu.Unlock() + filteredLogs := []logpoller.Log{} + for _, log := range logs { + var requestId [32]byte + if len(log.Topics) < 2 || len(log.Topics[1]) != 32 { + l.lggr.Errorw("filterPreviouslyDetectedEvents: invalid log, skipping", "filterType", filterType, "log", log) + continue + } + copy(requestId[:], log.Topics[1]) // requestId is the second topic (1st topic is the event signature) + if _, ok := detectedEvents.isPreviouslyDetected[requestId]; !ok { + filteredLogs = append(filteredLogs, log) + detectedEvents.isPreviouslyDetected[requestId] = struct{}{} + detectedEvents.detectedEventsOrdered = append(detectedEvents.detectedEventsOrdered, detectedEvent{requestId: requestId, timeDetected: time.Now()}) + } + } + expiredRequests := 0 + for _, detectedEvent := range detectedEvents.detectedEventsOrdered { + expirationTime := time.Now().Add(-time.Second * time.Duration(l.logPollerCacheDurationSec)) + if detectedEvent.timeDetected.Before(expirationTime) { + delete(detectedEvents.isPreviouslyDetected, detectedEvent.requestId) + expiredRequests++ + } else { + break + } + } + detectedEvents.detectedEventsOrdered = detectedEvents.detectedEventsOrdered[expiredRequests:] + l.lggr.Debugw("filterPreviouslyDetectedEvents: done", "filterType", filterType, "nLogs", len(logs), "nFilteredLogs", len(filteredLogs), "nExpiredRequests", expiredRequests, "previouslyDetectedCacheSize", len(detectedEvents.detectedEventsOrdered)) + return filteredLogs +} + // "internal" method called only by EVM relayer components func (l *logPollerWrapper) SubscribeToUpdates(subscriberName string, subscriber evmRelayTypes.RouteUpdateSubscriber) { if l.pluginConfig.ContractVersion == 0 { diff --git a/core/services/relay/evm/functions/logpoller_wrapper_test.go b/core/services/relay/evm/functions/logpoller_wrapper_test.go index c91c3c49aad..2108e822d5e 100644 --- a/core/services/relay/evm/functions/logpoller_wrapper_test.go +++ b/core/services/relay/evm/functions/logpoller_wrapper_test.go @@ -1,22 +1,24 @@ -package functions_test +package functions import ( + "crypto/rand" "encoding/hex" "sync" "testing" + "time" "github.com/ethereum/go-ethereum/common" - gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" evmclimocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client/mocks" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" lpmocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/functions/generated/functions_coordinator" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/functions/config" - "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/functions" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" ) @@ -57,17 +59,34 @@ func setUp(t *testing.T, updateFrequencySec uint32) (*lpmocks.LogPoller, types.L ContractUpdateCheckFrequencySec: updateFrequencySec, ContractVersion: 1, } - lpWrapper, err := functions.NewLogPollerWrapper(gethcommon.Address{}, config, client, lp, lggr) + lpWrapper, err := NewLogPollerWrapper(common.Address{}, config, client, lp, lggr) require.NoError(t, err) - lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) - return lp, lpWrapper, client } +func getMockedRequestLog(t *testing.T) logpoller.Log { + // NOTE: Change this to be a more readable log generation + data, err := hex.DecodeString("000000000000000000000000c113ba31b0080f940ca5812bbccc1e038ea9efb40000000000000000000000000000000000000000000000000000000000000001000000000000000000000000c113ba31b0080f940ca5812bbccc1e038ea9efb4000000000000000000000000000000000000000000000000000000000000024000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000001117082cd81744eb9504dc37f53a86db7e3fb24929b8e7507b097d501ab5b315fb20e0000000000000000000000001b4f2b0e6363097f413c249910d5bc632993ed08000000000000000000000000000000000000000000000000015bcf880382c000000000000000000000000000665785a800593e8fa915208c1ce62f6e57fd75ba0000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000000000000001117000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000004f588000000000000000000000000000000000000000000000000000000000000c350000000000000000000000000000000000000000000000000000000000000021c00000000000000000000000000000000000000000000000000000000000008866c636f64654c6f636174696f6ec258200000000000000000000000000000000000000000000000000000000000000000686c616e6775616765c25820000000000000000000000000000000000000000000000000000000000000000066736f757263657907d0633836366665643238326533313137636466303836633934396662613133643834666331376131656335353934656361643034353133646632326137623538356333363763633132326236373138306334383737303435616235383033373463353066313862346564386132346131323437383532363731623030633035663237373163663036363632333663333236393939323139363866323833346438626462616266306661643165313237613837643237363936323831643965656539326134646263316337356137316136656333613135356438633230616661643064623432383362613433353736303734653035633433633561653061656466643332323838346536613231386466323430323630316436356437316131303061633065376563643037663565646364633535643562373932646130626632353665623038363139336463376431333965613764373965653531653831356465333834386565643363366330353837393265366461333434363738626436373239346636643639656564356132663836323835343965616530323235323835346232666361333635646265623032383433386537326234383465383864316136646563373933633739656265353834666465363465663831383363313365386231623735663037636532303963393138633532643637613735343862653236366433663964316439656132613162303166633838376231316162383739663164333861373833303563373031316533643938346130393863663634383931316536653065383038396365306130363230393136663134323935343036336630376239343931326435666331393366303138633764616135363136323562313966376463323036663930353365623234643036323234616164326338623430646162663631656166666635326234653831373239353837333830313561643730663739316663643864333739343035353737393563383937363164636665333639373938373437353439633234643530646464303563623337613465613863353162306530313032363738643433653766306563353039653434633564343764353335626261363831303936383264643864653439326532363633646336653133653532383539663664336565306533633430336236366362653338643236366137356163373639363863613465653331396166363965373431333137393162653630376537353832373430366164653038306335623239653665343262386563386137373761663865383166336234616337626263666531643066616633393338613664353061316561633835643933643234343066313863333037356237306433626134663930323836396439383937663266636562626262366263646439333436633336633663643838626434336265306562333134323562343665613765386338336638386230363933343836383666366134313839623535666132666431396634326264333730313634616339356530303635656461663130373761633131366632393930303833616631333839636661666336613433323439376531363437393762633738616633366335613435366136646661326636626430626639326136613930366130653930313130626266323265613066333163663364353132663466303331653236343330633831663935656431323362323938356266623830623161396432646337306232356264613961386261303839323833666166663634383661316231646235613938353564346237363966623835663531353063393935306462303964373536326537353133633234653531636163366634366634633231636234373561613937363166666466626434656138613531626465613432383037313466363538393630656336643139656539373237626339316635313665346466306665346264613762623035343161393462326334396636323938616132396337656130646662653635346632306437663164323239633066303262356535326137363031376237306439383232643533383166623966613166393361353861376338383632326631326462643363623937323363626132313639633337643538303939336333663666393065323039336331336130363132323334303064393731363031656262313631343332613966666333373033396562663537326364326566666635636562323539346236346462336261616431633734663532653938343938353964383363313238353465376263393764363432363464653931343735386333386438383739343132333937653263643534653431366234373962363331623830626633306266653062366239353564393066356362303435346361373531303963393938366330636536316165356566376534653433353036313432633633646235363862383634353139623463306636366137633161376661336538666431323231376666336665383164663830643138386232646334343833356132663332323733666133353139633531343764643233353763326161346336326461386238353232306535386130333565373662633133316634623734376632663731643263663933376431303832356138316533623963323136663962316134646431663239383463656635656363656265353530363662363061373263363063323864303336653766386635323131343735386638326366323330646636363930636364617267739f64617267316461726732ff6f736563726574734c6f636174696f6ec2582000000000000000000000000000000000000000000000000000000000000000016773656372657473430102030000000000000000000000000000000000000000000000000000") + require.NoError(t, err) + topic0, err := hex.DecodeString("bf50768ccf13bd0110ca6d53a9c4f1f3271abdd4c24a56878863ed25b20598ff") + require.NoError(t, err) + // Create a random requestID + topic1 := make([]byte, 32) + _, err = rand.Read(topic1) + require.NoError(t, err) + topic2, err := hex.DecodeString("000000000000000000000000665785a800593e8fa915208c1ce62f6e57fd75ba") + require.NoError(t, err) + return logpoller.Log{ + Topics: [][]byte{topic0, topic1, topic2}, + Data: data, + } +} + func TestLogPollerWrapper_SingleSubscriberEmptyEvents(t *testing.T) { t.Parallel() lp, lpWrapper, client := setUp(t, 100_000) // check only once + lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{}, nil) client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr(t, "01"), nil) @@ -87,7 +106,8 @@ func TestLogPollerWrapper_SingleSubscriberEmptyEvents(t *testing.T) { func TestLogPollerWrapper_ErrorOnZeroAddresses(t *testing.T) { t.Parallel() - _, lpWrapper, client := setUp(t, 100_000) // check only once + lp, lpWrapper, client := setUp(t, 100_000) // check only once + lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr(t, "00"), nil) @@ -96,3 +116,96 @@ func TestLogPollerWrapper_ErrorOnZeroAddresses(t *testing.T) { require.Error(t, err) lpWrapper.Close() } + +func TestLogPollerWrapper_LatestEvents_ReorgHandling(t *testing.T) { + t.Parallel() + lp, lpWrapper, client := setUp(t, 100_000) + lp.On("LatestBlock").Return(logpoller.LogPollerBlock{BlockNumber: int64(100)}, nil) + client.On("CallContract", mock.Anything, mock.Anything, mock.Anything).Return(addr(t, "01"), nil) + lp.On("RegisterFilter", mock.Anything).Return(nil) + subscriber := newSubscriber(1) + lpWrapper.SubscribeToUpdates("mock_subscriber", subscriber) + mockedLog := getMockedRequestLog(t) + // All logPoller queries for responses return none + lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil) + // On the first logPoller query for requests, the request log appears + lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + // On the 2nd query, the request log disappears + lp.On("Logs", mock.Anything, mock.Anything, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), mock.Anything).Return([]logpoller.Log{}, nil).Once() + // On the 3rd query, the original request log appears again + lp.On("Logs", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]logpoller.Log{mockedLog}, nil).Once() + + require.NoError(t, lpWrapper.Start(testutils.Context(t))) + subscriber.updates.Wait() + + oracleRequests, _, err := lpWrapper.LatestEvents() + require.NoError(t, err) + assert.Equal(t, 1, len(oracleRequests)) + oracleRequests, _, err = lpWrapper.LatestEvents() + require.NoError(t, err) + assert.Equal(t, 0, len(oracleRequests)) + require.NoError(t, err) + oracleRequests, _, err = lpWrapper.LatestEvents() + require.NoError(t, err) + assert.Equal(t, 0, len(oracleRequests)) +} + +func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_TruncatesLogs(t *testing.T) { + t.Parallel() + _, lpWrapper, _ := setUp(t, 100_000) + + inputLogs := make([]logpoller.Log, maxLogsToProcess+100) + for i := 0; i < 1100; i++ { + inputLogs[i] = getMockedRequestLog(t) + } + + functionsLpWrapper := lpWrapper.(*logPollerWrapper) + mockedDetectedEvents := detectedEvents{isPreviouslyDetected: make(map[[32]byte]struct{})} + outputLogs := functionsLpWrapper.filterPreviouslyDetectedEvents(inputLogs, &mockedDetectedEvents, "request") + + assert.Equal(t, maxLogsToProcess, len(outputLogs)) + assert.Equal(t, 1000, len(mockedDetectedEvents.detectedEventsOrdered)) + assert.Equal(t, 1000, len(mockedDetectedEvents.isPreviouslyDetected)) +} + +func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_SkipsInvalidLog(t *testing.T) { + t.Parallel() + _, lpWrapper, _ := setUp(t, 100_000) + inputLogs := []logpoller.Log{getMockedRequestLog(t)} + inputLogs[0].Topics = [][]byte{[]byte("invalid topic")} + mockedDetectedEvents := detectedEvents{isPreviouslyDetected: make(map[[32]byte]struct{})} + + functionsLpWrapper := lpWrapper.(*logPollerWrapper) + outputLogs := functionsLpWrapper.filterPreviouslyDetectedEvents(inputLogs, &mockedDetectedEvents, "request") + + assert.Equal(t, 0, len(outputLogs)) + assert.Equal(t, 0, len(mockedDetectedEvents.detectedEventsOrdered)) + assert.Equal(t, 0, len(mockedDetectedEvents.isPreviouslyDetected)) +} + +func TestLogPollerWrapper_FilterPreviouslyDetectedEvents_FiltersPreviouslyDetectedEvent(t *testing.T) { + t.Parallel() + _, lpWrapper, _ := setUp(t, 100_000) + mockedRequestLog := getMockedRequestLog(t) + inputLogs := []logpoller.Log{mockedRequestLog} + var mockedRequestId [32]byte + copy(mockedRequestId[:], mockedRequestLog.Topics[1]) + + mockedDetectedEvents := detectedEvents{ + isPreviouslyDetected: make(map[[32]byte]struct{}), + detectedEventsOrdered: make([]detectedEvent, 1), + } + mockedDetectedEvents.isPreviouslyDetected[mockedRequestId] = struct{}{} + mockedDetectedEvents.detectedEventsOrdered[0] = detectedEvent{ + requestId: mockedRequestId, + timeDetected: time.Now().Add(-time.Second * time.Duration(logPollerCacheDurationSecDefault+1)), + } + + functionsLpWrapper := lpWrapper.(*logPollerWrapper) + outputLogs := functionsLpWrapper.filterPreviouslyDetectedEvents(inputLogs, &mockedDetectedEvents, "request") + + assert.Equal(t, 0, len(outputLogs)) + // Ensure that expired events are removed from the cache + assert.Equal(t, 0, len(mockedDetectedEvents.detectedEventsOrdered)) + assert.Equal(t, 0, len(mockedDetectedEvents.isPreviouslyDetected)) +}