Skip to content

Commit

Permalink
Fix http_api to pass make lint
Browse files Browse the repository at this point in the history
  • Loading branch information
paulhauner committed Jun 28, 2022
1 parent 4fa5be6 commit ac01a7e
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 122 deletions.
13 changes: 6 additions & 7 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ use crate::{metrics, BeaconChainError};
use eth2::types::{EventKind, SseBlock, SyncDuty};
use execution_layer::{ExecutionLayer, PayloadAttributes, PayloadStatus};
use fork_choice::{
AttestationFromBlock, ForkChoice, ForkchoiceUpdateParameters, InvalidationOperation,
AttestationFromBlock, ExecutionStatus, ForkChoice, ForkchoiceUpdateParameters,
InvalidationOperation,
};
use futures::channel::mpsc::Sender;
use itertools::process_results;
Expand Down Expand Up @@ -1352,7 +1353,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
validator_indices: &[u64],
epoch: Epoch,
head_block_root: Hash256,
) -> Result<(Vec<Option<AttestationDuty>>, Hash256, bool), Error> {
) -> Result<(Vec<Option<AttestationDuty>>, Hash256, ExecutionStatus), Error> {
self.with_committee_cache(head_block_root, epoch, |committee_cache, dependent_root| {
let duties = validator_indices
.iter()
Expand All @@ -1362,15 +1363,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
.collect();

let execution_optimistic = self
let execution_status = self
.canonical_head
.read()
.fork_choice
.get_block_execution_status(&head_block_root)
.ok_or(Error::AttestationHeadNotInForkChoice(head_block_root))?
.is_optimistic();
.ok_or(Error::AttestationHeadNotInForkChoice(head_block_root))?;

Ok((duties, dependent_root, execution_optimistic))
Ok((duties, dependent_root, execution_status))
})
}

Expand Down Expand Up @@ -2803,7 +2803,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
event_handler.register(EventKind::Block(SseBlock {
slot,
block: block_root,
execution_optimistic: self.is_optimistic_block(&signed_block)?,
}));
}
}
Expand Down
31 changes: 22 additions & 9 deletions beacon_node/beacon_chain/src/beacon_proposer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
//! values it stores are very small, so this should not be an issue.

use crate::{BeaconChain, BeaconChainError, BeaconChainTypes};
use fork_choice::ExecutionStatus;
use lru::LruCache;
use smallvec::SmallVec;
use state_processing::state_advance::partial_state_advance;
use std::cmp::Ordering;
use types::{
BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned,
BeaconState, BeaconStateError, ChainSpec, CloneConfig, Epoch, EthSpec, Fork, Hash256, Slot,
Unsigned,
};

