Add test for colocate table (#587)
This commit is contained in:
@ -2849,7 +2849,7 @@ public class Catalog {
|
||||
distributionInfo = defaultDistributionInfo;
|
||||
}
|
||||
|
||||
if (olapTable.getColocateTable() != null) {
|
||||
if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
|
||||
ColocateTableUtils.checkReplicationNum(rangePartitionInfo, singlePartitionDesc.getReplicationNum());
|
||||
ColocateTableUtils.checkBucketNum(olapTable.getDefaultDistributionInfo(), distributionInfo );
|
||||
}
|
||||
@ -3122,7 +3122,7 @@ public class Catalog {
|
||||
|
||||
if (newReplicationNum == oldReplicationNum) {
|
||||
newReplicationNum = (short) -1;
|
||||
} else if (olapTable.getColocateTable() != null) {
|
||||
} else if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
|
||||
ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_SAME_REPLICAT_NUM, oldReplicationNum);
|
||||
}
|
||||
|
||||
@ -3395,7 +3395,9 @@ public class Catalog {
|
||||
|
||||
ColocateTableUtils.checkDistributionColumnSizeAndType((OlapTable) parentTable, distributionInfo);
|
||||
|
||||
getColocateTableIndex().addTableToGroup(db.getId(), tableId, parentTable.getId());
|
||||
//for C -> B, B -> A. we need get table A id
|
||||
long groupId = getColocateTableIndex().getGroup(parentTable.getId());
|
||||
getColocateTableIndex().addTableToGroup(db.getId(), tableId, groupId);
|
||||
} else {
|
||||
getColocateTableIndex().addTableToGroup(db.getId(), tableId, tableId);
|
||||
}
|
||||
@ -3455,7 +3457,7 @@ public class Catalog {
|
||||
short replicationNum = FeConstants.default_replication_num;
|
||||
try {
|
||||
replicationNum = PropertyAnalyzer.analyzeReplicationNum(properties, replicationNum);
|
||||
if (olapTable.getColocateTable() != null && !olapTable.getColocateTable().equalsIgnoreCase(tableName)) {
|
||||
if (getColocateTableIndex().isColocateChildTable(olapTable.getId())) {
|
||||
Table parentTable = ColocateTableUtils.getColocateTable(db, olapTable.getColocateTable());
|
||||
ColocateTableUtils.checkReplicationNum(((OlapTable)parentTable).getPartitionInfo(), replicationNum);
|
||||
}
|
||||
@ -3485,7 +3487,7 @@ public class Catalog {
|
||||
// and then check if there still has unknown properties
|
||||
PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(), DataProperty.DEFAULT_HDD_DATA_PROPERTY);
|
||||
PropertyAnalyzer.analyzeReplicationNum(properties, FeConstants.default_replication_num);
|
||||
if (olapTable.getColocateTable() != null && !olapTable.getColocateTable().equalsIgnoreCase(tableName)) {
|
||||
if (getColocateTableIndex().isColocateChildTable(olapTable.getId())) {
|
||||
Table parentTable = ColocateTableUtils.getColocateTable(db, olapTable.getColocateTable());
|
||||
ColocateTableUtils.checkReplicationNum((OlapTable)parentTable, partitionInfo);
|
||||
}
|
||||
@ -3532,8 +3534,9 @@ public class Catalog {
|
||||
}
|
||||
|
||||
//we have added these index to memory, only need to persist here
|
||||
if (olapTable.getColocateTable() != null) {
|
||||
Long groupId = ColocateTableUtils.getColocateTable(db, olapTable.getColocateTable()).getId();
|
||||
if (getColocateTableIndex().isColocateTable(tableId)) {
|
||||
long colocateTableId = ColocateTableUtils.getColocateTable(db, olapTable.getColocateTable()).getId();
|
||||
long groupId = getColocateTableIndex().getGroup(colocateTableId);
|
||||
ColocatePersistInfo info;
|
||||
if (getColocateTableIndex().isColocateParentTable(tableId)) {
|
||||
List<List<Long>> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId);
|
||||
@ -3541,7 +3544,7 @@ public class Catalog {
|
||||
} else {
|
||||
info = ColocatePersistInfo.CreateForAddTable(tableId, groupId, db.getId(), new ArrayList<>());
|
||||
}
|
||||
editLog.logColocateAddTable(info);
|
||||
Catalog.getInstance().getEditLog().logColocateAddTable(info);
|
||||
}
|
||||
|
||||
LOG.info("successfully create table[{};{}]", tableName, tableId);
|
||||
@ -3552,12 +3555,8 @@ public class Catalog {
|
||||
}
|
||||
|
||||
//only remove from memory, because we have not persist it
|
||||
if (olapTable.getColocateTable() != null) {
|
||||
if (getColocateTableIndex().isColocateTable(tableId)) {
|
||||
getColocateTableIndex().removeTable(tableId);
|
||||
|
||||
if (getColocateTableIndex().isColocateParentTable(tableId)) {
|
||||
getColocateTableIndex().removeBackendsPerBucketSeq(tableId);
|
||||
}
|
||||
}
|
||||
|
||||
throw e;
|
||||
@ -4091,8 +4090,8 @@ public class Catalog {
|
||||
String tableName = stmt.getTableName();
|
||||
|
||||
// check database
|
||||
Database db = this.fullNameToDb.get(dbName);
|
||||
if (fullNameToDb.get(dbName) == null) {
|
||||
Database db = getDb(dbName);
|
||||
if (db == null) {
|
||||
ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
|
||||
}
|
||||
|
||||
@ -4123,11 +4122,11 @@ public class Catalog {
|
||||
unprotectDropTable(db, table.getId());
|
||||
|
||||
DropInfo info = new DropInfo(db.getId(), table.getId(), -1L);
|
||||
editLog.logDropTable(info);
|
||||
|
||||
Catalog.getInstance().getEditLog().logDropTable(info);
|
||||
|
||||
if (Catalog.getCurrentColocateIndex().removeTable(table.getId())) {
|
||||
ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(table.getId());
|
||||
editLog.logColocateRemoveTable(colocateInfo);
|
||||
Catalog.getInstance().getEditLog().logColocateRemoveTable(colocateInfo);
|
||||
}
|
||||
} finally {
|
||||
db.writeUnlock();
|
||||
|
||||
@ -129,6 +129,10 @@ public class ColocateTableIndex implements Writable {
|
||||
try {
|
||||
if (groupId == tableId) {
|
||||
//for parent table
|
||||
for(Long id: group2Tables.get(groupId)) {
|
||||
table2Group.remove(id);
|
||||
}
|
||||
|
||||
group2Tables.removeAll(groupId);
|
||||
group2BackendsPerBucketSeq.remove(groupId);
|
||||
group2DB.remove(groupId);
|
||||
@ -136,17 +140,9 @@ public class ColocateTableIndex implements Writable {
|
||||
} else {
|
||||
//for child table
|
||||
group2Tables.remove(groupId, tableId);
|
||||
table2Group.remove(tableId);
|
||||
}
|
||||
table2Group.remove(tableId);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public void removeBackendsPerBucketSeq(long groupId) {
|
||||
writeLock();
|
||||
try {
|
||||
group2BackendsPerBucketSeq.remove(groupId);
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
@ -166,6 +162,16 @@ public class ColocateTableIndex implements Writable {
|
||||
|
||||
}
|
||||
|
||||
public boolean isColocateChildTable(long tableId) {
|
||||
readLock();
|
||||
try {
|
||||
return table2Group.containsKey(tableId) && !group2Tables.containsKey(tableId);
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public boolean isColocateTable(long tableId) {
|
||||
readLock();
|
||||
try {
|
||||
@ -184,6 +190,54 @@ public class ColocateTableIndex implements Writable {
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isSameGroup(long table1, long table2) {
|
||||
readLock();
|
||||
try {
|
||||
if (table2Group.containsKey(table1) && table2Group.containsKey(table2)) {
|
||||
return table2Group.get(table1).equals(table2Group.get(table2));
|
||||
}
|
||||
return false;
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Multimap<Long, Long> getGroup2Tables() {
|
||||
readLock();
|
||||
try {
|
||||
return group2Tables;
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Map<Long, Long> getTable2Group() {
|
||||
readLock();
|
||||
try {
|
||||
return table2Group;
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Map<Long, Long> getGroup2DB() {
|
||||
readLock();
|
||||
try {
|
||||
return group2DB;
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Map<Long, List<List<Long>>> getGroup2BackendsPerBucketSeq() {
|
||||
readLock();
|
||||
try {
|
||||
return group2BackendsPerBucketSeq;
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
public Set<Long> getBalancingGroupIds() {
|
||||
return balancingGroups;
|
||||
}
|
||||
@ -278,6 +332,20 @@ public class ColocateTableIndex implements Writable {
|
||||
removeTable(info.getTableId());
|
||||
}
|
||||
|
||||
// only for test
|
||||
public void clear() {
|
||||
writeLock();
|
||||
try {
|
||||
group2Tables.clear();
|
||||
table2Group.clear();
|
||||
group2DB.clear();
|
||||
group2BackendsPerBucketSeq.clear();
|
||||
balancingGroups.clear();
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
int size = group2Tables.asMap().size();
|
||||
|
||||
@ -92,11 +92,10 @@ public class ColocateTableUtils {
|
||||
}
|
||||
|
||||
for (int i = 0; i < parentColumnSize; i++) {
|
||||
String parentColumnName = parentColumns.get(i).getName();
|
||||
Type parentColumnType = parentColumns.get(i).getType();
|
||||
if (!parentColumnType.equals(childColumns.get(i).getType())) {
|
||||
ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_SAME_DISTRIBUTED_COLUMNS_TYPE,
|
||||
parentColumnName, parentColumnType);
|
||||
childColumns.get(i).getName(), parentColumnType);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -278,7 +278,7 @@ public class Table extends MetaObject implements Writable {
|
||||
|
||||
OlapTable olapTable = (OlapTable) this;
|
||||
|
||||
if (!Strings.isNullOrEmpty(olapTable.getColocateTable())) {
|
||||
if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
|
||||
LOG.info("table {} is a colocate table, skip tablet scheduler.", name);
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -137,7 +137,7 @@ public class CloneChecker extends Daemon {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (olapTable.getColocateTable() != null) {
|
||||
if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
|
||||
LOG.debug("{} is colocate table, ColocateTableBalancer will handle. Skip", olapTable.getName());
|
||||
return false;
|
||||
}
|
||||
@ -431,7 +431,7 @@ public class CloneChecker extends Daemon {
|
||||
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
|
||||
if (olapTable.getColocateTable() != null) {
|
||||
if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
|
||||
LOG.debug("{} is colocate table, ColocateTableBalancer will handle. Skip", olapTable.getName());
|
||||
continue;
|
||||
}
|
||||
@ -957,7 +957,7 @@ public class CloneChecker extends Daemon {
|
||||
return;
|
||||
}
|
||||
|
||||
if (olapTable.getColocateTable() != null) {
|
||||
if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
|
||||
LOG.debug("{} is colocate table, ColocateTableBalancer will handle. Skip", olapTable.getName());
|
||||
return ;
|
||||
}
|
||||
|
||||
@ -165,11 +165,11 @@ public class ColocateMetaService {
|
||||
long tableId = getTableId(request);
|
||||
|
||||
LOG.info("will delete table {} from colocate meta", tableId);
|
||||
Catalog.getCurrentColocateIndex().removeTable(tableId);
|
||||
ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(tableId);
|
||||
Catalog.getInstance().getEditLog().logColocateRemoveTable(colocateInfo);
|
||||
LOG.info("table {} has deleted from colocate meta", tableId);
|
||||
|
||||
if (Catalog.getCurrentColocateIndex().removeTable(tableId)) {
|
||||
ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(tableId);
|
||||
Catalog.getInstance().getEditLog().logColocateRemoveTable(colocateInfo);
|
||||
LOG.info("table {} has deleted from colocate meta", tableId);
|
||||
}
|
||||
sendResult(request, response);
|
||||
}
|
||||
}
|
||||
|
||||
@ -809,7 +809,7 @@ public class ReportHandler extends Daemon {
|
||||
}
|
||||
|
||||
// colocate table will delete Replica in meta when balance
|
||||
if (olapTable.getColocateTable() != null) {
|
||||
if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -455,17 +455,17 @@ public class DistributedPlanner {
|
||||
OlapTable leftTable = ((OlapScanNode) leftRoot).getOlapTable();
|
||||
OlapTable rightTable = ((OlapScanNode) rightRoot).getOlapTable();
|
||||
|
||||
ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
|
||||
|
||||
//1 the table must be colocate
|
||||
if (leftTable.getColocateTable() == null
|
||||
|| !leftTable.getColocateTable().equalsIgnoreCase(rightTable.getColocateTable())) {
|
||||
if (!colocateIndex.isSameGroup(leftTable.getId(), rightTable.getId())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
//2 the colocate group must be stable
|
||||
ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex();
|
||||
long groupId = colocateIndex.getGroup(leftTable.getId());
|
||||
if (colocateIndex.isGroupBalancing(groupId)) {
|
||||
LOG.warn("colocate group {} is balancing", leftTable.getColocateTable());
|
||||
LOG.info("colocate group {} is balancing", leftTable.getColocateTable());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
476
fe/src/test/java/org/apache/doris/catalog/ColocateTableTest.java
Normal file
476
fe/src/test/java/org/apache/doris/catalog/ColocateTableTest.java
Normal file
@ -0,0 +1,476 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import mockit.Expectations;
|
||||
import mockit.Injectable;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import org.apache.doris.analysis.Analyzer;
|
||||
import org.apache.doris.analysis.ColumnDef;
|
||||
import org.apache.doris.analysis.CreateTableStmt;
|
||||
import org.apache.doris.analysis.DropTableStmt;
|
||||
import org.apache.doris.analysis.HashDistributionDesc;
|
||||
import org.apache.doris.analysis.KeysDesc;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.analysis.TypeDef;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.mysql.privilege.PaloAuth;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.persist.EditLog;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
import org.apache.doris.task.AgentBatchTask;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ColocateTableTest {
|
||||
private TableName dbTableName1;
|
||||
private TableName dbTableName2;
|
||||
private TableName dbTableName3;
|
||||
private String tableName1 = "t1";
|
||||
private String tableName2 = "t2";
|
||||
private String tableName3 = "t3";
|
||||
private String clusterName = "default";
|
||||
private List<Long> beIds = Lists.newArrayList();
|
||||
private List<String> columnNames = Lists.newArrayList();
|
||||
private List<ColumnDef> columnDefs = Lists.newArrayList();
|
||||
private Map<String, String> properties = new HashMap<String, String>();
|
||||
|
||||
private Catalog catalog;
|
||||
private Database db = new Database();
|
||||
private Analyzer analyzer;
|
||||
|
||||
@Injectable
|
||||
private ConnectContext connectContext;
|
||||
@Injectable
|
||||
private SystemInfoService systemInfoService;
|
||||
@Injectable
|
||||
private PaloAuth paloAuth;
|
||||
@Injectable
|
||||
private EditLog editLog;
|
||||
|
||||
@Rule
|
||||
public ExpectedException expectedEx = ExpectedException.none();
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
String dbName = "testDb";
|
||||
dbTableName1 = new TableName(dbName, tableName1);
|
||||
dbTableName2 = new TableName(dbName, tableName2);
|
||||
dbTableName3 = new TableName(dbName, tableName3);
|
||||
|
||||
beIds.add(1L);
|
||||
beIds.add(2L);
|
||||
beIds.add(3L);
|
||||
|
||||
columnNames.add("key1");
|
||||
columnNames.add("key2");
|
||||
|
||||
columnDefs.add(new ColumnDef("key1", new TypeDef(ScalarType.createType(PrimitiveType.INT))));
|
||||
columnDefs.add(new ColumnDef("key2", new TypeDef(ScalarType.createVarchar(10))));
|
||||
|
||||
catalog = Catalog.getInstance();
|
||||
analyzer = new Analyzer(catalog, connectContext);
|
||||
|
||||
new Expectations(analyzer) {
|
||||
{
|
||||
analyzer.getClusterName();
|
||||
result = clusterName;
|
||||
}
|
||||
};
|
||||
|
||||
dbTableName1.analyze(analyzer);
|
||||
dbTableName2.analyze(analyzer);
|
||||
dbTableName3.analyze(analyzer);
|
||||
|
||||
Config.disable_colocate_join = false;
|
||||
|
||||
Catalog.getInstance().getColocateTableIndex().clear();
|
||||
|
||||
new Expectations(catalog) {
|
||||
{
|
||||
catalog.getDb(anyString);
|
||||
result = db;
|
||||
catalog.getDb(anyLong);
|
||||
result = db;
|
||||
|
||||
Catalog.getCurrentSystemInfo();
|
||||
result = systemInfoService;
|
||||
|
||||
systemInfoService.checkClusterCapacity(anyString);
|
||||
systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString);
|
||||
result = beIds;
|
||||
|
||||
catalog.getAuth();
|
||||
result = paloAuth;
|
||||
paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE);
|
||||
result = true;
|
||||
paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.DROP);
|
||||
result = true;
|
||||
|
||||
catalog.getEditLog();
|
||||
result = editLog;
|
||||
}
|
||||
};
|
||||
|
||||
new MockUp<AgentBatchTask>() {
|
||||
@Mock
|
||||
void run() {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
new MockUp<CountDownLatch>() {
|
||||
@Mock
|
||||
boolean await(long timeout, TimeUnit unit) {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private void CreateParentTable(int numBecket, Map<String, String> properties) throws Exception {
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1);
|
||||
|
||||
CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName1, columnDefs, "olap",
|
||||
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
|
||||
new HashDistributionDesc(numBecket, Lists.newArrayList("key1")), properties, null);
|
||||
stmt.analyze(analyzer);
|
||||
catalog.createTable(stmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateAndDropParentTable() throws Exception {
|
||||
int numBecket = 1;
|
||||
|
||||
CreateParentTable(numBecket, properties);
|
||||
|
||||
ColocateTableIndex index = Catalog.getCurrentColocateIndex();
|
||||
long tableId = db.getTable(tableName1).getId();
|
||||
|
||||
Assert.assertEquals(1, index.getGroup2DB().size());
|
||||
Assert.assertEquals(1, index.getGroup2Tables().size());
|
||||
Assert.assertEquals(1, index.getAllGroupIds().size());
|
||||
Assert.assertEquals(1, index.getTable2Group().size());
|
||||
Assert.assertEquals(1, index.getGroup2BackendsPerBucketSeq().size());
|
||||
Assert.assertEquals(0, index.getBalancingGroupIds().size());
|
||||
|
||||
Assert.assertTrue(index.isColocateTable(tableId));
|
||||
Assert.assertTrue(index.isColocateParentTable(tableId));
|
||||
Assert.assertFalse(index.isColocateChildTable(tableId));
|
||||
|
||||
Assert.assertEquals(tableId, index.getGroup(tableId));
|
||||
|
||||
Long dbId = db.getId();
|
||||
Assert.assertEquals(index.getDB(tableId), dbId);
|
||||
|
||||
List<Long> backendIds = index.getBackendsPerBucketSeq(tableId).get(0);
|
||||
Assert.assertEquals(beIds, backendIds);
|
||||
|
||||
DropTableStmt dropTableStmt = new DropTableStmt(false, dbTableName1);
|
||||
dropTableStmt.analyze(analyzer);
|
||||
catalog.dropTable(dropTableStmt);
|
||||
|
||||
Assert.assertEquals(0, index.getGroup2DB().size());
|
||||
Assert.assertEquals(0, index.getGroup2Tables().size());
|
||||
Assert.assertEquals(0, index.getAllGroupIds().size());
|
||||
Assert.assertEquals(0, index.getTable2Group().size());
|
||||
Assert.assertEquals(0, index.getGroup2BackendsPerBucketSeq().size());
|
||||
Assert.assertEquals(0, index.getBalancingGroupIds().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateAndDropParentTableWithOneChild() throws Exception {
|
||||
int numBecket = 1;
|
||||
|
||||
CreateParentTable(numBecket, properties);
|
||||
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1);
|
||||
CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap",
|
||||
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
|
||||
new HashDistributionDesc(numBecket, Lists.newArrayList("key1")), properties, null);
|
||||
childStmt.analyze(analyzer);
|
||||
catalog.createTable(childStmt);
|
||||
|
||||
ColocateTableIndex index = Catalog.getCurrentColocateIndex();
|
||||
long parentId = db.getTable(tableName1).getId();
|
||||
long childId = db.getTable(tableName2).getId();
|
||||
|
||||
Assert.assertEquals(1, index.getGroup2DB().size());
|
||||
Assert.assertEquals(2, index.getGroup2Tables().size());
|
||||
Assert.assertEquals(1, index.getAllGroupIds().size());
|
||||
Assert.assertEquals(2, index.getTable2Group().size());
|
||||
Assert.assertEquals(1, index.getGroup2BackendsPerBucketSeq().size());
|
||||
Assert.assertEquals(0, index.getBalancingGroupIds().size());
|
||||
|
||||
Assert.assertTrue(index.isColocateTable(parentId));
|
||||
Assert.assertTrue(index.isColocateParentTable(parentId));
|
||||
Assert.assertFalse(index.isColocateChildTable(parentId));
|
||||
|
||||
Assert.assertTrue(index.isColocateTable(childId));
|
||||
Assert.assertFalse(index.isColocateParentTable(childId));
|
||||
Assert.assertTrue(index.isColocateChildTable(childId));
|
||||
|
||||
Assert.assertEquals(parentId, index.getGroup(parentId));
|
||||
Assert.assertEquals(parentId, index.getGroup(childId));
|
||||
|
||||
Assert.assertTrue(index.isSameGroup(parentId, childId));
|
||||
|
||||
DropTableStmt dropTableStmt = new DropTableStmt(false, dbTableName2);
|
||||
dropTableStmt.analyze(analyzer);
|
||||
catalog.dropTable(dropTableStmt);
|
||||
|
||||
Assert.assertEquals(1, index.getGroup2DB().size());
|
||||
Assert.assertEquals(1, index.getGroup2Tables().size());
|
||||
Assert.assertEquals(1, index.getTable2Group().size());
|
||||
Assert.assertEquals(1, index.getGroup2BackendsPerBucketSeq().size());
|
||||
Assert.assertEquals(0, index.getBalancingGroupIds().size());
|
||||
|
||||
Assert.assertTrue(index.isColocateTable(parentId));
|
||||
Assert.assertTrue(index.isColocateParentTable(parentId));
|
||||
Assert.assertFalse(index.isColocateChildTable(parentId));
|
||||
|
||||
Assert.assertFalse(index.isColocateTable(childId));
|
||||
Assert.assertFalse(index.isColocateParentTable(childId));
|
||||
Assert.assertFalse(index.isColocateChildTable(childId));
|
||||
|
||||
Assert.assertFalse(index.isSameGroup(parentId, childId));
|
||||
}
|
||||
|
||||
@Test
|
||||
// C -> B, B -> A
|
||||
public void testCreateAndDropMultilevelColocateTable() throws Exception {
|
||||
int numBecket = 1;
|
||||
|
||||
CreateParentTable(numBecket, properties);
|
||||
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1);
|
||||
CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap",
|
||||
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
|
||||
new HashDistributionDesc(numBecket, Lists.newArrayList("key1")), properties, null);
|
||||
childStmt.analyze(analyzer);
|
||||
catalog.createTable(childStmt);
|
||||
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName2);
|
||||
CreateTableStmt grandchildStmt = new CreateTableStmt(false, false, dbTableName3, columnDefs, "olap",
|
||||
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
|
||||
new HashDistributionDesc(numBecket, Lists.newArrayList("key1")), properties, null);
|
||||
grandchildStmt.analyze(analyzer);
|
||||
catalog.createTable(grandchildStmt);
|
||||
|
||||
ColocateTableIndex index = Catalog.getCurrentColocateIndex();
|
||||
|
||||
long parentId = db.getTable(tableName1).getId();
|
||||
long childId = db.getTable(tableName2).getId();
|
||||
long grandchildId = db.getTable(tableName3).getId();
|
||||
|
||||
Assert.assertEquals(1, index.getGroup2DB().size());
|
||||
Assert.assertEquals(3, index.getGroup2Tables().size());
|
||||
Assert.assertEquals(1, index.getAllGroupIds().size());
|
||||
Assert.assertEquals(3, index.getTable2Group().size());
|
||||
Assert.assertEquals(1, index.getGroup2BackendsPerBucketSeq().size());
|
||||
Assert.assertEquals(0, index.getBalancingGroupIds().size());
|
||||
|
||||
Assert.assertTrue(index.isColocateTable(parentId));
|
||||
Assert.assertTrue(index.isColocateParentTable(parentId));
|
||||
Assert.assertFalse(index.isColocateChildTable(parentId));
|
||||
|
||||
Assert.assertTrue(index.isColocateTable(childId));
|
||||
Assert.assertFalse(index.isColocateParentTable(childId));
|
||||
Assert.assertTrue(index.isColocateChildTable(childId));
|
||||
|
||||
Assert.assertTrue(index.isColocateTable(grandchildId));
|
||||
Assert.assertFalse(index.isColocateParentTable(grandchildId));
|
||||
Assert.assertTrue(index.isColocateChildTable(grandchildId));
|
||||
|
||||
Assert.assertEquals(parentId, index.getGroup(parentId));
|
||||
Assert.assertEquals(parentId, index.getGroup(childId));
|
||||
Assert.assertEquals(parentId, index.getGroup(grandchildId));
|
||||
|
||||
Assert.assertTrue(index.isSameGroup(parentId, childId));
|
||||
Assert.assertTrue(index.isSameGroup(parentId, grandchildId));
|
||||
Assert.assertTrue(index.isSameGroup(childId, grandchildId));
|
||||
|
||||
DropTableStmt dropTableStmt = new DropTableStmt(false, dbTableName2);
|
||||
dropTableStmt.analyze(analyzer);
|
||||
catalog.dropTable(dropTableStmt);
|
||||
|
||||
Assert.assertEquals(1, index.getGroup2DB().size());
|
||||
Assert.assertEquals(2, index.getGroup2Tables().size());
|
||||
Assert.assertEquals(2, index.getTable2Group().size());
|
||||
Assert.assertEquals(1, index.getGroup2BackendsPerBucketSeq().size());
|
||||
Assert.assertEquals(0, index.getBalancingGroupIds().size());
|
||||
|
||||
Assert.assertTrue(index.isColocateTable(parentId));
|
||||
Assert.assertTrue(index.isColocateParentTable(parentId));
|
||||
Assert.assertFalse(index.isColocateChildTable(parentId));
|
||||
|
||||
Assert.assertFalse(index.isColocateTable(childId));
|
||||
Assert.assertFalse(index.isColocateParentTable(childId));
|
||||
Assert.assertFalse(index.isColocateChildTable(childId));
|
||||
|
||||
Assert.assertTrue(index.isColocateTable(grandchildId));
|
||||
Assert.assertFalse(index.isColocateParentTable(grandchildId));
|
||||
Assert.assertTrue(index.isColocateChildTable(grandchildId));
|
||||
|
||||
Assert.assertEquals(parentId, index.getGroup(parentId));
|
||||
expectedEx.expect(IllegalStateException.class);
|
||||
index.getGroup(childId);
|
||||
Assert.assertEquals(parentId, index.getGroup(grandchildId));
|
||||
|
||||
Assert.assertFalse(index.isSameGroup(parentId, childId));
|
||||
Assert.assertTrue(index.isSameGroup(parentId, grandchildId));
|
||||
Assert.assertFalse(index.isSameGroup(childId, grandchildId));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBucketNum() throws Exception {
|
||||
int parentBecketNum = 1;
|
||||
|
||||
CreateParentTable(parentBecketNum, properties);
|
||||
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1);
|
||||
int childBecketNum = 2;
|
||||
CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap",
|
||||
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
|
||||
new HashDistributionDesc(childBecketNum, Lists.newArrayList("key1")), properties, null);
|
||||
childStmt.analyze(analyzer);
|
||||
|
||||
expectedEx.expect(DdlException.class);
|
||||
expectedEx.expectMessage("Colocate tables must have the same bucket num: 1");
|
||||
|
||||
catalog.createTable(childStmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReplicationNum() throws Exception {
|
||||
int bucketNum = 1;
|
||||
|
||||
CreateParentTable(bucketNum, properties);
|
||||
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1);
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM, "2");
|
||||
CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap",
|
||||
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
|
||||
new HashDistributionDesc(bucketNum, Lists.newArrayList("key1")), properties, null);
|
||||
childStmt.analyze(analyzer);
|
||||
|
||||
expectedEx.expect(DdlException.class);
|
||||
expectedEx.expectMessage("Colocate tables must have the same replication num: 3");
|
||||
|
||||
catalog.createTable(childStmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDistributionColumnsSize() throws Exception {
|
||||
int bucketNum = 1;
|
||||
|
||||
CreateParentTable(bucketNum, properties);
|
||||
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1);
|
||||
CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap",
|
||||
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
|
||||
new HashDistributionDesc(bucketNum, Lists.newArrayList("key1", "key2")), properties, null);
|
||||
childStmt.analyze(analyzer);
|
||||
|
||||
expectedEx.expect(DdlException.class);
|
||||
expectedEx.expectMessage("Colocate table distribution columns size must be same : 1");
|
||||
|
||||
catalog.createTable(childStmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDistributionColumnsType() throws Exception {
|
||||
int bucketNum = 1;
|
||||
|
||||
CreateParentTable(bucketNum, properties);
|
||||
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1);
|
||||
CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap",
|
||||
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
|
||||
new HashDistributionDesc(bucketNum, Lists.newArrayList("key2")), properties, null);
|
||||
childStmt.analyze(analyzer);
|
||||
|
||||
expectedEx.expect(DdlException.class);
|
||||
expectedEx.expectMessage("Colocate table distribution columns must have the same data type: key2 should be INT");
|
||||
|
||||
catalog.createTable(childStmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParentTableNotExist() throws Exception {
|
||||
String tableName = "t8";
|
||||
Map<String, String> properties = new HashMap<String, String>();
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName);
|
||||
|
||||
int bucketNum = 1;
|
||||
CreateTableStmt parentStmt = new CreateTableStmt(false, false, dbTableName1, columnDefs, "olap",
|
||||
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
|
||||
new HashDistributionDesc(bucketNum, Lists.newArrayList("key1")), properties, null);
|
||||
parentStmt.analyze(analyzer);
|
||||
|
||||
expectedEx.expect(DdlException.class);
|
||||
expectedEx.expectMessage(String.format("Colocate table '%s' no exist", tableName));
|
||||
|
||||
catalog.createTable(parentStmt);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParentTableType() throws Exception {
|
||||
Table mysqlTable = new MysqlTable();
|
||||
String mysqlTableName = "mysqlTable";
|
||||
|
||||
new Expectations(mysqlTable) {
|
||||
{
|
||||
mysqlTable.getName();
|
||||
result = mysqlTableName;
|
||||
}
|
||||
};
|
||||
|
||||
new Expectations(db) {
|
||||
{
|
||||
db.getTable(tableName1);
|
||||
result = mysqlTable;
|
||||
}
|
||||
};
|
||||
|
||||
Map<String, String> properties = new HashMap<String, String>();
|
||||
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1);
|
||||
CreateTableStmt parentStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap",
|
||||
new KeysDesc(KeysType.AGG_KEYS, columnNames), null,
|
||||
new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null);
|
||||
parentStmt.analyze(analyzer);
|
||||
|
||||
expectedEx.expect(DdlException.class);
|
||||
expectedEx.expectMessage(String.format("Colocate tables '%s' must be OLAP table", mysqlTableName));
|
||||
|
||||
catalog.createTable(parentStmt);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user