Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(fix): HTTP error handling in VC API #4606

Closed
wants to merge 29 commits into from
Closed
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5577b03
(fix): HTTP error handling in VC API
protocolwhisper Aug 10, 2023
db932da
Merge branch 'sigp:unstable' into unstable
protocolwhisper Aug 17, 2023
edecf5a
(rebase): HTTP error handling in VC API
protocolwhisper Aug 18, 2023
24fa06b
(rebase) : Not working
protocolwhisper Aug 19, 2023
166f0f5
Revert "(rebase) : Not working"
protocolwhisper Aug 19, 2023
09a0eb6
(rebase) : Http vc api
protocolwhisper Aug 20, 2023
802e7ea
(rebase): block_signed_task , blocking_task
protocolwhisper Aug 21, 2023
a27b18e
(Fix) : Rejection handling in VC API
protocolwhisper Aug 22, 2023
f05c3fb
(rebase) : Fix lint
protocolito Aug 28, 2023
b608d2e
Revert "(rebase) : Fix lint"
protocolito Aug 28, 2023
195b9b3
(rebase): Fix Lint
protocolwhisper Aug 28, 2023
e6fa5ce
Merge branch 'sigp:unstable' into unstable
protocolwhisper Aug 28, 2023
918f2ec
Merge branch 'sigp:unstable' into unstable
protocolwhisper Oct 3, 2023
669c7ea
(rebase) : Convert Rejection into signed Response
protocolwhisper Oct 4, 2023
66578b6
(rebase) : Improve rejection error handling
protocolwhisper Oct 8, 2023
34d07e5
(rebase): Improving rejection error handling
protocolwhisper Oct 8, 2023
19f7096
Merge branch 'sigp:unstable' into unstable
protocolwhisper Oct 8, 2023
1516ff7
(rebase): Fixing comments
protocolwhisper Oct 8, 2023
9759a88
Merge branch 'sigp:unstable' into unstable
protocolwhisper Oct 20, 2023
3271d4a
Merge branch 'sigp:unstable' into unstable
protocolwhisper Oct 31, 2023
52bdc5b
Merge branch 'sigp:unstable' into unstable
protocolwhisper Nov 5, 2023
f6671a1
Merge branch 'sigp:unstable' into unstable
protocolwhisper Nov 9, 2023
f77a5a0
Merge branch 'sigp:unstable' into unstable
protocolwhisper Nov 21, 2023
8e36053
(rebase) : empty lines
protocolwhisper Dec 17, 2023
6db95ae
Merge branch 'sigp:unstable' into unstable
protocolwhisper Jan 18, 2024
d590201
(rebase) : Changed output to Response<body>
protocolwhisper Jan 18, 2024
dd1149d
(fix): logic & error handling
protocolwhisper Jan 26, 2024
891e718
Run cargo fmt
dapplion Apr 2, 2024
b45c97e
Clean doc
dapplion Apr 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 80 additions & 62 deletions validator_client/src/http_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,13 @@ use task_executor::TaskExecutor;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{ChainSpec, ConfigAndPreset, EthSpec};
use validator_dir::Builder as ValidatorDirBuilder;
use warp::reply::Reply;
use warp::reply::Response;
use warp::{
http::{
header::{HeaderValue, CONTENT_TYPE},
response::Response,
StatusCode,
},
http::header::{HeaderValue, CONTENT_TYPE},
sse::Event,
Filter,
};

