From 31687f4ce91119745b7bac6175ad395ba09dcde3 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Wed, 6 Sep 2023 14:26:36 +0700 Subject: [PATCH] txpool: switch db to durable mode - with fsync outside of pool's global lock (#1109) --- txpool/pool.go | 21 ++++++++++++++++++--- txpool/txpooluitl/all_components.go | 13 +++++-------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index f6c89d5bc..989f86b4b 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -1589,12 +1589,11 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs } } -func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err error) { - defer writeToDBTimer.UpdateDuration(time.Now()) +func (p *TxPool) flushNoFsync(ctx context.Context, db kv.RwDB) (written uint64, err error) { p.lock.Lock() defer p.lock.Unlock() //it's important that write db tx is done inside lock, to make last writes visible for all read operations - if err := db.Update(ctx, func(tx kv.RwTx) error { + if err := db.UpdateNosync(ctx, func(tx kv.RwTx) error { err = p.flushLocked(tx) if err != nil { return err @@ -1609,6 +1608,22 @@ func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err err } return written, nil } + +func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err error) { + defer writeToDBTimer.UpdateDuration(time.Now()) + // 1. get global lock on txpool and flush it to db, without fsync (to release lock asap) + // 2. then fsync db without txpool lock + written, err = p.flushNoFsync(ctx, db) + if err != nil { + return 0, err + } + + // fsync + if err := db.Update(ctx, func(tx kv.RwTx) error { return nil }); err != nil { + return 0, err + } + return written, nil +} func (p *TxPool) flushLocked(tx kv.RwTx) (err error) { for i, mt := range p.deletedTxs { id := mt.Tx.SenderID diff --git a/txpool/txpooluitl/all_components.go b/txpool/txpooluitl/all_components.go index fb3d0d666..e831841d8 100644 --- a/txpool/txpooluitl/all_components.go +++ b/txpool/txpooluitl/all_components.go @@ -104,23 +104,20 @@ func AllComponents(ctx context.Context, cfg txpoolcfg.Config, cache kvcache.Cach sentryClients []direct.SentryClient, stateChangesClient txpool.StateChangesClient, logger log.Logger) (kv.RwDB, *txpool.TxPool, *txpool.Fetch, *txpool.Send, *txpool.GrpcServer, error) { opts := mdbx.NewMDBX(log.New()).Label(kv.TxPoolDB).Path(cfg.DBDir). WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }). - Flags(func(f uint) uint { return f ^ mdbx2.Durable | mdbx2.SafeNoSync }). - SyncPeriod(30 * time.Second) + Flags(func(f uint) uint { return f ^ mdbx2.Durable }). + WriteMergeThreshold(3 * 8192). + PageSize(uint64(16 * datasize.KB)). + GrowthStep(16 * datasize.MB). + MapSize(1 * datasize.TB) if cfg.MdbxPageSize.Bytes() > 0 { opts = opts.PageSize(cfg.MdbxPageSize.Bytes()) } - if cfg.MdbxDBSizeLimit > 0 { opts = opts.MapSize(cfg.MdbxDBSizeLimit) - } else { - opts = opts.MapSize(1 * datasize.TB) - } if cfg.MdbxGrowthStep > 0 { opts = opts.GrowthStep(cfg.MdbxGrowthStep) - } else { - opts = opts.GrowthStep(16 * datasize.MB) } txPoolDB, err := opts.Open()