Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Add execution_optimistic flag to HTTP responses #3070

Closed
wants to merge 14 commits into from
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 26 additions & 20 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1289,23 +1289,28 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
epoch: Epoch,
head_block_root: Hash256,
) -> Result<(Vec<Option<AttestationDuty>>, Hash256, ExecutionStatus), Error> {
self.with_committee_cache(head_block_root, epoch, |committee_cache, dependent_root| {
let duties = validator_indices
.iter()
.map(|validator_index| {
let validator_index = *validator_index as usize;
committee_cache.get_attestation_duties(validator_index)
})
.collect();

let execution_status = self
.canonical_head
.fork_choice_read_lock()
.get_block_execution_status(&head_block_root)
.ok_or(Error::AttestationHeadNotInForkChoice(head_block_root))?;
let execution_status = self
.canonical_head
.fork_choice_read_lock()
.get_block_execution_status(&head_block_root)
.ok_or(Error::AttestationHeadNotInForkChoice(head_block_root))?;

let (duties, dependent_root) = self.with_committee_cache(
head_block_root,
epoch,
|committee_cache, dependent_root| {
let duties = validator_indices
.iter()
.map(|validator_index| {
let validator_index = *validator_index as usize;
committee_cache.get_attestation_duties(validator_index)
})
.collect();

Ok((duties, dependent_root, execution_status))
})
Ok((duties, dependent_root))
},
)?;
Ok((duties, dependent_root, execution_status))
}

/// Returns an aggregated `Attestation`, if any, that has a matching `attestation.data`.
Expand Down Expand Up @@ -2908,6 +2913,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
event_handler.register(EventKind::Block(SseBlock {
slot,
block: block_root,
execution_optimistic: payload_verification_status.is_optimistic(),
}));
}
}
Expand Down Expand Up @@ -4055,9 +4061,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// Returns `Ok(false)` if the block is pre-Bellatrix, or has `ExecutionStatus::Valid`.
/// Returns `Ok(true)` if the block has `ExecutionStatus::Optimistic`.
pub fn is_optimistic_block(
pub fn is_optimistic_block<Payload: ExecPayload<T::EthSpec>>(
&self,
block: &SignedBeaconBlock<T::EthSpec>,
block: &SignedBeaconBlock<T::EthSpec, Payload>,
) -> Result<bool, BeaconChainError> {
// Check if the block is pre-Bellatrix.
if self.slot_is_prior_to_bellatrix(block.slot()) {
Expand All @@ -4081,9 +4087,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// There is a potential race condition when syncing where the block_root of `head_block` could
/// be pruned from the fork choice store before being read.
pub fn is_optimistic_head_block(
pub fn is_optimistic_head_block<Payload: ExecPayload<T::EthSpec>>(
&self,
head_block: &SignedBeaconBlock<T::EthSpec>,
head_block: &SignedBeaconBlock<T::EthSpec, Payload>,
) -> Result<bool, BeaconChainError> {
// Check if the block is pre-Bellatrix.
if self.slot_is_prior_to_bellatrix(head_block.slot()) {
Expand Down
24 changes: 24 additions & 0 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,23 @@ impl<T: BeaconChainTypes> CanonicalHead<T> {
.ok_or(Error::HeadMissingFromForkChoice(head_block_root))
}

/// Returns a clone of the `CachedHead` and the execution status of the contained head block.
///
/// This will only return `Err` in the scenario where `self.fork_choice` has advanced
/// significantly past the cached `head_snapshot`. In such a scenario it is likely prudent to
/// run `BeaconChain::recompute_head` to update the cached values.
pub fn head_and_execution_status(
&self,
) -> Result<(CachedHead<T::EthSpec>, ExecutionStatus), Error> {
let head = self.cached_head();
let head_block_root = head.head_block_root();
let execution_status = self
.fork_choice_read_lock()
.get_block_execution_status(&head_block_root)
.ok_or(Error::HeadMissingFromForkChoice(head_block_root))?;
Ok((head, execution_status))
}

/// Returns a clone of `self.cached_head`.
///
/// Takes a read-lock on `self.cached_head` for a short time (just long enough to clone it).
Expand Down Expand Up @@ -713,6 +730,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<(), Error> {
let old_snapshot = &old_cached_head.snapshot;
let new_snapshot = &new_cached_head.snapshot;
let new_head_is_optimistic = new_head_proto_block.execution_status.is_optimistic();

// Detect and potentially report any re-orgs.
let reorg_distance = detect_reorg(
Expand Down Expand Up @@ -798,6 +816,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
current_duty_dependent_root,
previous_duty_dependent_root,
epoch_transition: is_epoch_transition,
execution_optimistic: new_head_is_optimistic,
}));
}
(Err(e), _) | (_, Err(e)) => {
Expand Down Expand Up @@ -825,6 +844,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
new_head_block: new_snapshot.beacon_block_root,
new_head_state: new_snapshot.beacon_state_root(),
epoch: head_slot.epoch(T::EthSpec::slots_per_epoch()),
execution_optimistic: new_head_is_optimistic,
}));
}
}
Expand All @@ -841,6 +861,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
finalized_proto_block: ProtoBlock,
) -> Result<(), Error> {
let new_snapshot = &new_cached_head.snapshot;
let finalized_block_is_optimistic = finalized_proto_block.execution_status.is_optimistic();

self.op_pool
.prune_all(&new_snapshot.beacon_state, self.epoch()?);
Expand Down Expand Up @@ -884,6 +905,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// specific state root at the first slot of the finalized epoch (which
// might be a skip slot).
state: finalized_proto_block.state_root,
execution_optimistic: finalized_block_is_optimistic,
}));
}
}
Expand Down Expand Up @@ -1216,6 +1238,7 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
let block_time_set_as_head = timestamp_now();
let head_block_root = head_block.root;
let head_block_slot = head_block.slot;
let head_block_is_optimistic = head_block.execution_status.is_optimistic();

