Support altering the default bucket_num of a partitioned OLAP table (#6023)

* Support modifying a partitioned OLAP table's bucket num

Co-authored-by: EmmyMiao87 <522274284@qq.com>
This commit is contained in:
flynn
2021-07-12 20:28:40 +08:00
committed by GitHub
parent dd15da4e12
commit 76e148988a
15 changed files with 292 additions and 3 deletions

View File

@ -1012,6 +1012,10 @@ alter_table_clause ::=
{:
RESULT = new EnableFeatureClause(featureName, properties);
:}
| KW_MODIFY KW_DISTRIBUTION opt_distribution:distribution
{:
RESULT = new ModifyDistributionClause(distribution);
:}
;
opt_enable_feature_properties ::=

View File

@ -26,6 +26,7 @@ import org.apache.doris.analysis.ColumnRenameClause;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.DropMaterializedViewStmt;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.ModifyDistributionClause;
import org.apache.doris.analysis.ModifyPartitionClause;
import org.apache.doris.analysis.ModifyTablePropertiesClause;
import org.apache.doris.analysis.PartitionRenameClause;
@ -191,6 +192,10 @@ public class Alter {
processReplaceTable(db, olapTable, alterClauses);
} else if (currentAlterOps.contains(AlterOpType.MODIFY_TABLE_PROPERTY_SYNC)) {
needProcessOutsideTableLock = true;
} else if (currentAlterOps.contains(AlterOpType.MODIFY_DISTRIBUTION)) {
Preconditions.checkState(alterClauses.size() == 1);
AlterClause alterClause = alterClauses.get(0);
Catalog.getCurrentCatalog().modifyDefaultDistributionBucketNum(db, olapTable, (ModifyDistributionClause) alterClause);
} else {
throw new DdlException("Invalid alter operations: " + currentAlterOps);
}

View File

@ -37,6 +37,7 @@ public enum AlterOpType {
ALTER_OTHER,
ENABLE_FEATURE,
REPLACE_TABLE,
MODIFY_DISTRIBUTION,
INVALID_OP; // INVALID_OP must be the last one
// true means 2 operations have no conflict.

View File

@ -80,6 +80,10 @@ public class AlterOperations {
return currentOps.contains(AlterOpType.REPLACE_TABLE);
}
/**
 * Tells whether the current set of alter operations includes
 * {@link AlterOpType#MODIFY_DISTRIBUTION}.
 *
 * @return true if {@code currentOps} contains MODIFY_DISTRIBUTION
 */
public boolean hasModifyBucketNumOp() {
return currentOps.contains(AlterOpType.MODIFY_DISTRIBUTION);
}
/**
 * Tells whether the given operation type is among the current alter operations.
 *
 * @param op the operation type to look for
 * @return true if {@code currentOps} contains {@code op}
 */
public boolean contains(AlterOpType op) {
return currentOps.contains(op);
}

View File

@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.alter.AlterOpType;
import org.apache.doris.common.AnalysisException;
/**
 * Alter-table clause used to modify the default bucket number of a hash distribution.
 *
 * <p>Syntax: {@code MODIFY DISTRIBUTION DISTRIBUTED BY HASH('key') BUCKETS number;}
 */
public class ModifyDistributionClause extends AlterTableClause {
    // Distribution description taken from the statement; toSql() tolerates it being null.
    private DistributionDesc distributionDesc;

    /**
     * Creates the clause with op type {@link AlterOpType#MODIFY_DISTRIBUTION} and
     * sets {@code needTableStable} to false.
     *
     * @param distributionDesc the distribution description carried by the statement
     */
    public ModifyDistributionClause(DistributionDesc distributionDesc) {
        super(AlterOpType.MODIFY_DISTRIBUTION);
        this.needTableStable = false;
        this.distributionDesc = distributionDesc;
    }

    /** @return the distribution description this clause was built with */
    public DistributionDesc getDistributionDesc() {
        return distributionDesc;
    }

    /** No analysis is performed by this clause. */
    @Override
    public void analyze(Analyzer analyzer) throws AnalysisException {
    }

    /**
     * Renders the clause as SQL: the {@code MODIFY DISTRIBUTION } prefix followed by the
     * distribution description's SQL, or the prefix alone when no description is set.
     */
    @Override
    public String toSql() {
        String prefix = "MODIFY DISTRIBUTION ";
        if (distributionDesc == null) {
            return prefix;
        }
        return prefix + distributionDesc.toSql();
    }

    /** Same text as {@link #toSql()}. */
    @Override
    public String toString() {
        return toSql();
    }
}

View File

@ -82,6 +82,7 @@ import org.apache.doris.analysis.TruncateTableStmt;
import org.apache.doris.analysis.UninstallPluginStmt;
import org.apache.doris.analysis.UserDesc;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.analysis.ModifyDistributionClause;
import org.apache.doris.backup.BackupHandler;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Database.DbState;
@ -174,6 +175,7 @@ import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.GlobalVarPersistInfo;
import org.apache.doris.persist.ModifyPartitionInfo;
import org.apache.doris.persist.ModifyTableDefaultDistributionBucketNumOperationLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.PartitionPersistInfo;
@ -2196,7 +2198,7 @@ public class Catalog {
return checksum;
}
public long saveResources(DataOutputStream out, long checksum) throws IOException {
public long saveResources(DataOutputStream out, long checksum) throws IOException {
Catalog.getCurrentCatalog().getResourceMgr().write(out);
return checksum;
}
@ -5580,6 +5582,80 @@ public class Catalog {
}
}
/**
 * Changes the default distribution bucket number of an OLAP table.
 *
 * <p>While holding the table write lock, the request is rejected with a {@link DdlException}
 * if the table is a colocate table, if its partition type is not RANGE, if its current default
 * distribution is not HASH, if the requested distribution is not HASH, if the requested
 * distribution columns differ from the current ones, or if the requested bucket number is
 * less than 1. When the clause carries no distribution description, nothing is changed.
 * On success the new bucket number is applied to the default distribution info and written
 * to the edit log.
 *
 * @param db                       database owning the table; its id is recorded in the edit log
 * @param olapTable                table whose default bucket number is changed
 * @param modifyDistributionClause clause carrying the requested distribution description
 * @throws DdlException if any of the checks above fails
 */
public void modifyDefaultDistributionBucketNum(Database db, OlapTable olapTable, ModifyDistributionClause modifyDistributionClause) throws DdlException {
    olapTable.writeLock();
    try {
        if (olapTable.isColocateTable()) {
            throw new DdlException("Cannot change default bucket number of colocate table.");
        }
        if (olapTable.getPartitionInfo().getType() != PartitionType.RANGE) {
            throw new DdlException("Only support change partitioned table's distribution.");
        }

        DistributionInfo currentDefault = olapTable.getDefaultDistributionInfo();
        if (currentDefault.getType() != DistributionInfoType.HASH) {
            throw new DdlException("Cannot change default bucket number of distribution type " + currentDefault.getType());
        }

        DistributionDesc requestedDesc = modifyDistributionClause.getDistributionDesc();
        List<Column> baseColumns = olapTable.getBaseSchema();
        if (requestedDesc == null) {
            // The clause carried no distribution description, so there is nothing to apply.
            return;
        }

        // For now only the bucket number may be modified: the type must remain HASH
        // and the distribution columns must match the current default.
        DistributionInfo requested = requestedDesc.toDistributionInfo(baseColumns);
        if (requested.getType() != DistributionInfoType.HASH) {
            throw new DdlException("Cannot change distribution type to " + requested.getType());
        }

        HashDistributionInfo requestedHash = (HashDistributionInfo) requested;
        List<Column> requestedCols = requestedHash.getDistributionColumns();
        List<Column> currentCols = ((HashDistributionInfo) currentDefault).getDistributionColumns();
        boolean sameColumns = requestedCols.equals(currentCols);
        if (!sameColumns) {
            throw new DdlException("Cannot assign hash distribution with different distribution cols. "
                    + "default is: " + currentCols);
        }

        int newBucketNum = requestedHash.getBucketNum();
        if (newBucketNum < 1) {
            throw new DdlException("Cannot assign hash distribution buckets less than 1");
        }

        // Apply in memory first, then persist the change through the edit log.
        currentDefault.setBucketNum(newBucketNum);
        ModifyTableDefaultDistributionBucketNumOperationLog logEntry =
                new ModifyTableDefaultDistributionBucketNumOperationLog(db.getId(), olapTable.getId(), newBucketNum);
        editLog.logModifyDefaultDistributionBucketNum(logEntry);
        LOG.info("modify table[{}] default bucket num to {}", olapTable.getName(), newBucketNum);
    } finally {
        olapTable.writeUnlock();
    }
}
/**
 * Replays an edit-log entry that changed a table's default distribution bucket number.
 *
 * <p>Looks up the database and table by the ids carried in {@code info} and, under the table
 * write lock, applies the logged bucket number to the table's default distribution info.
 * If the database or the table cannot be found, a warning is logged and the entry is skipped.
 *
 * @param opCode operation code of the journal entry; not used by this method
 * @param info   log payload carrying the database id, table id and bucket number
 */
public void replayModifyTableDefaultDistributionBucketNum(short opCode, ModifyTableDefaultDistributionBucketNumOperationLog info) {
    long dbId = info.getDbId();
    long tableId = info.getTableId();
    int bucketNum = info.getBucketNum();

    Database db = getDb(dbId);
    if (db == null) {
        // Avoid dereferencing a missing database; handled the same way as the
        // missing-table case below (warn and skip).
        LOG.warn("db {} does not exist when replaying modify table default distribution bucket number log. table: {}", dbId, tableId);
        return;
    }

    OlapTable olapTable = (OlapTable) db.getTable(tableId);
    if (olapTable == null) {
        LOG.warn("table {} does not exist when replaying modify table default distribution bucket number log. db: {}", tableId, dbId);
        return;
    }

    olapTable.writeLock();
    try {
        DistributionInfo defaultDistributionInfo = olapTable.getDefaultDistributionInfo();
        defaultDistributionInfo.setBucketNum(bucketNum);
    } finally {
        olapTable.writeUnlock();
    }
}
/*
* used for handling AlterClusterStmt
* (for client is the ALTER CLUSTER command).
@ -7026,4 +7102,3 @@ public class Catalog {
}
}
}

View File

@ -65,6 +65,11 @@ public abstract class DistributionInfo implements Writable {
throw new NotImplementedException("not implemented");
}
/**
 * Sets the bucket number of this distribution.
 *
 * <p>This base implementation always throws; subclasses that support changing the
 * bucket number are expected to override it.
 *
 * @param bucketNum the bucket number to set
 * @throws NotImplementedException always, in this base implementation
 */
public void setBucketNum(int bucketNum) {
// should override in sub class
throw new NotImplementedException("not implemented");
}
/**
 * Converts this distribution info into a {@link DistributionDesc}.
 *
 * <p>This base implementation always throws.
 * NOTE(review): presumably concrete subclasses override this — confirm.
 *
 * @throws NotImplementedException always, in this base implementation
 */
public DistributionDesc toDistributionDesc() {
throw new NotImplementedException();
}

View File

@ -59,6 +59,11 @@ public class HashDistributionInfo extends DistributionInfo {
return bucketNum;
}
/**
 * Replaces the bucket number stored in this hash distribution info.
 *
 * @param bucketNum the bucket number to store
 */
@Override
public void setBucketNum(int bucketNum) {
this.bucketNum = bucketNum;
}
public void write(DataOutput out) throws IOException {
super.write(out);
int columnCount = distributionColumns.size();

View File

@ -66,6 +66,7 @@ import org.apache.doris.persist.DropResourceOperationLog;
import org.apache.doris.persist.GlobalVarPersistInfo;
import org.apache.doris.persist.HbPackage;
import org.apache.doris.persist.ModifyPartitionInfo;
import org.apache.doris.persist.ModifyTableDefaultDistributionBucketNumOperationLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.PartitionPersistInfo;
@ -562,6 +563,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM: {
data = ModifyTableDefaultDistributionBucketNumOperationLog.read(in);
isRead = true;
break;
}
case OperationType.OP_REPLACE_TEMP_PARTITION: {
data = ReplacePartitionOperationLog.read(in);
isRead = true;

View File

@ -755,6 +755,11 @@ public class EditLog {
catalog.replayModifyTableProperty(opCode, modifyTablePropertyOperationLog);
break;
}
case OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM: {
ModifyTableDefaultDistributionBucketNumOperationLog modifyTableDefaultDistributionBucketNumOperationLog = (ModifyTableDefaultDistributionBucketNumOperationLog) journal.getData();
catalog.replayModifyTableDefaultDistributionBucketNum(opCode, modifyTableDefaultDistributionBucketNumOperationLog);
break;
}
case OperationType.OP_REPLACE_TEMP_PARTITION: {
ReplacePartitionOperationLog replaceTempPartitionLog = (ReplacePartitionOperationLog) journal.getData();
catalog.replayReplaceTempPartition(replaceTempPartitionLog);
@ -1340,6 +1345,10 @@ public class EditLog {
logEdit(OperationType.OP_MODIFY_REPLICATION_NUM, info);
}
/**
 * Writes a default-distribution-bucket-number change to the edit log under
 * {@link OperationType#OP_MODIFY_DISTRIBUTION_BUCKET_NUM}.
 *
 * @param info the log payload to record
 */
public void logModifyDefaultDistributionBucketNum(ModifyTableDefaultDistributionBucketNumOperationLog info) {
logEdit(OperationType.OP_MODIFY_DISTRIBUTION_BUCKET_NUM, info);
}
/**
 * Writes a table-property change to the edit log under
 * {@link OperationType#OP_MODIFY_IN_MEMORY}.
 *
 * @param info the log payload to record
 */
public void logModifyInMemory(ModifyTablePropertyOperationLog info) {
logEdit(OperationType.OP_MODIFY_IN_MEMORY, info);
}

View File

@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
import com.google.gson.annotations.SerializedName;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Edit-log payload describing a change of a table's default distribution bucket number.
 *
 * <p>Instances are written and read as a single JSON string produced by
 * {@link GsonUtils#GSON}.
 */
public class ModifyTableDefaultDistributionBucketNumOperationLog implements Writable {
    // Id of the database that owns the table.
    @SerializedName(value = "dbId")
    private long dbId;
    // Id of the table whose default bucket number changed.
    @SerializedName(value = "tableId")
    private long tableId;
    // The bucket number recorded by this log entry.
    @SerializedName(value = "bucketNum")
    private int bucketNum;

    public ModifyTableDefaultDistributionBucketNumOperationLog(long dbId, long tableId, int bucketNum) {
        this.dbId = dbId;
        this.tableId = tableId;
        this.bucketNum = bucketNum;
    }

    /**
     * Reads one JSON string from {@code in} and deserializes it into a log entry.
     *
     * @param in the input to read from
     * @return the deserialized log entry
     * @throws IOException if reading the string fails
     */
    public static ModifyTableDefaultDistributionBucketNumOperationLog read(DataInput in) throws IOException {
        String json = Text.readString(in);
        return GsonUtils.GSON.fromJson(json, ModifyTableDefaultDistributionBucketNumOperationLog.class);
    }

    /**
     * Serializes this entry to JSON and writes it to {@code out} as one string.
     *
     * @param out the output to write to
     * @throws IOException if writing the string fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        String json = GsonUtils.GSON.toJson(this);
        Text.writeString(out, json);
    }

    public long getDbId() {
        return dbId;
    }

    public long getTableId() {
        return tableId;
    }

    public int getBucketNum() {
        return bucketNum;
    }
}

View File

@ -182,6 +182,9 @@ public class OperationType {
// set table in memory
public static final short OP_MODIFY_IN_MEMORY = 267;
// set table default distribution bucket num
public static final short OP_MODIFY_DISTRIBUTION_BUCKET_NUM = 268;
// plugin 270~275
public static final short OP_INSTALL_PLUGIN = 270;

View File

@ -627,6 +627,37 @@ public class AlterTest {
Assert.assertNotNull(replace3.getIndexIdByName("r2"));
}
/**
 * Creates a range-partitioned table with 10 hash buckets, then alters the default
 * bucket number to 1 and to 30, checking the table's default distribution info each time.
 */
@Test
public void testModifyBucketNum() throws Exception {
    final String createSql = "CREATE TABLE test.bucket\n" +
            "(\n" +
            " k1 int, k2 int, k3 int sum\n" +
            ")\n" +
            "ENGINE = OLAP\n" +
            "PARTITION BY RANGE(k1)\n" +
            "(\n" +
            "PARTITION p1 VALUES LESS THAN (\"100000\"),\n" +
            "PARTITION p2 VALUES LESS THAN (\"200000\"),\n" +
            "PARTITION p3 VALUES LESS THAN (\"300000\")\n" +
            ")\n" +
            "DISTRIBUTED BY HASH(k1) BUCKETS 10\n" +
            "PROPERTIES(\"replication_num\" = \"1\");";
    final String shrinkSql = "ALTER TABLE test.bucket MODIFY DISTRIBUTION DISTRIBUTED BY HASH(k1) BUCKETS 1;";
    final String growSql = "ALTER TABLE test.bucket MODIFY DISTRIBUTION DISTRIBUTED BY HASH(k1) BUCKETS 30;";

    createTable(createSql);
    Database testDb = Catalog.getCurrentCatalog().getDb("default_cluster:test");

    // Lower the default bucket number to 1.
    alterTable(shrinkSql, false);
    OlapTable afterShrink = (OlapTable) testDb.getTable("bucket");
    Assert.assertEquals(1, afterShrink.getDefaultDistributionInfo().getBucketNum());

    // Raise the default bucket number to 30.
    alterTable(growSql, false);
    OlapTable afterGrow = (OlapTable) testDb.getTable("bucket");
    Assert.assertEquals(30, afterGrow.getDefaultDistributionInfo().getBucketNum());
}
private boolean checkAllTabletsExists(List<Long> tabletIds) {
TabletInvertedIndex invertedIndex = Catalog.getCurrentCatalog().getTabletInvertedIndex();
for (long tabletId : tabletIds) {