[Enhancement](IdGenerator) Use IdGeneratorBuffer to get better performance for the getNextId operation when creating a table, truncating a table, adding a partition, and so on (#11479)

Co-authored-by: caiconghui1 <caiconghui1@jd.com>
This commit is contained in:
caiconghui
2022-08-04 11:21:35 +08:00
committed by GitHub
parent 36784d9131
commit e7f378fec6
5 changed files with 137 additions and 35 deletions

View File

@ -83,6 +83,7 @@ import org.apache.doris.blockrule.SqlBlockRuleMgr;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Replica.ReplicaStatus;
import org.apache.doris.catalog.TableIf.TableType;
@ -344,7 +345,7 @@ public class Env {
private int masterHttpPort;
private String masterIp;
private CatalogIdGenerator idGenerator = new CatalogIdGenerator(NEXT_ID_INIT_VALUE);
private MetaIdGenerator idGenerator = new MetaIdGenerator(NEXT_ID_INIT_VALUE);
private EditLog editLog;
private int clusterId;
@ -3205,6 +3206,10 @@ public class Env {
return idGenerator.getNextId();
}
// Returns a pre-allocated buffer of {@code bufferSize} consecutive ids from the
// catalog-wide MetaIdGenerator, so bulk metadata operations (create table,
// truncate table, add partition) can draw many ids without one edit-log write
// per id. Delegates directly to MetaIdGenerator.getIdGeneratorBuffer.
public IdGeneratorBuffer getIdGeneratorBuffer(long bufferSize) {
return idGenerator.getIdGeneratorBuffer(bufferSize);
}
public HashMap<Long, TStorageMedium> getPartitionIdToStorageMediumMap() {
HashMap<Long, TStorageMedium> storageMediumMap = new HashMap<Long, TStorageMedium>();

View File

@ -19,10 +19,12 @@ package org.apache.doris.catalog;
import org.apache.doris.persist.EditLog;
import com.google.common.base.Preconditions;
// This new id generator works the same way as TransactionIdGenerator.
// However, TransactionIdGenerator cannot simply replace the old catalog's 'nextId' for
// compatibility reasons: the two generators use different edit log operation types.
public class CatalogIdGenerator {
public class MetaIdGenerator {
private static final int BATCH_ID_INTERVAL = 1000;
private long nextId;
@ -30,7 +32,7 @@ public class CatalogIdGenerator {
private EditLog editLog;
public CatalogIdGenerator(long initValue) {
public MetaIdGenerator(long initValue) {
nextId = initValue + 1;
batchEndId = initValue;
}
@ -41,16 +43,27 @@ public class CatalogIdGenerator {
// so that performance is better
public synchronized long getNextId() {
if (nextId < batchEndId) {
return nextId++;
} else {
if (nextId >= batchEndId) {
batchEndId = batchEndId + BATCH_ID_INTERVAL;
if (editLog != null) {
// add this check just for unit test
editLog.logSaveNextId(batchEndId);
}
return nextId++;
}
return nextId++;
}
// Hands out a contiguous range of `bufferSize` ids, wrapped in an
// IdGeneratorBuffer, so callers can draw many ids while persisting at most one
// edit-log record (the new batch end) instead of one record per id batch.
public synchronized IdGeneratorBuffer getIdGeneratorBuffer(long bufferSize) {
    Preconditions.checkState(bufferSize > 0);
    long rangeStart = nextId;
    long rangeEndInclusive = nextId + bufferSize - 1;
    IdGeneratorBuffer buffer = new IdGeneratorBuffer(rangeStart, rangeEndInclusive);
    nextId += bufferSize;
    if (nextId > batchEndId) {
        // Extend the persisted batch in BATCH_ID_INTERVAL steps, far enough to
        // cover every id just handed out.
        long extension = (bufferSize / BATCH_ID_INTERVAL + 1) * BATCH_ID_INTERVAL;
        batchEndId += extension;
        if (editLog != null) {
            // editLog may be null in unit tests; skip logging in that case.
            editLog.logSaveNextId(batchEndId);
        }
    }
    return buffer;
}
public synchronized void setId(long id) {
@ -64,4 +77,19 @@ public class CatalogIdGenerator {
public long getBatchEndId() {
return batchEndId;
}
public class IdGeneratorBuffer {
private long nextId;
private long batchEndId;
private IdGeneratorBuffer(long nextId, long batchEndId) {
this.nextId = nextId;
this.batchEndId = batchEndId;
}
public long getNextId() {
Preconditions.checkState(nextId <= batchEndId);
return nextId++;
}
}
}

View File

@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.SinglePartitionDesc;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.ReplicaAllocation;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import java.util.Collection;
// Helpers for sizing an IdGeneratorBuffer before bulk metadata operations.
// Every created metadata object (table, index, partition, tablet, replica)
// consumes one id, so these methods compute the number of ids the operation
// will need so they can be reserved in a single batch.
public class IdGeneratorUtil {

    // Utility class: static methods only, no instances.
    private IdGeneratorUtil() {
    }

    // Buffer size for CREATE TABLE: 1 (table id) + one id per partition
    // + one id per index (base index + rollups) + per-partition tablet and
    // replica ids.
    public static long getBufferSize(CreateTableStmt stmt, ReplicaAllocation replicaAlloc) throws DdlException,
            AnalysisException {
        long bufferSize = 1;
        long partitionNum = stmt.getPartitionDesc() == null ? 1 :
                stmt.getPartitionDesc().getSinglePartitionDescs().size();
        long indexNum = stmt.getRollupAlterClauseList().size() + 1;
        long bucketNum = stmt.getDistributionDesc().toDistributionInfo(stmt.getColumns()).getBucketNum();
        bufferSize = bufferSize + partitionNum + indexNum;
        if (stmt.getPartitionDesc() == null) {
            // Unpartitioned table: a single implicit partition using the
            // table-level replica allocation. (replicaNum + 1) * indexNum * bucketNum
            // covers tablet ids plus replica ids.
            bufferSize = bufferSize + (replicaAlloc.getTotalReplicaNum() + 1) * indexNum * bucketNum;
        } else {
            for (SinglePartitionDesc partitionDesc : stmt.getPartitionDesc().getSinglePartitionDescs()) {
                long replicaNum = partitionDesc.getReplicaAlloc().getTotalReplicaNum();
                bufferSize = bufferSize + (replicaNum + 1) * indexNum * bucketNum;
            }
        }
        return bufferSize;
    }

    // Buffer size for re-creating the given partitions of an existing table
    // (e.g. TRUNCATE): per partition, one partition id plus tablet and replica ids.
    public static long getBufferSize(OlapTable table, Collection<Long> partitionIds) {
        long bufferSize = 0;
        // Index count is a property of the table, not the partition: hoist it
        // out of the loop.
        long indexNum = table.getIndexIdToMeta().size();
        for (Long partitionId : partitionIds) {
            bufferSize = bufferSize + 1;
            long replicaNum = table.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();
            long bucketNum = table.getPartition(partitionId).getDistributionInfo().getBucketNum();
            bufferSize = bufferSize + (replicaNum + 1) * indexNum * bucketNum;
        }
        return bufferSize;
    }
}

View File

@ -85,6 +85,7 @@ import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MaterializedIndex.IndexState;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
import org.apache.doris.catalog.MysqlTable;
import org.apache.doris.catalog.OdbcTable;
import org.apache.doris.catalog.OlapTable;
@ -125,6 +126,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.IdGeneratorUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.QueryableReentrantLock;
@ -1312,16 +1314,18 @@ public class InternalDataSource implements DataSourceIf<Database> {
throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing "
+ totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
}
Set<Long> tabletIdSet = new HashSet<Long>();
Set<Long> tabletIdSet = new HashSet<>();
long bufferSize = 1 + totalReplicaNum + indexNum * bucketNum;
IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
try {
long partitionId = Env.getCurrentEnv().getNextId();
long partitionId = idGeneratorBuffer.getNextId();
Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
olapTable.getBaseIndexId(), partitionId, partitionName, indexIdToMeta, distributionInfo,
dataProperty.getStorageMedium(), singlePartitionDesc.getReplicaAlloc(),
singlePartitionDesc.getVersionInfo(), bfColumns, olapTable.getBfFpp(), tabletIdSet,
olapTable.getCopiedIndexes(), singlePartitionDesc.isInMemory(), olapTable.getStorageFormat(),
singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo(),
olapTable.getEnableUniqueKeyMergeOnWrite(), olapTable.getStoragePolicy());
olapTable.getEnableUniqueKeyMergeOnWrite(), olapTable.getStoragePolicy(), idGeneratorBuffer);
// check again
table = db.getOlapTableOrDdlException(tableName);
@ -1542,7 +1546,8 @@ public class InternalDataSource implements DataSourceIf<Database> {
DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc,
Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> tabletIdSet, List<Index> indexes,
boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType,
DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy) throws DdlException {
DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy,
IdGeneratorBuffer idGeneratorBuffer) throws DdlException {
// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
@ -1580,7 +1585,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
int schemaHash = indexMeta.getSchemaHash();
TabletMeta tabletMeta = new TabletMeta(dbId, tableId, partitionId, indexId, schemaHash, storageMedium);
createTablets(clusterName, index, ReplicaState.NORMAL, distributionInfo, version, replicaAlloc, tabletMeta,
tabletIdSet);
tabletIdSet, idGeneratorBuffer);
boolean ok = false;
String errMsg = null;
@ -1659,15 +1664,23 @@ public class InternalDataSource implements DataSourceIf<Database> {
List<Column> baseSchema = stmt.getColumns();
validateColumns(baseSchema);
// analyze replica allocation
ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(stmt.getProperties(), "");
if (replicaAlloc.isNotSet()) {
replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
}
long bufferSize = IdGeneratorUtil.getBufferSize(stmt, replicaAlloc);
IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
// create partition info
PartitionDesc partitionDesc = stmt.getPartitionDesc();
PartitionInfo partitionInfo = null;
Map<String, Long> partitionNameToId = Maps.newHashMap();
if (partitionDesc != null) {
// gen partition id first
PartitionDesc partDesc = partitionDesc;
for (SinglePartitionDesc desc : partDesc.getSinglePartitionDescs()) {
long partitionId = Env.getCurrentEnv().getNextId();
long partitionId = idGeneratorBuffer.getNextId();
partitionNameToId.put(desc.getPartitionName(), partitionId);
}
partitionInfo = partitionDesc.toPartitionInfo(baseSchema, partitionNameToId, false);
@ -1675,7 +1688,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(stmt.getProperties())) {
throw new DdlException("Only support dynamic partition properties on range partition table");
}
long partitionId = Env.getCurrentEnv().getNextId();
long partitionId = idGeneratorBuffer.getNextId();
// use table name as single partition name
partitionNameToId.put(tableName, partitionId);
partitionInfo = new SinglePartitionInfo();
@ -1699,7 +1712,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
TableIndexes indexes = new TableIndexes(stmt.getIndexes());
// create table
long tableId = Env.getCurrentEnv().getNextId();
long tableId = idGeneratorBuffer.getNextId();
OlapTable olapTable = new OlapTable(tableId, tableName, baseSchema, keysType, partitionInfo,
defaultDistributionInfo, indexes);
@ -1711,7 +1724,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
olapTable.setComment(stmt.getComment());
// set base index id
long baseIndexId = Env.getCurrentEnv().getNextId();
long baseIndexId = idGeneratorBuffer.getNextId();
olapTable.setBaseIndexId(baseIndexId);
// set base index info to table
@ -1770,11 +1783,6 @@ public class InternalDataSource implements DataSourceIf<Database> {
throw new DdlException(e.getMessage());
}
// analyze replica allocation
ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
if (replicaAlloc.isNotSet()) {
replicaAlloc = ReplicaAllocation.DEFAULT_ALLOCATION;
}
olapTable.setReplicationAllocation(replicaAlloc);
// set in memory
@ -1880,7 +1888,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
.checkAndPrepareMaterializedView(addRollupClause, olapTable, baseRollupIndex, false);
short rollupShortKeyColumnCount = Env.calcShortKeyColumnCount(rollupColumns, alterClause.getProperties());
int rollupSchemaHash = Util.generateSchemaHash();
long rollupIndexId = Env.getCurrentEnv().getNextId();
long rollupIndexId = idGeneratorBuffer.getNextId();
olapTable.setIndexMeta(rollupIndexId, addRollupClause.getRollupName(), rollupColumns, schemaVersion,
rollupSchemaHash, rollupShortKeyColumnCount, rollupIndexStorageType, keysType);
}
@ -1938,7 +1946,8 @@ public class InternalDataSource implements DataSourceIf<Database> {
partitionDistributionInfo, partitionInfo.getDataProperty(partitionId).getStorageMedium(),
partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, bfFpp, tabletIdSet,
olapTable.getCopiedIndexes(), isInMemory, storageFormat, tabletType, compressionType,
olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy);
olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer);
olapTable.addPartition(partition);
} else if (partitionInfo.getType() == PartitionType.RANGE
|| partitionInfo.getType() == PartitionType.LIST) {
@ -1995,7 +2004,8 @@ public class InternalDataSource implements DataSourceIf<Database> {
partitionInfo.getReplicaAllocation(entry.getValue()), versionInfo, bfColumns, bfFpp,
tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat,
partitionInfo.getTabletType(entry.getValue()), compressionType,
olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy);
olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite(), storagePolicy,
idGeneratorBuffer);
olapTable.addPartition(partition);
}
} else {
@ -2059,7 +2069,6 @@ public class InternalDataSource implements DataSourceIf<Database> {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
return;
}
private void createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
@ -2073,7 +2082,6 @@ public class InternalDataSource implements DataSourceIf<Database> {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
return;
}
private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException {
@ -2130,8 +2138,6 @@ public class InternalDataSource implements DataSourceIf<Database> {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
return;
}
private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException {
@ -2200,7 +2206,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
private void createTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta,
Set<Long> tabletIdSet) throws DdlException {
Set<Long> tabletIdSet, IdGeneratorBuffer idGeneratorBuffer) throws DdlException {
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
GroupId groupId = null;
@ -2222,7 +2228,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
}
for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
// create a new tablet with random chosen backends
Tablet tablet = new Tablet(Env.getCurrentEnv().getNextId());
Tablet tablet = new Tablet(idGeneratorBuffer.getNextId());
// add tablet to inverted index first
index.addTablet(tablet, tabletMeta);
@ -2257,7 +2263,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
short totalReplicaNum = (short) 0;
for (List<Long> backendIds : chosenBackendIds.values()) {
for (long backendId : backendIds) {
long replicaId = Env.getCurrentEnv().getNextId();
long replicaId = idGeneratorBuffer.getNextId();
Replica replica = new Replica(replicaId, backendId, replicaState, version,
tabletMeta.getOldSchemaHash());
tablet.addReplica(replica);
@ -2356,6 +2362,8 @@ public class InternalDataSource implements DataSourceIf<Database> {
List<Partition> newPartitions = Lists.newArrayList();
// tabletIdSet to save all newly created tablet ids.
Set<Long> tabletIdSet = Sets.newHashSet();
long bufferSize = IdGeneratorUtil.getBufferSize(copiedTbl, origPartitions.values());
IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv().getIdGeneratorBuffer(bufferSize);
try {
for (Map.Entry<String, Long> entry : origPartitions.entrySet()) {
// the new partition must use new id
@ -2364,7 +2372,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
// By using a new id, load job will be aborted(just like partition is dropped),
// which is the right behavior.
long oldPartitionId = entry.getValue();
long newPartitionId = Env.getCurrentEnv().getNextId();
long newPartitionId = idGeneratorBuffer.getNextId();
Partition newPartition = createPartitionWithIndices(db.getClusterName(), db.getId(), copiedTbl.getId(),
copiedTbl.getBaseIndexId(), newPartitionId, entry.getKey(), copiedTbl.getIndexIdToMeta(),
partitionsDistributionInfo.get(oldPartitionId),
@ -2374,7 +2382,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
copiedTbl.isInMemory(), copiedTbl.getStorageFormat(),
copiedTbl.getPartitionInfo().getTabletType(oldPartitionId), copiedTbl.getCompressionType(),
copiedTbl.getDataSortInfo(), copiedTbl.getEnableUniqueKeyMergeOnWrite(),
olapTable.getStoragePolicy());
olapTable.getStoragePolicy(), idGeneratorBuffer);
newPartitions.add(newPartition);
}
} catch (DdlException e) {

View File

@ -326,7 +326,7 @@ public class DatabaseTransactionMgr {
checkRunningTxnExceedLimit(sourceType);
long tid = idGenerator.getNextTransactionId();
LOG.info("begin transaction: txn id {} with label {} from coordinator {}, listner id: {}",
LOG.info("begin transaction: txn id {} with label {} from coordinator {}, listener id: {}",
tid, label, coordinator, listenerId);
TransactionState transactionState = new TransactionState(dbId, tableIdList,
tid, label, requestId, sourceType, coordinator, listenerId, timeoutSecond * 1000);