Skip to content

Commit

Permalink
Upgrade to hyper v0.13; use async/await (#85)
Browse files Browse the repository at this point in the history
* Upgrade to hyper v0.13; use async/await

* Use tokio runtime to create a working `block_on` method

 - delete futures crate from Cargo.toml
 - add tokio as a dev dependency

* rpc: slightly better HTTP error handling

Adds a bogus error code for low-level HTTP errors, and at least captures
the error message from the `http` crate or `hyper`.
  • Loading branch information
tony-iqlusion authored and liamsi committed Dec 11, 2019
1 parent 0b9450d commit 0869914
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 105 deletions.
5 changes: 4 additions & 1 deletion tendermint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ circle-ci = { repository = "interchainio/tendermint-rs" }

[dependencies]
bytes = "0.4"
bytes_0_5 = { version = "0.5", package = "bytes" }
chrono = { version = "0.4", features = ["serde"] }
failure = "0.1"
hyper = { version = "0.10" }
http = "0.2"
hyper = "0.13"
prost-amino = { version = "0.4.0" }
prost-amino-derive = { version = "0.4.0" }
rand_os = { version = "0.1" }
Expand All @@ -53,3 +55,4 @@ ed25519-dalek = {version = "1.0.0-pre.3", features = ["rand"]}

[dev-dependencies]
serde_json = "1"
tokio = "0.2"
132 changes: 63 additions & 69 deletions tendermint/src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::{
rpc::{self, endpoint::*, Error, Response},
Genesis,
};
use bytes_0_5::buf::ext::BufExt;
use hyper::header;
use std::io::Read;

/// Tendermint RPC client.
///
Expand All @@ -20,139 +20,134 @@ pub struct Client {

impl Client {
/// Create a new Tendermint RPC client, connecting to the given address
pub fn new(address: &net::Address) -> Result<Self, Error> {
pub async fn new(address: &net::Address) -> Result<Self, Error> {
let client = Client {
address: address.clone(),
};
client.health()?;
client.health().await?;
Ok(client)
}

/// `/abci_info`: get information about the ABCI application.
pub fn abci_info(&self) -> Result<abci_info::AbciInfo, Error> {
Ok(self.perform(abci_info::Request)?.response)
pub async fn abci_info(&self) -> Result<abci_info::AbciInfo, Error> {
Ok(self.perform(abci_info::Request).await?.response)
}

/// `/abci_query`: query the ABCI application
pub fn abci_query<D>(
pub async fn abci_query(
&self,
path: Option<abci::Path>,
data: D,
data: impl Into<Vec<u8>>,
height: Option<Height>,
prove: bool,
) -> Result<abci_query::AbciQuery, Error>
where
D: Into<Vec<u8>>,
{
) -> Result<abci_query::AbciQuery, Error> {
Ok(self
.perform(abci_query::Request::new(path, data, height, prove))?
.perform(abci_query::Request::new(path, data, height, prove))
.await?
.response)
}

/// `/block`: get block at a given height.
pub fn block<H>(&self, height: H) -> Result<block::Response, Error>
where
H: Into<Height>,
{
self.perform(block::Request::new(height.into()))
pub async fn block(&self, height: impl Into<Height>) -> Result<block::Response, Error> {
self.perform(block::Request::new(height.into())).await
}

/// `/block`: get the latest block.
pub fn latest_block(&self) -> Result<block::Response, Error> {
self.perform(block::Request::default())
pub async fn latest_block(&self) -> Result<block::Response, Error> {
self.perform(block::Request::default()).await
}

/// `/block_results`: get ABCI results for a block at a particular height.
pub fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
pub async fn block_results<H>(&self, height: H) -> Result<block_results::Response, Error>
where
H: Into<Height>,
{
self.perform(block_results::Request::new(height.into()))
.await
}

/// `/block_results`: get ABCI results for the latest block.
pub fn latest_block_results(&self) -> Result<block_results::Response, Error> {
self.perform(block_results::Request::default())
pub async fn latest_block_results(&self) -> Result<block_results::Response, Error> {
self.perform(block_results::Request::default()).await
}

/// `/blockchain`: get block headers for `min` <= `height` <= `max`.
///
/// Block headers are returned in descending order (highest first).
///
/// Returns at most 20 items.
pub fn blockchain<H>(&self, min: H, max: H) -> Result<blockchain::Response, Error>
where
H: Into<Height>,
{
pub async fn blockchain(
&self,
min: impl Into<Height>,
max: impl Into<Height>,
) -> Result<blockchain::Response, Error> {
// TODO(tarcieri): return errors for invalid params before making request?
self.perform(blockchain::Request::new(min.into(), max.into()))
.await
}

/// `/broadcast_tx_async`: broadcast a transaction, returning immediately.
pub fn broadcast_tx_async(
pub async fn broadcast_tx_async(
&self,
tx: Transaction,
) -> Result<broadcast::tx_async::Response, Error> {
self.perform(broadcast::tx_async::Request::new(tx))
self.perform(broadcast::tx_async::Request::new(tx)).await
}

/// `/broadcast_tx_sync`: broadcast a transaction, returning the response
/// from `CheckTx`.
pub fn broadcast_tx_sync(
pub async fn broadcast_tx_sync(
&self,
tx: Transaction,
) -> Result<broadcast::tx_sync::Response, Error> {
self.perform(broadcast::tx_sync::Request::new(tx))
self.perform(broadcast::tx_sync::Request::new(tx)).await
}

/// `/broadcast_tx_sync`: broadcast a transaction, returning the response
/// from `CheckTx`.
pub fn broadcast_tx_commit(
pub async fn broadcast_tx_commit(
&self,
tx: Transaction,
) -> Result<broadcast::tx_commit::Response, Error> {
self.perform(broadcast::tx_commit::Request::new(tx))
self.perform(broadcast::tx_commit::Request::new(tx)).await
}

/// `/commit`: get block commit at a given height.
pub fn commit<H>(&self, height: H) -> Result<commit::Response, Error>
where
H: Into<Height>,
{
self.perform(commit::Request::new(height.into()))
pub async fn commit(&self, height: impl Into<Height>) -> Result<commit::Response, Error> {
self.perform(commit::Request::new(height.into())).await
}

/// `/commit`: get the latest block commit
pub fn latest_commit(&self) -> Result<commit::Response, Error> {
self.perform(commit::Request::default())
pub async fn latest_commit(&self) -> Result<commit::Response, Error> {
self.perform(commit::Request::default()).await
}

/// `/health`: get node health.
///
/// Returns empty result (200 OK) on success, no response in case of an error.
pub fn health(&self) -> Result<(), Error> {
self.perform(health::Request)?;
pub async fn health(&self) -> Result<(), Error> {
self.perform(health::Request).await?;
Ok(())
}

/// `/genesis`: get genesis file.
pub fn genesis(&self) -> Result<Genesis, Error> {
Ok(self.perform(genesis::Request)?.genesis)
pub async fn genesis(&self) -> Result<Genesis, Error> {
Ok(self.perform(genesis::Request).await?.genesis)
}

/// `/net_info`: obtain information about P2P and other network connections.
pub fn net_info(&self) -> Result<net_info::Response, Error> {
self.perform(net_info::Request)
pub async fn net_info(&self) -> Result<net_info::Response, Error> {
self.perform(net_info::Request).await
}

/// `/status`: get Tendermint status including node info, pubkey, latest
/// block hash, app hash, block height and time.
pub fn status(&self) -> Result<status::Response, Error> {
self.perform(status::Request)
pub async fn status(&self) -> Result<status::Response, Error> {
self.perform(status::Request).await
}

/// Perform a request against the RPC endpoint
pub fn perform<R>(&self, request: R) -> Result<R::Response, Error>
pub async fn perform<R>(&self, request: R) -> Result<R::Response, Error>
where
R: rpc::Request,
{
Expand All @@ -168,26 +163,25 @@ impl Client {
}
};

let mut headers = hyper::header::Headers::new();

// TODO(tarcieri): persistent connections
headers.set(header::Connection::close());
headers.set(header::ContentType::json());
headers.set(header::UserAgent("tendermint.rs RPC client".to_owned()));

let http_client = hyper::Client::new();

let mut res = http_client
.request(hyper::Post, &format!("http://{}:{}/", host, port))
.headers(headers)
.body(&request_body[..])
.send()
.map_err(Error::server_error)?;

let mut response_body = Vec::new();
res.read_to_end(&mut response_body)
.map_err(Error::server_error)?;

R::Response::from_json(&response_body)
let mut request = hyper::Request::builder()
.method("POST")
.uri(&format!("http://{}:{}/", host, port))
.body(hyper::Body::from(request_body.into_bytes()))?;

{
let headers = request.headers_mut();
headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap());
headers.insert(
header::USER_AGENT,
format!("tendermint.rs/{}", env!("CARGO_PKG_VERSION"))
.parse()
.unwrap(),
);
}

let http_client = hyper::Client::builder().keep_alive(true).build_http();
let response = http_client.request(request).await?;
let response_body = hyper::body::aggregate(response.into_body()).await?;
R::Response::from_reader(response_body.reader())
}
}
24 changes: 23 additions & 1 deletion tendermint/src/rpc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,23 @@ impl Error {
/// Create a new RPC error
pub fn new(code: Code, data: Option<String>) -> Error {
let message = code.to_string();

Error {
code,
message,
data,
}
}

/// Create a low-level HTTP error
pub fn http_error(message: impl Into<String>) -> Error {
Error {
code: Code::HttpError,
message: message.into(),
data: None,
}
}

/// Create a new invalid parameter error
pub fn invalid_params(data: &str) -> Error {
Error::new(Code::InvalidParams, Some(data.to_string()))
Expand Down Expand Up @@ -91,9 +101,15 @@ impl Fail for Error {
}
}

impl From<http::Error> for Error {
fn from(http_error: http::Error) -> Error {
Error::http_error(http_error.to_string())
}
}

impl From<hyper::Error> for Error {
fn from(hyper_error: hyper::Error) -> Error {
panic!("what am I supposed to do with this? {:?}", hyper_error);
Error::http_error(hyper_error.to_string())
}
}

Expand All @@ -103,6 +119,10 @@ impl From<hyper::Error> for Error {
/// <https://github.com/tendermint/tendermint/blob/master/rpc/lib/types/types.go>
#[derive(Copy, Clone, Debug, Eq, Fail, Hash, PartialEq, PartialOrd, Ord)]
pub enum Code {
/// Low-level HTTP error
#[fail(display = "HTTP error")]
HttpError,

/// Parse error i.e. invalid JSON (-32700)
#[fail(display = "Parse error. Invalid JSON")]
ParseError,
Expand Down Expand Up @@ -142,6 +162,7 @@ impl Code {
impl From<i32> for Code {
fn from(value: i32) -> Code {
match value {
0 => Code::HttpError,
-32700 => Code::ParseError,
-32600 => Code::InvalidRequest,
-32601 => Code::MethodNotFound,
Expand All @@ -156,6 +177,7 @@ impl From<i32> for Code {
impl From<Code> for i32 {
fn from(code: Code) -> i32 {
match code {
Code::HttpError => 0,
Code::ParseError => -32700,
Code::InvalidRequest => -32600,
Code::MethodNotFound => -32601,
Expand Down
11 changes: 7 additions & 4 deletions tendermint/src/rpc/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@

use super::{Error, Id, Version};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::io::Read;

/// JSONRPC responses
pub trait Response: Serialize + DeserializeOwned + Sized {
/// Parse a JSONRPC response from a JSON string
fn from_json<T>(response: T) -> Result<Self, Error>
where
T: AsRef<[u8]>,
{
fn from_string(response: impl AsRef<[u8]>) -> Result<Self, Error> {
let wrapper: Wrapper<Self> =
serde_json::from_slice(response.as_ref()).map_err(Error::parse_error)?;
wrapper.into_result()
}

/// Parse a JSONRPC response from an `io::Reader`
fn from_reader(reader: impl Read) -> Result<Self, Error> {
let wrapper: Wrapper<Self> = serde_json::from_reader(reader).map_err(Error::parse_error)?;
wrapper.into_result()
}
}
Expand Down
Loading

0 comments on commit 0869914

Please sign in to comment.