[Bug][Refactor] Fix the conflict of temp partition and dynamic partition operations (#3201)

The bug is described in issue: #3200.

This CL solve the problem by:
1. Refactor the alter operation conflict checking logic by introducing new classes `AlterOperations` and `AlterOpType`.
2. Allow add/drop temporary partition when dynamic partition feature is enabled.
3. Allow modifying table's property when there is temporary partition in table.
4. Make the properties `dynamic_partition.enable` optional, and default is true.
This commit is contained in:
Mingyu Chen
2020-03-27 20:25:15 +08:00
committed by GitHub
parent c1969a3fb3
commit aa8b2f86c4
40 changed files with 552 additions and 277 deletions

View File

@ -0,0 +1,190 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.alter;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.utframe.UtFrameUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
import java.util.Map;
import java.util.UUID;
public class AlterTest {
private static String runningDir = "fe/mocked/AlterTest/" + UUID.randomUUID().toString() + "/";
private static ConnectContext connectContext;
@BeforeClass
public static void beforeClass() throws Exception {
FeConstants.runningUnitTest = true;
FeConstants.default_scheduler_interval_millisecond = 100;
Config.dynamic_partition_enable = true;
UtFrameUtils.createMinDorisCluster(runningDir);
// create connect context
connectContext = UtFrameUtils.createDefaultCtx();
// create database
String createDbStmtStr = "create database test;";
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
Catalog.getCurrentCatalog().createDb(createDbStmt);
createTable("CREATE TABLE test.tbl1\n" +
"(\n" +
" k1 date,\n" +
" k2 int,\n" +
" v1 int sum\n" +
")\n" +
"PARTITION BY RANGE(k1)\n" +
"(\n" +
" PARTITION p1 values less than('2020-02-01'),\n" +
" PARTITION p2 values less than('2020-03-01')\n" +
")\n" +
"DISTRIBUTED BY HASH(k2) BUCKETS 3\n" +
"PROPERTIES('replication_num' = '1');");
}
@AfterClass
public static void tearDown() {
File file = new File(runningDir);
file.delete();
}
private static void createTable(String sql) throws Exception {
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
Catalog.getCurrentCatalog().createTable(createTableStmt);
}
private static void alterTable(String sql, boolean expectedException) throws Exception {
AlterTableStmt alterTableStmt = (AlterTableStmt) UtFrameUtils.parseAndAnalyzeStmt(sql, connectContext);
try {
Catalog.getCurrentCatalog().alterTable(alterTableStmt);
if (expectedException) {
Assert.fail();
}
} catch (Exception e) {
e.printStackTrace();
if (!expectedException) {
Assert.fail();
}
}
}
@Test
public void testConflictAlterOperations() throws Exception {
String stmt = "alter table test.tbl1 add partition p3 values less than('2020-04-01'), add partition p4 values less than('2020-05-01')";
alterTable(stmt, true);
stmt = "alter table test.tbl1 add partition p3 values less than('2020-04-01'), drop partition p4";
alterTable(stmt, true);
stmt = "alter table test.tbl1 drop partition p3, drop partition p4";
alterTable(stmt, true);
stmt = "alter table test.tbl1 drop partition p3, add column k3 int";
alterTable(stmt, true);
// no conflict
stmt = "alter table test.tbl1 add column k3 int, add column k4 int";
alterTable(stmt, false);
waitSchemaChangeJobDone(false);
stmt = "alter table test.tbl1 add rollup r1 (k1)";
alterTable(stmt, false);
waitSchemaChangeJobDone(true);
stmt = "alter table test.tbl1 add rollup r2 (k1), r3 (k1)";
alterTable(stmt, false);
waitSchemaChangeJobDone(true);
// enable dynamic partition
stmt = "alter table test.tbl1 set (\n" +
"'dynamic_partition.enable' = 'true',\n" +
"'dynamic_partition.time_unit' = 'DAY',\n" +
"'dynamic_partition.start' = '-3',\n" +
"'dynamic_partition.end' = '3',\n" +
"'dynamic_partition.prefix' = 'p',\n" +
"'dynamic_partition.buckets' = '3'\n" +
" );";
alterTable(stmt, false);
Database db = Catalog.getCurrentCatalog().getDb("default_cluster:test");
OlapTable tbl = (OlapTable)db.getTable("tbl1");
Assert.assertTrue(tbl.getTableProperty().getDynamicPartitionProperty().getEnable());
Assert.assertEquals(4, tbl.getIndexIdToSchema().size());
// add partition when dynamic partition is enable
stmt = "alter table test.tbl1 add partition p3 values less than('2020-04-01') distributed by hash(k2) buckets 4 PROPERTIES ('replication_num' = '1')";
alterTable(stmt, true);
// add temp partition when dynamic partition is enable
stmt = "alter table test.tbl1 add temporary partition tp3 values less than('2020-04-01') distributed by hash(k2) buckets 4 PROPERTIES ('replication_num' = '1')";
alterTable(stmt, false);
Assert.assertEquals(1, tbl.getTempPartitions().size());
// disable the dynamic partition
stmt = "alter table test.tbl1 set ('dynamic_partition.enable' = 'false')";
alterTable(stmt, false);
Assert.assertFalse(tbl.getTableProperty().getDynamicPartitionProperty().getEnable());
// add partition when dynamic partition is disable
stmt = "alter table test.tbl1 add partition p3 values less than('2020-04-01') distributed by hash(k2) buckets 4";
alterTable(stmt, false);
// set table's default replication num
Assert.assertEquals(Short.valueOf("1"), tbl.getDefaultReplicationNum());
stmt = "alter table test.tbl1 set ('default.replication_num' = '3');";
alterTable(stmt, false);
Assert.assertEquals(Short.valueOf("3"), tbl.getDefaultReplicationNum());
// add partition without set replication num
stmt = "alter table test.tbl1 add partition p4 values less than('2020-05-01')";
alterTable(stmt, true);
// add partition when dynamic partition is disable
stmt = "alter table test.tbl1 add partition p4 values less than('2020-05-01') ('replication_num' = '1')";
alterTable(stmt, false);
}
private void waitSchemaChangeJobDone(boolean rollupJob) throws InterruptedException {
Map<Long, AlterJobV2> alterJobs = Catalog.getCurrentCatalog().getSchemaChangeHandler().getAlterJobsV2();
if (rollupJob) {
alterJobs = Catalog.getCurrentCatalog().getRollupHandler().getAlterJobsV2();
}
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
while (!alterJobV2.getJobState().isFinalState()) {
System.out.println("alter job " + alterJobV2.getJobId() + " is running. state: " + alterJobV2.getJobState());
Thread.sleep(1000);
}
System.out.println(alterJobV2.getType() + " alter job " + alterJobV2.getJobId() + " is done. state: " + alterJobV2.getJobState());
Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
}
}
}

