Skip to content

Commit

Permalink
[fix](merge-on-write) segcompaction should process delete bitmap if n…
Browse files Browse the repository at this point in the history
…ecessary (apache#38369) (apache#39707)

Issue Number: close #xxx

cherry-pick apache#38369 and apache#38800
  • Loading branch information
zhannngchen committed Aug 22, 2024
1 parent 7e52a1a commit 28143ff
Show file tree
Hide file tree
Showing 8 changed files with 1,125 additions and 15 deletions.
19 changes: 9 additions & 10 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,12 @@ Status Merger::vertical_compact_one_group(
}

// for segcompaction
Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType reader_type,
TabletSchemaSPtr tablet_schema, bool is_key,
const std::vector<uint32_t>& column_group,
vectorized::RowSourcesBuffer* row_source_buf,
vectorized::VerticalBlockReader& src_block_reader,
segment_v2::SegmentWriter& dst_segment_writer,
int64_t max_rows_per_segment, Statistics* stats_output,
uint64_t* index_size, KeyBoundsPB& key_bounds) {
// build tablet reader
VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment;
Status Merger::vertical_compact_one_group(
TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema, bool is_key,
const std::vector<uint32_t>& column_group, vectorized::RowSourcesBuffer* row_source_buf,
vectorized::VerticalBlockReader& src_block_reader,
segment_v2::SegmentWriter& dst_segment_writer, Statistics* stats_output,
uint64_t* index_size, KeyBoundsPB& key_bounds, SimpleRowIdConversion* rowid_conversion) {
// TODO: record_rowids
vectorized::Block block = tablet_schema->create_block(column_group);
size_t output_rows = 0;
Expand All @@ -309,6 +305,9 @@ Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType rea
dst_segment_writer.append_block(&block, 0, block.rows()),
"failed to write block when merging rowsets of tablet " + tablet->full_name());

if (is_key && rowid_conversion != nullptr) {
rowid_conversion->add(src_block_reader.current_block_row_locations());
}
output_rows += block.rows();
block.clear_column_data();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "common/status.h"
#include "io/io_common.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/simple_rowid_conversion.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"

Expand Down Expand Up @@ -84,8 +85,7 @@ class Merger {
vectorized::RowSourcesBuffer* row_source_buf,
vectorized::VerticalBlockReader& src_block_reader,
segment_v2::SegmentWriter& dst_segment_writer,
int64_t max_rows_per_segment, Statistics* stats_output,
uint64_t* index_size, KeyBoundsPB& key_bounds);
Statistics* stats_output, uint64_t* index_size, KeyBoundsPB& key_bounds, SimpleRowIdConversion* rowid_conversion);
};

} // namespace doris
33 changes: 32 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,12 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
if (is_large_segment) {
if (segid == _segcompacted_point) {
// skip large segments at the front
auto dst_seg_id = _num_segcompacted.load();
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
if (_segcompaction_worker->need_convert_delete_bitmap()) {
_segcompaction_worker->convert_segment_delete_bitmap(
_context.mow_context->delete_bitmap, segid, dst_seg_id);
}
continue;
} else {
// stop because we need consecutive segments
Expand All @@ -216,7 +221,13 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
}
if (s == 1) { // poor bachelor, let it go
VLOG_DEBUG << "only one candidate segment";
auto src_seg_id = _segcompacted_point.load();
auto dst_seg_id = _num_segcompacted.load();
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
if (_segcompaction_worker->need_convert_delete_bitmap()) {
_segcompaction_worker->convert_segment_delete_bitmap(
_context.mow_context->delete_bitmap, src_seg_id, dst_seg_id);
}
segments->clear();
return Status::OK();
}
Expand Down Expand Up @@ -374,15 +385,20 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
return Status::Error<SEGCOMPACTION_FAILED>(
"BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state");
}
if (!_is_segcompacted() || _segcompacted_point == _num_segment) {
if (!is_segcompacted() || _segcompacted_point == _num_segment) {
// no need if never segcompact before or all segcompacted
return Status::OK();
}
// currently we only rename remaining segments to reduce wait time
// so that transaction can be committed ASAP
VLOG_DEBUG << "segcompaction last few segments";
for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
auto dst_segid = _num_segcompacted.load();
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
if (_segcompaction_worker->need_convert_delete_bitmap()) {
_segcompaction_worker->convert_segment_delete_bitmap(
_context.mow_context->delete_bitmap, segid, dst_segid);
}
}
return Status::OK();
}
Expand Down Expand Up @@ -543,6 +559,21 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker.get_file_writer()->close(),
"close segment compaction worker failed");
}
// process delete bitmap for mow table
if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) {
auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap();
// which means the segment compaction is triggered
if (converted_delete_bitmap != nullptr) {
RowsetIdUnorderedSet rowsetids;
rowsetids.insert(rowset_id());
auto tablet = static_cast<Tablet*>(_context.tablet.get());
tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(),
rowsetids);
context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0},
{rowset_id(), UINT32_MAX, INT64_MAX});
context().mow_context->delete_bitmap->merge(*converted_delete_bitmap);
}
}
}
// When building a rowset, we must ensure that the current _segment_writer has been
// flushed, that is, the current _segment_writer is nullptr
Expand Down
78 changes: 76 additions & 2 deletions be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,14 @@ Status SegcompactionWorker::_get_segcompaction_reader(
std::vector<uint32_t>& return_columns,
std::unique_ptr<vectorized::VerticalBlockReader>* reader) {
auto ctx = _writer->_context;
bool record_rowids = need_convert_delete_bitmap() && is_key;
StorageReadOptions read_options;
read_options.stats = stat;
read_options.use_page_cache = false;
read_options.tablet_schema = ctx.tablet_schema;
read_options.record_rowids = record_rowids;
std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
std::map<uint32_t, uint32_t> segment_rows;
for (auto& seg_ptr : *segments) {
std::unique_ptr<RowwiseIterator> iter;
auto s = seg_ptr->new_iterator(schema, read_options, &iter);
Expand All @@ -88,6 +91,10 @@ Status SegcompactionWorker::_get_segcompaction_reader(
s.to_string());
}
seg_iterators.push_back(std::move(iter));
segment_rows.emplace(seg_ptr->id(), seg_ptr->num_rows());
}
if (record_rowids && _rowid_conversion != nullptr) {
_rowid_conversion->reset_segment_map(segment_rows);
}

