Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dvovk/sync file level #8931

Merged
merged 9 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion diagnostics/diagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ type DiagnosticClient struct {
node *node.ErigonNode

snapshotDownload map[string]diaglib.DownloadStatistics
fileDownload map[string]diaglib.TorrentFile
}

func NewDiagnosticClient(ctx *cli.Context, metricsMux *http.ServeMux, node *node.ErigonNode) *DiagnosticClient {
return &DiagnosticClient{ctx: ctx, metricsMux: metricsMux, node: node, snapshotDownload: map[string]diaglib.DownloadStatistics{}}
return &DiagnosticClient{ctx: ctx, metricsMux: metricsMux, node: node, snapshotDownload: map[string]diaglib.DownloadStatistics{}, fileDownload: map[string]diaglib.TorrentFile{}}
}

func (d *DiagnosticClient) Setup() {
d.runSnapshotListener()
d.runTorrentListener()
}

func (d *DiagnosticClient) runSnapshotListener() {
Expand Down Expand Up @@ -54,3 +56,27 @@ func (d *DiagnosticClient) runSnapshotListener() {
func (d *DiagnosticClient) SnapshotDownload() map[string]diaglib.DownloadStatistics {
return d.snapshotDownload
}

func (d *DiagnosticClient) runTorrentListener() {
go func() {
ctx, ch, cancel := diaglib.Context[diaglib.TorrentFile](context.Background(), 1)
defer cancel()

rootCtx, _ := common.RootContext()

diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.TorrentFile{}), log.Root())
for {
select {
case <-rootCtx.Done():
cancel()
return
case info := <-ch:
d.fileDownload[info.Name] = info
}
}
}()
}

func (d *DiagnosticClient) TorrentFileDownload() map[string]diaglib.TorrentFile {
return d.fileDownload
}
14 changes: 14 additions & 0 deletions erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ type DownloadStatistics struct {
StagePrefix string `json:"stagePrefix"`
}

