diff --git a/Cargo.lock b/Cargo.lock
index e3d29316cbe..35691ceada9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1055,6 +1055,7 @@ name = "boot_node"
 version = "5.3.0"
 dependencies = [
  "beacon_node",
+ "bytes",
  "clap",
  "clap_utils",
  "eth2_network_config",
@@ -5059,6 +5060,7 @@ name = "lighthouse_network"
 version = "0.2.0"
 dependencies = [
  "alloy-primitives",
+ "alloy-rlp",
  "async-channel",
  "bytes",
  "delay_map",
diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs
index f83df7b4468..cf98b1cb9d1 100644
--- a/beacon_node/beacon_chain/src/migrate.rs
+++ b/beacon_node/beacon_chain/src/migrate.rs
@@ -679,11 +679,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, H
diff --git a/beacon_node/lighthouse_network/src/discovery/enr.rs b/beacon_node/lighthouse_network/src/discovery/enr.rs
--- a/beacon_node/lighthouse_network/src/discovery/enr.rs
+++ b/beacon_node/lighthouse_network/src/discovery/enr.rs
     fn attestation_bitfield<E: EthSpec>(&self) -> Result<EnrAttestationBitfield<E>, &'static str> {
-        let bitfield_bytes: Vec<u8> = self
+        let bitfield_bytes: Bytes = self
             .get_decodable(ATTESTATION_BITFIELD_ENR_KEY)
             .ok_or("ENR attestation bitfield non-existent")?
             .map_err(|_| "Invalid RLP Encoding")?;
@@ -57,7 +58,7 @@ impl Eth2Enr for Enr {
     fn sync_committee_bitfield<E: EthSpec>(
         &self,
     ) -> Result<EnrSyncCommitteeBitfield<E>, &'static str> {
-        let bitfield_bytes: Vec<u8> = self
+        let bitfield_bytes: Bytes = self
             .get_decodable(SYNC_COMMITTEE_BITFIELD_ENR_KEY)
             .ok_or("ENR sync committee bitfield non-existent")?
             .map_err(|_| "Invalid RLP Encoding")?;
@@ -80,7 +81,7 @@ impl Eth2Enr for Enr {
     }
 
     fn eth2(&self) -> Result<EnrForkId, &'static str> {
-        let eth2_bytes: Vec<u8> = self
+        let eth2_bytes: Bytes = self
             .get_decodable(ETH2_ENR_KEY)
             .ok_or("ENR has no eth2 field")?
             .map_err(|_| "Invalid RLP Encoding")?;
@@ -234,17 +235,23 @@ pub fn build_enr<E: EthSpec>(
     }
 
     // set the `eth2` field on our ENR
-    builder.add_value(ETH2_ENR_KEY, &enr_fork_id.as_ssz_bytes());
+    builder.add_value::<Bytes>(ETH2_ENR_KEY, &enr_fork_id.as_ssz_bytes().into());
 
     // set the "attnets" field on our ENR
     let bitfield = BitVector::<E::SubnetBitfieldLength>::new();
 
-    builder.add_value(ATTESTATION_BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes());
+    builder.add_value::<Bytes>(
+        ATTESTATION_BITFIELD_ENR_KEY,
+        &bitfield.as_ssz_bytes().into(),
+    );
 
     // set the "syncnets" field on our ENR
     let bitfield = BitVector::<E::SyncCommitteeSubnetCount>::new();
 
-    builder.add_value(SYNC_COMMITTEE_BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes());
+    builder.add_value::<Bytes>(
+        SYNC_COMMITTEE_BITFIELD_ENR_KEY,
+        &bitfield.as_ssz_bytes().into(),
+    );
 
     // only set `csc` if PeerDAS fork epoch has been scheduled
     if spec.is_peer_das_scheduled() {
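A note on the pattern above, since it repeats through the rest of the networking diff: discv5's ENR layer now RLP-encodes values via alloy-rlp, whose generic `Vec<T>` impl encodes element-wise as an RLP *list*, while ENR values are expected to be RLP *byte strings*. Wrapping the SSZ bytes in `Bytes` selects the byte-string encoding. A minimal, standalone sketch of the round trip (a hedged illustration, assuming only the `alloy-rlp` crate with its `bytes` re-export):

    use alloy_rlp::{bytes::Bytes, Decodable, Encodable};

    fn main() {
        // Stand-in for an SSZ-encoded field, e.g. an empty 8-byte attnets bitfield.
        let ssz: Vec<u8> = vec![0u8; 8];
        // `Bytes` encodes as an opaque RLP byte string: header 0x88 + 8 payload bytes.
        let value: Bytes = ssz.into();
        let mut out = Vec::new();
        value.encode(&mut out);
        assert_eq!(out[0], 0x88);
        // ...and decodes back to the same bytes.
        let decoded = Bytes::decode(&mut out.as_slice()).unwrap();
        assert_eq!(decoded, value);
    }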
@@ -275,16 +282,16 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool {
         && local_enr.quic4() == disk_enr.quic4()
         && local_enr.quic6() == disk_enr.quic6()
         // must match on the same fork
-        && local_enr.get_decodable::<Vec<u8>>(ETH2_ENR_KEY) == disk_enr.get_decodable(ETH2_ENR_KEY)
+        && local_enr.get_decodable::<Bytes>(ETH2_ENR_KEY) == disk_enr.get_decodable(ETH2_ENR_KEY)
         // take preference over disk udp port if one is not specified
         && (local_enr.udp4().is_none() || local_enr.udp4() == disk_enr.udp4())
         && (local_enr.udp6().is_none() || local_enr.udp6() == disk_enr.udp6())
         // we need the ATTESTATION_BITFIELD_ENR_KEY and SYNC_COMMITTEE_BITFIELD_ENR_KEY and
         // PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will
         // likely only be true for non-validating nodes.
-        && local_enr.get_decodable::<Vec<u8>>(ATTESTATION_BITFIELD_ENR_KEY) == disk_enr.get_decodable(ATTESTATION_BITFIELD_ENR_KEY)
-        && local_enr.get_decodable::<Vec<u8>>(SYNC_COMMITTEE_BITFIELD_ENR_KEY) == disk_enr.get_decodable(SYNC_COMMITTEE_BITFIELD_ENR_KEY)
-        && local_enr.get_decodable::<Vec<u8>>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) == disk_enr.get_decodable(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
+        && local_enr.get_decodable::<Bytes>(ATTESTATION_BITFIELD_ENR_KEY) == disk_enr.get_decodable(ATTESTATION_BITFIELD_ENR_KEY)
+        && local_enr.get_decodable::<Bytes>(SYNC_COMMITTEE_BITFIELD_ENR_KEY) == disk_enr.get_decodable(SYNC_COMMITTEE_BITFIELD_ENR_KEY)
+        && local_enr.get_decodable::<Bytes>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) == disk_enr.get_decodable(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
 }
 
 /// Loads enr from the given directory
@@ -332,6 +339,14 @@ mod test {
         spec
     }
 
+    fn build_enr_with_config(config: NetworkConfig, spec: &ChainSpec) -> (Enr, CombinedKey) {
+        let keypair = libp2p::identity::secp256k1::Keypair::generate();
+        let enr_key = CombinedKey::from_secp256k1(&keypair);
+        let enr_fork_id = EnrForkId::default();
+        let enr = build_enr::<E>(&enr_key, &config, &enr_fork_id, spec).unwrap();
+        (enr, enr_key)
+    }
+
     #[test]
     fn custody_subnet_count_default() {
         let config = NetworkConfig {
@@ -363,11 +378,21 @@ mod test {
         );
     }
 
-    fn build_enr_with_config(config: NetworkConfig, spec: &ChainSpec) -> (Enr, CombinedKey) {
-        let keypair = libp2p::identity::secp256k1::Keypair::generate();
-        let enr_key = CombinedKey::from_secp256k1(&keypair);
-        let enr_fork_id = EnrForkId::default();
-        let enr = build_enr::<E>(&enr_key, &config, &enr_fork_id, spec).unwrap();
-        (enr, enr_key)
+    #[test]
+    fn test_encode_decode_eth2_enr() {
+        let (enr, _key) = build_enr_with_config(NetworkConfig::default(), &E::default_spec());
+        // Check that all eth2 mappings are decodable
+        enr.eth2().unwrap();
+        enr.attestation_bitfield::<E>().unwrap();
+        enr.sync_committee_bitfield::<E>().unwrap();
+    }
+
+    #[test]
+    fn test_eth2_enr_encodings() {
+        let enr_str = "enr:-Mm4QEX9fFRi1n4H3M9sGIgFQ6op1IysTU4Gz6tpIiOGRM1DbJtIih1KgGgv3Xl-oUlwco3HwdXsbYuXStBuNhUVIPoBh2F0dG5ldHOIAAAAAAAAAACDY3NjBIRldGgykI-3hTFgAAA4AOH1BQAAAACCaWSCdjSCaXCErBAADoRxdWljgiMpiXNlY3AyNTZrMaECph91xMyTVyE5MVj6lBpPgz6KP2--Kr9lPbo6_GjrfRKIc3luY25ldHMAg3RjcIIjKIN1ZHCCIyg";
+        let enr = Enr::from_str(enr_str).unwrap();
+        enr.eth2().unwrap();
+        enr.attestation_bitfield::<E>().unwrap();
+        enr.sync_committee_bitfield::<E>().unwrap();
     }
 }
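For context on what the accessors do after the RLP fetch: the `Bytes` payload is still SSZ, and the typed bitfield comes from a plain SSZ decode. A hedged, standalone sketch of that second step (fixed 64-subnet width assumed, as on mainnet; the real code derives the width from `EthSpec`):

    use ssz::Decode;
    use ssz_types::{typenum::U64, BitVector};

    // Hypothetical standalone version of the decode inside `attestation_bitfield`.
    fn decode_attnets(bitfield_bytes: &[u8]) -> Result<BitVector<U64>, &'static str> {
        BitVector::from_ssz_bytes(bitfield_bytes)
            .map_err(|_| "Could not decode the ENR attnets bitfield")
    }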
¤t_bitfield.as_ssz_bytes(), + ¤t_bitfield.as_ssz_bytes().into(), ) .map_err(|e| format!("{:?}", e))?; } @@ -546,9 +547,9 @@ impl Discovery { // insert the bitfield into the ENR record self.discv5 - .enr_insert( + .enr_insert::( SYNC_COMMITTEE_BITFIELD_ENR_KEY, - ¤t_bitfield.as_ssz_bytes(), + ¤t_bitfield.as_ssz_bytes().into(), ) .map_err(|e| format!("{:?}", e))?; } @@ -582,7 +583,7 @@ impl Discovery { let _ = self .discv5 - .enr_insert(ETH2_ENR_KEY, &enr_fork_id.as_ssz_bytes()) + .enr_insert::(ETH2_ENR_KEY, &enr_fork_id.as_ssz_bytes().into()) .map_err(|e| { warn!( self.log, @@ -1289,7 +1290,10 @@ mod tests { bitfield.set(id, true).unwrap(); } - builder.add_value(ATTESTATION_BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes()); + builder.add_value::( + ATTESTATION_BITFIELD_ENR_KEY, + &bitfield.as_ssz_bytes().into(), + ); builder.build(&enr_key).unwrap() } diff --git a/beacon_node/store/src/database/interface.rs b/beacon_node/store/src/database/interface.rs index ce02fbde6bb..ba491cc4101 100644 --- a/beacon_node/store/src/database/interface.rs +++ b/beacon_node/store/src/database/interface.rs @@ -101,6 +101,17 @@ impl KeyValueStore for BeaconNodeBackend { } } + fn do_atomically_for_col(&self, col: &str, batch: Vec) -> Result<(), Error> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => { + leveldb_impl::LevelDB::do_atomically_for_col(txn, col, batch) + } + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::do_atomically_for_col(txn, col, batch), + } + } + fn do_atomically(&self, batch: Vec) -> Result<(), Error> { match self { #[cfg(feature = "leveldb")] @@ -110,6 +121,19 @@ impl KeyValueStore for BeaconNodeBackend { } } + fn extract_if( + &self, + _col: &str, + _ops: std::collections::HashSet<&[u8]>, + ) -> Result<(), crate::Error> { + match self { + #[cfg(feature = "leveldb")] + BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::extract_if(txn, _col, _ops), + #[cfg(feature = "redb")] + BeaconNodeBackend::Redb(txn) => redb_impl::Redb::extract_if(txn, _col, _ops), + } + } + fn begin_rw_transaction(&self) -> parking_lot::MutexGuard<()> { match self { #[cfg(feature = "leveldb")] diff --git a/beacon_node/store/src/database/leveldb_impl.rs b/beacon_node/store/src/database/leveldb_impl.rs index 1d706592e69..10e353283b2 100644 --- a/beacon_node/store/src/database/leveldb_impl.rs +++ b/beacon_node/store/src/database/leveldb_impl.rs @@ -1,7 +1,8 @@ use crate::hot_cold_store::{BytesKey, HotColdDBError}; use crate::Key; use crate::{ - get_key_for_col, metrics, ColumnIter, ColumnKeyIter, DBColumn, Error, KeyValueStoreOp, + errors::Error as DBError, get_key_for_col, metrics, ColumnIter, ColumnKeyIter, DBColumn, Error, + KeyValueStoreOp, }; use leveldb::{ compaction::Compaction, @@ -14,6 +15,7 @@ use leveldb::{ options::{Options, ReadOptions}, }; use parking_lot::{Mutex, MutexGuard}; +use std::collections::HashSet; use std::marker::PhantomData; use std::path::Path; use types::{EthSpec, FixedBytesExtended, Hash256}; @@ -145,6 +147,64 @@ impl LevelDB { .map_err(Into::into) } + pub fn extract_if(&self, col: &str, ops: HashSet<&[u8]>) -> Result<(), Error> { + let mut leveldb_batch = Writebatch::new(); + for op in ops { + let column_key = get_key_for_col(col, op); + leveldb_batch.delete(BytesKey::from_vec(column_key)); + } + self.db.write(self.write_options().into(), &leveldb_batch)?; + Ok(()) + } + + pub fn do_atomically_for_col( + &self, + col: &str, + ops_batch: Vec, + ) -> Result<(), Error> { + let mut leveldb_batch = Writebatch::new(); + for op in 
diff --git a/beacon_node/store/src/database/leveldb_impl.rs b/beacon_node/store/src/database/leveldb_impl.rs
index 1d706592e69..10e353283b2 100644
--- a/beacon_node/store/src/database/leveldb_impl.rs
+++ b/beacon_node/store/src/database/leveldb_impl.rs
@@ -1,7 +1,8 @@
 use crate::hot_cold_store::{BytesKey, HotColdDBError};
 use crate::Key;
 use crate::{
-    get_key_for_col, metrics, ColumnIter, ColumnKeyIter, DBColumn, Error, KeyValueStoreOp,
+    errors::Error as DBError, get_key_for_col, metrics, ColumnIter, ColumnKeyIter, DBColumn, Error,
+    KeyValueStoreOp,
 };
 use leveldb::{
     compaction::Compaction,
@@ -14,6 +15,7 @@ use leveldb::{
     options::{Options, ReadOptions},
 };
 use parking_lot::{Mutex, MutexGuard};
+use std::collections::HashSet;
 use std::marker::PhantomData;
 use std::path::Path;
 use types::{EthSpec, FixedBytesExtended, Hash256};
@@ -145,6 +147,64 @@ impl<E: EthSpec> LevelDB<E> {
             .map_err(Into::into)
     }
 
+    pub fn extract_if(&self, col: &str, ops: HashSet<&[u8]>) -> Result<(), Error> {
+        let mut leveldb_batch = Writebatch::new();
+        for op in ops {
+            let column_key = get_key_for_col(col, op);
+            leveldb_batch.delete(BytesKey::from_vec(column_key));
+        }
+        self.db.write(self.write_options().into(), &leveldb_batch)?;
+        Ok(())
+    }
+
+    pub fn do_atomically_for_col(
+        &self,
+        col: &str,
+        ops_batch: Vec<KeyValueStoreOp>,
+    ) -> Result<(), Error> {
+        let mut leveldb_batch = Writebatch::new();
+        for op in ops_batch {
+            match op {
+                KeyValueStoreOp::PutKeyValue(column, key, value) => {
+                    if col != column {
+                        return Err(DBError::DBError {
+                            message: format!(
+                                "Attempted to mutate unexpected column: {}. Expected: {}",
+                                column, col
+                            ),
+                        });
+                    }
+                    let _timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES);
+                    metrics::inc_counter_vec_by(
+                        &metrics::DISK_DB_WRITE_BYTES,
+                        &[&column],
+                        value.len() as u64,
+                    );
+                    metrics::inc_counter_vec(&metrics::DISK_DB_WRITE_COUNT, &[&column]);
+                    let column_key = get_key_for_col(&column, &key);
+                    leveldb_batch.put(BytesKey::from_vec(column_key), &value);
+                }
+
+                KeyValueStoreOp::DeleteKey(column, key) => {
+                    if col != column {
+                        return Err(DBError::DBError {
+                            message: format!(
+                                "Attempted to mutate unexpected column: {}. Expected: {}",
+                                column, col
+                            ),
+                        });
+                    }
+                    let _timer = metrics::start_timer(&metrics::DISK_DB_DELETE_TIMES);
+                    metrics::inc_counter_vec(&metrics::DISK_DB_DELETE_COUNT, &[&column]);
+                    let column_key = get_key_for_col(&column, &key);
+                    leveldb_batch.delete(BytesKey::from_vec(column_key));
+                }
+            }
+        }
+        self.db.write(self.write_options().into(), &leveldb_batch)?;
+        Ok(())
+    }
+
     pub fn do_atomically(&self, ops_batch: Vec<KeyValueStoreOp>) -> Result<(), Error> {
         let mut leveldb_batch = Writebatch::new();
         for op in ops_batch {
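Both LevelDB methods above rely on `get_key_for_col` because LevelDB exposes a single flat keyspace rather than per-column tables: a "column" is just a key prefix. A sketch of the idea (hedged; the real helper lives in `beacon_node/store/src/lib.rs`):

    // Columns are emulated by prefixing each key with the column tag, so
    // per-column deletes and scans become prefix operations on one keyspace.
    fn get_key_for_col(col: &str, key: &[u8]) -> Vec<u8> {
        let mut col_key = col.as_bytes().to_vec();
        col_key.extend_from_slice(key);
        col_key
    }

This is also why `extract_if` on this backend can simply issue one batched delete per key, while the redb backend below operates on a real per-column table.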
Expected: {}, ", + column, col + ), + }); + } + let _timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES); + metrics::inc_counter_vec_by( + &metrics::DISK_DB_WRITE_BYTES, + &[&column], + value.len() as u64, + ); + metrics::inc_counter_vec(&metrics::DISK_DB_WRITE_COUNT, &[&column]); + table.insert(key.as_slice(), value.as_slice())?; + } + KeyValueStoreOp::DeleteKey(column, key) => { + if col != column { + return Err(DBError::DBError { + message: format!( + "Attempted to mutate unexpected column: {}. Expected: {}, ", + column, col + ), + }); + } + metrics::inc_counter_vec(&metrics::DISK_DB_DELETE_COUNT, &[&column]); + let _timer = metrics::start_timer(&metrics::DISK_DB_DELETE_TIMES); + table.remove(key.as_slice())?; + } + } + } + drop(table); + tx.commit()?; + Ok(()) + } + pub fn do_atomically(&self, ops_batch: Vec) -> Result<(), Error> { let open_db = self.db.read(); let mut tx = open_db.begin_write()?; diff --git a/beacon_node/store/src/garbage_collection.rs b/beacon_node/store/src/garbage_collection.rs index 75c24cd7682..b5b1e4cf00e 100644 --- a/beacon_node/store/src/garbage_collection.rs +++ b/beacon_node/store/src/garbage_collection.rs @@ -1,7 +1,7 @@ //! Garbage collection process that runs at start-up to clean up the database. use crate::database::interface::BeaconNodeBackend; use crate::hot_cold_store::HotColdDB; -use crate::{Error, StoreOp}; +use crate::{DBColumn, Error}; use slog::debug; use types::EthSpec; @@ -18,22 +18,33 @@ where /// Delete the temporary states that were leftover by failed block imports. pub fn delete_temp_states(&self) -> Result<(), Error> { - let delete_ops = - self.iter_temporary_state_roots()? - .try_fold(vec![], |mut ops, state_root| { - let state_root = state_root?; - ops.push(StoreOp::DeleteState(state_root, None)); - ops.push(StoreOp::DeleteStateTemporaryFlag(state_root)); - Result::<_, Error>::Ok(ops) - })?; - - if !delete_ops.is_empty() { + let mut ops = vec![]; + // let mut delete_state_ops = vec![]; + // let mut delete_summary_ops = vec![]; + // let mut delete_temporary_state_ops = vec![]; + let mut delete_states = false; + self.iter_temporary_state_roots()?.for_each(|state_root| { + if let Ok(state_root) = state_root { + ops.push(state_root); + delete_states = true + } + }); + if delete_states { debug!( self.log, "Garbage collecting {} temporary states", - delete_ops.len() / 2 + ops.len() ); - self.do_atomically_with_block_and_blobs_cache(delete_ops)?; + let state_col: &str = DBColumn::BeaconState.into(); + let summary_col: &str = DBColumn::BeaconStateSummary.into(); + let temp_state_col: &str = DBColumn::BeaconStateTemporary.into(); + // self.do_atomically_for_garbage_collection(state_col, delete_state_ops)?; + // self.do_atomically_for_garbage_collection(summary_col, delete_summary_ops)?; + // self.do_atomically_for_garbage_collection(temp_state_col, delete_temporary_state_ops)?; + + self.extract_if(state_col, ops.clone())?; + self.extract_if(summary_col, ops.clone())?; + self.extract_if(temp_state_col, ops)?; } Ok(()) diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 75caadc6c7e..1afac0a63c9 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -1010,7 +1010,7 @@ impl, Cold: ItemStore> HotColdDB /// (which are frozen, and won't be deleted), or valid descendents of the finalized checkpoint /// (which will be deleted by this function but shouldn't be). 
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 75caadc6c7e..1afac0a63c9 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -1010,7 +1010,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     /// (which are frozen, and won't be deleted), or valid descendants of the finalized checkpoint
     /// (which will be deleted by this function but shouldn't be).
     pub fn delete_state(&self, state_root: &Hash256, slot: Slot) -> Result<(), Error> {
-        self.do_atomically_with_block_and_blobs_cache(vec![StoreOp::DeleteState(
+        self.do_atomically_with_block_and_blobs_cache(vec![StoreOp::DeleteStateAndSummary(
             *state_root,
             Some(slot),
         )])
@@ -1183,7 +1183,23 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                 }
             }
 
-            StoreOp::DeleteState(state_root, slot) => {
+            StoreOp::DeleteSummary(state_root) => {
+                let column_name: &str = DBColumn::BeaconStateSummary.into();
+                key_value_batch.push(KeyValueStoreOp::DeleteKey(
+                    column_name.to_owned(),
+                    state_root.as_slice().to_vec(),
+                ));
+            }
+
+            StoreOp::DeleteState(state_root) => {
+                let column_name: &str = DBColumn::BeaconState.into();
+                key_value_batch.push(KeyValueStoreOp::DeleteKey(
+                    column_name.to_owned(),
+                    state_root.as_slice().to_vec(),
+                ));
+            }
+
+            StoreOp::DeleteStateAndSummary(state_root, slot) => {
                 let column_name: &str = DBColumn::BeaconStateSummary.into();
                 key_value_batch.push(KeyValueStoreOp::DeleteKey(
                     column_name.to_owned(),
@@ -1223,6 +1239,25 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         Ok(key_value_batch)
     }
 
+    pub fn extract_if(&self, col: &str, ops: Vec<Hash256>) -> Result<(), Error> {
+        let new_ops = ops.iter().map(|v| v.as_slice()).collect();
+
+        self.hot_db.extract_if(col, new_ops)
+    }
+
+    pub fn do_atomically_for_garbage_collection(
+        &self,
+        col: &str,
+        batch: Vec<StoreOp<E>>,
+    ) -> Result<(), Error> {
+        match self.convert_to_kv_batch(batch) {
+            Ok(kv_store_ops) => self.hot_db.do_atomically_for_col(col, kv_store_ops)?,
+            Err(e) => return Err(e),
+        };
+
+        Ok(())
+    }
+
     pub fn do_atomically_with_block_and_blobs_cache(
         &self,
         batch: Vec<StoreOp<E>>,
@@ -1350,10 +1385,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     self.state_cache.lock().delete_block_states(&block_root);
                 }
 
-                StoreOp::DeleteState(state_root, _) => {
+                StoreOp::DeleteStateAndSummary(state_root, _) => {
                     self.state_cache.lock().delete_state(&state_root)
                 }
 
+                StoreOp::DeleteState(state_root) => {
+                    self.state_cache.lock().delete_state(&state_root)
+                }
+
+                StoreOp::DeleteSummary(_) => (),
+
                 StoreOp::DeleteBlobs(_) => (),
 
                 StoreOp::DeleteDataColumns(_, _) => (),
@@ -2955,7 +2996,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     "slot" => summary.slot,
                     "reason" => reason,
                 );
-                state_delete_batch.push(StoreOp::DeleteState(state_root, Some(summary.slot)));
+                state_delete_batch.push(StoreOp::DeleteStateAndSummary(
+                    state_root,
+                    Some(summary.slot),
+                ));
             }
         }
     }
@@ -3052,7 +3096,7 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
     }
 
     // Delete the old summary, and the full state if we lie on an epoch boundary.
-    hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));
+    hot_db_ops.push(StoreOp::DeleteStateAndSummary(state_root, Some(slot)));
 
     // Store the block root for this slot in the linear array of frozen block roots.
     block_root_writer.set(slot.as_usize(), block_root, &mut cold_db_ops)?;
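The variant split above separates the deletions the old `DeleteState(root, slot)` bundled: `DeleteSummary` touches only the `BeaconStateSummary` column, the new unary `DeleteState` only `BeaconState`, and `DeleteStateAndSummary` preserves the old combined behaviour. A small self-contained mirror of the mapping (hypothetical mini-enum, not the real types):

    // Which hot-DB columns each delete op touches.
    enum DeleteOp {
        Summary([u8; 32]),
        State([u8; 32]),
        StateAndSummary([u8; 32], Option<u64>),
    }

    fn columns_touched(op: &DeleteOp) -> &'static [&'static str] {
        match op {
            DeleteOp::Summary(_) => &["BeaconStateSummary"],
            DeleteOp::State(_) => &["BeaconState"],
            // The combined op always removes the summary; a full state only
            // exists (and is only deleted) at epoch boundaries.
            DeleteOp::StateAndSummary(_, _) => &["BeaconStateSummary", "BeaconState"],
        }
    }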
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index 96cc020a8ad..d6db63f5aa9 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -38,6 +38,7 @@ pub use impls::beacon_state::StorageContainer as BeaconStateStorageContainer;
 pub use metadata::AnchorInfo;
 pub use metrics::scrape_for_metrics;
 use parking_lot::MutexGuard;
+use std::collections::HashSet;
 use std::sync::Arc;
 use strum::{EnumIter, EnumString, IntoStaticStr};
 pub use types::*;
@@ -72,6 +73,9 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
     /// Removes `key` from `column`.
     fn key_delete(&self, column: &str, key: &[u8]) -> Result<(), Error>;
 
+    /// Execute either all of the operations in `batch` for the given `col`, or none at all, returning an error.
+    fn do_atomically_for_col(&self, col: &str, batch: Vec<KeyValueStoreOp>) -> Result<(), Error>;
+
     /// Execute either all of the operations in `batch` or none at all, returning an error.
     fn do_atomically(&self, batch: Vec<KeyValueStoreOp>) -> Result<(), Error>;
 
@@ -119,6 +123,9 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
 
     /// Iterate through all keys in a particular column.
     fn iter_column_keys_from(&self, column: DBColumn, from: &[u8]) -> ColumnKeyIter;
+
+    /// Delete every key in `ops` from the given column.
+    fn extract_if(&self, col: &str, ops: HashSet<&[u8]>) -> Result<(), Error>;
 }
 
 pub trait Key: Sized + 'static {
@@ -247,9 +253,12 @@ pub enum StoreOp<'a, E: EthSpec> {
     DeleteBlock(Hash256),
     DeleteBlobs(Hash256),
     DeleteDataColumns(Hash256, Vec<ColumnIndex>),
-    DeleteState(Hash256, Option<Slot>),
+    // Delete the summary, and also delete the state if it lies on an epoch boundary
+    DeleteStateAndSummary(Hash256, Option<Slot>),
+    DeleteState(Hash256),
     DeleteExecutionPayload(Hash256),
     DeleteSyncCommitteeBranch(Hash256),
+    DeleteSummary(Hash256),
     KeyValueOp(KeyValueStoreOp),
 }
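One behavioural note on the new trait methods: `do_atomically_for_col` rejects any op whose embedded column tag differs from `col`, so a caller cannot smuggle a cross-column write into a single-table transaction (the redb implementation opens exactly one table per call). A hedged usage sketch (hypothetical helper, column tag, and generic bounds):

    // Hypothetical helper: one put and one delete against a single column.
    fn single_column_write<E: store::EthSpec, S: store::KeyValueStore<E>>(
        db: &S,
    ) -> Result<(), store::Error> {
        use store::KeyValueStoreOp;
        let col = "bst"; // hypothetical column tag
        let batch = vec![
            KeyValueStoreOp::PutKeyValue(col.to_owned(), b"key-1".to_vec(), b"value".to_vec()),
            KeyValueStoreOp::DeleteKey(col.to_owned(), b"key-2".to_vec()),
        ];
        // An op tagged with any other column returns a DBError instead of
        // silently landing in the wrong table.
        db.do_atomically_for_col(col, batch)
    }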
Expected: {}, ", + column, col + ), + }); + } + let column_key = get_key_for_col(&column, &key); + self.db.write().remove(&BytesKey::from_vec(column_key)); + } + } + } + Ok(()) + } + fn do_atomically(&self, batch: Vec) -> Result<(), Error> { for op in batch { match op { diff --git a/boot_node/Cargo.toml b/boot_node/Cargo.toml index 46ccd4566be..76d41ae11a8 100644 --- a/boot_node/Cargo.toml +++ b/boot_node/Cargo.toml @@ -21,3 +21,4 @@ slog-scope = "4.3.0" hex = { workspace = true } serde = { workspace = true } eth2_network_config = { workspace = true } +bytes = { workspace = true } diff --git a/boot_node/src/config.rs b/boot_node/src/config.rs index aaa9f084826..bb7678631fd 100644 --- a/boot_node/src/config.rs +++ b/boot_node/src/config.rs @@ -1,4 +1,5 @@ use beacon_node::{get_data_dir, set_network_config}; +use bytes::Bytes; use clap::ArgMatches; use eth2_network_config::Eth2NetworkConfig; use lighthouse_network::discv5::{self, enr::CombinedKey, Enr}; @@ -152,7 +153,7 @@ impl BootNodeConfig { // If we know of the ENR field, add it to the initial construction if let Some(enr_fork_bytes) = enr_fork { - builder.add_value("eth2", &enr_fork_bytes); + builder.add_value::("eth2", &enr_fork_bytes.into()); } builder .build(&local_key)