From 87e0fe59283989cbe63796d72613a882a4f9504b Mon Sep 17 00:00:00 2001 From: Preston Van Loon Date: Fri, 8 May 2020 18:22:59 -0700 Subject: [PATCH] Refactor ReceiveBlock to receive the block root. (#5785) * refactor ReceiveBlockNoPubsub to receive block root * Refactor ReceiveBlock to receive block root * A few other minor refactoring to reduce block HTR * use arg, remove HTR * more slight refactoring, comments * fix test build * Merge refs/heads/master into receive-with-root * Merge refs/heads/master into receive-with-root --- beacon-chain/blockchain/process_block.go | 37 +++++------ beacon-chain/blockchain/process_block_test.go | 6 +- beacon-chain/blockchain/receive_block.go | 66 +++++++------------ beacon-chain/blockchain/testing/mock.go | 8 +-- beacon-chain/rpc/validator/proposer.go | 2 +- .../sync/initial-sync/blocks_queue_test.go | 13 +++- beacon-chain/sync/initial-sync/round_robin.go | 19 ++++-- beacon-chain/sync/pending_blocks_queue.go | 16 ++--- beacon-chain/sync/subscriber_beacon_blocks.go | 9 ++- 9 files changed, 89 insertions(+), 87 deletions(-) diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 26b0317c41a4..403f509f5766 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -11,7 +11,6 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/state" "github.com/prysmaticlabs/prysm/beacon-chain/flags" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" - "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" "github.com/prysmaticlabs/prysm/shared/attestationutil" "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/featureconfig" @@ -24,6 +23,8 @@ import ( var initialSyncBlockCacheSize = 2 * params.BeaconConfig().SlotsPerEpoch // onBlock is called when a gossip block is received. It runs regular state transition on the block. +// The block's signing root should be computed before calling this method to avoid redundant +// computation in this method and methods it calls into. // // Spec pseudocode definition: // def on_block(store: Store, block: BeaconBlock) -> None: @@ -54,7 +55,7 @@ var initialSyncBlockCacheSize = 2 * params.BeaconConfig().SlotsPerEpoch // # Update finalized checkpoint // if state.finalized_checkpoint.epoch > store.finalized_checkpoint.epoch: // store.finalized_checkpoint = state.finalized_checkpoint -func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock) (*stateTrie.BeaconState, error) { +func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock, blockRoot [32]byte) (*stateTrie.BeaconState, error) { ctx, span := trace.StartSpan(ctx, "blockchain.onBlock") defer span.End() @@ -70,13 +71,9 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock) return nil, err } - root, err := stateutil.BlockRoot(b) - if err != nil { - return nil, errors.Wrapf(err, "could not get signing root of block %d", b.Slot) - } log.WithFields(logrus.Fields{ "slot": b.Slot, - "root": fmt.Sprintf("0x%s...", hex.EncodeToString(root[:])[:8]), + "root": fmt.Sprintf("0x%s...", hex.EncodeToString(blockRoot[:])[:8]), }).Debug("Executing state transition on block") postState, err := state.ExecuteStateTransition(ctx, preState, signed) @@ -88,16 +85,16 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock) return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot) } - if err := s.insertBlockToForkChoiceStore(ctx, b, root, postState); err != nil { + if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, postState); err != nil { return nil, errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot) } if featureconfig.Get().NewStateMgmt { - if err := s.stateGen.SaveState(ctx, root, postState); err != nil { + if err := s.stateGen.SaveState(ctx, blockRoot, postState); err != nil { return nil, errors.Wrap(err, "could not save state") } } else { - if err := s.beaconDB.SaveState(ctx, postState, root); err != nil { + if err := s.beaconDB.SaveState(ctx, postState, blockRoot); err != nil { return nil, errors.Wrap(err, "could not save state") } } @@ -191,7 +188,9 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock) // onBlockInitialSyncStateTransition is called when an initial sync block is received. // It runs state transition on the block and without any BLS verification. The excluded BLS verification // includes attestation's aggregated signature. It also does not save attestations. -func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed *ethpb.SignedBeaconBlock) error { +// The block's signing root should be computed before calling this method to avoid redundant +// computation in this method and methods it calls into. +func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed *ethpb.SignedBeaconBlock, blockRoot [32]byte) error { ctx, span := trace.StartSpan(ctx, "blockchain.onBlock") defer span.End() @@ -219,31 +218,27 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed return errors.Wrap(err, "could not execute state transition") } - root, err := stateutil.BlockRoot(b) - if err != nil { - return errors.Wrapf(err, "could not get signing root of block %d", b.Slot) - } if !featureconfig.Get().NoInitSyncBatchSaveBlocks { - s.saveInitSyncBlock(root, signed) + s.saveInitSyncBlock(blockRoot, signed) } else { if err := s.beaconDB.SaveBlock(ctx, signed); err != nil { return errors.Wrapf(err, "could not save block from slot %d", b.Slot) } } - if err := s.insertBlockToForkChoiceStore(ctx, b, root, postState); err != nil { + if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, postState); err != nil { return errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot) } if featureconfig.Get().NewStateMgmt { - if err := s.stateGen.SaveState(ctx, root, postState); err != nil { + if err := s.stateGen.SaveState(ctx, blockRoot, postState); err != nil { return errors.Wrap(err, "could not save state") } } else { s.initSyncStateLock.Lock() defer s.initSyncStateLock.Unlock() - s.initSyncState[root] = postState.Copy() - s.filterBoundaryCandidates(ctx, root, postState) + s.initSyncState[blockRoot] = postState.Copy() + s.filterBoundaryCandidates(ctx, blockRoot, postState) } if flags.Get().EnableArchive { @@ -341,7 +336,7 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed } if !featureconfig.Get().NewStateMgmt && helpers.IsEpochStart(postState.Slot()) { - if err := s.beaconDB.SaveState(ctx, postState, root); err != nil { + if err := s.beaconDB.SaveState(ctx, postState, blockRoot); err != nil { return errors.Wrap(err, "could not save state") } } diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go index c5349f8c751d..64abf0791e28 100644 --- a/beacon-chain/blockchain/process_block_test.go +++ b/beacon-chain/blockchain/process_block_test.go @@ -119,7 +119,11 @@ func TestStore_OnBlock(t *testing.T) { service.prevFinalizedCheckpt = ðpb.Checkpoint{Root: validGenesisRoot[:]} service.finalizedCheckpt.Root = roots[0] - _, err := service.onBlock(ctx, ðpb.SignedBeaconBlock{Block: tt.blk}) + root, err := stateutil.BlockRoot(tt.blk) + if err != nil { + t.Error(err) + } + _, err = service.onBlock(ctx, ðpb.SignedBeaconBlock{Block: tt.blk}, root) if err == nil || !strings.Contains(err.Error(), tt.wantErrString) { t.Errorf("Store.OnBlock() error = %v, wantErr = %v", err, tt.wantErrString) } diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 79203b1e8475..33141276a6bc 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -12,7 +12,6 @@ import ( statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" - "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/traceutil" "github.com/sirupsen/logrus" @@ -21,10 +20,10 @@ import ( // BlockReceiver interface defines the methods of chain service receive and processing new blocks. type BlockReceiver interface { - ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock) error - ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock) error - ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock) error - ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock) error + ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error + ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error + ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error + ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error HasInitSyncBlock(root [32]byte) bool } @@ -34,21 +33,16 @@ type BlockReceiver interface { // 2. Validate block, apply state transition and update check points // 3. Apply fork choice to the processed block // 4. Save latest head info -func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock) error { +func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error { ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlock") defer span.End() - root, err := stateutil.BlockRoot(block.Block) - if err != nil { - return errors.Wrap(err, "could not get signing root on received block") - } - // Broadcast the new block to the network. if err := s.p2p.Broadcast(ctx, block); err != nil { return errors.Wrap(err, "could not broadcast block") } log.WithFields(logrus.Fields{ - "blockRoot": hex.EncodeToString(root[:]), + "blockRoot": hex.EncodeToString(blockRoot[:]), }).Debug("Broadcasting block") if err := captureSentTimeMetric(uint64(s.genesisTime.Unix()), block.Block.Slot); err != nil { @@ -56,7 +50,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlo log.Warnf("Could not capture block sent time metric: %v", err) } - if err := s.ReceiveBlockNoPubsub(ctx, block); err != nil { + if err := s.ReceiveBlockNoPubsub(ctx, block, blockRoot); err != nil { return err } @@ -68,13 +62,13 @@ func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlo // 1. Validate block, apply state transition and update check points // 2. Apply fork choice to the processed block // 3. Save latest head info -func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock) error { +func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error { ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlockNoPubsub") defer span.End() blockCopy := stateTrie.CopySignedBeaconBlock(block) // Apply state transition on the new block. - postState, err := s.onBlock(ctx, blockCopy) + postState, err := s.onBlock(ctx, blockCopy, blockRoot) if err != nil { err := errors.Wrap(err, "could not process block") traceutil.AnnotateError(span, err) @@ -94,13 +88,8 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB defer s.epochParticipationLock.Unlock() s.epochParticipation[helpers.SlotToEpoch(blockCopy.Block.Slot)] = precompute.Balances - root, err := stateutil.BlockRoot(blockCopy.Block) - if err != nil { - return errors.Wrap(err, "could not get signing root on received block") - } - if featureconfig.Get().DisableForkChoice && block.Block.Slot > s.headSlot() { - if err := s.saveHead(ctx, root); err != nil { + if err := s.saveHead(ctx, blockRoot); err != nil { return errors.Wrap(err, "could not save head") } } else { @@ -114,7 +103,7 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB Type: statefeed.BlockProcessed, Data: &statefeed.BlockProcessedData{ Slot: blockCopy.Block.Slot, - BlockRoot: root, + BlockRoot: blockRoot, Verified: true, }, }) @@ -123,7 +112,7 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB reportSlotMetrics(blockCopy.Block.Slot, s.headSlot(), s.CurrentSlot(), s.finalizedCheckpt) // Log block sync status. - logBlockSyncStatus(blockCopy.Block, root, s.finalizedCheckpt) + logBlockSyncStatus(blockCopy.Block, blockRoot, s.finalizedCheckpt) // Log state transition data. logStateTransitionData(blockCopy.Block) @@ -135,29 +124,25 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB // that are preformed blocks that is received from initial sync service. The operations consists of: // 1. Validate block, apply state transition and update check points // 2. Save latest head info -func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock) error { +func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error { ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlockNoForkchoice") defer span.End() blockCopy := stateTrie.CopySignedBeaconBlock(block) // Apply state transition on the new block. - _, err := s.onBlock(ctx, blockCopy) + _, err := s.onBlock(ctx, blockCopy, blockRoot) if err != nil { err := errors.Wrap(err, "could not process block") traceutil.AnnotateError(span, err) return err } - root, err := stateutil.BlockRoot(blockCopy.Block) - if err != nil { - return errors.Wrap(err, "could not get signing root on received block") - } cachedHeadRoot, err := s.HeadRoot(ctx) if err != nil { return errors.Wrap(err, "could not get head root from cache") } - if !bytes.Equal(root[:], cachedHeadRoot) { - if err := s.saveHead(ctx, root); err != nil { + if !bytes.Equal(blockRoot[:], cachedHeadRoot) { + if err := s.saveHead(ctx, blockRoot); err != nil { return errors.Wrap(err, "could not save head") } } @@ -167,7 +152,7 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth Type: statefeed.BlockProcessed, Data: &statefeed.BlockProcessedData{ Slot: blockCopy.Block.Slot, - BlockRoot: root, + BlockRoot: blockRoot, Verified: true, }, }) @@ -176,7 +161,7 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth reportSlotMetrics(blockCopy.Block.Slot, s.headSlot(), s.CurrentSlot(), s.finalizedCheckpt) // Log block sync status. - logBlockSyncStatus(blockCopy.Block, root, s.finalizedCheckpt) + logBlockSyncStatus(blockCopy.Block, blockRoot, s.finalizedCheckpt) // Log state transition data. logStateTransitionData(blockCopy.Block) @@ -191,30 +176,25 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth // ReceiveBlockNoVerify runs state transition on a input block without verifying the block's BLS contents. // Depends on the security model, this is the "minimal" work a node can do to sync the chain. // It simulates light client behavior and assumes 100% trust with the syncing peer. -func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock) error { +func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error { ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlockNoVerify") defer span.End() blockCopy := stateTrie.CopySignedBeaconBlock(block) // Apply state transition on the incoming newly received blockCopy without verifying its BLS contents. - if err := s.onBlockInitialSyncStateTransition(ctx, blockCopy); err != nil { + if err := s.onBlockInitialSyncStateTransition(ctx, blockCopy, blockRoot); err != nil { err := errors.Wrap(err, "could not process block") traceutil.AnnotateError(span, err) return err } - root, err := stateutil.BlockRoot(blockCopy.Block) - if err != nil { - return errors.Wrap(err, "could not get signing root on received blockCopy") - } - cachedHeadRoot, err := s.HeadRoot(ctx) if err != nil { return errors.Wrap(err, "could not get head root from cache") } - if !bytes.Equal(root[:], cachedHeadRoot) { - if err := s.saveHeadNoDB(ctx, blockCopy, root); err != nil { + if !bytes.Equal(blockRoot[:], cachedHeadRoot) { + if err := s.saveHeadNoDB(ctx, blockCopy, blockRoot); err != nil { err := errors.Wrap(err, "could not save head") traceutil.AnnotateError(span, err) return err @@ -226,7 +206,7 @@ func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedB Type: statefeed.BlockProcessed, Data: &statefeed.BlockProcessedData{ Slot: blockCopy.Block.Slot, - BlockRoot: root, + BlockRoot: blockRoot, Verified: false, }, }) diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index a6929997f103..d3f064554a98 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -107,22 +107,22 @@ func (mon *MockOperationNotifier) OperationFeed() *event.Feed { } // ReceiveBlock mocks ReceiveBlock method in chain service. -func (ms *ChainService) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock) error { +func (ms *ChainService) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error { return nil } // ReceiveBlockNoVerify mocks ReceiveBlockNoVerify method in chain service. -func (ms *ChainService) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock) error { +func (ms *ChainService) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error { return nil } // ReceiveBlockNoPubsub mocks ReceiveBlockNoPubsub method in chain service. -func (ms *ChainService) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock) error { +func (ms *ChainService) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error { return nil } // ReceiveBlockNoPubsubForkchoice mocks ReceiveBlockNoPubsubForkchoice method in chain service. -func (ms *ChainService) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock) error { +func (ms *ChainService) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error { if ms.State == nil { ms.State = &stateTrie.BeaconState{} } diff --git a/beacon-chain/rpc/validator/proposer.go b/beacon-chain/rpc/validator/proposer.go index fa2367cbed5e..a6220b47e463 100644 --- a/beacon-chain/rpc/validator/proposer.go +++ b/beacon-chain/rpc/validator/proposer.go @@ -133,7 +133,7 @@ func (vs *Server) ProposeBlock(ctx context.Context, blk *ethpb.SignedBeaconBlock }) }() - if err := vs.BlockReceiver.ReceiveBlock(ctx, blk); err != nil { + if err := vs.BlockReceiver.ReceiveBlock(ctx, blk, root); err != nil { return nil, status.Errorf(codes.Internal, "Could not process beacon block: %v", err) } diff --git a/beacon-chain/sync/initial-sync/blocks_queue_test.go b/beacon-chain/sync/initial-sync/blocks_queue_test.go index 645948e9e2fe..d3c8ef9b9fe9 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue_test.go +++ b/beacon-chain/sync/initial-sync/blocks_queue_test.go @@ -3,6 +3,7 @@ package initialsync import ( "context" "fmt" + "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" "testing" eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -274,11 +275,19 @@ func TestBlocksQueueLoop(t *testing.T) { return fmt.Errorf("beacon node doesn't have a block in db with root %#x", block.Block.ParentRoot) } if featureconfig.Get().InitSyncNoVerify { - if err := mc.ReceiveBlockNoVerify(ctx, block); err != nil { + root, err := stateutil.BlockRoot(block.Block) + if err != nil { + return err + } + if err := mc.ReceiveBlockNoVerify(ctx, block, root); err != nil { return err } } else { - if err := mc.ReceiveBlockNoPubsubForkchoice(ctx, block); err != nil { + root, err := stateutil.BlockRoot(block.Block) + if err != nil { + return err + } + if err := mc.ReceiveBlockNoPubsubForkchoice(ctx, block, root); err != nil { return err } } diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 57ba83633c37..51062b9274d2 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -58,7 +58,12 @@ func (s *Service) roundRobinSync(genesis time.Time) error { // Step 1 - Sync to end of finalized epoch. for blk := range queue.fetchedBlocks { s.logSyncStatus(genesis, blk.Block, counter) - if err := s.processBlock(ctx, blk); err != nil { + root, err := stateutil.BlockRoot(blk.Block) + if err != nil { + log.WithError(err).Info("Cannot determine root of block") + continue + } + if err := s.processBlock(ctx, blk, root); err != nil { log.WithError(err).Info("Block is invalid") continue } @@ -105,7 +110,11 @@ func (s *Service) roundRobinSync(genesis time.Time) error { for _, blk := range resp { s.logSyncStatus(genesis, blk.Block, counter) - if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk); err != nil { + root, err := stateutil.BlockRoot(blk.Block) + if err != nil { + return err + } + if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk, root); err != nil { log.WithError(err).Error("Failed to process block, exiting init sync") return nil } @@ -160,7 +169,7 @@ func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, counter ) } -func (s *Service) processBlock(ctx context.Context, blk *eth.SignedBeaconBlock) error { +func (s *Service) processBlock(ctx context.Context, blk *eth.SignedBeaconBlock, blockRoot [32]byte) error { parentRoot := bytesutil.ToBytes32(blk.Block.ParentRoot) if !s.db.HasBlock(ctx, parentRoot) && !s.chain.HasInitSyncBlock(parentRoot) { return fmt.Errorf("beacon node doesn't have a block in db with root %#x", blk.Block.ParentRoot) @@ -170,11 +179,11 @@ func (s *Service) processBlock(ctx context.Context, blk *eth.SignedBeaconBlock) Data: &blockfeed.ReceivedBlockData{SignedBlock: blk}, }) if featureconfig.Get().InitSyncNoVerify { - if err := s.chain.ReceiveBlockNoVerify(ctx, blk); err != nil { + if err := s.chain.ReceiveBlockNoVerify(ctx, blk, blockRoot); err != nil { return err } } else { - if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk); err != nil { + if err := s.chain.ReceiveBlockNoPubsubForkchoice(ctx, blk, blockRoot); err != nil { return err } } diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index 1f7e7ba6e06c..55f2da453f42 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -105,7 +105,14 @@ func (r *Service) processPendingBlocks(ctx context.Context) error { continue } - if err := r.chain.ReceiveBlockNoPubsub(ctx, b); err != nil { + blkRoot, err := stateutil.BlockRoot(b.Block) + if err != nil { + traceutil.AnnotateError(span, err) + span.End() + return err + } + + if err := r.chain.ReceiveBlockNoPubsub(ctx, b, blkRoot); err != nil { log.Errorf("Could not process block from slot %d: %v", b.Block.Slot, err) traceutil.AnnotateError(span, err) } @@ -115,13 +122,6 @@ func (r *Service) processPendingBlocks(ctx context.Context) error { log.WithError(err).Error("Failed to broadcast block") } - blkRoot, err := stateutil.BlockRoot(b.Block) - if err != nil { - traceutil.AnnotateError(span, err) - span.End() - return err - } - r.pendingQueueLock.Lock() delete(r.slotToPendingBlocks, uint64(s)) delete(r.seenPendingBlocks, blkRoot) diff --git a/beacon-chain/sync/subscriber_beacon_blocks.go b/beacon-chain/sync/subscriber_beacon_blocks.go index d0d3d72ae849..9cc41672c662 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks.go +++ b/beacon-chain/sync/subscriber_beacon_blocks.go @@ -3,6 +3,7 @@ package sync import ( "context" "errors" + "github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil" "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -26,6 +27,11 @@ func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) block := signed.Block + root, err := stateutil.BlockRoot(block) + if err != nil { + return err + } + // Broadcast the block on a feed to notify other services in the beacon node // of a received block (even if it does not process correctly through a state transition). r.blockNotifier.BlockFeed().Send(&feed.Event{ @@ -35,8 +41,7 @@ func (r *Service) beaconBlockSubscriber(ctx context.Context, msg proto.Message) }, }) - err := r.chain.ReceiveBlockNoPubsub(ctx, signed) - if err != nil { + if err := r.chain.ReceiveBlockNoPubsub(ctx, signed, root); err != nil { interop.WriteBlockToDisk(signed, true /*failed*/) }