From 69ca2ac902ca8a46fa73723049b7033b918f0068 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 1 Jun 2018 13:09:43 +0200 Subject: [PATCH 01/13] Unordered iterator. --- Cargo.lock | 6 +-- ethcore/light/src/net/mod.rs | 5 ++- ethcore/light/src/provider.rs | 13 +++--- ethcore/src/client/client.rs | 4 +- ethcore/src/client/test_client.rs | 4 +- ethcore/src/client/traits.rs | 2 +- ethcore/src/miner/miner.rs | 2 +- ethcore/src/miner/mod.rs | 4 +- ethcore/sync/src/chain/propagator.rs | 2 +- miner/src/pool/queue.rs | 4 +- transaction-pool/Cargo.toml | 2 +- transaction-pool/src/pool.rs | 58 ++++++++++++++++++++++++++- transaction-pool/src/tests/mod.rs | 60 ++++++++++++++++++++++++++++ 13 files changed, 144 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c8e60dce693..502ca765ee6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -695,7 +695,7 @@ dependencies = [ "rlp 0.2.1", "rustc-hex 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "trace-time 0.1.0", - "transaction-pool 1.12.0", + "transaction-pool 1.12.1", "url 1.5.1 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -2242,7 +2242,7 @@ dependencies = [ "tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "tiny-keccak 1.4.2 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-timer 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", - "transaction-pool 1.12.0", + "transaction-pool 1.12.1", "transient-hashmap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "vm 0.1.0", ] @@ -3473,7 +3473,7 @@ dependencies = [ [[package]] name = "transaction-pool" -version = "1.12.0" +version = "1.12.1" dependencies = [ "error-chain 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "ethereum-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/ethcore/light/src/net/mod.rs b/ethcore/light/src/net/mod.rs index 27d5c12a5fa..34f426cb786 100644 --- a/ethcore/light/src/net/mod.rs +++ b/ethcore/light/src/net/mod.rs @@ -72,6 +72,9 @@ const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5); const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3; const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60); +/// Max number of transactions in a single packet. +const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64; + // minimum interval between updates. const UPDATE_INTERVAL: Duration = Duration::from_millis(5000); @@ -648,7 +651,7 @@ impl LightProtocol { fn propagate_transactions(&self, io: &IoContext) { if self.capabilities.read().tx_relay { return } - let ready_transactions = self.provider.ready_transactions(); + let ready_transactions = self.provider.ready_transactions(MAX_TRANSACTIONS_TO_PROPAGATE); if ready_transactions.is_empty() { return } trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len()); diff --git a/ethcore/light/src/provider.rs b/ethcore/light/src/provider.rs index aaa6f5858ae..826724026df 100644 --- a/ethcore/light/src/provider.rs +++ b/ethcore/light/src/provider.rs @@ -125,7 +125,7 @@ pub trait Provider: Send + Sync { fn header_proof(&self, req: request::CompleteHeaderProofRequest) -> Option; /// Provide pending transactions. - fn ready_transactions(&self) -> Vec; + fn ready_transactions(&self, max_len: usize) -> Vec; /// Provide a proof-of-execution for the given transaction proof request. /// Returns a vector of all state items necessary to execute the transaction. @@ -280,8 +280,8 @@ impl Provider for T { .map(|(_, proof)| ::request::ExecutionResponse { items: proof }) } - fn ready_transactions(&self) -> Vec { - BlockChainClient::ready_transactions(self) + fn ready_transactions(&self, max_len: usize) -> Vec { + BlockChainClient::ready_transactions(self, max_len) .into_iter() .map(|tx| tx.pending().clone()) .collect() @@ -367,9 +367,12 @@ impl Provider for LightProvider { None } - fn ready_transactions(&self) -> Vec { + fn ready_transactions(&self, max_len: usize) -> Vec { let chain_info = self.chain_info(); - self.txqueue.read().ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp) + let mut transactions = self.txqueue.read() + .ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp); + transactions.truncate(max_len); + transactions } } diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index b469cf45180..e3c6520158a 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1954,8 +1954,8 @@ impl BlockChainClient for Client { (*self.build_last_hashes(&self.chain.read().best_block_hash())).clone() } - fn ready_transactions(&self) -> Vec> { - self.importer.miner.ready_transactions(self) + fn ready_transactions(&self, max_len: usize) -> Vec> { + self.importer.miner.ready_transactions(self, max_len) } fn signing_chain_id(&self) -> Option { diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index fab32346572..673468510a1 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -808,8 +808,8 @@ impl BlockChainClient for TestBlockChainClient { self.traces.read().clone() } - fn ready_transactions(&self) -> Vec> { - self.miner.ready_transactions(self) + fn ready_transactions(&self, max_len: usize) -> Vec> { + self.miner.ready_transactions(self, max_len) } fn signing_chain_id(&self) -> Option { None } diff --git a/ethcore/src/client/traits.rs b/ethcore/src/client/traits.rs index 358e24fa905..cf62043c91a 100644 --- a/ethcore/src/client/traits.rs +++ b/ethcore/src/client/traits.rs @@ -321,7 +321,7 @@ pub trait BlockChainClient : Sync + Send + AccountData + BlockChain + CallContra fn last_hashes(&self) -> LastHashes; /// List all transactions that are allowed into the next block. - fn ready_transactions(&self) -> Vec>; + fn ready_transactions(&self, max_len: usize) -> Vec>; /// Sorted list of transaction gas prices from at least last sample_size blocks. fn gas_price_corpus(&self, sample_size: usize) -> ::stats::Corpus { diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 4904535a89b..e7f9e534d26 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -807,7 +807,7 @@ impl miner::MinerService for Miner { self.transaction_queue.all_transactions() } - fn ready_transactions(&self, chain: &C) -> Vec> where + fn ready_transactions(&self, chain: &C, max_len: usize) -> Vec> where C: ChainInfo + Nonce + Sync, { let chain_info = chain.chain_info(); diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index fbf4f11b7ad..c141ef6ac4c 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -161,7 +161,9 @@ pub trait MinerService : Send + Sync { /// Get a list of all ready transactions. /// /// Depending on the settings may look in transaction pool or only in pending block. - fn ready_transactions(&self, chain: &C) -> Vec> + /// If you don't need a full set of transactions, you can add `max_len` and create only a limited set of + /// transactions. + fn ready_transactions(&self, chain: &C, max_len: usize) -> Vec> where C: ChainInfo + Nonce + Sync; /// Get a list of all transactions in the pool (some of them might not be ready for inclusion yet). diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index 4ae0518a537..da0c9302931 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -114,7 +114,7 @@ impl SyncPropagator { return 0; } - let transactions = io.chain().ready_transactions(); + let transactions = io.chain().ready_transactions(MAX_TRANSACTIONS_TO_PROPAGATE); if transactions.is_empty() { return 0; } diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index 8cf4534b763..b7efb091526 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -201,10 +201,10 @@ impl TransactionQueue { /// Returns all transactions in the queue ordered by priority. pub fn all_transactions(&self) -> Vec> { let ready = |_tx: &pool::VerifiedTransaction| txpool::Readiness::Ready; - self.pool.read().pending(ready).collect() + self.pool.read().unordered_transactions(ready).collect() } - /// Returns current pneding transactions. + /// Returns current pending transactions ordered by priority. /// /// NOTE: This may return a cached version of pending transaction set. /// Re-computing the pending set is possible with `#collect_pending` method, diff --git a/transaction-pool/Cargo.toml b/transaction-pool/Cargo.toml index 8965c8cee01..0ba1790a47f 100644 --- a/transaction-pool/Cargo.toml +++ b/transaction-pool/Cargo.toml @@ -1,7 +1,7 @@ [package] description = "Generic transaction pool." name = "transaction-pool" -version = "1.12.0" +version = "1.12.1" license = "GPL-3.0" authors = ["Parity Technologies "] diff --git a/transaction-pool/src/pool.rs b/transaction-pool/src/pool.rs index 5cb6e479b8e..7a1bd0c8589 100644 --- a/transaction-pool/src/pool.rs +++ b/transaction-pool/src/pool.rs @@ -15,7 +15,8 @@ // along with Parity. If not, see . use std::sync::Arc; -use std::collections::{HashMap, BTreeSet}; +use std::slice; +use std::collections::{hash_map, HashMap, BTreeSet}; use error; use listener::{Listener, NoopListener}; @@ -417,7 +418,16 @@ impl Pool where PendingIterator { ready, best_transactions, - pool: self + pool: self, + } + } + + /// Returns unprioritized list of ready transactions. + pub fn unordered_pending>(&self, ready: R) -> UnorderedIterator { + UnorderedIterator { + ready, + senders: self.transactions.iter(), + transactions: None, } } @@ -483,6 +493,50 @@ impl Pool where } } +/// An iterator over all pending (ready) transactions in unoredered fashion. +/// +/// NOTE: Current implementation will iterate over all transactions from particular sender +/// ordered by nonce, but that might change in the future. +/// +/// NOTE: the transactions are not removed from the queue. +/// You might remove them later by calling `cull`. +pub struct UnorderedIterator<'a, T, R, S> where + T: VerifiedTransaction + 'a, + S: Scoring + 'a, +{ + ready: R, + senders: hash_map::Iter<'a, T::Sender, Transactions>, + transactions: Option>>, +} + +impl<'a, T, R, S> Iterator for UnorderedIterator<'a, T, R, S> where + T: VerifiedTransaction, + R: Ready, + S: Scoring, +{ + type Item = Arc; + + fn next(&mut self) -> Option { + loop { + if let Some(transactions) = self.transactions.as_mut() { + if let Some(tx) = transactions.next() { + match self.ready.is_ready(&tx) { + Readiness::Ready => { + return Some(tx.transaction.clone()); + }, + state => trace!("[{:?}] Ignoring {:?} transaction.", tx.hash(), state), + } + } + } + + // otherwise fallback and try next sender + let next_sender = self.senders.next()?; + self.transactions = Some(next_sender.1.iter()); + } + } +} + + /// An iterator over all pending (ready) transactions. /// NOTE: the transactions are not removed from the queue. /// You might remove them later by calling `cull`. diff --git a/transaction-pool/src/tests/mod.rs b/transaction-pool/src/tests/mod.rs index b21ea31807c..a8f6d9f3014 100644 --- a/transaction-pool/src/tests/mod.rs +++ b/transaction-pool/src/tests/mod.rs @@ -250,6 +250,66 @@ fn should_construct_pending() { assert_eq!(pending.next(), None); } +#[test] +fn should_return_unordered_iterator() { + // given + let b = TransactionBuilder::default(); + let mut txq = TestPool::default(); + + let tx0 = txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap(); + let tx1 = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap(); + let tx2 = txq.import(b.tx().nonce(2).new()).unwrap(); + let tx3 =txq.import(b.tx().nonce(3).gas_price(4).new()).unwrap(); + //gap + txq.import(b.tx().nonce(5).new()).unwrap(); + + let tx5 = txq.import(b.tx().sender(1).nonce(0).new()).unwrap(); + let tx6 = txq.import(b.tx().sender(1).nonce(1).new()).unwrap(); + let tx7 = txq.import(b.tx().sender(1).nonce(2).new()).unwrap(); + let tx8 = txq.import(b.tx().sender(1).nonce(3).gas_price(4).new()).unwrap(); + // gap + txq.import(b.tx().sender(1).nonce(5).new()).unwrap(); + + let tx9 = txq.import(b.tx().sender(2).nonce(0).new()).unwrap(); + assert_eq!(txq.light_status().transaction_count, 11); + assert_eq!(txq.status(NonceReady::default()), Status { + stalled: 0, + pending: 9, + future: 2, + }); + assert_eq!(txq.status(NonceReady::new(1)), Status { + stalled: 3, + pending: 6, + future: 2, + }); + + // when + let all: Vec<_> = txq.unordered_pending(NonceReady::default()).collect(); + + let chain1 = vec![tx0, tx1, tx2, tx3]; + let chain2 = vec![tx5, tx6, tx7, tx8]; + let chain3 = vec![tx9]; + + assert_eq!(all.len(), chain1.len() + chain2.len() + chain3.len()); + + let mut options = vec![ + vec![chain1.clone(), chain2.clone(), chain3.clone()], + vec![chain2.clone(), chain1.clone(), chain3.clone()], + vec![chain2.clone(), chain3.clone(), chain1.clone()], + vec![chain3.clone(), chain2.clone(), chain1.clone()], + vec![chain3.clone(), chain1.clone(), chain2.clone()], + vec![chain1.clone(), chain3.clone(), chain2.clone()], + ].into_iter().map(|mut v| { + let mut first = v.pop().unwrap(); + for mut x in v { + first.append(&mut x); + } + first + }); + + assert!(options.any(|opt| all == opt)); +} + #[test] fn should_update_scoring_correctly() { // given From 9a57cd8893c46972173da6127bfaae7e18d17f20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 1 Jun 2018 14:58:22 +0200 Subject: [PATCH 02/13] Use unordered and limited set if full not required. --- ethcore/src/client/client.rs | 2 +- ethcore/src/client/test_client.rs | 4 +-- ethcore/src/miner/miner.rs | 19 +++++++++--- ethcore/src/miner/mod.rs | 5 +-- miner/src/pool/listener.rs | 1 + miner/src/pool/mod.rs | 9 ++++++ miner/src/pool/queue.rs | 51 +++++++++++++++++++++++++------ parity/rpc_apis.rs | 4 +-- rpc/src/v1/impls/eth_filter.rs | 2 +- rpc/src/v1/impls/eth_pubsub.rs | 2 +- rpc/src/v1/impls/light/parity.rs | 3 +- rpc/src/v1/impls/parity.rs | 10 ++++-- rpc/src/v1/traits/parity.rs | 2 +- 13 files changed, 85 insertions(+), 29 deletions(-) diff --git a/ethcore/src/client/client.rs b/ethcore/src/client/client.rs index e3c6520158a..a9077ee58b4 100644 --- a/ethcore/src/client/client.rs +++ b/ethcore/src/client/client.rs @@ -1955,7 +1955,7 @@ impl BlockChainClient for Client { } fn ready_transactions(&self, max_len: usize) -> Vec> { - self.importer.miner.ready_transactions(self, max_len) + self.importer.miner.ready_transactions(self, max_len, ::miner::PendingOrdering::Priority) } fn signing_chain_id(&self) -> Option { diff --git a/ethcore/src/client/test_client.rs b/ethcore/src/client/test_client.rs index 673468510a1..a3878f4e650 100644 --- a/ethcore/src/client/test_client.rs +++ b/ethcore/src/client/test_client.rs @@ -48,7 +48,7 @@ use log_entry::LocalizedLogEntry; use receipt::{Receipt, LocalizedReceipt, TransactionOutcome}; use error::ImportResult; use vm::Schedule; -use miner::{Miner, MinerService}; +use miner::{self, Miner, MinerService}; use spec::Spec; use types::basic_account::BasicAccount; use types::mode::Mode; @@ -809,7 +809,7 @@ impl BlockChainClient for TestBlockChainClient { } fn ready_transactions(&self, max_len: usize) -> Vec> { - self.miner.ready_transactions(self, max_len) + self.miner.ready_transactions(self, max_len, miner::PendingOrdering::Priority) } fn signing_chain_id(&self) -> Option { None } diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index e7f9e534d26..98e99db2c7f 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -377,6 +377,8 @@ impl Miner { chain_info.best_block_number, chain_info.best_block_timestamp, nonce_cap, + usize::max_value(), + miner::PendingOrdering::Priority, ); let took_ms = |elapsed: &Duration| { @@ -807,20 +809,26 @@ impl miner::MinerService for Miner { self.transaction_queue.all_transactions() } - fn ready_transactions(&self, chain: &C, max_len: usize) -> Vec> where + fn ready_transactions(&self, chain: &C, max_len: usize, ordering: miner::PendingOrdering) + -> Vec> + where C: ChainInfo + Nonce + Sync, { let chain_info = chain.chain_info(); let from_queue = || { + // We propagate transactions over the nonce cap. + // The mechanism is only to limit number of transactions in pending block + // those transactions are valid and will just be ready to be included in next block. + let nonce_cap = None; + self.transaction_queue.pending( CachedNonceClient::new(chain, &self.nonce_cache), chain_info.best_block_number, chain_info.best_block_timestamp, - // We propagate transactions over the nonce cap. - // The mechanism is only to limit number of transactions in pending block - // those transactions are valid and will just be ready to be included in next block. - None, + nonce_cap, + max_len, + ordering ) }; @@ -830,6 +838,7 @@ impl miner::MinerService for Miner { .iter() .map(|signed| pool::VerifiedTransaction::from_pending_block_transaction(signed.clone())) .map(Arc::new) + .take(max_len) .collect() }, chain_info.best_block_number) }; diff --git a/ethcore/src/miner/mod.rs b/ethcore/src/miner/mod.rs index c141ef6ac4c..6cc05c8c6fc 100644 --- a/ethcore/src/miner/mod.rs +++ b/ethcore/src/miner/mod.rs @@ -26,6 +26,7 @@ pub mod pool_client; pub mod stratum; pub use self::miner::{Miner, MinerOptions, Penalization, PendingSet, AuthoringParams}; +pub use ethcore_miner::pool::PendingOrdering; use std::sync::Arc; use std::collections::BTreeMap; @@ -158,12 +159,12 @@ pub trait MinerService : Send + Sync { fn next_nonce(&self, chain: &C, address: &Address) -> U256 where C: Nonce + Sync; - /// Get a list of all ready transactions. + /// Get a list of all ready transactions either ordered by priority or unordered (cheaper). /// /// Depending on the settings may look in transaction pool or only in pending block. /// If you don't need a full set of transactions, you can add `max_len` and create only a limited set of /// transactions. - fn ready_transactions(&self, chain: &C, max_len: usize) -> Vec> + fn ready_transactions(&self, chain: &C, max_len: usize, ordering: PendingOrdering) -> Vec> where C: ChainInfo + Nonce + Sync; /// Get a list of all transactions in the pool (some of them might not be ready for inclusion yet). diff --git a/miner/src/pool/listener.rs b/miner/src/pool/listener.rs index 3f42372e840..9a8856166b9 100644 --- a/miner/src/pool/listener.rs +++ b/miner/src/pool/listener.rs @@ -51,6 +51,7 @@ impl Notifier { /// Notify listeners about all currently pending transactions. pub fn notify(&mut self) { for l in &self.listeners { + println!("Notifying about transactions: {:?}", self.pending); (l)(&self.pending); } diff --git a/miner/src/pool/mod.rs b/miner/src/pool/mod.rs index 45d28f3c121..cf78b521a83 100644 --- a/miner/src/pool/mod.rs +++ b/miner/src/pool/mod.rs @@ -45,6 +45,15 @@ pub enum PrioritizationStrategy { GasPriceOnly, } +/// Transaction ordering when requesting pending set. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum PendingOrdering { + /// Get pending transactions ordered by their priority (potentially expensive) + Priority, + /// Get pending transactions without any care of particular ordering (cheaper). + Unordered, +} + /// Transaction priority. #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(crate) enum Priority { diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index b7efb091526..2935a44e879 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -27,7 +27,7 @@ use rayon::prelude::*; use transaction; use txpool::{self, Verifier}; -use pool::{self, scoring, verifier, client, ready, listener, PrioritizationStrategy}; +use pool::{self, scoring, verifier, client, ready, listener, PrioritizationStrategy, PendingOrdering}; use pool::local_transactions::LocalTransactionsList; type Listener = (LocalTransactionsList, (listener::Notifier, listener::Logger)); @@ -75,6 +75,7 @@ struct CachedPending { nonce_cap: Option, has_local_pending: bool, pending: Option>>, + max_len: usize, } impl CachedPending { @@ -86,6 +87,7 @@ impl CachedPending { has_local_pending: false, pending: None, nonce_cap: None, + max_len: 0, } } @@ -100,6 +102,7 @@ impl CachedPending { block_number: u64, current_timestamp: u64, nonce_cap: Option<&U256>, + max_len: usize, ) -> Option>> { // First check if we have anything in cache. let pending = self.pending.as_ref()?; @@ -124,7 +127,12 @@ impl CachedPending { return None; } - Some(pending.clone()) + // It's fine to just take a smaller subset, but not other way around. + if max_len > self.max_len { + return None; + } + + Some(pending.iter().take(max_len).cloned().collect()) } } @@ -198,10 +206,10 @@ impl TransactionQueue { results } - /// Returns all transactions in the queue ordered by priority. + /// Returns all transactions in the queue without explicit ordering. pub fn all_transactions(&self) -> Vec> { let ready = |_tx: &pool::VerifiedTransaction| txpool::Readiness::Ready; - self.pool.read().unordered_transactions(ready).collect() + self.pool.read().unordered_pending(ready).collect() } /// Returns current pending transactions ordered by priority. @@ -215,21 +223,32 @@ impl TransactionQueue { block_number: u64, current_timestamp: u64, nonce_cap: Option, + max_len: usize, + ordering: PendingOrdering, ) -> Vec> where C: client::NonceClient, { - if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref()) { + if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) { return pending; } // Double check after acquiring write lock let mut cached_pending = self.cached_pending.write(); - if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref()) { + if let Some(pending) = cached_pending.pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) { return pending; } - let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| i.collect()); + // In case we don't have a cached set, but we don't care about order + // just return the unordered set. + if let PendingOrdering::Unordered = ordering { + let ready = Self::ready(client, block_number, current_timestamp, nonce_cap); + return self.pool.read().unordered_pending(ready).take(max_len).collect(); + } + + let pending: Vec<_> = self.collect_pending(client, block_number, current_timestamp, nonce_cap, |i| { + i.take(max_len).collect() + }); *cached_pending = CachedPending { block_number, @@ -237,6 +256,7 @@ impl TransactionQueue { nonce_cap, has_local_pending: self.has_local_pending_transactions(), pending: Some(pending.clone()), + max_len, }; pending @@ -261,15 +281,26 @@ impl TransactionQueue { scoring::NonceAndGasPrice, Listener, >) -> T, + { + debug!(target: "txqueue", "Re-computing pending set for block: {}", block_number); + let ready = Self::ready(client, block_number, current_timestamp, nonce_cap); + collect(self.pool.read().pending(ready)) + } + + fn ready( + client: C, + block_number: u64, + current_timestamp: u64, + nonce_cap: Option, + ) -> (ready::Condition, ready::State) where + C: client::NonceClient, { let pending_readiness = ready::Condition::new(block_number, current_timestamp); // don't mark any transactions as stale at this point. let stale_id = None; let state_readiness = ready::State::new(client, stale_id, nonce_cap); - let ready = (pending_readiness, state_readiness); - - collect(self.pool.read().pending(ready)) + (pending_readiness, state_readiness) } /// Culls all stalled transactions from the pool. diff --git a/parity/rpc_apis.rs b/parity/rpc_apis.rs index 7b914f2860d..0bcd32e22b3 100644 --- a/parity/rpc_apis.rs +++ b/parity/rpc_apis.rs @@ -306,7 +306,7 @@ impl FullDependencies { let client = EthPubSubClient::new(self.client.clone(), self.remote.clone()); let h = client.handler(); self.miner.add_transactions_listener(Box::new(move |hashes| if let Some(h) = h.upgrade() { - h.new_transactions(hashes); + h.notify_new_transactions(hashes); })); if let Some(h) = client.handler().upgrade() { @@ -527,7 +527,7 @@ impl LightDependencies { let h = client.handler(); self.transaction_queue.write().add_listener(Box::new(move |transactions| { if let Some(h) = h.upgrade() { - h.new_transactions(transactions); + h.notify_new_transactions(transactions); } })); handler.extend_with(EthPubSub::to_delegate(client)); diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs index 6ca1c355f30..d228f93961f 100644 --- a/rpc/src/v1/impls/eth_filter.rs +++ b/rpc/src/v1/impls/eth_filter.rs @@ -85,7 +85,7 @@ impl Filterable for EthFilterClient where } fn pending_transactions_hashes(&self) -> Vec { - self.miner.ready_transactions(&*self.client) + self.miner.ready_transactions(&*self.client, usize::max_value(), miner::PendingOrdering::Unordered) .into_iter() .map(|tx| tx.signed().hash()) .collect() diff --git a/rpc/src/v1/impls/eth_pubsub.rs b/rpc/src/v1/impls/eth_pubsub.rs index c0789910c33..61a99171de4 100644 --- a/rpc/src/v1/impls/eth_pubsub.rs +++ b/rpc/src/v1/impls/eth_pubsub.rs @@ -175,7 +175,7 @@ impl ChainNotificationHandler { } /// Notify all subscribers about new transaction hashes. - pub fn new_transactions(&self, hashes: &[H256]) { + pub fn notify_new_transactions(&self, hashes: &[H256]) { for subscriber in self.transactions_subscribers.read().values() { for hash in hashes { Self::notify(&self.remote, subscriber, pubsub::Result::TransactionHash((*hash).into())); diff --git a/rpc/src/v1/impls/light/parity.rs b/rpc/src/v1/impls/light/parity.rs index 025538fc427..6249a75a615 100644 --- a/rpc/src/v1/impls/light/parity.rs +++ b/rpc/src/v1/impls/light/parity.rs @@ -264,12 +264,13 @@ impl Parity for ParityClient { .map(Into::into) } - fn pending_transactions(&self) -> Result> { + fn pending_transactions(&self, limit: Trailing) -> Result> { let txq = self.light_dispatch.transaction_queue.read(); let chain_info = self.light_dispatch.client.chain_info(); Ok( txq.ready_transactions(chain_info.best_block_number, chain_info.best_block_timestamp) .into_iter() + .take(limit.unwrap_or_else(usize::max_value)) .map(|tx| Transaction::from_pending(tx, chain_info.best_block_number, self.eip86_transition)) .collect::>() ) diff --git a/rpc/src/v1/impls/parity.rs b/rpc/src/v1/impls/parity.rs index 3fa9cb991bb..048c5346bff 100644 --- a/rpc/src/v1/impls/parity.rs +++ b/rpc/src/v1/impls/parity.rs @@ -314,15 +314,19 @@ impl Parity for ParityClient where .map(Into::into) } - fn pending_transactions(&self) -> Result> { + fn pending_transactions(&self, limit: Trailing) -> Result> { let block_number = self.client.chain_info().best_block_number; - let ready_transactions = self.miner.ready_transactions(&*self.client); + let ready_transactions = self.miner.ready_transactions( + &*self.client, + limit.unwrap_or_else(usize::max_value), + miner::PendingOrdering::Priority, + ); Ok(ready_transactions .into_iter() .map(|t| Transaction::from_pending(t.pending().clone(), block_number, self.eip86_transition)) .collect() - ) + ) } fn all_transactions(&self) -> Result> { diff --git a/rpc/src/v1/traits/parity.rs b/rpc/src/v1/traits/parity.rs index 83d8b19811c..c06997b2bab 100644 --- a/rpc/src/v1/traits/parity.rs +++ b/rpc/src/v1/traits/parity.rs @@ -141,7 +141,7 @@ build_rpc_trait! { /// Returns all pending transactions from transaction queue. #[rpc(name = "parity_pendingTransactions")] - fn pending_transactions(&self) -> Result>; + fn pending_transactions(&self, Trailing) -> Result>; /// Returns all transactions from transaction queue. /// From bae903ef09505fe296be38d897635c51c2b90b1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 1 Jun 2018 15:02:06 +0200 Subject: [PATCH 03/13] Split timeout work into smaller timers. --- ethcore/sync/src/api.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index 9e6cdafa6ec..e8f1a6dd3dd 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -359,6 +359,10 @@ impl SyncProvider for EthSync { } } +const PEERS_TIMER: TimerToken = 0; +const SYNC_TIMER: TimerToken = 1; +const TX_TIMER: TimerToken = 2; + struct SyncProtocolHandler { /// Shared blockchain client. chain: Arc, @@ -373,7 +377,9 @@ struct SyncProtocolHandler { impl NetworkProtocolHandler for SyncProtocolHandler { fn initialize(&self, io: &NetworkContext, _host_info: &HostInfo) { if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { - io.register_timer(0, Duration::from_secs(1)).expect("Error registering sync timer"); + io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering sync timer"); + io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); + io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering sync timer"); } } @@ -399,12 +405,14 @@ impl NetworkProtocolHandler for SyncProtocolHandler { } } - fn timeout(&self, io: &NetworkContext, _timer: TimerToken) { + fn timeout(&self, io: &NetworkContext, timer: TimerToken) { trace_time!("sync::timeout"); let mut io = NetSyncIo::new(io, &*self.chain, &*self.snapshot_service, &self.overlay); - self.sync.write().maintain_peers(&mut io); - self.sync.write().maintain_sync(&mut io); - self.sync.write().propagate_new_transactions(&mut io); + match timer { + PEERS_TIMER => self.sync.write().maintain_peers(&mut io), + SYNC_TIMER => self.sync.write().maintain_sync(&mut io), + TX_TIMER => self.sync.write().propagate_new_transactions(&mut io), + } } } From 3804063db30a4b5c085c5c8ceeeeccc222921909 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 1 Jun 2018 15:30:42 +0200 Subject: [PATCH 04/13] Avoid collecting all pending transactions when mining --- ethcore/src/miner/miner.rs | 10 ++++++++-- ethcore/sync/src/api.rs | 5 ++++- miner/src/pool/queue.rs | 1 + 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 98e99db2c7f..8ef3bf04848 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -365,19 +365,25 @@ impl Miner { let client = self.pool_client(chain); let engine_params = self.engine.params(); - let min_tx_gas = self.engine.schedule(chain_info.best_block_number).tx_gas.into(); + let min_tx_gas: U256 = self.engine.schedule(chain_info.best_block_number).tx_gas.into(); let nonce_cap: Option = if chain_info.best_block_number + 1 >= engine_params.dust_protection_transition { Some((engine_params.nonce_cap_increment * (chain_info.best_block_number + 1)).into()) } else { None }; + // we will never need more transactions than limit divided by min gas + let max_transactions = if min_tx_gas.is_zero() { + usize::max_value() + } else { + (*open_block.block().header().gas_limit() / min_tx_gas).as_u64() as usize + }; let pending: Vec> = self.transaction_queue.pending( client.clone(), chain_info.best_block_number, chain_info.best_block_timestamp, nonce_cap, - usize::max_value(), + max_transactions, miner::PendingOrdering::Priority, ); diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index e8f1a6dd3dd..8062fb6f18d 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -411,7 +411,10 @@ impl NetworkProtocolHandler for SyncProtocolHandler { match timer { PEERS_TIMER => self.sync.write().maintain_peers(&mut io), SYNC_TIMER => self.sync.write().maintain_sync(&mut io), - TX_TIMER => self.sync.write().propagate_new_transactions(&mut io), + TX_TIMER => { + self.sync.write().propagate_new_transactions(&mut io); + }, + _ => warn!("Unknown timer {} triggered.", timer), } } } diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index 2935a44e879..11fa2a6904f 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -283,6 +283,7 @@ impl TransactionQueue { >) -> T, { debug!(target: "txqueue", "Re-computing pending set for block: {}", block_number); + let _timer = ::trace_time::PerfTimer::new("pool::collect_pending"); let ready = Self::ready(client, block_number, current_timestamp, nonce_cap); collect(self.pool.read().pending(ready)) } From 1c20ac133d885bab72e2fa9ed87e3fc65520392b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Fri, 1 Jun 2018 16:13:40 +0200 Subject: [PATCH 05/13] Remove println. --- miner/src/pool/listener.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/miner/src/pool/listener.rs b/miner/src/pool/listener.rs index 9a8856166b9..3f42372e840 100644 --- a/miner/src/pool/listener.rs +++ b/miner/src/pool/listener.rs @@ -51,7 +51,6 @@ impl Notifier { /// Notify listeners about all currently pending transactions. pub fn notify(&mut self) { for l in &self.listeners { - println!("Notifying about transactions: {:?}", self.pending); (l)(&self.pending); } From 6f154df1c47d5cddde11dd426a0e0d7cf5191b94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 4 Jun 2018 09:20:37 +0200 Subject: [PATCH 06/13] Use priority ordering in eth-filter. --- rpc/src/v1/impls/eth_filter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpc/src/v1/impls/eth_filter.rs b/rpc/src/v1/impls/eth_filter.rs index d228f93961f..ed9c00bf59c 100644 --- a/rpc/src/v1/impls/eth_filter.rs +++ b/rpc/src/v1/impls/eth_filter.rs @@ -85,7 +85,7 @@ impl Filterable for EthFilterClient where } fn pending_transactions_hashes(&self) -> Vec { - self.miner.ready_transactions(&*self.client, usize::max_value(), miner::PendingOrdering::Unordered) + self.miner.ready_transactions(&*self.client, usize::max_value(), miner::PendingOrdering::Priority) .into_iter() .map(|tx| tx.signed().hash()) .collect() From 45e8f77448021ccb5d3ebb10543c6b8b707f0a10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 4 Jun 2018 15:05:15 +0200 Subject: [PATCH 07/13] Fix ethcore-miner tests and tx propagation. --- ethcore/src/miner/miner.rs | 24 ++++++---- ethcore/sync/src/chain/mod.rs | 4 ++ ethcore/sync/src/chain/propagator.rs | 3 +- miner/src/pool/mod.rs | 30 +++++++++++- miner/src/pool/queue.rs | 15 +++--- miner/src/pool/tests/mod.rs | 68 +++++++++++++++------------- 6 files changed, 93 insertions(+), 51 deletions(-) diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 8ef3bf04848..7a19a58c500 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -380,11 +380,13 @@ impl Miner { let pending: Vec> = self.transaction_queue.pending( client.clone(), - chain_info.best_block_number, - chain_info.best_block_timestamp, - nonce_cap, - max_transactions, - miner::PendingOrdering::Priority, + pool::PendingSettings { + block_number: chain_info.best_block_number, + current_timestamp: chain_info.best_block_timestamp, + nonce_cap, + max_len: max_transactions, + ordering: miner::PendingOrdering::Priority, + } ); let took_ms = |elapsed: &Duration| { @@ -830,11 +832,13 @@ impl miner::MinerService for Miner { self.transaction_queue.pending( CachedNonceClient::new(chain, &self.nonce_cache), - chain_info.best_block_number, - chain_info.best_block_timestamp, - nonce_cap, - max_len, - ordering + pool::PendingSettings { + block_number: chain_info.best_block_number, + current_timestamp: chain_info.best_block_timestamp, + nonce_cap, + max_len, + ordering, + }, ) }; diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index c0ee8299b86..f7e84e95820 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -149,6 +149,10 @@ const MAX_NEW_HASHES: usize = 64; const MAX_NEW_BLOCK_AGE: BlockNumber = 20; // maximal packet size with transactions (cannot be greater than 16MB - protocol limitation). const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024; +// Maximal number of transactions queried from miner to propagate. +// This set is used to diff with transactions known by the peer and +// we will send a difference of length up to `MAX_TRANSACTIONS_TO_PROPAGATE`. +const MAX_TRANSACTIONS_TO_QUERY: usize = 1024; // Maximal number of transactions in sent in single packet. const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64; // Min number of blocks to be behind for a snapshot sync diff --git a/ethcore/sync/src/chain/propagator.rs b/ethcore/sync/src/chain/propagator.rs index da0c9302931..75cf550f28b 100644 --- a/ethcore/sync/src/chain/propagator.rs +++ b/ethcore/sync/src/chain/propagator.rs @@ -33,6 +33,7 @@ use super::{ MAX_PEERS_PROPAGATION, MAX_TRANSACTION_PACKET_SIZE, MAX_TRANSACTIONS_TO_PROPAGATE, + MAX_TRANSACTIONS_TO_QUERY, MIN_PEERS_PROPAGATION, CONSENSUS_DATA_PACKET, NEW_BLOCK_HASHES_PACKET, @@ -114,7 +115,7 @@ impl SyncPropagator { return 0; } - let transactions = io.chain().ready_transactions(MAX_TRANSACTIONS_TO_PROPAGATE); + let transactions = io.chain().ready_transactions(MAX_TRANSACTIONS_TO_QUERY); if transactions.is_empty() { return 0; } diff --git a/miner/src/pool/mod.rs b/miner/src/pool/mod.rs index cf78b521a83..465050b8f2c 100644 --- a/miner/src/pool/mod.rs +++ b/miner/src/pool/mod.rs @@ -16,7 +16,7 @@ //! Transaction Pool -use ethereum_types::{H256, Address}; +use ethereum_types::{U256, H256, Address}; use heapsize::HeapSizeOf; use transaction; use txpool; @@ -54,6 +54,34 @@ pub enum PendingOrdering { Unordered, } +/// Pending set query settings +#[derive(Debug, Clone)] +pub struct PendingSettings { + /// Current block number (affects readiness of some transactions). + pub block_number: u64, + /// Current timestamp (affects readiness of some transactions). + pub current_timestamp: u64, + /// Nonce cap (for dust protection; EIP-168) + pub nonce_cap: Option, + /// Maximal number of transactions in pending the set. + pub max_len: usize, + /// Ordering of transactions. + pub ordering: PendingOrdering, +} + +impl PendingSettings { + /// Get all transactions (no cap or len limit) prioritized. + pub fn all_prioritized(block_number: u64, current_timestamp: u64) -> Self { + PendingSettings { + block_number, + current_timestamp, + nonce_cap: None, + max_len: usize::max_value(), + ordering: PendingOrdering::Priority, + } + } +} + /// Transaction priority. #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub(crate) enum Priority { diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index 11fa2a6904f..ee3ae5b1196 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -27,7 +27,10 @@ use rayon::prelude::*; use transaction; use txpool::{self, Verifier}; -use pool::{self, scoring, verifier, client, ready, listener, PrioritizationStrategy, PendingOrdering}; +use pool::{ + self, scoring, verifier, client, ready, listener, + PrioritizationStrategy, PendingOrdering, PendingSettings, +}; use pool::local_transactions::LocalTransactionsList; type Listener = (LocalTransactionsList, (listener::Notifier, listener::Logger)); @@ -220,15 +223,11 @@ impl TransactionQueue { pub fn pending( &self, client: C, - block_number: u64, - current_timestamp: u64, - nonce_cap: Option, - max_len: usize, - ordering: PendingOrdering, + settings: PendingSettings, ) -> Vec> where C: client::NonceClient, { - + let PendingSettings { block_number, current_timestamp, nonce_cap, max_len, ordering } = settings; if let Some(pending) = self.cached_pending.read().pending(block_number, current_timestamp, nonce_cap.as_ref(), max_len) { return pending; } @@ -468,7 +467,7 @@ mod tests { fn should_get_pending_transactions() { let queue = TransactionQueue::new(txpool::Options::default(), verifier::Options::default(), PrioritizationStrategy::GasPriceOnly); - let pending: Vec<_> = queue.pending(TestClient::default(), 0, 0, None); + let pending: Vec<_> = queue.pending(TestClient::default(), PendingSettings::all_prioritized(0, 0)); for tx in pending { assert!(tx.signed().nonce > 0.into()); diff --git a/miner/src/pool/tests/mod.rs b/miner/src/pool/tests/mod.rs index 85dedaaa45b..6ab1b65b76c 100644 --- a/miner/src/pool/tests/mod.rs +++ b/miner/src/pool/tests/mod.rs @@ -18,7 +18,7 @@ use ethereum_types::U256; use transaction::{self, PendingTransaction}; use txpool; -use pool::{verifier, TransactionQueue, PrioritizationStrategy}; +use pool::{verifier, TransactionQueue, PrioritizationStrategy, PendingSettings, PendingOrdering}; pub mod tx; pub mod client; @@ -108,7 +108,7 @@ fn should_handle_same_transaction_imported_twice_with_different_state_nonces() { // and then there should be only one transaction in current (the one with higher gas_price) assert_eq!(res, vec![Ok(())]); assert_eq!(txq.status().status.transaction_count, 1); - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); } @@ -133,7 +133,7 @@ fn should_move_all_transactions_from_future() { // then assert_eq!(res, vec![Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); assert_eq!(top[1].hash, hash2); } @@ -207,7 +207,7 @@ fn should_import_txs_from_same_sender() { txq.import(TestClient::new(), txs.local().into_vec()); // then - let top = txq.pending(TestClient::new(), 0 ,0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0 ,0)); assert_eq!(top[0].hash, hash); assert_eq!(top[1].hash, hash2); assert_eq!(top.len(), 2); @@ -229,7 +229,7 @@ fn should_prioritize_local_transactions_within_same_nonce_height() { assert_eq!(res, vec![Ok(()), Ok(())]); // then - let top = txq.pending(client, 0, 0, None); + let top = txq.pending(client, PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); // local should be first assert_eq!(top[1].hash, hash2); assert_eq!(top.len(), 2); @@ -251,7 +251,7 @@ fn should_prioritize_reimported_transactions_within_same_nonce_height() { assert_eq!(res, vec![Ok(()), Ok(())]); // then - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); // retracted should be first assert_eq!(top[1].hash, hash2); assert_eq!(top.len(), 2); @@ -270,7 +270,7 @@ fn should_not_prioritize_local_transactions_with_different_nonce_height() { assert_eq!(res, vec![Ok(()), Ok(())]); // then - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top[0].hash, hash); assert_eq!(top[1].hash, hash2); assert_eq!(top.len(), 2); @@ -288,7 +288,7 @@ fn should_put_transaction_to_futures_if_gap_detected() { // then assert_eq!(res, vec![Ok(()), Ok(())]); - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top.len(), 1); assert_eq!(top[0].hash, hash); } @@ -308,9 +308,9 @@ fn should_handle_min_block() { assert_eq!(res, vec![Ok(()), Ok(())]); // then - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top.len(), 0); - let top = txq.pending(TestClient::new(), 1, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(1, 0)); assert_eq!(top.len(), 2); } @@ -341,7 +341,7 @@ fn should_move_transactions_if_gap_filled() { let res = txq.import(TestClient::new(), vec![tx, tx2].local()); assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); // when let res = txq.import(TestClient::new(), vec![tx1.local()]); @@ -349,7 +349,7 @@ fn should_move_transactions_if_gap_filled() { // then assert_eq!(txq.status().status.transaction_count, 3); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 3); } #[test] @@ -361,12 +361,12 @@ fn should_remove_transaction() { let res = txq.import(TestClient::default(), vec![tx, tx2].local()); assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); // when txq.cull(TestClient::new().with_nonce(124)); assert_eq!(txq.status().status.transaction_count, 1); - assert_eq!(txq.pending(TestClient::new().with_nonce(125), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new().with_nonce(125), PendingSettings::all_prioritized(0, 0)).len(), 1); txq.cull(TestClient::new().with_nonce(126)); // then @@ -384,19 +384,19 @@ fn should_move_transactions_to_future_if_gap_introduced() { let res = txq.import(TestClient::new(), vec![tx3, tx2].local()); assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); let res = txq.import(TestClient::new(), vec![tx].local()); assert_eq!(res, vec![Ok(())]); assert_eq!(txq.status().status.transaction_count, 3); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 3); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 3); // when txq.remove(vec![&hash], true); // then assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); } #[test] @@ -447,7 +447,7 @@ fn should_prefer_current_transactions_when_hitting_the_limit() { assert_eq!(res, vec![Ok(())]); assert_eq!(txq.status().status.transaction_count, 1); - let top = txq.pending(TestClient::new(), 0, 0, None); + let top = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); assert_eq!(top.len(), 1); assert_eq!(top[0].hash, hash); assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(124.into())); @@ -495,19 +495,19 @@ fn should_accept_same_transaction_twice_if_removed() { let res = txq.import(TestClient::new(), txs.local().into_vec()); assert_eq!(res, vec![Ok(()), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 2); // when txq.remove(vec![&hash], true); assert_eq!(txq.status().status.transaction_count, 1); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 0); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 0); let res = txq.import(TestClient::new(), vec![tx1].local()); assert_eq!(res, vec![Ok(())]); // then assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 2); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 2); } #[test] @@ -527,8 +527,8 @@ fn should_not_replace_same_transaction_if_the_fee_is_less_than_minimal_bump() { // then assert_eq!(res, vec![Err(transaction::Error::TooCheapToReplace), Ok(())]); assert_eq!(txq.status().status.transaction_count, 2); - assert_eq!(txq.pending(client.clone(), 0, 0, None)[0].signed().gas_price, U256::from(20)); - assert_eq!(txq.pending(client.clone(), 0, 0, None)[1].signed().gas_price, U256::from(2)); + assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0))[0].signed().gas_price, U256::from(20)); + assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0))[1].signed().gas_price, U256::from(2)); } #[test] @@ -570,7 +570,7 @@ fn should_return_valid_last_nonce_after_cull() { let client = TestClient::new().with_nonce(124); txq.cull(client.clone()); // tx2 should be not be promoted to current - assert_eq!(txq.pending(client.clone(), 0, 0, None).len(), 0); + assert_eq!(txq.pending(client.clone(), PendingSettings::all_prioritized(0, 0)).len(), 0); // then assert_eq!(txq.next_nonce(client.clone(), &sender), None); @@ -668,7 +668,7 @@ fn should_accept_local_transactions_below_min_gas_price() { assert_eq!(res, vec![Ok(())]); // then - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); } #[test] @@ -686,7 +686,7 @@ fn should_accept_local_service_transaction() { assert_eq!(res, vec![Ok(())]); // then - assert_eq!(txq.pending(TestClient::new(), 0, 0, None).len(), 1); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)).len(), 1); } #[test] @@ -727,9 +727,15 @@ fn should_not_return_transactions_over_nonce_cap() { assert_eq!(res, vec![Ok(()), Ok(()), Ok(())]); // when - let all = txq.pending(TestClient::new(), 0, 0, None); + let all = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); // This should invalidate the cache! - let limited = txq.pending(TestClient::new(), 0, 0, Some(123.into())); + let limited = txq.pending(TestClient::new(), PendingSettings { + block_number: 0, + current_timestamp: 0, + nonce_cap: Some(123.into()), + max_len: usize::max_value(), + ordering: PendingOrdering::Priority, + }); // then @@ -750,12 +756,12 @@ fn should_clear_cache_after_timeout_for_local() { // This should populate cache and set timestamp to 1 // when - assert_eq!(txq.pending(TestClient::new(), 0, 1, None).len(), 0); - assert_eq!(txq.pending(TestClient::new(), 0, 1000, None).len(), 0); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1)).len(), 0); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1000)).len(), 0); // This should invalidate the cache and trigger transaction ready. // then - assert_eq!(txq.pending(TestClient::new(), 0, 1002, None).len(), 2); + assert_eq!(txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 1002)).len(), 2); } #[test] From 2965e5b146ced7f0f75912ab25c5eff895b8b479 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 5 Jun 2018 15:58:36 +0200 Subject: [PATCH 08/13] Review grumbles addressed. --- ethcore/sync/src/api.rs | 4 ++-- ethcore/sync/src/chain/mod.rs | 2 +- miner/src/lib.rs | 3 ++- miner/src/pool/queue.rs | 4 ++-- transaction-pool/src/tests/mod.rs | 2 +- 5 files changed, 8 insertions(+), 7 deletions(-) diff --git a/ethcore/sync/src/api.rs b/ethcore/sync/src/api.rs index c7647b796fc..b8610d27b5d 100644 --- a/ethcore/sync/src/api.rs +++ b/ethcore/sync/src/api.rs @@ -377,9 +377,9 @@ struct SyncProtocolHandler { impl NetworkProtocolHandler for SyncProtocolHandler { fn initialize(&self, io: &NetworkContext) { if io.subprotocol_name() != WARP_SYNC_PROTOCOL_ID { - io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering sync timer"); + io.register_timer(PEERS_TIMER, Duration::from_millis(700)).expect("Error registering peers timer"); io.register_timer(SYNC_TIMER, Duration::from_millis(1100)).expect("Error registering sync timer"); - io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering sync timer"); + io.register_timer(TX_TIMER, Duration::from_millis(1300)).expect("Error registering transactions timer"); } } diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index f7e84e95820..079e8ce8e47 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -152,7 +152,7 @@ const MAX_TRANSACTION_PACKET_SIZE: usize = 8 * 1024 * 1024; // Maximal number of transactions queried from miner to propagate. // This set is used to diff with transactions known by the peer and // we will send a difference of length up to `MAX_TRANSACTIONS_TO_PROPAGATE`. -const MAX_TRANSACTIONS_TO_QUERY: usize = 1024; +const MAX_TRANSACTIONS_TO_QUERY: usize = 4096; // Maximal number of transactions in sent in single packet. const MAX_TRANSACTIONS_TO_PROPAGATE: usize = 64; // Min number of blocks to be behind for a snapshot sync diff --git a/miner/src/lib.rs b/miner/src/lib.rs index 08ea7d204fb..5305e67b4df 100644 --- a/miner/src/lib.rs +++ b/miner/src/lib.rs @@ -31,13 +31,14 @@ extern crate parking_lot; extern crate price_info; extern crate rayon; extern crate rlp; -extern crate trace_time; extern crate transaction_pool as txpool; #[macro_use] extern crate error_chain; #[macro_use] extern crate log; +#[macro_use] +extern crate trace_time; #[cfg(test)] extern crate rustc_hex; diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index ee3ae5b1196..0f92d74937e 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -185,7 +185,7 @@ impl TransactionQueue { transactions: Vec, ) -> Vec> { // Run verification - let _timer = ::trace_time::PerfTimer::new("queue::verifyAndImport"); + trace_time!("queue::verifyAndImport"); let options = self.options.read().clone(); let verifier = verifier::Verifier::new(client, options, self.insertion_id.clone()); @@ -282,7 +282,7 @@ impl TransactionQueue { >) -> T, { debug!(target: "txqueue", "Re-computing pending set for block: {}", block_number); - let _timer = ::trace_time::PerfTimer::new("pool::collect_pending"); + trace_time!("pool::collect_pending"); let ready = Self::ready(client, block_number, current_timestamp, nonce_cap); collect(self.pool.read().pending(ready)) } diff --git a/transaction-pool/src/tests/mod.rs b/transaction-pool/src/tests/mod.rs index a8f6d9f3014..9d78a6dd8be 100644 --- a/transaction-pool/src/tests/mod.rs +++ b/transaction-pool/src/tests/mod.rs @@ -259,7 +259,7 @@ fn should_return_unordered_iterator() { let tx0 = txq.import(b.tx().nonce(0).gas_price(5).new()).unwrap(); let tx1 = txq.import(b.tx().nonce(1).gas_price(5).new()).unwrap(); let tx2 = txq.import(b.tx().nonce(2).new()).unwrap(); - let tx3 =txq.import(b.tx().nonce(3).gas_price(4).new()).unwrap(); + let tx3 = txq.import(b.tx().nonce(3).gas_price(4).new()).unwrap(); //gap txq.import(b.tx().nonce(5).new()).unwrap(); From 6e219f47cf36b24bc0dcde7bc7ff8caa3bdd12cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 5 Jun 2018 16:30:05 +0200 Subject: [PATCH 09/13] Add test for unordered not populating the cache. --- miner/src/pool/queue.rs | 6 ++++ miner/src/pool/tests/mod.rs | 56 +++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/miner/src/pool/queue.rs b/miner/src/pool/queue.rs index 0f92d74937e..66dedda6f23 100644 --- a/miner/src/pool/queue.rs +++ b/miner/src/pool/queue.rs @@ -441,6 +441,12 @@ impl TransactionQueue { let mut pool = self.pool.write(); (pool.listener_mut().1).0.add(f); } + + /// Check if pending set is cached. + #[cfg(test)] + pub fn is_pending_cached(&self) -> bool { + self.cached_pending.read().pending.is_some() + } } diff --git a/miner/src/pool/tests/mod.rs b/miner/src/pool/tests/mod.rs index 6ab1b65b76c..60c5beda764 100644 --- a/miner/src/pool/tests/mod.rs +++ b/miner/src/pool/tests/mod.rs @@ -743,6 +743,62 @@ fn should_not_return_transactions_over_nonce_cap() { assert_eq!(limited.len(), 1); } +#[test] +fn should_return_cached_pending_even_if_unordered_is_requested() { + // given + let txq = new_queue(); + let tx1 = Tx::default().signed(); + let (tx2_1, tx2_2)= Tx::default().signed_pair(); + let tx2_1_hash = tx2_1.hash(); + let res = txq.import(TestClient::new(), vec![tx1].unverified()); + assert_eq!(res, vec![Ok(())]); + let res = txq.import(TestClient::new(), vec![tx2_1, tx2_2].local()); + assert_eq!(res, vec![Ok(()), Ok(())]); + + // when + let all = txq.pending(TestClient::new(), PendingSettings::all_prioritized(0, 0)); + assert_eq!(all[0].hash, tx2_1_hash); + assert_eq!(all.len(), 3); + + // This should not invalidate the cache! + let limited = txq.pending(TestClient::new(), PendingSettings { + block_number: 0, + current_timestamp: 0, + nonce_cap: None, + max_len: 3, + ordering: PendingOrdering::Unordered, + }); + + // then + assert_eq!(all, limited); +} + +#[test] +fn should_return_unordered_and_not_populate_the_cache() { + // given + let txq = new_queue(); + let tx1 = Tx::default().signed(); + let (tx2_1, tx2_2)= Tx::default().signed_pair(); + let res = txq.import(TestClient::new(), vec![tx1].unverified()); + assert_eq!(res, vec![Ok(())]); + let res = txq.import(TestClient::new(), vec![tx2_1, tx2_2].local()); + assert_eq!(res, vec![Ok(()), Ok(())]); + + // when + // This should not invalidate the cache! + let limited = txq.pending(TestClient::new(), PendingSettings { + block_number: 0, + current_timestamp: 0, + nonce_cap: None, + max_len: usize::max_value(), + ordering: PendingOrdering::Unordered, + }); + + // then + assert_eq!(limited.len(), 3); + assert!(!txq.is_pending_cached()); +} + #[test] fn should_clear_cache_after_timeout_for_local() { // given From c805564dbf4f42debf70e608b856bd5090570b4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Tue, 5 Jun 2018 16:45:43 +0200 Subject: [PATCH 10/13] Fix ethcore tests. --- ethcore/src/miner/miner.rs | 10 +++++----- ethcore/src/tests/client.rs | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/ethcore/src/miner/miner.rs b/ethcore/src/miner/miner.rs index 7a19a58c500..f2831b5cb03 100644 --- a/ethcore/src/miner/miner.rs +++ b/ethcore/src/miner/miner.rs @@ -1102,7 +1102,7 @@ mod tests { use rustc_hex::FromHex; use client::{TestBlockChainClient, EachBlockWith, ChainInfo, ImportSealedBlock}; - use miner::MinerService; + use miner::{MinerService, PendingOrdering}; use test_helpers::{generate_dummy_client, generate_dummy_client_with_spec_and_accounts}; use transaction::{Transaction}; @@ -1198,7 +1198,7 @@ mod tests { assert_eq!(res.unwrap(), ()); assert_eq!(miner.pending_transactions(best_block).unwrap().len(), 1); assert_eq!(miner.pending_receipts(best_block).unwrap().len(), 1); - assert_eq!(miner.ready_transactions(&client).len(), 1); + assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1); // This method will let us know if pending block was created (before calling that method) assert!(!miner.prepare_pending_block(&client)); } @@ -1217,7 +1217,7 @@ mod tests { assert_eq!(res.unwrap(), ()); assert_eq!(miner.pending_transactions(best_block), None); assert_eq!(miner.pending_receipts(best_block), None); - assert_eq!(miner.ready_transactions(&client).len(), 1); + assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1); } #[test] @@ -1236,11 +1236,11 @@ mod tests { assert_eq!(miner.pending_transactions(best_block), None); assert_eq!(miner.pending_receipts(best_block), None); // By default we use PendingSet::AlwaysSealing, so no transactions yet. - assert_eq!(miner.ready_transactions(&client).len(), 0); + assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 0); // This method will let us know if pending block was created (before calling that method) assert!(miner.prepare_pending_block(&client)); // After pending block is created we should see a transaction. - assert_eq!(miner.ready_transactions(&client).len(), 1); + assert_eq!(miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1); } #[test] diff --git a/ethcore/src/tests/client.rs b/ethcore/src/tests/client.rs index 6dcad9ba62f..dbbd5004172 100644 --- a/ethcore/src/tests/client.rs +++ b/ethcore/src/tests/client.rs @@ -30,7 +30,7 @@ use test_helpers::{ use types::filter::Filter; use ethereum_types::{U256, Address}; use kvdb_rocksdb::{Database, DatabaseConfig}; -use miner::Miner; +use miner::{Miner, PendingOrdering}; use spec::Spec; use views::BlockView; use ethkey::KeyPair; @@ -345,12 +345,12 @@ fn does_not_propagate_delayed_transactions() { client.miner().import_own_transaction(&*client, tx0).unwrap(); client.miner().import_own_transaction(&*client, tx1).unwrap(); - assert_eq!(0, client.ready_transactions().len()); - assert_eq!(0, client.miner().ready_transactions(&*client).len()); + assert_eq!(0, client.ready_transactions(10).len()); + assert_eq!(0, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len()); push_blocks_to_client(&client, 53, 2, 2); client.flush_queue(); - assert_eq!(2, client.ready_transactions().len()); - assert_eq!(2, client.miner().ready_transactions(&*client).len()); + assert_eq!(2, client.ready_transactions(10).len()); + assert_eq!(2, client.miner().ready_transactions(&*client, 10, PendingOrdering::Priority).len()); } #[test] From 12854315928d17c16317cc6abe9c8ebccb480d35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Wed, 6 Jun 2018 10:22:27 +0200 Subject: [PATCH 11/13] Fix light tests. --- ethcore/light/src/net/tests/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ethcore/light/src/net/tests/mod.rs b/ethcore/light/src/net/tests/mod.rs index 3c04c0ffba6..a653a8a7a9b 100644 --- a/ethcore/light/src/net/tests/mod.rs +++ b/ethcore/light/src/net/tests/mod.rs @@ -173,8 +173,8 @@ impl Provider for TestProvider { }) } - fn ready_transactions(&self) -> Vec { - self.0.client.ready_transactions() + fn ready_transactions(&self, max_len: usize) -> Vec { + self.0.client.ready_transactions(max_len) } } From f8fdea4af60dfe72982c1660860effbae8191437 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Thu, 7 Jun 2018 11:42:13 +0200 Subject: [PATCH 12/13] Fix ethcore-sync tests. --- ethcore/sync/src/chain/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ethcore/sync/src/chain/mod.rs b/ethcore/sync/src/chain/mod.rs index 412348c265c..84e6344e688 100644 --- a/ethcore/sync/src/chain/mod.rs +++ b/ethcore/sync/src/chain/mod.rs @@ -1147,7 +1147,7 @@ pub mod tests { use super::{PeerInfo, PeerAsking}; use ethcore::header::*; use ethcore::client::{BlockChainClient, EachBlockWith, TestBlockChainClient, ChainInfo, BlockInfo}; - use ethcore::miner::MinerService; + use ethcore::miner::{MinerService, PendingOrdering}; use private_tx::NoopPrivateTxHandler; pub fn get_dummy_block(order: u32, parent_hash: H256) -> Bytes { @@ -1359,7 +1359,7 @@ pub mod tests { let mut io = TestIo::new(&mut client, &ss, &queue, None); io.chain.miner.chain_new_blocks(io.chain, &[], &[], &[], &good_blocks, false); sync.chain_new_blocks(&mut io, &[], &[], &[], &good_blocks, &[], &[]); - assert_eq!(io.chain.miner.ready_transactions(io.chain).len(), 1); + assert_eq!(io.chain.miner.ready_transactions(io.chain, 10, PendingOrdering::Priority).len(), 1); } // We need to update nonce status (because we say that the block has been imported) for h in &[good_blocks[0]] { @@ -1375,7 +1375,7 @@ pub mod tests { } // then - assert_eq!(client.miner.ready_transactions(&client).len(), 1); + assert_eq!(client.miner.ready_transactions(&client, 10, PendingOrdering::Priority).len(), 1); } #[test] From 9fe496d7c717547ac36ee7917aa76e91d6aac457 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tomasz=20Drwi=C4=99ga?= Date: Mon, 11 Jun 2018 11:36:57 +0200 Subject: [PATCH 13/13] Fix RPC tests. --- rpc/Cargo.toml | 2 +- rpc/src/v1/tests/helpers/miner_service.rs | 4 ++-- rpc/src/v1/tests/mocked/eth_pubsub.rs | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 8a0b689c655..5c36171dece 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -36,7 +36,7 @@ jsonrpc-macros = { git = "https://github.com/paritytech/jsonrpc.git", branch = " jsonrpc-pubsub = { git = "https://github.com/paritytech/jsonrpc.git", branch = "parity-1.11" } ethash = { path = "../ethash" } -ethcore = { path = "../ethcore" } +ethcore = { path = "../ethcore", features = ["test-helpers"] } ethcore-bytes = { path = "../util/bytes" } ethcore-crypto = { path = "../ethcore/crypto" } ethcore-devtools = { path = "../devtools" } diff --git a/rpc/src/v1/tests/helpers/miner_service.rs b/rpc/src/v1/tests/helpers/miner_service.rs index 90201e346a5..8d0ec23ae1c 100644 --- a/rpc/src/v1/tests/helpers/miner_service.rs +++ b/rpc/src/v1/tests/helpers/miner_service.rs @@ -27,7 +27,7 @@ use ethcore::engines::EthEngine; use ethcore::error::Error; use ethcore::header::{BlockNumber, Header}; use ethcore::ids::BlockId; -use ethcore::miner::{MinerService, AuthoringParams}; +use ethcore::miner::{self, MinerService, AuthoringParams}; use ethcore::receipt::{Receipt, RichReceipt}; use ethereum_types::{H256, U256, Address}; use miner::pool::local_transactions::Status as LocalTransactionStatus; @@ -208,7 +208,7 @@ impl MinerService for TestMinerService { self.local_transactions.lock().iter().map(|(hash, stats)| (*hash, stats.clone())).collect() } - fn ready_transactions(&self, _chain: &C) -> Vec> { + fn ready_transactions(&self, _chain: &C, _max_len: usize, _ordering: miner::PendingOrdering) -> Vec> { self.queued_transactions() } diff --git a/rpc/src/v1/tests/mocked/eth_pubsub.rs b/rpc/src/v1/tests/mocked/eth_pubsub.rs index 0d886fe2f1f..30c99fc67ac 100644 --- a/rpc/src/v1/tests/mocked/eth_pubsub.rs +++ b/rpc/src/v1/tests/mocked/eth_pubsub.rs @@ -181,7 +181,7 @@ fn should_subscribe_to_pending_transactions() { assert_eq!(io.handle_request_sync(request, metadata.clone()), Some(response.to_owned())); // Send new transactions - handler.new_transactions(&[5.into(), 7.into()]); + handler.notify_new_transactions(&[5.into(), 7.into()]); let (res, receiver) = receiver.into_future().wait().unwrap(); let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":"0x0000000000000000000000000000000000000000000000000000000000000005","subscription":"0x416d77337e24399d"}}"#;