Skip to content

Commit

Permalink
Broadcast VC requests in parallel and fix subscription error (sigp#6223)
Browse files Browse the repository at this point in the history
* Broadcast VC requests in parallel

* Remove outdated comment

* Try some things

* Fix subscription error

* Remove junk logging
  • Loading branch information
michaelsproul authored and AgeManning committed Sep 3, 2024
1 parent c599e8b commit 60b9e7b
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 59 deletions.
9 changes: 8 additions & 1 deletion common/eth2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl fmt::Display for Error {
pub struct Timeouts {
pub attestation: Duration,
pub attester_duties: Duration,
pub attestation_subscriptions: Duration,
pub liveness: Duration,
pub proposal: Duration,
pub proposer_duties: Duration,
Expand All @@ -137,6 +138,7 @@ impl Timeouts {
Timeouts {
attestation: timeout,
attester_duties: timeout,
attestation_subscriptions: timeout,
liveness: timeout,
proposal: timeout,
proposer_duties: timeout,
Expand Down Expand Up @@ -2540,7 +2542,12 @@ impl BeaconNodeHttpClient {
.push("validator")
.push("beacon_committee_subscriptions");

self.post(path, &subscriptions).await?;
self.post_with_timeout(
path,
&subscriptions,
self.timeouts.attestation_subscriptions,
)
.await?;

Ok(())
}
Expand Down
88 changes: 48 additions & 40 deletions validator_client/src/beacon_node_fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ impl<T: Debug> fmt::Display for Errors<T> {
}
}

impl<T> Errors<T> {
pub fn num_errors(&self) -> usize {
self.0.len()
}
}

/// Reasons why a candidate might not be ready.
#[derive(Debug, Clone, Copy)]
pub enum CandidateError {
Expand Down Expand Up @@ -599,46 +605,41 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
F: Fn(&'a BeaconNodeHttpClient) -> R,
R: Future<Output = Result<O, Err>>,
{
let mut results = vec![];
let mut to_retry = vec![];
let mut retry_unsynced = vec![];

// Run `func` using a `candidate`, returning the value or capturing errors.
//
// We use a macro instead of a closure here since it is not trivial to move `func` into a
// closure.
macro_rules! try_func {
($candidate: ident) => {{
inc_counter_vec(&ENDPOINT_REQUESTS, &[$candidate.beacon_node.as_ref()]);
let run_on_candidate = |candidate: &'a CandidateBeaconNode<E>| async {
inc_counter_vec(&ENDPOINT_REQUESTS, &[candidate.beacon_node.as_ref()]);

// There exists a race condition where `func` may be called when the candidate is
// actually not ready. We deem this an acceptable inefficiency.
match func(&$candidate.beacon_node).await {
Ok(val) => results.push(Ok(val)),
Err(e) => {
// If we have an error on this function, make the client as not-ready.
//
// There exists a race condition where the candidate may have been marked
// as ready between the `func` call and now. We deem this an acceptable
// inefficiency.
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
$candidate.set_offline().await;
}
results.push(Err((
$candidate.beacon_node.to_string(),
Error::RequestFailed(e),
)));
inc_counter_vec(&ENDPOINT_ERRORS, &[$candidate.beacon_node.as_ref()]);
// There exists a race condition where `func` may be called when the candidate is
// actually not ready. We deem this an acceptable inefficiency.
match func(&candidate.beacon_node).await {
Ok(val) => Ok(val),
Err(e) => {
// If we have an error on this function, mark the client as not-ready.
//
// There exists a race condition where the candidate may have been marked
// as ready between the `func` call and now. We deem this an acceptable
// inefficiency.
if matches!(offline_on_failure, OfflineOnFailure::Yes) {
candidate.set_offline().await;
}
inc_counter_vec(&ENDPOINT_ERRORS, &[candidate.beacon_node.as_ref()]);
Err((candidate.beacon_node.to_string(), Error::RequestFailed(e)))
}
}};
}
}
};

// First pass: try `func` on all synced and ready candidates.
//
// This ensures that we always choose a synced node if it is available.
let mut first_batch_futures = vec![];
for candidate in &self.candidates {
match candidate.status(RequireSynced::Yes).await {
Ok(_) => {
first_batch_futures.push(run_on_candidate(candidate));
}
Err(CandidateError::NotSynced) if require_synced == false => {
// This client is unsynced we will try it after trying all synced clients
retry_unsynced.push(candidate);
Expand All @@ -647,22 +648,24 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
// This client was not ready on the first pass, we might try it again later.
to_retry.push(candidate);
}
Ok(_) => try_func!(candidate),
}
}
let first_batch_results = futures::future::join_all(first_batch_futures).await;

// Second pass: try `func` on ready unsynced candidates. This only runs if we permit
// unsynced candidates.
//
// Due to async race-conditions, it is possible that we will send a request to a candidate
// that has been set to an offline/unready status. This is acceptable.
if require_synced == false {
for candidate in retry_unsynced {
try_func!(candidate);
}
}
let second_batch_results = if require_synced == false {
futures::future::join_all(retry_unsynced.into_iter().map(run_on_candidate)).await
} else {
vec![]
};

// Third pass: try again, attempting to make non-ready clients become ready.
let mut third_batch_futures = vec![];
let mut third_batch_results = vec![];
for candidate in to_retry {
// If the candidate hasn't luckily transferred into the correct state in the meantime,
// force an update of the state.
Expand All @@ -676,16 +679,21 @@ impl<T: SlotClock, E: EthSpec> BeaconNodeFallback<T, E> {
};

match new_status {
Ok(()) => try_func!(candidate),
Err(CandidateError::NotSynced) if require_synced == false => try_func!(candidate),
Err(e) => {
results.push(Err((
candidate.beacon_node.to_string(),
Error::Unavailable(e),
)));
Ok(()) => third_batch_futures.push(run_on_candidate(candidate)),
Err(CandidateError::NotSynced) if require_synced == false => {
third_batch_futures.push(run_on_candidate(candidate))
}
Err(e) => third_batch_results.push(Err((
candidate.beacon_node.to_string(),
Error::Unavailable(e),
))),
}
}
third_batch_results.extend(futures::future::join_all(third_batch_futures).await);

let mut results = first_batch_results;
results.extend(second_batch_results);
results.extend(third_batch_results);

let errors: Vec<_> = results.into_iter().filter_map(|res| res.err()).collect();

Expand Down
53 changes: 35 additions & 18 deletions validator_client/src/duties_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ const _: () = assert!({
/// This number is based upon `MIN_PEER_DISCOVERY_SLOT_LOOK_AHEAD` value in the
/// `beacon_node::network::attestation_service` crate. It is not imported directly to avoid
/// bringing in the entire crate.
const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > 2);
const MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD: u64 = 2;
const _: () = assert!(ATTESTATION_SUBSCRIPTION_OFFSETS[0] > MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD);

// The info in the enum variants is displayed in logging, clippy thinks it's dead code.
#[derive(Debug)]
Expand Down Expand Up @@ -121,6 +122,8 @@ pub struct DutyAndProof {
pub struct SubscriptionSlots {
/// Pairs of `(slot, already_sent)` in slot-descending order.
slots: Vec<(Slot, AtomicBool)>,
/// The slot of the duty itself.
duty_slot: Slot,
}

/// Create a selection proof for `duty`.
Expand Down Expand Up @@ -172,18 +175,20 @@ impl SubscriptionSlots {
.filter(|scheduled_slot| *scheduled_slot > current_slot)
.map(|scheduled_slot| (scheduled_slot, AtomicBool::new(false)))
.collect();
Arc::new(Self { slots })
Arc::new(Self { slots, duty_slot })
}

/// Return `true` if we should send a subscription at `slot`.
fn should_send_subscription_at(&self, slot: Slot) -> bool {
// Iterate slots from smallest to largest looking for one that hasn't been completed yet.
self.slots
.iter()
.rev()
.any(|(scheduled_slot, already_sent)| {
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
})
slot + MIN_ATTESTATION_SUBSCRIPTION_LOOKAHEAD <= self.duty_slot
&& self
.slots
.iter()
.rev()
.any(|(scheduled_slot, already_sent)| {
slot >= *scheduled_slot && !already_sent.load(Ordering::Relaxed)
})
}

/// Update our record of subscribed slots to account for successful subscription at `slot`.
Expand Down Expand Up @@ -737,7 +742,7 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
// If there are any subscriptions, push them out to beacon nodes
if !subscriptions.is_empty() {
let subscriptions_ref = &subscriptions;
if let Err(e) = duties_service
let subscription_result = duties_service
.beacon_nodes
.request(
RequireSynced::No,
Expand All @@ -753,15 +758,8 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
.await
},
)
.await
{
error!(
log,
"Failed to subscribe validators";
"error" => %e
)
} else {
// Record that subscriptions were successfully sent.
.await;
if subscription_result.as_ref().is_ok() {
debug!(
log,
"Broadcast attestation subscriptions";
Expand All @@ -770,6 +768,25 @@ async fn poll_beacon_attesters<T: SlotClock + 'static, E: EthSpec>(
for subscription_slots in subscription_slots_to_confirm {
subscription_slots.record_successful_subscription_at(current_slot);
}
} else if let Err(e) = subscription_result {
if e.num_errors() < duties_service.beacon_nodes.num_total() {
warn!(
log,
"Some subscriptions failed";
"error" => %e,
);
// If subscriptions were sent to at least one node, regard that as a success.
// There is some redundancy built into the subscription schedule to handle failures.
for subscription_slots in subscription_slots_to_confirm {
subscription_slots.record_successful_subscription_at(current_slot);
}
} else {
error!(
log,
"All subscriptions failed";
"error" => %e
);
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions validator_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12);
/// This can help ensure that proper endpoint fallback occurs.
const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24;
const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4;
const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2;
const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4;
Expand Down Expand Up @@ -323,6 +324,8 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
Timeouts {
attestation: slot_duration / HTTP_ATTESTATION_TIMEOUT_QUOTIENT,
attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT,
attestation_subscriptions: slot_duration
/ HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT,
liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT,
proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT,
proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT,
Expand Down

0 comments on commit 60b9e7b

Please sign in to comment.