[Feature](partitions) Support auto partition FE part (#24079)
This commit is contained in:
@ -3228,7 +3228,28 @@ opt_partition ::=
|
||||
| KW_PARTITION KW_BY KW_LIST LPAREN ident_list:columns RPAREN
|
||||
LPAREN opt_all_partition_desc_list:list RPAREN
|
||||
{:
|
||||
RESULT = new ListPartitionDesc(columns, list);
|
||||
RESULT = new ListPartitionDesc(columns, list);
|
||||
:}
|
||||
/* expr range partition */
|
||||
| KW_AUTO KW_PARTITION KW_BY KW_RANGE function_call_expr:fnExpr
|
||||
LPAREN opt_all_partition_desc_list:list RPAREN
|
||||
{:
|
||||
ArrayList<Expr> exprs = new ArrayList<Expr>();
|
||||
exprs.add(fnExpr);
|
||||
RESULT = RangePartitionDesc.createRangePartitionDesc(exprs, list);
|
||||
:}
|
||||
/* expr list partition */
|
||||
| KW_AUTO KW_PARTITION KW_BY KW_LIST LPAREN expr_list:exprs RPAREN
|
||||
LPAREN opt_all_partition_desc_list:list RPAREN
|
||||
{:
|
||||
RESULT = ListPartitionDesc.createListPartitionDesc(exprs, list);
|
||||
:}
|
||||
| KW_AUTO KW_PARTITION KW_BY KW_LIST function_call_expr:fnExpr
|
||||
LPAREN opt_all_partition_desc_list:list RPAREN
|
||||
{:
|
||||
ArrayList<Expr> exprs = new ArrayList<Expr>();
|
||||
exprs.add(fnExpr);
|
||||
RESULT = ListPartitionDesc.createListPartitionDesc(exprs, list);
|
||||
:}
|
||||
;
|
||||
|
||||
|
||||
@ -36,6 +36,24 @@ public class ListPartitionDesc extends PartitionDesc {
|
||||
List<AllPartitionDesc> allPartitionDescs) throws AnalysisException {
|
||||
super(partitionColNames, allPartitionDescs);
|
||||
type = PartitionType.LIST;
|
||||
this.isAutoCreatePartitions = false;
|
||||
}
|
||||
|
||||
public ListPartitionDesc(ArrayList<Expr> exprs, List<String> partitionColNames,
|
||||
List<AllPartitionDesc> allPartitionDescs) throws AnalysisException {
|
||||
if (exprs != null) {
|
||||
this.partitionExprs = exprs;
|
||||
}
|
||||
this.partitionColNames = partitionColNames;
|
||||
this.singlePartitionDescs = handleAllPartitionDesc(allPartitionDescs);
|
||||
this.type = PartitionType.LIST;
|
||||
this.isAutoCreatePartitions = true;
|
||||
}
|
||||
|
||||
public static ListPartitionDesc createListPartitionDesc(ArrayList<Expr> exprs,
|
||||
List<AllPartitionDesc> allPartitionDescs) throws AnalysisException {
|
||||
List<String> colNames = getColNamesFromExpr(exprs, true);
|
||||
return new ListPartitionDesc(exprs, colNames, allPartitionDescs);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -100,7 +118,8 @@ public class ListPartitionDesc extends PartitionDesc {
|
||||
}
|
||||
}
|
||||
|
||||
ListPartitionInfo listPartitionInfo = new ListPartitionInfo(partitionColumns);
|
||||
ListPartitionInfo listPartitionInfo = new ListPartitionInfo(this.isAutoCreatePartitions, this.partitionExprs,
|
||||
partitionColumns);
|
||||
for (SinglePartitionDesc desc : singlePartitionDescs) {
|
||||
long partitionId = partitionNameToId.get(desc.getPartitionName());
|
||||
listPartitionInfo.handleNewSinglePartitionDesc(desc, partitionId, isTemp);
|
||||
|
||||
@ -27,11 +27,14 @@ import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.ImmutableSortedSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.lang3.NotImplementedException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -39,14 +42,23 @@ import java.util.Set;
|
||||
public class PartitionDesc {
|
||||
protected List<String> partitionColNames;
|
||||
protected List<SinglePartitionDesc> singlePartitionDescs;
|
||||
|
||||
protected ArrayList<Expr> partitionExprs; //eg: auto partition by range date_trunc(column, 'day')
|
||||
protected boolean isAutoCreatePartitions;
|
||||
protected PartitionType type;
|
||||
public static final ImmutableSet<String> RANGE_PARTITION_FUNCTIONS = new ImmutableSortedSet.Builder<String>(
|
||||
String.CASE_INSENSITIVE_ORDER).add("date_trunc").add("date_ceil").add("date_floor")
|
||||
.build();
|
||||
|
||||
public PartitionDesc() {}
|
||||
|
||||
public PartitionDesc(List<String> partitionColNames,
|
||||
List<AllPartitionDesc> allPartitionDescs) throws AnalysisException {
|
||||
this.partitionColNames = partitionColNames;
|
||||
this.singlePartitionDescs = handleAllPartitionDesc(allPartitionDescs);
|
||||
}
|
||||
|
||||
public List<SinglePartitionDesc> handleAllPartitionDesc(List<AllPartitionDesc> allPartitionDescs)
|
||||
throws AnalysisException {
|
||||
boolean isMultiPartition = false;
|
||||
List<SinglePartitionDesc> tmpList = Lists.newArrayList();
|
||||
if (allPartitionDescs != null) {
|
||||
@ -65,7 +77,7 @@ public class PartitionDesc {
|
||||
throw new AnalysisException("multi partition column size except 1 but provided "
|
||||
+ partitionColNames.size() + ".");
|
||||
}
|
||||
this.singlePartitionDescs = tmpList;
|
||||
return tmpList;
|
||||
}
|
||||
|
||||
public List<SinglePartitionDesc> getSinglePartitionDescs() {
|
||||
@ -85,6 +97,62 @@ public class PartitionDesc {
|
||||
return partitionColNames;
|
||||
}
|
||||
|
||||
// 1. partition by list (column) : now support one slotRef
|
||||
// 2. partition by range(column/function(column)) : support slotRef and some
|
||||
// special function eg: date_trunc, date_floor/ceil
|
||||
public static List<String> getColNamesFromExpr(ArrayList<Expr> exprs, boolean isListPartition)
|
||||
throws AnalysisException {
|
||||
List<String> colNames = new ArrayList<>();
|
||||
for (Expr expr : exprs) {
|
||||
if ((expr instanceof FunctionCallExpr) && (isListPartition == false)) {
|
||||
FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
|
||||
List<Expr> paramsExpr = functionCallExpr.getParams().exprs();
|
||||
String name = functionCallExpr.getFnName().getFunction();
|
||||
if (RANGE_PARTITION_FUNCTIONS.contains(name)) {
|
||||
for (Expr param : paramsExpr) {
|
||||
if (param instanceof SlotRef) {
|
||||
if (colNames.isEmpty()) {
|
||||
colNames.add(((SlotRef) param).getColumnName());
|
||||
} else {
|
||||
throw new AnalysisException(
|
||||
"auto create partition only support one slotRef in function expr. "
|
||||
+ expr.toSql());
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new AnalysisException(
|
||||
"auto create partition only support function call expr is date_trunc/date_floor/date_ceil. "
|
||||
+ expr.toSql());
|
||||
}
|
||||
} else if (expr instanceof SlotRef) {
|
||||
if (colNames.isEmpty()) {
|
||||
colNames.add(((SlotRef) expr).getColumnName());
|
||||
} else {
|
||||
throw new AnalysisException(
|
||||
"auto create partition only support one slotRef in expr. "
|
||||
+ expr.toSql());
|
||||
}
|
||||
} else {
|
||||
if (!isListPartition) {
|
||||
throw new AnalysisException(
|
||||
"auto create partition only support slotRef and date_trunc/date_floor/date_ceil"
|
||||
+ "function in range partitions. " + expr.toSql());
|
||||
} else {
|
||||
throw new AnalysisException(
|
||||
"auto create partition only support slotRef in list partitions. "
|
||||
+ expr.toSql());
|
||||
}
|
||||
}
|
||||
}
|
||||
if (colNames.isEmpty()) {
|
||||
throw new AnalysisException(
|
||||
"auto create partition have not find any partition columns. "
|
||||
+ exprs.get(0).toSql());
|
||||
}
|
||||
return colNames;
|
||||
}
|
||||
|
||||
public void analyze(List<ColumnDef> columnDefs, Map<String, String> otherProperties) throws AnalysisException {
|
||||
if (partitionColNames == null || partitionColNames.isEmpty()) {
|
||||
throw new AnalysisException("No partition columns.");
|
||||
@ -128,6 +196,15 @@ public class PartitionDesc {
|
||||
if (this instanceof ListPartitionDesc && columnDef.isAllowNull()) {
|
||||
throw new AnalysisException("The list partition column must be NOT NULL");
|
||||
}
|
||||
if (this instanceof RangePartitionDesc && partitionExprs != null) {
|
||||
if (partitionExprs.get(0) instanceof FunctionCallExpr) {
|
||||
if (!columnDef.getType().isDatetime() && !columnDef.getType().isDatetimeV2()) {
|
||||
throw new AnalysisException(
|
||||
"auto create partition function expr need datetime/datetimev2 type. "
|
||||
+ partitionExprs.get(0).toSql());
|
||||
}
|
||||
}
|
||||
}
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
|
||||
@ -0,0 +1,193 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.analysis;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.thrift.TStringLiteral;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class PartitionExprUtil {
|
||||
public static final String DATETIME_FORMATTER = "%04d-%02d-%02d %02d:%02d:%02d";
|
||||
public static final String DATE_FORMATTER = "%04d-%02d-%02d";
|
||||
public static final String DATETIME_NAME_FORMATTER = "%04d%02d%02d%02d%02d%02d";
|
||||
private static final Logger LOG = LogManager.getLogger(PartitionExprUtil.class);
|
||||
private static final PartitionExprUtil partitionExprUtil = new PartitionExprUtil();
|
||||
|
||||
public static FunctionIntervalInfo getFunctionIntervalInfo(ArrayList<Expr> partitionExprs,
|
||||
PartitionType partitionType) throws AnalysisException {
|
||||
if (partitionType != PartitionType.RANGE) {
|
||||
return null;
|
||||
}
|
||||
if (partitionExprs.size() != 1) {
|
||||
throw new AnalysisException("now only support one expr in range partition");
|
||||
}
|
||||
|
||||
Expr e = partitionExprs.get(0);
|
||||
if (!(e instanceof FunctionCallExpr)) {
|
||||
throw new AnalysisException("now range partition only support FunctionCallExpr");
|
||||
}
|
||||
FunctionCallExpr functionCallExpr = (FunctionCallExpr) e;
|
||||
String fnName = functionCallExpr.getFnName().getFunction();
|
||||
String timeUnit;
|
||||
int interval;
|
||||
if ("date_trunc".equalsIgnoreCase(fnName)) {
|
||||
List<Expr> paramsExprs = functionCallExpr.getParams().exprs();
|
||||
if (paramsExprs.size() != 2) {
|
||||
throw new AnalysisException("date_trunc params exprs size should be 2.");
|
||||
}
|
||||
Expr param = paramsExprs.get(1);
|
||||
if (!(param instanceof StringLiteral)) {
|
||||
throw new AnalysisException("date_trunc param of time unit is not string literal.");
|
||||
}
|
||||
timeUnit = ((StringLiteral) param).getStringValue().toLowerCase();
|
||||
interval = 1;
|
||||
} else {
|
||||
throw new AnalysisException("now range partition only support date_trunc.");
|
||||
}
|
||||
return partitionExprUtil.new FunctionIntervalInfo(timeUnit, interval);
|
||||
}
|
||||
|
||||
public static DateLiteral getRangeEnd(DateLiteral beginTime, FunctionIntervalInfo intervalInfo)
|
||||
throws AnalysisException {
|
||||
String timeUnit = intervalInfo.timeUnit;
|
||||
int interval = intervalInfo.interval;
|
||||
switch (timeUnit) {
|
||||
case "year":
|
||||
return beginTime.plusYears(interval);
|
||||
case "month":
|
||||
return beginTime.plusMonths(interval);
|
||||
case "day":
|
||||
return beginTime.plusDays(interval);
|
||||
case "hour":
|
||||
return beginTime.plusHours(interval);
|
||||
case "minute":
|
||||
return beginTime.plusMinutes(interval);
|
||||
case "second":
|
||||
return beginTime.plusSeconds(interval);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static Map<String, AddPartitionClause> getAddPartitionClauseFromPartitionValues(OlapTable olapTable,
|
||||
ArrayList<TStringLiteral> partitionValues, PartitionInfo partitionInfo)
|
||||
throws AnalysisException {
|
||||
Map<String, AddPartitionClause> result = Maps.newHashMap();
|
||||
ArrayList<Expr> partitionExprs = partitionInfo.getPartitionExprs();
|
||||
PartitionType partitionType = partitionInfo.getType();
|
||||
List<Column> partiitonColumn = partitionInfo.getPartitionColumns();
|
||||
Type partitionColumnType = partiitonColumn.get(0).getType();
|
||||
FunctionIntervalInfo intervalInfo = getFunctionIntervalInfo(partitionExprs, partitionType);
|
||||
Set<String> filterPartitionValues = new HashSet<String>();
|
||||
|
||||
for (TStringLiteral partitionValue : partitionValues) {
|
||||
PartitionKeyDesc partitionKeyDesc = null;
|
||||
String partitionName = "p";
|
||||
String value = partitionValue.value;
|
||||
if (filterPartitionValues.contains(value)) {
|
||||
continue;
|
||||
}
|
||||
filterPartitionValues.add(value);
|
||||
if (partitionType == PartitionType.RANGE) {
|
||||
String beginTime = value;
|
||||
DateLiteral beginDateTime = new DateLiteral(beginTime, Type.DATETIMEV2);
|
||||
partitionName += String.format(DATETIME_NAME_FORMATTER,
|
||||
beginDateTime.getYear(), beginDateTime.getMonth(), beginDateTime.getDay(),
|
||||
beginDateTime.getHour(), beginDateTime.getMinute(), beginDateTime.getSecond());
|
||||
DateLiteral endDateTime = getRangeEnd(beginDateTime, intervalInfo);
|
||||
partitionKeyDesc = createPartitionKeyDescWithRange(beginDateTime, endDateTime, partitionColumnType);
|
||||
} else if (partitionType == PartitionType.LIST) {
|
||||
List<List<PartitionValue>> listValues = new ArrayList<>();
|
||||
// TODO: need to support any type
|
||||
String pointValue = value;
|
||||
PartitionValue lowerValue = new PartitionValue(pointValue);
|
||||
listValues.add(Collections.singletonList(lowerValue));
|
||||
partitionKeyDesc = PartitionKeyDesc.createIn(
|
||||
listValues);
|
||||
partitionName += lowerValue.getStringValue();
|
||||
} else {
|
||||
throw new AnalysisException("now only support range and list partition");
|
||||
}
|
||||
|
||||
Map<String, String> partitionProperties = Maps.newHashMap();
|
||||
DistributionDesc distributionDesc = olapTable.getDefaultDistributionInfo().toDistributionDesc();
|
||||
|
||||
SinglePartitionDesc singleRangePartitionDesc = new SinglePartitionDesc(true, partitionName,
|
||||
partitionKeyDesc, partitionProperties);
|
||||
|
||||
AddPartitionClause addPartitionClause = new AddPartitionClause(singleRangePartitionDesc,
|
||||
distributionDesc, partitionProperties, false);
|
||||
result.put(partitionName, addPartitionClause);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public static PartitionKeyDesc createPartitionKeyDescWithRange(DateLiteral beginDateTime,
|
||||
DateLiteral endDateTime, Type partitionColumnType) throws AnalysisException {
|
||||
String beginTime;
|
||||
String endTime;
|
||||
// maybe need check the range in FE also, like getAddPartitionClause.
|
||||
if (partitionColumnType.isDate() || partitionColumnType.isDateV2()) {
|
||||
beginTime = String.format(DATE_FORMATTER, beginDateTime.getYear(), beginDateTime.getMonth(),
|
||||
beginDateTime.getDay());
|
||||
endTime = String.format(DATE_FORMATTER, endDateTime.getYear(), endDateTime.getMonth(),
|
||||
endDateTime.getDay());
|
||||
} else if (partitionColumnType.isDatetime() || partitionColumnType.isDatetimeV2()) {
|
||||
beginTime = String.format(DATETIME_FORMATTER,
|
||||
beginDateTime.getYear(), beginDateTime.getMonth(), beginDateTime.getDay(),
|
||||
beginDateTime.getHour(), beginDateTime.getMinute(), beginDateTime.getSecond());
|
||||
endTime = String.format(DATETIME_FORMATTER,
|
||||
endDateTime.getYear(), endDateTime.getMonth(), endDateTime.getDay(),
|
||||
endDateTime.getHour(), endDateTime.getMinute(), endDateTime.getSecond());
|
||||
} else {
|
||||
throw new AnalysisException(
|
||||
"not support range partition with column type : " + partitionColumnType.toString());
|
||||
}
|
||||
PartitionValue lowerValue = new PartitionValue(beginTime);
|
||||
PartitionValue upperValue = new PartitionValue(endTime);
|
||||
return PartitionKeyDesc.createFixed(
|
||||
Collections.singletonList(lowerValue),
|
||||
Collections.singletonList(upperValue));
|
||||
}
|
||||
|
||||
public class FunctionIntervalInfo {
|
||||
public String timeUnit;
|
||||
public int interval;
|
||||
|
||||
public FunctionIntervalInfo(String timeUnit, int interval) {
|
||||
this.timeUnit = timeUnit;
|
||||
this.interval = interval;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -35,6 +35,22 @@ public class RangePartitionDesc extends PartitionDesc {
|
||||
List<AllPartitionDesc> allPartitionDescs) throws AnalysisException {
|
||||
super(partitionColNames, allPartitionDescs);
|
||||
type = org.apache.doris.catalog.PartitionType.RANGE;
|
||||
this.isAutoCreatePartitions = false;
|
||||
}
|
||||
|
||||
public RangePartitionDesc(ArrayList<Expr> exprs, List<String> partitionColNames,
|
||||
List<AllPartitionDesc> allPartitionDescs) throws AnalysisException {
|
||||
this.partitionExprs = exprs;
|
||||
this.partitionColNames = partitionColNames;
|
||||
this.singlePartitionDescs = handleAllPartitionDesc(allPartitionDescs);
|
||||
this.type = org.apache.doris.catalog.PartitionType.RANGE;
|
||||
this.isAutoCreatePartitions = true;
|
||||
}
|
||||
|
||||
public static RangePartitionDesc createRangePartitionDesc(ArrayList<Expr> exprs,
|
||||
List<AllPartitionDesc> allPartitionDescs) throws AnalysisException {
|
||||
List<String> colNames = getColNamesFromExpr(exprs, false);
|
||||
return new RangePartitionDesc(exprs, colNames, allPartitionDescs);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -116,7 +132,8 @@ public class RangePartitionDesc extends PartitionDesc {
|
||||
* [ {10, 100, 1000}, {50, 500, MIN } )
|
||||
* [ {50, 500, MIN }, {80, MIN, MIN } )
|
||||
*/
|
||||
RangePartitionInfo rangePartitionInfo = new RangePartitionInfo(partitionColumns);
|
||||
RangePartitionInfo rangePartitionInfo = new RangePartitionInfo(this.isAutoCreatePartitions, this.partitionExprs,
|
||||
partitionColumns);
|
||||
for (SinglePartitionDesc desc : singlePartitionDescs) {
|
||||
long partitionId = partitionNameToId.get(desc.getPartitionName());
|
||||
rangePartitionInfo.handleNewSinglePartitionDesc(desc, partitionId, isTemp);
|
||||
|
||||
@ -18,11 +18,13 @@
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.AllPartitionDesc;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.ListPartitionDesc;
|
||||
import org.apache.doris.analysis.PartitionDesc;
|
||||
import org.apache.doris.analysis.PartitionKeyDesc;
|
||||
import org.apache.doris.analysis.PartitionValue;
|
||||
import org.apache.doris.analysis.SinglePartitionDesc;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.util.ListUtil;
|
||||
@ -54,6 +56,14 @@ public class ListPartitionInfo extends PartitionInfo {
|
||||
this.isMultiColumnPartition = partitionColumns.size() > 1;
|
||||
}
|
||||
|
||||
public ListPartitionInfo(boolean isAutoCreatePartitions, ArrayList<Expr> exprs, List<Column> partitionColumns) {
|
||||
super(PartitionType.LIST, partitionColumns);
|
||||
this.isAutoCreatePartitions = isAutoCreatePartitions;
|
||||
if (exprs != null) {
|
||||
this.partitionExprs.addAll(exprs);
|
||||
}
|
||||
}
|
||||
|
||||
public static PartitionInfo read(DataInput in) throws IOException {
|
||||
PartitionInfo partitionInfo = new ListPartitionInfo();
|
||||
partitionInfo.readFields(in);
|
||||
@ -186,16 +196,31 @@ public class ListPartitionInfo extends PartitionInfo {
|
||||
@Override
|
||||
public String toSql(OlapTable table, List<Long> partitionId) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("PARTITION BY LIST(");
|
||||
int idx = 0;
|
||||
for (Column column : partitionColumns) {
|
||||
if (idx != 0) {
|
||||
sb.append(", ");
|
||||
if (enableAutomaticPartition()) {
|
||||
sb.append("AUTO PARTITION BY LIST ");
|
||||
for (Expr e : partitionExprs) {
|
||||
boolean isSlotRef = (e instanceof SlotRef);
|
||||
if (isSlotRef) {
|
||||
sb.append("(");
|
||||
}
|
||||
sb.append(e.toSql());
|
||||
if (isSlotRef) {
|
||||
sb.append(")");
|
||||
}
|
||||
}
|
||||
sb.append("`").append(column.getName()).append("`");
|
||||
idx++;
|
||||
sb.append("\n(");
|
||||
} else {
|
||||
sb.append("PARTITION BY LIST(");
|
||||
for (Column column : partitionColumns) {
|
||||
if (idx != 0) {
|
||||
sb.append(", ");
|
||||
}
|
||||
sb.append("`").append(column.getName()).append("`");
|
||||
idx++;
|
||||
}
|
||||
sb.append(")\n(");
|
||||
}
|
||||
sb.append(")\n(");
|
||||
|
||||
// sort list
|
||||
List<Map.Entry<Long, PartitionItem>> entries = new ArrayList<>(this.idToItem.entrySet());
|
||||
@ -269,6 +294,6 @@ public class ListPartitionInfo extends PartitionInfo {
|
||||
|
||||
allPartitionDescs.add(new SinglePartitionDesc(false, partitionName, partitionKeyDesc, properties));
|
||||
}
|
||||
return new ListPartitionDesc(partitionColumnNames, allPartitionDescs);
|
||||
return new ListPartitionDesc(this.partitionExprs, partitionColumnNames, allPartitionDescs);
|
||||
}
|
||||
}
|
||||
|
||||
@ -137,7 +137,7 @@ public class OlapTable extends Table {
|
||||
private PartitionInfo partitionInfo;
|
||||
@SerializedName("idToPartition")
|
||||
private Map<Long, Partition> idToPartition = new HashMap<>();
|
||||
private Map<String, Partition> nameToPartition = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
|
||||
private Map<String, Partition> nameToPartition = Maps.newTreeMap();
|
||||
|
||||
@SerializedName(value = "distributionInfo")
|
||||
private DistributionInfo defaultDistributionInfo;
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.DateLiteral;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.MaxLiteral;
|
||||
import org.apache.doris.analysis.PartitionDesc;
|
||||
import org.apache.doris.analysis.PartitionValue;
|
||||
@ -40,6 +41,7 @@ import org.apache.logging.log4j.Logger;
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@ -81,6 +83,10 @@ public class PartitionInfo implements Writable {
|
||||
// so we defer adding meta serialization until memory engine feature is more complete.
|
||||
protected Map<Long, TTabletType> idToTabletType;
|
||||
|
||||
// the enable automatic partition will hold this, could create partition by expr result
|
||||
protected ArrayList<Expr> partitionExprs;
|
||||
protected boolean isAutoCreatePartitions;
|
||||
|
||||
public PartitionInfo() {
|
||||
this.type = PartitionType.UNPARTITIONED;
|
||||
this.idToDataProperty = new HashMap<>();
|
||||
@ -88,6 +94,7 @@ public class PartitionInfo implements Writable {
|
||||
this.idToInMemory = new HashMap<>();
|
||||
this.idToTabletType = new HashMap<>();
|
||||
this.idToStoragePolicy = new HashMap<>();
|
||||
this.partitionExprs = new ArrayList<>();
|
||||
}
|
||||
|
||||
public PartitionInfo(PartitionType type) {
|
||||
@ -97,6 +104,7 @@ public class PartitionInfo implements Writable {
|
||||
this.idToInMemory = new HashMap<>();
|
||||
this.idToTabletType = new HashMap<>();
|
||||
this.idToStoragePolicy = new HashMap<>();
|
||||
this.partitionExprs = new ArrayList<>();
|
||||
}
|
||||
|
||||
public PartitionInfo(PartitionType type, List<Column> partitionColumns) {
|
||||
@ -215,6 +223,14 @@ public class PartitionInfo implements Writable {
|
||||
return null;
|
||||
}
|
||||
|
||||
public boolean enableAutomaticPartition() {
|
||||
return isAutoCreatePartitions;
|
||||
}
|
||||
|
||||
public ArrayList<Expr> getPartitionExprs() {
|
||||
return this.partitionExprs;
|
||||
}
|
||||
|
||||
public void checkPartitionItemListsMatch(List<PartitionItem> list1, List<PartitionItem> list2) throws DdlException {
|
||||
}
|
||||
|
||||
@ -374,6 +390,13 @@ public class PartitionInfo implements Writable {
|
||||
idToReplicaAllocation.get(entry.getKey()).write(out);
|
||||
out.writeBoolean(idToInMemory.get(entry.getKey()));
|
||||
}
|
||||
int size = partitionExprs.size();
|
||||
out.writeInt(size);
|
||||
for (int i = 0; i < size; ++i) {
|
||||
Expr e = this.partitionExprs.get(i);
|
||||
Expr.writeTo(e, out);
|
||||
}
|
||||
out.writeBoolean(isAutoCreatePartitions);
|
||||
}
|
||||
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
@ -400,6 +423,14 @@ public class PartitionInfo implements Writable {
|
||||
|
||||
idToInMemory.put(partitionId, in.readBoolean());
|
||||
}
|
||||
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_125) {
|
||||
int size = in.readInt();
|
||||
for (int i = 0; i < size; ++i) {
|
||||
Expr e = Expr.readIn(in);
|
||||
this.partitionExprs.add(e);
|
||||
}
|
||||
this.isAutoCreatePartitions = in.readBoolean();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -438,12 +469,13 @@ public class PartitionInfo implements Writable {
|
||||
&& Objects.equals(idToTempItem, that.idToTempItem) && Objects.equals(idToDataProperty,
|
||||
that.idToDataProperty) && Objects.equals(idToStoragePolicy, that.idToStoragePolicy)
|
||||
&& Objects.equals(idToReplicaAllocation, that.idToReplicaAllocation) && Objects.equals(
|
||||
idToInMemory, that.idToInMemory) && Objects.equals(idToTabletType, that.idToTabletType);
|
||||
idToInMemory, that.idToInMemory) && Objects.equals(idToTabletType, that.idToTabletType)
|
||||
&& Objects.equals(partitionExprs, that.partitionExprs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(type, partitionColumns, idToItem, idToTempItem, idToDataProperty, idToStoragePolicy,
|
||||
idToReplicaAllocation, isMultiColumnPartition, idToInMemory, idToTabletType);
|
||||
idToReplicaAllocation, isMultiColumnPartition, idToInMemory, idToTabletType, partitionExprs);
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,10 +18,12 @@
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.AllPartitionDesc;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.PartitionDesc;
|
||||
import org.apache.doris.analysis.PartitionKeyDesc;
|
||||
import org.apache.doris.analysis.RangePartitionDesc;
|
||||
import org.apache.doris.analysis.SinglePartitionDesc;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.util.RangeUtils;
|
||||
@ -54,6 +56,14 @@ public class RangePartitionInfo extends PartitionInfo {
|
||||
this.isMultiColumnPartition = partitionColumns.size() > 1;
|
||||
}
|
||||
|
||||
public RangePartitionInfo(boolean isAutoCreatePartitions, ArrayList<Expr> exprs, List<Column> partitionColumns) {
|
||||
super(PartitionType.RANGE, partitionColumns);
|
||||
this.isAutoCreatePartitions = isAutoCreatePartitions;
|
||||
if (exprs != null) {
|
||||
this.partitionExprs.addAll(exprs);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public PartitionItem createAndCheckPartitionItem(SinglePartitionDesc desc, boolean isTemp) throws DdlException {
|
||||
Range<PartitionKey> newRange = null;
|
||||
@ -252,16 +262,31 @@ public class RangePartitionInfo extends PartitionInfo {
|
||||
@Override
|
||||
public String toSql(OlapTable table, List<Long> partitionId) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("PARTITION BY RANGE(");
|
||||
int idx = 0;
|
||||
for (Column column : partitionColumns) {
|
||||
if (idx != 0) {
|
||||
sb.append(", ");
|
||||
if (enableAutomaticPartition()) {
|
||||
sb.append("AUTO PARTITION BY RANGE ");
|
||||
for (Expr e : partitionExprs) {
|
||||
boolean isSlotRef = (e instanceof SlotRef);
|
||||
if (isSlotRef) {
|
||||
sb.append("(");
|
||||
}
|
||||
sb.append(e.toSql());
|
||||
if (isSlotRef) {
|
||||
sb.append(")");
|
||||
}
|
||||
}
|
||||
sb.append("`").append(column.getName()).append("`");
|
||||
idx++;
|
||||
sb.append("\n(");
|
||||
} else {
|
||||
sb.append("PARTITION BY RANGE(");
|
||||
for (Column column : partitionColumns) {
|
||||
if (idx != 0) {
|
||||
sb.append(", ");
|
||||
}
|
||||
sb.append("`").append(column.getName()).append("`");
|
||||
idx++;
|
||||
}
|
||||
sb.append(")\n(");
|
||||
}
|
||||
sb.append(")\n(");
|
||||
|
||||
// sort range
|
||||
List<Map.Entry<Long, PartitionItem>> entries = new ArrayList<>(this.idToItem.entrySet());
|
||||
@ -325,6 +350,6 @@ public class RangePartitionInfo extends PartitionInfo {
|
||||
|
||||
allPartitionDescs.add(new SinglePartitionDesc(false, partitionName, partitionKeyDesc, properties));
|
||||
}
|
||||
return new RangePartitionDesc(partitionColumnNames, allPartitionDescs);
|
||||
return new RangePartitionDesc(this.partitionExprs, partitionColumnNames, allPartitionDescs);
|
||||
}
|
||||
}
|
||||
|
||||
@ -131,7 +131,7 @@ public class OlapTableSink extends DataSink {
|
||||
|
||||
if (partitionIds == null) {
|
||||
partitionIds = dstTable.getPartitionIds();
|
||||
if (partitionIds.isEmpty()) {
|
||||
if (partitionIds.isEmpty() && dstTable.getPartitionInfo().enableAutomaticPartition() == false) {
|
||||
ErrorReport.reportAnalysisException(ErrorCode.ERR_EMPTY_PARTITION_IN_TABLE, dstTable.getName());
|
||||
}
|
||||
}
|
||||
@ -178,7 +178,7 @@ public class OlapTableSink extends DataSink {
|
||||
tSink.setNumReplicas(numReplicas);
|
||||
tSink.setNeedGenRollup(dstTable.shouldLoadToNewRollup());
|
||||
tSink.setSchema(createSchema(tSink.getDbId(), dstTable, analyzer));
|
||||
tSink.setPartition(createPartition(tSink.getDbId(), dstTable));
|
||||
tSink.setPartition(createPartition(tSink.getDbId(), dstTable, analyzer));
|
||||
List<TOlapTableLocationParam> locationParams = createLocation(dstTable);
|
||||
tSink.setLocation(locationParams.get(0));
|
||||
if (singleReplicaLoad) {
|
||||
@ -293,7 +293,8 @@ public class OlapTableSink extends DataSink {
|
||||
return distColumns;
|
||||
}
|
||||
|
||||
private TOlapTablePartitionParam createPartition(long dbId, OlapTable table) throws UserException {
|
||||
private TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer)
|
||||
throws UserException {
|
||||
TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam();
|
||||
partitionParam.setDbId(dbId);
|
||||
partitionParam.setTableId(table.getId());
|
||||
@ -337,6 +338,22 @@ public class OlapTableSink extends DataSink {
|
||||
}
|
||||
}
|
||||
}
|
||||
// for auto create partition by function expr, there is no any partition firstly,
|
||||
// But this is required in thrift struct.
|
||||
if (partitionIds.isEmpty()) {
|
||||
partitionParam.setDistributedColumns(getDistColumns(table.getDefaultDistributionInfo()));
|
||||
partitionParam.setPartitions(new ArrayList<TOlapTablePartition>());
|
||||
}
|
||||
ArrayList<Expr> exprs = partitionInfo.getPartitionExprs();
|
||||
if (exprs != null && analyzer != null) {
|
||||
tupleDescriptor.setTable(table);
|
||||
analyzer.registerTupleDescriptor(tupleDescriptor);
|
||||
for (Expr e : exprs) {
|
||||
e.analyze(analyzer);
|
||||
}
|
||||
partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs));
|
||||
}
|
||||
partitionParam.setEnableAutomaticPartition(partitionInfo.enableAutomaticPartition());
|
||||
break;
|
||||
}
|
||||
case UNPARTITIONED: {
|
||||
@ -362,16 +379,18 @@ public class OlapTableSink extends DataSink {
|
||||
}
|
||||
partitionParam.addToPartitions(tPartition);
|
||||
partitionParam.setDistributedColumns(getDistColumns(partition.getDistributionInfo()));
|
||||
partitionParam.setEnableAutomaticPartition(false);
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
throw new UserException("unsupported partition for OlapTable, partition=" + partType);
|
||||
}
|
||||
}
|
||||
partitionParam.setPartitionType(partType.toThrift());
|
||||
return partitionParam;
|
||||
}
|
||||
|
||||
private void setPartitionKeys(TOlapTablePartition tPartition, PartitionItem partitionItem, int partColNum) {
|
||||
public static void setPartitionKeys(TOlapTablePartition tPartition, PartitionItem partitionItem, int partColNum) {
|
||||
if (partitionItem instanceof RangePartitionItem) {
|
||||
Range<PartitionKey> range = partitionItem.getItems();
|
||||
// set start keys
|
||||
@ -439,6 +458,10 @@ public class OlapTableSink extends DataSink {
|
||||
}
|
||||
}
|
||||
|
||||
// for partition by function expr, there is no any partition firstly, But this is required in thrift struct.
|
||||
if (partitionIds.isEmpty()) {
|
||||
locationParam.setTablets(new ArrayList<TTabletLocation>());
|
||||
}
|
||||
// check if disk capacity reach limit
|
||||
// this is for load process, so use high water mark to check
|
||||
Status st = Env.getCurrentSystemInfo().checkExceedDiskCapacityLimit(allBePathsMap, true);
|
||||
|
||||
@ -19,10 +19,12 @@ package org.apache.doris.service;
|
||||
|
||||
import org.apache.doris.alter.SchemaChangeHandler;
|
||||
import org.apache.doris.analysis.AddColumnsClause;
|
||||
import org.apache.doris.analysis.AddPartitionClause;
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.ColumnDef;
|
||||
import org.apache.doris.analysis.LabelName;
|
||||
import org.apache.doris.analysis.NativeInsertStmt;
|
||||
import org.apache.doris.analysis.PartitionExprUtil;
|
||||
import org.apache.doris.analysis.RestoreStmt;
|
||||
import org.apache.doris.analysis.SetType;
|
||||
import org.apache.doris.analysis.SqlParser;
|
||||
@ -37,7 +39,10 @@ import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Index;
|
||||
import org.apache.doris.catalog.MaterializedIndex;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
@ -50,6 +55,7 @@ import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.AuthenticationException;
|
||||
import org.apache.doris.common.CaseSensibility;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.DuplicatedRequestException;
|
||||
import org.apache.doris.common.LabelAlreadyUsedException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
@ -72,6 +78,7 @@ import org.apache.doris.master.MasterImpl;
|
||||
import org.apache.doris.mysql.privilege.AccessControllerManager;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
import org.apache.doris.planner.OlapTableSink;
|
||||
import org.apache.doris.planner.StreamLoadPlanner;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.ConnectProcessor;
|
||||
@ -110,6 +117,8 @@ import org.apache.doris.thrift.TCommitTxnRequest;
|
||||
import org.apache.doris.thrift.TCommitTxnResult;
|
||||
import org.apache.doris.thrift.TConfirmUnusedRemoteFilesRequest;
|
||||
import org.apache.doris.thrift.TConfirmUnusedRemoteFilesResult;
|
||||
import org.apache.doris.thrift.TCreatePartitionRequest;
|
||||
import org.apache.doris.thrift.TCreatePartitionResult;
|
||||
import org.apache.doris.thrift.TDescribeTableParams;
|
||||
import org.apache.doris.thrift.TDescribeTableResult;
|
||||
import org.apache.doris.thrift.TDescribeTablesParams;
|
||||
@ -155,6 +164,9 @@ import org.apache.doris.thrift.TMasterOpResult;
|
||||
import org.apache.doris.thrift.TMasterResult;
|
||||
import org.apache.doris.thrift.TMySqlLoadAcquireTokenResult;
|
||||
import org.apache.doris.thrift.TNetworkAddress;
|
||||
import org.apache.doris.thrift.TNodeInfo;
|
||||
import org.apache.doris.thrift.TOlapTableIndexTablets;
|
||||
import org.apache.doris.thrift.TOlapTablePartition;
|
||||
import org.apache.doris.thrift.TPipelineFragmentParams;
|
||||
import org.apache.doris.thrift.TPrivilegeCtrl;
|
||||
import org.apache.doris.thrift.TPrivilegeHier;
|
||||
@ -180,10 +192,12 @@ import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
|
||||
import org.apache.doris.thrift.TStreamLoadPutRequest;
|
||||
import org.apache.doris.thrift.TStreamLoadPutResult;
|
||||
import org.apache.doris.thrift.TStringLiteral;
|
||||
import org.apache.doris.thrift.TTableIndexQueryStats;
|
||||
import org.apache.doris.thrift.TTableMetadataNameIds;
|
||||
import org.apache.doris.thrift.TTableQueryStats;
|
||||
import org.apache.doris.thrift.TTableStatus;
|
||||
import org.apache.doris.thrift.TTabletLocation;
|
||||
import org.apache.doris.thrift.TTxnParams;
|
||||
import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
|
||||
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
|
||||
@ -200,6 +214,7 @@ import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Multimap;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -3035,4 +3050,124 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
// Return Ok anyway
|
||||
return new TStatus(TStatusCode.OK);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TCreatePartitionResult createPartition(TCreatePartitionRequest request) throws TException {
|
||||
LOG.info("Receive create partition request: {}", request);
|
||||
long dbId = request.getDbId();
|
||||
long tableId = request.getTableId();
|
||||
TCreatePartitionResult result = new TCreatePartitionResult();
|
||||
TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
|
||||
|
||||
Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
|
||||
if (db == null) {
|
||||
errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId)));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
}
|
||||
|
||||
Table table = db.getTable(tableId).get();
|
||||
if (table == null) {
|
||||
errorStatus.setErrorMsgs(
|
||||
(Lists.newArrayList(String.format("dbId=%d tableId=%d is not exists", dbId, tableId))));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
}
|
||||
|
||||
if (!(table instanceof OlapTable)) {
|
||||
errorStatus.setErrorMsgs(
|
||||
Lists.newArrayList(String.format("dbId=%d tableId=%d is not olap table", dbId, tableId)));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
}
|
||||
|
||||
if (request.partitionValues == null) {
|
||||
errorStatus.setErrorMsgs(Lists.newArrayList("partitionValues should not null."));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
}
|
||||
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
|
||||
ArrayList<TStringLiteral> partitionValues = new ArrayList<TStringLiteral>();
|
||||
for (int i = 0; i < request.partitionValues.size(); i++) {
|
||||
if (request.partitionValues.get(i).size() != 1) {
|
||||
errorStatus.setErrorMsgs(
|
||||
Lists.newArrayList("Only support single partition, partitionValues size should equal 1."));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
}
|
||||
partitionValues.add(request.partitionValues.get(i).get(0));
|
||||
}
|
||||
Map<String, AddPartitionClause> addPartitionClauseMap;
|
||||
try {
|
||||
addPartitionClauseMap = PartitionExprUtil.getAddPartitionClauseFromPartitionValues(olapTable,
|
||||
partitionValues, partitionInfo);
|
||||
} catch (AnalysisException ex) {
|
||||
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
}
|
||||
|
||||
for (AddPartitionClause addPartitionClause : addPartitionClauseMap.values()) {
|
||||
try {
|
||||
// here maybe check and limit created partitions num
|
||||
Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause);
|
||||
} catch (DdlException e) {
|
||||
LOG.warn(e);
|
||||
errorStatus.setErrorMsgs(
|
||||
Lists.newArrayList(String.format("create partition failed. error:%s", e.getMessage())));
|
||||
result.setStatus(errorStatus);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
// build partition & tablets
|
||||
List<TOlapTablePartition> partitions = Lists.newArrayList();
|
||||
List<TTabletLocation> tablets = Lists.newArrayList();
|
||||
for (String partitionName : addPartitionClauseMap.keySet()) {
|
||||
Partition partition = table.getPartition(partitionName);
|
||||
TOlapTablePartition tPartition = new TOlapTablePartition();
|
||||
tPartition.setId(partition.getId());
|
||||
int partColNum = partitionInfo.getPartitionColumns().size();
|
||||
// set partition keys
|
||||
OlapTableSink.setPartitionKeys(tPartition, partitionInfo.getItem(partition.getId()), partColNum);
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
|
||||
tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList(
|
||||
index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
|
||||
tPartition.setNumBuckets(index.getTablets().size());
|
||||
}
|
||||
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
|
||||
partitions.add(tPartition);
|
||||
// tablet
|
||||
int quorum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2
|
||||
+ 1;
|
||||
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
|
||||
for (Tablet tablet : index.getTablets()) {
|
||||
// we should ensure the replica backend is alive
|
||||
// otherwise, there will be a 'unknown node id, id=xxx' error for stream load
|
||||
// BE id -> path hash
|
||||
Multimap<Long, Long> bePathsMap = tablet.getNormalReplicaBackendPathMap();
|
||||
if (bePathsMap.keySet().size() < quorum) {
|
||||
LOG.warn("auto go quorum exception");
|
||||
}
|
||||
tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
|
||||
}
|
||||
}
|
||||
}
|
||||
result.setPartitions(partitions);
|
||||
result.setTablets(tablets);
|
||||
|
||||
// build nodes
|
||||
List<TNodeInfo> nodeInfos = Lists.newArrayList();
|
||||
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
|
||||
for (Long id : systemInfoService.getAllBackendIds(false)) {
|
||||
Backend backend = systemInfoService.getBackend(id);
|
||||
nodeInfos.add(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
|
||||
}
|
||||
result.setNodes(nodeInfos);
|
||||
result.setStatus(new TStatus(TStatusCode.OK));
|
||||
LOG.debug("send create partition result: {}", result);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
@ -84,26 +84,27 @@ public class TruncateTableTest {
|
||||
|
||||
@Test
|
||||
public void testTruncateWithCaseInsensitivePartitionName() throws Exception {
|
||||
//now in order to support auto create partition, need set partition name is case sensitive
|
||||
Database db = Env.getCurrentInternalCatalog().getDbNullable("default_cluster:test");
|
||||
OlapTable tbl = db.getOlapTableOrDdlException("case_sensitive_table");
|
||||
long p20211006Id = tbl.getPartition("P20211006").getId();
|
||||
long p20211006Id = tbl.getPartition("p20211006").getId();
|
||||
long p20211007Id = tbl.getPartition("P20211007").getId();
|
||||
long p20211008Id = tbl.getPartition("p20211008").getId();
|
||||
long p20211008Id = tbl.getPartition("P20211008").getId();
|
||||
// truncate p20211008(real name is P20211008)
|
||||
String truncateStr = "TRUNCATE TABLE test.case_sensitive_table PARTITION p20211008; \n";
|
||||
String truncateStr = "TRUNCATE TABLE test.case_sensitive_table PARTITION P20211008; \n";
|
||||
TruncateTableStmt truncateTableStmt
|
||||
= (TruncateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(truncateStr, connectContext);
|
||||
Env.getCurrentEnv().truncateTable(truncateTableStmt);
|
||||
Assert.assertNotEquals(p20211008Id, tbl.getPartition("p20211008").getId());
|
||||
Assert.assertNotEquals(p20211008Id, tbl.getPartition("P20211008").getId());
|
||||
// 2. truncate P20211007
|
||||
truncateStr = "TRUNCATE TABLE test.case_sensitive_table PARTITION P20211007; \n";
|
||||
truncateTableStmt = (TruncateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(truncateStr, connectContext);
|
||||
Env.getCurrentEnv().truncateTable(truncateTableStmt);
|
||||
Assert.assertEquals(3, tbl.getPartitionInfo().idToDataProperty.size());
|
||||
Assert.assertNotEquals(p20211007Id, tbl.getPartition("p20211007").getId());
|
||||
Assert.assertNotEquals(p20211007Id, tbl.getPartition("P20211007").getId());
|
||||
Assert.assertEquals(p20211006Id, tbl.getPartition("p20211006").getId());
|
||||
Assert.assertNotNull(tbl.getPartition("p20211006"));
|
||||
Assert.assertNotNull(tbl.getPartition("P20211006"));
|
||||
Assert.assertNotNull(tbl.getPartition("p20211006"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@ -222,7 +222,7 @@ public class DynamicPartitionUtilTest {
|
||||
List<Column> partitionColumnList = Lists.newArrayList();
|
||||
Column partitionColumn = new Column();
|
||||
partitionColumn.setType(Type.DATE);
|
||||
Deencapsulation.setField(rangePartitionInfo, partitionColumnList);
|
||||
Deencapsulation.setField(rangePartitionInfo, "partitionColumns", partitionColumnList);
|
||||
try {
|
||||
Deencapsulation.invoke(dynamicPartitionUtil, "checkTimeUnit", "HOUR", rangePartitionInfo);
|
||||
Assert.fail();
|
||||
|
||||
@ -0,0 +1,159 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.service;
|
||||
|
||||
|
||||
import org.apache.doris.analysis.CreateDbStmt;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.thrift.TCreatePartitionRequest;
|
||||
import org.apache.doris.thrift.TCreatePartitionResult;
|
||||
import org.apache.doris.thrift.TStatusCode;
|
||||
import org.apache.doris.thrift.TStringLiteral;
|
||||
import org.apache.doris.utframe.UtFrameUtils;
|
||||
|
||||
import mockit.Mocked;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
public class FrontendServiceImplTest {
|
||||
private static String runningDir = "fe/mocked/FrontendServiceImplTest/" + UUID.randomUUID().toString() + "/";
|
||||
private static ConnectContext connectContext;
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
@Mocked
|
||||
ExecuteEnv exeEnv;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
FeConstants.runningUnitTest = true;
|
||||
FeConstants.default_scheduler_interval_millisecond = 100;
|
||||
Config.dynamic_partition_enable = true;
|
||||
Config.dynamic_partition_check_interval_seconds = 1;
|
||||
UtFrameUtils.createDorisCluster(runningDir);
|
||||
// create connect context
|
||||
connectContext = UtFrameUtils.createDefaultCtx();
|
||||
// create database
|
||||
String createDbStmtStr = "create database test;";
|
||||
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
|
||||
Env.getCurrentEnv().createDb(createDbStmt);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() {
|
||||
UtFrameUtils.cleanDorisFeDir(runningDir);
|
||||
}
|
||||
|
||||
private static void createTable(String sql) throws Exception {
|
||||
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
|
||||
Env.getCurrentEnv().createTable(createTableStmt);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testCreatePartitionRange() throws Exception {
|
||||
String createOlapTblStmt = new String("CREATE TABLE test.partition_range(\n"
|
||||
+ " event_day DATETIME,\n"
|
||||
+ " site_id INT DEFAULT '10',\n"
|
||||
+ " city_code VARCHAR(100)\n"
|
||||
+ ")\n"
|
||||
+ "DUPLICATE KEY(event_day, site_id, city_code)\n"
|
||||
+ "AUTO PARTITION BY range date_trunc( event_day,'day') (\n"
|
||||
+ "\n"
|
||||
+ ")\n"
|
||||
+ "DISTRIBUTED BY HASH(event_day, site_id) BUCKETS 2\n"
|
||||
+ "PROPERTIES(\"replication_num\" = \"1\");");
|
||||
|
||||
createTable(createOlapTblStmt);
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException("default_cluster:test");
|
||||
OlapTable table = (OlapTable) db.getTableOrAnalysisException("partition_range");
|
||||
|
||||
List<List<TStringLiteral>> partitionValues = new ArrayList<>();
|
||||
List<TStringLiteral> values = new ArrayList<>();
|
||||
|
||||
TStringLiteral start = new TStringLiteral();
|
||||
start.setValue("2023-08-07 00:00:00");
|
||||
values.add(start);
|
||||
|
||||
partitionValues.add(values);
|
||||
|
||||
FrontendServiceImpl impl = new FrontendServiceImpl(exeEnv);
|
||||
TCreatePartitionRequest request = new TCreatePartitionRequest();
|
||||
request.setDbId(db.getId());
|
||||
request.setTableId(table.getId());
|
||||
request.setPartitionValues(partitionValues);
|
||||
TCreatePartitionResult partition = impl.createPartition(request);
|
||||
|
||||
Assert.assertEquals(partition.getStatus().getStatusCode(), TStatusCode.OK);
|
||||
Partition p20230807 = table.getPartition("p20230807000000");
|
||||
Assert.assertNotNull(p20230807);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreatePartitionList() throws Exception {
|
||||
String createOlapTblStmt = new String("CREATE TABLE test.partition_list(\n"
|
||||
+ " event_day DATETIME,\n"
|
||||
+ " site_id INT DEFAULT '10',\n"
|
||||
+ " city_code VARCHAR(100) not null\n"
|
||||
+ ")\n"
|
||||
+ "DUPLICATE KEY(event_day, site_id, city_code)\n"
|
||||
+ "AUTO PARTITION BY list (city_code) (\n"
|
||||
+ "\n"
|
||||
+ ")\n"
|
||||
+ "DISTRIBUTED BY HASH(event_day, site_id) BUCKETS 2\n"
|
||||
+ "PROPERTIES(\"replication_num\" = \"1\");");
|
||||
|
||||
createTable(createOlapTblStmt);
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrAnalysisException("default_cluster:test");
|
||||
OlapTable table = (OlapTable) db.getTableOrAnalysisException("partition_list");
|
||||
|
||||
List<List<TStringLiteral>> partitionValues = new ArrayList<>();
|
||||
List<TStringLiteral> values = new ArrayList<>();
|
||||
|
||||
TStringLiteral start = new TStringLiteral();
|
||||
start.setValue("BEIJING");
|
||||
values.add(start);
|
||||
|
||||
partitionValues.add(values);
|
||||
|
||||
FrontendServiceImpl impl = new FrontendServiceImpl(exeEnv);
|
||||
TCreatePartitionRequest request = new TCreatePartitionRequest();
|
||||
request.setDbId(db.getId());
|
||||
request.setTableId(table.getId());
|
||||
request.setPartitionValues(partitionValues);
|
||||
TCreatePartitionResult partition = impl.createPartition(request);
|
||||
|
||||
Assert.assertEquals(partition.getStatus().getStatusCode(), TStatusCode.OK);
|
||||
Partition pbeijing = table.getPartition("pBEIJING");
|
||||
Assert.assertNotNull(pbeijing);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user