Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

Commit

Permalink
txpool: switch db to durable mode - with fsync outside of pool's glob…
Browse files Browse the repository at this point in the history
…al lock (#1109)
  • Loading branch information
AskAlexSharov authored Sep 6, 2023
1 parent f30a800 commit 31687f4
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
21 changes: 18 additions & 3 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,12 +1589,11 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
}
}

func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err error) {
defer writeToDBTimer.UpdateDuration(time.Now())
func (p *TxPool) flushNoFsync(ctx context.Context, db kv.RwDB) (written uint64, err error) {
p.lock.Lock()
defer p.lock.Unlock()
//it's important that write db tx is done inside lock, to make last writes visible for all read operations
if err := db.Update(ctx, func(tx kv.RwTx) error {
if err := db.UpdateNosync(ctx, func(tx kv.RwTx) error {
err = p.flushLocked(tx)
if err != nil {
return err
Expand All @@ -1609,6 +1608,22 @@ func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err err
}
return written, nil
}

func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err error) {
defer writeToDBTimer.UpdateDuration(time.Now())
// 1. get global lock on txpool and flush it to db, without fsync (to release lock asap)
// 2. then fsync db without txpool lock
written, err = p.flushNoFsync(ctx, db)
if err != nil {
return 0, err
}

// fsync
if err := db.Update(ctx, func(tx kv.RwTx) error { return nil }); err != nil {
return 0, err
}
return written, nil
}
func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
for i, mt := range p.deletedTxs {
id := mt.Tx.SenderID
Expand Down
13 changes: 5 additions & 8 deletions txpool/txpooluitl/all_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,20 @@ func AllComponents(ctx context.Context, cfg txpoolcfg.Config, cache kvcache.Cach
sentryClients []direct.SentryClient, stateChangesClient txpool.StateChangesClient, logger log.Logger) (kv.RwDB, *txpool.TxPool, *txpool.Fetch, *txpool.Send, *txpool.GrpcServer, error) {
opts := mdbx.NewMDBX(log.New()).Label(kv.TxPoolDB).Path(cfg.DBDir).
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).
Flags(func(f uint) uint { return f ^ mdbx2.Durable | mdbx2.SafeNoSync }).
SyncPeriod(30 * time.Second)
Flags(func(f uint) uint { return f ^ mdbx2.Durable }).
WriteMergeThreshold(3 * 8192).
PageSize(uint64(16 * datasize.KB)).
GrowthStep(16 * datasize.MB).
MapSize(1 * datasize.TB)

if cfg.MdbxPageSize.Bytes() > 0 {
opts = opts.PageSize(cfg.MdbxPageSize.Bytes())
}

if cfg.MdbxDBSizeLimit > 0 {
opts = opts.MapSize(cfg.MdbxDBSizeLimit)
} else {
opts = opts.MapSize(1 * datasize.TB)

}
if cfg.MdbxGrowthStep > 0 {
opts = opts.GrowthStep(cfg.MdbxGrowthStep)
} else {
opts = opts.GrowthStep(16 * datasize.MB)
}

txPoolDB, err := opts.Open()
Expand Down

0 comments on commit 31687f4

Please sign in to comment.