Skip to content

Commit

Permalink
[fix](merge-on-write) segcompaction should process delete bitmap if n…
Browse files Browse the repository at this point in the history
…ecessary (apache#38369)

Issue Number: close #xxx

When loading data into a unique key table with a sequence column, some rows
in the current load job might be marked as deleted due to a lower sequence
value.
If there are many segments in such a load job, segcompaction might be
triggered. Segcompaction currently doesn't process the delete bitmap, which
causes a data correctness issue.

For example:
1. We have 4 segments in the current load job initially, and due to the seq
column, some rows are marked as deleted.
2. After segcompaction, if we don't process the delete bitmap, its
content still corresponds to the old segment layout, and rows
7, 14 and 15 are not correctly marked as deleted on the newly generated segment 1.
3. In this PR, we convert the old delete bitmap to fit the new segment layout.
It uses an approach similar to base/cumulative compaction to convert delete
bitmaps from the old layout to the new one, but the rowid conversion is simpler.
![whiteboard_exported_image-2](https://github.com/user-attachments/assets/a419b6a4-e583-457a-bf4e-56d9bd2a3544)
  • Loading branch information
zhannngchen committed Aug 21, 2024
1 parent 610f694 commit 115393e
Show file tree
Hide file tree
Showing 10 changed files with 1,100 additions and 19 deletions.
19 changes: 9 additions & 10 deletions be/src/olap/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,16 +307,12 @@ Status Merger::vertical_compact_one_group(
}

// for segcompaction
Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType reader_type,
TabletSchemaSPtr tablet_schema, bool is_key,
const std::vector<uint32_t>& column_group,
vectorized::RowSourcesBuffer* row_source_buf,
vectorized::VerticalBlockReader& src_block_reader,
segment_v2::SegmentWriter& dst_segment_writer,
int64_t max_rows_per_segment, Statistics* stats_output,
uint64_t* index_size, KeyBoundsPB& key_bounds) {
// build tablet reader
VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment;
Status Merger::vertical_compact_one_group(
TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema, bool is_key,
const std::vector<uint32_t>& column_group, vectorized::RowSourcesBuffer* row_source_buf,
vectorized::VerticalBlockReader& src_block_reader,
segment_v2::SegmentWriter& dst_segment_writer, Statistics* stats_output,
uint64_t* index_size, KeyBoundsPB& key_bounds, SimpleRowIdConversion* rowid_conversion) {
// TODO: record_rowids
vectorized::Block block = tablet_schema->create_block(column_group);
size_t output_rows = 0;
Expand All @@ -333,6 +329,9 @@ Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType rea
"failed to write block when merging rowsets of tablet " +
std::to_string(tablet->tablet_id()));

if (is_key && rowid_conversion != nullptr) {
rowid_conversion->add(src_block_reader.current_block_row_locations());
}
output_rows += block.rows();
block.clear_column_data();
}
Expand Down
6 changes: 4 additions & 2 deletions be/src/olap/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "io/io_common.h"
#include "olap/iterators.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/simple_rowid_conversion.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"

Expand Down Expand Up @@ -86,8 +87,9 @@ class Merger {
vectorized::RowSourcesBuffer* row_source_buf,
vectorized::VerticalBlockReader& src_block_reader,
segment_v2::SegmentWriter& dst_segment_writer,
int64_t max_rows_per_segment, Statistics* stats_output,
uint64_t* index_size, KeyBoundsPB& key_bounds);
Statistics* stats_output, uint64_t* index_size,
KeyBoundsPB& key_bounds,
SimpleRowIdConversion* rowid_conversion);

