[Enhancement](config) optimize behavior of default_storage_medium (#20739)
@@ -1091,7 +1091,7 @@ public class RestoreJob extends AbstractJob {
                 // replicas
                 try {
                     Map<Tag, List<Long>> beIds = Env.getCurrentSystemInfo()
-                            .selectBackendIdsForReplicaCreation(replicaAlloc, null);
+                            .selectBackendIdsForReplicaCreation(replicaAlloc, null, false, false);
                     for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) {
                         for (Long beId : entry.getValue()) {
                             long newReplicaId = env.getNextId();
@@ -49,6 +49,7 @@ public class DataProperty implements Writable, GsonPostProcessable {
     private String storagePolicy;
     @SerializedName(value = "isMutable")
     private boolean isMutable = true;
+    private boolean storageMediumSpecified;

     private DataProperty() {
         // for persist
@@ -97,6 +98,10 @@ public class DataProperty implements Writable, GsonPostProcessable {
         return storagePolicy;
     }

+    public boolean isStorageMediumSpecified() {
+        return storageMediumSpecified;
+    }
+
     public boolean isMutable() {
         return isMutable;
     }
@@ -105,6 +110,10 @@ public class DataProperty implements Writable, GsonPostProcessable {
         isMutable = mutable;
     }

+    public void setStorageMediumSpecified(boolean isSpecified) {
+        storageMediumSpecified = isSpecified;
+    }
+
     public static DataProperty read(DataInput in) throws IOException {
         if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_108) {
             String json = Text.readString(in);
@@ -164,4 +173,5 @@ public class DataProperty implements Writable, GsonPostProcessable {
         // storagePolicy is a newly added field, it may be null when replaying from old version.
         this.storagePolicy = Strings.nullToEmpty(this.storagePolicy);
     }
+
 }
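The storageMediumSpecified flag added here only records whether the user explicitly asked for a storage medium; PropertyAnalyzer sets it (see the hunk further down) and tablet creation reads it to decide whether falling back to the other medium is allowed. A minimal, hypothetical usage sketch, not part of the patch; the 4-argument constructor is the one called in the PropertyAnalyzer hunk:

import org.apache.doris.catalog.DataProperty;
import org.apache.doris.thrift.TStorageMedium;

// Hypothetical sketch of the flag's round trip: the analyzer sets it only when the
// user wrote 'storage_medium' explicitly, and backend selection later reads it.
class DataPropertyFlagSketch {
    static DataProperty buildUserPinnedSsd() {
        long cooldownTimeMs = System.currentTimeMillis() + 3600L * 1000L; // sketch value
        DataProperty prop = new DataProperty(TStorageMedium.SSD, cooldownTimeMs, "", true);
        prop.setStorageMediumSpecified(true); // user wrote storage_medium = 'ssd'
        return prop;
    }

    public static void main(String[] args) {
        // true -> replica selection must not silently retry on HDD backends
        System.out.println(buildUserPinnedSsd().isStorageMediumSpecified());
    }
}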
@@ -578,7 +578,7 @@ public class OlapTable extends Table {
         try {
             Map<Tag, List<Long>> tag2beIds =
                     Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
-                            replicaAlloc, null);
+                            replicaAlloc, null, false, false);
             for (Map.Entry<Tag, List<Long>> entry3 : tag2beIds.entrySet()) {
                 for (Long beId : entry3.getValue()) {
                     long newReplicaId = env.getNextId();
@@ -228,7 +228,7 @@ public class DynamicPartitionUtil {
             ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_FORMAT, val);
         }
         ReplicaAllocation replicaAlloc = new ReplicaAllocation(Short.valueOf(val));
-        Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null);
+        Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null, false, true);
     }

     private static void checkReplicaAllocation(ReplicaAllocation replicaAlloc, int hotPartitionNum,
@@ -237,13 +237,14 @@ public class DynamicPartitionUtil {
             ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_ZERO);
         }

-        Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null);
+        Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, null, false, true);
         if (hotPartitionNum <= 0) {
             return;
         }

         try {
-            Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, TStorageMedium.SSD);
+            Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(replicaAlloc, TStorageMedium.SSD, false,
+                    true);
         } catch (DdlException e) {
             throw new DdlException("Failed to find enough backend for ssd storage medium. When setting "
                     + DynamicPartitionProperty.HOT_PARTITION_NUM + " > 0, the hot partitions will store "
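These dynamic-partition call sites only validate that enough backends exist for the configured replication, so they now pass isOnlyForCheck = true; per the SystemInfoService hunk further down, that flag suppresses the retry on the opposite storage medium, so the hot-partition SSD probe keeps failing fast when no SSD backend exists. A rough, trimmed sketch of that check-only call shape (class and method names here are illustrative):

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.DdlException;
import org.apache.doris.thrift.TStorageMedium;

// Check-only usage: with isOnlyForCheck = true the SSD probe throws DdlException
// when no SSD backend satisfies the allocation, instead of silently retrying on HDD.
class HotPartitionSsdCheckSketch {
    static void checkSsdBackendsExist(short replicationNum) throws DdlException {
        ReplicaAllocation replicaAlloc = new ReplicaAllocation(replicationNum);
        Env.getCurrentSystemInfo()
                .selectBackendIdsForReplicaCreation(replicaAlloc, TStorageMedium.SSD, false, true);
    }
}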
@@ -140,12 +140,6 @@ public class PropertyAnalyzer {

     public static final String PROPERTIES_ENABLE_DUPLICATE_WITHOUT_KEYS_BY_DEFAULT =
             "enable_duplicate_without_keys_by_default";
-
-    private static final Logger LOG = LogManager.getLogger(PropertyAnalyzer.class);
-    private static final String COMMA_SEPARATOR = ",";
-    private static final double MAX_FPP = 0.05;
-    private static final double MIN_FPP = 0.0001;
-
     // For unique key data model, the feature Merge-on-Write will leverage a primary
     // key index and a delete-bitmap to mark duplicate keys as deleted in load stage,
     // which can avoid the merging cost in read stage, and accelerate the aggregation
@@ -153,6 +147,10 @@ public class PropertyAnalyzer {
     // For the detail design, see the [DISP-018](https://cwiki.apache.org/confluence/
     // display/DORIS/DSIP-018%3A+Support+Merge-On-Write+implementation+for+UNIQUE+KEY+data+model)
     public static final String ENABLE_UNIQUE_KEY_MERGE_ON_WRITE = "enable_unique_key_merge_on_write";
+    private static final Logger LOG = LogManager.getLogger(PropertyAnalyzer.class);
+    private static final String COMMA_SEPARATOR = ",";
+    private static final double MAX_FPP = 0.05;
+    private static final double MIN_FPP = 0.0001;

     /**
      * check and replace members of DataProperty by properties.
@@ -172,6 +170,7 @@ public class PropertyAnalyzer {
         long cooldownTimestamp = oldDataProperty.getCooldownTimeMs();
         String newStoragePolicy = oldDataProperty.getStoragePolicy();
         boolean hasStoragePolicy = false;
+        boolean storageMediumSpecified = false;

         for (Map.Entry<String, String> entry : properties.entrySet()) {
             String key = entry.getKey();
@@ -179,8 +178,10 @@ public class PropertyAnalyzer {
             if (key.equalsIgnoreCase(PROPERTIES_STORAGE_MEDIUM)) {
                 if (value.equalsIgnoreCase(TStorageMedium.SSD.name())) {
                     storageMedium = TStorageMedium.SSD;
+                    storageMediumSpecified = true;
                 } else if (value.equalsIgnoreCase(TStorageMedium.HDD.name())) {
                     storageMedium = TStorageMedium.HDD;
+                    storageMediumSpecified = true;
                 } else {
                     throw new AnalysisException("Invalid storage medium: " + value);
                 }
@@ -247,7 +248,12 @@ public class PropertyAnalyzer {
         boolean mutable = PropertyAnalyzer.analyzeBooleanProp(properties, PROPERTIES_MUTABLE, true);
         properties.remove(PROPERTIES_MUTABLE);

-        return new DataProperty(storageMedium, cooldownTimestamp, newStoragePolicy, mutable);
+        DataProperty dataProperty = new DataProperty(storageMedium, cooldownTimestamp, newStoragePolicy, mutable);
+        // check the state of data property
+        if (storageMediumSpecified) {
+            dataProperty.setStorageMediumSpecified(true);
+        }
+        return dataProperty;
     }

     public static short analyzeShortKeyColumnCount(Map<String, String> properties) throws AnalysisException {
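With this change the analyzer distinguishes "the user wrote storage_medium" from "the default medium was filled in", and hands that distinction to the returned DataProperty. A hedged sketch of the observable behavior; the method name analyzeDataProperty and its old-property argument are assumed from the surrounding javadoc rather than shown in the hunk itself:

import java.util.HashMap;
import java.util.Map;

import org.apache.doris.catalog.DataProperty;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.thrift.TStorageMedium;

// Sketch: an explicit 'storage_medium' property flags the resulting DataProperty,
// so tablet creation will not silently fall back to the other medium.
class StorageMediumFlagSketch {
    static DataProperty analyzeExplicitSsd() throws AnalysisException {
        DataProperty oldProp = new DataProperty(TStorageMedium.HDD,
                Long.MAX_VALUE /* sketch cooldown */, "", true);
        Map<String, String> properties = new HashMap<>();
        properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, "ssd"); // user-specified
        DataProperty newProp = PropertyAnalyzer.analyzeDataProperty(properties, oldProp);
        assert newProp.isStorageMediumSpecified();
        return newProp;
    }
}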
@@ -222,19 +222,14 @@ public class InternalCatalog implements CatalogIf<Database> {
         fullNameToDb.put(db.getFullName(), db);
     }

-    @Override
-    public long getId() {
-        return INTERNAL_CATALOG_ID;
-    }
-
     @Override
     public String getType() {
         return "internal";
     }

     @Override
-    public String getComment() {
-        return "Doris internal catalog";
+    public long getId() {
+        return INTERNAL_CATALOG_ID;
     }

     @Override
@@ -247,6 +242,10 @@ public class InternalCatalog implements CatalogIf<Database> {
         return Lists.newArrayList(fullNameToDb.keySet());
     }

+    public List<Long> getDbIds() {
+        return Lists.newArrayList(idToDb.keySet());
+    }
+
     @Nullable
     @Override
     public Database getDbNullable(String dbName) {
@@ -278,16 +277,6 @@ public class InternalCatalog implements CatalogIf<Database> {
         return idToDb.get(dbId);
     }

-    public TableName getTableNameByTableId(Long tableId) {
-        for (Database db : fullNameToDb.values()) {
-            Table table = db.getTableNullable(tableId);
-            if (table != null) {
-                return new TableName("", db.getFullName(), table.getName());
-            }
-        }
-        return null;
-    }
-
     @Override
     public Map<String, String> getProperties() {
         return Maps.newHashMap();
@@ -303,6 +292,21 @@ public class InternalCatalog implements CatalogIf<Database> {
         LOG.warn("Ignore the modify catalog props in build-in catalog.");
     }

+    @Override
+    public String getComment() {
+        return "Doris internal catalog";
+    }
+
+    public TableName getTableNameByTableId(Long tableId) {
+        for (Database db : fullNameToDb.values()) {
+            Table table = db.getTableNullable(tableId);
+            if (table != null) {
+                return new TableName("", db.getFullName(), table.getName());
+            }
+        }
+        return null;
+    }
+
     // Use tryLock to avoid potential dead lock
     private boolean tryLock(boolean mustLock) {
         while (true) {
@@ -336,10 +340,6 @@ public class InternalCatalog implements CatalogIf<Database> {
         }
     }

-    public List<Long> getDbIds() {
-        return Lists.newArrayList(idToDb.keySet());
-    }
-
     public List<Database> getDbs() {
         return Lists.newArrayList(idToDb.values());
     }
@@ -1225,7 +1225,7 @@ public class InternalCatalog implements CatalogIf<Database> {
             ColumnDef columnDef;
             if (resultExpr.getSrcSlotRef() == null) {
                 columnDef = new ColumnDef(name, typeDef, false, null,
-                    true, false, new DefaultValue(false, null), "");
+                        true, false, new DefaultValue(false, null), "");
             } else {
                 Column column = resultExpr.getSrcSlotRef().getDesc().getColumn();
                 boolean setDefault = StringUtils.isNotBlank(column.getDefaultValue());
@@ -1517,7 +1517,7 @@ public class InternalCatalog implements CatalogIf<Database> {
                 olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, idGeneratorBuffer,
                 olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(),
                 olapTable.skipWriteIndexOnLoad(), olapTable.storeRowColumn(), olapTable.isDynamicSchema(),
-                binlogConfig);
+                binlogConfig, dataProperty.isStorageMediumSpecified());

             // check again
             olapTable = db.getOlapTableOrDdlException(tableName);
@@ -1744,7 +1744,8 @@ public class InternalCatalog implements CatalogIf<Database> {
             DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy,
             IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction,
             boolean enableSingleReplicaCompaction, boolean skipWriteIndexOnLoad,
-            boolean storeRowColumn, boolean isDynamicSchema, BinlogConfig binlogConfig) throws DdlException {
+            boolean storeRowColumn, boolean isDynamicSchema, BinlogConfig binlogConfig,
+            boolean isStorageMediumSpecified) throws DdlException {
         // create base index first.
         Preconditions.checkArgument(baseIndexId != -1);
         MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
@@ -1783,7 +1784,7 @@ public class InternalCatalog implements CatalogIf<Database> {
             int schemaHash = indexMeta.getSchemaHash();
             TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium);
             createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version, replicaAlloc, tabletMeta,
-                    tabletIdSet, idGeneratorBuffer);
+                    tabletIdSet, idGeneratorBuffer, isStorageMediumSpecified);

             boolean ok = false;
             String errMsg = null;
@@ -2190,7 +2191,7 @@ public class InternalCatalog implements CatalogIf<Database> {
             List<Column> rollupColumns = Env.getCurrentEnv().getMaterializedViewHandler()
                     .checkAndPrepareMaterializedView(addRollupClause, olapTable, baseRollupIndex, false);
             short rollupShortKeyColumnCount = Env.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties(),
-                        true/*isKeysRequired*/);
+                    true/*isKeysRequired*/);
             int rollupSchemaHash = Util.generateSchemaHash();
             long rollupIndexId = idGeneratorBuffer.getNextId();
             olapTable.setIndexMeta(rollupIndexId, addRollupClause.getRollupName(), rollupColumns, schemaVersion,
@@ -2269,7 +2270,6 @@ public class InternalCatalog implements CatalogIf<Database> {
                     "Database " + db.getFullName() + " create unpartitioned table " + tableName + " increasing "
                             + totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
             }
-
             // create partition
             Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
                     olapTable.getBaseIndexId(), partitionId, partitionName, olapTable.getIndexIdToMeta(),
                     partitionDistributionInfo, partitionInfo.getDataProperty(partitionId).getStorageMedium(),
@@ -2278,7 +2278,8 @@ public class InternalCatalog implements CatalogIf<Database> {
                     olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
                     idGeneratorBuffer, olapTable.disableAutoCompaction(),
                     olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad,
-                    storeRowColumn, isDynamicSchema, binlogConfigForTask);
+                    storeRowColumn, isDynamicSchema, binlogConfigForTask,
+                    partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified());
             olapTable.addPartition(partition);
         } else if (partitionInfo.getType() == PartitionType.RANGE
                 || partitionInfo.getType() == PartitionType.LIST) {
@@ -2329,22 +2330,23 @@ public class InternalCatalog implements CatalogIf<Database> {
                         && !Strings.isNullOrEmpty(partionStoragePolicy)) {
                     throw new AnalysisException(
                             "Can not create UNIQUE KEY table that enables Merge-On-write"
-                        + " with storage policy(" + partionStoragePolicy + ")");
+                                    + " with storage policy(" + partionStoragePolicy + ")");
                 }
                 if (!partionStoragePolicy.equals("")) {
                     storagePolicy = partionStoragePolicy;
                 }
                 Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(storagePolicy);
-                Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
-                        olapTable.getBaseIndexId(), entry.getValue(), entry.getKey(), olapTable.getIndexIdToMeta(),
-                        partitionDistributionInfo, dataProperty.getStorageMedium(),
-                        partitionInfo.getReplicaAllocation(entry.getValue()), versionInfo, bfColumns, bfFpp,
-                        tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat,
-                        partitionInfo.getTabletType(entry.getValue()), compressionType,
+
+                Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(),
+                        olapTable.getId(), olapTable.getBaseIndexId(), entry.getValue(), entry.getKey(),
+                        olapTable.getIndexIdToMeta(), partitionDistributionInfo,
+                        dataProperty.getStorageMedium(), partitionInfo.getReplicaAllocation(entry.getValue()),
+                        versionInfo, bfColumns, bfFpp, tabletIdSet, olapTable.getCopiedIndexes(), isInMemory,
+                        storageFormat, partitionInfo.getTabletType(entry.getValue()), compressionType,
                         olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
                         idGeneratorBuffer, olapTable.disableAutoCompaction(),
-                        olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad,
-                        storeRowColumn, isDynamicSchema, binlogConfigForTask);
+                        olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad, storeRowColumn,
+                        isDynamicSchema, binlogConfigForTask, dataProperty.isStorageMediumSpecified());
                 olapTable.addPartition(partition);
             }
         } else {
@@ -2380,7 +2382,7 @@ public class InternalCatalog implements CatalogIf<Database> {
         // register or remove table from DynamicPartition after table created
         DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), olapTable, false);
         Env.getCurrentEnv().getDynamicPartitionScheduler()
-            .executeDynamicPartitionFirstTime(db.getId(), olapTable.getId());
+                .executeDynamicPartitionFirstTime(db.getId(), olapTable.getId());
         Env.getCurrentEnv().getDynamicPartitionScheduler()
                 .createOrUpdateRuntimeInfo(tableId, DynamicPartitionScheduler.LAST_UPDATE_TIME,
                         TimeUtils.getCurrentFormatTime());
@@ -2528,7 +2530,8 @@ public class InternalCatalog implements CatalogIf<Database> {
     @VisibleForTesting
     public void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
             DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta,
-            Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer) throws DdlException {
+            Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer, boolean isStorageMediumSpecified)
+            throws DdlException {
         ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
         Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
         GroupId groupId = null;
@@ -2588,10 +2591,12 @@ public class InternalCatalog implements CatalogIf<Database> {
             } else {
                 if (!Config.disable_storage_medium_check) {
                     chosenBackendIds = Env.getCurrentSystemInfo()
-                            .selectBackendIdsForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium());
+                            .selectBackendIdsForReplicaCreation(replicaAlloc, tabletMeta.getStorageMedium(),
+                                    isStorageMediumSpecified, false);
                 } else {
                     chosenBackendIds = Env.getCurrentSystemInfo()
-                            .selectBackendIdsForReplicaCreation(replicaAlloc, null);
+                            .selectBackendIdsForReplicaCreation(replicaAlloc, null,
+                                    isStorageMediumSpecified, false);
                 }
             }

@@ -2764,7 +2769,8 @@ public class InternalCatalog implements CatalogIf<Database> {
                     olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(),
                     idGeneratorBuffer, olapTable.disableAutoCompaction(),
                     olapTable.enableSingleReplicaCompaction(), olapTable.skipWriteIndexOnLoad(),
-                    olapTable.storeRowColumn(), olapTable.isDynamicSchema(), binlogConfig);
+                    olapTable.storeRowColumn(), olapTable.isDynamicSchema(), binlogConfig,
+                    copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified());
                 newPartitions.add(newPartition);
             }
         } catch (DdlException e) {
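Within InternalCatalog the change is one boolean threaded from the partition's DataProperty through createPartitionWithIndices into createTablets, which forwards it to backend selection in both branches of the disable_storage_medium_check switch. A condensed sketch of that selection branch (parameter lists trimmed; the real methods carry many more arguments):

import java.util.List;
import java.util.Map;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TStorageMedium;

// Condensed view of the branch inside createTablets after this patch: the user's
// choice travels untouched, only the medium argument depends on the config switch.
class TabletBackendSelectionSketch {
    static Map<Tag, List<Long>> chooseBackends(ReplicaAllocation replicaAlloc,
            TStorageMedium tabletMedium, boolean isStorageMediumSpecified) throws DdlException {
        TStorageMedium medium = Config.disable_storage_medium_check ? null : tabletMedium;
        return Env.getCurrentSystemInfo()
                .selectBackendIdsForReplicaCreation(replicaAlloc, medium, isStorageMediumSpecified, false);
    }
}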
@@ -528,11 +528,14 @@ public class SystemInfoService {
      *
      * @param replicaAlloc
      * @param storageMedium
+     * @param isStorageMediumSpecified
+     * @param isOnlyForCheck set true if only used for check available backend
      * @return return the selected backend ids group by tag.
      * @throws DdlException
      */
     public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
-            ReplicaAllocation replicaAlloc, TStorageMedium storageMedium)
+            ReplicaAllocation replicaAlloc, TStorageMedium storageMedium, boolean isStorageMediumSpecified,
+            boolean isOnlyForCheck)
             throws DdlException {
         Map<Long, Backend> copiedBackends = Maps.newHashMap(idToBackendRef);
         Map<Tag, List<Long>> chosenBackendIds = Maps.newHashMap();
@@ -557,6 +560,14 @@ public class SystemInfoService {

             BeSelectionPolicy policy = builder.build();
             List<Long> beIds = selectBackendIdsByPolicy(policy, entry.getValue());
+            // first time empty, retry with different storage medium
+            // if only for check, no need to retry different storage medium to get backend
+            if (beIds.isEmpty() && storageMedium != null && !isStorageMediumSpecified && !isOnlyForCheck) {
+                storageMedium = (storageMedium == TStorageMedium.HDD) ? TStorageMedium.SSD : TStorageMedium.HDD;
+                policy = builder.setStorageMedium(storageMedium).build();
+                beIds = selectBackendIdsByPolicy(policy, entry.getValue());
+            }
+            // after retry different storage medium, it's still empty
             if (beIds.isEmpty()) {
                 LOG.error("failed backend(s) for policy:" + policy);
                 String errorReplication = "replication tag: " + entry.getKey()
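This hunk is the behavioral core of the patch: when no backend satisfies the requested medium, selection retries once with the opposite medium, but only if the medium was not explicitly requested and the call is not a pure availability check. A caller-side summary sketch; the outcomes in the comments are restated from the condition above, and the class and method names are illustrative:

import java.util.List;
import java.util.Map;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.DdlException;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TStorageMedium;

// Caller-side contract after this change (sketch):
//   (HDD, specified=false, check=false) -> may fall back once to SSD-only backends.
//   (HDD, specified=true,  check=false) -> no fallback; DdlException if no HDD backend fits.
//   (any medium,           check=true)  -> no fallback; validation paths fail fast.
class ReplicaCreationFallbackSketch {
    static Map<Tag, List<Long>> pickBackends(ReplicaAllocation alloc, boolean userPinnedMedium)
            throws DdlException {
        return Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
                alloc, TStorageMedium.HDD, userPinnedMedium, /* isOnlyForCheck */ false);
    }
}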
@@ -153,7 +153,8 @@ public class RestoreJobTest {

         new Expectations() {
             {
-                systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, (TStorageMedium) any);
+                systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any, (TStorageMedium) any,
+                        false, true);
                 minTimes = 0;
                 result = new Delegate() {
                     public synchronized List<Long> selectBackendIdsForReplicaCreation(
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+import org.apache.doris.common.DdlException;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.utframe.TestWithFeService;
+
+import org.junit.jupiter.api.Assertions;
+
+import java.util.List;
+
+public class CreateTableElasticOnStorageMediumTest extends TestWithFeService {
+
+    @Override
+    protected void runAfterAll() throws Exception {
+        Env.getCurrentEnv().clear();
+    }
+
+    public void setStorageMediumToSSDTest() throws Exception {
+        SystemInfoService clusterInfo = Env.getCurrentEnv().getClusterInfo();
+        List<Backend> allBackends = clusterInfo.getAllBackends();
+        // set all backends' storage medium to SSD
+        for (Backend backend : allBackends) {
+            if (backend.hasPathHash()) {
+                backend.getDisks().values().stream()
+                        .peek(diskInfo -> diskInfo.setStorageMedium(TStorageMedium.SSD));
+            }
+        }
+        createDatabase("db1");
+
+        String sql1 = "CREATE TABLE IF NOT EXISTS db1.t1 (pk INT, v1 INT sum) AGGREGATE KEY (pk) "
+                + "DISTRIBUTED BY HASH(pk) BUCKETS 1 PROPERTIES ('replication_num' = '1');";
+        Assertions.assertDoesNotThrow(() -> createTables(sql1));
+        String sql2 = "CREATE TABLE IF NOT EXISTS db1.t2 (pk INT, v1 INT sum) AGGREGATE KEY (pk) "
+                + "DISTRIBUTED BY HASH(pk) BUCKETS 1 PROPERTIES ('replication_num' = '1', 'storage_medium' = 'ssd');";
+        Assertions.assertDoesNotThrow(() -> createTables(sql2));
+        String sql3 = "CREATE TABLE IF NOT EXISTS db1.t3 (pk INT, v1 INT sum) AGGREGATE KEY (pk) "
+                + "DISTRIBUTED BY HASH(pk) BUCKETS 1 PROPERTIES ('replication_num' = '1', 'storage_medium' = 'hdd');";
+        Assertions.assertThrows(DdlException.class, () -> createTables(sql3));
+    }
+
+    public void setStorageMediumToHDDTest() throws Exception {
+        SystemInfoService clusterInfo = Env.getCurrentEnv().getClusterInfo();
+        List<Backend> allBackends = clusterInfo.getAllBackends();
+        // set all backends' storage medium to HDD
+        for (Backend backend : allBackends) {
+            if (backend.hasPathHash()) {
+                backend.getDisks().values().stream()
+                        .peek(diskInfo -> diskInfo.setStorageMedium(TStorageMedium.HDD));
+            }
+        }
+        createDatabase("db1");
+
+        String sql1 = "CREATE TABLE IF NOT EXISTS db1.t4 (pk INT, v1 INT sum) AGGREGATE KEY (pk) "
+                + "DISTRIBUTED BY HASH(pk) BUCKETS 1 PROPERTIES ('replication_num' = '1');";
+        Assertions.assertDoesNotThrow(() -> createTables(sql1));
+        String sql2 = "CREATE TABLE IF NOT EXISTS db1.t5 (pk INT, v1 INT sum) AGGREGATE KEY (pk) "
+                + "DISTRIBUTED BY HASH(pk) BUCKETS 1 PROPERTIES ('replication_num' = '1', 'storage_medium' = 'hdd');";
+        Assertions.assertDoesNotThrow(() -> createTables(sql2));
+        String sql3 = "CREATE TABLE IF NOT EXISTS db1.t6 (pk INT, v1 INT sum) AGGREGATE KEY (pk) "
+                + "DISTRIBUTED BY HASH(pk) BUCKETS 1 PROPERTIES ('replication_num' = '1', 'storage_medium' = 'ssd');";
+        Assertions.assertThrows(DdlException.class, () -> createTables(sql3));
+    }
+
+}
@@ -82,7 +82,7 @@ public class ModifyBackendTest {
         CreateTableStmt createStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createStr, connectContext);
         ExceptionChecker.expectThrowsWithMsg(DdlException.class, "Failed to find enough backend, please check the replication num,replication tag and storage medium.\n"
                         + "Create failed replications:\n"
-                        + "replication tag: {\"location\" : \"default\"}, replication num: 1, storage medium: HDD",
+                        + "replication tag: {\"location\" : \"default\"}, replication num: 1, storage medium: SSD",
                 () -> DdlExecutor.execute(Env.getCurrentEnv(), createStmt));

         createStr = "create table test.tbl1(\n" + "k1 int\n" + ") distributed by hash(k1)\n" + "buckets 3 properties(\n"
@@ -113,7 +113,7 @@ public class RoundRobinCreateTabletTest {
         try {
             Env.getCurrentEnv().getInternalCatalog().createTablets(clusterName, index, ReplicaState.NORMAL,
                     distributionInfo, 0, replicaAlloc, tabletMeta,
-                    tabletIdSet, idGeneratorBuffer);
+                    tabletIdSet, idGeneratorBuffer, false);
         } catch (Exception e) {
             System.out.println("failed to create tablets " + e.getMessage());
         }
@@ -150,7 +150,7 @@ public class CanalSyncDataTest {
                 result = execPlanFragmentParams;

                 systemInfoService.selectBackendIdsForReplicaCreation((ReplicaAllocation) any,
-                        (TStorageMedium) any);
+                        (TStorageMedium) any, false, true);
                 minTimes = 0;
                 result = backendIds;

@@ -373,7 +373,7 @@ public class SystemInfoServiceTest {
         Map<Long, Integer> beCounterMap = Maps.newHashMap();
         for (int i = 0; i < 10000; ++i) {
             Map<Tag, List<Long>> res = infoService.selectBackendIdsForReplicaCreation(replicaAlloc,
-                    TStorageMedium.HDD);
+                    TStorageMedium.HDD, false, false);
             Assert.assertEquals(3, res.get(Tag.DEFAULT_BACKEND_TAG).size());
             for (Long beId : res.get(Tag.DEFAULT_BACKEND_TAG)) {
                 beCounterMap.put(beId, beCounterMap.getOrDefault(beId, 0) + 1);