Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](memtracker) Fix thread mem tracker try consume accuracy #12782

Merged
merged 1 commit into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 29 additions & 39 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "gutil/walltime.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/pretty_printer.h"
#include "util/string_util.h"

Expand Down Expand Up @@ -192,36 +191,34 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth,
return usage_strings.size() == 0 ? "" : "\n " + join(usage_strings, "\n ");
}

Status MemTrackerLimiter::mem_limit_exceeded_construct(const std::string& msg) {
std::string detail = fmt::format(
"{}. backend {} process memory used {}, limit {}. If query tracker exceed, `set "
"exec_mem_limit=8G` to change limit, details mem usage see be.INFO.",
msg, BackendOptions::get_localhost(), print_bytes(PerfCounters::get_vm_rss()),
print_bytes(MemInfo::mem_limit()));
return Status::MemoryLimitExceeded(detail);
}

void MemTrackerLimiter::print_log_usage(const std::string& msg) {
DCHECK(_limit != -1);
// only print the tracker log_usage in be log.
std::string detail = msg;
detail += "\n " + fmt::format(
"process memory used {}, limit {}, hard limit {}, tc/jemalloc "
"allocator cache {}",
PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
print_bytes(MemInfo::hard_mem_limit()),
MemInfo::allocator_cache_mem_str());
if (_print_log_usage) {
if (_label == "Process") {
// Dumping the process MemTracker is expensive. Limiting the recursive depth to two
// levels limits the level of detail to a one-line summary for each query MemTracker.
detail += "\n" + log_usage(2);
detail += "\n " + log_usage(2);
} else {
detail += "\n" + log_usage();
detail += "\n " + log_usage();
}
// TODO: memory leak by calling `boost::stacktrace` in mem hook
// TODO: memory leak by calling `boost::stacktrace` in tcmalloc hook,
// test whether overwriting malloc/free is the same problem in jemalloc/tcmalloc.
// detail += "\n" + boost::stacktrace::to_string(boost::stacktrace::stacktrace());
LOG(WARNING) << detail;
_print_log_usage = false;
}
}

Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
int64_t failed_allocation_size) {
std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
int64_t failed_allocation_size) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
std::string detail = fmt::format("Memory limit exceeded:<consuming tracker:<{}>, ", _label);
MemTrackerLimiter* exceeded_tracker = nullptr;
Expand All @@ -242,19 +239,13 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
}
}

auto sys_exceed_st = check_sys_mem_info(failed_allocation_size);
MemTrackerLimiter* print_log_usage_tracker = nullptr;
if (exceeded_tracker != nullptr) {
detail += fmt::format(
"failed alloc size {}, exceeded tracker:<{}>, limit {}, peak used {}, "
"current used {}>, executing msg:<{}>",
print_bytes(failed_allocation_size), exceeded_tracker->label(),
print_bytes(exceeded_tracker->limit()),
print_bytes(exceeded_tracker->peak_consumption()),
print_bytes(exceeded_tracker->consumption()), msg);
detail += limit_exceeded_errmsg_prefix_str(failed_allocation_size, exceeded_tracker);
print_log_usage_tracker = exceeded_tracker;
} else if (!sys_exceed_st) {
detail += fmt::format("{}>, executing msg:<{}>", sys_exceed_st.get_error_msg(), msg);
} else if (sys_mem_exceed_limit_check(failed_allocation_size)) {
detail += fmt::format("{}>, executing msg:<{}>",
limit_exceeded_errmsg_sys_str(failed_allocation_size), msg);
} else if (max_consumption_tracker != nullptr) {
// must after check_sys_mem_info false
detail += fmt::format(
Expand All @@ -271,29 +262,28 @@ Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
detail += fmt::format("unknown exceed reason, executing msg:<{}>", msg);
print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker_raw();
}
auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
if (print_log_usage_tracker != nullptr)
print_log_usage_tracker->print_log_usage(st.get_error_msg());
return st;
auto failed_msg = MemTrackerLimiter::limit_exceeded_errmsg_suffix_str(detail);
if (print_log_usage_tracker != nullptr) print_log_usage_tracker->print_log_usage(failed_msg);
return failed_msg;
}

