Skip to content

Commit

Permalink
block retire: merge all possible files (even bor) even if nothing to …
Browse files Browse the repository at this point in the history
…retire (#9068)

Co-authored-by: Alex Sharp <alexsharp@Alexs-MacBook-Pro-2.local>
  • Loading branch information
AskAlexSharov and Alex Sharp authored Dec 24, 2023
1 parent 77d32cc commit e08003f
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 47 deletions.
65 changes: 35 additions & 30 deletions turbo/snapshotsync/freezeblocks/block_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -1286,28 +1286,33 @@ func CanDeleteTo(curBlockNum uint64, blocksInSnapshots uint64) (blockTo uint64)
return cmp.Min(hardLimit, blocksInSnapshots+1)
}

func (br *BlockRetire) retireBlocks(ctx context.Context, blockFrom, blockTo uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) error {
func (br *BlockRetire) retireBlocks(ctx context.Context, forwardProgress uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) (bool, error) {
notifier, logger, blockReader, tmpDir, db, workers := br.notifier, br.logger, br.blockReader, br.tmpDir, br.db, br.workers
logger.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
snapshots := br.snapshots()
firstTxNum := blockReader.(*BlockReader).FirstTxNumNotInSnapshots()

// in future we will do it in background
if err := DumpBlocks(ctx, blockFrom, blockTo, snaptype.Erigon2MergeLimit, tmpDir, snapshots.Dir(), firstTxNum, db, workers, lvl, logger, blockReader); err != nil {
return fmt.Errorf("DumpBlocks: %w", err)
}
if err := snapshots.ReopenFolder(); err != nil {
return fmt.Errorf("reopen: %w", err)
}
snapshots.LogStat()
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBlocks())
if ok {
logger.Log(lvl, "[snapshots] Retire Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
// in future we will do it in background
if err := DumpBlocks(ctx, blockFrom, blockTo, snaptype.Erigon2MergeLimit, tmpDir, snapshots.Dir(), firstTxNum, db, workers, lvl, logger, blockReader); err != nil {
return ok, fmt.Errorf("DumpBlocks: %w", err)
}
if err := snapshots.ReopenFolder(); err != nil {
return ok, fmt.Errorf("reopen: %w", err)
}
snapshots.LogStat()
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
}
}

merger := NewMerger(tmpDir, workers, lvl, db, br.chainConfig, logger)
rangesToMerge := merger.FindMergeRanges(snapshots.Ranges(), snapshots.BlocksAvailable())
if len(rangesToMerge) == 0 {
return nil
return ok, nil
}
ok = true // have something to merge
onMerge := func(r Range) error {
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
Expand All @@ -1325,10 +1330,10 @@ func (br *BlockRetire) retireBlocks(ctx context.Context, blockFrom, blockTo uint
}
err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */, onMerge, onDelete)
if err != nil {
return err
return ok, err
}

return nil
return ok, nil
}

func (br *BlockRetire) PruneAncientBlocks(tx kv.RwTx, limit int) error {
Expand Down Expand Up @@ -1368,38 +1373,38 @@ func (br *BlockRetire) RetireBlocksInBackground(ctx context.Context, forwardProg
}()
}

