Skip to content

Commit

Permalink
Fix setting of countBasedPruningActive flag
Browse files Browse the repository at this point in the history
  • Loading branch information
reductionista committed Oct 8, 2024
1 parent 502d628 commit f115060
Showing 1 changed file with 41 additions and 21 deletions.
62 changes: 41 additions & 21 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,24 +158,25 @@ type Opts struct {
// support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency
func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracker, opts Opts) *logPoller {
return &logPoller{
stopCh: make(chan struct{}),
ec: ec,
orm: orm,
headTracker: headTracker,
lggr: logger.Sugared(logger.Named(lggr, "LogPoller")),
replayStart: make(chan int64),
replayComplete: make(chan error),
pollPeriod: opts.PollPeriod,
backupPollerBlockDelay: opts.BackupPollerBlockDelay,
finalityDepth: opts.FinalityDepth,
useFinalityTag: opts.UseFinalityTag,
backfillBatchSize: opts.BackfillBatchSize,
rpcBatchSize: opts.RpcBatchSize,
keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth,
logPrunePageSize: opts.LogPrunePageSize,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
finalityViolated: new(atomic.Bool),
stopCh: make(chan struct{}),
ec: ec,
orm: orm,
headTracker: headTracker,
lggr: logger.Sugared(logger.Named(lggr, "LogPoller")),
replayStart: make(chan int64),
replayComplete: make(chan error),
pollPeriod: opts.PollPeriod,
backupPollerBlockDelay: opts.BackupPollerBlockDelay,
finalityDepth: opts.FinalityDepth,
useFinalityTag: opts.UseFinalityTag,
backfillBatchSize: opts.BackfillBatchSize,
rpcBatchSize: opts.RpcBatchSize,
keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth,
logPrunePageSize: opts.LogPrunePageSize,
filters: make(map[string]Filter),
filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet.
finalityViolated: new(atomic.Bool),
countBasedLogPruningActive: new(atomic.Bool),

Check failure on line 179 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

unknown field countBasedLogPruningActive in struct literal of type logPoller

Check failure on line 179 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / lint

unknown field countBasedLogPruningActive in struct literal of type logPoller

Check failure on line 179 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

unknown field countBasedLogPruningActive in struct literal of type logPoller

Check failure on line 179 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

unknown field countBasedLogPruningActive in struct literal of type logPoller

Check failure on line 179 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / Flakey Test Detection

unknown field countBasedLogPruningActive in struct literal of type logPoller
}
}

Expand Down Expand Up @@ -541,18 +542,37 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i
return mathutil.Min(requested, lastProcessed.BlockNumber), nil
}

// loadFilters loads the filters from db, and activates count-based Log Pruning
// if required by any of the filters
func (lp *logPoller) loadFilters(ctx context.Context) error {
filters, err := lp._loadFilters(ctx)
if err != nil {
return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
}
if lp.countBasedLogPruningActive.Load() {

Check failure on line 552 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

lp.countBasedLogPruningActive undefined (type *logPoller has no field or method countBasedLogPruningActive)

Check failure on line 552 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / lint

lp.countBasedLogPruningActive undefined (type *logPoller has no field or method countBasedLogPruningActive)

Check failure on line 552 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

lp.countBasedLogPruningActive undefined (type *logPoller has no field or method countBasedLogPruningActive)

Check failure on line 552 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

lp.countBasedLogPruningActive undefined (type *logPoller has no field or method countBasedLogPruningActive)

Check failure on line 552 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / Flakey Test Detection

lp.countBasedLogPruningActive undefined (type *logPoller has no field or method countBasedLogPruningActive)
return nil
}
for _, filter := range filters {
if filter.MaxLogsKept != 0 {
lp.countBasedLogPruningActive.Store(true)

Check failure on line 557 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_tests)

lp.countBasedLogPruningActive undefined (type *logPoller has no field or method countBasedLogPruningActive)

Check failure on line 557 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / lint

lp.countBasedLogPruningActive undefined (type *logPoller has no field or method countBasedLogPruningActive)

Check failure on line 557 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_race_tests)

lp.countBasedLogPruningActive undefined (type *logPoller has no field or method countBasedLogPruningActive)

Check failure on line 557 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / Core Tests (go_core_fuzz)

lp.countBasedLogPruningActive undefined (type *logPoller has no field or method countBasedLogPruningActive)

Check failure on line 557 in core/chains/evm/logpoller/log_poller.go

View workflow job for this annotation

GitHub Actions / Flakey Test Detection

lp.countBasedLogPruningActive undefined (type *logPoller has no field or method countBasedLogPruningActive)
}
}
return nil
}

// _loadFilters is the part of loadFilters() requiring a filterMu lock
func (lp *logPoller) _loadFilters(ctx context.Context) (filters map[string]Filter, err error) {
lp.filterMu.Lock()
defer lp.filterMu.Unlock()
filters, err := lp.orm.LoadFilters(ctx)

filters, err = lp.orm.LoadFilters(ctx)
if err != nil {
return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying")
return filters, err
}

lp.filters = filters
lp.filterDirty = true
return nil
return filters, nil
}

// tickStaggeredDelay chooses a uniformly random amount of time to delay between minDelay and minDelay + period
Expand Down

0 comments on commit f115060

Please sign in to comment.