*reader = std::unique_ptr<vectorized::VerticalBlockReader> {
Expand All @@ -102,6 +109,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
reader_params.return_columns = return_columns;
reader_params.is_key_column_group = is_key;
reader_params.use_page_cache = false;
reader_params.record_rowids = record_rowids;
return (*reader)->init(reader_params);
}

Expand Down Expand Up @@ -219,6 +227,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt

DCHECK(ctx.tablet);
auto tablet = ctx.tablet;
if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) {
_rowid_conversion = std::make_unique<SimpleRowIdConversion>(_writer->rowset_id());
}

std::vector<std::vector<uint32_t>> column_groups;
Merger::vertical_split_columns(ctx.tablet_schema, &column_groups);
Expand Down Expand Up @@ -248,8 +259,8 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
Merger::Statistics merger_stats;
RETURN_IF_ERROR(Merger::vertical_compact_one_group(
tablet, ReaderType::READER_SEGMENT_COMPACTION, ctx.tablet_schema, is_key,
column_ids, &row_sources_buf, *reader, *writer, INT_MAX, &merger_stats, &index_size,
key_bounds));
column_ids, &row_sources_buf, *reader, *writer, &merger_stats, &index_size,
key_bounds, _rowid_conversion.get()));
total_index_size += index_size;
if (is_key) {
row_sources_buf.flush();
Expand All @@ -275,6 +286,10 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
}

RETURN_IF_ERROR(_delete_original_segments(begin, end));
if (_rowid_conversion != nullptr) {
convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, end,
_writer->_num_segcompacted);
}
RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end));

