[fix](nereids) fix insert stmt throw MultiCastDataSink cannot be cast to DataStreamSink (#38526) (#38547)

fix `insert ... with ... select ...`, which not use some cte, and throw an exception:
```
errCode = 2, detailMessage = class org.apache.doris.planner.MultiCastDataSink cannot be cast to class org.apache.doris.planner.DataStreamSink (org.apache.doris.planner.MultiCastDataSink and org.apache.doris.planner.DataStreamSink are in unnamed module of loader 'app')
```
This commit is contained in:
924060929
2024-07-31 00:47:30 +08:00
committed by GitHub
parent 1ba1e343bd
commit a328e01d97
2 changed files with 52 additions and 1 deletions

View File

@ -38,6 +38,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalSink;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.DataStreamSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.MultiCastDataSink;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.qe.ConnectContext;
@ -147,7 +148,28 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
// set schema and partition info for tablet id shuffle exchange
if (fragment.getPlanRoot() instanceof ExchangeNode
&& fragment.getDataPartition().getType() == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) {
DataStreamSink dataStreamSink = (DataStreamSink) (fragment.getChild(0).getSink());
DataSink childFragmentSink = fragment.getChild(0).getSink();
DataStreamSink dataStreamSink = null;
if (childFragmentSink instanceof MultiCastDataSink) {
MultiCastDataSink multiCastDataSink = (MultiCastDataSink) childFragmentSink;
int outputExchangeId = (fragment.getPlanRoot()).getId().asInt();
// which DataStreamSink link to the output exchangeNode?
for (DataStreamSink currentDataStreamSink : multiCastDataSink.getDataStreamSinks()) {
int sinkExchangeId = currentDataStreamSink.getExchNodeId().asInt();
if (outputExchangeId == sinkExchangeId) {
dataStreamSink = currentDataStreamSink;
break;
}
}
if (dataStreamSink == null) {
throw new IllegalStateException("Can not find DataStreamSink in the MultiCastDataSink");
}
} else if (childFragmentSink instanceof DataStreamSink) {
dataStreamSink = (DataStreamSink) childFragmentSink;
} else {
throw new IllegalStateException("Unsupported DataSink: " + childFragmentSink);
}
Analyzer analyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get());
dataStreamSink.setTabletSinkSchemaParam(olapTableSink.createSchema(
database.getId(), olapTableSink.getDstTable(), analyzer));