Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Soften txpool p2p reputation requirements #1663

Merged
merged 9 commits into from
Feb 20, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Description of the upcoming release here.

### Changed

- [#1663](https://github.com/FuelLabs/fuel-core/pull/1663): Reduce the punishment criteria for mempool gossipping.
- [#1658](https://github.com/FuelLabs/fuel-core/pull/1658): Removed `Receipts` table. Instead, receipts are part of the `TransactionStatuses` table.
- [#1640](https://github.com/FuelLabs/fuel-core/pull/1640): Upgrade to fuel-vm 0.45.0.
- [#1635](https://github.com/FuelLabs/fuel-core/pull/1635): Move updating of the owned messages and coins to off-chain worker.
Expand Down
7 changes: 6 additions & 1 deletion crates/fuel-core/src/service/adapters/graphql_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ impl TxPoolPort for TxPoolAdapter {
&self,
txs: Vec<Arc<Transaction>>,
) -> Vec<anyhow::Result<InsertionResult>> {
self.service.insert(txs).await
self.service
.insert(txs)
.await
.into_iter()
.map(|res| res.map_err(anyhow::Error::from))
.collect()
}

fn tx_update_subscribe(
Expand Down
165 changes: 82 additions & 83 deletions crates/services/txpool/src/containers/dependency.rs

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions crates/services/txpool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,11 @@ where
Some(Ok(_)) => {
GossipsubMessageAcceptance::Accept
},
Some(Err(_)) => {
// Use similar p2p punishment rules as bitcoin
// https://github.com/bitcoin/bitcoin/blob/6ff0aa089c01ff3e610ecb47814ed739d685a14c/src/net_processing.cpp#L1856
Some(Err(Error::ConsensusValidity(_))) | Some(Err(Error::MintIsDisallowed)) => {
GossipsubMessageAcceptance::Reject
}
},
_ => GossipsubMessageAcceptance::Ignore
}
}
Expand All @@ -262,14 +264,13 @@ where
}
};

if acceptance != GossipsubMessageAcceptance::Ignore {
Copy link
Member Author

@Voxelot Voxelot Feb 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By not notifying the ignore status straightaway, these messages will be forced to sit in a cache until they expire, taking up gossipsub message capacity.

let message_info = GossipsubMessageInfo {
message_id,
peer_id,
};
// notify p2p layer about whether this tx was accepted
let message_info = GossipsubMessageInfo {
message_id,
peer_id,
};

let _ = self.shared.p2p.notify_gossip_transaction_validity(message_info, acceptance);
}
let _ = self.shared.p2p.notify_gossip_transaction_validity(message_info, acceptance);

should_continue = true;
} else {
Expand Down Expand Up @@ -355,7 +356,7 @@ where
pub async fn insert(
&self,
txs: Vec<Arc<Transaction>>,
) -> Vec<anyhow::Result<InsertionResult>> {
) -> Vec<Result<InsertionResult, Error>> {
// verify txs
let current_height = *self.current_height.lock();

Expand Down
12 changes: 9 additions & 3 deletions crates/services/txpool/src/service/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ impl MockP2P {
});
Box::pin(stream)
});
p2p.expect_broadcast_transaction()
.returning(move |_| Ok(()));

