diff --git a/beacon_node/beacon_chain/src/events.rs b/beacon_node/beacon_chain/src/events.rs index fed05032374..b267cc853f8 100644 --- a/beacon_node/beacon_chain/src/events.rs +++ b/beacon_node/beacon_chain/src/events.rs @@ -21,8 +21,11 @@ pub struct ServerSentEventHandler { } impl ServerSentEventHandler { - pub fn new(log: Logger) -> Self { - Self::new_with_capacity(log, DEFAULT_CHANNEL_CAPACITY) + pub fn new(log: Logger, capacity_multiplier: usize) -> Self { + Self::new_with_capacity( + log, + capacity_multiplier.saturating_mul(DEFAULT_CHANNEL_CAPACITY), + ) } pub fn new_with_capacity(log: Logger, capacity: usize) -> Self { diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 44112271636..fc3e5f5cefe 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -159,7 +159,10 @@ where let context = runtime_context.service_context("beacon".into()); let spec = chain_spec.ok_or("beacon_chain_start_method requires a chain spec")?; let event_handler = if self.http_api_config.enabled { - Some(ServerSentEventHandler::new(context.log().clone())) + Some(ServerSentEventHandler::new( + context.log().clone(), + self.http_api_config.sse_capacity_multiplier, + )) } else { None }; diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 18c2df31d15..f041f5a3600 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -222,6 +222,7 @@ pub async fn create_api_server_on_port( allow_sync_stalled: false, data_dir: std::path::PathBuf::from(DEFAULT_ROOT_DIR), spec_fork_name: None, + sse_capacity_multiplier: 1, enable_beacon_processor: true, }, chain: Some(chain), diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 2b5c243b08b..30bb11d8d96 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -389,6 +389,15 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { stalled. This is useful for very small testnets. TESTING ONLY. DO NOT USE ON \ MAINNET.") ) + .arg( + Arg::with_name("http-sse-capacity-multiplier") + .long("http-sse-capacity-multiplier") + .takes_value(true) + .default_value("1") + .value_name("N") + .help("Multiplier to apply to the length of HTTP server-sent-event (SSE) channels. \ + Increasing this value can prevent messages from being dropped.") + ) .arg( Arg::with_name("http-enable-beacon-processor") .long("http-enable-beacon-processor") diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index 14858976730..90d1254607d 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -150,6 +150,9 @@ pub fn get_config( client_config.http_api.allow_sync_stalled = true; } + client_config.http_api.sse_capacity_multiplier = + parse_required(cli_args, "http-sse-capacity-multiplier")?; + client_config.http_api.enable_beacon_processor = parse_required(cli_args, "http-enable-beacon-processor")?; diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 14c35316399..be7e4a71be2 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -2388,3 +2388,18 @@ fn beacon_processor_zero_workers() { .flag("beacon-processor-max-workers", Some("0")) .run_with_zero_port(); } + +#[test] +fn http_sse_capacity_multiplier_default() { + CommandLineTest::new() + .run_with_zero_port() + .with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 1)); +} + +#[test] +fn http_sse_capacity_multiplier_override() { + CommandLineTest::new() + .flag("http-sse-capacity-multiplier", Some("10")) + .run_with_zero_port() + .with_config(|config| assert_eq!(config.http_api.sse_capacity_multiplier, 10)); +}