Skip to content

Commit

Permalink
added combined datasource approach for slashings page (show most rece…
Browse files Browse the repository at this point in the history
…nt slashings from indexer cache)
  • Loading branch information
pk910 committed Jun 14, 2024
1 parent 351aaf4 commit f0d3b8c
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 11 deletions.
8 changes: 1 addition & 7 deletions handlers/slashings.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/ethpandaops/dora/db"
"github.com/ethpandaops/dora/dbtypes"
"github.com/ethpandaops/dora/services"
"github.com/ethpandaops/dora/templates"
Expand Down Expand Up @@ -178,12 +177,7 @@ func buildFilteredSlashingsPageData(pageIdx uint64, pageSize uint64, minSlot uin
WithOrphaned: withOrphaned,
}

offset := (pageIdx - 1) * pageSize

dbSlashings, totalRows, err := db.GetSlashingsFiltered(offset, uint32(pageSize), uint64(finalizedEpoch), slashingFilter)
if err != nil {
panic(err)
}
dbSlashings, totalRows := services.GlobalBeaconService.GetSlashingsByFilter(slashingFilter, pageIdx-1, uint32(pageSize))

validatorSetRsp := services.GlobalBeaconService.GetCachedValidatorSet()
validatorActivityMap, validatorActivityMax := services.GlobalBeaconService.GetValidatorActivity()
Expand Down
4 changes: 2 additions & 2 deletions indexer/write_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func BuildDbVoluntaryExits(block *CacheBlock) []*dbtypes.VoluntaryExit {

func persistBlockSlashings(block *CacheBlock, orphaned bool, tx *sqlx.Tx) error {
// insert slashings
dbSlashings := buildDbSlashings(block)
dbSlashings := BuildDbSlashings(block)
if orphaned {
for idx := range dbSlashings {
dbSlashings[idx].Orphaned = true
Expand All @@ -431,7 +431,7 @@ func persistBlockSlashings(block *CacheBlock, orphaned bool, tx *sqlx.Tx) error
return nil
}

func buildDbSlashings(block *CacheBlock) []*dbtypes.Slashing {
func BuildDbSlashings(block *CacheBlock) []*dbtypes.Slashing {
blockBody := block.GetBlockBody()
if blockBody == nil {
return nil
Expand Down
121 changes: 119 additions & 2 deletions services/chainservice_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func (bs *ChainService) GetVoluntaryExitsByFilter(filter *dbtypes.VoluntaryExitF
if !strings.Contains(validatorName, filter.ValidatorName) {
continue
}
continue
}

cachedMatches = append(cachedMatches, voluntaryExits[idx])
Expand Down Expand Up @@ -100,7 +99,125 @@ func (bs *ChainService) GetVoluntaryExitsByFilter(filter *dbtypes.VoluntaryExitF
}

if err != nil {
logrus.Warnf("GetVoluntaryExitsByFilter error: %v", err)
logrus.Warnf("ChainService.GetVoluntaryExitsByFilter error: %v", err)
} else {
for idx, dbObject := range dbObjects {
if dbObject.SlotNumber > finalizedBlock {
blockStatus := bs.CheckBlockOrphanedStatus(dbObject.SlotRoot)
dbObjects[idx].Orphaned = blockStatus == dbtypes.Orphaned
}

if filter.WithOrphaned != 1 {
if filter.WithOrphaned == 0 && dbObjects[idx].Orphaned {
continue
}
if filter.WithOrphaned == 2 && !dbObjects[idx].Orphaned {
continue
}
}

resObjs = append(resObjs, dbObjects[idx])
}
}

return resObjs, cachedMatchesLen + dbCount
}

func (bs *ChainService) GetSlashingsByFilter(filter *dbtypes.SlashingFilter, pageIdx uint64, pageSize uint32) ([]*dbtypes.Slashing, uint64) {
idxHeadSlot, finalizedEpoch, persistedEpoch, _ := bs.indexer.GetCacheState()
finalizedBlock := uint64(0)
if finalizedEpoch > 0 {
finalizedBlock = (uint64(finalizedEpoch)+1)*utils.Config.Chain.Config.SlotsPerEpoch - 1
}

// load most recent objects from indexer cache
idxMinSlot := (persistedEpoch + 1) * int64(utils.Config.Chain.Config.SlotsPerEpoch)
cachedMatches := make([]*dbtypes.Slashing, 0)
for slotIdx := int64(idxHeadSlot); slotIdx >= int64(idxMinSlot); slotIdx-- {
slot := uint64(slotIdx)
blocks := bs.indexer.GetCachedBlocks(slot)
if blocks != nil {
for bidx := 0; bidx < len(blocks); bidx++ {
block := blocks[bidx]
if filter.WithOrphaned != 1 {
isOrphaned := !block.IsCanonical(bs.indexer, nil)
if filter.WithOrphaned == 0 && isOrphaned {
continue
}
if filter.WithOrphaned == 2 && !isOrphaned {
continue
}
}
if filter.MinSlot > 0 && slot < filter.MinSlot {
continue
}
if filter.MaxSlot > 0 && slot > filter.MaxSlot {
continue
}

slashings := indexer.BuildDbSlashings(block)
for idx, slashing := range slashings {
if filter.MinIndex > 0 && slashing.ValidatorIndex < filter.MinIndex {
continue
}
if filter.MaxIndex > 0 && slashing.ValidatorIndex > filter.MaxIndex {
continue
}
if filter.ValidatorName != "" {
validatorName := bs.validatorNames.GetValidatorName(slashing.ValidatorIndex)
if !strings.Contains(validatorName, filter.ValidatorName) {
continue
}
}
if filter.SlasherName != "" {
slasherName := bs.validatorNames.GetValidatorName(slashing.SlasherIndex)
if !strings.Contains(slasherName, filter.SlasherName) {
continue
}
}

cachedMatches = append(cachedMatches, slashings[idx])
}
}
}
}

cachedMatchesLen := uint64(len(cachedMatches))
cachedPages := cachedMatchesLen / uint64(pageSize)
resObjs := make([]*dbtypes.Slashing, 0)
resIdx := 0

cachedStart := pageIdx * uint64(pageSize)
cachedEnd := cachedStart + uint64(pageSize)

if cachedPages > 0 && pageIdx < cachedPages {
resObjs = append(resObjs, cachedMatches[cachedStart:cachedEnd]...)
resIdx += int(cachedEnd - cachedStart)
} else if pageIdx == cachedPages {
resObjs = append(resObjs, cachedMatches[cachedStart:]...)
resIdx += len(cachedMatches) - int(cachedStart)
}

// load older objects from db
dbPage := pageIdx - cachedPages
dbCacheOffset := uint64(pageSize) - (cachedMatchesLen % uint64(pageSize))

var dbObjects []*dbtypes.Slashing
var dbCount uint64
var err error

if resIdx > int(pageSize) {
// all results from cache, just get result count from db
_, dbCount, err = db.GetSlashingsFiltered(0, 1, finalizedBlock, filter)
} else if dbPage == 0 {
// first page, load first `pagesize-cachedResults` items from db
dbObjects, dbCount, err = db.GetSlashingsFiltered(0, uint32(dbCacheOffset), finalizedBlock, filter)
} else {
dbObjects, dbCount, err = db.GetSlashingsFiltered((dbPage-1)*uint64(pageSize)+dbCacheOffset, pageSize, finalizedBlock, filter)
}

if err != nil {
logrus.Warnf("ChainService.GetSlashingsByFilter error: %v", err)
} else {
for idx, dbObject := range dbObjects {
if dbObject.SlotNumber > finalizedBlock {
Expand Down

0 comments on commit f0d3b8c

Please sign in to comment.