// for mow with cluster key table, the key group also contains cluster key columns.
// the `key_group_cluster_key_idxes` marks the positions of cluster key columns in key group.
Expand Down
34 changes: 32 additions & 2 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,12 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
if (is_large_segment) {
if (segid == _segcompacted_point) {
// skip large segments at the front
auto dst_seg_id = _num_segcompacted.load();
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
if (_segcompaction_worker->need_convert_delete_bitmap()) {
_segcompaction_worker->convert_segment_delete_bitmap(
_context.mow_context->delete_bitmap, segid, dst_seg_id);
}
continue;
} else {
// stop because we need consecutive segments
Expand All @@ -270,7 +275,13 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
}
if (s == 1) { // poor bachelor, let it go
VLOG_DEBUG << "only one candidate segment";
auto src_seg_id = _segcompacted_point.load();
auto dst_seg_id = _num_segcompacted.load();
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
if (_segcompaction_worker->need_convert_delete_bitmap()) {
_segcompaction_worker->convert_segment_delete_bitmap(
_context.mow_context->delete_bitmap, src_seg_id, dst_seg_id);
}
segments->clear();
return Status::OK();
}
Expand Down Expand Up @@ -454,15 +465,20 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
"code: {}",
_segcompaction_status.load());
}
if (!_is_segcompacted() || _segcompacted_point == _num_segment) {
if (!is_segcompacted() || _segcompacted_point == _num_segment) {
// no need if never segcompact before or all segcompacted
return Status::OK();
}
// currently we only rename remaining segments to reduce wait time
// so that transaction can be committed ASAP
VLOG_DEBUG << "segcompaction last few segments";
for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
auto dst_segid = _num_segcompacted.load();
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
if (_segcompaction_worker->need_convert_delete_bitmap()) {
_segcompaction_worker->convert_segment_delete_bitmap(
_context.mow_context->delete_bitmap, segid, dst_segid);
}
}
return Status::OK();
}
Expand Down Expand Up @@ -584,6 +600,20 @@ Status BetaRowsetWriter::_close_file_writers() {
RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker->get_file_writer()->close(),
"close segment compaction worker failed");
}
// process delete bitmap for mow table
if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) {
auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap();
// which means the segment compaction is triggered
if (converted_delete_bitmap != nullptr) {
RowsetIdUnorderedSet rowsetids;
rowsetids.insert(rowset_id());
context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(),
rowsetids);
context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0},
{rowset_id(), UINT32_MAX, INT64_MAX});
context().mow_context->delete_bitmap->merge(*converted_delete_bitmap);
}
}
}
return Status::OK();
}
Expand Down Expand Up @@ -616,7 +646,7 @@ int64_t BaseBetaRowsetWriter::_num_seg() const {
}

int64_t BetaRowsetWriter::_num_seg() const {
return _is_segcompacted() ? _num_segcompacted : _num_segment;
return is_segcompacted() ? _num_segcompacted : _num_segment;
}

// update tablet schema when meet variant columns, before commit_txn
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size,
KeyBoundsPB& key_bounds);

// Reports whether the segcompacted-segment counter is positive.
bool is_segcompacted() const {
    return _num_segcompacted.load() > 0;
}

private:
Status _generate_delete_bitmap(int32_t segment_id) override;

Expand All @@ -220,7 +222,6 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter {
Status _segcompaction_rename_last_segments();
Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id);
Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments);
bool _is_segcompacted() const { return _num_segcompacted > 0; }
bool _check_and_set_is_doing_segcompaction();
Status _rename_compacted_segments(int64_t begin, int64_t end);
Status _rename_compacted_segment_plain(uint64_t seg_id);
Expand Down
67 changes: 65 additions & 2 deletions be/src/olap/rowset/segcompaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,14 @@ Status SegcompactionWorker::_get_segcompaction_reader(
std::vector<uint32_t>& return_columns,
std::unique_ptr<vectorized::VerticalBlockReader>* reader) {
auto ctx = _writer->_context;
bool record_rowids = need_convert_delete_bitmap() && is_key;
StorageReadOptions read_options;
read_options.stats = stat;
read_options.use_page_cache = false;
read_options.tablet_schema = ctx.tablet_schema;
read_options.record_rowids = record_rowids;
std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
std::map<uint32_t, uint32_t> segment_rows;
for (auto& seg_ptr : *segments) {
std::unique_ptr<RowwiseIterator> iter;
auto s = seg_ptr->new_iterator(schema, read_options, &iter);
Expand All @@ -89,6 +92,10 @@ Status SegcompactionWorker::_get_segcompaction_reader(
s.to_string());
}
seg_iterators.push_back(std::move(iter));
segment_rows.emplace(seg_ptr->id(), seg_ptr->num_rows());
}
if (record_rowids && _rowid_conversion != nullptr) {
_rowid_conversion->reset_segment_map(segment_rows);
}

*reader = std::unique_ptr<vectorized::VerticalBlockReader> {
Expand All @@ -103,6 +110,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
reader_params.return_columns = return_columns;
reader_params.is_key_column_group = is_key;
reader_params.use_page_cache = false;
reader_params.record_rowids = record_rowids;
return (*reader)->init(reader_params, nullptr);
}

Expand Down Expand Up @@ -232,6 +240,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt

DCHECK(ctx.tablet);
auto tablet = std::static_pointer_cast<Tablet>(ctx.tablet);
if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) {
_rowid_conversion = std::make_unique<SimpleRowIdConversion>(_writer->rowset_id());
}

std::vector<std::vector<uint32_t>> column_groups;
Merger::vertical_split_columns(ctx.tablet_schema, &column_groups);
Expand Down Expand Up @@ -262,8 +273,8 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
Merger::Statistics merger_stats;
RETURN_IF_ERROR(Merger::vertical_compact_one_group(
tablet, ReaderType::READER_SEGMENT_COMPACTION, ctx.tablet_schema, is_key,
column_ids, &row_sources_buf, *reader, *writer, INT_MAX, &merger_stats, &index_size,
key_bounds));
column_ids, &row_sources_buf, *reader, *writer, &merger_stats, &index_size,
key_bounds, _rowid_conversion.get()));
total_index_size += index_size;
if (is_key) {
RETURN_IF_ERROR(row_sources_buf.flush());
Expand All @@ -289,6 +300,10 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
}

