Skip to content

Commit

Permalink
fixup! Boil down TryIntoAvailableBlock and IntoAvailabilityPendingBlo…
Browse files Browse the repository at this point in the history
…ck to one trait
  • Loading branch information
emhane committed Feb 27, 2023
1 parent 93db7ed commit 70bf525
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 60 deletions.
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2778,7 +2778,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Verify and import the block.
match import_block.await {
// The block was successfully verified and imported. Yay.
Ok(block_root) => {
Ok(Some(block_root)) => {
trace!(
self.log,
"Beacon block imported";
Expand Down Expand Up @@ -2819,6 +2819,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
Err(other)
}
Ok(None) => Err(BlockError::BlockMovedToAvailabilityPendingCache),
}
}

Expand Down
118 changes: 67 additions & 51 deletions beacon_node/beacon_chain/src/blob_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use futures::{
oneshot::Canceled,
},
future::Future,
StreamExt,
FutureExt, StreamExt,
};
use kzg::Kzg;
use slog::error;
Expand Down Expand Up @@ -238,9 +238,10 @@ fn verify_data_availability<T: EthSpec>(
return Err(BlobError::TransactionCommitmentMismatch);
}

//todo(emhane)
// Validatate that the kzg proof is valid against the commitments and blobs
if !kzg_utils::validate_blob_sidecars(
**kzg,
/* if !kzg_utils::validate_blob_sidecars(
*kzg,
block_slot,
block_root,
kzg_commitments,
Expand All @@ -251,7 +252,7 @@ fn verify_data_availability<T: EthSpec>(
.into(),
)? {
return Err(BlobError::InvalidKzgProof);
}
}*/
Ok(())
}

