Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch Eth1 RPC Calls #4392

Merged
merged 44 commits into from
Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
8f9c03b
add new methods
nisdas Jan 3, 2020
a18c991
Merge branch 'v0.9.2' of https://github.com/prysmaticlabs/geth-shardi…
nisdas Jan 7, 2020
92376d2
get it working
nisdas Jan 7, 2020
e213d8d
optimize past deposit logs processing
nisdas Jan 7, 2020
f675113
revert change
nisdas Jan 7, 2020
d482933
fix all tests
nisdas Jan 7, 2020
0e0b621
use mock
nisdas Jan 7, 2020
0460399
lint
nisdas Jan 7, 2020
3105d87
lint
nisdas Jan 7, 2020
9454546
check for nil
nisdas Jan 7, 2020
3817fa6
stop panics
nisdas Jan 7, 2020
14c79ec
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
60abc2e
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
75132e5
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
5dc9f5f
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
b888374
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
a4cb475
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
d1b8b74
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
495a4b2
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
46d8fc0
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
bb00951
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
e8b5826
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
d29d7f3
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
099cea1
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
0cb83d7
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
498dcd4
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
68a5119
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
cbf872a
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
d9fe7cb
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
cda561b
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
b24ee96
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
0c884e5
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
7a78252
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
403bdcc
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
abe9b07
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
f429526
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
f1dc6ab
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
96daf5c
Apply suggestions from code review
nisdas Jan 7, 2020
7ceaeea
Terence's Review
nisdas Jan 7, 2020
5383b80
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
06f117a
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
5d7bcc0
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
e18f3df
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
afcda7b
Merge refs/heads/v0.9.2 into batchCalls
prylabs-bulldozer[bot] Jan 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon-chain/powchain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ go_test(
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/powchain/testing:go_default_library",
"//contracts/deposit-contract:go_default_library",
"//proto/beacon/db:go_default_library",
"//shared/bls:go_default_library",
Expand Down
3 changes: 3 additions & 0 deletions beacon-chain/powchain/block_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
gethTypes "github.com/ethereum/go-ethereum/core/types"
dbutil "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
mockPOW "github.com/prysmaticlabs/prysm/beacon-chain/powchain/testing"
contracts "github.com/prysmaticlabs/prysm/contracts/deposit-contract"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
)
Expand Down Expand Up @@ -42,6 +43,8 @@ func TestLatestMainchainInfo_OK(t *testing.T) {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.rpcClient = &mockPOW.RPCClient{Backend: testAcc.Backend}

web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
Expand Down
74 changes: 56 additions & 18 deletions beacon-chain/powchain/log_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var (

const eth1LookBackPeriod = 100
const eth1DataSavingInterval = 100
const eth1HeaderReqLimit = 2000

// Eth2GenesisPowchainInfo retrieves the genesis time and eth1 block number of the beacon chain
// from the deposit contract.
Expand Down Expand Up @@ -62,7 +63,7 @@ func (s *Service) ProcessETH1Block(ctx context.Context, blkNum *big.Int) error {
}
}
if !s.chainStartData.Chainstarted {
if err := s.checkForChainStart(ctx, blkNum); err != nil {
if err := s.checkBlockNumberForChainStart(ctx, blkNum); err != nil {
return err
}
}
Expand Down Expand Up @@ -262,12 +263,42 @@ func (s *Service) processPastLogs(ctx context.Context) error {
if err != nil {
return err
}
// To store all blocks.
headersMap := make(map[uint64]*gethTypes.Header)

// Batch request the desired headers and store them in a
// map for quick access.
requestHeaders := func(startBlk uint64, endBlk uint64) error {
headers, err := s.batchRequestHeaders(startBlk, endBlk)
if err != nil {
return err
}
for _, h := range headers {
if h != nil && h.Number != nil {
headersMap[h.Number.Uint64()] = h
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need mutex protection for headersMap?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is all single-threaded, so using a mutex isnt required since there isnt any concurrent access

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. What i thought but still wanted to confirm

}
}
return nil
}

if err := requestHeaders(currentBlockNum, currentBlockNum+eth1HeaderReqLimit); err != nil {
return err
}

for _, log := range logs {
if log.BlockNumber > currentBlockNum {
if !s.chainStartData.Chainstarted {
if err := s.checkForChainStart(ctx, big.NewInt(int64(currentBlockNum))); err != nil {
return err
for i := currentBlockNum; i <= log.BlockNumber-1; i++ {
if !s.chainStartData.Chainstarted {
h, ok := headersMap[i]
if !ok {
if err := requestHeaders(i, i+eth1HeaderReqLimit); err != nil {
return err
}
// Retry this block.
i--
continue
}
s.checkHeaderForChainstart(h)
}
}
// set new block number after checking for chainstart for previous block.
Expand All @@ -280,7 +311,6 @@ func (s *Service) processPastLogs(ctx context.Context) error {
}

s.latestEth1Data.LastRequestedBlock = currentBlockNum

currentState, err := s.beaconDB.HeadState(ctx)
if err != nil {
return errors.Wrap(err, "could not get head state")
Expand Down Expand Up @@ -355,24 +385,32 @@ func (s *Service) processBlksInRange(ctx context.Context, startBlk uint64, endBl
return nil
}

// checkForChainStart checks the given block number for if chainstart has occurred.
func (s *Service) checkForChainStart(ctx context.Context, blkNum *big.Int) error {
blk, err := s.blockFetcher.BlockByNumber(ctx, blkNum)
// checkBlockNumberForChainStart checks the given block number for if chainstart has occurred.
func (s *Service) checkBlockNumberForChainStart(ctx context.Context, blkNum *big.Int) error {
hash, err := s.BlockHashByHeight(ctx, blkNum)
if err != nil {
return errors.Wrap(err, "could not get eth1 block")
return errors.Wrap(err, "could not get eth1 block hash")
}
if blk == nil {
return errors.Wrap(err, "got empty block from powchain service")
if hash == [32]byte{} {
return errors.Wrap(err, "got empty block hash")
}
if blk.Hash() == [32]byte{} {
return errors.New("got empty blockhash from powchain service")
timeStamp, err := s.BlockTimeByHeight(ctx, blkNum)
if err != nil {
return errors.Wrap(err, "could not get block timestamp")
}
timeStamp := blk.Time()
s.checkForChainstart(hash, blkNum, timeStamp)
return nil
}

func (s *Service) checkHeaderForChainstart(header *gethTypes.Header) {
s.checkForChainstart(header.Hash(), header.Number, header.Time)
}

func (s *Service) checkForChainstart(blockHash [32]byte, blockNumber *big.Int, blockTime uint64) {
valCount, _ := helpers.ActiveValidatorCount(s.preGenesisState, 0)
triggered := state.IsValidGenesisState(valCount, s.createGenesisTime(timeStamp))
triggered := state.IsValidGenesisState(valCount, s.createGenesisTime(blockTime))
if triggered {
s.chainStartData.GenesisTime = s.createGenesisTime(timeStamp)
s.ProcessChainStart(s.chainStartData.GenesisTime, blk.Hash(), blk.Number())
s.chainStartData.GenesisTime = s.createGenesisTime(blockTime)
s.ProcessChainStart(s.chainStartData.GenesisTime, blockHash, blockNumber)
}
return nil
}
3 changes: 3 additions & 0 deletions beacon-chain/powchain/log_processing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
mockPOW "github.com/prysmaticlabs/prysm/beacon-chain/powchain/testing"
contracts "github.com/prysmaticlabs/prysm/contracts/deposit-contract"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
Expand Down Expand Up @@ -451,6 +452,7 @@ func TestProcessETH2GenesisLog_CorrectNumOfDeposits(t *testing.T) {
if err != nil {
t.Fatal(err)
}
web3Service.rpcClient = &mockPOW.RPCClient{Backend: testAcc.Backend}
web3Service.httpLogger = testAcc.Backend
web3Service.latestEth1Data.LastRequestedBlock = 0
web3Service.latestEth1Data.BlockHeight = 0
Expand Down Expand Up @@ -711,6 +713,7 @@ func newPowchainService(t *testing.T, eth1Backend *contracts.TestAccount, beacon
if err != nil {
t.Fatal(err)
}
web3Service.rpcClient = &mockPOW.RPCClient{Backend: eth1Backend.Backend}
web3Service.reader = &goodReader{backend: eth1Backend.Backend}
web3Service.blockFetcher = &goodFetcher{backend: eth1Backend.Backend}
web3Service.httpLogger = &goodLogger{backend: eth1Backend.Backend}
Expand Down
63 changes: 55 additions & 8 deletions beacon-chain/powchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
gethRPC "github.com/ethereum/go-ethereum/rpc"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -106,6 +107,11 @@ type RPCBlockFetcher interface {
BlockByHash(ctx context.Context, hash common.Hash) (*gethTypes.Block, error)
}

// RPCClient defines the rpc methods required to interact with the eth1 node.
type RPCClient interface {
BatchCall(b []rpc.BatchElem) error
}

// Service fetches important information about the canonical
// Ethereum ETH1.0 chain via a web3 endpoint using an ethclient. The Random
// Beacon Chain requires synchronization with the ETH1.0 chain's current
Expand All @@ -125,6 +131,7 @@ type Service struct {
logger bind.ContractFilterer
httpLogger bind.ContractFilterer
blockFetcher RPCBlockFetcher
rpcClient RPCClient
blockCache *blockCache // cache to store block hash/block height.
latestEth1Data *protodb.LatestETH1Data
depositContractCaller *contracts.DepositContractCaller
Expand Down Expand Up @@ -319,7 +326,7 @@ func (s *Service) AreAllDepositsProcessed() (bool, error) {
}

func (s *Service) connectToPowChain() error {
powClient, httpClient, err := s.dialETH1Nodes()
powClient, httpClient, rpcClient, err := s.dialETH1Nodes()
if err != nil {
return errors.Wrap(err, "could not dial eth1 nodes")
}
Expand All @@ -329,36 +336,37 @@ func (s *Service) connectToPowChain() error {
return errors.Wrap(err, "could not create deposit contract caller")
}

s.initializeConnection(powClient, httpClient, depositContractCaller)
s.initializeConnection(powClient, httpClient, rpcClient, depositContractCaller)
return nil
}

func (s *Service) dialETH1Nodes() (*ethclient.Client, *ethclient.Client, error) {
func (s *Service) dialETH1Nodes() (*ethclient.Client, *ethclient.Client, *rpc.Client, error) {
httpRPCClient, err := gethRPC.Dial(s.httpEndpoint)
if err != nil {
return nil, nil, err
return nil, nil, nil, err
}
httpClient := ethclient.NewClient(httpRPCClient)

rpcClient, err := gethRPC.Dial(s.eth1Endpoint)
if err != nil {
httpClient.Close()
return nil, nil, err
return nil, nil, nil, err
}
powClient := ethclient.NewClient(rpcClient)

return powClient, httpClient, nil
return powClient, httpClient, httpRPCClient, nil
}

func (s *Service) initializeConnection(powClient *ethclient.Client,
httpClient *ethclient.Client, contractCaller *contracts.DepositContractCaller) {
httpClient *ethclient.Client, rpcClient *rpc.Client, contractCaller *contracts.DepositContractCaller) {

s.reader = powClient
s.logger = powClient
s.client = httpClient
s.httpLogger = httpClient
s.blockFetcher = httpClient
s.depositContractCaller = contractCaller
s.rpcClient = rpcClient
}

func (s *Service) waitForConnection() {
Expand Down Expand Up @@ -448,6 +456,45 @@ func (s *Service) processSubscribedHeaders(header *gethTypes.Header) {
}
}

// batchRequestHeaders requests the block range specified in the arguments. Instead of requesting
// each block in one call, it batches all requests into a single rpc call.
func (s *Service) batchRequestHeaders(startBlock uint64, endBlock uint64) ([]*gethTypes.Header, error) {
nisdas marked this conversation as resolved.
Show resolved Hide resolved
requestRange := (endBlock - startBlock) + 1
elems := make([]rpc.BatchElem, 0, requestRange)
headers := make([]*gethTypes.Header, 0, requestRange)
errors := make([]error, 0, requestRange)
if requestRange == 0 {
return headers, nil
}
for i := startBlock; i <= endBlock; i++ {
header := &gethTypes.Header{}
err := error(nil)
elems = append(elems, rpc.BatchElem{
Method: "eth_getBlockByNumber",
Args: []interface{}{hexutil.EncodeBig(big.NewInt(int64(i))), true},
Result: header,
Error: err,
})
headers = append(headers, header)
errors = append(errors, err)
}
ioErr := s.rpcClient.BatchCall(elems)
if ioErr != nil {
return nil, ioErr
}
for _, e := range errors {
if e != nil {
return nil, e
}
}
for _, h := range headers {
if h != nil {
s.blockCache.AddBlock(gethTypes.NewBlockWithHeader(h))
}
}
return headers, nil
}

// safelyHandleHeader will recover and log any panic that occurs from the
// block
func safelyHandlePanic() {
Expand All @@ -463,7 +510,7 @@ func safelyHandlePanic() {
func (s *Service) handleDelayTicker() {
defer safelyHandlePanic()
if !s.chainStartData.Chainstarted {
if err := s.checkForChainStart(context.Background(), big.NewInt(int64(s.latestEth1Data.LastRequestedBlock))); err != nil {
if err := s.checkBlockNumberForChainStart(context.Background(), big.NewInt(int64(s.latestEth1Data.LastRequestedBlock))); err != nil {
s.runError = err
log.Error(err)
return
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/powchain/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ethereum/go-ethereum/core"
gethTypes "github.com/ethereum/go-ethereum/core/types"
dbutil "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
mockPOW "github.com/prysmaticlabs/prysm/beacon-chain/powchain/testing"
contracts "github.com/prysmaticlabs/prysm/contracts/deposit-contract"
depositcontract "github.com/prysmaticlabs/prysm/contracts/deposit-contract"
protodb "github.com/prysmaticlabs/prysm/proto/beacon/db"
Expand Down Expand Up @@ -207,6 +208,7 @@ func TestStart_OK(t *testing.T) {
t.Fatalf("unable to setup web3 ETH1.0 chain service: %v", err)
}
web3Service = setDefaultMocks(web3Service)
web3Service.rpcClient = &mockPOW.RPCClient{Backend: testAcc.Backend}
web3Service.depositContractCaller, err = contracts.NewDepositContractCaller(testAcc.ContractAddr, testAcc.Backend)
if err != nil {
t.Fatal(err)
Expand Down
4 changes: 4 additions & 0 deletions beacon-chain/powchain/testing/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ go_library(
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/trieutil:go_default_library",
"@com_github_ethereum_go_ethereum//accounts/abi/bind/backends:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//rpc:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
],
)
25 changes: 25 additions & 0 deletions beacon-chain/powchain/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"math/big"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
"github.com/ethereum/go-ethereum/common/hexutil"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/common"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
Expand Down Expand Up @@ -100,3 +104,24 @@ func (m *POWChain) PreGenesisState() *pb.BeaconState {
func (m *POWChain) IsConnectedToETH1() bool {
return true
}

// RPCClient defines the mock rpc client.
type RPCClient struct {
Backend *backends.SimulatedBackend
}

// BatchCall --
func (r *RPCClient) BatchCall(b []rpc.BatchElem) error {
if r.Backend == nil {
return nil
}

for _, r := range b {
num, err := hexutil.DecodeBig(r.Args[0].(string))
if err != nil {
return err
}
r.Result.(*gethTypes.Header).Number = num
}
return nil
}