diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 544e6e006eb..f437ef966d6 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -42,7 +42,7 @@ use store::iter::{ BlockRootsIterator, ParentRootBlockIterator, ReverseBlockRootIterator, ReverseStateRootIterator, StateRootsIterator, }; -use store::{Error as DBError, Store}; +use store::{Error as DBError, Store, StoreOp}; use types::*; // Text included in blocks. @@ -1514,13 +1514,11 @@ impl BeaconChain { intermediate_states.commit(&*self.store)?; // Store the block and state. - // NOTE: we store the block *after* the state to guard against inconsistency in the event of - // a crash, as states are usually looked up from blocks, not the other way around. A better - // solution would be to use a database transaction (once our choice of database and API - // settles down). - // See: https://github.com/sigp/lighthouse/issues/692 - self.store.put_state(&block.state_root, &state)?; - self.store.put_block(&block_root, signed_block.clone())?; + let db_batch: Vec> = vec![ + StoreOp::PutState(block.state_root.into(), &state), + StoreOp::PutBlock(block_root.into(), signed_block.clone()), + ]; + self.store.do_atomically(&db_batch)?; let parent_root = block.parent_root; let slot = block.slot; diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index d9d541be251..3e283c036b0 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -143,7 +143,7 @@ pub trait Migrate, E: EthSpec>: Send + Sync + 'static { } } - let batch: Vec = abandoned_blocks + let batch: Vec> = abandoned_blocks .into_iter() .map(|block_hash| StoreOp::DeleteBlock(block_hash)) .chain( diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index fdbad31d68e..19d17e85b55 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -204,11 +204,17 @@ impl Store for HotColdDB { Ok(()) } - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { + fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { let mut guard = self.block_cache.lock(); self.hot_db.do_atomically(batch)?; for op in batch { match op { + StoreOp::PutBlock(block_hash, block) => { + guard.put((*block_hash).into(), block.clone()); + } + + StoreOp::PutState(_, _) => (), + StoreOp::DeleteBlock(block_hash) => { let untyped_hash: Hash256 = (*block_hash).into(); guard.pop(&untyped_hash); diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 32fe5bbc91f..dcc35f4aaf6 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -1,6 +1,6 @@ use super::*; use crate::forwards_iter::SimpleForwardsBlockRootsIterator; -use crate::impls::beacon_state::{get_full_state, store_full_state}; +use crate::impls::beacon_state::{get_full_state, store_full_state, StorageContainer}; use crate::metrics; use db_key::Key; use leveldb::database::batch::{Batch, Writebatch}; @@ -8,6 +8,7 @@ use leveldb::database::kv::KV; use leveldb::database::Database; use leveldb::error::Error as LevelDBError; use leveldb::options::{Options, ReadOptions, WriteOptions}; +use ssz::Encode; use std::marker::PhantomData; use std::path::Path; @@ -147,10 +148,30 @@ impl Store for LevelDB { SimpleForwardsBlockRootsIterator::new(store, start_slot, end_state, end_block_root) } - fn do_atomically(&self, ops_batch: &[StoreOp]) -> Result<(), Error> { + fn do_atomically(&self, ops_batch: &[StoreOp]) -> Result<(), Error> { let mut leveldb_batch = Writebatch::new(); for op in ops_batch { match op { + StoreOp::PutBlock(block_hash, block) => { + let untyped_hash: Hash256 = (*block_hash).into(); + let key = Self::get_key_for_col( + DBColumn::BeaconBlock.into(), + untyped_hash.as_bytes(), + ); + let value = block.as_store_bytes(); + leveldb_batch.put(key, &value); + } + + StoreOp::PutState(state_hash, state) => { + let untyped_hash: Hash256 = (*state_hash).into(); + let key = Self::get_key_for_col( + DBColumn::BeaconState.into(), + untyped_hash.as_bytes(), + ); + let value = StorageContainer::new(state).as_ssz_bytes(); + leveldb_batch.put(key, &value); + } + StoreOp::DeleteBlock(block_hash) => { let untyped_hash: Hash256 = (*block_hash).into(); let key = Self::get_key_for_col( diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 494c8977fd9..215a6d998cc 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -97,7 +97,7 @@ pub trait Store: Sync + Send + Sized + 'static { fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error>; /// Execute either all of the operations in `batch` or none at all, returning an error. - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error>; + fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error>; /// Store a state summary in the store. // NOTE: this is a hack for the HotColdDb, we could consider splitting this @@ -185,7 +185,9 @@ pub trait Store: Sync + Send + Sized + 'static { /// Reified key-value storage operation. Helps in modifying the storage atomically. /// See also https://github.com/sigp/lighthouse/issues/692 -pub enum StoreOp { +pub enum StoreOp<'a, E: EthSpec> { + PutBlock(SignedBeaconBlockHash, SignedBeaconBlock), + PutState(BeaconStateHash, &'a BeaconState), DeleteBlock(SignedBeaconBlockHash), DeleteState(BeaconStateHash, Slot), } diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 01483a9050a..e8023ddbd28 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -1,6 +1,7 @@ use super::{DBColumn, Error, Store, StoreOp}; use crate::forwards_iter::SimpleForwardsBlockRootsIterator; use crate::impls::beacon_state::{get_full_state, store_full_state}; +use crate::SimpleStoreItem; use parking_lot::RwLock; use std::collections::HashMap; use std::marker::PhantomData; @@ -89,9 +90,23 @@ impl Store for MemoryStore { get_full_state(self, state_root) } - fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { + fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { for op in batch { match op { + StoreOp::PutBlock(block_hash, block) => { + let untyped_hash: Hash256 = (*block_hash).into(); + let value = block.as_store_bytes(); + self.put_bytes( + DBColumn::BeaconBlock.into(), + untyped_hash.as_bytes(), + &value, + )?; + } + + StoreOp::PutState(state_hash, state) => { + self.put_state(&(*state_hash).into(), state)?; + } + StoreOp::DeleteBlock(block_hash) => { let untyped_hash: Hash256 = (*block_hash).into(); self.key_delete(DBColumn::BeaconBlock.into(), untyped_hash.as_bytes())?;