[fix](storage medium) Fix show partition storage medium not right whe… (#30160) (#31642)

This commit is contained in:
deardeng
2024-03-04 20:34:25 +08:00
committed by yiguolei
parent 1dcb99519d
commit 7e4cc1d177
9 changed files with 207 additions and 98 deletions

View File

@ -1142,8 +1142,9 @@ public class RestoreJob extends AbstractJob {
// replicas
try {
Map<Tag, List<Long>> beIds = Env.getCurrentSystemInfo()
Pair<Map<Tag, List<Long>>, TStorageMedium> beIdsAndMedium = Env.getCurrentSystemInfo()
.selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs, null, false, false);
Map<Tag, List<Long>> beIds = beIdsAndMedium.first;
for (Map.Entry<Tag, List<Long>> entry : beIds.entrySet()) {
for (Long beId : entry.getValue()) {
long newReplicaId = env.getNextId();

View File

@ -118,6 +118,10 @@ public class DataProperty implements Writable, GsonPostProcessable {
storageMediumSpecified = isSpecified;
}
public void setStorageMedium(TStorageMedium medium) {
this.storageMedium = medium;
}
public static DataProperty read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_108) {
String json = Text.readString(in);

View File

@ -637,9 +637,10 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
// replicas
try {
Map<Tag, List<Long>> tag2beIds =
Pair<Map<Tag, List<Long>>, TStorageMedium> tag2beIdsAndMedium =
Env.getCurrentSystemInfo().selectBackendIdsForReplicaCreation(
replicaAlloc, nextIndexs, null, false, false);
Map<Tag, List<Long>> tag2beIds = tag2beIdsAndMedium.first;
for (Map.Entry<Tag, List<Long>> entry3 : tag2beIds.entrySet()) {
for (Long beId : entry3.getValue()) {
long newReplicaId = env.getNextId();

View File

@ -1565,20 +1565,13 @@ public class InternalCatalog implements CatalogIf<Database> {
}
try {
long partitionId = idGeneratorBuffer.getNextId();
Partition partition = createPartitionWithIndices(db.getId(), olapTable.getId(),
olapTable.getName(), olapTable.getBaseIndexId(), partitionId, partitionName, indexIdToMeta,
distributionInfo, dataProperty.getStorageMedium(), singlePartitionDesc.getReplicaAlloc(),
singlePartitionDesc.getVersionInfo(), bfColumns, olapTable.getBfFpp(), tabletIdSet,
olapTable.getCopiedIndexes(), singlePartitionDesc.isInMemory(), olapTable.getStorageFormat(),
singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy, idGeneratorBuffer,
olapTable.disableAutoCompaction(), olapTable.enableSingleReplicaCompaction(),
olapTable.skipWriteIndexOnLoad(), olapTable.getCompactionPolicy(),
olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
olapTable.getTimeSeriesCompactionFileCountThreshold(),
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
olapTable.storeRowColumn(),
Partition partition = createPartitionWithIndices(db.getId(), olapTable,
partitionId, partitionName, indexIdToMeta,
distributionInfo, dataProperty, singlePartitionDesc.getReplicaAlloc(),
singlePartitionDesc.getVersionInfo(), bfColumns, tabletIdSet,
singlePartitionDesc.isInMemory(),
singlePartitionDesc.getTabletType(),
storagePolicy, idGeneratorBuffer,
binlogConfig, dataProperty.isStorageMediumSpecified(), null);
// TODO cluster key ids
@ -1827,33 +1820,33 @@ public class InternalCatalog implements CatalogIf<Database> {
}
}
private Partition createPartitionWithIndices(long dbId, long tableId, String tableName,
long baseIndexId, long partitionId, String partitionName, Map<Long, MaterializedIndexMeta> indexIdToMeta,
DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc,
Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> tabletIdSet, List<Index> indexes,
boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType,
DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy,
IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction,
boolean enableSingleReplicaCompaction, boolean skipWriteIndexOnLoad,
String compactionPolicy, Long timeSeriesCompactionGoalSizeMbytes,
Long timeSeriesCompactionFileCountThreshold, Long timeSeriesCompactionTimeThresholdSeconds,
Long timeSeriesCompactionEmptyRowsetsThreshold,
boolean storeRowColumn, BinlogConfig binlogConfig,
boolean isStorageMediumSpecified, List<Integer> clusterKeyIndexes) throws DdlException {
protected Partition createPartitionWithIndices(long dbId, OlapTable tbl, long partitionId,
String partitionName, Map<Long, MaterializedIndexMeta> indexIdToMeta,
DistributionInfo distributionInfo, DataProperty dataProperty,
ReplicaAllocation replicaAlloc,
Long versionInfo, Set<String> bfColumns, Set<Long> tabletIdSet,
boolean isInMemory,
TTabletType tabletType,
String storagePolicy,
IdGeneratorBuffer idGeneratorBuffer,
BinlogConfig binlogConfig,
boolean isStorageMediumSpecified, List<Integer> clusterKeyIndexes)
throws DdlException {
// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
Preconditions.checkArgument(tbl.getBaseIndexId() != -1);
MaterializedIndex baseIndex = new MaterializedIndex(tbl.getBaseIndexId(), IndexState.NORMAL);
// create partition with base index
Partition partition = new Partition(partitionId, partitionName, baseIndex, distributionInfo);
// add to index map
Map<Long, MaterializedIndex> indexMap = new HashMap<>();
indexMap.put(baseIndexId, baseIndex);
indexMap.put(tbl.getBaseIndexId(), baseIndex);
// create rollup index if has
for (long indexId : indexIdToMeta.keySet()) {
if (indexId == baseIndexId) {
if (indexId == tbl.getBaseIndexId()) {
continue;
}
@ -1869,6 +1862,7 @@ public class InternalCatalog implements CatalogIf<Database> {
long version = partition.getVisibleVersion();
short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
TStorageMedium realStorageMedium = null;
for (Map.Entry<Long, MaterializedIndex> entry : indexMap.entrySet()) {
long indexId = entry.getKey();
MaterializedIndex index = entry.getValue();
@ -1876,9 +1870,16 @@ public class InternalCatalog implements CatalogIf<Database> {
// create tablets
int schemaHash = indexMeta.getSchemaHash();
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium);
createTablets(index, ReplicaState.NORMAL, distributionInfo, version, replicaAlloc, tabletMeta,
tabletIdSet, idGeneratorBuffer, isStorageMediumSpecified);
TabletMeta tabletMeta = new TabletMeta(dbId, tbl.getId(), partitionId, indexId,
schemaHash, dataProperty.getStorageMedium());
realStorageMedium = createTablets(index, ReplicaState.NORMAL, distributionInfo, version, replicaAlloc,
tabletMeta, tabletIdSet, idGeneratorBuffer, dataProperty.isStorageMediumSpecified());
if (realStorageMedium != null && !realStorageMedium.equals(dataProperty.getStorageMedium())) {
dataProperty.setStorageMedium(realStorageMedium);
LOG.info("real medium not eq default "
+ "tableName={} tableId={} partitionName={} partitionId={} readMedium {}",
tbl.getName(), tbl.getId(), partitionName, partitionId, realStorageMedium);
}
boolean ok = false;
String errMsg = null;
@ -1897,17 +1898,20 @@ public class InternalCatalog implements CatalogIf<Database> {
long backendId = replica.getBackendId();
long replicaId = replica.getId();
countDownLatch.addMark(backendId, tabletId);
CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId, partitionId, indexId,
CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tbl.getId(), partitionId, indexId,
tabletId, replicaId, shortKeyColumnCount, schemaHash, version, keysType, storageType,
storageMedium, schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, tabletType,
dataSortInfo, compressionType, enableUniqueKeyMergeOnWrite, storagePolicy,
disableAutoCompaction, enableSingleReplicaCompaction, skipWriteIndexOnLoad,
compactionPolicy, timeSeriesCompactionGoalSizeMbytes,
timeSeriesCompactionFileCountThreshold, timeSeriesCompactionTimeThresholdSeconds,
timeSeriesCompactionEmptyRowsetsThreshold,
storeRowColumn, binlogConfig);
realStorageMedium, schema, bfColumns, tbl.getBfFpp(), countDownLatch,
tbl.getCopiedIndexes(), tbl.isInMemory(), tabletType,
tbl.getDataSortInfo(), tbl.getCompressionType(),
tbl.getEnableUniqueKeyMergeOnWrite(), storagePolicy, tbl.disableAutoCompaction(),
tbl.enableSingleReplicaCompaction(), tbl.skipWriteIndexOnLoad(),
tbl.getCompactionPolicy(), tbl.getTimeSeriesCompactionGoalSizeMbytes(),
tbl.getTimeSeriesCompactionFileCountThreshold(),
tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
tbl.storeRowColumn(), binlogConfig);
task.setStorageFormat(storageFormat);
task.setStorageFormat(tbl.getStorageFormat());
task.setClusterKeyIndexes(clusterKeyIndexes);
batchTask.addTask(task);
// add to AgentTaskQueue for handling finish report.
@ -1971,14 +1975,14 @@ public class InternalCatalog implements CatalogIf<Database> {
}
}
if (index.getId() != baseIndexId) {
if (index.getId() != tbl.getBaseIndexId()) {
// add rollup index to partition
partition.createRollupIndex(index);
}
} // end for indexMap
LOG.info("succeed in creating partition[{}-{}], table : [{}-{}]", partitionId, partitionName,
tableId, tableName);
tbl.getId(), tbl.getName());
return partition;
}
@ -2566,20 +2570,14 @@ public class InternalCatalog implements CatalogIf<Database> {
"Database " + db.getFullName() + " create unpartitioned table " + tableName + " increasing "
+ totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
}
Partition partition = createPartitionWithIndices(db.getId(), olapTable.getId(),
olapTable.getName(), olapTable.getBaseIndexId(), partitionId, partitionName,
Partition partition = createPartitionWithIndices(db.getId(), olapTable, partitionId, partitionName,
olapTable.getIndexIdToMeta(), partitionDistributionInfo,
partitionInfo.getDataProperty(partitionId).getStorageMedium(),
partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, bfFpp, tabletIdSet,
olapTable.getCopiedIndexes(), isInMemory, storageFormat, tabletType, compressionType,
olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer, olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad,
olapTable.getCompactionPolicy(), olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
olapTable.getTimeSeriesCompactionFileCountThreshold(),
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
storeRowColumn, binlogConfigForTask,
partitionInfo.getDataProperty(partitionId),
partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, tabletIdSet,
isInMemory, tabletType,
storagePolicy,
idGeneratorBuffer,
binlogConfigForTask,
partitionInfo.getDataProperty(partitionId).isStorageMediumSpecified(),
keysDesc.getClusterKeysColumnIds());
olapTable.addPartition(partition);
@ -2665,20 +2663,15 @@ public class InternalCatalog implements CatalogIf<Database> {
Env.getCurrentEnv().getPolicyMgr().checkStoragePolicyExist(partionStoragePolicy);
Partition partition = createPartitionWithIndices(db.getId(),
olapTable.getId(), olapTable.getName(), olapTable.getBaseIndexId(), entry.getValue(),
olapTable, entry.getValue(),
entry.getKey(), olapTable.getIndexIdToMeta(), partitionDistributionInfo,
dataProperty.getStorageMedium(), partitionInfo.getReplicaAllocation(entry.getValue()),
versionInfo, bfColumns, bfFpp, tabletIdSet, olapTable.getCopiedIndexes(), isInMemory,
storageFormat, partitionInfo.getTabletType(entry.getValue()), compressionType,
olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(),
partionStoragePolicy, idGeneratorBuffer, olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(), skipWriteIndexOnLoad,
olapTable.getCompactionPolicy(), olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
olapTable.getTimeSeriesCompactionFileCountThreshold(),
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
storeRowColumn, binlogConfigForTask,
dataProperty.isStorageMediumSpecified(), keysDesc.getClusterKeysColumnIds());
dataProperty, partitionInfo.getReplicaAllocation(entry.getValue()),
versionInfo, bfColumns, tabletIdSet, isInMemory,
partitionInfo.getTabletType(entry.getValue()),
partionStoragePolicy, idGeneratorBuffer,
binlogConfigForTask,
dataProperty.isStorageMediumSpecified(),
keysDesc.getClusterKeysColumnIds());
olapTable.addPartition(partition);
olapTable.getPartitionInfo().getDataProperty(partition.getId())
.setStoragePolicy(partionStoragePolicy);
@ -2872,7 +2865,7 @@ public class InternalCatalog implements CatalogIf<Database> {
}
@VisibleForTesting
public void createTablets(MaterializedIndex index, ReplicaState replicaState,
public TStorageMedium createTablets(MaterializedIndex index, ReplicaState replicaState,
DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta,
Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer, boolean isStorageMediumSpecified)
throws DdlException {
@ -2925,9 +2918,12 @@ public class InternalCatalog implements CatalogIf<Database> {
if (chooseBackendsArbitrary) {
// This is the first colocate table in the group, or just a normal table,
// choose backends
chosenBackendIds = systemInfoService.selectBackendIdsForReplicaCreation(replicaAlloc, nextIndexs,
Pair<Map<Tag, List<Long>>, TStorageMedium> chosenBackendIdsAndMedium
= systemInfoService.selectBackendIdsForReplicaCreation(
replicaAlloc, nextIndexs,
storageMedium, isStorageMediumSpecified, false);
chosenBackendIds = chosenBackendIdsAndMedium.first;
storageMedium = chosenBackendIdsAndMedium.second;
for (Map.Entry<Tag, List<Long>> entry : chosenBackendIds.entrySet()) {
backendsPerBucketSeq.putIfAbsent(entry.getKey(), Lists.newArrayList());
backendsPerBucketSeq.get(entry.getKey()).add(entry.getValue());
@ -2959,6 +2955,7 @@ public class InternalCatalog implements CatalogIf<Database> {
ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
Env.getCurrentEnv().getEditLog().logColocateBackendsPerBucketSeq(info);
}
return storageMedium;
}
/*
@ -3110,23 +3107,16 @@ public class InternalCatalog implements CatalogIf<Database> {
// which is the right behavior.
long oldPartitionId = entry.getValue();
long newPartitionId = idGeneratorBuffer.getNextId();
Partition newPartition = createPartitionWithIndices(db.getId(), copiedTbl.getId(),
copiedTbl.getName(), copiedTbl.getBaseIndexId(), newPartitionId, entry.getKey(),
Partition newPartition = createPartitionWithIndices(db.getId(), copiedTbl,
newPartitionId, entry.getKey(),
copiedTbl.getIndexIdToMeta(), partitionsDistributionInfo.get(oldPartitionId),
copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).getStorageMedium(),
copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId),
copiedTbl.getPartitionInfo().getReplicaAllocation(oldPartitionId), null /* version info */,
copiedTbl.getCopiedBfColumns(), copiedTbl.getBfFpp(), tabletIdSet, copiedTbl.getCopiedIndexes(),
copiedTbl.isInMemory(), copiedTbl.getStorageFormat(),
copiedTbl.getPartitionInfo().getTabletType(oldPartitionId), copiedTbl.getCompressionType(),
copiedTbl.getDataSortInfo(), copiedTbl.getEnableUniqueKeyMergeOnWrite(),
copiedTbl.getCopiedBfColumns(), tabletIdSet,
copiedTbl.isInMemory(),
copiedTbl.getPartitionInfo().getTabletType(oldPartitionId),
olapTable.getPartitionInfo().getDataProperty(oldPartitionId).getStoragePolicy(),
idGeneratorBuffer, olapTable.disableAutoCompaction(),
olapTable.enableSingleReplicaCompaction(), olapTable.skipWriteIndexOnLoad(),
olapTable.getCompactionPolicy(), olapTable.getTimeSeriesCompactionGoalSizeMbytes(),
olapTable.getTimeSeriesCompactionFileCountThreshold(),
olapTable.getTimeSeriesCompactionTimeThresholdSeconds(),
olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(),
olapTable.storeRowColumn(), binlogConfig,
idGeneratorBuffer, binlogConfig,
copiedTbl.getPartitionInfo().getDataProperty(oldPartitionId).isStorageMediumSpecified(),
clusterKeyIdxes);
newPartitions.add(newPartition);

View File

@ -485,7 +485,7 @@ public class SystemInfoService {
* @return return the selected backend ids group by tag.
* @throws DdlException
*/
public Map<Tag, List<Long>> selectBackendIdsForReplicaCreation(
public Pair<Map<Tag, List<Long>>, TStorageMedium> selectBackendIdsForReplicaCreation(
ReplicaAllocation replicaAlloc, Map<Tag, Integer> nextIndexs,
TStorageMedium storageMedium, boolean isStorageMediumSpecified,
boolean isOnlyForCheck)
@ -520,6 +520,7 @@ public class SystemInfoService {
List<Long> beIds = selectBackendIdsByPolicy(policy, entry.getValue());
// first time empty, retry with different storage medium
// if only for check, no need to retry different storage medium to get backend
TStorageMedium originalStorageMedium = storageMedium;
if (beIds.isEmpty() && storageMedium != null && !isStorageMediumSpecified && !isOnlyForCheck) {
storageMedium = (storageMedium == TStorageMedium.HDD) ? TStorageMedium.SSD : TStorageMedium.HDD;
builder.setStorageMedium(storageMedium);
@ -534,10 +535,10 @@ public class SystemInfoService {
}
// after retry different storage medium, it's still empty
if (beIds.isEmpty()) {
LOG.error("failed backend(s) for policy:" + policy);
LOG.error("failed backend(s) for policy: {} real medium {}", policy, originalStorageMedium);
String errorReplication = "replication tag: " + entry.getKey()
+ ", replication num: " + entry.getValue()
+ ", storage medium: " + storageMedium;
+ ", storage medium: " + originalStorageMedium;
failedEntries.add(errorReplication);
} else {
chosenBackendIds.put(entry.getKey(), beIds);
@ -554,7 +555,7 @@ public class SystemInfoService {
}
Preconditions.checkState(totalReplicaNum == replicaAlloc.getTotalReplicaNum());
return chosenBackendIds;
return Pair.of(chosenBackendIds, storageMedium);
}
/**