Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Regen historical states for new-state-mgmt compatibility #5261

Merged
merged 10 commits into from
Mar 31, 2020
4 changes: 4 additions & 0 deletions beacon-chain/blockchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,10 @@ func (s *Service) pruneGarbageState(ctx context.Context, slot uint64) error {
return err
}

if err := s.beaconDB.SaveLastArchivedIndex(ctx, 0); err != nil {
return err
}

return nil
}

Expand Down
5 changes: 4 additions & 1 deletion beacon-chain/db/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ go_library(
"attestations.go",
"backup.go",
"blocks.go",
"check_state.go",
"check_historical_state.go",
"checkpoint.go",
"deposit_contract.go",
"encoding.go",
"finalized_block_roots.go",
"kv.go",
"operations.go",
"powchain.go",
"regen_historical_states.go",
"schema.go",
"slashings.go",
"state.go",
Expand All @@ -28,13 +29,15 @@ go_library(
deps = [
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/db/iface:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/stateutil:go_default_library",
"//proto/beacon/db:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/cmd:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",
"//shared/sliceutil:go_default_library",
Expand Down
16 changes: 16 additions & 0 deletions beacon-chain/db/kv/archived_point.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kv

import (
"context"
"encoding/binary"

"github.com/prysmaticlabs/prysm/shared/bytesutil"
bolt "go.etcd.io/bbolt"
Expand Down Expand Up @@ -29,6 +30,21 @@ func (k *Store) SaveLastArchivedIndex(ctx context.Context, index uint64) error {
})
}

// LastArchivedIndex from the db.
func (k *Store) LastArchivedIndex(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndex")
defer span.End()
var index uint64
err := k.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(archivedIndexRootBucket)
b := bucket.Get(lastArchivedIndexKey)
index = binary.LittleEndian.Uint64(b)
return nil
})

return index, err
}

// LastArchivedIndexRoot from the db.
func (k *Store) LastArchivedIndexRoot(ctx context.Context) [32]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndexRoot")
Expand Down
55 changes: 55 additions & 0 deletions beacon-chain/db/kv/check_historical_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package kv

import (
"context"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/cmd"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
bolt "go.etcd.io/bbolt"
)

var historicalStateDeletedKey = []byte("historical-states-deleted")

func (kv *Store) ensureNewStateServiceCompatible(ctx context.Context) error {
if !featureconfig.Get().NewStateMgmt {
return kv.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(newStateServiceCompatibleBucket)
return bkt.Put(historicalStateDeletedKey, []byte{0x01})
})
}

var historicalStateDeleted bool
kv.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(newStateServiceCompatibleBucket)
v := bkt.Get(historicalStateDeletedKey)
historicalStateDeleted = len(v) == 1 && v[0] == 0x01
return nil
})

regenHistoricalStatesConfirmed := false
var err error
if historicalStateDeleted {
actionText := "Looks like you stopped using --new-state-mgmt. To reuse it, the node will need " +
"to generate and save historical states. The process may take a while, - do you want to proceed? (Y/N)"
deniedText := "Historical states will not be generated. Please remove usage --new-state-mgmt"

regenHistoricalStatesConfirmed, err = cmd.ConfirmAction(actionText, deniedText)
if err != nil {
return err
}

if !regenHistoricalStatesConfirmed {
return errors.New("exiting... please do not run with flag --new-state-mgmt")
}

if err := kv.regenHistoricalStates(ctx); err != nil {
return errors.Wrap(err, "could not regenerate historical states, please retry")
}
}

return kv.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(newStateServiceCompatibleBucket)
return bkt.Put(historicalStateDeletedKey, []byte{0x00})
})
}
33 changes: 0 additions & 33 deletions beacon-chain/db/kv/check_state.go

This file was deleted.

3 changes: 2 additions & 1 deletion beacon-chain/db/kv/kv.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kv

