[Bug](load) fix load failed on stream load tvf into agg state (#28420)

fix load failed on stream load tvf into agg state
This commit is contained in:
Pxl
2024-01-04 17:38:31 +08:00
committed by GitHub
parent e0e34b8f93
commit 441fb49345
7 changed files with 102 additions and 40 deletions

View File

@ -45,6 +45,7 @@ import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.load.Load;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSink;
@ -709,15 +710,16 @@ public class NativeInsertStmt extends InsertStmt {
if (entry.second == null) {
queryStmt.getResultExprs().add(queryStmt.getResultExprs().get(entry.first));
} else {
//substitute define expr slot with select statement result expr
// substitute define expr slot with select statement result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> columns = entry.second.getRefColumns();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
smap.getRhs().add(slotToIndex.get(slot.getColumnName()));
smap.getRhs()
.add(Load.getExprFromDesc(analyzer, slotToIndex.get(slot.getColumnName()), slot));
}
Expr e = Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()),
smap, analyzer, false).get(0);
Expr e = entry.second.getDefineExpr().clone(smap);
e.analyze(analyzer);
queryStmt.getResultExprs().add(e);
}
}
@ -740,15 +742,16 @@ public class NativeInsertStmt extends InsertStmt {
if (entry.second == null) {
queryStmt.getBaseTblResultExprs().add(queryStmt.getBaseTblResultExprs().get(entry.first));
} else {
//substitute define expr slot with select statement result expr
// substitute define expr slot with select statement result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> columns = entry.second.getRefColumns();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
smap.getRhs().add(slotToIndex.get(slot.getColumnName()));
smap.getRhs()
.add(Load.getExprFromDesc(analyzer, slotToIndex.get(slot.getColumnName()), slot));
}
Expr e = Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()),
smap, analyzer, false).get(0);
Expr e = entry.second.getDefineExpr().clone(smap);
e.analyze(analyzer);
queryStmt.getBaseTblResultExprs().add(e);
}
}
@ -835,7 +838,8 @@ public class NativeInsertStmt extends InsertStmt {
List<SlotRef> columns = entry.second.getRefColumns();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
smap.getRhs().add(slotToIndex.get(slot.getColumnName()));
smap.getRhs()
.add(Load.getExprFromDesc(analyzer, slotToIndex.get(slot.getColumnName()), slot));
}
extentedRow.add(Expr.substituteList(Lists.newArrayList(entry.second.getDefineExpr()),
smap, analyzer, false).get(0));
@ -901,9 +905,7 @@ public class NativeInsertStmt extends InsertStmt {
int numCols = targetColumns.size();
for (int i = 0; i < numCols; ++i) {
Column col = targetColumns.get(i);
Expr expr = selectList.get(i).checkTypeCompatibility(col.getType());
selectList.set(i, expr);
exprByName.put(col.getName(), expr);
exprByName.put(col.getName(), selectList.get(i));
}
List<Pair<String, Expr>> resultExprByName = Lists.newArrayList();
@ -933,16 +935,7 @@ public class NativeInsertStmt extends InsertStmt {
}
continue;
} else if (col.getDefineExpr() != null) {
// substitute define expr slot with select statement result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
List<SlotRef> columns = col.getRefColumns();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
smap.getRhs().add(slotToIndex.get(slot.getColumnName()));
}
targetExpr = Expr
.substituteList(Lists.newArrayList(col.getDefineExpr().clone()), smap, analyzer, false)
.get(0);
targetExpr = col.getDefineExpr().clone();
} else if (col.getDefaultValue() == null) {
targetExpr = NullLiteral.create(col.getType());
} else {
@ -955,12 +948,25 @@ public class NativeInsertStmt extends InsertStmt {
}
}
}
List<SlotRef> columns = col.getRefColumns();
if (columns != null) {
// substitute define expr slot with select statement result expr
ExprSubstitutionMap smap = new ExprSubstitutionMap();
for (SlotRef slot : columns) {
smap.getLhs().add(slot);
smap.getRhs().add(Load.getExprFromDesc(analyzer, slotToIndex.get(slot.getColumnName()), slot));
}
targetExpr = targetExpr.clone(smap);
targetExpr.analyze(analyzer);
}
resultExprByName.add(Pair.of(col.getName(), targetExpr));
slotToIndex.put(col.getName(), targetExpr);
}
resultExprs.addAll(resultExprByName.stream().map(Pair::value).collect(Collectors.toList()));
}
private DataSink createDataSink() throws AnalysisException {
if (dataSink != null) {
return dataSink;

View File

@ -472,11 +472,14 @@ public class Load {
LOG.debug("after init column, exprMap: {}", exprsByName);
}
private static Expr getExprFromDesc(Analyzer analyzer, SlotDescriptor slotDesc, SlotRef slot)
throws AnalysisException {
SlotRef newSlot = new SlotRef(slotDesc);
newSlot.setType(slotDesc.getType());
Expr rhs = newSlot;
private static SlotRef getSlotFromDesc(SlotDescriptor slotDesc) {
SlotRef slot = new SlotRef(slotDesc);
slot.setType(slotDesc.getType());
return slot;
}
public static Expr getExprFromDesc(Analyzer analyzer, Expr rhs, SlotRef slot) throws AnalysisException {
Type rhsType = rhs.getType();
rhs = rhs.castTo(slot.getType());
if (slot.getDesc() == null) {
@ -484,13 +487,13 @@ public class Load {
return rhs;
}
if (newSlot.isNullable() && !slot.isNullable()) {
if (rhs.isNullable() && !slot.isNullable()) {
rhs = new FunctionCallExpr("non_nullable", Lists.newArrayList(rhs));
rhs.setType(slotDesc.getType());
rhs.setType(rhsType);
rhs.analyze(analyzer);
} else if (!newSlot.isNullable() && slot.isNullable()) {
} else if (!rhs.isNullable() && slot.isNullable()) {
rhs = new FunctionCallExpr("nullable", Lists.newArrayList(rhs));
rhs.setType(slotDesc.getType());
rhs.setType(rhsType);
rhs.analyze(analyzer);
}
return rhs;
@ -553,7 +556,8 @@ public class Load {
for (SlotRef slot : slots) {
if (slotDescByName.get(slot.getColumnName()) != null) {
smap.getLhs().add(slot);
smap.getRhs().add(getExprFromDesc(analyzer, slotDescByName.get(slot.getColumnName()), slot));
smap.getRhs().add(
getExprFromDesc(analyzer, getSlotFromDesc(slotDescByName.get(slot.getColumnName())), slot));
} else if (exprsByName.get(slot.getColumnName()) != null) {
smap.getLhs().add(slot);
smap.getRhs().add(new CastExpr(tbl.getColumn(slot.getColumnName()).getType(),

View File

@ -248,9 +248,14 @@ public class OriginalPlanner extends Planner {
rootFragment.setSink(insertStmt.getDataSink());
insertStmt.complete();
List<Expr> exprs = statement.getResultExprs();
List<Expr> resExprs = Expr.substituteList(
exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer, true);
rootFragment.setOutputExprs(resExprs);
if (analyzer.getContext().getConnectionId() == 0) {
// stream load tvf
rootFragment.setOutputExprs(exprs);
} else {
List<Expr> resExprs = Expr.substituteList(exprs, rootFragment.getPlanRoot().getOutputSmap(), analyzer,
true);
rootFragment.setOutputExprs(resExprs);
}
} else {
List<Expr> resExprs = Expr.substituteList(queryStmt.getResultExprs(),
rootFragment.getPlanRoot().getOutputSmap(), analyzer, false);

View File

@ -36,7 +36,6 @@ import mockit.Injectable;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.StringReader;
import java.util.ArrayList;
@ -156,7 +155,6 @@ public class InsertStmtTest {
@Injectable
Table targetTable;
@Test
public void testNormal() throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
String sql = "values(1,'a',2,'b')";
@ -228,7 +226,6 @@ public class InsertStmtTest {
Assert.assertEquals(queryStmtSubstitute.getResultExprs().get(1), slots.get(0));
}
@Test
public void testInsertSelect() throws Exception {
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
String sql = "select kk1, kk2, kk3, kk4 from db.tbl";

View File

@ -2062,8 +2062,8 @@ public class QueryPlanTest extends TestWithFeService {
String explainString = getSQLPlanOrErrorMsg(queryStr);
Assert.assertFalse(explainString.contains("OUTPUT EXPRS:\n 3\n 4"));
System.out.println(explainString);
Assert.assertTrue(explainString.contains(
"OUTPUT EXPRS:\n" + " CAST(<slot 4> <slot 2> 3 AS INT)\n" + " CAST(<slot 5> <slot 3> 4 AS INT)"));
Assert.assertTrue(explainString, explainString
.contains("OUTPUT EXPRS:\n" + " CAST(`a`.`aid` AS INT)\n" + " CAST(`b`.`bid` AS INT)"));
}
@Test

View File

@ -6,6 +6,8 @@
1 1 1 a
1 2 2 b
100 200 300 lalala
100 200 300 lalala
111 -444 -4444 ddd
111 -444 -4444 ddd
-- !select_mv --
@ -14,6 +16,23 @@
100 200
111 -444
-- !select_star --
\N 4 \N d
\N 4 \N d
\N 4 \N d
1 -4 -4 d
1 -3 \N c
1 1 1 a
1 2 2 b
100 200 300 lalala
100 200 300 lalala
100 200 300 lalala
100 200 300 lalala
111 -444 -4444 ddd
111 -444 -4444 ddd
111 -444 -4444 ddd
111 -444 -4444 ddd
-- !select_mv --
\N \N
1 4

View File

@ -52,6 +52,17 @@ suite ("test_agg_state_max_by") {
time 10000 // limit inflight 10s
}
streamLoad {
set 'version', '1'
set 'sql', """
insert into regression_test_mv_p0_agg_state.d_table select * from http_stream
("format"="csv", "column_separator"=",")
"""
file './test'
time 10000 // limit inflight 10s
}
qt_select_star "select * from d_table order by 1,2;"
explain {
sql("select k1,max_by(k2,k3) from d_table group by k1 order by 1,2;")
@ -67,6 +78,26 @@ suite ("test_agg_state_max_by") {
sql "set enable_nereids_dml = true"
sql "insert into d_table(k4,k2) values('d',4);"
streamLoad {
table "d_table"
set 'column_separator', ','
file './test'
time 10000 // limit inflight 10s
}
streamLoad {
set 'version', '1'
set 'sql', """
insert into regression_test_mv_p0_agg_state.d_table select * from http_stream
("format"="csv", "column_separator"=",")
"""
file './test'
time 10000 // limit inflight 10s
}
qt_select_star "select * from d_table order by 1,2;"
explain {
sql("select k1,max_by(k2+k3,abs(k3)) from d_table group by k1 order by 1,2;")
contains "(k1mbcp1)"