if (VLOG_DEBUG_IS_ON) {
Expand Down Expand Up @@ -332,4 +347,63 @@ void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segm
}
}

// Tells whether segcompaction has to carry the delete bitmap over to the
// compacted segments.
// Returns true only when a writer is attached, its context has a tablet, and
// that tablet is a UNIQUE_KEYS table with merge-on-write enabled whose schema
// has a sequence column; returns false in every other case.
bool SegcompactionWorker::need_convert_delete_bitmap() {
    if (_writer == nullptr) {
        return false;
    }
    auto tablet_ptr = _writer->context().tablet;
    if (tablet_ptr == nullptr) {
        return false;
    }
    if (tablet_ptr->keys_type() != KeysType::UNIQUE_KEYS) {
        return false;
    }
    if (!tablet_ptr->enable_unique_key_merge_on_write()) {
        return false;
    }
    return tablet_ptr->tablet_schema()->has_sequence_col();
}

// Copy the delete bitmap of one source segment to a destination segment id,
// without any row id translation.
// - src_delete_bitmap: bitmap to read the source segment's entry from
// - src_seg_id:        segment id to look up in src_delete_bitmap
// - dest_seg_id:       segment id the entry is stored under in the converted bitmap
// The lookup and the store both use this writer's rowset id and
// DeleteBitmap::TEMP_VERSION_COMMON. Nothing is stored when the source segment
// has no entry.
void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
                                                        uint32_t src_seg_id, uint32_t dest_seg_id) {
    // the converted bitmap is created on first use
    if (_converted_delete_bitmap == nullptr) {
        _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
    }

    auto cur_rowset_id = _writer->context().rowset_id;
    const auto* src_rows = src_delete_bitmap->get(
            {cur_rowset_id, src_seg_id, DeleteBitmap::TEMP_VERSION_COMMON});
    if (src_rows == nullptr) {
        return;
    }
    _converted_delete_bitmap->set(
            {cur_rowset_id, dest_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, *src_rows);
}

// Translate the delete bitmap entries of source segments [src_begin, src_end]
// (both ends inclusive) into row ids of the single destination segment
// dst_seg_id, using the mapping recorded in _rowid_conversion.
// - src_delete_bitmap: bitmap to read the source segments' entries from
// - src_begin/src_end: first and last source segment id to convert
// - dst_seg_id:        segment id the translated rows are stored under
// Source segments without an entry, and rows for which the conversion returns
// a negative value, are skipped.
// NOTE(review): _rowid_conversion is dereferenced without a null check here;
// callers are expected to invoke this only after it has been created — confirm.
void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
                                                        uint32_t src_begin, uint32_t src_end,
                                                        uint32_t dst_seg_id) {
    // the converted bitmap is created on first use
    if (_converted_delete_bitmap == nullptr) {
        _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
    }

    auto cur_rowset_id = _writer->context().rowset_id;
    // one location object is reused for every lookup; only its segment/row fields change
    RowLocation src_loc(cur_rowset_id, 0, 0);
    for (uint32_t cur_seg = src_begin; cur_seg <= src_end; cur_seg++) {
        const auto* deleted_rows = src_delete_bitmap->get(
                {cur_rowset_id, cur_seg, DeleteBitmap::TEMP_VERSION_COMMON});
        if (deleted_rows == nullptr) {
            continue;
        }
        src_loc.segment_id = cur_seg;
        for (unsigned int src_row : *deleted_rows) {
            src_loc.row_id = src_row;
            auto mapped_row = _rowid_conversion->get(src_loc);
            if (mapped_row >= 0) {
                _converted_delete_bitmap->add(
                        {cur_rowset_id, dst_seg_id, DeleteBitmap::TEMP_VERSION_COMMON},
                        mapped_row);
            }
        }
    }
}

