[feature](partial update) support MOW partial update for insert statement (#21597)

This commit is contained in:
bobhan1
2023-09-16 17:11:59 +08:00
committed by GitHub
parent 643d09de06
commit ed8db3727c
24 changed files with 1078 additions and 33 deletions

View File

@ -203,6 +203,7 @@ public class DeleteStmt extends DdlStmt {
null,
isPartialUpdate,
false);
((NativeInsertStmt) insertStmt).setIsFromDeleteOrUpdateStmt(true);
}
private void analyzeTargetTable(Analyzer analyzer) throws UserException {

View File

@ -167,6 +167,7 @@ public class NativeInsertStmt extends InsertStmt {
public boolean isInnerGroupCommit = false;
private boolean isInsertIgnore = false;
private boolean isFromDeleteOrUpdateStmt = false;
public NativeInsertStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
List<String> hints) {
@ -405,8 +406,10 @@ public class NativeInsertStmt extends InsertStmt {
OlapTableSink sink = (OlapTableSink) dataSink;
TUniqueId loadId = analyzer.getContext().queryId();
int sendBatchParallelism = analyzer.getContext().getSessionVariable().getSendBatchParallelism();
boolean isInsertStrict = analyzer.getContext().getSessionVariable().getEnableInsertStrict()
&& !isFromDeleteOrUpdateStmt;
sink.init(loadId, transactionId, db.getId(), timeoutSecond,
sendBatchParallelism, false, false, isInsertIgnore);
sendBatchParallelism, false, isInsertStrict, isInsertIgnore);
}
}
@ -618,6 +621,10 @@ public class NativeInsertStmt extends InsertStmt {
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_VALUE_COUNT);
}
if (analyzer.getContext().getSessionVariable().isEnableUniqueKeyPartialUpdate()) {
trySetPartialUpdate();
}
// Check if all columns mentioned is enough
checkColumnCoverage(mentionedColumns, targetTable.getBaseSchema());
@ -1098,4 +1105,54 @@ public class NativeInsertStmt extends InsertStmt {
public ByteString getRangeBytes() {
return rangeBytes;
}
public void setIsFromDeleteOrUpdateStmt(boolean isFromDeleteOrUpdateStmt) {
this.isFromDeleteOrUpdateStmt = isFromDeleteOrUpdateStmt;
}
private void trySetPartialUpdate() throws UserException {
if (isFromDeleteOrUpdateStmt || isPartialUpdate || !(targetTable instanceof OlapTable)) {
return;
}
OlapTable olapTable = (OlapTable) targetTable;
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
throw new UserException("Partial update is only allowed in unique table with merge-on-write enabled.");
}
for (Column col : olapTable.getFullSchema()) {
boolean exists = false;
for (Column insertCol : targetColumns) {
if (insertCol.getName() != null && insertCol.getName().equals(col.getName())) {
if (!col.isVisible() && !Column.DELETE_SIGN.equals(col.getName())) {
throw new UserException("Partial update should not include invisible column except"
+ " delete sign column: " + col.getName());
}
exists = true;
break;
}
}
if (col.isKey() && !exists) {
throw new UserException("Partial update should include all key columns, missing: " + col.getName());
}
}
isPartialUpdate = true;
partialUpdateCols.addAll(targetColumnNames);
if (isPartialUpdate && olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null
&& partialUpdateCols.contains(olapTable.getSequenceMapCol())) {
partialUpdateCols.add(Column.SEQUENCE_COL);
}
// we should re-generate olapTuple
DescriptorTable descTable = analyzer.getDescTbl();
olapTuple = descTable.createTupleDescriptor();
for (Column col : olapTable.getFullSchema()) {
if (!partialUpdateCols.contains(col.getName())) {
continue;
}
SlotDescriptor slotDesc = descTable.addSlotDescriptor(olapTuple);
slotDesc.setIsMaterialized(true);
slotDesc.setType(col.getType());
slotDesc.setColumn(col);
slotDesc.setIsNullable(col.isAllowNull());
}
}
}

View File