Expand Down Expand Up @@ -553,13 +554,16 @@ impl<T: BeaconChainTypes> TryIntoAvailableBlock<T> for AvailabilityPendingBlock<
JoinHandle<Option<Result<AvailableBlock<T::EthSpec>, BlockError<T::EthSpec>>>>,
BlockError<T::EthSpec>,
> {
if !self.data_availability_handle.is_finished() {
Err(BlobError::PendingAvailability(self))?
let finished = self.data_availability_handle.is_finished();
if finished {
return Err(BlockError::BlobValidation(BlobError::PendingAvailability(
self,
)));
}
let block_root = <AvailabilityPendingBlock<<T as BeaconChainTypes>::EthSpec> as AsSignedBlock<T>>::block_root(&self);
let data_availability_handle = self.data_availability_handle;
let chain_cloned = chain.clone();
let availability_result_handle = chain.task_executor.spawn_handle(async move {
match data_availability_handle.await {
match self.data_availability_handle.await {
Ok(Some(Ok(available_block))) => Ok(available_block),
Err(_) | Ok(None) => Err(
DataAvailabilityFailure::Block(
Expand All @@ -570,24 +574,24 @@ impl<T: BeaconChainTypes> TryIntoAvailableBlock<T> for AvailabilityPendingBlock<
),
),
)?,
Ok(Some(Err(DataAvailabilityFailure::Block(block, blobs, e)))) => {
let channels = chain.pending_blocks_tx_rx.write();
Ok(Some(Err(DataAvailabilityFailure::Block(block, mut blobs, e)))) => {
let mut channels = chain_cloned.pending_blocks_tx_rx.write();
let block_root =
match block {
Some(block) => <Arc<
Some(ref block) => <Arc<
SignedBeaconBlock<<T as BeaconChainTypes>::EthSpec>,
> as AsSignedBlock<T>>::block_root(
&block
),
None => match blobs.get(0) {
Some(blob) => blob.beacon_block_root(),
None => {
Err(DataAvailabilityFailure::Block(block, blobs, e))?
return Err(BlockError::DataAvailability(DataAvailabilityFailure::Block(block, blobs, e)))
}
},
};
match channels.remove(&block_root) {
Some((_, Some(rx))) => {
Some((_, Some(mut rx))) => {
loop {
match rx.try_next() {
Ok(Some((blob, _))) => {
Expand All @@ -596,7 +600,7 @@ impl<T: BeaconChainTypes> TryIntoAvailableBlock<T> for AvailabilityPendingBlock<
Ok(None) => {}
Err(e) => {
error!(
chain.log, "Error while adding blobs to Data Availability Failure";
chain_cloned.log, "Error while adding blobs to Data Availability Failure";
"block_root" => %block_root,
"error" => %e
);
Expand All @@ -610,11 +614,11 @@ impl<T: BeaconChainTypes> TryIntoAvailableBlock<T> for AvailabilityPendingBlock<
drop(channels);
Err(DataAvailabilityFailure::Block(block, blobs, e))?
}
Ok(Some(Err(DataAvailabilityFailure::ExecutedBlock(block, blobs, e)))) => {
let channels = chain.pending_blocks_tx_rx.write();
Ok(Some(Err(DataAvailabilityFailure::ExecutedBlock(block, mut blobs, e)))) => {
let mut channels = chain_cloned.pending_blocks_tx_rx.write();
let block_root = <ExecutedBlockInError<<T as BeaconChainTypes>::EthSpec> as AsSignedBlock<T>>::block_root(&block);
match channels.remove(&block_root) {
Some((_, Some(rx))) => {
Some((_, Some(mut rx))) => {
loop {
match rx.try_next() {
Ok(Some((blob, _))) => {
Expand All @@ -623,7 +627,7 @@ impl<T: BeaconChainTypes> TryIntoAvailableBlock<T> for AvailabilityPendingBlock<
Ok(None) => {}
Err(e) => {
error!(
chain.log, "Error while adding blobs to Data Availability Failure";
chain_cloned.log, "Error while adding blobs to Data Availability Failure";
"block_root" => %block_root,
"error" => %e
);
Expand All @@ -638,7 +642,7 @@ impl<T: BeaconChainTypes> TryIntoAvailableBlock<T> for AvailabilityPendingBlock<
Err(DataAvailabilityFailure::ExecutedBlock(block, blobs, e))?
}
}
}, &format!("try_into_available_block_{block_root}"));
}, "try_into_available_block");
match availability_result_handle {
Some(handle) => Ok(handle),
None => Err(BeaconChainError::RuntimeShutdown)?,
Expand Down Expand Up @@ -673,8 +677,11 @@ pub trait TryIntoAvailableBlock<T: BeaconChainTypes>:
+ Debug
+ Into<SomeAvailabilityBlock<T::EthSpec>>
{
/// Tries to make the block available. Block must be available before importing to fork
/// choice, but musn't before.
/// Consumes self and returns an [`AvailableBlock`] on success or an
/// [`AvailabilityPendingBlock`] wrapped in the [`BlobError::PendingAvailability`] error
/// variant. On errror the parts that have been gathered so far by the
/// `data_availability_handle` are returned wrapped in the [`DataAvailabilityFailure::Block`]
/// error variant. Block must be available before importing to fork choice, but musn't before.
fn try_into_available_block(
self,
chain: &Arc<BeaconChain<T>>,
Expand All @@ -688,14 +695,11 @@ pub trait TryIntoAvailableBlock<T: BeaconChainTypes>:
))?
}

/// Consumes a block and wraps it in an [`AvailabilityPendingBlock`] with a
/// Consumes selfs and wraps the block in an [`AvailabilityPendingBlock`] with a
/// [`DataAvailabilityHandle`] to receive blobs on from the network and kzg-verify them.
/// Calling `try_into` on an [`AvailabilityPendingBlock`] returns an [`AvailableBlock`] on
/// success, and on failure returns the parts that have been gathered so far wrapped
/// in a [`DataAvailabilityFailure::Block`] error variant. Use the blobs param to start
/// the `data_availability_handle` again with any blobs returned in the previous error, for
/// example after time out waiting for blobs from gossip and this time tell an rpc worker to
/// send the missing blobs on the handle's blob channel.
/// Use the blobs param to start the `data_availability_handle` again with any blobs returned
/// in the previous error, for example after time out waiting for blobs from gossip and this
/// time tell an rpc worker to send the missing blobs on the handle's blob channel.
fn into_availability_pending_block(
self,
block_root: Hash256,
Expand All @@ -719,13 +723,18 @@ pub trait TryIntoAvailableBlock<T: BeaconChainTypes>:
let data_availability_handle = if self.slot().epoch(T::EthSpec::slots_per_epoch())
>= data_availability_boundary
{
let kzg_commitments = self.message().body().blob_kzg_commitments().map_err(|_| {
DataAvailabilityFailure::Block(
Some(block.clone()),
blobs.clone(),
BlobError::KzgCommitmentMissing,
)
})?;
let kzg_commitments = self
.message()
.body()
.blob_kzg_commitments()
.map_err(|_| {
DataAvailabilityFailure::Block(
Some(block.clone()),
blobs.clone(),
BlobError::KzgCommitmentMissing,
)
})?
.clone();
if kzg_commitments.is_empty() {
// check that txns match with empty kzg-commitments
if let Err(e) =
Expand Down Expand Up @@ -765,7 +774,7 @@ pub trait TryIntoAvailableBlock<T: BeaconChainTypes>:
channels.insert(block_root, (tx, None));
drop(channels);
let chain_cloned = chain.clone();
let data_availability_handle = chain.task_executor
let data_availability_handle = chain.clone().task_executor
.spawn_handle(
async move {
tokio::pin!(time_out);
Expand Down Expand Up @@ -807,7 +816,7 @@ pub trait TryIntoAvailableBlock<T: BeaconChainTypes>:
}
}
}
_ = time_out => {
_ = &mut time_out => {
return Err(DataAvailabilityFailure::Block(
Some(block),
blobs,
Expand All @@ -816,19 +825,28 @@ pub trait TryIntoAvailableBlock<T: BeaconChainTypes>:
}
}
}
let kzg = chain_cloned.kzg;
let kzg = chain_cloned.kzg.clone();
let block_cloned = block.clone();
let blobs_cloned = blobs.clone();
let kzg_handle = chain_cloned.task_executor.spawn_blocking_handle(
move || {
verify_blobs::<T::EthSpec>(&*block, blobs, &kzg).map_err(|e| {
DataAvailabilityFailure::Block(Some(block), blobs, e)
})
},
&format!("verify_blobs_{block_root}"),
verify_blobs::<T::EthSpec>(&*block_cloned, blobs_cloned, &kzg)
},
"kzg_verification",
);
match kzg_handle {
Some(handle) => {
if handle.await.is_err() {
return Err(DataAvailabilityFailure::Block(
match handle.await {
Ok(res) => {
if let Err(e) = res {
return Err(DataAvailabilityFailure::Block(
Some(block),
blobs,
e,
))
}
}
Err(_) => return Err(DataAvailabilityFailure::Block(
Some(block),
blobs,
BlobError::BeaconChainError(BeaconChainError::RuntimeShutdown),
Expand All @@ -845,14 +863,14 @@ pub trait TryIntoAvailableBlock<T: BeaconChainTypes>:
block, blobs,
)))
},
&format!("data_availability_block_{block_root}"),
"availability_pending_block"
);

match data_availability_handle {
Some(data_availability_handle) => data_availability_handle,
None => {
return Err(DataAvailabilityFailure::Block(
Some(block),
Some(self.block_cloned()),
VariableList::empty(),
BlobError::BeaconChainError(BeaconChainError::RuntimeShutdown),
))
Expand All @@ -875,7 +893,7 @@ pub trait TryIntoAvailableBlock<T: BeaconChainTypes>:
}
};
Ok(AvailabilityPendingBlock {
block,
block: self.block_cloned(),
data_availability_handle,
})
}
Expand Down Expand Up @@ -918,9 +936,7 @@ impl<T: BeaconChainTypes> Future for ExecutedBlock<T, AvailabilityPendingBlock<T
type Output =
Result<ExecutedBlock<T, AvailableBlock<T::EthSpec>>, DataAvailabilityFailure<T::EthSpec>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let data_availability_handle = self.block.data_availability_handle;
tokio::pin!(data_availability_handle);
match (&mut data_availability_handle).poll(cx) {
match self.block.data_availability_handle.poll_unpin(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(Some(Ok(available_block)))) => Poll::Ready(Ok(ExecutedBlock {
block_root: self.block_root,
Expand Down
28 changes: 20 additions & 8 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ use slog::{debug, error, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use ssz_types::VariableList;
use state_processing::consensus_context;
use state_processing::per_block_processing::{errors::IntoWithIndex, is_merge_transition_block};
use state_processing::{
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
Expand Down Expand Up @@ -295,6 +296,10 @@ pub enum BlockError<T: EthSpec> {
/// Making a block available failed. The [`DataAvailabilityFailure`] error contains the block
/// or blobs that have already been received over the network.
DataAvailability(DataAvailabilityFailure<T>),
/// Block is still not yet available after payload verification and is moved as an
/// [`crate::blob_verification::ExecutedBlock`] to the
/// [`crate::beacon_chain::AvailabilityPendingCache`] where it waits till it's available.
BlockMovedToAvailabilityPendingCache,
}

impl_wrap_type_in_variant!(<T: EthSpec,>, BlobError<T>, BlockError<T>, Self::BlobValidation);
Expand Down Expand Up @@ -597,34 +602,41 @@ pub fn signature_verify_chain_segment<T: BeaconChainTypes, B: TryIntoAvailableBl

let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len());

let mut consensus_contexts = Vec::with_capacity(chain_segment.len());
for (block_root, block) in &chain_segment {
let mut consensus_context =
ConsensusContext::new(block.slot()).set_current_block_root(*block_root);

signature_verifier.include_all_signatures(block.as_block(), &mut consensus_context)?;

consensus_context = consensus_context.set_kzg_commitments_consistent(true);
consensus_contexts.push(consensus_context);
}

if signature_verifier.verify().is_err() {
return Err(BlockError::InvalidSignature);
}

for ((block_root, block), consensus_context) in chain_segment
.into_iter()
.zip(consensus_contexts.into_iter())
{
//FIXME(sean) batch kzg verification
// converts the block to a type listening for blobs from network workers (if it isn't
// already this type).
let availability_pending_block =
block.into_availability_pending_block(*block_root, chain, VariableList::empty())?;

consensus_context = consensus_context.set_kzg_commitments_consistent(true);
block.into_availability_pending_block(block_root, chain, VariableList::empty())?;

// Save the block and its consensus context. The context will have had its proposer index
// and attesting indices filled in, which can be used to accelerate later block processing.
signature_verified_blocks.push(SignatureVerifiedBlock {
block: availability_pending_block,
block_root: *block_root,
block_root: block_root,
parent: None,
consensus_context,
});
}

if signature_verifier.verify().is_err() {
return Err(BlockError::InvalidSignature);
}

drop(pubkey_cache);

if let Some(signature_verified_block) = signature_verified_blocks.first_mut() {
Expand Down

0 comments on commit 70bf525

Please sign in to comment.