Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
AskAlexSharov committed Sep 18, 2023
1 parent 60db705 commit e77827a
Showing 1 changed file with 27 additions and 28 deletions.
55 changes: 27 additions & 28 deletions txpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,40 +293,40 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config,
}

func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error {
if err := minedTxs.Valid(); err != nil {
return err
}

defer newBlockTimer.UpdateDuration(time.Now())
//t := time.Now()

cache := p.cache()
coreDB, cache := p.coreDBWithCache()
cache.OnNewBlock(stateChanges)
coreTx, err := p.coreDB().BeginRo(ctx)
coreTx, err := coreDB.BeginRo(ctx)
if err != nil {
return err
}
defer coreTx.Rollback()

p.lock.Lock()
defer p.lock.Unlock()

p.lastSeenBlock.Store(stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight)
if !p.started.Load() {
if err := p.fromDB(ctx, tx, coreTx); err != nil {
if err := p.fromDBWithLock(ctx, tx, coreTx); err != nil {
return fmt.Errorf("OnNewBlock: loading txs from DB: %w", err)
}
}

cacheView, err := cache.View(ctx, coreTx)
if err != nil {
return err
}

p.lock.Lock()
defer p.lock.Unlock()

if assert.Enable {
if _, err := kvcache.AssertCheckValues(ctx, coreTx, cache); err != nil {
p.logger.Error("AssertCheckValues", "err", err, "stack", stack.Trace().String())
}
}

if err := minedTxs.Valid(); err != nil {
return err
}
baseFee := stateChanges.PendingBlockBaseFee

pendingBaseFee, baseFeeChanged := p.setBaseFee(baseFee)
Expand Down Expand Up @@ -406,9 +406,9 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error {
return fmt.Errorf("txpool not started yet")
}

cache := p.cache()
defer processBatchTxsTimer.UpdateDuration(time.Now())
coreTx, err := p.coreDB().BeginRo(ctx)
coreDB, cache := p.coreDBWithCache()
coreTx, err := coreDB.BeginRo(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -599,9 +599,6 @@ func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.
func (p *TxPool) Started() bool { return p.started.Load() }

func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
p.lock.Lock()
defer p.lock.Unlock()

// First wait for the corresponding block to arrive
if p.lastSeenBlock.Load() < onTopOf {
return false, 0, nil // Too early
Expand Down Expand Up @@ -683,11 +680,15 @@ func (p *TxPool) ResetYieldedStatus() {
}

func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) {
p.lock.Lock()
defer p.lock.Unlock()
return p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, toSkip)
}

func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64) (bool, error) {
set := mapset.NewThreadUnsafeSet[[32]byte]()
p.lock.Lock()
defer p.lock.Unlock()
onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, set)
return onTime, err
}
Expand Down Expand Up @@ -1004,13 +1005,14 @@ func fillDiscardReasons(reasons []txpoolcfg.DiscardReason, newTxs types.TxSlots,
}

func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) {
coreTx, err := p.coreDB().BeginRo(ctx)
coreDb, cache := p.coreDBWithCache()
coreTx, err := coreDb.BeginRo(ctx)
if err != nil {
return nil, err
}
defer coreTx.Rollback()

cacheView, err := p.cache().View(ctx, coreTx)
cacheView, err := cache.View(ctx, coreTx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1068,19 +1070,11 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots,
}
return reasons, nil
}

func (p *TxPool) coreDB() kv.RoDB {
func (p *TxPool) coreDBWithCache() (kv.RoDB, kvcache.Cache) {
p.lock.Lock()
defer p.lock.Unlock()
return p._chainDB
return p._chainDB, p._stateCache
}

func (p *TxPool) cache() kvcache.Cache {
p.lock.Lock()
defer p.lock.Unlock()
return p._stateCache
}

func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch,
newTxs types.TxSlots, pendingBaseFee, blockGasLimit uint64,
pending *PendingPool, baseFee, queued *SubPool,
Expand Down Expand Up @@ -1863,6 +1857,11 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) {
return nil
}

func (p *TxPool) fromDBWithLock(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
p.lock.Lock()
defer p.lock.Unlock()
return p.fromDB(ctx, tx, coreTx)
}
func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error {
if p.lastSeenBlock.Load() == 0 {
lastSeenBlock, err := LastSeenBlock(tx)
Expand Down

0 comments on commit e77827a

Please sign in to comment.