Skip to content

Commit

Permalink
[fix](memtracker) Fix thread mem tracker try consume accuracy apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored and Yijia Su committed Oct 8, 2022
1 parent 7677426 commit 76c9756
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 87 deletions.
68 changes: 29 additions & 39 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "gutil/walltime.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/pretty_printer.h"
#include "util/string_util.h"

Expand Down Expand Up @@ -192,36 +191,34 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth,
return usage_strings.size() == 0 ? "" : "\n " + join(usage_strings, "\n ");
}

// Wraps `msg` into a MemoryLimitExceeded status. The message is extended with the
// backend host, the process memory used and its limit, and a hint on how to raise
// the query memory limit.
Status MemTrackerLimiter::mem_limit_exceeded_construct(const std::string& msg) {
    const auto process_used = print_bytes(PerfCounters::get_vm_rss());
    const auto process_limit = print_bytes(MemInfo::mem_limit());
    return Status::MemoryLimitExceeded(fmt::format(
            "{}. backend {} process memory used {}, limit {}. If query tracker exceed, `set "
            "exec_mem_limit=8G` to change limit, details mem usage see be.INFO.",
            msg, BackendOptions::get_localhost(), process_used, process_limit));
}

void MemTrackerLimiter::print_log_usage(const std::string& msg) {
DCHECK(_limit != -1);
// only print the tracker log_usage in be log.
std::string detail = msg;
detail += "\n " + fmt::format(
"process memory used {}, limit {}, hard limit {}, tc/jemalloc "
"allocator cache {}",
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
print_bytes(MemInfo::hard_mem_limit()),
MemInfo::allocator_cache_mem_str());
if (_print_log_usage) {
if (_label == "Process") {
// Dumping the process MemTracker is expensive. Limiting the recursive depth to two
// levels limits the level of detail to a one-line summary for each query MemTracker.
detail += "\n" + log_usage(2);
detail += "\n " + log_usage(2);
} else {
detail += "\n" + log_usage();
detail += "\n " + log_usage();
}
// TODO: memory leak by calling `boost::stacktrace` in mem hook
// TODO: memory leak by calling `boost::stacktrace` in tcmalloc hook,
// test whether overwriting malloc/free is the same problem in jemalloc/tcmalloc.
// detail += "\n" + boost::stacktrace::to_string(boost::stacktrace::stacktrace());
LOG(WARNING) << detail;
_print_log_usage = false;
}
}

Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
int64_t failed_allocation_size) {
std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
int64_t failed_allocation_size) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
std::string detail = fmt::format("Memory limit exceeded:<consuming tracker:<{}>, ", _label);
MemTrackerLimiter* exceeded_tracker = nullptr;
Expand All @@ -242,19 +239,13 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
}
}

auto sys_exceed_st = check_sys_mem_info(failed_allocation_size);
MemTrackerLimiter* print_log_usage_tracker = nullptr;
if (exceeded_tracker != nullptr) {
detail += fmt::format(
"failed alloc size {}, exceeded tracker:<{}>, limit {}, peak used {}, "
"current used {}>, executing msg:<{}>",
print_bytes(failed_allocation_size), exceeded_tracker->label(),
print_bytes(exceeded_tracker->limit()),
print_bytes(exceeded_tracker->peak_consumption()),
print_bytes(exceeded_tracker->consumption()), msg);
detail += limit_exceeded_errmsg_prefix_str(failed_allocation_size, exceeded_tracker);
print_log_usage_tracker = exceeded_tracker;
} else if (!sys_exceed_st) {
detail += fmt::format("{}>, executing msg:<{}>", sys_exceed_st.get_error_msg(), msg);
} else if (sys_mem_exceed_limit_check(failed_allocation_size)) {
detail += fmt::format("{}>, executing msg:<{}>",
limit_exceeded_errmsg_sys_str(failed_allocation_size), msg);
} else if (max_consumption_tracker != nullptr) {
// must after check_sys_mem_info false
detail += fmt::format(
Expand All @@ -271,29 +262,28 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
detail += fmt::format("unknown exceed reason, executing msg:<{}>", msg);
print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker_raw();
}
auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
if (print_log_usage_tracker != nullptr)
print_log_usage_tracker->print_log_usage(st.get_error_msg());
return st;
auto failed_msg = MemTrackerLimiter::limit_exceeded_errmsg_suffix_str(detail);
if (print_log_usage_tracker != nullptr) print_log_usage_tracker->print_log_usage(failed_msg);
return failed_msg;
}

Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
MemTrackerLimiter* failed_tracker,
Status failed_try_consume_st) {
std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
MemTrackerLimiter* failed_tracker,
const std::string& limit_exceeded_errmsg_prefix) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
std::string detail =
fmt::format("Memory limit exceeded:<consuming tracker:<{}>, {}>, executing msg:<{}>",
_label, failed_try_consume_st.get_error_msg(), msg);
auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
failed_tracker->print_log_usage(st.get_error_msg());
return st;
_label, limit_exceeded_errmsg_prefix, msg);
auto failed_msg = MemTrackerLimiter::limit_exceeded_errmsg_suffix_str(detail);
failed_tracker->print_log_usage(failed_msg);
return failed_msg;
}

Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& msg,
int64_t failed_alloc_size) {
Status rt = mem_limit_exceeded(msg, failed_alloc_size);
state->log_error(rt.to_string());
return rt;
auto failed_msg = mem_limit_exceeded(msg, failed_alloc_size);
state->log_error(failed_msg);
return Status::MemoryLimitExceeded(failed_msg);
}

} // namespace doris
84 changes: 51 additions & 33 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "common/config.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"

