From 2eac24cb79b0c847a74e3f8395ab5808c4059dce Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Tue, 5 May 2020 17:06:43 +0800 Subject: [PATCH] Retry Initialization of ETH1 Connection (#5730) * retry failures * Merge branch 'master' into retryInitialization * fix test * Merge branch 'retryInitialization' of https://github.com/prysmaticlabs/geth-sharding into retryInitialization * Merge refs/heads/master into retryInitialization * Merge refs/heads/master into retryInitialization * Merge refs/heads/master into retryInitialization * Merge refs/heads/master into retryInitialization * Merge refs/heads/master into retryInitialization * Merge refs/heads/master into retryInitialization * fix method * Merge branch 'retryInitialization' of https://github.com/prysmaticlabs/geth-sharding into retryInitialization * Merge refs/heads/master into retryInitialization * Merge refs/heads/master into retryInitialization * Merge refs/heads/master into retryInitialization * Merge refs/heads/master into retryInitialization --- beacon-chain/powchain/service.go | 84 +++++++++++++++++++-------- beacon-chain/powchain/service_test.go | 9 ++- 2 files changed, 65 insertions(+), 28 deletions(-) diff --git a/beacon-chain/powchain/service.go b/beacon-chain/powchain/service.go index e98bf19c31a8..2af2bdd60821 100644 --- a/beacon-chain/powchain/service.go +++ b/beacon-chain/powchain/service.go @@ -555,35 +555,73 @@ func (s *Service) handleDelayTicker() { } } -// run subscribes to all the services for the ETH1.0 chain. -func (s *Service) run(done <-chan struct{}) { - s.isRunning = true - s.runError = nil - if err := s.initDataFromContract(); err != nil { - log.Errorf("Unable to retrieve data from deposit contract %v", err) - return - } - - headSub, err := s.reader.SubscribeNewHead(s.ctx, s.headerChan) - if err != nil { - log.Errorf("Unable to subscribe to incoming ETH1.0 chain headers: %v", err) +func (s *Service) initPOWService() ethereum.Subscription { + // initialize a nil subscription + headSub := ethereum.Subscription(nil) + // reconnect to eth1 node in case of any failure + retryETH1Node := func(err error) { s.runError = err - return + s.connectedETH1 = false + s.waitForConnection() + // reset value in the event of a successful connection. + s.runError = nil } - header, err := s.blockFetcher.HeaderByNumber(context.Background(), nil) - if err != nil { - log.Errorf("Unable to retrieve latest ETH1.0 chain header: %v", err) - s.runError = err - return + // run in a select loop to retry in the event of any failures. + for { + select { + case <-s.ctx.Done(): + return headSub + default: + err := s.initDataFromContract() + if err != nil { + log.Errorf("Unable to retrieve data from deposit contract %v", err) + retryETH1Node(err) + continue + } + + headSub, err = s.reader.SubscribeNewHead(s.ctx, s.headerChan) + if err != nil { + log.Errorf("Unable to subscribe to incoming ETH1.0 chain headers: %v", err) + retryETH1Node(err) + continue + } + + if headSub == nil { + log.Errorf("Nil head subscription received: %v", err) + retryETH1Node(err) + continue + } + + header, err := s.blockFetcher.HeaderByNumber(context.Background(), nil) + if err != nil { + log.Errorf("Unable to retrieve latest ETH1.0 chain header: %v", err) + retryETH1Node(err) + continue + } + + s.latestEth1Data.BlockHeight = header.Number.Uint64() + s.latestEth1Data.BlockHash = header.Hash().Bytes() + + if err := s.processPastLogs(context.Background()); err != nil { + log.Errorf("Unable to process past logs %v", err) + retryETH1Node(err) + continue + } + return headSub + } } +} - s.latestEth1Data.BlockHeight = header.Number.Uint64() - s.latestEth1Data.BlockHash = header.Hash().Bytes() +// run subscribes to all the services for the ETH1.0 chain. +func (s *Service) run(done <-chan struct{}) { + var err error + s.isRunning = true + s.runError = nil - if err := s.processPastLogs(context.Background()); err != nil { - log.Errorf("Unable to process past logs %v", err) - s.runError = err + headSub := s.initPOWService() + if headSub == nil { + log.Error("Received a nil head subscription, exiting service") return } diff --git a/beacon-chain/powchain/service_test.go b/beacon-chain/powchain/service_test.go index bc53d9a7493a..b7850f97fae3 100644 --- a/beacon-chain/powchain/service_test.go +++ b/beacon-chain/powchain/service_test.go @@ -310,12 +310,11 @@ func TestWeb3Service_BadReader(t *testing.T) { testAcc.Backend.Commit() web3Service.reader = &badReader{} web3Service.logger = &goodLogger{} - web3Service.run(web3Service.ctx.Done()) - msg := hook.LastEntry().Message + go web3Service.initPOWService() + time.Sleep(200 * time.Millisecond) + web3Service.cancel() want := "Unable to subscribe to incoming ETH1.0 chain headers: subscription has failed" - if msg != want { - t.Errorf("incorrect log, expected %s, got %s", want, msg) - } + testutil.AssertLogsContain(t, hook, want) hook.Reset() }