From 639e3072fe5f7cc335ad25d8b4a6cb386ad03a41 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Mon, 20 Apr 2020 12:04:45 +0800 Subject: [PATCH] Cancel Initial Sync Properly (#5529) * fix cancelling * Merge branch 'master' into cancelServices * Merge refs/heads/master into cancelServices * Merge refs/heads/master into cancelServices --- beacon-chain/sync/initial-sync-old/service.go | 6 +- beacon-chain/sync/initial-sync/service.go | 6 +- beacon-chain/sync/service.go | 60 +++++++++---------- 3 files changed, 40 insertions(+), 32 deletions(-) diff --git a/beacon-chain/sync/initial-sync-old/service.go b/beacon-chain/sync/initial-sync-old/service.go index adc89b71e94e..14abc93863ef 100644 --- a/beacon-chain/sync/initial-sync-old/service.go +++ b/beacon-chain/sync/initial-sync-old/service.go @@ -47,6 +47,7 @@ type Config struct { // Service service. type Service struct { ctx context.Context + cancel context.CancelFunc chain blockchainService p2p p2p.P2P db db.ReadOnlyDatabase @@ -60,8 +61,10 @@ type Service struct { // NewInitialSync configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. func NewInitialSync(cfg *Config) *Service { + ctx, cancel := context.WithCancel(context.Background()) return &Service{ - ctx: context.Background(), + ctx: ctx, + cancel: cancel, chain: cfg.Chain, p2p: cfg.P2P, db: cfg.DB, @@ -139,6 +142,7 @@ func (s *Service) Start() { // Stop initial sync. func (s *Service) Stop() error { + s.cancel() return nil } diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 1fa04e8adffd..10501d9038aa 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -47,6 +47,7 @@ type Config struct { // Service service. type Service struct { ctx context.Context + cancel context.CancelFunc chain blockchainService p2p p2p.P2P db db.ReadOnlyDatabase @@ -60,8 +61,10 @@ type Service struct { // NewInitialSync configures the initial sync service responsible for bringing the node up to the // latest head of the blockchain. func NewInitialSync(cfg *Config) *Service { + ctx, cancel := context.WithCancel(context.Background()) return &Service{ - ctx: context.Background(), + ctx: ctx, + cancel: cancel, chain: cfg.Chain, p2p: cfg.P2P, db: cfg.DB, @@ -157,6 +160,7 @@ func (s *Service) Start() { // Stop initial sync. func (s *Service) Stop() error { + s.cancel() return nil } diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index cf3967ce11b6..362d1f74ddee 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -62,36 +62,6 @@ type blockchainService interface { blockchain.GenesisFetcher } -// NewRegularSync service. -func NewRegularSync(cfg *Config) *Service { - ctx, cancel := context.WithCancel(context.Background()) - r := &Service{ - ctx: ctx, - cancel: cancel, - db: cfg.DB, - p2p: cfg.P2P, - attPool: cfg.AttPool, - exitPool: cfg.ExitPool, - slashingPool: cfg.SlashingPool, - chain: cfg.Chain, - initialSync: cfg.InitialSync, - attestationNotifier: cfg.AttestationNotifier, - slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), - seenPendingBlocks: make(map[[32]byte]bool), - blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), - stateNotifier: cfg.StateNotifier, - blockNotifier: cfg.BlockNotifier, - stateSummaryCache: cfg.StateSummaryCache, - stateGen: cfg.StateGen, - blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */), - } - - r.registerRPCHandlers() - go r.registerSubscribers() - - return r -} - // Service is responsible for handling all run time p2p related operations as the // main entry point for network messages. type Service struct { @@ -129,6 +99,36 @@ type Service struct { stateGen *stategen.State } +// NewRegularSync service. +func NewRegularSync(cfg *Config) *Service { + ctx, cancel := context.WithCancel(context.Background()) + r := &Service{ + ctx: ctx, + cancel: cancel, + db: cfg.DB, + p2p: cfg.P2P, + attPool: cfg.AttPool, + exitPool: cfg.ExitPool, + slashingPool: cfg.SlashingPool, + chain: cfg.Chain, + initialSync: cfg.InitialSync, + attestationNotifier: cfg.AttestationNotifier, + slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock), + seenPendingBlocks: make(map[[32]byte]bool), + blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), + stateNotifier: cfg.StateNotifier, + blockNotifier: cfg.BlockNotifier, + stateSummaryCache: cfg.StateSummaryCache, + stateGen: cfg.StateGen, + blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */), + } + + r.registerRPCHandlers() + go r.registerSubscribers() + + return r +} + // Start the regular sync service. func (r *Service) Start() { if err := r.initCaches(); err != nil {