Expand Down Expand Up @@ -64,7 +65,7 @@ class MemTrackerLimiter final : public MemTracker {
size_t upper_level) const;

public:
static Status check_sys_mem_info(int64_t bytes) {
static bool sys_mem_exceed_limit_check(int64_t bytes) {
// Limit process memory usage using the actual physical memory of the process in `/proc/self/status`.
// This is independent of the consumption value of the mem tracker, which counts the virtual memory
// of the process malloc.
Expand All @@ -78,16 +79,9 @@ class MemTrackerLimiter final : public MemTracker {
bytes >=
MemInfo::mem_limit() ||
PerfCounters::get_vm_rss() + bytes >= MemInfo::hard_mem_limit()) {
auto st = Status::MemoryLimitExceeded(
"process memory used {}, tc/jemalloc cache {}, exceed limit {}, hard limit {}, "
"failed alloc size {}",
print_bytes(PerfCounters::get_vm_rss()),
print_bytes(MemInfo::allocator_cache_mem()), print_bytes(MemInfo::mem_limit()),
print_bytes(MemInfo::hard_mem_limit()), print_bytes(bytes));
ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(st.get_error_msg());
return st;
return true;
}
return Status::OK();
return false;
}

int64_t group_num() const { return _group_num; }
Expand Down Expand Up @@ -150,9 +144,9 @@ class MemTrackerLimiter final : public MemTracker {
// If 'failed_allocation_size' is greater than zero, logs the allocation size. If
// 'failed_allocation_size' is zero, nothing about the allocation size is logged.
// If 'state' is non-nullptr, logs the error to 'state'.
Status mem_limit_exceeded(const std::string& msg, int64_t failed_allocation_size = 0);
Status mem_limit_exceeded(const std::string& msg, MemTrackerLimiter* failed_tracker,
Status failed_try_consume_st);
std::string mem_limit_exceeded(const std::string& msg, int64_t failed_allocation_size = 0);
std::string mem_limit_exceeded(const std::string& msg, MemTrackerLimiter* failed_tracker,
const std::string& limit_exceeded_errmsg_prefix);
Status mem_limit_exceeded(RuntimeState* state, const std::string& msg,
int64_t failed_allocation_size = 0);

Expand All @@ -167,7 +161,7 @@ class MemTrackerLimiter final : public MemTracker {
}

static std::string print_bytes(int64_t bytes) {
return fmt::format("{}", PrettyPrinter::print(bytes, TUnit::BYTES));
return PrettyPrinter::print(bytes, TUnit::BYTES);
}

private:
Expand All @@ -184,7 +178,7 @@ class MemTrackerLimiter final : public MemTracker {
// they can all consume 'bytes' without exceeding limit. If a limit would be exceeded,
// no MemTrackerLimiters are updated. Returns true if the consumption was successfully updated.
WARN_UNUSED_RESULT
Status try_consume(int64_t bytes);
bool try_consume(int64_t bytes, std::string& failed_msg);

// When the accumulated untracked memory value exceeds the upper limit,
// the current value is returned and set to 0.
Expand All @@ -198,7 +192,33 @@ class MemTrackerLimiter final : public MemTracker {
const std::list<MemTrackerLimiter*>& trackers,
int64_t* logged_consumption);

static Status mem_limit_exceeded_construct(const std::string& msg);
// Builds the leading part of a limit-exceeded message for `exceed_tracker`: the size
// of the allocation that failed, followed by the tracker's label, its limit, and the
// two consumption readings reported as "peak used" and "current used".
static std::string limit_exceeded_errmsg_prefix_str(int64_t bytes,
                                                    MemTrackerLimiter* exceed_tracker) {
    const auto alloc_size = print_bytes(bytes);
    const auto tracker_limit = print_bytes(exceed_tracker->limit());
    const auto peak_used = print_bytes(exceed_tracker->_consumption->value());
    const auto current_used = print_bytes(exceed_tracker->_consumption->current_value());
    return fmt::format(
            "failed alloc size {}, exceeded tracker:<{}>, limit {}, peak "
            "used {}, current used {}",
            alloc_size, exceed_tracker->label(), tracker_limit, peak_used, current_used);
}

// Appends the common trailer to a limit-exceeded message: the backend host, the
// process memory used and its limit, and a hint on how to raise the query limit.
static std::string limit_exceeded_errmsg_suffix_str(const std::string& msg) {
    const auto& backend_host = BackendOptions::get_localhost();
    const auto& process_used = PerfCounters::get_vm_rss_str();
    const auto& process_limit = MemInfo::mem_limit_str();
    return fmt::format(
            "{}. backend {} process memory used {}, limit {}. If query tracker exceed, `set "
            "exec_mem_limit=8G` to change limit, details mem usage see be.INFO.",
            msg, backend_host, process_used, process_limit);
}

static std::string limit_exceeded_errmsg_sys_str(int64_t bytes) {
auto err_msg = fmt::format(
"process memory used {}, tc/jemalloc allocator cache {}, exceed limit {}, failed "
"alloc size {}",
PerfCounters::get_vm_rss_str(), MemInfo::allocator_cache_mem_str(),
MemInfo::mem_limit_str(), print_bytes(bytes));
ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(err_msg);
return err_msg;
}

private:
// Limit on memory consumption, in bytes. If _limit == -1, there is no consumption limit. Used in log_usage.
Expand Down Expand Up @@ -258,12 +278,16 @@ inline void MemTrackerLimiter::cache_consume_local(int64_t bytes) {
}
}

inline Status MemTrackerLimiter::try_consume(int64_t bytes) {
inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg) {
if (bytes <= 0) {
release(-bytes);
return Status::OK();
failed_msg = std::string();
return true;
}
if (sys_mem_exceed_limit_check(bytes)) {
failed_msg = limit_exceeded_errmsg_sys_str(bytes);
return false;
}
RETURN_IF_ERROR(check_sys_mem_info(bytes));
int i;
// Walk the tracker tree top-down.
for (i = _all_ancestors.size() - 1; i >= 0; --i) {
Expand All @@ -278,34 +302,28 @@ inline Status MemTrackerLimiter::try_consume(int64_t bytes) {
for (int j = _all_ancestors.size() - 1; j > i; --j) {
_all_ancestors[j]->_consumption->add(-bytes);
}
return Status::MemoryLimitExceeded(fmt::format(
"failed alloc size {}, exceeded tracker:<{}>, limit {}, peak "
"used {}, current used {}",
print_bytes(bytes), tracker->label(), print_bytes(tracker->limit()),
print_bytes(tracker->_consumption->value()),
print_bytes(tracker->_consumption->current_value())));
failed_msg = limit_exceeded_errmsg_prefix_str(bytes, tracker);
return false;
}
}
}
// Everyone succeeded, return.
DCHECK_EQ(i, -1);
return Status::OK();
failed_msg = std::string();
return true;
}

inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
if (bytes <= 0) return Status::OK();
RETURN_IF_ERROR(check_sys_mem_info(bytes));
if (sys_mem_exceed_limit_check(bytes)) {
return Status::MemoryLimitExceeded(limit_exceeded_errmsg_sys_str(bytes));
}
int i;
// Walk the tracker tree top-down.
for (i = _limited_ancestors.size() - 1; i >= 0; --i) {
MemTrackerLimiter* tracker = _limited_ancestors[i];
if (tracker->_consumption->current_value() + bytes > tracker->limit()) {
return Status::MemoryLimitExceeded(
fmt::format("expected alloc size {}, exceeded tracker:<{}>, limit {}, peak "
"used {}, current used {}",
print_bytes(bytes), tracker->label(), print_bytes(tracker->limit()),
print_bytes(tracker->_consumption->value()),
print_bytes(tracker->_consumption->current_value())));
return Status::MemoryLimitExceeded(limit_exceeded_errmsg_prefix_str(bytes, tracker));
}
}
return Status::OK();
Expand Down
9 changes: 5 additions & 4 deletions be/src/runtime/memory/thread_mem_tracker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,17 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details
}
}