// Request cancellation by swapping _is_compacting_state_mutable to false and
// reporting the value it held before.
// - returns true if the task is cancellable (actual compaction is not started)
// - returns false when the task is not cancellable (it is in the middle of
//   segcompaction)
bool SegcompactionWorker::cancel() {
    const bool was_mutable = _is_compacting_state_mutable.exchange(false);
    return was_mutable;
}

} // namespace doris
13 changes: 13 additions & 0 deletions be/src/olap/rowset/segcompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/merger.h"
#include "olap/simple_rowid_conversion.h"
#include "olap/tablet.h"
#include "segment_v2/segment.h"

Expand Down Expand Up @@ -52,6 +53,14 @@ class SegcompactionWorker {

void compact_segments(SegCompactionCandidatesSharedPtr segments);

bool need_convert_delete_bitmap();

void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, uint32_t src_seg_id,
uint32_t dest_seg_id);
void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, uint32_t src_begin,
uint32_t src_end, uint32_t dest_seg_id);
DeleteBitmapPtr get_converted_delete_bitmap() { return _converted_delete_bitmap; }

io::FileWriterPtr& get_file_writer() { return _file_writer; }

// set the cancel flag, tasks already started will not be cancelled.
Expand All @@ -78,5 +87,9 @@ class SegcompactionWorker {
BetaRowsetWriter* _writer;
io::FileWriterPtr _file_writer;
std::atomic<bool> _cancelled = false;

// for unique key mow table
std::unique_ptr<SimpleRowIdConversion> _rowid_conversion;
DeleteBitmapPtr _converted_delete_bitmap;
};
} // namespace doris
84 changes: 84 additions & 0 deletions be/src/olap/simple_rowid_conversion.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <map>
#include <vector>

#include "olap/olap_common.h"
#include "olap/utils.h"

namespace doris {

// Simple verion of rowid conversion, for segcompaction
// convert rows from several segments to rows in 1 segment
class SimpleRowIdConversion {
public:
SimpleRowIdConversion(const RowsetId& rowset_id) : _rowst_id(rowset_id) {};
~SimpleRowIdConversion() = default;

// resize segment rowid map to its rows num
void reset_segment_map(const std::map<uint32_t, uint32_t>& num_rows) {
_cur_dst_segment_rowid = 0;
for (auto seg_rows : num_rows) {
_segments_rowid_map.emplace(seg_rows.first,
std::vector<uint32_t>(seg_rows.second, UINT32_MAX));
}
}

// add row id to the map
void add(const std::vector<RowLocation>& rss_row_ids) {
for (auto& item : rss_row_ids) {
if (item.row_id == -1) {
continue;
}
DCHECK(_segments_rowid_map.find(item.segment_id) != _segments_rowid_map.end() &&
_segments_rowid_map[item.segment_id].size() > item.row_id);
_segments_rowid_map[item.segment_id][item.row_id] = _cur_dst_segment_rowid++;
}
}

// get destination RowLocation
// return non-zero if the src RowLocation does not exist
int get(const RowLocation& src) const {
auto it = _segments_rowid_map.find(src.segment_id);
if (it == _segments_rowid_map.end()) {
return -1;
}
const auto& rowid_map = it->second;
if (src.row_id >= rowid_map.size() || UINT32_MAX == rowid_map[src.row_id]) {
return -1;
}

return rowid_map[src.row_id];
}

private:
// key: index indicates src segment.
// value: index indicates row id of source segment, value indicates row id of destination
// segment. UINT32_MAX indicates current row not exist.
std::map<uint32_t, std::vector<uint32_t>> _segments_rowid_map;

// dst rowset id
RowsetId _rowst_id;

// current rowid of dst segment
std::uint32_t _cur_dst_segment_rowid = 0;
};

} // namespace doris
1 change: 1 addition & 0 deletions be/src/vec/olap/vertical_block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params)
_reader_context.need_ordered_result = true; // TODO: should it be?
_reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
_reader_context.is_key_column_group = read_params.is_key_column_group;
_reader_context.record_rowids = read_params.record_rowids;
}

// build heap if key column iterator or build vertical merge iterator if value column
Expand Down
Loading

0 comments on commit 28143ff

Please sign in to comment.