diff --git a/src/uu/split/src/platform/unix.rs b/src/uu/split/src/platform/unix.rs index c2bf7216b57..1fd990e0a91 100644 --- a/src/uu/split/src/platform/unix.rs +++ b/src/uu/split/src/platform/unix.rs @@ -117,22 +117,37 @@ impl Drop for FilterWriter { pub fn instantiate_current_writer( filter: &Option, filename: &str, + is_new: bool, ) -> Result>> { match filter { - None => Ok(BufWriter::new(Box::new( - // write to the next file - std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(std::path::Path::new(&filename)) - .map_err(|_| { - Error::new( - ErrorKind::Other, - format!("unable to open '{filename}'; aborting"), - ) - })?, - ) as Box)), + None => { + let file = if is_new { + // create new file + std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(std::path::Path::new(&filename)) + .map_err(|_| { + Error::new( + ErrorKind::Other, + format!("unable to open '{filename}'; aborting"), + ) + })? + } else { + // re-open file that we previously created to append to it + std::fs::OpenOptions::new() + .append(true) + .open(std::path::Path::new(&filename)) + .map_err(|_| { + Error::new( + ErrorKind::Other, + format!("unable to re-open '{filename}'; aborting"), + ) + })? + }; + Ok(BufWriter::new(Box::new(file) as Box)) + } Some(ref filter_command) => Ok(BufWriter::new(Box::new( // spawn a shell command and write to it FilterWriter::new(filter_command, filename)?, diff --git a/src/uu/split/src/platform/windows.rs b/src/uu/split/src/platform/windows.rs index 8b90789896f..a531d6abc1f 100644 --- a/src/uu/split/src/platform/windows.rs +++ b/src/uu/split/src/platform/windows.rs @@ -14,9 +14,10 @@ use uucore::fs; pub fn instantiate_current_writer( _filter: &Option, filename: &str, + is_new: bool, ) -> Result>> { - Ok(BufWriter::new(Box::new( - // write to the next file + let file = if is_new { + // create new file std::fs::OpenOptions::new() .write(true) .create(true) @@ -25,10 +26,22 @@ pub fn instantiate_current_writer( .map_err(|_| { Error::new( ErrorKind::Other, - format!("'{filename}' would overwrite input; aborting"), + format!("unable to open '{filename}'; aborting"), ) - })?, - ) as Box)) + })? + } else { + // re-open file that we previously created to append to it + std::fs::OpenOptions::new() + .append(true) + .open(std::path::Path::new(&filename)) + .map_err(|_| { + Error::new( + ErrorKind::Other, + format!("unable to re-open '{filename}'; aborting"), + ) + })? + }; + Ok(BufWriter::new(Box::new(file) as Box)) } pub fn paths_refer_to_same_file(p1: &str, p2: &str) -> bool { diff --git a/src/uu/split/src/split.rs b/src/uu/split/src/split.rs index 4e2af0be4d8..e5f9032c947 100644 --- a/src/uu/split/src/split.rs +++ b/src/uu/split/src/split.rs @@ -3,7 +3,7 @@ // For the full copyright and license information, please view the LICENSE // file that was distributed with this source code. -// spell-checker:ignore nbbbb ncccc hexdigit +// spell-checker:ignore nbbbb ncccc hexdigit getmaxstdio mod filenames; mod number; @@ -563,7 +563,11 @@ impl Settings { Ok(result) } - fn instantiate_current_writer(&self, filename: &str) -> io::Result>> { + fn instantiate_current_writer( + &self, + filename: &str, + is_new: bool, + ) -> io::Result>> { if platform::paths_refer_to_same_file(&self.input, filename) { return Err(io::Error::new( ErrorKind::Other, @@ -571,7 +575,7 @@ impl Settings { )); } - platform::instantiate_current_writer(&self.filter, filename) + platform::instantiate_current_writer(&self.filter, filename, is_new) } } @@ -748,7 +752,7 @@ impl<'a> ByteChunkWriter<'a> { if settings.verbose { println!("creating file {}", filename.quote()); } - let inner = settings.instantiate_current_writer(&filename)?; + let inner = settings.instantiate_current_writer(&filename, true)?; Ok(ByteChunkWriter { settings, chunk_size, @@ -786,7 +790,7 @@ impl<'a> Write for ByteChunkWriter<'a> { if self.settings.verbose { println!("creating file {}", filename.quote()); } - self.inner = self.settings.instantiate_current_writer(&filename)?; + self.inner = self.settings.instantiate_current_writer(&filename, true)?; } // If the capacity of this chunk is greater than the number of @@ -872,7 +876,7 @@ impl<'a> LineChunkWriter<'a> { if settings.verbose { println!("creating file {}", filename.quote()); } - let inner = settings.instantiate_current_writer(&filename)?; + let inner = settings.instantiate_current_writer(&filename, true)?; Ok(LineChunkWriter { settings, chunk_size, @@ -907,7 +911,7 @@ impl<'a> Write for LineChunkWriter<'a> { if self.settings.verbose { println!("creating file {}", filename.quote()); } - self.inner = self.settings.instantiate_current_writer(&filename)?; + self.inner = self.settings.instantiate_current_writer(&filename, true)?; self.num_lines_remaining_in_current_chunk = self.chunk_size; } @@ -979,7 +983,7 @@ impl<'a> LineBytesChunkWriter<'a> { if settings.verbose { println!("creating file {}", filename.quote()); } - let inner = settings.instantiate_current_writer(&filename)?; + let inner = settings.instantiate_current_writer(&filename, true)?; Ok(LineBytesChunkWriter { settings, chunk_size, @@ -1045,7 +1049,7 @@ impl<'a> Write for LineBytesChunkWriter<'a> { if self.settings.verbose { println!("creating file {}", filename.quote()); } - self.inner = self.settings.instantiate_current_writer(&filename)?; + self.inner = self.settings.instantiate_current_writer(&filename, true)?; self.num_bytes_remaining_in_current_chunk = self.chunk_size.try_into().unwrap(); } @@ -1134,55 +1138,135 @@ impl<'a> Write for LineBytesChunkWriter<'a> { struct OutFile { filename: String, maybe_writer: Option>>, + is_new: bool, } -impl OutFile { - /// Get the writer for the output file - /// Instantiate the writer if it has not been instantiated upfront - fn get_writer(&mut self, settings: &Settings) -> UResult<&mut BufWriter>> { - if self.maybe_writer.is_some() { - Ok(self.maybe_writer.as_mut().unwrap()) - } else { - // Writer was not instantiated upfront - // Instantiate it and record for future use - self.maybe_writer = Some(settings.instantiate_current_writer(self.filename.as_str())?); - Ok(self.maybe_writer.as_mut().unwrap()) +// impl OutFile { +// /// Get the writer for the output file. +// /// Instantiate the writer if it has not been instantiated upfront +// /// or temporarily closed to free up system resources +// fn get_writer(&mut self, settings: &Settings) -> UResult<&mut BufWriter>> { +// if self.maybe_writer.is_some() { +// Ok(self.maybe_writer.as_mut().unwrap()) +// } else { +// // Writer was not instantiated upfront or was temporarily closed due to system resources constraints. +// // Instantiate it and record for future use. +// self.maybe_writer = +// Some(settings.instantiate_current_writer(self.filename.as_str(), self.is_new)?); +// Ok(self.maybe_writer.as_mut().unwrap()) +// } +// } +// } + +/// A set of output files +/// Used in [`n_chunks_by_byte`], [`n_chunks_by_line`] +/// and [`n_chunks_by_line_round_robin`] functions. +type OutFiles = Vec; +trait ManageOutFiles { + /// Initialize a new set of output files + /// Each OutFile is generated with filename, while the writer for it could be + /// optional, to be instantiated later by the calling function as needed. + /// Optional writers could happen in the following situations: + /// * in [`n_chunks_by_line`] if `elide_empty_files` parameter is set to `true` + /// * if the number of files is greater than system limit for open files + fn init(num_files: u64, settings: &Settings, is_writer_optional: bool) -> UResult + where + Self: Sized; + /// Get the writer for the output file by index. + /// If system limit of open files has been reached + /// it will try to close one of previously instantiated writers + /// to free up resources and re-try instantiating current writer, + /// except for `--filter` mode. + /// The writers that get closed to free up resources for the current writer + /// are flagged as `is_new=false`, so they can be re-opened for appending + /// instead of created anew if we need to keep writing into them later, + /// i.e. in case of round robin distribution as in [`n_chunks_by_line_round_robin`] + fn get_writer( + &mut self, + idx: usize, + settings: &Settings, + ) -> UResult<&mut BufWriter>>; +} + +impl ManageOutFiles for OutFiles { + fn init(num_files: u64, settings: &Settings, is_writer_optional: bool) -> UResult { + // This object is responsible for creating the filename for each chunk + let mut filename_iterator: FilenameIterator<'_> = + FilenameIterator::new(&settings.prefix, &settings.suffix) + .map_err(|e| io::Error::new(ErrorKind::Other, format!("{e}")))?; + let mut out_files: Self = Self::new(); + for _ in 0..num_files { + let filename = filename_iterator + .next() + .ok_or_else(|| USimpleError::new(1, "output file suffixes exhausted"))?; + let maybe_writer = if is_writer_optional { + None + } else { + let instantiated = settings.instantiate_current_writer(filename.as_str(), true); + // If there was an error instantiating the writer for a file, + // it could be due to hitting the system limit of open files, + // so record it as None and let [`get_writer`] function handle closing/re-opening + // of writers as needed within system limits. + // However, for `--filter` child process writers - propagate the error, + // as working around system limits of open files for child shell processes + // is currently not supported (same as in GNU) + match instantiated { + Ok(writer) => Some(writer), + Err(e) if settings.filter.is_some() => { + return Err(e.into()); + } + Err(_) => None, + } + }; + out_files.push(OutFile { + filename, + maybe_writer, + is_new: true, + }); } + Ok(out_files) } -} -/// Generate a set of Output Files -/// This is a helper function to [`n_chunks_by_byte`], [`n_chunks_by_line`] -/// and [`n_chunks_by_line_round_robin`]. -/// Each OutFile is generated with filename, while the writer for it could be -/// optional, to be instantiated later by the calling function as needed. -/// Optional writers could happen in [`n_chunks_by_line`] -/// if `elide_empty_files` parameter is set to `true`. -fn get_out_files( - num_files: u64, - settings: &Settings, - is_writer_optional: bool, -) -> UResult> { - // This object is responsible for creating the filename for each chunk - let mut filename_iterator: FilenameIterator<'_> = - FilenameIterator::new(&settings.prefix, &settings.suffix) - .map_err(|e| io::Error::new(ErrorKind::Other, format!("{e}")))?; - let mut out_files: Vec = Vec::new(); - for _ in 0..num_files { - let filename = filename_iterator - .next() - .ok_or_else(|| USimpleError::new(1, "output file suffixes exhausted"))?; - let maybe_writer = if is_writer_optional { - None + fn get_writer( + &mut self, + idx: usize, + settings: &Settings, + ) -> UResult<&mut BufWriter>> { + if self[idx].maybe_writer.is_some() { + Ok(self[idx].maybe_writer.as_mut().unwrap()) } else { - Some(settings.instantiate_current_writer(filename.as_str())?) - }; - out_files.push(OutFile { - filename, - maybe_writer, - }); + // Writer was not instantiated upfront or was temporarily closed due to system resources constraints. + // Instantiate it and record for future use. + let maybe_writer = + settings.instantiate_current_writer(self[idx].filename.as_str(), self[idx].is_new); + if let Ok(writer) = maybe_writer { + self[idx].maybe_writer = Some(writer); + Ok(self[idx].maybe_writer.as_mut().unwrap()) + } else if settings.filter.is_some() { + // Propagate error if in `--filter` mode + Err(maybe_writer.err().unwrap().into()) + } else { + // Could have hit system limit for open files. + // Try to close one previously instantiated writer first + for (i, out_file) in self.iter_mut().enumerate() { + if i != idx && out_file.maybe_writer.is_some() { + out_file.maybe_writer.as_mut().unwrap().flush()?; + out_file.maybe_writer = None; + out_file.is_new = false; + break; + } + } + // And then try to instantiate the writer again + // If this fails - give up and propagate the error + self[idx].maybe_writer = + Some(settings.instantiate_current_writer( + self[idx].filename.as_str(), + self[idx].is_new, + )?); + Ok(self[idx].maybe_writer.as_mut().unwrap()) + } + } } - Ok(out_files) } /// Split a file or STDIN into a specific number of chunks by byte. @@ -1261,7 +1345,7 @@ where // In Kth chunk of N mode - we will write to stdout instead of to a file. let mut stdout_writer = std::io::stdout().lock(); // In N chunks mode - we will write to `num_chunks` files - let mut out_files: Vec = Vec::new(); + let mut out_files: OutFiles = OutFiles::new(); // Calculate chunk size base and modulo reminder // to be used in calculating chunk_size later on @@ -1273,7 +1357,7 @@ where // This will create each of the underlying files // or stdin pipes to child shell/command processes if in `--filter` mode if kth_chunk.is_none() { - out_files = get_out_files(num_chunks, settings, false)?; + out_files = OutFiles::init(num_chunks, settings, false)?; } for i in 1_u64..=num_chunks { @@ -1317,7 +1401,7 @@ where } None => { let idx = (i - 1) as usize; - let writer = out_files[idx].get_writer(settings)?; + let writer = out_files.get_writer(idx, settings)?; writer.write_all(buf)?; } } @@ -1387,7 +1471,7 @@ where // In Kth chunk of N mode - we will write to stdout instead of to a file. let mut stdout_writer = std::io::stdout().lock(); // In N chunks mode - we will write to `num_chunks` files - let mut out_files: Vec = Vec::new(); + let mut out_files: OutFiles = OutFiles::new(); // Calculate chunk size base and modulo reminder // to be used in calculating `num_bytes_should_be_written` later on @@ -1402,7 +1486,7 @@ where // Otherwise keep writer optional, to be instantiated later if there is data // to write for the associated chunk. if kth_chunk.is_none() { - out_files = get_out_files(num_chunks, settings, settings.elide_empty_files)?; + out_files = OutFiles::init(num_chunks, settings, settings.elide_empty_files)?; } let mut chunk_number = 1; @@ -1429,7 +1513,7 @@ where None => { // Should write into a file let idx = (chunk_number - 1) as usize; - let writer = out_files[idx].get_writer(settings)?; + let writer = out_files.get_writer(idx, settings)?; custom_write_all(bytes, writer, settings)?; } } @@ -1503,14 +1587,14 @@ where // In Kth chunk of N mode - we will write to stdout instead of to a file. let mut stdout_writer = std::io::stdout().lock(); // In N chunks mode - we will write to `num_chunks` files - let mut out_files: Vec = Vec::new(); + let mut out_files: OutFiles = OutFiles::new(); // If in N chunks mode // Create one writer for each chunk. // This will create each of the underlying files // or stdin pipes to child shell/command processes if in `--filter` mode if kth_chunk.is_none() { - out_files = get_out_files(num_chunks, settings, false)?; + out_files = OutFiles::init(num_chunks, settings, false)?; } let num_chunks: usize = num_chunks.try_into().unwrap(); @@ -1532,7 +1616,7 @@ where } } None => { - let writer = out_files[i % num_chunks].get_writer(settings)?; + let writer = out_files.get_writer(i % num_chunks, settings)?; let writer_stdin_open = custom_write_all(bytes, writer, settings)?; if !writer_stdin_open { closed_writers += 1;