diff --git a/Cargo.lock b/Cargo.lock index 06a1a495..30cbfe27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -857,6 +857,7 @@ dependencies = [ "eyre", "governor", "hex", + "indicatif", "polars", "thousands", "tokio", diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index 0f481e4c..e5a8bb46 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -22,6 +22,7 @@ clap_cryo = { version = "4.3.21-cryo", features = ["derive", "color", "unstable-styles"] } color-print = "0.3.4" ethers = "2.0.7" hex = "0.4.3" +indicatif = "0.17.5" polars = "0.30.0" tokio = "1.29.0" cryo_freeze = { version = "0.1.0", path = "../freeze" } diff --git a/crates/cli/src/run.rs b/crates/cli/src/run.rs index 58dc11ef..8b3f93e7 100644 --- a/crates/cli/src/run.rs +++ b/crates/cli/src/run.rs @@ -1,7 +1,8 @@ -use std::time::SystemTime; +use std::{sync::Arc, time::SystemTime}; use crate::{args, parse, summaries}; use cryo_freeze::{FreezeError, FreezeSummary}; +use indicatif::ProgressBar; /// run freeze for given Args pub async fn run(args: args::Args) -> Result<Option<FreezeSummary>, FreezeError> { @@ -13,9 +14,11 @@ pub async fn run(args: args::Args) -> Result<Option<FreezeSummary>, FreezeError> }; let t_parse_done = SystemTime::now(); + let n_chunks_remaining = query.get_n_chunks_remaining(&sink)?; + // print summary if !args.no_verbose { - summaries::print_cryo_summary(&query, &source, &sink); + summaries::print_cryo_summary(&query, &source, &sink, n_chunks_remaining); } // check dry run @@ -26,11 +29,19 @@ pub async fn run(args: args::Args) -> Result<Option<FreezeSummary>, FreezeError> return Ok(None) }; + // create progress bar + let bar = Arc::new(ProgressBar::new(n_chunks_remaining)); + bar.set_style( + indicatif::ProgressStyle::default_bar() + .template("{wide_bar:.green} {human_pos} / {human_len} ETA={eta_precise} ") + .map_err(FreezeError::ProgressBarError)?, + ); + // collect data if !args.no_verbose { summaries::print_header("\n\ncollecting data"); } - match cryo_freeze::freeze(&query, &source, &sink).await { + match cryo_freeze::freeze(&query, 
&source, &sink, bar).await { Ok(freeze_summary) => { // print summary let t_data_done = SystemTime::now(); diff --git a/crates/cli/src/summaries.rs b/crates/cli/src/summaries.rs index 0fa4683e..988ddeb6 100644 --- a/crates/cli/src/summaries.rs +++ b/crates/cli/src/summaries.rs @@ -29,7 +29,12 @@ fn print_bullet<A: AsRef<str>, B: AsRef<str>>(key: A, value: B) { println!("{}{}{}{}", bullet_str, key_str, colon_str, value_str); } -pub(crate) fn print_cryo_summary(query: &MultiQuery, source: &Source, sink: &FileOutput) { +pub(crate) fn print_cryo_summary( + query: &MultiQuery, + source: &Source, + sink: &FileOutput, + n_chunks_remaining: u64, +) { print_header("cryo parameters"); let datatype_strs: Vec<_> = query.schemas.keys().map(|d| d.dataset().name()).collect(); print_bullet("datatypes", datatype_strs.join(", ")); @@ -38,6 +43,14 @@ pub(crate) fn print_cryo_summary(query: &MultiQuery, source: &Source, sink: &Fil // print_bullet("provider", rpc_url); print_block_chunks(query); print_transaction_chunks(query); + print_bullet( + "chunks to collect", + format!( + "{} / {}", + n_chunks_remaining.separate_with_commas(), + query.chunks.len().separate_with_commas() + ), + ); print_bullet("max concurrent chunks", source.max_concurrent_chunks.separate_with_commas()); if query.schemas.contains_key(&Datatype::Logs) { print_bullet("inner request size", source.inner_request_size.to_string()); @@ -68,7 +81,6 @@ fn print_block_chunks(query: &MultiQuery) { let chunk_size = first_chunk.size(); print_bullet("block chunk size", chunk_size.separate_with_commas()); }; - print_bullet("total block chunks", block_chunks.len().separate_with_commas()); } } @@ -155,11 +167,18 @@ pub(crate) fn print_cryo_conclusion( "t_end", " ".to_string() + dt_data_done.format("%Y-%m-%d %H:%M:%S%.3f").to_string().as_str(), ); - print_bullet("chunks errored", freeze_summary.n_errored.separate_with_commas()); - print_bullet("chunks skipped", freeze_summary.n_skipped.separate_with_commas()); + let n_chunks = query.chunks.len(); + 
print_bullet( + "chunks errored", + format!(" {} / {}", freeze_summary.n_errored.separate_with_commas(), n_chunks), + ); + print_bullet( + "chunks skipped", + format!(" {} / {}", freeze_summary.n_skipped.separate_with_commas(), n_chunks), + ); print_bullet( "chunks collected", - format!("{} / {}", freeze_summary.n_completed.separate_with_commas(), query.chunks.len()), + format!("{} / {}", freeze_summary.n_completed.separate_with_commas(), n_chunks), ); print_block_chunk_summary(query, freeze_summary, total_time); diff --git a/crates/freeze/src/freeze.rs b/crates/freeze/src/freeze.rs index b93df9cd..fd9a9955 100644 --- a/crates/freeze/src/freeze.rs +++ b/crates/freeze/src/freeze.rs @@ -14,15 +14,8 @@ pub async fn freeze( query: &MultiQuery, source: &Source, sink: &FileOutput, + bar: Arc<ProgressBar>, ) -> Result<FreezeSummary, FreezeError> { - // create progress bar - let bar = Arc::new(ProgressBar::new(query.chunks.len() as u64)); - bar.set_style( - indicatif::ProgressStyle::default_bar() - .template("{wide_bar:.green} {human_pos} / {human_len} ETA={eta_precise} ") - .map_err(FreezeError::ProgressBarError)?, - ); - // freeze chunks concurrently let (datatypes, multi_datatypes) = cluster_datatypes(query.schemas.keys().collect()); let sem = Arc::new(Semaphore::new(source.max_concurrent_chunks as usize)); @@ -92,7 +85,7 @@ async fn freeze_datatype_chunk( // create path let (chunk, chunk_label) = chunk; - let path = match chunk.filepath(ds.name(), &sink, &chunk_label) { + let path = match chunk.filepath(&datatype, &sink, &chunk_label) { Err(_e) => return FreezeChunkSummary::error(HashMap::new()), Ok(path) => path, }; @@ -140,13 +133,14 @@ async fn freeze_multi_datatype_chunk( // create paths let (chunk, chunk_label) = chunk; - let mut paths: HashMap<Datatype, String> = HashMap::new(); - for ds in mdt.multi_dataset().datasets().values() { - match chunk.filepath(ds.name(), &sink, &chunk_label) { - Err(_e) => return FreezeChunkSummary::error(paths), - Ok(path) => paths.insert(ds.datatype(), path), - }; - } + let paths = match 
chunk.filepaths( + mdt.multi_dataset().datatypes().iter().collect(), + &sink, + &chunk_label, + ) { + Err(_e) => return FreezeChunkSummary::error(HashMap::new()), + Ok(paths) => paths, + }; // skip path if file already exists if paths.values().all(|path| Path::new(&path).exists()) && !sink.overwrite { diff --git a/crates/freeze/src/types/chunks/chunk.rs b/crates/freeze/src/types/chunks/chunk.rs index 8806d408..eda432dc 100644 --- a/crates/freeze/src/types/chunks/chunk.rs +++ b/crates/freeze/src/types/chunks/chunk.rs @@ -1,4 +1,5 @@ -use crate::types::{FileError, FileOutput}; +use crate::types::{Datatype, FileError, FileOutput}; +use std::collections::HashMap; use super::{binary_chunk::BinaryChunk, chunk_ops::ChunkData, number_chunk::NumberChunk}; @@ -29,16 +30,31 @@ impl Chunk { /// get filepath for chunk pub fn filepath( &self, - name: &str, + datatype: &Datatype, file_output: &FileOutput, chunk_label: &Option<String>, ) -> Result<String, FileError> { match self { - Chunk::Block(chunk) => chunk.filepath(name, file_output, chunk_label), - Chunk::Transaction(chunk) => chunk.filepath(name, file_output, chunk_label), - Chunk::Address(chunk) => chunk.filepath(name, file_output, chunk_label), + Chunk::Block(chunk) => chunk.filepath(datatype, file_output, chunk_label), + Chunk::Transaction(chunk) => chunk.filepath(datatype, file_output, chunk_label), + Chunk::Address(chunk) => chunk.filepath(datatype, file_output, chunk_label), } } + + /// get filepath for chunk + pub fn filepaths( + &self, + datatypes: Vec<&Datatype>, + file_output: &FileOutput, + chunk_label: &Option<String>, + ) -> Result<HashMap<Datatype, String>, FileError> { + let mut paths = HashMap::new(); + for datatype in datatypes { + let path = self.filepath(datatype, file_output, chunk_label)?; + paths.insert(*datatype, path); + } + Ok(paths) + } } impl From> for Chunk { diff --git a/crates/freeze/src/types/chunks/chunk_ops.rs b/crates/freeze/src/types/chunks/chunk_ops.rs index 53cca8c8..65e4e3de 100644 --- a/crates/freeze/src/types/chunks/chunk_ops.rs +++ 
b/crates/freeze/src/types/chunks/chunk_ops.rs @@ -1,4 +1,4 @@ -use crate::{ChunkError, FileError, FileOutput}; +use crate::{ChunkError, Datatype, FileError, FileOutput}; /// Trait for common chunk methods pub trait ChunkData: Sized { @@ -30,7 +30,7 @@ pub trait ChunkData: Sized { /// get filepath for chunk fn filepath( &self, - name: &str, + datatype: &Datatype, file_output: &FileOutput, chunk_label: &Option<String>, ) -> Result<String, FileError> { @@ -40,8 +40,10 @@ pub trait ChunkData: Sized { None => self.stub()?, }; let pieces: Vec<String> = match &file_output.suffix { - Some(suffix) => vec![network_name, name.to_string(), stub, suffix.clone()], - None => vec![network_name, name.to_string(), stub], + Some(suffix) => { + vec![network_name, datatype.dataset().name().to_string(), stub, suffix.clone()] + } + None => vec![network_name, datatype.dataset().name().to_string(), stub], }; let filename = format!("{}.{}", pieces.join("__"), file_output.format.as_str()); match file_output.output_dir.as_str() { diff --git a/crates/freeze/src/types/queries.rs b/crates/freeze/src/types/queries.rs index e1588c7d..ccc5afa8 100644 --- a/crates/freeze/src/types/queries.rs +++ b/crates/freeze/src/types/queries.rs @@ -1,8 +1,11 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use ethers::prelude::*; -use crate::types::{Chunk, Datatype, Table}; +use crate::{ + types::{Chunk, Datatype, Table}, + CollectError, FileOutput, FreezeError, +}; /// Query multiple data types #[derive(Clone)] pub struct MultiQuery { @@ -28,6 +31,39 @@ pub struct MultiQuery { pub row_filters: HashMap<Datatype, RowFilter>, } +impl MultiQuery { + /// get number of chunks that have not yet been collected + pub fn get_n_chunks_remaining(&self, sink: &FileOutput) -> Result<u64, FreezeError> { + let actual_files: HashSet<String> = list_files(&sink.output_dir) + .map_err(|_e| { + FreezeError::CollectError(CollectError::CollectError( + "could not list files in output dir".to_string(), + )) + })? 
+ .into_iter() + .collect(); + let mut n_chunks_remaining: u64 = 0; + for (chunk, chunk_label) in &self.chunks { + let chunk_files = chunk.filepaths(self.schemas.keys().collect(), sink, chunk_label)?; + if !chunk_files.values().all(|file| actual_files.contains(file)) { + n_chunks_remaining += 1; + } + } + Ok(n_chunks_remaining) + } +} + +fn list_files(dir: &str) -> Result<Vec<String>, std::io::Error> { + let mut file_list = Vec::new(); + for entry in std::fs::read_dir(dir)? { + let entry = entry?; + if let Some(filename) = entry.path().to_str() { + file_list.push(filename.to_string()); + } + } + Ok(file_list) +} + /// Options for fetching logs #[derive(Clone)] pub struct RowFilter {