Prune blobs #3852

Merged · 64 commits · Feb 8, 2023

Commits
e2a6da4
Boiler plate code for blobs pruning
emhane Jan 5, 2023
7bf88c2
Prune blobs before data availability breakpoint
emhane Jan 6, 2023
fe0c911
Plug in pruning of blobs into app
emhane Jan 8, 2023
2a41f25
fixup! Prune blobs before data availability breakpoint
emhane Jan 8, 2023
b88d888
fixup! Plug in pruning of blobs into app
emhane Jan 8, 2023
d21c66d
fixup! Plug in pruning of blobs into app
emhane Jan 8, 2023
934f3ab
Remove inaccurate guess for db index
emhane Jan 8, 2023
d3b94d8
fixup! Prune blobs before data availability breakpoint
emhane Jan 9, 2023
28e1e63
Fix rebase conflict
emhane Feb 6, 2023
a211e6a
Fix rebase conflict
emhane Feb 6, 2023
ce2db35
Fix rebase conflict
emhane Feb 6, 2023
82ffec3
Fix typo
emhane Jan 12, 2023
d67468d
Prune blobs on migration in addition to start-up
emhane Jan 13, 2023
667cca5
Fix try_prune_blobs to use state root
emhane Jan 13, 2023
6f5ca02
Improve syntax
emhane Jan 13, 2023
a2b8c6e
Save fetching state for blobs pruning
emhane Jan 13, 2023
94aa2ce
Log info loaded from disk
emhane Jan 14, 2023
c7f53a9
Delete blobs that conflict with finalization
emhane Jan 14, 2023
8752dee
Store orphan block roots
emhane Jan 14, 2023
2f565d2
Prune blobs in bg after canonical head update
emhane Jan 14, 2023
6346c30
Enable skipping blob pruning at each epoch
emhane Jan 14, 2023
d58a30b
fixup! Store orphan block roots
emhane Jan 14, 2023
fb2ce90
Avoid repeatedly updating blob info for multiple head candidates
emhane Jan 15, 2023
b5abfe6
Convert epochs_per_blob_prune to Epoch once
emhane Jan 16, 2023
0d13932
Fix epoch constructor misconception
emhane Jan 16, 2023
7103a25
Simplify conceptual design
emhane Jan 16, 2023
2056775
fixup! Simplify conceptual design
emhane Jan 16, 2023
44ec331
fixup! Simplify conceptual design
emhane Jan 17, 2023
3d93dad
Fix type bug
emhane Jan 17, 2023
74172ed
Ignore IDE file
emhane Jan 17, 2023
83a9520
Clarify hybrid blob prune solution and fix error handling
emhane Jan 18, 2023
54699f8
fixup! Clarify hybrid blob prune solution and fix error handling
emhane Jan 18, 2023
3bede06
Fix typo
emhane Jan 18, 2023
a875bec
Fix blobs store bug
emhane Jan 18, 2023
caa04db
Run prune blobs on migrator thread
emhane Jan 18, 2023
0bdc291
Only store non-empty orphaned blobs
emhane Jan 19, 2023
d1b75e2
Fix typo
emhane Jan 20, 2023
d7fc24a
Plug in running blob pruning in migrator, related bug fixes and add t…
emhane Jan 20, 2023
1812301
Allow user to set an epoch margin for pruning
emhane Jan 20, 2023
4de523f
fixup! Allow user to set an epoch margin for pruning
emhane Jan 20, 2023
756c881
Keep uniform size small keys
emhane Jan 24, 2023
e4b4473
Clarify wording
emhane Jan 24, 2023
f6346f8
Clarify comment
emhane Jan 24, 2023
c50f831
Fix wording
emhane Jan 24, 2023
63ca3bf
Prune from highest data availability boundary
emhane Jan 24, 2023
43c3c74
fixup! Fix blobs store bug
emhane Jan 24, 2023
d479560
fixup! Prune from highest data availability boundary
emhane Jan 24, 2023
9c2e623
Reflect use of prune margin epochs at import
emhane Jan 24, 2023
5d2480c
Improve naming
emhane Jan 24, 2023
6dff69b
Atomically update blob info with pruned blobs
emhane Jan 24, 2023
9ee9b6d
Remove unused stuff
emhane Jan 24, 2023
1e59cb9
Add tests for blob pruning flags
emhane Jan 24, 2023
a2eda76
Correct comment
emhane Jan 24, 2023
8f137df
fixup! Allow user to set an epoch margin for pruning
emhane Jan 24, 2023
00ca21e
Make implementation of BlobInfo more coder friendly
emhane Jan 24, 2023
b2abec5
Verify StoreConfig
emhane Jan 24, 2023
56c8417
Fix conflicts rebasing eip4844
emhane Feb 6, 2023
577262c
Improve use of whitespace
emhane Feb 7, 2023
d599e41
Remove debug comment
emhane Feb 7, 2023
d7eb944
Reorder loading of db metadata from disk to allow for future changes …
emhane Feb 7, 2023
9d91991
Removed unused code
emhane Feb 7, 2023
ac4b5b5
Fix regression in DB write atomicity
michaelsproul Jan 30, 2023
bc468b4
fixup! Improve use of whitespace
emhane Feb 7, 2023
6a37e84
fixup! Fix regression in DB write atomicity
emhane Feb 7, 2023
3 changes: 3 additions & 0 deletions .gitignore
@@ -12,3 +12,6 @@ genesis.ssz

# IntelliJ
/*.iml

# VSCode
/.vscode
75 changes: 54 additions & 21 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -611,10 +611,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
start_slot,
end_slot,
|| {
(
Ok((
head.beacon_state.clone_with_only_committee_caches(),
head.beacon_block_root,
)
))
},
&self.spec,
)?;
@@ -708,10 +708,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
start_slot,
end_slot,
|| {
(
Ok((
head.beacon_state.clone_with_only_committee_caches(),
head.beacon_state_root(),
)
))
},
&self.spec,
)?;
@@ -2878,7 +2878,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// is so we don't have to think about lock ordering with respect to the fork choice lock.
// There are a bunch of places where we lock both fork choice and the pubkey cache and it
// would be difficult to check that they all lock fork choice first.
let mut kv_store_ops = self
let mut ops = self
.validator_pubkey_cache
.try_write_for(VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT)
.ok_or(Error::ValidatorPubkeyCacheLockTimeout)?
@@ -2981,9 +2981,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// ---------------------------- BLOCK PROBABLY ATTESTABLE ----------------------------------
// Most blocks are now capable of being attested to thanks to the `early_attester_cache`
// cache above. Resume non-essential processing.
//
// It is important NOT to return errors here before the database commit, because the block
// has already been added to fork choice and the database would be left in an inconsistent
// state if we returned early without committing. In other words, an error here would
// corrupt the node's database permanently.
// -----------------------------------------------------------------------------------------

self.import_block_update_shuffling_cache(block_root, &mut state)?;
self.import_block_update_shuffling_cache(block_root, &mut state);
self.import_block_observe_attestations(
block,
&state,
@@ -3008,25 +3013,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// See https://github.com/sigp/lighthouse/issues/2028
let (signed_block, blobs) = signed_block.deconstruct();
let block = signed_block.message();
let mut ops: Vec<_> = confirmed_state_roots
.into_iter()
.map(StoreOp::DeleteStateTemporaryFlag)
.collect();
ops.extend(
confirmed_state_roots
.into_iter()
.map(StoreOp::DeleteStateTemporaryFlag),
);
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
ops.push(StoreOp::PutState(block.state_root(), &state));

if let Some(blobs) = blobs {
if blobs.blobs.len() > 0 {
//FIXME(sean) using this for debugging for now
info!(self.log, "Writing blobs to store"; "block_root" => ?block_root);
ops.push(StoreOp::PutBlobs(block_root, blobs));
// Only consider blobs if the eip4844 fork is enabled.
if let Some(data_availability_boundary) = self.data_availability_boundary() {
let block_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch());
let margin_epochs = self.store.get_config().blob_prune_margin_epochs;
let import_boundary = data_availability_boundary - margin_epochs;

// Only store blobs at the data availability boundary, minus any configured epochs
// margin, or younger (of higher epoch number).
if block_epoch >= import_boundary {
if let Some(blobs) = blobs {
if blobs.blobs.len() > 0 {
//FIXME(sean) using this for debugging for now
info!(
self.log, "Writing blobs to store";
"block_root" => ?block_root
);
ops.push(StoreOp::PutBlobs(block_root, blobs));
}
}
}
};
let txn_lock = self.store.hot_db.begin_rw_transaction();
}

kv_store_ops.extend(self.store.convert_to_kv_batch(ops)?);
let txn_lock = self.store.hot_db.begin_rw_transaction();

if let Err(e) = self.store.hot_db.do_atomically(kv_store_ops) {
if let Err(e) = self.store.do_atomically(ops) {
error!(
self.log,
"Database write failed!";
@@ -3455,13 +3474,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
fn import_block_update_shuffling_cache(
&self,
block_root: Hash256,
state: &mut BeaconState<T::EthSpec>,
) {
if let Err(e) = self.import_block_update_shuffling_cache_fallible(block_root, state) {
warn!(
self.log,
"Failed to prime shuffling cache";
"error" => ?e
);
}
}

fn import_block_update_shuffling_cache_fallible(
&self,
block_root: Hash256,
state: &mut BeaconState<T::EthSpec>,
) -> Result<(), BlockError<T::EthSpec>> {
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
for relative_epoch in [RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = AttestationShufflingId::new(block_root, state, relative_epoch)?;

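The beacon_chain.rs hunk above gates blob storage on an import boundary: the data availability boundary minus the configurable `blob_prune_margin_epochs`. A minimal sketch of that check, with plain `u64` epochs standing in for Lighthouse's `Epoch` type (the function name and signature here are illustrative, not the actual API):

```rust
// Whether a block's blobs should be written to the store. Blobs are only
// kept when the eip4844 fork is enabled (a data availability boundary
// exists) and the block's epoch is at or above the import boundary:
// data_availability_boundary - blob_prune_margin_epochs.
fn should_store_blobs(
    block_epoch: u64,
    data_availability_boundary: Option<u64>,
    blob_prune_margin_epochs: u64,
) -> bool {
    match data_availability_boundary {
        Some(boundary) => {
            let import_boundary = boundary.saturating_sub(blob_prune_margin_epochs);
            block_epoch >= import_boundary
        }
        // Before the eip4844 fork there are no blobs to store.
        None => false,
    }
}

fn main() {
    // Boundary at epoch 100, no margin: epoch 99 blobs are too old.
    assert!(should_store_blobs(100, Some(100), 0));
    assert!(!should_store_blobs(99, Some(100), 0));
    // A margin of 2 epochs widens the import window down to epoch 98.
    assert!(should_store_blobs(98, Some(100), 2));
    // Fork not enabled: never store.
    assert!(!should_store_blobs(50, None, 0));
    println!("ok");
}
```

The `saturating_sub` mirrors the intent of subtracting the margin without underflowing near genesis; whether the real code saturates is not visible in this hunk.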
5 changes: 5 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
@@ -914,6 +914,11 @@ where
);
}

// Prune blobs sidecars older than the blob data availability boundary in the background.
beacon_chain
.store_migrator
.process_prune_blobs(beacon_chain.data_availability_boundary());

Ok(beacon_chain)
}
}
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/canonical_head.rs
@@ -751,6 +751,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Drop the old cache head nice and early to try and free the memory as soon as possible.
drop(old_cached_head);

// Prune blobs in the background.
self.store_migrator
.process_prune_blobs(self.data_availability_boundary());

// If the finalized checkpoint changed, perform some updates.
//
// The `after_finalization` function will take a write-lock on `fork_choice`, therefore it
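Both call sites (builder start-up and the canonical head update above) delegate to `process_prune_blobs`, which sends a notification to the background migrator thread and runs the prune inline when the notification could not be sent. A simplified sketch of that send-or-run-inline pattern, using a plain `mpsc` channel and a hypothetical enum rather than Lighthouse's actual types:

```rust
use std::sync::mpsc;

#[derive(Debug)]
enum Notification {
    PruneBlobs(Option<u64>),
}

// Send a notification to a background worker if one exists; return the
// notification back to the caller when it could not be sent, so the caller
// can do the work inline instead.
fn send_background_notification(
    tx: Option<&mpsc::Sender<Notification>>,
    notif: Notification,
) -> Option<Notification> {
    match tx {
        // SendError gives the unsent value back as its payload.
        Some(tx) => tx.send(notif).err().map(|e| e.0),
        None => Some(notif),
    }
}

fn main() {
    // No background thread configured: the notification comes back and the
    // caller prunes inline.
    let returned = send_background_notification(None, Notification::PruneBlobs(Some(7)));
    assert!(matches!(returned, Some(Notification::PruneBlobs(Some(7)))));

    // With a live channel the notification is delivered to the worker.
    let (tx, rx) = mpsc::channel();
    let sent = send_background_notification(Some(&tx), Notification::PruneBlobs(Some(7)));
    assert!(sent.is_none());
    assert!(matches!(rx.recv().unwrap(), Notification::PruneBlobs(Some(7))));
    println!("ok");
}
```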
45 changes: 43 additions & 2 deletions beacon_node/beacon_chain/src/migrate.rs
@@ -86,6 +86,7 @@ pub enum PruningError {
pub enum Notification {
Finalization(FinalizationNotification),
Reconstruction,
PruneBlobs(Option<Epoch>),
}

pub struct FinalizationNotification {
@@ -152,6 +153,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
}
}

pub fn process_prune_blobs(&self, data_availability_boundary: Option<Epoch>) {
if let Some(Notification::PruneBlobs(data_availability_boundary)) =
self.send_background_notification(Notification::PruneBlobs(data_availability_boundary))
{
Self::run_prune_blobs(self.db.clone(), data_availability_boundary, &self.log);
}
}

pub fn run_reconstruction(db: Arc<HotColdDB<E, Hot, Cold>>, log: &Logger) {
if let Err(e) = db.reconstruct_historic_states() {
error!(
@@ -162,6 +171,20 @@
}
}

pub fn run_prune_blobs(
db: Arc<HotColdDB<E, Hot, Cold>>,
data_availability_boundary: Option<Epoch>,
log: &Logger,
) {
if let Err(e) = db.try_prune_blobs(false, data_availability_boundary) {
error!(
log,
"Blobs pruning failed";
"error" => ?e,
);
}
}

/// If configured to run in the background, send `notif` to the background thread.
///
/// Return `None` if the message was sent to the background thread, `Some(notif)` otherwise.
@@ -320,11 +343,21 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
best
}
}
(Notification::Finalization(_), Notification::PruneBlobs(_)) => best,
(Notification::PruneBlobs(_), Notification::Finalization(_)) => other,
(Notification::PruneBlobs(dab1), Notification::PruneBlobs(dab2)) => {
if dab2 > dab1 {
other
} else {
best
}
}
});

match notif {
Notification::Reconstruction => Self::run_reconstruction(db.clone(), &log),
Notification::Finalization(fin) => Self::run_migration(db.clone(), fin, &log),
Notification::PruneBlobs(dab) => Self::run_prune_blobs(db.clone(), dab, &log),
}
}
});
@@ -569,10 +602,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
.into_iter()
.map(Into::into)
.flat_map(|block_root: Hash256| {
[
let mut store_ops = vec![
StoreOp::DeleteBlock(block_root),
StoreOp::DeleteExecutionPayload(block_root),
]
];
if let Ok(true) = store.blobs_sidecar_exists(&block_root) {
// Keep track of non-empty orphaned blobs sidecars.
store_ops.extend([
StoreOp::DeleteBlobs(block_root),
StoreOp::PutOrphanedBlobsKey(block_root),
]);
}
store_ops
})
.chain(
abandoned_states
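The match arms added to migrate.rs coalesce queued notifications: a finalization notification outranks a blob-prune request, and between two `PruneBlobs` notifications the one with the higher data availability boundary wins. A standalone sketch of that ordering, with a simplified enum in place of Lighthouse's actual `Notification` type:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Notification {
    Finalization,
    PruneBlobs(Option<u64>),
}

// Pick which of two queued notifications to keep, mirroring the fold in the
// diff: finalization beats blob pruning; of two prune requests, keep the one
// with the higher data availability boundary. (Note Option ordering in Rust:
// None < Some(_), so a boundary-less request always loses.)
fn coalesce(best: Notification, other: Notification) -> Notification {
    use Notification::*;
    match (best, other) {
        (Finalization, PruneBlobs(_)) => best,
        (PruneBlobs(_), Finalization) => other,
        (PruneBlobs(dab1), PruneBlobs(dab2)) => {
            if dab2 > dab1 {
                other
            } else {
                best
            }
        }
        _ => best,
    }
}

fn main() {
    use Notification::*;
    assert_eq!(
        coalesce(PruneBlobs(Some(1)), PruneBlobs(Some(5))),
        PruneBlobs(Some(5))
    );
    assert_eq!(coalesce(PruneBlobs(Some(5)), Finalization), Finalization);
    assert_eq!(coalesce(Finalization, PruneBlobs(Some(5))), Finalization);
    println!("ok");
}
```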
17 changes: 11 additions & 6 deletions beacon_node/beacon_chain/src/validator_pubkey_cache.rs
@@ -4,7 +4,7 @@ use ssz::{Decode, Encode};
use std::collections::HashMap;
use std::convert::TryInto;
use std::marker::PhantomData;
use store::{DBColumn, Error as StoreError, KeyValueStore, KeyValueStoreOp, StoreItem};
use store::{DBColumn, Error as StoreError, StoreItem, StoreOp};
use types::{BeaconState, Hash256, PublicKey, PublicKeyBytes};

/// Provides a mapping of `validator_index -> validator_publickey`.
@@ -38,7 +38,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
};

let store_ops = cache.import_new_pubkeys(state)?;
store.hot_db.do_atomically(store_ops)?;
store.do_atomically(store_ops)?;

Ok(cache)
}
@@ -79,7 +79,7 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
pub fn import_new_pubkeys(
&mut self,
state: &BeaconState<T::EthSpec>,
) -> Result<Vec<KeyValueStoreOp>, BeaconChainError> {
) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError> {
if state.validators().len() > self.pubkeys.len() {
self.import(
state.validators()[self.pubkeys.len()..]
@@ -92,7 +92,10 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
}

/// Adds zero or more validators to `self`.
fn import<I>(&mut self, validator_keys: I) -> Result<Vec<KeyValueStoreOp>, BeaconChainError>
fn import<I>(
&mut self,
validator_keys: I,
) -> Result<Vec<StoreOp<'static, T::EthSpec>>, BeaconChainError>
where
I: Iterator<Item = PublicKeyBytes> + ExactSizeIterator,
{
@@ -112,7 +115,9 @@ impl<T: BeaconChainTypes> ValidatorPubkeyCache<T> {
// It will be committed atomically when the block that introduced it is written to disk.
// Notably it is NOT written while the write lock on the cache is held.
// See: https://github.com/sigp/lighthouse/issues/2327
store_ops.push(DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)));
store_ops.push(StoreOp::KeyValueOp(
DatabasePubkey(pubkey).as_kv_store_op(DatabasePubkey::key_for_index(i)),
));

self.pubkeys.push(
(&pubkey)
@@ -294,7 +299,7 @@ mod test {
let ops = cache
.import_new_pubkeys(&state)
.expect("should import pubkeys");
store.hot_db.do_atomically(ops).unwrap();
store.do_atomically(ops).unwrap();
check_cache_get(&cache, &keypairs[..]);
drop(cache);

10 changes: 4 additions & 6 deletions beacon_node/network/src/beacon_processor/worker/rpc_methods.rs
@@ -688,12 +688,10 @@ impl<T: BeaconChainTypes> Worker<T> {
let serve_blobs_from_slot = if start_epoch < data_availability_boundary {
// Attempt to serve from the earliest block in our database, falling back to the data
// availability boundary
let oldest_blob_slot = self
.chain
.store
.get_blob_info()
.map(|blob_info| blob_info.oldest_blob_slot)
.unwrap_or(data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()));
let oldest_blob_slot =
self.chain.store.get_blob_info().oldest_blob_slot.unwrap_or(
data_availability_boundary.start_slot(T::EthSpec::slots_per_epoch()),
);

debug!(
self.log,
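The rpc_methods.rs change reads the oldest blob slot straight off the new `BlobInfo` struct, falling back to the first slot of the data availability boundary epoch when no oldest slot is recorded. Sketched with plain integers (the constant and function names are assumptions for illustration, not the real API):

```rust
// Mainnet slots per epoch, assumed here for illustration.
const SLOTS_PER_EPOCH: u64 = 32;

// Earliest slot from which blobs can be served: the oldest blob slot recorded
// in the blob info, or else the first slot of the data availability boundary
// epoch when nothing older is on disk.
fn serve_blobs_from_slot(
    oldest_blob_slot: Option<u64>,
    data_availability_boundary_epoch: u64,
) -> u64 {
    oldest_blob_slot
        .unwrap_or(data_availability_boundary_epoch * SLOTS_PER_EPOCH)
}

fn main() {
    // No blobs stored yet: fall back to the boundary epoch's first slot.
    assert_eq!(serve_blobs_from_slot(None, 3), 96);
    // Blobs on disk back to slot 70: serve from there.
    assert_eq!(serve_blobs_from_slot(Some(70), 3), 70);
    println!("ok");
}
```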
25 changes: 25 additions & 0 deletions beacon_node/src/cli.rs
@@ -551,6 +551,31 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.takes_value(true)
.default_value("true")
)
.arg(
Arg::with_name("prune-blobs")
.long("prune-blobs")
                .help("Prune blobs from Lighthouse's database when they are older than the \
                       data availability boundary relative to the current epoch.")
.takes_value(true)
.default_value("true")
)
.arg(
Arg::with_name("epochs-per-blob-prune")
.long("epochs-per-blob-prune")
.help("The epoch interval with which to prune blobs from Lighthouse's \
database when they are older than the data availability boundary \
relative to the current epoch.")
.takes_value(true)
.default_value("1")
)
.arg(
Arg::with_name("blob-prune-margin-epochs")
.long("blob-prune-margin-epochs")
.help("The margin for blob pruning in epochs. The oldest blobs are pruned \
up until data_availability_boundary - blob_prune_margin_epochs.")
.takes_value(true)
.default_value("0")
)

/*
* Misc.
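The three new flags above control whether blob pruning runs at all (`prune-blobs`), how often (`epochs-per-blob-prune`), and how much extra history to keep (`blob-prune-margin-epochs`). The sketch below is a hypothetical illustration of how such settings could interact; it is not the actual `try_prune_blobs` implementation, and all names in it are invented for the example:

```rust
struct StoreConfig {
    prune_blobs: bool,
    epochs_per_blob_prune: u64,
    blob_prune_margin_epochs: u64,
}

// Decide whether a prune should run now and, if so, up to which epoch blobs
// are deleted. `force` mirrors the bool passed to try_prune_blobs in the diff.
fn prune_plan(
    config: &StoreConfig,
    force: bool,
    last_pruned_epoch: u64,
    data_availability_boundary: u64,
) -> Option<u64> {
    if !config.prune_blobs {
        return None;
    }
    // Keep an extra margin of epochs below the data availability boundary.
    let target = data_availability_boundary
        .saturating_sub(config.blob_prune_margin_epochs);
    // Skip the prune unless enough epochs have elapsed since the last run.
    if !force && target < last_pruned_epoch + config.epochs_per_blob_prune {
        return None;
    }
    Some(target)
}

fn main() {
    let config = StoreConfig {
        prune_blobs: true,
        epochs_per_blob_prune: 2,
        blob_prune_margin_epochs: 1,
    };
    // Boundary 10, margin 1: prune up to epoch 9 once 2 epochs have passed.
    assert_eq!(prune_plan(&config, false, 7, 10), Some(9));
    // Too soon since the last prune at epoch 8.
    assert_eq!(prune_plan(&config, false, 8, 10), None);
    // `force` bypasses the interval check.
    assert_eq!(prune_plan(&config, true, 8, 10), Some(9));
    println!("ok");
}
```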
16 changes: 16 additions & 0 deletions beacon_node/src/config.rs
@@ -411,6 +411,22 @@ pub fn get_config<E: EthSpec>(
client_config.store.prune_payloads = prune_payloads;
}

if let Some(prune_blobs) = clap_utils::parse_optional(cli_args, "prune-blobs")? {
client_config.store.prune_blobs = prune_blobs;
}

if let Some(epochs_per_blob_prune) =
clap_utils::parse_optional(cli_args, "epochs-per-blob-prune")?
{
client_config.store.epochs_per_blob_prune = epochs_per_blob_prune;
}

if let Some(blob_prune_margin_epochs) =
clap_utils::parse_optional(cli_args, "blob-prune-margin-epochs")?
{
client_config.store.blob_prune_margin_epochs = blob_prune_margin_epochs;
}

/*
* Zero-ports
*