Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Sep 20, 2022
1 parent e87f2cd commit 89d4616
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 13 deletions.
12 changes: 6 additions & 6 deletions be/src/runtime/memory/thread_mem_tracker_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,25 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
flush_untracked_mem<false>();
_task_id = task_id;
_fragment_instance_id = fragment_instance_id;
_task_id_stack.push_back(task_id);
_fragment_instance_id_stack.push_back(fragment_instance_id);
_limiter_tracker_stack.push_back(mem_tracker);
_limiter_tracker_raw = mem_tracker.get();
}

void ThreadMemTrackerMgr::detach_limiter_tracker() {
DCHECK(!_limiter_tracker_stack.empty());
flush_untracked_mem<false>();
_task_id = "";
_fragment_instance_id = TUniqueId();
_task_id_stack.pop_back();
_fragment_instance_id_stack.pop_back();
_limiter_tracker_stack.pop_back();
_limiter_tracker_raw = _limiter_tracker_stack.back().get();
}

void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) {
if (_fragment_instance_id != TUniqueId()) {
if (_fragment_instance_id_stack.back() != TUniqueId()) {
ExecEnv::GetInstance()->fragment_mgr()->cancel(
_fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
_fragment_instance_id_stack.back(), PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
cancel_details);
}
}
Expand Down
13 changes: 7 additions & 6 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class ThreadMemTrackerMgr {
template <bool CheckLimit>
void flush_untracked_mem();

bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); }

std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
return _limiter_tracker_stack.back();
Expand All @@ -100,8 +100,8 @@ class ThreadMemTrackerMgr {
return fmt::format(
"ThreadMemTrackerMgr debug, _untracked_mem:{}, _task_id:{}, "
"_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>",
std::to_string(_untracked_mem), _task_id, _limiter_tracker_raw->log_usage(1),
fmt::to_string(consumer_tracker_buf));
std::to_string(_untracked_mem), _task_id_stack.back(),
_limiter_tracker_raw->log_usage(1), fmt::to_string(consumer_tracker_buf));
}

private:
Expand All @@ -125,8 +125,8 @@ class ThreadMemTrackerMgr {
// If there is a memory new/delete operation in the consume method, it may enter infinite recursion.
bool _stop_consume = false;
bool _check_attach = true;
std::string _task_id;
TUniqueId _fragment_instance_id;
std::vector<std::string> _task_id_stack;
std::vector<TUniqueId> _fragment_instance_id_stack;
ExceedCallBack _cb_func = nullptr;
};

Expand All @@ -135,10 +135,11 @@ inline void ThreadMemTrackerMgr::init() {
// _limiter_tracker_stack[0] = orphan_mem_tracker
DCHECK(_limiter_tracker_stack.size() <= 1)
<< "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
_task_id = "";
if (_limiter_tracker_stack.size() == 0) {
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
_limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
_task_id_stack.push_back("");
_fragment_instance_id_stack.push_back(TUniqueId());
}
_check_limit = true;
}
Expand Down
2 changes: 1 addition & 1 deletion be/test/testutil/run_all_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ int main(int argc, char** argv) {
std::shared_ptr<doris::MemTrackerLimiter> process_mem_tracker =
std::make_shared<doris::MemTrackerLimiter>(-1, "Process");
std::shared_ptr<doris::MemTrackerLimiter> _orphan_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
std::make_shared<doris::MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker);
doris::thread_context()->_thread_mem_tracker_mgr->init();
doris::TabletSchemaCache::create_global_schema_cache();
Expand Down

0 comments on commit 89d4616

Please sign in to comment.