Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Never drop local transactions from different senders. #9002

Merged
merged 2 commits into from
Jul 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 46 additions & 38 deletions miner/src/pool/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
use std::cmp;

use ethereum_types::U256;
use txpool;
use txpool::{self, scoring};
use super::{verifier, PrioritizationStrategy, VerifiedTransaction};

/// Transaction with the same (sender, nonce) can be replaced only if
Expand Down Expand Up @@ -75,9 +75,9 @@ impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
old.transaction.nonce.cmp(&other.transaction.nonce)
}

fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> txpool::scoring::Choice {
fn choose(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> scoring::Choice {
if old.transaction.nonce != new.transaction.nonce {
return txpool::scoring::Choice::InsertNew
return scoring::Choice::InsertNew
}

let old_gp = old.transaction.gas_price;
Expand All @@ -86,13 +86,13 @@ impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
let min_required_gp = bump_gas_price(old_gp);

match min_required_gp.cmp(&new_gp) {
cmp::Ordering::Greater => txpool::scoring::Choice::RejectNew,
_ => txpool::scoring::Choice::ReplaceOld,
cmp::Ordering::Greater => scoring::Choice::RejectNew,
_ => scoring::Choice::ReplaceOld,
}
}

fn update_scores(&self, txs: &[txpool::Transaction<VerifiedTransaction>], scores: &mut [U256], change: txpool::scoring::Change) {
use self::txpool::scoring::Change;
fn update_scores(&self, txs: &[txpool::Transaction<VerifiedTransaction>], scores: &mut [U256], change: scoring::Change) {
use self::scoring::Change;

match change {
Change::Culled(_) => {},
Expand Down Expand Up @@ -122,19 +122,26 @@ impl txpool::Scoring<VerifiedTransaction> for NonceAndGasPrice {
}
}

fn should_replace(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> bool {
fn should_replace(&self, old: &VerifiedTransaction, new: &VerifiedTransaction) -> scoring::Choice {
if old.sender == new.sender {
// prefer earliest transaction
match new.transaction.nonce.cmp(&old.transaction.nonce) {
cmp::Ordering::Less => true,
cmp::Ordering::Greater => false,
cmp::Ordering::Equal => self.choose(old, new) == txpool::scoring::Choice::ReplaceOld,
cmp::Ordering::Less => scoring::Choice::ReplaceOld,
cmp::Ordering::Greater => scoring::Choice::RejectNew,
cmp::Ordering::Equal => self.choose(old, new),
}
} else if old.priority().is_local() && new.priority().is_local() {
// accept local transactions over the limit
scoring::Choice::InsertNew
} else {
let old_score = (old.priority(), old.transaction.gas_price);
let new_score = (new.priority(), new.transaction.gas_price);
new_score > old_score
}
if new_score > old_score {
scoring::Choice::ReplaceOld
} else {
scoring::Choice::RejectNew
}
}
}
}

Expand All @@ -146,6 +153,7 @@ mod tests {
use ethkey::{Random, Generator};
use pool::tests::tx::{Tx, TxExt};
use txpool::Scoring;
use txpool::scoring::Choice::*;

#[test]
fn should_replace_same_sender_by_nonce() {
Expand Down Expand Up @@ -181,14 +189,14 @@ mod tests {
}
}).collect::<Vec<_>>();

assert!(!scoring.should_replace(&txs[0], &txs[1]));
assert!(scoring.should_replace(&txs[1], &txs[0]));
assert_eq!(scoring.should_replace(&txs[0], &txs[1]), RejectNew);
assert_eq!(scoring.should_replace(&txs[1], &txs[0]), ReplaceOld);

assert!(!scoring.should_replace(&txs[1], &txs[2]));
assert!(!scoring.should_replace(&txs[2], &txs[1]));
assert_eq!(scoring.should_replace(&txs[1], &txs[2]), RejectNew);
assert_eq!(scoring.should_replace(&txs[2], &txs[1]), RejectNew);

assert!(scoring.should_replace(&txs[1], &txs[3]));
assert!(!scoring.should_replace(&txs[3], &txs[1]));
assert_eq!(scoring.should_replace(&txs[1], &txs[3]), ReplaceOld);
assert_eq!(scoring.should_replace(&txs[3], &txs[1]), RejectNew);
}

#[test]
Expand Down Expand Up @@ -246,14 +254,14 @@ mod tests {
}
};

assert!(scoring.should_replace(&tx_regular_low_gas, &tx_regular_high_gas));
assert!(!scoring.should_replace(&tx_regular_high_gas, &tx_regular_low_gas));
assert_eq!(scoring.should_replace(&tx_regular_low_gas, &tx_regular_high_gas), ReplaceOld);
assert_eq!(scoring.should_replace(&tx_regular_high_gas, &tx_regular_low_gas), RejectNew);

assert!(scoring.should_replace(&tx_regular_high_gas, &tx_local_low_gas));
assert!(!scoring.should_replace(&tx_local_low_gas, &tx_regular_high_gas));
assert_eq!(scoring.should_replace(&tx_regular_high_gas, &tx_local_low_gas), ReplaceOld);
assert_eq!(scoring.should_replace(&tx_local_low_gas, &tx_regular_high_gas), RejectNew);

assert!(scoring.should_replace(&tx_local_low_gas, &tx_local_high_gas));
assert!(!scoring.should_replace(&tx_local_high_gas, &tx_regular_low_gas));
assert_eq!(scoring.should_replace(&tx_local_low_gas, &tx_local_high_gas), InsertNew);
assert_eq!(scoring.should_replace(&tx_local_high_gas, &tx_regular_low_gas), RejectNew);
}

#[test]
Expand All @@ -277,35 +285,35 @@ mod tests {

// No update required
let mut scores = initial_scores.clone();
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Culled(0));
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Culled(1));
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Culled(2));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::Culled(0));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::Culled(1));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::Culled(2));
assert_eq!(scores, initial_scores);
let mut scores = initial_scores.clone();
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::RemovedAt(0));
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::RemovedAt(1));
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::RemovedAt(2));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::RemovedAt(0));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::RemovedAt(1));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::RemovedAt(2));
assert_eq!(scores, initial_scores);

