Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(fix): HTTP error handling in VC API #4606

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5577b03
(fix): HTTP error handling in VC API
protocolwhisper Aug 10, 2023
db932da
Merge branch 'sigp:unstable' into unstable
protocolwhisper Aug 17, 2023
edecf5a
(rebase): HTTP error handling in VC API
protocolwhisper Aug 18, 2023
24fa06b
(rebase) : Not working
protocolwhisper Aug 19, 2023
166f0f5
Revert "(rebase) : Not working"
protocolwhisper Aug 19, 2023
09a0eb6
(rebase) : Http vc api
protocolwhisper Aug 20, 2023
802e7ea
(rebase): block_signed_task , blocking_task
protocolwhisper Aug 21, 2023
a27b18e
(Fix) : Rejection handling in VC API
protocolwhisper Aug 22, 2023
f05c3fb
(rebase) : Fix lint
protocolito Aug 28, 2023
b608d2e
Revert "(rebase) : Fix lint"
protocolito Aug 28, 2023
195b9b3
(rebase): Fix Lint
protocolwhisper Aug 28, 2023
e6fa5ce
Merge branch 'sigp:unstable' into unstable
protocolwhisper Aug 28, 2023
918f2ec
Merge branch 'sigp:unstable' into unstable
protocolwhisper Oct 3, 2023
669c7ea
(rebase) : Convert Rejection into signed Response
protocolwhisper Oct 4, 2023
66578b6
(rebase) : Improve rejection error handling
protocolwhisper Oct 8, 2023
34d07e5
(rebase): Improving rejection error handling
protocolwhisper Oct 8, 2023
19f7096
Merge branch 'sigp:unstable' into unstable
protocolwhisper Oct 8, 2023
1516ff7
(rebase): Fixing comments
protocolwhisper Oct 8, 2023
9759a88
Merge branch 'sigp:unstable' into unstable
protocolwhisper Oct 20, 2023
3271d4a
Merge branch 'sigp:unstable' into unstable
protocolwhisper Oct 31, 2023
52bdc5b
Merge branch 'sigp:unstable' into unstable
protocolwhisper Nov 5, 2023
f6671a1
Merge branch 'sigp:unstable' into unstable
protocolwhisper Nov 9, 2023
f77a5a0
Merge branch 'sigp:unstable' into unstable
protocolwhisper Nov 21, 2023
8e36053
(rebase) : empty lines
protocolwhisper Dec 17, 2023
6db95ae
Merge branch 'sigp:unstable' into unstable
protocolwhisper Jan 18, 2024
d590201
(rebase) : Changed output to Response<body>
protocolwhisper Jan 18, 2024
dd1149d
(fix): logic & error handling
protocolwhisper Jan 26, 2024
891e718
Run cargo fmt
dapplion Apr 2, 2024
b45c97e
Clean doc
dapplion Apr 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion common/warp_utils/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ where
.await
.unwrap_or_else(|_| Err(warp::reject::reject()))
}