/// The number of sets of proposer indices that should be cached.
Expand Down Expand Up @@ -135,11 +137,24 @@ impl BeaconProposerCache {
pub fn compute_proposer_duties_from_head<T: BeaconChainTypes>(
current_epoch: Epoch,
chain: &BeaconChain<T>,
) -> Result<(Vec<usize>, Hash256, bool, Fork), BeaconChainError> {
// Take a copy of the head of the chain.
let head = chain.head()?;
let mut state = head.beacon_state;
let head_state_root = head.beacon_block.state_root();
) -> Result<(Vec<usize>, Hash256, ExecutionStatus, Fork), BeaconChainError> {
// Atomically collect information about the head whilst hogging the `canonical_head_lock` as
// little as possible.
let (mut state, head_state_root, execution_status) = {
let head = chain.canonical_head.read();
// Take a copy of the head state.
let head_state = head
.head_snapshot
.beacon_state
.clone_with(CloneConfig::committee_caches_only());
let head_state_root = head.head_state_root();
let head_block_root = head.head_block_root();
let execution_status = head
.fork_choice
.get_block_execution_status(&head_block_root)
.ok_or(BeaconChainError::HeadMissingFromForkChoice(head_block_root))?;
(head_state, head_state_root, execution_status)
};

// Advance the state into the requested epoch.
ensure_state_is_in_epoch(&mut state, head_state_root, current_epoch, &chain.spec)?;
Expand All @@ -153,9 +168,7 @@ pub fn compute_proposer_duties_from_head<T: BeaconChainTypes>(
.proposer_shuffling_decision_root(chain.genesis_block_root)
.map_err(BeaconChainError::from)?;

let execution_optimistic = chain.is_optimistic_head()?;

Ok((indices, dependent_root, execution_optimistic, state.fork()))
Ok((indices, dependent_root, execution_status, state.fork()))
}

/// If required, advance `state` to `target_epoch`.
Expand Down
4 changes: 0 additions & 4 deletions beacon_node/beacon_chain/src/recompute_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
current_duty_dependent_root,
previous_duty_dependent_root,
epoch_transition: is_epoch_transition,
execution_optimistic: new_head_proto_block.execution_status.is_optimistic(),
}));
}
(Err(e), _) | (_, Err(e)) => {
Expand Down Expand Up @@ -435,7 +434,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
new_head_block: new_head.beacon_block_root,
new_head_state: new_head.beacon_state_root(),
epoch: head_slot.epoch(T::EthSpec::slots_per_epoch()),
execution_optimistic: new_head_proto_block.execution_status.is_optimistic(),
}));
}
}
Expand Down Expand Up @@ -494,7 +492,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
epoch: new_view.finalized_checkpoint.epoch,
block: new_view.finalized_checkpoint.root,
state: finalized_proto_block.state_root,
execution_optimistic: self.is_optimistic_head()?,
}));
}
}
Expand Down Expand Up @@ -781,7 +778,6 @@ fn observe_head_block_delays<E: EthSpec, S: SlotClock>(
observed_delay: block_delays.observed,
imported_delay: block_delays.imported,
set_as_head_delay: block_delays.set_as_head,
execution_optimistic: head_block.execution_status.is_optimistic(),
}));
}
}
Expand Down
8 changes: 3 additions & 5 deletions beacon_node/http_api/src/attester_duties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,10 @@ fn cached_attestation_duties<T: BeaconChainTypes>(
request_indices: &[u64],
chain: &BeaconChain<T>,
) -> Result<ApiDuties, warp::reject::Rejection> {
let head = chain
.head_info()
.map_err(warp_utils::reject::beacon_chain_error)?;
let head_block_root = chain.canonical_head.read().head_block_root();

let (duties, dependent_root) = chain
.validator_attestation_duties(request_indices, request_epoch, head.block_root)
let (duties, dependent_root, _execution_status) = chain
.validator_attestation_duties(request_indices, request_epoch, head_block_root)
.map_err(warp_utils::reject::beacon_chain_error)?;

convert_to_api_response(duties, request_indices, dependent_root, chain)
Expand Down
15 changes: 3 additions & 12 deletions beacon_node/http_api/src/block_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,10 @@ impl BlockId {
chain: &BeaconChain<T>,
) -> Result<Hash256, warp::Rejection> {
match &self.0 {
CoreBlockId::Head => chain
.head_info()
.map(|head| head.block_root)
.map_err(warp_utils::reject::beacon_chain_error),
CoreBlockId::Head => Ok(chain.canonical_head.read().head_block_root()),
CoreBlockId::Genesis => Ok(chain.genesis_block_root),
CoreBlockId::Finalized => chain
.head_info()
.map(|head| head.finalized_checkpoint.root)
.map_err(warp_utils::reject::beacon_chain_error),
CoreBlockId::Justified => chain
.head_info()
.map(|head| head.current_justified_checkpoint.root)
.map_err(warp_utils::reject::beacon_chain_error),
CoreBlockId::Finalized => Ok(chain.canonical_head.read().finalized_checkpoint().root),
CoreBlockId::Justified => Ok(chain.canonical_head.read().justified_checkpoint().root),
CoreBlockId::Slot(slot) => chain
.block_root_at_slot(*slot, WhenSlotSkipped::None)
.map_err(warp_utils::reject::beacon_chain_error)
Expand Down
74 changes: 17 additions & 57 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use beacon_chain::{
observed_operations::ObservationOutcome,
validator_monitor::{get_block_delay_ms, timestamp_now},
AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
HeadSafetyStatus, ProduceBlockVerification, WhenSlotSkipped,
ProduceBlockVerification, WhenSlotSkipped,
};
use block_id::BlockId;
use eth2::types::{self as api_types, EndpointVersion, ValidatorId};
Expand Down Expand Up @@ -369,9 +369,7 @@ pub fn serve<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>| async move {
match *network_globals.sync_state.read() {
SyncState::SyncingFinalized { .. } => {
let head_slot = chain
.best_slot()
.map_err(warp_utils::reject::beacon_chain_error)?;
let head_slot = chain.canonical_head.read().head_slot();

let current_slot =
chain.slot_clock.now_or_genesis().ok_or_else(|| {
Expand Down Expand Up @@ -404,35 +402,6 @@ pub fn serve<T: BeaconChainTypes>(
)
.untuple_one();

// Create a `warp` filter that rejects requests unless the head has been verified by the
// execution layer.
let only_with_safe_head = warp::any()
.and(chain_filter.clone())
.and_then(move |chain: Arc<BeaconChain<T>>| async move {
let status = chain.head_safety_status().map_err(|e| {
warp_utils::reject::custom_server_error(format!(
"failed to read head safety status: {:?}",
e
))
})?;
match status {
HeadSafetyStatus::Safe(_) => Ok(()),
HeadSafetyStatus::Unsafe(hash) => {
Err(warp_utils::reject::custom_server_error(format!(
"optimistic head hash {:?} has not been verified by the execution layer",
hash
)))
}
HeadSafetyStatus::Invalid(hash) => {
Err(warp_utils::reject::custom_server_error(format!(
"the head block has an invalid payload {:?}, this may be unrecoverable",
hash
)))
}
}
})
.untuple_one();

// Create a `warp` filter that provides access to the logger.
let inner_ctx = ctx.clone();
let log_filter = warp::any().map(move || inner_ctx.log.clone());
Expand All @@ -451,15 +420,12 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.and_then(|chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
chain
.head_info()
.map_err(warp_utils::reject::beacon_chain_error)
.map(|head| api_types::GenesisData {
genesis_time: head.genesis_time,
genesis_validators_root: head.genesis_validators_root,
genesis_fork_version: chain.spec.genesis_fork_version,
})
.map(api_types::GenericResponse::from)
let genesis_data = api_types::GenesisData {
genesis_time: chain.genesis_time,
genesis_validators_root: chain.genesis_validators_root,
genesis_fork_version: chain.spec.genesis_fork_version,
};
Ok(api_types::GenericResponse::from(genesis_data))
})
});

Expand Down Expand Up @@ -1401,9 +1367,7 @@ pub fn serve<T: BeaconChainTypes>(
)),
)?;

chain
.import_attester_slashing(slashing)
.map_err(warp_utils::reject::beacon_chain_error)?;
chain.import_attester_slashing(slashing);
}

Ok(())
Expand Down Expand Up @@ -1744,10 +1708,7 @@ pub fn serve<T: BeaconChainTypes>(
.and_then(
|network_globals: Arc<NetworkGlobals<T::EthSpec>>, chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let head_slot = chain
.head_info()
.map(|info| info.slot)
.map_err(warp_utils::reject::beacon_chain_error)?;
let head_slot = chain.canonical_head.read().head_slot();
let current_slot = chain.slot_clock.now_or_genesis().ok_or_else(|| {
warp_utils::reject::custom_server_error("Unable to read slot clock".into())
})?;
Expand Down Expand Up @@ -2107,7 +2068,6 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(warp::query::<api_types::ValidatorAttestationDataQuery>())
.and(not_while_syncing_filter.clone())
.and(only_with_safe_head.clone())
.and(chain_filter.clone())
.and_then(
|query: api_types::ValidatorAttestationDataQuery, chain: Arc<BeaconChain<T>>| {
Expand Down Expand Up @@ -2140,7 +2100,6 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(warp::query::<api_types::ValidatorAggregateAttestationQuery>())
.and(not_while_syncing_filter.clone())
.and(only_with_safe_head.clone())
.and(chain_filter.clone())
.and_then(
|query: api_types::ValidatorAggregateAttestationQuery, chain: Arc<BeaconChain<T>>| {
Expand Down Expand Up @@ -2217,7 +2176,6 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp::path::end())
.and(warp::query::<SyncContributionData>())
.and(not_while_syncing_filter.clone())
.and(only_with_safe_head)
.and(chain_filter.clone())
.and_then(
|sync_committee_data: SyncContributionData, chain: Arc<BeaconChain<T>>| {
Expand Down Expand Up @@ -2618,7 +2576,12 @@ pub fn serve<T: BeaconChainTypes>(
.and_then(|chain: Arc<BeaconChain<T>>| {
blocking_task(move || {
Ok::<_, warp::Rejection>(warp::reply::json(&api_types::GenericResponseRef::from(
chain.fork_choice.read().proto_array().core_proto_array(),
chain
.canonical_head
.read()
.fork_choice
.proto_array()
.core_proto_array(),
)))
})
});
Expand Down Expand Up @@ -2661,9 +2624,6 @@ pub fn serve<T: BeaconChainTypes>(
.and(chain_filter.clone())
.and_then(|chain: Arc<BeaconChain<T>>| {
blocking_json_task(move || {
let head_info = chain
.head_info()
.map_err(warp_utils::reject::beacon_chain_error)?;
let current_slot_opt = chain.slot().ok();

chain
Expand All @@ -2675,7 +2635,7 @@ pub fn serve<T: BeaconChainTypes>(
)
})
.and_then(|eth1| {
eth1.sync_status(head_info.genesis_time, current_slot_opt, &chain.spec)
eth1.sync_status(chain.genesis_time, current_slot_opt, &chain.spec)
.ok_or_else(|| {
warp_utils::reject::custom_server_error(
"Unable to determine Eth1 sync status".to_string(),
Expand Down
24 changes: 15 additions & 9 deletions beacon_node/http_api/src/proposer_duties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub fn proposer_duties<T: BeaconChainTypes>(
.safe_add(1)
.map_err(warp_utils::reject::arith_error)?
{
let (proposers, dependent_root, _) =
let (proposers, dependent_root, _execution_status, _fork) =
compute_proposer_duties_from_head(request_epoch, chain)
.map_err(warp_utils::reject::beacon_chain_error)?;
convert_to_api_response(chain, request_epoch, dependent_root, proposers)
Expand Down Expand Up @@ -88,16 +88,21 @@ fn try_proposer_duties_from_cache<T: BeaconChainTypes>(
request_epoch: Epoch,
chain: &BeaconChain<T>,
) -> Result<Option<ApiDuties>, warp::reject::Rejection> {
let head = chain
.head_info()
.map_err(warp_utils::reject::beacon_chain_error)?;
let head_epoch = head.slot.epoch(T::EthSpec::slots_per_epoch());
let (head_slot, head_block_root, head_decision_root) = {
let head = chain.canonical_head.read();
(
head.head_slot(),
head.head_block_root(),
head.head_proposer_shuffling_decision_root,
)
};
let head_epoch = head_slot.epoch(T::EthSpec::slots_per_epoch());

let dependent_root = match head_epoch.cmp(&request_epoch) {
// head_epoch == request_epoch
Ordering::Equal => head.proposer_shuffling_decision_root,
Ordering::Equal => head_decision_root,
// head_epoch < request_epoch
Ordering::Less => head.block_root,
Ordering::Less => head_block_root,
// head_epoch > request_epoch
Ordering::Greater => {
return Err(warp_utils::reject::custom_server_error(format!(
Expand Down Expand Up @@ -132,8 +137,9 @@ fn compute_and_cache_proposer_duties<T: BeaconChainTypes>(
current_epoch: Epoch,
chain: &BeaconChain<T>,
) -> Result<ApiDuties, warp::reject::Rejection> {
let (indices, dependent_root, fork) = compute_proposer_duties_from_head(current_epoch, chain)
.map_err(warp_utils::reject::beacon_chain_error)?;
let (indices, dependent_root, _execution_status, fork) =
compute_proposer_duties_from_head(current_epoch, chain)
.map_err(warp_utils::reject::beacon_chain_error)?;

// Prime the proposer shuffling cache with the newly-learned value.
chain
Expand Down
Loading

0 comments on commit ac01a7e

Please sign in to comment.