Skip to content

Commit

Permalink
[fix](pipeline) sort_merge should throw exception in has_next_block i…
Browse files Browse the repository at this point in the history
…f got failed status (#29076)

Test in regression-test/suites/datatype_p0/decimalv3/test_decimalv3_overflow.groovy::249 sometimes failed when there are multiple BEs and FE process report status slowly for some reason.

explain select k1, k2, k1 * k2 from test_decimal128_overflow2 order by 1,2,3
--------------

+----------------------------------------------------------------------------------------------------------------------------+
| Explain String(Nereids Planner)                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                                                            |
|   OUTPUT EXPRS:                                                                                                            |
|     k1[#5]                                                                                                                 |
|     k2[#6]                                                                                                                 |
|     (k1 * k2)[#7]                                                                                                          |
|   PARTITION: UNPARTITIONED                                                                                                 |
|                                                                                                                            |
|   HAS_COLO_PLAN_NODE: false                                                                                                |
|                                                                                                                            |
|   VRESULT SINK                                                                                                             |
|      MYSQL_PROTOCAL                                                                                                        |
|                                                                                                                            |
|   111:VMERGING-EXCHANGE                                                                                                    |
|      offset: 0                                                                                                             |
|                                                                                                                            |
| PLAN FRAGMENT 1                                                                                                            |
|                                                                                                                            |
|   PARTITION: HASH_PARTITIONED: k1[#0], k2[#1]                                                                              |
|                                                                                                                            |
|   HAS_COLO_PLAN_NODE: false                                                                                                |
|                                                                                                                            |
|   STREAM DATA SINK                                                                                                         |
|     EXCHANGE ID: 111                                                                                                       |
|     UNPARTITIONED                                                                                                          |
|                                                                                                                            |
|   108:VSORT                                                                                                                |
|   |  order by: k1[#5] ASC, k2[#6] ASC, (k1 * k2)[#7] ASC                                                                   |
|   |  offset: 0                                                                                                             |
|   |                                                                                                                        |
|   102:VOlapScanNode                                                                                                        |
|      TABLE: regression_test_datatype_p0_decimalv3.test_decimal128_overflow2(test_decimal128_overflow2), PREAGGREGATION: ON |
|      partitions=1/1 (test_decimal128_overflow2), tablets=8/8, tabletList=22841,22843,22845 ...                             |
|      cardinality=6, avgRowSize=0.0, numNodes=1                                                                             |
|      pushAggOp=NONE                                                                                                        |
|      projections: k1[#0], k2[#1], (k1[#0] * k2[#1])                                                                        |
|      project output tuple id: 1                                                                                            |
+----------------------------------------------------------------------------------------------------------------------------+
36 rows in set (0.03 sec)
Why failed:

Multiple BEs
Fragments 0 and 1 are MUST on different BEs
Pipeline task of VOlapScanNode which executes k1*k2 failed sets query status to cancelled
Pipeline task of VSort call try close, send Cancelled status to VMergeExchange
sort_curso did not throw exception when it meets error
  • Loading branch information
zhiqiang-hhhh authored Dec 27, 2023
1 parent a4e69f7 commit 6d26aca
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
2 changes: 2 additions & 0 deletions be/src/vec/core/sort_cursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
}
MergeSortCursorImpl::reset(_block);
return status.ok();
} else if (!status.ok()) {
throw std::runtime_error(status.msg());
}
return false;
}
Expand Down
14 changes: 9 additions & 5 deletions be/src/vec/runtime/vsorted_run_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,16 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile) {
}

Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs) {
for (const auto& supplier : input_runs) {
if (_use_sort_desc) {
_cursors.emplace_back(supplier, _desc);
} else {
_cursors.emplace_back(supplier, _ordering_expr, _is_asc_order, _nulls_first);
try {
for (const auto& supplier : input_runs) {
if (_use_sort_desc) {
_cursors.emplace_back(supplier, _desc);
} else {
_cursors.emplace_back(supplier, _ordering_expr, _is_asc_order, _nulls_first);
}
}
} catch (const std::exception& e) {
return Status::Cancelled(e.what());
}

for (auto& _cursor : _cursors) {
Expand Down

0 comments on commit 6d26aca

Please sign in to comment.