diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index f1fd03cf38..1e606d70a4 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -158,24 +158,25 @@ type Opts struct { // support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracker, opts Opts) *logPoller { return &logPoller{ - stopCh: make(chan struct{}), - ec: ec, - orm: orm, - headTracker: headTracker, - lggr: logger.Sugared(logger.Named(lggr, "LogPoller")), - replayStart: make(chan int64), - replayComplete: make(chan error), - pollPeriod: opts.PollPeriod, - backupPollerBlockDelay: opts.BackupPollerBlockDelay, - finalityDepth: opts.FinalityDepth, - useFinalityTag: opts.UseFinalityTag, - backfillBatchSize: opts.BackfillBatchSize, - rpcBatchSize: opts.RpcBatchSize, - keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth, - logPrunePageSize: opts.LogPrunePageSize, - filters: make(map[string]Filter), - filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet. - finalityViolated: new(atomic.Bool), + stopCh: make(chan struct{}), + ec: ec, + orm: orm, + headTracker: headTracker, + lggr: logger.Sugared(logger.Named(lggr, "LogPoller")), + replayStart: make(chan int64), + replayComplete: make(chan error), + pollPeriod: opts.PollPeriod, + backupPollerBlockDelay: opts.BackupPollerBlockDelay, + finalityDepth: opts.FinalityDepth, + useFinalityTag: opts.UseFinalityTag, + backfillBatchSize: opts.BackfillBatchSize, + rpcBatchSize: opts.RpcBatchSize, + keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth, + logPrunePageSize: opts.LogPrunePageSize, + filters: make(map[string]Filter), + filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet. + finalityViolated: new(atomic.Bool), + countBasedLogPruningActive: new(atomic.Bool), } } @@ -541,18 +542,37 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i return mathutil.Min(requested, lastProcessed.BlockNumber), nil } +// loadFilters loads the filters from db, and activates count-based Log Pruning +// if required by any of the filters func (lp *logPoller) loadFilters(ctx context.Context) error { + filters, err := lp._loadFilters(ctx) + if err != nil { + return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying") + } + if lp.countBasedLogPruningActive.Load() { + return nil + } + for _, filter := range filters { + if filter.MaxLogsKept != 0 { + lp.countBasedLogPruningActive.Store(true) + } + } + return nil +} + +// _loadFilters is the part of loadFilters() requiring a filterMu lock +func (lp *logPoller) _loadFilters(ctx context.Context) (filters map[string]Filter, err error) { lp.filterMu.Lock() defer lp.filterMu.Unlock() - filters, err := lp.orm.LoadFilters(ctx) + filters, err = lp.orm.LoadFilters(ctx) if err != nil { - return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying") + return filters, err } lp.filters = filters lp.filterDirty = true - return nil + return filters, nil } // tickStaggeredDelay chooses a uniformly random amount of time to delay between minDelay and minDelay + period