Skip to content

Commit

Permalink
improve canonical chain selection
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Aug 28, 2024
1 parent 708a569 commit 2fe2e26
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 71 deletions.
2 changes: 1 addition & 1 deletion .hack/devnet/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ server:
port: "8080"
frontend:
enabled: true
debug: false
debug: true
pprof: true
minimize: false
siteName: "Dora the Explorer"
Expand Down
196 changes: 128 additions & 68 deletions indexer/beacon/canonical.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package beacon

import (
"bytes"
"fmt"
"math"
"slices"
"sort"
"strings"
"time"

v1 "github.com/attestantio/go-eth2-client/api/v1"
Expand All @@ -15,10 +17,9 @@ const FarFutureEpoch = phase0.Epoch(math.MaxUint64)

// ChainHead represents a head block of the chain.
type ChainHead struct {
HeadBlock *Block // The head block of the chain.
AggregatedHeadVotes phase0.Gwei // The aggregated votes of the last 2 epochs for the head block.
LastEpochVotingPercent float64 // The voting percentage in the last epoch.
ThisEpochVotingPercent float64 // The voting percentage in the current epoch.
HeadBlock *Block // The head block of the chain.
AggregatedHeadVotes phase0.Gwei // The aggregated votes of the last 2 epochs for the head block.
PerEpochVotingPercent []float64 // The voting percentage in the last epochs (ascendeing order).
}

// GetCanonicalHead returns the canonical head block of the chain.
Expand Down Expand Up @@ -47,12 +48,21 @@ func (indexer *Indexer) GetCanonicalHead(overrideForkId *ForkKey) *Block {

if len(chainHeadCandidates) > 0 {
sort.Slice(chainHeadCandidates, func(i, j int) bool {
if chainHeadCandidates[i].LastEpochVotingPercent != chainHeadCandidates[j].LastEpochVotingPercent {
return chainHeadCandidates[i].LastEpochVotingPercent > chainHeadCandidates[j].LastEpochVotingPercent
percentagesI := float64(0)
percentagesJ := float64(0)
for k := range chainHeadCandidates[i].PerEpochVotingPercent {
factor := float64(1)
if k == len(chainHeadCandidates[i].PerEpochVotingPercent)-1 {
factor = 0.5
}
percentagesI += chainHeadCandidates[i].PerEpochVotingPercent[k] * factor
percentagesJ += chainHeadCandidates[j].PerEpochVotingPercent[k] * factor
}
if chainHeadCandidates[i].ThisEpochVotingPercent != chainHeadCandidates[j].ThisEpochVotingPercent {
return chainHeadCandidates[i].ThisEpochVotingPercent > chainHeadCandidates[j].ThisEpochVotingPercent

if percentagesI != percentagesJ {
return percentagesI > percentagesJ
}

return chainHeadCandidates[i].HeadBlock.Slot > chainHeadCandidates[j].HeadBlock.Slot
})

Expand All @@ -70,12 +80,21 @@ func (indexer *Indexer) GetChainHeads() []*ChainHead {
heads := make([]*ChainHead, len(indexer.cachedChainHeads))
copy(heads, indexer.cachedChainHeads)
sort.Slice(heads, func(i, j int) bool {
if heads[i].LastEpochVotingPercent != heads[j].LastEpochVotingPercent {
return heads[i].LastEpochVotingPercent > heads[j].LastEpochVotingPercent
percentagesI := float64(0)
percentagesJ := float64(0)
for k := range heads[i].PerEpochVotingPercent {
factor := float64(1)
if k == len(heads[i].PerEpochVotingPercent)-1 {
factor = 0.5
}
percentagesI += heads[i].PerEpochVotingPercent[k] * factor
percentagesJ += heads[j].PerEpochVotingPercent[k] * factor
}
if heads[i].ThisEpochVotingPercent != heads[j].ThisEpochVotingPercent {
return heads[i].ThisEpochVotingPercent > heads[j].ThisEpochVotingPercent

if percentagesI != percentagesJ {
return percentagesI > percentagesJ
}

return heads[i].HeadBlock.Slot > heads[j].HeadBlock.Slot
})

Expand Down Expand Up @@ -103,6 +122,14 @@ func (indexer *Indexer) computeCanonicalChain() bool {

var headBlock *Block = nil
var chainHeads []*ChainHead = nil

chainState := indexer.consensusPool.GetChainState()
specs := chainState.GetSpecs()
aggregateEpochs := (32 / specs.SlotsPerEpoch) + 1 // aggregate votes of last 48 slots (2 epochs for mainnet, 5 epochs for minimal config)
if aggregateEpochs < 2 {
aggregateEpochs = 2
}

t1 := time.Now()

defer func() {
Expand All @@ -124,22 +151,26 @@ func (indexer *Indexer) computeCanonicalChain() bool {
if len(latestBlocks) > 0 {
headBlock = latestBlocks[0]

forkVotes, thisEpochPercent, lastEpochPercent := indexer.aggregateForkVotes(headBlock.forkId)
forkVotes, epochParticipation := indexer.aggregateForkVotes(headBlock.forkId, aggregateEpochs)
participationStr := make([]string, len(epochParticipation))
for i, p := range epochParticipation {
participationStr[i] = fmt.Sprintf("%.2f%%", p)
}

indexer.logger.Debugf(
"fork %v votes in last 2 epochs: %v ETH (%.2f%%, %.2f%%), head: %v (%v)",
"fork %v votes in last %v epochs: %v ETH (%v), head: %v (%v)",
headBlock.forkId,
aggregateEpochs,
forkVotes/EtherGweiFactor,
lastEpochPercent,
thisEpochPercent,
strings.Join(participationStr, ", "),
headBlock.Slot,
headBlock.Root.String(),
)

chainHeads = []*ChainHead{{
HeadBlock: headBlock,
AggregatedHeadVotes: forkVotes,
LastEpochVotingPercent: lastEpochPercent,
ThisEpochVotingPercent: thisEpochPercent,
HeadBlock: headBlock,
AggregatedHeadVotes: forkVotes,
PerEpochVotingPercent: epochParticipation,
}}
}
} else {
Expand All @@ -153,22 +184,25 @@ func (indexer *Indexer) computeCanonicalChain() bool {
continue
}

forkVotes, thisEpochPercent, lastEpochPercent := indexer.aggregateForkVotes(fork.ForkId)
forkVotes, epochParticipation := indexer.aggregateForkVotes(fork.ForkId, aggregateEpochs)
headForkVotes[fork.ForkId] = forkVotes
chainHeads = append(chainHeads, &ChainHead{
HeadBlock: fork.Block,
AggregatedHeadVotes: forkVotes,
LastEpochVotingPercent: lastEpochPercent,
ThisEpochVotingPercent: thisEpochPercent,
HeadBlock: fork.Block,
AggregatedHeadVotes: forkVotes,
PerEpochVotingPercent: epochParticipation,
})

if forkVotes > 0 {
participationStr := make([]string, len(epochParticipation))
for i, p := range epochParticipation {
participationStr[i] = fmt.Sprintf("%.2f%%", p)
}

indexer.logger.Infof(
"fork %v votes in last 2 epochs: %v ETH (%.2f%%, %.2f%%), head: %v (%v)",
"fork %v: votes in last 2 epochs: %v ETH (%v), head: %v (%v)",
fork.ForkId,
forkVotes/EtherGweiFactor,
lastEpochPercent,
thisEpochPercent,
strings.Join(participationStr, ", "),
fork.Block.Slot,
fork.Block.Root.String(),
)
Expand All @@ -187,13 +221,19 @@ func (indexer *Indexer) computeCanonicalChain() bool {
}

// aggregateForkVotes aggregates the votes for a given fork.
func (indexer *Indexer) aggregateForkVotes(forkId ForkKey) (totalVotes phase0.Gwei, thisEpochPercent float64, lastEpochPercent float64) {
func (indexer *Indexer) aggregateForkVotes(forkId ForkKey, epochLimit uint64) (totalVotes phase0.Gwei, epochPercent []float64) {
chainState := indexer.consensusPool.GetChainState()
specs := chainState.GetSpecs()
currentEpoch := chainState.CurrentEpoch()

epochPercent = make([]float64, 0, epochLimit)
if epochLimit == 0 {
return
}

minAggregateEpoch := currentEpoch
if minAggregateEpoch > 1 {
minAggregateEpoch -= 1
if minAggregateEpoch > phase0.Epoch(epochLimit)-1 {
minAggregateEpoch -= phase0.Epoch(epochLimit) - 1
} else {
minAggregateEpoch = 0
}
Expand All @@ -206,12 +246,12 @@ func (indexer *Indexer) aggregateForkVotes(forkId ForkKey) (totalVotes phase0.Gw
return
}

// get all blocks for given fork (and its parents) from the last 2 epochs
// get all blocks for given fork (and its parents) from the last epochs
lastBlocks := []*Block{}
lastSlot := phase0.Slot(0)
thisForkId := forkId
for {
for _, block := range indexer.blockCache.getLatestBlocks(2*specs.SlotsPerEpoch, &thisForkId) {
for _, block := range indexer.blockCache.getLatestBlocks(epochLimit*specs.SlotsPerEpoch, &thisForkId) {
lastSlot = block.Slot
if block.Slot < minAggregateSlot {
break
Expand All @@ -236,52 +276,72 @@ func (indexer *Indexer) aggregateForkVotes(forkId ForkKey) (totalVotes phase0.Gw
}

// already sorted descending by getLatestBlocks, reverse to ascending for aggregation
lastBlock := lastBlocks[0]
slices.Reverse(lastBlocks)

// aggregate votes for last & current epoch
if chainState.EpochOfSlot(lastBlock.Slot) == currentEpoch {
thisEpochDependent := indexer.blockCache.getDependentBlock(chainState, lastBlock, nil)
if thisEpochDependent == nil {
return
}
lastBlock = thisEpochDependent

thisEpochStats := indexer.epochCache.getEpochStats(currentEpoch, thisEpochDependent.Root)
if thisEpochStats != nil {
thisBlocks := []*Block{}
for _, block := range lastBlocks {
if chainState.EpochOfSlot(block.Slot) == currentEpoch {
thisBlocks = append(thisBlocks, block)
}
}

epochVotes := indexer.aggregateEpochVotes(currentEpoch, chainState, thisBlocks, thisEpochStats)
if epochVotes.AmountIsCount {
totalVotes += epochVotes.CurrentEpoch.TargetVoteAmount * 32 * EtherGweiFactor
// aggregate votes per epoch
lastBlockIdx := 0
for epoch := minAggregateEpoch; epoch <= currentEpoch; epoch++ {
epochVotingBlocks := []*Block{}
nextBlockIdx := 0
for lastBlockIdx < len(lastBlocks) {
if chainState.EpochOfSlot(lastBlocks[lastBlockIdx].Slot) == epoch {
epochVotingBlocks = append(epochVotingBlocks, lastBlocks[lastBlockIdx])
lastBlockIdx++
} else if lastBlockIdx+nextBlockIdx < len(lastBlocks) && chainState.EpochOfSlot(lastBlocks[lastBlockIdx+nextBlockIdx].Slot) == epoch+1 {
epochVotingBlocks = append(epochVotingBlocks, lastBlocks[lastBlockIdx+nextBlockIdx])
nextBlockIdx++
} else {
totalVotes += epochVotes.CurrentEpoch.TargetVoteAmount
break
}
thisEpochPercent = epochVotes.TargetVotePercent
}
}

if chainState.EpochOfSlot(lastBlock.Slot)+1 == currentEpoch {
lastEpochDependent := indexer.blockCache.getDependentBlock(chainState, lastBlock, nil)
if lastEpochDependent == nil {
return
if len(epochVotingBlocks) == 0 {
epochPercent = append(epochPercent, 0)
continue
}

dependentRoot := epochVotingBlocks[0].GetParentRoot()
if dependentRoot == nil {
epochPercent = append(epochPercent, 0)
continue
}

epochStats := indexer.epochCache.getEpochStats(epoch, *dependentRoot)
if epochStats == nil {
epochPercent = append(epochPercent, 0)
continue
}

lastEpochStats := indexer.epochCache.getEpochStats(currentEpoch-1, lastEpochDependent.Root)
if lastEpochStats != nil {
epochVotes := indexer.aggregateEpochVotes(currentEpoch-1, chainState, lastBlocks, lastEpochStats)
if epochVotes.AmountIsCount {
totalVotes += (epochVotes.CurrentEpoch.TargetVoteAmount + epochVotes.NextEpoch.TargetVoteAmount) * 32 * EtherGweiFactor
epochVotes := indexer.aggregateEpochVotes(epoch, chainState, epochVotingBlocks, epochStats)
if epochVotes.AmountIsCount {
totalVotes += (epochVotes.CurrentEpoch.TargetVoteAmount + epochVotes.NextEpoch.TargetVoteAmount) * 32 * EtherGweiFactor
} else {
totalVotes += epochVotes.CurrentEpoch.TargetVoteAmount + epochVotes.NextEpoch.TargetVoteAmount
}

lastBlock := epochVotingBlocks[len(epochVotingBlocks)-1]
epochProgress := float64(100)

if chainState.EpochOfSlot(lastBlock.Slot) == epoch {
lastBlockIndex := chainState.SlotToSlotIndex(lastBlock.Slot)
if lastBlockIndex > 0 {
epochProgress = float64(100*lastBlockIndex) / float64(chainState.GetSpecs().SlotsPerEpoch)
} else {
totalVotes += epochVotes.CurrentEpoch.TargetVoteAmount + epochVotes.NextEpoch.TargetVoteAmount
epochProgress = 0
}
lastEpochPercent = epochVotes.TargetVotePercent
}

var participationExtrapolation float64
if epochProgress == 0 {
participationExtrapolation = 0
} else {
participationExtrapolation = 100 * epochVotes.TargetVotePercent / epochProgress
}
if participationExtrapolation > 100 {
participationExtrapolation = 100
}

epochPercent = append(epochPercent, participationExtrapolation)
}

return
Expand Down
4 changes: 3 additions & 1 deletion indexer/beacon/forkdetection.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ func (cache *forkCache) processBlock(block *Block) error {
otherFork := newFork(cache.lastForkId, parentSlot, *parentRoot, otherChildren[0], parentForkId)
cache.addFork(otherFork)

updatedRoots, updatedFork, _ := cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false)
updatedRoots, updatedFork, headBlock := cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false)
otherFork.headBlock = headBlock
newFork := &newForkInfo{
fork: otherFork,
updateRoots: updatedRoots,
Expand Down Expand Up @@ -295,6 +296,7 @@ func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skip

if !skipStartBlock {
blockRoots = append(blockRoots, startBlock.Root[:])
startBlock.forkId = forkId
headBlock = startBlock
}

Expand Down
2 changes: 1 addition & 1 deletion indexer/beacon/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (indexer *Indexer) StartIndexer() {
err = db.StreamUnfinalizedEpochs(uint64(finalizedEpoch), func(unfinalizedEpoch *dbtypes.UnfinalizedEpoch) {
epochStats := indexer.epochCache.getEpochStats(phase0.Epoch(unfinalizedEpoch.Epoch), phase0.Root(unfinalizedEpoch.DependentRoot))
if epochStats == nil {
indexer.logger.Warnf("failed restoring epoch aggregations for epoch %v [%x] from db: epoch stats not found", unfinalizedEpoch.Epoch, unfinalizedEpoch.DependentRoot)
indexer.logger.Debugf("failed restoring epoch aggregations for epoch %v [%x] from db: epoch stats not found", unfinalizedEpoch.Epoch, unfinalizedEpoch.DependentRoot)
return
}

Expand Down

0 comments on commit 2fe2e26

Please sign in to comment.