Skip to content

Commit

Permalink
add notes about ratelimits
Browse files Browse the repository at this point in the history
  • Loading branch information
sslivkoff committed Jul 6, 2023
1 parent f0beae4 commit 555ef0d
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 143 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

*`cryo` is an early WIP, please report bugs + feedback to the issue tracker*

*note that `cryo`'s default settings will slam a node too hard for use with 3rd-party RPC providers. Instead, use `--requests-per-second` and `--max-concurrent-requests` to impose rate limits. Such settings will be handled automatically in a future release.*

## Example Usage

use as `cryo <dataset> [OPTIONS]`
Expand Down
102 changes: 55 additions & 47 deletions crates/freeze/src/datasets/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ async fn fetch_blocks(
.map_err(CollectError::ProviderError);
match tx.send(block).await {
Ok(_) => {}
Err(tokio::sync::mpsc::error::SendError(_e)) => println!("send error"),
Err(tokio::sync::mpsc::error::SendError(_e)) => {
eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
std::process::exit(1)
}
}
});
}
Expand Down Expand Up @@ -145,52 +148,57 @@ async fn blocks_to_df(
let mut base_fee_per_gas: Vec<Option<u64>> = Vec::with_capacity(capacity);

let mut n_rows = 0;
while let Some(Ok(Some(block))) = blocks.recv().await {
if let (Some(n), Some(h), Some(a)) = (block.number, block.hash, block.author) {
n_rows += 1;

if include_hash {
hash.push(h.as_bytes().to_vec());
}
if include_parent_hash {
parent_hash.push(block.parent_hash.as_bytes().to_vec());
}
if include_author {
author.push(a.as_bytes().to_vec());
}
if include_state_root {
state_root.push(block.state_root.as_bytes().to_vec());
}
if include_transactions_root {
transactions_root.push(block.transactions_root.as_bytes().to_vec());
}
if include_receipts_root {
receipts_root.push(block.receipts_root.as_bytes().to_vec());
}
if include_number {
number.push(n.as_u32())
}
if include_gas_used {
gas_used.push(block.gas_used.as_u32());
}
if include_extra_data {
extra_data.push(block.extra_data.to_vec());
}
if include_logs_bloom {
logs_bloom.push(block.logs_bloom.map(|x| x.0.to_vec()));
}
if include_timestamp {
timestamp.push(block.timestamp.as_u32());
}
if include_total_difficulty {
total_difficulty.push(block.total_difficulty.map(|x| x.to_vec_u8()));
}
if include_size {
size.push(block.size.map(|x| x.as_u32()));
}
if include_base_fee_per_gas {
base_fee_per_gas.push(block.base_fee_per_gas.map(|value| value.as_u64()));
}
while let Some(message) = blocks.recv().await {
match message {
Ok(Some(block)) => {
if let (Some(n), Some(h), Some(a)) = (block.number, block.hash, block.author) {
n_rows += 1;

if include_hash {
hash.push(h.as_bytes().to_vec());
}
if include_parent_hash {
parent_hash.push(block.parent_hash.as_bytes().to_vec());
}
if include_author {
author.push(a.as_bytes().to_vec());
}
if include_state_root {
state_root.push(block.state_root.as_bytes().to_vec());
}
if include_transactions_root {
transactions_root.push(block.transactions_root.as_bytes().to_vec());
}
if include_receipts_root {
receipts_root.push(block.receipts_root.as_bytes().to_vec());
}
if include_number {
number.push(n.as_u32())
}
if include_gas_used {
gas_used.push(block.gas_used.as_u32());
}
if include_extra_data {
extra_data.push(block.extra_data.to_vec());
}
if include_logs_bloom {
logs_bloom.push(block.logs_bloom.map(|x| x.0.to_vec()));
}
if include_timestamp {
timestamp.push(block.timestamp.as_u32());
}
if include_total_difficulty {
total_difficulty.push(block.total_difficulty.map(|x| x.to_vec_u8()));
}
if include_size {
size.push(block.size.map(|x| x.as_u32()));
}
if include_base_fee_per_gas {
base_fee_per_gas.push(block.base_fee_per_gas.map(|value| value.as_u64()));
}
}
}
_ => return Err(CollectError::TooManyRequestsError),
}
}

Expand Down
111 changes: 60 additions & 51 deletions crates/freeze/src/datasets/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ async fn fetch_logs(
.map_err(CollectError::ProviderError);
match tx.send(result).await {
Ok(_) => {}
Err(tokio::sync::mpsc::error::SendError(_e)) => println!("send error"),
Err(tokio::sync::mpsc::error::SendError(_e)) => {
eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
std::process::exit(1)
}
}
});
}
Expand All @@ -129,60 +132,66 @@ async fn logs_to_df(
let mut data: Vec<Vec<u8>> = Vec::new();

let mut n_rows = 0;
while let Some(Ok(logs)) = logs.recv().await {
for log in logs.iter() {
if let Some(true) = log.removed {
continue;
}
if let (Some(bn), Some(tx), Some(ti), Some(li)) = (
log.block_number,
log.transaction_hash,
log.transaction_index,
log.log_index,
) {
n_rows += 1;
address.push(log.address.as_bytes().to_vec());
match log.topics.len() {
0 => {
topic0.push(None);
topic1.push(None);
topic2.push(None);
topic3.push(None);
}
1 => {
topic0.push(Some(log.topics[0].as_bytes().to_vec()));
topic1.push(None);
topic2.push(None);
topic3.push(None);
}
2 => {
topic0.push(Some(log.topics[0].as_bytes().to_vec()));
topic1.push(Some(log.topics[1].as_bytes().to_vec()));
topic2.push(None);
topic3.push(None);
}
3 => {
topic0.push(Some(log.topics[0].as_bytes().to_vec()));
topic1.push(Some(log.topics[1].as_bytes().to_vec()));
topic2.push(Some(log.topics[2].as_bytes().to_vec()));
topic3.push(None);
}
4 => {
topic0.push(Some(log.topics[0].as_bytes().to_vec()));
topic1.push(Some(log.topics[1].as_bytes().to_vec()));
topic2.push(Some(log.topics[2].as_bytes().to_vec()));
topic3.push(Some(log.topics[3].as_bytes().to_vec()));
// while let Some(Ok(logs)) = logs.recv().await {
while let Some(message) = logs.recv().await {
match message {
Ok(logs) => {
for log in logs.iter() {
if let Some(true) = log.removed {
continue;
}
_ => {
return Err(CollectError::InvalidNumberOfTopics);
if let (Some(bn), Some(tx), Some(ti), Some(li)) = (
log.block_number,
log.transaction_hash,
log.transaction_index,
log.log_index,
) {
n_rows += 1;
address.push(log.address.as_bytes().to_vec());
match log.topics.len() {
0 => {
topic0.push(None);
topic1.push(None);
topic2.push(None);
topic3.push(None);
}
1 => {
topic0.push(Some(log.topics[0].as_bytes().to_vec()));
topic1.push(None);
topic2.push(None);
topic3.push(None);
}
2 => {
topic0.push(Some(log.topics[0].as_bytes().to_vec()));
topic1.push(Some(log.topics[1].as_bytes().to_vec()));
topic2.push(None);
topic3.push(None);
}
3 => {
topic0.push(Some(log.topics[0].as_bytes().to_vec()));
topic1.push(Some(log.topics[1].as_bytes().to_vec()));
topic2.push(Some(log.topics[2].as_bytes().to_vec()));
topic3.push(None);
}
4 => {
topic0.push(Some(log.topics[0].as_bytes().to_vec()));
topic1.push(Some(log.topics[1].as_bytes().to_vec()));
topic2.push(Some(log.topics[2].as_bytes().to_vec()));
topic3.push(Some(log.topics[3].as_bytes().to_vec()));
}
_ => {
return Err(CollectError::InvalidNumberOfTopics);
}
}
data.push(log.data.clone().to_vec());
block_number.push(bn.as_u32());
transaction_hash.push(tx.as_bytes().to_vec());
transaction_index.push(ti.as_u32());
log_index.push(li.as_u32());
}
}
data.push(log.data.clone().to_vec());
block_number.push(bn.as_u32());
transaction_hash.push(tx.as_bytes().to_vec());
transaction_index.push(ti.as_u32());
log_index.push(li.as_u32());
}
_ => return Err(CollectError::TooManyRequestsError),
}
}

Expand Down
34 changes: 24 additions & 10 deletions crates/freeze/src/datasets/state_diffs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ async fn state_diffs_to_df(

// storage
let include_storage_block_number = included(schemas, Datatype::StorageDiffs, "block_number");
let include_storage_transaction_index = included(schemas, Datatype::StorageDiffs, "transaction_index");
let include_storage_transaction_index =
included(schemas, Datatype::StorageDiffs, "transaction_index");
let include_storage_transaction_hash =
included(schemas, Datatype::StorageDiffs, "transaction_hash");
let include_storage_address = included(schemas, Datatype::StorageDiffs, "address");
Expand All @@ -113,7 +114,8 @@ async fn state_diffs_to_df(

// balance
let include_balance_block_number = included(schemas, Datatype::BalanceDiffs, "block_number");
let include_balance_transaction_index = included(schemas, Datatype::BalanceDiffs, "transaction_index");
let include_balance_transaction_index =
included(schemas, Datatype::BalanceDiffs, "transaction_index");
let include_balance_transaction_hash =
included(schemas, Datatype::BalanceDiffs, "transaction_hash");
let include_balance_address = included(schemas, Datatype::BalanceDiffs, "address");
Expand All @@ -128,7 +130,8 @@ async fn state_diffs_to_df(

// nonce
let include_nonce_block_number = included(schemas, Datatype::NonceDiffs, "block_number");
let include_nonce_transaction_index = included(schemas, Datatype::NonceDiffs, "transaction_index");
let include_nonce_transaction_index =
included(schemas, Datatype::NonceDiffs, "transaction_index");
let include_nonce_transaction_hash =
included(schemas, Datatype::NonceDiffs, "transaction_hash");
let include_nonce_address = included(schemas, Datatype::NonceDiffs, "address");
Expand All @@ -143,7 +146,8 @@ async fn state_diffs_to_df(

// code
let include_code_block_number = included(schemas, Datatype::CodeDiffs, "block_number");
let include_code_transaction_index = included(schemas, Datatype::CodeDiffs, "transaction_index");
let include_code_transaction_index =
included(schemas, Datatype::CodeDiffs, "transaction_index");
let include_code_transaction_hash = included(schemas, Datatype::CodeDiffs, "transaction_hash");
let include_code_address = included(schemas, Datatype::CodeDiffs, "address");
let include_code_from_value = included(schemas, Datatype::CodeDiffs, "from_value");
Expand All @@ -160,7 +164,9 @@ async fn state_diffs_to_df(
match message {
(block_num, Ok(blocks_traces)) => {
for ts in blocks_traces.iter() {
if let (Some(tx), Some(StateDiff(state_diff))) = (ts.transaction_hash, &ts.state_diff) {
if let (Some(tx), Some(StateDiff(state_diff))) =
(ts.transaction_hash, &ts.state_diff)
{
for (addr, addr_diff) in state_diff.iter() {
n_rows += n_rows;

Expand Down Expand Up @@ -233,7 +239,9 @@ async fn state_diffs_to_df(
Diff::Same => (0u64, 0u64),
Diff::Born(value) => (0u64, value.as_u64()),
Diff::Died(value) => (value.as_u64(), 0u64),
Diff::Changed(ChangedType { from, to }) => (from.as_u64(), to.as_u64()),
Diff::Changed(ChangedType { from, to }) => {
(from.as_u64(), to.as_u64())
}
};
if include_nonce_block_number {
nonce_block_number.push(block_num);
Expand Down Expand Up @@ -262,9 +270,15 @@ async fn state_diffs_to_df(
H256::zero().as_bytes().to_vec(),
H256::zero().as_bytes().to_vec(),
),
Diff::Born(value) => (H256::zero().as_bytes().to_vec(), value.to_vec()),
Diff::Died(value) => (value.to_vec(), H256::zero().as_bytes().to_vec()),
Diff::Changed(ChangedType { from, to }) => (from.to_vec(), to.to_vec()),
Diff::Born(value) => {
(H256::zero().as_bytes().to_vec(), value.to_vec())
}
Diff::Died(value) => {
(value.to_vec(), H256::zero().as_bytes().to_vec())
}
Diff::Changed(ChangedType { from, to }) => {
(from.to_vec(), to.to_vec())
}
};
if include_code_block_number {
code_block_number.push(block_num);
Expand All @@ -289,7 +303,7 @@ async fn state_diffs_to_df(
}
}
}
_ => { return Err(CollectError::TooManyRequestsError) }
_ => return Err(CollectError::TooManyRequestsError),
}
}

Expand Down
10 changes: 7 additions & 3 deletions crates/freeze/src/datasets/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ impl Dataset for Traces {
}

fn default_sort(&self) -> Vec<String> {
vec!["block_number".to_string(), "transaction_position".to_string()]
vec![
"block_number".to_string(),
"transaction_position".to_string(),
]
}

async fn collect_chunk(
Expand Down Expand Up @@ -256,7 +259,8 @@ async fn traces_to_df(
action_input.push(Some(a.input.to_vec()));
}
if include_action_call_type {
action_call_type.push(Some(action_call_type_to_string(&a.call_type)));
action_call_type
.push(Some(action_call_type_to_string(&a.call_type)));
}

if include_action_init {
Expand Down Expand Up @@ -430,7 +434,7 @@ async fn traces_to_df(
}
}
}
_ => { return Err(CollectError::TooManyRequestsError) }
_ => return Err(CollectError::TooManyRequestsError),
}
}

Expand Down
Loading

0 comments on commit 555ef0d

Please sign in to comment.