[Dynamic Partition] Support for automatically drop partitions (#3081)
This commit is contained in:
@ -33,13 +33,15 @@ public class ShowDynamicPartitionStmt extends ShowStmt {
|
||||
.addColumn(new Column("TableName", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("Enable", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("TimeUnit", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("Start", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("End", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("Prefix", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("Buckets", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("LastUpdateTime", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("LastSchedulerTime", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("State", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("Msg", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("LastCreatePartitionMsg", ScalarType.createVarchar(20)))
|
||||
.addColumn(new Column("LastDropPartitionMsg", ScalarType.createVarchar(20)))
|
||||
.build();
|
||||
|
||||
ShowDynamicPartitionStmt(String db) {
|
||||
|
||||
@ -21,6 +21,7 @@ import java.util.Map;
|
||||
|
||||
public class DynamicPartitionProperty{
|
||||
public static final String TIME_UNIT = "dynamic_partition.time_unit";
|
||||
public static final String START = "dynamic_partition.start";
|
||||
public static final String END = "dynamic_partition.end";
|
||||
public static final String PREFIX = "dynamic_partition.prefix";
|
||||
public static final String BUCKETS = "dynamic_partition.buckets";
|
||||
@ -30,6 +31,7 @@ public class DynamicPartitionProperty{
|
||||
|
||||
private boolean enable;
|
||||
private String timeUnit;
|
||||
private int start;
|
||||
private int end;
|
||||
private String prefix;
|
||||
private int buckets;
|
||||
@ -39,6 +41,7 @@ public class DynamicPartitionProperty{
|
||||
this.exist = true;
|
||||
this.enable = Boolean.parseBoolean(properties.get(ENABLE));
|
||||
this.timeUnit = properties.get(TIME_UNIT);
|
||||
this.start = Integer.parseInt(properties.get(START));
|
||||
this.end = Integer.parseInt(properties.get(END));
|
||||
this.prefix = properties.get(PREFIX);
|
||||
this.buckets = Integer.parseInt(properties.get(BUCKETS));
|
||||
@ -55,6 +58,10 @@ public class DynamicPartitionProperty{
|
||||
return timeUnit;
|
||||
}
|
||||
|
||||
public int getStart() {
|
||||
return start;
|
||||
}
|
||||
|
||||
public int getEnd() {
|
||||
return end;
|
||||
}
|
||||
@ -75,6 +82,7 @@ public class DynamicPartitionProperty{
|
||||
public String toString() {
|
||||
return ",\n\"" + ENABLE + "\" = \"" + enable + "\"" +
|
||||
",\n\"" + TIME_UNIT + "\" = \"" + timeUnit + "\"" +
|
||||
",\n\"" + START + "\" = \"" + start + "\"" +
|
||||
",\n\"" + END + "\" = \"" + end + "\"" +
|
||||
",\n\"" + PREFIX + "\" = \"" + prefix + "\"" +
|
||||
",\n\"" + BUCKETS + "\" = \"" + buckets + "\"";
|
||||
|
||||
@ -19,6 +19,7 @@ package org.apache.doris.clone;
|
||||
|
||||
import org.apache.doris.analysis.AddPartitionClause;
|
||||
import org.apache.doris.analysis.DistributionDesc;
|
||||
import org.apache.doris.analysis.DropPartitionClause;
|
||||
import org.apache.doris.analysis.HashDistributionDesc;
|
||||
import org.apache.doris.analysis.PartitionKeyDesc;
|
||||
import org.apache.doris.analysis.PartitionValue;
|
||||
@ -33,13 +34,13 @@ import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.PartitionKey;
|
||||
import org.apache.doris.catalog.RangePartitionInfo;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableProperty;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.util.DynamicPartitionUtil;
|
||||
import org.apache.doris.common.util.MasterDaemon;
|
||||
import org.apache.doris.common.util.RangeUtils;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
@ -52,6 +53,7 @@ import org.apache.logging.log4j.Logger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Calendar;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@ -68,7 +70,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
public static final String LAST_SCHEDULER_TIME = "lastSchedulerTime";
|
||||
public static final String LAST_UPDATE_TIME = "lastUpdateTime";
|
||||
public static final String DYNAMIC_PARTITION_STATE = "dynamicPartitionState";
|
||||
public static final String MSG = "msg";
|
||||
public static final String CREATE_PARTITION_MSG = "createPartitionMsg";
|
||||
public static final String DROP_PARTITION_MSG = "dropPartitionMsg";
|
||||
|
||||
private final String DEFAULT_RUNTIME_VALUE = "N/A";
|
||||
|
||||
@ -120,11 +123,118 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
defaultRuntimeInfo.put(LAST_UPDATE_TIME, DEFAULT_RUNTIME_VALUE);
|
||||
defaultRuntimeInfo.put(LAST_SCHEDULER_TIME, DEFAULT_RUNTIME_VALUE);
|
||||
defaultRuntimeInfo.put(DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
|
||||
defaultRuntimeInfo.put(MSG, DEFAULT_RUNTIME_VALUE);
|
||||
defaultRuntimeInfo.put(CREATE_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
|
||||
defaultRuntimeInfo.put(DROP_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
|
||||
return defaultRuntimeInfo;
|
||||
}
|
||||
|
||||
private void dynamicAddPartition() {
|
||||
private ArrayList<AddPartitionClause> getAddPartitionClause(OlapTable olapTable, Column partitionColumn, String partitionFormat) {
|
||||
ArrayList<AddPartitionClause> addPartitionClauses = new ArrayList<>();
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
DynamicPartitionProperty dynamicPartitionProperty = olapTable.getTableProperty().getDynamicPartitionProperty();
|
||||
for (int i = 0; i <= dynamicPartitionProperty.getEnd(); i++) {
|
||||
String prevBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
|
||||
i, (Calendar) calendar.clone(), partitionFormat);
|
||||
// continue if partition already exists
|
||||
String nextBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
|
||||
i + 1, (Calendar) calendar.clone(), partitionFormat);
|
||||
PartitionValue lowerValue = new PartitionValue(prevBorder);
|
||||
PartitionValue upperValue = new PartitionValue(nextBorder);
|
||||
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
|
||||
RangePartitionInfo info = (RangePartitionInfo) (partitionInfo);
|
||||
boolean isPartitionExists = false;
|
||||
Range<PartitionKey> addPartitionKeyRange;
|
||||
try {
|
||||
PartitionKey lowerBound = PartitionKey.createPartitionKey(Collections.singletonList(lowerValue), Collections.singletonList(partitionColumn));
|
||||
PartitionKey upperBound = PartitionKey.createPartitionKey(Collections.singletonList(upperValue), Collections.singletonList(partitionColumn));
|
||||
addPartitionKeyRange = Range.closedOpen(lowerBound, upperBound);
|
||||
} catch (AnalysisException e) {
|
||||
// keys.size is always equal to column.size, cannot reach this exception
|
||||
LOG.warn("Keys size is not equal to column size. Error={}", e.getMessage());
|
||||
continue;
|
||||
}
|
||||
for (Range<PartitionKey> partitionKeyRange : info.getIdToRange(false).values()) {
|
||||
// only support single column partition now
|
||||
try {
|
||||
RangeUtils.checkRangeIntersect(partitionKeyRange, addPartitionKeyRange);
|
||||
} catch (DdlException e) {
|
||||
isPartitionExists = true;
|
||||
if (addPartitionKeyRange.equals(partitionKeyRange)) {
|
||||
clearCreatePartitionFailedMsg(olapTable.getName());
|
||||
} else {
|
||||
recordCreatePartitionFailedMsg(olapTable.getName(), e.getMessage());
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (isPartitionExists) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// construct partition desc
|
||||
PartitionKeyDesc partitionKeyDesc = new PartitionKeyDesc(Collections.singletonList(lowerValue), Collections.singletonList(upperValue));
|
||||
HashMap<String, String> partitionProperties = new HashMap<>(1);
|
||||
partitionProperties.put("replication_num", String.valueOf(DynamicPartitionUtil.estimateReplicateNum(olapTable)));
|
||||
String partitionName = dynamicPartitionProperty.getPrefix() + DynamicPartitionUtil.getFormattedPartitionName(prevBorder);
|
||||
SingleRangePartitionDesc rangePartitionDesc = new SingleRangePartitionDesc(true, partitionName,
|
||||
partitionKeyDesc, partitionProperties);
|
||||
|
||||
// construct distribution desc
|
||||
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) olapTable.getDefaultDistributionInfo();
|
||||
List<String> distColumnNames = new ArrayList<>();
|
||||
for (Column distributionColumn : hashDistributionInfo.getDistributionColumns()) {
|
||||
distColumnNames.add(distributionColumn.getName());
|
||||
}
|
||||
DistributionDesc distributionDesc = new HashDistributionDesc(dynamicPartitionProperty.getBuckets(), distColumnNames);
|
||||
|
||||
// add partition according to partition desc and distribution desc
|
||||
addPartitionClauses.add(new AddPartitionClause(rangePartitionDesc, distributionDesc, null, false));
|
||||
}
|
||||
return addPartitionClauses;
|
||||
}
|
||||
|
||||
private ArrayList<DropPartitionClause> getDropPartitionClause(OlapTable olapTable, Column partitionColumn, String partitionFormat) {
|
||||
ArrayList<DropPartitionClause> dropPartitionClauses = new ArrayList<>();
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
DynamicPartitionProperty dynamicPartitionProperty = olapTable.getTableProperty().getDynamicPartitionProperty();
|
||||
|
||||
String lowerBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
|
||||
dynamicPartitionProperty.getStart(), (Calendar) calendar.clone(), partitionFormat);
|
||||
String upperBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
|
||||
0, (Calendar) calendar.clone(), partitionFormat);
|
||||
PartitionValue lowerPartitionValue = new PartitionValue(lowerBorder);
|
||||
PartitionValue upperPartitionValue = new PartitionValue(upperBorder);
|
||||
Range<PartitionKey> reservePartitionKeyRange;
|
||||
try {
|
||||
PartitionKey lowerBound = PartitionKey.createPartitionKey(Collections.singletonList(lowerPartitionValue), Collections.singletonList(partitionColumn));
|
||||
PartitionKey upperBound = PartitionKey.createPartitionKey(Collections.singletonList(upperPartitionValue), Collections.singletonList(partitionColumn));
|
||||
reservePartitionKeyRange = Range.closedOpen(lowerBound, upperBound);
|
||||
} catch (AnalysisException e) {
|
||||
// keys.size is always equal to column.size, cannot reach this exception
|
||||
LOG.warn("Keys size is not equal to column size. Error={}", e.getMessage());
|
||||
return dropPartitionClauses;
|
||||
}
|
||||
RangePartitionInfo info = (RangePartitionInfo) (olapTable.getPartitionInfo());
|
||||
|
||||
List<Map.Entry<Long, Range<PartitionKey>>> idToRanges = new ArrayList<>(info.getIdToRange(false).entrySet());
|
||||
idToRanges.sort(Comparator.comparing(o -> o.getValue().upperEndpoint()));
|
||||
for (Map.Entry<Long, Range<PartitionKey>> idToRange : idToRanges) {
|
||||
try {
|
||||
Long checkDropPartitionId = idToRange.getKey();
|
||||
Range<PartitionKey> checkDropPartitionKey = idToRange.getValue();
|
||||
RangeUtils.checkRangeIntersect(reservePartitionKeyRange, checkDropPartitionKey);
|
||||
if (checkDropPartitionKey.upperEndpoint().compareTo(reservePartitionKeyRange.lowerEndpoint()) <= 0) {
|
||||
String dropPartitionName = olapTable.getPartition(checkDropPartitionId).getName();
|
||||
dropPartitionClauses.add(new DropPartitionClause(false, dropPartitionName, false));
|
||||
}
|
||||
} catch (DdlException e) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
return dropPartitionClauses;
|
||||
}
|
||||
|
||||
private void executeDynamicPartition() {
|
||||
Iterator<Pair<Long, Long>> iterator = dynamicPartitionTableInfo.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
Pair<Long, Long> tableInfo = iterator.next();
|
||||
@ -135,12 +245,16 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
String tableName;
|
||||
|
||||
ArrayList<AddPartitionClause> addPartitionClauses = new ArrayList<>();
|
||||
ArrayList<DropPartitionClause> dropPartitionClauses;
|
||||
String tableName;
|
||||
boolean skipAddPartition = false;
|
||||
db.readLock();
|
||||
OlapTable olapTable;
|
||||
try {
|
||||
olapTable = (OlapTable) db.getTable(tableId);
|
||||
// Only OlapTable has DynamicPartitionProperty
|
||||
OlapTable olapTable = (OlapTable) db.getTable(tableId);
|
||||
if (olapTable == null
|
||||
|| !olapTable.dynamicPartitionExists()
|
||||
|| !olapTable.getTableProperty().getDynamicPartitionProperty().getEnable()) {
|
||||
@ -151,9 +265,9 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
|
||||
String errorMsg = "Table[" + olapTable.getName() + "]'s state is not NORMAL."
|
||||
+ "Do not allow doing dynamic add partition. table state=" + olapTable.getState();
|
||||
recordFailedMsg(olapTable.getName(), errorMsg);
|
||||
recordCreatePartitionFailedMsg(olapTable.getName(), errorMsg);
|
||||
LOG.info(errorMsg);
|
||||
continue;
|
||||
skipAddPartition = true;
|
||||
}
|
||||
|
||||
// Determine the partition column type
|
||||
@ -167,97 +281,64 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
try {
|
||||
partitionFormat = DynamicPartitionUtil.getPartitionFormat(partitionColumn);
|
||||
} catch (DdlException e) {
|
||||
recordFailedMsg(olapTable.getName(), e.getMessage());
|
||||
recordCreatePartitionFailedMsg(olapTable.getName(), e.getMessage());
|
||||
continue;
|
||||
}
|
||||
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
TableProperty tableProperty = olapTable.getTableProperty();
|
||||
DynamicPartitionProperty dynamicPartitionProperty = tableProperty.getDynamicPartitionProperty();
|
||||
|
||||
for (int i = 0; i <= dynamicPartitionProperty.getEnd(); i++) {
|
||||
String dynamicPartitionPrefix = dynamicPartitionProperty.getPrefix();
|
||||
String prevBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
|
||||
i, (Calendar) calendar.clone(), partitionFormat);
|
||||
String partitionName = dynamicPartitionPrefix + DynamicPartitionUtil.getFormattedPartitionName(prevBorder);
|
||||
|
||||
// continue if partition already exists
|
||||
String nextBorder = DynamicPartitionUtil.getPartitionRange(dynamicPartitionProperty.getTimeUnit(),
|
||||
i + 1, (Calendar) calendar.clone(), partitionFormat);
|
||||
PartitionValue lowerValue = new PartitionValue(prevBorder);
|
||||
PartitionValue upperValue = new PartitionValue(nextBorder);
|
||||
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
|
||||
RangePartitionInfo info = (RangePartitionInfo) (partitionInfo);
|
||||
boolean isPartitionExists = false;
|
||||
Range<PartitionKey> addPartitionKeyRange = null;
|
||||
try {
|
||||
PartitionKey lowerBound = PartitionKey.createPartitionKey(Collections.singletonList(lowerValue), Collections.singletonList(partitionColumn));
|
||||
PartitionKey upperBound = PartitionKey.createPartitionKey(Collections.singletonList(upperValue), Collections.singletonList(partitionColumn));
|
||||
addPartitionKeyRange = Range.closedOpen(lowerBound, upperBound);
|
||||
} catch (AnalysisException e) {
|
||||
// keys.size is always equal to column.size, cannot reach this exception
|
||||
LOG.error("Keys size is not equl to column size.");
|
||||
continue;
|
||||
}
|
||||
|
||||
Range<PartitionKey> intersectRange = info.getAnyIntersectRange(addPartitionKeyRange, false);
|
||||
if (intersectRange != null) {
|
||||
isPartitionExists = true;
|
||||
if (addPartitionKeyRange.equals(intersectRange)) {
|
||||
clearFailedMsg(olapTable.getName());
|
||||
} else {
|
||||
recordFailedMsg(olapTable.getName(), "range " + addPartitionKeyRange
|
||||
+ " intersect with existing range: " + intersectRange);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (isPartitionExists) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// construct partition desc
|
||||
PartitionKeyDesc partitionKeyDesc = new PartitionKeyDesc(Collections.singletonList(lowerValue), Collections.singletonList(upperValue));
|
||||
HashMap<String, String> partitionProperties = new HashMap<>(1);
|
||||
partitionProperties.put("replication_num", String.valueOf(DynamicPartitionUtil.estimateReplicateNum(olapTable)));
|
||||
SingleRangePartitionDesc rangePartitionDesc = new SingleRangePartitionDesc(true, partitionName,
|
||||
partitionKeyDesc, partitionProperties);
|
||||
|
||||
// construct distribution desc
|
||||
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) olapTable.getDefaultDistributionInfo();
|
||||
List<String> distColumnNames = new ArrayList<>();
|
||||
for (Column distributionColumn : hashDistributionInfo.getDistributionColumns()) {
|
||||
distColumnNames.add(distributionColumn.getName());
|
||||
}
|
||||
DistributionDesc distributionDesc = new HashDistributionDesc(dynamicPartitionProperty.getBuckets(), distColumnNames);
|
||||
|
||||
// add partition according to partition desc and distribution desc
|
||||
addPartitionClauses.add(new AddPartitionClause(rangePartitionDesc, distributionDesc, null, false));
|
||||
if (!skipAddPartition) {
|
||||
addPartitionClauses = getAddPartitionClause(olapTable, partitionColumn, partitionFormat);
|
||||
}
|
||||
dropPartitionClauses = getDropPartitionClause(olapTable, partitionColumn, partitionFormat);
|
||||
tableName = olapTable.getName();
|
||||
} finally {
|
||||
db.readUnlock();
|
||||
}
|
||||
for (AddPartitionClause addPartitionClause : addPartitionClauses) {
|
||||
try {
|
||||
Catalog.getCurrentCatalog().addPartition(db, tableName, addPartitionClause);
|
||||
clearFailedMsg(tableName);
|
||||
} catch (DdlException e) {
|
||||
recordFailedMsg(tableName, e.getMessage());
|
||||
}
|
||||
|
||||
for (DropPartitionClause dropPartitionClause : dropPartitionClauses) {
|
||||
db.writeLock();
|
||||
try {
|
||||
Catalog.getCurrentCatalog().dropPartition(db, olapTable, dropPartitionClause);
|
||||
clearDropPartitionFailedMsg(tableName);
|
||||
} catch (DdlException e) {
|
||||
recordDropPartitionFailedMsg(tableName, e.getMessage());
|
||||
} finally {
|
||||
db.writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
if (!skipAddPartition) {
|
||||
for (AddPartitionClause addPartitionClause : addPartitionClauses) {
|
||||
try {
|
||||
Catalog.getCurrentCatalog().addPartition(db, tableName, addPartitionClause);
|
||||
clearCreatePartitionFailedMsg(tableName);
|
||||
} catch (DdlException e) {
|
||||
recordCreatePartitionFailedMsg(tableName, e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void recordFailedMsg(String tableName, String msg) {
|
||||
private void recordCreatePartitionFailedMsg(String tableName, String msg) {
|
||||
LOG.warn("dynamic add partition failed: " + msg);
|
||||
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
|
||||
createOrUpdateRuntimeInfo(tableName, MSG, msg);
|
||||
createOrUpdateRuntimeInfo(tableName, CREATE_PARTITION_MSG, msg);
|
||||
}
|
||||
|
||||
private void clearFailedMsg(String tableName) {
|
||||
private void clearCreatePartitionFailedMsg(String tableName) {
|
||||
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
|
||||
createOrUpdateRuntimeInfo(tableName, MSG, DEFAULT_RUNTIME_VALUE);
|
||||
createOrUpdateRuntimeInfo(tableName, CREATE_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
|
||||
}
|
||||
|
||||
private void recordDropPartitionFailedMsg(String tableName, String msg) {
|
||||
LOG.warn("dynamic drop partition failed: " + msg);
|
||||
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
|
||||
createOrUpdateRuntimeInfo(tableName, DROP_PARTITION_MSG, msg);
|
||||
}
|
||||
|
||||
private void clearDropPartitionFailedMsg(String tableName) {
|
||||
createOrUpdateRuntimeInfo(tableName, DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
|
||||
createOrUpdateRuntimeInfo(tableName, DROP_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
|
||||
}
|
||||
|
||||
private void initDynamicPartitionTable() {
|
||||
@ -287,7 +368,7 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
initDynamicPartitionTable();
|
||||
}
|
||||
if (Config.dynamic_partition_enable) {
|
||||
dynamicAddPartition();
|
||||
executeDynamicPartition();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -211,6 +211,10 @@ public enum ErrorCode {
|
||||
ERR_INVALID_OPERATION(5065, new byte[] { '4', '2', '0', '0', '0' }, "Operation %s is invalid"),
|
||||
ERROR_DYNAMIC_PARTITION_TIME_UNIT(5065, new byte[] {'4', '2', '0', '0', '0'},
|
||||
"Unsupported time unit %s. Expect DAY WEEK MONTH."),
|
||||
ERROR_DYNAMIC_PARTITION_START_ZERO(5066, new byte[] {'4', '2', '0', '0', '0'},
|
||||
"Dynamic partition start must less than 0"),
|
||||
ERROR_DYNAMIC_PARTITION_START_FORMAT(5066, new byte[] {'4', '2', '0', '0', '0'},
|
||||
"Invalid dynamic partition start %s"),
|
||||
ERROR_DYNAMIC_PARTITION_END_ZERO(5066, new byte[] {'4', '2', '0', '0', '0'},
|
||||
"Dynamic partition end must greater than 0"),
|
||||
ERROR_DYNAMIC_PARTITION_END_FORMAT(5066, new byte[] {'4', '2', '0', '0', '0'},
|
||||
|
||||
@ -65,6 +65,16 @@ public class DynamicPartitionUtil {
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkStart(String start) throws DdlException {
|
||||
try {
|
||||
if (Integer.parseInt(start) >= 0) {
|
||||
ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_START_ZERO, start);
|
||||
}
|
||||
} catch (NumberFormatException e) {
|
||||
ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_START_FORMAT, start);
|
||||
}
|
||||
}
|
||||
|
||||
private static void checkEnd(String end) throws DdlException {
|
||||
if (Strings.isNullOrEmpty(end)) {
|
||||
ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_END_EMPTY);
|
||||
@ -103,6 +113,7 @@ public class DynamicPartitionUtil {
|
||||
return false;
|
||||
}
|
||||
return properties.containsKey(DynamicPartitionProperty.TIME_UNIT) ||
|
||||
properties.containsKey(DynamicPartitionProperty.START) ||
|
||||
properties.containsKey(DynamicPartitionProperty.END) ||
|
||||
properties.containsKey(DynamicPartitionProperty.PREFIX) ||
|
||||
properties.containsKey(DynamicPartitionProperty.BUCKETS) ||
|
||||
@ -118,12 +129,14 @@ public class DynamicPartitionUtil {
|
||||
}
|
||||
String timeUnit = properties.get(DynamicPartitionProperty.TIME_UNIT);
|
||||
String prefix = properties.get(DynamicPartitionProperty.PREFIX);
|
||||
String start = properties.get(DynamicPartitionProperty.START);
|
||||
String end = properties.get(DynamicPartitionProperty.END);
|
||||
String buckets = properties.get(DynamicPartitionProperty.BUCKETS);
|
||||
String enable = properties.get(DynamicPartitionProperty.ENABLE);
|
||||
if (!((Strings.isNullOrEmpty(enable) &&
|
||||
Strings.isNullOrEmpty(timeUnit) &&
|
||||
Strings.isNullOrEmpty(prefix) &&
|
||||
Strings.isNullOrEmpty(start) &&
|
||||
Strings.isNullOrEmpty(end) &&
|
||||
Strings.isNullOrEmpty(buckets)))) {
|
||||
if (Strings.isNullOrEmpty(enable)) {
|
||||
@ -135,6 +148,9 @@ public class DynamicPartitionUtil {
|
||||
if (Strings.isNullOrEmpty(prefix)) {
|
||||
throw new DdlException("Must assign dynamic_partition.prefix properties");
|
||||
}
|
||||
if (Strings.isNullOrEmpty(start)) {
|
||||
properties.put(DynamicPartitionProperty.START, String.valueOf(Integer.MIN_VALUE));
|
||||
}
|
||||
if (Strings.isNullOrEmpty(end)) {
|
||||
throw new DdlException("Must assign dynamic_partition.end properties");
|
||||
}
|
||||
@ -189,6 +205,13 @@ public class DynamicPartitionUtil {
|
||||
properties.remove(DynamicPartitionProperty.ENABLE);
|
||||
analyzedProperties.put(DynamicPartitionProperty.ENABLE, enableValue);
|
||||
}
|
||||
// If dynamic property is not specified.Use Integer.MIN_VALUE as default
|
||||
if (properties.containsKey(DynamicPartitionProperty.START)) {
|
||||
String startValue = properties.get(DynamicPartitionProperty.START);
|
||||
checkStart(properties.get(DynamicPartitionProperty.START));
|
||||
properties.remove(DynamicPartitionProperty.START);
|
||||
analyzedProperties.put(DynamicPartitionProperty.START, startValue);
|
||||
}
|
||||
return analyzedProperties;
|
||||
}
|
||||
|
||||
|
||||
@ -1486,13 +1486,15 @@ public class ShowExecutor {
|
||||
tableName,
|
||||
String.valueOf(dynamicPartitionProperty.getEnable()),
|
||||
dynamicPartitionProperty.getTimeUnit().toUpperCase(),
|
||||
String.valueOf(dynamicPartitionProperty.getStart()),
|
||||
String.valueOf(dynamicPartitionProperty.getEnd()),
|
||||
dynamicPartitionProperty.getPrefix(),
|
||||
String.valueOf(dynamicPartitionProperty.getBuckets()),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.LAST_UPDATE_TIME),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.LAST_SCHEDULER_TIME),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.DYNAMIC_PARTITION_STATE),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.MSG)));
|
||||
dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.CREATE_PARTITION_MSG),
|
||||
dynamicPartitionScheduler.getRuntimeInfo(tableName, DynamicPartitionScheduler.DROP_PARTITION_MSG)));
|
||||
}
|
||||
} finally {
|
||||
db.readUnlock();
|
||||
@ -1506,7 +1508,7 @@ public class ShowExecutor {
|
||||
ShowTransactionStmt showStmt = (ShowTransactionStmt) stmt;
|
||||
Database db = ctx.getCatalog().getDb(showStmt.getDbName());
|
||||
if (db == null) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName().toString());
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, showStmt.getDbName());
|
||||
}
|
||||
|
||||
long txnId = showStmt.getTxnId();
|
||||
|
||||
@ -80,6 +80,7 @@ public class DynamicPartitionTableTest {
|
||||
properties.put(DynamicPartitionProperty.ENABLE, "true");
|
||||
properties.put(DynamicPartitionProperty.PREFIX, "p");
|
||||
properties.put(DynamicPartitionProperty.TIME_UNIT, "day");
|
||||
properties.put(DynamicPartitionProperty.START, "-3");
|
||||
properties.put(DynamicPartitionProperty.END, "3");
|
||||
properties.put(DynamicPartitionProperty.BUCKETS, "30");
|
||||
|
||||
@ -293,6 +294,50 @@ public class DynamicPartitionTableTest {
|
||||
catalog.createTable(stmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissSTART(@Injectable SystemInfoService systemInfoService,
|
||||
@Injectable PaloAuth paloAuth,
|
||||
@Injectable EditLog editLog) throws UserException {
|
||||
new Expectations(catalog) {
|
||||
{
|
||||
catalog.getDb(dbTableName.getDb());
|
||||
minTimes = 0;
|
||||
result = db;
|
||||
|
||||
Catalog.getCurrentSystemInfo();
|
||||
minTimes = 0;
|
||||
result = systemInfoService;
|
||||
|
||||
systemInfoService.checkClusterCapacity(anyString);
|
||||
minTimes = 0;
|
||||
systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString);
|
||||
minTimes = 0;
|
||||
result = beIds;
|
||||
|
||||
catalog.getAuth();
|
||||
minTimes = 0;
|
||||
result = paloAuth;
|
||||
paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE);
|
||||
minTimes = 0;
|
||||
result = true;
|
||||
|
||||
catalog.getEditLog();
|
||||
minTimes = 0;
|
||||
result = editLog;
|
||||
}
|
||||
};
|
||||
|
||||
properties.remove(DynamicPartitionProperty.START);
|
||||
|
||||
CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap",
|
||||
new KeysDesc(KeysType.AGG_KEYS, columnNames),
|
||||
new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs),
|
||||
new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, "");
|
||||
stmt.analyze(analyzer);
|
||||
|
||||
catalog.createTable(stmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMissEnd(@Injectable SystemInfoService systemInfoService,
|
||||
@Injectable PaloAuth paloAuth,
|
||||
|
||||
@ -49,6 +49,7 @@ public class TablePropertyTest {
|
||||
HashMap<String, String> properties = new HashMap<>();
|
||||
properties.put(DynamicPartitionProperty.ENABLE, "true");
|
||||
properties.put(DynamicPartitionProperty.TIME_UNIT, "day");
|
||||
properties.put(DynamicPartitionProperty.START, "-3");
|
||||
properties.put(DynamicPartitionProperty.END, "3");
|
||||
properties.put(DynamicPartitionProperty.PREFIX, "p");
|
||||
properties.put(DynamicPartitionProperty.BUCKETS, "30");
|
||||
@ -67,6 +68,7 @@ public class TablePropertyTest {
|
||||
Assert.assertEquals(readDynamicPartitionProperty.getEnable(), dynamicPartitionProperty.getEnable());
|
||||
Assert.assertEquals(readDynamicPartitionProperty.getBuckets(), dynamicPartitionProperty.getBuckets());
|
||||
Assert.assertEquals(readDynamicPartitionProperty.getPrefix(), dynamicPartitionProperty.getPrefix());
|
||||
Assert.assertEquals(readDynamicPartitionProperty.getStart(), dynamicPartitionProperty.getStart());
|
||||
Assert.assertEquals(readDynamicPartitionProperty.getEnd(), dynamicPartitionProperty.getEnd());
|
||||
Assert.assertEquals(readDynamicPartitionProperty.getTimeUnit(), dynamicPartitionProperty.getTimeUnit());
|
||||
in.close();
|
||||
|
||||
@ -49,6 +49,7 @@ public class ModifyDynamicPartitionInfoTest {
|
||||
HashMap<String, String> properties = new HashMap<>();
|
||||
properties.put(DynamicPartitionProperty.ENABLE, "true");
|
||||
properties.put(DynamicPartitionProperty.TIME_UNIT, "day");
|
||||
properties.put(DynamicPartitionProperty.START, "-3");
|
||||
properties.put(DynamicPartitionProperty.END, "3");
|
||||
properties.put(DynamicPartitionProperty.PREFIX, "p");
|
||||
properties.put(DynamicPartitionProperty.BUCKETS, "30");
|
||||
|
||||
@ -125,6 +125,38 @@ public class QueryPlanTest {
|
||||
"PROPERTIES (\n" +
|
||||
"\"replication_num\" = \"1\"\n" +
|
||||
");");
|
||||
|
||||
createTable("CREATE TABLE test.`dynamic_partition` (\n" +
|
||||
" `k1` date NULL COMMENT \"\",\n" +
|
||||
" `k2` smallint(6) NULL COMMENT \"\",\n" +
|
||||
" `k3` int(11) NULL COMMENT \"\",\n" +
|
||||
" `k4` bigint(20) NULL COMMENT \"\",\n" +
|
||||
" `k5` decimal(9, 3) NULL COMMENT \"\",\n" +
|
||||
" `k6` char(5) NULL COMMENT \"\",\n" +
|
||||
" `k10` date NULL COMMENT \"\",\n" +
|
||||
" `k11` datetime NULL COMMENT \"\",\n" +
|
||||
" `k7` varchar(20) NULL COMMENT \"\",\n" +
|
||||
" `k8` double MAX NULL COMMENT \"\",\n" +
|
||||
" `k9` float SUM NULL COMMENT \"\"\n" +
|
||||
") ENGINE=OLAP\n" +
|
||||
"AGGREGATE KEY(`k1`, `k2`, `k3`, `k4`, `k5`, `k6`, `k10`, `k11`, `k7`)\n" +
|
||||
"COMMENT \"OLAP\"\n" +
|
||||
"PARTITION BY RANGE (k1)\n" +
|
||||
"(\n" +
|
||||
"PARTITION p1 VALUES LESS THAN (\"2014-01-01\"),\n" +
|
||||
"PARTITION p2 VALUES LESS THAN (\"2014-06-01\"),\n" +
|
||||
"PARTITION p3 VALUES LESS THAN (\"2014-12-01\")\n" +
|
||||
")\n" +
|
||||
"DISTRIBUTED BY HASH(`k1`) BUCKETS 5\n" +
|
||||
"PROPERTIES (\n" +
|
||||
"\"replication_num\" = \"1\",\n" +
|
||||
"\"dynamic_partition.enable\" = \"true\",\n" +
|
||||
"\"dynamic_partition.start\" = \"-3\",\n" +
|
||||
"\"dynamic_partition.end\" = \"3\",\n" +
|
||||
"\"dynamic_partition.time_unit\" = \"day\",\n" +
|
||||
"\"dynamic_partition.prefix\" = \"p\",\n" +
|
||||
"\"dynamic_partition.buckets\" = \"1\"\n" +
|
||||
");");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
||||
Reference in New Issue
Block a user