From eed30f320fbe4558e9b2bce5dd38d3622823caf9 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 22 Aug 2024 08:35:55 -0700 Subject: [PATCH 1/6] Make the # of CPU threads configurable. Document CPU & Memory patterns --- Cargo.toml | 1 - docs/format.rst | 2 + docs/index.rst | 1 + docs/performance.rst | 83 +++++++++++++++++++ python/Cargo.toml | 1 - python/src/dataset/optimize.rs | 4 +- rust/lance-core/Cargo.toml | 2 +- rust/lance-core/src/utils/tokio.rs | 4 + rust/lance-encoding/Cargo.toml | 1 - rust/lance-file/Cargo.toml | 1 - rust/lance-file/src/reader.rs | 12 ++- rust/lance-index/Cargo.toml | 1 - rust/lance-index/src/scalar.rs | 3 + rust/lance-index/src/scalar/btree.rs | 11 ++- rust/lance-index/src/scalar/inverted/index.rs | 4 +- rust/lance-index/src/scalar/lance_format.rs | 4 + rust/lance-index/src/vector/hnsw/builder.rs | 10 ++- rust/lance-index/src/vector/ivf/shuffler.rs | 3 +- rust/lance-index/src/vector/pq/builder.rs | 3 +- rust/lance-index/src/vector/v3/shuffler.rs | 7 +- rust/lance-io/Cargo.toml | 4 +- rust/lance-io/src/encodings/binary.rs | 2 +- rust/lance-io/src/encodings/plain.rs | 9 +- rust/lance-io/src/local.rs | 5 ++ rust/lance-io/src/object_reader.rs | 6 +- rust/lance-io/src/object_store.rs | 16 +--- rust/lance-io/src/scheduler.rs | 4 +- rust/lance-io/src/traits.rs | 3 + rust/lance-linalg/Cargo.toml | 1 - rust/lance/Cargo.toml | 1 - rust/lance/src/dataset.rs | 19 +++-- rust/lance/src/dataset/cleanup.rs | 9 +- rust/lance/src/dataset/fragment.rs | 5 +- rust/lance/src/dataset/hash_joiner.rs | 5 +- rust/lance/src/dataset/optimize.rs | 6 +- rust/lance/src/dataset/rowids.rs | 2 +- rust/lance/src/dataset/scanner.rs | 6 +- rust/lance/src/dataset/take.rs | 6 +- rust/lance/src/dataset/write/merge_insert.rs | 4 +- rust/lance/src/dataset/write/update.rs | 5 +- rust/lance/src/index/prefilter.rs | 4 +- rust/lance/src/index/vector/builder.rs | 5 +- rust/lance/src/index/vector/ivf.rs | 10 ++- rust/lance/src/index/vector/ivf/io.rs | 8 +- rust/lance/src/index/vector/ivf/v2.rs | 4 +- rust/lance/src/io/commit.rs | 2 +- rust/lance/src/io/exec/knn.rs | 6 +- rust/lance/src/io/exec/pushdown_scan.rs | 5 +- rust/lance/src/io/exec/scan.rs | 2 +- 49 files changed, 217 insertions(+), 105 deletions(-) create mode 100644 docs/performance.rst diff --git a/Cargo.toml b/Cargo.toml index 9c2121ce02..6800feef82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,7 +121,6 @@ mockall = { version = "0.12.1" } mock_instant = { version = "0.3.1", features = ["sync"] } moka = { version = "0.11", features = ["future"] } num-traits = "0.2" -num_cpus = "1.0" # Set min to prevent use of versions with CVE-2024-41178 object_store = { version = "0.10.2" } parquet = "52.0" diff --git a/docs/format.rst b/docs/format.rst index f0e4ccb426..421b73374d 100644 --- a/docs/format.rst +++ b/docs/format.rst @@ -338,6 +338,8 @@ systems and cloud object stores, with the notable except of AWS S3. For ones that lack this functionality, an external locking mechanism can be configured by the user. +.. _conflict_resolution: + Conflict resolution ~~~~~~~~~~~~~~~~~~~ diff --git a/docs/index.rst b/docs/index.rst index 910578dea2..28c96053ce 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -46,6 +46,7 @@ Preview releases receive the same level of testing as regular releases. Lance Formats <./format> Arrays <./arrays> Integrations <./integrations/integrations> + Performance Guide <./performance> API References <./api/api> Contributor Guide <./contributing> Examples <./examples/examples> diff --git a/docs/performance.rst b/docs/performance.rst new file mode 100644 index 0000000000..1fd27e8ede --- /dev/null +++ b/docs/performance.rst @@ -0,0 +1,83 @@ +Lance Performance Guide +======================= + +This guide provides tips and tricks for optimizing the performance of your Lance applications. + +Threading Model +--------------- + +Lance is designed to be thread-safe and performant. Lance APIs can be called concurrently unless +explicity stated otherwise. Users may create multiple tables and share tables between threads. +Operations may run in parallel on the same table, but some operations may lead to conflicts. For +details see :ref:`conflict_resolution`. + +Most Lance operations will use multiple threads to perform work in parallel. There are two thread +pools in lance: the IO thread pool and the compute thread pool. The IO thread pool is used for +reading and writing data from disk. The compute thread pool is used for performing computations +on data. The number of threads in each pool can be configured by the user. + +The IO thread pool is used for reading and writing data from disk. The number of threads in the IO +thread pool is determined by the object store that the operation is working with. Local object stores +will use 8 threads by default. Cloud object stores will use 64 threads by default. This is a fairly +conservative default and you may need 128 or 256 threads to saturate network bandwidth on some cloud +providers. The ``LANCE_IO_THREADS`` environment variable can be used to override the number of IO +threads. If you increase this variable you may also want to increase the ``io_buffer_size``. + +The compute thread pool is used for performing computations on data. The number of threads in the +compute thread pool is determined by the number of cores on the machine. The number of threads in +the compute thread pool can be overridden by setting the ``LANCE_CPU_THREADS`` environment variable. +This is commonly done when running multiple Lance processes on the same machine (e.g when working with +tools like Ray). Keep in mind that decoding data is a compute intensive operation, even if a workload +seems I/O bound (like scanning a table) it may still need quite a few compute threads to achieve peak +performance. + +Memory Requirements +------------------- + +Lance is designed to be memory efficient. Operations should stream data from disk and not require +loading the entire dataset into memory. However, there are a few components of Lance that can use +a lot of memory. + +Index Cache +~~~~~~~~~~~ + +Lance uses an index cache to speed up queries. This caches vector and scalar indices in memory. The +max size of this cache can be configured when creating a ``LanceDataset`` using the ``index_cache_size`` +parameter. This cache is an LRU cached that is sized by "number of entries". The size of each entry +and the number of entries needed depends on the index in question. For example, an IVF/PQ vector index +contains 1 header entry and 1 entry for each partition. The size of each entry is determined by the +number of vectors and the PQ parameters (e.g. number of subvectors). You can view the size of this cache +by inspecting the result of ``dataset.session().size_bytes()``. + +The index cache is not shared between tables. For best performance you should create a single table and +share it across your application. + +Scanning Data +~~~~~~~~~~~~~ + +Searches (e.g. vector search, full text search) do not use a lot of memory to hold data because they don't +typically return a lot of data. However, scanning data can use a lot of memory. Scanning is a streaming +operation but we need enough memory to hold the data that we are scanning. The amount of memory needed is +largely determined by the ``io_buffer_size`` and the ``batch_size`` variables. + +Each I/O thread should have enough memory to buffer an entire page of data. Pages today are typically between +8 and 32 MB. This means, as a rule of thumb, you should generally have about 32MB of memory per I/O thread. +The default ``io_buffer_size`` is 2GB which is enough to buffer 64 pages of data. If you increase the number +of I/O threads you should also increase the ``io_buffer_size``. + +Scans will also decode data (and run any filtering or compute) in parallel on CPU threads. The amount of data +decoded at any one time is determined by the ``batch_size`` and the size of your rows. Each CPU thread will +need enough memory to hold one batch. Once batches are delivered to your application, they are no longer tracked +by Lance and so if memory is a concern then you should also be careful not to accumulate memory in your own +application (e.g. by running ``to_table`` or otherwise collecting all batches in memory.) + +The default ``batch_size`` is 8192 rows. When you are working with mostly scalar data you want to keep batches +around 1MB and so the amount of memory needed by the compute threads is fairly small. However, when working with +large data you may need to turn down the ``batch_size`` to keep memory usage under control. For example, when +working with 1024-dimensional vector embeddings (e.g. 32-bit floats) then 8192 rows would be 32MB of data. If you +spread that across 16 CPU threads then you would need 512MB of compute memory per scan. You might find working +with 1024 rows per batch is more appropriate. + +In summary, scans could use up to ``(2 * io_buffer_size) + (batch_size * num_compute_threads)`` bytes of memory. +Keep in mind that ``io_buffer_size`` is a soft limit (e.g. we cannot read less than one page at a time right now) +and so it is not neccesarily a bug if you see memory usage exceed this limit by a small margin. \ No newline at end of file diff --git a/python/Cargo.toml b/python/Cargo.toml index 34a8669169..dc37450dad 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -48,7 +48,6 @@ uuid = "1.3.0" serde_json = "1" serde = "1.0.197" serde_yaml = "0.9.34" -num_cpus = "1" snafu = "0.7.4" tracing-chrome = "0.7.1" tracing-subscriber = "0.3.17" diff --git a/python/src/dataset/optimize.rs b/python/src/dataset/optimize.rs index bb0623a2e7..eeac1ea8a8 100644 --- a/python/src/dataset/optimize.rs +++ b/python/src/dataset/optimize.rs @@ -46,9 +46,7 @@ fn parse_compaction_options(options: &PyDict) -> PyResult { opts.materialize_deletions_threshold = value.extract()?; } "num_threads" => { - opts.num_threads = value - .extract::>()? - .unwrap_or_else(num_cpus::get); + opts.num_threads = value.extract()?; } "batch_size" => { opts.batch_size = value.extract()?; diff --git a/rust/lance-core/Cargo.toml b/rust/lance-core/Cargo.toml index 0e0f219bdb..3ca1e440df 100644 --- a/rust/lance-core/Cargo.toml +++ b/rust/lance-core/Cargo.toml @@ -27,7 +27,7 @@ futures.workspace = true lazy_static.workspace = true mock_instant.workspace = true moka.workspace = true -num_cpus.workspace = true +num_cpus = "1.0" object_store = { workspace = true } pin-project.workspace = true prost.workspace = true diff --git a/rust/lance-core/src/utils/tokio.rs b/rust/lance-core/src/utils/tokio.rs index 9b9c9d9718..4db88cdc0b 100644 --- a/rust/lance-core/src/utils/tokio.rs +++ b/rust/lance-core/src/utils/tokio.rs @@ -10,6 +10,10 @@ use tokio::runtime::{Builder, Runtime}; use tracing::Span; pub fn get_num_compute_intensive_cpus() -> usize { + if let Ok(user_specified) = std::env::var("LANCE_CPU_THREADS") { + return user_specified.parse().unwrap(); + } + let cpus = num_cpus::get(); if cpus <= *IO_CORE_RESERVATION { diff --git a/rust/lance-encoding/Cargo.toml b/rust/lance-encoding/Cargo.toml index 5e2865efaa..668b95fc4e 100644 --- a/rust/lance-encoding/Cargo.toml +++ b/rust/lance-encoding/Cargo.toml @@ -25,7 +25,6 @@ bytes.workspace = true futures.workspace = true fsst.workspace = true log.workspace = true -num_cpus.workspace = true num-traits.workspace = true prost.workspace = true hyperloglogplus.workspace = true diff --git a/rust/lance-file/Cargo.toml b/rust/lance-file/Cargo.toml index 8ac30c6d64..9dbd064c7b 100644 --- a/rust/lance-file/Cargo.toml +++ b/rust/lance-file/Cargo.toml @@ -30,7 +30,6 @@ datafusion-common.workspace = true deepsize.workspace = true futures.workspace = true log.workspace = true -num_cpus.workspace = true num-traits.workspace = true object_store.workspace = true prost.workspace = true diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index 3f67c9ed6a..0c5df4a4eb 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -232,6 +232,10 @@ impl FileReader { Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await } + fn io_parallelism(&self) -> u32 { + self.object_reader.io_parallelism() + } + /// Requested projection of the data in this file, excluding the row id column. pub fn schema(&self) -> &Schema { &self.schema @@ -287,7 +291,7 @@ impl FileReader { .map(|(batch_id, range)| async move { self.read_batch(batch_id, range, projection).await }) - .buffered(num_cpus::get()) + .buffered(self.io_parallelism() as usize) .try_collect::>() .await?; if batches.len() == 1 { @@ -322,7 +326,7 @@ impl FileReader { .await } }) - .buffered(num_cpus::get() * 4) + .buffered(self.io_parallelism() as usize) .try_collect::>() .await?; @@ -368,7 +372,7 @@ impl FileReader { ) .await }) - .buffered(num_cpus::get()) + .buffered(self.io_parallelism() as usize) .try_collect::>() .await?; @@ -432,7 +436,7 @@ pub async fn read_batch( // We box this because otherwise we get a higher-order lifetime error. let arrs = stream::iter(&schema.fields) .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await }) - .buffered(num_cpus::get() * 4) + .buffered(reader.io_parallelism() as usize) .try_collect::>() .boxed(); let arrs = arrs.await?; diff --git a/rust/lance-index/Cargo.toml b/rust/lance-index/Cargo.toml index 8f8669cf71..fffaf1c504 100644 --- a/rust/lance-index/Cargo.toml +++ b/rust/lance-index/Cargo.toml @@ -40,7 +40,6 @@ lance-table.workspace = true lazy_static.workspace = true log.workspace = true moka.workspace = true -num_cpus.workspace = true num-traits.workspace = true object_store.workspace = true prost.workspace = true diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 39db294177..96f41174ab 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -133,6 +133,9 @@ pub trait IndexReader: Send + Sync { pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf { fn as_any(&self) -> &dyn Any; + /// Suggested I/O parallelism for the store + fn io_parallelism(&self) -> u32; + /// Create a new file and return a writer to store data in the file async fn new_index_file(&self, name: &str, schema: Arc) -> Result>; diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index e01718344f..e8f0cfb721 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -29,7 +29,10 @@ use futures::{ stream::{self}, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, }; -use lance_core::{utils::mask::RowIdTreeMap, Error, Result}; +use lance_core::{ + utils::{mask::RowIdTreeMap, tokio::get_num_compute_intensive_cpus}, + Error, Result, +}; use lance_datafusion::{ chunker::chunk_concat_stream, exec::{execute_plan, LanceExecutionOptions, OneShotExec}, @@ -762,7 +765,7 @@ impl BTreeIndex { idx: 0, } .map(|fut| fut.map_err(DataFusionError::from)) - .buffered(num_cpus::get()) + .buffered(self.store.io_parallelism() as usize) .boxed(); Ok(RecordBatchStreamAdapter::new(schema, batches)) } @@ -879,7 +882,9 @@ impl ScalarIndex for BTreeIndex { }) .collect::>(); stream::iter(page_tasks) - .buffered(num_cpus::get()) + // I/O and compute mixed here but important case is index in cache so + // use compute intensive thread count + .buffered(get_num_compute_intensive_cpus()) .try_collect::() .await } diff --git a/rust/lance-index/src/scalar/inverted/index.rs b/rust/lance-index/src/scalar/inverted/index.rs index 2859e758a4..59e9df0651 100644 --- a/rust/lance-index/src/scalar/inverted/index.rs +++ b/rust/lance-index/src/scalar/inverted/index.rs @@ -19,6 +19,7 @@ use futures::stream::repeat_with; use futures::{stream, StreamExt, TryStreamExt}; use itertools::Itertools; use lance_core::utils::mask::RowIdTreeMap; +use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::{Error, Result, ROW_ID}; use lazy_static::lazy_static; use moka::future::Cache; @@ -144,7 +145,8 @@ impl InvertedIndex { mask.clone(), )) }) - .buffered(num_cpus::get()) + // Use compute count since data hopefully cached + .buffered(get_num_compute_intensive_cpus()) .try_collect::>() .await?; diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index b9326b011b..0cc11710de 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -193,6 +193,10 @@ impl IndexStore for LanceIndexStore { self } + fn io_parallelism(&self) -> u32 { + self.object_store.io_parallelism() + } + async fn new_index_file( &self, name: &str, diff --git a/rust/lance-index/src/vector/hnsw/builder.rs b/rust/lance-index/src/vector/hnsw/builder.rs index 84d8ad7d51..ae14b9824b 100644 --- a/rust/lance-index/src/vector/hnsw/builder.rs +++ b/rust/lance-index/src/vector/hnsw/builder.rs @@ -11,6 +11,7 @@ use crossbeam_queue::ArrayQueue; use deepsize::DeepSizeOf; use itertools::Itertools; +use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_linalg::distance::DistanceType; use rayon::prelude::*; use snafu::{location, Location}; @@ -331,8 +332,8 @@ impl HnswBuilder { .map(|_| AtomicUsize::new(0)) .collect::>(); - let visited_generator_queue = Arc::new(ArrayQueue::new(num_cpus::get())); - for _ in 0..num_cpus::get() { + let visited_generator_queue = Arc::new(ArrayQueue::new(get_num_compute_intensive_cpus())); + for _ in 0..get_num_compute_intensive_cpus() { visited_generator_queue .push(VisitedGenerator::new(0)) .unwrap(); @@ -611,8 +612,9 @@ impl IvfSubIndex for HNSW { } } - let visited_generator_queue = Arc::new(ArrayQueue::new(num_cpus::get() * 2)); - for _ in 0..num_cpus::get() * 2 { + let visited_generator_queue = + Arc::new(ArrayQueue::new(get_num_compute_intensive_cpus() * 2)); + for _ in 0..get_num_compute_intensive_cpus() * 2 { visited_generator_queue .push(VisitedGenerator::new(0)) .unwrap(); diff --git a/rust/lance-index/src/vector/ivf/shuffler.rs b/rust/lance-index/src/vector/ivf/shuffler.rs index f724a1211a..abbcf41fd7 100644 --- a/rust/lance-index/src/vector/ivf/shuffler.rs +++ b/rust/lance-index/src/vector/ivf/shuffler.rs @@ -27,6 +27,7 @@ use arrow_schema::{DataType, Field, Fields}; use futures::stream::repeat_with; use futures::{stream, FutureExt, Stream, StreamExt, TryStreamExt}; use lance_arrow::RecordBatchExt; +use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::{datatypes::Schema, Error, Result, ROW_ID}; use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression}; use lance_file::reader::FileReader; @@ -324,7 +325,7 @@ pub async fn shuffle_dataset( ivf.transform(&batch) }) }) - .buffer_unordered(num_cpus::get()) + .buffer_unordered(get_num_compute_intensive_cpus()) .map(|res| match res { Ok(Ok(batch)) => Ok(batch), Ok(Err(err)) => Err(Error::io(err.to_string(), location!())), diff --git a/rust/lance-index/src/vector/pq/builder.rs b/rust/lance-index/src/vector/pq/builder.rs index 1c4e812a04..f4d2a6c4f1 100644 --- a/rust/lance-index/src/vector/pq/builder.rs +++ b/rust/lance-index/src/vector/pq/builder.rs @@ -12,6 +12,7 @@ use arrow_array::{ArrowNumericType, FixedSizeListArray, PrimitiveArray}; use arrow_schema::DataType; use futures::{stream, StreamExt, TryStreamExt}; use lance_arrow::FixedSizeListArrayExt; +use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::{Error, Result}; use lance_linalg::distance::DistanceType; use lance_linalg::distance::{Dot, Normalize, L2}; @@ -117,7 +118,7 @@ impl PQBuildParams { ) .await }) - .buffered(num_cpus::get()) + .buffered(get_num_compute_intensive_cpus()) .try_collect::>() .await?; let mut codebook_builder = PrimitiveBuilder::::with_capacity(num_centroids * dimension); diff --git a/rust/lance-index/src/vector/v3/shuffler.rs b/rust/lance-index/src/vector/v3/shuffler.rs index 4fcf5861bc..97ca35c2ac 100644 --- a/rust/lance-index/src/vector/v3/shuffler.rs +++ b/rust/lance-index/src/vector/v3/shuffler.rs @@ -11,7 +11,10 @@ use arrow_array::{RecordBatch, UInt32Array}; use future::join_all; use futures::prelude::*; use lance_arrow::RecordBatchExt; -use lance_core::{utils::tokio::spawn_cpu, Error, Result}; +use lance_core::{ + utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu}, + Error, Result, +}; use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression}; use lance_file::v2::{reader::FileReader, writer::FileWriter}; use lance_io::{ @@ -123,7 +126,7 @@ impl Shuffler for IvfShuffler { Ok::>, Error>(partition_buffers) }) }) - .buffered(num_cpus::get()); + .buffered(get_num_compute_intensive_cpus()); // part_id: | 0 | 1 | 3 | // partition_buffers: |[batch,batch,..]|[batch,batch,..]|[batch,batch,..]| diff --git a/rust/lance-io/Cargo.toml b/rust/lance-io/Cargo.toml index d7b550a650..a27e59ae9f 100644 --- a/rust/lance-io/Cargo.toml +++ b/rust/lance-io/Cargo.toml @@ -12,6 +12,8 @@ categories.workspace = true rust-version.workspace = true [dependencies] + +object_store = { workspace = true, features = ["aws", "gcp", "azure"] } lance-arrow.workspace = true lance-core.workspace = true arrow = { workspace = true, features = ["ffi"] } @@ -33,8 +35,6 @@ deepsize.workspace = true futures.workspace = true lazy_static.workspace = true log.workspace = true -num_cpus.workspace = true -object_store = { workspace = true, features = ["aws", "gcp", "azure"] } pin-project.workspace = true prost.workspace = true shellexpand.workspace = true diff --git a/rust/lance-io/src/encodings/binary.rs b/rust/lance-io/src/encodings/binary.rs index af6ec57507..44d665fdf6 100644 --- a/rust/lance-io/src/encodings/binary.rs +++ b/rust/lance-io/src/encodings/binary.rs @@ -340,7 +340,7 @@ impl<'a, T: ByteArrayType> Decoder for BinaryDecoder<'a, T> { .await?; Result::Ok((chunk, chunk_offset, array)) }) - .buffered(num_cpus::get()) + .buffered(self.reader.io_parallelism() as usize) .try_for_each(|(chunk, chunk_offset, array)| { let array: &GenericByteArray = array.as_bytes(); diff --git a/rust/lance-io/src/encodings/plain.rs b/rust/lance-io/src/encodings/plain.rs index 1911cd13c9..76e5b1d553 100644 --- a/rust/lance-io/src/encodings/plain.rs +++ b/rust/lance-io/src/encodings/plain.rs @@ -35,11 +35,6 @@ use tokio::io::AsyncWriteExt; use crate::encodings::{AsyncIndex, Decoder}; -/// Parallelism factor decides how many run parallel I/O issued per CPU core. -/// This is a heuristic value, with the assumption NVME and S3/GCS can -/// handles large mount of parallel I/O & large disk-queue. -const PARALLELISM_FACTOR: usize = 4; - /// Encoder for plain encoding. /// pub struct PlainEncoder<'a> { @@ -369,7 +364,7 @@ impl<'a> PlainDecoder<'a> { let shifted_indices = sub(&request, &UInt32Array::new_scalar(start))?; Ok::(take(&array, &shifted_indices, None)?) }) - .buffered(num_cpus::get()) + .buffered(self.reader.io_parallelism() as usize) .try_collect::>() .await?; let references = arrays.iter().map(|a| a.as_ref()).collect::>(); @@ -435,7 +430,7 @@ impl<'a> Decoder for PlainDecoder<'a> { let adjusted_offsets = sub(&request, &UInt32Array::new_scalar(start))?; Ok::(take(&array, &adjusted_offsets, None)?) }) - .buffered(num_cpus::get() * PARALLELISM_FACTOR) + .buffered(self.reader.io_parallelism() as usize) .try_collect::>() .await?; let references = arrays.iter().map(|a| a.as_ref()).collect::>(); diff --git a/rust/lance-io/src/local.rs b/rust/lance-io/src/local.rs index c44cd61006..503b7002eb 100644 --- a/rust/lance-io/src/local.rs +++ b/rust/lance-io/src/local.rs @@ -24,6 +24,7 @@ use tokio::io::AsyncSeekExt; use tokio::sync::OnceCell; use tracing::instrument; +use crate::object_store::DEFAULT_LOCAL_IO_PARALLELISM; use crate::traits::{Reader, Writer}; /// Convert an [`object_store::path::Path`] to a [`std::path::Path`]. @@ -122,6 +123,10 @@ impl Reader for LocalObjectReader { self.block_size } + fn io_parallelism(&self) -> u32 { + DEFAULT_LOCAL_IO_PARALLELISM + } + /// Returns the file size. async fn size(&self) -> object_store::Result { let file = self.file.clone(); diff --git a/rust/lance-io/src/object_reader.rs b/rust/lance-io/src/object_reader.rs index 074a7e9f5a..53d47965ed 100644 --- a/rust/lance-io/src/object_reader.rs +++ b/rust/lance-io/src/object_reader.rs @@ -13,7 +13,7 @@ use object_store::{path::Path, ObjectStore}; use tokio::sync::OnceCell; use tracing::instrument; -use crate::traits::Reader; +use crate::{object_store::DEFAULT_CLOUD_IO_PARALLELISM, traits::Reader}; /// Object Reader /// @@ -85,6 +85,10 @@ impl Reader for CloudObjectReader { self.block_size } + fn io_parallelism(&self) -> u32 { + DEFAULT_CLOUD_IO_PARALLELISM + } + /// Object/File Size. async fn size(&self) -> object_store::Result { self.size diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 18ae092308..1747c753c5 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -488,20 +488,10 @@ impl ObjectStore { self.io_parallelism = io_parallelism; } - pub fn io_parallelism(&self) -> Result { + pub fn io_parallelism(&self) -> u32 { std::env::var("LANCE_IO_THREADS") - .map(|val| { - val.parse::().map_err(|parse_err| { - Error::invalid_input( - format!( - "The LANCE_IO_THREADS variable is not set to an integer: {}", - parse_err - ), - location!(), - ) - }) - }) - .unwrap_or(Ok(self.io_parallelism)) + .map(|val| val.parse::().unwrap()) + .unwrap_or(self.io_parallelism) } /// Open a file for path. diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index f8e9ebc1c1..c60ef8bf82 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -405,7 +405,7 @@ impl SchedulerConfig { /// at all). We assume a max page size of 32MiB and then allow 32MiB per I/O thread pub fn max_bandwidth(store: &ObjectStore) -> Self { Self { - io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism().unwrap() as u64, + io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64, } } } @@ -418,7 +418,7 @@ impl ScanScheduler { /// * object_store - the store to wrap /// * config - configuration settings for the scheduler pub fn new(object_store: Arc, config: SchedulerConfig) -> Arc { - let io_capacity = object_store.io_parallelism().unwrap(); + let io_capacity = object_store.io_parallelism(); let io_queue = Arc::new(IoQueue::new(io_capacity, config.io_buffer_size_bytes)); let scheduler = Self { object_store, diff --git a/rust/lance-io/src/traits.rs b/rust/lance-io/src/traits.rs index 1f04097853..572c18664f 100644 --- a/rust/lance-io/src/traits.rs +++ b/rust/lance-io/src/traits.rs @@ -86,6 +86,9 @@ pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf { /// Suggest optimal I/O size per storage device. fn block_size(&self) -> usize; + /// Suggest optimal I/O parallelism per storage device. + fn io_parallelism(&self) -> u32; + /// Object/File Size. async fn size(&self) -> object_store::Result; diff --git a/rust/lance-linalg/Cargo.toml b/rust/lance-linalg/Cargo.toml index 9ad7a09940..2410a8e7e9 100644 --- a/rust/lance-linalg/Cargo.toml +++ b/rust/lance-linalg/Cargo.toml @@ -21,7 +21,6 @@ lance-arrow = { workspace = true } lance-core = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } -num_cpus = { workspace = true } num-traits = { workspace = true } rand = { workspace = true } rayon = { workspace = true } diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml index 1bc62ec3c1..d1242558ea 100644 --- a/rust/lance/Cargo.toml +++ b/rust/lance/Cargo.toml @@ -55,7 +55,6 @@ rand.workspace = true futures.workspace = true uuid.workspace = true arrow.workspace = true -num_cpus.workspace = true # TODO: use datafusion sub-modules to reduce build size? datafusion.workspace = true datafusion-functions.workspace = true diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index e9ef2facf6..0937ff2443 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -11,6 +11,7 @@ use deepsize::DeepSizeOf; use futures::future::BoxFuture; use futures::stream::{self, StreamExt, TryStreamExt}; use futures::{FutureExt, Stream}; +use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::{datatypes::SchemaCompareOptions, traits::DatasetTakeRows}; use lance_datafusion::projection::ProjectionPlan; use lance_datafusion::utils::{peek_reader_schema, reader_to_stream}; @@ -1054,7 +1055,7 @@ impl Dataset { let new_fragment = f.delete(predicate).await?.map(|f| f.metadata); Ok((old_fragment, new_fragment)) }) - .buffer_unordered(num_cpus::get()) + .buffer_unordered(get_num_compute_intensive_cpus()) // Drop the fragments that were deleted. .try_for_each(|(old_fragment, new_fragment)| { if let Some(new_fragment) = new_fragment { @@ -1096,7 +1097,7 @@ impl Dataset { pub async fn count_deleted_rows(&self) -> Result { futures::stream::iter(self.get_fragments()) .map(|f| async move { f.count_deletions().await }) - .buffer_unordered(num_cpus::get() * 4) + .buffer_unordered(self.object_store.io_parallelism() as usize) .try_fold(0, |acc, x| futures::future::ready(Ok(acc + x))) .await } @@ -1209,13 +1210,13 @@ impl Dataset { /// group. These are considered too small because reading many of them is /// much less efficient than reading a single file because the separate files /// split up what would otherwise be single IO requests into multiple. - pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize { - futures::stream::iter(self.get_fragments()) + pub async fn num_small_files(&self, max_rows_per_group: usize) -> Result { + Ok(futures::stream::iter(self.get_fragments()) .map(|f| async move { f.physical_rows().await }) - .buffered(num_cpus::get() * 4) + .buffered(self.object_store.io_parallelism() as usize) .try_filter(|row_count| futures::future::ready(*row_count < max_rows_per_group)) .count() - .await + .await) } pub async fn validate(&self) -> Result<()> { @@ -1245,7 +1246,7 @@ impl Dataset { // All fragments have equal lengths futures::stream::iter(self.get_fragments()) .map(|f| async move { f.validate().await }) - .buffer_unordered(num_cpus::get() * 4) + .buffer_unordered(self.object_store.io_parallelism() as usize) .try_collect::>() .await?; @@ -3346,8 +3347,8 @@ mod tests { .unwrap(); dataset.validate().await.unwrap(); - assert!(dataset.num_small_files(1024).await > 0); - assert!(dataset.num_small_files(512).await == 0); + assert!(dataset.num_small_files(1024).await.unwrap() > 0); + assert!(dataset.num_small_files(512).await.unwrap() == 0); } #[tokio::test] diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index b60b85601a..8fb0032aef 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -152,9 +152,10 @@ impl<'a> CleanupTask<'a> { .commit_handler .list_manifests(&self.dataset.base, &self.dataset.object_store.inner) .await? - .try_for_each_concurrent(num_cpus::get(), |path| { - self.process_manifest_file(path, &inspection, tagged_versions) - }) + .try_for_each_concurrent( + self.dataset.object_store.io_parallelism() as usize, + |path| self.process_manifest_file(path, &inspection, tagged_versions), + ) .await?; Ok(inspection.into_inner().unwrap()) } @@ -275,7 +276,7 @@ impl<'a> CleanupTask<'a> { .collect::>() .await; let manifest_bytes_removed = stream::iter(manifest_bytes_removed) - .buffer_unordered(num_cpus::get()) + .buffer_unordered(self.dataset.object_store.io_parallelism() as usize) .try_fold(0, |acc, size| async move { Ok(acc + (size as u64)) }) .await; diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index ab1ada0b49..0a94c46f65 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -19,6 +19,7 @@ use datafusion::scalar::ScalarValue; use futures::future::try_join_all; use futures::{join, stream, FutureExt, StreamExt, TryFutureExt, TryStreamExt}; use lance_core::utils::deletion::DeletionVector; +use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::{datatypes::Schema, Error, Result}; use lance_core::{ROW_ADDR, ROW_ADDR_FIELD, ROW_ID_FIELD}; use lance_encoding::decoder::DecoderMiddlewareChain; @@ -1719,7 +1720,7 @@ impl FragmentReader { range.start as u32..range.end as u32, DEFAULT_BATCH_READ_SIZE, )? - .buffered(num_cpus::get()) + .buffered(get_num_compute_intensive_cpus()) .try_collect::>() .await?; concat_batches(&Arc::new(self.output_schema.clone()), batches.iter()).map_err(Error::from) @@ -1741,7 +1742,7 @@ impl FragmentReader { let batches = self .take(indices, u32::MAX) .await? - .buffered(num_cpus::get()) + .buffered(get_num_compute_intensive_cpus()) .try_collect::>() .await?; concat_batches(&Arc::new(self.output_schema.clone()), batches.iter()).map_err(Error::from) diff --git a/rust/lance/src/dataset/hash_joiner.rs b/rust/lance/src/dataset/hash_joiner.rs index 3c3190579b..6333acb1d1 100644 --- a/rust/lance/src/dataset/hash_joiner.rs +++ b/rust/lance/src/dataset/hash_joiner.rs @@ -12,6 +12,7 @@ use arrow_schema::{DataType as ArrowDataType, SchemaRef}; use arrow_select::interleave::interleave; use dashmap::{DashMap, ReadOnlyView}; use futures::{StreamExt, TryStreamExt}; +use lance_core::utils::tokio::get_num_compute_intensive_cpus; use snafu::{location, Location}; use tokio::task; @@ -78,7 +79,7 @@ impl HashJoiner { let map = Arc::new(map); futures::stream::iter(batches.iter().enumerate().map(Ok::<_, Error>)) - .try_for_each_concurrent(num_cpus::get(), |(batch_i, batch)| { + .try_for_each_concurrent(get_num_compute_intensive_cpus(), |(batch_i, batch)| { // A clone of map we can send to a new thread let map = map.clone(); async move { @@ -199,7 +200,7 @@ impl HashJoiner { } } }) - .buffered(num_cpus::get()) + .buffered(get_num_compute_intensive_cpus()) .try_collect::>() .await?; diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index e6739b4c49..b4dca9c352 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -87,6 +87,7 @@ use std::sync::{Arc, RwLock}; use datafusion::physical_plan::SendableRecordBatchStream; use futures::{StreamExt, TryStreamExt}; +use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_index::DatasetIndexExt; use lance_table::io::deletion::read_deletion_file; use roaring::{RoaringBitmap, RoaringTreemap}; @@ -153,7 +154,8 @@ impl Default for CompactionOptions { max_rows_per_group: 1024, materialize_deletions: true, materialize_deletions_threshold: 0.1, - num_threads: num_cpus::get(), + // TODO: Should this be based on # I/O threads? + num_threads: get_num_compute_intensive_cpus(), max_bytes_per_file: None, batch_size: None, } @@ -467,7 +469,7 @@ pub async fn plan_compaction( Err(e) => Err(e), } }) - .buffered(num_cpus::get() * 2); + .buffered(dataset.object_store().io_parallelism() as usize); let index_fragmaps = load_index_fragmaps(dataset).await?; let indices_containing_frag = |frag_id: u32| { diff --git a/rust/lance/src/dataset/rowids.rs b/rust/lance/src/dataset/rowids.rs index 64bff4d88e..338c758dd4 100644 --- a/rust/lance/src/dataset/rowids.rs +++ b/rust/lance/src/dataset/rowids.rs @@ -64,7 +64,7 @@ pub fn load_row_id_sequences<'a>( .map(|fragment| { load_row_id_sequence(dataset, fragment).map_ok(move |seq| (fragment.id as u32, seq)) }) - .buffer_unordered(num_cpus::get()) + .buffer_unordered(dataset.object_store.io_parallelism() as usize) } pub async fn get_row_id_index( diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index d7e9d6c482..0b800d5b77 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -34,6 +34,7 @@ use datafusion_physical_expr::PhysicalExpr; use futures::stream::{Stream, StreamExt}; use futures::TryStreamExt; use lance_arrow::floats::{coerce_float_vector, FloatType}; +use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::{ROW_ADDR, ROW_ADDR_FIELD, ROW_ID, ROW_ID_FIELD}; use lance_datafusion::exec::{execute_plan, LanceExecutionOptions}; use lance_datafusion::projection::ProjectionPlan; @@ -68,9 +69,6 @@ use lance_datafusion::expr::parse_substrait; pub const DEFAULT_BATCH_SIZE: usize = 8192; -// Same as pyarrow Dataset::scanner() -pub const DEFAULT_BATCH_READAHEAD: usize = 16; - // Same as pyarrow Dataset::scanner() pub const DEFAULT_FRAGMENT_READAHEAD: usize = 4; @@ -222,7 +220,7 @@ impl Scanner { filter: None, full_text_query: None, batch_size: None, - batch_readahead: DEFAULT_BATCH_READAHEAD, + batch_readahead: get_num_compute_intensive_cpus(), fragment_readahead: None, io_buffer_size: None, limit: None, diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index 830c73beca..d5510e70fd 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -130,7 +130,7 @@ pub async fn take( }) .collect::>(); let take_stream = futures::stream::iter(take_tasks) - .buffered(num_cpus::get() * 4) + .buffered(dataset.object_store.io_parallelism() as usize) .map_err(|err| DataFusionError::External(err.into())) .boxed(); let take_stream = Box::pin(RecordBatchStreamAdapter::new( @@ -253,7 +253,7 @@ pub async fn take_rows( batches.push(batch_fut); } let batches: Vec = futures::stream::iter(batches) - .buffered(4 * num_cpus::get()) + .buffered(dataset.object_store.io_parallelism() as usize) .try_collect() .await?; Ok(concat_batches(&batches[0].schema(), &batches)?) @@ -293,7 +293,7 @@ pub async fn take_rows( .map(|(fragment, indices)| { do_take(fragment, indices, projection.physical_schema.clone(), true) }) - .buffered(4 * num_cpus::get()) + .buffered(dataset.object_store.io_parallelism() as usize) .try_collect::>() .await?; diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 9b5d45581f..7db790fc24 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -452,7 +452,7 @@ impl MergeInsertJob { self.dataset.clone(), index_mapper, Arc::new(self.dataset.schema().project_by_schema(schema.as_ref())?), - num_cpus::get(), + get_num_compute_intensive_cpus(), )?) as Arc; // 5 - Take puts the row id and row addr at the beginning. A full scan (used when there is @@ -957,7 +957,7 @@ impl MergeInsertJob { } } }) - .buffer_unordered(num_cpus::get() * 4); + .buffer_unordered(dataset.object_store.io_parallelism() as usize); while let Some(res) = stream.next().await.transpose()? { match res { diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index ed8a3bfdf5..9db98c3c3d 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -18,6 +18,7 @@ use datafusion::scalar::ScalarValue; use futures::StreamExt; use lance_arrow::RecordBatchExt; use lance_core::error::{box_error, InvalidInputSnafu}; +use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_datafusion::expr::safe_coerce_scalar; use lance_table::format::Fragment; use roaring::RoaringTreemap; @@ -223,7 +224,7 @@ impl UpdateJob { let updates = updates_ref.clone(); tokio::task::spawn_blocking(move || Self::apply_updates(batch?, updates)) }) - .buffered(num_cpus::get()) + .buffered(get_num_compute_intensive_cpus()) .map(|res| match res { Ok(Ok(batch)) => Ok(batch), Ok(Err(err)) => Err(err), @@ -301,7 +302,7 @@ impl UpdateJob { } } }) - .buffer_unordered(num_cpus::get() * 4); + .buffer_unordered(self.dataset.object_store.io_parallelism() as usize); while let Some(res) = stream.next().await.transpose()? { match res { diff --git a/rust/lance/src/index/prefilter.rs b/rust/lance/src/index/prefilter.rs index e60ad6d65f..112732abb9 100644 --- a/rust/lance/src/index/prefilter.rs +++ b/rust/lance/src/index/prefilter.rs @@ -104,8 +104,8 @@ impl DatasetPreFilter { }) .collect::>() .await; - let mut frag_id_deletion_vectors = - stream::iter(frag_id_deletion_vectors).buffer_unordered(num_cpus::get()); + let mut frag_id_deletion_vectors = stream::iter(frag_id_deletion_vectors) + .buffer_unordered(dataset.object_store.io_parallelism() as usize); let mut deleted_ids = RowIdTreeMap::new(); while let Some((id, deletion_vector)) = frag_id_deletion_vectors.try_next().await? { diff --git a/rust/lance/src/index/vector/builder.rs b/rust/lance/src/index/vector/builder.rs index 165b9c9c3c..1cda8c8475 100644 --- a/rust/lance/src/index/vector/builder.rs +++ b/rust/lance/src/index/vector/builder.rs @@ -8,6 +8,7 @@ use arrow_array::{RecordBatch, UInt64Array}; use futures::prelude::stream::{StreamExt, TryStreamExt}; use itertools::Itertools; use lance_arrow::RecordBatchExt; +use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::{Error, Result, ROW_ID_FIELD}; use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression}; use lance_file::v2::{reader::FileReader, writer::FileWriter}; @@ -235,7 +236,7 @@ impl IvfIndexBuilde let stream = self .dataset .scan() - .batch_readahead(num_cpus::get() * 2) + .batch_readahead(get_num_compute_intensive_cpus()) .project(&[self.column.as_str()])? .with_row_id() .try_into_stream() @@ -279,7 +280,7 @@ impl IvfIndexBuilde let ivf_transformer = transformer.clone(); tokio::spawn(async move { ivf_transformer.transform(&batch?) }) }) - .buffered(num_cpus::get()) + .buffered(get_num_compute_intensive_cpus()) .map(|x| x.unwrap()) .peekable(), ); diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 9591839122..3e1b6541ac 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -26,7 +26,10 @@ use futures::{ }; use io::write_hnsw_quantization_index_partitions; use lance_arrow::*; -use lance_core::{datatypes::Field, traits::DatasetTakeRows, Error, Result, ROW_ID_FIELD}; +use lance_core::{ + datatypes::Field, traits::DatasetTakeRows, utils::tokio::get_num_compute_intensive_cpus, Error, + Result, ROW_ID_FIELD, +}; use lance_file::{ format::MAGIC, writer::{FileWriter, FileWriterOptions}, @@ -783,7 +786,7 @@ impl VectorIndex for IVFIndex { let part_ids = partition_ids.values().to_vec(); let batches = stream::iter(part_ids) .map(|part_id| self.search_in_partition(part_id as usize, &query, pre_filter.clone())) - .buffer_unordered(num_cpus::get()) + .buffer_unordered(get_num_compute_intensive_cpus()) .try_collect::>() .await?; let batch = concat_batches(&batches[0].schema(), &batches)?; @@ -1181,7 +1184,6 @@ async fn scan_index_field_stream( column: &str, ) -> Result { let mut scanner = dataset.scan(); - scanner.batch_readahead(num_cpus::get() * 2); scanner.project(&[column])?; scanner.with_row_id(); scanner.try_into_stream().await @@ -1354,7 +1356,7 @@ pub(crate) async fn remap_index_file( let mut task_stream = stream::iter(tasks.into_iter()) .map(|task| task.load_and_remap(reader.clone(), index, mapping)) - .buffered(num_cpus::get()); + .buffered(object_store.io_parallelism() as usize); let mut ivf = IvfModel { centroids: index.ivf.centroids.clone(), diff --git a/rust/lance/src/index/vector/ivf/io.rs b/rust/lance/src/index/vector/ivf/io.rs index eaf0014a7a..5910c2adef 100644 --- a/rust/lance/src/index/vector/ivf/io.rs +++ b/rust/lance/src/index/vector/ivf/io.rs @@ -16,7 +16,7 @@ use futures::{Stream, StreamExt, TryStreamExt}; use lance_arrow::*; use lance_core::datatypes::Schema; use lance_core::traits::DatasetTakeRows; -use lance_core::utils::tokio::spawn_cpu; +use lance_core::utils::tokio::{get_num_compute_intensive_cpus, spawn_cpu}; use lance_core::Error; use lance_file::reader::FileReader; use lance_file::writer::FileWriter; @@ -49,7 +49,7 @@ use crate::Result; // TODO: make it configurable, limit by the number of CPU cores & memory lazy_static::lazy_static! { - static ref HNSW_PARTITIONS_BUILD_PARALLEL: usize = num_cpus::get(); + static ref HNSW_PARTITIONS_BUILD_PARALLEL: usize = get_num_compute_intensive_cpus(); } /// Merge streams with the same partition id and collect PQ codes and row IDs. @@ -405,7 +405,7 @@ pub(super) async fn write_hnsw_quantization_index_partitions( part_reader.schema(), ) }) - .buffered(num_cpus::get()) + .buffered(object_store.io_parallelism() as usize) .try_collect::>() .await?; writer.write(&batches).await?; @@ -429,7 +429,7 @@ pub(super) async fn write_hnsw_quantization_index_partitions( aux_part_reader.schema(), ) }) - .buffered(num_cpus::get()) + .buffered(object_store.io_parallelism() as usize) .try_collect::>() .await?; std::mem::drop(aux_part_reader); diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 44fc61c8c9..85e9b9a938 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use deepsize::DeepSizeOf; use futures::prelude::stream::{self, StreamExt, TryStreamExt}; use lance_arrow::RecordBatchExt; +use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::{cache::DEFAULT_INDEX_CACHE_SIZE, Error, Result}; use lance_encoding::decoder::{DecoderMiddlewareChain, FilterExpression}; use lance_file::v2::reader::FileReader; @@ -367,7 +368,7 @@ impl>() .await?; let batch = concat_batches(&batches[0].schema(), &batches)?; @@ -425,7 +426,6 @@ impl Self { Self { - batch_readahead: DEFAULT_BATCH_READAHEAD, + batch_readahead: get_num_compute_intensive_cpus(), fragment_readahead: DEFAULT_FRAGMENT_READAHEAD, with_row_id: false, with_row_address: false, diff --git a/rust/lance/src/io/exec/scan.rs b/rust/lance/src/io/exec/scan.rs index 3578a4e791..709e182869 100644 --- a/rust/lance/src/io/exec/scan.rs +++ b/rust/lance/src/io/exec/scan.rs @@ -156,7 +156,7 @@ impl LanceStream { io_buffer_size: u64, ) -> Result { let project_schema = projection.clone(); - let io_parallelism = dataset.object_store.io_parallelism()?; + let io_parallelism = dataset.object_store.io_parallelism(); let frag_parallelism = fragment_parallelism .unwrap_or_else(|| { // This is somewhat aggressive. It assumes a single page per column. If there are many pages per From 11f1ec6e47df0c55c323a51196c4dc29baceb659 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 22 Aug 2024 09:06:08 -0700 Subject: [PATCH 2/6] Minor cleanup. Add note for devs --- python/DEVELOPMENT.md | 47 ++++++++++++++++++++++++--------------- rust/lance/src/dataset.rs | 6 ++--- 2 files changed, 32 insertions(+), 21 deletions(-) diff --git a/python/DEVELOPMENT.md b/python/DEVELOPMENT.md index ddbe90a78b..5202701d6e 100644 --- a/python/DEVELOPMENT.md +++ b/python/DEVELOPMENT.md @@ -47,8 +47,8 @@ make lint ### Format and lint on commit If you would like to run the formatters and linters when you commit your code -then you can use the pre-commit tool. The project includes a pre-commit config -file already. First, install the pre-commit tool: +then you can use the pre-commit tool. The project includes a pre-commit config +file already. First, install the pre-commit tool: ```shell pip install pre-commit @@ -162,33 +162,33 @@ pytest --benchmark-compare=$COMPARE_ID python/benchmarks -m "not slow" ## Tracing Rust has great integration with tools like criterion and pprof which make it easy -to profile and debug CPU intensive tasks. However, these tools are not as effective +to profile and debug CPU intensive tasks. However, these tools are not as effective at profiling I/O intensive work or providing a high level trace of an operation. To fill this gap the lance code utlizies the Rust tracing crate to provide tracing -information for lance operations. User applications can receive these events and -forward them on for logging purposes. Developers can also use this information to +information for lance operations. User applications can receive these events and +forward them on for logging purposes. Developers can also use this information to get a sense of the I/O that happens during an operation. ### Instrumenting code When instrumenting code you can use the `#[instrument]` macro from the Rust tracing -crate. See the crate docs for more information on the various parameters that can -be set. As a general guideline we should aim to instrument the following methods: +crate. See the crate docs for more information on the various parameters that can +be set. As a general guideline we should aim to instrument the following methods: -* Top-level methods that will often be called by external libraries and could be slow -* Compute intensive methods that will perform a significant amount of CPU compute -* Any point where we are waiting on external resources (e.g. disk) +- Top-level methods that will often be called by external libraries and could be slow +- Compute intensive methods that will perform a significant amount of CPU compute +- Any point where we are waiting on external resources (e.g. disk) To begin with, instrument methods as close to the user as possible and refine downwards -as you need. For example, start by instrumenting the entire dataset write operation +as you need. For example, start by instrumenting the entire dataset write operation and then instrument any individual parts of the operation that you would like to see details for. ### Tracing a unit test If you would like tracing information for a rust unit test then you will need to -decorate your test with the lance_test_macros::test attribute. This will wrap any +decorate your test with the lance_test_macros::test attribute. This will wrap any existing test attributes that you are using: ```rust @@ -205,13 +205,13 @@ LANCE_TRACING to the your desired verbosity level (trace, debug, info, warn, err LANCE_TESTING=debug cargo test dataset::tests::test_create_dataset ``` -This will create a .json file (named with a timestamp) in your working directory. This +This will create a .json file (named with a timestamp) in your working directory. This .json file can be loaded by chrome or by ### Tracing a python script If you would like to trace a python script (application, benchmark, test) then you can easily -do so using the lance.tracing module. Simply call: +do so using the lance.tracing module. Simply call: ```python from lance.tracing import trace_to_chrome @@ -223,7 +223,7 @@ trace_to_chrome(level="debug") A single .json trace file will be generated after python has exited. -You can use the `trace_to_chrome` function within the benchmarks, but for +You can use the `trace_to_chrome` function within the benchmarks, but for sensible results you'll want to force the benchmark to just run only once. To do this, rewrite the benchmark using the pedantic API: @@ -237,14 +237,13 @@ benchmark.pedantic(run, iterations=1, rounds=1) ### Trace visualization limitations The current tracing implementation is slightly flawed when it comes to async -operations that run in parallel. The rust tracing-chrome library emits -trace events into the chrome trace events JSON format. This format is not +operations that run in parallel. The rust tracing-chrome library emits +trace events into the chrome trace events JSON format. This format is not sophisticated enough to represent asynchronous parallel work. As a result, a single instrumented async method may appear as many different spans in the UI. - ## Running S3 Integration tests The integration tests run against local minio and local dynamodb. To start the @@ -307,3 +306,15 @@ maturin build --release \ --target x86_64-apple-darwin \ --out wheels ``` + +## Picking a thread pool + +When an operation should run in parallel you typically need to specify how many threads +to use. For example, as input to `StreamExt::buffered`. There are two numbers you can +use. You can use `ObjectStore::io_parallelism` or `get_num_compute_intensive_cpus`. + +Often, operations will do a little of both compute and I/O, and you will need to make +a judgement call. If you are unsure, and you are doing any I/O, then picking the +`io_parallelism` is a good fallback behavior. The worst case is just that we over-parallelize +and there is more CPU contention then there needs to be. If this becomes a problem we +can always split the operation into two parts and use the two different thread pools. diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 0937ff2443..6928a49910 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1210,13 +1210,13 @@ impl Dataset { /// group. These are considered too small because reading many of them is /// much less efficient than reading a single file because the separate files /// split up what would otherwise be single IO requests into multiple. - pub async fn num_small_files(&self, max_rows_per_group: usize) -> Result { - Ok(futures::stream::iter(self.get_fragments()) + pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize { + futures::stream::iter(self.get_fragments()) .map(|f| async move { f.physical_rows().await }) .buffered(self.object_store.io_parallelism() as usize) .try_filter(|row_count| futures::future::ready(*row_count < max_rows_per_group)) .count() - .await) + .await } pub async fn validate(&self) -> Result<()> { From 0bfe35133b56d421913b19b40cf30b68985f3c97 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 22 Aug 2024 14:17:56 -0700 Subject: [PATCH 3/6] Change io_parallelism method to return usize instead of u32 to reduce casting --- rust/lance-file/src/reader.rs | 10 +++++----- rust/lance-index/src/scalar.rs | 2 +- rust/lance-index/src/scalar/btree.rs | 2 +- rust/lance-index/src/scalar/lance_format.rs | 2 +- rust/lance-io/src/encodings/binary.rs | 2 +- rust/lance-io/src/encodings/plain.rs | 4 ++-- rust/lance-io/src/local.rs | 2 +- rust/lance-io/src/object_reader.rs | 2 +- rust/lance-io/src/object_store.rs | 18 +++++++++--------- rust/lance-io/src/scheduler.rs | 5 ++++- rust/lance-io/src/traits.rs | 2 +- rust/lance/src/dataset.rs | 10 +++++----- rust/lance/src/dataset/cleanup.rs | 9 ++++----- rust/lance/src/dataset/optimize.rs | 2 +- rust/lance/src/dataset/rowids.rs | 2 +- rust/lance/src/dataset/take.rs | 6 +++--- rust/lance/src/dataset/write/merge_insert.rs | 2 +- rust/lance/src/dataset/write/update.rs | 2 +- rust/lance/src/index/prefilter.rs | 2 +- rust/lance/src/index/vector/ivf.rs | 2 +- rust/lance/src/index/vector/ivf/io.rs | 4 ++-- rust/lance/src/io/commit.rs | 2 +- rust/lance/src/io/exec/scan.rs | 4 ++-- 23 files changed, 50 insertions(+), 48 deletions(-) diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index 0c5df4a4eb..0ecda0394a 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -232,7 +232,7 @@ impl FileReader { Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await } - fn io_parallelism(&self) -> u32 { + fn io_parallelism(&self) -> usize { self.object_reader.io_parallelism() } @@ -291,7 +291,7 @@ impl FileReader { .map(|(batch_id, range)| async move { self.read_batch(batch_id, range, projection).await }) - .buffered(self.io_parallelism() as usize) + .buffered(self.io_parallelism()) .try_collect::>() .await?; if batches.len() == 1 { @@ -326,7 +326,7 @@ impl FileReader { .await } }) - .buffered(self.io_parallelism() as usize) + .buffered(self.io_parallelism()) .try_collect::>() .await?; @@ -372,7 +372,7 @@ impl FileReader { ) .await }) - .buffered(self.io_parallelism() as usize) + .buffered(self.io_parallelism()) .try_collect::>() .await?; @@ -436,7 +436,7 @@ pub async fn read_batch( // We box this because otherwise we get a higher-order lifetime error. let arrs = stream::iter(&schema.fields) .map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await }) - .buffered(reader.io_parallelism() as usize) + .buffered(reader.io_parallelism()) .try_collect::>() .boxed(); let arrs = arrs.await?; diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index 96f41174ab..4a16f52503 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -134,7 +134,7 @@ pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf { fn as_any(&self) -> &dyn Any; /// Suggested I/O parallelism for the store - fn io_parallelism(&self) -> u32; + fn io_parallelism(&self) -> usize; /// Create a new file and return a writer to store data in the file async fn new_index_file(&self, name: &str, schema: Arc) diff --git a/rust/lance-index/src/scalar/btree.rs b/rust/lance-index/src/scalar/btree.rs index e8f0cfb721..375925c6f7 100644 --- a/rust/lance-index/src/scalar/btree.rs +++ b/rust/lance-index/src/scalar/btree.rs @@ -765,7 +765,7 @@ impl BTreeIndex { idx: 0, } .map(|fut| fut.map_err(DataFusionError::from)) - .buffered(self.store.io_parallelism() as usize) + .buffered(self.store.io_parallelism()) .boxed(); Ok(RecordBatchStreamAdapter::new(schema, batches)) } diff --git a/rust/lance-index/src/scalar/lance_format.rs b/rust/lance-index/src/scalar/lance_format.rs index 0cc11710de..7f60a793e2 100644 --- a/rust/lance-index/src/scalar/lance_format.rs +++ b/rust/lance-index/src/scalar/lance_format.rs @@ -193,7 +193,7 @@ impl IndexStore for LanceIndexStore { self } - fn io_parallelism(&self) -> u32 { + fn io_parallelism(&self) -> usize { self.object_store.io_parallelism() } diff --git a/rust/lance-io/src/encodings/binary.rs b/rust/lance-io/src/encodings/binary.rs index 44d665fdf6..fe202c31ef 100644 --- a/rust/lance-io/src/encodings/binary.rs +++ b/rust/lance-io/src/encodings/binary.rs @@ -340,7 +340,7 @@ impl<'a, T: ByteArrayType> Decoder for BinaryDecoder<'a, T> { .await?; Result::Ok((chunk, chunk_offset, array)) }) - .buffered(self.reader.io_parallelism() as usize) + .buffered(self.reader.io_parallelism()) .try_for_each(|(chunk, chunk_offset, array)| { let array: &GenericByteArray = array.as_bytes(); diff --git a/rust/lance-io/src/encodings/plain.rs b/rust/lance-io/src/encodings/plain.rs index 76e5b1d553..3cdb664e37 100644 --- a/rust/lance-io/src/encodings/plain.rs +++ b/rust/lance-io/src/encodings/plain.rs @@ -364,7 +364,7 @@ impl<'a> PlainDecoder<'a> { let shifted_indices = sub(&request, &UInt32Array::new_scalar(start))?; Ok::(take(&array, &shifted_indices, None)?) }) - .buffered(self.reader.io_parallelism() as usize) + .buffered(self.reader.io_parallelism()) .try_collect::>() .await?; let references = arrays.iter().map(|a| a.as_ref()).collect::>(); @@ -430,7 +430,7 @@ impl<'a> Decoder for PlainDecoder<'a> { let adjusted_offsets = sub(&request, &UInt32Array::new_scalar(start))?; Ok::(take(&array, &adjusted_offsets, None)?) }) - .buffered(self.reader.io_parallelism() as usize) + .buffered(self.reader.io_parallelism()) .try_collect::>() .await?; let references = arrays.iter().map(|a| a.as_ref()).collect::>(); diff --git a/rust/lance-io/src/local.rs b/rust/lance-io/src/local.rs index 503b7002eb..81368c6de0 100644 --- a/rust/lance-io/src/local.rs +++ b/rust/lance-io/src/local.rs @@ -123,7 +123,7 @@ impl Reader for LocalObjectReader { self.block_size } - fn io_parallelism(&self) -> u32 { + fn io_parallelism(&self) -> usize { DEFAULT_LOCAL_IO_PARALLELISM } diff --git a/rust/lance-io/src/object_reader.rs b/rust/lance-io/src/object_reader.rs index 53d47965ed..e943d791a6 100644 --- a/rust/lance-io/src/object_reader.rs +++ b/rust/lance-io/src/object_reader.rs @@ -85,7 +85,7 @@ impl Reader for CloudObjectReader { self.block_size } - fn io_parallelism(&self) -> u32 { + fn io_parallelism(&self) -> usize { DEFAULT_CLOUD_IO_PARALLELISM } diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 1747c753c5..b4fd2ed200 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -42,9 +42,9 @@ use lance_core::{Error, Result}; // Note: the number of threads here also impacts the number of files // we need to read in some situations. So keeping this at 8 keeps the // RAM on our scanner down. -pub const DEFAULT_LOCAL_IO_PARALLELISM: u32 = 8; +pub const DEFAULT_LOCAL_IO_PARALLELISM: usize = 8; // Cloud disks often need many many threads to saturate the network -pub const DEFAULT_CLOUD_IO_PARALLELISM: u32 = 64; +pub const DEFAULT_CLOUD_IO_PARALLELISM: usize = 64; #[async_trait] pub trait ObjectStoreExt { @@ -94,7 +94,7 @@ pub struct ObjectStore { scheme: String, block_size: usize, pub use_constant_size_upload_parts: bool, - io_parallelism: u32, + io_parallelism: usize, } impl DeepSizeOf for ObjectStore { @@ -467,7 +467,7 @@ impl ObjectStore { scheme: String::from("memory"), block_size: 64 * 1024, use_constant_size_upload_parts: false, - io_parallelism: get_num_compute_intensive_cpus() as u32, + io_parallelism: get_num_compute_intensive_cpus(), } } @@ -484,13 +484,13 @@ impl ObjectStore { self.block_size = new_size; } - pub fn set_io_parallelism(&mut self, io_parallelism: u32) { + pub fn set_io_parallelism(&mut self, io_parallelism: usize) { self.io_parallelism = io_parallelism; } - pub fn io_parallelism(&self) -> u32 { + pub fn io_parallelism(&self) -> usize { std::env::var("LANCE_IO_THREADS") - .map(|val| val.parse::().unwrap()) + .map(|val| val.parse::().unwrap()) .unwrap_or(self.io_parallelism) } @@ -847,7 +847,7 @@ async fn configure_store( scheme: String::from("memory"), block_size: 64 * 1024, use_constant_size_upload_parts: false, - io_parallelism: get_num_compute_intensive_cpus() as u32, + io_parallelism: get_num_compute_intensive_cpus(), }), unknown_scheme => { if let Some(provider) = registry.providers.get(unknown_scheme) { @@ -870,7 +870,7 @@ impl ObjectStore { block_size: Option, wrapper: Option>, use_constant_size_upload_parts: bool, - io_parallelism: u32, + io_parallelism: usize, ) -> Self { let scheme = location.scheme(); let block_size = block_size.unwrap_or_else(|| infer_block_size(scheme)); diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index c60ef8bf82..07e5e76751 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -419,7 +419,10 @@ impl ScanScheduler { /// * config - configuration settings for the scheduler pub fn new(object_store: Arc, config: SchedulerConfig) -> Arc { let io_capacity = object_store.io_parallelism(); - let io_queue = Arc::new(IoQueue::new(io_capacity, config.io_buffer_size_bytes)); + let io_queue = Arc::new(IoQueue::new( + io_capacity as u32, + config.io_buffer_size_bytes, + )); let scheduler = Self { object_store, io_queue: io_queue.clone(), diff --git a/rust/lance-io/src/traits.rs b/rust/lance-io/src/traits.rs index 572c18664f..0863891935 100644 --- a/rust/lance-io/src/traits.rs +++ b/rust/lance-io/src/traits.rs @@ -87,7 +87,7 @@ pub trait Reader: std::fmt::Debug + Send + Sync + DeepSizeOf { fn block_size(&self) -> usize; /// Suggest optimal I/O parallelism per storage device. - fn io_parallelism(&self) -> u32; + fn io_parallelism(&self) -> usize; /// Object/File Size. async fn size(&self) -> object_store::Result; diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 6928a49910..d6f749f184 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1097,7 +1097,7 @@ impl Dataset { pub async fn count_deleted_rows(&self) -> Result { futures::stream::iter(self.get_fragments()) .map(|f| async move { f.count_deletions().await }) - .buffer_unordered(self.object_store.io_parallelism() as usize) + .buffer_unordered(self.object_store.io_parallelism()) .try_fold(0, |acc, x| futures::future::ready(Ok(acc + x))) .await } @@ -1213,7 +1213,7 @@ impl Dataset { pub async fn num_small_files(&self, max_rows_per_group: usize) -> usize { futures::stream::iter(self.get_fragments()) .map(|f| async move { f.physical_rows().await }) - .buffered(self.object_store.io_parallelism() as usize) + .buffered(self.object_store.io_parallelism()) .try_filter(|row_count| futures::future::ready(*row_count < max_rows_per_group)) .count() .await @@ -1246,7 +1246,7 @@ impl Dataset { // All fragments have equal lengths futures::stream::iter(self.get_fragments()) .map(|f| async move { f.validate().await }) - .buffer_unordered(self.object_store.io_parallelism() as usize) + .buffer_unordered(self.object_store.io_parallelism()) .try_collect::>() .await?; @@ -3347,8 +3347,8 @@ mod tests { .unwrap(); dataset.validate().await.unwrap(); - assert!(dataset.num_small_files(1024).await.unwrap() > 0); - assert!(dataset.num_small_files(512).await.unwrap() == 0); + assert!(dataset.num_small_files(1024).await > 0); + assert!(dataset.num_small_files(512).await == 0); } #[tokio::test] diff --git a/rust/lance/src/dataset/cleanup.rs b/rust/lance/src/dataset/cleanup.rs index 8fb0032aef..243541b5dc 100644 --- a/rust/lance/src/dataset/cleanup.rs +++ b/rust/lance/src/dataset/cleanup.rs @@ -152,10 +152,9 @@ impl<'a> CleanupTask<'a> { .commit_handler .list_manifests(&self.dataset.base, &self.dataset.object_store.inner) .await? - .try_for_each_concurrent( - self.dataset.object_store.io_parallelism() as usize, - |path| self.process_manifest_file(path, &inspection, tagged_versions), - ) + .try_for_each_concurrent(self.dataset.object_store.io_parallelism(), |path| { + self.process_manifest_file(path, &inspection, tagged_versions) + }) .await?; Ok(inspection.into_inner().unwrap()) } @@ -276,7 +275,7 @@ impl<'a> CleanupTask<'a> { .collect::>() .await; let manifest_bytes_removed = stream::iter(manifest_bytes_removed) - .buffer_unordered(self.dataset.object_store.io_parallelism() as usize) + .buffer_unordered(self.dataset.object_store.io_parallelism()) .try_fold(0, |acc, size| async move { Ok(acc + (size as u64)) }) .await; diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index b4dca9c352..fa810df26f 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -469,7 +469,7 @@ pub async fn plan_compaction( Err(e) => Err(e), } }) - .buffered(dataset.object_store().io_parallelism() as usize); + .buffered(dataset.object_store().io_parallelism()); let index_fragmaps = load_index_fragmaps(dataset).await?; let indices_containing_frag = |frag_id: u32| { diff --git a/rust/lance/src/dataset/rowids.rs b/rust/lance/src/dataset/rowids.rs index 338c758dd4..808bdae377 100644 --- a/rust/lance/src/dataset/rowids.rs +++ b/rust/lance/src/dataset/rowids.rs @@ -64,7 +64,7 @@ pub fn load_row_id_sequences<'a>( .map(|fragment| { load_row_id_sequence(dataset, fragment).map_ok(move |seq| (fragment.id as u32, seq)) }) - .buffer_unordered(dataset.object_store.io_parallelism() as usize) + .buffer_unordered(dataset.object_store.io_parallelism()) } pub async fn get_row_id_index( diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index d5510e70fd..265f3d741a 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -130,7 +130,7 @@ pub async fn take( }) .collect::>(); let take_stream = futures::stream::iter(take_tasks) - .buffered(dataset.object_store.io_parallelism() as usize) + .buffered(dataset.object_store.io_parallelism()) .map_err(|err| DataFusionError::External(err.into())) .boxed(); let take_stream = Box::pin(RecordBatchStreamAdapter::new( @@ -253,7 +253,7 @@ pub async fn take_rows( batches.push(batch_fut); } let batches: Vec = futures::stream::iter(batches) - .buffered(dataset.object_store.io_parallelism() as usize) + .buffered(dataset.object_store.io_parallelism()) .try_collect() .await?; Ok(concat_batches(&batches[0].schema(), &batches)?) @@ -293,7 +293,7 @@ pub async fn take_rows( .map(|(fragment, indices)| { do_take(fragment, indices, projection.physical_schema.clone(), true) }) - .buffered(dataset.object_store.io_parallelism() as usize) + .buffered(dataset.object_store.io_parallelism()) .try_collect::>() .await?; diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 7db790fc24..352db503ad 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -957,7 +957,7 @@ impl MergeInsertJob { } } }) - .buffer_unordered(dataset.object_store.io_parallelism() as usize); + .buffer_unordered(dataset.object_store.io_parallelism()); while let Some(res) = stream.next().await.transpose()? { match res { diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index 9db98c3c3d..6b970dea50 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -302,7 +302,7 @@ impl UpdateJob { } } }) - .buffer_unordered(self.dataset.object_store.io_parallelism() as usize); + .buffer_unordered(self.dataset.object_store.io_parallelism()); while let Some(res) = stream.next().await.transpose()? { match res { diff --git a/rust/lance/src/index/prefilter.rs b/rust/lance/src/index/prefilter.rs index 112732abb9..cf8eb67ee4 100644 --- a/rust/lance/src/index/prefilter.rs +++ b/rust/lance/src/index/prefilter.rs @@ -105,7 +105,7 @@ impl DatasetPreFilter { .collect::>() .await; let mut frag_id_deletion_vectors = stream::iter(frag_id_deletion_vectors) - .buffer_unordered(dataset.object_store.io_parallelism() as usize); + .buffer_unordered(dataset.object_store.io_parallelism()); let mut deleted_ids = RowIdTreeMap::new(); while let Some((id, deletion_vector)) = frag_id_deletion_vectors.try_next().await? { diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 3e1b6541ac..9c52998a16 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -1356,7 +1356,7 @@ pub(crate) async fn remap_index_file( let mut task_stream = stream::iter(tasks.into_iter()) .map(|task| task.load_and_remap(reader.clone(), index, mapping)) - .buffered(object_store.io_parallelism() as usize); + .buffered(object_store.io_parallelism()); let mut ivf = IvfModel { centroids: index.ivf.centroids.clone(), diff --git a/rust/lance/src/index/vector/ivf/io.rs b/rust/lance/src/index/vector/ivf/io.rs index 5910c2adef..5c254727cc 100644 --- a/rust/lance/src/index/vector/ivf/io.rs +++ b/rust/lance/src/index/vector/ivf/io.rs @@ -405,7 +405,7 @@ pub(super) async fn write_hnsw_quantization_index_partitions( part_reader.schema(), ) }) - .buffered(object_store.io_parallelism() as usize) + .buffered(object_store.io_parallelism()) .try_collect::>() .await?; writer.write(&batches).await?; @@ -429,7 +429,7 @@ pub(super) async fn write_hnsw_quantization_index_partitions( aux_part_reader.schema(), ) }) - .buffered(object_store.io_parallelism() as usize) + .buffered(object_store.io_parallelism()) .try_collect::>() .await?; std::mem::drop(aux_part_reader); diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index bf89363670..f60709b404 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -361,7 +361,7 @@ pub(crate) async fn migrate_fragments( ..fragment.clone() }) }) - .buffered(dataset.object_store.io_parallelism() as usize) + .buffered(dataset.object_store.io_parallelism()) .boxed(); new_fragments.try_collect().await diff --git a/rust/lance/src/io/exec/scan.rs b/rust/lance/src/io/exec/scan.rs index 709e182869..b5ff73afee 100644 --- a/rust/lance/src/io/exec/scan.rs +++ b/rust/lance/src/io/exec/scan.rs @@ -164,9 +164,9 @@ impl LanceStream { // answer though and so we err on the side of speed over memory. Users can tone down fragment_parallelism // by hand if needed. if projection.fields.is_empty() { - io_parallelism as usize + io_parallelism } else { - bit_util::ceil(io_parallelism as usize, projection.fields.len()) + bit_util::ceil(io_parallelism, projection.fields.len()) } }) // fragment_readhead=0 doesn't make sense so we just bump it to 1 From 0516bd8e917a4c5e12540ffb51a1cf982e61ba65 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Sun, 25 Aug 2024 08:10:38 -0700 Subject: [PATCH 4/6] Missed a spot where ? is no longer needed --- rust/lance/src/index/prefilter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/index/prefilter.rs b/rust/lance/src/index/prefilter.rs index cf8eb67ee4..ec8947ef2f 100644 --- a/rust/lance/src/index/prefilter.rs +++ b/rust/lance/src/index/prefilter.rs @@ -139,7 +139,7 @@ impl DatasetPreFilter { let (row_ids, deletion_vector) = join!(row_ids, deletion_vector); Ok::<_, crate::Error>((row_ids?, deletion_vector?)) }) - .buffer_unordered(dataset.object_store().io_parallelism()? as usize) + .buffer_unordered(dataset.object_store().io_parallelism()) .try_collect::>() .await } From 8bb8ab2a54191a079c6c9a94d343ff00b58da18e Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 26 Aug 2024 05:51:31 -0700 Subject: [PATCH 5/6] Make num_threads in CompactionOptions optional --- rust/lance/src/dataset/optimize.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index fa810df26f..ec940366f1 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -138,8 +138,10 @@ pub struct CompactionOptions { /// lower) will materialize deletions for all fragments with deletions. /// Setting above 1.0 will never materialize deletions. pub materialize_deletions_threshold: f32, - /// The number of threads to use. Defaults to the number of cores. - pub num_threads: usize, + /// The number of threads to use (how many compaction tasks to run in parallel). + /// Defaults to the number of compute-intensive CPUs. Not used when running + /// tasks manually using [`plan_compaction`] + pub num_threads: Option, /// The batch size to use when scanning the input fragments. If not /// specified then the default (see /// [`crate::dataset::Scanner::batch_size`]) will be used. @@ -154,8 +156,7 @@ impl Default for CompactionOptions { max_rows_per_group: 1024, materialize_deletions: true, materialize_deletions_threshold: 0.1, - // TODO: Should this be based on # I/O threads? - num_threads: get_num_compute_intensive_cpus(), + num_threads: None, max_bytes_per_file: None, batch_size: None, } @@ -222,7 +223,11 @@ pub async fn compact_files( let result_stream = futures::stream::iter(compaction_plan.tasks.into_iter()) .map(|task| rewrite_files(Cow::Borrowed(dataset_ref), task, &options)) - .buffer_unordered(options.num_threads); + .buffer_unordered( + options + .num_threads + .unwrap_or_else(|| get_num_compute_intensive_cpus()), + ); let completed_tasks: Vec = result_stream.try_collect().await?; let remap_options = remap_options.unwrap_or(Arc::new(DatasetIndexRemapperOptions::default())); From 57d076d3f7d952f04209b286121d3769a611e494 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 26 Aug 2024 06:39:05 -0700 Subject: [PATCH 6/6] Address clippy suggestion --- rust/lance/src/dataset/optimize.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index ec940366f1..d06de7c419 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -226,7 +226,7 @@ pub async fn compact_files( .buffer_unordered( options .num_threads - .unwrap_or_else(|| get_num_compute_intensive_cpus()), + .unwrap_or_else(get_num_compute_intensive_cpus), ); let completed_tasks: Vec = result_stream.try_collect().await?;