diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index a1fdb60f9daeb4..b1211d48a725eb 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -25,7 +25,6 @@ #include "gutil/walltime.h" #include "runtime/runtime_state.h" #include "runtime/thread_context.h" -#include "service/backend_options.h" #include "util/pretty_printer.h" #include "util/string_util.h" @@ -192,36 +191,34 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, return usage_strings.size() == 0 ? "" : "\n " + join(usage_strings, "\n "); } -Status MemTrackerLimiter::mem_limit_exceeded_construct(const std::string& msg) { - std::string detail = fmt::format( - "{}. backend {} process memory used {}, limit {}. If query tracker exceed, `set " - "exec_mem_limit=8G` to change limit, details mem usage see be.INFO.", - msg, BackendOptions::get_localhost(), print_bytes(PerfCounters::get_vm_rss()), - print_bytes(MemInfo::mem_limit())); - return Status::MemoryLimitExceeded(detail); -} - void MemTrackerLimiter::print_log_usage(const std::string& msg) { DCHECK(_limit != -1); // only print the tracker log_usage in be log. std::string detail = msg; + detail += "\n " + fmt::format( + "process memory used {}, limit {}, hard limit {}, tc/jemalloc " + "allocator cache {}", + PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), + print_bytes(MemInfo::hard_mem_limit()), + MemInfo::allocator_cache_mem_str()); if (_print_log_usage) { if (_label == "Process") { // Dumping the process MemTracker is expensive. Limiting the recursive depth to two // levels limits the level of detail to a one-line summary for each query MemTracker. - detail += "\n" + log_usage(2); + detail += "\n " + log_usage(2); } else { - detail += "\n" + log_usage(); + detail += "\n " + log_usage(); } - // TODO: memory leak by calling `boost::stacktrace` in mem hook + // TODO: memory leak by calling `boost::stacktrace` in tcmalloc hook, + // test whether overwriting malloc/free is the same problem in jemalloc/tcmalloc. // detail += "\n" + boost::stacktrace::to_string(boost::stacktrace::stacktrace()); LOG(WARNING) << detail; _print_log_usage = false; } } -Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, - int64_t failed_allocation_size) { +std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, + int64_t failed_allocation_size) { STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); std::string detail = fmt::format("Memory limit exceeded:, ", _label); MemTrackerLimiter* exceeded_tracker = nullptr; @@ -242,19 +239,13 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, } } - auto sys_exceed_st = check_sys_mem_info(failed_allocation_size); MemTrackerLimiter* print_log_usage_tracker = nullptr; if (exceeded_tracker != nullptr) { - detail += fmt::format( - "failed alloc size {}, exceeded tracker:<{}>, limit {}, peak used {}, " - "current used {}>, executing msg:<{}>", - print_bytes(failed_allocation_size), exceeded_tracker->label(), - print_bytes(exceeded_tracker->limit()), - print_bytes(exceeded_tracker->peak_consumption()), - print_bytes(exceeded_tracker->consumption()), msg); + detail += limit_exceeded_errmsg_prefix_str(failed_allocation_size, exceeded_tracker); print_log_usage_tracker = exceeded_tracker; - } else if (!sys_exceed_st) { - detail += fmt::format("{}>, executing msg:<{}>", sys_exceed_st.get_error_msg(), msg); + } else if (sys_mem_exceed_limit_check(failed_allocation_size)) { + detail += fmt::format("{}>, executing msg:<{}>", + limit_exceeded_errmsg_sys_str(failed_allocation_size), msg); } else if (max_consumption_tracker != nullptr) { // must after check_sys_mem_info false detail += fmt::format( @@ -271,29 +262,28 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, detail += fmt::format("unknown exceed reason, executing msg:<{}>", msg); print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker_raw(); } - auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail); - if (print_log_usage_tracker != nullptr) - print_log_usage_tracker->print_log_usage(st.get_error_msg()); - return st; + auto failed_msg = MemTrackerLimiter::limit_exceeded_errmsg_suffix_str(detail); + if (print_log_usage_tracker != nullptr) print_log_usage_tracker->print_log_usage(failed_msg); + return failed_msg; } -Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, - MemTrackerLimiter* failed_tracker, - Status failed_try_consume_st) { +std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, + MemTrackerLimiter* failed_tracker, + const std::string& limit_exceeded_errmsg_prefix) { STOP_CHECK_THREAD_MEM_TRACKER_LIMIT(); std::string detail = fmt::format("Memory limit exceeded:, {}>, executing msg:<{}>", - _label, failed_try_consume_st.get_error_msg(), msg); - auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail); - failed_tracker->print_log_usage(st.get_error_msg()); - return st; + _label, limit_exceeded_errmsg_prefix, msg); + auto failed_msg = MemTrackerLimiter::limit_exceeded_errmsg_suffix_str(detail); + failed_tracker->print_log_usage(failed_msg); + return failed_msg; } Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& msg, int64_t failed_alloc_size) { - Status rt = mem_limit_exceeded(msg, failed_alloc_size); - state->log_error(rt.to_string()); - return rt; + auto failed_msg = mem_limit_exceeded(msg, failed_alloc_size); + state->log_error(failed_msg); + return Status::MemoryLimitExceeded(failed_msg); } } // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index e571f9d2b024e3..7f12f08f53311a 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -22,6 +22,7 @@ #include "common/config.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" +#include "service/backend_options.h" #include "util/mem_info.h" #include "util/perf_counters.h" @@ -64,7 +65,7 @@ class MemTrackerLimiter final : public MemTracker { size_t upper_level) const; public: - static Status check_sys_mem_info(int64_t bytes) { + static bool sys_mem_exceed_limit_check(int64_t bytes) { // Limit process memory usage using the actual physical memory of the process in `/proc/self/status`. // This is independent of the consumption value of the mem tracker, which counts the virtual memory // of the process malloc. @@ -78,16 +79,9 @@ class MemTrackerLimiter final : public MemTracker { bytes >= MemInfo::mem_limit() || PerfCounters::get_vm_rss() + bytes >= MemInfo::hard_mem_limit()) { - auto st = Status::MemoryLimitExceeded( - "process memory used {}, tc/jemalloc cache {}, exceed limit {}, hard limit {}, " - "failed alloc size {}", - print_bytes(PerfCounters::get_vm_rss()), - print_bytes(MemInfo::allocator_cache_mem()), print_bytes(MemInfo::mem_limit()), - print_bytes(MemInfo::hard_mem_limit()), print_bytes(bytes)); - ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(st.get_error_msg()); - return st; + return true; } - return Status::OK(); + return false; } int64_t group_num() const { return _group_num; } @@ -150,9 +144,9 @@ class MemTrackerLimiter final : public MemTracker { // If 'failed_allocation_size' is greater than zero, logs the allocation size. If // 'failed_allocation_size' is zero, nothing about the allocation size is logged. // If 'state' is non-nullptr, logs the error to 'state'. - Status mem_limit_exceeded(const std::string& msg, int64_t failed_allocation_size = 0); - Status mem_limit_exceeded(const std::string& msg, MemTrackerLimiter* failed_tracker, - Status failed_try_consume_st); + std::string mem_limit_exceeded(const std::string& msg, int64_t failed_allocation_size = 0); + std::string mem_limit_exceeded(const std::string& msg, MemTrackerLimiter* failed_tracker, + const std::string& limit_exceeded_errmsg_prefix); Status mem_limit_exceeded(RuntimeState* state, const std::string& msg, int64_t failed_allocation_size = 0); @@ -167,7 +161,7 @@ class MemTrackerLimiter final : public MemTracker { } static std::string print_bytes(int64_t bytes) { - return fmt::format("{}", PrettyPrinter::print(bytes, TUnit::BYTES)); + return PrettyPrinter::print(bytes, TUnit::BYTES); } private: @@ -184,7 +178,7 @@ class MemTrackerLimiter final : public MemTracker { // they can all consume 'bytes' without exceeding limit. If limit would be exceed, // no MemTrackerLimiters are updated. Returns true if the consumption was successfully updated. WARN_UNUSED_RESULT - Status try_consume(int64_t bytes); + bool try_consume(int64_t bytes, std::string& failed_msg); // When the accumulated untracked memory value exceeds the upper limit, // the current value is returned and set to 0. @@ -198,7 +192,33 @@ class MemTrackerLimiter final : public MemTracker { const std::list& trackers, int64_t* logged_consumption); - static Status mem_limit_exceeded_construct(const std::string& msg); + static std::string limit_exceeded_errmsg_prefix_str(int64_t bytes, + MemTrackerLimiter* exceed_tracker) { + return fmt::format( + "failed alloc size {}, exceeded tracker:<{}>, limit {}, peak " + "used {}, current used {}", + print_bytes(bytes), exceed_tracker->label(), print_bytes(exceed_tracker->limit()), + print_bytes(exceed_tracker->_consumption->value()), + print_bytes(exceed_tracker->_consumption->current_value())); + } + + static std::string limit_exceeded_errmsg_suffix_str(const std::string& msg) { + return fmt::format( + "{}. backend {} process memory used {}, limit {}. If query tracker exceed, `set " + "exec_mem_limit=8G` to change limit, details mem usage see be.INFO.", + msg, BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(), + MemInfo::mem_limit_str()); + } + + static std::string limit_exceeded_errmsg_sys_str(int64_t bytes) { + auto err_msg = fmt::format( + "process memory used {}, tc/jemalloc allocator cache {}, exceed limit {}, failed " + "alloc size {}", + PerfCounters::get_vm_rss_str(), MemInfo::allocator_cache_mem_str(), + MemInfo::mem_limit_str(), print_bytes(bytes)); + ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(err_msg); + return err_msg; + } private: // Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. Used in log_usage。 @@ -258,12 +278,16 @@ inline void MemTrackerLimiter::cache_consume_local(int64_t bytes) { } } -inline Status MemTrackerLimiter::try_consume(int64_t bytes) { +inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg) { if (bytes <= 0) { release(-bytes); - return Status::OK(); + failed_msg = std::string(); + return true; + } + if (sys_mem_exceed_limit_check(bytes)) { + failed_msg = limit_exceeded_errmsg_sys_str(bytes); + return false; } - RETURN_IF_ERROR(check_sys_mem_info(bytes)); int i; // Walk the tracker tree top-down. for (i = _all_ancestors.size() - 1; i >= 0; --i) { @@ -278,34 +302,28 @@ inline Status MemTrackerLimiter::try_consume(int64_t bytes) { for (int j = _all_ancestors.size() - 1; j > i; --j) { _all_ancestors[j]->_consumption->add(-bytes); } - return Status::MemoryLimitExceeded(fmt::format( - "failed alloc size {}, exceeded tracker:<{}>, limit {}, peak " - "used {}, current used {}", - print_bytes(bytes), tracker->label(), print_bytes(tracker->limit()), - print_bytes(tracker->_consumption->value()), - print_bytes(tracker->_consumption->current_value()))); + failed_msg = limit_exceeded_errmsg_prefix_str(bytes, tracker); + return false; } } } // Everyone succeeded, return. DCHECK_EQ(i, -1); - return Status::OK(); + failed_msg = std::string(); + return true; } inline Status MemTrackerLimiter::check_limit(int64_t bytes) { if (bytes <= 0) return Status::OK(); - RETURN_IF_ERROR(check_sys_mem_info(bytes)); + if (sys_mem_exceed_limit_check(bytes)) { + return Status::MemoryLimitExceeded(limit_exceeded_errmsg_sys_str(bytes)); + } int i; // Walk the tracker tree top-down. for (i = _limited_ancestors.size() - 1; i >= 0; --i) { MemTrackerLimiter* tracker = _limited_ancestors[i]; if (tracker->_consumption->current_value() + bytes > tracker->limit()) { - return Status::MemoryLimitExceeded( - fmt::format("expected alloc size {}, exceeded tracker:<{}>, limit {}, peak " - "used {}, current used {}", - print_bytes(bytes), tracker->label(), print_bytes(tracker->limit()), - print_bytes(tracker->_consumption->value()), - print_bytes(tracker->_consumption->current_value()))); + return Status::MemoryLimitExceeded(limit_exceeded_errmsg_prefix_str(bytes, tracker)); } } return Status::OK(); diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 5cbc76eadccb12..aba75589fb37ae 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -51,16 +51,17 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details } } -void ThreadMemTrackerMgr::exceeded(Status failed_try_consume_st) { +void ThreadMemTrackerMgr::exceeded(const std::string& failed_msg) { if (_cb_func != nullptr) { _cb_func(); } if (is_attach_query()) { - auto st = _limiter_tracker_raw->mem_limit_exceeded( + auto cancel_msg = _limiter_tracker_raw->mem_limit_exceeded( fmt::format("exec node:<{}>", last_consumer_tracker()), - _limiter_tracker_raw->parent().get(), failed_try_consume_st); - exceeded_cancel_task(st.get_error_msg()); + _limiter_tracker_raw->parent().get(), failed_msg); + exceeded_cancel_task(cancel_msg); _check_limit = false; // Make sure it will only be canceled once } } + } // namespace doris diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 13ecea46b4fc3a..670f852316e248 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -106,12 +106,14 @@ class ThreadMemTrackerMgr { // If tryConsume fails due to task mem tracker exceeding the limit, the task must be canceled void exceeded_cancel_task(const std::string& cancel_details); - void exceeded(Status failed_try_consume_st); + void exceeded(const std::string& failed_msg); private: // Cache untracked mem, only update to _untracked_mems when switching mem tracker. // Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance. int64_t _untracked_mem = 0; + int64_t old_untracked_mem = 0; + std::string failed_msg = std::string(); std::shared_ptr _limiter_tracker; MemTrackerLimiter* _limiter_tracker_raw; @@ -170,6 +172,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering // the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop. _stop_consume = true; + old_untracked_mem = _untracked_mem; DCHECK(_limiter_tracker); if (CheckLimit) { #ifndef BE_TEST @@ -182,20 +185,19 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY || // _limiter_tracker->label() != "Process"); #endif - Status st = _limiter_tracker_raw->try_consume(_untracked_mem); - if (!st) { + if (!_limiter_tracker_raw->try_consume(old_untracked_mem, failed_msg)) { // The memory has been allocated, so when TryConsume fails, need to continue to complete // the consume to ensure the accuracy of the statistics. - _limiter_tracker_raw->consume(_untracked_mem); - exceeded(st); + _limiter_tracker_raw->consume(old_untracked_mem); + exceeded(failed_msg); } } else { - _limiter_tracker_raw->consume(_untracked_mem); + _limiter_tracker_raw->consume(old_untracked_mem); } for (auto tracker : _consumer_tracker_stack) { - tracker->consume(_untracked_mem); + tracker->consume(old_untracked_mem); } - _untracked_mem = 0; + _untracked_mem -= old_untracked_mem; _stop_consume = false; } diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 1607ccf7624f12..522ea0566a5c67 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -40,6 +40,7 @@ namespace doris { bool MemInfo::_s_initialized = false; int64_t MemInfo::_s_physical_mem = -1; int64_t MemInfo::_s_mem_limit = -1; +std::string MemInfo::_s_mem_limit_str = ""; int64_t MemInfo::_s_hard_mem_limit = -1; size_t MemInfo::_s_allocator_physical_mem = 0; size_t MemInfo::_s_tcmalloc_pageheap_free_bytes = 0; @@ -47,6 +48,7 @@ size_t MemInfo::_s_tcmalloc_central_bytes = 0; size_t MemInfo::_s_tcmalloc_transfer_bytes = 0; size_t MemInfo::_s_tcmalloc_thread_bytes = 0; size_t MemInfo::_s_allocator_cache_mem = 0; +std::string MemInfo::_s_allocator_cache_mem_str = ""; void MemInfo::init() { // Read from /proc/meminfo @@ -94,6 +96,7 @@ void MemInfo::init() { bool is_percent = true; _s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent); + _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES); _s_hard_mem_limit = _s_physical_mem - std::min(209715200L, _s_physical_mem / 10); // 200M LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES); diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index f29e328a8ee44a..2aabf18b384dd6 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -25,6 +25,7 @@ #include #include "common/logging.h" +#include "util/pretty_printer.h" namespace doris { @@ -46,6 +47,7 @@ class MemInfo { static inline size_t current_mem() { return _s_allocator_physical_mem; } static inline size_t allocator_cache_mem() { return _s_allocator_cache_mem; } + static inline std::string allocator_cache_mem_str() { return _s_allocator_cache_mem_str; } // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory // obtained by the process malloc, not the physical memory actually used by the process in the OS. @@ -62,12 +64,17 @@ class MemInfo { &_s_tcmalloc_thread_bytes); _s_allocator_cache_mem = _s_tcmalloc_pageheap_free_bytes + _s_tcmalloc_central_bytes + _s_tcmalloc_transfer_bytes + _s_tcmalloc_thread_bytes; + _s_allocator_cache_mem_str = PrettyPrinter::print(_s_allocator_cache_mem, TUnit::BYTES); } static inline int64_t mem_limit() { DCHECK(_s_initialized); return _s_mem_limit; } + static inline std::string mem_limit_str() { + DCHECK(_s_initialized); + return _s_mem_limit_str; + } static inline int64_t hard_mem_limit() { return _s_hard_mem_limit; } static std::string debug_string(); @@ -76,6 +83,7 @@ class MemInfo { static bool _s_initialized; static int64_t _s_physical_mem; static int64_t _s_mem_limit; + static std::string _s_mem_limit_str; static int64_t _s_hard_mem_limit; static size_t _s_allocator_physical_mem; static size_t _s_tcmalloc_pageheap_free_bytes; @@ -83,6 +91,7 @@ class MemInfo { static size_t _s_tcmalloc_transfer_bytes; static size_t _s_tcmalloc_thread_bytes; static size_t _s_allocator_cache_mem; + static std::string _s_allocator_cache_mem_str; }; } // namespace doris diff --git a/be/src/util/perf_counters.cpp b/be/src/util/perf_counters.cpp index 2e9d51d56ce36d..8951fd3a20eb5f 100644 --- a/be/src/util/perf_counters.cpp +++ b/be/src/util/perf_counters.cpp @@ -46,6 +46,9 @@ namespace doris { static std::unordered_map _process_state; +int64_t PerfCounters::_vm_rss = 0; +std::string PerfCounters::_vm_rss_str = ""; + // This is the order of the counters in /proc/self/io enum PERF_IO_IDX { PROC_IO_READ = 0, @@ -575,6 +578,9 @@ void PerfCounters::refresh_proc_status() { } if (statusinfo.is_open()) statusinfo.close(); + + _vm_rss = parse_bytes("status/VmRSS"); + _vm_rss_str = PrettyPrinter::print(_vm_rss, TUnit::BYTES); } void PerfCounters::get_proc_status(ProcStatus* out) { diff --git a/be/src/util/perf_counters.h b/be/src/util/perf_counters.h index c2a4b1c4d9893d..1a19f6025b6624 100644 --- a/be/src/util/perf_counters.h +++ b/be/src/util/perf_counters.h @@ -115,7 +115,8 @@ class PerfCounters { static void refresh_proc_status(); static void get_proc_status(ProcStatus* out); // Return the process actual physical memory in bytes. - static inline int64_t get_vm_rss() { return parse_bytes("status/VmRSS"); } + static inline int64_t get_vm_rss() { return _vm_rss; } + static inline std::string get_vm_rss_str() { return _vm_rss_str; } private: // Copy constructor and assignment not allowed @@ -159,6 +160,9 @@ class PerfCounters { // System perf counters can be grouped together. The OS will update all grouped counters // at the same time. This is useful to better correlate counter values. int _group_fd; + + static int64_t _vm_rss; + static std::string _vm_rss_str; }; } // namespace doris diff --git a/be/test/runtime/mem_limit_test.cpp b/be/test/runtime/mem_limit_test.cpp index 470074d0ebc23f..811102b2168a3c 100644 --- a/be/test/runtime/mem_limit_test.cpp +++ b/be/test/runtime/mem_limit_test.cpp @@ -100,7 +100,8 @@ TEST(MemTestTest, TrackerHierarchyTryConsume) { auto c2 = std::make_unique(50, "c2", p); // everything below limits - bool consumption = c1->try_consume(60).ok(); + std::string err_msg = ""; + bool consumption = c1->try_consume(60, err_msg); EXPECT_EQ(consumption, true); EXPECT_EQ(c1->consumption(), 60); EXPECT_FALSE(c1->limit_exceeded()); @@ -113,7 +114,7 @@ TEST(MemTestTest, TrackerHierarchyTryConsume) { EXPECT_FALSE(p->any_limit_exceeded()); // p goes over limit - consumption = c2->try_consume(50).ok(); + consumption = c2->try_consume(50, err_msg); EXPECT_EQ(consumption, false); EXPECT_EQ(c1->consumption(), 60); EXPECT_FALSE(c1->limit_exceeded());