Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/impl splistore #6231

Open
wants to merge 33 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
7d5f66c
feat: rm useless store
LinZexiao Nov 9, 2023
a384c93
feat: modify fs repo for splitstore
LinZexiao Nov 9, 2023
e16e356
feat: add more log about MinerGetBaseInfo
LinZexiao Nov 20, 2023
611e023
feat: add SetLogLevel to set log level
LinZexiao Nov 20, 2023
c7c83c8
feat: add ForEachKey to BadgerBlockstore
LinZexiao Nov 20, 2023
ec298c7
feat: add splitstore
LinZexiao Nov 20, 2023
9c549bf
feat: add unit test
LinZexiao Nov 20, 2023
aaed1eb
feat: add split store
LinZexiao Nov 20, 2023
c884ebe
fix: inject initstore
LinZexiao Nov 20, 2023
776e515
fix: call putmany on primary
LinZexiao Nov 20, 2023
601f1c7
feat: add log
LinZexiao Nov 20, 2023
bf83de3
feat: add config for splitstore:
LinZexiao Nov 20, 2023
6c76c1d
feat: add lock for close
LinZexiao Nov 21, 2023
7c31d37
fix: try best to walk more object
LinZexiao Nov 21, 2023
2904ddb
feat: add cmd to rollback splitstore
LinZexiao Nov 22, 2023
66a0408
feat: limit store size to half of chan finality
LinZexiao Nov 22, 2023
1c06ee4
feat: a smaller db for test
LinZexiao Nov 22, 2023
b39a27b
fix: enable start with splitstore
LinZexiao Nov 22, 2023
9b339e4
feat: store size must bigger than chain finality
LinZexiao Nov 22, 2023
40bdbf8
fix: typo
LinZexiao Nov 23, 2023
5c3656c
fix: make lint happy
LinZexiao Nov 23, 2023
6e956a7
fix: var name overlap
LinZexiao Nov 23, 2023
dc9d2be
feat: walk object concurrently
LinZexiao Nov 28, 2023
2e41814
feat: walk state for 4 finality
LinZexiao Dec 25, 2023
72f895f
feat: protect head cid
LinZexiao Dec 29, 2023
4f67ef5
chore: rm temp test
LinZexiao Dec 29, 2023
103feac
chore: ignore some unused
LinZexiao Dec 29, 2023
3f7daef
fix: return error when rollback disable
LinZexiao Dec 29, 2023
969292b
fix: git ignore lock of db
LinZexiao Dec 29, 2023
86bf950
fix : unit test for splitstore
LinZexiao Dec 29, 2023
f6c30c5
feat: enable soft delete in splitstore
LinZexiao Dec 29, 2023
7979e67
chore: rm TestWalkOneState
LinZexiao Dec 29, 2023
0bc46e3
feat: make lint happy
LinZexiao Dec 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,4 @@ gengen
dockerfile
coverage_unit.txt
coverage_venus_shared.txt
venus-shared/blockstore/splitstore/test_data/base_583_bafy2bzaceazuutcexhvwkyyesszohrkjjzk2zgknasgs7bb7zgfnwghtnu5w2.db/LOCK
10 changes: 6 additions & 4 deletions app/node/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/filecoin-project/venus/app/submodule/storagenetworking"
v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1"
"github.com/filecoin-project/venus/venus-shared/blockstore/splitstore"
)

// Env is the environment for command API handlers.
Expand All @@ -22,10 +23,11 @@ type Env struct {
MingingAPI v1api.IMining
MessagePoolAPI v1api.IMessagePool

MarketAPI v1api.IMarket
PaychAPI v1api.IPaychan
CommonAPI v1api.ICommon
EthAPI v1api.IETH
MarketAPI v1api.IMarket
PaychAPI v1api.IPaychan
CommonAPI v1api.ICommon
EthAPI v1api.IETH
SplitstoreAPI splitstore.Controller
}

