Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write new blocks and states to the database atomically #1285

Merged
merged 9 commits into from
Jul 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 9 additions & 14 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use std::io::prelude::*;
use std::sync::Arc;
use std::time::{Duration, Instant};
use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
use store::{Error as DBError, HotColdDB};
use store::{Error as DBError, HotColdDB, StoreOp};
use types::*;

pub type ForkChoiceError = fork_choice::Error<crate::ForkChoiceStoreError>;
Expand Down Expand Up @@ -1422,8 +1422,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let block_root = fully_verified_block.block_root;
let state = fully_verified_block.state;
let parent_block = fully_verified_block.parent_block;
let intermediate_states = fully_verified_block.intermediate_states;
let current_slot = self.slot()?;
let mut ops = fully_verified_block.intermediate_states;

let attestation_observation_timer =
metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION);
Expand Down Expand Up @@ -1515,18 +1515,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE);

// Store all the states between the parent block state and this block's slot before storing
// the final state.
intermediate_states.commit(&*self.store)?;

// Store the block and state.
// NOTE: we store the block *after* the state to guard against inconsistency in the event of
// a crash, as states are usually looked up from blocks, not the other way around. A better
// solution would be to use a database transaction (once our choice of database and API
// settles down).
// See: https://github.com/sigp/lighthouse/issues/692
self.store.put_state(&block.state_root, &state)?;
self.store.put_block(&block_root, signed_block.clone())?;
// Store all the states between the parent block state and this block's slot, the block and state.
ops.push(StoreOp::PutBlock(block_root.into(), signed_block.clone()));
ops.push(StoreOp::PutState(
block.state_root.into(),
Cow::Borrowed(&state),
));
self.store.do_atomically(ops)?;

// The fork choice write-lock is dropped *after* the on-disk database has been updated.
// This prevents inconsistency between the two at the expense of concurrency.
Expand Down
21 changes: 15 additions & 6 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use std::borrow::Cow;
use std::convert::TryFrom;
use std::fs;
use std::io::Write;
use store::{Error as DBError, StateBatch};
use store::{Error as DBError, HotStateSummary, StoreOp};
use tree_hash::TreeHash;
use types::{
BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256,
Expand Down Expand Up @@ -263,12 +263,12 @@ pub struct SignatureVerifiedBlock<T: BeaconChainTypes> {
/// Note: a `FullyVerifiedBlock` is not _forever_ valid to be imported, it may later become invalid
/// due to finality or some other event. A `FullyVerifiedBlock` should be imported into the
/// `BeaconChain` immediately after it is instantiated.
pub struct FullyVerifiedBlock<T: BeaconChainTypes> {
pub struct FullyVerifiedBlock<'a, T: BeaconChainTypes> {
pub block: SignedBeaconBlock<T::EthSpec>,
pub block_root: Hash256,
pub state: BeaconState<T::EthSpec>,
pub parent_block: SignedBeaconBlock<T::EthSpec>,
pub intermediate_states: StateBatch<T::EthSpec>,
pub intermediate_states: Vec<StoreOp<'a, T::EthSpec>>,
}

/// Implemented on types that can be converted into a `FullyVerifiedBlock`.
Expand Down Expand Up @@ -506,7 +506,7 @@ impl<T: BeaconChainTypes> IntoFullyVerifiedBlock<T> for SignedBeaconBlock<T::Eth
}
}

