Skip to content

Commit

Permalink
Change io_parallelism method to return usize instead of u32 to reduce…
Browse files Browse the repository at this point in the history
… casting
  • Loading branch information
westonpace committed Aug 22, 2024
1 parent 0e5a1c7 commit 52a50c1
Show file tree
Hide file tree
Showing 23 changed files with 50 additions and 48 deletions.
10 changes: 5 additions & 5 deletions rust/lance-file/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl FileReader {
Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
}

fn io_parallelism(&self) -> u32 {
fn io_parallelism(&self) -> usize {
self.object_reader.io_parallelism()
}

Expand Down Expand Up @@ -291,7 +291,7 @@ impl FileReader {
.map(|(batch_id, range)| async move {
self.read_batch(batch_id, range, projection).await
})
.buffered(self.io_parallelism() as usize)
.buffered(self.io_parallelism())
.try_collect::<Vec<_>>()
.await?;
if batches.len() == 1 {
Expand Down Expand Up @@ -326,7 +326,7 @@ impl FileReader {
.await
}
})
.buffered(self.io_parallelism() as usize)
.buffered(self.io_parallelism())
.try_collect::<Vec<_>>()
.await?;

Expand Down Expand Up @@ -372,7 +372,7 @@ impl FileReader {
)
.await
})
.buffered(self.io_parallelism() as usize)
.buffered(self.io_parallelism())
.try_collect::<Vec<_>>()
.await?;

Expand Down Expand Up @@ -436,7 +436,7 @@ pub async fn read_batch(
// We box this because otherwise we get a higher-order lifetime error.
let arrs = stream::iter(&schema.fields)
.map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
.buffered(reader.io_parallelism() as usize)
.buffered(reader.io_parallelism())
.try_collect::<Vec<_>>()
.boxed();
let arrs = arrs.await?;
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
fn as_any(&self) -> &dyn Any;

/// Suggested I/O parallelism for the store
fn io_parallelism(&self) -> u32;
fn io_parallelism(&self) -> usize;

/// Create a new file and return a writer to store data in the file
async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/btree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ impl BTreeIndex {
idx: 0,
}
.map(|fut| fut.map_err(DataFusionError::from))
.buffered(self.store.io_parallelism() as usize)
.buffered(self.store.io_parallelism())
.boxed();
Ok(RecordBatchStreamAdapter::new(schema, batches))
}
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-index/src/scalar/lance_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl IndexStore for LanceIndexStore {
self
}

fn io_parallelism(&self) -> u32 {
fn io_parallelism(&self) -> usize {
self.object_store.io_parallelism()
}

Expand Down
2 changes: 1 addition & 1 deletion rust/lance-io/src/encodings/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ impl<'a, T: ByteArrayType> Decoder for BinaryDecoder<'a, T> {
.await?;
Result::Ok((chunk, chunk_offset, array))
})
.buffered(self.reader.io_parallelism() as usize)
.buffered(self.reader.io_parallelism())
.try_for_each(|(chunk, chunk_offset, array)| {
let array: &GenericByteArray<T> = array.as_bytes();

Expand Down
4 changes: 2 additions & 2 deletions rust/lance-io/src/encodings/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ impl<'a> PlainDecoder<'a> {
let shifted_indices = sub(&request, &UInt32Array::new_scalar(start))?;
Ok::<ArrayRef, Error>(take(&array, &shifted_indices, None)?)
})
.buffered(self.reader.io_parallelism() as usize)
.buffered(self.reader.io_parallelism())
.try_collect::<Vec<_>>()
.await?;
let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
Expand Down Expand Up @@ -430,7 +430,7 @@ impl<'a> Decoder for PlainDecoder<'a> {
let adjusted_offsets = sub(&request, &UInt32Array::new_scalar(start))?;
Ok::<ArrayRef, Error>(take(&array, &adjusted_offsets, None)?)
})
.buffered(self.reader.io_parallelism() as usize)
.buffered(self.reader.io_parallelism())
.try_collect::<Vec<_>>()
.await?;
let references = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-io/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl Reader for LocalObjectReader {
self.block_size
}

