Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Sep 21, 2022
1 parent 129c77c commit 691116a
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 18 deletions.
14 changes: 10 additions & 4 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,17 +77,23 @@ MemTrackerLimiter::~MemTrackerLimiter() {
if (_label == "Process") doris::thread_context_ptr._init = false;
DCHECK(remain_child_count() == 0 || _label == "Process");
consume(_untracked_mem.exchange(0));
// In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`
// in real time. Merge its consumption into orphan when all third level limiter trackers are destructed, to avoid repetition.
// the first layer: process;
// the second layer: a tracker that will not be destructed globally (query/load pool, load channel mgr, etc.);
// the third layer: a query/load/compaction task generates a tracker (query tracker, load channel tracker, etc.).
if (_parent->parent()->label() == "_parent") {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
}

if (_parent) {
std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
if (_child_tracker_it != _parent->_child_tracker_limiters.end()) {
_parent->_child_tracker_limiters.erase(_child_tracker_it);
_child_tracker_it = _parent->_child_tracker_limiters.end();
}
}
// In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`
// in real time. Merge its consumption into orphan when all other trackers are destructed.
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
}

MemTracker::Snapshot MemTrackerLimiter::make_snapshot(size_t level) const {
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class MemTrackerLimiter final : public MemTracker {
"alloc size {}",
PerfCounters::get_vm_rss_str(), MemInfo::allocator_cache_mem_str(),
MemInfo::mem_limit_str(), print_bytes(bytes));
ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(err_msg);
ExecEnv::GetInstance()->process_mem_tracker()->print_log_usage(err_msg);
return err_msg;
}

Expand Down
12 changes: 6 additions & 6 deletions be/src/runtime/memory/thread_mem_tracker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,25 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
flush_untracked_mem<false>();
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
_task_id_stack.push_back(task_id);
_fragment_instance_id_stack.push_back(fragment_instance_id);
_limiter_tracker_stack.push_back(mem_tracker);
_limiter_tracker_raw = mem_tracker.get();
}

void ThreadMemTrackerMgr::detach_limiter_tracker() {
DCHECK(!_limiter_tracker_stack.empty());
flush_untracked_mem<false>();
_task_id = "";
_fragment_instance_id = TUniqueId();
_task_id_stack.pop_back();
_fragment_instance_id_stack.pop_back();
_limiter_tracker_stack.pop_back();
_limiter_tracker_raw = _limiter_tracker_stack.back().get();
}

void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) {
if (_fragment_instance_id != TUniqueId()) {
if (_fragment_instance_id_stack.back() != TUniqueId()) {
ExecEnv::GetInstance()->fragment_mgr()->cancel(
_fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
_fragment_instance_id_stack.back(), PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
cancel_details);
}
}
Expand Down
13 changes: 7 additions & 6 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class ThreadMemTrackerMgr {
template <bool CheckLimit>
void flush_untracked_mem();

bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); }

std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
return _limiter_tracker_stack.back();
Expand All @@ -100,8 +100,8 @@ class ThreadMemTrackerMgr {
return fmt::format(
"ThreadMemTrackerMgr debug, _untracked_mem:{}, _task_id:{}, "
"_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>",
std::to_string(_untracked_mem), _task_id, _limiter_tracker_raw->log_usage(1),
fmt::to_string(consumer_tracker_buf));
std::to_string(_untracked_mem), _task_id_stack.back(),
_limiter_tracker_raw->log_usage(1), fmt::to_string(consumer_tracker_buf));
}

private:
Expand All @@ -127,8 +127,8 @@ class ThreadMemTrackerMgr {
// If there is a memory new/delete operation in the consume method, it may enter infinite recursion.
bool _stop_consume = false;
bool _check_attach = true;
std::string _task_id;
TUniqueId _fragment_instance_id;
std::vector<std::string> _task_id_stack;
std::vector<TUniqueId> _fragment_instance_id_stack;
ExceedCallBack _cb_func = nullptr;
};

Expand All @@ -137,10 +137,11 @@ inline void ThreadMemTrackerMgr::init() {
// _limiter_tracker_stack[0] = orphan_mem_tracker
DCHECK(_limiter_tracker_stack.size() <= 1)
<< "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
_task_id = "";
if (_limiter_tracker_stack.size() == 0) {
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
_limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
_task_id_stack.push_back("");
_fragment_instance_id_stack.push_back(TUniqueId());
}
_check_limit = true;
}
Expand Down
2 changes: 1 addition & 1 deletion be/test/testutil/run_all_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int main(int argc, char** argv) {
std::shared_ptr<doris::MemTrackerLimiter> process_mem_tracker =
std::make_shared<doris::MemTrackerLimiter>(-1, "Process");
std::shared_ptr<doris::MemTrackerLimiter> _orphan_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
std::make_shared<doris::MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker);
doris::thread_context()->_thread_mem_tracker_mgr->init();
doris::TabletSchemaCache::create_global_schema_cache();
Expand Down

0 comments on commit 691116a

Please sign in to comment.