[Fix](partial-update) Fix wrong column number passing to BE when partial and enable nereids (#31461)

* Problem:
Inconsistent behavior occurs when executing partial column update `UPDATE` statements and `INSERT` statements on merge-on-write tables with the Nereids optimizer enabled. The number of columns passed to BE differs; `UPDATE` operations incorrectly pass all columns, while `INSERT` operations correctly pass only the updated columns.

Reason:
The Nereids optimizer does not handle partial column update `UPDATE` statements properly. The processing logic for `UPDATE` statements rewrites them as equivalent `INSERT` statements, which are then processed according to the logic of `INSERT` statements. For example, assuming a MoW table structure with columns k1, k2, v1, v2, the correct rewrite should be:
* `UPDATE` table t1 set v1 = v1 + 1 where k1 = 1 and k2 = 2
 * =>
 * `INSERT` into table (v1) select v1 + 1 from table t1 where k1 = 1 and k2 = 2

However, the actual rewriting process does not consider the logic for partial column updates, leading to all columns being included in the `INSERT` statement, i.e., the result is:
* `INSERT` into table (k1, k2, v1, v2) select k1, k2, v1 + 1, v2 from table t1 where k1 = 1 and k2 = 2

This results in `UPDATE` operations incorrectly passing all columns to BE.

Solution:
Having analyzed the cause, the solution is straightforward: when rewriting partial column update `UPDATE` statements to `INSERT` statements, only retain the updated columns and all key columns (as partial column updates must include all key columns). Additionally, this PR includes error injection cases to verify the number of columns passed to BE is correct.

* 2

* 3

* 4

* 5
This commit is contained in:
abmdocrt
2024-03-08 13:47:56 +08:00
committed by yiguolei
parent e8aa5ee7d5
commit 609761567c
10 changed files with 263 additions and 28 deletions

View File

@ -49,6 +49,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -103,10 +104,17 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab
checkTable(ctx);
Map<String, Expression> colNameToExpression = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
Map<String, Expression> partialUpdateColNameToExpression = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
for (EqualTo equalTo : assignments) {
List<String> nameParts = ((UnboundSlot) equalTo.left()).getNameParts();
checkAssignmentColumn(ctx, nameParts);
colNameToExpression.put(nameParts.get(nameParts.size() - 1), equalTo.right());
partialUpdateColNameToExpression.put(nameParts.get(nameParts.size() - 1), equalTo.right());
}
// check if any key in update clause
if (targetTable.getFullSchema().stream().filter(Column::isKey)
.anyMatch(column -> partialUpdateColNameToExpression.containsKey(column.getName()))) {
throw new AnalysisException("Only value columns of unique table could be updated");
}
List<NamedExpression> selectItems = Lists.newArrayList();
String tableName = tableAlias != null ? tableAlias : targetTable.getName();
@ -147,16 +155,40 @@ public class UpdateCommand extends Command implements ForwardWithSync, Explainab
+ String.join(", ", colNameToExpression.keySet()));
}
logicalQuery = new LogicalProject<>(selectItems, logicalQuery);
boolean isPartialUpdate = targetTable.getEnableUniqueKeyMergeOnWrite()
&& selectItems.size() < targetTable.getColumns().size()
&& !targetTable.hasVariantColumns() && targetTable.getSequenceCol() == null
&& partialUpdateColNameToExpression.size() <= targetTable.getFullSchema().size() * 3 / 10;
List<String> partialUpdateColNames = new ArrayList<>();
List<NamedExpression> partialUpdateSelectItems = new ArrayList<>();
if (isPartialUpdate) {
for (Column column : targetTable.getFullSchema()) {
Expression expr = new NereidsParser().parseExpression(tableName + "." + column.getName());
boolean existInExpr = false;
for (String colName : partialUpdateColNameToExpression.keySet()) {
if (colName.equalsIgnoreCase(column.getName())) {
expr = partialUpdateColNameToExpression.get(column.getName());
existInExpr = true;
break;
}
}
if (column.isKey() || existInExpr) {
partialUpdateSelectItems.add(expr instanceof UnboundSlot
? ((NamedExpression) expr)
: new UnboundAlias(expr));
partialUpdateColNames.add(column.getName());
}
}
}
logicalQuery = new LogicalProject<>(isPartialUpdate ? partialUpdateSelectItems : selectItems, logicalQuery);
if (cte.isPresent()) {
logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery));
}
boolean isPartialUpdate = targetTable.getEnableUniqueKeyMergeOnWrite()
&& selectItems.size() < targetTable.getColumns().size()
&& !targetTable.hasVariantColumns();
// make UnboundTableSink
return new UnboundTableSink<>(nameParts, ImmutableList.of(), ImmutableList.of(),
return new UnboundTableSink<>(nameParts, isPartialUpdate ? partialUpdateColNames : ImmutableList.of(),
ImmutableList.of(),
false, ImmutableList.of(), isPartialUpdate, DMLCommandType.UPDATE, logicalQuery);
}