Skip to content

Commit

Permalink
Retry Initialization of ETH1 Connection (#5730)
Browse files Browse the repository at this point in the history
* retry failures
* Merge branch 'master' into retryInitialization
* fix test
* Merge branch 'retryInitialization' of https://github.com/prysmaticlabs/geth-sharding into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* fix method
* Merge branch 'retryInitialization' of https://github.com/prysmaticlabs/geth-sharding into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
* Merge refs/heads/master into retryInitialization
  • Loading branch information
nisdas authored May 5, 2020
1 parent bbde2a6 commit 2eac24c
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 28 deletions.
84 changes: 61 additions & 23 deletions beacon-chain/powchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,35 +555,73 @@ func (s *Service) handleDelayTicker() {
}
}

// run subscribes to all the services for the ETH1.0 chain.
func (s *Service) run(done <-chan struct{}) {
s.isRunning = true
s.runError = nil
if err := s.initDataFromContract(); err != nil {
log.Errorf("Unable to retrieve data from deposit contract %v", err)
return
}

headSub, err := s.reader.SubscribeNewHead(s.ctx, s.headerChan)
if err != nil {
log.Errorf("Unable to subscribe to incoming ETH1.0 chain headers: %v", err)
func (s *Service) initPOWService() ethereum.Subscription {
// initialize a nil subscription
headSub := ethereum.Subscription(nil)
// reconnect to eth1 node in case of any failure
retryETH1Node := func(err error) {
s.runError = err
return
s.connectedETH1 = false
s.waitForConnection()
// reset value in the event of a successful connection.
s.runError = nil
}

header, err := s.blockFetcher.HeaderByNumber(context.Background(), nil)
if err != nil {
log.Errorf("Unable to retrieve latest ETH1.0 chain header: %v", err)
s.runError = err
return
// run in a select loop to retry in the event of any failures.
for {
select {
case <-s.ctx.Done():
return headSub
default:
err := s.initDataFromContract()
if err != nil {
log.Errorf("Unable to retrieve data from deposit contract %v", err)
retryETH1Node(err)
continue
}

headSub, err = s.reader.SubscribeNewHead(s.ctx, s.headerChan)
if err != nil {
log.Errorf("Unable to subscribe to incoming ETH1.0 chain headers: %v", err)
retryETH1Node(err)
continue
}

if headSub == nil {
log.Errorf("Nil head subscription received: %v", err)
retryETH1Node(err)
continue
}

header, err := s.blockFetcher.HeaderByNumber(context.Background(), nil)
if err != nil {
log.Errorf("Unable to retrieve latest ETH1.0 chain header: %v", err)
retryETH1Node(err)
continue
}

s.latestEth1Data.BlockHeight = header.Number.Uint64()
s.latestEth1Data.BlockHash = header.Hash().Bytes()

if err := s.processPastLogs(context.Background()); err != nil {
log.Errorf("Unable to process past logs %v", err)
retryETH1Node(err)
continue
}
return headSub
}
}
}

s.latestEth1Data.BlockHeight = header.Number.Uint64()
s.latestEth1Data.BlockHash = header.Hash().Bytes()
// run subscribes to all the services for the ETH1.0 chain.
func (s *Service) run(done <-chan struct{}) {
var err error
s.isRunning = true
s.runError = nil

if err := s.processPastLogs(context.Background()); err != nil {
log.Errorf("Unable to process past logs %v", err)
s.runError = err
headSub := s.initPOWService()
if headSub == nil {
log.Error("Received a nil head subscription, exiting service")
return
}

Expand Down
9 changes: 4 additions & 5 deletions beacon-chain/powchain/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,11 @@ func TestWeb3Service_BadReader(t *testing.T) {
testAcc.Backend.Commit()
web3Service.reader = &badReader{}
web3Service.logger = &goodLogger{}
web3Service.run(web3Service.ctx.Done())
msg := hook.LastEntry().Message
go web3Service.initPOWService()
time.Sleep(200 * time.Millisecond)
web3Service.cancel()
want := "Unable to subscribe to incoming ETH1.0 chain headers: subscription has failed"
if msg != want {
t.Errorf("incorrect log, expected %s, got %s", want, msg)
}
testutil.AssertLogsContain(t, hook, want)
hook.Reset()
}

Expand Down

0 comments on commit 2eac24c

Please sign in to comment.