Skip to content

Commit

Permalink
[Feature](Cloud) Support session variable disable_file_cache in query.
Browse files Browse the repository at this point in the history
  • Loading branch information
wangshuo128 committed Jul 4, 2024
1 parent 4f24b7a commit 09e731c
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 19 deletions.
1 change: 1 addition & 0 deletions be/src/exec/rowid_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
<< ", row_size:" << row_size;
*response->add_row_locs() = row_loc;
});
// TODO: support session variables enable_page_cache and disable_file_cache if necessary.
SegmentCacheHandle segment_cache;
RETURN_IF_ERROR(scope_timer_run(
[&]() {
Expand Down
9 changes: 8 additions & 1 deletion be/src/olap/parallel_scanner_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,20 @@ Status ParallelScannerBuilder::_load() {
RETURN_IF_ERROR(tablet->capture_consistent_rowsets_unlocked({0, version}, &rowsets));
}

bool enable_segment_cache = _state->query_options().__isset.enable_segment_cache
? _state->query_options().enable_segment_cache
: true;
bool disable_file_cache = _state->query_options().__isset.disable_file_cache
? _state->query_options().disable_file_cache
: false;
for (auto& rowset : rowsets) {
RETURN_IF_ERROR(rowset->load());
const auto rowset_id = rowset->rowset_id();
auto& segment_cache_handle = _segment_cache_handles[rowset_id];

RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
std::dynamic_pointer_cast<BetaRowset>(rowset), &segment_cache_handle, true));
std::dynamic_pointer_cast<BetaRowset>(rowset), &segment_cache_handle,
enable_segment_cache, false, disable_file_cache));
_total_rows += rowset->num_rows();
}
}
Expand Down
19 changes: 12 additions & 7 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,23 +146,26 @@ Status BetaRowset::get_segments_size(std::vector<size_t>* segments_size) {
return Status::OK();
}

Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments) {
return load_segments(0, num_segments(), segments);
Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
bool disable_file_cache) {
return load_segments(0, num_segments(), segments, disable_file_cache);
}

Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end,
std::vector<segment_v2::SegmentSharedPtr>* segments) {
std::vector<segment_v2::SegmentSharedPtr>* segments,
bool disable_file_cache) {
int64_t seg_id = seg_id_begin;
while (seg_id < seg_id_end) {
std::shared_ptr<segment_v2::Segment> segment;
RETURN_IF_ERROR(load_segment(seg_id, &segment));
RETURN_IF_ERROR(load_segment(seg_id, &segment, disable_file_cache));
segments->push_back(std::move(segment));
seg_id++;
}
return Status::OK();
}

Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment) {
Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment,
bool disable_file_cache) {
auto fs = _rowset_meta->fs();
if (!fs) {
return Status::Error<INIT_FAILED>("get fs failed");
Expand All @@ -171,11 +174,13 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se
DCHECK(seg_id >= 0);
auto seg_path = DORIS_TRY(segment_path(seg_id));
io::FileReaderOptions reader_options {
.cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE,
.cache_type = !disable_file_cache && config::enable_file_cache
? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE,
.is_doris_table = true,
.file_size = _rowset_meta->segment_file_size(seg_id),
};

auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, reader_options,
segment);
if (!s.ok()) {
Expand Down
9 changes: 6 additions & 3 deletions be/src/olap/rowset/beta_rowset.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,15 @@ class BetaRowset final : public Rowset {

Status check_file_exist() override;

Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);
Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
bool disable_file_cache = false);

Status load_segments(int64_t seg_id_begin, int64_t seg_id_end,
std::vector<segment_v2::SegmentSharedPtr>* segments);
std::vector<segment_v2::SegmentSharedPtr>* segments,
bool disable_file_cache = false);

Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment);
Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment,
bool disable_file_cache = false);

Status get_segments_size(std::vector<size_t>* segments_size);

Expand Down
20 changes: 17 additions & 3 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,24 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
}

// load segments
bool should_use_cache = use_cache || _read_context->reader_type == ReaderType::READER_QUERY;
bool disable_file_cache = false;
bool enable_segment_cache = true;
auto* state = read_context->runtime_state;
if (state != nullptr) {
disable_file_cache = state->query_options().__isset.disable_file_cache
? state->query_options().disable_file_cache
: false;
enable_segment_cache = state->query_options().__isset.enable_segment_cache
? state->query_options().enable_segment_cache
: true;
}
// When reader type is for query, session variable `enable_segment_cache` should be respected.
bool should_use_cache = use_cache || (_read_context->reader_type == ReaderType::READER_QUERY &&
enable_segment_cache);
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(_rowset, &segment_cache_handle,
should_use_cache));
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
_rowset, &segment_cache_handle, should_use_cache,
/*need_load_pk_index_and_bf*/ false, disable_file_cache));

