From b5e6ad94fcaea8dac175c2d44704c5e369ffb1ab Mon Sep 17 00:00:00 2001 From: Adam Szkoda Date: Fri, 19 Jun 2020 14:36:55 +0200 Subject: [PATCH 1/9] Mostly atomic put_state() --- beacon_node/store/src/chunked_vector.rs | 54 ++++++++++------- beacon_node/store/src/hot_cold_store.rs | 67 ++++++++++++++------- beacon_node/store/src/impls/beacon_state.rs | 24 +++----- beacon_node/store/src/leveldb_store.rs | 4 ++ beacon_node/store/src/lib.rs | 1 + beacon_node/store/src/memory_store.rs | 4 ++ 6 files changed, 96 insertions(+), 58 deletions(-) diff --git a/beacon_node/store/src/chunked_vector.rs b/beacon_node/store/src/chunked_vector.rs index eec16ba5526..e2c73ca97a0 100644 --- a/beacon_node/store/src/chunked_vector.rs +++ b/beacon_node/store/src/chunked_vector.rs @@ -195,7 +195,7 @@ pub trait Field: Copy { fn check_and_store_genesis_value>( store: &S, value: Self::Value, - ) -> Result<(), Error> { + ) -> Result, Error> { let key = &genesis_value_key()[..]; if let Some(existing_chunk) = Chunk::::load(store, Self::column(), key)? { @@ -214,10 +214,12 @@ pub trait Field: Copy { } .into()) } else { - Ok(()) + Ok(vec![]) } } else { - Chunk::new(vec![value]).store(store, Self::column(), &genesis_value_key()[..]) + let chunk = Chunk::new(vec![value]); + let op = chunk.store(Self::column(), &genesis_value_key()[..])?; + Ok(vec![op]) } } @@ -332,7 +334,8 @@ pub fn store_updated_vector, E: EthSpec, S: KeyValueStore>( store: &S, state: &BeaconState, spec: &ChainSpec, -) -> Result<(), Error> { +) -> Result, Error> { + let mut ops: Vec = Vec::new(); let chunk_size = F::chunk_size(); let (start_vindex, end_vindex) = F::start_and_end_vindex(state.slot, spec); let start_cindex = start_vindex / chunk_size; @@ -341,13 +344,14 @@ pub fn store_updated_vector, E: EthSpec, S: KeyValueStore>( // Store the genesis value if we have access to it, and it hasn't been stored already. if F::slot_needs_genesis_value(state.slot, spec) { let genesis_value = F::extract_genesis_value(state, spec)?; - F::check_and_store_genesis_value(store, genesis_value)?; + ops.extend(F::check_and_store_genesis_value(store, genesis_value)?); } // Start by iterating backwards from the last chunk, storing new chunks in the database. // Stop once a chunk in the database matches what we were about to store, this indicates // that a previously stored state has already filled-in a portion of the indices covered. - let full_range_checked = store_range( + let mut full_range_checked = false; + ops.extend(store_range( field, (start_cindex..=end_cindex).rev(), start_vindex, @@ -355,13 +359,14 @@ pub fn store_updated_vector, E: EthSpec, S: KeyValueStore>( store, state, spec, - )?; + Some(&mut full_range_checked), + )?); // If the previous `store_range` did not check the entire range, it may be the case that the // state's vector includes elements at low vector indices that are not yet stored in the // database, so run another `store_range` to ensure these values are also stored. if !full_range_checked { - store_range( + ops.extend(store_range( field, start_cindex..end_cindex, start_vindex, @@ -369,10 +374,11 @@ pub fn store_updated_vector, E: EthSpec, S: KeyValueStore>( store, state, spec, - )?; + None, + )?); } - Ok(()) + Ok(ops) } fn store_range( @@ -383,13 +389,19 @@ fn store_range( store: &S, state: &BeaconState, spec: &ChainSpec, -) -> Result + mut full_range_checked: Option<&mut bool>, +) -> Result, Error> where F: Field, E: EthSpec, S: KeyValueStore, I: Iterator, { + let mut ops: Vec = Vec::new(); + + if let Some(ref mut full_range_checked) = full_range_checked { + **full_range_checked = false; + } for chunk_index in range { let chunk_key = &chunk_key(chunk_index as u64)[..]; @@ -406,13 +418,16 @@ where )?; if new_chunk == existing_chunk { - return Ok(false); + return Ok(ops); } - new_chunk.store(store, F::column(), chunk_key)?; + ops.push(new_chunk.store(F::column(), chunk_key)?); } - Ok(true) + if let Some(full_range_checked) = full_range_checked { + *full_range_checked = true; + } + Ok(ops) } // Chunks at the end index are included. @@ -585,14 +600,9 @@ where .transpose() } - pub fn store, E: EthSpec>( - &self, - store: &S, - column: DBColumn, - key: &[u8], - ) -> Result<(), Error> { - store.put_bytes(column.into(), key, &self.encode()?)?; - Ok(()) + pub fn store(&self, column: DBColumn, key: &[u8]) -> Result { + let db_key = get_key_for_col(column.into(), key); + Ok(KeyValueStoreOp::PutKeyValue(db_key, self.encode()?)) } /// Attempt to decode a single chunk. diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 73ddad43693..10854d0d3b9 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -141,9 +141,11 @@ impl, Cold: ItemStore> HotColdDB /// Store a state in the store. pub fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { if state.slot < self.get_split_slot() { - self.store_cold_state(state_root, &state) + let ops = self.store_cold_state(state_root, &state)?; + self.cold_db.do_atomically(&ops) } else { - self.store_hot_state(state_root, state) + let ops = self.store_hot_state(state_root, state)?; + self.hot_db.do_atomically(&ops) } } @@ -352,7 +354,9 @@ impl, Cold: ItemStore> HotColdDB &self, state_root: &Hash256, state: &BeaconState, - ) -> Result<(), Error> { + ) -> Result, Error> { + let mut ops: Vec = Vec::new(); + // On the epoch boundary, store the full state. if state.slot % E::slots_per_epoch() == 0 { trace!( @@ -361,15 +365,21 @@ impl, Cold: ItemStore> HotColdDB "slot" => state.slot.as_u64(), "state_root" => format!("{:?}", state_root) ); - store_full_state(&self.hot_db, state_root, &state)?; + ops.push(store_full_state(state_root, &state)?); } // Store a summary of the state. // We store one even for the epoch boundary states, as we may need their slots // when doing a look up by state root. - self.put_state_summary(state_root, HotStateSummary::new(state_root, state)?)?; + let hot_state_summary = HotStateSummary::new(state_root, state)?; + let state_summary_key = + get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes()); + ops.push(KeyValueStoreOp::PutKeyValue( + state_summary_key, + hot_state_summary.as_store_bytes(), + )); - Ok(()) + Ok(ops) } /// Load a post-finalization state from the hot database. @@ -413,7 +423,9 @@ impl, Cold: ItemStore> HotColdDB &self, state_root: &Hash256, state: &BeaconState, - ) -> Result<(), Error> { + ) -> Result, Error> { + let mut ops: Vec = Vec::new(); + if state.slot % self.config.slots_per_restore_point != 0 { warn!( self.log, @@ -421,7 +433,7 @@ impl, Cold: ItemStore> HotColdDB "slot" => state.slot.as_u64(), "state_root" => format!("{:?}", state_root) ); - return Ok(()); + return Ok(vec![]); } trace!( @@ -433,20 +445,30 @@ impl, Cold: ItemStore> HotColdDB // 1. Convert to PartialBeaconState and store that in the DB. let partial_state = PartialBeaconState::from_state_forgetful(state); - self.cold_db.put(state_root, &partial_state)?; + let partial_state_column = as StoreItem>::db_column(); + let partial_state_key = get_key_for_col(partial_state_column.into(), state_root.as_bytes()); + ops.push(KeyValueStoreOp::PutKeyValue( + partial_state_key, + partial_state.as_store_bytes(), + )); // 2. Store updated vector entries. let db = &self.cold_db; - store_updated_vector(BlockRoots, db, state, &self.spec)?; - store_updated_vector(StateRoots, db, state, &self.spec)?; - store_updated_vector(HistoricalRoots, db, state, &self.spec)?; - store_updated_vector(RandaoMixes, db, state, &self.spec)?; + ops.extend(store_updated_vector(BlockRoots, db, state, &self.spec)?); + ops.extend(store_updated_vector(StateRoots, db, state, &self.spec)?); + ops.extend(store_updated_vector( + HistoricalRoots, + db, + state, + &self.spec, + )?); + ops.extend(store_updated_vector(RandaoMixes, db, state, &self.spec)?); // 3. Store restore point. let restore_point_index = state.slot.as_u64() / self.config.slots_per_restore_point; - self.store_restore_point_hash(restore_point_index, *state_root)?; + ops.push(self.store_restore_point_hash(restore_point_index, *state_root)?); - Ok(()) + Ok(ops) } /// Try to load a pre-finalization state from the freezer database. @@ -666,11 +688,13 @@ impl, Cold: ItemStore> HotColdDB &self, restore_point_index: u64, state_root: Hash256, - ) -> Result<(), Error> { - let key = Self::restore_point_key(restore_point_index); - self.cold_db - .put(&key, &RestorePointHash { state_root }) - .map_err(Into::into) + ) -> Result { + let key = get_key_for_col( + DBColumn::BeaconRestorePoint.into(), + Self::restore_point_key(restore_point_index).as_bytes(), + ); + let value = &RestorePointHash { state_root }; + Ok(KeyValueStoreOp::PutKeyValue(key, value.as_store_bytes())) } /// Convert a `restore_point_index` into a database key. @@ -775,7 +799,8 @@ pub fn process_finalization, Cold: ItemStore>( let state: BeaconState = get_full_state(&store.hot_db, &state_root)? .ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?; - store.store_cold_state(&state_root, &state)?; + let ops = store.store_cold_state(&state_root, &state)?; + store.cold_db.do_atomically(&ops)?; } // Store a pointer from this state root to its slot, so we can later reconstruct states diff --git a/beacon_node/store/src/impls/beacon_state.rs b/beacon_node/store/src/impls/beacon_state.rs index 8b57f80e16e..83f791970cd 100644 --- a/beacon_node/store/src/impls/beacon_state.rs +++ b/beacon_node/store/src/impls/beacon_state.rs @@ -4,24 +4,18 @@ use ssz_derive::{Decode, Encode}; use std::convert::TryInto; use types::beacon_state::{CloneConfig, CommitteeCache, CACHED_EPOCHS}; -pub fn store_full_state, E: EthSpec>( - store: &KV, +pub fn store_full_state( state_root: &Hash256, state: &BeaconState, -) -> Result<(), Error> { - let total_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_TIMES); - let overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_OVERHEAD_TIMES); - - let bytes = StorageContainer::new(state).as_ssz_bytes(); - metrics::stop_timer(overhead_timer); - - let result = store.put_bytes(DBColumn::BeaconState.into(), state_root.as_bytes(), &bytes); - - metrics::stop_timer(total_timer); - metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT); +) -> Result { + let bytes = { + let _overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_OVERHEAD_TIMES); + StorageContainer::new(state).as_ssz_bytes() + }; metrics::inc_counter_by(&metrics::BEACON_STATE_WRITE_BYTES, bytes.len() as i64); - - result + metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT); + let key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes()); + Ok(KeyValueStoreOp::PutKeyValue(key, bytes)) } pub fn get_full_state, E: EthSpec>( diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 6e3dc8bbe34..982a0e757da 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -102,6 +102,10 @@ impl KeyValueStore for LevelDB { let mut leveldb_batch = Writebatch::new(); for op in ops_batch.into_iter() { match op { + KeyValueStoreOp::PutKeyValue(key, value) => { + leveldb_batch.put(BytesKey::from_vec(key.to_vec()), value); + } + KeyValueStoreOp::DeleteKey(key) => { leveldb_batch.delete(BytesKey::from_vec(key.to_vec())); } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 1d0506ce360..7ccc0cd1594 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -60,6 +60,7 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { } pub enum KeyValueStoreOp { + PutKeyValue(Vec, Vec), DeleteKey(Vec), } diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index c8556860cf9..349f3e4f344 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -67,6 +67,10 @@ impl KeyValueStore for MemoryStore { fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error> { for op in batch { match op { + KeyValueStoreOp::PutKeyValue(key, value) => { + self.db.write().insert(key.to_vec(), value.to_vec()); + } + KeyValueStoreOp::DeleteKey(hash) => { self.db.write().remove(hash); } From 2ebfbadbd5c1d8078e849271e44fdc5e40f80051 Mon Sep 17 00:00:00 2001 From: Adam Szkoda Date: Mon, 22 Jun 2020 09:16:30 +0200 Subject: [PATCH 2/9] Reduce number of vec allocations --- beacon_node/store/src/chunked_vector.rs | 58 ++++++++++----------- beacon_node/store/src/hot_cold_store.rs | 42 +++++++-------- beacon_node/store/src/impls/beacon_state.rs | 6 ++- 3 files changed, 51 insertions(+), 55 deletions(-) diff --git a/beacon_node/store/src/chunked_vector.rs b/beacon_node/store/src/chunked_vector.rs index e2c73ca97a0..8f5b09debb0 100644 --- a/beacon_node/store/src/chunked_vector.rs +++ b/beacon_node/store/src/chunked_vector.rs @@ -195,7 +195,8 @@ pub trait Field: Copy { fn check_and_store_genesis_value>( store: &S, value: Self::Value, - ) -> Result, Error> { + ops: &mut Vec, + ) -> Result<(), Error> { let key = &genesis_value_key()[..]; if let Some(existing_chunk) = Chunk::::load(store, Self::column(), key)? { @@ -214,12 +215,12 @@ pub trait Field: Copy { } .into()) } else { - Ok(vec![]) + Ok(()) } } else { let chunk = Chunk::new(vec![value]); - let op = chunk.store(Self::column(), &genesis_value_key()[..])?; - Ok(vec![op]) + chunk.store(Self::column(), &genesis_value_key()[..], ops)?; + Ok(()) } } @@ -334,8 +335,8 @@ pub fn store_updated_vector, E: EthSpec, S: KeyValueStore>( store: &S, state: &BeaconState, spec: &ChainSpec, -) -> Result, Error> { - let mut ops: Vec = Vec::new(); + ops: &mut Vec, +) -> Result<(), Error> { let chunk_size = F::chunk_size(); let (start_vindex, end_vindex) = F::start_and_end_vindex(state.slot, spec); let start_cindex = start_vindex / chunk_size; @@ -344,14 +345,13 @@ pub fn store_updated_vector, E: EthSpec, S: KeyValueStore>( // Store the genesis value if we have access to it, and it hasn't been stored already. if F::slot_needs_genesis_value(state.slot, spec) { let genesis_value = F::extract_genesis_value(state, spec)?; - ops.extend(F::check_and_store_genesis_value(store, genesis_value)?); + F::check_and_store_genesis_value(store, genesis_value, ops)?; } // Start by iterating backwards from the last chunk, storing new chunks in the database. // Stop once a chunk in the database matches what we were about to store, this indicates // that a previously stored state has already filled-in a portion of the indices covered. - let mut full_range_checked = false; - ops.extend(store_range( + let full_range_checked = store_range( field, (start_cindex..=end_cindex).rev(), start_vindex, @@ -359,14 +359,14 @@ pub fn store_updated_vector, E: EthSpec, S: KeyValueStore>( store, state, spec, - Some(&mut full_range_checked), - )?); + ops, + )?; // If the previous `store_range` did not check the entire range, it may be the case that the // state's vector includes elements at low vector indices that are not yet stored in the // database, so run another `store_range` to ensure these values are also stored. if !full_range_checked { - ops.extend(store_range( + store_range( field, start_cindex..end_cindex, start_vindex, @@ -374,11 +374,11 @@ pub fn store_updated_vector, E: EthSpec, S: KeyValueStore>( store, state, spec, - None, - )?); + ops, + )?; } - Ok(ops) + Ok(()) } fn store_range( @@ -389,19 +389,14 @@ fn store_range( store: &S, state: &BeaconState, spec: &ChainSpec, - mut full_range_checked: Option<&mut bool>, -) -> Result, Error> + ops: &mut Vec, +) -> Result where F: Field, E: EthSpec, S: KeyValueStore, I: Iterator, { - let mut ops: Vec = Vec::new(); - - if let Some(ref mut full_range_checked) = full_range_checked { - **full_range_checked = false; - } for chunk_index in range { let chunk_key = &chunk_key(chunk_index as u64)[..]; @@ -418,16 +413,13 @@ where )?; if new_chunk == existing_chunk { - return Ok(ops); + return Ok(false); } - ops.push(new_chunk.store(F::column(), chunk_key)?); + new_chunk.store(F::column(), chunk_key, ops)?; } - if let Some(full_range_checked) = full_range_checked { - *full_range_checked = true; - } - Ok(ops) + Ok(true) } // Chunks at the end index are included. @@ -600,9 +592,15 @@ where .transpose() } - pub fn store(&self, column: DBColumn, key: &[u8]) -> Result { + pub fn store( + &self, + column: DBColumn, + key: &[u8], + ops: &mut Vec, + ) -> Result<(), Error> { let db_key = get_key_for_col(column.into(), key); - Ok(KeyValueStoreOp::PutKeyValue(db_key, self.encode()?)) + ops.push(KeyValueStoreOp::PutKeyValue(db_key, self.encode()?)); + Ok(()) } /// Attempt to decode a single chunk. diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 10854d0d3b9..4aa7036c89c 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -141,10 +141,12 @@ impl, Cold: ItemStore> HotColdDB /// Store a state in the store. pub fn put_state(&self, state_root: &Hash256, state: &BeaconState) -> Result<(), Error> { if state.slot < self.get_split_slot() { - let ops = self.store_cold_state(state_root, &state)?; + let mut ops: Vec = Vec::new(); + self.store_cold_state(state_root, &state, &mut ops)?; self.cold_db.do_atomically(&ops) } else { - let ops = self.store_hot_state(state_root, state)?; + let mut ops: Vec = Vec::new(); + self.store_hot_state(state_root, state, &mut ops)?; self.hot_db.do_atomically(&ops) } } @@ -354,9 +356,8 @@ impl, Cold: ItemStore> HotColdDB &self, state_root: &Hash256, state: &BeaconState, - ) -> Result, Error> { - let mut ops: Vec = Vec::new(); - + mut ops: &mut Vec, + ) -> Result<(), Error> { // On the epoch boundary, store the full state. if state.slot % E::slots_per_epoch() == 0 { trace!( @@ -365,7 +366,7 @@ impl, Cold: ItemStore> HotColdDB "slot" => state.slot.as_u64(), "state_root" => format!("{:?}", state_root) ); - ops.push(store_full_state(state_root, &state)?); + store_full_state(state_root, &state, &mut ops)?; } // Store a summary of the state. @@ -379,7 +380,7 @@ impl, Cold: ItemStore> HotColdDB hot_state_summary.as_store_bytes(), )); - Ok(ops) + Ok(()) } /// Load a post-finalization state from the hot database. @@ -423,9 +424,8 @@ impl, Cold: ItemStore> HotColdDB &self, state_root: &Hash256, state: &BeaconState, - ) -> Result, Error> { - let mut ops: Vec = Vec::new(); - + mut ops: &mut Vec, + ) -> Result<(), Error> { if state.slot % self.config.slots_per_restore_point != 0 { warn!( self.log, @@ -433,7 +433,7 @@ impl, Cold: ItemStore> HotColdDB "slot" => state.slot.as_u64(), "state_root" => format!("{:?}", state_root) ); - return Ok(vec![]); + return Ok(()); } trace!( @@ -454,21 +454,16 @@ impl, Cold: ItemStore> HotColdDB // 2. Store updated vector entries. let db = &self.cold_db; - ops.extend(store_updated_vector(BlockRoots, db, state, &self.spec)?); - ops.extend(store_updated_vector(StateRoots, db, state, &self.spec)?); - ops.extend(store_updated_vector( - HistoricalRoots, - db, - state, - &self.spec, - )?); - ops.extend(store_updated_vector(RandaoMixes, db, state, &self.spec)?); + store_updated_vector(BlockRoots, db, state, &self.spec, &mut ops)?; + store_updated_vector(StateRoots, db, state, &self.spec, &mut ops)?; + store_updated_vector(HistoricalRoots, db, state, &self.spec, &mut ops)?; + store_updated_vector(RandaoMixes, db, state, &self.spec, &mut ops)?; // 3. Store restore point. let restore_point_index = state.slot.as_u64() / self.config.slots_per_restore_point; - ops.push(self.store_restore_point_hash(restore_point_index, *state_root)?); + self.store_restore_point_hash(restore_point_index, *state_root)?; - Ok(ops) + Ok(()) } /// Try to load a pre-finalization state from the freezer database. @@ -799,7 +794,8 @@ pub fn process_finalization, Cold: ItemStore>( let state: BeaconState = get_full_state(&store.hot_db, &state_root)? .ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?; - let ops = store.store_cold_state(&state_root, &state)?; + let mut ops: Vec = Vec::new(); + store.store_cold_state(&state_root, &state, &mut ops)?; store.cold_db.do_atomically(&ops)?; } diff --git a/beacon_node/store/src/impls/beacon_state.rs b/beacon_node/store/src/impls/beacon_state.rs index 83f791970cd..6cff12707d9 100644 --- a/beacon_node/store/src/impls/beacon_state.rs +++ b/beacon_node/store/src/impls/beacon_state.rs @@ -7,7 +7,8 @@ use types::beacon_state::{CloneConfig, CommitteeCache, CACHED_EPOCHS}; pub fn store_full_state( state_root: &Hash256, state: &BeaconState, -) -> Result { + ops: &mut Vec, +) -> Result<(), Error> { let bytes = { let _overhead_timer = metrics::start_timer(&metrics::BEACON_STATE_WRITE_OVERHEAD_TIMES); StorageContainer::new(state).as_ssz_bytes() @@ -15,7 +16,8 @@ pub fn store_full_state( metrics::inc_counter_by(&metrics::BEACON_STATE_WRITE_BYTES, bytes.len() as i64); metrics::inc_counter(&metrics::BEACON_STATE_WRITE_COUNT); let key = get_key_for_col(DBColumn::BeaconState.into(), state_root.as_bytes()); - Ok(KeyValueStoreOp::PutKeyValue(key, bytes)) + ops.push(KeyValueStoreOp::PutKeyValue(key, bytes)); + Ok(()) } pub fn get_full_state, E: EthSpec>( From 308718a5106dd76dfedf3c5c74c4042c82acc08f Mon Sep 17 00:00:00 2001 From: Adam Szkoda Date: Mon, 22 Jun 2020 12:29:51 +0200 Subject: [PATCH 3/9] Make crucial db operations atomic --- beacon_node/beacon_chain/src/beacon_chain.rs | 20 ++++-------- .../beacon_chain/src/block_verification.rs | 18 +++++++---- beacon_node/beacon_chain/src/migrate.rs | 4 +-- beacon_node/store/src/hot_cold_store.rs | 31 ++++++++++++++++--- beacon_node/store/src/lib.rs | 28 ++++++++++++++++- 5 files changed, 74 insertions(+), 27 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 143f07e0d58..0b710b1145c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -44,7 +44,7 @@ use std::io::prelude::*; use std::sync::Arc; use std::time::{Duration, Instant}; use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator}; -use store::{Error as DBError, HotColdDB}; +use store::{Error as DBError, HotColdDB, StoreOp}; use types::*; pub type ForkChoiceError = fork_choice::Error; @@ -1422,8 +1422,8 @@ impl BeaconChain { let block_root = fully_verified_block.block_root; let state = fully_verified_block.state; let parent_block = fully_verified_block.parent_block; - let intermediate_states = fully_verified_block.intermediate_states; let current_slot = self.slot()?; + let mut ops = fully_verified_block.intermediate_states; let attestation_observation_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_ATTESTATION_OBSERVATION); @@ -1515,18 +1515,10 @@ impl BeaconChain { let db_write_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_WRITE); - // Store all the states between the parent block state and this block's slot before storing - // the final state. - intermediate_states.commit(&*self.store)?; - - // Store the block and state. - // NOTE: we store the block *after* the state to guard against inconsistency in the event of - // a crash, as states are usually looked up from blocks, not the other way around. A better - // solution would be to use a database transaction (once our choice of database and API - // settles down). - // See: https://github.com/sigp/lighthouse/issues/692 - self.store.put_state(&block.state_root, &state)?; - self.store.put_block(&block_root, signed_block.clone())?; + // Store all the states between the parent block state and this block's slot, the block and state. + ops.push(StoreOp::PutBlock(block_root.into(), signed_block.clone())); + ops.push(StoreOp::PutState(block.state_root.into(), Cow::Borrowed(&state))); + self.store.do_atomically(ops)?; // The fork choice write-lock is dropped *after* the on-disk database has been updated. // This prevents inconsistency between the two at the expense of concurrency. diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 0467009aa1d..78e385e5f3b 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -62,7 +62,7 @@ use std::borrow::Cow; use std::convert::TryFrom; use std::fs; use std::io::Write; -use store::{Error as DBError, StateBatch}; +use store::{Error as DBError, StoreOp, HotStateSummary}; use tree_hash::TreeHash; use types::{ BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256, @@ -263,12 +263,12 @@ pub struct SignatureVerifiedBlock { /// Note: a `FullyVerifiedBlock` is not _forever_ valid to be imported, it may later become invalid /// due to finality or some other event. A `FullyVerifiedBlock` should be imported into the /// `BeaconChain` immediately after it is instantiated. -pub struct FullyVerifiedBlock { +pub struct FullyVerifiedBlock<'a, T: BeaconChainTypes> { pub block: SignedBeaconBlock, pub block_root: Hash256, pub state: BeaconState, pub parent_block: SignedBeaconBlock, - pub intermediate_states: StateBatch, + pub intermediate_states: Vec>, } /// Implemented on types that can be converted into a `FullyVerifiedBlock`. @@ -506,7 +506,7 @@ impl IntoFullyVerifiedBlock for SignedBeaconBlock FullyVerifiedBlock { +impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { /// Instantiates `Self`, a wrapper that indicates that the given `block` is fully valid. See /// the struct-level documentation for more information. /// @@ -552,7 +552,7 @@ impl FullyVerifiedBlock { // Keep a batch of any states that were "skipped" (block-less) in between the parent state // slot and the block slot. These will be stored in the database. - let mut intermediate_states = StateBatch::new(); + let mut intermediate_states: Vec> = Vec::new(); // The block must have a higher slot than its parent. if block.slot() <= parent.beacon_state.slot { @@ -575,7 +575,13 @@ impl FullyVerifiedBlock { // Computing the state root here is time-equivalent to computing it during slot // processing, but we get early access to it. let state_root = state.update_tree_hash_cache()?; - intermediate_states.add_state(state_root, &state)?; + + let op = if state.slot % T::EthSpec::slots_per_epoch() == 0 { + StoreOp::PutState(state_root.into(), Cow::Owned(state.clone())) + } else { + StoreOp::PutStateSummary(state_root.into(), HotStateSummary::new(&state_root, &state)?) + }; + intermediate_states.push(op); state_root }; diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 7291a3ffc25..1ad83b63356 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -152,7 +152,7 @@ pub trait Migrate, Cold: ItemStore>: } } - let batch: Vec = abandoned_blocks + let batch: Vec> = abandoned_blocks .into_iter() .map(|block_hash| StoreOp::DeleteBlock(block_hash)) .chain( @@ -161,7 +161,7 @@ pub trait Migrate, Cold: ItemStore>: .map(|(slot, state_hash)| StoreOp::DeleteState(state_hash, slot)), ) .collect(); - store.do_atomically(&batch)?; + store.do_atomically(batch)?; for head_hash in abandoned_heads.into_iter() { head_tracker.remove_head(head_hash); } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 4aa7036c89c..e366dbb525d 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -10,7 +10,7 @@ use crate::memory_store::MemoryStore; use crate::metrics; use crate::{ get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, - StoreOp, + StoreOp, put_block_op, put_state_summary_op, }; use lru::LruCache; use parking_lot::{Mutex, RwLock}; @@ -247,12 +247,25 @@ impl, Cold: ItemStore> HotColdDB self.hot_db.exists::(key) } - pub fn do_atomically(&self, batch: &[StoreOp]) -> Result<(), Error> { + pub fn do_atomically(&self, batch: Vec>) -> Result<(), Error> { let mut guard = self.block_cache.lock(); let mut key_value_batch: Vec = Vec::with_capacity(batch.len()); - for op in batch { + for op in &batch { match op { + StoreOp::PutBlock(block_hash, block) => { + put_block_op(*block_hash, block, &mut key_value_batch); + } + + StoreOp::PutState(state_hash, state) => { + let untyped_hash: Hash256 = (*state_hash).into(); + self.store_hot_state(&untyped_hash, state, &mut key_value_batch)?; + } + + StoreOp::PutStateSummary(state_hash, summary) => { + put_state_summary_op(*state_hash, &summary, &mut key_value_batch); + } + StoreOp::DeleteBlock(block_hash) => { let untyped_hash: Hash256 = (*block_hash).into(); let key = @@ -278,12 +291,22 @@ impl, Cold: ItemStore> HotColdDB } self.hot_db.do_atomically(&key_value_batch)?; - for op in batch { + for op in &batch { match op { + StoreOp::PutBlock(block_hash, block) => { + let untyped_hash: Hash256 = (*block_hash).into(); + guard.put(untyped_hash, block.clone()); + } + + StoreOp::PutState(_, _) => (), + + StoreOp::PutStateSummary(_, _) => (), + StoreOp::DeleteBlock(block_hash) => { let untyped_hash: Hash256 = (*block_hash).into(); guard.pop(&untyped_hash); } + StoreOp::DeleteState(_, _) => (), } } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 7ccc0cd1594..3c196c58f7d 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -25,6 +25,8 @@ mod state_batch; pub mod iter; +use std::borrow::Cow; + pub use self::config::StoreConfig; pub use self::hot_cold_store::{HotColdDB, HotStateSummary}; pub use self::leveldb_store::LevelDB; @@ -59,6 +61,27 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { result } + +pub fn put_block_op(hash: SignedBeaconBlockHash, block: &SignedBeaconBlock, ops: &mut Vec) { + let column = SignedBeaconBlock::::db_column().into(); + let untyped_hash: Hash256 = hash.into(); + let key = get_key_for_col(column, untyped_hash.as_bytes()); + let op = KeyValueStoreOp::PutKeyValue(key, block.as_store_bytes()); + ops.push(op) +} + + +pub fn put_state_summary_op(hash: BeaconStateHash, summary: &HotStateSummary, ops: &mut Vec) { + let untyped_hash: Hash256 = hash.into(); + let key = get_key_for_col( + DBColumn::BeaconStateSummary.into(), + untyped_hash.as_bytes(), + ); + let op = KeyValueStoreOp::PutKeyValue(key, summary.as_store_bytes()); + ops.push(op); +} + + pub enum KeyValueStoreOp { PutKeyValue(Vec, Vec), DeleteKey(Vec), @@ -104,7 +127,10 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati /// Reified key-value storage operation. Helps in modifying the storage atomically. /// See also https://github.com/sigp/lighthouse/issues/692 -pub enum StoreOp { +pub enum StoreOp<'a, E: EthSpec> { + PutBlock(SignedBeaconBlockHash, SignedBeaconBlock), + PutState(BeaconStateHash, Cow<'a, BeaconState>), + PutStateSummary(BeaconStateHash, HotStateSummary), DeleteBlock(SignedBeaconBlockHash), DeleteState(BeaconStateHash, Slot), } From 93950aceecf0d7aeb78030049e02af128ffa6be4 Mon Sep 17 00:00:00 2001 From: Adam Szkoda Date: Mon, 22 Jun 2020 12:35:08 +0200 Subject: [PATCH 4/9] Save restore points --- beacon_node/beacon_chain/src/beacon_chain.rs | 5 ++++- .../beacon_chain/src/block_verification.rs | 7 +++++-- beacon_node/store/src/hot_cold_store.rs | 12 ++++++----- beacon_node/store/src/lib.rs | 20 ++++++++++--------- 4 files changed, 27 insertions(+), 17 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 0b710b1145c..1dac8b296a6 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1517,7 +1517,10 @@ impl BeaconChain { // Store all the states between the parent block state and this block's slot, the block and state. ops.push(StoreOp::PutBlock(block_root.into(), signed_block.clone())); - ops.push(StoreOp::PutState(block.state_root.into(), Cow::Borrowed(&state))); + ops.push(StoreOp::PutState( + block.state_root.into(), + Cow::Borrowed(&state), + )); self.store.do_atomically(ops)?; // The fork choice write-lock is dropped *after* the on-disk database has been updated. diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 78e385e5f3b..fa4e0d6a9dd 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -62,7 +62,7 @@ use std::borrow::Cow; use std::convert::TryFrom; use std::fs; use std::io::Write; -use store::{Error as DBError, StoreOp, HotStateSummary}; +use store::{Error as DBError, HotStateSummary, StoreOp}; use tree_hash::TreeHash; use types::{ BeaconBlock, BeaconState, BeaconStateError, ChainSpec, CloneConfig, EthSpec, Hash256, @@ -579,7 +579,10 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> { let op = if state.slot % T::EthSpec::slots_per_epoch() == 0 { StoreOp::PutState(state_root.into(), Cow::Owned(state.clone())) } else { - StoreOp::PutStateSummary(state_root.into(), HotStateSummary::new(&state_root, &state)?) + StoreOp::PutStateSummary( + state_root.into(), + HotStateSummary::new(&state_root, &state)?, + ) }; intermediate_states.push(op); state_root diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index e366dbb525d..8aae93bd770 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -9,8 +9,8 @@ use crate::leveldb_store::LevelDB; use crate::memory_store::MemoryStore; use crate::metrics; use crate::{ - get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, - StoreOp, put_block_op, put_state_summary_op, + get_key_for_col, put_block_op, put_state_summary_op, DBColumn, Error, ItemStore, + KeyValueStoreOp, PartialBeaconState, StoreItem, StoreOp, }; use lru::LruCache; use parking_lot::{Mutex, RwLock}; @@ -484,7 +484,7 @@ impl, Cold: ItemStore> HotColdDB // 3. Store restore point. let restore_point_index = state.slot.as_u64() / self.config.slots_per_restore_point; - self.store_restore_point_hash(restore_point_index, *state_root)?; + self.store_restore_point_hash(restore_point_index, *state_root, &mut ops); Ok(()) } @@ -706,13 +706,15 @@ impl, Cold: ItemStore> HotColdDB &self, restore_point_index: u64, state_root: Hash256, - ) -> Result { + ops: &mut Vec, + ) { let key = get_key_for_col( DBColumn::BeaconRestorePoint.into(), Self::restore_point_key(restore_point_index).as_bytes(), ); let value = &RestorePointHash { state_root }; - Ok(KeyValueStoreOp::PutKeyValue(key, value.as_store_bytes())) + let op = KeyValueStoreOp::PutKeyValue(key, value.as_store_bytes()); + ops.push(op); } /// Convert a `restore_point_index` into a database key. diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index 3c196c58f7d..a79007dd897 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -61,8 +61,11 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { result } - -pub fn put_block_op(hash: SignedBeaconBlockHash, block: &SignedBeaconBlock, ops: &mut Vec) { +pub fn put_block_op( + hash: SignedBeaconBlockHash, + block: &SignedBeaconBlock, + ops: &mut Vec, +) { let column = SignedBeaconBlock::::db_column().into(); let untyped_hash: Hash256 = hash.into(); let key = get_key_for_col(column, untyped_hash.as_bytes()); @@ -70,18 +73,17 @@ pub fn put_block_op(hash: SignedBeaconBlockHash, block: &SignedBeaco ops.push(op) } - -pub fn put_state_summary_op(hash: BeaconStateHash, summary: &HotStateSummary, ops: &mut Vec) { +pub fn put_state_summary_op( + hash: BeaconStateHash, + summary: &HotStateSummary, + ops: &mut Vec, +) { let untyped_hash: Hash256 = hash.into(); - let key = get_key_for_col( - DBColumn::BeaconStateSummary.into(), - untyped_hash.as_bytes(), - ); + let key = get_key_for_col(DBColumn::BeaconStateSummary.into(), untyped_hash.as_bytes()); let op = KeyValueStoreOp::PutKeyValue(key, summary.as_store_bytes()); ops.push(op); } - pub enum KeyValueStoreOp { PutKeyValue(Vec, Vec), DeleteKey(Vec), From 2301f1f3c5153308e587ba57917f0f49d6cd104e Mon Sep 17 00:00:00 2001 From: Adam Szkoda Date: Mon, 22 Jun 2020 12:40:06 +0200 Subject: [PATCH 5/9] Remove StateBatch --- beacon_node/store/src/lib.rs | 2 -- beacon_node/store/src/state_batch.rs | 50 ---------------------------- 2 files changed, 52 deletions(-) delete mode 100644 beacon_node/store/src/state_batch.rs diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index a79007dd897..f37376f367c 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -21,7 +21,6 @@ mod leveldb_store; mod memory_store; mod metrics; mod partial_beacon_state; -mod state_batch; pub mod iter; @@ -35,7 +34,6 @@ pub use self::partial_beacon_state::PartialBeaconState; pub use errors::Error; pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer; pub use metrics::scrape_for_metrics; -pub use state_batch::StateBatch; pub use types::*; pub trait KeyValueStore: Sync + Send + Sized + 'static { diff --git a/beacon_node/store/src/state_batch.rs b/beacon_node/store/src/state_batch.rs deleted file mode 100644 index ab2d060c1de..00000000000 --- a/beacon_node/store/src/state_batch.rs +++ /dev/null @@ -1,50 +0,0 @@ -use crate::{Error, HotColdDB, HotStateSummary, ItemStore}; -use types::{BeaconState, EthSpec, Hash256}; - -/// A collection of states to be stored in the database. -/// -/// Consumes minimal space in memory by not storing states between epoch boundaries. -#[derive(Debug, Clone, Default)] -pub struct StateBatch { - items: Vec>, -} - -#[derive(Debug, Clone)] -#[allow(clippy::large_enum_variant)] -enum BatchItem { - Full(Hash256, BeaconState), - Summary(Hash256, HotStateSummary), -} - -impl StateBatch { - /// Create a new empty batch. - pub fn new() -> Self { - Self::default() - } - - /// Stage a `BeaconState` to be stored. - pub fn add_state(&mut self, state_root: Hash256, state: &BeaconState) -> Result<(), Error> { - let item = if state.slot % E::slots_per_epoch() == 0 { - BatchItem::Full(state_root, state.clone()) - } else { - BatchItem::Summary(state_root, HotStateSummary::new(&state_root, state)?) - }; - self.items.push(item); - Ok(()) - } - - /// Write the batch to the database. - /// - /// May fail to write the full batch if any of the items error (i.e. not atomic!) - pub fn commit, Cold: ItemStore>( - self, - store: &HotColdDB, - ) -> Result<(), Error> { - self.items.into_iter().try_for_each(|item| match item { - BatchItem::Full(state_root, state) => store.put_state(&state_root, &state), - BatchItem::Summary(state_root, summary) => { - store.put_state_summary(&state_root, summary) - } - }) - } -} From a07de834beb67375a2ab32f05e2355451e92c64f Mon Sep 17 00:00:00 2001 From: Adam Szkoda Date: Mon, 22 Jun 2020 12:42:27 +0200 Subject: [PATCH 6/9] Merge two HotColdDB impls --- beacon_node/store/src/hot_cold_store.rs | 115 ++++++++++++------------ 1 file changed, 56 insertions(+), 59 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 8aae93bd770..a044a4e1649 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -87,6 +87,62 @@ pub enum HotColdDBError { RestorePointBlockHashError(BeaconStateError), } +impl HotColdDB, MemoryStore> { + pub fn open_ephemeral( + config: StoreConfig, + spec: ChainSpec, + log: Logger, + ) -> Result, MemoryStore>, Error> { + Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; + + let db = HotColdDB { + split: RwLock::new(Split::default()), + cold_db: MemoryStore::open(), + hot_db: MemoryStore::open(), + block_cache: Mutex::new(LruCache::new(config.block_cache_size)), + config, + spec, + log, + _phantom: PhantomData, + }; + + Ok(db) + } +} + +impl HotColdDB, LevelDB> { + /// Open a new or existing database, with the given paths to the hot and cold DBs. + /// + /// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`. + pub fn open( + hot_path: &Path, + cold_path: &Path, + config: StoreConfig, + spec: ChainSpec, + log: Logger, + ) -> Result, LevelDB>, Error> { + Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; + + let db = HotColdDB { + split: RwLock::new(Split::default()), + cold_db: LevelDB::open(cold_path)?, + hot_db: LevelDB::open(hot_path)?, + block_cache: Mutex::new(LruCache::new(config.block_cache_size)), + config, + spec, + log, + _phantom: PhantomData, + }; + + // Load the previous split slot from the database (if any). This ensures we can + // stop and restart correctly. + if let Some(split) = db.load_split()? { + *db.split.write() = split; + } + Ok(db) + } +} + impl, Cold: ItemStore> HotColdDB { /// Store a block and update the LRU cache. pub fn put_block( @@ -312,65 +368,6 @@ impl, Cold: ItemStore> HotColdDB } Ok(()) } -} - -impl HotColdDB, MemoryStore> { - pub fn open_ephemeral( - config: StoreConfig, - spec: ChainSpec, - log: Logger, - ) -> Result, MemoryStore>, Error> { - Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; - - let db = HotColdDB { - split: RwLock::new(Split::default()), - cold_db: MemoryStore::open(), - hot_db: MemoryStore::open(), - block_cache: Mutex::new(LruCache::new(config.block_cache_size)), - config, - spec, - log, - _phantom: PhantomData, - }; - - Ok(db) - } -} - -impl HotColdDB, LevelDB> { - /// Open a new or existing database, with the given paths to the hot and cold DBs. - /// - /// The `slots_per_restore_point` parameter must be a divisor of `SLOTS_PER_HISTORICAL_ROOT`. - pub fn open( - hot_path: &Path, - cold_path: &Path, - config: StoreConfig, - spec: ChainSpec, - log: Logger, - ) -> Result, LevelDB>, Error> { - Self::verify_slots_per_restore_point(config.slots_per_restore_point)?; - - let db = HotColdDB { - split: RwLock::new(Split::default()), - cold_db: LevelDB::open(cold_path)?, - hot_db: LevelDB::open(hot_path)?, - block_cache: Mutex::new(LruCache::new(config.block_cache_size)), - config, - spec, - log, - _phantom: PhantomData, - }; - - // Load the previous split slot from the database (if any). This ensures we can - // stop and restart correctly. - if let Some(split) = db.load_split()? { - *db.split.write() = split; - } - Ok(db) - } -} - -impl, Cold: ItemStore> HotColdDB { /// Store a post-finalization state efficiently in the hot database. /// /// On an epoch boundary, store a full state. On an intermediate slot, store From 3beeab8cf2ed9084f006dce2bd3c630547073fbd Mon Sep 17 00:00:00 2001 From: Adam Szkoda Date: Tue, 23 Jun 2020 11:38:19 +0200 Subject: [PATCH 7/9] Further reduce allocations --- beacon_node/store/src/hot_cold_store.rs | 8 ++++---- beacon_node/store/src/leveldb_store.rs | 8 ++++---- beacon_node/store/src/lib.rs | 2 +- beacon_node/store/src/memory_store.rs | 6 +++--- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index a044a4e1649..42eb13077e0 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -199,11 +199,11 @@ impl, Cold: ItemStore> HotColdDB if state.slot < self.get_split_slot() { let mut ops: Vec = Vec::new(); self.store_cold_state(state_root, &state, &mut ops)?; - self.cold_db.do_atomically(&ops) + self.cold_db.do_atomically(ops) } else { let mut ops: Vec = Vec::new(); self.store_hot_state(state_root, state, &mut ops)?; - self.hot_db.do_atomically(&ops) + self.hot_db.do_atomically(ops) } } @@ -345,7 +345,7 @@ impl, Cold: ItemStore> HotColdDB } } } - self.hot_db.do_atomically(&key_value_batch)?; + self.hot_db.do_atomically(key_value_batch)?; for op in &batch { match op { @@ -818,7 +818,7 @@ pub fn process_finalization, Cold: ItemStore>( let mut ops: Vec = Vec::new(); store.store_cold_state(&state_root, &state, &mut ops)?; - store.cold_db.do_atomically(&ops)?; + store.cold_db.do_atomically(ops)?; } // Store a pointer from this state root to its slot, so we can later reconstruct states diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs index 982a0e757da..4a7822b85f6 100644 --- a/beacon_node/store/src/leveldb_store.rs +++ b/beacon_node/store/src/leveldb_store.rs @@ -98,16 +98,16 @@ impl KeyValueStore for LevelDB { .map_err(Into::into) } - fn do_atomically(&self, ops_batch: &[KeyValueStoreOp]) -> Result<(), Error> { + fn do_atomically(&self, ops_batch: Vec) -> Result<(), Error> { let mut leveldb_batch = Writebatch::new(); - for op in ops_batch.into_iter() { + for op in ops_batch { match op { KeyValueStoreOp::PutKeyValue(key, value) => { - leveldb_batch.put(BytesKey::from_vec(key.to_vec()), value); + leveldb_batch.put(BytesKey::from_vec(key), &value); } KeyValueStoreOp::DeleteKey(key) => { - leveldb_batch.delete(BytesKey::from_vec(key.to_vec())); + leveldb_batch.delete(BytesKey::from_vec(key)); } } } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index f37376f367c..ac5f3986703 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -50,7 +50,7 @@ pub trait KeyValueStore: Sync + Send + Sized + 'static { fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error>; /// Execute either all of the operations in `batch` or none at all, returning an error. - fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error>; + fn do_atomically(&self, batch: Vec) -> Result<(), Error>; } pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs index 349f3e4f344..30a0b1e0be4 100644 --- a/beacon_node/store/src/memory_store.rs +++ b/beacon_node/store/src/memory_store.rs @@ -64,15 +64,15 @@ impl KeyValueStore for MemoryStore { Ok(()) } - fn do_atomically(&self, batch: &[KeyValueStoreOp]) -> Result<(), Error> { + fn do_atomically(&self, batch: Vec) -> Result<(), Error> { for op in batch { match op { KeyValueStoreOp::PutKeyValue(key, value) => { - self.db.write().insert(key.to_vec(), value.to_vec()); + self.db.write().insert(key, value); } KeyValueStoreOp::DeleteKey(hash) => { - self.db.write().remove(hash); + self.db.write().remove(&hash); } } } From 356b3fc852fa97887c865bbe2c8bd35464bee241 Mon Sep 17 00:00:00 2001 From: Adam Szkoda Date: Thu, 25 Jun 2020 15:55:48 +0200 Subject: [PATCH 8/9] Review feedback --- beacon_node/store/src/hot_cold_store.rs | 48 ++++++++++--------------- beacon_node/store/src/lib.rs | 28 +++------------ beacon_node/store/src/metrics.rs | 4 --- 3 files changed, 24 insertions(+), 56 deletions(-) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 42eb13077e0..0d96be25e49 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -9,8 +9,8 @@ use crate::leveldb_store::LevelDB; use crate::memory_store::MemoryStore; use crate::metrics; use crate::{ - get_key_for_col, put_block_op, put_state_summary_op, DBColumn, Error, ItemStore, - KeyValueStoreOp, PartialBeaconState, StoreItem, StoreOp, + get_key_for_col, DBColumn, Error, ItemStore, KeyValueStoreOp, PartialBeaconState, StoreItem, + StoreOp, }; use lru::LruCache; use parking_lot::{Mutex, RwLock}; @@ -310,7 +310,8 @@ impl, Cold: ItemStore> HotColdDB for op in &batch { match op { StoreOp::PutBlock(block_hash, block) => { - put_block_op(*block_hash, block, &mut key_value_batch); + let untyped_hash: Hash256 = (*block_hash).into(); + key_value_batch.push(block.as_kv_store_op(untyped_hash)); } StoreOp::PutState(state_hash, state) => { @@ -319,7 +320,8 @@ impl, Cold: ItemStore> HotColdDB } StoreOp::PutStateSummary(state_hash, summary) => { - put_state_summary_op(*state_hash, &summary, &mut key_value_batch); + let untyped_hash: Hash256 = (*state_hash).into(); + key_value_batch.push(summary.as_kv_store_op(untyped_hash)); } StoreOp::DeleteBlock(block_hash) => { @@ -376,7 +378,7 @@ impl, Cold: ItemStore> HotColdDB &self, state_root: &Hash256, state: &BeaconState, - mut ops: &mut Vec, + ops: &mut Vec, ) -> Result<(), Error> { // On the epoch boundary, store the full state. if state.slot % E::slots_per_epoch() == 0 { @@ -386,19 +388,15 @@ impl, Cold: ItemStore> HotColdDB "slot" => state.slot.as_u64(), "state_root" => format!("{:?}", state_root) ); - store_full_state(state_root, &state, &mut ops)?; + store_full_state(state_root, &state, ops)?; } // Store a summary of the state. // We store one even for the epoch boundary states, as we may need their slots // when doing a look up by state root. let hot_state_summary = HotStateSummary::new(state_root, state)?; - let state_summary_key = - get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes()); - ops.push(KeyValueStoreOp::PutKeyValue( - state_summary_key, - hot_state_summary.as_store_bytes(), - )); + let op = hot_state_summary.as_kv_store_op(*state_root); + ops.push(op); Ok(()) } @@ -444,7 +442,7 @@ impl, Cold: ItemStore> HotColdDB &self, state_root: &Hash256, state: &BeaconState, - mut ops: &mut Vec, + ops: &mut Vec, ) -> Result<(), Error> { if state.slot % self.config.slots_per_restore_point != 0 { warn!( @@ -465,23 +463,19 @@ impl, Cold: ItemStore> HotColdDB // 1. Convert to PartialBeaconState and store that in the DB. let partial_state = PartialBeaconState::from_state_forgetful(state); - let partial_state_column = as StoreItem>::db_column(); - let partial_state_key = get_key_for_col(partial_state_column.into(), state_root.as_bytes()); - ops.push(KeyValueStoreOp::PutKeyValue( - partial_state_key, - partial_state.as_store_bytes(), - )); + let op = partial_state.as_kv_store_op(*state_root); + ops.push(op); // 2. Store updated vector entries. let db = &self.cold_db; - store_updated_vector(BlockRoots, db, state, &self.spec, &mut ops)?; - store_updated_vector(StateRoots, db, state, &self.spec, &mut ops)?; - store_updated_vector(HistoricalRoots, db, state, &self.spec, &mut ops)?; - store_updated_vector(RandaoMixes, db, state, &self.spec, &mut ops)?; + store_updated_vector(BlockRoots, db, state, &self.spec, ops)?; + store_updated_vector(StateRoots, db, state, &self.spec, ops)?; + store_updated_vector(HistoricalRoots, db, state, &self.spec, ops)?; + store_updated_vector(RandaoMixes, db, state, &self.spec, ops)?; // 3. Store restore point. let restore_point_index = state.slot.as_u64() / self.config.slots_per_restore_point; - self.store_restore_point_hash(restore_point_index, *state_root, &mut ops); + self.store_restore_point_hash(restore_point_index, *state_root, ops); Ok(()) } @@ -705,12 +699,8 @@ impl, Cold: ItemStore> HotColdDB state_root: Hash256, ops: &mut Vec, ) { - let key = get_key_for_col( - DBColumn::BeaconRestorePoint.into(), - Self::restore_point_key(restore_point_index).as_bytes(), - ); let value = &RestorePointHash { state_root }; - let op = KeyValueStoreOp::PutKeyValue(key, value.as_store_bytes()); + let op = value.as_kv_store_op(Self::restore_point_key(restore_point_index)); ops.push(op); } diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index ac5f3986703..c3ae8dabdb6 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -59,29 +59,6 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec { result } -pub fn put_block_op( - hash: SignedBeaconBlockHash, - block: &SignedBeaconBlock, - ops: &mut Vec, -) { - let column = SignedBeaconBlock::::db_column().into(); - let untyped_hash: Hash256 = hash.into(); - let key = get_key_for_col(column, untyped_hash.as_bytes()); - let op = KeyValueStoreOp::PutKeyValue(key, block.as_store_bytes()); - ops.push(op) -} - -pub fn put_state_summary_op( - hash: BeaconStateHash, - summary: &HotStateSummary, - ops: &mut Vec, -) { - let untyped_hash: Hash256 = hash.into(); - let key = get_key_for_col(DBColumn::BeaconStateSummary.into(), untyped_hash.as_bytes()); - let op = KeyValueStoreOp::PutKeyValue(key, summary.as_store_bytes()); - ops.push(op); -} - pub enum KeyValueStoreOp { PutKeyValue(Vec, Vec), DeleteKey(Vec), @@ -192,6 +169,11 @@ pub trait StoreItem: Sized { /// /// Return an instance of the type and the number of bytes that were read. fn from_store_bytes(bytes: &[u8]) -> Result; + + fn as_kv_store_op(&self, key: Hash256) -> KeyValueStoreOp { + let db_key = get_key_for_col(Self::db_column().into(), key.as_bytes()); + KeyValueStoreOp::PutKeyValue(db_key, self.as_store_bytes()) + } } #[cfg(test)] diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs index 2f821f03332..826712a72c9 100644 --- a/beacon_node/store/src/metrics.rs +++ b/beacon_node/store/src/metrics.rs @@ -78,10 +78,6 @@ lazy_static! { "store_beacon_state_read_bytes_total", "Total number of beacon state bytes read from the DB" ); - pub static ref BEACON_STATE_WRITE_TIMES: Result = try_create_histogram( - "store_beacon_state_write_seconds", - "Total time required to write a BeaconState to the database" - ); pub static ref BEACON_STATE_WRITE_OVERHEAD_TIMES: Result = try_create_histogram( "store_beacon_state_write_overhead_seconds", "Overhead on writing a beacon state to the DB (e.g., encoding)" From 3fb2eef543b1f64bc2f9a183db73dc7fb2295d05 Mon Sep 17 00:00:00 2001 From: Adam Szkoda Date: Mon, 29 Jun 2020 13:39:41 +0200 Subject: [PATCH 9/9] Silence clippy warning --- beacon_node/store/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index c3ae8dabdb6..f9f04b88407 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -104,6 +104,7 @@ pub trait ItemStore: KeyValueStore + Sync + Send + Sized + 'stati /// Reified key-value storage operation. Helps in modifying the storage atomically. /// See also https://github.com/sigp/lighthouse/issues/692 +#[allow(clippy::large_enum_variant)] pub enum StoreOp<'a, E: EthSpec> { PutBlock(SignedBeaconBlockHash, SignedBeaconBlock), PutState(BeaconStateHash, Cow<'a, BeaconState>),