Skip to content

Commit

Permalink
add some docs & reduce memory for attester duties
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Aug 5, 2024
1 parent 1d573fa commit 315948e
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 3 deletions.
12 changes: 12 additions & 0 deletions indexer/beacon/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Block struct {
seenMap map[uint16]*Client
}

// BlockBodyIndex holds important block propoerties that are used as index for cache lookups.
// this structure should be preserved after pruning, so the block is still identifiable.
type BlockBodyIndex struct {
Graffiti [32]byte
ExecutionExtraData []byte
Expand Down Expand Up @@ -231,6 +233,7 @@ func (block *Block) EnsureBlock(loadBlock func() (*spec.VersionedSignedBeaconBlo
return true, nil
}

// setBlockIndex sets the block index of this block.
func (block *Block) setBlockIndex(body *spec.VersionedSignedBeaconBlock) {
blockIndex := &BlockBodyIndex{}
blockIndex.Graffiti, _ = body.Graffiti()
Expand All @@ -241,6 +244,7 @@ func (block *Block) setBlockIndex(body *spec.VersionedSignedBeaconBlock) {
block.blockIndex = blockIndex
}

// GetBlockIndex returns the block index of this block.
func (block *Block) GetBlockIndex() *BlockBodyIndex {
if block.blockIndex != nil {
return block.blockIndex
Expand Down Expand Up @@ -278,6 +282,7 @@ func (block *Block) buildUnfinalizedBlock(compress bool) (*dbtypes.UnfinalizedBl
}, nil
}

// buildOrphanedBlock builds an orphaned block from the block data.
func (block *Block) buildOrphanedBlock(compress bool) (*dbtypes.OrphanedBlock, error) {
headerSSZ, err := block.header.MarshalSSZ()
if err != nil {
Expand All @@ -298,6 +303,7 @@ func (block *Block) buildOrphanedBlock(compress bool) (*dbtypes.OrphanedBlock, e
}, nil
}

// unpruneBlockBody retrieves the block body from the database if it is not already present.
func (block *Block) unpruneBlockBody() {
if block.block != nil || !block.isInUnfinalizedDb {
return
Expand All @@ -309,6 +315,7 @@ func (block *Block) unpruneBlockBody() {
}
}

// GetDbBlock returns the database representation of this block.
func (block *Block) GetDbBlock(indexer *Indexer) *dbtypes.Slot {
var epochStats *EpochStats
chainState := indexer.consensusPool.GetChainState()
Expand All @@ -328,6 +335,7 @@ func (block *Block) GetDbBlock(indexer *Indexer) *dbtypes.Slot {
return dbBlock
}

// GetDbDeposits returns the database representation of the deposits in this block.
func (block *Block) GetDbDeposits(indexer *Indexer, depositIndex *uint64) []*dbtypes.Deposit {
orphaned := !indexer.IsCanonicalBlock(block, nil)
dbDeposits := indexer.dbWriter.buildDbDeposits(block, depositIndex, orphaned, nil)
Expand All @@ -336,16 +344,19 @@ func (block *Block) GetDbDeposits(indexer *Indexer, depositIndex *uint64) []*dbt
return dbDeposits
}

// GetDbVoluntaryExits returns the database representation of the voluntary exits in this block.
func (block *Block) GetDbVoluntaryExits(indexer *Indexer) []*dbtypes.VoluntaryExit {
orphaned := !indexer.IsCanonicalBlock(block, nil)
return indexer.dbWriter.buildDbVoluntaryExits(block, orphaned, nil)
}

// GetDbSlashings returns the database representation of the slashings in this block.
func (block *Block) GetDbSlashings(indexer *Indexer) []*dbtypes.Slashing {
orphaned := !indexer.IsCanonicalBlock(block, nil)
return indexer.dbWriter.buildDbSlashings(block, orphaned, nil)
}

// GetExecutionExtraData returns the execution extra data of this block.
func (block *Block) GetExecutionExtraData() []byte {
blockBody := block.GetBlock()
if blockBody == nil {
Expand All @@ -356,6 +367,7 @@ func (block *Block) GetExecutionExtraData() []byte {
return data
}

// GetForkId returns the fork ID of this block.
func (block *Block) GetForkId() ForkKey {
return block.forkId
}
7 changes: 7 additions & 0 deletions indexer/beacon/blockcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func (cache *blockCache) getBlocksBySlot(slot phase0.Slot) []*Block {
return blocks
}

// getBlocksByParentRoot returns a slice of blocks that have the given parent root.
func (cache *blockCache) getBlocksByParentRoot(parentRoot phase0.Root) []*Block {
cache.cacheMutex.RLock()
defer cache.cacheMutex.RUnlock()
Expand Down Expand Up @@ -108,6 +109,7 @@ func (cache *blockCache) getBlocksByParentRoot(parentRoot phase0.Root) []*Block
return resBlocks
}

// getBlockByStateRoot returns the block with the given state root.
func (cache *blockCache) getBlockByStateRoot(stateRoot phase0.Root) *Block {
cache.cacheMutex.RLock()
defer cache.cacheMutex.RUnlock()
Expand Down Expand Up @@ -203,6 +205,7 @@ func (cache *blockCache) getPruningBlocks(minInMemorySlot phase0.Slot) []*Block
return blocks
}

// getForkBlocks returns a slice of blocks that belong to the specified forkId.
func (cache *blockCache) getForkBlocks(forkId ForkKey) []*Block {
cache.cacheMutex.RLock()
defer cache.cacheMutex.RUnlock()
Expand All @@ -222,6 +225,8 @@ func (cache *blockCache) getForkBlocks(forkId ForkKey) []*Block {
return blocks
}

// getLatestBlocks returns the latest blocks from the block cache, up to the specified limit.
// If a forkId is provided, only blocks with matching forkId will be returned.
func (cache *blockCache) getLatestBlocks(limit uint64, forkId *ForkKey) []*Block {
cache.cacheMutex.RLock()
defer cache.cacheMutex.RUnlock()
Expand Down Expand Up @@ -252,6 +257,7 @@ func (cache *blockCache) getLatestBlocks(limit uint64, forkId *ForkKey) []*Block
return blocks
}

// removeBlock removes the given block from the block cache.
func (cache *blockCache) removeBlock(block *Block) {
cache.cacheMutex.Lock()
defer cache.cacheMutex.Unlock()
Expand All @@ -271,6 +277,7 @@ func (cache *blockCache) removeBlock(block *Block) {
}
}

// getEpochBlocks returns the blocks that belong to the specified epoch.
func (cache *blockCache) getEpochBlocks(epoch phase0.Epoch) []*Block {
cache.cacheMutex.RLock()
defer cache.cacheMutex.RUnlock()
Expand Down
3 changes: 2 additions & 1 deletion indexer/beacon/canonical.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ func (indexer *Indexer) aggregateForkVotes(forkId ForkKey) (totalVotes phase0.Gw
return
}

// GetCanonicalValidatorSet returns the latest canonical validator set.
// If an overrideForkId is provided, the latest validator set for the fork is returned.
func (indexer *Indexer) GetCanonicalValidatorSet(overrideForkId *ForkKey) []*v1.Validator {
chainState := indexer.consensusPool.GetChainState()

Expand Down Expand Up @@ -327,5 +329,4 @@ func (indexer *Indexer) GetCanonicalValidatorSet(overrideForkId *ForkKey) []*v1.
}
}
}

}
4 changes: 4 additions & 0 deletions indexer/beacon/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"compress/zlib"
)

// compressBytes compresses the given byte slice using zlib compression algorithm.
// It returns the compressed byte slice.
func compressBytes(data []byte) []byte {
var b bytes.Buffer
w := zlib.NewWriter(&b)
Expand All @@ -13,6 +15,8 @@ func compressBytes(data []byte) []byte {
return b.Bytes()
}

// decompressBytes decompresses the given byte slice using zlib decompression algorithm.
// It returns the decompressed byte slice and any error encountered during decompression.
func decompressBytes(data []byte) ([]byte, error) {
r, err := zlib.NewReader(bytes.NewReader(data))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion indexer/beacon/duties/duties.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const totalSize = seedSize + roundSize + positionWindowSize

var maxShuffleListSize uint64 = 1 << 40

type ActiveIndiceIndex uint64
type ActiveIndiceIndex uint32

type BeaconState struct {
RandaoMix *phase0.Hash32
Expand Down
8 changes: 7 additions & 1 deletion indexer/beacon/epochstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (es *EpochStats) processState(indexer *Indexer) {
proposerIndex := phase0.ValidatorIndex(math.MaxInt64)
if err != nil {
indexer.logger.Warnf("failed computing proposer for slot %v: %v", slot, err)
proposer = math.MaxInt64
proposerIndex = math.MaxInt64
} else {
proposerIndex = values.ActiveIndices[proposer]
}
Expand Down Expand Up @@ -404,6 +404,7 @@ func (es *EpochStats) processState(indexer *Indexer) {
es.setStatsReady()
}

// precomputeFromParentState precomputes the EpochStats values based on the parent state.
func (es *EpochStats) precomputeFromParentState(indexer *Indexer, parentState *EpochStats) error {
es.precalcBaseRoot = parentState.dependentRoot

Expand Down Expand Up @@ -486,6 +487,7 @@ func (es *EpochStats) precomputeFromParentState(indexer *Indexer, parentState *E
})
}

// awaitStatsReady waits for the EpochStats values to be ready.
func (s *EpochStats) awaitStatsReady(ctx context.Context, timeout time.Duration) bool {
s.readyChanMutex.Lock()
if s.readyChan == nil && !s.ready {
Expand Down Expand Up @@ -531,6 +533,7 @@ func (es *EpochStats) GetValues(withPrecalc bool) *EpochStatsValues {
return nil
}

// GetOrLoadValues returns the EpochStats values, loading them from the database if necessary.
func (es *EpochStats) GetOrLoadValues(indexer *Indexer, withPrecalc bool, keepInCache bool) *EpochStatsValues {
if es == nil {
return nil
Expand All @@ -557,6 +560,7 @@ func (es *EpochStats) GetOrLoadValues(indexer *Indexer, withPrecalc bool, keepIn
return nil
}

// GetEffectiveBalance returns the effective balance for the given active validator indice.
func (v *EpochStatsValues) GetEffectiveBalance(index duties.ActiveIndiceIndex) phase0.Gwei {
if v == nil {
return 0
Expand All @@ -565,6 +569,7 @@ func (v *EpochStatsValues) GetEffectiveBalance(index duties.ActiveIndiceIndex) p
return phase0.Gwei(v.EffectiveBalances[index]) * EtherGweiFactor
}

// GetDbEpoch returns the database Epoch representaion for the EpochStats.
func (es *EpochStats) GetDbEpoch(indexer *Indexer, headBlock *Block) *dbtypes.Epoch {
chainState := indexer.consensusPool.GetChainState()
if headBlock == nil {
Expand Down Expand Up @@ -624,6 +629,7 @@ func (es *EpochStats) GetDbEpoch(indexer *Indexer, headBlock *Block) *dbtypes.Ep
return indexer.dbWriter.buildDbEpoch(es.epoch, epochBlocks, es, epochVotes, nil)
}

// GetEpochVotes aggregates & returns the EpochVotes for the EpochStats.
func (es *EpochStats) GetEpochVotes(indexer *Indexer, headBlock *Block) *EpochVotes {
chainState := indexer.consensusPool.GetChainState()
if headBlock == nil {
Expand Down

0 comments on commit 315948e

Please sign in to comment.