feat: make the # of CPU threads configurable and document cpu/memory patterns (#2773)

This gets rid of all usage of `num_cpus` (except in one spot to
determine the default # of CPU threads) and instead uses
`ObjectStore::io_parallelism` or `get_num_compute_intensive_cpus`.
Admittedly, many of our operations do not split out I/O and compute. For
example:

```rust
async move {
  // I/O-bound: read the batch from storage
  let batch = read_batch(...).await?;
  // compute-bound: transform the batch on the same task
  let transformed = transform_batch(batch);
}
```

However, we can just pick one and clean these up as we go. For now, I
made a best-effort guess: most places where we were applying a
multiplier to `num_cpus` now use the I/O count; other places use the
compute count.
westonpace committed Aug 26, 2024
1 parent d5e5a3b commit 1e6ee60
Showing 50 changed files with 260 additions and 130 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
@@ -121,7 +121,6 @@ mockall = { version = "0.12.1" }
mock_instant = { version = "0.3.1", features = ["sync"] }
moka = { version = "0.11", features = ["future"] }
num-traits = "0.2"
num_cpus = "1.0"
# Set min to prevent use of versions with CVE-2024-41178
object_store = { version = "0.10.2" }
parquet = "52.0"
2 changes: 2 additions & 0 deletions docs/format.rst
@@ -338,6 +338,8 @@ systems and cloud object stores, with the notable exception of AWS S3. For ones
that lack this functionality, an external locking mechanism can be configured
by the user.

+.. _conflict_resolution:
+
Conflict resolution
~~~~~~~~~~~~~~~~~~~

1 change: 1 addition & 0 deletions docs/index.rst
@@ -46,6 +46,7 @@ Preview releases receive the same level of testing as regular releases.
Lance Formats <./format>
Arrays <./arrays>
Integrations <./integrations/integrations>
+Performance Guide <./performance>
API References <./api/api>
Contributor Guide <./contributing>
Examples <./examples/examples>
83 changes: 83 additions & 0 deletions docs/performance.rst
@@ -0,0 +1,83 @@
Lance Performance Guide
=======================

This guide provides tips and tricks for optimizing the performance of your Lance applications.

Threading Model
---------------

Lance is designed to be thread-safe and performant. Lance APIs can be called concurrently unless
explicitly stated otherwise. Users may create multiple tables and share tables between threads.
Operations may run in parallel on the same table, but some operations may lead to conflicts. For
details see :ref:`conflict_resolution`.

Most Lance operations will use multiple threads to perform work in parallel. There are two thread
pools in Lance: the IO thread pool and the compute thread pool. The number of threads in each pool
can be configured by the user.

The IO thread pool is used for reading and writing data from disk. The number of threads in the IO
thread pool is determined by the object store that the operation is working with. Local object stores
will use 8 threads by default. Cloud object stores will use 64 threads by default. This is a fairly
conservative default and you may need 128 or 256 threads to saturate network bandwidth on some cloud
providers. The ``LANCE_IO_THREADS`` environment variable can be used to override the number of IO
threads. If you increase this variable you may also want to increase the ``io_buffer_size``.

The compute thread pool is used for performing computations on data, such as decoding. The number
of threads in the compute thread pool is determined by the number of cores on the machine and can
be overridden by setting the ``LANCE_CPU_THREADS`` environment variable. This is commonly done when
running multiple Lance processes on the same machine (e.g. when working with tools like Ray). Keep
in mind that decoding data is a compute intensive operation; even if a workload seems I/O bound
(like scanning a table), it may still need quite a few compute threads to achieve peak performance.
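
As a sketch, both pools can be configured from Python via these environment variables. One assumption here: set them at process start, before the first Lance operation, since the pools are sized when they are first created.

```python
import os

# Assumption: these must be set before Lance performs any work.
os.environ["LANCE_IO_THREADS"] = "128"  # help saturate cloud storage bandwidth
os.environ["LANCE_CPU_THREADS"] = "4"   # e.g. leave cores free for other Ray workers

import lance  # import after the environment is configured

ds = lance.dataset("s3://bucket/table.lance")  # hypothetical dataset URI
print(ds.count_rows())
```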

Memory Requirements
-------------------

Lance is designed to be memory efficient. Operations should stream data from disk and not require
loading the entire dataset into memory. However, there are a few components of Lance that can use
a lot of memory.

Index Cache
~~~~~~~~~~~

Lance uses an index cache to speed up queries. This caches vector and scalar indices in memory. The
max size of this cache can be configured when creating a ``LanceDataset`` using the ``index_cache_size``
parameter. This cache is an LRU cache that is sized by "number of entries". The size of each entry
and the number of entries needed depends on the index in question. For example, an IVF/PQ vector index
contains 1 header entry and 1 entry for each partition. The size of each entry is determined by the
number of vectors and the PQ parameters (e.g. number of subvectors). You can view the size of this cache
by inspecting the result of ``dataset.session().size_bytes()``.

The index cache is not shared between tables. For best performance you should create a single table and
share it across your application.
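
As a minimal sketch (the dataset path is hypothetical, and ``index_cache_size`` counts entries, not bytes):

```python
import lance

# Create one dataset with a larger index cache and share it across the app.
ds = lance.dataset("./vectors.lance", index_cache_size=512)

# ... run queries against ds from any number of threads ...

# Inspect how much memory the session (including the index cache) is using.
print(ds.session().size_bytes())
```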

Scanning Data
~~~~~~~~~~~~~

Searches (e.g. vector search, full text search) do not use a lot of memory to hold data because they don't
typically return a lot of data. However, scanning data can use a lot of memory. Scanning is a streaming
operation but we need enough memory to hold the data that we are scanning. The amount of memory needed is
largely determined by the ``io_buffer_size`` and the ``batch_size`` variables.

Each I/O thread should have enough memory to buffer an entire page of data. Pages today are typically between
8 and 32 MB. This means, as a rule of thumb, you should generally have about 32MB of memory per I/O thread.
The default ``io_buffer_size`` is 2GB, which is enough to buffer 64 pages of data. If you increase the number
of I/O threads you should also increase the ``io_buffer_size``.

Scans will also decode data (and run any filtering or compute) in parallel on CPU threads. The amount of data
decoded at any one time is determined by the ``batch_size`` and the size of your rows. Each CPU thread will
need enough memory to hold one batch. Once batches are delivered to your application, they are no longer tracked
by Lance and so if memory is a concern then you should also be careful not to accumulate memory in your own
application (e.g. by running ``to_table`` or otherwise collecting all batches in memory).

The default ``batch_size`` is 8192 rows. When you are working with mostly scalar data you want to keep batches
around 1MB and so the amount of memory needed by the compute threads is fairly small. However, when working with
large data you may need to turn down the ``batch_size`` to keep memory usage under control. For example, when
working with 1024-dimensional vector embeddings (e.g. of 32-bit floats) then 8192 rows would be 32MB of data. If you
spread that across 16 CPU threads then you would need 512MB of compute memory per scan. You might find working
with 1024 rows per batch is more appropriate.
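
The arithmetic behind that example, plus a scan with a reduced batch size, might look like the following sketch (the dataset path is hypothetical):

```python
import lance

rows, dims, bytes_per_float = 8192, 1024, 4
batch_bytes = rows * dims * bytes_per_float           # 32 MB per 8192-row batch
print(batch_bytes / 2**20, 16 * batch_bytes / 2**20)  # 32.0 MB, 512.0 MB on 16 threads

ds = lance.dataset("./embeddings.lance")
total_rows = 0
# 1024-row batches keep each decoded batch near 4 MB instead of 32 MB.
for batch in ds.scanner(batch_size=1024).to_batches():
    total_rows += batch.num_rows
```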

In summary, scans could use up to ``(2 * io_buffer_size) + (batch_size * bytes_per_row * num_compute_threads)``
bytes of memory. Keep in mind that ``io_buffer_size`` is a soft limit (e.g. we cannot read less than one page
at a time right now) and so it is not necessarily a bug if you see memory usage exceed this limit by a small margin.
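
A rough estimator for this bound, assuming a fixed row size (and remembering that the soft limit above can be exceeded slightly):

```python
def worst_case_scan_memory_bytes(
    io_buffer_size: int, batch_size: int, bytes_per_row: int, num_compute_threads: int
) -> int:
    # Two I/O buffers' worth of pages plus one in-flight batch per compute thread.
    return 2 * io_buffer_size + batch_size * bytes_per_row * num_compute_threads

# Defaults: 2GB io_buffer_size, 8192-row batches of 1024-dim float32 vectors, 16 threads.
print(worst_case_scan_memory_bytes(2 * 2**30, 8192, 1024 * 4, 16) / 2**30)  # ~4.5 GiB
```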
1 change: 0 additions & 1 deletion python/Cargo.toml
@@ -48,7 +48,6 @@ uuid = "1.3.0"
serde_json = "1"
serde = "1.0.197"
serde_yaml = "0.9.34"
num_cpus = "1"
snafu = "0.7.4"
tracing-chrome = "0.7.1"
tracing-subscriber = "0.3.17"
47 changes: 29 additions & 18 deletions python/DEVELOPMENT.md
@@ -47,8 +47,8 @@ make lint
### Format and lint on commit

If you would like to run the formatters and linters when you commit your code
then you can use the pre-commit tool. The project includes a pre-commit config
file already. First, install the pre-commit tool:

```shell
pip install pre-commit
@@ -162,33 +162,33 @@ pytest --benchmark-compare=$COMPARE_ID python/benchmarks -m "not slow"
## Tracing

Rust has great integration with tools like criterion and pprof which make it easy
to profile and debug CPU intensive tasks. However, these tools are not as effective
at profiling I/O intensive work or providing a high level trace of an operation.

To fill this gap the lance code utilizes the Rust tracing crate to provide tracing
information for lance operations. User applications can receive these events and
forward them on for logging purposes. Developers can also use this information to
get a sense of the I/O that happens during an operation.

### Instrumenting code

When instrumenting code you can use the `#[instrument]` macro from the Rust tracing
crate. See the crate docs for more information on the various parameters that can
be set. As a general guideline we should aim to instrument the following methods:

- Top-level methods that will often be called by external libraries and could be slow
- Compute intensive methods that will perform a significant amount of CPU compute
- Any point where we are waiting on external resources (e.g. disk)

To begin with, instrument methods as close to the user as possible and refine downwards
as you need. For example, start by instrumenting the entire dataset write operation
and then instrument any individual parts of the operation that you would like to see
details for.

### Tracing a unit test

If you would like tracing information for a rust unit test then you will need to
decorate your test with the lance_test_macros::test attribute. This will wrap any
existing test attributes that you are using:

```rust
#[lance_test_macros::test(tokio::test)]
async fn test() {
    ...
}
```

Then run your test with the environment variable LANCE_TRACING set
to your desired verbosity level (trace, debug, info, warn, err):

```shell
LANCE_TESTING=debug cargo test dataset::tests::test_create_dataset
```

This will create a .json file (named with a timestamp) in your working directory. This
.json file can be loaded by chrome or by <https://ui.perfetto.dev>

### Tracing a python script

If you would like to trace a python script (application, benchmark, test) then you can easily
do so using the lance.tracing module. Simply call:

```python
from lance.tracing import trace_to_chrome

trace_to_chrome(level="debug")
```

A single .json trace file will be generated after python has exited.

You can use the `trace_to_chrome` function within the benchmarks, but for
sensible results you'll want to force the benchmark to just run only once.
To do this, rewrite the benchmark using the pedantic API:


```python
benchmark.pedantic(run, iterations=1, rounds=1)
```

### Trace visualization limitations

The current tracing implementation is slightly flawed when it comes to async
operations that run in parallel. The rust tracing-chrome library emits
trace events into the chrome trace events JSON format. This format is not
sophisticated enough to represent asynchronous parallel work.

As a result, a single instrumented async method may appear as many different
spans in the UI.

## Running S3 Integration tests

The integration tests run against local minio and local dynamodb. To start the
@@ -307,3 +306,15 @@

```shell
maturin build --release \
--target x86_64-apple-darwin \
--out wheels
```

## Picking a thread pool

When an operation should run in parallel you typically need to specify how many threads
to use, for example as input to `StreamExt::buffered`. There are two numbers you can
use: `ObjectStore::io_parallelism` or `get_num_compute_intensive_cpus`.

Often, operations will do a little of both compute and I/O, and you will need to make
a judgement call. If you are unsure, and you are doing any I/O, then picking the
`io_parallelism` is a good fallback behavior. The worst case is just that we over-parallelize
and there is more CPU contention than there needs to be. If this becomes a problem we
can always split the operation into two parts and use the two different thread pools.
4 changes: 1 addition & 3 deletions python/src/dataset/optimize.rs
@@ -46,9 +46,7 @@ fn parse_compaction_options(options: &PyDict) -> PyResult<CompactionOptions> {
opts.materialize_deletions_threshold = value.extract()?;
}
"num_threads" => {
-opts.num_threads = value
-    .extract::<Option<usize>>()?
-    .unwrap_or_else(num_cpus::get);
+opts.num_threads = value.extract()?;
}
"batch_size" => {
opts.batch_size = value.extract()?;
2 changes: 1 addition & 1 deletion rust/lance-core/Cargo.toml
@@ -27,7 +27,7 @@ futures.workspace = true
lazy_static.workspace = true
mock_instant.workspace = true
moka.workspace = true
-num_cpus.workspace = true
+num_cpus = "1.0"
object_store = { workspace = true }
pin-project.workspace = true
prost.workspace = true
4 changes: 4 additions & 0 deletions rust/lance-core/src/utils/tokio.rs
@@ -10,6 +10,10 @@ use tokio::runtime::{Builder, Runtime};
use tracing::Span;

pub fn get_num_compute_intensive_cpus() -> usize {
+if let Ok(user_specified) = std::env::var("LANCE_CPU_THREADS") {
+    return user_specified.parse().unwrap();
+}
+
let cpus = num_cpus::get();

if cpus <= *IO_CORE_RESERVATION {
1 change: 0 additions & 1 deletion rust/lance-encoding/Cargo.toml
@@ -25,7 +25,6 @@ bytes.workspace = true
futures.workspace = true
fsst.workspace = true
log.workspace = true
-num_cpus.workspace = true
num-traits.workspace = true
prost.workspace = true
hyperloglogplus.workspace = true
1 change: 0 additions & 1 deletion rust/lance-file/Cargo.toml
@@ -30,7 +30,6 @@ datafusion-common.workspace = true
deepsize.workspace = true
futures.workspace = true
log.workspace = true
-num_cpus.workspace = true
num-traits.workspace = true
object_store.workspace = true
prost.workspace = true
12 changes: 8 additions & 4 deletions rust/lance-file/src/reader.rs
@@ -232,6 +232,10 @@ impl FileReader {
Self::try_new_with_fragment_id(object_store, path, schema, 0, 0, max_field_id, None).await
}

+fn io_parallelism(&self) -> usize {
+    self.object_reader.io_parallelism()
+}
+
/// Requested projection of the data in this file, excluding the row id column.
pub fn schema(&self) -> &Schema {
&self.schema
@@ -287,7 +291,7 @@ impl FileReader {
.map(|(batch_id, range)| async move {
self.read_batch(batch_id, range, projection).await
})
-.buffered(num_cpus::get())
+.buffered(self.io_parallelism())
.try_collect::<Vec<_>>()
.await?;
if batches.len() == 1 {
@@ -322,7 +326,7 @@
.await
}
})
-.buffered(num_cpus::get() * 4)
+.buffered(self.io_parallelism())
.try_collect::<Vec<_>>()
.await?;

@@ -368,7 +372,7 @@
)
.await
})
-.buffered(num_cpus::get())
+.buffered(self.io_parallelism())
.try_collect::<Vec<_>>()
.await?;

@@ -432,7 +436,7 @@ pub async fn read_batch(
// We box this because otherwise we get a higher-order lifetime error.
let arrs = stream::iter(&schema.fields)
.map(|f| async { read_array(reader, f, batch_id, &reader.page_table, params).await })
-.buffered(num_cpus::get() * 4)
+.buffered(reader.io_parallelism())
.try_collect::<Vec<_>>()
.boxed();
let arrs = arrs.await?;
1 change: 0 additions & 1 deletion rust/lance-index/Cargo.toml
@@ -40,7 +40,6 @@ lance-table.workspace = true
lazy_static.workspace = true
log.workspace = true
moka.workspace = true
-num_cpus.workspace = true
num-traits.workspace = true
object_store.workspace = true
prost.workspace = true
3 changes: 3 additions & 0 deletions rust/lance-index/src/scalar.rs
@@ -133,6 +133,9 @@ pub trait IndexReader: Send + Sync {
pub trait IndexStore: std::fmt::Debug + Send + Sync + DeepSizeOf {
fn as_any(&self) -> &dyn Any;

+/// Suggested I/O parallelism for the store
+fn io_parallelism(&self) -> usize;
+
/// Create a new file and return a writer to store data in the file
async fn new_index_file(&self, name: &str, schema: Arc<Schema>)
-> Result<Box<dyn IndexWriter>>;
11 changes: 8 additions & 3 deletions rust/lance-index/src/scalar/btree.rs
@@ -29,7 +29,10 @@ use futures::{
stream::{self},
FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
};
-use lance_core::{utils::mask::RowIdTreeMap, Error, Result};
+use lance_core::{
+    utils::{mask::RowIdTreeMap, tokio::get_num_compute_intensive_cpus},
+    Error, Result,
+};
use lance_datafusion::{
chunker::chunk_concat_stream,
exec::{execute_plan, LanceExecutionOptions, OneShotExec},
@@ -762,7 +765,7 @@ impl BTreeIndex {
idx: 0,
}
.map(|fut| fut.map_err(DataFusionError::from))
-.buffered(num_cpus::get())
+.buffered(self.store.io_parallelism())
.boxed();
Ok(RecordBatchStreamAdapter::new(schema, batches))
}
@@ -879,7 +882,9 @@ impl ScalarIndex for BTreeIndex {
})
.collect::<Vec<_>>();
stream::iter(page_tasks)
-.buffered(num_cpus::get())
+// I/O and compute mixed here but important case is index in cache so
+// use compute intensive thread count
+.buffered(get_num_compute_intensive_cpus())
.try_collect::<RowIdTreeMap>()
.await
}
4 changes: 3 additions & 1 deletion rust/lance-index/src/scalar/inverted/index.rs
@@ -19,6 +19,7 @@ use futures::stream::repeat_with;
use futures::{stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use lance_core::utils::mask::RowIdTreeMap;
+use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_core::{Error, Result, ROW_ID};
use lazy_static::lazy_static;
use moka::future::Cache;
@@ -144,7 +145,8 @@ impl InvertedIndex {
mask.clone(),
))
})
-.buffered(num_cpus::get())
+// Use compute count since data hopefully cached
+.buffered(get_num_compute_intensive_cpus())
.try_collect::<Vec<_>>()
.await?;
