Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Diagnostics: Optimize db write #11016

Merged
merged 10 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions erigon-lib/diagnostics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package diagnostics
import (
"context"
"net/http"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/c2h5oh/datasize"
"golang.org/x/sync/semaphore"
Expand Down Expand Up @@ -101,8 +105,64 @@ func (d *DiagnosticClient) Setup() {
d.setupBodiesDiagnostics(rootCtx)
d.setupResourcesUsageDiagnostics(rootCtx)
d.setupSpeedtestDiagnostics(rootCtx)
d.runSaveProcess(rootCtx)
d.runStopNodeListener(rootCtx)

//d.logDiagMsgs()

}

// runStopNodeListener starts a goroutine that waits for either an
// os.Interrupt/SIGTERM signal or the cancellation of rootCtx.
//
// When a signal arrives, SaveData is called once and the goroutine exits.
// When rootCtx is cancelled first, the goroutine exits without saving.
func (d *DiagnosticClient) runStopNodeListener(rootCtx context.Context) {
	go func() {
		// The signal package never blocks when delivering, so the channel
		// needs a buffer to avoid missing a signal sent before the select.
		ch := make(chan os.Signal, 1)
		signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
		// Unregister the channel on every exit path; otherwise it stays
		// registered with the signal package after this goroutine is gone.
		defer signal.Stop(ch)

		select {
		case <-ch:
			d.SaveData()
		case <-rootCtx.Done():
		}
	}()
}

// runSaveProcess persists diagnostic data on a fixed five-minute interval
// instead of on every update, which reduces the number of save events.
// The background goroutine stops, and releases its ticker, once rootCtx is
// cancelled.
func (d *DiagnosticClient) runSaveProcess(rootCtx context.Context) {
	saveTicker := time.NewTicker(5 * time.Minute)

	go func() {
		defer saveTicker.Stop()

		for {
			select {
			case <-rootCtx.Done():
				return
			case <-saveTicker.C:
				d.SaveData()
			}
		}
	}()
}

func (d *DiagnosticClient) SaveData() {
err := d.db.Update(d.ctx, func(tx kv.RwTx) error {
err := SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)(tx)
awskii marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

err = StagesListUpdater(d.syncStages)(tx)
if err != nil {
return err
}

err = SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)(tx)
if err != nil {
return err
}

return nil
})

if err != nil {
log.Warn("Failed to save diagnostics data", "err", err)
}
}