p2p
}
}
Expand Down Expand Up @@ -190,7 +189,14 @@ impl TestContextBuilder {
let config = self.config.unwrap_or_default();
let mock_db = self.mock_db;

let p2p = self.p2p.unwrap_or_else(|| MockP2P::new_with_txs(vec![]));
let mut p2p = self.p2p.unwrap_or_else(|| MockP2P::new_with_txs(vec![]));
// set default handlers for p2p methods after test is set up, so they will be last on the FIFO
// ordering of methods handlers: https://docs.rs/mockall/0.12.1/mockall/index.html#matching-multiple-calls
p2p.expect_notify_gossip_transaction_validity()
.returning(move |_, _| Ok(()));
p2p.expect_broadcast_transaction()
.returning(move |_| Ok(()));

let importer = self
.importer
.unwrap_or_else(|| MockImporter::with_blocks(vec![]));
Expand Down
139 changes: 136 additions & 3 deletions crates/services/txpool/src/service/tests_p2p.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
use super::*;
use crate::service::test_helpers::{
MockP2P,
TestContextBuilder,
use crate::{
service::test_helpers::{
MockP2P,
TestContextBuilder,
},
test_helpers::TEST_COIN_AMOUNT,
};
use fuel_core_services::Service;
use fuel_core_storage::rand::{
prelude::StdRng,
SeedableRng,
};
use fuel_core_types::fuel_tx::{
field::Inputs,
AssetId,
Transaction,
TransactionBuilder,
UniqueIdentifier,
};
use std::{
Expand Down Expand Up @@ -133,3 +143,126 @@ async fn test_insert_from_p2p_does_not_broadcast_to_p2p() {
"expected a timeout because no broadcast should have occurred"
)
}

#[tokio::test]
async fn test_gossipped_transaction_with_check_error_rejected() {
// verify that gossipped transactions which fail basic sanity checks are rejected (punished)

let mut ctx_builder = TestContextBuilder::new();
// add coin to builder db and generate a valid tx
let mut tx1 = ctx_builder.setup_script_tx(10);
// now intentionally muck up the tx such that it will return a CheckError,
// by duplicating an input
let script = tx1.as_script_mut().unwrap();
let input = script.inputs()[0].clone();
script.inputs_mut().push(input);
// setup p2p mock - with tx incoming from p2p
let txs = vec![tx1.clone()];
let mut p2p = MockP2P::new_with_txs(txs);
let (send, mut receive) = broadcast::channel::<()>(1);
p2p.expect_notify_gossip_transaction_validity()
.returning(move |_, validity| {
// Expect the transaction to be rejected
assert_eq!(validity, GossipsubMessageAcceptance::Reject);
// Notify test that the gossipsub acceptance was set
send.send(()).unwrap();
Ok(())
});
ctx_builder.with_p2p(p2p);

// build and start the txpool service
let ctx = ctx_builder.build();
let service = ctx.service();
service.start_and_await().await.unwrap();
// verify p2p was notified about the transaction validity
let gossip_validity_notified =
tokio::time::timeout(Duration::from_millis(100), receive.recv()).await;
assert!(
gossip_validity_notified.is_ok(),
"expected to receive gossip validity notification"
)
}

#[tokio::test]
async fn test_gossipped_mint_rejected() {
// verify that gossipped mint transactions are rejected (punished)
let tx1 = TransactionBuilder::mint(
0u32.into(),
0,
Default::default(),
Default::default(),
1,
AssetId::BASE,
)
.finalize_as_transaction();
// setup p2p mock - with tx incoming from p2p
let txs = vec![tx1.clone()];
let mut p2p = MockP2P::new_with_txs(txs);
let (send, mut receive) = broadcast::channel::<()>(1);
p2p.expect_notify_gossip_transaction_validity()
.returning(move |_, validity| {
// Expect the transaction to be rejected
assert_eq!(validity, GossipsubMessageAcceptance::Reject);
// Notify test that the gossipsub acceptance was set
send.send(()).unwrap();
Ok(())
});
// setup test context
let mut ctx_builder = TestContextBuilder::new();
ctx_builder.with_p2p(p2p);

// build and start the txpool service
let ctx = ctx_builder.build();
let service = ctx.service();
service.start_and_await().await.unwrap();
// verify p2p was notified about the transaction validity
let gossip_validity_notified =
tokio::time::timeout(Duration::from_millis(100), receive.recv()).await;
assert!(
gossip_validity_notified.is_ok(),
"expected to receive gossip validity notification"
)
}

#[tokio::test]
async fn test_gossipped_transaction_with_transient_error_ignored() {
// verify that gossipped transactions that fails stateful checks are ignored (but not punished)
let mut rng = StdRng::seed_from_u64(100);
let mut ctx_builder = TestContextBuilder::new();
// add coin to builder db and generate a valid tx
let mut tx1 = ctx_builder.setup_script_tx(10);
// now intentionally muck up the tx such that it will return a coin not found error
// by replacing the default coin with one that is not in the database
let script = tx1.as_script_mut().unwrap();
script.inputs_mut()[0] = crate::test_helpers::random_predicate(
&mut rng,
AssetId::BASE,
TEST_COIN_AMOUNT,
None,
);
// setup p2p mock - with tx incoming from p2p
let txs = vec![tx1.clone()];
let mut p2p = MockP2P::new_with_txs(txs);
let (send, mut receive) = broadcast::channel::<()>(1);
p2p.expect_notify_gossip_transaction_validity()
.returning(move |_, validity| {
// Expect the transaction to be rejected
assert_eq!(validity, GossipsubMessageAcceptance::Ignore);
// Notify test that the gossipsub acceptance was set
send.send(()).unwrap();
Ok(())
});
ctx_builder.with_p2p(p2p);

// build and start the txpool service
let ctx = ctx_builder.build();
let service = ctx.service();
service.start_and_await().await.unwrap();
// verify p2p was notified about the transaction validity
let gossip_validity_notified =
tokio::time::timeout(Duration::from_millis(100), receive.recv()).await;
assert!(
gossip_validity_notified.is_ok(),
"expected to receive gossip validity notification"
)
}
37 changes: 15 additions & 22 deletions crates/services/txpool/src/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ where
fn insert_single(
&mut self,
tx: Checked<Transaction>,
) -> anyhow::Result<InsertionResult> {
) -> Result<InsertionResult, Error> {
let view = self.database.latest_view();
self.insert_inner(tx, &view)
}
Expand All @@ -274,32 +274,29 @@ where
&mut self,
tx: Checked<Transaction>,
view: &View,
) -> anyhow::Result<InsertionResult> {
) -> Result<InsertionResult, Error> {
let tx: CheckedTransaction = tx.into();

let tx = Arc::new(match tx {
CheckedTransaction::Script(script) => PoolTransaction::Script(script),
CheckedTransaction::Create(create) => PoolTransaction::Create(create),
CheckedTransaction::Mint(_) => {
return Err(anyhow::anyhow!("Mint transactions is not supported"))
}
CheckedTransaction::Mint(_) => return Err(Error::MintIsDisallowed),
});

if !tx.is_computed() {
return Err(Error::NoMetadata.into())
return Err(Error::NoMetadata)
}

// verify max gas is less than block limit
if tx.max_gas() > self.config.chain_config.block_gas_limit {
return Err(Error::NotInsertedMaxGasLimit {
tx_gas: tx.max_gas(),
block_limit: self.config.chain_config.block_gas_limit,
}
.into())
})
}

