Skip to content

Commit

Permalink
fixes for minimal preset & for handling epoch 0 finalization/synchron…
Browse files Browse the repository at this point in the history
…ization
  • Loading branch information
pk910 committed Aug 6, 2024
1 parent a6e8fea commit d553bdf
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 14 deletions.
3 changes: 3 additions & 0 deletions clients/consensus/rpc/beaconapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,9 @@ func (bc *BeaconClient) GetBlockHeaderBySlot(ctx context.Context, slot phase0.Sl
},
})
if err != nil {
if strings.HasPrefix(err.Error(), "GET failed with status 404") {
return nil, nil
}
return nil, err
}

Expand Down
6 changes: 5 additions & 1 deletion indexer/beacon/blockcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,10 @@ func (cache *blockCache) getDependentBlock(chainState *consensus.ChainState, blo
return dependentBlock
}

if block.Slot == 0 {
return block
}

parentRoot := block.GetParentRoot()
blockEpoch := chainState.EpochOfSlot(block.Slot)

Expand Down Expand Up @@ -406,7 +410,7 @@ func (cache *blockCache) getDependentBlock(chainState *consensus.ChainState, blo
break
}

if chainState.EpochOfSlot(parentBlock.Slot) < blockEpoch {
if chainState.EpochOfSlot(parentBlock.Slot) < blockEpoch || parentBlock.Slot == 0 {
block.dependentRoot = &parentBlock.Root
return parentBlock
}
Expand Down
2 changes: 2 additions & 0 deletions indexer/beacon/canonical.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,5 +328,7 @@ func (indexer *Indexer) GetCanonicalValidatorSet(overrideForkId *ForkKey) []*v1.
Validator: validator,
}
}

return validatorSet
}
}
2 changes: 1 addition & 1 deletion indexer/beacon/finalization.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R
dependentBlock := indexer.blockCache.getDependentBlock(chainState, firstBlock, client)
if dependentBlock != nil {
dependentRoot = dependentBlock.Root
isValid = chainState.EpochOfSlot(dependentBlock.Slot) < chainState.EpochOfSlot(firstBlock.Slot)
isValid = chainState.EpochOfSlot(dependentBlock.Slot) < chainState.EpochOfSlot(firstBlock.Slot) || dependentBlock.Slot == 0
} else {
depRoot := firstBlock.GetParentRoot()
if depRoot != nil {
Expand Down
22 changes: 12 additions & 10 deletions indexer/beacon/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,10 @@ func NewIndexer(logger logrus.FieldLogger, consensusPool *consensus.Pool) *Index
blockCompression = false
}

// initialize dynamic SSZ encoder
staticSpec := map[string]any{}
specYaml, err := yaml.Marshal(consensusPool.GetChainState().GetSpecs())
if err != nil {
yaml.Unmarshal(specYaml, staticSpec)
}

// Create the indexer instance.
indexer := &Indexer{
logger: logger,
consensusPool: consensusPool,
dynSsz: dynssz.NewDynSsz(staticSpec),

writeDb: !utils.Config.Indexer.DisableIndexWriter,
disableSync: utils.Config.Indexer.DisableSynchronizer,
Expand Down Expand Up @@ -199,8 +191,18 @@ func (indexer *Indexer) StartIndexer() {
}

indexer.running = true
indexer.synchronizer = newSynchronizer(indexer, indexer.logger.WithField("service", "synchronizer"))
chainState := indexer.consensusPool.GetChainState()

// initialize dynamic SSZ encoder
staticSpec := map[string]any{}
specYaml, err := yaml.Marshal(chainState.GetSpecs())
if err == nil {
yaml.Unmarshal(specYaml, &staticSpec)
}
indexer.dynSsz = dynssz.NewDynSsz(staticSpec)

// initialize synchronizer & restore state
indexer.synchronizer = newSynchronizer(indexer, indexer.logger.WithField("service", "synchronizer"))
finalizedSlot := chainState.GetFinalizedSlot()
finalizedEpoch, _ := chainState.GetFinalizedCheckpoint()
indexer.lastFinalizedEpoch = finalizedEpoch
Expand Down Expand Up @@ -237,7 +239,7 @@ func (indexer *Indexer) StartIndexer() {
t1 := time.Now()
processingLimiter := make(chan bool, 10)
processingWaitGroup := sync.WaitGroup{}
err := db.StreamUnfinalizedDuties(func(dbDuty *dbtypes.UnfinalizedDuty) {
err = db.StreamUnfinalizedDuties(func(dbDuty *dbtypes.UnfinalizedDuty) {
if dbDuty.Epoch < uint64(finalizedEpoch) {
return
}
Expand Down
4 changes: 4 additions & 0 deletions indexer/beacon/requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func LoadBeaconHeaderBySlot(ctx context.Context, client *Client, slot phase0.Slo
return nil, phase0.Root{}, false, err
}

if header == nil {
return nil, phase0.Root{}, false, nil
}

return header.Header, header.Root, !header.Canonical, nil
}

Expand Down
6 changes: 5 additions & 1 deletion indexer/beacon/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,11 @@ func (sync *synchronizer) syncEpoch(syncEpoch phase0.Epoch, client *Client, last
// load epoch state
var dependentRoot phase0.Root
if firstBlock != nil {
dependentRoot = firstBlock.header.Message.ParentRoot
if firstBlock.Slot == 0 { // epoch 0 dependent root is the genesis block
dependentRoot = firstBlock.Root
} else {
dependentRoot = firstBlock.header.Message.ParentRoot
}
} else {
// get from db
depRoot := db.GetHighestRootBeforeSlot(uint64(firstSlot), false)
Expand Down
2 changes: 1 addition & 1 deletion indexer/beacon/writedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (dbw *dbWriter) buildDbEpoch(epoch phase0.Epoch, blocks []*Block, epochStat
if block != nil {
dbEpoch.BlockCount++
blockBody := block.GetBlock()
if blockBody == nil {
if blockBody == nil && block.Slot > 0 {
dbw.indexer.logger.Errorf("error while building db epoch: block body not found for aggregation: %v", block.Slot)
continue
}
Expand Down

0 comments on commit d553bdf

Please sign in to comment.