From b8b023b99238892afd66548ef227d514d4913ea2 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 23 Dec 2023 15:47:43 +0700 Subject: [PATCH 1/4] save --- .../freezeblocks/block_snapshots.go | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 98d0e9d3ae4..bae25c3f078 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -1369,21 +1369,35 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProg } func (br *BlockRetire) RetireBlocks(ctx context.Context, forwardProgress uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDeleteSnapshots func(l []string) error) error { - blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBlocks()) - if ok { - if err := br.retireBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil { - //br.logger.Warn("[snapshots] retire blocks", "err", err, "fromBlock", blockFrom, "toBlock", blockTo) - return err - } - } - includeBor := br.chainConfig.Bor != nil if includeBor { - blockFrom, blockTo, ok = CanRetire(forwardProgress, br.blockReader.FrozenBorBlocks()) - if ok { + // if bor snapshots are behind, let's align them + for br.blockReader.FrozenBorBlocks() < br.blockReader.FrozenBlocks() { + blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBorBlocks()) + if !ok { + break + } if err := br.retireBorBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil { return err - //br.logger.Warn("[bor snapshots] retire blocks", "err", err, "fromBlock", blockFrom, "toBlock", blockTo) + } + } + } + + for { + blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBlocks()) + if !ok { + break + } + if err := br.retireBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil { + return err + } + + if includeBor { + blockFrom, blockTo, ok = CanRetire(forwardProgress, br.blockReader.FrozenBorBlocks()) + if ok { + if err := br.retireBorBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil { + return err + } } } } @@ -2384,9 +2398,6 @@ func (m *Merger) Merge(ctx context.Context, snapshots *RoSnapshots, mergeRanges return err } } - } - time.Sleep(1 * time.Second) // i working on blocking API - to ensure client does not use old snapsthos - and then delete them - for _, t := range snaptype.BlockSnapshotTypes { m.removeOldFiles(toMerge[t], snapDir) } } @@ -2446,6 +2457,7 @@ func (m *Merger) merge(ctx context.Context, toMerge []string, targetFile string, func (m *Merger) removeOldFiles(toDel []string, snapDir string) { for _, f := range toDel { _ = os.Remove(f) + _ = os.Remove(f + ".torrent") ext := filepath.Ext(f) withoutExt := f[:len(f)-len(ext)] _ = os.Remove(withoutExt + ".idx") From f687d58a071af360c848b55d7946ea1c2d3aabd5 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 23 Dec 2023 15:55:00 +0700 Subject: [PATCH 2/4] save --- turbo/snapshotsync/freezeblocks/block_snapshots.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index bae25c3f078..445b6b52a28 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -1371,7 +1371,7 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProg func (br *BlockRetire) RetireBlocks(ctx context.Context, forwardProgress uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDeleteSnapshots func(l []string) error) error { includeBor := br.chainConfig.Bor != nil if includeBor { - // if bor snapshots are behind, let's align them + // "bor snaps" can be behind "block snaps", it's ok: for example because of `kill -9` in the middle of merge for br.blockReader.FrozenBorBlocks() < br.blockReader.FrozenBlocks() { blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBorBlocks()) if !ok { From a42f856765de5cf914df476474fac40e5ae2befb Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 23 Dec 2023 15:57:11 +0700 Subject: [PATCH 3/4] save --- turbo/snapshotsync/freezeblocks/block_snapshots.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index 445b6b52a28..ef912c4cec5 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -1572,12 +1572,14 @@ func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, sna } snapDir, fileName := filepath.Split(f.Path) ext := filepath.Ext(fileName) - logger.Log(lvl, "[snapshots] Compression", "ratio", sn.Ratio.String(), "file", fileName[:len(fileName)-len(ext)]) - + logger.Log(lvl, "[snapshots] Compression start", "file", fileName[:len(fileName)-len(ext)]) + t := time.Now() _, expectedCount, err = txsAmountBasedOnBodiesSnapshots(snapDir, blockFrom, blockTo) if err != nil { return err } + logger.Log(lvl, "[snapshots] Compression", "took", time.Since(t), "ratio", sn.Ratio.String(), "file", fileName[:len(fileName)-len(ext)]) + if expectedCount != sn.Count() { return fmt.Errorf("incorrect tx count: %d, expected from snapshots: %d", sn.Count(), expectedCount) } From c683b100d049440a703294434efe2e455229bef7 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Sat, 23 Dec 2023 16:10:06 +0700 Subject: [PATCH 4/4] save --- erigon-lib/compress/compress.go | 1 + turbo/snapshotsync/freezeblocks/block_snapshots.go | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/erigon-lib/compress/compress.go b/erigon-lib/compress/compress.go index c9ef174d621..d2b57d458d4 100644 --- a/erigon-lib/compress/compress.go +++ b/erigon-lib/compress/compress.go @@ -127,6 +127,7 @@ func (c *Compressor) Close() { } func (c *Compressor) SetTrace(trace bool) { c.trace = trace } +func (c *Compressor) Workers() int { return c.workers } func (c *Compressor) Count() int { return int(c.wordsCount) } diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index ef912c4cec5..eecf518be76 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -1572,20 +1572,19 @@ func dumpBlocksRange(ctx context.Context, blockFrom, blockTo uint64, tmpDir, sna } snapDir, fileName := filepath.Split(f.Path) ext := filepath.Ext(fileName) - logger.Log(lvl, "[snapshots] Compression start", "file", fileName[:len(fileName)-len(ext)]) + logger.Log(lvl, "[snapshots] Compression start", "file", fileName[:len(fileName)-len(ext)], "workers", sn.Workers()) t := time.Now() _, expectedCount, err = txsAmountBasedOnBodiesSnapshots(snapDir, blockFrom, blockTo) if err != nil { return err } - logger.Log(lvl, "[snapshots] Compression", "took", time.Since(t), "ratio", sn.Ratio.String(), "file", fileName[:len(fileName)-len(ext)]) - if expectedCount != sn.Count() { return fmt.Errorf("incorrect tx count: %d, expected from snapshots: %d", sn.Count(), expectedCount) } if err := sn.Compress(); err != nil { return fmt.Errorf("compress: %w", err) } + logger.Log(lvl, "[snapshots] Compression", "took", time.Since(t), "ratio", sn.Ratio.String(), "file", fileName[:len(fileName)-len(ext)]) p := &background.Progress{} if err := buildIdx(ctx, f, &chainConfig, tmpDir, p, lvl, logger); err != nil {