func (br *BlockRetire) RetireBlocks(ctx context.Context, forwardProgress uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDeleteSnapshots func(l []string) error) error {
func (br *BlockRetire) RetireBlocks(ctx context.Context, forwardProgress uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDeleteSnapshots func(l []string) error) (err error) {
includeBor := br.chainConfig.Bor != nil
if includeBor {
// "bor snaps" can be behind "block snaps", it's ok: for example because of `kill -9` in the middle of merge
for br.blockReader.FrozenBorBlocks() < br.blockReader.FrozenBlocks() {
blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBorBlocks())
ok, err := br.retireBorBlocks(ctx, forwardProgress, lvl, seedNewSnapshots, onDeleteSnapshots)
if err != nil {
return err
}
if !ok {
break
}
if err := br.retireBorBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil {
return err
}
}
}

var ok, okBor bool
for {
blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBlocks())
if !ok {
break
}
if err := br.retireBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil {
ok, err = br.retireBlocks(ctx, forwardProgress, lvl, seedNewSnapshots, onDeleteSnapshots)
if err != nil {
return err
}

if includeBor {
blockFrom, blockTo, ok = CanRetire(forwardProgress, br.blockReader.FrozenBorBlocks())
if ok {
if err := br.retireBorBlocks(ctx, blockFrom, blockTo, lvl, seedNewSnapshots, onDeleteSnapshots); err != nil {
return err
}
okBor, err = br.retireBorBlocks(ctx, forwardProgress, lvl, seedNewSnapshots, onDeleteSnapshots)
if err != nil {
return err
}
}
haveMore := ok || okBor
if !haveMore {
break
}
}
return nil
}
Expand Down
39 changes: 22 additions & 17 deletions turbo/snapshotsync/freezeblocks/bor_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/ledgerwatch/erigon/consensus/bor"
"os"
"path"
"path/filepath"
Expand All @@ -16,6 +15,8 @@ import (
"sync/atomic"
"time"

"github.com/ledgerwatch/erigon/consensus/bor"

"github.com/ledgerwatch/erigon-lib/chain"
"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
common2 "github.com/ledgerwatch/erigon-lib/common"
Expand Down Expand Up @@ -170,28 +171,33 @@ type borSpanSegments struct {
segments []*BorSpanSegment
}

func (br *BlockRetire) retireBorBlocks(ctx context.Context, blockFrom, blockTo uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) error {
func (br *BlockRetire) retireBorBlocks(ctx context.Context, forwardProgress uint64, lvl log.Lvl, seedNewSnapshots func(downloadRequest []services.DownloadRequest) error, onDelete func(l []string) error) (bool, error) {
chainConfig := fromdb.ChainConfig(br.db)
notifier, logger, blockReader, tmpDir, db, workers := br.notifier, br.logger, br.blockReader, br.tmpDir, br.db, br.workers
logger.Log(lvl, "[bor snapshots] Retire Bor Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
snapshots := br.borSnapshots()
firstTxNum := blockReader.(*BlockReader).FirstTxNumNotInSnapshots()

if err := DumpBorBlocks(ctx, chainConfig, blockFrom, blockTo, snaptype.Erigon2MergeLimit, tmpDir, snapshots.Dir(), firstTxNum, db, workers, lvl, logger, blockReader); err != nil {
return fmt.Errorf("DumpBorBlocks: %w", err)
}
if err := snapshots.ReopenFolder(); err != nil {
return fmt.Errorf("reopen: %w", err)
}
snapshots.LogStat()
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
blockFrom, blockTo, ok := CanRetire(forwardProgress, br.blockReader.FrozenBorBlocks())
if ok {
logger.Log(lvl, "[bor snapshots] Retire Bor Blocks", "range", fmt.Sprintf("%dk-%dk", blockFrom/1000, blockTo/1000))
if err := DumpBorBlocks(ctx, chainConfig, blockFrom, blockTo, snaptype.Erigon2MergeLimit, tmpDir, snapshots.Dir(), firstTxNum, db, workers, lvl, logger, blockReader); err != nil {
return ok, fmt.Errorf("DumpBorBlocks: %w", err)
}
if err := snapshots.ReopenFolder(); err != nil {
return ok, fmt.Errorf("reopen: %w", err)
}
snapshots.LogStat()
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
}
}

merger := NewBorMerger(tmpDir, workers, lvl, db, chainConfig, notifier, logger)
rangesToMerge := merger.FindMergeRanges(snapshots.Ranges())
logger.Warn("[bor snapshots] Retire Bor Blocks", "rangesToMerge", fmt.Sprintf("%s", Ranges(rangesToMerge)))
if len(rangesToMerge) == 0 {
return nil
return ok, nil
}
ok = true // have something to merge
onMerge := func(r Range) error {
if notifier != nil && !reflect.ValueOf(notifier).IsNil() { // notify about new snapshots of any size
notifier.OnNewSnapshot()
Expand All @@ -209,11 +215,10 @@ func (br *BlockRetire) retireBorBlocks(ctx context.Context, blockFrom, blockTo u
}
err := merger.Merge(ctx, snapshots, rangesToMerge, snapshots.Dir(), true /* doIndex */, onMerge, onDelete)
if err != nil {
return err
return ok, err
}
return nil
return ok, nil
}

func DumpBorBlocks(ctx context.Context, chainConfig *chain.Config, blockFrom, blockTo, blocksPerFile uint64, tmpDir, snapDir string, firstTxNum uint64, chainDB kv.RoDB, workers int, lvl log.Lvl, logger log.Logger, blockReader services.FullBlockReader) error {
if blocksPerFile == 0 {
return nil
Expand Down

0 comments on commit e08003f

Please sign in to comment.