Skip to content

Commit

Permalink
Get request logs from past n blocks (#11052)
Browse files Browse the repository at this point in the history
* Get request logs from past n blocks

* Separate confirmations setting for reqs & resps

* Filter already detected reqs & resps

* validate pastBlocksToPoll

* Fixed bugs which appeared during tests

* Addressed feedback

* Added test

* Added comment to config

* Fixed log & removed const

* Used const values for defaults

* Address feedback

* Fixed types for logPoller

* Added tests for FilterPreviouslyDetectedEvents

* Added additional test assertions

* Fixed lint errors
  • Loading branch information
KuphJr authored Oct 31, 2023
1 parent 8c96682 commit 24de8af
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 56 deletions.
1 change: 1 addition & 0 deletions core/scripts/functions/templates/oracle.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ requestTimeoutSec = 300
maxRequestSizesList = [30_720, 51_200, 102_400, 204_800, 512_000, 1_048_576, 2_097_152, 3_145_728, 5_242_880, 10_485_760]
maxSecretsSizesList = [10_240, 20_480, 51_200, 102_400, 307_200, 512_000, 1_048_576, 2_097_152]
minimumSubscriptionBalance = "2 link"
pastBlocksToPoll = 25


[pluginConfig.OnchainAllowlist]
Expand Down
4 changes: 4 additions & 0 deletions core/services/ocr2/plugins/functions/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ type PluginConfig struct {
EnableRequestSignatureCheck bool `json:"enableRequestSignatureCheck"`
DONID string `json:"donID"`
ContractVersion uint32 `json:"contractVersion"`
MinRequestConfirmations uint32 `json:"minRequestConfirmations"`
MinResponseConfirmations uint32 `json:"minResponseConfirmations"`
MinIncomingConfirmations uint32 `json:"minIncomingConfirmations"`
PastBlocksToPoll uint32 `json:"pastBlocksToPoll"`
LogPollerCacheDurationSec uint32 `json:"logPollerCacheDurationSec"` // Duration to cache previously detected request or response logs such that they can be filtered when calling logpoller_wrapper.LatestEvents()
RequestTimeoutSec uint32 `json:"requestTimeoutSec"`
RequestTimeoutCheckFrequencySec uint32 `json:"requestTimeoutCheckFrequencySec"`
RequestTimeoutBatchLookupSize uint32 `json:"requestTimeoutBatchLookupSize"`
Expand Down
174 changes: 125 additions & 49 deletions core/services/relay/evm/functions/logpoller_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,39 @@ import (
type logPollerWrapper struct {
services.StateMachine

routerContract *functions_router.FunctionsRouter
pluginConfig config.PluginConfig
client client.Client
logPoller logpoller.LogPoller
subscribers map[string]evmRelayTypes.RouteUpdateSubscriber
activeCoordinator common.Address
proposedCoordinator common.Address
blockOffset int64
nextBlock int64
mu sync.Mutex
closeWait sync.WaitGroup
stopCh utils.StopChan
lggr logger.Logger
routerContract *functions_router.FunctionsRouter
pluginConfig config.PluginConfig
client client.Client
logPoller logpoller.LogPoller
subscribers map[string]evmRelayTypes.RouteUpdateSubscriber
activeCoordinator common.Address
proposedCoordinator common.Address
requestBlockOffset int64
responseBlockOffset int64
pastBlocksToPoll int64
logPollerCacheDurationSec int64
detectedRequests detectedEvents
detectedResponses detectedEvents
mu sync.Mutex
closeWait sync.WaitGroup
stopCh utils.StopChan
lggr logger.Logger
}

type detectedEvent struct {
requestId [32]byte
timeDetected time.Time
}

type detectedEvents struct {
isPreviouslyDetected map[[32]byte]struct{}
detectedEventsOrdered []detectedEvent
}

const logPollerCacheDurationSecDefault = 300
const pastBlocksToPollDefault = 50
const maxLogsToProcess = 1000

var _ evmRelayTypes.LogPollerWrapper = &logPollerWrapper{}

func NewLogPollerWrapper(routerContractAddress common.Address, pluginConfig config.PluginConfig, client client.Client, logPoller logpoller.LogPoller, lggr logger.Logger) (evmRelayTypes.LogPollerWrapper, error) {
Expand All @@ -48,18 +66,48 @@ func NewLogPollerWrapper(routerContractAddress common.Address, pluginConfig conf
}
blockOffset := int64(pluginConfig.MinIncomingConfirmations) - 1
if blockOffset < 0 {
lggr.Warnw("invalid minIncomingConfirmations, using 1 instead", "minIncomingConfirmations", pluginConfig.MinIncomingConfirmations)
blockOffset = 0
}
requestBlockOffset := int64(pluginConfig.MinRequestConfirmations) - 1
if requestBlockOffset < 0 {
lggr.Warnw("invalid minRequestConfirmations, using minIncomingConfirmations instead", "minRequestConfirmations", pluginConfig.MinRequestConfirmations)
requestBlockOffset = blockOffset
}
responseBlockOffset := int64(pluginConfig.MinResponseConfirmations) - 1
if responseBlockOffset < 0 {
lggr.Warnw("invalid minResponseConfirmations, using minIncomingConfirmations instead", "minResponseConfirmations", pluginConfig.MinResponseConfirmations)
responseBlockOffset = blockOffset
}
logPollerCacheDurationSec := int64(pluginConfig.LogPollerCacheDurationSec)
if logPollerCacheDurationSec <= 0 {
lggr.Warnw("invalid logPollerCacheDuration, using 300 instead", "logPollerCacheDurationSec", logPollerCacheDurationSec)
logPollerCacheDurationSec = logPollerCacheDurationSecDefault
}
pastBlocksToPoll := int64(pluginConfig.PastBlocksToPoll)
if pastBlocksToPoll <= 0 {
lggr.Warnw("invalid pastBlocksToPoll, using 50 instead", "pastBlocksToPoll", pastBlocksToPoll)
pastBlocksToPoll = pastBlocksToPollDefault
}
if blockOffset >= pastBlocksToPoll || requestBlockOffset >= pastBlocksToPoll || responseBlockOffset >= pastBlocksToPoll {
lggr.Errorw("invalid config: number of required confirmation blocks >= pastBlocksToPoll", "pastBlocksToPoll", pastBlocksToPoll, "minIncomingConfirmations", pluginConfig.MinIncomingConfirmations, "minRequestConfirmations", pluginConfig.MinRequestConfirmations, "minResponseConfirmations", pluginConfig.MinResponseConfirmations)
return nil, errors.Errorf("invalid config: number of required confirmation blocks >= pastBlocksToPoll")
}

return &logPollerWrapper{
routerContract: routerContract,
pluginConfig: pluginConfig,
blockOffset: blockOffset,
logPoller: logPoller,
client: client,
subscribers: make(map[string]evmRelayTypes.RouteUpdateSubscriber),
stopCh: make(utils.StopChan),
lggr: lggr,
routerContract: routerContract,
pluginConfig: pluginConfig,
requestBlockOffset: requestBlockOffset,
responseBlockOffset: responseBlockOffset,
pastBlocksToPoll: pastBlocksToPoll,
logPollerCacheDurationSec: logPollerCacheDurationSec,
detectedRequests: detectedEvents{isPreviouslyDetected: make(map[[32]byte]struct{})},
detectedResponses: detectedEvents{isPreviouslyDetected: make(map[[32]byte]struct{})},
logPoller: logPoller,
client: client,
subscribers: make(map[string]evmRelayTypes.RouteUpdateSubscriber),
stopCh: make(utils.StopChan),
lggr: lggr,
}, nil
}

Expand All @@ -68,20 +116,11 @@ func (l *logPollerWrapper) Start(context.Context) error {
l.lggr.Infow("starting LogPollerWrapper", "routerContract", l.routerContract.Address().Hex(), "contractVersion", l.pluginConfig.ContractVersion)
l.mu.Lock()
defer l.mu.Unlock()
if l.pluginConfig.ContractVersion == 0 {
l.activeCoordinator = l.routerContract.Address()
l.proposedCoordinator = l.routerContract.Address()
} else if l.pluginConfig.ContractVersion == 1 {
nextBlock, err := l.logPoller.LatestBlock()
if err != nil {
l.lggr.Errorw("LogPollerWrapper: LatestBlock() failed, starting from 0", "error", err)
} else {
l.lggr.Debugw("LogPollerWrapper: LatestBlock() got starting block", "block", nextBlock)
l.nextBlock = nextBlock.BlockNumber - l.blockOffset
}
l.closeWait.Add(1)
go l.checkForRouteUpdates()
if l.pluginConfig.ContractVersion != 1 {
return errors.New("only contract version 1 is supported")
}
l.closeWait.Add(1)
go l.checkForRouteUpdates()
return nil
})
}
Expand Down Expand Up @@ -117,16 +156,15 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR
if l.proposedCoordinator != (common.Address{}) && l.activeCoordinator != l.proposedCoordinator {
coordinators = append(coordinators, l.proposedCoordinator)
}
nextBlock := l.nextBlock
latest, err := l.logPoller.LatestBlock()
if err != nil {
l.mu.Unlock()
return nil, nil, err
}
latestBlockNumber := latest.BlockNumber
latestBlockNumber -= l.blockOffset
if latestBlockNumber >= nextBlock {
l.nextBlock = latestBlockNumber + 1
latestBlockNum := latest.BlockNumber
startBlockNum := latestBlockNum - l.pastBlocksToPoll
if startBlockNum < 0 {
startBlockNum = 0
}
l.mu.Unlock()

Expand All @@ -137,22 +175,24 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR
l.lggr.Debug("LatestEvents: no non-zero coordinators to check")
return resultsReq, resultsResp, errors.New("no non-zero coordinators to check")
}
if latestBlockNumber < nextBlock {
l.lggr.Debugw("LatestEvents: no new blocks to check", "latest", latest, "nextBlock", nextBlock)
return resultsReq, resultsResp, nil
}

for _, coordinator := range coordinators {
requestLogs, err := l.logPoller.Logs(nextBlock, latestBlockNumber, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), coordinator)
requestEndBlock := latestBlockNum - l.requestBlockOffset
requestLogs, err := l.logPoller.Logs(startBlockNum, requestEndBlock, functions_coordinator.FunctionsCoordinatorOracleRequest{}.Topic(), coordinator)
if err != nil {
l.lggr.Errorw("LatestEvents: fetching request logs from LogPoller failed", "latest", latest, "nextBlock", nextBlock)
l.lggr.Errorw("LatestEvents: fetching request logs from LogPoller failed", "startBlock", startBlockNum, "endBlock", requestEndBlock)
return nil, nil, err
}
responseLogs, err := l.logPoller.Logs(nextBlock, latestBlockNumber, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), coordinator)
l.lggr.Debugw("LatestEvents: fetched request logs", "nRequestLogs", len(requestLogs), "latestBlock", latest, "startBlock", startBlockNum, "endBlock", requestEndBlock)
requestLogs = l.filterPreviouslyDetectedEvents(requestLogs, &l.detectedRequests, "requests")
responseEndBlock := latestBlockNum - l.responseBlockOffset
responseLogs, err := l.logPoller.Logs(startBlockNum, responseEndBlock, functions_coordinator.FunctionsCoordinatorOracleResponse{}.Topic(), coordinator)
if err != nil {
l.lggr.Errorw("LatestEvents: fetching response logs from LogPoller failed", "latest", latest, "nextBlock", nextBlock)
l.lggr.Errorw("LatestEvents: fetching response logs from LogPoller failed", "startBlock", startBlockNum, "endBlock", responseEndBlock)
return nil, nil, err
}
l.lggr.Debugw("LatestEvents: fetched request logs", "nResponseLogs", len(responseLogs), "latestBlock", latest, "startBlock", startBlockNum, "endBlock", responseEndBlock)
responseLogs = l.filterPreviouslyDetectedEvents(responseLogs, &l.detectedResponses, "responses")

parsingContract, err := functions_coordinator.NewFunctionsCoordinator(coordinator, l.client)
if err != nil {
Expand All @@ -165,7 +205,7 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR
gethLog := log.ToGethLog()
oracleRequest, err := parsingContract.ParseOracleRequest(gethLog)
if err != nil {
l.lggr.Errorw("LatestEvents: failed to parse a request log, skipping")
l.lggr.Errorw("LatestEvents: failed to parse a request log, skipping", "err", err)
continue
}

Expand Down Expand Up @@ -241,10 +281,46 @@ func (l *logPollerWrapper) LatestEvents() ([]evmRelayTypes.OracleRequest, []evmR
}
}

l.lggr.Debugw("LatestEvents: done", "nRequestLogs", len(resultsReq), "nResponseLogs", len(resultsResp), "nextBlock", nextBlock, "latest", latest)
l.lggr.Debugw("LatestEvents: done", "nRequestLogs", len(resultsReq), "nResponseLogs", len(resultsResp), "startBlock", startBlockNum, "endBlock", latestBlockNum)
return resultsReq, resultsResp, nil
}

