Merge pull request #22 from bitfinity-network/use_incremental_trie_update

Use incremental trie update
blutooth authored Jan 29, 2024
2 parents 73759a4 + ee059ee commit d4d385e
Showing 7 changed files with 119 additions and 54 deletions.
52 changes: 52 additions & 0 deletions BITFINITY.README.md
@@ -0,0 +1,52 @@
# Introduction

Block importer is a process that periodically polls blocks from the EVMC canister and writes them into a file-based erigon database. It can be used to launch a blockchain explorer that introspects the EVMC blockchain.

The blockimporter code is in `cmd/blockimporter`.
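For orientation, the polling contract the importer is built around (used in `cmd/blockimporter/import_runner.go` below) boils down to an interface along these lines; this is a sketch reconstructed from the diff, not a verbatim copy of the source:

```
// Sketch of the block-polling contract used by the import loop;
// reconstructed from cmd/blockimporter/import_runner.go in this diff.
type BlockSource interface {
	// PollBlocks returns the blocks available starting at blockNum;
	// the batch may be empty if no new blocks have been produced yet.
	PollBlocks(blockNum uint64) ([]types.Block, error)
}
```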

# Archiving historical data

```
blockimporter [--evm <EVMC_CANISTER_URL>] [--db <DATABASE_PATH>] [--secondary-blocks-url <PATH TO A SECONDARY SOURCE OF BLOCKS>]
```

# Serving rpcdaemon (EVM JSON RPC)

You can use rpcdaemon to set up a JSON RPC endpoint that serves the blockchain state written by blockimporter.

To build the project, run the following in the project folder:

```
git checkout origin/evmc_importer
make
```

The binary artifacts can be found in the `build/bin` folder.

Now you can run the two processes together, sharing the same DB path:

```
build/bin/blockimporter --evm <EVMC_CANISTER_URL> --db <DB_PATH> &\
build/bin/rpcdaemon --datadir <DB_PATH> --http.corsdomain * --http.api=eth,erigon,ots
```

In this case the JSON RPC API can be accessed at localhost:8545 (the default for `rpcdaemon`, which can be changed with the `--http.port` argument). For more options, run `build/bin/rpcdaemon --help`.
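As a quick smoke test of the endpoint, you can request the latest imported block number over JSON RPC. A minimal sketch in Go, assuming the default localhost:8545 address:

```
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// eth_blockNumber returns the height of the latest imported block.
	payload := []byte(`{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}`)

	resp, err := http.Post("http://localhost:8545", "application/json", bytes.NewReader(payload))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body)) // e.g. {"jsonrpc":"2.0","id":1,"result":"0x1a4"}
}
```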

# Setting up the otterscan block explorer

Otterscan can be run against `blockimporter` via the rpcdaemon integration:

```
build/bin/blockimporter --evm <EVMC_CANISTER_URL> --db <DB_PATH> &\
build/bin/rpcdaemon --datadir <DB_PATH> --http.corsdomain * --http.api=eth,erigon,ots &\
docker run --rm -p 5100:80 --name otterscan -d --env ERIGON_URL=localhost:8545 otterscan/otterscan:v1.29.0
```

Another option is to use the docker-compose file:

```
cd docker
mkdir ./db
chmod 777 ./db
docker-compose up
```
26 changes: 0 additions & 26 deletions cmd/blockimporter/Readme.md

This file was deleted.

61 changes: 43 additions & 18 deletions cmd/blockimporter/import_runner.go
@@ -4,7 +4,9 @@ import (
"fmt"
"time"

"github.com/ledgerwatch/erigon/core/types"
"github.com/ledgerwatch/log/v3"
"github.com/pion/udp/pkg/sync"
)

type Settings struct {
@@ -38,32 +40,55 @@ func RunImport(settings *Settings, blockSource BlockSource, secondaryBlocksSourc
panic(err)
}

blockNum := state.BlockNum()
blockSource = makeBlockSource(settings, blockSource, secondaryBlocksSource)
for {
select {
case <-settings.Terminated:
{
return nil

blocksChan := make(chan []types.Block, 10)

var resultErr error
wg := sync.NewWaitGroup()
wg.Add(2)

go func() {
defer wg.Done()

blockNum := state.BlockNum()

for {
blocks, err := blockSource.PollBlocks(blockNum)
if err != nil {
resultErr = fmt.Errorf("failed to poll blocks: %w", err)
close(blocksChan)
close(settings.Terminated)
return
}

default:
{
blocks, err := blockSource.PollBlocks(blockNum)
if err != nil {
return fmt.Errorf("failed to poll blocks: %w", err)
}
blockNum += uint64(len(blocks))

for _, block := range blocks {
if err := state.ProcessBlock(block); err != nil {
return fmt.Errorf("failed to process block: %w", err)
}
select {
case blocksChan <- blocks:
case <-settings.Terminated:
close(blocksChan)
return
}
}
}()

go func() {
defer wg.Done()

blockNum += 1
for blocks := range blocksChan {
for _, block := range blocks {
if err := state.ProcessBlock(block); err != nil {
resultErr = fmt.Errorf("failed to process block: %w", err)
close(settings.Terminated)
return
}
}
}
}
}()

wg.Wait()
return resultErr
}

func makeBlockSource(settings *Settings, blockSource BlockSource, secondaryBlockSource BlockSource) BlockSource {
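The old loop polled a batch of blocks and processed it sequentially before polling again; the new code splits the work into a polling goroutine and a processing goroutine connected by a buffered channel, so fetching the next batch overlaps with processing the current one. Below is a minimal, self-contained sketch of the same pattern using only the standard library (the commit itself takes `NewWaitGroup` from `github.com/pion/udp/pkg/sync`); the stub names and the bounded loop are hypothetical simplifications:

```
package main

import (
	"fmt"
	"sync"
)

type Block struct{ Num uint64 }

// pollBlocks stands in for BlockSource.PollBlocks; hypothetical stub.
func pollBlocks(from uint64) []Block {
	return []Block{{Num: from}, {Num: from + 1}}
}

func main() {
	terminated := make(chan struct{})
	// A buffered channel lets the poller run ahead of the processor.
	blocksChan := make(chan []Block, 10)

	var wg sync.WaitGroup
	wg.Add(2)

	// Producer: poll batches of blocks and hand them to the processor.
	go func() {
		defer wg.Done()
		defer close(blocksChan)
		blockNum := uint64(0)
		for blockNum < 6 { // bounded here so the sketch terminates
			blocks := pollBlocks(blockNum)
			blockNum += uint64(len(blocks))
			select {
			case blocksChan <- blocks:
			case <-terminated:
				return
			}
		}
	}()

	// Consumer: apply each block to the state.
	go func() {
		defer wg.Done()
		for blocks := range blocksChan {
			for _, block := range blocks {
				fmt.Println("processing block", block.Num)
			}
		}
	}()

	wg.Wait()
}
```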
2 changes: 1 addition & 1 deletion cmd/blockimporter/main.go
@@ -29,7 +29,7 @@ func main() {
PollInterval: time.Second,
}

c := make(chan os.Signal)
c := make(chan os.Signal, 10)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
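The signal channel is now buffered. `signal.Notify` never blocks when sending to the channel, so with an unbuffered channel a signal delivered while the program is not parked at the receive would be dropped (`go vet` warns about exactly this). A standalone sketch of the corrected pattern:

```
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// signal.Notify does not block: with an unbuffered channel a signal
	// arriving before the receive below is ready would be lost.
	c := make(chan os.Signal, 10)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)

	fmt.Println("waiting for SIGINT/SIGTERM...")
	<-c
	fmt.Println("shutting down")
}
```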
22 changes: 18 additions & 4 deletions cmd/blockimporter/state.go
@@ -29,6 +29,7 @@ import (

type State struct {
db *DB
trieLoader *trie.FlatDBTrieLoader
blockNum *big.Int
totalDifficulty *big.Int
chainConfig *chain.Config
@@ -41,7 +42,14 @@ func NewState(db *DB, initialBalances []BalanceEntry, chainID int64) (*State, er
}
defer tx.Rollback()

state := &State{db: db}
state := &State{
db: db,
trieLoader: trie.NewFlatDBTrieLoader("loader"),
}

if err := state.trieLoader.Reset(trie.NewRetainList(0), nil, nil, false); err != nil {
return nil, err
}

if blockNum := rawdb.ReadCurrentBlockNumber(tx); blockNum == nil || *blockNum == 0 {
// Close the transaction
@@ -97,7 +105,9 @@ func NewState(db *DB, initialBalances []BalanceEntry, chainID int64) (*State, er
if err = stagedsync.PromoteHashedStateCleanly("logPrefix", tx, stagedsync.StageHashStateCfg(db.chain, dirs, false, nil), context.Background()); err != nil {
return nil, fmt.Errorf("error while promoting state: %v", err)
}
if root, err := trie.CalcRoot("block state root", tx); err != nil {

root, err := state.trieLoader.CalcTrieRoot(tx, nil, nil)
if err != nil {
return nil, err
} else if root != block.Root() {
// This error may happen if we forgot to initialize the data for state root calculation.
@@ -167,12 +177,16 @@ func (state *State) ProcessBlock(block types.Block) error {
return nil
}

// check state root hash
dirs := datadir2.New(state.db.path)
if err = stagedsync.PromoteHashedStateIncrementally("hashedstate", block.NumberU64()-1, block.NumberU64(), tx, stagedsync.StageHashStateCfg(nil, dirs, false, nil), context.Background(), false); err != nil {
return err
}
if root, err := trie.CalcRoot("block state root", tx); err != nil {

s := stagedsync.StageState{
BlockNumber: block.NumberU64() - 1,
}
cfg := stagedsync.StageTrieCfg(state.db.chain, false, true, true, state.db.path, nil, nil, false, nil)
if root, err := stagedsync.IncrementIntermediateHashes("increment hashes", &s, tx, block.NumberU64(), cfg, common.Hash{}, nil); err != nil {
return err
} else if root != block.Root() {
return fmt.Errorf("invalid root, have: %s, want: %s", root.String(), block.Root().String())
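This is the core of the change: at startup the state root is still computed with a full `FlatDBTrieLoader` pass, but each subsequent block now updates the intermediate-hash tables incrementally through the newly exported `stagedsync.IncrementIntermediateHashes`, whose cost is proportional to the keys the block touched rather than to total state size (the previous code recomputed the root from scratch with `trie.CalcRoot` on every block). Below is a sketch of the per-block path, assembled from the calls visible in this diff; the import paths assume erigon's layout at this commit, and the function is illustrative rather than drop-in:

```
package blockimporter

// Sketch only: mirrors the per-block root check introduced above.
import (
	"context"
	"fmt"

	"github.com/ledgerwatch/erigon-lib/common"
	"github.com/ledgerwatch/erigon-lib/common/datadir"
	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/core/types"
	"github.com/ledgerwatch/erigon/eth/stagedsync"
)

// verifyStateRootIncrementally hashes the plain-state changes of one block
// and advances the trie incrementally instead of rebuilding it.
func verifyStateRootIncrementally(tx kv.RwTx, block types.Block, chainDB kv.RwDB, dbPath string) error {
	// Promote this block's plain-state changes into the hashed state.
	dirs := datadir.New(dbPath)
	if err := stagedsync.PromoteHashedStateIncrementally("hashedstate",
		block.NumberU64()-1, block.NumberU64(), tx,
		stagedsync.StageHashStateCfg(nil, dirs, false, nil),
		context.Background(), false); err != nil {
		return err
	}

	// Update only the trie nodes touched since the previous block; the
	// returned root must match the block header. Cost scales with the
	// change set, not with the total state size.
	s := stagedsync.StageState{BlockNumber: block.NumberU64() - 1}
	cfg := stagedsync.StageTrieCfg(chainDB, false, true, true, dbPath, nil, nil, false, nil)
	root, err := stagedsync.IncrementIntermediateHashes("increment hashes",
		&s, tx, block.NumberU64(), cfg, common.Hash{}, nil)
	if err != nil {
		return err
	}
	if root != block.Root() {
		return fmt.Errorf("invalid root, have: %s, want: %s", root.String(), block.Root().String())
	}
	return nil
}
```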
4 changes: 2 additions & 2 deletions eth/stagedsync/stage_interhashes.go
@@ -114,7 +114,7 @@ func SpawnIntermediateHashesStage(s *StageState, u Unwinder, tx kv.RwTx, cfg Tri
return trie.EmptyRoot, err
}
} else {
if root, err = incrementIntermediateHashes(logPrefix, s, tx, to, cfg, expectedRootHash, quit); err != nil {
if root, err = IncrementIntermediateHashes(logPrefix, s, tx, to, cfg, expectedRootHash, quit); err != nil {
return trie.EmptyRoot, err
}
}
@@ -522,7 +522,7 @@ func (p *HashPromoter) Unwind(logPrefix string, s *StageState, u *UnwindState, s
return nil
}

func incrementIntermediateHashes(logPrefix string, s *StageState, db kv.RwTx, to uint64, cfg TrieCfg, expectedRootHash libcommon.Hash, quit <-chan struct{}) (libcommon.Hash, error) {
func IncrementIntermediateHashes(logPrefix string, s *StageState, db kv.RwTx, to uint64, cfg TrieCfg, expectedRootHash libcommon.Hash, quit <-chan struct{}) (libcommon.Hash, error) {
p := NewHashPromoter(db, cfg.tmpDir, quit, logPrefix)
rl := trie.NewRetainList(0)
if cfg.historyV3 {
6 changes: 3 additions & 3 deletions eth/stagedsync/stage_interhashes_test.go
@@ -149,7 +149,7 @@ func TestAccountAndStorageTrie(t *testing.T) {

var s StageState
s.BlockNumber = 0
_, err = incrementIntermediateHashes("IH", &s, tx, 1 /* to */, cfg, libcommon.Hash{} /* expectedRootHash */, nil /* quit */)
_, err = IncrementIntermediateHashes("IH", &s, tx, 1 /* to */, cfg, libcommon.Hash{} /* expectedRootHash */, nil /* quit */)
assert.Nil(t, err)

accountTrieB := make(map[string][]byte)
@@ -299,7 +299,7 @@ func TestStorageDeletion(t *testing.T) {

var s StageState
s.BlockNumber = 0
_, err = incrementIntermediateHashes("IH", &s, tx, 1 /* to */, cfg, libcommon.Hash{} /* expectedRootHash */, nil /* quit */)
_, err = IncrementIntermediateHashes("IH", &s, tx, 1 /* to */, cfg, libcommon.Hash{} /* expectedRootHash */, nil /* quit */)
assert.Nil(t, err)

storageTrieB := make(map[string][]byte)
@@ -396,7 +396,7 @@ func TestHiveTrieRoot(t *testing.T) {

var s StageState
s.BlockNumber = 0
incrementalRoot, err := incrementIntermediateHashes("IH", &s, tx, 1 /* to */, cfg, libcommon.Hash{} /* expectedRootHash */, nil /* quit */)
incrementalRoot, err := IncrementIntermediateHashes("IH", &s, tx, 1 /* to */, cfg, libcommon.Hash{} /* expectedRootHash */, nil /* quit */)
require.Nil(t, err)

regeneratedRoot, err := RegenerateIntermediateHashes("IH", tx, cfg, libcommon.Hash{} /* expectedRootHash */, ctx)
