Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jul 30, 2024
1 parent 0b43fa9 commit c32d296
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -139,7 +140,28 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys
// set schema and partition info for tablet id shuffle exchange
if (fragment.getPlanRoot() instanceof ExchangeNode
&& fragment.getDataPartition().getType() == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) {
DataStreamSink dataStreamSink = (DataStreamSink) (fragment.getChild(0).getSink());
DataSink childFragmentSink = fragment.getChild(0).getSink();
DataStreamSink dataStreamSink = null;
if (childFragmentSink instanceof MultiCastDataSink) {
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) childFragmentSink;
int outputExchangeId = (fragment.getPlanRoot()).getId().asInt();
// which DataStreamSink link to the output exchangeNode?
for (DataStreamSink currentDataStreamSink : multiCastDataSink.getDataStreamSinks()) {
int sinkExchangeId = currentDataStreamSink.getExchNodeId().asInt();
if (outputExchangeId == sinkExchangeId) {
dataStreamSink = currentDataStreamSink;
break;
}
}
if (dataStreamSink == null) {
throw new IllegalStateException("Can not find DataStreamSink in the MultiCastDataSink");
}
} else if (childFragmentSink instanceof DataStreamSink) {
dataStreamSink = (DataStreamSink) childFragmentSink;
} else {
throw new IllegalStateException("Unsupported DataSink: " + childFragmentSink);
}

Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get());
dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema(
database.getId(), olapTableSink.getDstTable(), analyzer));
Expand Down
29 changes: 29 additions & 0 deletions regression-test/suites/insert_p0/insert.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,33 @@ suite("insert") {
sql "sync"
qt_insert """ select * from mutable_datatype order by c_bigint, c_double, c_string, c_date, c_timestamp, c_boolean, c_short_decimal"""


multi_sql """
drop table if exists table_select_test1;
CREATE TABLE table_select_test1 (
`id` int
)
distributed by hash(id)
properties('replication_num'='1');
insert into table_select_test1 values(2);
drop table if exists table_test_insert1;
create table table_test_insert1 (id int)
partition by range(id)
(
partition p1 values[('1'), ('50')),
partition p2 values[('50'), ('100'))
)
distributed by hash(id) buckets 100
properties('replication_num'='1')
insert into table_test_insert1 values(1), (10);
insert into table_test_insert1
with
a as (select * from table_select_test1 where id > 10),
b as (select * from a union all select * from a)
select id from b;
"""
}

0 comments on commit c32d296

Please sign in to comment.