Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

txpool: switch db to durable mode - with fsync outside of pool's global lock #1109

Merged
merged 1 commit into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1589,12 +1589,11 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
}
}

func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err error) {
defer writeToDBTimer.UpdateDuration(time.Now())
func (p *TxPool) flushNoFsync(ctx context.Context, db kv.RwDB) (written uint64, err error) {
p.lock.Lock()
defer p.lock.Unlock()
//it's important that write db tx is done inside lock, to make last writes visible for all read operations
if err := db.Update(ctx, func(tx kv.RwTx) error {
if err := db.UpdateNosync(ctx, func(tx kv.RwTx) error {
err = p.flushLocked(tx)
if err != nil {
return err
Expand All @@ -1609,6 +1608,22 @@ func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err err
}
return written, nil
}

func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err error) {
defer writeToDBTimer.UpdateDuration(time.Now())
// 1. get global lock on txpool and flush it to db, without fsync (to release lock asap)
// 2. then fsync db without txpool lock
written, err = p.flushNoFsync(ctx, db)
if err != nil {
return 0, err
}

// fsync
if err := db.Update(ctx, func(tx kv.RwTx) error { return nil }); err != nil {
return 0, err
}
return written, nil
}
func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
for i, mt := range p.deletedTxs {
id := mt.Tx.SenderID
Expand Down
13 changes: 5 additions & 8 deletions txpool/txpooluitl/all_components.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,23 +104,20 @@ func AllComponents(ctx context.Context, cfg txpoolcfg.Config, cache kvcache.Cach
sentryClients []direct.SentryClient, stateChangesClient txpool.StateChangesClient, logger log.Logger) (kv.RwDB, *txpool.TxPool, *txpool.Fetch, *txpool.Send, *txpool.GrpcServer, error) {
opts := mdbx.NewMDBX(log.New()).Label(kv.TxPoolDB).Path(cfg.DBDir).
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).
Flags(func(f uint) uint { return f ^ mdbx2.Durable | mdbx2.SafeNoSync }).
SyncPeriod(30 * time.Second)
Flags(func(f uint) uint { return f ^ mdbx2.Durable }).
WriteMergeThreshold(3 * 8192).
PageSize(uint64(16 * datasize.KB)).
GrowthStep(16 * datasize.MB).
MapSize(1 * datasize.TB)

if cfg.MdbxPageSize.Bytes() > 0 {
opts = opts.PageSize(cfg.MdbxPageSize.Bytes())
}

if cfg.MdbxDBSizeLimit > 0 {
opts = opts.MapSize(cfg.MdbxDBSizeLimit)
} else {
opts = opts.MapSize(1 * datasize.TB)

}
if cfg.MdbxGrowthStep > 0 {
opts = opts.GrowthStep(cfg.MdbxGrowthStep)
} else {
opts = opts.GrowthStep(16 * datasize.MB)
}

txPoolDB, err := opts.Open()
Expand Down
Loading