@ -127,6 +127,7 @@ public class UpdateStmt extends DdlStmt {
null,
isPartialUpdate,
false);
((NativeInsertStmt) insertStmt).setIsFromDeleteOrUpdateStmt(true);
}
private void analyzeTargetTable(Analyzer analyzer) throws UserException {

View File

@ -47,29 +47,39 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
private final List<String> hints;
private final List<String> partitions;
private final boolean isPartialUpdate;
private final boolean isFromNativeInsertStmt;
public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, false, Optional.empty(), Optional.empty(), child);
this(nameParts, colNames, hints, partitions, false, false, Optional.empty(), Optional.empty(), child);
}
public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, boolean isPartialUpdate, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, isPartialUpdate, Optional.empty(), Optional.empty(), child);
this(nameParts, colNames, hints, partitions, isPartialUpdate, false,
Optional.empty(), Optional.empty(), child);
}
public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, boolean isPartialUpdate, boolean isFromNativeInsertStmt, CHILD_TYPE child) {
this(nameParts, colNames, hints, partitions, isPartialUpdate, isFromNativeInsertStmt,
Optional.empty(), Optional.empty(), child);
}
/**
* constructor
*/
public UnboundOlapTableSink(List<String> nameParts, List<String> colNames, List<String> hints,
List<String> partitions, boolean isPartialUpdate, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
List<String> partitions, boolean isPartialUpdate, boolean isFromNativeInsertStmt,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
super(PlanType.LOGICAL_UNBOUND_OLAP_TABLE_SINK, ImmutableList.of(), groupExpression, logicalProperties, child);
this.nameParts = Utils.copyRequiredList(nameParts);
this.colNames = Utils.copyRequiredList(colNames);
this.hints = Utils.copyRequiredList(hints);
this.partitions = Utils.copyRequiredList(partitions);
this.isPartialUpdate = isPartialUpdate;
this.isFromNativeInsertStmt = isFromNativeInsertStmt;
}
public List<String> getColNames() {
@ -92,11 +102,15 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
return isPartialUpdate;
}
public boolean isFromNativeInsertStmt() {
return isFromNativeInsertStmt;
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "UnboundOlapTableSink only accepts one child");
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, isPartialUpdate,
groupExpression, Optional.of(getLogicalProperties()), children.get(0));
isFromNativeInsertStmt, groupExpression, Optional.of(getLogicalProperties()), children.get(0));
}
@Override
@ -131,15 +145,15 @@ public class UnboundOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions,
isPartialUpdate, groupExpression, Optional.of(getLogicalProperties()), child());
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions, isPartialUpdate,
isFromNativeInsertStmt, groupExpression, Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new UnboundOlapTableSink<>(nameParts, colNames, hints, partitions,
isPartialUpdate, groupExpression, logicalProperties, children.get(0));
isPartialUpdate, isFromNativeInsertStmt, groupExpression, logicalProperties, children.get(0));
}
@Override

View File

@ -367,11 +367,34 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
olapTableSink.getPartitionIds().isEmpty() ? null : olapTableSink.getPartitionIds(),
olapTableSink.isSingleReplicaLoad()
);
if (olapTableSink.isPartialUpdate()) {
if (olapTableSink.isPartialUpdate() || (olapTableSink.isFromNativeInsertStmt()
&& ConnectContext.get().getSessionVariable().isEnableUniqueKeyPartialUpdate())) {
OlapTable olapTable = (OlapTable) olapTableSink.getTargetTable();
if (!olapTable.getEnableUniqueKeyMergeOnWrite()) {
throw new AnalysisException("Partial update is only allowed in"
+ "unique table with merge-on-write enabled.");
}
HashSet<String> partialUpdateCols = new HashSet<>();
for (Column col : olapTable.getFullSchema()) {
boolean exists = false;
for (Column insertCol : olapTableSink.getCols()) {
if (insertCol.getName() != null && insertCol.getName().equals(col.getName())) {
exists = true;
break;
}
}
if (col.isKey() && !exists) {
throw new AnalysisException("Partial update should include all key columns, missing: "
+ col.getName());
}
}
for (Column col : olapTableSink.getCols()) {
partialUpdateCols.add(col.getName());
}
if (olapTable.hasSequenceCol() && olapTable.getSequenceMapCol() != null
&& partialUpdateCols.contains(olapTable.getSequenceMapCol())) {
partialUpdateCols.add(Column.SEQUENCE_COL);
}
sink.setPartialUpdateInputColumns(true, partialUpdateCols);
}
rootFragment.setSink(sink);

View File

@ -411,6 +411,8 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
colNames,
ImmutableList.of(),
partitions,
false,
true,
visitQuery(ctx.query()));
if (ctx.explain() != null) {
return withExplain(sink, ctx.explain());

View File

@ -84,6 +84,7 @@ public class BindSink implements AnalysisRuleFactory {
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList()),
sink.isPartialUpdate(),
sink.isFromNativeInsertStmt(),
sink.child());
// we need to insert all the columns of the target table

View File

@ -40,6 +40,7 @@ public class LogicalOlapTableSinkToPhysicalOlapTableSink extends OneImplementati
sink.getCols(),
ctx.connectContext.getSessionVariable().isEnableSingleReplicaInsert(),
sink.isPartialUpdate(),
sink.isFromNativeInsertStmt(),
Optional.empty(),
sink.getLogicalProperties(),
sink.child());

View File

