Harden against IO failures using DB atomicity #1180

Closed · wants to merge 2 commits

14 changes: 6 additions & 8 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -42,7 +42,7 @@ use store::iter::{
BlockRootsIterator, ParentRootBlockIterator, ReverseBlockRootIterator,
ReverseStateRootIterator, StateRootsIterator,
};
use store::{Error as DBError, Store};
use store::{Error as DBError, Store, StoreOp};
use types::*;

// Text included in blocks.
@@ -1514,13 +1514,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
intermediate_states.commit(&*self.store)?;

// Store the block and state.
// NOTE: we store the block *after* the state to guard against inconsistency in the event of
// a crash, as states are usually looked up from blocks, not the other way around. A better
// solution would be to use a database transaction (once our choice of database and API
// settles down).
// See: https://github.com/sigp/lighthouse/issues/692
self.store.put_state(&block.state_root, &state)?;
self.store.put_block(&block_root, signed_block.clone())?;
let db_batch: Vec<StoreOp<T::EthSpec>> = vec![
StoreOp::PutState(block.state_root.into(), &state),
StoreOp::PutBlock(block_root.into(), signed_block.clone()),
];
self.store.do_atomically(&db_batch)?;

let parent_root = block.parent_root;
let slot = block.slot;
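
Aside (not part of the diff): the guarantee this change leans on is LevelDB's write batch, which is committed via a single write to the backing log, so after a crash either every operation in the batch is visible or none of them is. Below is a minimal standalone sketch of that behaviour using the same leveldb crate as leveldb_store.rs further down; the database path and the i32 keys are illustrative choices, not how the store actually keys its data.

use leveldb::database::batch::{Batch, Writebatch};
use leveldb::database::kv::KV;
use leveldb::database::Database;
use leveldb::options::{Options, ReadOptions, WriteOptions};

fn main() {
    // Illustrative path and integer keys; the real store uses column-prefixed byte keys.
    let path = std::env::temp_dir().join("writebatch-atomicity-demo");
    let mut options = Options::new();
    options.create_if_missing = true;
    let db: Database<i32> = Database::open(&path, options).expect("open database");

    // Queue the "state" (key 1) and the "block" (key 2) in one batch.
    let mut batch: Writebatch<i32> = Writebatch::new();
    batch.put(1, b"state-bytes");
    batch.put(2, b"block-bytes");

    // One write call commits the whole batch: a crash before it leaves neither
    // key present, a crash after it leaves both present.
    db.write(WriteOptions::new(), &batch).expect("atomic write");

    assert!(db.get(ReadOptions::new(), 1).expect("read").is_some());
    assert!(db.get(ReadOptions::new(), 2).expect("read").is_some());
}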
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/migrate.rs
@@ -143,7 +143,7 @@ pub trait Migrate<S: Store<E>, E: EthSpec>: Send + Sync + 'static {
}
}

let batch: Vec<StoreOp> = abandoned_blocks
let batch: Vec<StoreOp<E>> = abandoned_blocks
.into_iter()
.map(|block_hash| StoreOp::DeleteBlock(block_hash))
.chain(
8 changes: 7 additions & 1 deletion beacon_node/store/src/hot_cold_store.rs
@@ -204,11 +204,17 @@ impl<E: EthSpec> Store<E> for HotColdDB<E> {
Ok(())
}

fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
fn do_atomically(&self, batch: &[StoreOp<E>]) -> Result<(), Error> {
let mut guard = self.block_cache.lock();
self.hot_db.do_atomically(batch)?;
for op in batch {
match op {
StoreOp::PutBlock(block_hash, block) => {
guard.put((*block_hash).into(), block.clone());
}

StoreOp::PutState(_, _) => (),

StoreOp::DeleteBlock(block_hash) => {
let untyped_hash: Hash256 = (*block_hash).into();
guard.pop(&untyped_hash);
25 changes: 23 additions & 2 deletions beacon_node/store/src/leveldb_store.rs
@@ -1,13 +1,14 @@
use super::*;
use crate::forwards_iter::SimpleForwardsBlockRootsIterator;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::impls::beacon_state::{get_full_state, store_full_state, StorageContainer};
use crate::metrics;
use db_key::Key;
use leveldb::database::batch::{Batch, Writebatch};
use leveldb::database::kv::KV;
use leveldb::database::Database;
use leveldb::error::Error as LevelDBError;
use leveldb::options::{Options, ReadOptions, WriteOptions};
use ssz::Encode;
use std::marker::PhantomData;
use std::path::Path;

@@ -147,10 +148,30 @@ impl<E: EthSpec> Store<E> for LevelDB<E> {
SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root)
}

fn do_atomically(&self, ops_batch: &[StoreOp]) -> Result<(), Error> {
fn do_atomically(&self, ops_batch: &[StoreOp<E>]) -> Result<(), Error> {
let mut leveldb_batch = Writebatch::new();
for op in ops_batch {
match op {
StoreOp::PutBlock(block_hash, block) => {
let untyped_hash: Hash256 = (*block_hash).into();
let key = Self::get_key_for_col(
DBColumn::BeaconBlock.into(),
untyped_hash.as_bytes(),
);
let value = block.as_store_bytes();
leveldb_batch.put(key, &value);
}

StoreOp::PutState(state_hash, state) => {
let untyped_hash: Hash256 = (*state_hash).into();
let key = Self::get_key_for_col(
DBColumn::BeaconState.into(),
untyped_hash.as_bytes(),
);
let value = StorageContainer::new(state).as_ssz_bytes();
leveldb_batch.put(key, &value);
}
Review comment from a project member, on lines +165 to +173:

State storage needs to take into account the HotStateSummary here, as in the non-atomic case:

/// Store a post-finalization state efficiently in the hot database.
///
/// On an epoch boundary, store a full state. On an intermediate slot, store
/// just a backpointer to the nearest epoch boundary.
pub fn store_hot_state(
    &self,
    state_root: &Hash256,
    state: &BeaconState<E>,
) -> Result<(), Error> {
    // On the epoch boundary, store the full state.
    if state.slot % E::slots_per_epoch() == 0 {
        trace!(
            self.log,
            "Storing full state on epoch boundary";
            "slot" => state.slot.as_u64(),
            "state_root" => format!("{:?}", state_root)
        );
        store_full_state(&self.hot_db, state_root, &state)?;
    }
    // Store a summary of the state.
    // We store one even for the epoch boundary states, as we may need their slots
    // when doing a look up by state root.
    self.put_state_summary(state_root, HotStateSummary::new(state_root, state)?)?;
    Ok(())
}

The compositionality of the HotColdDB consisting of two sub-stores (hot_db and cold_db) with the same API is a lie, and is another reason why the Store trait should be refactored IMO. I don't think Store should be implemented for LevelDB, because we never use just one LevelDB database as the store any more, and it creates awkwardness when a Store API has to be implemented on the underlying store with assumptions about that API only getting used for the hot or cold DB (in this case we're assuming hot). I think it makes sense to put all this logic in the parent impl of do_atomically (i.e. HotColdDB), and stub out the impl for LevelDB so that it returns an error.
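
A rough sketch of how that could look (hypothetical only: KeyValueStoreOp, the *_kv_op helpers and the exact error handling are assumptions, not existing crate APIs):

// Backend-agnostic raw write: LevelDB can apply these without knowing anything
// about hot/cold semantics or state summaries.
pub enum KeyValueStoreOp {
    PutKeyValue(Vec<u8>, Vec<u8>),
    DeleteKey(Vec<u8>),
}

impl<E: EthSpec> HotColdDB<E> {
    /// Translate high-level `StoreOp`s into raw writes, applying the same
    /// epoch-boundary and summary logic as `store_hot_state`.
    fn convert_to_kv_batch(&self, batch: &[StoreOp<E>]) -> Result<Vec<KeyValueStoreOp>, Error> {
        let mut kv_batch = Vec::with_capacity(batch.len());
        for op in batch {
            match op {
                StoreOp::PutState(state_hash, state) => {
                    let state_root: Hash256 = (*state_hash).into();
                    // Full state only on the epoch boundary...
                    if state.slot % E::slots_per_epoch() == 0 {
                        kv_batch.push(full_state_kv_op(&state_root, state));
                    }
                    // ...plus a HotStateSummary in every case, as in the non-atomic path.
                    kv_batch.push(summary_kv_op(&state_root, HotStateSummary::new(&state_root, state)?));
                }
                // Blocks and deletions already translate one-to-one.
                other => kv_batch.push(raw_kv_op(other)),
            }
        }
        Ok(kv_batch)
    }
}

// HotColdDB::do_atomically would then write the converted batch to self.hot_db in
// one shot, while LevelDB's own do_atomically would simply return an error, so the
// high-level ops can never bypass the hot/cold logic.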


StoreOp::DeleteBlock(block_hash) => {
let untyped_hash: Hash256 = (*block_hash).into();
let key = Self::get_key_for_col(
6 changes: 4 additions & 2 deletions beacon_node/store/src/lib.rs
@@ -97,7 +97,7 @@ pub trait Store<E: EthSpec>: Sync + Send + Sized + 'static {
fn put_state(&self, state_root: &Hash256, state: &BeaconState<E>) -> Result<(), Error>;

/// Execute either all of the operations in `batch` or none at all, returning an error.
fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error>;
fn do_atomically(&self, batch: &[StoreOp<E>]) -> Result<(), Error>;

/// Store a state summary in the store.
// NOTE: this is a hack for the HotColdDb, we could consider splitting this
@@ -185,7 +185,9 @@

/// Reified key-value storage operation. Helps in modifying the storage atomically.
/// See also https://github.com/sigp/lighthouse/issues/692
pub enum StoreOp {
pub enum StoreOp<'a, E: EthSpec> {
PutBlock(SignedBeaconBlockHash, SignedBeaconBlock<E>),
PutState(BeaconStateHash, &'a BeaconState<E>),
Review comment from a project member:

I'm on board with using a reference here, but with state summaries existing it might make sense to mix in the two variants from StateBatch:

// Allow owned states for the storage of intermediate states in block processing (uncommon)
PutState(BeaconStateHash, Cow<'a, BeaconState<E>>),
PutStateSummary(BeaconStateHash, HotStateSummary),

See https://github.com/sigp/lighthouse/blob/master/beacon_node/store/src/state_batch.rs

StateBatch could then be deleted of course.
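
Spelled out, the enum with those two variants mixed in might look like this (a sketch following the snippet above; it assumes std::borrow::Cow plus the existing HotStateSummary type, and is not merged code):

use std::borrow::Cow;

pub enum StoreOp<'a, E: EthSpec> {
    PutBlock(SignedBeaconBlockHash, SignedBeaconBlock<E>),
    // Borrowed in the common block-import path; owned for the (uncommon)
    // storage of intermediate states during block processing.
    PutState(BeaconStateHash, Cow<'a, BeaconState<E>>),
    PutStateSummary(BeaconStateHash, HotStateSummary),
    DeleteBlock(SignedBeaconBlockHash),
    DeleteState(BeaconStateHash, Slot),
}

Callers that hold a reference would pass Cow::Borrowed(&state), block processing could hand over Cow::Owned(state) for its intermediate states, and StateBatch would no longer be needed.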

DeleteBlock(SignedBeaconBlockHash),
DeleteState(BeaconStateHash, Slot),
}
17 changes: 16 additions & 1 deletion beacon_node/store/src/memory_store.rs
@@ -1,6 +1,7 @@
use super::{DBColumn, Error, Store, StoreOp};
use crate::forwards_iter::SimpleForwardsBlockRootsIterator;
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::SimpleStoreItem;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::marker::PhantomData;
@@ -89,9 +90,23 @@ impl<E: EthSpec> Store<E> for MemoryStore<E> {
get_full_state(self, state_root)
}

fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> {
fn do_atomically(&self, batch: &[StoreOp<E>]) -> Result<(), Error> {
for op in batch {
match op {
StoreOp::PutBlock(block_hash, block) => {
let untyped_hash: Hash256 = (*block_hash).into();
let value = block.as_store_bytes();
self.put_bytes(
DBColumn::BeaconBlock.into(),
untyped_hash.as_bytes(),
&value,
)?;
}

StoreOp::PutState(state_hash, state) => {
self.put_state(&(*state_hash).into(), state)?;
}

StoreOp::DeleteBlock(block_hash) => {
let untyped_hash: Hash256 = (*block_hash).into();
self.key_delete(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes())?;