Skip to content

Commit

Permalink
- removed indicatif progress bar
Browse files Browse the repository at this point in the history
- prevent loop back into slot 0 after syncing
  • Loading branch information
Mercurial committed Jun 25, 2024
1 parent dbcb618 commit 1df6676
Showing 1 changed file with 78 additions and 41 deletions.
119 changes: 78 additions & 41 deletions src/sources/mithril.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use gasket::framework::*;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use itertools::Itertools;
use miette::{Context as _, IntoDiagnostic as _};
use mithril_client::{ClientBuilder, MessageBuilder, MithrilError, MithrilResult};
Expand All @@ -8,51 +7,85 @@ use pallas::{
network::miniprotocols::Point::{self, *},
};
use serde::Deserialize;
use std::{path::Path, sync::Arc};
use tracing::warn;
use std::{
path::Path,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use tracing::{info, warn};

use crate::framework::*;

struct Feedback {
_multi: MultiProgress,
download_pb: ProgressBar,
validate_pb: ProgressBar,
progress_logger: Arc<Mutex<ProgressLogger>>,
}

impl Feedback {
fn indeterminate_progress_bar(owner: &mut MultiProgress) -> ProgressBar {
let pb = ProgressBar::new_spinner();

pb.set_style(
ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {msg}").unwrap(),
);

owner.add(pb)
fn new(log_interval: Duration) -> Self {
Self {
progress_logger: Arc::new(Mutex::new(ProgressLogger::new(log_interval))),
}
}

fn bytes_progress_bar(owner: &mut MultiProgress) -> ProgressBar {
let pb = ProgressBar::new_spinner();

pb.set_style(
ProgressStyle::with_template(
"{spinner:.green} [{elapsed_precise}] {bar:40.cyan/blue} {bytes}/{total_bytes} (eta: {eta}) {msg}",
)
.unwrap()
.progress_chars("#>-"),
);

owner.add(pb)
fn log_progress(&self, downloaded_bytes: u64, size: u64) {
if let Ok(mut logger) = self.progress_logger.lock() {
logger.log(downloaded_bytes, size);
}
}
}

impl Default for Feedback {
fn default() -> Self {
let mut multi = MultiProgress::new();
Self::new(Duration::from_secs(10))
}
}

struct ProgressLogger {
last_log_time: Option<Instant>,
log_interval: Duration,
initial_logged: bool,
}

impl ProgressLogger {
fn new(log_interval: Duration) -> Self {
Self {
download_pb: Self::bytes_progress_bar(&mut multi),
validate_pb: Self::indeterminate_progress_bar(&mut multi),
_multi: multi,
last_log_time: None,
log_interval,
initial_logged: false,
}
}

fn log(&mut self, downloaded_bytes: u64, size: u64) {
let now = Instant::now();
let percentage = (downloaded_bytes as f64 / size as f64 * 100.0).round() as u64;

if !self.initial_logged {
info!(
"Initial snapshot download progress: {}% ({}/{} bytes)",
percentage, downloaded_bytes, size
);
self.initial_logged = true;
self.last_log_time = Some(now);
return;
}

if downloaded_bytes == size {
info!(
"Snapshot download complete: 100% ({}/{} bytes)",
downloaded_bytes, size
);
self.last_log_time = Some(now);
return;
}

if let Some(last_time) = self.last_log_time {
if now.duration_since(last_time) >= self.log_interval {
info!(
"Snapshot download progress: {}% ({}/{} bytes)",
percentage, downloaded_bytes, size
);
self.last_log_time = Some(now);
}
}
}
}
Expand All @@ -62,35 +95,31 @@ impl mithril_client::feedback::FeedbackReceiver for Feedback {
async fn handle_event(&self, event: mithril_client::feedback::MithrilEvent) {
match event {
mithril_client::feedback::MithrilEvent::SnapshotDownloadStarted { .. } => {
self.download_pb.set_message("snapshot download started")
info!("snapshot download started");
}
mithril_client::feedback::MithrilEvent::SnapshotDownloadProgress {
downloaded_bytes,
size,
..
} => {
self.download_pb.set_length(size);
self.download_pb.set_position(downloaded_bytes);
self.download_pb.set_message("downloading Mithril snapshot");
self.log_progress(downloaded_bytes, size);
}
mithril_client::feedback::MithrilEvent::SnapshotDownloadCompleted { .. } => {
self.download_pb.set_message("snapshot download completed");
info!("snapshot download completed");
}
mithril_client::feedback::MithrilEvent::CertificateChainValidationStarted {
..
} => {
self.validate_pb
.set_message("certificate chain validation started");
info!("certificate chain validation started");
}
mithril_client::feedback::MithrilEvent::CertificateValidated {
certificate_hash: hash,
..
} => {
self.validate_pb
.set_message(format!("validating cert: {hash}"));
info!("certificate validated: {hash}");
}
mithril_client::feedback::MithrilEvent::CertificateChainValidated { .. } => {
self.validate_pb.set_message("certificate chain validated");
info!("certificate chain validation completed");
}
}
}
Expand Down Expand Up @@ -158,6 +187,7 @@ pub struct Stage {

pub struct Worker {
config: Config,
is_bootstrapped: bool,
}

impl Worker {}
Expand Down Expand Up @@ -186,11 +216,16 @@ impl gasket::framework::Worker<Stage> for Worker {

Ok(Self {
config: stage.config.clone(),
is_bootstrapped: false,
})
}

async fn schedule(&mut self, _stage: &mut Stage) -> Result<WorkSchedule<()>, WorkerError> {
Ok(WorkSchedule::Unit(()))
if self.is_bootstrapped {
return Ok(WorkSchedule::Done);
} else {
Ok(WorkSchedule::Unit(()))
}
}

async fn execute(&mut self, _unit: &(), stage: &mut Stage) -> Result<(), WorkerError> {
Expand Down Expand Up @@ -225,6 +260,8 @@ impl gasket::framework::Worker<Stage> for Worker {
}
}

self.is_bootstrapped = true;

Ok(())
}
}
Expand Down

0 comments on commit 1df6676

Please sign in to comment.