// Calculate the total delay between the start of the slot and when it was set as head.
let block_delay_total = get_slot_delay_ms(block_time_set_as_head, head_block_slot, slot_clock);
Expand Down Expand Up @@ -1308,6 +1331,7 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
observed_delay: block_delays.observed,
imported_delay: block_delays.imported,
set_as_head_delay: block_delays.set_as_head,
execution_optimistic: head_block_is_optimistic,
}));
}
}
Expand Down
8 changes: 8 additions & 0 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use state_processing::{
};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -1778,3 +1779,10 @@ where
(honest_head, faulty_head)
}
}

// Junk `Debug` impl to satistfy certain trait bounds during testing.
impl<T: BeaconChainTypes> fmt::Debug for BeaconChainHarness<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "BeaconChainHarness")
}
}
1 change: 1 addition & 0 deletions beacon_node/http_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tree_hash = "0.4.1"
sensitive_url = { path = "../../common/sensitive_url" }
logging = { path = "../../common/logging" }
serde_json = "1.0.58"
proto_array = { path = "../../consensus/proto_array" }

[[test]]
name = "bn_http_api_tests"
Expand Down
81 changes: 51 additions & 30 deletions beacon_node/http_api/src/attester_duties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,17 @@ fn cached_attestation_duties<T: BeaconChainTypes>(
) -> Result<ApiDuties, warp::reject::Rejection> {
let head_block_root = chain.canonical_head.cached_head().head_block_root();

let (duties, dependent_root, _execution_status) = chain
let (duties, dependent_root, execution_status) = chain
.validator_attestation_duties(request_indices, request_epoch, head_block_root)
.map_err(warp_utils::reject::beacon_chain_error)?;

convert_to_api_response(duties, request_indices, dependent_root, chain)
convert_to_api_response(
duties,
request_indices,
dependent_root,
execution_status.is_optimistic(),
chain,
)
}

/// Compute some attester duties by reading a `BeaconState` from disk, completely ignoring the
Expand All @@ -76,35 +82,42 @@ fn compute_historic_attester_duties<T: BeaconChainTypes>(
) -> Result<ApiDuties, warp::reject::Rejection> {
// If the head is quite old then it might still be relevant for a historical request.
//
// Use the `with_head` function to read & clone in a single call to avoid race conditions.
let state_opt = chain
.with_head(|head| {
if head.beacon_state.current_epoch() <= request_epoch {
Ok(Some((
head.beacon_state_root(),
head.beacon_state
.clone_with(CloneConfig::committee_caches_only()),
)))
} else {
Ok(None)
}
})
.map_err(warp_utils::reject::beacon_chain_error)?;

let mut state = if let Some((state_root, mut state)) = state_opt {
// If we've loaded the head state it might be from a previous epoch, ensure it's in a
// suitable epoch.
ensure_state_knows_attester_duties_for_epoch(
&mut state,
state_root,
request_epoch,
&chain.spec,
)?;
state
} else {
StateId::slot(request_epoch.start_slot(T::EthSpec::slots_per_epoch())).state(chain)?
// Avoid holding the `cached_head` longer than necessary.
let state_opt = {
let (cached_head, execution_status) = chain
.canonical_head
.head_and_execution_status()
.map_err(warp_utils::reject::beacon_chain_error)?;
let head = &cached_head.snapshot;

if head.beacon_state.current_epoch() <= request_epoch {
Some((
head.beacon_state_root(),
head.beacon_state
.clone_with(CloneConfig::committee_caches_only()),
execution_status.is_optimistic(),
))
} else {
None
}
};

let (mut state, execution_optimistic) =
if let Some((state_root, mut state, execution_optimistic)) = state_opt {
// If we've loaded the head state it might be from a previous epoch, ensure it's in a
// suitable epoch.
ensure_state_knows_attester_duties_for_epoch(
&mut state,
state_root,
request_epoch,
&chain.spec,
)?;
(state, execution_optimistic)
} else {
StateId::from_slot(request_epoch.start_slot(T::EthSpec::slots_per_epoch()))
.state(chain)?
};

// Sanity-check the state lookup.
if !(state.current_epoch() == request_epoch || state.current_epoch() + 1 == request_epoch) {
return Err(warp_utils::reject::custom_server_error(format!(
Expand Down Expand Up @@ -140,7 +153,13 @@ fn compute_historic_attester_duties<T: BeaconChainTypes>(
.collect::<Result<_, _>>()
.map_err(warp_utils::reject::beacon_chain_error)?;

convert_to_api_response(duties, request_indices, dependent_root, chain)
convert_to_api_response(
duties,
request_indices,
dependent_root,
execution_optimistic,
chain,
)
}

fn ensure_state_knows_attester_duties_for_epoch<E: EthSpec>(
Expand Down Expand Up @@ -178,6 +197,7 @@ fn convert_to_api_response<T: BeaconChainTypes>(
duties: Vec<Option<AttestationDuty>>,
indices: &[u64],
dependent_root: Hash256,
execution_optimistic: bool,
chain: &BeaconChain<T>,
) -> Result<ApiDuties, warp::reject::Rejection> {
// Protect against an inconsistent slot clock.
Expand Down Expand Up @@ -213,6 +233,7 @@ fn convert_to_api_response<T: BeaconChainTypes>(

Ok(api_types::DutiesResponse {
dependent_root,
execution_optimistic: Some(execution_optimistic),
data,
})
}
Loading