From 52a50c1ebcf170d00104c1c665011282179e6e1c Mon Sep 17 00:00:00 2001
From: Weston Pace
Date: Thu, 22 Aug 2024 14:17:56 -0700
Subject: [PATCH] Change io_parallelism method to return usize instead of u32
 to reduce casting

---
 rust/lance-file/src/reader.rs                | 10 +++++-----
 rust/lance-index/src/scalar.rs               |  2 +-
 rust/lance-index/src/scalar/btree.rs         |  2 +-
 rust/lance-index/src/scalar/lance_format.rs  |  2 +-
 rust/lance-io/src/encodings/binary.rs        |  2 +-
 rust/lance-io/src/encodings/plain.rs         |  4 ++--
 rust/lance-io/src/local.rs                   |  2 +-
 rust/lance-io/src/object_reader.rs           |  2 +-
 rust/lance-io/src/object_store.rs            | 18 +++++++++---------
 rust/lance-io/src/scheduler.rs               |  5 ++++-
 rust/lance-io/src/traits.rs                  |  2 +-
 rust/lance/src/dataset.rs                    | 10 +++++-----
 rust/lance/src/dataset/cleanup.rs            |  9 ++++-----
 rust/lance/src/dataset/optimize.rs           |  2 +-
 rust/lance/src/dataset/rowids.rs             |  2 +-
 rust/lance/src/dataset/take.rs               |  6 +++---
 rust/lance/src/dataset/write/merge_insert.rs |  2 +-
 rust/lance/src/dataset/write/update.rs       |  2 +-
 rust/lance/src/index/prefilter.rs            |  2 +-
 rust/lance/src/index/vector/ivf.rs           |  2 +-
 rust/lance/src/index/vector/ivf/io.rs        |  4 ++--
 rust/lance/src/io/commit.rs                  |  2 +-
 rust/lance/src/io/exec/scan.rs               |  4 ++--
 23 files changed, 50 insertions(+), 48 deletions(-)

diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs
index 0c5df4a4eb..0ecda0394a 100644
--- a/rust/lance-file/src/reader.rs
+++ b/rust/lance-file/src/reader.rs
@@ -232,7 +232,7 @@ impl FileReader {
         Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
     }
 
-    fn io_parallelism(&self) -> u32 {
+    fn io_parallelism(&self) -> usize {
         self.object_reader.io_parallelism()
     }
 
@@ -291,7 +291,7 @@ impl FileReader {
             .map(|(batch_id, range)| async move {
                 self.read_batch(batch_id, range, projection).await
             })
-            .buffered(self.io_parallelism() as usize)
+            .buffered(self.io_parallelism())
             .try_collect::<Vec<_>>()
             .await?;
         if batches.len() == 1 {
@@ -326,7 +326,7 @@ impl FileReader {
                     .await
                 }
             })
-            .buffered(self.io_parallelism() as usize)
+            .buffered(self.io_parallelism())
             .try_collect::<Vec<_>>()
             .await?;
 
@@ -372,7 +372,7 @@ impl FileReader {
                 )
                 .await
             })
-            .buffered(self.io_parallelism() as usize)
+            .buffered(self.io_parallelism())
             .try_collect::<Vec<_>>()
             .await?;
 
@@ -436,7 +436,7 @@ pub async fn read_batch(
     // We box this because otherwise we get a higher-order lifetime error.
     let arrs = stream::iter(&schema.fields)
         .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
-        .buffered(reader.io_parallelism() as usize)
+        .buffered(reader.io_parallelism())
         .try_collect::<Vec<_>>()
         .boxed();
     let arrs = arrs.await?;
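Why this change removes casts rather than just moving them: `futures::StreamExt::buffered` takes its concurrency limit as a `usize`, so a `u32` return type forced an `as usize` at every call site above. A minimal standalone sketch of the pattern (assuming the `futures` and `tokio` crates; none of these names come from the patch itself):

```rust
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    let io_parallelism: usize = 8; // what the method now returns directly
    let lengths: Vec<usize> = stream::iter(["a", "bb", "ccc"])
        .map(|s| async move { s.len() }) // stand-in for an async batch read
        .buffered(io_parallelism)        // usize: no `as usize` at the call site
        .collect()
        .await;
    assert_eq!(lengths, vec![1, 2, 3]);
}
```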
diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs
index 7d8cb75c39..6c26a193d8 100644
--- a/rust/lance-index/src/scalar.rs
+++ b/rust/lance-index/src/scalar.rs
@@ -127,7 +127,7 @@ pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
     fn as_any(&self) -> &dyn Any;
 
     /// Suggested I/O parallelism for the store
-    fn io_parallelism(&self) -> u32;
+    fn io_parallelism(&self) -> usize;
 
     /// Create a new file and return a writer to store data in the file
     async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs
index e8f0cfb721..375925c6f7 100644
--- a/rust/lance-index/src/scalar/btree.rs
+++ b/rust/lance-index/src/scalar/btree.rs
@@ -765,7 +765,7 @@ impl BTreeIndex {
             idx: 0,
         }
         .map(|fut| fut.map_err(DataFusionError::from))
-        .buffered(self.store.io_parallelism() as usize)
+        .buffered(self.store.io_parallelism())
         .boxed();
         Ok(RecordBatchStreamAdapter::new(schema, batches))
     }
diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs
index 8c0a385dcd..0b56ba3722 100644
--- a/rust/lance-index/src/scalar/lance_format.rs
+++ b/rust/lance-index/src/scalar/lance_format.rs
@@ -167,7 +167,7 @@ impl IndexStore for LanceIndexStore {
         self
     }
 
-    fn io_parallelism(&self) -> u32 {
+    fn io_parallelism(&self) -> usize {
         self.object_store.io_parallelism()
     }
 
diff --git a/rust/lance-io/src/encodings/binary.rs b/rust/lance-io/src/encodings/binary.rs
index 44d665fdf6..fe202c31ef 100644
--- a/rust/lance-io/src/encodings/binary.rs
+++ b/rust/lance-io/src/encodings/binary.rs
@@ -340,7 +340,7 @@ impl<'a, T: ByteArrayType> Decoder for BinaryDecoder<'a, T> {
                     .await?;
                 Result::Ok((chunk, chunk_offset, array))
             })
-            .buffered(self.reader.io_parallelism() as usize)
+            .buffered(self.reader.io_parallelism())
             .try_for_each(|(chunk, chunk_offset, array)| {
                 let array: &GenericByteArray<T> = array.as_bytes();
 
diff --git a/rust/lance-io/src/encodings/plain.rs b/rust/lance-io/src/encodings/plain.rs
index 76e5b1d553..3cdb664e37 100644
--- a/rust/lance-io/src/encodings/plain.rs
+++ b/rust/lance-io/src/encodings/plain.rs
@@ -364,7 +364,7 @@ impl<'a> PlainDecoder<'a> {
                 let shifted_indices = sub(&request, &UInt32Array::new_scalar(start))?;
                 Ok::<ArrayRef, Error>(take(&array, &shifted_indices, None)?)
             })
-            .buffered(self.reader.io_parallelism() as usize)
+            .buffered(self.reader.io_parallelism())
             .try_collect::<Vec<_>>()
             .await?;
         let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
@@ -430,7 +430,7 @@ impl<'a> Decoder for PlainDecoder<'a> {
                 let adjusted_offsets = sub(&request, &UInt32Array::new_scalar(start))?;
                 Ok::<ArrayRef, Error>(take(&array, &adjusted_offsets, None)?)
             })
-            .buffered(self.reader.io_parallelism() as usize)
+            .buffered(self.reader.io_parallelism())
             .try_collect::<Vec<_>>()
             .await?;
         let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
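The encoding call sites all follow the same shape: map each chunk to a fallible future, run up to `io_parallelism()` of them concurrently, then collect. A self-contained sketch of that shape, with `decode_chunk` as a hypothetical stand-in for the real decode step:

```rust
use futures::{stream, StreamExt, TryStreamExt};

// Hypothetical stand-in for one fallible async decode step.
async fn decode_chunk(i: u32) -> std::io::Result<u32> {
    Ok(i + 1)
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let io_parallelism: usize = 4;
    let decoded = stream::iter(0..8u32)
        .map(decode_chunk)        // stream of futures
        .buffered(io_parallelism) // at most N decodes in flight, order preserved
        .try_collect::<Vec<_>>()  // short-circuits on the first Err
        .await?;
    assert_eq!(decoded.len(), 8);
    Ok(())
}
```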
diff --git a/rust/lance-io/src/local.rs b/rust/lance-io/src/local.rs
index 503b7002eb..81368c6de0 100644
--- a/rust/lance-io/src/local.rs
+++ b/rust/lance-io/src/local.rs
@@ -123,7 +123,7 @@ impl Reader for LocalObjectReader {
         self.block_size
     }
 
-    fn io_parallelism(&self) -> u32 {
+    fn io_parallelism(&self) -> usize {
         DEFAULT_LOCAL_IO_PARALLELISM
     }
 
diff --git a/rust/lance-io/src/object_reader.rs b/rust/lance-io/src/object_reader.rs
index 53d47965ed..e943d791a6 100644
--- a/rust/lance-io/src/object_reader.rs
+++ b/rust/lance-io/src/object_reader.rs
@@ -85,7 +85,7 @@ impl Reader for CloudObjectReader {
         self.block_size
     }
 
-    fn io_parallelism(&self) -> u32 {
+    fn io_parallelism(&self) -> usize {
         DEFAULT_CLOUD_IO_PARALLELISM
     }
 
diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs
index 1747c753c5..b4fd2ed200 100644
--- a/rust/lance-io/src/object_store.rs
+++ b/rust/lance-io/src/object_store.rs
@@ -42,9 +42,9 @@ use lance_core::{Error, Result};
 // Note: the number of threads here also impacts the number of files
 // we need to read in some situations. So keeping this at 8 keeps the
 // RAM on our scanner down.
-pub const DEFAULT_LOCAL_IO_PARALLELISM: u32 = 8;
+pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
 // Cloud disks often need many many threads to saturate the network
-pub const DEFAULT_CLOUD_IO_PARALLELISM: u32 = 64;
+pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;
 
 #[async_trait]
 pub trait ObjectStoreExt {
@@ -94,7 +94,7 @@ pub struct ObjectStore {
     scheme: String,
     block_size: usize,
     pub use_constant_size_upload_parts: bool,
-    io_parallelism: u32,
+    io_parallelism: usize,
 }
 
 impl DeepSizeOf for ObjectStore {
@@ -467,7 +467,7 @@ impl ObjectStore {
            scheme: String::from("memory"),
            block_size: 64 * 1024,
            use_constant_size_upload_parts: false,
-            io_parallelism: get_num_compute_intensive_cpus() as u32,
+            io_parallelism: get_num_compute_intensive_cpus(),
        }
    }
 
@@ -484,13 +484,13 @@ impl ObjectStore {
         self.block_size = new_size;
     }
 
-    pub fn set_io_parallelism(&mut self, io_parallelism: u32) {
+    pub fn set_io_parallelism(&mut self, io_parallelism: usize) {
         self.io_parallelism = io_parallelism;
     }
 
-    pub fn io_parallelism(&self) -> u32 {
+    pub fn io_parallelism(&self) -> usize {
         std::env::var("LANCE_IO_THREADS")
-            .map(|val| val.parse::<u32>().unwrap())
+            .map(|val| val.parse::<usize>().unwrap())
             .unwrap_or(self.io_parallelism)
     }
 
@@ -847,7 +847,7 @@ async fn configure_store(
             scheme: String::from("memory"),
             block_size: 64 * 1024,
             use_constant_size_upload_parts: false,
-            io_parallelism: get_num_compute_intensive_cpus() as u32,
+            io_parallelism: get_num_compute_intensive_cpus(),
         }),
         unknown_scheme => {
             if let Some(provider) = registry.providers.get(unknown_scheme) {
@@ -870,7 +870,7 @@ impl ObjectStore {
         block_size: Option<usize>,
         wrapper: Option<Arc<dyn WrappingObjectStore>>,
         use_constant_size_upload_parts: bool,
-        io_parallelism: u32,
+        io_parallelism: usize,
     ) -> Self {
         let scheme = location.scheme();
         let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs
index c60ef8bf82..07e5e76751 100644
--- a/rust/lance-io/src/scheduler.rs
+++ b/rust/lance-io/src/scheduler.rs
@@ -419,7 +419,10 @@ impl ScanScheduler {
     ///   * config - configuration settings for the scheduler
     pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
         let io_capacity = object_store.io_parallelism();
-        let io_queue = Arc::new(IoQueue::new(io_capacity, config.io_buffer_size_bytes));
+        let io_queue = Arc::new(IoQueue::new(
+            io_capacity as u32,
+            config.io_buffer_size_bytes,
+        ));
         let scheduler = Self {
             object_store,
             io_queue: io_queue.clone(),
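Two details worth noting in these hunks: the `LANCE_IO_THREADS` environment variable still overrides the configured value (only the parse target changes from `u32` to `usize`), and one cast now runs in the opposite direction in `ScanScheduler::new`, because `IoQueue::new` still takes a `u32` capacity. A standalone sketch of the override logic, written as a free function for illustration (the real method falls back to the store's configured field, not a constant argument):

```rust
// Sketch of the env-override pattern in ObjectStore::io_parallelism.
fn io_parallelism(configured: usize) -> usize {
    std::env::var("LANCE_IO_THREADS")
        .map(|val| val.parse::<usize>().unwrap())
        .unwrap_or(configured)
}

fn main() {
    std::env::set_var("LANCE_IO_THREADS", "16");
    assert_eq!(io_parallelism(8), 16); // env var wins when set
    std::env::remove_var("LANCE_IO_THREADS");
    assert_eq!(io_parallelism(8), 8);  // otherwise the configured value
}
```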
diff --git a/rust/lance-io/src/traits.rs b/rust/lance-io/src/traits.rs
index 572c18664f..0863891935 100644
--- a/rust/lance-io/src/traits.rs
+++ b/rust/lance-io/src/traits.rs
@@ -87,7 +87,7 @@ pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf {
     fn block_size(&self) -> usize;
 
     /// Suggest optimal I/O parallelism per storage device.
-    fn io_parallelism(&self) -> u32;
+    fn io_parallelism(&self) -> usize;
 
     /// Object/File Size.
     async fn size(&self) -> object_store::Result<usize>;
diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index 6928a49910..d6f749f184 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -1097,7 +1097,7 @@ impl Dataset {
     pub async fn count_deleted_rows(&self) -> Result<usize> {
         futures::stream::iter(self.get_fragments())
             .map(|f| async move { f.count_deletions().await })
-            .buffer_unordered(self.object_store.io_parallelism() as usize)
+            .buffer_unordered(self.object_store.io_parallelism())
             .try_fold(0, |acc, x| futures::future::ready(Ok(acc + x)))
             .await
     }
@@ -1213,7 +1213,7 @@ impl Dataset {
     pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize {
         futures::stream::iter(self.get_fragments())
             .map(|f| async move { f.physical_rows().await })
-            .buffered(self.object_store.io_parallelism() as usize)
+            .buffered(self.object_store.io_parallelism())
             .try_filter(|row_count| futures::future::ready(*row_count < max_rows_per_group))
             .count()
             .await
@@ -1246,7 +1246,7 @@ impl Dataset {
         // All fragments have equal lengths
         futures::stream::iter(self.get_fragments())
             .map(|f| async move { f.validate().await })
-            .buffer_unordered(self.object_store.io_parallelism() as usize)
+            .buffer_unordered(self.object_store.io_parallelism())
             .try_collect::<Vec<_>>()
             .await?;
 
@@ -3347,8 +3347,8 @@ mod tests {
             .unwrap();
         dataset.validate().await.unwrap();
 
-        assert!(dataset.num_small_files(1024).await.unwrap() > 0);
-        assert!(dataset.num_small_files(512).await.unwrap() == 0);
+        assert!(dataset.num_small_files(1024).await > 0);
+        assert!(dataset.num_small_files(512).await == 0);
     }
 
     #[tokio::test]
diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs
index 8fb0032aef..243541b5dc 100644
--- a/rust/lance/src/dataset/cleanup.rs
+++ b/rust/lance/src/dataset/cleanup.rs
@@ -152,10 +152,9 @@ impl<'a> CleanupTask<'a> {
             .commit_handler
             .list_manifests(&self.dataset.base, &self.dataset.object_store.inner)
             .await?
-            .try_for_each_concurrent(
-                self.dataset.object_store.io_parallelism() as usize,
-                |path| self.process_manifest_file(path, &inspection, tagged_versions),
-            )
+            .try_for_each_concurrent(self.dataset.object_store.io_parallelism(), |path| {
+                self.process_manifest_file(path, &inspection, tagged_versions)
+            })
             .await?;
         Ok(inspection.into_inner().unwrap())
     }
@@ -276,7 +275,7 @@ impl<'a> CleanupTask<'a> {
             .collect::<Vec<_>>()
             .await;
         let manifest_bytes_removed = stream::iter(manifest_bytes_removed)
-            .buffer_unordered(self.dataset.object_store.io_parallelism() as usize)
+            .buffer_unordered(self.dataset.object_store.io_parallelism())
             .try_fold(0, |acc, size| async move { Ok(acc + (size as u64)) })
             .await;
 
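Here (and in `count_deleted_rows` above) `buffer_unordered` is used instead of `buffered` because results can be consumed in completion order; `try_fold` then reduces them while propagating the first error. A minimal sketch under the same crate assumptions as the earlier examples:

```rust
use futures::{stream, StreamExt, TryStreamExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let io_parallelism: usize = 4;
    let total = stream::iter(1..=10u64)
        .map(|n| async move { Ok::<u64, std::io::Error>(n) }) // e.g. size of a removed file
        .buffer_unordered(io_parallelism) // completion order is fine for a sum
        .try_fold(0u64, |acc, n| async move { Ok(acc + n) })
        .await?;
    assert_eq!(total, 55);
    Ok(())
}
```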
diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs
index c74c765fd3..c6b244fa11 100644
--- a/rust/lance/src/dataset/optimize.rs
+++ b/rust/lance/src/dataset/optimize.rs
@@ -469,7 +469,7 @@ pub async fn plan_compaction(
                 Err(e) => Err(e),
             }
         })
-        .buffered(dataset.object_store().io_parallelism() as usize);
+        .buffered(dataset.object_store().io_parallelism());
 
     let index_fragmaps = load_index_fragmaps(dataset).await?;
     let indices_containing_frag = |frag_id: u32| {
diff --git a/rust/lance/src/dataset/rowids.rs b/rust/lance/src/dataset/rowids.rs
index 338c758dd4..808bdae377 100644
--- a/rust/lance/src/dataset/rowids.rs
+++ b/rust/lance/src/dataset/rowids.rs
@@ -64,7 +64,7 @@ pub fn load_row_id_sequences<'a>(
         .map(|fragment| {
             load_row_id_sequence(dataset, fragment).map_ok(move |seq| (fragment.id as u32, seq))
         })
-        .buffer_unordered(dataset.object_store.io_parallelism() as usize)
+        .buffer_unordered(dataset.object_store.io_parallelism())
 }
 
 pub async fn get_row_id_index(
diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs
index d5510e70fd..265f3d741a 100644
--- a/rust/lance/src/dataset/take.rs
+++ b/rust/lance/src/dataset/take.rs
@@ -130,7 +130,7 @@ pub async fn take(
         })
         .collect::<Vec<_>>();
     let take_stream = futures::stream::iter(take_tasks)
-        .buffered(dataset.object_store.io_parallelism() as usize)
+        .buffered(dataset.object_store.io_parallelism())
         .map_err(|err| DataFusionError::External(err.into()))
         .boxed();
     let take_stream = Box::pin(RecordBatchStreamAdapter::new(
@@ -253,7 +253,7 @@ pub async fn take_rows(
         batches.push(batch_fut);
     }
     let batches: Vec<RecordBatch> = futures::stream::iter(batches)
-        .buffered(dataset.object_store.io_parallelism() as usize)
+        .buffered(dataset.object_store.io_parallelism())
         .try_collect()
         .await?;
     Ok(concat_batches(&batches[0].schema(), &batches)?)
@@ -293,7 +293,7 @@ pub async fn take_rows(
             .map(|(fragment, indices)| {
                 do_take(fragment, indices, projection.physical_schema.clone(), true)
             })
-            .buffered(dataset.object_store.io_parallelism() as usize)
+            .buffered(dataset.object_store.io_parallelism())
             .try_collect::<Vec<_>>()
             .await?;
 
diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs
index 7db790fc24..352db503ad 100644
--- a/rust/lance/src/dataset/write/merge_insert.rs
+++ b/rust/lance/src/dataset/write/merge_insert.rs
@@ -957,7 +957,7 @@ impl MergeInsertJob {
                     }
                 }
             })
-            .buffer_unordered(dataset.object_store.io_parallelism() as usize);
+            .buffer_unordered(dataset.object_store.io_parallelism());
 
         while let Some(res) = stream.next().await.transpose()? {
             match res {
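The consumption loop in `merge_insert.rs` (and in `update.rs` below) leans on `Option::transpose`: `next()` yields `Option<Result<T, E>>`, and `transpose()` flips it to `Result<Option<T>, E>` so `?` can propagate errors out of a `while let`. A small sketch, again assuming `futures` and `tokio`:

```rust
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut stream = stream::iter([Ok::<u32, std::io::Error>(1), Ok(2)]);
    // transpose(): Option<Result<T, E>> -> Result<Option<T>, E>, so `?`
    // bails on the first error while the loop drains the stream.
    while let Some(res) = stream.next().await.transpose()? {
        println!("processed result: {res}");
    }
    Ok(())
}
```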
diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs
index 9db98c3c3d..6b970dea50 100644
--- a/rust/lance/src/dataset/write/update.rs
+++ b/rust/lance/src/dataset/write/update.rs
@@ -302,7 +302,7 @@ impl UpdateJob {
                     }
                 }
             })
-            .buffer_unordered(self.dataset.object_store.io_parallelism() as usize);
+            .buffer_unordered(self.dataset.object_store.io_parallelism());
 
         while let Some(res) = stream.next().await.transpose()? {
             match res {
diff --git a/rust/lance/src/index/prefilter.rs b/rust/lance/src/index/prefilter.rs
index 368c0cfe73..121a377e31 100644
--- a/rust/lance/src/index/prefilter.rs
+++ b/rust/lance/src/index/prefilter.rs
@@ -101,7 +101,7 @@ impl DatasetPreFilter {
             .collect::<Vec<_>>()
             .await;
         let mut frag_id_deletion_vectors = stream::iter(frag_id_deletion_vectors)
-            .buffer_unordered(dataset.object_store.io_parallelism() as usize);
+            .buffer_unordered(dataset.object_store.io_parallelism());
 
         let mut deleted_ids = RowIdTreeMap::new();
         while let Some((id, deletion_vector)) = frag_id_deletion_vectors.try_next().await? {
diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs
index 3e1b6541ac..9c52998a16 100644
--- a/rust/lance/src/index/vector/ivf.rs
+++ b/rust/lance/src/index/vector/ivf.rs
@@ -1356,7 +1356,7 @@ pub(crate) async fn remap_index_file(
 
     let mut task_stream = stream::iter(tasks.into_iter())
         .map(|task| task.load_and_remap(reader.clone(), index, mapping))
-        .buffered(object_store.io_parallelism() as usize);
+        .buffered(object_store.io_parallelism());
 
     let mut ivf = IvfModel {
         centroids: index.ivf.centroids.clone(),
diff --git a/rust/lance/src/index/vector/ivf/io.rs b/rust/lance/src/index/vector/ivf/io.rs
index 5910c2adef..5c254727cc 100644
--- a/rust/lance/src/index/vector/ivf/io.rs
+++ b/rust/lance/src/index/vector/ivf/io.rs
@@ -405,7 +405,7 @@ pub(super) async fn write_hnsw_quantization_index_partitions(
                     part_reader.schema(),
                 )
             })
-            .buffered(object_store.io_parallelism() as usize)
+            .buffered(object_store.io_parallelism())
             .try_collect::<Vec<_>>()
             .await?;
         writer.write(&batches).await?;
@@ -429,7 +429,7 @@ pub(super) async fn write_hnsw_quantization_index_partitions(
                     aux_part_reader.schema(),
                 )
             })
-            .buffered(object_store.io_parallelism() as usize)
+            .buffered(object_store.io_parallelism())
             .try_collect::<Vec<_>>()
             .await?;
         std::mem::drop(aux_part_reader);
diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs
index 0a9bb26623..f5e86d48dd 100644
--- a/rust/lance/src/io/commit.rs
+++ b/rust/lance/src/io/commit.rs
@@ -331,7 +331,7 @@ pub(crate) async fn migrate_fragments(
                 ..fragment.clone()
             })
         })
-        .buffered(dataset.object_store.io_parallelism() as usize)
+        .buffered(dataset.object_store.io_parallelism())
         .boxed();
 
     new_fragments.try_collect().await
diff --git a/rust/lance/src/io/exec/scan.rs b/rust/lance/src/io/exec/scan.rs
index 709e182869..b5ff73afee 100644
--- a/rust/lance/src/io/exec/scan.rs
+++ b/rust/lance/src/io/exec/scan.rs
@@ -164,9 +164,9 @@ impl LanceStream {
                 // answer though and so we err on the side of speed over memory.  Users can tone down fragment_parallelism
                 // by hand if needed.
                 if projection.fields.is_empty() {
-                    io_parallelism as usize
+                    io_parallelism
                 } else {
-                    bit_util::ceil(io_parallelism as usize, projection.fields.len())
+                    bit_util::ceil(io_parallelism, projection.fields.len())
                }
             })
             // fragment_readhead=0 doesn't make sense so we just bump it to 1
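The `scan.rs` hunk is the only call site that does arithmetic on the value: fragment readahead is scaled down by the number of projected columns, since reading one fragment already fans out roughly one request per column. A simplified stand-in for that heuristic (`fragment_parallelism` is a hypothetical name, not from the patch; the final clamp follows the trailing comment about bumping a readahead of 0 to 1):

```rust
fn fragment_parallelism(io_parallelism: usize, num_fields: usize) -> usize {
    let base = if num_fields == 0 {
        io_parallelism
    } else {
        // mirrors bit_util::ceil(io_parallelism, num_fields)
        io_parallelism.div_ceil(num_fields)
    };
    base.max(1) // a fragment readahead of 0 makes no sense
}

fn main() {
    assert_eq!(fragment_parallelism(64, 8), 8); // 8 fragments x 8 columns in flight
    assert_eq!(fragment_parallelism(8, 3), 3);  // ceil(8 / 3)
    assert_eq!(fragment_parallelism(0, 5), 1);  // clamped up to 1
}
```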