From d9bd1fad67af08d4d8027035e39855535de4a433 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Mon, 6 Nov 2023 09:56:46 +0800 Subject: [PATCH] [fix](spark-load)fix-Unique-key-with-MOR-by-sparkload (#26383) When a Unique key table carries the `enable_unique_key_merge_on_write` attribute, the value of the agg type is none. Therefore, when doing sparkload, we need to specify the agg type as `REPLACE`. --- .../doris/load/loadv2/SparkLoadPendingTask.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java index 8e9599f774..32749fd8a7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadPendingTask.java @@ -247,10 +247,13 @@ public class SparkLoadPendingTask extends LoadTask { long indexId = entry.getKey(); int schemaHash = table.getSchemaHashByIndexId(indexId); + boolean changeAggType = table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS) + && table.getTableProperty().getEnableUniqueKeyMergeOnWrite(); + // columns List etlColumns = Lists.newArrayList(); for (Column column : entry.getValue()) { - etlColumns.add(createEtlColumn(column)); + etlColumns.add(createEtlColumn(column, changeAggType)); } // check distribution type @@ -290,7 +293,7 @@ public class SparkLoadPendingTask extends LoadTask { return etlIndexes; } - private EtlColumn createEtlColumn(Column column) { + private EtlColumn createEtlColumn(Column column, boolean changeAggType) { // column name String name = column.getName().toLowerCase(Locale.ROOT); // column type @@ -304,7 +307,11 @@ public class SparkLoadPendingTask extends LoadTask { // aggregation type String aggregationType = null; if (column.getAggregationType() != null) { - aggregationType = column.getAggregationType().toString(); + if (changeAggType && !column.isKey()) { + aggregationType = AggregateType.REPLACE.toSql(); + } else { + aggregationType = column.getAggregationType().toString(); + } } // default value