From 22fc7ccdeb380592872b85815a2f72d9b5b010dc Mon Sep 17 00:00:00 2001 From: Darioush Jalali Date: Tue, 4 Jun 2024 06:56:58 -0700 Subject: [PATCH] resolve race in closing sync segments channel (#1201) * resolve race in closing sync channel * fix comment --- sync/statesync/state_syncer.go | 85 +++++++++++++++++++++------------- 1 file changed, 52 insertions(+), 33 deletions(-) diff --git a/sync/statesync/state_syncer.go b/sync/statesync/state_syncer.go index 6fae233802..0cb7c33b54 100644 --- a/sync/statesync/state_syncer.go +++ b/sync/statesync/state_syncer.go @@ -56,6 +56,7 @@ type stateSync struct { // track completion and progress of work mainTrieDone chan struct{} + storageTriesDone chan struct{} triesInProgressSem chan struct{} done chan error stats *trieSyncStats @@ -79,9 +80,10 @@ func NewStateSyncer(config *StateSyncerConfig) (*stateSync, error) { // Each [trieToSync] will have a maximum of [numSegments] segments. // We set the capacity of [segments] such that [defaultNumThreads] // storage tries can sync concurrently. - segments: make(chan syncclient.LeafSyncTask, defaultNumThreads*numStorageTrieSegments), - mainTrieDone: make(chan struct{}), - done: make(chan error, 1), + segments: make(chan syncclient.LeafSyncTask, defaultNumThreads*numStorageTrieSegments), + mainTrieDone: make(chan struct{}), + storageTriesDone: make(chan struct{}), + done: make(chan error, 1), } ss.syncer = syncclient.NewCallbackLeafSyncer(config.Client, ss.segments, config.RequestSize) ss.codeSyncer = newCodeSyncer(CodeSyncerConfig{ @@ -115,7 +117,19 @@ func (t *stateSync) onStorageTrieFinished(root common.Hash) error { return err } // track the completion of this storage trie - return t.removeTrieInProgress(root) + numInProgress, err := t.removeTrieInProgress(root) + if err != nil { + return err + } + if numInProgress == 0 { + select { + case <-t.storageTriesDone: + // when the last storage trie finishes, close the segments channel + close(t.segments) + default: + } + } + return nil } // onMainTrieFinished is called after the main trie finishes syncing. @@ -131,7 +145,8 @@ func (t *stateSync) onMainTrieFinished() error { // mark the main trie done close(t.mainTrieDone) - return t.removeTrieInProgress(t.root) + _, err = t.removeTrieInProgress(t.root) + return err } // onSyncComplete is called after the account trie and @@ -167,30 +182,36 @@ func (t *stateSync) storageTrieProducer(ctx context.Context) error { return err } // If there are no storage tries, then root will be the empty hash on the first pass. - if root != (common.Hash{}) { - // acquire semaphore (to keep number of tries in progress limited) - select { - case t.triesInProgressSem <- struct{}{}: - case <-ctx.Done(): - return ctx.Err() - } + if root == (common.Hash{}) && !more { + close(t.segments) + return nil + } - // Arbitrarily use the first account for making requests to the server. - // Note: getNextTrie guarantees that if a non-nil storage root is returned, then the - // slice of account hashes is non-empty. - syncAccount := accounts[0] - // create a trieToSync for the storage trie and mark it as in progress. - storageTrie, err := NewTrieToSync(t, root, syncAccount, NewStorageTrieTask(t, root, accounts)) - if err != nil { - return err - } - t.addTrieInProgress(root, storageTrie) - storageTrie.startSyncing() // start syncing after tracking the trie as in progress + // acquire semaphore (to keep number of tries in progress limited) + select { + case t.triesInProgressSem <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + } + + // Arbitrarily use the first account for making requests to the server. + // Note: getNextTrie guarantees that if a non-nil storage root is returned, then the + // slice of account hashes is non-empty. + syncAccount := accounts[0] + + // create a trieToSync for the storage trie and mark it as in progress. + storageTrie, err := NewTrieToSync(t, root, syncAccount, NewStorageTrieTask(t, root, accounts)) + if err != nil { + return err } - // if there are no more storage tries, close - // the task queue and exit the producer. + t.addTrieInProgress(root, storageTrie) + if !more { + close(t.storageTriesDone) + } + // start syncing after tracking the trie as in progress + storageTrie.startSyncing() + if !more { - close(t.segments) return nil } } @@ -233,20 +254,18 @@ func (t *stateSync) addTrieInProgress(root common.Hash, trie *trieToSync) { t.triesInProgress[root] = trie } -// removeTrieInProgress removes root from the set of tracked -// tries in progress and notifies the storage root producer -// so it can continue in case it was paused due to the -// maximum number of tries in progress being previously reached. -func (t *stateSync) removeTrieInProgress(root common.Hash) error { +// removeTrieInProgress removes root from the set of tracked tries in progress +// and returns the number of tries in progress after the removal. +func (t *stateSync) removeTrieInProgress(root common.Hash) (int, error) { t.lock.Lock() defer t.lock.Unlock() t.stats.trieDone(root) if _, ok := t.triesInProgress[root]; !ok { - return fmt.Errorf("removeTrieInProgress for unexpected root: %s", root) + return 0, fmt.Errorf("removeTrieInProgress for unexpected root: %s", root) } delete(t.triesInProgress, root) - return nil + return len(t.triesInProgress), nil } // onSyncFailure is called if the sync fails, this writes all