Skip to content

Commit

Permalink
Merge branch 'develop' into ci-check-relaxed
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyukang committed Sep 2, 2024
2 parents 3d78da8 + a73b7f4 commit ea3651f
Show file tree
Hide file tree
Showing 4 changed files with 229 additions and 22 deletions.
1 change: 1 addition & 0 deletions test/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(RbfReplaceProposedSuccess),
Box::new(RbfConcurrency),
Box::new(RbfCellDepsCheck),
Box::new(RbfCyclingAttack),
Box::new(CompactBlockEmpty),
Box::new(CompactBlockEmptyParentUnknown),
Box::new(CompactBlockPrefilled),
Expand Down
136 changes: 136 additions & 0 deletions test/src/specs/tx_pool/replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,142 @@ impl Spec for RbfCellDepsCheck {
}
}

pub struct RbfCyclingAttack;
impl Spec for RbfCyclingAttack {
fn run(&self, nodes: &mut Vec<Node>) {
let node0 = &nodes[0];

let initial_inputs = gen_spendable(node0, 3);
let input_a = &initial_inputs[0];
let input_b = &initial_inputs[1];
let input_c = &initial_inputs[2];

let input_c: CellInput = CellInput::new_builder()
.previous_output(input_c.out_point.clone())
.build();

// Commit transaction root
let tx_a = {
let tx_a = always_success_transaction(node0, input_a);
node0.submit_transaction(&tx_a);
tx_a
};

let tx_b = {
let tx_b = always_success_transaction(node0, input_b);
node0.submit_transaction(&tx_b);
tx_b
};

let mut prev = tx_a.clone();
// Create transaction chain, A0 -> A1 -> A2
let mut txs_chain_a = vec![tx_a];
for _i in 0..2 {
let input =
CellMetaBuilder::from_cell_output(prev.output(0).unwrap(), Default::default())
.out_point(OutPoint::new(prev.hash(), 0))
.build();
let cur = always_success_transaction(node0, &input);
txs_chain_a.push(cur.clone());
let _ = node0.rpc_client().send_transaction(cur.data().into());
prev = cur.clone();
}

// Create transaction chain, B0 -> B1
let mut txs_chain_b = vec![tx_b.clone()];
let mut prev = tx_b;
for _i in 0..1 {
let input =
CellMetaBuilder::from_cell_output(prev.output(0).unwrap(), Default::default())
.out_point(OutPoint::new(prev.hash(), 0))
.build();
let cur = always_success_transaction(node0, &input);
txs_chain_b.push(cur.clone());
let _ = node0.rpc_client().send_transaction(cur.data().into());
prev = cur.clone();
}
let tx_b1 = txs_chain_b[1].clone();
eprintln!("tx_b1 {:?}", tx_b1.proposal_short_id());

// Create a child transaction consume B0 and A1
// A0 ---> A1 ---> A2
// |
// ----------> B2
// |
// B0 ---> B1
let tx_a1 = &txs_chain_a[1];
let tx_b0 = &txs_chain_b[0];

let input_a1: CellInput = CellInput::new_builder()
.previous_output(OutPoint::new(tx_a1.hash(), 0))
.build();
let input_b0 = CellInput::new_builder()
.previous_output(OutPoint::new(tx_b0.hash(), 0))
.build();

let tx_b2_output = CellOutputBuilder::default()
.capacity(capacity_bytes!(200).pack())
.build();
let tx_b2 = tx_a1
.as_advanced_builder()
.set_inputs(vec![input_a1, input_b0])
.set_outputs(vec![tx_b2_output])
.build();
let res = node0.rpc_client().send_transaction(tx_b2.data().into());
eprintln!("tx_b2 {:?}", res);

// after A2 and B1 is replaced by B2
// A0 ---> A1
// |
// ----------> B2
// |
// B0
let res = node0.rpc_client().get_transaction(tx_b2.hash());
assert_eq!(res.tx_status.status, Status::Pending);
let res = node0.rpc_client().get_transaction(txs_chain_a[2].hash());
assert_eq!(res.tx_status.status, Status::Rejected);
let res = node0.rpc_client().get_transaction(txs_chain_b[1].hash());
assert_eq!(res.tx_status.status, Status::Rejected);

// tx_b1 is still rejected
let res = node0.rpc_client().get_transaction(tx_b1.hash());
assert_eq!(res.tx_status.status, Status::Rejected);

// Create a new transaction A3 consume A1, it will replace B2
let input_a1 = CellInput::new_builder()
.previous_output(OutPoint::new(tx_a1.hash(), 0))
.build();
let tx_a3_output = CellOutputBuilder::default()
.capacity(capacity_bytes!(100).pack())
.build();
let tx_a3 = tx_a1
.as_advanced_builder()
.set_inputs(vec![input_a1, input_c])
.set_outputs(vec![tx_a3_output])
.build();
let _res = node0.rpc_client().send_transaction(tx_a3.data().into());

// now result is:
// A0 ---> A1 -> A3
//
// B0 -> B1 (B1 is recovered back)
//
let res = node0.rpc_client().get_transaction(tx_a3.hash());
assert_eq!(res.tx_status.status, Status::Pending);
let res = node0.rpc_client().get_transaction(tx_b2.hash());
assert_eq!(res.tx_status.status, Status::Rejected);
eprintln!("tx_b1 {:?}", tx_b1.proposal_short_id());

// B1 is expected by recovered back
let res = node0.rpc_client().get_transaction(tx_b1.hash());
assert_eq!(res.tx_status.status, Status::Pending);
}

fn modify_app_config(&self, config: &mut ckb_app_config::CKBAppConfig) {
config.tx_pool.min_rbf_rate = ckb_types::core::FeeRate(1500);
}
}

