Skip to content

Commit

Permalink
switch remaining pages to new indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Aug 5, 2024
1 parent 2f59d0f commit 48cc357
Show file tree
Hide file tree
Showing 29 changed files with 427 additions and 217 deletions.
10 changes: 6 additions & 4 deletions clients/consensus/clientlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,15 @@ func (client *Client) runClientLogic() error {

client.logger.Tracef("event (%v) processing time: %v ms", evt.Event, time.Since(now).Milliseconds())
client.lastEvent = time.Now()
case ready := <-blockStream.ReadyChan:
if client.isOnline != ready {
client.isOnline = ready
if ready {
case streamStatus := <-blockStream.ReadyChan:
if client.isOnline != streamStatus.Ready {
client.isOnline = streamStatus.Ready
if streamStatus.Ready {
client.logger.Debug("RPC event stream connected")
client.lastError = nil
} else {
client.logger.Debug("RPC event stream disconnected")
client.lastError = streamStatus.Error
}
}
case <-time.After(eventTimeout):
Expand Down
18 changes: 14 additions & 4 deletions clients/consensus/rpc/beaconstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@ type BeaconStreamEvent struct {
Data interface{}
}

type BeaconStreamStatus struct {
Ready bool
Error error
}

type BeaconStream struct {
ctx context.Context
ctxCancel context.CancelFunc
logger logrus.FieldLogger
running bool
events uint16
client *BeaconClient
ReadyChan chan bool
ReadyChan chan *BeaconStreamStatus
EventChan chan *BeaconStreamEvent
lastHeadSeen time.Time
}
Expand All @@ -49,7 +54,7 @@ func (bc *BeaconClient) NewBlockStream(ctx context.Context, logger logrus.FieldL
running: true,
events: events,
client: bc,
ReadyChan: make(chan bool, 10),
ReadyChan: make(chan *BeaconStreamStatus, 10),
EventChan: make(chan *BeaconStreamEvent, 10),
}
go blockStream.startStream()
Expand Down Expand Up @@ -84,11 +89,16 @@ func (bs *BeaconStream) startStream() {
bs.processFinalizedEvent(evt)
}
case <-stream.Ready:
bs.ReadyChan <- true
bs.ReadyChan <- &BeaconStreamStatus{
Ready: true,
}
case err := <-stream.Errors:
bs.logger.Warnf("beacon block stream error: %v", err)
select {
case bs.ReadyChan <- false:
case bs.ReadyChan <- &BeaconStreamStatus{
Ready: false,
Error: err,
}:
case <-bs.ctx.Done():
}
}
Expand Down
22 changes: 11 additions & 11 deletions dbtypes/search.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package dbtypes

type SearchBlockResult struct {
Slot uint64 `db:"slot"`
Root []byte `db:"root"`
Orphaned bool `db:"orphaned"`
Slot uint64 `db:"slot"`
Root []byte `db:"root"`
Status SlotStatus `db:"status"`
}

type SearchGraffitiResult struct {
Expand All @@ -19,17 +19,17 @@ type SearchAheadEpochsResult []struct {
}

type SearchAheadSlotsResult []struct {
Slot uint64 `db:"slot"`
Root []byte `db:"root"`
Orphaned bool `db:"orphaned"`
Slot uint64 `db:"slot"`
Root []byte `db:"root"`
Status SlotStatus `db:"status"`
}

type SearchAheadExecBlocksResult []struct {
Slot uint64 `db:"slot"`
Root []byte `db:"root"`
ExecHash []byte `db:"eth_block_hash"`
ExecNumber uint64 `db:"eth_block_number"`
Orphaned bool `db:"orphaned"`
Slot uint64 `db:"slot"`
Root []byte `db:"root"`
ExecHash []byte `db:"eth_block_hash"`
ExecNumber uint64 `db:"eth_block_number"`
Status SlotStatus `db:"status"`
}

type SearchAheadGraffitiResult []struct {
Expand Down
6 changes: 3 additions & 3 deletions handlers/deposits.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func buildDepositsPageData(firstEpoch uint64, pageSize uint64) (*models.Deposits
}

validatorSetRsp := services.GlobalBeaconService.GetCachedValidatorPubkeyMap()
validatorActivityMap, validatorActivityMax := services.GlobalBeaconService.GetValidatorActivity()
validatorActivityMap, validatorActivityMax := services.GlobalBeaconService.GetValidatorActivity(3, false)

// load initiated deposits
dbDepositTxs := db.GetDepositTxs(0, 20)
Expand Down Expand Up @@ -121,7 +121,7 @@ func buildDepositsPageData(firstEpoch uint64, pageSize uint64) (*models.Deposits
}

if depositTxData.ShowUpcheck {
depositTxData.UpcheckActivity = validatorActivityMap[uint64(validator.Index)]
depositTxData.UpcheckActivity = validatorActivityMap[validator.Index]
depositTxData.UpcheckMaximum = uint8(validatorActivityMax)
}
}
Expand Down Expand Up @@ -172,7 +172,7 @@ func buildDepositsPageData(firstEpoch uint64, pageSize uint64) (*models.Deposits
}

if depositData.ShowUpcheck {
depositData.UpcheckActivity = validatorActivityMap[uint64(validator.Index)]
depositData.UpcheckActivity = validatorActivityMap[validator.Index]
depositData.UpcheckMaximum = uint8(validatorActivityMax)
}
}
Expand Down
4 changes: 2 additions & 2 deletions handlers/included_deposits.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func buildFilteredIncludedDepositsPageData(pageIdx uint64, pageSize uint64, minI
dbDeposits, totalRows := services.GlobalBeaconService.GetIncludedDepositsByFilter(depositFilter, pageIdx-1, uint32(pageSize))

validatorSetRsp := services.GlobalBeaconService.GetCachedValidatorPubkeyMap()
validatorActivityMap, validatorActivityMax := services.GlobalBeaconService.GetValidatorActivity()
validatorActivityMap, validatorActivityMax := services.GlobalBeaconService.GetValidatorActivity(3, false)

for _, deposit := range dbDeposits {
depositData := &models.IncludedDepositsPageDataDeposit{
Expand Down Expand Up @@ -210,7 +210,7 @@ func buildFilteredIncludedDepositsPageData(pageIdx uint64, pageSize uint64, minI
}

if depositData.ShowUpcheck {
depositData.UpcheckActivity = validatorActivityMap[uint64(validator.Index)]
depositData.UpcheckActivity = validatorActivityMap[validator.Index]
depositData.UpcheckMaximum = uint8(validatorActivityMax)
}
}
Expand Down
4 changes: 2 additions & 2 deletions handlers/initiated_deposits.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func buildFilteredInitiatedDepositsPageData(pageIdx uint64, pageSize uint64, add
}

validatorSetRsp := services.GlobalBeaconService.GetCachedValidatorPubkeyMap()
validatorActivityMap, validatorActivityMax := services.GlobalBeaconService.GetValidatorActivity()
validatorActivityMap, validatorActivityMax := services.GlobalBeaconService.GetValidatorActivity(3, false)

for _, depositTx := range dbDepositTxs {
depositTxData := &models.InitiatedDepositsPageDataDeposit{
Expand Down Expand Up @@ -217,7 +217,7 @@ func buildFilteredInitiatedDepositsPageData(pageIdx uint64, pageSize uint64, add
}

if depositTxData.ShowUpcheck {
depositTxData.UpcheckActivity = validatorActivityMap[uint64(validator.Index)]
depositTxData.UpcheckActivity = validatorActivityMap[validator.Index]
depositTxData.UpcheckMaximum = uint8(validatorActivityMax)
}
}
Expand Down
83 changes: 39 additions & 44 deletions handlers/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ func Search(w http.ResponseWriter, r *http.Request) {
if err == nil {
blockResult := &dbtypes.SearchBlockResult{}
err = db.ReaderDb.Get(blockResult, `
SELECT slot, root, orphaned
SELECT slot, root, status
FROM slots
WHERE slot = $1
WHERE slot = $1 AND status != 0
LIMIT 1`, searchQuery)
if err == nil {
if blockResult.Orphaned {
if blockResult.Status == dbtypes.Orphaned {
http.Redirect(w, r, fmt.Sprintf("/slot/0x%x", blockResult.Root), http.StatusMovedPermanently)
} else {
http.Redirect(w, r, fmt.Sprintf("/slot/%v", blockResult.Slot), http.StatusMovedPermanently)
Expand All @@ -62,7 +62,7 @@ func Search(w http.ResponseWriter, r *http.Request) {
state_root = $1
LIMIT 1`, blockHash)
if err == nil {
if blockResult.Orphaned {
if blockResult.Status == dbtypes.Orphaned {
http.Redirect(w, r, fmt.Sprintf("/slot/0x%x", blockResult.Root), http.StatusMovedPermanently)
} else {
http.Redirect(w, r, fmt.Sprintf("/slot/%v", blockResult.Slot), http.StatusMovedPermanently)
Expand Down Expand Up @@ -130,7 +130,10 @@ func SearchAhead(w http.ResponseWriter, r *http.Request) {
logger := logrus.WithField("searchType", searchType)
var result interface{}

indexer := services.GlobalBeaconService.GetIndexer()
indexer := services.GlobalBeaconService.GetBeaconIndexer()
_, pruneEpoch := indexer.GetBlockCacheState()
chainState := services.GlobalBeaconService.GetChainState()
minSlotIdx := chainState.EpochStartSlot(pruneEpoch)

switch searchType {
case "epochs":
Expand Down Expand Up @@ -160,30 +163,26 @@ func SearchAhead(w http.ResponseWriter, r *http.Request) {
return
}

cachedBlock := indexer.GetCachedBlock(blockHash)
cachedBlock := indexer.GetBlockByRoot(phase0.Root(blockHash))
if cachedBlock == nil {
cachedBlock = indexer.GetCachedBlockByStateroot(blockHash)
}
if cachedBlock != nil && !cachedBlock.IsReady() {
cachedBlock = nil
cachedBlock = indexer.GetBlockByStateRoot(phase0.Root(blockHash))
}
if cachedBlock != nil {
header := cachedBlock.GetHeader()
result = &[]models.SearchAheadSlotsResult{
{
Slot: fmt.Sprintf("%v", uint64(header.Message.Slot)),
Root: phase0.Root(cachedBlock.Root),
Orphaned: !cachedBlock.IsCanonical(indexer, nil),
Orphaned: !indexer.IsCanonicalBlock(cachedBlock, nil),
},
}
} else {
dbres := &dbtypes.SearchAheadSlotsResult{}
err = db.ReaderDb.Select(dbres, `
SELECT slot, root, orphaned
FROM slots
WHERE root = $1 OR
state_root = $1
ORDER BY slot LIMIT 1`, blockHash)
WHERE slot < $1 AND (root = $2 OR state_root = $2)
ORDER BY slot LIMIT 1`, minSlotIdx, blockHash)
if err != nil {
logger.Errorf("error reading block root: %v", err)
http.Error(w, "Internal server error", http.StatusServiceUnavailable)
Expand All @@ -194,42 +193,42 @@ func SearchAhead(w http.ResponseWriter, r *http.Request) {
{
Slot: fmt.Sprintf("%v", (*dbres)[0].Slot),
Root: phase0.Root((*dbres)[0].Root),
Orphaned: (*dbres)[0].Orphaned,
Orphaned: (*dbres)[0].Status == dbtypes.Orphaned,
},
}
}

}
} else if blockNumber, convertErr := strconv.ParseUint(search, 10, 32); convertErr == nil {
cachedBlocks := indexer.GetCachedBlocks(blockNumber)
cachedBlocks := indexer.GetBlocksBySlot(phase0.Slot(blockNumber))
if len(cachedBlocks) > 0 {
res := make([]*models.SearchAheadSlotsResult, 0)
for _, cachedBlock := range cachedBlocks {
if !cachedBlock.IsReady() {
header := cachedBlock.GetHeader()
if header == nil {
continue
}
header := cachedBlock.GetHeader()
res = append(res, &models.SearchAheadSlotsResult{
Slot: fmt.Sprintf("%v", uint64(header.Message.Slot)),
Root: phase0.Root(cachedBlock.Root),
Orphaned: !cachedBlock.IsCanonical(indexer, nil),
Orphaned: !indexer.IsCanonicalBlock(cachedBlock, nil),
})
}
result = res
} else {
dbres := &dbtypes.SearchAheadSlotsResult{}
err = db.ReaderDb.Select(dbres, `
SELECT slot, root, orphaned
SELECT slot, root, status
FROM slots
WHERE slot = $1
WHERE slot = $1 AND status != 0
ORDER BY slot LIMIT 10`, blockNumber)
if err == nil {
model := make([]models.SearchAheadSlotsResult, len(*dbres))
for idx, entry := range *dbres {
model[idx] = models.SearchAheadSlotsResult{
Slot: fmt.Sprintf("%v", entry.Slot),
Root: phase0.Root(entry.Root),
Orphaned: entry.Orphaned,
Orphaned: entry.Status == dbtypes.Orphaned,
}
}
result = model
Expand All @@ -251,30 +250,28 @@ func SearchAhead(w http.ResponseWriter, r *http.Request) {
return
}

cachedBlocks := indexer.GetCachedBlocksByExecutionBlockHash(blockHash)
cachedBlocks := indexer.GetBlocksByExecutionBlockHash(phase0.Hash32(blockHash))
if len(cachedBlocks) > 0 {
res := make([]*models.SearchAheadExecBlocksResult, 0)
for idx, cachedBlock := range cachedBlocks {
if !cachedBlock.IsReady() {
continue
}
header := cachedBlock.GetHeader()
index := cachedBlock.GetBlockIndex()
res[idx] = &models.SearchAheadExecBlocksResult{
Slot: fmt.Sprintf("%v", uint64(header.Message.Slot)),
Root: phase0.Root(cachedBlock.Root),
ExecHash: phase0.Hash32(cachedBlock.Refs.ExecutionHash),
ExecNumber: cachedBlock.Refs.ExecutionNumber,
Orphaned: !cachedBlock.IsCanonical(indexer, nil),
ExecHash: phase0.Hash32(index.ExecutionHash),
ExecNumber: index.ExecutionNumber,
Orphaned: !indexer.IsCanonicalBlock(cachedBlock, nil),
}
}
result = res
} else {
dbres := &dbtypes.SearchAheadExecBlocksResult{}
err = db.ReaderDb.Select(dbres, `
SELECT slot, root, eth_block_hash, eth_block_number, orphaned
SELECT slot, root, eth_block_hash, eth_block_number, status
FROM slots
WHERE eth_block_hash = $1
ORDER BY slot LIMIT 10`, blockHash)
WHERE slot < $1 AND eth_block_hash = $2
ORDER BY slot LIMIT 10`, minSlotIdx, blockHash)
if err != nil {
logger.Errorf("error reading block: %v", err)
http.Error(w, "Internal server error", http.StatusServiceUnavailable)
Expand All @@ -287,37 +284,35 @@ func SearchAhead(w http.ResponseWriter, r *http.Request) {
Root: phase0.Root((*dbres)[0].Root),
ExecHash: phase0.Hash32((*dbres)[0].ExecHash),
ExecNumber: (*dbres)[0].ExecNumber,
Orphaned: (*dbres)[0].Orphaned,
Orphaned: (*dbres)[0].Status == dbtypes.Orphaned,
},
}
}

}
} else if blockNumber, convertErr := strconv.ParseUint(search, 10, 32); convertErr == nil {
cachedBlocks := indexer.GetCachedBlocksByExecutionBlockNumber(blockNumber)
cachedBlocks := indexer.GetBlocksByExecutionBlockNumber(blockNumber)
if len(cachedBlocks) > 0 {
res := make([]*models.SearchAheadExecBlocksResult, 0)
for _, cachedBlock := range cachedBlocks {
if !cachedBlock.IsReady() {
continue
}
header := cachedBlock.GetHeader()
index := cachedBlock.GetBlockIndex()
res = append(res, &models.SearchAheadExecBlocksResult{
Slot: fmt.Sprintf("%v", uint64(header.Message.Slot)),
Root: phase0.Root(cachedBlock.Root),
ExecHash: phase0.Hash32(cachedBlock.Refs.ExecutionHash),
ExecNumber: cachedBlock.Refs.ExecutionNumber,
Orphaned: !cachedBlock.IsCanonical(indexer, nil),
ExecHash: phase0.Hash32(index.ExecutionHash),
ExecNumber: index.ExecutionNumber,
Orphaned: !indexer.IsCanonicalBlock(cachedBlock, nil),
})
}
result = res
} else {
dbres := &dbtypes.SearchAheadExecBlocksResult{}
err = db.ReaderDb.Select(dbres, `
SELECT slot, root, eth_block_hash, eth_block_number, orphaned
SELECT slot, root, eth_block_hash, eth_block_number, status
FROM slots
WHERE eth_block_number = $1
ORDER BY slot LIMIT 10`, blockNumber)
WHERE slot < $1 AND eth_block_number = $2
ORDER BY slot LIMIT 10`, minSlotIdx, blockNumber)
if err == nil {
model := make([]models.SearchAheadExecBlocksResult, len(*dbres))
for idx, entry := range *dbres {
Expand All @@ -326,7 +321,7 @@ func SearchAhead(w http.ResponseWriter, r *http.Request) {
Root: phase0.Root(entry.Root),
ExecHash: phase0.Hash32(entry.ExecHash),
ExecNumber: entry.ExecNumber,
Orphaned: entry.Orphaned,
Orphaned: entry.Status == dbtypes.Orphaned,
}
}
result = model
Expand Down
Loading

0 comments on commit 48cc357

Please sign in to comment.