Rework the validator scan-for-incorrect-shred-version function (#2851)
This scan is important for ensuring that invalidated blocks are
cleared from the Blockstore around cluster outage/restart situations.
The scan previously ran only when --wait-for-supermajority was set in
addition to --expected-shred-version. After a cluster restart, nodes
could rejoin by downloading a new snapshot while still having invalid
state in their blockstore that disrupted normal operation.

The key modifications from this change are:
- The blockstore is always examined, regardless of the
  --wait-for-supermajority arg
- The blockstore bounds are compared to the hard forks to evaluate
  whether the blockstore should be scanned for invalid shreds (see the
  sketch below)
- The blockstore scan is done with the computed shred version so that
  an operator setting a bad --expected-shred-version cannot wipe out
  their ledger
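
A condensed, standalone sketch of the new decision logic. The helper name
and flattened inputs here are hypothetical, purely for illustration; the
real function, should_cleanup_blockstore_incorrect_shred_versions in the
diff below, takes the validator config, blockstore, and HardForks directly.

    type Slot = u64;

    /// Hypothetical condensed form of the decision logic: returns the slot
    /// at which the blockstore scan should start, or None to skip the scan.
    fn should_scan(
        restarting_with_hard_fork: bool, // wait_for_supermajority matches the root slot
        root_slot: Slot,
        latest_hard_fork: Option<Slot>,
        blockstore_bounds: Option<(Slot, Slot)>, // (lowest, highest); None if empty
    ) -> Option<Slot> {
        if restarting_with_hard_fork {
            // Booting as part of a cluster restart: always scan past the restart slot
            return Some(root_slot + 1);
        }
        // With no hard forks, the shred version cannot have changed
        let hard_fork = latest_hard_fork?;
        // An empty blockstore has nothing to scan
        let (min_slot, max_slot) = blockstore_bounds?;
        // Scan only when the blockstore straddles the fork: it then holds slots
        // past the fork that may have been created with the old shred version
        (min_slot <= hard_fork && hard_fork < max_slot).then(|| hard_fork + 1)
    }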
steviez committed Sep 13, 2024
1 parent 5155158 commit 7c82cfd
Showing 1 changed file with 232 additions and 32 deletions.

core/src/validator.rs
@@ -113,6 +113,7 @@ use {
         epoch_schedule::MAX_LEADER_SCHEDULE_EPOCH_OFFSET,
         exit::Exit,
         genesis_config::{ClusterType, GenesisConfig},
+        hard_forks::HardForks,
         hash::Hash,
         pubkey::Pubkey,
         shred_version::compute_shred_version,
@@ -596,25 +597,8 @@ impl Validator {
             ));
         }
         let genesis_config = load_genesis(config, ledger_path)?;
 
         metrics_config_sanity_check(genesis_config.cluster_type)?;
-
-        if let Some(expected_shred_version) = config.expected_shred_version {
-            if let Some(wait_for_supermajority_slot) = config.wait_for_supermajority {
-                *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
-                backup_and_clear_blockstore(
-                    ledger_path,
-                    config,
-                    wait_for_supermajority_slot + 1,
-                    expected_shred_version,
-                )
-                .context(
-                    "Failed to backup and clear shreds with incorrect shred version from \
-                     blockstore",
-                )?;
-            }
-        }
-
         info!("Cleaning accounts paths..");
         *start_progress.write().unwrap() = ValidatorStartProgress::CleaningAccounts;
         let mut timer = Measure::start("clean_accounts_paths");
@@ -741,7 +725,10 @@ impl Validator {
             check_poh_speed(&bank_forks.read().unwrap().root_bank(), None)?;
         }
 
-        let hard_forks = bank_forks.read().unwrap().root_bank().hard_forks();
+        let (root_slot, hard_forks) = {
+            let root_bank = bank_forks.read().unwrap().root_bank();
+            (root_bank.slot(), root_bank.hard_forks())
+        };
         let shred_version = compute_shred_version(&genesis_config.hash(), Some(&hard_forks));
         info!(
             "shred version: {shred_version}, hard forks: {:?}",
@@ -758,6 +745,23 @@ impl Validator {
             }
         }
 
+        if let Some(start_slot) = should_cleanup_blockstore_incorrect_shred_versions(
+            config,
+            &blockstore,
+            root_slot,
+            &hard_forks,
+        )? {
+            *start_progress.write().unwrap() = ValidatorStartProgress::CleaningBlockStore;
+            cleanup_blockstore_incorrect_shred_versions(
+                &blockstore,
+                config,
+                start_slot,
+                shred_version,
+            )?;
+        } else {
+            info!("Skipping the blockstore check for shreds with incorrect version");
+        }
+
         node.info.set_shred_version(shred_version);
         node.info.set_wallclock(timestamp());
         Self::print_node_info(&node);
@@ -2181,9 +2185,71 @@ fn maybe_warp_slot(
     Ok(())
 }
 
+/// Returns the starting slot at which the blockstore should be scanned for
+/// shreds with an incorrect shred version, or None if the check is unnecessary
+fn should_cleanup_blockstore_incorrect_shred_versions(
+    config: &ValidatorConfig,
+    blockstore: &Blockstore,
+    root_slot: Slot,
+    hard_forks: &HardForks,
+) -> Result<Option<Slot>, BlockstoreError> {
+    // Perform the check if we are booting as part of a cluster restart at slot root_slot
+    let maybe_cluster_restart_slot = maybe_cluster_restart_with_hard_fork(config, root_slot);
+    if maybe_cluster_restart_slot.is_some() {
+        return Ok(Some(root_slot + 1));
+    }
+
+    // If there are no hard forks, the shred version cannot have changed
+    let Some(latest_hard_fork) = hard_forks.iter().last().map(|(slot, _)| *slot) else {
+        return Ok(None);
+    };
+
+    // If the blockstore is empty, there are certainly no shreds with an incorrect version
+    let Some(blockstore_max_slot) = blockstore.highest_slot()? else {
+        return Ok(None);
+    };
+    let blockstore_min_slot = blockstore.lowest_slot();
+    info!(
+        "Blockstore contains data from slot {blockstore_min_slot} to {blockstore_max_slot}, the \
+         latest hard fork is {latest_hard_fork}"
+    );
+
+    if latest_hard_fork < blockstore_min_slot {
+        // latest_hard_fork < blockstore_min_slot <= blockstore_max_slot
+        //
+        // All slots in the blockstore are newer than the latest hard fork, and only shreds with
+        // the correct shred version should have been inserted since the latest hard fork
+        //
+        // This is the normal case where the last cluster restart & hard fork was a while ago; we
+        // can skip the check for this case
+        Ok(None)
+    } else if latest_hard_fork < blockstore_max_slot {
+        // blockstore_min_slot <= latest_hard_fork < blockstore_max_slot
+        //
+        // This could be a case where there was a cluster restart, but this node was not part of
+        // the supermajority that actually restarted the cluster. Rather, this node likely
+        // downloaded a new snapshot while retaining the blockstore, including slots beyond the
+        // chosen restart slot. We need to perform the blockstore check for this case
+        //
+        // Note that the downloaded snapshot slot (root_slot) could be greater than the latest hard
+        // fork slot. Even though this node will only replay slots after root_slot, start the check
+        // at latest_hard_fork + 1 to check (and possibly purge) any invalid state.
+        Ok(Some(latest_hard_fork + 1))
+    } else {
+        // blockstore_min_slot <= blockstore_max_slot <= latest_hard_fork
+        //
+        // All slots in the blockstore are older than the latest hard fork. The blockstore check
+        // would start from latest_hard_fork + 1; skip the check as there are no slots to check
+        //
+        // This is an unusual case to hit; perhaps the node was offline for a long time and was
+        // just restarted with a newly downloaded snapshot but its old blockstore
+        Ok(None)
+    }
+}
+
 /// Searches the blockstore for data shreds with a shred version that differs
 /// from the passed `expected_shred_version`
-fn blockstore_contains_incorrect_shred_version(
+fn scan_blockstore_for_incorrect_shred_version(
     blockstore: &Blockstore,
     start_slot: Slot,
     expected_shred_version: u16,
@@ -2193,7 +2259,7 @@ fn blockstore_contains_incorrect_shred_version(
     // Search for shreds with incompatible version in blockstore
     let slot_meta_iterator = blockstore.slot_meta_iterator(start_slot)?;
 
-    info!("Searching blockstore for shred with incorrect version..");
+    info!("Searching blockstore for shred with incorrect version from slot {start_slot}");
     for (slot, _meta) in slot_meta_iterator {
         let shreds = blockstore.get_data_shreds_for_slot(slot, 0)?;
         for shred in &shreds {
@@ -2211,16 +2277,14 @@
 
 /// If the blockstore contains any shreds with the incorrect shred version,
 /// copy them to a backup blockstore and purge them from the actual blockstore.
-fn backup_and_clear_blockstore(
-    ledger_path: &Path,
+fn cleanup_blockstore_incorrect_shred_versions(
+    blockstore: &Blockstore,
     config: &ValidatorConfig,
     start_slot: Slot,
     expected_shred_version: u16,
 ) -> Result<(), BlockstoreError> {
-    let blockstore =
-        Blockstore::open_with_options(ledger_path, blockstore_options_from_config(config))?;
-    let incorrect_shred_version = blockstore_contains_incorrect_shred_version(
-        &blockstore,
+    let incorrect_shred_version = scan_blockstore_for_incorrect_shred_version(
+        blockstore,
         start_slot,
         expected_shred_version,
     )?;
@@ -2245,7 +2309,7 @@
         end_slot
     );
     match Blockstore::open_with_options(
-        &ledger_path.join(backup_folder),
+        &blockstore.ledger_path().join(backup_folder),
         blockstore_options_from_config(config),
     ) {
         Ok(backup_blockstore) => {
@@ -2341,6 +2405,9 @@ pub enum ValidatorError {
     #[error("Bad expected bank hash")]
     BadExpectedBankHash,
 
+    #[error("blockstore error: {0}")]
+    Blockstore(#[source] BlockstoreError),
+
     #[error("genesis hash mismatch: actual={0}, expected={1}")]
     GenesisHashMismatch(Hash, Hash),
 
@@ -2661,7 +2728,143 @@ mod tests {
     }
 
     #[test]
-    fn test_backup_and_clear_blockstore() {
+    fn test_should_cleanup_blockstore_incorrect_shred_versions() {
+        solana_logger::setup();
+
+        let ledger_path = get_tmp_ledger_path_auto_delete!();
+        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+
+        let mut validator_config = ValidatorConfig::default_for_test();
+        let mut hard_forks = HardForks::default();
+        let mut root_slot;
+
+        // Do check from root_slot + 1 if wait_for_supermajority (10) == root_slot (10)
+        root_slot = 10;
+        validator_config.wait_for_supermajority = Some(root_slot);
+        assert_eq!(
+            should_cleanup_blockstore_incorrect_shred_versions(
+                &validator_config,
+                &blockstore,
+                root_slot,
+                &hard_forks
+            )
+            .unwrap(),
+            Some(root_slot + 1)
+        );
+
+        // No check if wait_for_supermajority (10) < root_slot (15) (no hard forks)
+        // Arguably operator error to pass a value for wait_for_supermajority in this case
+        root_slot = 15;
+        assert_eq!(
+            should_cleanup_blockstore_incorrect_shred_versions(
+                &validator_config,
+                &blockstore,
+                root_slot,
+                &hard_forks
+            )
+            .unwrap(),
+            None,
+        );
+
+        // Emulate cluster restart at slot 10
+        // No check if wait_for_supermajority (10) < root_slot (15) (empty blockstore)
+        hard_forks.register(10);
+        assert_eq!(
+            should_cleanup_blockstore_incorrect_shred_versions(
+                &validator_config,
+                &blockstore,
+                root_slot,
+                &hard_forks
+            )
+            .unwrap(),
+            None,
+        );
+
+        // Insert some shreds at newer slots than hard fork
+        let entries = entry::create_ticks(1, 0, Hash::default());
+        for i in 20..35 {
+            let shreds = blockstore::entries_to_test_shreds(
+                &entries,
+                i,     // slot
+                i - 1, // parent_slot
+                true,  // is_full_slot
+                1,     // version
+                true,  // merkle_variant
+            );
+            blockstore.insert_shreds(shreds, None, true).unwrap();
+        }
+
+        // No check as all blockstore data is newer than latest hard fork
+        assert_eq!(
+            should_cleanup_blockstore_incorrect_shred_versions(
+                &validator_config,
+                &blockstore,
+                root_slot,
+                &hard_forks
+            )
+            .unwrap(),
+            None,
+        );
+
+        // Emulate cluster restart at slot 25
+        // Do check from root_slot + 1 regardless of whether wait_for_supermajority set correctly
+        root_slot = 25;
+        hard_forks.register(root_slot);
+        validator_config.wait_for_supermajority = Some(root_slot);
+        assert_eq!(
+            should_cleanup_blockstore_incorrect_shred_versions(
+                &validator_config,
+                &blockstore,
+                root_slot,
+                &hard_forks
+            )
+            .unwrap(),
+            Some(root_slot + 1),
+        );
+        validator_config.wait_for_supermajority = None;
+        assert_eq!(
+            should_cleanup_blockstore_incorrect_shred_versions(
+                &validator_config,
+                &blockstore,
+                root_slot,
+                &hard_forks
+            )
+            .unwrap(),
+            Some(root_slot + 1),
+        );
+
+        // Do check with advanced root slot, even without wait_for_supermajority set correctly
+        // Check starts from latest hard fork + 1
+        root_slot = 30;
+        let latest_hard_fork = hard_forks.iter().last().unwrap().0;
+        assert_eq!(
+            should_cleanup_blockstore_incorrect_shred_versions(
+                &validator_config,
+                &blockstore,
+                root_slot,
+                &hard_forks
+            )
+            .unwrap(),
+            Some(latest_hard_fork + 1),
+        );
+
+        // Purge blockstore up to latest hard fork
+        // No check since all blockstore data newer than latest hard fork
+        blockstore.purge_slots(0, latest_hard_fork, PurgeType::Exact);
+        assert_eq!(
+            should_cleanup_blockstore_incorrect_shred_versions(
+                &validator_config,
+                &blockstore,
+                root_slot,
+                &hard_forks
+            )
+            .unwrap(),
+            None,
+        );
+    }
+
+    #[test]
+    fn test_cleanup_blockstore_incorrect_shred_versions() {
         solana_logger::setup();
 
         let validator_config = ValidatorConfig::default_for_test();
@@ -2680,12 +2883,9 @@ mod tests {
             );
             blockstore.insert_shreds(shreds, None, true).unwrap();
         }
-        drop(blockstore);
 
         // this purges and compacts all slots greater than or equal to 5
-        backup_and_clear_blockstore(ledger_path.path(), &validator_config, 5, 2).unwrap();
-
-        let blockstore = Blockstore::open(ledger_path.path()).unwrap();
+        cleanup_blockstore_incorrect_shred_versions(&blockstore, &validator_config, 5, 2).unwrap();
         // assert that slots less than 5 aren't affected
         assert!(blockstore.meta(4).unwrap().unwrap().next_slots.is_empty());
         for i in 5..10 {
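
For reference, the shred version the scan compares against is derived from
the genesis hash plus every registered hard fork, which is why a cluster
restart (which registers a hard fork) makes shreds produced under the old
version detectable. A minimal usage sketch of the APIs that appear in the
diff, assuming they are the solana_sdk re-exports the import block suggests;
Hash::default() stands in for a real cluster's genesis hash:

    use solana_sdk::{hard_forks::HardForks, hash::Hash, shred_version::compute_shred_version};

    fn main() {
        let genesis_hash = Hash::default(); // placeholder genesis hash
        let mut hard_forks = HardForks::default();
        let before = compute_shred_version(&genesis_hash, Some(&hard_forks));

        // A cluster restart registers a hard fork at the agreed-upon restart slot
        hard_forks.register(25);
        let after = compute_shred_version(&genesis_hash, Some(&hard_forks));

        // The versions almost certainly differ (a u16 collision is possible but
        // unlikely), so shreds written before the fork carry a stale version
        println!("shred version before fork: {before}, after fork: {after}");
    }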
