Skip to content

Commit

Permalink
Decouple blobs using data_availability_handle
Browse files Browse the repository at this point in the history
  • Loading branch information
emhane committed Feb 19, 2023
1 parent dd31621 commit 472d153
Show file tree
Hide file tree
Showing 7 changed files with 360 additions and 268 deletions.
127 changes: 103 additions & 24 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey};
use crate::beacon_proposer_cache::compute_proposer_duties_from_head;
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::blob_cache::BlobCache;
use crate::blob_verification::{AsBlock, AvailabilityPendingBlock, AvailableBlock, BlockWrapper, IntoAvailableBlock};
use crate::blob_verification::{
AsBlock, AvailabilityPendingBlock, AvailableBlock, BlockWrapper, ExecutedBlock,
ExecutedBlockAvailabiityHandle, IntoAvailablilityPendingBlock,
};
use crate::block_times_cache::BlockTimesCache;
use crate::block_verification::{
check_block_is_finalized_descendant, check_block_relevancy, get_block_root,
Expand Down Expand Up @@ -2684,7 +2687,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// Returns an `Err` if the given block was invalid, or an error was encountered during
/// verification.
pub async fn process_block<A: IntoAvailableBlock, B: IntoExecutionPendingBlock<T, A>>(
pub async fn process_block<
A: IntoAvailabilityPendingBlock,
B: IntoExecutionPendingBlock<T, A>,
>(
self: &Arc<Self>,
block_root: Hash256,
unverified_block: B,
Expand Down Expand Up @@ -2763,7 +2769,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// An error is returned if the block was unable to be imported. It may be partially imported
/// (i.e., this function is not atomic).
async fn import_execution_pending_block<B: IntoAvailableBlock>(
async fn import_execution_pending_block<B: IntoAvailablilityPendingBlock>(
self: Arc<Self>,
execution_pending_block: ExecutionPendingBlock<T, B>,
count_unrealized: CountUnrealized,
Expand Down Expand Up @@ -2817,29 +2823,102 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}

let available_block = block.into_available_block()?;

let chain = self.clone();
let block_hash = self
.spawn_blocking_handle(
move || {
chain.import_block(
available_block,
block_root,
state,
confirmed_state_roots,
payload_verification_status,
count_unrealized,
parent_block,
parent_eth1_finalization_data,
consensus_context,
let block = block.into_availability_pending_block(block_root, &self);
match block.try_into(&self) {
// If this block has blobs, at best they have all come over network while waiting on
// input from execution layer.
Ok(available_block) => {
let chain = self.clone();
let block_hash = self
.spawn_blocking_handle(
move || {
chain.import_block(
available_block,
block_root,
state,
confirmed_state_roots,
payload_verification_status,
count_unrealized,
parent_block,
parent_eth1_finalization_data,
consensus_context,
)
},
"payload_verification_handle",
)
},
"payload_verification_handle",
)
.await??;
.await??;

Ok(block_hash)
Ok(block_hash)
}
Err(BlobError::PendingAvailability) => {
// move block to a background thread, like we use migrator, to do following.

let (cache_block, blob_cache_update) = block.cache_item();
self.block_pending_availability_cache
.put(&block_root, cache_block);
let ExecutedBlockAvailabilityHandle { expected_blobs, tx } = blob_cache_update;

// atomically check if expected blobs number blobs have been received over
// network, otherwise update expected blobs cache item. Always check on
// insert into blobs cache item's blobs list if expected number of blobs have
// arrived.
let blob_cache_entry = self
.blobs_pending_availability_cache
.get(&block_root)
.write();
if *blob.blobs.len() >= expected_blobs {
tx.send(());
} else {
*blob_cahce_entry.expected_blobs = Some(expected_blobs);
*blob_cahce_entry.sender = Some(tx);
}
drop(blob_cache_entry);

/// **** When blobs arrive over network, don't call `process_block` from the background thread again, but instead use

/*
let ExecutedBlock {
block,
block_root,
state,
confirmed_state_roots,
payload_verification_status,
count_unrealized,
parent_block,
parent_eth1_finalization_data,
consensus_context,
} = cache_block.block;
let available_block = block.try_into(&self.chain)?;
let chain = self.chain.clone()
let block_hash = self
.spawn_blocking_handle(
move || {
chain.import_block(
available_block,
block_root,
state,
confirmed_state_roots,
payload_verification_status,
count_unrealized,
parent_block,
parent_eth1_finalization_data,
consensus_context,
)
},
"payload_verification_handle",
)
.await??;
Ok(block_hash)
*/

/// ****
Err(BlobError::PendingAvailability)
}
Err(e) => Err(e),
}
}

/// Accepts a fully-verified block and imports it into the chain without performing any
Expand Down
Loading

0 comments on commit 472d153

Please sign in to comment.