diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 863d69338bc1c2..5de674c81a8014 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -990,6 +990,9 @@ DEFINE_Bool(enable_file_cache, "false"); // format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}] // format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}] DEFINE_String(file_cache_path, ""); +// while loading existing file cache info, sleep async_file_cache_init_sleep_interval_ms every async_file_cache_init_file_num_interval scanned files to limit IO +DEFINE_Int64(async_file_cache_init_file_num_interval, "1000"); +DEFINE_Int64(async_file_cache_init_sleep_interval_ms, "20"); DEFINE_Int64(file_cache_max_file_segment_size, "4194304"); // 4MB // 4KB <= file_cache_max_file_segment_size <= 256MB DEFINE_Validator(file_cache_max_file_segment_size, [](const int64_t config) -> bool { diff --git a/be/src/common/config.h b/be/src/common/config.h index 9050701261c6a5..71d82c61ee65e1 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1040,6 +1040,8 @@ DECLARE_Bool(enable_file_cache); // format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}] // format: [{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240,"normal_percent":85, "disposable_percent":10, "index_percent":5}] DECLARE_String(file_cache_path); +DECLARE_Int64(async_file_cache_init_file_num_interval); +DECLARE_Int64(async_file_cache_init_sleep_interval_ms); DECLARE_Int64(file_cache_min_file_segment_size); DECLARE_Int64(file_cache_max_file_segment_size); DECLARE_Bool(clear_file_cache); diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp index 3406e827980578..fcb5421e19eaba 100644 --- a/be/src/io/cache/block/block_lru_file_cache.cpp +++ 
b/be/src/io/cache/block/block_lru_file_cache.cpp @@ -119,10 +119,33 @@ LRUFileCache::LRUFileCache(const std::string& cache_base_path, Status LRUFileCache::initialize() { MonotonicStopWatch watch; watch.start(); - std::lock_guard cache_lock(_mutex); if (!_is_initialized) { if (fs::exists(_cache_base_path)) { - RETURN_IF_ERROR(load_cache_info_into_memory(cache_lock)); + // the cache directory already exists: load its cache info asynchronously in a background thread + _lazy_open_done = false; + _cache_background_load_thread = std::thread([this]() { + MonotonicStopWatch watch; + watch.start(); + std::lock_guard cache_lock(_mutex); + Status s = load_cache_info_into_memory(cache_lock); + if (s.ok()) { + _lazy_open_done = true; + } else { + LOG(WARNING) << fmt::format("Failed to load cache info from {}: {}", + _cache_base_path, s.to_string()); + } + int64_t cost = watch.elapsed_time() / 1000 / 1000; + LOG(INFO) << fmt::format( + "FileCache lazy load done path={}, disposable queue size={} elements={}, " + "index queue size={} elements={}, query queue size={} elements={}, init " + "cost(ms)={}", + _cache_base_path, _disposable_queue.get_total_cache_size(cache_lock), + _disposable_queue.get_elements_num(cache_lock), + _index_queue.get_total_cache_size(cache_lock), + _index_queue.get_elements_num(cache_lock), + _normal_queue.get_total_cache_size(cache_lock), + _normal_queue.get_elements_num(cache_lock), cost); + }); } else { std::error_code ec; fs::create_directories(_cache_base_path, ec); @@ -136,17 +159,8 @@ Status LRUFileCache::initialize() { _is_initialized = true; _cache_background_thread = std::thread(&LRUFileCache::run_background_operation, this); int64_t cost = watch.elapsed_time() / 1000 / 1000; - LOG(INFO) << fmt::format( - "After initialize file cache path={}, disposable queue size={} elements={}, index " - "queue size={} " - "elements={}, query queue " - "size={} elements={}, init cost(ms)={}", - _cache_base_path, _disposable_queue.get_total_cache_size(cache_lock), 
_disposable_queue.get_elements_num(cache_lock), - _index_queue.get_total_cache_size(cache_lock), - _index_queue.get_elements_num(cache_lock), - _normal_queue.get_total_cache_size(cache_lock), - _normal_queue.get_elements_num(cache_lock), cost); + LOG(INFO) << fmt::format("After initialize file cache path={}, init cost(ms)={}", + _cache_base_path, cost); return Status::OK(); } @@ -376,6 +390,16 @@ void LRUFileCache::fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, co FileBlocksHolder LRUFileCache::get_or_set(const Key& key, size_t offset, size_t size, const CacheContext& context) { + if (!_lazy_open_done) { + // Cache info is still being loaded in the background (or loading failed): bypass the cache + LOG(WARNING) << fmt::format( + "Cache is not ready yet, skip cache for key: {}, offset: {}, size: {}.", + key.to_string(), offset, size); + FileBlocks file_blocks = {std::make_shared( + offset, size, key, this, FileBlock::State::SKIP_CACHE, context.cache_type)}; + return FileBlocksHolder(std::move(file_blocks)); + } + FileBlock::Range range(offset, offset + size - 1); std::lock_guard cache_lock(_mutex); @@ -827,6 +851,7 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& ca std::vector> queue_entries; std::vector need_to_check_if_empty_dir; Status st = Status::OK(); + size_t scan_file_num = 0; auto scan_file_cache = [&](fs::directory_iterator& key_it) { for (; key_it != fs::directory_iterator(); ++key_it) { key = Key( @@ -888,6 +913,14 @@ Status LRUFileCache::load_cache_info_into_memory(std::lock_guard& ca } need_to_check_if_empty_dir.push_back(key_it->path()); } + scan_file_num += 1; + // A non-positive interval disables throttling instead of dividing by zero. + const int64_t throttle_interval = config::async_file_cache_init_file_num_interval; + if (throttle_interval > 0 && + scan_file_num % static_cast<size_t>(throttle_interval) == 0) { + std::this_thread::sleep_for( + std::chrono::milliseconds(config::async_file_cache_init_sleep_interval_ms)); + } } } }; diff --git a/be/src/io/cache/block/block_lru_file_cache.h b/be/src/io/cache/block/block_lru_file_cache.h index c7644ecbd8aea5..bcf00d938a725b 100644 --- a/be/src/io/cache/block/block_lru_file_cache.h +++ 
b/be/src/io/cache/block/block_lru_file_cache.h @@ -53,6 +53,11 @@ class LRUFileCache final : public IFileCache { LRUFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings); ~LRUFileCache() override { _close = true; + // The loader thread started by initialize() must be joined on its own handle; + // destroying a still-joinable std::thread calls std::terminate. + if (_cache_background_load_thread.joinable()) { + _cache_background_load_thread.join(); + } if (_cache_background_thread.joinable()) { _cache_background_thread.join(); } @@ -201,6 +206,10 @@ class LRUFileCache final : public IFileCache { private: std::atomic_bool _close {false}; std::thread _cache_background_thread; + // False while existing cache info is being loaded in the background; + // get_or_set() bypasses the cache until it is true. + std::atomic_bool _lazy_open_done {true}; + std::thread _cache_background_load_thread; size_t _num_read_segments = 0; size_t _num_hit_segments = 0; size_t _num_removed_segments = 0; diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 897ac77c19c719..0725c59a916a0f 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2742,7 +2742,7 @@ public static boolean isNotCloudMode() { // start of lock config @ConfField(description = {"是否开启死锁检测", "Whether to enable deadlock detection"}) - public static boolean enable_deadlock_detection = false; + public static boolean enable_deadlock_detection = true; @ConfField(description = {"死锁检测间隔时间,单位分钟", "Deadlock detection interval time, unit minute"})