This reverts commit f22611769944e78c28f1b0a1eeb7b7414a16e8db.
This commit is contained in:
@ -25,7 +25,6 @@ import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.thrift.TStringLiteral;
|
||||
|
||||
import com.github.javaparser.quality.Preconditions;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -114,20 +113,9 @@ public class PartitionExprUtil {
|
||||
return null;
|
||||
}
|
||||
|
||||
// In one calling, because we have partition values filter, the same partition
|
||||
// value won't make duplicate AddPartitionClause.
|
||||
// But if there's same partition values in two calling of this. we may have the
|
||||
// different partition name because we have timestamp suffix here.
|
||||
// Should check existence of partitions in this table. so need at least readlock
|
||||
// first.
|
||||
// @return <newName, newPartitionClause>
|
||||
// @return existPartitionIds will save exist partition's id.
|
||||
public static Map<String, AddPartitionClause> getNonExistPartitionAddClause(OlapTable olapTable,
|
||||
ArrayList<TStringLiteral> partitionValues, PartitionInfo partitionInfo, ArrayList<Long> existPartitionIds)
|
||||
public static Map<String, AddPartitionClause> getAddPartitionClauseFromPartitionValues(OlapTable olapTable,
|
||||
ArrayList<TStringLiteral> partitionValues, PartitionInfo partitionInfo)
|
||||
throws AnalysisException {
|
||||
Preconditions.checkArgument(!partitionInfo.isMultiColumnPartition(),
|
||||
"now dont support multi key columns in auto-partition.");
|
||||
|
||||
Map<String, AddPartitionClause> result = Maps.newHashMap();
|
||||
ArrayList<Expr> partitionExprs = partitionInfo.getPartitionExprs();
|
||||
PartitionType partitionType = partitionInfo.getType();
|
||||
@ -144,14 +132,6 @@ public class PartitionExprUtil {
|
||||
continue;
|
||||
}
|
||||
filterPartitionValues.add(value);
|
||||
|
||||
// check if this key value has been covered by some partition.
|
||||
Long id = partitionInfo.contains(partitionValue, partitionType);
|
||||
if (id != null) { // found
|
||||
existPartitionIds.add(id);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (partitionType == PartitionType.RANGE) {
|
||||
String beginTime = value;
|
||||
DateLiteral beginDateTime = new DateLiteral(beginTime, partitionColumnType);
|
||||
@ -167,24 +147,21 @@ public class PartitionExprUtil {
|
||||
listValues.add(Collections.singletonList(lowerValue));
|
||||
partitionKeyDesc = PartitionKeyDesc.createIn(
|
||||
listValues);
|
||||
// the partition's name can't contain some special characters. so some string
|
||||
// values(like a*b and ab) will get same partition name. to distingush them, we
|
||||
// have to add a timestamp.
|
||||
partitionName += getFormatPartitionValue(lowerValue.getStringValue());
|
||||
if (partitionColumnType.isStringType()) {
|
||||
partitionName += "_" + System.currentTimeMillis();
|
||||
}
|
||||
} else {
|
||||
throw new AnalysisException("auto-partition only support range and list partition");
|
||||
throw new AnalysisException("now only support range and list partition");
|
||||
}
|
||||
|
||||
Map<String, String> partitionProperties = Maps.newHashMap();
|
||||
DistributionDesc distributionDesc = olapTable.getDefaultDistributionInfo().toDistributionDesc();
|
||||
|
||||
SinglePartitionDesc partitionDesc = new SinglePartitionDesc(true, partitionName,
|
||||
SinglePartitionDesc singleRangePartitionDesc = new SinglePartitionDesc(true, partitionName,
|
||||
partitionKeyDesc, partitionProperties);
|
||||
|
||||
AddPartitionClause addPartitionClause = new AddPartitionClause(partitionDesc,
|
||||
AddPartitionClause addPartitionClause = new AddPartitionClause(singleRangePartitionDesc,
|
||||
distributionDesc, partitionProperties, false);
|
||||
result.put(partitionName, addPartitionClause);
|
||||
}
|
||||
|
||||
@ -322,29 +322,6 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
|
||||
return Math.max(leftReplicaQuota, 0L);
|
||||
}
|
||||
|
||||
public long getReplicaCountWithoutLock() {
|
||||
readLock();
|
||||
try {
|
||||
long usedReplicaCount = 0;
|
||||
for (Table table : this.idToTable.values()) {
|
||||
if (table.getType() != TableType.OLAP) {
|
||||
continue;
|
||||
}
|
||||
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
usedReplicaCount = usedReplicaCount + olapTable.getReplicaCount();
|
||||
}
|
||||
return usedReplicaCount;
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public long getReplicaQuotaLeftWithoutLock() {
|
||||
long leftReplicaQuota = replicaQuotaSize - getReplicaCountWithoutLock();
|
||||
return Math.max(leftReplicaQuota, 0L);
|
||||
}
|
||||
|
||||
public void checkDataSizeQuota() throws DdlException {
|
||||
Pair<Double, String> quotaUnitPair = DebugUtil.getByteUint(dataQuotaBytes);
|
||||
String readableQuota = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(quotaUnitPair.first) + " "
|
||||
|
||||
@ -2899,12 +2899,7 @@ public class Env {
|
||||
}
|
||||
|
||||
public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
|
||||
getInternalCatalog().addPartition(db, tableName, addPartitionClause, false);
|
||||
}
|
||||
|
||||
public void addPartitionSkipLock(Database db, OlapTable table, AddPartitionClause addPartitionClause)
|
||||
throws DdlException {
|
||||
getInternalCatalog().addPartition(db, table.getName(), addPartitionClause, true);
|
||||
getInternalCatalog().addPartition(db, tableName, addPartitionClause);
|
||||
}
|
||||
|
||||
public void addPartitionLike(Database db, String tableName, AddPartitionLikeClause addPartitionLikeClause)
|
||||
|
||||
@ -965,10 +965,6 @@ public class OlapTable extends Table {
|
||||
return partition;
|
||||
}
|
||||
|
||||
public int getPartitionNum() {
|
||||
return idToPartition.size();
|
||||
}
|
||||
|
||||
// get all partitions except temp partitions
|
||||
public Collection<Partition> getPartitions() {
|
||||
return idToPartition.values();
|
||||
|
||||
@ -19,7 +19,6 @@ package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.DateLiteral;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.LiteralExpr;
|
||||
import org.apache.doris.analysis.MaxLiteral;
|
||||
import org.apache.doris.analysis.PartitionDesc;
|
||||
import org.apache.doris.analysis.PartitionValue;
|
||||
@ -30,7 +29,6 @@ import org.apache.doris.common.FeMetaVersion;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
import org.apache.doris.thrift.TStringLiteral;
|
||||
import org.apache.doris.thrift.TTabletType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
@ -151,99 +149,6 @@ public class PartitionInfo implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
// only for auto partition. now only support one column.
|
||||
// @return: null for not contain. otherwise partition id.
|
||||
public Long contains(TStringLiteral key, PartitionType partitionType) throws AnalysisException {
|
||||
if (idToItem.isEmpty() && idToTempItem.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (partitionType == PartitionType.LIST) {
|
||||
PartitionValue keyValue = new PartitionValue(key.getValue());
|
||||
|
||||
PrimitiveType toType;
|
||||
if (!idToItem.isEmpty()) {
|
||||
PartitionItem aItem = idToItem.values().iterator().next();
|
||||
toType = ((ListPartitionItem) aItem).getItems().get(0).getTypes().get(0);
|
||||
} else {
|
||||
PartitionItem aItem = idToTempItem.values().iterator().next();
|
||||
toType = ((ListPartitionItem) aItem).getItems().get(0).getTypes().get(0);
|
||||
}
|
||||
LiteralExpr detectExpr = LiteralExpr.create(keyValue.getStringValue(), Type.fromPrimitiveType(toType));
|
||||
|
||||
for (Map.Entry<Long, PartitionItem> entry : idToItem.entrySet()) {
|
||||
Long id = entry.getKey();
|
||||
ListPartitionItem item = (ListPartitionItem) (entry.getValue()); // a item is a partiton
|
||||
// in one list partition, there's maybe many acceptable value
|
||||
for (PartitionKey keysInItem : item.getItems()) {
|
||||
Preconditions.checkArgument(keysInItem.getKeys().size() == 1,
|
||||
"only support 1 column in auto partition now");
|
||||
if (detectExpr.compareLiteral(keysInItem.getKeys().get(0)) == 0) {
|
||||
return id;
|
||||
}
|
||||
}
|
||||
}
|
||||
for (Map.Entry<Long, PartitionItem> entry : idToTempItem.entrySet()) {
|
||||
Long id = entry.getKey();
|
||||
ListPartitionItem item = (ListPartitionItem) (entry.getValue()); // a item is a partiton
|
||||
// in one list partition, there's maybe many acceptable value
|
||||
for (PartitionKey keysInItem : item.getItems()) {
|
||||
Preconditions.checkArgument(keysInItem.getKeys().size() == 1,
|
||||
"only support 1 column in auto partition now");
|
||||
if (detectExpr.compareLiteral(keysInItem.getKeys().get(0)) == 0) {
|
||||
return id;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (partitionType == PartitionType.RANGE) {
|
||||
PartitionValue keyValue = new PartitionValue(key.getValue());
|
||||
|
||||
PrimitiveType toType;
|
||||
if (!idToItem.isEmpty()) {
|
||||
PartitionItem aItem = idToItem.values().iterator().next();
|
||||
toType = ((RangePartitionItem) aItem).getItems().lowerEndpoint().getTypes().get(0);
|
||||
} else {
|
||||
PartitionItem aItem = idToTempItem.values().iterator().next();
|
||||
toType = ((RangePartitionItem) aItem).getItems().lowerEndpoint().getTypes().get(0);
|
||||
}
|
||||
LiteralExpr detectExpr = LiteralExpr.create(keyValue.getStringValue(), Type.fromPrimitiveType(toType));
|
||||
|
||||
for (Map.Entry<Long, PartitionItem> entry : idToItem.entrySet()) {
|
||||
Long id = entry.getKey();
|
||||
RangePartitionItem item = (RangePartitionItem) (entry.getValue());
|
||||
// lower/upper for each columns
|
||||
PartitionKey lower = item.getItems().lowerEndpoint();
|
||||
PartitionKey upper = item.getItems().lowerEndpoint();
|
||||
Preconditions.checkArgument(lower.getKeys().size() == 1 && upper.getKeys().size() == 1,
|
||||
"only support 1 column in auto partition now");
|
||||
LiteralExpr lowerKey = lower.getKeys().get(0);
|
||||
LiteralExpr upperKey = lower.getKeys().get(0);
|
||||
if (detectExpr.compareLiteral(lowerKey) >= 0
|
||||
&& (detectExpr.compareLiteral(upperKey) < 0 || upperKey instanceof MaxLiteral)) {
|
||||
return id;
|
||||
}
|
||||
}
|
||||
for (Map.Entry<Long, PartitionItem> entry : idToTempItem.entrySet()) {
|
||||
Long id = entry.getKey();
|
||||
RangePartitionItem item = (RangePartitionItem) (entry.getValue());
|
||||
// lower/upper for each columns
|
||||
PartitionKey lower = item.getItems().lowerEndpoint();
|
||||
PartitionKey upper = item.getItems().lowerEndpoint();
|
||||
Preconditions.checkArgument(lower.getKeys().size() == 1 && upper.getKeys().size() == 1,
|
||||
"only support 1 column in auto partition now");
|
||||
LiteralExpr lowerKey = lower.getKeys().get(0);
|
||||
LiteralExpr upperKey = lower.getKeys().get(0);
|
||||
if (detectExpr.compareLiteral(lowerKey) >= 0
|
||||
&& (detectExpr.compareLiteral(upperKey) < 0 || upperKey instanceof MaxLiteral)) {
|
||||
return id;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new AnalysisException("Only support List/Range on checking partition's inclusion");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public PartitionItem handleNewSinglePartitionDesc(SinglePartitionDesc desc,
|
||||
long partitionId, boolean isTemp) throws DdlException {
|
||||
Preconditions.checkArgument(desc.isAnalyzed());
|
||||
|
||||
@ -1370,7 +1370,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
} finally {
|
||||
table.readUnlock();
|
||||
}
|
||||
addPartition(db, tableName, clause, false);
|
||||
addPartition(db, tableName, clause);
|
||||
|
||||
} catch (UserException e) {
|
||||
throw new DdlException("Failed to ADD PARTITION " + addPartitionLikeClause.getPartitionName()
|
||||
@ -1378,10 +1378,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
}
|
||||
}
|
||||
|
||||
// if skipLock = true. there's not any lock operation. In generally it means we
|
||||
// have a relative process outside and under a same huge lock.
|
||||
public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause, boolean skipLock)
|
||||
throws DdlException {
|
||||
public void addPartition(Database db, String tableName, AddPartitionClause addPartitionClause) throws DdlException {
|
||||
SinglePartitionDesc singlePartitionDesc = addPartitionClause.getSingeRangePartitionDesc();
|
||||
DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
|
||||
boolean isTempPartition = addPartitionClause.isTempPartition();
|
||||
@ -1394,9 +1391,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
|
||||
// check
|
||||
OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
|
||||
if (!skipLock) {
|
||||
olapTable.readLock();
|
||||
}
|
||||
olapTable.readLock();
|
||||
try {
|
||||
olapTable.checkNormalStateForAlter();
|
||||
// check partition type
|
||||
@ -1528,11 +1523,8 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
} catch (AnalysisException e) {
|
||||
throw new DdlException(e.getMessage());
|
||||
} finally {
|
||||
if (!skipLock) {
|
||||
olapTable.readUnlock();
|
||||
}
|
||||
olapTable.readUnlock();
|
||||
}
|
||||
// now we still hold the read lock.
|
||||
|
||||
Preconditions.checkNotNull(distributionInfo);
|
||||
Preconditions.checkNotNull(olapTable);
|
||||
@ -1546,7 +1538,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
long bucketNum = distributionInfo.getBucketNum();
|
||||
long replicaNum = singlePartitionDesc.getReplicaAlloc().getTotalReplicaNum();
|
||||
long totalReplicaNum = indexNum * bucketNum * replicaNum;
|
||||
if (totalReplicaNum >= db.getReplicaQuotaLeftWithoutLock()) { // this may have a little risk
|
||||
if (totalReplicaNum >= db.getReplicaQuotaLeftWithLock()) {
|
||||
throw new DdlException("Database " + db.getFullName() + " table " + tableName + " add partition increasing "
|
||||
+ totalReplicaNum + " of replica exceeds quota[" + db.getReplicaQuota() + "]");
|
||||
}
|
||||
@ -1574,12 +1566,9 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
olapTable.storeRowColumn(),
|
||||
binlogConfig, dataProperty.isStorageMediumSpecified());
|
||||
|
||||
// check again.
|
||||
// if we have lock outside, skip the check cuz the table wouldn'tbe delete.
|
||||
if (!skipLock) {
|
||||
olapTable = db.getOlapTableOrDdlException(tableName);
|
||||
olapTable.writeLockOrDdlException();
|
||||
}
|
||||
// check again
|
||||
olapTable = db.getOlapTableOrDdlException(tableName);
|
||||
olapTable.writeLockOrDdlException();
|
||||
try {
|
||||
olapTable.checkNormalStateForAlter();
|
||||
// check partition name
|
||||
@ -1634,6 +1623,8 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
if (metaChanged) {
|
||||
throw new DdlException("Table[" + tableName + "]'s meta has been changed. try again.");
|
||||
}
|
||||
@ -1672,9 +1663,7 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
|
||||
LOG.info("succeed in creating partition[{}], temp: {}", partitionId, isTempPartition);
|
||||
} finally {
|
||||
if (!skipLock) {
|
||||
olapTable.writeUnlock();
|
||||
}
|
||||
olapTable.writeUnlock();
|
||||
}
|
||||
} catch (DdlException e) {
|
||||
for (Long tabletId : tabletIdSet) {
|
||||
|
||||
@ -226,7 +226,6 @@ import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Streams;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -3256,7 +3255,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
return result;
|
||||
}
|
||||
|
||||
// extract request's partitions
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
|
||||
ArrayList<TStringLiteral> partitionValues = new ArrayList<TStringLiteral>();
|
||||
for (int i = 0; i < request.partitionValues.size(); i++) {
|
||||
if (request.partitionValues.get(i).size() != 1) {
|
||||
@ -3267,71 +3267,34 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
}
|
||||
partitionValues.add(request.partitionValues.get(i).get(0));
|
||||
}
|
||||
|
||||
// get the table and its partitions.
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
|
||||
|
||||
// generate the partitions from value.
|
||||
Map<String, AddPartitionClause> addPartitionClauseMap; // name to partition. each is one partition.
|
||||
ArrayList<Long> existPartitionIds = Lists.newArrayList();
|
||||
Map<String, AddPartitionClause> addPartitionClauseMap;
|
||||
try {
|
||||
// Lock from here
|
||||
olapTable.writeLockOrDdlException();
|
||||
// won't get duplicate values. If exist, the origin partition will save id in
|
||||
// existPartitionIds, no go to return ClauseMap
|
||||
addPartitionClauseMap = PartitionExprUtil.getNonExistPartitionAddClause(olapTable,
|
||||
partitionValues, partitionInfo, existPartitionIds);
|
||||
} catch (DdlException ddlEx) {
|
||||
errorStatus.setErrorMsgs(Lists.newArrayList(ddlEx.getMessage()));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
addPartitionClauseMap = PartitionExprUtil.getAddPartitionClauseFromPartitionValues(olapTable,
|
||||
partitionValues, partitionInfo);
|
||||
} catch (AnalysisException ex) {
|
||||
olapTable.writeUnlock();
|
||||
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
}
|
||||
|
||||
// check partition's number limit.
|
||||
int partitionNum = olapTable.getPartitionNum() + addPartitionClauseMap.size();
|
||||
if (partitionNum > Config.max_auto_partition_num) {
|
||||
olapTable.writeUnlock();
|
||||
String errorMessage = String.format(
|
||||
"create partition failed. partition numbers %d will exceed limit variable max_auto_partition_num%d",
|
||||
partitionNum, Config.max_auto_partition_num);
|
||||
LOG.warn(errorMessage);
|
||||
errorStatus.setErrorMsgs(Lists.newArrayList(errorMessage));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
}
|
||||
|
||||
// add partitions to table. will write metadata.
|
||||
try {
|
||||
for (AddPartitionClause addPartitionClause : addPartitionClauseMap.values()) {
|
||||
Env.getCurrentEnv().addPartitionSkipLock(db, olapTable, addPartitionClause);
|
||||
for (AddPartitionClause addPartitionClause : addPartitionClauseMap.values()) {
|
||||
try {
|
||||
// here maybe check and limit created partitions num
|
||||
Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause);
|
||||
} catch (DdlException e) {
|
||||
LOG.warn(e);
|
||||
errorStatus.setErrorMsgs(
|
||||
Lists.newArrayList(String.format("create partition failed. error:%s", e.getMessage())));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
}
|
||||
} catch (DdlException e) {
|
||||
LOG.warn(e);
|
||||
errorStatus.setErrorMsgs(
|
||||
Lists.newArrayList(String.format("create partition failed. error:%s", e.getMessage())));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
} finally {
|
||||
// read/write metadata finished. free lock.
|
||||
olapTable.writeUnlock();
|
||||
}
|
||||
|
||||
// build partition & tablets
|
||||
List<TOlapTablePartition> partitions = Lists.newArrayList();
|
||||
List<TTabletLocation> tablets = Lists.newArrayList();
|
||||
|
||||
// two part: we create + we found others create(before we try to create and after we found loss in BE)
|
||||
List<Partition> returnPartitions = Streams
|
||||
.concat(existPartitionIds.stream().map(id -> olapTable.getPartition(id)),
|
||||
addPartitionClauseMap.keySet().stream().map(str -> olapTable.getPartition(str)))
|
||||
.collect(Collectors.toList());
|
||||
for (Partition partition : returnPartitions) {
|
||||
for (String partitionName : addPartitionClauseMap.keySet()) {
|
||||
Partition partition = table.getPartition(partitionName);
|
||||
TOlapTablePartition tPartition = new TOlapTablePartition();
|
||||
tPartition.setId(partition.getId());
|
||||
int partColNum = partitionInfo.getPartitionColumns().size();
|
||||
|
||||
Reference in New Issue
Block a user