@ -250,6 +250,8 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync,
sink.getColNames(),
sink.getHints(),
tempPartitionNames,
sink.isPartialUpdate(),
sink.isFromNativeInsertStmt(),
(LogicalPlan) (sink.child(0)));
new InsertIntoTableCommand(copySink, labelName, false).run(ctx, executor);
if (ctx.getState().getStateType() == MysqlStateType.ERR) {

View File

@ -46,10 +46,12 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
private final List<Column> cols;
private final List<Long> partitionIds;
private final boolean isPartialUpdate;
private final boolean isFromNativeInsertStmt;
public LogicalOlapTableSink(Database database, OlapTable targetTable, List<Column> cols, List<Long> partitionIds,
List<NamedExpression> outputExprs, boolean isPartialUpdate, CHILD_TYPE child) {
this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate,
List<NamedExpression> outputExprs, boolean isPartialUpdate, boolean isFromNativeInsertStmt,
CHILD_TYPE child) {
this(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate, isFromNativeInsertStmt,
Optional.empty(), Optional.empty(), child);
}
@ -58,13 +60,14 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
*/
public LogicalOlapTableSink(Database database, OlapTable targetTable, List<Column> cols,
List<Long> partitionIds, List<NamedExpression> outputExprs, boolean isPartialUpdate,
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
CHILD_TYPE child) {
boolean isFromNativeInsertStmt, Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
super(PlanType.LOGICAL_OLAP_TABLE_SINK, outputExprs, groupExpression, logicalProperties, child);
this.database = Objects.requireNonNull(database, "database != null in LogicalOlapTableSink");
this.targetTable = Objects.requireNonNull(targetTable, "targetTable != null in LogicalOlapTableSink");
this.cols = Utils.copyRequiredList(cols);
this.isPartialUpdate = isPartialUpdate;
this.isFromNativeInsertStmt = isFromNativeInsertStmt;
this.partitionIds = Utils.copyRequiredList(partitionIds);
}
@ -73,14 +76,14 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
.map(NamedExpression.class::cast)
.collect(ImmutableList.toImmutableList());
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, output, isPartialUpdate,
Optional.empty(), Optional.empty(), child);
isFromNativeInsertStmt, Optional.empty(), Optional.empty(), child);
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "LogicalOlapTableSink only accepts one child");
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate,
Optional.empty(), Optional.empty(), children.get(0));
isFromNativeInsertStmt, Optional.empty(), Optional.empty(), children.get(0));
}
public Database getDatabase() {
@ -103,6 +106,10 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
return isPartialUpdate;
}
public boolean isFromNativeInsertStmt() {
return isFromNativeInsertStmt;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -115,14 +122,16 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
return false;
}
LogicalOlapTableSink<?> that = (LogicalOlapTableSink<?>) o;
return isPartialUpdate == that.isPartialUpdate && Objects.equals(database, that.database)
return isPartialUpdate == that.isPartialUpdate && isFromNativeInsertStmt == that.isFromNativeInsertStmt
&& Objects.equals(database, that.database)
&& Objects.equals(targetTable, that.targetTable) && Objects.equals(cols, that.cols)
&& Objects.equals(partitionIds, that.partitionIds);
}
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds, isPartialUpdate);
return Objects.hash(super.hashCode(), database, targetTable, cols, partitionIds,
isPartialUpdate, isFromNativeInsertStmt);
}
@Override
@ -133,13 +142,13 @@ public class LogicalOlapTableSink<CHILD_TYPE extends Plan> extends LogicalSink<C
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate,
groupExpression, Optional.of(getLogicalProperties()), child());
isFromNativeInsertStmt, groupExpression, Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new LogicalOlapTableSink<>(database, targetTable, cols, partitionIds, outputExprs, isPartialUpdate,
groupExpression, logicalProperties, children.get(0));
isFromNativeInsertStmt, groupExpression, logicalProperties, children.get(0));
}
}

View File

