[enhancement](load) optimize load string data and dict page write #9123

Merged: 2 commits, May 7, 2022
87 changes: 47 additions & 40 deletions be/src/olap/memtable.cpp
@@ -51,18 +51,28 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
         _skip_list = nullptr;
         _vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
         // TODO: Support ZOrderComparator in the future
-        _vec_skip_list = new VecTable(_vec_row_comparator.get(), _table_mem_pool.get(),
-                                      _keys_type == KeysType::DUP_KEYS);
+        _vec_skip_list = std::make_unique<VecTable>(
+                _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS);
     } else {
         _vec_skip_list = nullptr;
+        if (_keys_type == KeysType::DUP_KEYS) {
+            _insert_fn = &MemTable::_insert_dup;
+        } else {
+            _insert_fn = &MemTable::_insert_agg;
+        }
+        if (_tablet_schema->has_sequence_col()) {
+            _aggregate_two_row_fn = &MemTable::_aggregate_two_row_with_sequence;
+        } else {
+            _aggregate_two_row_fn = &MemTable::_aggregate_two_row;
+        }
         if (tablet_schema->sort_type() == SortType::ZORDER) {
             _row_comparator = std::make_shared<TupleRowZOrderComparator>(
                     _schema, tablet_schema->sort_col_num());
         } else {
             _row_comparator = std::make_shared<RowCursorComparator>(_schema);
         }
-        _skip_list = new Table(_row_comparator.get(), _table_mem_pool.get(),
-                               _keys_type == KeysType::DUP_KEYS);
+        _skip_list = std::make_unique<Table>(_row_comparator.get(), _table_mem_pool.get(),
+                                             _keys_type == KeysType::DUP_KEYS);
     }
 }
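The key pattern in this hunk: rather than re-checking _keys_type and has_sequence_col() on every row, the constructor resolves member-function pointers (_insert_fn, _aggregate_two_row_fn) once, and each per-row call dispatches through them. A minimal standalone sketch of that pattern, with illustrative names only (not the actual Doris classes):

    #include <iostream>

    class Sketch {
    public:
        explicit Sketch(bool dup_keys) {
            // Resolve the hot-path function once, at construction time.
            _insert_fn = dup_keys ? &Sketch::_insert_dup : &Sketch::_insert_agg;
        }
        // Per-row call: a single indirect call, no repeated branching.
        void insert(int row) { (this->*_insert_fn)(row); }

    private:
        void _insert_dup(int row) { std::cout << "dup " << row << "\n"; }
        void _insert_agg(int row) { std::cout << "agg " << row << "\n"; }
        void (Sketch::*_insert_fn)(int) = nullptr;
    };

    int main() {
        Sketch s(/*dup_keys=*/true);
        s.insert(42); // prints "dup 42"
    }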

@@ -86,9 +96,6 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
 }

 MemTable::~MemTable() {
-    delete _skip_list;
-    delete _vec_skip_list;
-
     std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
     _mem_tracker->release(_mem_usage);
 }
@@ -158,44 +165,42 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
     }
 }

-void MemTable::insert(const Tuple* tuple) {
+// For non-DUP models, for the data rows passed from the upper layer, when copying the data,
+// we first allocate from _buffer_mem_pool, and then check whether it already exists in
+// _skiplist. If it exists, we aggregate the new row into the row in skiplist.
+// otherwise, we need to copy it into _table_mem_pool before we can insert it.
+void MemTable::_insert_agg(const Tuple* tuple) {
     _rows++;
-    bool overwritten = false;
-    uint8_t* _tuple_buf = nullptr;
-    if (_keys_type == KeysType::DUP_KEYS) {
-        // Will insert directly, so use memory from _table_mem_pool
-        _tuple_buf = _table_mem_pool->allocate(_schema_size);
-        ContiguousRow row(_schema, _tuple_buf);
-        _tuple_to_row(tuple, &row, _table_mem_pool.get());
-        _skip_list->Insert((TableKey)_tuple_buf, &overwritten);
-        DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
-        return;
-    }
-
-    // For non-DUP models, for the data rows passed from the upper layer, when copying the data,
-    // we first allocate from _buffer_mem_pool, and then check whether it already exists in
-    // _skiplist. If it exists, we aggregate the new row into the row in skiplist.
-    // otherwise, we need to copy it into _table_mem_pool before we can insert it.
-    _tuple_buf = _buffer_mem_pool->allocate(_schema_size);
-    ContiguousRow src_row(_schema, _tuple_buf);
+    uint8_t* tuple_buf = _buffer_mem_pool->allocate(_schema_size);
+    ContiguousRow src_row(_schema, tuple_buf);
     _tuple_to_row(tuple, &src_row, _buffer_mem_pool.get());

-    bool is_exist = _skip_list->Find((TableKey)_tuple_buf, &_hint);
+    bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint);
     if (is_exist) {
-        _aggregate_two_row(src_row, _hint.curr->key);
+        (this->*_aggregate_two_row_fn)(src_row, _hint.curr->key);
     } else {
-        _tuple_buf = _table_mem_pool->allocate(_schema_size);
-        ContiguousRow dst_row(_schema, _tuple_buf);
+        tuple_buf = _table_mem_pool->allocate(_schema_size);
+        ContiguousRow dst_row(_schema, tuple_buf);
         _agg_object_pool.acquire_data(&_agg_buffer_pool);
         copy_row_in_memtable(&dst_row, src_row, _table_mem_pool.get());
-        _skip_list->InsertWithHint((TableKey)_tuple_buf, is_exist, &_hint);
+        _skip_list->InsertWithHint((TableKey)tuple_buf, is_exist, &_hint);
     }

     // Make MemPool to be reusable, but does not free its memory
     _buffer_mem_pool->clear();
     _agg_buffer_pool.clear();
 }

+void MemTable::_insert_dup(const Tuple* tuple) {
+    _rows++;
+    bool overwritten = false;
+    uint8_t* tuple_buf = _table_mem_pool->allocate(_schema_size);
+    ContiguousRow row(_schema, tuple_buf);
+    _tuple_to_row(tuple, &row, _table_mem_pool.get());
+    _skip_list->Insert((TableKey)tuple_buf, &overwritten);
+    DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
+}
+
 void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool) {
     for (size_t i = 0; i < _slot_descs->size(); ++i) {
         auto cell = row->cell(i);
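The comment above _insert_agg describes a find-then-aggregate-or-copy flow. A toy sketch of that control flow, with std::map standing in for the SkipList and integer summation standing in for the aggregate functions (both are hypothetical simplifications, not the Doris API):

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <string>

    struct ToyMemTable {
        std::map<std::string, int64_t> table; // key -> aggregated value

        void insert_agg(const std::string& key, int64_t value) {
            auto it = table.find(key);         // plays the role of _skip_list->Find
            if (it != table.end()) {
                it->second += value;           // aggregate into the existing row
            } else {
                table.emplace(key, value);     // copy the row in and insert it
            }
        }
    };

    int main() {
        ToyMemTable t;
        t.insert_agg("k1", 2);
        t.insert_agg("k1", 3);
        assert(t.table["k1"] == 5); // duplicate keys were merged, not duplicated
    }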
@@ -209,12 +214,14 @@ void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* me

 void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
     ContiguousRow dst_row(_schema, row_in_skiplist);
-    if (_tablet_schema->has_sequence_col()) {
-        agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
-                                     _table_mem_pool.get());
-    } else {
-        agg_update_row(&dst_row, src_row, _table_mem_pool.get());
-    }
+    agg_update_row(&dst_row, src_row, _table_mem_pool.get());
 }
+
+void MemTable::_aggregate_two_row_with_sequence(const ContiguousRow& src_row,
+                                                TableKey row_in_skiplist) {
+    ContiguousRow dst_row(_schema, row_in_skiplist);
+    agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
+                                 _table_mem_pool.get());
+}

 void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist) {
@@ -236,7 +243,7 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_
     }
 }
 vectorized::Block MemTable::_collect_vskiplist_results() {
-    VecTable::Iterator it(_vec_skip_list);
+    VecTable::Iterator it(_vec_skip_list.get());
     vectorized::Block in_block = _input_mutable_block.to_block();
     // TODO: should try to insert data by column, not by row, to optimize the code
     if (_keys_type == KeysType::DUP_KEYS) {
@@ -282,7 +289,7 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
     if (st == Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED)) {
         // For alpha rowset, we do not implement "flush_single_memtable".
         // Flush the memtable like the old way.
-        Table::Iterator it(_skip_list);
+        Table::Iterator it(_skip_list.get());
         for (it.SeekToFirst(); it.Valid(); it.Next()) {
             char* row = (char*)it.key();
             ContiguousRow dst_row(_schema, row);
@@ -307,7 +314,7 @@ Status MemTable::close() {
 }

 MemTable::Iterator::Iterator(MemTable* memtable)
-        : _mem_table(memtable), _it(memtable->_skip_list) {}
+        : _mem_table(memtable), _it(memtable->_skip_list.get()) {}

 void MemTable::Iterator::seek_to_first() {
     _it.SeekToFirst();
12 changes: 9 additions & 3 deletions be/src/olap/memtable.h
@@ -50,7 +50,7 @@ class MemTable {
     size_t memory_usage() const { return _mem_tracker->consumption(); }
     std::shared_ptr<MemTracker>& mem_tracker() { return _mem_tracker; }

-    void insert(const Tuple* tuple);
+    inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
     // insert tuple from (row_pos) to (row_pos+num_rows)
     void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);

@@ -140,6 +140,9 @@ class MemTable {
 private:
     void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool);
     void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist);
+    void _aggregate_two_row_with_sequence(const ContiguousRow& new_row, TableKey row_in_skiplist);
+    void _insert_dup(const Tuple* tuple);
+    void _insert_agg(const Tuple* tuple);
     // for vectorized
     void _insert_one_row_from_block(RowInBlock* row_in_block);
     void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist);
@@ -171,10 +174,10 @@ class MemTable {
     ObjectPool _agg_object_pool;

     size_t _schema_size;
-    Table* _skip_list;
+    std::unique_ptr<Table> _skip_list;
     Table::Hint _hint;

-    VecTable* _vec_skip_list;
+    std::unique_ptr<VecTable> _vec_skip_list;
     VecTable::Hint _vec_hint;

     RowsetWriter* _rowset_writer;
@@ -185,6 +188,9 @@ class MemTable {
     // This is not the rows in this memtable, because rows may be merged
     // in unique or aggregate key model.
     int64_t _rows = 0;
+    void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr;
+    void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row,
+                                            TableKey row_in_skiplist) = nullptr;

     // for vectorized
     vectorized::MutableBlock _input_mutable_block;
34 changes: 14 additions & 20 deletions be/src/olap/rowset/segment_v2/binary_dict_page.cpp
@@ -23,9 +23,9 @@
#include "util/slice.h" // for Slice
#include "vec/columns/column.h"
#include "vec/columns/column_dictionary.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/predicate_column.h"

namespace doris {
@@ -66,6 +66,8 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
     const Slice* src = reinterpret_cast<const Slice*>(vals);
     size_t num_added = 0;
     uint32_t value_code = -1;
+    auto* actual_builder =
+            down_cast<BitshufflePageBuilder<OLAP_FIELD_TYPE_INT>*>(_data_page_builder.get());

     if (_data_page_builder->count() == 0) {
         _first_value.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()),

@@ -90,13 +92,18 @@
             dict_item.relocate(item_mem);
         }
         value_code = _dictionary.size();
+        size_t add_count = 1;
+        RETURN_IF_ERROR(_dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item),
+                                           &add_count));
+        if (add_count == 0) {
+            // current dict page is full, stop processing remaining inputs
+            break;
+        }
         _dictionary.emplace(dict_item, value_code);
         _dict_items.push_back(dict_item);
-        _dict_builder->update_prepared_size(dict_item.size);
     }
     size_t add_count = 1;
-    RETURN_IF_ERROR(_data_page_builder->add(reinterpret_cast<const uint8_t*>(&value_code),
-                                            &add_count));
+    RETURN_IF_ERROR(actual_builder->single_add(
+            reinterpret_cast<const uint8_t*>(&value_code), &add_count));
     if (add_count == 0) {
         // current data page is full, stop processing remaining inputs
         break;

Review comment (Contributor): use add_internal<true>
Author reply: The reason for adding add_internal is to unify the logic of add and single_add without adding a separate public function; add_internal is not meant to be exposed for direct use.
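Down-casting _data_page_builder to the concrete BitshufflePageBuilder<OLAP_FIELD_TYPE_INT> lets the hot loop call the non-virtual single_add() fast path instead of the virtual PageBuilder::add(). A minimal sketch of this cast-once-then-use-the-fast-path pattern, with toy types rather than the Doris classes:

    #include <cstdint>
    #include <memory>
    #include <vector>

    struct PageBuilder {
        virtual ~PageBuilder() = default;
        virtual void add(const uint8_t* vals, size_t count) = 0;
    };

    struct IntPageBuilder : PageBuilder {
        std::vector<uint8_t> data;
        void add(const uint8_t* vals, size_t count) override {
            data.insert(data.end(), vals, vals + count * sizeof(uint32_t));
        }
        // Non-virtual single-value fast path: one typed store.
        void single_add(const uint8_t* vals) {
            size_t old_size = data.size();
            data.resize(old_size + sizeof(uint32_t));
            *reinterpret_cast<uint32_t*>(&data[old_size]) =
                    *reinterpret_cast<const uint32_t*>(vals);
        }
    };

    int main() {
        std::unique_ptr<PageBuilder> base = std::make_unique<IntPageBuilder>();
        // The owner knows the concrete type: cast once, outside the hot loop.
        auto* concrete = static_cast<IntPageBuilder*>(base.get());
        for (uint32_t code = 0; code < 3; ++code) {
            concrete->single_add(reinterpret_cast<const uint8_t*>(&code));
        }
    }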
@@ -144,17 +151,7 @@ uint64_t BinaryDictPageBuilder::size() const {
 }

 Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) {
-    _dictionary.clear();
-    _dict_builder->reset();
-    size_t add_count = 1;
-    // here do not check is_page_full of dict_builder
-    // because it is checked in add
-    for (auto& dict_item : _dict_items) {
-        RETURN_IF_ERROR(
-                _dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item), &add_count));
-    }
     *dictionary_page = _dict_builder->finish();
-    _dict_items.clear();
     return Status::OK();
 }

@@ -180,10 +177,7 @@ Status BinaryDictPageBuilder::get_last_value(void* value) const {
     }
     uint32_t value_code;
     RETURN_IF_ERROR(_data_page_builder->get_last_value(&value_code));
-    // TODO _dict_items is cleared in get_dictionary_page, which could cause
-    // get_last_value to fail when it's called after get_dictionary_page.
-    // the solution is to read last value from _dict_builder instead of _dict_items
-    *reinterpret_cast<Slice*>(value) = _dict_items[value_code];
+    *reinterpret_cast<Slice*>(value) = _dict_builder->get(value_code);
     return Status::OK();
 }

18 changes: 10 additions & 8 deletions be/src/olap/rowset/segment_v2/binary_plain_page.h
@@ -47,14 +47,13 @@ namespace segment_v2 {
 class BinaryPlainPageBuilder : public PageBuilder {
 public:
     BinaryPlainPageBuilder(const PageBuilderOptions& options)
-            : _size_estimate(0), _prepared_size(0), _options(options) {
+            : _size_estimate(0), _options(options) {
         reset();
     }

     bool is_page_full() override {
         // data_page_size is 0, do not limit the page size
-        return _options.data_page_size != 0 && (_size_estimate > _options.data_page_size ||
-                                                _prepared_size > _options.data_page_size);
+        return _options.data_page_size != 0 && _size_estimate > _options.data_page_size;
     }

     Status add(const uint8_t* vals, size_t* count) override {
@@ -101,7 +100,6 @@ class BinaryPlainPageBuilder : public PageBuilder {
         _buffer.clear();
         _buffer.reserve(_options.data_page_size == 0 ? 1024 : _options.data_page_size);
         _size_estimate = sizeof(uint32_t);
-        _prepared_size = sizeof(uint32_t);
         _finished = false;
         _last_value_size = 0;
     }
@@ -127,11 +125,16 @@ class BinaryPlainPageBuilder : public PageBuilder {
         return Status::OK();
     }

-    void update_prepared_size(size_t added_size) {
-        _prepared_size += added_size;
-        _prepared_size += sizeof(uint32_t);
+    inline Slice operator[](size_t idx) const {
+        DCHECK(!_finished);
+        DCHECK_LT(idx, _offsets.size());
+        size_t value_size =
+                (idx < _offsets.size() - 1) ? _offsets[idx + 1] - _offsets[idx] : _last_value_size;
+        return Slice(&_buffer[_offsets[idx]], value_size);
     }

+    inline Slice get(std::size_t idx) const { return (*this)[idx]; }
+
 private:
     void _copy_value_at(size_t idx, faststring* value) const {
         size_t value_size =

@@ -141,7 +144,6 @@ class BinaryPlainPageBuilder : public PageBuilder {
     faststring _buffer;
     size_t _size_estimate;
-    size_t _prepared_size;
     // Offsets of each entry, relative to the start of the page
     std::vector<uint32_t> _offsets;
     bool _finished;
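The new operator[]/get() reads a value straight out of the builder's buffer: entry i ends where entry i+1 begins, and only the final entry needs the separately tracked _last_value_size, because _offsets stores start positions only. A toy illustration of that layout (hypothetical names, not the Doris API):

    #include <cassert>
    #include <cstdint>
    #include <string>
    #include <vector>

    struct ToyPlainPage {
        std::string buffer;            // concatenated values
        std::vector<uint32_t> offsets; // start offset of each value
        uint32_t last_value_size = 0;

        void add(const std::string& v) {
            offsets.push_back(static_cast<uint32_t>(buffer.size()));
            buffer += v;
            last_value_size = static_cast<uint32_t>(v.size());
        }
        // Mirrors operator[]: the size comes from the next offset, except for
        // the last entry, whose end is not recorded in `offsets`.
        std::string get(size_t idx) const {
            size_t size = (idx + 1 < offsets.size())
                                  ? offsets[idx + 1] - offsets[idx]
                                  : last_value_size;
            return buffer.substr(offsets[idx], size);
        }
    };

    int main() {
        ToyPlainPage p;
        p.add("foo");
        p.add("quux");
        assert(p.get(0) == "foo" && p.get(1) == "quux");
    }

This is also what lets get_last_value in binary_dict_page.cpp read the slice from _dict_builder->get(value_code) even after get_dictionary_page has been called.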
37 changes: 36 additions & 1 deletion be/src/olap/rowset/segment_v2/bitshuffle_page.h
@@ -93,13 +93,48 @@ class BitshufflePageBuilder : public PageBuilder {
     bool is_page_full() override { return _remain_element_capacity == 0; }

     Status add(const uint8_t* vals, size_t* count) override {
+        return add_internal<false>(vals, count);
     }
+
+    Status single_add(const uint8_t* vals, size_t* count) {
+        return add_internal<true>(vals, count);
+    }
+
+    template <bool single>
+    inline Status add_internal(const uint8_t* vals, size_t* count) {
         DCHECK(!_finished);
         if (_remain_element_capacity <= 0) {
             *count = 0;
             return Status::RuntimeError("page is full.");
         }
         int to_add = std::min<int>(_remain_element_capacity, *count);
-        _data.append(vals, to_add * SIZE_OF_TYPE);
+        int to_add_size = to_add * SIZE_OF_TYPE;
+        size_t orig_size = _data.size();
+        _data.resize(orig_size + to_add_size);
         _count += to_add;
         _remain_element_capacity -= to_add;
         // return added number through count
         *count = to_add;
+        if constexpr (single) {
+            if constexpr (SIZE_OF_TYPE == 1) {
+                *reinterpret_cast<uint8_t*>(&_data[orig_size]) = *vals;
+                return Status::OK();
+            } else if constexpr (SIZE_OF_TYPE == 2) {
+                *reinterpret_cast<uint16_t*>(&_data[orig_size]) =
+                        *reinterpret_cast<const uint16_t*>(vals);
+                return Status::OK();
+            } else if constexpr (SIZE_OF_TYPE == 4) {
+                *reinterpret_cast<uint32_t*>(&_data[orig_size]) =
+                        *reinterpret_cast<const uint32_t*>(vals);
+                return Status::OK();
+            } else if constexpr (SIZE_OF_TYPE == 8) {
+                *reinterpret_cast<uint64_t*>(&_data[orig_size]) =
+                        *reinterpret_cast<const uint64_t*>(vals);
+                return Status::OK();
+            }
+        }
+        // when single is true and SIZE_OF_TYPE > 8, or when single is false
+        memcpy(&_data[orig_size], vals, to_add_size);
         return Status::OK();
     }

Review discussion:

Contributor: can we delete add()?
Author: No, add is pure virtual in the base class, so the override must stay.

Contributor: can we delete single_add()?
Author: No. add_internal was added to unify the logic of add and single_add without introducing a separate public entry point; add_internal itself is not meant to be exposed for use.

Contributor: Why is this implementation faster than _data.append?
Author: It uses a typed assignment instead of memcpy for the single-value case.
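To make the trade-off concrete, here is a standalone sketch of the same if constexpr technique: when single is a compile-time true and the element size is one of the special-cased widths, the compiler emits a single typed store and drops the memcpy branch entirely (illustrative code, not the Doris class):

    #include <cstdint>
    #include <cstring>
    #include <vector>

    template <typename T>
    struct ToyColumn {
        std::vector<uint8_t> data;

        template <bool single>
        void append(const T* vals, size_t n) {
            size_t old_size = data.size();
            data.resize(old_size + n * sizeof(T));
            if constexpr (single) {
                // n == 1 by contract: one typed assignment, no libc call.
                *reinterpret_cast<T*>(&data[old_size]) = *vals;
            } else {
                std::memcpy(&data[old_size], vals, n * sizeof(T));
            }
        }
    };

    int main() {
        ToyColumn<uint32_t> c;
        uint32_t v = 5;
        c.append<true>(&v, 1);     // single-value fast path
        uint32_t batch[3] = {1, 2, 3};
        c.append<false>(batch, 3); // bulk path
    }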
