[CreateTable] Check backend disk has available capacity by storage medium before create table (#3519)

Currently we choose BEs at random without checking whether a disk of the requested storage medium is available,
so CREATE TABLE only fails after the create-tablet task has been sent to the BE,
where the BE checks whether it has capacity left to create the tablet.
Checking backend disk availability by storage medium on the FE side avoids these unnecessary RPC calls.
WingC
2020-06-27 20:36:31 -05:00
committed by GitHub
parent dc603de4bd
commit b2b9e22b24
14 changed files with 488 additions and 400 deletions
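
At a glance, the change makes the FE filter candidate backends by per-medium disk capacity before its round-robin selection, and fail fast with a DdlException when too few backends remain. Below is a minimal standalone sketch of that idea using hypothetical types (Java 16+ records, not the Doris classes touched by this patch); the real changes live in Catalog, SystemInfoService and Backend further down.

```java
import java.util.List;
import java.util.stream.Collectors;

// Standalone sketch: pick backends only among those that still have a usable disk
// of the requested storage medium, so CREATE TABLE can fail on the FE instead of
// discovering the problem via create-tablet RPCs to the BEs.
public class MediumAwareChooserSketch {
    enum Medium { HDD, SSD }

    record Disk(Medium medium, boolean online, long availableBytes) {}

    record Backend(long id, List<Disk> disks) {
        boolean hasUsableDisk(Medium medium) {
            return disks.stream().anyMatch(d -> d.online() && d.medium() == medium && d.availableBytes() > 0);
        }
    }

    // Mirrors the patch's contract: return null when fewer than replicationNum backends qualify.
    static List<Long> chooseBackends(List<Backend> all, Medium medium, int replicationNum) {
        List<Long> usable = all.stream()
                .filter(be -> be.hasUsableDisk(medium))
                .map(Backend::id)
                .collect(Collectors.toList());
        return usable.size() < replicationNum ? null : usable.subList(0, replicationNum);
    }

    public static void main(String[] args) {
        List<Backend> backends = List.of(
                new Backend(10001, List.of(new Disk(Medium.HDD, true, 500_000))),
                new Backend(10002, List.of(new Disk(Medium.SSD, true, 0))));
        // No backend has a usable SSD disk -> null; the real FE would raise a DdlException here
        // instead of sending create-tablet tasks that are bound to fail.
        System.out.println(chooseBackends(backends, Medium.SSD, 1));
    }
}
```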


@ -591,3 +591,10 @@ The value for thrift_client_timeout_ms is set to be larger than zero to prevent
### `with_k8s_certs`
### `enable_strict_storage_medium_check`
This configuration controls whether, at table creation time, FE checks that the cluster contains disks of the specified storage medium. For example, if the user specifies `SSD` as the storage medium when creating a table but the cluster only has `HDD` disks:
If this parameter is `true`, table creation fails with the error `Failed to find enough host in all backends with storage medium is SSD, need 3`.
If this parameter is `false`, no error is reported; the table is created on disks whose storage medium is `HDD` instead.
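
For reference, the check is controlled by a regular FE configuration entry, so it can be set in `fe.conf` as shown below; since the tests in this commit flip it with `ConfigBase.setMutableConfig`, it should also be adjustable at runtime via `ADMIN SET FRONTEND CONFIG` in builds where the option is declared mutable.

```
enable_strict_storage_medium_check = true
```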


@ -251,7 +251,9 @@ Replication_num
* The BE data storage directory can be explicitly marked as SSD or HDD (distinguished by the .SSD or .HDD suffix). When creating a table, you can specify the initial storage medium for all partitions at once. Note that the suffix only declares the disk medium explicitly; it is not checked against the actual medium type.
* The default initial storage medium can be specified with `default_storage_medium=xxx` in the FE configuration file `fe.conf`; if it is not specified, HDD is used. If SSD is specified, the data is initially stored on SSD.
* If storage\_cooldown\_time is not specified, the data is automatically migrated from SSD to HDD after 30 days by default. If storage\_cooldown\_time is specified, the data is not migrated until storage_cooldown_time is reached.
* Note that this parameter is just a "best effort" setting when storage_medium is specified. Even if no SSD storage medium is set up in the cluster, no error is reported and the data is simply stored in an available data directory. Similarly, if the SSD medium is inaccessible or out of space, the data may initially be stored directly on other available media. When the data expires and is migrated to HDD, the migration may fail (but will keep being retried) if the HDD medium is inaccessible or lacks space.
* Note that when storage_medium is specified and the FE parameter `enable_strict_storage_medium_check` is `false`, this parameter is only a "best effort" setting. Even if no SSD storage medium is set up in the cluster, no error is reported and the data is simply stored in an available data directory.
Similarly, if the SSD medium is inaccessible or out of space, the data may initially be stored directly on other available media. When the data is due to be migrated to HDD, the migration may fail (but will keep being retried) if the HDD medium is inaccessible or lacks space.
If the FE parameter `enable_strict_storage_medium_check` is `true`, the error `Failed to find enough host in all backends with storage medium is SSD` is reported when no SSD storage medium is set up in the cluster (see the example below).
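
For example, a table whose initial data should live on SSD and cool down to HDD at a fixed time might declare properties like the following (illustrative values only):

```
PROPERTIES (
    "storage_medium" = "SSD",
    "storage_cooldown_time" = "2020-12-31 00:00:00",
    "replication_num" = "3"
)
```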
### ENGINE


@ -222,6 +222,7 @@ Syntax:
```
storage_medium: SSD or HDD. The default initial storage medium can be specified with `default_storage_medium=xxx` in the FE configuration file `fe.conf`; if it is not specified, HDD is used.
Note: when the FE configuration `enable_strict_storage_medium_check` is `true` and the corresponding storage medium is not available in the cluster, the CREATE TABLE statement fails with `Failed to find enough host in all backends with storage medium is SSD|HDD`.
storage_cooldown_time: If storage_medium is SSD, data is automatically moved to HDD when this time is reached.
The default is 30 days.
Format: "yyyy-MM-dd HH:mm:ss"


@ -589,3 +589,10 @@ The value of thrift_client_timeout_ms is set to be greater than 0 to prevent threads from getting stuck in java.net.
### `with_k8s_certs`
### `enable_strict_storage_medium_check`
This configuration controls whether, at table creation time, FE checks that the cluster contains disks of the specified storage medium. For example, if the user specifies `SSD` as the storage medium when creating a table but the cluster only has `HDD` disks:
If this parameter is `true`, table creation fails with the error `Failed to find enough host in all backends with storage medium is SSD, need 3`.
If this parameter is `false`, no error is reported; the table is created on disks whose storage medium is `HDD` instead.


@ -254,7 +254,9 @@ PARTITION BY RANGE(`date`, `id`)
* The BE data storage directory can be explicitly marked as SSD or HDD (distinguished by the .SSD or .HDD suffix). When creating a table, you can specify the initial storage medium for all partitions at once. Note that the suffix only declares the disk medium explicitly; it is not checked against the actual medium type.
* The default initial storage medium can be specified with `default_storage_medium=xxx` in the FE configuration file `fe.conf`; if it is not specified, HDD is used. If SSD is specified, the data is initially stored on SSD.
* If storage\_cooldown\_time is not specified, the data is automatically migrated from SSD to HDD after 30 days by default. If storage\_cooldown\_time is specified, the data is not migrated until storage_cooldown_time is reached.
* Note that when storage_medium is specified, this parameter is just a "best effort" setting. Even if no SSD storage medium is set up in the cluster, no error is reported and the data is simply stored in an available data directory. Similarly, if the SSD medium is inaccessible or out of space, the data may initially be stored directly on other available media. When the data expires and is migrated to HDD, the migration may fail (but will keep being retried) if the HDD medium is inaccessible or lacks space.
* Note that when storage_medium is specified and the FE parameter `enable_strict_storage_medium_check` is `false`, this parameter is only a "best effort" setting. Even if no SSD storage medium is set up in the cluster, no error is reported and the data is simply stored in an available data directory.
Similarly, if the SSD medium is inaccessible or out of space, the data may initially be stored directly on other available media. When the data is due to be migrated to HDD, the migration may fail (but will keep being retried) if the HDD medium is inaccessible or lacks space.
If the FE parameter `enable_strict_storage_medium_check` is `true`, the error `Failed to find enough host in all backends with storage medium is SSD` is reported when no SSD storage medium is set up in the cluster.
### ENGINE


@ -243,10 +243,11 @@ under the License.
```
storage_medium: specifies the initial storage medium of this partition; SSD or HDD can be chosen. The default initial storage medium can be specified with `default_storage_medium=xxx` in the FE configuration file `fe.conf`; if it is not specified, HDD is used.
storage_cooldown_time: when the storage medium is SSD, specifies the time at which this partition's data on SSD expires.
Data is kept for 30 days by default
Format: "yyyy-MM-dd HH:mm:ss"
replication_num: specifies the replication number of this partition. The default is 3
Note: when the FE configuration `enable_strict_storage_medium_check` is `true` and the corresponding storage medium is not available in the cluster, the CREATE TABLE statement fails with `Failed to find enough host in all backends with storage medium is SSD|HDD`.
storage_cooldown_time: when the storage medium is SSD, specifies the time at which this partition's data on SSD expires
Data is kept for 30 days by default.
Format: "yyyy-MM-dd HH:mm:ss"
replication_num: specifies the replication number of this partition. The default is 3
For a single-partition table, these properties belong to the table.
For a two-level partitioned table, these properties are attached to each partition.


@ -436,6 +436,11 @@ public class Catalog {
return this.tabletInvertedIndex;
}
// only for test
public void setColocateTableIndex(ColocateTableIndex colocateTableIndex) {
this.colocateTableIndex = colocateTableIndex;
}
public ColocateTableIndex getColocateTableIndex() {
return this.colocateTableIndex;
}
@ -4282,7 +4287,11 @@ public class Catalog {
if (chooseBackendsArbitrary) {
// This is the first colocate table in the group, or just a normal table,
// randomly choose backends
chosenBackendIds = chosenBackendIdBySeq(replicationNum, clusterName);
if (Config.enable_strict_storage_medium_check) {
// Strict mode: only consider backends that still have capacity on the requested storage medium.
chosenBackendIds = chosenBackendIdBySeq(replicationNum, clusterName, tabletMeta.getStorageMedium());
} else {
chosenBackendIds = chosenBackendIdBySeq(replicationNum, clusterName);
}
backendsPerBucketSeq.add(chosenBackendIds);
} else {
// get backends from existing backend sequence
@ -4299,7 +4308,7 @@ public class Catalog {
Preconditions.checkState(chosenBackendIds.size() == replicationNum, chosenBackendIds.size() + " vs. "+ replicationNum);
}
if (backendsPerBucketSeq != null && groupId != null) {
if (groupId != null) {
colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
}
@ -4309,6 +4318,15 @@ public class Catalog {
}
// create replicas for tablet with random chosen backends
private List<Long> chosenBackendIdBySeq(int replicationNum, String clusterName, TStorageMedium storageMedium) throws DdlException {
List<Long> chosenBackendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIdsByStorageMedium(replicationNum,
true, true, clusterName, storageMedium);
if (chosenBackendIds == null) {
throw new DdlException("Failed to find enough host with storage medium is " + storageMedium + " in all backends. need: " + replicationNum);
}
return chosenBackendIds;
}
private List<Long> chosenBackendIdBySeq(int replicationNum, String clusterName) throws DdlException {
List<Long> chosenBackendIds = Catalog.getCurrentSystemInfo().seqChooseBackendIds(replicationNum, true, true, clusterName);
if (chosenBackendIds == null) {
@ -5003,12 +5021,9 @@ public class Catalog {
}
// check if rollup has same name
if (table.getType() == TableType.OLAP) {
OlapTable olapTable = (OlapTable) table;
for (String idxName: olapTable.getIndexNameToId().keySet()) {
if (idxName.equals(newTableName)) {
throw new DdlException("New name conflicts with rollup index name: " + idxName);
}
for (String idxName : table.getIndexNameToId().keySet()) {
if (idxName.equals(newTableName)) {
throw new DdlException("New name conflicts with rollup index name: " + idxName);
}
}

File diff suppressed because it is too large


@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -303,7 +304,7 @@ public class Backend implements Writable {
}
public boolean hasPathHash() {
return disksRef.get().values().stream().allMatch(v -> v.hasPathHash());
return disksRef.get().values().stream().allMatch(DiskInfo::hasPathHash);
}
public long getTotalCapacityB() {
@ -354,6 +355,36 @@ public class Backend implements Writable {
return maxPct;
}
// Return true if this backend has no online disk of the given storage medium with capacity left.
public boolean diskExceedLimitByStorageMedium(TStorageMedium storageMedium) {
if (getDiskNumByStorageMedium(storageMedium) <= 0) {
return true;
}
ImmutableMap<String, DiskInfo> diskInfos = disksRef.get();
boolean exceedLimit = true;
for (DiskInfo diskInfo : diskInfos.values()) {
if (diskInfo.getState() == DiskState.ONLINE && diskInfo.getStorageMedium() == storageMedium && !diskInfo.exceedLimit(true)) {
exceedLimit = false;
break;
}
}
return exceedLimit;
}
// Return true if none of this backend's online disks has capacity left.
public boolean diskExceedLimit() {
if (getDiskNum() <= 0) {
return true;
}
ImmutableMap<String, DiskInfo> diskInfos = disksRef.get();
boolean exceedLimit = true;
for (DiskInfo diskInfo : diskInfos.values()) {
if (diskInfo.getState() == DiskState.ONLINE && !diskInfo.exceedLimit(true)) {
exceedLimit = false;
break;
}
}
return exceedLimit;
}
public String getPathByPathHash(long pathHash) {
for (DiskInfo diskInfo : disksRef.get().values()) {
if (diskInfo.getPathHash() == pathHash) {
@ -640,6 +671,14 @@ public class Backend implements Writable {
return tabletMaxCompactionScore.get();
}
private long getDiskNumByStorageMedium(TStorageMedium storageMedium) {
return disksRef.get().values().stream().filter(v -> v.getStorageMedium() == storageMedium).count();
}
private int getDiskNum() {
return disksRef.get().size();
}
/**
* Note: This class must be a POJO in order to display in JSON format
* Add additional information in the class to show in `show backends`
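
The two new capacity checks in Backend share the same loop shape; for comparison, the medium-aware variant could also be collapsed into a single stream expression. This is an equivalent sketch against the classes shown above, not what the patch ships:

```java
// Equivalent to the loop above: true means "no online disk of this medium has capacity left".
public boolean diskExceedLimitByStorageMedium(TStorageMedium storageMedium) {
    return disksRef.get().values().stream()
            .noneMatch(disk -> disk.getState() == DiskState.ONLINE
                    && disk.getStorageMedium() == storageMedium
                    && !disk.exceedLimit(true));
}
```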


@ -40,6 +40,7 @@ import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.commons.validator.routines.InetAddressValidator;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -58,6 +59,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
public class SystemInfoService {
private static final Logger LOG = LogManager.getLogger(SystemInfoService.class);
@ -421,6 +423,7 @@ public class SystemInfoService {
* @param shrinkNum
* @return
*/
@Deprecated
public List<Long> calculateDecommissionBackends(String clusterName, int shrinkNum) {
LOG.info("calculate decommission backend in cluster: {}. decommission num: {}", clusterName, shrinkNum);
@ -725,12 +728,24 @@ public class SystemInfoService {
return classMap;
}
// Choose backends that still have available capacity on the given storage medium.
public List<Long> seqChooseBackendIdsByStorageMedium(int backendNum, boolean needAlive, boolean isCreate,
String clusterName, TStorageMedium storageMedium) {
final List<Backend> backends = getClusterBackends(clusterName).stream().filter(v -> !v.diskExceedLimitByStorageMedium(storageMedium)).collect(Collectors.toList());
return seqChooseBackendIds(backendNum, needAlive, isCreate, clusterName, backends);
}
// Choose backends that still have available capacity on any disk.
public List<Long> seqChooseBackendIds(int backendNum, boolean needAlive, boolean isCreate,
String clusterName) {
final List<Backend> backends = getClusterBackends(clusterName).stream().filter(v -> !v.diskExceedLimit()).collect(Collectors.toList());
return seqChooseBackendIds(backendNum, needAlive, isCreate, clusterName, backends);
}
// choose backends by round robin
// return null if not enough backend
// use synchronized to run serially
public synchronized List<Long> seqChooseBackendIds(int backendNum, boolean needAlive, boolean isCreate,
String clusterName) {
long lastBackendId = -1L;
String clusterName, final List<Backend> srcBackends) {
long lastBackendId;
if (clusterName.equals(DEFAULT_CLUSTER)) {
if (isCreate) {
@ -756,8 +771,6 @@ public class SystemInfoService {
}
}
// put backend with same host in same list
final List<Backend> srcBackends = getClusterBackends(clusterName);
// host -> BE list
Map<String, List<Backend>> backendMaps = Maps.newHashMap();
for (Backend backend : srcBackends) {


@ -17,205 +17,94 @@
package org.apache.doris.catalog;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.ColumnDef;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.HashDistributionDesc;
import org.apache.doris.analysis.KeysDesc;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TypeDef;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.cluster.Cluster;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import org.apache.doris.utframe.UtFrameUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.HashMap;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import java.util.UUID;
public class ColocateTableTest {
private TableName dbTableName1;
private TableName dbTableName2;
private TableName dbTableName3;
private String dbName = "default:testDb";
private String groupName1 = "group1";
private String tableName1 = "t1";
private String tableName2 = "t2";
private String tableName3 = "t3";
private String clusterName = "default";
private List<Long> beIds = Lists.newArrayList();
private List<String> columnNames = Lists.newArrayList();
private List<ColumnDef> columnDefs = Lists.newArrayList();
private Map<String, String> properties = new HashMap<String, String>();
private static String runningDir = "fe/mocked/ColocateTableTest" + UUID.randomUUID().toString() + "/";
private Catalog catalog;
private Database db;
private Analyzer analyzer;
@Injectable
private ConnectContext connectContext;
@Injectable
private SystemInfoService systemInfoService;
@Injectable
private PaloAuth paloAuth;
@Injectable
private EditLog editLog;
private static ConnectContext connectContext;
private static String dbName = "testDb";
private static String fullDbName = "default_cluster:" + dbName;
private static String tableName1 = "t1";
private static String tableName2 = "t2";
private static String groupName = "group1";
@Rule
public ExpectedException expectedEx = ExpectedException.none();
@Before
public void setUp() throws Exception {
dbTableName1 = new TableName(dbName, tableName1);
dbTableName2 = new TableName(dbName, tableName2);
dbTableName3 = new TableName(dbName, tableName3);
@BeforeClass
public static void beforeClass() throws Exception {
UtFrameUtils.createMinDorisCluster(runningDir);
connectContext = UtFrameUtils.createDefaultCtx();
beIds.clear();
beIds.add(1L);
beIds.add(2L);
beIds.add(3L);
columnNames.clear();
columnNames.add("key1");
columnNames.add("key2");
columnDefs.clear();
columnDefs.add(new ColumnDef("key1", new TypeDef(ScalarType.createType(PrimitiveType.INT))));
columnDefs.add(new ColumnDef("key2", new TypeDef(ScalarType.createVarchar(10))));
catalog = Deencapsulation.newInstance(Catalog.class);
analyzer = new Analyzer(catalog, connectContext);
new Expectations(analyzer) {
{
analyzer.getClusterName();
result = clusterName;
}
};
dbTableName1.analyze(analyzer);
dbTableName2.analyze(analyzer);
dbTableName3.analyze(analyzer);
Config.disable_colocate_join = false;
new Expectations(catalog) {
{
Catalog.getCurrentCatalog();
result = catalog;
Catalog.getCurrentCatalog();
result = catalog;
Catalog.getCurrentSystemInfo();
result = systemInfoService;
systemInfoService.checkClusterCapacity(anyString);
systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString);
result = beIds;
catalog.getAuth();
result = paloAuth;
paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE);
result = true;
paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.DROP);
result = true; minTimes = 0; maxTimes = 1;
}
};
new Expectations() {
{
Deencapsulation.setField(catalog, "editLog", editLog);
}
};
initDatabase();
db = catalog.getDb(dbName);
new MockUp<AgentBatchTask>() {
@Mock
void run() {
return;
}
};
new MockUp<CountDownLatch>() {
@Mock
boolean await(long timeout, TimeUnit unit) {
return true;
}
};
}
private void initDatabase() throws Exception {
CreateDbStmt dbStmt = new CreateDbStmt(true, dbName);
new Expectations(dbStmt) {
{
dbStmt.getClusterName();
result = clusterName;
}
};
@AfterClass
public static void tearDown() {
File file = new File(runningDir);
file.delete();
}
ConcurrentHashMap<String, Cluster> nameToCluster = new ConcurrentHashMap<>();
nameToCluster.put(clusterName, new Cluster(clusterName, 1));
new Expectations() {
{
Deencapsulation.setField(catalog, "nameToCluster", nameToCluster);
}
};
catalog.createDb(dbStmt);
@Before
public void createDb() throws Exception {
String createDbStmtStr = "create database " + dbName;
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
Catalog.getCurrentCatalog().createDb(createDbStmt);
Catalog.getCurrentCatalog().setColocateTableIndex(new ColocateTableIndex());
}
@After
public void tearDown() throws Exception {
catalog.clear();
public void dropDb() throws Exception {
String dropDbStmtStr = "drop database " + dbName;
DropDbStmt dropDbStmt = (DropDbStmt) UtFrameUtils.parseAndAnalyzeStmt(dropDbStmtStr, connectContext);
Catalog.getCurrentCatalog().dropDb(dropDbStmt);
}
private void createOneTable(int numBucket, Map<String, String> properties) throws Exception {
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, groupName1);
CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName1, columnDefs, "olap",
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
new HashDistributionDesc(numBucket, Lists.newArrayList("key1")), properties, null, "");
stmt.analyze(analyzer);
catalog.createTable(stmt);
private static void createTable(String sql) throws Exception {
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
Catalog.getCurrentCatalog().createTable(createTableStmt);
}
@Test
public void testCreateOneTable() throws Exception {
int numBucket = 1;
createOneTable(numBucket, properties);
createTable("create table " + dbName + "." + tableName1 + " (\n" +
" `k1` int NULL COMMENT \"\",\n" +
" `k2` varchar(10) NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"DUPLICATE KEY(`k1`, `k2`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
"PROPERTIES (\n" +
" \"replication_num\" = \"1\",\n" +
" \"colocate_with\" = \"" + groupName + "\"\n" +
");");
ColocateTableIndex index = Catalog.getCurrentColocateIndex();
Database db = Catalog.getCurrentCatalog().getDb(fullDbName);
long tableId = db.getTable(tableName1).getId();
Assert.assertEquals(1, Deencapsulation.<Multimap<GroupId, Long>>getField(index, "group2Tables").size());
@ -232,35 +121,49 @@ public class ColocateTableTest {
GroupId groupId = index.getGroup(tableId);
List<Long> backendIds = index.getBackendsPerBucketSeq(groupId).get(0);
Assert.assertEquals(beIds, backendIds);
Assert.assertEquals(Collections.singletonList(10001L), backendIds);
String fullGroupName = dbId + "_" + groupName1;
String fullGroupName = dbId + "_" + groupName;
Assert.assertEquals(tableId, index.getTableIdByGroup(fullGroupName));
ColocateGroupSchema groupSchema = index.getGroupSchema(fullGroupName);
Assert.assertNotNull(groupSchema);
Assert.assertEquals(dbId, groupSchema.getGroupId().dbId);
Assert.assertEquals(numBucket, groupSchema.getBucketsNum());
Assert.assertEquals(3, groupSchema.getReplicationNum());
Assert.assertEquals(1, groupSchema.getBucketsNum());
Assert.assertEquals(1, groupSchema.getReplicationNum());
}
@Test
public void testCreateTwoTableWithSameGroup() throws Exception {
int numBucket = 1;
createTable("create table " + dbName + "." + tableName1 + " (\n" +
" `k1` int NULL COMMENT \"\",\n" +
" `k2` varchar(10) NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"DUPLICATE KEY(`k1`, `k2`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
"PROPERTIES (\n" +
" \"replication_num\" = \"1\",\n" +
" \"colocate_with\" = \"" + groupName + "\"\n" +
");");
createOneTable(numBucket, properties);
// create second table
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, groupName1);
CreateTableStmt secondStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap",
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
new HashDistributionDesc(numBucket, Lists.newArrayList("key1")), properties, null, "");
secondStmt.analyze(analyzer);
catalog.createTable(secondStmt);
createTable("create table " + dbName + "." + tableName2 + " (\n" +
" `k1` int NULL COMMENT \"\",\n" +
" `k2` varchar(10) NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"DUPLICATE KEY(`k1`, `k2`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
"PROPERTIES (\n" +
" \"replication_num\" = \"1\",\n" +
" \"colocate_with\" = \"" + groupName + "\"\n" +
");");
ColocateTableIndex index = Catalog.getCurrentColocateIndex();
Database db = Catalog.getCurrentCatalog().getDb(fullDbName);
long firstTblId = db.getTable(tableName1).getId();
long secondTblId = db.getTable(tableName2).getId();
Assert.assertEquals(2, Deencapsulation.<Multimap<GroupId, Long>>getField(index, "group2Tables").size());
Assert.assertEquals(1, index.getAllGroupIds().size());
Assert.assertEquals(2, Deencapsulation.<Map<Long, GroupId>>getField(index, "table2Group").size());
@ -301,74 +204,118 @@ public class ColocateTableTest {
@Test
public void testBucketNum() throws Exception {
int firstBucketNum = 1;
createOneTable(firstBucketNum, properties);
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, groupName1);
int secondBucketNum = 2;
CreateTableStmt secondStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap",
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
new HashDistributionDesc(secondBucketNum, Lists.newArrayList("key1")), properties, null, "");
secondStmt.analyze(analyzer);
createTable("create table " + dbName + "." + tableName1 + " (\n" +
" `k1` int NULL COMMENT \"\",\n" +
" `k2` varchar(10) NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"DUPLICATE KEY(`k1`, `k2`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
"PROPERTIES (\n" +
" \"replication_num\" = \"1\",\n" +
" \"colocate_with\" = \"" + groupName + "\"\n" +
");");
expectedEx.expect(DdlException.class);
expectedEx.expectMessage("Colocate tables must have same bucket num: 1");
createTable("create table " + dbName + "." + tableName2 + " (\n" +
" `k1` int NULL COMMENT \"\",\n" +
" `k2` varchar(10) NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"DUPLICATE KEY(`k1`, `k2`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 2\n" +
"PROPERTIES (\n" +
" \"replication_num\" = \"1\",\n" +
" \"colocate_with\" = \"" + groupName + "\"\n" +
");");
catalog.createTable(secondStmt);
}
@Test
public void testReplicationNum() throws Exception {
int bucketNum = 1;
createOneTable(bucketNum, properties);
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, groupName1);
properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM, "2");
CreateTableStmt secondStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap",
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
new HashDistributionDesc(bucketNum, Lists.newArrayList("key1")), properties, null, "");
secondStmt.analyze(analyzer);
createTable("create table " + dbName + "." + tableName1 + " (\n" +
" `k1` int NULL COMMENT \"\",\n" +
" `k2` varchar(10) NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"DUPLICATE KEY(`k1`, `k2`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
"PROPERTIES (\n" +
" \"replication_num\" = \"1\",\n" +
" \"colocate_with\" = \"" + groupName + "\"\n" +
");");
expectedEx.expect(DdlException.class);
expectedEx.expectMessage("Colocate tables must have same replication num: 3");
catalog.createTable(secondStmt);
expectedEx.expectMessage("Colocate tables must have same replication num: 1");
createTable("create table " + dbName + "." + tableName2 + " (\n" +
" `k1` int NULL COMMENT \"\",\n" +
" `k2` varchar(10) NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"DUPLICATE KEY(`k1`, `k2`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
"PROPERTIES (\n" +
" \"replication_num\" = \"2\",\n" +
" \"colocate_with\" = \"" + groupName + "\"\n" +
");");
}
@Test
public void testDistributionColumnsSize() throws Exception {
int bucketNum = 1;
createOneTable(bucketNum, properties);
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, groupName1);
CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap",
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
new HashDistributionDesc(bucketNum, Lists.newArrayList("key1", "key2")), properties, null, "");
childStmt.analyze(analyzer);
createTable("create table " + dbName + "." + tableName1 + " (\n" +
" `k1` int NULL COMMENT \"\",\n" +
" `k2` varchar(10) NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"DUPLICATE KEY(`k1`, `k2`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
"PROPERTIES (\n" +
" \"replication_num\" = \"1\",\n" +
" \"colocate_with\" = \"" + groupName + "\"\n" +
");");
expectedEx.expect(DdlException.class);
expectedEx.expectMessage("Colocate tables distribution columns size must be same : 1");
catalog.createTable(childStmt);
expectedEx.expectMessage("Colocate tables distribution columns size must be same : 2");
createTable("create table " + dbName + "." + tableName2 + " (\n" +
" `k1` int NULL COMMENT \"\",\n" +
" `k2` varchar(10) NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"DUPLICATE KEY(`k1`, `k2`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`) BUCKETS 1\n" +
"PROPERTIES (\n" +
" \"replication_num\" = \"1\",\n" +
" \"colocate_with\" = \"" + groupName + "\"\n" +
");");
}
@Test
public void testDistributionColumnsType() throws Exception {
int bucketNum = 1;
createOneTable(bucketNum, properties);
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, groupName1);
CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap",
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
new HashDistributionDesc(bucketNum, Lists.newArrayList("key2")), properties, null, "");
childStmt.analyze(analyzer);
createTable("create table " + dbName + "." + tableName1 + " (\n" +
" `k1` int NULL COMMENT \"\",\n" +
" `k2` int NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"DUPLICATE KEY(`k1`, `k2`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
"PROPERTIES (\n" +
" \"replication_num\" = \"1\",\n" +
" \"colocate_with\" = \"" + groupName + "\"\n" +
");");
expectedEx.expect(DdlException.class);
expectedEx.expectMessage(
"Colocate tables distribution columns must have the same data type: key2 should be INT");
catalog.createTable(childStmt);
expectedEx.expectMessage("Colocate tables distribution columns must have the same data type: k2 should be INT");
createTable("create table " + dbName + "." + tableName2 + " (\n" +
" `k1` int NULL COMMENT \"\",\n" +
" `k2` varchar(10) NULL COMMENT \"\"\n" +
") ENGINE=OLAP\n" +
"DUPLICATE KEY(`k1`, `k2`)\n" +
"COMMENT \"OLAP\"\n" +
"DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1\n" +
"PROPERTIES (\n" +
" \"replication_num\" = \"1\",\n" +
" \"colocate_with\" = \"" + groupName + "\"\n" +
");");
}
}


@ -20,6 +20,7 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.qe.ConnectContext;
@ -49,7 +50,7 @@ public class CreateTableTest {
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
Catalog.getCurrentCatalog().createDb(createDbStmt);
}
@AfterClass
public static void tearDown() {
File file = new File(runningDir);
@ -62,7 +63,7 @@ public class CreateTableTest {
}
@Test
public void testNormal() {
public void testNormal() throws DdlException {
ExceptionChecker.expectThrowsNoException(
() -> createTable("create table test.tbl1\n" + "(k1 int, k2 int)\n" + "duplicate key(k1)\n"
+ "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1'); "));
@ -96,6 +97,11 @@ public class CreateTableTest {
+ "partition by range(k2)\n" + "(partition p1 values less than(\"10\"))\n"
+ "distributed by hash(k2) buckets 1\n" + "properties('replication_num' = '1');"));
ConfigBase.setMutableConfig("enable_strict_storage_medium_check", "false");
ExceptionChecker
.expectThrowsNoException(() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) \n"
+ "distributed by hash(key1) buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
OlapTable tbl6 = (OlapTable) db.getTable("tbl6");
Assert.assertTrue(tbl6.getColumn("k1").isKey());
@ -109,7 +115,7 @@ public class CreateTableTest {
}
@Test
public void testAbormal() {
public void testAbormal() throws DdlException {
ExceptionChecker.expectThrowsWithMsg(DdlException.class,
"Floating point type column can not be distribution column",
() -> createTable("create table test.atbl1\n" + "(k1 int, k2 float)\n" + "duplicate key(k1)\n"
@ -147,5 +153,11 @@ public class CreateTableTest {
() -> createTable("create table test.atbl6\n" + "(k1 int, k2 int, k3 int)\n"
+ "duplicate key(k1, k2, k3)\n" + "distributed by hash(k1) buckets 1\n"
+ "properties('replication_num' = '1');"));
ConfigBase.setMutableConfig("enable_strict_storage_medium_check", "true");
ExceptionChecker
.expectThrowsWithMsg(DdlException.class, "Failed to find enough host with storage medium is SSD in all backends. need: 1",
() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) distributed by hash(key1) \n"
+ "buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
}
}


@ -17,10 +17,12 @@
package org.apache.doris.utframe;
import com.google.common.collect.ImmutableMap;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
@ -30,6 +32,8 @@ import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.Planner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.utframe.MockedBackendFactory.DefaultBeThriftServiceImpl;
import org.apache.doris.utframe.MockedBackendFactory.DefaultHeartbeatServiceImpl;
@ -107,12 +111,20 @@ public class AnotherDemoTest {
backend.start();
// add be
List<Pair<String, Integer>> bes = Lists.newArrayList();
bes.add(Pair.create(backend.getHost(), backend.getHeartbeatPort()));
Catalog.getCurrentSystemInfo().addBackends(bes, false, "default_cluster");
// Register a BE that has a disk with free capacity, so the new FE-side disk check can pass.
Backend be = new Backend(10001, backend.getHost(), backend.getHeartbeatPort());
Map<String, DiskInfo> disks = Maps.newHashMap();
DiskInfo diskInfo1 = new DiskInfo("/path1");
diskInfo1.setTotalCapacityB(1000000);
diskInfo1.setAvailableCapacityB(500000);
diskInfo1.setDataUsedCapacityB(480000);
disks.put(diskInfo1.getRootPath(), diskInfo1);
be.setDisks(ImmutableMap.copyOf(disks));
be.setAlive(true);
be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
Catalog.getCurrentSystemInfo().addBackend(be);
// sleep to wait first heartbeat
Thread.sleep(5000);
Thread.sleep(6000);
}
@AfterClass


@ -17,12 +17,14 @@
package org.apache.doris.utframe;
import com.google.common.collect.ImmutableMap;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@ -33,6 +35,7 @@ import org.apache.doris.planner.Planner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TNetworkAddress;
@ -55,6 +58,7 @@ import java.io.StringReader;
import java.net.ServerSocket;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -166,9 +170,17 @@ public class UtFrameUtils {
backend.start();
// add be
List<Pair<String, Integer>> bes = Lists.newArrayList();
bes.add(Pair.create(backend.getHost(), backend.getHeartbeatPort()));
Catalog.getCurrentSystemInfo().addBackends(bes, false, "default_cluster");
// Register a BE that has a disk with free capacity, so the new FE-side disk check can pass.
Backend be = new Backend(10001, backend.getHost(), backend.getHeartbeatPort());
Map<String, DiskInfo> disks = Maps.newHashMap();
DiskInfo diskInfo1 = new DiskInfo("/path1");
diskInfo1.setTotalCapacityB(1000000);
diskInfo1.setAvailableCapacityB(500000);
diskInfo1.setDataUsedCapacityB(480000);
disks.put(diskInfo1.getRootPath(), diskInfo1);
be.setDisks(ImmutableMap.copyOf(disks));
be.setAlive(true);
be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
Catalog.getCurrentSystemInfo().addBackend(be);
// sleep to wait first heartbeat
Thread.sleep(6000);
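
AnotherDemoTest and UtFrameUtils now bootstrap the mocked BE with identical disk wiring. A small shared helper could remove that duplication; this is a sketch built from the setup shown above, not part of the patch:

```java
// Hypothetical helper reusing the disk setup duplicated in the two test bootstraps above.
private static Backend newBackendWithMockedDisk(long beId, String host, int heartbeatPort) {
    Backend be = new Backend(beId, host, heartbeatPort);
    DiskInfo diskInfo = new DiskInfo("/path1");
    diskInfo.setTotalCapacityB(1000000);
    diskInfo.setAvailableCapacityB(500000);
    diskInfo.setDataUsedCapacityB(480000);
    be.setDisks(ImmutableMap.of(diskInfo.getRootPath(), diskInfo));
    be.setAlive(true);
    be.setOwnerClusterName(SystemInfoService.DEFAULT_CLUSTER);
    return be;
}
```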