@ -59,14 +59,15 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
private final List<Long> partitionIds;
private final boolean singleReplicaLoad;
private final boolean isPartialUpdate;
private final boolean isFromNativeInsertStmt;
/**
* Constructor
*/
public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols,
boolean singleReplicaLoad, boolean isPartialUpdate, Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties, CHILD_TYPE child) {
this(database, targetTable, partitionIds, cols, singleReplicaLoad, isPartialUpdate,
boolean singleReplicaLoad, boolean isPartialUpdate, boolean isFromNativeInsertStmt,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) {
this(database, targetTable, partitionIds, cols, singleReplicaLoad, isPartialUpdate, isFromNativeInsertStmt,
groupExpression, logicalProperties, PhysicalProperties.GATHER, null, child);
}
@ -74,9 +75,9 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
* Constructor
*/
public PhysicalOlapTableSink(Database database, OlapTable targetTable, List<Long> partitionIds, List<Column> cols,
boolean singleReplicaLoad, boolean isPartialUpdate, Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics,
CHILD_TYPE child) {
boolean singleReplicaLoad, boolean isPartialUpdate, boolean isFromNativeInsertStmt,
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) {
super(PlanType.PHYSICAL_OLAP_TABLE_SINK, groupExpression, logicalProperties, physicalProperties,
statistics, child);
this.database = Objects.requireNonNull(database, "database != null in PhysicalOlapTableSink");
@ -85,6 +86,7 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
this.partitionIds = Utils.copyRequiredList(partitionIds);
this.singleReplicaLoad = singleReplicaLoad;
this.isPartialUpdate = isPartialUpdate;
this.isFromNativeInsertStmt = isFromNativeInsertStmt;
}
public Database getDatabase() {
@ -111,12 +113,16 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
return isPartialUpdate;
}
public boolean isFromNativeInsertStmt() {
return isFromNativeInsertStmt;
}
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1, "PhysicalOlapTableSink only accepts one child");
return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols,
singleReplicaLoad, isPartialUpdate, groupExpression, getLogicalProperties(), physicalProperties,
statistics, children.get(0));
singleReplicaLoad, isPartialUpdate, isFromNativeInsertStmt, groupExpression,
getLogicalProperties(), physicalProperties, statistics, children.get(0));
}
@Override
@ -130,6 +136,7 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
PhysicalOlapTableSink<?> that = (PhysicalOlapTableSink<?>) o;
return singleReplicaLoad == that.singleReplicaLoad
&& isPartialUpdate == that.isPartialUpdate
&& isFromNativeInsertStmt == that.isFromNativeInsertStmt
&& Objects.equals(database, that.database)
&& Objects.equals(targetTable, that.targetTable)
&& Objects.equals(cols, that.cols)
@ -138,7 +145,8 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
@Override
public int hashCode() {
return Objects.hash(database, targetTable, cols, partitionIds, singleReplicaLoad, isPartialUpdate);
return Objects.hash(database, targetTable, cols, partitionIds, singleReplicaLoad,
isPartialUpdate, isFromNativeInsertStmt);
}
@Override
@ -172,20 +180,21 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad,
isPartialUpdate, groupExpression, getLogicalProperties(), child());
isPartialUpdate, isFromNativeInsertStmt, groupExpression, getLogicalProperties(), child());
}
@Override
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad,
isPartialUpdate, groupExpression, logicalProperties.get(), children.get(0));
isPartialUpdate, isFromNativeInsertStmt, groupExpression, logicalProperties.get(), children.get(0));
}
@Override
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad,
isPartialUpdate, groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
isPartialUpdate, isFromNativeInsertStmt, groupExpression, getLogicalProperties(),
physicalProperties, statistics, child());
}
/**
@ -225,6 +234,7 @@ public class PhysicalOlapTableSink<CHILD_TYPE extends Plan> extends PhysicalSink
@Override
public PhysicalOlapTableSink<Plan> resetLogicalProperties() {
return new PhysicalOlapTableSink<>(database, targetTable, partitionIds, cols, singleReplicaLoad,
isPartialUpdate, groupExpression, null, physicalProperties, statistics, child());
isPartialUpdate, isFromNativeInsertStmt, groupExpression,
null, physicalProperties, statistics, child());
}
}

View File

@ -400,6 +400,8 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_MEMTABLE_ON_SINK_NODE =
"enable_memtable_on_sink_node";
public static final String ENABLE_UNIQUE_KEY_PARTIAL_UPDATE = "enable_unique_key_partial_update";
public static final String INVERTED_INDEX_CONJUNCTION_OPT_THRESHOLD = "inverted_index_conjunction_opt_threshold";
public static final String FULL_AUTO_ANALYZE_START_TIME = "full_auto_analyze_start_time";
@ -1192,6 +1194,9 @@ public class SessionVariable implements Serializable, Writable {
flag = VariableMgr.GLOBAL)
public String fullAutoAnalyzeEndTime = "";
@VariableMgr.VarAttr(name = ENABLE_UNIQUE_KEY_PARTIAL_UPDATE, needForward = false)
public boolean enableUniqueKeyPartialUpdate = false;
// If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables,
// not the default value set in the code.
public void initFuzzyModeVariables() {
@ -2198,6 +2203,14 @@ public class SessionVariable implements Serializable, Writable {
this.truncateCharOrVarcharColumns = truncateCharOrVarcharColumns;
}
public boolean isEnableUniqueKeyPartialUpdate() {
return enableUniqueKeyPartialUpdate;
}
public void setEnableUniqueKeyPartialUpdate(boolean enableUniqueKeyPartialUpdate) {
this.enableUniqueKeyPartialUpdate = enableUniqueKeyPartialUpdate;
}
/**
* Serialize to thrift object.
* Used for rest api.