Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Handle removed logs in filter changes and add geth compatibility field #8796

Merged
merged 11 commits into from
Jun 13, 2018
2 changes: 1 addition & 1 deletion rpc/src/v1/helpers/poll_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub enum PollFilter {
/// Hashes of all transactions which client was notified about.
PendingTransaction(Vec<H256>),
/// Number of From block number, pending logs and log filter itself.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add documentation for this new parameter: "last seen block hash"?

Logs(BlockNumber, HashSet<Log>, Filter)
Logs(BlockNumber, Option<H256>, HashSet<Log>, Filter)
}

/// Returns only last `n` logs
Expand Down
60 changes: 53 additions & 7 deletions rpc/src/v1/impls/eth_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub trait Filterable {
fn best_block_number(&self) -> u64;

/// Get a block hash by block id.
fn block_hash(&self, id: BlockId) -> Option<RpcH256>;
fn block_hash(&self, id: BlockId) -> Option<H256>;

/// pending transaction hashes at the given block.
fn pending_transactions_hashes(&self) -> Vec<H256>;
Expand All @@ -52,6 +52,9 @@ pub trait Filterable {

/// Get a reference to the poll manager.
fn polls(&self) -> &Mutex<PollManager<PollFilter>>;

/// Get removed logs within route from the given block to the nearest canon block, not including the canon block. Also returns how many logs have been traversed.
fn canon_logs(&self, block_hash: H256, filter: &EthcoreFilter) -> (Vec<Log>, u64);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rename this to something like removed_logs or reorged_logs? Something other than canon_logs since we're returning logs that are not canon.

}

/// Eth filter rpc implementation for a full node.
Expand Down Expand Up @@ -80,8 +83,8 @@ impl<C, M> Filterable for EthFilterClient<C, M> where
self.client.chain_info().best_block_number
}

fn block_hash(&self, id: BlockId) -> Option<RpcH256> {
self.client.block_hash(id).map(Into::into)
fn block_hash(&self, id: BlockId) -> Option<H256> {
self.client.block_hash(id)
}

fn pending_transactions_hashes(&self) -> Vec<H256> {
Expand All @@ -100,13 +103,47 @@ impl<C, M> Filterable for EthFilterClient<C, M> where
}

fn polls(&self) -> &Mutex<PollManager<PollFilter>> { &self.polls }

fn canon_logs(&self, block_hash: H256, filter: &EthcoreFilter) -> (Vec<Log>, u64) {
let inner = || -> Option<Vec<H256>> {
let mut route = Vec::new();

let mut current_block_hash = block_hash;
let mut current_block_header = self.client.block_header(BlockId::Hash(current_block_hash))?;

while current_block_hash != self.client.block_hash(BlockId::Number(current_block_header.number()))? {
route.push(current_block_hash);

current_block_hash = current_block_header.parent_hash();
current_block_header = self.client.block_header(BlockId::Hash(current_block_hash))?;
}

Some(route)
};

let route = inner().unwrap_or_default();
let route_len = route.len() as u64;
(route.into_iter().flat_map(|block_hash| {
let mut filter = filter.clone();
filter.from_block = BlockId::Hash(block_hash);
filter.to_block = filter.from_block;

self.client.logs(filter).into_iter().map(|log| {
let mut log: Log = log.into();
log.log_type = "removed".into();
log.removed = true;

log
})
}).collect(), route_len)
}
}

impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
fn new_filter(&self, filter: Filter) -> Result<RpcU256> {
let mut polls = self.polls().lock();
let block_number = self.best_block_number();
let id = polls.create_poll(PollFilter::Logs(block_number, Default::default(), filter));
let id = polls.create_poll(PollFilter::Logs(block_number, None, Default::default(), filter));
Ok(id.into())
}

Expand Down Expand Up @@ -134,7 +171,7 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
let current_number = self.best_block_number() + 1;
let hashes = (*block_number..current_number).into_iter()
.map(BlockId::Number)
.filter_map(|id| self.block_hash(id))
.filter_map(|id| self.block_hash(id).map(Into::into))
.collect::<Vec<RpcH256>>();

