diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 176833b865..41e20df9ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -45,6 +45,7 @@ import org.apache.doris.common.util.DebugUtil; import org.apache.doris.common.util.Util; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; +import org.apache.doris.load.Load; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.DataSink; @@ -709,15 +710,16 @@ public class NativeInsertStmt extends InsertStmt { if (entry.second == null) { queryStmt.getResultExprs().add(queryStmt.getResultExprs().get(entry.first)); } else { - //substitute define expr slot with select statement result expr + // substitute define expr slot with select statement result expr ExprSubstitutionMap smap = new ExprSubstitutionMap(); List columns = entry.second.getRefColumns(); for (SlotRef slot : columns) { smap.getLhs().add(slot); - smap.getRhs().add(slotToIndex.get(slot.getColumnName())); + smap.getRhs() + .add(Load.getExprFromDesc(analyzer, slotToIndex.get(slot.getColumnName()), slot)); } - Expr e = Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()), - smap, analyzer, false).get(0); + Expr e = entry.second.getDefineExpr().clone(smap); + e.analyze(analyzer); queryStmt.getResultExprs().add(e); } } @@ -740,15 +742,16 @@ public class NativeInsertStmt extends InsertStmt { if (entry.second == null) { queryStmt.getBaseTblResultExprs().add(queryStmt.getBaseTblResultExprs().get(entry.first)); } else { - //substitute define expr slot with select statement result expr + // substitute define expr slot with select statement result expr ExprSubstitutionMap smap = new ExprSubstitutionMap(); List columns = entry.second.getRefColumns(); for (SlotRef slot : columns) { smap.getLhs().add(slot); - smap.getRhs().add(slotToIndex.get(slot.getColumnName())); + smap.getRhs() + .add(Load.getExprFromDesc(analyzer, slotToIndex.get(slot.getColumnName()), slot)); } - Expr e = Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()), - smap, analyzer, false).get(0); + Expr e = entry.second.getDefineExpr().clone(smap); + e.analyze(analyzer); queryStmt.getBaseTblResultExprs().add(e); } } @@ -835,7 +838,8 @@ public class NativeInsertStmt extends InsertStmt { List columns = entry.second.getRefColumns(); for (SlotRef slot : columns) { smap.getLhs().add(slot); - smap.getRhs().add(slotToIndex.get(slot.getColumnName())); + smap.getRhs() + .add(Load.getExprFromDesc(analyzer, slotToIndex.get(slot.getColumnName()), slot)); } extentedRow.add(Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()), smap, analyzer, false).get(0)); @@ -901,9 +905,7 @@ public class NativeInsertStmt extends InsertStmt { int numCols = targetColumns.size(); for (int i = 0; i < numCols; ++i) { Column col = targetColumns.get(i); - Expr expr = selectList.get(i).checkTypeCompatibility(col.getType()); - selectList.set(i, expr); - exprByName.put(col.getName(), expr); + exprByName.put(col.getName(), selectList.get(i)); } List> resultExprByName = Lists.newArrayList(); @@ -933,16 +935,7 @@ public class NativeInsertStmt extends InsertStmt { } continue; } else if (col.getDefineExpr() != null) { - // substitute define expr slot with select statement result expr - ExprSubstitutionMap smap = new ExprSubstitutionMap(); - List columns = col.getRefColumns(); - for (SlotRef slot : columns) { - smap.getLhs().add(slot); - smap.getRhs().add(slotToIndex.get(slot.getColumnName())); - } - targetExpr = Expr - .substituteList(Lists.newArrayList(col.getDefineExpr().clone()), smap, analyzer, false) - .get(0); + targetExpr = col.getDefineExpr().clone(); } else if (col.getDefaultValue() == null) { targetExpr = NullLiteral.create(col.getType()); } else { @@ -955,12 +948,25 @@ public class NativeInsertStmt extends InsertStmt { } } } + + List columns = col.getRefColumns(); + if (columns != null) { + // substitute define expr slot with select statement result expr + ExprSubstitutionMap smap = new ExprSubstitutionMap(); + for (SlotRef slot : columns) { + smap.getLhs().add(slot); + smap.getRhs().add(Load.getExprFromDesc(analyzer, slotToIndex.get(slot.getColumnName()), slot)); + } + targetExpr = targetExpr.clone(smap); + targetExpr.analyze(analyzer); + } resultExprByName.add(Pair.of(col.getName(), targetExpr)); slotToIndex.put(col.getName(), targetExpr); } resultExprs.addAll(resultExprByName.stream().map(Pair::value).collect(Collectors.toList())); } + private DataSink createDataSink() throws AnalysisException { if (dataSink != null) { return dataSink; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 8dd3a0ebc2..b27ce8c16f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -472,11 +472,14 @@ public class Load { LOG.debug("after init column, exprMap: {}", exprsByName); } - private static Expr getExprFromDesc(Analyzer analyzer, SlotDescriptor slotDesc, SlotRef slot) - throws AnalysisException { - SlotRef newSlot = new SlotRef(slotDesc); - newSlot.setType(slotDesc.getType()); - Expr rhs = newSlot; + private static SlotRef getSlotFromDesc(SlotDescriptor slotDesc) { + SlotRef slot = new SlotRef(slotDesc); + slot.setType(slotDesc.getType()); + return slot; + } + + public static Expr getExprFromDesc(Analyzer analyzer, Expr rhs, SlotRef slot) throws AnalysisException { + Type rhsType = rhs.getType(); rhs = rhs.castTo(slot.getType()); if (slot.getDesc() == null) { @@ -484,13 +487,13 @@ public class Load { return rhs; } - if (newSlot.isNullable() && !slot.isNullable()) { + if (rhs.isNullable() && !slot.isNullable()) { rhs = new FunctionCallExpr("non_nullable", Lists.newArrayList(rhs)); - rhs.setType(slotDesc.getType()); + rhs.setType(rhsType); rhs.analyze(analyzer); - } else if (!newSlot.isNullable() && slot.isNullable()) { + } else if (!rhs.isNullable() && slot.isNullable()) { rhs = new FunctionCallExpr("nullable", Lists.newArrayList(rhs)); - rhs.setType(slotDesc.getType()); + rhs.setType(rhsType); rhs.analyze(analyzer); } return rhs; @@ -553,7 +556,8 @@ public class Load { for (SlotRef slot : slots) { if (slotDescByName.get(slot.getColumnName()) != null) { smap.getLhs().add(slot); - smap.getRhs().add(getExprFromDesc(analyzer, slotDescByName.get(slot.getColumnName()), slot)); + smap.getRhs().add( + getExprFromDesc(analyzer, getSlotFromDesc(slotDescByName.get(slot.getColumnName())), slot)); } else if (exprsByName.get(slot.getColumnName()) != null) { smap.getLhs().add(slot); smap.getRhs().add(new CastExpr(tbl.getColumn(slot.getColumnName()).getType(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java index 89044987c9..4e8fb7a4ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java @@ -248,9 +248,14 @@ public class OriginalPlanner extends Planner { rootFragment.setSink(insertStmt.getDataSink()); insertStmt.complete(); List exprs = statement.getResultExprs(); - List resExprs = Expr.substituteList( - exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer, true); - rootFragment.setOutputExprs(resExprs); + if (analyzer.getContext().getConnectionId() == 0) { + // stream load tvf + rootFragment.setOutputExprs(exprs); + } else { + List resExprs = Expr.substituteList(exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer, + true); + rootFragment.setOutputExprs(resExprs); + } } else { List resExprs = Expr.substituteList(queryStmt.getResultExprs(), rootFragment.getPlanRoot().getOutputSmap(), analyzer, false); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java index 64c3494ba7..5074b0f380 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java @@ -36,7 +36,6 @@ import mockit.Injectable; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Test; import java.io.StringReader; import java.util.ArrayList; @@ -156,7 +155,6 @@ public class InsertStmtTest { @Injectable Table targetTable; - @Test public void testNormal() throws Exception { ConnectContext ctx = UtFrameUtils.createDefaultCtx(); String sql = "values(1,'a',2,'b')"; @@ -228,7 +226,6 @@ public class InsertStmtTest { Assert.assertEquals(queryStmtSubstitute.getResultExprs().get(1), slots.get(0)); } - @Test public void testInsertSelect() throws Exception { ConnectContext ctx = UtFrameUtils.createDefaultCtx(); String sql = "select kk1, kk2, kk3, kk4 from db.tbl"; diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java index 632b598815..3f7aaa7099 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryPlanTest.java @@ -2062,8 +2062,8 @@ public class QueryPlanTest extends TestWithFeService { String explainString = getSQLPlanOrErrorMsg(queryStr); Assert.assertFalse(explainString.contains("OUTPUT EXPRS:\n 3\n 4")); System.out.println(explainString); - Assert.assertTrue(explainString.contains( - "OUTPUT EXPRS:\n" + " CAST( 3 AS INT)\n" + " CAST( 4 AS INT)")); + Assert.assertTrue(explainString, explainString + .contains("OUTPUT EXPRS:\n" + " CAST(`a`.`aid` AS INT)\n" + " CAST(`b`.`bid` AS INT)")); } @Test diff --git a/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out b/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out index fef8545d8c..f8b29bc36f 100644 --- a/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out +++ b/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out @@ -6,6 +6,8 @@ 1 1 1 a 1 2 2 b 100 200 300 lalala +100 200 300 lalala +111 -444 -4444 ddd 111 -444 -4444 ddd -- !select_mv -- @@ -14,6 +16,23 @@ 100 200 111 -444 +-- !select_star -- +\N 4 \N d +\N 4 \N d +\N 4 \N d +1 -4 -4 d +1 -3 \N c +1 1 1 a +1 2 2 b +100 200 300 lalala +100 200 300 lalala +100 200 300 lalala +100 200 300 lalala +111 -444 -4444 ddd +111 -444 -4444 ddd +111 -444 -4444 ddd +111 -444 -4444 ddd + -- !select_mv -- \N \N 1 4 diff --git a/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy b/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy index 90ad5cfb98..8c8083e343 100644 --- a/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy +++ b/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy @@ -52,6 +52,17 @@ suite ("test_agg_state_max_by") { time 10000 // limit inflight 10s } + streamLoad { + set 'version', '1' + set 'sql', """ + insert into regression_test_mv_p0_agg_state.d_table select * from http_stream + ("format"="csv", "column_separator"=",") + """ + file './test' + + time 10000 // limit inflight 10s + } + qt_select_star "select * from d_table order by 1,2;" explain { sql("select k1,max_by(k2,k3) from d_table group by k1 order by 1,2;") @@ -67,6 +78,26 @@ suite ("test_agg_state_max_by") { sql "set enable_nereids_dml = true" sql "insert into d_table(k4,k2) values('d',4);" + streamLoad { + table "d_table" + set 'column_separator', ',' + file './test' + time 10000 // limit inflight 10s + } + + streamLoad { + set 'version', '1' + set 'sql', """ + insert into regression_test_mv_p0_agg_state.d_table select * from http_stream + ("format"="csv", "column_separator"=",") + """ + file './test' + + time 10000 // limit inflight 10s + } + + qt_select_star "select * from d_table order by 1,2;" + explain { sql("select k1,max_by(k2+k3,abs(k3)) from d_table group by k1 order by 1,2;") contains "(k1mbcp1)"