// create iterator for each segment
auto& segments = segment_cache_handle.get_segments();
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/segment_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) {

Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
SegmentCacheHandle* cache_handle, bool use_cache,
bool need_load_pk_index_and_bf) {
bool need_load_pk_index_and_bf, bool disable_file_cache) {
if (cache_handle->is_inited()) {
return Status::OK();
}
Expand All @@ -62,7 +62,7 @@ Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
continue;
}
segment_v2::SegmentSharedPtr segment;
RETURN_IF_ERROR(rowset->load_segment(i, &segment));
RETURN_IF_ERROR(rowset->load_segment(i, &segment, disable_file_cache));
if (need_load_pk_index_and_bf) {
RETURN_IF_ERROR(segment->load_pk_index_and_bf());
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/segment_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class SegmentLoader {
// Load segments of "rowset", return the "cache_handle" which contains segments.
// If use_cache is true, it will be loaded from _cache.
Status load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle,
bool use_cache = false, bool need_load_pk_index_and_bf = false);
bool use_cache = false, bool need_load_pk_index_and_bf = false,
bool disable_file_cache = false);

void erase_segment(const SegmentCache::CacheKey& key);

Expand Down
17 changes: 17 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1882,6 +1882,8 @@ public void setIgnoreRuntimeFilterIds(String ignoreRuntimeFilterIds) {

public static final String IGNORE_SHAPE_NODE = "ignore_shape_nodes";

public static final String ENABLE_SEGMENT_CACHE = "enable_segment_cache";

/**
 * Splits {@code ignoreShapePlanNodes} on commas (each optionally followed by whitespace)
 * and returns the resulting pieces as an immutable set.
 *
 * @return the immutable set of names parsed from {@code ignoreShapePlanNodes}
 */
public Set<String> getIgnoreShapePlanNodes() {
return Arrays.stream(ignoreShapePlanNodes.split(",[\\s]*")).collect(ImmutableSet.toImmutableSet());
}
Expand Down Expand Up @@ -2002,6 +2004,11 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) {
})
public boolean useMaxLengthOfVarcharInCtas = true;

// Whether to enable the segment cache. The segment cache only takes effect when the FE query option
// enableSegmentCache is true and the BE config `disable_segment_cache` is false.
@VariableMgr.VarAttr(name = ENABLE_SEGMENT_CACHE, needForward = true)
public boolean enableSegmentCache = true;

/**
 * @return the current value of the {@code enableJoinSpill} field
 */
public boolean isEnableJoinSpill() {
return enableJoinSpill;
}
Expand Down Expand Up @@ -3425,6 +3432,14 @@ public void setLoadStreamPerNode(int loadStreamPerNode) {
this.loadStreamPerNode = loadStreamPerNode;
}

/**
 * Sets the {@code enableSegmentCache} field, which backs the
 * {@code enable_segment_cache} session variable.
 *
 * @param value the new value to store
 */
public void setEnableSegmentCache(boolean value) {
this.enableSegmentCache = value;
}

/**
 * @return the current value of the {@code enableSegmentCache} field, which backs the
 *         {@code enable_segment_cache} session variable
 */
public boolean isEnableSegmentCache() {
return this.enableSegmentCache;
}

/**
* Serialize to thrift object.
* Used for rest api.
Expand Down Expand Up @@ -3558,6 +3573,8 @@ public TQueryOptions toThrift() {
tResult.setEnableShortCircuitQueryAccessColumnStore(enableShortCircuitQueryAcessColumnStore);
tResult.setReadCsvEmptyLineAsNull(readCsvEmptyLineAsNull);
tResult.setSerdeDialect(getSerdeDialect());

tResult.setEnableSegmentCache(enableSegmentCache);
return tResult;
}

Expand Down
9 changes: 7 additions & 2 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -308,15 +308,20 @@ struct TQueryOptions {
113: optional bool enable_local_merge_sort = false;

114: optional bool enable_parallel_result_sink = false;

115: optional bool enable_short_circuit_query_access_column_store = false;

116: optional bool enable_no_need_read_data_opt = true;

117: optional bool read_csv_empty_line_as_null = false;

118: optional TSerdeDialect serde_dialect = TSerdeDialect.DORIS;
// For cloud, to control if the content would be written into file cache

119: optional bool enable_segment_cache = true

// For cloud.
// In the write path, controls whether the content is written into the file cache.
// In the read path, controls whether a query reads from the file cache or from remote storage.
1000: optional bool disable_file_cache = false
}

Expand Down

0 comments on commit 09e731c

Please sign in to comment.