Skip to content

Commit

Permalink
Leverage itertools::process_results() in few places
Browse files Browse the repository at this point in the history
  • Loading branch information
adaszko committed Jun 9, 2020
1 parent e0cdc6b commit 96f34ab
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 57 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 8 additions & 16 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block_root: Hash256,
slot: Slot,
) -> Result<Option<Hash256>, Error> {
Ok(self
.rev_iter_block_roots_from(block_root)?
.find(|result| match result {
Ok((_, ancestor_slot)) => *ancestor_slot == slot,
Err(_) => true,
})
.transpose()?
.map(|(ancestor_block_root, _)| ancestor_block_root))
process_results(self.rev_iter_block_roots_from(block_root)?, |mut iter| {
iter.find(|(_, ancestor_slot)| *ancestor_slot == slot)
.map(|(ancestor_block_root, _)| ancestor_block_root)
})
}

/// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to
Expand Down Expand Up @@ -416,14 +412,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
slot: Slot,
) -> Result<Option<SignedBeaconBlock<T::EthSpec>>, Error> {
let root = self
.rev_iter_block_roots()?
.find(|result| match result {
Ok((_, this_slot)) => *this_slot == slot,
Err(_) => true,
})
.transpose()?
.map(|(root, _)| root);
let root = process_results(self.rev_iter_state_roots()?, |mut iter| {
iter.find(|(_, this_slot)| *this_slot == slot)
.map(|(root, _)| root)
})?;

if let Some(block_root) = root {
Ok(self.store.get_item(&block_root)?)
Expand Down
1 change: 1 addition & 0 deletions beacon_node/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@ rlp = "0.4.5"
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
environment = { path = "../../lighthouse/environment" }
itertools = "0.9.0"
20 changes: 8 additions & 12 deletions beacon_node/network/src/router/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ use beacon_chain::{
};
use eth2_libp2p::rpc::*;
use eth2_libp2p::{NetworkGlobals, PeerId, Request, Response};
use itertools::process_results;
use slog::{debug, error, o, trace, warn};
use ssz::Encode;
use std::sync::Arc;
use store::errors::Error as StoreError;
use store::Store;
use tokio::sync::mpsc;
use types::{
Expand Down Expand Up @@ -358,14 +358,10 @@ impl<T: BeaconChainTypes> Processor<T> {

// pick out the required blocks, ignoring skip-slots and stepping by the step parameter;
let mut last_block_root = None;
let maybe_block_roots: Result<Vec<Option<Hash256>>, StoreError> = forwards_block_root_iter
.take_while(|result| match result {
Ok((_, slot)) => slot.as_u64() < req.start_slot + req.count * req.step,
Err(_) => true,
})
// map skip slots to None
.map(|result| {
result.map(|(root, _)| {
let maybe_block_roots = process_results(forwards_block_root_iter, |iter| {
iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot + req.count * req.step)
// map skip slots to None
.map(|(root, _)| {
let result = if Some(root) == last_block_root {
None
} else {
Expand All @@ -374,9 +370,9 @@ impl<T: BeaconChainTypes> Processor<T> {
last_block_root = Some(root);
result
})
})
.step_by(req.step as usize)
.collect::<Result<_, _>>();
.step_by(req.step as usize)
.collect::<Vec<Option<Hash256>>>()
});

let block_roots = match maybe_block_roots {
Ok(block_roots) => block_roots,
Expand Down
1 change: 1 addition & 0 deletions beacon_node/rest_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ rayon = "1.3.0"
environment = { path = "../../lighthouse/environment" }
uhttp_sse = "0.5.1"
bus = "2.2.3"
itertools = "0.9.0"

[dev-dependencies]
assert_matches = "1.3.0"
Expand Down
40 changes: 18 additions & 22 deletions beacon_node/rest_api/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use eth2_libp2p::PubsubMessage;
use hex;
use http::header;
use hyper::{Body, Request};
use itertools::process_results;
use network::NetworkMessage;
use ssz::Decode;
use store::{iter::AncestorIter, Store};
Expand Down Expand Up @@ -118,18 +119,14 @@ pub fn block_root_at_slot<T: BeaconChainTypes>(
beacon_chain: &BeaconChain<T>,
target: Slot,
) -> Result<Option<Hash256>, ApiError> {
Ok(beacon_chain
.rev_iter_block_roots()?
.take_while(|result| match result {
Ok((_, slot)) => *slot >= target,
Err(_) => true,
})
.find(|result| match result {
Ok((_, slot)) => *slot == target,
Err(_) => true,
})
.transpose()?
.map(|(root, _)| root))
Ok(process_results(
beacon_chain.rev_iter_block_roots()?,
|iter| {
iter.take_while(|(_, slot)| *slot >= target)
.find(|(_, slot)| *slot == target)
.map(|(root, _)| root)
},
)?)
}

/// Returns a `BeaconState` and it's root in the canonical chain of `beacon_chain` at the given
Expand Down Expand Up @@ -197,16 +194,15 @@ pub fn state_root_at_slot<T: BeaconChainTypes>(
//
// Iterate through the state roots on the head state to find the root for that
// slot. Once the root is found, load it from the database.
Ok(head_state
.try_iter_ancestor_roots(beacon_chain.store.clone())
.ok_or_else(|| ApiError::ServerError("Failed to create roots iterator".to_string()))?
.find(|result| match result {
Ok((_, s)) => *s == slot,
Err(_) => true,
})
.transpose()?
.map(|(root, _)| root)
.ok_or_else(|| ApiError::NotFound(format!("Unable to find state at slot {}", slot)))?)
process_results(
head_state
.try_iter_ancestor_roots(beacon_chain.store.clone())
.ok_or_else(|| {
ApiError::ServerError("Failed to create roots iterator".to_string())
})?,
|mut iter| iter.find(|(_, s)| *s == slot).map(|(root, _)| root),
)?
.ok_or_else(|| ApiError::NotFound(format!("Unable to find state at slot {}", slot)))
} else {
// 4. The request slot is later than the head slot.
//
Expand Down
16 changes: 9 additions & 7 deletions beacon_node/store/src/forwards_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::chunked_vector::BlockRoots;
use crate::errors::{Error, Result};
use crate::iter::BlockRootsIterator;
use crate::{HotColdDB, Store};
use itertools::process_results;
use std::sync::Arc;
use types::{BeaconState, ChainSpec, EthSpec, Hash256, Slot};

Expand Down Expand Up @@ -65,13 +66,14 @@ impl SimpleForwardsBlockRootsIterator {
end_block_root: Hash256,
) -> Result<Self> {
// Iterate backwards from the end state, stopping at the start slot.
let values = std::iter::once(Ok((end_block_root, end_state.slot)))
.chain(BlockRootsIterator::owned(store, end_state))
.take_while(|result| match result {
Ok((_, slot)) => *slot >= start_slot,
Err(_) => true,
})
.collect::<Result<Vec<_>>>()?;
let values = process_results(
std::iter::once(Ok((end_block_root, end_state.slot)))
.chain(BlockRootsIterator::owned(store, end_state)),
|iter| {
iter.take_while(|(_, slot)| *slot >= start_slot)
.collect::<Vec<_>>()
},
)?;
Ok(Self { values: values })
}
}
Expand Down

0 comments on commit 96f34ab

Please sign in to comment.