var _ cmds.Environment = (*Env)(nil)
Expand Down
39 changes: 39 additions & 0 deletions app/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
_ "github.com/filecoin-project/venus/pkg/crypto/secp" // enable secp signatures
metricsPKG "github.com/filecoin-project/venus/pkg/metrics"
"github.com/filecoin-project/venus/pkg/repo"
"github.com/filecoin-project/venus/venus-shared/blockstore/splitstore"
"github.com/ipfs-force-community/metrics"
"github.com/ipfs-force-community/sophon-auth/jwtclient"
cmds "github.com/ipfs/go-ipfs-cmds"
Expand Down Expand Up @@ -379,7 +380,45 @@ func (node *Node) createServerEnv(ctx context.Context) *Env {
MarketAPI: node.market.API(),
CommonAPI: node.common,
EthAPI: node.eth.API(),
SplitstoreAPI: &RepoKeeper{repo: node.repo},
}

return &env
}

type RepoKeeper struct {
repo repo.Repo
}

var _ splitstore.Controller = (*RepoKeeper)(nil)

func (r *RepoKeeper) Rollback() error {
ds := r.repo.Datastore()
if ds == nil {
return fmt.Errorf("no blockstore found")
}

rb, ok := ds.(splitstore.Controller)
if !ok {
return fmt.Errorf("split store was disabled")
}
err := rb.Rollback()
if err != nil {
return fmt.Errorf("rollback splitstore: %w", err)
}

// rewrite config
cfg := r.repo.Config()
if cfg == nil {
return fmt.Errorf("no config found")
}
if cfg.Datastore.Type == "splitstore" {
cfg.Datastore.Type = "badgerds"
err = r.repo.ReplaceConfig(cfg)
if err != nil {
return fmt.Errorf("replace config: %w", err)
}
}

return nil
}
7 changes: 7 additions & 0 deletions app/submodule/chain/chain_submodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/filecoin-project/venus/pkg/vmsupport"
v0api "github.com/filecoin-project/venus/venus-shared/api/chain/v0"
v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1"
"github.com/filecoin-project/venus/venus-shared/blockstore/splitstore"
"github.com/filecoin-project/venus/venus-shared/types"
)

