[fix](partial update) Fix some bugs about partial update (#28358)

This commit is contained in:
bobhan1
2023-12-15 00:04:29 +08:00
committed by GitHub
parent 8ca7bd8f98
commit 415c6d854d
4 changed files with 32 additions and 28 deletions

View File

@ -1021,7 +1021,7 @@ public class NativeInsertStmt extends InsertStmt {
throw new DdlException("txn does not exist: " + transactionId);
}
txnState.addTableIndexes((OlapTable) targetTable);
if (!isFromDeleteOrUpdateStmt && isPartialUpdate) {
if (isPartialUpdate) {
txnState.setSchemaForPartialUpdate((OlapTable) targetTable);
}
}

View File

@ -105,31 +105,6 @@ public class BindSink implements AnalysisRuleFactory {
sink.getDMLCommandType(),
child);
if (isPartialUpdate) {
// check the necessary conditions for partial updates
if (!table.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("Partial update is only allowed on "
+ "unique table with merge-on-write enabled.");
}
if (sink.getColNames().isEmpty() && sink.getDMLCommandType() == DMLCommandType.INSERT) {
throw new AnalysisException("You must explicitly specify the columns to be updated when "
+ "updating partial columns using the INSERT statement.");
}
for (Column col : table.getFullSchema()) {
boolean exists = false;
for (Column insertCol : boundSink.getCols()) {
if (insertCol.getName().equals(col.getName())) {
exists = true;
break;
}
}
if (col.isKey() && !exists) {
throw new AnalysisException("Partial update should include all key columns, missing: "
+ col.getName());
}
}
}
// we need to insert all the columns of the target table
// although some columns are not mentions.
// so we add a projects to supply the default value.

View File

@ -23,6 +23,7 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
@ -52,6 +53,7 @@ import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.types.DataType;
@ -91,6 +93,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
/**
@ -175,7 +178,7 @@ public class InsertExecutor {
throw new AnalysisException("txn does not exist: " + txnId);
}
state.addTableIndexes((OlapTable) table);
if (isPartialUpdate && isFromInsert) {
if (isPartialUpdate) {
state.setSchemaForPartialUpdate((OlapTable) table);
}
}
@ -495,6 +498,31 @@ public class InsertExecutor {
*/
public static Plan normalizePlan(Plan plan, TableIf table) {
UnboundTableSink<? extends Plan> unboundTableSink = (UnboundTableSink<? extends Plan>) plan;
if (table instanceof OlapTable && ((OlapTable) table).getKeysType() == KeysType.UNIQUE_KEYS
&& unboundTableSink.isPartialUpdate()) {
// check the necessary conditions for partial updates
OlapTable olapTable = (OlapTable) table;
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("Partial update is only allowed on "
+ "unique table with merge-on-write enabled.");
}
if (unboundTableSink.getDMLCommandType() == DMLCommandType.INSERT) {
if (unboundTableSink.getColNames().isEmpty()) {
throw new AnalysisException("You must explicitly specify the columns to be updated when "
+ "updating partial columns using the INSERT statement.");
}
for (Column col : olapTable.getFullSchema()) {
Optional<String> insertCol = unboundTableSink.getColNames().stream()
.filter(c -> c.equalsIgnoreCase(col.getName())).findFirst();
if (col.isKey() && !insertCol.isPresent()) {
throw new AnalysisException("Partial update should include all key columns, missing: "
+ col.getName());
}
}
}
}
Plan query = unboundTableSink.child();
if (!(query instanceof LogicalInlineTable)) {
return plan;
@ -502,6 +530,7 @@ public class InsertExecutor {
LogicalInlineTable logicalInlineTable = (LogicalInlineTable) query;
ImmutableList.Builder<LogicalPlan> oneRowRelationBuilder = ImmutableList.builder();
List<Column> columns = table.getBaseSchema(false);
for (List<NamedExpression> values : logicalInlineTable.getConstantExprsList()) {
ImmutableList.Builder<NamedExpression> constantExprs = ImmutableList.builder();
if (values.isEmpty()) {