From 5003dd308a669effa4a268874c08962f9d19eb0f Mon Sep 17 00:00:00 2001 From: sslivkoff Date: Thu, 10 Aug 2023 19:05:02 -0700 Subject: [PATCH] add contracts and native_transfers datasets --- Cargo.lock | 32 ++- crates/cli/src/parse/query.rs | 2 + crates/freeze/Cargo.toml | 1 + crates/freeze/src/datasets/contracts.rs | 248 ++++++++++++++++++ crates/freeze/src/datasets/mod.rs | 2 + .../freeze/src/datasets/native_transfers.rs | 230 ++++++++++++++++ crates/freeze/src/datasets/traces.rs | 4 +- crates/freeze/src/types/datatypes/scalar.rs | 10 + 8 files changed, 521 insertions(+), 8 deletions(-) create mode 100644 crates/freeze/src/datasets/contracts.rs create mode 100644 crates/freeze/src/datasets/native_transfers.rs diff --git a/Cargo.lock b/Cargo.lock index ed38ff0f..06a1a495 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -715,8 +715,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e959d788268e3bf9d35ace83e81b124190378e4c91c9067524675e33394b8ba" dependencies = [ "crossterm", - "strum", - "strum_macros", + "strum 0.24.1", + "strum_macros 0.24.3", "unicode-width", ] @@ -868,6 +868,7 @@ version = "0.1.0" dependencies = [ "async-trait", "ethers", + "ethers-core", "futures", "governor", "indexmap 2.0.0", @@ -1302,9 +1303,9 @@ dependencies = [ [[package]] name = "ethers-core" -version = "2.0.7" +version = "2.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6da5fa198af0d3be20c19192df2bd9590b92ce09a8421e793bec8851270f1b05" +checksum = "60ca2514feb98918a0a31de7e1983c29f2267ebf61b2dc5d4294f91e5b866623" dependencies = [ "arrayvec", "bytes", @@ -1322,7 +1323,7 @@ dependencies = [ "rlp", "serde", "serde_json", - "strum", + "strum 0.25.0", "syn 2.0.23", "tempfile", "thiserror", @@ -4052,8 +4053,14 @@ name = "strum" version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f" + +[[package]] +name = "strum" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" dependencies = [ - "strum_macros", + "strum_macros 0.25.2", ] [[package]] @@ -4069,6 +4076,19 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "strum_macros" +version = "0.25.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad8d03b598d3d0fff69bf533ee3ef19b8eeb342729596df84bcc7e1f96ec4059" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.23", +] + [[package]] name = "subtle" version = "2.5.0" diff --git a/crates/cli/src/parse/query.rs b/crates/cli/src/parse/query.rs index 69c09dfa..4504822b 100644 --- a/crates/cli/src/parse/query.rs +++ b/crates/cli/src/parse/query.rs @@ -65,6 +65,8 @@ fn parse_datatypes(raw_inputs: &Vec) -> Result, ParseError "traces" => Datatype::Traces, "vm_traces" => Datatype::VmTraces, "opcode_traces" => Datatype::VmTraces, + "native_transfers" => Datatype::NativeTransfers, + "contracts" => Datatype::Contracts, _ => { return Err(ParseError::ParseError(format!("invalid datatype {}", datatype))) } diff --git a/crates/freeze/Cargo.toml b/crates/freeze/Cargo.toml index 659c103e..5da17019 100644 --- a/crates/freeze/Cargo.toml +++ b/crates/freeze/Cargo.toml @@ -12,6 +12,7 @@ repository.workspace = true [dependencies] async-trait = "0.1.68" ethers = { version = "2.0.7", features = ["rustls", "ws", "ipc"] } +ethers-core = "2.0.8" futures = "0.3.28" governor = "0.5.1" indexmap = "2.0.0" diff --git a/crates/freeze/src/datasets/contracts.rs b/crates/freeze/src/datasets/contracts.rs new file mode 100644 index 00000000..650a9e28 --- /dev/null +++ b/crates/freeze/src/datasets/contracts.rs @@ -0,0 +1,248 @@ +use std::collections::HashMap; + +use ethers::prelude::*; +use polars::prelude::*; +use tokio::sync::mpsc; + +use super::traces; +use crate::{ + types::{ + conversions::ToVecHex, dataframes::SortableDataFrame, BlockChunk, CollectError, ColumnType, + Contracts, Dataset, Datatype, RowFilter, Source, Table, TransactionChunk, + }, + with_series, with_series_binary, +}; + +#[async_trait::async_trait] +impl Dataset for Contracts { + fn datatype(&self) -> Datatype { + Datatype::Contracts + } + + fn name(&self) -> &'static str { + "contracts" + } + + fn column_types(&self) -> HashMap<&'static str, ColumnType> { + HashMap::from_iter(vec![ + ("block_number", ColumnType::UInt32), + ("create_index", ColumnType::UInt32), + ("transaction_hash", ColumnType::Binary), + ("contract_address", ColumnType::Binary), + ("deployer", ColumnType::Binary), + ("factory", ColumnType::Binary), + ("init_code", ColumnType::Binary), + ("code", ColumnType::Binary), + ("init_code_hash", ColumnType::Binary), + ("code_hash", ColumnType::Binary), + ("chain_id", ColumnType::UInt64), + ]) + } + + fn default_columns(&self) -> Vec<&'static str> { + vec![ + "block_number", + "create_index", + "transaction_hash", + "contract_address", + "deployer", + "factory", + "init_code", + "code", + "init_code_hash", + "code_hash", + ] + } + + fn default_sort(&self) -> Vec { + vec!["block_number".to_string(), "create_index".to_string()] + } + + async fn collect_block_chunk( + &self, + chunk: &BlockChunk, + source: &Source, + schema: &Table, + _filter: Option<&RowFilter>, + ) -> Result { + let rx = traces::fetch_block_traces(chunk, source).await; + traces_to_contracts_df(rx, schema, source.chain_id).await + } + + async fn collect_transaction_chunk( + &self, + chunk: &TransactionChunk, + source: &Source, + schema: &Table, + _filter: Option<&RowFilter>, + ) -> Result { + let rx = traces::fetch_transaction_traces(chunk, source).await; + traces_to_contracts_df(rx, schema, source.chain_id).await + } +} + +struct ContractsColumns { + block_number: Vec, + create_index: Vec, + transaction_hash: Vec>>, + contract_address: Vec>, + deployer: Vec>, + factory: Vec>, + init_code: Vec>, + code: Vec>, + init_code_hash: Vec>, + code_hash: Vec>, + chain_id: Vec, + n_rows: usize, +} + +impl ContractsColumns { + fn new(capacity: usize) -> ContractsColumns { + ContractsColumns { + block_number: Vec::with_capacity(capacity), + create_index: Vec::with_capacity(capacity), + transaction_hash: Vec::with_capacity(capacity), + contract_address: Vec::with_capacity(capacity), + deployer: Vec::with_capacity(capacity), + factory: Vec::with_capacity(capacity), + init_code: Vec::with_capacity(capacity), + code: Vec::with_capacity(capacity), + init_code_hash: Vec::with_capacity(capacity), + code_hash: Vec::with_capacity(capacity), + chain_id: Vec::with_capacity(capacity), + n_rows: 0, + } + } + + fn into_df(self, schema: &Table, chain_id: u64) -> Result { + let mut cols = Vec::new(); + + with_series!(cols, "block_number", self.block_number, schema); + with_series!(cols, "create_index", self.create_index, schema); + with_series_binary!(cols, "transaction_hash", self.transaction_hash, schema); + with_series_binary!(cols, "contract_address", self.contract_address, schema); + with_series_binary!(cols, "deployer", self.deployer, schema); + with_series_binary!(cols, "factory", self.factory, schema); + with_series_binary!(cols, "init_code", self.init_code, schema); + with_series_binary!(cols, "code", self.code, schema); + with_series_binary!(cols, "init_code_hash", self.init_code_hash, schema); + with_series_binary!(cols, "code_hash", self.code_hash, schema); + with_series!(cols, "chain_id", self.chain_id, schema); + + if schema.has_column("chain_id") { + cols.push(Series::new("chain_id", vec![chain_id; self.n_rows])); + }; + + DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) + } +} + +async fn traces_to_contracts_df( + mut rx: mpsc::Receiver, CollectError>>, + schema: &Table, + chain_id: u64, +) -> Result { + let mut columns = ContractsColumns::new(200); + while let Some(message) = rx.recv().await { + match message { + Ok(traces) => process_traces(traces, schema, &mut columns)?, + _ => return Err(CollectError::TooManyRequestsError), + } + } + columns.into_df(schema, chain_id) +} + +fn process_traces( + traces: Vec, + schema: &Table, + columns: &mut ContractsColumns, +) -> Result<(), CollectError> { + let traces = filter_failed_traces(traces); + + let mut deployer = H160([0; 20]); + let mut create_index = 0; + for trace in traces.iter() { + if trace.trace_address.is_empty() { + deployer = match &trace.action { + Action::Call(call) => call.from, + Action::Create(create) => create.from, + Action::Suicide(suicide) => suicide.refund_address, + Action::Reward(reward) => reward.author, + }; + }; + + if let (Action::Create(create), Some(Res::Create(result))) = (&trace.action, &trace.result) + { + columns.n_rows += 1; + if schema.has_column("block_number") { + columns.block_number.push(trace.block_number as u32); + } + if schema.has_column("create_index") { + columns.create_index.push(create_index); + create_index += 1; + } + if schema.has_column("transaction_hash") { + match trace.transaction_hash { + Some(tx_hash) => columns.transaction_hash.push(Some(tx_hash.as_bytes().into())), + None => columns.transaction_hash.push(None), + } + } + if schema.has_column("contract_address") { + columns.contract_address.push(result.address.as_bytes().into()) + } + if schema.has_column("deployer") { + columns.deployer.push(deployer.as_bytes().into()) + } + if schema.has_column("factory") { + columns.factory.push(create.from.as_bytes().into()) + } + if schema.has_column("init_code") { + columns.init_code.push(create.init.to_vec()) + } + if schema.has_column("code") { + columns.code.push(result.code.to_vec()) + } + if schema.has_column("init_code_hash") { + columns + .init_code_hash + .push(ethers_core::utils::keccak256(create.init.clone()).into()) + } + if schema.has_column("code_hash") { + columns.code_hash.push(ethers_core::utils::keccak256(result.code.clone()).into()) + } + } + } + Ok(()) +} + +/// filter out error traces +fn filter_failed_traces(traces: Vec) -> Vec { + let mut error_address: Option> = None; + let mut filtered: Vec = Vec::new(); + + for trace in traces.into_iter() { + // restart for each transaction + if trace.trace_address.is_empty() { + error_address = None; + }; + + // if in an error, check if next trace is still in error + if let Some(ref e_address) = error_address { + if trace.trace_address.len() >= e_address.len() && + trace.trace_address[0..e_address.len()] == e_address[..] + { + continue + } else { + error_address = None; + } + } + + // check if current trace is start of an error + match trace.error { + Some(_) => error_address = Some(trace.trace_address), + None => filtered.push(trace), + } + } + + filtered +} diff --git a/crates/freeze/src/datasets/mod.rs b/crates/freeze/src/datasets/mod.rs index fb915e8d..506715ea 100644 --- a/crates/freeze/src/datasets/mod.rs +++ b/crates/freeze/src/datasets/mod.rs @@ -2,7 +2,9 @@ mod balance_diffs; mod blocks; mod blocks_and_transactions; mod code_diffs; +mod contracts; mod logs; +mod native_transfers; mod nonce_diffs; mod state_diffs; mod storage_diffs; diff --git a/crates/freeze/src/datasets/native_transfers.rs b/crates/freeze/src/datasets/native_transfers.rs new file mode 100644 index 00000000..5b9ed8c4 --- /dev/null +++ b/crates/freeze/src/datasets/native_transfers.rs @@ -0,0 +1,230 @@ +use std::collections::HashMap; + +use ethers::prelude::*; +use polars::prelude::*; +use tokio::sync::mpsc; + +use super::traces; +use crate::{ + types::{ + conversions::ToVecHex, dataframes::SortableDataFrame, BlockChunk, CollectError, ColumnType, + Dataset, Datatype, NativeTransfers, RowFilter, Source, Table, ToVecU8, TransactionChunk, + }, + with_series, with_series_binary, +}; + +#[async_trait::async_trait] +impl Dataset for NativeTransfers { + fn datatype(&self) -> Datatype { + Datatype::NativeTransfers + } + + fn name(&self) -> &'static str { + "native_transfers" + } + + fn column_types(&self) -> HashMap<&'static str, ColumnType> { + HashMap::from_iter(vec![ + ("block_number", ColumnType::UInt32), + ("transaction_index", ColumnType::UInt32), + ("transfer_index", ColumnType::UInt32), + ("transaction_hash", ColumnType::Binary), + ("from_address", ColumnType::Binary), + ("to_address", ColumnType::Binary), + ("value", ColumnType::Binary), + ("chain_id", ColumnType::UInt64), + ]) + } + + fn default_columns(&self) -> Vec<&'static str> { + vec![ + "block_number", + "transaction_index", + "transfer_index", + "transaction_hash", + "from_address", + "to_address", + "value", + ] + } + + fn default_sort(&self) -> Vec { + vec!["block_number".to_string(), "transfer_index".to_string()] + } + + async fn collect_block_chunk( + &self, + chunk: &BlockChunk, + source: &Source, + schema: &Table, + _filter: Option<&RowFilter>, + ) -> Result { + let rx = traces::fetch_block_traces(chunk, source).await; + traces_to_native_transfers_df(rx, schema, source.chain_id).await + } + + async fn collect_transaction_chunk( + &self, + chunk: &TransactionChunk, + source: &Source, + schema: &Table, + _filter: Option<&RowFilter>, + ) -> Result { + let rx = traces::fetch_transaction_traces(chunk, source).await; + traces_to_native_transfers_df(rx, schema, source.chain_id).await + } +} + +struct NativeTransfersColumns { + block_number: Vec, + transaction_index: Vec>, + transfer_index: Vec, + transaction_hash: Vec>>, + from_address: Vec>, + to_address: Vec>, + value: Vec>, + chain_id: Vec, + n_rows: usize, +} + +impl NativeTransfersColumns { + fn new(capacity: usize) -> NativeTransfersColumns { + NativeTransfersColumns { + block_number: Vec::with_capacity(capacity), + transaction_index: Vec::with_capacity(capacity), + transfer_index: Vec::with_capacity(capacity), + transaction_hash: Vec::with_capacity(capacity), + from_address: Vec::with_capacity(capacity), + to_address: Vec::with_capacity(capacity), + value: Vec::with_capacity(capacity), + chain_id: Vec::with_capacity(capacity), + n_rows: 0, + } + } + + fn into_df(self, schema: &Table, chain_id: u64) -> Result { + let mut cols = Vec::new(); + + with_series!(cols, "block_number", self.block_number, schema); + with_series!(cols, "transaction_index", self.transaction_index, schema); + with_series!(cols, "transfer_index", self.transfer_index, schema); + with_series_binary!(cols, "transaction_hash", self.transaction_hash, schema); + with_series_binary!(cols, "from_address", self.from_address, schema); + with_series_binary!(cols, "to_address", self.to_address, schema); + with_series_binary!(cols, "value", self.value, schema); + with_series!(cols, "chain_id", self.chain_id, schema); + + if schema.has_column("chain_id") { + cols.push(Series::new("chain_id", vec![chain_id; self.n_rows])); + }; + + DataFrame::new(cols).map_err(CollectError::PolarsError).sort_by_schema(schema) + } +} + +async fn traces_to_native_transfers_df( + mut rx: mpsc::Receiver, CollectError>>, + schema: &Table, + chain_id: u64, +) -> Result { + let mut columns = NativeTransfersColumns::new(200); + while let Some(message) = rx.recv().await { + match message { + Ok(traces) => process_traces(traces, schema, &mut columns)?, + _ => return Err(CollectError::TooManyRequestsError), + } + } + columns.into_df(schema, chain_id) +} + +fn process_traces( + traces: Vec, + schema: &Table, + columns: &mut NativeTransfersColumns, +) -> Result<(), CollectError> { + for (transfer_index, trace) in traces.iter().enumerate() { + columns.n_rows += 1; + + if schema.has_column("block_number") { + columns.block_number.push(trace.block_number); + } + if schema.has_column("transaction_index") { + match trace.transaction_position { + Some(index) => { + columns.transaction_index.push(Some(index as u32)); + } + None => { + columns.transaction_index.push(None); + } + }; + }; + if schema.has_column("transfer_index") { + columns.transfer_index.push(transfer_index as u32); + }; + if schema.has_column("transaction_hash") { + match trace.transaction_hash { + Some(hash) => { + columns.transaction_hash.push(Some(hash.as_bytes().into())); + } + None => { + columns.transaction_hash.push(None); + } + }; + }; + match &trace.action { + Action::Call(call) => { + if schema.has_column("from_address") { + columns.from_address.push(call.from.0.into()); + } + if schema.has_column("to_address") { + columns.to_address.push(call.to.0.into()); + } + if schema.has_column("value") { + columns.value.push(call.value.to_vec_u8()); + } + } + Action::Create(create) => { + if schema.has_column("from_address") { + columns.from_address.push(create.from.0.into()); + } + if schema.has_column("to_address") { + match &trace.result { + Some(Res::Create(res)) => columns.to_address.push(res.address.0.into()), + _ => { + return Err(CollectError::CollectError( + "missing create result".to_string(), + )) + } + } + } + if schema.has_column("value") { + columns.value.push(create.value.to_vec_u8()); + } + } + Action::Suicide(suicide) => { + if schema.has_column("from_address") { + columns.from_address.push(suicide.address.0.into()); + } + if schema.has_column("to_address") { + columns.to_address.push(suicide.refund_address.0.into()); + } + if schema.has_column("value") { + columns.value.push(suicide.balance.to_vec_u8()); + } + } + Action::Reward(reward) => { + if schema.has_column("from_address") { + columns.from_address.push(vec![0; 20]); + } + if schema.has_column("to_address") { + columns.to_address.push(reward.author.0.into()); + } + if schema.has_column("value") { + columns.value.push(reward.value.to_vec_u8()); + } + } + } + } + + Ok(()) +} diff --git a/crates/freeze/src/datasets/traces.rs b/crates/freeze/src/datasets/traces.rs index 5e803708..efa136ee 100644 --- a/crates/freeze/src/datasets/traces.rs +++ b/crates/freeze/src/datasets/traces.rs @@ -101,7 +101,7 @@ impl Dataset for Traces { } } -async fn fetch_block_traces( +pub(crate) async fn fetch_block_traces( block_chunk: &BlockChunk, source: &Source, ) -> mpsc::Receiver, CollectError>> { @@ -136,7 +136,7 @@ async fn fetch_block_traces( rx } -async fn fetch_transaction_traces( +pub(crate) async fn fetch_transaction_traces( transaction_chunk: &TransactionChunk, source: &Source, ) -> mpsc::Receiver, CollectError>> { diff --git a/crates/freeze/src/types/datatypes/scalar.rs b/crates/freeze/src/types/datatypes/scalar.rs index b8310369..dd0e0248 100644 --- a/crates/freeze/src/types/datatypes/scalar.rs +++ b/crates/freeze/src/types/datatypes/scalar.rs @@ -26,6 +26,10 @@ pub struct Traces; pub struct Transactions; /// VmTraces Dataset pub struct VmTraces; +/// Native Transfers Dataset +pub struct NativeTransfers; +/// Contracts Dataset +pub struct Contracts; /// enum of possible datatypes that cryo can collect #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] @@ -48,6 +52,10 @@ pub enum Datatype { StorageDiffs, /// VmTraces VmTraces, + /// Native Transfers + NativeTransfers, + /// Contracts + Contracts, } impl Datatype { @@ -63,6 +71,8 @@ impl Datatype { Datatype::Traces => Box::new(Traces), Datatype::StorageDiffs => Box::new(StorageDiffs), Datatype::VmTraces => Box::new(VmTraces), + Datatype::NativeTransfers => Box::new(NativeTransfers), + Datatype::Contracts => Box::new(Contracts), } } }