From bf484d5afc77d3777f0a4db842db61498c153208 Mon Sep 17 00:00:00 2001 From: pk910 Date: Wed, 31 Jul 2024 22:56:56 +0200 Subject: [PATCH] various indexer fixes --- indexer/beacon/client.go | 23 ++++++++--------------- indexer/beacon/epochstats.go | 12 ++++++++++-- indexer/beacon/indexer.go | 9 +++++++-- indexer/beacon/processing.go | 2 +- indexer/beacon/writedb.go | 4 ++-- 5 files changed, 28 insertions(+), 22 deletions(-) diff --git a/indexer/beacon/client.go b/indexer/beacon/client.go index 62a873b..4efaca7 100644 --- a/indexer/beacon/client.go +++ b/indexer/beacon/client.go @@ -187,36 +187,29 @@ func (c *Client) processHeadEvent(headEvent *v1.HeadEvent) error { if !bytes.Equal(dependentRoot[:], consensus.NullRoot[:]) { block.dependentRoot = &dependentRoot - dependentBlock := c.indexer.blockCache.getBlockByRoot(dependentRoot) + dependentBlock = c.indexer.blockCache.getBlockByRoot(dependentRoot) if dependentBlock == nil { c.logger.Warnf("dependent block (%v) not found after backfilling", dependentRoot.String()) } + } else { + dependentBlock = c.indexer.blockCache.getDependentBlock(chainState, block, c) } currentBlock := block minInMemorySlot := c.indexer.getMinInMemorySlot() for { - if bytes.Equal(dependentRoot[:], consensus.NullRoot[:]) || dependentBlock == nil { - dependentBlock = c.indexer.blockCache.getDependentBlock(chainState, currentBlock, c) - if dependentBlock != nil { - dependentRoot = dependentBlock.Root - } else { - dependentRoot = consensus.NullRoot - } - } - - if !bytes.Equal(dependentRoot[:], consensus.NullRoot[:]) && currentBlock.Slot >= minInMemorySlot { + if dependentBlock != nil && currentBlock.Slot >= minInMemorySlot { // ensure epoch stats are in loading queue - epochStats, _ := c.indexer.epochCache.createOrGetEpochStats(chainState.EpochOfSlot(currentBlock.Slot), dependentRoot) + epochStats, _ := c.indexer.epochCache.createOrGetEpochStats(chainState.EpochOfSlot(currentBlock.Slot), dependentBlock.Root) if !epochStats.addRequestedBy(c) { break } - - currentBlock = dependentBlock - dependentBlock = nil } else { break } + + currentBlock = dependentBlock + dependentBlock = c.indexer.blockCache.getDependentBlock(chainState, currentBlock, c) } c.headRoot = block.Root diff --git a/indexer/beacon/epochstats.go b/indexer/beacon/epochstats.go index 71673a2..0a2d0f7 100644 --- a/indexer/beacon/epochstats.go +++ b/indexer/beacon/epochstats.go @@ -369,10 +369,18 @@ func (es *EpochStats) processState(indexer *Indexer) { } // GetValues returns the EpochStats values. -func (es *EpochStats) GetValues() *EpochStatsValues { +func (es *EpochStats) GetValues(chainState *consensus.ChainState) *EpochStatsValues { if es == nil { return nil } - return es.values + if es.values != nil { + return es.values + } + + if es.packedValues != nil { + return es.getUnpackedValues(chainState) + } + + return nil } diff --git a/indexer/beacon/indexer.go b/indexer/beacon/indexer.go index 1470fe1..3f3b064 100644 --- a/indexer/beacon/indexer.go +++ b/indexer/beacon/indexer.go @@ -161,10 +161,15 @@ func (indexer *Indexer) StartIndexer() { } restoredEpochStats++ - if dbDuty.Epoch < uint64(indexer.lastPrunedEpoch) { - epochStats.packValues() + if dbDuty.Epoch >= uint64(indexer.lastPrunedEpoch) { + epochStats.unpackValues(chainState) } }) + if err != nil { + indexer.logger.WithError(err).Errorf("failed restoring unfinalized epoch stats from DB") + } else { + indexer.logger.Infof("restored %v unfinalized epoch stats from DB", restoredEpochStats) + } // prefill block cache with all unfinalized blocks from db restoredBlockCount := 0 diff --git a/indexer/beacon/processing.go b/indexer/beacon/processing.go index 17db911..ba1daea 100644 --- a/indexer/beacon/processing.go +++ b/indexer/beacon/processing.go @@ -243,7 +243,7 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R return fmt.Errorf("missing epoch stats for epoch %v", epoch) } - epochStatsValues := epochStats.GetValues() + epochStatsValues := epochStats.GetValues(chainState) if epochStatsValues == nil { return fmt.Errorf("missing epoch stats values for epoch %v", epoch) } diff --git a/indexer/beacon/writedb.go b/indexer/beacon/writedb.go index ae0672a..1768754 100644 --- a/indexer/beacon/writedb.go +++ b/indexer/beacon/writedb.go @@ -24,7 +24,7 @@ func newDbWriter(indexer *Indexer) *dbWriter { func (dbw *dbWriter) persistMissedSlots(tx *sqlx.Tx, epoch phase0.Epoch, blocks []*Block, epochStats *EpochStats) error { chainState := dbw.indexer.consensusPool.GetChainState() - epochStatsValues := epochStats.GetValues() + epochStatsValues := epochStats.GetValues(chainState) // insert missed slots firstSlot := chainState.EpochStartSlot(epoch) @@ -255,7 +255,7 @@ func (dbw *dbWriter) buildDbBlock(block *Block, epochStats *EpochStats, override func (dbw *dbWriter) buildDbEpoch(epoch phase0.Epoch, blocks []*Block, epochStats *EpochStats, epochVotes *EpochVotes, blockFn func(block *Block, depositIndex *uint64)) *dbtypes.Epoch { chainState := dbw.indexer.consensusPool.GetChainState() - epochStatsValues := epochStats.GetValues() + epochStatsValues := epochStats.GetValues(chainState) // insert missed slots firstSlot := chainState.EpochStartSlot(epoch)