[fix](random-distribution) Make aggregate keys table with replace type columns and unique keys table can only have hash distribution to make data computing correctly (#10414)
This commit is contained in:
@ -48,6 +48,7 @@ import org.apache.doris.catalog.OlapTable.OlapTableState;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.RandomDistributionInfo;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.Replica.ReplicaState;
|
||||
import org.apache.doris.catalog.ReplicaAllocation;
|
||||
@ -775,6 +776,13 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
&& newColumn.getDefaultValue() != null && !newColumn.getDefaultValue().equals("0")) {
|
||||
throw new DdlException("The default value of '"
|
||||
+ newColName + "' with SUM aggregation function must be zero");
|
||||
} else if (olapTable.getDefaultDistributionInfo() instanceof RandomDistributionInfo) {
|
||||
if (newColumn.getAggregationType() == AggregateType.REPLACE
|
||||
|| newColumn.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) {
|
||||
throw new DdlException("Can not add value column with aggregation type "
|
||||
+ newColumn.getAggregationType() + " for olap table with random distribution : "
|
||||
+ newColName);
|
||||
}
|
||||
}
|
||||
} else if (KeysType.UNIQUE_KEYS == olapTable.getKeysType()) {
|
||||
if (newColumn.getAggregationType() != null) {
|
||||
@ -1499,6 +1507,11 @@ public class SchemaChangeHandler extends AlterHandler {
|
||||
Catalog.getCurrentCatalog().modifyTableColocate(db, olapTable, colocateGroup, false, null);
|
||||
return;
|
||||
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE)) {
|
||||
String distributionType = properties.get(PropertyAnalyzer.PROPERTIES_DISTRIBUTION_TYPE);
|
||||
if (!distributionType.equalsIgnoreCase("random")) {
|
||||
throw new DdlException("Only support modifying distribution type of table from"
|
||||
+ " hash to random");
|
||||
}
|
||||
Catalog.getCurrentCatalog().convertDistributionType(db, olapTable);
|
||||
return;
|
||||
} else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_SEND_CLEAR_ALTER_TASK)) {
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.analysis;
|
||||
import org.apache.doris.catalog.AggregateType;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.DistributionInfo;
|
||||
import org.apache.doris.catalog.Index;
|
||||
import org.apache.doris.catalog.KeysType;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
@ -90,9 +91,6 @@ public class CreateTableStmt extends DdlStmt {
|
||||
engineNames.add("hudi");
|
||||
}
|
||||
|
||||
// for backup. set to -1 for normal use
|
||||
private int tableSignature;
|
||||
|
||||
public CreateTableStmt() {
|
||||
// for persist
|
||||
tableName = new TableName();
|
||||
@ -163,7 +161,6 @@ public class CreateTableStmt extends DdlStmt {
|
||||
this.ifNotExists = ifNotExists;
|
||||
this.comment = Strings.nullToEmpty(comment);
|
||||
|
||||
this.tableSignature = -1;
|
||||
this.rollupAlterClauseList = rollupAlterClauseList == null ? new ArrayList<>() : rollupAlterClauseList;
|
||||
}
|
||||
|
||||
@ -243,14 +240,6 @@ public class CreateTableStmt extends DdlStmt {
|
||||
return tableName.getDb();
|
||||
}
|
||||
|
||||
public void setTableSignature(int tableSignature) {
|
||||
this.tableSignature = tableSignature;
|
||||
}
|
||||
|
||||
public int getTableSignature() {
|
||||
return tableSignature;
|
||||
}
|
||||
|
||||
public void setTableName(String newTableName) {
|
||||
tableName = new TableName(tableName.getDb(), newTableName);
|
||||
}
|
||||
@ -421,6 +410,20 @@ public class CreateTableStmt extends DdlStmt {
|
||||
throw new AnalysisException("Create olap table should contain distribution desc");
|
||||
}
|
||||
distributionDesc.analyze(columnSet, columnDefs);
|
||||
if (distributionDesc.type == DistributionInfo.DistributionInfoType.RANDOM) {
|
||||
if (keysDesc.getKeysType() == KeysType.UNIQUE_KEYS) {
|
||||
throw new AnalysisException("Create unique keys table should not contain random distribution desc");
|
||||
} else if (keysDesc.getKeysType() == KeysType.AGG_KEYS) {
|
||||
for (ColumnDef columnDef : columnDefs) {
|
||||
if (columnDef.getAggregateType() == AggregateType.REPLACE
|
||||
|| columnDef.getAggregateType() == AggregateType.REPLACE_IF_NOT_NULL) {
|
||||
throw new AnalysisException("Create aggregate keys table with value columns of which"
|
||||
+ " aggregate type is " + columnDef.getAggregateType() + " should not contain random"
|
||||
+ " distribution desc");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (engineName.equalsIgnoreCase("elasticsearch")) {
|
||||
EsUtil.analyzePartitionAndDistributionDesc(partitionDesc, distributionDesc);
|
||||
} else {
|
||||
|
||||
@ -4597,6 +4597,21 @@ public class Catalog {
|
||||
public void convertDistributionType(Database db, OlapTable tbl) throws DdlException {
|
||||
tbl.writeLockOrDdlException();
|
||||
try {
|
||||
if (tbl.isColocateTable()) {
|
||||
throw new DdlException("Cannot change distribution type of colocate table.");
|
||||
}
|
||||
if (tbl.getKeysType() == KeysType.UNIQUE_KEYS) {
|
||||
throw new DdlException("Cannot change distribution type of unique keys table.");
|
||||
}
|
||||
if (tbl.getKeysType() == KeysType.AGG_KEYS) {
|
||||
for (Column column : tbl.getBaseSchema()) {
|
||||
if (column.getAggregationType() == AggregateType.REPLACE
|
||||
|| column.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) {
|
||||
throw new DdlException("Cannot change distribution type of aggregate keys table which has value"
|
||||
+ " columns with " + column.getAggregationType() + " type.");
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!tbl.convertHashDistributionToRandomDistribution()) {
|
||||
throw new DdlException("Table " + tbl.getName() + " is not hash distributed");
|
||||
}
|
||||
|
||||
@ -18,19 +18,15 @@
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.DistributionDesc;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
import org.apache.commons.lang.NotImplementedException;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public abstract class DistributionInfo implements Writable {
|
||||
|
||||
@ -88,33 +84,4 @@ public abstract class DistributionInfo implements Writable {
|
||||
public boolean equals(DistributionInfo info) {
|
||||
return false;
|
||||
}
|
||||
|
||||
public static List<Expr> toDistExpr(OlapTable tbl, DistributionInfo distInfo, Map<String, Expr> exprByCol) {
|
||||
List<Expr> distExprs = Lists.newArrayList();
|
||||
if (distInfo instanceof RandomDistributionInfo) {
|
||||
for (Column col : tbl.getBaseSchema()) {
|
||||
if (col.isKey()) {
|
||||
Expr distExpr = exprByCol.get(col.getName());
|
||||
// used to compute hash
|
||||
if (col.getDataType() == PrimitiveType.CHAR) {
|
||||
distExpr.setType(Type.CHAR);
|
||||
}
|
||||
distExprs.add(distExpr);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else if (distInfo instanceof HashDistributionInfo) {
|
||||
HashDistributionInfo hashDistInfo = (HashDistributionInfo) distInfo;
|
||||
for (Column col : hashDistInfo.getDistributionColumns()) {
|
||||
Expr distExpr = exprByCol.get(col.getName());
|
||||
// used to compute hash
|
||||
if (col.getDataType() == PrimitiveType.CHAR) {
|
||||
distExpr.setType(Type.CHAR);
|
||||
}
|
||||
distExprs.add(distExpr);
|
||||
}
|
||||
}
|
||||
return distExprs;
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,11 +31,13 @@ import org.apache.doris.backup.CatalogMocker;
|
||||
import org.apache.doris.catalog.AggregateType;
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.catalog.CatalogTestUtil;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DistributionInfo;
|
||||
import org.apache.doris.catalog.DynamicPartitionProperty;
|
||||
import org.apache.doris.catalog.FakeCatalog;
|
||||
import org.apache.doris.catalog.FakeEditLog;
|
||||
import org.apache.doris.catalog.KeysType;
|
||||
import org.apache.doris.catalog.MaterializedIndex;
|
||||
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
|
||||
import org.apache.doris.catalog.MaterializedIndex.IndexState;
|
||||
@ -47,6 +49,7 @@ import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.Replica;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.Tablet;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
@ -62,7 +65,10 @@ import org.apache.doris.thrift.TTaskType;
|
||||
import org.apache.doris.transaction.FakeTransactionIDGenerator;
|
||||
import org.apache.doris.transaction.GlobalTransactionMgr;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import mockit.Expectations;
|
||||
import mockit.Injectable;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
@ -353,7 +359,7 @@ public class SchemaChangeJobV2Test {
|
||||
Assert.assertEquals(3, olapTable.getTableProperty().getDynamicPartitionProperty().getBuckets());
|
||||
}
|
||||
|
||||
public void modifyDynamicPartitionWithoutTableProperty(String propertyKey, String propertyValue, String missPropertyKey)
|
||||
public void modifyDynamicPartitionWithoutTableProperty(String propertyKey, String propertyValue)
|
||||
throws UserException {
|
||||
fakeCatalog = new FakeCatalog();
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
@ -375,11 +381,11 @@ public class SchemaChangeJobV2Test {
|
||||
|
||||
@Test
|
||||
public void testModifyDynamicPartitionWithoutTableProperty() throws UserException {
|
||||
modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.ENABLE, "false", DynamicPartitionProperty.TIME_UNIT);
|
||||
modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.TIME_UNIT, "day", DynamicPartitionProperty.ENABLE);
|
||||
modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.END, "3", DynamicPartitionProperty.ENABLE);
|
||||
modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p", DynamicPartitionProperty.ENABLE);
|
||||
modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30", DynamicPartitionProperty.ENABLE);
|
||||
modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.ENABLE, "false");
|
||||
modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.TIME_UNIT, "day");
|
||||
modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.END, "3");
|
||||
modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.PREFIX, "p");
|
||||
modifyDynamicPartitionWithoutTableProperty(DynamicPartitionProperty.BUCKETS, "30");
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -433,4 +439,64 @@ public class SchemaChangeJobV2Test {
|
||||
Partition partition1 = olapTable.getPartition(CatalogTestUtil.testPartitionId1);
|
||||
Assert.assertTrue(partition1.getDistributionInfo().getType() == DistributionInfo.DistributionInfoType.RANDOM);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbnormalModifyTableDistributionType1(@Injectable OlapTable table) throws UserException {
|
||||
fakeCatalog = new FakeCatalog();
|
||||
fakeEditLog = new FakeEditLog();
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get();
|
||||
new Expectations() {
|
||||
{
|
||||
table.isColocateTable();
|
||||
result = true;
|
||||
}
|
||||
};
|
||||
expectedEx.expect(DdlException.class);
|
||||
expectedEx.expectMessage("errCode = 2, detailMessage = Cannot change distribution type of colocate table.");
|
||||
Catalog.getCurrentCatalog().convertDistributionType(db, table);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbnormalModifyTableDistributionType2(@Injectable OlapTable table) throws UserException {
|
||||
fakeCatalog = new FakeCatalog();
|
||||
fakeEditLog = new FakeEditLog();
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get();
|
||||
new Expectations() {
|
||||
{
|
||||
table.isColocateTable();
|
||||
result = false;
|
||||
table.getKeysType();
|
||||
result = KeysType.UNIQUE_KEYS;
|
||||
}
|
||||
};
|
||||
expectedEx.expect(DdlException.class);
|
||||
expectedEx.expectMessage("errCode = 2, detailMessage = Cannot change distribution type of unique keys table.");
|
||||
Catalog.getCurrentCatalog().convertDistributionType(db, table);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAbnormalModifyTableDistributionType3(@Injectable OlapTable table) throws UserException {
|
||||
fakeCatalog = new FakeCatalog();
|
||||
fakeEditLog = new FakeEditLog();
|
||||
FakeCatalog.setCatalog(masterCatalog);
|
||||
Database db = masterCatalog.getDb(CatalogTestUtil.testDbId1).get();
|
||||
new Expectations() {
|
||||
{
|
||||
table.isColocateTable();
|
||||
result = false;
|
||||
table.getKeysType();
|
||||
result = KeysType.AGG_KEYS;
|
||||
table.getBaseSchema();
|
||||
result = Lists.newArrayList(
|
||||
new Column("k1", Type.INT, true, null, "0", ""),
|
||||
new Column("v1", Type.INT, false, AggregateType.REPLACE, "0", ""));
|
||||
}
|
||||
};
|
||||
expectedEx.expect(DdlException.class);
|
||||
expectedEx.expectMessage("errCode = 2, detailMessage = Cannot change "
|
||||
+ "distribution type of aggregate keys table which has value columns with REPLACE type.");
|
||||
Catalog.getCurrentCatalog().convertDistributionType(db, table);
|
||||
}
|
||||
}
|
||||
|
||||
@ -28,7 +28,6 @@ import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.persist.EditLog;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.thrift.TDisk;
|
||||
import org.apache.doris.thrift.TStorageMedium;
|
||||
import org.apache.doris.thrift.TStorageType;
|
||||
|
||||
@ -38,7 +37,6 @@ import com.google.common.collect.Maps;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@ -325,16 +323,4 @@ public class CatalogTestUtil {
|
||||
backend.setAlive(true);
|
||||
return backend;
|
||||
}
|
||||
|
||||
public static Backend createBackend(long id, String host, int heartPort, int bePort, int httpPort,
|
||||
long totalCapacityB, long avaiLabelCapacityB) {
|
||||
Backend backend = createBackend(id, host, heartPort, bePort, httpPort);
|
||||
Map<String, TDisk> backendDisks = new HashMap<String, TDisk>();
|
||||
String rootPath = "root_path";
|
||||
TDisk disk = new TDisk(rootPath, totalCapacityB, avaiLabelCapacityB, true);
|
||||
backendDisks.put(rootPath, disk);
|
||||
backend.updateDisks(backendDisks);
|
||||
backend.setAlive(true);
|
||||
return backend;
|
||||
}
|
||||
}
|
||||
|
||||
@ -485,6 +485,36 @@ public class CreateTableTest {
|
||||
+ " \"dynamic_partition.start_day_of_month\" = \"3\"\n"
|
||||
+ ");"));
|
||||
|
||||
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
|
||||
"Create unique keys table should not contain random distribution desc",
|
||||
() -> createTable("CREATE TABLE test.tbl21\n"
|
||||
+ "(\n"
|
||||
+ " `k1` bigint(20) NULL COMMENT \"\",\n"
|
||||
+ " `k2` largeint(40) NULL COMMENT \"\",\n"
|
||||
+ " `v1` varchar(204) NULL COMMENT \"\",\n"
|
||||
+ " `v2` smallint(6) NULL DEFAULT \"10\" COMMENT \"\"\n"
|
||||
+ ") ENGINE=OLAP\n"
|
||||
+ "UNIQUE KEY(`k1`, `k2`)\n"
|
||||
+ "DISTRIBUTED BY RANDOM BUCKETS 32\n"
|
||||
+ "PROPERTIES (\n"
|
||||
+ "\"replication_allocation\" = \"tag.location.default: 1\"\n"
|
||||
+ ");"));
|
||||
|
||||
ExceptionChecker.expectThrowsWithMsg(AnalysisException.class,
|
||||
"Create aggregate keys table with value columns of which aggregate type"
|
||||
+ " is REPLACE should not contain random distribution desc",
|
||||
() -> createTable("CREATE TABLE test.tbl22\n"
|
||||
+ "(\n"
|
||||
+ " `k1` bigint(20) NULL COMMENT \"\",\n"
|
||||
+ " `k2` largeint(40) NULL COMMENT \"\",\n"
|
||||
+ " `v1` bigint(20) REPLACE NULL COMMENT \"\",\n"
|
||||
+ " `v2` smallint(6) REPLACE_IF_NOT_NULL NULL DEFAULT \"10\" COMMENT \"\"\n"
|
||||
+ ") ENGINE=OLAP\n"
|
||||
+ "AGGREGATE KEY(`k1`, `k2`)\n"
|
||||
+ "DISTRIBUTED BY RANDOM BUCKETS 32\n"
|
||||
+ "PROPERTIES (\n"
|
||||
+ "\"replication_allocation\" = \"tag.location.default: 1\"\n"
|
||||
+ ");"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user