Skip to content

Commit

Permalink
cleanup & comments
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Jul 31, 2024
1 parent 2883219 commit e33a07f
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 42 deletions.
7 changes: 7 additions & 0 deletions indexer/beacon/block_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

var jsonVersionOffset uint64 = 0x70000000

// marshalVersionedSignedBeaconBlockSSZ marshals a versioned signed beacon block using SSZ encoding.
func marshalVersionedSignedBeaconBlockSSZ(dynSsz *dynssz.DynSsz, block *spec.VersionedSignedBeaconBlock) (version uint64, ssz []byte, err error) {
if utils.Config.KillSwitch.DisableSSZEncoding {
// SSZ encoding disabled, use json instead
Expand Down Expand Up @@ -48,6 +49,7 @@ func marshalVersionedSignedBeaconBlockSSZ(dynSsz *dynssz.DynSsz, block *spec.Ver
return
}

// unmarshalVersionedSignedBeaconBlockSSZ unmarshals a versioned signed beacon block using SSZ encoding.
func unmarshalVersionedSignedBeaconBlockSSZ(dynSsz *dynssz.DynSsz, version uint64, ssz []byte) (*spec.VersionedSignedBeaconBlock, error) {
if version >= jsonVersionOffset {
return unmarshalVersionedSignedBeaconBlockJson(version, ssz)
Expand Down Expand Up @@ -93,6 +95,7 @@ func unmarshalVersionedSignedBeaconBlockSSZ(dynSsz *dynssz.DynSsz, version uint6
return block, nil
}

// marshalVersionedSignedBeaconBlockJson marshals a versioned signed beacon block using JSON encoding.
func marshalVersionedSignedBeaconBlockJson(block *spec.VersionedSignedBeaconBlock) (version uint64, jsonRes []byte, err error) {
switch block.Version {
case spec.DataVersionPhase0:
Expand All @@ -119,6 +122,7 @@ func marshalVersionedSignedBeaconBlockJson(block *spec.VersionedSignedBeaconBloc
return
}

// unmarshalVersionedSignedBeaconBlockJson unmarshals a versioned signed beacon block using JSON encoding.
func unmarshalVersionedSignedBeaconBlockJson(version uint64, ssz []byte) (*spec.VersionedSignedBeaconBlock, error) {
if version < jsonVersionOffset {
return nil, fmt.Errorf("no json encoding")
Expand Down Expand Up @@ -163,6 +167,7 @@ func unmarshalVersionedSignedBeaconBlockJson(version uint64, ssz []byte) (*spec.
return block, nil
}

// getBlockExecutionExtraData returns the extra data from the execution payload of a versioned signed beacon block.
func getBlockExecutionExtraData(v *spec.VersionedSignedBeaconBlock) ([]byte, error) {
switch v.Version {
case spec.DataVersionBellatrix:
Expand Down Expand Up @@ -194,6 +199,7 @@ func getBlockExecutionExtraData(v *spec.VersionedSignedBeaconBlock) ([]byte, err
}
}

// getStateRandaoMixes returns the RANDAO mixes from a versioned beacon state.
func getStateRandaoMixes(v *spec.VersionedBeaconState) ([]phase0.Root, error) {
switch v.Version {
case spec.DataVersionBellatrix:
Expand Down Expand Up @@ -225,6 +231,7 @@ func getStateRandaoMixes(v *spec.VersionedBeaconState) ([]phase0.Root, error) {
}
}

// getStateDepositIndex returns the deposit index from a versioned beacon state.
func getStateDepositIndex(state *spec.VersionedBeaconState) uint64 {
switch state.Version {
case spec.DataVersionPhase0:
Expand Down
5 changes: 5 additions & 0 deletions indexer/beacon/epochstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type EpochStats struct {
values *EpochStatsValues
}

// EpochStatsValues holds the values for the epoch-specific information.
type EpochStatsValues struct {
ProposerDuties []phase0.ValidatorIndex
AttesterDuties [][][]EpochStatsAttesterDuty
Expand All @@ -30,6 +31,7 @@ type EpochStatsValues struct {
EffectiveBalance phase0.Gwei
}

// EpochStatsAttesterDuty holds the attester duty information for a validator.
type EpochStatsAttesterDuty struct {
ValidatorIndex phase0.ValidatorIndex
EffectiveBalanceEth uint16
Expand Down Expand Up @@ -72,6 +74,7 @@ func (es *EpochStats) getRequestedBy() []*Client {
return clients
}

// marshalSSZ marshals the EpochStats values using SSZ.
func (es *EpochStats) marshalSSZ(dynSsz *dynssz.DynSsz) ([]byte, error) {
if dynSsz == nil {
dynSsz = dynssz.NewDynSsz(nil)
Expand All @@ -82,6 +85,7 @@ func (es *EpochStats) marshalSSZ(dynSsz *dynssz.DynSsz) ([]byte, error) {
return dynSsz.MarshalSSZ(es.values)
}

// unmarshalSSZ unmarshals the EpochStats values using the provided SSZ bytes.
func (es *EpochStats) unmarshalSSZ(dynSsz *dynssz.DynSsz, ssz []byte) error {
if dynSsz == nil {
dynSsz = dynssz.NewDynSsz(nil)
Expand Down Expand Up @@ -194,6 +198,7 @@ func (es *EpochStats) processState(indexer *Indexer) {
indexer.logger.Infof("epoch %v stats (%v / %v) ready, %v bytes", es.epoch, es.dependentRoot.String(), es.dependentState.stateRoot.String(), len(ssz))
}

// GetValues returns the EpochStats values.
func (es *EpochStats) GetValues() *EpochStatsValues {
if es == nil {
return nil
Expand Down
43 changes: 23 additions & 20 deletions indexer/beacon/epochvotes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,32 +10,34 @@ import (
"github.com/prysmaticlabs/go-bitfield"
)

// EpochVotes represents the aggregated votes for an epoch.
type EpochVotes struct {
currentEpoch struct {
targetVoteAmount phase0.Gwei
headVoteAmount phase0.Gwei
totalVoteAmount phase0.Gwei
CurrentEpoch struct {
TargetVoteAmount phase0.Gwei
HeadVoteAmount phase0.Gwei
TotalVoteAmount phase0.Gwei
}
nextEpoch struct {
targetVoteAmount phase0.Gwei
headVoteAmount phase0.Gwei
totalVoteAmount phase0.Gwei
NextEpoch struct {
TargetVoteAmount phase0.Gwei
HeadVoteAmount phase0.Gwei
TotalVoteAmount phase0.Gwei
}
ActivityMap map[phase0.ValidatorIndex]bool
}

// aggregateEpochVotes aggregates the votes for an epoch based on the provided chain state, blocks, and epoch stats.
func (indexer *Indexer) aggregateEpochVotes(chainState *consensus.ChainState, blocks []*Block, epochStats *EpochStats) *EpochVotes {
t1 := time.Now()

epochStatsValues := epochStats.values
specs := chainState.GetSpecs()

votes := EpochVotes{
votes := &EpochVotes{
ActivityMap: map[phase0.ValidatorIndex]bool{},
}

if len(blocks) == 0 || epochStatsValues == nil {
return &votes
return votes
}

var targetRoot phase0.Root
Expand Down Expand Up @@ -91,21 +93,21 @@ func (indexer *Indexer) aggregateEpochVotes(chainState *consensus.ChainState, bl
if uint64(committee) >= specs.MaxCommitteesPerSlot {
continue
}
voteAmt, committeeSize := aggregateAttestationVotes(&votes, epochStats, slotIndex, uint64(committee), attAggregationBits, aggregationBitsOffset)
voteAmt, committeeSize := votes.aggregateVotes(epochStats, slotIndex, uint64(committee), attAggregationBits, aggregationBitsOffset)
voteAmount += voteAmt
aggregationBitsOffset += committeeSize
}
} else {
// pre electra attestation aggregation
voteAmt, _ := aggregateAttestationVotes(&votes, epochStats, slotIndex, uint64(attData.Index), attAggregationBits, 0)
voteAmt, _ := votes.aggregateVotes(epochStats, slotIndex, uint64(attData.Index), attAggregationBits, 0)
voteAmount += voteAmt
}

if bytes.Equal(attData.Target.Root[:], targetRoot[:]) {
if isNextEpoch {
votes.nextEpoch.targetVoteAmount += voteAmount
votes.NextEpoch.TargetVoteAmount += voteAmount
} else {
votes.currentEpoch.targetVoteAmount += voteAmount
votes.CurrentEpoch.TargetVoteAmount += voteAmount
}
} /*else {
indexer.logger.Infof("vote target missmatch %v != 0x%x", attData.Target.Root, targetRoot)
Expand All @@ -114,24 +116,25 @@ func (indexer *Indexer) aggregateEpochVotes(chainState *consensus.ChainState, bl

if parentRoot != nil && bytes.Equal(attData.BeaconBlockRoot[:], parentRoot[:]) {
if isNextEpoch {
votes.nextEpoch.headVoteAmount += voteAmount
votes.NextEpoch.HeadVoteAmount += voteAmount
} else {
votes.currentEpoch.headVoteAmount += voteAmount
votes.CurrentEpoch.HeadVoteAmount += voteAmount
}
}
if isNextEpoch {
votes.nextEpoch.totalVoteAmount += voteAmount
votes.NextEpoch.TotalVoteAmount += voteAmount
} else {
votes.currentEpoch.totalVoteAmount += voteAmount
votes.CurrentEpoch.TotalVoteAmount += voteAmount
}
}
}

indexer.logger.Debugf("aggregated epoch %v votes in %v (blocks: %v, dependent: %v)", epochStats.epoch, time.Since(t1), len(blocks), epochStats.dependentRoot)
return &votes
return votes
}

func aggregateAttestationVotes(votes *EpochVotes, epochStats *EpochStats, slot phase0.Slot, committee uint64, aggregationBits bitfield.Bitfield, aggregationBitsOffset uint64) (phase0.Gwei, uint64) {
// aggregateVotes aggregates the votes for a specific slot and committee based on the provided epoch statistics, aggregation bits, and offset.
func (votes *EpochVotes) aggregateVotes(epochStats *EpochStats, slot phase0.Slot, committee uint64, aggregationBits bitfield.Bitfield, aggregationBitsOffset uint64) (phase0.Gwei, uint64) {
voteAmount := phase0.Gwei(0)

voteDuties := epochStats.values.AttesterDuties[slot][committee]
Expand Down
10 changes: 0 additions & 10 deletions indexer/beacon/finalization.go

This file was deleted.

18 changes: 12 additions & 6 deletions indexer/beacon/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"github.com/ethpandaops/dora/dbtypes"
)

// ForkKey represents a key used for indexing forks.
type ForkKey uint64

// getForkKey calculates the ForkKey based on the base and leaf roots.
func getForkKey(baseRoot phase0.Root, leafRoot phase0.Root) ForkKey {
hashData := make([]byte, 64)
copy(hashData[:32], baseRoot[:])
Expand All @@ -22,15 +24,17 @@ func getForkKey(baseRoot phase0.Root, leafRoot phase0.Root) ForkKey {
return ForkKey(binary.BigEndian.Uint64(keyBytes))
}

// Fork represents a fork in the beacon chain.
type Fork struct {
forkId ForkKey
baseSlot phase0.Slot
baseRoot phase0.Root
leafSlot phase0.Slot
leafRoot phase0.Root
parentFork *Fork
forkId ForkKey // Unique identifier for the fork.
baseSlot phase0.Slot // Slot of the base block.
baseRoot phase0.Root // Root of the base block.
leafSlot phase0.Slot // Slot of the leaf block.
leafRoot phase0.Root // Root of the leaf block.
parentFork *Fork // Parent fork.
}

// newFork creates a new Fork instance.
func newFork(baseBlock *Block, leafBlock *Block, parentFork *Fork) *Fork {
fork := &Fork{
forkId: getForkKey(baseBlock.Root, leafBlock.Root),
Expand All @@ -44,6 +48,7 @@ func newFork(baseBlock *Block, leafBlock *Block, parentFork *Fork) *Fork {
return fork
}

// newForkFromDb creates a new Fork instance from a database record.
func newForkFromDb(dbFork *dbtypes.Fork, cache *forkCache) *Fork {
fork := &Fork{
forkId: ForkKey(dbFork.ForkId),
Expand All @@ -60,6 +65,7 @@ func newForkFromDb(dbFork *dbtypes.Fork, cache *forkCache) *Fork {
return fork
}

// toDbFork converts the Fork instance to a database record.
func (fork *Fork) toDbFork() *dbtypes.Fork {
dbFork := &dbtypes.Fork{
ForkId: uint64(fork.forkId),
Expand Down
9 changes: 9 additions & 0 deletions indexer/beacon/forkcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/jmoiron/sqlx"
)

// forkCache is a struct that represents the fork cache in the indexer.
type forkCache struct {
indexer *Indexer
cacheMutex sync.RWMutex
Expand All @@ -20,27 +21,31 @@ type forkCache struct {
forkProcessLock sync.Mutex
}

// newForkCache creates a new instance of the forkCache struct.
func newForkCache(indexer *Indexer) *forkCache {
return &forkCache{
indexer: indexer,
forkMap: make(map[ForkKey]*Fork),
}
}

// getForkById retrieves a fork from the cache by its ID.
func (cache *forkCache) getForkById(forkId ForkKey) *Fork {
cache.cacheMutex.RLock()
defer cache.cacheMutex.RUnlock()

return cache.forkMap[forkId]
}

// addFork adds a fork to the cache.
func (cache *forkCache) addFork(fork *Fork) {
cache.cacheMutex.Lock()
defer cache.cacheMutex.Unlock()

cache.forkMap[fork.forkId] = fork
}

// getClosestFork finds the closest fork that a given block is part of.
func (cache *forkCache) getClosestFork(block *Block) *Fork {
cache.cacheMutex.RLock()
defer cache.cacheMutex.RUnlock()
Expand All @@ -63,6 +68,8 @@ func (cache *forkCache) getClosestFork(block *Block) *Fork {
return closestFork
}

// checkForkDistance checks the distance between two blocks in a fork and returns the base block and distances.
// if the fork happened before the latest finalized slot, only the side of the fork that does not include the finalized block gets returned.
func (cache *forkCache) checkForkDistance(block1 *Block, block2 *Block, parentsMap map[phase0.Root]bool) (baseBlock *Block, block1Distance uint64, leafBlock1 *Block, block2Distance uint64, leafBlock2 *Block) {
finalizedSlot := cache.indexer.consensusPool.GetChainState().GetFinalizedSlot()
_, finalizedRoot := cache.indexer.consensusPool.GetChainState().GetFinalizedCheckpoint()
Expand Down Expand Up @@ -154,6 +161,7 @@ func (cache *forkCache) checkForkDistance(block1 *Block, block2 *Block, parentsM
return nil, 0, nil, 0, nil
}

// processBlock processes a block and detects new forks if any. persists the new forks to the database and returns the fork ID.
func (cache *forkCache) processBlock(block *Block) (ForkKey, error) {
cache.forkProcessLock.Lock()
defer cache.forkProcessLock.Unlock()
Expand Down Expand Up @@ -236,6 +244,7 @@ func (cache *forkCache) processBlock(block *Block) (ForkKey, error) {
return parentForkId, nil
}

// updateNewForkBlocks updates the fork blocks with the given fork. returns the roots of the updated blocks.
func (cache *forkCache) updateNewForkBlocks(fork *Fork, blocks []*Block) [][]byte {
updatedRoots := [][]byte{}

Expand Down
12 changes: 6 additions & 6 deletions indexer/beacon/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
type Indexer struct {
logger logrus.FieldLogger
consensusPool *consensus.Pool
dynSsz *dynssz.DynSsz

// configuration
writeDb bool
Expand All @@ -30,15 +31,14 @@ type Indexer struct {
minForkDistance uint16
cachePersistenceDelay uint16

// state
running bool
dynSsz *dynssz.DynSsz
clients []*Client
// caches
blockCache *blockCache
epochCache *epochCache
forkCache *forkCache

// worker state
// indexer state
clients []*Client
running bool
lastFinalizedEpoch phase0.Epoch
lastFinalizedRoot phase0.Root
lastPruningEpoch phase0.Epoch
Expand Down Expand Up @@ -73,6 +73,7 @@ func NewIndexer(logger logrus.FieldLogger, consensusPool *consensus.Pool) *Index
indexer := &Indexer{
logger: logger,
consensusPool: consensusPool,
dynSsz: dynssz.NewDynSsz(staticSpec),

writeDb: !utils.Config.Indexer.DisableIndexWriter,
disableSync: utils.Config.Indexer.DisableSynchronizer,
Expand All @@ -81,7 +82,6 @@ func NewIndexer(logger logrus.FieldLogger, consensusPool *consensus.Pool) *Index
minForkDistance: 3,
cachePersistenceDelay: cachePersistenceDelay,

dynSsz: dynssz.NewDynSsz(staticSpec),
clients: make([]*Client, 0),
}

Expand Down
6 changes: 6 additions & 0 deletions indexer/beacon/pruning.go → indexer/beacon/processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package beacon
import (
"sort"

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
)

Expand Down Expand Up @@ -77,3 +78,8 @@ func (indexer *Indexer) pruneEpoch(epoch phase0.Epoch, pruneBlocks []*Block) err
return nil

}

func (indexer *Indexer) processFinalityEvent(finalityEvent *v1.Finality) error {
indexer.logger.Infof("finality event! %v %v", finalityEvent.Finalized.Epoch, finalityEvent.Finalized.Root.String())
return nil
}

0 comments on commit e33a07f

Please sign in to comment.