From d01867971e2df4af473cef4ccf75048e8c87e212 Mon Sep 17 00:00:00 2001 From: Dhruba Basu <7675102+dhrubabasu@users.noreply.github.com> Date: Tue, 8 Aug 2023 12:44:29 -0700 Subject: [PATCH] P-chain state prune + height index (#1719) --- vms/platformvm/config/execution_config.go | 2 + .../config/execution_config_test.go | 2 + vms/platformvm/state/mock_state.go | 31 ++ vms/platformvm/state/state.go | 319 +++++++++++++++++- vms/platformvm/vm.go | 15 +- 5 files changed, 356 insertions(+), 13 deletions(-) diff --git a/vms/platformvm/config/execution_config.go b/vms/platformvm/config/execution_config.go index c083f82e83a6..7cdd34c8f46a 100644 --- a/vms/platformvm/config/execution_config.go +++ b/vms/platformvm/config/execution_config.go @@ -16,6 +16,7 @@ var DefaultExecutionConfig = ExecutionConfig{ RewardUTXOsCacheSize: 2048, ChainCacheSize: 2048, ChainDBCacheSize: 2048, + BlockIDCacheSize: 8192, ChecksumsEnabled: false, } @@ -27,6 +28,7 @@ type ExecutionConfig struct { RewardUTXOsCacheSize int `json:"reward-utxos-cache-size"` ChainCacheSize int `json:"chain-cache-size"` ChainDBCacheSize int `json:"chain-db-cache-size"` + BlockIDCacheSize int `json:"block-id-cache-size"` ChecksumsEnabled bool `json:"checksums-enabled"` } diff --git a/vms/platformvm/config/execution_config_test.go b/vms/platformvm/config/execution_config_test.go index 2b7397147e22..b9db05e2fe22 100644 --- a/vms/platformvm/config/execution_config_test.go +++ b/vms/platformvm/config/execution_config_test.go @@ -45,6 +45,7 @@ func TestExecutionConfigUnmarshal(t *testing.T) { "reward-utxos-cache-size": 5, "chain-cache-size": 6, "chain-db-cache-size": 7, + "block-id-cache-size": 8, "checksums-enabled": true }`) ec, err := GetExecutionConfig(b) @@ -56,6 +57,7 @@ func TestExecutionConfigUnmarshal(t *testing.T) { RewardUTXOsCacheSize: 5, ChainCacheSize: 6, ChainDBCacheSize: 7, + BlockIDCacheSize: 8, ChecksumsEnabled: true, } require.Equal(expected, ec) diff --git a/vms/platformvm/state/mock_state.go b/vms/platformvm/state/mock_state.go index a150aecd251f..883cc3f84f00 100644 --- a/vms/platformvm/state/mock_state.go +++ b/vms/platformvm/state/mock_state.go @@ -10,11 +10,13 @@ package state import ( context "context" reflect "reflect" + sync "sync" time "time" database "github.com/ava-labs/avalanchego/database" ids "github.com/ava-labs/avalanchego/ids" validators "github.com/ava-labs/avalanchego/snow/validators" + logging "github.com/ava-labs/avalanchego/utils/logging" avax "github.com/ava-labs/avalanchego/vms/components/avax" blocks "github.com/ava-labs/avalanchego/vms/platformvm/blocks" status "github.com/ava-labs/avalanchego/vms/platformvm/status" @@ -286,6 +288,21 @@ func (mr *MockStateMockRecorder) DeleteUTXO(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteUTXO", reflect.TypeOf((*MockState)(nil).DeleteUTXO), arg0) } +// GetBlockIDAtHeight mocks base method. +func (m *MockState) GetBlockIDAtHeight(arg0 uint64) (ids.ID, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetBlockIDAtHeight", arg0) + ret0, _ := ret[0].(ids.ID) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetBlockIDAtHeight indicates an expected call of GetBlockIDAtHeight. +func (mr *MockStateMockRecorder) GetBlockIDAtHeight(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlockIDAtHeight", reflect.TypeOf((*MockState)(nil).GetBlockIDAtHeight), arg0) +} + // GetChains mocks base method. 
func (m *MockState) GetChains(arg0 ids.ID) ([]*txs.Tx, error) { m.ctrl.T.Helper() @@ -571,6 +588,20 @@ func (mr *MockStateMockRecorder) GetUptime(arg0, arg1 interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUptime", reflect.TypeOf((*MockState)(nil).GetUptime), arg0, arg1) } +// PruneAndIndex mocks base method. +func (m *MockState) PruneAndIndex(arg0 sync.Locker, arg1 logging.Logger) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PruneAndIndex", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// PruneAndIndex indicates an expected call of PruneAndIndex. +func (mr *MockStateMockRecorder) PruneAndIndex(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PruneAndIndex", reflect.TypeOf((*MockState)(nil).PruneAndIndex), arg0, arg1) +} + // PutCurrentDelegator mocks base method. func (m *MockState) PutCurrentDelegator(arg0 *Staker) { m.ctrl.T.Helper() diff --git a/vms/platformvm/state/state.go b/vms/platformvm/state/state.go index f1f1d4c5ee98..5626696eed30 100644 --- a/vms/platformvm/state/state.go +++ b/vms/platformvm/state/state.go @@ -7,10 +7,15 @@ import ( "context" "errors" "fmt" + "sync" "time" + stdmath "math" + "github.com/google/btree" + "go.uber.org/zap" + "github.com/prometheus/client_golang/prometheus" "github.com/ava-labs/avalanchego/cache" @@ -28,7 +33,9 @@ import ( "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/crypto/bls" "github.com/ava-labs/avalanchego/utils/hashing" + "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/math" + "github.com/ava-labs/avalanchego/utils/timer" "github.com/ava-labs/avalanchego/utils/wrappers" "github.com/ava-labs/avalanchego/vms/components/avax" "github.com/ava-labs/avalanchego/vms/platformvm/blocks" @@ -40,6 +47,13 @@ import ( "github.com/ava-labs/avalanchego/vms/platformvm/txs" ) +const ( + pruneCommitLimit = 1024 + pruneCommitSleepMultiplier = 5 + pruneCommitSleepCap = 10 * time.Second + pruneUpdateFrequency = 30 * time.Second +) + var ( _ State = (*state)(nil) @@ -48,6 +62,7 @@ var ( errValidatorSetAlreadyPopulated = errors.New("validator set already populated") errDuplicateValidatorSet = errors.New("duplicate validator set") + blockIDPrefix = []byte("blockID") blockPrefix = []byte("block") validatorsPrefix = []byte("validators") currentPrefix = []byte("current") @@ -74,6 +89,7 @@ var ( lastAcceptedKey = []byte("last accepted") heightsIndexedKey = []byte("heights indexed") initializedKey = []byte("initialized") + prunedKey = []byte("pruned") ) // Chain collects all methods to manage the state of the chain for block @@ -119,6 +135,8 @@ type State interface { // Invariant: [block] is an accepted block. AddStatelessBlock(block blocks.Block) + GetBlockIDAtHeight(height uint64) (ids.ID, error) + // ValidatorSet adds all the validators and delegators of [subnetID] into // [vdrs]. ValidatorSet(subnetID ids.ID, vdrs validators.Set) error @@ -165,6 +183,12 @@ type State interface { // Discard uncommitted changes to the database. Abort() + // Removes rejected blocks from disk and indexes accepted blocks by height. This + // function supports being (and is recommended to be) called asynchronously. + // + // TODO: Remove after v1.11.x is activated + PruneAndIndex(sync.Locker, logging.Logger) error + // Commit changes to the base database. Commit() error @@ -225,6 +249,8 @@ type stateBlk struct { * | | '-- subnet+height+nodeID -> weightChange * | '-. 
flat pub key diffs * | '-- subnet+height+nodeID -> uncompressed public key or nil + * |-. blockIDs + * | '-- height -> blockID * |-. blocks * | '-- blockID -> block bytes * |-. txs @@ -244,6 +270,7 @@ type stateBlk struct { * | '-- txID -> nil * '-. singletons * |-- initializedKey -> nil + * |-- prunedKey -> nil * |-- timestampKey -> timestamp * |-- currentSupplyKey -> currentSupply * |-- lastAcceptedKey -> lastAccepted @@ -265,6 +292,10 @@ type state struct { currentHeight uint64 + addedBlockIDs map[uint64]ids.ID // map of height -> blockID + blockIDCache cache.Cacher[uint64, ids.ID] // cache of height -> blockID. If the entry is ids.Empty, it is not in the database + blockIDDB database.Database + addedBlocks map[ids.ID]blocks.Block // map of blockID -> Block blockCache cache.Cacher[ids.ID, blocks.Block] // cache of blockID -> Block. If the entry is nil, it is not in the database blockDB database.Database @@ -432,6 +463,24 @@ func New( return nil, err } + // Before we start accepting new blocks, we check if the pruning process needs + // to be run. + // + // TODO: Cleanup after v1.11.x is activated + shouldPrune, err := s.shouldPrune() + if err != nil { + return nil, err + } + if shouldPrune { + if err := s.singletonDB.Delete(prunedKey); err != nil { + return nil, fmt.Errorf("failed to remove prunedKey from singletonDB: %w", err) + } + + if err := s.Commit(); err != nil { + return nil, fmt.Errorf("failed to commit to baseDB: %w", err) + } + } + return s, nil } @@ -445,6 +494,15 @@ func newState( rewards reward.Calculator, bootstrapped *utils.Atomic[bool], ) (*state, error) { + blockIDCache, err := metercacher.New[uint64, ids.ID]( + "block_id_cache", + metricsReg, + &cache.LRU[uint64, ids.ID]{Size: execCfg.BlockIDCacheSize}, + ) + if err != nil { + return nil, err + } + blockCache, err := metercacher.New[ids.ID, blocks.Block]( "block_cache", metricsReg, @@ -548,6 +606,10 @@ func newState( bootstrapped: bootstrapped, baseDB: baseDB, + addedBlockIDs: make(map[uint64]ids.ID), + blockIDCache: blockIDCache, + blockIDDB: prefixdb.New(blockIDPrefix, baseDB), + addedBlocks: make(map[ids.ID]blocks.Block), blockCache: blockCache, blockDB: prefixdb.New(blockPrefix, baseDB), @@ -676,6 +738,37 @@ func (s *state) doneInit() error { return s.singletonDB.Put(initializedKey, nil) } +func (s *state) shouldPrune() (bool, error) { + has, err := s.singletonDB.Has(prunedKey) + if err != nil { + return true, err + } + + // If [prunedKey] is not in [singletonDB], [PruneAndIndex()] did not finish + // execution. + if !has { + return true, nil + } + + // To ensure the db was not modified since we last ran [PruneAndIndex()], we + // must verify that [s.lastAccepted] is height indexed. 
+ blk, err := s.GetStatelessBlock(s.lastAccepted) + if err != nil { + return true, err + } + + _, err = s.GetBlockIDAtHeight(blk.Height()) + if err == database.ErrNotFound { + return true, nil + } + + return false, err +} + +func (s *state) donePrune() error { + return s.singletonDB.Put(prunedKey, nil) +} + func (s *state) GetSubnets() ([]*txs.Tx, error) { if s.cachedSubnets != nil { return s.cachedSubnets, nil @@ -1588,6 +1681,7 @@ func (s *state) Close() error { s.chainDB.Close(), s.singletonDB.Close(), s.blockDB.Close(), + s.blockIDDB.Close(), ) return errs.Err } @@ -1647,7 +1741,9 @@ func (s *state) init(genesisBytes []byte) error { } func (s *state) AddStatelessBlock(block blocks.Block) { - s.addedBlocks[block.ID()] = block + blkID := block.ID() + s.addedBlockIDs[block.Height()] = blkID + s.addedBlocks[blkID] = block } func (s *state) SetHeight(height uint64) { @@ -1693,17 +1789,14 @@ func (s *state) CommitBatch() (database.Batch, error) { func (s *state) writeBlocks() error { for blkID, blk := range s.addedBlocks { blkID := blkID + blkBytes := blk.Bytes() + blkHeight := blk.Height() + heightKey := database.PackUInt64(blkHeight) - stBlk := stateBlk{ - Blk: blk, - Bytes: blk.Bytes(), - Status: choices.Accepted, - } - - // Note: blocks to be stored are verified, so it's safe to marshal them with GenesisCodec - blockBytes, err := blocks.GenesisCodec.Marshal(blocks.Version, &stBlk) - if err != nil { - return fmt.Errorf("failed to marshal block %s to store: %w", blkID, err) + delete(s.addedBlockIDs, blkHeight) + s.blockIDCache.Put(blkHeight, blkID) + if err := database.PutID(s.blockIDDB, heightKey, blkID); err != nil { + return fmt.Errorf("failed to add blockID: %w", err) } delete(s.addedBlocks, blkID) @@ -1711,7 +1804,7 @@ func (s *state) writeBlocks() error { // referencing additional data (because of shared byte slices) that // would not be properly accounted for in the cache sizing. 
s.blockCache.Evict(blkID) - if err := s.blockDB.Put(blkID[:], blockBytes); err != nil { + if err := s.blockDB.Put(blkID[:], blkBytes); err != nil { return fmt.Errorf("failed to write block %s: %w", blkID, err) } } @@ -1753,6 +1846,33 @@ func (s *state) GetStatelessBlock(blockID ids.ID) (blocks.Block, error) { return blk, nil } +func (s *state) GetBlockIDAtHeight(height uint64) (ids.ID, error) { + if blkID, exists := s.addedBlockIDs[height]; exists { + return blkID, nil + } + if blkID, cached := s.blockIDCache.Get(height); cached { + if blkID == ids.Empty { + return ids.Empty, database.ErrNotFound + } + + return blkID, nil + } + + heightKey := database.PackUInt64(height) + + blkID, err := database.GetID(s.blockIDDB, heightKey) + if err == database.ErrNotFound { + s.blockIDCache.Put(height, ids.Empty) + return ids.Empty, database.ErrNotFound + } + if err != nil { + return ids.Empty, err + } + + s.blockIDCache.Put(height, blkID) + return blkID, nil +} + func (s *state) writeCurrentStakers(updateValidators bool, height uint64) error { heightBytes := database.PackUInt64(height) rawNestedPublicKeyDiffDB := prefixdb.New(heightBytes, s.nestedValidatorPublicKeyDiffsDB) @@ -2220,3 +2340,178 @@ func parseStoredBlock(blkBytes []byte) (blocks.Block, choices.Status, bool, erro return blkState.Blk, blkState.Status, true, nil } + +func (s *state) PruneAndIndex(lock sync.Locker, log logging.Logger) error { + lock.Lock() + shouldPrune, err := s.shouldPrune() + if err != nil { + lock.Unlock() + return fmt.Errorf( + "failed to check if the database should be pruned: %w", + err, + ) + } + if !shouldPrune { + lock.Unlock() + + log.Info("state already pruned and indexed") + return nil + } + + // It is possible that new blocks are added after grabbing this iterator. New + // blocks are guaranteed to be accepted and height-indexed, so we don't need to + // check them. + blockIterator := s.blockDB.NewIterator() + // Releasing is done using a closure to ensure that updating blockIterator will + // result in having the most recent iterator released when executing the + // deferred function. + defer func() { + blockIterator.Release() + }() + + // While we are pruning the disk, we disable caching of the data we are + // modifying. Caching is re-enabled when pruning finishes. + // + // Note: If an unexpected error occurs the caches are never re-enabled. + // That's fine as the node is going to be in an unhealthy state regardless. + oldBlockIDCache := s.blockIDCache + s.blockIDCache = &cache.Empty[uint64, ids.ID]{} + lock.Unlock() + + log.Info("starting state pruning and indexing") + + var ( + startTime = time.Now() + lastCommit = startTime + lastUpdate = startTime + numPruned = 0 + numIndexed = 0 + ) + + for blockIterator.Next() { + blkBytes := blockIterator.Value() + + blk, status, isStateBlk, err := parseStoredBlock(blkBytes) + if err != nil { + return err + } + + if status != choices.Accepted { + // Remove non-accepted blocks from disk. + if err := s.blockDB.Delete(blockIterator.Key()); err != nil { + return fmt.Errorf("failed to delete block: %w", err) + } + + numPruned++ + + // We don't index the height of non-accepted blocks. + continue + } + + blkHeight := blk.Height() + blkID := blk.ID() + + // Populate the map of height -> blockID. + heightKey := database.PackUInt64(blkHeight) + if err := database.PutID(s.blockIDDB, heightKey, blkID); err != nil { + return fmt.Errorf("failed to add blockID: %w", err) + } + + // Since we only store accepted blocks on disk, we only need to store a map of + // ids.ID to Block. 
+		if isStateBlk {
+			// Rewrite the block as raw block bytes, dropping the legacy
+			// stateBlk wrapper.
+			blkBytes = blk.Bytes()
+			if err := s.blockDB.Put(blkID[:], blkBytes); err != nil {
+				return fmt.Errorf("failed to write block: %w", err)
+			}
+		}
+
+		numIndexed++
+
+		if numIndexed%pruneCommitLimit == 0 {
+			// We must hold the lock during committing to make sure we don't
+			// attempt to commit to disk while a block is concurrently being
+			// accepted.
+			lock.Lock()
+			errs := wrappers.Errs{}
+			errs.Add(
+				s.Commit(),
+				blockIterator.Error(),
+			)
+			lock.Unlock()
+			if errs.Errored() {
+				return errs.Err
+			}
+
+			// We release the iterator here to allow the underlying database to
+			// clean up deleted state.
+			blockIterator.Release()
+
+			now := time.Now()
+			if now.Sub(lastUpdate) > pruneUpdateFrequency {
+				lastUpdate = now
+
+				progress := timer.ProgressFromHash(blkID[:])
+				eta := timer.EstimateETA(
+					startTime,
+					progress,
+					stdmath.MaxUint64,
+				)
+
+				log.Info("committing state pruning and indexing",
+					zap.Int("numPruned", numPruned),
+					zap.Int("numIndexed", numIndexed),
+					zap.Duration("eta", eta),
+				)
+			}
+
+			// We take the minimum here because it's possible that the node is
+			// currently bootstrapping. This would mean that grabbing the lock
+			// could take an extremely long period of time, which we should not
+			// delay processing for.
+			pruneDuration := now.Sub(lastCommit)
+			sleepDuration := math.Min(
+				pruneCommitSleepMultiplier*pruneDuration,
+				pruneCommitSleepCap,
+			)
+			time.Sleep(sleepDuration)
+
+			// Make sure not to include the sleep duration into the next prune
+			// duration.
+			lastCommit = time.Now()
+
+			blockIterator = s.blockDB.NewIteratorWithStart(blkID[:])
+		}
+	}
+
+	// Ensure we fully iterated over all blocks before writing that pruning has
+	// finished.
+	//
+	// Note: This is needed because a transient read error could cause the
+	// iterator to stop early.
+	if err := blockIterator.Error(); err != nil {
+		return err
+	}
+
+	if err := s.donePrune(); err != nil {
+		return err
+	}
+
+	// We must hold the lock during committing to make sure we don't
+	// attempt to commit to disk while a block is concurrently being
+	// accepted.
+	lock.Lock()
+	defer lock.Unlock()
+
+	// Make sure we flush the original cache before re-enabling it to prevent
+	// surfacing any stale data.
+	oldBlockIDCache.Flush()
+	s.blockIDCache = oldBlockIDCache
+
+	log.Info("finished state pruning and indexing",
+		zap.Int("numPruned", numPruned),
+		zap.Int("numIndexed", numIndexed),
+		zap.Duration("duration", time.Since(startTime)),
+	)
+
+	return s.Commit()
+}
diff --git a/vms/platformvm/vm.go b/vms/platformvm/vm.go
index 3bff44acea65..68e2de9c77f2 100644
--- a/vms/platformvm/vm.go
+++ b/vms/platformvm/vm.go
@@ -211,7 +211,20 @@ func (vm *VM) Initialize(
 	chainCtx.Log.Info("initializing last accepted",
 		zap.Stringer("blkID", lastAcceptedID),
 	)
-	return vm.SetPreference(ctx, lastAcceptedID)
+	if err := vm.SetPreference(ctx, lastAcceptedID); err != nil {
+		return err
+	}
+
+	go func() {
+		err := vm.state.PruneAndIndex(&vm.ctx.Lock, vm.ctx.Log)
+		if err != nil {
+			vm.ctx.Log.Error("state pruning and height indexing failed",
+				zap.Error(err),
+			)
+		}
+	}()
+
+	return nil
 }
 
 // Create all chains that exist that this node validates.
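
The core of the new height index is the lookup order in GetBlockIDAtHeight: uncommitted additions first, then an LRU cache that also remembers misses (ids.Empty marks "known absent"), then the prefixed database keyed by the packed height. The standalone Go sketch below mirrors that pattern with plain maps in place of the LRU cache and the prefixdb-backed store; heightIndex, packUInt64, getBlockIDAtHeight, and errNotFound are illustrative names for this sketch only and are not part of the patch.

package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// errNotFound stands in for database.ErrNotFound in this sketch.
var errNotFound = errors.New("not found")

// heightIndex is an illustrative stand-in for the height -> blockID index that
// GetBlockIDAtHeight layers over [addedBlockIDs], [blockIDCache], and
// [blockIDDB]. Plain maps replace the LRU cache and the key-value database.
type heightIndex struct {
	added map[uint64][32]byte // uncommitted entries (addedBlockIDs)
	cache map[uint64][32]byte // cached entries; an all-zero ID marks "known absent"
	disk  map[string][32]byte // committed entries keyed by packed height (blockIDDB)
}

// packUInt64 mirrors the fixed-width big-endian encoding used for height keys,
// which keeps the on-disk index sorted by height.
func packUInt64(height uint64) string {
	var key [8]byte
	binary.BigEndian.PutUint64(key[:], height)
	return string(key[:])
}

// getBlockIDAtHeight follows the same lookup order as the real method:
// uncommitted map, then cache (including negative entries), then disk.
func (h *heightIndex) getBlockIDAtHeight(height uint64) ([32]byte, error) {
	var empty [32]byte
	if blkID, ok := h.added[height]; ok {
		return blkID, nil
	}
	if blkID, ok := h.cache[height]; ok {
		if blkID == empty {
			return empty, errNotFound // negative cache hit
		}
		return blkID, nil
	}
	blkID, ok := h.disk[packUInt64(height)]
	if !ok {
		h.cache[height] = empty // remember the miss
		return empty, errNotFound
	}
	h.cache[height] = blkID
	return blkID, nil
}

func main() {
	idx := &heightIndex{
		added: map[uint64][32]byte{},
		cache: map[uint64][32]byte{},
		disk:  map[string][32]byte{packUInt64(7): {0x01}},
	}
	if blkID, err := idx.getBlockIDAtHeight(7); err == nil {
		fmt.Printf("height 7 -> %x...\n", blkID[:4])
	}
	if _, err := idx.getBlockIDAtHeight(8); err != nil {
		fmt.Println("height 8:", err) // cached as absent for subsequent lookups
	}
}

The negative-cache entries matter because heights that were never indexed (for example, heights above the current tip) would otherwise hit the database on every lookup; caching ids.Empty keeps repeated misses cheap, which is why writeBlocks and PruneAndIndex both take care to update or disable the cache while the index is being populated.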