Skip to content

Commit

Permalink
Limit the number of transactions in pending set (#8777)
Browse files Browse the repository at this point in the history
* Unordered iterator.

* Use unordered and limited set if full not required.

* Split timeout work into smaller timers.

* Avoid collecting all pending transactions when mining

* Remove println.

* Use priority ordering in eth-filter.

* Fix ethcore-miner tests and tx propagation.

* Review grumbles addressed.

* Add test for unordered not populating the cache.

* Fix ethcore tests.

* Fix light tests.

* Fix ethcore-sync tests.

* Fix RPC tests.
  • Loading branch information
tomusdrw authored and debris committed Jun 12, 2018
1 parent 3d8df8f commit a815112
Show file tree
Hide file tree
Showing 29 changed files with 415 additions and 115 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion ethcore/light/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3;
const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);

/// Max number of transactions in a single packet.
const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;

// minimum interval between updates.
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);

Expand Down Expand Up @@ -647,7 +650,7 @@ impl LightProtocol {
fn propagate_transactions(&self, io: &IoContext) {
if self.capabilities.read().tx_relay { return }

let ready_transactions = self.provider.ready_transactions();
let ready_transactions = self.provider.ready_transactions(MAX_TRANSACTIONS_TO_PROPAGATE);
if ready_transactions.is_empty() { return }

trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len());
Expand Down
4 changes: 2 additions & 2 deletions ethcore/light/src/net/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ impl Provider for TestProvider {
})
}

fn ready_transactions(&self) -> Vec<PendingTransaction> {
self.0.client.ready_transactions()
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
self.0.client.ready_transactions(max_len)
}
}

Expand Down
13 changes: 8 additions & 5 deletions ethcore/light/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ pub trait Provider: Send + Sync {
fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option<request::HeaderProofResponse>;

/// Provide pending transactions.
fn ready_transactions(&self) -> Vec<PendingTransaction>;
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction>;

/// Provide a proof-of-execution for the given transaction proof request.
/// Returns a vector of all state items necessary to execute the transaction.
Expand Down Expand Up @@ -280,8 +280,8 @@ impl<T: ProvingBlockChainClient + ?Sized> Provider for T {
.map(|(_, proof)| ::request::ExecutionResponse { items: proof })
}

fn ready_transactions(&self) -> Vec<PendingTransaction> {
BlockChainClient::ready_transactions(self)
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
BlockChainClient::ready_transactions(self, max_len)
.into_iter()
.map(|tx| tx.pending().clone())
.collect()
Expand Down Expand Up @@ -367,9 +367,12 @@ impl<L: AsLightClient + Send + Sync> Provider for LightProvider<L> {
None
}

fn ready_transactions(&self) -> Vec<PendingTransaction> {
fn ready_transactions(&self, max_len: usize) -> Vec<PendingTransaction> {
let chain_info = self.chain_info();
self.txqueue.read().ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp)
let mut transactions = self.txqueue.read()
.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp);
transactions.truncate(max_len);
transactions
}
}

Expand Down
4 changes: 2 additions & 2 deletions ethcore/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1956,8 +1956,8 @@ impl BlockChainClient for Client {
(*self.build_last_hashes(&self.chain.read().best_block_hash())).clone()
}

fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>> {
self.importer.miner.ready_transactions(self)
fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>> {
self.importer.miner.ready_transactions(self, max_len, ::miner::PendingOrdering::Priority)
}

fn signing_chain_id(&self) -> Option<u64> {
Expand Down
6 changes: 3 additions & 3 deletions ethcore/src/client/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use log_entry::LocalizedLogEntry;
use receipt::{Receipt, LocalizedReceipt, TransactionOutcome};
use error::ImportResult;
use vm::Schedule;
use miner::{Miner, MinerService};
use miner::{self, Miner, MinerService};
use spec::Spec;
use types::basic_account::BasicAccount;
use types::pruning_info::PruningInfo;
Expand Down Expand Up @@ -806,8 +806,8 @@ impl BlockChainClient for TestBlockChainClient {
self.traces.read().clone()
}

fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>> {
self.miner.ready_transactions(self)
fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>> {
self.miner.ready_transactions(self, max_len, miner::PendingOrdering::Priority)
}

fn signing_chain_id(&self) -> Option<u64> { None }
Expand Down
2 changes: 1 addition & 1 deletion ethcore/src/client/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra
fn last_hashes(&self) -> LastHashes;

/// List all transactions that are allowed into the next block.
fn ready_transactions(&self) -> Vec<Arc<VerifiedTransaction>>;
fn ready_transactions(&self, max_len: usize) -> Vec<Arc<VerifiedTransaction>>;

/// Sorted list of transaction gas prices from at least last sample_size blocks.
fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus<U256> {
Expand Down
51 changes: 35 additions & 16 deletions ethcore/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,18 +364,28 @@ impl Miner {

let client = self.pool_client(chain);
let engine_params = self.engine.params();
let min_tx_gas = self.engine.schedule(chain_info.best_block_number).tx_gas.into();
let min_tx_gas: U256 = self.engine.schedule(chain_info.best_block_number).tx_gas.into();
let nonce_cap: Option<U256> = if chain_info.best_block_number + 1 >= engine_params.dust_protection_transition {
Some((engine_params.nonce_cap_increment * (chain_info.best_block_number + 1)).into())
} else {
None
};
// we will never need more transactions than limit divided by min gas
let max_transactions = if min_tx_gas.is_zero() {
usize::max_value()
} else {
(*open_block.block().header().gas_limit() / min_tx_gas).as_u64() as usize
};

let pending: Vec<Arc<_>> = self.transaction_queue.pending(
client.clone(),
chain_info.best_block_number,
chain_info.best_block_timestamp,
nonce_cap,
pool::PendingSettings {
block_number: chain_info.best_block_number,
current_timestamp: chain_info.best_block_timestamp,
nonce_cap,
max_len: max_transactions,
ordering: miner::PendingOrdering::Priority,
}
);

let took_ms = |elapsed: &Duration| {
Expand Down Expand Up @@ -807,20 +817,28 @@ impl miner::MinerService for Miner {
self.transaction_queue.all_transactions()
}

fn ready_transactions<C>(&self, chain: &C) -> Vec<Arc<VerifiedTransaction>> where
fn ready_transactions<C>(&self, chain: &C, max_len: usize, ordering: miner::PendingOrdering)
-> Vec<Arc<VerifiedTransaction>>
where
C: ChainInfo + Nonce + Sync,
{
let chain_info = chain.chain_info();

let from_queue = || {
// We propagate transactions over the nonce cap.
// The mechanism is only to limit number of transactions in pending block
// those transactions are valid and will just be ready to be included in next block.
let nonce_cap = None;

self.transaction_queue.pending(
CachedNonceClient::new(chain, &self.nonce_cache),
chain_info.best_block_number,
chain_info.best_block_timestamp,
// We propagate transactions over the nonce cap.
// The mechanism is only to limit number of transactions in pending block
// those transactions are valid and will just be ready to be included in next block.
None,
pool::PendingSettings {
block_number: chain_info.best_block_number,
current_timestamp: chain_info.best_block_timestamp,
nonce_cap,
max_len,
ordering,
},
)
};

Expand All @@ -830,6 +848,7 @@ impl miner::MinerService for Miner {
.iter()
.map(|signed| pool::VerifiedTransaction::from_pending_block_transaction(signed.clone()))
.map(Arc::new)
.take(max_len)
.collect()
}, chain_info.best_block_number)
};
Expand Down Expand Up @@ -1083,7 +1102,7 @@ mod tests {
use rustc_hex::FromHex;

use client::{TestBlockChainClient, EachBlockWith, ChainInfo, ImportSealedBlock};
use miner::MinerService;
use miner::{MinerService, PendingOrdering};
use test_helpers::{generate_dummy_client, generate_dummy_client_with_spec_and_accounts};
use transaction::{Transaction};

Expand Down Expand Up @@ -1179,7 +1198,7 @@ mod tests {
assert_eq!(res.unwrap(), ());
assert_eq!(miner.pending_transactions(best_block).unwrap().len(), 1);
assert_eq!(miner.pending_receipts(best_block).unwrap().len(), 1);
assert_eq!(miner.ready_transactions(&client).len(), 1);
assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
// This method will let us know if pending block was created (before calling that method)
assert!(!miner.prepare_pending_block(&client));
}
Expand All @@ -1198,7 +1217,7 @@ mod tests {
assert_eq!(res.unwrap(), ());
assert_eq!(miner.pending_transactions(best_block), None);
assert_eq!(miner.pending_receipts(best_block), None);
assert_eq!(miner.ready_transactions(&client).len(), 1);
assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
}

#[test]
Expand All @@ -1217,11 +1236,11 @@ mod tests {
assert_eq!(miner.pending_transactions(best_block), None);
assert_eq!(miner.pending_receipts(best_block), None);
// By default we use PendingSet::AlwaysSealing, so no transactions yet.
assert_eq!(miner.ready_transactions(&client).len(), 0);
assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 0);
// This method will let us know if pending block was created (before calling that method)
assert!(miner.prepare_pending_block(&client));
// After pending block is created we should see a transaction.
assert_eq!(miner.ready_transactions(&client).len(), 1);
assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
}

#[test]
Expand Down
7 changes: 5 additions & 2 deletions ethcore/src/miner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub mod pool_client;
pub mod stratum;

pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringParams};
pub use ethcore_miner::pool::PendingOrdering;

use std::sync::Arc;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -156,10 +157,12 @@ pub trait MinerService : Send + Sync {
fn next_nonce<C>(&self, chain: &C, address: &Address) -> U256
where C: Nonce + Sync;

/// Get a list of all ready transactions.
/// Get a list of all ready transactions either ordered by priority or unordered (cheaper).
///
/// Depending on the settings may look in transaction pool or only in pending block.
fn ready_transactions<C>(&self, chain: &C) -> Vec<Arc<VerifiedTransaction>>
/// If you don't need a full set of transactions, you can add `max_len` and create only a limited set of
/// transactions.
fn ready_transactions<C>(&self, chain: &C, max_len: usize, ordering: PendingOrdering) -> Vec<Arc<VerifiedTransaction>>
where C: ChainInfo + Nonce + Sync;

/// Get a list of all transactions in the pool (some of them might not be ready for inclusion yet).
Expand Down
10 changes: 5 additions & 5 deletions ethcore/src/tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use test_helpers::{
use types::filter::Filter;
use ethereum_types::{U256, Address};
use kvdb_rocksdb::{Database, DatabaseConfig};
use miner::Miner;
use miner::{Miner, PendingOrdering};
use spec::Spec;
use views::BlockView;
use ethkey::KeyPair;
Expand Down Expand Up @@ -343,12 +343,12 @@ fn does_not_propagate_delayed_transactions() {

client.miner().import_own_transaction(&*client, tx0).unwrap();
client.miner().import_own_transaction(&*client, tx1).unwrap();
assert_eq!(0, client.ready_transactions().len());
assert_eq!(0, client.miner().ready_transactions(&*client).len());
assert_eq!(0, client.ready_transactions(10).len());
assert_eq!(0, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len());
push_blocks_to_client(&client, 53, 2, 2);
client.flush_queue();
assert_eq!(2, client.ready_transactions().len());
assert_eq!(2, client.miner().ready_transactions(&*client).len());
assert_eq!(2, client.ready_transactions(10).len());
assert_eq!(2, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len());
}

#[test]
Expand Down
21 changes: 16 additions & 5 deletions ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,10 @@ impl SyncProvider for EthSync {
}
}

const PEERS_TIMER: TimerToken = 0;
const SYNC_TIMER: TimerToken = 1;
const TX_TIMER: TimerToken = 2;

struct SyncProtocolHandler {
/// Shared blockchain client.
chain: Arc<BlockChainClient>,
Expand All @@ -373,7 +377,9 @@ struct SyncProtocolHandler {
impl NetworkProtocolHandler for SyncProtocolHandler {
fn initialize(&self, io: &NetworkContext) {
if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID {
io.register_timer(0, Duration::from_secs(1)).expect("Error registering sync timer");
io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer");
io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer");
io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer");
}
}

Expand All @@ -399,12 +405,17 @@ impl NetworkProtocolHandler for SyncProtocolHandler {
}
}

fn timeout(&self, io: &NetworkContext, _timer: TimerToken) {
fn timeout(&self, io: &NetworkContext, timer: TimerToken) {
trace_time!("sync::timeout");
let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay);
self.sync.write().maintain_peers(&mut io);
self.sync.write().maintain_sync(&mut io);
self.sync.write().propagate_new_transactions(&mut io);
match timer {
PEERS_TIMER => self.sync.write().maintain_peers(&mut io),
SYNC_TIMER => self.sync.write().maintain_sync(&mut io),
TX_TIMER => {
self.sync.write().propagate_new_transactions(&mut io);
},
_ => warn!("Unknown timer {} triggered.", timer),
}
}
}

Expand Down
10 changes: 7 additions & 3 deletions ethcore/sync/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,10 @@ const MAX_NEW_HASHES: usize = 64;
const MAX_NEW_BLOCK_AGE: BlockNumber = 20;
// maximal packet size with transactions (cannot be greater than 16MB - protocol limitation).
const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024;
// Maximal number of transactions queried from miner to propagate.
// This set is used to diff with transactions known by the peer and
// we will send a difference of length up to `MAX_TRANSACTIONS_TO_PROPAGATE`.
const MAX_TRANSACTIONS_TO_QUERY: usize = 4096;
// Maximal number of transactions in sent in single packet.
const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64;
// Min number of blocks to be behind for a snapshot sync
Expand Down Expand Up @@ -1143,7 +1147,7 @@ pub mod tests {
use super::{PeerInfo, PeerAsking};
use ethcore::header::*;
use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo};
use ethcore::miner::MinerService;
use ethcore::miner::{MinerService, PendingOrdering};
use private_tx::NoopPrivateTxHandler;

pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes {
Expand Down Expand Up @@ -1355,7 +1359,7 @@ pub mod tests {
let mut io = TestIo::new(&mut client, &ss, &queue, None);
io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks, false);
sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]);
assert_eq!(io.chain.miner.ready_transactions(io.chain).len(), 1);
assert_eq!(io.chain.miner.ready_transactions(io.chain, 10, PendingOrdering::Priority).len(), 1);
}
// We need to update nonce status (because we say that the block has been imported)
for h in &[good_blocks[0]] {
Expand All @@ -1371,7 +1375,7 @@ pub mod tests {
}

// then
assert_eq!(client.miner.ready_transactions(&client).len(), 1);
assert_eq!(client.miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1);
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion ethcore/sync/src/chain/propagator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use super::{
MAX_PEERS_PROPAGATION,
MAX_TRANSACTION_PACKET_SIZE,
MAX_TRANSACTIONS_TO_PROPAGATE,
MAX_TRANSACTIONS_TO_QUERY,
MIN_PEERS_PROPAGATION,
CONSENSUS_DATA_PACKET,
NEW_BLOCK_HASHES_PACKET,
Expand Down Expand Up @@ -114,7 +115,7 @@ impl SyncPropagator {
return 0;
}

let transactions = io.chain().ready_transactions();
let transactions = io.chain().ready_transactions(MAX_TRANSACTIONS_TO_QUERY);
if transactions.is_empty() {
return 0;
}
Expand Down
Loading

0 comments on commit a815112

Please sign in to comment.