Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature](Cloud) Support session variable disable_file_cache and enable_segment_cache in query #37141

Merged
merged 2 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/rowid_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
<< ", row_size:" << row_size;
*response->add_row_locs() = row_loc;
});
// TODO: support session variables enable_page_cache and disable_file_cache if necessary.
SegmentCacheHandle segment_cache;
RETURN_IF_ERROR(scope_timer_run(
[&]() {
Expand Down
9 changes: 8 additions & 1 deletion be/src/olap/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,20 @@ Status ParallelScannerBuilder::_load() {
RETURN_IF_ERROR(tablet->capture_consistent_rowsets_unlocked({0, version}, &rowsets));
}

bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache
? _state->query_options().enable_segment_cache
: true;
bool disable_file_cache = _state->query_options().__isset.disable_file_cache
? _state->query_options().disable_file_cache
: false;
for (auto& rowset : rowsets) {
RETURN_IF_ERROR(rowset->load());
const auto rowset_id = rowset->rowset_id();
auto& segment_cache_handle = _segment_cache_handles[rowset_id];

RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
std::dynamic_pointer_cast<BetaRowset>(rowset), &segment_cache_handle, true));
std::dynamic_pointer_cast<BetaRowset>(rowset), &segment_cache_handle,
enable_segment_cache, false, disable_file_cache));
_total_rows += rowset->num_rows();
}
}
Expand Down
19 changes: 12 additions & 7 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,23 +147,26 @@ Status BetaRowset::get_segments_size(std::vector<size_t>* segments_size) {
return Status::OK();
}

Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments) {
return load_segments(0, num_segments(), segments);
Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
bool disable_file_cache) {
return load_segments(0, num_segments(), segments, disable_file_cache);
}

Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end,
std::vector<segment_v2::SegmentSharedPtr>* segments) {
std::vector<segment_v2::SegmentSharedPtr>* segments,
bool disable_file_cache) {
int64_t seg_id = seg_id_begin;
while (seg_id < seg_id_end) {
std::shared_ptr<segment_v2::Segment> segment;
RETURN_IF_ERROR(load_segment(seg_id, &segment));
RETURN_IF_ERROR(load_segment(seg_id, &segment, disable_file_cache));
segments->push_back(std::move(segment));
seg_id++;
}
return Status::OK();
}

Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment) {
Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment,
bool disable_file_cache) {
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::Error<INIT_FAILED>("get fs failed");
Expand All @@ -172,12 +175,14 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se
DCHECK(seg_id >= 0);
auto seg_path = DORIS_TRY(segment_path(seg_id));
io::FileReaderOptions reader_options {
.cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE,
.cache_type = !disable_file_cache && config::enable_file_cache
? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE,
.is_doris_table = true,
.cache_base_path = "",
.file_size = _rowset_meta->segment_file_size(seg_id),
};

auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, reader_options,
segment);
if (!s.ok()) {
Expand Down
9 changes: 6 additions & 3 deletions be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,15 @@ class BetaRowset final : public Rowset {

Status check_file_exist() override;

Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);
Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
bool disable_file_cache = false);

Status load_segments(int64_t seg_id_begin, int64_t seg_id_end,
std::vector<segment_v2::SegmentSharedPtr>* segments);
std::vector<segment_v2::SegmentSharedPtr>* segments,
bool disable_file_cache = false);

Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment);
Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment,
bool disable_file_cache = false);

Status get_segments_size(std::vector<size_t>* segments_size);

Expand Down
20 changes: 17 additions & 3 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,24 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
}

// load segments
bool should_use_cache = use_cache || _read_context->reader_type == ReaderType::READER_QUERY;
bool disable_file_cache = false;
bool enable_segment_cache = true;
auto* state = read_context->runtime_state;
if (state != nullptr) {
disable_file_cache = state->query_options().__isset.disable_file_cache
? state->query_options().disable_file_cache
: false;
enable_segment_cache = state->query_options().__isset.enable_segment_cache
? state->query_options().enable_segment_cache
: true;
}
// When reader type is for query, session variable `enable_segment_cache` should be respected.
bool should_use_cache = use_cache || (_read_context->reader_type == ReaderType::READER_QUERY &&
enable_segment_cache);
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, &segment_cache_handle,
should_use_cache));
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
_rowset, &segment_cache_handle, should_use_cache,
/*need_load_pk_index_and_bf*/ false, disable_file_cache));

