diff --git a/validator_client/src/http_api/mod.rs b/validator_client/src/http_api/mod.rs index c65beb7390a..1509ffa2ec4 100644 --- a/validator_client/src/http_api/mod.rs +++ b/validator_client/src/http_api/mod.rs @@ -45,16 +45,13 @@ use task_executor::TaskExecutor; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use types::{ChainSpec, ConfigAndPreset, EthSpec}; use validator_dir::Builder as ValidatorDirBuilder; +use warp::reply::Reply; +use warp::reply::Response; use warp::{ - http::{ - header::{HeaderValue, CONTENT_TYPE}, - response::Response, - StatusCode, - }, + http::header::{HeaderValue, CONTENT_TYPE}, sse::Event, Filter, }; - #[derive(Debug)] pub enum Error { Warp(warp::Error), @@ -271,7 +268,7 @@ pub fn serve( .and(warp::path("version")) .and(warp::path::end()) .and(signer.clone()) - .and_then(|signer| { + .then(|signer| { blocking_signed_json_task(signer, move || { Ok(api_types::GenericResponse::from(api_types::VersionData { version: version_with_platform(), @@ -284,7 +281,7 @@ pub fn serve( .and(warp::path("health")) .and(warp::path::end()) .and(signer.clone()) - .and_then(|signer| { + .then(|signer| { blocking_signed_json_task(signer, move || { eth2::lighthouse::Health::observe() .map(api_types::GenericResponse::from) @@ -298,7 +295,7 @@ pub fn serve( .and(warp::path::end()) .and(spec_filter.clone()) .and(signer.clone()) - .and_then(|spec: Arc<_>, signer| { + .then(|spec: Arc<_>, signer| { blocking_signed_json_task(signer, move || { let config = ConfigAndPreset::from_chain_spec::(&spec, None); Ok(api_types::GenericResponse::from(config)) @@ -311,7 +308,7 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and_then(|validator_store: Arc>, signer| { + .then(|validator_store: Arc>, signer| { blocking_signed_json_task(signer, move || { let validators = validator_store .initialized_validators() @@ -336,7 +333,7 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, validator_store: Arc>, signer| { blocking_signed_json_task(signer, move || { let validator = validator_store @@ -371,7 +368,7 @@ pub fn serve( .and(app_start_filter) .and(validator_dir_filter.clone()) .and(signer.clone()) - .and_then(|sysinfo, app_start: std::time::Instant, val_dir, signer| { + .then(|sysinfo, app_start: std::time::Instant, val_dir, signer| { blocking_signed_json_task(signer, move || { let app_uptime = app_start.elapsed().as_secs(); Ok(api_types::GenericResponse::from(observe_system_health_vc( @@ -389,7 +386,7 @@ pub fn serve( .and(graffiti_flag_filter) .and(signer.clone()) .and(log_filter.clone()) - .and_then( + .then( |validator_store: Arc>, graffiti_file: Option, graffiti_flag: Option, @@ -427,7 +424,7 @@ pub fn serve( .and(spec_filter.clone()) .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( move |body: Vec, validator_dir: PathBuf, secrets_dir: PathBuf, @@ -474,7 +471,7 @@ pub fn serve( .and(spec_filter) .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( move |body: api_types::CreateValidatorsMnemonicRequest, validator_dir: PathBuf, secrets_dir: PathBuf, @@ -523,7 +520,7 @@ pub fn serve( .and(validator_store_filter.clone()) .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( move |body: api_types::KeystoreValidatorsPostRequest, validator_dir: PathBuf, secrets_dir: PathBuf, @@ -609,7 +606,7 @@ pub fn serve( .and(validator_store_filter.clone()) .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( |body: Vec, validator_store: Arc>, signer, @@ -662,7 +659,7 @@ pub fn serve( .and(graffiti_file_filter.clone()) .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, body: api_types::ValidatorPatchRequest, validator_store: Arc>, @@ -732,7 +729,7 @@ pub fn serve( let get_auth = get_auth .and(signer.clone()) .and(api_token_path_filter) - .and_then(|signer, token_path: PathBuf| { + .then(|signer, token_path: PathBuf| { blocking_signed_json_task(signer, move || { Ok(AuthResponse { token_path: token_path.display().to_string(), @@ -749,7 +746,7 @@ pub fn serve( .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( move |request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { if allow_keystore_export { @@ -776,7 +773,7 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, validator_store: Arc>, signer| { blocking_signed_json_task(signer, move || { if validator_store @@ -816,7 +813,7 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, request: api_types::UpdateFeeRecipientRequest, validator_store: Arc>, @@ -856,7 +853,7 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, validator_store: Arc>, signer| { blocking_signed_json_task(signer, move || { if validator_store @@ -893,7 +890,7 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, validator_store: Arc>, signer| { blocking_signed_json_task(signer, move || { if validator_store @@ -925,7 +922,7 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, request: api_types::UpdateGasLimitRequest, validator_store: Arc>, @@ -965,7 +962,7 @@ pub fn serve( .and(warp::path::end()) .and(validator_store_filter.clone()) .and(signer.clone()) - .and_then( + .then( |validator_pubkey: PublicKey, validator_store: Arc>, signer| { blocking_signed_json_task(signer, move || { if validator_store @@ -1006,7 +1003,7 @@ pub fn serve( .and(log_filter.clone()) .and(signer.clone()) .and(task_executor_filter.clone()) - .and_then( + .then( |pubkey: PublicKey, query: api_types::VoluntaryExitQuery, validator_store: Arc>, @@ -1043,7 +1040,7 @@ pub fn serve( .and(validator_store_filter.clone()) .and(graffiti_flag_filter) .and(signer.clone()) - .and_then( + .then( |pubkey: PublicKey, validator_store: Arc>, graffiti_flag: Option, @@ -1068,7 +1065,7 @@ pub fn serve( .and(validator_store_filter.clone()) .and(graffiti_file_filter.clone()) .and(signer.clone()) - .and_then( + .then( |pubkey: PublicKey, query: SetGraffitiRequest, validator_store: Arc>, @@ -1096,7 +1093,7 @@ pub fn serve( .and(validator_store_filter.clone()) .and(graffiti_file_filter.clone()) .and(signer.clone()) - .and_then( + .then( |pubkey: PublicKey, validator_store: Arc>, graffiti_file: Option, @@ -1118,7 +1115,7 @@ pub fn serve( let get_std_keystores = std_keystores .and(signer.clone()) .and(validator_store_filter.clone()) - .and_then(|signer, validator_store: Arc>| { + .then(|signer, validator_store: Arc>| { blocking_signed_json_task(signer, move || Ok(keystores::list(validator_store))) }); @@ -1131,7 +1128,7 @@ pub fn serve( .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then( + .then( move |request, signer, validator_dir, @@ -1160,7 +1157,7 @@ pub fn serve( .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, task_executor, log| { + .then(|request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { keystores::delete(request, validator_store, task_executor, log) }) @@ -1170,7 +1167,7 @@ pub fn serve( let get_std_remotekeys = std_remotekeys .and(signer.clone()) .and(validator_store_filter.clone()) - .and_then(|signer, validator_store: Arc>| { + .then(|signer, validator_store: Arc>| { blocking_signed_json_task(signer, move || Ok(remotekeys::list(validator_store))) }); @@ -1181,7 +1178,7 @@ pub fn serve( .and(validator_store_filter.clone()) .and(task_executor_filter.clone()) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, task_executor, log| { + .then(|request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { remotekeys::import(request, validator_store, task_executor, log) }) @@ -1194,7 +1191,7 @@ pub fn serve( .and(validator_store_filter) .and(task_executor_filter) .and(log_filter.clone()) - .and_then(|request, signer, validator_store, task_executor, log| { + .then(|request, signer, validator_store, task_executor, log| { blocking_signed_json_task(signer, move || { remotekeys::delete(request, validator_store, task_executor, log) }) @@ -1316,41 +1313,62 @@ pub fn serve( Ok((listening_socket, server)) } +/// Convert a warp `Rejection` into a `Response`. +/// +/// This function should *always* be used to convert rejections into responses. This prevents warp +/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404 +pub async fn convert_rejection(e: warp::Rejection, signature: String) -> Response { + let mut resp = warp_utils::reject::handle_rejection(e) + .await + .into_response(); // It's infallible + let header_value = HeaderValue::from_str(&signature).expect("hash can be encoded as header"); + resp.headers_mut().append("Signature", header_value); + resp +} + /// Executes `func` in blocking tokio task (i.e., where long-running tasks are permitted). /// JSON-encodes the return value of `func`, using the `signer` function to produce a signature of /// those bytes. -pub async fn blocking_signed_json_task( - signer: S, - func: F, -) -> Result +pub async fn blocking_signed_json_task(signer: S, func: F) -> Response where S: Fn(&[u8]) -> String, F: FnOnce() -> Result + Send + 'static, T: Serialize + Send + 'static, { - warp_utils::task::blocking_task(func) - .await - .map(|func_output| { - let mut response = match serde_json::to_vec(&func_output) { + match warp_utils::task::blocking_task(func).await { + Ok(blocked_task) => { + match serde_json::to_vec(&blocked_task) { Ok(body) => { - let mut res = Response::new(body); - res.headers_mut() + let signature = signer(&body); + let mut response = Response::new(body.into()); + response + .headers_mut() .insert(CONTENT_TYPE, HeaderValue::from_static("application/json")); - res + let header_value = + HeaderValue::from_str(&signature).expect("hash can be encoded as header"); + response.headers_mut().append("Signature", header_value); + response } - Err(_) => Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(vec![]) - .expect("can produce simple response from static values"), - }; - - let body: &Vec = response.body(); - let signature = signer(body); - let header_value = - HeaderValue::from_str(&signature).expect("hash can be encoded as header"); - - response.headers_mut().append("Signature", header_value); - - response - }) + Err(_) => { + let data = "error producing response from blocking task"; + let body = warp::reply::json(&data); // This does not implement the trait serialize + let byte_slice: &[u8] = data.as_bytes(); + let mut rejection = + warp::reply::with_status(body, eth2::StatusCode::INTERNAL_SERVER_ERROR) + .into_response(); + let signature = signer(byte_slice); + let header_value = + HeaderValue::from_str(&signature).expect("hash can be encoded as header"); + rejection.headers_mut().append("Signature", header_value); + rejection + } + } + } + Err(rejection) => { + let data = "error in blocking task"; + let byte_slice: &[u8] = data.as_bytes(); + let signature = signer(byte_slice); + convert_rejection(rejection, signature).await + } + } }