Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dvovk/snapshotsstats #8935

Merged
merged 14 commits into from
Dec 8, 2023
41 changes: 27 additions & 14 deletions diagnostics/diagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package diagnostics

import (
"context"
"fmt"
"net/http"

"github.com/ledgerwatch/erigon-lib/common"
Expand All @@ -16,12 +17,11 @@ type DiagnosticClient struct {
metricsMux *http.ServeMux
node *node.ErigonNode

snapshotDownload map[string]diaglib.DownloadStatistics
fileDownload map[string]diaglib.TorrentFile
snapshotDownload diaglib.SnapshotDownloadStatistics
}

func NewDiagnosticClient(ctx *cli.Context, metricsMux *http.ServeMux, node *node.ErigonNode) *DiagnosticClient {
return &DiagnosticClient{ctx: ctx, metricsMux: metricsMux, node: node, snapshotDownload: map[string]diaglib.DownloadStatistics{}, fileDownload: map[string]diaglib.TorrentFile{}}
return &DiagnosticClient{ctx: ctx, metricsMux: metricsMux, node: node, snapshotDownload: diaglib.SnapshotDownloadStatistics{}}
}

func (d *DiagnosticClient) Setup() {
Expand All @@ -31,19 +31,32 @@ func (d *DiagnosticClient) Setup() {

func (d *DiagnosticClient) runSnapshotListener() {
go func() {
ctx, ch, cancel := diaglib.Context[diaglib.DownloadStatistics](context.Background(), 1)
ctx, ch, cancel := diaglib.Context[diaglib.SnapshotDownloadStatistics](context.Background(), 1)
defer cancel()

rootCtx, _ := common.RootContext()

diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.DownloadStatistics{}), log.Root())
diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SnapshotDownloadStatistics{}), log.Root())
for {
select {
case <-rootCtx.Done():
cancel()
return
case info := <-ch:
d.snapshotDownload[info.StagePrefix] = info
d.snapshotDownload.Downloaded = info.Downloaded
d.snapshotDownload.Total = info.Total
d.snapshotDownload.TotalTime = info.TotalTime
d.snapshotDownload.DownloadRate = info.DownloadRate
d.snapshotDownload.UploadRate = info.UploadRate
d.snapshotDownload.Peers = info.Peers
d.snapshotDownload.Files = info.Files
d.snapshotDownload.Connections = info.Connections
d.snapshotDownload.Alloc = info.Alloc
d.snapshotDownload.Sys = info.Sys
d.snapshotDownload.DownloadFinished = info.DownloadFinished

fmt.Println("snapshotDownload", d.snapshotDownload)

if info.DownloadFinished {
return
}
Expand All @@ -53,30 +66,30 @@ func (d *DiagnosticClient) runSnapshotListener() {
}()
}

func (d *DiagnosticClient) SnapshotDownload() map[string]diaglib.DownloadStatistics {
func (d *DiagnosticClient) SnapshotDownload() diaglib.SnapshotDownloadStatistics {
return d.snapshotDownload
}

func (d *DiagnosticClient) runTorrentListener() {
go func() {
ctx, ch, cancel := diaglib.Context[diaglib.TorrentFile](context.Background(), 1)
ctx, ch, cancel := diaglib.Context[diaglib.SegmentDownloadStatistics](context.Background(), 1)
defer cancel()

rootCtx, _ := common.RootContext()

diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.TorrentFile{}), log.Root())
diaglib.StartProviders(ctx, diaglib.TypeOf(diaglib.SegmentDownloadStatistics{}), log.Root())
for {
select {
case <-rootCtx.Done():
cancel()
return
case info := <-ch:
d.fileDownload[info.Name] = info
if d.snapshotDownload.Segments == nil {
d.snapshotDownload.Segments = map[string]diaglib.SegmentDownloadStatistics{}
}

d.snapshotDownload.Segments[info.Name] = info
}
}
}()
}

func (d *DiagnosticClient) TorrentFileDownload() map[string]diaglib.TorrentFile {
return d.fileDownload
}
36 changes: 18 additions & 18 deletions erigon-lib/diagnostics/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,35 +29,35 @@ type PeerStatistics struct {
TypeBytesOut map[string]uint64
}

type DownloadStatistics struct {
Downloaded uint64 `json:"downloaded"`
Total uint64 `json:"total"`
TotalTime float64 `json:"totalTime"`
DownloadRate uint64 `json:"downloadRate"`
UploadRate uint64 `json:"uploadRate"`
Peers int32 `json:"peers"`
Files int32 `json:"files"`
Connections uint64 `json:"connections"`
Alloc uint64 `json:"alloc"`
Sys uint64 `json:"sys"`
DownloadFinished bool `json:"downloadFinished"`
StagePrefix string `json:"stagePrefix"`
type SnapshotDownloadStatistics struct {
Downloaded uint64 `json:"downloaded"`
Total uint64 `json:"total"`
TotalTime float64 `json:"totalTime"`
DownloadRate uint64 `json:"downloadRate"`
UploadRate uint64 `json:"uploadRate"`
Peers int32 `json:"peers"`
Files int32 `json:"files"`
Connections uint64 `json:"connections"`
Alloc uint64 `json:"alloc"`
Sys uint64 `json:"sys"`
DownloadFinished bool `json:"downloadFinished"`
Segments map[string]SegmentDownloadStatistics `json:"segments"`
}

type TorrentFile struct {
type SegmentDownloadStatistics struct {
Name string `json:"name"`
TotalBytes uint64 `json:"totalBytes"`
DownloadedBytes uint64 `json:"downloadedBytes"`
SeedsCount int `json:"seedsCount"`
WebseedsCount int `json:"webseedsCount"`
PeersCount int `json:"peersCount"`
WebseedsRate uint64 `json:"webseedsRate"`
SeedsRate uint64 `json:"seedsRate"`
PeersRate uint64 `json:"peersRate"`
}

func (ti DownloadStatistics) Type() Type {
func (ti SnapshotDownloadStatistics) Type() Type {
return TypeOf(ti)
}

func (ti TorrentFile) Type() Type {
func (ti SegmentDownloadStatistics) Type() Type {
return TypeOf(ti)
}
14 changes: 7 additions & 7 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {
}

d.logger.Log(d.verbosity, "[snapshots] progress", "file", t.Name(), "progress", fmt.Sprintf("%.2f%%", progress), "peers", len(peersOfThisFile), "webseeds", len(weebseedPeersOfThisFile))
isDiagEnabled := diagnostics.TypeOf(diagnostics.TorrentFile{}).Enabled()
isDiagEnabled := diagnostics.TypeOf(diagnostics.SegmentDownloadStatistics{}).Enabled()
if d.verbosity < log.LvlInfo || isDiagEnabled {

// more detailed statistic: download rate of each peer (for each file)
Expand All @@ -376,28 +376,28 @@ func (d *Downloader) ReCalcStats(interval time.Duration) {

d.logger.Info(fmt.Sprintf("[snapshots] webseed peers file=%s", t.Name()), webseedRates...)
rates := make([]interface{}, 0, len(peersOfThisFile)*2)
seedsRates := uint64(0)
peersRates := uint64(0)
for _, peer := range peersOfThisFile {
dr := uint64(peer.DownloadRate())
rates = append(rates, peer.PeerClientName.Load(), fmt.Sprintf("%s/s", common.ByteCount(dr)))
seedsRates += dr
peersRates += dr
}
d.logger.Info(fmt.Sprintf("[snapshots] bittorrent peers file=%s", t.Name()), rates...)

lenght = uint64(len(peersOfThisFile))
if lenght > 0 {
seedsRates = seedsRates / uint64(len(peersOfThisFile))
peersRates = peersRates / uint64(len(peersOfThisFile))
}

if isDiagEnabled {
diagnostics.Send(diagnostics.TorrentFile{
diagnostics.Send(diagnostics.SegmentDownloadStatistics{
Name: t.Name(),
TotalBytes: uint64(t.Length()),
DownloadedBytes: uint64(t.BytesCompleted()),
SeedsCount: len(t.Metainfo().UrlList),
WebseedsCount: len(weebseedPeersOfThisFile),
PeersCount: len(peersOfThisFile),
WebseedsRate: websRates,
SeedsRate: seedsRates,
PeersRate: peersRates,
})
}
}
Expand Down
6 changes: 2 additions & 4 deletions turbo/snapshotsync/snapshotsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ Loop:
}
*/

diagnostics.Send(diagnostics.DownloadStatistics{
diagnostics.Send(diagnostics.SnapshotDownloadStatistics{
Downloaded: stats.BytesCompleted,
Total: stats.BytesTotal,
TotalTime: time.Since(downloadStartTime).Round(time.Second).Seconds(),
Expand All @@ -224,7 +224,6 @@ Loop:
Alloc: m.Alloc,
Sys: m.Sys,
DownloadFinished: stats.Completed,
StagePrefix: logPrefix,
})

log.Info(fmt.Sprintf("[%s] download finished", logPrefix), "time", time.Since(downloadStartTime).String())
Expand All @@ -241,7 +240,7 @@ Loop:
suffix += " (or verifying)"
}

diagnostics.Send(diagnostics.DownloadStatistics{
diagnostics.Send(diagnostics.SnapshotDownloadStatistics{
Downloaded: stats.BytesCompleted,
Total: stats.BytesTotal,
TotalTime: time.Since(downloadStartTime).Round(time.Second).Seconds(),
Expand All @@ -253,7 +252,6 @@ Loop:
Alloc: m.Alloc,
Sys: m.Sys,
DownloadFinished: stats.Completed,
StagePrefix: logPrefix,
})

log.Info(fmt.Sprintf("[%s] %s", logPrefix, suffix),
Expand Down
Loading