Make eth1 caching work with fast synced node #709

Merged · 26 commits · Dec 19, 2019

Changes from 18 commits

Commits
f82e275
Add functions to get deposit_count and deposit_root from deposit cache
pawanjay176 Dec 10, 2019
e331816
Fetch deposit root and deposit count from cache
pawanjay176 Dec 10, 2019
9fbc01c
Fix bugs
pawanjay176 Dec 11, 2019
365d6a2
Add test
pawanjay176 Dec 11, 2019
e3d0325
Compare deposit_count between the caching and http eth1 blocks
pawanjay176 Dec 11, 2019
a8e99da
Revert "Compare deposit_count between the caching and http eth1 blocks"
pawanjay176 Dec 11, 2019
bba7a5d
Fetch deposit cache using binary search instead of linear search
pawanjay176 Dec 13, 2019
3f4f88c
BlockCache waits till DepositCache is in sync
pawanjay176 Dec 13, 2019
1ec606c
Merge branch 'master' into eth1-fast-sync
pawanjay176 Dec 16, 2019
9f14df3
Truncate required_blocks in block_cache upto latest_processed_block i…
pawanjay176 Dec 16, 2019
69f49de
Clean up
pawanjay176 Dec 16, 2019
8d0b5db
Handled getting deposit count before deploying deposit contract
pawanjay176 Dec 16, 2019
ffda83f
More cleanup
pawanjay176 Dec 16, 2019
921dd4a
Remove calls to http get deposit/count
pawanjay176 Dec 16, 2019
cc246aa
Fix block cache tests
pawanjay176 Dec 17, 2019
754eaa2
Merge branch 'master' into eth1-fast-sync
pawanjay176 Dec 17, 2019
7706f8f
Minor changes
pawanjay176 Dec 17, 2019
e709f7e
Fix bootnode ports
pawanjay176 Dec 17, 2019
6378db8
Merge branch 'master' into eth1-fast-sync
pawanjay176 Dec 18, 2019
63c8616
Address some of Paul's comments
pawanjay176 Dec 18, 2019
0777c04
Optimize `get_deposit_root` by caching `DepositDataTree`
pawanjay176 Dec 18, 2019
39a4871
Fix comments and minor changes
pawanjay176 Dec 18, 2019
040a57d
Change eth1 default config parameters
pawanjay176 Dec 18, 2019
35ac316
Use `Vec` instead of `HashMap` to store `deposit_roots`
pawanjay176 Dec 18, 2019
5beb4c8
Minor renaming
pawanjay176 Dec 19, 2019
7132c3c
Merge branch 'master' into eth1-fast-sync
pawanjay176 Dec 19, 2019
74 changes: 73 additions & 1 deletion beacon_node/eth1/src/deposit_cache.rs
@@ -3,6 +3,8 @@ use eth2_hashing::hash;
use tree_hash::TreeHash;
use types::{Deposit, Hash256};

const DEPOSIT_CONTRACT_TREE_DEPTH: usize = 32;
Member:

This const tends to keep getting duplicated around, perhaps we can import this instead:

pub const DEPOSIT_TREE_DEPTH: usize = 32;


#[derive(Debug, PartialEq, Clone)]
pub enum Error {
/// A deposit log was added when a prior deposit was not already in the cache.
@@ -71,13 +73,34 @@ impl DepositDataTree {
/// Mirrors the merkle tree of deposits in the eth1 deposit contract.
///
/// Provides `Deposit` objects with merkle proofs included.
#[derive(Default)]
pub struct DepositCache {
logs: Vec<DepositLog>,
roots: Vec<Hash256>,
Member Author:

@paulhauner Perhaps this should be named leafs instead of roots?

Member:

Agreed

deposit_contract_deploy_block: u64,
}

impl Default for DepositCache {
fn default() -> Self {
DepositCache {
logs: Vec::new(),
roots: Vec::new(),
// 0 to be compatible with Service::Config. Should be ideally 1
Member:

Perhaps we change Service::Config? Your suggestion seems quite reasonable.

deposit_contract_deploy_block: 0,
}
}
}

impl DepositCache {
/// Create new `DepositCache` given block number at which deposit
/// contract was deployed.
pub fn new(deposit_contract_deploy_block: u64) -> Self {
DepositCache {
logs: Vec::new(),
roots: Vec::new(),
deposit_contract_deploy_block,
}
}

/// Returns the number of deposits available in the cache.
pub fn len(&self) -> usize {
self.logs.len()
@@ -203,6 +226,55 @@ impl DepositCache {
Ok((tree.root(), deposits))
}
}

/// Gets the deposit count at block height = block_number.
///
/// Fetches the `DepositLog` that was emitted at or just before `block_number`
/// and returns the deposit count as `index + 1`.
///
/// Returns `None` if the block number queried is 0 or is less than the `deposit_contract_deploy_block`.
pub fn get_deposit_count_from_cache(&self, block_number: u64) -> Option<u64> {
// Contract cannot be deployed in 0'th block
if block_number == 0 {
return None;
}
if block_number < self.deposit_contract_deploy_block {
return None;
}
// Return 0 if block_num queried is before first deposit
if let Some(first_deposit) = self.logs.first() {
if first_deposit.block_number > block_number {
return Some(0);
}
}
let index = self
.logs
.binary_search_by(|deposit| deposit.block_number.cmp(&block_number));
match index {
Ok(index) => return self.logs.get(index).map(|x| x.index + 1),
Err(prev) => {
Member:

Should this be next instead of prev?

Member Author:

Yup you are right

return Some(
self.logs
.get(prev.saturating_sub(1))
.map_or(0, |x| x.index + 1),
)
}
}
}
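A minimal standalone sketch of the `binary_search_by` behaviour that the `prev`/`next` exchange above refers to (illustrative only, not part of the diff; the slice and values are made up). On a miss, `Err` carries the insertion point, i.e. the index of the first element greater than the target, so the log in effect sits one position earlier:

```rust
fn main() {
    // Sorted block numbers at which deposit logs were emitted (made-up values).
    let block_numbers = [5u64, 8, 12];

    // Query a block number with no deposit log of its own.
    let query = 10u64;
    match block_numbers.binary_search_by(|b| b.cmp(&query)) {
        // Exact hit: `i` is the position of `query` itself.
        Ok(i) => println!("deposit log at index {}", i),
        // Miss: `next` is the insertion point, i.e. the index of the first
        // block number greater than `query` (here 2, the position of 12).
        // The log in effect at `query` is therefore the one at `next - 1`.
        Err(next) => {
            assert_eq!(next, 2);
            assert_eq!(block_numbers[next.saturating_sub(1)], 8);
        }
    }
}
```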

/// Gets the deposit root at block height = block_number.
///
/// Fetches the `DepositLog` that was emitted at or just before `block_number`
/// and returns the deposit root at that state.
///
/// Note: This method could be optimized by not recreating the `DepositDataTree`
/// at every invocation, instead caching the tree up to the last added deposit.
pub fn get_deposit_root_from_cache(&self, block_number: u64) -> Option<Hash256> {
let index = self.get_deposit_count_from_cache(block_number)?;
let roots = self.roots.get(0..index as usize)?;
let tree = DepositDataTree::create(roots, index as usize, DEPOSIT_CONTRACT_TREE_DEPTH);
paulhauner (Member), Dec 18, 2019:

As you mentioned, this is going to be fairly inefficient.

For our testnet, this means that if we sync a cache of 4,096 blocks we're going to have to find the root of a list of 16,384 leaves for each of those blocks. I just benched a tree hash root of 16k hashes at 8 ms, so we're looking at ~32 secs of hashing just to fill the cache.

I think a fairly easy solution to this would be to attach a DepositDataTree and a Vec<(u64, Hash256)> to the deposit cache. Each time we import a deposit we add it to the DepositDataTree and then push the new (block number, root) pair into the Vec. In this scenario we incrementally build the deposit tree once, and when we want to resolve a block number to a deposit root we can just binary-search the Vec.

Thoughts?

Member Author:

Yup, this sounds perfect 👍

Some(tree.root())
}
}
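A minimal sketch of the incremental approach agreed on above, under stated assumptions: the type and method names (`IncrementalCache`, `record`, `root_at_block`) are placeholders invented for illustration, roots are plain 32-byte arrays, and at most one deposit per block is assumed for brevity. The point is to record the running deposit root once per imported deposit and resolve a block number to a root with a binary search, rather than rebuilding and re-hashing the tree on every query:

```rust
/// Placeholder type for illustration; not the PR's actual implementation.
#[derive(Default)]
struct IncrementalCache {
    /// Running deposit root recorded after each imported deposit, keyed by
    /// the eth1 block number the deposit log came from. Kept sorted because
    /// deposits are imported in block order.
    roots_by_block: Vec<(u64, [u8; 32])>,
}

impl IncrementalCache {
    /// Called once per imported deposit. `root` is the deposit tree root
    /// *after* pushing this deposit's leaf into the incrementally built tree.
    fn record(&mut self, block_number: u64, root: [u8; 32]) {
        self.roots_by_block.push((block_number, root));
    }

    /// Resolves a block number to the deposit root in effect at that block
    /// with a binary search, instead of re-hashing the whole tree per query.
    fn root_at_block(&self, block_number: u64) -> Option<[u8; 32]> {
        match self
            .roots_by_block
            .binary_search_by(|(block, _)| block.cmp(&block_number))
        {
            // A deposit was logged at exactly this block.
            Ok(i) => Some(self.roots_by_block[i].1),
            // No deposit at this block: `next` points at the first entry
            // *after* it, so the root in effect is the entry just before.
            Err(next) => next.checked_sub(1).map(|i| self.roots_by_block[i].1),
        }
    }
}
```

The PR itself lands on a similar design in later commits (0777c04 caches the `DepositDataTree`; 35ac316 stores the deposit roots in a `Vec`).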

/// Returns `int` as little-endian bytes with a length of 32.
10 changes: 10 additions & 0 deletions beacon_node/eth1/src/inner.rs
@@ -8,6 +8,16 @@ pub struct DepositUpdater {
pub last_processed_block: Option<u64>,
}

impl DepositUpdater {
pub fn new(deposit_contract_deploy_block: u64) -> Self {
let cache = DepositCache::new(deposit_contract_deploy_block);
DepositUpdater {
cache,
last_processed_block: None,
}
}
}

#[derive(Default)]
pub struct Inner {
pub block_cache: RwLock<BlockCache>,
57 changes: 28 additions & 29 deletions beacon_node/eth1/src/service.rs
@@ -2,9 +2,7 @@ use crate::metrics;
use crate::{
block_cache::{BlockCache, Error as BlockCacheError, Eth1Block},
deposit_cache::Error as DepositCacheError,
http::{
get_block, get_block_number, get_deposit_count, get_deposit_logs_in_range, get_deposit_root,
},
http::{get_block, get_block_number, get_deposit_logs_in_range},
inner::{DepositUpdater, Inner},
DepositLog,
};
@@ -27,10 +25,6 @@ const STANDARD_TIMEOUT_MILLIS: u64 = 15_000;
const BLOCK_NUMBER_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_getBlockByNumber call.
const GET_BLOCK_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_call to read the deposit contract root.
const GET_DEPOSIT_ROOT_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_call to read the deposit contract deposit count.
const GET_DEPOSIT_COUNT_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;
/// Timeout when doing an eth_getLogs to read the deposit contract logs.
const GET_DEPOSIT_LOG_TIMEOUT_MILLIS: u64 = STANDARD_TIMEOUT_MILLIS;

@@ -147,6 +141,9 @@ impl Service {
pub fn new(config: Config, log: Logger) -> Self {
Self {
inner: Arc::new(Inner {
deposit_cache: RwLock::new(DepositUpdater::new(
config.deposit_contract_deploy_block,
)),
config: RwLock::new(config),
..Inner::default()
}),
@@ -254,6 +251,7 @@ impl Service {
"Updated eth1 deposit cache";
"cached_deposits" => inner_1.deposit_cache.read().cache.len(),
"logs_imported" => logs_imported,
"last_processed_eth1_block" => inner_1.deposit_cache.read().last_processed_block,
),
Err(e) => error!(
log_a,
@@ -491,6 +489,7 @@ impl Service {
let cache_3 = self.inner.clone();
let cache_4 = self.inner.clone();
let cache_5 = self.inner.clone();
let cache_6 = self.inner.clone();

let block_cache_truncation = self.config().block_cache_truncation;
let max_blocks_per_update = self
@@ -527,7 +526,6 @@ impl Service {
let max_size = block_cache_truncation
.map(|n| n as u64)
.unwrap_or_else(u64::max_value);

if range_size > max_size {
// If the range of required blocks is larger than `max_size`, drop all
// existing blocks and download `max_size` count of blocks.
@@ -543,14 +541,22 @@
})
// Download the range of blocks and sequentially import them into the cache.
.and_then(move |required_block_numbers| {
// Last processed block in deposit cache
let latest_in_cache = cache_6
.deposit_cache
.read()
.last_processed_block
.unwrap_or(0);

let required_block_numbers = required_block_numbers
.into_iter()
.take(max_blocks_per_update);

.filter(|x| *x <= latest_in_cache)
.take(max_blocks_per_update)
.collect::<Vec<_>>();
// Produce a stream from the list of required block numbers and return a future that
// consumes it.
stream::unfold(
required_block_numbers,
required_block_numbers.into_iter(),
move |mut block_numbers| match block_numbers.next() {
Some(block_number) => Some(
download_eth1_block(cache_2.clone(), block_number)
@@ -639,31 +645,24 @@ fn download_eth1_block<'a>(
cache: Arc<Inner>,
block_number: u64,
) -> impl Future<Item = Eth1Block, Error = Error> + 'a {
let deposit_root = cache
.deposit_cache
.read()
.cache
.get_deposit_root_from_cache(block_number);
let deposit_count = cache
.deposit_cache
.read()
.cache
.get_deposit_count_from_cache(block_number);
// Performs a `get_blockByNumber` call to an eth1 node.
get_block(
&cache.config.read().endpoint,
block_number,
Duration::from_millis(GET_BLOCK_TIMEOUT_MILLIS),
)
.map_err(Error::BlockDownloadFailed)
.join3(
// Perform 2x `eth_call` via an eth1 node to read the deposit contract root and count.
get_deposit_root(
&cache.config.read().endpoint,
&cache.config.read().deposit_contract_address,
block_number,
Duration::from_millis(GET_DEPOSIT_ROOT_TIMEOUT_MILLIS),
)
.map_err(Error::GetDepositRootFailed),
get_deposit_count(
&cache.config.read().endpoint,
&cache.config.read().deposit_contract_address,
block_number,
Duration::from_millis(GET_DEPOSIT_COUNT_TIMEOUT_MILLIS),
)
.map_err(Error::GetDepositCountFailed),
)
.map(|(http_block, deposit_root, deposit_count)| Eth1Block {
.map(move |http_block| Eth1Block {
hash: http_block.hash,
number: http_block.number,
timestamp: http_block.timestamp,
104 changes: 98 additions & 6 deletions beacon_node/eth1/tests/test.rs
@@ -133,7 +133,7 @@ mod auto_update {

// NOTE: this test is sensitive to the response speed of the external web3 server. If
// you're experiencing failures, try increasing the update_interval.
let update_interval = Duration::from_millis(2_000);
let update_interval = Duration::from_millis(3000);

assert_eq!(
service.block_cache_len(),
@@ -235,9 +235,12 @@ mod eth1_cache {
.expect("should mine block");
}

runtime
.block_on(service.update_deposit_cache())
.expect("should update deposit cache");
runtime
.block_on(service.update_block_cache())
.expect("should update cache");
.expect("should update block cache");

runtime
.block_on(service.update_block_cache())
@@ -294,9 +297,12 @@ mod eth1_cache {
.expect("should mine block")
}

runtime
.block_on(service.update_deposit_cache())
.expect("should update deposit cache");
runtime
.block_on(service.update_block_cache())
.expect("should update cache");
.expect("should update block cache");

assert_eq!(
service.block_cache_len(),
@@ -339,9 +345,12 @@ mod eth1_cache {
.block_on(eth1.ganache.evm_mine())
.expect("should mine block")
}
runtime
.block_on(service.update_deposit_cache())
.expect("should update deposit cache");
runtime
.block_on(service.update_block_cache())
.expect("should update cache");
.expect("should update block cache");
}

assert_eq!(
@@ -381,14 +390,20 @@ mod eth1_cache {
.block_on(eth1.ganache.evm_mine())
.expect("should mine block")
}

runtime
.block_on(
service
.update_deposit_cache()
.join(service.update_deposit_cache()),
)
.expect("should perform two simultaneous updates of deposit cache");
runtime
.block_on(
service
.update_block_cache()
.join(service.update_block_cache()),
)
.expect("should perform two simultaneous updates");
.expect("should perform two simultaneous updates of block cache");

assert!(service.block_cache_len() >= n, "should grow the cache");
}
@@ -711,3 +726,80 @@ mod http {
}
}
}

mod fast {
use super::*;

// Adds deposits into the deposit cache and compares the deposit count and root
// computed from the cache against the values queried directly from the eth1 node.
#[test]
fn deposit_cache_query() {
let mut env = new_env();
let log = env.core_context().log;
let runtime = env.runtime();

let eth1 = runtime
.block_on(GanacheEth1Instance::new())
.expect("should start eth1 environment");
let deposit_contract = &eth1.deposit_contract;
let web3 = eth1.web3();

let now = get_block_number(runtime, &web3);
let service = Service::new(
Config {
endpoint: eth1.endpoint(),
deposit_contract_address: deposit_contract.address(),
deposit_contract_deploy_block: now,
lowest_cached_block_number: now,
follow_distance: 0,
block_cache_truncation: None,
..Config::default()
},
log,
);
let n = 10;
let deposits: Vec<_> = (0..n).into_iter().map(|_| random_deposit_data()).collect();
for deposit in &deposits {
deposit_contract
.deposit(runtime, deposit.clone())
.expect("should perform a deposit");
// Mine an extra block between deposits to test for corner cases
runtime
.block_on(eth1.ganache.evm_mine())
.expect("should mine block");
}

runtime
.block_on(service.update_deposit_cache())
.expect("should perform update");

assert!(
service.deposit_cache_len() >= n,
"should have imported n deposits"
);

for block_num in 0..=get_block_number(runtime, &web3) {
let expected_deposit_count = blocking_deposit_count(runtime, &eth1, block_num);
let expected_deposit_root = blocking_deposit_root(runtime, &eth1, block_num);

let deposit_count = service
.deposits()
.read()
.cache
.get_deposit_count_from_cache(block_num);
let deposit_root = service
.deposits()
.read()
.cache
.get_deposit_root_from_cache(block_num);
assert_eq!(
expected_deposit_count, deposit_count,
"deposit count from cache should match queried"
);
assert_eq!(
expected_deposit_root, deposit_root,
"deposit root from cache should match queried"
);
}
}
}