Skip to content

Commit

Permalink
fix query mem
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Aug 12, 2022
1 parent 32e451e commit dd54cb5
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 20 deletions.
9 changes: 7 additions & 2 deletions be/src/runtime/mem_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

#include "runtime/memory/chunk_allocator.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
#include "util/bit_util.h"
#include "util/doris_metrics.h"

Expand Down Expand Up @@ -67,6 +66,8 @@ MemPool::~MemPool() {
total_bytes_released += chunk.chunk.size;
ChunkAllocator::instance()->free(chunk.chunk);
}
THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_,
ExecEnv::GetInstance()->process_mem_tracker().get());
if (_mem_tracker) _mem_tracker->release(total_bytes_released);
DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released);
}
Expand All @@ -87,12 +88,15 @@ void MemPool::free_all() {
total_bytes_released += chunk.chunk.size;
ChunkAllocator::instance()->free(chunk.chunk);
}
THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_,
ExecEnv::GetInstance()->process_mem_tracker().get());
if (_mem_tracker) _mem_tracker->release(total_bytes_released);
chunks_.clear();
next_chunk_size_ = INITIAL_CHUNK_SIZE;
current_chunk_idx_ = -1;
total_allocated_bytes_ = 0;
total_reserved_bytes_ = 0;
peak_allocated_bytes_ = 0;

DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released);
}
Expand Down Expand Up @@ -145,6 +149,7 @@ Status MemPool::find_chunk(size_t min_size, bool check_limits) {
// Allocate a new chunk. Return early if allocate fails.
Chunk chunk;
RETURN_IF_ERROR(ChunkAllocator::instance()->allocate(chunk_size, &chunk));
THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->process_mem_tracker().get());
if (_mem_tracker) _mem_tracker->consume(chunk_size);
ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size);
// Put it before the first free chunk. If no free chunks, it goes at the end.
Expand Down Expand Up @@ -215,7 +220,7 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) {
src->total_allocated_bytes_ = 0;
}

peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
reset_peak();

if (!keep_current) src->free_all();
DCHECK(src->check_integrity(false));
Expand Down
12 changes: 11 additions & 1 deletion be/src/runtime/mem_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
#include "common/status.h"
#include "gutil/dynamic_annotations.h"
#include "olap/olap_define.h"
#include "runtime/exec_env.h"
#include "runtime/memory/chunk.h"
#include "runtime/thread_context.h"
#include "util/bit_util.h"

namespace doris {
Expand Down Expand Up @@ -209,6 +211,14 @@ class MemPool {
/// data. Otherwise the current chunk can be either empty or full.
bool check_integrity(bool check_current_chunk_empty);

void reset_peak() {
if (total_allocated_bytes_ - peak_allocated_bytes_ > 1024) {
THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - peak_allocated_bytes_,
ExecEnv::GetInstance()->process_mem_tracker().get());
peak_allocated_bytes_ = total_allocated_bytes_;
}
}

/// Return offset to unoccupied space in current chunk.
int64_t get_free_offset() const {
if (current_chunk_idx_ == -1) return 0;
Expand Down Expand Up @@ -240,7 +250,7 @@ class MemPool {
DCHECK_LE(info.allocated_bytes + size, info.chunk.size);
info.allocated_bytes += padding + size;
total_allocated_bytes_ += padding + size;
peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
reset_peak();
DCHECK_LE(current_chunk_idx_, chunks_.size() - 1);
return result;
}
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
// TCMalloc hook will be triggered during destructor memtracker, may cause crash.
if (_label == "Process") doris::thread_context_ptr._init = false;
DCHECK(remain_child_count() == 0 || _label == "Process");
consume(_untracked_mem.exchange(0));
if (_parent) {
std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
if (_child_tracker_it != _parent->_child_tracker_limiters.end()) {
Expand Down
34 changes: 28 additions & 6 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ class MemTrackerLimiter final : public MemTracker {
WARN_UNUSED_RESULT
Status try_consume(int64_t bytes);

// When the accumulated untracked memory value exceeds the upper limit,
// the current value is returned and set to 0.
// Thread safety.
int64_t add_untracked_mem(int64_t bytes);
void consume_cache(int64_t bytes);

// Log consumption of all the trackers provided. Returns the sum of consumption in
// 'logged_consumption'. 'max_recursive_depth' specifies the maximum number of levels
// of children to include in the dump. If it is zero, then no children are dumped.
Expand All @@ -194,6 +200,10 @@ class MemTrackerLimiter final : public MemTracker {
// _all_ancestors with valid limits
std::vector<MemTrackerLimiter*> _limited_ancestors;

// Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate
// to avoid frequent calls to consume/release of MemTracker.
std::atomic<int64_t> _untracked_mem = 0;

// Child trackers of this tracker limiter. Used for error reporting and
// listing only (i.e. updating the consumption of a parent tracker limiter does not
// update that of its children).
Expand Down Expand Up @@ -222,12 +232,24 @@ class MemTrackerLimiter final : public MemTracker {
};

inline void MemTrackerLimiter::consume(int64_t bytes) {
if (bytes == 0) {
return;
} else {
for (auto& tracker : _all_ancestors) {
tracker->_consumption->add(bytes);
}
if (bytes == 0) return;
for (auto& tracker : _all_ancestors) {
tracker->_consumption->add(bytes);
}
}

inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
_untracked_mem += bytes;
if (std::abs(_untracked_mem) >= config::mem_tracker_consume_min_size_bytes) {
return _untracked_mem.exchange(0);
}
return 0;
}

inline void MemTrackerLimiter::consume_cache(int64_t bytes) {
int64_t consume_bytes = add_untracked_mem(bytes);
if (consume_bytes != 0) {
consume(consume_bytes);
}
}

Expand Down
4 changes: 3 additions & 1 deletion be/src/runtime/memory/mem_tracker_task_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
// the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is,
// the negative number of the current value of consume.
it->second->parent()->consumption_revise(-it->second->consumption());
LOG(INFO) << "Deregister query/load memory tracker, queryId/loadId: " << it->first;
LOG(INFO) << fmt::format(
"Deregister query/load memory tracker, queryId={}, Limit={}, PeakUsed={}",
it->first, it->second->limit(), it->second->peak_consumption());
expired_task_ids.emplace_back(it->first);
}
}
Expand Down
5 changes: 3 additions & 2 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ class ThreadMemTrackerMgr {
// must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck,
void consume(int64_t size);

// Will not change the value of process_mem_tracker, even though mem_tracker == process_mem_tracker.
void transfer_to(int64_t size, MemTrackerLimiter* mem_tracker) {
consume(-size);
mem_tracker->consume(size);
mem_tracker->consume_cache(size);
}
void transfer_from(int64_t size, MemTrackerLimiter* mem_tracker) {
mem_tracker->release(size);
mem_tracker->consume_cache(-size);
consume(size);
}

Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ class StopCheckThreadMemTrackerLimit {
}
};

// The following macros are used to fix the tracking accuracy of caches etc.
#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \
auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit()
#define CONSUME_THREAD_MEM_TRACKER(size) \
Expand Down
Loading

0 comments on commit dd54cb5

Please sign in to comment.