fn run_spec_send_conflict_relay(nodes: &mut [Node]) {
let node0 = &nodes[0];
let node1 = &nodes[1];
Expand Down
27 changes: 26 additions & 1 deletion tx-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::sync::Arc;

const COMMITTED_HASH_CACHE_SIZE: usize = 100_000;
const CONFLICTES_CACHE_SIZE: usize = 10_000;
const CONFLICTES_INPUTS_CACHE_SIZE: usize = 30_000;
const MAX_REPLACEMENT_CANDIDATES: usize = 100;

/// Tx-pool implementation
Expand All @@ -44,6 +45,8 @@ pub struct TxPool {
pub(crate) expiry: u64,
// conflicted transaction cache
pub(crate) conflicts_cache: lru::LruCache<ProposalShortId, TransactionView>,
// conflicted transaction outputs cache, input -> tx_short_id
pub(crate) conflicts_outputs_cache: lru::LruCache<OutPoint, ProposalShortId>,
}

impl TxPool {
Expand All @@ -59,6 +62,7 @@ impl TxPool {
recent_reject,
expiry,
conflicts_cache: LruCache::new(CONFLICTES_CACHE_SIZE),
conflicts_outputs_cache: lru::LruCache::new(CONFLICTES_INPUTS_CACHE_SIZE),
}
}

Expand Down Expand Up @@ -158,6 +162,9 @@ impl TxPool {

pub(crate) fn record_conflict(&mut self, tx: TransactionView) {
let short_id = tx.proposal_short_id();
for inputs in tx.input_pts_iter() {
self.conflicts_outputs_cache.put(inputs, short_id.clone());
}
self.conflicts_cache.put(short_id.clone(), tx);
debug!(
"record_conflict {:?} now cache size: {}",
Expand All @@ -167,14 +174,31 @@ impl TxPool {
}

pub(crate) fn remove_conflict(&mut self, short_id: &ProposalShortId) {
self.conflicts_cache.pop(short_id);
if let Some(tx) = self.conflicts_cache.pop(short_id) {
for inputs in tx.input_pts_iter() {
self.conflicts_outputs_cache.pop(&inputs);
}
}
debug!(
"remove_conflict {:?} now cache size: {}",
short_id,
self.conflicts_cache.len()
);
}

pub(crate) fn get_conflicted_txs_from_inputs(
&self,
inputs: impl Iterator<Item = OutPoint>,
) -> Vec<TransactionView> {
inputs
.filter_map(|input| {
self.conflicts_outputs_cache
.peek(&input)
.and_then(|id| self.conflicts_cache.peek(id).cloned())
})
.collect()
}

/// Returns tx with cycles corresponding to the id.
pub(crate) fn get_tx_with_cycles(
&self,
Expand Down Expand Up @@ -493,6 +517,7 @@ impl TxPool {
self.snapshot = snapshot;
self.committed_txs_hash_cache = LruCache::new(COMMITTED_HASH_CACHE_SIZE);
self.conflicts_cache = LruCache::new(CONFLICTES_CACHE_SIZE);
self.conflicts_outputs_cache = lru::LruCache::new(CONFLICTES_INPUTS_CACHE_SIZE);
}

pub(crate) fn package_proposals(
Expand Down
87 changes: 66 additions & 21 deletions tx-pool/src/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,39 +132,35 @@ impl TxPoolService {
time_relative_verify(snapshot, Arc::clone(&entry.rtx), tx_env)?;
}

// try to remove conflicted tx here
for id in conflicts.iter() {
let removed = tx_pool.pool_map.remove_entry_and_descendants(id);
for old in removed {
debug!(
"remove conflict tx {} for RBF by new tx {}",
old.transaction().hash(),
entry.transaction().hash()
);
let reject = Reject::RBFRejected(format!(
"replaced by tx {}",
entry.transaction().hash()
));
// RBF replace successfully, put old transactions into conflicts pool
tx_pool.record_conflict(old.transaction().clone());
// after removing old tx from tx_pool, we call reject callbacks manually
self.callbacks.call_reject(tx_pool, &old, reject);
}
}
let may_recovered_txs = self.process_rbf(tx_pool, &entry, &conflicts);
let evicted = _submit_entry(tx_pool, status, entry.clone(), &self.callbacks)?;

// in a corner case, a tx with lower fee rate may be rejected immediately
// after inserting into pool, return proper reject error here
for evict in evicted {
let reject = Reject::Invalidated(format!(
"invalidated by tx {}",
evict.transaction().hash()
));
self.callbacks.call_reject(tx_pool, &evict, reject);
}

tx_pool.remove_conflict(&entry.proposal_short_id());
// in a corner case, a tx with lower fee rate may be rejected immediately
// after inserting into pool, return proper reject error here
tx_pool
.limit_size(&self.callbacks, Some(&entry.proposal_short_id()))
.map_or(Ok(()), Err)?;

if !may_recovered_txs.is_empty() {
let self_clone = self.clone();
tokio::spawn(async move {
// push the recovered txs back to verify queue, so that they can be verified and submitted again
let mut queue = self_clone.verify_queue.write().await;
for tx in may_recovered_txs {
debug!("recover back: {:?}", tx.proposal_short_id());
let _ = queue.add_tx(tx, None);
}
});
}
Ok(())
})
.await;
Expand Down Expand Up @@ -200,6 +196,55 @@ impl TxPoolService {
}
}

// try to remove conflicted tx here, the returned txs can be re-verified and re-submitted
// since they maybe not conflicted anymore
fn process_rbf(
&self,
tx_pool: &mut TxPool,
entry: &TxEntry,
conflicts: &HashSet<ProposalShortId>,
) -> Vec<TransactionView> {
let mut may_recovered_txs = vec![];
let mut available_inputs = HashSet::new();

if conflicts.is_empty() {
return may_recovered_txs;
}

let all_removed: Vec<_> = conflicts
.iter()
.flat_map(|id| tx_pool.pool_map.remove_entry_and_descendants(id))
.collect();

available_inputs.extend(
all_removed
.iter()
.flat_map(|removed| removed.transaction().input_pts_iter()),
);

for input in entry.transaction().input_pts_iter() {
available_inputs.remove(&input);
}

may_recovered_txs = tx_pool.get_conflicted_txs_from_inputs(available_inputs.into_iter());
for old in all_removed {
debug!(
"remove conflict tx {} for RBF by new tx {}",
old.transaction().hash(),
entry.transaction().hash()
);
let reject =
Reject::RBFRejected(format!("replaced by tx {}", entry.transaction().hash()));

// RBF replace successfully, put old transactions into conflicts pool
tx_pool.record_conflict(old.transaction().clone());
// after removing old tx from tx_pool, we call reject callbacks manually
self.callbacks.call_reject(tx_pool, &old, reject);
}
assert!(!may_recovered_txs.contains(entry.transaction()));
may_recovered_txs
}

pub(crate) async fn verify_queue_contains(&self, tx: &TransactionView) -> bool {
let queue = self.verify_queue.read().await;
queue.contains_key(&tx.proposal_short_id())
Expand Down

0 comments on commit ea3651f

Please sign in to comment.