Skip to content

Commit

Permalink
[ENH] Use foyer for block cache (#2431)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
- This PR fixes a bug where readers and writers shared the same cache. The fix
is as follows: 1. `fork` calls `get` instead of reading data from the read
cache; 2. `commit` returns the newly created blocks; 3. `flush` takes the
newly created blocks as input.
- Temporarily disable the shuttle test, as there is a compatibility issue
between tokio filesystem calls and the shuttle runtime: tokio filesystem
calls will panic if not wrapped in a tokio runtime.
 - New functionality
- This PR introduces a cache with eviction policies. Foyer
(https://github.com/MrCroxx/foyer) is used as the cache. In the future,
Foyer will be used as a hybrid cache.
- Introduced cache-related configs to be used by different components.

## Test plan
*How are these changes tested?*

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
*Are all docstrings for user-facing APIs updated if required? Do we need
to make documentation changes in the [docs
repository](https://github.com/chroma-core/docs)?*

---------

Co-authored-by: Max Isom <codetheweb@users.noreply.github.com>
Co-authored-by: Hammad Bashir <HammadB@users.noreply.github.com>
Co-authored-by: Sanket Kedia <kediasanket11121993@gmail.com>
  • Loading branch information
4 people committed Jul 16, 2024
1 parent 2722be0 commit 4644217
Show file tree
Hide file tree
Showing 28 changed files with 1,433 additions and 300 deletions.
494 changes: 486 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions go/pkg/log/repository/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (r *LogRepository) InsertRecords(ctx context.Context, collectionId string,
}()
collection, err = queriesWithTx.GetCollectionForUpdate(ctx, collectionId)
if err != nil {
trace_log.Error("Error in fetching collection from collection table", zap.Error(err))
// If no row found, insert one.
if errors.Is(err, pgx.ErrNoRows) {
trace_log.Info("No rows found in the collection table for collection", zap.String("collectionId", collectionId))
Expand Down
1 change: 1 addition & 0 deletions rust/worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ opentelemetry-otlp = "0.12.0"
shuttle = "0.7.1"
regex = "1.10.5"
flatbuffers = "24.3.25"
foyer = "0.8"

[dev-dependencies]
proptest = "1.4.0"
Expand Down
16 changes: 16 additions & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ query_service:
blockfile_provider:
Arrow:
max_block_size_bytes: 16384
block_manager_config:
block_cache_config:
lru:
capacity: 1000
sparse_index_manager_config:
sparse_index_cache_config:
lru:
capacity: 1000

compaction_service:
service_name: "compaction-service"
Expand Down Expand Up @@ -85,3 +93,11 @@ compaction_service:
blockfile_provider:
Arrow:
max_block_size_bytes: 16384
block_manager_config:
block_cache_config:
lru:
capacity: 1000
sparse_index_manager_config:
sparse_index_cache_config:
lru:
capacity: 1000
83 changes: 64 additions & 19 deletions rust/worker/src/blockstore/arrow/block/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ impl BlockDelta {
#[cfg(test)]
mod test {
use super::*;
use crate::cache::cache::Cache;
use crate::cache::config::CacheConfig;
use crate::cache::config::UnboundedCacheConfig;
use crate::{
blockstore::arrow::{
block::Block, config::TEST_MAX_BLOCK_SIZE_BYTES, provider::BlockManager,
Expand Down Expand Up @@ -225,7 +228,8 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<&str, &Int32Array>();

let n = 2000;
Expand All @@ -243,20 +247,32 @@ mod test {
let size = delta.get_size::<&str, &Int32Array>();
// TODO: should commit take ownership of delta?
// Semantically, that makes sense, since a delta is unusable after commit
block_manager.commit::<&str, &Int32Array>(&delta);
let block = block_manager.get(&delta.id).await.unwrap();
// Ensure the deltas estimated size matches the actual size of the block
assert_eq!(size, block.get_size());

let block = block_manager.commit::<&str, &Int32Array>(&delta);
let mut values_before_flush = vec![];
for i in 0..n {
let key = format!("key{}", i);
let read = block.get::<&str, Int32Array>("prefix", &key).unwrap();
values_before_flush.push(read);
}
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&block.clone().id).await.unwrap();
for i in 0..n {
let key = format!("key{}", i);
let read = block.get::<&str, Int32Array>("prefix", &key).unwrap();
assert_eq!(read, values_before_flush[i]);
}
test_save_load_size(path, &block);
assert_eq!(size, block.get_size());
}

#[tokio::test]
async fn test_sizing_string_val() {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<&str, &str>();
let delta_id = delta.id.clone();

Expand All @@ -268,13 +284,22 @@ mod test {
delta.add(prefix, key.as_str(), value.as_str());
}
let size = delta.get_size::<&str, &str>();
block_manager.commit::<&str, &str>(&delta);
let block = block_manager.commit::<&str, &str>(&delta);
let mut values_before_flush = vec![];
for i in 0..n {
let key = format!("key{}", i);
let read = block.get::<&str, &str>("prefix", &key);
values_before_flush.push(read.unwrap().to_string());
}
block_manager.flush(&block).await.unwrap();

let block = block_manager.get(&delta_id).await.unwrap();
// TODO: enable this assertion after the sizing is fixed
assert_eq!(size, block.get_size());
for i in 0..n {
let key = format!("key{}", i);
let read = block.get::<&str, &str>("prefix", &key);
assert_eq!(read, Some(format!("value{}", i).as_str()));
assert_eq!(read.unwrap().to_string(), values_before_flush[i]);
}

// test save/load
Expand All @@ -286,9 +311,10 @@ mod test {
}

// test fork
let forked_block = block_manager.fork::<&str, &str>(&delta_id);
let forked_block = block_manager.fork::<&str, &str>(&delta_id).await;
let new_id = forked_block.id.clone();
block_manager.commit::<&str, &str>(&forked_block);
let block = block_manager.commit::<&str, &str>(&forked_block);
block_manager.flush(&block).await.unwrap();
let forked_block = block_manager.get(&new_id).await.unwrap();
for i in 0..n {
let key = format!("key{}", i);
Expand All @@ -302,7 +328,8 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<f32, &str>();

let n = 2000;
Expand All @@ -314,10 +341,21 @@ mod test {
}

let size = delta.get_size::<f32, &str>();
block_manager.commit::<f32, &str>(&delta);
let block = block_manager.commit::<f32, &str>(&delta);
let mut values_before_flush = vec![];
for i in 0..n {
let key = i as f32;
let read = block.get::<f32, &str>("prefix", key).unwrap();
values_before_flush.push(read);
}
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta.id).await.unwrap();
assert_eq!(size, block.get_size());

for i in 0..n {
let key = i as f32;
let read = block.get::<f32, &str>("prefix", key).unwrap();
assert_eq!(read, values_before_flush[i]);
}
// test save/load
test_save_load_size(path, &block);
}
Expand All @@ -327,7 +365,8 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<&str, &RoaringBitmap>();

let n = 2000;
Expand All @@ -339,8 +378,10 @@ mod test {
}

let size = delta.get_size::<&str, &RoaringBitmap>();
block_manager.commit::<&str, &RoaringBitmap>(&delta);
let block = block_manager.commit::<&str, &RoaringBitmap>(&delta);
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta.id).await.unwrap();
// TODO: enable this assertion after the sizing is fixed
assert_eq!(size, block.get_size());

for i in 0..n {
Expand All @@ -359,7 +400,8 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let ids = vec!["embedding_id_2", "embedding_id_0", "embedding_id_1"];
let embeddings = vec![
vec![1.0, 2.0, 3.0],
Expand Down Expand Up @@ -400,7 +442,8 @@ mod test {
}

let size = delta.get_size::<&str, &DataRecord>();
block_manager.commit::<&str, &DataRecord>(&delta);
let block = block_manager.commit::<&str, &DataRecord>(&delta);
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta.id).await.unwrap();
for i in 0..3 {
let read = block.get::<&str, DataRecord>("", ids[i]).unwrap();
Expand All @@ -420,7 +463,8 @@ mod test {
let tmp_dir = tempfile::tempdir().unwrap();
let path = tmp_dir.path().to_str().unwrap();
let storage = Storage::Local(LocalStorage::new(path));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES);
let cache = Cache::new(&CacheConfig::Unbounded(UnboundedCacheConfig {}));
let block_manager = BlockManager::new(storage, TEST_MAX_BLOCK_SIZE_BYTES, cache);
let delta = block_manager.create::<u32, &str>();

let n = 2000;
Expand All @@ -432,7 +476,8 @@ mod test {
}

let size = delta.get_size::<u32, &str>();
block_manager.commit::<u32, &str>(&delta);
let block = block_manager.commit::<u32, &str>(&delta);
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta.id).await.unwrap();
assert_eq!(size, block.get_size());

Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/blockstore/arrow/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ mod types;
mod u32_key;
mod u32_value;
// Re-export types at the arrow_blockfile module level
pub(in crate::blockstore::arrow) use types::*;
pub(in crate::blockstore) use types::*;
Loading

0 comments on commit 4644217

Please sign in to comment.