Skip to content

Commit

Permalink
[branch-1.1-lts](cherry-pick) Some fixes for mem tracker (#12889)
Browse files Browse the repository at this point in the history
* [fix][memtracker] remove gc and fix print

* [fix](memory) Fix BE OOM when load -238 fail

* [fix](memtracker) Process physical mem check does not include tc/jemalloc allocator cache (#12688)

tcmalloc/jemalloc allocator cache does not participate in the mem check as part of the process physical memory.

because new/malloc will trigger mem hook when using tcmalloc/jemalloc allocator cache, but it may not actually alloc physical memory, which is not expected in mem hook fail.

in addition:

The value of tcmalloc/jemalloc allocator cache is used as a mem tracker, the parent is the process mem tracker, which is updated every 1s.
Modify the process default mem_limit to 90%. expect mem tracker to effectively limit the memory usage of the process.

* Fix memory leak by calling  in mem hook (#12708)

After the consume mem tracker exceeds the mem limit in the mem hook, the boost stacktrace will be printed. A query/load will only be printed once, and the process tracker will only be printed once per second.

After the process memory reaches the upper limit, the boost stacktrace will be printed every second. The observed phenomena are as follows:

After query/load is canceled, the memory increases instantly;
tcmalloc profile total physical memory is less than perf process memory;
The process mem tracker is smaller than the perf process memory;

* [fix](memtracker) Fix thread mem tracker try consume accuracy #12782

* [Bugfix](mem) Fix memory limit check may overflow (#12776)

This bug is because the result of subtracting signed and unsigned numbers may overflow if it is negative.

Co-authored-by: Zhengguo Yang <yangzhgg@gmail.com>
  • Loading branch information
xinyiZzz and yangzhg authored Sep 22, 2022
1 parent 062fd45 commit d3006dd
Show file tree
Hide file tree
Showing 21 changed files with 235 additions and 195 deletions.
2 changes: 1 addition & 1 deletion be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ CONF_Int64(tc_max_total_thread_cache_bytes, "1073741824");
// defaults to bytes if no unit is given"
// must larger than 0. and if larger than physical memory size,
// it will be set to physical memory size.
CONF_String(mem_limit, "80%");
CONF_String(mem_limit, "90%");

// the port heartbeat service used
CONF_Int32(heartbeat_service_port, "9050");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ namespace doris {
bool k_doris_exit = false;

void Daemon::tcmalloc_gc_thread() {
// TODO All cache GC wish to be supported
while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(10))) {
size_t used_size = 0;
size_t free_size = 0;
Expand Down
5 changes: 3 additions & 2 deletions be/src/http/default_path_handlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "runtime/mem_tracker.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/debug_util.h"
#include "util/perf_counters.h"
#include "util/pretty_printer.h"
#include "util/thread.h"

Expand Down Expand Up @@ -88,8 +89,8 @@ void mem_usage_handler(const std::shared_ptr<MemTracker>& mem_tracker,
(*output) << "<pre>"
<< "Mem Limit: " << PrettyPrinter::print(mem_tracker->limit(), TUnit::BYTES)
<< std::endl
<< "Mem Consumption: "
<< PrettyPrinter::print(mem_tracker->consumption(), TUnit::BYTES) << std::endl
<< "Physical Mem From Perf: "
<< PrettyPrinter::print(PerfCounters::get_vm_rss(), TUnit::BYTES) << std::endl
<< "</pre>";
} else {
(*output) << "<pre>"
Expand Down
9 changes: 9 additions & 0 deletions be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class EvHttpServer;
class ExternalScanContextMgr;
class FragmentMgr;
class ResultCache;
class NewMemTracker;
class LoadPathMgr;
class LoadStreamMgr;
class MemTracker;
Expand Down Expand Up @@ -123,6 +124,13 @@ class ExecEnv {
std::shared_ptr<MemTrackerLimiter> new_process_mem_tracker() { return _process_mem_tracker; }
MemTrackerLimiter* process_mem_tracker_raw() { return _process_mem_tracker_raw; }

void set_process_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& tracker) {
_process_mem_tracker = tracker;
_process_mem_tracker_raw = tracker.get();
}
std::shared_ptr<NewMemTracker> allocator_cache_mem_tracker() {
return _allocator_cache_mem_tracker;
}
std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return _query_pool_mem_tracker; }
std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return _load_pool_mem_tracker; }
MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; }
Expand Down Expand Up @@ -199,6 +207,7 @@ class ExecEnv {
// The ancestor for all trackers. Every tracker is visible from the process down.
// Not limit total memory by process tracker, and it's just used to track virtual memory of process.
std::shared_ptr<MemTrackerLimiter> _process_mem_tracker;
std::shared_ptr<NewMemTracker> _allocator_cache_mem_tracker;
MemTrackerLimiter* _process_mem_tracker_raw;
// The ancestor for all querys tracker.
std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ Status ExecEnv::_init_mem_tracker() {
}
#endif

_allocator_cache_mem_tracker = std::make_shared<NewMemTracker>("Tc/JemallocAllocatorCache");
_query_pool_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "QueryPool", _process_mem_tracker);
_load_pool_mem_tracker =
Expand Down
9 changes: 5 additions & 4 deletions be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ Status LoadChannel::add_batch(const PTabletWriterAddBatchRequest& request,
}

// 2. check if mem consumption exceed limit
handle_mem_exceed_limit(false);
RETURN_IF_ERROR(handle_mem_exceed_limit(false));

// 3. add batch to tablets channel
if (request.has_row_batch()) {
Expand All @@ -111,11 +111,11 @@ Status LoadChannel::add_batch(const PTabletWriterAddBatchRequest& request,
return st;
}

void LoadChannel::handle_mem_exceed_limit(bool force) {
Status LoadChannel::handle_mem_exceed_limit(bool force) {
// lock so that only one thread can check mem limit
std::lock_guard<std::mutex> l(_lock);
if (!(force || _mem_tracker->limit_exceeded())) {
return;
return Status::OK();
}

if (!force) {
Expand All @@ -125,12 +125,13 @@ void LoadChannel::handle_mem_exceed_limit(bool force) {

std::shared_ptr<TabletsChannel> channel;
if (_find_largest_consumption_channel(&channel)) {
channel->reduce_mem_usage(_mem_tracker->limit());
return channel->reduce_mem_usage(_mem_tracker->limit());
} else {
// should not happen, add log to observe
LOG(WARNING) << "fail to find suitable tablets-channel when memory exceed. "
<< "load_id=" << _load_id;
}
return Status::OK();
}

// lock should be held when calling this method
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class LoadChannel {
// If yes, it will pick a tablets channel to try to reduce memory consumption.
// If force is true, even if this load channel does not exceeds limit, it will still
// try to reduce memory.
void handle_mem_exceed_limit(bool force);
Status handle_mem_exceed_limit(bool force);

int64_t mem_consumption() const { return _mem_tracker->consumption(); }

Expand Down
10 changes: 5 additions & 5 deletions be/src/runtime/load_channel_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request,
// 2. check if mem consumption exceed limit
// If this is a high priority load task, do not handle this.
// because this may block for a while, which may lead to rpc timeout.
_handle_mem_exceed_limit();
RETURN_IF_ERROR(_handle_mem_exceed_limit());
}

// 3. add batch to load channel
Expand All @@ -175,11 +175,11 @@ Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request,
return Status::OK();
}

void LoadChannelMgr::_handle_mem_exceed_limit() {
Status LoadChannelMgr::_handle_mem_exceed_limit() {
// lock so that only one thread can check mem limit
std::lock_guard<std::mutex> l(_lock);
if (!_mem_tracker->limit_exceeded()) {
return;
return Status::OK();
}

int64_t max_consume = 0;
Expand All @@ -198,14 +198,14 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
if (max_consume == 0) {
// should not happen, add log to observe
LOG(WARNING) << "failed to find suitable load channel when total load mem limit exceed";
return;
return Status::OK();
}
DCHECK(channel.get() != nullptr);

// force reduce mem limit of the selected channel
LOG(INFO) << "reducing memory of " << *channel << " because total load mem consumption "
<< _mem_tracker->consumption() << " has exceeded limit " << _mem_tracker->limit();
channel->handle_mem_exceed_limit(true);
return channel->handle_mem_exceed_limit(true);
}

Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_channel_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class LoadChannelMgr {
private:
// check if the total load mem consumption exceeds limit.
// If yes, it will pick a load channel to try to reduce memory consumption.
void _handle_mem_exceed_limit();
Status _handle_mem_exceed_limit();

Status _start_bg_worker();

Expand Down
9 changes: 5 additions & 4 deletions be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ void NewMemTracker::make_group_snapshot(std::vector<NewMemTracker::Snapshot>* sn
}

std::string NewMemTracker::log_usage(NewMemTracker::Snapshot snapshot) {
return fmt::format("NewMemTracker Label={}, Parent Label={}, Used={}, Peak={}", snapshot.label,
snapshot.parent,
PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES),
PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES));
return fmt::format(
"MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", snapshot.label,
snapshot.parent, PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES),
snapshot.cur_consumption, PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES),
snapshot.peak_consumption);
}

} // namespace doris
Loading

0 comments on commit d3006dd

Please sign in to comment.