Skip to content

Commit

Permalink
split: --filter and brokenpipe handling
Browse files Browse the repository at this point in the history
  • Loading branch information
zhitkoff committed Oct 17, 2023
1 parent 0a7fe3c commit bd3f26f
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 47 deletions.
170 changes: 123 additions & 47 deletions src/uu/split/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,9 @@ enum SettingsError {
/// Multiple different separator characters
MultipleSeparatorCharacters,

/// Using `--filter` with Kth chunk options that output to stdout
FilterWithKthChunkNumber,

/// The `--filter` option is not supported on Windows.
#[cfg(windows)]
NotSupported,
Expand Down Expand Up @@ -778,6 +781,9 @@ impl fmt::Display for SettingsError {
"invalid suffix {}, contains directory separator",
s.quote()
),
Self::FilterWithKthChunkNumber => {
write!(f, "--filter does not process a chunk extracted to stdout")
}
#[cfg(windows)]
Self::NotSupported => write!(
f,
Expand Down Expand Up @@ -848,12 +854,26 @@ impl Settings {
filter: matches.get_one::<String>(OPT_FILTER).map(|s| s.to_owned()),
elide_empty_files: matches.get_flag(OPT_ELIDE_EMPTY_FILES),
};

#[cfg(windows)]
if result.filter.is_some() {
// see https://github.com/rust-lang/rust/issues/29494
return Err(SettingsError::NotSupported);
}

// Return an error if the `--filter` option is used with any of the
// Kth chunk sub-strategies of the `--number` option, because those
// write to the stdout of `split` and cannot write to the filter command child process.
let kth_chunk = matches!(
result.strategy,
Strategy::Number(NumberType::KthBytes(_, _))
| Strategy::Number(NumberType::KthLines(_, _))
| Strategy::Number(NumberType::KthRoundRobin(_, _))
);
if kth_chunk && result.filter.is_some() {
return Err(SettingsError::FilterWithKthChunkNumber);
}

Ok(result)
}

Expand All @@ -869,6 +889,46 @@ impl Settings {
}
}

/// Decide whether an I/O error can be skipped instead of reported.
///
/// With the `--filter` option, output is written to the stdin of a child
/// command process, and such a write could fail with a `BrokenPipe`
/// error. That error can be safely ignored.
///
/// Returns `true` only when `settings.filter` is set and the kind of
/// `error` is `BrokenPipe`.
fn ignorable_io_error(error: &std::io::Error, settings: &Settings) -> bool {
    settings.filter.is_some() && matches!(error.kind(), ErrorKind::BrokenPipe)
}

/// Wrapper around the `write()` method that tolerates ignorable errors.
///
/// Follows a similar approach to the GNU implementation: when the write
/// fails with an error accepted by [`ignorable_io_error`], the call
/// reports success as if every byte of `bytes` had been written. Any
/// other error is returned unchanged, and a successful write returns the
/// byte count reported by `writer`.
///
/// Should not be used for the Kth chunk number sub-strategies,
/// as those do not work with the `--filter` option.
fn custom_write<T: Write>(
    bytes: &[u8],
    writer: &mut T,
    settings: &Settings,
) -> std::io::Result<usize> {
    writer.write(bytes).or_else(|err| {
        if ignorable_io_error(&err, settings) {
            // Pretend the whole buffer was consumed so callers keep going.
            Ok(bytes.len())
        } else {
            Err(err)
        }
    })
}

/// Wrapper around the `write_all()` method that tolerates ignorable errors.
///
/// Similar to [`custom_write`]: an error accepted by
/// [`ignorable_io_error`] is treated as success, while any other error
/// is returned unchanged.
///
/// Should not be used for the Kth chunk number sub-strategies,
/// as those do not work with the `--filter` option.
fn custom_write_all<T: Write>(
    bytes: &[u8],
    writer: &mut T,
    settings: &Settings,
) -> std::io::Result<()> {
    if let Err(err) = writer.write_all(bytes) {
        // Only errors that are not ignorable are surfaced to the caller.
        if !ignorable_io_error(&err, settings) {
            return Err(err);
        }
    }
    Ok(())
}

