
erigon snapshots integrity: add check for body.BaseTxnID #9121

Merged · 16 commits · Jan 4, 2024
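This change adds a `snapshots integrity` subcommand that opens the datadir's chaindata and snapshot files and verifies that body.BaseTxnID is contiguous across frozen body segments: the first body of each bodies segment must start at the txn ID where the previous segment's transactions end. Mismatches are logged rather than aborting the run (the check is invoked with failFast=false). The PR also extracts dbCfg/openAgg helpers that open the DB read-only via mdbx.Accede, adds a stub eth/integrity package, and includes a commented-out one-off bodies_decrement_datafix command. Assuming the usual erigon binary name (not part of this diff), the check would be run roughly as: erigon snapshots integrity --datadir=<datadir>.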
1 change: 1 addition & 0 deletions eth/integrity/e3_history_no_system_txs.go
@@ -0,0 +1 @@
package integrity
184 changes: 172 additions & 12 deletions turbo/app/snapshots_cmd.go
@@ -21,6 +21,7 @@ import (
"github.com/ledgerwatch/erigon-lib/metrics"
"github.com/ledgerwatch/log/v3"
"github.com/urfave/cli/v2"
"golang.org/x/sync/semaphore"

"github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/common/datadir"
@@ -144,6 +145,20 @@ var snapshotCommand = cli.Command{
},
}),
},
{
Name: "integrity",
Action: doIntegrity,
Flags: joinFlags([]cli.Flag{
&utils.DataDirFlag,
}),
},
//{
// Name: "bodies_decrement_datafix",
// Action: doBodiesDecrement,
// Flags: joinFlags([]cli.Flag{
// &utils.DataDirFlag,
// }),
//},
},
}

@@ -174,6 +189,39 @@ var (
}
)

func doIntegrity(cliCtx *cli.Context) error {
logger, _, err := debug.Setup(cliCtx, true /* root logger */)
if err != nil {
return err
}

ctx := cliCtx.Context
dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name))
chainDB := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
defer chainDB.Close()

cfg := ethconfig.NewSnapCfg(true, false, true)
chainConfig := fromdb.ChainConfig(chainDB)
blockSnaps, borSnaps, blockRetire, agg, err := openSnaps(ctx, cfg, dirs, snapcfg.KnownCfg(chainConfig.ChainName, 0).Version, chainDB, logger)
if err != nil {
return err
}
defer blockSnaps.Close()
defer borSnaps.Close()
defer agg.Close()

blockReader, _ := blockRetire.IO()
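// Verify that BaseTxnID is contiguous across frozen body segments; failFast=false logs mismatches instead of aborting.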
if err := blockReader.(*freezeblocks.BlockReader).IntegrityTxnID(false); err != nil {
return err
}

//if err := integrity.E3HistoryNoSystemTxs(ctx, chainDB, agg); err != nil {
// return err
//}

return nil
}

func doDiff(cliCtx *cli.Context) error {
defer log.Info("Done")
srcF, dstF := cliCtx.String("src"), cliCtx.String("dst")
@@ -243,6 +291,7 @@ func doDecompressSpeed(cliCtx *cli.Context) error {
}()
return nil
}

func doRam(cliCtx *cli.Context) error {
var logger log.Logger
var err error
@@ -279,19 +328,17 @@ func doIndicesCommand(cliCtx *cli.Context) error {

dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name))
rebuild := cliCtx.Bool(SnapshotRebuildFlag.Name)
//from := cliCtx.Uint64(SnapshotFromFlag.Name)

chainDB := mdbx.NewMDBX(logger).Path(dirs.Chaindata).MustOpen()
chainDB := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
defer chainDB.Close()

dir.MustExist(dirs.SnapHistory)
chainConfig := fromdb.ChainConfig(chainDB)

if rebuild {
panic("not implemented")
}

cfg := ethconfig.NewSnapCfg(true, false, true)
chainConfig := fromdb.ChainConfig(chainDB)
blockSnaps, borSnaps, br, agg, err := openSnaps(ctx, cfg, dirs, snapcfg.KnownCfg(chainConfig.ChainName, 0).Version, chainDB, logger)

if err != nil {
@@ -325,13 +372,16 @@ func openSnaps(ctx context.Context, cfg ethconfig.BlocksFreezing, dirs datadir.D
return
}
borSnaps.LogStat("open")

agg, err = libstate.NewAggregatorV3(ctx, dirs.SnapHistory, dirs.Tmp, ethconfig.HistoryV3AggregationStep, chainDB, logger)
if err != nil {
return
}
agg.SetWorkers(estimate.CompressSnapshot.Workers())
err = agg.OpenFolder()
agg = openAgg(ctx, dirs, chainDB, logger)
err = chainDB.View(ctx, func(tx kv.Tx) error {
ac := agg.MakeContext()
defer ac.Close()
//ac.LogStats(tx, func(endTxNumMinimax uint64) uint64 {
// _, histBlockNumProgress, _ := rawdbv3.TxNums.FindBlockNum(tx, endTxNumMinimax)
// return histBlockNumProgress
//})
return nil
})
if err != nil {
return
}
@@ -461,7 +511,7 @@ func doRetireCommand(cliCtx *cli.Context) error {
every := cliCtx.Uint64(SnapshotEveryFlag.Name)
version := uint8(cliCtx.Int(SnapshotVersionFlag.Name))

db := mdbx.NewMDBX(logger).Label(kv.ChainDB).Path(dirs.Chaindata).MustOpen()
db := dbCfg(kv.ChainDB, dirs.Chaindata).MustOpen()
defer db.Close()

cfg := ethconfig.NewSnapCfg(true, false, true)
@@ -641,3 +691,113 @@ func doUploaderCommand(cliCtx *cli.Context) error {
}
return err
}

/*
func doBodiesDecrement(cliCtx *cli.Context) error {
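// One-off data fixer, kept commented out: rewrites bodies segments from block 14.5M onward,
// decrementing each stored BaseTxId by one and swapping the rewritten files into place.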
logger, _, err := debug.Setup(cliCtx, true)
if err != nil {
return err
}
dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name))
ctx := cliCtx.Context
logEvery := time.NewTicker(30 * time.Second)
defer logEvery.Stop()

list, err := snaptype.Segments(dirs.Snap, 1)
if err != nil {
return err
}
var l []snaptype.FileInfo
for _, f := range list {
if f.T != snaptype.Bodies {
continue
}
if f.From < 14_500_000 {
continue
}
l = append(l, f)
}
migrateSingleBody := func(srcF, dstF string) error {
src, err := compress.NewDecompressor(srcF)
if err != nil {
return err
}
defer src.Close()
dst, err := compress.NewCompressor(ctx, "compress", dstF, dirs.Tmp, compress.MinPatternScore, estimate.CompressSnapshot.Workers(), log.LvlInfo, logger)
if err != nil {
return err
}
defer dst.Close()

i := 0
srcG := src.MakeGetter()
var buf []byte
dstBuf := bytes.NewBuffer(nil)
for srcG.HasNext() {
i++
buf, _ = srcG.Next(buf[:0])
body := &types.BodyForStorage{}
if err := rlp.Decode(bytes.NewReader(buf), body); err != nil {
return err
}
body.BaseTxId -= 1
dstBuf.Reset()
if err := rlp.Encode(dstBuf, body); err != nil {
return err
}

if err := dst.AddWord(dstBuf.Bytes()); err != nil {
return err
}

select {
case <-logEvery.C:
logger.Info("[bodies] progress", "f", src.FileName(), "progress", fmt.Sprintf("%dK/%dK", i/1_000, src.Count()/1_000))
default:
}
}
if err := dst.Compress(); err != nil {
return err
}
src.Close()
dst.Close()
os.Rename(srcF, srcF+".back")
os.Rename(dstF, srcF)
os.Remove(srcF + ".torrent")
os.Remove(srcF + ".idx")
ext := filepath.Ext(srcF)
withoutExt := srcF[:len(srcF)-len(ext)]
_ = os.Remove(withoutExt + ".idx")
return nil
}
for _, f := range l {
srcF, dstF := f.Path, f.Path+"2"
if err := migrateSingleBody(srcF, dstF); err != nil {
return err
}
}

return nil
}
*/

func dbCfg(label kv.Label, path string) mdbx.MdbxOpts {
const ThreadsLimit = 9_000
limiterB := semaphore.NewWeighted(ThreadsLimit)
opts := mdbx.NewMDBX(log.New()).Path(path).Label(label).RoTxsLimiter(limiterB)
	// This tool does not intend to create the DB, so the easiest way to open it is with the mdbx.Accede flag,
	// which reads all options from the existing DB instead of overriding them.
opts = opts.Accede()
return opts
}
func openAgg(ctx context.Context, dirs datadir.Dirs, chainDB kv.RwDB, logger log.Logger) *libstate.AggregatorV3 {
agg, err := libstate.NewAggregatorV3(ctx, dirs.Snap, dirs.Tmp, ethconfig.HistoryV3AggregationStep, chainDB, logger)
if err != nil {
panic(err)
}
if err = agg.OpenFolder(); err != nil {
panic(err)
}
agg.SetWorkers(estimate.CompressSnapshot.Workers())
return agg
}
28 changes: 28 additions & 0 deletions turbo/snapshotsync/freezeblocks/block_reader.go
@@ -21,6 +21,7 @@ import (
"github.com/ledgerwatch/erigon/eth/ethconfig"
"github.com/ledgerwatch/erigon/rlp"
"github.com/ledgerwatch/erigon/turbo/services"
"github.com/ledgerwatch/log/v3"
)

type RemoteBlockReader struct {
@@ -868,6 +869,33 @@ func (r *BlockReader) IterateFrozenBodies(f func(blockNum, baseTxNum, txAmount u
}
return nil
}

func (r *BlockReader) IntegrityTxnID(failFast bool) error {
defer log.Info("[integrity] IntegrityTxnID done")
view := r.sn.View()
defer view.Close()

var expectedFirstTxnID uint64
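// Bodies segments are walked in order: each segment's first BaseTxnID must equal the previous
// segment's first BaseTxnID plus the entry count of its matching txn segment.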
for _, snb := range view.Bodies() {
firstBlockNum := snb.idxBodyNumber.BaseDataID()
sn, _ := view.TxsSegment(snb.idxBodyNumber.BaseDataID())
b, _, err := r.bodyForStorageFromSnapshot(firstBlockNum, snb, nil)
if err != nil {
return err
}
if b.BaseTxId != expectedFirstTxnID {
err := fmt.Errorf("[integrity] IntegrityTxnID: bn=%d, baseID=%d, cnt=%d, expectedFirstTxnID=%d", firstBlockNum, b.BaseTxId, sn.Seg.Count(), expectedFirstTxnID)
if failFast {
return err
} else {
log.Error(err.Error())
}
}
expectedFirstTxnID = b.BaseTxId + uint64(sn.Seg.Count())
}
return nil
}

func (r *BlockReader) BadHeaderNumber(ctx context.Context, tx kv.Getter, hash common.Hash) (blockHeight *uint64, err error) {
return rawdb.ReadBadHeaderNumber(tx, hash)
}
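For illustration, here is a minimal, self-contained sketch of the continuity rule that IntegrityTxnID enforces. The segment type, helper function, and values below are hypothetical and only model the check; they are not part of this diff.

package main

import "fmt"

// segmentInfo models the two numbers the check compares per bodies segment.
type segmentInfo struct {
	firstBaseTxnID uint64 // BaseTxnID of the first body stored in the segment
	txnCount       uint64 // entry count of the matching transactions segment
}

// checkBaseTxnIDContinuity mirrors the loop in IntegrityTxnID: every segment
// must start exactly where the previous segment's transactions ended.
func checkBaseTxnIDContinuity(segs []segmentInfo) error {
	var expected uint64
	for i, s := range segs {
		if s.firstBaseTxnID != expected {
			return fmt.Errorf("segment %d: firstBaseTxnID=%d, expected=%d", i, s.firstBaseTxnID, expected)
		}
		expected = s.firstBaseTxnID + s.txnCount
	}
	return nil
}

func main() {
	// Hypothetical data: the second segment starts one txn too early,
	// the kind of off-by-one this check is meant to surface.
	segs := []segmentInfo{
		{firstBaseTxnID: 0, txnCount: 1_000},
		{firstBaseTxnID: 999, txnCount: 2_000},
	}
	fmt.Println(checkBaseTxnIDContinuity(segs))
}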