Skip to content

Commit

Permalink
Clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
emhane committed Feb 21, 2023
1 parent 1e108da commit cc152b0
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 206 deletions.
47 changes: 16 additions & 31 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_cache::BlobCache;
use crate::blob_verification::{
verify_blobs, AsBlock, AvailabilityPendingBlock, AvailableBlock, BlockWrapper, ExecutedBlock,
ExecutedBlockAvailabiityHandle, IntoAvailablilityPendingBlock, VerifiedBlobSidecars,
};
use crate::blob_verification::{AsBlock, AvailableBlock, BlobError, BlockWrapper, ExecutedBlock};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::{
check_block_is_finalized_descendant, check_block_relevancy, get_block_root,
Expand Down Expand Up @@ -73,9 +70,10 @@ use fork_choice::{
AttestationFromBlock, ExecutionStatus, ForkChoice, ForkchoiceUpdateParameters,
InvalidationOperation, PayloadVerificationStatus, ResetPayloadStatuses,
};
use futures::prelude::stream::futures_unordered::FuturesUnordered;
use futures::{
channel::mpsc::{Receiver, Sender},
prelude::stream::futures_unordered::FuturesUnordered,
Stream,
};
use itertools::process_results;
use itertools::Itertools;
Expand All @@ -87,7 +85,6 @@ use safe_arith::SafeArith;
use slasher::Slasher;
use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use smallvec::SmallVec;
use ssz::Encode;
use state_processing::{
common::get_attesting_indices_from_state,
Expand All @@ -105,10 +102,11 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::io::prelude::*;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
use store::signed_beacon_block::SignedBeaconBlockRef;
use store::{
DatabaseBlock, Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
};
Expand Down Expand Up @@ -441,7 +439,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub validator_monitor: RwLock<ValidatorMonitor<T::EthSpec>>,
pub blob_cache: BlobCache<T::EthSpec>,
pub kzg: Option<Arc<kzg::Kzg>>,
pub pending_availability_cache_tx: Sender<ExecuteBlock<T::EthSpec>>,
pub pending_availability_cache_tx: Sender<ExecutedBlock<T::EthSpec>>,
/// Borrow sender to send a blob that arrived over network.
pub pending_blobs_tx: LruCache<Hash256, Sender<Arc<SignedBlobSidecar<T::EthSpec>>>>,
/// Remove sender to include in availability-pending block when block arrives over network.
Expand All @@ -452,7 +450,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub struct PendingAvailabilityCache<T: EthSpec>(FuturesUnordered<ExecutedBlock<T>>);

impl<T: EthSpec> Stream for PendingAvailabilityCache<T> {
type Item = Result<ExecutedBlock<T>, BlobError>;
type Item = Result<ExecutedBlock<T>, BlobError<T>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.0.is_empty() {
return Poll::Pending;
Expand All @@ -466,7 +464,7 @@ impl<T: EthSpec> Stream for PendingAvailabilityCache<T> {
}

impl<T: EthSpec> PendingAvailabilityCache<T> {
fn push(&mut self, block: ExecutedBlock<T>) {
pub fn push(&mut self, block: ExecutedBlock<T>) {
self.0.push(block)
}
}
Expand Down Expand Up @@ -2740,30 +2738,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Increment the Prometheus counter for block processing requests.
metrics::inc_counter(&metrics::BLOCK_PROCESSING_REQUESTS);

// If a blob receiver exists for the block root, some blobs have already arrived.
let existing_rx = self.pending_blocks_rx.remove(&block_root);
let rx = match existing_rx {
Some(rx) => rx,
None => {
let (tx, rx) = oneshot::channel::<Arc<SignedBlobSidecar<T::EthSpec>>>();
self.pending_blobs_tx.put(block_root, tx);
rx
}
};
let pending_availability_block =
block.into_availability_pending_block(block_root, rx, &self);
let block: impl TryInto<AvailableBlock<T>> + IntoExecutionPendingBlock<T, B> =
match pending_availability_block.try_into() {
Ok(available_block) => available_block,
Err(BlobError::PendingAvailability) => pending_availability_block,
Err(e) => return Err(e),
};
unverified_block.into_availability_pending_block(block_root, &self);

// A small closure to group the verification and import errors.
let chain = self.clone();
let import_block = async move {
let execution_pending =
block.into_execution_pending_block(block_root, &chain, notify_execution_layer)?;
let execution_pending = pending_availability_block.into_execution_pending_block(
block_root,
&chain,
notify_execution_layer,
)?;
chain
.import_execution_pending_block(execution_pending, count_unrealized)
.await
Expand Down Expand Up @@ -2922,7 +2907,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
executed_block: ExecutedBlock<AvailableBlock<T::EthSpec>>,
) -> Result<Hash256, BlockError<T::EthSpec>> {
ExecutedBlock {
let ExecutedBlock {
block_root,
block,
state,
Expand All @@ -2934,7 +2919,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
consensus_context,
} = executed_block;
self.import_block(
block,
block.try_into()?,
block_root,
state,
confirmed_state_roots,
Expand Down
Loading

0 comments on commit cc152b0

Please sign in to comment.