fn io_parallelism(&self) -> u32 {
fn io_parallelism(&self) -> usize {
DEFAULT_LOCAL_IO_PARALLELISM
}

Expand Down
2 changes: 1 addition & 1 deletion rust/lance-io/src/object_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl Reader for CloudObjectReader {
self.block_size
}

fn io_parallelism(&self) -> u32 {
fn io_parallelism(&self) -> usize {
DEFAULT_CLOUD_IO_PARALLELISM
}

Expand Down
18 changes: 9 additions & 9 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ use lance_core::{Error, Result};
// Note: the number of threads here also impacts the number of files
// we need to read in some situations. So keeping this at 8 keeps the
// RAM on our scanner down.
pub const DEFAULT_LOCAL_IO_PARALLELISM: u32 = 8;
pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8;
// Cloud disks often need many many threads to saturate the network
pub const DEFAULT_CLOUD_IO_PARALLELISM: u32 = 64;
pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64;

#[async_trait]
pub trait ObjectStoreExt {
Expand Down Expand Up @@ -94,7 +94,7 @@ pub struct ObjectStore {
scheme: String,
block_size: usize,
pub use_constant_size_upload_parts: bool,
io_parallelism: u32,
io_parallelism: usize,
}

impl DeepSizeOf for ObjectStore {
Expand Down Expand Up @@ -467,7 +467,7 @@ impl ObjectStore {
scheme: String::from("memory"),
block_size: 64 * 1024,
use_constant_size_upload_parts: false,
io_parallelism: get_num_compute_intensive_cpus() as u32,
io_parallelism: get_num_compute_intensive_cpus(),
}
}

Expand All @@ -484,13 +484,13 @@ impl ObjectStore {
self.block_size = new_size;
}

pub fn set_io_parallelism(&mut self, io_parallelism: u32) {
pub fn set_io_parallelism(&mut self, io_parallelism: usize) {
self.io_parallelism = io_parallelism;
}

pub fn io_parallelism(&self) -> u32 {
pub fn io_parallelism(&self) -> usize {
std::env::var("LANCE_IO_THREADS")
.map(|val| val.parse::<u32>().unwrap())
.map(|val| val.parse::<usize>().unwrap())
.unwrap_or(self.io_parallelism)
}

Expand Down Expand Up @@ -847,7 +847,7 @@ async fn configure_store(
scheme: String::from("memory"),
block_size: 64 * 1024,
use_constant_size_upload_parts: false,
io_parallelism: get_num_compute_intensive_cpus() as u32,
io_parallelism: get_num_compute_intensive_cpus(),
}),
unknown_scheme => {
if let Some(provider) = registry.providers.get(unknown_scheme) {
Expand All @@ -870,7 +870,7 @@ impl ObjectStore {
block_size: Option<usize>,
wrapper: Option<Arc<dyn WrappingObjectStore>>,
use_constant_size_upload_parts: bool,
io_parallelism: u32,
io_parallelism: usize,
) -> Self {
let scheme = location.scheme();
let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme));
Expand Down
5 changes: 4 additions & 1 deletion rust/lance-io/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,10 @@ impl ScanScheduler {
/// * config - configuration settings for the scheduler
pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
let io_capacity = object_store.io_parallelism();
let io_queue = Arc::new(IoQueue::new(io_capacity, config.io_buffer_size_bytes));
let io_queue = Arc::new(IoQueue::new(
io_capacity as u32,
config.io_buffer_size_bytes,
));
let scheduler = Self {
object_store,
io_queue: io_queue.clone(),
Expand Down
2 changes: 1 addition & 1 deletion rust/lance-io/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf {
fn block_size(&self) -> usize;

/// Suggest optimal I/O parallelism per storage device.
fn io_parallelism(&self) -> u32;
fn io_parallelism(&self) -> usize;

/// Object/File Size.
async fn size(&self) -> object_store::Result<usize>;
Expand Down
10 changes: 5 additions & 5 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,7 @@ impl Dataset {
pub async fn count_deleted_rows(&self) -> Result<usize> {
futures::stream::iter(self.get_fragments())
.map(|f| async move { f.count_deletions().await })
.buffer_unordered(self.object_store.io_parallelism() as usize)
.buffer_unordered(self.object_store.io_parallelism())
.try_fold(0, |acc, x| futures::future::ready(Ok(acc + x)))
.await
}
Expand Down Expand Up @@ -1213,7 +1213,7 @@ impl Dataset {
pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize {
futures::stream::iter(self.get_fragments())
.map(|f| async move { f.physical_rows().await })
.buffered(self.object_store.io_parallelism() as usize)
.buffered(self.object_store.io_parallelism())
.try_filter(|row_count| futures::future::ready(*row_count < max_rows_per_group))
.count()
.await
Expand Down Expand Up @@ -1246,7 +1246,7 @@ impl Dataset {
// All fragments have equal lengths
futures::stream::iter(self.get_fragments())
.map(|f| async move { f.validate().await })
.buffer_unordered(self.object_store.io_parallelism() as usize)
.buffer_unordered(self.object_store.io_parallelism())
.try_collect::<Vec<()>>()
.await?;

Expand Down Expand Up @@ -3347,8 +3347,8 @@ mod tests {
.unwrap();
dataset.validate().await.unwrap();

assert!(dataset.num_small_files(1024).await.unwrap() > 0);
assert!(dataset.num_small_files(512).await.unwrap() == 0);
assert!(dataset.num_small_files(1024).await > 0);
assert!(dataset.num_small_files(512).await == 0);
}

#[tokio::test]
Expand Down
9 changes: 4 additions & 5 deletions rust/lance/src/dataset/cleanup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,9 @@ impl<'a> CleanupTask<'a> {
.commit_handler
.list_manifests(&self.dataset.base, &self.dataset.object_store.inner)
.await?
.try_for_each_concurrent(
self.dataset.object_store.io_parallelism() as usize,
|path| self.process_manifest_file(path, &inspection, tagged_versions),
)
.try_for_each_concurrent(self.dataset.object_store.io_parallelism(), |path| {
self.process_manifest_file(path, &inspection, tagged_versions)
})
.await?;
Ok(inspection.into_inner().unwrap())
}
Expand Down Expand Up @@ -276,7 +275,7 @@ impl<'a> CleanupTask<'a> {
.collect::<Vec<_>>()
.await;
let manifest_bytes_removed = stream::iter(manifest_bytes_removed)
.buffer_unordered(self.dataset.object_store.io_parallelism() as usize)
.buffer_unordered(self.dataset.object_store.io_parallelism())
.try_fold(0, |acc, size| async move { Ok(acc + (size as u64)) })
.await;

Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ pub async fn plan_compaction(
Err(e) => Err(e),
}
})
.buffered(dataset.object_store().io_parallelism() as usize);
.buffered(dataset.object_store().io_parallelism());

let index_fragmaps = load_index_fragmaps(dataset).await?;
let indices_containing_frag = |frag_id: u32| {
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/rowids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub fn load_row_id_sequences<'a>(
.map(|fragment| {
load_row_id_sequence(dataset, fragment).map_ok(move |seq| (fragment.id as u32, seq))
})
.buffer_unordered(dataset.object_store.io_parallelism() as usize)
.buffer_unordered(dataset.object_store.io_parallelism())
}

pub async fn get_row_id_index(
Expand Down
6 changes: 3 additions & 3 deletions rust/lance/src/dataset/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub async fn take(
})
.collect::<Vec<_>>();
let take_stream = futures::stream::iter(take_tasks)
.buffered(dataset.object_store.io_parallelism() as usize)
.buffered(dataset.object_store.io_parallelism())
.map_err(|err| DataFusionError::External(err.into()))
.boxed();
let take_stream = Box::pin(RecordBatchStreamAdapter::new(
Expand Down Expand Up @@ -253,7 +253,7 @@ pub async fn take_rows(
batches.push(batch_fut);
}
let batches: Vec<RecordBatch> = futures::stream::iter(batches)
.buffered(dataset.object_store.io_parallelism() as usize)
.buffered(dataset.object_store.io_parallelism())
.try_collect()
.await?;
Ok(concat_batches(&batches[0].schema(), &batches)?)
Expand Down Expand Up @@ -293,7 +293,7 @@ pub async fn take_rows(
.map(|(fragment, indices)| {
do_take(fragment, indices, projection.physical_schema.clone(), true)
})
.buffered(dataset.object_store.io_parallelism() as usize)
.buffered(dataset.object_store.io_parallelism())
.try_collect::<Vec<_>>()
.await?;

Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/write/merge_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -957,7 +957,7 @@ impl MergeInsertJob {
}
}
})
.buffer_unordered(dataset.object_store.io_parallelism() as usize);
.buffer_unordered(dataset.object_store.io_parallelism());

while let Some(res) = stream.next().await.transpose()? {
match res {
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/dataset/write/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl UpdateJob {
}
}
})
.buffer_unordered(self.dataset.object_store.io_parallelism() as usize);
.buffer_unordered(self.dataset.object_store.io_parallelism());

while let Some(res) = stream.next().await.transpose()? {
match res {
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/index/prefilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl DatasetPreFilter {
.collect::<Vec<_>>()
.await;
let mut frag_id_deletion_vectors = stream::iter(frag_id_deletion_vectors)
.buffer_unordered(dataset.object_store.io_parallelism() as usize);
.buffer_unordered(dataset.object_store.io_parallelism());

let mut deleted_ids = RowIdTreeMap::new();
while let Some((id, deletion_vector)) = frag_id_deletion_vectors.try_next().await? {
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/index/vector/ivf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1356,7 +1356,7 @@ pub(crate) async fn remap_index_file(

let mut task_stream = stream::iter(tasks.into_iter())
.map(|task| task.load_and_remap(reader.clone(), index, mapping))
.buffered(object_store.io_parallelism() as usize);
.buffered(object_store.io_parallelism());

let mut ivf = IvfModel {
centroids: index.ivf.centroids.clone(),
Expand Down
4 changes: 2 additions & 2 deletions rust/lance/src/index/vector/ivf/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ pub(super) async fn write_hnsw_quantization_index_partitions(
part_reader.schema(),
)
})
.buffered(object_store.io_parallelism() as usize)
.buffered(object_store.io_parallelism())
.try_collect::<Vec<_>>()
.await?;
writer.write(&batches).await?;
Expand All @@ -429,7 +429,7 @@ pub(super) async fn write_hnsw_quantization_index_partitions(
aux_part_reader.schema(),
)
})
.buffered(object_store.io_parallelism() as usize)
.buffered(object_store.io_parallelism())
.try_collect::<Vec<_>>()
.await?;
std::mem::drop(aux_part_reader);
Expand Down
2 changes: 1 addition & 1 deletion rust/lance/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ pub(crate) async fn migrate_fragments(
..fragment.clone()
})
})
.buffered(dataset.object_store.io_parallelism() as usize)
.buffered(dataset.object_store.io_parallelism())
.boxed();

new_fragments.try_collect().await
Expand Down
4 changes: 2 additions & 2 deletions rust/lance/src/io/exec/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,9 @@ impl LanceStream {
// answer though and so we err on the side of speed over memory. Users can tone down fragment_parallelism
// by hand if needed.
if projection.fields.is_empty() {
io_parallelism as usize
io_parallelism
} else {
bit_util::ceil(io_parallelism as usize, projection.fields.len())
bit_util::ceil(io_parallelism, projection.fields.len())
}
})
// fragment_readhead=0 doesn't make sense so we just bump it to 1
Expand Down

0 comments on commit 52a50c1

Please sign in to comment.