Status MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
MemTrackerLimiter* failed_tracker,
Status failed_try_consume_st) {
std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg,
MemTrackerLimiter* failed_tracker,
const std::string& limit_exceeded_errmsg_prefix) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
std::string detail =
fmt::format("Memory limit exceeded:<consuming tracker:<{}>, {}>, executing msg:<{}>",
_label, failed_try_consume_st.get_error_msg(), msg);
auto st = MemTrackerLimiter::mem_limit_exceeded_construct(detail);
failed_tracker->print_log_usage(st.get_error_msg());
return st;
_label, limit_exceeded_errmsg_prefix, msg);
auto failed_msg = MemTrackerLimiter::limit_exceeded_errmsg_suffix_str(detail);
failed_tracker->print_log_usage(failed_msg);
return failed_msg;
}

Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& msg,
int64_t failed_alloc_size) {
Status rt = mem_limit_exceeded(msg, failed_alloc_size);
state->log_error(rt.to_string());
return rt;
auto failed_msg = mem_limit_exceeded(msg, failed_alloc_size);
state->log_error(failed_msg);
return Status::MemoryLimitExceeded(failed_msg);
}

} // namespace doris
84 changes: 51 additions & 33 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "common/config.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "service/backend_options.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"

Expand Down Expand Up @@ -64,7 +65,7 @@ class MemTrackerLimiter final : public MemTracker {
size_t upper_level) const;

