From 2ef60703711aac089f9dfffb6bab7719c4750c72 Mon Sep 17 00:00:00 2001
From: Xinyi Zou
Date: Tue, 20 Sep 2022 21:35:34 +0800
Subject: [PATCH 1/2] fix load channel tracker

---
 be/src/olap/compaction.cpp                    |  9 ++----
 be/src/olap/delta_writer.cpp                  | 12 ++------
 be/src/olap/memtable.cpp                      | 28 ++++++++++++-------
 be/src/olap/memtable.h                        | 13 ++++++---
 be/src/olap/memtable_flush_executor.cpp       | 13 +++------
 be/src/olap/memtable_flush_executor.h         |  3 +-
 be/src/runtime/load_channel.cpp               |  3 +-
 be/src/runtime/memory/mem_tracker_limiter.cpp |  5 ++--
 be/src/runtime/memory/mem_tracker_limiter.h   |  6 ++++
 .../runtime/memory/mem_tracker_task_pool.cpp  | 12 ++++----
 be/src/runtime/runtime_state.cpp              |  1 +
 be/src/util/mem_info.cpp                      |  2 ++
 be/src/util/mem_info.h                        |  6 ++++
 13 files changed, 60 insertions(+), 53 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 2e9b4a6b55a44b..f2e5170d8810db 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -37,18 +37,13 @@ Compaction::Compaction(TabletSharedPtr tablet, const std::string& label)
 #ifndef BE_TEST
     _mem_tracker = std::make_shared<MemTrackerLimiter>(
             -1, label, StorageEngine::instance()->compaction_mem_tracker());
+    _mem_tracker->enable_reset_zero();
 #else
     _mem_tracker = std::make_shared<MemTrackerLimiter>(-1, label);
 #endif
 }
 
-Compaction::~Compaction() {
-#ifndef BE_TEST
-    // Compaction tracker cannot be completely accurate, offset the global impact.
-    StorageEngine::instance()->compaction_mem_tracker()->cache_consume_local(
-            -_mem_tracker->consumption());
-#endif
-}
+Compaction::~Compaction() {}
 
 Status Compaction::compact() {
     RETURN_NOT_OK(prepare_compact());
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 8e1b27f5c07e03..6719b9ddf68d78 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -191,8 +191,6 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
-    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
-
     for (const auto& row_idx : row_idxs) {
         _mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0));
     }
@@ -218,7 +216,6 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
-    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
     _mem_table->insert(block, row_idxs);
 
     if (_mem_table->need_to_agg()) {
@@ -236,7 +233,7 @@ Status DeltaWriter::_flush_memtable_async() {
     if (++_segment_counter > config::max_segment_num_per_rowset) {
         return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS);
    }
-    return _flush_token->submit(std::move(_mem_table), _mem_tracker);
+    return _flush_token->submit(std::move(_mem_table));
 }
 
 Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
@@ -253,7 +250,6 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
-    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
     if (_flush_token->get_stats().flush_running_count == 0) {
         // equal means there is no memtable in flush queue, just flush this memtable
         VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
@@ -290,7 +286,7 @@ void DeltaWriter::_reset_mem_table() {
     }
     _mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots,
                                   _req.tuple_desc, _rowset_writer.get(), _delete_bitmap,
-                                  _rowset_ids, _cur_max_version, _is_vec));
+                                  _rowset_ids, _cur_max_version, _mem_tracker, _is_vec));
 }
 
 Status DeltaWriter::close() {
@@ -308,7 +304,6 @@
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
-    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
     RETURN_NOT_OK(_flush_memtable_async());
     _mem_table.reset();
     return Status::OK();
 }
@@ -323,8 +318,6 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
     if (_is_cancelled) {
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
-
-    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
 
     // return error if previous flush failed
     RETURN_NOT_OK(_flush_token->wait());
@@ -384,7 +377,6 @@ Status DeltaWriter::cancel() {
     if (!_is_init || _is_cancelled) {
         return Status::OK();
     }
-    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
     _mem_table.reset();
     if (_flush_token != nullptr) {
         // cancel and wait all memtables in flush queue to be finished
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 565cd49a81ee8c..65fd0b3583bb47 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -34,15 +34,14 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t
                    const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
                    RowsetWriter* rowset_writer, DeleteBitmapPtr delete_bitmap,
                    const RowsetIdUnorderedSet& rowset_ids, int64_t cur_max_version,
-                   bool support_vec)
+                   const std::shared_ptr<MemTrackerLimiter>& tracker, bool support_vec)
         : _tablet(std::move(tablet)),
           _schema(schema),
          _tablet_schema(tablet_schema),
           _slot_descs(slot_descs),
-          _mem_tracker(std::make_unique<MemTracker>(
-                  fmt::format("MemTable:tabletId={}", std::to_string(tablet_id())))),
-          _buffer_mem_pool(new MemPool(_mem_tracker.get())),
-          _table_mem_pool(new MemPool(_mem_tracker.get())),
+          _mem_tracker_hook(std::make_shared<MemTrackerLimiter>(
+                  -1, fmt::format("MemTableHook:tabletId={}", std::to_string(tablet_id())),
+                  tracker)),
           _schema_size(_schema->schema_size()),
           _rowset_writer(rowset_writer),
           _is_first_insertion(true),
@@ -53,6 +52,12 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t
           _delete_bitmap(delete_bitmap),
           _rowset_ids(rowset_ids),
           _cur_max_version(cur_max_version) {
+    _mem_tracker_hook->enable_reset_zero();
+    SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+    _mem_tracker_manual = std::make_unique<MemTracker>(
+            fmt::format("MemTableManual:tabletId={}", std::to_string(tablet_id())));
+    _buffer_mem_pool = std::make_unique<MemPool>(_mem_tracker_manual.get());
+    _table_mem_pool = std::make_unique<MemPool>(_mem_tracker_manual.get());
     if (support_vec) {
         _skip_list = nullptr;
         _vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
@@ -147,12 +152,12 @@ MemTable::~MemTable() {
         }
     }
     std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
-    _mem_tracker->release(_mem_usage);
+    _mem_tracker_manual->release(_mem_usage);
     _buffer_mem_pool->free_all();
     _table_mem_pool->free_all();
-    DCHECK_EQ(_mem_tracker->consumption(), 0)
+    DCHECK_EQ(_mem_tracker_manual->consumption(), 0)
             << std::endl
-            << MemTracker::log_usage(_mem_tracker->make_snapshot(0));
+            << MemTracker::log_usage(_mem_tracker_manual->make_snapshot(0));
 }
 
 MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {}
@@ -170,6 +175,7 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
 }
 
 void MemTable::insert(const vectorized::Block* input_block, const std::vector<int>& row_idxs) {
+    SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
     auto target_block = input_block->copy_block(_column_offset);
     if (_is_first_insertion) {
         _is_first_insertion = false;
@@ -186,7 +192,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector
-    _mem_tracker->consume(input_size);
+    _mem_tracker_manual->consume(input_size);
 
     for (int i = 0; i < num_rows; i++) {
         _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
@@ -367,7 +373,7 @@ void MemTable::_collect_vskiplist_results() {
     if constexpr (!is_final) {
         // if is not final, we collect the agg results to input_block and then continue to insert
         size_t shrunked_after_agg = _output_mutable_block.allocated_bytes();
-        _mem_tracker->consume(shrunked_after_agg - _mem_usage);
+        _mem_tracker_manual->consume(shrunked_after_agg - _mem_usage);
         _mem_usage = shrunked_after_agg;
         _input_mutable_block.swap(_output_mutable_block);
         //TODO(weixang):opt here.
@@ -385,6 +391,7 @@ void MemTable::_collect_vskiplist_results() {
 }
 
 void MemTable::shrink_memtable_by_agg() {
+    SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
     if (keys_type() == KeysType::DUP_KEYS) {
         return;
     }
@@ -421,6 +428,7 @@ Status MemTable::_generate_delete_bitmap() {
 }
 
 Status MemTable::flush() {
+    SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
     VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id()
                   << ", memsize: " << memory_usage() << ", rows: " << _rows;
     int64_t duration_ns = 0;
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 357d21a189b318..2170a5db59cdd4 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -45,14 +45,18 @@ class MemTable {
              const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
              RowsetWriter* rowset_writer, DeleteBitmapPtr delete_bitmap,
              const RowsetIdUnorderedSet& rowset_ids, int64_t cur_max_version,
-             bool support_vec = false);
+             const std::shared_ptr<MemTrackerLimiter>& tracker, bool support_vec = false);
     ~MemTable();
 
     int64_t tablet_id() const { return _tablet->tablet_id(); }
     KeysType keys_type() const { return _tablet->keys_type(); }
-    size_t memory_usage() const { return _mem_tracker->consumption(); }
+    std::shared_ptr<MemTrackerLimiter> mem_tracker_hook() const { return _mem_tracker_hook; }
+    size_t memory_usage() const { return _mem_tracker_manual->consumption(); }
 
-    inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
+    inline void insert(const Tuple* tuple) {
+        SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+        (this->*_insert_fn)(tuple);
+    }
 
     // insert tuple from (row_pos) to (row_pos+num_rows)
     void insert(const vectorized::Block* block, const std::vector<int>& row_idxs);
@@ -157,7 +161,8 @@ class MemTable {
 
     std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
 
-    std::unique_ptr<MemTracker> _mem_tracker;
+    std::unique_ptr<MemTracker> _mem_tracker_manual;
+    std::shared_ptr<MemTrackerLimiter> _mem_tracker_hook;
     // This is a buffer, to hold the memory referenced by the rows that have not
     // been inserted into the SkipList
     std::unique_ptr<MemPool> _buffer_mem_pool;
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index bf1b6819a5e347..1dd4859e5a20dc 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -29,16 +29,14 @@ namespace doris {
 class MemtableFlushTask final : public Runnable {
 public:
     MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable,
-                      int64_t submit_task_time, const std::shared_ptr<MemTrackerLimiter>& tracker)
+                      int64_t submit_task_time)
             : _flush_token(flush_token),
               _memtable(std::move(memtable)),
-              _submit_task_time(submit_task_time),
-              _tracker(tracker) {}
 
+              _submit_task_time(submit_task_time) {}
 
     ~MemtableFlushTask() override = default;
 
     void run() override {
-        SCOPED_ATTACH_TASK(_tracker, ThreadContext::TaskType::LOAD);
         _flush_token->_flush_memtable(_memtable.get(), _submit_task_time);
         _memtable.reset();
     }
@@ -47,7 +45,6 @@ class MemtableFlushTask final : public Runnable {
     FlushToken* _flush_token;
     std::unique_ptr<MemTable> _memtable;
     int64_t _submit_task_time;
-    std::shared_ptr<MemTrackerLimiter> _tracker;
 };
 
 std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
@@ -60,15 +57,13 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
     return os;
 }
 
-Status FlushToken::submit(std::unique_ptr<MemTable> mem_table,
-                          const std::shared_ptr<MemTrackerLimiter>& tracker) {
+Status FlushToken::submit(std::unique_ptr<MemTable> mem_table) {
     ErrorCode s = _flush_status.load();
     if (s != OLAP_SUCCESS) {
         return Status::OLAPInternalError(s);
     }
     int64_t submit_task_time = MonotonicNanos();
-    auto task = std::make_shared<MemtableFlushTask>(this, std::move(mem_table), submit_task_time,
-                                                    tracker);
+    auto task = std::make_shared<MemtableFlushTask>(this, std::move(mem_table), submit_task_time);
     _stats.flush_running_count++;
     return _flush_token->submit(std::move(task));
 }
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index 53e2cfaf984b5e..db16dd2acfc1ad 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -58,8 +58,7 @@ class FlushToken {
     explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
             : _flush_token(std::move(flush_pool_token)), _flush_status(OLAP_SUCCESS) {}
 
-    Status submit(std::unique_ptr<MemTable> mem_table,
-                  const std::shared_ptr<MemTrackerLimiter>& tracker);
+    Status submit(std::unique_ptr<MemTable> mem_table);
 
     // error has happpens, so we cancel this token
     // And remove all tasks in the queue.
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 48d62a4445597b..c989f113362cc8 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -38,6 +38,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimiter>
+    _mem_tracker->enable_reset_zero();
 }
 
 LoadChannel::~LoadChannel() {
@@ -45,8 +46,6 @@
                << ", info=" << _mem_tracker->debug_string() << ", load_id=" << _load_id
                << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip
                << ", is_vec=" << _is_vec;
-    // Load channel tracker cannot be completely accurate, offsetting the impact on the load channel mgr tracker.
-    _mem_tracker->parent()->cache_consume_local(-_mem_tracker->consumption());
 }
 
 Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index e1ff4f04db5fbc..e327be3fef6ac7 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -83,12 +83,13 @@ MemTrackerLimiter::~MemTrackerLimiter() {
     // the first layer: process;
     // the second layer: a tracker that will not be destructed globally (query/load pool, load channel mgr, etc.);
     // the third layer: a query/load/compaction task generates a tracker (query tracker, load channel tracker, etc.).
-    if (_parent->parent()->label() == "Process") {
+    if ((_parent && _parent->label() == "Process") ||
+        (_parent->parent() && _parent->parent()->label() == "Process")) {
         ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
                 _consumption->current_value());
     }
 #endif
-
+    if (_reset_zero) cache_consume_local(-_consumption->current_value());
     if (_parent) {
         std::lock_guard l(_parent->_child_tracker_limiter_lock);
         if (_child_tracker_it != _parent->_child_tracker_limiters.end()) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index ea2acfeef3facc..e68d5b65514b10 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -129,6 +129,7 @@ class MemTrackerLimiter final : public MemTracker {
     }
 
     void enable_print_log_usage() { _print_log_usage = true; }
+    void enable_reset_zero() { _reset_zero = true; }
 
     // Logs the usage of this tracker limiter and optionally its children (recursively).
     // If 'logged_consumption' is non-nullptr, sets the consumption value logged.
@@ -250,6 +251,11 @@ class MemTrackerLimiter final : public MemTracker {
     std::atomic_size_t _had_child_count = 0;
 
     bool _print_log_usage = false;
+    // mem hook record tracker cannot guarantee that the final consumption is 0,
+    // nor can it guarantee that the memory alloc and free are recorded in a one-to-one correspondence.
+    // In some cases, in order to avoid the cumulative error of the upper global tracker,
+    // the consumption of the current tracker is reset to zero.
+    bool _reset_zero = false;
 };
 
 inline void MemTrackerLimiter::consume(int64_t bytes) {
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index a08d876370cf0c..143e7486fa295f 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -89,14 +89,12 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
         // between the two trackers.
         // At present, it is impossible to effectively locate which memory consume and release on different trackers,
         // so query memory leaks cannot be found.
-        //
-        // In order to ensure that the query pool mem tracker is the sum of all currently running query mem trackers,
-        // the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is,
-        // the negative number of the current value of consume.
-        it->second->parent()->cache_consume_local(-it->second->consumption());
         LOG(INFO) << fmt::format(
-                "Deregister query/load memory tracker, queryId={}, Limit={}, PeakUsed={}",
-                it->first, it->second->limit(), it->second->peak_consumption());
+                "Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
+                "PeakUsed={}",
+                it->first, PrettyPrinter::print(it->second->limit(), TUnit::BYTES),
+                PrettyPrinter::print(it->second->consumption(), TUnit::BYTES),
+                PrettyPrinter::print(it->second->peak_consumption(), TUnit::BYTES));
         expired_task_ids.emplace_back(it->first);
     }
 }
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 6f48e084abf7ad..646ae960ca46fc 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -240,6 +240,7 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
         DCHECK(false);
         _query_mem_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker();
     }
+    _query_mem_tracker->enable_reset_zero();
 
     _instance_mem_tracker = std::make_shared<MemTrackerLimiter>(
             -1, "RuntimeState:instance:" + print_id(_fragment_instance_id), _query_mem_tracker,
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 522ea0566a5c67..c487ffe8eb1c20 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -43,12 +43,14 @@ int64_t MemInfo::_s_mem_limit = -1;
 std::string MemInfo::_s_mem_limit_str = "";
 int64_t MemInfo::_s_hard_mem_limit = -1;
 size_t MemInfo::_s_allocator_physical_mem = 0;
+size_t MemInfo::_s_pageheap_unmapped_bytes = 0;
 size_t MemInfo::_s_tcmalloc_pageheap_free_bytes = 0;
 size_t MemInfo::_s_tcmalloc_central_bytes = 0;
 size_t MemInfo::_s_tcmalloc_transfer_bytes = 0;
 size_t MemInfo::_s_tcmalloc_thread_bytes = 0;
 size_t MemInfo::_s_allocator_cache_mem = 0;
 std::string MemInfo::_s_allocator_cache_mem_str = "";
+size_t MemInfo::_s_virtual_memory_used = 0;
 
 void MemInfo::init() {
     // Read from /proc/meminfo
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 2aabf18b384dd6..925f1b89862390 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -46,6 +46,7 @@ class MemInfo {
     }
 
     static inline size_t current_mem() { return _s_allocator_physical_mem; }
+    static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used; }
     static inline size_t allocator_cache_mem() { return _s_allocator_cache_mem; }
     static inline std::string allocator_cache_mem_str() { return _s_allocator_cache_mem_str; }
 
@@ -54,6 +55,8 @@ class MemInfo {
     static inline void refresh_allocator_mem() {
         MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes",
                                                         &_s_allocator_physical_mem);
+        MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_unmapped_bytes",
+                                                        &_s_pageheap_unmapped_bytes);
         MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_free_bytes",
                                                         &_s_tcmalloc_pageheap_free_bytes);
         MallocExtension::instance()->GetNumericProperty("tcmalloc.central_cache_free_bytes",
@@ -65,6 +68,7 @@ class MemInfo {
         _s_allocator_cache_mem = _s_tcmalloc_pageheap_free_bytes + _s_tcmalloc_central_bytes +
                                  _s_tcmalloc_transfer_bytes + _s_tcmalloc_thread_bytes;
         _s_allocator_cache_mem_str = PrettyPrinter::print(_s_allocator_cache_mem, TUnit::BYTES);
+        _s_virtual_memory_used = _s_allocator_physical_mem + _s_pageheap_unmapped_bytes;
     }
 
     static inline int64_t mem_limit() {
@@ -86,12 +90,14 @@ class MemInfo {
     static std::string _s_mem_limit_str;
     static int64_t _s_hard_mem_limit;
     static size_t _s_allocator_physical_mem;
+    static size_t _s_pageheap_unmapped_bytes;
     static size_t _s_tcmalloc_pageheap_free_bytes;
     static size_t _s_tcmalloc_central_bytes;
     static size_t _s_tcmalloc_transfer_bytes;
     static size_t _s_tcmalloc_thread_bytes;
     static size_t _s_allocator_cache_mem;
     static std::string _s_allocator_cache_mem_str;
+    static size_t _s_virtual_memory_used;
 };
 
 } // namespace doris

From cdfd84ca93ee5c98f899094787905cd9366e990b Mon Sep 17 00:00:00 2001
From: Xinyi Zou
Date: Wed, 21 Sep 2022 00:45:57 +0800
Subject: [PATCH 2/2] fix ut

---
 be/src/runtime/thread_context.h | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index b097910f24a6ab..eb1e7751672169 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -134,9 +134,11 @@ class ThreadContext {
     void attach_task(const TaskType& type, const std::string& task_id,
                      const TUniqueId& fragment_instance_id,
                      const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
+#ifndef BE_TEST
         DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) && _task_id == "")
                 << ",new tracker label: " << mem_tracker->label() << ",old tracker label: "
                 << _thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label();
+#endif
         DCHECK(type != TaskType::UNKNOWN);
         _type = type;
         _task_id = task_id;
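A short note on the core idea of this change, for readers skimming the diff: the hook-recorded load channel / MemTable trackers cannot guarantee that allocs and frees pair up on the same tracker, so a child tracker can die with a nonzero residue; enable_reset_zero() makes the MemTrackerLimiter destructor cancel that residue so it does not accumulate in the parent (load channel mgr, query pool, compaction tracker). The following is a minimal standalone sketch of that mechanism only; ToyTracker is a hypothetical stand-in and is not the Doris MemTrackerLimiter API.

#include <cstdint>
#include <iostream>
#include <string>

// Toy model of a hierarchical memory tracker whose child propagates
// consumption to its parent, as the trackers in this patch do.
struct ToyTracker {
    std::string label;
    ToyTracker* parent = nullptr;
    int64_t consumption = 0;
    bool reset_zero = false;

    void consume(int64_t bytes) {
        consumption += bytes;
        if (parent != nullptr) parent->consume(bytes);
    }

    ~ToyTracker() {
        // Hook-recorded allocs and frees may not pair up on this tracker, so some
        // residue can remain. With reset_zero enabled, cancel it up the chain,
        // mirroring "if (_reset_zero) cache_consume_local(-_consumption->current_value());".
        if (reset_zero && consumption != 0) consume(-consumption);
    }
};

int main() {
    ToyTracker mgr{"LoadChannelMgr"};
    {
        ToyTracker channel{"LoadChannel", &mgr};
        channel.reset_zero = true;
        channel.consume(1024);  // recorded against this channel by the hook
        channel.consume(-1000); // part of the frees were attributed elsewhere
    }                           // destructor offsets the remaining 24 bytes
    std::cout << "parent residue after child is gone: " << mgr.consumption << std::endl; // 0
    return 0;
}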
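Likewise, the MemInfo change defines the new "allocator virtual memory" figure as physical bytes plus tcmalloc's unmapped pageheap bytes. The sketch below is a standalone illustration of that arithmetic using the same two gperftools properties named in the diff (it assumes linking against tcmalloc, and is not Doris code):

#include <gperftools/malloc_extension.h>
#include <cstdio>

int main() {
    size_t physical = 0;
    size_t unmapped = 0;
    MallocExtension::instance()->GetNumericProperty("generic.total_physical_bytes", &physical);
    MallocExtension::instance()->GetNumericProperty("tcmalloc.pageheap_unmapped_bytes", &unmapped);
    // Mirrors the patch: _s_virtual_memory_used = _s_allocator_physical_mem + _s_pageheap_unmapped_bytes;
    std::printf("allocator virtual mem (physical + unmapped pageheap): %zu bytes\n",
                physical + unmapped);
    return 0;
}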