From e77827a4306662f07a3dbf92e8d98daf419b2fd4 Mon Sep 17 00:00:00 2001 From: "alex.sharov" Date: Mon, 18 Sep 2023 10:20:38 +0700 Subject: [PATCH] save --- txpool/pool.go | 55 +++++++++++++++++++++++++------------------------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/txpool/pool.go b/txpool/pool.go index 520650ff6..1c39b28bb 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -293,40 +293,40 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, } func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error { + if err := minedTxs.Valid(); err != nil { + return err + } + defer newBlockTimer.UpdateDuration(time.Now()) //t := time.Now() - cache := p.cache() + coreDB, cache := p.coreDBWithCache() cache.OnNewBlock(stateChanges) - coreTx, err := p.coreDB().BeginRo(ctx) + coreTx, err := coreDB.BeginRo(ctx) if err != nil { return err } defer coreTx.Rollback() - p.lock.Lock() - defer p.lock.Unlock() - p.lastSeenBlock.Store(stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight) if !p.started.Load() { - if err := p.fromDB(ctx, tx, coreTx); err != nil { + if err := p.fromDBWithLock(ctx, tx, coreTx); err != nil { return fmt.Errorf("OnNewBlock: loading txs from DB: %w", err) } } - cacheView, err := cache.View(ctx, coreTx) if err != nil { return err } + + p.lock.Lock() + defer p.lock.Unlock() + if assert.Enable { if _, err := kvcache.AssertCheckValues(ctx, coreTx, cache); err != nil { p.logger.Error("AssertCheckValues", "err", err, "stack", stack.Trace().String()) } } - - if err := minedTxs.Valid(); err != nil { - return err - } baseFee := stateChanges.PendingBlockBaseFee pendingBaseFee, baseFeeChanged := p.setBaseFee(baseFee) @@ -406,9 +406,9 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error { return fmt.Errorf("txpool not started yet") } - cache := p.cache() defer processBatchTxsTimer.UpdateDuration(time.Now()) - coreTx, err := p.coreDB().BeginRo(ctx) + coreDB, cache := p.coreDBWithCache() + coreTx, err := coreDB.BeginRo(ctx) if err != nil { return err } @@ -599,9 +599,6 @@ func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers. func (p *TxPool) Started() bool { return p.started.Load() } func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) { - p.lock.Lock() - defer p.lock.Unlock() - // First wait for the corresponding block to arrive if p.lastSeenBlock.Load() < onTopOf { return false, 0, nil // Too early @@ -683,11 +680,15 @@ func (p *TxPool) ResetYieldedStatus() { } func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) { + p.lock.Lock() + defer p.lock.Unlock() return p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, toSkip) } func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64) (bool, error) { set := mapset.NewThreadUnsafeSet[[32]byte]() + p.lock.Lock() + defer p.lock.Unlock() onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, set) return onTime, err } @@ -1004,13 +1005,14 @@ func fillDiscardReasons(reasons []txpoolcfg.DiscardReason, newTxs types.TxSlots, } func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) { - coreTx, err := p.coreDB().BeginRo(ctx) + coreDb, cache := p.coreDBWithCache() + coreTx, err := coreDb.BeginRo(ctx) if err != nil { return nil, err } defer coreTx.Rollback() - cacheView, err := p.cache().View(ctx, coreTx) + cacheView, err := cache.View(ctx, coreTx) if err != nil { return nil, err } @@ -1068,19 +1070,11 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, } return reasons, nil } - -func (p *TxPool) coreDB() kv.RoDB { +func (p *TxPool) coreDBWithCache() (kv.RoDB, kvcache.Cache) { p.lock.Lock() defer p.lock.Unlock() - return p._chainDB + return p._chainDB, p._stateCache } - -func (p *TxPool) cache() kvcache.Cache { - p.lock.Lock() - defer p.lock.Unlock() - return p._stateCache -} - func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee, blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, @@ -1863,6 +1857,11 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) { return nil } +func (p *TxPool) fromDBWithLock(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { + p.lock.Lock() + defer p.lock.Unlock() + return p.fromDB(ctx, tx, coreTx) +} func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { if p.lastSeenBlock.Load() == 0 { lastSeenBlock, err := LastSeenBlock(tx)