public:
static Status check_sys_mem_info(int64_t bytes) {
static bool sys_mem_exceed_limit_check(int64_t bytes) {
// Limit process memory usage using the actual physical memory of the process in `/proc/self/status`.
// This is independent of the consumption value of the mem tracker, which counts the virtual memory
// of the process malloc.
Expand All @@ -78,16 +79,9 @@ class MemTrackerLimiter final : public MemTracker {
bytes >=
MemInfo::mem_limit() ||
PerfCounters::get_vm_rss() + bytes >= MemInfo::hard_mem_limit()) {
auto st = Status::MemoryLimitExceeded(
"process memory used {}, tc/jemalloc cache {}, exceed limit {}, hard limit {}, "
"failed alloc size {}",
print_bytes(PerfCounters::get_vm_rss()),
print_bytes(MemInfo::allocator_cache_mem()), print_bytes(MemInfo::mem_limit()),
print_bytes(MemInfo::hard_mem_limit()), print_bytes(bytes));
ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(st.get_error_msg());
return st;
return true;
}
return Status::OK();
return false;
}

int64_t group_num() const { return _group_num; }
Expand Down Expand Up @@ -150,9 +144,9 @@ class MemTrackerLimiter final : public MemTracker {
// If 'failed_allocation_size' is greater than zero, logs the allocation size. If
// 'failed_allocation_size' is zero, nothing about the allocation size is logged.
// If 'state' is non-nullptr, logs the error to 'state'.
Status mem_limit_exceeded(const std::string& msg, int64_t failed_allocation_size = 0);
Status mem_limit_exceeded(const std::string& msg, MemTrackerLimiter* failed_tracker,
Status failed_try_consume_st);
std::string mem_limit_exceeded(const std::string& msg, int64_t failed_allocation_size = 0);
std::string mem_limit_exceeded(const std::string& msg, MemTrackerLimiter* failed_tracker,
const std::string& limit_exceeded_errmsg_prefix);
Status mem_limit_exceeded(RuntimeState* state, const std::string& msg,
int64_t failed_allocation_size = 0);

Expand All @@ -167,7 +161,7 @@ class MemTrackerLimiter final : public MemTracker {
}

static std::string print_bytes(int64_t bytes) {
return fmt::format("{}", PrettyPrinter::print(bytes, TUnit::BYTES));
return PrettyPrinter::print(bytes, TUnit::BYTES);
}

private:
Expand All @@ -184,7 +178,7 @@ class MemTrackerLimiter final : public MemTracker {
// they can all consume 'bytes' without exceeding limit. If limit would be exceed,
// no MemTrackerLimiters are updated. Returns true if the consumption was successfully updated.
WARN_UNUSED_RESULT
Status try_consume(int64_t bytes);
bool try_consume(int64_t bytes, std::string& failed_msg);

// When the accumulated untracked memory value exceeds the upper limit,
// the current value is returned and set to 0.
Expand All @@ -198,7 +192,33 @@ class MemTrackerLimiter final : public MemTracker {
const std::list<MemTrackerLimiter*>& trackers,
int64_t* logged_consumption);

static Status mem_limit_exceeded_construct(const std::string& msg);
static std::string limit_exceeded_errmsg_prefix_str(int64_t bytes,
MemTrackerLimiter* exceed_tracker) {
return fmt::format(
"failed alloc size {}, exceeded tracker:<{}>, limit {}, peak "
"used {}, current used {}",
print_bytes(bytes), exceed_tracker->label(), print_bytes(exceed_tracker->limit()),
print_bytes(exceed_tracker->_consumption->value()),
print_bytes(exceed_tracker->_consumption->current_value()));
}

static std::string limit_exceeded_errmsg_suffix_str(const std::string& msg) {
return fmt::format(
"{}. backend {} process memory used {}, limit {}. If query tracker exceed, `set "
"exec_mem_limit=8G` to change limit, details mem usage see be.INFO.",
msg, BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(),
MemInfo::mem_limit_str());
}

static std::string limit_exceeded_errmsg_sys_str(int64_t bytes) {
auto err_msg = fmt::format(
"process memory used {}, tc/jemalloc allocator cache {}, exceed limit {}, failed "
"alloc size {}",
PerfCounters::get_vm_rss_str(), MemInfo::allocator_cache_mem_str(),
MemInfo::mem_limit_str(), print_bytes(bytes));
ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(err_msg);
return err_msg;
}

private:
// Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. Used in log_usage。
Expand Down Expand Up @@ -258,12 +278,16 @@ inline void MemTrackerLimiter::cache_consume_local(int64_t bytes) {
}
}

inline Status MemTrackerLimiter::try_consume(int64_t bytes) {
inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg) {
if (bytes <= 0) {
release(-bytes);
return Status::OK();
failed_msg = std::string();
return true;
}
if (sys_mem_exceed_limit_check(bytes)) {
failed_msg = limit_exceeded_errmsg_sys_str(bytes);
return false;
}
RETURN_IF_ERROR(check_sys_mem_info(bytes));
int i;
// Walk the tracker tree top-down.
for (i = _all_ancestors.size() - 1; i >= 0; --i) {
Expand All @@ -278,34 +302,28 @@ inline Status MemTrackerLimiter::try_consume(int64_t bytes) {
for (int j = _all_ancestors.size() - 1; j > i; --j) {
_all_ancestors[j]->_consumption->add(-bytes);
}
return Status::MemoryLimitExceeded(fmt::format(
"failed alloc size {}, exceeded tracker:<{}>, limit {}, peak "
"used {}, current used {}",
print_bytes(bytes), tracker->label(), print_bytes(tracker->limit()),
print_bytes(tracker->_consumption->value()),
print_bytes(tracker->_consumption->current_value())));
failed_msg = limit_exceeded_errmsg_prefix_str(bytes, tracker);
return false;
}
}
}
// Everyone succeeded, return.
DCHECK_EQ(i, -1);
return Status::OK();
failed_msg = std::string();
return true;
}

inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
if (bytes <= 0) return Status::OK();
RETURN_IF_ERROR(check_sys_mem_info(bytes));
if (sys_mem_exceed_limit_check(bytes)) {
return Status::MemoryLimitExceeded(limit_exceeded_errmsg_sys_str(bytes));
}
int i;
// Walk the tracker tree top-down.
for (i = _limited_ancestors.size() - 1; i >= 0; --i) {
MemTrackerLimiter* tracker = _limited_ancestors[i];
if (tracker->_consumption->current_value() + bytes > tracker->limit()) {
return Status::MemoryLimitExceeded(
fmt::format("expected alloc size {}, exceeded tracker:<{}>, limit {}, peak "
"used {}, current used {}",
print_bytes(bytes), tracker->label(), print_bytes(tracker->limit()),
print_bytes(tracker->_consumption->value()),
print_bytes(tracker->_consumption->current_value())));
return Status::MemoryLimitExceeded(limit_exceeded_errmsg_prefix_str(bytes, tracker));
}
}
return Status::OK();
Expand Down
9 changes: 5 additions & 4 deletions be/src/runtime/memory/thread_mem_tracker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,17 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details
}
}