if self.by_hash.contains_key(&tx.id()) {
return Err(Error::NotInsertedTxKnown.into())
return Err(Error::NotInsertedTxKnown)
}

let mut max_limit_hit = false;
Expand All @@ -309,7 +306,7 @@ where
// limit is hit, check if we can push out lowest priced tx
let lowest_price = self.by_gas_price.lowest_value().unwrap_or_default();
if lowest_price >= tx.price() {
return Err(Error::NotInsertedLimitHit.into())
return Err(Error::NotInsertedLimitHit)
}
}
if self.config.metrics {
Expand Down Expand Up @@ -361,7 +358,7 @@ where
&mut self,
tx_status_sender: &TxStatusChange,
txs: Vec<Checked<Transaction>>,
) -> Vec<anyhow::Result<InsertionResult>> {
) -> Vec<Result<InsertionResult, Error>> {
// Check if that data is okay (witness match input/output, and if recovered signatures ara valid).
// should be done before transaction comes to txpool, or before it enters RwLocked region.
let mut res = Vec::new();
Expand Down Expand Up @@ -402,7 +399,7 @@ pub async fn check_transactions(
txs: &[Arc<Transaction>],
current_height: BlockHeight,
config: &Config,
) -> Vec<anyhow::Result<Checked<Transaction>>> {
) -> Vec<Result<Checked<Transaction>, Error>> {
let mut checked_txs = Vec::with_capacity(txs.len());

for tx in txs.iter() {
Expand All @@ -417,9 +414,9 @@ pub async fn check_single_tx(
tx: Transaction,
current_height: BlockHeight,
config: &Config,
) -> anyhow::Result<Checked<Transaction>> {
) -> Result<Checked<Transaction>, Error> {
if tx.is_mint() {
return Err(Error::NotSupportedTransactionType.into())
return Err(Error::NotSupportedTransactionType)
}

verify_tx_min_gas_price(&tx, config)?;
Expand All @@ -428,24 +425,20 @@ pub async fn check_single_tx(
let consensus_params = &config.chain_config.consensus_parameters;

let tx = tx
.into_checked_basic(current_height, consensus_params)
.map_err(|e| anyhow::anyhow!("{e:?}"))?
.check_signatures(&consensus_params.chain_id)
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
.into_checked_basic(current_height, consensus_params)?
.check_signatures(&consensus_params.chain_id)?;

let tx = tx
.check_predicates_async::<TokioWithRayon>(&CheckPredicateParams::from(
consensus_params,
))
.await
.map_err(|e| anyhow::anyhow!("{e:?}"))?;
.await?;

debug_assert!(tx.checks().contains(Checks::all()));

tx
} else {
tx.into_checked_basic(current_height, &config.chain_config.consensus_parameters)
.map_err(|e| anyhow::anyhow!("{e:?}"))?
tx.into_checked_basic(current_height, &config.chain_config.consensus_parameters)?
};

Ok(tx)
Expand Down
Loading
Loading