diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 7be1c7e8d66b65..fc70a6d3dbf972 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -118,14 +118,17 @@ class ExecEnv { } std::shared_ptr process_mem_tracker() { return _process_mem_tracker; } - MemTrackerLimiter* process_mem_tracker_raw() { return _process_mem_tracker_raw; } - void set_process_mem_tracker(const std::shared_ptr& tracker) { - _process_mem_tracker = tracker; - _process_mem_tracker_raw = tracker.get(); + void set_global_mem_tracker(const std::shared_ptr& process_tracker, + const std::shared_ptr& orphan_tracker) { + _process_mem_tracker = process_tracker; + _orphan_mem_tracker = orphan_tracker; + _orphan_mem_tracker_raw = orphan_tracker.get(); } std::shared_ptr allocator_cache_mem_tracker() { return _allocator_cache_mem_tracker; } + std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; } + MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } std::shared_ptr query_pool_mem_tracker() { return _query_pool_mem_tracker; } std::shared_ptr load_pool_mem_tracker() { return _load_pool_mem_tracker; } MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; } @@ -215,8 +218,15 @@ class ExecEnv { // The ancestor for all trackers. Every tracker is visible from the process down. // Not limit total memory by process tracker, and it's just used to track virtual memory of process. std::shared_ptr _process_mem_tracker; + // tcmalloc/jemalloc allocator cache tracker, Including thread cache, free heap, etc. std::shared_ptr _allocator_cache_mem_tracker; - MemTrackerLimiter* _process_mem_tracker_raw; + // The default tracker consumed by mem hook. If the thread does not attach other trackers, + // by default all consumption will be passed to the process tracker through the orphan tracker. + // In real time, `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`. + // Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership", + // and the consumption of the orphan mem tracker is close to 0, but greater than 0. + std::shared_ptr _orphan_mem_tracker; + MemTrackerLimiter* _orphan_mem_tracker_raw; // The ancestor for all querys tracker. std::shared_ptr _query_pool_mem_tracker; // The ancestor for all load tracker. diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 5404394964aaec..fc2f5a7c02595e 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -206,7 +206,8 @@ Status ExecEnv::_init_mem_tracker() { } _process_mem_tracker = std::make_shared(global_memory_limit_bytes, "Process"); - _process_mem_tracker_raw = _process_mem_tracker.get(); + _orphan_mem_tracker = std::make_shared(-1, "Orphan", _process_mem_tracker); + _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); thread_context()->_thread_mem_tracker_mgr->init(); thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index 768cdccb18ecaf..9064f5881865a9 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -67,7 +67,7 @@ MemPool::~MemPool() { ChunkAllocator::instance()->free(chunk.chunk); } THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); if (_mem_tracker) _mem_tracker->release(total_bytes_released); DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } @@ -89,7 +89,7 @@ void MemPool::free_all() { ChunkAllocator::instance()->free(chunk.chunk); } THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); if (_mem_tracker) _mem_tracker->release(total_bytes_released); chunks_.clear(); next_chunk_size_ = INITIAL_CHUNK_SIZE; @@ -150,7 +150,7 @@ Status MemPool::find_chunk(size_t min_size, bool check_limits) { // Allocate a new chunk. Return early if allocate fails. Chunk chunk; RETURN_IF_ERROR(ChunkAllocator::instance()->allocate(chunk_size, &chunk)); - THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->process_mem_tracker_raw()); + THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->orphan_mem_tracker_raw()); if (_mem_tracker) _mem_tracker->consume(chunk_size); ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size); // Put it before the first free chunk. If no free chunks, it goes at the end. diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 98e2d5a271ddce..bf388dde772030 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -214,7 +214,7 @@ class MemPool { void reset_peak() { if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) { THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - peak_allocated_bytes_, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); peak_allocated_bytes_ = total_allocated_bytes_; } } diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index caf2325386c498..5d53382c0abd98 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -58,12 +58,14 @@ MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile) { _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES); } - DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker() != nullptr); - _label = fmt::format( - "{} | {}", label, - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label()); - _bind_group_num = - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->group_num(); + DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() != nullptr); + MemTrackerLimiter* parent = + thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw(); + _label = fmt::format("[Observer] {} | {}", label, + parent->label() == "Orphan" ? "Process" : parent->label()); + _bind_group_num = parent->label() == "Orphan" + ? ExecEnv::GetInstance()->process_mem_tracker()->group_num() + : parent->group_num(); { std::lock_guard l(mem_tracker_pool[_bind_group_num].group_lock); _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert( @@ -119,8 +121,8 @@ std::shared_ptr MemTracker::get_global_mem_tracker(const std::string if (global_mem_trackers.find(label) != global_mem_trackers.end()) { return global_mem_trackers[label]; } else { - global_mem_trackers.emplace(label, - std::make_shared(fmt::format("[Global]{}", label))); + global_mem_trackers.emplace( + label, std::make_shared(fmt::format("[Global] {}", label))); return global_mem_trackers[label]; } } diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index b1211d48a725eb..e1ff4f04db5fbc 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -42,7 +42,14 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe _label = label; _limit = byte_limit; _group_num = GetCurrentTimeMicros() % 1000; - _parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(); + if (parent || label == "Process") { + _parent = parent; + } else if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label() == + "Orphan") { + _parent = ExecEnv::GetInstance()->process_mem_tracker(); + } else { + _parent = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(); + } DCHECK(_parent || label == "Process"); // Walks the MemTrackerLimiter hierarchy and populates _all_ancestors and _limited_ancestors @@ -70,6 +77,18 @@ MemTrackerLimiter::~MemTrackerLimiter() { if (_label == "Process") doris::thread_context_ptr._init = false; DCHECK(remain_child_count() == 0 || _label == "Process"); consume(_untracked_mem.exchange(0)); +#ifndef BE_TEST + // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption` + // in real time. Merge its consumption into orphan when all third level limiter trackers are destructed, to avoid repetition. + // the first layer: process; + // the second layer: a tracker that will not be destructed globally (query/load pool, load channel mgr, etc.); + // the third layer: a query/load/compaction task generates a tracker (query tracker, load channel tracker, etc.). + if (_parent->parent()->label() == "Process") { + ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( + _consumption->current_value()); + } +#endif + if (_parent) { std::lock_guard l(_parent->_child_tracker_limiter_lock); if (_child_tracker_it != _parent->_child_tracker_limiters.end()) { @@ -260,7 +279,7 @@ std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, // The limit of the current tracker and parents is less than 0, the consume will not fail, // and the current process memory has no excess limit. detail += fmt::format("unknown exceed reason, executing msg:<{}>", msg); - print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker_raw(); + print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker().get(); } auto failed_msg = MemTrackerLimiter::limit_exceeded_errmsg_suffix_str(detail); if (print_log_usage_tracker != nullptr) print_log_usage_tracker->print_log_usage(failed_msg); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 7f12f08f53311a..ea2acfeef3facc 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -216,7 +216,7 @@ class MemTrackerLimiter final : public MemTracker { "alloc size {}", PerfCounters::get_vm_rss_str(), MemInfo::allocator_cache_mem_str(), MemInfo::mem_limit_str(), print_bytes(bytes)); - ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(err_msg); + ExecEnv::GetInstance()->process_mem_tracker()->print_log_usage(err_msg); return err_msg; } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index aba75589fb37ae..4521a24881e47f 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -29,24 +29,25 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( const std::shared_ptr& mem_tracker) { DCHECK(mem_tracker); flush_untracked_mem(); - _task_id = task_id; - _fragment_instance_id = fragment_instance_id; - _limiter_tracker = mem_tracker; + _task_id_stack.push_back(task_id); + _fragment_instance_id_stack.push_back(fragment_instance_id); + _limiter_tracker_stack.push_back(mem_tracker); _limiter_tracker_raw = mem_tracker.get(); } void ThreadMemTrackerMgr::detach_limiter_tracker() { + DCHECK(!_limiter_tracker_stack.empty()); flush_untracked_mem(); - _task_id = ""; - _fragment_instance_id = TUniqueId(); - _limiter_tracker = ExecEnv::GetInstance()->process_mem_tracker(); - _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw(); + _task_id_stack.pop_back(); + _fragment_instance_id_stack.pop_back(); + _limiter_tracker_stack.pop_back(); + _limiter_tracker_raw = _limiter_tracker_stack.back().get(); } void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) { - if (_fragment_instance_id != TUniqueId()) { + if (_fragment_instance_id_stack.back() != TUniqueId()) { ExecEnv::GetInstance()->fragment_mgr()->cancel( - _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, + _fragment_instance_id_stack.back(), PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, cancel_details); } } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 670f852316e248..786df02ddb25a5 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -50,7 +50,7 @@ class ThreadMemTrackerMgr { // only for tcmalloc hook static void consume_no_attach(int64_t size) { - ExecEnv::GetInstance()->process_mem_tracker_raw()->consume(size); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size); } // After thread initialization, calling `init` again must call `clear_untracked_mems` first @@ -81,9 +81,11 @@ class ThreadMemTrackerMgr { template void flush_untracked_mem(); - bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } + bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); } - std::shared_ptr limiter_mem_tracker() { return _limiter_tracker; } + std::shared_ptr limiter_mem_tracker() { + return _limiter_tracker_stack.back(); + } MemTrackerLimiter* limiter_mem_tracker_raw() { return _limiter_tracker_raw; } void set_check_limit(bool check_limit) { _check_limit = check_limit; } @@ -98,8 +100,8 @@ class ThreadMemTrackerMgr { return fmt::format( "ThreadMemTrackerMgr debug, _untracked_mem:{}, _task_id:{}, " "_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>", - std::to_string(_untracked_mem), _task_id, _limiter_tracker->log_usage(1), - fmt::to_string(consumer_tracker_buf)); + std::to_string(_untracked_mem), _task_id_stack.back(), + _limiter_tracker_raw->log_usage(1), fmt::to_string(consumer_tracker_buf)); } private: @@ -115,7 +117,8 @@ class ThreadMemTrackerMgr { int64_t old_untracked_mem = 0; std::string failed_msg = std::string(); - std::shared_ptr _limiter_tracker; + // _limiter_tracker_stack[0] = orphan_mem_tracker + std::vector> _limiter_tracker_stack; MemTrackerLimiter* _limiter_tracker_raw; std::vector _consumer_tracker_stack; @@ -124,16 +127,22 @@ class ThreadMemTrackerMgr { // If there is a memory new/delete operation in the consume method, it may enter infinite recursion. bool _stop_consume = false; bool _check_attach = true; - std::string _task_id; - TUniqueId _fragment_instance_id; + std::vector _task_id_stack; + std::vector _fragment_instance_id_stack; ExceedCallBack _cb_func = nullptr; }; inline void ThreadMemTrackerMgr::init() { DCHECK(_consumer_tracker_stack.empty()); - _task_id = ""; - _limiter_tracker = ExecEnv::GetInstance()->process_mem_tracker(); - _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw(); + // _limiter_tracker_stack[0] = orphan_mem_tracker + DCHECK(_limiter_tracker_stack.size() <= 1) + << "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); + if (_limiter_tracker_stack.size() == 0) { + _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker()); + _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); + _task_id_stack.push_back(""); + _fragment_instance_id_stack.push_back(TUniqueId()); + } _check_limit = true; } @@ -173,7 +182,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop. _stop_consume = true; old_untracked_mem = _untracked_mem; - DCHECK(_limiter_tracker); + DCHECK(_limiter_tracker_raw); if (CheckLimit) { #ifndef BE_TEST // When all threads are started, `attach_limiter_tracker` is expected to be called to bind the limiter tracker. @@ -183,7 +192,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // TODO(zxy) The current p0 test cannot guarantee that all threads are checked, // so disable it and try to open it when memory tracking is not on time. // DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY || - // _limiter_tracker->label() != "Process"); + // _limiter_tracker_raw->label() != "Process"); #endif if (!_limiter_tracker_raw->try_consume(old_untracked_mem, failed_msg)) { // The memory has been allocated, so when TryConsume fails, need to continue to complete diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 9e33c70c414c2e..6f48e084abf7ad 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -218,14 +218,13 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { bool has_query_mem_tracker = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0); int64_t bytes_limit = has_query_mem_tracker ? _query_options.mem_limit : -1; - if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker_raw()->limit()) { + if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker()->limit()) { VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES) << " exceeds process memory limit of " - << PrettyPrinter::print( - ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(), - TUnit::BYTES) + << PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->limit(), + TUnit::BYTES) << ". Using process memory limit instead"; - bytes_limit = ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(); + bytes_limit = ExecEnv::GetInstance()->process_mem_tracker()->limit(); } auto mem_tracker_counter = ADD_COUNTER(&_profile, "MemoryLimit", TUnit::BYTES); mem_tracker_counter->set(bytes_limit); diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 0faeae639c9ad5..730d3ed7582f5e 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -512,7 +512,7 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker(); // The process tracker print log usage interval is 1s to avoid a large number of tasks being // canceled when the process exceeds the mem limit, resulting in too many duplicate logs. - doris::ExecEnv::GetInstance()->process_mem_tracker_raw()->enable_print_log_usage(); + doris::ExecEnv::GetInstance()->process_mem_tracker()->enable_print_log_usage(); sleep(1); } diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h index 3925644c11bf71..db64ab24d4631a 100644 --- a/be/src/vec/common/pod_array.h +++ b/be/src/vec/common/pod_array.h @@ -115,7 +115,7 @@ class PODArrayBase : private boost::noncopyable, inline void reset_peak() { if (UNLIKELY(c_end - c_end_peak > 65536)) { THREAD_MEM_TRACKER_TRANSFER_FROM(c_end - c_end_peak, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); c_end_peak = c_end; } } @@ -127,7 +127,7 @@ class PODArrayBase : private boost::noncopyable, template void alloc(size_t bytes, TAllocatorParams&&... allocator_params) { THREAD_MEM_TRACKER_TRANSFER_TO(bytes - pad_right - pad_left, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); c_start = c_end = c_end_peak = reinterpret_cast(TAllocator::alloc( bytes, std::forward(allocator_params)...)) + @@ -144,7 +144,7 @@ class PODArrayBase : private boost::noncopyable, TAllocator::free(c_start - pad_left, allocated_bytes()); THREAD_MEM_TRACKER_TRANSFER_FROM(c_end_of_storage - c_end_peak, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); } template @@ -157,7 +157,7 @@ class PODArrayBase : private boost::noncopyable, unprotect(); THREAD_MEM_TRACKER_TRANSFER_TO(bytes - allocated_bytes(), - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); ptrdiff_t end_diff = c_end - c_start; diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 77352db0103a84..ea518722113ae4 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -30,7 +30,9 @@ int main(int argc, char** argv) { std::shared_ptr process_mem_tracker = std::make_shared(-1, "Process"); - doris::ExecEnv::GetInstance()->set_process_mem_tracker(process_mem_tracker); + std::shared_ptr _orphan_mem_tracker = + std::make_shared(-1, "Orphan", process_mem_tracker); + doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker); doris::thread_context()->_thread_mem_tracker_mgr->init(); doris::TabletSchemaCache::create_global_schema_cache(); doris::StoragePageCache::create_global_cache(1 << 30, 10);