Skip to content

Commit

Permalink
txpool: switch db to durable mode - with fsync outside of pool's glob…
Browse files Browse the repository at this point in the history
…al lock
  • Loading branch information
AskAlexSharov authored and blxdyx committed Sep 13, 2023
1 parent a3b0611 commit 207659a
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
21 changes: 18 additions & 3 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,12 +1439,11 @@ func MainLoop(ctx context.Context, db kv.RwDB, coreDB kv.RoDB, p *TxPool, newTxs
}
}

func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err error) {
defer writeToDBTimer.UpdateDuration(time.Now())
func (p *TxPool) flushNoFsync(ctx context.Context, db kv.RwDB) (written uint64, err error) {
p.lock.Lock()
defer p.lock.Unlock()
//it's important that write db tx is done inside lock, to make last writes visible for all read operations
if err := db.Update(ctx, func(tx kv.RwTx) error {
if err := db.UpdateNosync(ctx, func(tx kv.RwTx) error {
err = p.flushLocked(tx)
if err != nil {
return err
Expand All @@ -1459,6 +1458,22 @@ func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err err
}
return written, nil
}

func (p *TxPool) flush(ctx context.Context, db kv.RwDB) (written uint64, err error) {
defer writeToDBTimer.UpdateDuration(time.Now())
// 1. get global lock on txpool and flush it to db, without fsync (to release lock asap)
// 2. then fsync db without txpool lock
written, err = p.flushNoFsync(ctx, db)
if err != nil {
return 0, err
}

// fsync
if err := db.Update(ctx, func(tx kv.RwTx) error { return nil }); err != nil {
return 0, err
}
return written, nil
}
func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
for i, mt := range p.deletedTxs {
id := mt.Tx.SenderID
Expand Down
15 changes: 6 additions & 9 deletions txpool/txpooluitl/all_components.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2021 Erigon contributors
Copyright 2021 The Erigon contributors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -104,23 +104,20 @@ func AllComponents(ctx context.Context, cfg txpoolcfg.Config, cache kvcache.Cach
sentryClients []direct.SentryClient, stateChangesClient txpool.StateChangesClient, logger log.Logger) (kv.RwDB, *txpool.TxPool, *txpool.Fetch, *txpool.Send, *txpool.GrpcServer, error) {
opts := mdbx.NewMDBX(log.New()).Label(kv.TxPoolDB).Path(cfg.DBDir).
WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }).
Flags(func(f uint) uint { return f ^ mdbx2.Durable | mdbx2.SafeNoSync }).
SyncPeriod(30 * time.Second)
Flags(func(f uint) uint { return f ^ mdbx2.Durable }).
WriteMergeThreshold(3 * 8192).
PageSize(uint64(16 * datasize.KB)).
GrowthStep(16 * datasize.MB).
MapSize(1 * datasize.TB)

if cfg.MdbxPageSize.Bytes() > 0 {
opts = opts.PageSize(cfg.MdbxPageSize.Bytes())
}

if cfg.MdbxDBSizeLimit > 0 {
opts = opts.MapSize(cfg.MdbxDBSizeLimit)
} else {
opts = opts.MapSize(1 * datasize.GB)

}
if cfg.MdbxGrowthStep > 0 {
opts = opts.GrowthStep(cfg.MdbxGrowthStep)
} else {
opts = opts.GrowthStep(16 * datasize.MB)
}

txPoolDB, err := opts.Open()
Expand Down

0 comments on commit 207659a

Please sign in to comment.