[fix](spark-load)fix-Unique-key-with-MOR-by-sparkload (#26383)

When a Unique key table has the `enable_unique_key_merge_on_write` property set, the aggregation type of its value columns is NONE. Therefore, when performing a spark load, we need to explicitly set the aggregation type to `REPLACE` for those columns.
This commit is contained in:
wuwenchi
2023-11-06 09:56:46 +08:00
committed by GitHub
parent 99de6c7afe
commit d9bd1fad67

View File

@@ -247,10 +247,13 @@ public class SparkLoadPendingTask extends LoadTask {
long indexId = entry.getKey();
int schemaHash = table.getSchemaHashByIndexId(indexId);
boolean changeAggType = table.getKeysTypeByIndexId(indexId).equals(KeysType.UNIQUE_KEYS)
&& table.getTableProperty().getEnableUniqueKeyMergeOnWrite();
// columns
List<EtlColumn> etlColumns = Lists.newArrayList();
for (Column column : entry.getValue()) {
etlColumns.add(createEtlColumn(column));
etlColumns.add(createEtlColumn(column, changeAggType));
}
// check distribution type
@@ -290,7 +293,7 @@ public class SparkLoadPendingTask extends LoadTask {
return etlIndexes;
}
private EtlColumn createEtlColumn(Column column) {
private EtlColumn createEtlColumn(Column column, boolean changeAggType) {
// column name
String name = column.getName().toLowerCase(Locale.ROOT);
// column type
@@ -304,7 +307,11 @@ public class SparkLoadPendingTask extends LoadTask {
// aggregation type
String aggregationType = null;
if (column.getAggregationType() != null) {
aggregationType = column.getAggregationType().toString();
if (changeAggType && !column.isKey()) {
aggregationType = AggregateType.REPLACE.toSql();
} else {
aggregationType = column.getAggregationType().toString();
}
}
// default value