Expand Down Expand Up @@ -55,6 +56,7 @@ func NewChainSubmodule(ctx context.Context,
) (*ChainSubmodule, error) {
repo := config.Repo()
// initialize chain store
basebs := repo.Datastore()
chainStore := chain.NewStore(repo.ChainDatastore(), repo.Datastore(), config.GenesisCid(), circulatiingSupplyCalculator)
// drand
genBlk, err := chainStore.GetGenesisBlock(context.TODO())
Expand All @@ -78,6 +80,11 @@ func NewChainSubmodule(ctx context.Context,

waiter := chain.NewWaiter(chainStore, messageStore, config.Repo().Datastore(), cbor.NewCborStore(config.Repo().Datastore()))

// SubscribeHeadChanges for splitstore
if ss, ok := basebs.(*splitstore.Splitstore); ok {
chainStore.SubscribeHeadChanges(ss.HeadChange)
}

store := &ChainSubmodule{
ChainReader: chainStore,
MessageStore: messageStore,
Expand Down
7 changes: 7 additions & 0 deletions app/submodule/mining/mining_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ type MiningAPI struct { //nolint

// MinerGetBaseInfo get current miner information
func (miningAPI *MiningAPI) MinerGetBaseInfo(ctx context.Context, maddr address.Address, round abi.ChainEpoch, tsk types.TipSetKey) (*types.MiningBaseInfo, error) {
localLog := log.With(
"maddr", maddr,
"round", round,
)

chainStore := miningAPI.Ming.ChainModule.ChainReader
ts, err := chainStore.GetTipSet(ctx, tsk)
if err != nil {
Expand Down Expand Up @@ -79,6 +84,7 @@ func (miningAPI *MiningAPI) MinerGetBaseInfo(ctx context.Context, maddr address.
return nil, fmt.Errorf("loading miner in current state: %v", err)
}

localLog.Infof("miner actor(%s) not found at look back tipset %s", maddr, ts.Key())
return nil, nil
}
if err != nil {
Expand Down Expand Up @@ -106,6 +112,7 @@ func (miningAPI *MiningAPI) MinerGetBaseInfo(ctx context.Context, maddr address.
}

if len(xsectors) == 0 {
localLog.Info("no sectors found for winning post")
return nil, nil
}

Expand Down
4 changes: 4 additions & 0 deletions app/submodule/mining/mining_submodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ import (
"github.com/filecoin-project/venus/pkg/util/ffiwrapper"
v0api "github.com/filecoin-project/venus/venus-shared/api/chain/v0"
v1api "github.com/filecoin-project/venus/venus-shared/api/chain/v1"

logging "github.com/ipfs/go-log/v2"
)

var log = logging.Logger("mining")

type miningConfig interface {
Repo() repo.Repo
Verifier() ffiwrapper.Verifier
Expand Down
32 changes: 17 additions & 15 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ TOOL COMMANDS
version - Show venus version information
seed - Seal sectors for genesis miner
fetch - Fetch proving parameters
splitstore - Manage splitstore
`,
},
Options: []cmds.Option{
Expand Down Expand Up @@ -167,21 +168,22 @@ var rootSubcmdsLocal = map[string]*cmds.Command{

// all top level commands, available on daemon. set during init() to avoid configuration loops.
var rootSubcmdsDaemon = map[string]*cmds.Command{
"chain": chainCmd,
"sync": syncCmd,
"drand": drandCmd,
"inspect": inspectCmd,
"log": logCmd,
"send": msgSendCmd,
"mpool": mpoolCmd,
"swarm": swarmCmd,
"wallet": walletCmd,
"version": versionCmd,
"state": stateCmd,
"miner": minerCmd,
"paych": paychCmd,
"info": infoCmd,
"evm": evmCmd,
"chain": chainCmd,
"sync": syncCmd,
"drand": drandCmd,
"inspect": inspectCmd,
"log": logCmd,
"send": msgSendCmd,
"mpool": mpoolCmd,
"swarm": swarmCmd,
"wallet": walletCmd,
"version": versionCmd,
"state": stateCmd,
"miner": minerCmd,
"paych": paychCmd,
"info": infoCmd,
"evm": evmCmd,
"splitstore": splitstoreCmd,
}

func init() {
Expand Down
30 changes: 30 additions & 0 deletions cmd/splitstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package cmd

import (
"fmt"

"github.com/filecoin-project/venus/app/node"
cmds "github.com/ipfs/go-ipfs-cmds"
)

var splitstoreCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Manage splitstore",
},
Subcommands: map[string]*cmds.Command{
"rollback": splitstoreRollbackCmd,
},
}

var splitstoreRollbackCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Rollback splitstore to badger store",
},
PreRun: func(req *cmds.Request, env cmds.Environment) error {
fmt.Println("It may take a while to transfer block ...")
return nil
},
Run: func(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) error {
return env.(*node.Env).SplitstoreAPI.Rollback()
},
}
4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func main() {
_ = logging.SetLogLevel("pubsub", "error")
_ = logging.SetLogLevel("relay", "error")
_ = logging.SetLogLevel("dht/RtRefreshManager", "error")
// todo: remove it
_ = logging.SetLogLevel("splitstore", "debug")
_ = logging.SetLogLevel("chainsync.syncer", "debug")

} else {
level, err := logging.LevelFromString(lvl)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/chain/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type Waiter struct {
chainReader waiterChainReader
messageProvider MessageProvider
cst cbor.IpldStore
bs bstore.Blockstore
Stmgr IStmgr
}

Expand All @@ -55,7 +54,6 @@ func NewWaiter(chainStore waiterChainReader, messages MessageProvider, bs bstore
return &Waiter{
chainReader: chainStore,
cst: cst,
bs: bs,
messageProvider: messages,
}
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"

"github.com/filecoin-project/venus/pkg/constants"
"github.com/filecoin-project/venus/venus-shared/actors/policy"
"github.com/filecoin-project/venus/venus-shared/types"
)

Expand Down Expand Up @@ -78,8 +79,12 @@ func newDefaultAPIConfig() *APIConfig {
// DatastoreConfig holds all the configuration options for the datastore.
// TODO: use the advanced datastore configuration from ipfs
type DatastoreConfig struct {
Type string `json:"type"`
Path string `json:"path"`
Type string `json:"type"`
Path string `json:"path"`
SplitstoreSize int64 `json:"splitstoreSize"`
SplitstoreCount int `json:"splitstoreCount"`
SplitstoreInitProtectEpoch int64 `json:"splitstoreInitProtectEpoch"`
SplitstoreSoftDelete bool `json:"splitstoreSoftDelete"`
}

// Validators hold the list of validation functions for each configuration
Expand All @@ -93,8 +98,10 @@ var Validators = map[string]func(string, string) error{

func newDefaultDatastoreConfig() *DatastoreConfig {
return &DatastoreConfig{
Type: "badgerds",
Path: "badger",
Type: "badgerds",
Path: "badger",
SplitstoreSize: int64(5 * policy.ChainFinality),
SplitstoreCount: 3,
}
}

Expand Down
8 changes: 7 additions & 1 deletion pkg/consensus/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/venus/pkg/fvm"
"github.com/filecoin-project/venus/pkg/vm/vmcontext"
"github.com/filecoin-project/venus/venus-shared/actors/builtin/reward"
"github.com/filecoin-project/venus/venus-shared/blockstore"

"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
Expand Down Expand Up @@ -95,6 +96,11 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context,
)

makeVM := func(base cid.Cid, e abi.ChainEpoch, timestamp uint64) (vm.Interface, error) {
bs := vmOpts.Bsstore
if ts.Height() == 1185554 {
bs = blockstore.NewLogStore("./bs.log", bs)
// _, _ = bs.Has(ctx, base)
}
vmOpt := vm.VmOption{
CircSupplyCalculator: vmOpts.CircSupplyCalculator,
LookbackStateGetter: vmOpts.LookbackStateGetter,
Expand All @@ -107,7 +113,7 @@ func (p *DefaultProcessor) ApplyBlocks(ctx context.Context,
Timestamp: timestamp,
GasPriceSchedule: vmOpts.GasPriceSchedule,
PRoot: base,
Bsstore: vmOpts.Bsstore,
Bsstore: bs,
SysCallsImpl: vmOpts.SysCallsImpl,
TipSetGetter: vmOpts.TipSetGetter,
Tracing: vmOpts.Tracing,
Expand Down
4 changes: 0 additions & 4 deletions pkg/fork/fork.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ import (

"github.com/filecoin-project/venus/pkg/config"
"github.com/filecoin-project/venus/pkg/constants"
"github.com/filecoin-project/venus/pkg/repo"
vmstate "github.com/filecoin-project/venus/pkg/state/tree"
"github.com/filecoin-project/venus/venus-shared/actors"
"github.com/filecoin-project/venus/venus-shared/actors/adt"
Expand Down Expand Up @@ -569,8 +568,6 @@ type ChainFork struct {
// upgrade param
networkType types.NetworkType
forkUpgrade *config.ForkUpgradeConfig

metadataDs repo.Datastore
}

func NewChainFork(ctx context.Context,
Expand All @@ -586,7 +583,6 @@ func NewChainFork(ctx context.Context,
ipldstore: ipldstore,
networkType: networkParams.NetworkType,
forkUpgrade: networkParams.ForkUpgradeParam,
metadataDs: metadataDs,
}

// If we have upgrades, make sure they're in-order and make sense.
Expand Down
Loading
Loading