Migrate states to the freezer atomically
adaszko committed Jul 1, 2020
1 parent 536728b commit ec3d359
Showing 5 changed files with 124 additions and 52 deletions.
3 changes: 2 additions & 1 deletion beacon_node/store/src/errors.rs
@@ -1,7 +1,7 @@
use crate::chunked_vector::ChunkError;
use crate::hot_cold_store::HotColdDBError;
use ssz::DecodeError;
use types::{BeaconStateError, Hash256};
use types::{BeaconStateError, Hash256, Slot};

pub type Result<T> = std::result::Result<T, Error>;

@@ -16,6 +16,7 @@ pub enum Error {
RlpError(String),
BlockNotFound(Hash256),
NoContinuationData,
SplitPointModified(Slot, Slot),
}

impl From<DecodeError> for Error {
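The new `SplitPointModified(Slot, Slot)` variant records the split slot the migration started from and the one it found when it tried to advance it, so callers can treat the race as retryable rather than fatal. A hypothetical caller-side fragment (the `run_migration` wrapper and the retry policy are illustrative assumptions, not part of this commit):

// Hypothetical fragment: `run_migration` stands in for whatever drives
// process_finalization; only the error handling is the point here.
match run_migration(store.clone(), frozen_head_root, &frozen_head) {
    // Another caller moved the split point while states were being copied to the
    // freezer. Both databases are still consistent, so simply retry later.
    Err(Error::SplitPointModified(expected, found)) => {
        debug!(store.log, "Retrying freezer migration";
            "expected split slot" => expected, "found split slot" => found);
    }
    Err(e) => return Err(e),
    Ok(()) => {}
}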
104 changes: 70 additions & 34 deletions beacon_node/store/src/hot_cold_store.rs
@@ -14,7 +14,7 @@ use crate::{
};
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
use slog::{debug, trace, warn, Logger};
use slog::{debug, error, trace, warn, Logger};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use state_processing::{
@@ -676,13 +676,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(split)
}

/// Store the split point on disk.
fn store_split(&self) -> Result<(), Error> {
let key = Hash256::from_slice(SPLIT_DB_KEY.as_bytes());
self.hot_db.put(&key, &*self.split.read())?;
Ok(())
}

/// Load the state root of a restore point.
fn load_restore_point_hash(&self, restore_point_index: u64) -> Result<Hash256, Error> {
let key = Self::restore_point_key(restore_point_index);
@@ -717,13 +710,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.map(|s: ColdStateSummary| s.slot))
}

/// Store the slot of a frozen state.
fn store_cold_state_slot(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
self.cold_db
.put(state_root, &ColdStateSummary { slot })
.map_err(Into::into)
}

/// Load a hot state's summary, given its root.
pub fn load_hot_state_summary(
&self,
@@ -778,7 +764,7 @@ pub fn process_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// 0. Check that the migration is sensible.
// The new frozen head must increase the current split slot, and lie on an epoch
// boundary (in order for the hot state summary scheme to work).
let current_split_slot = store.get_split_slot();
let current_split_slot = store.split.read().slot;

if frozen_head.slot < current_split_slot {
return Err(HotColdDBError::FreezeSlotError {
@@ -792,45 +778,95 @@
return Err(HotColdDBError::FreezeSlotUnaligned(frozen_head.slot).into());
}

let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();

// 1. Copy all of the states between the head and the split slot, from the hot DB
// to the cold DB.
let state_root_iter = StateRootsIterator::new(store.clone(), frozen_head);

let mut to_delete = vec![];
for maybe_pair in state_root_iter.take_while(|result| match result {
Ok((_, slot)) => slot >= &current_split_slot,
Err(_) => true,
}) {
let (state_root, slot) = maybe_pair?;

let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();

if slot % store.config.slots_per_restore_point == 0 {
let state: BeaconState<E> = get_full_state(&store.hot_db, &state_root)?
.ok_or_else(|| HotColdDBError::MissingStateToFreeze(state_root))?;

let mut ops: Vec<KeyValueStoreOp> = Vec::new();
store.store_cold_state(&state_root, &state, &mut ops)?;
store.cold_db.do_atomically(ops)?;
store.store_cold_state(&state_root, &state, &mut cold_db_ops)?;
}

// Store a pointer from this state root to its slot, so we can later reconstruct states
// from their state root alone.
store.store_cold_state_slot(&state_root, slot)?;
let cold_state_summary = ColdStateSummary { slot };
let op = cold_state_summary.as_kv_store_op(state_root);
cold_db_ops.push(op);

// There are data dependencies between calls to `store_cold_state()` that prevent us from
// doing one big call to `store.cold_db.do_atomically()` at the end of the loop.
store.cold_db.do_atomically(cold_db_ops)?;

// Delete the old summary, and the full state if we lie on an epoch boundary.
to_delete.push((state_root, slot));
}
hot_db_ops.push(StoreOp::DeleteState(state_root.into(), slot));
}

// Warning: Critical section. We have to take care not to put either of the two databases in
// an inconsistent state if the OS process dies at any point during the freezing
// procedure.
//
// Since it is pretty much impossible to be atomic across more than one database, we trade
// losing track of states to delete for consistency. In other words: we should be safe to die
// at any point below, but it may happen that some states won't be deleted from the hot
// database and will remain there forever. Since dying in these particular few lines should be
// an exceedingly rare event, this should be an acceptable tradeoff.

// Flush to disk all the states that have just been migrated to the cold store.
store
.cold_db
.put_bytes_fsync("fsync", "fsync".as_bytes(), "fsync".as_bytes())?;

{
let mut split_guard = store.split.write();
let latest_split_slot = split_guard.slot;

// Detect a situation where the split point is (erroneously) changed from more than one
// place in the code.
if latest_split_slot != current_split_slot {
error!(
store.log,
"Race condition detected: Split point changed while moving states to the freezer";
"previous split slot" => current_split_slot,
"current split slot" => latest_split_slot,
);

// 2. Update the split slot
*store.split.write() = Split {
slot: frozen_head.slot,
state_root: frozen_head_root,
};
store.store_split()?;
// Assume the freezing procedure will be retried in case this happens.
return Err(Error::SplitPointModified(
current_split_slot,
latest_split_slot,
));
}

// Before updating the in-memory split value, we flush it to disk first, so that should the
// OS process die at this point, we pick up from the right place after a restart.
let split = Split {
slot: frozen_head.slot,
state_root: frozen_head_root,
};
store
.hot_db
.put_fsync(&Hash256::from_slice(SPLIT_DB_KEY.as_bytes()), &split)?;

// 3. Delete from the hot DB
for (state_root, slot) in to_delete {
store.delete_state(&state_root, slot)?;
// Split point is now persisted in the hot database on disk. The in-memory split point
// hasn't been modified elsewhere since we keep a write lock on it. It's safe to update
// the in-memory split point now.
*split_guard = split;
}

// Delete the states from the hot database if we got this far.
store.do_atomically(hot_db_ops)?;

debug!(
store.log,
"Freezer migration complete";
@@ -842,7 +878,7 @@ pub fn process_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(

/// Struct for storing the split slot and state root in the database.
#[derive(Debug, Clone, Copy, Default, Encode, Decode)]
struct Split {
pub struct Split {
slot: Slot,
state_root: Hash256,
}
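The ordering inside the critical section above is the heart of the change: flush the cold database, then, while holding the split write lock, verify that nobody else moved the split point, persist the new split with an fsync'd write, publish it in memory, and only afterwards delete the hot states in one atomic batch. A self-contained sketch of the persist-before-publish pattern under the lock (simplified types; `persist` stands in for the fsync'd hot-DB write, and the error type is a plain string):

use parking_lot::RwLock;

#[derive(Clone, Copy)]
struct Split {
    slot: u64,
}

// Simplified model of the split-point update: hold the write lock across the check,
// make the new value durable first, and only then make it visible to readers.
fn advance_split<F>(
    split: &RwLock<Split>,
    expected_slot: u64,
    new: Split,
    persist: F, // stand-in for hot_db.put_fsync(SPLIT_DB_KEY, &split)
) -> Result<(), String>
where
    F: Fn(&Split) -> Result<(), String>,
{
    let mut guard = split.write();
    if guard.slot != expected_slot {
        // Mirrors Error::SplitPointModified: another caller moved the split point,
        // so bail out and let the whole migration be retried.
        return Err(format!("split moved: {} != {}", guard.slot, expected_slot));
    }
    persist(&new)?; // durable on disk first; a crash here is recovered on restart
    *guard = new; // now safe to publish to readers
    Ok(())
}

If the process dies after the fsync'd write but before the hot-state deletions, the only cost is leftover hot states, which matches the trade-off described in the comment above.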
52 changes: 36 additions & 16 deletions beacon_node/store/src/leveldb_store.rs
@@ -37,9 +37,45 @@ impl<E: EthSpec> LevelDB<E> {
fn write_options(&self) -> WriteOptions {
WriteOptions::new()
}

fn write_options_fsync(&self) -> WriteOptions {
let mut opts = WriteOptions::new();
opts.sync = true;
opts
}

fn put_bytes_with_options(
&self,
col: &str,
key: &[u8],
val: &[u8],
opts: WriteOptions,
) -> Result<(), Error> {
let column_key = get_key_for_col(col, key);

metrics::inc_counter(&metrics::DISK_DB_WRITE_COUNT);
metrics::inc_counter_by(&metrics::DISK_DB_WRITE_BYTES, val.len() as i64);
let timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES);

self.db
.put(opts, BytesKey::from_vec(column_key), val)
.map_err(Into::into)
.map(|()| {
metrics::stop_timer(timer);
})
}
}

impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
/// Store some `value` in `column`, indexed with `key`.
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
self.put_bytes_with_options(col, key, val, self.write_options())
}

fn put_bytes_fsync(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
self.put_bytes_with_options(col, key, val, self.write_options_fsync())
}

/// Retrieve some bytes in `column` with `key`.
fn get_bytes(&self, col: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
let column_key = get_key_for_col(col, key);
@@ -59,22 +95,6 @@ impl<E: EthSpec> KeyValueStore<E> for LevelDB<E> {
})
}

/// Store some `value` in `column`, indexed with `key`.
fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
let column_key = get_key_for_col(col, key);

metrics::inc_counter(&metrics::DISK_DB_WRITE_COUNT);
metrics::inc_counter_by(&metrics::DISK_DB_WRITE_BYTES, val.len() as i64);
let timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES);

self.db
.put(self.write_options(), BytesKey::from_vec(column_key), val)
.map_err(Into::into)
.map(|()| {
metrics::stop_timer(timer);
})
}

/// Return `true` if `key` exists in `column`.
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
let column_key = get_key_for_col(col, key);
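Both LevelDB write paths now go through `put_bytes_with_options`; the only difference is the `sync` flag on `WriteOptions`, which forces the write (and the log records buffered before it) out of the OS cache onto disk. That is what the sentinel "fsync" put in `process_finalization` relies on to flush the freshly migrated cold states. A rough usage fragment (column, key and value names are illustrative only):

// Illustrative fragment: ordinary puts may linger in the OS buffer cache, while the
// fsync variant sets `sync = true` on the write options and acts as a durability barrier.
store.put_bytes("example_col", b"key", b"value")?; // fast, buffered
store.put_bytes_fsync("example_col", b"key", b"value")?; // flushed to disk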
13 changes: 12 additions & 1 deletion beacon_node/store/src/lib.rs
@@ -27,7 +27,7 @@ pub mod iter;
use std::borrow::Cow;

pub use self::config::StoreConfig;
pub use self::hot_cold_store::{HotColdDB, HotStateSummary};
pub use self::hot_cold_store::{HotColdDB, HotStateSummary, Split};
pub use self::leveldb_store::LevelDB;
pub use self::memory_store::MemoryStore;
pub use self::partial_beacon_state::PartialBeaconState;
@@ -43,6 +43,9 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
/// Store some `value` in `column`, indexed with `key`.
fn put_bytes(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error>;

/// Same as `put_bytes()`, but also forces a flush to disk.
fn put_bytes_fsync(&self, column: &str, key: &[u8], value: &[u8]) -> Result<(), Error>;

/// Return `true` if `key` exists in `column`.
fn key_exists(&self, column: &str, key: &[u8]) -> Result<bool, Error>;

@@ -74,6 +77,14 @@ pub trait ItemStore<E: EthSpec>: KeyValueStore<E> + Sync + Send + Sized + 'static {
.map_err(Into::into)
}

fn put_fsync<I: StoreItem>(&self, key: &Hash256, item: &I) -> Result<(), Error> {
let column = I::db_column().into();
let key = key.as_bytes();

self.put_bytes_fsync(column, key, &item.as_store_bytes())
.map_err(Into::into)
}

/// Retrieve an item from `Self`.
fn get<I: StoreItem>(&self, key: &Hash256) -> Result<Option<I>, Error> {
let column = I::db_column().into();
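`put_fsync` mirrors the existing `put`, so any `StoreItem` can be written durably without the caller dealing in columns or raw bytes; the split point persisted in `process_finalization` is its first user. Because `put` and `put_fsync` are provided methods, a backend only needs to supply the byte-level primitives. A sketch for a hypothetical backend (`MyBackend`, its `write()` and `flush()` are assumptions; the remaining required methods are elided):

// Hypothetical backend: implementing put_bytes_fsync at the byte level is enough to
// get the typed ItemStore::put_fsync via the provided methods above.
impl<E: EthSpec> KeyValueStore<E> for MyBackend {
    fn put_bytes(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
        self.write(col, key, val) // hypothetical buffered write
    }

    fn put_bytes_fsync(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
        self.put_bytes(col, key, val)?;
        self.flush() // hypothetical durable flush for this backend
    }

    // ... remaining required methods elided ...
}

impl<E: EthSpec> ItemStore<E> for MyBackend {} // all methods are provided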
4 changes: 4 additions & 0 deletions beacon_node/store/src/memory_store.rs
@@ -51,6 +51,10 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
Ok(())
}

fn put_bytes_fsync(&self, col: &str, key: &[u8], val: &[u8]) -> Result<(), Error> {
self.put_bytes(col, key, val)
}

/// Return true if some key exists in some column.
fn key_exists(&self, col: &str, key: &[u8]) -> Result<bool, Error> {
let column_key = Self::get_key_for_col(col, key);
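The in-memory store has nothing to flush, so its fsync variant simply delegates to `put_bytes`, keeping `MemoryStore`-backed tests working without pretending to offer durability. A hypothetical test fragment (the `open()` constructor and the `MinimalEthSpec` parameter are assumed from the surrounding crate):

#[test]
fn put_bytes_fsync_behaves_like_put_bytes() {
    // MemoryStore keeps everything in memory, so the "fsync" put is a plain put.
    let store: MemoryStore<MinimalEthSpec> = MemoryStore::open();
    store.put_bytes_fsync("col", b"key", b"val").unwrap();
    assert_eq!(
        store.get_bytes("col", b"key").unwrap(),
        Some(b"val".to_vec())
    );
}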
