[branch-2.1] Picks "[Fix](partial update) Fix wrongly update autoinc column in partial update #39996" (#40073)

## Proposed changes

pick https://github.com/apache/doris/pull/39996
This commit is contained in:
bobhan1
2024-08-29 16:41:51 +08:00
committed by GitHub
parent 131238ff71
commit eb6a040a03
10 changed files with 194 additions and 97 deletions

View File

@ -1361,7 +1361,7 @@ public class NativeInsertStmt extends InsertStmt {
if (hasEmptyTargetColumns) {
return;
}
boolean hasMissingColExceptAutoInc = false;
boolean hasMissingColExceptAutoIncKey = false;
for (Column col : olapTable.getFullSchema()) {
boolean exists = false;
for (Column insertCol : targetColumns) {
@ -1374,16 +1374,16 @@ public class NativeInsertStmt extends InsertStmt {
break;
}
}
if (!exists && !col.isAutoInc()) {
if (col.isKey()) {
if (!exists) {
if (col.isKey() && !col.isAutoInc()) {
throw new UserException("Partial update should include all key columns, missing: " + col.getName());
}
if (col.isVisible()) {
hasMissingColExceptAutoInc = true;
if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
hasMissingColExceptAutoIncKey = true;
}
}
}
if (!hasMissingColExceptAutoInc) {
if (!hasMissingColExceptAutoIncKey) {
return;
}

View File

@ -276,21 +276,19 @@ public class InsertUtils {
if (unboundLogicalSink.getColNames().isEmpty()) {
((UnboundTableSink<? extends Plan>) unboundLogicalSink).setPartialUpdate(false);
} else {
boolean hasMissingColExceptAutoInc = false;
boolean hasMissingColExceptAutoIncKey = false;
for (Column col : olapTable.getFullSchema()) {
Optional<String> insertCol = unboundLogicalSink.getColNames().stream()
.filter(c -> c.equalsIgnoreCase(col.getName())).findFirst();
if (!col.isAutoInc() && !insertCol.isPresent()) {
if (col.isKey()) {
throw new AnalysisException("Partial update should include all key columns,"
+ " missing: " + col.getName());
}
if (col.isVisible()) {
hasMissingColExceptAutoInc = true;
}
if (col.isKey() && !col.isAutoInc() && !insertCol.isPresent()) {
throw new AnalysisException("Partial update should include all key columns,"
+ " missing: " + col.getName());
}
if (!(col.isAutoInc() && col.isKey()) && !insertCol.isPresent() && col.isVisible()) {
hasMissingColExceptAutoIncKey = true;
}
}
if (!hasMissingColExceptAutoInc) {
if (!hasMissingColExceptAutoIncKey) {
((UnboundTableSink<? extends Plan>) unboundLogicalSink).setPartialUpdate(false);
}
}

View File

@ -148,6 +148,13 @@ public class StreamLoadPlanner {
if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
throw new UserException("Only unique key merge on write support partial update");
}
// try to convert to upsert if only has missing auto-increment key column
boolean hasMissingColExceptAutoIncKey = false;
if (taskInfo.getColumnExprDescs().descs.isEmpty()) {
isPartialUpdate = false;
}
HashSet<String> partialUpdateInputColumns = new HashSet<>();
if (isPartialUpdate) {
for (Column col : destTable.getFullSchema()) {
@ -172,14 +179,23 @@ public class StreamLoadPlanner {
break;
}
}
if (col.isKey() && !existInExpr) {
throw new UserException("Partial update should include all key columns, missing: " + col.getName());
if (!existInExpr) {
if (col.isKey() && !col.isAutoInc()) {
throw new UserException("Partial update should include all key columns, missing: "
+ col.getName());
}
if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
hasMissingColExceptAutoIncKey = true;
}
}
}
if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) {
partialUpdateInputColumns.add(Column.DELETE_SIGN);
}
}
if (isPartialUpdate && !hasMissingColExceptAutoIncKey) {
isPartialUpdate = false;
}
// here we should be full schema to fill the descriptor table
for (Column col : destTable.getFullSchema()) {
if (isPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) {
@ -247,7 +263,7 @@ public class StreamLoadPlanner {
// The load id will pass to csv reader to find the stream load context from new load stream manager
fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(),
fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns(),
taskInfo.isPartialUpdate());
isPartialUpdate);
scanNode = fileScanNode;
scanNode.init(analyzer);
@ -383,6 +399,13 @@ public class StreamLoadPlanner {
if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
throw new UserException("Only unique key merge on write support partial update");
}
// try to convert to upsert if only has missing auto-increment key column
boolean hasMissingColExceptAutoIncKey = false;
if (taskInfo.getColumnExprDescs().descs.isEmpty()) {
isPartialUpdate = false;
}
HashSet<String> partialUpdateInputColumns = new HashSet<>();
if (isPartialUpdate) {
for (Column col : destTable.getFullSchema()) {
@ -407,14 +430,23 @@ public class StreamLoadPlanner {
break;
}
}
if (col.isKey() && !existInExpr) {
throw new UserException("Partial update should include all key columns, missing: " + col.getName());
if (!existInExpr) {
if (col.isKey() && !col.isAutoInc()) {
throw new UserException("Partial update should include all key columns, missing: "
+ col.getName());
}
if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
hasMissingColExceptAutoIncKey = true;
}
}
}
if (taskInfo.getMergeType() == LoadTask.MergeType.DELETE) {
partialUpdateInputColumns.add(Column.DELETE_SIGN);
}
}
if (isPartialUpdate && !hasMissingColExceptAutoIncKey) {
isPartialUpdate = false;
}
// here we should be full schema to fill the descriptor table
for (Column col : destTable.getFullSchema()) {
if (isPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) {