[feature](colocate) support cross database colocate join (#18152)

This commit is contained in:
Mingyu Chen
2023-04-03 14:03:42 +08:00
committed by GitHub
parent e260dca7a1
commit ecd3fd07f6
18 changed files with 476 additions and 127 deletions

View File

@ -90,6 +90,11 @@ public class ColocateGroupSchema implements Writable {
// distribution col type
for (int i = 0; i < distributionColTypes.size(); i++) {
Type targetColType = distributionColTypes.get(i);
// varchar and string have the same distribution hash value if their data is the same
if (targetColType.isVarcharOrStringType() && info.getDistributionColumns().get(i).getType()
.isVarcharOrStringType()) {
continue;
}
if (!targetColType.equals(info.getDistributionColumns().get(i).getType())) {
ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_DISTRIBUTION_COLUMN_TYPE,
info.getDistributionColumns().get(i).getName(), targetColType);
@ -98,7 +103,7 @@ public class ColocateGroupSchema implements Writable {
}
}
public void checkReplicaAllocation(PartitionInfo partitionInfo) throws DdlException {
private void checkReplicaAllocation(PartitionInfo partitionInfo) throws DdlException {
for (ReplicaAllocation replicaAlloc : partitionInfo.idToReplicaAllocation.values()) {
if (!replicaAlloc.equals(this.replicaAlloc)) {
ErrorReport.reportDdlException(ErrorCode.ERR_COLOCATE_TABLE_MUST_HAS_SAME_REPLICATION_ALLOCATION,

View File

@ -22,6 +22,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.ColocatePersistInfo;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
@ -52,16 +53,23 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
* maintain the colocate table related indexes and meta
* maintain the colocation table related indexes and meta
*/
public class ColocateTableIndex implements Writable {
private static final Logger LOG = LogManager.getLogger(ColocateTableIndex.class);
public static class GroupId implements Writable {
public static class GroupId implements Writable, GsonPostProcessable {
public static final String GLOBAL_COLOCATE_PREFIX = "__global__";
@SerializedName(value = "dbId")
public Long dbId;
@SerializedName(value = "grpId")
public Long grpId;
// only available when dbId = 0
// because for global colocate table, the dbId is 0, so we do not know which db the table belongs to,
// so we use tblId2DbId to record the dbId of each table
@SerializedName(value = "tblId2DbId")
private Map<Long, Long> tblId2DbId = Maps.newHashMap();
private GroupId() {
}
@ -71,6 +79,23 @@ public class ColocateTableIndex implements Writable {
this.grpId = grpId;
}
/**
 * Records which database a table of this global colocate group belongs to.
 *
 * <p>Only valid on a global group id, i.e. one whose {@code dbId} is 0. Such a group id
 * carries no database of its own, so the real database id of each table is kept here.
 *
 * @param tblId id of the table being added to the group
 * @param dbId id of the database that owns the table
 * @throws IllegalStateException if this group id is not a global one ({@code dbId != 0})
 */
public void addTblId2DbId(long tblId, long dbId) {
    // Attach a message so a violated precondition can be diagnosed from the exception alone.
    Preconditions.checkState(this.dbId == 0,
            "tblId2DbId is only maintained for global colocate groups (dbId = 0), but dbId is %s", this.dbId);
    tblId2DbId.put(tblId, dbId);
}
/**
 * Drops the table-to-database mapping recorded for the given table.
 * Does nothing if no mapping exists for it.
 *
 * @param tblId id of the table whose mapping should be removed
 */
public void removeTblId2DbId(long tblId) {
tblId2DbId.remove(tblId);
}
/**
 * Looks up the database that owns the given table of this global colocate group.
 *
 * @param tblId id of a table in this group
 * @return the owning database id, or -1 if no mapping is recorded for the table
 */
public long getDbIdByTblId(long tblId) {
    // Map.get returns null for an unknown table; unboxing that directly would throw a bare
    // NullPointerException. Return -1 instead so callers fail through their normal
    // "database not found" path.
    Long dbId = tblId2DbId.get(tblId);
    return dbId == null ? -1L : dbId;
}
/**
 * Returns the number of table-to-database mappings currently recorded in this group id.
 */
public int getTblId2DbIdSize() {
return tblId2DbId.size();
}
public static GroupId read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) {
GroupId groupId = new GroupId();
@ -102,6 +127,13 @@ public class ColocateTableIndex implements Writable {
return dbId.equals(other.dbId) && grpId.equals(other.grpId);
}
/**
 * Post-deserialization hook: guarantees {@code tblId2DbId} is never left null,
 * replacing a null value with an empty map.
 */
@Override
public void gsonPostProcess() throws IOException {
    if (tblId2DbId != null) {
        // Already populated (or initialized empty); nothing to repair.
        return;
    }
    tblId2DbId = Maps.newHashMap();
}
@Override
public int hashCode() {
int result = 17;
@ -114,6 +146,18 @@ public class ColocateTableIndex implements Writable {
public String toString() {
return dbId + "." + grpId;
}
/**
 * Builds the key under which a colocate group is registered.
 *
 * <p>A global group name (one starting with {@code GLOBAL_COLOCATE_PREFIX}) is returned
 * unchanged; any other name is prefixed with the database id as {@code <dbId>_<name>}.
 *
 * @param dbId id of the database the table belongs to
 * @param colocateGroup group name as given by the user; must not be null
 * @return the full group name
 */
public static String getFullGroupName(long dbId, String colocateGroup) {
    return isGlobalGroupName(colocateGroup) ? colocateGroup : dbId + "_" + colocateGroup;
}
/**
 * Tells whether a group name denotes a global colocate group, i.e. whether it
 * starts with {@code GLOBAL_COLOCATE_PREFIX}.
 *
 * @param groupName group name to test; must not be null
 * @return true if the name carries the global prefix
 */
public static boolean isGlobalGroupName(String groupName) {
return groupName.startsWith(GLOBAL_COLOCATE_PREFIX);
}
}
// group_name -> group_id
@ -155,11 +199,10 @@ public class ColocateTableIndex implements Writable {
// NOTICE: call 'addTableToGroup()' will not modify 'group2BackendsPerBucketSeq'
// 'group2BackendsPerBucketSeq' need to be set manually before or after, if necessary.
public GroupId addTableToGroup(long dbId, OlapTable tbl, String groupName, GroupId assignedGroupId) {
public GroupId addTableToGroup(long dbId, OlapTable tbl, String fullGroupName, GroupId assignedGroupId) {
writeLock();
try {
GroupId groupId = null;
String fullGroupName = dbId + "_" + groupName;
if (groupName2Id.containsKey(fullGroupName)) {
groupId = groupName2Id.get(fullGroupName);
} else {
@ -168,7 +211,11 @@ public class ColocateTableIndex implements Writable {
groupId = assignedGroupId;
} else {
// generate a new one
groupId = new GroupId(dbId, Env.getCurrentEnv().getNextId());
if (GroupId.isGlobalGroupName(fullGroupName)) {
groupId = new GroupId(0, Env.getCurrentEnv().getNextId());
} else {
groupId = new GroupId(dbId, Env.getCurrentEnv().getNextId());
}
}
HashDistributionInfo distributionInfo = (HashDistributionInfo) tbl.getDefaultDistributionInfo();
ColocateGroupSchema groupSchema = new ColocateGroupSchema(groupId,
@ -178,6 +225,10 @@ public class ColocateTableIndex implements Writable {
group2Schema.put(groupId, groupSchema);
group2ErrMsgs.put(groupId, "");
}
// for global colocate table, dbId is 0, and we need to save the real dbId of the table
if (groupId.dbId == 0) {
groupId.addTblId2DbId(tbl.getId(), dbId);
}
group2Tables.put(groupId, tbl.getId());
table2Group.put(tbl.getId(), groupId);
return groupId;
@ -252,6 +303,7 @@ public class ColocateTableIndex implements Writable {
}
GroupId groupId = table2Group.remove(tableId);
groupId.removeTblId2DbId(tableId);
group2Tables.remove(groupId, tableId);
if (!group2Tables.containsKey(groupId)) {
// all tables of this group are removed, remove the group
@ -514,14 +566,19 @@ public class ColocateTableIndex implements Writable {
// remove from old group
removeTable(tbl.getId());
}
return addTableToGroup(dbId, tbl, newGroup, assignedGroupId);
String fullNewGroupName = GroupId.getFullGroupName(dbId, newGroup);
return addTableToGroup(dbId, tbl, fullNewGroupName, assignedGroupId);
} finally {
writeUnlock();
}
}
public void replayAddTableToGroup(ColocatePersistInfo info) throws MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(info.getGroupId().dbId);
long dbId = info.getGroupId().dbId;
if (dbId == 0) {
dbId = info.getGroupId().getDbIdByTblId(info.getTableId());
}
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(dbId);
OlapTable tbl = (OlapTable) db.getTableOrMetaException(info.getTableId(),
org.apache.doris.catalog.Table.TableType.OLAP);
writeLock();
@ -530,7 +587,8 @@ public class ColocateTableIndex implements Writable {
for (Map.Entry<Tag, List<List<Long>>> entry : map.entrySet()) {
group2BackendsPerBucketSeq.put(info.getGroupId(), entry.getKey(), entry.getValue());
}
addTableToGroup(info.getGroupId().dbId, tbl, tbl.getColocateGroup(), info.getGroupId());
String fullGroupName = GroupId.getFullGroupName(dbId, tbl.getColocateGroup());
addTableToGroup(dbId, tbl, fullGroupName, info.getGroupId());
} finally {
writeUnlock();
}

View File

@ -3885,23 +3885,23 @@ public class Env {
}
// the invoker should keep table's write lock
public void modifyTableColocate(Database db, OlapTable table, String colocateGroup, boolean isReplay,
public void modifyTableColocate(Database db, OlapTable table, String assignedGroup, boolean isReplay,
GroupId assignedGroupId)
throws DdlException {
String oldGroup = table.getColocateGroup();
GroupId groupId = null;
if (!Strings.isNullOrEmpty(colocateGroup)) {
String fullGroupName = db.getId() + "_" + colocateGroup;
if (!Strings.isNullOrEmpty(assignedGroup)) {
String fullAssignedGroupName = GroupId.getFullGroupName(db.getId(), assignedGroup);
// When the new name is the same as the old name, return early to prevent an NPE
if (!Strings.isNullOrEmpty(oldGroup)) {
String oldFullGroupName = db.getId() + "_" + oldGroup;
if (oldFullGroupName.equals(fullGroupName)) {
String oldFullGroupName = GroupId.getFullGroupName(db.getId(), oldGroup);
if (oldFullGroupName.equals(fullAssignedGroupName)) {
LOG.warn("modify table[{}] group name same as old group name,skip.", table.getName());
return;
}
}
ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullGroupName);
ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullAssignedGroupName);
if (groupSchema == null) {
// user set a new colocate group,
// check if all partitions of this table have the same bucket num and the same replication number
@ -3938,7 +3938,7 @@ public class Env {
backendsPerBucketSeq = table.getArbitraryTabletBucketsSeq();
}
// change group after getting backends sequence(if has), in case 'getArbitraryTabletBucketsSeq' failed
groupId = colocateTableIndex.changeGroup(db.getId(), table, oldGroup, colocateGroup, assignedGroupId);
groupId = colocateTableIndex.changeGroup(db.getId(), table, oldGroup, assignedGroup, assignedGroupId);
if (groupSchema == null) {
Preconditions.checkNotNull(backendsPerBucketSeq);
@ -3948,7 +3948,7 @@ public class Env {
// set this group as unstable
colocateTableIndex.markGroupUnstable(groupId, "Colocation group modified by user",
false /* edit log is along with modify table log */);
table.setColocateGroup(colocateGroup);
table.setColocateGroup(assignedGroup);
} else {
// unset colocation group
if (Strings.isNullOrEmpty(oldGroup)) {
@ -3957,17 +3957,16 @@ public class Env {
}
// when replayModifyTableColocate, we need the groupId info
String fullGroupName = db.getId() + "_" + oldGroup;
String fullGroupName = GroupId.getFullGroupName(db.getId(), oldGroup);
groupId = colocateTableIndex.getGroupSchema(fullGroupName).getGroupId();
colocateTableIndex.removeTable(table.getId());
table.setColocateGroup(null);
}
if (!isReplay) {
Map<String, String> properties = Maps.newHashMapWithExpectedSize(1);
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, colocateGroup);
TablePropertyInfo info = new TablePropertyInfo(table.getId(), groupId, properties);
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, assignedGroup);
TablePropertyInfo info = new TablePropertyInfo(db.getId(), table.getId(), groupId, properties);
editLog.logModifyTableColocate(info);
}
LOG.info("finished modify table's colocation property. table: {}, is replay: {}", table.getName(), isReplay);
@ -3975,6 +3974,10 @@ public class Env {
public void replayModifyTableColocate(TablePropertyInfo info) throws MetaNotFoundException {
long dbId = info.getGroupId().dbId;
if (dbId == 0) {
dbId = info.getDbId();
}
Preconditions.checkState(dbId != 0, "replay modify table colocate failed, table id: " + info.getTableId());
long tableId = info.getTableId();
Map<String, String> properties = info.getPropertyMap();

View File

@ -146,11 +146,6 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
// get all groups
Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
for (GroupId groupId : groupIds) {
Database db = env.getInternalCatalog().getDbNullable(groupId.dbId);
if (db == null) {
continue;
}
Table<String, Tag, ClusterLoadStatistic> statisticMap = env.getTabletScheduler().getStatisticMap();
if (statisticMap == null) {
continue;
@ -159,7 +154,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
ColocateGroupSchema groupSchema = colocateIndex.getGroupSchema(groupId);
ReplicaAllocation replicaAlloc = groupSchema.getReplicaAlloc();
try {
Env.getCurrentSystemInfo().checkReplicaAllocation(db.getClusterName(), replicaAlloc);
Env.getCurrentSystemInfo().checkReplicaAllocation(SystemInfoService.DEFAULT_CLUSTER, replicaAlloc);
} catch (DdlException e) {
colocateIndex.setErrMsgForGroup(groupId, e.getMessage());
continue;
@ -168,7 +163,7 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
for (Map.Entry<Tag, Short> entry : allocMap.entrySet()) {
Tag tag = entry.getKey();
ClusterLoadStatistic statistic = statisticMap.get(db.getClusterName(), tag);
ClusterLoadStatistic statistic = statisticMap.get(SystemInfoService.DEFAULT_CLUSTER, tag);
if (statistic == null) {
continue;
}
@ -182,7 +177,8 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
infoService, colocateIndex, groupId, tag);
// get all available backends for this group
Set<Long> beIdsInOtherTag = colocateIndex.getBackendIdsExceptForTag(groupId, tag);
List<Long> availableBeIds = getAvailableBeIds(db.getClusterName(), tag, beIdsInOtherTag, infoService);
List<Long> availableBeIds = getAvailableBeIds(SystemInfoService.DEFAULT_CLUSTER, tag, beIdsInOtherTag,
infoService);
// try relocate or balance this group for specified tag
List<List<Long>> balancedBackendsPerBucketSeq = Lists.newArrayList();
if (relocateAndBalance(groupId, tag, unavailableBeIdsInGroup, availableBeIds, colocateIndex,
@ -214,11 +210,6 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
Set<GroupId> groupIds = colocateIndex.getAllGroupIds();
for (GroupId groupId : groupIds) {
List<Long> tableIds = colocateIndex.getAllTableIds(groupId);
Database db = env.getInternalCatalog().getDbNullable(groupId.dbId);
if (db == null) {
continue;
}
List<Set<Long>> backendBucketsSeq = colocateIndex.getBackendsPerBucketSeqSet(groupId);
if (backendBucketsSeq.isEmpty()) {
continue;
@ -227,6 +218,14 @@ public class ColocateTableCheckerAndBalancer extends MasterDaemon {
String unstableReason = null;
OUT:
for (Long tableId : tableIds) {
long dbId = groupId.dbId;
if (dbId == 0) {
dbId = groupId.getDbIdByTblId(tableId);
}
Database db = env.getInternalCatalog().getDbNullable(dbId);
if (db == null) {
continue;
}
OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
if (olapTable == null || !colocateIndex.isColocateTable(olapTable.getId())) {
continue;

View File

@ -435,6 +435,7 @@ public class PropertyAnalyzer {
return bfFpp;
}
// analyze the colocation properties of table
public static String analyzeColocate(Map<String, String> properties) throws AnalysisException {
String colocateGroup = null;
if (properties != null && properties.containsKey(PROPERTIES_COLOCATE_WITH)) {

View File

@ -1443,7 +1443,7 @@ public class InternalCatalog implements CatalogIf<Database> {
// check colocation
if (Env.getCurrentColocateIndex().isColocateTable(olapTable.getId())) {
String fullGroupName = db.getId() + "_" + olapTable.getColocateGroup();
String fullGroupName = GroupId.getFullGroupName(db.getId(), olapTable.getColocateGroup());
ColocateGroupSchema groupSchema = Env.getCurrentColocateIndex().getGroupSchema(fullGroupName);
Preconditions.checkNotNull(groupSchema);
groupSchema.checkDistribution(distributionInfo);
@ -2066,7 +2066,7 @@ public class InternalCatalog implements CatalogIf<Database> {
if (defaultDistributionInfo.getType() == DistributionInfoType.RANDOM) {
throw new AnalysisException("Random distribution for colocate table is unsupported");
}
String fullGroupName = db.getId() + "_" + colocateGroup;
String fullGroupName = GroupId.getFullGroupName(db.getId(), colocateGroup);
ColocateGroupSchema groupSchema = Env.getCurrentColocateIndex().getGroupSchema(fullGroupName);
if (groupSchema != null) {
// group already exist, check if this table can be added to this group
@ -2075,7 +2075,7 @@ public class InternalCatalog implements CatalogIf<Database> {
}
// add table to this group, if group does not exist, create a new one
Env.getCurrentColocateIndex()
.addTableToGroup(db.getId(), olapTable, colocateGroup, null /* generate group id inside */);
.addTableToGroup(db.getId(), olapTable, fullGroupName, null /* generate group id inside */);
olapTable.setColocateGroup(colocateGroup);
}
} catch (AnalysisException e) {
@ -2277,7 +2277,7 @@ public class InternalCatalog implements CatalogIf<Database> {
if (result.second) {
if (Env.getCurrentColocateIndex().isColocateTable(tableId)) {
// if this is a colocate join table, its table id is already added to colocate group
// if this is a colocate table, its table id is already added to colocate group
// so we should remove the tableId here
Env.getCurrentColocateIndex().removeTable(tableId);
}

View File

@ -485,8 +485,7 @@ public class JournalEntity implements Writable {
break;
}
case OperationType.OP_MODIFY_TABLE_COLOCATE: {
data = new TablePropertyInfo();
((TablePropertyInfo) data).readFields(in);
data = TablePropertyInfo.read(in);
isRead = true;
break;
}

View File

@ -46,10 +46,6 @@ public class ColocatePersistInfo implements Writable {
@SerializedName(value = "backendsPerBucketSeq")
private Map<Tag, List<List<Long>>> backendsPerBucketSeq = Maps.newHashMap();
public ColocatePersistInfo() {
}
private ColocatePersistInfo(GroupId groupId, long tableId, Map<Tag, List<List<Long>>> backendsPerBucketSeq) {
this.groupId = groupId;
this.tableId = tableId;
@ -74,10 +70,6 @@ public class ColocatePersistInfo implements Writable {
return new ColocatePersistInfo(groupId, -1L, Maps.newHashMap());
}
public static ColocatePersistInfo createForRemoveTable(long tableId) {
return new ColocatePersistInfo(new GroupId(-1, -1), tableId, Maps.newHashMap());
}
public static ColocatePersistInfo read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ColocatePersistInfo.class);

View File

@ -18,10 +18,14 @@
package org.apache.doris.persist;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
@ -32,15 +36,20 @@ import java.util.Map;
* PersistInfo for Table properties
*/
public class TablePropertyInfo implements Writable {
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "tableId")
private long tableId;
@SerializedName(value = "propertyMap")
private Map<String, String> propertyMap;
@SerializedName(value = "groupId")
private GroupId groupId;
public TablePropertyInfo() {
private TablePropertyInfo() {
}
public TablePropertyInfo(long tableId, GroupId groupId, Map<String, String> propertyMap) {
public TablePropertyInfo(long dbId, long tableId, GroupId groupId, Map<String, String> propertyMap) {
this.dbId = dbId;
this.tableId = tableId;
this.groupId = groupId;
this.propertyMap = propertyMap;
@ -50,6 +59,10 @@ public class TablePropertyInfo implements Writable {
return propertyMap;
}
/**
 * Returns the database id stored in this persist info.
 */
public long getDbId() {
return dbId;
}
public long getTableId() {
return tableId;
}
@ -60,22 +73,22 @@ public class TablePropertyInfo implements Writable {
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(tableId);
if (groupId == null) {
out.writeBoolean(false);
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
public static TablePropertyInfo read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_119) {
TablePropertyInfo info = new TablePropertyInfo();
info.readFields(in);
return info;
} else {
out.writeBoolean(true);
groupId.write(out);
}
int size = propertyMap.size();
out.writeInt(size);
for (Map.Entry<String, String> kv : propertyMap.entrySet()) {
Text.writeString(out, kv.getKey());
Text.writeString(out, kv.getValue());
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, TablePropertyInfo.class);
}
}
public void readFields(DataInput in) throws IOException {
@Deprecated
private void readFields(DataInput in) throws IOException {
tableId = in.readLong();
if (in.readBoolean()) {
groupId = GroupId.read(in);
@ -102,13 +115,14 @@ public class TablePropertyInfo implements Writable {
TablePropertyInfo info = (TablePropertyInfo) obj;
return tableId == info.tableId && groupId.equals(info.groupId)
return dbId == info.dbId && tableId == info.tableId && groupId.equals(info.groupId)
&& propertyMap.equals(info.propertyMap);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(" db id: ").append(dbId);
sb.append(" table id: ").append(tableId);
sb.append(" group id: ").append(groupId);
sb.append(" propertyMap: ").append(propertyMap);

View File

@ -592,7 +592,7 @@ public class DistributedPlanner {
}
}
//3 the join columns should contains all distribute columns to enable colocate join
//3 the join columns should contain all distribute columns to enable colocate join
if (leftJoinColumns.containsAll(leftDistributeColumns)
&& rightJoinColumns.containsAll(rightDistributeColumns)) {
return true;

View File

@ -129,7 +129,7 @@ public class ColocateTableTest {
Map<Tag, List<List<Long>>> backendIds = index.getBackendsPerBucketSeq(groupId);
Assert.assertEquals(1, backendIds.get(Tag.DEFAULT_BACKEND_TAG).get(0).size());
String fullGroupName = dbId + "_" + groupName;
String fullGroupName = GroupId.getFullGroupName(dbId, groupName);
Assert.assertEquals(tableId, index.getTableIdByGroup(fullGroupName));
ColocateGroupSchema groupSchema = index.getGroupSchema(fullGroupName);
Assert.assertNotNull(groupSchema);

View File

@ -17,66 +17,61 @@
package org.apache.doris.planner;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.ColocateGroupSchema;
import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QueryStatisticsItem;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.utframe.UtFrameUtils;
import org.apache.doris.utframe.TestWithFeService;
import org.apache.commons.lang.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.util.List;
import java.util.UUID;
public class ColocatePlanTest {
public class ColocatePlanTest extends TestWithFeService {
public static final String COLOCATE_ENABLE = "COLOCATE";
private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/";
private static ConnectContext ctx;
private static final String GLOBAL_GROUP = "__global__group1";
private static final String GLOBAL_GROUP2 = "__global__group2";
@BeforeClass
public static void setUp() throws Exception {
@Override
protected void runBeforeAll() throws Exception {
FeConstants.runningUnitTest = true;
UtFrameUtils.createDorisCluster(runningDir, 2);
ctx = UtFrameUtils.createDefaultCtx();
String createDbStmtStr = "create database db1;";
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, ctx);
Env.getCurrentEnv().createDb(createDbStmt);
// create table test_colocate (k1 int ,k2 int, k3 int, k4 int)
// distributed by hash(k1, k2) buckets 10
// properties ("replication_num" = "2");
String createColocateTblStmtStr = "create table db1.test_colocate(k1 int, k2 int, k3 int, k4 int) "
createDatabase("db1");
createTable("create table db1.test_colocate(k1 int, k2 int, k3 int, k4 int) "
+ "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2',"
+ "'colocate_with' = 'group1');";
CreateTableStmt createColocateTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createColocateTblStmtStr, ctx);
Env.getCurrentEnv().createTable(createColocateTableStmt);
String createTblStmtStr = "create table db1.test(k1 int, k2 int, k3 int, k4 int)"
+ "'colocate_with' = 'group1');");
createTable("create table db1.test(k1 int, k2 int, k3 int, k4 int)"
+ "partition by range(k1) (partition p1 values less than (\"1\"), partition p2 values less than (\"2\"))"
+ "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2')";
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTblStmtStr, ctx);
Env.getCurrentEnv().createTable(createTableStmt);
String createMultiPartitionTableStmt = "create table db1.test_multi_partition(k1 int, k2 int)"
+ "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2')");
createTable("create table db1.test_multi_partition(k1 int, k2 int)"
+ "partition by range(k1) (partition p1 values less than(\"1\"), partition p2 values less than (\"2\"))"
+ "distributed by hash(k2) buckets 10 properties ('replication_num' = '2', 'colocate_with' = 'group2')";
CreateTableStmt createMultiTableStmt = (CreateTableStmt) UtFrameUtils
.parseAndAnalyzeStmt(createMultiPartitionTableStmt, ctx);
Env.getCurrentEnv().createTable(createMultiTableStmt);
+ "distributed by hash(k2) buckets 10 properties ('replication_num' = '2', 'colocate_with' = 'group2')");
// global colocate tables
createDatabase("db2");
createTable("create table db1.test_global_colocate1(k1 varchar(10), k2 int, k3 int, k4 int) "
+ "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2',"
+ "'colocate_with' = '" + GLOBAL_GROUP + "');");
createTable("create table db2.test_global_colocate2(k1 varchar(20), k2 int, k3 int) "
+ "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2',"
+ "'colocate_with' = '" + GLOBAL_GROUP + "');");
createTable("create table db2.test_global_colocate3(k1 varchar(20), k2 int, k3 date) "
+ "partition by range(k3) (partition p1 values less than(\"2020-01-01\"), partition p2 values less than (\"2020-02-01\")) "
+ "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2',"
+ "'colocate_with' = '" + GLOBAL_GROUP + "');");
}
@AfterClass
public static void tearDown() {
File file = new File(runningDir);
file.delete();
// NOTE(review): presumably the number of backends the TestWithFeService base class starts
// for this test (the tables created here use replication_num = 2) -- confirm against the base class.
@Override
protected int backendNum() {
return 2;
}
// without
@ -84,9 +79,9 @@ public class ColocatePlanTest {
// 2. join: src data has been redistributed
@Test
public void sqlDistributedSmallerThanData1() throws Exception {
String sql = "explain select * from (select k1 from db1.test_colocate group by k1) a , db1.test_colocate b "
+ "where a.k1=b.k1";
String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
String plan1 = getSQLPlanOrErrorMsg(
"explain select * from (select k1 from db1.test_colocate group by k1) a , db1.test_colocate b "
+ "where a.k1=b.k1");
Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE"));
Assert.assertTrue(plan1.contains(DistributedPlanColocateRule.REDISTRIBUTED_SRC_DATA));
}
@ -96,7 +91,7 @@ public class ColocatePlanTest {
public void sqlDistributedSmallerThanData2() throws Exception {
String sql = "explain select * from (select k1 from db1.test_colocate group by k1, k2) a , db1.test_colocate b "
+ "where a.k1=b.k1";
String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
String plan1 = getSQLPlanOrErrorMsg(sql);
Assert.assertTrue(plan1.contains(DistributedPlanColocateRule.INCONSISTENT_DISTRIBUTION_OF_TABLE_AND_QUERY));
}
@ -105,9 +100,10 @@ public class ColocatePlanTest {
// 2. hash columns = agg output columns = distributed columns
@Test
public void sqlAggAndJoinSameAsTableMeta() throws Exception {
String sql = "explain select * from (select k1, k2 from db1.test_colocate group by k1, k2) a , db1.test_colocate b "
+ "where a.k1=b.k1 and a.k2=b.k2";
String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
String sql =
"explain select * from (select k1, k2 from db1.test_colocate group by k1, k2) a , db1.test_colocate b "
+ "where a.k1=b.k1 and a.k2=b.k2";
String plan1 = getSQLPlanOrErrorMsg(sql);
Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE"));
Assert.assertTrue(plan1.contains(COLOCATE_ENABLE));
}
@ -119,7 +115,7 @@ public class ColocatePlanTest {
public void sqlAggAndJoinMoreThanTableMeta() throws Exception {
String sql = "explain select * from (select k1, k2, k3 from db1.test_colocate group by k1, k2, k3) a , "
+ "db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 and a.k3=b.k3";
String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
String plan1 = getSQLPlanOrErrorMsg(sql);
Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE"));
Assert.assertTrue(plan1.contains(COLOCATE_ENABLE));
}
@ -131,7 +127,7 @@ public class ColocatePlanTest {
public void sqlAggMoreThanTableMeta() throws Exception {
String sql = "explain select * from (select k1, k2, k3 from db1.test_colocate group by k1, k2, k3) a , "
+ "db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2";
String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
String plan1 = getSQLPlanOrErrorMsg(sql);
Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE"));
Assert.assertTrue(plan1.contains(COLOCATE_ENABLE));
}
@ -144,7 +140,7 @@ public class ColocatePlanTest {
@Test
public void sqlAggWithNonColocateTable() throws Exception {
String sql = "explain select k1, k2 from db1.test group by k1, k2";
String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
String plan1 = getSQLPlanOrErrorMsg(sql);
Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE"));
Assert.assertFalse(plan1.contains(COLOCATE_ENABLE));
}
@ -156,7 +152,7 @@ public class ColocatePlanTest {
@Test
public void sqlAggWithColocateTable() throws Exception {
String sql = "select k1, k2, count(*) from db1.test_multi_partition where k2 = 1 group by k1, k2";
StmtExecutor executor = UtFrameUtils.getSqlStmtExecutor(ctx, sql);
StmtExecutor executor = getSqlStmtExecutor(sql);
Planner planner = executor.planner();
Coordinator coordinator = Deencapsulation.getField(executor, "coord");
List<ScanNode> scanNodeList = planner.getScanNodes();
@ -173,8 +169,9 @@ public class ColocatePlanTest {
@Test
public void checkColocatePlanFragment() throws Exception {
String sql = "select a.k1 from db1.test_colocate a, db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 group by a.k1;";
StmtExecutor executor = UtFrameUtils.getSqlStmtExecutor(ctx, sql);
String sql
= "select a.k1 from db1.test_colocate a, db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 group by a.k1;";
StmtExecutor executor = getSqlStmtExecutor(sql);
Planner planner = executor.planner();
Coordinator coordinator = Deencapsulation.getField(executor, "coord");
boolean isColocateFragment0 = Deencapsulation.invoke(coordinator, "isColocateFragment",
@ -190,18 +187,120 @@ public class ColocatePlanTest {
public void rollupAndMoreThanOneInstanceWithoutColocate() throws Exception {
String createColocateTblStmtStr = "create table db1.test_colocate_one_backend(k1 int, k2 int, k3 int, k4 int) "
+ "distributed by hash(k1, k2, k3) buckets 10 properties('replication_num' = '1');";
CreateTableStmt createColocateTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createColocateTblStmtStr, ctx);
Env.getCurrentEnv().createTable(createColocateTableStmt);
createTable(createColocateTblStmtStr);
String sql = "select a.k1, a.k2, sum(a.k3) "
+ "from db1.test_colocate_one_backend a join[shuffle] db1.test_colocate_one_backend b on a.k1=b.k1 "
+ "group by rollup(a.k1, a.k2);";
Deencapsulation.setField(ctx.getSessionVariable(), "parallelExecInstanceNum", 2);
String plan1 = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql);
Deencapsulation.setField(connectContext.getSessionVariable(), "parallelExecInstanceNum", 2);
String plan1 = getSQLPlanOrErrorMsg(sql);
Assert.assertEquals(2, StringUtils.countMatches(plan1, "AGGREGATE"));
Assert.assertEquals(5, StringUtils.countMatches(plan1, "PLAN FRAGMENT"));
}
/**
 * Exercises a colocate group whose member tables live in two databases (db1 and db2).
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>plan a join between db1.test_global_colocate1 and db2.test_global_colocate2 and expect
 *       one "AGGREGATE" plus the {@code COLOCATE_ENABLE} marker in the plan text;</li>
 *   <li>check that the group named {@code GLOBAL_GROUP} holds the three tables and maps each
 *       table id to its database id;</li>
 *   <li>repeat the plan check against db2.test_global_colocate3, then add a partition to it;</li>
 *   <li>try to create db1.test_global_colocate4 in the same group, expect the
 *       "same data type" DDL error, and check that group membership is unchanged;</li>
 *   <li>clear the group of test_global_colocate3, then move test_global_colocate2 to
 *       {@code GLOBAL_GROUP2}, checking membership of both groups after each change;</li>
 *   <li>run a checkpoint manually on the current thread.</li>
 * </ol>
 *
 * @throws Exception if any statement that is expected to succeed fails
 */
@Test
public void testGlobalColocateGroup() throws Exception {
    // The *Nullable getters may return null; fail with a readable message instead of an NPE.
    Database db1 = Env.getCurrentEnv().getInternalCatalog().getDbNullable("default_cluster:db1");
    Database db2 = Env.getCurrentEnv().getInternalCatalog().getDbNullable("default_cluster:db2");
    Assert.assertNotNull("database default_cluster:db1 not found", db1);
    Assert.assertNotNull("database default_cluster:db2 not found", db2);
    OlapTable tbl1 = (OlapTable) db1.getTableNullable("test_global_colocate1");
    OlapTable tbl2 = (OlapTable) db2.getTableNullable("test_global_colocate2");
    OlapTable tbl3 = (OlapTable) db2.getTableNullable("test_global_colocate3");
    Assert.assertNotNull("table db1.test_global_colocate1 not found", tbl1);
    Assert.assertNotNull("table db2.test_global_colocate2 not found", tbl2);
    Assert.assertNotNull("table db2.test_global_colocate3 not found", tbl3);

    // Cross-database join on both distribution columns: db1 table vs. db2.test_global_colocate2.
    String sql = "explain select * from (select k1, k2 from "
            + "db1.test_global_colocate1 group by k1, k2) a , db2.test_global_colocate2 b "
            + "where a.k1=b.k1 and a.k2=b.k2";
    String plan1 = getSQLPlanOrErrorMsg(sql);
    Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE"));
    Assert.assertTrue(plan1, plan1.contains(COLOCATE_ENABLE));

    // NOTE(review): the meaning of the literal 1000 passed to getFullGroupName is not visible
    // here - confirm against GroupId.
    ColocateTableIndex colocateTableIndex = Env.getCurrentColocateIndex();
    ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(
            GroupId.getFullGroupName(1000, GLOBAL_GROUP));
    Assert.assertNotNull(groupSchema);
    GroupId groupId = groupSchema.getGroupId();

    // Initially the group holds all three tables.
    List<Long> tableIds = colocateTableIndex.getAllTableIds(groupId);
    Assert.assertEquals(3, tableIds.size());
    Assert.assertEquals(3, groupId.getTblId2DbIdSize());
    assertGroupMember(tableIds, groupId, tbl1, db1);
    assertGroupMember(tableIds, groupId, tbl2, db2);
    assertGroupMember(tableIds, groupId, tbl3, db2);

    // Same plan check against the second db2 table.
    sql = "explain select * from (select k1, k2 from "
            + "db1.test_global_colocate1 group by k1, k2) a , db2.test_global_colocate3 b "
            + "where a.k1=b.k1 and a.k2=b.k2";
    plan1 = getSQLPlanOrErrorMsg(sql);
    Assert.assertEquals(1, StringUtils.countMatches(plan1, "AGGREGATE"));
    Assert.assertTrue(plan1, plan1.contains(COLOCATE_ENABLE));

    String addPartitionStmt
            = "alter table db2.test_global_colocate3 add partition p3 values less than (\"2020-03-01\");";
    alterTableSync(addPartitionStmt);

    // Creating this table in the existing group must be rejected with the expected DDL error.
    try {
        createTable("create table db1.test_global_colocate4(k1 int, k2 int, k3 int, k4 int) "
                + "distributed by hash(k1, k2) buckets 10 properties('replication_num' = '2',"
                + "'colocate_with' = '" + GLOBAL_GROUP + "');");
        // AssertionError is not an Exception, so the catch below does not swallow it.
        Assert.fail("creating db1.test_global_colocate4 in group " + GLOBAL_GROUP + " should have failed");
    } catch (Exception e) {
        String message = e.getMessage();
        Assert.assertTrue("unexpected failure: " + e,
                message != null
                        && message.contains("Colocate tables distribution columns must have the same data type"));
    }

    // The rejected create must leave the group untouched.
    List<Long> tmpTableIds = colocateTableIndex.getAllTableIds(groupId);
    Assert.assertEquals(3, tmpTableIds.size());
    Assert.assertEquals(3, groupId.getTblId2DbIdSize());
    assertGroupMember(tmpTableIds, groupId, tbl1, db1);
    assertGroupMember(tmpTableIds, groupId, tbl2, db2);
    assertGroupMember(tmpTableIds, groupId, tbl3, db2);

    // modify table's colocate group: an empty 'colocate_with' removes tbl3 from the group
    String modifyStmt = "alter table db2.test_global_colocate3 set ('colocate_with' = '');";
    alterTableSync(modifyStmt);
    tableIds = colocateTableIndex.getAllTableIds(groupId);
    Assert.assertEquals(2, tableIds.size());
    Assert.assertEquals(2, groupId.getTblId2DbIdSize());
    assertGroupMember(tableIds, groupId, tbl1, db1);
    assertGroupMember(tableIds, groupId, tbl2, db2);

    // change table's colocate group: tbl2 moves from GLOBAL_GROUP to GLOBAL_GROUP2
    modifyStmt = "alter table db2.test_global_colocate2 set ('colocate_with' = '" + GLOBAL_GROUP2 + "');";
    alterTableSync(modifyStmt);
    tableIds = colocateTableIndex.getAllTableIds(groupId);
    Assert.assertEquals(1, tableIds.size());
    Assert.assertEquals(1, groupId.getTblId2DbIdSize());
    assertGroupMember(tableIds, groupId, tbl1, db1);

    ColocateGroupSchema groupSchema2 = colocateTableIndex.getGroupSchema(
            GroupId.getFullGroupName(1000, GLOBAL_GROUP2));
    Assert.assertNotNull("group " + GLOBAL_GROUP2 + " not found", groupSchema2);
    GroupId groupId2 = groupSchema2.getGroupId();
    tableIds = colocateTableIndex.getAllTableIds(groupId2);
    Assert.assertEquals(1, tableIds.size());
    Assert.assertEquals(1, groupId2.getTblId2DbIdSize());
    assertGroupMember(tableIds, groupId2, tbl2, db2);

    // checkpoint
    Env currentEnv = Env.getCurrentEnv();
    // Save real ckptThreadId so it can be restored afterwards.
    long ckptThreadId = currentEnv.getCheckpointer().getId();
    try {
        // Point checkpointThreadId at the current thread before running the checkpoint manually.
        // NOTE(review): presumably this makes the "is checkpoint thread" check pass - confirm in Env.
        Deencapsulation.setField(Env.class, "checkpointThreadId", Thread.currentThread().getId());
        currentEnv.getCheckpointer().doCheckpoint();
    } catch (Throwable e) {
        // Keep the original failure as the cause so its stack trace shows up in the test report.
        throw new AssertionError("manual checkpoint failed: " + e, e);
    } finally {
        // Restore the ckptThreadId
        Deencapsulation.setField(Env.class, "checkpointThreadId", ckptThreadId);
    }
}

// Asserts that tbl is listed in tableIds and that groupId maps its id to the id of db.
private static void assertGroupMember(List<Long> tableIds, GroupId groupId, OlapTable tbl, Database db) {
    Assert.assertTrue(tableIds.contains(tbl.getId()));
    Assert.assertEquals(db.getId(), groupId.getDbIdByTblId(tbl.getId()));
}
}

View File

@ -617,6 +617,12 @@ public abstract class TestWithFeService {
Thread.sleep(100);
}
/**
 * Parses and analyzes {@code sql} against {@code connectContext}, submits the resulting
 * ALTER TABLE statement to the current {@link Env}, then sleeps for 100 ms before returning.
 *
 * @param sql text of the ALTER TABLE statement
 * @throws Exception if parsing, analysis, the alter call or the sleep fails
 */
protected void alterTableSync(String sql) throws Exception {
    Object analyzed = UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
    AlterTableStmt stmt = (AlterTableStmt) analyzed;
    Env.getCurrentEnv().alterTable(stmt);
    // NOTE(review): fixed pause; presumably gives the alter time to take effect - confirm.
    Thread.sleep(100);
}
protected void createMv(String sql) throws Exception {
CreateMaterializedViewStmt createMaterializedViewStmt =
(CreateMaterializedViewStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);