Skip to content

Commit

Permalink
[fix](Nereids) mow with sync mv could not be deleted (apache#39578)
Browse files Browse the repository at this point in the history
before this PR will throw exception: Unknown column
  • Loading branch information
morrySnow authored Aug 22, 2024
1 parent 729f4bf commit e1a27f2
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,17 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuer
boolean isMow = targetTable.getEnableUniqueKeyMergeOnWrite();
String tableName = tableAlias != null ? tableAlias : targetTable.getName();
boolean hasClusterKey = targetTable.getBaseSchema().stream().anyMatch(Column::isClusterKey);
boolean hasSyncMaterializedView = false;
// currently cluster key doesn't support partial update, so we can't convert
// a delete stmt to partial update load if the table has cluster key
for (Column column : targetTable.getFullSchema()) {
NamedExpression expr = null;
if (column.isMaterializedViewColumn()) {
hasSyncMaterializedView = true;
break;
}
}
for (Column column : targetTable.getBaseSchema(true)) {
NamedExpression expr;
if (column.getName().equalsIgnoreCase(Column.DELETE_SIGN)) {
expr = new UnboundAlias(new TinyIntLiteral(((byte) 1)), Column.DELETE_SIGN);
} else if (column.getName().equalsIgnoreCase(Column.SEQUENCE_COL)
Expand All @@ -414,7 +421,7 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuer
expr = new UnboundSlot(tableName, column.getName());
} else if (!isMow && (!column.isVisible() || (!column.isAllowNull() && !column.hasDefaultValue()))) {
expr = new UnboundSlot(tableName, column.getName());
} else if (hasClusterKey) {
} else if (hasClusterKey || hasSyncMaterializedView) {
expr = new UnboundSlot(tableName, column.getName());
} else {
continue;
Expand All @@ -425,7 +432,8 @@ public LogicalPlan completeQueryPlan(ConnectContext ctx, LogicalPlan logicalQuer

logicalQuery = new LogicalProject<>(selectLists, logicalQuery);

boolean isPartialUpdate = isMow && !hasClusterKey && cols.size() < targetTable.getColumns().size();
boolean isPartialUpdate = isMow && !hasClusterKey && !hasSyncMaterializedView
&& cols.size() < targetTable.getColumns().size();
logicalQuery = handleCte(logicalQuery);
// make UnboundTableSink
return UnboundTableSinkCreator.createUnboundTableSink(nameParts, cols, ImmutableList.of(),
Expand Down
76 changes: 76 additions & 0 deletions regression-test/suites/delete_p0/test_delete_with_sync_mv.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_delete_with_sync_mv") {

sql """drop table if exists test_delete_with_sync_mv"""

sql """
CREATE TABLE `test_delete_with_sync_mv` (
`l_orderkey` BIGINT NULL,
`l_linenumber` INT NULL,
`l_partkey` INT NULL,
`l_suppkey` INT NULL,
`l_shipdate` DATE not NULL,
`l_quantity` DECIMAL(15, 2) NULL,
`l_extendedprice` DECIMAL(15, 2) NULL,
`l_discount` DECIMAL(15, 2) NULL,
`l_tax` DECIMAL(15, 2) NULL,
`l_returnflag` VARCHAR(1) NULL,
`l_linestatus` VARCHAR(1) NULL,
`l_commitdate` DATE NULL,
`l_receiptdate` DATE NULL,
`l_shipinstruct` VARCHAR(25) NULL,
`l_shipmode` VARCHAR(10) NULL,
`l_comment` VARCHAR(44) NULL
)
unique KEY(l_orderkey, l_linenumber, l_partkey, l_suppkey, l_shipdate)
DISTRIBUTED BY HASH(`l_orderkey`) BUCKETS 96
PROPERTIES (
"replication_num" = "1",
"enable_unique_key_merge_on_write" = "true"
);
"""

sql """
insert into test_delete_with_sync_mv values
(null, 1, 2, 3, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
(1, null, 3, 1, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy'),
(3, 3, null, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx'),
(1, 2, 3, null, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
(2, 3, 2, 1, '2023-10-18', 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy'),
(3, 1, 1, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx'),
(1, 3, 2, 2, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
(null, 1, 2, 3, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
(1, null, 3, 1, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-18', '2023-10-18', 'a', 'b', 'yyyyyyyyy'),
(3, 3, null, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', '2023-10-19', 'c', 'd', 'xxxxxxxxx'),
(1, 2, 3, null, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy'),
(2, 3, 2, 1, '2023-10-18', 5.5, 6.5, 7.5, 8.5, 'o', 'k', null, '2023-10-18', 'a', 'b', 'yyyyyyyyy'),
(3, 1, 1, 2, '2023-10-19', 7.5, 8.5, 9.5, 10.5, 'k', 'o', '2023-10-19', null, 'c', 'd', 'xxxxxxxxx'),
(1, 3, 2, 2, '2023-10-17', 5.5, 6.5, 7.5, 8.5, 'o', 'k', '2023-10-17', '2023-10-17', 'a', 'b', 'yyyyyyyyy')
"""

createMV ("""
CREATE MATERIALIZED VIEW mv
AS
select l_orderkey, l_linenumber, l_partkey, l_suppkey, l_shipdate,
substring(concat(l_returnflag, l_linestatus), 1)
from test_delete_with_sync_mv;
""")

sql """delete from test_delete_with_sync_mv where l_orderkey = 2"""
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ suite("load_four_step") {
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${tableName}"
logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows[1])
assertEquals(rows[1], loadRowCount[0][0])
}
}
sql """ set delete_without_partition = true; """
sql new File("""${context.file.parentFile.parent}/ddl/${tableName}_part_delete.sql""").text
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${tableName}"
logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows[3])
assertEquals(rows[3], loadRowCount[0][0])
}
streamLoad {
table tableName
Expand Down Expand Up @@ -105,7 +105,7 @@ suite("load_four_step") {
for (int i = 1; i <= 5; i++) {
def loadRowCount = sql "select count(1) from ${tableName}"
logger.info("select ${tableName} numbers: ${loadRowCount[0][0]}".toString())
assertTrue(loadRowCount[0][0] == rows[1])
assertEquals(rows[1], loadRowCount[0][0])
}
}
}

0 comments on commit e1a27f2

Please sign in to comment.