Skip to content

Commit

Permalink
update unfinalized blocks in batches to avoid too big queries
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Aug 25, 2024
1 parent bd5a69e commit ac5ba22
Showing 1 changed file with 29 additions and 5 deletions.
34 changes: 29 additions & 5 deletions indexer/beacon/forkdetection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type updateForkInfo struct {
}

// processBlock processes a block and detects new forks if any.
// It persists the new forks to the database, updates any subsequent blocks building on top of the given block and returns the fork ID.
// It persists the new forks to the database, sets the forkId of the supplied block
// and updates the forkId of all blocks affected by newly detected forks.
func (cache *forkCache) processBlock(block *Block) error {
cache.forkProcessLock.Lock()
defer cache.forkProcessLock.Unlock()
Expand Down Expand Up @@ -149,7 +150,7 @@ func (cache *forkCache) processBlock(block *Block) error {
}

if len(childBlocks) > 1 {
// one or more forks detected
// multiple blocks building on top of the current one, create a fork for each
logbuf := strings.Builder{}
for idx, child := range childBlocks {
if cache.getForkByLeaf(child.Root) != nil {
Expand Down Expand Up @@ -179,7 +180,7 @@ func (cache *forkCache) processBlock(block *Block) error {
}
}

// update fork ids of all blocks building on top of this block
// update fork ids of all blocks building on top of the current block
updatedBlocks, updatedFork := cache.updateForkBlocks(block, currentForkId, true)
if updatedFork != nil {
updateForks = append(updateForks, updatedFork)
Expand All @@ -204,6 +205,29 @@ func (cache *forkCache) processBlock(block *Block) error {
// persist new forks and updated blocks to the database
if len(newForks) > 0 || len(updatedBlocks) > 0 {
err := db.RunDBTransaction(func(tx *sqlx.Tx) error {
// helper function to update unfinalized block fork ids in batches
updateUnfinalizedBlockForkIds := func(updateRoots [][]byte, forkId ForkKey) error {
batchSize := 1000
numBatches := (len(updateRoots) + batchSize - 1) / batchSize

for i := 0; i < numBatches; i++ {
start := i * batchSize
end := (i + 1) * batchSize
if end > len(updateRoots) {
end = len(updateRoots)
}

batchRoots := updateRoots[start:end]

err := db.UpdateUnfinalizedBlockForkId(batchRoots, uint64(forkId), tx)
if err != nil {
return err
}
}

return nil
}

// add new forks
for _, newFork := range newForks {
err := db.InsertFork(newFork.fork.toDbFork(), tx)
Expand All @@ -212,7 +236,7 @@ func (cache *forkCache) processBlock(block *Block) error {
}

if len(newFork.updateRoots) > 0 {
err = db.UpdateUnfinalizedBlockForkId(newFork.updateRoots, uint64(newFork.fork.forkId), tx)
err := updateUnfinalizedBlockForkIds(newFork.updateRoots, newFork.fork.forkId)
if err != nil {
return err
}
Expand All @@ -221,7 +245,7 @@ func (cache *forkCache) processBlock(block *Block) error {

// update blocks building on top of current block
if len(updatedBlocks) > 0 {
err := db.UpdateUnfinalizedBlockForkId(updatedBlocks, uint64(currentForkId), tx)
err := updateUnfinalizedBlockForkIds(updatedBlocks, currentForkId)
if err != nil {
return err
}
Expand Down

0 comments on commit ac5ba22

Please sign in to comment.