void ThreadMemTrackerMgr::exceeded(Status failed_try_consume_st) {
void ThreadMemTrackerMgr::exceeded(const std::string& failed_msg) {
if (_cb_func != nullptr) {
_cb_func();
}
if (is_attach_query()) {
auto st = _limiter_tracker_raw->mem_limit_exceeded(
auto cancel_msg = _limiter_tracker_raw->mem_limit_exceeded(
fmt::format("exec node:<{}>", last_consumer_tracker()),
_limiter_tracker_raw->parent().get(), failed_try_consume_st);
exceeded_cancel_task(st.get_error_msg());
_limiter_tracker_raw->parent().get(), failed_msg);
exceeded_cancel_task(cancel_msg);
_check_limit = false; // Make sure it will only be canceled once
}
}

} // namespace doris
18 changes: 10 additions & 8 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,14 @@ class ThreadMemTrackerMgr {
// If tryConsume fails due to task mem tracker exceeding the limit, the task must be canceled
void exceeded_cancel_task(const std::string& cancel_details);

void exceeded(Status failed_try_consume_st);
void exceeded(const std::string& failed_msg);

private:
// Cache untracked mem, only update to _untracked_mems when switching mem tracker.
// Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance.
int64_t _untracked_mem = 0;
int64_t old_untracked_mem = 0;
std::string failed_msg = std::string();

std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
MemTrackerLimiter* _limiter_tracker_raw;
Expand Down Expand Up @@ -170,6 +172,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
// the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop.
_stop_consume = true;
old_untracked_mem = _untracked_mem;
DCHECK(_limiter_tracker);
if (CheckLimit) {
#ifndef BE_TEST
Expand All @@ -182,20 +185,19 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
// _limiter_tracker->label() != "Process");
#endif
Status st = _limiter_tracker_raw->try_consume(_untracked_mem);
if (!st) {
if (!_limiter_tracker_raw->try_consume(old_untracked_mem, failed_msg)) {
// The memory has been allocated, so when TryConsume fails, need to continue to complete
// the consume to ensure the accuracy of the statistics.
_limiter_tracker_raw->consume(_untracked_mem);
exceeded(st);
_limiter_tracker_raw->consume(old_untracked_mem);
exceeded(failed_msg);
}
} else {
_limiter_tracker_raw->consume(_untracked_mem);
_limiter_tracker_raw->consume(old_untracked_mem);
}
for (auto tracker : _consumer_tracker_stack) {
tracker->consume(_untracked_mem);
tracker->consume(old_untracked_mem);
}
_untracked_mem = 0;
_untracked_mem -= old_untracked_mem;
_stop_consume = false;
}

Expand Down
3 changes: 3 additions & 0 deletions be/src/util/mem_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ namespace doris {
bool MemInfo::_s_initialized = false;
int64_t MemInfo::_s_physical_mem = -1;
int64_t MemInfo::_s_mem_limit = -1;
std::string MemInfo::_s_mem_limit_str = "";
int64_t MemInfo::_s_hard_mem_limit = -1;
size_t MemInfo::_s_allocator_physical_mem = 0;
size_t MemInfo::_s_tcmalloc_pageheap_free_bytes = 0;
size_t MemInfo::_s_tcmalloc_central_bytes = 0;
size_t MemInfo::_s_tcmalloc_transfer_bytes = 0;
size_t MemInfo::_s_tcmalloc_thread_bytes = 0;
size_t MemInfo::_s_allocator_cache_mem = 0;
std::string MemInfo::_s_allocator_cache_mem_str = "";

void MemInfo::init() {
// Read from /proc/meminfo
Expand Down Expand Up @@ -94,6 +96,7 @@ void MemInfo::init() {

bool is_percent = true;
_s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent);
_s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
_s_hard_mem_limit = _s_physical_mem - std::min(209715200L, _s_physical_mem / 10); // 200M

LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES);
Expand Down
Loading