Skip to content

Commit

Permalink
[improve](cache) File cache async init apache#39036
Browse files Browse the repository at this point in the history
  • Loading branch information
suxiaogang223 authored and morningman committed Aug 9, 2024
1 parent 86945eb commit 12f0411
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 14 deletions.
3 changes: 3 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,9 @@ DEFINE_Bool(enable_file_cache, "false");
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
DEFINE_String(file_cache_path, "");
// thread will sleep 10ms per scan file num to limit IO
DEFINE_Int64(async_file_cache_init_file_num_interval, "1000");
DEFINE_Int64(async_file_cache_init_sleep_interval_ms, "20");
DEFINE_Int64(file_cache_max_file_segment_size, "4194304"); // 4MB
// 4KB <= file_cache_max_file_segment_size <= 256MB
DEFINE_Validator(file_cache_max_file_segment_size, [](const int64_t config) -> bool {
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,8 @@ DECLARE_Bool(enable_file_cache);
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
// format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240,"normal_percent":85, "disposable_percent":10, "index_percent":5}]
DECLARE_String(file_cache_path);
DECLARE_Int64(async_file_cache_init_file_num_interval);
DECLARE_Int64(async_file_cache_init_sleep_interval_ms);
DECLARE_Int64(file_cache_min_file_segment_size);
DECLARE_Int64(file_cache_max_file_segment_size);
DECLARE_Bool(clear_file_cache);
Expand Down
56 changes: 43 additions & 13 deletions be/src/io/cache/block/block_lru_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,33 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path,
Status LRUFileCache::initialize() {
MonotonicStopWatch watch;
watch.start();
std::lock_guard cache_lock(_mutex);
if (!_is_initialized) {
if (fs::exists(_cache_base_path)) {
RETURN_IF_ERROR(load_cache_info_into_memory(cache_lock));
// the cache already exists, try to load cache info asyncly
_lazy_open_done = false;
_cache_background_load_thread = std::thread([this]() {
MonotonicStopWatch watch;
watch.start();
std::lock_guard<std::mutex> cache_lock(_mutex);
Status s = load_cache_info_into_memory(cache_lock);
if (s.ok()) {
_lazy_open_done = true;
} else {
LOG(WARNING) << fmt::format("Failed to load cache info from {}: {}",
_cache_base_path, s.to_string());
}
int64_t cost = watch.elapsed_time() / 1000 / 1000;
LOG(INFO) << fmt::format(
"FileCache lazy load done path={}, disposable queue size={} elements={}, "
"index queue size={} elements={}, query queue size={} elements={}, init "
"cost(ms)={}",
_cache_base_path, _disposable_queue.get_total_cache_size(cache_lock),
_disposable_queue.get_elements_num(cache_lock),
_index_queue.get_total_cache_size(cache_lock),
_index_queue.get_elements_num(cache_lock),
_normal_queue.get_total_cache_size(cache_lock),
_normal_queue.get_elements_num(cache_lock), cost);
});
} else {
std::error_code ec;
fs::create_directories(_cache_base_path, ec);
Expand All @@ -136,17 +159,8 @@ Status LRUFileCache::initialize() {
_is_initialized = true;
_cache_background_thread = std::thread(&LRUFileCache::run_background_operation, this);
int64_t cost = watch.elapsed_time() / 1000 / 1000;
LOG(INFO) << fmt::format(
"After initialize file cache path={}, disposable queue size={} elements={}, index "
"queue size={} "
"elements={}, query queue "
"size={} elements={}, init cost(ms)={}",
_cache_base_path, _disposable_queue.get_total_cache_size(cache_lock),
_disposable_queue.get_elements_num(cache_lock),
_index_queue.get_total_cache_size(cache_lock),
_index_queue.get_elements_num(cache_lock),
_normal_queue.get_total_cache_size(cache_lock),
_normal_queue.get_elements_num(cache_lock), cost);
LOG(INFO) << fmt::format("After initialize file cache path={}, init cost(ms)={}",
_cache_base_path, cost);
return Status::OK();
}

Expand Down Expand Up @@ -376,6 +390,16 @@ void LRUFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, co

FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t size,
const CacheContext& context) {
if (!_lazy_open_done) {
// Cache is not ready yet
LOG(WARNING) << fmt::format(
"Cache is not ready yet, skip cache for key: {}, offset: {}, size: {}.",
key.to_string(), offset, size);
FileBlocks file_blocks = {std::make_shared<FileBlock>(
offset, size, key, this, FileBlock::State::SKIP_CACHE, context.cache_type)};
return FileBlocksHolder(std::move(file_blocks));
}

FileBlock::Range range(offset, offset + size - 1);

std::lock_guard cache_lock(_mutex);
Expand Down Expand Up @@ -827,6 +851,7 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& ca
std::vector<std::pair<Key, size_t>> queue_entries;
std::vector<std::string> need_to_check_if_empty_dir;
Status st = Status::OK();
size_t scan_file_num = 0;
auto scan_file_cache = [&](fs::directory_iterator& key_it) {
for (; key_it != fs::directory_iterator(); ++key_it) {
key = Key(
Expand Down Expand Up @@ -888,6 +913,11 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& ca
}
need_to_check_if_empty_dir.push_back(key_it->path());
}
scan_file_num += 1;
if (scan_file_num % config::async_file_cache_init_file_num_interval == 0) {
std::this_thread::sleep_for(
std::chrono::milliseconds(config::async_file_cache_init_sleep_interval_ms));
}
}
}
};
Expand Down
5 changes: 5 additions & 0 deletions be/src/io/cache/block/block_lru_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ class LRUFileCache final : public IFileCache {
LRUFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings);
~LRUFileCache() override {
_close = true;
if (_cache_background_load_thread.joinable()) {
_cache_background_thread.join();
}
if (_cache_background_thread.joinable()) {
_cache_background_thread.join();
}
Expand Down Expand Up @@ -201,6 +204,8 @@ class LRUFileCache final : public IFileCache {
private:
std::atomic_bool _close {false};
std::thread _cache_background_thread;
std::atomic_bool _lazy_open_done {true};
std::thread _cache_background_load_thread;
size_t _num_read_segments = 0;
size_t _num_hit_segments = 0;
size_t _num_removed_segments = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2742,7 +2742,7 @@ public static boolean isNotCloudMode() {
// start of lock config
@ConfField(description = {"是否开启死锁检测",
"Whether to enable deadlock detection"})
public static boolean enable_deadlock_detection = false;
public static boolean enable_deadlock_detection = true;

@ConfField(description = {"死锁检测间隔时间,单位分钟",
"Deadlock detection interval time, unit minute"})
Expand Down

0 comments on commit 12f0411

Please sign in to comment.