diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index e03bf01637a..43d80203023 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -31,6 +31,7 @@ import ( "time" "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "google.golang.org/grpc" grpcHealth "google.golang.org/grpc/health" @@ -408,12 +409,6 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger // Configure sapshots allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) - // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down - // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection - allSnapshots.OptimisticalyReopenFolder() - allBorSnapshots.OptimisticalyReopenFolder() - allSnapshots.LogStat("remote") - allBorSnapshots.LogStat("bor:remote") blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots) txNumsReader := rawdbv3.TxNums.WithCustomReadTxNumFunc(freezeblocks.ReadTxNumFuncFromBlockReader(ctx, blockReader)) @@ -421,23 +416,41 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger if err != nil { return nil, nil, nil, nil, nil, nil, nil, ff, nil, nil, fmt.Errorf("create aggregator: %w", err) } - _ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB` - - db.View(context.Background(), func(tx kv.Tx) error { - aggTx := agg.BeginFilesRo() - defer aggTx.Close() - aggTx.LogStats(tx, func(endTxNumMinimax uint64) (uint64, error) { - _, histBlockNumProgress, err := txNumsReader.FindBlockNum(tx, endTxNumMinimax) - return histBlockNumProgress, err + // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down + // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection + allSegmentsDownloadComplete, err := rawdb.AllSegmentsDownloadCompleteFromDB(rwKv) + if err != nil { + return nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, err + } + if allSegmentsDownloadComplete { + allSnapshots.OptimisticalyReopenFolder() + allBorSnapshots.OptimisticalyReopenFolder() + + allSnapshots.LogStat("remote") + allBorSnapshots.LogStat("bor:remote") + _ = agg.OpenFolder() //TODO: must use analog of `OptimisticReopenWithDB` + + db.View(context.Background(), func(tx kv.Tx) error { + aggTx := agg.BeginFilesRo() + defer aggTx.Close() + aggTx.LogStats(tx, func(endTxNumMinimax uint64) (uint64, error) { + _, histBlockNumProgress, err := txNumsReader.FindBlockNum(tx, endTxNumMinimax) + return histBlockNumProgress, err + }) + return nil }) - return nil - }) + } else { + log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)") + } + + wg := errgroup.Group{} + wg.SetLimit(1) onNewSnapshot = func() { - go func() { // don't block events processing by network communication + wg.Go(func() error { // don't block events processing by network communication reply, err := remoteKvClient.Snapshots(ctx, &remote.SnapshotsRequest{}, grpc.WaitForReady(true)) if err != nil { logger.Warn("[snapshots] reopen", "err", err) - return + return nil } if err := allSnapshots.ReopenList(reply.BlocksFiles, true); err != nil { logger.Error("[snapshots] reopen", "err", err) @@ -450,7 +463,6 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger allBorSnapshots.LogStat("bor:reopen") } - //if err = agg.openList(reply.HistoryFiles, true); err != nil { if err = agg.OpenFolder(); err != nil { logger.Error("[snapshots] reopen", "err", err) } else { @@ -464,7 +476,8 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger return nil }) } - }() + return nil + }) } onNewSnapshot() diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index edebdfe5f4a..ea8deff567c 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -30,8 +30,6 @@ import ( "github.com/gballet/go-verkle" - "github.com/erigontech/erigon-lib/log/v3" - "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/dbg" "github.com/erigontech/erigon-lib/common/hexutility" @@ -39,7 +37,7 @@ import ( "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/kv/dbutils" "github.com/erigontech/erigon-lib/kv/rawdbv3" - + "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/core/types" "github.com/erigontech/erigon/ethdb/cbor" "github.com/erigontech/erigon/rlp" diff --git a/core/rawdb/accessors_metadata.go b/core/rawdb/accessors_metadata.go index d8e7ab6d65b..fe31e55e1a9 100644 --- a/core/rawdb/accessors_metadata.go +++ b/core/rawdb/accessors_metadata.go @@ -20,10 +20,12 @@ package rawdb import ( + "context" "encoding/json" "fmt" "github.com/erigontech/erigon/core/types" + "github.com/erigontech/erigon/eth/stagedsync/stages" "github.com/erigontech/erigon/polygon/bor/borcfg" "github.com/erigontech/erigon-lib/chain" @@ -81,11 +83,6 @@ func WriteChainConfig(db kv.Putter, hash libcommon.Hash, cfg *chain.Config) erro return nil } -// DeleteChainConfig retrieves the consensus settings based on the given genesis hash. -func DeleteChainConfig(db kv.Deleter, hash libcommon.Hash) error { - return db.Delete(kv.ConfigTable, hash[:]) -} - func WriteGenesisIfNotExist(db kv.RwTx, g *types.Genesis) error { has, err := db.Has(kv.ConfigTable, kv.GenesisKey) if err != nil { @@ -117,3 +114,15 @@ func ReadGenesis(db kv.Getter) (*types.Genesis, error) { } return &g, nil } + +func AllSegmentsDownloadComplete(tx kv.Getter) (allSegmentsDownloadComplete bool, err error) { + snapshotsStageProgress, err := stages.GetStageProgress(tx, stages.Snapshots) + return snapshotsStageProgress > 0, err +} +func AllSegmentsDownloadCompleteFromDB(db kv.RoDB) (allSegmentsDownloadComplete bool, err error) { + err = db.View(context.Background(), func(tx kv.Tx) error { + allSegmentsDownloadComplete, err = AllSegmentsDownloadComplete(tx) + return err + }) + return allSegmentsDownloadComplete, err +} diff --git a/erigon-lib/downloader/downloader.go b/erigon-lib/downloader/downloader.go index 828532c1102..9e6d3373cee 100644 --- a/erigon-lib/downloader/downloader.go +++ b/erigon-lib/downloader/downloader.go @@ -21,7 +21,6 @@ import ( "bytes" "context" "encoding/hex" - "encoding/json" "errors" "fmt" "math" @@ -58,6 +57,7 @@ import ( "github.com/erigontech/erigon-lib/common/dir" "github.com/erigontech/erigon-lib/diagnostics" "github.com/erigontech/erigon-lib/downloader/downloadercfg" + "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/erigontech/erigon-lib/downloader/snaptype" prototypes "github.com/erigontech/erigon-lib/gointerfaces/typesproto" "github.com/erigontech/erigon-lib/kv" @@ -725,23 +725,10 @@ func localHashBytes(ctx context.Context, fileInfo snaptype.FileInfo, db kv.RoDB, if db != nil { err := db.View(ctx, func(tx kv.Tx) (err error) { - infoBytes, err := tx.GetOne(kv.BittorrentInfo, []byte(fileInfo.Name())) - + hashBytes, err = downloaderrawdb.ReadTorrentInfoHash(tx, fileInfo.Name()) if err != nil { return err } - - if len(infoBytes) == 20 { - hashBytes = infoBytes - return nil - } - - var info torrentInfo - - if err = json.Unmarshal(infoBytes, &info); err == nil { - hashBytes = info.Hash - } - return nil }) @@ -1503,18 +1490,14 @@ func logSeedHashMismatches(torrentHash infohash.T, name string, seedHashMismatch } } -func (d *Downloader) checkComplete(name string) (bool, int64, *time.Time) { - if info, err := d.torrentInfo(name); err == nil { - if info.Completed != nil && info.Completed.Before(time.Now()) { - if info.Length != nil { - if fi, err := os.Stat(filepath.Join(d.SnapDir(), name)); err == nil { - return fi.Size() == *info.Length && fi.ModTime().Equal(*info.Completed), *info.Length, info.Completed - } - } - } +func (d *Downloader) checkComplete(name string) (complete bool, fileLen int64, completedAt *time.Time) { + if err := d.db.View(d.ctx, func(tx kv.Tx) error { + complete, fileLen, completedAt = downloaderrawdb.CheckFileComplete(tx, name, d.SnapDir()) + return nil + }); err != nil { + return false, 0, nil } - - return false, 0, nil + return } func (d *Downloader) getWebDownloadInfo(t *torrent.Torrent) (webDownloadInfo, []*seedHash, error) { @@ -1934,28 +1917,19 @@ func availableTorrents(ctx context.Context, pending []*torrent.Torrent, download func (d *Downloader) SnapDir() string { return d.cfg.Dirs.Snap } -func (d *Downloader) torrentInfo(name string) (*torrentInfo, error) { - var info torrentInfo - +func (d *Downloader) torrentInfo(name string) (*downloaderrawdb.TorrentInfo, error) { + var info *downloaderrawdb.TorrentInfo err := d.db.View(d.ctx, func(tx kv.Tx) (err error) { - infoBytes, err := tx.GetOne(kv.BittorrentInfo, []byte(name)) - + info, err = downloaderrawdb.ReadTorrentInfo(tx, name) if err != nil { return err } - - if err = json.Unmarshal(infoBytes, &info); err != nil { - return err - } - return nil }) - if err != nil { return nil, err } - - return &info, nil + return info, nil } func (d *Downloader) ReCalcStats(interval time.Duration) { diff --git a/erigon-lib/downloader/downloadercfg/downloadercfg.go b/erigon-lib/downloader/downloadercfg/downloadercfg.go index e093aec8058..fb6e6e78d3a 100644 --- a/erigon-lib/downloader/downloadercfg/downloadercfg.go +++ b/erigon-lib/downloader/downloadercfg/downloadercfg.go @@ -219,7 +219,8 @@ func New(dirs datadir.Dirs, version string, verbosity lg.Level, downloadRate, up } // setup snapcfg - if err := loadSnapshotsEitherFromDiskIfNeeded(dirs, chainName); err != nil { + preverifiedCfg, err := loadSnapshotsEitherFromDiskIfNeeded(dirs, chainName) + if err != nil { return nil, err } @@ -227,28 +228,30 @@ func New(dirs datadir.Dirs, version string, verbosity lg.Level, downloadRate, up ClientConfig: torrentConfig, DownloadSlots: downloadSlots, WebSeedUrls: webseedHttpProviders, WebSeedFileProviders: webseedFileProviders, DownloadTorrentFilesFromWebseed: true, AddTorrentsFromDisk: true, SnapshotLock: lockSnapshots, - SnapshotConfig: snapcfg.KnownCfg(chainName), + SnapshotConfig: preverifiedCfg, MdbxWriteMap: mdbxWriteMap, }, nil } -func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) error { +func loadSnapshotsEitherFromDiskIfNeeded(dirs datadir.Dirs, chainName string) (*snapcfg.Cfg, error) { preverifiedToml := filepath.Join(dirs.Snap, "preverified.toml") exists, err := dir.FileExist(preverifiedToml) if err != nil { - return err + return nil, err } if exists { // Read the preverified.toml and load the snapshots haveToml, err := os.ReadFile(preverifiedToml) if err != nil { - return err + return nil, err } snapcfg.SetToml(chainName, haveToml) - return nil } - return dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644) + if err := dir.WriteFileWithFsync(preverifiedToml, snapcfg.GetToml(chainName), 0644); err != nil { + return nil, err + } + return snapcfg.KnownCfg(preverifiedToml), nil } func getIpv6Enabled() bool { diff --git a/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go new file mode 100644 index 00000000000..1e5c9ea0246 --- /dev/null +++ b/erigon-lib/downloader/downloaderrawdb/accessors_downloader.go @@ -0,0 +1,73 @@ +package downloaderrawdb + +import ( + "encoding/json" + "os" + "path/filepath" + "time" + + "github.com/erigontech/erigon-lib/kv" +) + +type TorrentInfo struct { + Name string `json:"name"` + Hash []byte `json:"hash"` + Length *int64 `json:"length,omitempty"` + Created *time.Time `json:"created,omitempty"` + Completed *time.Time `json:"completed,omitempty"` +} + +func ReadTorrentInfo(downloaderDBTx kv.Tx, name string) (*TorrentInfo, error) { + var info TorrentInfo + infoBytes, err := downloaderDBTx.GetOne(kv.BittorrentInfo, []byte(name)) + if err != nil { + return nil, err + } + if len(infoBytes) == 0 { + return &info, nil + } + if err = json.Unmarshal(infoBytes, &info); err != nil { + return nil, err + } + return &info, nil +} + +func ReadTorrentInfoHash(downloaderDBTx kv.Tx, name string) (hashBytes []byte, err error) { + infoBytes, err := downloaderDBTx.GetOne(kv.BittorrentInfo, []byte(name)) + if err != nil { + return nil, err + } + + if len(infoBytes) == 20 { + return infoBytes, nil + } + + var info TorrentInfo + if err = json.Unmarshal(infoBytes, &info); err == nil { + return info.Hash, nil + } + return nil, nil +} + +func WriteTorrentInfo(tx kv.RwTx, info *TorrentInfo) error { + infoBytes, err := json.Marshal(info) + if err != nil { + return err + } + return tx.Put(kv.BittorrentInfo, []byte(info.Name), infoBytes) +} + +func CheckFileComplete(tx kv.Tx, name string, snapDir string) (bool, int64, *time.Time) { + info, err := ReadTorrentInfo(tx, name) + if err != nil { + return false, 0, nil + } + if info.Completed != nil && info.Completed.Before(time.Now()) { + if info.Length != nil { + if fi, err := os.Stat(filepath.Join(snapDir, name)); err == nil { + return fi.Size() == *info.Length && fi.ModTime().Equal(*info.Completed), *info.Length, info.Completed + } + } + } + return false, 0, nil +} diff --git a/erigon-lib/downloader/util.go b/erigon-lib/downloader/util.go index e47655fecab..5dc141ddbca 100644 --- a/erigon-lib/downloader/util.go +++ b/erigon-lib/downloader/util.go @@ -20,7 +20,6 @@ import ( "bytes" "context" "crypto/sha1" - "encoding/json" "errors" "fmt" "io" @@ -42,6 +41,7 @@ import ( "github.com/erigontech/erigon-lib/common/dbg" dir2 "github.com/erigontech/erigon-lib/common/dir" "github.com/erigontech/erigon-lib/downloader/downloadercfg" + "github.com/erigontech/erigon-lib/downloader/downloaderrawdb" "github.com/erigontech/erigon-lib/downloader/snaptype" "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/log/v3" @@ -67,14 +67,6 @@ var Trackers = [][]string{ //websocketTrackers // TODO: Ws protocol producing too many errors and flooding logs. But it's also very fast and reactive. } -type torrentInfo struct { - Name string `json:"name"` - Hash []byte `json:"hash"` - Length *int64 `json:"length,omitempty"` - Created *time.Time `json:"created,omitempty"` - Completed *time.Time `json:"completed,omitempty"` -} - func seedableSegmentFiles(dir string, chainName string, skipSeedableCheck bool) ([]string, error) { extensions := snaptype.SeedableV2Extensions() if skipSeedableCheck { @@ -388,16 +380,11 @@ func _addTorrentFile(ctx context.Context, ts *torrent.TorrentSpec, torrentClient func torrentInfoUpdater(fileName string, infoHash []byte, length int64, completionTime *time.Time) func(tx kv.RwTx) error { return func(tx kv.RwTx) error { - infoBytes, err := tx.GetOne(kv.BittorrentInfo, []byte(fileName)) - + info, err := downloaderrawdb.ReadTorrentInfo(tx, fileName) if err != nil { return err } - var info torrentInfo - - err = json.Unmarshal(infoBytes, &info) - changed := false if err != nil || (len(infoHash) > 0 && !bytes.Equal(info.Hash, infoHash)) { @@ -423,13 +410,7 @@ func torrentInfoUpdater(fileName string, infoHash []byte, length int64, completi return nil } - infoBytes, err = json.Marshal(info) - - if err != nil { - return err - } - - return tx.Put(kv.BittorrentInfo, []byte(fileName), infoBytes) + return downloaderrawdb.WriteTorrentInfo(tx, info) } } @@ -437,7 +418,7 @@ func torrentInfoReset(fileName string, infoHash []byte, length int64) func(tx kv return func(tx kv.RwTx) error { now := time.Now() - info := torrentInfo{ + info := downloaderrawdb.TorrentInfo{ Name: fileName, Hash: infoHash, Created: &now, @@ -446,14 +427,7 @@ func torrentInfoReset(fileName string, infoHash []byte, length int64) func(tx kv if length > 0 { info.Length = &length } - - infoBytes, err := json.Marshal(info) - - if err != nil { - return err - } - - return tx.Put(kv.BittorrentInfo, []byte(fileName), infoBytes) + return downloaderrawdb.WriteTorrentInfo(tx, &info) } } diff --git a/erigon-lib/kv/mdbx/kv_mdbx.go b/erigon-lib/kv/mdbx/kv_mdbx.go index 4bdfcd87c4d..ea51e6a82fd 100644 --- a/erigon-lib/kv/mdbx/kv_mdbx.go +++ b/erigon-lib/kv/mdbx/kv_mdbx.go @@ -1013,7 +1013,7 @@ func (tx *MdbxTx) CreateBucket(name string) error { dbi, err = tx.tx.OpenDBI(name, nativeFlags, nil, nil) if err != nil { - return fmt.Errorf("db-talbe doesn't exists: %s, %w. Tip: try run `integration run_migrations` to create non-existing tables", name, err) + return fmt.Errorf("db-talbe doesn't exists: %s, lable: %s, %w. Tip: try run `integration run_migrations` to create non-existing tables", name, tx.db.opts.label, err) } cnfCopy.DBI = kv.DBI(dbi) diff --git a/eth/backend.go b/eth/backend.go index c6353a4103a..45a079262da 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -39,23 +39,21 @@ import ( "github.com/erigontech/mdbx-go/mdbx" lru "github.com/hashicorp/golang-lru/arc/v2" "github.com/holiman/uint256" - "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/protobuf/types/known/emptypb" - "github.com/erigontech/erigon-lib/common/dir" - "github.com/erigontech/erigon-lib/config3" - "github.com/erigontech/erigon-lib/chain" "github.com/erigontech/erigon-lib/chain/networkname" "github.com/erigontech/erigon-lib/chain/snapcfg" libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/datadir" "github.com/erigontech/erigon-lib/common/dbg" + "github.com/erigontech/erigon-lib/common/dir" "github.com/erigontech/erigon-lib/common/disk" "github.com/erigontech/erigon-lib/common/mem" + "github.com/erigontech/erigon-lib/config3" "github.com/erigontech/erigon-lib/diagnostics" "github.com/erigontech/erigon-lib/direct" "github.com/erigontech/erigon-lib/downloader" @@ -1433,24 +1431,10 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf } allSnapshots := freezeblocks.NewRoSnapshots(snConfig.Snapshot, dirs.Snap, minFrozenBlock, logger) - var allBorSnapshots *freezeblocks.BorRoSnapshots if isBor { allBorSnapshots = freezeblocks.NewBorRoSnapshots(snConfig.Snapshot, dirs.Snap, minFrozenBlock, logger) } - - g := &errgroup.Group{} - g.Go(func() error { - allSnapshots.OptimisticalyReopenFolder() - return nil - }) - g.Go(func() error { - if isBor { - allBorSnapshots.OptimisticalyReopenFolder() - } - return nil - }) - blockReader := freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots) agg, err := libstate.NewAggregator(ctx, dirs, config3.HistoryV3AggregationStep, db, logger) if err != nil { @@ -1458,12 +1442,19 @@ func setUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf } agg.SetProduceMod(snConfig.Snapshot.ProduceE3) - g.Go(func() error { - return agg.OpenFolder() - }) - if err = g.Wait(); err != nil { + allSegmentsDownloadComplete, err := rawdb.AllSegmentsDownloadCompleteFromDB(db) + if err != nil { return nil, nil, nil, nil, nil, err } + if allSegmentsDownloadComplete { + allSnapshots.OptimisticalyReopenFolder() + if isBor { + allBorSnapshots.OptimisticalyReopenFolder() + } + _ = agg.OpenFolder() + } else { + log.Warn("[rpc] download of segments not complete yet (need wait, then RPC will work)") + } blockWriter := blockio.NewBlockWriter() diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index df7a01f801e..6d0d0140c70 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -271,7 +271,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R diagnostics.Send(diagnostics.CurrentSyncSubStage{SubStage: "Download header-chain"}) // Download only the snapshots that are for the header chain. - if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix() /*headerChain=*/, cfg.dirs, true, cfg.blobs, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil { + if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.dirs, true /*headerChain=*/, cfg.blobs, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil { return err } @@ -280,11 +280,9 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R } diagnostics.Send(diagnostics.CurrentSyncSubStage{SubStage: "Download snapshots"}) - if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix() /*headerChain=*/, cfg.dirs, false, cfg.blobs, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil { + if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.dirs, false /*headerChain=*/, cfg.blobs, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil { return err } - - // It's ok to notify before tx.Commit(), because RPCDaemon does read list of files by gRPC (not by reading from db) if cfg.notifier.Events != nil { cfg.notifier.Events.OnNewSnapshot() } @@ -309,14 +307,16 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R if err := cfg.agg.BuildMissedIndices(ctx, indexWorkers); err != nil { return err } - if cfg.notifier.Events != nil { - cfg.notifier.Events.OnNewSnapshot() - } if casted, ok := tx.(*temporal.Tx); ok { casted.ForceReopenAggCtx() // otherwise next stages will not see just-indexed-files } + // It's ok to notify before tx.Commit(), because RPCDaemon does read list of files by gRPC (not by reading from db) + if cfg.notifier.Events != nil { + cfg.notifier.Events.OnNewSnapshot() + } + frozenBlocks := cfg.blockReader.FrozenBlocks() if s.BlockNumber < frozenBlocks { // allow genesis if err := s.Update(tx, frozenBlocks); err != nil { diff --git a/turbo/snapshotsync/freezeblocks/block_snapshots.go b/turbo/snapshotsync/freezeblocks/block_snapshots.go index aabeb4d88e7..6815bf41e20 100644 --- a/turbo/snapshotsync/freezeblocks/block_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/block_snapshots.go @@ -276,12 +276,13 @@ func (s *DirtySegment) isSubSetOf(j *DirtySegment) bool { } func (s *DirtySegment) reopenSeg(dir string) (err error) { - if s.refcount.Load() == 0 { - s.closeSeg() - s.Decompressor, err = seg.NewDecompressor(filepath.Join(dir, s.FileName())) - if err != nil { - return fmt.Errorf("%w, fileName: %s", err, s.FileName()) - } + if s.refcount.Load() > 0 { + return + } + s.closeSeg() + s.Decompressor, err = seg.NewDecompressor(filepath.Join(dir, s.FileName())) + if err != nil { + return fmt.Errorf("%w, fileName: %s", err, s.FileName()) } return nil } @@ -334,12 +335,7 @@ func (s *DirtySegment) openFiles() []string { } func (s *DirtySegment) reopenIdxIfNeed(dir string, optimistic bool) (err error) { - if len(s.Type().IdxFileNames(s.version, s.from, s.to)) == 0 { - return nil - } - err = s.reopenIdx(dir) - if err != nil { if !errors.Is(err, os.ErrNotExist) { if optimistic { @@ -857,7 +853,7 @@ func (s *RoSnapshots) ReopenFolder() error { s.dirtySegmentsLock.Lock() defer s.dirtySegmentsLock.Unlock() - files, _, err := typedSegments(s.dir, s.segmentsMin.Load(), s.Types(), false) + files, _, err := typedSegments(s.dir, s.Types(), false) if err != nil { return err } @@ -880,7 +876,7 @@ func (s *RoSnapshots) ReopenSegments(types []snaptype.Type, allowGaps bool) erro s.dirtySegmentsLock.Lock() defer s.dirtySegmentsLock.Unlock() - files, _, err := typedSegments(s.dir, s.segmentsMin.Load(), types, allowGaps) + files, _, err := typedSegments(s.dir, types, allowGaps) if err != nil { return err @@ -1302,10 +1298,10 @@ func SegmentsCaplin(dir string, minBlock uint64) (res []snaptype.FileInfo, missi } func Segments(dir string, minBlock uint64) (res []snaptype.FileInfo, missingSnapshots []Range, err error) { - return typedSegments(dir, minBlock, coresnaptype.BlockSnapshotTypes, true) + return typedSegments(dir, coresnaptype.BlockSnapshotTypes, true) } -func typedSegments(dir string, minBlock uint64, types []snaptype.Type, allowGaps bool) (res []snaptype.FileInfo, missingSnapshots []Range, err error) { +func typedSegments(dir string, types []snaptype.Type, allowGaps bool) (res []snaptype.FileInfo, missingSnapshots []Range, err error) { segmentsTypeCheck := func(dir string, in []snaptype.FileInfo) (res []snaptype.FileInfo) { return typeOfSegmentsMustExist(dir, in, types) } diff --git a/turbo/snapshotsync/freezeblocks/bor_snapshots.go b/turbo/snapshotsync/freezeblocks/bor_snapshots.go index fde041e8d7f..5ba8e4d2957 100644 --- a/turbo/snapshotsync/freezeblocks/bor_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/bor_snapshots.go @@ -136,7 +136,7 @@ func (br *BlockRetire) retireBorBlocks(ctx context.Context, minBlockNum uint64, } { - files, _, err := typedSegments(br.borSnapshots().dir, br.borSnapshots().segmentsMin.Load(), borsnaptype.BorSnapshotTypes(), false) + files, _, err := typedSegments(br.borSnapshots().dir, borsnaptype.BorSnapshotTypes(), false) if err != nil { return blocksRetired, err } @@ -239,7 +239,7 @@ func removeBorOverlaps(dir string, active []snaptype.FileInfo, max uint64) { } func (s *BorRoSnapshots) ReopenFolder() error { - files, _, err := typedSegments(s.dir, s.segmentsMin.Load(), borsnaptype.BorSnapshotTypes(), false) + files, _, err := typedSegments(s.dir, borsnaptype.BorSnapshotTypes(), false) if err != nil { return err }