From 9445ac70d8af36852c69bca62ff9e26697f02647 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Tue, 17 Jan 2023 09:53:37 +0100 Subject: [PATCH 01/16] Check data availability boundary in rpc request --- beacon_node/beacon_chain/src/beacon_chain.rs | 33 +++++++-- .../beacon_processor/worker/rpc_methods.rs | 72 ++++++++++++++----- consensus/types/src/signed_beacon_block.rs | 7 +- consensus/types/src/signed_block_and_blobs.rs | 12 +++- 4 files changed, 99 insertions(+), 25 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index caa73401e22..e5bb002d5ba 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2486,7 +2486,7 @@ impl BeaconChain { while let Some((_root, block)) = filtered_chain_segment.first() { // Determine the epoch of the first block in the remaining segment. - let start_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); + let start_epoch = block.epoch(); // The `last_index` indicates the position of the first block in an epoch greater // than the current epoch: partitioning the blocks into a run of blocks in the same @@ -2494,9 +2494,7 @@ impl BeaconChain { // the same `BeaconState`. let last_index = filtered_chain_segment .iter() - .position(|(_root, block)| { - block.slot().epoch(T::EthSpec::slots_per_epoch()) > start_epoch - }) + .position(|(_root, block)| block.epoch() > start_epoch) .unwrap_or(filtered_chain_segment.len()); let mut blocks = filtered_chain_segment.split_off(last_index); @@ -3162,7 +3160,7 @@ impl BeaconChain { // Sync aggregate. if let Ok(sync_aggregate) = block.body().sync_aggregate() { // `SyncCommittee` for the sync_aggregate should correspond to the duty slot - let duty_epoch = block.slot().epoch(T::EthSpec::slots_per_epoch()); + let duty_epoch = block.epoch(); match self.sync_committee_at_epoch(duty_epoch) { Ok(sync_committee) => { @@ -3429,7 +3427,7 @@ impl BeaconChain { parent_block_slot: Slot, ) { // Do not write to eth1 finalization cache for blocks older than 5 epochs. - if block.slot().epoch(T::EthSpec::slots_per_epoch()) + 5 < current_epoch { + if block.epoch() + 5 < current_epoch { return; } @@ -5860,6 +5858,29 @@ impl BeaconChain { .flatten() } + /// The epoch since which we cater blob data upon a request 'ByRoot'. + /// `None` if the `Eip4844` fork is disabled. + pub fn data_availability_boundary_by_root_rpc_request(&self) -> Option { + self.spec + .eip4844_fork_epoch + .map(|fork_epoch| { + self.epoch().ok().map(|current_epoch| { + vec![ + fork_epoch, + current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS), + self.canonical_head + .cached_head() + .finalized_checkpoint() + .epoch, + ] + .into_iter() + .max() + }) + }) + .flatten() + .flatten() + } + /// Returns `true` if we are at or past the `Eip4844` fork. This will always return `false` if /// the `Eip4844` fork is disabled. pub fn is_data_availability_check_required(&self) -> Result { diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index f08ffe1e61e..62e6934ff0b 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -246,19 +246,39 @@ impl Worker { "request_root" => ?root ); } - Ok((Some(_), None)) => { - debug!( - self.log, - "Peer requested block and blob, but no blob found"; - "peer" => %peer_id, - "request_root" => ?root - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "No blob for requested block".into(), - request_id, - ); + Ok((Some(block), None)) => { + let data_availability_boundary_by_root = self.chain.data_availability_boundary_by_root_rpc_request(); + let block_epoch = block.epoch(); + + if Some(block_epoch) >= data_availability_boundary_by_root { + debug!( + self.log, + "Peer requested block and blob that should be available, but no blob found"; + "peer" => %peer_id, + "request_root" => ?root, + "data_availability_boundary_by_root" => data_availability_boundary_by_root, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "No blob for requested block.".into(), + request_id, + ); + } else { + debug!( + self.log, + "Peer requested block and blob older than the data availability boundary for ByRoot request, no blob found"; + "peer" => %peer_id, + "request_root" => ?root, + "data_availability_boundary_by_root" => data_availability_boundary_by_root, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + format!("No blob for requested block. Requested blob is older than the data availability boundary for a ByRoot request, currently at epoch {:?}", data_availability_boundary_by_root), + request_id, + ); + } send_response = false; break; } @@ -592,15 +612,33 @@ impl Worker { "start_slot" => req.start_slot, ); + let start_slot = Slot::from(req.start_slot); + let start_epoch = start_slot.epoch(T::EthSpec::slots_per_epoch()); + let data_availability_boundary = self.chain.data_availability_boundary(); + + if Some(start_epoch) < data_availability_boundary { + let oldest_blob_slot = self + .chain + .store + .get_blob_info() + .map(|blob_info| blob_info.oldest_blob_slot); + + debug!(self.log, "Range request start slot is older than data availability boundary"; "requested_slot" => req.start_slot, "oldest_known_slot" => ?oldest_blob_slot, "data_availability_boundary" => data_availability_boundary); + + return self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + format!("Requested start slot in epoch {}. Data availability boundary is currently at epoch {:?}", start_epoch, data_availability_boundary), + request_id, + ); + } + // Should not send more than max request blocks if req.count > MAX_REQUEST_BLOBS_SIDECARS { req.count = MAX_REQUEST_BLOBS_SIDECARS; } - let forwards_block_root_iter = match self - .chain - .forwards_iter_block_roots(Slot::from(req.start_slot)) - { + let forwards_block_root_iter = match self.chain.forwards_iter_block_roots(start_slot) { Ok(iter) => iter, Err(BeaconChainError::HistoricalBlockError( HistoricalBlockError::BlockOutOfRange { diff --git a/consensus/types/src/signed_beacon_block.rs b/consensus/types/src/signed_beacon_block.rs index f57169c72d0..89d063365b9 100644 --- a/consensus/types/src/signed_beacon_block.rs +++ b/consensus/types/src/signed_beacon_block.rs @@ -195,7 +195,7 @@ impl> SignedBeaconBlock } let domain = spec.get_domain( - self.slot().epoch(E::slots_per_epoch()), + self.epoch(), Domain::BeaconProposer, fork, genesis_validators_root, @@ -227,6 +227,11 @@ impl> SignedBeaconBlock self.message().slot() } + /// Convenience accessor for the block's epoch. + pub fn epoch(&self) -> Epoch { + self.message().slot().epoch(E::slots_per_epoch()) + } + /// Convenience accessor for the block's parent root. pub fn parent_root(&self) -> Hash256 { self.message().parent_root() diff --git a/consensus/types/src/signed_block_and_blobs.rs b/consensus/types/src/signed_block_and_blobs.rs index c589fbcfeb9..9d8f4f627bd 100644 --- a/consensus/types/src/signed_block_and_blobs.rs +++ b/consensus/types/src/signed_block_and_blobs.rs @@ -1,5 +1,7 @@ use crate::signed_beacon_block::BlobReconstructionError; -use crate::{BlobsSidecar, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockEip4844, Slot}; +use crate::{ + BlobsSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockEip4844, Slot, +}; use derivative::Derivative; use serde_derive::{Deserialize, Serialize}; use ssz::{Decode, DecodeError}; @@ -74,6 +76,14 @@ impl BlockWrapper { } } } + pub fn epoch(&self) -> Epoch { + match &self.0 { + BlockWrapperInner::Block(block) => block.epoch(), + BlockWrapperInner::BlockAndBlob(block_sidecar_pair) => { + block_sidecar_pair.beacon_block.epoch() + } + } + } pub fn block(&self) -> &SignedBeaconBlock { match &self.0 { BlockWrapperInner::Block(block) => &block, From b4ec4c1ccf9cad988e34323513644a444a3e31b0 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 18 Jan 2023 14:17:11 +0100 Subject: [PATCH 02/16] Less strict handling of faulty rpc req params and syntax improvement --- .../beacon_processor/worker/rpc_methods.rs | 84 +++++++++++-------- 1 file changed, 51 insertions(+), 33 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 62e6934ff0b..61359f031b4 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -440,7 +440,10 @@ impl Worker { oldest_block_slot, }, )) => { - debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot); + debug!(self.log, "Range request failed during backfill"; + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot + ); return self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, @@ -616,46 +619,61 @@ impl Worker { let start_epoch = start_slot.epoch(T::EthSpec::slots_per_epoch()); let data_availability_boundary = self.chain.data_availability_boundary(); - if Some(start_epoch) < data_availability_boundary { - let oldest_blob_slot = self - .chain - .store - .get_blob_info() - .map(|blob_info| blob_info.oldest_blob_slot); + let serve_blobs_from_slot = match data_availability_boundary { + Some(data_availability_boundary_epoch) => { + if Some(start_epoch) < data_availability_boundary { + let oldest_blob_slot = self + .chain + .store + .get_blob_info() + .map(|blob_info| blob_info.oldest_blob_slot); - debug!(self.log, "Range request start slot is older than data availability boundary"; "requested_slot" => req.start_slot, "oldest_known_slot" => ?oldest_blob_slot, "data_availability_boundary" => data_availability_boundary); + debug!( + self.log, + "Range request start slot is older than data availability boundary"; + "requested_slot" => req.start_slot, + "oldest_known_slot" => ?oldest_blob_slot, + "data_availability_boundary" => data_availability_boundary + ); - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - format!("Requested start slot in epoch {}. Data availability boundary is currently at epoch {:?}", start_epoch, data_availability_boundary), - request_id, - ); - } + data_availability_boundary_epoch.start_slot(T::EthSpec::slots_per_epoch()) + } else { + start_slot + } + } + None => { + debug!(self.log, "Eip4844 fork is disabled"); + return; + } + }; // Should not send more than max request blocks if req.count > MAX_REQUEST_BLOBS_SIDECARS { req.count = MAX_REQUEST_BLOBS_SIDECARS; } - let forwards_block_root_iter = match self.chain.forwards_iter_block_roots(start_slot) { - Ok(iter) => iter, - Err(BeaconChainError::HistoricalBlockError( - HistoricalBlockError::BlockOutOfRange { - slot, - oldest_block_slot, - }, - )) => { - debug!(self.log, "Range request failed during backfill"; "requested_slot" => slot, "oldest_known_slot" => oldest_block_slot); - return self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Backfilling".into(), - request_id, - ); - } - Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), - }; + let forwards_block_root_iter = + match self.chain.forwards_iter_block_roots(serve_blobs_from_slot) { + Ok(iter) => iter, + Err(BeaconChainError::HistoricalBlockError( + HistoricalBlockError::BlockOutOfRange { + slot, + oldest_block_slot, + }, + )) => { + debug!(self.log, "Range request failed during backfill"; + "requested_slot" => slot, + "oldest_known_slot" => oldest_block_slot + ); + return self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Backfilling".into(), + request_id, + ); + } + Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), + }; // Pick out the required blocks, ignoring skip-slots. let mut last_block_root = req From a00b3558001c4d5aa3728bf87ba3781a7b257d79 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 18 Jan 2023 18:52:18 +0100 Subject: [PATCH 03/16] fixup! Less strict handling of faulty rpc req params and syntax improvement --- beacon_node/beacon_chain/src/beacon_chain.rs | 31 +++++++------------- 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e5bb002d5ba..b1f9c8d3194 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -5858,27 +5858,18 @@ impl BeaconChain { .flatten() } - /// The epoch since which we cater blob data upon a request 'ByRoot'. + /// The epoch that is a data availability boundary, or the latest finalized epoch. /// `None` if the `Eip4844` fork is disabled. - pub fn data_availability_boundary_by_root_rpc_request(&self) -> Option { - self.spec - .eip4844_fork_epoch - .map(|fork_epoch| { - self.epoch().ok().map(|current_epoch| { - vec![ - fork_epoch, - current_epoch.saturating_sub(*MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS), - self.canonical_head - .cached_head() - .finalized_checkpoint() - .epoch, - ] - .into_iter() - .max() - }) - }) - .flatten() - .flatten() + pub fn finalized_data_availability_boundary(&self) -> Option { + self.data_availability_boundary().map(|boundary| { + std::cmp::max( + boundary, + self.canonical_head + .cached_head() + .finalized_checkpoint() + .epoch, + ) + }) } /// Returns `true` if we are at or past the `Eip4844` fork. This will always return `false` if From 654e59cbba530420d087516c809791d719e7b085 Mon Sep 17 00:00:00 2001 From: Emilia Hane <58548332+emhane@users.noreply.github.com> Date: Wed, 18 Jan 2023 19:40:15 +0100 Subject: [PATCH 04/16] Fix rename fn bug Co-authored-by: realbigsean --- beacon_node/network/src/beacon_processor/worker/rpc_methods.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 61359f031b4..2df13f4ea42 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -247,7 +247,7 @@ impl Worker { ); } Ok((Some(block), None)) => { - let data_availability_boundary_by_root = self.chain.data_availability_boundary_by_root_rpc_request(); + let data_availability_boundary_by_root = self.chain.finalized_data_availability_boundary(); let block_epoch = block.epoch(); if Some(block_epoch) >= data_availability_boundary_by_root { From 9cc25162e2f445a5fecce93481360a4b9d4e6d14 Mon Sep 17 00:00:00 2001 From: Emilia Hane <58548332+emhane@users.noreply.github.com> Date: Wed, 18 Jan 2023 19:48:16 +0100 Subject: [PATCH 05/16] Send error message if eip4844 fork disabled Co-authored-by: realbigsean --- .../network/src/beacon_processor/worker/rpc_methods.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 2df13f4ea42..8a3ce9b8bcd 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -643,6 +643,12 @@ impl Worker { } None => { debug!(self.log, "Eip4844 fork is disabled"); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Backfilling".into(), + request_id, + ); return; } }; From 89cb58d17b9ca79b151104e75625e31b685b29bc Mon Sep 17 00:00:00 2001 From: Emilia Hane <58548332+emhane@users.noreply.github.com> Date: Wed, 18 Jan 2023 19:54:35 +0100 Subject: [PATCH 06/16] Fix typo Co-authored-by: realbigsean --- beacon_node/network/src/beacon_processor/worker/rpc_methods.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 8a3ce9b8bcd..b697c0e7bf4 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -646,7 +646,7 @@ impl Worker { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Backfilling".into(), + "Eip4844 fork is disabled".into(), request_id, ); return; From f7f64eb0078b884bc16ba94a0954efa89ffc8bb8 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 18 Jan 2023 16:27:12 -0500 Subject: [PATCH 07/16] fix/consolidate some error handling --- beacon_node/beacon_chain/src/beacon_chain.rs | 58 ++++++++++++------- .../beacon_processor/worker/rpc_methods.rs | 33 ++++++++--- 2 files changed, 63 insertions(+), 28 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index b1f9c8d3194..dac3dee8d90 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -971,7 +971,7 @@ impl BeaconChain { } Ok(( self.get_block(block_root).await?.map(Arc::new), - self.get_blobs(block_root).ok().flatten().map(Arc::new), + self.get_blobs(block_root)?.map(Arc::new), )) } @@ -1044,9 +1044,17 @@ impl BeaconChain { /// Returns the blobs at the given root, if any. /// - /// ## Errors + /// Returns `Ok(None)` if the blobs are not found. This could indicate the blob has been pruned + /// or that the block it is referenced by doesn't exist in our database. /// - /// May return a database error. + /// If we can find the corresponding block in our database, we know whether we *should* have + /// blobs. If we should have blobs and no blobs are found, this will error. If we shouldn't, + /// this will reconstruct an empty `BlobsSidecar`. + /// + /// ## Errors + /// - any database read errors + /// - block and blobs are inconsistent in the database + /// - this method is called with a pre-eip4844 block root pub fn get_blobs( &self, block_root: &Hash256, @@ -1054,23 +1062,33 @@ impl BeaconChain { match self.store.get_blobs(block_root)? { Some(blobs) => Ok(Some(blobs)), None => { - if let Ok(Some(block)) = self.get_blinded_block(block_root) { - let expected_kzg_commitments = block.message().body().blob_kzg_commitments()?; - - if expected_kzg_commitments.len() > 0 { - Err(Error::DBInconsistent(format!( - "Expected kzg commitments but no blobs stored for block root {}", - block_root - ))) - } else { - Ok(Some(BlobsSidecar::empty_from_parts( - *block_root, - block.slot(), - ))) - } - } else { - Ok(None) - } + // Check for the corresponding block to understand whether we *should* have blobs. + self + .get_blinded_block(block_root)? + .map(|block| { + // If there are no KZG commitments in the block, we know the sidecar should + // be empty. + let expected_kzg_commitments = block.message().body().blob_kzg_commitments()?; + if expected_kzg_commitments.is_empty() { + Ok(Some(BlobsSidecar::empty_from_parts( + *block_root, + block.slot(), + ))) + } else { + if let Some(boundary) = self.data_availability_boundary() { + // We should have blobs for all blocks after the boundary. + if boundary <= block.epoch() { + return Err(Error::DBInconsistent(format!( + "Expected kzg commitments but no blobs stored for block root {}", + block_root + ))) + } + } + Ok(None) + } + }) + .transpose() + .map(Option::flatten) } } } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index b697c0e7bf4..6e48389b558 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -247,35 +247,38 @@ impl Worker { ); } Ok((Some(block), None)) => { - let data_availability_boundary_by_root = self.chain.finalized_data_availability_boundary(); + let finalized_data_availability_boundary = self.chain.finalized_data_availability_boundary(); let block_epoch = block.epoch(); - if Some(block_epoch) >= data_availability_boundary_by_root { + if Some(block_epoch) >= finalized_data_availability_boundary { debug!( self.log, "Peer requested block and blob that should be available, but no blob found"; "peer" => %peer_id, "request_root" => ?root, - "data_availability_boundary_by_root" => data_availability_boundary_by_root, + "finalized_data_availability_boundary" => finalized_data_availability_boundary, ); self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "No blob for requested block.".into(), + "Blobs unavailable".into(), request_id, ); + send_response = false; + break; } else { debug!( self.log, - "Peer requested block and blob older than the data availability boundary for ByRoot request, no blob found"; + "Peer requested block and blob older than the data availability \ + boundary for ByRoot request, no blob found"; "peer" => %peer_id, "request_root" => ?root, - "data_availability_boundary_by_root" => data_availability_boundary_by_root, + "finalized_data_availability_boundary" => finalized_data_availability_boundary, ); self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - format!("No blob for requested block. Requested blob is older than the data availability boundary for a ByRoot request, currently at epoch {:?}", data_availability_boundary_by_root), + "Blobs unavailable".into(), request_id, ); } @@ -717,7 +720,7 @@ impl Worker { let block_roots = block_roots.into_iter().flatten().collect::>(); let mut blobs_sent = 0; - let send_response = true; + let mut send_response = true; for root in block_roots { match self.chain.get_blobs(&root) { @@ -735,6 +738,13 @@ impl Worker { "No blobs or block in the store for block root"; "block_root" => ?root ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Blobs unavailable".into(), + request_id, + ); + send_response = false; break; } Err(e) => { @@ -744,6 +754,13 @@ impl Worker { "block_root" => ?root, "error" => ?e ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ServerError, + "Failed fetching blobs".into(), + request_id, + ); + send_response = false; break; } } From e1ce4e5b7851f7d18c4d74758d5a3ec213aa794d Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 18 Jan 2023 17:47:32 -0500 Subject: [PATCH 08/16] make explicity BlobsUnavailable error and handle it directly --- beacon_node/beacon_chain/src/beacon_chain.rs | 11 +++---- beacon_node/beacon_chain/src/errors.rs | 1 + .../beacon_processor/worker/rpc_methods.rs | 30 +++++++++++++++++++ 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index dac3dee8d90..a43d82ff329 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1063,12 +1063,12 @@ impl BeaconChain { Some(blobs) => Ok(Some(blobs)), None => { // Check for the corresponding block to understand whether we *should* have blobs. - self - .get_blinded_block(block_root)? + self.get_blinded_block(block_root)? .map(|block| { // If there are no KZG commitments in the block, we know the sidecar should // be empty. - let expected_kzg_commitments = block.message().body().blob_kzg_commitments()?; + let expected_kzg_commitments = + block.message().body().blob_kzg_commitments()?; if expected_kzg_commitments.is_empty() { Ok(Some(BlobsSidecar::empty_from_parts( *block_root, @@ -1078,10 +1078,7 @@ impl BeaconChain { if let Some(boundary) = self.data_availability_boundary() { // We should have blobs for all blocks after the boundary. if boundary <= block.epoch() { - return Err(Error::DBInconsistent(format!( - "Expected kzg commitments but no blobs stored for block root {}", - block_root - ))) + return Err(Error::BlobsUnavailable); } } Ok(None) diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 25f2554f3d2..e744e2af5b4 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -209,6 +209,7 @@ pub enum BeaconChainError { BlsToExecutionChangeBadFork(ForkName), InconsistentFork(InconsistentFork), ProposerHeadForkChoiceError(fork_choice::Error), + BlobsUnavailable, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 6e48389b558..8888e6f9f7d 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -293,6 +293,21 @@ impl Worker { "request_root" => ?root ); } + Err(BeaconChainError::BlobsUnavailable) => { + error!( + self.log, + "No blobs in the store for block root"; + "block_root" => ?root + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Blobs unavailable".into(), + request_id, + ); + send_response = false; + break; + } Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { debug!( self.log, @@ -747,6 +762,21 @@ impl Worker { send_response = false; break; } + Err(BeaconChainError::BlobsUnavailable) => { + error!( + self.log, + "No blobs in the store for block root"; + "block_root" => ?root + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Blobs unavailable".into(), + request_id, + ); + send_response = false; + break; + } Err(e) => { error!( self.log, From c6479444c2b87e3c61e21edbd43154e08f6fb43d Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 18 Jan 2023 18:01:46 -0500 Subject: [PATCH 09/16] don't send errors when we *correctly* don't have blobs --- .../beacon_processor/worker/rpc_methods.rs | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 8888e6f9f7d..aff0c765110 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -258,14 +258,6 @@ impl Worker { "request_root" => ?root, "finalized_data_availability_boundary" => finalized_data_availability_boundary, ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs unavailable".into(), - request_id, - ); - send_response = false; - break; } else { debug!( self.log, @@ -275,15 +267,7 @@ impl Worker { "request_root" => ?root, "finalized_data_availability_boundary" => finalized_data_availability_boundary, ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs unavailable".into(), - request_id, - ); } - send_response = false; - break; } Ok((None, Some(_))) => { debug!( @@ -753,13 +737,6 @@ impl Worker { "No blobs or block in the store for block root"; "block_root" => ?root ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs unavailable".into(), - request_id, - ); - send_response = false; break; } Err(BeaconChainError::BlobsUnavailable) => { From 8e57eef0ed9f679a75c0dc80824f9064dd935f20 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 18 Jan 2023 18:15:03 -0500 Subject: [PATCH 10/16] return a `BlobsUnavailable` error when the block root is a pre-4844 block --- beacon_node/beacon_chain/src/beacon_chain.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index a43d82ff329..99b463e0bce 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1068,7 +1068,10 @@ impl BeaconChain { // If there are no KZG commitments in the block, we know the sidecar should // be empty. let expected_kzg_commitments = - block.message().body().blob_kzg_commitments()?; + match block.message().body().blob_kzg_commitments() { + Ok(kzg_commitments) => kzg_commitments, + Err(_) => return Err(Error::BlobsUnavailable), + }; if expected_kzg_commitments.is_empty() { Ok(Some(BlobsSidecar::empty_from_parts( *block_root, From f7eb89ddd9ef9ed341ec3f7b566b84755de6e0ed Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Fri, 20 Jan 2023 21:16:34 +0100 Subject: [PATCH 11/16] Improve error handling --- beacon_node/beacon_chain/src/beacon_chain.rs | 6 +-- beacon_node/beacon_chain/src/errors.rs | 2 + .../beacon_processor/worker/rpc_methods.rs | 38 +++++++++++++++++-- 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 99b463e0bce..aba9be32620 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -1070,7 +1070,7 @@ impl BeaconChain { let expected_kzg_commitments = match block.message().body().blob_kzg_commitments() { Ok(kzg_commitments) => kzg_commitments, - Err(_) => return Err(Error::BlobsUnavailable), + Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock), }; if expected_kzg_commitments.is_empty() { Ok(Some(BlobsSidecar::empty_from_parts( @@ -1079,12 +1079,12 @@ impl BeaconChain { ))) } else { if let Some(boundary) = self.data_availability_boundary() { - // We should have blobs for all blocks after the boundary. + // We should have blobs for all blocks younger than the boundary. if boundary <= block.epoch() { return Err(Error::BlobsUnavailable); } } - Ok(None) + Err(Error::BlobsOlderThanDataAvailabilityBoundary) } }) .transpose() diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index e744e2af5b4..9a9b09fe1f9 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -210,6 +210,8 @@ pub enum BeaconChainError { InconsistentFork(InconsistentFork), ProposerHeadForkChoiceError(fork_choice::Error), BlobsUnavailable, + NoKzgCommitmentsFieldOnBlock, + BlobsOlderThanDataAvailabilityBoundary, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index aff0c765110..441a82c12be 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -251,7 +251,7 @@ impl Worker { let block_epoch = block.epoch(); if Some(block_epoch) >= finalized_data_availability_boundary { - debug!( + error!( self.log, "Peer requested block and blob that should be available, but no blob found"; "peer" => %peer_id, @@ -270,7 +270,7 @@ impl Worker { } } Ok((None, Some(_))) => { - debug!( + error!( self.log, "Peer requested block and blob, but no block found"; "peer" => %peer_id, @@ -754,17 +754,47 @@ impl Worker { send_response = false; break; } + Err(BeaconChainError::NoKzgCommitmentsFieldOnBlock) => { + error!( + self.log, + "No kzg_commitments field in block"; + "block_root" => ?root, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Failed reading field kzg_commitments from block".into(), + request_id, + ); + send_response = false; + break; + } + Err(BeaconChainError::BlobsOlderThanDataAvailabilityBoundary) => { + error!( + self.log, + "Failed loading blobs older than data availability boundary"; + "block_root" => ?root, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Blobs older than data availability boundary".into(), + request_id, + ); + send_response = false; + break; + } Err(e) => { error!( self.log, - "Error fetching blob for peer"; + "Error fetching blinded block for block root"; "block_root" => ?root, "error" => ?e ); self.send_error_response( peer_id, RPCResponseErrorCode::ServerError, - "Failed fetching blobs".into(), + "No blobs and failed fetching corresponding block".into(), request_id, ); send_response = false; From 5fc648217d58d57ea5536a8f602696a8bf8e1e9b Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sat, 21 Jan 2023 14:46:24 +0100 Subject: [PATCH 12/16] fixup! Improve error handling --- .../beacon_processor/worker/rpc_methods.rs | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 441a82c12be..ba16905aa8e 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -250,23 +250,37 @@ impl Worker { let finalized_data_availability_boundary = self.chain.finalized_data_availability_boundary(); let block_epoch = block.epoch(); - if Some(block_epoch) >= finalized_data_availability_boundary { - error!( - self.log, - "Peer requested block and blob that should be available, but no blob found"; - "peer" => %peer_id, - "request_root" => ?root, - "finalized_data_availability_boundary" => finalized_data_availability_boundary, - ); - } else { - debug!( - self.log, - "Peer requested block and blob older than the data availability \ - boundary for ByRoot request, no blob found"; - "peer" => %peer_id, - "request_root" => ?root, - "finalized_data_availability_boundary" => finalized_data_availability_boundary, - ); + match finalized_data_availability_boundary { + Some(boundary_epoch) => { + if block_epoch >= finalized_data_availability_boundary { + error!( + self.log, + "Peer requested block and blob that should be available, but no blob found"; + "peer" => %peer_id, + "request_root" => ?root, + "finalized_data_availability_boundary" => finalized_data_availability_boundary, + ); + } else { + debug!( + self.log, + "Peer requested block and blob older than the data availability \ + boundary for ByRoot request, no blob found"; + "peer" => %peer_id, + "request_root" => ?root, + "finalized_data_availability_boundary" => finalized_data_availability_boundary, + ); + } + } + None => { + debug!(self.log, "Eip4844 fork is disabled"); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Eip4844 fork is disabled".into(), + request_id, + ); + return; + } } } Ok((None, Some(_))) => { From f32f08eec0df9b1f7dab1590e255392c85b1127a Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sat, 21 Jan 2023 14:47:14 +0100 Subject: [PATCH 13/16] Fix typo --- .../network/src/beacon_processor/worker/rpc_methods.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index ba16905aa8e..00b7038cf1a 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -367,7 +367,7 @@ impl Worker { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not avaiable".into(), + "Bootstrap not available".into(), request_id, ); return; @@ -377,7 +377,7 @@ impl Worker { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not avaiable".into(), + "Bootstrap not available".into(), request_id, ); return; @@ -390,7 +390,7 @@ impl Worker { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not avaiable".into(), + "Bootstrap not available".into(), request_id, ); return; @@ -400,7 +400,7 @@ impl Worker { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not avaiable".into(), + "Bootstrap not available".into(), request_id, ); return; @@ -412,7 +412,7 @@ impl Worker { self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Bootstrap not avaiable".into(), + "Bootstrap not available".into(), request_id, ); return; From 81a754577dcfe337d0ffd23822839738088cf23a Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sat, 21 Jan 2023 15:47:33 +0100 Subject: [PATCH 14/16] fixup! Improve error handling --- .../lighthouse_network/src/rpc/methods.rs | 16 ++- .../beacon_processor/worker/rpc_methods.rs | 113 ++++++++++++------ 2 files changed, 94 insertions(+), 35 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index 02e24d8e1d1..8ee427da52a 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -507,9 +507,23 @@ impl std::fmt::Display for OldBlocksByRangeRequest { } } +impl std::fmt::Display for BlobsByRootRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Request: BlobsByRoot: Number of Requested Roots: {}", + self.block_roots.len() + ) + } +} + impl std::fmt::Display for BlobsByRangeRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Start Slot: {}, Count: {}", self.start_slot, self.count) + write!( + f, + "Request: BlobsByRange: Start Slot: {}, Count: {}", + self.start_slot, self.count + ) } } diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index 00b7038cf1a..b9abf8ea927 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -122,7 +122,10 @@ impl Worker { }; self.send_sync_message(SyncMessage::AddPeer(peer_id, info)); } - Err(e) => error!(self.log, "Could not process status message"; "error" => ?e), + Err(e) => error!(self.log, "Could not process status message"; + "peer" => %peer_id, + "error" => ?e + ), } } @@ -252,13 +255,14 @@ impl Worker { match finalized_data_availability_boundary { Some(boundary_epoch) => { - if block_epoch >= finalized_data_availability_boundary { + if block_epoch >= boundary_epoch { error!( self.log, "Peer requested block and blob that should be available, but no blob found"; + "request" => %request, "peer" => %peer_id, "request_root" => ?root, - "finalized_data_availability_boundary" => finalized_data_availability_boundary, + "finalized_data_availability_boundary" => %boundary_epoch, ); } else { debug!( @@ -267,7 +271,7 @@ impl Worker { boundary for ByRoot request, no blob found"; "peer" => %peer_id, "request_root" => ?root, - "finalized_data_availability_boundary" => finalized_data_availability_boundary, + "finalized_data_availability_boundary" => ?finalized_data_availability_boundary, ); } } @@ -287,6 +291,7 @@ impl Worker { error!( self.log, "Peer requested block and blob, but no block found"; + "request" => %request, "peer" => %peer_id, "request_root" => ?root ); @@ -295,6 +300,8 @@ impl Worker { error!( self.log, "No blobs in the store for block root"; + "request" => %request, + "peer" => %peer_id, "block_root" => ?root ); self.send_error_response( @@ -436,8 +443,8 @@ impl Worker { ) { debug!(self.log, "Received BlocksByRange Request"; "peer_id" => %peer_id, - "count" => req.count, - "start_slot" => req.start_slot, + "count" => %req.count, + "start_slot" => %req.start_slot, ); // Should not send more than max request blocks @@ -457,8 +464,8 @@ impl Worker { }, )) => { debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot + "requested_slot" => %slot, + "oldest_known_slot" => %oldest_block_slot ); return self.send_error_response( peer_id, @@ -467,7 +474,13 @@ impl Worker { request_id, ); } - Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), + Err(e) => { + return error!(self.log, "Unable to obtain root iter"; + "request" => %req, + "peer" => %peer_id, + "error" => ?e + ) + } }; // Pick out the required blocks, ignoring skip-slots. @@ -499,7 +512,13 @@ impl Worker { let block_roots = match maybe_block_roots { Ok(block_roots) => block_roots, - Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e), + Err(e) => { + return error!(self.log, "Error during iteration over blocks"; + "request" => %req, + "peer" => %peer_id, + "error" => ?e + ) + } }; // remove all skip slots @@ -531,6 +550,8 @@ impl Worker { error!( self.log, "Block in the chain is not in the store"; + "request" => %req, + "peer" => %peer_id, "request_root" => ?root ); break; @@ -556,6 +577,8 @@ impl Worker { error!( self.log, "Error fetching block for peer"; + "request" => %req, + "peer" => %peer_id, "block_root" => ?root, "error" => ?e ); @@ -584,20 +607,20 @@ impl Worker { "BlocksByRange outgoing response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blocks", - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blocks_sent + "start_slot" => %req.start_slot, + "current_slot" => %current_slot, + "requested" => %req.count, + "returned" => %blocks_sent ); } else { debug!( self.log, "BlocksByRange outgoing response processed"; "peer" => %peer_id, - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blocks_sent + "start_slot" => %req.start_slot, + "current_slot" => %current_slot, + "requested" => %req.count, + "returned" => %blocks_sent ); } @@ -627,8 +650,8 @@ impl Worker { ) { debug!(self.log, "Received BlobsByRange Request"; "peer_id" => %peer_id, - "count" => req.count, - "start_slot" => req.start_slot, + "count" => %req.count, + "start_slot" => %req.start_slot, ); let start_slot = Slot::from(req.start_slot); @@ -647,8 +670,8 @@ impl Worker { debug!( self.log, "Range request start slot is older than data availability boundary"; - "requested_slot" => req.start_slot, - "oldest_known_slot" => ?oldest_blob_slot, + "requested_slot" => %req.start_slot, + "oldest_known_slot" => oldest_blob_slot, "data_availability_boundary" => data_availability_boundary ); @@ -684,8 +707,8 @@ impl Worker { }, )) => { debug!(self.log, "Range request failed during backfill"; - "requested_slot" => slot, - "oldest_known_slot" => oldest_block_slot + "requested_slot" => %slot, + "oldest_known_slot" => %oldest_block_slot ); return self.send_error_response( peer_id, @@ -694,7 +717,13 @@ impl Worker { request_id, ); } - Err(e) => return error!(self.log, "Unable to obtain root iter"; "error" => ?e), + Err(e) => { + return error!(self.log, "Unable to obtain root iter"; + "request" => %req, + "peer" => %peer_id, + "error" => ?e + ) + } }; // Pick out the required blocks, ignoring skip-slots. @@ -726,7 +755,13 @@ impl Worker { let block_roots = match maybe_block_roots { Ok(block_roots) => block_roots, - Err(e) => return error!(self.log, "Error during iteration over blocks"; "error" => ?e), + Err(e) => { + return error!(self.log, "Error during iteration over blocks"; + "request" => %req, + "peer" => %peer_id, + "error" => ?e + ) + } }; // remove all skip slots @@ -749,6 +784,8 @@ impl Worker { error!( self.log, "No blobs or block in the store for block root"; + "request" => %req, + "peer" => %peer_id, "block_root" => ?root ); break; @@ -757,6 +794,8 @@ impl Worker { error!( self.log, "No blobs in the store for block root"; + "request" => %req, + "peer" => %peer_id, "block_root" => ?root ); self.send_error_response( @@ -772,6 +811,8 @@ impl Worker { error!( self.log, "No kzg_commitments field in block"; + "request" => %req, + "peer" => %peer_id, "block_root" => ?root, ); self.send_error_response( @@ -787,6 +828,8 @@ impl Worker { error!( self.log, "Failed loading blobs older than data availability boundary"; + "request" => %req, + "peer" => %peer_id, "block_root" => ?root, ); self.send_error_response( @@ -802,6 +845,8 @@ impl Worker { error!( self.log, "Error fetching blinded block for block root"; + "request" => %req, + "peer" => %peer_id, "block_root" => ?root, "error" => ?e ); @@ -828,20 +873,20 @@ impl Worker { "BlobsByRange Response processed"; "peer" => %peer_id, "msg" => "Failed to return all requested blobs", - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blobs_sent + "start_slot" => %req.start_slot, + "current_slot" => %current_slot, + "requested" => %req.count, + "returned" => %blobs_sent ); } else { debug!( self.log, "BlobsByRange Response processed"; "peer" => %peer_id, - "start_slot" => req.start_slot, - "current_slot" => current_slot, - "requested" => req.count, - "returned" => blobs_sent + "start_slot" => %req.start_slot, + "current_slot" => %current_slot, + "requested" => %req.count, + "returned" => %blobs_sent ); } From e14550425d51bd47a67581f37025ab0755925fca Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Mon, 23 Jan 2023 13:23:04 +0100 Subject: [PATCH 15/16] Fix mismatched response bug --- beacon_node/network/src/beacon_processor/worker/rpc_methods.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index b9abf8ea927..f08dac547cb 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -351,7 +351,7 @@ impl Worker { // send stream termination if send_response { - self.send_response(peer_id, Response::BlocksByRoot(None), request_id); + self.send_response(peer_id, Response::BlobsByRoot(None), request_id); } drop(send_on_drop); }, From 679f5359a43f17fa122c2dbdc733af9b7e9e8e88 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Mon, 23 Jan 2023 16:05:53 +0100 Subject: [PATCH 16/16] fixup! Improve error handling --- beacon_node/beacon_chain/src/beacon_chain.rs | 109 ++++--- beacon_node/beacon_chain/src/errors.rs | 4 +- beacon_node/http_api/src/block_id.rs | 4 +- .../beacon_processor/worker/rpc_methods.rs | 280 ++++++++++-------- 4 files changed, 231 insertions(+), 166 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index aba9be32620..4aa9ccca539 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -950,29 +950,36 @@ impl BeaconChain { if let Some(block) = self.early_attester_cache.get_block(*block_root) { return Ok(Some(block)); } - Ok(self.get_block(block_root).await?.map(Arc::new)) + self.get_block(block_root).await + } + + pub fn get_blobs_checking_early_attester_cache( + &self, + block_root: &Hash256, + ) -> Result>>, Error> { + if let Some(blobs) = self.early_attester_cache.get_blobs(*block_root) { + return Ok(Some(blobs)); + } + self.get_blobs(block_root, self.data_availability_boundary()) } pub async fn get_block_and_blobs_checking_early_attester_cache( &self, block_root: &Hash256, - ) -> Result< - ( - Option>>, - Option>>, - ), - Error, - > { + ) -> ( + Result>>, Error>, + Result>>, Error>, + ) { if let (Some(block), Some(blobs)) = ( self.early_attester_cache.get_block(*block_root), self.early_attester_cache.get_blobs(*block_root), ) { - return Ok((Some(block), Some(blobs))); + return (Ok(Some(block)), Ok(Some(blobs))); } - Ok(( - self.get_block(block_root).await?.map(Arc::new), - self.get_blobs(block_root)?.map(Arc::new), - )) + ( + self.get_block(block_root).await, + self.get_blobs(block_root, self.data_availability_boundary()), + ) } /// Returns the block at the given root, if any. @@ -983,11 +990,11 @@ impl BeaconChain { pub async fn get_block( &self, block_root: &Hash256, - ) -> Result>, Error> { + ) -> Result>>, Error> { // Load block from database, returning immediately if we have the full block w payload // stored. let blinded_block = match self.store.try_get_full_block(block_root)? { - Some(DatabaseBlock::Full(block)) => return Ok(Some(block)), + Some(DatabaseBlock::Full(block)) => return Ok(Some(Arc::new(block))), Some(DatabaseBlock::Blinded(block)) => block, None => return Ok(None), }; @@ -1039,6 +1046,7 @@ impl BeaconChain { blinded_block .try_into_full_block(Some(execution_payload)) .ok_or(Error::AddPayloadLogicError) + .map(Arc::new) .map(Some) } @@ -1055,41 +1063,58 @@ impl BeaconChain { /// - any database read errors /// - block and blobs are inconsistent in the database /// - this method is called with a pre-eip4844 block root + /// - there is not block in store that could have committed these blobs pub fn get_blobs( &self, block_root: &Hash256, - ) -> Result>, Error> { - match self.store.get_blobs(block_root)? { - Some(blobs) => Ok(Some(blobs)), - None => { + data_availability_boundary: Option, + ) -> Result>>, Error> { + match self.store.get_blobs(block_root) { + Ok(Some(blobs)) => Ok(Some(Arc::new(blobs))), + Ok(None) => { // Check for the corresponding block to understand whether we *should* have blobs. - self.get_blinded_block(block_root)? - .map(|block| { - // If there are no KZG commitments in the block, we know the sidecar should - // be empty. - let expected_kzg_commitments = - match block.message().body().blob_kzg_commitments() { - Ok(kzg_commitments) => kzg_commitments, - Err(_) => return Err(Error::NoKzgCommitmentsFieldOnBlock), - }; - if expected_kzg_commitments.is_empty() { - Ok(Some(BlobsSidecar::empty_from_parts( - *block_root, - block.slot(), - ))) - } else { - if let Some(boundary) = self.data_availability_boundary() { - // We should have blobs for all blocks younger than the boundary. - if boundary <= block.epoch() { - return Err(Error::BlobsUnavailable); + match self.get_blinded_block(block_root)? { + Some(block) => { + // For some ops the finalized_data_availability_boundary is used, hence it + // must be explicitly passed as a param. + if let Some(boundary) = data_availability_boundary { + // We should have blobs for all blocks younger than the boundary, i.e. + // of the same epoch as the boundary or a higher epoch. + if boundary <= block.epoch() { + // If there are no KZG commitments in the block, we know the + // sidecar should be empty. + let expected_kzg_commitments = + match block.message().body().blob_kzg_commitments() { + Ok(kzg_commitments) => kzg_commitments, + Err(_) => { + // Block verification has failed (big time). + // + // todo(emhane): this doesn't need to be mapped to + // it's own error, it can be directly propagated up, + // but leaving for initial testing phase. + return Err(Error::NoKzgCommitmentsFieldOnBlock); + } + }; + if expected_kzg_commitments.is_empty() { + Ok(Some(Arc::new(BlobsSidecar::empty_from_parts( + *block_root, + block.slot(), + )))) + } else { + // DB inconsistent + Err(Error::BlobsUnavailable) } + } else { + Ok(None) } - Err(Error::BlobsOlderThanDataAvailabilityBoundary) + } else { + Err(Error::Eip4844ForkDisabled) } - }) - .transpose() - .map(Option::flatten) + } + None => Err(Error::BlobsLookupForInexistentBlock), + } } + Err(e) => Err(Error::BlobsDBError(e)), } } diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 9a9b09fe1f9..96be660f585 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -55,6 +55,7 @@ pub enum BeaconChainError { BeaconStateError(BeaconStateError), DBInconsistent(String), DBError(store::Error), + BlobsDBError(store::Error), ForkChoiceError(ForkChoiceError), ForkChoiceStoreError(ForkChoiceStoreError), MissingBeaconBlock(Hash256), @@ -211,7 +212,8 @@ pub enum BeaconChainError { ProposerHeadForkChoiceError(fork_choice::Error), BlobsUnavailable, NoKzgCommitmentsFieldOnBlock, - BlobsOlderThanDataAvailabilityBoundary, + Eip4844ForkDisabled, + BlobsLookupForInexistentBlock, } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git a/beacon_node/http_api/src/block_id.rs b/beacon_node/http_api/src/block_id.rs index 45c7bed1f7a..6474fdd762b 100644 --- a/beacon_node/http_api/src/block_id.rs +++ b/beacon_node/http_api/src/block_id.rs @@ -184,7 +184,7 @@ impl BlockId { slot ))); } - Ok((Arc::new(block), execution_optimistic)) + Ok((block, execution_optimistic)) } None => Err(warp_utils::reject::custom_not_found(format!( "beacon block with root {}", @@ -200,7 +200,7 @@ impl BlockId { .map_err(warp_utils::reject::beacon_chain_error) .and_then(|block_opt| { block_opt - .map(|block| (Arc::new(block), execution_optimistic)) + .map(|block| (block, execution_optimistic)) .ok_or_else(|| { warp_utils::reject::custom_not_found(format!( "beacon block with root {}", diff --git a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs index f08dac547cb..72fc9ee9f8c 100644 --- a/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs +++ b/beacon_node/network/src/beacon_processor/worker/rpc_methods.rs @@ -12,7 +12,6 @@ use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; use slog::{debug, error}; use slot_clock::SlotClock; -use std::sync::Arc; use task_executor::TaskExecutor; use types::light_client_bootstrap::LightClientBootstrap; use types::{Epoch, EthSpec, Hash256, SignedBeaconBlockAndBlobsSidecar, Slot}; @@ -224,13 +223,14 @@ impl Worker { async move { let mut send_block_count = 0; let mut send_response = true; + let finalized_data_availability_boundary = self.chain.finalized_data_availability_boundary(); + for root in request.block_roots.iter() { - match self + match (self .chain - .get_block_and_blobs_checking_early_attester_cache(root) - .await + .get_block(root).await, self.chain.get_blobs(root, finalized_data_availability_boundary)) { - Ok((Some(block), Some(blobs))) => { + (Ok(Some(block)), Ok(Some(blobs))) => { self.send_response( peer_id, Response::BlobsByRoot(Some(SignedBeaconBlockAndBlobsSidecar { @@ -241,82 +241,54 @@ impl Worker { ); send_block_count += 1; } - Ok((None, None)) => { - debug!( + (Ok(Some(_)), Ok(None)) => { + // This request could be considered spam (hence logged as error). + error!( self.log, - "Peer requested unknown block and blobs"; + "Peer requested blobs older than data availability boundary"; + "request" => %request, "peer" => %peer_id, - "request_root" => ?root + "block_root" => ?root, + "finalized_data_availability_boundary" => ?finalized_data_availability_boundary, ); } - Ok((Some(block), None)) => { - let finalized_data_availability_boundary = self.chain.finalized_data_availability_boundary(); - let block_epoch = block.epoch(); - - match finalized_data_availability_boundary { - Some(boundary_epoch) => { - if block_epoch >= boundary_epoch { - error!( - self.log, - "Peer requested block and blob that should be available, but no blob found"; - "request" => %request, - "peer" => %peer_id, - "request_root" => ?root, - "finalized_data_availability_boundary" => %boundary_epoch, - ); - } else { - debug!( - self.log, - "Peer requested block and blob older than the data availability \ - boundary for ByRoot request, no blob found"; - "peer" => %peer_id, - "request_root" => ?root, - "finalized_data_availability_boundary" => ?finalized_data_availability_boundary, - ); - } - } - None => { - debug!(self.log, "Eip4844 fork is disabled"); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Eip4844 fork is disabled".into(), - request_id, - ); - return; - } - } - } - Ok((None, Some(_))) => { + (Ok(None), Ok(None)) => {} + (Ok(None), Err(BeaconChainError::BlobsLookupForInexistentBlock)) => { + // No block or blob found for the block root, no guarantee that we + // should have this blob. Continue with the other block roots. + // + // This request could be considered spam (hence logged as error). error!( self.log, - "Peer requested block and blob, but no block found"; - "request" => %request, + "Peer requested unknown block and blobs"; "peer" => %peer_id, "request_root" => ?root ); } - Err(BeaconChainError::BlobsUnavailable) => { + (Ok(None), Ok(Some(_))) => { + // This block should be available. DB inconsistency. error!( self.log, - "No blobs in the store for block root"; + "Block that commits blob not found"; "request" => %request, "peer" => %peer_id, - "block_root" => ?root + "request_root" => ?root ); self.send_error_response( peer_id, RPCResponseErrorCode::ResourceUnavailable, - "Blobs unavailable".into(), + "Block that commits blobs is unavailable".into(), request_id, ); send_response = false; break; } - Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)) => { + (Err(BeaconChainError::BlockHashMissingFromExecutionLayer(_)), _) => { debug!( self.log, "Failed to fetch execution payload for block and blobs by root request"; + "request" => %request, + "peer" => %peer_id, "block_root" => ?root, "reason" => "execution layer not synced", ); @@ -330,14 +302,27 @@ impl Worker { send_response = false; break; } - Err(e) => { - debug!( + (Err(e), _) => { + error!( self.log, - "Error fetching block for peer"; + "Error fetching block"; + "request" => %request, "peer" => %peer_id, "request_root" => ?root, "error" => ?e, ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ServerError, + "Fetching block that commits blob failed".into(), + request_id, + ); + send_response = false; + break; + } + (_, Err(e)) => { + send_response = self.handle_blobs_request_error(e, peer_id, root, request_id, format_args!("{}", request)); + break; } } } @@ -541,7 +526,7 @@ impl Worker { blocks_sent += 1; self.send_network_message(NetworkMessage::SendResponse { peer_id, - response: Response::BlocksByRange(Some(Arc::new(block))), + response: Response::BlocksByRange(Some(block)), id: request_id, }); } @@ -577,8 +562,6 @@ impl Worker { error!( self.log, "Error fetching block for peer"; - "request" => %req, - "peer" => %peer_id, "block_root" => ?root, "error" => ?e ); @@ -771,92 +754,57 @@ impl Worker { let mut send_response = true; for root in block_roots { - match self.chain.get_blobs(&root) { + match self.chain.get_blobs_checking_early_attester_cache(&root) { Ok(Some(blobs)) => { blobs_sent += 1; self.send_network_message(NetworkMessage::SendResponse { peer_id, - response: Response::BlobsByRange(Some(Arc::new(blobs))), + response: Response::BlobsByRange(Some(blobs)), id: request_id, }); } Ok(None) => { + // This request could be considered spam (hence logged as error). error!( self.log, - "No blobs or block in the store for block root"; - "request" => %req, - "peer" => %peer_id, - "block_root" => ?root - ); - break; - } - Err(BeaconChainError::BlobsUnavailable) => { - error!( - self.log, - "No blobs in the store for block root"; - "request" => %req, - "peer" => %peer_id, - "block_root" => ?root - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs unavailable".into(), - request_id, - ); - send_response = false; - break; - } - Err(BeaconChainError::NoKzgCommitmentsFieldOnBlock) => { - error!( - self.log, - "No kzg_commitments field in block"; + "Peer requested blobs older than data availability boundary"; "request" => %req, "peer" => %peer_id, "block_root" => ?root, ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Failed reading field kzg_commitments from block".into(), - request_id, - ); - send_response = false; + // Since ByRange sends blobs younger than finalization, after the initial blobs + // sidecar the fork choice may have changed the view of the chain in the + // context of the request, and the blobs sidecar might have been pruned along + // with its abandoned fork. This is not an error, but we stop sending + // responses from this batch as clients MUST respond with blobs sidecars that + // are consistent from a single chain within the context of the request. + send_response = true; break; } - Err(BeaconChainError::BlobsOlderThanDataAvailabilityBoundary) => { + Err(BeaconChainError::BlobsLookupForInexistentBlock) => { + // No block or blob found for the block root, no guarantee that we + // should have this blob. + // + // This request could be considered spam (hence logged as error). error!( self.log, - "Failed loading blobs older than data availability boundary"; - "request" => %req, + "Peer requested unknown block and blobs"; "peer" => %peer_id, - "block_root" => ?root, - ); - self.send_error_response( - peer_id, - RPCResponseErrorCode::ResourceUnavailable, - "Blobs older than data availability boundary".into(), - request_id, + "request_root" => ?root ); - send_response = false; + // ByRange should send chained blobs, same reason to break as when requested + // blobs are too old. + send_response = true; break; } Err(e) => { - error!( - self.log, - "Error fetching blinded block for block root"; - "request" => %req, - "peer" => %peer_id, - "block_root" => ?root, - "error" => ?e - ); - self.send_error_response( + send_response = self.handle_blobs_request_error( + e, peer_id, - RPCResponseErrorCode::ServerError, - "No blobs and failed fetching corresponding block".into(), + &root, request_id, + format_args!("{}", req), ); - send_response = false; break; } } @@ -901,4 +849,94 @@ impl Worker { drop(send_on_drop); } + + /// Errors where blobs should be available are handled the same for ByRoot and ByRange + /// requests. Returns true if a sigterm is yet to be sent. + fn handle_blobs_request_error( + &self, + error: BeaconChainError, + peer_id: PeerId, + root: &Hash256, + request_id: PeerRequestId, + req: std::fmt::Arguments, + ) -> bool { + match error { + BeaconChainError::BlobsUnavailable => { + error!( + self.log, + "No blobs in the store for block root"; + "request" => req, + "peer" => %peer_id, + "block_root" => ?root + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ResourceUnavailable, + "Blobs unavailable".into(), + request_id, + ); + false + } + BeaconChainError::Eip4844ForkDisabled => { + // This request could be considered spam (hence logged as error). + error!( + self.log, + "Peer requested blobs but eip4844 fork is disabled"; + "request" => req, + "peer" => %peer_id, + "block_root" => ?root, + ); + true + } + BeaconChainError::NoKzgCommitmentsFieldOnBlock => { + error!( + self.log, + "No kzg_commitments field in block at eip4844 fork epoch or younger"; + "request" => req, + "peer" => %peer_id, + "block_root" => ?root, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ServerError, + "Failed reading field kzg_commitments from block".into(), + request_id, + ); + false + } + BeaconChainError::BlobsDBError(e) => { + error!( + self.log, + "Error fetching blobs from db"; + "request" => req, + "peer" => %peer_id, + "request_root" => ?root, + "error" => ?e, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ServerError, + "Fetching blobs from store failed".into(), + request_id, + ); + false + } + e => { + error!( + self.log, + "Error fetching block"; + "peer" => %peer_id, + "request_root" => ?root, + "error" => ?e, + ); + self.send_error_response( + peer_id, + RPCResponseErrorCode::ServerError, + "Fetching block that commits blob failed".into(), + request_id, + ); + false + } + } + } }