From 299fcc443e87d1d4b31b06a696b700aa9a556942 Mon Sep 17 00:00:00 2001 From: Pxl Date: Wed, 6 Dec 2023 20:41:29 +0800 Subject: [PATCH] [Bug](agg-state) fix stream load failed on agg-state column (#28050) --- .../main/java/org/apache/doris/load/Load.java | 32 ++++++++++++++++--- regression-test/data/mv_p0/agg_state/test | 2 ++ .../mv_p0/agg_state/test_agg_state_max_by.out | 10 ++++++ .../agg_state/test_agg_state_max_by.groovy | 8 +++++ 4 files changed, 48 insertions(+), 4 deletions(-) create mode 100644 regression-test/data/mv_p0/agg_state/test diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index a3e25cde6b..ea50266539 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -232,8 +232,9 @@ public class Load { * -> * (A, B, C) SET (__doris_shadow_B = B) */ - ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), - new SlotRef(null, originCol)); + SlotRef slot = new SlotRef(null, originCol); + slot.setType(column.getType()); + ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(), slot); shadowColumnDescs.add(importColumnDesc); } } else { @@ -464,6 +465,30 @@ public class Load { LOG.debug("after init column, exprMap: {}", exprsByName); } + private static Expr getExprFromDesc(Analyzer analyzer, SlotDescriptor slotDesc, SlotRef slot) + throws AnalysisException { + SlotRef newSlot = new SlotRef(slotDesc); + newSlot.setType(slotDesc.getType()); + Expr rhs = newSlot; + rhs = rhs.castTo(slot.getType()); + + if (slot.getDesc() == null) { + // shadow column + return rhs; + } + + if (newSlot.isNullable() && !slot.isNullable()) { + rhs = new FunctionCallExpr("non_nullable", Lists.newArrayList(rhs)); + rhs.setType(slotDesc.getType()); + rhs.analyze(analyzer); + } else if (!newSlot.isNullable() && slot.isNullable()) { + rhs = new FunctionCallExpr("nullable", Lists.newArrayList(rhs)); + rhs.setType(slotDesc.getType()); + rhs.analyze(analyzer); + } + return rhs; + } + private static void analyzeAllExprs(Table tbl, Analyzer analyzer, Map exprsByName, Map mvDefineExpr, Map slotDescByName) throws UserException { // analyze all exprs @@ -521,8 +546,7 @@ public class Load { for (SlotRef slot : slots) { if (slotDescByName.get(slot.getColumnName()) != null) { smap.getLhs().add(slot); - smap.getRhs().add(new CastExpr(tbl.getColumn(slot.getColumnName()).getType(), - new SlotRef(slotDescByName.get(slot.getColumnName())))); + smap.getRhs().add(getExprFromDesc(analyzer, slotDescByName.get(slot.getColumnName()), slot)); } else if (exprsByName.get(slot.getColumnName()) != null) { smap.getLhs().add(slot); smap.getRhs().add(new CastExpr(tbl.getColumn(slot.getColumnName()).getType(), diff --git a/regression-test/data/mv_p0/agg_state/test b/regression-test/data/mv_p0/agg_state/test new file mode 100644 index 0000000000..668ff5ffcf --- /dev/null +++ b/regression-test/data/mv_p0/agg_state/test @@ -0,0 +1,2 @@ +100,200,300,lalala +111,-444,-4444,ddd \ No newline at end of file diff --git a/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out b/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out index 406e9fa334..fef8545d8c 100644 --- a/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out +++ b/regression-test/data/mv_p0/agg_state/test_agg_state_max_by.out @@ -5,20 +5,30 @@ 1 -3 \N c 1 1 1 a 1 2 2 b +100 200 300 lalala +111 -444 -4444 ddd -- !select_mv -- \N \N 1 2 +100 200 +111 -444 -- !select_mv -- \N \N 1 4 +100 500 +111 -4888 -- !select_mv -- \N \N 1 4 +100 500 +111 -4888 -- !select_mv -- \N \N 1 -4 +100 200 +111 -444 diff --git a/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy b/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy index 08c8e00e91..90ad5cfb98 100644 --- a/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy +++ b/regression-test/suites/mv_p0/agg_state/test_agg_state_max_by.groovy @@ -44,6 +44,14 @@ suite ("test_agg_state_max_by") { sql "insert into d_table select 1,-4,-4,'d';" + + streamLoad { + table "d_table" + set 'column_separator', ',' + file './test' + time 10000 // limit inflight 10s + } + qt_select_star "select * from d_table order by 1,2;" explain { sql("select k1,max_by(k2,k3) from d_table group by k1 order by 1,2;")