From c83790442fc2fbe292e97d7b2f86f3f939fe7523 Mon Sep 17 00:00:00 2001 From: s7v7nislands Date: Sun, 2 Apr 2023 13:56:54 +0800 Subject: [PATCH 1/3] eth/downloader: use atomic type --- eth/downloader/beaconsync.go | 3 +-- eth/downloader/downloader.go | 30 +++++++++++++++--------------- eth/downloader/downloader_test.go | 24 ++++++++++++------------ eth/downloader/queue.go | 18 +++++++++--------- eth/downloader/resultstore.go | 8 ++++---- eth/downloader/skeleton_test.go | 24 ++++++++++++------------ 6 files changed, 53 insertions(+), 54 deletions(-) diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index ff985e6b035f..df8af68bc798 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -19,7 +19,6 @@ package downloader import ( "fmt" "sync" - "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -371,7 +370,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { continue } // If the pivot block is committed, signal header sync termination - if atomic.LoadInt32(&d.committed) == 1 { + if d.committed.Load() { select { case d.headerProcCh <- nil: return nil diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index fb9de79912e2..a3d8a210600c 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -98,7 +98,7 @@ type headerTask struct { } type Downloader struct { - mode uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode + mode atomic.Uint32 // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode mux *event.TypeMux // Event multiplexer to announce sync operation events checkpoint uint64 // Checkpoint block number to enforce head against (e.g. snap sync) @@ -122,9 +122,9 @@ type Downloader struct { // Status synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing - synchronising int32 - notified int32 - committed int32 + synchronising atomic.Bool + notified atomic.Bool + committed atomic.Bool ancientLimit uint64 // The maximum block number which can be regarded as ancient data. // Channels @@ -292,7 +292,7 @@ func (d *Downloader) Progress() ethereum.SyncProgress { // Synchronising returns whether the downloader is currently retrieving blocks. func (d *Downloader) Synchronising() bool { - return atomic.LoadInt32(&d.synchronising) > 0 + return d.synchronising.Load() } // RegisterPeer injects a new download peer into the set of block source to be @@ -392,13 +392,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int, return d.synchroniseMock(id, hash) } // Make sure only one goroutine is ever allowed past this point at once - if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) { + if !d.synchronising.CompareAndSwap(false, true) { return errBusy } - defer atomic.StoreInt32(&d.synchronising, 0) + defer d.synchronising.Store(false) // Post a user notification of the sync (only once per session) - if atomic.CompareAndSwapInt32(&d.notified, 0, 1) { + if d.notified.CompareAndSwap(false, true) { log.Info("Block synchronisation started") } if mode == SnapSync { @@ -435,7 +435,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int, defer d.Cancel() // No matter what, we can't leave the cancel channel open // Atomically set the requested sync mode - atomic.StoreUint32(&d.mode, uint32(mode)) + d.mode.Store(uint32(mode)) // Retrieve the origin peer and initiate the downloading process var p *peerConnection @@ -452,7 +452,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td, ttd *big.Int, } func (d *Downloader) getMode() SyncMode { - return SyncMode(atomic.LoadUint32(&d.mode)) + return SyncMode(d.mode.Load()) } // syncWithPeer starts a block synchronization based on the hash chain from the @@ -562,9 +562,9 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber) } } - d.committed = 1 + d.committed.Store(true) if mode == SnapSync && pivot.Number.Uint64() != 0 { - d.committed = 0 + d.committed.Store(false) } if mode == SnapSync { // Set the ancient data limitation. If we are running snap sync, all block @@ -1128,7 +1128,7 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e // If no more headers are inbound, notify the content fetchers and return if len(headers) == 0 { // Don't abort header fetches while the pivot is downloading - if atomic.LoadInt32(&d.committed) == 0 && pivot <= from { + if !d.committed.Load() && pivot <= from { p.log.Debug("No headers, waiting for pivot commit") select { case <-time.After(fsHeaderContCheck): @@ -1669,7 +1669,7 @@ func (d *Downloader) processSnapSyncContent() error { results = append(append([]*fetchResult{oldPivot}, oldTail...), results...) } // Split around the pivot block and process the two sides via snap/full sync - if atomic.LoadInt32(&d.committed) == 0 { + if !d.committed.Load() { latest := results[len(results)-1].Header // If the height is above the pivot block by 2 sets, it means the pivot // become stale in the network and it was garbage collected, move to a @@ -1794,7 +1794,7 @@ func (d *Downloader) commitPivotBlock(result *fetchResult) error { if err := d.blockchain.SnapSyncCommitHead(block.Hash()); err != nil { return err } - atomic.StoreInt32(&d.committed, 1) + d.committed.Store(true) return nil } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index a884c1e950b0..0671c15b7133 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -476,9 +476,9 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) { tester.newPeer("peer", protocol, testChainBase.blocks[1:]) // Wrap the importer to allow stepping - blocked, proceed := uint32(0), make(chan struct{}) + blocked, proceed := atomic.Uint32{}, make(chan struct{}) tester.downloader.chainInsertHook = func(results []*fetchResult) { - atomic.StoreUint32(&blocked, uint32(len(results))) + blocked.Store(uint32(len(results))) <-proceed } // Start a synchronisation concurrently @@ -505,7 +505,7 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) { tester.downloader.queue.resultCache.lock.Lock() { cached = tester.downloader.queue.resultCache.countCompleted() - frozen = int(atomic.LoadUint32(&blocked)) + frozen = int(blocked.Load()) retrieved = int(tester.chain.CurrentSnapBlock().Number.Uint64()) + 1 } tester.downloader.queue.resultCache.lock.Unlock() @@ -528,8 +528,8 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) { t.Fatalf("block count mismatch: have %v, want %v (owned %v, blocked %v, target %v)", cached, blockCacheMaxItems, retrieved, frozen, targetBlocks+1) } // Permit the blocked blocks to import - if atomic.LoadUint32(&blocked) > 0 { - atomic.StoreUint32(&blocked, uint32(0)) + if blocked.Load() > 0 { + blocked.Store(uint32(0)) proceed <- struct{}{} } } @@ -786,12 +786,12 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { tester.newPeer("peer", protocol, chain.blocks[1:]) // Instrument the downloader to signal body requests - bodiesHave, receiptsHave := int32(0), int32(0) + bodiesHave, receiptsHave := atomic.Int32{}, atomic.Int32{} tester.downloader.bodyFetchHook = func(headers []*types.Header) { - atomic.AddInt32(&bodiesHave, int32(len(headers))) + bodiesHave.Add(int32(len(headers))) } tester.downloader.receiptFetchHook = func(headers []*types.Header) { - atomic.AddInt32(&receiptsHave, int32(len(headers))) + receiptsHave.Add(int32(len(headers))) } // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("peer", nil, mode); err != nil { @@ -811,11 +811,11 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { receiptsNeeded++ } } - if int(bodiesHave) != bodiesNeeded { - t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave, bodiesNeeded) + if int(bodiesHave.Load()) != bodiesNeeded { + t.Errorf("body retrieval count mismatch: have %v, want %v", bodiesHave.Load(), bodiesNeeded) } - if int(receiptsHave) != receiptsNeeded { - t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave, receiptsNeeded) + if int(receiptsHave.Load()) != receiptsNeeded { + t.Errorf("receipt retrieval count mismatch: have %v, want %v", receiptsHave.Load(), receiptsNeeded) } } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 5af5068c98cf..e9907297a0b9 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -61,7 +61,7 @@ type fetchRequest struct { // fetchResult is a struct collecting partial results from data fetchers until // all outstanding pieces complete and the result as a whole can be processed. type fetchResult struct { - pending int32 // Flag telling what deliveries are outstanding + pending atomic.Int32 // Flag telling what deliveries are outstanding Header *types.Header Uncles []*types.Header @@ -75,38 +75,38 @@ func newFetchResult(header *types.Header, fastSync bool) *fetchResult { Header: header, } if !header.EmptyBody() { - item.pending |= (1 << bodyType) + item.pending.Store(item.pending.Load() | (1 << bodyType)) } else if header.WithdrawalsHash != nil { item.Withdrawals = make(types.Withdrawals, 0) } if fastSync && !header.EmptyReceipts() { - item.pending |= (1 << receiptType) + item.pending.Store(item.pending.Load() | (1 << receiptType)) } return item } // SetBodyDone flags the body as finished. func (f *fetchResult) SetBodyDone() { - if v := atomic.LoadInt32(&f.pending); (v & (1 << bodyType)) != 0 { - atomic.AddInt32(&f.pending, -1) + if v := f.pending.Load(); (v & (1 << bodyType)) != 0 { + f.pending.Add(-1) } } // AllDone checks if item is done. func (f *fetchResult) AllDone() bool { - return atomic.LoadInt32(&f.pending) == 0 + return f.pending.Load() == 0 } // SetReceiptsDone flags the receipts as finished. func (f *fetchResult) SetReceiptsDone() { - if v := atomic.LoadInt32(&f.pending); (v & (1 << receiptType)) != 0 { - atomic.AddInt32(&f.pending, -2) + if v := f.pending.Load(); (v & (1 << receiptType)) != 0 { + f.pending.Add(-2) } } // Done checks if the given type is done already func (f *fetchResult) Done(kind uint) bool { - v := atomic.LoadInt32(&f.pending) + v := f.pending.Load() return v&(1<= int32(len(r.items)) { break @@ -156,7 +156,7 @@ func (r *resultStore) countCompleted() int { break } } - atomic.StoreInt32(&r.indexIncomplete, index) + r.indexIncomplete.Store(index) return int(index) } @@ -179,7 +179,7 @@ func (r *resultStore) GetCompleted(limit int) []*fetchResult { } // Advance the expected block number of the first cache entry r.resultOffset += uint64(limit) - atomic.AddInt32(&r.indexIncomplete, int32(-limit)) + r.indexIncomplete.Add(int32(-limit)) return results } diff --git a/eth/downloader/skeleton_test.go b/eth/downloader/skeleton_test.go index b19494a7b069..6a76d78ac817 100644 --- a/eth/downloader/skeleton_test.go +++ b/eth/downloader/skeleton_test.go @@ -82,8 +82,8 @@ type skeletonTestPeer struct { serve func(origin uint64) []*types.Header // Hook to allow custom responses - served uint64 // Number of headers served by this peer - dropped uint64 // Flag whether the peer was dropped (stop responding) + served atomic.Uint64 // Number of headers served by this peer + dropped atomic.Uint64 // Flag whether the peer was dropped (stop responding) } // newSkeletonTestPeer creates a new mock peer to test the skeleton sync with. @@ -113,7 +113,7 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski // Since skeleton test peer are in-memory mocks, dropping the does not make // them inaccessible. As such, check a local `dropped` field to see if the // peer has been dropped and should not respond any more. - if atomic.LoadUint64(&p.dropped) != 0 { + if p.dropped.Load() != 0 { return nil, errors.New("peer already dropped") } // Skeleton sync retrieves batches of headers going backward without gaps. @@ -161,7 +161,7 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski } } } - atomic.AddUint64(&p.served, uint64(len(headers))) + p.served.Add(uint64(len(headers))) hashes := make([]common.Hash, len(headers)) for i, header := range headers { @@ -182,7 +182,7 @@ func (p *skeletonTestPeer) RequestHeadersByNumber(origin uint64, amount int, ski sink <- res if err := <-res.Done; err != nil { log.Warn("Skeleton test peer response rejected", "err", err) - atomic.AddUint64(&p.dropped, 1) + p.dropped.Add(1) } }() return req, nil @@ -817,7 +817,7 @@ func TestSkeletonSyncRetrievals(t *testing.T) { dropped := make(map[string]int) drop := func(peer string) { if p := peerset.Peer(peer); p != nil { - atomic.AddUint64(&p.peer.(*skeletonTestPeer).dropped, 1) + p.peer.(*skeletonTestPeer).dropped.Add(1) } peerset.Unregister(peer) dropped[peer]++ @@ -895,14 +895,14 @@ func TestSkeletonSyncRetrievals(t *testing.T) { if !tt.unpredictable { var served uint64 for _, peer := range tt.peers { - served += atomic.LoadUint64(&peer.served) + served += peer.served.Load() } if served != tt.midserve { t.Errorf("test %d, mid state: served headers mismatch: have %d, want %d", i, served, tt.midserve) } var drops uint64 for _, peer := range tt.peers { - drops += atomic.LoadUint64(&peer.dropped) + drops += peer.dropped.Load() } if drops != tt.middrop { t.Errorf("test %d, mid state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop) @@ -950,20 +950,20 @@ func TestSkeletonSyncRetrievals(t *testing.T) { if !tt.unpredictable { served := uint64(0) for _, peer := range tt.peers { - served += atomic.LoadUint64(&peer.served) + served += peer.served.Load() } if tt.newPeer != nil { - served += atomic.LoadUint64(&tt.newPeer.served) + served += tt.newPeer.served.Load() } if served != tt.endserve { t.Errorf("test %d, end state: served headers mismatch: have %d, want %d", i, served, tt.endserve) } drops := uint64(0) for _, peer := range tt.peers { - drops += atomic.LoadUint64(&peer.dropped) + drops += peer.dropped.Load() } if tt.newPeer != nil { - drops += atomic.LoadUint64(&tt.newPeer.dropped) + drops += tt.newPeer.dropped.Load() } if drops != tt.enddrop { t.Errorf("test %d, end state: dropped peers mismatch: have %d, want %d", i, drops, tt.middrop) From 7172748b00bef3f99dfad0b7c4406a12b6b7081b Mon Sep 17 00:00:00 2001 From: s7v7nislands Date: Mon, 3 Apr 2023 15:47:44 +0800 Subject: [PATCH 2/3] Update eth/downloader/downloader_test.go Co-authored-by: Martin Holst Swende --- eth/downloader/downloader_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 0671c15b7133..fb3b6b977b2a 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -476,7 +476,8 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) { tester.newPeer("peer", protocol, testChainBase.blocks[1:]) // Wrap the importer to allow stepping - blocked, proceed := atomic.Uint32{}, make(chan struct{}) + var blocked atomic.Uint32 + proceed := make(chan struct{}) tester.downloader.chainInsertHook = func(results []*fetchResult) { blocked.Store(uint32(len(results))) <-proceed From 059f9d27c4296a5e4e91612d954627a35a98a342 Mon Sep 17 00:00:00 2001 From: s7v7nislands Date: Mon, 3 Apr 2023 15:52:49 +0800 Subject: [PATCH 3/3] Update eth/downloader/downloader_test.go Co-authored-by: Martin Holst Swende --- eth/downloader/downloader_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index fb3b6b977b2a..37f7a7670469 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -787,7 +787,7 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { tester.newPeer("peer", protocol, chain.blocks[1:]) // Instrument the downloader to signal body requests - bodiesHave, receiptsHave := atomic.Int32{}, atomic.Int32{} + var bodiesHave, receiptsHave atomic.Int32 tester.downloader.bodyFetchHook = func(headers []*types.Header) { bodiesHave.Add(int32(len(headers))) }