polygon/sync: implement header downloader #9030

Merged Dec 22, 2023 (20 commits)

Changes from all commits

Commits:
ca828b1  polygon/blocksyncer: v1 implementation (taratorio, Dec 19, 2023)
cfd5206  polygon/blocksyncer: v1 implementation (taratorio, Dec 19, 2023)
3681a36  polygon/blocksyncer: v1 implementation (taratorio, Dec 19, 2023)
0ad65bf  polygon/blocksyncer: v1 implementation (taratorio, Dec 19, 2023)
e55959c  Heimdall wrapper (battlmonstr, Dec 18, 2023)
d350ded  Heimdall wrapper TestFetchCheckpoints1 test (battlmonstr, Dec 18, 2023)
e450ae9  Heimdall wrapper FetchCheckpoint v2 (battlmonstr, Dec 19, 2023)
634d296  Heimdall wrapper TestOnMilestoneEvent (battlmonstr, Dec 19, 2023)
8cafd3c  rename polygon_sync to sync (battlmonstr, Dec 19, 2023)
5801257  IHeimdallClient.FetchMilestone by index API (battlmonstr, Dec 19, 2023)
0463fca  Heimdall wrapper FetchMilestones (battlmonstr, Dec 19, 2023)
28b6eb4  rename package aliases without underscores (battlmonstr, Dec 20, 2023)
36c9baf  Merge branch 'pr/astrid_sync' of github.com:ledgerwatch/erigon into a… (taratorio, Dec 20, 2023)
d42b15a  polygon/sync: implement header downloader (taratorio, Dec 20, 2023)
322a6e8  Merge branch 'devel' of github.com:ledgerwatch/erigon into astrid-dow… (taratorio, Dec 20, 2023)
ecc54da  polygon/sync: implement header downloader (taratorio, Dec 20, 2023)
6f54fbf  polygon/sync: implement header downloader (taratorio, Dec 20, 2023)
eea687c  polygon/sync: implement header downloader (taratorio, Dec 21, 2023)
b0a6bb1  Merge branch 'devel' of github.com:ledgerwatch/erigon into astrid-dow… (taratorio, Dec 21, 2023)
09adfd2  polygon/sync: implement header downloader (taratorio, Dec 21, 2023)
8 changes: 8 additions & 0 deletions polygon/sync/db.go
@@ -0,0 +1,8 @@
package sync

import "github.com/ledgerwatch/erigon/core/types"

//go:generate mockgen -destination=./mock/db_mock.go -package=mock . DB
type DB interface {
	WriteHeaders(headers []*types.Header) error
}
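
The //go:generate directive above produces a gomock-based mock of DB in polygon/sync/mock. A minimal sketch of how that generated mock might be used in a test (assuming the golang/mock library and the standard mockgen constructor name NewMockDB; the test itself is illustrative, not part of this PR):

	package sync_test

	import (
		"testing"

		"github.com/golang/mock/gomock"

		"github.com/ledgerwatch/erigon/core/types"
		polygonsync "github.com/ledgerwatch/erigon/polygon/sync"
		"github.com/ledgerwatch/erigon/polygon/sync/mock"
	)

	func TestWriteHeadersWithMock(t *testing.T) {
		ctrl := gomock.NewController(t)
		defer ctrl.Finish()

		// Expect exactly one WriteHeaders call with any header slice, succeeding.
		dbMock := mock.NewMockDB(ctrl)
		dbMock.EXPECT().WriteHeaders(gomock.Any()).Return(nil)

		var db polygonsync.DB = dbMock // the generated mock satisfies DB
		if err := db.WriteHeaders([]*types.Header{{}}); err != nil {
			t.Fatal(err)
		}
	}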
214 changes: 214 additions & 0 deletions polygon/sync/header_downloader.go
@@ -0,0 +1,214 @@
package sync

import (
	"context"
	"fmt"
	"math"
	"sort"
	"sync"
	"time"

	lru "github.com/hashicorp/golang-lru/v2"
	"github.com/ledgerwatch/log/v3"

	"github.com/ledgerwatch/erigon-lib/common"
	"github.com/ledgerwatch/erigon/core/types"
	"github.com/ledgerwatch/erigon/polygon/sync/peerinfo"
)

const headerDownloaderLogPrefix = "HeaderDownloader"

func NewHeaderDownloader(logger log.Logger, sentry Sentry, db DB, heimdall Heimdall, verify HeaderVerifier) *HeaderDownloader {
	statePointHeadersMemo, err := lru.New[common.Hash, []*types.Header](sentry.MaxPeers())
	if err != nil {
		panic(err)
	}

	return &HeaderDownloader{
		logger:                logger,
		sentry:                sentry,
		db:                    db,
		heimdall:              heimdall,
		verify:                verify,
		statePointHeadersMemo: statePointHeadersMemo,
	}
}

type HeaderDownloader struct {
	logger                log.Logger
	sentry                Sentry
	db                    DB
	heimdall              Heimdall
	verify                HeaderVerifier
	statePointHeadersMemo *lru.Cache[common.Hash, []*types.Header] // statePoint.rootHash -> headers that are part of the state point
}

func (hd *HeaderDownloader) DownloadUsingCheckpoints(ctx context.Context, start uint64) error {
	checkpoints, err := hd.heimdall.FetchCheckpoints(ctx, start)
	if err != nil {
		return err
	}

	err = hd.downloadUsingStatePoints(ctx, statePointsFromCheckpoints(checkpoints))
	if err != nil {
		return err
	}

	return nil
}

func (hd *HeaderDownloader) DownloadUsingMilestones(ctx context.Context, start uint64) error {
	milestones, err := hd.heimdall.FetchMilestones(ctx, start)
	if err != nil {
		return err
	}

	err = hd.downloadUsingStatePoints(ctx, statePointsFromMilestones(milestones))
	if err != nil {
		return err
	}

	return nil
}
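
// Note: the statePoint/statePoints types and the statePointsFromCheckpoints /
// statePointsFromMilestones converters are defined elsewhere in this PR. From
// their usage in this file, the shape is roughly as follows; the field types
// here are assumptions for illustration, not the actual definition:
//
//	type statePoint struct {
//		rootHash   common.Hash // root the downloaded headers are verified against
//		startBlock *big.Int    // first block covered by the point (inclusive)
//		endBlock   *big.Int    // last block covered by the point (inclusive)
//		kind       string      // e.g. "checkpoint" or "milestone"
//	}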

func (hd *HeaderDownloader) downloadUsingStatePoints(ctx context.Context, statePoints statePoints) error {
	for len(statePoints) > 0 {
		allPeers := hd.sentry.PeersWithBlockNumInfo()
		if len(allPeers) == 0 {
			hd.logger.Warn(fmt.Sprintf("[%s] zero peers, will try again", headerDownloaderLogPrefix))
			continue
		}

		sort.Sort(allPeers) // sort by block num in asc order
		peers := hd.choosePeers(allPeers, statePoints)
		if len(peers) == 0 {
			hd.logger.Warn(
				fmt.Sprintf("[%s] can't use any peers to sync, will try again", headerDownloaderLogPrefix),
[inline review comment]
taratorio (Member, Author): @battlmonstr if this ever happens, i.e. we have the full 100 max peers but none of them are synced far enough (highly unlikely I suppose, but possible), we could add a sentry function to temporarily disconnect them for some time interval so we can connect to others that may be synced further ahead. What do you think? Would a disconnect be the same as a penalize in this case, or does penalize mean we will never connect to the penalized peer again in the future?

taratorio (Member, Author): work for a future PR regardless
"start", statePoints[0].startBlock,
"end", statePoints[len(statePoints)-1].endBlock,
"minPeerBlockNum", allPeers[0].BlockNum,
"minPeerID", allPeers[0].ID,
)
continue
}

peerCount := len(peers)
statePointsBatch := statePoints[:peerCount]
hd.logger.Info(
fmt.Sprintf("[%s] downloading headers", headerDownloaderLogPrefix),
"start", statePointsBatch[0].startBlock,
"end", statePointsBatch[len(statePointsBatch)-1].endBlock,
"kind", statePointsBatch[0].kind,
"peerCount", peerCount,
)

headerBatches := make([][]*types.Header, len(statePointsBatch))
maxStatePointLength := float64(0)
wg := sync.WaitGroup{}
for i, point := range statePointsBatch {
maxStatePointLength = math.Max(float64(point.length()), maxStatePointLength)
wg.Add(1)
go func(i int, statePoint *statePoint, peerID string) {
defer wg.Done()

if headers, ok := hd.statePointHeadersMemo.Get(statePoint.rootHash); ok {
headerBatches[i] = headers
return
}

headers, err := hd.sentry.DownloadHeaders(ctx, statePoint.startBlock, statePoint.endBlock, peerID)
if err != nil {
hd.logger.Debug(
fmt.Sprintf("[%s] issue downloading headers, will try again", headerDownloaderLogPrefix),
"err", err,
"start", statePoint.startBlock,
"end", statePoint.endBlock,
"rootHash", statePoint.rootHash,
"kind", statePoint.kind,
"peerID", peerID,
)
return
}

if err := hd.verify(statePoint, headers); err != nil {
hd.logger.Debug(
fmt.Sprintf(
"[%s] bad headers received from peer for state point - penalizing and will try again",
headerDownloaderLogPrefix,
),
"start", statePoint.startBlock,
"end", statePoint.endBlock,
"rootHash", statePoint.rootHash,
"kind", statePoint.kind,
"peerID", peerID,
)

hd.sentry.Penalize(peerID)
return
}

hd.statePointHeadersMemo.Add(statePoint.rootHash, headers)
headerBatches[i] = headers
}(i, point, peers[i].ID)
}

wg.Wait()
headers := make([]*types.Header, 0, int(maxStatePointLength)*peerCount)
gapIndex := -1
for i, headerBatch := range headerBatches {
if len(headerBatch) == 0 {
hd.logger.Debug(
fmt.Sprintf("[%s] no headers, will try again", headerDownloaderLogPrefix),
"start", statePointsBatch[i].startBlock,
"end", statePointsBatch[i].endBlock,
"rootHash", statePointsBatch[i].rootHash,
"kind", statePointsBatch[i].kind,
)

gapIndex = i
break
}

headers = append(headers, headerBatch...)
}

if gapIndex >= 0 {
statePoints = statePoints[gapIndex:]
} else {
statePoints = statePoints[len(statePointsBatch):]
}

dbWriteStartTime := time.Now()
if err := hd.db.WriteHeaders(headers); err != nil {
return err
}

hd.logger.Debug(
fmt.Sprintf("[%s] wrote headers to db", headerDownloaderLogPrefix),
"numHeaders", len(headers),
"time", time.Since(dbWriteStartTime),
)
}

return nil
}

// choosePeers assumes peers are sorted in ascending order based on block num
func (hd *HeaderDownloader) choosePeers(peers peerinfo.PeersWithBlockNumInfo, statePoints statePoints) peerinfo.PeersWithBlockNumInfo {
var peersIdx int
chosenPeers := make(peerinfo.PeersWithBlockNumInfo, 0, len(peers))
for _, statePoint := range statePoints {
if peersIdx >= len(peers) {
break
}

peer := peers[peersIdx]
if peer.BlockNum.Cmp(statePoint.endBlock) > -1 {
chosenPeers = append(chosenPeers, peer)
}

peersIdx++
}

return chosenPeers
}
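
For orientation, a rough sketch of how a caller might wire up and drive the downloader (the concrete logger, sentry, db, heimdall, and verifier values and the start block numbers are assumptions; illustrative only, not code from this PR):

	// Illustrative wiring; sentry, db, heimdall and verifier are assumed to be
	// concrete implementations of the interfaces referenced above.
	hd := NewHeaderDownloader(logger, sentry, db, heimdall, verifier)

	// Backfill history covered by Heimdall checkpoints first...
	if err := hd.DownloadUsingCheckpoints(ctx, checkpointStartBlockNum); err != nil {
		return err
	}

	// ...then continue towards the tip using the more frequent milestones.
	if err := hd.DownloadUsingMilestones(ctx, milestoneStartBlockNum); err != nil {
		return err
	}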