From c32d29690aff21674fe314c68cc7f9510664d407 Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Tue, 30 Jul 2024 16:17:02 +0800 Subject: [PATCH 1/3] fix --- .../commands/insert/OlapInsertExecutor.java | 24 ++++++++++++++- .../suites/insert_p0/insert.groovy | 29 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java index 0153700863d3be..43a3327a378884 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapInsertExecutor.java @@ -40,6 +40,7 @@ import org.apache.doris.planner.DataSink; import org.apache.doris.planner.DataStreamSink; import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.MultiCastDataSink; import org.apache.doris.planner.OlapTableSink; import org.apache.doris.planner.PlanFragment; import org.apache.doris.qe.ConnectContext; @@ -139,7 +140,28 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys // set schema and partition info for tablet id shuffle exchange if (fragment.getPlanRoot() instanceof ExchangeNode && fragment.getDataPartition().getType() == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) { - DataStreamSink dataStreamSink = (DataStreamSink) (fragment.getChild(0).getSink()); + DataSink childFragmentSink = fragment.getChild(0).getSink(); + DataStreamSink dataStreamSink = null; + if (childFragmentSink instanceof MultiCastDataSink) { + MultiCastDataSink multiCastDataSink = (MultiCastDataSink) childFragmentSink; + int outputExchangeId = (fragment.getPlanRoot()).getId().asInt(); + // which DataStreamSink link to the output exchangeNode? + for (DataStreamSink currentDataStreamSink : multiCastDataSink.getDataStreamSinks()) { + int sinkExchangeId = currentDataStreamSink.getExchNodeId().asInt(); + if (outputExchangeId == sinkExchangeId) { + dataStreamSink = currentDataStreamSink; + break; + } + } + if (dataStreamSink == null) { + throw new IllegalStateException("Can not find DataStreamSink in the MultiCastDataSink"); + } + } else if (childFragmentSink instanceof DataStreamSink) { + dataStreamSink = (DataStreamSink) childFragmentSink; + } else { + throw new IllegalStateException("Unsupported DataSink: " + childFragmentSink); + } + Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get()); dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema( database.getId(), olapTableSink.getDstTable(), analyzer)); diff --git a/regression-test/suites/insert_p0/insert.groovy b/regression-test/suites/insert_p0/insert.groovy index 573d5d8366c6d6..83a1a472781c3d 100644 --- a/regression-test/suites/insert_p0/insert.groovy +++ b/regression-test/suites/insert_p0/insert.groovy @@ -83,4 +83,33 @@ suite("insert") { sql "sync" qt_insert """ select * from mutable_datatype order by c_bigint, c_double, c_string, c_date, c_timestamp, c_boolean, c_short_decimal""" + + multi_sql """ + drop table if exists table_select_test1; + CREATE TABLE table_select_test1 ( + `id` int + ) + distributed by hash(id) + properties('replication_num'='1'); + + insert into table_select_test1 values(2); + + drop table if exists table_test_insert1; + create table table_test_insert1 (id int) + partition by range(id) + ( + partition p1 values[('1'), ('50')), + partition p2 values[('50'), ('100')) + ) + distributed by hash(id) buckets 100 + properties('replication_num'='1') + + insert into table_test_insert1 values(1), (10); + + insert into table_test_insert1 + with + a as (select * from table_select_test1 where id > 10), + b as (select * from a union all select * from a) + select id from b; + """ } From efcfa9d7490243bfd2ee51cccebc723a45a0ae44 Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Tue, 30 Jul 2024 16:28:36 +0800 Subject: [PATCH 2/3] fix --- .../suites/insert_p0/insert.groovy | 132 +++++++++--------- 1 file changed, 66 insertions(+), 66 deletions(-) diff --git a/regression-test/suites/insert_p0/insert.groovy b/regression-test/suites/insert_p0/insert.groovy index 83a1a472781c3d..92bbb17b9324d7 100644 --- a/regression-test/suites/insert_p0/insert.groovy +++ b/regression-test/suites/insert_p0/insert.groovy @@ -20,68 +20,68 @@ // /testing/trino-product-tests/src/main/resources/sql-tests/testcases // and modified by Doris. suite("insert") { - def tables=["datatype", "mutable_datatype"] - - for (String table in tables) { - sql """ DROP TABLE IF EXISTS $table """ - } - - for (String table in tables) { - sql new File("""${context.file.parent}/ddl/${table}.sql""").text - } - - streamLoad { - // you can skip declare db, because a default db already specify in ${DORIS_HOME}/conf/regression-conf.groovy - // db 'regression_test' - table 'datatype' - - // default label is UUID: - // set 'label' UUID.randomUUID().toString() - - // default column_separator is specify in doris fe config, usually is '\t'. - // this line change to ',' - set 'column_separator', '|' - - // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. - // also, you can stream load a http stream, e.g. http://xxx/some.csv - file 'datetype.csv' - - time 10000 // limit inflight 10s - - // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows - - // if declared a check callback, the default check condition will ignore. - // So you must check all condition - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - log.info("Stream load result: ${result}".toString()) - def json = parseJson(result) - assertEquals("success", json.Status.toLowerCase()) - assertEquals(json.NumberTotalRows, json.NumberLoadedRows) - assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) - } - } - - sql """ insert into mutable_datatype select c_bigint, c_double, c_string, c_date, c_timestamp, c_boolean, c_short_decimal, c_long_decimal from datatype where c_double < 20 """ - sql """ insert into mutable_datatype select 1, c_double, 'abc', cast('2014-01-01' as date), c_timestamp, FALSE, '123.22', '123456789012345678.012345678' from datatype """ - sql """ insert into mutable_datatype select 1, cast(2.2 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), false, '123.22', '123456789012345678.012345678' from datatype """ - sql """ insert into mutable_datatype select 1, cast(2.1 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), FALSE, '123.22', '123456789012345678.012345678' """ - sql """ insert into mutable_datatype values (null, null, null, null, null, null, null, null) """ - sql """ insert into mutable_datatype select count(*), cast(1.1 as double), 'a', cast('2016-01-01' as date), cast('2015-01-01 03:15:16' as datetime), FALSE, '-123.22', '-123456789012345678.012345678' from datatype group by c_bigint """ - sql """ insert into mutable_datatype select 5 * c_bigint, c_double + 15, c_string, c_date, c_timestamp, c_boolean, cast((c_short_decimal / '2.00') as decimal(5,2)), cast((c_long_decimal % '10') as decimal(27,9)) from datatype """ - sql """ insert into mutable_datatype select * from datatype """ - sql """ insert into mutable_datatype select * from mutable_datatype """ - sql """ insert into mutable_datatype select * from datatype union all select * from datatype """ - sql """ insert into mutable_datatype select * from datatype order by 1 limit 2 """ - sql """ insert into mutable_datatype select * from datatype where c_bigint < 0 """ - sql """ insert into mutable_datatype values(1,cast(2.34567 as double),'a',cast('2014-01-01' as date), cast ('2015-01-01 03:15:16' as datetime), TRUE, '123.22', '123456789012345678.012345678') """ - sql """ insert into mutable_datatype values(1, cast(2.1 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), FALSE, '-123.22', '-123456789012345678.012345678') """ - sql """ insert into mutable_datatype values(5 * 10, cast(4.1 + 5 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), TRUE, '123.22', '123456789012345678.012345678') """ - - sql "sync" - qt_insert """ select * from mutable_datatype order by c_bigint, c_double, c_string, c_date, c_timestamp, c_boolean, c_short_decimal""" +// def tables=["datatype", "mutable_datatype"] +// +// for (String table in tables) { +// sql """ DROP TABLE IF EXISTS $table """ +// } +// +// for (String table in tables) { +// sql new File("""${context.file.parent}/ddl/${table}.sql""").text +// } +// +// streamLoad { +// // you can skip declare db, because a default db already specify in ${DORIS_HOME}/conf/regression-conf.groovy +// // db 'regression_test' +// table 'datatype' +// +// // default label is UUID: +// // set 'label' UUID.randomUUID().toString() +// +// // default column_separator is specify in doris fe config, usually is '\t'. +// // this line change to ',' +// set 'column_separator', '|' +// +// // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. +// // also, you can stream load a http stream, e.g. http://xxx/some.csv +// file 'datetype.csv' +// +// time 10000 // limit inflight 10s +// +// // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows +// +// // if declared a check callback, the default check condition will ignore. +// // So you must check all condition +// check { result, exception, startTime, endTime -> +// if (exception != null) { +// throw exception +// } +// log.info("Stream load result: ${result}".toString()) +// def json = parseJson(result) +// assertEquals("success", json.Status.toLowerCase()) +// assertEquals(json.NumberTotalRows, json.NumberLoadedRows) +// assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) +// } +// } +// +// sql """ insert into mutable_datatype select c_bigint, c_double, c_string, c_date, c_timestamp, c_boolean, c_short_decimal, c_long_decimal from datatype where c_double < 20 """ +// sql """ insert into mutable_datatype select 1, c_double, 'abc', cast('2014-01-01' as date), c_timestamp, FALSE, '123.22', '123456789012345678.012345678' from datatype """ +// sql """ insert into mutable_datatype select 1, cast(2.2 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), false, '123.22', '123456789012345678.012345678' from datatype """ +// sql """ insert into mutable_datatype select 1, cast(2.1 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), FALSE, '123.22', '123456789012345678.012345678' """ +// sql """ insert into mutable_datatype values (null, null, null, null, null, null, null, null) """ +// sql """ insert into mutable_datatype select count(*), cast(1.1 as double), 'a', cast('2016-01-01' as date), cast('2015-01-01 03:15:16' as datetime), FALSE, '-123.22', '-123456789012345678.012345678' from datatype group by c_bigint """ +// sql """ insert into mutable_datatype select 5 * c_bigint, c_double + 15, c_string, c_date, c_timestamp, c_boolean, cast((c_short_decimal / '2.00') as decimal(5,2)), cast((c_long_decimal % '10') as decimal(27,9)) from datatype """ +// sql """ insert into mutable_datatype select * from datatype """ +// sql """ insert into mutable_datatype select * from mutable_datatype """ +// sql """ insert into mutable_datatype select * from datatype union all select * from datatype """ +// sql """ insert into mutable_datatype select * from datatype order by 1 limit 2 """ +// sql """ insert into mutable_datatype select * from datatype where c_bigint < 0 """ +// sql """ insert into mutable_datatype values(1,cast(2.34567 as double),'a',cast('2014-01-01' as date), cast ('2015-01-01 03:15:16' as datetime), TRUE, '123.22', '123456789012345678.012345678') """ +// sql """ insert into mutable_datatype values(1, cast(2.1 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), FALSE, '-123.22', '-123456789012345678.012345678') """ +// sql """ insert into mutable_datatype values(5 * 10, cast(4.1 + 5 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), TRUE, '123.22', '123456789012345678.012345678') """ +// +// sql "sync" +// qt_insert """ select * from mutable_datatype order by c_bigint, c_double, c_string, c_date, c_timestamp, c_boolean, c_short_decimal""" multi_sql """ @@ -104,12 +104,12 @@ suite("insert") { distributed by hash(id) buckets 100 properties('replication_num'='1') - insert into table_test_insert1 values(1), (10); + insert into table_test_insert1 values(1), (50); insert into table_test_insert1 with - a as (select * from table_select_test1 where id > 10), - b as (select * from a union all select * from a) - select id from b; + a as (select * from table_select_test1), + b as (select * from a) + select id from a; """ } From 116a1c295da067ced45274bcbaac3722439b6eb1 Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Tue, 30 Jul 2024 16:30:17 +0800 Subject: [PATCH 3/3] fix --- .../suites/insert_p0/insert.groovy | 124 +++++++++--------- 1 file changed, 62 insertions(+), 62 deletions(-) diff --git a/regression-test/suites/insert_p0/insert.groovy b/regression-test/suites/insert_p0/insert.groovy index 92bbb17b9324d7..4d1eae21962621 100644 --- a/regression-test/suites/insert_p0/insert.groovy +++ b/regression-test/suites/insert_p0/insert.groovy @@ -20,68 +20,68 @@ // /testing/trino-product-tests/src/main/resources/sql-tests/testcases // and modified by Doris. suite("insert") { -// def tables=["datatype", "mutable_datatype"] -// -// for (String table in tables) { -// sql """ DROP TABLE IF EXISTS $table """ -// } -// -// for (String table in tables) { -// sql new File("""${context.file.parent}/ddl/${table}.sql""").text -// } -// -// streamLoad { -// // you can skip declare db, because a default db already specify in ${DORIS_HOME}/conf/regression-conf.groovy -// // db 'regression_test' -// table 'datatype' -// -// // default label is UUID: -// // set 'label' UUID.randomUUID().toString() -// -// // default column_separator is specify in doris fe config, usually is '\t'. -// // this line change to ',' -// set 'column_separator', '|' -// -// // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. -// // also, you can stream load a http stream, e.g. http://xxx/some.csv -// file 'datetype.csv' -// -// time 10000 // limit inflight 10s -// -// // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows -// -// // if declared a check callback, the default check condition will ignore. -// // So you must check all condition -// check { result, exception, startTime, endTime -> -// if (exception != null) { -// throw exception -// } -// log.info("Stream load result: ${result}".toString()) -// def json = parseJson(result) -// assertEquals("success", json.Status.toLowerCase()) -// assertEquals(json.NumberTotalRows, json.NumberLoadedRows) -// assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) -// } -// } -// -// sql """ insert into mutable_datatype select c_bigint, c_double, c_string, c_date, c_timestamp, c_boolean, c_short_decimal, c_long_decimal from datatype where c_double < 20 """ -// sql """ insert into mutable_datatype select 1, c_double, 'abc', cast('2014-01-01' as date), c_timestamp, FALSE, '123.22', '123456789012345678.012345678' from datatype """ -// sql """ insert into mutable_datatype select 1, cast(2.2 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), false, '123.22', '123456789012345678.012345678' from datatype """ -// sql """ insert into mutable_datatype select 1, cast(2.1 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), FALSE, '123.22', '123456789012345678.012345678' """ -// sql """ insert into mutable_datatype values (null, null, null, null, null, null, null, null) """ -// sql """ insert into mutable_datatype select count(*), cast(1.1 as double), 'a', cast('2016-01-01' as date), cast('2015-01-01 03:15:16' as datetime), FALSE, '-123.22', '-123456789012345678.012345678' from datatype group by c_bigint """ -// sql """ insert into mutable_datatype select 5 * c_bigint, c_double + 15, c_string, c_date, c_timestamp, c_boolean, cast((c_short_decimal / '2.00') as decimal(5,2)), cast((c_long_decimal % '10') as decimal(27,9)) from datatype """ -// sql """ insert into mutable_datatype select * from datatype """ -// sql """ insert into mutable_datatype select * from mutable_datatype """ -// sql """ insert into mutable_datatype select * from datatype union all select * from datatype """ -// sql """ insert into mutable_datatype select * from datatype order by 1 limit 2 """ -// sql """ insert into mutable_datatype select * from datatype where c_bigint < 0 """ -// sql """ insert into mutable_datatype values(1,cast(2.34567 as double),'a',cast('2014-01-01' as date), cast ('2015-01-01 03:15:16' as datetime), TRUE, '123.22', '123456789012345678.012345678') """ -// sql """ insert into mutable_datatype values(1, cast(2.1 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), FALSE, '-123.22', '-123456789012345678.012345678') """ -// sql """ insert into mutable_datatype values(5 * 10, cast(4.1 + 5 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), TRUE, '123.22', '123456789012345678.012345678') """ -// -// sql "sync" -// qt_insert """ select * from mutable_datatype order by c_bigint, c_double, c_string, c_date, c_timestamp, c_boolean, c_short_decimal""" + def tables=["datatype", "mutable_datatype"] + + for (String table in tables) { + sql """ DROP TABLE IF EXISTS $table """ + } + + for (String table in tables) { + sql new File("""${context.file.parent}/ddl/${table}.sql""").text + } + + streamLoad { + // you can skip declare db, because a default db already specify in ${DORIS_HOME}/conf/regression-conf.groovy + // db 'regression_test' + table 'datatype' + + // default label is UUID: + // set 'label' UUID.randomUUID().toString() + + // default column_separator is specify in doris fe config, usually is '\t'. + // this line change to ',' + set 'column_separator', '|' + + // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. + // also, you can stream load a http stream, e.g. http://xxx/some.csv + file 'datetype.csv' + + time 10000 // limit inflight 10s + + // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + + sql """ insert into mutable_datatype select c_bigint, c_double, c_string, c_date, c_timestamp, c_boolean, c_short_decimal, c_long_decimal from datatype where c_double < 20 """ + sql """ insert into mutable_datatype select 1, c_double, 'abc', cast('2014-01-01' as date), c_timestamp, FALSE, '123.22', '123456789012345678.012345678' from datatype """ + sql """ insert into mutable_datatype select 1, cast(2.2 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), false, '123.22', '123456789012345678.012345678' from datatype """ + sql """ insert into mutable_datatype select 1, cast(2.1 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), FALSE, '123.22', '123456789012345678.012345678' """ + sql """ insert into mutable_datatype values (null, null, null, null, null, null, null, null) """ + sql """ insert into mutable_datatype select count(*), cast(1.1 as double), 'a', cast('2016-01-01' as date), cast('2015-01-01 03:15:16' as datetime), FALSE, '-123.22', '-123456789012345678.012345678' from datatype group by c_bigint """ + sql """ insert into mutable_datatype select 5 * c_bigint, c_double + 15, c_string, c_date, c_timestamp, c_boolean, cast((c_short_decimal / '2.00') as decimal(5,2)), cast((c_long_decimal % '10') as decimal(27,9)) from datatype """ + sql """ insert into mutable_datatype select * from datatype """ + sql """ insert into mutable_datatype select * from mutable_datatype """ + sql """ insert into mutable_datatype select * from datatype union all select * from datatype """ + sql """ insert into mutable_datatype select * from datatype order by 1 limit 2 """ + sql """ insert into mutable_datatype select * from datatype where c_bigint < 0 """ + sql """ insert into mutable_datatype values(1,cast(2.34567 as double),'a',cast('2014-01-01' as date), cast ('2015-01-01 03:15:16' as datetime), TRUE, '123.22', '123456789012345678.012345678') """ + sql """ insert into mutable_datatype values(1, cast(2.1 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), FALSE, '-123.22', '-123456789012345678.012345678') """ + sql """ insert into mutable_datatype values(5 * 10, cast(4.1 + 5 as double), 'abc', cast('2014-01-01' as date), cast('2015-01-01 03:15:16' as datetime), TRUE, '123.22', '123456789012345678.012345678') """ + + sql "sync" + qt_insert """ select * from mutable_datatype order by c_bigint, c_double, c_string, c_date, c_timestamp, c_boolean, c_short_decimal""" multi_sql """