[fix](scanner) Fix deadlock when scanner submit failed #40495

Merged · 4 commits · Sep 10, 2024

Changes from all commits
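
The deadlock this patch removes comes from how a failed submission used to be reported: on error, the scheduler pushed the failed task back through `ScannerContext::append_block_to_queue`, which needs `_transfer_lock`, while callers such as `_try_to_scale_up()` invoke the submit path with that same lock already held. The following minimal C++ sketch reproduces that lock re-entry pattern; `Context`, `Task`, `submit_old`, and `scale_up` are simplified stand-ins written for illustration, not the actual Doris classes.

```cpp
#include <deque>
#include <memory>
#include <mutex>
#include <string>

// Simplified stand-ins for ScannerContext / ScanTask; not the real Doris types.
struct Task {
    std::string status = "OK";
};

struct Context {
    std::mutex transfer_lock;  // models _transfer_lock
    std::deque<std::shared_ptr<Task>> blocks_queue;

    // Models append_block_to_queue(): needs the lock to touch the queue.
    void append_block_to_queue(std::shared_ptr<Task> t) {
        std::lock_guard<std::mutex> guard(transfer_lock);  // second acquisition
        blocks_queue.push_back(std::move(t));
    }

    // Pre-fix submit path: a failure is reported by pushing the failed task
    // back onto the block queue.
    void submit_old(std::shared_ptr<Task> t, bool fail) {
        if (fail) {
            t->status = "TooManyTasks";
            append_block_to_queue(std::move(t));  // re-enters the lock held by the caller
        }
    }

    // Caller that schedules while already holding the lock, like _try_to_scale_up().
    void scale_up() {
        std::lock_guard<std::mutex> guard(transfer_lock);     // first acquisition
        submit_old(std::make_shared<Task>(), /*fail=*/true);  // never returns
    }
};

int main() {
    Context ctx;
    // Deadlocks by design (strictly, relocking a non-recursive std::mutex on the
    // same thread is undefined behavior): this is the bug pattern, not a fix.
    ctx.scale_up();
}
```

The fix makes `ScannerScheduler::submit` and `ScannerContext::submit_scan_task` return a `Status` instead, so the lock-holding caller records the failure itself (`_process_status`, `_set_scanner_done()`) rather than routing it back through the queue.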
44 changes: 33 additions & 11 deletions be/src/vec/exec/scan/scanner_context.cpp
@@ -152,7 +152,7 @@ Status ScannerContext::init() {
for (int i = 0; i < _max_thread_num; ++i) {
std::weak_ptr<ScannerDelegate> next_scanner;
if (_scanners.try_dequeue(next_scanner)) {
submit_scan_task(std::make_shared<ScanTask>(next_scanner));
RETURN_IF_ERROR(submit_scan_task(std::make_shared<ScanTask>(next_scanner)));
_num_running_scanners++;
}
}
@@ -196,10 +196,10 @@ bool ScannerContext::empty_in_queue(int id) {
return _blocks_queue.empty();
}

void ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
_scanner_sched_counter->update(1);
_num_scheduled_scanners++;
_scanner_scheduler->submit(shared_from_this(), scan_task);
return _scanner_scheduler->submit(shared_from_this(), scan_task);
}

void ScannerContext::append_block_to_queue(std::shared_ptr<ScanTask> scan_task) {
@@ -247,10 +247,15 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
auto scan_task = _blocks_queue.front();
DCHECK(scan_task);

// The abnormal status of scanner may come from the execution of the scanner itself,
// or come from the scanner scheduler, such as TooManyTasks.
if (!scan_task->status_ok()) {
// TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
_process_status = scan_task->get_status();
_set_scanner_done();
return scan_task->get_status();
return _process_status;
}

if (!scan_task->cached_blocks.empty()) {
auto [current_block, block_size] = std::move(scan_task->cached_blocks.front());
scan_task->cached_blocks.pop_front();
@@ -263,13 +268,20 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
block->swap(*current_block);
return_free_block(std::move(current_block));
} else {
// This scan task do not have any cached blocks.
_blocks_queue.pop_front();
if (scan_task->is_eos()) { // current scanner is finished, and no more data to read
// current scanner is finished, and no more data to read
if (scan_task->is_eos()) {
_num_finished_scanners++;
std::weak_ptr<ScannerDelegate> next_scanner;
// submit one of the remaining scanners
if (_scanners.try_dequeue(next_scanner)) {
submit_scan_task(std::make_shared<ScanTask>(next_scanner));
auto submit_status = submit_scan_task(std::make_shared<ScanTask>(next_scanner));
if (!submit_status.ok()) {
_process_status = submit_status;
_set_scanner_done();
return _process_status;
}
} else {
// no more scanner to be scheduled
// `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners
@@ -284,11 +296,16 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
}
} else {
// resubmit current running scanner to read the next block
submit_scan_task(scan_task);
Status submit_status = submit_scan_task(scan_task);
if (!submit_status.ok()) {
_process_status = submit_status;
_set_scanner_done();
return _process_status;
}
}
}
// scale up
_try_to_scale_up();
RETURN_IF_ERROR(_try_to_scale_up());
}

if (_num_finished_scanners == _all_scanners.size() && _blocks_queue.empty()) {
@@ -303,7 +320,7 @@ Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Blo
return Status::OK();
}

void ScannerContext::_try_to_scale_up() {
Status ScannerContext::_try_to_scale_up() {
// Four criteria to determine whether to increase the parallelism of the scanners
// 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
// 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks
@@ -320,7 +337,7 @@ void ScannerContext::_try_to_scale_up() {
// when _last_wait_duration_ratio > 0, it has scaled up before.
// we need to determine if the scale-up is effective:
// the wait duration ratio after last scaling up should less than 80% of `_last_wait_duration_ratio`
return;
return Status::OK();
}

bool is_scale_up = false;
@@ -335,7 +352,10 @@ void ScannerContext::_try_to_scale_up() {
// get enough memory to launch one more scanner.
std::weak_ptr<ScannerDelegate> scale_up_scanner;
if (_scanners.try_dequeue(scale_up_scanner)) {
submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner));
// Just return error to caller.
// Because _try_to_scale_up is called under _transfer_lock locked, if we add the scanner
// to the block queue, we will get a deadlock.
RETURN_IF_ERROR(submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner)));
_num_running_scanners++;
_scale_up_scanners_counter->update(1);
is_scale_up = true;
@@ -350,6 +370,8 @@ void ScannerContext::_try_to_scale_up() {
_total_wait_block_time = 0;
}
}

return Status::OK();
}

Status ScannerContext::validate_block_schema(Block* block) {
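
Because `submit_scan_task` now returns a `Status`, call sites such as `init()` and `_try_to_scale_up()` above can simply propagate the failure with `RETURN_IF_ERROR`, while `get_block_from_queue` stores it in `_process_status` and marks the scanner done. Here is a self-contained sketch of that propagation pattern; the `Status` type and the macro body are simplified stand-ins written for this example (Doris defines its own `Status` and `RETURN_IF_ERROR`), kept only to show the control flow.

```cpp
#include <iostream>
#include <string>

// Minimal stand-in Status; Doris's real Status also carries an error code, stack info, etc.
struct Status {
    std::string msg = "OK";
    bool ok() const { return msg == "OK"; }
    static Status OK() { return {}; }
    static Status TooManyTasks(std::string m) { return {std::move(m)}; }
};

// Roughly what a RETURN_IF_ERROR-style macro does: evaluate the expression and
// early-return any non-OK Status from the current function.
#define RETURN_IF_ERROR(stmt)          \
    do {                               \
        Status _st_ = (stmt);          \
        if (!_st_.ok()) return _st_;   \
    } while (false)

// Stand-in for the patched submit_scan_task(): failure comes back as a Status.
Status submit_scan_task_sketch(bool pool_full) {
    if (pool_full) {
        return Status::TooManyTasks("scanner pool is saturated");
    }
    return Status::OK();
}

// Mirrors the shape of the patched init(): a failed submit unwinds through the
// return value instead of being pushed back onto the block queue.
Status init_sketch(bool pool_full) {
    RETURN_IF_ERROR(submit_scan_task_sketch(pool_full));
    return Status::OK();
}

int main() {
    std::cout << init_sketch(true).msg << std::endl;  // prints: scanner pool is saturated
}
```
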
4 changes: 2 additions & 2 deletions be/src/vec/exec/scan/scanner_context.h
@@ -139,7 +139,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
// set the next scanned block to `ScanTask::current_block`
// set the error state to `ScanTask::status`
// set the `eos` to `ScanTask::eos` if there is no more data in current scanner
void submit_scan_task(std::shared_ptr<ScanTask> scan_task);
Status submit_scan_task(std::shared_ptr<ScanTask> scan_task);

// append the running scanner and its cached block to `_blocks_queue`
void append_block_to_queue(std::shared_ptr<ScanTask> scan_task);
@@ -186,7 +186,7 @@ class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
/// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up
/// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
void _set_scanner_done();
void _try_to_scale_up();
Status _try_to_scale_up();

RuntimeState* _state = nullptr;
pipeline::ScanLocalStateBase* _local_state = nullptr;
31 changes: 17 additions & 14 deletions be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -120,23 +120,23 @@ Status ScannerScheduler::init(ExecEnv* env) {
return Status::OK();
}

void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task) {
Status ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task) {
scan_task->last_submit_time = GetCurrentTimeNanos();
if (ctx->done()) {
return;
return Status::OK();
}
auto task_lock = ctx->task_exec_ctx();
if (task_lock == nullptr) {
LOG(INFO) << "could not lock task execution context, query " << ctx->debug_string()
<< " maybe finished";
return;
return Status::OK();
}

if (ctx->thread_token != nullptr) {
std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
return;
return Status::OK();
}

scanner_delegate->_scanner->start_wait_worker_timer();
@@ -153,13 +153,12 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
});
if (!s.ok()) {
scan_task->set_status(s);
ctx->append_block_to_queue(scan_task);
return;
return s;
}
} else {
std::shared_ptr<ScannerDelegate> scanner_delegate = scan_task->scanner.lock();
if (scanner_delegate == nullptr) {
return;
return Status::OK();
}

scanner_delegate->_scanner->start_wait_worker_timer();
@@ -187,14 +186,18 @@ void ScannerScheduler::submit(std::shared_ptr<ScannerContext> ctx,
return scan_sched->submit_scan_task(simple_scan_task);
};

if (auto ret = sumbit_task(); !ret) {
scan_task->set_status(Status::InternalError(
"Failed to submit scanner to scanner pool reason:" + std::string(ret.msg()) +
"|type:" + std::to_string(type)));
ctx->append_block_to_queue(scan_task);
return;
Status submit_status = sumbit_task();
if (!submit_status.ok()) {
// User will see TooManyTasks error. It looks like a more reasonable error.
Status scan_task_status = Status::TooManyTasks(
"Failed to submit scanner to scanner pool reason:" +
std::string(submit_status.msg()) + "|type:" + std::to_string(type));
scan_task->set_status(scan_task_status);
return scan_task_status;
}
}

return Status::OK();
}

std::unique_ptr<ThreadPoolToken> ScannerScheduler::new_limited_scan_pool_token(
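
On the scheduler side, the patch also changes what a failed pool submission turns into: instead of a generic `InternalError`, the caller now gets `TooManyTasks` carrying the pool's reason and the scheduler type, which is the more meaningful error for a user whose query failed because the scanner pool was saturated. A small sketch of that rewrapping follows; the `Status` stand-in and the `wrap_submit_failure` helper are hypothetical, with the message format mirroring the one in the diff.

```cpp
#include <iostream>
#include <string>

// Stand-in Status with only what this example needs; not the real Doris type.
struct Status {
    std::string code = "OK";
    std::string message;
    bool ok() const { return code == "OK"; }
    static Status OK() { return {}; }
    static Status InternalError(std::string m) { return {"InternalError", std::move(m)}; }
    static Status TooManyTasks(std::string m) { return {"TooManyTasks", std::move(m)}; }
    std::string msg() const { return message; }
};

// Mirrors the rewrapping in ScannerScheduler::submit(): keep the pool's reason and the
// scheduler type in the text, but surface the failure as TooManyTasks to the caller.
Status wrap_submit_failure(const Status& pool_status, int type) {
    return Status::TooManyTasks("Failed to submit scanner to scanner pool reason:" +
                                pool_status.msg() + "|type:" + std::to_string(type));
}

int main() {
    Status pool_status = Status::InternalError("thread pool token is shut down");
    Status seen_by_user = wrap_submit_failure(pool_status, /*type=*/1);
    std::cout << seen_by_user.code << ": " << seen_by_user.msg() << std::endl;
}
```
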
2 changes: 1 addition & 1 deletion be/src/vec/exec/scan/scanner_scheduler.h
@@ -57,7 +57,7 @@ class ScannerScheduler {

[[nodiscard]] Status init(ExecEnv* env);

void submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task);
Status submit(std::shared_ptr<ScannerContext> ctx, std::shared_ptr<ScanTask> scan_task);

void stop();