*block_number = current_number;
Expand Down Expand Up @@ -164,7 +201,7 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
// return new hashes
Either::A(future::ok(FilterChanges::Hashes(new_hashes)))
},
PollFilter::Logs(ref mut block_number, ref mut previous_logs, ref filter) => {
PollFilter::Logs(ref mut block_number, ref mut last_block_hash, ref mut previous_logs, ref filter) => {
// retrive the current block number
let current_number = self.best_block_number();

Expand All @@ -176,6 +213,10 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
filter.from_block = BlockId::Number(*block_number);
filter.to_block = BlockId::Latest;

// retrieve reorg logs
let (mut reorg, reorg_len) = last_block_hash.map_or_else(|| (Vec::new(), 0), |h| self.canon_logs(h, &filter));
*block_number -= reorg_len as u64;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't affect the filter that's used below when getting pending logs. And it's overridden afterwards with current + 1.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake. Filter from_block should be set after we alter block_number.


// retrieve pending logs
let pending = if include_pending {
let pending_logs = self.pending_logs(current_number, &filter);
Expand All @@ -198,9 +239,14 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
// we want to get logs
*block_number = current_number + 1;

// save the current block hash, which we used to get back to the
// canon chain in case of reorg.
*last_block_hash = self.block_hash(BlockId::Number(current_number));

// retrieve logs in range from_block..min(BlockId::Latest..to_block)
let limit = filter.limit;
Either::B(self.logs(filter)
.map(move |logs| { reorg.extend(logs); reorg }) // append reorg logs in the front
.map(move |mut logs| { logs.extend(pending); logs }) // append fetched pending logs
.map(move |logs| limit_logs(logs, limit)) // limit the logs
.map(FilterChanges::Logs))
Expand All @@ -214,7 +260,7 @@ impl<T: Filterable + Send + Sync + 'static> EthFilter for T {
let mut polls = self.polls().lock();

match polls.poll(&index.value()) {
Some(&PollFilter::Logs(ref _block_number, ref _previous_log, ref filter)) => filter.clone(),
Some(&PollFilter::Logs(ref _block_number, ref _last_block_hash, ref _previous_log, ref filter)) => filter.clone(),
// just empty array
Some(_) => return Box::new(future::ok(Vec::new())),
None => return Box::new(future::err(errors::filter_not_found())),
Expand Down
1 change: 1 addition & 0 deletions rpc/src/v1/impls/eth_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ impl<C: BlockChainClient> ChainNotify for ChainNotificationHandler<C> {
&ChainRouteType::Retracted =>
Ok(self.client.logs(filter).into_iter().map(Into::into).map(|mut log: Log| {
log.log_type = "removed".into();
log.removed = true;
log
}).collect()),
}
Expand Down
8 changes: 6 additions & 2 deletions rpc/src/v1/impls/light/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,8 @@ impl<T: LightChainClient + 'static> Eth for EthClient<T> {
impl<T: LightChainClient + 'static> Filterable for EthClient<T> {
fn best_block_number(&self) -> u64 { self.client.chain_info().best_block_number }

fn block_hash(&self, id: BlockId) -> Option<RpcH256> {
self.client.block_hash(id).map(Into::into)
fn block_hash(&self, id: BlockId) -> Option<::ethereum_types::H256> {
self.client.block_hash(id)
}

fn pending_transactions_hashes(&self) -> Vec<::ethereum_types::H256> {
Expand All @@ -548,6 +548,10 @@ impl<T: LightChainClient + 'static> Filterable for EthClient<T> {
fn polls(&self) -> &Mutex<PollManager<PollFilter>> {
&self.polls
}

fn canon_logs(&self, _block_hash: ::ethereum_types::H256, _filter: &EthcoreFilter) -> (Vec<Log>, u64) {
(Default::default(), 0)
}
}

fn extract_uncle_at_index<T: LightChainClient>(block: encoded::Block, index: Index, client: Arc<T>) -> Option<RichBlock> {
Expand Down
10 changes: 5 additions & 5 deletions rpc/src/v1/tests/mocked/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ fn rpc_eth_logs() {
let request2 = r#"{"jsonrpc": "2.0", "method": "eth_getLogs", "params": [{"limit":1}], "id": 1}"#;
let request3 = r#"{"jsonrpc": "2.0", "method": "eth_getLogs", "params": [{"limit":0}], "id": 1}"#;

let response1 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x0","topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"},{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response2 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response1 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x0","removed":false,"topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"},{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","removed":false,"topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response2 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","removed":false,"topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response3 = r#"{"jsonrpc":"2.0","result":[],"id":1}"#;

assert_eq!(tester.io.handle_request_sync(request1), Some(response1.to_owned()));
Expand Down Expand Up @@ -274,8 +274,8 @@ fn rpc_logs_filter() {

let request_changes1 = r#"{"jsonrpc": "2.0", "method": "eth_getFilterChanges", "params": ["0x0"], "id": 1}"#;
let request_changes2 = r#"{"jsonrpc": "2.0", "method": "eth_getFilterChanges", "params": ["0x1"], "id": 1}"#;
let response1 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x0","topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"},{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response2 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response1 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x0","removed":false,"topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"},{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","removed":false,"topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;
let response2 = r#"{"jsonrpc":"2.0","result":[{"address":"0x0000000000000000000000000000000000000000","blockHash":"0x0000000000000000000000000000000000000000000000000000000000000000","blockNumber":"0x1","data":"0x010203","logIndex":"0x1","removed":false,"topics":[],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x1","type":"mined"}],"id":1}"#;

assert_eq!(tester.io.handle_request_sync(request_changes1), Some(response1.to_owned()));
assert_eq!(tester.io.handle_request_sync(request_changes2), Some(response2.to_owned()));
Expand Down Expand Up @@ -1043,7 +1043,7 @@ fn rpc_eth_transaction_receipt() {
"params": ["0xb903239f8543d04b5dc1ba6579132b143087c68db1b2168786408fcbce568238"],
"id": 1
}"#;
let response = r#"{"jsonrpc":"2.0","result":{"blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","contractAddress":null,"cumulativeGasUsed":"0x20","from":"0xb60e8dd61c5d32be8058bb8eb970870f07233155","gasUsed":"0x10","logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","data":"0x","logIndex":"0x1","topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"}],"logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","root":"0x0000000000000000000000000000000000000000000000000000000000000000","status":null,"to":"0xd46e8dd67c5d32be8058bb8eb970870f07244567","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0"},"id":1}"#;
let response = r#"{"jsonrpc":"2.0","result":{"blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","contractAddress":null,"cumulativeGasUsed":"0x20","from":"0xb60e8dd61c5d32be8058bb8eb970870f07233155","gasUsed":"0x10","logs":[{"address":"0x33990122638b9132ca29c723bdf037f1a891a70c","blockHash":"0xed76641c68a1c641aee09a94b3b471f4dc0316efe5ac19cf488e2674cf8d05b5","blockNumber":"0x4510c","data":"0x","logIndex":"0x1","removed":false,"topics":["0xa6697e974e6a320f454390be03f74955e8978f1a6971ea6730542e37b66179bc","0x4861736852656700000000000000000000000000000000000000000000000000"],"transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"}],"logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","root":"0x0000000000000000000000000000000000000000000000000000000000000000","status":null,"to":"0xd46e8dd67c5d32be8058bb8eb970870f07244567","transactionHash":"0x0000000000000000000000000000000000000000000000000000000000000000","transactionIndex":"0x0"},"id":1}"#;

assert_eq!(tester.io.handle_request_sync(request), Some(response.to_owned()));
}
Expand Down
4 changes: 2 additions & 2 deletions rpc/src/v1/tests/mocked/eth_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,15 @@ fn should_subscribe_to_logs() {
// Check notifications (enacted)
handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Enacted)]), vec![], vec![], DURATION_ZERO);
let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","removed":false,"topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
+ &format!("0x{:x}", tx_hash)
+ r#"","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"mined"},"subscription":"0x416d77337e24399d"}}"#;
assert_eq!(res, Some(response.into()));

// Check notifications (retracted)
handler.new_blocks(vec![], vec![], ChainRoute::new(vec![(h1, ChainRouteType::Retracted)]), vec![], vec![], DURATION_ZERO);
let (res, receiver) = receiver.into_future().wait().unwrap();
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
let response = r#"{"jsonrpc":"2.0","method":"eth_subscription","params":{"result":{"address":"0x0000000000000000000000000000000000000005","blockHash":"0x3457d2fa2e3dd33c78ac681cf542e429becf718859053448748383af67e23218","blockNumber":"0x1","data":"0x","logIndex":"0x0","removed":true,"topics":["0x0000000000000000000000000000000000000000000000000000000000000001","0x0000000000000000000000000000000000000000000000000000000000000002","0x0000000000000000000000000000000000000000000000000000000000000000","0x0000000000000000000000000000000000000000000000000000000000000000"],"transactionHash":""#.to_owned()
+ &format!("0x{:x}", tx_hash)
+ r#"","transactionIndex":"0x0","transactionLogIndex":"0x0","type":"removed"},"subscription":"0x416d77337e24399d"}}"#;
assert_eq!(res, Some(response.into()));
Expand Down
Loading