impl<T: BeaconChainTypes> FullyVerifiedBlock<T> {
impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
/// Instantiates `Self`, a wrapper that indicates that the given `block` is fully valid. See
/// the struct-level documentation for more information.
///
Expand Down Expand Up @@ -552,7 +552,7 @@ impl<T: BeaconChainTypes> FullyVerifiedBlock<T> {

// Keep a batch of any states that were "skipped" (block-less) in between the parent state
// slot and the block slot. These will be stored in the database.
let mut intermediate_states = StateBatch::new();
let mut intermediate_states: Vec<StoreOp<T::EthSpec>> = Vec::new();

// The block must have a higher slot than its parent.
if block.slot() <= parent.beacon_state.slot {
Expand All @@ -575,7 +575,16 @@ impl<T: BeaconChainTypes> FullyVerifiedBlock<T> {
// Computing the state root here is time-equivalent to computing it during slot
// processing, but we get early access to it.
let state_root = state.update_tree_hash_cache()?;
intermediate_states.add_state(state_root, &state)?;

let op = if state.slot % T::EthSpec::slots_per_epoch() == 0 {
StoreOp::PutState(state_root.into(), Cow::Owned(state.clone()))
} else {
StoreOp::PutStateSummary(
state_root.into(),
HotStateSummary::new(&state_root, &state)?,
)
};
intermediate_states.push(op);
state_root
};

Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
}
}

let batch: Vec<StoreOp> = abandoned_blocks
let batch: Vec<StoreOp<E>> = abandoned_blocks
.into_iter()
.map(|block_hash| StoreOp::DeleteBlock(block_hash))
.chain(
Expand All @@ -161,7 +161,7 @@ pub trait Migrate<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>:
.map(|(slot, state_hash)| StoreOp::DeleteState(state_hash, slot)),
)
.collect();
store.do_atomically(&batch)?;
store.do_atomically(batch)?;
for head_hash in abandoned_heads.into_iter() {
head_tracker.remove_head(head_hash);
}
Expand Down
20 changes: 14 additions & 6 deletions beacon_node/store/src/chunked_vector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ pub trait Field<E: EthSpec>: Copy {
fn check_and_store_genesis_value<S: KeyValueStore<E>>(
store: &S,
value: Self::Value,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
let key = &genesis_value_key()[..];

Expand All @@ -217,7 +218,9 @@ pub trait Field<E: EthSpec>: Copy {
Ok(())
}
} else {
Chunk::new(vec![value]).store(store, Self::column(), &genesis_value_key()[..])
let chunk = Chunk::new(vec![value]);
chunk.store(Self::column(), &genesis_value_key()[..], ops)?;
Ok(())
}
}

Expand Down Expand Up @@ -332,6 +335,7 @@ pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
store: &S,
state: &BeaconState<E>,
spec: &ChainSpec,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
let chunk_size = F::chunk_size();
let (start_vindex, end_vindex) = F::start_and_end_vindex(state.slot, spec);
Expand All @@ -341,7 +345,7 @@ pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
// Store the genesis value if we have access to it, and it hasn't been stored already.
if F::slot_needs_genesis_value(state.slot, spec) {
let genesis_value = F::extract_genesis_value(state, spec)?;
F::check_and_store_genesis_value(store, genesis_value)?;
F::check_and_store_genesis_value(store, genesis_value, ops)?;
}

// Start by iterating backwards from the last chunk, storing new chunks in the database.
Expand All @@ -355,6 +359,7 @@ pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
store,
state,
spec,
ops,
)?;

// If the previous `store_range` did not check the entire range, it may be the case that the
Expand All @@ -369,6 +374,7 @@ pub fn store_updated_vector<F: Field<E>, E: EthSpec, S: KeyValueStore<E>>(
store,
state,
spec,
ops,
)?;
}

Expand All @@ -383,6 +389,7 @@ fn store_range<F, E, S, I>(
store: &S,
state: &BeaconState<E>,
spec: &ChainSpec,
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<bool, Error>
where
F: Field<E>,
Expand All @@ -409,7 +416,7 @@ where
return Ok(false);
}

new_chunk.store(store, F::column(), chunk_key)?;
new_chunk.store(F::column(), chunk_key, ops)?;
}

Ok(true)
Expand Down Expand Up @@ -585,13 +592,14 @@ where
.transpose()
}

pub fn store<S: KeyValueStore<E>, E: EthSpec>(
pub fn store(
&self,
store: &S,
column: DBColumn,
key: &[u8],
ops: &mut Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
store.put_bytes(column.into(), key, &self.encode()?)?;
let db_key = get_key_for_col(column.into(), key);
ops.push(KeyValueStoreOp::PutKeyValue(db_key, self.encode()?));
Ok(())
}

Expand Down
Loading