// Compute score at given index
let mut scores = initial_scores.clone();
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::InsertedAt(0));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::InsertedAt(0));
assert_eq!(scores, vec![32768.into(), 0.into(), 0.into()]);
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::InsertedAt(1));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::InsertedAt(1));
assert_eq!(scores, vec![32768.into(), 1024.into(), 0.into()]);
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::InsertedAt(2));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::InsertedAt(2));
assert_eq!(scores, vec![32768.into(), 1024.into(), 1.into()]);

let mut scores = initial_scores.clone();
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::ReplacedAt(0));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::ReplacedAt(0));
assert_eq!(scores, vec![32768.into(), 0.into(), 0.into()]);
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::ReplacedAt(1));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::ReplacedAt(1));
assert_eq!(scores, vec![32768.into(), 1024.into(), 0.into()]);
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::ReplacedAt(2));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::ReplacedAt(2));
assert_eq!(scores, vec![32768.into(), 1024.into(), 1.into()]);

// Check penalization
scoring.update_scores(&transactions, &mut *scores, txpool::scoring::Change::Event(()));
scoring.update_scores(&transactions, &mut *scores, scoring::Change::Event(()));
assert_eq!(scores, vec![32768.into(), 128.into(), 0.into()]);
}
}
62 changes: 56 additions & 6 deletions miner/src/pool/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ fn new_queue() -> TransactionQueue {
PrioritizationStrategy::GasPriceOnly,
)
}

