[Feature](planner) use partial update in update from & delete from (#19262)
This commit is contained in:
@ -130,11 +130,15 @@ public class DeleteStmt extends DdlStmt {
|
||||
+ " Please check the following session variables: "
|
||||
+ String.join(", ", SessionVariable.DEBUG_VARIABLES));
|
||||
}
|
||||
boolean isMow = ((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite();
|
||||
for (Column column : targetTable.getColumns()) {
|
||||
Expr expr;
|
||||
// in mow, we can use partial update so we only need key column and delete sign
|
||||
if (!column.isVisible() && column.getName().equalsIgnoreCase(Column.DELETE_SIGN)) {
|
||||
expr = new BoolLiteral(true);
|
||||
} else if (column.isKey() || !column.isVisible() || (!column.isAllowNull() && !column.hasDefaultValue())) {
|
||||
} else if (column.isKey()) {
|
||||
expr = new SlotRef(targetTableRef.getAliasAsName(), column.getName());
|
||||
} else if (!isMow && !column.isVisible() || (!column.isAllowNull() && !column.hasDefaultValue())) {
|
||||
expr = new SlotRef(targetTableRef.getAliasAsName(), column.getName());
|
||||
} else {
|
||||
continue;
|
||||
@ -166,13 +170,19 @@ public class DeleteStmt extends DdlStmt {
|
||||
// limit
|
||||
LimitElement.NO_LIMIT
|
||||
);
|
||||
boolean isPartialUpdate = false;
|
||||
if (((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite()
|
||||
&& cols.size() < targetTable.getColumns().size()) {
|
||||
isPartialUpdate = true;
|
||||
}
|
||||
|
||||
insertStmt = new NativeInsertStmt(
|
||||
new InsertTarget(tableName, null),
|
||||
null,
|
||||
cols,
|
||||
new InsertSource(selectStmt),
|
||||
null);
|
||||
null,
|
||||
isPartialUpdate);
|
||||
}
|
||||
|
||||
private void analyzeTargetTable(Analyzer analyzer) throws UserException {
|
||||
|
||||
@ -67,6 +67,7 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -131,6 +132,9 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
|
||||
private boolean isValuesOrConstantSelect;
|
||||
|
||||
private boolean isPartialUpdate = false;
|
||||
|
||||
private HashSet<String> partialUpdateCols = new HashSet<String>();
|
||||
|
||||
public NativeInsertStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
|
||||
List<String> hints) {
|
||||
@ -157,6 +161,13 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
&& ((SelectStmt) queryStmt).getTableRefs().isEmpty());
|
||||
}
|
||||
|
||||
public NativeInsertStmt(InsertTarget target, String label, List<String> cols, InsertSource source,
|
||||
List<String> hints, boolean isPartialUpdate) {
|
||||
this(target, label, cols, source, hints);
|
||||
this.isPartialUpdate = isPartialUpdate;
|
||||
this.partialUpdateCols.addAll(cols);
|
||||
}
|
||||
|
||||
public boolean isValuesOrConstantSelect() {
|
||||
return isValuesOrConstantSelect;
|
||||
}
|
||||
@ -344,6 +355,9 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
DescriptorTable descTable = analyzer.getDescTbl();
|
||||
olapTuple = descTable.createTupleDescriptor();
|
||||
for (Column col : olapTable.getFullSchema()) {
|
||||
if (isPartialUpdate && !partialUpdateCols.contains(col.getName())) {
|
||||
continue;
|
||||
}
|
||||
SlotDescriptor slotDesc = descTable.addSlotDescriptor(olapTuple);
|
||||
slotDesc.setIsMaterialized(true);
|
||||
slotDesc.setType(col.getType());
|
||||
@ -377,6 +391,9 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
|
||||
// check columns of target table
|
||||
for (Column col : baseColumns) {
|
||||
if (isPartialUpdate && !partialUpdateCols.contains(col.getName())) {
|
||||
continue;
|
||||
}
|
||||
if (mentionedCols.contains(col.getName())) {
|
||||
continue;
|
||||
}
|
||||
@ -694,6 +711,9 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
List<Pair<String, Expr>> resultExprByName = Lists.newArrayList();
|
||||
// reorder resultExprs in table column order
|
||||
for (Column col : targetTable.getFullSchema()) {
|
||||
if (isPartialUpdate && !partialUpdateCols.contains(col.getName())) {
|
||||
continue;
|
||||
}
|
||||
if (exprByName.containsKey(col.getName())) {
|
||||
resultExprByName.add(Pair.of(col.getName(), exprByName.get(col.getName())));
|
||||
} else {
|
||||
@ -732,6 +752,8 @@ public class NativeInsertStmt extends InsertStmt {
|
||||
if (targetTable instanceof OlapTable) {
|
||||
dataSink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
|
||||
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
|
||||
OlapTableSink sink = (OlapTableSink) dataSink;
|
||||
sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols);
|
||||
dataPartition = dataSink.getOutputPartition();
|
||||
} else if (targetTable instanceof BrokerTable) {
|
||||
BrokerTable table = (BrokerTable) targetTable;
|
||||
|
||||
@ -57,7 +57,6 @@ import java.util.TreeSet;
|
||||
* {expr}
|
||||
*/
|
||||
public class UpdateStmt extends DdlStmt {
|
||||
|
||||
private TableRef targetTableRef;
|
||||
private TableName tableName;
|
||||
private final List<BinaryPredicate> setExprs;
|
||||
@ -67,6 +66,7 @@ public class UpdateStmt extends DdlStmt {
|
||||
private TableIf targetTable;
|
||||
List<SelectListItem> selectListItems = Lists.newArrayList();
|
||||
List<String> cols = Lists.newArrayList();
|
||||
private boolean isPartialUpdate = false;
|
||||
|
||||
public UpdateStmt(TableRef targetTableRef, List<BinaryPredicate> setExprs, FromClause fromClause, Expr whereExpr) {
|
||||
this.targetTableRef = targetTableRef;
|
||||
@ -124,7 +124,8 @@ public class UpdateStmt extends DdlStmt {
|
||||
null,
|
||||
cols,
|
||||
new InsertSource(selectStmt),
|
||||
null);
|
||||
null,
|
||||
isPartialUpdate);
|
||||
}
|
||||
|
||||
private void analyzeTargetTable(Analyzer analyzer) throws UserException {
|
||||
@ -186,16 +187,36 @@ public class UpdateStmt extends DdlStmt {
|
||||
}
|
||||
|
||||
// step3: generate select list and insert column name list in insert stmt
|
||||
boolean isMow = ((OlapTable) targetTable).getEnableUniqueKeyMergeOnWrite();
|
||||
int setExprCnt = 0;
|
||||
for (Column column : targetTable.getColumns()) {
|
||||
for (BinaryPredicate setExpr : setExprs) {
|
||||
Expr lhs = setExpr.getChild(0);
|
||||
if (((SlotRef) lhs).getColumn().equals(column)) {
|
||||
setExprCnt++;
|
||||
}
|
||||
}
|
||||
}
|
||||
// table with sequence col cannot use partial update cause in MOW, we encode pk
|
||||
// with seq column but we don't know which column is sequence in update
|
||||
if (isMow && ((OlapTable) targetTable).getSequenceCol() == null
|
||||
&& setExprCnt <= targetTable.getColumns().size() * 3 / 10) {
|
||||
isPartialUpdate = true;
|
||||
}
|
||||
for (Column column : targetTable.getColumns()) {
|
||||
Expr expr = new SlotRef(targetTableRef.getAliasAsName(), column.getName());
|
||||
boolean existInExpr = false;
|
||||
for (BinaryPredicate setExpr : setExprs) {
|
||||
Expr lhs = setExpr.getChild(0);
|
||||
if (((SlotRef) lhs).getColumn().equals(column)) {
|
||||
expr = setExpr.getChild(1);
|
||||
existInExpr = true;
|
||||
}
|
||||
}
|
||||
selectListItems.add(new SelectListItem(expr, null));
|
||||
cols.add(column.getName());
|
||||
if (column.isKey() || existInExpr || !isPartialUpdate) {
|
||||
selectListItems.add(new SelectListItem(expr, null));
|
||||
cols.add(column.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user