diff --git a/Cargo.lock b/Cargo.lock index 8403b42df5f..7ac31a60432 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2955,6 +2955,7 @@ dependencies = [ "genesis", "hashset_delay", "hex 0.4.2", + "itertools 0.9.0", "lazy_static", "lighthouse_metrics", "matches", @@ -3832,6 +3833,7 @@ dependencies = [ "hex 0.4.2", "http 0.2.1", "hyper 0.13.5", + "itertools 0.9.0", "lazy_static", "lighthouse_metrics", "network", diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index fddb6911048..d46590cfd1d 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -378,14 +378,10 @@ impl BeaconChain { block_root: Hash256, slot: Slot, ) -> Result, Error> { - Ok(self - .rev_iter_block_roots_from(block_root)? - .find(|result| match result { - Ok((_, ancestor_slot)) => *ancestor_slot == slot, - Err(_) => true, - }) - .transpose()? - .map(|(ancestor_block_root, _)| ancestor_block_root)) + process_results(self.rev_iter_block_roots_from(block_root)?, |mut iter| { + iter.find(|(_, ancestor_slot)| *ancestor_slot == slot) + .map(|(ancestor_block_root, _)| ancestor_block_root) + }) } /// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to @@ -416,14 +412,10 @@ impl BeaconChain { &self, slot: Slot, ) -> Result>, Error> { - let root = self - .rev_iter_block_roots()? - .find(|result| match result { - Ok((_, this_slot)) => *this_slot == slot, - Err(_) => true, - }) - .transpose()? - .map(|(root, _)| root); + let root = process_results(self.rev_iter_state_roots()?, |mut iter| { + iter.find(|(_, this_slot)| *this_slot == slot) + .map(|(root, _)| root) + })?; if let Some(block_root) = root { Ok(self.store.get_item(&block_root)?) diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index e9181e88b2e..ed19c473404 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -37,3 +37,4 @@ rlp = "0.4.5" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } environment = { path = "../../lighthouse/environment" } +itertools = "0.9.0" diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 119424a336a..4a50eddb05f 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -9,10 +9,10 @@ use beacon_chain::{ }; use eth2_libp2p::rpc::*; use eth2_libp2p::{NetworkGlobals, PeerId, Request, Response}; +use itertools::process_results; use slog::{debug, error, o, trace, warn}; use ssz::Encode; use std::sync::Arc; -use store::errors::Error as StoreError; use store::Store; use tokio::sync::mpsc; use types::{ @@ -358,14 +358,10 @@ impl Processor { // pick out the required blocks, ignoring skip-slots and stepping by the step parameter; let mut last_block_root = None; - let maybe_block_roots: Result>, StoreError> = forwards_block_root_iter - .take_while(|result| match result { - Ok((_, slot)) => slot.as_u64() < req.start_slot + req.count * req.step, - Err(_) => true, - }) - // map skip slots to None - .map(|result| { - result.map(|(root, _)| { + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot + req.count * req.step) + // map skip slots to None + .map(|(root, _)| { let result = if Some(root) == last_block_root { None } else { @@ -374,9 +370,9 @@ impl Processor { last_block_root = Some(root); result }) - }) - .step_by(req.step as usize) - .collect::>(); + .step_by(req.step as usize) + .collect::>>() + }); let block_roots = match maybe_block_roots { Ok(block_roots) => block_roots, diff --git a/beacon_node/rest_api/Cargo.toml b/beacon_node/rest_api/Cargo.toml index 18db2094f52..443320269c9 100644 --- a/beacon_node/rest_api/Cargo.toml +++ b/beacon_node/rest_api/Cargo.toml @@ -37,6 +37,7 @@ futures = "0.3.5" operation_pool = { path = "../operation_pool" } rayon = "1.3.0" environment = { path = "../../lighthouse/environment" } +itertools = "0.9.0" [dev-dependencies] assert_matches = "1.3.0" diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index 830f6a8ddd2..a8a2d5af77a 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -5,6 +5,7 @@ use eth2_libp2p::PubsubMessage; use hex; use http::header; use hyper::{Body, Request}; +use itertools::process_results; use network::NetworkMessage; use ssz::Decode; use store::{iter::AncestorIter, Store}; @@ -118,18 +119,14 @@ pub fn block_root_at_slot( beacon_chain: &BeaconChain, target: Slot, ) -> Result, ApiError> { - Ok(beacon_chain - .rev_iter_block_roots()? - .take_while(|result| match result { - Ok((_, slot)) => *slot >= target, - Err(_) => true, - }) - .find(|result| match result { - Ok((_, slot)) => *slot == target, - Err(_) => true, - }) - .transpose()? - .map(|(root, _)| root)) + Ok(process_results( + beacon_chain.rev_iter_block_roots()?, + |iter| { + iter.take_while(|(_, slot)| *slot >= target) + .find(|(_, slot)| *slot == target) + .map(|(root, _)| root) + }, + )?) } /// Returns a `BeaconState` and it's root in the canonical chain of `beacon_chain` at the given @@ -197,16 +194,15 @@ pub fn state_root_at_slot( // // Iterate through the state roots on the head state to find the root for that // slot. Once the root is found, load it from the database. - Ok(head_state - .try_iter_ancestor_roots(beacon_chain.store.clone()) - .ok_or_else(|| ApiError::ServerError("Failed to create roots iterator".to_string()))? - .find(|result| match result { - Ok((_, s)) => *s == slot, - Err(_) => true, - }) - .transpose()? - .map(|(root, _)| root) - .ok_or_else(|| ApiError::NotFound(format!("Unable to find state at slot {}", slot)))?) + process_results( + head_state + .try_iter_ancestor_roots(beacon_chain.store.clone()) + .ok_or_else(|| { + ApiError::ServerError("Failed to create roots iterator".to_string()) + })?, + |mut iter| iter.find(|(_, s)| *s == slot).map(|(root, _)| root), + )? + .ok_or_else(|| ApiError::NotFound(format!("Unable to find state at slot {}", slot))) } else { // 4. The request slot is later than the head slot. // diff --git a/beacon_node/store/src/forwards_iter.rs b/beacon_node/store/src/forwards_iter.rs index c6c11902668..2971eeafeac 100644 --- a/beacon_node/store/src/forwards_iter.rs +++ b/beacon_node/store/src/forwards_iter.rs @@ -3,6 +3,7 @@ use crate::chunked_vector::BlockRoots; use crate::errors::{Error, Result}; use crate::iter::BlockRootsIterator; use crate::{HotColdDB, Store}; +use itertools::process_results; use std::sync::Arc; use types::{BeaconState, ChainSpec, EthSpec, Hash256, Slot}; @@ -65,13 +66,14 @@ impl SimpleForwardsBlockRootsIterator { end_block_root: Hash256, ) -> Result { // Iterate backwards from the end state, stopping at the start slot. - let values = std::iter::once(Ok((end_block_root, end_state.slot))) - .chain(BlockRootsIterator::owned(store, end_state)) - .take_while(|result| match result { - Ok((_, slot)) => *slot >= start_slot, - Err(_) => true, - }) - .collect::>>()?; + let values = process_results( + std::iter::once(Ok((end_block_root, end_state.slot))) + .chain(BlockRootsIterator::owned(store, end_state)), + |iter| { + iter.take_while(|(_, slot)| *slot >= start_slot) + .collect::>() + }, + )?; Ok(Self { values: values }) } }