Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ReceiveBlock to receive the block root. #5785

Merged
merged 8 commits into from
May 9, 2020
37 changes: 16 additions & 21 deletions beacon-chain/blockchain/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/prysmaticlabs/prysm/shared/attestationutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
Expand All @@ -24,6 +23,8 @@ import (
var initialSyncBlockCacheSize = 2 * params.BeaconConfig().SlotsPerEpoch

// onBlock is called when a gossip block is received. It runs regular state transition on the block.
// The block's signing root should be computed before calling this method to avoid redundant
// computation in this method and methods it calls into.
//
// Spec pseudocode definition:
// def on_block(store: Store, block: BeaconBlock) -> None:
Expand Down Expand Up @@ -54,7 +55,7 @@ var initialSyncBlockCacheSize = 2 * params.BeaconConfig().SlotsPerEpoch
// # Update finalized checkpoint
// if state.finalized_checkpoint.epoch > store.finalized_checkpoint.epoch:
// store.finalized_checkpoint = state.finalized_checkpoint
func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock) (*stateTrie.BeaconState, error) {
func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock, blockRoot [32]byte) (*stateTrie.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "blockchain.onBlock")
defer span.End()

Expand All @@ -70,13 +71,9 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock)
return nil, err
}

root, err := stateutil.BlockRoot(b)
if err != nil {
return nil, errors.Wrapf(err, "could not get signing root of block %d", b.Slot)
}
log.WithFields(logrus.Fields{
"slot": b.Slot,
"root": fmt.Sprintf("0x%s...", hex.EncodeToString(root[:])[:8]),
"root": fmt.Sprintf("0x%s...", hex.EncodeToString(blockRoot[:])[:8]),
}).Debug("Executing state transition on block")

postState, err := state.ExecuteStateTransition(ctx, preState, signed)
Expand All @@ -88,16 +85,16 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock)
return nil, errors.Wrapf(err, "could not save block from slot %d", b.Slot)
}

if err := s.insertBlockToForkChoiceStore(ctx, b, root, postState); err != nil {
if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, postState); err != nil {
return nil, errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
}

if featureconfig.Get().NewStateMgmt {
if err := s.stateGen.SaveState(ctx, root, postState); err != nil {
if err := s.stateGen.SaveState(ctx, blockRoot, postState); err != nil {
return nil, errors.Wrap(err, "could not save state")
}
} else {
if err := s.beaconDB.SaveState(ctx, postState, root); err != nil {
if err := s.beaconDB.SaveState(ctx, postState, blockRoot); err != nil {
return nil, errors.Wrap(err, "could not save state")
}
}
Expand Down Expand Up @@ -191,7 +188,9 @@ func (s *Service) onBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock)
// onBlockInitialSyncStateTransition is called when an initial sync block is received.
// It runs state transition on the block and without any BLS verification. The excluded BLS verification
// includes attestation's aggregated signature. It also does not save attestations.
func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed *ethpb.SignedBeaconBlock) error {
// The block's signing root should be computed before calling this method to avoid redundant
// computation in this method and methods it calls into.
func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "blockchain.onBlock")
defer span.End()

Expand Down Expand Up @@ -219,31 +218,27 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
return errors.Wrap(err, "could not execute state transition")
}

root, err := stateutil.BlockRoot(b)
if err != nil {
return errors.Wrapf(err, "could not get signing root of block %d", b.Slot)
}
if !featureconfig.Get().NoInitSyncBatchSaveBlocks {
s.saveInitSyncBlock(root, signed)
s.saveInitSyncBlock(blockRoot, signed)
} else {
if err := s.beaconDB.SaveBlock(ctx, signed); err != nil {
return errors.Wrapf(err, "could not save block from slot %d", b.Slot)
}
}