import (
"context"
"os"
"path"
"sync"
Expand Down Expand Up @@ -120,7 +121,7 @@ func NewKVStore(dirPath string, stateSummaryCache *cache.StateSummaryCache) (*St
return nil, err
}

if err := kv.ensureNewStateServiceCompatible(); err != nil {
if err := kv.ensureNewStateServiceCompatible(context.Background()); err != nil {
return nil, err
}

Expand Down
194 changes: 194 additions & 0 deletions beacon-chain/db/kv/regen_historical_states.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package kv

import (
"context"
"fmt"

"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
transition "github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"github.com/prysmaticlabs/prysm/shared/params"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

func (kv *Store) regenHistoricalStates(ctx context.Context) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function does too much, it should be refactored into smaller pieces perhaps within this same file

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I refactored 2 smaller functions out of it. Unfortunately it can't be refactored much further or else it begin to hurts readability and do more con than pro imo...

ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStates")
defer span.End()

genesisState, err := kv.GenesisState(ctx)
if err != nil {
return err
}
currentState := genesisState.Copy()
startSlot := genesisState.Slot()

// Restore from last archived point if this process was previously interrupted.
slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint
lastArchivedIndex, err := kv.LastArchivedIndex(ctx)
if err != nil {
return err
}
if lastArchivedIndex > 0 {
archivedIndexStart := lastArchivedIndex - 1
wantedSlotBelow := archivedIndexStart*slotsPerArchivedPoint + 1
states, err := kv.HighestSlotStatesBelow(ctx, wantedSlotBelow)
if err != nil {
return err
}
if len(states) == 0 {
return errors.New("states can't be empty")
}
if states[0] == nil {
return errors.New("nil last state")
}
currentState = states[0]
startSlot = currentState.Slot()
}

lastSavedBlockArchivedIndex, err := kv.lastSavedBlockArchivedIndex(ctx)
if err != nil {
return err
}
for i := lastArchivedIndex; i <= lastSavedBlockArchivedIndex; i++ {
targetSlot := startSlot + slotsPerArchivedPoint
filter := filters.NewFilter().SetStartSlot(startSlot + 1).SetEndSlot(targetSlot)
blocks, err := kv.Blocks(ctx, filter)
if err != nil {
return err
}

// Replay blocks and replay slots if necessary.
if len(blocks) > 0 {
for i := 0; i < len(blocks); i++ {
if blocks[i].Block.Slot == 0 {
continue
}
currentState, err = regenHistoricalStateTransition(ctx, currentState, blocks[i])
if err != nil {
return err
}
}
}
if targetSlot > currentState.Slot() {
currentState, err = regenHistoricalStateProcessSlots(ctx, currentState, targetSlot)
if err != nil {
return err
}
}

if len(blocks) > 0 {
// Save the historical root, state and highest index to the DB.
if helpers.IsEpochStart(currentState.Slot()) && currentState.Slot()%slotsPerArchivedPoint == 0 && blocks[len(blocks)-1].Block.Slot&slotsPerArchivedPoint == 0 {
if err := kv.saveArchivedInfo(ctx, currentState, blocks, i); err != nil {
return err
}
log.WithFields(log.Fields{
"currentArchivedIndex/totalArchivedIndices": fmt.Sprintf("%d/%d", i, lastSavedBlockArchivedIndex),
"archivedStateSlot": currentState.Slot()}).Info("Saved historical state")
}
}
startSlot += slotsPerArchivedPoint
}
return nil
}

// This runs state transition to recompute historical state.
func regenHistoricalStateTransition(
ctx context.Context,
state *stateTrie.BeaconState,
signed *ethpb.SignedBeaconBlock,
) (*stateTrie.BeaconState, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
if signed == nil || signed.Block == nil {
return nil, errors.New("block can't be nil")
}
ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateTransition")
defer span.End()
var err error
state, err = regenHistoricalStateProcessSlots(ctx, state, signed.Block.Slot)
if err != nil {
return nil, errors.Wrap(err, "could not process slot")
}
state, err = transition.ProcessBlockForStateRoot(ctx, state, signed)
if err != nil {
return nil, errors.Wrap(err, "could not process block")
}
return state, nil
}

// This runs slot transition to recompute historical state.
func regenHistoricalStateProcessSlots(ctx context.Context, state *stateTrie.BeaconState, slot uint64) (*stateTrie.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateProcessSlots")
defer span.End()
if state == nil {
return nil, errors.New("state can't be nil")
}
if state.Slot() > slot {
err := fmt.Errorf("expected state.slot %d < slot %d", state.Slot(), slot)
return nil, err
}
if state.Slot() == slot {
return state, nil
}
for state.Slot() < slot {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is pretty similar to the state transition function. To reduce code duplication, is there a way we can abstract this core logic ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll have to refactor the core functions which I'm leaning towards to avoid... Since this is just a temp tool.
I don't forsee we maintain this for long, this will be deleted once we roll out --new-state-mgmt

state, err := transition.ProcessSlot(ctx, state)
if err != nil {
return nil, errors.Wrap(err, "could not process slot")
}
if transition.CanProcessEpoch(state) {
state, err = transition.ProcessEpochPrecompute(ctx, state)
if err != nil {
return nil, errors.Wrap(err, "could not process epoch with optimizations")
}
}
state.SetSlot(state.Slot() + 1)
}
return state, nil
}

// This retrieves the last saved block's archived index.
func (kv *Store) lastSavedBlockArchivedIndex(ctx context.Context) (uint64, error) {
b, err := kv.HighestSlotBlocks(ctx)
if err != nil {
return 0, err
}
if len(b) == 0 {
return 0, errors.New("blocks can't be empty")
}
if b[0] == nil {
return 0, errors.New("nil last block")
}
lastSavedBlockSlot := b[0].Block.Slot
slotsPerArchivedPoint := params.BeaconConfig().SlotsPerArchivedPoint
lastSavedBlockArchivedIndex := lastSavedBlockSlot/slotsPerArchivedPoint - 1

return lastSavedBlockArchivedIndex, nil
}

// This saved archived info (state, root, index) into the db.
func (kv *Store) saveArchivedInfo(ctx context.Context,
currentState *stateTrie.BeaconState,
blocks []*ethpb.SignedBeaconBlock,
archivedIndex uint64) error {
lastBlocksRoot, err := ssz.HashTreeRoot(blocks[len(blocks)-1].Block)
if err != nil {
return nil
}
if err := kv.SaveState(ctx, currentState, lastBlocksRoot); err != nil {
return err
}
if err := kv.SaveArchivedPointRoot(ctx, lastBlocksRoot, archivedIndex); err != nil {
return err
}
if err := kv.SaveLastArchivedIndex(ctx, archivedIndex); err != nil {
return err
}
return nil
}
2 changes: 2 additions & 0 deletions shared/params/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type BeaconChainConfig struct {
EmptySignature [96]byte // EmptySignature is used to represent a zeroed out BLS Signature.
DefaultPageSize int // DefaultPageSize defines the default page size for RPC server request.
MaxPeersToSync int // MaxPeersToSync describes the limit for number of peers in round robin sync.
SlotsPerArchivedPoint uint64 // SlotsPerArchivedPoint defines the number of slots per one archived point.

// Slasher constants.
WeakSubjectivityPeriod uint64 // WeakSubjectivityPeriod defines the time period expressed in number of epochs were proof of stake network should validate block headers and attestations for slashable events.
Expand Down Expand Up @@ -185,6 +186,7 @@ var defaultBeaconConfig = &BeaconChainConfig{
EmptySignature: [96]byte{},
DefaultPageSize: 250,
MaxPeersToSync: 15,
SlotsPerArchivedPoint: 256,

// Slasher related values.
WeakSubjectivityPeriod: 54000,
Expand Down