/// Write a certain number of bytes to one file, then move on to another one.
///
/// This struct maintains an underlying writer representing the
Expand Down Expand Up @@ -964,9 +1024,9 @@ impl<'a> Write for ByteChunkWriter<'a> {
// bytes in `buf`, then write all the bytes in `buf`. Otherwise,
// write enough bytes to fill the current chunk, then increment
// the chunk number and repeat.
let n = buf.len();
if (n as u64) < self.num_bytes_remaining_in_current_chunk {
let num_bytes_written = self.inner.write(buf)?;
let buf_len = buf.len();
if (buf_len as u64) < self.num_bytes_remaining_in_current_chunk {
let num_bytes_written = custom_write(buf, &mut self.inner, self.settings)?;
self.num_bytes_remaining_in_current_chunk -= num_bytes_written as u64;
return Ok(carryover_bytes_written + num_bytes_written);
} else {
Expand All @@ -976,7 +1036,7 @@ impl<'a> Write for ByteChunkWriter<'a> {
// self.num_bytes_remaining_in_current_chunk is lower than
// `buf_len`, which is already usize.
let i = self.num_bytes_remaining_in_current_chunk as usize;
let num_bytes_written = self.inner.write(&buf[..i])?;
let num_bytes_written = custom_write(&buf[..i], &mut self.inner, self.settings)?;
self.num_bytes_remaining_in_current_chunk -= num_bytes_written as u64;

// It's possible that the underlying writer did not
Expand Down Expand Up @@ -1090,14 +1150,16 @@ impl<'a> Write for LineChunkWriter<'a> {
// Write the line, starting from *after* the previous
// separator character and ending *after* the current
// separator character.
let n = self.inner.write(&buf[prev..i + 1])?;
total_bytes_written += n;
let num_bytes_written =
custom_write(&buf[prev..i + 1], &mut self.inner, self.settings)?;
total_bytes_written += num_bytes_written;
prev = i + 1;
self.num_lines_remaining_in_current_chunk -= 1;
}

let n = self.inner.write(&buf[prev..buf.len()])?;
total_bytes_written += n;
let num_bytes_written =
custom_write(&buf[prev..buf.len()], &mut self.inner, self.settings)?;
total_bytes_written += num_bytes_written;
Ok(total_bytes_written)
}

Expand Down Expand Up @@ -1246,7 +1308,12 @@ impl<'a> Write for LineBytesChunkWriter<'a> {
{
self.num_bytes_remaining_in_current_chunk = 0;
} else {
let num_bytes_written = self.inner.write(&buf[..end.min(buf.len())])?;
// let num_bytes_written = self.inner.write(&buf[..end.min(buf.len())])?;
let num_bytes_written = custom_write(
&buf[..end.min(buf.len())],
&mut self.inner,
self.settings,
)?;
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
total_bytes_written += num_bytes_written;
buf = &buf[num_bytes_written..];
Expand All @@ -1259,7 +1326,9 @@ impl<'a> Write for LineBytesChunkWriter<'a> {
// continue to the next iteration. (See chunk 1 in the
// example comment above.)
Some(i) if i < self.num_bytes_remaining_in_current_chunk => {
let num_bytes_written = self.inner.write(&buf[..i + 1])?;
// let num_bytes_written = self.inner.write(&buf[..i + 1])?;
let num_bytes_written =
custom_write(&buf[..i + 1], &mut self.inner, self.settings)?;
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
total_bytes_written += num_bytes_written;
buf = &buf[num_bytes_written..];
Expand All @@ -1277,7 +1346,9 @@ impl<'a> Write for LineBytesChunkWriter<'a> {
== self.chunk_size.try_into().unwrap() =>
{
let end = self.num_bytes_remaining_in_current_chunk;
let num_bytes_written = self.inner.write(&buf[..end])?;
// let num_bytes_written = self.inner.write(&buf[..end])?;
let num_bytes_written =
custom_write(&buf[..end], &mut self.inner, self.settings)?;
self.num_bytes_remaining_in_current_chunk -= num_bytes_written;
total_bytes_written += num_bytes_written;
buf = &buf[num_bytes_written..];
Expand Down Expand Up @@ -1376,29 +1447,26 @@ where
writers.push(writer);
}

// Capture the result of the `std::io::copy()` calls to check for
// `BrokenPipe`.
let result: std::io::Result<()> = {
// Write `chunk_size` bytes from the reader into each writer
// except the last.
//
// The last writer gets all remaining bytes so that if the number
// of bytes in the input file was not evenly divisible by
// `num_chunks`, we don't leave any bytes behind.
for writer in writers.iter_mut().take(num_chunks - 1) {
io::copy(&mut reader.by_ref().take(chunk_size), writer)?;
}

// Write all the remaining bytes to the last chunk.
let i = num_chunks - 1;
let last_chunk_size = num_bytes - (chunk_size * (num_chunks as u64 - 1));
io::copy(&mut reader.by_ref().take(last_chunk_size), &mut writers[i])?;
// Write `chunk_size` bytes from the reader into each writer
// except the last.
//
// The last writer gets all remaining bytes so that if the number
// of bytes in the input file was not evenly divisible by
// `num_chunks`, we don't leave any bytes behind.
for writer in writers.iter_mut().take(num_chunks - 1) {
match io::copy(&mut reader.by_ref().take(chunk_size), writer) {
Ok(_) => continue,
Err(e) if ignorable_io_error(&e, settings) => continue,
Err(e) => return Err(uio_error!(e, "input/output error")),
};
}

Ok(())
};
match result {
// Write all the remaining bytes to the last chunk.
let i = num_chunks - 1;
let last_chunk_size = num_bytes - (chunk_size * (num_chunks as u64 - 1));
match io::copy(&mut reader.by_ref().take(last_chunk_size), &mut writers[i]) {
Ok(_) => Ok(()),
Err(e) if e.kind() == ErrorKind::BrokenPipe => Ok(()),
Err(e) if ignorable_io_error(&e, settings) => Ok(()),
Err(e) => Err(uio_error!(e, "input/output error")),
}
}
Expand Down Expand Up @@ -1548,8 +1616,8 @@ where
let maybe_writer = writers.get_mut(i);
let writer = maybe_writer.unwrap();
let bytes = line.as_slice();
writer.write_all(bytes)?;
writer.write_all(&[sep])?;
custom_write_all(bytes, writer, settings)?;
custom_write_all(&[sep], writer, settings)?;

// Add one byte for the separator character.
let num_bytes = bytes.len() + 1;
Expand Down Expand Up @@ -1626,11 +1694,28 @@ where
Ok(())
}

/// Split a file into a specific number of chunks by line, but
/// assign lines via round-robin
///
/// This function always creates one output file for each chunk, even
/// if there is an error reading or writing one of the chunks or if
/// the input file is truncated. However, if the `filter` option is
/// being used, then no files are created.
///
/// # Errors
///
/// This function returns an error if there is a problem reading from
/// `reader` or writing to one of the output files.
///
/// # See also
///
/// * [`split_into_n_chunks_by_line`], which splits its input in the same way,
/// but without round robin distribution.
fn split_into_n_chunks_by_line_round_robin<R>(
settings: &Settings,
reader: &mut R,
num_chunks: u64,
) -> std::io::Result<()>
) -> UResult<()>
where
R: BufRead,
{
Expand Down Expand Up @@ -1662,8 +1747,8 @@ where
let maybe_writer = writers.get_mut(i % num_chunks);
let writer = maybe_writer.unwrap();
let bytes = line.as_slice();
writer.write_all(bytes)?;
writer.write_all(&[sep])?;
custom_write_all(bytes, writer, settings)?;
custom_write_all(&[sep], writer, settings)?;
}

Ok(())
Expand Down Expand Up @@ -1745,13 +1830,7 @@ fn split(settings: &Settings) -> UResult<()> {
kth_chunk_by_line(settings, &mut reader, chunk_number, num_chunks)
}
Strategy::Number(NumberType::RoundRobin(num_chunks)) => {
match split_into_n_chunks_by_line_round_robin(settings, &mut reader, num_chunks) {
Ok(_) => Ok(()),
Err(e) => match e.kind() {
ErrorKind::BrokenPipe => Ok(()),
_ => Err(USimpleError::new(1, format!("{e}"))),
},
}
split_into_n_chunks_by_line_round_robin(settings, &mut reader, num_chunks)
}
Strategy::Number(NumberType::KthRoundRobin(chunk_number, num_chunks)) => {
// The chunk number is given as a 1-indexed number, but it
Expand All @@ -1773,7 +1852,6 @@ fn split(settings: &Settings) -> UResult<()> {
// indicate that. A special error message needs to be
// printed in that case.
ErrorKind::Other => Err(USimpleError::new(1, format!("{e}"))),
ErrorKind::BrokenPipe => Ok(()),
_ => Err(uio_error!(e, "input/output error")),
},
}
Expand All @@ -1792,7 +1870,6 @@ fn split(settings: &Settings) -> UResult<()> {
// indicate that. A special error message needs to be
// printed in that case.
ErrorKind::Other => Err(USimpleError::new(1, format!("{e}"))),
ErrorKind::BrokenPipe => Ok(()),
_ => Err(uio_error!(e, "input/output error")),
},
}
Expand All @@ -1811,7 +1888,6 @@ fn split(settings: &Settings) -> UResult<()> {
// indicate that. A special error message needs to be
// printed in that case.
ErrorKind::Other => Err(USimpleError::new(1, format!("{e}"))),
ErrorKind::BrokenPipe => Ok(()),
_ => Err(uio_error!(e, "input/output error")),
},
}
Expand Down
30 changes: 30 additions & 0 deletions tests/by-util/test_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,36 @@ fn test_filter_broken_pipe() {
.succeeds();
}

/// `--filter` must be rejected when combined with any `--number` form
/// that extracts a single chunk to stdout (`K/N`, `l/K/N`, `r/K/N`).
#[test]
#[cfg(unix)]
fn test_filter_with_kth_chunk() {
    let scene = TestScenario::new(util_name!());
    let kth_chunk_args = ["--number=1/2", "--number=l/1/2", "--number=r/1/2"];
    for number_arg in kth_chunk_args {
        scene
            .ucmd()
            .args(&["--filter='some'", number_arg])
            .ignore_stdin_write_error()
            .pipe_in("a\n")
            .fails()
            .no_stdout()
            .stderr_contains("--filter does not process a chunk extracted to stdout");
    }
}

#[test]
fn test_split_lines_number() {
// Test if stdout/stderr for '--lines' option is correct
Expand Down

0 comments on commit bd3f26f

Please sign in to comment.