From ed3cd9cd64c5ff12da698c12667bee4df6b27cf9 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 24 Aug 2024 10:37:55 +0200 Subject: [PATCH 1/9] add parentRoot->block map to blockcache --- indexer/beacon/blockcache.go | 69 ++++++++++++++++++++++++------------ indexer/beacon/client.go | 2 ++ indexer/beacon/indexer.go | 1 + 3 files changed, 50 insertions(+), 22 deletions(-) diff --git a/indexer/beacon/blockcache.go b/indexer/beacon/blockcache.go index 5d4cf31..9aeaa2e 100644 --- a/indexer/beacon/blockcache.go +++ b/indexer/beacon/blockcache.go @@ -18,15 +18,17 @@ type blockCache struct { lowestSlot int64 slotMap map[phase0.Slot][]*Block rootMap map[phase0.Root]*Block + parentMap map[phase0.Root][]*Block latestBlock *Block // latest added block (might not be the head block, just a marker for cache changes) } // newBlockCache creates a new instance of blockCache. func newBlockCache(indexer *Indexer) *blockCache { return &blockCache{ - indexer: indexer, - slotMap: map[phase0.Slot][]*Block{}, - rootMap: map[phase0.Root]*Block{}, + indexer: indexer, + slotMap: map[phase0.Slot][]*Block{}, + rootMap: map[phase0.Root]*Block{}, + parentMap: map[phase0.Root][]*Block{}, } } @@ -60,6 +62,25 @@ func (cache *blockCache) createOrGetBlock(root phase0.Root, slot phase0.Slot) (* return cacheBlock, true } +// addBlockToParentMap adds the given block to the parent map. +func (cache *blockCache) addBlockToParentMap(block *Block) { + cache.cacheMutex.Lock() + defer cache.cacheMutex.Unlock() + + parentRoot := block.GetParentRoot() + if parentRoot == nil { + return + } + + for _, parentBlock := range cache.parentMap[*parentRoot] { + if parentBlock == block { + return + } + } + + cache.parentMap[*parentRoot] = append(cache.parentMap[*parentRoot], block) +} + // getBlockByRoot returns the cached block with the given root. func (cache *blockCache) getBlockByRoot(root phase0.Root) *Block { cache.cacheMutex.RLock() @@ -86,27 +107,13 @@ func (cache *blockCache) getBlocksByParentRoot(parentRoot phase0.Root) []*Block cache.cacheMutex.RLock() defer cache.cacheMutex.RUnlock() - parentBlock := cache.rootMap[parentRoot] - - resBlocks := []*Block{} - for slot, blocks := range cache.slotMap { - if parentBlock != nil && slot <= parentBlock.Slot { - continue - } - - for _, block := range blocks { - blockParentRoot := block.GetParentRoot() - if blockParentRoot == nil { - continue - } - - if bytes.Equal((*blockParentRoot)[:], parentRoot[:]) { - resBlocks = append(resBlocks, block) - } - } + cachedBlocks := cache.parentMap[parentRoot] + blocks := make([]*Block, len(cachedBlocks)) + if len(blocks) > 0 { + copy(blocks, cachedBlocks) } - return resBlocks + return blocks } // getBlockByStateRoot returns the block with the given state root. @@ -265,8 +272,10 @@ func (cache *blockCache) removeBlock(block *Block) { cache.cacheMutex.Lock() defer cache.cacheMutex.Unlock() + // remove the block from the root map. delete(cache.rootMap, block.Root) + // remove the block from the slot map. slotBlocks := cache.slotMap[block.Slot] if len(slotBlocks) == 1 && slotBlocks[0] == block { delete(cache.slotMap, block.Slot) @@ -278,6 +287,22 @@ func (cache *blockCache) removeBlock(block *Block) { } } } + + // remove the block from the parent map. + if parentRoot := block.GetParentRoot(); parentRoot != nil { + parentBlocks := cache.parentMap[*parentRoot] + if len(parentBlocks) == 1 && parentBlocks[0] == block { + delete(cache.parentMap, *parentRoot) + } else if len(parentBlocks) > 1 { + for i, parentBlock := range parentBlocks { + if parentBlock == block { + cache.parentMap[*parentRoot] = append(parentBlocks[:i], parentBlocks[i+1:]...) + break + } + } + } + } + } // getEpochBlocks returns the blocks that belong to the specified epoch. diff --git a/indexer/beacon/client.go b/indexer/beacon/client.go index fcc2dac..19d1af9 100644 --- a/indexer/beacon/client.go +++ b/indexer/beacon/client.go @@ -363,6 +363,8 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0 } if slot >= finalizedSlot && isNew { + c.indexer.blockCache.addBlockToParentMap(block) + // fork detection err2 := c.indexer.forkCache.processBlock(block) if err2 != nil { diff --git a/indexer/beacon/indexer.go b/indexer/beacon/indexer.go index bb8032a..5345fc1 100644 --- a/indexer/beacon/indexer.go +++ b/indexer/beacon/indexer.go @@ -318,6 +318,7 @@ func (indexer *Indexer) StartIndexer() { } block.SetHeader(header) + indexer.blockCache.addBlockToParentMap(block) blockBody, err := unmarshalVersionedSignedBeaconBlockSSZ(indexer.dynSsz, dbBlock.BlockVer, dbBlock.BlockSSZ) if err != nil { From e37c84dbf6ed8caad57aa3411de8b6f3307a33e1 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 24 Aug 2024 11:29:21 +0200 Subject: [PATCH 2/9] simplified fork detection --- indexer/beacon/fork.go | 6 +- indexer/beacon/forkcache.go | 361 +++++++++++++++--------------------- 2 files changed, 148 insertions(+), 219 deletions(-) diff --git a/indexer/beacon/fork.go b/indexer/beacon/fork.go index 0d1245d..5c3651f 100644 --- a/indexer/beacon/fork.go +++ b/indexer/beacon/fork.go @@ -21,11 +21,11 @@ type Fork struct { } // newFork creates a new Fork instance. -func newFork(forkId ForkKey, baseBlock *Block, leafBlock *Block, parentFork ForkKey) *Fork { +func newFork(forkId ForkKey, baseSlot phase0.Slot, baseRoot phase0.Root, leafBlock *Block, parentFork ForkKey) *Fork { fork := &Fork{ forkId: forkId, - baseSlot: baseBlock.Slot, - baseRoot: baseBlock.Root, + baseSlot: baseSlot, + baseRoot: baseRoot, leafSlot: leafBlock.Slot, leafRoot: leafBlock.Root, parentFork: parentFork, diff --git a/indexer/beacon/forkcache.go b/indexer/beacon/forkcache.go index bf0cbab..d92aad1 100644 --- a/indexer/beacon/forkcache.go +++ b/indexer/beacon/forkcache.go @@ -206,97 +206,9 @@ func (cache *forkCache) setFinalizedEpoch(finalizedSlot phase0.Slot, justifiedRo } } -// checkForkDistance checks the distance between two blocks in a fork and returns the base block and distances. -// If the fork happened before the latest finalized slot, only the side of the fork that does not include the finalized block gets returned. -func (cache *forkCache) checkForkDistance(block1 *Block, block2 *Block, parentsMap map[phase0.Root]bool) (baseBlock *Block, block1Distance uint64, leafBlock1 *Block, block2Distance uint64, leafBlock2 *Block) { - finalizedSlot := cache.indexer.consensusPool.GetChainState().GetFinalizedSlot() - _, finalizedRoot := cache.indexer.consensusPool.GetChainState().GetFinalizedCheckpoint() - leafBlock1 = block1 - leafBlock2 = block2 - - var block1IsFinalized, block2IsFinalized bool - - for { - parentsMap[block1.Root] = true - parentsMap[block2.Root] = true - - if bytes.Equal(block1.Root[:], block2.Root[:]) { - baseBlock = block1 - return - } - - if !block1IsFinalized && bytes.Equal(block1.Root[:], finalizedRoot[:]) { - block1IsFinalized = true - } - - if !block2IsFinalized && bytes.Equal(block2.Root[:], finalizedRoot[:]) { - block2IsFinalized = true - } - - if block1.Slot <= finalizedSlot && block2.Slot <= finalizedSlot { - if block1IsFinalized { - baseBlock = block2 - leafBlock1 = nil - return - } - if block2IsFinalized { - baseBlock = block1 - leafBlock2 = nil - return - } - - break - } - - block1Slot := block1.Slot - block2Slot := block2.Slot - - if block1Slot <= block2Slot && !block2IsFinalized { - leafBlock2 = block2 - parentRoot := block2.GetParentRoot() - if parentRoot == nil { - break - } - - block2 = cache.indexer.blockCache.getBlockByRoot(*parentRoot) - if block2 == nil { - dbBlockHead := db.GetBlockHeadByRoot(parentRoot[:]) - if dbBlockHead != nil { - block2 = newBlock(cache.indexer.dynSsz, phase0.Root(dbBlockHead.Root), phase0.Slot(dbBlockHead.Slot)) - block2.isInFinalizedDb = true - block2.parentRoot = (*phase0.Root)(dbBlockHead.ParentRoot) - } else { - break - } - } - - block2Distance++ - } - - if block2Slot <= block1Slot && !block1IsFinalized { - leafBlock1 = block1 - parentRoot := block1.GetParentRoot() - if parentRoot == nil { - break - } - - block1 = cache.indexer.blockCache.getBlockByRoot(*parentRoot) - if block1 == nil { - dbBlockHead := db.GetBlockHeadByRoot(parentRoot[:]) - if dbBlockHead != nil { - block1 = newBlock(cache.indexer.dynSsz, phase0.Root(dbBlockHead.Root), phase0.Slot(dbBlockHead.Slot)) - block1.isInFinalizedDb = true - block1.parentRoot = (*phase0.Root)(dbBlockHead.ParentRoot) - } else { - break - } - } - - block1Distance++ - } - } - - return nil, 0, nil, 0, nil +type newForkInfo struct { + fork *Fork + updateRoots [][]byte } // processBlock processes a block and detects new forks if any. @@ -305,160 +217,170 @@ func (cache *forkCache) processBlock(block *Block) error { cache.forkProcessLock.Lock() defer cache.forkProcessLock.Unlock() - parentForkId := ForkKey(1) - // get fork id from parent block parentRoot := block.GetParentRoot() - if parentRoot != nil { - parentBlock := cache.indexer.blockCache.getBlockByRoot(*parentRoot) - if parentBlock == nil { - blockHead := db.GetBlockHeadByRoot((*parentRoot)[:]) - if blockHead != nil { - parentForkId = ForkKey(blockHead.ForkId) - } - } else if parentBlock.fokChecked { - parentForkId = parentBlock.forkId - } + if parentRoot == nil { + return fmt.Errorf("parent root not found for block %v", block.Slot) } - forkBlocks := cache.indexer.blockCache.getForkBlocks(parentForkId) - sort.Slice(forkBlocks, func(i, j int) bool { - return forkBlocks[i].Slot > forkBlocks[j].Slot - }) + chainState := cache.indexer.consensusPool.GetChainState() + + // get fork id from parent block + parentForkId := ForkKey(1) + parentSlot := phase0.Slot(0) + parentIsProcessed := false + parentIsFinalized := false + + parentBlock := cache.indexer.blockCache.getBlockByRoot(*parentRoot) + if parentBlock == nil { + blockHead := db.GetBlockHeadByRoot((*parentRoot)[:]) + if blockHead != nil { + parentForkId = ForkKey(blockHead.ForkId) + parentSlot = phase0.Slot(blockHead.Slot) + parentIsProcessed = true + parentIsFinalized = parentSlot < chainState.GetFinalizedSlot() + } + } else if parentBlock.fokChecked { + parentForkId = parentBlock.forkId + parentSlot = parentBlock.Slot + parentIsProcessed = true + parentIsFinalized = parentBlock.Slot < chainState.GetFinalizedSlot() + } - var fork1, fork2 *Fork - var fork1Roots, fork2Roots [][]byte + // check if this block introduces a new fork, it does so if: + // 1. the parent is known & processed and has 1 more child block besides this one + // 2. the current block has 2 or more child blocks (multiple forks possible) + newForks := []*newForkInfo{} currentForkId := parentForkId - parentsMap := map[phase0.Root]bool{} - for _, forkBlock := range forkBlocks { - if forkBlock == block || parentsMap[forkBlock.Root] { - continue + // check scenario 1 + if parentIsProcessed { + otherChildren := []*Block{} + for _, child := range cache.indexer.blockCache.getBlocksByParentRoot(*parentRoot) { + if child == block { + continue + } + + otherChildren = append(otherChildren, child) } - baseBlock, distance1, leaf1, distance2, leaf2 := cache.checkForkDistance(block, forkBlock, parentsMap) - if baseBlock != nil && distance1 > 0 && distance2 > 0 { - // new fork detected + if len(otherChildren) > 0 { + logbuf := strings.Builder{} - if leaf1 != nil { - if cache.getForkByLeaf(leaf1.Root) != nil { - cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v)", leaf1.Slot, leaf1.Root.String(), block.Slot) - } else { - cache.lastForkId++ - fork1 = newFork(cache.lastForkId, baseBlock, leaf1, parentForkId) - cache.addFork(fork1) - fork1Roots = cache.updateNewForkBlocks(fork1, forkBlocks, block) + // parent already has a children, so this block introduces a new fork + if cache.getForkByLeaf(block.Root) != nil { + cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 1)", block.Slot, block.Root.String(), block.Slot) + } else { + cache.lastForkId++ + fork := newFork(cache.lastForkId, parentSlot, *parentRoot, block, parentForkId) + cache.addFork(fork) + + currentForkId = fork.forkId + newFork := &newForkInfo{ + fork: fork, } + newForks = append(newForks, newFork) + + fmt.Fprintf(&logbuf, ", head1: %v [%v, ? upd]", block.Slot, block.Root.String()) } - if leaf2 != nil { - if cache.getForkByLeaf(leaf2.Root) != nil { - cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v)", leaf2.Slot, leaf2.Root.String(), block.Slot) + if !parentIsFinalized && len(otherChildren) == 1 { + // parent is not finalized and it's the first fork based on this parent + // so we need to create another fork for the other chain and update the fork ids of the blocks + + if cache.getForkByLeaf(block.Root) != nil { + cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 1)", block.Slot, block.Root.String(), block.Slot) } else { + cache.lastForkId++ - fork2 = newFork(cache.lastForkId, baseBlock, leaf2, parentForkId) - cache.addFork(fork2) - fork2Roots = cache.updateNewForkBlocks(fork2, forkBlocks, nil) - } - } + otherFork := newFork(cache.lastForkId, parentSlot, *parentRoot, otherChildren[0], parentForkId) + cache.addFork(otherFork) - if parentForkId > 0 { - parentFork := cache.getForkById(parentForkId) - if parentFork != nil { - parentFork.headBlock = baseBlock - } - } + newFork := &newForkInfo{ + fork: otherFork, + updateRoots: cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false), + } + newForks = append(newForks, newFork) - logbuf := strings.Builder{} - fmt.Fprintf(&logbuf, "new fork detected (base %v [%v]", baseBlock.Slot, baseBlock.Root.String()) - if leaf1 != nil { - fmt.Fprintf(&logbuf, ", head1: %v [%v]", leaf1.Slot, leaf1.Root.String()) - } - if leaf2 != nil { - fmt.Fprintf(&logbuf, ", head2: %v [%v]", leaf2.Slot, leaf2.Root.String()) + fmt.Fprintf(&logbuf, ", head2: %v [%v, %v upd]", newFork.fork.leafSlot, newFork.fork.leafRoot.String(), len(newFork.updateRoots)) + } } - fmt.Fprintf(&logbuf, ")") - cache.indexer.logger.Infof(logbuf.String()) - if fork1 != nil { - currentForkId = fork1.forkId + if logbuf.Len() > 0 { + cache.indexer.logger.Infof("new fork leaf detected (base %v [%v]%v)", parentSlot, parentRoot.String(), logbuf.String()) } - - break } } - updatedBlocks := [][]byte{} - if currentForkId != 0 { - // apply fork id to all blocks building on top of this block - nextBlock := block - - for { - nextBlocks := cache.indexer.blockCache.getBlocksByParentRoot(nextBlock.Root) - if len(nextBlocks) > 1 { - // sub-fork detected, but that's probably already handled - // TODO (low prio): check if the sub-fork really exists? - break - } - - if len(nextBlocks) == 0 { - break - } + // check scenario 2 + childBlocks := make([]*Block, 0) + for _, child := range cache.indexer.blockCache.getBlocksByParentRoot(block.Root) { + if !child.fokChecked { + continue + } - nextBlock = nextBlocks[0] + childBlocks = append(childBlocks, child) + } - if !nextBlock.fokChecked { - break - } + if len(childBlocks) > 1 { + // one or more forks detected + logbuf := strings.Builder{} + for idx, child := range childBlocks { + if cache.getForkByLeaf(child.Root) != nil { + cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 2)", child.Slot, child.Root.String(), block.Slot) + } else { + cache.lastForkId++ + fork := newFork(cache.lastForkId, block.Slot, block.Root, child, currentForkId) + cache.addFork(fork) + + newFork := &newForkInfo{ + fork: fork, + updateRoots: cache.updateForkBlocks(child, fork.forkId, false), + } + newForks = append(newForks, newFork) - if nextBlock.forkId == currentForkId { - break + fmt.Fprintf(&logbuf, ", head%v: %v [%v, %v upd]", idx+1, newFork.fork.leafSlot, newFork.fork.leafRoot.String(), len(newFork.updateRoots)) } - - nextBlock.forkId = currentForkId - updatedBlocks = append(updatedBlocks, nextBlock.Root[:]) } - fork := cache.getForkById(currentForkId) - if fork != nil && (fork.headBlock == nil || fork.headBlock.Slot < nextBlock.Slot) { - fork.headBlock = nextBlock + if logbuf.Len() > 0 { + cache.indexer.logger.Infof("new child forks detected (base %v [%v]%v)", block.Slot, block.Root.String(), logbuf.String()) } } + // update fork ids of all blocks building on top of this block + updatedBlocks := cache.updateForkBlocks(block, currentForkId, true) + + // set detected fork id to the block block.forkId = currentForkId block.fokChecked = true - if fork1 != nil || fork2 != nil || len(updatedBlocks) > 0 { - err := db.RunDBTransaction(func(tx *sqlx.Tx) error { - if fork1 != nil { - err := db.InsertFork(fork1.toDbFork(), tx) - if err != nil { - return err - } - - if len(fork1Roots) > 0 { - err = db.UpdateUnfinalizedBlockForkId(fork1Roots, uint64(fork1.forkId), tx) - if err != nil { - return err - } - } - - cache.indexer.logger.Infof("fork %v created (base %v [%v], head %v [%v], updated blocks: %v)", fork1.forkId, fork1.baseSlot, fork1.baseRoot.String(), fork1.leafSlot, fork1.leafRoot.String(), len(fork1Roots)) - } + // update fork head block if needed + fork := cache.getForkById(currentForkId) + if fork != nil { + lastBlock := block + if len(updatedBlocks) > 0 { + lastBlock = cache.indexer.blockCache.getBlockByRoot(phase0.Root(updatedBlocks[len(updatedBlocks)-1])) + } + if lastBlock != nil && (fork.headBlock == nil || lastBlock.Slot > fork.headBlock.Slot) { + fork.headBlock = lastBlock + } + } - if fork2 != nil { - err := db.InsertFork(fork2.toDbFork(), tx) + // persist new forks and updated blocks to the database + if len(newForks) > 0 || len(updatedBlocks) > 0 { + err := db.RunDBTransaction(func(tx *sqlx.Tx) error { + for _, newFork := range newForks { + err := db.InsertFork(newFork.fork.toDbFork(), tx) if err != nil { return err } - if len(fork2Roots) > 0 { - err = db.UpdateUnfinalizedBlockForkId(fork2Roots, uint64(fork2.forkId), tx) + if len(newFork.updateRoots) > 0 { + err = db.UpdateUnfinalizedBlockForkId(newFork.updateRoots, uint64(newFork.fork.forkId), tx) if err != nil { return err } } - - cache.indexer.logger.Infof("fork %v created (base %v [%v], head %v [%v], updated blocks: %v)", fork2.forkId, fork2.baseSlot, fork2.baseRoot.String(), fork2.leafSlot, fork2.leafRoot.String(), len(fork2Roots)) } if len(updatedBlocks) > 0 { @@ -485,27 +407,34 @@ func (cache *forkCache) processBlock(block *Block) error { return nil } -// updateNewForkBlocks updates the fork blocks with the given fork. returns the roots of the updated blocks. -func (cache *forkCache) updateNewForkBlocks(fork *Fork, blocks []*Block, ignoreBlock *Block) [][]byte { - updatedRoots := [][]byte{} +// updateForkBlocks updates the blocks building on top of the given block in the fork and returns the updated block roots. +func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skipStartBlock bool) [][]byte { + blockRoots := [][]byte{} - for _, block := range blocks { - if block.Slot <= fork.baseSlot { - return updatedRoots + if !skipStartBlock { + blockRoots = append(blockRoots, startBlock.Root[:]) + } + + for { + nextBlocks := cache.indexer.blockCache.getBlocksByParentRoot(startBlock.Root) + if len(nextBlocks) != 1 { + break } - if block == ignoreBlock { - continue + nextBlock := nextBlocks[0] + if !nextBlock.fokChecked { + break } - isInFork, _ := cache.indexer.blockCache.getCanonicalDistance(fork.leafRoot, block.Root, 0) - if !isInFork { - continue + if nextBlock.forkId == forkId { + break } - block.forkId = fork.forkId - updatedRoots = append(updatedRoots, block.Root[:]) + nextBlock.forkId = forkId + blockRoots = append(blockRoots, nextBlock.Root[:]) + + startBlock = nextBlock } - return updatedRoots + return blockRoots } From 9844958c48218033308a6691aa6ad37f44ae0539 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 24 Aug 2024 13:19:04 +0200 Subject: [PATCH 3/9] update parent fork ids for subsequent forks --- db/forks.go | 13 +++++++++ indexer/beacon/forkcache.go | 57 ++++++++++++++++++++++++++++++++----- 2 files changed, 63 insertions(+), 7 deletions(-) diff --git a/db/forks.go b/db/forks.go index c6472b2..0d21bc7 100644 --- a/db/forks.go +++ b/db/forks.go @@ -124,3 +124,16 @@ func UpdateFinalizedForkParents(finalizedRoots [][]byte, tx *sqlx.Tx) error { return nil } + +func UpdateForkParent(parentRoot []byte, parentForkId uint64, tx *sqlx.Tx) error { + _, err := tx.Exec(` + UPDATE forks + SET parent_fork = $1 + WHERE base_root = $2 + `, parentForkId, parentRoot) + if err != nil { + return err + } + + return nil +} diff --git a/indexer/beacon/forkcache.go b/indexer/beacon/forkcache.go index d92aad1..7b249e9 100644 --- a/indexer/beacon/forkcache.go +++ b/indexer/beacon/forkcache.go @@ -211,6 +211,11 @@ type newForkInfo struct { updateRoots [][]byte } +type updateForkInfo struct { + baseRoot []byte + parent ForkKey +} + // processBlock processes a block and detects new forks if any. // It persists the new forks to the database, updates any subsequent blocks building on top of the given block and returns the fork ID. func (cache *forkCache) processBlock(block *Block) error { @@ -250,6 +255,7 @@ func (cache *forkCache) processBlock(block *Block) error { // 1. the parent is known & processed and has 1 more child block besides this one // 2. the current block has 2 or more child blocks (multiple forks possible) newForks := []*newForkInfo{} + updateForks := []*updateForkInfo{} currentForkId := parentForkId // check scenario 1 @@ -295,12 +301,17 @@ func (cache *forkCache) processBlock(block *Block) error { otherFork := newFork(cache.lastForkId, parentSlot, *parentRoot, otherChildren[0], parentForkId) cache.addFork(otherFork) + updatedRoots, updatedFork := cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false) newFork := &newForkInfo{ fork: otherFork, - updateRoots: cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false), + updateRoots: updatedRoots, } newForks = append(newForks, newFork) + if updatedFork != nil { + updateForks = append(updateForks, updatedFork) + } + fmt.Fprintf(&logbuf, ", head2: %v [%v, %v upd]", newFork.fork.leafSlot, newFork.fork.leafRoot.String(), len(newFork.updateRoots)) } } @@ -332,12 +343,17 @@ func (cache *forkCache) processBlock(block *Block) error { fork := newFork(cache.lastForkId, block.Slot, block.Root, child, currentForkId) cache.addFork(fork) + updatedRoots, updatedFork := cache.updateForkBlocks(child, fork.forkId, false) newFork := &newForkInfo{ fork: fork, - updateRoots: cache.updateForkBlocks(child, fork.forkId, false), + updateRoots: updatedRoots, } newForks = append(newForks, newFork) + if updatedFork != nil { + updateForks = append(updateForks, updatedFork) + } + fmt.Fprintf(&logbuf, ", head%v: %v [%v, %v upd]", idx+1, newFork.fork.leafSlot, newFork.fork.leafRoot.String(), len(newFork.updateRoots)) } } @@ -348,7 +364,10 @@ func (cache *forkCache) processBlock(block *Block) error { } // update fork ids of all blocks building on top of this block - updatedBlocks := cache.updateForkBlocks(block, currentForkId, true) + updatedBlocks, updatedFork := cache.updateForkBlocks(block, currentForkId, true) + if updatedFork != nil { + updateForks = append(updateForks, updatedFork) + } // set detected fork id to the block block.forkId = currentForkId @@ -392,6 +411,17 @@ func (cache *forkCache) processBlock(block *Block) error { cache.indexer.logger.Infof("updated %v blocks to fork %v", len(updatedBlocks), currentForkId) } + if len(updateForks) > 0 { + for _, updatedFork := range updateForks { + err := db.UpdateForkParent(updatedFork.baseRoot, uint64(updatedFork.parent), tx) + if err != nil { + return err + } + } + + cache.indexer.logger.Infof("updated %v fork parents", len(updateForks)) + } + err := cache.updateForkState(tx) if err != nil { return fmt.Errorf("error while updating fork state: %v", err) @@ -408,8 +438,8 @@ func (cache *forkCache) processBlock(block *Block) error { } // updateForkBlocks updates the blocks building on top of the given block in the fork and returns the updated block roots. -func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skipStartBlock bool) [][]byte { - blockRoots := [][]byte{} +func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skipStartBlock bool) (blockRoots [][]byte, updatedFork *updateForkInfo) { + blockRoots = [][]byte{} if !skipStartBlock { blockRoots = append(blockRoots, startBlock.Root[:]) @@ -417,7 +447,20 @@ func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skip for { nextBlocks := cache.indexer.blockCache.getBlocksByParentRoot(startBlock.Root) - if len(nextBlocks) != 1 { + if len(nextBlocks) == 0 { + break + } + + if len(nextBlocks) > 1 { + // potential fork ahead, check if the fork is already processed and has correct parent fork id + if fork := cache.getForkByLeaf(startBlock.Root); fork != nil && fork.parentFork != forkId { + fork.parentFork = forkId + + updatedFork = &updateForkInfo{ + baseRoot: startBlock.Root[:], + parent: forkId, + } + } break } @@ -436,5 +479,5 @@ func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skip startBlock = nextBlock } - return blockRoots + return } From 84dd0414be271ef1930d8de6f8c479c038052437 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 24 Aug 2024 22:21:15 +0200 Subject: [PATCH 4/9] clean up & comments --- indexer/beacon/forkcache.go | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/indexer/beacon/forkcache.go b/indexer/beacon/forkcache.go index 7b249e9..e694739 100644 --- a/indexer/beacon/forkcache.go +++ b/indexer/beacon/forkcache.go @@ -251,12 +251,19 @@ func (cache *forkCache) processBlock(block *Block) error { parentIsFinalized = parentBlock.Slot < chainState.GetFinalizedSlot() } - // check if this block introduces a new fork, it does so if: - // 1. the parent is known & processed and has 1 more child block besides this one - // 2. the current block has 2 or more child blocks (multiple forks possible) + // check if this block (c) introduces a new fork, it does so if: + // 1. the parent (p) is known & processed and has 1 or more child blocks besides this one (c1, c2, ...) + // c c1 c2 + // \ | / + // p + // 2. the current block (c) has 2 or more child blocks, multiple forks possible (c1, c2, ...) + // c1 c2 c3 + // \ | / + // c + newForks := []*newForkInfo{} updateForks := []*updateForkInfo{} - currentForkId := parentForkId + currentForkId := parentForkId // default to parent fork id // check scenario 1 if parentIsProcessed { @@ -290,13 +297,18 @@ func (cache *forkCache) processBlock(block *Block) error { } if !parentIsFinalized && len(otherChildren) == 1 { - // parent is not finalized and it's the first fork based on this parent - // so we need to create another fork for the other chain and update the fork ids of the blocks + // parent (a) is not finalized and our new detected fork the first fork based on this parent (c) + // we need to create another fork for the other chain that starts from our fork base (b1, b2, ) + // and update the blocks building on top of it + // b2 + // | + // b1 c + // | / + // a if cache.getForkByLeaf(block.Root) != nil { cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 1)", block.Slot, block.Root.String(), block.Slot) } else { - cache.lastForkId++ otherFork := newFork(cache.lastForkId, parentSlot, *parentRoot, otherChildren[0], parentForkId) cache.addFork(otherFork) @@ -388,6 +400,7 @@ func (cache *forkCache) processBlock(block *Block) error { // persist new forks and updated blocks to the database if len(newForks) > 0 || len(updatedBlocks) > 0 { err := db.RunDBTransaction(func(tx *sqlx.Tx) error { + // add new forks for _, newFork := range newForks { err := db.InsertFork(newFork.fork.toDbFork(), tx) if err != nil { @@ -402,6 +415,7 @@ func (cache *forkCache) processBlock(block *Block) error { } } + // update blocks building on top of current block if len(updatedBlocks) > 0 { err := db.UpdateUnfinalizedBlockForkId(updatedBlocks, uint64(currentForkId), tx) if err != nil { @@ -411,6 +425,7 @@ func (cache *forkCache) processBlock(block *Block) error { cache.indexer.logger.Infof("updated %v blocks to fork %v", len(updatedBlocks), currentForkId) } + // update parents of forks building on top of current blocks chain segment if len(updateForks) > 0 { for _, updatedFork := range updateForks { err := db.UpdateForkParent(updatedFork.baseRoot, uint64(updatedFork.parent), tx) From 4c5105687674d798175bece509e6bb23dd7bb17d Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 24 Aug 2024 22:44:26 +0200 Subject: [PATCH 5/9] fixes for fork detection --- indexer/beacon/forkcache.go | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/indexer/beacon/forkcache.go b/indexer/beacon/forkcache.go index e694739..87e8b3a 100644 --- a/indexer/beacon/forkcache.go +++ b/indexer/beacon/forkcache.go @@ -91,6 +91,20 @@ func (cache *forkCache) getForkByLeaf(leafRoot phase0.Root) *Fork { return nil } +func (cache *forkCache) getForkByBase(baseRoot phase0.Root) []*Fork { + cache.cacheMutex.Lock() + defer cache.cacheMutex.Unlock() + + forks := []*Fork{} + for _, fork := range cache.forkMap { + if bytes.Equal(fork.baseRoot[:], baseRoot[:]) { + forks = append(forks, fork) + } + } + + return forks +} + // removeFork removes a fork from the cache. func (cache *forkCache) removeFork(forkId ForkKey) { cache.cacheMutex.Lock() @@ -306,8 +320,8 @@ func (cache *forkCache) processBlock(block *Block) error { // | / // a - if cache.getForkByLeaf(block.Root) != nil { - cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 1)", block.Slot, block.Root.String(), block.Slot) + if cache.getForkByLeaf(otherChildren[0].Root) != nil { + cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 1)", otherChildren[0].Slot, otherChildren[0].Root.String(), block.Slot) } else { cache.lastForkId++ otherFork := newFork(cache.lastForkId, parentSlot, *parentRoot, otherChildren[0], parentForkId) @@ -468,8 +482,10 @@ func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skip if len(nextBlocks) > 1 { // potential fork ahead, check if the fork is already processed and has correct parent fork id - if fork := cache.getForkByLeaf(startBlock.Root); fork != nil && fork.parentFork != forkId { - fork.parentFork = forkId + if forks := cache.getForkByBase(startBlock.Root); len(forks) > 0 && forks[0].parentFork != forkId { + for _, fork := range forks { + fork.parentFork = forkId + } updatedFork = &updateForkInfo{ baseRoot: startBlock.Root[:], From d0cd67e0255afcb52bba70a576562c2beffbe1e4 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sat, 24 Aug 2024 23:04:03 +0200 Subject: [PATCH 6/9] measure loading, processing & db-write durations in block processing --- indexer/beacon/client.go | 59 +++++++++++++++++++++++++++++++++------- 1 file changed, 49 insertions(+), 10 deletions(-) diff --git a/indexer/beacon/client.go b/indexer/beacon/client.go index 19d1af9..fd4b3c7 100644 --- a/indexer/beacon/client.go +++ b/indexer/beacon/client.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "runtime/debug" + "strings" "time" v1 "github.com/attestantio/go-eth2-client/api/v1" @@ -105,6 +106,25 @@ func (c *Client) startClientLoop() { } } +func (c *Client) formatProcessingTimes(processingTimes []time.Duration) string { + if len(processingTimes) == 0 { + return "" + } + + str := strings.Builder{} + str.WriteString(" (") + for i, pt := range processingTimes { + if i > 0 { + str.WriteString(", ") + } + + str.WriteString(fmt.Sprintf("%v ms", pt.Milliseconds())) + } + str.WriteString(")") + + return str.String() +} + // runClientLoop runs the client event processing subroutine. func (c *Client) runClientLoop() error { // 1 - load & process head block @@ -116,7 +136,7 @@ func (c *Client) runClientLoop() error { c.headRoot = headRoot - headBlock, isNew, err := c.processBlock(headSlot, headRoot, nil) + headBlock, isNew, processingTimes, err := c.processBlock(headSlot, headRoot, nil) if err != nil { return fmt.Errorf("failed processing head block: %v", err) } @@ -126,9 +146,9 @@ func (c *Client) runClientLoop() error { } if isNew { - c.logger.Infof("received block %v:%v [0x%x] head", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot) + c.logger.Infof("received block %v:%v [0x%x] head %v", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot, c.formatProcessingTimes(processingTimes)) } else { - c.logger.Debugf("received known block %v:%v [0x%x] head", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot) + c.logger.Debugf("received known block %v:%v [0x%x] head %v", c.client.GetPool().GetChainState().EpochOfSlot(headSlot), headSlot, headRoot, c.formatProcessingTimes(processingTimes)) } // 2 - backfill old blocks up to the finalization checkpoint or known in cache @@ -263,7 +283,7 @@ func (c *Client) processHeadEvent(headEvent *v1.HeadEvent) error { // processStreamBlock processes a block received from the stream (either via block or head events). func (c *Client) processStreamBlock(slot phase0.Slot, root phase0.Root) (*Block, error) { - block, isNew, err := c.processBlock(slot, root, nil) + block, isNew, processingTimes, err := c.processBlock(slot, root, nil) if err != nil { return nil, err } @@ -271,9 +291,9 @@ func (c *Client) processStreamBlock(slot phase0.Slot, root phase0.Root) (*Block, chainState := c.client.GetPool().GetChainState() if isNew { - c.logger.Infof("received block %v:%v [0x%x] stream", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:]) + c.logger.Infof("received block %v:%v [0x%x] stream %v", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:], c.formatProcessingTimes(processingTimes)) } else { - c.logger.Debugf("received known block %v:%v [0x%x] stream", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:]) + c.logger.Debugf("received known block %v:%v [0x%x] stream %v", chainState.EpochOfSlot(block.Slot), block.Slot, block.Root[:], c.formatProcessingTimes(processingTimes)) } return block, nil @@ -323,9 +343,10 @@ func (c *Client) processReorg(oldHead *Block, newHead *Block) error { } // processBlock processes a block (from stream & polling). -func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0.SignedBeaconBlockHeader) (block *Block, isNew bool, err error) { +func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0.SignedBeaconBlockHeader) (block *Block, isNew bool, processingTimes []time.Duration, err error) { chainState := c.client.GetPool().GetChainState() finalizedSlot := chainState.GetFinalizedSlot() + processingTimes = make([]time.Duration, 3) if slot < finalizedSlot { // block is in finalized epoch @@ -349,6 +370,11 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0 return header, nil } + t1 := time.Now() + defer func() { + processingTimes[0] += time.Since(t1) + }() + return LoadBeaconHeader(c.getContext(), c, root) }) if err != nil { @@ -356,6 +382,12 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0 } isNew, err = block.EnsureBlock(func() (*spec.VersionedSignedBeaconBlock, error) { + + t1 := time.Now() + defer func() { + processingTimes[0] += time.Since(t1) + }() + return LoadBeaconBlock(c.getContext(), c, root) }) if err != nil { @@ -364,6 +396,7 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0 if slot >= finalizedSlot && isNew { c.indexer.blockCache.addBlockToParentMap(block) + t1 := time.Now() // fork detection err2 := c.indexer.forkCache.processBlock(block) @@ -378,6 +411,9 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0 return } + processingTimes[1] = time.Since(t1) + t1 = time.Now() + // write to db err = db.RunDBTransaction(func(tx *sqlx.Tx) error { err := db.InsertUnfinalizedBlock(dbBlock, tx) @@ -391,6 +427,8 @@ func (c *Client) processBlock(slot phase0.Slot, root phase0.Root, header *phase0 return } + processingTimes[2] = time.Since(t1) + block.isInUnfinalizedDb = true c.indexer.blockCache.latestBlock = block } @@ -446,19 +484,20 @@ func (c *Client) backfillParentBlocks(headBlock *Block) error { break } + var processingTimes []time.Duration if parentBlock == nil { var err error - parentBlock, isNewBlock, err = c.processBlock(parentSlot, parentRoot, parentHead) + parentBlock, isNewBlock, processingTimes, err = c.processBlock(parentSlot, parentRoot, parentHead) if err != nil { return fmt.Errorf("could not process block [0x%x]: %v", parentRoot, err) } } if isNewBlock { - c.logger.Infof("received block %v:%v [0x%x] backfill", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot) + c.logger.Infof("received block %v:%v [0x%x] backfill %v", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot, c.formatProcessingTimes(processingTimes)) } else { - c.logger.Debugf("received known block %v:%v [0x%x] backfill", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot) + c.logger.Debugf("received known block %v:%v [0x%x] backfill %v", chainState.EpochOfSlot(parentSlot), parentSlot, parentRoot, c.formatProcessingTimes(processingTimes)) } if parentSlot == 0 { From bd5a69ecb8797128651a31031a0562db3a22ce80 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 25 Aug 2024 01:03:06 +0200 Subject: [PATCH 7/9] move fork detection to `forkdetection.go` --- indexer/beacon/forkcache.go | 294 ------------------------------ indexer/beacon/forkdetection.go | 304 ++++++++++++++++++++++++++++++++ 2 files changed, 304 insertions(+), 294 deletions(-) create mode 100644 indexer/beacon/forkdetection.go diff --git a/indexer/beacon/forkcache.go b/indexer/beacon/forkcache.go index 87e8b3a..d1f6be4 100644 --- a/indexer/beacon/forkcache.go +++ b/indexer/beacon/forkcache.go @@ -4,7 +4,6 @@ import ( "bytes" "fmt" "sort" - "strings" "sync" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -219,296 +218,3 @@ func (cache *forkCache) setFinalizedEpoch(finalizedSlot phase0.Slot, justifiedRo cache.indexer.logger.Errorf("error while updating fork state: %v", err) } } - -type newForkInfo struct { - fork *Fork - updateRoots [][]byte -} - -type updateForkInfo struct { - baseRoot []byte - parent ForkKey -} - -// processBlock processes a block and detects new forks if any. -// It persists the new forks to the database, updates any subsequent blocks building on top of the given block and returns the fork ID. -func (cache *forkCache) processBlock(block *Block) error { - cache.forkProcessLock.Lock() - defer cache.forkProcessLock.Unlock() - - parentRoot := block.GetParentRoot() - if parentRoot == nil { - return fmt.Errorf("parent root not found for block %v", block.Slot) - } - - chainState := cache.indexer.consensusPool.GetChainState() - - // get fork id from parent block - parentForkId := ForkKey(1) - parentSlot := phase0.Slot(0) - parentIsProcessed := false - parentIsFinalized := false - - parentBlock := cache.indexer.blockCache.getBlockByRoot(*parentRoot) - if parentBlock == nil { - blockHead := db.GetBlockHeadByRoot((*parentRoot)[:]) - if blockHead != nil { - parentForkId = ForkKey(blockHead.ForkId) - parentSlot = phase0.Slot(blockHead.Slot) - parentIsProcessed = true - parentIsFinalized = parentSlot < chainState.GetFinalizedSlot() - } - } else if parentBlock.fokChecked { - parentForkId = parentBlock.forkId - parentSlot = parentBlock.Slot - parentIsProcessed = true - parentIsFinalized = parentBlock.Slot < chainState.GetFinalizedSlot() - } - - // check if this block (c) introduces a new fork, it does so if: - // 1. the parent (p) is known & processed and has 1 or more child blocks besides this one (c1, c2, ...) - // c c1 c2 - // \ | / - // p - // 2. the current block (c) has 2 or more child blocks, multiple forks possible (c1, c2, ...) - // c1 c2 c3 - // \ | / - // c - - newForks := []*newForkInfo{} - updateForks := []*updateForkInfo{} - currentForkId := parentForkId // default to parent fork id - - // check scenario 1 - if parentIsProcessed { - otherChildren := []*Block{} - for _, child := range cache.indexer.blockCache.getBlocksByParentRoot(*parentRoot) { - if child == block { - continue - } - - otherChildren = append(otherChildren, child) - } - - if len(otherChildren) > 0 { - logbuf := strings.Builder{} - - // parent already has a children, so this block introduces a new fork - if cache.getForkByLeaf(block.Root) != nil { - cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 1)", block.Slot, block.Root.String(), block.Slot) - } else { - cache.lastForkId++ - fork := newFork(cache.lastForkId, parentSlot, *parentRoot, block, parentForkId) - cache.addFork(fork) - - currentForkId = fork.forkId - newFork := &newForkInfo{ - fork: fork, - } - newForks = append(newForks, newFork) - - fmt.Fprintf(&logbuf, ", head1: %v [%v, ? upd]", block.Slot, block.Root.String()) - } - - if !parentIsFinalized && len(otherChildren) == 1 { - // parent (a) is not finalized and our new detected fork the first fork based on this parent (c) - // we need to create another fork for the other chain that starts from our fork base (b1, b2, ) - // and update the blocks building on top of it - // b2 - // | - // b1 c - // | / - // a - - if cache.getForkByLeaf(otherChildren[0].Root) != nil { - cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 1)", otherChildren[0].Slot, otherChildren[0].Root.String(), block.Slot) - } else { - cache.lastForkId++ - otherFork := newFork(cache.lastForkId, parentSlot, *parentRoot, otherChildren[0], parentForkId) - cache.addFork(otherFork) - - updatedRoots, updatedFork := cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false) - newFork := &newForkInfo{ - fork: otherFork, - updateRoots: updatedRoots, - } - newForks = append(newForks, newFork) - - if updatedFork != nil { - updateForks = append(updateForks, updatedFork) - } - - fmt.Fprintf(&logbuf, ", head2: %v [%v, %v upd]", newFork.fork.leafSlot, newFork.fork.leafRoot.String(), len(newFork.updateRoots)) - } - } - - if logbuf.Len() > 0 { - cache.indexer.logger.Infof("new fork leaf detected (base %v [%v]%v)", parentSlot, parentRoot.String(), logbuf.String()) - } - } - } - - // check scenario 2 - childBlocks := make([]*Block, 0) - for _, child := range cache.indexer.blockCache.getBlocksByParentRoot(block.Root) { - if !child.fokChecked { - continue - } - - childBlocks = append(childBlocks, child) - } - - if len(childBlocks) > 1 { - // one or more forks detected - logbuf := strings.Builder{} - for idx, child := range childBlocks { - if cache.getForkByLeaf(child.Root) != nil { - cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 2)", child.Slot, child.Root.String(), block.Slot) - } else { - cache.lastForkId++ - fork := newFork(cache.lastForkId, block.Slot, block.Root, child, currentForkId) - cache.addFork(fork) - - updatedRoots, updatedFork := cache.updateForkBlocks(child, fork.forkId, false) - newFork := &newForkInfo{ - fork: fork, - updateRoots: updatedRoots, - } - newForks = append(newForks, newFork) - - if updatedFork != nil { - updateForks = append(updateForks, updatedFork) - } - - fmt.Fprintf(&logbuf, ", head%v: %v [%v, %v upd]", idx+1, newFork.fork.leafSlot, newFork.fork.leafRoot.String(), len(newFork.updateRoots)) - } - } - - if logbuf.Len() > 0 { - cache.indexer.logger.Infof("new child forks detected (base %v [%v]%v)", block.Slot, block.Root.String(), logbuf.String()) - } - } - - // update fork ids of all blocks building on top of this block - updatedBlocks, updatedFork := cache.updateForkBlocks(block, currentForkId, true) - if updatedFork != nil { - updateForks = append(updateForks, updatedFork) - } - - // set detected fork id to the block - block.forkId = currentForkId - block.fokChecked = true - - // update fork head block if needed - fork := cache.getForkById(currentForkId) - if fork != nil { - lastBlock := block - if len(updatedBlocks) > 0 { - lastBlock = cache.indexer.blockCache.getBlockByRoot(phase0.Root(updatedBlocks[len(updatedBlocks)-1])) - } - if lastBlock != nil && (fork.headBlock == nil || lastBlock.Slot > fork.headBlock.Slot) { - fork.headBlock = lastBlock - } - } - - // persist new forks and updated blocks to the database - if len(newForks) > 0 || len(updatedBlocks) > 0 { - err := db.RunDBTransaction(func(tx *sqlx.Tx) error { - // add new forks - for _, newFork := range newForks { - err := db.InsertFork(newFork.fork.toDbFork(), tx) - if err != nil { - return err - } - - if len(newFork.updateRoots) > 0 { - err = db.UpdateUnfinalizedBlockForkId(newFork.updateRoots, uint64(newFork.fork.forkId), tx) - if err != nil { - return err - } - } - } - - // update blocks building on top of current block - if len(updatedBlocks) > 0 { - err := db.UpdateUnfinalizedBlockForkId(updatedBlocks, uint64(currentForkId), tx) - if err != nil { - return err - } - - cache.indexer.logger.Infof("updated %v blocks to fork %v", len(updatedBlocks), currentForkId) - } - - // update parents of forks building on top of current blocks chain segment - if len(updateForks) > 0 { - for _, updatedFork := range updateForks { - err := db.UpdateForkParent(updatedFork.baseRoot, uint64(updatedFork.parent), tx) - if err != nil { - return err - } - } - - cache.indexer.logger.Infof("updated %v fork parents", len(updateForks)) - } - - err := cache.updateForkState(tx) - if err != nil { - return fmt.Errorf("error while updating fork state: %v", err) - } - - return nil - }) - if err != nil { - return err - } - } - - return nil -} - -// updateForkBlocks updates the blocks building on top of the given block in the fork and returns the updated block roots. -func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skipStartBlock bool) (blockRoots [][]byte, updatedFork *updateForkInfo) { - blockRoots = [][]byte{} - - if !skipStartBlock { - blockRoots = append(blockRoots, startBlock.Root[:]) - } - - for { - nextBlocks := cache.indexer.blockCache.getBlocksByParentRoot(startBlock.Root) - if len(nextBlocks) == 0 { - break - } - - if len(nextBlocks) > 1 { - // potential fork ahead, check if the fork is already processed and has correct parent fork id - if forks := cache.getForkByBase(startBlock.Root); len(forks) > 0 && forks[0].parentFork != forkId { - for _, fork := range forks { - fork.parentFork = forkId - } - - updatedFork = &updateForkInfo{ - baseRoot: startBlock.Root[:], - parent: forkId, - } - } - break - } - - nextBlock := nextBlocks[0] - if !nextBlock.fokChecked { - break - } - - if nextBlock.forkId == forkId { - break - } - - nextBlock.forkId = forkId - blockRoots = append(blockRoots, nextBlock.Root[:]) - - startBlock = nextBlock - } - - return -} diff --git a/indexer/beacon/forkdetection.go b/indexer/beacon/forkdetection.go new file mode 100644 index 0000000..40a83df --- /dev/null +++ b/indexer/beacon/forkdetection.go @@ -0,0 +1,304 @@ +package beacon + +import ( + "fmt" + "strings" + + "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/jmoiron/sqlx" + + "github.com/ethpandaops/dora/db" +) + +type newForkInfo struct { + fork *Fork + updateRoots [][]byte +} + +type updateForkInfo struct { + baseRoot []byte + parent ForkKey +} + +// processBlock processes a block and detects new forks if any. +// It persists the new forks to the database, updates any subsequent blocks building on top of the given block and returns the fork ID. +func (cache *forkCache) processBlock(block *Block) error { + cache.forkProcessLock.Lock() + defer cache.forkProcessLock.Unlock() + + parentRoot := block.GetParentRoot() + if parentRoot == nil { + return fmt.Errorf("parent root not found for block %v", block.Slot) + } + + chainState := cache.indexer.consensusPool.GetChainState() + + // get fork id from parent block + parentForkId := ForkKey(1) + parentSlot := phase0.Slot(0) + parentIsProcessed := false + parentIsFinalized := false + + parentBlock := cache.indexer.blockCache.getBlockByRoot(*parentRoot) + if parentBlock == nil { + blockHead := db.GetBlockHeadByRoot((*parentRoot)[:]) + if blockHead != nil { + parentForkId = ForkKey(blockHead.ForkId) + parentSlot = phase0.Slot(blockHead.Slot) + parentIsProcessed = true + parentIsFinalized = parentSlot < chainState.GetFinalizedSlot() + } + } else if parentBlock.fokChecked { + parentForkId = parentBlock.forkId + parentSlot = parentBlock.Slot + parentIsProcessed = true + parentIsFinalized = parentBlock.Slot < chainState.GetFinalizedSlot() + } + + // check if this block (c) introduces a new fork, it does so if: + // 1. the parent (p) is known & processed and has 1 or more child blocks besides this one (c1, c2, ...) + // c c1 c2 + // \ | / + // p + // 2. the current block (c) has 2 or more child blocks, multiple forks possible (c1, c2, ...) + // c1 c2 c3 + // \ | / + // c + + newForks := []*newForkInfo{} + updateForks := []*updateForkInfo{} + currentForkId := parentForkId // default to parent fork id + + // check scenario 1 + if parentIsProcessed { + otherChildren := []*Block{} + for _, child := range cache.indexer.blockCache.getBlocksByParentRoot(*parentRoot) { + if child == block { + continue + } + + otherChildren = append(otherChildren, child) + } + + if len(otherChildren) > 0 { + logbuf := strings.Builder{} + + // parent already has a children, so this block introduces a new fork + if cache.getForkByLeaf(block.Root) != nil { + cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 1)", block.Slot, block.Root.String(), block.Slot) + } else { + cache.lastForkId++ + fork := newFork(cache.lastForkId, parentSlot, *parentRoot, block, parentForkId) + cache.addFork(fork) + + currentForkId = fork.forkId + newFork := &newForkInfo{ + fork: fork, + } + newForks = append(newForks, newFork) + + fmt.Fprintf(&logbuf, ", head1: %v [%v, ? upd]", block.Slot, block.Root.String()) + } + + if !parentIsFinalized && len(otherChildren) == 1 { + // parent (a) is not finalized and our new detected fork the first fork based on this parent (c) + // we need to create another fork for the other chain that starts from our fork base (b1, b2, ) + // and update the blocks building on top of it + // b2 + // | + // b1 c + // | / + // a + + if cache.getForkByLeaf(otherChildren[0].Root) != nil { + cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 1)", otherChildren[0].Slot, otherChildren[0].Root.String(), block.Slot) + } else { + cache.lastForkId++ + otherFork := newFork(cache.lastForkId, parentSlot, *parentRoot, otherChildren[0], parentForkId) + cache.addFork(otherFork) + + updatedRoots, updatedFork := cache.updateForkBlocks(otherChildren[0], otherFork.forkId, false) + newFork := &newForkInfo{ + fork: otherFork, + updateRoots: updatedRoots, + } + newForks = append(newForks, newFork) + + if updatedFork != nil { + updateForks = append(updateForks, updatedFork) + } + + fmt.Fprintf(&logbuf, ", head2: %v [%v, %v upd]", newFork.fork.leafSlot, newFork.fork.leafRoot.String(), len(newFork.updateRoots)) + } + } + + if logbuf.Len() > 0 { + cache.indexer.logger.Infof("new fork leaf detected (base %v [%v]%v)", parentSlot, parentRoot.String(), logbuf.String()) + } + } + } + + // check scenario 2 + childBlocks := make([]*Block, 0) + for _, child := range cache.indexer.blockCache.getBlocksByParentRoot(block.Root) { + if !child.fokChecked { + continue + } + + childBlocks = append(childBlocks, child) + } + + if len(childBlocks) > 1 { + // one or more forks detected + logbuf := strings.Builder{} + for idx, child := range childBlocks { + if cache.getForkByLeaf(child.Root) != nil { + cache.indexer.logger.Warnf("fork already exists for leaf %v [%v] (processing %v, scenario 2)", child.Slot, child.Root.String(), block.Slot) + } else { + cache.lastForkId++ + fork := newFork(cache.lastForkId, block.Slot, block.Root, child, currentForkId) + cache.addFork(fork) + + updatedRoots, updatedFork := cache.updateForkBlocks(child, fork.forkId, false) + newFork := &newForkInfo{ + fork: fork, + updateRoots: updatedRoots, + } + newForks = append(newForks, newFork) + + if updatedFork != nil { + updateForks = append(updateForks, updatedFork) + } + + fmt.Fprintf(&logbuf, ", head%v: %v [%v, %v upd]", idx+1, newFork.fork.leafSlot, newFork.fork.leafRoot.String(), len(newFork.updateRoots)) + } + } + + if logbuf.Len() > 0 { + cache.indexer.logger.Infof("new child forks detected (base %v [%v]%v)", block.Slot, block.Root.String(), logbuf.String()) + } + } + + // update fork ids of all blocks building on top of this block + updatedBlocks, updatedFork := cache.updateForkBlocks(block, currentForkId, true) + if updatedFork != nil { + updateForks = append(updateForks, updatedFork) + } + + // set detected fork id to the block + block.forkId = currentForkId + block.fokChecked = true + + // update fork head block if needed + fork := cache.getForkById(currentForkId) + if fork != nil { + lastBlock := block + if len(updatedBlocks) > 0 { + lastBlock = cache.indexer.blockCache.getBlockByRoot(phase0.Root(updatedBlocks[len(updatedBlocks)-1])) + } + if lastBlock != nil && (fork.headBlock == nil || lastBlock.Slot > fork.headBlock.Slot) { + fork.headBlock = lastBlock + } + } + + // persist new forks and updated blocks to the database + if len(newForks) > 0 || len(updatedBlocks) > 0 { + err := db.RunDBTransaction(func(tx *sqlx.Tx) error { + // add new forks + for _, newFork := range newForks { + err := db.InsertFork(newFork.fork.toDbFork(), tx) + if err != nil { + return err + } + + if len(newFork.updateRoots) > 0 { + err = db.UpdateUnfinalizedBlockForkId(newFork.updateRoots, uint64(newFork.fork.forkId), tx) + if err != nil { + return err + } + } + } + + // update blocks building on top of current block + if len(updatedBlocks) > 0 { + err := db.UpdateUnfinalizedBlockForkId(updatedBlocks, uint64(currentForkId), tx) + if err != nil { + return err + } + + cache.indexer.logger.Infof("updated %v blocks to fork %v", len(updatedBlocks), currentForkId) + } + + // update parents of forks building on top of current blocks chain segment + if len(updateForks) > 0 { + for _, updatedFork := range updateForks { + err := db.UpdateForkParent(updatedFork.baseRoot, uint64(updatedFork.parent), tx) + if err != nil { + return err + } + } + + cache.indexer.logger.Infof("updated %v fork parents", len(updateForks)) + } + + err := cache.updateForkState(tx) + if err != nil { + return fmt.Errorf("error while updating fork state: %v", err) + } + + return nil + }) + if err != nil { + return err + } + } + + return nil +} + +// updateForkBlocks updates the blocks building on top of the given block in the fork and returns the updated block roots. +func (cache *forkCache) updateForkBlocks(startBlock *Block, forkId ForkKey, skipStartBlock bool) (blockRoots [][]byte, updatedFork *updateForkInfo) { + blockRoots = [][]byte{} + + if !skipStartBlock { + blockRoots = append(blockRoots, startBlock.Root[:]) + } + + for { + nextBlocks := cache.indexer.blockCache.getBlocksByParentRoot(startBlock.Root) + if len(nextBlocks) == 0 { + break + } + + if len(nextBlocks) > 1 { + // potential fork ahead, check if the fork is already processed and has correct parent fork id + if forks := cache.getForkByBase(startBlock.Root); len(forks) > 0 && forks[0].parentFork != forkId { + for _, fork := range forks { + fork.parentFork = forkId + } + + updatedFork = &updateForkInfo{ + baseRoot: startBlock.Root[:], + parent: forkId, + } + } + break + } + + nextBlock := nextBlocks[0] + if !nextBlock.fokChecked { + break + } + + if nextBlock.forkId == forkId { + break + } + + nextBlock.forkId = forkId + blockRoots = append(blockRoots, nextBlock.Root[:]) + + startBlock = nextBlock + } + + return +} From ac5ba220d86ee5fde305d744811b069af079fd81 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 25 Aug 2024 09:56:30 +0200 Subject: [PATCH 8/9] update unfinalized blocks in batches to avoid too big queries --- indexer/beacon/forkdetection.go | 34 ++++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/indexer/beacon/forkdetection.go b/indexer/beacon/forkdetection.go index 40a83df..b1f7cca 100644 --- a/indexer/beacon/forkdetection.go +++ b/indexer/beacon/forkdetection.go @@ -21,7 +21,8 @@ type updateForkInfo struct { } // processBlock processes a block and detects new forks if any. -// It persists the new forks to the database, updates any subsequent blocks building on top of the given block and returns the fork ID. +// It persists the new forks to the database, sets the forkId of the supplied block +// and updates the forkId of all blocks affected by newly detected forks. func (cache *forkCache) processBlock(block *Block) error { cache.forkProcessLock.Lock() defer cache.forkProcessLock.Unlock() @@ -149,7 +150,7 @@ func (cache *forkCache) processBlock(block *Block) error { } if len(childBlocks) > 1 { - // one or more forks detected + // multiple blocks building on top of the current one, create a fork for each logbuf := strings.Builder{} for idx, child := range childBlocks { if cache.getForkByLeaf(child.Root) != nil { @@ -179,7 +180,7 @@ func (cache *forkCache) processBlock(block *Block) error { } } - // update fork ids of all blocks building on top of this block + // update fork ids of all blocks building on top of the current block updatedBlocks, updatedFork := cache.updateForkBlocks(block, currentForkId, true) if updatedFork != nil { updateForks = append(updateForks, updatedFork) @@ -204,6 +205,29 @@ func (cache *forkCache) processBlock(block *Block) error { // persist new forks and updated blocks to the database if len(newForks) > 0 || len(updatedBlocks) > 0 { err := db.RunDBTransaction(func(tx *sqlx.Tx) error { + // helper function to update unfinalized block fork ids in batches + updateUnfinalizedBlockForkIds := func(updateRoots [][]byte, forkId ForkKey) error { + batchSize := 1000 + numBatches := (len(updateRoots) + batchSize - 1) / batchSize + + for i := 0; i < numBatches; i++ { + start := i * batchSize + end := (i + 1) * batchSize + if end > len(updateRoots) { + end = len(updateRoots) + } + + batchRoots := updateRoots[start:end] + + err := db.UpdateUnfinalizedBlockForkId(batchRoots, uint64(forkId), tx) + if err != nil { + return err + } + } + + return nil + } + // add new forks for _, newFork := range newForks { err := db.InsertFork(newFork.fork.toDbFork(), tx) @@ -212,7 +236,7 @@ func (cache *forkCache) processBlock(block *Block) error { } if len(newFork.updateRoots) > 0 { - err = db.UpdateUnfinalizedBlockForkId(newFork.updateRoots, uint64(newFork.fork.forkId), tx) + err := updateUnfinalizedBlockForkIds(newFork.updateRoots, newFork.fork.forkId) if err != nil { return err } @@ -221,7 +245,7 @@ func (cache *forkCache) processBlock(block *Block) error { // update blocks building on top of current block if len(updatedBlocks) > 0 { - err := db.UpdateUnfinalizedBlockForkId(updatedBlocks, uint64(currentForkId), tx) + err := updateUnfinalizedBlockForkIds(updatedBlocks, currentForkId) if err != nil { return err } From dda9d012c87be3e45e0c0842cd4408c01a3ec8b2 Mon Sep 17 00:00:00 2001 From: pk910 Date: Sun, 25 Aug 2024 23:09:05 +0200 Subject: [PATCH 9/9] fix comments --- indexer/beacon/forkcache.go | 3 +++ indexer/beacon/forkdetection.go | 7 ++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/indexer/beacon/forkcache.go b/indexer/beacon/forkcache.go index d1f6be4..f0cebfe 100644 --- a/indexer/beacon/forkcache.go +++ b/indexer/beacon/forkcache.go @@ -77,6 +77,7 @@ func (cache *forkCache) addFork(fork *Fork) { cache.forkMap[fork.forkId] = fork } +// getForkByLeaf retrieves a fork from the cache by its leaf root. func (cache *forkCache) getForkByLeaf(leafRoot phase0.Root) *Fork { cache.cacheMutex.Lock() defer cache.cacheMutex.Unlock() @@ -90,6 +91,7 @@ func (cache *forkCache) getForkByLeaf(leafRoot phase0.Root) *Fork { return nil } +// getForkByBase retrieves forks from the cache by their base root. func (cache *forkCache) getForkByBase(baseRoot phase0.Root) []*Fork { cache.cacheMutex.Lock() defer cache.cacheMutex.Unlock() @@ -112,6 +114,7 @@ func (cache *forkCache) removeFork(forkId ForkKey) { delete(cache.forkMap, forkId) } +// getParentForkIds returns the parent fork ids of the given fork. func (cache *forkCache) getParentForkIds(forkId ForkKey) []ForkKey { parentForks := []ForkKey{forkId} diff --git a/indexer/beacon/forkdetection.go b/indexer/beacon/forkdetection.go index b1f7cca..12a6e34 100644 --- a/indexer/beacon/forkdetection.go +++ b/indexer/beacon/forkdetection.go @@ -40,8 +40,8 @@ func (cache *forkCache) processBlock(block *Block) error { parentIsProcessed := false parentIsFinalized := false - parentBlock := cache.indexer.blockCache.getBlockByRoot(*parentRoot) - if parentBlock == nil { + if parentBlock := cache.indexer.blockCache.getBlockByRoot(*parentRoot); parentBlock == nil { + // parent block might already be finalized, check if it's in the database blockHead := db.GetBlockHeadByRoot((*parentRoot)[:]) if blockHead != nil { parentForkId = ForkKey(blockHead.ForkId) @@ -102,9 +102,10 @@ func (cache *forkCache) processBlock(block *Block) error { } if !parentIsFinalized && len(otherChildren) == 1 { - // parent (a) is not finalized and our new detected fork the first fork based on this parent (c) + // parent (a) is not finalized and our new detected fork is the first fork based on this parent (c) // we need to create another fork for the other chain that starts from our fork base (b1, b2, ) // and update the blocks building on top of it + // we don't need to care about this if there are other forks already based on the parent // b2 // | // b1 c