Skip to content

Commit

Permalink
resolve race in closing sync segments channel (#1201)
Browse files Browse the repository at this point in the history
* resolve race in closing sync channel

* fix comment
  • Loading branch information
darioush authored Jun 4, 2024
1 parent 4a75fa7 commit 22fc7cc
Showing 1 changed file with 52 additions and 33 deletions.
85 changes: 52 additions & 33 deletions sync/statesync/state_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type stateSync struct {

// track completion and progress of work
mainTrieDone chan struct{}
storageTriesDone chan struct{}
triesInProgressSem chan struct{}
done chan error
stats *trieSyncStats
Expand All @@ -79,9 +80,10 @@ func NewStateSyncer(config *StateSyncerConfig) (*stateSync, error) {
// Each [trieToSync] will have a maximum of [numSegments] segments.
// We set the capacity of [segments] such that [defaultNumThreads]
// storage tries can sync concurrently.
segments: make(chan syncclient.LeafSyncTask, defaultNumThreads*numStorageTrieSegments),
mainTrieDone: make(chan struct{}),
done: make(chan error, 1),
segments: make(chan syncclient.LeafSyncTask, defaultNumThreads*numStorageTrieSegments),
mainTrieDone: make(chan struct{}),
storageTriesDone: make(chan struct{}),
done: make(chan error, 1),
}
ss.syncer = syncclient.NewCallbackLeafSyncer(config.Client, ss.segments, config.RequestSize)
ss.codeSyncer = newCodeSyncer(CodeSyncerConfig{
Expand Down Expand Up @@ -115,7 +117,19 @@ func (t *stateSync) onStorageTrieFinished(root common.Hash) error {
return err
}
// track the completion of this storage trie
return t.removeTrieInProgress(root)
numInProgress, err := t.removeTrieInProgress(root)
if err != nil {
return err
}
if numInProgress == 0 {
select {
case <-t.storageTriesDone:
// when the last storage trie finishes, close the segments channel
close(t.segments)
default:
}
}
return nil
}

// onMainTrieFinished is called after the main trie finishes syncing.
Expand All @@ -131,7 +145,8 @@ func (t *stateSync) onMainTrieFinished() error {

// mark the main trie done
close(t.mainTrieDone)
return t.removeTrieInProgress(t.root)
_, err = t.removeTrieInProgress(t.root)
return err
}

// onSyncComplete is called after the account trie and
Expand Down Expand Up @@ -167,30 +182,36 @@ func (t *stateSync) storageTrieProducer(ctx context.Context) error {
return err
}
// If there are no storage tries, then root will be the empty hash on the first pass.
if root != (common.Hash{}) {
// acquire semaphore (to keep number of tries in progress limited)
select {
case t.triesInProgressSem <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}
if root == (common.Hash{}) && !more {
close(t.segments)
return nil
}

// Arbitrarily use the first account for making requests to the server.
// Note: getNextTrie guarantees that if a non-nil storage root is returned, then the
// slice of account hashes is non-empty.
syncAccount := accounts[0]
// create a trieToSync for the storage trie and mark it as in progress.
storageTrie, err := NewTrieToSync(t, root, syncAccount, NewStorageTrieTask(t, root, accounts))
if err != nil {
return err
}
t.addTrieInProgress(root, storageTrie)
storageTrie.startSyncing() // start syncing after tracking the trie as in progress
// acquire semaphore (to keep number of tries in progress limited)
select {
case t.triesInProgressSem <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
}

// Arbitrarily use the first account for making requests to the server.
// Note: getNextTrie guarantees that if a non-nil storage root is returned, then the
// slice of account hashes is non-empty.
syncAccount := accounts[0]

// create a trieToSync for the storage trie and mark it as in progress.
storageTrie, err := NewTrieToSync(t, root, syncAccount, NewStorageTrieTask(t, root, accounts))
if err != nil {
return err
}
// if there are no more storage tries, close
// the task queue and exit the producer.
t.addTrieInProgress(root, storageTrie)
if !more {
close(t.storageTriesDone)
}
// start syncing after tracking the trie as in progress
storageTrie.startSyncing()

if !more {
close(t.segments)
return nil
}
}
Expand Down Expand Up @@ -233,20 +254,18 @@ func (t *stateSync) addTrieInProgress(root common.Hash, trie *trieToSync) {
t.triesInProgress[root] = trie
}

// removeTrieInProgress removes root from the set of tracked
// tries in progress and notifies the storage root producer
// so it can continue in case it was paused due to the
// maximum number of tries in progress being previously reached.
func (t *stateSync) removeTrieInProgress(root common.Hash) error {
// removeTrieInProgress removes root from the set of tracked tries in progress
// and returns the number of tries in progress after the removal.
func (t *stateSync) removeTrieInProgress(root common.Hash) (int, error) {
t.lock.Lock()
defer t.lock.Unlock()

t.stats.trieDone(root)
if _, ok := t.triesInProgress[root]; !ok {
return fmt.Errorf("removeTrieInProgress for unexpected root: %s", root)
return 0, fmt.Errorf("removeTrieInProgress for unexpected root: %s", root)
}
delete(t.triesInProgress, root)
return nil
return len(t.triesInProgress), nil
}

// onSyncFailure is called if the sync fails, this writes all
Expand Down

0 comments on commit 22fc7cc

Please sign in to comment.