[Merged by Bors] - Use BeaconProcessor for API requests #4462

Closed

wants to merge 43 commits

Commits (43)
238837b
Start adding HTTP queues
paulhauner Jul 3, 2023
e911287
Start wiring up beacon processor
paulhauner Jul 3, 2023
da96733
Start adding `TaskSpawner`
paulhauner Jul 4, 2023
5e19518
Refactor to avoid repetition
paulhauner Jul 4, 2023
be7f32f
Add async fns to task spawner
paulhauner Jul 4, 2023
c5f159e
Start wiring task spawner into routes
paulhauner Jul 4, 2023
5914e36
Wire `task_spawner` into all routes
paulhauner Jul 4, 2023
799d20c
Add P1 queue to BP
paulhauner Jul 4, 2023
a7b259d
Start simplifying `TaskSpawner`
paulhauner Jul 4, 2023
10dc153
Futher simplify `TaskSpawner`
paulhauner Jul 4, 2023
b01b595
Hold the `TestRuntime` in tests to prevent BP shutdown
paulhauner Jul 5, 2023
1ea991c
Dedup runtime shutdown
paulhauner Jul 5, 2023
eb2be2c
Bump syncing and health endpoints to P0
paulhauner Jul 5, 2023
8a76593
Bypass the task_spawner for `node/version`
paulhauner Jul 5, 2023
1157568
Add `BeaconProcessorConfig`
paulhauner Jul 7, 2023
e03017d
Add CLI flags
paulhauner Jul 7, 2023
596df18
Set max_workers to 4 in sims
paulhauner Jul 7, 2023
15ed85d
Add batch size CLI flags
paulhauner Jul 7, 2023
f576b94
Add -len suffix to flags.
paulhauner Jul 7, 2023
90a8f8a
Tidy help text
paulhauner Jul 7, 2023
1d84f94
Bump max workers to 8
paulhauner Jul 10, 2023
9133265
Remove unused import
paulhauner Jul 10, 2023
0a3ffdf
Increase simulator HTTP timeout
paulhauner Jul 10, 2023
40612b4
Reduce beacon processor cores to 4 for sims
paulhauner Jul 10, 2023
60206c6
Add flag to enable/disable BP in HTTP API
paulhauner Jul 11, 2023
b4973f2
Review priorties
paulhauner Jul 11, 2023
311e1bd
Add value name for new flag
paulhauner Jul 11, 2023
219a61c
Move register_validator off beacon processor
paulhauner Jul 11, 2023
7be4dbd
Fix priority for POST beacon/blocks
paulhauner Jul 11, 2023
efbabe3
Optimise single validator lookup query
paulhauner Jul 11, 2023
e41e289
Merge branch 'unstable' into api-uses-bp
paulhauner Jul 17, 2023
d6778bc
Add metrics for HTTP API queue lengths
paulhauner Jul 17, 2023
7e18f71
Fix queue popping bug
paulhauner Jul 17, 2023
d8c6319
Add comments to task_spawner.rs
paulhauner Jul 17, 2023
bbdb3d2
De-dupe `TestRuntime`
paulhauner Jul 18, 2023
ec6446c
Use two workers in tests
paulhauner Jul 18, 2023
63de59b
Merge branch 'unstable' into api-uses-bp
paulhauner Jul 18, 2023
a9a478e
Fix CLI flag
paulhauner Jul 18, 2023
9632d66
Merge branch 'unstable' into api-uses-bp
paulhauner Aug 3, 2023
84f6684
Merge branch 'unstable' into api-uses-bp
paulhauner Aug 7, 2023
edf8f1f
Fix lint
paulhauner Aug 7, 2023
93c9228
Apply suggestions from code review
paulhauner Aug 8, 2023
4e6b868
Merge branch 'unstable' into api-uses-bp
paulhauner Aug 8, 2023
5 changes: 5 additions & 0 deletions Cargo.lock


3 changes: 0 additions & 3 deletions beacon_node/beacon_chain/src/chain_config.rs
@@ -79,8 +79,6 @@ pub struct ChainConfig {
///
/// This is useful for block builders and testing.
pub always_prepare_payload: bool,
/// Whether backfill sync processing should be rate-limited.
pub enable_backfill_rate_limiting: bool,
/// Whether to use `ProgressiveBalancesCache` in unrealized FFG progression calculation.
pub progressive_balances_mode: ProgressiveBalancesMode,
/// Number of epochs between each migration of data from the hot database to the freezer.
@@ -114,7 +112,6 @@ impl Default for ChainConfig {
shuffling_cache_size: crate::shuffling_cache::DEFAULT_CACHE_SIZE,
genesis_backfill: false,
always_prepare_payload: false,
enable_backfill_rate_limiting: true,
progressive_balances_mode: ProgressiveBalancesMode::Checked,
epochs_per_migration: crate::migrate::DEFAULT_EPOCHS_PER_MIGRATION,
}
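Note that the flag is not deleted outright: it migrates to the new `BeaconProcessorConfig` introduced in `beacon_node/beacon_processor/src/lib.rs` below. A minimal sketch of setting it at its new home — the surrounding wiring is hypothetical, not code from this diff:

// Hypothetical sketch: the backfill rate-limiting toggle now lives on
// `BeaconProcessorConfig` rather than `ChainConfig`.
let config = BeaconProcessorConfig {
    enable_backfill_rate_limiting: false, // e.g. driven by a CLI flag
    ..BeaconProcessorConfig::default()
};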
4 changes: 3 additions & 1 deletion beacon_node/beacon_processor/Cargo.toml
@@ -21,4 +21,6 @@ types = { path = "../../consensus/types" }
ethereum_ssz = "0.5.0"
lazy_static = "1.4.0"
lighthouse_metrics = { path = "../../common/lighthouse_metrics" }
parking_lot = "0.12.0"
num_cpus = "1.13.0"
serde = { version = "1.0.116", features = ["derive"] }
127 changes: 115 additions & 12 deletions beacon_node/beacon_processor/src/lib.rs
@@ -48,6 +48,7 @@ use lighthouse_network::NetworkGlobals;
use lighthouse_network::{MessageId, PeerId};
use logging::TimeLatch;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use slog::{crit, debug, error, trace, warn, Logger};
use slot_clock::SlotClock;
use std::cmp;
@@ -70,7 +71,7 @@ pub mod work_reprocessing_queue;
/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
/// Setting this too low will cause consensus messages to be dropped.
pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
const DEFAULT_MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;

/// The maximum size of the channel for idle events to the `BeaconProcessor`.
///
Expand All @@ -79,7 +80,7 @@ pub const MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;
const MAX_IDLE_QUEUE_LEN: usize = 16_384;

/// The maximum size of the channel for re-processing work events.
pub const MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * MAX_WORK_EVENT_QUEUE_LEN / 4;
const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4;

/// The maximum number of queued `Attestation` objects that will be stored before we start dropping
/// them.
@@ -167,6 +168,14 @@ const MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN: usize = 16_384;
/// will be stored before we start dropping them.
const MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN: usize = 1_024;

/// The maximum number of priority-0 (highest priority) messages that will be queued before
/// they begin to be dropped.
const MAX_API_REQUEST_P0_QUEUE_LEN: usize = 1_024;

/// The maximum number of priority-1 (second-highest priority) messages that will be queued before
/// they begin to be dropped.
const MAX_API_REQUEST_P1_QUEUE_LEN: usize = 1_024;

/// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";

@@ -184,8 +193,8 @@ const WORKER_TASK_NAME: &str = "beacon_processor_worker";
/// Poisoning occurs when an invalid signature is included in a batch of attestations. A single
/// invalid signature causes the entire batch to fail. When a batch fails, we fall-back to
/// individually verifying each attestation signature.
const MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64;
const MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;
const DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64;
const DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;

/// Unique IDs used for metrics and testing.
pub const WORKER_FREED: &str = "worker_freed";
@@ -215,6 +224,61 @@ pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation";
pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate";
pub const UNKNOWN_LIGHT_CLIENT_UPDATE: &str = "unknown_light_client_update";
pub const GOSSIP_BLS_TO_EXECUTION_CHANGE: &str = "gossip_bls_to_execution_change";
pub const API_REQUEST_P0: &str = "api_request_p0";
pub const API_REQUEST_P1: &str = "api_request_p1";

#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct BeaconProcessorConfig {
pub max_workers: usize,
pub max_work_event_queue_len: usize,
pub max_scheduled_work_queue_len: usize,
pub max_gossip_attestation_batch_size: usize,
pub max_gossip_aggregate_batch_size: usize,
pub enable_backfill_rate_limiting: bool,
}

impl Default for BeaconProcessorConfig {
fn default() -> Self {
Self {
max_workers: cmp::max(1, num_cpus::get()),
max_work_event_queue_len: DEFAULT_MAX_WORK_EVENT_QUEUE_LEN,
max_scheduled_work_queue_len: DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN,
max_gossip_attestation_batch_size: DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
max_gossip_aggregate_batch_size: DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE,
enable_backfill_rate_limiting: true,
}
}
}

/// The channels necessary to instantiate a `BeaconProcessor`.
pub struct BeaconProcessorChannels<E: EthSpec> {
pub beacon_processor_tx: BeaconProcessorSend<E>,
pub beacon_processor_rx: mpsc::Receiver<WorkEvent<E>>,
pub work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
pub work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
}

impl<E: EthSpec> BeaconProcessorChannels<E> {
pub fn new(config: &BeaconProcessorConfig) -> Self {
let (beacon_processor_tx, beacon_processor_rx) =
mpsc::channel(config.max_scheduled_work_queue_len);
let (work_reprocessing_tx, work_reprocessing_rx) =
mpsc::channel(config.max_scheduled_work_queue_len);

Self {
beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx),
beacon_processor_rx,
work_reprocessing_rx,
work_reprocessing_tx,
}
}
}

impl<E: EthSpec> Default for BeaconProcessorChannels<E> {
fn default() -> Self {
Self::new(&BeaconProcessorConfig::default())
}
}

/// A simple first-in-first-out queue with a maximum length.
struct FifoQueue<T> {
@@ -363,7 +427,7 @@ impl<E: EthSpec> WorkEvent<E> {
}
}

impl<E: EthSpec> std::convert::From<ReadyWork> for WorkEvent<E> {
impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
fn from(ready_work: ReadyWork) -> Self {
match ready_work {
ReadyWork::Block(QueuedGossipBlock {
@@ -465,6 +529,10 @@ impl<E: EthSpec> BeaconProcessorSend<E> {
pub type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
pub type BlockingFn = Box<dyn FnOnce() + Send + Sync>;
pub type BlockingFnWithManualSendOnIdle = Box<dyn FnOnce(SendOnDrop) + Send + Sync>;
pub enum BlockingOrAsync {
Blocking(BlockingFn),
Async(AsyncFn),
}

/// Indicates the type of work to be performed and therefore its priority and
/// queuing specifics.
@@ -523,6 +591,8 @@ pub enum Work<E: EthSpec> {
BlocksByRootsRequest(BlockingFnWithManualSendOnIdle),
GossipBlsToExecutionChange(BlockingFn),
LightClientBootstrapRequest(BlockingFn),
ApiRequestP0(BlockingOrAsync),
ApiRequestP1(BlockingOrAsync),
}

impl<E: EthSpec> fmt::Debug for Work<E> {
@@ -560,6 +630,8 @@ impl<E: EthSpec> Work<E> {
Work::UnknownBlockAggregate { .. } => UNKNOWN_BLOCK_AGGREGATE,
Work::GossipBlsToExecutionChange(_) => GOSSIP_BLS_TO_EXECUTION_CHANGE,
Work::UnknownLightClientOptimisticUpdate { .. } => UNKNOWN_LIGHT_CLIENT_UPDATE,
Work::ApiRequestP0 { .. } => API_REQUEST_P0,
Work::ApiRequestP1 { .. } => API_REQUEST_P1,
}
}
}
@@ -638,7 +710,7 @@ pub struct BeaconProcessor<E: EthSpec> {
pub executor: TaskExecutor,
pub max_workers: usize,
pub current_workers: usize,
pub enable_backfill_rate_limiting: bool,
pub config: BeaconProcessorConfig,
pub log: Logger,
}

@@ -714,11 +786,13 @@ impl<E: EthSpec> BeaconProcessor<E> {

let mut lcbootstrap_queue = FifoQueue::new(MAX_LIGHT_CLIENT_BOOTSTRAP_QUEUE_LEN);

let mut api_request_p0_queue = FifoQueue::new(MAX_API_REQUEST_P0_QUEUE_LEN);
let mut api_request_p1_queue = FifoQueue::new(MAX_API_REQUEST_P1_QUEUE_LEN);

// Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
// receive them back once they are ready (`ready_work_rx`).
let (ready_work_tx, ready_work_rx) =
mpsc::channel::<ReadyWork>(MAX_SCHEDULED_WORK_QUEUE_LEN);

mpsc::channel::<ReadyWork>(self.config.max_scheduled_work_queue_len);
spawn_reprocess_scheduler(
ready_work_tx,
work_reprocessing_rx,
@@ -739,7 +813,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
reprocess_work_rx: ready_work_rx,
};

let enable_backfill_rate_limiting = self.enable_backfill_rate_limiting;
let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;

loop {
let work_event = match inbound_events.next().await {
@@ -850,12 +924,17 @@ impl<E: EthSpec> BeaconProcessor<E> {
// required to verify some attestations.
} else if let Some(item) = gossip_block_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check the priority 0 API requests after blocks, but before attestations.
} else if let Some(item) = api_request_p0_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check the aggregates, *then* the unaggregates since we assume that
// aggregates are more valuable to local validators and effectively give us
// more information with less signature verification time.
} else if aggregate_queue.len() > 0 {
let batch_size =
cmp::min(aggregate_queue.len(), MAX_GOSSIP_AGGREGATE_BATCH_SIZE);
let batch_size = cmp::min(
aggregate_queue.len(),
self.config.max_gossip_aggregate_batch_size,
);

if batch_size < 2 {
// One single aggregate is in the queue, process it individually.
@@ -914,7 +993,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
} else if attestation_queue.len() > 0 {
let batch_size = cmp::min(
attestation_queue.len(),
MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
self.config.max_gossip_attestation_batch_size,
);

if batch_size < 2 {
@@ -1005,6 +1084,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
self.spawn_worker(item, idle_tx);
} else if let Some(item) = gossip_bls_to_execution_change_queue.pop() {
self.spawn_worker(item, idle_tx);
// Check the priority 1 API requests after we've
// processed all the interesting things from the network
// and things required for us to stay in good repute
// with our P2P peers.
} else if let Some(item) = api_request_p1_queue.pop() {
self.spawn_worker(item, idle_tx);
// Handle backfill sync chain segments.
} else if let Some(item) = backfill_chain_segment.pop() {
self.spawn_worker(item, idle_tx);
@@ -1127,6 +1212,12 @@ impl<E: EthSpec> BeaconProcessor<E> {
Work::UnknownLightClientOptimisticUpdate { .. } => {
unknown_light_client_update_queue.push(work, work_id, &self.log)
}
Work::ApiRequestP0 { .. } => {
api_request_p0_queue.push(work, work_id, &self.log)
}
Work::ApiRequestP1 { .. } => {
api_request_p1_queue.push(work, work_id, &self.log)
}
}
}
}
@@ -1183,6 +1274,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
&metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_QUEUE_TOTAL,
gossip_bls_to_execution_change_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL,
api_request_p0_queue.len() as i64,
);
metrics::set_gauge(
&metrics::BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL,
api_request_p1_queue.len() as i64,
);

if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
error!(
@@ -1299,6 +1398,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
task_spawner.spawn_blocking_with_manual_send_idle(work)
}
Work::ChainSegmentBackfill(process_fn) => task_spawner.spawn_async(process_fn),
Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
},
Work::GossipVoluntaryExit(process_fn)
| Work::GossipProposerSlashing(process_fn)
| Work::GossipAttesterSlashing(process_fn)
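To make the new `ApiRequestP0`/`ApiRequestP1` variants concrete, here is a minimal usage sketch — not code from this diff. The `beacon_processor_send` handle and the response plumbing are assumptions, and the `WorkEvent` field names follow the existing struct:

// Hypothetical sketch: enqueue an async HTTP handler as priority-0 work.
let process_fn: AsyncFn = Box::pin(async move {
    // ... run the handler and send the result down a oneshot channel ...
});
let event = WorkEvent {
    drop_during_sync: false,
    work: Work::ApiRequestP0(BlockingOrAsync::Async(process_fn)),
};
if beacon_processor_send.try_send(event).is_err() {
    // Queue full or processor gone; the API layer would map this to an
    // error response (e.g. HTTP 503).
}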
9 changes: 9 additions & 0 deletions beacon_node/beacon_processor/src/metrics.rs
@@ -100,6 +100,15 @@ lazy_static::lazy_static! {
"beacon_processor_sync_contribution_queue_total",
"Count of sync committee contributions waiting to be processed."
);
// HTTP API requests.
pub static ref BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_api_request_p0_queue_total",
"Count of P0 HTTP requesets waiting to be processed."
);
pub static ref BEACON_PROCESSOR_API_REQUEST_P1_QUEUE_TOTAL: Result<IntGauge> = try_create_int_gauge(
"beacon_processor_api_request_p1_queue_total",
"Count of P1 HTTP requesets waiting to be processed."
);

/*
* Attestation reprocessing queue metrics.
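A small sketch of reading the new gauges back, e.g. in a test — assuming the `prometheus` `IntGauge` API re-exported by `lighthouse_metrics`; the assertion context is hypothetical:

// Hypothetical sketch: assert the P0 queue gauge is empty.
if let Ok(gauge) = &*BEACON_PROCESSOR_API_REQUEST_P0_QUEUE_TOTAL {
    assert_eq!(gauge.get(), 0);
}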