Skip to content

Commit

Permalink
various indexer fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Jul 31, 2024
1 parent a2ef656 commit bf484d5
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 22 deletions.
23 changes: 8 additions & 15 deletions indexer/beacon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,36 +187,29 @@ func (c *Client) processHeadEvent(headEvent *v1.HeadEvent) error {
if !bytes.Equal(dependentRoot[:], consensus.NullRoot[:]) {
block.dependentRoot = &dependentRoot

dependentBlock := c.indexer.blockCache.getBlockByRoot(dependentRoot)
dependentBlock = c.indexer.blockCache.getBlockByRoot(dependentRoot)
if dependentBlock == nil {
c.logger.Warnf("dependent block (%v) not found after backfilling", dependentRoot.String())
}
} else {
dependentBlock = c.indexer.blockCache.getDependentBlock(chainState, block, c)
}

currentBlock := block
minInMemorySlot := c.indexer.getMinInMemorySlot()
for {
if bytes.Equal(dependentRoot[:], consensus.NullRoot[:]) || dependentBlock == nil {
dependentBlock = c.indexer.blockCache.getDependentBlock(chainState, currentBlock, c)
if dependentBlock != nil {
dependentRoot = dependentBlock.Root
} else {
dependentRoot = consensus.NullRoot
}
}

if !bytes.Equal(dependentRoot[:], consensus.NullRoot[:]) && currentBlock.Slot >= minInMemorySlot {
if dependentBlock != nil && currentBlock.Slot >= minInMemorySlot {
// ensure epoch stats are in loading queue
epochStats, _ := c.indexer.epochCache.createOrGetEpochStats(chainState.EpochOfSlot(currentBlock.Slot), dependentRoot)
epochStats, _ := c.indexer.epochCache.createOrGetEpochStats(chainState.EpochOfSlot(currentBlock.Slot), dependentBlock.Root)
if !epochStats.addRequestedBy(c) {
break
}

currentBlock = dependentBlock
dependentBlock = nil
} else {
break
}

currentBlock = dependentBlock
dependentBlock = c.indexer.blockCache.getDependentBlock(chainState, currentBlock, c)
}

c.headRoot = block.Root
Expand Down
12 changes: 10 additions & 2 deletions indexer/beacon/epochstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,18 @@ func (es *EpochStats) processState(indexer *Indexer) {
}

// GetValues returns the EpochStats values.
func (es *EpochStats) GetValues() *EpochStatsValues {
func (es *EpochStats) GetValues(chainState *consensus.ChainState) *EpochStatsValues {
if es == nil {
return nil
}

return es.values
if es.values != nil {
return es.values
}

if es.packedValues != nil {
return es.getUnpackedValues(chainState)
}

return nil
}
9 changes: 7 additions & 2 deletions indexer/beacon/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,15 @@ func (indexer *Indexer) StartIndexer() {
}

restoredEpochStats++
if dbDuty.Epoch < uint64(indexer.lastPrunedEpoch) {
epochStats.packValues()
if dbDuty.Epoch >= uint64(indexer.lastPrunedEpoch) {
epochStats.unpackValues(chainState)
}
})
if err != nil {
indexer.logger.WithError(err).Errorf("failed restoring unfinalized epoch stats from DB")
} else {
indexer.logger.Infof("restored %v unfinalized epoch stats from DB", restoredEpochStats)
}

// prefill block cache with all unfinalized blocks from db
restoredBlockCount := 0
Expand Down
2 changes: 1 addition & 1 deletion indexer/beacon/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R
return fmt.Errorf("missing epoch stats for epoch %v", epoch)
}

epochStatsValues := epochStats.GetValues()
epochStatsValues := epochStats.GetValues(chainState)
if epochStatsValues == nil {
return fmt.Errorf("missing epoch stats values for epoch %v", epoch)
}
Expand Down
4 changes: 2 additions & 2 deletions indexer/beacon/writedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func newDbWriter(indexer *Indexer) *dbWriter {

func (dbw *dbWriter) persistMissedSlots(tx *sqlx.Tx, epoch phase0.Epoch, blocks []*Block, epochStats *EpochStats) error {
chainState := dbw.indexer.consensusPool.GetChainState()
epochStatsValues := epochStats.GetValues()
epochStatsValues := epochStats.GetValues(chainState)

// insert missed slots
firstSlot := chainState.EpochStartSlot(epoch)
Expand Down Expand Up @@ -255,7 +255,7 @@ func (dbw *dbWriter) buildDbBlock(block *Block, epochStats *EpochStats, override

func (dbw *dbWriter) buildDbEpoch(epoch phase0.Epoch, blocks []*Block, epochStats *EpochStats, epochVotes *EpochVotes, blockFn func(block *Block, depositIndex *uint64)) *dbtypes.Epoch {
chainState := dbw.indexer.consensusPool.GetChainState()
epochStatsValues := epochStats.GetValues()
epochStatsValues := epochStats.GetValues(chainState)

// insert missed slots
firstSlot := chainState.EpochStartSlot(epoch)
Expand Down

0 comments on commit bf484d5

Please sign in to comment.