type TorrentFile struct {
Name string `json:"name"`
TotalBytes uint64 `json:"totalBytes"`
DownloadedBytes uint64 `json:"downloadedBytes"`
SeedsCount int `json:"seedsCount"`
PeersCount int `json:"peersCount"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then need reneame count field also

WebseedsRate uint64 `json:"webseedsRate"`
SeedsRate uint64 `json:"seedsRate"`
}

func (ti DownloadStatistics) Type() Type {
return TypeOf(ti)
}

func (ti TorrentFile) Type() Type {
return TypeOf(ti)
}
5 changes: 5 additions & 0 deletions erigon-lib/diagnostics/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Type interface {
reflect.Type
Context() context.Context
Err() error
Enabled() bool
}

type diagType struct {
Expand All @@ -48,6 +49,10 @@ func (t diagType) Err() error {
return t.Context().Err()
}

func (t diagType) Enabled() bool {
return t.Err() == nil
}

type Info interface {
Type() Type
}
Expand Down
69 changes: 52 additions & 17 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/datadir"
"github.com/ledgerwatch/erigon-lib/common/dbg"
"github.com/ledgerwatch/erigon-lib/diagnostics"
"github.com/ledgerwatch/erigon-lib/downloader/downloadercfg"
"github.com/ledgerwatch/erigon-lib/downloader/snaptype"
"github.com/ledgerwatch/erigon-lib/kv"
Expand Down Expand Up @@ -117,6 +118,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, dirs datadir.Dirs, logger
if err := d.addTorrentFilesFromDisk(false); err != nil {
return nil, err
}

// CornerCase: no peers -> no anoncments to trackers -> no magnetlink resolution (but magnetlink has filename)
// means we can start adding weebseeds without waiting for `<-t.GotInfo()`
d.wg.Add(1)
Expand Down Expand Up @@ -348,32 +350,65 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
}

d.logger.Log(d.verbosity, "[snapshots] progress", "file", t.Name(), "progress", fmt.Sprintf("%.2f%%", progress), "peers", len(peersOfThisFile), "webseeds", len(weebseedPeersOfThisFile))
if d.verbosity < log.LvlInfo {
break //of switch
}

// more detailed statistic: download rate of each peer (for each file)
webseedRates := make([]interface{}, 0, len(weebseedPeersOfThisFile)*2)
for _, peer := range weebseedPeersOfThisFile {
urlS := strings.Trim(strings.TrimPrefix(peer.String(), "webseed peer for "), "\"")
if urlObj, err := url.Parse(urlS); err == nil {
if shortUrl, err := url.JoinPath(urlObj.Host, urlObj.Path); err == nil {
webseedRates = append(webseedRates, shortUrl, fmt.Sprintf("%s/s", common.ByteCount(uint64(peer.DownloadRate()))))
isDiagEnabled := diagnostics.TypeOf(diagnostics.TorrentFile{}).Enabled()
if d.verbosity < log.LvlInfo || isDiagEnabled {

// more detailed statistic: download rate of each peer (for each file)
websRates := uint64(0)
webseedRates := make([]interface{}, 0, len(weebseedPeersOfThisFile)*2)
for _, peer := range weebseedPeersOfThisFile {
urlS := strings.Trim(strings.TrimPrefix(peer.String(), "webseed peer for "), "\"")
if urlObj, err := url.Parse(urlS); err == nil {
if shortUrl, err := url.JoinPath(urlObj.Host, urlObj.Path); err == nil {
dr := uint64(peer.DownloadRate())
webseedRates = append(webseedRates, shortUrl, fmt.Sprintf("%s/s", common.ByteCount(dr)))
websRates += dr
}

d.logger.Log(d.verbosity, "[snapshots] progress", "name", t.Name(), "progress", fmt.Sprintf("%.2f%%", progress), "webseeds", len(t.Metainfo().UrlList), "peers", len(peersOfThisFile))
}
}

lenght := uint64(len(weebseedPeersOfThisFile))
if lenght > 0 {
websRates = websRates / lenght
}

d.logger.Info(fmt.Sprintf("[snapshots] webseed peers file=%s", t.Name()), webseedRates...)
rates := make([]interface{}, 0, len(peersOfThisFile)*2)
seedsRates := uint64(0)
for _, peer := range peersOfThisFile {
dr := uint64(peer.DownloadRate())
rates = append(rates, peer.PeerClientName.Load(), fmt.Sprintf("%s/s", common.ByteCount(dr)))
seedsRates += dr
}
d.logger.Info(fmt.Sprintf("[snapshots] bittorrent peers file=%s", t.Name()), rates...)

lenght = uint64(len(peersOfThisFile))
if lenght > 0 {
seedsRates = seedsRates / uint64(len(peersOfThisFile))
}

if isDiagEnabled {
diagnostics.Send(diagnostics.TorrentFile{
Name: t.Name(),
TotalBytes: uint64(t.Length()),
DownloadedBytes: uint64(t.BytesCompleted()),
SeedsCount: len(t.Metainfo().UrlList),
PeersCount: len(peersOfThisFile),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is small confusion happened:
Seed and Webseed is not the same thing.

Dictionary:

  • Peer - any node - it may or may not have file X
  • Seed - if you have file X, you become Seed of file X - other peers can request this file from you
  • Webseed it's Http Peer - means we can download file X from Webseed by HTTP protocol (instead of Bittorrent protocol). So, in this func: webseed and peers are "ingress traffic" for us (just different protocol).
  • Leech` - node which doen's have file X, but downloading it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed names

WebseedsRate: websRates,
SeedsRate: seedsRates,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I advise rename “seedsRate” to “peersRate”. Just because “Seed” word is not used anywhere in erigon: logs, p2p peers, etc…

})
}
}
d.logger.Info(fmt.Sprintf("[snapshots] webseed peers file=%s", t.Name()), webseedRates...)
rates := make([]interface{}, 0, len(peersOfThisFile)*2)
for _, peer := range peersOfThisFile {
rates = append(rates, peer.PeerClientName.Load(), fmt.Sprintf("%s/s", common.ByteCount(uint64(peer.DownloadRate()))))
}
d.logger.Info(fmt.Sprintf("[snapshots] bittorrent peers file=%s", t.Name()), rates...)

default:
noMetadata = append(noMetadata, t.Name())
}

stats.Completed = stats.Completed && t.Complete.Bool()
}

if len(noMetadata) > 0 {
amount := len(noMetadata)
if len(noMetadata) > 5 {
Expand Down
Loading