diff --git a/crates/freeze/src/datasets/balance_diffs.rs b/crates/freeze/src/datasets/balance_diffs.rs
index f9eb2696..a7962045 100644
--- a/crates/freeze/src/datasets/balance_diffs.rs
+++ b/crates/freeze/src/datasets/balance_diffs.rs
@@ -4,7 +4,8 @@ use polars::prelude::*;
 
 use super::state_diffs;
 use crate::types::{
-    BalanceDiffs, BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table,
+    BalanceDiffs, BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source,
+    Table, TransactionChunk,
 };
 
 #[async_trait::async_trait]
@@ -51,6 +52,30 @@ impl Dataset for BalanceDiffs {
         schema: &Table,
         filter: Option<&RowFilter>,
     ) -> Result<DataFrame, CollectError> {
-        state_diffs::collect_single(&Datatype::BalanceDiffs, chunk, source, schema, filter).await
+        state_diffs::collect_block_state_diffs(
+            &Datatype::BalanceDiffs,
+            chunk,
+            source,
+            schema,
+            filter,
+        )
+        .await
+    }
+
+    async fn collect_transaction_chunk(
+        &self,
+        chunk: &TransactionChunk,
+        source: &Source,
+        schema: &Table,
+        filter: Option<&RowFilter>,
+    ) -> Result<DataFrame, CollectError> {
+        state_diffs::collect_transaction_state_diffs(
+            &Datatype::BalanceDiffs,
+            chunk,
+            source,
+            schema,
+            filter,
+        )
+        .await
     }
 }
diff --git a/crates/freeze/src/datasets/code_diffs.rs b/crates/freeze/src/datasets/code_diffs.rs
index d055a8df..2dbf7579 100644
--- a/crates/freeze/src/datasets/code_diffs.rs
+++ b/crates/freeze/src/datasets/code_diffs.rs
@@ -5,6 +5,7 @@ use polars::prelude::*;
 use super::state_diffs;
 use crate::types::{
     BlockChunk, CodeDiffs, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, Table,
+    TransactionChunk,
 };
 
 #[async_trait::async_trait]
@@ -51,6 +52,24 @@ impl Dataset for CodeDiffs {
         schema: &Table,
         filter: Option<&RowFilter>,
     ) -> Result<DataFrame, CollectError> {
-        state_diffs::collect_single(&Datatype::CodeDiffs, chunk, source, schema, filter).await
+        state_diffs::collect_block_state_diffs(&Datatype::CodeDiffs, chunk, source, schema, filter)
+            .await
+    }
+
+    async fn collect_transaction_chunk(
+        &self,
+        chunk: &TransactionChunk,
+        source: &Source,
+        schema: &Table,
+        filter: Option<&RowFilter>,
+    ) -> Result<DataFrame, CollectError> {
+        state_diffs::collect_transaction_state_diffs(
+            &Datatype::CodeDiffs,
+            chunk,
+            source,
+            schema,
+            filter,
+        )
+        .await
     }
 }
diff --git a/crates/freeze/src/datasets/nonce_diffs.rs b/crates/freeze/src/datasets/nonce_diffs.rs
index c0d6e9dd..27ec40d7 100644
--- a/crates/freeze/src/datasets/nonce_diffs.rs
+++ b/crates/freeze/src/datasets/nonce_diffs.rs
@@ -5,6 +5,7 @@ use polars::prelude::*;
 use super::state_diffs;
 use crate::types::{
     BlockChunk, CollectError, ColumnType, Dataset, Datatype, NonceDiffs, RowFilter, Source, Table,
+    TransactionChunk,
 };
 
 #[async_trait::async_trait]
@@ -51,6 +52,24 @@ impl Dataset for NonceDiffs {
         schema: &Table,
         filter: Option<&RowFilter>,
     ) -> Result<DataFrame, CollectError> {
-        state_diffs::collect_single(&Datatype::NonceDiffs, chunk, source, schema, filter).await
+        state_diffs::collect_block_state_diffs(&Datatype::NonceDiffs, chunk, source, schema, filter)
+            .await
+    }
+
+    async fn collect_transaction_chunk(
+        &self,
+        chunk: &TransactionChunk,
+        source: &Source,
+        schema: &Table,
+        filter: Option<&RowFilter>,
+    ) -> Result<DataFrame, CollectError> {
+        state_diffs::collect_transaction_state_diffs(
+            &Datatype::NonceDiffs,
+            chunk,
+            source,
+            schema,
+            filter,
+        )
+        .await
     }
 }
diff --git a/crates/freeze/src/datasets/state_diffs.rs b/crates/freeze/src/datasets/state_diffs.rs
index 2c619967..55d29279 100644
--- a/crates/freeze/src/datasets/state_diffs.rs
+++ b/crates/freeze/src/datasets/state_diffs.rs
@@ -8,11 +8,15 @@ use crate::{
     dataframes::SortableDataFrame,
     types::{
         conversions::ToVecHex, BlockChunk, ChunkData, CollectError, ColumnType, Datatype,
-        MultiDataset, RowFilter, Source, StateDiffs, Table,
+        MultiDataset, RowFilter, Source, StateDiffs, Table, TransactionChunk,
     },
     with_series, with_series_binary,
 };
 
+// entries: block_number, transaction_indices, transaction_traces
+pub(crate) type BlockNumberTransactionsTraces =
+    Result<(Option<u32>, Vec<(u32, BlockTrace)>), CollectError>;
+
 #[async_trait::async_trait]
 impl MultiDataset for StateDiffs {
     fn name(&self) -> &'static str {
@@ -32,19 +36,56 @@ impl MultiDataset for StateDiffs {
         schemas: HashMap<Datatype, Table>,
         _filter: HashMap<Datatype, RowFilter>,
     ) -> Result<HashMap<Datatype, DataFrame>, CollectError> {
-        let rx = fetch_state_diffs(chunk, source).await;
+        let rx = fetch_block_state_diffs(chunk, source).await;
+        state_diffs_to_df(rx, &schemas, source.chain_id).await
+    }
+
+    async fn collect_transaction_chunk(
+        &self,
+        chunk: &TransactionChunk,
+        source: &Source,
+        schemas: HashMap<Datatype, Table>,
+        _filter: HashMap<Datatype, RowFilter>,
+    ) -> Result<HashMap<Datatype, DataFrame>, CollectError> {
+        let include_indices = schemas.values().any(|schema| schema.has_column("block_number"));
+        let rx = fetch_transaction_state_diffs(chunk, source, include_indices).await;
         state_diffs_to_df(rx, &schemas, source.chain_id).await
     }
 }
 
-pub(crate) async fn collect_single(
+pub(crate) async fn collect_block_state_diffs(
     datatype: &Datatype,
     chunk: &BlockChunk,
     source: &Source,
     schema: &Table,
     _filter: Option<&RowFilter>,
 ) -> Result<DataFrame, CollectError> {
-    let rx = fetch_state_diffs(chunk, source).await;
+    let rx = fetch_block_state_diffs(chunk, source).await;
+    let mut schemas: HashMap<Datatype, Table> = HashMap::new();
+    schemas.insert(*datatype, schema.clone());
+    let dfs = state_diffs_to_df(rx, &schemas, source.chain_id).await;
+
+    // get single df out of result
+    let df = match dfs {
+        Ok(mut dfs) => match dfs.remove(datatype) {
+            Some(df) => Ok(df),
+            None => Err(CollectError::BadSchemaError),
+        },
+        Err(e) => Err(e),
+    };
+
+    df.sort_by_schema(schema)
+}
+
+pub(crate) async fn collect_transaction_state_diffs(
+    datatype: &Datatype,
+    chunk: &TransactionChunk,
+    source: &Source,
+    schema: &Table,
+    _filter: Option<&RowFilter>,
+) -> Result<DataFrame, CollectError> {
+    let include_indices = schema.has_column("block_number");
+    let rx = fetch_transaction_state_diffs(chunk, source, include_indices).await;
     let mut schemas: HashMap<Datatype, Table> = HashMap::new();
     schemas.insert(*datatype, schema.clone());
     let dfs = state_diffs_to_df(rx, &schemas, source.chain_id).await;
@@ -65,7 +106,7 @@ pub(crate) async fn fetch_block_traces(
     block_chunk: &BlockChunk,
     trace_types: &[TraceType],
     source: &Source,
-) -> mpsc::Receiver<(u32, Result<Vec<BlockTrace>, CollectError>)> {
+) -> mpsc::Receiver<BlockNumberTransactionsTraces> {
     let (tx, rx) = mpsc::channel(block_chunk.size() as usize);
     for number in block_chunk.numbers() {
         let tx = tx.clone();
@@ -84,8 +125,17 @@ pub(crate) async fn fetch_block_traces(
             let result = provider
                 .trace_replay_block_transactions(BlockNumber::Number(number.into()), trace_types)
                 .await
+                .map(|res| {
+                    (
+                        Some(number as u32),
+                        res.into_iter()
+                            .enumerate()
+                            .map(|(index, traces)| (index as u32, traces))
+                            .collect(),
+                    )
+                })
                 .map_err(CollectError::ProviderError);
-            match tx.send((number as u32, result)).await {
+            match tx.send(result).await {
                 Ok(_) => {}
                 Err(tokio::sync::mpsc::error::SendError(_e)) => {
                     eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
@@ -98,15 +148,109 @@ pub(crate) async fn fetch_block_traces(
     rx
 }
 
-pub(crate) async fn fetch_state_diffs(
-    block_chunk: &BlockChunk,
+pub(crate) async fn fetch_transaction_traces(
+    transaction_chunk: &TransactionChunk,
+    trace_types: &[TraceType],
+    source: &Source,
+    include_indices: bool,
+) -> mpsc::Receiver<BlockNumberTransactionsTraces> {
+    match transaction_chunk {
+        TransactionChunk::Values(tx_hashes) => {
+            let (tx, rx) = mpsc::channel(tx_hashes.len());
+            for tx_hash in tx_hashes.iter() {
+                let tx_hash = tx_hash.clone();
+                let tx = tx.clone();
+                let provider = source.provider.clone();
+                let semaphore = source.semaphore.clone();
+                let rate_limiter = source.rate_limiter.as_ref().map(Arc::clone);
+                let trace_types = trace_types.to_vec();
+                tokio::spawn(async move {
+                    let _permit = match semaphore {
+                        Some(semaphore) => Some(Arc::clone(&semaphore).acquire_owned().await),
+                        _ => None,
+                    };
+                    if let Some(limiter) = &rate_limiter {
+                        Arc::clone(limiter).until_ready().await;
+                    }
+                    let tx_hash = H256::from_slice(&tx_hash);
+                    let result = provider
+                        .trace_replay_transaction(tx_hash, trace_types)
+                        .await
+                        .map_err(CollectError::ProviderError);
+                    let result = match result {
+                        Ok(trace) => {
+                            let trace = BlockTrace { transaction_hash: Some(tx_hash), ..trace };
+                            if include_indices {
+                                if let Some(limiter) = rate_limiter {
+                                    Arc::clone(&limiter).until_ready().await;
+                                };
+                                match provider.get_transaction(tx_hash).await {
+                                    Ok(Some(tx)) => match (tx.block_number, tx.transaction_index) {
+                                        (Some(block_number), Some(tx_index)) => Ok((
+                                            Some(block_number.as_u32()),
+                                            vec![(tx_index.as_u32(), trace)],
+                                        )),
+                                        _ => Err(CollectError::CollectError(
+                                            "could not get block number".to_string(),
+                                        )),
+                                    },
+                                    _ => Err(CollectError::CollectError(
+                                        "could not get block number".to_string(),
+                                    )),
+                                }
+                            } else {
+                                Ok((None, vec![(0, trace)]))
+                            }
+                        }
+                        Err(_e) => {
+                            Err(CollectError::CollectError("failed to collect tx".to_string()))
+                        }
+                    };
+                    match tx.send(result).await {
+                        Ok(_) => {}
+                        Err(tokio::sync::mpsc::error::SendError(_e)) => {
+                            eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
+                            std::process::exit(1)
+                        }
+                    }
+                });
+            }
+            rx
+        }
+        _ => {
+            let (tx, rx) = mpsc::channel(1);
+            let result = Err(CollectError::CollectError(
+                "transaction value ranges not supported".to_string(),
+            ));
+            match tx.send(result).await {
+                Ok(_) => {}
+                Err(tokio::sync::mpsc::error::SendError(_e)) => {
+                    eprintln!("send error, try using a rate limit with --requests-per-second or limiting max concurrency with --max-concurrent-requests");
+                    std::process::exit(1)
+                }
+            }
+            rx
+        }
+    }
+}
+
+pub(crate) async fn fetch_block_state_diffs(
+    chunk: &BlockChunk,
+    source: &Source,
+) -> mpsc::Receiver<BlockNumberTransactionsTraces> {
+    fetch_block_traces(chunk, &[TraceType::StateDiff], source).await
+}
+
+pub(crate) async fn fetch_transaction_state_diffs(
+    chunk: &TransactionChunk,
     source: &Source,
-) -> mpsc::Receiver<(u32, Result<Vec<BlockTrace>, CollectError>)> {
-    fetch_block_traces(block_chunk, &[TraceType::StateDiff], source).await
+    include_indices: bool,
+) -> mpsc::Receiver<BlockNumberTransactionsTraces> {
+    fetch_transaction_traces(chunk, &[TraceType::StateDiff], source, include_indices).await
 }
 
 async fn state_diffs_to_df(
-    mut rx: mpsc::Receiver<(u32, Result<Vec<BlockTrace>, CollectError>)>,
+    mut rx: mpsc::Receiver<BlockNumberTransactionsTraces>,
     schemas: &HashMap<Datatype, Table>,
     chain_id: u64,
 ) -> Result<HashMap<Datatype, DataFrame>, CollectError> {
@@ -185,9 +329,8 @@ async fn state_diffs_to_df(
     let mut n_rows = 0;
     while let Some(message) = rx.recv().await {
         match message {
-            (block_num, Ok(blocks_traces)) => {
-                for (t_index, ts) in blocks_traces.iter().enumerate() {
-                    let t_index = t_index as u32;
+            Ok((block_num, blocks_traces)) => {
+                for (t_index, ts) in blocks_traces.iter() {
                     if let (Some(tx), Some(StateDiff(state_diff))) =
                         (ts.transaction_hash, &ts.state_diff)
                     {
@@ -204,10 +347,17 @@ async fn state_diffs_to_df(
                                     Diff::Changed(ChangedType { from, to }) => (*from, *to),
                                 };
                                 if include_storage_block_number {
-                                    storage_block_number.push(block_num);
+                                    match block_num {
+                                        Some(block_num) => storage_block_number.push(block_num),
+                                        None => {
+                                            return Err(CollectError::CollectError(
+                                                "block number not given".to_string(),
+                                            ))
+                                        }
+                                    }
                                 };
                                 if include_storage_transaction_index {
-                                    storage_transaction_index.push(t_index);
+                                    storage_transaction_index.push(*t_index);
                                 };
                                 if include_storage_transaction_hash {
                                     storage_transaction_hash.push(tx.as_bytes().to_vec());
@@ -238,10 +388,17 @@ async fn state_diffs_to_df(
                                 }
                             };
                             if include_balance_block_number {
-                                balance_block_number.push(block_num);
+                                match block_num {
+                                    Some(block_num) => balance_block_number.push(block_num),
+                                    None => {
+                                        return Err(CollectError::CollectError(
+                                            "block number not given".to_string(),
+                                        ))
+                                    }
+                                }
                            };
                            if include_balance_transaction_index {
-                                balance_transaction_index.push(t_index);
+                                balance_transaction_index.push(*t_index);
                            };
                            if include_balance_transaction_hash {
                                balance_transaction_hash.push(tx.as_bytes().to_vec());
@@ -268,10 +425,17 @@ async fn state_diffs_to_df(
                                 }
                             };
                             if include_nonce_block_number {
-                                nonce_block_number.push(block_num);
+                                match block_num {
+                                    Some(block_num) => nonce_block_number.push(block_num),
+                                    None => {
+                                        return Err(CollectError::CollectError(
+                                            "block number not given".to_string(),
+                                        ))
+                                    }
+                                }
                            };
                            if include_nonce_transaction_index {
-                                nonce_transaction_index.push(t_index);
+                                nonce_transaction_index.push(*t_index);
                            };
                            if include_nonce_transaction_hash {
                                nonce_transaction_hash.push(tx.as_bytes().to_vec());
@@ -305,10 +469,17 @@ async fn state_diffs_to_df(
                                 }
                             };
                             if include_code_block_number {
-                                code_block_number.push(block_num);
+                                match block_num {
+                                    Some(block_num) => code_block_number.push(block_num),
+                                    None => {
+                                        return Err(CollectError::CollectError(
+                                            "block number not given".to_string(),
+                                        ))
+                                    }
+                                }
                            };
                            if include_code_transaction_index {
-                                code_transaction_index.push(t_index);
+                                code_transaction_index.push(*t_index);
                            };
                            if include_code_transaction_hash {
                                code_transaction_hash.push(tx.as_bytes().to_vec());
diff --git a/crates/freeze/src/datasets/storage_diffs.rs b/crates/freeze/src/datasets/storage_diffs.rs
index 2c04ac9f..9ca0a0fd 100644
--- a/crates/freeze/src/datasets/storage_diffs.rs
+++ b/crates/freeze/src/datasets/storage_diffs.rs
@@ -4,7 +4,8 @@ use polars::prelude::*;
 
 use super::state_diffs;
 use crate::types::{
-    BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, StorageDiffs, Table,
+    BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter, Source, StorageDiffs,
+    Table, TransactionChunk,
 };
 
 #[async_trait::async_trait]
@@ -53,6 +54,30 @@ impl Dataset for StorageDiffs {
         schema: &Table,
         filter: Option<&RowFilter>,
     ) -> Result<DataFrame, CollectError> {
-        state_diffs::collect_single(&Datatype::StorageDiffs, chunk, source, schema, filter).await
+        state_diffs::collect_block_state_diffs(
+            &Datatype::StorageDiffs,
+            chunk,
+            source,
+            schema,
+            filter,
+        )
+        .await
+    }
+
+    async fn collect_transaction_chunk(
+        &self,
+        chunk: &TransactionChunk,
+        source: &Source,
+        schema: &Table,
+        filter: Option<&RowFilter>,
+    ) -> Result<DataFrame, CollectError> {
+        state_diffs::collect_transaction_state_diffs(
+            &Datatype::StorageDiffs,
+            chunk,
+            source,
+            schema,
+            filter,
+        )
+        .await
     }
 }
diff --git a/crates/freeze/src/datasets/vm_traces.rs b/crates/freeze/src/datasets/vm_traces.rs
index 26c29703..480e524a 100644
--- a/crates/freeze/src/datasets/vm_traces.rs
+++ b/crates/freeze/src/datasets/vm_traces.rs
@@ -9,7 +9,7 @@ use crate::{
     datasets::state_diffs,
     types::{
         conversions::ToVecHex, BlockChunk, CollectError, ColumnType, Dataset, Datatype, RowFilter,
-        Source, Table, ToVecU8, VmTraces,
+        Source, Table, ToVecU8, TransactionChunk, VmTraces,
     },
     with_series, with_series_binary,
 };
@@ -56,18 +56,39 @@ impl Dataset for VmTraces {
         schema: &Table,
         _filter: Option<&RowFilter>,
     ) -> Result<DataFrame, CollectError> {
-        let rx = fetch_vm_traces(chunk, source).await;
+        let rx = fetch_block_vm_traces(chunk, source).await;
+        vm_traces_to_df(rx, schema, source.chain_id).await
+    }
+
+    async fn collect_transaction_chunk(
+        &self,
+        chunk: &TransactionChunk,
+        source: &Source,
+        schema: &Table,
+        _filter: Option<&RowFilter>,
+    ) -> Result<DataFrame, CollectError> {
+        let include_indices = schema.has_column("block_number");
+        let rx = fetch_transaction_vm_traces(chunk, source, include_indices).await;
         vm_traces_to_df(rx, schema, source.chain_id).await
     }
 }
 
-async fn fetch_vm_traces(
+async fn fetch_block_vm_traces(
     block_chunk: &BlockChunk,
     source: &Source,
-) -> mpsc::Receiver<(u32, Result<Vec<BlockTrace>, CollectError>)> {
+) -> mpsc::Receiver<state_diffs::BlockNumberTransactionsTraces> {
     state_diffs::fetch_block_traces(block_chunk, &[TraceType::VmTrace], source).await
 }
 
+async fn fetch_transaction_vm_traces(
+    chunk: &TransactionChunk,
+    source: &Source,
+    include_indices: bool,
+) -> mpsc::Receiver<state_diffs::BlockNumberTransactionsTraces> {
+    state_diffs::fetch_transaction_traces(chunk, &[TraceType::VmTrace], source, include_indices)
+        .await
+}
+
 struct VmTraceColumns {
     block_number: Vec<u32>,
     transaction_position: Vec<u32>,
@@ -84,7 +105,7 @@ struct VmTraceColumns {
 }
 
 async fn vm_traces_to_df(
-    mut rx: mpsc::Receiver<(u32, Result<Vec<BlockTrace>, CollectError>)>,
+    mut rx: mpsc::Receiver<state_diffs::BlockNumberTransactionsTraces>,
     schema: &Table,
     chain_id: u64,
 ) -> Result<DataFrame, CollectError> {
@@ -106,10 +127,10 @@ async fn vm_traces_to_df(
 
     while let Some(message) = rx.recv().await {
         match message {
-            (number, Ok(block_traces)) => {
-                for (tx_pos, block_trace) in block_traces.into_iter().enumerate() {
+            Ok((number, block_traces)) => {
+                for (tx_pos, block_trace) in block_traces.into_iter() {
                     if let Some(vm_trace) = block_trace.vm_trace {
-                        add_ops(vm_trace, schema, &mut columns, number, tx_pos as u32)
+                        add_ops(vm_trace, schema, &mut columns, number, tx_pos)?;
                     }
                 }
             }
@@ -142,14 +163,17 @@ fn add_ops(
     vm_trace: VMTrace,
     schema: &Table,
     columns: &mut VmTraceColumns,
-    number: u32,
+    number: Option<u32>,
     tx_pos: u32,
-) {
+) -> Result<(), CollectError> {
     for opcode in vm_trace.ops {
         columns.n_rows += 1;
         if schema.has_column("block_number") {
-            columns.block_number.push(number);
+            match number {
+                Some(number) => columns.block_number.push(number),
+                None => return Err(CollectError::CollectError("block number not given".to_string())),
+            }
         };
         if schema.has_column("transaction_position") {
            columns.transaction_position.push(tx_pos);
        };
@@ -226,7 +250,9 @@ fn add_ops(
         };
 
         if let Some(sub) = opcode.sub {
-            add_ops(sub, schema, columns, number, tx_pos)
+            add_ops(sub, schema, columns, number, tx_pos)?
         }
     }
+
+    Ok(())
 }
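
Note (not part of the patch): below is a minimal standalone sketch of the per-transaction replay pattern that fetch_transaction_traces wires into cryo, written against ethers-rs directly rather than cryo's Source plumbing. The localhost RPC endpoint, the zero transaction hash, and the tokio runtime are illustrative assumptions only; the sketch builds the same (Option<u32>, Vec<(u32, BlockTrace)>) payload that a BlockNumberTransactionsTraces channel message carries.

// Standalone sketch: replay one transaction's state diff and recover its block
// number and index, mirroring the shape introduced by this patch.
use ethers::providers::{Http, Middleware, Provider};
use ethers::types::{BlockTrace, TraceType, H256};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // placeholder endpoint; must support trace_replayTransaction
    let provider = Provider::<Http>::try_from("http://localhost:8545")?;
    // placeholder: substitute a real transaction hash
    let tx_hash: H256 = H256::zero();

    // replay a single transaction to obtain its state diff
    let trace: BlockTrace = provider
        .trace_replay_transaction(tx_hash, vec![TraceType::StateDiff])
        .await?;

    // the replay response does not carry a block number, so fetch it separately,
    // just as collect_transaction_chunk does when the schema asks for block_number
    let (block_number, tx_index) = match provider.get_transaction(tx_hash).await? {
        Some(tx) => (tx.block_number, tx.transaction_index),
        None => (None, None),
    };

    // same payload shape as one BlockNumberTransactionsTraces message (minus the Result wrapper)
    let entry: (Option<u32>, Vec<(u32, BlockTrace)>) = (
        block_number.map(|n| n.as_u32()),
        vec![(tx_index.map(|i| i.as_u32()).unwrap_or(0), trace)],
    );
    println!("block: {:?}, traces: {}", entry.0, entry.1.len());
    Ok(())
}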