Skip to content

Commit

Permalink
[feature](realtime-pk) rows conversion for compaction (apache#34)
Browse files Browse the repository at this point in the history
Conflicts:
	be/src/olap/iterators.h
	be/src/olap/merger.cpp
	be/src/olap/reader.cpp
	be/src/olap/rowset/beta_rowset_reader.cpp
	be/src/olap/rowset/rowset_reader.h
	be/src/olap/rowset/rowset_reader_context.h
	be/src/olap/rowset/segment_v2/segment_iterator.cpp
	be/src/vec/olap/vgeneric_iterators.cpp
	be/test/CMakeLists.txt
  • Loading branch information
liaoxin01 committed Jul 22, 2022
1 parent 9017afb commit 06c1972
Show file tree
Hide file tree
Showing 23 changed files with 447 additions and 6 deletions.
19 changes: 19 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,17 @@ Status Compaction::do_compaction_impl(int64_t permits) {
// The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool
Merger::Statistics stats;
Status res;
TabletReader::ReaderParams reader_params;
reader_params.tablet = _tablet;
reader_params.reader_type = compaction_type();
reader_params.rs_readers = _input_rs_readers;
reader_params.version = _output_rs_writer->version();
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->speed_up_unique_key_with_aux_index()) {
stats.rowid_conversion = &_rowid_conversion;
reader_params.record_rowids = true;
reader_params.delete_bitmap = &_tablet->tablet_meta()->delete_bitmap();
}

if (use_vectorized_compaction) {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), &cur_tablet_schema,
Expand Down Expand Up @@ -242,6 +253,14 @@ Status Compaction::modify_rowsets() {
std::vector<RowsetSharedPtr> output_rowsets;
output_rowsets.push_back(_output_rowset);
std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock());

// update dst rowset delete bitmap
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->speed_up_unique_key_with_aux_index()) {
_tablet->tablet_meta()->update_delete_bitmap(_input_rowsets, _output_rs_writer->version(),
_rowid_conversion);
}

RETURN_NOT_OK(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true));
_tablet->save_meta();
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class Compaction {

int64_t _oldest_write_timestamp;
int64_t _newest_write_timestamp;
RowIdConversion _rowid_conversion;

DISALLOW_COPY_AND_ASSIGN(Compaction);
};
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "olap/column_predicate.h"
#include "olap/olap_common.h"
#include "olap/tablet_schema.h"
#include "olap/rowid_conversion.h"
#include "vec/core/block.h"

namespace doris {
Expand Down Expand Up @@ -86,6 +87,7 @@ class StorageReadOptions {
int block_row_max = 4096;

const TabletSchema* tablet_schema = nullptr;
bool record_rowids = false;
};

// Used to read data in RowBlockV2 one by one
Expand Down Expand Up @@ -113,6 +115,10 @@ class RowwiseIterator {
return Status::NotSupported("to be implemented");
}

virtual Status current_batch_segment_rowids(std::vector<SegmentRowId>* block_segment_rowids) {
return Status::NotSupported("to be implemented");
}

// return schema for this Iterator
virtual const Schema& schema() const = 0;

Expand Down
23 changes: 23 additions & 0 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
reader_params.tablet_schema = cur_tablet_schema;

const auto& schema = *cur_tablet_schema;
// rows id conversion can works in UNIQUE_KEYS and DUP_KEYS model
// only UNIQUE_KEYS model need rows id conversion
reader_params.record_rowids = tablet->keys_type() == KeysType::UNIQUE_KEYS;

reader_params.return_columns.resize(schema.num_columns());
std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0);
reader_params.origin_return_columns = &reader_params.return_columns;
Expand All @@ -113,6 +117,7 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
vectorized::Block block = schema.create_block(reader_params.return_columns);
size_t output_rows = 0;
bool eof = false;
RowIdConversion rowid_conversion;
while (!eof) {
// Read one block from block reader
RETURN_NOT_OK_LOG(
Expand All @@ -121,6 +126,11 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
RETURN_NOT_OK_LOG(
dst_rowset_writer->add_block(&block),
"failed to write block when merging rowsets of tablet " + tablet->full_name());

if (reader_params.record_rowids && block.rows() > 0) {
stats_output->rowid_conversion->add(reader.current_block_row_locations());
}

output_rows += block.rows();
block.clear_column_data();
}
Expand All @@ -135,6 +145,19 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type,
dst_rowset_writer->flush(),
"failed to flush rowset when merging rowsets of tablet " + tablet->full_name());

