[feature-wip](unique-key-merge-on-write) Add option to enable unique-key-merge-on-write, DSIP-018[5/1] (#10814)
* Add option in FE * add opt in be * some fix * update * fix code style * fix typo * fix typo * update * code format
This commit is contained in:
@ -236,7 +236,9 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
|
||||
tbl.getCopiedIndexes(),
|
||||
tbl.isInMemory(),
|
||||
tabletType,
|
||||
tbl.getCompressionType());
|
||||
null,
|
||||
tbl.getCompressionType(),
|
||||
tbl.getEnableUniqueKeyMergeOnWrite());
|
||||
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
|
||||
if (this.storageFormat != null) {
|
||||
createReplicaTask.setStorageFormat(this.storageFormat);
|
||||
|
||||
@ -259,7 +259,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
|
||||
shadowSchema, bfColumns, bfFpp, countDownLatch, indexes,
|
||||
tbl.isInMemory(),
|
||||
tbl.getPartitionInfo().getTabletType(partitionId),
|
||||
tbl.getCompressionType());
|
||||
null,
|
||||
tbl.getCompressionType(),
|
||||
tbl.getEnableUniqueKeyMergeOnWrite());
|
||||
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId)
|
||||
.get(shadowTabletId), originSchemaHash);
|
||||
if (this.storageFormat != null) {
|
||||
|
||||
@ -973,7 +973,9 @@ public class RestoreJob extends AbstractJob {
|
||||
localTbl.getCopiedIndexes(),
|
||||
localTbl.isInMemory(),
|
||||
localTbl.getPartitionInfo().getTabletType(restorePart.getId()),
|
||||
localTbl.getCompressionType());
|
||||
null,
|
||||
localTbl.getCompressionType(),
|
||||
localTbl.getEnableUniqueKeyMergeOnWrite());
|
||||
task.setInRestoreMode(true);
|
||||
batchTask.addTask(task);
|
||||
}
|
||||
|
||||
@ -1761,6 +1761,20 @@ public class OlapTable extends Table {
|
||||
return tableProperty.getRemoteStoragePolicy();
|
||||
}
|
||||
|
||||
public void setEnableUniqueKeyMergeOnWrite(boolean speedup) {
|
||||
if (tableProperty == null) {
|
||||
tableProperty = new TableProperty(new HashMap<>());
|
||||
}
|
||||
tableProperty.setEnableUniqueKeyMergeOnWrite(speedup);
|
||||
}
|
||||
|
||||
public boolean getEnableUniqueKeyMergeOnWrite() {
|
||||
if (tableProperty == null) {
|
||||
return false;
|
||||
}
|
||||
return tableProperty.getEnableUniqueKeyMergeOnWrite();
|
||||
}
|
||||
|
||||
// For non partitioned table:
|
||||
// The table's distribute hash columns need to be a subset of the aggregate columns.
|
||||
//
|
||||
|
||||
@ -239,6 +239,15 @@ public class TableProperty implements Writable {
|
||||
return compressionType;
|
||||
}
|
||||
|
||||
public void setEnableUniqueKeyMergeOnWrite(boolean enable) {
|
||||
properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, Boolean.toString(enable));
|
||||
}
|
||||
|
||||
public boolean getEnableUniqueKeyMergeOnWrite() {
|
||||
return Boolean.parseBoolean(properties.getOrDefault(
|
||||
PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, "false"));
|
||||
}
|
||||
|
||||
public void buildReplicaAllocation() {
|
||||
try {
|
||||
// Must copy the properties because "analyzeReplicaAllocation" with remove the property
|
||||
|
||||
@ -114,6 +114,8 @@ public class PropertyAnalyzer {
|
||||
private static final double MAX_FPP = 0.05;
|
||||
private static final double MIN_FPP = 0.0001;
|
||||
|
||||
public static final String ENABLE_UNIQUE_KEY_MERGE_ON_WRITE = "enable_unique_key_merge_on_write";
|
||||
|
||||
/**
|
||||
* check and replace members of DataProperty by properties.
|
||||
*
|
||||
@ -666,4 +668,22 @@ public class PropertyAnalyzer {
|
||||
DataSortInfo dataSortInfo = new DataSortInfo(sortType, colNum);
|
||||
return dataSortInfo;
|
||||
}
|
||||
|
||||
public static boolean analyzeUniqueKeyMergeOnWrite(Map<String, String> properties) throws AnalysisException {
|
||||
if (properties == null || properties.isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
String value = properties.get(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE);
|
||||
if (value == null) {
|
||||
return false;
|
||||
}
|
||||
properties.remove(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE);
|
||||
if (value.equals("true")) {
|
||||
return true;
|
||||
} else if (value.equals("false")) {
|
||||
return false;
|
||||
}
|
||||
throw new AnalysisException(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE
|
||||
+ " must be `true` or `false`");
|
||||
}
|
||||
}
|
||||
|
||||
@ -1321,7 +1321,8 @@ public class InternalDataSource implements DataSourceIf<Database> {
|
||||
dataProperty.getStorageMedium(), singlePartitionDesc.getReplicaAlloc(),
|
||||
singlePartitionDesc.getVersionInfo(), bfColumns, olapTable.getBfFpp(), tabletIdSet,
|
||||
olapTable.getCopiedIndexes(), singlePartitionDesc.isInMemory(), olapTable.getStorageFormat(),
|
||||
singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo());
|
||||
singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo(),
|
||||
olapTable.getEnableUniqueKeyMergeOnWrite());
|
||||
|
||||
// check again
|
||||
table = db.getOlapTableOrDdlException(tableName);
|
||||
@ -1542,7 +1543,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
|
||||
DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc,
|
||||
Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> tabletIdSet, List<Index> indexes,
|
||||
boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType,
|
||||
DataSortInfo dataSortInfo) throws DdlException {
|
||||
DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite) throws DdlException {
|
||||
// create base index first.
|
||||
Preconditions.checkArgument(baseIndexId != -1);
|
||||
MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
|
||||
@ -1602,7 +1603,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
|
||||
CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId, partitionId, indexId,
|
||||
tabletId, replicaId, shortKeyColumnCount, schemaHash, version, keysType, storageType,
|
||||
storageMedium, schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, tabletType,
|
||||
dataSortInfo, compressionType);
|
||||
dataSortInfo, compressionType, enableUniqueKeyMergeOnWrite);
|
||||
task.setStorageFormat(storageFormat);
|
||||
batchTask.addTask(task);
|
||||
// add to AgentTaskQueue for handling finish report.
|
||||
@ -1741,6 +1742,14 @@ public class InternalDataSource implements DataSourceIf<Database> {
|
||||
keysDesc.keysColumnSize(), storageFormat);
|
||||
olapTable.setDataSortInfo(dataSortInfo);
|
||||
|
||||
boolean enableUniqueKeyMergeOnWrite = false;
|
||||
try {
|
||||
enableUniqueKeyMergeOnWrite = PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(properties);
|
||||
} catch (AnalysisException e) {
|
||||
throw new DdlException(e.getMessage());
|
||||
}
|
||||
olapTable.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
|
||||
|
||||
// analyze bloom filter columns
|
||||
Set<String> bfColumns = null;
|
||||
double bfFpp = 0;
|
||||
@ -1919,7 +1928,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
|
||||
partitionDistributionInfo, partitionInfo.getDataProperty(partitionId).getStorageMedium(),
|
||||
partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, bfFpp, tabletIdSet,
|
||||
olapTable.getCopiedIndexes(), isInMemory, storageFormat, tabletType, compressionType,
|
||||
olapTable.getDataSortInfo());
|
||||
olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite());
|
||||
olapTable.addPartition(partition);
|
||||
} else if (partitionInfo.getType() == PartitionType.RANGE
|
||||
|| partitionInfo.getType() == PartitionType.LIST) {
|
||||
@ -1970,7 +1979,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
|
||||
partitionInfo.getReplicaAllocation(entry.getValue()), versionInfo, bfColumns, bfFpp,
|
||||
tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat,
|
||||
partitionInfo.getTabletType(entry.getValue()), compressionType,
|
||||
olapTable.getDataSortInfo());
|
||||
olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite());
|
||||
olapTable.addPartition(partition);
|
||||
}
|
||||
} else {
|
||||
@ -2348,7 +2357,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
|
||||
copiedTbl.getCopiedBfColumns(), copiedTbl.getBfFpp(), tabletIdSet, copiedTbl.getCopiedIndexes(),
|
||||
copiedTbl.isInMemory(), copiedTbl.getStorageFormat(),
|
||||
copiedTbl.getPartitionInfo().getTabletType(oldPartitionId), copiedTbl.getCompressionType(),
|
||||
copiedTbl.getDataSortInfo());
|
||||
copiedTbl.getDataSortInfo(), copiedTbl.getEnableUniqueKeyMergeOnWrite());
|
||||
newPartitions.add(newPartition);
|
||||
}
|
||||
} catch (DdlException e) {
|
||||
|
||||
@ -603,7 +603,9 @@ public class ReportHandler extends Daemon {
|
||||
olapTable.getCopiedIndexes(),
|
||||
olapTable.isInMemory(),
|
||||
olapTable.getPartitionInfo().getTabletType(partitionId),
|
||||
olapTable.getCompressionType());
|
||||
null,
|
||||
olapTable.getCompressionType(),
|
||||
olapTable.getEnableUniqueKeyMergeOnWrite());
|
||||
createReplicaTask.setIsRecoverTask(true);
|
||||
createReplicaBatchTask.addTask(createReplicaTask);
|
||||
} else {
|
||||
|
||||
@ -89,38 +89,7 @@ public class CreateReplicaTask extends AgentTask {
|
||||
|
||||
private DataSortInfo dataSortInfo;
|
||||
|
||||
public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
|
||||
long replicaId, short shortKeyColumnCount, int schemaHash, long version,
|
||||
KeysType keysType, TStorageType storageType,
|
||||
TStorageMedium storageMedium, List<Column> columns,
|
||||
Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch,
|
||||
List<Index> indexes,
|
||||
boolean isInMemory,
|
||||
TTabletType tabletType, TCompressionType compressionType) {
|
||||
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
|
||||
|
||||
this.replicaId = replicaId;
|
||||
this.shortKeyColumnCount = shortKeyColumnCount;
|
||||
this.schemaHash = schemaHash;
|
||||
|
||||
this.version = version;
|
||||
|
||||
this.keysType = keysType;
|
||||
this.storageType = storageType;
|
||||
this.storageMedium = storageMedium;
|
||||
this.compressionType = compressionType;
|
||||
|
||||
this.columns = columns;
|
||||
|
||||
this.bfColumns = bfColumns;
|
||||
this.indexes = indexes;
|
||||
this.bfFpp = bfFpp;
|
||||
|
||||
this.latch = latch;
|
||||
|
||||
this.isInMemory = isInMemory;
|
||||
this.tabletType = tabletType;
|
||||
}
|
||||
private boolean enableUniqueKeyMergeOnWrite;
|
||||
|
||||
public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
|
||||
long replicaId, short shortKeyColumnCount, int schemaHash, long version,
|
||||
@ -131,7 +100,8 @@ public class CreateReplicaTask extends AgentTask {
|
||||
boolean isInMemory,
|
||||
TTabletType tabletType,
|
||||
DataSortInfo dataSortInfo,
|
||||
TCompressionType compressionType) {
|
||||
TCompressionType compressionType,
|
||||
boolean enableUniqueKeyMergeOnWrite) {
|
||||
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
|
||||
|
||||
this.replicaId = replicaId;
|
||||
@ -156,6 +126,7 @@ public class CreateReplicaTask extends AgentTask {
|
||||
this.isInMemory = isInMemory;
|
||||
this.tabletType = tabletType;
|
||||
this.dataSortInfo = dataSortInfo;
|
||||
this.enableUniqueKeyMergeOnWrite = (keysType == KeysType.UNIQUE_KEYS && enableUniqueKeyMergeOnWrite);
|
||||
}
|
||||
|
||||
public void setIsRecoverTask(boolean isRecoverTask) {
|
||||
@ -277,6 +248,7 @@ public class CreateReplicaTask extends AgentTask {
|
||||
|
||||
createTabletReq.setTabletType(tabletType);
|
||||
createTabletReq.setCompressionType(compressionType);
|
||||
createTabletReq.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
|
||||
return createTabletReq;
|
||||
}
|
||||
}
|
||||
|
||||
@ -109,7 +109,7 @@ public class AgentTaskTest {
|
||||
version, KeysType.AGG_KEYS,
|
||||
storageType, TStorageMedium.SSD,
|
||||
columns, null, 0, latch, null,
|
||||
false, TTabletType.TABLET_TYPE_DISK, TCompressionType.LZ4F);
|
||||
false, TTabletType.TABLET_TYPE_DISK, null, TCompressionType.LZ4F, false);
|
||||
|
||||
// drop
|
||||
dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1);
|
||||
|
||||
Reference in New Issue
Block a user