if err := s.insertBlockToForkChoiceStore(ctx, b, root, postState); err != nil {
if err := s.insertBlockToForkChoiceStore(ctx, b, blockRoot, postState); err != nil {
return errors.Wrapf(err, "could not insert block %d to fork choice store", b.Slot)
}

if featureconfig.Get().NewStateMgmt {
if err := s.stateGen.SaveState(ctx, root, postState); err != nil {
if err := s.stateGen.SaveState(ctx, blockRoot, postState); err != nil {
return errors.Wrap(err, "could not save state")
}
} else {
s.initSyncStateLock.Lock()
defer s.initSyncStateLock.Unlock()
s.initSyncState[root] = postState.Copy()
s.filterBoundaryCandidates(ctx, root, postState)
s.initSyncState[blockRoot] = postState.Copy()
s.filterBoundaryCandidates(ctx, blockRoot, postState)
}

if flags.Get().EnableArchive {
Expand Down Expand Up @@ -341,7 +336,7 @@ func (s *Service) onBlockInitialSyncStateTransition(ctx context.Context, signed
}

if !featureconfig.Get().NewStateMgmt && helpers.IsEpochStart(postState.Slot()) {
if err := s.beaconDB.SaveState(ctx, postState, root); err != nil {
if err := s.beaconDB.SaveState(ctx, postState, blockRoot); err != nil {
return errors.Wrap(err, "could not save state")
}
}
Expand Down
6 changes: 5 additions & 1 deletion beacon-chain/blockchain/process_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,11 @@ func TestStore_OnBlock(t *testing.T) {
service.prevFinalizedCheckpt = &ethpb.Checkpoint{Root: validGenesisRoot[:]}
service.finalizedCheckpt.Root = roots[0]

_, err := service.onBlock(ctx, &ethpb.SignedBeaconBlock{Block: tt.blk})
root, err := stateutil.BlockRoot(tt.blk)
if err != nil {
t.Error(err)
}
_, err = service.onBlock(ctx, &ethpb.SignedBeaconBlock{Block: tt.blk}, root)
if err == nil || !strings.Contains(err.Error(), tt.wantErrString) {
t.Errorf("Store.OnBlock() error = %v, wantErr = %v", err, tt.wantErrString)
}
Expand Down
66 changes: 23 additions & 43 deletions beacon-chain/blockchain/receive_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
Expand All @@ -21,10 +20,10 @@ import (

// BlockReceiver interface defines the methods of chain service receive and processing new blocks.
type BlockReceiver interface {
ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock) error
ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock) error
ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock) error
ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock) error
ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error
HasInitSyncBlock(root [32]byte) bool
}

Expand All @@ -34,29 +33,24 @@ type BlockReceiver interface {
// 2. Validate block, apply state transition and update check points
// 3. Apply fork choice to the processed block
// 4. Save latest head info
func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock) error {
func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlock")
defer span.End()

root, err := stateutil.BlockRoot(block.Block)
if err != nil {
return errors.Wrap(err, "could not get signing root on received block")
}

// Broadcast the new block to the network.
if err := s.p2p.Broadcast(ctx, block); err != nil {
return errors.Wrap(err, "could not broadcast block")
}
log.WithFields(logrus.Fields{
"blockRoot": hex.EncodeToString(root[:]),
"blockRoot": hex.EncodeToString(blockRoot[:]),
}).Debug("Broadcasting block")

if err := captureSentTimeMetric(uint64(s.genesisTime.Unix()), block.Block.Slot); err != nil {
// If a node fails to capture metric, this shouldn't cause the block processing to fail.
log.Warnf("Could not capture block sent time metric: %v", err)
}

if err := s.ReceiveBlockNoPubsub(ctx, block); err != nil {
if err := s.ReceiveBlockNoPubsub(ctx, block, blockRoot); err != nil {
return err
}

Expand All @@ -68,13 +62,13 @@ func (s *Service) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlo
// 1. Validate block, apply state transition and update check points
// 2. Apply fork choice to the processed block
// 3. Save latest head info
func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock) error {
func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlockNoPubsub")
defer span.End()
blockCopy := stateTrie.CopySignedBeaconBlock(block)

// Apply state transition on the new block.
postState, err := s.onBlock(ctx, blockCopy)
postState, err := s.onBlock(ctx, blockCopy, blockRoot)
if err != nil {
err := errors.Wrap(err, "could not process block")
traceutil.AnnotateError(span, err)
Expand All @@ -94,13 +88,8 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB
defer s.epochParticipationLock.Unlock()
s.epochParticipation[helpers.SlotToEpoch(blockCopy.Block.Slot)] = precompute.Balances

root, err := stateutil.BlockRoot(blockCopy.Block)
if err != nil {
return errors.Wrap(err, "could not get signing root on received block")
}

if featureconfig.Get().DisableForkChoice && block.Block.Slot > s.headSlot() {
if err := s.saveHead(ctx, root); err != nil {
if err := s.saveHead(ctx, blockRoot); err != nil {
return errors.Wrap(err, "could not save head")
}
} else {
Expand All @@ -114,7 +103,7 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: blockCopy.Block.Slot,
BlockRoot: root,
BlockRoot: blockRoot,
Verified: true,
},
})
Expand All @@ -123,7 +112,7 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB
reportSlotMetrics(blockCopy.Block.Slot, s.headSlot(), s.CurrentSlot(), s.finalizedCheckpt)

// Log block sync status.
logBlockSyncStatus(blockCopy.Block, root, s.finalizedCheckpt)
logBlockSyncStatus(blockCopy.Block, blockRoot, s.finalizedCheckpt)

// Log state transition data.
logStateTransitionData(blockCopy.Block)
Expand All @@ -135,29 +124,25 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedB
// that are preformed blocks that is received from initial sync service. The operations consists of:
// 1. Validate block, apply state transition and update check points
// 2. Save latest head info
func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock) error {
func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlockNoForkchoice")
defer span.End()
blockCopy := stateTrie.CopySignedBeaconBlock(block)

// Apply state transition on the new block.
_, err := s.onBlock(ctx, blockCopy)
_, err := s.onBlock(ctx, blockCopy, blockRoot)
if err != nil {
err := errors.Wrap(err, "could not process block")
traceutil.AnnotateError(span, err)
return err
}

root, err := stateutil.BlockRoot(blockCopy.Block)
if err != nil {
return errors.Wrap(err, "could not get signing root on received block")
}
cachedHeadRoot, err := s.HeadRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get head root from cache")
}
if !bytes.Equal(root[:], cachedHeadRoot) {
if err := s.saveHead(ctx, root); err != nil {
if !bytes.Equal(blockRoot[:], cachedHeadRoot) {
if err := s.saveHead(ctx, blockRoot); err != nil {
return errors.Wrap(err, "could not save head")
}
}
Expand All @@ -167,7 +152,7 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: blockCopy.Block.Slot,
BlockRoot: root,
BlockRoot: blockRoot,
Verified: true,
},
})
Expand All @@ -176,7 +161,7 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth
reportSlotMetrics(blockCopy.Block.Slot, s.headSlot(), s.CurrentSlot(), s.finalizedCheckpt)

// Log block sync status.
logBlockSyncStatus(blockCopy.Block, root, s.finalizedCheckpt)
logBlockSyncStatus(blockCopy.Block, blockRoot, s.finalizedCheckpt)

// Log state transition data.
logStateTransitionData(blockCopy.Block)
Expand All @@ -191,30 +176,25 @@ func (s *Service) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *eth
// ReceiveBlockNoVerify runs state transition on a input block without verifying the block's BLS contents.
// Depends on the security model, this is the "minimal" work a node can do to sync the chain.
// It simulates light client behavior and assumes 100% trust with the syncing peer.
func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock) error {
func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.ReceiveBlockNoVerify")
defer span.End()
blockCopy := stateTrie.CopySignedBeaconBlock(block)

// Apply state transition on the incoming newly received blockCopy without verifying its BLS contents.
if err := s.onBlockInitialSyncStateTransition(ctx, blockCopy); err != nil {
if err := s.onBlockInitialSyncStateTransition(ctx, blockCopy, blockRoot); err != nil {
err := errors.Wrap(err, "could not process block")
traceutil.AnnotateError(span, err)
return err
}

root, err := stateutil.BlockRoot(blockCopy.Block)
if err != nil {
return errors.Wrap(err, "could not get signing root on received blockCopy")
}

cachedHeadRoot, err := s.HeadRoot(ctx)
if err != nil {
return errors.Wrap(err, "could not get head root from cache")
}

if !bytes.Equal(root[:], cachedHeadRoot) {
if err := s.saveHeadNoDB(ctx, blockCopy, root); err != nil {
if !bytes.Equal(blockRoot[:], cachedHeadRoot) {
if err := s.saveHeadNoDB(ctx, blockCopy, blockRoot); err != nil {
err := errors.Wrap(err, "could not save head")
traceutil.AnnotateError(span, err)
return err
Expand All @@ -226,7 +206,7 @@ func (s *Service) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedB
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: blockCopy.Block.Slot,
BlockRoot: root,
BlockRoot: blockRoot,
Verified: false,
},
})
Expand Down
8 changes: 4 additions & 4 deletions beacon-chain/blockchain/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,22 @@ func (mon *MockOperationNotifier) OperationFeed() *event.Feed {
}

// ReceiveBlock mocks ReceiveBlock method in chain service.
func (ms *ChainService) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock) error {
func (ms *ChainService) ReceiveBlock(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
return nil
}

// ReceiveBlockNoVerify mocks ReceiveBlockNoVerify method in chain service.
func (ms *ChainService) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock) error {
func (ms *ChainService) ReceiveBlockNoVerify(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
return nil
}

// ReceiveBlockNoPubsub mocks ReceiveBlockNoPubsub method in chain service.
func (ms *ChainService) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock) error {
func (ms *ChainService) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
return nil
}

// ReceiveBlockNoPubsubForkchoice mocks ReceiveBlockNoPubsubForkchoice method in chain service.
func (ms *ChainService) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock) error {
func (ms *ChainService) ReceiveBlockNoPubsubForkchoice(ctx context.Context, block *ethpb.SignedBeaconBlock, blockRoot [32]byte) error {
if ms.State == nil {
ms.State = &stateTrie.BeaconState{}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/rpc/validator/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (vs *Server) ProposeBlock(ctx context.Context, blk *ethpb.SignedBeaconBlock
})
}()

if err := vs.BlockReceiver.ReceiveBlock(ctx, blk); err != nil {
if err := vs.BlockReceiver.ReceiveBlock(ctx, blk, root); err != nil {
return nil, status.Errorf(codes.Internal, "Could not process beacon block: %v", err)
}

Expand Down
13 changes: 11 additions & 2 deletions beacon-chain/sync/initial-sync/blocks_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package initialsync
import (
"context"
"fmt"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"testing"

eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
Expand Down Expand Up @@ -274,11 +275,19 @@ func TestBlocksQueueLoop(t *testing.T) {
return fmt.Errorf("beacon node doesn't have a block in db with root %#x", block.Block.ParentRoot)
}
if featureconfig.Get().InitSyncNoVerify {
if err := mc.ReceiveBlockNoVerify(ctx, block); err != nil {
root, err := stateutil.BlockRoot(block.Block)
if err != nil {
return err
}
if err := mc.ReceiveBlockNoVerify(ctx, block, root); err != nil {
return err
}
} else {
if err := mc.ReceiveBlockNoPubsubForkchoice(ctx, block); err != nil {
root, err := stateutil.BlockRoot(block.Block)
if err != nil {
return err
}
if err := mc.ReceiveBlockNoPubsubForkchoice(ctx, block, root); err != nil {
return err
}
}
Expand Down
Loading