View File

@ -1,10 +1,5 @@
package org.apache.doris.catalog;
import com.google.common.collect.Lists;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.ColumnDef;
import org.apache.doris.analysis.CreateTableStmt;
@ -24,6 +19,9 @@ import org.apache.doris.persist.EditLog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@ -36,6 +34,11 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
public class DynamicPartitionTableTest {
private TableName dbTableName;
private String dbName = "testDb";
@ -153,53 +156,6 @@ public class DynamicPartitionTableTest {
catalog.createTable(stmt);
}
@Test
public void testMissEnable(@Injectable SystemInfoService systemInfoService,
@Injectable PaloAuth paloAuth,
@Injectable EditLog editLog) throws UserException {
new Expectations(catalog) {
{
catalog.getDb(dbTableName.getDb());
minTimes = 0;
result = db;
Catalog.getCurrentSystemInfo();
minTimes = 0;
result = systemInfoService;
systemInfoService.checkClusterCapacity(anyString);
minTimes = 0;
systemInfoService.seqChooseBackendIds(anyInt, true, true, anyString);
minTimes = 0;
result = beIds;
catalog.getAuth();
minTimes = 0;
result = paloAuth;
paloAuth.checkTblPriv((ConnectContext) any, anyString, anyString, PrivPredicate.CREATE);
minTimes = 0;
result = true;
catalog.getEditLog();
minTimes = 0;
result = editLog;
}
};
properties.remove(DynamicPartitionProperty.ENABLE);
CreateTableStmt stmt = new CreateTableStmt(false, false, dbTableName, columnDefs, "olap",
new KeysDesc(KeysType.AGG_KEYS, columnNames),
new RangePartitionDesc(Lists.newArrayList("key1"), singleRangePartitionDescs),
new HashDistributionDesc(1, Lists.newArrayList("key1")), properties, null, "");
stmt.analyze(analyzer);
expectedEx.expect(DdlException.class);
expectedEx.expectMessage("Must assign dynamic_partition.enable properties");
catalog.createTable(stmt);
}
@Test
public void testMissPrefix(@Injectable SystemInfoService systemInfoService,
@Injectable PaloAuth paloAuth,

View File

@ -109,10 +109,10 @@ public class DemoTest {
Assert.assertEquals(1, alterJobs.size());
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
while (!alterJobV2.getJobState().isFinalState()) {
System.out.println("alter job " + alterJobV2.getDbId() + " is running. state: " + alterJobV2.getJobState());
System.out.println("alter job " + alterJobV2.getJobId() + " is running. state: " + alterJobV2.getJobState());
Thread.sleep(1000);
}
System.out.println("alter job " + alterJobV2.getDbId() + " is done. state: " + alterJobV2.getJobState());
System.out.println("alter job " + alterJobV2.getJobId() + " is done. state: " + alterJobV2.getJobState());
Assert.assertEquals(AlterJobV2.JobState.FINISHED, alterJobV2.getJobState());
}
db.readLock();