#[derive(Debug)]
pub enum Error {
Warp(warp::Error),
Expand Down Expand Up @@ -271,7 +268,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("version"))
.and(warp::path::end())
.and(signer.clone())
.and_then(|signer| {
.then(|signer| {
blocking_signed_json_task(signer, move || {
Ok(api_types::GenericResponse::from(api_types::VersionData {
version: version_with_platform(),
Expand All @@ -284,7 +281,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("health"))
.and(warp::path::end())
.and(signer.clone())
.and_then(|signer| {
.then(|signer| {
blocking_signed_json_task(signer, move || {
eth2::lighthouse::Health::observe()
.map(api_types::GenericResponse::from)
Expand All @@ -298,7 +295,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(spec_filter.clone())
.and(signer.clone())
.and_then(|spec: Arc<_>, signer| {
.then(|spec: Arc<_>, signer| {
blocking_signed_json_task(signer, move || {
let config = ConfigAndPreset::from_chain_spec::<E>(&spec, None);
Ok(api_types::GenericResponse::from(config))
Expand All @@ -311,7 +308,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(|validator_store: Arc<ValidatorStore<T, E>>, signer| {
.then(|validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
let validators = validator_store
.initialized_validators()
Expand All @@ -336,7 +333,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
let validator = validator_store
Expand Down Expand Up @@ -371,7 +368,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(app_start_filter)
.and(validator_dir_filter.clone())
.and(signer.clone())
.and_then(|sysinfo, app_start: std::time::Instant, val_dir, signer| {
.then(|sysinfo, app_start: std::time::Instant, val_dir, signer| {
blocking_signed_json_task(signer, move || {
let app_uptime = app_start.elapsed().as_secs();
Ok(api_types::GenericResponse::from(observe_system_health_vc(
Expand All @@ -389,7 +386,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(graffiti_flag_filter)
.and(signer.clone())
.and(log_filter.clone())
.and_then(
.then(
|validator_store: Arc<ValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
graffiti_flag: Option<Graffiti>,
Expand Down Expand Up @@ -427,7 +424,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(spec_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
move |body: Vec<api_types::ValidatorRequest>,
validator_dir: PathBuf,
secrets_dir: PathBuf,
Expand Down Expand Up @@ -474,7 +471,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(spec_filter)
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
move |body: api_types::CreateValidatorsMnemonicRequest,
validator_dir: PathBuf,
secrets_dir: PathBuf,
Expand Down Expand Up @@ -523,7 +520,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
move |body: api_types::KeystoreValidatorsPostRequest,
validator_dir: PathBuf,
secrets_dir: PathBuf,
Expand Down Expand Up @@ -609,7 +606,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
|body: Vec<api_types::Web3SignerValidatorRequest>,
validator_store: Arc<ValidatorStore<T, E>>,
signer,
Expand Down Expand Up @@ -662,7 +659,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(graffiti_file_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
|validator_pubkey: PublicKey,
body: api_types::ValidatorPatchRequest,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -732,7 +729,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_auth = get_auth
.and(signer.clone())
.and(api_token_path_filter)
.and_then(|signer, token_path: PathBuf| {
.then(|signer, token_path: PathBuf| {
blocking_signed_json_task(signer, move || {
Ok(AuthResponse {
token_path: token_path.display().to_string(),
Expand All @@ -749,7 +746,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
move |request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
if allow_keystore_export {
Expand All @@ -776,7 +773,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
if validator_store
Expand Down Expand Up @@ -816,7 +813,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey,
request: api_types::UpdateFeeRecipientRequest,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -856,7 +853,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
if validator_store
Expand Down Expand Up @@ -893,7 +890,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
if validator_store
Expand Down Expand Up @@ -925,7 +922,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey,
request: api_types::UpdateGasLimitRequest,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -965,7 +962,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
if validator_store
Expand Down Expand Up @@ -1006,7 +1003,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(log_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
|pubkey: PublicKey,
query: api_types::VoluntaryExitQuery,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -1043,7 +1040,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(graffiti_flag_filter)
.and(signer.clone())
.and_then(
.then(
|pubkey: PublicKey,
validator_store: Arc<ValidatorStore<T, E>>,
graffiti_flag: Option<Graffiti>,
Expand All @@ -1068,7 +1065,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(graffiti_file_filter.clone())
.and(signer.clone())
.and_then(
.then(
|pubkey: PublicKey,
query: SetGraffitiRequest,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -1096,7 +1093,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(graffiti_file_filter.clone())
.and(signer.clone())
.and_then(
.then(
|pubkey: PublicKey,
validator_store: Arc<ValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
Expand All @@ -1118,7 +1115,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_std_keystores = std_keystores
.and(signer.clone())
.and(validator_store_filter.clone())
.and_then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
.then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_signed_json_task(signer, move || Ok(keystores::list(validator_store)))
});

Expand All @@ -1131,7 +1128,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
move |request,
signer,
validator_dir,
Expand Down Expand Up @@ -1160,7 +1157,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| {
.then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
keystores::delete(request, validator_store, task_executor, log)
})
Expand All @@ -1170,7 +1167,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_std_remotekeys = std_remotekeys
.and(signer.clone())
.and(validator_store_filter.clone())
.and_then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
.then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_signed_json_task(signer, move || Ok(remotekeys::list(validator_store)))
});

Expand All @@ -1181,7 +1178,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| {
.then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
remotekeys::import(request, validator_store, task_executor, log)
})
Expand All @@ -1194,7 +1191,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter)
.and(task_executor_filter)
.and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| {
.then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
remotekeys::delete(request, validator_store, task_executor, log)
})
protocolwhisper marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -1316,41 +1313,62 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
Ok((listening_socket, server))
}

/// Convert a warp `Rejection` into a `Response`.
///
/// This function should *always* be used to convert rejections into responses. This prevents warp
/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404
pub async fn convert_rejection(e: warp::Rejection, signature: String) -> Response {
let mut resp = warp_utils::reject::handle_rejection(e)
.await
.into_response(); // It's infallible
let header_value = HeaderValue::from_str(&signature).expect("hash can be encoded as header");
resp.headers_mut().append("Signature", header_value);
resp
}

/// Executes `func` in blocking tokio task (i.e., where long-running tasks are permitted).
/// JSON-encodes the return value of `func`, using the `signer` function to produce a signature of
/// those bytes.
pub async fn blocking_signed_json_task<S, F, T>(
signer: S,
func: F,
) -> Result<impl warp::Reply, warp::Rejection>
pub async fn blocking_signed_json_task<S, F, T>(signer: S, func: F) -> Response
where
S: Fn(&[u8]) -> String,
F: FnOnce() -> Result<T, warp::Rejection> + Send + 'static,
T: Serialize + Send + 'static,
{
warp_utils::task::blocking_task(func)
.await
.map(|func_output| {
let mut response = match serde_json::to_vec(&func_output) {
match warp_utils::task::blocking_task(func).await {
Ok(blocked_task) => {
match serde_json::to_vec(&blocked_task) {
Ok(body) => {
let mut res = Response::new(body);
res.headers_mut()
let signature = signer(&body);
let mut response = Response::new(body.into());
response
.headers_mut()
.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
res
let header_value =
HeaderValue::from_str(&signature).expect("hash can be encoded as header");
response.headers_mut().append("Signature", header_value);
response
}
Err(_) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(vec![])
.expect("can produce simple response from static values"),
};

let body: &Vec<u8> = response.body();
let signature = signer(body);
let header_value =
HeaderValue::from_str(&signature).expect("hash can be encoded as header");

response.headers_mut().append("Signature", header_value);

response
})
Err(_) => {
let data = "error producing response from blocking task";
let body = warp::reply::json(&data); // This does not implement the trait serialize
let byte_slice: &[u8] = data.as_bytes();
let mut rejection =
warp::reply::with_status(body, eth2::StatusCode::INTERNAL_SERVER_ERROR)
.into_response();
let signature = signer(byte_slice);
let header_value =
HeaderValue::from_str(&signature).expect("hash can be encoded as header");
rejection.headers_mut().append("Signature", header_value);
rejection
}
}
}
Err(rejection) => {
let data = "error in blocking task";
let byte_slice: &[u8] = data.as_bytes();
let signature = signer(byte_slice);
convert_rejection(rejection, signature).await
}
}
}
Loading