diff --git a/BITFINITY.README.md b/BITFINITY.README.md new file mode 100644 index 00000000000..f87975e419f --- /dev/null +++ b/BITFINITY.README.md @@ -0,0 +1,52 @@ +# Introduction + +Block importer is a process that periodically polls the blocks from the EVMC canister and writes them into a file-based erigon db. It can be used to launch a blockchain explorer that introspects the EVMC blockchain. + +The code of the blockimporter is in `cmd/blockimporter`. + +# Archiving historical data + +``` +blockimporter [--evm <evmc-url>] [--db <db-path>] [--secondary-blocks-url <url>] +``` + +# Serving rpcdaemon (EVM JSON RPC) + +You can use rpcdaemon to set up an endpoint with JSON RPC server to access the blockchain state from blockimporter. + +To build the project just run the following commands in the project folder. + +``` +git checkout origin/evmc_importer +make +``` + +The binary artifacts can be found in `build/bin` folder. + +Now you can run two processes together sharing the same Db path: + +``` +build/bin/blockimporter --evm <evmc-url> --db <db-path> &\ +build/bin/rpcdaemon --datadir <db-path> --http.corsdomain * --http.api=eth,erigon,ots +``` + +In this case the JSON RPC API can be accessed by address localhost:8545 (which is a default setting for the `rpcdaemon` that can be changed by passing `--http.port` argument). 
For more options run `build/bin/rpcdaemon --help` + +# Setting up the otterscan block explorer + +Otterscan can be run with `blockimporter` using the integration via rpcdaemon: + +``` +build/bin/blockimporter --evm <evmc-url> --db <db-path> &\ +build/bin/rpcdaemon --datadir <db-path> --http.corsdomain * --http.api=eth,erigon,ots &\ +docker run --rm -p 5100:80 --name otterscan -d --env ERIGON_URL=localhost:8545 otterscan/otterscan:v1.29.0 +``` + +Another option is to use the docker-compose file: + +``` +cd docker +mkdir ./db +chmod 777 ./db +docker-compose up +``` \ No newline at end of file diff --git a/cmd/blockimporter/Readme.md b/cmd/blockimporter/Readme.md deleted file mode 100644 index eddd5399b0e..00000000000 --- a/cmd/blockimporter/Readme.md +++ /dev/null @@ -1,26 +0,0 @@ -# Introduction - -Block importer is a process that periodically polls the blocks from the EVMC canister and writes it into a file-based erigon db. It can be used to launch a blockhain explorer that introspects the EMVC blockchain. - -# Usage - -blockimporter [--evm ] [--db ] - -# Running Ottrerscan with blockimporter - -Otterscan can be run with `blockimporter` using the integration via rpcdaemon: - -``` -blockimporter --evm --db &\ -rpcdaemon --datadir --http.corsdomain * --http.api=eth,erigon,ots &\ -docker run --rm -p 5100:80 --name otterscan -d --env ERIGON_URL=localhost:8545 otterscan/otterscan:v1.29.0 -``` - -Another option is to us the docker-compose file: - -``` -cd docker -mkdir ./db -chmod 777 ./db -docker-compose up -``` \ No newline at end of file diff --git a/cmd/blockimporter/import_runner.go b/cmd/blockimporter/import_runner.go index fee866aabaa..5c0fa1612b9 --- a/cmd/blockimporter/import_runner.go +++ b/cmd/blockimporter/import_runner.go @@ -4,7 +4,9 @@ import ( "fmt" "time" + "github.com/ledgerwatch/erigon/core/types" "github.com/ledgerwatch/log/v3" + "sync" ) type Settings struct { @@ -38,32 +40,55 @@ func RunImport(settings *Settings, blockSource BlockSource, 
secondaryBlocksSourc panic(err) } - blockNum := state.BlockNum() blockSource = makeBlockSource(settings, blockSource, secondaryBlocksSource) - for { - select { - case <-settings.Terminated: - { - return nil + + blocksChan := make(chan []types.Block, 10) + + var resultErr error + wg := &sync.WaitGroup{} + wg.Add(2) + + go func() { + defer wg.Done() + + blockNum := state.BlockNum() + + for { + blocks, err := blockSource.PollBlocks(blockNum) + if err != nil { + resultErr = fmt.Errorf("failed to poll blocks: %w", err) + close(blocksChan) + close(settings.Terminated) + return } - default: - { - blocks, err := blockSource.PollBlocks(blockNum) - if err != nil { - return fmt.Errorf("failed to poll blocks: %w", err) - } + blockNum += uint64(len(blocks)) - for _, block := range blocks { - if err := state.ProcessBlock(block); err != nil { - return fmt.Errorf("failed to process block: %w", err) - } + select { + case blocksChan <- blocks: + case <-settings.Terminated: + close(blocksChan) + return + } + } + }() + + go func() { + defer wg.Done() - blockNum += 1 + for blocks := range blocksChan { + for _, block := range blocks { + if err := state.ProcessBlock(block); err != nil { + resultErr = fmt.Errorf("failed to process block: %w", err) + close(settings.Terminated) + return } } } - } + }() + + wg.Wait() + return resultErr } func makeBlockSource(settings *Settings, blockSource BlockSource, secondaryBlockSource BlockSource) BlockSource { diff --git a/cmd/blockimporter/main.go b/cmd/blockimporter/main.go index 0288e572bc8..49e0a02dbef 100644 --- a/cmd/blockimporter/main.go +++ b/cmd/blockimporter/main.go @@ -29,7 +29,7 @@ func main() { PollInterval: time.Second, } - c := make(chan os.Signal) + c := make(chan os.Signal, 10) signal.Notify(c, os.Interrupt, syscall.SIGTERM) go func() { <-c diff --git a/cmd/blockimporter/state.go b/cmd/blockimporter/state.go index f69396394eb..a7fd7034cf5 100644 --- a/cmd/blockimporter/state.go +++ b/cmd/blockimporter/state.go @@ -29,6 +29,7 @@ 
import ( type State struct { db *DB + trieLoader *trie.FlatDBTrieLoader blockNum *big.Int totalDifficulty *big.Int chainConfig *chain.Config @@ -41,7 +42,14 @@ func NewState(db *DB, initialBalances []BalanceEntry, chainID int64) (*State, er } defer tx.Rollback() - state := &State{db: db} + state := &State{ + db: db, + trieLoader: trie.NewFlatDBTrieLoader("loader"), + } + + if err := state.trieLoader.Reset(trie.NewRetainList(0), nil, nil, false); err != nil { + return nil, err + } if blockNum := rawdb.ReadCurrentBlockNumber(tx); blockNum == nil || *blockNum == 0 { // Close the transaction @@ -97,7 +105,9 @@ func NewState(db *DB, initialBalances []BalanceEntry, chainID int64) (*State, er if err = stagedsync.PromoteHashedStateCleanly("logPrefix", tx, stagedsync.StageHashStateCfg(db.chain, dirs, false, nil), context.Background()); err != nil { return nil, fmt.Errorf("error while promoting state: %v", err) } - if root, err := trie.CalcRoot("block state root", tx); err != nil { + + root, err := state.trieLoader.CalcTrieRoot(tx, nil, nil) + if err != nil { return nil, err } else if root != block.Root() { // This error may happen if we forgot to initialize the data for state root calculation. 
@@ -167,12 +177,16 @@ func (state *State) ProcessBlock(block types.Block) error { return nil } - // check state root hash dirs := datadir2.New(state.db.path) if err = stagedsync.PromoteHashedStateIncrementally("hashedstate", block.NumberU64()-1, block.NumberU64(), tx, stagedsync.StageHashStateCfg(nil, dirs, false, nil), context.Background(), false); err != nil { return err } - if root, err := trie.CalcRoot("block state root", tx); err != nil { + + s := stagedsync.StageState{ + BlockNumber: block.NumberU64() - 1, + } + cfg := stagedsync.StageTrieCfg(state.db.chain, false, true, true, state.db.path, nil, nil, false, nil) + if root, err := stagedsync.IncrementIntermediateHashes("increment hashes", &s, tx, block.NumberU64(), cfg, common.Hash{}, nil); err != nil { return err } else if root != block.Root() { return fmt.Errorf("invalid root, have: %s, want: %s", root.String(), block.Root().String()) diff --git a/eth/stagedsync/stage_interhashes.go b/eth/stagedsync/stage_interhashes.go index 174ab41ec2a..800ef24136b 100644 --- a/eth/stagedsync/stage_interhashes.go +++ b/eth/stagedsync/stage_interhashes.go @@ -114,7 +114,7 @@ func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx kv.RwTx, cfg Tri return trie.EmptyRoot, err } } else { - if root, err = incrementIntermediateHashes(logPrefix, s, tx, to, cfg, expectedRootHash, quit); err != nil { + if root, err = IncrementIntermediateHashes(logPrefix, s, tx, to, cfg, expectedRootHash, quit); err != nil { return trie.EmptyRoot, err } } @@ -522,7 +522,7 @@ func (p *HashPromoter) Unwind(logPrefix string, s *StageState, u *UnwindState, s return nil } -func incrementIntermediateHashes(logPrefix string, s *StageState, db kv.RwTx, to uint64, cfg TrieCfg, expectedRootHash libcommon.Hash, quit <-chan struct{}) (libcommon.Hash, error) { +func IncrementIntermediateHashes(logPrefix string, s *StageState, db kv.RwTx, to uint64, cfg TrieCfg, expectedRootHash libcommon.Hash, quit <-chan struct{}) (libcommon.Hash, error) { p := 
NewHashPromoter(db, cfg.tmpDir, quit, logPrefix) rl := trie.NewRetainList(0) if cfg.historyV3 { diff --git a/eth/stagedsync/stage_interhashes_test.go b/eth/stagedsync/stage_interhashes_test.go index 4f272eb1964..139f1d76db4 100644 --- a/eth/stagedsync/stage_interhashes_test.go +++ b/eth/stagedsync/stage_interhashes_test.go @@ -149,7 +149,7 @@ func TestAccountAndStorageTrie(t *testing.T) { var s StageState s.BlockNumber = 0 - _, err = incrementIntermediateHashes("IH", &s, tx, 1 /* to */, cfg, libcommon.Hash{} /* expectedRootHash */, nil /* quit */) + _, err = IncrementIntermediateHashes("IH", &s, tx, 1 /* to */, cfg, libcommon.Hash{} /* expectedRootHash */, nil /* quit */) assert.Nil(t, err) accountTrieB := make(map[string][]byte) @@ -299,7 +299,7 @@ func TestStorageDeletion(t *testing.T) { var s StageState s.BlockNumber = 0 - _, err = incrementIntermediateHashes("IH", &s, tx, 1 /* to */, cfg, libcommon.Hash{} /* expectedRootHash */, nil /* quit */) + _, err = IncrementIntermediateHashes("IH", &s, tx, 1 /* to */, cfg, libcommon.Hash{} /* expectedRootHash */, nil /* quit */) assert.Nil(t, err) storageTrieB := make(map[string][]byte) @@ -396,7 +396,7 @@ func TestHiveTrieRoot(t *testing.T) { var s StageState s.BlockNumber = 0 - incrementalRoot, err := incrementIntermediateHashes("IH", &s, tx, 1 /* to */, cfg, libcommon.Hash{} /* expectedRootHash */, nil /* quit */) + incrementalRoot, err := IncrementIntermediateHashes("IH", &s, tx, 1 /* to */, cfg, libcommon.Hash{} /* expectedRootHash */, nil /* quit */) require.Nil(t, err) regeneratedRoot, err := RegenerateIntermediateHashes("IH", tx, cfg, libcommon.Hash{} /* expectedRootHash */, ctx)