Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge #1

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1392,9 +1392,12 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
S3ClientConf conf {
.endpoint {},
.region {},
.ak = std::move(new_s3_conf.client_conf.ak),
.sk = std::move(new_s3_conf.client_conf.sk),
.token = std::move(new_s3_conf.client_conf.token),
.bucket {},
.provider = new_s3_conf.client_conf.provider,
};
st = client->reset(conf);
Expand Down
1 change: 1 addition & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
{
.expiration_time = expiration_time,
},
.download_done {},
});
}
#endif
Expand Down
1 change: 1 addition & 0 deletions be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
FileReaderOptions opts {
.cache_type = FileCachePolicy::FILE_BLOCK_CACHE,
.is_doris_table = true,
.cache_base_path {},
.file_size = meta.file_size,
};
auto st = meta.file_system->open_file(meta.path, &file_reader, &opts);
Expand Down
6 changes: 5 additions & 1 deletion be/src/io/file_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ constexpr std::string_view RANDOM_CACHE_BASE_PATH = "random";

io::FileReaderOptions FileFactory::get_reader_options(RuntimeState* state,
const io::FileDescription& fd) {
io::FileReaderOptions opts {.file_size = fd.file_size, .mtime = fd.mtime};
io::FileReaderOptions opts {
.cache_base_path {},
.file_size = fd.file_size,
.mtime = fd.mtime,
};
if (config::enable_file_cache && state != nullptr &&
state->query_options().__isset.enable_file_cache &&
state->query_options().enable_file_cache) {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/beta_rowset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* se
.cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE,
.is_doris_table = true,
.cache_base_path = "",
.file_size = _rowset_meta->segment_file_size(seg_id),
};
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema, reader_options,
Expand Down Expand Up @@ -532,6 +533,7 @@ Status BetaRowset::check_current_rowset_segment() {
.cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE,
.is_doris_table = true,
.cache_base_path {},
.file_size = _rowset_meta->segment_file_size(seg_id),
};
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, rowset_id(), _schema,
Expand Down
4 changes: 3 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr
? (config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
: io::FileCachePolicy::NO_CACHE)
: io::FileCachePolicy::NO_CACHE,
.is_doris_table = true};
.is_doris_table = true,
.cache_base_path {},
};
auto s = segment_v2::Segment::open(io::global_local_filesystem(), path, segment_id, rowset_id(),
_context.tablet_schema, reader_options, &segment);
if (!s.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct ColumnIteratorOptions {
// for page cache allocation
// page types are divided into DATA_PAGE & INDEX_PAGE
// INDEX_PAGE including index_page, dict_page and short_key_page
PageTypePB type;
PageTypePB type = PageTypePB::UNKNOWN_PAGE_TYPE;
io::FileReader* file_reader = nullptr; // Ref
// reader statistics
OlapReaderStatistics* stats = nullptr; // Ref
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/load_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Status TabletStream::init(std::shared_ptr<OlapTableSchemaParam> schema, int64_t
.load_id = _load_id,
.table_schema_param = schema,
// TODO(plat1ko): write_file_cache
.storage_vault_id {},
};

_load_stream_writer = std::make_shared<LoadStreamWriter>(&req, _profile);
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/workload_group/workload_group.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
if (tworkload_group_info.__isset.id) {
tg_id = tworkload_group_info.id;
} else {
return {.valid = false};
return {.name = "", .valid = false};
}

// 2 name
Expand All @@ -266,7 +266,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
if (tworkload_group_info.__isset.version) {
version = tworkload_group_info.version;
} else {
return {.valid = false};
return {.name {}, .valid = false};
}

// 4 cpu_share
Expand Down
1 change: 1 addition & 0 deletions be/src/util/s3_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ S3Conf S3Conf::get_s3_conf(const cloud::ObjectStoreInfoPB& info) {
.region = info.region(),
.ak = info.ak(),
.sk = info.sk(),
.token {},
.bucket = info.bucket(),
.provider = io::ObjStorageType::AWS,
},
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/table/paimon_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range) {
.system_type = _params.file_type,
.properties = _params.properties,
.hdfs_params = _params.hdfs_params,
.broker_addresses {},
};
if (range.__isset.file_type) {
// for compatibility
Expand Down
19 changes: 19 additions & 0 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,27 @@ NewOlapScanner::NewOlapScanner(pipeline::ScanLocalStateBase* parent,
_key_ranges(std::move(params.key_ranges)),
_tablet_reader_params({
.tablet = std::move(params.tablet),
.tablet_schema {},
.aggregation = params.aggregation,
.version = {0, params.version},
.start_key {},
.end_key {},
.conditions {},
.bloom_filters {},
.bitmap_filters {},
.in_filters {},
.conditions_except_leafnode_of_andnode {},
.function_filters {},
.delete_predicates {},
.target_cast_type_for_variants {},
.rs_splits {},
.return_columns {},
.output_columns {},
.remaining_conjunct_roots {},
.common_expr_ctxs_push_down {},
.topn_filter_source_node_ids {},
.filter_block_conjuncts {},
.key_group_cluster_key_idxes {},
}) {
_tablet_reader_params.set_read_source(std::move(params.read_source));
_is_init = false;
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/sink/writer/iceberg/viceberg_partition_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ Status VIcebergPartitionWriter::open(RuntimeState* state, RuntimeProfile* profil
io::FSPropertiesRef fs_properties(_write_info.file_type);
fs_properties.properties = &_hadoop_conf;
io::FileDescription file_description = {
.path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name())};
.path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
.fs_name {}};
_fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
io::FileWriterOptions file_writer_options = {.used_by_s3_committer = false};
RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, &file_writer_options));
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/sink/writer/vhive_partition_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
io::FSPropertiesRef fs_properties(_write_info.file_type);
fs_properties.properties = &_hadoop_conf;
io::FileDescription file_description = {
.path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name())};
.path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
.fs_name {}};
// If the destination path contains a schema, use the schema directly.
// If not, use defaultFS.
// Otherwise a write error will occur.
Expand Down
1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ Status VTabletWriterV2::_write_memtable(std::shared_ptr<vectorized::Block> block
.table_schema_param = _schema,
.is_high_priority = _is_high_priority,
.write_file_cache = _write_file_cache,
.storage_vault_id {},
};
bool index_not_found = true;
for (const auto& index : _schema->indexes()) {
Expand Down