func (l *logPollerWrapper) filterPreviouslyDetectedEvents(logs []logpoller.Log, detectedEvents *detectedEvents, filterType string) []logpoller.Log {
if len(logs) > maxLogsToProcess {
l.lggr.Errorw("filterPreviouslyDetectedEvents: too many logs to process, only processing latest maxLogsToProcess logs", "filterType", filterType, "nLogs", len(logs), "maxLogsToProcess", maxLogsToProcess)
logs = logs[len(logs)-maxLogsToProcess:]
}
l.mu.Lock()
defer l.mu.Unlock()
filteredLogs := []logpoller.Log{}
for _, log := range logs {
var requestId [32]byte
if len(log.Topics) < 2 || len(log.Topics[1]) != 32 {
l.lggr.Errorw("filterPreviouslyDetectedEvents: invalid log, skipping", "filterType", filterType, "log", log)
continue
}
copy(requestId[:], log.Topics[1]) // requestId is the second topic (1st topic is the event signature)
if _, ok := detectedEvents.isPreviouslyDetected[requestId]; !ok {
filteredLogs = append(filteredLogs, log)
detectedEvents.isPreviouslyDetected[requestId] = struct{}{}
detectedEvents.detectedEventsOrdered = append(detectedEvents.detectedEventsOrdered, detectedEvent{requestId: requestId, timeDetected: time.Now()})
}
}
expiredRequests := 0
for _, detectedEvent := range detectedEvents.detectedEventsOrdered {
expirationTime := time.Now().Add(-time.Second * time.Duration(l.logPollerCacheDurationSec))
if detectedEvent.timeDetected.Before(expirationTime) {
delete(detectedEvents.isPreviouslyDetected, detectedEvent.requestId)
expiredRequests++
} else {
break
}
}
detectedEvents.detectedEventsOrdered = detectedEvents.detectedEventsOrdered[expiredRequests:]
l.lggr.Debugw("filterPreviouslyDetectedEvents: done", "filterType", filterType, "nLogs", len(logs), "nFilteredLogs", len(filteredLogs), "nExpiredRequests", expiredRequests, "previouslyDetectedCacheSize", len(detectedEvents.detectedEventsOrdered))
return filteredLogs
}

// "internal" method called only by EVM relayer components
func (l *logPollerWrapper) SubscribeToUpdates(subscriberName string, subscriber evmRelayTypes.RouteUpdateSubscriber) {
if l.pluginConfig.ContractVersion == 0 {
Expand Down
Loading

0 comments on commit 24de8af

Please sign in to comment.