diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index b2e789b5b5a7c1..216a791106726c 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -307,16 +307,12 @@ Status Merger::vertical_compact_one_group( } // for segcompaction -Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType reader_type, - TabletSchemaSPtr tablet_schema, bool is_key, - const std::vector& column_group, - vectorized::RowSourcesBuffer* row_source_buf, - vectorized::VerticalBlockReader& src_block_reader, - segment_v2::SegmentWriter& dst_segment_writer, - int64_t max_rows_per_segment, Statistics* stats_output, - uint64_t* index_size, KeyBoundsPB& key_bounds) { - // build tablet reader - VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment; +Status Merger::vertical_compact_one_group( + TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema, bool is_key, + const std::vector& column_group, vectorized::RowSourcesBuffer* row_source_buf, + vectorized::VerticalBlockReader& src_block_reader, + segment_v2::SegmentWriter& dst_segment_writer, Statistics* stats_output, + uint64_t* index_size, KeyBoundsPB& key_bounds, SimpleRowIdConversion* rowid_conversion) { // TODO: record_rowids vectorized::Block block = tablet_schema->create_block(column_group); size_t output_rows = 0; @@ -333,6 +329,9 @@ Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType rea "failed to write block when merging rowsets of tablet " + std::to_string(tablet->tablet_id())); + if (is_key && rowid_conversion != nullptr) { + rowid_conversion->add(src_block_reader.current_block_row_locations()); + } output_rows += block.rows(); block.clear_column_data(); } diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h index 49ca1e5227fe6e..e5d35001eeaa43 100644 --- a/be/src/olap/merger.h +++ b/be/src/olap/merger.h @@ -25,6 +25,7 @@ #include "io/io_common.h" #include "olap/iterators.h" #include 
"olap/rowset/rowset_reader.h" +#include "olap/simple_rowid_conversion.h" #include "olap/tablet.h" #include "olap/tablet_schema.h" @@ -86,8 +87,9 @@ class Merger { vectorized::RowSourcesBuffer* row_source_buf, vectorized::VerticalBlockReader& src_block_reader, segment_v2::SegmentWriter& dst_segment_writer, - int64_t max_rows_per_segment, Statistics* stats_output, - uint64_t* index_size, KeyBoundsPB& key_bounds); + Statistics* stats_output, uint64_t* index_size, + KeyBoundsPB& key_bounds, + SimpleRowIdConversion* rowid_conversion); // for mow with cluster key table, the key group also contains cluster key columns. // the `key_group_cluster_key_idxes` marks the positions of cluster key columns in key group. diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 101571f0256831..ebe79c7e3d2475 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -245,7 +245,12 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment( if (is_large_segment) { if (segid == _segcompacted_point) { // skip large segments at the front + auto dst_seg_id = _num_segcompacted.load(); RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); + if (_segcompaction_worker->need_convert_delete_bitmap()) { + _segcompaction_worker->convert_segment_delete_bitmap( + _context.mow_context->delete_bitmap, segid, dst_seg_id); + } continue; } else { // stop because we need consecutive segments @@ -270,7 +275,13 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment( } if (s == 1) { // poor bachelor, let it go VLOG_DEBUG << "only one candidate segment"; + auto src_seg_id = _segcompacted_point.load(); + auto dst_seg_id = _num_segcompacted.load(); RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); + if (_segcompaction_worker->need_convert_delete_bitmap()) { + _segcompaction_worker->convert_segment_delete_bitmap( + _context.mow_context->delete_bitmap, 
src_seg_id, dst_seg_id); + } segments->clear(); return Status::OK(); } @@ -454,7 +465,7 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() { "code: {}", _segcompaction_status.load()); } - if (!_is_segcompacted() || _segcompacted_point == _num_segment) { + if (!is_segcompacted() || _segcompacted_point == _num_segment) { // no need if never segcompact before or all segcompacted return Status::OK(); } @@ -462,7 +473,12 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() { // so that transaction can be committed ASAP VLOG_DEBUG << "segcompaction last few segments"; for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) { + auto dst_segid = _num_segcompacted.load(); RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); + if (_segcompaction_worker->need_convert_delete_bitmap()) { + _segcompaction_worker->convert_segment_delete_bitmap( + _context.mow_context->delete_bitmap, segid, dst_segid); + } } return Status::OK(); } @@ -584,6 +600,20 @@ Status BetaRowsetWriter::_close_file_writers() { RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker->get_file_writer()->close(), "close segment compaction worker failed"); } + // process delete bitmap for mow table + if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) { + auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap(); + // which means the segment compaction is triggered + if (converted_delete_bitmap != nullptr) { + RowsetIdUnorderedSet rowsetids; + rowsetids.insert(rowset_id()); + context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(), + rowsetids); + context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0}, + {rowset_id(), UINT32_MAX, INT64_MAX}); + context().mow_context->delete_bitmap->merge(*converted_delete_bitmap); + } + } } return Status::OK(); } @@ -616,7 +646,7 @@ int64_t BaseBetaRowsetWriter::_num_seg() const { } int64_t BetaRowsetWriter::_num_seg() const { - 
return _is_segcompacted() ? _num_segcompacted : _num_segment; + return is_segcompacted() ? _num_segcompacted : _num_segment; } // update tablet schema when meet variant columns, before commit_txn diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 3e285c7e508fb7..b897d96c9b2293 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -205,6 +205,8 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter { std::unique_ptr* writer, uint64_t index_size, KeyBoundsPB& key_bounds); + bool is_segcompacted() const { return _num_segcompacted > 0; } + private: Status _generate_delete_bitmap(int32_t segment_id) override; @@ -220,7 +222,6 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter { Status _segcompaction_rename_last_segments(); Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id); Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments); - bool _is_segcompacted() const { return _num_segcompacted > 0; } bool _check_and_set_is_doing_segcompaction(); Status _rename_compacted_segments(int64_t begin, int64_t end); Status _rename_compacted_segment_plain(uint64_t seg_id); diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index 1c152c75c0fb43..d6754ca224dcc4 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -76,11 +76,14 @@ Status SegcompactionWorker::_get_segcompaction_reader( std::vector& return_columns, std::unique_ptr* reader) { auto ctx = _writer->_context; + bool record_rowids = need_convert_delete_bitmap() && is_key; StorageReadOptions read_options; read_options.stats = stat; read_options.use_page_cache = false; read_options.tablet_schema = ctx.tablet_schema; + read_options.record_rowids = record_rowids; std::vector> seg_iterators; + std::map segment_rows; for (auto& seg_ptr : *segments) { std::unique_ptr iter; auto s 
= seg_ptr->new_iterator(schema, read_options, &iter); @@ -89,6 +92,10 @@ Status SegcompactionWorker::_get_segcompaction_reader( s.to_string()); } seg_iterators.push_back(std::move(iter)); + segment_rows.emplace(seg_ptr->id(), seg_ptr->num_rows()); + } + if (record_rowids && _rowid_conversion != nullptr) { + _rowid_conversion->reset_segment_map(segment_rows); } *reader = std::unique_ptr { @@ -103,6 +110,7 @@ Status SegcompactionWorker::_get_segcompaction_reader( reader_params.return_columns = return_columns; reader_params.is_key_column_group = is_key; reader_params.use_page_cache = false; + reader_params.record_rowids = record_rowids; return (*reader)->init(reader_params, nullptr); } @@ -232,6 +240,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt DCHECK(ctx.tablet); auto tablet = std::static_pointer_cast(ctx.tablet); + if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) { + _rowid_conversion = std::make_unique(_writer->rowset_id()); + } std::vector> column_groups; Merger::vertical_split_columns(ctx.tablet_schema, &column_groups); @@ -262,8 +273,8 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt Merger::Statistics merger_stats; RETURN_IF_ERROR(Merger::vertical_compact_one_group( tablet, ReaderType::READER_SEGMENT_COMPACTION, ctx.tablet_schema, is_key, - column_ids, &row_sources_buf, *reader, *writer, INT_MAX, &merger_stats, &index_size, - key_bounds)); + column_ids, &row_sources_buf, *reader, *writer, &merger_stats, &index_size, + key_bounds, _rowid_conversion.get())); total_index_size += index_size; if (is_key) { RETURN_IF_ERROR(row_sources_buf.flush()); @@ -289,6 +300,10 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt } RETURN_IF_ERROR(_delete_original_segments(begin, end)); + if (_rowid_conversion != nullptr) { + convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, end, + _writer->_num_segcompacted); + } 
RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end)); if (VLOG_DEBUG_IS_ON) { @@ -349,6 +364,54 @@ void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segm _is_compacting_state_mutable = true; } +bool SegcompactionWorker::need_convert_delete_bitmap() { + if (_writer == nullptr) { + return false; + } + auto tablet = _writer->context().tablet; + return tablet != nullptr && tablet->keys_type() == KeysType::UNIQUE_KEYS && + tablet->enable_unique_key_merge_on_write() && + tablet->tablet_schema()->has_sequence_col(); +} + +void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, + uint32_t src_seg_id, uint32_t dest_seg_id) { + // lazy init + if (nullptr == _converted_delete_bitmap) { + _converted_delete_bitmap = std::make_shared(_writer->context().tablet_id); + } + auto rowset_id = _writer->context().rowset_id; + const auto* seg_map = + src_delete_bitmap->get({rowset_id, src_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}); + _converted_delete_bitmap->set({rowset_id, dest_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, + *seg_map); +} + +void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, + uint32_t src_begin, uint32_t src_end, + uint32_t dst_seg_id) { + // lazy init + if (nullptr == _converted_delete_bitmap) { + _converted_delete_bitmap = std::make_shared(_writer->context().tablet_id); + } + auto rowset_id = _writer->context().rowset_id; + RowLocation src(rowset_id, 0, 0); + for (uint32_t seg_id = src_begin; seg_id <= src_end; seg_id++) { + const auto* seg_map = + src_delete_bitmap->get({rowset_id, seg_id, DeleteBitmap::TEMP_VERSION_COMMON}); + src.segment_id = seg_id; + for (unsigned int row_id : *seg_map) { + src.row_id = row_id; + auto dst_row_id = _rowid_conversion->get(src); + if (dst_row_id < 0) { + continue; + } + _converted_delete_bitmap->add( + {rowset_id, dst_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, dst_row_id); + } + } +} + bool SegcompactionWorker::cancel() 
{ // return true if the task is canncellable (actual compaction is not started) // return false when the task is not cancellable (it is in the middle of segcompaction) diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index 5aef89992d30b8..67dd6889aadd72 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -23,6 +23,7 @@ #include "common/status.h" #include "io/fs/file_reader_writer_fwd.h" #include "olap/merger.h" +#include "olap/simple_rowid_conversion.h" #include "olap/tablet.h" #include "segment_v2/segment.h" @@ -51,6 +52,14 @@ class SegcompactionWorker { void compact_segments(SegCompactionCandidatesSharedPtr segments); + bool need_convert_delete_bitmap(); + + void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, uint32_t src_seg_id, + uint32_t dest_seg_id); + void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, uint32_t src_begin, + uint32_t src_end, uint32_t dest_seg_id); + DeleteBitmapPtr get_converted_delete_bitmap() { return _converted_delete_bitmap; } + io::FileWriterPtr& get_file_writer() { return _file_writer; } // set the cancel flag, tasks already started will not be cancelled. @@ -78,6 +87,10 @@ class SegcompactionWorker { BetaRowsetWriter* _writer = nullptr; io::FileWriterPtr _file_writer; + // for unique key mow table + std::unique_ptr _rowid_conversion; + DeleteBitmapPtr _converted_delete_bitmap; + // the state is not mutable when 1)actual compaction operation started or 2) cancelled std::atomic _is_compacting_state_mutable = true; }; diff --git a/be/src/olap/simple_rowid_conversion.h b/be/src/olap/simple_rowid_conversion.h new file mode 100644 index 00000000000000..1a89b01838fe8c --- /dev/null +++ b/be/src/olap/simple_rowid_conversion.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "olap/olap_common.h" +#include "olap/utils.h" + +namespace doris { + +// Simple version of rowid conversion, for segcompaction +// convert rows from several segments to rows in 1 segment +class SimpleRowIdConversion { +public: + SimpleRowIdConversion(const RowsetId& rowset_id) : _rowst_id(rowset_id) {}; + ~SimpleRowIdConversion() = default; + + // resize segment rowid map to its rows num + void reset_segment_map(const std::map& num_rows) { + _cur_dst_segment_rowid = 0; + for (auto seg_rows : num_rows) { + _segments_rowid_map.emplace(seg_rows.first, + std::vector(seg_rows.second, UINT32_MAX)); + } + } + + // add row id to the map + void add(const std::vector& rss_row_ids) { + for (auto& item : rss_row_ids) { + if (item.row_id == -1) { + continue; + } + DCHECK(_segments_rowid_map.find(item.segment_id) != _segments_rowid_map.end() && + _segments_rowid_map[item.segment_id].size() > item.row_id); + _segments_rowid_map[item.segment_id][item.row_id] = _cur_dst_segment_rowid++; + } + } + + // get destination RowLocation + // return -1 if the src RowLocation does not exist + int get(const RowLocation& src) const { + auto it = _segments_rowid_map.find(src.segment_id); + if (it == _segments_rowid_map.end()) { + return -1; + } + 
const auto& rowid_map = it->second; + if (src.row_id >= rowid_map.size() || UINT32_MAX == rowid_map[src.row_id]) { + return -1; + } + + return rowid_map[src.row_id]; + } + +private: + // key: index indicates src segment. + // value: index indicates row id of source segment, value indicates row id of destination + // segment. UINT32_MAX indicates current row not exist. + std::map> _segments_rowid_map; + + // dst rowset id + RowsetId _rowst_id; + + // current rowid of dst segment + std::uint32_t _cur_dst_segment_rowid = 0; +}; + +} // namespace doris diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 58a2332d5a8d5d..c16cb1f1bc23cf 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -132,6 +132,7 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params, _reader_context.need_ordered_result = true; // TODO: should it be? _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS; _reader_context.is_key_column_group = read_params.is_key_column_group; + _reader_context.record_rowids = read_params.record_rowids; } // build heap if key column iterator or build vertical merge iterator if value column diff --git a/be/test/common/status_test.cpp b/be/test/common/status_test.cpp index c1197dad0b1628..e5477db1127371 100644 --- a/be/test/common/status_test.cpp +++ b/be/test/common/status_test.cpp @@ -50,8 +50,6 @@ TEST_F(StatusTest, OK) { TEST_F(StatusTest, TStatusCodeWithStatus) { // The definition in status.h //extern ErrorCode::ErrorCodeState error_states[ErrorCode::MAX_ERROR_CODE_DEFINE_NUM]; - extern ErrorCode::ErrorCodeState error_states; - extern ErrorCode::ErrorCodeInitializer error_code_init; // The definition in Status_types.h extern const std::map _TStatusCode_VALUES_TO_NAMES; ErrorCode::error_code_init.check_init(); diff --git a/be/test/olap/segcompaction_mow_test.cpp b/be/test/olap/segcompaction_mow_test.cpp new file mode 100644 
index 00000000000000..41e8ef74ed61cb --- /dev/null +++ b/be/test/olap/segcompaction_mow_test.cpp @@ -0,0 +1,890 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include +#include + +#include "common/config.h" +#include "gen_cpp/AgentService_types.h" +#include "gen_cpp/olap_file.pb.h" +#include "io/fs/local_file_system.h" +#include "olap/data_dir.h" +#include "olap/row_cursor.h" +#include "olap/rowset/beta_rowset_reader.h" +#include "olap/rowset/beta_rowset_writer.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/rowset_reader_context.h" +#include "olap/rowset/rowset_writer.h" +#include "olap/rowset/rowset_writer_context.h" +#include "olap/storage_engine.h" +#include "olap/tablet_meta.h" +#include "olap/tablet_schema.h" +#include "olap/utils.h" +#include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker.h" +#include "util/slice.h" + +namespace doris { +using namespace ErrorCode; + +static const uint32_t MAX_PATH_LEN = 1024; +static const uint32_t TABLET_ID = 12345; +static StorageEngine* s_engine; +static const std::string lTestDir = "./data_test/data/segcompaction_mow_test"; + +class SegCompactionMoWTest : public ::testing::TestWithParam { +public: + 
SegCompactionMoWTest() = default; + + void SetUp() { + config::enable_segcompaction = true; + config::tablet_map_shard_size = 1; + config::txn_map_shard_size = 1; + config::txn_shard_size = 1; + + char buffer[MAX_PATH_LEN]; + EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); + config::storage_root_path = std::string(buffer) + "/data_test"; + + auto st = io::global_local_filesystem()->delete_directory(config::storage_root_path); + ASSERT_TRUE(st.ok()) << st; + st = io::global_local_filesystem()->create_directory(config::storage_root_path); + ASSERT_TRUE(st.ok()) << st; + + std::vector paths; + paths.emplace_back(config::storage_root_path, -1); + + doris::EngineOptions options; + options.store_paths = paths; + + auto engine = std::make_unique(options); + s_engine = engine.get(); + ExecEnv::GetInstance()->set_storage_engine(std::move(engine)); + + Status s = s_engine->open(); + EXPECT_TRUE(s.ok()) << s.to_string(); + + _data_dir = std::make_unique(*s_engine, lTestDir); + static_cast(_data_dir->update_capacity()); + + EXPECT_TRUE(io::global_local_filesystem()->create_directory(lTestDir).ok()); + + s = s_engine->start_bg_threads(); + EXPECT_TRUE(s.ok()) << s.to_string(); + } + + void TearDown() { config::enable_segcompaction = false; } + +protected: + OlapReaderStatistics _stats; + + bool check_dir(std::vector& vec) { + std::vector result; + for (const auto& entry : std::filesystem::directory_iterator(lTestDir)) { + result.push_back(std::filesystem::path(entry.path()).filename()); + } + + LOG(INFO) << "expected ls:" << std::endl; + for (auto& i : vec) { + LOG(INFO) << i; + } + LOG(INFO) << "acutal ls:" << std::endl; + for (auto& i : result) { + LOG(INFO) << i; + } + + if (result.size() != vec.size()) { + return false; + } else { + for (auto& i : vec) { + if (std::find(result.begin(), result.end(), i) == result.end()) { + return false; + } + } + } + return true; + } + + // (k1 int, k2 varchar(20), k3 int) keys (k1, k2) + void create_tablet_schema(TabletSchemaSPtr 
tablet_schema) { + TabletSchemaPB tablet_schema_pb; + tablet_schema_pb.set_keys_type(UNIQUE_KEYS); + tablet_schema_pb.set_num_short_key_columns(2); + tablet_schema_pb.set_num_rows_per_row_block(1024); + tablet_schema_pb.set_compress_kind(COMPRESS_NONE); + tablet_schema_pb.set_next_column_unique_id(5); + // add seq column so that segcompaction will process delete bitmap + tablet_schema_pb.set_sequence_col_idx(3); + + ColumnPB* column_1 = tablet_schema_pb.add_column(); + column_1->set_unique_id(1); + column_1->set_name("k1"); + column_1->set_type("INT"); + column_1->set_is_key(true); + column_1->set_length(4); + column_1->set_index_length(4); + column_1->set_is_nullable(true); + column_1->set_is_bf_column(false); + + ColumnPB* column_2 = tablet_schema_pb.add_column(); + column_2->set_unique_id(2); + column_2->set_name("k2"); + column_2->set_type( + "INT"); // TODO change to varchar(20) when dict encoding for string is supported + column_2->set_length(4); + column_2->set_index_length(4); + column_2->set_is_nullable(true); + column_2->set_is_key(true); + column_2->set_is_nullable(true); + column_2->set_is_bf_column(false); + + ColumnPB* v_column = tablet_schema_pb.add_column(); + v_column->set_unique_id(3); + v_column->set_name(fmt::format("v1")); + v_column->set_type("INT"); + v_column->set_length(4); + v_column->set_is_key(false); + v_column->set_is_nullable(false); + v_column->set_is_bf_column(false); + v_column->set_default_value(std::to_string(10)); + v_column->set_aggregation("NONE"); + + ColumnPB* seq_column = tablet_schema_pb.add_column(); + seq_column->set_unique_id(4); + seq_column->set_name(SEQUENCE_COL); + seq_column->set_type("INT"); + seq_column->set_length(4); + seq_column->set_is_key(false); + seq_column->set_is_nullable(false); + seq_column->set_is_bf_column(false); + seq_column->set_default_value(std::to_string(10)); + seq_column->set_aggregation("NONE"); + + tablet_schema->init_from_pb(tablet_schema_pb); + } + + // use different id to avoid conflict 
+ void create_rowset_writer_context(int64_t id, TabletSchemaSPtr tablet_schema, + RowsetWriterContext* rowset_writer_context) { + RowsetId rowset_id; + rowset_id.init(id); + // rowset_writer_context->data_dir = _data_dir.get(); + rowset_writer_context->rowset_id = rowset_id; + rowset_writer_context->tablet_id = TABLET_ID; + rowset_writer_context->tablet_schema_hash = 1111; + rowset_writer_context->partition_id = 10; + rowset_writer_context->rowset_type = BETA_ROWSET; + rowset_writer_context->tablet_path = lTestDir; + rowset_writer_context->rowset_state = VISIBLE; + rowset_writer_context->tablet_schema = tablet_schema; + rowset_writer_context->version.first = 10; + rowset_writer_context->version.second = 10; + + TabletMetaSharedPtr tablet_meta = std::make_shared(); + tablet_meta->_tablet_id = TABLET_ID; + static_cast(tablet_meta->set_partition_id(10000)); + tablet_meta->_schema = tablet_schema; + tablet_meta->_enable_unique_key_merge_on_write = true; + auto tablet = std::make_shared(*s_engine, tablet_meta, _data_dir.get(), "test_str"); + // tablet->key + rowset_writer_context->tablet = tablet; + } + + void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext& context, + RowsetReaderSharedPtr* result) { + auto s = rowset->create_reader(result); + EXPECT_EQ(Status::OK(), s); + EXPECT_TRUE(*result != nullptr); + + s = (*result)->init(&context); + EXPECT_EQ(Status::OK(), s); + } + + bool check_data_read_with_delete_bitmap(TabletSchemaSPtr tablet_schema, + DeleteBitmapPtr delete_bitmap, RowsetSharedPtr rowset, + int expect_total_rows, int rows_mark_deleted) { + RowsetReaderContext reader_context; + reader_context.tablet_schema = tablet_schema; + // use this type to avoid cache from other ut + reader_context.reader_type = ReaderType::READER_QUERY; + reader_context.need_ordered_result = true; + std::vector return_columns = {0, 1, 2}; + reader_context.return_columns = &return_columns; + reader_context.stats = &_stats; + reader_context.delete_bitmap = 
delete_bitmap.get(); + + std::vector segment_num_rows; + Status s; + + // without predicates + { + RowsetReaderSharedPtr rowset_reader; + create_and_init_rowset_reader(rowset.get(), reader_context, &rowset_reader); + + uint32_t num_rows_read = 0; + bool eof = false; + while (!eof) { + std::shared_ptr output_block = + std::make_shared( + tablet_schema->create_block(return_columns)); + s = rowset_reader->next_block(output_block.get()); + if (s != Status::OK()) { + eof = true; + } + EXPECT_EQ(return_columns.size(), output_block->columns()); + for (int i = 0; i < output_block->rows(); ++i) { + vectorized::ColumnPtr col0 = output_block->get_by_position(0).column; + vectorized::ColumnPtr col1 = output_block->get_by_position(1).column; + vectorized::ColumnPtr col2 = output_block->get_by_position(2).column; + auto field1 = (*col0)[i]; + auto field2 = (*col1)[i]; + auto field3 = (*col2)[i]; + uint32_t k1 = *reinterpret_cast((char*)(&field1)); + uint32_t k2 = *reinterpret_cast((char*)(&field2)); + uint32_t v3 = *reinterpret_cast((char*)(&field3)); + EXPECT_EQ(100 * v3 + k2, k1); + EXPECT_TRUE(v3 % 3 != 0); // all v3%3==0 is deleted + num_rows_read++; + } + output_block->clear(); + } + EXPECT_EQ(Status::Error(""), s); + EXPECT_EQ(rowset->rowset_meta()->num_rows(), expect_total_rows); + EXPECT_EQ(num_rows_read, expect_total_rows - rows_mark_deleted); + EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok()); + size_t total_num_rows = 0; + for (const auto& i : segment_num_rows) { + total_num_rows += i; + } + EXPECT_EQ(total_num_rows, expect_total_rows); + } + return true; + } + +private: + std::unique_ptr _data_dir; +}; + +TEST_P(SegCompactionMoWTest, SegCompactionThenRead) { + std::string delete_ratio = GetParam(); + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + const int num_segments = 15; + const uint32_t rows_per_segment = 4096; + 
config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + config::segcompaction_batch_size = 10; + std::vector segment_num_rows; + DeleteBitmapPtr delete_bitmap = std::make_shared(TABLET_ID); + uint32_t rows_mark_deleted = 0; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + int raw_rsid = rand(); + create_rowset_writer_context(raw_rsid, tablet_schema, &writer_context); + RowsetIdUnorderedSet rsids; + std::vector rowset_ptrs; + writer_context.mow_context = + std::make_shared(1, 1, rsids, rowset_ptrs, delete_bitmap); + auto rowset_id = writer_context.rowset_id; + + auto res = RowsetFactory::create_rowset_writer(*s_engine, writer_context, false); + EXPECT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + EXPECT_EQ(Status::OK(), s); + // for segment "i", row "rid" + // k1 := rid*100 + i + // k2 := i + // k3 := rid + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + if (delete_ratio == "full") { // delete all data + writer_context.mow_context->delete_bitmap->add( + {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } else { + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + 
} + + size_t total_cardinality1 = 0; + for (auto entry : delete_bitmap->delete_bitmap) { + total_cardinality1 += entry.second.cardinality(); + } + EXPECT_EQ(num_segments, delete_bitmap->delete_bitmap.size()); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + std::vector ls; + ls.push_back(fmt::format("{}_0.dat", raw_rsid)); + ls.push_back(fmt::format("{}_1.dat", raw_rsid)); + ls.push_back(fmt::format("{}_2.dat", raw_rsid)); + ls.push_back(fmt::format("{}_3.dat", raw_rsid)); + ls.push_back(fmt::format("{}_4.dat", raw_rsid)); + ls.push_back(fmt::format("{}_5.dat", raw_rsid)); + ls.push_back(fmt::format("{}_6.dat", raw_rsid)); + EXPECT_TRUE(check_dir(ls)); + // 7 segments plus 1 sentinel mark + size_t total_cardinality2 = 0; + for (auto entry : delete_bitmap->delete_bitmap) { + if (std::get<1>(entry.first) == DeleteBitmap::INVALID_SEGMENT_ID) { + continue; + } + total_cardinality2 += entry.second.cardinality(); + } + // 7 segments + 1 sentinel mark + EXPECT_EQ(8, delete_bitmap->delete_bitmap.size()); + EXPECT_EQ(total_cardinality1, total_cardinality2); + } + + EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset, + num_segments * rows_per_segment, + rows_mark_deleted)); +} + +TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) { + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + DeleteBitmapPtr delete_bitmap = std::make_shared(TABLET_ID); + uint32_t rows_mark_deleted = 0; + uint32_t total_written_rows = 0; + std::vector segment_num_rows; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + create_rowset_writer_context(20048, tablet_schema, &writer_context); + RowsetIdUnorderedSet rsids; + std::vector rowset_ptrs; + 
writer_context.mow_context = + std::make_shared(1, 1, rsids, rowset_ptrs, delete_bitmap); + auto rowset_id = writer_context.rowset_id; + + auto res = RowsetFactory::create_rowset_writer(*s_engine, writer_context, false); + EXPECT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + EXPECT_EQ(Status::OK(), s); + + // for segment "i", row "rid" + // k1 := rid*100 + i + // k2 := i + // k3 := rid + int num_segments = 4; + uint32_t rows_per_segment = 4096; + int segid = 0; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 2; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete 
every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 4096; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, 
DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 8; + rows_per_segment = 4096; + std::map unique_keys; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + // generate some duplicate rows, segment compaction will merge them + int rand_i = rand() % (num_segments - 3); + uint32_t k1 = rid * 100 + rand_i; + uint32_t k2 = rand_i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + } + unique_keys.emplace(k1, rid); + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + segid++; + } + // these 8 segments should be compacted to 1 segment finally + // so the finally written rows should be the unique rows after compaction + total_written_rows += unique_keys.size(); + for (auto entry : unique_keys) { + if (entry.second % 3 == 0) { + rows_mark_deleted++; + } + } + + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + 
columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + segid++; + total_written_rows += rows_per_segment; + } + + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + std::vector ls; + // ooooOOoOooooooooO + ls.push_back("20048_0.dat"); // oooo + ls.push_back("20048_1.dat"); // O + ls.push_back("20048_2.dat"); // O + ls.push_back("20048_3.dat"); // o + ls.push_back("20048_4.dat"); // O + ls.push_back("20048_5.dat"); // oooooooo + ls.push_back("20048_6.dat"); // O + EXPECT_TRUE(check_dir(ls)); + // 7 segments + 1 sentinel mark + EXPECT_EQ(8, delete_bitmap->delete_bitmap.size()); + } + EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset, + total_written_rows, rows_mark_deleted)); +} + +TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_OoOoO) { + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + config::segcompaction_batch_size = 5; + std::vector segment_num_rows; + DeleteBitmapPtr delete_bitmap = std::make_shared(TABLET_ID); + uint32_t rows_mark_deleted = 0; + uint32_t total_written_rows = 0; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + create_rowset_writer_context(20049, tablet_schema, &writer_context); + RowsetIdUnorderedSet rsids; + std::vector rowset_ptrs; + writer_context.mow_context = + std::make_shared(1, 1, rsids, 
rowset_ptrs, delete_bitmap); + auto rowset_id = writer_context.rowset_id; + + auto res = RowsetFactory::create_rowset_writer(*s_engine, writer_context, false); + EXPECT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + EXPECT_EQ(Status::OK(), s); + + // for segment "i", row "rid" + // k1 := rid*10 + i + // k2 := k1 * 10 + // k3 := 4096 * i + rid + int num_segments = 1; + uint32_t rows_per_segment = 6400; + int segid = 0; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 4096; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + 
writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 4096; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + 
rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + segid++; + total_written_rows += rows_per_segment; + } + + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + std::vector ls; + ls.push_back("20049_0.dat"); // O + ls.push_back("20049_1.dat"); // o + ls.push_back("20049_2.dat"); // O + ls.push_back("20049_3.dat"); // o + ls.push_back("20049_4.dat"); // O + EXPECT_TRUE(check_dir(ls)); + } + + EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset, + total_written_rows, rows_mark_deleted)); +} + +TEST_F(SegCompactionMoWTest, SegCompactionNotTrigger) { + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + const int num_segments = 8; + const uint32_t rows_per_segment = 4096; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + 
config::segcompaction_batch_size = 10; + std::vector segment_num_rows; + DeleteBitmapPtr delete_bitmap = std::make_shared(TABLET_ID); + uint32_t rows_mark_deleted = 0; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + create_rowset_writer_context(20050, tablet_schema, &writer_context); + RowsetIdUnorderedSet rsids; + std::vector rowset_ptrs; + writer_context.mow_context = + std::make_shared(1, 1, rsids, rowset_ptrs, delete_bitmap); + auto rowset_id = writer_context.rowset_id; + + auto res = RowsetFactory::create_rowset_writer(*s_engine, writer_context, false); + EXPECT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + EXPECT_EQ(Status::OK(), s); + // for segment "i", row "rid" + // k1 := rid*10 + i + // k2 := k1 * 10 + // k3 := rid + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + } + + EXPECT_EQ(num_segments, delete_bitmap->delete_bitmap.size()); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + std::vector ls; + ls.push_back("20050_0.dat"); + ls.push_back("20050_1.dat"); + ls.push_back("20050_2.dat"); + ls.push_back("20050_3.dat"); + ls.push_back("20050_4.dat"); + 
ls.push_back("20050_5.dat"); + ls.push_back("20050_6.dat"); + ls.push_back("20050_7.dat"); + EXPECT_TRUE(check_dir(ls)); + EXPECT_EQ(num_segments, delete_bitmap->delete_bitmap.size()); + + EXPECT_FALSE(static_cast(rowset_writer.get())->is_segcompacted()); + } + + EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset, + num_segments * rows_per_segment, + rows_mark_deleted)); +} + +INSTANTIATE_TEST_SUITE_P(Params, SegCompactionMoWTest, + ::testing::ValuesIn(std::vector {"partial", "full"})); + +} // namespace doris + +// @brief Test Stub