Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

Commit

Permalink
Add BorEvents snapshot type (#1051)
Browse files Browse the repository at this point in the history
Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro-2.local>
  • Loading branch information
2 people authored and AskAlexSharov committed Sep 6, 2023
1 parent 69530a4 commit 766d520
Show file tree
Hide file tree
Showing 10 changed files with 392 additions and 128 deletions.
4 changes: 4 additions & 0 deletions direct/eth_backend_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,7 @@ func (s *EthBackendClientDirect) Peers(ctx context.Context, in *emptypb.Empty, o
func (s *EthBackendClientDirect) PendingBlock(ctx context.Context, in *emptypb.Empty, opts ...grpc.CallOption) (*remote.PendingBlockReply, error) {
return s.server.PendingBlock(ctx, in)
}

func (s *EthBackendClientDirect) BorEvent(ctx context.Context, in *remote.BorEventRequest, opts ...grpc.CallOption) (*remote.BorEventReply, error) {
return s.server.BorEvent(ctx, in)
}
53 changes: 45 additions & 8 deletions downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/mdbx"
"github.com/ledgerwatch/log/v3"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
)
Expand Down Expand Up @@ -65,7 +66,8 @@ type AggStats struct {
Completed bool
Progress float32

BytesCompleted, BytesTotal uint64
BytesCompleted, BytesTotal uint64
DroppedCompleted, DroppedTotal uint64

BytesDownload, BytesUpload uint64
UploadRate, DownloadRate uint64
Expand Down Expand Up @@ -132,24 +134,64 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) {
var sem = semaphore.NewWeighted(int64(d.cfg.DownloadSlots))

go func() {
// Torrents that are already taken care of
torrentMap := map[metainfo.Hash]struct{}{}
// First loop drops torrents that were downloaded or are already complete
// This improves efficiency of download by reducing number of active torrent (empirical observation)
for torrents := d.Torrent().Torrents(); len(torrents) > 0; torrents = d.Torrent().Torrents() {
for _, t := range torrents {
if _, already := torrentMap[t.InfoHash()]; already {
continue
}
<-t.GotInfo()
if t.Complete.Bool() {
atomic.AddUint64(&d.stats.DroppedCompleted, uint64(t.BytesCompleted()))
atomic.AddUint64(&d.stats.DroppedTotal, uint64(t.Length()))
t.Drop()
torrentMap[t.InfoHash()] = struct{}{}
continue
}
if err := sem.Acquire(ctx, 1); err != nil {
return
}
t.AllowDataDownload()
t.DownloadAll()
torrentMap[t.InfoHash()] = struct{}{}
go func(t *torrent.Torrent) {
defer sem.Release(1)
<-t.Complete.On()
atomic.AddUint64(&d.stats.DroppedCompleted, uint64(t.BytesCompleted()))
atomic.AddUint64(&d.stats.DroppedTotal, uint64(t.Length()))
t.Drop()
}(t)
}
}
atomic.StoreUint64(&d.stats.DroppedCompleted, 0)
atomic.StoreUint64(&d.stats.DroppedTotal, 0)
d.addSegments()
maps.Clear(torrentMap)
for {
torrents := d.Torrent().Torrents()
for _, t := range torrents {
if _, already := torrentMap[t.InfoHash()]; already {
continue
}
<-t.GotInfo()
if t.Complete.Bool() {
torrentMap[t.InfoHash()] = struct{}{}
continue
}
if err := sem.Acquire(ctx, 1); err != nil {
return
}
t.AllowDataDownload()
t.DownloadAll()
torrentMap[t.InfoHash()] = struct{}{}
go func(t *torrent.Torrent) {
defer sem.Release(1)
//r := t.NewReader()
//r.SetReadahead(t.Length())
//_, _ = io.Copy(io.Discard, r) // enable streaming - it will prioritize sequential download

<-t.Complete.On()
}(t)
}
Expand Down Expand Up @@ -179,11 +221,6 @@ func (d *Downloader) mainLoop(ctx context.Context, silent bool) {

stats := d.Stats()

if stats.MetadataReady < stats.FilesTotal {
log.Info(fmt.Sprintf("[snapshots] Waiting for torrents metadata: %d/%d", stats.MetadataReady, stats.FilesTotal))
continue
}

if stats.Completed {
if justCompleted {
justCompleted = false
Expand Down Expand Up @@ -237,7 +274,7 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
stats.BytesDownload = uint64(connStats.BytesReadUsefulIntendedData.Int64())
stats.BytesUpload = uint64(connStats.BytesWrittenData.Int64())

stats.BytesTotal, stats.BytesCompleted, stats.ConnectionsTotal, stats.MetadataReady = 0, 0, 0, 0
stats.BytesTotal, stats.BytesCompleted, stats.ConnectionsTotal, stats.MetadataReady = atomic.LoadUint64(&stats.DroppedTotal), atomic.LoadUint64(&stats.DroppedCompleted), 0, 0
for _, t := range torrents {
select {
case <-t.GotInfo():
Expand Down
14 changes: 14 additions & 0 deletions downloader/snaptype/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ const (
Headers Type = iota
Bodies
Transactions
BorEvents
BorSpans
NumberOfTypes
)

Expand All @@ -45,6 +47,10 @@ func (ft Type) String() string {
return "bodies"
case Transactions:
return "transactions"
case BorEvents:
return "borevents"
case BorSpans:
return "borspans"
default:
panic(fmt.Sprintf("unknown file type: %d", ft))
}
Expand All @@ -58,6 +64,10 @@ func ParseFileType(s string) (Type, bool) {
return Bodies, true
case "transactions":
return Transactions, true
case "borevents":
return BorEvents, true
case "borspans":
return BorSpans, true
default:
return NumberOfTypes, false
}
Expand Down Expand Up @@ -141,6 +151,10 @@ func ParseFileName(dir, fileName string) (res FileInfo, err error) {
snapshotType = Bodies
case Transactions:
snapshotType = Transactions
case BorEvents:
snapshotType = BorEvents
case BorSpans:
snapshotType = BorSpans
default:
return res, fmt.Errorf("unexpected snapshot suffix: %s,%w", parts[2], ErrInvalidFileName)
}
Expand Down
7 changes: 5 additions & 2 deletions downloader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ func buildTorrentIfNeed(fName, root string) (err error) {
if dir2.FileExist(fPath + ".torrent") {
return
}
if !dir2.FileExist(fPath) {
return
}
info := &metainfo.Info{PieceLength: downloadercfg.DefaultPieceSize, Name: fName}
if err := info.BuildFromFilePath(fPath); err != nil {
return fmt.Errorf("createTorrentFileFromSegment: %w", err)
Expand Down Expand Up @@ -232,7 +235,7 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) ([]string, err
workers := cmp.Max(1, runtime.GOMAXPROCS(-1)-1) * 2
var sem = semaphore.NewWeighted(int64(workers))
i := atomic.Int32{}
for _, f := range files {
for _, file := range files {
wg.Add(1)
if err := sem.Acquire(ctx, 1); err != nil {
return nil, err
Expand All @@ -252,7 +255,7 @@ func BuildTorrentFilesIfNeed(ctx context.Context, snapDir string) ([]string, err
case <-logEvery.C:
log.Info("[snapshots] Creating .torrent files", "Progress", fmt.Sprintf("%d/%d", i.Load(), len(files)))
}
}(f)
}(file)
}
go func() {
wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/ledgerwatch/erigon-lib
go 1.19

require (
github.com/ledgerwatch/interfaces v0.0.0-20230811182153-2fcb75060567
github.com/ledgerwatch/interfaces v0.0.0-20230818152001-a8f70b6e9ac6
github.com/ledgerwatch/log/v3 v3.8.0
github.com/ledgerwatch/secp256k1 v1.0.0
github.com/ledgerwatch/trackerslist v1.1.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/ledgerwatch/interfaces v0.0.0-20230811182153-2fcb75060567 h1:ZZGeye8uJaIYvOmI2TbAdV5Oo9j8+SA4dXlK6y3GJsY=
github.com/ledgerwatch/interfaces v0.0.0-20230811182153-2fcb75060567/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/interfaces v0.0.0-20230818152001-a8f70b6e9ac6 h1:kvmYo8Q0ovpRjk/HhRGaQmQCVGDumLu/+ECt2TW0yKI=
github.com/ledgerwatch/interfaces v0.0.0-20230818152001-a8f70b6e9ac6/go.mod h1:ugQv1QllJzBny3cKZKxUrSnykkjkBgm27eQM6dnGAcc=
github.com/ledgerwatch/log/v3 v3.8.0 h1:gCpp7uGtIerEz1jKVPeDnbIopFPud9ZnCpBLlLBGqPU=
github.com/ledgerwatch/log/v3 v3.8.0/go.mod h1:J2Jl6zV/58LeA6LTaVVnCGyf1/cYYSEOOLHY4ZN8S2A=
github.com/ledgerwatch/secp256k1 v1.0.0 h1:Usvz87YoTG0uePIV8woOof5cQnLXGYa162rFf3YnwaQ=
Expand Down
Loading

0 comments on commit 766d520

Please sign in to comment.