(improvement)[bucket] Add auto bucket implementation (#15250)

This commit is contained in:
Drogon
2023-01-18 19:50:18 +08:00
committed by GitHub
parent 0916cbcb10
commit 34075368ec
20 changed files with 836 additions and 147 deletions

View File

@ -45,6 +45,7 @@ import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.View;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Version;
import org.apache.doris.mysql.MysqlPassword;
import org.apache.doris.load.loadv2.LoadTask;
@ -616,7 +617,8 @@ terminal String
KW_YEAR,
KW_MTMV,
KW_TYPECAST,
KW_HISTOGRAM;
KW_HISTOGRAM,
KW_AUTO;
terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT;
terminal BITAND, BITOR, BITXOR, BITNOT;
@ -2914,12 +2916,16 @@ opt_distribution ::=
/* Hash distributed */
| KW_DISTRIBUTED KW_BY KW_HASH LPAREN ident_list:columns RPAREN opt_distribution_number:numDistribution
{:
RESULT = new HashDistributionDesc(numDistribution, columns);
int bucketNum = (numDistribution == null ? -1 : numDistribution);
boolean is_auto_bucket = (numDistribution == null);
RESULT = new HashDistributionDesc(bucketNum, is_auto_bucket, columns);
:}
/* Random distributed */
| KW_DISTRIBUTED KW_BY KW_RANDOM opt_distribution_number:numDistribution
{:
RESULT = new RandomDistributionDesc(numDistribution);
int bucketNum = (numDistribution == null ? -1 : numDistribution);
boolean is_auto_bucket = (numDistribution == null);
RESULT = new RandomDistributionDesc(bucketNum, is_auto_bucket);
:}
;
@ -2937,13 +2943,17 @@ opt_rollup ::=
opt_distribution_number ::=
/* Empty */
{:
/* If distribution number is null, default distribution number is 10. */
RESULT = 10;
/* If distribution number is null, default distribution number is FeConstants.default_bucket_num. */
RESULT = FeConstants.default_bucket_num;
:}
| KW_BUCKETS INTEGER_LITERAL:numDistribution
{:
RESULT = numDistribution.intValue();
:}
| KW_BUCKETS KW_AUTO
{:
RESULT = null;
:}
;
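For reference, the three forms this rule now accepts and the values they produce (illustrative DDL fragments written as Java strings, in the style of the tests later in this commit; the `user_id` column is just an example):

// Illustrative DDL fragments for the opt_distribution_number rule above.
String fixedBuckets = "DISTRIBUTED BY HASH(`user_id`) BUCKETS 32";   // RESULT = 32
String autoBuckets  = "DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO"; // RESULT = null -> bucketNum = -1, autoBucket = true
String defaulted    = "DISTRIBUTED BY HASH(`user_id`)";              // RESULT = FeConstants.default_bucket_num (10)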
opt_keys ::=

View File

@ -31,6 +31,8 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.AutoBucketUtils;
import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
@ -94,6 +96,32 @@ public class CreateTableStmt extends DdlStmt {
engineNames.add("jdbc");
}
// if auto bucket is enabled, rewrite the distribution bucket num and
// set properties[PropertyAnalyzer.PROPERTIES_AUTO_BUCKET] = "true"
private static Map<String, String> maybeRewriteByAutoBucket(DistributionDesc distributionDesc,
Map<String, String> properties) throws AnalysisException {
if (distributionDesc == null || !distributionDesc.isAutoBucket()) {
return properties;
}
// auto bucket is enabled
Map<String, String> newProperties = properties;
if (newProperties == null) {
newProperties = new HashMap<String, String>();
}
newProperties.put(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET, "true");
if (!newProperties.containsKey(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)) {
distributionDesc.setBuckets(FeConstants.default_bucket_num);
} else {
long partitionSize = ParseUtil
.analyzeDataVolumn(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
distributionDesc.setBuckets(AutoBucketUtils.getBucketsNum(partitionSize));
}
return newProperties;
}
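A minimal standalone sketch of the two rewrite paths above (constants inlined for illustration; "_auto_bucket", "estimate_partition_size", 10, and AutoBucketUtils are the names used in this diff):

import java.util.HashMap;
import java.util.Map;

// Standalone sketch: what maybeRewriteByAutoBucket decides for a BUCKETS AUTO table.
public class AutoBucketRewriteSketch {
    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        // The rewrite always pins the marker property first.
        properties.put("_auto_bucket", "true"); // PropertyAnalyzer.PROPERTIES_AUTO_BUCKET
        int buckets;
        if (properties.containsKey("estimate_partition_size")) {
            buckets = -1; // real code: AutoBucketUtils.getBucketsNum(parsed size), see below
        } else {
            buckets = 10; // real code: FeConstants.default_bucket_num
        }
        System.out.println(buckets); // 10: no estimate_partition_size was set
    }
}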
public CreateTableStmt() {
// for persist
tableName = new TableName();
@ -260,7 +288,11 @@ public class CreateTableStmt extends DdlStmt {
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
public void analyze(Analyzer analyzer) throws UserException, AnalysisException {
if (Strings.isNullOrEmpty(engineName) || engineName.equalsIgnoreCase("olap")) {
this.properties = maybeRewriteByAutoBucket(distributionDesc, properties);
}
super.analyze(analyzer);
tableName.analyze(analyzer);
FeNameFormat.checkTableName(tableName.getTbl());

View File

@ -22,22 +22,36 @@ import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.commons.lang.NotImplementedException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class DistributionDesc implements Writable {
public class DistributionDesc {
protected DistributionInfoType type;
protected int numBucket;
protected boolean autoBucket;
public DistributionDesc() {
public DistributionDesc(int numBucket) {
this(numBucket, false);
}
public DistributionDesc(int numBucket, boolean autoBucket) {
this.numBucket = numBucket;
this.autoBucket = autoBucket;
}
public int getBuckets() {
return numBucket;
}
public int setBuckets(int numBucket) {
return this.numBucket = numBucket;
}
public boolean isAutoBucket() {
return autoBucket;
}
public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException {
@ -51,28 +65,4 @@ public class DistributionDesc implements Writable {
public DistributionInfo toDistributionInfo(List<Column> columns) throws DdlException {
throw new NotImplementedException();
}
public static DistributionDesc read(DataInput in) throws IOException {
DistributionInfoType type = DistributionInfoType.valueOf(Text.readString(in));
if (type == DistributionInfoType.HASH) {
DistributionDesc desc = new HashDistributionDesc();
desc.readFields(in);
return desc;
} else if (type == DistributionInfoType.RANDOM) {
DistributionDesc desc = new RandomDistributionDesc();
desc.readFields(in);
return desc;
} else {
throw new IOException("Unknown distribution type: " + type);
}
}
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, type.name());
}
public void readFields(DataInput in) throws IOException {
throw new NotImplementedException();
}
}

View File

@ -25,29 +25,25 @@ import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class HashDistributionDesc extends DistributionDesc {
private int numBucket;
private List<String> distributionColumnNames;
public HashDistributionDesc() {
public HashDistributionDesc(int numBucket, List<String> distributionColumnNames) {
super(numBucket);
type = DistributionInfoType.HASH;
distributionColumnNames = Lists.newArrayList();
this.distributionColumnNames = distributionColumnNames;
}
public HashDistributionDesc(int numBucket, List<String> distributionColumnNames) {
public HashDistributionDesc(int numBucket, boolean autoBucket, List<String> distributionColumnNames) {
super(numBucket, autoBucket);
type = DistributionInfoType.HASH;
this.numBucket = numBucket;
this.distributionColumnNames = distributionColumnNames;
}
@ -55,14 +51,10 @@ public class HashDistributionDesc extends DistributionDesc {
return distributionColumnNames;
}
public int getBuckets() {
return numBucket;
}
@Override
public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException {
if (numBucket <= 0) {
throw new AnalysisException("Number of hash distribution should be larger than zero.");
throw new AnalysisException("Number of hash distribution should be greater than zero.");
}
if (distributionColumnNames == null || distributionColumnNames.size() == 0) {
throw new AnalysisException("Number of hash column should be larger than zero.");
@ -100,7 +92,11 @@ public class HashDistributionDesc extends DistributionDesc {
i++;
}
stringBuilder.append(")\n");
stringBuilder.append("BUCKETS ").append(numBucket);
if (autoBucket) {
stringBuilder.append("BUCKETS AUTO");
} else {
stringBuilder.append("BUCKETS ").append(numBucket);
}
return stringBuilder.toString();
}
@ -139,27 +135,8 @@ public class HashDistributionDesc extends DistributionDesc {
}
}
HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(numBucket, distributionColumns);
HashDistributionInfo hashDistributionInfo =
new HashDistributionInfo(numBucket, autoBucket, distributionColumns);
return hashDistributionInfo;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(numBucket);
int count = distributionColumnNames.size();
out.writeInt(count);
for (String colName : distributionColumnNames) {
Text.writeString(out, colName);
}
}
public void readFields(DataInput in) throws IOException {
numBucket = in.readInt();
int count = in.readInt();
for (int i = 0; i < count; i++) {
distributionColumnNames.add(Text.readString(in));
}
}
}

View File

@ -23,28 +23,24 @@ import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.common.AnalysisException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Set;
public class RandomDistributionDesc extends DistributionDesc {
int numBucket;
public RandomDistributionDesc() {
public RandomDistributionDesc(int numBucket) {
super(numBucket);
type = DistributionInfoType.RANDOM;
}
public RandomDistributionDesc(int numBucket) {
public RandomDistributionDesc(int numBucket, boolean autoBucket) {
super(numBucket, autoBucket);
type = DistributionInfoType.RANDOM;
this.numBucket = numBucket;
}
@Override
public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException {
if (numBucket <= 0) {
throw new AnalysisException("Number of random distribution should be larger than zero.");
throw new AnalysisException("Number of random distribution should be greater than zero.");
}
}
@ -52,23 +48,18 @@ public class RandomDistributionDesc extends DistributionDesc {
public String toSql() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("DISTRIBUTED BY RANDOM\n")
.append("BUCKETS ").append(numBucket);
.append("BUCKETS ");
if (autoBucket) {
stringBuilder.append("AUTO");
} else {
stringBuilder.append(numBucket);
}
return stringBuilder.toString();
}
@Override
public DistributionInfo toDistributionInfo(List<Column> columns) {
RandomDistributionInfo randomDistributionInfo = new RandomDistributionInfo(numBucket);
RandomDistributionInfo randomDistributionInfo = new RandomDistributionInfo(numBucket, autoBucket);
return randomDistributionInfo;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(numBucket);
}
public void readFields(DataInput in) throws IOException {
numBucket = in.readInt();
}
}

View File

@ -40,12 +40,28 @@ public abstract class DistributionInfo implements Writable {
@SerializedName(value = "type")
protected DistributionInfoType type;
@SerializedName(value = "bucketNum")
protected int bucketNum;
@SerializedName(value = "autoBucket")
protected boolean autoBucket;
public DistributionInfo() {
// for persist
}
public DistributionInfo(DistributionInfoType type) {
this(type, 0, false);
}
public DistributionInfo(DistributionInfoType type, int bucketNum) {
this(type, bucketNum, false);
}
public DistributionInfo(DistributionInfoType type, int bucketNum, boolean autoBucket) {
this.type = type;
this.bucketNum = bucketNum;
this.autoBucket = autoBucket;
}
public DistributionInfoType getType() {
@ -62,6 +78,10 @@ public abstract class DistributionInfo implements Writable {
throw new NotImplementedException("not implemented");
}
public void markAutoBucket() {
autoBucket = true;
}
public DistributionDesc toDistributionDesc() {
throw new NotImplementedException();
}

View File

@ -2881,6 +2881,12 @@ public class Env {
sb.append(olapTable.getCompressionType()).append("\"");
}
// estimate_partition_size
if (!olapTable.getEstimatePartitionSize().equals("")) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE).append("\" = \"");
sb.append(olapTable.getEstimatePartitionSize()).append("\"");
}
// unique key table with merge on write
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && olapTable.getEnableUniqueKeyMergeOnWrite()) {
sb.append(",\n\"").append(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE).append("\" = \"");

View File

@ -37,8 +37,6 @@ import java.util.Objects;
public class HashDistributionInfo extends DistributionInfo {
@SerializedName(value = "distributionColumns")
private List<Column> distributionColumns;
@SerializedName(value = "bucketNum")
private int bucketNum;
public HashDistributionInfo() {
super();
@ -46,9 +44,13 @@ public class HashDistributionInfo extends DistributionInfo {
}
public HashDistributionInfo(int bucketNum, List<Column> distributionColumns) {
super(DistributionInfoType.HASH);
super(DistributionInfoType.HASH, bucketNum);
this.distributionColumns = distributionColumns;
}
public HashDistributionInfo(int bucketNum, boolean autoBucket, List<Column> distributionColumns) {
super(DistributionInfoType.HASH, bucketNum, autoBucket);
this.distributionColumns = distributionColumns;
this.bucketNum = bucketNum;
}
public List<Column> getDistributionColumns() {
@ -65,6 +67,7 @@ public class HashDistributionInfo extends DistributionInfo {
this.bucketNum = bucketNum;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
int columnCount = distributionColumns.size();
@ -75,6 +78,7 @@ public class HashDistributionInfo extends DistributionInfo {
out.writeInt(bucketNum);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
int columnCount = in.readInt();
@ -117,7 +121,7 @@ public class HashDistributionInfo extends DistributionInfo {
for (Column col : distributionColumns) {
distriColNames.add(col.getName());
}
DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, distriColNames);
DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, autoBucket, distriColNames);
return distributionDesc;
}
@ -133,7 +137,11 @@ public class HashDistributionInfo extends DistributionInfo {
String colList = Joiner.on(", ").join(colNames);
builder.append(colList);
builder.append(") BUCKETS ").append(bucketNum);
if (autoBucket) {
builder.append(") BUCKETS AUTO");
} else {
builder.append(") BUCKETS ").append(bucketNum);
}
return builder.toString();
}
@ -148,7 +156,11 @@ public class HashDistributionInfo extends DistributionInfo {
}
builder.append("]; ");
builder.append("bucket num: ").append(bucketNum).append("; ");
if (autoBucket) {
builder.append("bucket num: auto;");
} else {
builder.append("bucket num: ").append(bucketNum).append(";");
}
return builder.toString();
}

View File

@ -594,8 +594,8 @@ public class OlapTable extends Table {
if (full) {
return indexIdToMeta.get(indexId).getSchema();
} else {
return indexIdToMeta.get(indexId).getSchema().stream().filter(column ->
column.isVisible()).collect(Collectors.toList());
return indexIdToMeta.get(indexId).getSchema().stream().filter(column -> column.isVisible())
.collect(Collectors.toList());
}
}
@ -1133,7 +1133,6 @@ public class OlapTable extends Table {
return false;
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
@ -1283,6 +1282,9 @@ public class OlapTable extends Table {
if (in.readBoolean()) {
tableProperty = TableProperty.read(in);
}
if (isAutoBucket()) {
defaultDistributionInfo.markAutoBucket();
}
// temp partitions
tempPartitions = TempPartitions.read(in);
@ -1626,6 +1628,36 @@ public class OlapTable extends Table {
tableProperty.buildInMemory();
}
public Boolean isAutoBucket() {
if (tableProperty != null) {
return tableProperty.isAutoBucket();
}
return false;
}
public void setIsAutoBucket(boolean isAutoBucket) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
}
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET,
Boolean.valueOf(isAutoBucket).toString());
}
public void setEstimatePartitionSize(String estimatePartitionSize) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
}
tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE,
estimatePartitionSize);
}
public String getEstimatePartitionSize() {
if (tableProperty != null) {
return tableProperty.getEstimatePartitionSize();
}
return "";
}
public boolean getEnableLightSchemaChange() {
if (tableProperty != null) {
return tableProperty.getUseSchemaLightChange();
@ -1879,11 +1911,11 @@ public class OlapTable extends Table {
return false;
}
List<Expr> partitionExps = aggregateInfo.getPartitionExprs() != null
? aggregateInfo.getPartitionExprs() : groupingExps;
? aggregateInfo.getPartitionExprs()
: groupingExps;
DistributionInfo distribution = getDefaultDistributionInfo();
if (distribution instanceof HashDistributionInfo) {
List<Column> distributeColumns =
((HashDistributionInfo) distribution).getDistributionColumns();
List<Column> distributeColumns = ((HashDistributionInfo) distribution).getDistributionColumns();
PartitionInfo partitionInfo = getPartitionInfo();
if (partitionInfo instanceof RangePartitionInfo) {
List<Column> rangeColumns = partitionInfo.getPartitionColumns();
@ -1891,8 +1923,7 @@ public class OlapTable extends Table {
return false;
}
}
List<SlotRef> partitionSlots =
partitionExps.stream().map(Expr::unwrapSlotRef).collect(Collectors.toList());
List<SlotRef> partitionSlots = partitionExps.stream().map(Expr::unwrapSlotRef).collect(Collectors.toList());
if (partitionSlots.contains(null)) {
return false;
}

View File

@ -29,21 +29,21 @@ import java.util.Objects;
* Random partition.
*/
public class RandomDistributionInfo extends DistributionInfo {
private int bucketNum;
public RandomDistributionInfo() {
super();
}
public RandomDistributionInfo(int bucketNum) {
super(DistributionInfoType.RANDOM);
this.bucketNum = bucketNum;
super(DistributionInfoType.RANDOM, bucketNum);
}
public RandomDistributionInfo(int bucketNum, boolean autoBucket) {
super(DistributionInfoType.RANDOM, bucketNum, autoBucket);
}
@Override
public DistributionDesc toDistributionDesc() {
DistributionDesc distributionDesc = new RandomDistributionDesc(bucketNum);
DistributionDesc distributionDesc = new RandomDistributionDesc(bucketNum, autoBucket);
return distributionDesc;
}
@ -55,15 +55,21 @@ public class RandomDistributionInfo extends DistributionInfo {
@Override
public String toSql() {
StringBuilder builder = new StringBuilder();
builder.append("DISTRIBUTED BY RANDOM BUCKETS ").append(bucketNum);
if (autoBucket) {
builder.append("DISTRIBUTED BY RANDOM() BUCKETS AUTO");
} else {
builder.append("DISTRIBUTED BY RANDOM() BUCKETS ").append(bucketNum);
}
return builder.toString();
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(bucketNum);
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
bucketNum = in.readInt();

View File

@ -244,6 +244,14 @@ public class TableProperty implements Writable {
return isInMemory;
}
public boolean isAutoBucket() {
return Boolean.parseBoolean(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET, "false"));
}
public String getEstimatePartitionSize() {
return properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE, "");
}
public TStorageFormat getStorageFormat() {
// Force convert all V1 table to V2 table
if (TStorageFormat.V1 == storageFormat) {

View File

@ -32,6 +32,7 @@ import org.apache.doris.catalog.DynamicPartitionProperty;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.RangePartitionInfo;
@ -42,6 +43,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.AutoBucketUtils;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.PropertyAnalyzer;
@ -140,6 +142,75 @@ public class DynamicPartitionScheduler extends MasterDaemon {
return defaultRuntimeInfo;
}
// exponential moving average
private static long ema(ArrayList<Long> history, int period) {
double alpha = 2.0 / (period + 1);
double ema = history.get(0);
for (int i = 1; i < history.size(); i++) {
ema = alpha * history.get(i) + (1 - alpha) * ema;
}
return (long) ema;
}
private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize) {
if (historyPartitionsSize.size() < 2) {
return historyPartitionsSize.get(0);
}
int size = historyPartitionsSize.size() > 7 ? 7 : historyPartitionsSize.size();
boolean isAscending = true;
for (int i = 1; i < size; i++) {
if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 1)) {
isAscending = false;
break;
}
}
if (isAscending) {
ArrayList<Long> historyDeltaSize = Lists.newArrayList();
for (int i = 1; i < size; i++) {
historyDeltaSize.add(historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1));
}
return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 7);
} else {
return ema(historyPartitionsSize, 7);
}
}
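A quick numeric check of the prediction above (a standalone sketch with toy sizes; ema() is copied from this diff, and the ascending branch is exercised):

import java.util.ArrayList;
import java.util.Arrays;

// Toy check: ascending history [10, 12, 14] -> deltas [2, 2] -> EMA(deltas) = 2 -> predicted next = 16.
public class EmaPredictionSketch {
    private static long ema(ArrayList<Long> history, int period) {
        double alpha = 2.0 / (period + 1);
        double ema = history.get(0);
        for (int i = 1; i < history.size(); i++) {
            ema = alpha * history.get(i) + (1 - alpha) * ema;
        }
        return (long) ema;
    }

    public static void main(String[] args) {
        ArrayList<Long> historyDeltaSize = new ArrayList<>(Arrays.asList(2L, 2L));
        System.out.println(14 + ema(historyDeltaSize, 7)); // 16
    }
}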
private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table) {
if (!table.isAutoBucket()) {
return property.getBuckets();
}
List<Partition> partitions = Lists.newArrayList();
RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo());
List<Map.Entry<Long, PartitionItem>> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet());
idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint()));
for (Map.Entry<Long, PartitionItem> idToItem : idToItems) {
Partition partition = table.getPartition(idToItem.getKey());
if (partition != null) {
partitions.add(partition);
}
}
// no partition history yet; fall back to the bucket num from the dynamic partition property
if (partitions.size() == 0) {
return property.getBuckets();
}
ArrayList<Long> partitionSizeArray = Lists.newArrayList();
for (Partition partition : partitions) {
if (partition.getVisibleVersion() >= 2) {
partitionSizeArray.add(partition.getDataSize());
}
}
// * 5 to estimate the uncompressed size (partition data size is stored compressed)
long uncompressedPartitionSize = getNextPartitionSize(partitionSizeArray) * 5;
return AutoBucketUtils.getBucketsNum(uncompressedPartitionSize);
}
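Note the compression round trip here: partition.getDataSize() is the compressed on-disk size, so the scheduler multiplies by 5, while AutoBucketUtils.getBucketsNum() divides by 5 again internally (see below). A toy trace under that assumption:

// Toy trace of the * 5 / 5 round trip (constants as assumed in this diff).
public class CompressionRoundTripSketch {
    static final long GB = 1024L * 1024 * 1024;

    public static void main(String[] args) {
        long predictedCompressed = 16 * GB;          // from getNextPartitionSize()
        long uncompressed = predictedCompressed * 5; // scheduler side: * 5
        long backToCompressed = uncompressed / 5;    // AutoBucketUtils side: / 5
        System.out.println(backToCompressed / GB);   // 16 -> 16 buckets by size (before disk/BE caps)
    }
}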
private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTable olapTable,
Column partitionColumn, String partitionFormat) {
ArrayList<AddPartitionClause> addPartitionClauses = new ArrayList<>();
@ -231,21 +302,22 @@ public class DynamicPartitionScheduler extends MasterDaemon {
String partitionName = dynamicPartitionProperty.getPrefix()
+ DynamicPartitionUtil.getFormattedPartitionName(dynamicPartitionProperty.getTimeZone(),
prevBorder, dynamicPartitionProperty.getTimeUnit());
prevBorder, dynamicPartitionProperty.getTimeUnit());
SinglePartitionDesc rangePartitionDesc = new SinglePartitionDesc(true, partitionName,
partitionKeyDesc, partitionProperties);
DistributionDesc distributionDesc = null;
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable);
if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
List<String> distColumnNames = new ArrayList<>();
for (Column distributionColumn : hashDistributionInfo.getDistributionColumns()) {
distColumnNames.add(distributionColumn.getName());
}
distributionDesc = new HashDistributionDesc(dynamicPartitionProperty.getBuckets(), distColumnNames);
distributionDesc = new HashDistributionDesc(bucketsNum, distColumnNames);
} else {
distributionDesc = new RandomDistributionDesc(dynamicPartitionProperty.getBuckets());
distributionDesc = new RandomDistributionDesc(bucketsNum);
}
// add partition according to partition desc and distribution desc
addPartitionClauses.add(new AddPartitionClause(rangePartitionDesc, distributionDesc, null, false));
@ -265,8 +337,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
}
private void setStoragePolicyProperty(HashMap<String, String> partitionProperties,
DynamicPartitionProperty property, ZonedDateTime now, int offset,
String storagePolicyName) {
DynamicPartitionProperty property, ZonedDateTime now, int offset,
String storagePolicyName) {
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, storagePolicyName);
String baseTime = DynamicPartitionUtil.getPartitionRangeString(
property, now, offset, DynamicPartitionUtil.DATETIME_FORMAT);
@ -341,8 +413,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
dynamicPartitionProperty, range.lowerEndpoint().toString(), partitionFormat);
String upperBorderOfReservedHistory = DynamicPartitionUtil.getHistoryPartitionRangeString(
dynamicPartitionProperty, range.upperEndpoint().toString(), partitionFormat);
Range<PartitionKey> reservedHistoryPartitionKeyRange
= getClosedRange(db, olapTable, partitionColumn, partitionFormat,
Range<PartitionKey> reservedHistoryPartitionKeyRange = getClosedRange(db, olapTable,
partitionColumn, partitionFormat,
lowerBorderOfReservedHistory, upperBorderOfReservedHistory);
reservedHistoryPartitionKeyRangeList.add(reservedHistoryPartitionKeyRange);
} catch (IllegalArgumentException e) {

View File

@ -22,6 +22,10 @@ import org.apache.doris.persist.meta.FeMetaFormat;
public class FeConstants {
// Database and table's default configurations, we will never change them
public static short default_replication_num = 3;
// The default bucket num; also used by auto bucket when estimate_partition_size is not set
public static int default_bucket_num = 10;
/*
* Those two fields is responsible for determining the default key columns in duplicate table.
* If user does not specify key of duplicate table in create table stmt,

View File

@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.DiskInfo.DiskState;
import org.apache.doris.catalog.Env;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import com.google.common.collect.ImmutableMap;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class AutoBucketUtils {
private static Logger logger = LogManager.getLogger(AutoBucketUtils.class);
static final long SIZE_100MB = 100 * 1024 * 1024L;
static final long SIZE_1GB = 1 * 1024 * 1024 * 1024L;
static final long SIZE_1TB = 1024 * SIZE_1GB;
private static int getBENum() {
SystemInfoService infoService = Env.getCurrentSystemInfo();
ImmutableMap<Long, Backend> backends = infoService.getBackendsInCluster(null);
int activeBENum = 0;
for (Backend backend : backends.values()) {
if (backend.isAlive()) {
++activeBENum;
}
}
return activeBENum;
}
private static int getBucketsNumByBEDisks() {
SystemInfoService infoService = Env.getCurrentSystemInfo();
ImmutableMap<Long, Backend> backends = infoService.getBackendsInCluster(null);
int buckets = 0;
for (Backend backend : backends.values()) {
if (!backend.isLoadAvailable()) {
continue;
}
ImmutableMap<String, DiskInfo> disks = backend.getDisks();
for (DiskInfo diskInfo : disks.values()) {
if (diskInfo.getState() == DiskState.ONLINE && diskInfo.hasPathHash()) {
buckets += (diskInfo.getAvailableCapacityB() - 1) / (50 * SIZE_1GB) + 1;
}
}
}
return buckets;
}
private static int convertParitionSizeToBucketsNum(long partitionSize) {
partitionSize /= 5; // for compression 5:1
// <= 100MB, 1 bucket
// <= 1GB, 2 buckets
// > 1GB, round to (size / 1G)
if (partitionSize <= SIZE_100MB) {
return 1;
} else if (partitionSize <= SIZE_1GB) {
return 2;
} else {
return (int) ((partitionSize - 1) / SIZE_1GB + 1);
}
}
public static int getBucketsNum(long partitionSize) {
int bucketsNumByPartitionSize = convertParitionSizeToBucketsNum(partitionSize);
int bucketsNumByBE = getBucketsNumByBEDisks();
int bucketsNum = Math.min(128, Math.min(bucketsNumByPartitionSize, bucketsNumByBE));
int beNum = getBENum();
logger.debug("AutoBucketsUtil: bucketsNumByPartitionSize {}, bucketsNumByBE {}, bucketsNum {}, beNum {}",
bucketsNumByPartitionSize, bucketsNumByBE, bucketsNum, beNum);
if (bucketsNum < bucketsNumByPartitionSize && bucketsNum < beNum) {
bucketsNum = beNum;
}
logger.debug("AutoBucketsUtil: final bucketsNum {}", bucketsNum);
return bucketsNum;
}
}
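As a worked example of the sizing above, take the 500GB case from test500G_0 later in this commit: 500GB / 5 (assumed 5:1 compression) = 100GB, giving 100 buckets by size; 3 BEs with one 1TB disk each give (1TB - 1) / 50GB + 1 = 21 buckets per disk, i.e. 63 by disks; min(128, min(100, 63)) = 63, which is not below the 3 alive BEs, so it stands. A standalone sketch of that arithmetic:

// Standalone sketch of getBucketsNum() with the capacity math inlined (no Doris deps).
public class AutoBucketMathSketch {
    static final long GB = 1024L * 1024 * 1024;
    static final long TB = 1024 * GB;

    public static void main(String[] args) {
        long partitionSize = 500 * GB / 5;                 // 5:1 compression -> 100GB
        int bySize = (int) ((partitionSize - 1) / GB + 1); // 100 buckets (> 1GB branch)
        int perDisk = (int) ((TB - 1) / (50 * GB) + 1);    // 21 buckets per 1TB disk
        int byDisks = 3 * perDisk;                         // 3 BEs x 1 disk each -> 63
        int buckets = Math.min(128, Math.min(bySize, byDisks));
        System.out.println(buckets);                       // 63, matching test500G_0
    }
}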

View File

@ -92,6 +92,10 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_INMEMORY = "in_memory";
// _auto_bucket can only be set in a create table stmt to rewrite the bucket num, and cannot be changed afterwards
public static final String PROPERTIES_AUTO_BUCKET = "_auto_bucket";
public static final String PROPERTIES_ESTIMATE_PARTITION_SIZE = "estimate_partition_size";
public static final String PROPERTIES_TABLET_TYPE = "tablet_type";
public static final String PROPERTIES_STRICT_RANGE = "strict_range";
@ -131,7 +135,7 @@ public class PropertyAnalyzer {
/**
* check and replace members of DataProperty by properties.
*
* @param properties key->value for members to change.
* @param properties key->value for members to change.
* @param oldDataProperty old DataProperty
* @return new DataProperty
* @throws AnalysisException property has invalid key->value
@ -246,7 +250,8 @@ public class PropertyAnalyzer {
throws AnalysisException {
Short replicationNum = oldReplicationNum;
String propKey = Strings.isNullOrEmpty(prefix)
? PROPERTIES_REPLICATION_NUM : prefix + "." + PROPERTIES_REPLICATION_NUM;
? PROPERTIES_REPLICATION_NUM
: prefix + "." + PROPERTIES_REPLICATION_NUM;
if (properties != null && properties.containsKey(propKey)) {
try {
replicationNum = Short.valueOf(properties.get(propKey));
@ -348,7 +353,7 @@ public class PropertyAnalyzer {
}
public static Set<String> analyzeBloomFilterColumns(Map<String, String> properties, List<Column> columns,
KeysType keysType) throws AnalysisException {
KeysType keysType) throws AnalysisException {
Set<String> bfColumns = null;
if (properties != null && properties.containsKey(PROPERTIES_BF_COLUMNS)) {
bfColumns = Sets.newHashSet();
@ -483,7 +488,7 @@ public class PropertyAnalyzer {
}
// analyzeCompressionType will parse the compression type from properties
public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws AnalysisException {
public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws AnalysisException {
String compressionType = "";
if (properties != null && properties.containsKey(PROPERTIES_COMPRESSION)) {
compressionType = properties.get(PROPERTIES_COMPRESSION);
@ -545,6 +550,15 @@ public class PropertyAnalyzer {
return defaultVal;
}
public static String analyzeEstimatePartitionSize(Map<String, String> properties) {
String estimatePartitionSize = "";
if (properties != null && properties.containsKey(PROPERTIES_ESTIMATE_PARTITION_SIZE)) {
estimatePartitionSize = properties.get(PROPERTIES_ESTIMATE_PARTITION_SIZE);
properties.remove(PROPERTIES_ESTIMATE_PARTITION_SIZE);
}
return estimatePartitionSize;
}
public static String analyzeStoragePolicy(Map<String, String> properties) throws AnalysisException {
String storagePolicy = "";
if (properties != null && properties.containsKey(PROPERTIES_STORAGE_POLICY)) {
@ -760,7 +774,6 @@ public class PropertyAnalyzer {
throw new AnalysisException(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE + " must be `true` or `false`");
}
/**
* Check the type property of the catalog props.
*/
@ -776,5 +789,3 @@ public class PropertyAnalyzer {
}
}
}

View File

@ -242,7 +242,6 @@ public class InternalCatalog implements CatalogIf<Database> {
return INTERNAL_CATALOG_NAME;
}
@Override
public List<String> getDbNames() {
return Lists.newArrayList(fullNameToDb.keySet());
@ -736,12 +735,12 @@ public class InternalCatalog implements CatalogIf<Database> {
if (Strings.isNullOrEmpty(newPartitionName)) {
if (olapTable.getPartition(partitionName) != null) {
throw new DdlException("partition[" + partitionName + "] "
+ "already exist in table[" + tableName + "]");
+ "already exist in table[" + tableName + "]");
}
} else {
if (olapTable.getPartition(newPartitionName) != null) {
throw new DdlException("partition[" + newPartitionName + "] "
+ "already exist in table[" + tableName + "]");
+ "already exist in table[" + tableName + "]");
}
}
@ -934,7 +933,7 @@ public class InternalCatalog implements CatalogIf<Database> {
}
public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay,
long recycleTime) {
long recycleTime) {
if (table.getType() == TableType.ELASTICSEARCH) {
esRepository.deRegisterTable(table.getId());
} else if (table.getType() == TableType.OLAP) {
@ -966,7 +965,7 @@ public class InternalCatalog implements CatalogIf<Database> {
}
public void replayDropTable(Database db, long tableId, boolean isForceDrop,
Long recycleTime) throws MetaNotFoundException {
Long recycleTime) throws MetaNotFoundException {
Table table = db.getTableOrMetaException(tableId);
db.writeLock();
table.writeLock();
@ -1004,10 +1003,10 @@ public class InternalCatalog implements CatalogIf<Database> {
schemaHash = olapTable.getSchemaHashByIndexId(info.getIndexId());
}
Replica replica =
new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(), schemaHash, info.getDataSize(),
info.getRemoteDataSize(), info.getRowCount(), ReplicaState.NORMAL, info.getLastFailedVersion(),
info.getLastSuccessVersion());
Replica replica = new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(), schemaHash,
info.getDataSize(),
info.getRemoteDataSize(), info.getRowCount(), ReplicaState.NORMAL, info.getLastFailedVersion(),
info.getLastSuccessVersion());
tablet.addReplica(replica);
}
@ -1371,8 +1370,8 @@ public class InternalCatalog implements CatalogIf<Database> {
if (distributionInfo.getType() == DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
List<Column> defaultDistriCols
= ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns();
List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo)
.getDistributionColumns();
if (!newDistriCols.equals(defaultDistriCols)) {
throw new DdlException(
"Cannot assign hash distribution with different distribution cols. " + "default is: "
@ -1633,7 +1632,7 @@ public class InternalCatalog implements CatalogIf<Database> {
olapTable.dropTempPartition(info.getPartitionName(), true);
} else {
Partition partition = olapTable.dropPartition(info.getDbId(), info.getPartitionName(),
info.isForceDrop());
info.isForceDrop());
if (!info.isForceDrop() && partition != null && info.getRecycleTime() != 0) {
Env.getCurrentRecycleBin().setRecycleTimeByIdForReplay(partition.getId(), info.getRecycleTime());
}
@ -1664,7 +1663,7 @@ public class InternalCatalog implements CatalogIf<Database> {
DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc,
Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> tabletIdSet, List<Index> indexes,
boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType,
DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy,
DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy,
IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction) throws DdlException {
// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
@ -1927,6 +1926,17 @@ public class InternalCatalog implements CatalogIf<Database> {
olapTable.setReplicationAllocation(replicaAlloc);
// set auto bucket
boolean isAutoBucket = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_AUTO_BUCKET,
false);
olapTable.setIsAutoBucket(isAutoBucket);
// set estimate partition size
if (isAutoBucket) {
String estimatePartitionSize = PropertyAnalyzer.analyzeEstimatePartitionSize(properties);
olapTable.setEstimatePartitionSize(estimatePartitionSize);
}
// set in memory
boolean isInMemory = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY,
false);

View File

@ -479,6 +479,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("year", new Integer(SqlParserSymbols.KW_YEAR));
keywordMap.put("mtmv", new Integer(SqlParserSymbols.KW_MTMV));
keywordMap.put("histogram", new Integer(SqlParserSymbols.KW_HISTOGRAM));
keywordMap.put("auto", new Integer(SqlParserSymbols.KW_AUTO));
}
// map from token id to token description

View File

@ -0,0 +1,294 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSetMetaData;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TDisk;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.utframe.UtFrameUtils;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Mocked;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.StringContains;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
public class AutoBucketUtilsTest {
private static String databaseName = "AutoBucketUtilsTest";
// use a unique dir so that it won't conflict with other unit tests which
// may also start a mocked Frontend
private static String runningDirBase = "fe";
private static String runningDir = runningDirBase + "/mocked/AutoBucketUtilsTest/" + UUID.randomUUID().toString()
+ "/";
private static List<Backend> backends = Lists.newArrayList();
private static Random random = new Random(System.currentTimeMillis());
private ConnectContext connectContext;
// create backends by be num, disk num, disk capacity
private static void createClusterWithBackends(int beNum, int diskNum, long diskCapacity) throws Exception {
UtFrameUtils.createDorisClusterWithMultiTag(runningDir, beNum);
// must set disk info, or the tablet scheduler won't work
backends = Env.getCurrentSystemInfo().getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
for (Backend be : backends) {
setDiskInfos(diskNum, diskCapacity, be);
}
}
private static ImmutableMap<Long, Backend> createBackends(int beNum, int diskNum, long diskCapacity)
throws Exception {
// must set disk info, or the tablet scheduler won't work
Map<Long, Backend> backends = Maps.newHashMap();
for (int i = 0; i < beNum; ++i) {
Backend be = new Backend(10000 + i, "127.0.0." + (i + 1), 9000 + i);
be.setAlive(true);
backends.put(be.getId(), be);
}
for (Backend be : backends.values()) {
setDiskInfos(diskNum, diskCapacity, be);
}
return ImmutableMap.copyOf(backends);
}
private static void setDiskInfos(int diskNum, long diskCapacity, Backend be) {
Map<String, TDisk> backendDisks = Maps.newHashMap();
for (int i = 0; i < diskNum; ++i) {
TDisk disk = new TDisk();
disk.setRootPath("/home/doris/" + UUID.randomUUID().toString());
disk.setDiskTotalCapacity(diskCapacity);
disk.setDataUsedCapacity(0);
disk.setUsed(true);
disk.setDiskAvailableCapacity(disk.disk_total_capacity - disk.data_used_capacity);
disk.setPathHash(random.nextLong());
disk.setStorageMedium(TStorageMedium.HDD);
backendDisks.put(disk.getRootPath(), disk);
}
be.updateDisks(backendDisks);
}
private void expectations(Env env, EditLog editLog, SystemInfoService systemInfoService,
ImmutableMap<Long, Backend> backends) {
new Expectations() {
{
Env.getCurrentSystemInfo();
minTimes = 0;
result = systemInfoService;
systemInfoService.getBackendsInCluster(null);
minTimes = 0;
result = backends;
Env.getCurrentEnv();
minTimes = 0;
result = env;
env.getEditLog();
minTimes = 0;
result = editLog;
editLog.logBackendStateChange((Backend) any);
minTimes = 0;
}
};
}
@Before
public void setUp() throws Exception {
FeConstants.runningUnitTest = true;
FeConstants.tablet_checker_interval_ms = 1000;
FeConstants.default_scheduler_interval_millisecond = 100;
Config.tablet_repair_delay_factor_second = 1;
connectContext = UtFrameUtils.createDefaultCtx();
}
@After
public void tearDown() {
Env.getCurrentEnv().clear();
UtFrameUtils.cleanDorisFeDir(runningDirBase);
}
private static String genTableNameWithoutDatabase(String estimatePartitionSize) {
return "size_" + estimatePartitionSize;
}
private static String genTableName(String estimatePartitionSize) {
return databaseName + "." + genTableNameWithoutDatabase(estimatePartitionSize);
}
private static String genTableNameByTag(String estimatePartitionSize, String tag) {
return databaseName + "." + genTableNameWithoutDatabase(estimatePartitionSize) + "_" + tag;
}
private static String genCreateTableSql(String estimatePartitionSize) {
return "CREATE TABLE IF NOT EXISTS " + genTableName(estimatePartitionSize) + "\n"
+ "(\n"
+ "`user_id` LARGEINT NOT NULL\n"
+ ")\n"
+ "DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO\n"
+ "PROPERTIES (\n"
+ "\"estimate_partition_size\" = \"" + estimatePartitionSize + "\",\n"
+ "\"replication_num\" = \"1\"\n"
+ ")";
}
private void createTable(String sql) throws Exception {
// create database first
UtFrameUtils.createDatabase(connectContext, databaseName);
UtFrameUtils.createTable(connectContext, sql);
}
private void createTableBySize(String estimatePartitionSize) throws Exception {
createTable(genCreateTableSql(estimatePartitionSize));
}
private int getPartitionBucketNum(String tableName) throws Exception {
ShowResultSet result = UtFrameUtils.showPartitionsByName(connectContext, tableName);
ResultSetMetaData metaData = result.getMetaData();
for (int i = 0; i < metaData.getColumnCount(); ++i) {
if (metaData.getColumn(i).getName().equalsIgnoreCase("buckets")) {
return Integer.valueOf(result.getResultRows().get(0).get(i));
}
}
throw new Exception("No buckets column in show partitions result");
}
// this also exercises create table && show partitions
@Test
public void testWithoutEstimatePartitionSize() throws Exception {
String tableName = genTableName("");
String sql = "CREATE TABLE IF NOT EXISTS " + tableName + "\n"
+ "(\n"
+ "`user_id` LARGEINT NOT NULL\n"
+ ")\n"
+ "DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\"\n"
+ ")";
createClusterWithBackends(1, 1, 2000000000);
createTable(sql);
ShowResultSet showCreateTableResult = UtFrameUtils.showCreateTableByName(connectContext, tableName);
String showCreateTableResultSql = showCreateTableResult.getResultRows().get(0).get(1);
MatcherAssert.assertThat(showCreateTableResultSql,
StringContains.containsString("DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO\n"));
int bucketNum = getPartitionBucketNum(tableName);
Assert.assertEquals(FeConstants.default_bucket_num, bucketNum);
}
@Test
public void test100MB(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
throws Exception {
long estimatePartitionSize = AutoBucketUtils.SIZE_100MB;
ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2000000000);
expectations(env, editLog, systemInfoService, backends);
Assert.assertEquals(1, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
}
@Test
public void test500MB(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
throws Exception {
long estimatePartitionSize = 5 * AutoBucketUtils.SIZE_100MB;
ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2000000000);
expectations(env, editLog, systemInfoService, backends);
Assert.assertEquals(1, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
}
@Test
public void test1G(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
throws Exception {
long estimatePartitionSize = AutoBucketUtils.SIZE_1GB;
ImmutableMap<Long, Backend> backends = createBackends(3, 2, 500 * AutoBucketUtils.SIZE_1GB);
expectations(env, editLog, systemInfoService, backends);
Assert.assertEquals(2, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
}
@Test
public void test100G(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
throws Exception {
long estimatePartitionSize = 100 * AutoBucketUtils.SIZE_1GB;
ImmutableMap<Long, Backend> backends = createBackends(3, 2, 500 * AutoBucketUtils.SIZE_1GB);
expectations(env, editLog, systemInfoService, backends);
Assert.assertEquals(20, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
}
@Test
public void test500G_0(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
throws Exception {
long estimatePartitionSize = 500 * AutoBucketUtils.SIZE_1GB;
ImmutableMap<Long, Backend> backends = createBackends(3, 1, AutoBucketUtils.SIZE_1TB);
expectations(env, editLog, systemInfoService, backends);
Assert.assertEquals(63, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
}
@Test
public void test500G_1(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
throws Exception {
long estimatePartitionSize = 500 * AutoBucketUtils.SIZE_1GB;
ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2 * AutoBucketUtils.SIZE_1TB);
expectations(env, editLog, systemInfoService, backends);
Assert.assertEquals(100, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
}
@Test
public void test500G_2(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
throws Exception {
long estimatePartitionSize = 500 * AutoBucketUtils.SIZE_1GB;
ImmutableMap<Long, Backend> backends = createBackends(1, 1, 100 * AutoBucketUtils.SIZE_1TB);
expectations(env, editLog, systemInfoService, backends);
Assert.assertEquals(100, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
}
@Test
public void test1T_0(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
throws Exception {
long estimatePartitionSize = AutoBucketUtils.SIZE_1TB;
ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2 * AutoBucketUtils.SIZE_1TB);
expectations(env, editLog, systemInfoService, backends);
Assert.assertEquals(128, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
}
@Test
public void test1T_1(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
throws Exception {
long estimatePartitionSize = AutoBucketUtils.SIZE_1TB;
ImmutableMap<Long, Backend> backends = createBackends(200, 7, 4 * AutoBucketUtils.SIZE_1TB);
expectations(env, editLog, systemInfoService, backends);
Assert.assertEquals(200, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
}
}

View File

@ -18,14 +18,20 @@
package org.apache.doris.utframe;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.QueryStmt;
import org.apache.doris.analysis.ShowCreateTableStmt;
import org.apache.doris.analysis.ShowPartitionsStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@ -35,6 +41,8 @@ import org.apache.doris.planner.Planner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.ShowExecutor;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@ -62,6 +70,7 @@ import java.net.ServerSocket;
import java.net.SocketException;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@ -267,7 +276,7 @@ public class UtFrameUtils {
// start be
MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort,
beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort),
new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort));
backend.start();
@ -307,7 +316,7 @@ public class UtFrameUtils {
datagramSocket.setReuseAddress(true);
break;
} catch (SocketException e) {
System.out.println("The port " + port + " is invalid and try another port.");
System.out.println("The port " + port + " is invalid and try another port.");
}
} catch (IOException e) {
throw new IllegalStateException("Could not find a free TCP/IP port to start HTTP Server on");
@ -357,8 +366,8 @@ public class UtFrameUtils {
}
public static String getStmtDigest(ConnectContext connectContext, String originStmt) throws Exception {
SqlScanner input =
new SqlScanner(new StringReader(originStmt), connectContext.getSessionVariable().getSqlMode());
SqlScanner input = new SqlScanner(new StringReader(originStmt),
connectContext.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
StatementBase statementBase = SqlParserUtils.getFirstStmt(parser);
Preconditions.checkState(statementBase instanceof QueryStmt);
@ -372,4 +381,70 @@ public class UtFrameUtils {
String realVNodeName = idx + ":V" + nodeName;
return planResult.contains(realNodeName) || planResult.contains(realVNodeName);
}
public static void createDatabase(ConnectContext ctx, String db) throws Exception {
String createDbStmtStr = "CREATE DATABASE " + db;
CreateDbStmt createDbStmt = (CreateDbStmt) parseAndAnalyzeStmt(createDbStmtStr, ctx);
Env.getCurrentEnv().createDb(createDbStmt);
}
public static void createTable(ConnectContext ctx, String sql) throws Exception {
try {
createTables(ctx, sql);
} catch (ConcurrentModificationException e) {
e.printStackTrace();
throw e;
}
}
public static void createTables(ConnectContext ctx, String... sqls) throws Exception {
for (String sql : sqls) {
CreateTableStmt stmt = (CreateTableStmt) parseAndAnalyzeStmt(sql, ctx);
Env.getCurrentEnv().createTable(stmt);
}
updateReplicaPathHash();
}
public static ShowResultSet showCreateTable(ConnectContext ctx, String sql) throws Exception {
ShowCreateTableStmt stmt = (ShowCreateTableStmt) parseAndAnalyzeStmt(sql, ctx);
ShowExecutor executor = new ShowExecutor(ctx, stmt);
return executor.execute();
}
public static ShowResultSet showCreateTableByName(ConnectContext ctx, String table) throws Exception {
String sql = "show create table " + table;
return showCreateTable(ctx, sql);
}
public static ShowResultSet showPartitions(ConnectContext ctx, String sql) throws Exception {
ShowPartitionsStmt stmt = (ShowPartitionsStmt) parseAndAnalyzeStmt(sql, ctx);
ShowExecutor executor = new ShowExecutor(ctx, stmt);
return executor.execute();
}
public static ShowResultSet showPartitionsByName(ConnectContext ctx, String table) throws Exception {
String sql = "show partitions from " + table;
return showPartitions(ctx, sql);
}
private static void updateReplicaPathHash() {
com.google.common.collect.Table<Long, Long, Replica> replicaMetaTable = Env.getCurrentInvertedIndex()
.getReplicaMetaTable();
for (com.google.common.collect.Table.Cell<Long, Long, Replica> cell : replicaMetaTable.cellSet()) {
long beId = cell.getColumnKey();
Backend be = Env.getCurrentSystemInfo().getBackend(beId);
if (be == null) {
continue;
}
Replica replica = cell.getValue();
TabletMeta tabletMeta = Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
for (DiskInfo diskInfo : diskMap.values()) {
if (diskInfo.getStorageMedium() == tabletMeta.getStorageMedium()) {
replica.setPathHash(diskInfo.getPathHash());
break;
}
}
}
}
}

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_autobucket") {
sql "drop table if exists autobucket_test"
result = sql """
CREATE TABLE `autobucket_test` (
`user_id` largeint(40) NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(`user_id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
)
"""
result = sql "show create table autobucket_test"
assertTrue(result.toString().containsIgnoreCase("BUCKETS AUTO"))
result = sql "show partitions from autobucket_test"
logger.info("${result}")
// XXX: the buckets column is at position 8; this may later be implemented via sql meta
assertEquals(Integer.valueOf(result.get(0).get(8)), 10)
sql "drop table if exists autobucket_test"
}