void ThreadMemTrackerMgr::exceeded(Status failed_try_consume_st) {
void ThreadMemTrackerMgr::exceeded(const std::string& failed_msg) {
if (_cb_func != nullptr) {
_cb_func();
}
if (is_attach_query()) {
auto st = _limiter_tracker_raw->mem_limit_exceeded(
auto cancel_msg = _limiter_tracker_raw->mem_limit_exceeded(
fmt::format("exec node:<{}>", last_consumer_tracker()),
_limiter_tracker_raw->parent().get(), failed_try_consume_st);
exceeded_cancel_task(st.get_error_msg());
_limiter_tracker_raw->parent().get(), failed_msg);
exceeded_cancel_task(cancel_msg);
_check_limit = false; // Make sure it will only be canceled once
}
}

} // namespace doris
18 changes: 10 additions & 8 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,14 @@ class ThreadMemTrackerMgr {
// If tryConsume fails due to task mem tracker exceeding the limit, the task must be canceled
void exceeded_cancel_task(const std::string& cancel_details);

void exceeded(Status failed_try_consume_st);
void exceeded(const std::string& failed_msg);

private:
// Cache untracked mem, only update to _untracked_mems when switching mem tracker.
// Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance.
int64_t _untracked_mem = 0;
int64_t old_untracked_mem = 0;
std::string failed_msg = std::string();

std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
MemTrackerLimiter* _limiter_tracker_raw;
Expand Down Expand Up @@ -170,6 +172,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
// the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop.
_stop_consume = true;
old_untracked_mem = _untracked_mem;
DCHECK(_limiter_tracker);
if (CheckLimit) {
#ifndef BE_TEST
Expand All @@ -182,20 +185,19 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
// _limiter_tracker->label() != "Process");
#endif
Status st = _limiter_tracker_raw->try_consume(_untracked_mem);
if (!st) {
if (!_limiter_tracker_raw->try_consume(old_untracked_mem, failed_msg)) {
// The memory has been allocated, so when TryConsume fails, need to continue to complete
// the consume to ensure the accuracy of the statistics.
_limiter_tracker_raw->consume(_untracked_mem);
exceeded(st);
_limiter_tracker_raw->consume(old_untracked_mem);
exceeded(failed_msg);
}
} else {
_limiter_tracker_raw->consume(_untracked_mem);
_limiter_tracker_raw->consume(old_untracked_mem);
}
for (auto tracker : _consumer_tracker_stack) {
tracker->consume(_untracked_mem);
tracker->consume(old_untracked_mem);
}
_untracked_mem = 0;
_untracked_mem -= old_untracked_mem;
_stop_consume = false;
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/util/mem_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ namespace doris {
bool MemInfo::_s_initialized = false;
int64_t MemInfo::_s_physical_mem = -1;
int64_t MemInfo::_s_mem_limit = -1;
std::string MemInfo::_s_mem_limit_str = "";
int64_t MemInfo::_s_hard_mem_limit = -1;
size_t MemInfo::_s_allocator_physical_mem = 0;
size_t MemInfo::_s_tcmalloc_pageheap_free_bytes = 0;
size_t MemInfo::_s_tcmalloc_central_bytes = 0;
size_t MemInfo::_s_tcmalloc_transfer_bytes = 0;
size_t MemInfo::_s_tcmalloc_thread_bytes = 0;
size_t MemInfo::_s_allocator_cache_mem = 0;
std::string MemInfo::_s_allocator_cache_mem_str = "";

void MemInfo::init() {
// Read from /proc/meminfo
Expand Down Expand Up @@ -94,6 +96,7 @@ void MemInfo::init() {

bool is_percent = true;
_s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent);
_s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
_s_hard_mem_limit = _s_physical_mem - std::min(209715200L, _s_physical_mem / 10); // 200M

LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES);
Expand Down
Loading

0 comments on commit 76c9756

Please sign in to comment.