From f9504dd05a0646f43e35d1f846b68e4972bec501 Mon Sep 17 00:00:00 2001 From: dvovk Date: Tue, 2 Jul 2024 16:47:31 +0100 Subject: [PATCH 01/10] wp --- erigon-lib/diagnostics/client.go | 44 +++++++++++++++++++++++++++++ erigon-lib/diagnostics/snapshots.go | 19 ------------- erigon-lib/diagnostics/stages.go | 14 --------- erigon-lib/diagnostics/sys_info.go | 27 ++++++++++++------ 4 files changed, 63 insertions(+), 41 deletions(-) diff --git a/erigon-lib/diagnostics/client.go b/erigon-lib/diagnostics/client.go index ecadc345126..e10b1098b8a 100644 --- a/erigon-lib/diagnostics/client.go +++ b/erigon-lib/diagnostics/client.go @@ -5,6 +5,7 @@ import ( "net/http" "path/filepath" "sync" + "time" "github.com/c2h5oh/datasize" "golang.org/x/sync/semaphore" @@ -101,8 +102,51 @@ func (d *DiagnosticClient) Setup() { d.setupBodiesDiagnostics(rootCtx) d.setupResourcesUsageDiagnostics(rootCtx) d.setupSpeedtestDiagnostics(rootCtx) + d.runSaveProcess(rootCtx) //d.logDiagMsgs() + +} + +// Save diagnostic data by time interval to reduce save events +func (d *DiagnosticClient) runSaveProcess(rootCtx context.Context) { + ticker := time.NewTicker(5 * time.Minute) + go func() { + for { + select { + case <-ticker.C: + d.SaveData() + case <-rootCtx.Done(): + ticker.Stop() + return + } + } + }() +} + +func (d *DiagnosticClient) SaveData() { + err := d.db.Update(d.ctx, func(tx kv.RwTx) error { + err := SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)(tx) + if err != nil { + return err + } + + err = StagesListUpdater(d.syncStages)(tx) + if err != nil { + return err + } + + err = SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)(tx) + if err != nil { + return err + } + + return nil + }) + + if err != nil { + log.Warn("Failed to save diagnostics data", "err", err) + } } /*func (d *DiagnosticClient) logDiagMsgs() { diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index f042510c6d4..00498a08a81 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ b/erigon-lib/diagnostics/snapshots.go @@ -61,12 +61,6 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) { Progress: downloadedPercent, }, "Downloading snapshots") - if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil { - log.Error("[Diagnostics] Failed to update snapshot download info", "err", err) - } - - d.saveSyncStagesToDB() - d.mu.Unlock() if d.snapshotStageFinished() { @@ -133,10 +127,6 @@ func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info } - if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil { - log.Error("[Diagnostics] Failed to update snapshot download info", "err", err) - } - d.mu.Unlock() } } @@ -155,9 +145,6 @@ func (d *DiagnosticClient) runSegmentIndexingListener(rootCtx context.Context) { return case info := <-ch: d.addOrUpdateSegmentIndexingState(info) - if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil { - log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err) - } } } }() @@ -192,10 +179,6 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener(rootCtx context.Co }) } - if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil { - log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err) - } - d.mu.Unlock() } } @@ -238,8 +221,6 @@ func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingS TimeLeft: "unknown", Progress: fmt.Sprintf("%d%%", totalProgress/len(d.syncStats.SnapshotIndexing.Segments)), }, "Indexing snapshots") - - d.saveSyncStagesToDB() } func (d *DiagnosticClient) runSnapshotFilesListListener(rootCtx context.Context) { diff --git a/erigon-lib/diagnostics/stages.go b/erigon-lib/diagnostics/stages.go index bc7670609e2..61682ac6f4f 100644 --- a/erigon-lib/diagnostics/stages.go +++ b/erigon-lib/diagnostics/stages.go @@ -103,8 +103,6 @@ func (d *DiagnosticClient) runSyncStagesListListener(rootCtx context.Context) { d.mu.Lock() d.SetStagesList(info.StagesList) d.mu.Unlock() - - d.saveSyncStagesToDB() } } }() @@ -124,8 +122,6 @@ func (d *DiagnosticClient) runCurrentSyncStageListener(rootCtx context.Context) d.mu.Lock() d.SetCurrentSyncStage(info) d.mu.Unlock() - - d.saveSyncStagesToDB() } } }() @@ -145,8 +141,6 @@ func (d *DiagnosticClient) runCurrentSyncSubStageListener(rootCtx context.Contex d.mu.Lock() d.SetCurrentSyncSubStage(info) d.mu.Unlock() - - d.saveSyncStagesToDB() } } }() @@ -166,19 +160,11 @@ func (d *DiagnosticClient) runSubStageListener(rootCtx context.Context) { d.mu.Lock() d.SetSubStagesList(info.Stage, info.List) d.mu.Unlock() - - d.saveSyncStagesToDB() } } }() } -func (d *DiagnosticClient) saveSyncStagesToDB() { - if err := d.db.Update(d.ctx, StagesListUpdater(d.syncStages)); err != nil { - log.Error("[Diagnostics] Failed to update stages list", "err", err) - } -} - func (d *DiagnosticClient) getCurrentSyncIdxs() CurrentSyncStagesIdxs { currentIdxs := CurrentSyncStagesIdxs{ Stage: -1, diff --git a/erigon-lib/diagnostics/sys_info.go b/erigon-lib/diagnostics/sys_info.go index b870e649324..eb3fdd52918 100644 --- a/erigon-lib/diagnostics/sys_info.go +++ b/erigon-lib/diagnostics/sys_info.go @@ -20,16 +20,27 @@ var ( func (d *DiagnosticClient) setupSysInfoDiagnostics() { sysInfo := GetSysInfo(d.dataDirPath) - if err := d.db.Update(d.ctx, RAMInfoUpdater(sysInfo.RAM)); err != nil { - log.Error("[Diagnostics] Failed to update RAM info", "err", err) - } + err := d.db.Update(d.ctx, func(tx kv.RwTx) error { + err := RAMInfoUpdater(d.hardwareInfo.RAM)(tx) + if err != nil { + return err + } - if err := d.db.Update(d.ctx, CPUInfoUpdater(sysInfo.CPU)); err != nil { - log.Error("[Diagnostics] Failed to update CPU info", "err", err) - } + err = CPUInfoUpdater(d.hardwareInfo.CPU)(tx) + if err != nil { + return err + } - if err := d.db.Update(d.ctx, DiskInfoUpdater(sysInfo.Disk)); err != nil { - log.Error("[Diagnostics] Failed to update Disk info", "err", err) + err = DiskInfoUpdater(d.hardwareInfo.Disk)(tx) + if err != nil { + return err + } + + return nil + }) + + if err != nil { + log.Warn("[Diagnostics] Failed to update system info", "err", err) } d.mu.Lock() From a4ac28521750bd2f0ed61b199e82e7caa83afcc5 Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 4 Jul 2024 10:29:16 +0100 Subject: [PATCH 02/10] added save on node stop --- erigon-lib/diagnostics/client.go | 16 ++++++++++++++++ erigon-lib/diagnostics/snapshots.go | 2 +- erigon-lib/diagnostics/stages.go | 8 ++++++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/erigon-lib/diagnostics/client.go b/erigon-lib/diagnostics/client.go index e10b1098b8a..0409f82fb02 100644 --- a/erigon-lib/diagnostics/client.go +++ b/erigon-lib/diagnostics/client.go @@ -3,8 +3,11 @@ package diagnostics import ( "context" "net/http" + "os" + "os/signal" "path/filepath" "sync" + "syscall" "time" "github.com/c2h5oh/datasize" @@ -103,11 +106,24 @@ func (d *DiagnosticClient) Setup() { d.setupResourcesUsageDiagnostics(rootCtx) d.setupSpeedtestDiagnostics(rootCtx) d.runSaveProcess(rootCtx) + d.runStopNodeListener(rootCtx) //d.logDiagMsgs() } +func (d *DiagnosticClient) runStopNodeListener(rootCtx context.Context) { + go func() { + ch := make(chan os.Signal, 1) + signal.Notify(ch, os.Interrupt, syscall.SIGTERM) + select { + case <-ch: + d.SaveData() + case <-rootCtx.Done(): + } + }() +} + // Save diagnostic data by time interval to reduce save events func (d *DiagnosticClient) runSaveProcess(rootCtx context.Context) { ticker := time.NewTicker(5 * time.Minute) diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index 00498a08a81..53e1a03a65a 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ b/erigon-lib/diagnostics/snapshots.go @@ -84,7 +84,7 @@ func getPercentDownloaded(downloaded, total uint64) string { func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subStageInfo string) { idxs := d.getCurrentSyncIdxs() if idxs.Stage == -1 || idxs.SubStage == -1 { - log.Warn("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo) + log.Debug("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo) return } diff --git a/erigon-lib/diagnostics/stages.go b/erigon-lib/diagnostics/stages.go index 61682ac6f4f..12ebb3249c8 100644 --- a/erigon-lib/diagnostics/stages.go +++ b/erigon-lib/diagnostics/stages.go @@ -82,6 +82,14 @@ func (ti CurrentSyncSubStage) Type() Type { return TypeOf(ti) } +type StopNodeEvent struct { + Stop bool +} + +func (ti StopNodeEvent) Type() Type { + return TypeOf(ti) +} + func (d *DiagnosticClient) setupStagesDiagnostics(rootCtx context.Context) { d.runSyncStagesListListener(rootCtx) d.runCurrentSyncStageListener(rootCtx) From 6ca50f75c6ee567688d4d47c86e5661350c8d732 Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 4 Jul 2024 13:50:52 +0100 Subject: [PATCH 03/10] fix --- erigon-lib/diagnostics/stages.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/erigon-lib/diagnostics/stages.go b/erigon-lib/diagnostics/stages.go index 12ebb3249c8..61682ac6f4f 100644 --- a/erigon-lib/diagnostics/stages.go +++ b/erigon-lib/diagnostics/stages.go @@ -82,14 +82,6 @@ func (ti CurrentSyncSubStage) Type() Type { return TypeOf(ti) } -type StopNodeEvent struct { - Stop bool -} - -func (ti StopNodeEvent) Type() Type { - return TypeOf(ti) -} - func (d *DiagnosticClient) setupStagesDiagnostics(rootCtx context.Context) { d.runSyncStagesListListener(rootCtx) d.runCurrentSyncStageListener(rootCtx) From 310f8c53ac2806dfb4cc59e6058ac16a5424ead6 Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 4 Jul 2024 13:56:36 +0100 Subject: [PATCH 04/10] fix --- erigon-lib/diagnostics/stages.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/erigon-lib/diagnostics/stages.go b/erigon-lib/diagnostics/stages.go index 61682ac6f4f..47bae6d807c 100644 --- a/erigon-lib/diagnostics/stages.go +++ b/erigon-lib/diagnostics/stages.go @@ -100,9 +100,7 @@ func (d *DiagnosticClient) runSyncStagesListListener(rootCtx context.Context) { case <-rootCtx.Done(): return case info := <-ch: - d.mu.Lock() d.SetStagesList(info.StagesList) - d.mu.Unlock() } } }() @@ -119,9 +117,7 @@ func (d *DiagnosticClient) runCurrentSyncStageListener(rootCtx context.Context) case <-rootCtx.Done(): return case info := <-ch: - d.mu.Lock() d.SetCurrentSyncStage(info) - d.mu.Unlock() } } }() @@ -138,9 +134,7 @@ func (d *DiagnosticClient) runCurrentSyncSubStageListener(rootCtx context.Contex case <-rootCtx.Done(): return case info := <-ch: - d.mu.Lock() d.SetCurrentSyncSubStage(info) - d.mu.Unlock() } } }() @@ -157,9 +151,7 @@ func (d *DiagnosticClient) runSubStageListener(rootCtx context.Context) { case <-rootCtx.Done(): return case info := <-ch: - d.mu.Lock() d.SetSubStagesList(info.Stage, info.List) - d.mu.Unlock() } } }() @@ -188,12 +180,17 @@ func (d *DiagnosticClient) getCurrentSyncIdxs() CurrentSyncStagesIdxs { } func (d *DiagnosticClient) SetStagesList(stages []SyncStage) { + d.mu.Lock() + defer d.mu.Unlock() + if len(d.syncStages) != len(stages) { d.syncStages = stages } } func (d *DiagnosticClient) SetSubStagesList(stageId string, subStages []SyncSubStage) { + d.mu.Lock() + defer d.mu.Unlock() for idx, stage := range d.syncStages { if stage.ID == stageId { if len(d.syncStages[idx].SubStages) != len(subStages) { @@ -205,6 +202,8 @@ func (d *DiagnosticClient) SetSubStagesList(stageId string, subStages []SyncSubS } func (d *DiagnosticClient) SetCurrentSyncStage(css CurrentSyncStage) { + d.mu.Lock() + defer d.mu.Unlock() isSet := false for idx, stage := range d.syncStages { if !isSet { @@ -232,6 +231,9 @@ func (d *DiagnosticClient) setSubStagesState(stadeIdx int, state StageState) { } func (d *DiagnosticClient) SetCurrentSyncSubStage(css CurrentSyncSubStage) { + d.mu.Lock() + defer d.mu.Unlock() + for idx, stage := range d.syncStages { if stage.State == Running { for subIdx, subStage := range stage.SubStages { From a39462f1a74520d88d6d25625c026ec57b78e1d2 Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 4 Jul 2024 14:33:30 +0100 Subject: [PATCH 05/10] fix --- erigon-lib/diagnostics/client.go | 23 ++++++++++------------- erigon-lib/diagnostics/snapshots.go | 13 ++++++++++--- erigon-lib/diagnostics/sys_info.go | 22 ++++++++++------------ 3 files changed, 30 insertions(+), 28 deletions(-) diff --git a/erigon-lib/diagnostics/client.go b/erigon-lib/diagnostics/client.go index 0409f82fb02..15cf613d51a 100644 --- a/erigon-lib/diagnostics/client.go +++ b/erigon-lib/diagnostics/client.go @@ -141,20 +141,17 @@ func (d *DiagnosticClient) runSaveProcess(rootCtx context.Context) { } func (d *DiagnosticClient) SaveData() { - err := d.db.Update(d.ctx, func(tx kv.RwTx) error { - err := SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)(tx) - if err != nil { - return err - } - - err = StagesListUpdater(d.syncStages)(tx) - if err != nil { - return err - } + var funcs []func(tx kv.RwTx) error + funcs = append(funcs, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)) + funcs = append(funcs, StagesListUpdater(d.syncStages)) + funcs = append(funcs, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)) - err = SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)(tx) - if err != nil { - return err + err := d.db.Update(d.ctx, func(tx kv.RwTx) error { + for _, updater := range funcs { + updErr := updater(tx) + if updErr != nil { + return updErr + } } return nil diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index 53e1a03a65a..ffd1c031f03 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ b/erigon-lib/diagnostics/snapshots.go @@ -64,6 +64,7 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) { d.mu.Unlock() if d.snapshotStageFinished() { + d.SaveData() return } } @@ -126,7 +127,6 @@ func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context } else { d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info } - d.mu.Unlock() } } @@ -212,15 +212,22 @@ func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingS d.syncStats.SnapshotIndexing.TimeElapsed = upd.TimeElapsed totalProgress := 0 + totalProgressPercent := 0 for _, seg := range d.syncStats.SnapshotIndexing.Segments { - totalProgress += seg.Percent + totalProgressPercent += seg.Percent } + totalProgress = totalProgressPercent / len(d.syncStats.SnapshotIndexing.Segments) + d.updateSnapshotStageStats(SyncStageStats{ TimeElapsed: SecondsToHHMMString(uint64(upd.TimeElapsed)), TimeLeft: "unknown", - Progress: fmt.Sprintf("%d%%", totalProgress/len(d.syncStats.SnapshotIndexing.Segments)), + Progress: fmt.Sprintf("%d%%", totalProgress), }, "Indexing snapshots") + + if totalProgress >= 100 { + d.SaveData() + } } func (d *DiagnosticClient) runSnapshotFilesListListener(rootCtx context.Context) { diff --git a/erigon-lib/diagnostics/sys_info.go b/erigon-lib/diagnostics/sys_info.go index eb3fdd52918..3ec936f883b 100644 --- a/erigon-lib/diagnostics/sys_info.go +++ b/erigon-lib/diagnostics/sys_info.go @@ -20,20 +20,18 @@ var ( func (d *DiagnosticClient) setupSysInfoDiagnostics() { sysInfo := GetSysInfo(d.dataDirPath) - err := d.db.Update(d.ctx, func(tx kv.RwTx) error { - err := RAMInfoUpdater(d.hardwareInfo.RAM)(tx) - if err != nil { - return err - } - err = CPUInfoUpdater(d.hardwareInfo.CPU)(tx) - if err != nil { - return err - } + var funcs []func(tx kv.RwTx) error + funcs = append(funcs, RAMInfoUpdater(sysInfo.RAM)) + funcs = append(funcs, CPUInfoUpdater(sysInfo.CPU)) + funcs = append(funcs, DiskInfoUpdater(sysInfo.Disk)) - err = DiskInfoUpdater(d.hardwareInfo.Disk)(tx) - if err != nil { - return err + err := d.db.Update(d.ctx, func(tx kv.RwTx) error { + for _, updater := range funcs { + updErr := updater(tx) + if updErr != nil { + return updErr + } } return nil From 9551f2307de9ce3d6c46531a214c7919e30d1fc3 Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 4 Jul 2024 14:37:49 +0100 Subject: [PATCH 06/10] fix --- erigon-lib/diagnostics/entities.go | 5 +++-- erigon-lib/diagnostics/snapshots.go | 7 ++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/erigon-lib/diagnostics/entities.go b/erigon-lib/diagnostics/entities.go index 5eaea58f081..e9200bf25bc 100644 --- a/erigon-lib/diagnostics/entities.go +++ b/erigon-lib/diagnostics/entities.go @@ -115,8 +115,9 @@ type SegmentPeer struct { } type SnapshotIndexingStatistics struct { - Segments []SnapshotSegmentIndexingStatistics `json:"segments"` - TimeElapsed float64 `json:"timeElapsed"` + Segments []SnapshotSegmentIndexingStatistics `json:"segments"` + TimeElapsed float64 `json:"timeElapsed"` + IndexingFinished bool `json:"indexingFinished"` } type SnapshotSegmentIndexingStatistics struct { diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index ffd1c031f03..43d69cbc58a 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ b/erigon-lib/diagnostics/snapshots.go @@ -145,6 +145,11 @@ func (d *DiagnosticClient) runSegmentIndexingListener(rootCtx context.Context) { return case info := <-ch: d.addOrUpdateSegmentIndexingState(info) + + if d.syncStats.SnapshotIndexing.IndexingFinished { + d.SaveData() + return + } } } }() @@ -226,7 +231,7 @@ func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingS }, "Indexing snapshots") if totalProgress >= 100 { - d.SaveData() + d.syncStats.SnapshotIndexing.IndexingFinished = true } } From fc1463ca82521fea2e48b54d5552dfcf05ad68bc Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 4 Jul 2024 14:53:20 +0100 Subject: [PATCH 07/10] fix --- erigon-lib/diagnostics/snapshots.go | 41 ++++++++++++++++------------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index 43d69cbc58a..436afbeb02c 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ b/erigon-lib/diagnostics/snapshots.go @@ -145,6 +145,7 @@ func (d *DiagnosticClient) runSegmentIndexingListener(rootCtx context.Context) { return case info := <-ch: d.addOrUpdateSegmentIndexingState(info) + d.updateIndexingStatus() if d.syncStats.SnapshotIndexing.IndexingFinished { d.SaveData() @@ -185,11 +186,33 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener(rootCtx context.Co } d.mu.Unlock() + + d.updateIndexingStatus() } } }() } +func (d *DiagnosticClient) updateIndexingStatus() { + totalProgress := 0 + totalProgressPercent := 0 + for _, seg := range d.syncStats.SnapshotIndexing.Segments { + totalProgressPercent += seg.Percent + } + + totalProgress = totalProgressPercent / len(d.syncStats.SnapshotIndexing.Segments) + + d.updateSnapshotStageStats(SyncStageStats{ + TimeElapsed: SecondsToHHMMString(uint64(d.syncStats.SnapshotIndexing.TimeElapsed)), + TimeLeft: "unknown", + Progress: fmt.Sprintf("%d%%", totalProgress), + }, "Indexing snapshots") + + if totalProgress >= 100 { + d.syncStats.SnapshotIndexing.IndexingFinished = true + } +} + func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingStatistics) { d.mu.Lock() defer d.mu.Unlock() @@ -215,24 +238,6 @@ func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingS } d.syncStats.SnapshotIndexing.TimeElapsed = upd.TimeElapsed - - totalProgress := 0 - totalProgressPercent := 0 - for _, seg := range d.syncStats.SnapshotIndexing.Segments { - totalProgressPercent += seg.Percent - } - - totalProgress = totalProgressPercent / len(d.syncStats.SnapshotIndexing.Segments) - - d.updateSnapshotStageStats(SyncStageStats{ - TimeElapsed: SecondsToHHMMString(uint64(upd.TimeElapsed)), - TimeLeft: "unknown", - Progress: fmt.Sprintf("%d%%", totalProgress), - }, "Indexing snapshots") - - if totalProgress >= 100 { - d.syncStats.SnapshotIndexing.IndexingFinished = true - } } func (d *DiagnosticClient) runSnapshotFilesListListener(rootCtx context.Context) { From 634374b40863fa3cbf8a88cfb885ebc043de207e Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 4 Jul 2024 14:55:32 +0100 Subject: [PATCH 08/10] fix --- erigon-lib/diagnostics/snapshots.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index 436afbeb02c..f451cba2b05 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ b/erigon-lib/diagnostics/snapshots.go @@ -49,6 +49,7 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) { d.syncStats.SnapshotDownload.Sys = info.Sys d.syncStats.SnapshotDownload.DownloadFinished = info.DownloadFinished d.syncStats.SnapshotDownload.TorrentMetadataReady = info.TorrentMetadataReady + d.mu.Unlock() downloadedPercent := getPercentDownloaded(info.Downloaded, info.Total) remainingBytes := info.Total - info.Downloaded @@ -61,9 +62,7 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) { Progress: downloadedPercent, }, "Downloading snapshots") - d.mu.Unlock() - - if d.snapshotStageFinished() { + if info.DownloadFinished { d.SaveData() return } @@ -83,6 +82,8 @@ func getPercentDownloaded(downloaded, total uint64) string { } func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subStageInfo string) { + d.mu.Lock() + defer d.mu.Unlock() idxs := d.getCurrentSyncIdxs() if idxs.Stage == -1 || idxs.SubStage == -1 { log.Debug("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo) From 9c9bfc0f10a647b5ce2fcd08dbee456c392ae5c0 Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 4 Jul 2024 14:56:24 +0100 Subject: [PATCH 09/10] fix --- erigon-lib/diagnostics/snapshots.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index f451cba2b05..e924d140ad9 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ b/erigon-lib/diagnostics/snapshots.go @@ -93,15 +93,6 @@ func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subSta d.syncStages[idxs.Stage].SubStages[idxs.SubStage].Stats = stats } -func (d *DiagnosticClient) snapshotStageFinished() bool { - idx := d.getCurrentSyncIdxs() - if idx.Stage > 0 { - return true - } else { - return false - } -} - func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context) { go func() { ctx, ch, closeChannel := Context[SegmentDownloadStatistics](rootCtx, 1) From 895b85849278ac8fe9e472181a69d6f36594136d Mon Sep 17 00:00:00 2001 From: dvovk Date: Thu, 4 Jul 2024 15:21:05 +0100 Subject: [PATCH 10/10] upd --- erigon-lib/diagnostics/client.go | 4 +--- erigon-lib/diagnostics/snapshots.go | 3 +-- erigon-lib/diagnostics/sys_info.go | 4 +--- 3 files changed, 3 insertions(+), 8 deletions(-) diff --git a/erigon-lib/diagnostics/client.go b/erigon-lib/diagnostics/client.go index 15cf613d51a..5e0ca6a5c4f 100644 --- a/erigon-lib/diagnostics/client.go +++ b/erigon-lib/diagnostics/client.go @@ -142,9 +142,7 @@ func (d *DiagnosticClient) runSaveProcess(rootCtx context.Context) { func (d *DiagnosticClient) SaveData() { var funcs []func(tx kv.RwTx) error - funcs = append(funcs, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)) - funcs = append(funcs, StagesListUpdater(d.syncStages)) - funcs = append(funcs, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)) + funcs = append(funcs, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload), StagesListUpdater(d.syncStages), SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)) err := d.db.Update(d.ctx, func(tx kv.RwTx) error { for _, updater := range funcs { diff --git a/erigon-lib/diagnostics/snapshots.go b/erigon-lib/diagnostics/snapshots.go index e924d140ad9..85579efebaa 100644 --- a/erigon-lib/diagnostics/snapshots.go +++ b/erigon-lib/diagnostics/snapshots.go @@ -186,13 +186,12 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener(rootCtx context.Co } func (d *DiagnosticClient) updateIndexingStatus() { - totalProgress := 0 totalProgressPercent := 0 for _, seg := range d.syncStats.SnapshotIndexing.Segments { totalProgressPercent += seg.Percent } - totalProgress = totalProgressPercent / len(d.syncStats.SnapshotIndexing.Segments) + totalProgress := totalProgressPercent / len(d.syncStats.SnapshotIndexing.Segments) d.updateSnapshotStageStats(SyncStageStats{ TimeElapsed: SecondsToHHMMString(uint64(d.syncStats.SnapshotIndexing.TimeElapsed)), diff --git a/erigon-lib/diagnostics/sys_info.go b/erigon-lib/diagnostics/sys_info.go index 3ec936f883b..68a7ce7d4c1 100644 --- a/erigon-lib/diagnostics/sys_info.go +++ b/erigon-lib/diagnostics/sys_info.go @@ -22,9 +22,7 @@ func (d *DiagnosticClient) setupSysInfoDiagnostics() { sysInfo := GetSysInfo(d.dataDirPath) var funcs []func(tx kv.RwTx) error - funcs = append(funcs, RAMInfoUpdater(sysInfo.RAM)) - funcs = append(funcs, CPUInfoUpdater(sysInfo.CPU)) - funcs = append(funcs, DiskInfoUpdater(sysInfo.Disk)) + funcs = append(funcs, RAMInfoUpdater(sysInfo.RAM), CPUInfoUpdater(sysInfo.CPU), DiskInfoUpdater(sysInfo.Disk)) err := d.db.Update(d.ctx, func(tx kv.RwTx) error { for _, updater := range funcs {