if (reader_params.record_rowids) {
// rowid_conversion set segment rows number of destination rowset
std::vector<uint32_t> segment_num_rows;
RETURN_NOT_OK(dst_rowset_writer->get_segment_num_rows(&segment_num_rows));
rowid_conversion.set_dst_segment_num_rows(dst_rowset_writer->rowset_id(), segment_num_rows);

// get row id conversion here for delete bitmap
// use exmaple:
// RowsetSegmentRowId src(rowset_id, segment_id, row_id);
// RowsetSegmentRowId dst;
// rowid_conversion.get(src, &dst);
}

return Status::OK();
}

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params,
_reader_context.batch_size = _batch_size;
_reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
_reader_context.merged_rows = &_merged_rows;
_reader_context.record_rowids = read_params.record_rowids;

*valid_rs_readers = *rs_readers;

Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class TabletReader {
std::vector<uint32_t>* origin_return_columns = nullptr;
std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr;

// used for comapction to record row ids
bool record_rowids = false;

void check_validation() const;

std::string to_string() const;
Expand Down
111 changes: 111 additions & 0 deletions be/src/olap/rowid_conversion.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <map>
#include <vector>

#include "olap/olap_common.h"

namespace doris {

class RowIdConversion {
public:
RowIdConversion() = default;
~RowIdConversion() = default;

// resize segment map to its rows num
void init_segment_map(const RowsetId& src_rowset_id, const std::vector<uint32_t>& num_rows) {
for (size_t i = 0; i < num_rows.size(); i++) {
_segments_rowid_map[{src_rowset_id, i}].resize(num_rows[i], UINT64_MAX);
}
}

// add row id to the map
void add(const std::vector<RowLocation>& rss_row_ids) {
for (auto& item : rss_row_ids) {
_segments_rowid_map[{item.rowset_id, item.segment_id}][item.row_id] =
_dst_rowset_num_rows;
_dst_rowset_num_rows++;
}
}

// get destination RowLocation
// return non-zero if the src RowLocation does not exist
int get(const RowLocation& src, RowLocation* dst) const {
auto iter = _segments_rowid_map.find({src.rowset_id, src.segment_id});
if (iter == _segments_rowid_map.end()) {
return -1;
}
const RowIdMap& rowid_map = iter->second;
if (src.row_id >= rowid_map.size()) {
return -1;
}
uint64_t dst_rowset_rowid = rowid_map[src.row_id];
if (dst_rowset_rowid == UINT64_MAX) {
return -1;
}

dst->rowset_id = _dst_rowst_id;
// get dst segment id and row id
for (auto i = 0; i < _dst_rowset_segment_cumu_num_rows.size(); i++) {
if (dst_rowset_rowid < _dst_rowset_segment_cumu_num_rows[i]) {
dst->segment_id = i;
if (i == 0) {
dst->row_id = dst_rowset_rowid;
} else {
dst->row_id = dst_rowset_rowid - _dst_rowset_segment_cumu_num_rows[i - 1];
}
break;
}
}
}

// set segment rows number of destination rowset
// record cumulative value
void set_dst_segment_num_rows(const RowsetId& dst_rowset_rowid,
const std::vector<uint32_t>& segment_num_rows) {
_dst_rowst_id = dst_rowset_rowid;
uint64_t sum = 0;
for (auto num_rows : segment_num_rows) {
sum += num_rows;
_dst_rowset_segment_cumu_num_rows.push_back(sum);
}
DCHECK_EQ(sum, _dst_rowset_num_rows);
}

private:
using RowId = uint32_t;
using SegmentId = uint32_t;
// key: vector index indicates row id of source segment, value: row id of destination rowset
// UINT64_MAX indicates not exist
using RowIdMap = std::vector<uint64_t>;
// key: src segment
using SegmentsRowIdMap = std::map<std::pair<RowsetId, SegmentId>, RowIdMap>;

SegmentsRowIdMap _segments_rowid_map;

// segement rows number of destination rowset
std::vector<uint64_t> _dst_rowset_segment_cumu_num_rows;

RowsetId _dst_rowst_id;
// rows number of destination rowset
uint64_t _dst_rowset_num_rows = 0;
};

} // namespace doris
1 change: 1 addition & 0 deletions be/src/olap/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
}
read_options.use_page_cache = read_context->use_page_cache;
read_options.tablet_schema = read_context->tablet_schema;
read_options.record_rowids = read_context->record_rowids;

// load segments
RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/beta_rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class BetaRowsetReader : public RowsetReader {

RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }

