From 100eeb18cdfcaefc677e18cb72c02ababbb85f99 Mon Sep 17 00:00:00 2001 From: kangkaisen Date: Thu, 31 Jan 2019 19:23:12 +0800 Subject: [PATCH] Add test for colocate table (#587) --- .../org/apache/doris/catalog/Catalog.java | 35 +- .../doris/catalog/ColocateTableIndex.java | 86 +++- .../doris/catalog/ColocateTableUtils.java | 3 +- .../java/org/apache/doris/catalog/Table.java | 2 +- .../org/apache/doris/clone/CloneChecker.java | 6 +- .../doris/http/meta/ColocateMetaService.java | 10 +- .../apache/doris/master/ReportHandler.java | 2 +- .../doris/planner/DistributedPlanner.java | 8 +- .../doris/catalog/ColocateTableTest.java | 476 ++++++++++++++++++ 9 files changed, 585 insertions(+), 43 deletions(-) create mode 100644 fe/src/test/java/org/apache/doris/catalog/ColocateTableTest.java diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 61a2502569..96df9a00de 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -2849,7 +2849,7 @@ public class Catalog { distributionInfo = defaultDistributionInfo; } - if (olapTable.getColocateTable() != null) { + if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { ColocateTableUtils.checkReplicationNum(rangePartitionInfo, singlePartitionDesc.getReplicationNum()); ColocateTableUtils.checkBucketNum(olapTable.getDefaultDistributionInfo(), distributionInfo ); } @@ -3122,7 +3122,7 @@ public class Catalog { if (newReplicationNum == oldReplicationNum) { newReplicationNum = (short) -1; - } else if (olapTable.getColocateTable() != null) { + } else if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_SAME_REPLICAT_NUM, oldReplicationNum); } @@ -3395,7 +3395,9 @@ public class Catalog { ColocateTableUtils.checkDistributionColumnSizeAndType((OlapTable) parentTable, distributionInfo); - getColocateTableIndex().addTableToGroup(db.getId(), tableId, parentTable.getId()); + //for C -> B, B -> A. we need get table A id + long groupId = getColocateTableIndex().getGroup(parentTable.getId()); + getColocateTableIndex().addTableToGroup(db.getId(), tableId, groupId); } else { getColocateTableIndex().addTableToGroup(db.getId(), tableId, tableId); } @@ -3455,7 +3457,7 @@ public class Catalog { short replicationNum = FeConstants.default_replication_num; try { replicationNum = PropertyAnalyzer.analyzeReplicationNum(properties, replicationNum); - if (olapTable.getColocateTable() != null && !olapTable.getColocateTable().equalsIgnoreCase(tableName)) { + if (getColocateTableIndex().isColocateChildTable(olapTable.getId())) { Table parentTable = ColocateTableUtils.getColocateTable(db, olapTable.getColocateTable()); ColocateTableUtils.checkReplicationNum(((OlapTable)parentTable).getPartitionInfo(), replicationNum); } @@ -3485,7 +3487,7 @@ public class Catalog { // and then check if there still has unknown properties PropertyAnalyzer.analyzeDataProperty(stmt.getProperties(), DataProperty.DEFAULT_HDD_DATA_PROPERTY); PropertyAnalyzer.analyzeReplicationNum(properties, FeConstants.default_replication_num); - if (olapTable.getColocateTable() != null && !olapTable.getColocateTable().equalsIgnoreCase(tableName)) { + if (getColocateTableIndex().isColocateChildTable(olapTable.getId())) { Table parentTable = ColocateTableUtils.getColocateTable(db, olapTable.getColocateTable()); ColocateTableUtils.checkReplicationNum((OlapTable)parentTable, partitionInfo); } @@ -3532,8 +3534,9 @@ public class Catalog { } //we have added these index to memory, only need to persist here - if (olapTable.getColocateTable() != null) { - Long groupId = ColocateTableUtils.getColocateTable(db, olapTable.getColocateTable()).getId(); + if (getColocateTableIndex().isColocateTable(tableId)) { + long colocateTableId = ColocateTableUtils.getColocateTable(db, olapTable.getColocateTable()).getId(); + long groupId = getColocateTableIndex().getGroup(colocateTableId); ColocatePersistInfo info; if (getColocateTableIndex().isColocateParentTable(tableId)) { List> backendsPerBucketSeq = getColocateTableIndex().getBackendsPerBucketSeq(groupId); @@ -3541,7 +3544,7 @@ public class Catalog { } else { info = ColocatePersistInfo.CreateForAddTable(tableId, groupId, db.getId(), new ArrayList<>()); } - editLog.logColocateAddTable(info); + Catalog.getInstance().getEditLog().logColocateAddTable(info); } LOG.info("successfully create table[{};{}]", tableName, tableId); @@ -3552,12 +3555,8 @@ public class Catalog { } //only remove from memory, because we have not persist it - if (olapTable.getColocateTable() != null) { + if (getColocateTableIndex().isColocateTable(tableId)) { getColocateTableIndex().removeTable(tableId); - - if (getColocateTableIndex().isColocateParentTable(tableId)) { - getColocateTableIndex().removeBackendsPerBucketSeq(tableId); - } } throw e; @@ -4091,8 +4090,8 @@ public class Catalog { String tableName = stmt.getTableName(); // check database - Database db = this.fullNameToDb.get(dbName); - if (fullNameToDb.get(dbName) == null) { + Database db = getDb(dbName); + if (db == null) { ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName); } @@ -4123,11 +4122,11 @@ public class Catalog { unprotectDropTable(db, table.getId()); DropInfo info = new DropInfo(db.getId(), table.getId(), -1L); - editLog.logDropTable(info); - + Catalog.getInstance().getEditLog().logDropTable(info); + if (Catalog.getCurrentColocateIndex().removeTable(table.getId())) { ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(table.getId()); - editLog.logColocateRemoveTable(colocateInfo); + Catalog.getInstance().getEditLog().logColocateRemoveTable(colocateInfo); } } finally { db.writeUnlock(); diff --git a/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java b/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java index f91dfa477e..65f8473457 100644 --- a/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java +++ b/fe/src/main/java/org/apache/doris/catalog/ColocateTableIndex.java @@ -129,6 +129,10 @@ public class ColocateTableIndex implements Writable { try { if (groupId == tableId) { //for parent table + for(Long id: group2Tables.get(groupId)) { + table2Group.remove(id); + } + group2Tables.removeAll(groupId); group2BackendsPerBucketSeq.remove(groupId); group2DB.remove(groupId); @@ -136,17 +140,9 @@ public class ColocateTableIndex implements Writable { } else { //for child table group2Tables.remove(groupId, tableId); + table2Group.remove(tableId); } - table2Group.remove(tableId); - } finally { - writeUnlock(); - } - } - public void removeBackendsPerBucketSeq(long groupId) { - writeLock(); - try { - group2BackendsPerBucketSeq.remove(groupId); } finally { writeUnlock(); } @@ -166,6 +162,16 @@ public class ColocateTableIndex implements Writable { } + public boolean isColocateChildTable(long tableId) { + readLock(); + try { + return table2Group.containsKey(tableId) && !group2Tables.containsKey(tableId); + } finally { + readUnlock(); + } + + } + public boolean isColocateTable(long tableId) { readLock(); try { @@ -184,6 +190,54 @@ public class ColocateTableIndex implements Writable { } } + public boolean isSameGroup(long table1, long table2) { + readLock(); + try { + if (table2Group.containsKey(table1) && table2Group.containsKey(table2)) { + return table2Group.get(table1).equals(table2Group.get(table2)); + } + return false; + } finally { + readUnlock(); + } + } + + public Multimap getGroup2Tables() { + readLock(); + try { + return group2Tables; + } finally { + readUnlock(); + } + } + + public Map getTable2Group() { + readLock(); + try { + return table2Group; + } finally { + readUnlock(); + } + } + + public Map getGroup2DB() { + readLock(); + try { + return group2DB; + } finally { + readUnlock(); + } + } + + public Map>> getGroup2BackendsPerBucketSeq() { + readLock(); + try { + return group2BackendsPerBucketSeq; + } finally { + readUnlock(); + } + } + public Set getBalancingGroupIds() { return balancingGroups; } @@ -278,6 +332,20 @@ public class ColocateTableIndex implements Writable { removeTable(info.getTableId()); } + // only for test + public void clear() { + writeLock(); + try { + group2Tables.clear(); + table2Group.clear(); + group2DB.clear(); + group2BackendsPerBucketSeq.clear(); + balancingGroups.clear(); + } finally { + writeUnlock(); + } + } + @Override public void write(DataOutput out) throws IOException { int size = group2Tables.asMap().size(); diff --git a/fe/src/main/java/org/apache/doris/catalog/ColocateTableUtils.java b/fe/src/main/java/org/apache/doris/catalog/ColocateTableUtils.java index 2397bd013a..270cb4b5f4 100644 --- a/fe/src/main/java/org/apache/doris/catalog/ColocateTableUtils.java +++ b/fe/src/main/java/org/apache/doris/catalog/ColocateTableUtils.java @@ -92,11 +92,10 @@ public class ColocateTableUtils { } for (int i = 0; i < parentColumnSize; i++) { - String parentColumnName = parentColumns.get(i).getName(); Type parentColumnType = parentColumns.get(i).getType(); if (!parentColumnType.equals(childColumns.get(i).getType())) { ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_SAME_DISTRIBUTED_COLUMNS_TYPE, - parentColumnName, parentColumnType); + childColumns.get(i).getName(), parentColumnType); } } } diff --git a/fe/src/main/java/org/apache/doris/catalog/Table.java b/fe/src/main/java/org/apache/doris/catalog/Table.java index b9995631d5..4c4c93f489 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/src/main/java/org/apache/doris/catalog/Table.java @@ -278,7 +278,7 @@ public class Table extends MetaObject implements Writable { OlapTable olapTable = (OlapTable) this; - if (!Strings.isNullOrEmpty(olapTable.getColocateTable())) { + if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { LOG.info("table {} is a colocate table, skip tablet scheduler.", name); return false; } diff --git a/fe/src/main/java/org/apache/doris/clone/CloneChecker.java b/fe/src/main/java/org/apache/doris/clone/CloneChecker.java index 9489ef08c3..794682a247 100644 --- a/fe/src/main/java/org/apache/doris/clone/CloneChecker.java +++ b/fe/src/main/java/org/apache/doris/clone/CloneChecker.java @@ -137,7 +137,7 @@ public class CloneChecker extends Daemon { return false; } - if (olapTable.getColocateTable() != null) { + if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { LOG.debug("{} is colocate table, ColocateTableBalancer will handle. Skip", olapTable.getName()); return false; } @@ -431,7 +431,7 @@ public class CloneChecker extends Daemon { OlapTable olapTable = (OlapTable) table; - if (olapTable.getColocateTable() != null) { + if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { LOG.debug("{} is colocate table, ColocateTableBalancer will handle. Skip", olapTable.getName()); continue; } @@ -957,7 +957,7 @@ public class CloneChecker extends Daemon { return; } - if (olapTable.getColocateTable() != null) { + if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { LOG.debug("{} is colocate table, ColocateTableBalancer will handle. Skip", olapTable.getName()); return ; } diff --git a/fe/src/main/java/org/apache/doris/http/meta/ColocateMetaService.java b/fe/src/main/java/org/apache/doris/http/meta/ColocateMetaService.java index bfa18c3f01..3219b2a7d4 100644 --- a/fe/src/main/java/org/apache/doris/http/meta/ColocateMetaService.java +++ b/fe/src/main/java/org/apache/doris/http/meta/ColocateMetaService.java @@ -165,11 +165,11 @@ public class ColocateMetaService { long tableId = getTableId(request); LOG.info("will delete table {} from colocate meta", tableId); - Catalog.getCurrentColocateIndex().removeTable(tableId); - ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(tableId); - Catalog.getInstance().getEditLog().logColocateRemoveTable(colocateInfo); - LOG.info("table {} has deleted from colocate meta", tableId); - + if (Catalog.getCurrentColocateIndex().removeTable(tableId)) { + ColocatePersistInfo colocateInfo = ColocatePersistInfo.CreateForRemoveTable(tableId); + Catalog.getInstance().getEditLog().logColocateRemoveTable(colocateInfo); + LOG.info("table {} has deleted from colocate meta", tableId); + } sendResult(request, response); } } diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java index a5e7a68b34..60d92b3e7a 100644 --- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java @@ -809,7 +809,7 @@ public class ReportHandler extends Daemon { } // colocate table will delete Replica in meta when balance - if (olapTable.getColocateTable() != null) { + if (Catalog.getCurrentColocateIndex().isColocateTable(olapTable.getId())) { return; } diff --git a/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java index be5617daa4..d2cd3f1c91 100644 --- a/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -455,17 +455,17 @@ public class DistributedPlanner { OlapTable leftTable = ((OlapScanNode) leftRoot).getOlapTable(); OlapTable rightTable = ((OlapScanNode) rightRoot).getOlapTable(); + ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex(); + //1 the table must be colocate - if (leftTable.getColocateTable() == null - || !leftTable.getColocateTable().equalsIgnoreCase(rightTable.getColocateTable())) { + if (!colocateIndex.isSameGroup(leftTable.getId(), rightTable.getId())) { return false; } //2 the colocate group must be stable - ColocateTableIndex colocateIndex = Catalog.getCurrentColocateIndex(); long groupId = colocateIndex.getGroup(leftTable.getId()); if (colocateIndex.isGroupBalancing(groupId)) { - LOG.warn("colocate group {} is balancing", leftTable.getColocateTable()); + LOG.info("colocate group {} is balancing", leftTable.getColocateTable()); return false; } diff --git a/fe/src/test/java/org/apache/doris/catalog/ColocateTableTest.java b/fe/src/test/java/org/apache/doris/catalog/ColocateTableTest.java new file mode 100644 index 0000000000..d072a9cedb --- /dev/null +++ b/fe/src/test/java/org/apache/doris/catalog/ColocateTableTest.java @@ -0,0 +1,476 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import com.google.common.collect.Lists; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.ColumnDef; +import org.apache.doris.analysis.CreateTableStmt; +import org.apache.doris.analysis.DropTableStmt; +import org.apache.doris.analysis.HashDistributionDesc; +import org.apache.doris.analysis.KeysDesc; +import org.apache.doris.analysis.TableName; +import org.apache.doris.analysis.TypeDef; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.persist.EditLog; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.task.AgentBatchTask; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +public class ColocateTableTest { + private TableName dbTableName1; + private TableName dbTableName2; + private TableName dbTableName3; + private String tableName1 = "t1"; + private String tableName2 = "t2"; + private String tableName3 = "t3"; + private String clusterName = "default"; + private List beIds = Lists.newArrayList(); + private List columnNames = Lists.newArrayList(); + private List columnDefs = Lists.newArrayList(); + private Map properties = new HashMap(); + + private Catalog catalog; + private Database db = new Database(); + private Analyzer analyzer; + + @Injectable + private ConnectContext connectContext; + @Injectable + private SystemInfoService systemInfoService; + @Injectable + private PaloAuth paloAuth; + @Injectable + private EditLog editLog; + + @Rule + public ExpectedException expectedEx = ExpectedException.none(); + + @Before + public void setUp() throws Exception { + String dbName = "testDb"; + dbTableName1 = new TableName(dbName, tableName1); + dbTableName2 = new TableName(dbName, tableName2); + dbTableName3 = new TableName(dbName, tableName3); + + beIds.add(1L); + beIds.add(2L); + beIds.add(3L); + + columnNames.add("key1"); + columnNames.add("key2"); + + columnDefs.add(new ColumnDef("key1", new TypeDef(ScalarType.createType(PrimitiveType.INT)))); + columnDefs.add(new ColumnDef("key2", new TypeDef(ScalarType.createVarchar(10)))); + + catalog = Catalog.getInstance(); + analyzer = new Analyzer(catalog, connectContext); + + new Expectations(analyzer) { + { + analyzer.getClusterName(); + result = clusterName; + } + }; + + dbTableName1.analyze(analyzer); + dbTableName2.analyze(analyzer); + dbTableName3.analyze(analyzer); + + Config.disable_colocate_join = false; + + Catalog.getInstance().getColocateTableIndex().clear(); + + new Expectations(catalog) { + { + catalog.getDb(anyString); + result = db; + catalog.getDb(anyLong); + result = db; + + Catalog.getCurrentSystemInfo(); + result = systemInfoService; + + systemInfoService.checkClusterCapacity(anyString); + systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString); + result = beIds; + + catalog.getAuth(); + result = paloAuth; + paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE); + result = true; + paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.DROP); + result = true; + + catalog.getEditLog(); + result = editLog; + } + }; + + new MockUp() { + @Mock + void run() { + return; + } + }; + + new MockUp() { + @Mock + boolean await(long timeout, TimeUnit unit) { + return true; + } + }; + } + + private void CreateParentTable(int numBecket, Map properties) throws Exception { + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1); + + CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName1, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), null, + new HashDistributionDesc(numBecket, Lists.newArrayList("key1")), properties, null); + stmt.analyze(analyzer); + catalog.createTable(stmt); + } + + @Test + public void testCreateAndDropParentTable() throws Exception { + int numBecket = 1; + + CreateParentTable(numBecket, properties); + + ColocateTableIndex index = Catalog.getCurrentColocateIndex(); + long tableId = db.getTable(tableName1).getId(); + + Assert.assertEquals(1, index.getGroup2DB().size()); + Assert.assertEquals(1, index.getGroup2Tables().size()); + Assert.assertEquals(1, index.getAllGroupIds().size()); + Assert.assertEquals(1, index.getTable2Group().size()); + Assert.assertEquals(1, index.getGroup2BackendsPerBucketSeq().size()); + Assert.assertEquals(0, index.getBalancingGroupIds().size()); + + Assert.assertTrue(index.isColocateTable(tableId)); + Assert.assertTrue(index.isColocateParentTable(tableId)); + Assert.assertFalse(index.isColocateChildTable(tableId)); + + Assert.assertEquals(tableId, index.getGroup(tableId)); + + Long dbId = db.getId(); + Assert.assertEquals(index.getDB(tableId), dbId); + + List backendIds = index.getBackendsPerBucketSeq(tableId).get(0); + Assert.assertEquals(beIds, backendIds); + + DropTableStmt dropTableStmt = new DropTableStmt(false, dbTableName1); + dropTableStmt.analyze(analyzer); + catalog.dropTable(dropTableStmt); + + Assert.assertEquals(0, index.getGroup2DB().size()); + Assert.assertEquals(0, index.getGroup2Tables().size()); + Assert.assertEquals(0, index.getAllGroupIds().size()); + Assert.assertEquals(0, index.getTable2Group().size()); + Assert.assertEquals(0, index.getGroup2BackendsPerBucketSeq().size()); + Assert.assertEquals(0, index.getBalancingGroupIds().size()); + } + + @Test + public void testCreateAndDropParentTableWithOneChild() throws Exception { + int numBecket = 1; + + CreateParentTable(numBecket, properties); + + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1); + CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), null, + new HashDistributionDesc(numBecket, Lists.newArrayList("key1")), properties, null); + childStmt.analyze(analyzer); + catalog.createTable(childStmt); + + ColocateTableIndex index = Catalog.getCurrentColocateIndex(); + long parentId = db.getTable(tableName1).getId(); + long childId = db.getTable(tableName2).getId(); + + Assert.assertEquals(1, index.getGroup2DB().size()); + Assert.assertEquals(2, index.getGroup2Tables().size()); + Assert.assertEquals(1, index.getAllGroupIds().size()); + Assert.assertEquals(2, index.getTable2Group().size()); + Assert.assertEquals(1, index.getGroup2BackendsPerBucketSeq().size()); + Assert.assertEquals(0, index.getBalancingGroupIds().size()); + + Assert.assertTrue(index.isColocateTable(parentId)); + Assert.assertTrue(index.isColocateParentTable(parentId)); + Assert.assertFalse(index.isColocateChildTable(parentId)); + + Assert.assertTrue(index.isColocateTable(childId)); + Assert.assertFalse(index.isColocateParentTable(childId)); + Assert.assertTrue(index.isColocateChildTable(childId)); + + Assert.assertEquals(parentId, index.getGroup(parentId)); + Assert.assertEquals(parentId, index.getGroup(childId)); + + Assert.assertTrue(index.isSameGroup(parentId, childId)); + + DropTableStmt dropTableStmt = new DropTableStmt(false, dbTableName2); + dropTableStmt.analyze(analyzer); + catalog.dropTable(dropTableStmt); + + Assert.assertEquals(1, index.getGroup2DB().size()); + Assert.assertEquals(1, index.getGroup2Tables().size()); + Assert.assertEquals(1, index.getTable2Group().size()); + Assert.assertEquals(1, index.getGroup2BackendsPerBucketSeq().size()); + Assert.assertEquals(0, index.getBalancingGroupIds().size()); + + Assert.assertTrue(index.isColocateTable(parentId)); + Assert.assertTrue(index.isColocateParentTable(parentId)); + Assert.assertFalse(index.isColocateChildTable(parentId)); + + Assert.assertFalse(index.isColocateTable(childId)); + Assert.assertFalse(index.isColocateParentTable(childId)); + Assert.assertFalse(index.isColocateChildTable(childId)); + + Assert.assertFalse(index.isSameGroup(parentId, childId)); + } + + @Test + // C -> B, B -> A + public void testCreateAndDropMultilevelColocateTable() throws Exception { + int numBecket = 1; + + CreateParentTable(numBecket, properties); + + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1); + CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), null, + new HashDistributionDesc(numBecket, Lists.newArrayList("key1")), properties, null); + childStmt.analyze(analyzer); + catalog.createTable(childStmt); + + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName2); + CreateTableStmt grandchildStmt = new CreateTableStmt(false, false, dbTableName3, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), null, + new HashDistributionDesc(numBecket, Lists.newArrayList("key1")), properties, null); + grandchildStmt.analyze(analyzer); + catalog.createTable(grandchildStmt); + + ColocateTableIndex index = Catalog.getCurrentColocateIndex(); + + long parentId = db.getTable(tableName1).getId(); + long childId = db.getTable(tableName2).getId(); + long grandchildId = db.getTable(tableName3).getId(); + + Assert.assertEquals(1, index.getGroup2DB().size()); + Assert.assertEquals(3, index.getGroup2Tables().size()); + Assert.assertEquals(1, index.getAllGroupIds().size()); + Assert.assertEquals(3, index.getTable2Group().size()); + Assert.assertEquals(1, index.getGroup2BackendsPerBucketSeq().size()); + Assert.assertEquals(0, index.getBalancingGroupIds().size()); + + Assert.assertTrue(index.isColocateTable(parentId)); + Assert.assertTrue(index.isColocateParentTable(parentId)); + Assert.assertFalse(index.isColocateChildTable(parentId)); + + Assert.assertTrue(index.isColocateTable(childId)); + Assert.assertFalse(index.isColocateParentTable(childId)); + Assert.assertTrue(index.isColocateChildTable(childId)); + + Assert.assertTrue(index.isColocateTable(grandchildId)); + Assert.assertFalse(index.isColocateParentTable(grandchildId)); + Assert.assertTrue(index.isColocateChildTable(grandchildId)); + + Assert.assertEquals(parentId, index.getGroup(parentId)); + Assert.assertEquals(parentId, index.getGroup(childId)); + Assert.assertEquals(parentId, index.getGroup(grandchildId)); + + Assert.assertTrue(index.isSameGroup(parentId, childId)); + Assert.assertTrue(index.isSameGroup(parentId, grandchildId)); + Assert.assertTrue(index.isSameGroup(childId, grandchildId)); + + DropTableStmt dropTableStmt = new DropTableStmt(false, dbTableName2); + dropTableStmt.analyze(analyzer); + catalog.dropTable(dropTableStmt); + + Assert.assertEquals(1, index.getGroup2DB().size()); + Assert.assertEquals(2, index.getGroup2Tables().size()); + Assert.assertEquals(2, index.getTable2Group().size()); + Assert.assertEquals(1, index.getGroup2BackendsPerBucketSeq().size()); + Assert.assertEquals(0, index.getBalancingGroupIds().size()); + + Assert.assertTrue(index.isColocateTable(parentId)); + Assert.assertTrue(index.isColocateParentTable(parentId)); + Assert.assertFalse(index.isColocateChildTable(parentId)); + + Assert.assertFalse(index.isColocateTable(childId)); + Assert.assertFalse(index.isColocateParentTable(childId)); + Assert.assertFalse(index.isColocateChildTable(childId)); + + Assert.assertTrue(index.isColocateTable(grandchildId)); + Assert.assertFalse(index.isColocateParentTable(grandchildId)); + Assert.assertTrue(index.isColocateChildTable(grandchildId)); + + Assert.assertEquals(parentId, index.getGroup(parentId)); + expectedEx.expect(IllegalStateException.class); + index.getGroup(childId); + Assert.assertEquals(parentId, index.getGroup(grandchildId)); + + Assert.assertFalse(index.isSameGroup(parentId, childId)); + Assert.assertTrue(index.isSameGroup(parentId, grandchildId)); + Assert.assertFalse(index.isSameGroup(childId, grandchildId)); + } + + @Test + public void testBucketNum() throws Exception { + int parentBecketNum = 1; + + CreateParentTable(parentBecketNum, properties); + + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1); + int childBecketNum = 2; + CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), null, + new HashDistributionDesc(childBecketNum, Lists.newArrayList("key1")), properties, null); + childStmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("Colocate tables must have the same bucket num: 1"); + + catalog.createTable(childStmt); + } + + @Test + public void testReplicationNum() throws Exception { + int bucketNum = 1; + + CreateParentTable(bucketNum, properties); + + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1); + properties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM, "2"); + CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), null, + new HashDistributionDesc(bucketNum, Lists.newArrayList("key1")), properties, null); + childStmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("Colocate tables must have the same replication num: 3"); + + catalog.createTable(childStmt); + } + + @Test + public void testDistributionColumnsSize() throws Exception { + int bucketNum = 1; + + CreateParentTable(bucketNum, properties); + + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1); + CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), null, + new HashDistributionDesc(bucketNum, Lists.newArrayList("key1", "key2")), properties, null); + childStmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("Colocate table distribution columns size must be same : 1"); + + catalog.createTable(childStmt); + } + + @Test + public void testDistributionColumnsType() throws Exception { + int bucketNum = 1; + + CreateParentTable(bucketNum, properties); + + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1); + CreateTableStmt childStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), null, + new HashDistributionDesc(bucketNum, Lists.newArrayList("key2")), properties, null); + childStmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage("Colocate table distribution columns must have the same data type: key2 should be INT"); + + catalog.createTable(childStmt); + } + + @Test + public void testParentTableNotExist() throws Exception { + String tableName = "t8"; + Map properties = new HashMap(); + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName); + + int bucketNum = 1; + CreateTableStmt parentStmt = new CreateTableStmt(false, false, dbTableName1, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), null, + new HashDistributionDesc(bucketNum, Lists.newArrayList("key1")), properties, null); + parentStmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage(String.format("Colocate table '%s' no exist", tableName)); + + catalog.createTable(parentStmt); + } + + @Test + public void testParentTableType() throws Exception { + Table mysqlTable = new MysqlTable(); + String mysqlTableName = "mysqlTable"; + + new Expectations(mysqlTable) { + { + mysqlTable.getName(); + result = mysqlTableName; + } + }; + + new Expectations(db) { + { + db.getTable(tableName1); + result = mysqlTable; + } + }; + + Map properties = new HashMap(); + properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, tableName1); + CreateTableStmt parentStmt = new CreateTableStmt(false, false, dbTableName2, columnDefs, "olap", + new KeysDesc(KeysType.AGG_KEYS, columnNames), null, + new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null); + parentStmt.analyze(analyzer); + + expectedEx.expect(DdlException.class); + expectedEx.expectMessage(String.format("Colocate tables '%s' must be OLAP table", mysqlTableName)); + + catalog.createTable(parentStmt); + } +}