From 115393e726039c63bcc18c1dccada21a2e529462 Mon Sep 17 00:00:00 2001 From: zhannngchen <48427519+zhannngchen@users.noreply.github.com> Date: Thu, 1 Aug 2024 15:00:28 +0800 Subject: [PATCH] [fix](merge-on-write) segcompaction should process delete bitmap if necessary (#38369) Issue Number: close #xxx When loading data to a unique key table with sequence column, some data in current load job might be marked as delete due to a lower sequence value. If there's many segments in such load job, segcompaction might be triggered, which don't process the delete bitmap currently, will cause data correctness issue For example: 1. we have 4 segments in current load job initially, and due to seq column, some rows are marked as deleted 2. after segcompaction, if we don't process the delete bitmap, it's content is still corresponding to the old segment layout, and row 7,14,15 is not mark deleted correctly on new generated segment 1. 3. in this PR, we convert old delete bitmap to fit new segment layout, it use similar way as base/cumulative compaction to convert delete bitmaps on old layout to new one, but the rowid conversion is simpler ![whiteboard_exported_image-2](https://github.com/user-attachments/assets/a419b6a4-e583-457a-bf4e-56d9bd2a3544) --- be/src/olap/merger.cpp | 19 +- be/src/olap/merger.h | 6 +- be/src/olap/rowset/beta_rowset_writer.cpp | 34 +- be/src/olap/rowset/beta_rowset_writer.h | 3 +- be/src/olap/rowset/segcompaction.cpp | 67 +- be/src/olap/rowset/segcompaction.h | 13 + be/src/olap/simple_rowid_conversion.h | 84 ++ be/src/vec/olap/vertical_block_reader.cpp | 1 + be/test/common/status_test.cpp | 2 - be/test/olap/segcompaction_mow_test.cpp | 890 ++++++++++++++++++++++ 10 files changed, 1100 insertions(+), 19 deletions(-) create mode 100644 be/src/olap/simple_rowid_conversion.h create mode 100644 be/test/olap/segcompaction_mow_test.cpp diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index b2e789b5b5a7c1..216a791106726c 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -307,16 +307,12 @@ Status Merger::vertical_compact_one_group( } // for segcompaction -Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType reader_type, - TabletSchemaSPtr tablet_schema, bool is_key, - const std::vector& column_group, - vectorized::RowSourcesBuffer* row_source_buf, - vectorized::VerticalBlockReader& src_block_reader, - segment_v2::SegmentWriter& dst_segment_writer, - int64_t max_rows_per_segment, Statistics* stats_output, - uint64_t* index_size, KeyBoundsPB& key_bounds) { - // build tablet reader - VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment; +Status Merger::vertical_compact_one_group( + TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema, bool is_key, + const std::vector& column_group, vectorized::RowSourcesBuffer* row_source_buf, + vectorized::VerticalBlockReader& src_block_reader, + segment_v2::SegmentWriter& dst_segment_writer, Statistics* stats_output, + uint64_t* index_size, KeyBoundsPB& key_bounds, SimpleRowIdConversion* rowid_conversion) { // TODO: record_rowids vectorized::Block block = tablet_schema->create_block(column_group); size_t output_rows = 0; @@ -333,6 +329,9 @@ Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType rea "failed to write block when merging rowsets of tablet " + std::to_string(tablet->tablet_id())); + if (is_key && rowid_conversion != nullptr) { + rowid_conversion->add(src_block_reader.current_block_row_locations()); + } output_rows += block.rows(); block.clear_column_data(); } diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h index 49ca1e5227fe6e..e5d35001eeaa43 100644 --- a/be/src/olap/merger.h +++ b/be/src/olap/merger.h @@ -25,6 +25,7 @@ #include "io/io_common.h" #include "olap/iterators.h" #include "olap/rowset/rowset_reader.h" +#include "olap/simple_rowid_conversion.h" #include "olap/tablet.h" #include "olap/tablet_schema.h" @@ -86,8 +87,9 @@ class Merger { vectorized::RowSourcesBuffer* row_source_buf, vectorized::VerticalBlockReader& src_block_reader, segment_v2::SegmentWriter& dst_segment_writer, - int64_t max_rows_per_segment, Statistics* stats_output, - uint64_t* index_size, KeyBoundsPB& key_bounds); + Statistics* stats_output, uint64_t* index_size, + KeyBoundsPB& key_bounds, + SimpleRowIdConversion* rowid_conversion); // for mow with cluster key table, the key group also contains cluster key columns. // the `key_group_cluster_key_idxes` marks the positions of cluster key columns in key group. diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 101571f0256831..ebe79c7e3d2475 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -245,7 +245,12 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment( if (is_large_segment) { if (segid == _segcompacted_point) { // skip large segments at the front + auto dst_seg_id = _num_segcompacted.load(); RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); + if (_segcompaction_worker->need_convert_delete_bitmap()) { + _segcompaction_worker->convert_segment_delete_bitmap( + _context.mow_context->delete_bitmap, segid, dst_seg_id); + } continue; } else { // stop because we need consecutive segments @@ -270,7 +275,13 @@ Status BetaRowsetWriter::_find_longest_consecutive_small_segment( } if (s == 1) { // poor bachelor, let it go VLOG_DEBUG << "only one candidate segment"; + auto src_seg_id = _segcompacted_point.load(); + auto dst_seg_id = _num_segcompacted.load(); RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); + if (_segcompaction_worker->need_convert_delete_bitmap()) { + _segcompaction_worker->convert_segment_delete_bitmap( + _context.mow_context->delete_bitmap, src_seg_id, dst_seg_id); + } segments->clear(); return Status::OK(); } @@ -454,7 +465,7 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() { "code: {}", _segcompaction_status.load()); } - if (!_is_segcompacted() || _segcompacted_point == _num_segment) { + if (!is_segcompacted() || _segcompacted_point == _num_segment) { // no need if never segcompact before or all segcompacted return Status::OK(); } @@ -462,7 +473,12 @@ Status BetaRowsetWriter::_segcompaction_rename_last_segments() { // so that transaction can be committed ASAP VLOG_DEBUG << "segcompaction last few segments"; for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) { + auto dst_segid = _num_segcompacted.load(); RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); + if (_segcompaction_worker->need_convert_delete_bitmap()) { + _segcompaction_worker->convert_segment_delete_bitmap( + _context.mow_context->delete_bitmap, segid, dst_segid); + } } return Status::OK(); } @@ -584,6 +600,20 @@ Status BetaRowsetWriter::_close_file_writers() { RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker->get_file_writer()->close(), "close segment compaction worker failed"); } + // process delete bitmap for mow table + if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) { + auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap(); + // which means the segment compaction is triggerd + if (converted_delete_bitmap != nullptr) { + RowsetIdUnorderedSet rowsetids; + rowsetids.insert(rowset_id()); + context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(), + rowsetids); + context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0}, + {rowset_id(), UINT32_MAX, INT64_MAX}); + context().mow_context->delete_bitmap->merge(*converted_delete_bitmap); + } + } } return Status::OK(); } @@ -616,7 +646,7 @@ int64_t BaseBetaRowsetWriter::_num_seg() const { } int64_t BetaRowsetWriter::_num_seg() const { - return _is_segcompacted() ? _num_segcompacted : _num_segment; + return is_segcompacted() ? _num_segcompacted : _num_segment; } // update tablet schema when meet variant columns, before commit_txn diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 3e285c7e508fb7..b897d96c9b2293 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -205,6 +205,8 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter { std::unique_ptr* writer, uint64_t index_size, KeyBoundsPB& key_bounds); + bool is_segcompacted() const { return _num_segcompacted > 0; } + private: Status _generate_delete_bitmap(int32_t segment_id) override; @@ -220,7 +222,6 @@ class BetaRowsetWriter : public BaseBetaRowsetWriter { Status _segcompaction_rename_last_segments(); Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id); Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments); - bool _is_segcompacted() const { return _num_segcompacted > 0; } bool _check_and_set_is_doing_segcompaction(); Status _rename_compacted_segments(int64_t begin, int64_t end); Status _rename_compacted_segment_plain(uint64_t seg_id); diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index 1c152c75c0fb43..d6754ca224dcc4 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -76,11 +76,14 @@ Status SegcompactionWorker::_get_segcompaction_reader( std::vector& return_columns, std::unique_ptr* reader) { auto ctx = _writer->_context; + bool record_rowids = need_convert_delete_bitmap() && is_key; StorageReadOptions read_options; read_options.stats = stat; read_options.use_page_cache = false; read_options.tablet_schema = ctx.tablet_schema; + read_options.record_rowids = record_rowids; std::vector> seg_iterators; + std::map segment_rows; for (auto& seg_ptr : *segments) { std::unique_ptr iter; auto s = seg_ptr->new_iterator(schema, read_options, &iter); @@ -89,6 +92,10 @@ Status SegcompactionWorker::_get_segcompaction_reader( s.to_string()); } seg_iterators.push_back(std::move(iter)); + segment_rows.emplace(seg_ptr->id(), seg_ptr->num_rows()); + } + if (record_rowids && _rowid_conversion != nullptr) { + _rowid_conversion->reset_segment_map(segment_rows); } *reader = std::unique_ptr { @@ -103,6 +110,7 @@ Status SegcompactionWorker::_get_segcompaction_reader( reader_params.return_columns = return_columns; reader_params.is_key_column_group = is_key; reader_params.use_page_cache = false; + reader_params.record_rowids = record_rowids; return (*reader)->init(reader_params, nullptr); } @@ -232,6 +240,9 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt DCHECK(ctx.tablet); auto tablet = std::static_pointer_cast(ctx.tablet); + if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) { + _rowid_conversion = std::make_unique(_writer->rowset_id()); + } std::vector> column_groups; Merger::vertical_split_columns(ctx.tablet_schema, &column_groups); @@ -262,8 +273,8 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt Merger::Statistics merger_stats; RETURN_IF_ERROR(Merger::vertical_compact_one_group( tablet, ReaderType::READER_SEGMENT_COMPACTION, ctx.tablet_schema, is_key, - column_ids, &row_sources_buf, *reader, *writer, INT_MAX, &merger_stats, &index_size, - key_bounds)); + column_ids, &row_sources_buf, *reader, *writer, &merger_stats, &index_size, + key_bounds, _rowid_conversion.get())); total_index_size += index_size; if (is_key) { RETURN_IF_ERROR(row_sources_buf.flush()); @@ -289,6 +300,10 @@ Status SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt } RETURN_IF_ERROR(_delete_original_segments(begin, end)); + if (_rowid_conversion != nullptr) { + convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin, end, + _writer->_num_segcompacted); + } RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end)); if (VLOG_DEBUG_IS_ON) { @@ -349,6 +364,54 @@ void SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segm _is_compacting_state_mutable = true; } +bool SegcompactionWorker::need_convert_delete_bitmap() { + if (_writer == nullptr) { + return false; + } + auto tablet = _writer->context().tablet; + return tablet != nullptr && tablet->keys_type() == KeysType::UNIQUE_KEYS && + tablet->enable_unique_key_merge_on_write() && + tablet->tablet_schema()->has_sequence_col(); +} + +void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, + uint32_t src_seg_id, uint32_t dest_seg_id) { + // lazy init + if (nullptr == _converted_delete_bitmap) { + _converted_delete_bitmap = std::make_shared(_writer->context().tablet_id); + } + auto rowset_id = _writer->context().rowset_id; + const auto* seg_map = + src_delete_bitmap->get({rowset_id, src_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}); + _converted_delete_bitmap->set({rowset_id, dest_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, + *seg_map); +} + +void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, + uint32_t src_begin, uint32_t src_end, + uint32_t dst_seg_id) { + // lazy init + if (nullptr == _converted_delete_bitmap) { + _converted_delete_bitmap = std::make_shared(_writer->context().tablet_id); + } + auto rowset_id = _writer->context().rowset_id; + RowLocation src(rowset_id, 0, 0); + for (uint32_t seg_id = src_begin; seg_id <= src_end; seg_id++) { + const auto* seg_map = + src_delete_bitmap->get({rowset_id, seg_id, DeleteBitmap::TEMP_VERSION_COMMON}); + src.segment_id = seg_id; + for (unsigned int row_id : *seg_map) { + src.row_id = row_id; + auto dst_row_id = _rowid_conversion->get(src); + if (dst_row_id < 0) { + continue; + } + _converted_delete_bitmap->add( + {rowset_id, dst_seg_id, DeleteBitmap::TEMP_VERSION_COMMON}, dst_row_id); + } + } +} + bool SegcompactionWorker::cancel() { // return true if the task is canncellable (actual compaction is not started) // return false when the task is not cancellable (it is in the middle of segcompaction) diff --git a/be/src/olap/rowset/segcompaction.h b/be/src/olap/rowset/segcompaction.h index 5aef89992d30b8..67dd6889aadd72 100644 --- a/be/src/olap/rowset/segcompaction.h +++ b/be/src/olap/rowset/segcompaction.h @@ -23,6 +23,7 @@ #include "common/status.h" #include "io/fs/file_reader_writer_fwd.h" #include "olap/merger.h" +#include "olap/simple_rowid_conversion.h" #include "olap/tablet.h" #include "segment_v2/segment.h" @@ -51,6 +52,14 @@ class SegcompactionWorker { void compact_segments(SegCompactionCandidatesSharedPtr segments); + bool need_convert_delete_bitmap(); + + void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, uint32_t src_seg_id, + uint32_t dest_seg_id); + void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap, uint32_t src_begin, + uint32_t src_end, uint32_t dest_seg_id); + DeleteBitmapPtr get_converted_delete_bitmap() { return _converted_delete_bitmap; } + io::FileWriterPtr& get_file_writer() { return _file_writer; } // set the cancel flag, tasks already started will not be cancelled. @@ -78,6 +87,10 @@ class SegcompactionWorker { BetaRowsetWriter* _writer = nullptr; io::FileWriterPtr _file_writer; + // for unique key mow table + std::unique_ptr _rowid_conversion; + DeleteBitmapPtr _converted_delete_bitmap; + // the state is not mutable when 1)actual compaction operation started or 2) cancelled std::atomic _is_compacting_state_mutable = true; }; diff --git a/be/src/olap/simple_rowid_conversion.h b/be/src/olap/simple_rowid_conversion.h new file mode 100644 index 00000000000000..1a89b01838fe8c --- /dev/null +++ b/be/src/olap/simple_rowid_conversion.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "olap/olap_common.h" +#include "olap/utils.h" + +namespace doris { + +// Simple verion of rowid conversion, for segcompaction +// convert rows from several segments to rows in 1 segment +class SimpleRowIdConversion { +public: + SimpleRowIdConversion(const RowsetId& rowset_id) : _rowst_id(rowset_id) {}; + ~SimpleRowIdConversion() = default; + + // resize segment rowid map to its rows num + void reset_segment_map(const std::map& num_rows) { + _cur_dst_segment_rowid = 0; + for (auto seg_rows : num_rows) { + _segments_rowid_map.emplace(seg_rows.first, + std::vector(seg_rows.second, UINT32_MAX)); + } + } + + // add row id to the map + void add(const std::vector& rss_row_ids) { + for (auto& item : rss_row_ids) { + if (item.row_id == -1) { + continue; + } + DCHECK(_segments_rowid_map.find(item.segment_id) != _segments_rowid_map.end() && + _segments_rowid_map[item.segment_id].size() > item.row_id); + _segments_rowid_map[item.segment_id][item.row_id] = _cur_dst_segment_rowid++; + } + } + + // get destination RowLocation + // return non-zero if the src RowLocation does not exist + int get(const RowLocation& src) const { + auto it = _segments_rowid_map.find(src.segment_id); + if (it == _segments_rowid_map.end()) { + return -1; + } + const auto& rowid_map = it->second; + if (src.row_id >= rowid_map.size() || UINT32_MAX == rowid_map[src.row_id]) { + return -1; + } + + return rowid_map[src.row_id]; + } + +private: + // key: index indicates src segment. + // value: index indicates row id of source segment, value indicates row id of destination + // segment. UINT32_MAX indicates current row not exist. + std::map> _segments_rowid_map; + + // dst rowset id + RowsetId _rowst_id; + + // current rowid of dst segment + std::uint32_t _cur_dst_segment_rowid = 0; +}; + +} // namespace doris diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 58a2332d5a8d5d..c16cb1f1bc23cf 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -132,6 +132,7 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params, _reader_context.need_ordered_result = true; // TODO: should it be? _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS; _reader_context.is_key_column_group = read_params.is_key_column_group; + _reader_context.record_rowids = read_params.record_rowids; } // build heap if key column iterator or build vertical merge iterator if value column diff --git a/be/test/common/status_test.cpp b/be/test/common/status_test.cpp index c1197dad0b1628..e5477db1127371 100644 --- a/be/test/common/status_test.cpp +++ b/be/test/common/status_test.cpp @@ -50,8 +50,6 @@ TEST_F(StatusTest, OK) { TEST_F(StatusTest, TStatusCodeWithStatus) { // The definition in status.h //extern ErrorCode::ErrorCodeState error_states[ErrorCode::MAX_ERROR_CODE_DEFINE_NUM]; - extern ErrorCode::ErrorCodeState error_states; - extern ErrorCode::ErrorCodeInitializer error_code_init; // The definition in Status_types.h extern const std::map _TStatusCode_VALUES_TO_NAMES; ErrorCode::error_code_init.check_init(); diff --git a/be/test/olap/segcompaction_mow_test.cpp b/be/test/olap/segcompaction_mow_test.cpp new file mode 100644 index 00000000000000..41e8ef74ed61cb --- /dev/null +++ b/be/test/olap/segcompaction_mow_test.cpp @@ -0,0 +1,890 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include +#include + +#include "common/config.h" +#include "gen_cpp/AgentService_types.h" +#include "gen_cpp/olap_file.pb.h" +#include "io/fs/local_file_system.h" +#include "olap/data_dir.h" +#include "olap/row_cursor.h" +#include "olap/rowset/beta_rowset_reader.h" +#include "olap/rowset/beta_rowset_writer.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/rowset/rowset_reader_context.h" +#include "olap/rowset/rowset_writer.h" +#include "olap/rowset/rowset_writer_context.h" +#include "olap/storage_engine.h" +#include "olap/tablet_meta.h" +#include "olap/tablet_schema.h" +#include "olap/utils.h" +#include "runtime/exec_env.h" +#include "runtime/memory/mem_tracker.h" +#include "util/slice.h" + +namespace doris { +using namespace ErrorCode; + +static const uint32_t MAX_PATH_LEN = 1024; +static const uint32_t TABLET_ID = 12345; +static StorageEngine* s_engine; +static const std::string lTestDir = "./data_test/data/segcompaction_mow_test"; + +class SegCompactionMoWTest : public ::testing::TestWithParam { +public: + SegCompactionMoWTest() = default; + + void SetUp() { + config::enable_segcompaction = true; + config::tablet_map_shard_size = 1; + config::txn_map_shard_size = 1; + config::txn_shard_size = 1; + + char buffer[MAX_PATH_LEN]; + EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); + config::storage_root_path = std::string(buffer) + "/data_test"; + + auto st = io::global_local_filesystem()->delete_directory(config::storage_root_path); + ASSERT_TRUE(st.ok()) << st; + st = io::global_local_filesystem()->create_directory(config::storage_root_path); + ASSERT_TRUE(st.ok()) << st; + + std::vector paths; + paths.emplace_back(config::storage_root_path, -1); + + doris::EngineOptions options; + options.store_paths = paths; + + auto engine = std::make_unique(options); + s_engine = engine.get(); + ExecEnv::GetInstance()->set_storage_engine(std::move(engine)); + + Status s = s_engine->open(); + EXPECT_TRUE(s.ok()) << s.to_string(); + + _data_dir = std::make_unique(*s_engine, lTestDir); + static_cast(_data_dir->update_capacity()); + + EXPECT_TRUE(io::global_local_filesystem()->create_directory(lTestDir).ok()); + + s = s_engine->start_bg_threads(); + EXPECT_TRUE(s.ok()) << s.to_string(); + } + + void TearDown() { config::enable_segcompaction = false; } + +protected: + OlapReaderStatistics _stats; + + bool check_dir(std::vector& vec) { + std::vector result; + for (const auto& entry : std::filesystem::directory_iterator(lTestDir)) { + result.push_back(std::filesystem::path(entry.path()).filename()); + } + + LOG(INFO) << "expected ls:" << std::endl; + for (auto& i : vec) { + LOG(INFO) << i; + } + LOG(INFO) << "acutal ls:" << std::endl; + for (auto& i : result) { + LOG(INFO) << i; + } + + if (result.size() != vec.size()) { + return false; + } else { + for (auto& i : vec) { + if (std::find(result.begin(), result.end(), i) == result.end()) { + return false; + } + } + } + return true; + } + + // (k1 int, k2 varchar(20), k3 int) keys (k1, k2) + void create_tablet_schema(TabletSchemaSPtr tablet_schema) { + TabletSchemaPB tablet_schema_pb; + tablet_schema_pb.set_keys_type(UNIQUE_KEYS); + tablet_schema_pb.set_num_short_key_columns(2); + tablet_schema_pb.set_num_rows_per_row_block(1024); + tablet_schema_pb.set_compress_kind(COMPRESS_NONE); + tablet_schema_pb.set_next_column_unique_id(5); + // add seq column so that segcompaction will process delete bitmap + tablet_schema_pb.set_sequence_col_idx(3); + + ColumnPB* column_1 = tablet_schema_pb.add_column(); + column_1->set_unique_id(1); + column_1->set_name("k1"); + column_1->set_type("INT"); + column_1->set_is_key(true); + column_1->set_length(4); + column_1->set_index_length(4); + column_1->set_is_nullable(true); + column_1->set_is_bf_column(false); + + ColumnPB* column_2 = tablet_schema_pb.add_column(); + column_2->set_unique_id(2); + column_2->set_name("k2"); + column_2->set_type( + "INT"); // TODO change to varchar(20) when dict encoding for string is supported + column_2->set_length(4); + column_2->set_index_length(4); + column_2->set_is_nullable(true); + column_2->set_is_key(true); + column_2->set_is_nullable(true); + column_2->set_is_bf_column(false); + + ColumnPB* v_column = tablet_schema_pb.add_column(); + v_column->set_unique_id(3); + v_column->set_name(fmt::format("v1")); + v_column->set_type("INT"); + v_column->set_length(4); + v_column->set_is_key(false); + v_column->set_is_nullable(false); + v_column->set_is_bf_column(false); + v_column->set_default_value(std::to_string(10)); + v_column->set_aggregation("NONE"); + + ColumnPB* seq_column = tablet_schema_pb.add_column(); + seq_column->set_unique_id(4); + seq_column->set_name(SEQUENCE_COL); + seq_column->set_type("INT"); + seq_column->set_length(4); + seq_column->set_is_key(false); + seq_column->set_is_nullable(false); + seq_column->set_is_bf_column(false); + seq_column->set_default_value(std::to_string(10)); + seq_column->set_aggregation("NONE"); + + tablet_schema->init_from_pb(tablet_schema_pb); + } + + // use different id to avoid conflict + void create_rowset_writer_context(int64_t id, TabletSchemaSPtr tablet_schema, + RowsetWriterContext* rowset_writer_context) { + RowsetId rowset_id; + rowset_id.init(id); + // rowset_writer_context->data_dir = _data_dir.get(); + rowset_writer_context->rowset_id = rowset_id; + rowset_writer_context->tablet_id = TABLET_ID; + rowset_writer_context->tablet_schema_hash = 1111; + rowset_writer_context->partition_id = 10; + rowset_writer_context->rowset_type = BETA_ROWSET; + rowset_writer_context->tablet_path = lTestDir; + rowset_writer_context->rowset_state = VISIBLE; + rowset_writer_context->tablet_schema = tablet_schema; + rowset_writer_context->version.first = 10; + rowset_writer_context->version.second = 10; + + TabletMetaSharedPtr tablet_meta = std::make_shared(); + tablet_meta->_tablet_id = TABLET_ID; + static_cast(tablet_meta->set_partition_id(10000)); + tablet_meta->_schema = tablet_schema; + tablet_meta->_enable_unique_key_merge_on_write = true; + auto tablet = std::make_shared(*s_engine, tablet_meta, _data_dir.get(), "test_str"); + // tablet->key + rowset_writer_context->tablet = tablet; + } + + void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext& context, + RowsetReaderSharedPtr* result) { + auto s = rowset->create_reader(result); + EXPECT_EQ(Status::OK(), s); + EXPECT_TRUE(*result != nullptr); + + s = (*result)->init(&context); + EXPECT_EQ(Status::OK(), s); + } + + bool check_data_read_with_delete_bitmap(TabletSchemaSPtr tablet_schema, + DeleteBitmapPtr delete_bitmap, RowsetSharedPtr rowset, + int expect_total_rows, int rows_mark_deleted) { + RowsetReaderContext reader_context; + reader_context.tablet_schema = tablet_schema; + // use this type to avoid cache from other ut + reader_context.reader_type = ReaderType::READER_QUERY; + reader_context.need_ordered_result = true; + std::vector return_columns = {0, 1, 2}; + reader_context.return_columns = &return_columns; + reader_context.stats = &_stats; + reader_context.delete_bitmap = delete_bitmap.get(); + + std::vector segment_num_rows; + Status s; + + // without predicates + { + RowsetReaderSharedPtr rowset_reader; + create_and_init_rowset_reader(rowset.get(), reader_context, &rowset_reader); + + uint32_t num_rows_read = 0; + bool eof = false; + while (!eof) { + std::shared_ptr output_block = + std::make_shared( + tablet_schema->create_block(return_columns)); + s = rowset_reader->next_block(output_block.get()); + if (s != Status::OK()) { + eof = true; + } + EXPECT_EQ(return_columns.size(), output_block->columns()); + for (int i = 0; i < output_block->rows(); ++i) { + vectorized::ColumnPtr col0 = output_block->get_by_position(0).column; + vectorized::ColumnPtr col1 = output_block->get_by_position(1).column; + vectorized::ColumnPtr col2 = output_block->get_by_position(2).column; + auto field1 = (*col0)[i]; + auto field2 = (*col1)[i]; + auto field3 = (*col2)[i]; + uint32_t k1 = *reinterpret_cast((char*)(&field1)); + uint32_t k2 = *reinterpret_cast((char*)(&field2)); + uint32_t v3 = *reinterpret_cast((char*)(&field3)); + EXPECT_EQ(100 * v3 + k2, k1); + EXPECT_TRUE(v3 % 3 != 0); // all v3%3==0 is deleted + num_rows_read++; + } + output_block->clear(); + } + EXPECT_EQ(Status::Error(""), s); + EXPECT_EQ(rowset->rowset_meta()->num_rows(), expect_total_rows); + EXPECT_EQ(num_rows_read, expect_total_rows - rows_mark_deleted); + EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok()); + size_t total_num_rows = 0; + for (const auto& i : segment_num_rows) { + total_num_rows += i; + } + EXPECT_EQ(total_num_rows, expect_total_rows); + } + return true; + } + +private: + std::unique_ptr _data_dir; +}; + +TEST_P(SegCompactionMoWTest, SegCompactionThenRead) { + std::string delete_ratio = GetParam(); + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + const int num_segments = 15; + const uint32_t rows_per_segment = 4096; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + config::segcompaction_batch_size = 10; + std::vector segment_num_rows; + DeleteBitmapPtr delete_bitmap = std::make_shared(TABLET_ID); + uint32_t rows_mark_deleted = 0; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + int raw_rsid = rand(); + create_rowset_writer_context(raw_rsid, tablet_schema, &writer_context); + RowsetIdUnorderedSet rsids; + std::vector rowset_ptrs; + writer_context.mow_context = + std::make_shared(1, 1, rsids, rowset_ptrs, delete_bitmap); + auto rowset_id = writer_context.rowset_id; + + auto res = RowsetFactory::create_rowset_writer(*s_engine, writer_context, false); + EXPECT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + EXPECT_EQ(Status::OK(), s); + // for segment "i", row "rid" + // k1 := rid*10 + i + // k2 := k1 * 10 + // k3 := rid + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + if (delete_ratio == "full") { // delete all data + writer_context.mow_context->delete_bitmap->add( + {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } else { + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + } + + size_t total_cardinality1 = 0; + for (auto entry : delete_bitmap->delete_bitmap) { + total_cardinality1 += entry.second.cardinality(); + } + EXPECT_EQ(num_segments, delete_bitmap->delete_bitmap.size()); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + std::vector ls; + ls.push_back(fmt::format("{}_0.dat", raw_rsid)); + ls.push_back(fmt::format("{}_1.dat", raw_rsid)); + ls.push_back(fmt::format("{}_2.dat", raw_rsid)); + ls.push_back(fmt::format("{}_3.dat", raw_rsid)); + ls.push_back(fmt::format("{}_4.dat", raw_rsid)); + ls.push_back(fmt::format("{}_5.dat", raw_rsid)); + ls.push_back(fmt::format("{}_6.dat", raw_rsid)); + EXPECT_TRUE(check_dir(ls)); + // 7 segments plus 1 sentinel mark + size_t total_cardinality2 = 0; + for (auto entry : delete_bitmap->delete_bitmap) { + if (std::get<1>(entry.first) == DeleteBitmap::INVALID_SEGMENT_ID) { + continue; + } + total_cardinality2 += entry.second.cardinality(); + } + // 7 segments + 1 sentinel mark + EXPECT_EQ(8, delete_bitmap->delete_bitmap.size()); + EXPECT_EQ(total_cardinality1, total_cardinality2); + } + + EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset, + num_segments * rows_per_segment, + rows_mark_deleted)); +} + +TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) { + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + DeleteBitmapPtr delete_bitmap = std::make_shared(TABLET_ID); + uint32_t rows_mark_deleted = 0; + uint32_t total_written_rows = 0; + std::vector segment_num_rows; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + create_rowset_writer_context(20048, tablet_schema, &writer_context); + RowsetIdUnorderedSet rsids; + std::vector rowset_ptrs; + writer_context.mow_context = + std::make_shared(1, 1, rsids, rowset_ptrs, delete_bitmap); + auto rowset_id = writer_context.rowset_id; + + auto res = RowsetFactory::create_rowset_writer(*s_engine, writer_context, false); + EXPECT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + EXPECT_EQ(Status::OK(), s); + + // for segment "i", row "rid" + // k1 := rid*10 + i + // k2 := k1 * 10 + // k3 := 4096 * i + rid + int num_segments = 4; + uint32_t rows_per_segment = 4096; + int segid = 0; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 2; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 4096; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 8; + rows_per_segment = 4096; + std::map unique_keys; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + // generate some duplicate rows, segment compaction will merge them + int rand_i = rand() % (num_segments - 3); + uint32_t k1 = rid * 100 + rand_i; + uint32_t k2 = rand_i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + } + unique_keys.emplace(k1, rid); + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + segid++; + } + // these 8 segments should be compacted to 1 segment finally + // so the finally written rows should be the unique rows after compaction + total_written_rows += unique_keys.size(); + for (auto entry : unique_keys) { + if (entry.second % 3 == 0) { + rows_mark_deleted++; + } + } + + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + segid++; + total_written_rows += rows_per_segment; + } + + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + std::vector ls; + // ooooOOoOooooooooO + ls.push_back("20048_0.dat"); // oooo + ls.push_back("20048_1.dat"); // O + ls.push_back("20048_2.dat"); // O + ls.push_back("20048_3.dat"); // o + ls.push_back("20048_4.dat"); // O + ls.push_back("20048_5.dat"); // oooooooo + ls.push_back("20048_6.dat"); // O + EXPECT_TRUE(check_dir(ls)); + // 7 segments + 1 sentinel mark + EXPECT_EQ(8, delete_bitmap->delete_bitmap.size()); + } + EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset, + total_written_rows, rows_mark_deleted)); +} + +TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_OoOoO) { + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + config::segcompaction_batch_size = 5; + std::vector segment_num_rows; + DeleteBitmapPtr delete_bitmap = std::make_shared(TABLET_ID); + uint32_t rows_mark_deleted = 0; + uint32_t total_written_rows = 0; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + create_rowset_writer_context(20049, tablet_schema, &writer_context); + RowsetIdUnorderedSet rsids; + std::vector rowset_ptrs; + writer_context.mow_context = + std::make_shared(1, 1, rsids, rowset_ptrs, delete_bitmap); + auto rowset_id = writer_context.rowset_id; + + auto res = RowsetFactory::create_rowset_writer(*s_engine, writer_context, false); + EXPECT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + EXPECT_EQ(Status::OK(), s); + + // for segment "i", row "rid" + // k1 := rid*10 + i + // k2 := k1 * 10 + // k3 := 4096 * i + rid + int num_segments = 1; + uint32_t rows_per_segment = 6400; + int segid = 0; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 4096; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 4096; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + segid++; + total_written_rows += rows_per_segment; + } + num_segments = 1; + rows_per_segment = 6400; + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + segid++; + total_written_rows += rows_per_segment; + } + + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + std::vector ls; + ls.push_back("20049_0.dat"); // O + ls.push_back("20049_1.dat"); // o + ls.push_back("20049_2.dat"); // O + ls.push_back("20049_3.dat"); // o + ls.push_back("20049_4.dat"); // O + EXPECT_TRUE(check_dir(ls)); + } + + EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset, + total_written_rows, rows_mark_deleted)); +} + +TEST_F(SegCompactionMoWTest, SegCompactionNotTrigger) { + config::enable_segcompaction = true; + Status s; + TabletSchemaSPtr tablet_schema = std::make_shared(); + create_tablet_schema(tablet_schema); + + RowsetSharedPtr rowset; + const int num_segments = 8; + const uint32_t rows_per_segment = 4096; + config::segcompaction_candidate_max_rows = 6000; // set threshold above + // rows_per_segment + config::segcompaction_batch_size = 10; + std::vector segment_num_rows; + DeleteBitmapPtr delete_bitmap = std::make_shared(TABLET_ID); + uint32_t rows_mark_deleted = 0; + { // write `num_segments * rows_per_segment` rows to rowset + RowsetWriterContext writer_context; + create_rowset_writer_context(20050, tablet_schema, &writer_context); + RowsetIdUnorderedSet rsids; + std::vector rowset_ptrs; + writer_context.mow_context = + std::make_shared(1, 1, rsids, rowset_ptrs, delete_bitmap); + auto rowset_id = writer_context.rowset_id; + + auto res = RowsetFactory::create_rowset_writer(*s_engine, writer_context, false); + EXPECT_TRUE(res.has_value()) << res.error(); + auto rowset_writer = std::move(res).value(); + EXPECT_EQ(Status::OK(), s); + // for segment "i", row "rid" + // k1 := rid*10 + i + // k2 := k1 * 10 + // k3 := rid + for (int i = 0; i < num_segments; ++i) { + vectorized::Block block = tablet_schema->create_block(); + auto columns = block.mutate_columns(); + for (int rid = 0; rid < rows_per_segment; ++rid) { + uint32_t k1 = rid * 100 + i; + uint32_t k2 = i; + uint32_t k3 = rid; + uint32_t seq = 0; + columns[0]->insert_data((const char*)&k1, sizeof(k1)); + columns[1]->insert_data((const char*)&k2, sizeof(k2)); + columns[2]->insert_data((const char*)&k3, sizeof(k3)); + columns[3]->insert_data((const char*)&seq, sizeof(seq)); + // mark delete every 3 rows + if (rid % 3 == 0) { + writer_context.mow_context->delete_bitmap->add( + {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON}, rid); + rows_mark_deleted++; + } + } + s = rowset_writer->add_block(&block); + EXPECT_TRUE(s.ok()); + s = rowset_writer->flush(); + EXPECT_EQ(Status::OK(), s); + sleep(1); + } + + EXPECT_EQ(num_segments, delete_bitmap->delete_bitmap.size()); + EXPECT_EQ(Status::OK(), rowset_writer->build(rowset)); + std::vector ls; + ls.push_back("20050_0.dat"); + ls.push_back("20050_1.dat"); + ls.push_back("20050_2.dat"); + ls.push_back("20050_3.dat"); + ls.push_back("20050_4.dat"); + ls.push_back("20050_5.dat"); + ls.push_back("20050_6.dat"); + ls.push_back("20050_7.dat"); + EXPECT_TRUE(check_dir(ls)); + EXPECT_EQ(num_segments, delete_bitmap->delete_bitmap.size()); + + EXPECT_FALSE(static_cast(rowset_writer.get())->is_segcompacted()); + } + + EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset, + num_segments * rows_per_segment, + rows_mark_deleted)); +} + +INSTANTIATE_TEST_SUITE_P(Params, SegCompactionMoWTest, + ::testing::ValuesIn(std::vector {"partial", "full"})); + +} // namespace doris + +// @brief Test Stub