-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
polygon/sync: implement header downloader #9030
Merged
Merged
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
ca828b1
polygon/blocksyncer: v1 implementation
taratorio cfd5206
polygon/blocksyncer: v1 implementation
taratorio 3681a36
polygon/blocksyncer: v1 implementation
taratorio 0ad65bf
polygon/blocksyncer: v1 implementation
taratorio e55959c
Heimdall wrapper
battlmonstr d350ded
Heimdall wrapper TestFetchCheckpoints1 test
battlmonstr e450ae9
Heimdall wrapper FetchCheckpoint v2
battlmonstr 634d296
Heimdall wrapper TestOnMilestoneEvent
battlmonstr 8cafd3c
rename polygon_sync to sync
battlmonstr 5801257
IHeimdallClient.FetchMilestone by index API
battlmonstr 0463fca
Heimdall wrapper FetchMilestones
battlmonstr 28b6eb4
rename package aliases without underscores
battlmonstr 36c9baf
Merge branch 'pr/astrid_sync' of github.com:ledgerwatch/erigon into a…
taratorio d42b15a
polygon/sync: implement header downloader
taratorio 322a6e8
Merge branch 'devel' of github.com:ledgerwatch/erigon into astrid-dow…
taratorio ecc54da
polygon/sync: implement header downloader
taratorio 6f54fbf
polygon/sync: implement header downloader
taratorio eea687c
polygon/sync: implement header downloader
taratorio b0a6bb1
Merge branch 'devel' of github.com:ledgerwatch/erigon into astrid-dow…
taratorio 09adfd2
polygon/sync: implement header downloader
taratorio File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package sync | ||
|
||
import "github.com/ledgerwatch/erigon/core/types" | ||
|
||
//go:generate mockgen -destination=./mock/db_mock.go -package=mock . DB | ||
type DB interface { | ||
WriteHeaders(headers []*types.Header) error | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,214 @@ | ||
package sync | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math" | ||
"sort" | ||
"sync" | ||
"time" | ||
|
||
lru "github.com/hashicorp/golang-lru/v2" | ||
"github.com/ledgerwatch/log/v3" | ||
|
||
"github.com/ledgerwatch/erigon-lib/common" | ||
"github.com/ledgerwatch/erigon/core/types" | ||
"github.com/ledgerwatch/erigon/polygon/sync/peerinfo" | ||
) | ||
|
||
const headerDownloaderLogPrefix = "HeaderDownloader" | ||
|
||
func NewHeaderDownloader(logger log.Logger, sentry Sentry, db DB, heimdall Heimdall, verify HeaderVerifier) *HeaderDownloader { | ||
statePointHeadersMemo, err := lru.New[common.Hash, []*types.Header](sentry.MaxPeers()) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
return &HeaderDownloader{ | ||
logger: logger, | ||
sentry: sentry, | ||
db: db, | ||
heimdall: heimdall, | ||
verify: verify, | ||
statePointHeadersMemo: statePointHeadersMemo, | ||
} | ||
} | ||
|
||
type HeaderDownloader struct { | ||
logger log.Logger | ||
sentry Sentry | ||
db DB | ||
heimdall Heimdall | ||
verify HeaderVerifier | ||
statePointHeadersMemo *lru.Cache[common.Hash, []*types.Header] // statePoint.rootHash->[headers part of state point] | ||
} | ||
|
||
func (hd *HeaderDownloader) DownloadUsingCheckpoints(ctx context.Context, start uint64) error { | ||
checkpoints, err := hd.heimdall.FetchCheckpoints(ctx, start) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = hd.downloadUsingStatePoints(ctx, statePointsFromCheckpoints(checkpoints)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (hd *HeaderDownloader) DownloadUsingMilestones(ctx context.Context, start uint64) error { | ||
milestones, err := hd.heimdall.FetchMilestones(ctx, start) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = hd.downloadUsingStatePoints(ctx, statePointsFromMilestones(milestones)) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (hd *HeaderDownloader) downloadUsingStatePoints(ctx context.Context, statePoints statePoints) error { | ||
for len(statePoints) > 0 { | ||
allPeers := hd.sentry.PeersWithBlockNumInfo() | ||
if len(allPeers) == 0 { | ||
hd.logger.Warn(fmt.Sprintf("[%s] zero peers, will try again", headerDownloaderLogPrefix)) | ||
continue | ||
} | ||
|
||
sort.Sort(allPeers) // sort by block num in asc order | ||
peers := hd.choosePeers(allPeers, statePoints) | ||
if len(peers) == 0 { | ||
hd.logger.Warn( | ||
fmt.Sprintf("[%s] can't use any peers to sync, will try again", headerDownloaderLogPrefix), | ||
"start", statePoints[0].startBlock, | ||
"end", statePoints[len(statePoints)-1].endBlock, | ||
"minPeerBlockNum", allPeers[0].BlockNum, | ||
"minPeerID", allPeers[0].ID, | ||
) | ||
continue | ||
} | ||
|
||
peerCount := len(peers) | ||
statePointsBatch := statePoints[:peerCount] | ||
hd.logger.Info( | ||
fmt.Sprintf("[%s] downloading headers", headerDownloaderLogPrefix), | ||
"start", statePointsBatch[0].startBlock, | ||
"end", statePointsBatch[len(statePointsBatch)-1].endBlock, | ||
"kind", statePointsBatch[0].kind, | ||
"peerCount", peerCount, | ||
) | ||
|
||
headerBatches := make([][]*types.Header, len(statePointsBatch)) | ||
maxStatePointLength := float64(0) | ||
wg := sync.WaitGroup{} | ||
for i, point := range statePointsBatch { | ||
maxStatePointLength = math.Max(float64(point.length()), maxStatePointLength) | ||
wg.Add(1) | ||
go func(i int, statePoint *statePoint, peerID string) { | ||
defer wg.Done() | ||
|
||
if headers, ok := hd.statePointHeadersMemo.Get(statePoint.rootHash); ok { | ||
headerBatches[i] = headers | ||
return | ||
} | ||
|
||
headers, err := hd.sentry.DownloadHeaders(ctx, statePoint.startBlock, statePoint.endBlock, peerID) | ||
if err != nil { | ||
hd.logger.Debug( | ||
fmt.Sprintf("[%s] issue downloading headers, will try again", headerDownloaderLogPrefix), | ||
"err", err, | ||
"start", statePoint.startBlock, | ||
"end", statePoint.endBlock, | ||
"rootHash", statePoint.rootHash, | ||
"kind", statePoint.kind, | ||
"peerID", peerID, | ||
) | ||
return | ||
} | ||
|
||
if err := hd.verify(statePoint, headers); err != nil { | ||
hd.logger.Debug( | ||
fmt.Sprintf( | ||
"[%s] bad headers received from peer for state point - penalizing and will try again", | ||
headerDownloaderLogPrefix, | ||
), | ||
"start", statePoint.startBlock, | ||
"end", statePoint.endBlock, | ||
"rootHash", statePoint.rootHash, | ||
"kind", statePoint.kind, | ||
"peerID", peerID, | ||
) | ||
|
||
hd.sentry.Penalize(peerID) | ||
return | ||
} | ||
|
||
hd.statePointHeadersMemo.Add(statePoint.rootHash, headers) | ||
headerBatches[i] = headers | ||
}(i, point, peers[i].ID) | ||
} | ||
|
||
wg.Wait() | ||
headers := make([]*types.Header, 0, int(maxStatePointLength)*peerCount) | ||
gapIndex := -1 | ||
for i, headerBatch := range headerBatches { | ||
if len(headerBatch) == 0 { | ||
hd.logger.Debug( | ||
fmt.Sprintf("[%s] no headers, will try again", headerDownloaderLogPrefix), | ||
"start", statePointsBatch[i].startBlock, | ||
"end", statePointsBatch[i].endBlock, | ||
"rootHash", statePointsBatch[i].rootHash, | ||
"kind", statePointsBatch[i].kind, | ||
) | ||
|
||
gapIndex = i | ||
break | ||
} | ||
|
||
headers = append(headers, headerBatch...) | ||
} | ||
|
||
if gapIndex >= 0 { | ||
statePoints = statePoints[gapIndex:] | ||
} else { | ||
statePoints = statePoints[len(statePointsBatch):] | ||
} | ||
|
||
dbWriteStartTime := time.Now() | ||
if err := hd.db.WriteHeaders(headers); err != nil { | ||
return err | ||
} | ||
|
||
hd.logger.Debug( | ||
fmt.Sprintf("[%s] wrote headers to db", headerDownloaderLogPrefix), | ||
"numHeaders", len(headers), | ||
"time", time.Since(dbWriteStartTime), | ||
) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// choosePeers assumes peers are sorted in ascending order based on block num | ||
func (hd *HeaderDownloader) choosePeers(peers peerinfo.PeersWithBlockNumInfo, statePoints statePoints) peerinfo.PeersWithBlockNumInfo { | ||
var peersIdx int | ||
chosenPeers := make(peerinfo.PeersWithBlockNumInfo, 0, len(peers)) | ||
for _, statePoint := range statePoints { | ||
if peersIdx >= len(peers) { | ||
break | ||
} | ||
|
||
peer := peers[peersIdx] | ||
if peer.BlockNum.Cmp(statePoint.endBlock) > -1 { | ||
chosenPeers = append(chosenPeers, peer) | ||
} | ||
|
||
peersIdx++ | ||
} | ||
|
||
return chosenPeers | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@battlmonstr if this ever happens, ie we have all 100 max peers but all are not synced enough (highly unlikely I suppose but possible) we can add a sentry function to temporarily disconnect them for some time interval so we can connect to others that may be synced further ahead - what do you think? would a disconnect be the same as penalize in this case or penalized means we will never connect to the penalized peer again in the future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
work for a future PR regardless