QoL improvements to summary and conclusion text
sslivkoff committed Aug 11, 2023
1 parent 5003dd3 commit 04b952e
Showing 8 changed files with 115 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/cli/Cargo.toml
@@ -22,6 +22,7 @@ clap_cryo = { version = "4.3.21-cryo", features = ["derive", "color", "unstable-
 color-print = "0.3.4"
 ethers = "2.0.7"
 hex = "0.4.3"
+indicatif = "0.17.5"
 polars = "0.30.0"
 tokio = "1.29.0"
 cryo_freeze = { version = "0.1.0", path = "../freeze" }
17 changes: 14 additions & 3 deletions crates/cli/src/run.rs
@@ -1,7 +1,8 @@
-use std::time::SystemTime;
+use std::{sync::Arc, time::SystemTime};
 
 use crate::{args, parse, summaries};
 use cryo_freeze::{FreezeError, FreezeSummary};
+use indicatif::ProgressBar;
 
 /// run freeze for given Args
 pub async fn run(args: args::Args) -> Result<Option<FreezeSummary>, FreezeError> {
@@ -13,9 +14,11 @@ pub async fn run(args: args::Args) -> Result<Option<FreezeSummary>, FreezeError>
     };
     let t_parse_done = SystemTime::now();
 
+    let n_chunks_remaining = query.get_n_chunks_remaining(&sink)?;
+
     // print summary
     if !args.no_verbose {
-        summaries::print_cryo_summary(&query, &source, &sink);
+        summaries::print_cryo_summary(&query, &source, &sink, n_chunks_remaining);
     }
 
     // check dry run
@@ -26,11 +29,19 @@ pub async fn run(args: args::Args) -> Result<Option<FreezeSummary>, FreezeError>
         return Ok(None)
     };
 
+    // create progress bar
+    let bar = Arc::new(ProgressBar::new(n_chunks_remaining));
+    bar.set_style(
+        indicatif::ProgressStyle::default_bar()
+            .template("{wide_bar:.green} {human_pos} / {human_len} ETA={eta_precise} ")
+            .map_err(FreezeError::ProgressBarError)?,
+    );
+
     // collect data
     if !args.no_verbose {
        summaries::print_header("\n\ncollecting data");
     }
-    match cryo_freeze::freeze(&query, &source, &sink).await {
+    match cryo_freeze::freeze(&query, &source, &sink, bar).await {
        Ok(freeze_summary) => {
            // print summary
            let t_data_done = SystemTime::now();
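The bar is now sized by `n_chunks_remaining` rather than the full chunk count. For readers unfamiliar with `indicatif`, here is a minimal standalone sketch of how a bar configured with this exact template behaves; the chunk count and the tick loop are illustrative, not from this commit:

```rust
use std::sync::Arc;

use indicatif::{ProgressBar, ProgressStyle};

fn main() {
    // Hypothetical count; the commit passes n_chunks_remaining here.
    let n_chunks_remaining = 100u64;
    let bar = Arc::new(ProgressBar::new(n_chunks_remaining));
    // Same template string as the diff: wide green bar, human-readable counts, precise ETA.
    bar.set_style(
        ProgressStyle::default_bar()
            .template("{wide_bar:.green} {human_pos} / {human_len} ETA={eta_precise} ")
            .expect("template string is valid"),
    );
    for _ in 0..n_chunks_remaining {
        std::thread::sleep(std::time::Duration::from_millis(10));
        bar.inc(1); // one tick per completed chunk
    }
    bar.finish();
}
```

In indicatif 0.17, `template()` returns a `Result`, which is why the diff maps the error into `FreezeError::ProgressBarError` instead of unwrapping.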
29 changes: 24 additions & 5 deletions crates/cli/src/summaries.rs
@@ -29,7 +29,12 @@ fn print_bullet<A: AsRef<str>, B: AsRef<str>>(key: A, value: B) {
     println!("{}{}{}{}", bullet_str, key_str, colon_str, value_str);
 }
 
-pub(crate) fn print_cryo_summary(query: &MultiQuery, source: &Source, sink: &FileOutput) {
+pub(crate) fn print_cryo_summary(
+    query: &MultiQuery,
+    source: &Source,
+    sink: &FileOutput,
+    n_chunks_remaining: u64,
+) {
     print_header("cryo parameters");
     let datatype_strs: Vec<_> = query.schemas.keys().map(|d| d.dataset().name()).collect();
     print_bullet("datatypes", datatype_strs.join(", "));
@@ -38,6 +43,14 @@ pub(crate) fn print_cryo_summary(query: &MultiQuery, source: &Source, sink: &FileOutput) {
     // print_bullet("provider", rpc_url);
     print_block_chunks(query);
     print_transaction_chunks(query);
+    print_bullet(
+        "chunks to collect",
+        format!(
+            "{} / {}",
+            n_chunks_remaining.separate_with_commas(),
+            query.chunks.len().separate_with_commas()
+        ),
+    );
     print_bullet("max concurrent chunks", source.max_concurrent_chunks.separate_with_commas());
     if query.schemas.contains_key(&Datatype::Logs) {
         print_bullet("inner request size", source.inner_request_size.to_string());
@@ -68,7 +81,6 @@ fn print_block_chunks(query: &MultiQuery) {
             let chunk_size = first_chunk.size();
             print_bullet("block chunk size", chunk_size.separate_with_commas());
         };
-        print_bullet("total block chunks", block_chunks.len().separate_with_commas());
     }
 }
 
@@ -155,11 +167,18 @@ pub(crate) fn print_cryo_conclusion(
         "t_end",
         " ".to_string() + dt_data_done.format("%Y-%m-%d %H:%M:%S%.3f").to_string().as_str(),
     );
-    print_bullet("chunks errored", freeze_summary.n_errored.separate_with_commas());
-    print_bullet("chunks skipped", freeze_summary.n_skipped.separate_with_commas());
+    let n_chunks = query.chunks.len();
+    print_bullet(
+        "chunks errored",
+        format!(" {} / {}", freeze_summary.n_errored.separate_with_commas(), n_chunks),
+    );
+    print_bullet(
+        "chunks skipped",
+        format!(" {} / {}", freeze_summary.n_skipped.separate_with_commas(), n_chunks),
+    );
     print_bullet(
         "chunks collected",
-        format!("{} / {}", freeze_summary.n_completed.separate_with_commas(), query.chunks.len()),
+        format!("{} / {}", freeze_summary.n_completed.separate_with_commas(), n_chunks),
     );
 
     print_block_chunk_summary(query, freeze_summary, total_time);
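The `separate_with_commas` calls suggest the `thousands` crate's `Separable` trait; that import sits outside this diff, so treat it as an assumption. A small sketch of the new `x / y` bullet format with illustrative counts, using a plain stand-in for the colored `print_bullet` in summaries.rs:

```rust
// Assumes Cargo.toml: thousands = "0.2"
use thousands::Separable;

fn print_bullet(key: &str, value: String) {
    println!("- {}: {}", key, value);
}

fn main() {
    let n_completed: u64 = 1_400;
    let n_chunks: usize = 1_523;
    // Mirrors the conclusion bullets, printing: - chunks collected: 1,400 / 1,523
    print_bullet(
        "chunks collected",
        format!("{} / {}", n_completed.separate_with_commas(), n_chunks),
    );
}
```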
26 changes: 10 additions & 16 deletions crates/freeze/src/freeze.rs
@@ -14,15 +14,8 @@ pub async fn freeze(
     query: &MultiQuery,
     source: &Source,
     sink: &FileOutput,
+    bar: Arc<ProgressBar>,
 ) -> Result<FreezeSummary, FreezeError> {
-    // create progress bar
-    let bar = Arc::new(ProgressBar::new(query.chunks.len() as u64));
-    bar.set_style(
-        indicatif::ProgressStyle::default_bar()
-            .template("{wide_bar:.green} {human_pos} / {human_len} ETA={eta_precise} ")
-            .map_err(FreezeError::ProgressBarError)?,
-    );
-
     // freeze chunks concurrently
     let (datatypes, multi_datatypes) = cluster_datatypes(query.schemas.keys().collect());
     let sem = Arc::new(Semaphore::new(source.max_concurrent_chunks as usize));
@@ -92,7 +85,7 @@ async fn freeze_datatype_chunk(
 
     // create path
     let (chunk, chunk_label) = chunk;
-    let path = match chunk.filepath(ds.name(), &sink, &chunk_label) {
+    let path = match chunk.filepath(&datatype, &sink, &chunk_label) {
         Err(_e) => return FreezeChunkSummary::error(HashMap::new()),
         Ok(path) => path,
     };
@@ -140,13 +133,14 @@ async fn freeze_multi_datatype_chunk(
 
     // create paths
     let (chunk, chunk_label) = chunk;
-    let mut paths: HashMap<Datatype, String> = HashMap::new();
-    for ds in mdt.multi_dataset().datasets().values() {
-        match chunk.filepath(ds.name(), &sink, &chunk_label) {
-            Err(_e) => return FreezeChunkSummary::error(paths),
-            Ok(path) => paths.insert(ds.datatype(), path),
-        };
-    }
+    let paths = match chunk.filepaths(
+        mdt.multi_dataset().datatypes().iter().collect(),
+        &sink,
+        &chunk_label,
+    ) {
+        Err(_e) => return FreezeChunkSummary::error(HashMap::new()),
+        Ok(paths) => paths,
+    };
 
     // skip path if file already exists
     if paths.values().all(|path| Path::new(&path).exists()) && !sink.overwrite {
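Since the bar is now created by the caller and passed into `freeze`, it must be shared across the concurrently spawned chunk tasks. A simplified sketch of that pattern, with illustrative counts and an empty task body (not the commit's actual task code):

```rust
// Assumes tokio = { version = "1", features = ["full"] } and indicatif = "0.17".
use std::sync::Arc;

use indicatif::ProgressBar;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    let n_chunks = 10u64;
    let max_concurrent_chunks = 4usize;
    let bar = Arc::new(ProgressBar::new(n_chunks));
    let sem = Arc::new(Semaphore::new(max_concurrent_chunks));
    let mut tasks = Vec::new();
    for _ in 0..n_chunks {
        let bar = Arc::clone(&bar);
        let sem = Arc::clone(&sem);
        tasks.push(tokio::spawn(async move {
            // Bound concurrency the way freeze() does with its semaphore.
            let _permit = sem.acquire().await.expect("semaphore not closed");
            // ... collect and write one chunk here ...
            bar.inc(1);
        }));
    }
    for task in tasks {
        task.await.expect("task panicked");
    }
    bar.finish();
}
```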
26 changes: 21 additions & 5 deletions crates/freeze/src/types/chunks/chunk.rs
@@ -1,4 +1,5 @@
-use crate::types::{FileError, FileOutput};
+use crate::types::{Datatype, FileError, FileOutput};
+use std::collections::HashMap;
 
 use super::{binary_chunk::BinaryChunk, chunk_ops::ChunkData, number_chunk::NumberChunk};
 
@@ -29,16 +30,31 @@ impl Chunk {
     /// get filepath for chunk
     pub fn filepath(
         &self,
-        name: &str,
+        datatype: &Datatype,
         file_output: &FileOutput,
         chunk_label: &Option<String>,
     ) -> Result<String, FileError> {
         match self {
-            Chunk::Block(chunk) => chunk.filepath(name, file_output, chunk_label),
-            Chunk::Transaction(chunk) => chunk.filepath(name, file_output, chunk_label),
-            Chunk::Address(chunk) => chunk.filepath(name, file_output, chunk_label),
+            Chunk::Block(chunk) => chunk.filepath(datatype, file_output, chunk_label),
+            Chunk::Transaction(chunk) => chunk.filepath(datatype, file_output, chunk_label),
+            Chunk::Address(chunk) => chunk.filepath(datatype, file_output, chunk_label),
         }
     }
+
+    /// get filepaths for chunk
+    pub fn filepaths(
+        &self,
+        datatypes: Vec<&Datatype>,
+        file_output: &FileOutput,
+        chunk_label: &Option<String>,
+    ) -> Result<HashMap<Datatype, String>, FileError> {
+        let mut paths = HashMap::new();
+        for datatype in datatypes {
+            let path = self.filepath(datatype, file_output, chunk_label)?;
+            paths.insert(*datatype, path);
+        }
+        Ok(paths)
+    }
 }
 
 impl From<Vec<Chunk>> for Chunk {
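The new `filepaths` helper is a standard fallible-map-building pattern: each per-key lookup can fail, and `?` propagates the first error before any partial map escapes. A self-contained sketch of the same shape, with hypothetical stand-in types rather than cryo's real ones:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Datatype {
    Blocks,
    Logs,
}

// Stand-in for ChunkData::filepath; always succeeds here.
fn filepath(datatype: &Datatype) -> Result<String, String> {
    Ok(format!("ethereum__{:?}__00.parquet", datatype).to_lowercase())
}

// Same shape as Chunk::filepaths: the first Err aborts and propagates via `?`.
fn filepaths(datatypes: Vec<&Datatype>) -> Result<HashMap<Datatype, String>, String> {
    let mut paths = HashMap::new();
    for datatype in datatypes {
        paths.insert(*datatype, filepath(datatype)?);
    }
    Ok(paths)
}

fn main() {
    let paths = filepaths(vec![&Datatype::Blocks, &Datatype::Logs]).unwrap();
    assert_eq!(paths.len(), 2);
    println!("{:?}", paths);
}
```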
10 changes: 6 additions & 4 deletions crates/freeze/src/types/chunks/chunk_ops.rs
@@ -1,4 +1,4 @@
-use crate::{ChunkError, FileError, FileOutput};
+use crate::{ChunkError, Datatype, FileError, FileOutput};
 
 /// Trait for common chunk methods
 pub trait ChunkData: Sized {
@@ -30,7 +30,7 @@ pub trait ChunkData: Sized {
     /// get filepath for chunk
     fn filepath(
         &self,
-        name: &str,
+        datatype: &Datatype,
         file_output: &FileOutput,
         chunk_label: &Option<String>,
     ) -> Result<String, FileError> {
@@ -40,8 +40,10 @@ pub trait ChunkData: Sized {
             None => self.stub()?,
         };
         let pieces: Vec<String> = match &file_output.suffix {
-            Some(suffix) => vec![network_name, name.to_string(), stub, suffix.clone()],
-            None => vec![network_name, name.to_string(), stub],
+            Some(suffix) => {
+                vec![network_name, datatype.dataset().name().to_string(), stub, suffix.clone()]
+            }
+            None => vec![network_name, datatype.dataset().name().to_string(), stub],
         };
         let filename = format!("{}.{}", pieces.join("__"), file_output.format.as_str());
         match file_output.output_dir.as_str() {
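To make the path scheme concrete, here is a standalone sketch of the filename assembly in `filepath`, using illustrative values; the real code derives the dataset name from `datatype.dataset().name()` and the extension from `file_output.format`:

```rust
fn main() {
    let network_name = "ethereum".to_string();
    let dataset_name = "blocks".to_string();
    let stub = "17000000_to_17000999".to_string();
    let suffix: Option<String> = None;

    // Pieces are joined with "__", then the output format is appended.
    let pieces: Vec<String> = match suffix {
        Some(suffix) => vec![network_name, dataset_name, stub, suffix],
        None => vec![network_name, dataset_name, stub],
    };
    let filename = format!("{}.{}", pieces.join("__"), "parquet");
    assert_eq!(filename, "ethereum__blocks__17000000_to_17000999.parquet");
    println!("{}", filename);
}
```

Passing the `Datatype` itself instead of a `&str` name is what lets the multi-datatype code above build one path per datatype without stringly-typed keys.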
40 changes: 38 additions & 2 deletions crates/freeze/src/types/queries.rs
@@ -1,8 +1,11 @@
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 use ethers::prelude::*;
 
-use crate::types::{Chunk, Datatype, Table};
+use crate::{
+    types::{Chunk, Datatype, Table},
+    CollectError, FileOutput, FreezeError,
+};
 
 /// Query multiple data types
 #[derive(Clone)]
@@ -28,6 +31,39 @@ pub struct MultiQuery {
     pub row_filters: HashMap<Datatype, RowFilter>,
 }
 
+impl MultiQuery {
+    /// get number of chunks that have not yet been collected
+    pub fn get_n_chunks_remaining(&self, sink: &FileOutput) -> Result<u64, FreezeError> {
+        let actual_files: HashSet<String> = list_files(&sink.output_dir)
+            .map_err(|_e| {
+                FreezeError::CollectError(CollectError::CollectError(
+                    "could not list files in output dir".to_string(),
+                ))
+            })?
+            .into_iter()
+            .collect();
+        let mut n_chunks_remaining: u64 = 0;
+        for (chunk, chunk_label) in &self.chunks {
+            let chunk_files = chunk.filepaths(self.schemas.keys().collect(), sink, chunk_label)?;
+            if !chunk_files.values().all(|file| actual_files.contains(file)) {
+                n_chunks_remaining += 1;
+            }
+        }
+        Ok(n_chunks_remaining)
+    }
+}
+
+fn list_files(dir: &str) -> Result<Vec<String>, std::io::Error> {
+    let mut file_list = Vec::new();
+    for entry in std::fs::read_dir(dir)? {
+        let entry = entry?;
+        if let Some(filename) = entry.path().to_str() {
+            file_list.push(filename.to_string());
+        }
+    }
+    Ok(file_list)
+}
+
 /// Options for fetching logs
 #[derive(Clone)]
 pub struct RowFilter {
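The resume logic above reduces to a set-membership check: a chunk still needs collection unless every one of its expected output files already exists in the output directory. A minimal sketch of that core check, with illustrative paths in place of real chunk filepaths:

```rust
use std::collections::HashSet;

// Counts chunks whose expected files are not all present on disk,
// mirroring the loop in get_n_chunks_remaining.
fn n_chunks_remaining(expected: &[Vec<String>], actual: &HashSet<String>) -> u64 {
    expected.iter().filter(|files| !files.iter().all(|f| actual.contains(f))).count() as u64
}

fn main() {
    let actual: HashSet<String> =
        ["out/a.parquet", "out/b.parquet"].iter().map(|s| s.to_string()).collect();
    let expected = vec![
        vec!["out/a.parquet".to_string(), "out/b.parquet".to_string()], // fully collected
        vec!["out/c.parquet".to_string()],                              // still remaining
    ];
    assert_eq!(n_chunks_remaining(&expected, &actual), 1);
    println!("{} chunk(s) remaining", n_chunks_remaining(&expected, &actual));
}
```

Note that a chunk counts as collected only when all of its datatype files exist, which is why multi-datatype chunks needed the `filepaths` helper introduced in this commit.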