From 529ea05786e945b43057239f95cba456fd1dd81e Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 10:38:34 -0700 Subject: [PATCH 01/17] cluster: convert cluster to use CommitteeFixture --- executor/tests/consensus_integration_tests.rs | 2 +- primary/tests/causal_completion_tests.rs | 4 +-- .../integration_tests_configuration_api.rs | 6 ++-- primary/tests/nodes_bootstrapping_tests.rs | 10 +++--- test_utils/src/cluster.rs | 33 ++++++++----------- test_utils/src/tests/cluster_tests.rs | 4 +-- 6 files changed, 26 insertions(+), 33 deletions(-) diff --git a/executor/tests/consensus_integration_tests.rs b/executor/tests/consensus_integration_tests.rs index b57981f88..eeeeb5cc3 100644 --- a/executor/tests/consensus_integration_tests.rs +++ b/executor/tests/consensus_integration_tests.rs @@ -11,7 +11,7 @@ async fn test_internal_consensus_output() { // nodes logs. let _guard = setup_tracing(); - let mut cluster = Cluster::new(None, None, None, true); + let mut cluster = Cluster::new(None, true); // start the cluster cluster.start(Some(4), Some(1), None).await; diff --git a/primary/tests/causal_completion_tests.rs b/primary/tests/causal_completion_tests.rs index bd518058d..80da102cc 100644 --- a/primary/tests/causal_completion_tests.rs +++ b/primary/tests/causal_completion_tests.rs @@ -15,7 +15,7 @@ async fn test_restore_from_disk() { // nodes logs. let _guard = setup_tracing(); - let mut cluster = Cluster::new(None, None, None, true); + let mut cluster = Cluster::new(None, true); // start the cluster cluster.start(Some(4), Some(1), None).await; @@ -105,7 +105,7 @@ async fn test_read_causal_signed_certificates() { // nodes logs. let _guard = setup_tracing(); - let mut cluster = Cluster::new(None, None, None, true); + let mut cluster = Cluster::new(None, true); // start the cluster cluster.start(Some(4), Some(1), None).await; diff --git a/primary/tests/integration_tests_configuration_api.rs b/primary/tests/integration_tests_configuration_api.rs index 3c9362ed3..ed1af7904 100644 --- a/primary/tests/integration_tests_configuration_api.rs +++ b/primary/tests/integration_tests_configuration_api.rs @@ -9,7 +9,7 @@ use types::{ #[tokio::test] async fn test_new_epoch() { - let mut cluster = Cluster::new(None, None, None, false); + let mut cluster = Cluster::new(None, false); // start the cluster will all the possible nodes cluster.start(Some(2), Some(1), None).await; @@ -53,7 +53,7 @@ async fn test_new_epoch() { #[tokio::test] async fn test_new_network_info() { - let mut cluster = Cluster::new(None, None, None, false); + let mut cluster = Cluster::new(None, false); // start the cluster will all the possible nodes cluster.start(Some(2), Some(1), None).await; @@ -113,7 +113,7 @@ async fn test_new_network_info() { #[tokio::test] async fn test_get_primary_address() { - let mut cluster = Cluster::new(None, None, None, false); + let mut cluster = Cluster::new(None, false); // start the cluster will all the possible nodes cluster.start(Some(2), Some(1), None).await; diff --git a/primary/tests/nodes_bootstrapping_tests.rs b/primary/tests/nodes_bootstrapping_tests.rs index 3d0daae2a..106169d0c 100644 --- a/primary/tests/nodes_bootstrapping_tests.rs +++ b/primary/tests/nodes_bootstrapping_tests.rs @@ -14,7 +14,7 @@ async fn test_shutdown_bug() { let delay = Duration::from_secs(10); // 10 seconds // A cluster of 4 nodes will be created - let cluster = Cluster::new(None, None, None, false); + let cluster = Cluster::new(None, false); // ==== Start first authority ==== let authority = 
cluster.authority(0); @@ -53,7 +53,7 @@ async fn test_node_staggered_starts() { let node_staggered_delay = Duration::from_secs(60 * 5); // 5 minutes // A cluster of 4 nodes will be created - let cluster = Cluster::new(None, None, None, true); + let cluster = Cluster::new(None, true); // ==== Start first authority ==== cluster.authority(0).start(false, Some(1)).await; @@ -103,7 +103,7 @@ async fn test_second_node_restart() { let node_advance_delay = Duration::from_secs(60); // A cluster of 4 nodes will be created - let mut cluster = Cluster::new(None, None, None, true); + let mut cluster = Cluster::new(None, true); // ===== Start the cluster ==== cluster.start(Some(4), Some(1), None).await; @@ -148,7 +148,7 @@ async fn test_loss_of_liveness_without_recovery() { let node_advance_delay = Duration::from_secs(60); // A cluster of 4 nodes will be created - let mut cluster = Cluster::new(None, None, None, true); + let mut cluster = Cluster::new(None, true); // ===== Start the cluster ==== cluster.start(Some(4), Some(1), None).await; @@ -199,7 +199,7 @@ async fn test_loss_of_liveness_with_recovery() { let node_advance_delay = Duration::from_secs(60); // A cluster of 4 nodes will be created - let mut cluster = Cluster::new(None, None, None, true); + let mut cluster = Cluster::new(None, true); // ===== Start the cluster ==== cluster.start(Some(4), Some(1), None).await; diff --git a/test_utils/src/cluster.rs b/test_utils/src/cluster.rs index 5155273ad..8cbf45daa 100644 --- a/test_utils/src/cluster.rs +++ b/test_utils/src/cluster.rs @@ -1,8 +1,8 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::{keys, pure_committee_from_keys, shared_worker_cache_from_keys, temp_dir}; +use crate::{temp_dir, CommitteeFixture}; use arc_swap::ArcSwap; -use config::{Committee, Parameters, SharedCommittee, SharedWorkerCache, WorkerId}; +use config::{Parameters, SharedCommittee, SharedWorkerCache, WorkerId}; use crypto::{KeyPair, PublicKey}; use executor::{SerializedTransaction, SubscriberResult}; use fastcrypto::traits::KeyPair as _; @@ -29,6 +29,8 @@ use types::{ConfigurationClient, ProposerClient, TransactionsClient}; pub mod cluster_tests; pub struct Cluster { + #[allow(unused)] + fixture: CommitteeFixture, authorities: HashMap, pub committee_shared: SharedCommittee, pub worker_cache_shared: SharedWorkerCache, @@ -40,38 +42,28 @@ impl Cluster { /// Initialises a new cluster by the provided parameters. The cluster will /// create all the authorities (primaries & workers) that are defined under /// the committee structure, but none of them will be started. - /// If an `input_committee` is provided then this will be used, otherwise the default - /// will be used instead. - /// If an `input_shared_worker_cache` is provided then this will be used, - /// otherwise the default will be used instead. + /// /// When the `internal_consensus_enabled` is true then the standard internal /// consensus engine will be enabled. If false, then the internal consensus will /// be disabled and the gRPC server will be enabled to manage the Collections & the /// DAG externally. 
- pub fn new( - parameters: Option, - input_committee: Option, - input_shared_worker_cache: Option, - internal_consensus_enabled: bool, - ) -> Self { - let k = keys(None); - let c = input_committee.unwrap_or_else(|| pure_committee_from_keys(&k)); - let shared_worker_cache = - input_shared_worker_cache.unwrap_or_else(|| shared_worker_cache_from_keys(&k)); + pub fn new(parameters: Option, internal_consensus_enabled: bool) -> Self { + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let c = fixture.committee(); + let shared_worker_cache = fixture.shared_worker_cache(); let shared_committee = Arc::new(ArcSwap::from_pointee(c)); let params = parameters.unwrap_or_else(Self::parameters); info!("###### Creating new cluster ######"); info!("Validator keys:"); - let k = keys(None); let mut nodes = HashMap::new(); - for (id, key_pair) in k.into_iter().enumerate() { - info!("Key {id} -> {}", key_pair.public().clone()); + for (id, authority_fixture) in fixture.authorities().enumerate() { + info!("Key {id} -> {}", authority_fixture.public_key()); let authority = AuthorityDetails::new( id, - key_pair, + authority_fixture.keypair().copy(), params.clone(), shared_committee.clone(), shared_worker_cache.clone(), @@ -81,6 +73,7 @@ impl Cluster { } Self { + fixture, authorities: nodes, committee_shared: shared_committee, worker_cache_shared: shared_worker_cache, diff --git a/test_utils/src/tests/cluster_tests.rs b/test_utils/src/tests/cluster_tests.rs index 49bd3c93a..68402b133 100644 --- a/test_utils/src/tests/cluster_tests.rs +++ b/test_utils/src/tests/cluster_tests.rs @@ -6,7 +6,7 @@ use types::{PublicKeyProto, RoundsRequest}; #[tokio::test] async fn basic_cluster_setup() { - let mut cluster = Cluster::new(None, None, None, true); + let mut cluster = Cluster::new(None, true); // start the cluster will all the possible nodes cluster.start(None, None, None).await; @@ -37,7 +37,7 @@ async fn basic_cluster_setup() { #[tokio::test] async fn cluster_setup_with_consensus_disabled() { - let mut cluster = Cluster::new(None, None, None, false); + let mut cluster = Cluster::new(None, false); // start the cluster will all the possible nodes cluster.start(Some(2), Some(1), None).await; From 6db1c02c13e6e9f192e68715eb74faf618a4c2a5 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 11:19:57 -0700 Subject: [PATCH 02/17] test-utils: add utility for adding a new authority to a CommitteeFixture --- .../snapshots/config_tests__committee.snap | 8 +- .../snapshots/config_tests__worker_cache.snap | 8 +- test_utils/src/lib.rs | 124 +++++++++++------- 3 files changed, 84 insertions(+), 56 deletions(-) diff --git a/config/tests/snapshots/config_tests__committee.snap b/config/tests/snapshots/config_tests__committee.snap index 1a4850b65..82e6bcbb3 100644 --- a/config/tests/snapshots/config_tests__committee.snap +++ b/config/tests/snapshots/config_tests__committee.snap @@ -4,28 +4,28 @@ expression: committee --- { "authorities": { - "ildi4hrBzbOHBELHe0w69Yx87bh3nQJw5tTx4vc2fXQ=": { + "O6EMZUvkFG2rPW2rgLPOJM4323x0XCslMgXWL5Z7ALM=": { "stake": 1, "primary": { "primary_to_primary": "/ip4/127.0.0.1/tcp/0/http", "worker_to_primary": "/ip4/127.0.0.1/tcp/0/http" } }, - "rvP0pLjsod/DQzYb+OQ2vULeklnAS4MU644gVN1ugqs=": { + "gznZnDM7s6J3/reAuNRsrDDNzZBQ2+I1+4zGe5HBsGo=": { "stake": 1, "primary": { "primary_to_primary": "/ip4/127.0.0.1/tcp/0/http", "worker_to_primary": "/ip4/127.0.0.1/tcp/0/http" } }, - "ucbuFjDvPnERRKZI2wa7sihPcnTPvuU//O5QPMGkkgA=": { + "nUdhahjfIDkwfwZCImAliLUqaq6qe2/oxrTZ66inh7E=": 
{ "stake": 1, "primary": { "primary_to_primary": "/ip4/127.0.0.1/tcp/0/http", "worker_to_primary": "/ip4/127.0.0.1/tcp/0/http" } }, - "zGIzLjS7LVzWn2Dvuyo2y5FsfrRYMB6jZjbE27ASvYg=": { + "ucbuFjDvPnERRKZI2wa7sihPcnTPvuU//O5QPMGkkgA=": { "stake": 1, "primary": { "primary_to_primary": "/ip4/127.0.0.1/tcp/0/http", diff --git a/config/tests/snapshots/config_tests__worker_cache.snap b/config/tests/snapshots/config_tests__worker_cache.snap index 5f3625a39..a86d60d9c 100644 --- a/config/tests/snapshots/config_tests__worker_cache.snap +++ b/config/tests/snapshots/config_tests__worker_cache.snap @@ -4,7 +4,7 @@ expression: worker_cache --- { "workers": { - "ildi4hrBzbOHBELHe0w69Yx87bh3nQJw5tTx4vc2fXQ=": { + "O6EMZUvkFG2rPW2rgLPOJM4323x0XCslMgXWL5Z7ALM=": { "0": { "transactions": "/ip4/127.0.0.1/tcp/0/http", "worker_to_worker": "/ip4/127.0.0.1/tcp/0/http", @@ -26,7 +26,7 @@ expression: worker_cache "primary_to_worker": "/ip4/127.0.0.1/tcp/0/http" } }, - "rvP0pLjsod/DQzYb+OQ2vULeklnAS4MU644gVN1ugqs=": { + "gznZnDM7s6J3/reAuNRsrDDNzZBQ2+I1+4zGe5HBsGo=": { "0": { "transactions": "/ip4/127.0.0.1/tcp/0/http", "worker_to_worker": "/ip4/127.0.0.1/tcp/0/http", @@ -48,7 +48,7 @@ expression: worker_cache "primary_to_worker": "/ip4/127.0.0.1/tcp/0/http" } }, - "ucbuFjDvPnERRKZI2wa7sihPcnTPvuU//O5QPMGkkgA=": { + "nUdhahjfIDkwfwZCImAliLUqaq6qe2/oxrTZ66inh7E=": { "0": { "transactions": "/ip4/127.0.0.1/tcp/0/http", "worker_to_worker": "/ip4/127.0.0.1/tcp/0/http", @@ -70,7 +70,7 @@ expression: worker_cache "primary_to_worker": "/ip4/127.0.0.1/tcp/0/http" } }, - "zGIzLjS7LVzWn2Dvuyo2y5FsfrRYMB6jZjbE27ASvYg=": { + "ucbuFjDvPnERRKZI2wa7sihPcnTPvuU//O5QPMGkkgA=": { "0": { "transactions": "/ip4/127.0.0.1/tcp/0/http", "worker_to_worker": "/ip4/127.0.0.1/tcp/0/http", diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index 5c6c02bc5..69a706db8 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -993,54 +993,8 @@ impl Builder { } }; - let primary_keys = (0..self.committee_size.get()) - .map(|_| KeyPair::generate(&mut self.rng)) - .collect::>(); - - let authorities = primary_keys - .into_iter() - .map(|keypair| { - let primary_addresses = PrimaryAddresses { - primary_to_primary: format!("/ip4/127.0.0.1/tcp/{}/http", get_port()) - .parse() - .unwrap(), - worker_to_primary: format!("/ip4/127.0.0.1/tcp/{}/http", get_port()) - .parse() - .unwrap(), - }; - - let workers = (0..self.number_of_workers.get()) - .map(|idx| { - let worker = WorkerFixture { - keypair: KeyPair::generate(&mut self.rng), - id: idx as u32, - info: WorkerInfo { - primary_to_worker: format!( - "/ip4/127.0.0.1/tcp/{}/http", - get_port() - ) - .parse() - .unwrap(), - transactions: format!("/ip4/127.0.0.1/tcp/{}/http", get_port()) - .parse() - .unwrap(), - worker_to_worker: format!("/ip4/127.0.0.1/tcp/{}/http", get_port()) - .parse() - .unwrap(), - }, - }; - - (idx as u32, worker) - }) - .collect(); - - AuthorityFixture { - keypair, - stake: 1, - addresses: primary_addresses, - workers, - } - }) + let authorities = (0..self.committee_size.get()) + .map(|_| AuthorityFixture::generate(&mut self.rng, self.number_of_workers, get_port)) .collect(); CommitteeFixture { @@ -1132,6 +1086,20 @@ impl CommitteeFixture { .collect(); Certificate::new(&committee, header.clone(), votes).unwrap() } + + /// Add a new authority to the committ by randoming generating a key + pub fn add_authority(&mut self) { + let authority = AuthorityFixture::generate( + &mut OsRng, + NonZeroUsize::new(4).unwrap(), + get_available_port, + ); + self.authorities.push(authority) 
+ } + + pub fn bump_epoch(&mut self) { + self.epoch += 1 + } } pub struct AuthorityFixture { @@ -1189,6 +1157,37 @@ impl AuthorityFixture { pub fn vote(&self, header: &Header) -> Vote { Vote::new_with_signer(header, self.keypair.public(), &self.keypair) } + + fn generate(mut rng: R, number_of_workers: NonZeroUsize, mut get_port: P) -> Self + where + R: ::rand::RngCore + ::rand::CryptoRng, + P: FnMut() -> u16, + { + let keypair = KeyPair::generate(&mut rng); + let primary_addresses = PrimaryAddresses { + primary_to_primary: format!("/ip4/127.0.0.1/tcp/{}/http", get_port()) + .parse() + .unwrap(), + worker_to_primary: format!("/ip4/127.0.0.1/tcp/{}/http", get_port()) + .parse() + .unwrap(), + }; + + let workers = (0..number_of_workers.get()) + .map(|idx| { + let worker = WorkerFixture::generate(&mut rng, idx as u32, &mut get_port); + + (idx as u32, worker) + }) + .collect(); + + Self { + keypair, + stake: 1, + addresses: primary_addresses, + workers, + } + } } pub struct WorkerFixture { @@ -1198,3 +1197,32 @@ pub struct WorkerFixture { id: WorkerId, info: WorkerInfo, } + +impl WorkerFixture { + fn generate(mut rng: R, id: WorkerId, mut get_port: P) -> Self + where + R: ::rand::RngCore + ::rand::CryptoRng, + P: FnMut() -> u16, + { + let keypair = KeyPair::generate(&mut rng); + let primary_to_worker = format!("/ip4/127.0.0.1/tcp/{}/http", get_port()) + .parse() + .unwrap(); + let worker_to_worker = format!("/ip4/127.0.0.1/tcp/{}/http", get_port()) + .parse() + .unwrap(); + let transactions = format!("/ip4/127.0.0.1/tcp/{}/http", get_port()) + .parse() + .unwrap(); + + Self { + keypair, + id, + info: WorkerInfo { + primary_to_worker, + worker_to_worker, + transactions, + }, + } + } +} From 9ba1d9e8e4ac4536a4711375cb838be82eeaf747 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 11:20:17 -0700 Subject: [PATCH 03/17] primary: convert epoch-change tests to use a CommitteeFixture --- primary/tests/epoch_change.rs | 115 +++++++++++----------------------- 1 file changed, 38 insertions(+), 77 deletions(-) diff --git a/primary/tests/epoch_change.rs b/primary/tests/epoch_change.rs index 61e2339f8..35c72e86b 100644 --- a/primary/tests/epoch_change.rs +++ b/primary/tests/epoch_change.rs @@ -1,17 +1,15 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use arc_swap::ArcSwap; -use config::{Committee, Epoch, Parameters}; +use config::{Committee, Parameters}; use fastcrypto::traits::KeyPair; use futures::future::join_all; use network::{CancelOnDropHandler, ReliableNetwork, WorkerToPrimaryNetwork}; use node::NodeStorage; use primary::{NetworkModel, Primary, CHANNEL_CAPACITY}; use prometheus::Registry; -use std::{collections::BTreeMap, sync::Arc, time::Duration}; -use test_utils::{ - keys, make_authority, pure_committee_from_keys, shared_worker_cache_from_keys, temp_dir, -}; +use std::{sync::Arc, time::Duration}; +use test_utils::{temp_dir, CommitteeFixture}; use tokio::sync::watch; use types::{ReconfigureNotification, WorkerPrimaryMessage}; @@ -24,16 +22,16 @@ async fn test_simple_epoch_change() { }; // The configuration of epoch 0. - let keys_0 = keys(None); - let committee_0 = pure_committee_from_keys(&keys_0); - let worker_cache_0 = shared_worker_cache_from_keys(&keys_0); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee_0 = fixture.committee(); + let worker_cache_0 = fixture.shared_worker_cache(); // Spawn the committee of epoch 0. 
let mut rx_channels = Vec::new(); let mut tx_channels = Vec::new(); - for keypair in keys_0 { - let name = keypair.public().clone(); - let signer = keypair; + for authority in fixture.authorities() { + let name = authority.public_key(); + let signer = authority.keypair().copy(); let (tx_new_certificates, rx_new_certificates) = test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); @@ -130,24 +128,16 @@ async fn test_partial_committee_change() { }; // Make the committee of epoch 0. - let keys_0 = keys(None); - let authorities_0: Vec<_> = keys_0.iter().map(|_| make_authority()).collect(); - let committee_0 = Committee { - epoch: Epoch::default(), - authorities: keys_0 - .iter() - .zip(authorities_0.clone().into_iter()) - .map(|(kp, authority)| (kp.public().clone(), authority)) - .collect(), - }; - let worker_cache_0 = shared_worker_cache_from_keys(&keys_0); + let mut fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee_0 = fixture.committee(); + let worker_cache_0 = fixture.shared_worker_cache(); // Spawn the committee of epoch 0. let mut epoch_0_rx_channels = Vec::new(); let mut epoch_0_tx_channels = Vec::new(); - for keypair in keys_0 { - let name = keypair.public().clone(); - let signer = keypair; + for authority in fixture.authorities() { + let name = authority.public_key(); + let signer = authority.keypair().copy(); let (tx_new_certificates, rx_new_certificates) = test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); @@ -195,46 +185,17 @@ async fn test_partial_committee_change() { } // Make the committee of epoch 1. - let mut to_spawn = Vec::new(); - - let keys_0 = keys(None); - let keys_1 = keys(Some(1)); - let mut total_stake = 0; - let mut committee_keys = vec![]; - let authorities_1: BTreeMap<_, _> = authorities_0 - .into_iter() - .zip(keys_0.into_iter()) - .zip(keys_1.into_iter()) - .map(|((authority, key_0), key_1)| { - let stake = authority.stake; - let x = if total_stake < committee_0.validity_threshold() { - let pk = key_0.public().clone(); - committee_keys.push(key_0); - (pk, authority) - } else { - let new_authority = make_authority(); - let pk = key_1.public().clone(); - committee_keys.push(key_1.copy()); - to_spawn.push(key_1); - (pk, new_authority) - }; - total_stake += stake; - x - }) - .collect(); - - let committee_1 = Committee { - epoch: Epoch::default() + 1, - authorities: authorities_1, - }; - let worker_cache_1 = shared_worker_cache_from_keys(&committee_keys); + fixture.add_authority(); + fixture.bump_epoch(); + let committee_1 = fixture.committee(); + let worker_cache_1 = fixture.shared_worker_cache(); // Spawn the committee of epoch 1 (only the node not already booted). let mut epoch_1_rx_channels = Vec::new(); let mut epoch_1_tx_channels = Vec::new(); - for keypair in to_spawn { - let name = keypair.public().clone(); - let signer = keypair; + if let Some(authority) = fixture.authorities().last() { + let name = authority.public_key(); + let signer = authority.keypair().copy(); let (tx_new_certificates, rx_new_certificates) = test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); @@ -258,7 +219,7 @@ async fn test_partial_committee_change() { parameters.clone(), store.header_store.clone(), store.certificate_store.clone(), - store.payload_store.clone(), + store.payload_store, /* tx_consensus */ tx_new_certificates, /* rx_consensus */ rx_feedback, tx_get_block_commands, @@ -310,17 +271,17 @@ async fn test_restart_with_new_committee_change() { }; // The configuration of epoch 0. 
- let keys_0 = keys(None); - let committee_0 = pure_committee_from_keys(&keys_0); - let worker_cache_0 = shared_worker_cache_from_keys(&keys_0); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee_0 = fixture.committee(); + let worker_cache_0 = fixture.shared_worker_cache(); // Spawn the committee of epoch 0. let mut rx_channels = Vec::new(); let mut tx_channels = Vec::new(); let mut handles = Vec::new(); - for keypair in keys_0 { - let name = keypair.public().clone(); - let signer = keypair; + for authority in fixture.authorities() { + let name = authority.public_key(); + let signer = authority.keypair().copy(); let (tx_new_certificates, rx_new_certificates) = test_utils::test_new_certificates_channel!(CHANNEL_CAPACITY); @@ -401,9 +362,9 @@ async fn test_restart_with_new_committee_change() { let mut rx_channels = Vec::new(); let mut tx_channels = Vec::new(); let mut handles = Vec::new(); - for keypair in keys(None) { - let name = keypair.public().clone(); - let signer = keypair; + for authority in fixture.authorities() { + let name = authority.public_key(); + let signer = authority.keypair().copy(); let (tx_new_certificates, rx_new_certificates) = test_utils::test_channel!(CHANNEL_CAPACITY); @@ -483,16 +444,16 @@ async fn test_simple_committee_update() { }; // The configuration of epoch 0. - let keys_0 = keys(None); - let committee_0 = pure_committee_from_keys(&keys_0); - let worker_cache_0 = shared_worker_cache_from_keys(&keys_0); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee_0 = fixture.committee(); + let worker_cache_0 = fixture.shared_worker_cache(); // Spawn the committee of epoch 0. let mut rx_channels = Vec::new(); let mut tx_channels = Vec::new(); - for keypair in keys_0 { - let name = keypair.public().clone(); - let signer = keypair; + for authority in fixture.authorities() { + let name = authority.public_key(); + let signer = authority.keypair().copy(); let (tx_new_certificates, rx_new_certificates) = test_utils::test_channel!(CHANNEL_CAPACITY); From 5b46cac44ef9abdcc6180fcc663a1574921d12d2 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 11:25:29 -0700 Subject: [PATCH 04/17] primary: use proper fixture in validator-api tests --- primary/tests/integration_tests_validator_api.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/primary/tests/integration_tests_validator_api.rs b/primary/tests/integration_tests_validator_api.rs index a4b515062..d4853bf30 100644 --- a/primary/tests/integration_tests_validator_api.rs +++ b/primary/tests/integration_tests_validator_api.rs @@ -18,8 +18,8 @@ use std::{ use storage::CertificateStore; use store::Store; use test_utils::{ - certificate, fixture_batch_with_transactions, make_optimal_certificates, - make_optimal_signed_certificates, temp_dir, AuthorityFixture, CommitteeFixture, + fixture_batch_with_transactions, make_optimal_certificates, make_optimal_signed_certificates, + temp_dir, AuthorityFixture, CommitteeFixture, }; use tokio::sync::watch; use tonic::transport::Channel; @@ -68,7 +68,7 @@ async fn test_get_collections() { .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); let block_id = certificate.digest(); collection_ids.push(block_id); @@ -266,7 +266,7 @@ async fn test_remove_collections() { .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); let block_id = 
certificate.digest(); collection_ids.push(block_id); From 2d6525a53147af6f21ca5a254190d4eb73da1731 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 13:24:02 -0700 Subject: [PATCH 05/17] primary: convert helper tests to use a CommitteeFixture --- primary/src/tests/helper_tests.rs | 95 +++++++++++++++++-------------- 1 file changed, 53 insertions(+), 42 deletions(-) diff --git a/primary/src/tests/helper_tests.rs b/primary/src/tests/helper_tests.rs index f08d8fe83..daa010a96 100644 --- a/primary/src/tests/helper_tests.rs +++ b/primary/src/tests/helper_tests.rs @@ -15,9 +15,8 @@ use std::{ use storage::CertificateStore; use store::{reopen, rocks, rocks::DBMap, Store}; use test_utils::{ - certificate, fixture_batch_with_transactions, fixture_header_builder, keys, - resolve_name_committee_and_worker_cache, temp_dir, PrimaryToPrimaryMockServer, CERTIFICATES_CF, - CERTIFICATE_ID_BY_ROUND_CF, PAYLOAD_CF, + fixture_batch_with_transactions, temp_dir, CommitteeFixture, PrimaryToPrimaryMockServer, + CERTIFICATES_CF, CERTIFICATE_ID_BY_ROUND_CF, PAYLOAD_CF, }; use tokio::{sync::watch, time::timeout}; use types::{BatchDigest, Certificate, CertificateDigest, ReconfigureNotification, Round}; @@ -27,13 +26,15 @@ async fn test_process_certificates_stream_mode() { telemetry_subscribers::init_for_testing(); // GIVEN let (_, certificate_store, payload_store) = create_db_stores(); - let mut primary_keys = keys(None); - let author_key = primary_keys.pop().unwrap(); - let name = author_key.public().clone(); - let (requestor_name, committee, _) = resolve_name_committee_and_worker_cache(); - let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch( - test_utils::committee(None), - )); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let author = fixture.authorities().next().unwrap(); + let name = author.public_key(); + let requestor = fixture.authorities().nth(1).unwrap(); + let requestor_name = requestor.public_key(); + + let (_tx_reconfigure, rx_reconfigure) = + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_primaries, rx_primaries) = test_utils::test_channel!(10); let own_address = @@ -41,7 +42,7 @@ async fn test_process_certificates_stream_mode() { .unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key(author_key.copy().private().0.to_bytes()) + .private_key(author.keypair().copy().private().0.to_bytes()) .start(anemo::Router::new()) .unwrap(); @@ -71,12 +72,13 @@ async fn test_process_certificates_stream_mode() { // AND some mock certificates let mut certificates = HashMap::new(); for _ in 0..5 { - let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(fixture_batch_with_transactions(10), 0) - .build(&author_key) + .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); let id = certificate.clone().digest(); // write the certificate @@ -90,7 +92,7 @@ async fn test_process_certificates_stream_mode() { .primary(&requestor_name) .unwrap() .primary_to_primary; - let requestor_key = primary_keys.pop().unwrap(); + let requestor_key = requestor.keypair().copy(); let (mut handler, _network) = PrimaryToPrimaryMockServer::spawn(requestor_key, address); // Wait for connectivity @@ -141,10 +143,12 @@ async fn test_process_certificates_stream_mode() { async fn test_process_certificates_batch_mode() { // GIVEN 
let (_, certificate_store, payload_store) = create_db_stores(); - let mut primary_keys = keys(None); - let author_key = primary_keys.pop().unwrap(); - let name = author_key.public().clone(); - let (requestor_name, committee, _) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let author = fixture.authorities().next().unwrap(); + let name = author.public_key(); + let requestor = fixture.authorities().nth(1).unwrap(); + let requestor_name = requestor.public_key(); let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch( test_utils::committee(None), )); @@ -155,7 +159,7 @@ async fn test_process_certificates_batch_mode() { .unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key(author_key.copy().private().0.to_bytes()) + .private_key(author.keypair().copy().private().0.to_bytes()) .start(anemo::Router::new()) .unwrap(); @@ -187,12 +191,13 @@ async fn test_process_certificates_batch_mode() { let mut missing_certificates = HashSet::new(); for i in 0..10 { - let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(fixture_batch_with_transactions(10), 0) - .build(&author_key) + .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); let id = certificate.clone().digest(); certificates.insert(id, certificate.clone()); @@ -213,7 +218,7 @@ async fn test_process_certificates_batch_mode() { .primary(&requestor_name) .unwrap() .primary_to_primary; - let requestor_key = primary_keys.pop().unwrap(); + let requestor_key = requestor.keypair().copy(); let (mut handler, _network) = PrimaryToPrimaryMockServer::spawn(requestor_key, address); // Wait for connectivity @@ -276,10 +281,12 @@ async fn test_process_certificates_batch_mode() { async fn test_process_payload_availability_success() { // GIVEN let (_, certificate_store, payload_store) = create_db_stores(); - let mut primary_keys = keys(None); - let author_key = primary_keys.pop().unwrap(); - let name = author_key.public().clone(); - let (requestor_name, committee, _) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let author = fixture.authorities().next().unwrap(); + let name = author.public_key(); + let requestor = fixture.authorities().nth(1).unwrap(); + let requestor_name = requestor.public_key(); let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch( test_utils::committee(None), )); @@ -290,7 +297,7 @@ async fn test_process_payload_availability_success() { .unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key(author_key.copy().private().0.to_bytes()) + .private_key(author.keypair().copy().private().0.to_bytes()) .start(anemo::Router::new()) .unwrap(); @@ -322,12 +329,13 @@ async fn test_process_payload_availability_success() { let mut missing_certificates = HashSet::new(); for i in 0..10 { - let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(fixture_batch_with_transactions(10), 0) - .build(&author_key) + .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); let id = certificate.clone().digest(); certificates.insert(id, 
certificate.clone()); @@ -352,7 +360,7 @@ async fn test_process_payload_availability_success() { .primary(&requestor_name) .unwrap() .primary_to_primary; - let requestor_key = primary_keys.pop().unwrap(); + let requestor_key = requestor.keypair().copy(); let (mut handler, _network) = PrimaryToPrimaryMockServer::spawn(requestor_key, address); // Wait for connectivity @@ -434,10 +442,12 @@ async fn test_process_payload_availability_when_failures() { let payload_store: Store<(types::BatchDigest, WorkerId), PayloadToken> = Store::new(payload_map); - let mut primary_keys = keys(None); - let author_key = primary_keys.pop().unwrap(); - let name = author_key.public().clone(); - let (requestor_name, committee, _) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let author = fixture.authorities().next().unwrap(); + let name = author.public_key(); + let requestor = fixture.authorities().nth(1).unwrap(); + let requestor_name = requestor.public_key(); let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch( test_utils::committee(None), )); @@ -448,7 +458,7 @@ async fn test_process_payload_availability_when_failures() { .unwrap(); let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key(author_key.copy().private().0.to_bytes()) + .private_key(author.keypair().copy().private().0.to_bytes()) .start(anemo::Router::new()) .unwrap(); @@ -478,12 +488,13 @@ async fn test_process_payload_availability_when_failures() { // AND some mock certificates let mut certificate_ids = Vec::new(); for _ in 0..10 { - let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(fixture_batch_with_transactions(10), 0) - .build(&author_key) + .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); let id = certificate.clone().digest(); // In order to test an error scenario that is coming from the data store, @@ -516,7 +527,7 @@ async fn test_process_payload_availability_when_failures() { .primary(&requestor_name) .unwrap() .primary_to_primary; - let requestor_key = primary_keys.pop().unwrap(); + let requestor_key = requestor.keypair().copy(); let (mut handler, _network) = PrimaryToPrimaryMockServer::spawn(requestor_key, address); // Wait for connectivity From 965ffb14e2803ea9b98b66cad749986816faaf45 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 13:37:34 -0700 Subject: [PATCH 06/17] primary: convert proposer tests to use a CommitteeFixture --- primary/src/tests/proposer_tests.rs | 36 +++++++++++++++-------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/primary/src/tests/proposer_tests.rs b/primary/src/tests/proposer_tests.rs index b8a1f8732..495e2e7d4 100644 --- a/primary/src/tests/proposer_tests.rs +++ b/primary/src/tests/proposer_tests.rs @@ -4,16 +4,19 @@ use super::*; use fastcrypto::traits::KeyPair; use prometheus::Registry; -use test_utils::{committee, keys, shared_worker_cache}; +use test_utils::CommitteeFixture; #[tokio::test] async fn propose_empty() { - let kp = keys(None).pop().unwrap(); - let name = kp.public().clone(); - let signature_service = SignatureService::new(kp); + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + let shared_worker_cache = fixture.shared_worker_cache(); + let primary = fixture.authorities().next().unwrap(); + let 
name = primary.public_key(); + let signature_service = SignatureService::new(primary.keypair().copy()); let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewEpoch(committee(None))); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (_tx_parents, rx_parents) = test_utils::test_channel!(1); let (_tx_our_digests, rx_our_digests) = test_utils::test_channel!(1); let (tx_headers, mut rx_headers) = test_utils::test_channel!(1); @@ -23,7 +26,7 @@ async fn propose_empty() { // Spawn the proposer. let _proposer_handle = Proposer::spawn( name, - committee(None), + committee.clone(), signature_service, /* header_size */ 1_000, /* max_header_delay */ Duration::from_millis(20), @@ -39,19 +42,20 @@ async fn propose_empty() { let header = rx_headers.recv().await.unwrap(); assert_eq!(header.round, 1); assert!(header.payload.is_empty()); - assert!(header - .verify(&committee(None), shared_worker_cache(None)) - .is_ok()); + assert!(header.verify(&committee, shared_worker_cache).is_ok()); } #[tokio::test] async fn propose_payload() { - let kp = keys(None).pop().unwrap(); - let name = kp.public().clone(); - let signature_service = SignatureService::new(kp); + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + let shared_worker_cache = fixture.shared_worker_cache(); + let primary = fixture.authorities().next().unwrap(); + let name = primary.public_key(); + let signature_service = SignatureService::new(primary.keypair().copy()); let (_tx_reconfigure, rx_reconfigure) = - watch::channel(ReconfigureNotification::NewEpoch(committee(None))); + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (_tx_parents, rx_parents) = test_utils::test_channel!(1); let (tx_our_digests, rx_our_digests) = test_utils::test_channel!(1); let (tx_headers, mut rx_headers) = test_utils::test_channel!(1); @@ -61,7 +65,7 @@ async fn propose_payload() { // Spawn the proposer. 
let _proposer_handle = Proposer::spawn( name.clone(), - committee(None), + committee.clone(), signature_service, /* header_size */ 32, /* max_header_delay */ @@ -86,7 +90,5 @@ async fn propose_payload() { let header = rx_headers.recv().await.unwrap(); assert_eq!(header.round, 1); assert_eq!(header.payload.get(&digest), Some(&worker_id)); - assert!(header - .verify(&committee(None), shared_worker_cache(None)) - .is_ok()); + assert!(header.verify(&committee, shared_worker_cache).is_ok()); } From 58e88a67e82457fcfc127864783f7a4ecda5fe71 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 13:42:58 -0700 Subject: [PATCH 07/17] primary: convert synchronizer tests to use a CommitteeFixture --- primary/src/tests/synchronizer_tests.rs | 41 +++++++++++++++++-------- 1 file changed, 28 insertions(+), 13 deletions(-) diff --git a/primary/src/tests/synchronizer_tests.rs b/primary/src/tests/synchronizer_tests.rs index 8eb0f92ea..3462dadf6 100644 --- a/primary/src/tests/synchronizer_tests.rs +++ b/primary/src/tests/synchronizer_tests.rs @@ -5,16 +5,16 @@ use consensus::{dag::Dag, metrics::ConsensusMetrics}; use fastcrypto::{traits::KeyPair, Hash}; use prometheus::Registry; use std::{collections::BTreeSet, sync::Arc}; -use test_utils::{committee, keys, make_optimal_signed_certificates}; +use test_utils::{make_optimal_signed_certificates, CommitteeFixture}; use types::Certificate; #[tokio::test] async fn deliver_certificate_using_dag() { - let kp = keys(None).pop().unwrap(); - let name = kp.public().clone(); + let fixture = CommitteeFixture::builder().build(); + let name = fixture.authorities().next().unwrap().public_key(); // Make the current committee. - let committee = committee(None); + let committee = fixture.committee(); let (_, certificates_store, payload_store) = create_db_stores(); let (tx_header_waiter, _rx_header_waiter) = test_utils::test_channel!(1); @@ -41,8 +41,13 @@ async fn deliver_certificate_using_dag() { .map(|x| x.digest()) .collect::>(); + let keys = fixture + .authorities() + .map(|a| a.keypair().copy()) + .take(3) + .collect::>(); let (mut certificates, _next_parents) = - make_optimal_signed_certificates(1..=4, &genesis, &committee, &keys(None)[..3]); + make_optimal_signed_certificates(1..=4, &genesis, &committee, &keys); // insert the certificates in the DAG for certificate in certificates.clone() { @@ -62,11 +67,11 @@ async fn deliver_certificate_using_dag() { #[tokio::test] async fn deliver_certificate_using_store() { - let kp = keys(None).pop().unwrap(); - let name = kp.public().clone(); + let fixture = CommitteeFixture::builder().build(); + let name = fixture.authorities().next().unwrap().public_key(); // Make the current committee. 
- let committee = committee(None); + let committee = fixture.committee(); let (_, certificates_store, payload_store) = create_db_stores(); let (tx_header_waiter, _rx_header_waiter) = test_utils::test_channel!(1); @@ -89,8 +94,13 @@ async fn deliver_certificate_using_store() { .map(|x| x.digest()) .collect::>(); + let keys = fixture + .authorities() + .map(|a| a.keypair().copy()) + .take(3) + .collect::>(); let (mut certificates, _next_parents) = - make_optimal_signed_certificates(1..=4, &genesis, &committee, &keys(None)[..3]); + make_optimal_signed_certificates(1..=4, &genesis, &committee, &keys); // insert the certificates in the DAG for certificate in certificates.clone() { @@ -110,11 +120,11 @@ async fn deliver_certificate_using_store() { #[tokio::test] async fn deliver_certificate_not_found_parents() { - let kp = keys(None).pop().unwrap(); - let name = kp.public().clone(); + let fixture = CommitteeFixture::builder().build(); + let name = fixture.authorities().next().unwrap().public_key(); // Make the current committee. - let committee = committee(None); + let committee = fixture.committee(); let (_, certificates_store, payload_store) = create_db_stores(); let (tx_header_waiter, _rx_header_waiter) = test_utils::test_channel!(1); @@ -137,8 +147,13 @@ async fn deliver_certificate_not_found_parents() { .map(|x| x.digest()) .collect::>(); + let keys = fixture + .authorities() + .map(|a| a.keypair().copy()) + .take(3) + .collect::>(); let (mut certificates, _next_parents) = - make_optimal_signed_certificates(1..=4, &genesis, &committee, &keys(None)[..3]); + make_optimal_signed_certificates(1..=4, &genesis, &committee, &keys); // take the last one (top) and test for parents let test_certificate = certificates.pop_back().unwrap(); From a5cf1e8fa5bcd13caa75d60e61514d22a46a2b03 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 13:54:23 -0700 Subject: [PATCH 08/17] primary: convert certificate waiter tests to use a CommitteeFixture --- primary/src/tests/certificate_waiter_tests.rs | 51 +++++++++---------- test_utils/src/lib.rs | 25 +++++++++ 2 files changed, 49 insertions(+), 27 deletions(-) diff --git a/primary/src/tests/certificate_waiter_tests.rs b/primary/src/tests/certificate_waiter_tests.rs index cb0f156a7..8e5170e4d 100644 --- a/primary/src/tests/certificate_waiter_tests.rs +++ b/primary/src/tests/certificate_waiter_tests.rs @@ -12,22 +12,19 @@ use fastcrypto::{traits::KeyPair, Hash, SignatureService}; use network::{PrimaryNetwork, PrimaryToWorkerNetwork}; use prometheus::Registry; use std::{collections::BTreeSet, sync::Arc, time::Duration}; -use test_utils::{ - certificate, fixture_headers_round, keys, pure_committee_from_keys, - shared_worker_cache_from_keys, -}; +use test_utils::CommitteeFixture; use tokio::sync::watch; use types::{Certificate, PrimaryMessage, ReconfigureNotification, Round}; #[tokio::test] async fn process_certificate_missing_parents_in_reverse() { - let mut k = keys(None); - let committee = pure_committee_from_keys(&k); - let worker_cache = shared_worker_cache_from_keys(&k); - let kp = k.pop().unwrap(); - let network_key = kp.copy().private().0.to_bytes(); - let name = kp.public().clone(); - let signature_service = SignatureService::new(kp); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let primary = fixture.authorities().next().unwrap(); + let network_key = primary.keypair().copy().private().0.to_bytes(); + let name = 
primary.public_key(); + let signature_service = SignatureService::new(primary.keypair().copy()); // kept empty let (_tx_reconfigure, rx_reconfigure) = @@ -138,9 +135,9 @@ async fn process_certificate_missing_parents_in_reverse() { for i in 0..rounds { let parents: BTreeSet<_> = current_round .into_iter() - .map(|header| certificate(&header).digest()) + .map(|header| fixture.certificate(&header).digest()) .collect(); - (_, current_round) = fixture_headers_round(i, &parents); + (_, current_round) = fixture.headers_round(i, &parents); headers.extend(current_round.clone()); } @@ -150,17 +147,17 @@ async fn process_certificate_missing_parents_in_reverse() { } // sanity-check - assert!(headers.len() == keys(None).len() * rounds as usize); // note we don't include genesis + assert!(headers.len() == fixture.authorities().count() * rounds as usize); // note we don't include genesis // the `rev()` below is important, as we want to test anti-topological arrival #[allow(clippy::needless_collect)] let ids: Vec<_> = headers .iter() - .map(|header| certificate(header).digest()) + .map(|header| fixture.certificate(header).digest()) .collect(); for header in headers.into_iter().rev() { tx_primary_messages - .send(PrimaryMessage::Certificate(certificate(&header))) + .send(PrimaryMessage::Certificate(fixture.certificate(&header))) .await .unwrap(); } @@ -175,13 +172,13 @@ async fn process_certificate_missing_parents_in_reverse() { #[tokio::test] async fn process_certificate_check_gc_fires() { - let mut k = keys(None); - let committee = pure_committee_from_keys(&k); - let worker_cache = shared_worker_cache_from_keys(&k); - let kp = k.pop().unwrap(); - let network_key = kp.copy().private().0.to_bytes(); - let name = kp.public().clone(); - let signature_service = SignatureService::new(kp); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let primary = fixture.authorities().next().unwrap(); + let network_key = primary.keypair().copy().private().0.to_bytes(); + let name = primary.public_key(); + let signature_service = SignatureService::new(primary.keypair().copy()); // kept empty let (_tx_reconfigure, rx_reconfigure) = @@ -292,9 +289,9 @@ async fn process_certificate_check_gc_fires() { for i in 0..rounds { let parents: BTreeSet<_> = current_round .into_iter() - .map(|header| certificate(&header).digest()) + .map(|header| fixture.certificate(&header).digest()) .collect(); - (_, current_round) = fixture_headers_round(i, &parents); + (_, current_round) = fixture.headers_round(i, &parents); headers.extend(current_round.clone()); } @@ -304,11 +301,11 @@ async fn process_certificate_check_gc_fires() { } // sanity-check - assert!(headers.len() == keys(None).len() * rounds as usize); // note we don't include genesis + assert!(headers.len() == fixture.authorities().count() * rounds as usize); // note we don't include genesis // Just send the last header, the causal certificate completion cannot complete let header = headers.last().unwrap(); - let cert = certificate(header); + let cert = fixture.certificate(header); let id = cert.digest(); tx_primary_messages diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index 69a706db8..a08da5f68 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -1063,6 +1063,31 @@ impl CommitteeFixture { .collect() } + pub fn headers_round( + &self, + prior_round: Round, + parents: &BTreeSet, + ) -> (Round, Vec
) { + let round = prior_round + 1; + let next_headers = self + .authorities + .iter() + .map(|a| { + let builder = types::HeaderBuilder::default(); + builder + .author(a.public_key()) + .round(round) + .epoch(0) + .parents(parents.clone()) + .with_payload_batch(fixture_batch_with_transactions(10), 0) + .build(a.keypair()) + .unwrap() + }) + .collect(); + + (round, next_headers) + } + pub fn votes(&self, header: &Header) -> Vec { self.authorities() .flat_map(|a| { From 7c89a252ed07d598d7800d0f17fe3b56af003e2b Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 14:16:36 -0700 Subject: [PATCH 09/17] primary: convert block waiter tests to use a CommitteeFixture --- primary/src/tests/block_waiter_tests.rs | 83 ++++++++++++++++++------- 1 file changed, 61 insertions(+), 22 deletions(-) diff --git a/primary/src/tests/block_waiter_tests.rs b/primary/src/tests/block_waiter_tests.rs index df1039f9d..06db3cf6e 100644 --- a/primary/src/tests/block_waiter_tests.rs +++ b/primary/src/tests/block_waiter_tests.rs @@ -15,9 +15,7 @@ use mockall::*; use network::PrimaryToWorkerNetwork; use std::{collections::HashMap, sync::Arc}; use test_utils::{ - certificate, fixture_batch_with_transactions, fixture_header_builder, - fixture_header_with_payload, keys, resolve_name_committee_and_worker_cache, - PrimaryToWorkerMockServer, + fixture_batch_with_transactions, fixture_payload, CommitteeFixture, PrimaryToWorkerMockServer, }; use tokio::{ sync::{oneshot, watch}, @@ -32,11 +30,19 @@ use types::{ #[tokio::test] async fn test_successfully_retrieve_block() { // GIVEN - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let name = fixture.authorities().nth(1).unwrap().public_key(); // AND store certificate - let header = fixture_header_with_payload(2); - let certificate = certificate(&header); + let header = author + .header_builder(&committee) + .payload(fixture_payload(2)) + .build(author.keypair()) + .unwrap(); + let certificate = fixture.certificate(&header); let block_id = certificate.digest(); // AND spawn a new blocks waiter @@ -139,9 +145,12 @@ async fn test_successfully_retrieve_block() { #[tokio::test] async fn test_successfully_retrieve_multiple_blocks() { // GIVEN - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let name = fixture.authorities().nth(1).unwrap().public_key(); - let key = keys(None).pop().unwrap(); let mut block_ids = Vec::new(); let mut expected_batch_messages = HashMap::new(); let worker_id = 0; @@ -155,7 +164,7 @@ async fn test_successfully_retrieve_multiple_blocks() { let common_batch_2 = fixture_batch_with_transactions(10); for i in 0..10 { - let mut builder = fixture_header_builder(); + let mut builder = author.header_builder(&committee); let batch_1 = fixture_batch_with_transactions(10); let batch_2 = fixture_batch_with_transactions(10); @@ -228,9 +237,9 @@ async fn test_successfully_retrieve_multiple_blocks() { // sort the batches to make sure that the response is the expected one. 
batches.sort_by(|a, b| a.id.cmp(&b.id)); - let header = builder.build(&key).unwrap(); + let header = builder.build(author.keypair()).unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); certificates.push(certificate.clone()); block_ids.push(certificate.digest()); @@ -341,11 +350,19 @@ async fn test_successfully_retrieve_multiple_blocks() { #[tokio::test] async fn test_one_pending_request_for_block_at_time() { // GIVEN - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let name = fixture.authorities().nth(1).unwrap().public_key(); // AND store certificate - let header = fixture_header_with_payload(2); - let certificate = certificate(&header); + let header = author + .header_builder(&committee) + .payload(fixture_payload(2)) + .build(author.keypair()) + .unwrap(); + let certificate = fixture.certificate(&header); let block_id = certificate.digest(); // AND @@ -419,11 +436,19 @@ async fn test_one_pending_request_for_block_at_time() { #[tokio::test] async fn test_unlocking_pending_get_block_request_after_response() { // GIVEN - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let name = fixture.authorities().nth(1).unwrap().public_key(); // AND store certificate - let header = fixture_header_with_payload(2); - let certificate = certificate(&header); + let header = author + .header_builder(&committee) + .payload(fixture_payload(2)) + .build(author.keypair()) + .unwrap(); + let certificate = fixture.certificate(&header); let block_id = certificate.digest(); // AND spawn a new blocks waiter @@ -488,11 +513,19 @@ async fn test_unlocking_pending_get_block_request_after_response() { #[tokio::test] async fn test_batch_timeout() { // GIVEN - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let name = fixture.authorities().nth(1).unwrap().public_key(); // AND store certificate - let header = fixture_header_with_payload(2); - let certificate = certificate(&header); + let header = author + .header_builder(&committee) + .payload(fixture_payload(2)) + .build(author.keypair()) + .unwrap(); + let certificate = fixture.certificate(&header); let block_id = certificate.digest(); // AND spawn a new blocks waiter @@ -559,7 +592,10 @@ async fn test_batch_timeout() { #[tokio::test] async fn test_return_error_when_certificate_is_missing() { // GIVEN - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let name = fixture.authorities().nth(1).unwrap().public_key(); // AND create a certificate but don't store it let certificate = Certificate::default(); @@ -622,7 +658,10 @@ async fn test_return_error_when_certificate_is_missing() 
{ #[tokio::test] async fn test_return_error_when_certificate_is_missing_when_get_blocks() { // GIVEN - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let name = fixture.authorities().nth(1).unwrap().public_key(); // AND create a certificate but don't store it let certificate = Certificate::default(); From def51ab65c1aada078d7abb1601a661db06aa519 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 14:29:28 -0700 Subject: [PATCH 10/17] primary: convert header waiter tests to use a CommitteeFixture --- primary/src/tests/header_waiter_tests.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/primary/src/tests/header_waiter_tests.rs b/primary/src/tests/header_waiter_tests.rs index de4394905..0eb61bae5 100644 --- a/primary/src/tests/header_waiter_tests.rs +++ b/primary/src/tests/header_waiter_tests.rs @@ -11,14 +11,19 @@ use fastcrypto::{traits::KeyPair, Hash}; use network::{PrimaryNetwork, PrimaryToWorkerNetwork}; use prometheus::Registry; use std::{sync::Arc, time::Duration}; -use test_utils::{fixture_header_with_payload, keys, resolve_name_committee_and_worker_cache}; +use test_utils::{fixture_payload, CommitteeFixture}; use tokio::{sync::watch, time::timeout}; use types::{BatchDigest, ReconfigureNotification, Round}; #[tokio::test] async fn successfully_synchronize_batches() { // GIVEN - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let primary = fixture.authorities().nth(1).unwrap(); + let name = primary.public_key(); let (_, certificate_store, payload_store) = create_db_stores(); let gc_depth: Round = 1; let (_tx_reconfigure, rx_reconfigure) = @@ -31,11 +36,10 @@ async fn successfully_synchronize_batches() { let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) .unwrap(); - let kp = keys(None).remove(2); let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key(kp.private().0.to_bytes()) + .private_key(primary.keypair().copy().private().0.to_bytes()) .start(anemo::Router::new()) .unwrap(); @@ -59,7 +63,11 @@ async fn successfully_synchronize_batches() { // AND a header let worker_id = 0; - let header = fixture_header_with_payload(2); + let header = author + .header_builder(&committee) + .payload(fixture_payload(2)) + .build(author.keypair()) + .unwrap(); let missing_digests = vec![BatchDigest::default()]; let missing_digests_map = missing_digests .clone() From b1039e81f91a895860b51a60683d730e434e39f6 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 14:43:25 -0700 Subject: [PATCH 11/17] primary: convert block synchronizer handler tests to use a CommitteeFixture --- .../block_synchronizer/tests/handler_tests.rs | 69 ++++++++++++++++--- test_utils/src/lib.rs | 8 --- 2 files changed, 60 insertions(+), 17 deletions(-) diff --git a/primary/src/block_synchronizer/tests/handler_tests.rs b/primary/src/block_synchronizer/tests/handler_tests.rs index 8ac81ab92..9430e1481 100644 --- a/primary/src/block_synchronizer/tests/handler_tests.rs +++ b/primary/src/block_synchronizer/tests/handler_tests.rs @@ 
-10,8 +10,8 @@ use crate::{ }; use fastcrypto::Hash; use std::{collections::HashSet, time::Duration}; -use test_utils::{certificate, fixture_header_with_payload}; -use types::{Certificate, CertificateDigest, PrimaryMessage}; +use test_utils::{fixture_payload, CommitteeFixture}; +use types::{CertificateDigest, PrimaryMessage}; #[tokio::test] async fn test_get_and_synchronize_block_headers_when_fetched_from_storage() { @@ -27,8 +27,17 @@ async fn test_get_and_synchronize_block_headers_when_fetched_from_storage() { certificate_deliver_timeout: Duration::from_millis(2_000), }; + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + let author = fixture.authorities().next().unwrap(); + // AND dummy certificate - let certificate = certificate(&fixture_header_with_payload(1)); + let header = author + .header_builder(&committee) + .payload(fixture_payload(1)) + .build(author.keypair()) + .unwrap(); + let certificate = fixture.certificate(&header); // AND let block_ids = vec![CertificateDigest::default()]; @@ -76,12 +85,26 @@ async fn test_get_and_synchronize_block_headers_when_fetched_from_peers() { certificate_deliver_timeout: Duration::from_millis(2_000), }; + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + let author = fixture.authorities().next().unwrap(); + // AND a certificate stored - let cert_stored = certificate(&fixture_header_with_payload(1)); + let header = author + .header_builder(&committee) + .payload(fixture_payload(1)) + .build(author.keypair()) + .unwrap(); + let cert_stored = fixture.certificate(&header); certificate_store.write(cert_stored.clone()).unwrap(); // AND a certificate NOT stored - let cert_missing = certificate(&fixture_header_with_payload(2)); + let header = author + .header_builder(&committee) + .payload(fixture_payload(2)) + .build(author.keypair()) + .unwrap(); + let cert_missing = fixture.certificate(&header); // AND let mut block_ids = HashSet::new(); @@ -161,12 +184,26 @@ async fn test_get_and_synchronize_block_headers_timeout_on_causal_completion() { certificate_deliver_timeout: Duration::from_millis(2_000), }; + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + let author = fixture.authorities().next().unwrap(); + // AND a certificate stored - let cert_stored = certificate(&fixture_header_with_payload(1)); + let header = author + .header_builder(&committee) + .payload(fixture_payload(1)) + .build(author.keypair()) + .unwrap(); + let cert_stored = fixture.certificate(&header); certificate_store.write(cert_stored.clone()).unwrap(); // AND a certificate NOT stored - let cert_missing = certificate(&fixture_header_with_payload(2)); + let header = author + .header_builder(&committee) + .payload(fixture_payload(2)) + .build(author.keypair()) + .unwrap(); + let cert_missing = fixture.certificate(&header); // AND let block_ids = vec![cert_stored.digest(), cert_missing.digest()]; @@ -228,14 +265,28 @@ async fn test_synchronize_block_payload() { certificate_deliver_timeout: Duration::from_millis(2_000), }; + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + let author = fixture.authorities().next().unwrap(); + // AND a certificate with payload already available - let cert_stored: Certificate = certificate(&fixture_header_with_payload(1)); + let header = author + .header_builder(&committee) + .payload(fixture_payload(1)) + .build(author.keypair()) + .unwrap(); + let cert_stored = 
fixture.certificate(&header); for e in cert_stored.clone().header.payload { payload_store.write(e, 1).await; } // AND a certificate with payload NOT available - let cert_missing = certificate(&fixture_header_with_payload(2)); + let header = author + .header_builder(&committee) + .payload(fixture_payload(2)) + .build(author.keypair()) + .unwrap(); + let cert_missing = fixture.certificate(&header); // AND let block_ids = vec![cert_stored.digest(), cert_missing.digest()]; diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index a08da5f68..41c130a75 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -431,14 +431,6 @@ pub fn fixture_payload(number_of_batches: u8) -> IndexMap payload } -pub fn fixture_header_with_payload(number_of_batches: u8) -> Header { - let kp = keys(None).pop().unwrap(); - let payload = fixture_payload(number_of_batches); - - let builder = fixture_header_builder(); - builder.payload(payload).build(&kp).unwrap() -} - // will create a batch with randomly formed transactions // dictated by the parameter number_of_transactions pub fn fixture_batch_with_transactions(number_of_transactions: u32) -> Batch { From 5f12d670d030fc53162eee0b5daf3cdfa4bff7e0 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 14:51:05 -0700 Subject: [PATCH 12/17] storage: convert certificate store tests to use a CommitteeFixture --- storage/src/certificate_store.rs | 15 +++++++-------- test_utils/src/lib.rs | 2 +- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/storage/src/certificate_store.rs b/storage/src/certificate_store.rs index 3ddd5468c..1e9757207 100644 --- a/storage/src/certificate_store.rs +++ b/storage/src/certificate_store.rs @@ -325,9 +325,7 @@ mod test { reopen, rocks::{open_cf, DBMap}, }; - use test_utils::{ - certificate, certificate_from_committee, committee, fixture_headers_round, temp_dir, - }; + use test_utils::{temp_dir, CommitteeFixture}; use types::{Certificate, CertificateDigest, Round}; fn new_store(path: std::path::PathBuf) -> CertificateStore { @@ -348,24 +346,25 @@ mod test { // helper method that creates certificates for the provided // number of rounds. 
fn certificates(rounds: u64) -> Vec { - let mut current_round: Vec<_> = Certificate::genesis(&committee(None)) + let fixture = CommitteeFixture::builder().build(); + let committee = fixture.committee(); + let mut current_round: Vec<_> = Certificate::genesis(&committee) .into_iter() .map(|cert| cert.header) .collect(); - let committee = &committee(None); let mut result: Vec = Vec::new(); for i in 0..rounds { let parents: BTreeSet<_> = current_round .iter() - .map(|header| certificate_from_committee(header, committee).digest()) + .map(|header| fixture.certificate(header).digest()) .collect(); - (_, current_round) = fixture_headers_round(i, &parents); + (_, current_round) = fixture.headers_round(i, &parents); result.extend( current_round .iter() - .map(certificate) + .map(|h| fixture.certificate(h)) .collect::>(), ); } diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index 41c130a75..c97f2714b 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -475,7 +475,7 @@ pub fn votes(header: &Header) -> Vec { } // Fixture -pub fn certificate_from_committee(header: &Header, committee: &Committee) -> Certificate { +fn certificate_from_committee(header: &Header, committee: &Committee) -> Certificate { let votes: Vec<_> = votes(header) .into_iter() .map(|x| (x.author, x.signature)) From e0bbe4343ea47d8bd7db1a91a4368728b5f6faaf Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 15:01:42 -0700 Subject: [PATCH 13/17] primary: convert block remover tests to use a CommitteeFixture --- primary/src/tests/block_remover_tests.rs | 58 +++++++++++++----------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/primary/src/tests/block_remover_tests.rs b/primary/src/tests/block_remover_tests.rs index 3a1ec2f2d..7064870bc 100644 --- a/primary/src/tests/block_remover_tests.rs +++ b/primary/src/tests/block_remover_tests.rs @@ -20,15 +20,9 @@ use futures::{ use network::PrimaryToWorkerNetwork; use prometheus::Registry; use std::{borrow::Borrow, collections::HashMap, sync::Arc, time::Duration}; -use test_utils::{ - certificate, fixture_batch_with_transactions, fixture_header_builder, keys, - resolve_name_committee_and_worker_cache, PrimaryToWorkerMockServer, -}; +use test_utils::{fixture_batch_with_transactions, CommitteeFixture, PrimaryToWorkerMockServer}; use tokio::{ - sync::{ - mpsc::{self}, - watch, - }, + sync::{mpsc, watch}, task::JoinHandle, time::{sleep, timeout}, }; @@ -45,7 +39,12 @@ async fn test_successful_blocks_delete() { let (tx_delete_batches, rx_delete_batches) = test_utils::test_channel!(10); // AND the necessary keys - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let primary = fixture.authorities().nth(1).unwrap(); + let name = primary.public_key(); let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // AND a Dag with genesis populated @@ -72,8 +71,6 @@ async fn test_successful_blocks_delete() { let mut header_ids = Vec::new(); let handlers = FuturesUnordered::new(); - let key = keys(None).pop().unwrap(); - let mut worker_batches: HashMap> = HashMap::new(); let worker_id_0 = 0; @@ -84,13 +81,14 @@ async fn test_successful_blocks_delete() { let batch_1 = fixture_batch_with_transactions(10); let batch_2 = fixture_batch_with_transactions(10); 
- let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(batch_1.clone(), worker_id_0) .with_payload_batch(batch_2.clone(), worker_id_1) - .build(&key) + .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); let block_id = certificate.digest(); // write the certificate @@ -210,7 +208,12 @@ async fn test_timeout() { let (tx_removed_certificates, _rx_removed_certificates) = test_utils::test_channel!(10); // AND the necessary keys - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let primary = fixture.authorities().nth(1).unwrap(); + let name = primary.public_key(); let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // AND a Dag with genesis populated @@ -236,8 +239,6 @@ async fn test_timeout() { let mut block_ids = Vec::new(); let mut header_ids = Vec::new(); - let key = keys(None).pop().unwrap(); - let mut worker_batches: HashMap> = HashMap::new(); let worker_id_2 = 2; @@ -248,13 +249,14 @@ async fn test_timeout() { let batch_1 = fixture_batch_with_transactions(10); let batch_2 = fixture_batch_with_transactions(10); - let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(batch_1.clone(), worker_id_2) .with_payload_batch(batch_2.clone(), worker_id_3) - .build(&key) + .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); let block_id = certificate.digest(); // write the certificate @@ -346,7 +348,12 @@ async fn test_unlocking_pending_requests() { let (tx_removed_certificates, _rx_removed_certificates) = test_utils::test_channel!(10); // AND the necessary keys - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let primary = fixture.authorities().nth(1).unwrap(); + let name = primary.public_key(); let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); // AND a Dag with genesis populated @@ -375,17 +382,16 @@ async fn test_unlocking_pending_requests() { let mut block_ids = Vec::new(); let mut header_ids = Vec::new(); - let key = keys(None).pop().unwrap(); - let worker_id_0 = 0; let batch = fixture_batch_with_transactions(10); - let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(batch.clone(), worker_id_0) - .build(&key) + .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); let block_id = certificate.digest(); // write the certificate From 04b1c0dd484c9706a945d4eb09d3fe6d4afcb524 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 15:17:45 -0700 Subject: [PATCH 14/17] primary: convert block synchronizer tests to use a CommitteeFixture --- .../tests/block_synchronizer_tests.rs | 181 ++++++++++-------- 1 file changed, 102 insertions(+), 79 deletions(-) diff --git 
a/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs b/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs index 761729ec9..28847d183 100644 --- a/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs +++ b/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs @@ -18,10 +18,7 @@ use std::{ collections::{HashMap, HashSet}, time::Duration, }; -use test_utils::{ - certificate, fixture_batch_with_transactions, fixture_header_builder, keys, - resolve_name_committee_and_worker_cache, PrimaryToPrimaryMockServer, -}; +use test_utils::{fixture_batch_with_transactions, CommitteeFixture, PrimaryToPrimaryMockServer}; use tokio::{ sync::{mpsc, watch}, task::JoinHandle, @@ -41,9 +38,13 @@ async fn test_successful_headers_synchronization() { let (_, certificate_store, payload_store) = create_db_stores(); // AND the necessary keys - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); - - let mut primary_keys = keys(None); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let primary = fixture.authorities().nth(1).unwrap(); + let name = primary.public_key(); + let network_key = primary.keypair().copy().private().0.to_bytes(); let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); @@ -54,7 +55,6 @@ async fn test_successful_headers_synchronization() { // AND some blocks (certificates) let mut certificates: HashMap = HashMap::new(); - let key = keys(None).pop().unwrap(); let worker_id_0 = 0; let worker_id_1 = 1; @@ -63,13 +63,14 @@ async fn test_successful_headers_synchronization() { let batch_1 = fixture_batch_with_transactions(10); let batch_2 = fixture_batch_with_transactions(10); - let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(batch_1.clone(), worker_id_0) .with_payload_batch(batch_2.clone(), worker_id_1) - .build(&key) + .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); certificates.insert(certificate.clone().digest(), certificate.clone()); } @@ -78,10 +79,9 @@ async fn test_successful_headers_synchronization() { network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) .unwrap(); println!("New primary added: {:?}", own_address); - let kp = primary_keys.remove(2); let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key(kp.private().0.to_bytes()) + .private_key(network_key) .start(anemo::Router::new()) .unwrap(); @@ -116,12 +116,16 @@ async fn test_successful_headers_synchronization() { // AND let's assume that all the primaries are responding with the full set // of requested certificates. 
- let handlers: FuturesUnordered>> = primary_keys - .into_iter() - .map(|kp| { - let address = committee.primary(kp.public()).unwrap().primary_to_primary; + let handlers: FuturesUnordered>> = fixture + .authorities() + .filter(|a| a.public_key() != name) + .map(|a| { + let address = committee + .primary(&a.public_key()) + .unwrap() + .primary_to_primary; println!("New primary added: {:?}", address); - primary_listener(1, kp, address) + primary_listener(1, a.keypair().copy(), address) }) .collect(); @@ -234,9 +238,13 @@ async fn test_successful_payload_synchronization() { let (_, certificate_store, payload_store) = create_db_stores(); // AND the necessary keys - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); - - let mut primary_keys = keys(None); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let primary = fixture.authorities().nth(1).unwrap(); + let name = primary.public_key(); + let network_key = primary.keypair().copy().private().0.to_bytes(); let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); @@ -248,7 +256,6 @@ async fn test_successful_payload_synchronization() { // AND some blocks (certificates) let mut certificates: HashMap = HashMap::new(); - let key = keys(None).pop().unwrap(); let worker_id_0: u32 = 0; let worker_id_1: u32 = 1; @@ -257,13 +264,14 @@ async fn test_successful_payload_synchronization() { let batch_1 = fixture_batch_with_transactions(10); let batch_2 = fixture_batch_with_transactions(10); - let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(batch_1.clone(), worker_id_0) .with_payload_batch(batch_2.clone(), worker_id_1) - .build(&key) + .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); certificates.insert(certificate.clone().digest(), certificate.clone()); } @@ -272,10 +280,9 @@ async fn test_successful_payload_synchronization() { network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) .unwrap(); println!("New primary added: {:?}", own_address); - let kp = primary_keys.remove(2); let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key(kp.private().0.to_bytes()) + .private_key(network_key) .start(anemo::Router::new()) .unwrap(); @@ -310,12 +317,16 @@ async fn test_successful_payload_synchronization() { // AND let's assume that all the primaries are responding with the full set // of requested certificates. 
- let handlers_primaries: FuturesUnordered>> = primary_keys - .into_iter() - .map(|kp| { - let address = committee.primary(kp.public()).unwrap().primary_to_primary; + let handlers_primaries: FuturesUnordered>> = fixture + .authorities() + .filter(|a| a.public_key() != name) + .map(|a| { + let address = committee + .primary(&a.public_key()) + .unwrap() + .primary_to_primary; println!("New primary added: {:?}", address); - primary_listener(1, kp, address) + primary_listener(1, a.keypair().copy(), address) }) .collect(); @@ -468,7 +479,13 @@ async fn test_successful_payload_synchronization() { async fn test_multiple_overlapping_requests() { // GIVEN let (_, certificate_store, payload_store) = create_db_stores(); - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let primary = fixture.authorities().nth(1).unwrap(); + let name = primary.public_key(); + let network_key = primary.keypair().copy().private().0.to_bytes(); let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (_, rx_commands) = test_utils::test_channel!(10); @@ -478,16 +495,15 @@ async fn test_multiple_overlapping_requests() { // AND some blocks (certificates) let mut certificates: HashMap = HashMap::new(); - let key = keys(None).pop().unwrap(); - // AND generate headers with distributed batches between 2 workers (0 and 1) for _ in 0..5 { - let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(fixture_batch_with_transactions(10), 0) - .build(&key) + .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); certificates.insert(certificate.clone().digest(), certificate.clone()); } @@ -498,10 +514,9 @@ async fn test_multiple_overlapping_requests() { network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) .unwrap(); println!("New primary added: {:?}", own_address); - let kp = keys(None).remove(2); let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key(kp.private().0.to_bytes()) + .private_key(network_key) .start(anemo::Router::new()) .unwrap(); @@ -597,8 +612,13 @@ async fn test_timeout_while_waiting_for_certificates() { let (_, certificate_store, payload_store) = create_db_stores(); // AND the necessary keys - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); - let key = keys(None).pop().unwrap(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let primary = fixture.authorities().nth(1).unwrap(); + let name = primary.public_key(); + let network_key = primary.keypair().copy().private().0.to_bytes(); let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); @@ -610,12 +630,13 @@ async fn test_timeout_while_waiting_for_certificates() { let block_ids: Vec = (0..10) .into_iter() .map(|_| { - let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(fixture_batch_with_transactions(10), 0) - .build(&key) + .build(author.keypair()) .unwrap(); - 
certificate(&header).digest() + fixture.certificate(&header).digest() }) .collect(); @@ -623,10 +644,9 @@ async fn test_timeout_while_waiting_for_certificates() { network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) .unwrap(); println!("New primary added: {:?}", own_address); - let kp = keys(None).remove(2); let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key(kp.private().0.to_bytes()) + .private_key(network_key) .start(anemo::Router::new()) .unwrap(); @@ -700,8 +720,13 @@ async fn test_reply_with_certificates_already_in_storage() { let (_, certificate_store, payload_store) = create_db_stores(); // AND the necessary keys - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); - let key = keys(None).pop().unwrap(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let primary = fixture.authorities().nth(1).unwrap(); + let name = primary.public_key(); + let network_key = primary.keypair().copy().private().0.to_bytes(); let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (_, rx_commands) = test_utils::test_channel!(10); @@ -711,11 +736,10 @@ async fn test_reply_with_certificates_already_in_storage() { let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) .unwrap(); - let kp = keys(None).remove(2); let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key(kp.private().0.to_bytes()) + .private_key(network_key) .start(anemo::Router::new()) .unwrap(); @@ -747,12 +771,13 @@ async fn test_reply_with_certificates_already_in_storage() { for i in 1..=8 { let batch = fixture_batch_with_transactions(10); - let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(batch.clone(), 0) - .build(&key) + .build(author.keypair()) .unwrap(); - let certificate = certificate(&header); + let certificate = fixture.certificate(&header); block_ids.push(certificate.digest()); certificates.insert(certificate.clone().digest(), certificate.clone()); @@ -803,8 +828,13 @@ async fn test_reply_with_payload_already_in_storage() { let (_, certificate_store, payload_store) = create_db_stores(); // AND the necessary keys - let (name, committee, worker_cache) = resolve_name_committee_and_worker_cache(); - let key = keys(None).pop().unwrap(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let author = fixture.authorities().next().unwrap(); + let primary = fixture.authorities().nth(1).unwrap(); + let name = primary.public_key(); + let network_key = primary.keypair().copy().private().0.to_bytes(); let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (_, rx_commands) = test_utils::test_channel!(10); @@ -814,11 +844,10 @@ async fn test_reply_with_payload_already_in_storage() { let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) .unwrap(); - let kp = keys(None).remove(2); let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key(kp.private().0.to_bytes()) + .private_key(network_key) .start(anemo::Router::new()) .unwrap(); let synchronizer = BlockSynchronizer 
{ @@ -849,12 +878,13 @@ async fn test_reply_with_payload_already_in_storage() { for i in 1..=8 { let batch = fixture_batch_with_transactions(10); - let header = fixture_header_builder() + let header = author + .header_builder(&committee) .with_payload_batch(batch.clone(), 0) - .build(&key) + .build(author.keypair()) .unwrap(); - let certificate: Certificate = certificate(&header); + let certificate = fixture.certificate(&header); certificates.push(certificate.clone()); certificates_map.insert(certificate.clone().digest(), certificate.clone()); @@ -909,12 +939,15 @@ async fn test_reply_with_payload_already_in_storage_for_own_certificates() { let (_, certificate_store, payload_store) = create_db_stores(); // AND the necessary keys - let (_, committee, worker_cache) = resolve_name_committee_and_worker_cache(); - let key = keys(None).pop().unwrap(); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); + let primary = fixture.authorities().next().unwrap(); + let network_key = primary.keypair().copy().private().0.to_bytes(); // AND make sure the key used for our "own" primary is the one that will // be used to create the headers. - let name = key.public().clone(); + let name = primary.public_key(); let (_, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (_, rx_commands) = test_utils::test_channel!(10); @@ -924,11 +957,10 @@ async fn test_reply_with_payload_already_in_storage_for_own_certificates() { let own_address = network::multiaddr_to_address(&committee.primary(&name).unwrap().primary_to_primary) .unwrap(); - let kp = keys(None).remove(2); let network = anemo::Network::bind(own_address) .server_name("narwhal") - .private_key(kp.private().0.to_bytes()) + .private_key(network_key) .start(anemo::Router::new()) .unwrap(); let synchronizer = BlockSynchronizer { @@ -958,22 +990,13 @@ async fn test_reply_with_payload_already_in_storage_for_own_certificates() { for _ in 0..5 { let batch = fixture_batch_with_transactions(10); - let builder = types::HeaderBuilder::default(); - let header = builder - .author(name.clone()) - .round(1) - .epoch(0) - .parents( - Certificate::genesis(&committee) - .iter() - .map(|x| x.digest()) - .collect(), - ) + let header = primary + .header_builder(&committee) .with_payload_batch(batch.clone(), 0) - .build(&key) + .build(primary.keypair()) .unwrap(); - let certificate: Certificate = certificate(&header); + let certificate = fixture.certificate(&header); certificates.push(certificate.clone()); certificates_map.insert(certificate.clone().digest(), certificate.clone()); From 5d276b4bd332be8182ce226e8c83dcac42e61507 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 15:18:03 -0700 Subject: [PATCH 15/17] test-utils: remove unused fixture generators --- test_utils/src/lib.rs | 131 +----------------------------------------- 1 file changed, 1 insertion(+), 130 deletions(-) diff --git a/test_utils/src/lib.rs b/test_utils/src/lib.rs index c97f2714b..334a2ec50 100644 --- a/test_utils/src/lib.rs +++ b/test_utils/src/lib.rs @@ -6,7 +6,7 @@ use config::{ utils::get_available_port, Authority, Committee, Epoch, PrimaryAddresses, SharedWorkerCache, WorkerCache, WorkerId, WorkerIndex, WorkerInfo, }; -use crypto::{KeyPair, PublicKey, Signature}; +use crypto::{KeyPair, PublicKey}; use fastcrypto::{ traits::{KeyPair as _, Signer as _}, Digest, Hash as _, @@ -324,95 +324,6 @@ pub fn make_consensus_store(store_path: 
&std::path::Path) -> Arc<ConsensusStore>
     Arc::new(ConsensusStore::new(last_committed_map, sequence_map))
 }
 
-// Fixture
-pub fn header() -> Header {
-    header_with_epoch(&committee(None))
-}
-
-// Fixture
-pub fn headers() -> Vec<Header> {
-    keys(None)
-        .into_iter()
-        .map(|kp| {
-            let header = Header {
-                author: kp.public().clone(),
-                round: 1,
-                parents: Certificate::genesis(&committee(None))
-                    .iter()
-                    .map(|x| x.digest())
-                    .collect(),
-                ..Header::default()
-            };
-            let header_digest = header.digest();
-            Header {
-                id: header_digest,
-                signature: kp.sign(Digest::from(header_digest).as_ref()),
-                ..header
-            }
-        })
-        .collect()
-}
-
-// Fixture
-pub fn header_with_epoch(committee: &Committee) -> Header {
-    let kp = keys(None).pop().unwrap();
-    let header = Header {
-        author: kp.public().clone(),
-        round: 1,
-        epoch: committee.epoch(),
-        parents: Certificate::genesis(committee)
-            .iter()
-            .map(|x| x.digest())
-            .collect(),
-        ..Header::default()
-    };
-
-    let header_digest = header.digest();
-    Header {
-        id: header_digest,
-        signature: kp.sign(Digest::from(header_digest).as_ref()),
-        ..header
-    }
-}
-
-pub fn fixture_header_builder() -> types::HeaderBuilder {
-    let kp = keys(None).pop().unwrap();
-
-    let builder = types::HeaderBuilder::default();
-    builder
-        .author(kp.public().clone())
-        .round(1)
-        .epoch(0)
-        .parents(
-            Certificate::genesis(&committee(None))
-                .iter()
-                .map(|x| x.digest())
-                .collect(),
-        )
-}
-
-pub fn fixture_headers_round(
-    prior_round: Round,
-    parents: &BTreeSet<CertificateDigest>,
-) -> (Round, Vec<Header>
) { - let round = prior_round + 1; - let next_headers: Vec<_> = keys(None) - .into_iter() - .map(|kp| { - let builder = types::HeaderBuilder::default(); - builder - .author(kp.public().clone()) - .round(round) - .epoch(0) - .parents(parents.clone()) - .with_payload_batch(fixture_batch_with_transactions(10), 0) - .build(&kp) - .unwrap() - }) - .collect(); - (round, next_headers) -} - pub fn fixture_payload(number_of_batches: u8) -> IndexMap { let mut payload: IndexMap = IndexMap::new(); @@ -447,46 +358,6 @@ pub fn transaction() -> Transaction { (0..100).map(|_v| rand::random::()).collect() } -// Fixture -pub fn votes(header: &Header) -> Vec { - keys(None) - .into_iter() - .flat_map(|kp| { - // we should not re-sign using the key of the authority - // that produced the header - if kp.public() == &header.author { - None - } else { - let vote = Vote { - id: header.id, - round: header.round, - epoch: header.epoch, - origin: header.author.clone(), - author: kp.public().clone(), - signature: Signature::default(), - }; - Some(Vote { - signature: kp.sign(Digest::from(vote.digest()).as_ref()), - ..vote - }) - } - }) - .collect() -} - -// Fixture -fn certificate_from_committee(header: &Header, committee: &Committee) -> Certificate { - let votes: Vec<_> = votes(header) - .into_iter() - .map(|x| (x.author, x.signature)) - .collect(); - Certificate::new(committee, header.clone(), votes).unwrap() -} - -pub fn certificate(header: &Header) -> Certificate { - certificate_from_committee(header, &committee(None)) -} - #[derive(Clone)] pub struct PrimaryToPrimaryMockServer { sender: Sender, From a209bb084a180c924465790afe6b8a3269790b06 Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 15:22:48 -0700 Subject: [PATCH 16/17] node: convert smoke-tests to use a CommitteeFixture --- node/tests/node_smoke_test.rs | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/node/tests/node_smoke_test.rs b/node/tests/node_smoke_test.rs index 92215af43..6ceaa806d 100644 --- a/node/tests/node_smoke_test.rs +++ b/node/tests/node_smoke_test.rs @@ -3,7 +3,7 @@ use config::Export; use std::time::{Duration, Instant}; -use test_utils::{keys, pure_committee_from_keys, shared_worker_cache_from_keys, temp_dir}; +use test_utils::{temp_dir, CommitteeFixture}; const TEST_DURATION: Duration = Duration::from_secs(3); @@ -14,15 +14,21 @@ fn test_primary_no_consensus() { let now = Instant::now(); let duration = TEST_DURATION; - let keys = keys(None); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); let keys_file_path = format!("{config_path}/smoke_test_keys.json"); - keys[0].export(&keys_file_path).unwrap(); + fixture + .authorities() + .next() + .unwrap() + .keypair() + .export(&keys_file_path) + .unwrap(); - let committee = pure_committee_from_keys(&keys); let committee_file_path = format!("{config_path}/smoke_test_committee.json"); committee.export(&committee_file_path).unwrap(); - let worker_cache = shared_worker_cache_from_keys(&keys); let workers_file_path = format!("{config_path}/smoke_test_workers.json"); worker_cache.export(&workers_file_path).unwrap(); @@ -70,15 +76,21 @@ fn test_primary_with_consensus() { let now = Instant::now(); let duration = TEST_DURATION; - let keys = keys(None); + let fixture = CommitteeFixture::builder().randomize_ports(true).build(); + let committee = fixture.committee(); + let worker_cache = fixture.shared_worker_cache(); let 
keys_file_path = format!("{config_path}/smoke_test_keys.json"); - keys[0].export(&keys_file_path).unwrap(); + fixture + .authorities() + .next() + .unwrap() + .keypair() + .export(&keys_file_path) + .unwrap(); - let committee = pure_committee_from_keys(&keys); let committee_file_path = format!("{config_path}/smoke_test_committee.json"); committee.export(&committee_file_path).unwrap(); - let worker_cache = shared_worker_cache_from_keys(&keys); let workers_file_path = format!("{config_path}/smoke_test_workers.json"); worker_cache.export(&workers_file_path).unwrap(); From ae94260d048b9de56c01af46c447d02bf3512cef Mon Sep 17 00:00:00 2001 From: Brandon Williams Date: Wed, 7 Sep 2022 15:24:17 -0700 Subject: [PATCH 17/17] primary: don't use test_utils::committee in helper tests --- primary/src/tests/helper_tests.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/primary/src/tests/helper_tests.rs b/primary/src/tests/helper_tests.rs index daa010a96..367b9a7e4 100644 --- a/primary/src/tests/helper_tests.rs +++ b/primary/src/tests/helper_tests.rs @@ -149,9 +149,8 @@ async fn test_process_certificates_batch_mode() { let name = author.public_key(); let requestor = fixture.authorities().nth(1).unwrap(); let requestor_name = requestor.public_key(); - let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch( - test_utils::committee(None), - )); + let (_tx_reconfigure, rx_reconfigure) = + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_primaries, rx_primaries) = test_utils::test_channel!(10); let own_address = @@ -287,9 +286,8 @@ async fn test_process_payload_availability_success() { let name = author.public_key(); let requestor = fixture.authorities().nth(1).unwrap(); let requestor_name = requestor.public_key(); - let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch( - test_utils::committee(None), - )); + let (_tx_reconfigure, rx_reconfigure) = + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_primaries, rx_primaries) = test_utils::test_channel!(10); let own_address = @@ -448,9 +446,8 @@ async fn test_process_payload_availability_when_failures() { let name = author.public_key(); let requestor = fixture.authorities().nth(1).unwrap(); let requestor_name = requestor.public_key(); - let (_tx_reconfigure, rx_reconfigure) = watch::channel(ReconfigureNotification::NewEpoch( - test_utils::committee(None), - )); + let (_tx_reconfigure, rx_reconfigure) = + watch::channel(ReconfigureNotification::NewEpoch(committee.clone())); let (tx_primaries, rx_primaries) = test_utils::test_channel!(10); let own_address =