protocolwhisper marked this conversation as resolved.
Show resolved Hide resolved
/// A convenience wrapper around `blocking_task` that returns a `warp::reply::Response`.
///
/// Using this method consistently makes it possible to simplify types using `.unify()` or `.uor()`.
Expand Down
124 changes: 75 additions & 49 deletions validator_client/src/http_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use task_executor::TaskExecutor;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use types::{ChainSpec, ConfigAndPreset, EthSpec};
use validator_dir::Builder as ValidatorDirBuilder;
use warp::reply::Reply;
use warp::{
http::{
header::{HeaderValue, CONTENT_TYPE},
Expand Down Expand Up @@ -265,7 +266,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("version"))
.and(warp::path::end())
.and(signer.clone())
.and_then(|signer| {
.then(|signer| {
blocking_signed_json_task(signer, move || {
Ok(api_types::GenericResponse::from(api_types::VersionData {
version: version_with_platform(),
Expand All @@ -278,7 +279,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path("health"))
.and(warp::path::end())
.and(signer.clone())
.and_then(|signer| {
.then(|signer| {
blocking_signed_json_task(signer, move || {
eth2::lighthouse::Health::observe()
.map(api_types::GenericResponse::from)
Expand All @@ -292,7 +293,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(spec_filter.clone())
.and(signer.clone())
.and_then(|spec: Arc<_>, signer| {
.then(|spec: Arc<_>, signer| {
blocking_signed_json_task(signer, move || {
let config = ConfigAndPreset::from_chain_spec::<E>(&spec, None);
Ok(api_types::GenericResponse::from(config))
Expand All @@ -305,7 +306,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(|validator_store: Arc<ValidatorStore<T, E>>, signer| {
.then(|validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
let validators = validator_store
.initialized_validators()
Expand All @@ -330,7 +331,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
let validator = validator_store
Expand Down Expand Up @@ -365,7 +366,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(app_start_filter)
.and(validator_dir_filter.clone())
.and(signer.clone())
.and_then(|sysinfo, app_start: std::time::Instant, val_dir, signer| {
.then(|sysinfo, app_start: std::time::Instant, val_dir, signer| {
blocking_signed_json_task(signer, move || {
let app_uptime = app_start.elapsed().as_secs();
Ok(api_types::GenericResponse::from(observe_system_health_vc(
Expand All @@ -383,7 +384,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(graffiti_flag_filter)
.and(signer.clone())
.and(log_filter.clone())
.and_then(
.then(
|validator_store: Arc<ValidatorStore<T, E>>,
graffiti_file: Option<GraffitiFile>,
graffiti_flag: Option<Graffiti>,
Expand Down Expand Up @@ -421,7 +422,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(spec_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
move |body: Vec<api_types::ValidatorRequest>,
validator_dir: PathBuf,
secrets_dir: PathBuf,
Expand Down Expand Up @@ -468,7 +469,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(spec_filter)
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
move |body: api_types::CreateValidatorsMnemonicRequest,
validator_dir: PathBuf,
secrets_dir: PathBuf,
Expand Down Expand Up @@ -517,7 +518,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
move |body: api_types::KeystoreValidatorsPostRequest,
validator_dir: PathBuf,
secrets_dir: PathBuf,
Expand Down Expand Up @@ -603,7 +604,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
|body: Vec<api_types::Web3SignerValidatorRequest>,
validator_store: Arc<ValidatorStore<T, E>>,
signer,
Expand Down Expand Up @@ -656,7 +657,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(graffiti_file_filter)
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
|validator_pubkey: PublicKey,
body: api_types::ValidatorPatchRequest,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -726,7 +727,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_auth = get_auth
.and(signer.clone())
.and(api_token_path_filter)
.and_then(|signer, token_path: PathBuf| {
.then(|signer, token_path: PathBuf| {
blocking_signed_json_task(signer, move || {
Ok(AuthResponse {
token_path: token_path.display().to_string(),
Expand All @@ -743,7 +744,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
move |request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
if allow_keystore_export {
Expand All @@ -770,7 +771,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
if validator_store
Expand Down Expand Up @@ -810,7 +811,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey,
request: api_types::UpdateFeeRecipientRequest,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -850,7 +851,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
if validator_store
Expand Down Expand Up @@ -887,7 +888,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
if validator_store
Expand Down Expand Up @@ -919,7 +920,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey,
request: api_types::UpdateGasLimitRequest,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -959,7 +960,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(warp::path::end())
.and(validator_store_filter.clone())
.and(signer.clone())
.and_then(
.then(
|validator_pubkey: PublicKey, validator_store: Arc<ValidatorStore<T, E>>, signer| {
blocking_signed_json_task(signer, move || {
if validator_store
Expand Down Expand Up @@ -1000,7 +1001,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(log_filter.clone())
.and(signer.clone())
.and(task_executor_filter.clone())
.and_then(
.then(
|pubkey: PublicKey,
query: api_types::VoluntaryExitQuery,
validator_store: Arc<ValidatorStore<T, E>>,
Expand Down Expand Up @@ -1032,7 +1033,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_std_keystores = std_keystores
.and(signer.clone())
.and(validator_store_filter.clone())
.and_then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
.then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_signed_json_task(signer, move || Ok(keystores::list(validator_store)))
});

Expand All @@ -1045,7 +1046,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(
.then(
move |request,
signer,
validator_dir,
Expand Down Expand Up @@ -1074,7 +1075,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| {
.then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
keystores::delete(request, validator_store, task_executor, log)
})
Expand All @@ -1084,7 +1085,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
let get_std_remotekeys = std_remotekeys
.and(signer.clone())
.and(validator_store_filter.clone())
.and_then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
.then(|signer, validator_store: Arc<ValidatorStore<T, E>>| {
blocking_signed_json_task(signer, move || Ok(remotekeys::list(validator_store)))
});

Expand All @@ -1095,7 +1096,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter.clone())
.and(task_executor_filter.clone())
.and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| {
.then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
remotekeys::import(request, validator_store, task_executor, log)
})
Expand All @@ -1108,7 +1109,7 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
.and(validator_store_filter)
.and(task_executor_filter)
.and(log_filter.clone())
.and_then(|request, signer, validator_store, task_executor, log| {
.then(|request, signer, validator_store, task_executor, log| {
blocking_signed_json_task(signer, move || {
remotekeys::delete(request, validator_store, task_executor, log)
})
protocolwhisper marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -1227,22 +1228,18 @@ pub fn serve<T: 'static + SlotClock + Clone, E: EthSpec>(
Ok((listening_socket, server))
}

/// Executes `func` in blocking tokio task (i.e., where long-running tasks are permitted).
/// JSON-encodes the return value of `func`, using the `signer` function to produce a signature of
/// those bytes.
pub async fn blocking_signed_json_task<S, F, T>(
signer: S,
func: F,
) -> Result<impl warp::Reply, warp::Rejection>
// Convert a warp `Rejection` into a `Response`.
///
/// This function should *always* be used to convert rejections into responses. This prevents warp
/// from trying to backtrack in strange ways. See: https://github.com/sigp/lighthouse/issues/3404

pub async fn convert_rejection<T>(res: Result<T, warp::Rejection>) -> Response<Vec<u8>>
where
S: Fn(&[u8]) -> String,
F: FnOnce() -> Result<T, warp::Rejection> + Send + 'static,
T: Serialize + Send + 'static,
{
warp_utils::task::blocking_task(func)
.await
.map(|func_output| {
let mut response = match serde_json::to_vec(&func_output) {
match res {
Ok(func_output) => {
protocolwhisper marked this conversation as resolved.
Show resolved Hide resolved
let response = match serde_json::to_vec(&func_output) {
Ok(body) => {
let mut res = Response::new(body);
res.headers_mut()
Expand All @@ -1254,14 +1251,43 @@ where
.body(vec![])
.expect("can produce simple response from static values"),
};

let body: &Vec<u8> = response.body();
let signature = signer(body);
let header_value =
HeaderValue::from_str(&signature).expect("hash can be encoded as header");

response.headers_mut().append("Signature", header_value);

response
})
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is bit isn't really doing what the function is advertised to do; I think it probably falls under the responsibility of the blocking_signed_json_task function instead as it describes here:

/// Executes `func` in blocking tokio task (i.e., where long-running tasks are permitted).
/// JSON-encodes the return value of `func`, using the `signer` function to produce a signature of
/// those bytes.
pub async fn blocking_signed_json_task<S, F, T>(signer: S, func: F) -> Response<Vec<u8>>

Is it possible to move this block there to keep the convert_rejection only do what it's supposed to do?

Convert a warp Rejection into a Response.

If it's too much of an issue, we could consider renaming this function to something like convert_into_json_response?

Err(e) => {
let resp = warp_utils::reject::handle_rejection(e).await;
//Building the response with the Rejection
match resp {
Ok(reply) => {
let response = reply.into_response();
let (_parts, body) = response.into_parts();
let body_bytes = hyper::body::to_bytes(body).await.unwrap();
protocolwhisper marked this conversation as resolved.
Show resolved Hide resolved
Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(body_bytes.to_vec())
.expect("can't produce response from rejection")
protocolwhisper marked this conversation as resolved.
Show resolved Hide resolved
}
Err(_) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(vec![])
.expect("unhandled error in blocking task"),
protocolwhisper marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}
/// Executes `func` in blocking tokio task (i.e., where long-running tasks are permitted).
/// JSON-encodes the return value of `func`, using the `signer` function to produce a signature of
/// those bytes.
pub async fn blocking_signed_json_task<S, F, T>(signer: S, func: F) -> Response<Vec<u8>>
where
S: Fn(&[u8]) -> String,
F: FnOnce() -> Result<T, warp::Rejection> + Send + 'static,
T: Serialize + Send + 'static,
{
let response = warp_utils::task::blocking_task(func).await;
let mut conv_res = convert_rejection(response).await;
let body: &Vec<u8> = conv_res.body();
let signature = signer(body);
let header_value = HeaderValue::from_str(&signature).expect("hash can be encoded as header");
conv_res.headers_mut().append("Signature", header_value);
conv_res
}