Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](paimon) fix crash when enable cache with paimon deletion vector(#39877) #39875

Merged
merged 1 commit into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ Status IcebergTableReader::get_columns(
return _file_format_reader->get_columns(name_to_type, missing_cols);
}

Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
// We get the count value by doris's be, so we don't need to read the delete file
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_push_down_count > 0) {
return Status::OK();
Expand Down Expand Up @@ -548,7 +548,7 @@ Status IcebergParquetReader::init_reader(
_gen_new_colname_to_value_range();
parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
parquet_reader->iceberg_sanitize(_all_required_col_names);
RETURN_IF_ERROR(init_row_filters(_range));
RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
return parquet_reader->init_reader(
_all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
Expand Down Expand Up @@ -621,7 +621,7 @@ Status IcebergOrcReader::init_reader(
_gen_file_col_names();
_gen_new_colname_to_value_range();
orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
RETURN_IF_ERROR(init_row_filters(_range));
RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
conjuncts, false, tuple_descriptor, row_descriptor,
not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/table/iceberg_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class IcebergTableReader : public TableFormatReader {
int64_t push_down_count);
~IcebergTableReader() override = default;

Status init_row_filters(const TFileRangeDesc& range) final;
Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final;

Status get_next_block(Block* block, size_t* read_rows, bool* eof) final;

Expand Down
5 changes: 3 additions & 2 deletions be/src/vec/exec/format/table/paimon_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile);
}

Status PaimonReader::init_row_filters(const TFileRangeDesc& range) {
Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
const auto& table_desc = range.table_format_params.paimon_params;
if (!table_desc.__isset.deletion_file) {
return Status::OK();
Expand Down Expand Up @@ -80,7 +80,8 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range) {
Slice result(buf.data(), bytes_read);
{
SCOPED_TIMER(_paimon_profile.delete_files_read_time);
RETURN_IF_ERROR(delete_file_reader->read_at(deletion_file.offset, result, &bytes_read));
RETURN_IF_ERROR(
delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, io_ctx));
}
if (bytes_read != deletion_file.length + 4) {
return Status::IOError(
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/table/paimon_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class PaimonReader : public TableFormatReader {
RuntimeState* state, const TFileScanRangeParams& params);
~PaimonReader() override = default;

Status init_row_filters(const TFileRangeDesc& range) final;
Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final;

protected:
struct PaimonProfile {
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/table/table_format_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class TableFormatReader : public GenericReader {

bool fill_all_columns() const override { return _file_format_reader->fill_all_columns(); }

virtual Status init_row_filters(const TFileRangeDesc& range) = 0;
virtual Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) = 0;

protected:
void _collect_profile_before_close() override {
Expand Down
3 changes: 2 additions & 1 deletion be/src/vec/exec/format/table/transactional_hive_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ Status TransactionalHiveReader::get_columns(
return _file_format_reader->get_columns(name_to_type, missing_cols);
}

Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range) {
Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range,
io::IOContext* io_ctx) {
std::string data_file_path = _range.path;
// the path in _range is remove the namenode prefix,
// and the file_path in delete file is full path, so we should add it back.
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/exec/format/table/transactional_hive_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class TransactionalHiveReader : public TableFormatReader {
io::IOContext* io_ctx);
~TransactionalHiveReader() override = default;

Status init_row_filters(const TFileRangeDesc& range) override;
Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) override;

Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;

Expand Down
6 changes: 3 additions & 3 deletions be/src/vec/exec/scan/vfile_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -859,7 +859,7 @@ Status VFileScanner::_get_next_reader() {
std::unique_ptr<PaimonParquetReader> paimon_reader =
PaimonParquetReader::create_unique(std::move(parquet_reader), _profile,
_state, *_params);
RETURN_IF_ERROR(paimon_reader->init_row_filters(range));
RETURN_IF_ERROR(paimon_reader->init_row_filters(range, _io_ctx.get()));
_cur_reader = std::move(paimon_reader);
} else {
bool hive_parquet_use_column_names = true;
Expand Down Expand Up @@ -907,7 +907,7 @@ Status VFileScanner::_get_next_reader() {
_file_col_names, _colname_to_value_range, _push_down_conjuncts,
_real_tuple_desc, _default_val_row_desc.get(),
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range));
RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range, _io_ctx.get()));
_cur_reader = std::move(tran_orc_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
Expand All @@ -929,7 +929,7 @@ Status VFileScanner::_get_next_reader() {
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts);
std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
std::move(orc_reader), _profile, _state, *_params);
RETURN_IF_ERROR(paimon_reader->init_row_filters(range));
RETURN_IF_ERROR(paimon_reader->init_row_filters(range, _io_ctx.get()));
_cur_reader = std::move(paimon_reader);
} else {
bool hive_orc_use_column_names = true;
Expand Down
Loading
Loading