#[test]
fn should_return_correct_nonces_when_dropped_because_of_limit() {
// given
Expand All @@ -63,8 +62,8 @@ fn should_return_correct_nonces_when_dropped_because_of_limit() {
let nonce = tx1.nonce;

// when
let r1= txq.import(TestClient::new(), vec![tx1].local());
let r2= txq.import(TestClient::new(), vec![tx2].local());
let r1 = txq.import(TestClient::new(), vec![tx1].retracted());
let r2 = txq.import(TestClient::new(), vec![tx2].retracted());
assert_eq!(r1, vec![Ok(())]);
assert_eq!(r2, vec![Err(transaction::Error::LimitReached)]);
assert_eq!(txq.status().status.transaction_count, 1);
Expand All @@ -77,17 +76,68 @@ fn should_return_correct_nonces_when_dropped_because_of_limit() {
let tx2 = Tx::gas_price(2).signed();
let tx3 = Tx::gas_price(1).signed();
let tx4 = Tx::gas_price(3).signed();
let res = txq.import(TestClient::new(), vec![tx1, tx2].local());
let res2 = txq.import(TestClient::new(), vec![tx3, tx4].local());
let res = txq.import(TestClient::new(), vec![tx1, tx2].retracted());
let res2 = txq.import(TestClient::new(), vec![tx3, tx4].retracted());

// then
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(res2, vec![Err(transaction::Error::LimitReached), Ok(())]);
assert_eq!(res2, vec![
// The error here indicates reaching the limit
// and minimal effective gas price taken into account.
Err(transaction::Error::InsufficientGasPrice { minimal: 2.into(), got: 1.into() }),
Ok(())
]);
assert_eq!(txq.status().status.transaction_count, 3);
// First inserted transacton got dropped because of limit
assert_eq!(txq.next_nonce(TestClient::new(), &sender), None);
}

#[test]
fn should_never_drop_local_transactions_from_different_senders() {
// given
let txq = TransactionQueue::new(
txpool::Options {
max_count: 3,
max_per_sender: 1,
max_mem_usage: 50
},
verifier::Options {
minimal_gas_price: 1.into(),
block_gas_limit: 1_000_000.into(),
tx_gas_limit: 1_000_000.into(),
},
PrioritizationStrategy::GasPriceOnly,
);
let (tx1, tx2) = Tx::gas_price(2).signed_pair();
let sender = tx1.sender();
let nonce = tx1.nonce;

// when
let r1 = txq.import(TestClient::new(), vec![tx1].local());
let r2 = txq.import(TestClient::new(), vec![tx2].local());
assert_eq!(r1, vec![Ok(())]);
// max-per-sender is reached, that's ok.
assert_eq!(r2, vec![Err(transaction::Error::LimitReached)]);
assert_eq!(txq.status().status.transaction_count, 1);

// then
assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(nonce + 1.into()));

// when
let tx1 = Tx::gas_price(2).signed();
let tx2 = Tx::gas_price(2).signed();
let tx3 = Tx::gas_price(1).signed();
let tx4 = Tx::gas_price(3).signed();
let res = txq.import(TestClient::new(), vec![tx1, tx2].local());
let res2 = txq.import(TestClient::new(), vec![tx3, tx4].local());

// then
assert_eq!(res, vec![Ok(()), Ok(())]);
assert_eq!(res2, vec![Ok(()), Ok(())]);
assert_eq!(txq.status().status.transaction_count, 5);
assert_eq!(txq.next_nonce(TestClient::new(), &sender), Some(nonce + 1.into()));
}

#[test]
fn should_handle_same_transaction_imported_twice_with_different_state_nonces() {
// given
Expand Down
53 changes: 34 additions & 19 deletions transaction-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use error;
use listener::{Listener, NoopListener};
use options::Options;
use ready::{Ready, Readiness};
use scoring::{Scoring, ScoreWithRef};
use scoring::{self, Scoring, ScoreWithRef};
use status::{LightStatus, Status};
use transactions::{AddResult, Transactions};

Expand Down Expand Up @@ -139,7 +139,7 @@ impl<T, S, L> Pool<T, S, L> where
ensure!(!self.by_hash.contains_key(transaction.hash()), error::ErrorKind::AlreadyImported(format!("{:?}", transaction.hash())));

self.insertion_id += 1;
let mut transaction = Transaction {
let transaction = Transaction {
insertion_id: self.insertion_id,
transaction: Arc::new(transaction),
};
Expand All @@ -148,27 +148,32 @@ impl<T, S, L> Pool<T, S, L> where
// Avoid using should_replace, but rather use scoring for that.
{
let remove_worst = |s: &mut Self, transaction| {
match s.remove_worst(&transaction) {
match s.remove_worst(transaction) {
Err(err) => {
s.listener.rejected(&transaction, err.kind());
s.listener.rejected(transaction, err.kind());
Err(err)
},
Ok(removed) => {
s.listener.dropped(&removed, Some(&transaction));
Ok(None) => Ok(false),
Ok(Some(removed)) => {
s.listener.dropped(&removed, Some(transaction));
s.finalize_remove(removed.hash());
Ok(transaction)
Ok(true)
},
}
};

while self.by_hash.len() + 1 > self.options.max_count {
trace!("Count limit reached: {} > {}", self.by_hash.len() + 1, self.options.max_count);
transaction = remove_worst(self, transaction)?;
if !remove_worst(self, &transaction)? {
break;
}
}

while self.mem_usage + mem_usage > self.options.max_mem_usage {
trace!("Mem limit reached: {} > {}", self.mem_usage + mem_usage, self.options.max_mem_usage);
transaction = remove_worst(self, transaction)?;
if !remove_worst(self, &transaction)? {
break;
}
}
}

Expand Down Expand Up @@ -273,28 +278,38 @@ impl<T, S, L> Pool<T, S, L> where
}

/// Attempts to remove the worst transaction from the pool if it's worse than the given one.
fn remove_worst(&mut self, transaction: &Transaction<T>) -> error::Result<Transaction<T>> {
///
/// Returns `None` in case we couldn't decide if the transaction should replace the worst transaction or not.
/// In such case we will accept the transaction even though it is going to exceed the limit.
fn remove_worst(&mut self, transaction: &Transaction<T>) -> error::Result<Option<Transaction<T>>> {
let to_remove = match self.worst_transactions.iter().next_back() {
// No elements to remove? and the pool is still full?
None => {
warn!("The pool is full but there are no transactions to remove.");
return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), "unknown".into()).into());
},
Some(old) => if self.scoring.should_replace(&old.transaction, transaction) {
Some(old) => match self.scoring.should_replace(&old.transaction, transaction) {
// We can't decide which of them should be removed, so accept both.
scoring::Choice::InsertNew => None,
// New transaction is better than the worst one so we can replace it.
old.clone()
} else {
scoring::Choice::ReplaceOld => Some(old.clone()),
// otherwise fail
return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), format!("{:?}", old.score)).into())
scoring::Choice::RejectNew => {
return Err(error::ErrorKind::TooCheapToEnter(format!("{:?}", transaction.hash()), format!("{:?}", old.score)).into())
},
},
};

// Remove from transaction set
self.remove_from_set(to_remove.transaction.sender(), |set, scoring| {
set.remove(&to_remove.transaction, scoring)
});
if let Some(to_remove) = to_remove {
// Remove from transaction set
self.remove_from_set(to_remove.transaction.sender(), |set, scoring| {
set.remove(&to_remove.transaction, scoring)
});

Ok(to_remove.transaction)
Ok(Some(to_remove.transaction))
} else {
Ok(None)
}
}

/// Removes transaction from sender's transaction `HashMap`.
Expand Down
4 changes: 3 additions & 1 deletion transaction-pool/src/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ pub trait Scoring<T>: fmt::Debug {
fn update_scores(&self, txs: &[Transaction<T>], scores: &mut [Self::Score], change: Change<Self::Event>);

/// Decides if `new` should push out `old` transaction from the pool.
fn should_replace(&self, old: &T, new: &T) -> bool;
///
/// NOTE returning `InsertNew` here can lead to some transactions being accepted above pool limits.
fn should_replace(&self, old: &T, new: &T) -> Choice;
}

/// A score with a reference to the transaction.
Expand Down
Loading