diff --git a/Cargo.lock b/Cargo.lock index 0a9315e325e..bf7b2564417 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,6 +289,7 @@ dependencies = [ "futures 0.3.5", "genesis", "integer-sqrt", + "itertools 0.9.0", "lazy_static", "lighthouse_metrics", "log 0.4.8", @@ -2994,6 +2995,7 @@ dependencies = [ "genesis", "hashset_delay", "hex 0.4.2", + "itertools 0.9.0", "lazy_static", "lighthouse_metrics", "matches", @@ -3871,6 +3873,7 @@ dependencies = [ "hex 0.4.2", "http 0.2.1", "hyper 0.13.6", + "itertools 0.9.0", "lazy_static", "lighthouse_metrics", "network", diff --git a/beacon_node/beacon_chain/Cargo.toml b/beacon_node/beacon_chain/Cargo.toml index 262cc485cb6..2bcedec8763 100644 --- a/beacon_node/beacon_chain/Cargo.toml +++ b/beacon_node/beacon_chain/Cargo.toml @@ -48,7 +48,7 @@ bls = { path = "../../crypto/bls" } safe_arith = { path = "../../consensus/safe_arith" } environment = { path = "../../lighthouse/environment" } bus = "2.2.3" +itertools = "0.9.0" [dev-dependencies] lazy_static = "1.4.0" - diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ff293d8bcdf..7e7f76c7a77 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -23,6 +23,7 @@ use crate::snapshot_cache::SnapshotCache; use crate::timeout_rw_lock::TimeoutRwLock; use crate::validator_pubkey_cache::ValidatorPubkeyCache; use crate::BeaconSnapshot; +use itertools::process_results; use operation_pool::{OperationPool, PersistedOperationPool}; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; @@ -319,12 +320,16 @@ impl BeaconChain { /// - Iterator returns `(Hash256, Slot)`. /// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot /// returned may be earlier than the wall-clock slot. - pub fn rev_iter_block_roots(&self) -> Result, Error> { + pub fn rev_iter_block_roots( + &self, + ) -> Result>, Error> { let head = self.head()?; - let iter = BlockRootsIterator::owned(self.store.clone(), head.beacon_state); - - Ok(std::iter::once((head.beacon_block_root, head.beacon_block.slot())).chain(iter)) + Ok( + std::iter::once(Ok((head.beacon_block_root, head.beacon_block.slot()))) + .chain(iter) + .map(|result| result.map_err(|e| e.into())), + ) } pub fn forwards_iter_block_roots( @@ -339,7 +344,7 @@ impl BeaconChain { local_head.beacon_state, local_head.beacon_block_root, &self.spec, - )) + )?) } /// Traverse backwards from `block_root` to find the block roots of its ancestors. @@ -354,7 +359,7 @@ impl BeaconChain { pub fn rev_iter_block_roots_from( &self, block_root: Hash256, - ) -> Result, Error> { + ) -> Result>, Error> { let block = self .get_block(&block_root)? .ok_or_else(|| Error::MissingBeaconBlock(block_root))?; @@ -362,7 +367,9 @@ impl BeaconChain { .get_state(&block.state_root(), Some(block.slot()))? .ok_or_else(|| Error::MissingBeaconState(block.state_root()))?; let iter = BlockRootsIterator::owned(self.store.clone(), state); - Ok(std::iter::once((block_root, block.slot())).chain(iter)) + Ok(std::iter::once(Ok((block_root, block.slot()))) + .chain(iter) + .map(|result| result.map_err(|e| e.into()))) } /// Traverse backwards from `block_root` to find the root of the ancestor block at `slot`. @@ -371,10 +378,10 @@ impl BeaconChain { block_root: Hash256, slot: Slot, ) -> Result, Error> { - Ok(self - .rev_iter_block_roots_from(block_root)? - .find(|(_, ancestor_slot)| *ancestor_slot == slot) - .map(|(ancestor_block_root, _)| ancestor_block_root)) + process_results(self.rev_iter_block_roots_from(block_root)?, |mut iter| { + iter.find(|(_, ancestor_slot)| *ancestor_slot == slot) + .map(|(ancestor_block_root, _)| ancestor_block_root) + }) } /// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to @@ -386,13 +393,16 @@ impl BeaconChain { /// - Iterator returns `(Hash256, Slot)`. /// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot /// returned may be earlier than the wall-clock slot. - pub fn rev_iter_state_roots(&self) -> Result, Error> { + pub fn rev_iter_state_roots( + &self, + ) -> Result>, Error> { let head = self.head()?; let slot = head.beacon_state.slot; - let iter = StateRootsIterator::owned(self.store.clone(), head.beacon_state); - - Ok(std::iter::once((head.beacon_state_root, slot)).chain(iter)) + let iter = std::iter::once(Ok((head.beacon_state_root, slot))) + .chain(iter) + .map(|result| result.map_err(Into::into)); + Ok(iter) } /// Returns the block at the given slot, if any. Only returns blocks in the canonical chain. @@ -404,10 +414,10 @@ impl BeaconChain { &self, slot: Slot, ) -> Result>, Error> { - let root = self - .rev_iter_block_roots()? - .find(|(_, this_slot)| *this_slot == slot) - .map(|(root, _)| root); + let root = process_results(self.rev_iter_block_roots()?, |mut iter| { + iter.find(|(_, this_slot)| *this_slot == slot) + .map(|(root, _)| root) + })?; if let Some(block_root) = root { Ok(self.store.get_item(&block_root)?) @@ -553,12 +563,12 @@ impl BeaconChain { Ok(state) } Ordering::Less => { - let state_root = self - .rev_iter_state_roots()? - .take_while(|(_root, current_slot)| *current_slot >= slot) - .find(|(_root, current_slot)| *current_slot == slot) - .map(|(root, _slot)| root) - .ok_or_else(|| Error::NoStateForSlot(slot))?; + let state_root = process_results(self.rev_iter_state_roots()?, |iter| { + iter.take_while(|(_, current_slot)| *current_slot >= slot) + .find(|(_, current_slot)| *current_slot == slot) + .map(|(root, _slot)| root) + })? + .ok_or_else(|| Error::NoStateForSlot(slot))?; Ok(self .get_state(&state_root, Some(slot))? @@ -633,10 +643,10 @@ impl BeaconChain { /// /// Returns None if a block doesn't exist at the slot. pub fn root_at_slot(&self, target_slot: Slot) -> Result, Error> { - Ok(self - .rev_iter_block_roots()? - .find(|(_root, slot)| *slot == target_slot) - .map(|(root, _slot)| root)) + process_results(self.rev_iter_block_roots()?, |mut iter| { + iter.find(|(_, slot)| *slot == target_slot) + .map(|(root, _)| root) + }) } /// Returns the block proposer for a given slot. diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index e90ee97a9e0..40be5d41dc3 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -84,9 +84,10 @@ pub trait Migrate: Send + Sync + 'static { .ok_or_else(|| BeaconStateError::MissingBeaconBlock(head_hash.into()))? .state_root(); - let iterator = std::iter::once((head_hash, head_state_hash, head_slot)) + let iter = std::iter::once(Ok((head_hash, head_state_hash, head_slot))) .chain(RootsIterator::from_block(Arc::clone(&store), head_hash)?); - for (block_hash, state_hash, slot) in iterator { + for maybe_tuple in iter { + let (block_hash, state_hash, slot) = maybe_tuple?; if slot < old_finalized_slot { // We must assume here any candidate chains include old_finalized_block_hash, // i.e. there aren't any forks starting at a block that is a strict ancestor of diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 39b8556a71d..6c2f828ff40 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -392,6 +392,7 @@ fn delete_blocks_and_states() { .expect("faulty head state exists"); let states_to_delete = StateRootsIterator::new(store.clone(), &faulty_head_state) + .map(Result::unwrap) .take_while(|(_, slot)| *slot > unforked_blocks) .collect::>(); @@ -409,6 +410,7 @@ fn delete_blocks_and_states() { // Deleting the blocks from the fork should remove them completely let blocks_to_delete = BlockRootsIterator::new(store.clone(), &faulty_head_state) + .map(Result::unwrap) // Extra +1 here accounts for the skipped slot that started this fork .take_while(|(_, slot)| *slot > unforked_blocks + 1) .collect::>(); @@ -424,6 +426,7 @@ fn delete_blocks_and_states() { .chain .rev_iter_state_roots() .expect("rev iter ok") + .map(Result::unwrap) .filter(|(_, slot)| *slot < split_slot); for (state_root, slot) in finalized_states { @@ -659,11 +662,12 @@ fn check_shuffling_compatible( let previous_pivot_slot = (head_state.previous_epoch() - shuffling_lookahead).end_slot(E::slots_per_epoch()); - for (block_root, slot) in harness + for maybe_tuple in harness .chain .rev_iter_block_roots_from(head_block_root) .unwrap() { + let (block_root, slot) = maybe_tuple.unwrap(); // Shuffling is compatible targeting the current epoch, // iff slot is greater than or equal to the current epoch pivot block assert_eq!( @@ -1364,6 +1368,8 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) { head.beacon_block_root, &harness.spec, ) + .unwrap() + .map(Result::unwrap) .collect::>(); // Drop the block roots for skipped slots. @@ -1387,6 +1393,7 @@ fn check_iterators(harness: &TestHarness) { .rev_iter_state_roots() .expect("should get iter") .last() + .map(Result::unwrap) .map(|(_, slot)| slot), Some(Slot::new(0)) ); @@ -1396,6 +1403,7 @@ fn check_iterators(harness: &TestHarness) { .rev_iter_block_roots() .expect("should get iter") .last() + .map(Result::unwrap) .map(|(_, slot)| slot), Some(Slot::new(0)) ); diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index 38d6083bf96..5d5c75731df 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -73,11 +73,13 @@ fn iterators() { .chain .rev_iter_block_roots() .expect("should get iter") + .map(Result::unwrap) .collect(); let state_roots: Vec<(Hash256, Slot)> = harness .chain .rev_iter_state_roots() .expect("should get iter") + .map(Result::unwrap) .collect(); assert_eq!( diff --git a/beacon_node/network/Cargo.toml b/beacon_node/network/Cargo.toml index e9181e88b2e..ed19c473404 100644 --- a/beacon_node/network/Cargo.toml +++ b/beacon_node/network/Cargo.toml @@ -37,3 +37,4 @@ rlp = "0.4.5" lazy_static = "1.4.0" lighthouse_metrics = { path = "../../common/lighthouse_metrics" } environment = { path = "../../lighthouse/environment" } +itertools = "0.9.0" diff --git a/beacon_node/network/src/router/processor.rs b/beacon_node/network/src/router/processor.rs index 1fd6927c558..4a50eddb05f 100644 --- a/beacon_node/network/src/router/processor.rs +++ b/beacon_node/network/src/router/processor.rs @@ -9,6 +9,7 @@ use beacon_chain::{ }; use eth2_libp2p::rpc::*; use eth2_libp2p::{NetworkGlobals, PeerId, Request, Response}; +use itertools::process_results; use slog::{debug, error, o, trace, warn}; use ssz::Encode; use std::sync::Arc; @@ -357,20 +358,29 @@ impl Processor { // pick out the required blocks, ignoring skip-slots and stepping by the step parameter; let mut last_block_root = None; - let block_roots = forwards_block_root_iter - .take_while(|(_root, slot)| slot.as_u64() < req.start_slot + req.count * req.step) - // map skip slots to None - .map(|(root, _slot)| { - let result = if Some(root) == last_block_root { - None - } else { - Some(root) - }; - last_block_root = Some(root); - result - }) - .step_by(req.step as usize) - .collect::>(); + let maybe_block_roots = process_results(forwards_block_root_iter, |iter| { + iter.take_while(|(_, slot)| slot.as_u64() < req.start_slot + req.count * req.step) + // map skip slots to None + .map(|(root, _)| { + let result = if Some(root) == last_block_root { + None + } else { + Some(root) + }; + last_block_root = Some(root); + result + }) + .step_by(req.step as usize) + .collect::>>() + }); + + let block_roots = match maybe_block_roots { + Ok(block_roots) => block_roots, + Err(e) => { + error!(self.log, "Error during iteration over blocks"; "error" => format!("{:?}", e)); + return; + } + }; // remove all skip slots let block_roots = block_roots diff --git a/beacon_node/rest_api/Cargo.toml b/beacon_node/rest_api/Cargo.toml index f8cdbfe4f4e..122887d71cb 100644 --- a/beacon_node/rest_api/Cargo.toml +++ b/beacon_node/rest_api/Cargo.toml @@ -39,6 +39,7 @@ rayon = "1.3.0" environment = { path = "../../lighthouse/environment" } uhttp_sse = "0.5.1" bus = "2.2.3" +itertools = "0.9.0" [dev-dependencies] assert_matches = "1.3.0" diff --git a/beacon_node/rest_api/src/helpers.rs b/beacon_node/rest_api/src/helpers.rs index b07bb97d588..a8a2d5af77a 100644 --- a/beacon_node/rest_api/src/helpers.rs +++ b/beacon_node/rest_api/src/helpers.rs @@ -5,6 +5,7 @@ use eth2_libp2p::PubsubMessage; use hex; use http::header; use hyper::{Body, Request}; +use itertools::process_results; use network::NetworkMessage; use ssz::Decode; use store::{iter::AncestorIter, Store}; @@ -118,11 +119,14 @@ pub fn block_root_at_slot( beacon_chain: &BeaconChain, target: Slot, ) -> Result, ApiError> { - Ok(beacon_chain - .rev_iter_block_roots()? - .take_while(|(_root, slot)| *slot >= target) - .find(|(_root, slot)| *slot == target) - .map(|(root, _slot)| root)) + Ok(process_results( + beacon_chain.rev_iter_block_roots()?, + |iter| { + iter.take_while(|(_, slot)| *slot >= target) + .find(|(_, slot)| *slot == target) + .map(|(root, _)| root) + }, + )?) } /// Returns a `BeaconState` and it's root in the canonical chain of `beacon_chain` at the given @@ -190,12 +194,15 @@ pub fn state_root_at_slot( // // Iterate through the state roots on the head state to find the root for that // slot. Once the root is found, load it from the database. - Ok(head_state - .try_iter_ancestor_roots(beacon_chain.store.clone()) - .ok_or_else(|| ApiError::ServerError("Failed to create roots iterator".to_string()))? - .find(|(_root, s)| *s == slot) - .map(|(root, _slot)| root) - .ok_or_else(|| ApiError::NotFound(format!("Unable to find state at slot {}", slot)))?) + process_results( + head_state + .try_iter_ancestor_roots(beacon_chain.store.clone()) + .ok_or_else(|| { + ApiError::ServerError("Failed to create roots iterator".to_string()) + })?, + |mut iter| iter.find(|(_, s)| *s == slot).map(|(root, _)| root), + )? + .ok_or_else(|| ApiError::NotFound(format!("Unable to find state at slot {}", slot))) } else { // 4. The request slot is later than the head slot. // diff --git a/beacon_node/rest_api/tests/test.rs b/beacon_node/rest_api/tests/test.rs index 6d62a5fe355..6c80336bb10 100644 --- a/beacon_node/rest_api/tests/test.rs +++ b/beacon_node/rest_api/tests/test.rs @@ -759,6 +759,7 @@ fn get_genesis_state_root() { .expect("should have beacon chain") .rev_iter_state_roots() .expect("should get iter") + .map(Result::unwrap) .find(|(_cur_root, cur_slot)| slot == *cur_slot) .map(|(cur_root, _)| cur_root) .expect("chain should have state root at slot"); @@ -786,6 +787,7 @@ fn get_genesis_block_root() { .expect("should have beacon chain") .rev_iter_block_roots() .expect("should get iter") + .map(Result::unwrap) .find(|(_cur_root, cur_slot)| slot == *cur_slot) .map(|(cur_root, _)| cur_root) .expect("chain should have state root at slot"); diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 49c4b514461..4512ed70df4 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -3,6 +3,8 @@ use crate::hot_cold_store::HotColdDBError; use ssz::DecodeError; use types::{BeaconStateError, Hash256}; +pub type Result = std::result::Result; + #[derive(Debug)] pub enum Error { SszDecodeError(DecodeError), @@ -13,6 +15,7 @@ pub enum Error { DBError { message: String }, RlpError(String), BlockNotFound(Hash256), + NoContinuationData, } impl From for Error { diff --git a/beacon_node/store/src/forwards_iter.rs b/beacon_node/store/src/forwards_iter.rs index b105608a087..2971eeafeac 100644 --- a/beacon_node/store/src/forwards_iter.rs +++ b/beacon_node/store/src/forwards_iter.rs @@ -1,8 +1,9 @@ use crate::chunked_iter::ChunkedVectorIter; use crate::chunked_vector::BlockRoots; +use crate::errors::{Error, Result}; use crate::iter::BlockRootsIterator; use crate::{HotColdDB, Store}; -use slog::error; +use itertools::process_results; use std::sync::Arc; use types::{BeaconState, ChainSpec, EthSpec, Hash256, Slot}; @@ -63,22 +64,26 @@ impl SimpleForwardsBlockRootsIterator { start_slot: Slot, end_state: BeaconState, end_block_root: Hash256, - ) -> Self { + ) -> Result { // Iterate backwards from the end state, stopping at the start slot. - let iter = std::iter::once((end_block_root, end_state.slot)) - .chain(BlockRootsIterator::owned(store, end_state)); - Self { - values: iter.take_while(|(_, slot)| *slot >= start_slot).collect(), - } + let values = process_results( + std::iter::once(Ok((end_block_root, end_state.slot))) + .chain(BlockRootsIterator::owned(store, end_state)), + |iter| { + iter.take_while(|(_, slot)| *slot >= start_slot) + .collect::>() + }, + )?; + Ok(Self { values: values }) } } impl Iterator for SimpleForwardsBlockRootsIterator { - type Item = (Hash256, Slot); + type Item = Result<(Hash256, Slot)>; fn next(&mut self) -> Option { // Pop from the end of the vector to get the block roots in slot-ascending order. - self.values.pop() + Ok(self.values.pop()).transpose() } } @@ -89,12 +94,12 @@ impl HybridForwardsBlockRootsIterator { end_state: BeaconState, end_block_root: Hash256, spec: &ChainSpec, - ) -> Self { + ) -> Result { use HybridForwardsBlockRootsIterator::*; let latest_restore_point_slot = store.get_latest_restore_point_slot(); - if start_slot < latest_restore_point_slot { + let result = if start_slot < latest_restore_point_slot { PreFinalization { iter: Box::new(FrozenForwardsBlockRootsIterator::new( store, @@ -111,16 +116,14 @@ impl HybridForwardsBlockRootsIterator { start_slot, end_state, end_block_root, - ), + )?, } - } - } -} + }; -impl Iterator for HybridForwardsBlockRootsIterator { - type Item = (Hash256, Slot); + Ok(result) + } - fn next(&mut self) -> Option { + fn do_next(&mut self) -> Result> { use HybridForwardsBlockRootsIterator::*; match self { @@ -129,19 +132,13 @@ impl Iterator for HybridForwardsBlockRootsIterator { continuation_data, } => { match iter.next() { - Some(x) => Some(x), + Some(x) => Ok(Some(x)), // Once the pre-finalization iterator is consumed, transition // to a post-finalization iterator beginning from the last slot // of the pre iterator. None => { let (end_state, end_block_root) = - continuation_data.take().or_else(|| { - error!( - iter.inner.store.log, - "HybridForwardsBlockRootsIterator: logic error" - ); - None - })?; + continuation_data.take().ok_or(Error::NoContinuationData)?; *self = PostFinalization { iter: SimpleForwardsBlockRootsIterator::new( @@ -149,13 +146,21 @@ impl Iterator for HybridForwardsBlockRootsIterator { Slot::from(iter.inner.end_vindex), end_state, end_block_root, - ), + )?, }; - self.next() + self.do_next() } } } - PostFinalization { iter } => iter.next(), + PostFinalization { iter } => iter.next().transpose(), } } } + +impl Iterator for HybridForwardsBlockRootsIterator { + type Item = Result<(Hash256, Slot)>; + + fn next(&mut self) -> Option { + self.do_next().transpose() + } +} diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index ec93d97563a..dfe98985b86 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -190,7 +190,7 @@ impl Store for HotColdDB { end_state: BeaconState, end_block_root: Hash256, spec: &ChainSpec, - ) -> Self::ForwardsBlockRootsIterator { + ) -> Result { HybridForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root, spec) } @@ -708,7 +708,11 @@ pub fn process_finalization( let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head); let mut to_delete = vec![]; - for (state_root, slot) in state_root_iter.take_while(|&(_, slot)| slot >= current_split_slot) { + for maybe_pair in state_root_iter.take_while(|result| match result { + Ok((_, slot)) => slot >= ¤t_split_slot, + Err(_) => true, + }) { + let (state_root, slot) = maybe_pair?; if slot % store.config.slots_per_restore_point == 0 { let state: BeaconState = get_full_state(&store.hot_db, &state_root)? .ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?; diff --git a/beacon_node/store/src/iter.rs b/beacon_node/store/src/iter.rs index 58ec2009483..0dcb69bff0c 100644 --- a/beacon_node/store/src/iter.rs +++ b/beacon_node/store/src/iter.rs @@ -69,12 +69,12 @@ impl<'a, T: EthSpec, U: Store> StateRootsIterator<'a, T, U> { } impl<'a, T: EthSpec, U: Store> Iterator for StateRootsIterator<'a, T, U> { - type Item = (Hash256, Slot); + type Item = Result<(Hash256, Slot), Error>; fn next(&mut self) -> Option { self.inner .next() - .map(|(_, state_root, slot)| (state_root, slot)) + .map(|result| result.map(|(_, state_root, slot)| (state_root, slot))) } } @@ -115,12 +115,12 @@ impl<'a, T: EthSpec, U: Store> BlockRootsIterator<'a, T, U> { } impl<'a, T: EthSpec, U: Store> Iterator for BlockRootsIterator<'a, T, U> { - type Item = (Hash256, Slot); + type Item = Result<(Hash256, Slot), Error>; fn next(&mut self) -> Option { self.inner .next() - .map(|(block_root, _, slot)| (block_root, slot)) + .map(|result| result.map(|(block_root, _, slot)| (block_root, slot))) } } @@ -167,15 +167,10 @@ impl<'a, T: EthSpec, U: Store> RootsIterator<'a, T, U> { .ok_or_else(|| BeaconStateError::MissingBeaconState(block.state_root().into()))?; Ok(Self::owned(store, state)) } -} -impl<'a, T: EthSpec, U: Store> Iterator for RootsIterator<'a, T, U> { - /// (block_root, state_root, slot) - type Item = (Hash256, Hash256, Slot); - - fn next(&mut self) -> Option { + fn do_next(&mut self) -> Result, Error> { if self.slot == 0 || self.slot > self.beacon_state.slot { - return None; + return Ok(None); } self.slot -= 1; @@ -184,7 +179,7 @@ impl<'a, T: EthSpec, U: Store> Iterator for RootsIterator<'a, T, U> { self.beacon_state.get_block_root(self.slot), self.beacon_state.get_state_root(self.slot), ) { - (Ok(block_root), Ok(state_root)) => Some((*block_root, *state_root, self.slot)), + (Ok(block_root), Ok(state_root)) => Ok(Some((*block_root, *state_root, self.slot))), (Err(BeaconStateError::SlotOutOfBounds), Err(BeaconStateError::SlotOutOfBounds)) => { // Read a `BeaconState` from the store that has access to prior historical roots. let beacon_state = @@ -192,16 +187,26 @@ impl<'a, T: EthSpec, U: Store> Iterator for RootsIterator<'a, T, U> { self.beacon_state = Cow::Owned(beacon_state); - let block_root = *self.beacon_state.get_block_root(self.slot).ok()?; - let state_root = *self.beacon_state.get_state_root(self.slot).ok()?; + let block_root = *self.beacon_state.get_block_root(self.slot)?; + let state_root = *self.beacon_state.get_state_root(self.slot)?; - Some((block_root, state_root, self.slot)) + Ok(Some((block_root, state_root, self.slot))) } - _ => None, + (Err(e), _) => Err(e.into()), + (Ok(_), Err(e)) => Err(e.into()), } } } +impl<'a, T: EthSpec, U: Store> Iterator for RootsIterator<'a, T, U> { + /// (block_root, state_root, slot) + type Item = Result<(Hash256, Hash256, Slot), Error>; + + fn next(&mut self) -> Option { + self.do_next().transpose() + } +} + /// Block iterator that uses the `parent_root` of each block to backtrack. pub struct ParentRootBlockIterator<'a, E: EthSpec, S: Store> { store: &'a S, @@ -263,14 +268,22 @@ impl<'a, T: EthSpec, U: Store> BlockIterator<'a, T, U> { roots: BlockRootsIterator::owned(store, beacon_state), } } + + fn do_next(&mut self) -> Result>, Error> { + if let Some(result) = self.roots.next() { + let (root, _slot) = result?; + self.roots.inner.store.get_block(&root) + } else { + Ok(None) + } + } } impl<'a, T: EthSpec, U: Store> Iterator for BlockIterator<'a, T, U> { - type Item = SignedBeaconBlock; + type Item = Result, Error>; fn next(&mut self) -> Option { - let (root, _slot) = self.roots.next()?; - self.roots.inner.store.get_block(&root).ok()? + self.do_next().transpose() } } @@ -278,14 +291,16 @@ impl<'a, T: EthSpec, U: Store> Iterator for BlockIterator<'a, T, U> { fn next_historical_root_backtrack_state>( store: &S, current_state: &BeaconState, -) -> Option> { +) -> Result, Error> { // For compatibility with the freezer database's restore points, we load a state at // a restore point slot (thus avoiding replaying blocks). In the case where we're // not frozen, this just means we might not jump back by the maximum amount on // our first jump (i.e. at most 1 extra state load). let new_state_slot = slot_of_prev_restore_point::(current_state.slot); - let new_state_root = current_state.get_state_root(new_state_slot).ok()?; - store.get_state(new_state_root, Some(new_state_slot)).ok()? + let new_state_root = current_state.get_state_root(new_state_slot)?; + Ok(store + .get_state(new_state_root, Some(new_state_slot))? + .ok_or_else(|| BeaconStateError::MissingBeaconState((*new_state_root).into()))?) } /// Compute the slot of the last guaranteed restore point in the freezer database. @@ -337,11 +352,12 @@ mod test { let iter = BlockRootsIterator::new(store, &state_b); assert!( - iter.clone().any(|(_root, slot)| slot == 0), + iter.clone() + .any(|result| result.map(|(_root, slot)| slot == 0).unwrap()), "iter should contain zero slot" ); - let mut collected: Vec<(Hash256, Slot)> = iter.collect(); + let mut collected: Vec<(Hash256, Slot)> = iter.collect::, _>>().unwrap(); collected.reverse(); let expected_len = 2 * MainnetEthSpec::slots_per_historical_root(); @@ -386,11 +402,12 @@ mod test { let iter = StateRootsIterator::new(store, &state_b); assert!( - iter.clone().any(|(_root, slot)| slot == 0), + iter.clone() + .any(|result| result.map(|(_root, slot)| slot == 0).unwrap()), "iter should contain zero slot" ); - let mut collected: Vec<(Hash256, Slot)> = iter.collect(); + let mut collected: Vec<(Hash256, Slot)> = iter.collect::, _>>().unwrap(); collected.reverse(); let expected_len = MainnetEthSpec::slots_per_historical_root() * 2; diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 55489679e7b..3abdae423bc 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -13,7 +13,7 @@ extern crate lazy_static; pub mod chunked_iter; pub mod chunked_vector; pub mod config; -mod errors; +pub mod errors; mod forwards_iter; pub mod hot_cold_store; mod impls; @@ -99,7 +99,7 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati /// columns. A simple column implementation might involve prefixing a key with some bytes unique to /// each column. pub trait Store: Sync + Send + Sized + 'static { - type ForwardsBlockRootsIterator: Iterator; + type ForwardsBlockRootsIterator: Iterator>; /// Store a block in the store. fn put_block(&self, block_root: &Hash256, block: SignedBeaconBlock) -> Result<(), Error>; @@ -146,7 +146,7 @@ pub trait Store: Sync + Send + Sized + 'static { end_state: BeaconState, end_block_root: Hash256, spec: &ChainSpec, - ) -> Self::ForwardsBlockRootsIterator; + ) -> Result; fn load_epoch_boundary_state( &self, diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 45918e74e4a..3465e385b4c 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -149,7 +149,7 @@ impl Store for MemoryStore { end_state: BeaconState, end_block_root: Hash256, _: &ChainSpec, - ) -> Self::ForwardsBlockRootsIterator { + ) -> Result { SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root) }