From d31abdf8a659db45d77d1cf9433b31562542ba13 Mon Sep 17 00:00:00 2001 From: Shuo Wang Date: Thu, 4 Jul 2024 13:01:27 +0800 Subject: [PATCH] [Feature](Cloud) Support session variable disable_file_cache in query. --- be/src/exec/rowid_fetcher.cpp | 1 + be/src/olap/parallel_scanner_builder.cpp | 9 ++++++++- be/src/olap/rowset/beta_rowset.cpp | 19 +++++++++++------- be/src/olap/rowset/beta_rowset.h | 9 ++++++--- be/src/olap/rowset/beta_rowset_reader.cpp | 20 ++++++++++++++++--- be/src/olap/segment_loader.cpp | 4 ++-- be/src/olap/segment_loader.h | 3 ++- .../org/apache/doris/qe/SessionVariable.java | 17 ++++++++++++++++ gensrc/thrift/PaloInternalService.thrift | 9 +++++++-- 9 files changed, 72 insertions(+), 19 deletions(-) diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp index 0ec1c7ce3a34a47..beb8c2f0962a5a9 100644 --- a/be/src/exec/rowid_fetcher.cpp +++ b/be/src/exec/rowid_fetcher.cpp @@ -381,6 +381,7 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request, << ", row_size:" << row_size; *response->add_row_locs() = row_loc; }); + // TODO: supoort session variable enable_page_cache and disable_file_cache if necessary. SegmentCacheHandle segment_cache; RETURN_IF_ERROR(scope_timer_run( [&]() { diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp index ac57448ade7e6db..6a2503a70e90029 100644 --- a/be/src/olap/parallel_scanner_builder.cpp +++ b/be/src/olap/parallel_scanner_builder.cpp @@ -179,13 +179,20 @@ Status ParallelScannerBuilder::_load() { RETURN_IF_ERROR(tablet->capture_consistent_rowsets_unlocked({0, version}, &rowsets)); } + bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache + ? _state->query_options().enable_segment_cache + : true; + bool disable_file_cache = _state->query_options().__isset.disable_file_cache + ? _state->query_options().disable_file_cache + : false; for (auto& rowset : rowsets) { RETURN_IF_ERROR(rowset->load()); const auto rowset_id = rowset->rowset_id(); auto& segment_cache_handle = _segment_cache_handles[rowset_id]; RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( - std::dynamic_pointer_cast(rowset), &segment_cache_handle, true)); + std::dynamic_pointer_cast(rowset), &segment_cache_handle, + enable_segment_cache, false, disable_file_cache)); _total_rows += rowset->num_rows(); } } diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index a76cbe636eef905..a5d98ef3e4a5f94 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -146,23 +146,26 @@ Status BetaRowset::get_segments_size(std::vector* segments_size) { return Status::OK(); } -Status BetaRowset::load_segments(std::vector* segments) { - return load_segments(0, num_segments(), segments); +Status BetaRowset::load_segments(std::vector* segments, + bool disable_file_cache) { + return load_segments(0, num_segments(), segments, disable_file_cache); } Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end, - std::vector* segments) { + std::vector* segments, + bool disable_file_cache) { int64_t seg_id = seg_id_begin; while (seg_id < seg_id_end) { std::shared_ptr segment; - RETURN_IF_ERROR(load_segment(seg_id, &segment)); + RETURN_IF_ERROR(load_segment(seg_id, &segment, disable_file_cache)); segments->push_back(std::move(segment)); seg_id++; } return Status::OK(); } -Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment) { +Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment, + bool disable_file_cache) { auto fs = _rowset_meta->fs(); if (!fs) { return Status::Error("get fs failed"); @@ -171,11 +174,13 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se DCHECK(seg_id >= 0); auto seg_path = DORIS_TRY(segment_path(seg_id)); io::FileReaderOptions reader_options { - .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE - : io::FileCachePolicy::NO_CACHE, + .cache_type = !disable_file_cache && config::enable_file_cache + ? io::FileCachePolicy::FILE_BLOCK_CACHE + : io::FileCachePolicy::NO_CACHE, .is_doris_table = true, .file_size = _rowset_meta->segment_file_size(seg_id), }; + auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, reader_options, segment); if (!s.ok()) { diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index bf7daf8bdfa6f94..ded7bc0e2d650fd 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -71,12 +71,15 @@ class BetaRowset final : public Rowset { Status check_file_exist() override; - Status load_segments(std::vector* segments); + Status load_segments(std::vector* segments, + bool disable_file_cache = false); Status load_segments(int64_t seg_id_begin, int64_t seg_id_end, - std::vector* segments); + std::vector* segments, + bool disable_file_cache = false); - Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment); + Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment, + bool disable_file_cache = false); Status get_segments_size(std::vector* segments_size); diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 458b3d29547062f..4d953d1dbe37e74 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -249,10 +249,24 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context } // load segments - bool should_use_cache = use_cache || _read_context->reader_type == ReaderType::READER_QUERY; + bool disable_file_cache = false; + bool enable_segment_cache = true; + auto* state = read_context->runtime_state; + if (state != nullptr) { + disable_file_cache = state->query_options().__isset.disable_file_cache + ? state->query_options().disable_file_cache + : false; + enable_segment_cache = state->query_options().__isset.enable_segment_cache + ? state->query_options().enable_segment_cache + : true; + } + // When reader type is for query, session variable `enable_segment_cache` should be respected. + bool should_use_cache = use_cache || (_read_context->reader_type == ReaderType::READER_QUERY && + enable_segment_cache); SegmentCacheHandle segment_cache_handle; - RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, &segment_cache_handle, - should_use_cache)); + RETURN_IF_ERROR(SegmentLoader::instance()->load_segments( + _rowset, &segment_cache_handle, should_use_cache, + /*need_load_pk_index_and_bf*/ false, disable_file_cache)); // create iterator for each segment auto& segments = segment_cache_handle.get_segments(); diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp index 12ab89af0be283a..98db03512409012 100644 --- a/be/src/olap/segment_loader.cpp +++ b/be/src/olap/segment_loader.cpp @@ -52,7 +52,7 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) { Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle, bool use_cache, - bool need_load_pk_index_and_bf) { + bool need_load_pk_index_and_bf, bool disable_file_cache) { if (cache_handle->is_inited()) { return Status::OK(); } @@ -62,7 +62,7 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset, continue; } segment_v2::SegmentSharedPtr segment; - RETURN_IF_ERROR(rowset->load_segment(i, &segment)); + RETURN_IF_ERROR(rowset->load_segment(i, &segment, disable_file_cache)); if (need_load_pk_index_and_bf) { RETURN_IF_ERROR(segment->load_pk_index_and_bf()); } diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h index 5bb8fae3c418775..fc2f0d8c03fafea 100644 --- a/be/src/olap/segment_loader.h +++ b/be/src/olap/segment_loader.h @@ -118,7 +118,8 @@ class SegmentLoader { // Load segments of "rowset", return the "cache_handle" which contains segments. // If use_cache is true, it will be loaded from _cache. Status load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle, - bool use_cache = false, bool need_load_pk_index_and_bf = false); + bool use_cache = false, bool need_load_pk_index_and_bf = false, + bool disable_file_cache = false); void erase_segment(const SegmentCache::CacheKey& key); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 1cba6eb87c576c7..9a9631f6ed9cdb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1882,6 +1882,8 @@ public void setIgnoreRuntimeFilterIds(String ignoreRuntimeFilterIds) { public static final String IGNORE_SHAPE_NODE = "ignore_shape_nodes"; + public static final String ENABLE_SEGMENT_CACHE = "enable_segment_cache"; + public Set getIgnoreShapePlanNodes() { return Arrays.stream(ignoreShapePlanNodes.split(",[\\s]*")).collect(ImmutableSet.toImmutableSet()); } @@ -2002,6 +2004,11 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) { }) public boolean useMaxLengthOfVarcharInCtas = true; + // Whether enable segment cache. Segment cache only works when FE's query options sets enableSegmentCache true + // along with BE's config `disable_segment_cache` false + @VariableMgr.VarAttr(name = ENABLE_SEGMENT_CACHE, needForward = true) + public boolean enableSegmentCache = true; + public boolean isEnableJoinSpill() { return enableJoinSpill; } @@ -3425,6 +3432,14 @@ public void setLoadStreamPerNode(int loadStreamPerNode) { this.loadStreamPerNode = loadStreamPerNode; } + public void setEnableSegmentCache(boolean value) { + this.enableSegmentCache = value; + } + + public boolean isEnableSegmentCache() { + return this.enableSegmentCache; + } + /** * Serialize to thrift object. * Used for rest api. @@ -3558,6 +3573,8 @@ public TQueryOptions toThrift() { tResult.setEnableShortCircuitQueryAccessColumnStore(enableShortCircuitQueryAcessColumnStore); tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull); tResult.setSerdeDialect(getSerdeDialect()); + + tResult.setEnableSegmentCache(enableSegmentCache); return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 0e0a87eae0b98e8..1186022f7384a71 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -308,7 +308,7 @@ struct TQueryOptions { 113: optional bool enable_local_merge_sort = false; 114: optional bool enable_parallel_result_sink = false; - + 115: optional bool enable_short_circuit_query_access_column_store = false; 116: optional bool enable_no_need_read_data_opt = true; @@ -316,7 +316,12 @@ struct TQueryOptions { 117: optional bool read_csv_empty_line_as_null = false; 118: optional TSerdeDialect serde_dialect = TSerdeDialect.DORIS; - // For cloud, to control if the content would be written into file cache + + 119: optional bool enable_segment_cache = true + + // For cloud. + // In write path, to control if the content would be written into file cache. + // In read path, read from file cache or remote storage when execute query. 1000: optional bool disable_file_cache = false }