Status current_batch_segment_rowids(std::vector<SegmentRowId>* block_segment_rowids) override {
return _iterator->current_batch_segment_rowids(block_segment_rowids);
}

private:
bool _should_push_down_value_predicates() const;

Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ Status BetaRowsetWriter::_create_segment_writer(
}

Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>* writer) {
_segment_num_rows.push_back((*writer)->num_rows_written());
if ((*writer)->num_rows_written() == 0) {
return Status::OK();
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ class BetaRowsetWriter : public RowsetWriter {

RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }

Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) const {
*segment_num_rows = _segment_num_rows;
return Status::OK();
}

private:
template <typename RowType>
Status _add_row(const RowType& row);
Expand Down Expand Up @@ -95,6 +100,9 @@ class BetaRowsetWriter : public RowsetWriter {

bool _is_pending = false;
bool _already_built = false;

// record rows number of every segment
std::vector<uint32_t> _segment_num_rows;
};

} // namespace doris
4 changes: 4 additions & 0 deletions be/src/olap/rowset/rowset_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <unordered_map>

#include "gen_cpp/olap_file.pb.h"
#include "olap/rowid_conversion.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader_context.h"
#include "vec/core/block.h"
Expand Down Expand Up @@ -64,6 +65,9 @@ class RowsetReader {

virtual int64_t oldest_write_timestamp() = 0;
virtual int64_t newest_write_timestamp() = 0;
virtual Status current_batch_segment_rowids(std::vector<SegmentRowId>* block_segment_rowids) {
return Status::NotSupported("to be implemented");
}
};

} // namespace doris
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct RowsetReaderContext {
bool is_unique = false;
//record row num merged in generic iterator
uint64_t* merged_rows = nullptr;
bool record_rowids = false;
};

} // namespace doris
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/rowset/rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ class RowsetWriter {

virtual RowsetTypePB type() const = 0;

virtual Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) const {
return Status::NotSupported("to be implemented");
}

private:
DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
};
Expand Down
26 changes: 20 additions & 6 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
if (UNLIKELY(!_inited)) {
RETURN_IF_ERROR(_init(true));
_inited = true;
if (_lazy_materialization_read) {
if (_lazy_materialization_read || _opts.record_rowids) {
_block_rowids.resize(_opts.block_row_max);
}
_current_return_columns.resize(_schema.columns().size());
Expand Down Expand Up @@ -1000,18 +1000,19 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {

_init_current_block(block, _current_return_columns);

uint32_t nrows_read = 0;
_current_batch_rows_read = 0;
uint32_t nrows_read_limit = _opts.block_row_max;
if (UNLIKELY(_estimate_row_size)) {
// read 100 rows to estimate average row size
nrows_read_limit = 100;
}
_read_columns_by_index(nrows_read_limit, nrows_read, _lazy_materialization_read);
_read_columns_by_index(nrows_read_limit, _current_batch_rows_read,
_lazy_materialization_read || _opts.record_rowids);

_opts.stats->blocks_load += 1;
_opts.stats->raw_rows_read += nrows_read;
_opts.stats->raw_rows_read += _current_batch_rows_read;

if (nrows_read == 0) {
if (_current_batch_rows_read == 0) {
for (int i = 0; i < block->columns(); i++) {
auto cid = _schema.column_id(i);
// todo(wb) abstract make column where
Expand All @@ -1027,7 +1028,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {
_output_non_pred_columns(block);
} else {
_convert_dict_code_for_predicate_if_necessary();
uint16_t selected_size = nrows_read;
uint16_t selected_size = _current_batch_rows_read;
uint16_t sel_rowid_idx[selected_size];

// step 1: evaluate vectorization predicate
Expand Down Expand Up @@ -1088,5 +1089,18 @@ void SegmentIterator::_update_max_row(const vectorized::Block* block) {
_opts.block_row_max = std::min(block_row_max, _opts.block_row_max);
}

// used for comapction, so no need to process predicate column
Status SegmentIterator::current_batch_segment_rowids(
std::vector<SegmentRowId>* block_segment_rowids) {
DCHECK(_opts.record_rowids);
DCHECK_GE(_block_rowids.size(), _current_batch_rows_read);
block_segment_rowids->resize(_current_batch_rows_read);
uint32_t sid = segment_id();
for (auto i = 0; i < _current_batch_rows_read; i++) {
(*block_segment_rowids)[i] = SegmentRowId(sid, _block_rowids[i]);
}
return Status::OK();
}

} // namespace segment_v2
} // namespace doris
Loading

0 comments on commit 06c1972

Please sign in to comment.