Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redb optimize temp state cleanup #8

Open
wants to merge 22 commits into
base: modularize-beacon-node-backend
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 3 additions & 5 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,11 +679,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
StoreOp::DeleteSyncCommitteeBranch(block_root),
]
})
.chain(
abandoned_states
.into_iter()
.map(|(slot, state_hash)| StoreOp::DeleteState(state_hash.into(), Some(slot))),
)
.chain(abandoned_states.into_iter().map(|(slot, state_hash)| {
StoreOp::DeleteStateAndSummary(state_hash.into(), Some(slot))
}))
.collect();

// Persist the head in case the process is killed or crashes here. This prevents
Expand Down
1 change: 1 addition & 0 deletions beacon_node/lighthouse_network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ delay_map = { workspace = true }
bytes = { workspace = true }
either = { workspace = true }
itertools = { workspace = true }
alloy-rlp = { workspace = true }

# Local dependencies
void = "1.0.2"
Expand Down
58 changes: 42 additions & 16 deletions beacon_node/lighthouse_network/src/discovery/enr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use super::enr_ext::CombinedKeyExt;
use super::ENR_FILENAME;
use crate::types::{Enr, EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use crate::NetworkConfig;
use alloy_rlp::bytes::Bytes;
use libp2p::identity::Keypair;
use slog::{debug, warn};
use ssz::{Decode, Encode};
Expand Down Expand Up @@ -45,7 +46,7 @@ pub trait Eth2Enr {

impl Eth2Enr for Enr {
fn attestation_bitfield<E: EthSpec>(&self) -> Result<EnrAttestationBitfield<E>, &'static str> {
let bitfield_bytes: Vec<u8> = self
let bitfield_bytes: Bytes = self
.get_decodable(ATTESTATION_BITFIELD_ENR_KEY)
.ok_or("ENR attestation bitfield non-existent")?
.map_err(|_| "Invalid RLP Encoding")?;
Expand All @@ -57,7 +58,7 @@ impl Eth2Enr for Enr {
fn sync_committee_bitfield<E: EthSpec>(
&self,
) -> Result<EnrSyncCommitteeBitfield<E>, &'static str> {
let bitfield_bytes: Vec<u8> = self
let bitfield_bytes: Bytes = self
.get_decodable(SYNC_COMMITTEE_BITFIELD_ENR_KEY)
.ok_or("ENR sync committee bitfield non-existent")?
.map_err(|_| "Invalid RLP Encoding")?;
Expand All @@ -80,7 +81,7 @@ impl Eth2Enr for Enr {
}

fn eth2(&self) -> Result<EnrForkId, &'static str> {
let eth2_bytes: Vec<u8> = self
let eth2_bytes: Bytes = self
.get_decodable(ETH2_ENR_KEY)
.ok_or("ENR has no eth2 field")?
.map_err(|_| "Invalid RLP Encoding")?;
Expand Down Expand Up @@ -234,17 +235,23 @@ pub fn build_enr<E: EthSpec>(
}

// set the `eth2` field on our ENR
builder.add_value(ETH2_ENR_KEY, &enr_fork_id.as_ssz_bytes());
builder.add_value::<Bytes>(ETH2_ENR_KEY, &enr_fork_id.as_ssz_bytes().into());

// set the "attnets" field on our ENR
let bitfield = BitVector::<E::SubnetBitfieldLength>::new();

builder.add_value(ATTESTATION_BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes());
builder.add_value::<Bytes>(
ATTESTATION_BITFIELD_ENR_KEY,
&bitfield.as_ssz_bytes().into(),
);

// set the "syncnets" field on our ENR
let bitfield = BitVector::<E::SyncCommitteeSubnetCount>::new();

builder.add_value(SYNC_COMMITTEE_BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes());
builder.add_value::<Bytes>(
SYNC_COMMITTEE_BITFIELD_ENR_KEY,
&bitfield.as_ssz_bytes().into(),
);

// only set `csc` if PeerDAS fork epoch has been scheduled
if spec.is_peer_das_scheduled() {
Expand Down Expand Up @@ -275,16 +282,16 @@ fn compare_enr(local_enr: &Enr, disk_enr: &Enr) -> bool {
&& local_enr.quic4() == disk_enr.quic4()
&& local_enr.quic6() == disk_enr.quic6()
// must match on the same fork
&& local_enr.get_decodable::<Vec<u8>>(ETH2_ENR_KEY) == disk_enr.get_decodable(ETH2_ENR_KEY)
&& local_enr.get_decodable::<Bytes>(ETH2_ENR_KEY) == disk_enr.get_decodable(ETH2_ENR_KEY)
// take preference over disk udp port if one is not specified
&& (local_enr.udp4().is_none() || local_enr.udp4() == disk_enr.udp4())
&& (local_enr.udp6().is_none() || local_enr.udp6() == disk_enr.udp6())
// we need the ATTESTATION_BITFIELD_ENR_KEY and SYNC_COMMITTEE_BITFIELD_ENR_KEY and
// PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY key to match, otherwise we use a new ENR. This will
// likely only be true for non-validating nodes.
&& local_enr.get_decodable::<Vec<u8>>(ATTESTATION_BITFIELD_ENR_KEY) == disk_enr.get_decodable(ATTESTATION_BITFIELD_ENR_KEY)
&& local_enr.get_decodable::<Vec<u8>>(SYNC_COMMITTEE_BITFIELD_ENR_KEY) == disk_enr.get_decodable(SYNC_COMMITTEE_BITFIELD_ENR_KEY)
&& local_enr.get_decodable::<Vec<u8>>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) == disk_enr.get_decodable(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
&& local_enr.get_decodable::<Bytes>(ATTESTATION_BITFIELD_ENR_KEY) == disk_enr.get_decodable(ATTESTATION_BITFIELD_ENR_KEY)
&& local_enr.get_decodable::<Bytes>(SYNC_COMMITTEE_BITFIELD_ENR_KEY) == disk_enr.get_decodable(SYNC_COMMITTEE_BITFIELD_ENR_KEY)
&& local_enr.get_decodable::<Bytes>(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY) == disk_enr.get_decodable(PEERDAS_CUSTODY_SUBNET_COUNT_ENR_KEY)
}

/// Loads enr from the given directory
Expand Down Expand Up @@ -332,6 +339,14 @@ mod test {
spec
}

/// Test helper: build an ENR (plus the key that signed it) for the given
/// network config, using a freshly generated secp256k1 key and the
/// default `EnrForkId`.
fn build_enr_with_config(config: NetworkConfig, spec: &ChainSpec) -> (Enr, CombinedKey) {
    let secp_keypair = libp2p::identity::secp256k1::Keypair::generate();
    let enr_key = CombinedKey::from_secp256k1(&secp_keypair);
    let enr = build_enr::<E>(&enr_key, &config, &EnrForkId::default(), spec).unwrap();
    (enr, enr_key)
}

#[test]
fn custody_subnet_count_default() {
let config = NetworkConfig {
Expand Down Expand Up @@ -363,11 +378,22 @@ mod test {
);
}

fn build_enr_with_config(config: NetworkConfig, spec: &ChainSpec) -> (Enr, CombinedKey) {
let keypair = libp2p::identity::secp256k1::Keypair::generate();
let enr_key = CombinedKey::from_secp256k1(&keypair);
let enr_fork_id = EnrForkId::default();
let enr = build_enr::<E>(&enr_key, &config, &enr_fork_id, spec).unwrap();
(enr, enr_key)
#[test]
fn test_encode_decode_eth2_enr() {
    let (enr, _key) = build_enr_with_config(NetworkConfig::default(), &E::default_spec());
    // All eth2-specific ENR mappings written by `build_enr` must be decodable
    // back through the `Eth2Enr` trait (round-trip of the SSZ-encoded fields).
    enr.eth2().unwrap();
    enr.attestation_bitfield::<MainnetEthSpec>().unwrap();
    enr.sync_committee_bitfield::<MainnetEthSpec>().unwrap();
}

#[test]
fn test_eth2_enr_encodings() {
    // A fixed, externally produced ENR string carrying the eth2-specific
    // fields (`eth2`, `attnets`, `csc`, `syncnets`). Decoding it exercises
    // the wire encoding independently of our own `build_enr` output.
    // (Removed a stale commented-out alternative ENR string that was dead code.)
    let enr_str = "enr:-Mm4QEX9fFRi1n4H3M9sGIgFQ6op1IysTU4Gz6tpIiOGRM1DbJtIih1KgGgv3Xl-oUlwco3HwdXsbYuXStBuNhUVIPoBh2F0dG5ldHOIAAAAAAAAAACDY3NjBIRldGgykI-3hTFgAAA4AOH1BQAAAACCaWSCdjSCaXCErBAADoRxdWljgiMpiXNlY3AyNTZrMaECph91xMyTVyE5MVj6lBpPgz6KP2--Kr9lPbo6_GjrfRKIc3luY25ldHMAg3RjcIIjKIN1ZHCCIyg";
    let enr = Enr::from_str(enr_str).unwrap();
    // Every eth2 mapping in the fixed ENR must decode successfully.
    enr.eth2().unwrap();
    enr.attestation_bitfield::<MainnetEthSpec>().unwrap();
    enr.sync_committee_bitfield::<MainnetEthSpec>().unwrap();
}
}
16 changes: 10 additions & 6 deletions beacon_node/lighthouse_network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use enr::{build_enr, load_enr_from_disk, use_or_load_enr, CombinedKey, Eth2E
pub use enr_ext::{peer_id_to_node_id, CombinedKeyExt, EnrExt};
pub use libp2p::identity::{Keypair, PublicKey};

use alloy_rlp::bytes::Bytes;
use enr::{ATTESTATION_BITFIELD_ENR_KEY, ETH2_ENR_KEY, SYNC_COMMITTEE_BITFIELD_ENR_KEY};
use futures::prelude::*;
use futures::stream::FuturesUnordered;
Expand Down Expand Up @@ -512,9 +513,9 @@ impl<E: EthSpec> Discovery<E> {

// insert the bitfield into the ENR record
self.discv5
.enr_insert(
.enr_insert::<Bytes>(
ATTESTATION_BITFIELD_ENR_KEY,
&current_bitfield.as_ssz_bytes(),
&current_bitfield.as_ssz_bytes().into(),
)
.map_err(|e| format!("{:?}", e))?;
}
Expand Down Expand Up @@ -546,9 +547,9 @@ impl<E: EthSpec> Discovery<E> {

// insert the bitfield into the ENR record
self.discv5
.enr_insert(
.enr_insert::<Bytes>(
SYNC_COMMITTEE_BITFIELD_ENR_KEY,
&current_bitfield.as_ssz_bytes(),
&current_bitfield.as_ssz_bytes().into(),
)
.map_err(|e| format!("{:?}", e))?;
}
Expand Down Expand Up @@ -582,7 +583,7 @@ impl<E: EthSpec> Discovery<E> {

let _ = self
.discv5
.enr_insert(ETH2_ENR_KEY, &enr_fork_id.as_ssz_bytes())
.enr_insert::<Bytes>(ETH2_ENR_KEY, &enr_fork_id.as_ssz_bytes().into())
.map_err(|e| {
warn!(
self.log,
Expand Down Expand Up @@ -1289,7 +1290,10 @@ mod tests {
bitfield.set(id, true).unwrap();
}

builder.add_value(ATTESTATION_BITFIELD_ENR_KEY, &bitfield.as_ssz_bytes());
builder.add_value::<Bytes>(
ATTESTATION_BITFIELD_ENR_KEY,
&bitfield.as_ssz_bytes().into(),
);
builder.build(&enr_key).unwrap()
}

Expand Down
24 changes: 24 additions & 0 deletions beacon_node/store/src/database/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,17 @@ impl<E: EthSpec> KeyValueStore<E> for BeaconNodeBackend<E> {
}
}

/// Atomically apply `batch` to the single column `col`, dispatching to
/// whichever storage backend this node was compiled/configured with.
fn do_atomically_for_col(&self, col: &str, batch: Vec<KeyValueStoreOp>) -> Result<(), Error> {
    match self {
        #[cfg(feature = "leveldb")]
        BeaconNodeBackend::LevelDb(txn) => {
            leveldb_impl::LevelDB::do_atomically_for_col(txn, col, batch)
        }
        #[cfg(feature = "redb")]
        BeaconNodeBackend::Redb(txn) => redb_impl::Redb::do_atomically_for_col(txn, col, batch),
    }
}

fn do_atomically(&self, batch: Vec<KeyValueStoreOp>) -> Result<(), Error> {
match self {
#[cfg(feature = "leveldb")]
Expand All @@ -110,6 +121,19 @@ impl<E: EthSpec> KeyValueStore<E> for BeaconNodeBackend<E> {
}
}

/// Delete the given set of keys from column `_col`, dispatching to the
/// active storage backend.
///
/// NOTE(review): unlike `std`'s `extract_if`, this takes no predicate and
/// returns none of the removed values — it behaves as a bulk delete;
/// confirm the name is intended.
/// Parameters are underscore-prefixed so the signature stays warning-free
/// if a backend feature is compiled out.
fn extract_if(
    &self,
    _col: &str,
    _ops: std::collections::HashSet<&[u8]>,
) -> Result<(), crate::Error> {
    match self {
        #[cfg(feature = "leveldb")]
        BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::extract_if(txn, _col, _ops),
        #[cfg(feature = "redb")]
        BeaconNodeBackend::Redb(txn) => redb_impl::Redb::extract_if(txn, _col, _ops),
    }
}

fn begin_rw_transaction(&self) -> parking_lot::MutexGuard<()> {
match self {
#[cfg(feature = "leveldb")]
Expand Down
62 changes: 61 additions & 1 deletion beacon_node/store/src/database/leveldb_impl.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::hot_cold_store::{BytesKey, HotColdDBError};
use crate::Key;
use crate::{
get_key_for_col, metrics, ColumnIter, ColumnKeyIter, DBColumn, Error, KeyValueStoreOp,
errors::Error as DBError, get_key_for_col, metrics, ColumnIter, ColumnKeyIter, DBColumn, Error,
KeyValueStoreOp,
};
use leveldb::{
compaction::Compaction,
Expand All @@ -14,6 +15,7 @@ use leveldb::{
options::{Options, ReadOptions},
};
use parking_lot::{Mutex, MutexGuard};
use std::collections::HashSet;
use std::marker::PhantomData;
use std::path::Path;
use types::{EthSpec, FixedBytesExtended, Hash256};
Expand Down Expand Up @@ -145,6 +147,64 @@ impl<E: EthSpec> LevelDB<E> {
.map_err(Into::into)
}

/// Unconditionally delete every key in `ops` from column `col`, in a
/// single atomic LevelDB write batch.
///
/// NOTE(review): despite the `extract_if` name, no predicate is applied
/// and nothing is returned — this is a bulk delete; confirm naming
/// against the trait callers.
pub fn extract_if(&self, col: &str, ops: HashSet<&[u8]>) -> Result<(), Error> {
    let mut leveldb_batch = Writebatch::new();
    for op in ops {
        // Keys are namespaced with the column prefix before deletion.
        let column_key = get_key_for_col(col, op);
        leveldb_batch.delete(BytesKey::from_vec(column_key));
    }
    // One atomic write: either all deletes apply or none do.
    self.db.write(self.write_options().into(), &leveldb_batch)?;
    Ok(())
}

pub fn do_atomically_for_col(
&self,
col: &str,
ops_batch: Vec<KeyValueStoreOp>,
) -> Result<(), Error> {
let mut leveldb_batch = Writebatch::new();
for op in ops_batch {
match op {
KeyValueStoreOp::PutKeyValue(column, key, value) => {
if col != column {
return Err(DBError::DBError {
message: format!(
"Attempted to mutate unexpected column: {}. Expected: {}, ",
column, col
),
});
}
let _timer = metrics::start_timer(&metrics::DISK_DB_WRITE_TIMES);
metrics::inc_counter_vec_by(
&metrics::DISK_DB_WRITE_BYTES,
&[&column],
value.len() as u64,
);
metrics::inc_counter_vec(&metrics::DISK_DB_WRITE_COUNT, &[&column]);
let column_key = get_key_for_col(&column, &key);
leveldb_batch.put(BytesKey::from_vec(column_key), &value);
}

KeyValueStoreOp::DeleteKey(column, key) => {
if col != column {
return Err(DBError::DBError {
message: format!(
"Attempted to mutate unexpected column: {}. Expected: {}, ",
column, col
),
});
}
let _timer = metrics::start_timer(&metrics::DISK_DB_DELETE_TIMES);
metrics::inc_counter_vec(&metrics::DISK_DB_DELETE_COUNT, &[&column]);
let column_key = get_key_for_col(&column, &key);
leveldb_batch.delete(BytesKey::from_vec(column_key));
}
}
}
self.db.write(self.write_options().into(), &leveldb_batch)?;
Ok(())
}

pub fn do_atomically(&self, ops_batch: Vec<KeyValueStoreOp>) -> Result<(), Error> {
let mut leveldb_batch = Writebatch::new();
for op in ops_batch {
Expand Down
Loading
Loading