diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index b9108998f22f2b..631c9bebed0f10 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -581,7 +581,7 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
         _add_batch_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
         _add_batch_closure->cntl.http_request().set_content_type("application/json");
         {
-            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
             _brpc_http_stub->tablet_writer_add_batch_by_http(&_add_batch_closure->cntl, NULL,
                                                              &_add_batch_closure->result,
                                                              _add_batch_closure);
@@ -589,7 +589,7 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
     } else {
         _add_batch_closure->cntl.http_request().Clear();
         {
-            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
             _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
                                            &_add_batch_closure->result, _add_batch_closure);
         }
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 3e8d66fdbd9396..92a3e3839aae08 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -96,7 +96,7 @@ class ReusableClosure final : public google::protobuf::Closure {
    ~ReusableClosure() override {
        // shouldn't delete when Run() is calling or going to be called, wait for current Run() done.
        join();
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        cntl.Reset();
    }
@@ -124,7 +124,7 @@ class ReusableClosure final : public google::protobuf::Closure {
    // plz follow this order: reset() -> set_in_flight() -> send brpc batch
    void reset() {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        cntl.Reset();
        cid = cntl.call_id();
    }
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 63da436d239079..0e1d737b1587b3 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -68,7 +68,7 @@ Status BaseCompaction::execute_compact_impl() {
        return Status::OLAPInternalError(OLAP_ERR_BE_CLONE_OCCURRED);
    }
-    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::COMPACTION);
+    SCOPED_ATTACH_TASK(_mem_tracker);
    // 2. do base compaction, merge rowsets
    int64_t permits = get_compaction_permits();
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 4461a240b5addc..4736454c09916c 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -70,7 +70,7 @@ Status CumulativeCompaction::execute_compact_impl() {
        return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED);
    }
-    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::COMPACTION);
+    SCOPED_ATTACH_TASK(_mem_tracker);
    // 3. do cumulative compaction, merge rowsets
    int64_t permits = get_compaction_permits();
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index e7ade2021455ab..b042edac976c7b 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -164,7 +164,7 @@ Status DeltaWriter::write(Tuple* tuple) {
        return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
    }