// create iterator for each segment
auto& segments = segment_cache_handle.get_segments();
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/segment_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) {

Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
SegmentCacheHandle* cache_handle, bool use_cache,
bool need_load_pk_index_and_bf) {
bool need_load_pk_index_and_bf, bool disable_file_cache) {
if (cache_handle->is_inited()) {
return Status::OK();
}
Expand All @@ -62,7 +62,7 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
continue;
}
segment_v2::SegmentSharedPtr segment;
RETURN_IF_ERROR(rowset->load_segment(i, &segment));
RETURN_IF_ERROR(rowset->load_segment(i, &segment, disable_file_cache));
if (need_load_pk_index_and_bf) {
RETURN_IF_ERROR(segment->load_pk_index_and_bf());
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/segment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class SegmentLoader {
// Load segments of "rowset", return the "cache_handle" which contains segments.
// If use_cache is true, it will be loaded from _cache.
Status load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle,
bool use_cache = false, bool need_load_pk_index_and_bf = false);
bool use_cache = false, bool need_load_pk_index_and_bf = false,
bool disable_file_cache = false);

void erase_segment(const SegmentCache::CacheKey& key);

Expand Down
17 changes: 17 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1943,6 +1943,8 @@ public void setIgnoreRuntimeFilterIds(String ignoreRuntimeFilterIds) {

public static final String IGNORE_SHAPE_NODE = "ignore_shape_nodes";

public static final String ENABLE_SEGMENT_CACHE = "enable_segment_cache";

public Set<String> getIgnoreShapePlanNodes() {
return Arrays.stream(ignoreShapePlanNodes.split(",[\\s]*")).collect(ImmutableSet.toImmutableSet());
}
Expand Down Expand Up @@ -2060,6 +2062,11 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
})
public boolean useMaxLengthOfVarcharInCtas = true;

// Whether to enable the segment cache. The segment cache only takes effect when the FE query option
// enableSegmentCache is true and the BE config `disable_segment_cache` is false.
@VariableMgr.VarAttr(name = ENABLE_SEGMENT_CACHE, needForward = true)
public boolean enableSegmentCache = true;

/**
* When enabling shard scroll, FE will plan scan ranges by shards of ES indices.
* Otherwise, FE will plan a single query to ES.
Expand Down Expand Up @@ -3523,6 +3530,14 @@ public void setLoadStreamPerNode(int loadStreamPerNode) {
this.loadStreamPerNode = loadStreamPerNode;
}

/**
 * Sets the value of the {@code enable_segment_cache} session variable.
 *
 * @param value the new value stored in {@code enableSegmentCache}
 */
public void setEnableSegmentCache(boolean value) {
    enableSegmentCache = value;
}

/**
 * Returns the current value of the {@code enable_segment_cache} session variable.
 *
 * @return the value held in {@code enableSegmentCache}
 */
public boolean isEnableSegmentCache() {
    return enableSegmentCache;
}

/**
* Serialize to thrift object.
* Used for rest api.
Expand Down Expand Up @@ -3657,6 +3672,8 @@ public TQueryOptions toThrift() {
tResult.setHiveOrcUseColumnNames(hiveOrcUseColumnNames);
tResult.setHiveParquetUseColumnNames(hiveParquetUseColumnNames);
tResult.setKeepCarriageReturn(keepCarriageReturn);

tResult.setEnableSegmentCache(enableSegmentCache);
return tResult;
}

Expand Down
8 changes: 6 additions & 2 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ struct TQueryOptions {
113: optional bool enable_local_merge_sort = false;

114: optional bool enable_parallel_result_sink = false;

115: optional bool enable_short_circuit_query_access_column_store = false;

116: optional bool enable_no_need_read_data_opt = true;
Expand All @@ -324,13 +324,17 @@ struct TQueryOptions {
121: optional bool keep_carriage_return = false; // \n,\r\n split line in CSV.

122: optional i32 runtime_bloom_filter_min_size = 1048576;

//Access Parquet/ORC columns by name by default. Set this property to `false` to access columns
//by their ordinal position in the Hive table definition.
123: optional bool hive_parquet_use_column_names = true;
124: optional bool hive_orc_use_column_names = true;

125: optional bool enable_segment_cache = true;

// For cloud mode: controls whether the file cache is used.
// In the write path, controls whether the content is written into the file cache.
// In the read path, controls whether a query reads from the file cache or from remote storage.
1000: optional bool disable_file_cache = false
}

Expand Down
Loading