From d926041cc3c03160545785c729c1143e05115885 Mon Sep 17 00:00:00 2001 From: zhiqiang Date: Tue, 10 Sep 2024 11:00:56 +0800 Subject: [PATCH] [fix](scanner) Fix deadlock when scanner submit failed (#40495) We have dead lock when submit scanner to scheduler failed. pstack looks like ```txt Thread 2012 (Thread 0x7f87363fb700 (LWP 4179707) "Pipe_normal [wo"): #0 0x00007f8b8f3dc82d in __lll_lock_wait () from /lib64/libpthread.so.0 #1 0x00007f8b8f3d5ad9 in pthread_mutex_lock () from /lib64/libpthread.so.0 #2 0x000055b20f333e7a in __gthread_mutex_lock (__mutex=0x7f8733d960a8) at /mnt/disk1/hezhiqiang/toolchains/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/x86_64-linux-gnu/c++/11/bits/gthr-default .h:749 #3 std::mutex::lock (this=0x7f8733d960a8) at /mnt/disk1/hezhiqiang/toolchains/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_mutex.h:100 #4 std::lock_guard::lock_guard (__m=..., this=) at /mnt/disk1/hezhiqiang/toolchains/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_mutex.h:229 #5 doris::vectorized::ScannerContext::append_block_to_queue (this=, scan_task=...) at /mnt/disk1/hezhiqiang/doris/be/src/vec/exec/scan/scanner_context.cpp:234 #6 0x000055b20f32c0f9 in doris::vectorized::ScannerScheduler::submit (this=, ctx=..., scan_task=...) at /mnt/disk1/hezhiqiang/doris/be/src/vec/exec/scan/scanner_scheduler.cpp:209 #7 0x000055b20f3338fc in doris::vectorized::ScannerContext::submit_scan_task (this=this@entry=0x7f8733d96010, scan_task=...) at /mnt/disk1/hezhiqiang/doris/be/src/vec/exec/scan/scanner_context.cpp:217 #8 0x000055b20f3346cd in doris::vectorized::ScannerContext::get_block_from_queue (this=0x7f8733d96010, state=, block=0x7f871f728de0, eos=0x7f871abce470, id=) at /mnt/disk1/hezhiqiang/doris/be/src/vec/exec/scan/scanner_context.cpp:290 #9 0x000055b214cb4f13 in doris::pipeline::ScanOperatorX::get_block (this=, state=0x7f872f0eb400, block=0x7f8b8f3dc82d <__lll_lock_wait+29>, eos=0x7f871abce470) at /mnt/disk1/hezhiqiang/doris/be/src/pipeline/exec/scan_operator.cpp:1292 #10 0x000055b2142b5772 in doris::pipeline::ScanOperatorX::get_block_after_projects (this=0x80, state=0x0, block=0x7f8b8f3dc82d <__lll_lock_wait+29>, eos=0x7f8733d960a8) at /mnt/disk1/hezhiqiang/doris/be/src/pipeline/exec/scan_operator.h:363 #11 0x000055b2142e7880 in doris::pipeline::StatefulOperatorX::get_block (this=0x7f871f9bee00, state=0x7f872f0eb400, block=0x7f8716d49060, eos=0x7f87363f4937) at /mnt/disk1/hezhiqiang/doris/be/src/pipeline/exec/operator.cpp:587 ``` Deallock happens with following ```cpp Status ScannerContext::get_block_from_queue { std::unique_lock l(_transfer_lock); ... if (scan_task->is_eos()) { ... } else { // resubmit current running scanner to read the next block submit_scan_task(scan_task); } } ScannerContext::submit_scan_task(std::shared_ptr scan_task) { _scanner_scheduler->submit(shared_from_this(), scan_task); } void ScannerScheduler::submit(std::shared_ptr ctx, std::shared_ptr scan_task) { ... if (auto ret = sumbit_task(); !ret) { scan_task->set_status(Status::InternalError( "Failed to submit scanner to scanner pool reason:" + std::string(ret.msg()) + "|type:" + std::to_string(type))); ctx->append_block_to_queue(scan_task); return; } } void ScannerContext::append_block_to_queue(std::shared_ptr scan_task) { ... std::lock_guard l(_transfer_lock); ... } ``` Since mutex in cpp is not re-enterable, so the scanner thread will deadlock with itself. This pr fix the problem by making `ScannerScheduler::submit` return a Status instead of doing append failed task to the ScannerContext. The caller itself will decide where resubmit the scanner or just abort the execution of the query. --- be/src/vec/exec/scan/scanner_context.cpp | 44 ++++++++++++++++------ be/src/vec/exec/scan/scanner_context.h | 4 +- be/src/vec/exec/scan/scanner_scheduler.cpp | 31 ++++++++------- be/src/vec/exec/scan/scanner_scheduler.h | 2 +- 4 files changed, 53 insertions(+), 28 deletions(-) diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index bab11616c77c06..52838d7cf4651f 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -141,7 +141,7 @@ Status ScannerContext::init() { for (int i = 0; i < _max_thread_num; ++i) { std::weak_ptr next_scanner; if (_scanners.try_dequeue(next_scanner)) { - submit_scan_task(std::make_shared(next_scanner)); + RETURN_IF_ERROR(submit_scan_task(std::make_shared(next_scanner))); _num_running_scanners++; } } @@ -181,10 +181,10 @@ bool ScannerContext::empty_in_queue(int id) { return _blocks_queue.empty(); } -void ScannerContext::submit_scan_task(std::shared_ptr scan_task) { +Status ScannerContext::submit_scan_task(std::shared_ptr scan_task) { _scanner_sched_counter->update(1); _num_scheduled_scanners++; - _scanner_scheduler->submit(shared_from_this(), scan_task); + return _scanner_scheduler->submit(shared_from_this(), scan_task); } void ScannerContext::append_block_to_queue(std::shared_ptr scan_task) { @@ -232,10 +232,15 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo auto scan_task = _blocks_queue.front(); DCHECK(scan_task); + // The abnormal status of scanner may come from the execution of the scanner itself, + // or come from the scanner scheduler, such as TooManyTasks. if (!scan_task->status_ok()) { + // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while. + _process_status = scan_task->get_status(); _set_scanner_done(); - return scan_task->get_status(); + return _process_status; } + if (!scan_task->cached_blocks.empty()) { auto [current_block, block_size] = std::move(scan_task->cached_blocks.front()); scan_task->cached_blocks.pop_front(); @@ -248,13 +253,20 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo block->swap(*current_block); return_free_block(std::move(current_block)); } else { + // This scan task do not have any cached blocks. _blocks_queue.pop_front(); - if (scan_task->is_eos()) { // current scanner is finished, and no more data to read + // current scanner is finished, and no more data to read + if (scan_task->is_eos()) { _num_finished_scanners++; std::weak_ptr next_scanner; // submit one of the remaining scanners if (_scanners.try_dequeue(next_scanner)) { - submit_scan_task(std::make_shared(next_scanner)); + auto submit_status = submit_scan_task(std::make_shared(next_scanner)); + if (!submit_status.ok()) { + _process_status = submit_status; + _set_scanner_done(); + return _process_status; + } } else { // no more scanner to be scheduled // `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners @@ -270,11 +282,16 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo } } else { // resubmit current running scanner to read the next block - submit_scan_task(scan_task); + Status submit_status = submit_scan_task(scan_task); + if (!submit_status.ok()) { + _process_status = submit_status; + _set_scanner_done(); + return _process_status; + } } } // scale up - _try_to_scale_up(); + RETURN_IF_ERROR(_try_to_scale_up()); } if (_num_finished_scanners == _all_scanners.size() && _blocks_queue.empty()) { @@ -289,7 +306,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo return Status::OK(); } -void ScannerContext::_try_to_scale_up() { +Status ScannerContext::_try_to_scale_up() { // Four criteria to determine whether to increase the parallelism of the scanners // 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up // 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks @@ -306,7 +323,7 @@ void ScannerContext::_try_to_scale_up() { // when _last_wait_duration_ratio > 0, it has scaled up before. // we need to determine if the scale-up is effective: // the wait duration ratio after last scaling up should less than 80% of `_last_wait_duration_ratio` - return; + return Status::OK(); } bool is_scale_up = false; @@ -322,7 +339,10 @@ void ScannerContext::_try_to_scale_up() { // get enough memory to launch one more scanner. std::weak_ptr scale_up_scanner; if (_scanners.try_dequeue(scale_up_scanner)) { - submit_scan_task(std::make_shared(scale_up_scanner)); + // Just return error to caller. + // Because _try_to_scale_up is called under _transfer_lock locked, if we add the scanner + // to the block queue, we will get a deadlock. + RETURN_IF_ERROR(submit_scan_task(std::make_shared(scale_up_scanner))); _num_running_scanners++; _scale_up_scanners_counter->update(1); is_scale_up = true; @@ -337,6 +357,8 @@ void ScannerContext::_try_to_scale_up() { _total_wait_block_time = 0; } } + + return Status::OK(); } Status ScannerContext::validate_block_schema(Block* block) { diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index d97fc731fe5067..972ec3a6b30656 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -137,7 +137,7 @@ class ScannerContext : public std::enable_shared_from_this, // set the next scanned block to `ScanTask::current_block` // set the error state to `ScanTask::status` // set the `eos` to `ScanTask::eos` if there is no more data in current scanner - void submit_scan_task(std::shared_ptr scan_task); + Status submit_scan_task(std::shared_ptr scan_task); // append the running scanner and its cached block to `_blocks_queue` void append_block_to_queue(std::shared_ptr scan_task); @@ -184,7 +184,7 @@ class ScannerContext : public std::enable_shared_from_this, /// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up /// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num` void _set_scanner_done(); - void _try_to_scale_up(); + Status _try_to_scale_up(); RuntimeState* _state = nullptr; pipeline::ScanLocalStateBase* _local_state = nullptr; diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp index 351f5d4e275074..7f868fba5a666e 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.cpp +++ b/be/src/vec/exec/scan/scanner_scheduler.cpp @@ -119,23 +119,23 @@ Status ScannerScheduler::init(ExecEnv* env) { return Status::OK(); } -void ScannerScheduler::submit(std::shared_ptr ctx, - std::shared_ptr scan_task) { +Status ScannerScheduler::submit(std::shared_ptr ctx, + std::shared_ptr scan_task) { scan_task->last_submit_time = GetCurrentTimeNanos(); if (ctx->done()) { - return; + return Status::OK(); } auto task_lock = ctx->task_exec_ctx(); if (task_lock == nullptr) { LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string() << " maybe finished"; - return; + return Status::OK(); } if (ctx->thread_token != nullptr) { std::shared_ptr scanner_delegate = scan_task->scanner.lock(); if (scanner_delegate == nullptr) { - return; + return Status::OK(); } scanner_delegate->_scanner->start_wait_worker_timer(); @@ -152,13 +152,12 @@ void ScannerScheduler::submit(std::shared_ptr ctx, }); if (!s.ok()) { scan_task->set_status(s); - ctx->append_block_to_queue(scan_task); - return; + return s; } } else { std::shared_ptr scanner_delegate = scan_task->scanner.lock(); if (scanner_delegate == nullptr) { - return; + return Status::OK(); } scanner_delegate->_scanner->start_wait_worker_timer(); @@ -186,14 +185,18 @@ void ScannerScheduler::submit(std::shared_ptr ctx, return scan_sched->submit_scan_task(simple_scan_task); }; - if (auto ret = sumbit_task(); !ret) { - scan_task->set_status(Status::InternalError( - "Failed to submit scanner to scanner pool reason:" + std::string(ret.msg()) + - "|type:" + std::to_string(type))); - ctx->append_block_to_queue(scan_task); - return; + Status submit_status = sumbit_task(); + if (!submit_status.ok()) { + // User will see TooManyTasks error. It looks like a more reasonable error. + Status scan_task_status = Status::TooManyTasks( + "Failed to submit scanner to scanner pool reason:" + + std::string(submit_status.msg()) + "|type:" + std::to_string(type)); + scan_task->set_status(scan_task_status); + return scan_task_status; } } + + return Status::OK(); } std::unique_ptr ScannerScheduler::new_limited_scan_pool_token( diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h index ddc61396e23f15..439291f2107185 100644 --- a/be/src/vec/exec/scan/scanner_scheduler.h +++ b/be/src/vec/exec/scan/scanner_scheduler.h @@ -57,7 +57,7 @@ class ScannerScheduler { [[nodiscard]] Status init(ExecEnv* env); - void submit(std::shared_ptr ctx, std::shared_ptr scan_task); + Status submit(std::shared_ptr ctx, std::shared_ptr scan_task); void stop();