Skip to content

Commit

Permalink
Cancel Initial Sync Properly (#5529)
Browse files Browse the repository at this point in the history
* fix cancelling
* Merge branch 'master' into cancelServices
* Merge refs/heads/master into cancelServices
* Merge refs/heads/master into cancelServices
  • Loading branch information
nisdas authored Apr 20, 2020
1 parent 9846442 commit 639e307
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 32 deletions.
6 changes: 5 additions & 1 deletion beacon-chain/sync/initial-sync-old/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Config struct {
// Service service.
type Service struct {
ctx context.Context
cancel context.CancelFunc
chain blockchainService
p2p p2p.P2P
db db.ReadOnlyDatabase
Expand All @@ -60,8 +61,10 @@ type Service struct {
// NewInitialSync configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain.
func NewInitialSync(cfg *Config) *Service {
ctx, cancel := context.WithCancel(context.Background())
return &Service{
ctx: context.Background(),
ctx: ctx,
cancel: cancel,
chain: cfg.Chain,
p2p: cfg.P2P,
db: cfg.DB,
Expand Down Expand Up @@ -139,6 +142,7 @@ func (s *Service) Start() {

// Stop initial sync.
func (s *Service) Stop() error {
s.cancel()
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Config struct {
// Service service.
type Service struct {
ctx context.Context
cancel context.CancelFunc
chain blockchainService
p2p p2p.P2P
db db.ReadOnlyDatabase
Expand All @@ -60,8 +61,10 @@ type Service struct {
// NewInitialSync configures the initial sync service responsible for bringing the node up to the
// latest head of the blockchain.
func NewInitialSync(cfg *Config) *Service {
ctx, cancel := context.WithCancel(context.Background())
return &Service{
ctx: context.Background(),
ctx: ctx,
cancel: cancel,
chain: cfg.Chain,
p2p: cfg.P2P,
db: cfg.DB,
Expand Down Expand Up @@ -157,6 +160,7 @@ func (s *Service) Start() {

// Stop initial sync.
func (s *Service) Stop() error {
s.cancel()
return nil
}

Expand Down
60 changes: 30 additions & 30 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,36 +62,6 @@ type blockchainService interface {
blockchain.GenesisFetcher
}

// NewRegularSync service.
func NewRegularSync(cfg *Config) *Service {
ctx, cancel := context.WithCancel(context.Background())
r := &Service{
ctx: ctx,
cancel: cancel,
db: cfg.DB,
p2p: cfg.P2P,
attPool: cfg.AttPool,
exitPool: cfg.ExitPool,
slashingPool: cfg.SlashingPool,
chain: cfg.Chain,
initialSync: cfg.InitialSync,
attestationNotifier: cfg.AttestationNotifier,
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
stateNotifier: cfg.StateNotifier,
blockNotifier: cfg.BlockNotifier,
stateSummaryCache: cfg.StateSummaryCache,
stateGen: cfg.StateGen,
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
}

r.registerRPCHandlers()
go r.registerSubscribers()

return r
}

// Service is responsible for handling all run time p2p related operations as the
// main entry point for network messages.
type Service struct {
Expand Down Expand Up @@ -129,6 +99,36 @@ type Service struct {
stateGen *stategen.State
}

// NewRegularSync service.
func NewRegularSync(cfg *Config) *Service {
ctx, cancel := context.WithCancel(context.Background())
r := &Service{
ctx: ctx,
cancel: cancel,
db: cfg.DB,
p2p: cfg.P2P,
attPool: cfg.AttPool,
exitPool: cfg.ExitPool,
slashingPool: cfg.SlashingPool,
chain: cfg.Chain,
initialSync: cfg.InitialSync,
attestationNotifier: cfg.AttestationNotifier,
slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
stateNotifier: cfg.StateNotifier,
blockNotifier: cfg.BlockNotifier,
stateSummaryCache: cfg.StateSummaryCache,
stateGen: cfg.StateGen,
blocksRateLimiter: leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
}

r.registerRPCHandlers()
go r.registerSubscribers()

return r
}

// Start the regular sync service.
func (r *Service) Start() {
if err := r.initCaches(); err != nil {
Expand Down

0 comments on commit 639e307

Please sign in to comment.