/*func (d *DiagnosticClient) logDiagMsgs() {
Expand Down
21 changes: 1 addition & 20 deletions erigon-lib/diagnostics/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,6 @@ func (d *DiagnosticClient) runSnapshotListener(rootCtx context.Context) {
Progress: downloadedPercent,
}, "Downloading snapshots")

if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot download info", "err", err)
}

d.saveSyncStagesToDB()

d.mu.Unlock()

if d.snapshotStageFinished() {
Expand All @@ -90,7 +84,7 @@ func getPercentDownloaded(downloaded, total uint64) string {
func (d *DiagnosticClient) updateSnapshotStageStats(stats SyncStageStats, subStageInfo string) {
idxs := d.getCurrentSyncIdxs()
if idxs.Stage == -1 || idxs.SubStage == -1 {
log.Warn("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo)
log.Debug("[Diagnostics] Can't find running stage or substage while updating Snapshots stage stats.", "stages:", d.syncStages, "stats:", stats, "subStageInfo:", subStageInfo)
return
}

Expand Down Expand Up @@ -133,10 +127,6 @@ func (d *DiagnosticClient) runSegmentDownloadingListener(rootCtx context.Context
d.syncStats.SnapshotDownload.SegmentsDownloading[info.Name] = info
}

if err := d.db.Update(d.ctx, SnapshotDownloadUpdater(d.syncStats.SnapshotDownload)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot download info", "err", err)
}

d.mu.Unlock()
}
}
Expand All @@ -155,9 +145,6 @@ func (d *DiagnosticClient) runSegmentIndexingListener(rootCtx context.Context) {
return
case info := <-ch:
d.addOrUpdateSegmentIndexingState(info)
if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err)
}
}
}
}()
Expand Down Expand Up @@ -192,10 +179,6 @@ func (d *DiagnosticClient) runSegmentIndexingFinishedListener(rootCtx context.Co
})
}

if err := d.db.Update(d.ctx, SnapshotIndexingUpdater(d.syncStats.SnapshotIndexing)); err != nil {
log.Error("[Diagnostics] Failed to update snapshot indexing info", "err", err)
}

d.mu.Unlock()
}
}
Expand Down Expand Up @@ -238,8 +221,6 @@ func (d *DiagnosticClient) addOrUpdateSegmentIndexingState(upd SnapshotIndexingS
TimeLeft: "unknown",
Progress: fmt.Sprintf("%d%%", totalProgress/len(d.syncStats.SnapshotIndexing.Segments)),
}, "Indexing snapshots")

d.saveSyncStagesToDB()
}

func (d *DiagnosticClient) runSnapshotFilesListListener(rootCtx context.Context) {
Expand Down
22 changes: 8 additions & 14 deletions erigon-lib/diagnostics/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ func (ti CurrentSyncSubStage) Type() Type {
return TypeOf(ti)
}

// StopNodeEvent holds a single boolean field, Stop.
// NOTE(review): no use of this type is visible in this view — confirm whether
// it is still needed.
type StopNodeEvent struct {
Stop bool
}

func (ti StopNodeEvent) Type() Type {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use a type for an event when, until now, types were only used for stages?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot to delete this, it isn't in use at all.

return TypeOf(ti)
}

func (d *DiagnosticClient) setupStagesDiagnostics(rootCtx context.Context) {
d.runSyncStagesListListener(rootCtx)
d.runCurrentSyncStageListener(rootCtx)
Expand All @@ -103,8 +111,6 @@ func (d *DiagnosticClient) runSyncStagesListListener(rootCtx context.Context) {
d.mu.Lock()
d.SetStagesList(info.StagesList)
d.mu.Unlock()

d.saveSyncStagesToDB()
}
}
}()
Expand All @@ -124,8 +130,6 @@ func (d *DiagnosticClient) runCurrentSyncStageListener(rootCtx context.Context)
d.mu.Lock()
awskii marked this conversation as resolved.
Show resolved Hide resolved
d.SetCurrentSyncStage(info)
d.mu.Unlock()

d.saveSyncStagesToDB()
}
}
}()
Expand All @@ -145,8 +149,6 @@ func (d *DiagnosticClient) runCurrentSyncSubStageListener(rootCtx context.Contex
d.mu.Lock()
d.SetCurrentSyncSubStage(info)
d.mu.Unlock()

d.saveSyncStagesToDB()
}
}
}()
Expand All @@ -166,19 +168,11 @@ func (d *DiagnosticClient) runSubStageListener(rootCtx context.Context) {
d.mu.Lock()
d.SetSubStagesList(info.Stage, info.List)
d.mu.Unlock()

d.saveSyncStagesToDB()
}
}
}()
}

func (d *DiagnosticClient) saveSyncStagesToDB() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the approach of flushing to the database on a timer, it is easy to forget to flush when it is really needed (e.g. when a stage is over).

Is flushing once every 5 minutes really what you wanted?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if err := d.db.Update(d.ctx, StagesListUpdater(d.syncStages)); err != nil {
log.Error("[Diagnostics] Failed to update stages list", "err", err)
}
}

func (d *DiagnosticClient) getCurrentSyncIdxs() CurrentSyncStagesIdxs {
currentIdxs := CurrentSyncStagesIdxs{
Stage: -1,
Expand Down
27 changes: 19 additions & 8 deletions erigon-lib/diagnostics/sys_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,27 @@ var (

func (d *DiagnosticClient) setupSysInfoDiagnostics() {
sysInfo := GetSysInfo(d.dataDirPath)
if err := d.db.Update(d.ctx, RAMInfoUpdater(sysInfo.RAM)); err != nil {
log.Error("[Diagnostics] Failed to update RAM info", "err", err)
}
err := d.db.Update(d.ctx, func(tx kv.RwTx) error {
err := RAMInfoUpdater(d.hardwareInfo.RAM)(tx)
if err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check makes no sense, because the returned error only goes to the scope of the caller of the function passed to .Update, which may also not be what you wanted.

I would recommend just doing 'return fn()' inside db.Update and checking the error outside.

Also, err has to be checked after each call.

return err
}

if err := d.db.Update(d.ctx, CPUInfoUpdater(sysInfo.CPU)); err != nil {
log.Error("[Diagnostics] Failed to update CPU info", "err", err)
}
err = CPUInfoUpdater(d.hardwareInfo.CPU)(tx)
if err != nil {
return err
}

if err := d.db.Update(d.ctx, DiskInfoUpdater(sysInfo.Disk)); err != nil {
log.Error("[Diagnostics] Failed to update Disk info", "err", err)
err = DiskInfoUpdater(d.hardwareInfo.Disk)(tx)
if err != nil {
return err
}

return nil
})

if err != nil {
log.Warn("[Diagnostics] Failed to update system info", "err", err)
}

d.mu.Lock()
Expand Down
Loading