diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 11521577bdc0ef1..28da63d8980b66a 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -77,6 +77,16 @@ MemTrackerLimiter::~MemTrackerLimiter() { if (_label == "Process") doris::thread_context_ptr._init = false; DCHECK(remain_child_count() == 0 || _label == "Process"); consume(_untracked_mem.exchange(0)); + // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption` + // in real time. Merge its consumption into orphan when all third level limiter trackers are destructed, to avoid repetition. + // the first layer: process; + // the second layer: a tracker that will not be destructed globally (query/load pool, load channel mgr, etc.); + // the third layer: a query/load/compaction task generates a tracker (query tracker, load channel tracker, etc.). + if (_parent->parent()->label() == "_parent") { + ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( + _consumption->current_value()); + } + if (_parent) { std::lock_guard l(_parent->_child_tracker_limiter_lock); if (_child_tracker_it != _parent->_child_tracker_limiters.end()) { @@ -84,10 +94,6 @@ MemTrackerLimiter::~MemTrackerLimiter() { _child_tracker_it = _parent->_child_tracker_limiters.end(); } } - // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption` - // in real time. Merge its consumption into orphan when all other trackers are destructed. - ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( - _consumption->current_value()); } MemTracker::Snapshot MemTrackerLimiter::make_snapshot(size_t level) const { diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 7f12f08f53311a1..ea2acfeef3facc9 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -216,7 +216,7 @@ class MemTrackerLimiter final : public MemTracker { "alloc size {}", PerfCounters::get_vm_rss_str(), MemInfo::allocator_cache_mem_str(), MemInfo::mem_limit_str(), print_bytes(bytes)); - ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(err_msg); + ExecEnv::GetInstance()->process_mem_tracker()->print_log_usage(err_msg); return err_msg; } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index ee2393f8759058a..4521a24881e47fc 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -29,8 +29,8 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( const std::shared_ptr& mem_tracker) { DCHECK(mem_tracker); flush_untracked_mem(); - _task_id = task_id; - _fragment_instance_id = fragment_instance_id; + _task_id_stack.push_back(task_id); + _fragment_instance_id_stack.push_back(fragment_instance_id); _limiter_tracker_stack.push_back(mem_tracker); _limiter_tracker_raw = mem_tracker.get(); } @@ -38,16 +38,16 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( void ThreadMemTrackerMgr::detach_limiter_tracker() { DCHECK(!_limiter_tracker_stack.empty()); flush_untracked_mem(); - _task_id = ""; - _fragment_instance_id = TUniqueId(); + _task_id_stack.pop_back(); + _fragment_instance_id_stack.pop_back(); _limiter_tracker_stack.pop_back(); _limiter_tracker_raw = _limiter_tracker_stack.back().get(); } void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) { - if (_fragment_instance_id != TUniqueId()) { + if (_fragment_instance_id_stack.back() != TUniqueId()) { ExecEnv::GetInstance()->fragment_mgr()->cancel( - _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, + _fragment_instance_id_stack.back(), PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, cancel_details); } } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 06b36c5c70cd02d..786df02ddb25a5f 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -81,7 +81,7 @@ class ThreadMemTrackerMgr { template void flush_untracked_mem(); - bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } + bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); } std::shared_ptr limiter_mem_tracker() { return _limiter_tracker_stack.back(); @@ -100,8 +100,8 @@ class ThreadMemTrackerMgr { return fmt::format( "ThreadMemTrackerMgr debug, _untracked_mem:{}, _task_id:{}, " "_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>", - std::to_string(_untracked_mem), _task_id, _limiter_tracker_raw->log_usage(1), - fmt::to_string(consumer_tracker_buf)); + std::to_string(_untracked_mem), _task_id_stack.back(), + _limiter_tracker_raw->log_usage(1), fmt::to_string(consumer_tracker_buf)); } private: @@ -127,8 +127,8 @@ class ThreadMemTrackerMgr { // If there is a memory new/delete operation in the consume method, it may enter infinite recursion. bool _stop_consume = false; bool _check_attach = true; - std::string _task_id; - TUniqueId _fragment_instance_id; + std::vector _task_id_stack; + std::vector _fragment_instance_id_stack; ExceedCallBack _cb_func = nullptr; }; @@ -137,10 +137,11 @@ inline void ThreadMemTrackerMgr::init() { // _limiter_tracker_stack[0] = orphan_mem_tracker DCHECK(_limiter_tracker_stack.size() <= 1) << "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); - _task_id = ""; if (_limiter_tracker_stack.size() == 0) { _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker()); _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); + _task_id_stack.push_back(""); + _fragment_instance_id_stack.push_back(TUniqueId()); } _check_limit = true; } diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index d9da2a647de573d..ea518722113ae4c 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -31,7 +31,7 @@ int main(int argc, char** argv) { std::shared_ptr process_mem_tracker = std::make_shared(-1, "Process"); std::shared_ptr _orphan_mem_tracker = - std::make_shared(-1, "Orphan", process_mem_tracker); + std::make_shared(-1, "Orphan", process_mem_tracker); doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker); doris::thread_context()->_thread_mem_tracker_mgr->init(); doris::TabletSchemaCache::create_global_schema_cache();