diff --git a/src/uu/dd/src/bufferedoutput.rs b/src/uu/dd/src/bufferedoutput.rs new file mode 100644 index 0000000000..6ac3b43004 --- /dev/null +++ b/src/uu/dd/src/bufferedoutput.rs @@ -0,0 +1,206 @@ +// This file is part of the uutils coreutils package. +// +// For the full copyright and license information, please view the LICENSE +// file that was distributed with this source code. +// +// spell-checker:ignore wstat towrite cdefg bufferedoutput +//! Buffer partial output blocks until they are completed. +//! +//! Use the [`BufferedOutput`] struct to create a buffered form of the +//! [`Output`] writer. +use crate::{Output, WriteStat}; + +/// Buffer partial output blocks until they are completed. +/// +/// Complete blocks are written immediately to the inner [`Output`], +/// but partial blocks are stored in an internal buffer until they are +/// completed. +pub(crate) struct BufferedOutput<'a> { + /// The unbuffered inner block writer. + inner: Output<'a>, + + /// The internal buffer that stores a partial block. + /// + /// The size of this buffer is always less than the output block + /// size (that is, the value of the `obs` command-line option). + buf: Vec, +} + +impl<'a> BufferedOutput<'a> { + /// Add partial block buffering to the given block writer. + /// + /// The internal buffer size is at most the value of `obs` as + /// defined in `inner`. + pub(crate) fn new(inner: Output<'a>) -> Self { + let obs = inner.settings.obs; + Self { + inner, + buf: Vec::with_capacity(obs), + } + } + + pub(crate) fn discard_cache(&self, offset: libc::off_t, len: libc::off_t) { + self.inner.discard_cache(offset, len); + } + + /// Flush the partial block stored in the internal buffer. + pub(crate) fn flush(&mut self) -> std::io::Result { + let wstat = self.inner.write_blocks(&self.buf)?; + let n = wstat.bytes_total.try_into().unwrap(); + self.buf.drain(0..n); + Ok(wstat) + } + + /// Synchronize the inner block writer. + pub(crate) fn sync(&mut self) -> std::io::Result<()> { + self.inner.sync() + } + + /// Truncate the underlying file to the current stream position, if possible. + pub(crate) fn truncate(&mut self) -> std::io::Result<()> { + self.inner.dst.truncate() + } + + /// Write the given bytes one block at a time. + /// + /// Only complete blocks will be written. Partial blocks will be + /// buffered until enough bytes have been provided to complete a + /// block. The returned [`WriteStat`] object will include the + /// number of blocks written during execution of this function. + pub(crate) fn write_blocks(&mut self, buf: &[u8]) -> std::io::Result { + // Split the incoming buffer into two parts: the bytes to write + // and the bytes to buffer for next time. + // + // If `buf` does not include enough bytes to form a full block, + // just buffer the whole thing and write zero blocks. + let n = self.buf.len() + buf.len(); + let rem = n % self.inner.settings.obs; + let i = buf.len().saturating_sub(rem); + let (to_write, to_buffer) = buf.split_at(i); + + // Concatenate the old partial block with the new bytes to form + // some number of complete blocks. + self.buf.extend_from_slice(to_write); + + // Write all complete blocks to the inner block writer. + // + // For example, if the output block size were 3, the buffered + // partial block were `b"ab"` and the new incoming bytes were + // `b"cdefg"`, then we would write blocks `b"abc"` and + // b`"def"` to the inner block writer. + let wstat = self.inner.write_blocks(&self.buf)?; + + // Buffer any remaining bytes as a partial block. + // + // Continuing the example above, the last byte `b"g"` would be + // buffered as a partial block until the next call to + // `write_blocks()`. + self.buf.clear(); + self.buf.extend_from_slice(to_buffer); + + Ok(wstat) + } +} + +#[cfg(unix)] +#[cfg(test)] +mod tests { + use crate::bufferedoutput::BufferedOutput; + use crate::{Dest, Output, Settings}; + + #[test] + fn test_buffered_output_write_blocks_empty() { + let settings = Settings { + obs: 3, + ..Default::default() + }; + let inner = Output { + dst: Dest::Sink, + settings: &settings, + }; + let mut output = BufferedOutput::new(inner); + let wstat = output.write_blocks(&[]).unwrap(); + assert_eq!(wstat.writes_complete, 0); + assert_eq!(wstat.writes_partial, 0); + assert_eq!(wstat.bytes_total, 0); + assert_eq!(output.buf, vec![]); + } + + #[test] + fn test_buffered_output_write_blocks_partial() { + let settings = Settings { + obs: 3, + ..Default::default() + }; + let inner = Output { + dst: Dest::Sink, + settings: &settings, + }; + let mut output = BufferedOutput::new(inner); + let wstat = output.write_blocks(b"ab").unwrap(); + assert_eq!(wstat.writes_complete, 0); + assert_eq!(wstat.writes_partial, 0); + assert_eq!(wstat.bytes_total, 0); + assert_eq!(output.buf, b"ab"); + } + + #[test] + fn test_buffered_output_write_blocks_complete() { + let settings = Settings { + obs: 3, + ..Default::default() + }; + let inner = Output { + dst: Dest::Sink, + settings: &settings, + }; + let mut output = BufferedOutput::new(inner); + let wstat = output.write_blocks(b"abcd").unwrap(); + assert_eq!(wstat.writes_complete, 1); + assert_eq!(wstat.writes_partial, 0); + assert_eq!(wstat.bytes_total, 3); + assert_eq!(output.buf, b"d"); + } + + #[test] + fn test_buffered_output_write_blocks_append() { + let settings = Settings { + obs: 3, + ..Default::default() + }; + let inner = Output { + dst: Dest::Sink, + settings: &settings, + }; + let mut output = BufferedOutput { + inner, + buf: b"ab".to_vec(), + }; + let wstat = output.write_blocks(b"cdefg").unwrap(); + assert_eq!(wstat.writes_complete, 2); + assert_eq!(wstat.writes_partial, 0); + assert_eq!(wstat.bytes_total, 6); + assert_eq!(output.buf, b"g"); + } + + #[test] + fn test_buffered_output_flush() { + let settings = Settings { + obs: 10, + ..Default::default() + }; + let inner = Output { + dst: Dest::Sink, + settings: &settings, + }; + let mut output = BufferedOutput { + inner, + buf: b"abc".to_vec(), + }; + let wstat = output.flush().unwrap(); + assert_eq!(wstat.writes_complete, 0); + assert_eq!(wstat.writes_partial, 1); + assert_eq!(wstat.bytes_total, 3); + assert_eq!(output.buf, vec![]); + } +} diff --git a/src/uu/dd/src/dd.rs b/src/uu/dd/src/dd.rs index b79ae22da4..645c249676 100644 --- a/src/uu/dd/src/dd.rs +++ b/src/uu/dd/src/dd.rs @@ -3,23 +3,21 @@ // For the full copyright and license information, please view the LICENSE // file that was distributed with this source code. -// spell-checker:ignore fname, ftype, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, behaviour, bmax, bremain, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rremain, rsofar, rstat, sigusr, wlen, wstat seekable oconv canonicalized fadvise Fadvise FADV DONTNEED ESPIPE +// spell-checker:ignore fname, ftype, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, behaviour, bmax, bremain, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rremain, rsofar, rstat, sigusr, wlen, wstat seekable oconv canonicalized fadvise Fadvise FADV DONTNEED ESPIPE bufferedoutput +mod blocks; +mod bufferedoutput; +mod conversion_tables; mod datastructures; -use datastructures::*; - +mod numbers; mod parseargs; -use parseargs::Parser; - -mod conversion_tables; - mod progress; -use progress::{gen_prog_updater, ProgUpdate, ReadStat, StatusLevel, WriteStat}; -mod blocks; +use crate::bufferedoutput::BufferedOutput; use blocks::conv_block_unblock_helper; - -mod numbers; +use datastructures::*; +use parseargs::Parser; +use progress::{gen_prog_updater, ProgUpdate, ReadStat, StatusLevel, WriteStat}; use std::cmp; use std::env; @@ -76,6 +74,8 @@ struct Settings { oconv: OConvFlags, oflags: OFlags, status: Option, + /// Whether the output writer should buffer partial blocks until complete. + buffered: bool, } /// A timer which triggers on a given interval @@ -128,6 +128,12 @@ enum Num { Bytes(u64), } +impl Default for Num { + fn default() -> Self { + Self::Blocks(0) + } +} + impl Num { fn force_bytes_if(self, force: bool) -> Self { match self { @@ -796,6 +802,68 @@ impl<'a> Output<'a> { Ok(()) } } + + /// Truncate the underlying file to the current stream position, if possible. + fn truncate(&mut self) -> std::io::Result<()> { + self.dst.truncate() + } +} + +/// The block writer either with or without partial block buffering. +enum BlockWriter<'a> { + /// Block writer with partial block buffering. + /// + /// Partial blocks are buffered until completed. + Buffered(BufferedOutput<'a>), + + /// Block writer without partial block buffering. + /// + /// Partial blocks are written immediately. + Unbuffered(Output<'a>), +} + +impl<'a> BlockWriter<'a> { + fn discard_cache(&self, offset: libc::off_t, len: libc::off_t) { + match self { + Self::Unbuffered(o) => o.discard_cache(offset, len), + Self::Buffered(o) => o.discard_cache(offset, len), + } + } + + fn flush(&mut self) -> io::Result { + match self { + Self::Unbuffered(_) => Ok(WriteStat::default()), + Self::Buffered(o) => o.flush(), + } + } + + fn sync(&mut self) -> io::Result<()> { + match self { + Self::Unbuffered(o) => o.sync(), + Self::Buffered(o) => o.sync(), + } + } + + /// Truncate the file to the final cursor location. + fn truncate(&mut self) { + // Calling `set_len()` may result in an error (for example, + // when calling it on `/dev/null`), but we don't want to + // terminate the process when that happens. Instead, we + // suppress the error by calling `Result::ok()`. This matches + // the behavior of GNU `dd` when given the command-line + // argument `of=/dev/null`. + match self { + Self::Unbuffered(o) => o.truncate().ok(), + Self::Buffered(o) => o.truncate().ok(), + }; + } + + fn write_blocks(&mut self, buf: &[u8]) -> std::io::Result { + match self { + Self::Unbuffered(o) => o.write_blocks(buf), + Self::Buffered(o) => o.write_blocks(buf), + } + } } /// Copy the given input data to this output, consuming both. @@ -809,7 +877,7 @@ impl<'a> Output<'a> { /// /// If there is a problem reading from the input or writing to /// this output. -fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { +fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> { // The read and write statistics. // // These objects are counters, initialized to zero. After each @@ -846,6 +914,9 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { let (prog_tx, rx) = mpsc::channel(); let output_thread = thread::spawn(gen_prog_updater(rx, i.settings.status)); + // Whether to truncate the output file after all blocks have been written. + let truncate = !o.settings.oconv.notrunc; + // Optimization: if no blocks are to be written, then don't // bother allocating any buffers. if let Some(Num::Blocks(0) | Num::Bytes(0)) = i.settings.count { @@ -870,7 +941,15 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { let len = o.dst.len()?.try_into().unwrap(); o.discard_cache(offset, len); } - return finalize(&mut o, rstat, wstat, start, &prog_tx, output_thread); + return finalize( + BlockWriter::Unbuffered(o), + rstat, + wstat, + start, + &prog_tx, + output_thread, + truncate, + ); }; // Create a common buffer with a capacity of the block size. @@ -890,13 +969,23 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { let mut read_offset = 0; let mut write_offset = 0; + let input_nocache = i.settings.iflags.nocache; + let output_nocache = o.settings.oflags.nocache; + + // Add partial block buffering, if needed. + let mut o = if o.settings.buffered { + BlockWriter::Buffered(BufferedOutput::new(o)) + } else { + BlockWriter::Unbuffered(o) + }; + // The main read/write loop. // // Each iteration reads blocks from the input and writes // blocks to this output. Read/write statistics are updated on // each iteration and cumulative statistics are reported to // the progress reporting thread. - while below_count_limit(&i.settings.count, &rstat, &wstat) { + while below_count_limit(&i.settings.count, &rstat) { // Read a block from the input then write the block to the output. // // As an optimization, make an educated guess about the @@ -914,7 +1003,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // // TODO Better error handling for overflowing `offset` and `len`. let read_len = rstat_update.bytes_total; - if i.settings.iflags.nocache { + if input_nocache { let offset = read_offset.try_into().unwrap(); let len = read_len.try_into().unwrap(); i.discard_cache(offset, len); @@ -926,7 +1015,7 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { // // TODO Better error handling for overflowing `offset` and `len`. let write_len = wstat_update.bytes_total; - if o.settings.oflags.nocache { + if output_nocache { let offset = write_offset.try_into().unwrap(); let len = write_len.try_into().unwrap(); o.discard_cache(offset, len); @@ -946,34 +1035,33 @@ fn dd_copy(mut i: Input, mut o: Output) -> std::io::Result<()> { prog_tx.send(prog_update).unwrap_or(()); } } - finalize(&mut o, rstat, wstat, start, &prog_tx, output_thread) + finalize(o, rstat, wstat, start, &prog_tx, output_thread, truncate) } /// Flush output, print final stats, and join with the progress thread. fn finalize( - output: &mut Output, + mut output: BlockWriter, rstat: ReadStat, wstat: WriteStat, start: Instant, prog_tx: &mpsc::Sender, output_thread: thread::JoinHandle, + truncate: bool, ) -> std::io::Result<()> { - // Flush the output, if configured to do so. + // Flush the output in case a partial write has been buffered but + // not yet written. + let wstat_update = output.flush()?; + + // Sync the output, if configured to do so. output.sync()?; // Truncate the file to the final cursor location. - // - // Calling `set_len()` may result in an error (for example, - // when calling it on `/dev/null`), but we don't want to - // terminate the process when that happens. Instead, we - // suppress the error by calling `Result::ok()`. This matches - // the behavior of GNU `dd` when given the command-line - // argument `of=/dev/null`. - if !output.settings.oconv.notrunc { - output.dst.truncate().ok(); + if truncate { + output.truncate(); } // Print the final read/write statistics. + let wstat = wstat + wstat_update; let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true); prog_tx.send(prog_update).unwrap_or(()); // Wait for the output thread to finish @@ -1103,16 +1191,10 @@ fn calc_loop_bsize( // Decide if the current progress is below a count=N limit or return // true if no such limit is set. -fn below_count_limit(count: &Option, rstat: &ReadStat, wstat: &WriteStat) -> bool { +fn below_count_limit(count: &Option, rstat: &ReadStat) -> bool { match count { - Some(Num::Blocks(n)) => { - let n = *n; - rstat.reads_complete + rstat.reads_partial <= n - } - Some(Num::Bytes(n)) => { - let n = (*n).try_into().unwrap(); - wstat.bytes_total <= n - } + Some(Num::Blocks(n)) => rstat.reads_complete + rstat.reads_partial < *n, + Some(Num::Bytes(n)) => rstat.bytes_total < *n, None => true, } } diff --git a/src/uu/dd/src/parseargs.rs b/src/uu/dd/src/parseargs.rs index 0ff6e752c0..60ce9a6971 100644 --- a/src/uu/dd/src/parseargs.rs +++ b/src/uu/dd/src/parseargs.rs @@ -35,41 +35,28 @@ pub enum ParseError { } /// Contains a temporary state during parsing of the arguments -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Default)] pub struct Parser { infile: Option, outfile: Option, - ibs: usize, - obs: usize, + /// The block size option specified on the command-line, if any. + bs: Option, + /// The input block size option specified on the command-line, if any. + ibs: Option, + /// The output block size option specified on the command-line, if any. + obs: Option, cbs: Option, skip: Num, seek: Num, count: Option, conv: ConvFlags, + /// Whether a data-transforming `conv` option has been specified. + is_conv_specified: bool, iflag: IFlags, oflag: OFlags, status: Option, } -impl Default for Parser { - fn default() -> Self { - Self { - ibs: 512, - obs: 512, - cbs: None, - infile: None, - outfile: None, - skip: Num::Blocks(0), - seek: Num::Blocks(0), - count: None, - conv: ConvFlags::default(), - iflag: IFlags::default(), - oflag: OFlags::default(), - status: None, - } - } -} - #[derive(Debug, Default, PartialEq, Eq)] pub struct ConvFlags { ascii: bool, @@ -212,15 +199,34 @@ impl Parser { fsync: conv.fsync, }; + // Input and output block sizes. + // + // The `bs` option takes precedence. If either is not + // provided, `ibs` and `obs` are each 512 bytes by default. + let (ibs, obs) = match self.bs { + None => (self.ibs.unwrap_or(512), self.obs.unwrap_or(512)), + Some(bs) => (bs, bs), + }; + + // Whether to buffer partial output blocks until they are completed. + // + // From the GNU `dd` documentation for the `bs=BYTES` option: + // + // > [...] if no data-transforming 'conv' option is specified, + // > input is copied to the output as soon as it's read, even if + // > it is smaller than the block size. + // + let buffered = self.bs.is_none() || self.is_conv_specified; + let skip = self .skip .force_bytes_if(self.iflag.skip_bytes) - .to_bytes(self.ibs as u64); + .to_bytes(ibs as u64); let seek = self .seek .force_bytes_if(self.oflag.seek_bytes) - .to_bytes(self.obs as u64); + .to_bytes(obs as u64); let count = self.count.map(|c| c.force_bytes_if(self.iflag.count_bytes)); @@ -230,8 +236,9 @@ impl Parser { count, iconv, oconv, - ibs: self.ibs, - obs: self.obs, + ibs, + obs, + buffered, infile: self.infile, outfile: self.outfile, iflags: self.iflag, @@ -244,18 +251,17 @@ impl Parser { match operand.split_once('=') { None => return Err(ParseError::UnrecognizedOperand(operand.to_string())), Some((k, v)) => match k { - "bs" => { - let bs = Self::parse_bytes(k, v)?; - self.ibs = bs; - self.obs = bs; - } + "bs" => self.bs = Some(Self::parse_bytes(k, v)?), "cbs" => self.cbs = Some(Self::parse_bytes(k, v)?), - "conv" => self.parse_conv_flags(v)?, + "conv" => { + self.is_conv_specified = true; + self.parse_conv_flags(v)?; + } "count" => self.count = Some(Self::parse_n(v)?), - "ibs" => self.ibs = Self::parse_bytes(k, v)?, + "ibs" => self.ibs = Some(Self::parse_bytes(k, v)?), "if" => self.infile = Some(v.to_string()), "iflag" => self.parse_input_flags(v)?, - "obs" => self.obs = Self::parse_bytes(k, v)?, + "obs" => self.obs = Some(Self::parse_bytes(k, v)?), "of" => self.outfile = Some(v.to_string()), "oflag" => self.parse_output_flags(v)?, "seek" | "oseek" => self.seek = Self::parse_n(v)?, diff --git a/src/uu/dd/src/parseargs/unit_tests.rs b/src/uu/dd/src/parseargs/unit_tests.rs index 142e49fd0b..51b0933e92 100644 --- a/src/uu/dd/src/parseargs/unit_tests.rs +++ b/src/uu/dd/src/parseargs/unit_tests.rs @@ -358,6 +358,7 @@ fn parse_icf_tokens_remaining() { fsync: true, ..Default::default() }, + is_conv_specified: true, ..Default::default() }) ); diff --git a/src/uu/dd/src/progress.rs b/src/uu/dd/src/progress.rs index 4fe04cb0e6..ac7517c2c0 100644 --- a/src/uu/dd/src/progress.rs +++ b/src/uu/dd/src/progress.rs @@ -379,6 +379,17 @@ impl std::ops::AddAssign for WriteStat { } } +impl std::ops::Add for WriteStat { + type Output = Self; + fn add(self, other: Self) -> Self { + Self { + writes_complete: self.writes_complete + other.writes_complete, + writes_partial: self.writes_partial + other.writes_partial, + bytes_total: self.bytes_total + other.bytes_total, + } + } +} + /// How much detail to report when printing transfer statistics. /// /// This corresponds to the available settings of the `status` diff --git a/tests/by-util/test_dd.rs b/tests/by-util/test_dd.rs index d5ac8dc801..a4c70097c8 100644 --- a/tests/by-util/test_dd.rs +++ b/tests/by-util/test_dd.rs @@ -2,7 +2,7 @@ // // For the full copyright and license information, please view the LICENSE // file that was distributed with this source code. -// spell-checker:ignore fname, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, availible, behaviour, bmax, bremain, btotal, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rposition, rremain, rsofar, rstat, sigusr, sigval, wlen, wstat abcdefghijklm abcdefghi nabcde nabcdefg abcdefg +// spell-checker:ignore fname, tname, fpath, specfile, testfile, unspec, ifile, ofile, outfile, fullblock, urand, fileio, atoe, atoibm, availible, behaviour, bmax, bremain, btotal, cflags, creat, ctable, ctty, datastructures, doesnt, etoa, fileout, fname, gnudd, iconvflags, iseek, nocache, noctty, noerror, nofollow, nolinks, nonblock, oconvflags, oseek, outfile, parseargs, rlen, rmax, rposition, rremain, rsofar, rstat, sigusr, sigval, wlen, wstat abcdefghijklm abcdefghi nabcde nabcdefg abcdefg fifoname #[cfg(unix)] use crate::common::util::run_ucmd_as_root_with_stdin_stdout; @@ -15,6 +15,8 @@ use regex::Regex; use std::fs::{File, OpenOptions}; use std::io::{BufReader, Read, Write}; use std::path::PathBuf; +#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))] +use std::process::{Command, Stdio}; #[cfg(not(windows))] use std::thread::sleep; #[cfg(not(windows))] @@ -1582,3 +1584,77 @@ fn test_seek_past_dev() { print!("TEST SKIPPED"); } } + +#[test] +#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))] +fn test_reading_partial_blocks_from_fifo() { + // Create the FIFO. + let ts = TestScenario::new(util_name!()); + let at = ts.fixtures.clone(); + at.mkfifo("fifo"); + let fifoname = at.plus_as_string("fifo"); + + // Start a `dd` process that reads from the fifo (so it will wait + // until the writer process starts). + let mut reader_command = Command::new(TESTS_BINARY); + let child = reader_command + .args(["dd", "ibs=3", "obs=3", &format!("if={}", fifoname)]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .unwrap(); + + // Start different processes to write to the FIFO, with a small + // pause in between. + let mut writer_command = Command::new("sh"); + writer_command + .args([ + "-c", + &format!("(printf \"ab\"; sleep 0.1; printf \"cd\") > {}", fifoname), + ]) + .spawn() + .unwrap(); + + let output = child.wait_with_output().unwrap(); + assert_eq!(output.stdout, b"abcd"); + let expected = b"0+2 records in\n1+1 records out\n4 bytes copied"; + assert!(output.stderr.starts_with(expected)); +} + +#[test] +#[cfg(all(unix, not(target_os = "macos"), not(target_os = "freebsd")))] +fn test_reading_partial_blocks_from_fifo_unbuffered() { + // Create the FIFO. + let ts = TestScenario::new(util_name!()); + let at = ts.fixtures.clone(); + at.mkfifo("fifo"); + let fifoname = at.plus_as_string("fifo"); + + // Start a `dd` process that reads from the fifo (so it will wait + // until the writer process starts). + // + // `bs=N` takes precedence over `ibs=N` and `obs=N`. + let mut reader_command = Command::new(TESTS_BINARY); + let child = reader_command + .args(["dd", "bs=3", "ibs=1", "obs=1", &format!("if={}", fifoname)]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .unwrap(); + + // Start different processes to write to the FIFO, with a small + // pause in between. + let mut writer_command = Command::new("sh"); + writer_command + .args([ + "-c", + &format!("(printf \"ab\"; sleep 0.1; printf \"cd\") > {}", fifoname), + ]) + .spawn() + .unwrap(); + + let output = child.wait_with_output().unwrap(); + assert_eq!(output.stdout, b"abcd"); + let expected = b"0+2 records in\n0+2 records out\n4 bytes copied"; + assert!(output.stderr.starts_with(expected)); +}