Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][memory tracker] Fix lru cache, compaction tracker, add USE_MEM_TRACKER compile #9661

Merged
merged 4 commits into from
May 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,14 @@ if (WITH_LZO)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DDORIS_WITH_LZO")
endif()

# Enable memory tracker, which allows BE to limit the memory of tasks such as query, load,
# and compaction,and observe the memory of BE through be_ip:http_port/MemTracker.
# Adding the option `USE_MEM_TRACKER=OFF sh build.sh` when compiling can turn off the memory tracker,
# which will bring about a 2% performance improvement, which may be useful in performance POC.
if (USE_MEM_TRACKER)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DUSE_MEM_TRACKER")
endif()

if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -faligned-new")
endif()
Expand Down
2 changes: 2 additions & 0 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,8 @@ void TaskWorkerPool::_random_sleep(int second) {
}

void TaskWorkerPool::_submit_table_compaction_worker_thread_callback() {
SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::COMPACTION,
StorageEngine::instance()->compaction_mem_tracker());
while (_is_work) {
TAgentTaskRequest agent_task_req;
TCompactionReq compaction_req;
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/json_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class JsonScanner : public BaseScanner {
// Get next tuple
Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) override;

Status get_next(vectorized::Block* block, bool* eof) override {
return Status::NotSupported("Not Implemented get block");
}

// Close this scanner
void close() override;

Expand Down
25 changes: 16 additions & 9 deletions be/src/gutil/strings/numbers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,6 @@ char* SimpleItoaWithCommas(__int128_t i, char* buffer, int32_t buffer_size) {
return p;
}


// ----------------------------------------------------------------------
// ItoaKMGT()
// Description: converts an integer to a string
Expand Down Expand Up @@ -1480,7 +1479,7 @@ string ItoaKMGT(int64 i) {
}

string AccurateItoaKMGT(int64 i) {
const char *sign = "";
const char* sign = "";
if (i < 0) {
// We lose some accuracy if the caller passes LONG_LONG_MIN, but
// that's OK as this function is only for human readability
Expand All @@ -1489,31 +1488,39 @@ string AccurateItoaKMGT(int64 i) {
i = -i;
}

string ret = std::to_string(i) + " : " + StringPrintf("%s", sign);
string ret = std::to_string(i) + " = " + StringPrintf("%s", sign);
int64 val;
if ((val = (i >> 40)) > 1) {
ret += StringPrintf("%" PRId64 "%s", val, "T");
ret += StringPrintf("%" PRId64
"%s"
" + ",
val, "T");
i = i - (val << 40);
}
if ((val = (i >> 30)) > 1) {
ret += StringPrintf(" %" PRId64 "%s", val, "G");
ret += StringPrintf("%" PRId64
"%s"
" + ",
val, "G");
i = i - (val << 30);
}
if ((val = (i >> 20)) > 1) {
ret += StringPrintf(" %" PRId64 "%s", val, "M");
ret += StringPrintf("%" PRId64
"%s"
" + ",
val, "M");
i = i - (val << 20);
}
if ((val = (i >> 10)) > 1) {
ret += StringPrintf(" %" PRId64 "%s", val, "K");
ret += StringPrintf("%" PRId64 "%s", val, "K");
i = i - (val << 10);
} else {
ret += StringPrintf(" %" PRId64 "%s", i, "K");
ret += StringPrintf("%" PRId64 "%s", i, "K");
}

return ret;
}


// DEPRECATED(wadetregaskis).
// These are non-inline because some BUILD files turn on -Wformat-non-literal.

Expand Down
13 changes: 10 additions & 3 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@ using std::vector;
namespace doris {

Compaction::Compaction(TabletSharedPtr tablet, const std::string& label)
: _mem_tracker(MemTracker::create_tracker(-1, label, nullptr, MemTrackerLevel::INSTANCE)),
_tablet(tablet),
: _tablet(tablet),
_input_rowsets_size(0),
_input_row_num(0),
_state(CompactionState::INITED) {}
_state(CompactionState::INITED) {
#ifndef BE_TEST
_mem_tracker = MemTracker::create_tracker(-1, label,
StorageEngine::instance()->compaction_mem_tracker(),
MemTrackerLevel::INSTANCE);
#else
_mem_tracker = MemTracker::get_process_tracker();
#endif
}

Compaction::~Compaction() {}

Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class Compaction {
Status execute_compact();
virtual Status execute_compact_impl() = 0;

std::shared_ptr<MemTracker>& get_mem_tracker() { return _mem_tracker; }

protected:
virtual Status pick_rowsets_to_compact() = 0;
virtual std::string compaction_name() const = 0;
Expand Down
20 changes: 8 additions & 12 deletions be/src/olap/lru_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ void LRUCache::_evict_one_entry(LRUHandle* e) {

Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority) {
CachePriority priority, MemTracker* tracker) {
size_t handle_size = sizeof(LRUHandle) - 1 + key.size();
LRUHandle* e = reinterpret_cast<LRUHandle*>(malloc(handle_size));
e->value = value;
Expand All @@ -296,7 +296,12 @@ Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value,
e->next = e->prev = nullptr;
e->in_cache = true;
e->priority = priority;
e->mem_tracker = tracker;
memcpy(e->key_data, key.data(), key.size());
// The memory of the parameter value should be recorded in the tls mem tracker,
// transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker.
if (tracker)
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(tracker, e->total_size);
LRUHandle* to_remove_head = nullptr;
{
std::lock_guard<std::mutex> l(_mutex);
Expand Down Expand Up @@ -433,7 +438,6 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
: _name(name),
_last_id(1),
_mem_tracker(MemTracker::create_tracker(-1, name, nullptr, MemTrackerLevel::OVERVIEW)) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(_mem_tracker);
const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards;
for (int s = 0; s < kNumShards; s++) {
_shards[s] = new LRUCache(type);
Expand All @@ -452,7 +456,6 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
}

ShardedLRUCache::~ShardedLRUCache() {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
for (int s = 0; s < kNumShards; s++) {
delete _shards[s];
}
Expand All @@ -463,12 +466,9 @@ ShardedLRUCache::~ShardedLRUCache() {
Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority) {
// The memory of the parameter value should be recorded in the tls mem tracker,
// transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker.
tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), charge);
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const uint32_t hash = _hash_slice(key);
return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority);
return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority,
_mem_tracker.get());
}

Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) {
Expand All @@ -477,13 +477,11 @@ Cache::Handle* ShardedLRUCache::lookup(const CacheKey& key) {
}

void ShardedLRUCache::release(Handle* handle) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
_shards[_shard(h->hash)]->release(handle);
}

void ShardedLRUCache::erase(const CacheKey& key) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
const uint32_t hash = _hash_slice(key);
_shards[_shard(hash)]->erase(key, hash);
}
Expand All @@ -502,7 +500,6 @@ uint64_t ShardedLRUCache::new_id() {
}

int64_t ShardedLRUCache::prune() {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t num_prune = 0;
for (int s = 0; s < kNumShards; s++) {
num_prune += _shards[s]->prune();
Expand All @@ -511,7 +508,6 @@ int64_t ShardedLRUCache::prune() {
}

int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) {
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t num_prune = 0;
for (int s = 0; s < kNumShards; s++) {
num_prune += _shards[s]->prune_if(pred);
Expand Down
8 changes: 7 additions & 1 deletion be/src/olap/lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

#include "olap/olap_common.h"
#include "runtime/mem_tracker.h"
#include "runtime/thread_context.h"
#include "util/metrics.h"
#include "util/slice.h"

Expand Down Expand Up @@ -236,6 +237,7 @@ typedef struct LRUHandle {
uint32_t refs;
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
CachePriority priority = CachePriority::NORMAL;
MemTracker* mem_tracker;
char key_data[1]; // Beginning of key

CacheKey key() const {
Expand All @@ -250,6 +252,9 @@ typedef struct LRUHandle {

void free() {
(*deleter)(key(), value);
if (mem_tracker)
mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(),
total_size);
::free(this);
}

Expand Down Expand Up @@ -308,7 +313,8 @@ class LRUCache {
// Like Cache methods, but with an extra "hash" parameter.
Cache::Handle* insert(const CacheKey& key, uint32_t hash, void* value, size_t charge,
void (*deleter)(const CacheKey& key, void* value),
CachePriority priority = CachePriority::NORMAL);
CachePriority priority = CachePriority::NORMAL,
MemTracker* tracker = nullptr);
Cache::Handle* lookup(const CacheKey& key, uint32_t hash);
void release(Cache::Handle* handle);
void erase(const CacheKey& key, uint32_t hash);
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,8 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
Status st = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet, &permits);
if (st.ok() && permits > 0 && _permit_limiter.request(permits)) {
auto st = _compaction_thread_pool->submit_func([=]() {
SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::COMPACTION,
tablet->get_compaction_mem_tracker(compaction_type));
CgroupsMgr::apply_system_cgroup();
tablet->execute_compaction(compaction_type);
_permit_limiter.release(permits);
Expand Down
9 changes: 9 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,7 @@ Status Tablet::create_rowset_writer(const int64_t& txn_id, const PUniqueId& load
void Tablet::_init_context_common_fields(RowsetWriterContext& context) {
context.rowset_id = StorageEngine::instance()->next_rowset_id();
context.tablet_uid = tablet_uid();

context.tablet_id = tablet_id();
context.partition_id = partition_id();
context.tablet_schema_hash = schema_hash();
Expand All @@ -1522,4 +1523,12 @@ Status Tablet::create_rowset(RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* r
return RowsetFactory::create_rowset(&tablet_schema(), tablet_path_desc(), rowset_meta, rowset);
}

std::shared_ptr<MemTracker>& Tablet::get_compaction_mem_tracker(CompactionType compaction_type) {
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
return _cumulative_compaction->get_mem_tracker();
} else {
return _base_compaction->get_mem_tracker();
}
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ class Tablet : public BaseTablet {
return _cumulative_compaction_policy;
}

std::shared_ptr<MemTracker>& get_compaction_mem_tracker(CompactionType compaction_type);

inline bool all_beta() const {
std::shared_lock rdlock(_meta_lock);
return _tablet_meta->all_beta();
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/load_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ bool LoadChannel::is_finished() {
}

Status LoadChannel::cancel() {
SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::lock_guard<std::mutex> l(_lock);
for (auto& it : _tablets_channels) {
it.second->cancel();
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ void MemTracker::init_virtual() {
MemTracker::~MemTracker() {
consume(_untracked_mem.exchange(0)); // before memory_leak_check
// TCMalloc hook will be triggered during destructor memtracker, may cause crash.
if (_label == "Process") GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER();
if (_label == "Process") STOP_THREAD_LOCAL_MEM_TRACKER(false);
if (!_virtual && config::memory_leak_detection) MemTracker::memory_leak_check(this);
if (!_virtual && parent()) {
// Do not call release on the parent tracker to avoid repeated releases.
Expand Down
Loading