RETURN_IF_ERROR(_delete_original_segments(begin, end));
if (_rowid_conversion != nullptr) {
convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, end,
_writer->_num_segcompacted);
}
RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end));

if (VLOG_DEBUG_IS_ON) {
Expand Down Expand Up @@ -349,6 +364,54 @@ void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segm
_is_compacting_state_mutable = true;
}

// Decide whether delete bitmaps have to be converted for this writer.
// Returns true only when a writer is attached, its context has a tablet, and that
// tablet is a UNIQUE_KEYS table with merge-on-write enabled and a sequence column.
bool SegcompactionWorker::need_convert_delete_bitmap() {
    if (_writer == nullptr) {
        return false;
    }
    auto tablet = _writer->context().tablet;
    if (tablet == nullptr) {
        return false;
    }
    // The checks below run in the same order as a short-circuit && chain.
    if (tablet->keys_type() != KeysType::UNIQUE_KEYS) {
        return false;
    }
    if (!tablet->enable_unique_key_merge_on_write()) {
        return false;
    }
    return tablet->tablet_schema()->has_sequence_col();
}

// Carry the delete bitmap entry of ONE source segment over to a new segment id.
//
// Looks up the entry keyed by {rowset id, src_seg_id, TEMP_VERSION_COMMON} in
// `src_delete_bitmap` and stores it under {rowset id, dest_seg_id, TEMP_VERSION_COMMON}
// in the lazily created converted delete bitmap. Row ids are kept as they are.
//
// @param src_delete_bitmap  bitmap that still describes the old segment layout
// @param src_seg_id         segment id in the old layout
// @param dest_seg_id        segment id in the new layout
void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
                                                        uint32_t src_seg_id, uint32_t dest_seg_id) {
    // lazy init
    if (nullptr == _converted_delete_bitmap) {
        _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
    }
    auto rowset_id = _writer->context().rowset_id;
    const auto* seg_map =
            src_delete_bitmap->get({rowset_id, src_seg_id, DeleteBitmap::TEMP_VERSION_COMMON});
    if (seg_map == nullptr) {
        // get() hands back a pointer; presumably it is null when the source segment has
        // no entry (no row marked deleted). Dereferencing it would crash, and there is
        // nothing to carry over in that case.
        return;
    }
    _converted_delete_bitmap->set({rowset_id, dest_seg_id, DeleteBitmap::TEMP_VERSION_COMMON},
                                  *seg_map);
}