-    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
+    SCOPED_ATTACH_TASK(_mem_tracker);
    _mem_table->insert(tuple);
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 5eed903d296fc6..a999cdd6ef3d7c 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -54,7 +54,7 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t
          _rowset_ids(rowset_ids),
          _cur_max_version(cur_max_version) {
    _mem_tracker_hook->enable_reset_zero();
-    SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+    SCOPED_ATTACH_TASK(_mem_tracker_hook);
    _mem_tracker_manual = std::make_unique(
            fmt::format("MemTableManual:tabletId={}", std::to_string(tablet_id())));
    _buffer_mem_pool = std::make_unique(_mem_tracker_manual.get());
@@ -176,7 +176,7 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
 }
 
 void MemTable::insert(const vectorized::Block* input_block, const std::vector& row_idxs) {
-    SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+    SCOPED_ATTACH_TASK(_mem_tracker_hook);
    auto target_block = input_block->copy_block(_column_offset);
    if (_is_first_insertion) {
        _is_first_insertion = false;
@@ -392,7 +392,7 @@ void MemTable::_collect_vskiplist_results() {
 }
 
 void MemTable::shrink_memtable_by_agg() {
-    SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+    SCOPED_ATTACH_TASK(_mem_tracker_hook);
    if (keys_type() == KeysType::DUP_KEYS) {
        return;
    }
@@ -434,7 +434,7 @@ Status MemTable::_generate_delete_bitmap() {
 }
 
 Status MemTable::flush() {
-    SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+    SCOPED_ATTACH_TASK(_mem_tracker_hook);
    VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id()
                  << ", memsize: " << memory_usage() << ", rows: " << _rows;
    int64_t duration_ns = 0;
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 9d837b5560a6aa..afb4db9889fc99 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -54,7 +54,7 @@ class MemTable {
    size_t memory_usage() const { return _mem_tracker_manual->consumption(); }
 
    inline void insert(const Tuple* tuple) {
-        SCOPED_ATTACH_TASK(_mem_tracker_hook, ThreadContext::TaskType::LOAD);
+        SCOPED_ATTACH_TASK(_mem_tracker_hook);
        (this->*_insert_fn)(tuple);
    }
    // insert tuple from (row_pos) to (row_pos+num_rows)
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 845031c229393a..606cfddf6e9bb9 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -117,7 +117,7 @@ Status StorageEngine::start_bg_threads() {
        RETURN_IF_ERROR(Thread::create(
                "StorageEngine", "path_scan_thread",
                [this, data_dir]() {
-                    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
+                    SCOPED_ATTACH_TASK(_mem_tracker);
                    this->_path_scan_thread_callback(data_dir);
                },
                &path_scan_thread));
@@ -127,7 +127,7 @@ Status StorageEngine::start_bg_threads() {
        RETURN_IF_ERROR(Thread::create(
                "StorageEngine", "path_gc_thread",
                [this, data_dir]() {
-                    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
+                    SCOPED_ATTACH_TASK(_mem_tracker);
                    this->_path_gc_thread_callback(data_dir);
                },
                &path_gc_thread));
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 183e4fa5ba25b6..85e9ff5ed9300d 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -166,7 +166,7 @@ void StorageEngine::load_data_dirs(const std::vector& data_dirs) {
    std::vector threads;
    for (auto data_dir : data_dirs) {
        threads.emplace_back([this, data_dir] {
-            SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
+            SCOPED_ATTACH_TASK(_mem_tracker);
            auto res = data_dir->load();
            if (!res.ok()) {
                LOG(WARNING) << "io error when init load tables. res=" << res
@@ -212,7 +212,7 @@ Status StorageEngine::_init_store_map() {
                                 _tablet_manager.get(), _txn_manager.get());
        tmp_stores.emplace_back(store);
        threads.emplace_back([this, store, &error_msg_lock, &error_msg]() {
-            SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
+            SCOPED_ATTACH_TASK(_mem_tracker);
            auto st = store->init();
            if (!st.ok()) {
                {
diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp
index 8164049296a9ff..cfe923415bc127 100644
--- a/be/src/olap/task/engine_alter_tablet_task.cpp
+++ b/be/src/olap/task/engine_alter_tablet_task.cpp
@@ -34,7 +34,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
 }
 
 Status EngineAlterTabletTask::execute() {
-    SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
+    SCOPED_ATTACH_TASK(_mem_tracker);
    DorisMetrics::instance()->create_rollup_requests_total->increment(1);
 
    Status res = SchemaChangeHandler::process_alter_tablet_v2(_alter_tablet_req);
diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp
index 9b5605533922a9..755f6a3bbdc3d9 100644
@@ -58,7 +58,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vectortablet_manager()->register_clone_tablet(_clone_req.tablet_id);
    Status st = _do_clone();
    StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
diff --git a/be/src/runtime/buffer_control_block.cpp b/be/src/runtime/buffer_control_block.cpp
index 79496b449b6463..03bd6d346687b6 100644
--- a/be/src/runtime/buffer_control_block.cpp
+++ b/be/src/runtime/buffer_control_block.cpp
@@ -30,7 +30,7 @@ void GetResultBatchCtx::on_failure(const Status& status) {
    status.to_protobuf(result->mutable_status());
    {
        // call by result sink
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        done->Run();
    }
    delete this;
@@ -45,7 +45,7 @@ void GetResultBatchCtx::on_close(int64_t packet_seq, QueryStatistics* statistics
    result->set_packet_seq(packet_seq);
    result->set_eos(true);
    {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        done->Run();
    }
    delete this;
@@ -73,7 +73,7 @@ void GetResultBatchCtx::on_data(const std::unique_ptr& t_resul
    }
    st.to_protobuf(result->mutable_status());
    {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        done->Run();
    }
    delete this;
diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index c9533959f8c2f2..59d46102b3ef30 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -186,10 +186,7 @@ Status DataStreamRecvr::SenderQueue::get_batch(RowBatch** next_batch) {
    if (!_pending_closures.empty()) {
        auto closure_pair = _pending_closures.front();
-        {
-            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
-            closure_pair.first->Run();
-        }
+        closure_pair.first->Run();
        _pending_closures.pop_front();
 
        closure_pair.second.stop();
@@ -339,11 +336,8 @@ void DataStreamRecvr::SenderQueue::cancel() {
    {
        std::lock_guard l(_lock);
-        {
-            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
-            for (auto closure_pair : _pending_closures) {
-                closure_pair.first->Run();
-            }
+        for (auto closure_pair : _pending_closures) {
+            closure_pair.first->Run();
        }
        _pending_closures.clear();
    }
@@ -357,11 +351,8 @@ void DataStreamRecvr::SenderQueue::close() {
        std::lock_guard l(_lock);
        _is_cancelled = true;
-        {
-            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
-            for (auto closure_pair : _pending_closures) {
-                closure_pair.first->Run();
-            }
+        for (auto closure_pair : _pending_closures) {
+            closure_pair.first->Run();
        }
        _pending_closures.clear();
    }
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index cb024917016c07..df1495b7501a80 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -138,7 +138,7 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) {
        _closure->ref();
    } else {
        RETURN_IF_ERROR(_wait_last_brpc());
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        _closure->cntl.Reset();
    }
    VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id
@@ -160,7 +160,6 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) {
    if (_parent->_transfer_large_data_by_brpc && _brpc_request.has_row_batch() &&
        _brpc_request.row_batch().has_tuple_data() &&
        _brpc_request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
        Status st = request_embed_attachment_contain_tuple>(
                &_brpc_request, _closure);
@@ -174,11 +173,17 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) {
                brpc_url + "/PInternalServiceImpl/transmit_data_by_http";
        _closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
        _closure->cntl.http_request().set_content_type("application/json");
-        _brpc_http_stub->transmit_data_by_http(&_closure->cntl, NULL, &_closure->result, _closure);
+        {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+            _brpc_http_stub->transmit_data_by_http(&_closure->cntl, NULL, &_closure->result,
+                                                   _closure);
+        }
    } else {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
        _closure->cntl.http_request().Clear();
-        _brpc_stub->transmit_data(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
+        {
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+            _brpc_stub->transmit_data(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
+        }
    }
 
    if (batch != nullptr) {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 81af4a18f354c9..615741a6e7ebaf 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -120,12 +120,10 @@ class ExecEnv {
    std::shared_ptr process_mem_tracker() { return _process_mem_tracker; }
    void set_global_mem_tracker(const std::shared_ptr& process_tracker,
                                const std::shared_ptr& orphan_tracker,
-                                const std::shared_ptr& nursery_mem_tracker,
                                const std::shared_ptr& bthread_mem_tracker) {
        _process_mem_tracker = process_tracker;
        _orphan_mem_tracker = orphan_tracker;
        _orphan_mem_tracker_raw = orphan_tracker.get();
-        _nursery_mem_tracker = nursery_mem_tracker;
        _bthread_mem_tracker = bthread_mem_tracker;
    }
    std::shared_ptr allocator_cache_mem_tracker() {
@@ -133,7 +131,6 @@ class ExecEnv {
    }
    std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; }
    MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; }
-    std::shared_ptr nursery_mem_tracker() { return _nursery_mem_tracker; }
    std::shared_ptr bthread_mem_tracker() { return _bthread_mem_tracker; }
    std::shared_ptr query_pool_mem_tracker() { return _query_pool_mem_tracker; }
    std::shared_ptr load_pool_mem_tracker() { return _load_pool_mem_tracker; }
@@ -229,8 +226,6 @@ class ExecEnv {
    // and the consumption of the orphan mem tracker is close to 0, but greater than 0.
    std::shared_ptr _orphan_mem_tracker;
    MemTrackerLimiter* _orphan_mem_tracker_raw;
-    // Parent is orphan, Nursery of orphan memory after manually switching thread mem tracker
-    std::shared_ptr _nursery_mem_tracker;
    // Parent is orphan, bthread default mem tracker
    std::shared_ptr _bthread_mem_tracker;
    // The ancestor for all querys tracker.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index dfdc868e67adb0..bf0f66d3442aae 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -202,7 +202,6 @@ Status ExecEnv::_init_mem_tracker() {
            std::make_shared(global_memory_limit_bytes, "Process");
    _orphan_mem_tracker = std::make_shared(-1, "Orphan", _process_mem_tracker);
    _orphan_mem_tracker_raw = _orphan_mem_tracker.get();
-    _nursery_mem_tracker = std::make_shared(-1, "Nursery", _orphan_mem_tracker);
    _bthread_mem_tracker = std::make_shared(-1, "Bthread", _orphan_mem_tracker);
    thread_context()->_thread_mem_tracker_mgr->init();
    thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index b19f9181b0120b..379d4582520ed2 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -29,25 +29,25 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(
        const std::shared_ptr& mem_tracker) {
    DCHECK(mem_tracker);
    flush_untracked_mem();
-    _task_id_stack.push_back(task_id);
-    _fragment_instance_id_stack.push_back(fragment_instance_id);
-    _limiter_tracker_stack.push_back(mem_tracker);
+    _task_id = task_id;
+    _fragment_instance_id = fragment_instance_id;
+    _limiter_tracker = mem_tracker;
    _limiter_tracker_raw = mem_tracker.get();
 }
 
-void ThreadMemTrackerMgr::detach_limiter_tracker() {
-    DCHECK(!_limiter_tracker_stack.empty());
+void ThreadMemTrackerMgr::detach_limiter_tracker(
+        const std::shared_ptr& old_mem_tracker) {
    flush_untracked_mem();
-    _task_id_stack.pop_back();
-    _fragment_instance_id_stack.pop_back();
-    _limiter_tracker_stack.pop_back();
-    _limiter_tracker_raw = _limiter_tracker_stack.back().get();
+    _task_id = "";
+    _fragment_instance_id = TUniqueId();
+    _limiter_tracker = old_mem_tracker;
+    _limiter_tracker_raw = old_mem_tracker.get();
 }
 
 void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) {
-    if (_fragment_instance_id_stack.back() != TUniqueId()) {
+    if (_fragment_instance_id != TUniqueId()) {
        ExecEnv::GetInstance()->fragment_mgr()->cancel(
-                _fragment_instance_id_stack.back(), PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
+                _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
                cancel_details);
    }
 }
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 5289db99e384f4..afae746e440218 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -47,11 +47,6 @@ class ThreadMemTrackerMgr {
        // if _init == false, exec env is not initialized when init(). and never consumed mem tracker once.
        if (_init) {
            flush_untracked_mem();
-            if (bthread_self() == 0) {
-                DCHECK(_consumer_tracker_stack.empty());
-                DCHECK(_limiter_tracker_stack.size() == 1)
-                        << ", limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
-            }
        }
    }
 
@@ -62,19 +57,13 @@ class ThreadMemTrackerMgr {
        }
    }
 
-    // After thread initialization, calling `init` again must call `clear_untracked_mems` first
-    // to avoid memory tracking loss.
    void init();
-    void init_impl();
-    void clear();
 
    // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker
    void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id,
                                const std::shared_ptr& mem_tracker);
-    void detach_limiter_tracker();
-    // Usually there are only two layers, the first is the default trackerOrphan;
-    // the second is the query tracker or bthread tracker.
-    int64_t get_attach_layers() { return _limiter_tracker_stack.size(); }
+    void detach_limiter_tracker(const std::shared_ptr& old_mem_tracker =
+                                        ExecEnv::GetInstance()->orphan_mem_tracker());
 
    // Must be fast enough! Thread update_tracker may be called very frequently.
    // So for performance, add tracker as early as possible, and then call update_tracker.
@@ -105,14 +94,14 @@ class ThreadMemTrackerMgr {
    template 
    void flush_untracked_mem();
 
-    bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); }
+    bool is_attach_query() { return _fragment_instance_id != TUniqueId(); }
 
    std::shared_ptr limiter_mem_tracker() {
-        if (!_init) init();
-        return _limiter_tracker_stack.back();
+        if (!_init) init(); // ExecEnv not initialized when thread is created.
+        return _limiter_tracker;
    }
    MemTrackerLimiter* limiter_mem_tracker_raw() {
-        if (!_init) init();
+        if (!_init) init(); // ExecEnv not initialized when thread is created.
        return _limiter_tracker_raw;
    }
@@ -129,8 +118,8 @@
        return fmt::format(
                "ThreadMemTrackerMgr debug, _untracked_mem:{}, _task_id:{}, "
                "_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>",
-                std::to_string(_untracked_mem), _task_id_stack.back(),
-                _limiter_tracker_raw->log_usage(1), fmt::to_string(consumer_tracker_buf));
+                std::to_string(_untracked_mem), _task_id, _limiter_tracker_raw->log_usage(1),
+                fmt::to_string(consumer_tracker_buf));
    }
 
 private:
@@ -152,8 +141,7 @@
    std::string failed_msg = std::string();
 
-    // _limiter_tracker_stack[0] = orphan_mem_tracker
-    std::vector> _limiter_tracker_stack;
+    std::shared_ptr _limiter_tracker;
    MemTrackerLimiter* _limiter_tracker_raw = nullptr;
    std::vector _consumer_tracker_stack;
@@ -162,36 +150,20 @@
    // If there is a memory new/delete operation in the consume method, it may enter infinite recursion.
    bool _stop_consume = false;
    bool _check_attach = true;
-    std::vector _task_id_stack;
-    std::vector _fragment_instance_id_stack;
+    std::string _task_id = "";
+    TUniqueId _fragment_instance_id = TUniqueId();
    ExceedCallBack _cb_func = nullptr;
 };
 
 inline void ThreadMemTrackerMgr::init() {
-    DCHECK(_limiter_tracker_stack.size() == 0);
+    DCHECK(_limiter_tracker == nullptr);
    DCHECK(_limiter_tracker_raw == nullptr);
    DCHECK(_consumer_tracker_stack.empty());
-    init_impl();
-}
-
-inline void ThreadMemTrackerMgr::init_impl() {
-    _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
+    _limiter_tracker = ExecEnv::GetInstance()->orphan_mem_tracker();
    _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
-    _task_id_stack.push_back("");
-    _fragment_instance_id_stack.push_back(TUniqueId());
-    _check_limit = true;
    _init = true;
 }
 
-inline void ThreadMemTrackerMgr::clear() {
-    flush_untracked_mem();
-    std::vector>().swap(_limiter_tracker_stack);
-    std::vector().swap(_consumer_tracker_stack);
-    std::vector().swap(_task_id_stack);
-    std::vector().swap(_fragment_instance_id_stack);
-    init_impl();
-}
-
 inline void ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) {
    DCHECK(tracker) << print_debug_string();
    DCHECK(!std::count(_consumer_tracker_stack.begin(), _consumer_tracker_stack.end(), tracker))
@@ -228,7 +200,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
    // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
    // the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop.
    _stop_consume = true;
-    if (!_init) init();
+    if (!_init) init(); // ExecEnv not initialized when thread is created.
    DCHECK(_limiter_tracker_raw);
    old_untracked_mem = _untracked_mem;
    if (_count_scope_mem) _scope_mem += _untracked_mem;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 3766431f50f402..97dd2730720d87 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -243,8 +243,7 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
        _exec_env->task_pool_mem_tracker_registry()->register_load_scanner_mem_tracker(
                print_id(query_id));
    } else {
-        DCHECK(false);
-        _query_mem_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker();
+        CHECK(false) << "query_typ: " << query_type();
    }
 
    _query_mem_tracker->enable_reset_zero();
diff --git a/be/src/runtime/sorted_run_merger.cc b/be/src/runtime/sorted_run_merger.cc
index 28d347462f8d08..32bdad9a6d97a8 100644
--- a/be/src/runtime/sorted_run_merger.cc
+++ b/be/src/runtime/sorted_run_merger.cc
@@ -183,7 +183,7 @@ class SortedRunMerger::ParallelBatchedRowSupplier : public SortedRunMerger::Batc
    std::condition_variable _batch_prepared_cv;
 
    void process_sorted_run_task(const std::shared_ptr& mem_tracker) {
-        SCOPED_ATTACH_TASK(mem_tracker, ThreadContext::TaskType::QUERY);
+        SCOPED_ATTACH_TASK(mem_tracker);
        std::unique_lock lock(_mutex);
        while (true) {
            _batch_prepared_cv.wait(lock, [this]() { return !_backup_ready.load(); });
diff --git a/be/src/runtime/stream_load/stream_load_pipe.h b/be/src/runtime/stream_load/stream_load_pipe.h
index 106426653ea673..fac16b81ff7ab5 100644
--- a/be/src/runtime/stream_load/stream_load_pipe.h
+++ b/be/src/runtime/stream_load/stream_load_pipe.h
@@ -47,7 +47,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
              _use_proto(use_proto) {}
 
    virtual ~StreamLoadPipe() {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->nursery_mem_tracker());
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        while (!_buf_queue.empty()) _buf_queue.pop_front();
    }
 
@@ -119,7 +119,7 @@ class StreamLoadPipe : public MessageBodySink, public FileReader {
    }
 
    Status read(uint8_t* data, int64_t data_size, int64_t* bytes_read, bool* eof) override {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->nursery_mem_tracker());
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        *bytes_read = 0;
        while (*bytes_read < data_size) {
            std::unique_lock l(_lock);
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index 1e6be4a0d9f20c..c4f60f8861468d 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -39,10 +39,9 @@ ScopeMemCount::~ScopeMemCount() {
 }
 
 AttachTask::AttachTask(const std::shared_ptr& mem_tracker,
-                       const ThreadContext::TaskType& type, const std::string& task_id,
-                       const TUniqueId& fragment_instance_id) {
+                       const std::string& task_id, const TUniqueId& fragment_instance_id) {
    DCHECK(mem_tracker);
-    thread_context()->attach_task(type, task_id, fragment_instance_id, mem_tracker);
+    thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker);
 }
 
 AttachTask::AttachTask(RuntimeState* runtime_state) {
@@ -51,8 +50,7 @@ AttachTask::AttachTask(RuntimeState* runtime_state) {
    DCHECK(runtime_state->fragment_instance_id() != TUniqueId());
 #endif // BE_TEST
    DCHECK(runtime_state->instance_mem_tracker());
-    thread_context()->attach_task(ThreadContext::query_to_task_type(runtime_state->query_type()),
-                                  print_id(runtime_state->query_id()),
+    thread_context()->attach_task(print_id(runtime_state->query_id()),
                                  runtime_state->fragment_instance_id(),
                                  runtime_state->instance_mem_tracker());
 }
@@ -67,12 +65,13 @@ AttachTask::~AttachTask() {
 
 SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter(
        const std::shared_ptr& mem_tracker_limiter) {
    DCHECK(mem_tracker_limiter);
+    _old_mem_tracker = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker();
    thread_context()->_thread_mem_tracker_mgr->attach_limiter_tracker("", TUniqueId(),
                                                                      mem_tracker_limiter);
 }
 
 SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() {
-    thread_context()->_thread_mem_tracker_mgr->detach_limiter_tracker();
+    thread_context()->_thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
 }
 
 AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) {
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 20a0b074c1287b..5c1216eae63ab6 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -140,82 +140,40 @@ inline thread_local bthread_t bthread_id;
 //
 // There may be other optional info to be added later.
 class ThreadContext {
-public:
-    enum TaskType {
-        UNKNOWN = 0,
-        QUERY = 1,
-        LOAD = 2,
-        COMPACTION = 3,
-        STORAGE = 4,
-        BRPC = 5
-        // to be added ...
-    };
-    inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD",
-                                                     "COMPACTION", "STORAGE", "BRPC"};
-
 public:
    ThreadContext() {
        _thread_mem_tracker_mgr.reset(new ThreadMemTrackerMgr());
        init();
    }
 
-    ~ThreadContext() {
-        // Restore to the memory state before init=true to ensure accurate overall memory statistics.
-        // Thereby ensuring that the memory alloc size is not tracked during the initialization of the
-        // ThreadContext before `init = true in ThreadContextPtr()`,
-        // Equal to the size of the memory release that is not tracked during the destruction of the
-        // ThreadContext after `init = false in ~ThreadContextPtr()`,
-        if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->clear();
-        thread_context_ptr.init = false;
-    }
+    ~ThreadContext() { thread_context_ptr.init = false; }
 
    void init() {
-        _type = TaskType::UNKNOWN;
        if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->init();
        _thread_id = get_thread_id();
    }
 
-    void attach_task(const TaskType& type, const std::string& task_id,
-                     const TUniqueId& fragment_instance_id,
+    void attach_task(const std::string& task_id, const TUniqueId& fragment_instance_id,
                     const std::shared_ptr& mem_tracker) {
 #ifndef BE_TEST
        // will only attach_task at the beginning of the thread function, there should be no duplicate attach_task.
-        DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) &&
-               type != TaskType::UNKNOWN && _task_id == "" && mem_tracker != nullptr)
-                << ",new tracker label: " << mem_tracker->label() << ",old tracker label: "
-                << _thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label();
+        DCHECK(mem_tracker != nullptr);
 #endif
-        _type = type;
        _task_id = task_id;
        _fragment_instance_id = fragment_instance_id;
        _thread_mem_tracker_mgr->attach_limiter_tracker(task_id, fragment_instance_id, mem_tracker);
    }
 
    void detach_task() {
-        _type = TaskType::UNKNOWN;
        _task_id = "";
        _fragment_instance_id = TUniqueId();
        _thread_mem_tracker_mgr->detach_limiter_tracker();
    }
 
-    const TaskType& type() const { return _type; }
-    const void set_type(const TaskType& type) { _type = type; }
    const std::string& task_id() const { return _task_id; }
    const std::string& thread_id_str() const { return _thread_id; }
    const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
 
-    static TaskType query_to_task_type(const TQueryType::type& query_type) {
-        switch (query_type) {
-        case TQueryType::SELECT:
-            return TaskType::QUERY;
-        case TQueryType::LOAD:
-            return TaskType::LOAD;
-        default:
-            DCHECK(false);
-            return TaskType::UNKNOWN;
-        }
-    }
-
    std::string get_thread_id() {
        std::stringstream ss;
        ss << std::this_thread::get_id();
@@ -232,12 +190,13 @@ class ThreadContext {
 
 private:
    std::string _thread_id;
-    TaskType _type;
    std::string _task_id;
    TUniqueId _fragment_instance_id;
 };
 
-static void attach_bthread() {
+// Cache the pointer of bthread local in pthead local,
+// Avoid calling bthread_getspecific frequently to get bthread local, which has performance problems.
+static void pthread_attach_bthread() {
    bthread_id = bthread_self();
    bthread_context = static_cast(bthread_getspecific(btls_key));
    if (bthread_context == nullptr) {
@@ -252,7 +211,7 @@
        std::shared_ptr btls_tracker =
                std::make_shared(-1, "Bthread:id=" + std::to_string(bthread_id),
                                 ExecEnv::GetInstance()->bthread_mem_tracker());
-        bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), btls_tracker);
+        bthread_context->attach_task("", TUniqueId(), btls_tracker);
        // set the data so that next time bthread_getspecific in the thread returns the data.
        CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
    } else {
@@ -260,7 +219,6 @@
        // 1. A new bthread starts, but get a reuses btls.
        // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment.
        // So tracker call reset 0 like reuses btls.
-        DCHECK(bthread_context->_thread_mem_tracker_mgr->get_attach_layers() == 2);
        bthread_context->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->reset_zero();
    }
 }
@@ -268,9 +226,9 @@
 static ThreadContext* thread_context() {
    if (bthread_self() != 0) {
        if (bthread_self() != bthread_id) {
-            // A new bthread starts or pthread switch occurs.
+            // A new bthread starts or pthread switch occurs, during this period, stop the use of thread_context.
            thread_context_ptr.init = false;
-            attach_bthread();
+            pthread_attach_bthread();
            thread_context_ptr.init = true;
        }
        return bthread_context;
@@ -292,7 +250,6 @@ class ScopeMemCount {
 class AttachTask {
 public:
    explicit AttachTask(const std::shared_ptr& mem_tracker,
-                        const ThreadContext::TaskType& type = ThreadContext::TaskType::UNKNOWN,
                        const std::string& task_id = "",
                        const TUniqueId& fragment_instance_id = TUniqueId());
 
@@ -307,6 +264,9 @@ class SwitchThreadMemTrackerLimiter {
            const std::shared_ptr& mem_tracker_limiter);
 
    ~SwitchThreadMemTrackerLimiter();
+
+private:
+    std::shared_ptr _old_mem_tracker;
 };
 
 class AddThreadMemTrackerConsumer {
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 356627b43cf92e..f5afe34d8efbee 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -62,12 +62,14 @@ static void thread_context_deleter(void* d) {
 }
 
 template 
-class NewHttpClosure : public ::google::protobuf::Closure {
+class NewClosure : public ::google::protobuf::Closure {
 public:
-    NewHttpClosure(T* request, google::protobuf::Closure* done) : _request(request), _done(done) {}
-    ~NewHttpClosure() {}
+    NewClosure(google::protobuf::Closure* done) : _done(done) {}
+    NewClosure(T* request, google::protobuf::Closure* done) : _request(request), _done(done) {}
+    ~NewClosure() {}
 
    void Run() {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        if (_request != nullptr) {
            delete _request;
            _request = nullptr;
@@ -102,22 +104,22 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_b
                                         PTransmitDataResult* response,
                                         google::protobuf::Closure* done) {
    // TODO(zxy) delete in 1.2 version
+    google::protobuf::Closure* new_done = new NewClosure(done);
    brpc::Controller* cntl = static_cast(cntl_base);
    attachment_transfer_request_row_batch(request, cntl);
 
-    _transmit_data(cntl_base, request, response, done, Status::OK());
+    _transmit_data(cntl_base, request, response, new_done, Status::OK());
 }
 
 void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* cntl_base,
                                                 const PEmptyRequest* request,
                                                 PTransmitDataResult* response,
                                                 google::protobuf::Closure* done) {
-    PTransmitDataParams* request_raw = new PTransmitDataParams();
-    google::protobuf::Closure* done_raw =
-            new NewHttpClosure(request_raw, done);
+    PTransmitDataParams* new_request = new PTransmitDataParams();
+    google::protobuf::Closure* new_done = new NewClosure(new_request, done);
    brpc::Controller* cntl = static_cast(cntl_base);
-    Status st = attachment_extract_request_contain_tuple(request_raw, cntl);
-    _transmit_data(cntl_base, request_raw, response, done_raw, st);
+    Status st = attachment_extract_request_contain_tuple(new_request, cntl);
+    _transmit_data(cntl_base, new_request, response, new_done, st);
 }
 
 void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_base,
@@ -146,7 +148,7 @@ void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_
    Status st;
    st.to_protobuf(response->mutable_status());
    if (extract_st.ok()) {
-        SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, query_id, finst_id);
+        SCOPED_ATTACH_TASK(transmit_tracker, query_id, finst_id);
        st = _exec_env->stream_mgr()->transmit_data(request, &done);
        if (!st.ok()) {
            LOG(WARNING) << "transmit_data failed, message=" << st.get_error_msg()
@@ -219,23 +221,24 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
                                                   PTabletWriterAddBlockResult* response,
                                                   google::protobuf::Closure* done) {
    // TODO(zxy) delete in 1.2 version
+    google::protobuf::Closure* new_done = new NewClosure(done);
    brpc::Controller* cntl = static_cast(cntl_base);
    attachment_transfer_request_block(request, cntl);
 
-    _tablet_writer_add_block(cntl_base, request, response, done);
+    _tablet_writer_add_block(cntl_base, request, response, new_done);
 }
 
 void PInternalServiceImpl::tablet_writer_add_block_by_http(
        google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
        PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) {
-    PTabletWriterAddBlockRequest* request_raw = new PTabletWriterAddBlockRequest();
-    google::protobuf::Closure* done_raw =
-            new NewHttpClosure(request_raw, done);
+    PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest();
+    google::protobuf::Closure* new_done =
+            new NewClosure(new_request, done);
    brpc::Controller* cntl = static_cast(cntl_base);
-    Status st = attachment_extract_request_contain_block(request_raw,
+    Status st = attachment_extract_request_contain_block(new_request,
                                                         cntl);
    if (st.ok()) {
-        _tablet_writer_add_block(cntl_base, request_raw, response, done_raw);
+        _tablet_writer_add_block(cntl_base, new_request, response, new_done);
    } else {
        st.to_protobuf(response->mutable_status());
    }
@@ -274,20 +277,21 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll
                                                   const PTabletWriterAddBatchRequest* request,
                                                   PTabletWriterAddBatchResult* response,
                                                   google::protobuf::Closure* done) {
-    _tablet_writer_add_batch(cntl_base, request, response, done);
+    google::protobuf::Closure* new_done = new NewClosure(done);
+    _tablet_writer_add_batch(cntl_base, request, response, new_done);
 }
 
 void PInternalServiceImpl::tablet_writer_add_batch_by_http(
        google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
        PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) {
-    PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest();
-    google::protobuf::Closure* done_raw =
-            new NewHttpClosure(request_raw, done);
+    PTabletWriterAddBatchRequest* new_request = new PTabletWriterAddBatchRequest();
+    google::protobuf::Closure* new_done =
+            new NewClosure(new_request, done);
    brpc::Controller* cntl = static_cast(cntl_base);
-    Status st = attachment_extract_request_contain_tuple(request_raw,
+    Status st = attachment_extract_request_contain_tuple(new_request,
                                                         cntl);
    if (st.ok()) {
-        _tablet_writer_add_batch(cntl_base, request_raw, response, done_raw);
+        _tablet_writer_add_batch(cntl_base, new_request, response, new_done);
    } else {
        st.to_protobuf(response->mutable_status());
    }
@@ -611,22 +615,22 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cntl_
                                          PTransmitDataResult* response,
                                          google::protobuf::Closure* done) {
    // TODO(zxy) delete in 1.2 version
+    google::protobuf::Closure* new_done = new NewClosure(done);
    brpc::Controller* cntl = static_cast(cntl_base);
    attachment_transfer_request_block(request, cntl);
 
-    _transmit_block(cntl_base, request, response, done, Status::OK());
+    _transmit_block(cntl_base, request, response, new_done, Status::OK());
 }
 
 void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* cntl_base,
                                                  const PEmptyRequest* request,
                                                  PTransmitDataResult* response,
                                                  google::protobuf::Closure* done) {
-    PTransmitDataParams* request_raw = new PTransmitDataParams();
-    google::protobuf::Closure* done_raw =
-            new NewHttpClosure(request_raw, done);
+    PTransmitDataParams* new_request = new PTransmitDataParams();
+    google::protobuf::Closure* new_done = new NewClosure(new_request, done);
    brpc::Controller* cntl = static_cast(cntl_base);
-    Status st = attachment_extract_request_contain_block(request_raw, cntl);
-    _transmit_block(cntl_base, request_raw, response, done_raw, st);
+    Status st = attachment_extract_request_contain_block(new_request, cntl);
+    _transmit_block(cntl_base, new_request, response, new_done, st);
 }
 
 void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl_base,
@@ -656,7 +660,7 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl
    Status st;
    st.to_protobuf(response->mutable_status());
    if (extract_st.ok()) {
-        SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, query_id, finst_id);
+        SCOPED_ATTACH_TASK(transmit_tracker, query_id, finst_id);
        st = _exec_env->vstream_mgr()->transmit_block(request, &done);
        if (!st.ok()) {
            LOG(WARNING) << "transmit_block failed, message=" << st.get_error_msg()
diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h
index b91d6225ed6515..c278dae9a3872b 100644
--- a/be/src/util/ref_count_closure.h
+++ b/be/src/util/ref_count_closure.h
@@ -37,6 +37,7 @@ class RefCountClosure : public google::protobuf::Closure {
    bool unref() { return _refs.fetch_sub(1) == 1; }
 
    void Run() override {
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        if (unref()) {
            delete this;
        }
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index b561fca483527e..15e5b583f25fc1 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -186,7 +186,6 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
    INIT_AND_SCOPE_REENTRANT_SPAN_IF(ctx->state()->enable_profile(), ctx->state()->get_tracer(),
                                     ctx->scan_span(), "VScanner::scan");
    SCOPED_ATTACH_TASK(scanner->runtime_state()->scanner_mem_tracker(),
-                       ThreadContext::query_to_task_type(scanner->runtime_state()->query_type()),
                       print_id(scanner->runtime_state()->query_id()),
                       scanner->runtime_state()->fragment_instance_id());
    Thread::set_self_name("_scanner_scan");
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 2fbd9bb99d2003..60b047f40f2491 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -394,9 +394,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
 }
 
 void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
-    SCOPED_ATTACH_TASK(_runtime_state->scanner_mem_tracker(),
-                       ThreadContext::query_to_task_type(_runtime_state->query_type()),
-                       print_id(_runtime_state->query_id()),
+    SCOPED_ATTACH_TASK(_runtime_state->scanner_mem_tracker(), print_id(_runtime_state->query_id()),
                       _runtime_state->fragment_instance_id());
    Thread::set_self_name("volap_scanner");
    int64_t wait_time = scanner->update_wait_worker_timer();
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 71e2d36ab30517..0e40404733243f 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -77,10 +77,7 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
    if (!_pending_closures.empty()) {
        auto closure_pair = _pending_closures.front();
-        {
-            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
-            closure_pair.first->Run();
-        }
+        closure_pair.first->Run();
        _pending_closures.pop_front();
 
        closure_pair.second.stop();
@@ -222,11 +219,8 @@ void VDataStreamRecvr::SenderQueue::cancel() {
    {
        std::lock_guard l(_lock);
-        {
-            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
-            for (auto closure_pair : _pending_closures) {
-                closure_pair.first->Run();
-            }
+        for (auto closure_pair : _pending_closures) {
+            closure_pair.first->Run();
        }
        _pending_closures.clear();
    }
@@ -240,11 +234,8 @@ void VDataStreamRecvr::SenderQueue::close() {
        std::lock_guard l(_lock);
        _is_cancelled = true;
-        {
-            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
-            for (auto closure_pair : _pending_closures) {
-                closure_pair.first->Run();
-            }
+        for (auto closure_pair : _pending_closures) {
+            closure_pair.first->Run();
        }
        _pending_closures.clear();
    }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index e92929d0d93384..6d8c097ce02fbd 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -139,7 +139,7 @@ Status VDataStreamSender::Channel::send_block(PBlock* block, bool eos) {
        _closure->ref();
    } else {
        RETURN_IF_ERROR(_wait_last_brpc());
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        _closure->cntl.Reset();
    }
    VLOG_ROW << "Channel::send_batch() instance_id=" << _fragment_instance_id
@@ -162,7 +162,7 @@ Status VDataStreamSender::Channel::send_block(PBlock* block, bool eos) {
    if (_parent->_transfer_large_data_by_brpc && _brpc_request.has_block() &&
        _brpc_request.block().has_column_values() &&
        _brpc_request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        Status st = request_embed_attachment_contain_block>(
                &_brpc_request, _closure);
@@ -179,7 +179,7 @@ Status VDataStreamSender::Channel::send_block(PBlock* block, bool eos) {
        _brpc_http_stub->transmit_block_by_http(&_closure->cntl, nullptr, &_closure->result,
                                                _closure);
    } else {
-        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
        _closure->cntl.http_request().Clear();
        _brpc_stub->transmit_block(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
    }
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 3f4dfb5e725ab2..8f8f813b499930 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -354,7 +354,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
        _add_block_closure->cntl.http_request().set_content_type("application/json");
        {
-            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
            _brpc_http_stub->tablet_writer_add_block_by_http(&_add_block_closure->cntl, NULL,
                                                             &_add_block_closure->result,
                                                             _add_block_closure);
@@ -362,7 +362,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
    } else {
        _add_block_closure->cntl.http_request().Clear();
        {
-            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->bthread_mem_tracker());
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
            _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request,
                                           &_add_block_closure->result, _add_block_closure);
        }
diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp
index a1e53f7ed0930b..ffe239cc0f9368 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -32,12 +32,10 @@ int main(int argc, char** argv) {
            std::make_shared(-1, "Process");
    std::shared_ptr orphan_mem_tracker =
            std::make_shared(-1, "Orphan", process_mem_tracker);
-    std::shared_ptr nursery_mem_tracker =
-            std::make_shared(-1, "Nursery", orphan_mem_tracker);
    std::shared_ptr bthread_mem_tracker =
            std::make_shared(-1, "Bthread", orphan_mem_tracker);
    doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker,
-                                                          nursery_mem_tracker, bthread_mem_tracker);
+                                                          bthread_mem_tracker);
    doris::thread_context()->_thread_mem_tracker_mgr->init();
    doris::TabletSchemaCache::create_global_schema_cache();
    doris::StoragePageCache::create_global_cache(1 << 30, 10);