Skip to content

Commit

Permalink
Merge pull request #4447 from Freaky/dd-alarm-timer
Browse files Browse the repository at this point in the history
dd: use an alarm thread instead of elapsed() calls
  • Loading branch information
sylvestre committed Jun 19, 2023
2 parents dffbb32 + e7557c2 commit 66723e0
Showing 1 changed file with 54 additions and 8 deletions.
62 changes: 54 additions & 8 deletions src/uu/dd/src/dd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ use std::os::unix::{
io::{AsRawFd, FromRawFd},
};
use std::path::Path;
use std::sync::mpsc;
use std::sync::{
atomic::{AtomicBool, Ordering::Relaxed},
mpsc, Arc,
};
use std::thread;
use std::time;
use std::time::{Duration, Instant};

use clap::{crate_version, Arg, Command};
use gcd::Gcd;
Expand Down Expand Up @@ -75,6 +78,45 @@ struct Settings {
status: Option<StatusLevel>,
}

/// A timer which triggers on a given interval
///
/// After being constructed with [`Alarm::with_interval`], [`Alarm::is_triggered`]
/// will return true once per the given [`Duration`].
///
/// Can be cloned, but the trigger status is shared across all instances so only
/// the first caller each interval will yield true.
///
/// When all instances are dropped the background thread will exit on the next interval.
#[derive(Debug, Clone)]
pub struct Alarm {
interval: Duration,
trigger: Arc<AtomicBool>,
}

impl Alarm {
pub fn with_interval(interval: Duration) -> Self {
let trigger = Arc::new(AtomicBool::default());

let weak_trigger = Arc::downgrade(&trigger);
thread::spawn(move || {
while let Some(trigger) = weak_trigger.upgrade() {
thread::sleep(interval);
trigger.store(true, Relaxed);
}
});

Self { interval, trigger }
}

pub fn is_triggered(&self) -> bool {
self.trigger.swap(false, Relaxed)
}

pub fn get_interval(&self) -> Duration {
self.interval
}
}

/// A number in blocks or bytes
///
/// Some values (seek, skip, iseek, oseek) can have values either in blocks or in bytes.
Expand Down Expand Up @@ -760,7 +802,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
// of its report includes the throughput in bytes per second,
// which requires knowing how long the process has been
// running.
let start = time::Instant::now();
let start = Instant::now();

// A good buffer size for reading.
//
Expand All @@ -780,7 +822,6 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
// information.
let (prog_tx, rx) = mpsc::channel();
let output_thread = thread::spawn(gen_prog_updater(rx, i.settings.status));
let mut progress_as_secs = 0;

// Optimization: if no blocks are to be written, then don't
// bother allocating any buffers.
Expand Down Expand Up @@ -813,6 +854,12 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
// This is the max size needed.
let mut buf = vec![BUF_INIT_BYTE; bsize];

// Spawn a timer thread to provide a scheduled signal indicating when we
// should send an update of our progress to the reporting thread.
//
// This avoids the need to query the OS monotonic clock for every block.
let alarm = Alarm::with_interval(Duration::from_secs(1));

// Index in the input file where we are reading bytes and in
// the output file where we are writing bytes.
//
Expand Down Expand Up @@ -871,9 +918,8 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> {
// error.
rstat += rstat_update;
wstat += wstat_update;
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false);
if prog_update.duration.as_secs() >= progress_as_secs {
progress_as_secs = prog_update.duration.as_secs() + 1;
if alarm.is_triggered() {
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false);
prog_tx.send(prog_update).unwrap_or(());
}
}
Expand All @@ -885,7 +931,7 @@ fn finalize<T>(
output: &mut Output,
rstat: ReadStat,
wstat: WriteStat,
start: time::Instant,
start: Instant,
prog_tx: &mpsc::Sender<ProgUpdate>,
output_thread: thread::JoinHandle<T>,
) -> std::io::Result<()> {
Expand Down

0 comments on commit 66723e0

Please sign in to comment.