// Translate the delete bitmap entries of the compacted source segments
// [src_begin, src_end] (both inclusive) into entries of the single destination segment.
//
// For every row id found in a source segment's entry, the destination row id is looked
// up through `_rowid_conversion`; rows for which the conversion reports a negative value
// are skipped. Results are accumulated in the lazily created converted delete bitmap
// under {rowset id, dst_seg_id, TEMP_VERSION_COMMON}.
//
// NOTE: `_rowid_conversion` is dereferenced here; the caller visible in this file only
// invokes this overload after checking that it is non-null.
//
// @param src_delete_bitmap  bitmap that still describes the old segment layout
// @param src_begin          first source segment id (inclusive)
// @param src_end            last source segment id (inclusive)
// @param dst_seg_id         id of the segment produced by segcompaction
void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
                                                        uint32_t src_begin, uint32_t src_end,
                                                        uint32_t dst_seg_id) {
    // lazy init
    if (nullptr == _converted_delete_bitmap) {
        _converted_delete_bitmap = std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
    }
    auto rowset_id = _writer->context().rowset_id;
    RowLocation src(rowset_id, 0, 0);
    for (uint32_t seg_id = src_begin; seg_id <= src_end; seg_id++) {
        const auto* seg_map =
                src_delete_bitmap->get({rowset_id, seg_id, DeleteBitmap::TEMP_VERSION_COMMON});
        if (seg_map == nullptr) {
            // get() hands back a pointer; presumably it is null when this segment has no
            // entry (no row marked deleted). Skip it instead of dereferencing null.
            continue;
        }
        src.segment_id = seg_id;
        for (unsigned int row_id : *seg_map) {
            src.row_id = row_id;
            auto dst_row_id = _rowid_conversion->get(src);
            if (dst_row_id < 0) {
                // the source row has no counterpart in the destination segment
                continue;
            }
            _converted_delete_bitmap->add(
                    {rowset_id, dst_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, dst_row_id);
        }
    }
}

bool SegcompactionWorker::cancel() {
// return true if the task is cancellable (actual compaction is not started)
// return false when the task is not cancellable (it is in the middle of segcompaction)
Expand Down
13 changes: 13 additions & 0 deletions be/src/olap/rowset/segcompaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/merger.h"
#include "olap/simple_rowid_conversion.h"
#include "olap/tablet.h"
#include "segment_v2/segment.h"

Expand Down Expand Up @@ -51,6 +52,14 @@ class SegcompactionWorker {

void compact_segments(SegCompactionCandidatesSharedPtr segments);

// Whether delete bitmaps must be converted for this writer: true only when a writer is
// attached and its tablet is a unique-key, merge-on-write table with a sequence column.
bool need_convert_delete_bitmap();

// Carry the delete bitmap entry of a single source segment over to `dest_seg_id` in the
// converted delete bitmap (row ids unchanged). The callers visible in the rowset writer
// use it for segments that are renamed without being merged.
void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, uint32_t src_seg_id,
uint32_t dest_seg_id);
// Translate the delete bitmap entries of source segments [src_begin, src_end] (inclusive)
// into row ids of the single compacted segment `dest_seg_id`, using the rowid conversion
// recorded during compaction.
void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, uint32_t src_begin,
uint32_t src_end, uint32_t dest_seg_id);
// Returns the converted delete bitmap. It is created lazily by the convert calls above,
// so the returned pointer is null until one of them has run.
DeleteBitmapPtr get_converted_delete_bitmap() { return _converted_delete_bitmap; }

io::FileWriterPtr& get_file_writer() { return _file_writer; }

// set the cancel flag, tasks already started will not be cancelled.
Expand Down Expand Up @@ -78,6 +87,10 @@ class SegcompactionWorker {
BetaRowsetWriter* _writer = nullptr;
io::FileWriterPtr _file_writer;

// for unique key mow table
std::unique_ptr<SimpleRowIdConversion> _rowid_conversion;
DeleteBitmapPtr _converted_delete_bitmap;

// the state is not mutable when 1)actual compaction operation started or 2) cancelled
std::atomic<bool> _is_compacting_state_mutable = true;
};
Expand Down
84 changes: 84 additions & 0 deletions be/src/olap/simple_rowid_conversion.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <map>
#include <vector>

#include "olap/olap_common.h"
#include "olap/utils.h"

namespace doris {

// Simple verion of rowid conversion, for segcompaction
// convert rows from several segments to rows in 1 segment
class SimpleRowIdConversion {
public:
SimpleRowIdConversion(const RowsetId& rowset_id) : _rowst_id(rowset_id) {};
~SimpleRowIdConversion() = default;

// resize segment rowid map to its rows num
void reset_segment_map(const std::map<uint32_t, uint32_t>& num_rows) {
_cur_dst_segment_rowid = 0;
for (auto seg_rows : num_rows) {
_segments_rowid_map.emplace(seg_rows.first,
std::vector<uint32_t>(seg_rows.second, UINT32_MAX));
}
}

// add row id to the map
void add(const std::vector<RowLocation>& rss_row_ids) {
for (auto& item : rss_row_ids) {
if (item.row_id == -1) {
continue;
}
DCHECK(_segments_rowid_map.find(item.segment_id) != _segments_rowid_map.end() &&
_segments_rowid_map[item.segment_id].size() > item.row_id);
_segments_rowid_map[item.segment_id][item.row_id] = _cur_dst_segment_rowid++;
}
}

// get destination RowLocation
// return non-zero if the src RowLocation does not exist
int get(const RowLocation& src) const {
auto it = _segments_rowid_map.find(src.segment_id);
if (it == _segments_rowid_map.end()) {
return -1;
}
const auto& rowid_map = it->second;
if (src.row_id >= rowid_map.size() || UINT32_MAX == rowid_map[src.row_id]) {
return -1;
}

return rowid_map[src.row_id];
}

private:
// key: index indicates src segment.
// value: index indicates row id of source segment, value indicates row id of destination
// segment. UINT32_MAX indicates current row not exist.
std::map<uint32_t, std::vector<uint32_t>> _segments_rowid_map;

// dst rowset id
RowsetId _rowst_id;

// current rowid of dst segment
std::uint32_t _cur_dst_segment_rowid = 0;
};

} // namespace doris
1 change: 1 addition & 0 deletions be/src/vec/olap/vertical_block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params,
_reader_context.need_ordered_result = true; // TODO: should it be?
_reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
_reader_context.is_key_column_group = read_params.is_key_column_group;
_reader_context.record_rowids = read_params.record_rowids;
}

// build heap if key column iterator or build vertical merge iterator if value column
Expand Down
Loading

0 comments on commit 115393e

Please sign in to comment.