diff --git a/gointerfaces/downloader/downloader.pb.go b/gointerfaces/downloader/downloader.pb.go index 8cef15ac6..773282e31 100644 --- a/gointerfaces/downloader/downloader.pb.go +++ b/gointerfaces/downloader/downloader.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: downloader/downloader.proto package downloader diff --git a/gointerfaces/downloader/downloader_grpc.pb.go b/gointerfaces/downloader/downloader_grpc.pb.go index 8a6a60a7d..831743bbc 100644 --- a/gointerfaces/downloader/downloader_grpc.pb.go +++ b/gointerfaces/downloader/downloader_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: downloader/downloader.proto package downloader diff --git a/gointerfaces/execution/execution.pb.go b/gointerfaces/execution/execution.pb.go index 60dcef584..5c2effd51 100644 --- a/gointerfaces/execution/execution.pb.go +++ b/gointerfaces/execution/execution.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: execution/execution.proto package execution diff --git a/gointerfaces/execution/execution_grpc.pb.go b/gointerfaces/execution/execution_grpc.pb.go index 9faac8573..b3779a0b1 100644 --- a/gointerfaces/execution/execution_grpc.pb.go +++ b/gointerfaces/execution/execution_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: execution/execution.proto package execution diff --git a/gointerfaces/remote/ethbackend.pb.go b/gointerfaces/remote/ethbackend.pb.go index ac0a099c6..118a3f763 100644 --- a/gointerfaces/remote/ethbackend.pb.go +++ b/gointerfaces/remote/ethbackend.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: remote/ethbackend.proto package remote diff --git a/gointerfaces/remote/ethbackend_grpc.pb.go b/gointerfaces/remote/ethbackend_grpc.pb.go index 8e986e082..4a410a32b 100644 --- a/gointerfaces/remote/ethbackend_grpc.pb.go +++ b/gointerfaces/remote/ethbackend_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: remote/ethbackend.proto package remote diff --git a/gointerfaces/remote/kv.pb.go b/gointerfaces/remote/kv.pb.go index 6dd2f965e..b5ac8e64a 100644 --- a/gointerfaces/remote/kv.pb.go +++ b/gointerfaces/remote/kv.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: remote/kv.proto package remote diff --git a/gointerfaces/remote/kv_grpc.pb.go b/gointerfaces/remote/kv_grpc.pb.go index eb32cbf39..d0305cb0f 100644 --- a/gointerfaces/remote/kv_grpc.pb.go +++ b/gointerfaces/remote/kv_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: remote/kv.proto package remote diff --git a/gointerfaces/remote/mocks.go b/gointerfaces/remote/mocks.go index 24e98be04..8300eb434 100644 --- a/gointerfaces/remote/mocks.go +++ b/gointerfaces/remote/mocks.go @@ -650,10 +650,10 @@ var _ KV_StateChangesClient = &KV_StateChangesClientMock{} // RecvFunc: func() (*StateChangeBatch, error) { // panic("mock out the Recv method") // }, -// RecvMsgFunc: func(m interface{}) error { +// RecvMsgFunc: func(m any) error { // panic("mock out the RecvMsg method") // }, -// SendMsgFunc: func(m interface{}) error { +// SendMsgFunc: func(m any) error { // panic("mock out the SendMsg method") // }, // TrailerFunc: func() metadata.MD { @@ -679,10 +679,10 @@ type KV_StateChangesClientMock struct { RecvFunc func() (*StateChangeBatch, error) // RecvMsgFunc mocks the RecvMsg method. - RecvMsgFunc func(m interface{}) error + RecvMsgFunc func(m any) error // SendMsgFunc mocks the SendMsg method. - SendMsgFunc func(m interface{}) error + SendMsgFunc func(m any) error // TrailerFunc mocks the Trailer method. TrailerFunc func() metadata.MD @@ -704,12 +704,12 @@ type KV_StateChangesClientMock struct { // RecvMsg holds details about calls to the RecvMsg method. RecvMsg []struct { // M is the m argument value. - M interface{} + M any } // SendMsg holds details about calls to the SendMsg method. SendMsg []struct { // M is the m argument value. - M interface{} + M any } // Trailer holds details about calls to the Trailer method. Trailer []struct { @@ -847,9 +847,9 @@ func (mock *KV_StateChangesClientMock) RecvCalls() []struct { } // RecvMsg calls RecvMsgFunc. -func (mock *KV_StateChangesClientMock) RecvMsg(m interface{}) error { +func (mock *KV_StateChangesClientMock) RecvMsg(m any) error { callInfo := struct { - M interface{} + M any }{ M: m, } @@ -870,10 +870,10 @@ func (mock *KV_StateChangesClientMock) RecvMsg(m interface{}) error { // // len(mockedKV_StateChangesClient.RecvMsgCalls()) func (mock *KV_StateChangesClientMock) RecvMsgCalls() []struct { - M interface{} + M any } { var calls []struct { - M interface{} + M any } mock.lockRecvMsg.RLock() calls = mock.calls.RecvMsg @@ -882,9 +882,9 @@ func (mock *KV_StateChangesClientMock) RecvMsgCalls() []struct { } // SendMsg calls SendMsgFunc. -func (mock *KV_StateChangesClientMock) SendMsg(m interface{}) error { +func (mock *KV_StateChangesClientMock) SendMsg(m any) error { callInfo := struct { - M interface{} + M any }{ M: m, } @@ -905,10 +905,10 @@ func (mock *KV_StateChangesClientMock) SendMsg(m interface{}) error { // // len(mockedKV_StateChangesClient.SendMsgCalls()) func (mock *KV_StateChangesClientMock) SendMsgCalls() []struct { - M interface{} + M any } { var calls []struct { - M interface{} + M any } mock.lockSendMsg.RLock() calls = mock.calls.SendMsg diff --git a/gointerfaces/sentinel/sentinel.pb.go b/gointerfaces/sentinel/sentinel.pb.go index 0e8be2e06..608597e7f 100644 --- a/gointerfaces/sentinel/sentinel.pb.go +++ b/gointerfaces/sentinel/sentinel.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: p2psentinel/sentinel.proto package sentinel diff --git a/gointerfaces/sentinel/sentinel_grpc.pb.go b/gointerfaces/sentinel/sentinel_grpc.pb.go index 13052e192..a62786b60 100644 --- a/gointerfaces/sentinel/sentinel_grpc.pb.go +++ b/gointerfaces/sentinel/sentinel_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: p2psentinel/sentinel.proto package sentinel diff --git a/gointerfaces/sentry/sentry.pb.go b/gointerfaces/sentry/sentry.pb.go index 0e43453fd..87710f442 100644 --- a/gointerfaces/sentry/sentry.pb.go +++ b/gointerfaces/sentry/sentry.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: p2psentry/sentry.proto package sentry diff --git a/gointerfaces/sentry/sentry_grpc.pb.go b/gointerfaces/sentry/sentry_grpc.pb.go index 7802cf4fd..1a9d1959b 100644 --- a/gointerfaces/sentry/sentry_grpc.pb.go +++ b/gointerfaces/sentry/sentry_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: p2psentry/sentry.proto package sentry diff --git a/gointerfaces/txpool/mining.pb.go b/gointerfaces/txpool/mining.pb.go index deacde3e6..20b3e0bd7 100644 --- a/gointerfaces/txpool/mining.pb.go +++ b/gointerfaces/txpool/mining.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: txpool/mining.proto package txpool diff --git a/gointerfaces/txpool/mining_grpc.pb.go b/gointerfaces/txpool/mining_grpc.pb.go index c2054b4e1..d0465eb5f 100644 --- a/gointerfaces/txpool/mining_grpc.pb.go +++ b/gointerfaces/txpool/mining_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: txpool/mining.proto package txpool diff --git a/gointerfaces/txpool/txpool.pb.go b/gointerfaces/txpool/txpool.pb.go index 65b061e9a..52b9b02de 100644 --- a/gointerfaces/txpool/txpool.pb.go +++ b/gointerfaces/txpool/txpool.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: txpool/txpool.proto package txpool diff --git a/gointerfaces/txpool/txpool_grpc.pb.go b/gointerfaces/txpool/txpool_grpc.pb.go index a1ae12fc0..d8c6da0d0 100644 --- a/gointerfaces/txpool/txpool_grpc.pb.go +++ b/gointerfaces/txpool/txpool_grpc.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.3 +// - protoc v4.24.2 // source: txpool/txpool.proto package txpool diff --git a/gointerfaces/types/types.pb.go b/gointerfaces/types/types.pb.go index 088bbfb73..adae72de7 100644 --- a/gointerfaces/types/types.pb.go +++ b/gointerfaces/types/types.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.23.3 +// protoc-gen-go v1.31.0 +// protoc v4.24.2 // source: types/types.proto package types diff --git a/txpool/fetch.go b/txpool/fetch.go index a9b24a6db..ecc9797b7 100644 --- a/txpool/fetch.go +++ b/txpool/fetch.go @@ -222,20 +222,15 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes if err != nil { return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err) } - var hashbuf [32]byte - var unknownHashes types2.Hashes - for i := 0; i < hashCount; i++ { - _, pos, err = types2.ParseHash(req.Data, pos, hashbuf[:0]) - if err != nil { - return fmt.Errorf("parsing NewPooledTransactionHashes: %w", err) - } - known, err := f.pool.IdHashKnown(tx, hashbuf[:]) - if err != nil { + hashes := make([]byte, 32*hashCount) + for i := 0; i < len(hashes); i += 32 { + if _, pos, err = types2.ParseHash(req.Data, pos, hashes[i:]); err != nil { return err } - if !known { - unknownHashes = append(unknownHashes, hashbuf[:]...) - } + } + unknownHashes, err := f.pool.FilterKnownIdHashes(tx, hashes) + if err != nil { + return err } if len(unknownHashes) > 0 { var encodedRequest []byte @@ -256,15 +251,9 @@ func (f *Fetch) handleInboundMessage(ctx context.Context, req *sentry.InboundMes if err != nil { return fmt.Errorf("parsing NewPooledTransactionHashes88: %w", err) } - var unknownHashes types2.Hashes - for i := 0; i < len(hashes); i += 32 { - known, err := f.pool.IdHashKnown(tx, hashes[i:i+32]) - if err != nil { - return err - } - if !known { - unknownHashes = append(unknownHashes, hashes[i:i+32]...) - } + unknownHashes, err := f.pool.FilterKnownIdHashes(tx, hashes) + if err != nil { + return err } if len(unknownHashes) > 0 { @@ -482,7 +471,10 @@ func (f *Fetch) handleStateChanges(ctx context.Context, client StateChangesClien if err = f.threadSafeParseStateChangeTxn(func(parseContext *types2.TxParseContext) error { _, err = parseContext.ParseTransaction(change.Txs[i], 0, unwindTxs.Txs[i], unwindTxs.Senders.At(i), false /* hasEnvelope */, false /* wrappedWithBlobs */, nil) if unwindTxs.Txs[i].Type == types2.BlobTxType { - knownBlobTxn := f.pool.GetKnownBlobTxn(tx, unwindTxs.Txs[i].IDHash[:]) + knownBlobTxn, err := f.pool.GetKnownBlobTxn(tx, unwindTxs.Txs[i].IDHash[:]) + if err != nil { + return err + } if knownBlobTxn != nil { unwindTxs.Txs[i] = knownBlobTxn.Tx } diff --git a/txpool/mocks_test.go b/txpool/mocks_test.go index 78140c64c..22e8e8121 100644 --- a/txpool/mocks_test.go +++ b/txpool/mocks_test.go @@ -31,7 +31,10 @@ var _ Pool = &PoolMock{} // AddRemoteTxsFunc: func(ctx context.Context, newTxs types2.TxSlots) { // panic("mock out the AddRemoteTxs method") // }, -// GetKnownBlobTxnFunc: func(tx kv.Tx, hash []byte) *metaTx { +// FilterKnownIdHashesFunc: func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) { +// panic("mock out the FilterKnownIdHashes method") +// }, +// GetKnownBlobTxnFunc: func(tx kv.Tx, hash []byte) (*metaTx, error) { // panic("mock out the GetKnownBlobTxn method") // }, // GetRlpFunc: func(tx kv.Tx, hash []byte) ([]byte, error) { @@ -65,8 +68,11 @@ type PoolMock struct { // AddRemoteTxsFunc mocks the AddRemoteTxs method. AddRemoteTxsFunc func(ctx context.Context, newTxs types2.TxSlots) + // FilterKnownIdHashesFunc mocks the FilterKnownIdHashes method. + FilterKnownIdHashesFunc func(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) + // GetKnownBlobTxnFunc mocks the GetKnownBlobTxn method. - GetKnownBlobTxnFunc func(tx kv.Tx, hash []byte) *metaTx + GetKnownBlobTxnFunc func(tx kv.Tx, hash []byte) (*metaTx, error) // GetRlpFunc mocks the GetRlp method. GetRlpFunc func(tx kv.Tx, hash []byte) ([]byte, error) @@ -106,6 +112,13 @@ type PoolMock struct { // NewTxs is the newTxs argument value. NewTxs types2.TxSlots } + // FilterKnownIdHashes holds details about calls to the FilterKnownIdHashes method. + FilterKnownIdHashes []struct { + // Tx is the tx argument value. + Tx kv.Tx + // Hashes is the hashes argument value. + Hashes types2.Hashes + } // GetKnownBlobTxn holds details about calls to the GetKnownBlobTxn method. GetKnownBlobTxn []struct { // Tx is the tx argument value. @@ -152,6 +165,7 @@ type PoolMock struct { lockAddLocalTxs sync.RWMutex lockAddNewGoodPeer sync.RWMutex lockAddRemoteTxs sync.RWMutex + lockFilterKnownIdHashes sync.RWMutex lockGetKnownBlobTxn sync.RWMutex lockGetRlp sync.RWMutex lockIdHashKnown sync.RWMutex @@ -272,8 +286,48 @@ func (mock *PoolMock) AddRemoteTxsCalls() []struct { return calls } +// FilterKnownIdHashes calls FilterKnownIdHashesFunc. +func (mock *PoolMock) FilterKnownIdHashes(tx kv.Tx, hashes types2.Hashes) (types2.Hashes, error) { + callInfo := struct { + Tx kv.Tx + Hashes types2.Hashes + }{ + Tx: tx, + Hashes: hashes, + } + mock.lockFilterKnownIdHashes.Lock() + mock.calls.FilterKnownIdHashes = append(mock.calls.FilterKnownIdHashes, callInfo) + mock.lockFilterKnownIdHashes.Unlock() + if mock.FilterKnownIdHashesFunc == nil { + var ( + unknownHashesOut types2.Hashes + errOut error + ) + return unknownHashesOut, errOut + } + return mock.FilterKnownIdHashesFunc(tx, hashes) +} + +// FilterKnownIdHashesCalls gets all the calls that were made to FilterKnownIdHashes. +// Check the length with: +// +// len(mockedPool.FilterKnownIdHashesCalls()) +func (mock *PoolMock) FilterKnownIdHashesCalls() []struct { + Tx kv.Tx + Hashes types2.Hashes +} { + var calls []struct { + Tx kv.Tx + Hashes types2.Hashes + } + mock.lockFilterKnownIdHashes.RLock() + calls = mock.calls.FilterKnownIdHashes + mock.lockFilterKnownIdHashes.RUnlock() + return calls +} + // GetKnownBlobTxn calls GetKnownBlobTxnFunc. -func (mock *PoolMock) GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx { +func (mock *PoolMock) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) { callInfo := struct { Tx kv.Tx Hash []byte @@ -287,8 +341,9 @@ func (mock *PoolMock) GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx { if mock.GetKnownBlobTxnFunc == nil { var ( metaTxMoqParamOut *metaTx + errOut error ) - return metaTxMoqParamOut + return metaTxMoqParamOut, errOut } return mock.GetKnownBlobTxnFunc(tx, hash) } diff --git a/txpool/pool.go b/txpool/pool.go index fe59e96ff..1c39b28bb 100644 --- a/txpool/pool.go +++ b/txpool/pool.go @@ -86,9 +86,10 @@ type Pool interface { OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error // IdHashKnown check whether transaction with given Id hash is known to the pool IdHashKnown(tx kv.Tx, hash []byte) (bool, error) + FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error) Started() bool GetRlp(tx kv.Tx, hash []byte) ([]byte, error) - GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx + GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) AddNewGoodPeer(peerID types.PeerID) } @@ -292,40 +293,40 @@ func New(newTxs chan types.Announcements, coreDB kv.RoDB, cfg txpoolcfg.Config, } func (p *TxPool) OnNewBlock(ctx context.Context, stateChanges *remote.StateChangeBatch, unwindTxs, minedTxs types.TxSlots, tx kv.Tx) error { + if err := minedTxs.Valid(); err != nil { + return err + } + defer newBlockTimer.UpdateDuration(time.Now()) //t := time.Now() - cache := p.cache() + coreDB, cache := p.coreDBWithCache() cache.OnNewBlock(stateChanges) - coreTx, err := p.coreDB().BeginRo(ctx) + coreTx, err := coreDB.BeginRo(ctx) if err != nil { return err } defer coreTx.Rollback() - p.lock.Lock() - defer p.lock.Unlock() - p.lastSeenBlock.Store(stateChanges.ChangeBatch[len(stateChanges.ChangeBatch)-1].BlockHeight) if !p.started.Load() { - if err := p.fromDB(ctx, tx, coreTx); err != nil { + if err := p.fromDBWithLock(ctx, tx, coreTx); err != nil { return fmt.Errorf("OnNewBlock: loading txs from DB: %w", err) } } - cacheView, err := cache.View(ctx, coreTx) if err != nil { return err } + + p.lock.Lock() + defer p.lock.Unlock() + if assert.Enable { if _, err := kvcache.AssertCheckValues(ctx, coreTx, cache); err != nil { p.logger.Error("AssertCheckValues", "err", err, "stack", stack.Trace().String()) } } - - if err := minedTxs.Valid(); err != nil { - return err - } baseFee := stateChanges.PendingBlockBaseFee pendingBaseFee, baseFeeChanged := p.setBaseFee(baseFee) @@ -405,9 +406,9 @@ func (p *TxPool) processRemoteTxs(ctx context.Context) error { return fmt.Errorf("txpool not started yet") } - cache := p.cache() defer processBatchTxsTimer.UpdateDuration(time.Now()) - coreTx, err := p.coreDB().BeginRo(ctx) + coreDB, cache := p.coreDBWithCache() + coreTx, err := coreDB.BeginRo(ctx) if err != nil { return err } @@ -517,59 +518,87 @@ func (p *TxPool) AppendAllAnnouncements(types []byte, sizes []uint32, hashes []b types, sizes, hashes = p.AppendRemoteAnnouncements(types, sizes, hashes) return types, sizes, hashes } -func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) { - p.lock.Lock() - defer p.lock.Unlock() - if _, ok := p.discardReasonsLRU.Get(string(hash)); ok { +func (p *TxPool) idHashKnown(tx kv.Tx, hash []byte, hashS string) (bool, error) { + if _, ok := p.unprocessedRemoteByHash[hashS]; ok { return true, nil } - if _, ok := p.unprocessedRemoteByHash[string(hash)]; ok { + if _, ok := p.discardReasonsLRU.Get(hashS); ok { return true, nil } - if _, ok := p.byHash[string(hash)]; ok { + if _, ok := p.byHash[hashS]; ok { return true, nil } - if _, ok := p.minedBlobTxsByHash[string(hash)]; ok { + if _, ok := p.minedBlobTxsByHash[hashS]; ok { return true, nil } return tx.Has(kv.PoolTransaction, hash) } +func (p *TxPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) { + hashS := string(hash) + p.lock.Lock() + defer p.lock.Unlock() + return p.idHashKnown(tx, hash, hashS) +} +func (p *TxPool) FilterKnownIdHashes(tx kv.Tx, hashes types.Hashes) (unknownHashes types.Hashes, err error) { + p.lock.Lock() + defer p.lock.Unlock() + for i := 0; i < len(hashes); i += 32 { + known, err := p.idHashKnown(tx, hashes[i:i+32], string(hashes[i:i+32])) + if err != nil { + return unknownHashes, err + } + if !known { + unknownHashes = append(unknownHashes, hashes[i:i+32]...) + } + } + return unknownHashes, err +} + +func (p *TxPool) getUnprocessedTxn(hashS string) (*types.TxSlot, bool) { + if i, ok := p.unprocessedRemoteByHash[hashS]; ok { + return p.unprocessedRemoteTxs.Txs[i], true + } + return nil, false +} -func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) *metaTx { +func (p *TxPool) GetKnownBlobTxn(tx kv.Tx, hash []byte) (*metaTx, error) { + hashS := string(hash) p.lock.Lock() defer p.lock.Unlock() - if mt, ok := p.minedBlobTxsByHash[string(hash)]; ok { - return mt + if mt, ok := p.minedBlobTxsByHash[hashS]; ok { + return mt, nil } - if i, ok := p.unprocessedRemoteByHash[string(hash)]; ok { - return newMetaTx(p.unprocessedRemoteTxs.Txs[i], false, 0) + if txn, ok := p.getUnprocessedTxn(hashS); ok { + return newMetaTx(txn, false, 0), nil } - if mt, ok := p.byHash[string(hash)]; ok { - return mt + if mt, ok := p.byHash[hashS]; ok { + return mt, nil } - if has, _ := tx.Has(kv.PoolTransaction, hash); has { - txn, _ := tx.GetOne(kv.PoolTransaction, hash) - parseCtx := types.NewTxParseContext(p.chainID) - parseCtx.WithSender(false) - txSlot := &types.TxSlot{} - parseCtx.ParseTransaction(txn, 0, txSlot, nil, false, true, nil) - return newMetaTx(txSlot, false, 0) + has, err := tx.Has(kv.PoolTransaction, hash) + if err != nil { + return nil, err } - return nil + if !has { + return nil, nil + } + txn, _ := tx.GetOne(kv.PoolTransaction, hash) + parseCtx := types.NewTxParseContext(p.chainID) + parseCtx.WithSender(false) + txSlot := &types.TxSlot{} + parseCtx.ParseTransaction(txn, 0, txSlot, nil, false, true, nil) + return newMetaTx(txSlot, false, 0), nil } func (p *TxPool) IsLocal(idHash []byte) bool { + hashS := string(idHash) p.lock.Lock() defer p.lock.Unlock() - return p.isLocalLRU.Contains(string(idHash)) + return p.isLocalLRU.Contains(hashS) } func (p *TxPool) AddNewGoodPeer(peerID types.PeerID) { p.recentlyConnectedPeers.AddPeer(peerID) } func (p *TxPool) Started() bool { return p.started.Load() } func (p *TxPool) best(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) { - p.lock.Lock() - defer p.lock.Unlock() - // First wait for the corresponding block to arrive if p.lastSeenBlock.Load() < onTopOf { return false, 0, nil // Too early @@ -651,11 +680,15 @@ func (p *TxPool) ResetYieldedStatus() { } func (p *TxPool) YieldBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64, toSkip mapset.Set[[32]byte]) (bool, int, error) { + p.lock.Lock() + defer p.lock.Unlock() return p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, toSkip) } func (p *TxPool) PeekBest(n uint16, txs *types.TxsRlp, tx kv.Tx, onTopOf, availableGas, availableBlobGas uint64) (bool, error) { set := mapset.NewThreadUnsafeSet[[32]byte]() + p.lock.Lock() + defer p.lock.Unlock() onTime, _, err := p.best(n, txs, tx, onTopOf, availableGas, availableBlobGas, set) return onTime, err } @@ -670,11 +703,12 @@ func (p *TxPool) AddRemoteTxs(_ context.Context, newTxs types.TxSlots) { p.lock.Lock() defer p.lock.Unlock() for i, txn := range newTxs.Txs { - _, ok := p.unprocessedRemoteByHash[string(txn.IDHash[:])] + hashS := string(txn.IDHash[:]) + _, ok := p.unprocessedRemoteByHash[hashS] if ok { continue } - p.unprocessedRemoteByHash[string(txn.IDHash[:])] = len(p.unprocessedRemoteTxs.Txs) + p.unprocessedRemoteByHash[hashS] = len(p.unprocessedRemoteTxs.Txs) p.unprocessedRemoteTxs.Append(txn, newTxs.Senders.At(i), false) } } @@ -971,13 +1005,14 @@ func fillDiscardReasons(reasons []txpoolcfg.DiscardReason, newTxs types.TxSlots, } func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, tx kv.Tx) ([]txpoolcfg.DiscardReason, error) { - coreTx, err := p.coreDB().BeginRo(ctx) + coreDb, cache := p.coreDBWithCache() + coreTx, err := coreDb.BeginRo(ctx) if err != nil { return nil, err } defer coreTx.Rollback() - cacheView, err := p.cache().View(ctx, coreTx) + cacheView, err := cache.View(ctx, coreTx) if err != nil { return nil, err } @@ -1035,19 +1070,11 @@ func (p *TxPool) AddLocalTxs(ctx context.Context, newTransactions types.TxSlots, } return reasons, nil } - -func (p *TxPool) coreDB() kv.RoDB { - p.lock.Lock() - defer p.lock.Unlock() - return p._chainDB -} - -func (p *TxPool) cache() kvcache.Cache { +func (p *TxPool) coreDBWithCache() (kv.RoDB, kvcache.Cache) { p.lock.Lock() defer p.lock.Unlock() - return p._stateCache + return p._chainDB, p._stateCache } - func addTxs(blockNum uint64, cacheView kvcache.CacheView, senders *sendersBatch, newTxs types.TxSlots, pendingBaseFee, blockGasLimit uint64, pending *PendingPool, baseFee, queued *SubPool, @@ -1244,11 +1271,12 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo // Remove from mined cache in case this is coming from unwind txs // and to ensure not double adding into the memory - if _, ok := p.minedBlobTxsByHash[string(mt.Tx.IDHash[:])]; ok { - p.deleteMinedBlobTxn(string(mt.Tx.IDHash[:])) + hashStr := string(mt.Tx.IDHash[:]) + if _, ok := p.minedBlobTxsByHash[hashStr]; ok { + p.deleteMinedBlobTxn(hashStr) } - p.byHash[string(mt.Tx.IDHash[:])] = mt + p.byHash[hashStr] = mt if replaced := p.all.replaceOrInsert(mt); replaced != nil { if assert.Enable { @@ -1257,7 +1285,7 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo } if mt.subPool&IsLocal != 0 { - p.isLocalLRU.Add(string(mt.Tx.IDHash[:]), struct{}{}) + p.isLocalLRU.Add(hashStr, struct{}{}) } // All transactions are first added to the queued pool and then immediately promoted from there if required p.queued.Add(mt, p.logger) @@ -1267,10 +1295,11 @@ func (p *TxPool) addLocked(mt *metaTx, announcements *types.Announcements) txpoo // dropping transaction from all sub-structures and from db // Important: don't call it while iterating by all func (p *TxPool) discardLocked(mt *metaTx, reason txpoolcfg.DiscardReason) { - delete(p.byHash, string(mt.Tx.IDHash[:])) + hashStr := string(mt.Tx.IDHash[:]) + delete(p.byHash, hashStr) p.deletedTxs = append(p.deletedTxs, mt) p.all.delete(mt) - p.discardReasonsLRU.Add(string(mt.Tx.IDHash[:]), reason) + p.discardReasonsLRU.Add(hashStr, reason) } // Cache recently mined blobs in anticipation of reorg, delete finalized ones @@ -1828,6 +1857,11 @@ func (p *TxPool) flushLocked(tx kv.RwTx) (err error) { return nil } +func (p *TxPool) fromDBWithLock(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { + p.lock.Lock() + defer p.lock.Unlock() + return p.fromDB(ctx, tx, coreTx) +} func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { if p.lastSeenBlock.Load() == 0 { lastSeenBlock, err := LastSeenBlock(tx)